
tessl/pypi-cloudevents

CloudEvents Python SDK for creating, sending, and receiving CloudEvents over HTTP in both binary and structured content modes

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/cloudevents@1.12.x

To install, run

npx @tessl/cli install tessl/pypi-cloudevents@1.12.0

CloudEvents Python SDK

A comprehensive Python SDK for CloudEvents, a specification for describing event data in a common way. The library enables developers to create, send, and receive CloudEvents over HTTP in both binary and structured content modes, with support for CloudEvents v1.0 and v0.3 specifications.

Package Information

  • Package Name: cloudevents
  • Language: Python
  • Installation: pip install cloudevents
  • Optional Features: pip install cloudevents[pydantic] for Pydantic validation support

Core Imports

Primary HTTP functionality:

from cloudevents.http import CloudEvent, from_json, from_http, from_dict

Abstract base classes:

from cloudevents.abstract import CloudEvent, AnyCloudEvent

Kafka integration:

from cloudevents.kafka import to_binary, from_binary, to_structured, from_structured, KafkaMessage

Pydantic integration (requires pydantic extra):

from cloudevents.pydantic import CloudEvent, from_json, from_http, from_dict

Basic Usage

from cloudevents.conversion import to_json
from cloudevents.http import CloudEvent, from_json

# Create a CloudEvent
attributes = {
    "type": "com.example.string",
    "source": "https://example.com/source",
    "id": "my-event-id",
    "specversion": "1.0",
    "datacontenttype": "application/json"
}

data = {"message": "Hello CloudEvents!"}
event = CloudEvent(attributes, data)

# Serialize to a structured-mode JSON envelope (to_json returns bytes)
json_event = to_json(event)
print(json_event)

# Parse from JSON
parsed_event = from_json(json_event)
print(f"Event type: {parsed_event['type']}")
print(f"Event data: {parsed_event.get_data()}")

Architecture

The CloudEvents Python SDK is organized around several key components:

  • Abstract Base Classes: Define the common CloudEvent interface that all implementations must follow
  • HTTP Module: Primary CloudEvent implementation with HTTP transport support
  • Transport Modules: Specialized support for different message brokers (Kafka)
  • Pydantic Integration: Optional typed validation using Pydantic models
  • Legacy SDK: Core functionality for backward compatibility

This modular design allows developers to choose the appropriate level of abstraction and features for their use case, from simple HTTP events to fully-typed Pydantic models with validation.

Capabilities

HTTP CloudEvent Operations

Core CloudEvent creation, parsing, and conversion functionality for HTTP transport. Supports both binary and structured content modes with automatic format detection.

class CloudEvent:
    def __init__(self, attributes: Mapping[str, str], data: Any = None): ...
    @classmethod
    def create(cls, attributes: Mapping[str, Any], data: Optional[Any]) -> CloudEvent: ...

def from_json(data: Union[str, bytes], data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent: ...
def from_http(headers: Union[Mapping[str, str], SupportsDuplicateItems[str, str]], 
              data: Optional[Union[str, bytes]], 
              data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent: ...
def from_dict(data: Mapping[str, Any], data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent: ...

Details: docs/http-operations.md

Kafka Integration

CloudEvent conversion functions for Kafka message broker integration, supporting both binary and structured formats with key mapping capabilities.

class KafkaMessage(NamedTuple):
    headers: Dict[str, bytes]
    key: Optional[Union[str, bytes]]
    value: Union[str, bytes]

def to_binary(event: CloudEvent, key_mapper: Optional[KeyMapper] = None, 
              data_marshaller: Optional[MarshallerType] = None) -> KafkaMessage: ...
def from_binary(message: KafkaMessage, data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent: ...
def to_structured(event: CloudEvent, key_mapper: Optional[KeyMapper] = None,
                  data_marshaller: Optional[MarshallerType] = None) -> KafkaMessage: ...
def from_structured(message: KafkaMessage, data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent: ...

Details: docs/kafka-integration.md

Pydantic Validation

Optional Pydantic-based CloudEvent implementation providing runtime validation, type safety, and IDE support with automatic serialization/deserialization.

class CloudEvent(BaseModel):
    # Pydantic model with validation
    pass

def from_json(data: Union[str, bytes], data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent: ...
def from_http(headers: Union[Mapping[str, str], SupportsDuplicateItems[str, str]],
              data: Optional[Union[str, bytes]],
              data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent: ...
def from_dict(data: Mapping[str, Any], data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent: ...

Details: docs/pydantic-validation.md

Legacy HTTP Functions

Conversion functions kept under cloudevents.http for backward compatibility. They convert an event to the binary and structured HTTP formats; new code should prefer the equivalent functions in cloudevents.conversion.

def to_binary(event: CloudEvent, data_marshaller: Optional[MarshallerType] = None) -> Tuple[Dict[str, str], bytes]: ...
def to_structured(event: CloudEvent, data_marshaller: Optional[MarshallerType] = None) -> Tuple[Dict[str, str], bytes]: ...
def to_json(event: CloudEvent, data_marshaller: Optional[MarshallerType] = None) -> bytes: ...
def is_binary(headers: Dict[str, str]) -> bool: ...
def is_structured(headers: Dict[str, str]) -> bool: ...

Details: docs/legacy-functions.md

Common Types

# Type aliases for marshalling/unmarshalling functions
MarshallerType = Callable[[Any], AnyStr]
UnmarshallerType = Callable[[AnyStr], Any]

# Protocol for headers that may have duplicate items
class SupportsDuplicateItems(Protocol[_K_co, _V_co]):
    def items(self) -> Iterable[Tuple[_K_co, _V_co]]: ...

# Generic CloudEvent type variable  
AnyCloudEvent = TypeVar("AnyCloudEvent", bound=CloudEvent)
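To show what satisfies these aliases, the sketch below defines a marshaller, an unmarshaller, and a header container using only the standard library. The aliases are re-declared locally so the block runs without the SDK; `compact_json_marshaller`, `json_unmarshaller`, and `MultiHeaders` are hypothetical names chosen for illustration.

```python
import json
from typing import Any, AnyStr, Callable, Iterable, List, Tuple

# Local mirrors of the aliases listed above, so the sketch runs without the SDK.
MarshallerType = Callable[[Any], AnyStr]
UnmarshallerType = Callable[[AnyStr], Any]


def compact_json_marshaller(data: Any) -> str:
    """Serialise data as JSON with sorted keys and no extra whitespace."""
    return json.dumps(data, separators=(",", ":"), sort_keys=True)


def json_unmarshaller(raw: AnyStr) -> Any:
    """Parse a JSON document given as str or bytes."""
    return json.loads(raw)


class MultiHeaders:
    """Hypothetical header container whose items() may repeat a key."""

    def __init__(self, pairs: List[Tuple[str, str]]) -> None:
        self._pairs = pairs

    def items(self) -> Iterable[Tuple[str, str]]:
        return iter(self._pairs)


original = {"b": 1, "a": [1, 2]}
wire = compact_json_marshaller(original)
round_tripped = json_unmarshaller(wire)

headers = MultiHeaders([("ce-id", "1"), ("accept", "a"), ("accept", "b")])
header_pairs = list(headers.items())
```

Callables of this shape are what the `data_marshaller` and `data_unmarshaller` parameters in the signatures above expect, and an object like `MultiHeaders` matches the `SupportsDuplicateItems` protocol structurally.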

Error Handling

The library defines several exception types for different error conditions:

class GenericException(Exception): 
    """Base exception for the CloudEvents library"""

class MissingRequiredFields(GenericException):
    """Raised when required CloudEvent fields are missing"""

class InvalidRequiredFields(GenericException): 
    """Raised when required CloudEvent fields are invalid"""

class InvalidStructuredJSON(GenericException):
    """Raised when structured JSON format is invalid"""

class InvalidHeadersFormat(GenericException):
    """Raised when HTTP headers format is invalid""" 

class DataMarshallerError(GenericException):
    """Raised when data marshalling fails"""

class DataUnmarshallerError(GenericException):
    """Raised when data unmarshalling fails"""

class IncompatibleArgumentsError(GenericException):
    """Raised when incompatible function arguments are provided"""

class PydanticFeatureNotInstalled(GenericException):
    """Raised when Pydantic feature is used but not installed"""