or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

admin.md, consumer.md, errors.md, index.md, producer.md, structures.md

docs/structures.md

0

# Data Structures

1

2

Core data structures for representing Kafka concepts including topics, partitions, offsets, broker metadata, and consumer group information.

3

4

## Capabilities

5

6

### Topic and Partition Identifiers

7

8

Fundamental structures for identifying Kafka topics and partitions.

9

10

```python { .api }

from typing import NamedTuple


class TopicPartition(NamedTuple):
    """Immutable identifier for one partition of a Kafka topic.

    Parameters:
    - topic: str, topic name
    - partition: int, partition number (0-based)

    Instances are tuples, so they are hashable and can be used as dict keys
    (e.g. in offset-commit mappings) or set members.
    """

    topic: str  # Topic name
    partition: int  # Partition number


# Usage examples:
# tp = TopicPartition('events', 0)
# tp = TopicPartition(topic='logs', partition=2)

```

32

33

### Offset Management

34

35

Structures for managing consumer offsets and metadata.

36

37

```python { .api }

from typing import NamedTuple, Optional


class OffsetAndMetadata(NamedTuple):
    """Offset commit information for one partition.

    Parameters:
    - offset: int, offset to commit (the next offset the consumer should read)
    - metadata: str, optional metadata (max 4096 bytes)
    - leader_epoch: int, optional leader epoch for fencing
    """

    offset: int  # Offset to commit
    metadata: Optional[str] = None  # Optional metadata string
    leader_epoch: Optional[int] = None  # Leader epoch (optional)


class OffsetAndTimestamp(NamedTuple):
    """Message offset paired with its timestamp.

    Parameters:
    - offset: int, message offset
    - timestamp: int, message timestamp in milliseconds
    - leader_epoch: int, optional leader epoch
    """

    offset: int  # Message offset
    timestamp: int  # Message timestamp (milliseconds)
    leader_epoch: Optional[int] = None  # Leader epoch

```

76

77

### Broker and Cluster Metadata

78

79

Structures representing broker information and cluster topology.

80

81

```python { .api }

from typing import List, NamedTuple, Optional


class BrokerMetadata(NamedTuple):
    """Information about a single broker.

    Parameters:
    - nodeId: int, unique broker ID
    - host: str, broker hostname or IP
    - port: int, broker port number
    - rack: str, optional rack identifier for rack awareness (None if unset)
    """

    nodeId: int  # Broker node ID
    host: str  # Broker hostname
    port: int  # Broker port
    rack: Optional[str] = None  # Rack identifier (may be None)


class PartitionMetadata(NamedTuple):
    """Complete metadata for one topic partition.

    Parameters:
    - topic: str, topic name
    - partition: int, partition number
    - leader: int, leader broker ID (-1 if no leader)
    - leader_epoch: int, current leader epoch
    - replicas: List[int], all replica broker IDs
    - isr: List[int], in-sync replica broker IDs
    - offline_replicas: List[int], offline replica broker IDs (optional)
    - error: KafkaError, error information (None if no error)
    """

    topic: str  # Topic name
    partition: int  # Partition number
    leader: int  # Leader broker ID
    leader_epoch: int  # Leader epoch
    replicas: List[int]  # List of replica broker IDs
    isr: List[int]  # In-sync replica broker IDs
    offline_replicas: Optional[List[int]] = None  # Offline replica broker IDs
    error: Optional["KafkaError"] = None  # Error information (None if no error)

```

133

134

### Consumer Group Information

135

136

Structures for consumer group membership and coordination.

137

138

```python { .api }

from typing import List, NamedTuple, Optional


class MemberInformation(NamedTuple):
    """Details of one consumer group member.

    Parameters:
    - member_id: str, unique member identifier (assigned by the coordinator)
    - client_id: str, client identifier specified by the consumer
    - client_host: str, client hostname
    - member_metadata: bytes, serialized member metadata
    - member_assignment: bytes, serialized partition assignment
    """

    member_id: str  # Member ID assigned by coordinator
    client_id: str  # Client ID specified by consumer
    client_host: str  # Client hostname
    member_metadata: bytes  # Member metadata (serialized)
    member_assignment: bytes  # Member assignment (serialized)


class GroupInformation(NamedTuple):
    """Complete details of one consumer group.

    Parameters:
    - error_code: int, operation error code (0 = success)
    - group: str, consumer group ID
    - state: str, group state ('Stable', 'Rebalancing', etc.)
    - protocol_type: str, protocol type (usually 'consumer')
    - protocol: str, partition assignment protocol name
    - members: List[MemberInformation], group members
    - authorized_operations: List[int], authorized operations (optional)
    """

    error_code: int  # Error code (0 = success)
    group: str  # Group ID
    state: str  # Group state
    protocol_type: str  # Protocol type (usually 'consumer')
    protocol: str  # Assignment protocol name
    members: List[MemberInformation]  # Group members
    authorized_operations: Optional[List[int]] = None  # Authorized operations

```

191

192

### Configuration and Operational Data

193

194

Structures for configuration management and operational parameters.

195

196

```python { .api }

from typing import NamedTuple, Optional


class RetryOptions(NamedTuple):
    """Retry configuration.

    Parameters:
    - limit: int, maximum number of retry attempts (default 3)
    - backoff_ms: int, backoff interval between retries in milliseconds (default 1000)
    - retry_on_timeouts: bool, whether to retry on timeout errors (default True)
    """

    limit: int = 3  # Maximum retry attempts
    backoff_ms: int = 1000  # Backoff interval in milliseconds
    retry_on_timeouts: bool = True  # Whether to retry on timeout errors


class Node(NamedTuple):
    """Connection information for a single node.

    Parameters:
    - id: int, node identifier
    - host: str, hostname
    - port: int, port number
    - rack: str, optional rack identifier (None if unset)
    """

    id: int  # Node ID
    host: str  # Hostname
    port: int  # Port number
    rack: Optional[str] = None  # Rack (may be None)

```

237

238

### Record and Message Data

239

240

Structures representing individual messages and record batches.

241

242

```python { .api }

class ConsumerRecord:
    """
    Individual record consumed from Kafka.

    Attributes:
    - topic: str, topic name
    - partition: int, partition number
    - offset: int, message offset
    - timestamp: int, message timestamp (milliseconds since epoch)
    - timestamp_type: int, timestamp type (0=CreateTime, 1=LogAppendTime)
    - key: bytes, message key (raw bytes, may be None)
    - value: bytes, message value (raw bytes, may be None)
    - headers: List[Tuple[str, bytes]], message headers (empty list if none)
    - checksum: int, message checksum
    - serialized_key_size: int, size of serialized key
    - serialized_value_size: int, size of serialized value
    - leader_epoch: int, leader epoch when message was written
    """

    def __init__(self, topic, partition, offset, timestamp, timestamp_type,
                 key, value, headers=None, checksum=None,
                 serialized_key_size=None, serialized_value_size=None,
                 leader_epoch=None):
        self.topic = topic
        self.partition = partition
        self.offset = offset
        self.timestamp = timestamp
        self.timestamp_type = timestamp_type
        self.key = key
        self.value = value
        # Normalize a missing header list to a fresh empty list per record.
        self.headers = headers or []
        self.checksum = checksum
        self.serialized_key_size = serialized_key_size
        self.serialized_value_size = serialized_value_size
        self.leader_epoch = leader_epoch


class RecordMetadata:
    """
    Metadata returned after successful record production.

    Attributes:
    - topic: str, topic name
    - partition: int, partition number
    - offset: int, assigned offset
    - timestamp: int, record timestamp
    - checksum: int, record checksum
    - serialized_key_size: int, size of serialized key
    - serialized_value_size: int, size of serialized value
    - leader_epoch: int, leader epoch
    """

    def __init__(self, topic, partition, offset, timestamp=None,
                 checksum=None, serialized_key_size=None,
                 serialized_value_size=None, leader_epoch=None):
        self.topic = topic
        self.partition = partition
        self.offset = offset
        self.timestamp = timestamp
        self.checksum = checksum
        self.serialized_key_size = serialized_key_size
        self.serialized_value_size = serialized_value_size
        self.leader_epoch = leader_epoch


class ConsumerRecords:
    """
    Collection of records returned by a consumer poll operation.

    Provides several ways to access records: by partition, by topic,
    as (partition, records) pairs, or by iterating over all records.
    """

    def __init__(self, record_map):
        """
        Initialize with a mapping of TopicPartition to List[ConsumerRecord].

        Parameters:
        - record_map: Dict[TopicPartition, List[ConsumerRecord]]
        """
        self._record_map = record_map

    def __iter__(self):
        """Iterate over all records across all partitions."""
        for records in self._record_map.values():
            yield from records

    def __len__(self):
        """Total number of records across all partitions."""
        return sum(len(records) for records in self._record_map.values())

    def __bool__(self):
        """True if any partition holds at least one record."""
        # Stops at the first non-empty partition instead of counting everything.
        return any(self._record_map.values())

    def items(self):
        """
        Iterate over (TopicPartition, List[ConsumerRecord]) pairs.

        Lets this collection be used wherever code iterates a poll result
        with ``for partition, records in result.items()``.
        """
        return self._record_map.items()

    def records(self, partition):
        """
        Get records for a specific partition.

        Parameters:
        - partition: TopicPartition

        Returns:
        - List[ConsumerRecord]: records for the partition (empty list if none)
        """
        return self._record_map.get(partition, [])

    def by_topic(self):
        """
        Group records by topic.

        Returns:
        - Dict[str, List[ConsumerRecord]]: records grouped by topic
        """
        topic_records = {}
        for tp, records in self._record_map.items():
            topic_records.setdefault(tp.topic, []).extend(records)
        return topic_records

    def partitions(self):
        """
        Get all partitions present in this collection.

        Returns:
        - Set[TopicPartition]: partitions with records
        """
        return set(self._record_map)

```

372

373

## Usage Examples

374

375

### Working with Topic Partitions

376

377

```python

from contextlib import closing

from kafka import TopicPartition, KafkaConsumer, KafkaProducer

# Create topic partition identifiers
tp1 = TopicPartition('events', 0)
tp2 = TopicPartition('events', 1)
tp3 = TopicPartition('logs', 0)

# closing() guarantees both clients are closed even if a step below raises.
with closing(KafkaConsumer(bootstrap_servers=['localhost:9092'])) as consumer, \
        closing(KafkaProducer(bootstrap_servers=['localhost:9092'])) as producer:
    # Use with manual consumer partition assignment
    consumer.assign([tp1, tp2, tp3])

    # Send to a specific partition; a bounded wait raises instead of
    # blocking forever when the broker does not acknowledge.
    future = producer.send(tp1.topic, value=b'message', partition=tp1.partition)
    metadata = future.get(timeout=10)

    print(f"Sent to {metadata.topic} partition {metadata.partition} at offset {metadata.offset}")

    # Check available partitions
    available_partitions = producer.partitions_for(tp1.topic)
    print(f"Available partitions for {tp1.topic}: {available_partitions}")

```

405

406

### Offset Management

407

408

```python

import time

from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='offset-demo',
    enable_auto_commit=False,  # Manual offset management
)

try:
    # Subscribe to topic
    consumer.subscribe(['events'])

    # Poll for messages: a mapping of TopicPartition -> list of records
    records = consumer.poll(timeout_ms=5000)

    # Process each partition's batch, then commit once for that partition
    for partition, messages in records.items():
        if not messages:
            continue

        for message in messages:
            # Process message
            print(f"Processing message: {message.value}")

        # Commit the offset after the last processed message, i.e. the
        # next offset to read, with custom metadata attached.
        offset_metadata = OffsetAndMetadata(
            offset=messages[-1].offset + 1,
            metadata=f"processed_at_{int(time.time())}",
        )
        consumer.commit({partition: offset_metadata})

    # Check committed offsets. metadata=True returns an OffsetAndMetadata
    # (with .offset / .metadata) instead of a bare int offset.
    for partition in consumer.assignment():
        committed = consumer.committed(partition, metadata=True)
        # Compare against None so a committed offset of 0 is still reported.
        if committed is not None:
            print(f"Partition {partition}: committed offset {committed.offset}, "
                  f"metadata: {committed.metadata}")
finally:
    consumer.close()

```

447

448

### Cluster Metadata Inspection

449

450

```python

from kafka.client_async import KafkaClient
from kafka import TopicPartition

client = KafkaClient(bootstrap_servers=['localhost:9092'])

try:
    # Give the client a chance to load cluster metadata before reading it
    client.poll(timeout_ms=5000)
    cluster = client.cluster

    # --- Brokers -----------------------------------------------------------
    print("Cluster Brokers:")
    for broker in cluster.brokers():
        print(f" Broker {broker.nodeId}: {broker.host}:{broker.port}")
        if broker.rack:
            print(f" Rack: {broker.rack}")

    # --- Topics and their partitions ----------------------------------------
    known_topics = cluster.topics()
    print(f"\nTopics ({len(known_topics)}):")
    for topic_name in sorted(known_topics):
        partition_ids = cluster.partitions_for_topic(topic_name)
        print(f" {topic_name}: {len(partition_ids)} partitions")

        for pid in sorted(partition_ids):
            tp = TopicPartition(topic_name, pid)
            # Gather every detail first, then print them together.
            # NOTE(review): confirm replicas_for_partition /
            # in_sync_replicas_for_partition exist on ClusterMetadata in your
            # kafka-python version.
            leader, replicas, isr = (
                cluster.leader_for_partition(tp),
                cluster.replicas_for_partition(tp),
                cluster.in_sync_replicas_for_partition(tp),
            )

            print(f" Partition {pid}:")
            print(f" Leader: {leader}")
            print(f" Replicas: {replicas}")
            print(f" ISR: {isr}")

finally:
    client.close()

```

490

491

### Consumer Group Analysis

492

493

```python

from kafka import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

try:
    # List all consumer groups. Each entry is a (group_id, protocol_type)
    # tuple; group state is only available from describe_consumer_groups().
    groups = admin.list_consumer_groups()
    print(f"Found {len(groups)} consumer groups:")

    group_ids = []
    for group_id, protocol_type in groups:
        print(f" {group_id}: {protocol_type}")
        group_ids.append(group_id)

    # Get detailed information for each group
    if group_ids:
        # A list of GroupInformation tuples (fields include group, state,
        # protocol and members), not a dict keyed by group id.
        descriptions = admin.describe_consumer_groups(group_ids)

        for description in descriptions:
            print(f"\nGroup: {description.group}")
            print(f" State: {description.state}")
            print(f" Protocol: {description.protocol}")
            print(f" Members: {len(description.members)}")

            for i, member in enumerate(description.members, start=1):
                print(f" Member {i}:")
                print(f" ID: {member.member_id}")
                print(f" Client: {member.client_id}")
                print(f" Host: {member.client_host}")

                # Only decoded assignments expose partitions(); anything
                # else (e.g. raw serialized bytes) is skipped.
                assignment = member.member_assignment
                if assignment and hasattr(assignment, 'partitions'):
                    partitions = assignment.partitions()
                    print(f" Assigned partitions: {len(partitions)}")
                    for tp in sorted(partitions):
                        print(f" {tp.topic}[{tp.partition}]")

finally:
    admin.close()

```

534

535

### Record Processing Patterns

536

537

```python

import json
import time
from collections import defaultdict

from kafka import KafkaConsumer


def _decode_json(raw):
    """Decode a UTF-8 JSON message value, passing None (tombstone) through."""
    return None if raw is None else json.loads(raw.decode('utf-8'))


consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['localhost:9092'],
    group_id='processor',
    value_deserializer=_decode_json,
    max_poll_records=100,
)

# Statistics tracking
partition_stats = defaultdict(lambda: {'count': 0, 'latest_offset': -1})
message_types = defaultdict(int)

try:
    while True:
        # Poll for a batch: mapping of TopicPartition -> list of records
        record_batch = consumer.poll(timeout_ms=5000)

        if not record_batch:
            print("No new messages")
            continue

        # len(record_batch) would count partitions, not records.
        total_records = sum(len(records) for records in record_batch.values())
        print(f"Received {total_records} records")

        # Process records by partition to maintain ordering
        for partition, records in record_batch.items():
            print(f"\nProcessing {len(records)} records from {partition}")

            for record in records:
                # Update statistics
                partition_stats[partition]['count'] += 1
                partition_stats[partition]['latest_offset'] = record.offset

                # Analyze message content
                if isinstance(record.value, dict) and 'type' in record.value:
                    message_types[record.value['type']] += 1

                # Process message headers
                if record.headers:
                    print(f" Headers: {dict(record.headers)}")

                # Flag old messages; skip records without a usable timestamp
                if record.timestamp is not None and record.timestamp >= 0:
                    age_seconds = (time.time() * 1000 - record.timestamp) / 1000
                    if age_seconds > 300:  # 5 minutes
                        print(f" Warning: Old message ({age_seconds:.1f}s old)")

        # Print periodic statistics
        print("\nPartition Statistics:")
        for partition, stats in partition_stats.items():
            print(f" {partition}: {stats['count']} messages, "
                  f"latest offset {stats['latest_offset']}")

        print(f"Message Types: {dict(message_types)}")

except KeyboardInterrupt:
    print("Shutting down...")

finally:
    consumer.close()

```

601

602

### Custom Data Structures

603

604

```python

import time
from collections import namedtuple

from kafka import KafkaConsumer, TopicPartition

# Custom application-specific structures
MessageEnvelope = namedtuple('MessageEnvelope', [
    'correlation_id', 'timestamp', 'source', 'data',
])

ProcessingResult = namedtuple('ProcessingResult', [
    'success', 'partition', 'offset', 'processing_time', 'error',
])

PartitionLag = namedtuple('PartitionLag', [
    'partition', 'current_offset', 'high_water_mark', 'lag',
])


def calculate_consumer_lag(consumer, partitions):
    """Calculate lag for the given consumer partitions.

    Parameters:
    - consumer: KafkaConsumer the partitions are assigned to
    - partitions: iterable of TopicPartition

    Returns:
    - List[PartitionLag]: one entry per partition; any offset or lag that
      could not be determined is reported as -1
    """
    # Materialize once: the partitions are traversed more than once below.
    partitions = list(partitions)

    # Get current positions (best-effort: a failed lookup marks it unknown)
    current_offsets = {}
    for partition in partitions:
        try:
            current_offsets[partition] = consumer.position(partition)
        except Exception:
            current_offsets[partition] = -1

    # Get high water marks
    end_offsets = consumer.end_offsets(partitions) if partitions else {}

    # Calculate lag
    lag_info = []
    for partition in partitions:
        current = current_offsets.get(partition, -1)
        high_water = end_offsets.get(partition, -1)

        if current >= 0 and high_water >= 0:
            lag = high_water - current
        else:
            lag = -1  # Unknown

        lag_info.append(PartitionLag(
            partition=partition,
            current_offset=current,
            high_water_mark=high_water,
            lag=lag,
        ))

    return lag_info


# Usage
consumer = KafkaConsumer('events', bootstrap_servers=['localhost:9092'])

try:
    # A single poll(0) may return before partitions are assigned, so keep
    # polling briefly (bounded by a deadline) until an assignment appears.
    deadline = time.monotonic() + 10
    while not consumer.assignment() and time.monotonic() < deadline:
        consumer.poll(timeout_ms=100)

    partitions = list(consumer.assignment())
    lag_info = calculate_consumer_lag(consumer, partitions)

    for lag in lag_info:
        print(f"Partition {lag.partition}: "
              f"current={lag.current_offset}, "
              f"high_water={lag.high_water_mark}, "
              f"lag={lag.lag}")
finally:
    consumer.close()

```