or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

administrative-operations.md client-management.md constants-enums.md exception-handling.md index.md message-operations.md message-types.md session-management.md

message-types.md docs/

0

# Message Classes and Types

1

2

Message data structures, properties, and utility classes for creating, configuring, and processing Service Bus messages with full metadata support.

3

4

## Capabilities

5

6

### ServiceBusMessage

7

8

Message class for creating messages to send to Service Bus entities.

9

10

```python { .api }

11

class ServiceBusMessage:

12

def __init__(

13

self,

14

body: Optional[Union[str, bytes]],

15

*,

16

application_properties: Optional[Dict[Union[str, bytes], PrimitiveTypes]] = None,

17

session_id: Optional[str] = None,

18

message_id: Optional[str] = None,

19

scheduled_enqueue_time_utc: Optional[datetime] = None,

20

time_to_live: Optional[timedelta] = None,

21

content_type: Optional[str] = None,

22

correlation_id: Optional[str] = None,

23

subject: Optional[str] = None,

24

partition_key: Optional[str] = None,

25

to: Optional[str] = None,

26

reply_to: Optional[str] = None,

27

reply_to_session_id: Optional[str] = None,

28

**kwargs

29

):

30

"""

31

Create a Service Bus message.

32

33

Parameters:

34

- body: Message content (string or bytes; may be None)

35

- application_properties: Custom application properties dictionary

36

- session_id: Session identifier for session-enabled entities

37

- message_id: Unique message identifier

38

- scheduled_enqueue_time_utc: UTC time to schedule message delivery

39

- time_to_live: Message expiration duration

40

- content_type: MIME content type

41

- correlation_id: Correlation identifier for message correlation

42

- subject: Message subject or label

43

- partition_key: Partitioning key for partitioned entities

44

- to: Destination address

45

- reply_to: Reply-to address for response messages

46

- reply_to_session_id: Session ID for reply messages

47

"""

48

```

49

50

### ServiceBusMessage Properties

51

52

Access and modify message properties and metadata.

53

54

```python { .api }

55

@property

56

def body(self) -> Any:

57

"""

58

The message body content.

59

60

Returns:

61

Message body as provided during creation

62

"""

63

64

@property

65

def body_type(self) -> AmqpMessageBodyType:

66

"""

67

The AMQP message body type.

68

69

Returns:

70

AmqpMessageBodyType enum value

71

"""

72

73

@property

74

def application_properties(self) -> Optional[Dict[Union[str, bytes], PrimitiveTypes]]:

75

"""

76

Custom application-specific properties.

77

78

Returns:

79

Dictionary of application properties or None

80

"""

81

82

@application_properties.setter

83

def application_properties(self, value: Optional[Dict[Union[str, bytes], PrimitiveTypes]]) -> None: ...

84

85

@property

86

def session_id(self) -> Optional[str]:

87

"""

88

Session identifier for session-enabled entities.

89

90

Returns:

91

Session ID string or None

92

"""

93

94

@session_id.setter

95

def session_id(self, value: Optional[str]) -> None: ...

96

97

@property

98

def message_id(self) -> Optional[str]:

99

"""

100

Unique message identifier.

101

102

Returns:

103

Message ID string or None

104

"""

105

106

@message_id.setter

107

def message_id(self, value: Optional[str]) -> None: ...

108

109

@property

110

def scheduled_enqueue_time_utc(self) -> Optional[datetime]:

111

"""

112

UTC time when message should be enqueued.

113

114

Returns:

115

Scheduled enqueue time or None for immediate delivery

116

"""

117

118

@scheduled_enqueue_time_utc.setter

119

def scheduled_enqueue_time_utc(self, value: Optional[datetime]) -> None: ...

120

121

@property

122

def time_to_live(self) -> Optional[timedelta]:

123

"""

124

Message time-to-live duration.

125

126

Returns:

127

Time-to-live duration or None for no expiration

128

"""

129

130

@time_to_live.setter

131

def time_to_live(self, value: Optional[timedelta]) -> None: ...

132

133

@property

134

def content_type(self) -> Optional[str]:

135

"""

136

MIME content type of the message body.

137

138

Returns:

139

Content type string or None

140

"""

141

142

@content_type.setter

143

def content_type(self, value: Optional[str]) -> None: ...

144

145

@property

146

def correlation_id(self) -> Optional[str]:

147

"""

148

Correlation identifier for message correlation patterns.

149

150

Returns:

151

Correlation ID string or None

152

"""

153

154

@correlation_id.setter

155

def correlation_id(self, value: Optional[str]) -> None: ...

156

157

@property

158

def subject(self) -> Optional[str]:

159

"""

160

Message subject or label.

161

162

Returns:

163

Subject string or None

164

"""

165

166

@subject.setter

167

def subject(self, value: Optional[str]) -> None: ...

168

169

@property

170

def partition_key(self) -> Optional[str]:

171

"""

172

Partitioning key for partitioned entities.

173

174

Returns:

175

Partition key string or None

176

"""

177

178

@partition_key.setter

179

def partition_key(self, value: Optional[str]) -> None: ...

180

181

@property

182

def to(self) -> Optional[str]:

183

"""

184

Destination address.

185

186

Returns:

187

Destination address string or None

188

"""

189

190

@to.setter

191

def to(self, value: Optional[str]) -> None: ...

192

193

@property

194

def reply_to(self) -> Optional[str]:

195

"""

196

Reply-to address for response messages.

197

198

Returns:

199

Reply-to address string or None

200

"""

201

202

@reply_to.setter

203

def reply_to(self, value: Optional[str]) -> None: ...

204

205

@property

206

def reply_to_session_id(self) -> Optional[str]:

207

"""

208

Session ID for reply messages.

209

210

Returns:

211

Reply-to session ID string or None

212

"""

213

214

@reply_to_session_id.setter

215

def reply_to_session_id(self, value: Optional[str]) -> None: ...

216

217

@property

218

def raw_amqp_message(self) -> AmqpAnnotatedMessage:

219

"""

220

Underlying AMQP message for advanced scenarios.

221

222

Returns:

223

AmqpAnnotatedMessage object

224

"""

225

```

226

227

#### Usage Example

228

229

```python

230

from azure.servicebus import ServiceBusMessage

231

from datetime import datetime, timedelta

232

233

# Create a basic message

234

message = ServiceBusMessage("Hello, Service Bus!")

235

236

# Create a message with properties

237

message = ServiceBusMessage(

238

body="Order processing request",

239

application_properties={

240

"order_id": "12345",

241

"priority": "high",

242

"customer_id": "customer_abc"

243

},

244

session_id="order-session-12345",

245

message_id="msg-001",

246

content_type="application/json",

247

correlation_id="corr-123",

248

subject="OrderProcessing",

249

time_to_live=timedelta(hours=24)

250

)

251

252

# Schedule a message for future delivery

253

future_message = ServiceBusMessage(

254

"This will be delivered in 1 hour",

255

scheduled_enqueue_time_utc=datetime.utcnow() + timedelta(hours=1)

256

)

257

258

# Modify properties after creation

259

message.application_properties["processed_at"] = datetime.utcnow().isoformat()

260

message.subject = "UpdatedSubject"

261

```

262

263

### ServiceBusReceivedMessage

264

265

Message class for messages received from Service Bus entities (extends ServiceBusMessage).

266

267

```python { .api }

268

class ServiceBusReceivedMessage(ServiceBusMessage):

269

# Inherits all properties from ServiceBusMessage

270

271

@property

272

def dead_letter_error_description(self) -> Optional[str]:

273

"""

274

Error description if message was dead lettered.

275

276

Returns:

277

Error description string or None

278

"""

279

280

@property

281

def dead_letter_reason(self) -> Optional[str]:

282

"""

283

Reason if message was dead lettered.

284

285

Returns:

286

Dead letter reason string or None

287

"""

288

289

@property

290

def dead_letter_source(self) -> Optional[str]:

291

"""

292

Source entity name if message was dead lettered.

293

294

Returns:

295

Source entity name string or None

296

"""

297

298

@property

299

def state(self) -> ServiceBusMessageState:

300

"""

301

Current state of the message.

302

303

Returns:

304

ServiceBusMessageState enum value (ACTIVE, DEFERRED, SCHEDULED)

305

"""

306

307

@property

308

def delivery_count(self) -> Optional[int]:

309

"""

310

Number of times this message has been delivered.

311

312

Returns:

313

Delivery count or None

314

"""

315

316

@property

317

def enqueued_sequence_number(self) -> Optional[int]:

318

"""

319

Sequence number assigned when message was enqueued.

320

321

Returns:

322

Enqueued sequence number or None

323

"""

324

325

@property

326

def enqueued_time_utc(self) -> Optional[datetime]:

327

"""

328

UTC time when message was enqueued.

329

330

Returns:

331

Enqueue time or None

332

"""

333

334

@property

335

def expires_at_utc(self) -> Optional[datetime]:

336

"""

337

UTC time when message expires.

338

339

Returns:

340

Expiration time or None

341

"""

342

343

@property

344

def sequence_number(self) -> Optional[int]:

345

"""

346

Unique sequence number assigned by Service Bus.

347

348

Returns:

349

Sequence number or None

350

"""

351

352

@property

353

def lock_token(self) -> Optional[Union[uuid.UUID, str]]:

354

"""

355

Lock token for PEEK_LOCK receive mode.

356

357

Returns:

358

Lock token UUID/string or None

359

"""

360

361

@property

362

def locked_until_utc(self) -> Optional[datetime]:

363

"""

364

UTC time when message lock expires.

365

366

Returns:

367

Lock expiration time or None

368

"""

369

```

370

371

#### Usage Example

372

373

```python

374

from azure.servicebus import ServiceBusClient, ServiceBusMessageState

375

376

client = ServiceBusClient.from_connection_string("your_connection_string")

377

378

with client.get_queue_receiver("my-queue") as receiver:

379

messages = receiver.receive_messages(max_message_count=5)

380

381

for message in messages:

382

print(f"Message body: {message.body}")

383

print(f"Sequence number: {message.sequence_number}")

384

print(f"Delivery count: {message.delivery_count}")

385

print(f"Enqueued at: {message.enqueued_time_utc}")

386

print(f"Expires at: {message.expires_at_utc}")

387

print(f"Lock expires at: {message.locked_until_utc}")

388

print(f"State: {message.state}")

389

390

# Check application properties

391

if message.application_properties:

392

for key, value in message.application_properties.items():

393

print(f" {key}: {value}")

394

395

# Check if message has been redelivered

396

if message.delivery_count and message.delivery_count > 1:

397

print(f"Message has been redelivered {message.delivery_count} times")

398

399

# Check message state

400

if message.state == ServiceBusMessageState.DEFERRED:

401

print("Message is deferred")

402

elif message.state == ServiceBusMessageState.SCHEDULED:

403

print("Message is scheduled")

404

405

receiver.complete_message(message)

406

```

407

408

### ServiceBusMessageBatch

409

410

Batch container for efficient sending of multiple messages.

411

412

```python { .api }

413

class ServiceBusMessageBatch:

414

@property

415

def max_size_in_bytes(self) -> int:

416

"""

417

Maximum size of the batch in bytes.

418

419

Returns:

420

Maximum batch size

421

"""

422

423

@property

424

def size_in_bytes(self) -> int:

425

"""

426

Current size of the batch in bytes.

427

428

Returns:

429

Current batch size

430

"""

431

432

def add_message(self, message: Union[ServiceBusMessage, AmqpAnnotatedMessage]) -> None:

433

"""

434

Add a message to the batch.

435

436

Parameters:

437

- message: ServiceBusMessage or AmqpAnnotatedMessage to add

438

439

Raises:

440

- ValueError: If message would exceed batch size limit

441

"""

442

```

443

444

#### Usage Example

445

446

```python

447

from azure.servicebus import ServiceBusClient, ServiceBusMessage

448

449

client = ServiceBusClient.from_connection_string("your_connection_string")

450

451

with client.get_queue_sender("my-queue") as sender:

452

# Create a message batch

453

batch = sender.create_message_batch()

454

455

# Add messages to the batch

456

for i in range(100):

457

message = ServiceBusMessage(f"Batch message {i}")

458

try:

459

batch.add_message(message)

460

except ValueError:

461

# Batch is full, send it and create a new one

462

print(f"Sending batch with {batch.size_in_bytes} bytes")

463

sender.send_messages(batch)

464

465

# Create new batch and add current message

466

batch = sender.create_message_batch()

467

batch.add_message(message)

468

469

# Send remaining messages in the batch

470

if batch.size_in_bytes > 0:

471

print(f"Sending final batch with {batch.size_in_bytes} bytes")

472

sender.send_messages(batch)

473

```

474

475

### AMQP Message Types

476

477

Low-level AMQP message types for advanced scenarios.

478

479

```python { .api }

480

class AmqpAnnotatedMessage:

481

"""

482

Low-level AMQP message representation.

483

484

Provides direct access to AMQP message structure for advanced scenarios

485

where fine-grained control over message properties is needed.

486

"""

487

@property

488

def body(self) -> Any: ...

489

@property

490

def body_type(self) -> AmqpMessageBodyType: ...

491

@property

492

def properties(self) -> Optional[AmqpMessageProperties]: ...

493

@property

494

def application_properties(self) -> Optional[Dict]: ...

495

@property

496

def annotations(self) -> Optional[Dict]: ...

497

@property

498

def delivery_annotations(self) -> Optional[Dict]: ...

499

@property

500

def header(self) -> Optional[AmqpMessageHeader]: ...

501

@property

502

def footer(self) -> Optional[Dict]: ...

503

504

class AmqpMessageBodyType(Enum):

505

"""AMQP message body types."""

506

DATA = "data"

507

SEQUENCE = "sequence"

508

VALUE = "value"

509

510

class AmqpMessageProperties:

511

"""AMQP message properties section."""

512

@property

513

def message_id(self) -> Optional[Union[str, bytes]]: ...

514

@property

515

def user_id(self) -> Optional[bytes]: ...

516

@property

517

def to(self) -> Optional[Union[str, bytes]]: ...

518

@property

519

def subject(self) -> Optional[str]: ...

520

@property

521

def reply_to(self) -> Optional[Union[str, bytes]]: ...

522

@property

523

def correlation_id(self) -> Optional[Union[str, bytes]]: ...

524

@property

525

def content_type(self) -> Optional[str]: ...

526

@property

527

def content_encoding(self) -> Optional[str]: ...

528

@property

529

def absolute_expiry_time(self) -> Optional[int]: ...

530

@property

531

def creation_time(self) -> Optional[int]: ...

532

@property

533

def group_id(self) -> Optional[str]: ...

534

@property

535

def group_sequence(self) -> Optional[int]: ...

536

@property

537

def reply_to_group_id(self) -> Optional[str]: ...

538

539

class AmqpMessageHeader:

540

"""AMQP message header section."""

541

@property

542

def durable(self) -> Optional[bool]: ...

543

@property

544

def priority(self) -> Optional[int]: ...

545

@property

546

def time_to_live(self) -> Optional[int]: ...

547

@property

548

def first_acquirer(self) -> Optional[bool]: ...

549

@property

550

def delivery_count(self) -> Optional[int]: ...

551

```

552

553

### Message State and Type Enums

554

555

Enumerations for message states and types.

556

557

```python { .api }

558

class ServiceBusMessageState(int, Enum):

559

"""

560

Message states in Service Bus.

561

"""

562

ACTIVE = 0 # Message is active in queue/subscription

563

DEFERRED = 1 # Message has been deferred for later processing

564

SCHEDULED = 2 # Message is scheduled for future delivery

565

```

566

567

### Auto Lock Renewal

568

569

Automatic lock renewal for messages and sessions during long-running processing.

570

571

```python { .api }

572

class AutoLockRenewer:

573

def register(

574

self,

575

renewable: Union[ServiceBusSession, ServiceBusReceivedMessage],

576

timeout: float = 300

577

) -> None:

578

"""

579

Register a message or session for automatic lock renewal.

580

581

Parameters:

582

- renewable: ServiceBusReceivedMessage or ServiceBusSession to auto-renew

583

- timeout: Maximum renewal duration in seconds

584

585

Raises:

586

- ValueError: If message is already registered or invalid

587

"""

588

```

589

590

#### Usage Example

591

592

```python

593

from azure.servicebus import ServiceBusClient, AutoLockRenewer

594

import time

595

596

def process_long_running_message(message):

597

# Simulate long processing time

598

time.sleep(60) # 1 minute processing

599

return f"Processed: {message.body}"

600

601

client = ServiceBusClient.from_connection_string("your_connection_string")

602

auto_renewer = AutoLockRenewer(max_lock_renewal_duration=300) # 5 minutes

603

604

try:

605

with client.get_queue_receiver("my-queue") as receiver:

606

messages = receiver.receive_messages(max_message_count=5)

607

608

for message in messages:

609

# Register message for auto-renewal

610

auto_renewer.register(message, timeout=300)

611

612

try:

613

result = process_long_running_message(message)

614

print(result)

615

receiver.complete_message(message)

616

except Exception as e:

617

print(f"Error processing message: {e}")

618

receiver.abandon_message(message)

619

620

finally:

621

auto_renewer.close()

622

```

623

624

## Type Definitions

625

626

Common type aliases used throughout the SDK.

627

628

```python { .api }

629

# Union types for message parameters

630

MessageTypes = Union[

631

ServiceBusMessage,

632

AmqpAnnotatedMessage,

633

List[Union[ServiceBusMessage, AmqpAnnotatedMessage]]

634

]

635

636

# Primitive types for application properties

637

PrimitiveTypes = Union[int, float, str, bool, bytes, None]

638

639

# Session filter type

640

NextAvailableSessionType = ServiceBusSessionFilter

641

642

# Lock renewal callback type

643

LockRenewFailureCallback = Callable[

644

[Union[ServiceBusSession, ServiceBusReceivedMessage], Optional[Exception]],

645

None

646

]

647

648

# Azure credential types (from azure-core)

649

TokenCredential = Any # azure.core.credentials.TokenCredential

650

AzureSasCredential = Any # azure.core.credentials.AzureSasCredential

651

AzureNamedKeyCredential = Any # azure.core.credentials.AzureNamedKeyCredential

652

```