Python interface to Oracle Database with thin and thick connectivity modes
—
Subscribe to database events including object changes, query result changes, and Advanced Queuing (AQ) messages for real-time notifications. Database event subscriptions enable applications to receive notifications when specific database changes occur, allowing for reactive programming patterns and real-time data synchronization.
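Create subscriptions for various types of database events with a choice of notification protocols. Subscriptions require python-oracledb Thick mode (call `oracledb.init_oracle_client()` before connecting), and the connection must be created with `events=True`.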
# Subscription creation through Connection.subscribe()
def subscribe(
    self,
    namespace=SUBSCR_NAMESPACE_DBCHANGE,
    protocol=SUBSCR_PROTO_CALLBACK,
    callback=None,
    timeout=0,
    operations=OPCODE_ALLOPS,
    port=0,
    qos=0,
    ip_address=None,
    grouping_class=SUBSCR_GROUPING_CLASS_NONE,
    grouping_value=0,
    grouping_type=SUBSCR_GROUPING_TYPE_SUMMARY,
    name=None,
    client_initiated=False,
    recipient_name=None
) -> Subscription:
    """
    Create a subscription for database event notifications.

    Parameters:
    - namespace (int): Subscription namespace (SUBSCR_NAMESPACE_DBCHANGE, SUBSCR_NAMESPACE_AQ)
    - protocol (int): Notification protocol (SUBSCR_PROTO_CALLBACK, SUBSCR_PROTO_HTTP, etc.)
    - callback (callable): Callback function for notifications; it is called
      with a single Message argument
    - timeout (int): Subscription timeout in seconds (0 for no timeout)
    - operations (int): Database operations to monitor (OPCODE_* values)
    - port (int): Port for notifications
      (NOTE(review): the original text said "HTTP notifications"; the library
      documentation describes this as the callback listening port - confirm)
    - qos (int): Quality of service flags (SUBSCR_QOS_* values, which may be
      combined with ``|``)
    - ip_address (str): IP address for notifications
    - grouping_class (int): Grouping class for notifications (SUBSCR_GROUPING_CLASS_*)
    - grouping_value (int): Grouping value
    - grouping_type (int): Grouping type (SUBSCR_GROUPING_TYPE_*)
    - name (str): Subscription name; for SUBSCR_NAMESPACE_AQ the examples
      pass the name of the queue to monitor
    - client_initiated (bool): Client-initiated subscription
    - recipient_name (str): Recipient name for notifications

    Returns:
    Subscription object
    """


# Classes representing different types of database event messages.
class Message:
    """Database event notification message.

    An instance is the single argument passed to a subscription callback.
    """

    # Properties
    type: int  # Event type (EVENT_OBJCHANGE, EVENT_QUERYCHANGE, etc.)
    dbname: str  # Database name
    tables: list  # List of MessageTable objects
    queries: list  # List of MessageQuery objects
    consumer_name: str  # AQ consumer name
    queue_name: str  # AQ queue name
    # NOTE(review): python-oracledb appears to expose the attribute below as
    # `subscr` rather than `subscription` - confirm against the API reference.
    subscription: object  # Subscription object that received the message
class MessageTable:
    """Table change notification details.

    Instances are found in Message.tables and MessageQuery.tables.
    """

    # Properties
    name: str  # Table name
    operation: int  # Operation type (OPCODE_INSERT, OPCODE_UPDATE, etc.)
    rows: list  # List of MessageRow objects
class MessageRow:
    """Row change notification details.

    Instances are found in MessageTable.rows.
    """

    # Properties
    operation: int  # Operation type
    rowid: str  # Row identifier
class MessageQuery:
    """Query change notification details.

    Instances are found in Message.queries.
    """

    # Properties
    id: int  # Query ID
    operation: int  # Operation type
    # NOTE(review): `queryctx` could not be matched to a documented
    # python-oracledb attribute - confirm before relying on it.
    queryctx: object  # Query context
    tables: list  # List of affected MessageTable objects


# Constants for configuring subscription behavior and identifying event types.
# Subscription Namespaces
SUBSCR_NAMESPACE_DBCHANGE: int # Database change notifications
SUBSCR_NAMESPACE_AQ: int # Advanced Queuing notifications
# Subscription Protocols
SUBSCR_PROTO_CALLBACK: int # Python callback function
SUBSCR_PROTO_HTTP: int # HTTP notifications
SUBSCR_PROTO_MAIL: int # Email notifications
SUBSCR_PROTO_SERVER: int # Server-to-server notifications
# Quality of Service
SUBSCR_QOS_DEFAULT: int # Default QoS
SUBSCR_QOS_RELIABLE: int # Reliable delivery
SUBSCR_QOS_BEST_EFFORT: int # Best effort delivery
SUBSCR_QOS_DEREG_NFY: int # Deregistration notification
SUBSCR_QOS_ROWIDS: int # Include row IDs
SUBSCR_QOS_QUERY: int # Query change notification
# Grouping Classes
SUBSCR_GROUPING_CLASS_NONE: int # No grouping
SUBSCR_GROUPING_CLASS_TIME: int # Time-based grouping
# Grouping Types
SUBSCR_GROUPING_TYPE_SUMMARY: int # Summary notifications
SUBSCR_GROUPING_TYPE_LAST: int # Last notification only
# Event Types
EVENT_NONE: int # No event
EVENT_STARTUP: int # Database startup
EVENT_SHUTDOWN: int # Database shutdown
EVENT_SHUTDOWN_ANY: int # Any shutdown
EVENT_DEREG: int # Deregistration
EVENT_OBJCHANGE: int # Object change
EVENT_QUERYCHANGE: int # Query result change
EVENT_AQ: int # Advanced Queuing
# Operation Codes
OPCODE_ALLOPS: int # All operations
OPCODE_ALLROWS: int # All rows
OPCODE_INSERT: int # Insert operations
OPCODE_UPDATE: int # Update operations
OPCODE_DELETE: int # Delete operations
OPCODE_ALTER: int # Alter operations
OPCODE_DROP: int # Drop operationsimport oracledb
import time
def notification_callback(message):
    """Print a summary of a change notification: the event, then each
    changed table, then each changed row of that table."""

    def _report_lines():
        # Lazily produce one output line at a time so printing interleaves
        # with attribute access exactly as a plain nested loop would.
        yield f"Received notification: Type={message.type}, DB={message.dbname}"
        for changed_table in message.tables:
            yield f" Table: {changed_table.name}, Operation: {changed_table.operation}"
            for changed_row in changed_table.rows:
                yield f" Row: {changed_row.rowid}, Operation: {changed_row.operation}"

    for line in _report_lines():
        print(line)
# Change notification is only available in python-oracledb Thick mode, so the
# Oracle Client libraries must be loaded before the first connection is made.
oracledb.init_oracle_client()

# Connect to database (events=True enables notification support)
connection = oracledb.connect(
    user="hr", password="password", dsn="localhost/xepdb1", events=True
)
try:
    # Create subscription for database changes
    subscription = connection.subscribe(
        namespace=oracledb.SUBSCR_NAMESPACE_DBCHANGE,
        protocol=oracledb.SUBSCR_PROTO_CALLBACK,
        callback=notification_callback,
        timeout=300,  # 5 minutes
        operations=oracledb.OPCODE_ALLOPS,
        qos=oracledb.SUBSCR_QOS_ROWIDS,
    )
    print(f"Created subscription with ID: {subscription.id}")

    # Register queries for change notification. Queries must be registered
    # through the subscription itself; executing them on an ordinary cursor
    # does not associate them with the subscription.
    # Register interest in employees table changes
    subscription.registerquery(
        "SELECT employee_id, first_name, last_name FROM employees WHERE department_id = 10"
    )
    # Register interest in departments table
    subscription.registerquery(
        "SELECT department_id, department_name FROM departments"
    )
    print("Subscriptions registered. Making changes to trigger notifications...")

    # Make changes to trigger notifications (sent when each change commits)
    with connection.cursor() as cursor:
        cursor.execute("""
            UPDATE employees SET salary = salary * 1.1 WHERE department_id = 10
        """)
        connection.commit()
        cursor.execute("""
            INSERT INTO employees (employee_id, first_name, last_name, department_id)
            VALUES (9999, 'Test', 'Employee', 10)
        """)
        connection.commit()

    # Wait for notifications
    print("Waiting for notifications...")
    time.sleep(10)
finally:
    # Clean up, even if one of the steps above failed
    connection.close()


import oracledb
import threading
import time
# Global flag to control notification processing
processing_notifications = True


def query_change_callback(message):
    """Print the details of a query change notification.

    Nothing is printed once the module-level ``processing_notifications``
    flag has been cleared.
    """
    if processing_notifications:
        print("Query change notification received:")
        print(f" Event type: {message.type}")
        print(f" Database: {message.dbname}")
        for changed_query in message.queries:
            print(f" Query ID: {changed_query.id}")
            print(f" Operation: {changed_query.operation}")
            for affected in changed_query.tables:
                print(f" Affected table: {affected.name}")
                print(f" Table operation: {affected.operation}")
                row_total = len(affected.rows)
                print(f" Affected rows: {row_total}")
def notification_thread():
    """Run in separate thread to handle notifications.

    Opens its own connection, creates a query-level subscription, registers
    two queries with it, and then idles until the module-level
    ``processing_notifications`` flag is cleared. The connection is always
    closed on the way out.
    """
    # events=True is needed for the connection to receive notifications.
    connection = oracledb.connect(
        user="hr", password="password", dsn="localhost/xepdb1", events=True
    )
    try:
        # Create subscription for query changes
        subscription = connection.subscribe(
            namespace=oracledb.SUBSCR_NAMESPACE_DBCHANGE,
            protocol=oracledb.SUBSCR_PROTO_CALLBACK,
            callback=query_change_callback,
            timeout=0,  # No timeout
            qos=oracledb.SUBSCR_QOS_QUERY | oracledb.SUBSCR_QOS_RELIABLE,
        )
        print(f"Query subscription created: {subscription.id}")

        # Register queries to monitor. They must be registered through the
        # subscription; running them on an ordinary cursor registers nothing.
        # Monitor high-salary employees
        subscription.registerquery("""
            SELECT employee_id, first_name, last_name, salary
            FROM employees
            WHERE salary > 50000
        """)
        # Monitor recent hires
        subscription.registerquery("""
            SELECT employee_id, first_name, hire_date
            FROM employees
            WHERE hire_date >= SYSDATE - 30
        """)
        print("Query monitoring active. Waiting for changes...")

        # Keep connection alive for notifications
        while processing_notifications:
            time.sleep(1)
    finally:
        connection.close()
# Change notification is only available in python-oracledb Thick mode; load
# the Oracle Client libraries before any connection (in any thread) is made.
oracledb.init_oracle_client()

# Start notification thread. The Thread object gets its own name so that it
# does not replace the notification_thread function in the module namespace.
worker = threading.Thread(target=notification_thread)
worker.daemon = True
worker.start()

# Give subscription time to initialize
time.sleep(2)

# Main thread: make changes to trigger notifications
main_connection = oracledb.connect(user="hr", password="password", dsn="localhost/xepdb1")
try:
    with main_connection.cursor() as cursor:
        print("Making changes to trigger query notifications...")
        # Change that affects high-salary query
        cursor.execute("""
            UPDATE employees SET salary = 55000
            WHERE employee_id = (SELECT MIN(employee_id) FROM employees WHERE salary < 50000)
        """)
        main_connection.commit()
        time.sleep(2)
        # Insert new employee (affects recent hires query)
        cursor.execute("""
            INSERT INTO employees (employee_id, first_name, last_name, hire_date, salary, department_id)
            VALUES (8888, 'Recent', 'Hire', SYSDATE, 45000, 10)
        """)
        main_connection.commit()

    # Wait for notifications
    print("Waiting for notifications...")
    time.sleep(5)
finally:
    # Clean up: stop the worker loop, close this connection, then give the
    # worker a moment to leave its loop and close its own connection.
    processing_notifications = False
    main_connection.close()
    worker.join(timeout=5)


import oracledb
import time
def aq_notification_callback(message):
    """Print the event type, queue, consumer and database of an Advanced
    Queuing notification."""
    print("AQ Notification received:")
    for label, attribute in (
        ("Event type", "type"),
        ("Queue", "queue_name"),
        ("Consumer", "consumer_name"),
        ("Database", "dbname"),
    ):
        # Attributes are read one at a time, immediately before printing.
        print(f" {label}: {getattr(message, attribute)}")
# AQ notifications are only available in python-oracledb Thick mode; load the
# Oracle Client libraries before connecting.
oracledb.init_oracle_client()

# Connect to database (events=True enables notification support)
connection = oracledb.connect(
    user="hr", password="password", dsn="localhost/xepdb1", events=True
)
try:
    # Set up Advanced Queuing infrastructure
    with connection.cursor() as cursor:
        try:
            # Create queue table
            cursor.execute("""
                BEGIN
                    DBMS_AQADM.CREATE_QUEUE_TABLE(
                        queue_table => 'my_queue_table',
                        queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE'
                    );
                END;
            """)
            # Create queue
            cursor.execute("""
                BEGIN
                    DBMS_AQADM.CREATE_QUEUE(
                        queue_name => 'my_notification_queue',
                        queue_table => 'my_queue_table'
                    );
                END;
            """)
            # Start queue
            cursor.execute("""
                BEGIN
                    DBMS_AQADM.START_QUEUE('my_notification_queue');
                END;
            """)
            connection.commit()
        except oracledb.DatabaseError as e:
            # DBMS_AQADM reports an existing queue table as ORA-24001 and an
            # existing queue as ORA-24006; ORA-00955 is kept for the generic
            # "name is already used" case.
            already_exists = ("ORA-00955", "ORA-24001", "ORA-24006")
            if any(code in str(e) for code in already_exists):
                print("Queue infrastructure already exists")
            else:
                raise

    # Create AQ subscription
    subscription = connection.subscribe(
        namespace=oracledb.SUBSCR_NAMESPACE_AQ,
        protocol=oracledb.SUBSCR_PROTO_CALLBACK,
        callback=aq_notification_callback,
        name="my_notification_queue",
        timeout=300,
    )
    print(f"AQ subscription created: {subscription.id}")

    # Enqueue messages to trigger notifications
    with connection.cursor() as cursor:
        # Enqueue a message
        cursor.execute("""
            DECLARE
                enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
                message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
                message_handle RAW(16);
                message SYS.AQ$_JMS_TEXT_MESSAGE;
            BEGIN
                message := SYS.AQ$_JMS_TEXT_MESSAGE.construct;
                message.set_text('Hello from AQ notification test!');
                DBMS_AQ.ENQUEUE(
                    queue_name => 'my_notification_queue',
                    enqueue_options => enqueue_options,
                    message_properties => message_properties,
                    payload => message,
                    msgid => message_handle
                );
            END;
        """)
        connection.commit()

    print("Message enqueued. Waiting for notification...")
    time.sleep(5)
finally:
    # Close the connection even if setup, subscription or enqueue failed
    connection.close()


import oracledb
import time
class SubscriptionManager:
    """Manage multiple database subscriptions.

    Subscriptions are created on the connection given to the constructor and
    tracked in ``self.subscriptions`` so that ``cleanup()`` can release them.
    """

    def __init__(self, connection):
        """Store the connection and start with no tracked subscriptions."""
        self.connection = connection
        self.subscriptions = {}
        self.active = True

    @staticmethod
    def _matches_table(reported_name, table_name):
        """Return True if a table name from a notification refers to table_name.

        Notifications may report schema-qualified names (``HR.EMPLOYEES``).
        An unqualified ``table_name`` therefore matches on the final name
        component, while a qualified ``table_name`` must match in full.
        Comparison is case-insensitive.
        """
        reported = reported_name.upper()
        wanted = table_name.upper()
        if "." in wanted:
            return reported == wanted
        return reported.rsplit(".", 1)[-1] == wanted

    def create_table_subscription(self, table_name, callback):
        """Create subscription for specific table changes.

        Parameters:
        - table_name (str): Unquoted table name, optionally schema-qualified
        - callback (callable): Called as ``callback(message, table)`` for each
          table entry of a notification that refers to ``table_name``

        Returns:
        The ID of the new subscription

        Raises:
        ValueError if ``table_name`` is not a plain (optionally
        schema-qualified) identifier
        """
        # Local import: the file's import block may be outside this block.
        import re

        # The name is interpolated into SQL below, so only plain identifiers
        # are accepted.
        identifier = r"[A-Za-z][A-Za-z0-9_$#]*"
        if not re.fullmatch(rf"{identifier}(\.{identifier})?", table_name):
            raise ValueError(f"Invalid table name: {table_name!r}")

        def table_callback(message):
            # Filter messages for specific table
            for table in message.tables:
                if self._matches_table(table.name, table_name):
                    callback(message, table)

        subscription = self.connection.subscribe(
            namespace=oracledb.SUBSCR_NAMESPACE_DBCHANGE,
            protocol=oracledb.SUBSCR_PROTO_CALLBACK,
            callback=table_callback,
            timeout=0,
            operations=oracledb.OPCODE_ALLOPS,
            qos=oracledb.SUBSCR_QOS_ROWIDS | oracledb.SUBSCR_QOS_RELIABLE,
        )
        # Register interest in table. This must go through the subscription;
        # a query run on an ordinary cursor is not registered.
        subscription.registerquery(f"SELECT * FROM {table_name} WHERE ROWNUM <= 1")
        self.subscriptions[table_name] = subscription
        return subscription.id

    def create_query_subscription(self, query, callback):
        """Create subscription for query result changes.

        Parameters:
        - query (str): SQL query whose result set should be monitored
        - callback (callable): Called as ``callback(message)``

        Returns:
        The ID of the new subscription
        """
        subscription = self.connection.subscribe(
            namespace=oracledb.SUBSCR_NAMESPACE_DBCHANGE,
            protocol=oracledb.SUBSCR_PROTO_CALLBACK,
            callback=callback,
            timeout=0,
            qos=oracledb.SUBSCR_QOS_QUERY | oracledb.SUBSCR_QOS_RELIABLE,
        )
        # Register query with the subscription
        subscription.registerquery(query)
        query_id = f"query_{subscription.id}"
        self.subscriptions[query_id] = subscription
        return subscription.id

    def cleanup(self):
        """Clean up all subscriptions.

        Marks the manager inactive and unsubscribes every tracked
        subscription. Subscriptions created with ``timeout=0`` do not expire,
        so they are released explicitly instead of relying on the connection
        being closed. Errors raised by ``unsubscribe`` propagate.
        """
        self.active = False
        while self.subscriptions:
            _, subscription = self.subscriptions.popitem()
            self.connection.unsubscribe(subscription)
# Usage example
def employee_change_handler(message, table):
    """Print the operation code and affected-row count for a change to the
    employee table. ``message`` is accepted but not used."""
    print(f"Employee table changed: {table.operation}")
    row_total = len(table.rows)
    print(f" Affected rows: {row_total}")
def high_salary_query_handler(message):
    """Print a heading, then one line per changed query giving its ID and
    the number of tables it affected."""
    print("High salary employees query results changed")
    for changed_query in message.queries:
        table_total = len(changed_query.tables)
        print(f" Query {changed_query.id} affected {table_total} tables")
# Change notification is only available in python-oracledb Thick mode; load
# the Oracle Client libraries before connecting.
oracledb.init_oracle_client()

# Set up subscription manager (events=True enables notification support)
connection = oracledb.connect(
    user="hr", password="password", dsn="localhost/xepdb1", events=True
)
manager = SubscriptionManager(connection)
try:
    # Create subscriptions
    emp_sub_id = manager.create_table_subscription("employees", employee_change_handler)
    query_sub_id = manager.create_query_subscription(
        "SELECT * FROM employees WHERE salary > 75000",
        high_salary_query_handler,
    )
    print(f"Created subscriptions: Employee table={emp_sub_id}, High salary query={query_sub_id}")

    # Make changes to trigger notifications
    with connection.cursor() as cursor:
        cursor.execute("UPDATE employees SET salary = 80000 WHERE employee_id = 100")
        connection.commit()
        cursor.execute("INSERT INTO employees (employee_id, first_name, last_name, salary, department_id) VALUES (7777, 'High', 'Earner', 85000, 10)")
        connection.commit()

    print("Changes made. Waiting for notifications...")
    time.sleep(5)
finally:
    # Cleanup runs even if a step above failed, so the connection is not leaked
    manager.cleanup()
    connection.close()


import oracledb
import time
def robust_notification_callback(message):
    """Notification callback with error handling.

    Prints the event type, each changed table (with up to ten row IDs for
    insert/update/delete operations) and each changed query ID. Any exception
    raised while processing is reported and suppressed so that a bad message
    cannot break the notification system.
    """
    try:
        print(f"Processing notification: {message.type}")
        # Process tables
        for table in message.tables:
            print(f" Table: {table.name}")
            # Validate table operations. Operation codes are bit flags and
            # may be combined (for example with OPCODE_ALLROWS), so test the
            # bits rather than comparing against single values.
            is_dml = table.operation & (
                oracledb.OPCODE_INSERT
                | oracledb.OPCODE_UPDATE
                | oracledb.OPCODE_DELETE
            )
            if is_dml:
                print(f" Valid operation: {table.operation}")
                # Process rows safely
                for row in table.rows[:10]:  # Limit processing to avoid overload
                    rowid = getattr(row, "rowid", None)
                    if rowid:
                        print(f" Row: {rowid}")
            else:
                print(f" Unexpected operation: {table.operation}")
        # Process queries
        for query in message.queries:
            print(f" Query ID: {query.id}")
    except Exception as e:
        # Log error but don't raise to avoid breaking notification system
        print(f"Error in notification callback: {e}")
def create_resilient_subscription(connection, max_retries=3, retry_delay=2):
    """Create subscription with retry logic.

    Parameters:
    - connection: Connection on which ``subscribe()`` is called
    - max_retries (int): Maximum number of attempts; must be at least 1
    - retry_delay (float): Seconds to wait between failed attempts

    Returns:
    The Subscription created by the first successful attempt

    Raises:
    ValueError if ``max_retries`` is less than 1 (previously the function
    returned None without attempting anything);
    oracledb.DatabaseError from the final attempt if every attempt fails
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    for attempt in range(1, max_retries + 1):
        try:
            subscription = connection.subscribe(
                namespace=oracledb.SUBSCR_NAMESPACE_DBCHANGE,
                protocol=oracledb.SUBSCR_PROTO_CALLBACK,
                callback=robust_notification_callback,
                timeout=300,
                operations=oracledb.OPCODE_ALLOPS,
                qos=oracledb.SUBSCR_QOS_RELIABLE,
            )
        except oracledb.DatabaseError as e:
            print(f"Subscription attempt {attempt} failed: {e}")
            if attempt == max_retries:
                raise
            time.sleep(retry_delay)  # Wait before retry
        else:
            print(f"Subscription created successfully on attempt {attempt}")
            return subscription
# Change notification is only available in python-oracledb Thick mode; load
# the Oracle Client libraries before connecting.
oracledb.init_oracle_client()

# Create resilient subscription (events=True enables notification support)
connection = oracledb.connect(
    user="hr", password="password", dsn="localhost/xepdb1", events=True
)
try:
    subscription = create_resilient_subscription(connection)

    # Register queries with error handling. The query is registered through
    # the subscription; running it on an ordinary cursor registers nothing.
    try:
        subscription.registerquery("SELECT * FROM employees WHERE department_id <= 50")
        print("Query registered successfully")
    except oracledb.DatabaseError as e:
        print(f"Query registration failed: {e}")

    # Test notifications
    with connection.cursor() as cursor:
        cursor.execute("UPDATE employees SET salary = salary + 1 WHERE ROWNUM <= 1")
        connection.commit()

    print("Waiting for notifications...")
    time.sleep(5)
except oracledb.DatabaseError as e:
    print(f"Subscription setup failed: {e}")
finally:
    connection.close()

# Install with Tessl CLI
npx tessl i tessl/pypi-oracledb