0
# Address and Endpoints
1
2
AMQP endpoint addressing, including source and target configuration, with support for filters, dynamic addresses, and link properties for flexible message routing and delivery.
3
4
## Capabilities
5
6
### Base Address Class
7
8
Base class for AMQP endpoint representation that handles address parsing and configuration.
9
10
```python { .api }
11
class Address:
12
def __init__(self, address=None, encoding='UTF-8'):
13
"""
14
Base AMQP endpoint representation.
15
16
Parameters:
17
- address (str): AMQP address string
18
- encoding (str): Character encoding
19
"""
20
21
@property
22
def hostname: str
23
"""Hostname from address."""
24
25
@property
26
def scheme: str
27
"""URI scheme (amqp, amqps)."""
28
29
@property
30
def username: str
31
"""Username from address."""
32
33
@property
34
def password: str
35
"""Password from address."""
36
37
@property
38
def address: str
39
"""The address string."""
40
41
@property
42
def durable: bool
43
"""Whether the address is durable."""
44
45
@property
46
def expiry_policy: str
47
"""Address expiry policy."""
48
49
@property
50
def timeout: int
51
"""Address timeout."""
52
53
@property
54
def dynamic: bool
55
"""Whether the address is dynamic."""
56
57
@property
58
def distribution_mode: str
59
"""Message distribution mode."""
60
61
@classmethod
62
def from_c_obj(cls, c_value, encoding='UTF-8'):
63
"""Create Address from C object."""
64
```
65
66
**Usage Examples:**
67
68
```python
69
from uamqp.address import Address
70
71
# Simple address
72
addr = Address("myqueue")
73
print(f"Address: {addr.address}")
74
75
# Full URI address
76
addr = Address("amqps://user:pass@broker.com:5671/queue?timeout=30")
77
print(f"Hostname: {addr.hostname}")
78
print(f"Scheme: {addr.scheme}")
79
print(f"Username: {addr.username}")
80
```
81
82
### Source Address
83
84
AMQP source endpoint for receiving messages with filtering capabilities and advanced configuration options.
85
86
```python { .api }
87
class Source(Address):
88
def __init__(self, address=None, **kwargs):
89
"""
90
AMQP source endpoint for receiving messages.
91
92
Parameters:
93
- address (str): Source address string
94
- **kwargs: Additional source configuration options
95
"""
96
97
@property
98
def filter_key: str
99
"""Filter key for message selection."""
100
101
def get_filter(self, name):
102
"""
103
Get a filter by name.
104
105
Parameters:
106
- name (str): Filter name
107
108
Returns:
109
Filter value or None
110
"""
111
112
def set_filter(self, value, name=None, descriptor=None):
113
"""
114
Set a message filter.
115
116
Parameters:
117
- value: Filter value
118
- name (str): Filter name
119
- descriptor: Filter descriptor
120
"""
121
```
122
123
**Usage Examples:**
124
125
```python
126
from uamqp.address import Source
127
128
# Basic source
129
source = Source("amqps://servicebus.windows.net/myqueue")
130
131
# Source with filter
132
source = Source("amqps://servicebus.windows.net/mytopic/Subscriptions/mysub")
133
134
# Set SQL filter for message selection
135
source.set_filter(
136
"color = 'red' AND price > 100",
137
name="sql-filter",
138
descriptor="apache.org:selector-filter:string"
139
)
140
141
# Set correlation ID filter
142
source.set_filter(
143
"correlation-123",
144
name="correlation-filter"
145
)
146
147
# Get filter value
148
current_filter = source.get_filter("sql-filter")
149
print(f"Current filter: {current_filter}")
150
```
151
152
### Target Address
153
154
AMQP target endpoint for sending messages with routing and delivery configuration.
155
156
```python { .api }
157
class Target(Address):
158
def __init__(self, address=None, **kwargs):
159
"""
160
AMQP target endpoint for sending messages.
161
162
Parameters:
163
- address (str): Target address string
164
- **kwargs: Additional target configuration options
165
"""
166
```
167
168
**Usage Examples:**
169
170
```python
171
from uamqp.address import Target
172
173
# Basic target
174
target = Target("amqps://servicebus.windows.net/myqueue")
175
176
# Target with specific properties
177
target = Target(
178
address="amqps://servicebus.windows.net/mytopic",
179
durable=True,
180
timeout=30000
181
)
182
183
# Dynamic target (broker assigns address)
184
target = Target(dynamic=True)
185
```
186
187
## Address Configuration
188
189
### Address Properties
190
191
Configure various address properties for different messaging patterns.
192
193
```python
194
# Durable address (survives broker restart)
195
source = Source("persistent-queue", durable=True)
196
197
# Temporary address (deleted when unused)
198
source = Source("temp-queue", durable=False, expiry_policy="session-end")
199
200
# Dynamic address (broker generates name)
201
source = Source(dynamic=True)
202
203
# Address with timeout
204
target = Target("slow-queue", timeout=60000) # 60 seconds
205
```
206
207
### Distribution Modes
208
209
Configure how messages are distributed to multiple consumers.
210
211
```python
212
# Copy distribution (each consumer gets a copy)
213
source = Source("broadcast-topic", distribution_mode="copy")
214
215
# Move distribution (only one consumer gets each message)
216
source = Source("work-queue", distribution_mode="move")
217
```
218
219
## Message Filtering
220
221
### SQL Filters
222
223
Use SQL-like expressions to filter messages based on properties.
224
225
```python
226
from uamqp.address import Source
227
228
# Create source with SQL filter
229
source = Source("amqps://servicebus.windows.net/mytopic/Subscriptions/mysub")
230
231
# Filter by message properties
232
sql_filter = """
233
(priority = 'high' OR priority = 'critical')
234
AND region = 'us-west'
235
AND timestamp > '2023-01-01'
236
"""
237
238
source.set_filter(
239
sql_filter,
240
name="priority-region-filter",
241
descriptor="apache.org:selector-filter:string"
242
)
243
244
# Use with receive client
245
from uamqp import ReceiveClient
246
with ReceiveClient(source, auth=auth) as client:
247
messages = client.receive_message_batch(timeout=30000)
248
# Only messages matching filter will be received
249
```
250
251
### Correlation Filters
252
253
Filter messages by correlation ID for request-response patterns.
254
255
```python
256
# Set correlation filter for specific conversation
257
correlation_id = "conversation-12345"
258
source.set_filter(
259
correlation_id,
260
name="correlation-filter",
261
descriptor="apache.org:correlation-filter:string"
262
)
263
264
# Multiple correlation filters
265
correlation_ids = ["conv-1", "conv-2", "conv-3"]
266
for i, cid in enumerate(correlation_ids):
267
source.set_filter(
268
cid,
269
name=f"correlation-filter-{i}",
270
descriptor="apache.org:correlation-filter:string"
271
)
272
```
273
274
### Custom Filters
275
276
Create custom filters for advanced message selection.
277
278
```python
279
# XPath filter for XML message content
280
xpath_filter = "//order[@status='pending' and @total>1000]"
281
source.set_filter(
282
xpath_filter,
283
name="xpath-filter",
284
descriptor="apache.org:xpath-filter:string"
285
)
286
287
# Binary filter for exact byte matching
288
binary_filter = b'\x01\x02\x03'
289
source.set_filter(
290
binary_filter,
291
name="binary-filter",
292
descriptor="apache.org:binary-filter:binary"
293
)
294
```
295
296
## Advanced Address Patterns
297
298
### Request-Response Pattern
299
300
Configure addresses for request-response messaging.
301
302
```python
303
from uamqp.address import Source, Target
304
from uamqp import Message, SendClient, ReceiveClient
305
import uuid
306
307
def setup_request_response():
    """Build the address pair used by the request-response example.

    Returns:
    Tuple of (request target, response source)
    """
    # Where outgoing requests are delivered
    outbound_target = Target("amqps://broker.com/requests")

    # Source created with dynamic=True so the broker creates a unique address
    reply_source = Source(dynamic=True)

    return outbound_target, reply_source
315
316
def send_request_with_response(request_data, auth):
    """Send a request and wait for the correlated response.

    Parameters:
    - request_data: Body of the request message
    - auth: Authentication object passed to both clients

    Returns:
    Data of the first response message received

    Raises:
    TimeoutError: If the receive call returns no messages
    """
    request_target, response_source = setup_request_response()

    # Generate the correlation ID up front so the filter can be configured
    # on the source before the receiver opens. NOTE(review): a filter set
    # after the link is already attached is not expected to take effect -
    # confirm against the uamqp link lifecycle.
    request_id = str(uuid.uuid4())
    response_source.set_filter(
        request_id,
        name="correlation-filter"
    )

    # Create response receiver first
    with ReceiveClient(response_source, auth=auth) as response_client:
        # Get the actual response address assigned by broker
        response_address = response_client.source_address

        # Create request message with reply-to
        request = Message(
            request_data,
            properties={
                'reply_to': response_address,
                'correlation_id': request_id
            }
        )

        # Send request
        with SendClient(request_target, auth=auth) as request_client:
            request_client.queue_message(request)
            request_client.send_all_messages()

        # Wait for response
        responses = response_client.receive_message_batch(timeout=30000)
        if responses:
            return responses[0].get_data()
        else:
            raise TimeoutError("No response received")
350
```
351
352
### Publish-Subscribe Pattern
353
354
Configure addresses for publish-subscribe messaging.
355
356
```python
357
def setup_pub_sub(topic_name, subscription_names):
    """Create the addresses for a publish-subscribe topology.

    Parameters:
    - topic_name (str): Topic that messages are published to
    - subscription_names: Iterable of subscription names under the topic

    Returns:
    Tuple of (publisher target, list of subscriber sources)
    """
    # Single target shared by all publishers
    topic_target = Target(f"amqps://broker.com/{topic_name}")

    # One source per subscription; "copy" so each subscriber gets a copy
    sources = [
        Source(
            f"amqps://broker.com/{topic_name}/Subscriptions/{name}",
            distribution_mode="copy"
        )
        for name in subscription_names
    ]

    return topic_target, sources
373
374
# Usage
375
target, sources = setup_pub_sub("events", ["sub1", "sub2", "sub3"])
376
377
# Publish message
378
with SendClient(target, auth=auth) as publisher:
379
event = Message({"event": "user_login", "user_id": 12345})
380
publisher.queue_message(event)
381
publisher.send_all_messages()
382
383
# Subscribe to messages
384
for i, source in enumerate(sources):
385
with ReceiveClient(source, auth=auth) as subscriber:
386
messages = subscriber.receive_message_batch(timeout=10000)
387
print(f"Subscriber {i+1} received {len(messages)} messages")
388
```
389
390
### Work Queue Pattern
391
392
Configure addresses for work distribution patterns.
393
394
```python
395
def setup_work_queue(queue_name, worker_count):
    """Create the addresses for a work queue and its dead letter queue.

    Parameters:
    - queue_name (str): Name of the work queue
    - worker_count: Accepted but not used by this function

    Returns:
    Tuple of (work source, dead letter queue target)
    """
    base_url = f"amqps://broker.com/{queue_name}"

    # "move" distribution: only one worker gets each message
    queue_source = Source(base_url, distribution_mode="move")

    # Failed messages are forwarded to "<queue_name>-dlq"
    dead_letter_target = Target(f"amqps://broker.com/{queue_name}-dlq")

    return queue_source, dead_letter_target
408
409
def process_work_items(work_source, dlq_target, auth):
    """Process work items with error handling.

    Receives batches from the work source until an empty batch comes back.
    Each message is handed to process_work_item; on success the message is
    accepted, on any exception an error record is sent to the dead letter
    target and the original message is rejected.

    Parameters:
    - work_source: Source to receive work items from
    - dlq_target: Target that failed items are forwarded to
    - auth: Authentication object passed to both clients
    """
    # Imported here because the failure path below timestamps the error
    # record and this example has no module-level "import time"; without it
    # the except branch itself would fail with NameError.
    import time

    with ReceiveClient(work_source, auth=auth) as worker, \
            SendClient(dlq_target, auth=auth) as dlq_sender:

        while True:
            messages = worker.receive_message_batch(max_batch_size=10, timeout=30000)

            if not messages:
                break  # No more work

            for message in messages:
                try:
                    # Process work item
                    work_data = message.get_data()
                    process_work_item(work_data)

                    # Success - accept message
                    message.accept()

                except Exception as e:
                    # Failed - send to dead letter queue. The broad catch is
                    # deliberate: any processing error routes the item to
                    # the DLQ instead of stopping the worker loop.
                    error_msg = Message({
                        "original_data": message.get_data(),
                        "error": str(e),
                        "timestamp": time.time()
                    })

                    dlq_sender.queue_message(error_msg)
                    dlq_sender.send_all_messages()

                    # Reject original message
                    message.reject(
                        condition="processing-error",
                        description=str(e)
                    )
446
447
def process_work_item(data):
    """Handle a single work item and return a description of the result.

    Parameters:
    - data: Work item payload

    Returns:
    String of the form "Processed: <data>"
    """
    from time import sleep

    # Stand-in for real work: pause briefly before reporting the result
    sleep(0.1)
    return "Processed: {}".format(data)
453
```
454
455
## Address Validation and Parsing
456
457
### Address Validation
458
459
```python
460
def validate_address(address_str):
    """Validate AMQP address format.

    Parameters:
    - address_str (str): Address string to check

    Returns:
    True if the address parses, has a hostname, and uses the amqp or amqps
    scheme; otherwise False (the reason is printed)
    """
    try:
        addr = Address(address_str)

        # Check required components. The hostname check applies to every
        # scheme, so no separate amqps-specific hostname check is needed.
        if not addr.hostname:
            raise ValueError("Hostname required")

        if addr.scheme not in ('amqp', 'amqps'):
            raise ValueError("Scheme must be amqp or amqps")

        return True

    except Exception as e:
        # Broad on purpose: any failure while constructing or inspecting
        # the address is reported as "invalid" rather than propagated.
        print(f"Invalid address '{address_str}': {e}")
        return False
480
481
# Usage
482
addresses = [
483
"amqps://broker.com/queue", # Valid
484
"amqp://broker.com:5672/topic", # Valid
485
"invalid://broker.com/queue", # Invalid scheme
486
"amqps:///queue" # Invalid - missing hostname
487
]
488
489
for addr in addresses:
490
is_valid = validate_address(addr)
491
print(f"{addr}: {'Valid' if is_valid else 'Invalid'}")
492
```
493
494
### Address Parsing
495
496
```python
497
def parse_address_components(address_str):
    """Break an address string down into a dictionary of its parts.

    Parameters:
    - address_str (str): Address string to parse

    Returns:
    Dict with keys full_address, scheme, hostname, username, password
    (masked), port, path, is_secure, is_dynamic and is_durable
    """
    parsed = Address(address_str)

    components = {}
    components['full_address'] = parsed.address
    components['scheme'] = parsed.scheme
    components['hostname'] = parsed.hostname
    components['username'] = parsed.username

    # Don't log passwords: expose only whether one was present
    if parsed.password:
        components['password'] = '***'
    else:
        components['password'] = None

    # port and path fall back to None when the attribute is absent
    components['port'] = getattr(parsed, 'port', None)
    components['path'] = getattr(parsed, 'path', None)
    components['is_secure'] = parsed.scheme == 'amqps'
    components['is_dynamic'] = parsed.dynamic
    components['is_durable'] = parsed.durable

    return components
513
514
# Usage
515
components = parse_address_components("amqps://user@broker.com:5671/myqueue?durable=true")
516
for key, value in components.items():
517
print(f"{key}: {value}")
518
```