
# Producer API

High-level producer for publishing records to Kafka topics, with support for batching, compression, partitioning strategies, idempotent production, and transactional semantics.

## Capabilities

### KafkaProducer

Main producer class providing a high-level interface for sending records to Kafka topics. Supports synchronous and asynchronous sending with configurable batching, compression, and delivery guarantees.

```python { .api }
class KafkaProducer:
    def __init__(self, **configs):
        """
        Initialize Kafka producer.

        Configuration Parameters:
        - bootstrap_servers: List[str], broker addresses
        - key_serializer: Callable, key serialization function
        - value_serializer: Callable, value serialization function
        - acks: int|str, acknowledgment policy (0, 1, or 'all', equivalent to -1)
        - retries: int, number of retries for failed sends (default: 2147483647)
        - batch_size: int, batch size in bytes (default: 16384)
        - linger_ms: int, time to wait for batching (default: 0)
        - buffer_memory: int, total memory for buffering (default: 33554432)
        - compression_type: str|None, None (no compression), 'gzip', 'snappy', 'lz4', 'zstd'
        - max_in_flight_requests_per_connection: int, max unacknowledged requests (default: 5)
        - request_timeout_ms: int, request timeout (default: 30000)
        - delivery_timeout_ms: int, delivery timeout (default: 120000)
        - max_request_size: int, max request size (default: 1048576)
        - send_buffer_bytes: int, TCP send buffer size (default: 131072)
        - receive_buffer_bytes: int, TCP receive buffer size (default: 32768)
        - security_protocol: str, 'PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'
        - api_version: tuple|None, broker API version; None auto-detects it
        - enable_idempotence: bool, enable idempotent producer (default: False)
        - transactional_id: str, transactional producer ID
        - partitioner: Callable, custom partitioner function
        - max_block_ms: int, max time to block on send (default: 60000)
        """

    def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None, headers=None):
        """
        Asynchronously send a record to a topic.

        Parameters:
        - topic: str, target topic name
        - value: any, message value (serialized with value_serializer)
        - key: any, message key (serialized with key_serializer)
        - partition: int, explicit partition (optional; bypasses the partitioner)
        - timestamp_ms: int, message timestamp in milliseconds (default: current time)
        - headers: List[Tuple[str, bytes]], message headers

        Returns:
        - FutureRecordMetadata: future representing the send result
        """

    def flush(self, timeout=None):
        """
        Force all buffered records to be sent immediately and wait for them to complete.

        Parameters:
        - timeout: float, max time to wait for completion (seconds)

        Raises:
        - KafkaTimeoutError: if timeout exceeded
        """

    def close(self, timeout=None):
        """
        Close producer and clean up resources.

        Parameters:
        - timeout: float, max time to wait for completion (seconds)
        """

    def partitions_for(self, topic):
        """
        Get available partitions for topic.

        Parameters:
        - topic: str, topic name

        Returns:
        - Set[int]: available partition numbers
        """

    def metrics(self):
        """
        Get producer metrics.

        Returns:
        - Dict[str, Dict[str, float]]: current metric values, keyed by metric group, then metric name
        """

    def bootstrap_connected(self):
        """
        Check if connected to at least one bootstrap server.

        Returns:
        - bool: True if connected
        """

    # Transactional methods (require Kafka brokers 0.11+)
    def init_transactions(self):
        """
        Initialize transactions for transactional producer.
        Must be called before any other transaction methods.

        Raises:
        - IllegalStateError: if transactional_id not configured
        - ProducerFencedError: if another producer with same ID is active
        """

    def begin_transaction(self):
        """
        Begin a new transaction.
        Must call init_transactions() first.

        Raises:
        - IllegalStateError: if init_transactions() has not been called
        - ProducerFencedError: if producer is fenced
        """

    def send_offsets_to_transaction(self, offsets, consumer_group_id):
        """
        Add consumer offsets to current transaction.

        Parameters:
        - offsets: Dict[TopicPartition, OffsetAndMetadata], offsets to include
        - consumer_group_id: str, consumer group ID

        Raises:
        - IllegalStateError: if no active transaction
        - ProducerFencedError: if producer is fenced
        """

    def commit_transaction(self):
        """
        Commit the current transaction.

        Raises:
        - IllegalStateError: if no active transaction
        - ProducerFencedError: if producer is fenced
        """

    def abort_transaction(self):
        """
        Abort the current transaction.

        Raises:
        - IllegalStateError: if no active transaction
        - ProducerFencedError: if producer is fenced
        """
```

### Future Record Metadata

Future object returned by `producer.send()`, representing an asynchronous send operation.

```python { .api }
class FutureRecordMetadata:
    def get(self, timeout=None):
        """
        Block until the send completes and return its metadata.

        Parameters:
        - timeout: float, max time to wait (seconds)

        Returns:
        - RecordMetadata: send result metadata

        Raises:
        - KafkaError: if send failed
        - KafkaTimeoutError: if timeout exceeded
        """

    def add_callback(self, callback):
        """
        Add success callback.

        Parameters:
        - callback: Callable[[RecordMetadata], None], success callback
        """

    def add_errback(self, errback):
        """
        Add error callback.

        Parameters:
        - errback: Callable[[Exception], None], error callback
        """

    is_done: bool  # Attribute (not a method): True once the send completed (success or failure)

    def succeeded(self):
        """
        Check if send succeeded.

        Returns:
        - bool: True if succeeded
        """

    def failed(self):
        """
        Check if send failed.

        Returns:
        - bool: True if failed
        """
```
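
The callback contract can be illustrated with a small stand-in class. This is a simplified sketch, not the library's future implementation: it assumes that callbacks receive the result on success, errbacks receive the exception on failure, a callback registered after the future has already completed runs immediately, and a failed future never runs success callbacks. `MiniFuture`, `success()` and `failure()` are hypothetical names used only here.

```python
from typing import Any, Callable, List, Optional


class MiniFuture:
    """Hypothetical stand-in illustrating the callback/errback contract."""

    def __init__(self) -> None:
        self.is_done = False
        self.value: Any = None
        self.exception: Optional[BaseException] = None
        self._callbacks: List[Callable[[Any], None]] = []
        self._errbacks: List[Callable[[BaseException], None]] = []

    def succeeded(self) -> bool:
        return self.is_done and self.exception is None

    def failed(self) -> bool:
        return self.is_done and self.exception is not None

    def add_callback(self, callback: Callable[[Any], None]) -> "MiniFuture":
        if self.succeeded():
            callback(self.value)  # Already succeeded: run immediately
        elif not self.is_done:
            self._callbacks.append(callback)  # Still pending: run on success
        return self

    def add_errback(self, errback: Callable[[BaseException], None]) -> "MiniFuture":
        if self.failed():
            errback(self.exception)  # Already failed: run immediately
        elif not self.is_done:
            self._errbacks.append(errback)  # Still pending: run on failure
        return self

    def success(self, value: Any) -> None:
        """Complete the future successfully (called once by the sending side)."""
        if self.is_done:
            raise RuntimeError("future already completed")
        self.value, self.is_done = value, True
        for callback in self._callbacks:
            callback(value)

    def failure(self, exc: BaseException) -> None:
        """Complete the future with an error (called once by the sending side)."""
        if self.is_done:
            raise RuntimeError("future already completed")
        self.exception, self.is_done = exc, True
        for errback in self._errbacks:
            errback(exc)
```

Because both registration methods return the future in this sketch, they can be chained: `f.add_callback(on_success).add_errback(on_error)`.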

### Record Metadata

Metadata returned after a successful send, containing partition and offset information.

```python { .api }
class RecordMetadata:
    topic: str                  # Topic name
    partition: int              # Partition number
    offset: int                 # Record offset
    timestamp: int              # Record timestamp
    checksum: int               # Record checksum
    serialized_key_size: int    # Serialized key size
    serialized_value_size: int  # Serialized value size
    leader_epoch: int           # Leader epoch
```

### Partitioner Interface

Interface for custom partitioning strategies. The default partitioner hashes the serialized key with murmur2 for keyed messages and picks a random available partition for keyless messages.

```python { .api }
class DefaultPartitioner:
    def __call__(self, key, all_partitions, available_partitions):
        """
        Select partition for message.

        Parameters:
        - key: bytes, serialized message key (may be None)
        - all_partitions: List[int], all partition numbers
        - available_partitions: List[int], available partition numbers

        Returns:
        - int: selected partition number
        """

def murmur2(data):
    """
    Pure Python murmur2 hash implementation.

    Parameters:
    - data: bytes, data to hash

    Returns:
    - int: hash value
    """
```
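
For keyed records, the partition choice can be reproduced offline. The sketch below assumes the murmur2 variant used by Kafka's Java client (seed `0x9747b28c`, multiplier `0x5bd1e995`, signed 32-bit result) and the mapping `(murmur2(key) & 0x7fffffff) % num_partitions`. `partition_for_key` is a hypothetical helper, not part of the library, and it returns a position in the sorted partition list (equal to the partition number when partitions are numbered 0..n-1). Cross-check it against the library's `murmur2` before relying on it for cross-client compatibility.

```python
MASK32 = 0xFFFFFFFF


def murmur2(data: bytes) -> int:
    """32-bit MurmurHash2 as used by Kafka's Java client (signed result)."""
    length = len(data)
    m = 0x5BD1E995
    r = 24
    h = (0x9747B28C ^ length) & MASK32

    # Mix the input four bytes at a time (little-endian words)
    for i in range(length // 4):
        k = int.from_bytes(data[i * 4:i * 4 + 4], "little")
        k = (k * m) & MASK32
        k ^= k >> r
        k = (k * m) & MASK32
        h = (h * m) & MASK32
        h ^= k

    # Fold in the 1-3 trailing bytes
    tail = length % 4
    base = length & ~3
    if tail >= 3:
        h ^= data[base + 2] << 16
    if tail >= 2:
        h ^= data[base + 1] << 8
    if tail >= 1:
        h ^= data[base]
        h = (h * m) & MASK32

    # Final avalanche
    h ^= h >> 13
    h = (h * m) & MASK32
    h ^= h >> 15

    # Reinterpret as a signed 32-bit integer, like Java's int
    return h - (1 << 32) if h & 0x80000000 else h


def partition_for_key(key: bytes, num_partitions: int) -> int:
    """Hypothetical helper: map a serialized key to a partition index."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    # Clear the sign bit so the modulo result is never negative
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions
```

Because the result depends on the partition count, adding partitions to a topic changes where existing keys land.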

### Serializer Interface

Abstract base classes for custom key and value serialization.

```python { .api }
class Serializer:
    def serialize(self, topic, value):
        """
        Serialize value to bytes.

        Parameters:
        - topic: str, topic name
        - value: any, value to serialize

        Returns:
        - bytes: serialized value
        """

    def close(self):
        """Clean up resources."""
```
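
Serializers can be supplied either as plain callables (such as `str.encode` or a lambda) or as `Serializer` instances. The sketch below assumes the rule that a `Serializer` instance is called as `serialize(topic, value)`, any other callable is called with the value alone, and no serializer leaves the value untouched. It uses a local stand-in base class so it runs without the client installed; `apply_serializer` is a hypothetical helper.

```python
import json
from abc import ABC, abstractmethod
from typing import Any, Callable, Optional, Union


class Serializer(ABC):
    """Local stand-in mirroring the interface documented above."""

    @abstractmethod
    def serialize(self, topic: str, value: Any) -> Optional[bytes]:
        ...

    def close(self) -> None:
        """Release any resources (no-op by default)."""


class JSONSerializer(Serializer):
    def serialize(self, topic: str, value: Any) -> Optional[bytes]:
        # Pass None through unchanged (e.g. for tombstone records)
        return None if value is None else json.dumps(value).encode("utf-8")


SerializerLike = Union[Serializer, Callable[[Any], bytes], None]


def apply_serializer(f: SerializerLike, topic: str, value: Any) -> Any:
    """Hypothetical helper: apply a serializer of either kind to one value."""
    if f is None:
        # No serializer configured: the value must already be bytes
        return value
    if isinstance(f, Serializer):
        # Serializer instances also receive the topic name
        return f.serialize(topic, value)
    # Plain callables receive only the value
    return f(value)
```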

## Usage Examples

### Basic Producer

```python
from kafka import KafkaProducer
import json

# Create producer with JSON serialization
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

# Send message asynchronously
future = producer.send('my-topic', value={'message': 'Hello World'}, key='key1')

# Block for acknowledgment
try:
    record_metadata = future.get(timeout=10)
    print(f"Message sent to topic {record_metadata.topic} "
          f"partition {record_metadata.partition} "
          f"offset {record_metadata.offset}")
except Exception as e:
    print(f"Send failed: {e}")

producer.close()
```

### Fire and Forget

```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    acks=0  # Fire and forget: no broker acknowledgment
)

# Send many messages without waiting for acknowledgment
for i in range(1000):
    producer.send('events', value=f'Event {i}')

# Force send all buffered messages
producer.flush()
producer.close()
```

### Synchronous Sending

```python
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    # Keys are str in this example; leave a missing key as None
    key_serializer=lambda k: k.encode('utf-8') if k is not None else None,
    acks='all',  # Wait for all in-sync replicas
    retries=3
)

def send_sync(topic, value, key=None):
    """Send message synchronously with error handling."""
    try:
        future = producer.send(topic, value=value, key=key)
        record_metadata = future.get(timeout=30)
        return record_metadata
    except KafkaError as e:
        print(f"Failed to send message: {e}")
        return None

# Send messages synchronously
metadata = send_sync('orders', 'Order #12345', key='customer-123')
if metadata:
    print(f"Order sent successfully to offset {metadata.offset}")

producer.close()
```

### Asynchronous with Callbacks

```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode
)

def on_success(record_metadata):
    print(f"Message sent successfully: topic={record_metadata.topic} "
          f"partition={record_metadata.partition} offset={record_metadata.offset}")

def on_error(exception):
    print(f"Message send failed: {exception}")

# Send with callbacks
for i in range(10):
    future = producer.send('async-topic', value=f'Message {i}')
    future.add_callback(on_success)
    future.add_errback(on_error)

# Wait for all outstanding sends to complete (and their callbacks to run)
# instead of sleeping for an arbitrary amount of time
producer.flush()
producer.close()
```

### Batching and Compression

```python
from kafka import KafkaProducer

# Configure for high throughput with batching and compression
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,

    # Batching configuration
    batch_size=32768,  # 32KB batches
    linger_ms=100,     # Wait up to 100ms to fill a batch

    # Compression (requires the lz4 Python package)
    compression_type='lz4',

    # Memory and throughput
    buffer_memory=67108864,  # 64MB buffer
    max_in_flight_requests_per_connection=10
)

# Send many messages - they will be batched and compressed
for i in range(10000):
    producer.send('high-throughput', value=f'Batch message {i}')

producer.flush()
producer.close()
```

### Custom Partitioner

```python
from kafka import KafkaProducer
import hashlib
import itertools

# Shared counter used to rotate through partitions for keyless messages
_round_robin = itertools.count()

def custom_partitioner(key, all_partitions, available_partitions):
    """Custom partitioner using a SHA-256 hash of the serialized key."""
    if key is None:
        # Round-robin over partitions that currently have a leader,
        # falling back to all partitions if none are available
        candidates = available_partitions or all_partitions
        return candidates[next(_round_robin) % len(candidates)]
    # Hash-based partitioning for keyed messages (key is already bytes here)
    hash_value = int(hashlib.sha256(key).hexdigest(), 16)
    return all_partitions[hash_value % len(all_partitions)]

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    key_serializer=str.encode,
    partitioner=custom_partitioner
)

# Messages with the same key go to the same partition
producer.send('partitioned-topic', key='user-123', value='Event A')
producer.send('partitioned-topic', key='user-123', value='Event B')
producer.send('partitioned-topic', key='user-456', value='Event C')

producer.close()
```

### Idempotent Producer

```python
from kafka import KafkaProducer

# Enable idempotence so that retries cannot write duplicate records
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    enable_idempotence=True,
    acks='all',                              # Required for idempotence
    retries=10,
    max_in_flight_requests_per_connection=1  # Most conservative setting; also preserves ordering
)

# The producer retries transient failures; the broker discards duplicate batches
for i in range(100):
    producer.send('exactly-once-topic', value=f'Idempotent message {i}')

producer.close()
```
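
The deduplication behind idempotent production can be pictured with a toy model. This is a deliberately simplified sketch, not the broker's actual logic: it assumes the broker remembers, per producer ID and partition, the last sequence number it appended; appends a batch whose sequence is exactly one higher; drops a retried batch it has already seen; and rejects a gap. `PartitionLog` and `OutOfOrderSequence` are hypothetical names.

```python
from typing import Dict, List


class OutOfOrderSequence(Exception):
    """Raised when a batch skips ahead of the expected sequence number."""


class PartitionLog:
    """Toy model of one partition's log with per-producer sequence tracking."""

    def __init__(self) -> None:
        self.records: List[str] = []
        self._last_seq: Dict[int, int] = {}  # producer_id -> last appended sequence

    def append(self, producer_id: int, seq: int, value: str) -> bool:
        """Return True if appended, False if dropped as a duplicate."""
        last = self._last_seq.get(producer_id, -1)
        if seq <= last:
            # A retry of a batch that was already written: acknowledge, don't re-append
            return False
        if seq != last + 1:
            raise OutOfOrderSequence(f"expected {last + 1}, got {seq}")
        self.records.append(value)
        self._last_seq[producer_id] = seq
        return True
```

In the model, a lost acknowledgment followed by a retry of the same sequence number leaves the log unchanged, which is the property idempotence provides per partition and producer session.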

### Transactional Producer

```python
from kafka import KafkaProducer, TopicPartition, OffsetAndMetadata

# Configure transactional producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    transactional_id='my-transactional-id',  # Required for transactions
    enable_idempotence=True,                 # Required for transactions
    acks='all'                               # Required for transactions
)

# Initialize transactions (call once, before the first transaction)
producer.init_transactions()

try:
    # Begin transaction
    producer.begin_transaction()

    # Send messages as part of the transaction
    producer.send('orders', value='Order #1001')
    producer.send('inventory', value='Item #5001 reserved')

    # Include consumer offsets in the transaction (consume-transform-produce pattern).
    # The committed offset is the next offset to consume: last processed offset + 1.
    consumer_offsets = {
        TopicPartition('input-topic', 0): OffsetAndMetadata(100, None)
    }
    producer.send_offsets_to_transaction(consumer_offsets, 'my-consumer-group')

    # Commit transaction - all messages become visible atomically
    producer.commit_transaction()
    print("Transaction committed successfully")

except Exception as e:
    print(f"Transaction failed: {e}")
    # Abort transaction - none of its messages become visible to read_committed consumers
    producer.abort_transaction()

finally:
    producer.close()
```
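
In a consume-transform-produce loop, the offsets passed to `send_offsets_to_transaction()` are the positions the group should resume from: the last processed offset plus one, per partition. The helper below sketches that bookkeeping with plain `(topic, partition)` tuples so it runs standalone. `next_offsets_to_commit` is hypothetical; in real code you would convert the keys to `TopicPartition` and the values to `OffsetAndMetadata` as in the example above.

```python
from typing import Dict, Iterable, Tuple

# (topic, partition, offset) triples, e.g. taken from processed consumer records
Processed = Iterable[Tuple[str, int, int]]


def next_offsets_to_commit(processed: Processed) -> Dict[Tuple[str, int], int]:
    """Return, per (topic, partition), the offset the group should resume from."""
    highest: Dict[Tuple[str, int], int] = {}
    for topic, partition, offset in processed:
        key = (topic, partition)
        # Track the highest processed offset per partition
        if offset > highest.get(key, -1):
            highest[key] = offset
    # Commit position = last processed offset + 1
    return {key: off + 1 for key, off in highest.items()}
```

For example, if offset 99 was the last record processed from partition 0 of `input-topic`, the helper yields 100 for that partition, matching the value committed in the transactional example.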

### Custom Serializer

```python
from kafka import KafkaProducer
from kafka.serializer import Serializer
import pickle
import json

class PickleSerializer(Serializer):
    """Custom serializer using pickle."""

    def serialize(self, topic, value):
        if value is None:
            return None
        return pickle.dumps(value)

class JSONSerializer(Serializer):
    """Custom JSON serializer with error handling."""

    def serialize(self, topic, value):
        if value is None:
            return None
        try:
            return json.dumps(value).encode('utf-8')
        except (TypeError, ValueError) as e:
            raise ValueError(f"Cannot serialize {value!r} to JSON: {e}") from e

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=str.encode,
    value_serializer=JSONSerializer()
)

# Send complex objects
producer.send('json-topic',
              key='order-123',
              value={'order_id': 123, 'items': ['item1', 'item2'], 'total': 99.99})

producer.close()
```

### Error Handling and Retries

```python
import time

from kafka import KafkaProducer
from kafka.errors import KafkaError, KafkaTimeoutError, MessageSizeTooLargeError

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,
    retries=5,
    retry_backoff_ms=1000,
    request_timeout_ms=30000
)

def robust_send(topic, value, key=None, max_retries=3):
    """Send with custom retry logic and error handling."""
    # Specific KafkaError subclasses are caught before the KafkaError base class
    for attempt in range(max_retries + 1):
        try:
            future = producer.send(topic, value=value, key=key)
            record_metadata = future.get(timeout=30)
            return record_metadata

        except MessageSizeTooLargeError:
            # Retrying cannot help: the record will never fit
            print(f"Message too large for topic {topic}")
            return None

        except KafkaTimeoutError:
            print(f"Timeout on attempt {attempt + 1}")
            if attempt == max_retries:
                print("Max retries reached, giving up")
                return None
            time.sleep(2 ** attempt)  # Exponential backoff

        except KafkaError as e:
            print(f"Kafka error on attempt {attempt + 1}: {e}")
            if not e.retriable or attempt == max_retries:
                return None
            time.sleep(1)

    return None

# Use robust sending
result = robust_send('critical-topic', 'Important message')
if result:
    print(f"Message sent successfully: offset {result.offset}")
else:
    print("Failed to send message after all retries")

producer.close()
```

## Performance Considerations

### Throughput Optimization

```python
from kafka import KafkaProducer

# High-throughput producer configuration
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,

    # Increase batch size for better throughput
    batch_size=65536,  # 64KB batches
    linger_ms=50,      # Small delay to fill batches

    # Use fast compression
    compression_type='lz4',

    # Increase memory and concurrent requests
    buffer_memory=134217728,  # 128MB buffer
    max_in_flight_requests_per_connection=10,

    # Reduce acknowledgment requirements
    acks=1,  # Only wait for the partition leader

    # Increase network buffers
    send_buffer_bytes=262144,   # 256KB send buffer
    receive_buffer_bytes=65536  # 64KB receive buffer
)
```

### Latency Optimization

```python
from kafka import KafkaProducer

# Low-latency producer configuration
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=str.encode,

    # Minimize batching delays
    batch_size=1,  # One record per batch (effectively disables batching)
    linger_ms=0,   # Send as soon as possible, without waiting to fill a batch

    # No compression, to avoid CPU overhead
    compression_type=None,

    # Faster acknowledgments
    acks=1,

    # Reduce timeout values
    request_timeout_ms=10000,  # 10 second request timeout
    delivery_timeout_ms=30000  # 30 second delivery timeout
)
```