# Advanced Queueing (AQ)

Oracle Advanced Queueing (AQ) support for message-oriented middleware: enqueue, dequeue, and queue management operations that provide reliable, persistent message delivery.

## Capabilities

### Queue Management

Access and manage Oracle Advanced Queues for message processing.

```python { .api }
class Connection:
    def queue(self, name: str, payloadType=None) -> Queue:
        """
        Access Oracle Advanced Queue.

        Parameters:
        - name (str): Queue name
        - payloadType: Message payload type (ObjectType or None for RAW)

        Returns:
        Queue object for message operations
        """
```

```python { .api }
class Queue:
    @property
    def name(self) -> str:
        """Queue name"""

    @property
    def enqOptions(self) -> EnqOptions:
        """Enqueue options object"""

    @property
    def deqOptions(self) -> DeqOptions:
        """Dequeue options object"""

    @property
    def payloadType(self) -> ObjectType:
        """Message payload type"""

    def enqOne(self, msgProperties: MessageProperties) -> None:
        """
        Enqueue single message.

        Parameters:
        - msgProperties: Message properties and payload
        """

    def enqMany(self, msgPropertiesList: list) -> None:
        """
        Enqueue multiple messages.

        Parameters:
        - msgPropertiesList (list): List of MessageProperties objects
        """

    def deqOne(self) -> MessageProperties:
        """
        Dequeue single message.

        Returns:
        MessageProperties object or None if no message available
        """

    def deqMany(self, maxMessages: int) -> list:
        """
        Dequeue multiple messages.

        Parameters:
        - maxMessages (int): Maximum number of messages to dequeue

        Returns:
        List of MessageProperties objects
        """
```

Usage examples:

```python
# Access queue for RAW messages
raw_queue = connection.queue("my_raw_queue")

# Access queue with object payload type
emp_type = connection.gettype("EMPLOYEE_TYPE")
obj_queue = connection.queue("my_object_queue", emp_type)

# Basic message enqueueing
msg_props = connection.msgproperties()
msg_props.payload = b"Hello, World!"
raw_queue.enqOne(msg_props)
connection.commit()

# Basic message dequeueing
# The default wait is DEQ_WAIT_FOREVER; use DEQ_NO_WAIT to get None on an empty queue
raw_queue.deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
received_msg = raw_queue.deqOne()
if received_msg:
    print(f"Received: {received_msg.payload}")
    connection.commit()
```

### Message Properties

Configure message properties for delivery and processing control.

```python { .api }
class Connection:
    def msgproperties(self) -> MessageProperties:
        """
        Create message properties object.

        Returns:
        MessageProperties object for configuring messages
        """
```

```python { .api }
class MessageProperties:
    @property
    def payload(self):
        """Message payload (bytes, str, or Oracle object)"""

    @property
    def correlation(self) -> str:
        """Message correlation identifier"""

    @property
    def delay(self) -> int:
        """Delivery delay in seconds"""

    @property
    def exceptionQ(self) -> str:
        """Exception queue name for failed messages"""

    @property
    def expiration(self) -> int:
        """Message expiration time in seconds"""

    @property
    def priority(self) -> int:
        """Message priority (smaller numbers = higher priority)"""

    @property
    def attempts(self) -> int:
        """Number of dequeue attempts (read-only)"""

    @property
    def state(self) -> int:
        """Message state (read-only)"""

    @property
    def deliveryMode(self) -> int:
        """Message delivery mode"""

    @property
    def enqTime(self) -> datetime:
        """Enqueue timestamp (read-only)"""

    @property
    def msgid(self) -> bytes:
        """Message ID (read-only)"""
```

Usage examples:

```python
# `queue` is a Queue object obtained from connection.queue(...)

# Create message with properties
msg_props = connection.msgproperties()
msg_props.payload = b"Important message"
msg_props.priority = 5        # Smaller numbers mean higher priority
msg_props.correlation = "ORDER_12345"
msg_props.delay = 60          # Delay delivery by 60 seconds
msg_props.expiration = 3600   # Expire after 1 hour

# Enqueue message
queue.enqOne(msg_props)
connection.commit()

# Dequeue and examine message properties
received = queue.deqOne()
if received:
    print(f"Message ID: {received.msgid}")
    print(f"Correlation: {received.correlation}")
    print(f"Priority: {received.priority}")
    print(f"Enqueue time: {received.enqTime}")
    print(f"Attempts: {received.attempts}")
    print(f"State: {received.state}")
```
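
When many messages share the same settings, a small helper can keep the property assignments in one place. The following is a minimal sketch, not part of cx_Oracle: `build_message` is a hypothetical name, and it assumes only that `connection.msgproperties()` returns an object accepting the attributes documented above.

```python
def build_message(connection, payload, **properties):
    """Create a message-properties object and set the given attributes.

    `connection` is assumed to expose msgproperties() as documented above.
    `properties` maps attribute names (for example priority or delay) to values.
    """
    allowed = {"correlation", "delay", "expiration", "priority"}
    unknown = set(properties) - allowed
    if unknown:
        # Fail early instead of passing a misspelled attribute to the driver.
        raise ValueError(f"Unsupported message properties: {sorted(unknown)}")

    msg_props = connection.msgproperties()
    msg_props.payload = payload
    for name, value in properties.items():
        setattr(msg_props, name, value)
    return msg_props
```

A call such as `build_message(connection, b"Important message", correlation="ORDER_12345", delay=60)` would then replace the individual assignments shown above.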

### Enqueue Options

Configure message enqueuing behavior and transaction handling.

```python { .api }
class EnqOptions:
    @property
    def visibility(self) -> int:
        """Transaction visibility (ENQ_IMMEDIATE, ENQ_ON_COMMIT)"""

    @property
    def deliveryMode(self) -> int:
        """Delivery mode (MSG_PERSISTENT or MSG_BUFFERED)"""

    @property
    def transformation(self) -> str:
        """Message transformation function"""
```

Enqueue visibility constants:

```python { .api }
ENQ_IMMEDIATE: int  # Message visible immediately
ENQ_ON_COMMIT: int  # Message visible after commit
```

Message delivery mode constants:

```python { .api }
MSG_PERSISTENT: int              # Persistent messages (stored in database)
MSG_BUFFERED: int                # Buffered messages (stored in memory)
MSG_PERSISTENT_OR_BUFFERED: int  # Either persistent or buffered (dequeue only)
```

Usage examples:

```python
# Configure enqueue options
queue.enqOptions.visibility = cx_Oracle.ENQ_ON_COMMIT
queue.enqOptions.deliveryMode = cx_Oracle.MSG_PERSISTENT

# Enqueue with immediate visibility (visible to consumers without a commit)
queue.enqOptions.visibility = cx_Oracle.ENQ_IMMEDIATE
msg_props = connection.msgproperties()
msg_props.payload = b"Urgent message"
queue.enqOne(msg_props)

# Enqueue buffered message for high performance
# (buffered messages require ENQ_IMMEDIATE visibility, which is set above)
queue.enqOptions.deliveryMode = cx_Oracle.MSG_BUFFERED
msg_props.payload = b"High-volume message"
queue.enqOne(msg_props)
```

### Dequeue Options

Configure message dequeuing behavior and filtering.

```python { .api }
class DeqOptions:
    @property
    def condition(self) -> str:
        """Dequeue condition (WHERE clause)"""

    @property
    def consumername(self) -> str:
        """Consumer name for multi-consumer queues"""

    @property
    def correlation(self) -> str:
        """Correlation filter"""

    @property
    def mode(self) -> int:
        """Dequeue mode"""

    @property
    def navigation(self) -> int:
        """Navigation mode"""

    @property
    def transformation(self) -> str:
        """Message transformation function"""

    @property
    def visibility(self) -> int:
        """Transaction visibility"""

    @property
    def wait(self) -> int:
        """Wait time in seconds"""

    @property
    def msgid(self) -> bytes:
        """Specific message ID to dequeue"""
```

Dequeue mode constants:

```python { .api }
DEQ_BROWSE: int         # Browse message without removing
DEQ_LOCKED: int         # Lock message for processing
DEQ_REMOVE: int         # Remove message from queue
DEQ_REMOVE_NODATA: int  # Remove message without returning data
```

Navigation constants:

```python { .api }
DEQ_FIRST_MSG: int         # Get first message
DEQ_NEXT_MSG: int          # Get next message
DEQ_NEXT_TRANSACTION: int  # Skip the rest of the current transaction group; get the first message of the next one
```

Dequeue visibility constants:

```python { .api }
DEQ_IMMEDIATE: int  # Changes visible immediately
DEQ_ON_COMMIT: int  # Changes visible after commit
```

Wait time constants:

```python { .api }
DEQ_NO_WAIT: int       # Don't wait if no message available
DEQ_WAIT_FOREVER: int  # Wait indefinitely for message (default)
```

Usage examples:

```python
# Configure dequeue options for selective message processing
queue.deqOptions.correlation = "ORDER_12345"  # Only messages with this correlation
queue.deqOptions.condition = "priority <= 3"  # Only higher-priority messages (smaller number = higher priority)
queue.deqOptions.wait = 30                    # Wait up to 30 seconds

# Dequeue with filtering
msg = queue.deqOne()
if msg:
    print(f"High-priority order message: {msg.payload}")

# Browse messages without removing them
# (the correlation and condition filters set above still apply)
queue.deqOptions.mode = cx_Oracle.DEQ_BROWSE
queue.deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
queue.deqOptions.wait = cx_Oracle.DEQ_NO_WAIT  # End the loop as soon as nothing is left

browsed_msg = queue.deqOne()
while browsed_msg:
    print(f"Browsing message: {browsed_msg.correlation}")
    queue.deqOptions.navigation = cx_Oracle.DEQ_NEXT_MSG
    browsed_msg = queue.deqOne()

# Reset to normal dequeue mode
queue.deqOptions.mode = cx_Oracle.DEQ_REMOVE
queue.deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG

# Consumer-specific dequeue for multi-consumer queues
queue.deqOptions.consumername = "CONSUMER_A"
consumer_msg = queue.deqOne()

# Dequeue specific message by ID
# (msg_id is a message ID saved earlier from a MessageProperties object)
if msg_id:
    queue.deqOptions.msgid = msg_id
    specific_msg = queue.deqOne()
```
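
Because `deqOptions` belongs to the queue object, filters set for one dequeue remain active for every later call until they are reset, as the "Reset to normal dequeue mode" step above shows. One way to scope such changes is a context manager that restores the previous values on exit. This is a sketch under assumptions: `temporary_deq_options` is a hypothetical helper, and it only works for option attributes that can be read back as well as written.

```python
from contextlib import contextmanager


@contextmanager
def temporary_deq_options(queue, **overrides):
    """Apply dequeue-option overrides, then restore the previous values.

    Assumes every overridden attribute of queue.deqOptions is readable,
    so its current value can be saved before it is changed.
    """
    options = queue.deqOptions
    saved = {name: getattr(options, name) for name in overrides}
    try:
        for name, value in overrides.items():
            setattr(options, name, value)
        yield queue
    finally:
        # Restore the saved values even if the body raised an exception.
        for name, value in saved.items():
            setattr(options, name, value)
```

With this helper, `with temporary_deq_options(queue, correlation="ORDER_12345", wait=30) as q: msg = q.deqOne()` applies the filter for that one call only.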

### Object-Type Messages

Work with structured Oracle object types as message payloads.

```python
# Define object type in database first:
# CREATE TYPE employee_msg_type AS OBJECT (
#     emp_id NUMBER,
#     emp_name VARCHAR2(100),
#     department VARCHAR2(50),
#     action VARCHAR2(20)
# );

# Get object type and create queue
emp_msg_type = connection.gettype("EMPLOYEE_MSG_TYPE")
emp_queue = connection.queue("employee_updates", emp_msg_type)

# Create object message
emp_obj = emp_msg_type.newobject()
emp_obj.EMP_ID = 1001
emp_obj.EMP_NAME = "John Doe"
emp_obj.DEPARTMENT = "Engineering"
emp_obj.ACTION = "HIRE"

# Enqueue object message
msg_props = connection.msgproperties()
msg_props.payload = emp_obj
emp_queue.enqOne(msg_props)
connection.commit()

# Dequeue object message
received_msg = emp_queue.deqOne()
if received_msg:
    emp_data = received_msg.payload
    print(f"Employee {emp_data.EMP_ID}: {emp_data.EMP_NAME}")
    print(f"Department: {emp_data.DEPARTMENT}")
    print(f"Action: {emp_data.ACTION}")
    connection.commit()
```
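
Setting each attribute by hand becomes repetitive when the data already lives in a dictionary. The helpers below are an illustrative sketch, not part of cx_Oracle: `fill_object` and `object_to_dict` are hypothetical names, and they assume the upper-case attribute names used in the example above.

```python
def fill_object(obj, values):
    """Copy dictionary entries onto an object's attributes.

    Keys are upper-cased to match attribute names such as EMP_ID.
    """
    for key, value in values.items():
        setattr(obj, key.upper(), value)
    return obj


def object_to_dict(obj, attribute_names):
    """Read the named attributes back into a dictionary with lower-case keys."""
    return {name.lower(): getattr(obj, name.upper()) for name in attribute_names}
```

For example, `fill_object(emp_msg_type.newobject(), {"emp_id": 1001, "action": "HIRE"})` would build the payload, and `object_to_dict(received_msg.payload, ["emp_id", "action"])` would turn a dequeued payload back into plain Python data.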

### Bulk Message Operations

Efficiently handle multiple messages with batch operations.

```python
# Enqueue multiple messages
messages = []
for i in range(100):
    msg_props = connection.msgproperties()
    msg_props.payload = f"Message {i}".encode()
    msg_props.correlation = f"BATCH_{i // 10}"  # Group by batch
    messages.append(msg_props)

# Bulk enqueue
queue.enqMany(messages)
connection.commit()
print(f"Enqueued {len(messages)} messages")

# Bulk dequeue
max_messages = 50
received_messages = queue.deqMany(max_messages)
print(f"Dequeued {len(received_messages)} messages")

for msg in received_messages:
    print(f"Processing: {msg.payload.decode()}")

connection.commit()
```
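
For large or unbounded inputs, building one list for a single `enqMany()` call may hold more in memory and in one transaction than is comfortable. A possible variant enqueues in fixed-size batches and commits after each one. This is a sketch only: `enqueue_in_batches` and its `batch_size` default are assumptions, not library features.

```python
def enqueue_in_batches(connection, queue, payloads, batch_size=50):
    """Enqueue payloads with enqMany() in fixed-size batches.

    Commits after each batch and returns the number of messages enqueued.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    total = 0
    batch = []
    for payload in payloads:
        msg_props = connection.msgproperties()
        msg_props.payload = payload
        batch.append(msg_props)
        if len(batch) == batch_size:
            queue.enqMany(batch)
            connection.commit()
            total += len(batch)
            batch = []

    if batch:  # Flush the final, partially filled batch
        queue.enqMany(batch)
        connection.commit()
        total += len(batch)
    return total
```

Committing per batch trades atomicity for smaller transactions: if a later batch fails, earlier batches are already in the queue.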

## Message States and Lifecycle

Message state constants indicate message processing status:

```python { .api }
MSG_EXPIRED: int    # Message has expired
MSG_READY: int      # Message ready for dequeue
MSG_PROCESSED: int  # Message has been processed
MSG_WAITING: int    # Message waiting for delay/schedule
```

Special constants for message timing:

```python { .api }
MSG_NO_DELAY: int       # No delivery delay (0)
MSG_NO_EXPIRATION: int  # No expiration time (-1)
```

Usage examples:

```python
# Check message state
received_msg = queue.deqOne()
if received_msg:
    if received_msg.state == cx_Oracle.MSG_READY:
        print("Message is ready for processing")
    elif received_msg.state == cx_Oracle.MSG_EXPIRED:
        print("Message has expired")
    elif received_msg.state == cx_Oracle.MSG_PROCESSED:
        print("Message already processed")

# Set message with no expiration
msg_props = connection.msgproperties()
msg_props.payload = b"Persistent message"
msg_props.expiration = cx_Oracle.MSG_NO_EXPIRATION
queue.enqOne(msg_props)
```

## Advanced AQ Patterns

### Message Correlation and Filtering

Route and filter messages by correlation ID and priority:

```python
import json

# These functions assume a module-level `connection` (an open cx_Oracle connection).

def send_order_updates(queue, orders):
    """Send order update messages with correlation IDs"""
    for order in orders:
        msg_props = connection.msgproperties()
        msg_props.payload = json.dumps(order).encode()
        msg_props.correlation = f"ORDER_{order['order_id']}"
        # Smaller numbers mean higher priority, so default to a low-priority value
        msg_props.priority = order.get('priority', 5)
        queue.enqOne(msg_props)
    connection.commit()

def process_high_priority_orders(queue):
    """Process only high-priority order messages"""
    queue.deqOptions.condition = "priority <= 2"  # Smaller number = higher priority
    queue.deqOptions.wait = 10  # Wait 10 seconds

    while True:
        msg = queue.deqOne()
        if not msg:
            break

        order_data = json.loads(msg.payload.decode())
        print(f"Processing high-priority order: {order_data['order_id']}")
        connection.commit()

def process_specific_order(queue, order_id):
    """Process messages for specific order"""
    queue.deqOptions.correlation = f"ORDER_{order_id}"

    msg = queue.deqOne()
    if msg:
        order_data = json.loads(msg.payload.decode())
        print(f"Processing order {order_id}: {order_data}")
        connection.commit()
    else:
        print(f"No messages found for order {order_id}")
```

### Exception Handling and Dead Letter Queues

Handle message processing failures:

```python
# These functions assume a module-level `connection` (an open cx_Oracle connection).

def setup_exception_handling(queue):
    """Configure exception queue for failed messages"""
    # Set exception queue for failed message handling
    msg_props = connection.msgproperties()
    msg_props.exceptionQ = "failed_messages_queue"
    msg_props.payload = b"Message that might fail"
    queue.enqOne(msg_props)

def process_with_retry(queue, max_retries=3):
    """Process messages with retry logic"""
    msg = queue.deqOne()
    if not msg:
        return

    try:
        # Simulate message processing
        payload = msg.payload.decode()
        if "fail" in payload.lower():
            raise Exception("Simulated processing failure")

        print(f"Successfully processed: {payload}")
        connection.commit()

    except Exception as e:
        print(f"Processing failed: {e}")

        if msg.attempts < max_retries:
            # Rollback to retry message
            connection.rollback()
            print(f"Message will be retried (attempt {msg.attempts + 1})")
        else:
            # Give up: log the failure and commit so the message leaves the queue
            print(f"Message failed after {max_retries} attempts")
            connection.commit()  # Remove from queue
```

### Multi-Consumer Queues

Dequeue from a multi-consumer queue as a named consumer. Each consumer of a multi-consumer queue receives its own copy of every message:

```python
import json
import time

# These functions assume a module-level `connection` (an open cx_Oracle connection).

def setup_multi_consumer_processing():
    """Setup multiple consumers for parallel processing"""
    consumers = ["WORKER_1", "WORKER_2", "WORKER_3"]

    # Each consumer processes messages independently
    for consumer_name in consumers:
        # In practice, each consumer would run in separate process/thread
        process_as_consumer(consumer_name)

def process_as_consumer(consumer_name):
    """Process messages as specific consumer"""
    queue = connection.queue("work_queue")
    queue.deqOptions.consumername = consumer_name
    queue.deqOptions.wait = 30

    print(f"Consumer {consumer_name} starting...")

    while True:
        msg = queue.deqOne()
        if not msg:
            print(f"Consumer {consumer_name}: No messages, waiting...")
            continue

        try:
            work_item = json.loads(msg.payload.decode())
            print(f"Consumer {consumer_name} processing: {work_item['task_id']}")

            # Simulate work processing
            time.sleep(work_item.get('duration', 1))

            print(f"Consumer {consumer_name} completed: {work_item['task_id']}")
            connection.commit()

        except Exception as e:
            print(f"Consumer {consumer_name} error: {e}")
            connection.rollback()
```
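
The consumer loop above never returns, which makes it awkward for batch jobs and tests. A bounded variant might stop after a number of consecutive empty dequeues. This is a sketch under assumptions: `consume`, `handler`, and `max_idle_polls` are hypothetical names, and the queue is expected to return `None` when its configured wait time elapses.

```python
def consume(connection, queue, handler, max_idle_polls=3):
    """Dequeue and handle messages until the queue stays empty.

    Commits after each successful handler call and rolls back on failure.
    Stops after `max_idle_polls` consecutive empty dequeues and returns
    the number of messages handled successfully.
    """
    handled = 0
    idle_polls = 0
    while idle_polls < max_idle_polls:
        msg = queue.deqOne()
        if msg is None:
            idle_polls += 1
            continue
        idle_polls = 0
        try:
            handler(msg)
            connection.commit()
            handled += 1
        except Exception as exc:
            # Rolling back returns the message to the queue for a later attempt.
            print(f"Handler failed: {exc}")
            connection.rollback()
    return handled
```

Note that a message whose handler always fails is redelivered after each rollback, so a real consumer would also want a retry limit such as the `attempts` check shown in the exception-handling example.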

## Error Handling

Handle AQ-specific errors and exceptions:

```python
try:
    # AQ operations
    msg = queue.deqOne()
    if msg:
        # Process message
        connection.commit()

except cx_Oracle.DatabaseError as e:
    error_obj, = e.args

    if error_obj.code == 25228:    # ORA-25228: timeout or end-of-fetch during dequeue
        print("No messages in queue")
    elif error_obj.code == 24010:  # ORA-24010: queue does not exist
        print("Queue not found")
    elif error_obj.code == 25207:  # ORA-25207: queue is disabled for enqueueing
        print("Failed to enqueue message")
    elif error_obj.code == 25226:  # ORA-25226: queue is not enabled for dequeue
        print("Queue is not enabled for dequeue")
    else:
        print(f"AQ error {error_obj.code}: {error_obj.message}")

except Exception as e:
    print(f"Unexpected error: {e}")
    connection.rollback()

finally:
    # Ensure proper cleanup
    if connection:
        connection.rollback()  # Discard any uncommitted changes (no effect after a commit)
```

## AQ Best Practices

1. **Use appropriate delivery modes**: Choose between persistent and buffered messages based on reliability requirements
2. **Implement proper exception handling**: Use exception queues for failed messages
3. **Optimize message filtering**: Use correlation IDs and conditions to reduce processing overhead
4. **Handle transaction boundaries**: Commit or rollback appropriately after message processing
5. **Monitor queue depth**: Track message accumulation to identify processing bottlenecks
6. **Use bulk operations**: Leverage `enqMany()` and `deqMany()` for high-throughput scenarios
7. **Set appropriate timeouts**: Configure wait times based on application requirements
8. **Implement retry logic**: Handle transient failures with exponential backoff
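
The exponential backoff mentioned in the last item can be sketched in a few lines. The helper names (`backoff_delays`, `retry_with_backoff`) and the default values are assumptions chosen for illustration. Production code would normally catch only errors known to be transient rather than every `Exception`.

```python
import time


def backoff_delays(count, base=0.5, factor=2.0, cap=30.0):
    """Yield `count` delays: base, base*factor, ... each limited to `cap`."""
    delay = base
    for _ in range(count):
        yield min(delay, cap)
        delay *= factor


def retry_with_backoff(operation, max_attempts=5, sleep=time.sleep):
    """Call `operation` until it succeeds or `max_attempts` is reached.

    Sleeps between attempts using backoff_delays() and re-raises the
    last exception when every attempt has failed.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    delays = list(backoff_delays(max_attempts - 1))
    for attempt in range(max_attempts):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # Out of attempts: surface the original error
            sleep(delays[attempt])
```

An enqueue could then be wrapped as `retry_with_backoff(lambda: queue.enqOne(msg_props))`. The `sleep` parameter exists so tests can substitute a recording function for `time.sleep`.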