Python interface to Oracle Database implementing DB API 2.0 with Oracle-specific extensions
—
Oracle Advanced Queuing (AQ) support for message-oriented middleware: enqueue, dequeue, and queue-management operations that provide reliable, persistent message delivery.
Access and manage Oracle Advanced Queues for message processing.
class Connection:
    def queue(self, name: str, payloadType=None) -> "Queue":
        """
        Access an Oracle Advanced Queue.

        Parameters:
        - name (str): Queue name
        - payloadType: Message payload type (ObjectType or None for RAW)

        Returns:
        Queue object for message operations
        """


class Queue:
    @property
    def name(self) -> str:
        """Queue name"""

    @property
    def enqOptions(self) -> "EnqOptions":
        """Enqueue options object"""

    @property
    def deqOptions(self) -> "DeqOptions":
        """Dequeue options object"""

    @property
    def payloadType(self) -> "ObjectType":
        """Message payload type"""

    def enqOne(self, msgProperties: "MessageProperties") -> None:
        """
        Enqueue a single message.

        Parameters:
        - msgProperties: Message properties and payload
        """

    def enqMany(self, msgPropertiesList: list) -> None:
        """
        Enqueue multiple messages.

        Parameters:
        - msgPropertiesList (list): List of MessageProperties objects
        """
        # NOTE(review): cx_Oracle's documentation warns against calling
        # enqMany() in parallel on connections acquired from the same pool
        # (Oracle bug 29928074) - confirm for the client version in use.

    def deqOne(self) -> "MessageProperties | None":
        """
        Dequeue a single message.

        Returns:
        MessageProperties object or None if no message available
        """

    def deqMany(self, maxMessages: int) -> list:
        """
        Dequeue multiple messages.

        Parameters:
        - maxMessages (int): Maximum number of messages to dequeue

        Returns:
        List of MessageProperties objects
        """


# Usage examples:
# Open a queue whose payload is RAW (no payload type supplied)
raw_queue = connection.queue("my_raw_queue")

# Open a queue whose payload is an Oracle object type
emp_type = connection.gettype("EMPLOYEE_TYPE")
obj_queue = connection.queue("my_object_queue", emp_type)

# Enqueue one RAW message, then commit
msg_props = connection.msgproperties()
msg_props.payload = b"Hello, World!"
raw_queue.enqOne(msg_props)
connection.commit()

# Dequeue one message; deqOne() yields None when nothing is available
received_msg = raw_queue.deqOne()
if received_msg:
    print(f"Received: {received_msg.payload}")
    connection.commit()

# Configure message properties for delivery and processing control.
class Connection:
    def msgproperties(self) -> "MessageProperties":
        """
        Create a message properties object.

        Returns:
        MessageProperties object for configuring messages
        """


class MessageProperties:
    @property
    def payload(self):
        """Message payload (bytes, str, or Oracle object)"""

    @property
    def correlation(self) -> str:
        """Message correlation identifier"""

    @property
    def delay(self) -> int:
        """Delivery delay in seconds"""

    # NOTE(review): cx_Oracle's reference documentation names this attribute
    # `exceptionQ` - confirm before relying on `exceptionQueue`.
    @property
    def exceptionQueue(self) -> str:
        """Exception queue name for failed messages"""

    @property
    def expiration(self) -> int:
        """Message expiration time in seconds"""

    @property
    def priority(self) -> int:
        """Message priority (a smaller number indicates a higher priority)"""

    # NOTE(review): cx_Oracle's reference documentation names this attribute
    # `numAttempts` - confirm before relying on `attempts`.
    @property
    def attempts(self) -> int:
        """Number of dequeue attempts (read-only)"""

    @property
    def state(self) -> int:
        """Message state (read-only)"""

    @property
    def deliveryMode(self) -> int:
        """Message delivery mode"""

    @property
    def enqTime(self) -> "datetime":
        """Enqueue timestamp (read-only)"""

    # NOTE(review): cx_Oracle's reference documentation spells this attribute
    # `msgid` (all lowercase) - confirm before relying on `msgId`.
    @property
    def msgId(self) -> bytes:
        """Message ID (read-only)"""


# Usage examples:
# Build a message and set its delivery properties
msg_props = connection.msgproperties()
msg_props.payload = b"Important message"
msg_props.priority = 5  # Oracle AQ treats a smaller number as a higher priority
msg_props.correlation = "ORDER_12345"
msg_props.delay = 60  # Delay delivery by 60 seconds
msg_props.expiration = 3600  # Expire after 1 hour

# Enqueue the message
queue.enqOne(msg_props)
connection.commit()

# Dequeue and examine the properties of the received message
# NOTE(review): cx_Oracle's reference documentation names the attributes
# `msgid` and `numAttempts` - confirm `msgId` / `attempts` exist.
received = queue.deqOne()
if received:
    print(f"Message ID: {received.msgId}")
    print(f"Correlation: {received.correlation}")
    print(f"Priority: {received.priority}")
    print(f"Enqueue time: {received.enqTime}")
    print(f"Attempts: {received.attempts}")
    print(f"State: {received.state}")

# Configure message enqueuing behavior and transaction handling.
class EnqOptions:
    @property
    def visibility(self) -> int:
        """Transaction visibility (ENQ_IMMEDIATE, ENQ_ON_COMMIT)"""

    @property
    def deliveryMode(self) -> int:
        """Delivery mode (MSG_PERSISTENT, MSG_BUFFERED, MSG_PERSISTENT_OR_BUFFERED)"""

    @property
    def transformation(self) -> str:
        """Message transformation function"""


# Enqueue visibility constants:
ENQ_IMMEDIATE: int  # Message visible immediately
ENQ_ON_COMMIT: int  # Message visible after commit

# Message delivery mode constants:
MSG_PERSISTENT: int  # Persistent messages (stored in database)
MSG_BUFFERED: int  # Buffered messages (stored in memory)
MSG_PERSISTENT_OR_BUFFERED: int  # Either persistent or buffered

# Usage examples:
# Enqueue options live on the queue object and apply to later enqueues
queue.enqOptions.visibility = cx_Oracle.ENQ_ON_COMMIT
queue.enqOptions.deliveryMode = cx_Oracle.MSG_PERSISTENT

# Switch to immediate visibility and enqueue one message
queue.enqOptions.visibility = cx_Oracle.ENQ_IMMEDIATE
msg_props = connection.msgproperties()
msg_props.payload = b"Urgent message"
queue.enqOne(msg_props)

# Switch to buffered delivery; the same properties object is reused
# with a new payload
queue.enqOptions.deliveryMode = cx_Oracle.MSG_BUFFERED
msg_props.payload = b"High-volume message"
queue.enqOne(msg_props)

# Configure message dequeuing behavior and filtering.
class DeqOptions:
    @property
    def condition(self) -> str:
        """Dequeue condition (WHERE clause)"""

    @property
    def consumername(self) -> str:
        """Consumer name for multi-consumer queues"""

    @property
    def correlation(self) -> str:
        """Correlation filter"""

    @property
    def mode(self) -> int:
        """Dequeue mode"""

    @property
    def navigation(self) -> int:
        """Navigation mode"""

    @property
    def transformation(self) -> str:
        """Message transformation function"""

    @property
    def visibility(self) -> int:
        """Transaction visibility"""

    @property
    def wait(self) -> int:
        """Wait time in seconds"""

    @property
    def msgid(self) -> bytes:
        """Specific message ID to dequeue"""


# Dequeue mode constants:
DEQ_BROWSE: int  # Browse message without removing
DEQ_LOCKED: int  # Lock message for processing
DEQ_REMOVE: int  # Remove message from queue
DEQ_REMOVE_NODATA: int  # Remove message without returning data

# Navigation constants:
DEQ_FIRST_MSG: int  # Get first message
DEQ_NEXT_MSG: int  # Get next message
DEQ_NEXT_TRANSACTION: int  # Get next message in different transaction

# Dequeue visibility constants:
DEQ_IMMEDIATE: int  # Changes visible immediately
DEQ_ON_COMMIT: int  # Changes visible after commit

# Wait time constants:
DEQ_NO_WAIT: int  # Don't wait if no message available
DEQ_WAIT_FOREVER: int  # Wait indefinitely for message

# Usage examples:
# Restrict which messages are eligible for dequeue
queue.deqOptions.correlation = "ORDER_12345"  # Only messages with this correlation
queue.deqOptions.condition = "priority > 3"  # Only messages matching this condition
queue.deqOptions.wait = 30  # Wait up to 30 seconds

# Dequeue using the filters configured above
msg = queue.deqOne()
if msg:
    print(f"High-priority order message: {msg.payload}")

# Browse messages without removing them.
# The correlation, condition and 30-second wait set above are not reset
# here, so they still apply to every deqOne() call in this loop.
queue.deqOptions.mode = cx_Oracle.DEQ_BROWSE
queue.deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
browsed_msg = queue.deqOne()
while browsed_msg:
    print(f"Browsing message: {browsed_msg.correlation}")
    queue.deqOptions.navigation = cx_Oracle.DEQ_NEXT_MSG
    browsed_msg = queue.deqOne()

# Reset to normal dequeue mode
queue.deqOptions.mode = cx_Oracle.DEQ_REMOVE
queue.deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG

# Consumer-specific dequeue for multi-consumer queues
queue.deqOptions.consumername = "CONSUMER_A"
consumer_msg = queue.deqOne()

# Dequeue a specific message by ID
# (msg_id is not defined in this snippet; it must come from surrounding code)
if msg_id:
    queue.deqOptions.msgid = msg_id
    specific_msg = queue.deqOne()

# Work with structured Oracle object types as message payloads.
# The object type must already exist in the database:
# CREATE TYPE employee_msg_type AS OBJECT (
#     emp_id NUMBER,
#     emp_name VARCHAR2(100),
#     department VARCHAR2(50),
#     action VARCHAR2(20)
# );

# Look up the object type and open a queue that carries it
emp_msg_type = connection.gettype("EMPLOYEE_MSG_TYPE")
emp_queue = connection.queue("employee_updates", emp_msg_type)

# Build an object instance to use as the payload
emp_obj = emp_msg_type.newobject()
emp_obj.EMP_ID = 1001
emp_obj.EMP_NAME = "John Doe"
emp_obj.DEPARTMENT = "Engineering"
emp_obj.ACTION = "HIRE"

# Enqueue the object message
msg_props = connection.msgproperties()
msg_props.payload = emp_obj
emp_queue.enqOne(msg_props)
connection.commit()

# Dequeue the object message and read its attributes
received_msg = emp_queue.deqOne()
if received_msg:
    emp_data = received_msg.payload
    print(f"Employee {emp_data.EMP_ID}: {emp_data.EMP_NAME}")
    print(f"Department: {emp_data.DEPARTMENT}")
    print(f"Action: {emp_data.ACTION}")
    connection.commit()

# Efficiently handle multiple messages with batch operations.
# Build 100 messages; every ten consecutive messages share one correlation
messages = []
for i in range(100):
    msg_props = connection.msgproperties()
    msg_props.payload = f"Message {i}".encode()
    msg_props.correlation = f"BATCH_{i // 10}"  # Group by batch
    messages.append(msg_props)

# Enqueue the whole list in one call
queue.enqMany(messages)
connection.commit()
print(f"Enqueued {len(messages)} messages")

# Dequeue up to max_messages in one call
max_messages = 50
received_messages = queue.deqMany(max_messages)
print(f"Dequeued {len(received_messages)} messages")
for msg in received_messages:
    print(f"Processing: {msg.payload.decode()}")
connection.commit()

# Message state constants indicate message processing status:
MSG_EXPIRED: int # Message has expired
MSG_READY: int # Message ready for dequeue
MSG_PROCESSED: int # Message has been processed
MSG_WAITING: int # Message waiting for delay/schedule
# Special constants for message timing:
MSG_NO_DELAY: int # No delivery delay (0)
MSG_NO_EXPIRATION: int # No expiration time (-1)
# Usage examples:
# Inspect the state of a dequeued message
received_msg = queue.deqOne()
if received_msg:
    if received_msg.state == cx_Oracle.MSG_READY:
        print("Message is ready for processing")
    elif received_msg.state == cx_Oracle.MSG_EXPIRED:
        print("Message has expired")
    elif received_msg.state == cx_Oracle.MSG_PROCESSED:
        print("Message already processed")

# Enqueue a message that never expires
msg_props = connection.msgproperties()
msg_props.payload = b"Persistent message"
msg_props.expiration = cx_Oracle.MSG_NO_EXPIRATION
queue.enqOne(msg_props)

# Implement sophisticated message routing and filtering:
def send_order_updates(queue, orders):
    """Enqueue one JSON message per order, then commit.

    Parameters:
    - queue: Queue the messages are enqueued on
    - orders: Iterable of dicts; each must have an 'order_id' key and may
      have a 'priority' key (1 is used when it is absent)
    """
    for order in orders:
        props = connection.msgproperties()
        props.payload = json.dumps(order).encode()
        # The correlation ID lets consumers select messages for one order
        props.correlation = f"ORDER_{order['order_id']}"
        props.priority = order.get('priority', 1)
        queue.enqOne(props)
    connection.commit()
def process_high_priority_orders(queue):
    """Dequeue and print order messages whose priority is at least 5.

    Loops until a dequeue returns nothing, committing after each message.

    Parameters:
    - queue: Queue to dequeue from; its deqOptions are modified
    """
    # NOTE(review): Oracle AQ treats a smaller number as a higher priority,
    # so ">= 5" follows this example's own convention - confirm it is intended.
    queue.deqOptions.condition = "priority >= 5"
    queue.deqOptions.wait = 10  # Wait 10 seconds

    msg = queue.deqOne()
    while msg:
        order_data = json.loads(msg.payload.decode())
        print(f"Processing high-priority order: {order_data['order_id']}")
        connection.commit()
        msg = queue.deqOne()
def process_specific_order(queue, order_id):
    """Dequeue and print one message belonging to the given order.

    Parameters:
    - queue: Queue to dequeue from; its deqOptions.correlation is modified
    - order_id: Identifier used to build the correlation filter
    """
    queue.deqOptions.correlation = f"ORDER_{order_id}"
    msg = queue.deqOne()
    if not msg:
        print(f"No messages found for order {order_id}")
        return

    order_data = json.loads(msg.payload.decode())
    print(f"Processing order {order_id}: {order_data}")
    connection.commit()


# Handle message processing failures:
def setup_exception_handling(queue):
    """Enqueue a message that names an exception queue.

    Parameters:
    - queue: Queue the message is enqueued on
    """
    props = connection.msgproperties()
    # Name the queue that should receive the message if it fails.
    # NOTE(review): cx_Oracle's reference documentation names this attribute
    # `exceptionQ` - confirm `exceptionQueue` is accepted.
    props.exceptionQueue = "failed_messages_queue"
    props.payload = b"Message that might fail"
    queue.enqOne(props)
def process_with_retry(queue, max_retries=3):
    """Dequeue one message and process it, rolling back on failure so it
    can be retried.

    Parameters:
    - queue: Queue to dequeue from
    - max_retries (int): Attempt count at which the message is given up on

    Returns immediately when no message is dequeued.
    """
    msg = queue.deqOne()
    if not msg:
        return

    try:
        # Simulate message processing
        payload = msg.payload.decode()
        if "fail" in payload.lower():
            raise Exception("Simulated processing failure")
        print(f"Successfully processed: {payload}")
        connection.commit()
    except Exception as e:
        print(f"Processing failed: {e}")
        # NOTE(review): cx_Oracle's reference documentation names this
        # attribute `numAttempts` - confirm `attempts` exists.
        if msg.attempts >= max_retries:
            # Give up: committing the dequeue removes the message
            print(f"Message failed after {max_retries} attempts")
            connection.commit()  # Remove from queue
        else:
            # Undo the dequeue so the message can be picked up again
            connection.rollback()
            print(f"Message will be retried (attempt {msg.attempts + 1})")


# Implement message distribution across multiple consumers:
def setup_multi_consumer_processing():
    """Run the consumer routine once for each of three named workers."""
    worker_names = ["WORKER_1", "WORKER_2", "WORKER_3"]
    # The workers are started one after another here; in practice each
    # consumer would run in a separate process/thread
    for worker in worker_names:
        process_as_consumer(worker)
def process_as_consumer(consumer_name):
    """Process messages from "work_queue" as the named consumer.

    Runs forever: each message is decoded as JSON, a sleep of
    work_item['duration'] seconds (default 1) simulates the work, and the
    dequeue is committed. Any exception during processing is printed and
    the transaction is rolled back.

    Parameters:
    - consumer_name: Value assigned to deqOptions.consumername
    """
    # Imported once here rather than on every loop iteration
    import time

    queue = connection.queue("work_queue")
    queue.deqOptions.consumername = consumer_name
    queue.deqOptions.wait = 30
    print(f"Consumer {consumer_name} starting...")

    while True:
        msg = queue.deqOne()
        if not msg:
            print(f"Consumer {consumer_name}: No messages, waiting...")
            continue

        try:
            work_item = json.loads(msg.payload.decode())
            print(f"Consumer {consumer_name} processing: {work_item['task_id']}")
            # Simulate work processing
            time.sleep(work_item.get('duration', 1))
            print(f"Consumer {consumer_name} completed: {work_item['task_id']}")
            connection.commit()
        except Exception as e:
            print(f"Consumer {consumer_name} error: {e}")
            connection.rollback()


# Handle AQ-specific errors and exceptions:
try:
    # AQ operations
    msg = queue.deqOne()
    if msg:
        # Process message
        connection.commit()
except cx_Oracle.DatabaseError as e:
    error_obj, = e.args
    if error_obj.code == 25228:  # ORA-25228: timeout or end-of-fetch during dequeue
        print("No messages in queue")
    elif error_obj.code == 24010:  # ORA-24010: queue does not exist
        print("Queue not found")
    elif error_obj.code == 25207:  # ORA-25207: enqueue failed, queue disabled for enqueueing
        print("Failed to enqueue message")
    elif error_obj.code == 25235:  # ORA-25235: fetched all messages in current transaction
        print("All messages in the current transaction have been fetched")
    elif error_obj.code == 25237:  # ORA-25237: navigation option used out of sequence
        print("Dequeue navigation option used out of sequence")
    else:
        print(f"AQ error {error_obj.code}: {error_obj.message}")
except Exception as e:
    print(f"Unexpected error: {e}")
    connection.rollback()
finally:
    # Runs on every path, not only on error; any work already committed
    # above is unaffected, and uncommitted changes are discarded
    if connection:
        connection.rollback()

# Install with Tessl CLI
npx tessl i tessl/pypi-cx-oracle