# Data Structures

Core data structures for representing Kafka concepts including topic partitions, offsets, broker metadata, and consumer group information.

## Imports

```python
from collections import namedtuple
```

## Capabilities

### Topic and Partition Identifiers

Data structures for identifying topics and partitions within a Kafka cluster.

```python { .api }
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
"""
A topic and partition tuple.

Args:
    topic (str): Topic name
    partition (int): Partition number

Usage:
    tp = TopicPartition('my-topic', 0)
    print(tp.topic)      # 'my-topic'
    print(tp.partition)  # 0
"""
```
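
Because `TopicPartition` is a plain `namedtuple`, it also supports tuple unpacking and `_replace`. A standalone sketch (the struct is mirrored inline here so the snippet runs without kafka-python installed; real code imports it from `kafka.structs`):

```python
from collections import namedtuple

# Inline mirror of kafka.structs.TopicPartition for a standalone demo
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])

tp = TopicPartition("my-topic", 0)

# Unpacks like any tuple
topic, partition = tp
print(topic, partition)  # my-topic 0

# _replace derives a new identifier without mutating the original
tp_next = tp._replace(partition=1)
print(tp_next)  # TopicPartition(topic='my-topic', partition=1)
```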

### Offset and Metadata

Data structures for representing offsets with associated metadata.

```python { .api }
OffsetAndMetadata = namedtuple("OffsetAndMetadata", ["offset", "metadata"])
"""
Offset with commit metadata.

Args:
    offset (int): The offset to be committed
    metadata (str): Metadata string stored alongside the commit (may be empty, but not None)

Usage:
    oam = OffsetAndMetadata(100, "my-metadata")
    print(oam.offset)    # 100
    print(oam.metadata)  # "my-metadata"
"""

OffsetAndTimestamp = namedtuple("OffsetAndTimestamp", ["offset", "timestamp"])
"""
Offset with associated timestamp.

Args:
    offset (int): An offset
    timestamp (int): The timestamp associated with the offset, in milliseconds since the epoch

Usage:
    oat = OffsetAndTimestamp(100, 1640995200000)
    print(oat.offset)     # 100
    print(oat.timestamp)  # 1640995200000
"""
```
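
One detail worth keeping straight when building commit maps: by Kafka convention, the committed offset is the offset of the *next* record to consume, i.e. the last processed offset plus one. A standalone sketch, with the structs mirrored inline (real code imports them from `kafka.structs`):

```python
from collections import namedtuple

# Inline mirrors of the kafka.structs types for a standalone demo
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
OffsetAndMetadata = namedtuple("OffsetAndMetadata", ["offset", "metadata"])

tp = TopicPartition("my-topic", 0)
last_processed = 1499

# Commit the offset of the NEXT record to read, not the last one processed
to_commit = {tp: OffsetAndMetadata(last_processed + 1, "batch-123")}
print(to_commit[tp].offset)  # 1500
```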

### Broker and Cluster Metadata

Data structures for representing broker and cluster information.

```python { .api }
class BrokerMetadata:
    def __init__(self, nodeId: int, host: str, port: int, rack: str = None):
        """
        Kafka broker metadata.

        Args:
            nodeId (int): Broker node ID
            host (str): Broker hostname
            port (int): Broker port
            rack (str): Broker rack identifier (optional)
        """

    nodeId: int  # Broker node ID
    host: str    # Broker hostname
    port: int    # Broker port
    rack: str    # Broker rack (optional)

class PartitionMetadata:
    def __init__(self, topic: str, partition: int, leader: int, replicas: list, isr: list, error: int = None):
        """
        Partition metadata from cluster.

        Args:
            topic (str): Topic name
            partition (int): Partition number
            leader (int): Leader broker node ID
            replicas (list): List of replica broker node IDs
            isr (list): List of in-sync replica broker node IDs
            error (int): Error code (optional)
        """

    topic: str      # Topic name
    partition: int  # Partition number
    leader: int     # Leader broker node ID
    replicas: list  # Replica broker node IDs
    isr: list       # In-sync replica broker node IDs
    error: int      # Error code
```
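
The `replicas` and `isr` fields make it easy to spot under-replicated partitions: any assigned replica missing from the ISR is lagging. A standalone sketch, with `PartitionMetadata` approximated by an inline namedtuple:

```python
from collections import namedtuple

# Inline stand-in exposing the PartitionMetadata fields used below
PartitionMetadata = namedtuple(
    "PartitionMetadata",
    ["topic", "partition", "leader", "replicas", "isr", "error"],
)

pm = PartitionMetadata("my-topic", 0, leader=1, replicas=[1, 2, 3], isr=[1, 3], error=None)

# Replicas assigned to the partition but not currently in sync
lagging = set(pm.replicas) - set(pm.isr)
if lagging:
    print(f"{pm.topic}:{pm.partition} is under-replicated, lagging brokers: {sorted(lagging)}")
```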

### Consumer Group Information

Data structures for representing consumer group state and member information.

```python { .api }
class MemberInformation:
    def __init__(self, member_id: str, client_id: str, client_host: str, member_metadata: bytes, member_assignment: bytes):
        """
        Consumer group member information.

        Args:
            member_id (str): Member identifier
            client_id (str): Client identifier
            client_host (str): Client hostname
            member_metadata (bytes): Member metadata
            member_assignment (bytes): Member assignment data
        """

    member_id: str            # Member identifier
    client_id: str            # Client identifier
    client_host: str          # Client hostname
    member_metadata: bytes    # Member metadata
    member_assignment: bytes  # Member assignment

class GroupInformation:
    def __init__(self, error_code: int, group: str, state: str, protocol_type: str, protocol: str, members: list, authorized_operations: set = None):
        """
        Consumer group information.

        Args:
            error_code (int): Error code
            group (str): Group ID
            state (str): Group state
            protocol_type (str): Protocol type
            protocol (str): Protocol name
            members (list): List of MemberInformation objects
            authorized_operations (set): Authorized operations (optional)
        """

    error_code: int             # Error code
    group: str                  # Group ID
    state: str                  # Group state
    protocol_type: str          # Protocol type
    protocol: str               # Protocol name
    members: list               # List of members
    authorized_operations: set  # Authorized operations
```
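
A common use of these fields is summarizing group health: a group in the `Empty` state has no active members, and only empty groups can be deleted. A standalone sketch with the structs mirrored inline:

```python
from collections import namedtuple

# Inline stand-ins exposing the fields used below
MemberInformation = namedtuple(
    "MemberInformation",
    ["member_id", "client_id", "client_host", "member_metadata", "member_assignment"],
)
GroupInformation = namedtuple(
    "GroupInformation",
    ["error_code", "group", "state", "protocol_type", "protocol", "members", "authorized_operations"],
)

member = MemberInformation("member-1", "client-1", "/10.0.0.5", b"", b"")
group = GroupInformation(0, "my-group", "Stable", "consumer", "range", [member], None)

# A group is deletable only when it has no active members
deletable = group.state == "Empty" and not group.members
print(f"{group.group}: state={group.state}, members={len(group.members)}, deletable={deletable}")
```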

### Producer Configuration

Data structures for producer retry and configuration options.

```python { .api }
class RetryOptions:
    def __init__(self, limit: int, backoff_ms: int, retry_on_timeouts: bool = True):
        """
        Retry policy configuration for the async producer.

        Args:
            limit (int): Maximum retry attempts
            backoff_ms (int): Backoff time between retries in milliseconds
            retry_on_timeouts (bool): Whether to retry on timeout errors
        """

    limit: int               # Maximum retry attempts
    backoff_ms: int          # Backoff time in milliseconds
    retry_on_timeouts: bool  # Retry on timeouts
```
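
`RetryOptions` has no dedicated example in the Usage Examples section below, so here is a standalone sketch of how its fields combine into a retry budget (the struct is mirrored inline; the real one ships with kafka-python's legacy async producer):

```python
from collections import namedtuple

# Inline mirror of the RetryOptions struct for a standalone demo
RetryOptions = namedtuple("RetryOptions", ["limit", "backoff_ms", "retry_on_timeouts"])

opts = RetryOptions(limit=5, backoff_ms=200, retry_on_timeouts=True)

# Total back-off time if every one of the `limit` retries waits `backoff_ms`
worst_case_ms = opts.limit * opts.backoff_ms
print(worst_case_ms)  # 1000
```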

### Record Metadata and Timestamps

Data structures for record metadata and timing information.

```python { .api }
class RecordMetadata:
    def __init__(self, topic: str, partition: int, offset: int, timestamp: int = None, checksum: int = None, serialized_key_size: int = None, serialized_value_size: int = None):
        """
        Metadata for a produced record.

        Args:
            topic (str): Topic name
            partition (int): Partition number
            offset (int): Record offset
            timestamp (int): Record timestamp (optional)
            checksum (int): Record checksum (optional)
            serialized_key_size (int): Key size in bytes (optional)
            serialized_value_size (int): Value size in bytes (optional)
        """

    topic: str                  # Topic name
    partition: int              # Partition number
    offset: int                 # Record offset
    timestamp: int              # Record timestamp
    checksum: int               # Record checksum
    serialized_key_size: int    # Key size in bytes
    serialized_value_size: int  # Value size in bytes

class ConsumerRecord:
    def __init__(self, topic: str, partition: int, offset: int, timestamp: int, timestamp_type: int, key: bytes, value: bytes, headers: list = None, checksum: int = None, serialized_key_size: int = None, serialized_value_size: int = None):
        """
        Record consumed from Kafka.

        Args:
            topic (str): Topic name
            partition (int): Partition number
            offset (int): Record offset
            timestamp (int): Record timestamp
            timestamp_type (int): Timestamp type (0 = create time, 1 = log append time)
            key (bytes): Record key
            value (bytes): Record value
            headers (list): List of (key, value) header tuples (optional)
            checksum (int): Record checksum (optional)
            serialized_key_size (int): Key size in bytes (optional)
            serialized_value_size (int): Value size in bytes (optional)
        """

    topic: str                  # Topic name
    partition: int              # Partition number
    offset: int                 # Record offset
    timestamp: int              # Record timestamp
    timestamp_type: int         # Timestamp type
    key: bytes                  # Record key
    value: bytes                # Record value
    headers: list               # Header tuples
    checksum: int               # Record checksum
    serialized_key_size: int    # Key size in bytes
    serialized_value_size: int  # Value size in bytes
```
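
`ConsumerRecord.headers` is a list of `(key, value)` tuples, typically with `str` keys and raw `bytes` values; a common first step is folding them into a dict and decoding. A standalone sketch with simulated header data:

```python
# Simulated ConsumerRecord.headers payload: (str key, bytes value) tuples
headers = [("trace-id", b"abc123"), ("retry-count", b"2")]

# Fold into a dict and decode; later duplicates of a key would overwrite earlier ones
header_map = {key: value.decode("utf-8") for key, value in headers}
print(header_map["trace-id"])              # abc123
print(int(header_map["retry-count"]) + 1)  # 3
```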

## Usage Examples

### Working with TopicPartition

```python
from kafka.structs import TopicPartition
from kafka import KafkaConsumer

# Create topic partition identifiers
partition_0 = TopicPartition('my-topic', 0)
partition_1 = TopicPartition('my-topic', 1)

# Use in consumer assignment
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])
consumer.assign([partition_0, partition_1])

# Use in offset management
consumer.seek(partition_0, 1000)
current_position = consumer.position(partition_0)

# TopicPartition objects are hashable
partition_set = {partition_0, partition_1}
partition_dict = {partition_0: 'data for partition 0'}
```

### Offset and Metadata Management

```python
from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    enable_auto_commit=False
)

partition = TopicPartition('my-topic', 0)

# Manual offset commits with metadata
offset_metadata = OffsetAndMetadata(1500, 'processed batch 123')
consumer.commit({partition: offset_metadata})

# Check committed offset (the metadata=True keyword requires kafka-python >= 2.0)
committed = consumer.committed(partition, metadata=True)
print(f"Committed offset: {committed.offset}, metadata: {committed.metadata}")

# OffsetAndTimestamp is returned by offset-by-timestamp queries
# such as KafkaConsumer.offsets_for_times()
```
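
`KafkaConsumer.offsets_for_times` maps each requested `TopicPartition` to an `OffsetAndTimestamp`, or to `None` when no record exists at or after the requested timestamp. A standalone sketch of handling that result shape, using simulated data and inline mirrors of the structs:

```python
from collections import namedtuple

# Inline mirrors of the kafka.structs types for a standalone demo
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
OffsetAndTimestamp = namedtuple("OffsetAndTimestamp", ["offset", "timestamp"])

# Simulated result of consumer.offsets_for_times({tp: 1640995200000, ...})
result = {
    TopicPartition("my-topic", 0): OffsetAndTimestamp(100, 1640995200123),
    TopicPartition("my-topic", 1): None,  # no record at/after the timestamp
}

for tp, oat in sorted(result.items()):
    if oat is None:
        print(f"{tp.topic}:{tp.partition} -> no offset for that timestamp")
    else:
        print(f"{tp.topic}:{tp.partition} -> seek to {oat.offset}")
```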

### Broker and Cluster Information

```python
from kafka import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Broker information (describe_cluster returns plain dicts;
# exact key names can vary between kafka-python versions)
cluster = admin.describe_cluster()
for broker in cluster['brokers']:
    print(f"Broker {broker['node_id']}: {broker['host']}:{broker['port']}")
    if broker.get('rack'):
        print(f"  Rack: {broker['rack']}")

# Partition metadata, one dict per requested topic
for topic in admin.describe_topics(['my-topic']):
    for partition in topic['partitions']:
        print(f"Topic {topic['topic']} partition {partition['partition']}:")
        print(f"  Leader: {partition['leader']}")
        print(f"  Replicas: {partition['replicas']}")
        print(f"  ISR: {partition['isr']}")
```

Note: `KafkaAdminClient.list_topics()` returns only topic names; broker and partition details come from `describe_cluster()` and `describe_topics()` as shown above.

### Consumer Group Information

```python
from kafka import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# describe_consumer_groups returns a list of GroupInformation objects
for group_info in admin.describe_consumer_groups(['my-consumer-group']):
    print(f"Group: {group_info.group}")
    print(f"State: {group_info.state}")
    print(f"Protocol: {group_info.protocol}")

    print("Members:")
    for member in group_info.members:
        print(f"  Member ID: {member.member_id}")
        print(f"  Client ID: {member.client_id}")
        print(f"  Host: {member.client_host}")
```

### Record Metadata Handling

```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Send message and get metadata
future = producer.send('my-topic', key=b'key1', value=b'value1')
record_metadata = future.get(timeout=10)

print("Record sent to:")
print(f"  Topic: {record_metadata.topic}")
print(f"  Partition: {record_metadata.partition}")
print(f"  Offset: {record_metadata.offset}")
print(f"  Timestamp: {record_metadata.timestamp}")
print(f"  Key size: {record_metadata.serialized_key_size} bytes")
print(f"  Value size: {record_metadata.serialized_value_size} bytes")
```

### Consumer Record Processing

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group'
)

for message in consumer:
    print("Consumed record:")
    print(f"  Topic: {message.topic}")
    print(f"  Partition: {message.partition}")
    print(f"  Offset: {message.offset}")
    print(f"  Timestamp: {message.timestamp}")
    print(f"  Key: {message.key}")
    print(f"  Value: {message.value}")

    # Process headers if present
    if message.headers:
        print("  Headers:")
        for header_key, header_value in message.headers:
            print(f"    {header_key}: {header_value}")
```

### Data Structure Comparisons and Collections

```python
from kafka.structs import TopicPartition, OffsetAndMetadata

# TopicPartition equality and hashing
tp1 = TopicPartition('topic-a', 0)
tp2 = TopicPartition('topic-a', 0)
tp3 = TopicPartition('topic-a', 1)

print(tp1 == tp2)  # True
print(tp1 == tp3)  # False

# Use in sets and dictionaries
partitions = {tp1, tp2, tp3}  # tp1 and tp2 collapse to one entry
print(len(partitions))  # 2

# Offset mapping
offsets = {
    tp1: OffsetAndMetadata(1000, 'first partition'),
    tp3: OffsetAndMetadata(2000, 'second partition')
}

for partition, offset_data in offsets.items():
    print(f"{partition.topic}:{partition.partition} = {offset_data.offset}")
```