or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

address-endpoints.md, async-operations.md, authentication.md, client-apis.md, connection-session.md, error-handling.md, high-level-messaging.md, index.md, low-level-protocol.md, message-management.md, types-constants.md

docs/address-endpoints.md

0

# Address and Endpoints

1

2

This page covers AMQP endpoint addressing: source and target configuration, with support for filters, dynamic addresses, and link properties for flexible message routing and delivery.

3

4

## Capabilities

5

6

### Base Address Class

7

8

Base class for AMQP endpoint representation that handles address parsing and configuration.

9

10

```python { .api }

11

class Address:

12

def __init__(self, address=None, encoding='UTF-8'):

13

"""

14

Base AMQP endpoint representation.

15

16

Parameters:

17

- address (str): AMQP address string

18

- encoding (str): Character encoding

19

"""

20

21

@property

22

def hostname: str

23

"""Hostname from address."""

24

25

@property

26

def scheme: str

27

"""URI scheme (amqp, amqps)."""

28

29

@property

30

def username: str

31

"""Username from address."""

32

33

@property

34

def password: str

35

"""Password from address."""

36

37

@property

38

def address: str

39

"""The address string."""

40

41

@property

42

def durable: bool

43

"""Whether the address is durable."""

44

45

@property

46

def expiry_policy: str

47

"""Address expiry policy."""

48

49

@property

50

def timeout: int

51

"""Address timeout."""

52

53

@property

54

def dynamic: bool

55

"""Whether the address is dynamic."""

56

57

@property

58

def distribution_mode: str

59

"""Message distribution mode."""

60

61

@classmethod

62

def from_c_obj(cls, c_value, encoding='UTF-8'):

63

"""Create Address from C object."""

64

```

65

66

**Usage Examples:**

67

68

```python

69

from uamqp.address import Address

70

71

# Simple address

72

addr = Address("myqueue")

73

print(f"Address: {addr.address}")

74

75

# Full URI address

76

addr = Address("amqps://user:pass@broker.com:5671/queue?timeout=30")

77

print(f"Hostname: {addr.hostname}")

78

print(f"Scheme: {addr.scheme}")

79

print(f"Username: {addr.username}")

80

```

81

82

### Source Address

83

84

AMQP source endpoint for receiving messages with filtering capabilities and advanced configuration options.

85

86

```python { .api }

87

class Source(Address):

88

def __init__(self, address=None, **kwargs):

89

"""

90

AMQP source endpoint for receiving messages.

91

92

Parameters:

93

- address (str): Source address string

94

- **kwargs: Additional source configuration options

95

"""

96

97

@property

98

def filter_key: str

99

"""Filter key for message selection."""

100

101

def get_filter(self, name):

102

"""

103

Get a filter by name.

104

105

Parameters:

106

- name (str): Filter name

107

108

Returns:

109

Filter value or None

110

"""

111

112

def set_filter(self, value, name=None, descriptor=None):

113

"""

114

Set a message filter.

115

116

Parameters:

117

- value: Filter value

118

- name (str): Filter name

119

- descriptor: Filter descriptor

120

"""

121

```

122

123

**Usage Examples:**

124

125

```python

126

from uamqp.address import Source

127

128

# Basic source

129

source = Source("amqps://servicebus.windows.net/myqueue")

130

131

# Source with filter

132

source = Source("amqps://servicebus.windows.net/mytopic/Subscriptions/mysub")

133

134

# Set SQL filter for message selection

135

source.set_filter(

136

"color = 'red' AND price > 100",

137

name="sql-filter",

138

descriptor="apache.org:selector-filter:string"

139

)

140

141

# Set correlation ID filter

142

source.set_filter(

143

"correlation-123",

144

name="correlation-filter"

145

)

146

147

# Get filter value

148

current_filter = source.get_filter("sql-filter")

149

print(f"Current filter: {current_filter}")

150

```

151

152

### Target Address

153

154

AMQP target endpoint for sending messages with routing and delivery configuration.

155

156

```python { .api }

157

class Target(Address):

158

def __init__(self, address=None, **kwargs):

159

"""

160

AMQP target endpoint for sending messages.

161

162

Parameters:

163

- address (str): Target address string

164

- **kwargs: Additional target configuration options

165

"""

166

```

167

168

**Usage Examples:**

169

170

```python

171

from uamqp.address import Target

172

173

# Basic target

174

target = Target("amqps://servicebus.windows.net/myqueue")

175

176

# Target with specific properties

177

target = Target(

178

address="amqps://servicebus.windows.net/mytopic",

179

durable=True,

180

timeout=30000

181

)

182

183

# Dynamic target (broker assigns address)

184

target = Target(dynamic=True)

185

```

186

187

## Address Configuration

188

189

### Address Properties

190

191

Configure various address properties for different messaging patterns.

192

193

```python

194

# Durable address (survives broker restart)

195

source = Source("persistent-queue", durable=True)

196

197

# Temporary address (deleted when unused)

198

source = Source("temp-queue", durable=False, expiry_policy="session-end")

199

200

# Dynamic address (broker generates name)

201

source = Source(dynamic=True)

202

203

# Address with timeout

204

target = Target("slow-queue", timeout=60000) # 60 seconds

205

```

206

207

### Distribution Modes

208

209

Configure how messages are distributed to multiple consumers.

210

211

```python

212

# Copy distribution (each consumer gets a copy)

213

source = Source("broadcast-topic", distribution_mode="copy")

214

215

# Move distribution (only one consumer gets each message)

216

source = Source("work-queue", distribution_mode="move")

217

```

218

219

## Message Filtering

220

221

### SQL Filters

222

223

Use SQL-like expressions to filter messages based on properties.

224

225

```python

226

from uamqp.address import Source

227

228

# Create source with SQL filter

229

source = Source("amqps://servicebus.windows.net/mytopic/Subscriptions/mysub")

230

231

# Filter by message properties

232

sql_filter = """

233

(priority = 'high' OR priority = 'critical')

234

AND region = 'us-west'

235

AND timestamp > '2023-01-01'

236

"""

237

238

source.set_filter(

239

sql_filter,

240

name="priority-region-filter",

241

descriptor="apache.org:selector-filter:string"

242

)

243

244

# Use with receive client

245

from uamqp import ReceiveClient

246

with ReceiveClient(source, auth=auth) as client:

247

messages = client.receive_message_batch(timeout=30000)

248

# Only messages matching filter will be received

249

```

250

251

### Correlation Filters

252

253

Filter messages by correlation ID for request-response patterns.

254

255

```python

256

# Set correlation filter for specific conversation

257

correlation_id = "conversation-12345"

258

source.set_filter(

259

correlation_id,

260

name="correlation-filter",

261

descriptor="apache.org:correlation-filter:string"

262

)

263

264

# Multiple correlation filters

265

correlation_ids = ["conv-1", "conv-2", "conv-3"]

266

for i, cid in enumerate(correlation_ids):

267

source.set_filter(

268

cid,

269

name=f"correlation-filter-{i}",

270

descriptor="apache.org:correlation-filter:string"

271

)

272

```

273

274

### Custom Filters

275

276

Create custom filters for advanced message selection.

277

278

```python

279

# XPath filter for XML message content

280

xpath_filter = "//order[@status='pending' and @total>1000]"

281

source.set_filter(

282

xpath_filter,

283

name="xpath-filter",

284

descriptor="apache.org:xpath-filter:string"

285

)

286

287

# Binary filter for exact byte matching

288

binary_filter = b'\x01\x02\x03'

289

source.set_filter(

290

binary_filter,

291

name="binary-filter",

292

descriptor="apache.org:binary-filter:binary"

293

)

294

```

295

296

## Advanced Address Patterns

297

298

### Request-Response Pattern

299

300

Configure addresses for request-response messaging.

301

302

```python

303

from uamqp.address import Source, Target

304

from uamqp import Message, SendClient, ReceiveClient

305

import uuid

306

307

def setup_request_response():
    """Build the address pair used for one request-response exchange.

    Returns:
        tuple: ``(request_target, response_source)`` where the first item is
        a ``Target`` for the fixed request address and the second is a
        ``Source`` created with ``dynamic=True``.
    """
    # Requests go to a fixed address; replies arrive on a dynamic source,
    # i.e. one created without an address (intended to be assigned by the
    # broker).
    return Target("amqps://broker.com/requests"), Source(dynamic=True)

315

316

def send_request_with_response(request_data, auth):
    """Send one request and return the data of the first reply received.

    Parameters:
    - request_data: Body of the request message
    - auth: Authentication object passed to both the send and receive clients

    Returns:
    The result of ``get_data()`` on the first message of the received batch.

    Raises:
    TimeoutError: If the receive call returns no messages.
    """
    target, reply_source = setup_request_response()

    # The reply receiver is opened before the request is sent.
    with ReceiveClient(reply_source, auth=auth) as reply_client:
        # Address replies should be sent to, as exposed by the receive client.
        reply_address = reply_client.source_address

        # A fresh ID per request ties the reply back to this call.
        correlation = str(uuid.uuid4())
        message_properties = {
            'reply_to': reply_address,
            'correlation_id': correlation,
        }
        outgoing = Message(request_data, properties=message_properties)

        # Send the request on its own short-lived client.
        with SendClient(target, auth=auth) as sender:
            sender.queue_message(outgoing)
            sender.send_all_messages()

        # Register the correlation ID as a filter on the reply source,
        # then wait for the reply.
        reply_source.set_filter(correlation, name="correlation-filter")

        replies = reply_client.receive_message_batch(timeout=30000)
        if not replies:
            raise TimeoutError("No response received")
        return replies[0].get_data()

350

```

351

352

### Publish-Subscribe Pattern

353

354

Configure addresses for publish-subscribe messaging.

355

356

```python

357

def setup_pub_sub(topic_name, subscription_names):
    """Create the addresses for a publish-subscribe topology.

    Parameters:
    - topic_name (str): Name of the topic to publish to
    - subscription_names: Iterable of subscription names under that topic

    Returns:
    tuple: ``(publisher_target, subscriber_sources)`` - one ``Target`` for
    the topic and a list with one ``Source`` per subscription name, in the
    order given.
    """
    publisher_target = Target(f"amqps://broker.com/{topic_name}")

    # One source per subscription; "copy" mode so each subscriber gets a copy.
    subscriber_sources = [
        Source(
            f"amqps://broker.com/{topic_name}/Subscriptions/{sub_name}",
            distribution_mode="copy",
        )
        for sub_name in subscription_names
    ]

    return publisher_target, subscriber_sources

373

374

# Usage

375

target, sources = setup_pub_sub("events", ["sub1", "sub2", "sub3"])

376

377

# Publish message

378

with SendClient(target, auth=auth) as publisher:

379

event = Message({"event": "user_login", "user_id": 12345})

380

publisher.queue_message(event)

381

publisher.send_all_messages()

382

383

# Subscribe to messages

384

for i, source in enumerate(sources):

385

with ReceiveClient(source, auth=auth) as subscriber:

386

messages = subscriber.receive_message_batch(timeout=10000)

387

print(f"Subscriber {i+1} received {len(messages)} messages")

388

```

389

390

### Work Queue Pattern

391

392

Configure addresses for work distribution patterns.

393

394

```python

395

def setup_work_queue(queue_name, worker_count):
    """Create the addresses for a work queue and its dead letter queue.

    Parameters:
    - queue_name (str): Name of the work queue
    - worker_count: Accepted but not used by this function

    Returns:
    tuple: ``(work_source, dlq_target)`` - a ``Source`` for the queue using
    "move" distribution and a ``Target`` for ``<queue_name>-dlq``.
    """
    # "move" distribution: only one worker gets each message.
    queue_source = Source(
        f"amqps://broker.com/{queue_name}",
        distribution_mode="move",
    )

    # Failed messages are forwarded to a sibling "-dlq" address.
    dead_letter_target = Target(f"amqps://broker.com/{queue_name}-dlq")

    return queue_source, dead_letter_target

408

409

def process_work_items(work_source, dlq_target, auth):
    """Process work items in batches, dead-lettering the ones that fail.

    Messages are received in batches of up to 10 until a receive call
    returns no messages. Each message is handed to ``process_work_item``;
    on success it is accepted, on any exception a report is sent to the
    dead letter queue and the original message is rejected.

    Parameters:
    - work_source: Source to receive work items from
    - dlq_target: Target that failure reports are sent to
    - auth: Authentication object passed to both clients
    """
    # Imported here so the function is self-contained: time.time() is needed
    # for the failure timestamp below and no module-level import provides it.
    import time

    with ReceiveClient(work_source, auth=auth) as worker, \
            SendClient(dlq_target, auth=auth) as dlq_sender:

        while True:
            messages = worker.receive_message_batch(max_batch_size=10, timeout=30000)

            if not messages:
                break  # No more work

            for message in messages:
                try:
                    # Process work item
                    process_work_item(message.get_data())

                    # Success - accept message
                    message.accept()

                except Exception as e:
                    # Failed - send a failure report to the dead letter queue
                    error_msg = Message({
                        "original_data": message.get_data(),
                        "error": str(e),
                        "timestamp": time.time()
                    })

                    dlq_sender.queue_message(error_msg)
                    dlq_sender.send_all_messages()

                    # Reject original message
                    message.reject(
                        condition="processing-error",
                        description=str(e)
                    )

446

447

def process_work_item(data):
    """Handle a single work item and return a summary string.

    Parameters:
    - data: The work item payload

    Returns:
    str: ``"Processed: "`` followed by the formatted payload.
    """
    from time import sleep

    # Stand-in for real work: pause briefly before reporting the result.
    sleep(0.1)
    summary = f"Processed: {data}"
    return summary

453

```

454

455

## Address Validation and Parsing

456

457

### Address Validation

458

459

```python

460

def validate_address(address_str):
    """Validate AMQP address format.

    Parameters:
    - address_str (str): Address string to check

    Returns:
    bool: True when the address parses, has a hostname, and uses the
    ``amqp`` or ``amqps`` scheme; False otherwise. The reason for a
    failure is printed.
    """
    try:
        addr = Address(address_str)

        # A hostname is required for every scheme, so no separate
        # hostname check is needed for amqps.
        if not addr.hostname:
            raise ValueError("Hostname required")

        if addr.scheme not in ('amqp', 'amqps'):
            raise ValueError("Scheme must be amqp or amqps")

    except Exception as e:
        # Deliberately broad: any failure while parsing or checking the
        # address is reported as "invalid" rather than propagated.
        print(f"Invalid address '{address_str}': {e}")
        return False

    return True

480

481

# Usage

482

addresses = [

483

"amqps://broker.com/queue", # Valid

484

"amqp://broker.com:5672/topic", # Valid

485

"invalid://broker.com/queue", # Invalid scheme

486

"amqps:///queue" # Invalid - missing hostname

487

]

488

489

for addr in addresses:

490

is_valid = validate_address(addr)

491

print(f"{addr}: {'Valid' if is_valid else 'Invalid'}")

492

```

493

494

### Address Parsing

495

496

```python

497

def parse_address_components(address_str):
    """Break an address string into a dictionary of its components.

    Parameters:
    - address_str (str): Address string to parse

    Returns:
    dict: Component names mapped to values read from the parsed
    ``Address``. The password is masked, and ``port``/``path`` are None
    when the ``Address`` object does not expose them.
    """
    parsed = Address(address_str)

    components = {}
    components['full_address'] = parsed.address
    components['scheme'] = parsed.scheme
    components['hostname'] = parsed.hostname
    components['username'] = parsed.username
    # Never expose the real password; only indicate whether one is set.
    components['password'] = '***' if parsed.password else None
    components['port'] = getattr(parsed, 'port', None)
    components['path'] = getattr(parsed, 'path', None)
    components['is_secure'] = parsed.scheme == 'amqps'
    components['is_dynamic'] = parsed.dynamic
    components['is_durable'] = parsed.durable
    return components

513

514

# Usage

515

components = parse_address_components("amqps://user@broker.com:5671/myqueue?durable=true")

516

for key, value in components.items():

517

print(f"{key}: {value}")

518

```