or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

admin.mdconnection.mdconsumer.mderrors.mdindex.mdproducer.mdserialization.mdstructs.md

producer.mddocs/

0

# Producer Operations

1

2

High-level producer for publishing records to Kafka topics with automatic batching, compression, partitioning, and retry logic. The KafkaProducer is thread-safe and designed for high-throughput scenarios.

3

4

## Capabilities

5

6

### KafkaProducer

7

8

Main producer class for sending records to Kafka topics. Provides asynchronous sending with futures, automatic batching, and configurable retry logic.

9

10

```python { .api }

11

class KafkaProducer:

12

def __init__(self, **configs):

13

"""

14

Create a KafkaProducer instance.

15

16

Args:

17

**configs: Producer configuration options including:

18

bootstrap_servers (list): List of Kafka brokers

19

key_serializer (callable): Function to serialize keys

20

value_serializer (callable): Function to serialize values

21

acks (int|str): Acknowledgment requirements (0, 1, 'all')

22

retries (int): Number of retry attempts

23

batch_size (int): Batch size in bytes

24

linger_ms (int): Time to wait for batching

25

buffer_memory (int): Total memory for buffering

26

compression_type (str): Compression algorithm ('gzip', 'snappy', 'lz4', 'zstd')

27

max_in_flight_requests_per_connection (int): Max unacknowledged requests

28

request_timeout_ms (int): Request timeout

29

retry_backoff_ms (int): Retry backoff time

30

client_id (str): Client identifier

31

security_protocol (str): Security protocol ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL')

32

ssl_context: SSL context for encrypted connections

33

sasl_mechanism (str): SASL mechanism ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512', 'GSSAPI', 'OAUTHBEARER')

34

sasl_plain_username (str): Username for PLAIN SASL

35

sasl_plain_password (str): Password for PLAIN SASL

36

"""

37

38

def send(self, topic: str, value=None, key=None, headers=None, partition=None, timestamp_ms=None):

39

"""

40

Send a record to the specified topic.

41

42

Args:

43

topic (str): The topic to send the record to

44

value: The record value (will be serialized using value_serializer)

45

key: The record key (will be serialized using key_serializer)

46

headers (list): List of (key, value) header tuples

47

partition (int): Specific partition to send to (optional)

48

timestamp_ms (int): Record timestamp in milliseconds (optional)

49

50

Returns:

51

FutureRecordMetadata: Future that resolves to RecordMetadata when send completes

52

"""

53

54

def flush(self, timeout=None):

55

"""

56

Flush all pending records, blocking until complete.

57

58

Args:

59

timeout (float): Maximum time to wait in seconds

60

"""

61

62

def close(self, timeout=None):

63

"""

64

Close the producer and release resources.

65

66

Args:

67

timeout (float): Maximum time to wait for pending sends

68

"""

69

70

def partitions_for(self, topic: str):

71

"""

72

Get available partitions for a topic.

73

74

Args:

75

topic (str): Topic name

76

77

Returns:

78

set: Set of available partition numbers

79

"""

80

81

def bootstrap_connected(self):

82

"""

83

Check if producer has established bootstrap connection.

84

85

Returns:

86

bool: True if connected to at least one bootstrap server

87

"""

88

89

def metrics(self, raw=False):

90

"""

91

Get producer performance metrics.

92

93

Args:

94

raw (bool): If True, return raw metrics dict. If False, return formatted metrics.

95

96

Returns:

97

dict: Producer performance metrics including send rates, batch sizes,

98

buffer usage, and request latencies

99

"""

100

```

101

102

### Future Objects

103

104

The send() method returns future objects that can be used to handle asynchronous results.

105

106

```python { .api }

107

class FutureRecordMetadata:

108

def get(self, timeout=None):

109

"""

110

Get the RecordMetadata result, blocking if necessary.

111

112

Args:

113

timeout (float): Maximum time to wait in seconds

114

115

Returns:

116

RecordMetadata: Metadata about the sent record

117

118

Raises:

119

KafkaError: If send failed

120

"""

121

122

def add_callback(self, callback):

123

"""

124

Add callback function to be called when send completes.

125

126

Args:

127

callback (callable): Function called with (metadata, exception)

128

"""

129

130

def add_errback(self, errback):

131

"""

132

Add error callback for send failures.

133

134

Args:

135

errback (callable): Function called with exception on failure

136

"""

137

138

def is_done(self):

139

"""

140

Check if the send operation is complete.

141

142

Returns:

143

bool: True if send is complete (success or failure)

144

"""

145

146

class RecordMetadata:

147

topic: str # Topic name

148

partition: int # Partition number

149

offset: int # Record offset in partition

150

timestamp: int # Record timestamp

151

checksum: int # Record checksum

152

serialized_key_size: int # Serialized key size in bytes

153

serialized_value_size: int # Serialized value size in bytes

154

```

155

156

## Usage Examples

157

158

### Basic Producer Usage

159

160

```python

161

from kafka import KafkaProducer

162

import json

163

164

# Create producer with JSON serialization

165

producer = KafkaProducer(

166

bootstrap_servers=['localhost:9092'],

167

value_serializer=lambda v: json.dumps(v).encode('utf-8'),

168

key_serializer=lambda k: str(k).encode('utf-8')

169

)

170

171

# Send a message

172

future = producer.send('my-topic', key='user123', value={'action': 'login'})

173

174

# Block until send completes and get metadata

175

metadata = future.get(timeout=10)

176

print(f"Sent to partition {metadata.partition} at offset {metadata.offset}")

177

178

producer.close()

179

```

180

181

### Producer with Callbacks

182

183

```python

184

from kafka import KafkaProducer

185

186

def on_success(metadata):

187

print(f"Message sent to {metadata.topic}:{metadata.partition}:{metadata.offset}")

188

189

def on_error(exception):

190

print(f"Send failed: {exception}")

191

192

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

193

194

# Send with callbacks

195

future = producer.send('my-topic', b'message-value')

196

future.add_callback(on_success)

197

future.add_errback(on_error)

198

199

producer.flush()

200

producer.close()

201

```

202

203

### High-Throughput Configuration

204

205

```python

206

from kafka import KafkaProducer

207

208

# Configure for high throughput

209

producer = KafkaProducer(

210

bootstrap_servers=['localhost:9092'],

211

batch_size=16384, # 16KB batches

212

linger_ms=10, # Wait 10ms for batching

213

compression_type='lz4', # Compress batches

214

buffer_memory=33554432, # 32MB buffer

215

max_in_flight_requests_per_connection=5

216

)

217

218

# Send many messages quickly

219

for i in range(1000):

220

producer.send('high-volume-topic', f'message-{i}'.encode())

221

222

producer.flush()

223

producer.close()

224

```

225

226

### Secure Producer (SSL + SASL)

227

228

```python

229

from kafka import KafkaProducer

230

231

producer = KafkaProducer(

232

bootstrap_servers=['secure-broker:9093'],

233

security_protocol='SASL_SSL',

234

sasl_mechanism='SCRAM-SHA-256',

235

sasl_plain_username='myuser',

236

sasl_plain_password='mypassword',

237

ssl_check_hostname=True,

238

ssl_cafile='ca-cert.pem'

239

)

240

241

producer.send('secure-topic', b'encrypted message')

242

producer.close()

243

```

244

245

### Custom Partitioning

246

247

```python

248

from kafka import KafkaProducer

249

from kafka.partitioner import DefaultPartitioner

250

251

class CustomPartitioner:

252

def __init__(self):

253

self.default = DefaultPartitioner()

254

255

def partition(self, topic, key, all_partitions, available_partitions):

256

# Custom logic here

257

if key and key.startswith(b'priority-'):

258

return 0 # Send priority messages to partition 0

259

return self.default.partition(topic, key, all_partitions, available_partitions)

260

261

producer = KafkaProducer(

262

bootstrap_servers=['localhost:9092'],

263

partitioner=CustomPartitioner()

264

)

265

```