or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

address-endpoints.md async-operations.md authentication.md client-apis.md connection-session.md error-handling.md high-level-messaging.md index.md low-level-protocol.md message-management.md types-constants.md

low-level-protocol.mddocs/

0

# Low-Level Protocol Access

1

2

Direct access to AMQP protocol elements including message senders, receivers, and protocol-level message handling for advanced use cases that require fine-grained control over AMQP 1.0 protocol behavior.

3

4

## Capabilities

5

6

### Message Sender

7

8

Low-level message sender link that provides direct control over message transmission and settlement.

9

10

```python { .api }

11

class MessageSender:

12

def __init__(self, session, source, target, name=None, send_settle_mode=None,

13

max_message_size=None, link_properties=None,

14

desired_capabilities=None):

15

"""

16

Low-level message sender link.

17

18

Parameters:

19

- session (Session): AMQP session

20

- source (Source): Link source address

21

- target (Target): Link target address

22

- name (str): Link name

23

- send_settle_mode (SenderSettleMode): Settlement mode

24

- max_message_size (int): Maximum message size

25

- link_properties (dict): Link properties

26

- desired_capabilities (list): Desired link capabilities

27

"""

28

```

29

30

**Key Methods:**

31

32

```python { .api }

33

def open(self):

34

"""Open the sender link."""

35

36

def close(self):

37

"""Close the sender link."""

38

39

def send_async(self, message, callback=None):

40

"""

41

Send a message asynchronously.

42

43

Parameters:

44

- message (Message): Message to send

45

- callback (callable): Completion callback

46

47

Returns:

48

MessageState: Send operation state

49

"""

50

51

def work(self):

52

"""Process sender work (I/O and protocol handling)."""

53

54

def destroy(self):

55

"""Destroy the sender and free resources."""

56

```

57

58

**Key Properties:**

59

60

```python { .api }

61

@property

62

def name: str

63

"""Link name."""

64

65

@property

66

def source: Source

67

"""Link source address."""

68

69

@property

70

def target: Target

71

"""Link target address."""

72

73

@property

74

def max_message_size: int

75

"""Maximum message size."""

76

77

@property

78

def send_settle_mode: int

79

"""Sender settlement mode."""

80

```

81

82

**Usage Examples:**

83

84

```python

85

from uamqp import Connection, Message, MessageSender, Session
from uamqp.address import Source, Target
from uamqp.constants import MessageState, SenderSettleMode

# Create low-level sender.
# NOTE: `auth` is not defined in this snippet; it must be an authentication
# object created beforehand.
connection = Connection("amqp.example.com", sasl=auth)
connection.open()

session = Session(connection)
session.begin()

source = Source()  # Null source for sender
target = Target("myqueue")

sender = MessageSender(
    session=session,
    source=source,
    target=target,
    name="my-sender",
    send_settle_mode=SenderSettleMode.Mixed,
    max_message_size=1048576,  # 1MB
)

try:
    sender.open()

    # Send message with callback
    def send_callback(message, result, error):
        if error:
            print(f"Send failed: {error}")
        else:
            print(f"Send completed: {result}")

    message = Message("Hello World")
    state = sender.send_async(message, callback=send_callback)

    # Process until send completes. The value returned by send_async() is a
    # one-off snapshot, so it has to be refreshed on every pass; comparing the
    # snapshot alone would either exit immediately or loop forever.
    # NOTE(review): assumes `message.state` is updated while the link is
    # serviced - confirm against the uamqp version in use.
    pending_states = (MessageState.WaitingToBeSent, MessageState.WaitingForSendAck)
    while state in pending_states:
        sender.work()
        connection.work()
        state = message.state

finally:
    sender.close()
    session.end()
    connection.close()

131

```

132

133

### Message Receiver

134

135

Low-level message receiver link that provides direct control over message reception and flow control.

136

137

```python { .api }

138

class MessageReceiver:

139

def __init__(self, session, source, target, name=None,

140

receive_settle_mode=None, max_message_size=None,

141

prefetch=None, link_properties=None,

142

desired_capabilities=None):

143

"""

144

Low-level message receiver link.

145

146

Parameters:

147

- session (Session): AMQP session

148

- source (Source): Link source address

149

- target (Target): Link target address

150

- name (str): Link name

151

- receive_settle_mode (ReceiverSettleMode): Settlement mode

152

- max_message_size (int): Maximum message size

153

- prefetch (int): Number of messages to prefetch

154

- link_properties (dict): Link properties

155

- desired_capabilities (list): Desired link capabilities

156

"""

157

```

158

159

**Key Methods:**

160

161

```python { .api }

162

def open(self):

163

"""Open the receiver link."""

164

165

def close(self):

166

"""Close the receiver link."""

167

168

def receive_message_batch(self, max_batch_size=None):

169

"""

170

Receive a batch of messages.

171

172

Parameters:

173

- max_batch_size (int): Maximum messages to receive

174

175

Returns:

176

list[Message]: Received messages

177

"""

178

179

def work(self):

180

"""Process receiver work (I/O and protocol handling)."""

181

182

def flow(self, link_credit):

183

"""

184

Grant link credit for message flow control.

185

186

Parameters:

187

- link_credit (int): Number of credits to grant

188

"""

189

190

def destroy(self):

191

"""Destroy the receiver and free resources."""

192

```

193

194

**Key Properties:**

195

196

```python { .api }

197

@property

198

def name: str

199

"""Link name."""

200

201

@property

202

def source: Source

203

"""Link source address."""

204

205

@property

206

def target: Target

207

"""Link target address."""

208

209

@property

210

def max_message_size: int

211

"""Maximum message size."""

212

213

@property

214

def receive_settle_mode: int

215

"""Receiver settlement mode."""

216

217

@property

218

def prefetch: int

219

"""Prefetch count."""

220

```

221

222

**Usage Examples:**

223

224

```python

225

from uamqp import MessageReceiver

226

from uamqp.constants import ReceiverSettleMode

227

228

# Create low-level receiver

229

source = Source("myqueue")

230

target = Target() # Null target for receiver

231

232

receiver = MessageReceiver(

233

session=session,

234

source=source,

235

target=target,

236

name="my-receiver",

237

receive_settle_mode=ReceiverSettleMode.PeekLock,

238

prefetch=100,

239

max_message_size=1048576

240

)

241

242

try:

243

receiver.open()

244

245

# Grant initial credits

246

receiver.flow(10)

247

248

# Receive messages in loop

249

while True:

250

messages = receiver.receive_message_batch(max_batch_size=5)

251

252

if not messages:

253

# No messages, process connection

254

receiver.work()

255

connection.work()

256

continue

257

258

print(f"Received {len(messages)} messages")

259

260

for message in messages:

261

try:

262

data = message.get_data()

263

print(f"Message: {data}")

264

265

# Process message

266

process_message(data)

267

268

# Accept message

269

message.accept()

270

271

except Exception as e:

272

print(f"Processing error: {e}")

273

message.reject(

274

condition="processing-error",

275

description=str(e)

276

)

277

278

# Grant more credits after processing

279

receiver.flow(len(messages))

280

281

finally:

282

receiver.close()

283

```

284

285

### Advanced Protocol Control

286

287

#### Manual Credit Management

288

289

```python

290

class CreditManager:
    """Keep a receiver supplied with link credit as messages are processed.

    Parameters:
    - receiver: Object exposing ``flow(link_credit)``
    - initial_credits (int): Credits granted by ``start()``; each top-up
      restores the outstanding credit to this level
    - min_credits (int): Top up once outstanding credit is at or below this
    """

    def __init__(self, receiver, initial_credits=10, min_credits=5):
        self.receiver = receiver
        self.initial_credits = initial_credits
        self.min_credits = min_credits
        # Running totals; outstanding credit is the difference between them.
        self.granted_credits = 0
        self.processed_messages = 0

    def start(self):
        """Grant initial credits."""
        self.receiver.flow(self.initial_credits)
        self.granted_credits = self.initial_credits

    def on_message_processed(self):
        """Call when a message is processed."""
        self.processed_messages += 1

        outstanding = self.granted_credits - self.processed_messages
        if outstanding > self.min_credits:
            # Still enough credit on the link; nothing to do yet.
            return

        # Running low: grant enough to get back to the initial level.
        additional_credits = self.initial_credits - outstanding
        self.receiver.flow(additional_credits)
        self.granted_credits += additional_credits
        print(f"Granted {additional_credits} additional credits")

316

317

# Usage

318

credit_manager = CreditManager(receiver, initial_credits=20, min_credits=5)

319

credit_manager.start()

320

321

while True:

322

messages = receiver.receive_message_batch(max_batch_size=10)

323

324

for message in messages:

325

# Process message

326

process_message(message)

327

message.accept()

328

329

# Update credit management

330

credit_manager.on_message_processed()

331

```

332

333

#### Link State Management

334

335

```python

336

from uamqp.constants import MessageSenderState, MessageReceiverState

337

338

def monitor_link_state(link, link_type="sender", timeout=None):
    """Monitor and handle link state changes.

    Parameters:
    - link: Sender or receiver link exposing ``get_state()``, ``work()`` and
      ``get_error_info()``
    - link_type (str): "sender" selects MessageSenderState; any other value
      selects MessageReceiverState
    - timeout (float): Maximum seconds to wait while the link is opening;
      None (the default) waits indefinitely

    Returns:
    bool: True if the link is open, False otherwise
    """
    import time  # local import: the snippet does not import time at its top

    if link_type == "sender":
        states = MessageSenderState
    else:
        states = MessageReceiverState

    current_state = link.get_state()

    if current_state == states.Opening:
        print("Link is opening...")
        deadline = None if timeout is None else time.monotonic() + timeout

        # Wait for open to complete, keeping current_state up to date so the
        # checks below act on the state the link ended up in rather than on
        # the stale "Opening" value read before the wait.
        while current_state == states.Opening:
            if deadline is not None and time.monotonic() >= deadline:
                print("Timed out waiting for link to open")
                return False
            link.work()
            time.sleep(0.1)
            current_state = link.get_state()

    if current_state == states.Open:
        print("Link is open and ready")
        return True

    if current_state == states.Error:
        print("Link is in error state")
        error_info = link.get_error_info()
        print(f"Error: {error_info}")
        return False

    if current_state == states.Closing:
        print("Link is closing...")
        return False

    # Any other state is treated as "not ready".
    return False

370

371

# Usage

372

sender_ready = monitor_link_state(sender, "sender")

373

if sender_ready:

374

# Proceed with sending

375

pass

376

```

377

378

#### Custom Link Properties

379

380

```python

381

def create_sender_with_properties(session, target):
    """Build a MessageSender that attaches with custom link properties.

    Parameters:
    - session: AMQP session the link is created on
    - target: Link target address

    Returns:
    MessageSender: The configured sender (not opened here)
    """
    properties = {
        'x-opt-jms-dest': 1,          # JMS destination type
        'x-opt-enqueuetime': True,    # Include enqueue time
        'product': 'MyApplication',   # Application identifier
        'version': '1.0.0',           # Application version
    }
    capabilities = [
        'ANONYMOUS-RELAY',    # Anonymous relay capability
        'DELAYED_DELIVERY',   # Delayed delivery capability
    ]

    return MessageSender(
        session=session,
        source=Source(),
        target=target,
        link_properties=properties,
        desired_capabilities=capabilities,
    )

405

406

# Check if capabilities were granted

407

def check_link_capabilities(sender):
    """Check which capabilities were granted by peer.

    Parameters:
    - sender: Link exposing ``get_remote_capabilities()`` and a
      ``desired_capabilities`` attribute

    Returns:
    tuple[list, list]: (granted, denied), each in the order the capabilities
    were requested
    """
    # Either side may be None (peer advertised nothing / nothing requested);
    # treat that as an empty collection instead of failing on `in` or `for`.
    remote_capabilities = sender.get_remote_capabilities() or []
    desired_capabilities = sender.desired_capabilities or []

    granted = [cap for cap in desired_capabilities if cap in remote_capabilities]
    denied = [cap for cap in desired_capabilities if cap not in remote_capabilities]

    print(f"Granted capabilities: {granted}")
    print(f"Denied capabilities: {denied}")

    return granted, denied

426

```

427

428

### Performance Optimization

429

430

#### Batch Processing

431

432

```python

433

import time


class BatchProcessor:
    """Accumulate received messages and process them in batches.

    Parameters:
    - receiver: Object exposing ``receive_message_batch(max_batch_size=...)``
      and ``work()``
    - batch_size (int): Number of buffered messages that triggers processing
    - timeout (float): Seconds after which a partial batch is processed anyway
    """

    def __init__(self, receiver, batch_size=100, timeout=5.0):
        self.receiver = receiver
        self.batch_size = batch_size
        self.timeout = timeout
        self.message_buffer = []

    def process_batches(self):
        """Process messages in batches for better throughput.

        Loops until an exception propagates out of the receiver or out of
        ``_process_batch``; there is no built-in stop condition.
        """
        # Monotonic clock: the batch window must not be affected by
        # wall-clock adjustments.
        window_start = time.monotonic()

        while True:
            room = self.batch_size - len(self.message_buffer)
            messages = self.receiver.receive_message_batch(max_batch_size=room)

            # Tolerate an empty or missing result from the receiver.
            if messages:
                self.message_buffer.extend(messages)

            # Process batch if full or timeout reached
            batch_full = len(self.message_buffer) >= self.batch_size
            window_expired = time.monotonic() - window_start > self.timeout
            if batch_full or window_expired:
                if self.message_buffer:
                    self._process_batch(self.message_buffer)
                    self.message_buffer = []
                window_start = time.monotonic()

            # Service the connection
            self.receiver.work()

    def _process_batch(self, messages):
        """Process a batch of messages.

        Each message is settled individually: accepted on success, rejected
        if handling it raises.
        """
        print(f"Processing batch of {len(messages)} messages")

        for message in messages:
            try:
                # Process message
                data = message.get_data()
                process_message_fast(data)
                message.accept()
            except Exception as e:
                # Best effort per message: one failure must not abort the batch.
                print(f"Error processing message: {e}")
                message.reject()

478

479

# Usage

480

batch_processor = BatchProcessor(receiver, batch_size=50, timeout=2.0)

481

batch_processor.process_batches()

482

```

483

484

#### Parallel Processing

485

486

```python

487

import threading
import time
from queue import Empty, Queue


class ParallelProcessor:
    """Fan received messages out to a pool of worker threads.

    One receiver thread pulls batches from the receiver and queues them;
    ``worker_count`` worker threads take messages off the queue, process them
    and settle them.

    Parameters:
    - receiver: Object exposing ``receive_message_batch(max_batch_size=...)``
      and ``work()``
    - worker_count (int): Number of worker threads to start
    """

    def __init__(self, receiver, worker_count=4):
        self.receiver = receiver
        self.worker_count = worker_count
        self.message_queue = Queue()
        self.workers = []
        self.receiver_thread = None
        self.running = False

    def start(self):
        """Start parallel message processing."""
        self.running = True

        # Start worker threads
        for i in range(self.worker_count):
            worker = threading.Thread(target=self._worker_thread, args=(i,))
            worker.daemon = True
            worker.start()
            self.workers.append(worker)

        # Start receiver thread; keep a reference so stop() can join it.
        self.receiver_thread = threading.Thread(target=self._receiver_thread)
        self.receiver_thread.daemon = True
        self.receiver_thread.start()

    def stop(self):
        """Stop parallel processing."""
        self.running = False

        # Stop feeding the queue first, then wait for workers to finish.
        if self.receiver_thread is not None:
            self.receiver_thread.join(timeout=5.0)
            self.receiver_thread = None

        for worker in self.workers:
            worker.join(timeout=5.0)
        # Forget finished threads so a later start() begins from a clean list.
        self.workers = []

    def _receiver_thread(self):
        """Receive messages and queue for processing."""
        while self.running:
            try:
                messages = self.receiver.receive_message_batch(max_batch_size=10)

                for message in messages:
                    self.message_queue.put(message)

                self.receiver.work()

            except Exception as e:
                print(f"Receiver error: {e}")
                time.sleep(1)

    def _worker_thread(self, worker_id):
        """Worker thread for processing messages."""
        print(f"Worker {worker_id} started")

        while self.running:
            try:
                # Get message from queue with timeout
                message = self.message_queue.get(timeout=1.0)
            except Empty:
                # Timeout, check if still running. (The exception lives in the
                # queue module; the Queue class has no `Empty` attribute.)
                continue

            try:
                # Process message
                # NOTE(review): settling from a worker thread assumes message
                # objects may be used off the receiver thread - confirm.
                data = message.get_data()
                result = process_message_threadsafe(data)

                if result:
                    message.accept()
                else:
                    message.reject()

            except Exception as e:
                print(f"Worker {worker_id} error: {e}")

            finally:
                # Always balance the get() so Queue.join() cannot hang on a
                # message whose processing raised.
                self.message_queue.task_done()

561

562

# Usage

563

processor = ParallelProcessor(receiver, worker_count=8)

564

processor.start()

565

566

# Let it run for a while

567

time.sleep(60)

568

569

processor.stop()

570

```

571

572

### Management Operations

573

574

Low-level AMQP management operations for advanced broker interaction and administrative tasks.

575

576

#### Management Operation

577

578

```python { .api }

579

class MgmtOperation:

580

def __init__(self, session, target=None, debug=False):

581

"""

582

AMQP management operation handler.

583

584

Parameters:

585

- session (Session): AMQP session

586

- target (Target): Management target endpoint

587

- debug (bool): Enable debug logging

588

"""

589

590

def open(self):

591

"""Open the management operation link."""

592

593

def close(self):

594

"""Close the management operation link."""

595

596

def execute_async(self, operation, op_type, locales=None, timeout=0):

597

"""

598

Execute a management operation asynchronously.

599

600

Parameters:

601

- operation (str): Operation name

602

- op_type (str): Operation type

603

- locales (list): Supported locales

604

- timeout (int): Operation timeout in milliseconds

605

606

Returns:

607

Management operation result

608

"""

609

```

610

611

#### Async Management Operation

612

613

```python { .api }

614

class MgmtOperationAsync:

615

def __init__(self, session, target=None, debug=False, loop=None):

616

"""

617

Async AMQP management operation handler.

618

619

Parameters:

620

- session (SessionAsync): Async AMQP session

621

- target (Target): Management target endpoint

622

- debug (bool): Enable debug logging

623

- loop: Asyncio event loop

624

"""

625

626

async def open_async(self):

627

"""Asynchronously open the management operation link."""

628

629

async def close_async(self):

630

"""Asynchronously close the management operation link."""

631

632

async def execute_async(self, operation, op_type, locales=None, timeout=0):

633

"""

634

Execute a management operation asynchronously.

635

636

Parameters:

637

- operation (str): Operation name

638

- op_type (str): Operation type

639

- locales (list): Supported locales

640

- timeout (int): Operation timeout in milliseconds

641

642

Returns:

643

Management operation result

644

"""

645

```

646

647

**Usage Examples:**

648

649

```python

650

from uamqp.mgmt_operation import MgmtOperation

651

from uamqp.address import Target

652

653

# Create management operation

654

mgmt_target = Target("$management")

655

mgmt_op = MgmtOperation(session, target=mgmt_target)

656

657

try:

658

mgmt_op.open()

659

660

# Execute management operation (e.g., get queue info)

661

result = mgmt_op.execute_async(

662

operation="READ",

663

op_type="com.microsoft:queue",

664

timeout=30000

665

)

666

667

print(f"Management result: {result}")

668

669

finally:

670

mgmt_op.close()

671

672

# Async management operations

673

from uamqp.async_ops.mgmt_operation_async import MgmtOperationAsync

674

675

async def async_management_example():
    """Open an async management link, run one CREATE operation, print the
    result, and always close the link afterwards.

    Relies on `async_session` and `mgmt_target` being defined by the
    surrounding code.
    """
    request = {
        "operation": "CREATE",
        "op_type": "com.microsoft:queue",
        "timeout": 30000,
    }
    management_link = MgmtOperationAsync(async_session, target=mgmt_target)

    try:
        await management_link.open_async()
        result = await management_link.execute_async(**request)
        print(f"Async management result: {result}")
    finally:
        # Close the link whether or not the operation succeeded.
        await management_link.close_async()

691

```

692

693

### Error Recovery

694

695

#### Link Recovery

696

697

```python

698

def create_resilient_sender(session, target, max_retries=3):
    """Create sender with automatic recovery.

    Tries up to ``max_retries`` times to create and open a MessageSender,
    sleeping 1, 2, 4, ... seconds between failed attempts.

    Parameters:
    - session: AMQP session the link is created on
    - target: Link target address
    - max_retries (int): Maximum number of attempts

    Returns:
    The opened sender, or None when ``max_retries`` is less than 1

    Raises:
    Exception: The failure of the final attempt, once retries are exhausted
    """
    import time  # local import: the snippet does not import time at its top

    for attempt in range(max_retries):
        sender = None
        try:
            sender = MessageSender(session, Source(), target)
            sender.open()

            # Wait for link to open
            while sender.get_state() == MessageSenderState.Opening:
                sender.work()
                time.sleep(0.1)

            state = sender.get_state()
            if state == MessageSenderState.Open:
                print(f"Sender opened successfully on attempt {attempt + 1}")
                return sender
            raise Exception(f"Sender failed to open: {state}")

        except Exception as e:
            print(f"Sender creation attempt {attempt + 1} failed: {e}")

            # Release the half-open link before retrying or giving up;
            # otherwise every failed attempt leaks a sender.
            if sender is not None:
                try:
                    sender.destroy()
                except Exception as cleanup_error:
                    print(f"Sender cleanup failed: {cleanup_error}")

            if attempt >= max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # Exponential backoff

    # Only reached when max_retries < 1 (no attempt was made).
    return None

725

726

# Usage with recovery

727

sender = create_resilient_sender(session, target)

728

if sender:

729

# Use sender...

730

pass

731

```