Python interface to Oracle Database implementing DB API 2.0 with Oracle-specific extensions
—
Continuous Query Notification (CQN) and Database Change Notification for real-time monitoring of database changes with callback-based event handling.
Create and manage subscriptions for database change notifications.
class Connection:
    def subscribe(
        self,
        callback,
        sql=None,
        operations=None,
        qos=None,
        timeout=0,
        namespace=SUBSCR_NAMESPACE_DBCHANGE,
        protocol=SUBSCR_PROTO_OCI,
        port=0,
        ipAddress=None,
        groupingClass=0,
        groupingValue=0,
        groupingType=0,
        name=None,
    ) -> Subscription:
        """Register for database change notifications.

        Args:
            callback: Callable invoked whenever a change is reported.
            sql (str): Statement whose results should be watched (CQN).
                NOTE(review): cx_Oracle's published ``subscribe()`` signature
                does not appear to list this argument; queries are normally
                registered with ``Subscription.registerquery()`` - confirm.
            operations (int): Bitwise combination of OPCODE_* constants
                selecting the operations to watch.
            qos (int): Quality-of-service flags.
            timeout (int): Lifetime of the subscription in seconds; 0 means
                it never times out.
            namespace (int): Notification namespace.
            protocol (int): Notification delivery protocol.
            port (int): Port on which notifications are received; 0 lets it
                be assigned automatically.
            ipAddress (str): IP address on which notifications are received.
            groupingClass (int): Grouping class used for batching.
            groupingValue (int): Value that goes with the grouping class.
            groupingType (int): Grouping type.
            name (str): Name given to the subscription.

        Returns:
            The newly created Subscription object.
        """

    def unsubscribe(self, subscription: Subscription) -> None:
        """Cancel a database change notification subscription.

        Args:
            subscription: The Subscription object to remove.
        """


# Usage examples:
def change_callback(message):
    """Print a readable summary of one change notification.

    Args:
        message: Notification object handed to the subscription callback.
            This function reads ``type``, ``dbname`` and ``tables``; from
            each table it reads ``name``, ``operation`` and ``rows``; from
            each row it reads ``rowid`` and ``operation``.
    """
    # Plain literal: the banner has nothing to interpolate.
    print("Database change notification received!")
    print(f"Event type: {message.type}")
    print(f"Database: {message.dbname}")
    # A missing (None) or empty table/row list simply produces no output.
    for table in message.tables or ():
        print(f"Table changed: {table.name}")
        print(f"Operation: {table.operation}")
        for row in table.rows or ():
            print(f" Row ID: {row.rowid}")
            print(f" Operation: {row.operation}")
import time

# Create basic subscription
subscription = connection.subscribe(
    callback=change_callback,
    operations=cx_Oracle.OPCODE_INSERT | cx_Oracle.OPCODE_UPDATE | cx_Oracle.OPCODE_DELETE,
    timeout=3600,  # 1 hour timeout
)
try:
    print(f"Subscription ID: {subscription.id}")
    print("Monitoring database changes...")

    # Register the table for monitoring. The query has to be registered
    # through the subscription itself: running it on an ordinary cursor only
    # executes it and does not attach it to the subscription.
    subscription.registerquery("SELECT * FROM employees")

    # Keep subscription active
    time.sleep(60)  # Monitor for 1 minute
finally:
    # Clean up, even if monitoring was interrupted by an error.
    connection.unsubscribe(subscription)

# Access subscription properties and configuration options.
class Subscription:
    """Handle for one change notification subscription.

    The properties expose the settings the subscription was created with;
    ``registerquery`` attaches queries to it.
    """

    @property
    def callback(self):
        """Callback function for change notifications"""

    @property
    def connection(self) -> "Connection":
        """Associated connection object"""

    @property
    def name(self) -> str:
        """Subscription name"""

    @property
    def namespace(self) -> int:
        """Notification namespace"""

    @property
    def operations(self) -> int:
        """Operations being monitored"""

    @property
    def port(self) -> int:
        """Notification port"""

    @property
    def protocol(self) -> int:
        """Notification protocol"""

    @property
    def qos(self) -> int:
        """Quality of service flags"""

    @property
    def timeout(self) -> int:
        """Subscription timeout in seconds"""

    @property
    def id(self) -> int:
        """Subscription ID"""

    def registerquery(self, statement, args=None):
        """Register a query so that changes to its tables are notified.

        Parameters:
        - statement (str): Query to register (only queries are permitted)
        - args: Optional bind values, as a sequence or dictionary

        Returns:
        ID of the registered query when the subscription was created with
        SUBSCR_QOS_QUERY, otherwise None
        """


# Define notification namespaces for different types of events.
# Notification namespace constants (value for the ``namespace`` argument of subscribe()).
SUBSCR_NAMESPACE_DBCHANGE: int # Database change notifications
SUBSCR_NAMESPACE_AQ: int # Advanced Queuing notifications
# Control how notifications are delivered.
# Notification protocol constants (value for the ``protocol`` argument of subscribe()).
SUBSCR_PROTO_OCI: int # OCI callback protocol (default)
SUBSCR_PROTO_MAIL: int # Email notifications
SUBSCR_PROTO_HTTP: int # HTTP notifications
SUBSCR_PROTO_PLSQL: int # PL/SQL server-side notifications
# Configure notification behavior and reliability.
# Quality-of-service flags; combine them with ``|`` for the ``qos`` argument of subscribe().
SUBSCR_QOS_RELIABLE: int # Reliable notification delivery
SUBSCR_QOS_DEREG_NFY: int # Notify when subscription is deregistered
SUBSCR_QOS_ROWIDS: int # Include row IDs in notifications
SUBSCR_QOS_QUERY: int # Enable continuous query notification
SUBSCR_QOS_BEST_EFFORT: int # Best effort delivery (may lose notifications)
# Usage examples:
# Create subscription with quality of service options
reliable_subscription = connection.subscribe(
    callback=change_callback,
    qos=cx_Oracle.SUBSCR_QOS_RELIABLE | cx_Oracle.SUBSCR_QOS_ROWIDS,
    timeout=7200,  # 2 hours
)

# Create CQN subscription for specific query
cqn_subscription = connection.subscribe(
    callback=change_callback,
    qos=cx_Oracle.SUBSCR_QOS_QUERY | cx_Oracle.SUBSCR_QOS_ROWIDS,
)
# The query text is not an argument of subscribe(); it is attached to the
# subscription afterwards with registerquery().
cqn_subscription.registerquery(
    "SELECT employee_id, name FROM employees WHERE department = 'IT'"
)

# Different types of database events that can trigger notifications.
# Event type constants; a notification message reports one of these in ``Message.type``.
EVENT_NONE: int # No event
EVENT_STARTUP: int # Database startup
EVENT_SHUTDOWN: int # Database shutdown
EVENT_SHUTDOWN_ANY: int # Any shutdown event
EVENT_DEREG: int # Subscription deregistration
EVENT_OBJCHANGE: int # Object change (table/view)
EVENT_QUERYCHANGE: int # Query result change (CQN)
EVENT_AQ: int # Advanced Queuing event
# Specific database operations that triggered the change.
# Operation code constants; used for the ``operations`` argument of subscribe()
# and reported in the ``operation`` attribute of tables, rows and queries.
OPCODE_ALLOPS: int # All operations
OPCODE_ALLROWS: int # All rows affected
OPCODE_INSERT: int # Insert operation
OPCODE_UPDATE: int # Update operation
OPCODE_DELETE: int # Delete operation
OPCODE_ALTER: int # DDL alter operation
OPCODE_DROP: int # DDL drop operation
# Objects passed to callback functions containing event details.
class Message:
    """Notification payload delivered to a subscription callback."""

    @property
    def type(self) -> int:
        """Kind of event being reported, as one of the EVENT_* constants."""

    @property
    def dbname(self) -> str:
        """Name of the database the notification came from."""

    @property
    def tables(self) -> list:
        """MessageTable objects describing the tables that were affected."""

    @property
    def queries(self) -> list:
        """MessageQuery objects describing the queries that were affected."""

    @property
    def queueName(self) -> str:
        """Name of the queue; applies to AQ events."""

    @property
    def consumerName(self) -> str:
        """Name of the consumer; applies to AQ events."""

    @property
    def registered(self) -> bool:
        """True while the subscription remains registered."""
class MessageTable:
    """One affected table inside a notification message."""

    @property
    def name(self) -> str:
        """Name of the table."""

    @property
    def operation(self) -> int:
        """Operation performed on the table, expressed with OPCODE_* constants."""

    @property
    def rows(self) -> list:
        """MessageRow objects describing the rows that were affected."""
class MessageRow:
    """One affected row inside a MessageTable."""

    @property
    def rowid(self) -> str:
        """Row ID identifying the affected row."""

    @property
    def operation(self) -> int:
        """Operation that was performed on this row."""
class MessageQuery:
    """One affected query inside a notification message."""

    @property
    def id(self) -> int:
        """Identifier of the query."""

    @property
    def operation(self) -> int:
        """Operation that affected the query."""

    @property
    def tables(self) -> list:
        """Tables that take part in the query."""


# Monitor specific query results for changes:
def setup_cqn_monitoring():
    """Setup continuous query notification for specific data.

    Creates a query-level subscription on the module-level ``connection``,
    registers the high-salary employee query with it and returns the
    subscription.

    Returns:
        The subscription created by ``connection.subscribe``.
    """

    def query_change_callback(message):
        """Handle query result changes"""
        print("Query results changed!")
        # ``queries`` may be None or empty; both mean nothing to report.
        for query in message.queries or ():
            print(f"Query {query.id} changed")
            print(f"Operation: {query.operation}")
            # Re-execute query to get updated results. The ``with`` block
            # closes the cursor even when the query fails.
            with connection.cursor() as cursor:
                cursor.execute("SELECT * FROM employees WHERE salary > 50000")
                updated_results = cursor.fetchall()
            print(f"Updated query returned {len(updated_results)} rows")

    # Create CQN subscription for high-salary employees
    cqn_sub = connection.subscribe(
        callback=query_change_callback,
        qos=cx_Oracle.SUBSCR_QOS_QUERY | cx_Oracle.SUBSCR_QOS_ROWIDS,
    )
    # The watched query is attached through registerquery(); subscribe()
    # itself does not take the SQL text.
    cqn_sub.registerquery(
        "SELECT employee_id, name, salary FROM employees WHERE salary > 50000"
    )
    return cqn_sub
# Setup monitoring
cqn_subscription = setup_cqn_monitoring()

# Fetch the current result set as a baseline. Running the query on an
# ordinary cursor does not register it for notification; registration belongs
# to the subscription.
with connection.cursor() as cursor:  # closed automatically, even on error
    cursor.execute("SELECT employee_id, name, salary FROM employees WHERE salary > 50000")
    initial_results = cursor.fetchall()
print(f"Initially found {len(initial_results)} high-salary employees")

# Monitor all changes to specific tables:
def setup_table_monitoring(table_names):
    """Monitor changes to specific tables.

    Args:
        table_names: Iterable of table names to register. Each name is
            interpolated into the registration query (identifiers cannot be
            bind variables), so it must come from a trusted source.

    Returns:
        The subscription created on the module-level ``connection``.
    """
    # Built once instead of on every notification.
    operation_labels = (
        (cx_Oracle.OPCODE_INSERT, "INSERT"),
        (cx_Oracle.OPCODE_UPDATE, "UPDATE"),
        (cx_Oracle.OPCODE_DELETE, "DELETE"),
        (cx_Oracle.OPCODE_ALTER, "ALTER"),
        (cx_Oracle.OPCODE_DROP, "DROP"),
    )

    def table_change_callback(message):
        """Handle table change notifications"""
        print(f"Table changes detected in database: {message.dbname}")
        # ``tables`` may be None or empty; both mean nothing to report.
        for table in message.tables or ():
            print(f"\nTable: {table.name}")
            # The operation is a set of bit flags and may combine several
            # OPCODE_* values, so test each flag rather than the exact value.
            matched = [label for flag, label in operation_labels if table.operation & flag]
            operation_name = "|".join(matched) or f"Unknown({table.operation})"
            print(f"Operation: {operation_name}")
            rows = table.rows
            if rows:
                print(f"Affected rows: {len(rows)}")
                for row in rows[:5]:  # Show first 5 rows
                    print(f" Row ID: {row.rowid}")
                if len(rows) > 5:
                    print(f" ... and {len(rows) - 5} more rows")

    # Create subscription for table changes
    table_sub = connection.subscribe(
        callback=table_change_callback,
        operations=cx_Oracle.OPCODE_INSERT | cx_Oracle.OPCODE_UPDATE | cx_Oracle.OPCODE_DELETE,
        qos=cx_Oracle.SUBSCR_QOS_ROWIDS,
        timeout=3600,
    )

    # Register each table through the subscription; a query run on an
    # ordinary cursor would not be attached to it.
    for table_name in table_names:
        try:
            table_sub.registerquery(f"SELECT 1 FROM {table_name} WHERE ROWNUM = 1")
        except cx_Oracle.DatabaseError as e:
            print(f"Could not register table {table_name}: {e}")
        else:
            print(f"Registered table: {table_name}")
    return table_sub
# Monitor specific tables
# The names listed here are handed to setup_table_monitoring, and the returned
# subscription is kept in table_subscription.
monitored_tables = ["employees", "departments", "projects"]
table_subscription = setup_table_monitoring(monitored_tables)
# Group notifications to reduce callback frequency:
def setup_grouped_notifications():
    """Setup grouped notifications for better performance.

    Creates a subscription on the module-level ``connection`` that uses
    time-based grouping with a grouping value of 30 and summary grouping.

    Returns:
        The subscription created by ``connection.subscribe``.
    """

    def batch_change_callback(message):
        """Handle batched change notifications"""
        print("Received batch of database changes")
        # Summarise every table in the batch. ``tables`` may be None or
        # empty, in which case the summary is empty too.
        all_changes = [
            {
                'table': table.name,
                'operation': table.operation,
                'row_count': len(table.rows) if table.rows else 0,
            }
            for table in message.tables or ()
        ]
        # Process changes in batch
        print(f"Processing {len(all_changes)} table changes:")
        for change in all_changes:
            print(f" {change['table']}: {change['row_count']} rows")

    # Create subscription with grouping
    grouped_sub = connection.subscribe(
        callback=batch_change_callback,
        operations=cx_Oracle.OPCODE_ALLOPS,
        qos=cx_Oracle.SUBSCR_QOS_RELIABLE,
        groupingClass=cx_Oracle.SUBSCR_GROUPING_CLASS_TIME,
        groupingValue=30,  # Batch changes for 30 seconds
        groupingType=cx_Oracle.SUBSCR_GROUPING_TYPE_SUMMARY,
    )
    return grouped_sub
# Grouping constants
# Listed with their integer values; setup_grouped_notifications refers to
# them through the cx_Oracle module.
SUBSCR_GROUPING_CLASS_TIME: int = 1 # Time-based grouping
SUBSCR_GROUPING_TYPE_SUMMARY: int = 1 # Summary grouping
SUBSCR_GROUPING_TYPE_LAST: int = 2 # Last event only
# Create the grouped subscription and keep a reference to it.
grouped_subscription = setup_grouped_notifications()
# Use notifications to maintain cache consistency:
class DatabaseCache:
    """Query-result cache that is invalidated by change notifications.

    Results are stored in ``self.cache`` under per-table keys. A subscription
    created on the supplied connection drops the matching entry whenever a
    notification names that table.
    """

    def __init__(self, connection):
        """Store the connection, start with an empty cache and subscribe.

        Args:
            connection: Connection used both for the subscription and for
                the queries that fill the cache.
        """
        self.connection = connection
        self.cache = {}
        self.setup_invalidation()

    @staticmethod
    def _cache_key(table_name):
        """Return the cache key for a table, ignoring schema prefix and case.

        Notifications report tables as ``SCHEMA.TABLE`` in upper case, while
        entries are stored under lower-case unqualified names; normalising
        both sides makes them match.
        """
        return "table_" + table_name.rpartition(".")[2].lower()

    def _invalidate(self, message):
        """Drop the cached entry of every table named in ``message``."""
        # ``tables`` may be None or empty; both mean nothing to invalidate.
        for table in message.tables or ():
            cache_key = self._cache_key(table.name)
            if cache_key in self.cache:
                print(f"Invalidating cache for table: {table.name}")
                # pop() with a default tolerates the entry disappearing
                # between the membership test and the removal.
                self.cache.pop(cache_key, None)

    def setup_invalidation(self):
        """Setup cache invalidation on data changes"""
        self.subscription = self.connection.subscribe(
            callback=self._invalidate,
            operations=cx_Oracle.OPCODE_ALLOPS,
            qos=cx_Oracle.SUBSCR_QOS_RELIABLE,
        )
        # Register tables through the subscription; a query run on an
        # ordinary cursor would not be attached to it.
        self.subscription.registerquery("SELECT 1 FROM employees WHERE ROWNUM = 1")
        self.subscription.registerquery("SELECT 1 FROM departments WHERE ROWNUM = 1")

    def get_employees(self):
        """Get employees with caching.

        Returns:
            The rows of ``SELECT employee_id, name FROM employees``, served
            from the cache when an entry is present.
        """
        cache_key = self._cache_key("employees")
        if cache_key not in self.cache:
            # The ``with`` block closes the cursor even if the query fails.
            with self.connection.cursor() as cursor:
                cursor.execute("SELECT employee_id, name FROM employees")
                self.cache[cache_key] = cursor.fetchall()
            print("Loaded employees into cache")
        return self.cache[cache_key]

    def cleanup(self):
        """Clean up subscription"""
        self.connection.unsubscribe(self.subscription)
# Usage
# Constructing the cache also sets up its invalidation subscription.
cache = DatabaseCache(connection)
employees = cache.get_employees() # Loads from database
employees = cache.get_employees() # Returns from cache
# Simulate data change (in another session)
# UPDATE employees SET name = 'Updated Name' WHERE employee_id = 1;
# COMMIT;
# The next call reloads only if the invalidation callback has already removed
# the cached entry; otherwise it still returns the cached rows.
employees = cache.get_employees() # Reloads from database after invalidation
cache.cleanup()
# Use notifications to trigger application workflows:
def setup_workflow_triggers():
    """Setup event-driven workflow processing.

    Creates a subscription on the module-level ``connection`` for insert and
    update operations, registers the ``orders`` and ``employees`` tables with
    it and returns the subscription.

    Returns:
        The subscription created by ``connection.subscribe``.
    """

    def workflow_callback(message):
        """Process database changes as workflow events"""
        # ``tables`` may be None or empty; both mean nothing to process.
        for table in message.tables or ():
            # Notifications report tables as SCHEMA.TABLE, so compare only
            # the part after the last dot.
            table_name = table.name.rpartition(".")[2].upper()
            # The operation is a set of bit flags that may carry extra bits,
            # so test the flag instead of comparing for equality.
            inserted = table.operation & cx_Oracle.OPCODE_INSERT
            updated = table.operation & cx_Oracle.OPCODE_UPDATE
            if table_name == "ORDERS":
                if inserted:
                    print("New order created - triggering fulfillment workflow")
                    # trigger_order_fulfillment()
                elif updated:
                    print("Order updated - checking status changes")
                    # check_order_status_changes()
            elif table_name == "EMPLOYEES":
                if inserted:
                    print("New employee added - triggering onboarding workflow")
                    # trigger_employee_onboarding()

    workflow_sub = connection.subscribe(
        callback=workflow_callback,
        operations=cx_Oracle.OPCODE_INSERT | cx_Oracle.OPCODE_UPDATE,
        qos=cx_Oracle.SUBSCR_QOS_RELIABLE,
    )
    # Register workflow tables through the subscription; a query run on an
    # ordinary cursor would not be attached to it.
    workflow_sub.registerquery("SELECT 1 FROM orders WHERE ROWNUM = 1")
    workflow_sub.registerquery("SELECT 1 FROM employees WHERE ROWNUM = 1")
    return workflow_sub
# Create the workflow subscription and keep a reference to it.
workflow_subscription = setup_workflow_triggers()
# Handle notification-related errors:
try:
    subscription = connection.subscribe(
        callback=change_callback,
        operations=cx_Oracle.OPCODE_ALLOPS,
    )
except cx_Oracle.DatabaseError as e:
    # The exception carries a single error object with ``code`` and ``message``.
    (error_obj,) = e.args
    # NOTE(review): confirm these ORA- codes against Oracle's error reference.
    known_failures = {
        29972: "Insufficient privileges for change notification",  # Insufficient privileges
        29966: "Maximum number of subscriptions reached",  # Subscription limit reached
        29970: "Invalid callback function",  # Invalid callback
    }
    explanation = known_failures.get(error_obj.code)
    if explanation is None:
        # Any other code: fall back to the message supplied by the database.
        explanation = f"Subscription error: {error_obj.message}"
    print(explanation)
def robust_callback(message):
    """Callback with error handling.

    Prints one line per table named in ``message``. Any exception raised
    while processing is reported and swallowed rather than propagated.

    Args:
        message: Notification object; only its ``tables`` attribute is read.
    """
    try:
        # Process notification. A notification without a table list
        # (``tables`` is None or empty) is not an error, just nothing to do.
        for table in message.tables or ():
            print(f"Processing changes to {table.name}")
    except Exception as e:
        # Deliberately broad: this is the callback boundary.
        print(f"Error processing notification: {e}")
        # Log error but don't raise to avoid breaking subscription
# Use robust callback
# Errors raised while handling a notification are caught inside
# robust_callback, so they do not propagate out of the callback.
subscription = connection.subscribe(
callback=robust_callback,
operations=cx_Oracle.OPCODE_ALLOPS
)
# Install with Tessl CLI
npx tessl i tessl/pypi-cx-oracle