or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

admin.md · connection.md · consumer.md · errors.md · index.md · producer.md · serialization.md · structs.md

docs/consumer.md

0

# Consumer Operations

1

2

High-level consumer for consuming records from Kafka topics with automatic group coordination, partition assignment, and offset management. Supports both subscription-based consumption and manual partition assignment.

3

4

## Capabilities

5

6

### KafkaConsumer

7

8

Main consumer class for consuming records from Kafka topics. Provides automatic group coordination, offset management, and flexible consumption patterns.

9

10

```python { .api }

11

class KafkaConsumer:

12

def __init__(self, *topics, **configs):

13

"""

14

Create a KafkaConsumer instance.

15

16

Args:

17

*topics: Topics to subscribe to initially

18

**configs: Consumer configuration options including:

19

bootstrap_servers (list): List of Kafka brokers

20

group_id (str): Consumer group identifier

21

key_deserializer (callable): Function to deserialize keys

22

value_deserializer (callable): Function to deserialize values

23

auto_offset_reset (str): Policy to apply when no valid offset exists for a partition ('earliest', 'latest', 'none')

24

enable_auto_commit (bool): Whether to auto-commit offsets

25

auto_commit_interval_ms (int): Auto-commit interval

26

session_timeout_ms (int): Group session timeout

27

heartbeat_interval_ms (int): Heartbeat interval

28

max_poll_records (int): Maximum records per poll

29

max_poll_interval_ms (int): Maximum time between polls

30

client_id (str): Client identifier

31

security_protocol (str): Security protocol

32

ssl_context (ssl.SSLContext): Pre-configured SSL context used to wrap socket connections

33

sasl_mechanism (str): SASL mechanism

34

consumer_timeout_ms (int): Milliseconds to block during message iteration before raising StopIteration (ending the iterator)

35

"""

36

37

def subscribe(self, topics=None, pattern=None, listener=None):

38

"""

39

Subscribe to topics or topic pattern.

40

41

Args:

42

topics (list): List of topic names to subscribe to

43

pattern (str): Regex pattern for topic matching

44

listener (ConsumerRebalanceListener): Rebalance event listener

45

"""

46

47

def unsubscribe(self):

48

"""Unsubscribe from all topics."""

49

50

def assign(self, partitions):

51

"""

52

Manually assign partitions to consumer.

53

54

Args:

55

partitions (list): List of TopicPartition objects

56

"""

57

58

def assignment(self):

59

"""

60

Get current partition assignment.

61

62

Returns:

63

set: Set of TopicPartition objects currently assigned

64

"""

65

66

def subscription(self):

67

"""

68

Get current topic subscription.

69

70

Returns:

71

set: Set of subscribed topic names

72

"""

73

74

def poll(self, timeout_ms=0, max_records=None, update_offsets=True):

75

"""

76

Fetch records from Kafka.

77

78

Args:

79

timeout_ms (int): Maximum time to wait for records

80

max_records (int): Maximum number of records to return

81

update_offsets (bool): Whether to update fetch positions

82

83

Returns:

84

dict: Dictionary mapping TopicPartition to list of ConsumerRecord

85

"""

86

87

def commit(self, offsets=None):

88

"""

89

Commit offsets synchronously.

90

91

Args:

92

offsets (dict): Dictionary mapping TopicPartition to OffsetAndMetadata

93

"""

94

95

def commit_async(self, offsets=None, callback=None):

96

"""

97

Commit offsets asynchronously.

98

99

Args:

100

offsets (dict): Dictionary mapping TopicPartition to OffsetAndMetadata

101

callback (callable): Callback function for commit result

102

"""

103

104

def committed(self, partition, metadata=False):

105

"""

106

Get committed offset for partition.

107

108

Args:

109

partition (TopicPartition): Partition to check

110

metadata (bool): Whether to return metadata with offset

111

112

Returns:

113

int or OffsetAndMetadata: Committed offset

114

"""

115

116

def position(self, partition):

117

"""

118

Get current position (next fetch offset) for partition.

119

120

Args:

121

partition (TopicPartition): Partition to check

122

123

Returns:

124

int: Current fetch position

125

"""

126

127

def seek(self, partition, offset):

128

"""

129

Seek to specific offset for partition.

130

131

Args:

132

partition (TopicPartition): Partition to seek

133

offset (int): Offset to seek to

134

"""

135

136

def seek_to_beginning(self, *partitions):

137

"""

138

Seek to beginning of partitions.

139

140

Args:

141

*partitions: TopicPartition objects (empty = all assigned)

142

"""

143

144

def seek_to_end(self, *partitions):

145

"""

146

Seek to end of partitions.

147

148

Args:

149

*partitions: TopicPartition objects (empty = all assigned)

150

"""

151

152

def pause(self, *partitions):

153

"""

154

Pause consumption from partitions.

155

156

Args:

157

*partitions: TopicPartition objects to pause

158

"""

159

160

def resume(self, *partitions):

161

"""

162

Resume consumption from previously paused partitions.

163

164

Args:

165

*partitions: TopicPartition objects to resume

166

"""

167

168

def paused(self):

169

"""

170

Get currently paused partitions.

171

172

Returns:

173

set: Set of paused TopicPartition objects

174

"""

175

176

def topics(self):

177

"""

178

Get available topics from cluster.

179

180

Returns:

181

set: Set of available topic names

182

"""

183

184

def partitions_for_topic(self, topic):

185

"""

186

Get available partitions for topic.

187

188

Args:

189

topic (str): Topic name

190

191

Returns:

192

set: Set of partition numbers for topic

193

"""

194

195

def beginning_offsets(self, partitions):

196

"""

197

Get earliest available offsets for partitions.

198

199

Args:

200

partitions (list): List of TopicPartition objects

201

202

Returns:

203

dict: Dictionary mapping TopicPartition to offset

204

"""

205

206

def end_offsets(self, partitions):

207

"""

208

Get latest available offsets for partitions.

209

210

Args:

211

partitions (list): List of TopicPartition objects

212

213

Returns:

214

dict: Dictionary mapping TopicPartition to offset

215

"""

216

217

def close(self):

218

"""Close the consumer and release resources."""

219

220

def highwater(self, partition):

221

"""

222

Get high watermark offset for partition.

223

224

Args:

225

partition (TopicPartition): Partition to check

226

227

Returns:

228

int: High watermark offset

229

"""

230

231

def bootstrap_connected(self):

232

"""

233

Check if consumer has established bootstrap connection.

234

235

Returns:

236

bool: True if connected to at least one bootstrap server

237

"""

238

239

def offsets_for_times(self, timestamps):

240

"""

241

Get offsets for given timestamps.

242

243

Args:

244

timestamps (dict): Dictionary mapping TopicPartition to timestamp

245

246

Returns:

247

dict: Dictionary mapping TopicPartition to OffsetAndTimestamp

248

"""

249

250

def metrics(self, raw=False):

251

"""

252

Get consumer performance metrics.

253

254

Args:

255

raw (bool): If True, return raw metrics dict

256

257

Returns:

258

dict: Consumer performance metrics including fetch rates, lag, and timing

259

"""

260

```

261

262

### Consumer Records and Metadata

263

264

Data structures returned by the consumer for representing consumed records.

265

266

```python { .api }

267

class ConsumerRecord:

268

topic: str # Topic name

269

partition: int # Partition number

270

offset: int # Record offset

271

timestamp: int # Record timestamp

272

timestamp_type: int # Timestamp type

273

key: bytes # Record key (deserialized if deserializer provided)

274

value: bytes # Record value (deserialized if deserializer provided)

275

headers: list # List of (key, value) header tuples

276

checksum: int # Record checksum

277

serialized_key_size: int # Key size in bytes

278

serialized_value_size: int # Value size in bytes

279

280

class ConsumerRebalanceListener:

281

def on_partitions_revoked(self, revoked):

282

"""

283

Called when partitions are revoked from consumer.

284

285

Args:

286

revoked (list): List of TopicPartition objects being revoked

287

"""

288

289

def on_partitions_assigned(self, assigned):

290

"""

291

Called when new partitions are assigned to consumer.

292

293

Args:

294

assigned (list): List of TopicPartition objects being assigned

295

"""

296

```

297

298

## Usage Examples

299

300

### Basic Consumer Usage

301

302

```python

303

from kafka import KafkaConsumer

304

import json

305

306

# Create consumer with JSON deserialization

307

consumer = KafkaConsumer(

308

'my-topic',

309

bootstrap_servers=['localhost:9092'],

310

group_id='my-consumer-group',

311

auto_offset_reset='earliest',

312

value_deserializer=lambda m: json.loads(m.decode('utf-8'))

313

)

314

315

# Consume messages

316

for message in consumer:

317

print(f"Topic: {message.topic}")

318

print(f"Partition: {message.partition}")

319

print(f"Offset: {message.offset}")

320

print(f"Value: {message.value}")

321

```

322

323

### Manual Offset Management

324

325

```python

326

from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata

327

328

consumer = KafkaConsumer(

329

bootstrap_servers=['localhost:9092'],

330

group_id='manual-commit-group',

331

enable_auto_commit=False # Disable auto-commit

332

)

333

334

consumer.subscribe(['my-topic'])

335

336

try:

337

while True:

338

# Poll for messages

339

message_batch = consumer.poll(timeout_ms=1000)

340

341

for topic_partition, messages in message_batch.items():

342

for message in messages:

343

# Process message

344

print(f"Processing: {message.value}")

345

346

# Manually commit after processing

347

consumer.commit({

348

topic_partition: OffsetAndMetadata(message.offset + 1, "processed")

349

})

350

351

except KeyboardInterrupt:

352

consumer.close()

353

```

354

355

### Manual Partition Assignment

356

357

```python

358

from kafka import KafkaConsumer, TopicPartition

359

360

consumer = KafkaConsumer(

361

bootstrap_servers=['localhost:9092'],

362

group_id=None # No group coordination

363

)

364

365

# Manually assign specific partitions

366

partitions = [

367

TopicPartition('topic-1', 0),

368

TopicPartition('topic-1', 1),

369

TopicPartition('topic-2', 0)

370

]

371

consumer.assign(partitions)

372

373

# Seek to specific offsets

374

consumer.seek(TopicPartition('topic-1', 0), 100)

375

consumer.seek_to_beginning(TopicPartition('topic-1', 1))

376

377

for message in consumer:

378

print(f"Manual assignment: {message.topic}:{message.partition}:{message.offset}")

379

```

380

381

### Consumer with Rebalance Listener

382

383

```python

384

from kafka import KafkaConsumer

385

from kafka.consumer.subscription_state import ConsumerRebalanceListener

386

387

class RebalanceListener(ConsumerRebalanceListener):

388

def __init__(self, consumer):

389

self.consumer = consumer

390

391

def on_partitions_revoked(self, revoked):

392

print(f"Partitions revoked: {revoked}")

393

# Commit current offsets before rebalance

394

self.consumer.commit()

395

396

def on_partitions_assigned(self, assigned):

397

print(f"Partitions assigned: {assigned}")

398

# Reset to beginning for new partitions

399

for partition in assigned:

400

self.consumer.seek_to_beginning(partition)

401

402

consumer = KafkaConsumer(

403

bootstrap_servers=['localhost:9092'],

404

group_id='rebalance-group'

405

)

406

407

listener = RebalanceListener(consumer)

408

consumer.subscribe(['my-topic'], listener=listener)

409

410

for message in consumer:

411

print(f"Consumed: {message.value}")

412

```

413

414

### Batch Processing

415

416

```python

417

from kafka import KafkaConsumer

418

419

consumer = KafkaConsumer(

420

'batch-topic',

421

bootstrap_servers=['localhost:9092'],

422

group_id='batch-processor',

423

max_poll_records=100, # Get up to 100 records per poll

424

enable_auto_commit=False

425

)

426

427

while True:

428

# Get batch of messages

429

message_batch = consumer.poll(timeout_ms=5000, max_records=100)

430

431

if not message_batch:

432

continue

433

434

# Process batch

435

batch_count = 0

436

for topic_partition, messages in message_batch.items():

437

for message in messages:

438

# Process message

439

batch_count += 1

440

441

print(f"Processed batch of {batch_count} messages")

442

443

# Commit batch

444

consumer.commit()

445

```

446

447

### Secure Consumer (SSL + SASL)

448

449

```python

450

from kafka import KafkaConsumer

451

452

consumer = KafkaConsumer(

453

'secure-topic',

454

bootstrap_servers=['secure-broker:9093'],

455

group_id='secure-group',

456

security_protocol='SASL_SSL',

457

sasl_mechanism='SCRAM-SHA-256',

458

sasl_plain_username='myuser',

459

sasl_plain_password='mypassword',

460

ssl_check_hostname=True,

461

ssl_cafile='ca-cert.pem'

462

)

463

464

for message in consumer:

465

print(f"Secure message: {message.value}")

466

```