or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication-security.md · channel-operations.md · connection-adapters.md · connection-management.md · exception-handling.md · index.md · message-properties-types.md

message-properties-types.md · docs/

0

# Message Properties & Types

1

2

AMQP message properties, delivery modes, exchange types, and type definitions for comprehensive message handling and routing control in RabbitMQ.

3

4

## Capabilities

5

6

### Message Properties

7

8

AMQP BasicProperties for message metadata and routing information.

9

10

```python { .api }

11

class BasicProperties:

12

"""AMQP message properties."""

13

14

def __init__(self, content_type=None, content_encoding=None, headers=None,

15

delivery_mode=None, priority=None, correlation_id=None,

16

reply_to=None, expiration=None, message_id=None,

17

timestamp=None, type=None, user_id=None, app_id=None,

18

cluster_id=None):

19

"""

20

Create message properties.

21

22

Parameters:

23

- content_type (str): MIME content type (e.g., 'application/json')

24

- content_encoding (str): Content encoding (e.g., 'utf-8')

25

- headers (dict): Application-specific headers

26

- delivery_mode (int): Delivery mode (1=transient, 2=persistent)

27

- priority (int): Message priority (0-9)

28

- correlation_id (str): Correlation identifier for RPC

29

- reply_to (str): Queue name for RPC replies

30

- expiration (str): Message expiration time in milliseconds

31

- message_id (str): Unique message identifier

32

- timestamp (int): Message timestamp (Unix time)

33

- type (str): Message type identifier

34

- user_id (str): User ID that published the message

35

- app_id (str): Application identifier

36

- cluster_id (str): Cluster identifier

37

"""

38

39

def decode(self, encoded_data):

40

"""

41

Decode properties from bytes.

42

43

Parameters:

44

- encoded_data (bytes): Encoded property data

45

"""

46

47

def encode(self):

48

"""

49

Encode properties to bytes.

50

51

Returns:

52

- bytes: Encoded property data

53

"""

54

55

@property

56

def content_type(self) -> str:

57

"""MIME content type."""

58

59

@property

60

def content_encoding(self) -> str:

61

"""Content encoding."""

62

63

@property

64

def headers(self) -> dict:

65

"""Application headers dictionary."""

66

67

@property

68

def delivery_mode(self) -> int:

69

"""Delivery mode (1=transient, 2=persistent)."""

70

71

@property

72

def priority(self) -> int:

73

"""Message priority (0-9)."""

74

75

@property

76

def correlation_id(self) -> str:

77

"""Correlation ID for request/reply patterns."""

78

79

@property

80

def reply_to(self) -> str:

81

"""Reply queue name."""

82

83

@property

84

def expiration(self) -> str:

85

"""Message expiration in milliseconds."""

86

87

@property

88

def message_id(self) -> str:

89

"""Unique message identifier."""

90

91

@property

92

def timestamp(self) -> int:

93

"""Message timestamp (Unix time)."""

94

95

@property

96

def type(self) -> str:

97

"""Message type identifier."""

98

99

@property

100

def user_id(self) -> str:

101

"""User ID of message publisher."""

102

103

@property

104

def app_id(self) -> str:

105

"""Application identifier."""

106

107

@property

108

def cluster_id(self) -> str:

109

"""Cluster identifier."""

110

```

111

112

### Delivery Mode

113

114

Message persistence modes for durability control.

115

116

```python { .api }

117

from enum import Enum

118

119

class DeliveryMode(Enum):

120

"""Message delivery mode enumeration."""

121

122

Transient = 1 # Non-persistent messages (default)

123

Persistent = 2 # Persistent messages (survive broker restart)

124

```

125

126

### Exchange Types

127

128

AMQP exchange types for message routing patterns.

129

130

```python { .api }

131

from enum import Enum

132

133

class ExchangeType(Enum):

134

"""Exchange type enumeration."""

135

136

direct = 'direct' # Direct routing by routing key

137

fanout = 'fanout' # Broadcast to all bound queues

138

headers = 'headers' # Route based on header attributes

139

topic = 'topic' # Pattern-based routing with wildcards

140

```

141

142

### Protocol Constants

143

144

AMQP protocol constants and frame definitions.

145

146

```python { .api }

147

# Protocol version

148

PROTOCOL_VERSION = (0, 9, 1)

149

PORT = 5672

150

151

# Frame types

152

FRAME_METHOD = 1

153

FRAME_HEADER = 2

154

FRAME_BODY = 3

155

FRAME_HEARTBEAT = 8

156

157

# Frame size limits

158

FRAME_MAX_SIZE = 131072

159

FRAME_MIN_SIZE = 4096

160

FRAME_HEADER_SIZE = 7

161

FRAME_END_SIZE = 1

162

FRAME_END = 206

163

164

# Delivery mode constants

165

PERSISTENT_DELIVERY_MODE = 2

166

167

# AMQP reply codes

168

REPLY_SUCCESS = 200

169

CONTENT_TOO_LARGE = 311

170

NO_ROUTE = 312

171

NO_CONSUMERS = 313

172

CONNECTION_FORCED = 320

173

INVALID_PATH = 402

174

ACCESS_REFUSED = 403

175

NOT_FOUND = 404

176

PRECONDITION_FAILED = 406

177

FRAME_ERROR = 501

178

COMMAND_INVALID = 503

179

CHANNEL_ERROR = 504

180

NOT_ALLOWED = 530

181

NOT_IMPLEMENTED = 540

182

INTERNAL_ERROR = 541

183

```

184

185

## Usage Examples

186

187

### Basic Message Properties

188

189

```python

190

import pika

191

192

properties = pika.BasicProperties(

193

content_type='application/json',

194

delivery_mode=2, # Persistent message

195

headers={'source': 'web-app', 'version': '1.0'}

196

)

197

198

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

199

channel = connection.channel()

200

201

channel.basic_publish(

202

exchange='',

203

routing_key='data_queue',

204

body='{"message": "Hello World"}',

205

properties=properties

206

)

207

208

connection.close()

209

```

210

211

### Using Delivery Mode Enum

212

213

```python

214

import pika

215

216

# Persistent message using enum

217

properties = pika.BasicProperties(

218

delivery_mode=pika.DeliveryMode.Persistent.value

219

)

220

221

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

222

channel = connection.channel()

223

224

channel.basic_publish(

225

exchange='',

226

routing_key='persistent_queue',

227

body='This message will survive broker restart',

228

properties=properties

229

)

230

231

connection.close()

232

```

233

234

### RPC Pattern with Properties

235

236

```python

237

import pika

238

import uuid

239

240

class RpcClient:

241

def __init__(self):

242

self.connection = pika.BlockingConnection(

243

pika.ConnectionParameters('localhost')

244

)

245

self.channel = self.connection.channel()

246

247

# Declare callback queue

248

result = self.channel.queue_declare(queue='', exclusive=True)

249

self.callback_queue = result.method.queue

250

251

self.channel.basic_consume(

252

queue=self.callback_queue,

253

on_message_callback=self.on_response,

254

auto_ack=True

255

)

256

257

def on_response(self, ch, method, props, body):

258

if self.corr_id == props.correlation_id:

259

self.response = body

260

261

def call(self, message):

262

self.response = None

263

self.corr_id = str(uuid.uuid4())

264

265

# Publish with reply properties

266

self.channel.basic_publish(

267

exchange='',

268

routing_key='rpc_queue',

269

properties=pika.BasicProperties(

270

reply_to=self.callback_queue,

271

correlation_id=self.corr_id,

272

content_type='text/plain'

273

),

274

body=message

275

)

276

277

# Wait for response

278

while self.response is None:

279

self.connection.process_data_events()

280

281

return self.response

282

283

# Usage

284

rpc = RpcClient()

285

response = rpc.call("Hello RPC")

286

print(f"Response: {response.decode()}")

287

```

288

289

### Message Expiration

290

291

```python

292

import pika

293

294

# Message expires after 30 seconds

295

properties = pika.BasicProperties(

296

expiration='30000', # 30 seconds in milliseconds

297

delivery_mode=2

298

)

299

300

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

301

channel = connection.channel()

302

303

channel.basic_publish(

304

exchange='',

305

routing_key='temp_queue',

306

body='This message expires in 30 seconds',

307

properties=properties

308

)

309

310

connection.close()

311

```

312

313

### Headers-Based Routing

314

315

```python

316

import pika

317

318

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

319

channel = connection.channel()

320

321

# Declare headers exchange

322

channel.exchange_declare(exchange='headers_exchange', exchange_type='headers')

323

324

# Publish with custom headers

325

properties = pika.BasicProperties(

326

headers={

327

'format': 'json',

328

'source': 'api',

329

'priority': 'high',

330

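'x-match': 'all' # Note: 'x-match' only affects routing when set as a queue-binding argument; in message headers it is just another header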

331

}

332

)

333

334

channel.basic_publish(

335

exchange='headers_exchange',

336

routing_key='', # Ignored for headers exchange

337

body='{"data": "headers routing example"}',

338

properties=properties

339

)

340

341

connection.close()

342

```

343

344

### Topic Exchange with Properties

345

346

```python

347

import pika

import time

348

349

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

350

channel = connection.channel()

351

352

# Declare topic exchange

353

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

354

355

# Publish messages with different routing keys and properties

356

messages = [

357

('app.error.database', 'Database connection failed', 'high'),

358

('app.warning.cache', 'Cache miss rate high', 'medium'),

359

('app.info.startup', 'Application started', 'low')

360

]

361

362

for routing_key, message, priority in messages:

363

properties = pika.BasicProperties(

364

priority={'high': 9, 'medium': 5, 'low': 1}[priority],

365

timestamp=int(time.time()),

366

type='log_message',

367

headers={'level': routing_key.split('.')[1]}

368

)

369

370

channel.basic_publish(

371

exchange='topic_logs',

372

routing_key=routing_key,

373

body=message,

374

properties=properties

375

)

376

377

connection.close()

378

```

379

380

### Message Metadata Inspection

381

382

```python

383

import pika

384

385

def callback(ch, method, properties, body):

386

print(f"Message: {body.decode()}")

387

print(f"Content Type: {properties.content_type}")

388

print(f"Delivery Mode: {properties.delivery_mode}")

389

print(f"Priority: {properties.priority}")

390

print(f"Headers: {properties.headers}")

391

print(f"Timestamp: {properties.timestamp}")

392

print(f"Message ID: {properties.message_id}")

393

394

ch.basic_ack(delivery_tag=method.delivery_tag)

395

396

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

397

channel = connection.channel()

398

399

channel.queue_declare(queue='inspect_queue')

400

channel.basic_consume(queue='inspect_queue', on_message_callback=callback)

401

402

print('Waiting for messages...')

403

channel.start_consuming()

404

```

405

406

### Custom Application Properties

407

408

```python

409

import pika

410

import json

411

import time

412

413

# Custom message with application-specific properties

414

properties = pika.BasicProperties(

415

content_type='application/json',

416

content_encoding='utf-8',

417

delivery_mode=2,

418

priority=5,

419

correlation_id='req-12345',

420

message_id=f'msg-{int(time.time())}',

421

timestamp=int(time.time()),

422

type='user_event',

423

user_id='user123',

424

app_id='web-service-v1.0',

425

headers={

426

'event_type': 'user_signup',

427

'version': '2.1',

428

'source_ip': '192.168.1.100',

429

'user_agent': 'Mozilla/5.0...'

430

}

431

)

432

433

message_data = {

434

'user_id': 'user123',

435

'event': 'signup',

436

'timestamp': time.time()

437

}

438

439

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

440

channel = connection.channel()

441

442

channel.basic_publish(

443

exchange='user_events',

444

routing_key='signup',

445

body=json.dumps(message_data),

446

properties=properties

447

)

448

449

connection.close()

450

```