# Async Operations

Asynchronous AMQP operations built on `async`/`await` (Python 3.5+), for high-performance messaging applications that need non-blocking I/O.

## Capabilities

### Async Send Client

Asynchronous high-level client for sending messages without blocking the event loop.

```python { .api }
class SendClientAsync:
    def __init__(self, target, auth=None, client_name=None, debug=False,
                 msg_timeout=0, error_policy=None, keep_alive_interval=None,
                 send_settle_mode=None, auto_complete=True, encoding='UTF-8',
                 loop=None, **kwargs):
        """
        Async high-level client for sending AMQP messages.

        Parameters:
        - target (str or Target): Target endpoint for messages
        - auth (AMQPAuth): Authentication credentials
        - client_name (str): Client identifier
        - debug (bool): Enable debug logging
        - msg_timeout (int): Message send timeout in milliseconds (0 means no expiry)
        - error_policy (ErrorPolicy): Error handling policy
        - keep_alive_interval (int): Keep-alive interval
        - send_settle_mode (SenderSettleMode): Message settlement mode
        - auto_complete (bool): Auto-complete sent messages
        - encoding (str): Character encoding
        - loop: Asyncio event loop
        """
```

**Key Methods:**

```python { .api }
async def open_async(self):
    """Asynchronously open the client connection."""

async def close_async(self):
    """Asynchronously close the client connection."""

def queue_message(self, message):
    """Queue a message for sending (synchronous)."""

async def send_all_messages_async(self, close_on_done=True):
    """
    Asynchronously send all queued messages.

    Parameters:
    - close_on_done (bool): Whether to close connection after sending

    Returns:
    list[MessageState]: List of send results for each message
    """

async def send_message_batch_async(self, messages, close_on_done=True):
    """
    Asynchronously send a batch of messages.

    Parameters:
    - messages (list[Message]): Messages to send
    - close_on_done (bool): Whether to close connection after sending

    Returns:
    list[MessageState]: List of send results for each message
    """
```

**Usage Examples:**

```python
import asyncio
from uamqp.async_ops import SendClientAsync
from uamqp import Message
from uamqp.authentication import SASTokenAsync

# Shared by both examples below. sas_token is a placeholder:
# substitute a token obtained for your own namespace.
target = "amqps://mynamespace.servicebus.windows.net/myqueue"
sas_token = "<your-sas-token>"
auth = SASTokenAsync("mynamespace.servicebus.windows.net", token=sas_token)

async def send_messages_async():
    # Using async context manager
    async with SendClientAsync(target, auth=auth) as client:
        message = Message("Hello Async World")
        client.queue_message(message)
        results = await client.send_all_messages_async()
        print(f"Send results: {results}")

# Batch async sending
async def send_batch_async():
    messages = [Message(f"Async message {i}") for i in range(10)]

    async with SendClientAsync(target, auth=auth) as client:
        results = await client.send_message_batch_async(messages)
        print(f"Sent {len(results)} messages asynchronously")

# Run async functions
asyncio.run(send_messages_async())
```

### Async Receive Client

Asynchronous high-level client for receiving messages with non-blocking message processing.

```python { .api }
class ReceiveClientAsync:
    def __init__(self, source, auth=None, client_name=None, debug=False,
                 prefetch=300, auto_complete=True, error_policy=None,
                 keep_alive_interval=None, receive_settle_mode=None,
                 encoding='UTF-8', loop=None, **kwargs):
        """
        Async high-level client for receiving AMQP messages.

        Parameters:
        - source (str or Source): Source endpoint for messages
        - auth (AMQPAuth): Authentication credentials
        - client_name (str): Client identifier
        - debug (bool): Enable debug logging
        - prefetch (int): Number of messages to prefetch
        - auto_complete (bool): Auto-complete received messages
        - error_policy (ErrorPolicy): Error handling policy
        - keep_alive_interval (int): Keep-alive interval
        - receive_settle_mode (ReceiverSettleMode): Message settlement mode
        - encoding (str): Character encoding
        - loop: Asyncio event loop
        """
```

**Key Methods:**

```python { .api }
async def open_async(self):
    """Asynchronously open the client connection."""

async def close_async(self):
    """Asynchronously close the client connection."""

async def receive_message_batch_async(self, max_batch_size=None, timeout=0):
    """
    Asynchronously receive a batch of messages.

    Parameters:
    - max_batch_size (int): Maximum messages to receive
    - timeout (int): Timeout in milliseconds

    Returns:
    list[Message]: Received messages
    """

def receive_messages_iter_async(self):
    """
    Create async iterator for receiving messages.

    Returns:
    AsyncMessageIter: Async message iterator
    """
```

**Usage Examples:**

```python
import asyncio
from uamqp.async_ops import ReceiveClientAsync
from uamqp.authentication import SASLPlain

# Shared by both examples below
source = "amqps://amqp.example.com/myqueue"
auth = SASLPlain("amqp.example.com", "user", "password")

async def receive_messages_async():
    # Batch async receiving
    async with ReceiveClientAsync(source, auth=auth) as client:
        messages = await client.receive_message_batch_async(
            max_batch_size=10,
            timeout=30000  # milliseconds
        )

        print(f"Received {len(messages)} messages")
        for message in messages:
            print(f"Message: {message.get_data()}")
            message.accept()

# Async message iterator
async def process_messages_continuously():
    async with ReceiveClientAsync(source, auth=auth) as client:
        message_iter = client.receive_messages_iter_async()

        async for message in message_iter:
            try:
                data = message.get_data()
                print(f"Processing: {data}")

                # Simulate async processing
                await asyncio.sleep(0.1)

                message.accept()
            except Exception as e:
                print(f"Error: {e}")
                message.reject()

asyncio.run(receive_messages_async())
```

### Async Message Iterator

Async iterator for processing messages as they arrive without blocking.

```python { .api }
class AsyncMessageIter:
    def __init__(self, client):
        """
        Async iterator for AMQP messages.

        Parameters:
        - client: ReceiveClientAsync instance
        """

    def __aiter__(self):
        """Return async iterator."""

    async def __anext__(self):
        """Get next message asynchronously."""
```
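
To make the iterator protocol concrete, here is a minimal, library-independent sketch of how `__aiter__` and `__anext__` cooperate. `QueueMessageIter`, the sentinel, and the `asyncio.Queue` it reads from are hypothetical stand-ins chosen for illustration; they are not part of uamqp, and the real `AsyncMessageIter` may be implemented differently.

```python
import asyncio

_SENTINEL = object()  # marks the end of the stream in this sketch


class QueueMessageIter:
    """Hypothetical stand-in for an async message iterator, backed by a queue."""

    def __init__(self, queue):
        self._queue = queue

    def __aiter__(self):
        # An async iterator returns itself from __aiter__.
        return self

    async def __anext__(self):
        # Awaiting the queue yields to the event loop until an item is available.
        item = await self._queue.get()
        if item is _SENTINEL:
            raise StopAsyncIteration
        return item


async def collect(items):
    # Fill a queue, then drain it with `async for` via the iterator above.
    queue = asyncio.Queue()
    for item in items:
        queue.put_nowait(item)
    queue.put_nowait(_SENTINEL)
    return [item async for item in QueueMessageIter(queue)]


def run_demo():
    return asyncio.run(collect(["a", "b", "c"]))
```

Raising `StopAsyncIteration` from `__anext__` is what ends an `async for` loop; a `break` in the loop body simply stops calling `__anext__`.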

**Usage Example:**

```python
import asyncio
from uamqp.async_ops import ReceiveClientAsync

# `source` and `auth` are defined as in the receive client example above
async def process_messages_with_iterator():
    async with ReceiveClientAsync(source, auth=auth) as client:
        message_iter = client.receive_messages_iter_async()

        message_count = 0
        async for message in message_iter:
            await process_message_async(message)
            message.accept()

            message_count += 1
            if message_count >= 100:  # Process only 100 messages
                break

async def process_message_async(message):
    """Custom async message processing."""
    data = message.get_data()

    # Simulate async work (database query, API call, etc.)
    await asyncio.sleep(0.1)

    print(f"Processed: {data}")
```

### Async Low-Level Protocol

Asynchronous versions of low-level protocol classes for advanced scenarios.

#### Async Connection

```python { .api }
class ConnectionAsync:
    def __init__(self, hostname, sasl=None, container_id=None,
                 max_frame_size=None, channel_max=None, idle_timeout=None,
                 properties=None, remote_idle_timeout_empty_frame_send_ratio=None,
                 debug=False, encoding='UTF-8', loop=None):
        """Async AMQP connection management."""

    async def open_async(self):
        """Asynchronously open connection."""

    async def close_async(self):
        """Asynchronously close connection."""
```

#### Async Session

```python { .api }
class SessionAsync:
    def __init__(self, connection, incoming_window=None, outgoing_window=None,
                 handle_max=None, loop=None):
        """Async AMQP session management."""

    async def begin_async(self):
        """Asynchronously begin session."""

    async def end_async(self):
        """Asynchronously end session."""
```

#### Async Message Sender

```python { .api }
class MessageSenderAsync:
    def __init__(self, session, source, target, name=None,
                 send_settle_mode=None, max_message_size=None,
                 link_properties=None, desired_capabilities=None,
                 loop=None):
        """Async low-level message sender."""

    async def open_async(self):
        """Asynchronously open sender link."""

    async def send_async(self, message, callback=None):
        """
        Asynchronously send a message.

        Parameters:
        - message (Message): Message to send
        - callback (callable): Completion callback

        Returns:
        MessageState: Send operation state
        """
```

#### Async Message Receiver

```python { .api }
class MessageReceiverAsync:
    def __init__(self, session, source, target, name=None,
                 receive_settle_mode=None, max_message_size=None,
                 prefetch=None, link_properties=None,
                 desired_capabilities=None, loop=None):
        """Async low-level message receiver."""

    async def open_async(self):
        """Asynchronously open receiver link."""

    async def receive_message_batch_async(self, max_batch_size=None):
        """
        Asynchronously receive message batch.

        Parameters:
        - max_batch_size (int): Maximum messages to receive

        Returns:
        list[Message]: Received messages
        """
```

**Usage Example:**

```python
from uamqp import Message
from uamqp.async_ops import ConnectionAsync, SessionAsync, MessageSenderAsync

# `auth_sasl` is a placeholder for the SASL credentials of your broker
async def low_level_async_example():
    # Create async connection
    connection = ConnectionAsync("amqp.example.com", sasl=auth_sasl)
    await connection.open_async()

    try:
        # Create async session
        session = SessionAsync(connection)
        await session.begin_async()

        # Create async sender
        sender = MessageSenderAsync(session, source="source", target="target")
        await sender.open_async()

        # Send message asynchronously
        message = Message("Low-level async message")
        result = await sender.send_async(message)
        print(f"Send result: {result}")

    finally:
        # Always close the connection, even if a step above fails
        await connection.close_async()
```

## Async Authentication

Use async authentication classes with async operations:

```python
from uamqp.authentication import SASTokenAsync, JWTTokenAsync

# sas_token and jwt_token are placeholders supplied by your application

# Async SAS token auth
auth = SASTokenAsync(
    hostname="mynamespace.servicebus.windows.net",
    token=sas_token
)

# Async JWT token auth
auth = JWTTokenAsync(
    hostname="service.example.com",
    token=jwt_token,
    audience="https://service.example.com"
)
```

## Async Error Handling

Handle errors in async operations:

```python
from uamqp import Message
from uamqp.async_ops import SendClientAsync
from uamqp.errors import AMQPConnectionError, MessageSendFailed

# `target` and `auth` are defined as in the send client example above
async def robust_async_sending():
    try:
        async with SendClientAsync(target, auth=auth) as client:
            client.queue_message(Message("Test message"))
            results = await client.send_all_messages_async()

    except AMQPConnectionError as e:
        print(f"Connection failed: {e}")
        # Implement retry logic

    except MessageSendFailed as e:
        print(f"Send failed: {e}")
        # Handle send failure

    except Exception as e:
        print(f"Unexpected error: {e}")
```
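
The "Implement retry logic" placeholder above could be filled in many ways. One common option is retrying with exponential backoff, sketched here using only the standard library. `retry_async`, its parameters, and `flaky_send` are hypothetical names introduced for illustration; with uamqp you would presumably pass `retry_on=(AMQPConnectionError,)`, and the client's own `error_policy` may already cover part of this.

```python
import asyncio


async def retry_async(operation, attempts=3, base_delay=0.5,
                      retry_on=(ConnectionError,)):
    """Await operation() up to `attempts` times, doubling the delay after each failure."""
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Delays grow as base_delay, 2 * base_delay, 4 * base_delay, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


def run_demo():
    calls = []

    async def flaky_send():
        # Hypothetical operation that fails twice, then succeeds.
        calls.append(1)
        if len(calls) < 3:
            raise ConnectionError("transient failure")
        return "sent"

    result = asyncio.run(retry_async(flaky_send, attempts=5, base_delay=0.01))
    return result, len(calls)
```

Because the wait uses `asyncio.sleep`, other tasks on the event loop keep running while a retry is pending.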

## Async Performance Considerations

### Event Loop Integration

```python
# Use existing event loop
import asyncio

async def main():
    loop = asyncio.get_running_loop()

    async with SendClientAsync(target, auth=auth, loop=loop) as client:
        # Operations use the specified loop
        pass

asyncio.run(main())
```

### Concurrent Operations

```python
import asyncio

# Reuses send_messages_async() and receive_messages_async() from the examples above
async def concurrent_operations():
    # Send and receive concurrently
    send_task = asyncio.create_task(send_messages_async())
    receive_task = asyncio.create_task(receive_messages_async())

    # Wait for both to complete
    await asyncio.gather(send_task, receive_task)
```
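
With a plain `asyncio.gather`, the first exception propagates to the caller while the remaining tasks keep running. If you would rather let every operation finish and inspect failures afterwards, `return_exceptions=True` is one option. The sketch below uses only the standard library; `run_all`, `ok`, and `boom` are hypothetical helpers standing in for real send and receive coroutines.

```python
import asyncio


async def run_all(coros):
    """Run coroutines concurrently; return (results, errors) instead of raising."""
    outcomes = await asyncio.gather(*coros, return_exceptions=True)
    results = [o for o in outcomes if not isinstance(o, Exception)]
    errors = [o for o in outcomes if isinstance(o, Exception)]
    return results, errors


async def ok(value):
    # Stand-in for an operation that succeeds.
    await asyncio.sleep(0.01)
    return value


async def boom():
    # Stand-in for an operation that fails.
    await asyncio.sleep(0.01)
    raise RuntimeError("receive failed")


def run_demo():
    results, errors = asyncio.run(run_all([ok("sent"), boom(), ok("received")]))
    return results, [str(e) for e in errors]
```

`gather` returns outcomes in the order the awaitables were passed in, so results can be matched back to their operations.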

### Async Context Managers

Prefer async context managers so the client is closed even when an error occurs. If you manage the lifecycle manually, close the client in a `finally` block:

```python
# Preferred: the context manager opens and closes the client
async def send_with_context_manager():
    async with SendClientAsync(target, auth=auth) as client:
        await client.send_all_messages_async()

# Manual async lifecycle management: close in `finally` so cleanup always runs
async def send_with_manual_lifecycle():
    client = SendClientAsync(target, auth=auth)
    await client.open_async()
    try:
        await client.send_all_messages_async()
    finally:
        await client.close_async()
```
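
The two forms above are equivalent because `async with` calls `__aenter__` on entry and `__aexit__` on exit, including when the body raises. The following standard-library sketch illustrates that mechanism with a hypothetical `FakeClient`; it assumes the context manager simply wraps `open_async` and `close_async`, which may not match the library's actual implementation.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for a client exposing open_async/close_async."""

    def __init__(self):
        self.events = []

    async def open_async(self):
        self.events.append("open")

    async def close_async(self):
        self.events.append("close")

    async def __aenter__(self):
        await self.open_async()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Runs on normal exit and on errors; returning None lets errors propagate.
        await self.close_async()


async def use_client():
    client = FakeClient()
    try:
        async with client:
            client.events.append("send")
            raise RuntimeError("send failed")
    except RuntimeError:
        client.events.append("error handled")
    return client.events


def run_demo():
    return asyncio.run(use_client())
```

Note that `"close"` is recorded before the exception reaches the caller, which is the cleanup guarantee the `try`/`finally` form provides by hand.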