# Advanced Queueing (AQ)

Oracle Advanced Queueing (AQ) provides message-oriented middleware inside the database. It covers message enqueueing, dequeueing, and queue management, with reliable, persistent message delivery.

## Capabilities

### Queue Management

Access and manage Oracle Advanced Queues for message processing.

```python { .api }
class Connection:
    def queue(self, name: str, payloadType=None) -> Queue:
        """
        Access Oracle Advanced Queue.

        Parameters:
        - name (str): Queue name
        - payloadType: Message payload type (ObjectType or None for RAW)

        Returns:
        Queue object for message operations
        """
```

```python { .api }
class Queue:
    @property
    def name(self) -> str:
        """Queue name"""

    @property
    def enqOptions(self) -> EnqOptions:
        """Enqueue options object"""

    @property
    def deqOptions(self) -> DeqOptions:
        """Dequeue options object"""

    @property
    def payloadType(self) -> ObjectType:
        """Message payload type"""

    def enqOne(self, msgProperties: MessageProperties) -> None:
        """
        Enqueue single message.

        Parameters:
        - msgProperties: Message properties and payload
        """

    def enqMany(self, msgPropertiesList: list) -> None:
        """
        Enqueue multiple messages.

        Parameters:
        - msgPropertiesList (list): List of MessageProperties objects
        """

    def deqOne(self) -> MessageProperties:
        """
        Dequeue single message.

        Returns:
        MessageProperties object or None if no message available
        """

    def deqMany(self, maxMessages: int) -> list:
        """
        Dequeue multiple messages.

        Parameters:
        - maxMessages (int): Maximum number of messages to dequeue

        Returns:
        List of MessageProperties objects
        """
```

Usage examples:

```python
import cx_Oracle

# `connection` is assumed to be an open cx_Oracle connection

# Access queue for RAW messages
raw_queue = connection.queue("my_raw_queue")

# Access queue with object payload type
emp_type = connection.gettype("EMPLOYEE_TYPE")
obj_queue = connection.queue("my_object_queue", emp_type)

# Basic message enqueueing
msg_props = connection.msgproperties()
msg_props.payload = b"Hello, World!"
raw_queue.enqOne(msg_props)
connection.commit()

# Basic message dequeueing
# Return None right away instead of blocking when the queue is empty
raw_queue.deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
received_msg = raw_queue.deqOne()
if received_msg:
    print(f"Received: {received_msg.payload}")
    connection.commit()
```

### Message Properties

Configure message properties for delivery and processing control.

```python { .api }
class Connection:
    def msgproperties(self) -> MessageProperties:
        """
        Create message properties object.

        Returns:
        MessageProperties object for configuring messages
        """
```

```python { .api }
class MessageProperties:
    @property
    def payload(self):
        """Message payload (bytes, str, or Oracle object)"""

    @property
    def correlation(self) -> str:
        """Message correlation identifier"""

    @property
    def delay(self) -> int:
        """Delivery delay in seconds"""

    @property
    def exceptionQ(self) -> str:
        """Exception queue name for failed messages"""

    @property
    def expiration(self) -> int:
        """Message expiration time in seconds"""

    @property
    def priority(self) -> int:
        """Message priority (smaller numbers = higher priority)"""

    @property
    def attempts(self) -> int:
        """Number of dequeue attempts (read-only)"""

    @property
    def state(self) -> int:
        """Message state (read-only)"""

    @property
    def deliveryMode(self) -> int:
        """Message delivery mode"""

    @property
    def enqTime(self) -> datetime:
        """Enqueue timestamp (read-only)"""

    @property
    def msgid(self) -> bytes:
        """Message ID (read-only)"""
```

Usage examples:

```python
# `queue` is assumed to be a Queue obtained from connection.queue(...)

# Create message with properties
msg_props = connection.msgproperties()
msg_props.payload = b"Important message"
msg_props.priority = 5  # Smaller number = higher priority
msg_props.correlation = "ORDER_12345"
msg_props.delay = 60  # Delay delivery by 60 seconds
msg_props.expiration = 3600  # Expire after 1 hour

# Enqueue message
queue.enqOne(msg_props)
connection.commit()

# Dequeue and examine message properties
received = queue.deqOne()
if received:
    print(f"Message ID: {received.msgid}")
    print(f"Correlation: {received.correlation}")
    print(f"Priority: {received.priority}")
    print(f"Enqueue time: {received.enqTime}")
    print(f"Attempts: {received.attempts}")
    print(f"State: {received.state}")
```

### Enqueue Options

Configure message enqueuing behavior and transaction handling.

```python { .api }
class EnqOptions:
    @property
    def visibility(self) -> int:
        """Transaction visibility (ENQ_IMMEDIATE, ENQ_ON_COMMIT)"""

    @property
    def deliveryMode(self) -> int:
        """Delivery mode (MSG_PERSISTENT, MSG_BUFFERED, MSG_PERSISTENT_OR_BUFFERED)"""

    @property
    def transformation(self) -> str:
        """Message transformation function"""
```

Enqueue visibility constants:

```python { .api }
ENQ_IMMEDIATE: int  # Message visible immediately
ENQ_ON_COMMIT: int  # Message visible after commit
```

Message delivery mode constants:

```python { .api }
MSG_PERSISTENT: int  # Persistent messages (stored in database)
MSG_BUFFERED: int  # Buffered messages (stored in memory)
MSG_PERSISTENT_OR_BUFFERED: int  # Either persistent or buffered
```

Usage examples:

```python
# Configure enqueue options
queue.enqOptions.visibility = cx_Oracle.ENQ_ON_COMMIT
queue.enqOptions.deliveryMode = cx_Oracle.MSG_PERSISTENT

# Enqueue with immediate visibility
queue.enqOptions.visibility = cx_Oracle.ENQ_IMMEDIATE
msg_props = connection.msgproperties()
msg_props.payload = b"Urgent message"
queue.enqOne(msg_props)

# Enqueue buffered message for high performance
# (buffered messages need ENQ_IMMEDIATE visibility, which is set above)
queue.enqOptions.deliveryMode = cx_Oracle.MSG_BUFFERED
msg_props.payload = b"High-volume message"
queue.enqOne(msg_props)
```

### Dequeue Options

Configure message dequeuing behavior and filtering.

```python { .api }
class DeqOptions:
    @property
    def condition(self) -> str:
        """Dequeue condition (WHERE clause)"""

    @property
    def consumername(self) -> str:
        """Consumer name for multi-consumer queues"""

    @property
    def correlation(self) -> str:
        """Correlation filter"""

    @property
    def mode(self) -> int:
        """Dequeue mode"""

    @property
    def navigation(self) -> int:
        """Navigation mode"""

    @property
    def transformation(self) -> str:
        """Message transformation function"""

    @property
    def visibility(self) -> int:
        """Transaction visibility"""

    @property
    def wait(self) -> int:
        """Wait time in seconds"""

    @property
    def msgid(self) -> bytes:
        """Specific message ID to dequeue"""
```

Dequeue mode constants:

```python { .api }
DEQ_BROWSE: int  # Browse message without removing
DEQ_LOCKED: int  # Lock message for processing
DEQ_REMOVE: int  # Remove message from queue
DEQ_REMOVE_NODATA: int  # Remove message without returning data
```

Navigation constants:

```python { .api }
DEQ_FIRST_MSG: int  # Get first message
DEQ_NEXT_MSG: int  # Get next message
DEQ_NEXT_TRANSACTION: int  # Get next message in different transaction
```

Dequeue visibility constants:

```python { .api }
DEQ_IMMEDIATE: int  # Changes visible immediately
DEQ_ON_COMMIT: int  # Changes visible after commit
```

Wait time constants:

```python { .api }
DEQ_NO_WAIT: int  # Don't wait if no message available
DEQ_WAIT_FOREVER: int  # Wait indefinitely for message
```

Usage examples:

```python
# Configure dequeue options for selective message processing
queue.deqOptions.correlation = "ORDER_12345"  # Only messages with this correlation
queue.deqOptions.condition = "priority <= 3"  # Only high-priority messages (smaller number = higher priority)
queue.deqOptions.wait = 30  # Wait up to 30 seconds

# Dequeue with filtering
msg = queue.deqOne()
if msg:
    print(f"High-priority order message: {msg.payload}")

# Browse messages without removing them
queue.deqOptions.mode = cx_Oracle.DEQ_BROWSE
queue.deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG

browsed_msg = queue.deqOne()
while browsed_msg:
    print(f"Browsing message: {browsed_msg.correlation}")
    queue.deqOptions.navigation = cx_Oracle.DEQ_NEXT_MSG
    browsed_msg = queue.deqOne()

# Reset to normal dequeue mode
queue.deqOptions.mode = cx_Oracle.DEQ_REMOVE
queue.deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG

# Consumer-specific dequeue for multi-consumer queues
queue.deqOptions.consumername = "CONSUMER_A"
consumer_msg = queue.deqOne()

# Dequeue specific message by ID
# (msg_id is assumed to hold the bytes of a previously seen message ID)
if msg_id:
    queue.deqOptions.msgid = msg_id
    specific_msg = queue.deqOne()
```
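
The examples above change `queue.deqOptions` in place and then reset it by hand. One way to make that less error-prone is a small context manager that restores the previous values on exit. This is only a sketch: `temporary_options` is a hypothetical helper, not part of cx_Oracle, and it assumes every overridden attribute can be read as well as written, which may not hold for every option.

```python
from contextlib import contextmanager


@contextmanager
def temporary_options(options, **overrides):
    """Set attributes on an options object, then restore the old values.

    `options` is any object with plain readable and writable attributes,
    for example a queue's deqOptions (assumption: the attributes named in
    `overrides` are readable).
    """
    # Remember the current value of every attribute we are about to change
    saved = {name: getattr(options, name) for name in overrides}
    try:
        for name, value in overrides.items():
            setattr(options, name, value)
        yield options
    finally:
        # Restore the original values even if the body raised
        for name, value in saved.items():
            setattr(options, name, value)


# Usage with names from the examples above (not executed here):
# with temporary_options(queue.deqOptions, mode=cx_Oracle.DEQ_BROWSE):
#     peeked = queue.deqOne()
```

Restoring in a `finally` clause means a failed dequeue does not leave the queue stuck in browse mode.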

### Object-Type Messages

Work with structured Oracle object types as message payloads.

```python
# Define object type in database first:
# CREATE TYPE employee_msg_type AS OBJECT (
#     emp_id NUMBER,
#     emp_name VARCHAR2(100),
#     department VARCHAR2(50),
#     action VARCHAR2(20)
# );

# Get object type and create queue
emp_msg_type = connection.gettype("EMPLOYEE_MSG_TYPE")
emp_queue = connection.queue("employee_updates", emp_msg_type)

# Create object message
emp_obj = emp_msg_type.newobject()
emp_obj.EMP_ID = 1001
emp_obj.EMP_NAME = "John Doe"
emp_obj.DEPARTMENT = "Engineering"
emp_obj.ACTION = "HIRE"

# Enqueue object message
msg_props = connection.msgproperties()
msg_props.payload = emp_obj
emp_queue.enqOne(msg_props)
connection.commit()

# Dequeue object message
received_msg = emp_queue.deqOne()
if received_msg:
    emp_data = received_msg.payload
    print(f"Employee {emp_data.EMP_ID}: {emp_data.EMP_NAME}")
    print(f"Department: {emp_data.DEPARTMENT}")
    print(f"Action: {emp_data.ACTION}")
    connection.commit()
```

### Bulk Message Operations

Efficiently handle multiple messages with batch operations.

```python
# Enqueue multiple messages
messages = []
for i in range(100):
    msg_props = connection.msgproperties()
    msg_props.payload = f"Message {i}".encode()
    msg_props.correlation = f"BATCH_{i // 10}"  # Group by batch
    messages.append(msg_props)

# Bulk enqueue
queue.enqMany(messages)
connection.commit()
print(f"Enqueued {len(messages)} messages")

# Bulk dequeue
max_messages = 50
received_messages = queue.deqMany(max_messages)
print(f"Dequeued {len(received_messages)} messages")

for msg in received_messages:
    print(f"Processing: {msg.payload.decode()}")

connection.commit()
```
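
When the message list is large, it can be convenient to enqueue it in fixed-size batches rather than in one call. The sketch below shows only the batching step. `chunked` is a hypothetical helper, not part of cx_Oracle, and the batch size in the usage comment is an arbitrary illustration.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive slices of `items`, each at most `size` long."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


# Usage with the `queue` and `messages` names from the example above
# (not executed here):
# for batch in chunked(messages, 20):
#     queue.enqMany(batch)
# connection.commit()
```

Committing once after the last batch keeps all messages in a single transaction; committing inside the loop instead would make each batch visible as soon as it is enqueued.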

## Message States and Lifecycle

Message state constants indicate message processing status:

```python { .api }
MSG_EXPIRED: int  # Message has expired
MSG_READY: int  # Message ready for dequeue
MSG_PROCESSED: int  # Message has been processed
MSG_WAITING: int  # Message waiting for delay/schedule
```

Special constants for message timing:

```python { .api }
MSG_NO_DELAY: int  # No delivery delay (0)
MSG_NO_EXPIRATION: int  # No expiration time (-1)
```

Usage examples:

```python
# Check message state
received_msg = queue.deqOne()
if received_msg:
    if received_msg.state == cx_Oracle.MSG_READY:
        print("Message is ready for processing")
    elif received_msg.state == cx_Oracle.MSG_EXPIRED:
        print("Message has expired")
    elif received_msg.state == cx_Oracle.MSG_PROCESSED:
        print("Message already processed")

# Set message with no expiration
msg_props = connection.msgproperties()
msg_props.payload = b"Persistent message"
msg_props.expiration = cx_Oracle.MSG_NO_EXPIRATION
queue.enqOne(msg_props)
```

## Advanced AQ Patterns

### Message Correlation and Filtering

Route and filter messages by correlation ID and by dequeue condition:

```python
import json

# `connection` is assumed to be the open connection that created `queue`

def send_order_updates(queue, orders):
    """Send order update messages with correlation IDs"""
    for order in orders:
        msg_props = connection.msgproperties()
        msg_props.payload = json.dumps(order).encode()
        msg_props.correlation = f"ORDER_{order['order_id']}"
        # Smaller number = higher priority, so default to a low priority
        msg_props.priority = order.get('priority', 10)
        queue.enqOne(msg_props)
    connection.commit()

def process_high_priority_orders(queue):
    """Process only high-priority order messages"""
    queue.deqOptions.condition = "priority <= 5"
    queue.deqOptions.wait = 10  # Wait 10 seconds

    while True:
        msg = queue.deqOne()
        if not msg:
            break

        order_data = json.loads(msg.payload.decode())
        print(f"Processing high-priority order: {order_data['order_id']}")
        connection.commit()

def process_specific_order(queue, order_id):
    """Process messages for specific order"""
    queue.deqOptions.correlation = f"ORDER_{order_id}"

    msg = queue.deqOne()
    if msg:
        order_data = json.loads(msg.payload.decode())
        print(f"Processing order {order_id}: {order_data}")
        connection.commit()
    else:
        print(f"No messages found for order {order_id}")
```
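
The functions above repeat the same JSON encoding, decoding, and correlation-ID formatting. A possible refactoring, shown here as a sketch, pulls those steps into small pure functions that can be tested without a database. The names `encode_payload`, `decode_payload`, and `order_correlation` are hypothetical, and UTF-8 is an assumed encoding choice.

```python
import json
from typing import Any, Dict


def encode_payload(data: Dict[str, Any]) -> bytes:
    """Serialize a dict to UTF-8 JSON bytes suitable for a RAW payload."""
    # sort_keys gives a stable byte representation for equal dicts
    return json.dumps(data, sort_keys=True).encode("utf-8")


def decode_payload(payload: bytes) -> Dict[str, Any]:
    """Parse UTF-8 JSON bytes from a dequeued message back into a dict."""
    return json.loads(payload.decode("utf-8"))


def order_correlation(order_id: Any) -> str:
    """Build the correlation ID format used in the examples above."""
    return f"ORDER_{order_id}"


# Round trip, no database needed
original = {"order_id": 12345, "priority": 2}
assert decode_payload(encode_payload(original)) == original
```

Keeping the correlation format in one function means the sender and the `deqOptions.correlation` filter cannot drift apart.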

### Exception Handling and Dead Letter Queues

Handle message processing failures:

```python
# `connection` is assumed to be the open connection that created `queue`

def setup_exception_handling(queue):
    """Configure exception queue for failed messages"""
    # Set exception queue for failed message handling
    msg_props = connection.msgproperties()
    msg_props.exceptionQ = "failed_messages_queue"
    msg_props.payload = b"Message that might fail"
    queue.enqOne(msg_props)

def process_with_retry(queue, max_retries=3):
    """Process messages with retry logic"""
    msg = queue.deqOne()
    if not msg:
        return

    try:
        # Simulate message processing
        payload = msg.payload.decode()
        if "fail" in payload.lower():
            raise Exception("Simulated processing failure")

        print(f"Successfully processed: {payload}")
        connection.commit()

    except Exception as e:
        print(f"Processing failed: {e}")

        if msg.attempts < max_retries:
            # Rollback to retry message
            connection.rollback()
            print(f"Message will be retried (attempt {msg.attempts + 1})")
        else:
            # Move to exception queue or log error
            print(f"Message failed after {max_retries} attempts")
            connection.commit()  # Remove from queue
```

### Multi-Consumer Queues

Implement message distribution across multiple consumers:

```python
import json
import time

def setup_multi_consumer_processing():
    """Setup multiple consumers for parallel processing"""
    consumers = ["WORKER_1", "WORKER_2", "WORKER_3"]

    # Each consumer processes messages independently
    for consumer_name in consumers:
        # In practice, each consumer would run in a separate process/thread;
        # called sequentially like this, the first consumer's loop never returns
        process_as_consumer(consumer_name)

def process_as_consumer(consumer_name):
    """Process messages as specific consumer"""
    queue = connection.queue("work_queue")
    queue.deqOptions.consumername = consumer_name
    queue.deqOptions.wait = 30

    print(f"Consumer {consumer_name} starting...")

    while True:
        msg = queue.deqOne()
        if not msg:
            print(f"Consumer {consumer_name}: No messages, waiting...")
            continue

        try:
            work_item = json.loads(msg.payload.decode())
            print(f"Consumer {consumer_name} processing: {work_item['task_id']}")

            # Simulate work processing
            time.sleep(work_item.get('duration', 1))

            print(f"Consumer {consumer_name} completed: {work_item['task_id']}")
            connection.commit()

        except Exception as e:
            print(f"Consumer {consumer_name} error: {e}")
            connection.rollback()
```
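
The consumer loop above runs until the process is interrupted. For workers that should shut down cleanly, one option is a loop that checks a stop flag and gives up after several empty polls. The sketch below is an assumption-laden illustration, not cx_Oracle API: `run_consumer` and its parameters are hypothetical, and `dequeue_one` stands in for a callable that wraps `queue.deqOne()` and returns the payload bytes or `None`.

```python
import threading
from typing import Callable, Optional


def run_consumer(dequeue_one: Callable[[], Optional[bytes]],
                 handle: Callable[[bytes], None],
                 stop: threading.Event,
                 max_idle_polls: int = 3) -> int:
    """Process payloads until `stop` is set or the source stays empty.

    Returns the number of payloads handled.
    """
    processed = 0
    idle_polls = 0
    while not stop.is_set() and idle_polls < max_idle_polls:
        payload = dequeue_one()
        if payload is None:
            # Nothing available: count the empty poll and try again
            idle_polls += 1
            continue
        idle_polls = 0
        handle(payload)
        processed += 1
    return processed


# In-memory stand-in for a queue, used only to demonstrate the loop
pending = [b"task-1", b"task-2"]
handled = []
count = run_consumer(lambda: pending.pop(0) if pending else None,
                     handled.append,
                     threading.Event())
assert count == 2
```

With a real queue, the per-poll blocking time would come from `deqOptions.wait`, and each worker thread would typically use its own connection.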

## Error Handling

Handle AQ-specific errors and exceptions:

```python
try:
    # AQ operations
    msg = queue.deqOne()
    if msg:
        # Process message
        connection.commit()

except cx_Oracle.DatabaseError as e:
    error_obj, = e.args

    if error_obj.code == 25228:  # Timeout or end-of-fetch: no message available
        print("No messages in queue")
    elif error_obj.code == 24010:  # Queue does not exist
        print("Queue not found")
    elif error_obj.code == 25207:  # Queue is disabled for enqueue
        print("Failed to enqueue message")
    elif error_obj.code == 25237:  # Navigation option used out of sequence
        print("Dequeue navigation used out of sequence")
    else:
        print(f"AQ error {error_obj.code}: {error_obj.message}")

except Exception as e:
    print(f"Unexpected error: {e}")
    connection.rollback()

finally:
    # Ensure proper cleanup
    if connection:
        connection.rollback()  # Discards uncommitted work; no effect after a successful commit
```
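
A chain of `elif` branches grows quickly as more codes are handled. A lookup table is one alternative, sketched below. `AQ_ERROR_HINTS` and `describe_aq_error` are hypothetical names, and the descriptions paraphrase Oracle's messages from memory, so they should be checked against the Oracle error reference for the database version in use.

```python
from typing import Dict

# Paraphrased descriptions of a few AQ-related ORA codes (assumed wording)
AQ_ERROR_HINTS: Dict[int, str] = {
    24010: "queue does not exist",
    25207: "queue is disabled for enqueue",
    25226: "queue is not enabled for dequeue",
    25228: "timeout or end-of-fetch during dequeue (no message available)",
}


def describe_aq_error(code: int) -> str:
    """Return a short ORA-style description for an error code."""
    hint = AQ_ERROR_HINTS.get(code)
    if hint is None:
        return f"ORA-{code:05d}: unrecognized error"
    return f"ORA-{code:05d}: {hint}"


# Usage inside the handler above (not executed here):
# except cx_Oracle.DatabaseError as e:
#     error_obj, = e.args
#     print(describe_aq_error(error_obj.code))
```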

## AQ Best Practices

1. **Use appropriate delivery modes**: Choose between persistent and buffered messages based on reliability requirements
2. **Implement proper exception handling**: Use exception queues for failed messages
3. **Optimize message filtering**: Use correlation IDs and conditions to reduce processing overhead
4. **Handle transaction boundaries**: Commit or rollback appropriately after message processing
5. **Monitor queue depth**: Track message accumulation to identify processing bottlenecks
6. **Use bulk operations**: Leverage enqMany() and deqMany() for high-throughput scenarios
7. **Set appropriate timeouts**: Configure wait times based on application requirements
8. **Implement retry logic**: Handle transient failures with exponential backoff
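
The last item mentions exponential backoff without showing it. A minimal sketch of the delay calculation follows. `backoff_delay` is a hypothetical helper, and the base of one second and the cap of sixty seconds are illustrative defaults rather than recommended values.

```python
def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Return the wait in seconds before retry number `attempt` (0-based).

    The delay doubles with each attempt and never exceeds `cap`.
    """
    if attempt < 0:
        raise ValueError("attempt must be non-negative")
    return min(cap, base * (2 ** attempt))


# Delays for the first five retries with the default settings
delays = [backoff_delay(n) for n in range(5)]
assert delays == [1.0, 2.0, 4.0, 8.0, 16.0]
```

The result could be passed to `time.sleep()` between a rollback and the next dequeue attempt, or used as the `delay` of a re-enqueued message; adding random jitter is a common refinement when many consumers retry at once.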