# Message Consumption

Message consumption capabilities with batch and individual processing, configurable commit strategies, and message filtering functions.

## Capabilities

### Consume From Topic Operator

Airflow operator for consuming messages from Kafka topics with flexible processing functions and configurable commit strategies.

```python { .api }
class ConsumeFromTopicOperator(BaseOperator):
    """
    Operator for consuming messages from Kafka topics.

    Attributes:
        template_fields: tuple = ("topics", "apply_function_args", "apply_function_kwargs", "kafka_config_id")
    """

    def __init__(
        self,
        topics: str | Sequence[str],
        kafka_config_id: str = "kafka_default",
        apply_function: Callable[..., Any] | str | None = None,
        apply_function_batch: Callable[..., Any] | str | None = None,
        apply_function_args: Sequence[Any] | None = None,
        apply_function_kwargs: dict[Any, Any] | None = None,
        commit_cadence: str = "end_of_operator",
        max_messages: int | None = None,
        max_batch_size: int = 1000,
        poll_timeout: float = 60,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the consumer operator.

        Args:
            topics: Kafka topic name(s) to consume from
            kafka_config_id: Airflow connection ID for Kafka configuration
            apply_function: Function to apply to each message
            apply_function_batch: Function to apply to batches of messages
            apply_function_args: Arguments to pass to apply function
            apply_function_kwargs: Keyword arguments to pass to apply function
            commit_cadence: When to commit offsets ("never", "end_of_batch", "end_of_operator")
            max_messages: Maximum number of messages to consume
            max_batch_size: Maximum messages per batch
            poll_timeout: Timeout for polling messages (seconds)
            **kwargs: Additional operator arguments
        """

    def execute(self, context) -> Any:
        """
        Execute the consumer operation.

        Args:
            context: Airflow task context

        Returns:
            Any: Result of apply function(s)
        """
```

### Kafka Consumer Hook

Lower-level hook providing direct access to the Kafka Consumer client for advanced use cases.

```python { .api }
class KafkaConsumerHook(KafkaBaseHook):
    """
    A hook for consuming messages from Kafka topics.

    Inherits from KafkaBaseHook and provides consumer client access.
    """

    def __init__(self, topics: Sequence[str], kafka_config_id: str = "kafka_default") -> None:
        """
        Initialize the Kafka consumer hook.

        Args:
            topics: List of topic names to subscribe to
            kafka_config_id: The connection object to use
        """

    def get_consumer(self) -> Consumer:
        """
        Get Kafka Consumer client.

        Returns:
            Consumer: Configured confluent-kafka Consumer instance
        """

    def _get_client(self, config) -> Consumer:
        """
        Get a Kafka Consumer client with the given configuration.

        Args:
            config: Kafka client configuration dictionary

        Returns:
            Consumer: Configured confluent-kafka Consumer instance
        """
```

### Commit Cadence Options

```python { .api }
VALID_COMMIT_CADENCE = ["never", "end_of_batch", "end_of_operator"]
```
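
As one illustration, `commit_cadence="never"` leaves the consumer group's committed offsets untouched, which can suit read-only or auditing tasks where offsets are advanced elsewhere. The sketch below uses illustrative task, topic, and callable names that are not part of the provider.

```python
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator


def log_only(message):
    # Illustrative per-message callable: inspect the message without side effects.
    print(message.value())
    return True


audit_task = ConsumeFromTopicOperator(
    task_id="audit_read_only",          # illustrative names
    topics=["audit-topic"],
    apply_function=log_only,
    commit_cadence="never",             # this task never commits offsets
    max_messages=50,
    kafka_config_id="kafka_default",
)
```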

### Error Handling

```python { .api }
class KafkaAuthenticationError(Exception):
    """
    Custom exception for Kafka authentication failures.

    Raised when consumer authentication fails during connection
    or topic subscription attempts.
    """

def error_callback(err):
    """
    Default error handling callback for consumer operations.

    Args:
        err: Error object from confluent-kafka consumer
    """
```
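
`error_callback` above is the provider's default handler for consumer-level errors. If you build a consumer yourself rather than going through the operator, a similar callback can be wired in through confluent-kafka's `error_cb` configuration key; the sketch below uses an illustrative handler, broker address, and group id, and is not part of the provider's API.

```python
from confluent_kafka import Consumer, KafkaError


def my_error_callback(err: KafkaError):
    # Log client-level errors; raise on fatal ones so the task fails visibly.
    print(f"Kafka error: {err}")
    if err.fatal():
        raise RuntimeError(str(err))


consumer = Consumer(
    {
        "bootstrap.servers": "kafka:9092",   # illustrative broker address
        "group.id": "error-handling-demo",   # illustrative group id
        "error_cb": my_error_callback,       # invoked on client-level errors
    }
)
```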

### Usage Examples

#### Basic Message Consumption

```python
from airflow import DAG
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
from datetime import datetime


def process_message(message):
    """Process individual messages."""
    key = message.key().decode('utf-8') if message.key() else None
    value = message.value().decode('utf-8')

    print(f"Processing message - Key: {key}, Value: {value}")

    # Return True to indicate successful processing
    return True


dag = DAG(
    "kafka_consumer_example",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
)

consume_task = ConsumeFromTopicOperator(
    task_id="consume_messages",
    topics=["my-topic"],
    apply_function=process_message,
    max_messages=100,
    kafka_config_id="kafka_default",
    dag=dag,
)
```

#### Batch Processing

```python
import json
from datetime import datetime

from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator


def process_batch(messages):
    """Process messages in batches."""
    processed_records = []

    for message in messages:
        try:
            # Parse JSON message
            data = json.loads(message.value().decode('utf-8'))

            # Transform data
            processed_record = {
                "user_id": data["user_id"],
                "action": data["action"],
                "processed_at": datetime.now().isoformat(),
            }
            processed_records.append(processed_record)

        except json.JSONDecodeError:
            print(f"Failed to parse message: {message.value()}")
            continue

    # Bulk insert or process
    print(f"Processed batch of {len(processed_records)} records")
    return processed_records


consume_batch_task = ConsumeFromTopicOperator(
    task_id="consume_batch",
    topics=["user-events"],
    apply_function_batch=process_batch,
    max_batch_size=500,
    commit_cadence="end_of_batch",
    kafka_config_id="kafka_default",
)
```

#### Multiple Topics Consumption

```python
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator


def multi_topic_processor(message):
    """Process messages from multiple topics."""
    topic = message.topic()
    value = message.value().decode('utf-8')

    if topic == "orders":
        print(f"Processing order: {value}")
        # Handle order logic
    elif topic == "payments":
        print(f"Processing payment: {value}")
        # Handle payment logic
    elif topic == "shipments":
        print(f"Processing shipment: {value}")
        # Handle shipment logic

    return True


multi_topic_task = ConsumeFromTopicOperator(
    task_id="consume_multi_topics",
    topics=["orders", "payments", "shipments"],
    apply_function=multi_topic_processor,
    max_messages=1000,
    poll_timeout=30,
    kafka_config_id="kafka_default",
)
```

#### Parameterized Processing

```python
import json

from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator


def parameterized_processor(message, filter_user_id, output_format):
    """Process messages with parameters."""
    try:
        data = json.loads(message.value().decode('utf-8'))

        # Filter by user ID if specified
        if filter_user_id and data.get("user_id") != filter_user_id:
            return None  # Skip this message

        # Format output
        if output_format == "csv":
            result = f"{data['user_id']},{data['action']},{data['timestamp']}"
        else:
            result = json.dumps(data)

        print(f"Processed: {result}")
        return result

    except Exception as e:
        print(f"Error processing message: {e}")
        return None


filtered_consume_task = ConsumeFromTopicOperator(
    task_id="filtered_consume",
    topics=["user-events"],
    apply_function=parameterized_processor,
    apply_function_args=[123, "json"],  # filter_user_id=123, output_format="json"
    max_messages=500,
    kafka_config_id="kafka_default",
)
```

#### Advanced Commit Strategy

```python
import json

from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator


def critical_processor(message):
    """Process critical messages with manual commit control."""
    try:
        # Critical processing logic
        data = json.loads(message.value().decode('utf-8'))

        # Validate data
        if not all(k in data for k in ["id", "amount", "currency"]):
            raise ValueError("Missing required fields")

        # Process financial transaction (process_transaction is user-defined)
        process_transaction(data)

        print(f"Successfully processed transaction {data['id']}")
        return True

    except Exception as e:
        print(f"Failed to process transaction: {e}")
        # Don't commit this message; it will be retried
        raise


critical_consume_task = ConsumeFromTopicOperator(
    task_id="consume_critical",
    topics=["financial-transactions"],
    apply_function=critical_processor,
    commit_cadence="end_of_operator",  # Only commit after all messages are processed
    max_messages=100,
    poll_timeout=60,
    kafka_config_id="kafka_prod",
)
```

### Using Consumer Hook Directly

```python
from airflow.operators.python import PythonOperator
from airflow.providers.apache.kafka.hooks.consume import KafkaConsumerHook


def custom_consumer_logic():
    """Use the consumer hook for advanced scenarios."""
    hook = KafkaConsumerHook(
        topics=["custom-topic"],
        kafka_config_id="kafka_default",
    )

    consumer = hook.get_consumer()
    messages_processed = 0

    try:
        while messages_processed < 100:
            msg = consumer.poll(timeout=1.0)

            if msg is None:
                continue

            if msg.error():
                print(f"Consumer error: {msg.error()}")
                continue

            # Custom processing
            print(f"Received: {msg.value().decode('utf-8')}")
            messages_processed += 1

            # Manual commit after processing
            consumer.commit(msg)

    except Exception as e:
        print(f"Consumer error: {e}")
        raise
    finally:
        consumer.close()


hook_consumer_task = PythonOperator(
    task_id="custom_consumer",
    python_callable=custom_consumer_logic,
)
```

### Advanced Consumer Configuration

#### Consumer Group Configuration

```python
# Connection extra configuration for consumer groups
{
    "bootstrap.servers": "kafka:9092",
    "group.id": "airflow-consumer-group",
    "auto.offset.reset": "earliest",    # or "latest"
    "enable.auto.commit": "false",      # Manual commit control
    "session.timeout.ms": "30000",      # Consumer group session timeout
    "heartbeat.interval.ms": "3000",    # Heartbeat interval
    "fetch.min.bytes": "1024",          # Minimum bytes to fetch
    "fetch.wait.max.ms": "500"          # Maximum broker wait time for a fetch
}
```
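
One way to make this configuration available to the hooks and operators is through an Airflow connection whose Extra field holds the dict above. The sketch below is one option, not the only one: it builds the JSON connection value you could export as an `AIRFLOW_CONN_KAFKA_DEFAULT` environment variable in the scheduler/worker environment (the same extras can instead be pasted into the connection form in the UI). The broker address and group id are illustrative.

```python
import json

# Consumer settings that go into the connection's Extra field.
consumer_config = {
    "bootstrap.servers": "kafka:9092",       # illustrative broker address
    "group.id": "airflow-consumer-group",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": "false",
}

# JSON connection value for an AIRFLOW_CONN_KAFKA_DEFAULT environment variable.
print(json.dumps({"conn_type": "kafka", "extra": consumer_config}))
```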

#### Message Filtering and Transformation

```python
import json

from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator


def filter_and_transform(message):
    """Filter and transform messages based on content."""
    try:
        data = json.loads(message.value().decode('utf-8'))

        # Filter based on message content
        if data.get("event_type") not in ["purchase", "login"]:
            return None  # Skip message

        # Transform message
        transformed = {
            "event_id": data["id"],
            "user": data["user_id"],
            "type": data["event_type"],
            "timestamp": data["timestamp"],
            "metadata": {
                "topic": message.topic(),
                "partition": message.partition(),
                "offset": message.offset(),
            },
        }

        return transformed

    except Exception as e:
        print(f"Error filtering message: {e}")
        return None


filter_task = ConsumeFromTopicOperator(
    task_id="filter_messages",
    topics=["raw-events"],
    apply_function=filter_and_transform,
    commit_cadence="end_of_batch",
    max_batch_size=100,
    kafka_config_id="kafka_default",
)
```

### Error Handling and Retry Logic

```python
import json
import time

from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator


def resilient_processor(message):
    """Process messages with retry logic."""
    max_retries = 3
    retry_count = 0

    while retry_count < max_retries:
        try:
            # Processing logic that might fail
            data = json.loads(message.value().decode('utf-8'))

            # External API call (call_external_api is user-defined)
            result = call_external_api(data)

            print(f"Successfully processed message: {result}")
            return result

        except Exception as e:
            retry_count += 1
            print(f"Attempt {retry_count} failed: {e}")

            if retry_count >= max_retries:
                # Log to a dead-letter topic or other error handling
                print(f"Message failed after {max_retries} attempts")
                return None

            # Wait before retry
            time.sleep(2 ** retry_count)  # Exponential backoff


resilient_task = ConsumeFromTopicOperator(
    task_id="resilient_consume",
    topics=["api-requests"],
    apply_function=resilient_processor,
    commit_cadence="end_of_operator",
    kafka_config_id="kafka_default",
)
```

### Performance Optimization

#### High-Throughput Consumption

```python
import json

from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator


def high_throughput_batch_processor(messages):
    """Optimized batch processing for high throughput."""
    # Process in chunks for memory efficiency
    chunk_size = 100
    results = []

    for i in range(0, len(messages), chunk_size):
        chunk = messages[i:i + chunk_size]

        # Process each message in the chunk
        chunk_results = []
        for message in chunk:
            try:
                data = json.loads(message.value().decode('utf-8'))
                # Fast processing logic (transform_data is user-defined)
                chunk_results.append(transform_data(data))
            except Exception as e:
                print(f"Error in chunk processing: {e}")
                continue

        results.extend(chunk_results)

        # Optional: intermediate processing/storage (bulk_store is user-defined)
        if len(results) >= 1000:
            bulk_store(results)
            results = []

    # Final storage
    if results:
        bulk_store(results)

    return len(messages)


high_throughput_task = ConsumeFromTopicOperator(
    task_id="high_throughput_consume",
    topics=["high-volume-topic"],
    apply_function_batch=high_throughput_batch_processor,
    max_batch_size=2000,
    poll_timeout=10,
    commit_cadence="end_of_batch",
    kafka_config_id="kafka_default",
)
```

### Best Practices

1. **Consumer Groups**: Use meaningful group IDs for consumer coordination
2. **Offset Management**: Choose appropriate `auto.offset.reset` and commit strategies
3. **Error Handling**: Implement robust error handling with retry logic
4. **Memory Management**: Process messages in batches for better memory utilization
5. **Monitoring**: Log processing metrics and errors for operational visibility
6. **Backpressure**: Use `max_messages` and `poll_timeout` to control consumption rate (see the sketch below)
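
A minimal sketch tying several of these points together, under assumed topic, group, and connection names: a descriptive consumer group is set in the connection extras, offsets are committed per batch, and `max_messages`/`poll_timeout` cap how much work a single run takes on.

```python
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator

# Assumed connection extras on "kafka_default", with a descriptive group id:
# {"bootstrap.servers": "kafka:9092", "group.id": "orders-etl-consumer", "auto.offset.reset": "earliest"}


def count_message(message):
    # Illustrative callable: a real task would emit processing metrics here for visibility.
    return 1


bounded_consume = ConsumeFromTopicOperator(
    task_id="bounded_consume",
    topics=["orders"],                  # illustrative topic
    apply_function=count_message,
    commit_cadence="end_of_batch",      # commit as each batch completes
    max_messages=5000,                  # backpressure: cap messages per run
    max_batch_size=500,
    poll_timeout=30,                    # backpressure: bound time spent waiting
    kafka_config_id="kafka_default",
)
```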