CloudEvents Python SDK for creating, sending, and receiving CloudEvents over HTTP in both binary and structured content modes
npx @tessl/cli install tessl/pypi-cloudevents@1.12.0

A comprehensive Python SDK for CloudEvents, a specification for describing event data in a common way. The library enables developers to create, send, and receive CloudEvents over HTTP in both binary and structured content modes, with support for CloudEvents v1.0 and v0.3 specifications.
pip install cloudevents

pip install cloudevents[pydantic] for Pydantic validation support

Primary HTTP functionality:
from cloudevents.http import CloudEvent, from_json, from_http, from_dict

Abstract base classes:
from cloudevents.abstract import CloudEvent, AnyCloudEvent

Kafka integration:
from cloudevents.kafka import to_binary, from_binary, to_structured, from_structured, KafkaMessage

Pydantic integration (requires pydantic extra):
from cloudevents.pydantic import CloudEvent, from_json, from_http, from_dict

from cloudevents.http import CloudEvent, from_json
import json
# Create a CloudEvent
attributes = {
"type": "com.example.string",
"source": "https://example.com/source",
"id": "my-event-id",
"specversion": "1.0",
"datacontenttype": "application/json"
}
data = {"message": "Hello CloudEvents!"}
event = CloudEvent(attributes, data)
# Convert to JSON
from cloudevents.http.json_methods import to_json
json_event = to_json(event)
print(json_event)
# Parse from JSON
parsed_event = from_json(json_event)
print(f"Event type: {parsed_event['type']}")
print(f"Event data: {parsed_event.get_data()}")

The CloudEvents Python SDK is organized around several key components:
This modular design allows developers to choose the appropriate level of abstraction and features for their use case, from simple HTTP events to fully-typed Pydantic models with validation.
Core CloudEvent creation, parsing, and conversion functionality for HTTP transport. Supports both binary and structured content modes with automatic format detection.
class CloudEvent:
def __init__(self, attributes: Mapping[str, str], data: Any = None): ...
@classmethod
def create(cls, attributes: Mapping[str, Any], data: Optional[Any]) -> CloudEvent: ...
def from_json(data: Union[str, bytes], data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent: ...
def from_http(headers: Union[Mapping[str, str], SupportsDuplicateItems[str, str]],
data: Optional[Union[str, bytes]],
data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent: ...
def from_dict(data: Mapping[str, Any], data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent: ...

CloudEvent conversion functions for Kafka message broker integration, supporting both binary and structured formats with key mapping capabilities.
class KafkaMessage(NamedTuple):
headers: Dict[str, bytes]
key: Optional[Union[str, bytes]]
value: Union[str, bytes]
def to_binary(event: CloudEvent, key_mapper: Optional[KeyMapper] = None,
data_marshaller: Optional[MarshallerType] = None) -> KafkaMessage: ...
def from_binary(message: KafkaMessage, data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent: ...
def to_structured(event: CloudEvent, key_mapper: Optional[KeyMapper] = None,
data_marshaller: Optional[MarshallerType] = None) -> KafkaMessage: ...
def from_structured(message: KafkaMessage, data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent: ...

Optional Pydantic-based CloudEvent implementation providing runtime validation, type safety, and IDE support with automatic serialization/deserialization.
class CloudEvent(BaseModel):
# Pydantic model with validation
pass
def from_json(data: Union[str, bytes], data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent: ...
def from_http(headers: Union[Mapping[str, str], SupportsDuplicateItems[str, str]],
data: Optional[Union[str, bytes]],
data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent: ...
def from_dict(data: Mapping[str, Any], data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent: ...

Deprecated conversion functions maintained for backward compatibility. These functions provide binary and structured format conversion but are superseded by newer APIs.
def to_binary(event: CloudEvent, data_marshaller: Optional[MarshallerType] = None) -> Tuple[Dict[str, str], bytes]: ...
def to_structured(event: CloudEvent, data_marshaller: Optional[MarshallerType] = None) -> bytes: ...
def to_json(event: CloudEvent, data_marshaller: Optional[MarshallerType] = None) -> str: ...
def is_binary(headers: Dict[str, str]) -> bool: ...
def is_structured(headers: Dict[str, str]) -> bool: ...

# Type aliases for marshalling/unmarshalling functions
MarshallerType = Callable[[Any], AnyStr]
UnmarshallerType = Callable[[AnyStr], Any]
# Protocol for headers that may have duplicate items
class SupportsDuplicateItems(Protocol[_K_co, _V_co]):
def items(self) -> Iterable[Tuple[_K_co, _V_co]]: ...
# Generic CloudEvent type variable
AnyCloudEvent = TypeVar("AnyCloudEvent", bound=CloudEvent)

The library defines several exception types for different error conditions:
class GenericException(Exception):
"""Base exception for the CloudEvents library"""
class MissingRequiredFields(GenericException):
"""Raised when required CloudEvent fields are missing"""
class InvalidRequiredFields(GenericException):
"""Raised when required CloudEvent fields are invalid"""
class InvalidStructuredJSON(GenericException):
"""Raised when structured JSON format is invalid"""
class InvalidHeadersFormat(GenericException):
"""Raised when HTTP headers format is invalid"""
class DataMarshallerError(GenericException):
"""Raised when data marshalling fails"""
class DataUnmarshallerError(GenericException):
"""Raised when data unmarshalling fails"""
class IncompatibleArgumentsError(GenericException):
"""Raised when incompatible function arguments are provided"""
class PydanticFeatureNotInstalled(GenericException):
"""Raised when Pydantic feature is used but not installed"""