or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

address-endpoints.md async-operations.md authentication.md client-apis.md connection-session.md error-handling.md high-level-messaging.md index.md low-level-protocol.md message-management.md types-constants.md

docs/async-operations.md

0

# Async Operations

1

2

Asynchronous AMQP operations for Python 3.5+, built on async/await, for high-performance messaging applications that need non-blocking I/O.

3

4

## Capabilities

5

6

### Async Send Client

7

8

Asynchronous high-level client for sending messages without blocking the event loop.

9

10

```python { .api }

11

class SendClientAsync:

12

def __init__(self, target, auth=None, client_name=None, debug=False,

13

msg_timeout=0, error_policy=None, keep_alive_interval=None,

14

send_settle_mode=None, auto_complete=True, encoding='UTF-8',

15

loop=None, **kwargs):

16

"""

17

Async high-level client for sending AMQP messages.

18

19

Parameters:

20

- target (str or Target): Target endpoint for messages

21

- auth (AMQPAuth): Authentication credentials

22

- client_name (str): Client identifier

23

- debug (bool): Enable debug logging

24

- msg_timeout (int): Message send timeout in milliseconds (0 means messages do not expire)

25

- error_policy (ErrorPolicy): Error handling policy

26

- keep_alive_interval (int): Keep-alive interval

27

- send_settle_mode (SenderSettleMode): Message settlement mode

28

- auto_complete (bool): Auto-complete sent messages

29

- encoding (str): Character encoding

30

- loop: Asyncio event loop

31

"""

32

```

33

34

**Key Methods:**

35

36

```python { .api }

37

async def open_async(self):

38

"""Asynchronously open the client connection."""

39

40

async def close_async(self):

41

"""Asynchronously close the client connection."""

42

43

def queue_message(self, message):

44

"""Queue a message for sending (synchronous)."""

45

46

async def send_all_messages_async(self, close_on_done=True):

47

"""

48

Asynchronously send all queued messages.

49

50

Parameters:

51

- close_on_done (bool): Whether to close connection after sending

52

53

Returns:

54

list[MessageState]: List of send results for each message

55

"""

56

57

async def send_message_batch_async(self, messages, close_on_done=True):

58

"""

59

Asynchronously send a batch of messages.

60

61

Parameters:

62

- messages (list[Message]): Messages to send

63

- close_on_done (bool): Whether to close connection after sending

64

65

Returns:

66

list[MessageState]: List of send results for each message

67

"""

68

```

69

70

**Usage Examples:**

71

72

```python

73

import asyncio

74

from uamqp.async_ops import SendClientAsync

75

from uamqp import Message

76

from uamqp.authentication import SASTokenAsync

77

78

async def send_messages_async():

79

target = "amqps://mynamespace.servicebus.windows.net/myqueue"

80

auth = SASTokenAsync("mynamespace.servicebus.windows.net", token=sas_token)

81

82

# Using async context manager

83

async with SendClientAsync(target, auth=auth) as client:

84

message = Message("Hello Async World")

85

client.queue_message(message)

86

results = await client.send_all_messages_async()

87

print(f"Send results: {results}")

88

89

# Batch async sending

90

async def send_batch_async():

91

messages = [Message(f"Async message {i}") for i in range(10)]

92

93

async with SendClientAsync(target, auth=auth) as client:

94

results = await client.send_message_batch_async(messages)

95

print(f"Sent {len(results)} messages asynchronously")

96

97

# Run async functions

98

asyncio.run(send_messages_async())

99

```

100

101

### Async Receive Client

102

103

Asynchronous high-level client for receiving messages with non-blocking message processing.

104

105

```python { .api }

106

class ReceiveClientAsync:

107

def __init__(self, source, auth=None, client_name=None, debug=False,

108

prefetch=300, auto_complete=True, error_policy=None,

109

keep_alive_interval=None, receive_settle_mode=None,

110

encoding='UTF-8', loop=None, **kwargs):

111

"""

112

Async high-level client for receiving AMQP messages.

113

114

Parameters:

115

- source (str or Source): Source endpoint for messages

116

- auth (AMQPAuth): Authentication credentials

117

- client_name (str): Client identifier

118

- debug (bool): Enable debug logging

119

- prefetch (int): Number of messages to prefetch

120

- auto_complete (bool): Auto-complete received messages

121

- error_policy (ErrorPolicy): Error handling policy

122

- keep_alive_interval (int): Keep-alive interval

123

- receive_settle_mode (ReceiverSettleMode): Message settlement mode

124

- encoding (str): Character encoding

125

- loop: Asyncio event loop

126

"""

127

```

128

129

**Key Methods:**

130

131

```python { .api }

132

async def open_async(self):

133

"""Asynchronously open the client connection."""

134

135

async def close_async(self):

136

"""Asynchronously close the client connection."""

137

138

async def receive_message_batch_async(self, max_batch_size=None, timeout=0):

139

"""

140

Asynchronously receive a batch of messages.

141

142

Parameters:

143

- max_batch_size (int): Maximum messages to receive

144

- timeout (int): Timeout in milliseconds

145

146

Returns:

147

list[Message]: Received messages

148

"""

149

150

def receive_messages_iter_async(self):

151

"""

152

Create async iterator for receiving messages.

153

154

Returns:

155

AsyncMessageIter: Async message iterator

156

"""

157

```

158

159

**Usage Examples:**

160

161

```python

162

import asyncio

163

from uamqp.async_ops import ReceiveClientAsync

164

from uamqp.authentication import SASLPlain

165

166

async def receive_messages_async():

167

source = "amqps://amqp.example.com/myqueue"

168

auth = SASLPlain("amqp.example.com", "user", "password")

169

170

# Batch async receiving

171

async with ReceiveClientAsync(source, auth=auth) as client:

172

messages = await client.receive_message_batch_async(

173

max_batch_size=10,

174

timeout=30000

175

)

176

177

print(f"Received {len(messages)} messages")

178

for message in messages:

179

print(f"Message: {message.get_data()}")

180

message.accept()

181

182

# Async message iterator

183

async def process_messages_continuously():

184

async with ReceiveClientAsync(source, auth=auth) as client:

185

message_iter = client.receive_messages_iter_async()

186

187

async for message in message_iter:

188

try:

189

data = message.get_data()

190

print(f"Processing: {data}")

191

192

# Simulate async processing

193

await asyncio.sleep(0.1)

194

195

message.accept()

196

except Exception as e:

197

print(f"Error: {e}")

198

message.reject()

199

200

asyncio.run(receive_messages_async())

201

```

202

203

### Async Message Iterator

204

205

Async iterator for processing messages as they arrive without blocking.

206

207

```python { .api }

208

class AsyncMessageIter:

209

def __init__(self, client):

210

"""

211

Async iterator for AMQP messages.

212

213

Parameters:

214

- client: ReceiveClientAsync instance

215

"""

216

217

def __aiter__(self):

218

"""Return async iterator."""

219

220

async def __anext__(self):

221

"""Get next message asynchronously."""

222

```

223

224

**Usage Example:**

225

226

```python

227

async def process_messages_with_iterator():

228

async with ReceiveClientAsync(source, auth=auth) as client:

229

message_iter = client.receive_messages_iter_async()

230

231

message_count = 0

232

async for message in message_iter:

233

await process_message_async(message)

234

message.accept()

235

236

message_count += 1

237

if message_count >= 100: # Process only 100 messages

238

break

239

240

async def process_message_async(message):

241

"""Custom async message processing."""

242

data = message.get_data()

243

244

# Simulate async work (database query, API call, etc.)

245

await asyncio.sleep(0.1)

246

247

print(f"Processed: {data}")

248

```

249

250

### Async Low-Level Protocol

251

252

Asynchronous versions of low-level protocol classes for advanced scenarios.

253

254

#### Async Connection

255

256

```python { .api }

257

class ConnectionAsync:

258

def __init__(self, hostname, sasl=None, container_id=None,

259

max_frame_size=None, channel_max=None, idle_timeout=None,

260

properties=None, remote_idle_timeout_empty_frame_send_ratio=None,

261

debug=False, encoding='UTF-8', loop=None):

262

"""Async AMQP connection management."""

263

264

async def open_async(self):

265

"""Asynchronously open connection."""

266

267

async def close_async(self):

268

"""Asynchronously close connection."""

269

```

270

271

#### Async Session

272

273

```python { .api }

274

class SessionAsync:

275

def __init__(self, connection, incoming_window=None, outgoing_window=None,

276

handle_max=None, loop=None):

277

"""Async AMQP session management."""

278

279

async def begin_async(self):

280

"""Asynchronously begin session."""

281

282

async def end_async(self):

283

"""Asynchronously end session."""

284

```

285

286

#### Async Message Sender

287

288

```python { .api }

289

class MessageSenderAsync:

290

def __init__(self, session, source, target, name=None,

291

send_settle_mode=None, max_message_size=None,

292

link_properties=None, desired_capabilities=None,

293

loop=None):

294

"""Async low-level message sender."""

295

296

async def open_async(self):

297

"""Asynchronously open sender link."""

298

299

async def send_async(self, message, callback=None):

300

"""

301

Asynchronously send a message.

302

303

Parameters:

304

- message (Message): Message to send

305

- callback (callable): Completion callback

306

307

Returns:

308

MessageState: Send operation state

309

"""

310

```

311

312

#### Async Message Receiver

313

314

```python { .api }

315

class MessageReceiverAsync:

316

def __init__(self, session, source, target, name=None,

317

receive_settle_mode=None, max_message_size=None,

318

prefetch=None, link_properties=None,

319

desired_capabilities=None, loop=None):

320

"""Async low-level message receiver."""

321

322

async def open_async(self):

323

"""Asynchronously open receiver link."""

324

325

async def receive_message_batch_async(self, max_batch_size=None):

326

"""

327

Asynchronously receive message batch.

328

329

Parameters:

330

- max_batch_size (int): Maximum messages to receive

331

332

Returns:

333

list[Message]: Received messages

334

"""

335

```

336

337

**Usage Example:**

338

339

```python

340

async def low_level_async_example():

341

# Create async connection

342

connection = ConnectionAsync("amqp.example.com", sasl=auth_sasl)

343

await connection.open_async()

344

345

try:

346

# Create async session

347

session = SessionAsync(connection)

348

await session.begin_async()

349

350

# Create async sender

351

sender = MessageSenderAsync(session, source="source", target="target")

352

await sender.open_async()

353

354

# Send message asynchronously

355

message = Message("Low-level async message")

356

result = await sender.send_async(message)

357

print(f"Send result: {result}")

358

359

finally:

360

await connection.close_async()

361

```

362

363

## Async Authentication

364

365

Use async authentication classes with async operations:

366

367

```python

368

from uamqp.authentication import SASTokenAsync, JWTTokenAsync

369

370

# Async SAS token auth

371

auth = SASTokenAsync(

372

hostname="mynamespace.servicebus.windows.net",

373

token=sas_token

374

)

375

376

# Async JWT token auth

377

auth = JWTTokenAsync(

378

hostname="service.example.com",

379

token=jwt_token,

380

audience="https://service.example.com"

381

)

382

```

383

384

## Async Error Handling

385

386

Handle errors in async operations:

387

388

```python

389

from uamqp.errors import AMQPConnectionError, MessageSendFailed

390

391

async def robust_async_sending():

392

try:

393

async with SendClientAsync(target, auth=auth) as client:

394

client.queue_message(Message("Test message"))

395

results = await client.send_all_messages_async()

396

397

except AMQPConnectionError as e:

398

print(f"Connection failed: {e}")

399

# Implement retry logic

400

401

except MessageSendFailed as e:

402

print(f"Send failed: {e}")

403

# Handle send failure

404

405

except Exception as e:

406

print(f"Unexpected error: {e}")

407

```

408

409

## Async Performance Considerations

410

411

### Event Loop Integration

412

413

```python

414

# Use existing event loop

415

import asyncio

416

417

async def main():

418

loop = asyncio.get_running_loop()

419

420

async with SendClientAsync(target, auth=auth, loop=loop) as client:

421

# Operations use the specified loop

422

pass

423

424

asyncio.run(main())

425

```

426

427

### Concurrent Operations

428

429

```python

430

async def concurrent_operations():

431

# Send and receive concurrently

432

send_task = asyncio.create_task(send_messages_async())

433

receive_task = asyncio.create_task(receive_messages_async())

434

435

# Wait for both to complete

436

await asyncio.gather(send_task, receive_task)

437

```

438

439

### Async Context Managers

440

441

Prefer async context managers for proper resource cleanup; if you manage the client lifecycle manually, close the client in a `finally` block:

442

443

```python

444

# Correct async usage

445

async with SendClientAsync(target, auth=auth) as client:

446

await client.send_all_messages_async()

447

448

# Manual async lifecycle management

449

client = SendClientAsync(target, auth=auth)

450

await client.open_async()

451

try:

452

await client.send_all_messages_async()

453

finally:

454

await client.close_async()

455

```