or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

administrative-operations.md, client-management.md, constants-enums.md, exception-handling.md, index.md, message-operations.md, message-types.md, session-management.md

docs/message-operations.md

0

# Message Operations

1

2

Comprehensive message sending and receiving operations including batch processing, message scheduling, and various settlement patterns for reliable message processing.

3

4

## Capabilities

5

6

### Message Sending

7

8

Send messages to queues and topics with support for single messages, message lists, and batches.

9

10

```python { .api }

11

class ServiceBusSender:

12

@property

13

def fully_qualified_namespace(self) -> str:

14

"""The fully qualified namespace URL of the Service Bus."""

15

16

@property

17

def entity_name(self) -> str:

18

"""The name of the entity (queue or topic) that the sender sends to."""

19

20

def send_messages(

21

self,

22

message: Union[ServiceBusMessage, ServiceBusMessageBatch, List[ServiceBusMessage]],

23

*,

24

timeout: Optional[float] = None,

25

**kwargs

26

) -> None:

27

"""

28

Send messages to the Service Bus entity.

29

30

Parameters:

31

- message: Single message, list of messages, or message batch to send

32

- timeout: Operation timeout in seconds

33

34

Raises:

35

- MessageSizeExceededError: If message exceeds size limits

36

- ServiceBusError: For other Service Bus related errors

37

"""

38

```

39

40

#### Usage Example

41

42

```python

43

from azure.servicebus import ServiceBusClient, ServiceBusMessage

44

45

client = ServiceBusClient.from_connection_string("your_connection_string")

46

47

with client.get_queue_sender("my-queue") as sender:

48

# Send a single message

49

message = ServiceBusMessage("Hello World")

50

sender.send_messages(message)

51

52

# Send multiple messages

53

messages = [

54

ServiceBusMessage("Message 1"),

55

ServiceBusMessage("Message 2"),

56

ServiceBusMessage("Message 3")

57

]

58

sender.send_messages(messages)

59

60

# Send a message batch (more efficient for large volumes)

61

batch = sender.create_message_batch()

62

for i in range(10):

63

batch.add_message(ServiceBusMessage(f"Batch message {i}"))

64

sender.send_messages(batch)

65

```

66

67

### Message Scheduling

68

69

Schedule messages for future delivery with precise timing control.

70

71

```python { .api }

72

def schedule_messages(

73

self,

74

messages: Union[ServiceBusMessage, List[ServiceBusMessage]],

75

schedule_time_utc: datetime,

76

*,

77

timeout: Optional[float] = None,

78

**kwargs

79

) -> List[int]:

80

"""

81

Schedule messages for future delivery.

82

83

Parameters:

84

- messages: Single message or list of messages to schedule

85

- schedule_time_utc: UTC datetime when messages should be delivered

86

- timeout: Operation timeout in seconds

87

88

Returns:

89

List of sequence numbers for the scheduled messages

90

91

Raises:

92

- ServiceBusError: For Service Bus related errors

93

"""

94

95

def cancel_scheduled_messages(

96

self,

97

sequence_numbers: Union[int, List[int]],

98

*,

99

timeout: Optional[float] = None,

100

**kwargs

101

) -> None:

102

"""

103

Cancel previously scheduled messages.

104

105

Parameters:

106

- sequence_numbers: Single sequence number or list of sequence numbers to cancel

107

- timeout: Operation timeout in seconds

108

109

Raises:

110

- MessageNotFoundError: If scheduled message is not found

111

- ServiceBusError: For other Service Bus related errors

112

"""

113

```

114

115

#### Usage Example

116

117

```python

118

from datetime import datetime, timedelta

119

from azure.servicebus import ServiceBusClient, ServiceBusMessage

120

121

client = ServiceBusClient.from_connection_string("your_connection_string")

122

123

with client.get_queue_sender("my-queue") as sender:

124

# Schedule a message for 1 hour from now

125

future_time = datetime.utcnow() + timedelta(hours=1)

126

message = ServiceBusMessage("This message will be delivered in 1 hour")

127

128

sequence_numbers = sender.schedule_messages(message, future_time)

129

print(f"Scheduled message with sequence number: {sequence_numbers[0]}")

130

131

# Cancel the scheduled message if needed

132

sender.cancel_scheduled_messages(sequence_numbers[0])

133

```

134

135

### Message Batching

136

137

Create and manage message batches for efficient bulk sending.

138

139

```python { .api }

140

def create_message_batch(

141

self,

142

max_size_in_bytes: Optional[int] = None

143

) -> ServiceBusMessageBatch:

144

"""

145

Create an empty message batch.

146

147

Parameters:

148

- max_size_in_bytes: Maximum size of the batch in bytes (uses service limit if None)

149

150

Returns:

151

Empty ServiceBusMessageBatch instance

152

153

Raises:

154

- ValueError: If max_size_in_bytes is invalid

155

"""

156

```

157

158

### Message Receiving

159

160

Receive and process messages from queues and subscriptions with various modes and options.

161

162

```python { .api }

163

class ServiceBusReceiver:

164

@property

165

def fully_qualified_namespace(self) -> str:

166

"""The fully qualified namespace URL of the Service Bus."""

167

168

@property

169

def entity_path(self) -> str:

170

"""The full path of the entity (queue or subscription) that the receiver receives from."""

171

172

@property

173

def session(self) -> Optional[ServiceBusSession]:

174

"""The session object if this is a session-enabled receiver, None otherwise."""

175

176

def receive_messages(

177

self,

178

max_message_count: int = 1,

179

max_wait_time: Optional[float] = None

180

) -> List[ServiceBusReceivedMessage]:

181

"""

182

Receive messages from the Service Bus entity.

183

184

Parameters:

185

- max_message_count: Maximum number of messages to receive (1-256)

186

- max_wait_time: Maximum time to wait for messages in seconds

187

188

Returns:

189

List of ServiceBusReceivedMessage objects

190

191

Raises:

192

- ServiceBusError: For Service Bus related errors

193

"""

194

```

195

196

#### Usage Example

197

198

```python

199

from azure.servicebus import ServiceBusClient, ServiceBusReceiveMode

200

201

client = ServiceBusClient.from_connection_string("your_connection_string")

202

203

with client.get_queue_receiver("my-queue") as receiver:

204

# Receive up to 10 messages, wait up to 5 seconds

205

messages = receiver.receive_messages(max_message_count=10, max_wait_time=5)

206

207

for message in messages:

208

print(f"Received: {message.body}")

209

# Process the message

210

try:

211

# Your message processing logic here

212

process_message(message)

213

214

# Complete the message to remove it from the queue

215

receiver.complete_message(message)

216

except Exception as e:

217

print(f"Error processing message: {e}")

218

# Abandon the message to make it available for redelivery

219

receiver.abandon_message(message)

220

```

221

222

### Message Peeking

223

224

Peek at messages without receiving them (messages remain in the queue/subscription).

225

226

```python { .api }

227

def peek_messages(

228

self,

229

max_message_count: int = 1,

230

*,

231

sequence_number: int = 0,

232

timeout: Optional[float] = None,

233

**kwargs

234

) -> List[ServiceBusReceivedMessage]:

235

"""

236

Peek at messages without receiving them.

237

238

Parameters:

239

- max_message_count: Maximum number of messages to peek (1-32)

240

- sequence_number: Starting sequence number for peeking

241

- timeout: Operation timeout in seconds

242

243

Returns:

244

List of ServiceBusReceivedMessage objects (without locks)

245

246

Raises:

247

- ServiceBusError: For Service Bus related errors

248

"""

249

```

250

251

### Message Settlement

252

253

Various ways to settle (complete processing of) received messages.

254

255

```python { .api }

256

def complete_message(self, message: ServiceBusReceivedMessage) -> None:

257

"""

258

Complete a message, removing it from the queue/subscription.

259

260

Parameters:

261

- message: The message to complete

262

263

Raises:

264

- MessageAlreadySettled: If message was already settled

265

- MessageLockLostError: If message lock has expired

266

- ServiceBusError: For other Service Bus related errors

267

"""

268

269

def abandon_message(self, message: ServiceBusReceivedMessage) -> None:

270

"""

271

Abandon a message, making it available for redelivery.

272

273

Parameters:

274

- message: The message to abandon

275

276

Raises:

277

- MessageAlreadySettled: If message was already settled

278

- MessageLockLostError: If message lock has expired

279

- ServiceBusError: For other Service Bus related errors

280

"""

281

282

def defer_message(self, message: ServiceBusReceivedMessage) -> None:

283

"""

284

Defer a message for later processing.

285

286

Parameters:

287

- message: The message to defer

288

289

Raises:

290

- MessageAlreadySettled: If message was already settled

291

- MessageLockLostError: If message lock has expired

292

- ServiceBusError: For other Service Bus related errors

293

"""

294

295

def dead_letter_message(

296

self,

297

message: ServiceBusReceivedMessage,

298

reason: Optional[str] = None,

299

error_description: Optional[str] = None

300

) -> None:

301

"""

302

Move a message to the dead letter sub-queue.

303

304

Parameters:

305

- message: The message to dead letter

306

- reason: Reason for dead lettering

307

- error_description: Description of the error

308

309

Raises:

310

- MessageAlreadySettled: If message was already settled

311

- MessageLockLostError: If message lock has expired

312

- ServiceBusError: For other Service Bus related errors

313

"""

314

```

315

316

### Message Lock Management

317

318

Manage message locks to prevent timeout during processing.

319

320

```python { .api }

321

def renew_message_lock(

322

self,

323

message: ServiceBusReceivedMessage,

324

*,

325

timeout: Optional[float] = None,

326

**kwargs

327

) -> datetime:

328

"""

329

Renew the lock on a message.

330

331

Parameters:

332

- message: The message whose lock should be renewed

333

- timeout: Operation timeout in seconds

334

335

Returns:

336

New lock expiration time

337

338

Raises:

339

- MessageAlreadySettled: If message was already settled

340

- MessageLockLostError: If message lock has expired

341

- ServiceBusError: For other Service Bus related errors

342

"""

343

```

344

345

### Iterator Support

346

347

ServiceBusReceiver supports iteration for continuous message processing.

348

349

```python { .api }

350

def __iter__(self) -> Iterator[ServiceBusReceivedMessage]:

351

"""

352

Iterate over messages from the receiver.

353

354

Returns:

355

Iterator that yields ServiceBusReceivedMessage objects

356

"""

357

358

def __next__(self) -> ServiceBusReceivedMessage:

359

"""

360

Get the next message from the receiver.

361

362

Returns:

363

Next ServiceBusReceivedMessage

364

365

Raises:

366

StopIteration: When no more messages are available

367

"""

368

```

369

370

#### Usage Example

371

372

```python

373

with client.get_queue_receiver("my-queue") as receiver:

374

# Iterate over messages as they arrive

375

for message in receiver:

376

print(f"Received: {message.body}")

377

try:

378

process_message(message)

379

receiver.complete_message(message)

380

except Exception as e:

381

print(f"Error: {e}")

382

receiver.abandon_message(message)

383

384

# Stop iterating once a message has been delivered 10 or more times

385

if message.delivery_count and message.delivery_count >= 10:

386

break

387

```

388

389

### Context Manager Support

390

391

All senders and receivers support the context manager protocol for automatic resource cleanup.

392

393

```python { .api }

394

# ServiceBusSender

395

def __enter__(self) -> ServiceBusSender: ...

396

def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...

397

398

# ServiceBusReceiver

399

def __enter__(self) -> ServiceBusReceiver: ...

400

def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...

401

402

def close(self) -> None:

403

"""Close the sender/receiver and release resources."""

404

```

405

406

## Asynchronous Message Operations

407

408

For asynchronous operations, use the async versions of ServiceBusSender and ServiceBusReceiver from the `azure.servicebus.aio` module.

409

410

```python { .api }

411

from azure.servicebus.aio import ServiceBusSender, ServiceBusReceiver

412

413

# All methods have async equivalents

414

class ServiceBusSender:

415

async def send_messages(self, message, **kwargs) -> None: ...

416

async def schedule_messages(self, messages, schedule_time_utc, **kwargs) -> List[int]: ...

417

async def cancel_scheduled_messages(self, sequence_numbers, **kwargs) -> None: ...

418

async def close(self) -> None: ...

419

async def __aenter__(self) -> ServiceBusSender: ...

420

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...

421

422

class ServiceBusReceiver:

423

async def receive_messages(self, max_message_count=1, max_wait_time=None) -> List[ServiceBusReceivedMessage]: ...

424

async def peek_messages(self, max_message_count=1, **kwargs) -> List[ServiceBusReceivedMessage]: ...

425

async def complete_message(self, message: ServiceBusReceivedMessage) -> None: ...

426

async def abandon_message(self, message: ServiceBusReceivedMessage) -> None: ...

427

async def defer_message(self, message: ServiceBusReceivedMessage) -> None: ...

428

async def dead_letter_message(self, message: ServiceBusReceivedMessage, reason=None, error_description=None) -> None: ...

429

async def renew_message_lock(self, message: ServiceBusReceivedMessage, **kwargs) -> datetime: ...

430

async def close(self) -> None: ...

431

async def __aenter__(self) -> ServiceBusReceiver: ...

432

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...

433

def __aiter__(self) -> AsyncIterator[ServiceBusReceivedMessage]: ...

434

async def __anext__(self) -> ServiceBusReceivedMessage: ...

435

```

436

437

#### Usage Example

438

439

```python

440

import asyncio

441

from azure.servicebus.aio import ServiceBusClient

442

from azure.servicebus import ServiceBusMessage

443

444

async def send_and_receive():

445

async with ServiceBusClient.from_connection_string("your_connection_string") as client:

446

# Send messages asynchronously

447

async with client.get_queue_sender("my-queue") as sender:

448

messages = [ServiceBusMessage(f"Message {i}") for i in range(5)]

449

await sender.send_messages(messages)

450

451

# Receive messages asynchronously

452

async with client.get_queue_receiver("my-queue") as receiver:

453

async for message in receiver:

454

print(f"Received: {message.body}")

455

await receiver.complete_message(message)

456

457

asyncio.run(send_and_receive())

458

```