# Data Structures

Core data structures for representing Kafka concepts including topics, partitions, offsets, broker metadata, and consumer group information.

## Capabilities

### Topic and Partition Identifiers

Fundamental structures for identifying Kafka topics and partitions.

```python { .api }
11
class TopicPartition(NamedTuple):
    """
    Immutable topic-partition identifier.

    Parameters:
    - topic: str, topic name
    - partition: int, partition number (0-based)

    Returns:
    - TopicPartition: immutable topic-partition tuple
    """

    topic: str      # Topic name
    partition: int  # Partition number
27
28
# Usage examples:
29
# tp = TopicPartition('events', 0)
30
# tp = TopicPartition(topic='logs', partition=2)
31
```
32
33
### Offset Management
34
35
Structures for managing consumer offsets and metadata.
36
37
```python { .api }
38
class OffsetAndMetadata(NamedTuple):
    """
    Offset commit information.

    Parameters:
    - offset: int, offset to commit
    - metadata: str, optional metadata (max 4096 bytes)
    - leader_epoch: int, optional leader epoch for fencing

    Returns:
    - OffsetAndMetadata: offset commit information
    """

    offset: int               # Offset to commit
    metadata: str = None      # Optional metadata string (None when omitted)
    leader_epoch: int = None  # Leader epoch (optional, None when omitted)
56
57
class OffsetAndTimestamp(NamedTuple):
    """
    Offset-timestamp pair.

    Parameters:
    - offset: int, message offset
    - timestamp: int, message timestamp in milliseconds
    - leader_epoch: int, optional leader epoch

    Returns:
    - OffsetAndTimestamp: offset with timestamp information
    """

    offset: int               # Message offset
    timestamp: int            # Message timestamp (milliseconds)
    leader_epoch: int = None  # Leader epoch (None when omitted)
75
```
76
77
### Broker and Cluster Metadata
78
79
Structures representing broker information and cluster topology.
80
81
```python { .api }
82
class BrokerMetadata(NamedTuple):
    """
    Broker metadata.

    Parameters:
    - nodeId: int, unique broker ID
    - host: str, broker hostname or IP
    - port: int, broker port number
    - rack: str, optional rack identifier for rack awareness

    Returns:
    - BrokerMetadata: broker information
    """

    nodeId: int       # Broker node ID
    host: str         # Broker hostname
    port: int         # Broker port
    rack: str = None  # Rack identifier (may be None)
102
103
class PartitionMetadata(NamedTuple):
    """
    Partition metadata.

    Parameters:
    - topic: str, topic name
    - partition: int, partition number
    - leader: int, leader broker ID (-1 if no leader)
    - leader_epoch: int, current leader epoch
    - replicas: List[int], all replica broker IDs
    - isr: List[int], in-sync replica broker IDs
    - offline_replicas: List[int], offline replica broker IDs
    - error: KafkaError, error information if any

    Returns:
    - PartitionMetadata: complete partition information
    """

    topic: str                     # Topic name
    partition: int                 # Partition number
    leader: int                    # Leader broker ID
    leader_epoch: int              # Leader epoch
    replicas: list                 # List of replica broker IDs
    isr: list                      # In-sync replica broker IDs
    offline_replicas: list = None  # Offline replica broker IDs (None when omitted)
    error: 'KafkaError' = None     # Error information (None if no error)
132
```
133
134
### Consumer Group Information
135
136
Structures for consumer group membership and coordination.
137
138
```python { .api }
139
class MemberInformation(NamedTuple):
    """
    Consumer group member information.

    Parameters:
    - member_id: str, unique member identifier
    - client_id: str, client identifier
    - client_host: str, client hostname
    - member_metadata: bytes, serialized member metadata
    - member_assignment: bytes, serialized partition assignment

    Returns:
    - MemberInformation: consumer group member details
    """

    member_id: str            # Member ID assigned by coordinator
    client_id: str            # Client ID specified by consumer
    client_host: str          # Client hostname
    member_metadata: bytes    # Member metadata (serialized)
    member_assignment: bytes  # Member assignment (serialized)
162
163
class GroupInformation(NamedTuple):
    """
    Consumer group information.

    Parameters:
    - error_code: int, operation error code
    - group: str, consumer group ID
    - state: str, group state ('Stable', 'Rebalancing', etc.)
    - protocol_type: str, protocol type
    - protocol: str, partition assignment protocol
    - members: List[MemberInformation], group members
    - authorized_operations: List[int], authorized operations

    Returns:
    - GroupInformation: complete group details
    """

    error_code: int                     # Error code (0 = success)
    group: str                          # Group ID
    state: str                          # Group state
    protocol_type: str                  # Protocol type (usually 'consumer')
    protocol: str                       # Assignment protocol name
    members: list                       # List of MemberInformation
    authorized_operations: list = None  # List of authorized operations (None when omitted)
190
```
191
192
### Configuration and Operational Data
193
194
Structures for configuration management and operational parameters.
195
196
```python { .api }
197
class RetryOptions(NamedTuple):
    """
    Retry configuration.

    Parameters:
    - limit: int, maximum number of retry attempts
    - backoff_ms: int, backoff interval between retries
    - retry_on_timeouts: bool, retry on timeout errors

    Returns:
    - RetryOptions: retry configuration
    """

    limit: int = 3                  # Maximum retry attempts
    backoff_ms: int = 1000          # Backoff interval in milliseconds
    retry_on_timeouts: bool = True  # Whether to retry on timeout errors
215
216
class Node(NamedTuple):
    """
    Node connection information.

    Parameters:
    - id: int, node identifier
    - host: str, hostname
    - port: int, port number
    - rack: str, optional rack identifier

    Returns:
    - Node: node connection information
    """

    id: int           # Node ID (field name kept for API compatibility)
    host: str         # Hostname
    port: int         # Port number
    rack: str = None  # Rack (may be None)
236
```
237
238
### Record and Message Data
239
240
Structures representing individual messages and record batches.
241
242
```python { .api }
243
class ConsumerRecord:
    """
    Individual record consumed from Kafka.

    Attributes:
    - topic: str, topic name
    - partition: int, partition number
    - offset: int, message offset
    - timestamp: int, message timestamp (milliseconds since epoch)
    - timestamp_type: int, timestamp type (0=CreateTime, 1=LogAppendTime)
    - key: bytes, message key (raw bytes, may be None)
    - value: bytes, message value (raw bytes, may be None)
    - headers: List[Tuple[str, bytes]], message headers
    - checksum: int, message checksum
    - serialized_key_size: int, size of serialized key
    - serialized_value_size: int, size of serialized value
    - leader_epoch: int, leader epoch when message was written
    """

    def __init__(self, topic, partition, offset, timestamp, timestamp_type,
                 key, value, headers=None, checksum=None,
                 serialized_key_size=None, serialized_value_size=None,
                 leader_epoch=None):
        self.topic = topic
        self.partition = partition
        self.offset = offset
        self.timestamp = timestamp
        self.timestamp_type = timestamp_type
        self.key = key
        self.value = value
        # A missing (or empty) headers argument becomes a fresh list, so
        # instances never share a default container.
        self.headers = headers or []
        self.checksum = checksum
        self.serialized_key_size = serialized_key_size
        self.serialized_value_size = serialized_value_size
        self.leader_epoch = leader_epoch

    def __repr__(self):
        """Return a short debugging representation identifying the record."""
        return (f"{type(self).__name__}(topic={self.topic!r}, "
                f"partition={self.partition!r}, offset={self.offset!r})")
278
279
class RecordMetadata:
    """
    Metadata returned after successful record production.

    Attributes:
    - topic: str, topic name
    - partition: int, partition number
    - offset: int, assigned offset
    - timestamp: int, record timestamp
    - checksum: int, record checksum
    - serialized_key_size: int, size of serialized key
    - serialized_value_size: int, size of serialized value
    - leader_epoch: int, leader epoch
    """

    def __init__(self, topic, partition, offset, timestamp=None,
                 checksum=None, serialized_key_size=None,
                 serialized_value_size=None, leader_epoch=None):
        self.topic = topic
        self.partition = partition
        self.offset = offset
        self.timestamp = timestamp
        self.checksum = checksum
        self.serialized_key_size = serialized_key_size
        self.serialized_value_size = serialized_value_size
        self.leader_epoch = leader_epoch

    def __repr__(self):
        """Return a short debugging representation identifying the record."""
        return (f"{type(self).__name__}(topic={self.topic!r}, "
                f"partition={self.partition!r}, offset={self.offset!r})")
305
306
class ConsumerRecords:
    """
    Collection of records returned by consumer poll operation.

    Provides various ways to access records: by partition, by topic,
    or iterate over all records.
    """

    def __init__(self, record_map):
        """
        Initialize with mapping of TopicPartition to List[ConsumerRecord].

        Parameters:
        - record_map: Dict[TopicPartition, List[ConsumerRecord]]
        """
        self._record_map = record_map

    def __iter__(self):
        """Iterate over all records across all partitions."""
        for records in self._record_map.values():
            yield from records

    def __len__(self):
        """Total number of records across all partitions."""
        return sum(len(records) for records in self._record_map.values())

    def __bool__(self):
        """True if contains any records."""
        # Stops at the first non-empty partition instead of counting everything.
        return any(self._record_map.values())

    def records(self, partition):
        """
        Get records for specific partition.

        Parameters:
        - partition: TopicPartition

        Returns:
        - List[ConsumerRecord]: records for partition (empty list if unknown)
        """
        return self._record_map.get(partition, [])

    def by_topic(self):
        """
        Group records by topic.

        Returns:
        - Dict[str, List[ConsumerRecord]]: records grouped by topic
        """
        topic_records = {}
        for tp, records in self._record_map.items():
            topic_records.setdefault(tp.topic, []).extend(records)
        return topic_records

    def partitions(self):
        """
        Get all partitions with records.

        Returns:
        - Set[TopicPartition]: partitions present in the record map
        """
        return set(self._record_map)
371
```
372
373
## Usage Examples
374
375
### Working with Topic Partitions
376
377
```python
378
from kafka import TopicPartition, KafkaConsumer, KafkaProducer

# Create topic partition identifiers
tp1 = TopicPartition('events', 0)
tp2 = TopicPartition('events', 1)
tp3 = TopicPartition('logs', 0)

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])
try:
    # Use with consumer assignment
    consumer.assign([tp1, tp2, tp3])

    # Use with producer partition selection
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    try:
        # Send to specific partition
        future = producer.send(tp1.topic, value=b'message', partition=tp1.partition)
        # Bound the wait so a broker outage surfaces as an error instead of a hang
        metadata = future.get(timeout=10)

        print(f"Sent to {metadata.topic} partition {metadata.partition} at offset {metadata.offset}")

        # Check available partitions
        available_partitions = producer.partitions_for(tp1.topic)
        print(f"Available partitions for {tp1.topic}: {available_partitions}")
    finally:
        producer.close()
finally:
    # Always release the clients, even if sending failed
    consumer.close()
404
```
405
406
### Offset Management
407
408
```python
409
import time

from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='offset-demo',
    enable_auto_commit=False  # Manual offset management
)

try:
    # Subscribe to topic
    consumer.subscribe(['events'])

    # Poll for messages
    records = consumer.poll(timeout_ms=5000)

    # Process records and commit offsets manually
    for partition, messages in records.items():
        for message in messages:
            # Process message
            print(f"Processing message: {message.value}")

            # Create offset metadata with custom information
            offset_metadata = OffsetAndMetadata(
                offset=message.offset + 1,  # Next offset to read
                metadata=f"processed_at_{int(time.time())}"
            )

            # Commit offset for this partition
            consumer.commit({partition: offset_metadata})

    # Check committed offsets
    for partition in consumer.assignment():
        # metadata=True asks for the full OffsetAndMetadata rather than a bare offset
        committed = consumer.committed(partition, metadata=True)
        if committed is not None:
            print(f"Partition {partition}: committed offset {committed.offset}, "
                  f"metadata: {committed.metadata}")
finally:
    # Close the consumer even if processing or committing failed
    consumer.close()
446
```
447
448
### Cluster Metadata Inspection
449
450
```python
451
from kafka.client_async import KafkaClient
from kafka import TopicPartition

client = KafkaClient(bootstrap_servers=['localhost:9092'])

try:
    # Wait for metadata to load
    client.poll(timeout_ms=5000)

    cluster = client.cluster

    # Examine broker information
    print("Cluster Brokers:")
    for broker in cluster.brokers():
        print(f"  Broker {broker.nodeId}: {broker.host}:{broker.port}")
        if broker.rack:
            print(f"    Rack: {broker.rack}")

    # Examine topic metadata
    topics = sorted(cluster.topics())
    print(f"\nTopics ({len(topics)}):")
    for topic in topics:
        # partitions_for_topic() may return None when the topic's metadata
        # is not known yet; treat that as "no partitions".
        partitions = cluster.partitions_for_topic(topic) or set()
        print(f"  {topic}: {len(partitions)} partitions")

        # Get detailed partition information
        for partition_id in sorted(partitions):
            tp = TopicPartition(topic, partition_id)
            leader = cluster.leader_for_partition(tp)
            # NOTE(review): confirm that the installed kafka-python version
            # exposes replicas_for_partition / in_sync_replicas_for_partition
            # on the cluster metadata object.
            replicas = cluster.replicas_for_partition(tp)
            isr = cluster.in_sync_replicas_for_partition(tp)

            print(f"    Partition {partition_id}:")
            print(f"      Leader: {leader}")
            print(f"      Replicas: {replicas}")
            print(f"      ISR: {isr}")

finally:
    client.close()
489
```
490
491
### Consumer Group Analysis
492
493
```python
494
from kafka import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

try:
    # List all consumer groups; each entry is a (group_id, protocol_type) tuple
    groups = admin.list_consumer_groups()
    print(f"Found {len(groups)} consumer groups:")

    group_ids = []
    for group_id, protocol_type in groups:
        print(f"  {group_id}: {protocol_type}")
        group_ids.append(group_id)

    # Get detailed information for each group
    if group_ids:
        # One GroupInformation is returned per requested group
        descriptions = admin.describe_consumer_groups(group_ids)

        for description in descriptions:
            print(f"\nGroup: {description.group}")
            print(f"  State: {description.state}")
            print(f"  Protocol: {description.protocol}")
            print(f"  Members: {len(description.members)}")

            for i, member in enumerate(description.members, start=1):
                print(f"    Member {i}:")
                print(f"      ID: {member.member_id}")
                print(f"      Client: {member.client_id}")
                print(f"      Host: {member.client_host}")

                # member_assignment is the member's serialized assignment;
                # NOTE(review): decode it with the consumer protocol schema
                # if the individual partitions need to be listed.
                if member.member_assignment:
                    print(f"      Assignment: {member.member_assignment!r}")

finally:
    admin.close()
533
```
534
535
### Record Processing Patterns
536
537
```python
538
import json
import time
from collections import defaultdict

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['localhost:9092'],
    group_id='processor',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    max_poll_records=100
)

# Statistics tracking
partition_stats = defaultdict(lambda: {'count': 0, 'latest_offset': -1})
message_types = defaultdict(int)

try:
    while True:
        # Poll for batch of records
        record_batch = consumer.poll(timeout_ms=5000)

        if not record_batch:
            print("No new messages")
            continue

        # poll() returns {TopicPartition: [records]}, so len(record_batch)
        # would count partitions rather than records.
        total_records = sum(len(records) for records in record_batch.values())
        print(f"Received {total_records} records")

        # Process records by partition to maintain ordering
        for partition, records in record_batch.items():
            print(f"\nProcessing {len(records)} records from {partition}")

            for record in records:
                # Update statistics
                stats = partition_stats[partition]
                stats['count'] += 1
                stats['latest_offset'] = record.offset

                # Analyze message content
                if isinstance(record.value, dict) and 'type' in record.value:
                    message_types[record.value['type']] += 1

                # Process message headers
                if record.headers:
                    print(f"  Headers: {dict(record.headers)}")

                # Process message based on timestamp (record.timestamp is in ms)
                age_seconds = (time.time() * 1000 - record.timestamp) / 1000
                if age_seconds > 300:  # 5 minutes
                    print(f"  Warning: Old message ({age_seconds:.1f}s old)")

        # Print periodic statistics
        print("\nPartition Statistics:")
        for partition, stats in partition_stats.items():
            print(f"  {partition}: {stats['count']} messages, "
                  f"latest offset {stats['latest_offset']}")

        print(f"Message Types: {dict(message_types)}")

except KeyboardInterrupt:
    print("Shutting down...")

finally:
    consumer.close()
600
```
601
602
### Custom Data Structures
603
604
```python
605
from collections import namedtuple

from kafka import KafkaConsumer, TopicPartition
607
608
# Custom application-specific structures
609
# Envelope carrying a payload together with its tracing information
MessageEnvelope = namedtuple('MessageEnvelope', [
    'correlation_id', 'timestamp', 'source', 'data'
])

# Outcome of processing a single record
ProcessingResult = namedtuple('ProcessingResult', [
    'success', 'partition', 'offset', 'processing_time', 'error'
])

# Lag snapshot for one partition (lag is -1 when it could not be determined)
PartitionLag = namedtuple('PartitionLag', [
    'partition', 'current_offset', 'high_water_mark', 'lag'
])
620
621
def calculate_consumer_lag(consumer, partitions):
    """
    Calculate lag for consumer partitions.

    Parameters:
    - consumer: object exposing position(partition) and end_offsets(partitions)
    - partitions: iterable of partitions to inspect

    Returns:
    - List[PartitionLag]: one entry per partition, in input order; offsets
      that could not be determined are reported as -1, as is the lag
    """
    partitions = list(partitions)
    if not partitions:
        # Nothing to inspect; skip the end_offsets() lookup.
        return []

    # Get current positions
    current_offsets = {}
    for partition in partitions:
        try:
            position = consumer.position(partition)
        except Exception:
            # Best effort: a partition whose position cannot be read is
            # reported as unknown instead of failing the whole calculation.
            position = None
        current_offsets[partition] = -1 if position is None else position

    # Get high water marks
    end_offsets = consumer.end_offsets(partitions)

    # Calculate lag
    lag_info = []
    for partition in partitions:
        current = current_offsets[partition]
        high_water = end_offsets.get(partition, -1)
        known = current >= 0 and high_water >= 0

        lag_info.append(PartitionLag(
            partition=partition,
            current_offset=current,
            high_water_mark=high_water,
            lag=high_water - current if known else -1  # -1 means unknown
        ))

    return lag_info
654
655
# Usage
656
consumer = KafkaConsumer('events', bootstrap_servers=['localhost:9092'])

try:
    consumer.poll(timeout_ms=0)  # Initialize assignments

    partitions = list(consumer.assignment())
    lag_info = calculate_consumer_lag(consumer, partitions)

    for lag in lag_info:
        print(f"Partition {lag.partition}: "
              f"current={lag.current_offset}, "
              f"high_water={lag.high_water_mark}, "
              f"lag={lag.lag}")
finally:
    # Release the connection even if the lag calculation failed
    consumer.close()
669
```