0
# CloudEvents Python SDK
1
2
A comprehensive Python SDK for CloudEvents, a specification for describing event data in a common way. The library enables developers to create, send, and receive CloudEvents over HTTP in both binary and structured content modes, with support for CloudEvents v1.0 and v0.3 specifications.
3
4
## Package Information
5
6
- **Package Name**: cloudevents
7
- **Language**: Python
8
- **Installation**: `pip install cloudevents`
9
- **Optional Features**: `pip install cloudevents[pydantic]` for Pydantic validation support
10
11
## Core Imports
12
13
Primary HTTP functionality:
14
15
```python
16
from cloudevents.http import CloudEvent, from_json, from_http, from_dict
17
```
18
19
Abstract base classes:
20
21
```python
22
from cloudevents.abstract import CloudEvent, AnyCloudEvent
23
```
24
25
Kafka integration:
26
27
```python
28
from cloudevents.kafka import to_binary, from_binary, to_structured, from_structured, KafkaMessage
29
```
30
31
Pydantic integration (requires pydantic extra):
32
33
```python
34
from cloudevents.pydantic import CloudEvent, from_json, from_http, from_dict
35
```
36
37
## Basic Usage
38
39
```python
40
from cloudevents.http import CloudEvent, from_json
41
import json
42
43
# Create a CloudEvent
44
attributes = {
45
"type": "com.example.string",
46
"source": "https://example.com/source",
47
"id": "my-event-id",
48
"specversion": "1.0",
49
"datacontenttype": "application/json"
50
}
51
52
data = {"message": "Hello CloudEvents!"}
53
event = CloudEvent(attributes, data)
54
55
# Convert to JSON
56
from cloudevents.http.json_methods import to_json
57
json_event = to_json(event)
58
print(json_event)
59
60
# Parse from JSON
61
parsed_event = from_json(json_event)
62
print(f"Event type: {parsed_event['type']}")
63
print(f"Event data: {parsed_event.get_data()}")
64
```
65
66
## Architecture
67
68
The CloudEvents Python SDK is organized around several key components:
69
70
- **Abstract Base Classes**: Define the common CloudEvent interface that all implementations must follow
71
- **HTTP Module**: Primary CloudEvent implementation with HTTP transport support
72
- **Transport Modules**: Specialized support for different message brokers (Kafka)
73
- **Pydantic Integration**: Optional typed validation using Pydantic models
74
- **Legacy SDK**: Core functionality for backward compatibility
75
76
This modular design allows developers to choose the appropriate level of abstraction and features for their use case, from simple HTTP events to fully-typed Pydantic models with validation.
77
78
## Capabilities
79
80
### HTTP CloudEvent Operations
81
82
Core CloudEvent creation, parsing, and conversion functionality for HTTP transport. Supports both binary and structured content modes with automatic format detection.
83
84
```python { .api }
85
class CloudEvent:
86
def __init__(self, attributes: Mapping[str, str], data: Any = None): ...
87
@classmethod
88
def create(cls, attributes: Mapping[str, Any], data: Optional[Any]) -> CloudEvent: ...
89
90
def from_json(data: Union[str, bytes], data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent: ...
91
def from_http(headers: Union[Mapping[str, str], SupportsDuplicateItems[str, str]],
92
data: Optional[Union[str, bytes]],
93
data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent: ...
94
def from_dict(data: Mapping[str, Any], data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent: ...
95
```
96
97
[HTTP Operations](./http-operations.md)
98
99
### Kafka Integration
100
101
CloudEvent conversion functions for Kafka message broker integration, supporting both binary and structured formats with key mapping capabilities.
102
103
```python { .api }
104
class KafkaMessage(NamedTuple):
105
headers: Dict[str, bytes]
106
key: Optional[Union[str, bytes]]
107
value: Union[str, bytes]
108
109
def to_binary(event: CloudEvent, key_mapper: Optional[KeyMapper] = None,
110
data_marshaller: Optional[MarshallerType] = None) -> KafkaMessage: ...
111
def from_binary(message: KafkaMessage, data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent: ...
112
def to_structured(event: CloudEvent, key_mapper: Optional[KeyMapper] = None,
113
data_marshaller: Optional[MarshallerType] = None) -> KafkaMessage: ...
114
def from_structured(message: KafkaMessage, data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent: ...
115
```
116
117
[Kafka Integration](./kafka-integration.md)
118
119
### Pydantic Validation
120
121
Optional Pydantic-based CloudEvent implementation providing runtime validation, type safety, and IDE support with automatic serialization/deserialization.
122
123
```python { .api }
124
class CloudEvent(BaseModel):
125
# Pydantic model with validation
126
pass
127
128
def from_json(data: Union[str, bytes], data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent: ...
129
def from_http(headers: Union[Mapping[str, str], SupportsDuplicateItems[str, str]],
130
data: Optional[Union[str, bytes]],
131
data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent: ...
132
def from_dict(data: Mapping[str, Any], data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent: ...
133
```
134
135
[Pydantic Validation](./pydantic-validation.md)
136
137
### Legacy HTTP Functions
138
139
Deprecated conversion functions maintained for backward compatibility. These functions provide binary and structured format conversion but are superseded by newer APIs.
140
141
```python { .api }
142
def to_binary(event: CloudEvent, data_marshaller: Optional[MarshallerType] = None) -> Tuple[Dict[str, str], bytes]: ...
143
def to_structured(event: CloudEvent, data_marshaller: Optional[MarshallerType] = None) -> bytes: ...
144
def to_json(event: CloudEvent, data_marshaller: Optional[MarshallerType] = None) -> str: ...
145
def is_binary(headers: Dict[str, str]) -> bool: ...
146
def is_structured(headers: Dict[str, str]) -> bool: ...
147
```
148
149
[Legacy Functions](./legacy-functions.md)
150
151
## Common Types
152
153
```python { .api }
154
# Type aliases for marshalling/unmarshalling functions
155
MarshallerType = Callable[[Any], AnyStr]
156
UnmarshallerType = Callable[[AnyStr], Any]
157
158
# Protocol for headers that may have duplicate items
159
class SupportsDuplicateItems(Protocol[_K_co, _V_co]):
160
def items(self) -> Iterable[Tuple[_K_co, _V_co]]: ...
161
162
# Generic CloudEvent type variable
163
AnyCloudEvent = TypeVar("AnyCloudEvent", bound=CloudEvent)
164
```
165
166
## Error Handling
167
168
The library defines several exception types for different error conditions:
169
170
```python { .api }
171
class GenericException(Exception):
172
"""Base exception for the CloudEvents library"""
173
174
class MissingRequiredFields(GenericException):
175
"""Raised when required CloudEvent fields are missing"""
176
177
class InvalidRequiredFields(GenericException):
178
"""Raised when required CloudEvent fields are invalid"""
179
180
class InvalidStructuredJSON(GenericException):
181
"""Raised when structured JSON format is invalid"""
182
183
class InvalidHeadersFormat(GenericException):
184
"""Raised when HTTP headers format is invalid"""
185
186
class DataMarshallerError(GenericException):
187
"""Raised when data marshalling fails"""
188
189
class DataUnmarshallerError(GenericException):
190
"""Raised when data unmarshalling fails"""
191
192
class IncompatibleArgumentsError(GenericException):
193
"""Raised when incompatible function arguments are provided"""
194
195
class PydanticFeatureNotInstalled(GenericException):
196
"""Raised when Pydantic feature is used but not installed"""
197
```