# Client APIs

High-level client classes for sending and receiving AMQP messages. They provide built-in connection management, error handling, and batching, and are the simplest way to build AMQP messaging applications.

## Capabilities

### Base AMQP Client

Base client class that provides common functionality for all AMQP client operations.

```python { .api }
class AMQPClient:
    def __init__(self, remote_address, auth=None, client_name=None, debug=False,
                 error_policy=None, keep_alive_interval=None, max_frame_size=None,
                 channel_max=None, idle_timeout=None, properties=None,
                 remote_idle_timeout_empty_frame_send_ratio=None, incoming_window=None,
                 outgoing_window=None, handle_max=None, on_attach=None,
                 auto_complete=True, encoding='UTF-8', desired_capabilities=None,
                 max_message_size=None, link_properties=None, timeout=0, **kwargs):
        """
        Base AMQP client for connection and session management.

        Parameters:
        - remote_address (str): AMQP broker address
        - auth (AMQPAuth): Authentication credentials
        - client_name (str): Client identifier
        - debug (bool): Enable debug logging
        - error_policy (ErrorPolicy): Error handling policy
        - keep_alive_interval (int): Keep-alive interval in seconds
        - max_frame_size (int): Maximum frame size in bytes
        - channel_max (int): Maximum number of channels
        - idle_timeout (int): Connection idle timeout in milliseconds
        - properties (dict): Connection properties
        - incoming_window (int): Session incoming window size
        - outgoing_window (int): Session outgoing window size
        - handle_max (int): Maximum link handles
        - on_attach (callable): Link attach callback
        - auto_complete (bool): Auto-complete messages
        - encoding (str): Character encoding
        - desired_capabilities (list): Desired connection capabilities
        - max_message_size (int): Maximum message size
        - link_properties (dict): Link properties
        - timeout (int): Operation timeout in seconds
        """
```

**Key Methods:**

```python { .api }
def open(self):
    """Open the client connection and session."""

def close(self):
    """Close the client connection."""

def __enter__(self):
    """Context manager entry."""

def __exit__(self, exc_type, exc_val, exc_tb):
    """Context manager exit."""

def mgmt_request(self, message, operation, op_type=None, node=None, callback=None, **kwargs):
    """
    Send a management request.

    Parameters:
    - message (Message): Request message
    - operation (str): Management operation name
    - op_type (str): Operation type
    - node (str): Target node name
    - callback (callable): Response callback function

    Returns:
    Message: Response message
    """

def auth_complete(self):
    """
    Check if authentication is complete.

    Returns:
    bool: True if authentication is complete
    """

def client_ready(self):
    """
    Check if client is ready for operations.

    Returns:
    bool: True if client is ready
    """

def do_work(self, **kwargs):
    """Perform client work iteration."""
```

**Usage Example:**

```python
from uamqp.client import AMQPClient
from uamqp.authentication import SASLPlain

auth = SASLPlain("amqp.example.com", "user", "password")

# Using context manager (recommended)
with AMQPClient("amqps://amqp.example.com", auth=auth) as client:
    # Client operations here
    pass

# Manual open/close
client = AMQPClient("amqps://amqp.example.com", auth=auth)
client.open()
try:
    # Client operations here
    pass
finally:
    client.close()
```
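When opening a client manually, it can be useful to wait until `client_ready()` reports `True` before issuing operations. The following is a minimal sketch of such a polling loop, not part of the library: the helper name `wait_until_ready` and its parameters are hypothetical, and it assumes only that `client_ready()` can be called repeatedly after `open()`.

```python
import time


def wait_until_ready(client, timeout_s=10.0, poll_interval_s=0.05,
                     clock=time.monotonic, sleep=time.sleep):
    """Poll client.client_ready() until it returns True or the deadline passes.

    Hypothetical helper: `client` is any object exposing client_ready().
    Returns True if the client became ready, False on timeout.
    """
    deadline = clock() + timeout_s
    while True:
        if client.client_ready():
            return True
        if clock() >= deadline:
            return False
        # Pause briefly between polls to avoid a busy loop.
        sleep(poll_interval_s)
```

A caller would typically run `client.open()`, then `wait_until_ready(client)`, and close the client if the helper returns `False`.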

### Send Client

High-level client for sending messages with automatic connection management and batch sending capabilities.

```python { .api }
class SendClient(AMQPClient):
    def __init__(self, target, auth=None, client_name=None, debug=False,
                 msg_timeout=0, error_policy=None, keep_alive_interval=None,
                 send_settle_mode=None, auto_complete=True, encoding='UTF-8',
                 **kwargs):
        """
        High-level client for sending AMQP messages.

        Parameters:
        - target (str or Target): Target endpoint for messages
        - auth (AMQPAuth): Authentication credentials
        - client_name (str): Client identifier
        - debug (bool): Enable debug logging
        - msg_timeout (int): Message send timeout in milliseconds (0 means no expiry)
        - error_policy (ErrorPolicy): Error handling policy
        - keep_alive_interval (int): Keep-alive interval in seconds
        - send_settle_mode (SenderSettleMode): Message settlement mode
        - auto_complete (bool): Auto-complete sent messages
        - encoding (str): Character encoding
        """
```

**Key Methods:**

```python { .api }
def queue_message(self, message):
    """
    Queue a message for sending.

    Parameters:
    - message (Message): Message to queue for sending
    """

def send_all_messages(self, close_on_done=True):
    """
    Send all queued messages.

    Parameters:
    - close_on_done (bool): Whether to close connection after sending

    Returns:
    list[MessageState]: List of send results for each message
    """

def send_message_batch(self, messages, close_on_done=True):
    """
    Send a batch of messages.

    Parameters:
    - messages (list[Message]): Messages to send
    - close_on_done (bool): Whether to close connection after sending

    Returns:
    list[MessageState]: List of send results for each message
    """
```

**Usage Examples:**

```python
from uamqp import SendClient, Message
from uamqp.authentication import SASTokenAuth

# Single message sending
# Note: sas_token is a placeholder for a SAS token obtained elsewhere.
target = "amqps://mynamespace.servicebus.windows.net/myqueue"
auth = SASTokenAuth("mynamespace.servicebus.windows.net", token=sas_token)

with SendClient(target, auth=auth) as client:
    message = Message("Hello World")
    client.queue_message(message)
    results = client.send_all_messages()
    print(f"Send results: {results}")

# Batch message sending
messages = [
    Message("Message 1"),
    Message("Message 2"),
    Message("Message 3")
]

with SendClient(target, auth=auth) as client:
    results = client.send_message_batch(messages)
    print(f"Sent {len(results)} messages")

# Queue multiple messages individually
with SendClient(target, auth=auth) as client:
    for i in range(10):
        client.queue_message(Message(f"Message {i}"))
    results = client.send_all_messages()
```

### Receive Client

High-level client for receiving messages with automatic connection management and batch receiving capabilities.

```python { .api }
class ReceiveClient(AMQPClient):
    def __init__(self, source, auth=None, client_name=None, debug=False,
                 prefetch=300, auto_complete=True, error_policy=None,
                 keep_alive_interval=None, receive_settle_mode=None,
                 encoding='UTF-8', **kwargs):
        """
        High-level client for receiving AMQP messages.

        Parameters:
        - source (str or Source): Source endpoint for messages
        - auth (AMQPAuth): Authentication credentials
        - client_name (str): Client identifier
        - debug (bool): Enable debug logging
        - prefetch (int): Number of messages to prefetch
        - auto_complete (bool): Auto-complete received messages
        - error_policy (ErrorPolicy): Error handling policy
        - keep_alive_interval (int): Keep-alive interval
        - receive_settle_mode (ReceiverSettleMode): Message settlement mode
        - encoding (str): Character encoding
        """
```

**Key Methods:**

```python { .api }
def receive_message_batch(self, max_batch_size=None, timeout=0):
    """
    Receive a batch of messages.

    Parameters:
    - max_batch_size (int): Maximum messages to receive (default: prefetch size)
    - timeout (int): Timeout in milliseconds

    Returns:
    list[Message]: Received messages
    """

def receive_messages(self, timeout=0):
    """
    Receive messages with iterator-like behavior.

    Parameters:
    - timeout (int): Timeout in milliseconds

    Returns:
    generator: Message iterator
    """
```

**Usage Examples:**

```python
from uamqp import ReceiveClient
from uamqp.authentication import SASLPlain

source = "amqps://amqp.example.com/myqueue"
auth = SASLPlain("amqp.example.com", "user", "password")

# Batch message receiving
with ReceiveClient(source, auth=auth, prefetch=50) as client:
    messages = client.receive_message_batch(max_batch_size=10, timeout=30000)
    print(f"Received {len(messages)} messages")

    for message in messages:
        print(f"Message: {message.get_data()}")
        message.accept()  # Acknowledge message

# Continuous message receiving
processed_count = 0  # Number of messages handled so far
with ReceiveClient(source, auth=auth) as client:
    for message in client.receive_messages():
        try:
            data = message.get_data()
            print(f"Processing: {data}")
            # Process message here
            message.accept()
        except Exception as e:
            print(f"Error processing message: {e}")
            message.reject()
        processed_count += 1

        # Break after processing 100 messages
        if processed_count >= 100:
            break

# Receive with custom settlement mode
from uamqp.constants import ReceiverSettleMode

# process_message is a placeholder for application logic that returns True on success.
with ReceiveClient(source, auth=auth,
                   receive_settle_mode=ReceiverSettleMode.PeekLock) as client:
    messages = client.receive_message_batch(timeout=10000)
    for message in messages:
        # Message won't be removed from queue until explicitly settled
        if process_message(message.get_data()):
            message.accept()  # Remove from queue
        else:
            message.release()  # Return to queue for retry
```
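The accept-on-success, reject-on-error pattern from the continuous receiving example can be factored into a reusable loop. This is a rough sketch rather than library code: `settle_each` and its `limit` parameter are hypothetical names, and the function relies only on each message exposing `get_data()`, `accept()`, and `reject()` as in the examples above.

```python
def settle_each(messages, handler, limit=None):
    """Run handler on each message's data; accept on success, reject on error.

    Hypothetical helper. `messages` is any iterable of message-like objects.
    Stops after `limit` messages have been settled (no limit if None).
    Returns a tuple (accepted, rejected).
    """
    accepted = rejected = 0
    if limit is not None and limit <= 0:
        return accepted, rejected
    for message in messages:
        try:
            handler(message.get_data())
        except Exception:
            message.reject()
            rejected += 1
        else:
            message.accept()
            accepted += 1
        # Check the limit after settling so no message is pulled and left unsettled.
        if limit is not None and accepted + rejected >= limit:
            break
    return accepted, rejected
```

With a real client this might be called as `settle_each(client.receive_messages(), handle, limit=100)`, where `handle` is application code.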

## Configuration Options

### Error Policy

Configure how clients handle errors and retries:

```python
from uamqp.errors import ErrorPolicy, ErrorAction

# Custom error policy with retries
error_policy = ErrorPolicy(
    max_retries=3,
    on_error=ErrorAction(retry=True, backoff=2.0)
)

client = SendClient(target, auth=auth, error_policy=error_policy)
```
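To make the idea of bounded retries with a delay concrete, here is a generic sketch that is independent of uamqp. It is not how the library applies `ErrorPolicy` internally: `call_with_retries` is a hypothetical helper, and it assumes `backoff` is a fixed delay in seconds between attempts.

```python
import time


def call_with_retries(operation, max_retries=3, backoff=2.0, sleep=time.sleep):
    """Call operation(); on failure, retry up to max_retries times.

    Hypothetical helper. Waits `backoff` seconds (assumed fixed) between
    attempts and re-raises the last exception once retries are exhausted.
    """
    attempt = 0
    while True:
        try:
            return operation()
        except Exception:
            if attempt >= max_retries:
                raise
            attempt += 1
            sleep(backoff)
```

With `max_retries=3` the operation runs at most four times in total: the initial call plus three retries.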

### Settlement Modes

Control message acknowledgment behavior:

```python
from uamqp.constants import SenderSettleMode, ReceiverSettleMode

# Send client settlement
send_client = SendClient(
    target,
    auth=auth,
    send_settle_mode=SenderSettleMode.Settled  # Fire-and-forget
)

# Receive client settlement
receive_client = ReceiveClient(
    source,
    auth=auth,
    receive_settle_mode=ReceiverSettleMode.PeekLock  # Manual acknowledgment
)
```

### Connection Properties

Set custom connection properties:

```python
properties = {
    'connection-name': 'MyApp-v1.0',
    'product': 'MyApplication',
    'platform': 'Python'
}

client = SendClient(target, auth=auth, properties=properties)
```

## Performance Tuning

### Prefetch Settings

Adjust `prefetch` and related settings to favor either throughput or latency:

```python
# High throughput receiving
client = ReceiveClient(
    source,
    auth=auth,
    prefetch=1000,        # Prefetch more messages
    max_frame_size=65536  # Larger frames
)

# Low latency receiving
client = ReceiveClient(
    source,
    auth=auth,
    prefetch=1,           # Minimal prefetch
    auto_complete=False   # Manual acknowledgment
)
```

### Batch Operations

Use batch operations for better performance:

```python
# Batch sending (more efficient than individual sends)
messages = [Message(f"Batch message {i}") for i in range(100)]
with SendClient(target, auth=auth) as client:
    results = client.send_message_batch(messages)
```
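When the number of messages is large, one option is to split them into fixed-size groups before sending each group as a batch. The sketch below is plain Python with no uamqp dependency; `chunked` is a hypothetical helper name, and a suitable group size depends on the broker's limits, which this document does not specify.

```python
def chunked(items, size):
    """Yield successive lists of at most `size` items from `items`.

    Hypothetical helper for splitting a large message list into batches.
    """
    if size < 1:
        raise ValueError("size must be at least 1")
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    # Emit the final, possibly shorter, batch.
    if batch:
        yield batch
```

Each group could then be passed to `send_message_batch`, presumably with `close_on_done=False` so the connection stays open between groups.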

### Management Operations

AMQP management request/response operations for interacting with broker management interfaces.

```python { .api }
class MgmtOperation:
    def __init__(self, session, target=None, debug=False,
                 status_code_field=b'statusCode',
                 description_fields=b'statusDescription',
                 encoding='UTF-8'):
        """
        AMQP management operation client.

        Parameters:
        - session (Session): AMQP session for the operation
        - target (bytes or str): Target node name (default: b"$management")
        - debug (bool): Enable debug logging
        - status_code_field (bytes): Response status code field name
        - description_fields (bytes): Response description field name
        - encoding (str): Character encoding
        """

    def execute(self, operation, op_type, message, timeout=0):
        """
        Execute a management operation.

        Parameters:
        - operation (str): Operation name
        - op_type (str): Operation type
        - message (Message): Request message
        - timeout (int): Operation timeout in milliseconds

        Returns:
        Message: Response message
        """
```

**Usage Example:**

```python
from uamqp import Session, MgmtOperation, Message

# Create management operation
# Note: connection is a placeholder for an already established connection.
with Session(connection) as session:
    mgmt_op = MgmtOperation(session, target=b"$management")

    # Execute management request
    request = Message({"operation": "read-queue-info"})
    response = mgmt_op.execute("read", "queue", request, timeout=30000)

    print(f"Response: {response.get_data()}")
```

## Common Client Errors

Client operations may raise these exceptions:

- **AMQPConnectionError**: Connection to broker failed
- **MessageSendFailed**: Message sending failed
- **ClientTimeout**: Operation timed out
- **AMQPClientShutdown**: Client was shut down
- **LinkDetach**: Link was detached by broker
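
One way to act on these exceptions is to map each to a recovery step. The following sketch is illustrative only: the mapping and the action names (`"reconnect"`, `"retry"`, `"stop"`) are assumptions rather than library behavior, and `recovery_action` is a hypothetical helper. It matches on class names so that the example runs without importing uamqp.

```python
# Hypothetical mapping from the exception names listed above to a recovery step.
RECOVERY_BY_ERROR_NAME = {
    "AMQPConnectionError": "reconnect",
    "LinkDetach": "reconnect",
    "MessageSendFailed": "retry",
    "ClientTimeout": "retry",
    "AMQPClientShutdown": "stop",
}


def recovery_action(exc, default="raise"):
    """Return the recovery step for an exception, or `default` if unknown.

    Walks the exception's class hierarchy so subclasses of a listed
    error type resolve to the same action as their parent.
    """
    for cls in type(exc).__mro__:
        action = RECOVERY_BY_ERROR_NAME.get(cls.__name__)
        if action is not None:
            return action
    return default
```

In application code it is usually clearer to catch the classes from `uamqp.errors` directly in `except` clauses; the name-based lookup here only keeps the sketch self-contained.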