0
# Channel Operations
1
2
Channel-based message operations including publishing, consuming, queue and exchange management, and transaction support with comprehensive callback handling for AMQP messaging.
3
4
## Capabilities
5
6
### Channel Management
7
8
Basic channel lifecycle and flow control operations.
9
10
```python { .api }
11
class BlockingChannel:
12
"""Synchronous channel for message operations."""
13
14
def close(self, reply_code=200, reply_text='Normal shutdown'):
15
"""
16
Close the channel.
17
18
Parameters:
19
- reply_code (int): AMQP reply code (default: 200)
20
- reply_text (str): Human-readable close reason
21
"""
22
23
def flow(self, active):
24
"""
25
Enable or disable message flow.
26
27
Parameters:
28
- active (bool): True to enable flow, False to disable
29
30
Returns:
31
- bool: Current flow state
32
"""
33
34
def add_on_cancel_callback(self, callback):
35
"""
36
Add callback for consumer cancellation.
37
38
Parameters:
39
- callback (callable): Function called when consumer is cancelled
40
"""
41
42
def add_on_return_callback(self, callback):
43
"""
44
Add callback for returned messages.
45
46
Parameters:
47
- callback (callable): Function called with (channel, method, properties, body)
48
"""
49
50
# Channel properties
51
@property
52
def channel_number(self) -> int:
53
"""Channel number."""
54
55
@property
56
def connection(self):
57
"""Parent connection instance."""
58
59
@property
60
def is_closed(self) -> bool:
61
"""True if channel is closed."""
62
63
@property
64
def is_open(self) -> bool:
65
"""True if channel is open."""
66
67
@property
68
def consumer_tags(self) -> set:
69
"""Set of active consumer tags."""
70
```
71
72
### Message Publishing
73
74
Publish messages to exchanges with routing keys and properties.
75
76
```python { .api }
77
def basic_publish(self, exchange, routing_key, body, properties=None, mandatory=False):
78
"""
79
Publish a message.
80
81
Parameters:
82
- exchange (str): Exchange name (empty string for default exchange)
83
- routing_key (str): Routing key for message routing
84
- body (bytes or str): Message body
85
- properties (BasicProperties): Message properties
86
- mandatory (bool): If True, message must be routable or returned
87
88
Returns:
89
- None: Raises UnroutableError or NackError if publisher confirms are enabled and the message is returned or nacked
90
"""
91
```
92
93
### Message Consuming
94
95
Consume messages from queues with callback-based processing.
96
97
```python { .api }
98
def basic_consume(self, queue, on_message_callback, auto_ack=False, exclusive=False,
99
consumer_tag=None, arguments=None):
100
"""
101
Start consuming messages from queue.
102
103
Parameters:
104
- queue (str): Queue name to consume from
105
- on_message_callback (callable): Function called for each message (ch, method, properties, body)
106
- auto_ack (bool): If True, automatically acknowledge messages
107
- exclusive (bool): If True, only this consumer can access the queue
108
- consumer_tag (str): Consumer identifier (auto-generated if None)
109
- arguments (dict): Additional arguments for consume
110
111
Returns:
112
- str: Consumer tag
113
"""
114
115
def basic_cancel(self, consumer_tag):
116
"""
117
Cancel a message consumer.
118
119
Parameters:
120
- consumer_tag (str): Consumer tag to cancel
121
122
Returns:
123
- list: Messages that were still pending when the cancellation was confirmed (always empty for an auto_ack=False consumer)
124
"""
125
126
def start_consuming(self):
127
"""Start consuming messages (blocking loop)."""
128
129
def stop_consuming(self, consumer_tag=None):
130
"""
131
Stop consuming messages.
132
133
Parameters:
134
- consumer_tag (str, optional): Specific consumer to stop (all if None)
135
"""
136
137
def consume(self, queue, auto_ack=False, exclusive=False, arguments=None):
138
"""
139
Generator-based message consumption.
140
141
Parameters:
142
- queue (str): Queue name to consume from
143
- auto_ack (bool): If True, automatically acknowledge messages
144
- exclusive (bool): If True, exclusive access to queue
145
- arguments (dict): Additional consume arguments
146
147
Yields:
148
- tuple: (method, properties, body) for each message
149
"""
150
```
151
152
### Message Acknowledgment
153
154
Acknowledge, reject, or recover messages for reliable delivery.
155
156
```python { .api }
157
def basic_ack(self, delivery_tag, multiple=False):
158
"""
159
Acknowledge message delivery.
160
161
Parameters:
162
- delivery_tag (int): Delivery tag of message to acknowledge
163
- multiple (bool): If True, acknowledge all messages up to delivery_tag
164
"""
165
166
def basic_nack(self, delivery_tag, multiple=False, requeue=True):
167
"""
168
Negative acknowledgment of message delivery.
169
170
Parameters:
171
- delivery_tag (int): Delivery tag of message to nack
172
- multiple (bool): If True, nack all messages up to delivery_tag
173
- requeue (bool): If True, requeue the message(s)
174
"""
175
176
def basic_reject(self, delivery_tag, requeue=True):
177
"""
178
Reject a single message.
179
180
Parameters:
181
- delivery_tag (int): Delivery tag of message to reject
182
- requeue (bool): If True, requeue the message
183
"""
184
185
def basic_recover(self, requeue=False):
186
"""
187
Recover unacknowledged messages.
188
189
Parameters:
190
- requeue (bool): If False, redeliver to the original recipient; if True, requeue the messages so they may go to another consumer
191
"""
192
```
193
194
### Single Message Retrieval
195
196
Get individual messages from queues without setting up consumers.
197
198
```python { .api }
199
def basic_get(self, queue, auto_ack=False):
200
"""
201
Get a single message from queue.
202
203
Parameters:
204
- queue (str): Queue name to get message from
205
- auto_ack (bool): If True, automatically acknowledge message
206
207
Returns:
208
- tuple: (method, properties, body) if a message is available, (None, None, None) if the queue is empty
209
"""
210
```
211
212
### Quality of Service
213
214
Control message delivery rate and prefetch behavior.
215
216
```python { .api }
217
def basic_qos(self, prefetch_size=0, prefetch_count=0, global_qos=False):
218
"""
219
Set quality of service parameters.
220
221
Parameters:
222
- prefetch_size (int): Prefetch window size in bytes (0 = no limit)
223
- prefetch_count (int): Number of messages to prefetch (0 = no limit)
224
- global_qos (bool): If True, apply QoS globally on connection
225
"""
226
```
227
228
### Queue Operations
229
230
Declare, delete, purge, and bind queues.
231
232
```python { .api }
233
def queue_declare(self, queue='', passive=False, durable=False, exclusive=False,
234
auto_delete=False, arguments=None):
235
"""
236
Declare a queue.
237
238
Parameters:
239
- queue (str): Queue name (empty string for server-generated name)
240
- passive (bool): If True, only check if queue exists
241
- durable (bool): If True, queue survives broker restart
242
- exclusive (bool): If True, queue is exclusive to this connection
243
- auto_delete (bool): If True, queue deletes when last consumer disconnects
244
- arguments (dict): Additional queue arguments
245
246
Returns:
247
- QueueDeclareOk: Result with queue name, message count, consumer count
248
"""
249
250
def queue_delete(self, queue, if_unused=False, if_empty=False):
251
"""
252
Delete a queue.
253
254
Parameters:
255
- queue (str): Queue name to delete
256
- if_unused (bool): If True, only delete if no consumers
257
- if_empty (bool): If True, only delete if no messages
258
259
Returns:
260
- QueueDeleteOk: Result with message count
261
"""
262
263
def queue_purge(self, queue):
264
"""
265
Purge messages from queue.
266
267
Parameters:
268
- queue (str): Queue name to purge
269
270
Returns:
271
- QueuePurgeOk: Result with purged message count
272
"""
273
274
def queue_bind(self, queue, exchange, routing_key=None, arguments=None):
275
"""
276
Bind queue to exchange.
277
278
Parameters:
279
- queue (str): Queue name to bind
280
- exchange (str): Exchange name to bind to
281
- routing_key (str): Routing key for binding
282
- arguments (dict): Additional binding arguments
283
"""
284
285
def queue_unbind(self, queue, exchange, routing_key=None, arguments=None):
286
"""
287
Unbind queue from exchange.
288
289
Parameters:
290
- queue (str): Queue name to unbind
291
- exchange (str): Exchange name to unbind from
292
- routing_key (str): Routing key for binding
293
- arguments (dict): Additional binding arguments
294
"""
295
```
296
297
### Exchange Operations
298
299
Declare, delete, and bind exchanges.
300
301
```python { .api }
302
def exchange_declare(self, exchange, exchange_type='direct', passive=False,
303
durable=False, auto_delete=False, internal=False, arguments=None):
304
"""
305
Declare an exchange.
306
307
Parameters:
308
- exchange (str): Exchange name
309
- exchange_type (str): Exchange type ('direct', 'fanout', 'topic', 'headers')
310
- passive (bool): If True, only check if exchange exists
311
- durable (bool): If True, exchange survives broker restart
312
- auto_delete (bool): If True, exchange deletes when last queue unbinds
313
- internal (bool): If True, exchange is internal (cannot be published to directly)
314
- arguments (dict): Additional exchange arguments
315
"""
316
317
def exchange_delete(self, exchange, if_unused=False):
318
"""
319
Delete an exchange.
320
321
Parameters:
322
- exchange (str): Exchange name to delete
323
- if_unused (bool): If True, only delete if no queue bindings
324
"""
325
326
def exchange_bind(self, destination, source, routing_key='', arguments=None):
327
"""
328
Bind exchange to another exchange.
329
330
Parameters:
331
- destination (str): Destination exchange name
332
- source (str): Source exchange name
333
- routing_key (str): Routing key for binding
334
- arguments (dict): Additional binding arguments
335
"""
336
337
def exchange_unbind(self, destination, source, routing_key='', arguments=None):
338
"""
339
Unbind exchange from another exchange.
340
341
Parameters:
342
- destination (str): Destination exchange name
343
- source (str): Source exchange name
344
- routing_key (str): Routing key for binding
345
- arguments (dict): Additional binding arguments
346
"""
347
```
348
349
### Transaction Support
350
351
AMQP transaction support for atomic message operations.
352
353
```python { .api }
354
def tx_select(self):
355
"""Start a transaction."""
356
357
def tx_commit(self):
358
"""Commit the current transaction."""
359
360
def tx_rollback(self):
361
"""Rollback the current transaction."""
362
```
363
364
### Publisher Confirms
365
366
Enable publisher confirmations for reliable message delivery.
367
368
```python { .api }
369
def confirm_delivery(self):
370
"""
371
Enable publisher confirmations.
372
373
Returns:
374
- None: Confirm mode is enabled on the channel; subsequent basic_publish calls raise on return/nack
375
"""
376
377
@property
378
def publisher_confirms(self) -> bool:
379
"""True if publisher confirmations are enabled."""
380
```
381
382
## Usage Examples
383
384
### Basic Publishing
385
386
```python
387
import pika
388
389
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
390
channel = connection.channel()
391
392
# Declare a queue
393
channel.queue_declare(queue='task_queue', durable=True)
394
395
# Publish a message
396
message = "Hello World!"
397
channel.basic_publish(
398
exchange='',
399
routing_key='task_queue',
400
body=message,
401
properties=pika.BasicProperties(delivery_mode=2) # Make message persistent
402
)
403
404
connection.close()
405
```
406
407
### Basic Consuming
408
409
```python
410
import pika
411
412
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
413
channel = connection.channel()
414
415
channel.queue_declare(queue='task_queue', durable=True)
416
417
def callback(ch, method, properties, body):
418
print(f"Received {body.decode()}")
419
# Simulate work
420
import time
421
time.sleep(1)
422
423
# Acknowledge the message
424
ch.basic_ack(delivery_tag=method.delivery_tag)
425
426
# Set up consumer
427
channel.basic_qos(prefetch_count=1) # Fair dispatch
428
channel.basic_consume(queue='task_queue', on_message_callback=callback)
429
430
print('Waiting for messages. Press CTRL+C to exit')
431
channel.start_consuming()
432
```
433
434
### Publisher Confirms
435
436
```python
437
import pika
438
439
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
440
channel = connection.channel()
441
442
# Enable publisher confirms
443
channel.confirm_delivery()
444
445
try:
446
channel.basic_publish(
447
exchange='',
448
routing_key='test_queue',
449
body='Hello World!',
450
mandatory=True
451
)
452
print("Message published successfully")
453
except pika.exceptions.UnroutableError:
454
print("Message was returned as unroutable")
455
except pika.exceptions.NackError:
456
print("Message was nacked by broker")
457
458
connection.close()
459
```
460
461
### Generator-Based Consuming
462
463
```python
464
import pika
465
466
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
467
channel = connection.channel()
468
469
channel.queue_declare(queue='test_queue')
470
471
# Consume messages using generator
472
for method, properties, body in channel.consume('test_queue', auto_ack=True):
473
print(f"Received: {body.decode()}")
474
475
# Process 10 messages then stop
476
if method.delivery_tag == 10:
477
channel.cancel()
478
break
479
480
connection.close()
481
```
482
483
### Exchange and Queue Setup
484
485
```python
486
import pika
487
488
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
489
channel = connection.channel()
490
491
# Declare topic exchange
492
channel.exchange_declare(exchange='logs', exchange_type='topic', durable=True)
493
494
# Declare queue with TTL
495
queue_args = {'x-message-ttl': 60000} # 60 seconds
496
result = channel.queue_declare(queue='', exclusive=True, arguments=queue_args)
497
queue_name = result.method.queue
498
499
# Bind queue to exchange with routing key pattern
500
channel.queue_bind(exchange='logs', queue=queue_name, routing_key='app.*.error')
501
502
connection.close()
503
```
504
505
### Transaction Usage
506
507
```python
508
import pika
509
510
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
511
channel = connection.channel()
512
513
# Start transaction
514
channel.tx_select()
515
516
try:
517
# Publish multiple messages in transaction
518
for i in range(5):
519
channel.basic_publish(
520
exchange='',
521
routing_key='transactional_queue',
522
body=f'Message {i}'
523
)
524
525
# Commit transaction
526
channel.tx_commit()
527
print("All messages published successfully")
528
529
except Exception as e:
530
# Rollback on error
531
channel.tx_rollback()
532
print(f"Transaction rolled back: {e}")
533
534
connection.close()
535
```