0
# Connection Adapters
1
2
Framework-specific connection adapters that integrate pika with asyncio, Tornado, Twisted, and Gevent, plus the built-in select-based adapter, so pika can run on whichever event loop an application already uses.
3
4
## Capabilities
5
6
### Base Connection
7
8
Abstract base class for all connection adapters.
9
10
```python { .api }
11
class BaseConnection:
    """Abstract base class for all connection adapters."""

    def __init__(self, parameters, on_open_callback=None, on_open_error_callback=None,
                 on_close_callback=None):
        """
        Base connection initialization.

        Parameters:
        - parameters (ConnectionParameters): Connection configuration
        - on_open_callback (callable): Called as callback(connection) when connection opens
        - on_open_error_callback (callable): Called as callback(connection, error) on connection error
        - on_close_callback (callable): Called as callback(connection, reason) when connection closes
        """

    def channel(self, channel_number=None, on_open_callback=None):
        """
        Create a new channel asynchronously.

        Pass on_open_callback by keyword; channel_number comes first in the
        signature.

        Parameters:
        - channel_number (int, optional): Specific channel number
        - on_open_callback (callable): Called as callback(channel) when channel opens
        """

    def close(self, reply_code=200, reply_text='Normal shutdown'):
        """
        Close connection.

        Parameters:
        - reply_code (int): AMQP reply code
        - reply_text (str): Human-readable reason
        """

    # Connection state properties
    @property
    def is_closed(self) -> bool:
        """True if connection is closed."""

    @property
    def is_open(self) -> bool:
        """True if connection is open."""
52
```
53
54
### AsyncIO Connection
55
56
Native Python 3 asyncio integration for async/await patterns.
57
58
```python { .api }
59
class AsyncioConnection(BaseConnection):
    """Connection adapter for Python asyncio."""

    def __init__(self, parameters, on_open_callback=None, on_open_error_callback=None,
                 on_close_callback=None, custom_ioloop=None):
        """
        Create asyncio connection.

        Parameters:
        - parameters (ConnectionParameters): Connection configuration
        - on_open_callback (callable): Called when connection opens
        - on_open_error_callback (callable): Called on connection error
        - on_close_callback (callable): Called when connection closes
        - custom_ioloop (asyncio.AbstractEventLoop): Event loop (uses current if None)
        """

    def channel(self, channel_number=None, on_open_callback=None):
        """
        Create asyncio channel.

        Parameters:
        - channel_number (int, optional): Specific channel number
        - on_open_callback (callable): Called when channel opens
        """
83
```
84
85
### Tornado Connection
86
87
Integration with the Tornado web framework and its IOLoop.
88
89
```python { .api }
90
class TornadoConnection(BaseConnection):
    """Connection adapter for Tornado web framework."""

    def __init__(self, parameters, on_open_callback=None, on_open_error_callback=None,
                 on_close_callback=None, custom_ioloop=None):
        """
        Create Tornado connection.

        Parameters:
        - parameters (ConnectionParameters): Connection configuration
        - on_open_callback (callable): Called when connection opens
        - on_open_error_callback (callable): Called on connection error
        - on_close_callback (callable): Called when connection closes
        - custom_ioloop (tornado.ioloop.IOLoop): Tornado IOLoop instance
        """
105
```
106
107
### Twisted Connection
108
109
Integration with the Twisted framework and its reactor.
110
111
```python { .api }
112
class TwistedProtocolConnection(twisted.internet.protocol.Protocol):
    """Connection adapter for Twisted framework, implemented as a Twisted Protocol."""

    def __init__(self, parameters=None, custom_reactor=None):
        """
        Create Twisted connection.

        Twisted itself establishes the transport (for example through
        ClientCreator.connectTCP), so this adapter takes no open/close
        callbacks. Wait on the `ready` Deferred before requesting a channel,
        and use the `closed` Deferred to learn when the connection has closed.

        Parameters:
        - parameters (ConnectionParameters): Connection configuration
        - custom_reactor: Twisted reactor to use instead of the default one
        """

    def channel(self, channel_number=None):
        """
        Create a new channel.

        Parameters:
        - channel_number (int, optional): Specific channel number

        Returns:
        - Deferred that fires with the opened channel
        """


# NOTE(review): pika 1.x does not appear to export TwistedConnection any more;
# confirm against the pika version this document targets before relying on it.
class TwistedConnection(TwistedProtocolConnection):
    """Alias for TwistedProtocolConnection."""
129
```
130
131
### Gevent Connection
132
133
Integration with the Gevent cooperative multitasking library.
134
135
```python { .api }
136
class GeventConnection(BaseConnection):
    """Connection adapter for Gevent."""

    def __init__(self, parameters, on_open_callback=None, on_open_error_callback=None,
                 on_close_callback=None, custom_ioloop=None):
        """
        Create Gevent connection.

        Parameters:
        - parameters (ConnectionParameters): Connection configuration
        - on_open_callback (callable): Called when connection opens
        - on_open_error_callback (callable): Called on connection error
        - on_close_callback (callable): Called when connection closes
        - custom_ioloop (optional): I/O loop to use instead of the adapter's default
        """
150
```
151
152
### Select Connection
153
154
Event-driven connection using select/poll/epoll for high-performance async operations.
155
156
```python { .api }
157
class SelectConnection(BaseConnection):
    """Event-driven connection using select/poll/epoll."""

    def __init__(self, parameters, on_open_callback=None, on_open_error_callback=None,
                 on_close_callback=None, custom_ioloop=None):
        """
        Create select-based connection.

        Parameters:
        - parameters (ConnectionParameters): Connection configuration
        - on_open_callback (callable): Called when connection opens
        - on_open_error_callback (callable): Called on connection error
        - on_close_callback (callable): Called when connection closes
        - custom_ioloop (IOLoop): Event loop instance (a new IOLoop is created if None)
        """

    @property
    def ioloop(self):
        """Get the IOLoop instance."""
176
177
class IOLoop:
    """Event loop implementation for SelectConnection."""

    def start(self):
        """Start the event loop."""

    def stop(self):
        """Stop the event loop."""

    def add_timeout(self, deadline, callback_method):
        """
        Add a timeout callback.

        NOTE(review): pika 1.0 replaced this method with call_later(); confirm
        it exists in the pika version this document targets.

        Parameters:
        - deadline (float): Time when callback should be called
        - callback_method (callable): Callback function
        """

    def remove_timeout(self, timeout_id):
        """
        Remove a timeout callback.

        Parameters:
        - timeout_id: Timeout identifier to remove (the handle returned by call_later)
        """

    def call_later(self, delay, callback):
        """
        Schedule callback execution after delay.

        The callback is invoked without arguments; wrap it with
        functools.partial to bind any.

        Parameters:
        - delay (float): Delay in seconds
        - callback (callable): Callback function

        Returns:
        - Handle that can be passed to remove_timeout()
        """
212
```
213
214
## Usage Examples
215
216
### AsyncIO Connection
217
218
```python
219
import asyncio
import pika
from pika.adapters.asyncio_connection import AsyncioConnection


async def main():
    # Resolved once the connection has closed (or failed to open), so main()
    # does not return while the AMQP close handshake is still in flight.
    finished = asyncio.get_running_loop().create_future()

    def mark_finished():
        if not finished.done():
            finished.set_result(None)

    # Connection callbacks
    def on_connection_open(connection):
        print('Connection opened')
        connection.channel(on_open_callback=on_channel_open)

    def on_connection_open_error(connection, error):
        print(f'Connection failed: {error}')
        mark_finished()

    def on_connection_closed(connection, reason):
        print(f'Connection closed: {reason}')
        mark_finished()

    def on_channel_open(channel):
        print('Channel opened')

        # Defined here so the callback can see `channel`; queue_declare only
        # passes the method frame to its callback.
        def on_queue_declared(method_frame):
            print('Queue declared')

            # Publish message
            channel.basic_publish(
                exchange='',
                routing_key='asyncio_test',
                body='Hello AsyncIO!'
            )

        # Declare queue
        channel.queue_declare(
            queue='asyncio_test',
            callback=on_queue_declared
        )

    # Create connection
    parameters = pika.ConnectionParameters('localhost')
    connection = AsyncioConnection(
        parameters,
        on_open_callback=on_connection_open,
        on_open_error_callback=on_connection_open_error,
        on_close_callback=on_connection_closed
    )

    # Keep running
    await asyncio.sleep(2)

    # close() is only valid on an open connection; skip it if the connection
    # never opened or has already gone away.
    if connection.is_open:
        connection.close()
        await finished


# Run the async function
asyncio.run(main())
269
```
270
271
### Tornado Connection
272
273
```python
274
import pika
from pika.adapters.tornado_connection import TornadoConnection
from tornado import ioloop


def on_connection_open(connection):
    print('Connection opened')
    connection.channel(on_open_callback=on_channel_open)


def on_connection_open_error(connection, error):
    print(f'Connection failed: {error}')
    ioloop.IOLoop.current().stop()


def on_connection_closed(connection, reason):
    print(f'Connection closed: {reason}')
    ioloop.IOLoop.current().stop()


def on_channel_open(channel):
    print('Channel opened')

    # Nested so the callback can reach `channel`; queue_declare only passes
    # the method frame to its callback.
    def on_queue_declared(method_frame):
        # Start consuming
        channel.basic_consume(
            queue='tornado_test',
            on_message_callback=on_message,
            auto_ack=True
        )

    # Declare queue
    channel.queue_declare(queue='tornado_test', callback=on_queue_declared)


def on_message(channel, method, properties, body):
    print(f'Received: {body.decode()}')

    # Stop after first message. Closing only the channel would leave the
    # connection open and the IOLoop running forever; closing the connection
    # fires on_connection_closed, which stops the loop. The is_open guard
    # covers a second delivery arriving while the close is in progress.
    if connection.is_open:
        connection.close()


# Create connection
parameters = pika.ConnectionParameters('localhost')
connection = TornadoConnection(
    parameters,
    on_open_callback=on_connection_open,
    on_open_error_callback=on_connection_open_error,
    on_close_callback=on_connection_closed
)

# Start Tornado IOLoop
ioloop.IOLoop.current().start()
321
```
322
323
### Twisted Connection
324
325
```python
326
import pika
from pika.adapters.twisted_connection import TwistedProtocolConnection
from twisted.internet import reactor, defer, protocol


class TwistedExample:
    """Publish one message through pika's Twisted adapter, then shut down."""

    def __init__(self):
        self.connection = None
        self.channel = None

    def connect(self):
        """Open the TCP transport via Twisted and wait for the AMQP handshake."""
        parameters = pika.ConnectionParameters('localhost')

        # TwistedProtocolConnection is a Twisted Protocol: Twisted creates it
        # and owns the transport, so host and port are given to connectTCP.
        creator = protocol.ClientCreator(
            reactor, TwistedProtocolConnection, parameters
        )
        d = creator.connectTCP('localhost', 5672)

        # connectTCP fires with the protocol instance; its `ready` Deferred
        # fires once the connection is usable.
        d.addCallback(lambda connection: connection.ready)
        d.addCallback(self.on_connection_open)
        d.addErrback(self.on_connection_error)
        return d

    def on_connection_open(self, connection):
        print('Connection opened')
        self.connection = connection

        # `closed` fires when the connection goes away, for any reason.
        connection.closed.addBoth(self.on_connection_closed)

        d = connection.channel()
        d.addCallback(self.on_channel_open)
        return d

    def on_connection_error(self, failure):
        print(f'Connection failed: {failure.getErrorMessage()}')
        self.stop_reactor()

    def on_connection_closed(self, reason):
        print(f'Connection closed: {reason}')
        self.stop_reactor()

    def on_channel_open(self, channel):
        print('Channel opened')
        self.channel = channel

        # Declare queue
        d = channel.queue_declare(queue='twisted_test')
        d.addCallback(self.on_queue_declared)
        return d

    def on_queue_declared(self, method_frame):
        # Publish message
        d = self.channel.basic_publish(
            exchange='',
            routing_key='twisted_test',
            body='Hello Twisted!'
        )

        # Close connection after publishing
        d.addCallback(lambda _: reactor.callLater(1, self.connection.close))
        return d

    @staticmethod
    def stop_reactor():
        # Both the error and the close path can end up here; stopping a
        # reactor that is not running raises ReactorNotRunning.
        if reactor.running:
            reactor.stop()


# Usage
example = TwistedExample()
example.connect()
reactor.run()
381
```
382
383
### Gevent Connection
384
385
```python
386
import pika
from pika.adapters.gevent_connection import GeventConnection
import gevent


class GeventExample:
    """Publish messages through pika's Gevent adapter."""

    def __init__(self):
        self.connection = None
        self.channel = None
        self.is_ready = False
        # Set when the connection fails to open or closes, so waiters can
        # stop polling instead of spinning forever.
        self.is_finished = False

    def connect(self):
        parameters = pika.ConnectionParameters('localhost')
        self.connection = GeventConnection(
            parameters,
            on_open_callback=self.on_connection_open,
            on_open_error_callback=self.on_connection_error,
            on_close_callback=self.on_connection_closed
        )

    def on_connection_open(self, connection):
        print('Connection opened')
        connection.channel(on_open_callback=self.on_channel_open)

    def on_connection_error(self, connection, error):
        print(f'Connection failed: {error}')
        self.is_finished = True

    def on_connection_closed(self, connection, reason):
        print(f'Connection closed: {reason}')
        self.is_ready = False
        self.is_finished = True

    def on_channel_open(self, channel):
        print('Channel opened')
        self.channel = channel
        self.is_ready = True

    def publish_message(self, message):
        if self.is_ready:
            self.channel.basic_publish(
                exchange='',
                routing_key='gevent_test',
                body=message
            )
            print(f'Published: {message}')


# Usage
example = GeventExample()
example.connect()

# Wait for connection to be ready (or to fail)
while not (example.is_ready or example.is_finished):
    gevent.sleep(0.1)

if example.is_ready:
    # Publish some messages
    for i in range(5):
        example.publish_message(f'Message {i}')
        gevent.sleep(1)

    if example.connection.is_open:
        example.connection.close()

    # Yield to the hub until the close has completed
    while not example.is_finished:
        gevent.sleep(0.1)
443
```
444
445
### Select Connection with Custom IOLoop
446
447
```python
448
import pika
from pika.adapters.select_connection import SelectConnection, IOLoop


class SelectExample:
    """Publish a message every two seconds on a caller-supplied IOLoop."""

    def __init__(self):
        self.connection = None
        self.channel = None
        self.ioloop = None

    def connect(self):
        # Create custom IOLoop
        self.ioloop = IOLoop()

        parameters = pika.ConnectionParameters('localhost')
        self.connection = SelectConnection(
            parameters,
            on_open_callback=self.on_connection_open,
            on_open_error_callback=self.on_connection_error,
            on_close_callback=self.on_connection_closed,
            # The keyword is `custom_ioloop`; `ioloop=` raises TypeError.
            custom_ioloop=self.ioloop
        )

    def on_connection_open(self, connection):
        print('Connection opened')
        connection.channel(on_open_callback=self.on_channel_open)

    def on_connection_error(self, connection, error):
        print(f'Connection failed: {error}')
        self.ioloop.stop()

    def on_connection_closed(self, connection, reason):
        print(f'Connection closed: {reason}')
        self.ioloop.stop()

    def on_channel_open(self, channel):
        print('Channel opened')
        self.channel = channel

        # Schedule periodic message publishing
        self.schedule_publish()

    def schedule_publish(self):
        # A timer may fire while the connection is shutting down; publishing
        # on a channel that is no longer open raises, so end the cycle here.
        if self.channel is None or not self.channel.is_open:
            return

        # Publish message
        self.channel.basic_publish(
            exchange='',
            routing_key='select_test',
            body='Hello Select!'
        )

        # Schedule next publish in 2 seconds
        self.ioloop.call_later(2.0, self.schedule_publish)

    def run(self):
        # Start the IOLoop
        self.ioloop.start()


# Usage
example = SelectExample()
example.connect()

# Stop after 10 seconds
example.ioloop.call_later(10.0, example.connection.close)

example.run()
512
```
513
514
### Adapter Comparison
515
516
```python
517
import pika


def create_connection(adapter_type='blocking'):
    """
    Create connection based on adapter type.

    Parameters:
    - adapter_type (str): One of 'blocking', 'select', 'asyncio', 'tornado', 'gevent'

    Raises ValueError for any other value, including 'twisted', whose
    connections are created by Twisted itself rather than by a constructor call.
    """
    parameters = pika.ConnectionParameters('localhost')

    if adapter_type == 'blocking':
        return pika.BlockingConnection(parameters)

    elif adapter_type == 'select':
        def on_open(connection):
            print('Select connection opened')

        return pika.SelectConnection(
            parameters,
            on_open_callback=on_open
        )

    elif adapter_type == 'asyncio':
        # Imported lazily so the optional framework is only needed when chosen
        from pika.adapters.asyncio_connection import AsyncioConnection

        def on_open(connection):
            print('AsyncIO connection opened')

        return AsyncioConnection(
            parameters,
            on_open_callback=on_open
        )

    elif adapter_type == 'tornado':
        from pika.adapters.tornado_connection import TornadoConnection

        def on_open(connection):
            print('Tornado connection opened')

        return TornadoConnection(
            parameters,
            on_open_callback=on_open
        )

    elif adapter_type == 'gevent':
        from pika.adapters.gevent_connection import GeventConnection

        def on_open(connection):
            print('Gevent connection opened')

        return GeventConnection(
            parameters,
            on_open_callback=on_open
        )

    elif adapter_type == 'twisted':
        # Twisted instantiates the protocol and owns the transport, so there
        # is no callback-style constructor to return from this factory.
        raise ValueError(
            "The twisted adapter is created through "
            "twisted.internet.protocol.ClientCreator, not create_connection()"
        )

    else:
        raise ValueError(f"Unsupported adapter type: {adapter_type}")


# Usage examples
print("Available adapters:")
print("- blocking: Synchronous operations")
print("- select: Event-driven with select/poll")
print("- asyncio: Python 3 async/await")
print("- tornado: Tornado web framework")
print("- twisted: Twisted framework (connect via Twisted's ClientCreator)")
print("- gevent: Gevent cooperative multitasking")
568
```