0
# Low-Level Protocol Access
1
2
Direct access to AMQP protocol elements including message senders, receivers, and protocol-level message handling for advanced use cases that require fine-grained control over AMQP 1.0 protocol behavior.
3
4
## Capabilities
5
6
### Message Sender
7
8
Low-level message sender link that provides direct control over message transmission and settlement.
9
10
```python { .api }
11
class MessageSender:
12
def __init__(self, session, source, target, name=None, send_settle_mode=None,
13
max_message_size=None, link_properties=None,
14
desired_capabilities=None):
15
"""
16
Low-level message sender link.
17
18
Parameters:
19
- session (Session): AMQP session
20
- source (Source): Link source address
21
- target (Target): Link target address
22
- name (str): Link name
23
- send_settle_mode (SenderSettleMode): Settlement mode
24
- max_message_size (int): Maximum message size
25
- link_properties (dict): Link properties
26
- desired_capabilities (list): Desired link capabilities
27
"""
28
```
29
30
**Key Methods:**
31
32
```python { .api }
33
def open(self):
34
"""Open the sender link."""
35
36
def close(self):
37
"""Close the sender link."""
38
39
def send_async(self, message, callback=None):
40
"""
41
Send a message asynchronously.
42
43
Parameters:
44
- message (Message): Message to send
45
- callback (callable): Completion callback
46
47
Returns:
48
MessageState: Send operation state
49
"""
50
51
def work(self):
52
"""Process sender work (I/O and protocol handling)."""
53
54
def destroy(self):
55
"""Destroy the sender and free resources."""
56
```
57
58
**Key Properties:**
59
60
```python { .api }
61
@property
62
def name: str
63
"""Link name."""
64
65
@property
66
def source: Source
67
"""Link source address."""
68
69
@property
70
def target: Target
71
"""Link target address."""
72
73
@property
74
def max_message_size: int
75
"""Maximum message size."""
76
77
@property
78
def send_settle_mode: int
79
"""Sender settlement mode."""
80
```
81
82
**Usage Examples:**
83
84
```python
85
from uamqp import Connection, Session, MessageSender
86
from uamqp.address import Source, Target
87
from uamqp import Message
88
from uamqp.constants import SenderSettleMode
89
90
# Create low-level sender
91
connection = Connection("amqp.example.com", sasl=auth)
92
connection.open()
93
94
session = Session(connection)
95
session.begin()
96
97
source = Source() # Null source for sender
98
target = Target("myqueue")
99
100
sender = MessageSender(
101
session=session,
102
source=source,
103
target=target,
104
name="my-sender",
105
send_settle_mode=SenderSettleMode.Mixed,
106
max_message_size=1048576 # 1MB
107
)
108
109
try:
110
sender.open()
111
112
# Send message with callback
113
def send_callback(message, result, error):
114
if error:
115
print(f"Send failed: {error}")
116
else:
117
print(f"Send completed: {result}")
118
119
message = Message("Hello World")
120
result = sender.send_async(message, callback=send_callback)
121
122
# Process until send completes
123
while result == MessageState.WaitingForSendAck:
124
sender.work()
125
connection.work()
126
127
finally:
128
sender.close()
129
session.end()
130
connection.close()
131
```
132
133
### Message Receiver
134
135
Low-level message receiver link that provides direct control over message reception and flow control.
136
137
```python { .api }
138
class MessageReceiver:
139
def __init__(self, session, source, target, name=None,
140
receive_settle_mode=None, max_message_size=None,
141
prefetch=None, link_properties=None,
142
desired_capabilities=None):
143
"""
144
Low-level message receiver link.
145
146
Parameters:
147
- session (Session): AMQP session
148
- source (Source): Link source address
149
- target (Target): Link target address
150
- name (str): Link name
151
- receive_settle_mode (ReceiverSettleMode): Settlement mode
152
- max_message_size (int): Maximum message size
153
- prefetch (int): Number of messages to prefetch
154
- link_properties (dict): Link properties
155
- desired_capabilities (list): Desired link capabilities
156
"""
157
```
158
159
**Key Methods:**
160
161
```python { .api }
162
def open(self):
163
"""Open the receiver link."""
164
165
def close(self):
166
"""Close the receiver link."""
167
168
def receive_message_batch(self, max_batch_size=None):
169
"""
170
Receive a batch of messages.
171
172
Parameters:
173
- max_batch_size (int): Maximum messages to receive
174
175
Returns:
176
list[Message]: Received messages
177
"""
178
179
def work(self):
180
"""Process receiver work (I/O and protocol handling)."""
181
182
def flow(self, link_credit):
183
"""
184
Grant link credit for message flow control.
185
186
Parameters:
187
- link_credit (int): Number of credits to grant
188
"""
189
190
def destroy(self):
191
"""Destroy the receiver and free resources."""
192
```
193
194
**Key Properties:**
195
196
```python { .api }
197
@property
198
def name: str
199
"""Link name."""
200
201
@property
202
def source: Source
203
"""Link source address."""
204
205
@property
206
def target: Target
207
"""Link target address."""
208
209
@property
210
def max_message_size: int
211
"""Maximum message size."""
212
213
@property
214
def receive_settle_mode: int
215
"""Receiver settlement mode."""
216
217
@property
218
def prefetch: int
219
"""Prefetch count."""
220
```
221
222
**Usage Examples:**
223
224
```python
225
from uamqp import MessageReceiver
226
from uamqp.constants import ReceiverSettleMode
227
228
# Create low-level receiver
229
source = Source("myqueue")
230
target = Target() # Null target for receiver
231
232
receiver = MessageReceiver(
233
session=session,
234
source=source,
235
target=target,
236
name="my-receiver",
237
receive_settle_mode=ReceiverSettleMode.PeekLock,
238
prefetch=100,
239
max_message_size=1048576
240
)
241
242
try:
243
receiver.open()
244
245
# Grant initial credits
246
receiver.flow(10)
247
248
# Receive messages in loop
249
while True:
250
messages = receiver.receive_message_batch(max_batch_size=5)
251
252
if not messages:
253
# No messages, process connection
254
receiver.work()
255
connection.work()
256
continue
257
258
print(f"Received {len(messages)} messages")
259
260
for message in messages:
261
try:
262
data = message.get_data()
263
print(f"Message: {data}")
264
265
# Process message
266
process_message(data)
267
268
# Accept message
269
message.accept()
270
271
except Exception as e:
272
print(f"Processing error: {e}")
273
message.reject(
274
condition="processing-error",
275
description=str(e)
276
)
277
278
# Grant more credits after processing
279
receiver.flow(len(messages))
280
281
finally:
282
receiver.close()
283
```
284
285
### Advanced Protocol Control
286
287
#### Manual Credit Management
288
289
```python
290
class CreditManager:
291
def __init__(self, receiver, initial_credits=10, min_credits=5):
292
self.receiver = receiver
293
self.initial_credits = initial_credits
294
self.min_credits = min_credits
295
self.granted_credits = 0
296
self.processed_messages = 0
297
298
def start(self):
299
"""Grant initial credits."""
300
self.receiver.flow(self.initial_credits)
301
self.granted_credits = self.initial_credits
302
303
def on_message_processed(self):
304
"""Call when a message is processed."""
305
self.processed_messages += 1
306
307
# Calculate remaining credits
308
remaining_credits = self.granted_credits - self.processed_messages
309
310
# Grant more credits if running low
311
if remaining_credits <= self.min_credits:
312
additional_credits = self.initial_credits - remaining_credits
313
self.receiver.flow(additional_credits)
314
self.granted_credits += additional_credits
315
print(f"Granted {additional_credits} additional credits")
316
317
# Usage
318
credit_manager = CreditManager(receiver, initial_credits=20, min_credits=5)
319
credit_manager.start()
320
321
while True:
322
messages = receiver.receive_message_batch(max_batch_size=10)
323
324
for message in messages:
325
# Process message
326
process_message(message)
327
message.accept()
328
329
# Update credit management
330
credit_manager.on_message_processed()
331
```
332
333
#### Link State Management
334
335
```python
336
from uamqp.constants import MessageSenderState, MessageReceiverState
337
338
def monitor_link_state(link, link_type="sender"):
339
"""Monitor and handle link state changes."""
340
341
if link_type == "sender":
342
states = MessageSenderState
343
else:
344
states = MessageReceiverState
345
346
current_state = link.get_state()
347
348
if current_state == states.Opening:
349
print("Link is opening...")
350
# Wait for open to complete
351
while link.get_state() == states.Opening:
352
link.work()
353
time.sleep(0.1)
354
355
elif current_state == states.Open:
356
print("Link is open and ready")
357
return True
358
359
elif current_state == states.Error:
360
print("Link is in error state")
361
error_info = link.get_error_info()
362
print(f"Error: {error_info}")
363
return False
364
365
elif current_state == states.Closing:
366
print("Link is closing...")
367
return False
368
369
return current_state == states.Open
370
371
# Usage
372
sender_ready = monitor_link_state(sender, "sender")
373
if sender_ready:
374
# Proceed with sending
375
pass
376
```
377
378
#### Custom Link Properties
379
380
```python
381
def create_sender_with_properties(session, target):
382
"""Create sender with custom link properties."""
383
384
link_properties = {
385
'x-opt-jms-dest': 1, # JMS destination type
386
'x-opt-enqueuetime': True, # Include enqueue time
387
'product': 'MyApplication', # Application identifier
388
'version': '1.0.0' # Application version
389
}
390
391
desired_capabilities = [
392
'ANONYMOUS-RELAY', # Anonymous relay capability
393
'DELAYED_DELIVERY' # Delayed delivery capability
394
]
395
396
sender = MessageSender(
397
session=session,
398
source=Source(),
399
target=target,
400
link_properties=link_properties,
401
desired_capabilities=desired_capabilities
402
)
403
404
return sender
405
406
# Check if capabilities were granted
407
def check_link_capabilities(sender):
408
"""Check which capabilities were granted by peer."""
409
410
remote_capabilities = sender.get_remote_capabilities()
411
desired_capabilities = sender.desired_capabilities
412
413
granted = []
414
denied = []
415
416
for capability in desired_capabilities:
417
if capability in remote_capabilities:
418
granted.append(capability)
419
else:
420
denied.append(capability)
421
422
print(f"Granted capabilities: {granted}")
423
print(f"Denied capabilities: {denied}")
424
425
return granted, denied
426
```
427
428
### Performance Optimization
429
430
#### Batch Processing
431
432
```python
433
class BatchProcessor:
434
def __init__(self, receiver, batch_size=100, timeout=5.0):
435
self.receiver = receiver
436
self.batch_size = batch_size
437
self.timeout = timeout
438
self.message_buffer = []
439
440
def process_batches(self):
441
"""Process messages in batches for better throughput."""
442
443
start_time = time.time()
444
445
while True:
446
messages = self.receiver.receive_message_batch(
447
max_batch_size=self.batch_size - len(self.message_buffer)
448
)
449
450
self.message_buffer.extend(messages)
451
452
# Process batch if full or timeout reached
453
if (len(self.message_buffer) >= self.batch_size or
454
time.time() - start_time > self.timeout):
455
456
if self.message_buffer:
457
self._process_batch(self.message_buffer)
458
self.message_buffer = []
459
start_time = time.time()
460
461
# Service the connection
462
self.receiver.work()
463
464
def _process_batch(self, messages):
465
"""Process a batch of messages."""
466
print(f"Processing batch of {len(messages)} messages")
467
468
for message in messages:
469
try:
470
# Process message
471
data = message.get_data()
472
process_message_fast(data)
473
message.accept()
474
475
except Exception as e:
476
print(f"Error processing message: {e}")
477
message.reject()
478
479
# Usage
480
batch_processor = BatchProcessor(receiver, batch_size=50, timeout=2.0)
481
batch_processor.process_batches()
482
```
483
484
#### Parallel Processing
485
486
```python
487
import threading
488
from queue import Queue
489
490
class ParallelProcessor:
491
def __init__(self, receiver, worker_count=4):
492
self.receiver = receiver
493
self.worker_count = worker_count
494
self.message_queue = Queue()
495
self.workers = []
496
self.running = False
497
498
def start(self):
499
"""Start parallel message processing."""
500
self.running = True
501
502
# Start worker threads
503
for i in range(self.worker_count):
504
worker = threading.Thread(target=self._worker_thread, args=(i,))
505
worker.daemon = True
506
worker.start()
507
self.workers.append(worker)
508
509
# Start receiver thread
510
receiver_thread = threading.Thread(target=self._receiver_thread)
511
receiver_thread.daemon = True
512
receiver_thread.start()
513
514
def stop(self):
515
"""Stop parallel processing."""
516
self.running = False
517
518
# Wait for workers to finish
519
for worker in self.workers:
520
worker.join(timeout=5.0)
521
522
def _receiver_thread(self):
523
"""Receive messages and queue for processing."""
524
while self.running:
525
try:
526
messages = self.receiver.receive_message_batch(max_batch_size=10)
527
528
for message in messages:
529
self.message_queue.put(message)
530
531
self.receiver.work()
532
533
except Exception as e:
534
print(f"Receiver error: {e}")
535
time.sleep(1)
536
537
def _worker_thread(self, worker_id):
538
"""Worker thread for processing messages."""
539
print(f"Worker {worker_id} started")
540
541
while self.running:
542
try:
543
# Get message from queue with timeout
544
message = self.message_queue.get(timeout=1.0)
545
546
# Process message
547
data = message.get_data()
548
result = process_message_threadsafe(data)
549
550
if result:
551
message.accept()
552
else:
553
message.reject()
554
555
self.message_queue.task_done()
556
557
except Queue.Empty:
558
continue # Timeout, check if still running
559
except Exception as e:
560
print(f"Worker {worker_id} error: {e}")
561
562
# Usage
563
processor = ParallelProcessor(receiver, worker_count=8)
564
processor.start()
565
566
# Let it run for a while
567
time.sleep(60)
568
569
processor.stop()
570
```
571
572
### Management Operations
573
574
Low-level AMQP management operations for advanced broker interaction and administrative tasks.
575
576
#### Management Operation
577
578
```python { .api }
579
class MgmtOperation:
580
def __init__(self, session, target=None, debug=False):
581
"""
582
AMQP management operation handler.
583
584
Parameters:
585
- session (Session): AMQP session
586
- target (Target): Management target endpoint
587
- debug (bool): Enable debug logging
588
"""
589
590
def open(self):
591
"""Open the management operation link."""
592
593
def close(self):
594
"""Close the management operation link."""
595
596
def execute_async(self, operation, op_type, locales=None, timeout=0):
597
"""
598
Execute a management operation asynchronously.
599
600
Parameters:
601
- operation (str): Operation name
602
- op_type (str): Operation type
603
- locales (list): Supported locales
604
- timeout (int): Operation timeout in milliseconds
605
606
Returns:
607
Management operation result
608
"""
609
```
610
611
#### Async Management Operation
612
613
```python { .api }
614
class MgmtOperationAsync:
615
def __init__(self, session, target=None, debug=False, loop=None):
616
"""
617
Async AMQP management operation handler.
618
619
Parameters:
620
- session (SessionAsync): Async AMQP session
621
- target (Target): Management target endpoint
622
- debug (bool): Enable debug logging
623
- loop: Asyncio event loop
624
"""
625
626
async def open_async(self):
627
"""Asynchronously open the management operation link."""
628
629
async def close_async(self):
630
"""Asynchronously close the management operation link."""
631
632
async def execute_async(self, operation, op_type, locales=None, timeout=0):
633
"""
634
Execute a management operation asynchronously.
635
636
Parameters:
637
- operation (str): Operation name
638
- op_type (str): Operation type
639
- locales (list): Supported locales
640
- timeout (int): Operation timeout in milliseconds
641
642
Returns:
643
Management operation result
644
"""
645
```
646
647
**Usage Examples:**
648
649
```python
650
from uamqp.mgmt_operation import MgmtOperation
651
from uamqp.address import Target
652
653
# Create management operation
654
mgmt_target = Target("$management")
655
mgmt_op = MgmtOperation(session, target=mgmt_target)
656
657
try:
658
mgmt_op.open()
659
660
# Execute management operation (e.g., get queue info)
661
result = mgmt_op.execute_async(
662
operation="READ",
663
op_type="com.microsoft:queue",
664
timeout=30000
665
)
666
667
print(f"Management result: {result}")
668
669
finally:
670
mgmt_op.close()
671
672
# Async management operations
673
from uamqp.async_ops.mgmt_operation_async import MgmtOperationAsync
674
675
async def async_management_example():
676
mgmt_op = MgmtOperationAsync(async_session, target=mgmt_target)
677
678
try:
679
await mgmt_op.open_async()
680
681
result = await mgmt_op.execute_async(
682
operation="CREATE",
683
op_type="com.microsoft:queue",
684
timeout=30000
685
)
686
687
print(f"Async management result: {result}")
688
689
finally:
690
await mgmt_op.close_async()
691
```
692
693
### Error Recovery
694
695
#### Link Recovery
696
697
```python
698
def create_resilient_sender(session, target, max_retries=3):
699
"""Create sender with automatic recovery."""
700
701
for attempt in range(max_retries):
702
try:
703
sender = MessageSender(session, Source(), target)
704
sender.open()
705
706
# Wait for link to open
707
while sender.get_state() == MessageSenderState.Opening:
708
sender.work()
709
time.sleep(0.1)
710
711
if sender.get_state() == MessageSenderState.Open:
712
print(f"Sender opened successfully on attempt {attempt + 1}")
713
return sender
714
else:
715
raise Exception(f"Sender failed to open: {sender.get_state()}")
716
717
except Exception as e:
718
print(f"Sender creation attempt {attempt + 1} failed: {e}")
719
if attempt < max_retries - 1:
720
time.sleep(2 ** attempt) # Exponential backoff
721
else:
722
raise
723
724
return None
725
726
# Usage with recovery
727
sender = create_resilient_sender(session, target)
728
if sender:
729
# Use sender...
730
pass
731
```