Skip to content

feat: base CloudEvent class as per v1 specs, including attribute validation #242

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Nov 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
a2ac762
feat: base `CloudEvent` class as per v1 specs, including attribute va…
PlugaruT Nov 8, 2024
8db1e29
chore: add typings and docstrings
PlugaruT Nov 8, 2024
35dee7d
chore: Add support for custom extension names and validate them
PlugaruT Nov 9, 2024
42b4fe1
chore: Add copyright and fix missing type info
PlugaruT Nov 9, 2024
f83c363
chore: Add getters for attributes and test happy path
PlugaruT Nov 9, 2024
9d1aa35
fix: typing
PlugaruT Nov 9, 2024
aa81ca0
chore: Split validation logic into smaller methods
PlugaruT Nov 11, 2024
b2b0649
chore: Add method to extract extension by name
PlugaruT Nov 11, 2024
b202325
chore: configure ruff to sort imports also
PlugaruT Nov 11, 2024
c5e6df9
chore: Returns all the errors at ones instead of raising early. Impro…
PlugaruT Nov 11, 2024
6e13f72
fix missing type info
PlugaruT Nov 11, 2024
e78a70b
chore: Improve exceptions handling. Have exceptions grouped by attrib…
PlugaruT Nov 13, 2024
443aee9
chore: Skip type checing for getters of required attributes
PlugaruT Nov 13, 2024
1d43d68
fix: missing type
PlugaruT Nov 14, 2024
21493e1
chore: Improve exceptions and introduce a new one for invalid values
PlugaruT Nov 14, 2024
68337f9
fix: str representation for validation error
PlugaruT Nov 14, 2024
d0bba86
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 14, 2024
599d05c
fix: Fix missing type definitions
PlugaruT Nov 14, 2024
43f1d0c
small fix
PlugaruT Nov 14, 2024
7d18098
remove cast of defaultdict to dict
PlugaruT Nov 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ exclude = [
[tool.ruff.lint]
ignore = ["E731"]
extend-ignore = ["E203"]
select = ["I"]


[tool.pytest.ini_options]
testpaths = [
Expand Down
17 changes: 17 additions & 0 deletions src/cloudevents/core/v1/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Copyright 2018-Present The CloudEvents Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""
CloudEvent implementation for v1.0
"""
324 changes: 324 additions & 0 deletions src/cloudevents/core/v1/event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,324 @@
# Copyright 2018-Present The CloudEvents Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import re
from collections import defaultdict
from datetime import datetime
from typing import Any, Final, Optional

from cloudevents.core.v1.exceptions import (
BaseCloudEventException,
CloudEventValidationError,
CustomExtensionAttributeError,
InvalidAttributeTypeError,
InvalidAttributeValueError,
MissingRequiredAttributeError,
)

REQUIRED_ATTRIBUTES: Final[list[str]] = ["id", "source", "type", "specversion"]
OPTIONAL_ATTRIBUTES: Final[list[str]] = [
"datacontenttype",
"dataschema",
"subject",
"time",
]


class CloudEvent:
"""
The CloudEvent Python wrapper contract exposing generically-available
properties and APIs.

Implementations might handle fields and have other APIs exposed but are
obliged to follow this contract.
"""

def __init__(self, attributes: dict[str, Any], data: Optional[dict] = None) -> None:
"""
Create a new CloudEvent instance.

:param attributes: The attributes of the CloudEvent instance.
:param data: The payload of the CloudEvent instance.

:raises ValueError: If any of the required attributes are missing or have invalid values.
:raises TypeError: If any of the attributes have invalid types.
"""
self._validate_attribute(attributes=attributes)
self._attributes: dict[str, Any] = attributes
self._data: Optional[dict] = data

@staticmethod
def _validate_attribute(attributes: dict[str, Any]) -> None:
"""
Validates the attributes of the CloudEvent as per the CloudEvents specification.

See https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#required-attributes
"""
errors: dict[str, list[BaseCloudEventException]] = defaultdict(list)
errors.update(CloudEvent._validate_required_attributes(attributes=attributes))
errors.update(CloudEvent._validate_optional_attributes(attributes=attributes))
errors.update(CloudEvent._validate_extension_attributes(attributes=attributes))
if errors:
raise CloudEventValidationError(errors=errors)

@staticmethod
def _validate_required_attributes(
attributes: dict[str, Any],
) -> dict[str, list[BaseCloudEventException]]:
"""
Validates the types of the required attributes.

:param attributes: The attributes of the CloudEvent instance.
:return: A dictionary of validation error messages.
"""
errors = defaultdict(list)

if "id" not in attributes:
errors["id"].append(MissingRequiredAttributeError(attribute_name="id"))
if attributes.get("id") is None:
errors["id"].append(
InvalidAttributeValueError(
attribute_name="id", msg="Attribute 'id' must not be None"
)
)
if not isinstance(attributes.get("id"), str):
errors["id"].append(
InvalidAttributeTypeError(attribute_name="id", expected_type=str)
)

if "source" not in attributes:
errors["source"].append(
MissingRequiredAttributeError(attribute_name="source")
)
if not isinstance(attributes.get("source"), str):
errors["source"].append(
InvalidAttributeTypeError(attribute_name="source", expected_type=str)
)

if "type" not in attributes:
errors["type"].append(MissingRequiredAttributeError(attribute_name="type"))
if not isinstance(attributes.get("type"), str):
errors["type"].append(
InvalidAttributeTypeError(attribute_name="type", expected_type=str)
)

if "specversion" not in attributes:
errors["specversion"].append(
MissingRequiredAttributeError(attribute_name="specversion")
)
if not isinstance(attributes.get("specversion"), str):
errors["specversion"].append(
InvalidAttributeTypeError(
attribute_name="specversion", expected_type=str
)
)
if attributes.get("specversion") != "1.0":
errors["specversion"].append(
InvalidAttributeValueError(
attribute_name="specversion",
msg="Attribute 'specversion' must be '1.0'",
)
)
return errors

@staticmethod
def _validate_optional_attributes(
attributes: dict[str, Any],
) -> dict[str, list[BaseCloudEventException]]:
"""
Validates the types and values of the optional attributes.

:param attributes: The attributes of the CloudEvent instance.
:return: A dictionary of validation error messages.
"""
errors = defaultdict(list)

if "time" in attributes:
if not isinstance(attributes["time"], datetime):
errors["time"].append(
InvalidAttributeTypeError(
attribute_name="time", expected_type=datetime
)
)
if hasattr(attributes["time"], "tzinfo") and not attributes["time"].tzinfo:
errors["time"].append(
InvalidAttributeValueError(
attribute_name="time",
msg="Attribute 'time' must be timezone aware",
)
)
if "subject" in attributes:
if not isinstance(attributes["subject"], str):
errors["subject"].append(
InvalidAttributeTypeError(
attribute_name="subject", expected_type=str
)
)
if not attributes["subject"]:
errors["subject"].append(
InvalidAttributeValueError(
attribute_name="subject",
msg="Attribute 'subject' must not be empty",
)
)
if "datacontenttype" in attributes:
if not isinstance(attributes["datacontenttype"], str):
errors["datacontenttype"].append(
InvalidAttributeTypeError(
attribute_name="datacontenttype", expected_type=str
)
)
if not attributes["datacontenttype"]:
errors["datacontenttype"].append(
InvalidAttributeValueError(
attribute_name="datacontenttype",
msg="Attribute 'datacontenttype' must not be empty",
)
)
if "dataschema" in attributes:
if not isinstance(attributes["dataschema"], str):
errors["dataschema"].append(
InvalidAttributeTypeError(
attribute_name="dataschema", expected_type=str
)
)
if not attributes["dataschema"]:
errors["dataschema"].append(
InvalidAttributeValueError(
attribute_name="dataschema",
msg="Attribute 'dataschema' must not be empty",
)
)
return errors

@staticmethod
def _validate_extension_attributes(
attributes: dict[str, Any],
) -> dict[str, list[BaseCloudEventException]]:
"""
Validates the extension attributes.

:param attributes: The attributes of the CloudEvent instance.
:return: A dictionary of validation error messages.
"""
errors = defaultdict(list)
extension_attributes = [
key
for key in attributes.keys()
if key not in REQUIRED_ATTRIBUTES and key not in OPTIONAL_ATTRIBUTES
]
for extension_attribute in extension_attributes:
if extension_attribute == "data":
errors[extension_attribute].append(
CustomExtensionAttributeError(
attribute_name=extension_attribute,
msg="Extension attribute 'data' is reserved and must not be used",
)
)
if not (1 <= len(extension_attribute) <= 20):
errors[extension_attribute].append(
CustomExtensionAttributeError(
attribute_name=extension_attribute,
msg=f"Extension attribute '{extension_attribute}' should be between 1 and 20 characters long",
)
)
if not re.match(r"^[a-z0-9]+$", extension_attribute):
errors[extension_attribute].append(
CustomExtensionAttributeError(
attribute_name=extension_attribute,
msg=f"Extension attribute '{extension_attribute}' should only contain lowercase letters and numbers",
)
)
return errors

def get_id(self) -> str:
"""
Retrieve the ID of the event.

:return: The ID of the event.
"""
return self._attributes["id"] # type: ignore

def get_source(self) -> str:
"""
Retrieve the source of the event.

:return: The source of the event.
"""
return self._attributes["source"] # type: ignore

def get_type(self) -> str:
"""
Retrieve the type of the event.

:return: The type of the event.
"""
return self._attributes["type"] # type: ignore

def get_specversion(self) -> str:
"""
Retrieve the specversion of the event.

:return: The specversion of the event.
"""
return self._attributes["specversion"] # type: ignore

def get_datacontenttype(self) -> Optional[str]:
"""
Retrieve the datacontenttype of the event.

:return: The datacontenttype of the event.
"""
return self._attributes.get("datacontenttype")

def get_dataschema(self) -> Optional[str]:
"""
Retrieve the dataschema of the event.

:return: The dataschema of the event.
"""
return self._attributes.get("dataschema")

def get_subject(self) -> Optional[str]:
"""
Retrieve the subject of the event.

:return: The subject of the event.
"""
return self._attributes.get("subject")

def get_time(self) -> Optional[datetime]:
"""
Retrieve the time of the event.

:return: The time of the event.
"""
return self._attributes.get("time")

def get_extension(self, extension_name: str) -> Any:
"""
Retrieve an extension attribute of the event.

:param extension_name: The name of the extension attribute.
:return: The value of the extension attribute.
"""
return self._attributes.get(extension_name)

def get_data(self) -> Optional[dict]:
"""
Retrieve data of the event.

:return: The data of the event.
"""
return self._data
Loading