An asyncio-based PostgreSQL driver providing high-performance database connectivity through Python's async/await syntax.
—
Quality: pending — best-practices conformance has not yet been assessed.
Impact: pending — no eval scenarios have been run.
Support for PostgreSQL's LISTEN/NOTIFY functionality, server log message handling, query logging, and connection termination callbacks for real-time event processing and monitoring.
Asynchronous messaging system for real-time notifications between database sessions.
async def add_listener(self, channel: str, callback: callable) -> None:
    """Register *callback* to receive NOTIFY events on *channel*.

    Parameters:
        channel: Channel name to listen on.
        callback: Invoked as ``callback(channel, payload)`` whenever a
            notification arrives on the channel.
    """

async def remove_listener(self, channel: str, callback: callable) -> None:
    """Unregister a previously added notification *callback* from *channel*.

    Parameters:
        channel: Channel name.
        callback: The callback object to remove.
    """

import asyncio
# Notification callback
async def order_notification_handler(channel, payload):
    """Handle order-event notifications delivered via LISTEN/NOTIFY.

    Parameters:
        channel: Channel the notification arrived on.
        payload: Notification payload text; expected to be a JSON object
            with an ``action`` key (``new_order`` or ``order_updated``).
    """
    print(f"Received notification on {channel}: {payload}")
    # Import hoisted to the top of the function (it used to sit mid-body,
    # after the payload had already been echoed).
    import json

    # Payloads are producer-controlled text; tolerate malformed JSON
    # instead of letting the exception kill the listener callback.
    try:
        data = json.loads(payload)
    except (ValueError, TypeError) as exc:
        print(f"Ignoring malformed payload on {channel}: {exc}")
        return

    action = data.get('action')
    if action == 'new_order':
        print(f"New order #{data['order_id']} from customer {data['customer_id']}")
        # Process new order...
    elif action == 'order_updated':
        print(f"Order #{data['order_id']} status changed to {data['status']}")
        # Update order status...
# Set up listener
await conn.add_listener('order_events', order_notification_handler)
# PostgreSQL trigger that sends notifications
await conn.execute("""
CREATE OR REPLACE FUNCTION notify_order_events()
RETURNS trigger AS $$
BEGIN
PERFORM pg_notify(
'order_events',
json_build_object(
'action', TG_OP,
'order_id', NEW.id,
'customer_id', NEW.customer_id,
'status', NEW.status,
'timestamp', extract(epoch from now())
)::text
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER order_events_trigger
AFTER INSERT OR UPDATE ON orders
FOR EACH ROW EXECUTE FUNCTION notify_order_events();
""")
# Keep connection alive to receive notifications
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
await conn.remove_listener('order_events', order_notification_handler)# Multiple handlers for same channel
async def email_notification_handler(channel, payload):
    """Decode the notification payload and forward it to the e-mail pipeline."""
    await send_email_notification(json.loads(payload))
async def sms_notification_handler(channel, payload):
    """Send an SMS, but only for high-priority events."""
    data = json.loads(payload)
    if data.get('priority') != 'high':
        return
    await send_sms_notification(data)
# Add multiple listeners to same channel
await conn.add_listener('user_events', email_notification_handler)
await conn.add_listener('user_events', sms_notification_handler)
# Listen on multiple channels
channels = ['order_events', 'user_events', 'inventory_events', 'payment_events']
for channel in channels:
await conn.add_listener(channel, general_event_handler)
# Event dispatcher pattern
event_handlers = {
'order_events': [process_order_event, log_order_event],
'user_events': [update_user_cache, send_welcome_email],
'inventory_events': [update_stock_levels, reorder_check]
}
async def dispatch_event(channel, payload, handlers_map=None):
    """Dispatch a notification to every handler registered for *channel*.

    Parameters:
        channel: Channel the event arrived on.
        payload: JSON-encoded event payload.
        handlers_map: Optional ``{channel: [handler, ...]}`` mapping; when
            omitted, falls back to the module-level ``event_handlers``
            registry (preserving the original behavior).
    """
    registry = event_handlers if handlers_map is None else handlers_map
    handlers = registry.get(channel, [])
    if not handlers:
        return
    # Decode once, up front: the payload is identical for every handler, and
    # the original re-parsed it (and re-raised decode errors) per handler.
    try:
        data = json.loads(payload)
    except (ValueError, TypeError) as exc:
        print(f"Undecodable payload on {channel}: {exc}")
        return
    for handler in handlers:
        try:
            await handler(data)
        except Exception as e:
            # Isolate handler failures so one bad handler can't starve the rest.
            print(f"Error in handler {getattr(handler, '__name__', handler)}: {e}")
for channel in event_handlers:
await conn.add_listener(channel, dispatch_event)Monitor PostgreSQL server log messages for debugging, monitoring, and alerting.
def add_log_listener(self, callback: callable) -> None:
    """Register *callback* for PostgreSQL server log messages.

    Parameters:
        callback: Invoked as ``callback(log_message)`` for each server
            log message received on this connection.
    """

def remove_log_listener(self, callback: callable) -> None:
    """Unregister a previously added log-message *callback*.

    Parameters:
        callback: The callback object to remove.
    """

# Log message handler
def log_message_handler(log_message):
    """Print a PostgreSQL server log message; escalate warnings and errors."""
    print(f"PostgreSQL Log [{log_message.severity}]: {log_message.message}")

    # detail/hint are optional attributes on the log-message object.
    detail = getattr(log_message, 'detail', None)
    if detail:
        print(f"Detail: {detail}")
    hint = getattr(log_message, 'hint', None)
    if hint:
        print(f"Hint: {hint}")

    # Page someone for anything at WARNING severity or above.
    if log_message.severity in ('WARNING', 'ERROR', 'FATAL'):
        send_alert(f"PostgreSQL {log_message.severity}: {log_message.message}")
# Register the handler for the connection's lifetime (synchronous call, no await).
conn.add_log_listener(log_message_handler)
# Advanced log processing
class LogProcessor:
    """Categorize server log messages and keep simple severity counters."""

    def __init__(self):
        # Running totals, exposed for monitoring.
        self.error_count = 0
        self.warning_count = 0

    def process_log(self, log_message):
        """Route *log_message* to the handler matching its severity."""
        severity = log_message.severity
        if severity == 'ERROR':
            self.error_count += 1
            self.handle_error(log_message)
        elif severity == 'WARNING':
            self.warning_count += 1
            self.handle_warning(log_message)
        elif severity == 'NOTICE':
            self.handle_notice(log_message)

    def handle_error(self, log_message):
        """React to an ERROR message (hook for an error-tracking system)."""
        print(f"ERROR: {log_message.message}")

    def handle_warning(self, log_message):
        """React to a WARNING message (hook for a monitoring system)."""
        print(f"WARNING: {log_message.message}")

    def handle_notice(self, log_message):
        """React to a NOTICE message."""
        print(f"NOTICE: {log_message.message}")


processor = LogProcessor()
# Feed every server log message through the stateful processor.
conn.add_log_listener(processor.process_log)

# Monitor connection lifecycle events and handle cleanup operations.
def add_termination_listener(self, callback: callable) -> None:
    """Register *callback* to run when this connection is terminated.

    Parameters:
        callback: Zero-argument function invoked on connection termination.
    """

def remove_termination_listener(self, callback: callable) -> None:
    """Unregister a previously added termination *callback*.

    Parameters:
        callback: The callback object to remove.
    """

# Termination handler
def connection_terminated_handler():
    """Run cleanup and kick off a background reconnect once the connection dies."""
    print("Database connection terminated")
    # Release resources first, then tell the monitoring system.
    cleanup_resources()
    notify_monitoring_system("db_connection_lost")
    # We're in a synchronous callback, so schedule the reconnect as a task.
    asyncio.create_task(reconnect_database())


# Install the handler on the live connection.
conn.add_termination_listener(connection_terminated_handler)
# Connection monitor with reconnection
class ConnectionMonitor:
    """Own a connection and transparently re-establish it when it drops."""

    def __init__(self, dsn):
        self.dsn = dsn
        self.connection = None
        self.reconnect_attempts = 0
        self.max_reconnect_attempts = 5

    async def setup_connection(self):
        """Connect and install the termination hook; resets the retry budget."""
        self.connection = await asyncpg.connect(self.dsn)
        self.connection.add_termination_listener(self.on_connection_lost)
        self.reconnect_attempts = 0

    def on_connection_lost(self):
        """Termination callback: schedule an async reconnect attempt."""
        print("Connection lost, attempting to reconnect...")
        asyncio.create_task(self.reconnect())

    async def reconnect(self):
        """Retry the connection with exponential backoff, up to the cap."""
        if self.reconnect_attempts >= self.max_reconnect_attempts:
            print("Max reconnection attempts exceeded")
            return
        self.reconnect_attempts += 1
        # Back off 2, 4, 8, ... seconds between attempts.
        await asyncio.sleep(2 ** self.reconnect_attempts)
        try:
            await self.setup_connection()
        except Exception as e:
            print(f"Reconnection attempt {self.reconnect_attempts} failed: {e}")
            # Chain the next attempt as a fresh task.
            asyncio.create_task(self.reconnect())
        else:
            print("Successfully reconnected to database")
monitor = ConnectionMonitor('postgresql://user:pass@localhost/db')
await monitor.setup_connection()Monitor and log SQL query execution for debugging, performance analysis, and auditing.
def add_query_logger(self, callback: callable) -> None:
    """Install *callback* to run after every executed query.

    Parameters:
        callback: Invoked as ``callback(query, args, duration)``.
    """

def remove_query_logger(self, callback: callable) -> None:
    """Remove a previously installed query-logger *callback*.

    Parameters:
        callback: The callback object to remove.
    """

def query_logger(self, callback: callable):
    """Temporarily install a query logger for the duration of a block.

    Parameters:
        callback: Invoked as ``callback(query, args, duration)``.

    Returns:
        A context manager that adds the logger on entry and removes it
        on exit.
    """

import time
# Basic query logger
def simple_query_logger(query, args, duration):
    """Print every query with its wall-clock execution time."""
    print(f"Query executed in {duration:.3f}s: {query}")
    if args:
        print(f"  Args: {args}")
# Install the simple logger permanently on this connection.
conn.add_query_logger(simple_query_logger)
# Advanced query logger with filtering and metrics
class QueryAnalyzer:
    """Collect per-type query statistics and remember slow queries.

    Parameters:
        slow_threshold: Duration in seconds above which a query is recorded
            as slow (default 1.0, matching the original hard-coded value).
    """

    def __init__(self, slow_threshold=1.0):
        self.total_queries = 0
        self.slow_queries = []   # dicts with query/args/duration/timestamp
        self.query_stats = {}    # leading SQL keyword -> {count, total_time}
        self.slow_threshold = slow_threshold

    def log_query(self, query, args, duration):
        """Record one executed query; flag it when it exceeds the threshold."""
        self.total_queries += 1

        if duration > self.slow_threshold:
            self.slow_queries.append({
                'query': query,
                'args': args,
                'duration': duration,
                'timestamp': time.time()
            })
            print(f"SLOW QUERY ({duration:.3f}s): {query}")

        # Bucket by the leading SQL keyword. Guard against empty/blank query
        # text, which previously raised IndexError on split()[0].
        words = query.strip().upper().split()
        query_type = words[0] if words else 'UNKNOWN'
        bucket = self.query_stats.setdefault(query_type, {'count': 0, 'total_time': 0})
        bucket['count'] += 1
        bucket['total_time'] += duration

    def get_stats(self):
        """Return aggregate statistics: totals plus per-type count/avg/total."""
        stats = {
            'total_queries': self.total_queries,
            'slow_queries_count': len(self.slow_queries),
            'by_type': {}
        }
        # Division is safe: a bucket only exists after at least one query.
        for query_type, data in self.query_stats.items():
            stats['by_type'][query_type] = {
                'count': data['count'],
                'avg_duration': data['total_time'] / data['count'],
                'total_time': data['total_time']
            }
        return stats
# Create an analyzer and feed it every query executed on this connection.
analyzer = QueryAnalyzer()
conn.add_query_logger(analyzer.log_query)
# Temporary query logging with context manager
def debug_query_logger(query, args, duration):
    """Verbose per-query logger intended for short debugging sessions."""
    # Emit one labelled line per field, then a separator rule.
    for label, value in (("Query", query),
                         ("Args", args),
                         ("Duration", f"{duration:.6f}s")):
        print(f"DEBUG - {label}: {value}")
    print("DEBUG - " + "-" * 50)
# Use temporarily
with conn.query_logger(debug_query_logger):
# All queries in this block will be logged
users = await conn.fetch("SELECT * FROM users WHERE active = $1", True)
await conn.execute("UPDATE users SET last_seen = $1 WHERE id = $2",
datetime.now(), user_id)Combine listeners for comprehensive real-time event processing systems.
class EventProcessor:
    """Comprehensive event processing system.

    Funnels PostgreSQL notifications, slow-query reports, and connection
    termination into one asyncio queue, consumed by a single worker task.
    """

    def __init__(self, connection):
        self.conn = connection
        self.event_queue = asyncio.Queue()
        self.processing_task = None

    async def setup(self):
        """Install all listeners and start the background consumer task."""
        # Database notifications
        await self.conn.add_listener('app_events', self.handle_notification)
        # Query logging for audit trail
        self.conn.add_query_logger(self.log_query)
        # Connection monitoring
        self.conn.add_termination_listener(self.handle_connection_loss)
        # Start event processing task
        self.processing_task = asyncio.create_task(self.process_events())

    async def handle_notification(self, channel, payload):
        """Queue a decoded LISTEN/NOTIFY event."""
        event = {
            'type': 'notification',
            'channel': channel,
            'payload': json.loads(payload),
            'timestamp': time.time()
        }
        await self.event_queue.put(event)

    def log_query(self, query, args, duration):
        """Queue slow queries (>0.5s) for the audit trail.

        Synchronous callback: hand the put() off to the loop as a task.
        """
        if duration > 0.5:  # Only log slow queries
            event = {
                'type': 'slow_query',
                'query': query,
                'args': args,
                'duration': duration,
                'timestamp': time.time()
            }
            asyncio.create_task(self.event_queue.put(event))

    def handle_connection_loss(self):
        """Queue a connection-termination event (synchronous callback)."""
        event = {
            'type': 'connection_lost',
            'timestamp': time.time()
        }
        asyncio.create_task(self.event_queue.put(event))

    async def process_events(self):
        """Consume the queue forever, isolating per-event failures."""
        while True:
            try:
                event = await self.event_queue.get()
                await self.dispatch_event(event)
                self.event_queue.task_done()
            except Exception as e:
                print(f"Error processing event: {e}")

    async def dispatch_event(self, event):
        """Route an event to its type-specific processor."""
        event_type = event['type']
        if event_type == 'notification':
            await self.process_notification(event)
        elif event_type == 'slow_query':
            await self.process_slow_query(event)
        elif event_type == 'connection_lost':
            await self.process_connection_loss(event)

    async def process_notification(self, event):
        """Fan notifications out by the entity they concern."""
        payload = event['payload']
        if payload.get('entity') == 'order':
            await self.handle_order_event(payload)
        elif payload.get('entity') == 'user':
            await self.handle_user_event(payload)

    async def process_slow_query(self, event):
        """Audit-trail sink for slow queries.

        Was referenced by dispatch_event but never defined, so the first
        queued slow query raised AttributeError.
        """
        print(f"Slow query ({event['duration']:.3f}s): {event['query']}")

    async def process_connection_loss(self, event):
        """React to a lost connection (hook; previously undefined)."""
        print("Connection lost event received")

    async def handle_user_event(self, payload):
        """Handle user-related events (hook; previously undefined)."""
        print(f"Processing user event: {payload.get('action')}")

    async def handle_order_event(self, payload):
        """Handle order-related events."""
        action = payload.get('action')
        order_id = payload.get('order_id')
        if action == 'created':
            print(f"Processing new order: {order_id}")
            # Send confirmation email, update inventory, etc.
        elif action == 'cancelled':
            print(f"Processing order cancellation: {order_id}")
            # Refund payment, restore inventory, etc.
# Usage
processor = EventProcessor(conn)
await processor.setup()
# Keep processing events
try:
await asyncio.sleep(float('inf')) # Run forever
except KeyboardInterrupt:
print("Shutting down event processor...")
if processor.processing_task:
processor.processing_task.cancel()# Callback type signatures
# Notification callbacks are awaited with (channel, payload).
NotificationCallback = typing.Callable[[str, str], typing.Awaitable[None]]

# Log-message callbacks receive the server log-message object.
LogMessageCallback = typing.Callable[[typing.Any], None]

# Termination callbacks take no arguments.
TerminationCallback = typing.Callable[[], None]

# Query loggers receive (query, args, duration).
QueryLoggerCallback = typing.Callable[[str, typing.Tuple, float], None]
# Log message attributes
class PostgresLogMessage:
    """Attribute layout of a PostgreSQL server log message."""

    severity: str   # message severity, e.g. NOTICE, WARNING, ERROR, FATAL
    message: str    # primary message text
    detail: str     # additional details, when present
    hint: str       # suggested action, when present
    position: str   # error position (if applicable)
    context: str    # error context
    sqlstate: str   # SQLSTATE code (if applicable)