0
# Simple Interface
1
2
A queue-like API for simple use cases, modeled on Python's `queue` module. It hides the complexity of the underlying AMQP entities behind a straightforward interface for sending and receiving messages.
3
4
## Capabilities
5
6
### SimpleQueue
7
8
Simple API for persistent queues that provides a high-level, queue-like interface for message passing.
9
10
```python { .api }
11
class SimpleQueue:
12
def __init__(self, channel, name, no_ack=None, queue_opts=None, queue_args=None, exchange_opts=None, serializer=None, compression=None, accept=None, **kwargs):
13
"""
14
Create simple persistent queue.
15
16
Parameters:
17
- channel: AMQP channel to use
18
- name (str): Queue name
19
- no_ack (bool): Disable message acknowledgments (None uses the class default, which is False, so received messages must be acknowledged)
20
- queue_opts (dict): Additional queue options (durable, exclusive, etc.)
21
- queue_args (dict): Queue declaration arguments
22
- exchange_opts (dict): Additional exchange options
23
- serializer (str): Default serialization method
24
- compression (str): Default compression method
25
- accept (list): Accepted content types
26
- **kwargs: Additional options
27
"""
28
29
def get(self, block=True, timeout=None):
30
"""
31
Get message from queue.
32
33
Parameters:
34
- block (bool): Block if queue is empty (default True)
35
- timeout (float): Maximum time in seconds to wait when blocking (None waits indefinitely)
36
37
Returns:
38
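Message: The received message object; its decoded body is available as `message.payload`, and `message.ack()` acknowledges it when no_ack is False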
39
40
Raises:
41
Empty: If block=False and the queue is empty, or if the timeout expires before a message arrives
42
"""
43
44
def get_nowait(self):
45
"""
46
Get message without blocking.
47
48
Returns:
49
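Message: The received message object; its decoded body is available as `message.payload`, and `message.ack()` acknowledges it when no_ack is False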
50
51
Raises:
52
Empty: If queue is empty
53
"""
54
55
def put(self, message, serializer=None, headers=None, compression=None, routing_key=None, **kwargs):
56
"""
57
Put message into queue.
58
59
Parameters:
60
- message: Message body to send
61
- serializer (str): Serialization method override
62
- headers (dict): Message headers
63
- compression (str): Compression method override
64
- routing_key (str): Routing key override
65
- **kwargs: Additional publish parameters
66
"""
67
68
def clear(self):
69
"""
70
Clear all messages from queue.
71
72
Returns:
73
int: Number of messages cleared
74
"""
75
76
def qsize(self):
77
"""
78
Get approximate queue size.
79
80
Returns:
81
int: Number of messages in queue (approximate)
82
83
Note:
84
Not all transports support this operation
85
"""
86
87
def close(self):
88
"""Close queue and cleanup resources."""
89
90
# Properties
91
@property
92
def Empty(self):
93
"""Exception class raised when queue is empty"""
94
95
@property
96
def no_ack(self):
97
"""bool: Auto-acknowledgment flag"""
98
```
99
100
### SimpleBuffer
101
102
Simple API for ephemeral queues that provides a high-level interface for temporary message passing with automatic cleanup.
103
104
```python { .api }
105
class SimpleBuffer:
106
def __init__(self, channel, name, no_ack=True, queue_opts=None, queue_args=None, exchange_opts=None, serializer=None, compression=None, accept=None, **kwargs):
107
"""
108
Create simple ephemeral queue.
109
110
Parameters:
111
- channel: AMQP channel to use
112
- name (str): Queue name
113
- no_ack (bool): Disable acknowledgments (default True for performance)
114
- queue_opts (dict): Additional queue options (auto_delete=True, durable=False by default)
115
- queue_args (dict): Queue declaration arguments
116
- exchange_opts (dict): Additional exchange options
117
- serializer (str): Default serialization method
118
- compression (str): Default compression method
119
- accept (list): Accepted content types
120
- **kwargs: Additional options
121
"""
122
123
# Inherits all methods from SimpleQueue
124
def get(self, block=True, timeout=None):
125
"""Get message from buffer (same as SimpleQueue.get)"""
126
127
def get_nowait(self):
128
"""Get message without blocking (same as SimpleQueue.get_nowait)"""
129
130
def put(self, message, serializer=None, headers=None, compression=None, routing_key=None, **kwargs):
131
"""Put message into buffer (same as SimpleQueue.put)"""
132
133
def clear(self):
134
"""Clear all messages from buffer (same as SimpleQueue.clear)"""
135
136
def qsize(self):
137
"""Get approximate buffer size (same as SimpleQueue.qsize)"""
138
139
def close(self):
140
"""Close buffer and cleanup resources (same as SimpleQueue.close)"""
141
142
# Properties
143
@property
144
def Empty(self):
145
"""Exception class raised when buffer is empty"""
146
147
@property
148
def no_ack(self):
149
"""bool: Auto-acknowledgment flag (True by default)"""
150
```
151
152
## Usage Examples
153
154
### Basic SimpleQueue Usage
155
156
```python
157
from kombu import Connection
158
159
# Connect and create simple queue
160
with Connection('redis://localhost:6379/0') as conn:
161
# Create persistent queue
162
queue = conn.SimpleQueue('task_queue')
163
164
# Send messages
165
queue.put({'task': 'process_data', 'id': 1})
166
queue.put({'task': 'send_email', 'id': 2})
167
queue.put({'task': 'generate_report', 'id': 3})
168
169
# Receive messages
170
while True:
171
try:
172
message = queue.get(timeout=5.0)
173
print(f"Processing: {message}")
174
175
# Simulate work
176
if message['task'] == 'process_data':
177
print(f"Processing data for task {message['id']}")
178
elif message['task'] == 'send_email':
179
print(f"Sending email for task {message['id']}")
180
elif message['task'] == 'generate_report':
181
print(f"Generating report for task {message['id']}")
182
183
except queue.Empty:
184
print("No more messages")
185
break
186
187
queue.close()
188
```
189
190
### Non-blocking Queue Operations
191
192
```python
193
from kombu import Connection
194
195
with Connection('redis://localhost:6379/0') as conn:
196
queue = conn.SimpleQueue('work_queue')
197
198
# Send some messages
199
for i in range(5):
200
queue.put(f'Message {i}')
201
202
# Process messages without blocking
203
processed = 0
204
while True:
205
try:
206
message = queue.get_nowait()
207
print(f"Got: {message}")
208
processed += 1
209
except queue.Empty:
210
print(f"Queue empty, processed {processed} messages")
211
break
212
213
queue.close()
214
```
215
216
### SimpleBuffer for Temporary Communication
217
218
```python
219
from kombu import Connection
220
import threading
221
import time
222
223
def producer(conn, buffer_name):
224
"""Producer function"""
225
buffer = conn.SimpleBuffer(buffer_name)
226
227
for i in range(10):
228
message = f'Temp message {i}'
229
buffer.put(message)
230
print(f"Sent: {message}")
231
time.sleep(0.1)
232
233
buffer.close()
234
235
def consumer(conn, buffer_name):
236
"""Consumer function"""
237
buffer = conn.SimpleBuffer(buffer_name)
238
239
while True:
240
try:
241
message = buffer.get(timeout=2.0)
242
print(f"Received: {message}")
243
except buffer.Empty:
244
print("Buffer empty, stopping consumer")
245
break
246
247
buffer.close()
248
249
# Use SimpleBuffer for temporary communication
250
with Connection('redis://localhost:6379/0') as conn:
251
buffer_name = 'temp_communication'
252
253
# Start producer and consumer in separate threads
254
producer_thread = threading.Thread(target=producer, args=(conn, buffer_name))
255
consumer_thread = threading.Thread(target=consumer, args=(conn, buffer_name))
256
257
producer_thread.start()
258
consumer_thread.start()
259
260
producer_thread.join()
261
consumer_thread.join()
262
```
263
264
### Queue Management Operations
265
266
```python
267
from kombu import Connection
268
269
with Connection('amqp://localhost') as conn:
270
queue = conn.SimpleQueue('management_queue')
271
272
# Add several messages
273
for i in range(100):
274
queue.put(f'Message {i}')
275
276
# Check queue size (if supported by transport)
277
try:
278
size = queue.qsize()
279
print(f"Queue has approximately {size} messages")
280
except NotImplementedError:
281
print("Queue size checking not supported by this transport")
282
283
# Process first 10 messages
284
for i in range(10):
285
try:
286
message = queue.get_nowait()
287
print(f"Processed: {message}")
288
except queue.Empty:
289
break
290
291
# Clear remaining messages
292
cleared = queue.clear()
293
print(f"Cleared {cleared} remaining messages")
294
295
queue.close()
296
```
297
298
### Serialization and Compression
299
300
```python
301
from kombu import Connection
302
import json
303
import pickle
304
305
with Connection('redis://localhost:6379/0') as conn:
306
# Queue with JSON serialization
307
json_queue = conn.SimpleQueue('json_queue', serializer='json')
308
309
# Send complex data structure
310
data = {
311
'user_id': 12345,
312
'action': 'purchase',
313
'items': [
314
{'id': 1, 'name': 'Widget', 'price': 9.99},
315
{'id': 2, 'name': 'Gadget', 'price': 19.99}
316
],
317
'total': 29.98
318
}
319
320
json_queue.put(data)
321
received = json_queue.get()
322
print(f"JSON data: {received}")
323
324
# Queue with pickle serialization and compression
325
binary_queue = conn.SimpleQueue(
326
'binary_queue',
327
serializer='pickle',
328
compression='gzip'
329
)
330
331
# Send binary data
332
binary_data = {
333
'large_list': list(range(1000)),
334
'nested_dict': {'level1': {'level2': {'level3': 'deep_value'}}}
335
}
336
337
binary_queue.put(binary_data)
338
received_binary = binary_queue.get()
339
print(f"Binary data received: {len(received_binary['large_list'])} items")
340
341
json_queue.close()
342
binary_queue.close()
343
```
344
345
### Message Headers and Properties
346
347
```python
348
from kombu import Connection
349
import time
350
351
with Connection('redis://localhost:6379/0') as conn:
352
queue = conn.SimpleQueue('header_queue')
353
354
# Send message with custom headers
355
queue.put(
356
{'task': 'important_work'},
357
headers={
358
'priority': 'high',
359
'created_by': 'worker_service',
360
'timestamp': time.time(),
361
'retry_count': 0
362
}
363
)
364
365
# The headers travel with the message; the object returned by get()
365
# exposes them as message.headers and the decoded body as message.payload
367
message = queue.get()
368
print(f"Received: {message}")
369
370
queue.close()
371
```
372
373
### Error Handling with Simple Interface
374
375
```python
376
from kombu import Connection
377
import socket
378
379
def robust_queue_processing(queue_name, conn_url):
380
"""Robust queue processing with error handling"""
381
382
try:
383
with Connection(conn_url) as conn:
384
queue = conn.SimpleQueue(queue_name)
385
386
while True:
387
try:
388
# Try to get message with timeout
389
message = queue.get(timeout=30.0)
390
391
# Process message
392
print(f"Processing: {message}")
393
394
# Simulate processing that might fail
395
if message.get('should_fail'):
396
raise ValueError("Simulated processing error")
397
398
print("Processing completed successfully")
399
400
except queue.Empty:
401
print("No messages received in 30 seconds, continuing...")
402
continue
403
404
except ValueError as e:
405
print(f"Processing error: {e}")
406
# With SimpleQueue, failed messages are lost unless
407
# you implement your own retry mechanism
408
continue
409
410
except KeyboardInterrupt:
411
print("Shutting down gracefully...")
412
break
413
414
queue.close()
415
416
except socket.error as e:
417
print(f"Connection error: {e}")
418
except Exception as e:
419
print(f"Unexpected error: {e}")
420
421
# Usage
422
robust_queue_processing('work_queue', 'redis://localhost:6379/0')
423
```
424
425
### Producer-Consumer Pattern
426
427
```python
428
from kombu import Connection
429
import threading
430
import time
431
import random
432
433
class TaskProducer:
434
def __init__(self, conn, queue_name):
435
self.queue = conn.SimpleQueue(queue_name)
436
self.running = True
437
438
def produce_tasks(self):
439
"""Produce tasks continuously"""
440
task_id = 0
441
while self.running:
442
task = {
443
'id': task_id,
444
'type': random.choice(['email', 'report', 'cleanup']),
445
'created_at': time.time()
446
}
447
448
self.queue.put(task)
449
print(f"Produced task {task_id}: {task['type']}")
450
451
task_id += 1
452
time.sleep(random.uniform(0.5, 2.0))
453
454
def stop(self):
455
self.running = False
456
self.queue.close()
457
458
class TaskConsumer:
459
def __init__(self, conn, queue_name, consumer_id):
460
self.queue = conn.SimpleQueue(queue_name)
461
self.consumer_id = consumer_id
462
self.running = True
463
464
def consume_tasks(self):
465
"""Consume tasks continuously"""
466
while self.running:
467
try:
468
task = self.queue.get(timeout=1.0)
469
470
# Simulate processing time
471
processing_time = random.uniform(0.1, 1.0)
472
print(f"Consumer {self.consumer_id} processing task {task['id']}")
473
time.sleep(processing_time)
474
475
print(f"Consumer {self.consumer_id} completed task {task['id']}")
476
477
except self.queue.Empty:
478
continue
479
except KeyboardInterrupt:
480
break
481
482
def stop(self):
483
self.running = False
484
self.queue.close()
485
486
# Run producer-consumer system
487
with Connection('redis://localhost:6379/0') as conn:
488
queue_name = 'task_processing'
489
490
# Create producer and consumers
491
producer = TaskProducer(conn, queue_name)
492
consumers = [
493
TaskConsumer(conn, queue_name, i)
494
for i in range(3) # 3 consumer workers
495
]
496
497
# Start threads
498
producer_thread = threading.Thread(target=producer.produce_tasks)
499
consumer_threads = [
500
threading.Thread(target=consumer.consume_tasks)
501
for consumer in consumers
502
]
503
504
producer_thread.start()
505
for thread in consumer_threads:
506
thread.start()
507
508
try:
509
# Run for 30 seconds
510
time.sleep(30)
511
except KeyboardInterrupt:
512
pass
513
514
# Graceful shutdown
515
producer.stop()
516
for consumer in consumers:
517
consumer.stop()
518
519
producer_thread.join()
520
for thread in consumer_threads:
521
thread.join()
522
523
print("All threads stopped")
524
```