or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

admin.md consumer.md errors.md index.md producer.md structures.md

consumer.mddocs/

0

# Consumer API

1

2

High-level client for consuming records from Kafka topics, with support for consumer groups, automatic partition assignment, offset management, and rebalance coordination.

3

4

## Capabilities

5

6

### KafkaConsumer

7

8

Main consumer class providing high-level interface for consuming records from Kafka topics. Supports both subscribe (automatic partition assignment) and assign (manual partition assignment) modes.

9

10

```python { .api }

11

class KafkaConsumer:

12

def __init__(self, *topics, **configs):

13

"""

14

Initialize Kafka consumer.

15

16

Parameters:

17

- *topics: str, optional topic names to subscribe to

18

- **configs: consumer configuration options

19

- bootstrap_servers: List[str], broker addresses

20

- group_id: str, consumer group identifier

21

- key_deserializer: Callable, key deserialization function

22

- value_deserializer: Callable, value deserialization function

23

- auto_offset_reset: str, 'earliest' or 'latest'; where to start when no committed offset exists or the offset is out of range (default: 'latest')

24

- enable_auto_commit: bool, automatic offset commits (default: True)

25

- auto_commit_interval_ms: int, auto-commit interval (default: 5000)

26

- session_timeout_ms: int, session timeout (default: 10000)

27

- heartbeat_interval_ms: int, heartbeat interval (default: 3000)

28

- max_poll_records: int, max records per poll (default: 500)

29

- fetch_min_bytes: int, minimum fetch size (default: 1)

30

- fetch_max_wait_ms: int, max fetch wait time (default: 500)

31

- max_partition_fetch_bytes: int, max bytes per partition (default: 1MB)

32

- security_protocol: str, 'PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'

33

- api_version: tuple, broker API version or 'auto'

34

"""

35

36

def subscribe(self, topics=(), pattern=None, listener=None):

37

"""

38

Subscribe to topics with automatic partition assignment.

39

40

Parameters:

41

- topics: List[str], topic names to subscribe to

42

- pattern: str, regex pattern for topic matching

43

- listener: ConsumerRebalanceListener, rebalance callback

44

"""

45

46

def assign(self, partitions):

47

"""

48

Manually assign specific partitions to consumer.

49

50

Parameters:

51

- partitions: List[TopicPartition], partitions to assign

52

"""

53

54

def poll(self, timeout_ms=0, max_records=None, update_offsets=True):

55

"""

56

Fetch records from assigned partitions.

57

58

Parameters:

59

- timeout_ms: int, polling timeout in milliseconds

60

- max_records: int, maximum records to return

61

- update_offsets: bool, whether to advance the consumer position past the returned records (default: True)

62

63

Returns:

64

- ConsumerRecords: mapping of TopicPartition to list of ConsumerRecord

65

"""

66

67

def commit(self, offsets=None):

68

"""

69

Commit offsets to Kafka.

70

71

Parameters:

72

- offsets: Dict[TopicPartition, OffsetAndMetadata], specific offsets to commit

73

If None, commits current position for all assigned partitions

74

"""

75

76

def commit_async(self, offsets=None, callback=None):

77

"""

78

Asynchronous offset commit.

79

80

Parameters:

81

- offsets: Dict[TopicPartition, OffsetAndMetadata], offsets to commit

82

- callback: Callable, completion callback function

83

"""

84

85

def seek(self, partition, offset):

86

"""

87

Seek to specific offset in partition.

88

89

Parameters:

90

- partition: TopicPartition, target partition

91

- offset: int, target offset

92

"""

93

94

def seek_to_beginning(self, *partitions):

95

"""

96

Seek to beginning of partitions.

97

98

Parameters:

99

- *partitions: TopicPartition, partitions to seek (all assigned if none)

100

"""

101

102

def seek_to_end(self, *partitions):

103

"""

104

Seek to end of partitions.

105

106

Parameters:

107

- *partitions: TopicPartition, partitions to seek (all assigned if none)

108

"""

109

110

def position(self, partition):

111

"""

112

Get current position (next fetch offset) for partition.

113

114

Parameters:

115

- partition: TopicPartition, target partition

116

117

Returns:

118

- int: current position offset

119

"""

120

121

def committed(self, partition):

122

"""

123

Get last committed offset for partition.

124

125

Parameters:

126

- partition: TopicPartition, target partition

127

128

Returns:

129

- OffsetAndMetadata: last committed offset with metadata

130

"""

131

132

def pause(self, *partitions):

133

"""

134

Suspend fetching from partitions.

135

136

Parameters:

137

- *partitions: TopicPartition, partitions to pause

138

"""

139

140

def resume(self, *partitions):

141

"""

142

Resume fetching from partitions.

143

144

Parameters:

145

- *partitions: TopicPartition, partitions to resume

146

"""

147

148

def paused(self):

149

"""

150

Get currently paused partitions.

151

152

Returns:

153

- Set[TopicPartition]: paused partitions

154

"""

155

156

def close(self, autocommit=True):

157

"""

158

Close consumer and clean up resources.

159

160

Parameters:

161

- autocommit: bool, commit offsets before closing

162

"""

163

164

def subscription(self):

165

"""

166

Get current topic subscription.

167

168

Returns:

169

- Set[str]: subscribed topic names

170

"""

171

172

def assignment(self):

173

"""

174

Get current partition assignment.

175

176

Returns:

177

- Set[TopicPartition]: assigned partitions

178

"""

179

180

def beginning_offsets(self, partitions):

181

"""

182

Get earliest available offsets for partitions.

183

184

Parameters:

185

- partitions: List[TopicPartition], target partitions

186

187

Returns:

188

- Dict[TopicPartition, int]: earliest offsets

189

"""

190

191

def end_offsets(self, partitions):

192

"""

193

Get latest offsets for partitions.

194

195

Parameters:

196

- partitions: List[TopicPartition], target partitions

197

198

Returns:

199

- Dict[TopicPartition, int]: latest offsets

200

"""

201

202

def offsets_for_times(self, timestamps):

203

"""

204

Get offsets for specific timestamps.

205

206

Parameters:

207

- timestamps: Dict[TopicPartition, int], mapping of partition to the timestamp to look up (milliseconds since the Unix epoch)

208

209

Returns:

210

- Dict[TopicPartition, OffsetAndTimestamp]: offset and timestamp info

211

"""

212

213

def metrics(self):

214

"""

215

Get consumer metrics.

216

217

Returns:

218

- Dict[str, Dict[str, float]]: current metric values, keyed by metric group and then by metric name

219

"""

220

```

221

222

### Consumer Rebalance Listener

223

224

Abstract base class for handling partition rebalancing events in consumer groups. Implement this interface to perform cleanup or initialization when partitions are assigned or revoked.

225

226

```python { .api }

227

class ConsumerRebalanceListener:

228

def on_partitions_revoked(self, revoked):

229

"""

230

Called before partitions are reassigned.

231

232

Use this callback to commit offsets and clean up state

233

for partitions that are being revoked.

234

235

Parameters:

236

- revoked: List[TopicPartition], partitions being revoked

237

"""

238

239

def on_partitions_assigned(self, assigned):

240

"""

241

Called after partitions are assigned.

242

243

Use this callback to set up state or seek to specific

244

offsets for newly assigned partitions.

245

246

Parameters:

247

- assigned: List[TopicPartition], partitions being assigned

248

"""

249

```

250

251

### Consumer Records

252

253

Result of consumer poll() operation containing records organized by partition.

254

255

```python { .api }

256

class ConsumerRecords:

257

def __init__(self, record_map):

258

"""

259

Container for poll() results.

260

261

Parameters:

262

- record_map: Dict[TopicPartition, List[ConsumerRecord]]

263

"""

264

265

def __iter__(self):

266

"""Iterate over all records across all partitions."""

267

268

def __len__(self):

269

"""Total number of records across all partitions."""

270

271

def __bool__(self):

272

"""True if contains any records."""

273

274

def records(self, partition):

275

"""

276

Get records for specific partition.

277

278

Parameters:

279

- partition: TopicPartition, target partition

280

281

Returns:

282

- List[ConsumerRecord]: records for partition

283

"""

284

285

def by_topic(self):

286

"""

287

Group records by topic.

288

289

Returns:

290

- Dict[str, List[ConsumerRecord]]: records grouped by topic

291

"""

292

```

293

294

### Consumer Record

295

296

Individual record consumed from Kafka containing message data and metadata.

297

298

```python { .api }

299

class ConsumerRecord:

300

topic: str # Topic name

301

partition: int # Partition number

302

offset: int # Message offset

303

timestamp: int # Message timestamp (milliseconds)

304

timestamp_type: int # Timestamp type (0=CreateTime, 1=LogAppendTime)

305

key: bytes # Message key (raw bytes unless key_deserializer is configured)

306

value: bytes # Message value (raw bytes unless value_deserializer is configured)

307

headers: List[Tuple[str, bytes]] # Message headers

308

checksum: int # Message checksum

309

serialized_key_size: int # Serialized key size

310

serialized_value_size: int # Serialized value size

311

leader_epoch: int # Leader epoch

312

```

313

314

## Usage Examples

315

316

### Basic Consumer Group

317

318

```python

319

from kafka import KafkaConsumer

320

import json

321

322

# Create consumer with automatic offset management

323

consumer = KafkaConsumer(

324

'my-topic',

325

bootstrap_servers=['localhost:9092'],

326

group_id='my-group',

327

value_deserializer=lambda m: json.loads(m.decode('utf-8')),

328

auto_offset_reset='earliest',

329

enable_auto_commit=True,

330

auto_commit_interval_ms=1000

331

)

332

333

# Process messages

334

for message in consumer:

335

print(f"Topic: {message.topic}")

336

print(f"Partition: {message.partition}")

337

print(f"Offset: {message.offset}")

338

print(f"Key: {message.key}")

339

print(f"Value: {message.value}")

340

print(f"Timestamp: {message.timestamp}")

341

342

consumer.close()

343

```

344

345

### Manual Partition Assignment

346

347

```python

348

from kafka import KafkaConsumer, TopicPartition

349

350

consumer = KafkaConsumer(

351

bootstrap_servers=['localhost:9092'],

352

group_id=None, # No consumer group

353

value_deserializer=lambda m: m.decode('utf-8')

354

)

355

356

# Manually assign specific partitions

357

partitions = [

358

TopicPartition('topic1', 0),

359

TopicPartition('topic1', 1),

360

TopicPartition('topic2', 0)

361

]

362

consumer.assign(partitions)

363

364

# Seek to specific positions

365

consumer.seek(TopicPartition('topic1', 0), 100)

366

consumer.seek_to_end(TopicPartition('topic1', 1))

367

368

# Poll for messages

369

while True:

370

records = consumer.poll(timeout_ms=1000)

371

for partition, messages in records.items():

372

for message in messages:

373

print(f"Partition {partition}: {message.value}")

374

```

375

376

### Manual Offset Management

377

378

```python

379

from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata

380

381

consumer = KafkaConsumer(

382

'my-topic',

383

bootstrap_servers=['localhost:9092'],

384

group_id='manual-commit-group',

385

enable_auto_commit=False, # Disable auto-commit

386

auto_offset_reset='earliest'

387

)

388

389

try:

390

while True:

391

records = consumer.poll(timeout_ms=1000)

392

393

for partition, messages in records.items():

394

for message in messages:

395

# Process message

396

print(f"Processing: {message.value}")

397

398

# Manually commit after processing

399

consumer.commit({

400

partition: OffsetAndMetadata(message.offset + 1, None)

401

})

402

403

except KeyboardInterrupt:

404

pass

405

finally:

406

consumer.close()

407

```

408

409

### Rebalance Listener

410

411

```python

412

from kafka import KafkaConsumer

413

from kafka import ConsumerRebalanceListener

414

import logging

415

416

class MyRebalanceListener(ConsumerRebalanceListener):

417

def __init__(self, consumer):

418

self.consumer = consumer

419

420

def on_partitions_revoked(self, revoked):

421

logging.info(f"Partitions revoked: {revoked}")

422

# Commit current offsets before partitions are reassigned

423

self.consumer.commit()

424

425

def on_partitions_assigned(self, assigned):

426

logging.info(f"Partitions assigned: {assigned}")

427

# Could seek to specific offsets or perform other setup

428

429

consumer = KafkaConsumer(

430

bootstrap_servers=['localhost:9092'],

431

group_id='rebalance-group'

432

)

433

434

listener = MyRebalanceListener(consumer)

435

consumer.subscribe(['my-topic'], listener=listener)

436

437

for message in consumer:

438

print(f"Received: {message.value}")

439

```

440

441

### Batch Processing

442

443

```python

444

from kafka import KafkaConsumer, OffsetAndMetadata

445

446

consumer = KafkaConsumer(

447

'batch-topic',

448

bootstrap_servers=['localhost:9092'],

449

group_id='batch-group',

450

max_poll_records=100, # Process up to 100 records per poll

451

enable_auto_commit=False

452

)

453

454

def process_batch(records):

455

"""Process a batch of records together."""

456

batch_data = []

457

for record in records:

458

batch_data.append(record.value)

459

460

# Simulate batch processing

461

print(f"Processing batch of {len(batch_data)} records")

462

# ... perform batch operation ...

463

464

return True

465

466

try:

467

while True:

468

# Poll for a batch of records

469

record_batch = consumer.poll(timeout_ms=5000, max_records=50)

470

471

if record_batch:

472

# Flatten records from all partitions

473

all_records = []

474

last_offsets = {}

475

476

for partition, records in record_batch.items():

477

all_records.extend(records)

478

if records:

479

last_offsets[partition] = records[-1].offset + 1

480

481

# Process the batch

482

if process_batch(all_records):

483

# Commit offsets for successfully processed batch

484

offset_data = {

485

partition: OffsetAndMetadata(offset, None)

486

for partition, offset in last_offsets.items()

487

}

488

consumer.commit(offset_data)

489

490

except KeyboardInterrupt:

491

pass

492

finally:

493

consumer.close()

494

```