
tessl/pypi-cloudevents

CloudEvents Python SDK for creating, sending, and receiving CloudEvents over HTTP in both binary and structured content modes


Kafka Integration

Functions for converting CloudEvents to and from Apache Kafka messages in both binary and structured content modes, with a pluggable key mapper that controls the message key used for routing and partitioning.

Capabilities

KafkaMessage Type

Represents a Kafka message structure containing key, value, and headers used for CloudEvent transport over Kafka.

class KafkaMessage(NamedTuple):
    """
    Represents the elements of a message sent or received through the Kafka protocol.
    Callers can map their client-specific message representation to and from this
    type in order to use the cloudevents.kafka conversion functions.
    """
    headers: Dict[str, bytes]  # The dictionary of message headers key/values
    key: Optional[Union[str, bytes]]  # The message key
    value: Union[str, bytes]  # The message value
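
As a minimal sketch of the mapping the docstring describes, the snippet below converts a client-style record, whose headers arrive as a list of `(name, value)` tuples, into the headers/key/value shape and back. The `KafkaMessage` here is a local stand-in mirroring the fields above so the snippet runs without the SDK installed. `ClientRecord`, `to_kafka_message`, and `to_client_headers` are hypothetical names, and the tuple-list header layout is an assumption about the client library in use.

```python
from typing import Dict, List, NamedTuple, Optional, Tuple, Union


class KafkaMessage(NamedTuple):
    # Local stand-in with the same fields as the KafkaMessage type documented above.
    headers: Dict[str, bytes]
    key: Optional[Union[str, bytes]]
    value: Union[str, bytes]


class ClientRecord(NamedTuple):
    # Hypothetical client-side record; headers modeled as (name, value) tuples.
    key: Optional[bytes]
    value: bytes
    headers: List[Tuple[str, bytes]]


def to_kafka_message(record: ClientRecord) -> KafkaMessage:
    """Collapse the header tuples into a dict (the last duplicate name wins)."""
    return KafkaMessage(headers=dict(record.headers), key=record.key, value=record.value)


def to_client_headers(message: KafkaMessage) -> List[Tuple[str, bytes]]:
    """Expand the header dict back into a tuple list for a producer call."""
    return list(message.headers.items())


record = ClientRecord(
    key=b"order-123",
    value=b'{"order_id": "12345"}',
    headers=[("content-type", b"application/json"), ("ce_type", b"com.example.orders.created")],
)
message = to_kafka_message(record)
print(message.headers["ce_type"])
```

The resulting `message` has the shape the conversion functions in this document expect as input, and `to_client_headers` covers the opposite direction when producing.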

KeyMapper Type

Type alias for a callable that derives the Kafka message key from a CloudEvent. The key is what Kafka uses for routing and partitioning.

KeyMapper = Callable[[AnyCloudEvent], AnyStr]
"""
A callable function that creates a Kafka message key, given a CloudEvent instance.

The function takes a CloudEvent and returns a string or bytes value to be used
as the Kafka message key for partitioning.
"""

DEFAULT_KEY_MAPPER: KeyMapper = lambda event: event.get("partitionkey")
"""
The default KeyMapper which maps the user provided `partitionkey` attribute value
to the `key` of the Kafka message as-is, if present.
"""

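A key mapper only needs to accept an event and return a `str` or `bytes` key. The sketch below builds a composite key from `source` and `type`. `source_type_key` is a hypothetical name, and a plain `dict` stands in for the CloudEvent because the mapper relies only on `.get()`, the same accessor the default mapper uses.

```python
from typing import Any, Mapping, Optional


def source_type_key(event: Mapping[str, Any]) -> Optional[str]:
    """Join source and type so events of one kind from one producer share a key."""
    source = event.get("source")
    event_type = event.get("type")
    if source is None or event_type is None:
        return None  # leave the message key unset
    return f"{source}|{event_type}"


# A dict stands in for a CloudEvent here; only .get() is used.
sample = {"source": "https://example.com/orders", "type": "com.example.orders.created"}
print(source_type_key(sample))
```

A function like this could be passed as the `key_mapper` argument of the conversion functions described below.
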
Binary Format Conversion

Converts CloudEvents to and from Kafka binary format, where CloudEvent attributes are stored in message headers and data in the message value.

def to_binary(event: CloudEvent, 
              key_mapper: Optional[KeyMapper] = None,
              data_marshaller: Optional[MarshallerType] = None) -> KafkaMessage:
    """
    Converts CloudEvent to binary Kafka message format.
    
    In binary format, CloudEvent attributes are stored as message headers
    with the 'ce_' prefix (as defined by the CloudEvents Kafka protocol
    binding), and event data becomes the message value.
    
    Args:
        event: CloudEvent to convert
        key_mapper: Optional mapper for generating message key from event
        data_marshaller: Optional function to serialize event data
        
    Returns:
        KafkaMessage in binary format
        
    Raises:
        DataMarshallerError: If data marshalling fails
    """

def from_binary(message: KafkaMessage, 
                data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent:
    """
    Creates CloudEvent from binary Kafka message format.
    
    Extracts CloudEvent attributes from message headers ('ce_' prefixed)
    and uses message value as event data.
    
    Args:
        message: KafkaMessage in binary format
        data_unmarshaller: Optional function to deserialize event data
        
    Returns:
        CloudEvent created from Kafka message
        
    Raises:
        MissingRequiredFields: If required CloudEvent attributes are missing
        DataUnmarshallerError: If data unmarshalling fails
    """

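To make the binary layout concrete, here is a minimal illustration in plain Python of how attributes and data are split between headers and value. It is not the SDK's implementation: `sketch_binary_layout` is a hypothetical helper, the `ce_` prefix is taken from the CloudEvents Kafka protocol binding, and both the `content-type` header for `datacontenttype` and the JSON encoding of the data are assumptions of this sketch.

```python
import json
from typing import Any, Dict, Tuple

CE_PREFIX = "ce_"  # header prefix from the CloudEvents Kafka protocol binding


def sketch_binary_layout(attributes: Dict[str, str], data: Any) -> Tuple[Dict[str, bytes], bytes]:
    """Illustrative only: attributes go to headers, data goes to the value."""
    headers: Dict[str, bytes] = {}
    for name, value in attributes.items():
        if name == "datacontenttype":
            # Assumption: the content type travels as a plain content-type header.
            headers["content-type"] = value.encode("utf-8")
        else:
            headers[CE_PREFIX + name] = value.encode("utf-8")
    return headers, json.dumps(data).encode("utf-8")


headers, value = sketch_binary_layout(
    {
        "id": "order-123",
        "type": "com.example.orders.created",
        "datacontenttype": "application/json",
    },
    {"order_id": "12345"},
)
print(sorted(headers))
print(value)
```

The point of the layout is that a consumer can filter or route on headers alone without parsing the value.
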
Usage Example

from cloudevents.kafka import to_binary, from_binary, KeyMapper
from cloudevents.http import CloudEvent

# Create a CloudEvent
event = CloudEvent({
    "type": "com.example.orders.created",
    "source": "https://example.com/orders", 
    "id": "order-123"
}, {"order_id": "12345", "amount": 99.99})

# Convert to binary Kafka message
kafka_msg = to_binary(event)
print(f"Headers: {kafka_msg.headers}")  # Contains ce_type, ce_source, etc.
print(f"Value: {kafka_msg.value}")      # Contains serialized data

# Convert back to CloudEvent
restored_event = from_binary(kafka_msg)
print(f"Event type: {restored_event['type']}")

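# Using a custom key mapper for partitioning
def extract_customer_id(event):
    # Returns None when the data has no "customer_id" (as with the event above),
    # in which case kafka_msg.key is None.
    data = event.get_data()
    return data.get("customer_id") if isinstance(data, dict) else None

kafka_msg = to_binary(event, key_mapper=extract_customer_id)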

Structured Format Conversion

Converts CloudEvents to and from Kafka structured format, where the entire CloudEvent (attributes and data) is stored as JSON in the message value.

def to_structured(event: CloudEvent,
                  key_mapper: Optional[KeyMapper] = None, 
                  data_marshaller: Optional[MarshallerType] = None) -> KafkaMessage:
    """
    Converts CloudEvent to structured Kafka message format.
    
    In structured format, the entire CloudEvent (attributes and data) 
    is serialized as JSON and stored in the message value.
    
    Args:
        event: CloudEvent to convert
        key_mapper: Optional mapper for generating message key from event  
        data_marshaller: Optional function to serialize event data
        
    Returns:
        KafkaMessage in structured format
        
    Raises:
        DataMarshallerError: If data marshalling fails
    """

def from_structured(message: KafkaMessage,
                    data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent:
    """
    Creates CloudEvent from structured Kafka message format.
    
    Deserializes the entire CloudEvent from JSON stored in message value.
    
    Args:  
        message: KafkaMessage in structured format
        data_unmarshaller: Optional function to deserialize event data
        
    Returns:
        CloudEvent created from Kafka message
        
    Raises:
        InvalidStructuredJSON: If JSON format is invalid
        MissingRequiredFields: If required CloudEvent attributes are missing
        DataUnmarshallerError: If data unmarshalling fails
    """

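Because a structured message carries the whole event as one JSON document, the value can be inspected with nothing more than a JSON parser. The sketch below peeks at the `type` of a structured message without building a CloudEvent. `peek_event_type` is a hypothetical helper, and the sample value is hand-written to resemble a JSON-encoded event rather than captured from the SDK.

```python
import json
from typing import Optional, Union


def peek_event_type(value: Union[str, bytes]) -> Optional[str]:
    """Return the 'type' attribute of a structured-mode value, or None if unreadable."""
    try:
        envelope = json.loads(value)
    except ValueError:
        return None  # not valid JSON (also covers undecodable bytes)
    if not isinstance(envelope, dict):
        return None  # valid JSON, but not an event envelope
    return envelope.get("type")


sample_value = json.dumps({
    "specversion": "1.0",
    "id": "order-123",
    "source": "https://example.com/orders",
    "type": "com.example.orders.created",
    "data": {"order_id": "12345"},
}).encode("utf-8")

print(peek_event_type(sample_value))
print(peek_event_type(b"not json"))
```

A check like this can be handy for cheap routing or logging before committing to full deserialization with `from_structured`.
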
Usage Example

from cloudevents.kafka import to_structured, from_structured
from cloudevents.http import CloudEvent

# Create a CloudEvent
event = CloudEvent({
    "type": "com.example.orders.created",
    "source": "https://example.com/orders",
    "id": "order-123", 
    "datacontenttype": "application/json"
}, {"order_id": "12345", "amount": 99.99})

# Convert to structured Kafka message
kafka_msg = to_structured(event)
print(f"Value: {kafka_msg.value}")  # Contains complete CloudEvent as JSON

# Convert back to CloudEvent  
restored_event = from_structured(kafka_msg)
print(f"Event type: {restored_event['type']}")
print(f"Event data: {restored_event.get_data()}")

Advanced Usage

Custom Key Mapping Strategies

# Route by event type
type_mapper = lambda event: event.get('type')

# Route by source
source_mapper = lambda event: event.get('source')

# Route by custom data attribute
def extract_tenant_id(event):
    data = event.get_data()
    if isinstance(data, dict):
        return data.get("tenant_id")
    return None

tenant_mapper = extract_tenant_id
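
Strategies like these can also be combined. The following sketch chains several mappers and returns the first key that is not `None`. `first_available` is a hypothetical helper, not part of the SDK, and plain dicts stand in for CloudEvents because the sample mappers only call `.get()`.

```python
from typing import Any, Callable, Optional, Union

Key = Optional[Union[str, bytes]]
Mapper = Callable[[Any], Key]


def first_available(*mappers: Mapper) -> Mapper:
    """Build a mapper that returns the first non-None key from the given mappers."""

    def mapper(event: Any) -> Key:
        for candidate in mappers:
            key = candidate(event)
            if key is not None:
                return key
        return None  # no mapper produced a key

    return mapper


def by_partitionkey(event: Any) -> Key:
    return event.get("partitionkey")


def by_source(event: Any) -> Key:
    return event.get("source")


# Prefer an explicit partitionkey, then fall back to the event source.
fallback_mapper = first_available(by_partitionkey, by_source)

print(fallback_mapper({"source": "https://example.com/orders"}))
print(fallback_mapper({"partitionkey": "tenant-7", "source": "https://example.com/orders"}))
```

The combined mapper has the same call shape as any single mapper, so it can be supplied wherever a `key_mapper` is accepted.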

Error Handling

from cloudevents.kafka import from_binary
from cloudevents.exceptions import MissingRequiredFields, DataUnmarshallerError

# kafka_message is a KafkaMessage built from a consumed record (not shown here)
try:
    event = from_binary(kafka_message)
except MissingRequiredFields as e:
    print(f"Invalid CloudEvent: {e}")
except DataUnmarshallerError as e:
    print(f"Failed to deserialize data: {e}")
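
In a consumer loop it is often preferable to set a bad message aside rather than stop processing. The sketch below shows that pattern with a generic `convert` callable standing in for `from_binary` or `from_structured`. `convert_all`, the `errors` parameter, and `fake_convert` are all hypothetical; in real use you would pass the SDK exception classes shown above instead of `ValueError`.

```python
from typing import Any, Callable, Iterable, List, Tuple, Type


def convert_all(
    messages: Iterable[Any],
    convert: Callable[[Any], Any],
    errors: Tuple[Type[Exception], ...],
) -> Tuple[List[Any], List[Tuple[Any, Exception]]]:
    """Convert each message, collecting failures instead of raising."""
    events: List[Any] = []
    rejected: List[Tuple[Any, Exception]] = []
    for message in messages:
        try:
            events.append(convert(message))
        except errors as exc:
            # Keep the original message with its error, e.g. for a dead-letter topic.
            rejected.append((message, exc))
    return events, rejected


def fake_convert(message: bytes) -> str:
    # Stand-in converter: rejects empty payloads the way a real converter
    # might reject a message with missing attributes.
    if not message:
        raise ValueError("empty payload")
    return message.decode("utf-8")


events, rejected = convert_all([b"a", b"", b"b"], fake_convert, (ValueError,))
print(events)
print(len(rejected))
```

Only the listed exception types are caught, so unexpected errors still surface instead of being silently swallowed.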

Types

# Type aliases used in Kafka integration
MarshallerType = Callable[[Any], AnyStr]
UnmarshallerType = Callable[[AnyStr], Any]

# KeyMapper type alias 
KeyMapper = Callable[[AnyCloudEvent], AnyStr]

Install with Tessl CLI

npx tessl i tessl/pypi-cloudevents
