CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-oracledb

Python interface to Oracle Database with thin and thick connectivity modes

Pending
Overview
Eval results
Files

subscriptions.mddocs/

Event Subscriptions

Subscribe to database events including object changes, query result changes, and Advanced Queuing (AQ) messages for real-time notifications. Database event subscriptions enable applications to receive notifications when specific database changes occur, allowing for reactive programming patterns and real-time data synchronization.

Capabilities

Subscription Creation

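Create subscriptions for database change, query result change, and Advanced Queuing events. Subscriptions require python-oracledb Thick mode (call `oracledb.init_oracle_client()` before connecting) and a connection created with `events=True`. Notifications are delivered to a Python callback (`SUBSCR_PROTO_CALLBACK`), and queries are registered for change notification with `Subscription.registerquery()`.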

# Subscription creation through Connection.subscribe()
def subscribe(
    self,
    namespace=SUBSCR_NAMESPACE_DBCHANGE,
    protocol=SUBSCR_PROTO_CALLBACK,
    callback=None,
    timeout=0,
    operations=OPCODE_ALLOPS,
    port=0,
    qos=0,
    ip_address=None,
    grouping_class=SUBSCR_GROUPING_CLASS_NONE,
    grouping_value=0,
    grouping_type=SUBSCR_GROUPING_TYPE_SUMMARY,
    name=None,
    client_initiated=False,
    recipient_name=None
) -> Subscription:
    """
    Create a subscription for database event notifications.

    Every parameter has a default, so callers normally pass only the
    keyword arguments they want to change.

    Parameters:
    - namespace (int): What to subscribe to: SUBSCR_NAMESPACE_DBCHANGE
      (default) for database change notifications or SUBSCR_NAMESPACE_AQ
      for Advanced Queuing notifications
    - protocol (int): How notifications are delivered (SUBSCR_PROTO_CALLBACK
      by default; see the SUBSCR_PROTO_* constants)
    - callback (callable): Function called with a single Message argument
      for each notification
    - timeout (int): Subscription lifetime in seconds (0 for no timeout)
    - operations (int): Database operations to monitor, as OPCODE_* values
      (OPCODE_ALLOPS by default)
    - port (int): Port on which notifications are received
      (NOTE(review): 0 presumably lets the client library pick a port - confirm)
    - qos (int): Quality of service flags; SUBSCR_QOS_* values combined
      with ``|``
    - ip_address (str): IP address for notifications
    - grouping_class (int): Grouping class for notifications
      (SUBSCR_GROUPING_CLASS_* constants)
    - grouping_value (int): Grouping value
      (NOTE(review): presumably the grouping period in seconds when
      grouping_class is SUBSCR_GROUPING_CLASS_TIME - confirm)
    - grouping_type (int): Grouping type (SUBSCR_GROUPING_TYPE_* constants)
    - name (str): Subscription name; with SUBSCR_NAMESPACE_AQ this is the
      name of the queue to monitor
    - client_initiated (bool): Client-initiated subscription
    - recipient_name (str): Recipient name for notifications
      (NOTE(review): confirm this parameter exists in the installed
      python-oracledb version before passing it)

    Returns:
    Subscription object
    """

Message Classes

Classes representing different types of database event messages.

class Message:
    """Database event notification message.

    An instance is passed as the single argument to the callback given to
    Connection.subscribe().
    """

    # Properties
    type: int           # Event type: one of the EVENT_* constants (EVENT_OBJCHANGE, EVENT_QUERYCHANGE, etc.)
    dbname: str         # Database name
    tables: "list[MessageTable]"    # Changed tables, as MessageTable objects
    queries: "list[MessageQuery]"   # Changed registered queries, as MessageQuery objects
    consumer_name: str  # AQ consumer name (relevant to Advanced Queuing notifications)
    queue_name: str     # AQ queue name (relevant to Advanced Queuing notifications)
    subscription: object # Subscription object that received the message

class MessageTable:
    """Table change notification details.

    Found in Message.tables and in MessageQuery.tables.
    """

    # Properties
    name: str           # Table name
    operation: int      # Operation type: an OPCODE_* value (OPCODE_INSERT, OPCODE_UPDATE, etc.)
    rows: "list[MessageRow]"    # Changed rows, as MessageRow objects

class MessageRow:
    """Row change notification details.

    Found in MessageTable.rows.
    """

    # Properties
    operation: int      # Operation type for this row (an OPCODE_* value)
    rowid: str          # Row identifier of the changed row

class MessageQuery:
    """Query change notification details.

    Found in Message.queries.
    """

    # Properties
    id: int             # Query ID
    operation: int      # Operation type (an OPCODE_* value)
    queryctx: object    # Query context (NOTE(review): confirm this attribute exists upstream)
    tables: "list[MessageTable]"    # Affected tables, as MessageTable objects

Subscription Constants

Constants for configuring subscription behavior and identifying event types.

# Subscription Namespaces (values for subscribe(namespace=...))
SUBSCR_NAMESPACE_DBCHANGE: int  # Database change notifications
SUBSCR_NAMESPACE_AQ: int        # Advanced Queuing notifications

# Subscription Protocols (values for subscribe(protocol=...))
# NOTE(review): upstream documentation appears to list only
# SUBSCR_PROTO_CALLBACK as currently supported - confirm before using the others.
SUBSCR_PROTO_CALLBACK: int      # Python callback function
SUBSCR_PROTO_HTTP: int          # HTTP notifications
SUBSCR_PROTO_MAIL: int          # Email notifications
SUBSCR_PROTO_SERVER: int        # Server-to-server notifications

# Quality of Service (flags for subscribe(qos=...); combine with |)
SUBSCR_QOS_DEFAULT: int         # Default QoS (no special behavior requested)
SUBSCR_QOS_RELIABLE: int        # Reliable delivery: notifications should not be lost on database failure
SUBSCR_QOS_BEST_EFFORT: int     # Best-effort filtering of query changes (NOTE(review): false positives possible - confirm)
SUBSCR_QOS_DEREG_NFY: int       # Deregister the subscription after the first notification (NOTE(review): confirm)
SUBSCR_QOS_ROWIDS: int          # Include row IDs of changed rows in the message
SUBSCR_QOS_QUERY: int           # Query result change notification (rather than object change)

# Grouping Classes (values for subscribe(grouping_class=...))
SUBSCR_GROUPING_CLASS_NONE: int # No grouping
SUBSCR_GROUPING_CLASS_TIME: int # Time-based grouping

# Grouping Types (values for subscribe(grouping_type=...))
SUBSCR_GROUPING_TYPE_SUMMARY: int # Summary notifications
SUBSCR_GROUPING_TYPE_LAST: int    # Last notification only

# Event Types (values of Message.type)
EVENT_NONE: int                 # No event
EVENT_STARTUP: int              # Database startup
EVENT_SHUTDOWN: int             # Database shutdown
EVENT_SHUTDOWN_ANY: int         # Any shutdown
EVENT_DEREG: int                # Subscription deregistration
EVENT_OBJCHANGE: int            # Object change
EVENT_QUERYCHANGE: int          # Query result change
EVENT_AQ: int                   # Advanced Queuing

# Operation Codes (for subscribe(operations=...) and the operation attributes of message objects)
OPCODE_ALLOPS: int              # All operations
OPCODE_ALLROWS: int             # All rows
OPCODE_INSERT: int              # Insert operations
OPCODE_UPDATE: int              # Update operations
OPCODE_DELETE: int              # Delete operations
OPCODE_ALTER: int               # Alter operations
OPCODE_DROP: int                # Drop operations

Usage Examples

Basic Database Change Notifications

import oracledb
import time

def notification_callback(message):
    """Print a summary of a database change notification.

    Writes one header line for the message, one line per changed table and
    one line per changed row of that table.

    Parameters:
    - message: Notification message; its type, dbname and tables attributes
      are read, plus name/operation/rows of each table and rowid/operation
      of each row
    """
    event_type, database = message.type, message.dbname
    print(f"Received notification: Type={event_type}, DB={database}")

    for changed_table in message.tables:
        table_name, table_op = changed_table.name, changed_table.operation
        print(f"  Table: {table_name}, Operation: {table_op}")
        for changed_row in changed_table.rows:
            rowid, row_op = changed_row.rowid, changed_row.operation
            print(f"    Row: {rowid}, Operation: {row_op}")

# Subscriptions are only available in python-oracledb Thick mode, so load the
# Oracle Client libraries before connecting (pass lib_dir=... if they are not
# on the default library search path).
oracledb.init_oracle_client()

# Connect to database; events=True is required on connections that create
# subscriptions
connection = oracledb.connect(
    user="hr", password="password", dsn="localhost/xepdb1", events=True
)

try:
    # Create subscription for database changes
    subscription = connection.subscribe(
        namespace=oracledb.SUBSCR_NAMESPACE_DBCHANGE,
        protocol=oracledb.SUBSCR_PROTO_CALLBACK,
        callback=notification_callback,
        timeout=300,  # 5 minutes
        operations=oracledb.OPCODE_ALLOPS,
        qos=oracledb.SUBSCR_QOS_ROWIDS
    )

    try:
        print(f"Created subscription with ID: {subscription.id}")

        # Register queries for change notification. Registration has to go
        # through the subscription itself: running the query on an ordinary
        # cursor does not associate it with the subscription.

        # Register interest in employees table changes
        subscription.registerquery(
            "SELECT employee_id, first_name, last_name FROM employees WHERE department_id = 10"
        )

        # Register interest in departments table
        subscription.registerquery(
            "SELECT department_id, department_name FROM departments"
        )

        print("Subscriptions registered. Making changes to trigger notifications...")

        # Make changes to trigger notifications (sent when each change commits)
        with connection.cursor() as cursor:
            cursor.execute("""
                UPDATE employees SET salary = salary * 1.1 WHERE department_id = 10
            """)
            connection.commit()

            cursor.execute("""
                INSERT INTO employees (employee_id, first_name, last_name, department_id)
                VALUES (9999, 'Test', 'Employee', 10)
            """)
            connection.commit()

        # Wait for notifications; the callback runs while this thread sleeps
        print("Waiting for notifications...")
        time.sleep(10)
    finally:
        # Remove the registration explicitly instead of waiting for the timeout
        connection.unsubscribe(subscription)
finally:
    # Clean up even if subscribing or the DML above failed
    connection.close()

Query Change Notifications

import oracledb
import threading
import time

# Global flag to control notification processing: the callback ignores
# notifications and the notification thread leaves its wait loop once this
# is set to False at the end of the script.
processing_notifications = True

def query_change_callback(message):
    """Print the details of a query change notification.

    Does nothing once the module-level processing_notifications flag has
    been cleared. Otherwise prints the event type and database, then each
    changed query with the name, operation and row count of every table it
    touched.

    Parameters:
    - message: Notification message; its type, dbname and queries
      attributes are read
    """
    # The flag is only read here, so no `global` declaration is needed.
    if not processing_notifications:
        return

    print("Query change notification received:")
    print(f"  Event type: {message.type}")
    print(f"  Database: {message.dbname}")

    for changed_query in message.queries:
        print(f"  Query ID: {changed_query.id}")
        print(f"  Operation: {changed_query.operation}")

        for affected in changed_query.tables:
            print(f"    Affected table: {affected.name}")
            print(f"    Table operation: {affected.operation}")
            row_count = len(affected.rows)
            print(f"    Affected rows: {row_count}")

def notification_thread():
    """Subscribe to query result changes and keep the subscription alive.

    Intended to run in its own thread. Opens a dedicated connection, creates
    a query-change subscription whose callback is query_change_callback,
    registers the monitored queries and then idles until the module-level
    processing_notifications flag is cleared. The subscription is removed
    and the connection closed on exit, including on error.

    python-oracledb must already be in Thick mode when this runs, since
    subscriptions are not available in Thin mode.
    """
    # events=True is required on connections that create subscriptions
    connection = oracledb.connect(
        user="hr", password="password", dsn="localhost/xepdb1", events=True
    )

    try:
        # Create subscription for query changes
        subscription = connection.subscribe(
            namespace=oracledb.SUBSCR_NAMESPACE_DBCHANGE,
            protocol=oracledb.SUBSCR_PROTO_CALLBACK,
            callback=query_change_callback,
            timeout=0,  # No timeout
            qos=oracledb.SUBSCR_QOS_QUERY | oracledb.SUBSCR_QOS_RELIABLE
        )

        try:
            print(f"Query subscription created: {subscription.id}")

            # Register queries to monitor. This must be done through the
            # subscription: executing a query on an ordinary cursor does not
            # register it for notifications.

            # Monitor high-salary employees
            subscription.registerquery("""
                SELECT employee_id, first_name, last_name, salary
                FROM employees
                WHERE salary > 50000
            """)

            # Monitor recent hires
            subscription.registerquery("""
                SELECT employee_id, first_name, hire_date
                FROM employees
                WHERE hire_date >= SYSDATE - 30
            """)

            print("Query monitoring active. Waiting for changes...")

            # Keep connection alive for notifications
            while processing_notifications:
                time.sleep(1)
        finally:
            # A timeout of 0 never expires, so remove the registration explicitly
            connection.unsubscribe(subscription)

    finally:
        connection.close()

# Subscriptions are only available in python-oracledb Thick mode; enable it
# once, before any connection is created in this process (pass lib_dir=... if
# the Oracle Client libraries are not on the default library search path).
oracledb.init_oracle_client()

# Start notification thread. The Thread object gets its own name so that it
# does not replace the notification_thread function it runs.
listener = threading.Thread(target=notification_thread, daemon=True)
listener.start()

# Give subscription time to initialize
time.sleep(2)

# Main thread: make changes to trigger notifications
main_connection = oracledb.connect(user="hr", password="password", dsn="localhost/xepdb1")

try:
    with main_connection.cursor() as cursor:
        print("Making changes to trigger query notifications...")

        # Change that affects high-salary query
        cursor.execute("""
            UPDATE employees SET salary = 55000
            WHERE employee_id = (SELECT MIN(employee_id) FROM employees WHERE salary < 50000)
        """)
        main_connection.commit()

        time.sleep(2)

        # Insert new employee (affects recent hires query)
        cursor.execute("""
            INSERT INTO employees (employee_id, first_name, last_name, hire_date, salary, department_id)
            VALUES (8888, 'Recent', 'Hire', SYSDATE, 45000, 10)
        """)
        main_connection.commit()

    # Wait for notifications
    print("Waiting for notifications...")
    time.sleep(5)
finally:
    # Clean up: stop the listener loop, then give it a moment to close its
    # own connection before the interpreter exits and kills the daemon thread.
    processing_notifications = False
    main_connection.close()
    listener.join(timeout=5)

Advanced Queuing (AQ) Notifications

import oracledb
import time

def aq_notification_callback(message):
    """Print the details of an Advanced Queuing notification.

    Parameters:
    - message: Notification message; its type, queue_name, consumer_name
      and dbname attributes are read, in that order
    """
    print("AQ Notification received:")
    event_type = message.type
    print(f"  Event type: {event_type}")
    queue = message.queue_name
    print(f"  Queue: {queue}")
    consumer = message.consumer_name
    print(f"  Consumer: {consumer}")
    database = message.dbname
    print(f"  Database: {database}")

# Subscriptions are only available in python-oracledb Thick mode, so load the
# Oracle Client libraries before connecting (pass lib_dir=... if they are not
# on the default library search path).
oracledb.init_oracle_client()

# Connect to database; events=True is required on connections that create
# subscriptions
connection = oracledb.connect(
    user="hr", password="password", dsn="localhost/xepdb1", events=True
)

# DBMS_AQADM reports an existing queue table with ORA-24001 and an existing
# queue with ORA-24006. ORA-00955 is still accepted so that anything the
# previous check tolerated keeps being tolerated.
ALREADY_EXISTS_ERRORS = ("ORA-24001", "ORA-24006", "ORA-00955")

# Each step runs and is checked on its own, so that an existing queue table
# does not prevent the queue from being created and started.
aq_setup_steps = [
    # Create queue table
    """
        BEGIN
            DBMS_AQADM.CREATE_QUEUE_TABLE(
                queue_table => 'my_queue_table',
                queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE'
            );
        END;
    """,
    # Create queue
    """
        BEGIN
            DBMS_AQADM.CREATE_QUEUE(
                queue_name => 'my_notification_queue',
                queue_table => 'my_queue_table'
            );
        END;
    """,
    # Start queue
    """
        BEGIN
            DBMS_AQADM.START_QUEUE('my_notification_queue');
        END;
    """,
]

try:
    # Set up Advanced Queuing infrastructure
    with connection.cursor() as cursor:
        for setup_step in aq_setup_steps:
            try:
                cursor.execute(setup_step)
            except oracledb.DatabaseError as e:
                if any(code in str(e) for code in ALREADY_EXISTS_ERRORS):
                    print("Queue infrastructure already exists")
                else:
                    raise
        connection.commit()

    # Create AQ subscription; for the AQ namespace, name is the queue to watch
    subscription = connection.subscribe(
        namespace=oracledb.SUBSCR_NAMESPACE_AQ,
        protocol=oracledb.SUBSCR_PROTO_CALLBACK,
        callback=aq_notification_callback,
        name="my_notification_queue",
        timeout=300
    )

    try:
        print(f"AQ subscription created: {subscription.id}")

        # Enqueue messages to trigger notifications
        with connection.cursor() as cursor:
            # Enqueue a message
            cursor.execute("""
                DECLARE
                    enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
                    message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
                    message_handle     RAW(16);
                    message            SYS.AQ$_JMS_TEXT_MESSAGE;
                BEGIN
                    message := SYS.AQ$_JMS_TEXT_MESSAGE.construct;
                    message.set_text('Hello from AQ notification test!');

                    DBMS_AQ.ENQUEUE(
                        queue_name => 'my_notification_queue',
                        enqueue_options => enqueue_options,
                        message_properties => message_properties,
                        payload => message,
                        msgid => message_handle
                    );
                END;
            """)

            # The message only becomes visible (and notifies) once committed
            connection.commit()

        print("Message enqueued. Waiting for notification...")
        time.sleep(5)
    finally:
        # Remove the registration explicitly instead of waiting for the timeout
        connection.unsubscribe(subscription)
finally:
    connection.close()

Subscription Management

import oracledb
import time

class SubscriptionManager:
    """Manage multiple database subscriptions created on one connection.

    The connection must be able to create subscriptions (python-oracledb
    Thick mode, connected with events=True). Every subscription created
    through the manager is kept in ``subscriptions`` until cleanup() is
    called.

    Attributes:
    - connection: Connection used to create and remove subscriptions
    - subscriptions (dict): Subscription objects keyed by table name, or by
      "query_<subscription id>" for query subscriptions
    - active (bool): True until cleanup() has been called
    """

    def __init__(self, connection):
        self.connection = connection
        self.subscriptions = {}
        self.active = True

    @staticmethod
    def _validate_table_name(table_name):
        """Raise ValueError unless table_name is TABLE or OWNER.TABLE."""
        import re

        # Table names cannot be passed as bind variables, so the name is
        # checked before it is placed in the SQL text. Each part is either
        # a plain identifier or a double-quoted one.
        part = r'(?:[A-Za-z][A-Za-z0-9_$#]*|"[^"]+")'
        if not isinstance(table_name, str) or not re.fullmatch(
            rf"{part}(?:\.{part})?", table_name
        ):
            raise ValueError(f"Invalid table name: {table_name!r}")

    @staticmethod
    def _table_matches(notified_name, wanted_name):
        """Return True if a notified table name refers to the wanted table.

        Notifications report owner-qualified names such as HR.EMPLOYEES, so
        an unqualified wanted name is compared with the table part only.
        """
        notified = notified_name.replace('"', "").upper()
        wanted = wanted_name.replace('"', "").upper()
        if "." in wanted:
            return notified == wanted
        return notified.rpartition(".")[2] == wanted

    def _register(self, subscription, statement):
        """Register statement on subscription; remove the subscription on failure."""
        try:
            subscription.registerquery(statement)
        except Exception:
            # Do not leave a half-configured subscription registered
            self.connection.unsubscribe(subscription)
            raise

    def create_table_subscription(self, table_name, callback):
        """Create subscription for specific table changes.

        Parameters:
        - table_name (str): Table to watch, as TABLE or OWNER.TABLE
        - callback (callable): Called as callback(message, table) for each
          entry of a notification that refers to table_name

        Returns:
        The id of the new subscription

        Raises:
        ValueError if table_name is not a valid table identifier
        """
        self._validate_table_name(table_name)

        def table_callback(message):
            # Filter messages for specific table
            for table in message.tables:
                if self._table_matches(table.name, table_name):
                    callback(message, table)

        subscription = self.connection.subscribe(
            namespace=oracledb.SUBSCR_NAMESPACE_DBCHANGE,
            protocol=oracledb.SUBSCR_PROTO_CALLBACK,
            callback=table_callback,
            timeout=0,
            operations=oracledb.OPCODE_ALLOPS,
            qos=oracledb.SUBSCR_QOS_ROWIDS | oracledb.SUBSCR_QOS_RELIABLE
        )

        # Register interest in table. The query has to be registered on the
        # subscription; running it on an ordinary cursor registers nothing.
        self._register(
            subscription, f"SELECT * FROM {table_name} WHERE ROWNUM <= 1"
        )

        self.subscriptions[table_name] = subscription
        return subscription.id

    def create_query_subscription(self, query, callback):
        """Create subscription for query result changes.

        Parameters:
        - query (str): SELECT statement whose result set is monitored
        - callback (callable): Called as callback(message) for each
          notification

        Returns:
        The id of the new subscription
        """
        subscription = self.connection.subscribe(
            namespace=oracledb.SUBSCR_NAMESPACE_DBCHANGE,
            protocol=oracledb.SUBSCR_PROTO_CALLBACK,
            callback=callback,
            timeout=0,
            qos=oracledb.SUBSCR_QOS_QUERY | oracledb.SUBSCR_QOS_RELIABLE
        )

        # Register query on the subscription itself
        self._register(subscription, query)

        query_id = f"query_{subscription.id}"
        self.subscriptions[query_id] = subscription
        return subscription.id

    def cleanup(self):
        """Clean up all subscriptions.

        Marks the manager inactive and unsubscribes every subscription it
        created. The subscriptions use timeout=0 and so never expire by
        themselves. If an unsubscribe call raises, the error propagates and
        cleanup() can be called again for the remaining subscriptions.
        """
        self.active = False
        while self.subscriptions:
            _, subscription = self.subscriptions.popitem()
            self.connection.unsubscribe(subscription)

# Usage example
def employee_change_handler(message, table):
    """Report a change to the employees table.

    Parameters:
    - message: Notification message the change arrived in (not used)
    - table: Changed table entry; its operation and rows attributes are read
    """
    operation = table.operation
    print(f"Employee table changed: {operation}")
    row_count = len(table.rows)
    print(f"  Affected rows: {row_count}")

def high_salary_query_handler(message):
    """Report which registered queries changed and how many tables each touched.

    Parameters:
    - message: Notification message; its queries attribute is read, plus the
      id and tables of each query
    """
    print("High salary employees query results changed")
    for changed_query in message.queries:
        query_id = changed_query.id
        table_count = len(changed_query.tables)
        print(f"  Query {query_id} affected {table_count} tables")

# Subscriptions are only available in python-oracledb Thick mode, so load the
# Oracle Client libraries before connecting (pass lib_dir=... if they are not
# on the default library search path).
oracledb.init_oracle_client()

# Set up subscription manager; events=True is required on connections that
# create subscriptions
connection = oracledb.connect(
    user="hr", password="password", dsn="localhost/xepdb1", events=True
)
manager = SubscriptionManager(connection)

try:
    # Create subscriptions
    emp_sub_id = manager.create_table_subscription("employees", employee_change_handler)
    query_sub_id = manager.create_query_subscription(
        "SELECT * FROM employees WHERE salary > 75000",
        high_salary_query_handler
    )

    print(f"Created subscriptions: Employee table={emp_sub_id}, High salary query={query_sub_id}")

    # Make changes to trigger notifications
    with connection.cursor() as cursor:
        cursor.execute("UPDATE employees SET salary = 80000 WHERE employee_id = 100")
        connection.commit()

        cursor.execute("INSERT INTO employees (employee_id, first_name, last_name, salary, department_id) VALUES (7777, 'High', 'Earner', 85000, 10)")
        connection.commit()

    print("Changes made. Waiting for notifications...")
    time.sleep(5)
finally:
    # Cleanup runs even when a subscription or statement above fails
    try:
        manager.cleanup()
    finally:
        connection.close()

Subscription Error Handling

import oracledb
import time

def robust_notification_callback(message):
    """Print a notification's contents without ever raising.

    Prints the message type, then for each changed table its name and
    whether its operation is an insert, update or delete. For those
    operations the rowids of at most the first ten rows are printed.
    Finally the id of each changed query is printed. Any exception is
    caught and printed so that a bad message cannot break notification
    delivery.

    Parameters:
    - message: Notification message; its type, tables and queries
      attributes are read
    """
    try:
        print(f"Processing notification: {message.type}")

        # Process tables
        for changed_table in message.tables:
            print(f"  Table: {changed_table.name}")

            # Validate table operations
            operation = changed_table.operation
            row_level_ops = (
                oracledb.OPCODE_INSERT,
                oracledb.OPCODE_UPDATE,
                oracledb.OPCODE_DELETE,
            )
            if operation not in row_level_ops:
                print(f"    Unexpected operation: {operation}")
                continue

            print(f"    Valid operation: {operation}")

            # Process rows safely
            for changed_row in changed_table.rows[:10]:  # Limit processing to avoid overload
                rowid = getattr(changed_row, 'rowid', None)
                if rowid:
                    print(f"      Row: {rowid}")

        # Process queries
        for changed_query in message.queries:
            print(f"  Query ID: {changed_query.id}")

    except Exception as e:
        # Log error but don't raise to avoid breaking notification system
        print(f"Error in notification callback: {e}")

def create_resilient_subscription(connection, max_retries=3, retry_delay=2):
    """Create subscription with retry logic.

    Tries connection.subscribe() up to max_retries times, waiting
    retry_delay seconds after each failed attempt except the last.

    Parameters:
    - connection: Connection on which to create the subscription
    - max_retries (int): Maximum number of attempts; must be at least 1
    - retry_delay (int or float): Seconds to wait between attempts

    Returns:
    Subscription object

    Raises:
    - ValueError: if max_retries is less than 1 (previously the function
      made no attempt and returned None)
    - oracledb.DatabaseError: the error from the last attempt, if every
      attempt failed
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    for attempt in range(1, max_retries + 1):
        try:
            subscription = connection.subscribe(
                namespace=oracledb.SUBSCR_NAMESPACE_DBCHANGE,
                protocol=oracledb.SUBSCR_PROTO_CALLBACK,
                callback=robust_notification_callback,
                timeout=300,
                operations=oracledb.OPCODE_ALLOPS,
                qos=oracledb.SUBSCR_QOS_RELIABLE
            )
        except oracledb.DatabaseError as e:
            print(f"Subscription attempt {attempt} failed: {e}")
            if attempt == max_retries:
                raise
            time.sleep(retry_delay)  # Wait before retry
        else:
            print(f"Subscription created successfully on attempt {attempt}")
            return subscription

# Subscriptions are only available in python-oracledb Thick mode, so load the
# Oracle Client libraries before connecting (pass lib_dir=... if they are not
# on the default library search path).
oracledb.init_oracle_client()

# Create resilient subscription; events=True is required on connections that
# create subscriptions
connection = oracledb.connect(
    user="hr", password="password", dsn="localhost/xepdb1", events=True
)
subscription = None

try:
    subscription = create_resilient_subscription(connection)

    # Register queries with error handling. Registration goes through the
    # subscription; executing the query on an ordinary cursor registers nothing.
    try:
        subscription.registerquery("SELECT * FROM employees WHERE department_id <= 50")
        print("Query registered successfully")
    except oracledb.DatabaseError as e:
        print(f"Query registration failed: {e}")

    # Test notifications
    with connection.cursor() as cursor:
        cursor.execute("UPDATE employees SET salary = salary + 1 WHERE ROWNUM <= 1")
        connection.commit()

    print("Waiting for notifications...")
    time.sleep(5)

except oracledb.DatabaseError as e:
    print(f"Subscription setup failed: {e}")

finally:
    try:
        # Only unsubscribe if the subscription was actually created
        if subscription is not None:
            connection.unsubscribe(subscription)
    finally:
        connection.close()

Install with Tessl CLI

npx tessl i tessl/pypi-oracledb

docs

advanced-queuing.md

connection-pooling.md

connectivity.md

data-types.md

database-objects.md

index.md

lobs.md

pipeline.md

soda.md

sql-execution.md

subscriptions.md

tile.json