or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication-security.mdchannel-operations.mdconnection-adapters.mdconnection-management.mdexception-handling.mdindex.mdmessage-properties-types.md

channel-operations.mddocs/

0

# Channel Operations

1

2

Channel-based message operations including publishing, consuming, queue and exchange management, and transaction support with comprehensive callback handling for AMQP messaging.

3

4

## Capabilities

5

6

### Channel Management

7

8

Basic channel lifecycle and flow control operations.

9

10

```python { .api }

11

class BlockingChannel:

12

"""Synchronous channel for message operations."""

13

14

def close(self, reply_code=200, reply_text='Normal shutdown'):

15

"""

16

Close the channel.

17

18

Parameters:

19

- reply_code (int): AMQP reply code (default: 200)

20

- reply_text (str): Human-readable close reason

21

"""

22

23

def flow(self, active):

24

"""

25

Enable or disable message flow.

26

27

Parameters:

28

- active (bool): True to enable flow, False to disable

29

30

Returns:

31

- bool: Current flow state

32

"""

33

34

def add_on_cancel_callback(self, callback):

35

"""

36

Add callback for consumer cancellation.

37

38

Parameters:

39

- callback (callable): Function called when consumer is cancelled

40

"""

41

42

def add_on_return_callback(self, callback):

43

"""

44

Add callback for returned messages.

45

46

Parameters:

47

- callback (callable): Function called with (channel, method, properties, body)

48

"""

49

50

# Channel properties

51

@property

52

def channel_number(self) -> int:

53

"""Channel number."""

54

55

@property

56

def connection(self):

57

"""Parent connection instance."""

58

59

@property

60

def is_closed(self) -> bool:

61

"""True if channel is closed."""

62

63

@property

64

def is_open(self) -> bool:

65

"""True if channel is open."""

66

67

@property

68

def consumer_tags(self) -> set:

69

"""Set of active consumer tags."""

70

```

71

72

### Message Publishing

73

74

Publish messages to exchanges with routing keys and properties.

75

76

```python { .api }

77

def basic_publish(self, exchange, routing_key, body, properties=None, mandatory=False):

78

"""

79

Publish a message.

80

81

Parameters:

82

- exchange (str): Exchange name (empty string for default exchange)

83

- routing_key (str): Routing key for message routing

84

- body (bytes or str): Message body

85

- properties (BasicProperties): Message properties

86

- mandatory (bool): If True, message must be routable or returned

87

88

Returns:

89

- bool: True if message was published (or raises exception)

90

"""

91

```

92

93

### Message Consuming

94

95

Consume messages from queues with callback-based processing.

96

97

```python { .api }

98

def basic_consume(self, queue, on_message_callback, auto_ack=False, exclusive=False,

99

consumer_tag=None, arguments=None):

100

"""

101

Start consuming messages from queue.

102

103

Parameters:

104

- queue (str): Queue name to consume from

105

- on_message_callback (callable): Function called for each message (ch, method, properties, body)

106

- auto_ack (bool): If True, automatically acknowledge messages

107

- exclusive (bool): If True, only this consumer can access the queue

108

- consumer_tag (str): Consumer identifier (auto-generated if None)

109

- arguments (dict): Additional arguments for consume

110

111

Returns:

112

- str: Consumer tag

113

"""

114

115

def basic_cancel(self, consumer_tag):

116

"""

117

Cancel a message consumer.

118

119

Parameters:

120

- consumer_tag (str): Consumer tag to cancel

121

122

Returns:

123

- str: Cancelled consumer tag

124

"""

125

126

def start_consuming(self):

127

"""Start consuming messages (blocking loop)."""

128

129

def stop_consuming(self, consumer_tag=None):

130

"""

131

Stop consuming messages.

132

133

Parameters:

134

- consumer_tag (str, optional): Specific consumer to stop (all if None)

135

"""

136

137

def consume(self, queue, no_ack=False, exclusive=False, arguments=None):

138

"""

139

Generator-based message consumption.

140

141

Parameters:

142

- queue (str): Queue name to consume from

143

- no_ack (bool): If True, don't require acknowledgments

144

- exclusive (bool): If True, exclusive access to queue

145

- arguments (dict): Additional consume arguments

146

147

Yields:

148

- tuple: (method, properties, body) for each message

149

"""

150

```

151

152

### Message Acknowledgment

153

154

Acknowledge, reject, or recover messages for reliable delivery.

155

156

```python { .api }

157

def basic_ack(self, delivery_tag, multiple=False):

158

"""

159

Acknowledge message delivery.

160

161

Parameters:

162

- delivery_tag (int): Delivery tag of message to acknowledge

163

- multiple (bool): If True, acknowledge all messages up to delivery_tag

164

"""

165

166

def basic_nack(self, delivery_tag, multiple=False, requeue=True):

167

"""

168

Negative acknowledgment of message delivery.

169

170

Parameters:

171

- delivery_tag (int): Delivery tag of message to nack

172

- multiple (bool): If True, nack all messages up to delivery_tag

173

- requeue (bool): If True, requeue the message(s)

174

"""

175

176

def basic_reject(self, delivery_tag, requeue=True):

177

"""

178

Reject a single message.

179

180

Parameters:

181

- delivery_tag (int): Delivery tag of message to reject

182

- requeue (bool): If True, requeue the message

183

"""

184

185

def basic_recover(self, requeue=True):

186

"""

187

Recover unacknowledged messages.

188

189

Parameters:

190

- requeue (bool): If True, requeue unacknowledged messages

191

"""

192

```

193

194

### Single Message Retrieval

195

196

Get individual messages from queues without setting up consumers.

197

198

```python { .api }

199

def basic_get(self, queue, auto_ack=False):

200

"""

201

Get a single message from queue.

202

203

Parameters:

204

- queue (str): Queue name to get message from

205

- auto_ack (bool): If True, automatically acknowledge message

206

207

Returns:

208

- tuple or None: (method, properties, body) if message available, None otherwise

209

"""

210

```

211

212

### Quality of Service

213

214

Control message delivery rate and prefetch behavior.

215

216

```python { .api }

217

def basic_qos(self, prefetch_size=0, prefetch_count=0, global_qos=False):

218

"""

219

Set quality of service parameters.

220

221

Parameters:

222

- prefetch_size (int): Prefetch window size in bytes (0 = no limit)

223

- prefetch_count (int): Number of messages to prefetch (0 = no limit)

224

- global_qos (bool): If True, apply QoS globally on connection

225

"""

226

```

227

228

### Queue Operations

229

230

Declare, delete, purge, and bind queues.

231

232

```python { .api }

233

def queue_declare(self, queue='', passive=False, durable=False, exclusive=False,

234

auto_delete=False, arguments=None):

235

"""

236

Declare a queue.

237

238

Parameters:

239

- queue (str): Queue name (empty string for server-generated name)

240

- passive (bool): If True, only check if queue exists

241

- durable (bool): If True, queue survives broker restart

242

- exclusive (bool): If True, queue is exclusive to this connection

243

- auto_delete (bool): If True, queue deletes when last consumer disconnects

244

- arguments (dict): Additional queue arguments

245

246

Returns:

247

- QueueDeclareOk: Result with queue name, message count, consumer count

248

"""

249

250

def queue_delete(self, queue, if_unused=False, if_empty=False):

251

"""

252

Delete a queue.

253

254

Parameters:

255

- queue (str): Queue name to delete

256

- if_unused (bool): If True, only delete if no consumers

257

- if_empty (bool): If True, only delete if no messages

258

259

Returns:

260

- QueueDeleteOk: Result with message count

261

"""

262

263

def queue_purge(self, queue):

264

"""

265

Purge messages from queue.

266

267

Parameters:

268

- queue (str): Queue name to purge

269

270

Returns:

271

- QueuePurgeOk: Result with purged message count

272

"""

273

274

def queue_bind(self, queue, exchange, routing_key=None, arguments=None):

275

"""

276

Bind queue to exchange.

277

278

Parameters:

279

- queue (str): Queue name to bind

280

- exchange (str): Exchange name to bind to

281

- routing_key (str): Routing key for binding

282

- arguments (dict): Additional binding arguments

283

"""

284

285

def queue_unbind(self, queue, exchange, routing_key=None, arguments=None):

286

"""

287

Unbind queue from exchange.

288

289

Parameters:

290

- queue (str): Queue name to unbind

291

- exchange (str): Exchange name to unbind from

292

- routing_key (str): Routing key for binding

293

- arguments (dict): Additional binding arguments

294

"""

295

```

296

297

### Exchange Operations

298

299

Declare, delete, and bind exchanges.

300

301

```python { .api }

302

def exchange_declare(self, exchange, exchange_type='direct', passive=False,

303

durable=False, auto_delete=False, internal=False, arguments=None):

304

"""

305

Declare an exchange.

306

307

Parameters:

308

- exchange (str): Exchange name

309

- exchange_type (str): Exchange type ('direct', 'fanout', 'topic', 'headers')

310

- passive (bool): If True, only check if exchange exists

311

- durable (bool): If True, exchange survives broker restart

312

- auto_delete (bool): If True, exchange deletes when last queue unbinds

313

- internal (bool): If True, exchange is internal (cannot be published to directly)

314

- arguments (dict): Additional exchange arguments

315

"""

316

317

def exchange_delete(self, exchange, if_unused=False):

318

"""

319

Delete an exchange.

320

321

Parameters:

322

- exchange (str): Exchange name to delete

323

- if_unused (bool): If True, only delete if no queue bindings

324

"""

325

326

def exchange_bind(self, destination, source, routing_key='', arguments=None):

327

"""

328

Bind exchange to another exchange.

329

330

Parameters:

331

- destination (str): Destination exchange name

332

- source (str): Source exchange name

333

- routing_key (str): Routing key for binding

334

- arguments (dict): Additional binding arguments

335

"""

336

337

def exchange_unbind(self, destination, source, routing_key='', arguments=None):

338

"""

339

Unbind exchange from another exchange.

340

341

Parameters:

342

- destination (str): Destination exchange name

343

- source (str): Source exchange name

344

- routing_key (str): Routing key for binding

345

- arguments (dict): Additional binding arguments

346

"""

347

```

348

349

### Transaction Support

350

351

AMQP transaction support for atomic message operations.

352

353

```python { .api }

354

def tx_select(self):

355

"""Start a transaction."""

356

357

def tx_commit(self):

358

"""Commit the current transaction."""

359

360

def tx_rollback(self):

361

"""Rollback the current transaction."""

362

```

363

364

### Publisher Confirms

365

366

Enable publisher confirmations for reliable message delivery.

367

368

```python { .api }

369

def confirm_delivery(self):

370

"""

371

Enable publisher confirmations.

372

373

Returns:

374

- bool: True if confirmations enabled

375

"""

376

377

@property

378

def publisher_confirms(self) -> bool:

379

"""True if publisher confirmations are enabled."""

380

```

381

382

## Usage Examples

383

384

### Basic Publishing

385

386

```python

387

import pika

388

389

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

390

channel = connection.channel()

391

392

# Declare a queue

393

channel.queue_declare(queue='task_queue', durable=True)

394

395

# Publish a message

396

message = "Hello World!"

397

channel.basic_publish(

398

exchange='',

399

routing_key='task_queue',

400

body=message,

401

properties=pika.BasicProperties(delivery_mode=2) # Make message persistent

402

)

403

404

connection.close()

405

```

406

407

### Basic Consuming

408

409

```python

410

import pika

411

412

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

413

channel = connection.channel()

414

415

channel.queue_declare(queue='task_queue', durable=True)

416

417

def callback(ch, method, properties, body):

418

print(f"Received {body.decode()}")

419

# Simulate work

420

import time

421

time.sleep(1)

422

423

# Acknowledge the message

424

ch.basic_ack(delivery_tag=method.delivery_tag)

425

426

# Set up consumer

427

channel.basic_qos(prefetch_count=1) # Fair dispatch

428

channel.basic_consume(queue='task_queue', on_message_callback=callback)

429

430

print('Waiting for messages. Press CTRL+C to exit')

431

channel.start_consuming()

432

```

433

434

### Publisher Confirms

435

436

```python

437

import pika

438

439

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

440

channel = connection.channel()

441

442

# Enable publisher confirms

443

channel.confirm_delivery()

444

445

try:

446

channel.basic_publish(

447

exchange='',

448

routing_key='test_queue',

449

body='Hello World!',

450

mandatory=True

451

)

452

print("Message published successfully")

453

except pika.exceptions.UnroutableError:

454

print("Message was returned as unroutable")

455

except pika.exceptions.NackError:

456

print("Message was nacked by broker")

457

458

connection.close()

459

```

460

461

### Generator-Based Consuming

462

463

```python

464

import pika

465

466

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

467

channel = connection.channel()

468

469

channel.queue_declare(queue='test_queue')

470

471

# Consume messages using generator

472

for method, properties, body in channel.consume('test_queue', auto_ack=True):

473

print(f"Received: {body.decode()}")

474

475

# Process 10 messages then stop

476

if method.delivery_tag == 10:

477

channel.cancel()

478

break

479

480

connection.close()

481

```

482

483

### Exchange and Queue Setup

484

485

```python

486

import pika

487

488

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

489

channel = connection.channel()

490

491

# Declare topic exchange

492

channel.exchange_declare(exchange='logs', exchange_type='topic', durable=True)

493

494

# Declare queue with TTL

495

queue_args = {'x-message-ttl': 60000} # 60 seconds

496

result = channel.queue_declare(queue='', exclusive=True, arguments=queue_args)

497

queue_name = result.method.queue

498

499

# Bind queue to exchange with routing key pattern

500

channel.queue_bind(exchange='logs', queue=queue_name, routing_key='app.*.error')

501

502

connection.close()

503

```

504

505

### Transaction Usage

506

507

```python

508

import pika

509

510

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

511

channel = connection.channel()

512

513

# Start transaction

514

channel.tx_select()

515

516

try:

517

# Publish multiple messages in transaction

518

for i in range(5):

519

channel.basic_publish(

520

exchange='',

521

routing_key='transactional_queue',

522

body=f'Message {i}'

523

)

524

525

# Commit transaction

526

channel.tx_commit()

527

print("All messages published successfully")

528

529

except Exception as e:

530

# Rollback on error

531

channel.tx_rollback()

532

print(f"Transaction rolled back: {e}")

533

534

connection.close()

535

```