or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

compression.mdconnection.mdentities.mdexceptions.mdindex.mdmessaging.mdmixins.mdpools.mdserialization.mdsimple.md

entities.mddocs/

0

# Message Entities

1

2

AMQP entity declarations for exchanges, queues, and bindings that define message routing topology and behavior. These entities provide the foundation for message routing, persistence, and delivery guarantees in messaging systems.

3

4

## Capabilities

5

6

### Exchange

7

8

Represents AMQP exchange declarations that route messages to queues based on routing rules and exchange types.

9

10

```python { .api }

11

class Exchange:

12

def __init__(self, name='', type='direct', channel=None, durable=True, auto_delete=False, delivery_mode=None, arguments=None, **kwargs):

13

"""

14

Create exchange declaration.

15

16

Parameters:

17

- name (str): Exchange name

18

- type (str): Exchange type ('direct', 'topic', 'fanout', 'headers')

19

- channel: AMQP channel to bind to

20

- durable (bool): Survive broker restart

21

- auto_delete (bool): Delete when no queues bound

22

- delivery_mode (int): Default delivery mode (1=transient, 2=persistent)

23

- arguments (dict): Additional exchange arguments

24

"""

25

26

def declare(self, nowait=False, passive=None, channel=None):

27

"""

28

Declare exchange on the broker.

29

30

Parameters:

31

- nowait (bool): Don't wait for confirmation

32

- passive (bool): Only check if exchange exists

33

- channel: Channel to use (uses bound channel if None)

34

35

Returns:

36

Exchange instance for chaining

37

"""

38

39

def bind_to(self, exchange, routing_key='', arguments=None, nowait=False, channel=None):

40

"""

41

Bind this exchange to another exchange.

42

43

Parameters:

44

- exchange (Exchange|str): Source exchange

45

- routing_key (str): Binding routing key

46

- arguments (dict): Binding arguments

47

- nowait (bool): Don't wait for confirmation

48

- channel: Channel to use

49

50

Returns:

51

Exchange instance for chaining

52

"""

53

54

def unbind_from(self, source, routing_key='', arguments=None, nowait=False, channel=None):

55

"""

56

Unbind exchange from source exchange.

57

58

Parameters:

59

- source (Exchange|str): Source exchange to unbind from

60

- routing_key (str): Binding routing key that was used

61

- arguments (dict): Binding arguments that were used

62

- nowait (bool): Don't wait for confirmation

63

- channel: Channel to use

64

65

Returns:

66

Exchange instance for chaining

67

"""

68

69

def publish(self, message, routing_key=None, mandatory=False, immediate=False, **kwargs):

70

"""

71

Publish message to this exchange.

72

73

Parameters:

74

- message: Message body (will be serialized)

75

- routing_key (str): Message routing key

76

- mandatory (bool): Return message if no route found

77

- immediate (bool): Return message if no consumer ready

78

- **kwargs: Additional publish arguments

79

80

Returns:

81

Message instance

82

"""

83

84

def Message(self, body, delivery_mode=None, properties=None, **kwargs):

85

"""

86

Create message bound to this exchange.

87

88

Parameters:

89

- body: Message body

90

- delivery_mode (int): Delivery mode (1=transient, 2=persistent)

91

- properties (dict): Message properties

92

- **kwargs: Additional message arguments

93

94

Returns:

95

Message instance

96

"""

97

98

def delete(self, if_unused=False, nowait=False, channel=None):

99

"""

100

Delete exchange from broker.

101

102

Parameters:

103

- if_unused (bool): Only delete if no queues bound

104

- nowait (bool): Don't wait for confirmation

105

- channel: Channel to use

106

"""

107

108

def binding(self, routing_key='', arguments=None, unbind_arguments=None):

109

"""

110

Create binding object for this exchange.

111

112

Parameters:

113

- routing_key (str): Binding routing key

114

- arguments (dict): Binding arguments

115

- unbind_arguments (dict): Arguments for unbinding

116

117

Returns:

118

binding instance

119

"""

120

121

# Properties

122

@property

123

def name(self):

124

"""str: Exchange name"""

125

126

@property

127

def type(self):

128

"""str: Exchange type"""

129

130

@property

131

def durable(self):

132

"""bool: Durability flag"""

133

134

@property

135

def auto_delete(self):

136

"""bool: Auto-delete flag"""

137

138

@property

139

def delivery_mode(self):

140

"""int: Default delivery mode"""

141

142

@property

143

def arguments(self):

144

"""dict: Additional arguments"""

145

```

146

147

### Queue

148

149

Represents AMQP queue declarations that store messages and define consumption parameters.

150

151

```python { .api }

152

class Queue:

153

def __init__(self, name='', exchange=None, routing_key='', channel=None, durable=True, exclusive=False, auto_delete=False, no_ack=None, alias=None, bindings=None, on_declared=None, expires=None, message_ttl=None, max_length=None, max_length_bytes=None, max_priority=None, queue_arguments=None, binding_arguments=None, consumer_arguments=None, **kwargs):

154

"""

155

Create queue declaration.

156

157

Parameters:

158

- name (str): Queue name

159

- exchange (Exchange): Exchange to bind to

160

- routing_key (str): Routing key for binding

161

- channel: AMQP channel to bind to

162

- durable (bool): Survive broker restart

163

- exclusive (bool): Only allow one connection

164

- auto_delete (bool): Delete when no consumers

165

- no_ack (bool): Disable acknowledgments

166

- alias (str): Alias name for queue

167

- bindings (list): Additional bindings

168

- on_declared (callable): Callback when declared

169

- expires (int): Queue expiry time in ms (RabbitMQ)

170

- message_ttl (int): Message TTL in ms (RabbitMQ)

171

- max_length (int): Max queue length (RabbitMQ)

172

- max_length_bytes (int): Max queue size in bytes (RabbitMQ)

173

- max_priority (int): Max message priority (RabbitMQ)

174

- queue_arguments (dict): Queue-specific arguments

175

- binding_arguments (dict): Binding-specific arguments

176

- consumer_arguments (dict): Consumer-specific arguments

177

"""

178

179

def declare(self, nowait=False, channel=None):

180

"""

181

Declare queue and bindings on broker.

182

183

Parameters:

184

- nowait (bool): Don't wait for confirmation

185

- channel: Channel to use

186

187

Returns:

188

Queue instance for chaining

189

"""

190

191

def bind_to(self, exchange=None, routing_key=None, arguments=None, nowait=False, channel=None):

192

"""

193

Bind queue to exchange.

194

195

Parameters:

196

- exchange (Exchange|str): Exchange to bind to

197

- routing_key (str): Routing key for binding

198

- arguments (dict): Binding arguments

199

- nowait (bool): Don't wait for confirmation

200

- channel: Channel to use

201

202

Returns:

203

Queue instance for chaining

204

"""

205

206

def unbind_from(self, exchange, routing_key=None, arguments=None, nowait=False, channel=None):

207

"""

208

Unbind queue from exchange.

209

210

Parameters:

211

- exchange (Exchange|str): Exchange to unbind from

212

- routing_key (str): Routing key that was used

213

- arguments (dict): Binding arguments that were used

214

- nowait (bool): Don't wait for confirmation

215

- channel: Channel to use

216

217

Returns:

218

Queue instance for chaining

219

"""

220

221

def get(self, no_ack=None, accept=None):

222

"""

223

Poll for single message from queue.

224

225

Parameters:

226

- no_ack (bool): Disable acknowledgment

227

- accept (list): Accepted content types

228

229

Returns:

230

Message instance or None if queue empty

231

"""

232

233

def purge(self, nowait=False):

234

"""

235

Remove all ready messages from queue.

236

237

Parameters:

238

- nowait (bool): Don't wait for confirmation

239

240

Returns:

241

Number of messages purged

242

"""

243

244

def consume(self, consumer_tag='', callback=None, no_ack=None, nowait=False):

245

"""

246

Start consuming messages from queue.

247

248

Parameters:

249

- consumer_tag (str): Consumer identifier

250

- callback (callable): Message callback function

251

- no_ack (bool): Disable acknowledgments

252

- nowait (bool): Don't wait for confirmation

253

254

Returns:

255

Consumer tag

256

"""

257

258

def cancel(self, consumer_tag, nowait=False):

259

"""

260

Cancel queue consumer.

261

262

Parameters:

263

- consumer_tag (str): Consumer to cancel

264

- nowait (bool): Don't wait for confirmation

265

"""

266

267

def delete(self, if_unused=False, if_empty=False, nowait=False):

268

"""

269

Delete queue from broker.

270

271

Parameters:

272

- if_unused (bool): Only delete if no consumers

273

- if_empty (bool): Only delete if no messages

274

- nowait (bool): Don't wait for confirmation

275

276

Returns:

277

Number of messages deleted

278

"""

279

280

# Properties

281

@property

282

def name(self):

283

"""str: Queue name"""

284

285

@property

286

def exchange(self):

287

"""Exchange: Associated exchange"""

288

289

@property

290

def routing_key(self):

291

"""str: Routing/binding key"""

292

293

@property

294

def durable(self):

295

"""bool: Durability flag"""

296

297

@property

298

def exclusive(self):

299

"""bool: Exclusivity flag"""

300

301

@property

302

def auto_delete(self):

303

"""bool: Auto-delete flag"""

304

305

@property

306

def expires(self):

307

"""int: Queue expiry time"""

308

309

@property

310

def message_ttl(self):

311

"""int: Message TTL"""

312

313

@property

314

def max_length(self):

315

"""int: Maximum queue length"""

316

317

@property

318

def max_priority(self):

319

"""int: Maximum message priority"""

320

```

321

322

### Binding

323

324

Represents queue or exchange binding declarations that define routing relationships.

325

326

```python { .api }

327

class binding:

328

def __init__(self, exchange=None, routing_key='', arguments=None, unbind_arguments=None):

329

"""

330

Create binding declaration.

331

332

Parameters:

333

- exchange (Exchange): Exchange to bind to

334

- routing_key (str): Routing key for binding

335

- arguments (dict): Binding arguments

336

- unbind_arguments (dict): Arguments for unbinding

337

"""

338

339

def declare(self, channel, nowait=False):

340

"""

341

Declare the destination exchange.

342

343

Parameters:

344

- channel: AMQP channel to use

345

- nowait (bool): Don't wait for confirmation

346

347

Returns:

348

binding instance for chaining

349

"""

350

351

def bind(self, entity, nowait=False, channel=None):

352

"""

353

Bind entity (queue/exchange) to this binding.

354

355

Parameters:

356

- entity (Queue|Exchange): Entity to bind

357

- nowait (bool): Don't wait for confirmation

358

- channel: Channel to use

359

360

Returns:

361

binding instance for chaining

362

"""

363

364

def unbind(self, entity, nowait=False, channel=None):

365

"""

366

Unbind entity from this binding.

367

368

Parameters:

369

- entity (Queue|Exchange): Entity to unbind

370

- nowait (bool): Don't wait for confirmation

371

- channel: Channel to use

372

373

Returns:

374

binding instance for chaining

375

"""

376

377

# Properties

378

@property

379

def exchange(self):

380

"""Exchange: Target exchange"""

381

382

@property

383

def routing_key(self):

384

"""str: Binding routing key"""

385

386

@property

387

def arguments(self):

388

"""dict: Binding arguments"""

389

```

390

391

## Usage Examples

392

393

### Exchange Types and Routing

394

395

```python

396

from kombu import Exchange, Queue

397

398

# Direct exchange - exact routing key match

399

direct_exchange = Exchange('logs', type='direct', durable=True)

400

401

# Topic exchange - pattern matching with wildcards

402

topic_exchange = Exchange('events', type='topic', durable=True)

403

404

# Fanout exchange - broadcast to all bound queues

405

fanout_exchange = Exchange('notifications', type='fanout', durable=True)

406

407

# Headers exchange - route by message headers

408

headers_exchange = Exchange('priority', type='headers', durable=True)

409

```

410

411

### Queue Declaration and Binding

412

413

```python

414

from kombu import Connection, Exchange, Queue

415

416

# Define entities

417

task_exchange = Exchange('tasks', type='direct', durable=True)

418

task_queue = Queue(

419

'high_priority_tasks',

420

exchange=task_exchange,

421

routing_key='high',

422

durable=True,

423

message_ttl=300000, # 5 minutes

424

max_length=1000

425

)

426

427

with Connection('redis://localhost:6379/0') as conn:

428

channel = conn.channel()

429

430

# Declare exchange and queue

431

task_exchange.declare(channel=channel)

432

task_queue.declare(channel=channel)

433

```

434

435

### Multiple Bindings

436

437

```python

438

from kombu import Exchange, Queue, binding

439

440

# Create exchange and queue

441

log_exchange = Exchange('logs', type='topic', durable=True)

442

error_queue = Queue('error_logs', durable=True)

443

444

# Create multiple bindings for the queue

445

error_bindings = [

446

binding(log_exchange, 'app.*.error'),

447

binding(log_exchange, 'system.critical'),

448

binding(log_exchange, 'database.failure')

449

]

450

451

# Apply bindings to queue

452

error_queue.bindings = error_bindings

453

454

with Connection('amqp://localhost') as conn:

455

# Declare everything

456

error_queue.declare(channel=conn.channel())

457

```

458

459

### Queue Operations

460

461

```python

462

from kombu import Connection, Queue, Exchange

463

464

task_exchange = Exchange('tasks', type='direct')

465

task_queue = Queue('task_queue', task_exchange, routing_key='task')

466

467

with Connection('redis://localhost:6379/0') as conn:

468

channel = conn.channel()

469

task_queue = task_queue.bind(channel)

470

471

# Get single message

472

message = task_queue.get()

473

if message:

474

print(f"Received: {message.payload}")

475

message.ack()

476

477

# Purge queue

478

purged_count = task_queue.purge()

479

print(f"Purged {purged_count} messages")

480

481

# Get queue info (if supported by transport)

482

try:

483

# Some transports support queue inspection

484

info = channel.queue_declare(task_queue.name, passive=True)

485

print(f"Queue has {info.message_count} messages")

486

except Exception:

487

pass

488

```

489

490

### Exchange-to-Exchange Binding

491

492

```python

493

from kombu import Exchange

494

495

# Create exchanges

496

source_exchange = Exchange('source', type='topic', durable=True)

497

destination_exchange = Exchange('destination', type='direct', durable=True)

498

499

with Connection('amqp://localhost') as conn:

500

channel = conn.channel()

501

502

# Declare both exchanges

503

source_exchange.declare(channel=channel)

504

destination_exchange.declare(channel=channel)

505

506

# Bind destination to source

507

destination_exchange.bind_to(

508

source_exchange,

509

routing_key='important.*',

510

channel=channel

511

)

512

```

513

514

### Dynamic Queue Management

515

516

```python

517

from kombu import Connection, Exchange, Queue

518

519

def create_user_queue(user_id):

520

"""Create dedicated queue for user"""

521

user_exchange = Exchange(f'user_{user_id}', type='direct', durable=True)

522

user_queue = Queue(

523

f'user_{user_id}_messages',

524

exchange=user_exchange,

525

routing_key='message',

526

durable=True,

527

auto_delete=True, # Clean up when user disconnects

528

expires=3600000 # Expire after 1 hour of inactivity

529

)

530

return user_exchange, user_queue

531

532

# Usage

533

with Connection('redis://localhost:6379/0') as conn:

534

exchange, queue = create_user_queue('12345')

535

536

# Declare entities

537

exchange.declare(channel=conn.channel())

538

queue.declare(channel=conn.channel())

539

540

# Later, when user is done

541

queue.delete(if_unused=True)

542

exchange.delete(if_unused=True)

543

```