or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

admin.md · connection.md · consumer.md · errors.md · index.md · producer.md · serialization.md · structs.md

docs/structs.md

0

# Data Structures

1

2

Core data structures for representing Kafka concepts including topic partitions, offsets, broker metadata, and consumer group information.

3

4

## Imports

5

6

```python

7

from collections import namedtuple

8

```

9

10

## Capabilities

11

12

### Topic and Partition Identifiers

13

14

Data structures for identifying topics and partitions within a Kafka cluster.

15

16

```python { .api }

17

TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])

18

"""

19

A topic and partition tuple.

20

21

Args:

22

topic (str): Topic name

23

partition (int): Partition number

24

25

Usage:

26

tp = TopicPartition('my-topic', 0)

27

print(tp.topic) # 'my-topic'

28

print(tp.partition) # 0

29

"""

30

```

31

32

### Offset and Metadata

33

34

Data structures for representing offsets with associated metadata.

35

36

```python { .api }

37

OffsetAndMetadata = namedtuple("OffsetAndMetadata", ["offset", "metadata"])

38

"""

39

Offset with commit metadata.

40

41

Args:

42

offset (int): The offset to be committed

43

metadata (str): Metadata string stored alongside the committed offset; must not be None (use an empty string if no metadata is needed)

44

45

Usage:

46

oam = OffsetAndMetadata(100, "my-metadata")

47

print(oam.offset) # 100

48

print(oam.metadata) # "my-metadata"

49

"""

50

51

OffsetAndTimestamp = namedtuple("OffsetAndTimestamp", ["offset", "timestamp"])

52

"""

53

Offset with associated timestamp.

54

55

Args:

56

offset (int): An offset

57

timestamp (int): The timestamp associated with the offset

58

59

Usage:

60

oat = OffsetAndTimestamp(100, 1640995200000)

61

print(oat.offset) # 100

62

print(oat.timestamp) # 1640995200000

63

"""

64

```

65

66

### Broker and Cluster Metadata

67

68

Data structures for representing broker and cluster information.

69

70

```python { .api }

71

class BrokerMetadata:

72

def __init__(self, nodeId: int, host: str, port: int, rack: str = None):

73

"""

74

Kafka broker metadata.

75

76

Args:

77

nodeId (int): Broker node ID

78

host (str): Broker hostname

79

port (int): Broker port

80

rack (str): Broker rack identifier (optional)

81

"""

82

83

nodeId: int # Broker node ID

84

host: str # Broker hostname

85

port: int # Broker port

86

rack: str # Broker rack (optional)

87

88

class PartitionMetadata:

89

def __init__(self, topic: str, partition: int, leader: int, replicas: list, isr: list, error: int = None):

90

"""

91

Partition metadata from cluster.

92

93

Args:

94

topic (str): Topic name

95

partition (int): Partition number

96

leader (int): Leader broker node ID

97

replicas (list): List of replica broker node IDs

98

isr (list): List of in-sync replica broker node IDs

99

error (int): Error code (optional)

100

"""

101

102

topic: str # Topic name

103

partition: int # Partition number

104

leader: int # Leader broker node ID

105

replicas: list # Replica broker node IDs

106

isr: list # In-sync replica broker node IDs

107

error: int # Error code

108

```

109

110

### Consumer Group Information

111

112

Data structures for representing consumer group state and member information.

113

114

```python { .api }

115

class MemberInformation:

116

def __init__(self, member_id: str, client_id: str, client_host: str, member_metadata: bytes, member_assignment: bytes):

117

"""

118

Consumer group member information.

119

120

Args:

121

member_id (str): Member identifier

122

client_id (str): Client identifier

123

client_host (str): Client hostname

124

member_metadata (bytes): Member metadata

125

member_assignment (bytes): Member assignment data

126

"""

127

128

member_id: str # Member identifier

129

client_id: str # Client identifier

130

client_host: str # Client hostname

131

member_metadata: bytes # Member metadata

132

member_assignment: bytes # Member assignment

133

134

class GroupInformation:

135

def __init__(self, error_code: int, group: str, state: str, protocol_type: str, protocol: str, members: list, authorized_operations: set = None):

136

"""

137

Consumer group information.

138

139

Args:

140

error_code (int): Error code

141

group (str): Group ID

142

state (str): Group state

143

protocol_type (str): Protocol type

144

protocol (str): Protocol name

145

members (list): List of MemberInformation objects

146

authorized_operations (set): Authorized operations (optional)

147

"""

148

149

error_code: int # Error code

150

group: str # Group ID

151

state: str # Group state

152

protocol_type: str # Protocol type

153

protocol: str # Protocol name

154

members: list # List of members

155

authorized_operations: set # Authorized operations

156

```

157

158

### Producer Configuration

159

160

Data structures for producer retry and configuration options.

161

162

```python { .api }

163

class RetryOptions:

164

def __init__(self, limit: int, backoff_ms: int, retry_on_timeouts: bool = True):

165

"""

166

Retry policy configuration for async producer.

167

168

Args:

169

limit (int): Maximum retry attempts

170

backoff_ms (int): Backoff time between retries in milliseconds

171

retry_on_timeouts (bool): Whether to retry on timeout errors

172

"""

173

174

limit: int # Maximum retry attempts

175

backoff_ms: int # Backoff time in milliseconds

176

retry_on_timeouts: bool # Retry on timeouts

177

```

178

179

### Record Metadata and Timestamps

180

181

Data structures for record metadata and timing information.

182

183

```python { .api }

184

class RecordMetadata:

185

def __init__(self, topic: str, partition: int, offset: int, timestamp: int = None, checksum: int = None, serialized_key_size: int = None, serialized_value_size: int = None):

186

"""

187

Metadata for a produced record.

188

189

Args:

190

topic (str): Topic name

191

partition (int): Partition number

192

offset (int): Record offset

193

timestamp (int): Record timestamp (optional)

194

checksum (int): Record checksum (optional)

195

serialized_key_size (int): Key size in bytes (optional)

196

serialized_value_size (int): Value size in bytes (optional)

197

"""

198

199

topic: str # Topic name

200

partition: int # Partition number

201

offset: int # Record offset

202

timestamp: int # Record timestamp

203

checksum: int # Record checksum

204

serialized_key_size: int # Key size in bytes

205

serialized_value_size: int # Value size in bytes

206

207

class ConsumerRecord:

208

def __init__(self, topic: str, partition: int, offset: int, timestamp: int, timestamp_type: int, key: bytes, value: bytes, headers: list = None, checksum: int = None, serialized_key_size: int = None, serialized_value_size: int = None):

209

"""

210

Record consumed from Kafka.

211

212

Args:

213

topic (str): Topic name

214

partition (int): Partition number

215

offset (int): Record offset

216

timestamp (int): Record timestamp

217

timestamp_type (int): Timestamp type

218

key (bytes): Record key

219

value (bytes): Record value

220

headers (list): List of (key, value) header tuples (optional)

221

checksum (int): Record checksum (optional)

222

serialized_key_size (int): Key size in bytes (optional)

223

serialized_value_size (int): Value size in bytes (optional)

224

"""

225

226

topic: str # Topic name

227

partition: int # Partition number

228

offset: int # Record offset

229

timestamp: int # Record timestamp

230

timestamp_type: int # Timestamp type

231

key: bytes # Record key

232

value: bytes # Record value

233

headers: list # Header tuples

234

checksum: int # Record checksum

235

serialized_key_size: int # Key size in bytes

236

serialized_value_size: int # Value size in bytes

237

```

238

239

## Usage Examples

240

241

### Working with TopicPartition

242

243

```python

244

from kafka.structs import TopicPartition

245

from kafka import KafkaConsumer

246

247

# Create topic partition identifiers

248

partition_0 = TopicPartition('my-topic', 0)

249

partition_1 = TopicPartition('my-topic', 1)

250

251

# Use in consumer assignment

252

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])

253

consumer.assign([partition_0, partition_1])

254

255

# Use in offset management

256

consumer.seek(partition_0, 1000)

257

current_position = consumer.position(partition_0)

258

259

# TopicPartition objects are hashable

260

partition_set = {partition_0, partition_1}

261

partition_dict = {partition_0: 'data for partition 0'}

262

```

263

264

### Offset and Metadata Management

265

266

```python

267

from kafka.structs import TopicPartition, OffsetAndMetadata

268

from kafka import KafkaConsumer

269

270

consumer = KafkaConsumer(

271

bootstrap_servers=['localhost:9092'],

272

group_id='my-group',

273

enable_auto_commit=False

274

)

275

276

partition = TopicPartition('my-topic', 0)

277

278

# Manual offset commits with metadata

279

offset_metadata = OffsetAndMetadata(1500, 'processed batch 123')

280

consumer.commit({partition: offset_metadata})

281

282

# Check committed offset

283

committed = consumer.committed(partition, metadata=True)

284

print(f"Committed offset: {committed.offset}, metadata: {committed.metadata}")

285

286

# Get offsets with timestamps

287

from kafka.structs import OffsetAndTimestamp

288

# Note: OffsetAndTimestamp is typically returned by offset-by-timestamp queries, e.g. consumer.offsets_for_times({partition: 1640995200000})

289

```

290

291

### Broker and Cluster Information

292

293

```python

294

from kafka import KafkaAdminClient

295

from kafka.structs import BrokerMetadata

296

297

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

298

299

# Get cluster metadata

300

metadata = admin.list_topics()

301

302

# Access broker information

303

for broker_id, broker in metadata.brokers.items():

304

print(f"Broker {broker.nodeId}: {broker.host}:{broker.port}")

305

if broker.rack:

306

print(f" Rack: {broker.rack}")

307

308

# Access partition metadata

309

for topic_name, topic_metadata in metadata.topics.items():

310

for partition in topic_metadata.partitions.values():

311

print(f"Topic {partition.topic} partition {partition.partition}:")

312

print(f" Leader: {partition.leader}")

313

print(f" Replicas: {partition.replicas}")

314

print(f" ISR: {partition.isr}")

315

```

316

317

### Consumer Group Information

318

319

```python

320

from kafka import KafkaAdminClient

321

322

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

323

324

# Get consumer group details

325

group_details = admin.describe_consumer_groups(['my-consumer-group'])

326

327

for group_id, group_info in group_details.items():

328

print(f"Group: {group_info.group}")

329

print(f"State: {group_info.state}")

330

print(f"Protocol: {group_info.protocol}")

331

332

print("Members:")

333

for member in group_info.members:

334

print(f" Member ID: {member.member_id}")

335

print(f" Client ID: {member.client_id}")

336

print(f" Host: {member.client_host}")

337

```

338

339

### Record Metadata Handling

340

341

```python

342

from kafka import KafkaProducer

343

344

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

345

346

# Send message and get metadata

347

future = producer.send('my-topic', key=b'key1', value=b'value1')

348

record_metadata = future.get(timeout=10)

349

350

print(f"Record sent to:")

351

print(f" Topic: {record_metadata.topic}")

352

print(f" Partition: {record_metadata.partition}")

353

print(f" Offset: {record_metadata.offset}")

354

print(f" Timestamp: {record_metadata.timestamp}")

355

print(f" Key size: {record_metadata.serialized_key_size} bytes")

356

print(f" Value size: {record_metadata.serialized_value_size} bytes")

357

```

358

359

### Consumer Record Processing

360

361

```python

362

from kafka import KafkaConsumer

363

364

consumer = KafkaConsumer(

365

'my-topic',

366

bootstrap_servers=['localhost:9092'],

367

group_id='my-group'

368

)

369

370

for message in consumer:

371

print(f"Consumed record:")

372

print(f" Topic: {message.topic}")

373

print(f" Partition: {message.partition}")

374

print(f" Offset: {message.offset}")

375

print(f" Timestamp: {message.timestamp}")

376

print(f" Key: {message.key}")

377

print(f" Value: {message.value}")

378

379

# Process headers if present

380

if message.headers:

381

print(" Headers:")

382

for header_key, header_value in message.headers:

383

print(f" {header_key}: {header_value}")

384

```

385

386

### Data Structure Comparisons and Collections

387

388

```python

389

from kafka.structs import TopicPartition, OffsetAndMetadata

390

391

# TopicPartition equality and hashing

392

tp1 = TopicPartition('topic-a', 0)

393

tp2 = TopicPartition('topic-a', 0)

394

tp3 = TopicPartition('topic-a', 1)

395

396

print(tp1 == tp2) # True

397

print(tp1 == tp3) # False

398

399

# Use in sets and dictionaries

400

partitions = {tp1, tp2, tp3} # Only 2 unique partitions

401

print(len(partitions)) # 2

402

403

# Offset mapping

404

offsets = {

405

tp1: OffsetAndMetadata(1000, 'first partition'),

406

tp3: OffsetAndMetadata(2000, 'second partition')

407

}

408

409

for partition, offset_data in offsets.items():

410

print(f"{partition.topic}:{partition.partition} = {offset_data.offset}")

411

```