# Message Production

Message publishing capabilities with support for synchronous and asynchronous operations, custom delivery callbacks, and templated producer functions.

## Capabilities

### Produce to Topic Operator

Airflow operator for producing messages to Kafka topics with flexible producer function support and delivery confirmation options.

```python { .api }
class ProduceToTopicOperator(BaseOperator):
    """
    Operator for producing messages to a Kafka topic.

    Attributes:
        template_fields: tuple = ("topic", "producer_function_args", "producer_function_kwargs", "kafka_config_id")
    """

    def __init__(
        self,
        topic: str,
        producer_function: str | Callable[..., Any],
        kafka_config_id: str = "kafka_default",
        producer_function_args: Sequence[Any] | None = None,
        producer_function_kwargs: dict[Any, Any] | None = None,
        delivery_callback: str | None = None,
        synchronous: bool = True,
        poll_timeout: float = 0,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the producer operator.

        Args:
            topic: Kafka topic name to produce to
            producer_function: Function that takes a producer and produces messages
            kafka_config_id: Airflow connection ID for Kafka configuration
            producer_function_args: Arguments to pass to the producer function
            producer_function_kwargs: Keyword arguments to pass to the producer function
            delivery_callback: Callback function name for delivery confirmations
            synchronous: Whether to wait for delivery confirmation
            poll_timeout: Timeout for polling delivery confirmations
            **kwargs: Additional operator arguments
        """

    def execute(self, context) -> None:
        """
        Execute the producer operation.

        Args:
            context: Airflow task context
        """
```
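
Because `producer_function_kwargs` is a templated field, keyword arguments can be supplied (and Jinja-rendered) per task. A minimal sketch, using a hypothetical `greeting_producer` function; the positional-argument variant appears under "Parameterized Production" below:

```python
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator


def greeting_producer(producer, greeting="hello", count=1):
    """Produce `count` copies of a greeting message."""
    for _ in range(count):
        producer.produce("greetings", value=greeting)
    producer.flush()


# Keyword arguments are forwarded to the producer function.
produce_kwargs_task = ProduceToTopicOperator(
    task_id="produce_with_kwargs",
    topic="greetings",
    producer_function=greeting_producer,
    producer_function_kwargs={"greeting": "hi from Airflow", "count": 5},
    kafka_config_id="kafka_default",
)
```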

### Kafka Producer Hook

Lower-level hook providing direct access to Kafka Producer client for advanced use cases.

```python { .api }
class KafkaProducerHook(KafkaBaseHook):
    """
    A hook for producing messages to Kafka topics.

    Inherits from KafkaBaseHook and provides producer client access.
    """

    def __init__(self, kafka_config_id: str = "kafka_default") -> None:
        """
        Initialize the Kafka producer hook.

        Args:
            kafka_config_id: The connection object to use
        """

    def get_producer(self) -> Producer:
        """
        Get Kafka Producer client.

        Returns:
            Producer: Configured confluent-kafka Producer instance
        """

    def _get_client(self, config) -> Producer:
        """
        Get a Kafka Producer client with the given configuration.

        Args:
            config: Kafka client configuration dictionary

        Returns:
            Producer: Configured confluent-kafka Producer instance
        """
```
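
For quick orientation before the fuller example under "Using Producer Hook Directly" below, a minimal sketch of acquiring and using the client (topic and payload are illustrative):

```python
from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook

# Obtain a configured confluent-kafka Producer from the hook and send one message.
producer = KafkaProducerHook(kafka_config_id="kafka_default").get_producer()
producer.produce("my-topic", value="hello")
producer.flush()
```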

### Delivery Callback Functions

Default and custom callback functions for handling message delivery confirmations.

```python { .api }
def acked(err, msg):
    """
    Default delivery callback for message acknowledgments.

    This callback is automatically called by the producer when a message
    delivery attempt completes (either successfully or with failure).

    Args:
        err: Error object if delivery failed, None if successful
        msg: Message object with delivery details including topic, partition, offset
    """
```
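
Within a producer function, a callback such as `acked` can be attached to each message; a minimal sketch:

```python
# Sketch: wiring the default acknowledgment callback onto each produced message.
def producer_with_acked(producer):
    producer.produce("my-topic", value="payload", callback=acked)
    producer.flush()
```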

### Usage Examples

#### Basic Message Production

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator


def simple_producer(producer):
    """Produce simple text messages."""
    messages = ["Hello", "World", "from", "Airflow"]

    for message in messages:
        producer.produce("my-topic", value=message)

    # Flush to ensure delivery
    producer.flush()


dag = DAG(
    "kafka_producer_example",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
)

produce_task = ProduceToTopicOperator(
    task_id="produce_messages",
    topic="my-topic",
    producer_function=simple_producer,
    kafka_config_id="kafka_default",
    dag=dag,
)
```

#### JSON Message Production

```python
import json

from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator


def json_producer(producer):
    """Produce JSON messages with keys."""
    data_records = [
        {"user_id": 1, "action": "login", "timestamp": "2023-01-01T10:00:00Z"},
        {"user_id": 2, "action": "purchase", "timestamp": "2023-01-01T10:01:00Z"},
        {"user_id": 1, "action": "logout", "timestamp": "2023-01-01T10:02:00Z"},
    ]

    for record in data_records:
        key = str(record["user_id"])
        value = json.dumps(record)

        producer.produce(
            "user-events",
            key=key,
            value=value,
            headers={"content-type": "application/json"},
        )

    producer.flush()


produce_json_task = ProduceToTopicOperator(
    task_id="produce_json_data",
    topic="user-events",
    producer_function=json_producer,
    kafka_config_id="kafka_default",
)
```

#### Parameterized Production

```python
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator


def parameterized_producer(producer, batch_size, message_prefix):
    """Produce messages with parameters."""
    for i in range(batch_size):
        message = f"{message_prefix}-{i:04d}"
        producer.produce("batch-topic", value=message)

    producer.flush()
    print(f"Produced {batch_size} messages with prefix '{message_prefix}'")


produce_batch_task = ProduceToTopicOperator(
    task_id="produce_batch",
    topic="batch-topic",
    producer_function=parameterized_producer,
    producer_function_args=[100, "BATCH"],  # batch_size=100, message_prefix="BATCH"
    kafka_config_id="kafka_default",
)
```

#### Custom Delivery Callback

```python
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator


def custom_delivery_callback(err, msg):
    """Custom callback for delivery confirmation."""
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()}[{msg.partition()}] at offset {msg.offset()}")


def producer_with_callback(producer):
    """Producer using custom delivery callback."""
    for i in range(5):
        producer.produce(
            "callback-topic",
            value=f"Message {i}",
            callback=custom_delivery_callback,
        )

    # Poll for delivery reports
    producer.poll(1.0)
    producer.flush()


produce_with_callback = ProduceToTopicOperator(
    task_id="produce_with_callback",
    topic="callback-topic",
    producer_function=producer_with_callback,
    delivery_callback="custom_delivery_callback",
    kafka_config_id="kafka_default",
)
```

#### Asynchronous Production

```python
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator


def async_producer(producer):
    """Asynchronous producer without waiting for confirmations."""
    for i in range(1000):
        producer.produce(
            "high-volume-topic",
            value=f"Message {i}",
            key=str(i % 10),  # Distribute across partitions
        )

        # Occasional polling to clear delivery reports
        if i % 100 == 0:
            producer.poll(0)

    producer.flush()


async_produce_task = ProduceToTopicOperator(
    task_id="async_produce",
    topic="high-volume-topic",
    producer_function=async_producer,
    synchronous=False,  # Don't wait for individual confirmations
    kafka_config_id="kafka_default",
)
```

### Using Producer Hook Directly

```python
from airflow.operators.python import PythonOperator
from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook


def custom_producer_logic():
    """Use producer hook for advanced scenarios."""
    hook = KafkaProducerHook(kafka_config_id="kafka_default")
    producer = hook.get_producer()

    try:
        # Custom production logic; produce() enqueues asynchronously
        # and returns None in confluent-kafka
        for i in range(10):
            producer.produce(
                "custom-topic",
                value=f"Custom message {i}",
                partition=i % 3,  # Explicit partition assignment
            )

        # Wait for all messages; confluent-kafka producers have no close(),
        # so flush() also serves as the cleanup step
        producer.flush(timeout=10)

    except Exception as e:
        print(f"Production failed: {e}")
        raise


hook_task = PythonOperator(
    task_id="custom_producer",
    python_callable=custom_producer_logic,
)
```

### Advanced Configuration

#### Producer Configuration via Connection

```python
# Connection extra configuration for optimized production
{
    "bootstrap.servers": "kafka:9092",
    "acks": "all",  # Wait for all replicas
    "retries": "2147483647",  # Retry indefinitely
    "batch.size": "16384",  # Batch size in bytes
    "linger.ms": "5",  # Wait up to 5ms for batching
    "compression.type": "snappy",  # Enable compression
    "max.in.flight.requests.per.connection": "5",
    "enable.idempotence": "true",  # Exactly-once semantics
}
```
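
How this configuration reaches the hook is deployment-specific; one common route (a sketch, assuming Airflow's standard JSON-serialized connection environment variables) is to register it as the `kafka_default` connection, whose `extra` field carries the client settings:

```python
import json
import os

# Sketch: expose producer settings as the "kafka_default" connection.
# The provider reads the client configuration from the connection's "extra" JSON.
os.environ["AIRFLOW_CONN_KAFKA_DEFAULT"] = json.dumps(
    {
        "conn_type": "kafka",
        "extra": json.dumps(
            {
                "bootstrap.servers": "kafka:9092",
                "acks": "all",
                "compression.type": "snappy",
                "enable.idempotence": "true",
            }
        ),
    }
)
```

The same payload can be loaded through the UI or `airflow connections add` instead of an environment variable.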

#### Error Handling in Producer Functions

```python
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator


def robust_producer(producer):
    """Producer with comprehensive error handling."""
    messages_sent = 0
    failed_messages = []

    def delivery_report(err, msg):
        nonlocal messages_sent
        if err is not None:
            failed_messages.append((msg.key(), msg.value(), str(err)))
        else:
            messages_sent += 1

    try:
        for i in range(100):
            producer.produce(
                "robust-topic",
                key=str(i),
                value=f"Message {i}",
                callback=delivery_report,
            )

            # Poll periodically
            if i % 10 == 0:
                producer.poll(0.1)

        # Final flush with timeout; flush() returns the number of
        # messages still awaiting delivery
        remaining = producer.flush(timeout=30)
        if remaining > 0:
            raise Exception(f"{remaining} messages failed to deliver")

        print(f"Successfully sent {messages_sent} messages")
        if failed_messages:
            print(f"Failed messages: {failed_messages}")

    except Exception as e:
        print(f"Producer error: {e}")
        raise


robust_task = ProduceToTopicOperator(
    task_id="robust_producer",
    topic="robust-topic",
    producer_function=robust_producer,
    kafka_config_id="kafka_default",
)
```

### Best Practices

1. **Batching**: Use `linger.ms` and `batch.size` for optimal throughput
2. **Error Handling**: Always implement delivery callbacks for critical data (see the sketch after this list)
3. **Flushing**: Call `producer.flush()` to ensure message delivery
4. **Partitioning**: Use message keys for consistent partition assignment
5. **Compression**: Enable compression for better network utilization
6. **Idempotence**: Enable idempotence for exactly-once delivery semantics
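
A sketch pulling practices 2-4 together in one producer function (topic, key scheme, and values are illustrative; the client-level settings from practices 1, 5, and 6 belong in the connection extra shown under "Advanced Configuration"):

```python
def best_practices_producer(producer):
    def on_delivery(err, msg):  # 2. delivery callback for critical data
        if err is not None:
            raise RuntimeError(f"Delivery failed for key {msg.key()}: {err}")

    for i in range(100):
        producer.produce(
            "events",
            key=str(i % 8),  # 4. keys give consistent partition assignment
            value=f"event-{i}",
            callback=on_delivery,
        )
        producer.poll(0)  # service delivery callbacks as messages are enqueued

    producer.flush()  # 3. block until every message is delivered
```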