or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

admin-client.md, core-producer-consumer.md, error-handling.md, index.md, schema-registry.md, serialization.md

docs/core-producer-consumer.md

0

# Core Producer and Consumer

1

2

The core producer and consumer classes provide fundamental Kafka functionality with high performance through the underlying librdkafka C library. These classes support all Kafka features including transactions, exactly-once semantics, and custom partitioning.

3

4

## Capabilities

5

6

### Producer

7

8

High-performance Kafka producer with support for asynchronous message delivery, custom partitioning, transactions, and delivery guarantees.

9

10

```python { .api }

11

class Producer:

12

def __init__(self, conf):

13

"""

14

Create a new Producer instance.

15

16

Args:

17

conf (dict): Configuration properties for the producer

18

"""

19

20

def produce(self, topic, value=None, key=None, partition=-1, on_delivery=None, timestamp=0, headers=None):

21

"""

22

Produce message to topic.

23

24

Args:

25

topic (str): Topic to produce to

26

value (bytes, str, optional): Message value

27

key (bytes, str, optional): Message key for partitioning

28

partition (int, optional): Specific partition (-1 for automatic)

29

on_delivery (callable, optional): Delivery report callback

30

timestamp (int, optional): Message timestamp in milliseconds since the Unix epoch (0 for current time)

31

headers (dict or list, optional): Message headers, as a dict or a list of (key, value) tuples

32

33

Raises:

34

BufferError: If local producer queue is full

35

KafkaException: For other produce errors

36

"""

37

38

def poll(self, timeout=-1):

39

"""

40

Poll for events and call registered callbacks.

41

42

Args:

43

timeout (float): Maximum time to wait in seconds (-1 for infinite)

44

45

Returns:

46

int: Number of events processed

47

"""

48

49

def flush(self, timeout=-1):

50

"""

51

Wait for all messages to be delivered.

52

53

Args:

54

timeout (float): Maximum time to wait in seconds (-1 for infinite)

55

56

Returns:

57

int: Number of messages still in queue (0 on success)

58

"""

59

60

def purge(self, in_queue=True, in_flight=True, blocking=True):

61

"""

62

Purge messages from internal queues.

63

64

Args:

65

in_queue (bool): Purge messages in local queue

66

in_flight (bool): Purge messages in flight to broker

67

blocking (bool): Block until purge is complete

68

69

Returns:

70

int: Number of messages purged

71

"""

72

73

def abort_transaction(self, timeout=-1):

74

"""

75

Abort ongoing transaction.

76

77

Args:

78

timeout (float): Maximum time to wait in seconds

79

"""

80

81

def begin_transaction(self):

82

"""

83

Begin a new transaction.

84

"""

85

86

def commit_transaction(self, timeout=-1):

87

"""

88

Commit current transaction.

89

90

Args:

91

timeout (float): Maximum time to wait in seconds

92

"""

93

94

def init_transactions(self, timeout=-1):

95

"""

96

Initialize transactions for this producer.

97

98

Args:

99

timeout (float): Maximum time to wait in seconds

100

"""

101

102

def send_offsets_to_transaction(self, positions, group_metadata, timeout=-1):

103

"""

104

Send consumer offsets to transaction.

105

106

Args:

107

positions (list): List of TopicPartition objects with offsets

108

group_metadata (ConsumerGroupMetadata): Consumer group metadata

109

timeout (float): Maximum time to wait in seconds

110

"""

111

112

def list_topics(self, topic=None, timeout=-1):

113

"""

114

Get metadata for topics.

115

116

Args:

117

topic (str, optional): Specific topic name to query

118

timeout (float): Maximum time to wait in seconds

119

120

Returns:

121

ClusterMetadata: Cluster and topic metadata

122

"""

123

```

124

125

### Consumer

126

127

High-performance Kafka consumer with support for consumer groups, manual/automatic offset management, and rebalancing.

128

129

```python { .api }

130

class Consumer:

131

def __init__(self, conf):

132

"""

133

Create a new Consumer instance.

134

135

Args:

136

conf (dict): Configuration properties for the consumer

137

"""

138

139

def subscribe(self, topics, listener=None):

140

"""

141

Subscribe to list of topics for automatic partition assignment.

142

143

Args:

144

topics (list): List of topic names to subscribe to

145

listener (RebalanceCallback, optional): Rebalance callback

146

"""

147

148

def unsubscribe(self):

149

"""

150

Unsubscribe from current topic subscription.

151

"""

152

153

def assign(self, partitions):

154

"""

155

Manually assign partitions to consume from.

156

157

Args:

158

partitions (list): List of TopicPartition objects

159

"""

160

161

def assignment(self):

162

"""

163

Get current partition assignment.

164

165

Returns:

166

list: List of assigned TopicPartition objects

167

"""

168

169

def unassign(self):

170

"""

171

Remove current partition assignment.

172

"""

173

174

def poll(self, timeout=-1):

175

"""

176

Poll for messages.

177

178

Args:

179

timeout (float): Maximum time to wait in seconds (-1 for infinite)

180

181

Returns:

182

Message: The next message (check error() before use), or None if the timeout expired

183

"""

184

185

def consume(self, num_messages=1, timeout=-1):

186

"""

187

Consume multiple messages.

188

189

Args:

190

num_messages (int): Maximum number of messages to return

191

timeout (float): Maximum time to wait in seconds

192

193

Returns:

194

list: List of Message objects

195

"""

196

197

def commit(self, message=None, offsets=None, asynchronous=True):

198

"""

199

Commit message offset or specified offsets.

200

201

Args:

202

message (Message, optional): Commit offset for this message

203

offsets (list, optional): List of TopicPartition objects with offsets

204

asynchronous (bool): Commit asynchronously if True

205

206

Returns:

207

list: Committed offsets if synchronous, None if asynchronous

208

"""

209

210

def committed(self, partitions, timeout=-1):

211

"""

212

Get committed offsets for partitions.

213

214

Args:

215

partitions (list): List of TopicPartition objects

216

timeout (float): Maximum time to wait in seconds

217

218

Returns:

219

list: List of TopicPartition objects with committed offsets

220

"""

221

222

def position(self, partitions):

223

"""

224

Get current position (next fetch offset) for partitions.

225

226

Args:

227

partitions (list): List of TopicPartition objects

228

229

Returns:

230

list: List of TopicPartition objects with current positions

231

"""

232

233

def seek(self, partition):

234

"""

235

Seek to offset for partition.

236

237

Args:

238

partition (TopicPartition): Partition with offset to seek to

239

"""

240

241

def pause(self, partitions):

242

"""

243

Pause consumption for partitions.

244

245

Args:

246

partitions (list): List of TopicPartition objects to pause

247

"""

248

249

def resume(self, partitions):

250

"""

251

Resume consumption for partitions.

252

253

Args:

254

partitions (list): List of TopicPartition objects to resume

255

"""

256

257

def get_watermark_offsets(self, partition, timeout=-1, cached=False):

258

"""

259

Get low and high watermark offsets for partition.

260

261

Args:

262

partition (TopicPartition): Partition to query

263

timeout (float): Maximum time to wait in seconds

264

cached (bool): Use cached values if available

265

266

Returns:

267

tuple: (low_offset, high_offset)

268

"""

269

270

def offsets_for_times(self, partitions, timeout=-1):

271

"""

272

Get offsets for timestamps.

273

274

Args:

275

partitions (list): List of TopicPartition objects with timestamps

276

timeout (float): Maximum time to wait in seconds

277

278

Returns:

279

list: List of TopicPartition objects with offsets for timestamps

280

"""

281

282

def close(self):

283

"""

284

Close the consumer and leave consumer group.

285

"""

286

287

def store_offsets(self, message=None, offsets=None):

288

"""

289

Store offset for message or specified offsets.

290

291

Args:

292

message (Message, optional): Store offset for this message

293

offsets (list, optional): List of TopicPartition objects with offsets

294

"""

295

296

def incremental_assign(self, partitions):

297

"""

298

Incrementally add partitions to assignment.

299

300

Args:

301

partitions (list): List of TopicPartition objects to add

302

"""

303

304

def incremental_unassign(self, partitions):

305

"""

306

Incrementally remove partitions from assignment.

307

308

Args:

309

partitions (list): List of TopicPartition objects to remove

310

"""

311

312

def list_topics(self, topic=None, timeout=-1):

313

"""

314

Get metadata for topics.

315

316

Args:

317

topic (str, optional): Specific topic name to query

318

timeout (float): Maximum time to wait in seconds

319

320

Returns:

321

ClusterMetadata: Cluster and topic metadata

322

"""

323

324

def consumer_group_metadata(self):

325

"""

326

Get consumer group metadata for transactional operations.

327

328

Returns:

329

ConsumerGroupMetadata: Consumer group metadata object

330

"""

331

```

332

333

### Message

334

335

Container for Kafka message data and metadata.

336

337

```python { .api }

338

class Message:

339

def error(self):

340

"""

341

Get message error.

342

343

Returns:

344

KafkaError: Error object or None if no error

345

"""

346

347

def key(self):

348

"""

349

Get message key.

350

351

Returns:

352

bytes: Message key or None

353

"""

354

355

def value(self):

356

"""

357

Get message value.

358

359

Returns:

360

bytes: Message value or None

361

"""

362

363

def topic(self):

364

"""

365

Get message topic.

366

367

Returns:

368

str: Topic name

369

"""

370

371

def partition(self):

372

"""

373

Get message partition.

374

375

Returns:

376

int: Partition number

377

"""

378

379

def offset(self):

380

"""

381

Get message offset.

382

383

Returns:

384

int: Message offset

385

"""

386

387

def timestamp(self):

388

"""

389

Get message timestamp.

390

391

Returns:

392

tuple: (timestamp_type, timestamp) where timestamp_type is one of:

393

TIMESTAMP_NOT_AVAILABLE, TIMESTAMP_CREATE_TIME, TIMESTAMP_LOG_APPEND_TIME

394

"""

395

396

def headers(self):

397

"""

398

Get message headers.

399

400

Returns:

401

list: List of (key, value) header tuples (keys may repeat), or None if the message has no headers

402

"""

403

404

def latency(self):

405

"""

406

Get message latency (produce time to broker acknowledgement).

407

408

Returns:

409

float: Latency in seconds or None

410

"""

411

412

def leader_epoch(self):

413

"""

414

Get leader epoch for the message.

415

416

Returns:

417

int: Leader epoch or None

418

"""

419

420

def set_key(self, key):

421

"""

422

Set message key.

423

424

Args:

425

key (bytes, str): New message key

426

"""

427

428

def set_value(self, value):

429

"""

430

Set message value.

431

432

Args:

433

value (bytes, str): New message value

434

"""

435

436

def set_headers(self, headers):

437

"""

438

Set message headers.

439

440

Args:

441

headers (dict): Dictionary of header key-value pairs

442

"""

443

```

444

445

### TopicPartition

446

447

Represents a Kafka topic partition with optional offset.

448

449

```python { .api }

450

class TopicPartition:

451

def __init__(self, topic, partition=None, offset=None):

452

"""

453

Create TopicPartition object.

454

455

Args:

456

topic (str): Topic name

457

partition (int, optional): Partition number

458

offset (int, optional): Offset value

459

"""

460

461

@property

462

def topic(self):

463

"""

464

Topic name.

465

466

Returns:

467

str: Topic name

468

"""

469

470

@property

471

def partition(self):

472

"""

473

Partition number.

474

475

Returns:

476

int: Partition number

477

"""

478

479

@property

480

def offset(self):

481

"""

482

Offset value.

483

484

Returns:

485

int: Offset value

486

"""

487

488

@offset.setter

489

def offset(self, value):

490

"""

491

Set offset value.

492

493

Args:

494

value (int): New offset value

495

"""

496

497

@property

498

def metadata(self):

499

"""

500

Partition metadata.

501

502

Returns:

503

str: Metadata string

504

"""

505

506

@property

507

def leader_epoch(self):

508

"""

509

Leader epoch.

510

511

Returns:

512

int: Leader epoch or None

513

"""

514

515

def __hash__(self):

516

"""Hash function for use in sets and dicts."""

517

518

def __eq__(self, other):

519

"""Equality comparison."""

520

521

def __lt__(self, other):

522

"""Less than comparison for sorting."""

523

524

def __str__(self):

525

"""String representation."""

526

```

527

528

### Uuid

529

530

Represents a UUID (Universally Unique Identifier).

531

532

```python { .api }

533

class Uuid:

534

def __init__(self, uuid_str=None):

535

"""

536

Create Uuid object.

537

538

Args:

539

uuid_str (str, optional): UUID string representation

540

"""

541

542

def __str__(self):

543

"""

544

Get string representation of UUID.

545

546

Returns:

547

str: UUID string

548

"""

549

550

def __eq__(self, other):

551

"""Equality comparison."""

552

553

def __hash__(self):

554

"""Hash function for use in sets and dicts."""

555

```

556

557

### Usage Examples

558

559

#### Basic Producer Usage

560

561

```python

562

from confluent_kafka import Producer

563

564

conf = {

565

'bootstrap.servers': 'localhost:9092',

566

'client.id': 'my-producer'

567

}

568

569

producer = Producer(conf)

570

571

def delivery_report(err, msg):

572

"""Called once for each message produced."""

573

if err is not None:

574

print(f'Message delivery failed: {err}')

575

else:

576

print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')

577

578

# Produce messages

579

for i in range(10):

580

producer.produce('my-topic',

581

key=f'key-{i}',

582

value=f'value-{i}',

583

callback=delivery_report)

584

585

# Wait for all messages to be delivered

586

producer.flush()

587

```

588

589

#### Basic Consumer Usage

590

591

```python

592

from confluent_kafka import Consumer, KafkaError

593

594

conf = {

595

'bootstrap.servers': 'localhost:9092',

596

'group.id': 'my-group',

597

'auto.offset.reset': 'earliest',

598

'enable.auto.commit': True

599

}

600

601

consumer = Consumer(conf)

602

consumer.subscribe(['my-topic'])

603

604

try:

605

while True:

606

msg = consumer.poll(timeout=1.0)

607

if msg is None:

608

continue

609

610

if msg.error():

611

if msg.error().code() == KafkaError._PARTITION_EOF:

612

print(f'End of partition {msg.topic()} [{msg.partition()}]')

613

else:

614

print(f'Error: {msg.error()}')

615

else:

616

print(f'Received: key={msg.key()}, value={msg.value()}, '

617

f'partition={msg.partition()}, offset={msg.offset()}')

618

619

finally:

620

consumer.close()

621

```

622

623

#### Transaction Usage

624

625

```python

626

from confluent_kafka import Producer

627

628

conf = {

629

'bootstrap.servers': 'localhost:9092',

630

'transactional.id': 'my-transactional-id',

631

'enable.idempotence': True

632

}

633

634

producer = Producer(conf)

635

636

# Initialize transactions

637

producer.init_transactions()

638

639

try:

640

# Begin transaction

641

producer.begin_transaction()

642

643

# Produce messages within transaction

644

for i in range(5):

645

producer.produce('my-topic', f'transactional-message-{i}')

646

647

# Commit transaction

648

producer.commit_transaction()

649

print('Transaction committed successfully')

650

651

except Exception as e:

652

print(f'Transaction failed: {e}')

653

producer.abort_transaction()

654

```

655

656

#### Manual Partition Assignment

657

658

```python

659

from confluent_kafka import Consumer, TopicPartition

660

661

conf = {

662

'bootstrap.servers': 'localhost:9092',

663

'group.id': 'my-group',

664

'enable.auto.commit': False

665

}

666

667

consumer = Consumer(conf)

668

669

# Manually assign specific partitions

670

partitions = [

671

TopicPartition('my-topic', 0, offset=100),

672

TopicPartition('my-topic', 1, offset=200)

673

]

674

consumer.assign(partitions)

675

676

try:

677

while True:

678

msg = consumer.poll(1.0)

679

if msg is None:

680

continue

681

682

if not msg.error():

683

print(f'Message: {msg.value()}')

684

# Manual offset commit

685

consumer.commit(message=msg)

686

687

finally:

688

consumer.close()

689

```