or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

compression.md · connection.md · entities.md · exceptions.md · index.md · messaging.md · mixins.md · pools.md · serialization.md · simple.md

docs/mixins.md

0

# Consumer Mixins

1

2

Ready-to-use consumer frameworks that provide structured approaches for building consumer applications with connection management and error handling. The mixin classes offer a robust foundation for building long-running consumer services.

3

4

## Capabilities

5

6

### ConsumerMixin

7

8

Convenience mixin for implementing consumer programs with automatic connection management, error handling, and graceful shutdown support.

9

10

```python { .api }

11

class ConsumerMixin:

12

def get_consumers(self, Consumer, channel):

13

"""

14

Abstract method that must be implemented by subclasses.

15

16

Should return a list of Consumer instances.

17

18

Parameters:

19

- Consumer (class): Consumer class to instantiate

20

- channel: AMQP channel to use

21

22

Returns:

23

list: List of Consumer instances

24

25

Must be implemented by subclasses.

26

"""

27

28

def run(self, _tokens=1, **kwargs):

29

"""

30

Main run loop that handles connections and consumers.

31

32

Parameters:

33

- _tokens (int): Tokens drawn from the internal restart rate limiter on each recovery attempt (throttles how often the loop restarts after connection failure)

34

- **kwargs: Additional arguments passed to consume()

35

36

Returns:

37

None

38

"""

39

40

def consume(self, limit=None, timeout=None, safety_interval=1, **kwargs):

41

"""

42

Consume messages from queues.

43

44

Parameters:

45

- limit (int): Maximum number of messages to process

46

- timeout (float): Timeout for each drain_events call

47

- safety_interval (float): Sleep interval between iterations

48

- **kwargs: Additional consume arguments

49

50

Returns:

51

None

52

"""

53

54

def on_connection_error(self, exc, interval):

55

"""

56

Called when connection error occurs.

57

58

Parameters:

59

- exc (Exception): Connection exception

60

- interval (float): Sleep interval before retry

61

62

Override to customize error handling.

63

"""

64

65

def on_connection_revived(self):

66

"""

67

Called when connection is re-established after failure.

68

69

Override to perform cleanup or reinitialization.

70

"""

71

72

def on_consume_ready(self, connection, channel, consumers, **kwargs):

73

"""

74

Called when consumers are ready to process messages.

75

76

Parameters:

77

- connection: Connection instance

78

- channel: AMQP channel

79

- consumers (list): List of Consumer instances

80

- **kwargs: Additional context

81

82

Override to perform setup before message processing.

83

"""

84

85

def on_consume_end(self, connection, channel):

86

"""

87

Called when consume loop ends.

88

89

Parameters:

90

- connection: Connection instance

91

- channel: AMQP channel

92

93

Override to perform cleanup after message processing.

94

"""

95

96

def on_iteration(self):

97

"""

98

Called on each iteration of the consume loop.

99

100

Override to perform periodic tasks.

101

"""

102

103

def on_decode_error(self, message, exc):

104

"""

105

Called when message decode error occurs.

106

107

Parameters:

108

- message (Message): Message that failed to decode

109

- exc (Exception): Decode exception

110

111

Override to handle decode errors. Default rejects message.

112

"""

113

114

def extra_context(self, connection, channel):

115

"""

116

Extra context manager for consume loop.

117

118

Parameters:

119

- connection: Connection instance

120

- channel: AMQP channel

121

122

Returns:

123

Context manager or None

124

125

Override to provide additional context management.

126

"""

127

128

# Properties and attributes

129

@property

130

def connect_max_retries(self):

131

"""int: Maximum connection retry attempts (default: 5)"""

132

133

@property

134

def should_stop(self):

135

"""bool: Flag to stop the consumer (set to True to stop)"""

136

```

137

138

### ConsumerProducerMixin

139

140

Consumer and Producer mixin that provides separate producer connection for publishing messages while consuming, preventing deadlocks and improving performance.

141

142

```python { .api }

143

class ConsumerProducerMixin(ConsumerMixin):

144

def get_consumers(self, Consumer, channel):

145

"""

146

Abstract method inherited from ConsumerMixin.

147

148

Must be implemented by subclasses.

149

"""

150

151

@property

152

def producer(self):

153

"""

154

Producer instance with separate connection.

155

156

Returns:

157

Producer: Producer for publishing messages

158

159

Automatically creates and manages separate connection.

160

"""

161

162

@property

163

def producer_connection(self):

164

"""

165

Separate connection for producer.

166

167

Returns:

168

Connection: Producer connection instance

169

170

Automatically created and managed.

171

"""

172

173

# Inherits all other methods from ConsumerMixin

174

```

175

176

## Usage Examples

177

178

### Basic ConsumerMixin Implementation

179

180

```python

181

from kombu import Connection, Queue, Exchange

182

from kombu.mixins import ConsumerMixin

183

184

class TaskWorker(ConsumerMixin):

185

def __init__(self, connection, queues):

186

self.connection = connection

187

self.queues = queues

188

189

def get_consumers(self, Consumer, channel):

190

return [

191

Consumer(

192

queues=self.queues,

193

callbacks=[self.process_message],

194

accept=['json', 'pickle']

195

)

196

]

197

198

def process_message(self, body, message):

199

print(f"Processing task: {body}")

200

201

try:

202

# Simulate work

203

task_type = body.get('type')

204

if task_type == 'email':

205

self.send_email(body)

206

elif task_type == 'report':

207

self.generate_report(body)

208

else:

209

print(f"Unknown task type: {task_type}")

210

211

message.ack()

212

print(f"Task completed: {body.get('id')}")

213

214

except Exception as exc:

215

print(f"Task failed: {exc}")

216

message.reject(requeue=True)

217

218

def send_email(self, task):

219

# Email sending logic

220

print(f"Sending email: {task}")

221

222

def generate_report(self, task):

223

# Report generation logic

224

print(f"Generating report: {task}")

225

226

# Usage

227

if __name__ == '__main__':

228

# Define message routing

229

task_exchange = Exchange('tasks', type='direct', durable=True)

230

task_queue = Queue(

231

'task_queue',

232

exchange=task_exchange,

233

routing_key='task',

234

durable=True

235

)

236

237

# Create and run worker

238

with Connection('redis://localhost:6379/0') as conn:

239

worker = TaskWorker(conn, [task_queue])

240

try:

241

worker.run()

242

except KeyboardInterrupt:

243

print('Stopping worker...')

244

```

245

246

### Advanced ConsumerMixin with Error Handling

247

248

```python

249

from kombu import Connection, Queue, Exchange

250

from kombu.mixins import ConsumerMixin

251

import logging

252

import time

253

254

logging.basicConfig(level=logging.INFO)

255

logger = logging.getLogger(__name__)

256

257

class RobustWorker(ConsumerMixin):

258

def __init__(self, connection, queues):

259

self.connection = connection

260

self.queues = queues

261

self.processed_count = 0

262

self.error_count = 0

263

264

def get_consumers(self, Consumer, channel):

265

return [

266

Consumer(

267

queues=self.queues,

268

callbacks=[self.process_message],

269

prefetch_count=10,

270

accept=['json']

271

)

272

]

273

274

def process_message(self, body, message):

275

start_time = time.time()

276

277

try:

278

logger.info(f"Processing message: {body.get('id')}")

279

280

# Simulate variable processing time

281

processing_time = body.get('processing_time', 1.0)

282

time.sleep(processing_time)

283

284

# Simulate occasional failures

285

if body.get('should_fail', False):

286

raise ValueError("Simulated processing failure")

287

288

message.ack()

289

self.processed_count += 1

290

291

duration = time.time() - start_time

292

logger.info(f"Message processed in {duration:.2f}s")

293

294

except Exception as exc:

295

self.error_count += 1

296

logger.error(f"Processing failed: {exc}")

297

298

# Check retry count

299

retry_count = message.headers.get('x-retry-count', 0) if message.headers else 0

300

if retry_count < 3:

301

# Increment retry count and requeue

302

logger.info(f"Requeuing message (retry {retry_count + 1}/3)")

303

# Note: Headers manipulation depends on transport support

304

message.reject(requeue=True)

305

else:

306

# Max retries exceeded, reject permanently

307

logger.error("Max retries exceeded, rejecting message")

308

message.reject(requeue=False)

309

310

def on_connection_error(self, exc, interval):

311

logger.error(f"Connection error: {exc}, retrying in {interval}s")

312

313

def on_connection_revived(self):

314

logger.info("Connection re-established")

315

316

def on_consume_ready(self, connection, channel, consumers, **kwargs):

317

logger.info(f"Ready to consume from {len(self.queues)} queues")

318

319

def on_consume_end(self, connection, channel):

320

logger.info(f"Consumer stopped. Processed: {self.processed_count}, Errors: {self.error_count}")

321

322

def on_iteration(self):

323

# Log stats every 100 messages

324

if self.processed_count > 0 and self.processed_count % 100 == 0:

325

logger.info(f"Stats - Processed: {self.processed_count}, Errors: {self.error_count}")

326

327

def on_decode_error(self, message, exc):

328

logger.error(f"Message decode error: {exc}")

329

logger.error(f"Raw message: {message.body}")

330

message.reject(requeue=False)

331

332

# Usage with graceful shutdown

333

if __name__ == '__main__':

334

queue = Queue('robust_queue', durable=True)

335

336

with Connection('redis://localhost:6379/0') as conn:

337

worker = RobustWorker(conn, [queue])

338

339

try:

340

worker.run()

341

except KeyboardInterrupt:

342

logger.info('Received interrupt, stopping...')

343

worker.should_stop = True

344

```

345

346

### ConsumerProducerMixin Implementation

347

348

```python

349

from kombu import Connection, Queue, Exchange

350

from kombu.mixins import ConsumerProducerMixin

351

import json

352

import time

353

354

class RequestProcessor(ConsumerProducerMixin):

355

def __init__(self, connection, request_queue, response_exchange):

356

self.connection = connection

357

self.request_queue = request_queue

358

self.response_exchange = response_exchange

359

360

def get_consumers(self, Consumer, channel):

361

return [

362

Consumer(

363

queues=[self.request_queue],

364

callbacks=[self.process_request],

365

prefetch_count=5

366

)

367

]

368

369

def process_request(self, body, message):

370

request_id = body.get('id')

371

print(f"Processing request {request_id}")

372

373

try:

374

# Process the request

375

result = self.handle_request(body)

376

377

# Send response using separate producer connection

378

response = {

379

'request_id': request_id,

380

'status': 'success',

381

'result': result,

382

'processed_at': time.time()

383

}

384

385

# Publish response

386

self.producer.publish(

387

response,

388

exchange=self.response_exchange,

389

routing_key=body.get('reply_to', 'default'),

390

serializer='json'

391

)

392

393

message.ack()

394

print(f"Request {request_id} completed successfully")

395

396

except Exception as exc:

397

print(f"Request {request_id} failed: {exc}")

398

399

# Send error response

400

error_response = {

401

'request_id': request_id,

402

'status': 'error',

403

'error': str(exc),

404

'processed_at': time.time()

405

}

406

407

self.producer.publish(

408

error_response,

409

exchange=self.response_exchange,

410

routing_key=body.get('reply_to', 'errors'),

411

serializer='json'

412

)

413

414

message.ack() # Acknowledge even failed messages

415

416

def handle_request(self, request):

417

# Simulate request processing

418

request_type = request.get('type')

419

420

if request_type == 'calculation':

421

return {'result': request['a'] + request['b']}

422

elif request_type == 'lookup':

423

return {'data': f"Data for {request['key']}"}

424

else:

425

raise ValueError(f"Unknown request type: {request_type}")

426

427

# Usage

428

if __name__ == '__main__':

429

# Define routing

430

request_queue = Queue('requests', durable=True)

431

response_exchange = Exchange('responses', type='direct', durable=True)

432

433

with Connection('redis://localhost:6379/0') as conn:

434

processor = RequestProcessor(conn, request_queue, response_exchange)

435

436

try:

437

processor.run()

438

except KeyboardInterrupt:

439

print('Stopping processor...')

440

```

441

442

### Multi-Queue Consumer

443

444

```python

445

from kombu import Connection, Queue, Exchange

446

from kombu.mixins import ConsumerMixin
import time

447

448

class MultiQueueWorker(ConsumerMixin):

449

def __init__(self, connection):

450

self.connection = connection

451

self.stats = {

452

'high_priority': 0,

453

'normal_priority': 0,

454

'low_priority': 0

455

}

456

457

def get_consumers(self, Consumer, channel):

458

# Define different priority queues

459

high_priority_queue = Queue('high_priority', durable=True)

460

normal_priority_queue = Queue('normal_priority', durable=True)

461

low_priority_queue = Queue('low_priority', durable=True)

462

463

return [

464

# High priority consumer with higher prefetch

465

Consumer(

466

queues=[high_priority_queue],

467

callbacks=[self.process_high_priority],

468

prefetch_count=20

469

),

470

# Normal priority consumer

471

Consumer(

472

queues=[normal_priority_queue],

473

callbacks=[self.process_normal_priority],

474

prefetch_count=10

475

),

476

# Low priority consumer with lower prefetch

477

Consumer(

478

queues=[low_priority_queue],

479

callbacks=[self.process_low_priority],

480

prefetch_count=5

481

)

482

]

483

484

def process_high_priority(self, body, message):

485

print(f"HIGH PRIORITY: {body}")

486

# Fast processing for high priority

487

self.stats['high_priority'] += 1

488

message.ack()

489

490

def process_normal_priority(self, body, message):

491

print(f"Normal priority: {body}")

492

# Standard processing

493

time.sleep(0.1) # Simulate work

494

self.stats['normal_priority'] += 1

495

message.ack()

496

497

def process_low_priority(self, body, message):

498

print(f"Low priority: {body}")

499

# Slower processing for low priority

500

time.sleep(0.5) # Simulate slower work

501

self.stats['low_priority'] += 1

502

message.ack()

503

504

def on_iteration(self):

505

# Print stats periodically

506

total = sum(self.stats.values())

507

if total > 0 and total % 50 == 0:

508

print(f"Stats: {self.stats}")

509

510

# Usage

511

if __name__ == '__main__':

512

with Connection('redis://localhost:6379/0') as conn:

513

worker = MultiQueueWorker(conn)

514

515

try:

516

worker.run()

517

except KeyboardInterrupt:

518

print(f'Final stats: {worker.stats}')

519

```

520

521

### Mixin with Custom Context Management

522

523

```python

524

from kombu import Connection, Queue

525

from kombu.mixins import ConsumerMixin

526

from contextlib import contextmanager

527

import redis

528

import logging
import json

529

530

class CacheIntegratedWorker(ConsumerMixin):

531

def __init__(self, connection, queues, redis_url):

532

self.connection = connection

533

self.queues = queues

534

self.redis_url = redis_url

535

self.redis_client = None

536

self.logger = logging.getLogger(__name__)

537

538

def get_consumers(self, Consumer, channel):

539

return [

540

Consumer(

541

queues=self.queues,

542

callbacks=[self.process_with_cache]

543

)

544

]

545

546

def process_with_cache(self, body, message):

547

# Use Redis cache in message processing

548

cache_key = f"task:{body.get('id')}"

549

550

# Check if already processed

551

if self.redis_client.get(cache_key):

552

self.logger.info(f"Task {body['id']} already processed, skipping")

553

message.ack()

554

return

555

556

try:

557

# Process the task

558

result = self.process_task(body)

559

560

# Cache the result

561

self.redis_client.setex(

562

cache_key,

563

3600, # 1 hour TTL

564

json.dumps(result)

565

)

566

567

message.ack()

568

self.logger.info(f"Task {body['id']} processed and cached")

569

570

except Exception as exc:

571

self.logger.error(f"Task processing failed: {exc}")

572

message.reject(requeue=True)

573

574

def process_task(self, task):

575

# Actual task processing logic

576

return {'processed': True, 'data': task}

577

578

@contextmanager

579

def redis_connection(self):

580

"""Context manager for Redis connection"""

581

client = redis.from_url(self.redis_url)

582

try:

583

yield client

584

finally:

585

client.close()

586

587

def extra_context(self, connection, channel):

588

"""Provide Redis connection as extra context"""

589

return self.redis_connection()

590

591

def on_consume_ready(self, connection, channel, consumers, **kwargs):

592

# Get Redis client from context

593

self.redis_client = kwargs.get('redis_client')

594

self.logger.info("Consumer ready with Redis integration")

595

596

# Usage would be:

597

# worker = CacheIntegratedWorker(conn, [queue], 'redis://localhost:6379/1')

598

```