CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-oracledb

Python interface to Oracle Database with thin and thick connectivity modes

Pending
Overview
Eval results
Files

advanced-queuing.mddocs/

Advanced Queuing (AQ)

Oracle Advanced Queuing (AQ) provides message queuing functionality as an integral part of the Oracle Database. AQ enables applications to use the database as a message broker for reliable, persistent, and transactional message passing. The API supports both synchronous and asynchronous operations, with options for message delivery modes, priorities, and transformation functions.

Capabilities

Queue Access

Access to AQ queues through database connections with support for both synchronous and asynchronous operations.

# Access queues through connection
def queue(self, name: str, payload_type: DbObjectType = None) -> Queue:
    """
    Creates and returns a queue which is used to enqueue and dequeue
    messages in Advanced Queueing (AQ).

    Parameters:
    - name (str): Name of the queue to access
    - payload_type (DbObjectType): Object type for structured payloads

    Returns:
    Queue: Queue object for message operations
    """

# Async version
async def queue(self, name: str, payload_type: DbObjectType = None) -> AsyncQueue: ...

Queue Class

Synchronous queue operations for enqueuing and dequeuing messages.

class Queue:
    # Properties
    connection: Connection  # Read-only connection object
    deqoptions: DeqOptions  # Dequeue options for configuration
    enqoptions: EnqOptions  # Enqueue options for configuration
    name: str  # Read-only queue name
    payload_type: Union[DbObjectType, str, None]  # Read-only payload type

    def deqone(self) -> Union[MessageProperties, None]:
        """
        Dequeues at most one message from the queue and returns it. If no
        message is dequeued, None is returned.

        Returns:
        Union[MessageProperties, None]: Dequeued message or None
        """

    def deqmany(self, max_num_messages: int) -> list:
        """
        Dequeues up to the specified number of messages from the queue and
        returns a list of these messages.

        Parameters:
        - max_num_messages (int): Maximum number of messages to dequeue

        Returns:
        list: List of MessageProperties objects
        """

    def enqone(self, message: MessageProperties) -> None:
        """
        Enqueues a single message into the queue. The message must be a message
        property object which has had its payload attribute set to a value that
        the queue supports.

        Parameters:
        - message (MessageProperties): Message to enqueue
        """

    def enqmany(self, messages: list) -> None:
        """
        Enqueues multiple messages into the queue. The messages parameter must
        be a sequence containing message property objects which have all had
        their payload attribute set to a value that the queue supports.

        Parameters:
        - messages (list): List of MessageProperties objects to enqueue

        Warning: calling this function in parallel on different connections
        acquired from the same pool may fail due to Oracle bug 29928074.
        """

Async Queue Class

Asynchronous queue operations for enqueuing and dequeuing messages.

class AsyncQueue:
    # Properties (same as Queue)
    connection: AsyncConnection
    deqoptions: DeqOptions
    enqoptions: EnqOptions
    name: str
    payload_type: Union[DbObjectType, str, None]

    async def deqone(self) -> Union[MessageProperties, None]:
        """
        Dequeues at most one message from the queue and returns it. If no
        message is dequeued, None is returned.

        Returns:
        Union[MessageProperties, None]: Dequeued message or None
        """

    async def deqmany(self, max_num_messages: int) -> list:
        """
        Dequeues up to the specified number of messages from the queue and
        returns a list of these messages.

        Parameters:
        - max_num_messages (int): Maximum number of messages to dequeue

        Returns:
        list: List of MessageProperties objects
        """

    async def enqone(self, message: MessageProperties) -> None:
        """
        Enqueues a single message into the queue. The message must be a message
        property object which has had its payload attribute set to a value that
        the queue supports.

        Parameters:
        - message (MessageProperties): Message to enqueue
        """

    async def enqmany(self, messages: list) -> None:
        """
        Enqueues multiple messages into the queue. The messages parameter must
        be a sequence containing message property objects which have all had
        their payload attribute set to a value that the queue supports.

        Parameters:
        - messages (list): List of MessageProperties objects to enqueue
        """

Dequeue Options

Configuration options for message dequeuing operations.

class DeqOptions:
    condition: str  # Boolean expression for message filtering
    consumername: str  # Consumer name for multi-consumer queues
    correlation: str  # Correlation identifier with pattern matching
    deliverymode: int  # Message delivery mode (write-only)
    mode: int  # Locking behavior (DEQ_BROWSE, DEQ_LOCKED, DEQ_REMOVE, DEQ_REMOVE_NODATA)
    msgid: bytes  # Specific message identifier to dequeue
    navigation: int  # Message position (DEQ_FIRST_MSG, DEQ_NEXT_MSG, DEQ_NEXT_TRANSACTION)
    transformation: str  # Transformation function name
    visibility: int  # Transaction behavior (DEQ_ON_COMMIT, DEQ_IMMEDIATE)
    wait: int  # Wait time in seconds (DEQ_NO_WAIT, DEQ_WAIT_FOREVER, or timeout)

Enqueue Options

Configuration options for message enqueuing operations.

class EnqOptions:
    deliverymode: int  # Message delivery mode (MSG_PERSISTENT, MSG_BUFFERED) - write-only
    transformation: str  # Transformation function name
    visibility: int  # Transaction behavior (ENQ_ON_COMMIT, ENQ_IMMEDIATE)

Message Properties

Properties and content of queued messages with metadata and payload management.

class MessageProperties:
    # Read-only properties
    attempts: int  # Number of dequeue attempts made
    deliverymode: int  # Message delivery mode
    enqtime: datetime  # Time message was enqueued
    msgid: bytes  # Message identifier
    state: int  # Message state (MSG_WAITING, MSG_READY, MSG_PROCESSED, MSG_EXPIRED)

    # Read-write properties
    correlation: str  # Correlation identifier
    delay: int  # Delay before message becomes available
    exceptionq: str  # Exception queue name
    expiration: int  # Message expiration time in seconds
    payload: Union[bytes, str, dict, list, DbObject]  # Message payload
    priority: int  # Message priority (lower numbers = higher priority)
    recipients: list  # List of recipient names for targeted delivery

Constants

# Delivery Modes
MSG_PERSISTENT: int  # Persistent message storage
MSG_BUFFERED: int  # Buffered message storage
MSG_PERSISTENT_OR_BUFFERED: int  # Either persistent or buffered

# Dequeue Modes
DEQ_BROWSE: int  # Browse without locking
DEQ_LOCKED: int  # Lock for update
DEQ_REMOVE: int  # Remove from queue (default)
DEQ_REMOVE_NODATA: int  # Remove without returning data

# Dequeue Navigation
DEQ_FIRST_MSG: int  # First message
DEQ_NEXT_MSG: int  # Next message (default)
DEQ_NEXT_TRANSACTION: int  # Next transaction

# Dequeue Visibility
DEQ_IMMEDIATE: int  # Immediate visibility
DEQ_ON_COMMIT: int  # Visible on commit (default)

# Dequeue Wait Modes
DEQ_NO_WAIT: int  # Don't wait for messages
DEQ_WAIT_FOREVER: int  # Wait indefinitely (default)

# Enqueue Visibility
ENQ_IMMEDIATE: int  # Immediate visibility
ENQ_ON_COMMIT: int  # Visible on commit (default)

# Message States
MSG_EXPIRED: int  # Message has expired
MSG_PROCESSED: int  # Message has been processed
MSG_READY: int  # Message is ready for dequeue
MSG_WAITING: int  # Message is waiting

# Message Timing Constants
MSG_NO_DELAY: int  # No delay for message availability
MSG_NO_EXPIRATION: int  # Message never expires

Usage Examples

import oracledb

# Basic AQ usage
with oracledb.connect(user="user", password="pwd", dsn="localhost/orclpdb") as connection:
    # Create/access a queue
    queue = connection.queue("my_queue")
    
    # Create and enqueue a message
    message = MessageProperties()
    message.payload = "Hello, World!"
    message.priority = 1
    queue.enqone(message)
    
    # Dequeue a message
    received_msg = queue.deqone()
    if received_msg:
        print(f"Received: {received_msg.payload}")
        print(f"Priority: {received_msg.priority}")
        print(f"Enqueue time: {received_msg.enqtime}")
    
    connection.commit()

# Advanced AQ with options
with oracledb.connect(user="user", password="pwd", dsn="localhost/orclpdb") as connection:
    queue = connection.queue("priority_queue")
    
    # Configure dequeue options
    queue.deqoptions.wait = 10  # Wait 10 seconds
    queue.deqoptions.mode = oracledb.DEQ_REMOVE
    queue.deqoptions.navigation = oracledb.DEQ_FIRST_MSG
    
    # Configure enqueue options  
    queue.enqoptions.visibility = oracledb.ENQ_IMMEDIATE
    
    # Enqueue multiple messages with different priorities
    messages = []
    for i, priority in enumerate([3, 1, 2]):
        msg = MessageProperties()
        msg.payload = f"Message {i+1}"
        msg.priority = priority
        msg.correlation = f"batch_1_msg_{i+1}"
        messages.append(msg)
    
    queue.enqmany(messages)
    
    # Dequeue messages (will come out in priority order)
    while True:
        msg = queue.deqone()
        if msg is None:
            break
        print(f"Dequeued: {msg.payload} (priority {msg.priority})")
    
    connection.commit()

# Async AQ operations
import asyncio

async def async_aq_example():
    async with oracledb.connect_async(user="user", password="pwd", dsn="localhost/orclpdb") as connection:
        queue = await connection.queue("async_queue")
        
        # Async enqueue
        message = MessageProperties()
        message.payload = {"type": "notification", "data": "async message"}
        await queue.enqone(message)
        
        # Async dequeue
        received = await queue.deqone()  
        if received:
            print(f"Async received: {received.payload}")
        
        await connection.commit()

asyncio.run(async_aq_example())

Install with Tessl CLI

npx tessl i tessl/pypi-oracledb

docs

advanced-queuing.md

connection-pooling.md

connectivity.md

data-types.md

database-objects.md

index.md

lobs.md

pipeline.md

soda.md

sql-execution.md

subscriptions.md

tile.json