docs/simple.md

Related docs: compression.md · connection.md · entities.md · exceptions.md · index.md · messaging.md · mixins.md · pools.md · serialization.md · simple.md

# Simple Interface

Queue-like API for simple use cases that provides an easy-to-use interface similar to Python's queue module. The simple interface abstracts away the complexity of AMQP entities and provides a straightforward way to send and receive messages.

## Capabilities

### SimpleQueue

Simple API for persistent queues that provides a high-level, queue-like interface for message passing.

```python { .api }
class SimpleQueue:
    def __init__(self, channel, name, no_ack=None, queue_opts=None, queue_args=None, exchange_opts=None, serializer=None, compression=None, accept=None, **kwargs):
        """
        Create simple persistent queue.

        Parameters:
        - channel: AMQP channel to use
        - name (str): Queue name
        - no_ack (bool): Disable acknowledgments (None=default, False for persistence)
        - queue_opts (dict): Additional queue options (durable, exclusive, etc.)
        - queue_args (dict): Queue declaration arguments
        - exchange_opts (dict): Additional exchange options
        - serializer (str): Default serialization method
        - compression (str): Default compression method
        - accept (list): Accepted content types
        - **kwargs: Additional options
        """

    def get(self, block=True, timeout=None):
        """
        Get message from queue.

        Parameters:
        - block (bool): Block if queue is empty (default True)
        - timeout (float): Timeout in seconds for blocking get

        Returns:
        Decoded message body

        Raises:
        Empty: If queue is empty and block=False or timeout exceeded
        """

    def get_nowait(self):
        """
        Get message without blocking.

        Returns:
        Decoded message body

        Raises:
        Empty: If queue is empty
        """

    def put(self, message, serializer=None, headers=None, compression=None, routing_key=None, **kwargs):
        """
        Put message into queue.

        Parameters:
        - message: Message body to send
        - serializer (str): Serialization method override
        - headers (dict): Message headers
        - compression (str): Compression method override
        - routing_key (str): Routing key override
        - **kwargs: Additional publish parameters
        """

    def clear(self):
        """
        Clear all messages from queue.

        Returns:
        int: Number of messages cleared
        """

    def qsize(self):
        """
        Get approximate queue size.

        Returns:
        int: Number of messages in queue (approximate)

        Note:
        Not all transports support this operation
        """

    def close(self):
        """Close queue and cleanup resources."""

    # Properties
    @property
    def Empty(self):
        """Exception class raised when queue is empty"""

    @property
    def no_ack(self):
        """bool: Auto-acknowledgment flag"""
```

### SimpleBuffer

Simple API for ephemeral queues that provides a high-level interface for temporary message passing with automatic cleanup.

```python { .api }
class SimpleBuffer:
    def __init__(self, channel, name, no_ack=True, queue_opts=None, queue_args=None, exchange_opts=None, serializer=None, compression=None, accept=None, **kwargs):
        """
        Create simple ephemeral queue.

        Parameters:
        - channel: AMQP channel to use
        - name (str): Queue name
        - no_ack (bool): Disable acknowledgments (default True for performance)
        - queue_opts (dict): Additional queue options (auto_delete=True, durable=False by default)
        - queue_args (dict): Queue declaration arguments
        - exchange_opts (dict): Additional exchange options
        - serializer (str): Default serialization method
        - compression (str): Default compression method
        - accept (list): Accepted content types
        - **kwargs: Additional options
        """

    # Inherits all methods from SimpleQueue
    def get(self, block=True, timeout=None):
        """Get message from buffer (same as SimpleQueue.get)"""

    def get_nowait(self):
        """Get message without blocking (same as SimpleQueue.get_nowait)"""

    def put(self, message, serializer=None, headers=None, compression=None, routing_key=None, **kwargs):
        """Put message into buffer (same as SimpleQueue.put)"""

    def clear(self):
        """Clear all messages from buffer (same as SimpleQueue.clear)"""

    def qsize(self):
        """Get approximate buffer size (same as SimpleQueue.qsize)"""

    def close(self):
        """Close buffer and cleanup resources (same as SimpleQueue.close)"""

    # Properties
    @property
    def Empty(self):
        """Exception class raised when buffer is empty"""

    @property
    def no_ack(self):
        """bool: Auto-acknowledgment flag (True by default)"""
```

## Usage Examples

### Basic SimpleQueue Usage

```python
from kombu import Connection

# Connect and create simple queue
with Connection('redis://localhost:6379/0') as conn:
    # Create persistent queue
    queue = conn.SimpleQueue('task_queue')

    # Send messages
    queue.put({'task': 'process_data', 'id': 1})
    queue.put({'task': 'send_email', 'id': 2})
    queue.put({'task': 'generate_report', 'id': 3})

    # Receive messages
    while True:
        try:
            message = queue.get(timeout=5.0)
            print(f"Processing: {message}")

            # Simulate work
            if message['task'] == 'process_data':
                print(f"Processing data for task {message['id']}")
            elif message['task'] == 'send_email':
                print(f"Sending email for task {message['id']}")
            elif message['task'] == 'generate_report':
                print(f"Generating report for task {message['id']}")

        except queue.Empty:
            print("No more messages")
            break

    queue.close()
```

### Non-blocking Queue Operations

```python
from kombu import Connection

with Connection('redis://localhost:6379/0') as conn:
    queue = conn.SimpleQueue('work_queue')

    # Send some messages
    for i in range(5):
        queue.put(f'Message {i}')

    # Process messages without blocking
    processed = 0
    while True:
        try:
            message = queue.get_nowait()
            print(f"Got: {message}")
            processed += 1
        except queue.Empty:
            print(f"Queue empty, processed {processed} messages")
            break

    queue.close()
```

### SimpleBuffer for Temporary Communication

```python
from kombu import Connection
import threading
import time

def producer(conn, buffer_name):
    """Producer function"""
    buffer = conn.SimpleBuffer(buffer_name)

    for i in range(10):
        message = f'Temp message {i}'
        buffer.put(message)
        print(f"Sent: {message}")
        time.sleep(0.1)

    buffer.close()

def consumer(conn, buffer_name):
    """Consumer function"""
    buffer = conn.SimpleBuffer(buffer_name)

    while True:
        try:
            message = buffer.get(timeout=2.0)
            print(f"Received: {message}")
        except buffer.Empty:
            print("Buffer empty, stopping consumer")
            break

    buffer.close()

# Use SimpleBuffer for temporary communication
with Connection('redis://localhost:6379/0') as conn:
    buffer_name = 'temp_communication'

    # Start producer and consumer in separate threads
    producer_thread = threading.Thread(target=producer, args=(conn, buffer_name))
    consumer_thread = threading.Thread(target=consumer, args=(conn, buffer_name))

    producer_thread.start()
    consumer_thread.start()

    producer_thread.join()
    consumer_thread.join()
```

### Queue Management Operations

```python
from kombu import Connection

with Connection('amqp://localhost') as conn:
    queue = conn.SimpleQueue('management_queue')

    # Add several messages
    for i in range(100):
        queue.put(f'Message {i}')

    # Check queue size (if supported by transport)
    try:
        size = queue.qsize()
        print(f"Queue has approximately {size} messages")
    except NotImplementedError:
        print("Queue size checking not supported by this transport")

    # Process first 10 messages
    for i in range(10):
        try:
            message = queue.get_nowait()
            print(f"Processed: {message}")
        except queue.Empty:
            break

    # Clear remaining messages
    cleared = queue.clear()
    print(f"Cleared {cleared} remaining messages")

    queue.close()
```

### Serialization and Compression

```python
from kombu import Connection
import json
import pickle

with Connection('redis://localhost:6379/0') as conn:
    # Queue with JSON serialization
    json_queue = conn.SimpleQueue('json_queue', serializer='json')

    # Send complex data structure
    data = {
        'user_id': 12345,
        'action': 'purchase',
        'items': [
            {'id': 1, 'name': 'Widget', 'price': 9.99},
            {'id': 2, 'name': 'Gadget', 'price': 19.99}
        ],
        'total': 29.98
    }

    json_queue.put(data)
    received = json_queue.get()
    print(f"JSON data: {received}")

    # Queue with pickle serialization and compression
    binary_queue = conn.SimpleQueue(
        'binary_queue',
        serializer='pickle',
        compression='gzip'
    )

    # Send binary data
    binary_data = {
        'large_list': list(range(1000)),
        'nested_dict': {'level1': {'level2': {'level3': 'deep_value'}}}
    }

    binary_queue.put(binary_data)
    received_binary = binary_queue.get()
    print(f"Binary data received: {len(received_binary['large_list'])} items")

    json_queue.close()
    binary_queue.close()
```

### Message Headers and Properties

```python
from kombu import Connection
import time

with Connection('redis://localhost:6379/0') as conn:
    queue = conn.SimpleQueue('header_queue')

    # Send message with custom headers
    queue.put(
        {'task': 'important_work'},
        headers={
            'priority': 'high',
            'created_by': 'worker_service',
            'timestamp': time.time(),
            'retry_count': 0
        }
    )

    # The headers are automatically included with the message
    # but access depends on the underlying implementation
    message = queue.get()
    print(f"Received: {message}")

    queue.close()
```

### Error Handling with Simple Interface

```python
from kombu import Connection
import socket

def robust_queue_processing(queue_name, conn_url):
    """Robust queue processing with error handling"""

    try:
        with Connection(conn_url) as conn:
            queue = conn.SimpleQueue(queue_name)

            while True:
                try:
                    # Try to get message with timeout
                    message = queue.get(timeout=30.0)

                    # Process message
                    print(f"Processing: {message}")

                    # Simulate processing that might fail
                    if message.get('should_fail'):
                        raise ValueError("Simulated processing error")

                    print("Processing completed successfully")

                except queue.Empty:
                    print("No messages received in 30 seconds, continuing...")
                    continue

                except ValueError as e:
                    print(f"Processing error: {e}")
                    # With SimpleQueue, failed messages are lost unless
                    # you implement your own retry mechanism
                    continue

                except KeyboardInterrupt:
                    print("Shutting down gracefully...")
                    break

            queue.close()

    except socket.error as e:
        print(f"Connection error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")

# Usage
robust_queue_processing('work_queue', 'redis://localhost:6379/0')
```

### Producer-Consumer Pattern

```python
from kombu import Connection
import threading
import time
import random

class TaskProducer:
    def __init__(self, conn, queue_name):
        self.queue = conn.SimpleQueue(queue_name)
        self.running = True

    def produce_tasks(self):
        """Produce tasks continuously"""
        task_id = 0
        while self.running:
            task = {
                'id': task_id,
                'type': random.choice(['email', 'report', 'cleanup']),
                'created_at': time.time()
            }

            self.queue.put(task)
            print(f"Produced task {task_id}: {task['type']}")

            task_id += 1
            time.sleep(random.uniform(0.5, 2.0))

    def stop(self):
        self.running = False
        self.queue.close()

class TaskConsumer:
    def __init__(self, conn, queue_name, consumer_id):
        self.queue = conn.SimpleQueue(queue_name)
        self.consumer_id = consumer_id
        self.running = True

    def consume_tasks(self):
        """Consume tasks continuously"""
        while self.running:
            try:
                task = self.queue.get(timeout=1.0)

                # Simulate processing time
                processing_time = random.uniform(0.1, 1.0)
                print(f"Consumer {self.consumer_id} processing task {task['id']}")
                time.sleep(processing_time)

                print(f"Consumer {self.consumer_id} completed task {task['id']}")

            except self.queue.Empty:
                continue
            except KeyboardInterrupt:
                break

    def stop(self):
        self.running = False
        self.queue.close()

# Run producer-consumer system
with Connection('redis://localhost:6379/0') as conn:
    queue_name = 'task_processing'

    # Create producer and consumers
    producer = TaskProducer(conn, queue_name)
    consumers = [
        TaskConsumer(conn, queue_name, i)
        for i in range(3)  # 3 consumer workers
    ]

    # Start threads
    producer_thread = threading.Thread(target=producer.produce_tasks)
    consumer_threads = [
        threading.Thread(target=consumer.consume_tasks)
        for consumer in consumers
    ]

    producer_thread.start()
    for thread in consumer_threads:
        thread.start()

    try:
        # Run for 30 seconds
        time.sleep(30)
    except KeyboardInterrupt:
        pass

    # Graceful shutdown
    producer.stop()
    for consumer in consumers:
        consumer.stop()

    producer_thread.join()
    for thread in consumer_threads:
        thread.join()

    print("All threads stopped")
```