# Message Management

Core message classes for creating, manipulating, and processing AMQP messages, including support for different body types, properties, headers, annotations, and batch operations.

## Capabilities

### Message Class

The core AMQP message representation that encapsulates all message data including body, properties, headers, and annotations.

```python { .api }
class Message:
    def __init__(self, body=None, properties=None, application_properties=None,
                 annotations=None, header=None, msg_format=None, encoding='UTF-8',
                 body_type=None, footer=None, delivery_annotations=None):
        """
        Create an AMQP message.

        Parameters:
        - body: Message body (str, bytes, list, or dict)
        - properties (MessageProperties): Standard AMQP message properties
        - application_properties (dict): Custom application-specific properties
        - annotations (dict): Message annotations (broker-specific metadata)
        - header (MessageHeader): Message header with delivery information
        - msg_format (int): Message format identifier
        - encoding (str): Text encoding for string data
        - body_type (MessageBodyType): Type of message body (Data, Value, Sequence)
        - footer (dict): Message footer annotations
        - delivery_annotations (dict): Delivery-specific annotations
        """
```

**Key Properties:**

```python { .api }
# Message content access
@property
def data: bytes  # Binary message data
@property
def value: any  # Decoded message value
@property
def sequence: list  # Sequence message data

# Message metadata
@property
def properties: MessageProperties  # Standard message properties
@property
def application_properties: dict  # Custom properties
@property
def header: MessageHeader  # Message header
@property
def annotations: dict  # Message annotations
@property
def delivery_annotations: dict  # Delivery annotations
@property
def footer: dict  # Message footer

# Message state
@property
def settled: bool  # Whether message is settled
@property
def state: MessageState  # Current message state (enum)
@property
def idle_time: int  # Time message has been idle
@property
def retries: int  # Number of retry attempts
@property
def delivery_no: int  # Delivery sequence number
@property
def delivery_tag: bytes  # Delivery tag identifier
@property
def message_annotations: dict  # Alias for annotations property
```

**Key Methods:**

```python { .api }
def get_data(self):
    """Get the message body as bytes."""

def get_message(self):
    """Get the underlying C message object."""

def accept(self):
    """
    Accept the message (acknowledge successful processing).

    Returns:
    bool: True if settlement succeeded
    """

def reject(self, condition=None, description=None, info=None):
    """
    Reject the message (indicate processing failure).

    Parameters:
    - condition (str): Error condition code
    - description (str): Error description
    - info (dict): Additional error information

    Returns:
    bool: True if settlement succeeded
    """

def release(self):
    """
    Release the message (return to queue for redelivery).

    Returns:
    bool: True if settlement succeeded
    """

def modify(self, failed, deliverable, annotations=None):
    """
    Modify message state and annotations.

    Parameters:
    - failed (bool): Whether message processing failed
    - deliverable (bool): Whether message is deliverable
    - annotations (dict): Additional annotations to add
    """

def gather(self):
    """Gather multi-part message data."""

def get_message_encoded_size(self):
    """
    Get the encoded size of the message.

    Returns:
    int: Message size in bytes
    """

def encode_message(self):
    """
    Encode message to AMQP wire format.

    Returns:
    bytearray: Encoded message data
    """

@classmethod
def decode_from_bytes(cls, data, encoding='UTF-8'):
    """
    Decode message from raw AMQP bytes.

    Parameters:
    - data (bytes): Raw AMQP message data
    - encoding (str): Text encoding for string data

    Returns:
    Message: Decoded message instance
    """
```

**Usage Examples:**

```python
from uamqp import Message
from uamqp.message import MessageProperties, MessageHeader

# Simple text message
message = Message("Hello World")

# Message with properties
properties = MessageProperties(
    message_id="msg-123",
    content_type="text/plain",
    reply_to="response-queue"
)
message = Message("Hello World", properties=properties)

# Message with custom application properties
app_props = {"priority": "high", "source": "sensor-1"}
message = Message("sensor data", application_properties=app_props)

# Binary message
binary_data = b'\x00\x01\x02\x03'
message = Message(binary_data)

# Dict body (encoded as a structured AMQP value rather than as JSON text)
json_data = {"temperature": 23.5, "humidity": 65}
message = Message(json_data)
```

### BatchMessage Class

Optimized message class for high-throughput scenarios that can contain multiple individual messages in a single batch.

```python { .api }
class BatchMessage:
    def __init__(self, data=None, properties=None, application_properties=None,
                 annotations=None, header=None, multi_messages=False, encoding='UTF-8'):
        """
        Create a batch message for high-throughput scenarios.

        Parameters:
        - data: Batch data (list of messages or encoded batch data)
        - properties (MessageProperties): Batch-level message properties
        - application_properties (dict): Custom batch properties
        - annotations (dict): Batch annotations
        - header (MessageHeader): Batch header
        - multi_messages (bool): Whether data contains multiple distinct messages
        - encoding (str): Text encoding
        """
```

**Key Properties:**

```python { .api }
@property
def batch_format: int  # Batch message format identifier
@property
def max_message_length: int  # Maximum individual message length
@property
def data: bytes  # Batch data
```

**Key Methods:**

```python { .api }
def gather(self):
    """Gather and prepare batch data for transmission."""
```

**Usage Example:**

```python
from uamqp import BatchMessage, Message, SendClient

# Create individual messages
messages = [
    Message("Message 1"),
    Message("Message 2"),
    Message("Message 3")
]

# Create batch message
batch = BatchMessage(messages, multi_messages=True)

# Send batch message (`target` is the AMQP target address, defined elsewhere)
with SendClient(target) as client:
    client.queue_message(batch)
    client.send_all_messages()
```
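When there are more payloads than fit comfortably in one batch, a sender typically groups them into several batches first. The sketch below shows one way to do that grouping in plain Python. `chunk_by_size`, `size_of`, and `max_bytes` are hypothetical names, not part of uamqp. Using `len` as the size function ignores AMQP encoding overhead, so treat the budget as approximate.

```python
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunk_by_size(items: Iterable[T], size_of: Callable[[T], int],
                  max_bytes: int) -> Iterator[List[T]]:
    """Yield lists of items whose combined size stays within max_bytes."""
    batch: List[T] = []
    used = 0
    for item in items:
        size = size_of(item)
        if size > max_bytes:
            raise ValueError(f"item of {size} bytes exceeds the {max_bytes}-byte budget")
        # Start a new chunk when adding this item would overflow the current one.
        if batch and used + size > max_bytes:
            yield batch
            batch, used = [], 0
        batch.append(item)
        used += size
    if batch:
        yield batch


payloads = [b"aaaa", b"bbbb", b"cccc"]
chunks = list(chunk_by_size(payloads, len, max_bytes=8))
print(chunks)  # [[b'aaaa', b'bbbb'], [b'cccc']]
```

Each resulting chunk could then be passed to its own `BatchMessage`. A size function based on the encoded message size would give a tighter bound than raw payload length.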

### MessageProperties Class

Container for standard AMQP message properties following the AMQP 1.0 specification.

```python { .api }
class MessageProperties:
    def __init__(self, message_id=None, user_id=None, to=None, subject=None,
                 reply_to=None, correlation_id=None, content_type=None,
                 content_encoding=None, absolute_expiry_time=None,
                 creation_time=None, group_id=None, group_sequence=None,
                 reply_to_group_id=None):
        """
        Standard AMQP message properties.

        Parameters:
        - message_id: Unique message identifier
        - user_id: Identity of user responsible for producing message
        - to: Address of node the message is being sent to
        - subject: Application-specific subject
        - reply_to: Address of node to send replies to
        - correlation_id: Application correlation identifier
        - content_type: MIME content type
        - content_encoding: MIME content encoding
        - absolute_expiry_time: Absolute expiry time
        - creation_time: Message creation timestamp
        - group_id: Group identifier
        - group_sequence: Position within group
        - reply_to_group_id: Group identifier for replies
        """

    def get_properties_obj(self):
        """
        Get underlying C properties reference.

        Returns:
        uamqp.c_uamqp.cProperties: C properties object
        """
```

**Usage Example:**

```python
import datetime

from uamqp import Message
from uamqp.message import MessageProperties

properties = MessageProperties(
    message_id="order-12345",
    content_type="application/json",
    reply_to="order-responses",
    correlation_id="req-98765",
    creation_time=datetime.datetime.utcnow(),
    subject="order-created"
)

order_data = '{"order_id": 12345}'  # placeholder payload
message = Message(order_data, properties=properties)
```
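The `reply_to` and `correlation_id` fields are commonly used together for request/reply messaging: the responder sends its answer to the request's `reply_to` address and copies the request's `message_id` into the reply's `correlation_id`. The sketch below illustrates that convention with a plain dataclass. `Props` and `reply_properties` are hypothetical stand-ins written for this example, not uamqp classes, and the convention itself is an assumption about how an application might choose to correlate messages.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Props:
    """Stand-in for the property fields used here (not the uamqp class)."""
    message_id: Optional[str] = None
    to: Optional[str] = None
    reply_to: Optional[str] = None
    correlation_id: Optional[str] = None


def reply_properties(request: Props, reply_id: str) -> Props:
    """Build properties for a reply to `request`."""
    if request.reply_to is None:
        raise ValueError("request has no reply_to address")
    return Props(
        message_id=reply_id,
        to=request.reply_to,                # send the reply where the requester asked
        correlation_id=request.message_id,  # lets the requester match reply to request
    )


request = Props(message_id="order-12345", reply_to="order-responses")
reply = reply_properties(request, reply_id="reply-1")
print(reply.to, reply.correlation_id)  # order-responses order-12345
```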

### MessageHeader Class

Container for AMQP message header information including delivery details and quality of service settings.

```python { .api }
class MessageHeader:
    def __init__(self, durable=None, priority=None, time_to_live=None,
                 first_acquirer=None, delivery_count=None):
        """
        AMQP message header information.

        Parameters:
        - durable (bool): Whether message should survive broker restart
        - priority (int): Message priority (0-255)
        - time_to_live (int): Message TTL in milliseconds
        - first_acquirer (bool): Whether this is first attempt to acquire
        - delivery_count (int): Number of delivery attempts
        """
```

**Key Properties:**

```python { .api }
@property
def delivery_count: int  # Number of delivery attempts
@property
def time_to_live: int  # TTL in milliseconds
@property
def ttl: int  # Alias for time_to_live
@property
def durable: bool  # Message durability
@property
def first_acquirer: bool  # First acquisition flag
@property
def priority: int  # Message priority (0-255)

def get_header_obj(self):
    """
    Get underlying C header reference.

    Returns:
    uamqp.c_uamqp.cHeader: C header object
    """
```

**Usage Example:**

```python
from uamqp import Message
from uamqp.message import MessageHeader

# High priority, durable message with 1 hour TTL
header = MessageHeader(
    durable=True,
    priority=255,
    time_to_live=3600000  # 1 hour in milliseconds
)

message = Message("Important data", header=header)
```
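Since `time_to_live` is expressed in milliseconds, hand-written literals such as `3600000` are easy to get wrong by a factor of 1000. A small conversion helper, sketched below, keeps the unit explicit. `ttl_milliseconds` is a hypothetical name introduced for this example and is not part of uamqp.

```python
from datetime import timedelta


def ttl_milliseconds(ttl: timedelta) -> int:
    """Convert a timedelta to whole milliseconds for a time_to_live value."""
    if ttl < timedelta(0):
        raise ValueError("TTL must not be negative")
    # Integer division by a 1 ms timedelta avoids floating-point rounding.
    return ttl // timedelta(milliseconds=1)


one_hour = ttl_milliseconds(timedelta(hours=1))
print(one_hour)  # 3600000
```

The result can be passed directly as the `time_to_live` argument shown above.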

### Message Body Types

Different message body formats supported by AMQP 1.0.

```python { .api }
class MessageBody:
    """Base class for message bodies."""
    @property
    def type: int  # Body type identifier
    @property
    def data: any  # Body data

class DataBody(MessageBody):
    """Binary data message body."""
    def append(self, data):
        """Append binary data to body."""

    def __len__(self):
        """Get number of data segments."""

    def __getitem__(self, index):
        """Get data segment by index."""

    def __str__(self):
        """String representation of data body."""

    def __bytes__(self):
        """Bytes representation of data body."""

class ValueBody(MessageBody):
    """Single encoded value message body."""
    def set(self, value):
        """Set the body value."""

    def __str__(self):
        """String representation of value body."""

    def __bytes__(self):
        """Bytes representation of value body."""

class SequenceBody(MessageBody):
    """Sequence of structured data message body."""
    def append(self, data):
        """Append data to sequence."""

    def __len__(self):
        """Get number of sequence items."""

    def __getitem__(self, index):
        """Get sequence item by index."""

    def __str__(self):
        """String representation of sequence body."""

    def __bytes__(self):
        """Bytes representation of sequence body."""
```

**Usage Example:**

```python
from uamqp.message import DataBody, ValueBody, SequenceBody

# Binary data body
data_body = DataBody()
data_body.append(b"binary data chunk 1")
data_body.append(b"binary data chunk 2")

# Single value body
value_body = ValueBody()
value_body.set({"key": "value", "number": 42})

# Sequence body
sequence_body = SequenceBody()
sequence_body.append("item 1")
sequence_body.append("item 2")
sequence_body.append({"structured": "data"})
```

### MessageBodyType Enum

Message body type constants that define how message data is structured:

```python { .api }
class MessageBodyType:
    """Message body type identifiers."""
    Data = 0x75  # Binary data body
    Value = 0x77  # Single encoded value body
    Sequence = 0x76  # Sequence of structured data body
```
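To make the three categories concrete, the sketch below maps a Python value to one of these identifiers using a simple rule: text and bytes go to Data, lists to Sequence, and everything else to Value. This is an illustrative heuristic only. `BodyKind` and `pick_body_kind` are hypothetical names, and the library's own default selection when `body_type` is omitted may differ from this rule.

```python
from enum import IntEnum


class BodyKind(IntEnum):
    """Local mirror of the identifiers listed above (not the uamqp enum)."""
    DATA = 0x75
    VALUE = 0x77
    SEQUENCE = 0x76


def pick_body_kind(body) -> BodyKind:
    """Choose a body kind for a Python value using a simple type-based rule."""
    if isinstance(body, (str, bytes, bytearray)):
        return BodyKind.DATA      # opaque text or binary payload
    if isinstance(body, list):
        return BodyKind.SEQUENCE  # ordered collection of items
    return BodyKind.VALUE         # any other single structured value


print(pick_body_kind(b"\x00\x01").name)          # DATA
print(pick_body_kind(["item 1", "item 2"]).name)  # SEQUENCE
print(pick_body_kind({"key": "value"}).name)      # VALUE
```

Passing `body_type` explicitly to `Message` avoids relying on any automatic choice.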

## Message Settlement

AMQP message settlement patterns for reliable message processing:

```python
# Accept message (successful processing)
message.accept()

# Reject message (processing failed, don't retry)
message.reject(
    condition="processing-error",
    description="Invalid data format"
)

# Release message (temporary failure, allow retry)
message.release()

# Modify message (change delivery state/annotations)
message.modify(
    failed=True,
    deliverable=False,
    annotations={"retry_count": 3}
)
```
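In a receiver, the choice between these outcomes usually follows from how processing ended. The sketch below wires that decision into a single function: accept on success, release on a retryable failure, reject otherwise. `process_and_settle`, `TransientError`, and `FakeMessage` are hypothetical names written for this example. `FakeMessage` only imitates the `accept`, `reject`, and `release` methods described earlier so the logic can run without a broker.

```python
from typing import Any, Callable


class TransientError(Exception):
    """Hypothetical marker for failures that are worth retrying."""


def process_and_settle(message: Any, handler: Callable[[Any], None]) -> str:
    """Run handler(message) and settle the message based on the outcome."""
    try:
        handler(message)
    except TransientError:
        message.release()  # temporary failure: allow redelivery
        return "released"
    except Exception as exc:
        message.reject(condition="processing-error", description=str(exc))
        return "rejected"
    message.accept()  # handler finished without raising
    return "accepted"


class FakeMessage:
    """Minimal stand-in exposing the settlement methods, for demonstration only."""

    def __init__(self) -> None:
        self.calls = []

    def accept(self) -> bool:
        self.calls.append("accept")
        return True

    def reject(self, condition=None, description=None, info=None) -> bool:
        self.calls.append(("reject", condition, description))
        return True

    def release(self) -> bool:
        self.calls.append("release")
        return True


def bad_handler(_message):
    raise ValueError("Invalid data format")


msg = FakeMessage()
print(process_and_settle(msg, bad_handler))  # rejected
```

Keeping the settlement decision in one place makes it harder to leave a message unsettled on an unexpected code path.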