or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

http-operations.mdindex.mdkafka-integration.mdlegacy-functions.mdpydantic-validation.md

kafka-integration.mddocs/

0

# Kafka Integration

1

2

CloudEvent conversion functions for Apache Kafka message broker integration, supporting both binary and structured formats with flexible key mapping capabilities for event routing and partitioning.

3

4

## Capabilities

5

6

### KafkaMessage Type

7

8

Represents a Kafka message structure containing key, value, and headers used for CloudEvent transport over Kafka.

9

10

```python { .api }

11

class KafkaMessage(NamedTuple):

12

"""

13

Represents the elements of a message sent or received through the Kafka protocol.

14

Callers can map their client-specific message representation to and from this

15

type in order to use the cloudevents.kafka conversion functions.

16

"""

17

headers: Dict[str, bytes] # The dictionary of message headers key/values

18

key: Optional[Union[str, bytes]] # The message key

19

value: Union[str, bytes] # The message value

20

```

21

22

### KeyMapper Type

23

24

Type alias for callable functions that map CloudEvent attributes to Kafka message keys for event routing and partitioning strategies.

25

26

```python { .api }

27

KeyMapper = Callable[[AnyCloudEvent], AnyStr]

28

"""

29

A callable function that creates a Kafka message key, given a CloudEvent instance.

30

31

The function takes a CloudEvent and returns a string or bytes value to be used

32

as the Kafka message key for partitioning.

33

"""

34

35

DEFAULT_KEY_MAPPER: KeyMapper = lambda event: event.get("partitionkey")

36

"""

37

The default KeyMapper which maps the user provided `partitionkey` attribute value

38

to the `key` of the Kafka message as-is, if present.

39

"""

40

```

41

42

### Binary Format Conversion

43

44

Converts CloudEvents to and from Kafka binary format, where CloudEvent attributes are stored in message headers and data in the message value.

45

46

```python { .api }

47

def to_binary(event: CloudEvent,

48

key_mapper: Optional[KeyMapper] = None,

49

data_marshaller: Optional[MarshallerType] = None) -> KafkaMessage:

50

"""

51

Converts CloudEvent to binary Kafka message format.

52

53

In binary format, CloudEvent attributes are stored as message headers

54

with 'ce-' prefix, and event data becomes the message value.

55

56

Args:

57

event: CloudEvent to convert

58

key_mapper: Optional mapper for generating message key from event

59

data_marshaller: Optional function to serialize event data

60

61

Returns:

62

KafkaMessage in binary format

63

64

Raises:

65

DataMarshallerError: If data marshalling fails

66

"""

67

68

def from_binary(message: KafkaMessage,

69

data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent:

70

"""

71

Creates CloudEvent from binary Kafka message format.

72

73

Extracts CloudEvent attributes from message headers (ce- prefixed)

74

and uses message value as event data.

75

76

Args:

77

message: KafkaMessage in binary format

78

data_unmarshaller: Optional function to deserialize event data

79

80

Returns:

81

CloudEvent created from Kafka message

82

83

Raises:

84

MissingRequiredFields: If required CloudEvent attributes are missing

85

DataUnmarshallerError: If data unmarshalling fails

86

"""

87

```

88

89

#### Usage Example

90

91

```python

92

from cloudevents.kafka import to_binary, from_binary, KeyMapper

93

from cloudevents.http import CloudEvent

94

95

# Create a CloudEvent

96

event = CloudEvent({

97

"type": "com.example.orders.created",

98

"source": "https://example.com/orders",

99

"id": "order-123"

100

}, {"order_id": "12345", "amount": 99.99})

101

102

# Convert to binary Kafka message

103

kafka_msg = to_binary(event)

104

print(f"Headers: {kafka_msg.headers}") # Contains ce-type, ce-source, etc.

105

print(f"Value: {kafka_msg.value}") # Contains serialized data

106

107

# Convert back to CloudEvent

108

restored_event = from_binary(kafka_msg)

109

print(f"Event type: {restored_event['type']}")

110

111

# Using KeyMapper for partitioning

112

def extract_customer_id(event):

113

data = event.get_data()

114

return data.get("customer_id") if isinstance(data, dict) else None

115

116

kafka_msg = to_binary(event, key_mapper=extract_customer_id)

117

```

118

119

### Structured Format Conversion

120

121

Converts CloudEvents to and from Kafka structured format, where the entire CloudEvent (attributes and data) is stored as JSON in the message value.

122

123

```python { .api }

124

def to_structured(event: CloudEvent,

125

key_mapper: Optional[KeyMapper] = None,

126

data_marshaller: Optional[MarshallerType] = None) -> KafkaMessage:

127

"""

128

Converts CloudEvent to structured Kafka message format.

129

130

In structured format, the entire CloudEvent (attributes and data)

131

is serialized as JSON and stored in the message value.

132

133

Args:

134

event: CloudEvent to convert

135

key_mapper: Optional mapper for generating message key from event

136

data_marshaller: Optional function to serialize event data

137

138

Returns:

139

KafkaMessage in structured format

140

141

Raises:

142

DataMarshallerError: If data marshalling fails

143

"""

144

145

def from_structured(message: KafkaMessage,

146

data_unmarshaller: Optional[UnmarshallerType] = None) -> CloudEvent:

147

"""

148

Creates CloudEvent from structured Kafka message format.

149

150

Deserializes the entire CloudEvent from JSON stored in message value.

151

152

Args:

153

message: KafkaMessage in structured format

154

data_unmarshaller: Optional function to deserialize event data

155

156

Returns:

157

CloudEvent created from Kafka message

158

159

Raises:

160

InvalidStructuredJSON: If JSON format is invalid

161

MissingRequiredFields: If required CloudEvent attributes are missing

162

DataUnmarshallerError: If data unmarshalling fails

163

"""

164

```

165

166

#### Usage Example

167

168

```python

169

from cloudevents.kafka import to_structured, from_structured

170

from cloudevents.http import CloudEvent

171

172

# Create a CloudEvent

173

event = CloudEvent({

174

"type": "com.example.orders.created",

175

"source": "https://example.com/orders",

176

"id": "order-123",

177

"datacontenttype": "application/json"

178

}, {"order_id": "12345", "amount": 99.99})

179

180

# Convert to structured Kafka message

181

kafka_msg = to_structured(event)

182

print(f"Value: {kafka_msg.value}") # Contains complete CloudEvent as JSON

183

184

# Convert back to CloudEvent

185

restored_event = from_structured(kafka_msg)

186

print(f"Event type: {restored_event['type']}")

187

print(f"Event data: {restored_event.get_data()}")

188

```

189

190

## Advanced Usage

191

192

### Custom Key Mapping Strategies

193

194

```python

195

# Route by event type

196

type_mapper = lambda event: event.get('type')

197

198

# Route by source

199

source_mapper = lambda event: event.get('source')

200

201

# Route by custom data attribute

202

def extract_tenant_id(event):

203

data = event.get_data()

204

if isinstance(data, dict):

205

return data.get("tenant_id")

206

return None

207

208

tenant_mapper = extract_tenant_id

209

```

210

211

### Error Handling

212

213

```python

214

from cloudevents.kafka import from_binary

215

from cloudevents.exceptions import MissingRequiredFields, DataUnmarshallerError

216

217

try:

218

event = from_binary(kafka_message)

219

except MissingRequiredFields as e:

220

print(f"Invalid CloudEvent: {e}")

221

except DataUnmarshallerError as e:

222

print(f"Failed to deserialize data: {e}")

223

```

224

225

## Types

226

227

```python { .api }

228

# Type aliases used in Kafka integration

229

MarshallerType = Callable[[Any], AnyStr]

230

UnmarshallerType = Callable[[AnyStr], Any]

231

232

# KeyMapper type alias

233

KeyMapper = Callable[[AnyCloudEvent], AnyStr]

234

```