# Consumer Mixins

Ready-to-use consumer frameworks that provide a structured approach to building consumer applications, with built-in connection management and error handling. The mixin classes offer a robust foundation for long-running consumer services.

## Capabilities

### ConsumerMixin

Convenience mixin for implementing consumer programs with automatic connection management, error handling, and graceful shutdown support.

```python { .api }
11
class ConsumerMixin:
    """Base mixin for long-running consumer programs.

    Subclasses must provide a ``connection`` attribute and implement
    ``get_consumers``; every ``on_*`` hook is optional.
    """

    def get_consumers(self, Consumer, channel):
        """
        Abstract method that must be implemented by subclasses.

        Should return a list of Consumer instances.

        Parameters:
        - Consumer (class): Consumer class to instantiate (already bound
          to ``channel``)
        - channel: AMQP channel to use

        Returns:
        list: List of Consumer instances

        Raises:
        NotImplementedError: If the subclass does not override it.
        """
        raise NotImplementedError('Subclass responsibility')

    def run(self, _tokens=1, **kwargs):
        """
        Main run loop that handles connections and consumers.

        Keeps (re)starting ``consume()`` until ``should_stop`` is true.

        Parameters:
        - _tokens (int): Number of tokens taken from the restart rate
          limiter for each (re)start attempt
        - **kwargs: Additional arguments passed to consume()

        Returns:
        None
        """

    def consume(self, limit=None, timeout=None, safety_interval=1, **kwargs):
        """
        Consume messages from queues.

        This is a generator: iterate over it to drive the consume loop.

        Parameters:
        - limit (int): Maximum number of loop iterations (None = unlimited)
        - timeout (float): Maximum total idle time in seconds before
          ``socket.timeout`` is raised (None = wait forever)
        - safety_interval (float): Timeout in seconds for each
          drain_events call; ``should_stop`` is re-checked at this interval
        - **kwargs: Additional arguments forwarded to on_consume_ready()

        Yields:
        None: Once after every successful drain_events call
        """

    def on_connection_error(self, exc, interval):
        """
        Called when connection error occurs.

        Parameters:
        - exc (Exception): Connection exception
        - interval (float): Sleep interval before retry

        Override to customize error handling. Default logs a warning.
        """

    def on_connection_revived(self):
        """
        Called when connection is re-established after failure.

        Override to perform cleanup or reinitialization.
        """

    def on_consume_ready(self, connection, channel, consumers, **kwargs):
        """
        Called when consumers are ready to process messages.

        Parameters:
        - connection: Connection instance
        - channel: AMQP channel
        - consumers (list): List of Consumer instances
        - **kwargs: Keyword arguments that were passed to consume()

        Override to perform setup before message processing.
        """

    def on_consume_end(self, connection, channel):
        """
        Called when consume loop ends.

        Parameters:
        - connection: Connection instance
        - channel: AMQP channel

        Override to perform cleanup after message processing.
        """

    def on_iteration(self):
        """
        Called on each iteration of the consume loop.

        Runs on idle iterations too, not only when a message arrived.
        Override to perform periodic tasks.
        """

    def on_decode_error(self, message, exc):
        """
        Called when message decode error occurs.

        Parameters:
        - message (Message): Message that failed to decode
        - exc (Exception): Decode exception

        Override to handle decode errors. Default logs the error and
        acknowledges the message.
        """

    def extra_context(self, connection, channel):
        """
        Extra context manager entered around the consume loop.

        Parameters:
        - connection: Connection instance
        - channel: AMQP channel

        Returns:
        Context manager (the result is used in a ``with`` statement, so
        it must not be None)

        Override to provide additional context management.
        """

    # Attributes (plain class attributes, so instances may assign to them)

    #: int | None: Maximum connection retry attempts
    #: (default: None, i.e. retry forever)
    connect_max_retries = None

    #: bool: Flag to stop the consumer (set to True to stop)
    should_stop = False
136
```

### ConsumerProducerMixin

Consumer and producer mixin that provides a separate producer connection for publishing messages while consuming, which prevents deadlocks and improves performance.

```python { .api }
143
class ConsumerProducerMixin(ConsumerMixin):
    """ConsumerMixin variant that also exposes a producer on its own connection."""

    def get_consumers(self, Consumer, channel):
        """
        Abstract hook inherited from ConsumerMixin.

        Subclasses are required to implement it.
        """

    @property
    def producer(self):
        """
        Producer bound to the separate producer connection.

        Returns:
        Producer: Producer to publish messages with

        The underlying connection is created and managed automatically.
        """

    @property
    def producer_connection(self):
        """
        Connection dedicated to the producer.

        Returns:
        Connection: Connection instance used by the producer

        Created and managed automatically.
        """

    # All remaining methods are inherited from ConsumerMixin
174
```

## Usage Examples

### Basic ConsumerMixin Implementation

```python
181
from kombu import Connection, Queue, Exchange
182
from kombu.mixins import ConsumerMixin
183
184
class TaskWorker(ConsumerMixin):
    """Worker that consumes task messages and dispatches them by their ``type``."""

    def __init__(self, connection, queues):
        """
        Parameters:
        - connection: Connection the mixin consumes from
        - queues (list): Queues to consume
        """
        self.connection = connection
        self.queues = queues

    def get_consumers(self, Consumer, channel):
        """Return a single consumer that feeds every queue into process_message."""
        return [
            Consumer(
                queues=self.queues,
                callbacks=[self.process_message],
                # SECURITY: decoding 'pickle' can execute arbitrary code.
                # Keep it only if every producer on these queues is trusted.
                accept=['json', 'pickle'],
            )
        ]

    def process_message(self, body, message):
        """
        Handle one task message.

        Parameters:
        - body: Decoded message payload (expected to be a dict)
        - message (Message): Message to ack or reject

        Malformed payloads are rejected without requeue, since redelivering
        them can never succeed. Failed tasks are rejected with requeue.
        """
        print(f"Processing task: {body}")

        if not isinstance(body, dict):
            # A non-dict payload would fail identically on every redelivery.
            print(f"Malformed task payload: {body!r}")
            message.reject(requeue=False)
            return

        try:
            # Simulate work
            task_type = body.get('type')
            if task_type == 'email':
                self.send_email(body)
            elif task_type == 'report':
                self.generate_report(body)
            else:
                print(f"Unknown task type: {task_type}")
        except Exception as exc:
            print(f"Task failed: {exc}")
            message.reject(requeue=True)
        else:
            # Settle the message outside the try so that an ack failure is
            # not mistaken for a task failure.
            message.ack()
            print(f"Task completed: {body.get('id')}")

    def send_email(self, task):
        """Email sending logic (placeholder)."""
        print(f"Sending email: {task}")

    def generate_report(self, task):
        """Report generation logic (placeholder)."""
        print(f"Generating report: {task}")
225
226
# Usage
if __name__ == '__main__':
    # Message routing: a durable direct exchange bound to a durable queue
    exchange = Exchange('tasks', type='direct', durable=True)
    queue = Queue('task_queue', exchange=exchange, routing_key='task', durable=True)

    # Run the worker until interrupted
    with Connection('redis://localhost:6379/0') as connection:
        task_worker = TaskWorker(connection, [queue])
        try:
            task_worker.run()
        except KeyboardInterrupt:
            print('Stopping worker...')
244
```

### Advanced ConsumerMixin with Error Handling

```python
249
from kombu import Connection, Queue, Exchange
250
from kombu.mixins import ConsumerMixin
251
import logging
252
import time
253
254
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class RobustWorker(ConsumerMixin):
    """Consumer that keeps processing statistics and bounds retries per message.

    A failed message is republished with an incremented ``x-retry-count``
    header. Once ``max_retries`` is reached it is rejected without requeue.
    """

    def __init__(self, connection, queues, max_retries=3):
        """
        Parameters:
        - connection: Connection the mixin consumes from
        - queues (list): Queues to consume
        - max_retries (int): Retries allowed per message before it is dropped
        """
        self.connection = connection
        self.queues = queues
        self.max_retries = max_retries
        self.processed_count = 0
        self.error_count = 0
        # Processed count at the time stats were last logged (see on_iteration).
        self._last_reported = 0

    def get_consumers(self, Consumer, channel):
        """Return a single JSON-only consumer with a prefetch window of 10."""
        return [
            Consumer(
                queues=self.queues,
                callbacks=[self.process_message],
                prefetch_count=10,
                accept=['json'],
            )
        ]

    def process_message(self, body, message):
        """
        Process one message and settle it.

        Parameters:
        - body (dict): Decoded message payload
        - message (Message): Message to ack, retry or reject
        """
        # monotonic() is immune to wall-clock adjustments while timing.
        start_time = time.monotonic()

        try:
            logger.info(f"Processing message: {body.get('id')}")

            # Simulate variable processing time
            processing_time = body.get('processing_time', 1.0)
            time.sleep(processing_time)

            # Simulate occasional failures
            if body.get('should_fail', False):
                raise ValueError("Simulated processing failure")
        except Exception as exc:
            self.error_count += 1
            logger.error(f"Processing failed: {exc}")
            self._retry_or_discard(body, message)
            return

        message.ack()
        self.processed_count += 1

        duration = time.monotonic() - start_time
        logger.info(f"Message processed in {duration:.2f}s")

    def _retry_or_discard(self, body, message):
        """Republish a failed message with a bumped retry count, or drop it."""
        headers = dict(message.headers or {})
        retry_count = headers.get('x-retry-count', 0)

        if retry_count >= self.max_retries:
            # Max retries exceeded, reject permanently
            logger.error("Max retries exceeded, rejecting message")
            message.reject(requeue=False)
            return

        logger.info(f"Requeuing message (retry {retry_count + 1}/{self.max_retries})")

        # reject(requeue=True) redelivers the message unchanged, so the retry
        # counter could never advance. Publish a fresh copy carrying the
        # incremented header, then ack the original.
        headers['x-retry-count'] = retry_count + 1
        delivery_info = message.delivery_info or {}
        with self.connection.Producer() as producer:
            producer.publish(
                body,
                exchange=delivery_info.get('exchange', ''),
                routing_key=delivery_info.get('routing_key', ''),
                headers=headers,
                serializer='json',
            )
        message.ack()

    def on_connection_error(self, exc, interval):
        """Log the connection failure and the delay before the next attempt."""
        logger.error(f"Connection error: {exc}, retrying in {interval}s")

    def on_connection_revived(self):
        """Log that the connection is available again."""
        logger.info("Connection re-established")

    def on_consume_ready(self, connection, channel, consumers, **kwargs):
        """Log how many queues are being consumed."""
        logger.info(f"Ready to consume from {len(self.queues)} queues")

    def on_consume_end(self, connection, channel):
        """Log the final counters when the consume loop ends."""
        logger.info(f"Consumer stopped. Processed: {self.processed_count}, Errors: {self.error_count}")

    def on_iteration(self):
        """Log stats every 100 processed messages."""
        # This hook also fires on idle iterations, so remember the count that
        # was last reported to avoid logging the same milestone repeatedly.
        count = self.processed_count
        if count > 0 and count % 100 == 0 and count != self._last_reported:
            self._last_reported = count
            logger.info(f"Stats - Processed: {self.processed_count}, Errors: {self.error_count}")

    def on_decode_error(self, message, exc):
        """Log the undecodable message and reject it without requeue."""
        logger.error(f"Message decode error: {exc}")
        logger.error(f"Raw message: {message.body}")
        message.reject(requeue=False)
331
332
# Usage with graceful shutdown
if __name__ == '__main__':
    robust_queue = Queue('robust_queue', durable=True)

    with Connection('redis://localhost:6379/0') as connection:
        robust_worker = RobustWorker(connection, [robust_queue])

        try:
            robust_worker.run()
        except KeyboardInterrupt:
            logger.info('Received interrupt, stopping...')
            # Flag the worker as stopped so a later run() call returns at once.
            robust_worker.should_stop = True
344
```

### ConsumerProducerMixin Implementation

```python
349
from kombu import Connection, Queue, Exchange
350
from kombu.mixins import ConsumerProducerMixin
351
import json
352
import time
353
354
class RequestProcessor(ConsumerProducerMixin):
    """Consumes requests and publishes a success or error response for each."""

    def __init__(self, connection, request_queue, response_exchange):
        """
        Parameters:
        - connection: Connection the mixin consumes from
        - request_queue (Queue): Queue carrying incoming requests
        - response_exchange (Exchange): Exchange that responses are published to
        """
        self.connection = connection
        self.request_queue = request_queue
        self.response_exchange = response_exchange

    def get_consumers(self, Consumer, channel):
        """Return one consumer on the request queue with a prefetch window of 5."""
        request_consumer = Consumer(
            queues=[self.request_queue],
            callbacks=[self.process_request],
            prefetch_count=5
        )
        return [request_consumer]

    def _publish_response(self, payload, routing_key):
        """Publish ``payload`` as JSON through the separate producer connection."""
        self.producer.publish(
            payload,
            exchange=self.response_exchange,
            routing_key=routing_key,
            serializer='json'
        )

    def process_request(self, body, message):
        """
        Handle one request and reply with its outcome.

        Parameters:
        - body (dict): Decoded request payload
        - message (Message): Message to acknowledge

        The message is acknowledged on both success and failure.
        """
        request_id = body.get('id')
        print(f"Processing request {request_id}")

        try:
            # Process the request
            outcome = self.handle_request(body)

            # Reply on the caller's reply_to key, or 'default' if none given
            success_payload = {
                'request_id': request_id,
                'status': 'success',
                'result': outcome,
                'processed_at': time.time()
            }
            self._publish_response(success_payload, body.get('reply_to', 'default'))

            message.ack()
            print(f"Request {request_id} completed successfully")

        except Exception as exc:
            print(f"Request {request_id} failed: {exc}")

            # Report the failure on reply_to, or 'errors' if none given
            failure_payload = {
                'request_id': request_id,
                'status': 'error',
                'error': str(exc),
                'processed_at': time.time()
            }
            self._publish_response(failure_payload, body.get('reply_to', 'errors'))

            message.ack()  # Acknowledge even failed messages

    def handle_request(self, request):
        """
        Simulate request processing.

        Parameters:
        - request (dict): Request payload with a 'type' key

        Returns:
        dict: Result for 'calculation' or 'lookup' requests

        Raises:
        ValueError: If the request type is not recognised
        """
        kind = request.get('type')

        if kind == 'calculation':
            return {'result': request['a'] + request['b']}
        if kind == 'lookup':
            return {'data': f"Data for {request['key']}"}
        raise ValueError(f"Unknown request type: {kind}")
426
427
# Usage
if __name__ == '__main__':
    # Routing: requests arrive on a durable queue, responses leave through
    # a durable direct exchange
    requests = Queue('requests', durable=True)
    responses = Exchange('responses', type='direct', durable=True)

    with Connection('redis://localhost:6379/0') as connection:
        request_processor = RequestProcessor(connection, requests, responses)

        try:
            request_processor.run()
        except KeyboardInterrupt:
            print('Stopping processor...')
440
```

### Multi-Queue Consumer

```python
445
import time  # required by the simulated work in the normal/low priority handlers

from kombu import Connection, Queue, Exchange
from kombu.mixins import ConsumerMixin


class MultiQueueWorker(ConsumerMixin):
    """Worker that consumes three priority queues with different prefetch sizes."""

    def __init__(self, connection):
        """
        Parameters:
        - connection: Connection the mixin consumes from
        """
        self.connection = connection
        # Number of messages handled per queue
        self.stats = {
            'high_priority': 0,
            'normal_priority': 0,
            'low_priority': 0
        }
        # Total at the time stats were last printed (see on_iteration)
        self._last_reported_total = 0

    def get_consumers(self, Consumer, channel):
        """Return one consumer per priority queue."""
        # Define different priority queues
        high_priority_queue = Queue('high_priority', durable=True)
        normal_priority_queue = Queue('normal_priority', durable=True)
        low_priority_queue = Queue('low_priority', durable=True)

        return [
            # High priority consumer with higher prefetch
            Consumer(
                queues=[high_priority_queue],
                callbacks=[self.process_high_priority],
                prefetch_count=20
            ),
            # Normal priority consumer
            Consumer(
                queues=[normal_priority_queue],
                callbacks=[self.process_normal_priority],
                prefetch_count=10
            ),
            # Low priority consumer with lower prefetch
            Consumer(
                queues=[low_priority_queue],
                callbacks=[self.process_low_priority],
                prefetch_count=5
            )
        ]

    def process_high_priority(self, body, message):
        """Handle a high priority message immediately and ack it."""
        print(f"HIGH PRIORITY: {body}")
        # Fast processing for high priority
        self.stats['high_priority'] += 1
        message.ack()

    def process_normal_priority(self, body, message):
        """Handle a normal priority message after a short simulated delay."""
        print(f"Normal priority: {body}")
        # Standard processing
        time.sleep(0.1)  # Simulate work
        self.stats['normal_priority'] += 1
        message.ack()

    def process_low_priority(self, body, message):
        """Handle a low priority message after a longer simulated delay."""
        print(f"low priority: {body}")
        # Slower processing for low priority
        time.sleep(0.5)  # Simulate slower work
        self.stats['low_priority'] += 1
        message.ack()

    def on_iteration(self):
        """Print stats each time the total reaches a new multiple of 50."""
        # This hook also fires on idle iterations, so remember the total that
        # was last printed to avoid repeating the same milestone.
        total = sum(self.stats.values())
        if total > 0 and total % 50 == 0 and total != self._last_reported_total:
            self._last_reported_total = total
            print(f"Stats: {self.stats}")


# Usage
if __name__ == '__main__':
    with Connection('redis://localhost:6379/0') as conn:
        worker = MultiQueueWorker(conn)

        try:
            worker.run()
        except KeyboardInterrupt:
            print(f'Final stats: {worker.stats}')
519
```

### Mixin with Custom Context Management

```python
524
import json  # required to serialise cached results
import logging
from contextlib import contextmanager

import redis
from kombu import Connection, Queue
from kombu.mixins import ConsumerMixin


class CacheIntegratedWorker(ConsumerMixin):
    """Worker that uses Redis to skip tasks it has already processed.

    The Redis client lives exactly as long as the consume loop: it is opened
    by ``extra_context`` and closed when the loop exits.
    """

    def __init__(self, connection, queues, redis_url):
        """
        Parameters:
        - connection: Connection the mixin consumes from
        - queues (list): Queues to consume
        - redis_url (str): URL of the Redis instance used as result cache
        """
        self.connection = connection
        self.queues = queues
        self.redis_url = redis_url
        # Set by redis_connection() for the duration of the consume loop
        self.redis_client = None
        self.logger = logging.getLogger(__name__)

    def get_consumers(self, Consumer, channel):
        """Return a single consumer that feeds every queue into process_with_cache."""
        return [
            Consumer(
                queues=self.queues,
                callbacks=[self.process_with_cache]
            )
        ]

    def process_with_cache(self, body, message):
        """
        Process a task unless its result is already cached.

        Parameters:
        - body (dict): Decoded task payload with an 'id' key
        - message (Message): Message to ack or reject
        """
        # Use Redis cache in message processing
        cache_key = f"task:{body.get('id')}"

        # Check if already processed
        if self.redis_client.get(cache_key):
            self.logger.info(f"Task {body['id']} already processed, skipping")
            message.ack()
            return

        try:
            # Process the task
            result = self.process_task(body)

            # Cache the result
            self.redis_client.setex(
                cache_key,
                3600,  # 1 hour TTL
                json.dumps(result)
            )

            message.ack()
            self.logger.info(f"Task {body['id']} processed and cached")

        except Exception as exc:
            self.logger.error(f"Task processing failed: {exc}")
            message.reject(requeue=True)

    def process_task(self, task):
        """Actual task processing logic (placeholder)."""
        return {'processed': True, 'data': task}

    @contextmanager
    def redis_connection(self):
        """Context manager that opens the Redis client and publishes it on self."""
        client = redis.from_url(self.redis_url)
        # The mixin does not hand the value yielded by extra_context to any
        # hook, so expose the client on the instance for the callbacks.
        self.redis_client = client
        try:
            yield client
        finally:
            self.redis_client = None
            client.close()

    def extra_context(self, connection, channel):
        """Provide Redis connection as extra context"""
        return self.redis_connection()

    def on_consume_ready(self, connection, channel, consumers, **kwargs):
        """Log readiness; the Redis client was already set by extra_context."""
        self.logger.info("Consumer ready with Redis integration")


# Usage would be:
# worker = CacheIntegratedWorker(conn, [queue], 'redis://localhost:6379/1')
598
```