CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-asyncpg

An asyncio PostgreSQL driver for high-performance database connectivity with Python async/await syntax

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/listeners-notifications.md

Listeners and Notifications

Support for PostgreSQL's LISTEN/NOTIFY functionality, server log message handling, query logging, and connection termination callbacks for real-time event processing and monitoring.

Capabilities

PostgreSQL LISTEN/NOTIFY

Asynchronous messaging system for real-time notifications between database sessions.

async def add_listener(self, channel: str, callback: callable) -> None:
    """
    Add a listener for PostgreSQL notifications on the specified channel.

    Parameters:
    channel: Channel name to listen on
    callback: Callable or coroutine function invoked for every notification
        with four positional arguments: (connection, pid, channel, payload).
        `connection` is the connection the listener is registered on and
        `pid` is the process ID of the backend that sent the notification.
    """

async def remove_listener(self, channel: str, callback: callable) -> None:
    """
    Remove a notification listener from the specified channel.

    Parameters:
    channel: Channel name the callback was registered on
    callback: The same callable that was passed to add_listener for this
        channel
    """

Example Usage

import asyncio

# Notification callback
async def order_notification_handler(connection, pid, channel, payload):
    """Handle order notifications delivered on a LISTEN channel.

    asyncpg calls notification listeners with four positional arguments,
    so the handler must accept all of them even if it only uses two.

    Parameters:
    connection: Connection the listener is registered on
    pid: Process ID of the PostgreSQL backend that sent the notification
    channel: Name of the channel the notification arrived on
    payload: Notification payload; expected to be a JSON object string
    """
    import json

    print(f"Received notification on {channel}: {payload}")

    # Payloads are free-form text, so a malformed one must not kill the handler
    try:
        data = json.loads(payload)
    except json.JSONDecodeError as exc:
        print(f"Ignoring malformed payload on {channel}: {exc}")
        return

    # A trigger that publishes TG_OP as the action sends the SQL operation
    # name ('INSERT' / 'UPDATE'); the application-level names are kept too
    action = data.get('action')
    if action in ('INSERT', 'new_order'):
        print(f"New order #{data['order_id']} from customer {data['customer_id']}")
        # Process new order...
    elif action in ('UPDATE', 'order_updated'):
        print(f"Order #{data['order_id']} status changed to {data['status']}")
        # Update order status...

# Set up listener
# `conn` is not created in this snippet — presumably an already-open asyncpg
# connection (TODO confirm against the surrounding setup code).
await conn.add_listener('order_events', order_notification_handler)

# PostgreSQL trigger that sends notifications
# NOTE(review): 'action' is filled from TG_OP, which is the SQL operation
# name (e.g. 'INSERT' / 'UPDATE'), so listeners must compare against those.
# NOTE(review): the function uses CREATE OR REPLACE but the trigger does not,
# so presumably this statement is meant to run once (e.g. in a migration)
# rather than on every start-up — confirm.
await conn.execute("""
    CREATE OR REPLACE FUNCTION notify_order_events()
    RETURNS trigger AS $$
    BEGIN
        PERFORM pg_notify(
            'order_events',
            json_build_object(
                'action', TG_OP,
                'order_id', NEW.id,
                'customer_id', NEW.customer_id,
                'status', NEW.status,
                'timestamp', extract(epoch from now())
            )::text
        );
        RETURN NEW;
    END;
    $$ LANGUAGE plpgsql;
    
    CREATE TRIGGER order_events_trigger
        AFTER INSERT OR UPDATE ON orders
        FOR EACH ROW EXECUTE FUNCTION notify_order_events();
""")

# Keep connection alive to receive notifications
try:
    while True:
        await asyncio.sleep(1)
except KeyboardInterrupt:
    await conn.remove_listener('order_events', order_notification_handler)

Multiple Listeners and Channels

# Multiple handlers for same channel
async def email_notification_handler(connection, pid, channel, payload):
    """Send an email for every notification received.

    Parameters:
    connection: Connection the listener is registered on (unused)
    pid: Process ID of the notifying backend (unused)
    channel: Channel the notification arrived on (unused)
    payload: JSON-encoded notification payload
    """
    data = json.loads(payload)
    await send_email_notification(data)

async def sms_notification_handler(connection, pid, channel, payload):
    """Send an SMS for high-priority notifications only.

    Parameters:
    connection: Connection the listener is registered on (unused)
    pid: Process ID of the notifying backend (unused)
    channel: Channel the notification arrived on (unused)
    payload: JSON-encoded notification payload; only payloads whose
        'priority' field equals 'high' trigger an SMS
    """
    data = json.loads(payload)
    if data.get('priority') == 'high':
        await send_sms_notification(data)

# Add multiple listeners to same channel
# Both callbacks are registered on the same channel name
await conn.add_listener('user_events', email_notification_handler)
await conn.add_listener('user_events', sms_notification_handler)

# Listen on multiple channels
# NOTE(review): general_event_handler is not defined in this snippet —
# presumably supplied by the application; confirm.
channels = ['order_events', 'user_events', 'inventory_events', 'payment_events']
for channel in channels:
    await conn.add_listener(channel, general_event_handler)

# Event dispatcher pattern
event_handlers = {
    'order_events': [process_order_event, log_order_event],
    'user_events': [update_user_cache, send_welcome_email],
    'inventory_events': [update_stock_levels, reorder_check]
}

async def dispatch_event(channel, payload):
    """Dispatch events to registered handlers."""
    handlers = event_handlers.get(channel, [])
    for handler in handlers:
        try:
            await handler(json.loads(payload))
        except Exception as e:
            print(f"Error in handler {handler.__name__}: {e}")

for channel in event_handlers:
    await conn.add_listener(channel, dispatch_event)

Server Log Message Listeners

Monitor PostgreSQL server log messages for debugging, monitoring, and alerting.

def add_log_listener(self, callback: callable) -> None:
    """
    Add a listener for PostgreSQL log messages.

    Parameters:
    callback: Callable or coroutine function invoked for each server log
        message with two positional arguments: (connection, message), where
        `message` is the PostgresLogMessage that was received
    """

def remove_log_listener(self, callback: callable) -> None:
    """
    Remove a log message listener.

    Parameters:
    callback: The same callable that was passed to add_log_listener
    """

Example Usage

# Log message handler
def log_message_handler(connection, log_message):
    """Print a PostgreSQL log message and alert on severe ones.

    asyncpg calls log listeners with the connection first and the message
    second.

    Parameters:
    connection: Connection the listener is registered on (unused)
    log_message: Message object exposing severity, message, detail and hint
    """
    print(f"PostgreSQL Log [{log_message.severity}]: {log_message.message}")

    # detail and hint are optional; getattr covers both "missing" and "None"
    detail = getattr(log_message, 'detail', None)
    if detail:
        print(f"Detail: {detail}")

    hint = getattr(log_message, 'hint', None)
    if hint:
        print(f"Hint: {hint}")

    # Alert on warnings and errors
    if log_message.severity in ('WARNING', 'ERROR', 'FATAL'):
        send_alert(f"PostgreSQL {log_message.severity}: {log_message.message}")

# Add log listener
# Registration is a plain call here (no await), unlike add_listener
conn.add_log_listener(log_message_handler)

# Advanced log processing
class LogProcessor:
    """Count and route PostgreSQL log messages by severity.

    Attributes:
    error_count: Number of messages seen with severity 'ERROR'
    warning_count: Number of messages seen with severity 'WARNING'
    """

    def __init__(self):
        self.error_count = 0
        self.warning_count = 0

    def process_log(self, connection, log_message):
        """Categorize one log message and forward it to the matching handler.

        The (connection, message) signature is what asyncpg passes to log
        listeners, so the bound method can be registered directly.

        Parameters:
        connection: Connection the listener is registered on (unused)
        log_message: Message object exposing severity and message
        """
        severity = log_message.severity

        if severity == 'ERROR':
            self.error_count += 1
            self.handle_error(log_message)
        elif severity == 'WARNING':
            self.warning_count += 1
            self.handle_warning(log_message)
        elif severity == 'NOTICE':
            self.handle_notice(log_message)
        # Any other severity is deliberately ignored

    def handle_error(self, log_message):
        """Handle error log messages."""
        print(f"ERROR: {log_message.message}")
        # Send to error tracking system

    def handle_warning(self, log_message):
        """Handle warning log messages."""
        print(f"WARNING: {log_message.message}")
        # Log to monitoring system

    def handle_notice(self, log_message):
        """Handle notice log messages."""
        print(f"NOTICE: {log_message.message}")

# The bound method keeps a reference to `processor`, so its counters
# accumulate across every message delivered to this listener
processor = LogProcessor()
conn.add_log_listener(processor.process_log)

Connection Termination Listeners

Monitor connection lifecycle events and handle cleanup operations.

def add_termination_listener(self, callback: callable) -> None:
    """
    Add a listener that will be called when the connection is terminated.

    Parameters:
    callback: Callable or coroutine function invoked on termination with one
        positional argument: the connection the listener is registered on
    """

def remove_termination_listener(self, callback: callable) -> None:
    """
    Remove a connection termination listener.

    Parameters:
    callback: The same callable that was passed to add_termination_listener
    """

Example Usage

# Termination handler
# Strong references to in-flight reconnect tasks. The event loop only keeps
# weak references to tasks, so an unreferenced task may be garbage-collected
# before it finishes.
_reconnect_tasks = set()

def connection_terminated_handler(connection):
    """Clean up and schedule a reconnect when the connection terminates.

    asyncpg passes the terminated connection as the only argument.

    Parameters:
    connection: Connection that was terminated (unused)
    """
    print("Database connection terminated")

    # Cleanup operations
    cleanup_resources()
    notify_monitoring_system("db_connection_lost")

    # Attempt reconnection in the background; keep the task alive until done
    task = asyncio.create_task(reconnect_database())
    _reconnect_tasks.add(task)
    task.add_done_callback(_reconnect_tasks.discard)

# Add termination listener
# Registration is a plain call (no await)
conn.add_termination_listener(connection_terminated_handler)

# Connection monitor with reconnection
class ConnectionMonitor:
    """Hold one asyncpg connection and reconnect with backoff when it drops.

    Parameters:
    dsn: Connection string passed to asyncpg.connect()

    Attributes:
    connection: The current connection, or None before the first connect
    reconnect_attempts: Consecutive failed attempts since the last success
    max_reconnect_attempts: Attempts allowed before giving up
    """

    def __init__(self, dsn):
        self.dsn = dsn
        self.connection = None
        self.reconnect_attempts = 0
        self.max_reconnect_attempts = 5
        # Strong reference to the running reconnect task: the event loop only
        # holds weak references, and it also lets us avoid starting two
        # reconnect loops at once
        self._reconnect_task = None

    async def setup_connection(self):
        """Setup connection with termination monitoring."""
        self.connection = await asyncpg.connect(self.dsn)
        self.connection.add_termination_listener(self.on_connection_lost)
        # Only reached after a successful connect, so the budget resets here
        self.reconnect_attempts = 0

    def on_connection_lost(self, connection):
        """Handle connection loss and initiate reconnection.

        Parameters:
        connection: The terminated connection, as passed by asyncpg (unused)
        """
        print("Connection lost, attempting to reconnect...")
        if self._reconnect_task is None or self._reconnect_task.done():
            self._reconnect_task = asyncio.create_task(self.reconnect())

    async def reconnect(self):
        """Retry setup_connection with exponential backoff until it succeeds
        or max_reconnect_attempts is reached."""
        # A loop replaces the original "spawn a new task per retry" recursion:
        # same attempts and messages, but one task to track and cancel
        while self.reconnect_attempts < self.max_reconnect_attempts:
            self.reconnect_attempts += 1
            await asyncio.sleep(2 ** self.reconnect_attempts)  # Exponential backoff

            try:
                await self.setup_connection()
            except Exception as e:
                # Retry boundary: any connect failure just consumes an attempt
                print(f"Reconnection attempt {self.reconnect_attempts} failed: {e}")
            else:
                print("Successfully reconnected to database")
                return

        print("Max reconnection attempts exceeded")

# NOTE(review): the DSN embeds placeholder credentials for illustration —
# presumably real code loads them from configuration; confirm.
monitor = ConnectionMonitor('postgresql://user:pass@localhost/db')
await monitor.setup_connection()

Query Logging

Monitor and log SQL query execution for debugging, performance analysis, and auditing.

def add_query_logger(self, callback: callable) -> None:
    """
    Add a logger that will be called when queries are executed.

    Parameters:
    callback: Callable or coroutine function invoked with one positional
        argument: a LoggedQuery record with the fields query, args, timeout,
        elapsed, exception, conn_addr and conn_params
    """

def remove_query_logger(self, callback: callable) -> None:
    """
    Remove a query logger callback.

    Parameters:
    callback: The same callable that was passed to add_query_logger
    """

def query_logger(self, callback: callable):
    """
    Context manager that temporarily adds a query logger.

    Parameters:
    callback: Callable or coroutine function invoked with one positional
        argument: the LoggedQuery record of the executed query

    Returns:
    Context manager; the logger is active only inside the `with` block
    """

Example Usage

import time

# Basic query logger
def simple_query_logger(record):
    """Log every query with its execution time.

    asyncpg passes query loggers a single LoggedQuery record rather than
    separate (query, args, duration) arguments.

    Parameters:
    record: Logged query exposing query, args and elapsed (seconds)
    """
    print(f"Query executed in {record.elapsed:.3f}s: {record.query}")
    if record.args:
        print(f"  Args: {record.args}")

# Add permanent query logger
# NOTE(review): no matching remove_query_logger call appears in this snippet,
# so presumably the logger stays registered for the connection's lifetime
conn.add_query_logger(simple_query_logger)

# Advanced query logger with filtering and metrics
class QueryAnalyzer:
    """Collect per-statement-type timing statistics and track slow queries.

    Attributes:
    total_queries: Number of queries logged so far
    slow_queries: One dict per query that took longer than one second
    query_stats: Maps the first keyword of the query (upper-cased) to a dict
        with 'count' and 'total_time'
    """

    def __init__(self):
        self.total_queries = 0
        self.slow_queries = []
        self.query_stats = {}

    def log_query(self, record):
        """Log and analyze query performance.

        asyncpg passes query loggers a single LoggedQuery record.

        Parameters:
        record: Logged query exposing query, args and elapsed (seconds)
        """
        query = record.query
        duration = record.elapsed
        self.total_queries += 1

        # Track slow queries (> 1 second)
        if duration > 1.0:
            self.slow_queries.append({
                'query': query,
                'args': record.args,
                'duration': duration,
                'timestamp': time.time()
            })
            print(f"SLOW QUERY ({duration:.3f}s): {query}")

        # Bucket by the leading SQL keyword; blank query text would otherwise
        # raise IndexError on the [0] lookup
        words = query.split()
        query_type = words[0].upper() if words else 'UNKNOWN'

        bucket = self.query_stats.setdefault(
            query_type, {'count': 0, 'total_time': 0})
        bucket['count'] += 1
        bucket['total_time'] += duration

    def get_stats(self):
        """Get query execution statistics.

        Returns:
        Dict with 'total_queries', 'slow_queries_count' and 'by_type', where
        'by_type' maps each statement type to count, avg_duration and
        total_time.
        """
        return {
            'total_queries': self.total_queries,
            'slow_queries_count': len(self.slow_queries),
            'by_type': {
                # count is at least 1 for every bucket, so no division by zero
                query_type: {
                    'count': data['count'],
                    'avg_duration': data['total_time'] / data['count'],
                    'total_time': data['total_time'],
                }
                for query_type, data in self.query_stats.items()
            },
        }

# The bound method keeps a reference to `analyzer`, so statistics accumulate
# on that one instance and can be read back later via analyzer.get_stats()
analyzer = QueryAnalyzer()
conn.add_query_logger(analyzer.log_query)

# Temporary query logging with context manager
def debug_query_logger(record):
    """Print a detailed, multi-line trace of one executed query.

    asyncpg passes query loggers a single LoggedQuery record.

    Parameters:
    record: Logged query exposing query, args and elapsed (seconds)
    """
    print(f"DEBUG - Query: {record.query}")
    print(f"DEBUG - Args: {record.args}")
    print(f"DEBUG - Duration: {record.elapsed:.6f}s")
    print("DEBUG - " + "-" * 50)

# Use temporarily
# query_logger is used with a plain `with` (not `async with`) here.
# NOTE(review): `datetime` and `user_id` are not defined in this snippet —
# presumably provided by the surrounding application code; confirm.
with conn.query_logger(debug_query_logger):
    # All queries in this block will be logged
    users = await conn.fetch("SELECT * FROM users WHERE active = $1", True)
    await conn.execute("UPDATE users SET last_seen = $1 WHERE id = $2", 
                      datetime.now(), user_id)

Real-time Event Processing

Combine listeners for comprehensive real-time event processing systems.

class EventProcessor:
    """Funnel notifications, slow queries and connection loss into one queue.

    Listener callbacks only enqueue small event dicts; a single background
    task (process_events) consumes the queue and dispatches by event type.

    Parameters:
    connection: asyncpg connection to attach the listeners to

    Attributes:
    conn: The connection passed to the constructor
    event_queue: Unbounded asyncio.Queue of pending event dicts
    processing_task: The consumer task, or None until setup() has run
    """

    def __init__(self, connection):
        self.conn = connection
        self.event_queue = asyncio.Queue()
        self.processing_task = None

    async def setup(self):
        """Setup all listeners and start processing."""

        # Database notifications
        await self.conn.add_listener('app_events', self.handle_notification)

        # Query logging for audit trail
        self.conn.add_query_logger(self.log_query)

        # Connection monitoring
        self.conn.add_termination_listener(self.handle_connection_loss)

        # Start event processing task
        self.processing_task = asyncio.create_task(self.process_events())

    async def handle_notification(self, connection, pid, channel, payload):
        """Queue a PostgreSQL notification.

        Parameters:
        connection: Connection the listener is registered on (unused)
        pid: Process ID of the notifying backend (unused)
        channel: Channel the notification arrived on
        payload: JSON-encoded notification payload
        """
        event = {
            'type': 'notification',
            'channel': channel,
            'payload': json.loads(payload),
            'timestamp': time.time()
        }
        await self.event_queue.put(event)

    def log_query(self, record):
        """Queue slow queries (> 0.5 s) for the audit trail.

        Parameters:
        record: Logged query exposing query, args and elapsed (seconds)
        """
        if record.elapsed > 0.5:  # Only log slow queries
            # The queue is unbounded, so put_nowait cannot fail and no
            # throwaway task is needed just to enqueue
            self.event_queue.put_nowait({
                'type': 'slow_query',
                'query': record.query,
                'args': record.args,
                'duration': record.elapsed,
                'timestamp': time.time()
            })

    def handle_connection_loss(self, connection):
        """Queue a connection-lost event.

        Parameters:
        connection: The terminated connection, as passed by asyncpg (unused)
        """
        self.event_queue.put_nowait({
            'type': 'connection_lost',
            'timestamp': time.time()
        })

    async def process_events(self):
        """Consume the queue forever; runs until the task is cancelled."""
        while True:
            event = await self.event_queue.get()
            try:
                await self.dispatch_event(event)
            except Exception as e:
                # One bad event must not stop the consumer
                print(f"Error processing event: {e}")
            finally:
                # Always balance get() so event_queue.join() cannot hang
                self.event_queue.task_done()

    async def dispatch_event(self, event):
        """Dispatch events to appropriate handlers."""
        event_type = event['type']

        if event_type == 'notification':
            await self.process_notification(event)
        elif event_type == 'slow_query':
            await self.process_slow_query(event)
        elif event_type == 'connection_lost':
            await self.process_connection_loss(event)

    async def process_notification(self, event):
        """Route a notification by the 'entity' field of its payload."""
        payload = event['payload']

        if payload.get('entity') == 'order':
            await self.handle_order_event(payload)
        elif payload.get('entity') == 'user':
            await self.handle_user_event(payload)

    async def process_slow_query(self, event):
        """Report a slow query; override to forward it elsewhere."""
        print(f"Slow query ({event['duration']:.3f}s): {event['query']}")

    async def process_connection_loss(self, event):
        """Report a lost connection; override to add recovery logic."""
        print("Database connection lost")

    async def handle_order_event(self, payload):
        """Handle order-related events."""
        action = payload.get('action')
        order_id = payload.get('order_id')

        if action == 'created':
            print(f"Processing new order: {order_id}")
            # Send confirmation email, update inventory, etc.
        elif action == 'cancelled':
            print(f"Processing order cancellation: {order_id}")
            # Refund payment, restore inventory, etc.

    async def handle_user_event(self, payload):
        """Handle user-related events; override with real processing."""
        print(f"Processing user event: {payload.get('action')}")

# Usage
processor = EventProcessor(conn)
await processor.setup()

# Keep processing events
try:
    await asyncio.sleep(float('inf'))  # Run forever
except KeyboardInterrupt:
    print("Shutting down event processor...")
    if processor.processing_task:
        processor.processing_task.cancel()

Types

# Callback type signatures
# Callbacks may be plain callables or coroutine functions, so each alias
# allows either None or an awaitable as the return value.
_CallbackResult = typing.Union[None, typing.Awaitable[None]]

# (connection, pid, channel, payload)
NotificationCallback = typing.Callable[
    ['asyncpg.Connection', int, str, str], _CallbackResult]
# (connection, message)
LogMessageCallback = typing.Callable[
    ['asyncpg.Connection', 'asyncpg.PostgresLogMessage'], _CallbackResult]
# (connection,)
TerminationCallback = typing.Callable[['asyncpg.Connection'], _CallbackResult]
# (record,) — a LoggedQuery with query, args, timeout, elapsed, exception,
# conn_addr and conn_params
QueryLoggerCallback = typing.Callable[
    ['asyncpg.connection.LoggedQuery'], _CallbackResult]

# Log message attributes
class PostgresLogMessage:
    """PostgreSQL server log message.

    Summary of the attributes most often read by log listeners.
    NOTE(review): asyncpg's class presumably exposes further protocol fields
    (for example a non-localized severity) — confirm against the library.
    """
    severity: str          # Message severity
    message: str          # Primary message text
    # NOTE(review): the fields below are optional in the protocol and are
    # expected to be None when the server did not send them — confirm.
    detail: typing.Optional[str]           # Additional details
    hint: typing.Optional[str]             # Suggested action
    position: typing.Optional[str]         # Error position (if applicable)
    context: typing.Optional[str]          # Error context
    sqlstate: str         # SQLSTATE code (if applicable)

Install with Tessl CLI

npx tessl i tessl/pypi-asyncpg

docs

connection-management.md

connection-pooling.md

copy-operations.md

cursor-operations.md

exception-handling.md

index.md

listeners-notifications.md

prepared-statements.md

query-execution.md

transaction-management.md

type-system.md

tile.json