CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-cx-oracle

Python interface to Oracle Database implementing DB API 2.0 with Oracle-specific extensions

Pending
Overview
Eval results
Files

advanced-queueing.mddocs/

Advanced Queueing (AQ)

Oracle Advanced Queueing for message-oriented middleware with message enqueueing, dequeueing, and queue management operations providing reliable, persistent message delivery.

Capabilities

Queue Management

Access and manage Oracle Advanced Queues for message processing.

class Connection:
    def queue(self, name: str, payloadType=None) -> Queue:
        """
        Access Oracle Advanced Queue.
        
        Parameters:
        - name (str): Queue name
        - payloadType: Message payload type (ObjectType or None for RAW)
        
        Returns:
        Queue object for message operations
        """
class Queue:
    @property
    def name(self) -> str:
        """Queue name"""
        
    @property
    def enqOptions(self) -> EnqOptions:
        """Enqueue options object"""
        
    @property
    def deqOptions(self) -> DeqOptions:
        """Dequeue options object"""
        
    @property
    def payloadType(self) -> ObjectType:
        """Message payload type"""
        
    def enqOne(self, msgProperties: MessageProperties) -> None:
        """
        Enqueue single message.
        
        Parameters:
        - msgProperties: Message properties and payload
        """
        
    def enqMany(self, msgPropertiesList: list) -> None:
        """
        Enqueue multiple messages.
        
        Parameters:
        - msgPropertiesList (list): List of MessageProperties objects
        """
        
    def deqOne(self) -> MessageProperties:
        """
        Dequeue single message.
        
        Returns:
        MessageProperties object or None if no message available
        """
        
    def deqMany(self, maxMessages: int) -> list:
        """
        Dequeue multiple messages.
        
        Parameters:
        - maxMessages (int): Maximum number of messages to dequeue
        
        Returns:
        List of MessageProperties objects
        """

Usage examples:

# Access queue for RAW messages
raw_queue = connection.queue("my_raw_queue")

# Access queue with object payload type
emp_type = connection.gettype("EMPLOYEE_TYPE")
obj_queue = connection.queue("my_object_queue", emp_type)

# Basic message enqueueing
msg_props = connection.msgproperties()
msg_props.payload = b"Hello, World!"
raw_queue.enqOne(msg_props)
connection.commit()

# Basic message dequeueing
received_msg = raw_queue.deqOne()
if received_msg:
    print(f"Received: {received_msg.payload}")
    connection.commit()

Message Properties

Configure message properties for delivery and processing control.

class Connection:
    def msgproperties(self) -> MessageProperties:
        """
        Create message properties object.
        
        Returns:
        MessageProperties object for configuring messages
        """
class MessageProperties:
    @property
    def payload(self):
        """Message payload (bytes, str, or Oracle object)"""
        
    @property
    def correlation(self) -> str:
        """Message correlation identifier"""
        
    @property
    def delay(self) -> int:
        """Delivery delay in seconds"""
        
    @property
    def exceptionQueue(self) -> str:
        """Exception queue name for failed messages"""
        
    @property
    def expiration(self) -> int:
        """Message expiration time in seconds"""
        
    @property
    def priority(self) -> int:
        """Message priority (higher numbers = higher priority)"""
        
    @property
    def attempts(self) -> int:
        """Number of dequeue attempts (read-only)"""
        
    @property
    def state(self) -> int:
        """Message state (read-only)"""
        
    @property
    def deliveryMode(self) -> int:
        """Message delivery mode"""
        
    @property
    def enqTime(self) -> datetime:
        """Enqueue timestamp (read-only)"""
        
    @property
    def msgId(self) -> bytes:
        """Message ID (read-only)"""

Usage examples:

# Create message with properties
msg_props = connection.msgproperties()
msg_props.payload = b"Important message"
msg_props.priority = 5  # Higher priority
msg_props.correlation = "ORDER_12345"
msg_props.delay = 60  # Delay delivery by 60 seconds
msg_props.expiration = 3600  # Expire after 1 hour

# Enqueue message
queue.enqOne(msg_props)
connection.commit()

# Dequeue and examine message properties
received = queue.deqOne()
if received:
    print(f"Message ID: {received.msgId}")
    print(f"Correlation: {received.correlation}")
    print(f"Priority: {received.priority}")
    print(f"Enqueue time: {received.enqTime}")
    print(f"Attempts: {received.attempts}")
    print(f"State: {received.state}")

Enqueue Options

Configure message enqueuing behavior and transaction handling.

class EnqOptions:
    @property
    def visibility(self) -> int:
        """Transaction visibility (ENQ_IMMEDIATE, ENQ_ON_COMMIT)"""
        
    @property
    def deliveryMode(self) -> int:
        """Delivery mode (MSG_PERSISTENT, MSG_BUFFERED, MSG_PERSISTENT_OR_BUFFERED)"""
        
    @property
    def transformation(self) -> str:
        """Message transformation function"""

Enqueue visibility constants:

ENQ_IMMEDIATE: int      # Message visible immediately
ENQ_ON_COMMIT: int      # Message visible after commit

Message delivery mode constants:

MSG_PERSISTENT: int                 # Persistent messages (stored in database)
MSG_BUFFERED: int                   # Buffered messages (stored in memory)
MSG_PERSISTENT_OR_BUFFERED: int     # Either persistent or buffered

Usage examples:

# Configure enqueue options
queue.enqOptions.visibility = cx_Oracle.ENQ_ON_COMMIT
queue.enqOptions.deliveryMode = cx_Oracle.MSG_PERSISTENT

# Enqueue with immediate visibility
queue.enqOptions.visibility = cx_Oracle.ENQ_IMMEDIATE
msg_props = connection.msgproperties()
msg_props.payload = b"Urgent message"
queue.enqOne(msg_props)

# Enqueue buffered message for high performance
queue.enqOptions.deliveryMode = cx_Oracle.MSG_BUFFERED
msg_props.payload = b"High-volume message"
queue.enqOne(msg_props)

Dequeue Options

Configure message dequeuing behavior and filtering.

class DeqOptions:
    @property
    def condition(self) -> str:
        """Dequeue condition (WHERE clause)"""
        
    @property
    def consumername(self) -> str:
        """Consumer name for multi-consumer queues"""
        
    @property
    def correlation(self) -> str:
        """Correlation filter"""
        
    @property
    def mode(self) -> int:
        """Dequeue mode"""
        
    @property
    def navigation(self) -> int:
        """Navigation mode"""
        
    @property
    def transformation(self) -> str:
        """Message transformation function"""
        
    @property
    def visibility(self) -> int:
        """Transaction visibility"""
        
    @property
    def wait(self) -> int:
        """Wait time in seconds"""
        
    @property
    def msgid(self) -> bytes:
        """Specific message ID to dequeue"""

Dequeue mode constants:

DEQ_BROWSE: int             # Browse message without removing
DEQ_LOCKED: int             # Lock message for processing
DEQ_REMOVE: int             # Remove message from queue
DEQ_REMOVE_NODATA: int      # Remove message without returning data

Navigation constants:

DEQ_FIRST_MSG: int          # Get first message
DEQ_NEXT_MSG: int           # Get next message
DEQ_NEXT_TRANSACTION: int   # Get next message in different transaction

Dequeue visibility constants:

DEQ_IMMEDIATE: int          # Changes visible immediately
DEQ_ON_COMMIT: int          # Changes visible after commit

Wait time constants:

DEQ_NO_WAIT: int            # Don't wait if no message available
DEQ_WAIT_FOREVER: int       # Wait indefinitely for message

Usage examples:

# Configure dequeue options for selective message processing
queue.deqOptions.correlation = "ORDER_12345"  # Only messages with this correlation
queue.deqOptions.condition = "priority > 3"   # Only high-priority messages
queue.deqOptions.wait = 30  # Wait up to 30 seconds

# Dequeue with filtering
msg = queue.deqOne()
if msg:
    print(f"High-priority order message: {msg.payload}")

# Browse messages without removing them
queue.deqOptions.mode = cx_Oracle.DEQ_BROWSE
queue.deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG

browsed_msg = queue.deqOne()
while browsed_msg:
    print(f"Browsing message: {browsed_msg.correlation}")
    queue.deqOptions.navigation = cx_Oracle.DEQ_NEXT_MSG
    browsed_msg = queue.deqOne()

# Reset to normal dequeue mode
queue.deqOptions.mode = cx_Oracle.DEQ_REMOVE
queue.deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG

# Consumer-specific dequeue for multi-consumer queues
queue.deqOptions.consumername = "CONSUMER_A"
consumer_msg = queue.deqOne()

# Dequeue specific message by ID
if msg_id:
    queue.deqOptions.msgid = msg_id
    specific_msg = queue.deqOne()

Object-Type Messages

Work with structured Oracle object types as message payloads.

# Define object type in database first:
# CREATE TYPE employee_msg_type AS OBJECT (
#   emp_id NUMBER,
#   emp_name VARCHAR2(100),
#   department VARCHAR2(50),
#   action VARCHAR2(20)
# );

# Get object type and create queue
emp_msg_type = connection.gettype("EMPLOYEE_MSG_TYPE")
emp_queue = connection.queue("employee_updates", emp_msg_type)

# Create object message
emp_obj = emp_msg_type.newobject()
emp_obj.EMP_ID = 1001
emp_obj.EMP_NAME = "John Doe"
emp_obj.DEPARTMENT = "Engineering"
emp_obj.ACTION = "HIRE"

# Enqueue object message
msg_props = connection.msgproperties()
msg_props.payload = emp_obj
emp_queue.enqOne(msg_props)
connection.commit()

# Dequeue object message
received_msg = emp_queue.deqOne()
if received_msg:
    emp_data = received_msg.payload
    print(f"Employee {emp_data.EMP_ID}: {emp_data.EMP_NAME}")
    print(f"Department: {emp_data.DEPARTMENT}")
    print(f"Action: {emp_data.ACTION}")
    connection.commit()

Bulk Message Operations

Efficiently handle multiple messages with batch operations.

# Enqueue multiple messages
messages = []
for i in range(100):
    msg_props = connection.msgproperties()
    msg_props.payload = f"Message {i}".encode()
    msg_props.correlation = f"BATCH_{i // 10}"  # Group by batch
    messages.append(msg_props)

# Bulk enqueue
queue.enqMany(messages)
connection.commit()
print(f"Enqueued {len(messages)} messages")

# Bulk dequeue
max_messages = 50
received_messages = queue.deqMany(max_messages)
print(f"Dequeued {len(received_messages)} messages")

for msg in received_messages:
    print(f"Processing: {msg.payload.decode()}")
    
connection.commit()

Message States and Lifecycle

Message state constants indicate message processing status:

MSG_EXPIRED: int        # Message has expired
MSG_READY: int          # Message ready for dequeue
MSG_PROCESSED: int      # Message has been processed
MSG_WAITING: int        # Message waiting for delay/schedule

Special constants for message timing:

MSG_NO_DELAY: int       # No delivery delay (0)
MSG_NO_EXPIRATION: int  # No expiration time (-1)

Usage examples:

# Check message state
received_msg = queue.deqOne()
if received_msg:
    if received_msg.state == cx_Oracle.MSG_READY:
        print("Message is ready for processing")
    elif received_msg.state == cx_Oracle.MSG_EXPIRED:
        print("Message has expired")
    elif received_msg.state == cx_Oracle.MSG_PROCESSED:
        print("Message already processed")

# Set message with no expiration
msg_props = connection.msgproperties()
msg_props.payload = b"Persistent message"
msg_props.expiration = cx_Oracle.MSG_NO_EXPIRATION
queue.enqOne(msg_props)

Advanced AQ Patterns

Message Correlation and Filtering

Implement sophisticated message routing and filtering:

def send_order_updates(queue, orders):
    """Send order update messages with correlation IDs"""
    for order in orders:
        msg_props = connection.msgproperties()
        msg_props.payload = json.dumps(order).encode()
        msg_props.correlation = f"ORDER_{order['order_id']}"
        msg_props.priority = order.get('priority', 1)
        queue.enqOne(msg_props)
    connection.commit()

def process_high_priority_orders(queue):
    """Process only high-priority order messages"""
    queue.deqOptions.condition = "priority >= 5"
    queue.deqOptions.wait = 10  # Wait 10 seconds
    
    while True:
        msg = queue.deqOne()
        if not msg:
            break
            
        order_data = json.loads(msg.payload.decode())
        print(f"Processing high-priority order: {order_data['order_id']}")
        connection.commit()

def process_specific_order(queue, order_id):
    """Process messages for specific order"""
    queue.deqOptions.correlation = f"ORDER_{order_id}"
    
    msg = queue.deqOne()
    if msg:
        order_data = json.loads(msg.payload.decode())
        print(f"Processing order {order_id}: {order_data}")
        connection.commit()
    else:
        print(f"No messages found for order {order_id}")

Exception Handling and Dead Letter Queues

Handle message processing failures:

def setup_exception_handling(queue):
    """Configure exception queue for failed messages"""
    # Set exception queue for failed message handling
    msg_props = connection.msgproperties()
    msg_props.exceptionQueue = "failed_messages_queue"
    msg_props.payload = b"Message that might fail"
    queue.enqOne(msg_props)

def process_with_retry(queue, max_retries=3):
    """Process messages with retry logic"""
    msg = queue.deqOne()
    if not msg:
        return
        
    try:
        # Simulate message processing
        payload = msg.payload.decode()
        if "fail" in payload.lower():
            raise Exception("Simulated processing failure")
            
        print(f"Successfully processed: {payload}")
        connection.commit()
        
    except Exception as e:
        print(f"Processing failed: {e}")
        
        if msg.attempts < max_retries:
            # Rollback to retry message
            connection.rollback()
            print(f"Message will be retried (attempt {msg.attempts + 1})")
        else:
            # Move to exception queue or log error
            print(f"Message failed after {max_retries} attempts")
            connection.commit()  # Remove from queue

Multi-Consumer Queues

Implement message distribution across multiple consumers:

def setup_multi_consumer_processing():
    """Setup multiple consumers for parallel processing"""
    consumers = ["WORKER_1", "WORKER_2", "WORKER_3"]
    
    # Each consumer processes messages independently
    for consumer_name in consumers:
        # In practice, each consumer would run in separate process/thread
        process_as_consumer(consumer_name)

def process_as_consumer(consumer_name):
    """Process messages as specific consumer"""
    queue = connection.queue("work_queue")
    queue.deqOptions.consumername = consumer_name
    queue.deqOptions.wait = 30
    
    print(f"Consumer {consumer_name} starting...")
    
    while True:
        msg = queue.deqOne()
        if not msg:
            print(f"Consumer {consumer_name}: No messages, waiting...")
            continue
            
        try:
            work_item = json.loads(msg.payload.decode())
            print(f"Consumer {consumer_name} processing: {work_item['task_id']}")
            
            # Simulate work processing
            import time
            time.sleep(work_item.get('duration', 1))
            
            print(f"Consumer {consumer_name} completed: {work_item['task_id']}")
            connection.commit()
            
        except Exception as e:
            print(f"Consumer {consumer_name} error: {e}")
            connection.rollback()

Error Handling

Handle AQ-specific errors and exceptions:

try:
    # AQ operations
    msg = queue.deqOne()
    if msg:
        # Process message
        connection.commit()
        
except cx_Oracle.DatabaseError as e:
    error_obj, = e.args
    
    if error_obj.code == 25228:  # No message available
        print("No messages in queue")
    elif error_obj.code == 25235:  # Queue does not exist
        print("Queue not found")
    elif error_obj.code == 25207:  # Enqueue failed
        print("Failed to enqueue message")
    elif error_obj.code == 25237:  # Message expired
        print("Message has expired")
    else:
        print(f"AQ error {error_obj.code}: {error_obj.message}")
        
except Exception as e:
    print(f"Unexpected error: {e}")
    connection.rollback()
    
finally:
    # Ensure proper cleanup
    if connection:
        connection.rollback()  # Rollback any pending changes on error

AQ Best Practices

  1. Use appropriate delivery modes: Choose between persistent and buffered messages based on reliability requirements
  2. Implement proper exception handling: Use exception queues for failed messages
  3. Optimize message filtering: Use correlation IDs and conditions to reduce processing overhead
  4. Handle transaction boundaries: Commit or rollback appropriately after message processing
  5. Monitor queue depth: Track message accumulation to identify processing bottlenecks
  6. Use bulk operations: Leverage enqMany() and deqMany() for high-throughput scenarios
  7. Set appropriate timeouts: Configure wait times based on application requirements
  8. Implement retry logic: Handle transient failures with exponential backoff

Install with Tessl CLI

npx tessl i tessl/pypi-cx-oracle

docs

advanced-queueing.md

connections.md

cursors.md

index.md

lobs.md

notifications.md

object-types.md

session-pools.md

soda.md

tile.json