CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-cx-oracle

Python interface to Oracle Database implementing DB API 2.0 with Oracle-specific extensions

Pending
Overview
Eval results
Files

notifications.mddocs/

Database Change Notifications

Continuous Query Notification (CQN) and Database Change Notification for real-time monitoring of database changes with callback-based event handling.

Capabilities

Subscription Management

Create and manage subscriptions for database change notifications.

class Connection:
    def subscribe(self, callback, sql=None, operations=None, qos=None, 
                 timeout=0, namespace=SUBSCR_NAMESPACE_DBCHANGE, 
                 protocol=SUBSCR_PROTO_OCI, port=0, ipAddress=None,
                 groupingClass=0, groupingValue=0, groupingType=0,
                 name=None) -> Subscription:
        """
        Create subscription for database change notifications.
        
        Parameters:
        - callback: Function to call when changes occur
        - sql (str): SQL statement to monitor (for CQN)
        - operations (int): Operations to monitor (OPCODE_* constants)
        - qos (int): Quality of service flags
        - timeout (int): Subscription timeout in seconds (0 = no timeout)
        - namespace (int): Notification namespace
        - protocol (int): Notification protocol
        - port (int): Port for notifications (0 = auto-assign)
        - ipAddress (str): IP address for notifications
        - groupingClass (int): Grouping class for batching
        - groupingValue (int): Grouping value
        - groupingType (int): Grouping type
        - name (str): Subscription name
        
        Returns:
        Subscription object
        """
        
    def unsubscribe(self, subscription: Subscription) -> None:
        """
        Remove database change notification subscription.
        
        Parameters:
        - subscription: Subscription object to remove
        """

Usage examples:

def change_callback(message):
    """Callback function for handling database changes"""
    print(f"Database change notification received!")
    print(f"Event type: {message.type}")
    print(f"Database: {message.dbname}")
    
    if message.tables:
        for table in message.tables:
            print(f"Table changed: {table.name}")
            print(f"Operation: {table.operation}")
            
            if table.rows:
                for row in table.rows:
                    print(f"  Row ID: {row.rowid}")
                    print(f"  Operation: {row.operation}")

# Create basic subscription
subscription = connection.subscribe(
    callback=change_callback,
    operations=cx_Oracle.OPCODE_INSERT | cx_Oracle.OPCODE_UPDATE | cx_Oracle.OPCODE_DELETE,
    timeout=3600  # 1 hour timeout
)

print(f"Subscription ID: {subscription.id}")
print("Monitoring database changes...")

# Register tables for monitoring (requires additional setup)
cursor = connection.cursor()
cursor.execute("SELECT * FROM employees")  # This query will be monitored
cursor.close()

# Keep subscription active
import time
time.sleep(60)  # Monitor for 1 minute

# Clean up
connection.unsubscribe(subscription)

Subscription Properties and Configuration

Access subscription properties and configuration options.

class Subscription:
    @property
    def callback(self):
        """Callback function for change notifications"""
        
    @property
    def connection(self) -> Connection:
        """Associated connection object"""
        
    @property
    def name(self) -> str:
        """Subscription name"""
        
    @property
    def namespace(self) -> int:
        """Notification namespace"""
        
    @property
    def operations(self) -> int:
        """Operations being monitored"""
        
    @property
    def port(self) -> int:
        """Notification port"""
        
    @property
    def protocol(self) -> int:
        """Notification protocol"""
        
    @property
    def qos(self) -> int:
        """Quality of service flags"""
        
    @property
    def timeout(self) -> int:
        """Subscription timeout in seconds"""
        
    @property
    def id(self) -> int:
        """Subscription ID"""

Namespace Constants

Define notification namespaces for different types of events.

SUBSCR_NAMESPACE_DBCHANGE: int    # Database change notifications
SUBSCR_NAMESPACE_AQ: int          # Advanced Queueing notifications

Protocol Constants

Control how notifications are delivered.

SUBSCR_PROTO_OCI: int      # OCI callback protocol (default)
SUBSCR_PROTO_MAIL: int     # Email notifications
SUBSCR_PROTO_HTTP: int     # HTTP notifications
SUBSCR_PROTO_PLSQL: int    # PL/SQL server-side notifications

Quality of Service Flags

Configure notification behavior and reliability.

SUBSCR_QOS_RELIABLE: int      # Reliable notification delivery
SUBSCR_QOS_DEREG_NFY: int     # Notify when subscription is deregistered
SUBSCR_QOS_ROWIDS: int        # Include row IDs in notifications
SUBSCR_QOS_QUERY: int         # Enable continuous query notification
SUBSCR_QOS_BEST_EFFORT: int   # Best effort delivery (may lose notifications)

Usage examples:

# Create subscription with quality of service options
reliable_subscription = connection.subscribe(
    callback=change_callback,
    qos=cx_Oracle.SUBSCR_QOS_RELIABLE | cx_Oracle.SUBSCR_QOS_ROWIDS,
    timeout=7200  # 2 hours
)

# Create CQN subscription for specific query
cqn_subscription = connection.subscribe(
    callback=change_callback,
    sql="SELECT employee_id, name FROM employees WHERE department = 'IT'",
    qos=cx_Oracle.SUBSCR_QOS_QUERY | cx_Oracle.SUBSCR_QOS_ROWIDS
)

Change Notification Events

Event Types

Different types of database events that can trigger notifications.

EVENT_NONE: int           # No event
EVENT_STARTUP: int        # Database startup
EVENT_SHUTDOWN: int       # Database shutdown
EVENT_SHUTDOWN_ANY: int   # Any shutdown event
EVENT_DEREG: int          # Subscription deregistration
EVENT_OBJCHANGE: int      # Object change (table/view)
EVENT_QUERYCHANGE: int    # Query result change (CQN)
EVENT_AQ: int             # Advanced Queueing event

Operation Codes

Specific database operations that triggered the change.

OPCODE_ALLOPS: int        # All operations
OPCODE_ALLROWS: int       # All rows affected
OPCODE_INSERT: int        # Insert operation
OPCODE_UPDATE: int        # Update operation
OPCODE_DELETE: int        # Delete operation
OPCODE_ALTER: int         # DDL alter operation
OPCODE_DROP: int          # DDL drop operation

Message Structure

Objects passed to callback functions containing event details.

class Message:
    @property
    def type(self) -> int:
        """Event type (EVENT_* constants)"""
        
    @property
    def dbname(self) -> str:
        """Database name"""
        
    @property
    def tables(self) -> list:
        """List of affected tables (MessageTable objects)"""
        
    @property
    def queries(self) -> list:
        """List of affected queries (MessageQuery objects)"""
        
    @property
    def queueName(self) -> str:
        """Queue name (for AQ events)"""
        
    @property
    def consumerName(self) -> str:
        """Consumer name (for AQ events)"""
        
    @property
    def registered(self) -> bool:
        """Whether subscription is still registered"""

class MessageTable:
    @property
    def name(self) -> str:
        """Table name"""
        
    @property
    def operation(self) -> int:
        """Operation type (OPCODE_* constants)"""
        
    @property
    def rows(self) -> list:
        """List of affected rows (MessageRow objects)"""

class MessageRow:
    @property
    def rowid(self) -> str:
        """Row ID of affected row"""
        
    @property
    def operation(self) -> int:
        """Operation type for this row"""

class MessageQuery:
    @property
    def id(self) -> int:
        """Query ID"""
        
    @property
    def operation(self) -> int:
        """Operation type"""
        
    @property
    def tables(self) -> list:
        """List of tables involved in query"""

Advanced Notification Patterns

Continuous Query Notification (CQN)

Monitor specific query results for changes:

def setup_cqn_monitoring():
    """Setup continuous query notification for specific data"""
    
    def query_change_callback(message):
        """Handle query result changes"""
        print("Query results changed!")
        
        if message.queries:
            for query in message.queries:
                print(f"Query {query.id} changed")
                print(f"Operation: {query.operation}")
                
                # Re-execute query to get updated results
                cursor = connection.cursor()
                cursor.execute("SELECT * FROM employees WHERE salary > 50000")
                updated_results = cursor.fetchall()
                print(f"Updated query returned {len(updated_results)} rows")
                cursor.close()
    
    # Create CQN subscription for high-salary employees
    cqn_sub = connection.subscribe(
        callback=query_change_callback,
        sql="SELECT employee_id, name, salary FROM employees WHERE salary > 50000",
        qos=cx_Oracle.SUBSCR_QOS_QUERY | cx_Oracle.SUBSCR_QOS_ROWIDS
    )
    
    return cqn_sub

# Setup monitoring
cqn_subscription = setup_cqn_monitoring()

# Execute the monitored query to register it
cursor = connection.cursor()
cursor.execute("SELECT employee_id, name, salary FROM employees WHERE salary > 50000")
initial_results = cursor.fetchall()
print(f"Initially found {len(initial_results)} high-salary employees")
cursor.close()

Table-Level Change Monitoring

Monitor all changes to specific tables:

def setup_table_monitoring(table_names):
    """Monitor changes to specific tables"""
    
    def table_change_callback(message):
        """Handle table change notifications"""
        print(f"Table changes detected in database: {message.dbname}")
        
        for table in message.tables:
            print(f"\nTable: {table.name}")
            
            operation_name = {
                cx_Oracle.OPCODE_INSERT: "INSERT",
                cx_Oracle.OPCODE_UPDATE: "UPDATE", 
                cx_Oracle.OPCODE_DELETE: "DELETE",
                cx_Oracle.OPCODE_ALTER: "ALTER",
                cx_Oracle.OPCODE_DROP: "DROP"
            }.get(table.operation, f"Unknown({table.operation})")
            
            print(f"Operation: {operation_name}")
            
            if table.rows:
                print(f"Affected rows: {len(table.rows)}")
                for row in table.rows[:5]:  # Show first 5 rows
                    print(f"  Row ID: {row.rowid}")
                    
                if len(table.rows) > 5:
                    print(f"  ... and {len(table.rows) - 5} more rows")
    
    # Create subscription for table changes
    table_sub = connection.subscribe(
        callback=table_change_callback,
        operations=cx_Oracle.OPCODE_INSERT | cx_Oracle.OPCODE_UPDATE | cx_Oracle.OPCODE_DELETE,
        qos=cx_Oracle.SUBSCR_QOS_ROWIDS,
        timeout=3600
    )
    
    # Register tables by querying them
    cursor = connection.cursor()
    for table_name in table_names:
        try:
            cursor.execute(f"SELECT 1 FROM {table_name} WHERE ROWNUM = 1")
            cursor.fetchall()
            print(f"Registered table: {table_name}")
        except cx_Oracle.DatabaseError as e:
            print(f"Could not register table {table_name}: {e}")
    cursor.close()
    
    return table_sub

# Monitor specific tables
monitored_tables = ["employees", "departments", "projects"]
table_subscription = setup_table_monitoring(monitored_tables)

Subscription Grouping and Batching

Group notifications to reduce callback frequency:

def setup_grouped_notifications():
    """Setup grouped notifications for better performance"""
    
    def batch_change_callback(message):
        """Handle batched change notifications"""
        print("Received batch of database changes")
        
        # Process all tables in batch
        all_changes = []
        for table in message.tables:
            all_changes.append({
                'table': table.name,
                'operation': table.operation,
                'row_count': len(table.rows) if table.rows else 0
            })
        
        # Process changes in batch
        print(f"Processing {len(all_changes)} table changes:")
        for change in all_changes:
            print(f"  {change['table']}: {change['row_count']} rows")
    
    # Create subscription with grouping
    grouped_sub = connection.subscribe(
        callback=batch_change_callback,
        operations=cx_Oracle.OPCODE_ALLOPS,
        qos=cx_Oracle.SUBSCR_QOS_RELIABLE,
        groupingClass=cx_Oracle.SUBSCR_GROUPING_CLASS_TIME,
        groupingValue=30,  # Batch changes for 30 seconds
        groupingType=cx_Oracle.SUBSCR_GROUPING_TYPE_SUMMARY
    )
    
    return grouped_sub

# Grouping constants
SUBSCR_GROUPING_CLASS_TIME: int = 1    # Time-based grouping
SUBSCR_GROUPING_TYPE_SUMMARY: int = 1  # Summary grouping
SUBSCR_GROUPING_TYPE_LAST: int = 2     # Last event only

grouped_subscription = setup_grouped_notifications()

Notification Application Patterns

Real-time Cache Invalidation

Use notifications to maintain cache consistency:

class DatabaseCache:
    def __init__(self, connection):
        self.connection = connection
        self.cache = {}
        self.setup_invalidation()
    
    def setup_invalidation(self):
        """Setup cache invalidation on data changes"""
        def invalidate_callback(message):
            for table in message.tables:
                cache_key = f"table_{table.name}"
                if cache_key in self.cache:
                    print(f"Invalidating cache for table: {table.name}")
                    del self.cache[cache_key]
        
        self.subscription = self.connection.subscribe(
            callback=invalidate_callback,
            operations=cx_Oracle.OPCODE_ALLOPS,
            qos=cx_Oracle.SUBSCR_QOS_RELIABLE
        )
        
        # Register tables
        cursor = self.connection.cursor()
        cursor.execute("SELECT 1 FROM employees WHERE ROWNUM = 1")
        cursor.execute("SELECT 1 FROM departments WHERE ROWNUM = 1")
        cursor.close()
    
    def get_employees(self):
        """Get employees with caching"""
        cache_key = "table_employees"
        
        if cache_key not in self.cache:
            cursor = self.connection.cursor()
            cursor.execute("SELECT employee_id, name FROM employees")
            self.cache[cache_key] = cursor.fetchall()
            cursor.close()
            print("Loaded employees into cache")
        
        return self.cache[cache_key]
    
    def cleanup(self):
        """Clean up subscription"""
        self.connection.unsubscribe(self.subscription)

# Usage
cache = DatabaseCache(connection)
employees = cache.get_employees()  # Loads from database
employees = cache.get_employees()  # Returns from cache

# Simulate data change (in another session)
# UPDATE employees SET name = 'Updated Name' WHERE employee_id = 1;
# COMMIT;

employees = cache.get_employees()  # Reloads from database after invalidation
cache.cleanup()

Event-Driven Processing

Use notifications to trigger application workflows:

def setup_workflow_triggers():
    """Setup event-driven workflow processing"""
    
    def workflow_callback(message):
        """Process database changes as workflow events"""
        for table in message.tables:
            if table.name.upper() == "ORDERS":
                if table.operation == cx_Oracle.OPCODE_INSERT:
                    print("New order created - triggering fulfillment workflow")
                    # trigger_order_fulfillment()
                    
                elif table.operation == cx_Oracle.OPCODE_UPDATE:
                    print("Order updated - checking status changes")
                    # check_order_status_changes()
                    
            elif table.name.upper() == "EMPLOYEES":
                if table.operation == cx_Oracle.OPCODE_INSERT:
                    print("New employee added - triggering onboarding workflow")
                    # trigger_employee_onboarding()
    
    workflow_sub = connection.subscribe(
        callback=workflow_callback,
        operations=cx_Oracle.OPCODE_INSERT | cx_Oracle.OPCODE_UPDATE,
        qos=cx_Oracle.SUBSCR_QOS_RELIABLE
    )
    
    # Register workflow tables
    cursor = connection.cursor()
    cursor.execute("SELECT 1 FROM orders WHERE ROWNUM = 1")
    cursor.execute("SELECT 1 FROM employees WHERE ROWNUM = 1")
    cursor.close()
    
    return workflow_sub

workflow_subscription = setup_workflow_triggers()

Error Handling and Troubleshooting

Handle notification-related errors:

try:
    subscription = connection.subscribe(
        callback=change_callback,
        operations=cx_Oracle.OPCODE_ALLOPS
    )
    
except cx_Oracle.DatabaseError as e:
    error_obj, = e.args
    
    if error_obj.code == 29972:  # Insufficient privileges
        print("Insufficient privileges for change notification")
    elif error_obj.code == 29966:  # Subscription limit reached
        print("Maximum number of subscriptions reached")
    elif error_obj.code == 29970:  # Invalid callback
        print("Invalid callback function")
    else:
        print(f"Subscription error: {error_obj.message}")

def robust_callback(message):
    """Callback with error handling"""
    try:
        # Process notification
        for table in message.tables:
            print(f"Processing changes to {table.name}")
            
    except Exception as e:
        print(f"Error processing notification: {e}")
        # Log error but don't raise to avoid breaking subscription

# Use robust callback
subscription = connection.subscribe(
    callback=robust_callback,
    operations=cx_Oracle.OPCODE_ALLOPS
)

Notification Best Practices

  1. Handle callback errors gracefully: Don't let exceptions in callbacks break subscriptions
  2. Use appropriate QoS settings: Choose between reliability and performance based on requirements
  3. Monitor subscription health: Check subscription.registered status periodically
  4. Clean up subscriptions: Always unsubscribe when done to free resources
  5. Batch related operations: Use grouping to reduce callback frequency for high-volume changes
  6. Test with realistic data volumes: Ensure callbacks can handle expected notification rates
  7. Use connection pooling carefully: Subscriptions are tied to specific connections
  8. Consider security implications: Notifications can reveal data access patterns

Install with Tessl CLI

npx tessl i tessl/pypi-cx-oracle

docs

advanced-queueing.md

connections.md

cursors.md

index.md

lobs.md

notifications.md

object-types.md

session-pools.md

soda.md

tile.json