# Kafka Integration

CloudEvent conversion functions for the Apache Kafka message broker. They support both the binary and structured content modes, and a key mapper controls the Kafka message key used for event routing and partitioning.
## Capabilities
### KafkaMessage Type

Represents the Kafka message structure (headers, key, and value) used to transport a CloudEvent over Kafka.

```python { .api }
class KafkaMessage(NamedTuple):
    """
    Represents the elements of a message sent or received through the Kafka protocol.
    Callers can map their client-specific message representation to and from this
    type in order to use the cloudevents.kafka conversion functions.
    """
    headers: Dict[str, bytes]         # The dictionary of message header keys/values
    key: Optional[Union[str, bytes]]  # The message key
    value: Union[str, bytes]          # The message value
```
### KeyMapper Type

Type alias for a callable that derives the Kafka message key from a CloudEvent. With Kafka's default partitioner, messages with the same key go to the same partition, so the key mapper determines how events are partitioned and which events keep their relative order.

```python { .api }
KeyMapper = Callable[[AnyCloudEvent], AnyStr]
"""
A callable function that creates a Kafka message key, given a CloudEvent instance.

The function takes a CloudEvent and returns a string or bytes value to be used
as the Kafka message key for partitioning.
"""

DEFAULT_KEY_MAPPER: KeyMapper = lambda event: event.get("partitionkey")
"""
The default KeyMapper, which maps the user-provided `partitionkey` attribute value
to the `key` of the Kafka message as-is, if present.
"""
```
### Binary Format Conversion

Converts CloudEvents to and from the Kafka binary content mode, in which each CloudEvent attribute is carried in a `ce_`-prefixed message header, `datacontenttype` is carried in the `content-type` header, and the event data is the message value.

```python { .api }
def to_binary(event: CloudEvent,
              key_mapper: Optional[KeyMapper] = None,
              data_marshaller: Optional[MarshallerType] = None) -> KafkaMessage:
    """
    Converts a CloudEvent to a binary-mode Kafka message.

    In binary mode, CloudEvent attributes are stored as message headers
    with a 'ce_' prefix, and the event data becomes the message value.

    Args:
        event: CloudEvent to convert
        key_mapper: Optional mapper for generating the message key from the event
        data_marshaller: Optional function to serialize the event data

    Returns:
        KafkaMessage in binary format

    Raises:
        DataMarshallerError: If data marshalling fails
    """

def from_binary(message: KafkaMessage,
                data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent:
    """
    Creates a CloudEvent from a binary-mode Kafka message.

    Extracts CloudEvent attributes from the 'ce_'-prefixed message headers
    and uses the message value as the event data.

    Args:
        message: KafkaMessage in binary format
        data_unmarshaller: Optional function to deserialize the event data

    Returns:
        CloudEvent created from the Kafka message

    Raises:
        MissingRequiredFields: If required CloudEvent attributes are missing
        DataUnmarshallerError: If data unmarshalling fails
    """
```
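To make the header layout concrete, here is a stdlib-only sketch of how the Kafka protocol binding lays out an event in binary mode. It is an illustration, not the library's implementation: `binary_mode_parts` is a hypothetical helper, and it assumes string-valued attributes and JSON-serializable data (the real functions delegate data encoding to `data_marshaller`).

```python
import json
from typing import Any, Dict, Tuple


def binary_mode_parts(attributes: Dict[str, str], data: Any) -> Tuple[Dict[str, bytes], bytes]:
    """Illustrative only: split an event into binary-mode headers and value."""
    headers: Dict[str, bytes] = {}
    for name, value in attributes.items():
        if name == "datacontenttype":
            # The content type travels in the standard content-type header.
            headers["content-type"] = value.encode("utf-8")
        else:
            # Every other attribute becomes a ce_-prefixed header.
            headers[f"ce_{name}"] = value.encode("utf-8")
    # Assumption: the data is encoded as JSON.
    return headers, json.dumps(data).encode("utf-8")
```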
#### Usage Example

```python
from cloudevents.http import CloudEvent
from cloudevents.kafka import from_binary, to_binary

# Create a CloudEvent
event = CloudEvent({
    "type": "com.example.orders.created",
    "source": "https://example.com/orders",
    "id": "order-123",
    "datacontenttype": "application/json",  # sent as the Kafka content-type header
}, {"order_id": "12345", "customer_id": "c-42", "amount": 99.99})

# Convert to a binary-mode Kafka message
kafka_msg = to_binary(event)
print(f"Headers: {kafka_msg.headers}")  # ce_type, ce_source, ce_id, ..., content-type
print(f"Value: {kafka_msg.value}")      # The serialized event data

# Convert back to a CloudEvent
restored_event = from_binary(kafka_msg)
print(f"Event type: {restored_event['type']}")

# Use a key mapper to partition by customer
def extract_customer_id(event):
    data = event.get_data()
    return data.get("customer_id") if isinstance(data, dict) else None

kafka_msg = to_binary(event, key_mapper=extract_customer_id)
print(f"Key: {kafka_msg.key}")  # c-42
```
### Structured Format Conversion

Converts CloudEvents to and from the Kafka structured content mode, in which the entire CloudEvent (attributes and data) is serialized as JSON into the message value.

```python { .api }
def to_structured(event: CloudEvent,
                  key_mapper: Optional[KeyMapper] = None,
                  data_marshaller: Optional[MarshallerType] = None) -> KafkaMessage:
    """
    Converts a CloudEvent to a structured-mode Kafka message.

    In structured mode, the entire CloudEvent (attributes and data)
    is serialized as JSON and stored in the message value.

    Args:
        event: CloudEvent to convert
        key_mapper: Optional mapper for generating the message key from the event
        data_marshaller: Optional function to serialize the event data

    Returns:
        KafkaMessage in structured format

    Raises:
        DataMarshallerError: If data marshalling fails
    """

def from_structured(message: KafkaMessage,
                    data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent:
    """
    Creates a CloudEvent from a structured-mode Kafka message.

    Deserializes the entire CloudEvent from the JSON stored in the message value.

    Args:
        message: KafkaMessage in structured format
        data_unmarshaller: Optional function to deserialize the event data

    Returns:
        CloudEvent created from the Kafka message

    Raises:
        InvalidStructuredJSON: If the JSON format is invalid
        MissingRequiredFields: If required CloudEvent attributes are missing
        DataUnmarshallerError: If data unmarshalling fails
    """
```
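A stdlib-only sketch of what a structured-mode value looks like under the CloudEvents JSON event format: the attributes become top-level JSON members, and the payload goes under `data`, or under `data_base64` when it is raw bytes. `structured_envelope` is a hypothetical helper for illustration, not the library's code, and it assumes JSON-serializable attribute values.

```python
import base64
import json
from typing import Any, Dict


def structured_envelope(attributes: Dict[str, str], data: Any) -> bytes:
    """Illustrative only: build a JSON event-format envelope for the message value."""
    envelope: Dict[str, Any] = dict(attributes)
    if isinstance(data, (bytes, bytearray)):
        # Raw bytes cannot be embedded in JSON directly, so they are base64-encoded.
        envelope["data_base64"] = base64.b64encode(data).decode("ascii")
    else:
        envelope["data"] = data
    return json.dumps(envelope).encode("utf-8")
```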
#### Usage Example

```python
from cloudevents.kafka import to_structured, from_structured
from cloudevents.http import CloudEvent

# Create a CloudEvent
event = CloudEvent({
    "type": "com.example.orders.created",
    "source": "https://example.com/orders",
    "id": "order-123",
    "datacontenttype": "application/json"
}, {"order_id": "12345", "amount": 99.99})

# Convert to structured Kafka message
kafka_msg = to_structured(event)
print(f"Value: {kafka_msg.value}")  # Contains complete CloudEvent as JSON

# Convert back to CloudEvent
restored_event = from_structured(kafka_msg)
print(f"Event type: {restored_event['type']}")
print(f"Event data: {restored_event.get_data()}")
```
## Advanced Usage

### Custom Key Mapping Strategies

Any callable that matches `KeyMapper` can be passed as the `key_mapper` argument of `to_binary` or `to_structured`.

```python
# Route by event type
type_mapper = lambda event: event.get("type")

# Route by source
source_mapper = lambda event: event.get("source")

# Route by a custom data attribute
def extract_tenant_id(event):
    data = event.get_data()
    if isinstance(data, dict):
        return data.get("tenant_id")
    return None

tenant_mapper = extract_tenant_id
```
### Error Handling

```python
from cloudevents.exceptions import DataUnmarshallerError, MissingRequiredFields
from cloudevents.kafka import KafkaMessage, from_binary

# A message received from Kafka; this one has no ce_* headers,
# so the required CloudEvent attributes (source, type) are missing
kafka_message = KafkaMessage(headers={}, key=None, value=b'{"order_id": "12345"}')

try:
    event = from_binary(kafka_message)
except MissingRequiredFields as e:
    print(f"Invalid CloudEvent: {e}")
except DataUnmarshallerError as e:
    print(f"Failed to deserialize data: {e}")
```
## Types

```python { .api }
# Type aliases used in the Kafka integration
MarshallerType = Callable[[Any], AnyStr]
UnmarshallerType = Callable[[AnyStr], Any]

# KeyMapper type alias
KeyMapper = Callable[[AnyCloudEvent], AnyStr]
```
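Any pair of functions with these shapes can serve as a `data_marshaller` / `data_unmarshaller`. A stdlib-only sketch of a compact JSON pair; the function names are hypothetical, and the aliases are redeclared locally so the snippet runs without the SDK.

```python
import json
from typing import Any, AnyStr, Callable

# Local copies of the aliases above, so this sketch is self-contained
MarshallerType = Callable[[Any], AnyStr]
UnmarshallerType = Callable[[AnyStr], Any]


def compact_json_marshaller(data: Any) -> bytes:
    # Whitespace-free JSON with sorted keys, encoded as UTF-8 bytes
    return json.dumps(data, separators=(",", ":"), sort_keys=True).encode("utf-8")


def json_unmarshaller(raw: AnyStr) -> Any:
    # json.loads accepts both str and UTF-8 bytes
    return json.loads(raw)


marshal: MarshallerType = compact_json_marshaller
unmarshal: UnmarshallerType = json_unmarshaller
```

Sorting keys makes the encoded value stable for equal inputs, which helps when messages are compared or deduplicated byte-for-byte.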