0
# Client APIs
1
2
High-level client classes for sending and receiving messages with built-in connection management, error handling, and batching capabilities that provide the easiest way to build AMQP messaging applications.
3
4
## Capabilities
5
6
### Base AMQP Client
7
8
Base client class that provides common functionality for all AMQP client operations.
9
10
```python { .api }
11
class AMQPClient:
12
def __init__(self, remote_address, auth=None, client_name=None, debug=False,
13
error_policy=None, keep_alive_interval=None, max_frame_size=None,
14
channel_max=None, idle_timeout=None, properties=None,
15
remote_idle_timeout_empty_frame_send_ratio=None, incoming_window=None,
16
outgoing_window=None, handle_max=None, on_attach=None,
17
auto_complete=True, encoding='UTF-8', desired_capabilities=None,
18
max_message_size=None, link_properties=None, timeout=0, **kwargs):
19
"""
20
Base AMQP client for connection and session management.
21
22
Parameters:
23
- remote_address (str): AMQP broker address
24
- auth (AMQPAuth): Authentication credentials
25
- client_name (str): Client identifier
26
- debug (bool): Enable debug logging
27
- error_policy (ErrorPolicy): Error handling policy
28
- keep_alive_interval (int): Keep-alive interval in seconds
29
- max_frame_size (int): Maximum frame size in bytes
30
- channel_max (int): Maximum number of channels
31
- idle_timeout (int): Connection idle timeout in seconds
32
- properties (dict): Connection properties
33
- incoming_window (int): Session incoming window size
34
- outgoing_window (int): Session outgoing window size
35
- handle_max (int): Maximum link handles
36
- on_attach (callable): Link attach callback
37
- auto_complete (bool): Auto-complete messages
38
- encoding (str): Character encoding
39
- desired_capabilities (list): Desired connection capabilities
40
- max_message_size (int): Maximum message size
41
- link_properties (dict): Link properties
42
- timeout (int): Operation timeout in seconds
43
"""
44
```
45
46
**Key Methods:**
47
48
```python { .api }
49
def open(self):
50
"""Open the client connection and session."""
51
52
def close(self):
53
"""Close the client connection."""
54
55
def __enter__(self):
56
"""Context manager entry."""
57
58
def __exit__(self, exc_type, exc_val, exc_tb):
59
"""Context manager exit."""
60
61
def mgmt_request(self, message, operation, op_type=None, node=None, callback=None, **kwargs):
62
"""
63
Send a management request.
64
65
Parameters:
66
- message (Message): Request message
67
- operation (str): Management operation name
68
- op_type (str): Operation type
69
- node (str): Target node name
70
- callback (callable): Response callback function
71
72
Returns:
73
Message: Response message
74
"""
75
76
def auth_complete(self):
77
"""
78
Check if authentication is complete.
79
80
Returns:
81
bool: True if authentication is complete
82
"""
83
84
def client_ready(self):
85
"""
86
Check if client is ready for operations.
87
88
Returns:
89
bool: True if client is ready
90
"""
91
92
def do_work(self, **kwargs):
93
"""Perform client work iteration."""
94
```
95
96
**Usage Example:**
97
98
```python
99
from uamqp.client import AMQPClient
100
from uamqp.authentication import SASLPlain
101
102
auth = SASLPlain("amqp.example.com", "user", "password")
103
104
# Using context manager (recommended)
105
with AMQPClient("amqps://amqp.example.com", auth=auth) as client:
106
# Client operations here
107
pass
108
109
# Manual open/close
110
client = AMQPClient("amqps://amqp.example.com", auth=auth)
111
client.open()
112
try:
113
# Client operations here
114
pass
115
finally:
116
client.close()
117
```
118
119
### Send Client
120
121
High-level client for sending messages with automatic connection management and batch sending capabilities.
122
123
```python { .api }
124
class SendClient(AMQPClient):
125
def __init__(self, target, auth=None, client_name=None, debug=False,
126
msg_timeout=0, error_policy=None, keep_alive_interval=None,
127
send_settle_mode=None, auto_complete=True, encoding='UTF-8',
128
**kwargs):
129
"""
130
High-level client for sending AMQP messages.
131
132
Parameters:
133
- target (str or Target): Target endpoint for messages
134
- auth (AMQPAuth): Authentication credentials
135
- client_name (str): Client identifier
136
- debug (bool): Enable debug logging
137
- msg_timeout (int): Message send timeout in seconds
138
- error_policy (ErrorPolicy): Error handling policy
139
- keep_alive_interval (int): Keep-alive interval
140
- send_settle_mode (SenderSettleMode): Message settlement mode
141
- auto_complete (bool): Auto-complete sent messages
142
- encoding (str): Character encoding
143
"""
144
```
145
146
**Key Methods:**
147
148
```python { .api }
149
def queue_message(self, message):
150
"""
151
Queue a message for sending.
152
153
Parameters:
154
- message (Message): Message to queue for sending
155
"""
156
157
def send_all_messages(self, close_on_done=True):
158
"""
159
Send all queued messages.
160
161
Parameters:
162
- close_on_done (bool): Whether to close connection after sending
163
164
Returns:
165
list[MessageState]: List of send results for each message
166
"""
167
168
def send_message_batch(self, messages, close_on_done=True):
169
"""
170
Send a batch of messages.
171
172
Parameters:
173
- messages (list[Message]): Messages to send
174
- close_on_done (bool): Whether to close connection after sending
175
176
Returns:
177
list[MessageState]: List of send results for each message
178
"""
179
```
180
181
**Usage Examples:**
182
183
```python
184
from uamqp import SendClient, Message
185
from uamqp.authentication import SASTokenAuth
186
187
# Single message sending
188
target = "amqps://mynamespace.servicebus.windows.net/myqueue"
189
auth = SASTokenAuth("mynamespace.servicebus.windows.net", token=sas_token)
190
191
with SendClient(target, auth=auth) as client:
192
message = Message("Hello World")
193
client.queue_message(message)
194
results = client.send_all_messages()
195
print(f"Send results: {results}")
196
197
# Batch message sending
198
messages = [
199
Message("Message 1"),
200
Message("Message 2"),
201
Message("Message 3")
202
]
203
204
with SendClient(target, auth=auth) as client:
205
results = client.send_message_batch(messages)
206
print(f"Sent {len(results)} messages")
207
208
# Queue multiple messages individually
209
with SendClient(target, auth=auth) as client:
210
for i in range(10):
211
client.queue_message(Message(f"Message {i}"))
212
results = client.send_all_messages()
213
```
214
215
### Receive Client
216
217
High-level client for receiving messages with automatic connection management and batch receiving capabilities.
218
219
```python { .api }
220
class ReceiveClient(AMQPClient):
221
def __init__(self, source, auth=None, client_name=None, debug=False,
222
prefetch=300, auto_complete=True, error_policy=None,
223
keep_alive_interval=None, receive_settle_mode=None,
224
encoding='UTF-8', **kwargs):
225
"""
226
High-level client for receiving AMQP messages.
227
228
Parameters:
229
- source (str or Source): Source endpoint for messages
230
- auth (AMQPAuth): Authentication credentials
231
- client_name (str): Client identifier
232
- debug (bool): Enable debug logging
233
- prefetch (int): Number of messages to prefetch
234
- auto_complete (bool): Auto-complete received messages
235
- error_policy (ErrorPolicy): Error handling policy
236
- keep_alive_interval (int): Keep-alive interval
237
- receive_settle_mode (ReceiverSettleMode): Message settlement mode
238
- encoding (str): Character encoding
239
"""
240
```
241
242
**Key Methods:**
243
244
```python { .api }
245
def receive_message_batch(self, max_batch_size=None, timeout=0):
246
"""
247
Receive a batch of messages.
248
249
Parameters:
250
- max_batch_size (int): Maximum messages to receive (default: prefetch size)
251
- timeout (int): Timeout in milliseconds
252
253
Returns:
254
list[Message]: Received messages
255
"""
256
257
def receive_messages(self, timeout=0):
258
"""
259
Receive messages with iterator-like behavior.
260
261
Parameters:
262
- timeout (int): Timeout in milliseconds
263
264
Returns:
265
generator: Message iterator
266
"""
267
```
268
269
**Usage Examples:**
270
271
```python
272
from uamqp import ReceiveClient
273
from uamqp.authentication import SASLPlain
274
275
source = "amqps://amqp.example.com/myqueue"
276
auth = SASLPlain("amqp.example.com", "user", "password")
277
278
# Batch message receiving
279
with ReceiveClient(source, auth=auth, prefetch=50) as client:
280
messages = client.receive_message_batch(max_batch_size=10, timeout=30000)
281
print(f"Received {len(messages)} messages")
282
283
for message in messages:
284
print(f"Message: {message.get_data()}")
285
message.accept() # Acknowledge message
286
287
# Continuous message receiving
288
with ReceiveClient(source, auth=auth) as client:
289
for message in client.receive_messages():
290
try:
291
data = message.get_data()
292
print(f"Processing: {data}")
293
# Process message here
294
message.accept()
295
except Exception as e:
296
print(f"Error processing message: {e}")
297
message.reject()
298
299
# Break after processing 100 messages
300
if processed_count >= 100:
301
break
302
303
# Receive with custom settlement mode
304
from uamqp.constants import ReceiverSettleMode
305
306
with ReceiveClient(source, auth=auth,
307
receive_settle_mode=ReceiverSettleMode.PeekLock) as client:
308
messages = client.receive_message_batch(timeout=10000)
309
for message in messages:
310
# Message won't be removed from queue until explicitly settled
311
if process_message(message.get_data()):
312
message.accept() # Remove from queue
313
else:
314
message.release() # Return to queue for retry
315
```
316
317
## Configuration Options
318
319
### Error Policy
320
321
Configure how clients handle errors and retries:
322
323
```python
324
from uamqp.errors import ErrorPolicy, ErrorAction
325
326
# Custom error policy with retries
327
error_policy = ErrorPolicy(
328
max_retries=3,
329
on_error=ErrorAction(retry=True, backoff=2.0)
330
)
331
332
client = SendClient(target, auth=auth, error_policy=error_policy)
333
```
334
335
### Settlement Modes
336
337
Control message acknowledgment behavior:
338
339
```python
340
from uamqp.constants import SenderSettleMode, ReceiverSettleMode
341
342
# Send client settlement
343
send_client = SendClient(
344
target,
345
auth=auth,
346
send_settle_mode=SenderSettleMode.Settled # Fire-and-forget
347
)
348
349
# Receive client settlement
350
receive_client = ReceiveClient(
351
source,
352
auth=auth,
353
receive_settle_mode=ReceiverSettleMode.PeekLock # Manual acknowledgment
354
)
355
```
356
357
### Connection Properties
358
359
Set custom connection properties:
360
361
```python
362
properties = {
363
'connection-name': 'MyApp-v1.0',
364
'product': 'MyApplication',
365
'platform': 'Python'
366
}
367
368
client = SendClient(target, auth=auth, properties=properties)
369
```
370
371
## Performance Tuning
372
373
### Prefetch Settings
374
375
Optimize message throughput:
376
377
```python
378
# High throughput receiving
379
client = ReceiveClient(
380
source,
381
auth=auth,
382
prefetch=1000, # Prefetch more messages
383
max_frame_size=65536 # Larger frames
384
)
385
386
# Low latency receiving
387
client = ReceiveClient(
388
source,
389
auth=auth,
390
prefetch=1, # Minimal prefetch
391
auto_complete=False # Manual acknowledgment
392
)
393
```
394
395
### Batch Operations
396
397
Use batch operations for better performance:
398
399
```python
400
# Batch sending (more efficient than individual sends)
401
messages = [Message(f"Batch message {i}") for i in range(100)]
402
with SendClient(target, auth=auth) as client:
403
results = client.send_message_batch(messages)
404
```
405
406
### Management Operations
407
408
AMQP management request/response operations for interacting with broker management interfaces.
409
410
```python { .api }
411
class MgmtOperation:
412
def __init__(self, session, target=None, debug=False,
413
status_code_field=b'statusCode',
414
description_fields=b'statusDescription',
415
encoding='UTF-8'):
416
"""
417
AMQP management operation client.
418
419
Parameters:
420
- session (Session): AMQP session for the operation
421
- target (bytes or str): Target node name (default: b"$management")
422
- debug (bool): Enable debug logging
423
- status_code_field (bytes): Response status code field name
424
- description_fields (bytes): Response description field name
425
- encoding (str): Character encoding
426
"""
427
428
def execute(self, operation, op_type, message, timeout=0):
429
"""
430
Execute a management operation.
431
432
Parameters:
433
- operation (str): Operation name
434
- op_type (str): Operation type
435
- message (Message): Request message
436
- timeout (int): Operation timeout in milliseconds
437
438
Returns:
439
Message: Response message
440
"""
441
```
442
443
**Usage Example:**
444
445
```python
446
from uamqp import Session, MgmtOperation, Message
447
448
# Create management operation
449
with Session(connection) as session:
450
mgmt_op = MgmtOperation(session, target=b"$management")
451
452
# Execute management request
453
request = Message({"operation": "read-queue-info"})
454
response = mgmt_op.execute("read", "queue", request, timeout=30000)
455
456
print(f"Response: {response.get_data()}")
457
```
458
459
## Common Client Errors
460
461
Client operations may raise these exceptions:
462
463
- **AMQPConnectionError**: Connection to broker failed
464
- **MessageSendFailed**: Message sending failed
465
- **ClientTimeout**: Operation timed out
466
- **AMQPClientShutdown**: Client was shut down
467
- **LinkDetach**: Link was detached by broker