Python interface to Oracle Database with thin and thick connectivity modes
—
Oracle Advanced Queuing (AQ) provides message queuing functionality as an integral part of the Oracle Database. AQ enables applications to use the database as a message broker for reliable, persistent, and transactional message passing. The API supports both synchronous and asynchronous operations, with options for message delivery modes, priorities, and transformation functions.
AQ queues are accessed through database connections, with support for both synchronous and asynchronous operations.
# Access queues through connection
def queue(self, name: str, payload_type: Union[DbObjectType, str, None] = None) -> Queue:
    """
    Creates and returns a queue which is used to enqueue and dequeue
    messages in Advanced Queuing (AQ).

    Parameters:
    - name (str): Name of the queue to access
    - payload_type (DbObjectType, str or None): Payload type of the queue.
      Pass a DbObjectType for structured (object) payloads or the string
      "JSON" for JSON payloads; leave as None for RAW payloads. The accepted
      types match the Queue.payload_type attribute.

    Returns:
    Queue: Queue object for message operations
    """
# Async version (method of AsyncConnection)
# NOTE: this accessor is a plain method, not a coroutine. Call it without
# "await"; only the methods of the returned AsyncQueue are awaited.
def queue(self, name: str, payload_type: Union[DbObjectType, str, None] = None) -> AsyncQueue: ...


# Synchronous queue operations for enqueuing and dequeuing messages.
class Queue:
    """Synchronous queue operations for enqueuing and dequeuing messages."""

    # Properties
    connection: Connection  # Read-only connection object
    deqoptions: DeqOptions  # Dequeue options for configuration
    enqoptions: EnqOptions  # Enqueue options for configuration
    name: str  # Read-only queue name
    payload_type: Union[DbObjectType, str, None]  # Read-only payload type

    def deqone(self) -> Union[MessageProperties, None]:
        """
        Dequeues at most one message from the queue and returns it. If no
        message is dequeued, None is returned.

        Returns:
        Union[MessageProperties, None]: Dequeued message or None
        """

    def deqmany(self, max_num_messages: int) -> list:
        """
        Dequeues up to the specified number of messages from the queue and
        returns a list of these messages.

        Parameters:
        - max_num_messages (int): Maximum number of messages to dequeue

        Returns:
        list: List of MessageProperties objects
        """

    def enqone(self, message: MessageProperties) -> None:
        """
        Enqueues a single message into the queue. The message must be a message
        property object which has had its payload attribute set to a value that
        the queue supports.

        Parameters:
        - message (MessageProperties): Message to enqueue
        """

    def enqmany(self, messages: list) -> None:
        """
        Enqueues multiple messages into the queue. The messages parameter must
        be a sequence containing message property objects which have all had
        their payload attribute set to a value that the queue supports.

        Parameters:
        - messages (list): List of MessageProperties objects to enqueue

        Warning: calling this function in parallel on different connections
        acquired from the same pool may fail due to Oracle bug 29928074.
        """


# Asynchronous queue operations for enqueuing and dequeuing messages.
class AsyncQueue:
    """Asynchronous queue operations for enqueuing and dequeuing messages."""

    # Properties (same as Queue)
    connection: AsyncConnection
    deqoptions: DeqOptions
    enqoptions: EnqOptions
    name: str
    payload_type: Union[DbObjectType, str, None]

    async def deqone(self) -> Union[MessageProperties, None]:
        """
        Dequeues at most one message from the queue and returns it. If no
        message is dequeued, None is returned.

        Returns:
        Union[MessageProperties, None]: Dequeued message or None
        """

    async def deqmany(self, max_num_messages: int) -> list:
        """
        Dequeues up to the specified number of messages from the queue and
        returns a list of these messages.

        Parameters:
        - max_num_messages (int): Maximum number of messages to dequeue

        Returns:
        list: List of MessageProperties objects
        """

    async def enqone(self, message: MessageProperties) -> None:
        """
        Enqueues a single message into the queue. The message must be a message
        property object which has had its payload attribute set to a value that
        the queue supports.

        Parameters:
        - message (MessageProperties): Message to enqueue
        """

    async def enqmany(self, messages: list) -> None:
        """
        Enqueues multiple messages into the queue. The messages parameter must
        be a sequence containing message property objects which have all had
        their payload attribute set to a value that the queue supports.

        Parameters:
        - messages (list): List of MessageProperties objects to enqueue
        """


# Configuration options for message dequeuing operations.
class DeqOptions:
    """Configuration options for message dequeuing operations."""

    condition: str  # Boolean expression for message filtering
    consumername: str  # Consumer name for multi-consumer queues
    correlation: str  # Correlation identifier with pattern matching
    deliverymode: int  # Message delivery mode (write-only)
    mode: int  # Locking behavior (DEQ_BROWSE, DEQ_LOCKED, DEQ_REMOVE, DEQ_REMOVE_NODATA)
    msgid: bytes  # Specific message identifier to dequeue
    navigation: int  # Message position (DEQ_FIRST_MSG, DEQ_NEXT_MSG, DEQ_NEXT_TRANSACTION)
    transformation: str  # Transformation function name
    visibility: int  # Transaction behavior (DEQ_ON_COMMIT, DEQ_IMMEDIATE)
    wait: int  # Wait time in seconds (DEQ_NO_WAIT, DEQ_WAIT_FOREVER, or timeout)


# Configuration options for message enqueuing operations.
class EnqOptions:
    """Configuration options for message enqueuing operations."""

    deliverymode: int  # Message delivery mode (MSG_PERSISTENT, MSG_BUFFERED) - write-only
    transformation: str  # Transformation function name
    visibility: int  # Transaction behavior (ENQ_ON_COMMIT, ENQ_IMMEDIATE)


# Properties and content of queued messages with metadata and payload management.
class MessageProperties:
    """Properties and content of queued messages with metadata and payload management."""

    # Read-only properties
    attempts: int  # Number of dequeue attempts made
    deliverymode: int  # Message delivery mode
    enqtime: datetime  # Time message was enqueued
    msgid: bytes  # Message identifier
    state: int  # Message state (MSG_WAITING, MSG_READY, MSG_PROCESSED, MSG_EXPIRED)

    # Read-write properties
    correlation: str  # Correlation identifier
    delay: int  # Delay before message becomes available
    exceptionq: str  # Exception queue name
    expiration: int  # Message expiration time in seconds
    payload: Union[bytes, str, dict, list, DbObject]  # Message payload
    priority: int  # Message priority (lower numbers = higher priority)
    recipients: list  # List of recipient names for targeted delivery


# Delivery Modes
# Module-level AQ constants, each declared as an int.
# Delivery mode values, e.g. for EnqOptions.deliverymode.
MSG_PERSISTENT: int # Persistent message storage
MSG_BUFFERED: int # Buffered message storage
MSG_PERSISTENT_OR_BUFFERED: int # Either persistent or buffered
# Dequeue Modes -- values for DeqOptions.mode
DEQ_BROWSE: int # Browse without locking
DEQ_LOCKED: int # Lock for update
DEQ_REMOVE: int # Remove from queue (default)
DEQ_REMOVE_NODATA: int # Remove without returning data
# Dequeue Navigation -- values for DeqOptions.navigation
DEQ_FIRST_MSG: int # First message
DEQ_NEXT_MSG: int # Next message (default)
DEQ_NEXT_TRANSACTION: int # Next transaction
# Dequeue Visibility -- values for DeqOptions.visibility
DEQ_IMMEDIATE: int # Immediate visibility
DEQ_ON_COMMIT: int # Visible on commit (default)
# Dequeue Wait Modes -- special values for DeqOptions.wait (otherwise a timeout in seconds)
DEQ_NO_WAIT: int # Don't wait for messages
DEQ_WAIT_FOREVER: int # Wait indefinitely (default)
# Enqueue Visibility -- values for EnqOptions.visibility
ENQ_IMMEDIATE: int # Immediate visibility
ENQ_ON_COMMIT: int # Visible on commit (default)
# Message States -- values reported by MessageProperties.state
MSG_EXPIRED: int # Message has expired
MSG_PROCESSED: int # Message has been processed
MSG_READY: int # Message is ready for dequeue
MSG_WAITING: int # Message is waiting
# Message Timing Constants
# NOTE(review): presumably used with MessageProperties.delay / .expiration -- confirm
MSG_NO_DELAY: int # No delay for message availability
MSG_NO_EXPIRATION: int # Message never expires

import oracledb
# Basic AQ usage
with oracledb.connect(user="user", password="pwd", dsn="localhost/orclpdb") as connection:
    # Create/access a queue
    queue = connection.queue("my_queue")

    # Create and enqueue a message. Message property objects are obtained
    # from the connection; no MessageProperties name is in scope here.
    message = connection.msgproperties()
    message.payload = "Hello, World!"
    message.priority = 1
    queue.enqone(message)
    # Enqueue visibility defaults to ENQ_ON_COMMIT, so commit to complete the
    # enqueue before trying to dequeue.
    connection.commit()

    # Dequeue a message (deqone() returns None when nothing was dequeued)
    received_msg = queue.deqone()
    if received_msg:
        print(f"Received: {received_msg.payload}")
        print(f"Priority: {received_msg.priority}")
        print(f"Enqueue time: {received_msg.enqtime}")
    # Commit the dequeue
    connection.commit()
# Advanced AQ with options
with oracledb.connect(user="user", password="pwd", dsn="localhost/orclpdb") as connection:
    queue = connection.queue("priority_queue")

    # Configure dequeue options
    queue.deqoptions.wait = 10  # Wait 10 seconds
    queue.deqoptions.mode = oracledb.DEQ_REMOVE
    queue.deqoptions.navigation = oracledb.DEQ_FIRST_MSG

    # Configure enqueue options: make messages visible without a commit
    queue.enqoptions.visibility = oracledb.ENQ_IMMEDIATE

    # Enqueue multiple messages with different priorities. Message property
    # objects are obtained from the connection; no MessageProperties name is
    # in scope here.
    messages = [
        connection.msgproperties(
            payload=f"Message {number}",
            priority=priority,
            correlation=f"batch_1_msg_{number}",
        )
        for number, priority in enumerate([3, 1, 2], start=1)
    ]
    queue.enqmany(messages)

    # Dequeue messages (will come out in priority order). The loop ends when
    # deqone() returns None, i.e. no message arrived within the wait above.
    while True:
        msg = queue.deqone()
        if msg is None:
            break
        print(f"Dequeued: {msg.payload} (priority {msg.priority})")
    connection.commit()
# Async AQ operations
import asyncio


async def async_aq_example():
    """Enqueue one JSON message on "async_queue" and dequeue it again."""
    async with oracledb.connect_async(user="user", password="pwd", dsn="localhost/orclpdb") as connection:
        # queue() is a plain method, so it is not awaited. The "JSON" payload
        # type is required because the payload below is a dict.
        queue = connection.queue("async_queue", "JSON")

        # Async enqueue. Message property objects are obtained from the
        # connection; no MessageProperties name is in scope here.
        message = connection.msgproperties()
        message.payload = {"type": "notification", "data": "async message"}
        await queue.enqone(message)
        # Enqueue visibility defaults to ENQ_ON_COMMIT, so commit first.
        await connection.commit()

        # Async dequeue (deqone() returns None when nothing was dequeued)
        received = await queue.deqone()
        if received:
            print(f"Async received: {received.payload}")
        await connection.commit()
asyncio.run(async_aq_example())

Install with Tessl CLI
npx tessl i tessl/pypi-oracledb