0
# Listeners and Notifications
1
2
Support for PostgreSQL's LISTEN/NOTIFY functionality, server log message handling, query logging, and connection termination callbacks for real-time event processing and monitoring.
3
4
## Capabilities
5
6
### PostgreSQL LISTEN/NOTIFY
7
8
Asynchronous messaging system for real-time notifications between database sessions.
9
10
```python { .api }
11
async def add_listener(self, channel: str, callback: callable) -> None:
12
"""
13
Add a listener for PostgreSQL notifications on the specified channel.
14
15
Parameters:
16
channel: Channel name to listen on
17
callback: Callable or coroutine function invoked when a notification is received, as callback(connection, pid, channel, payload)
18
"""
19
20
async def remove_listener(self, channel: str, callback: callable) -> None:
21
"""
22
Remove a notification listener from the specified channel.
23
24
Parameters:
25
channel: Channel name
26
callback: Callback function to remove
27
"""
28
```
29
30
#### Example Usage
31
32
```python
33
import asyncio
34
35
# Notification callback
36
async def order_notification_handler(connection, pid, channel, payload):
37
"""Handle new order notifications."""
38
print(f"Received notification on {channel}: {payload}")
39
40
# Parse payload (typically JSON)
41
import json
42
data = json.loads(payload)
43
44
if data.get('action') == 'INSERT':  # 'action' carries TG_OP from the trigger below
45
print(f"New order #{data['order_id']} from customer {data['customer_id']}")
46
# Process new order...
47
elif data.get('action') == 'UPDATE':
48
print(f"Order #{data['order_id']} status changed to {data['status']}")
49
# Update order status...
50
51
# Set up listener
52
await conn.add_listener('order_events', order_notification_handler)
53
54
# PostgreSQL trigger that sends notifications
55
await conn.execute("""
56
CREATE OR REPLACE FUNCTION notify_order_events()
57
RETURNS trigger AS $$
58
BEGIN
59
PERFORM pg_notify(
60
'order_events',
61
json_build_object(
62
'action', TG_OP,
63
'order_id', NEW.id,
64
'customer_id', NEW.customer_id,
65
'status', NEW.status,
66
'timestamp', extract(epoch from now())
67
)::text
68
);
69
RETURN NEW;
70
END;
71
$$ LANGUAGE plpgsql;
72
73
CREATE TRIGGER order_events_trigger
74
AFTER INSERT OR UPDATE ON orders
75
FOR EACH ROW EXECUTE FUNCTION notify_order_events();
76
""")
77
78
# Keep connection alive to receive notifications
79
try:
80
while True:
81
await asyncio.sleep(1)
82
except KeyboardInterrupt:
83
await conn.remove_listener('order_events', order_notification_handler)
84
```
85
86
#### Multiple Listeners and Channels
87
88
```python
89
# Multiple handlers for same channel
90
async def email_notification_handler(connection, pid, channel, payload):
91
"""Send email notifications."""
92
data = json.loads(payload)
93
await send_email_notification(data)
94
95
async def sms_notification_handler(connection, pid, channel, payload):
96
"""Send SMS notifications."""
97
data = json.loads(payload)
98
if data.get('priority') == 'high':
99
await send_sms_notification(data)
100
101
# Add multiple listeners to same channel
102
await conn.add_listener('user_events', email_notification_handler)
103
await conn.add_listener('user_events', sms_notification_handler)
104
105
# Listen on multiple channels
106
channels = ['order_events', 'user_events', 'inventory_events', 'payment_events']
107
for channel in channels:
108
await conn.add_listener(channel, general_event_handler)
109
110
# Event dispatcher pattern
111
event_handlers = {
112
'order_events': [process_order_event, log_order_event],
113
'user_events': [update_user_cache, send_welcome_email],
114
'inventory_events': [update_stock_levels, reorder_check]
115
}
116
117
async def dispatch_event(connection, pid, channel, payload):
118
"""Dispatch events to registered handlers."""
119
handlers = event_handlers.get(channel, [])
120
for handler in handlers:
121
try:
122
await handler(json.loads(payload))
123
except Exception as e:
124
print(f"Error in handler {handler.__name__}: {e}")
125
126
for channel in event_handlers:
127
await conn.add_listener(channel, dispatch_event)
128
```
129
130
### Server Log Message Listeners
131
132
Monitor PostgreSQL server log messages for debugging, monitoring, and alerting.
133
134
```python { .api }
135
def add_log_listener(self, callback: callable) -> None:
136
"""
137
Add a listener for PostgreSQL log messages.
138
139
Parameters:
140
callback: Callable or coroutine function invoked as callback(connection, message), where message is a PostgresLogMessage
141
"""
142
143
def remove_log_listener(self, callback: callable) -> None:
144
"""
145
Remove a log message listener.
146
147
Parameters:
148
callback: Callback function to remove
149
"""
150
```
151
152
#### Example Usage
153
154
```python
155
# Log message handler
156
def log_message_handler(connection, log_message):
157
"""Handle PostgreSQL log messages."""
158
print(f"PostgreSQL Log [{log_message.severity}]: {log_message.message}")
159
160
# Access log message details
161
if hasattr(log_message, 'detail') and log_message.detail:
162
print(f"Detail: {log_message.detail}")
163
164
if hasattr(log_message, 'hint') and log_message.hint:
165
print(f"Hint: {log_message.hint}")
166
167
# Alert on warnings and errors
168
if log_message.severity in ['WARNING', 'ERROR', 'FATAL']:
169
send_alert(f"PostgreSQL {log_message.severity}: {log_message.message}")
170
171
# Add log listener
172
conn.add_log_listener(log_message_handler)
173
174
# Advanced log processing
175
class LogProcessor:
176
def __init__(self):
177
self.error_count = 0
178
self.warning_count = 0
179
180
def process_log(self, connection, log_message):
181
"""Process and categorize log messages."""
182
183
if log_message.severity == 'ERROR':
184
self.error_count += 1
185
self.handle_error(log_message)
186
elif log_message.severity == 'WARNING':
187
self.warning_count += 1
188
self.handle_warning(log_message)
189
elif log_message.severity == 'NOTICE':
190
self.handle_notice(log_message)
191
192
def handle_error(self, log_message):
193
"""Handle error log messages."""
194
print(f"ERROR: {log_message.message}")
195
# Send to error tracking system
196
197
def handle_warning(self, log_message):
198
"""Handle warning log messages."""
199
print(f"WARNING: {log_message.message}")
200
# Log to monitoring system
201
202
def handle_notice(self, log_message):
203
"""Handle notice log messages."""
204
print(f"NOTICE: {log_message.message}")
205
206
processor = LogProcessor()
207
conn.add_log_listener(processor.process_log)
208
```
209
210
### Connection Termination Listeners
211
212
Monitor connection lifecycle events and handle cleanup operations.
213
214
```python { .api }
215
def add_termination_listener(self, callback: callable) -> None:
216
"""
217
Add a listener that will be called when the connection is terminated.
218
219
Parameters:
220
callback: Callable or coroutine function invoked as callback(connection) when the connection is terminated
221
"""
222
223
def remove_termination_listener(self, callback: callable) -> None:
224
"""
225
Remove a connection termination listener.
226
227
Parameters:
228
callback: Callback function to remove
229
"""
230
```
231
232
#### Example Usage
233
234
```python
235
# Termination handler
236
def connection_terminated_handler(connection):
237
"""Handle connection termination."""
238
print("Database connection terminated")
239
240
# Cleanup operations
241
cleanup_resources()
242
notify_monitoring_system("db_connection_lost")
243
244
# Attempt reconnection
245
asyncio.create_task(reconnect_database())
246
247
# Add termination listener
248
conn.add_termination_listener(connection_terminated_handler)
249
250
# Connection monitor with reconnection
251
class ConnectionMonitor:
252
def __init__(self, dsn):
253
self.dsn = dsn
254
self.connection = None
255
self.reconnect_attempts = 0
256
self.max_reconnect_attempts = 5
257
258
async def setup_connection(self):
259
"""Setup connection with termination monitoring."""
260
self.connection = await asyncpg.connect(self.dsn)
261
self.connection.add_termination_listener(self.on_connection_lost)
262
self.reconnect_attempts = 0
263
264
def on_connection_lost(self, connection):
265
"""Handle connection loss and initiate reconnection."""
266
print("Connection lost, attempting to reconnect...")
267
asyncio.create_task(self.reconnect())
268
269
async def reconnect(self):
270
"""Attempt to re-establish connection."""
271
if self.reconnect_attempts >= self.max_reconnect_attempts:
272
print("Max reconnection attempts exceeded")
273
return
274
275
self.reconnect_attempts += 1
276
277
try:
278
await asyncio.sleep(2 ** self.reconnect_attempts) # Exponential backoff
279
await self.setup_connection()
280
print("Successfully reconnected to database")
281
282
except Exception as e:
283
print(f"Reconnection attempt {self.reconnect_attempts} failed: {e}")
284
asyncio.create_task(self.reconnect())
285
286
monitor = ConnectionMonitor('postgresql://user:pass@localhost/db')
287
await monitor.setup_connection()
288
```
289
290
### Query Logging
291
292
Monitor and log SQL query execution for debugging, performance analysis, and auditing.
293
294
```python { .api }
295
def add_query_logger(self, callback: callable) -> None:
296
"""
297
Add a logger that will be called when queries are executed.
298
299
Parameters:
300
callback: Callable or coroutine function invoked with a single LoggedQuery record (fields: query, args, timeout, elapsed, exception, conn_addr, conn_params)
301
"""
302
303
def remove_query_logger(self, callback: callable) -> None:
304
"""
305
Remove a query logger callback.
306
307
Parameters:
308
callback: Callback function to remove
309
"""
310
311
def query_logger(self, callback: callable):
312
"""
313
Context manager that temporarily adds a query logger.
314
315
Parameters:
316
callback: Function to call on query execution
317
318
Returns:
319
Context manager
320
"""
321
```
322
323
#### Example Usage
324
325
```python
326
import time
327
328
# Basic query logger
329
def simple_query_logger(record):
330
"""Log all queries with execution time."""
331
print(f"Query executed in {record.elapsed:.3f}s: {record.query}")
332
if record.args:
333
print(f" Args: {record.args}")
334
335
# Add permanent query logger
336
conn.add_query_logger(simple_query_logger)
337
338
# Advanced query logger with filtering and metrics
339
class QueryAnalyzer:
340
def __init__(self):
341
self.total_queries = 0
342
self.slow_queries = []
343
self.query_stats = {}
344
345
def log_query(self, record):
345
"""Log and analyze query performance."""
346
query, args, duration = record.query, record.args, record.elapsed
self.total_queries += 1
348
349
# Track slow queries (> 1 second)
350
if duration > 1.0:
351
self.slow_queries.append({
352
'query': query,
353
'args': args,
354
'duration': duration,
355
'timestamp': time.time()
356
})
357
print(f"SLOW QUERY ({duration:.3f}s): {query}")
358
359
# Query statistics
360
query_type = query.strip().upper().split()[0]
361
if query_type not in self.query_stats:
362
self.query_stats[query_type] = {'count': 0, 'total_time': 0}
363
364
self.query_stats[query_type]['count'] += 1
365
self.query_stats[query_type]['total_time'] += duration
366
367
def get_stats(self):
368
"""Get query execution statistics."""
369
stats = {
370
'total_queries': self.total_queries,
371
'slow_queries_count': len(self.slow_queries),
372
'by_type': {}
373
}
374
375
for query_type, data in self.query_stats.items():
376
stats['by_type'][query_type] = {
377
'count': data['count'],
378
'avg_duration': data['total_time'] / data['count'],
379
'total_time': data['total_time']
380
}
381
382
return stats
383
384
analyzer = QueryAnalyzer()
385
conn.add_query_logger(analyzer.log_query)
386
387
# Temporary query logging with context manager
388
def debug_query_logger(record):
389
"""Detailed query logger for debugging."""
390
print(f"DEBUG - Query: {record.query}")
391
print(f"DEBUG - Args: {record.args}")
392
print(f"DEBUG - Duration: {record.elapsed:.6f}s")
393
print("DEBUG - " + "-" * 50)
394
395
# Use temporarily
396
with conn.query_logger(debug_query_logger):
397
# All queries in this block will be logged
398
users = await conn.fetch("SELECT * FROM users WHERE active = $1", True)
399
await conn.execute("UPDATE users SET last_seen = $1 WHERE id = $2",
400
datetime.now(), user_id)
401
```
402
403
### Real-time Event Processing
404
405
Combine listeners for comprehensive real-time event processing systems.
406
407
```python
408
class EventProcessor:
409
"""Comprehensive event processing system."""
410
411
def __init__(self, connection):
412
self.conn = connection
413
self.event_queue = asyncio.Queue()
414
self.processing_task = None
415
416
async def setup(self):
417
"""Setup all listeners and start processing."""
418
419
# Database notifications
420
await self.conn.add_listener('app_events', self.handle_notification)
421
422
# Query logging for audit trail
423
self.conn.add_query_logger(self.log_query)
424
425
# Connection monitoring
426
self.conn.add_termination_listener(self.handle_connection_loss)
427
428
# Start event processing task
429
self.processing_task = asyncio.create_task(self.process_events())
430
431
async def handle_notification(self, connection, pid, channel, payload):
432
"""Handle PostgreSQL notifications."""
433
event = {
434
'type': 'notification',
435
'channel': channel,
436
'payload': json.loads(payload),
437
'timestamp': time.time()
438
}
439
await self.event_queue.put(event)
440
441
def log_query(self, record):
442
"""Log queries for audit trail."""
443
query, args, duration = record.query, record.args, record.elapsed
if duration > 0.5: # Only log slow queries
444
event = {
445
'type': 'slow_query',
446
'query': query,
447
'args': args,
448
'duration': duration,
449
'timestamp': time.time()
450
}
451
asyncio.create_task(self.event_queue.put(event))
452
453
def handle_connection_loss(self, connection):
454
"""Handle connection termination."""
455
event = {
456
'type': 'connection_lost',
457
'timestamp': time.time()
458
}
459
asyncio.create_task(self.event_queue.put(event))
460
461
async def process_events(self):
462
"""Process events from the queue."""
463
while True:
464
try:
465
event = await self.event_queue.get()
466
await self.dispatch_event(event)
467
self.event_queue.task_done()
468
except Exception as e:
469
print(f"Error processing event: {e}")
470
471
async def dispatch_event(self, event):
472
"""Dispatch events to appropriate handlers."""
473
event_type = event['type']
474
475
if event_type == 'notification':
476
await self.process_notification(event)
477
elif event_type == 'slow_query':
478
await self.process_slow_query(event)
479
elif event_type == 'connection_lost':
480
await self.process_connection_loss(event)
481
482
async def process_notification(self, event):
483
"""Process database notifications."""
484
payload = event['payload']
485
486
if payload.get('entity') == 'order':
487
await self.handle_order_event(payload)
488
elif payload.get('entity') == 'user':
489
await self.handle_user_event(payload)
490
491
async def handle_order_event(self, payload):
492
"""Handle order-related events."""
493
action = payload.get('action')
494
order_id = payload.get('order_id')
495
496
if action == 'created':
497
print(f"Processing new order: {order_id}")
498
# Send confirmation email, update inventory, etc.
499
elif action == 'cancelled':
500
print(f"Processing order cancellation: {order_id}")
501
# Refund payment, restore inventory, etc.
502
503
# Usage
504
processor = EventProcessor(conn)
505
await processor.setup()
506
507
# Keep processing events
508
try:
509
await asyncio.sleep(float('inf')) # Run forever
510
except KeyboardInterrupt:
511
print("Shutting down event processor...")
512
if processor.processing_task:
513
processor.processing_task.cancel()
514
```
515
516
## Types
517
518
```python { .api }
519
# Callback type signatures
520
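NotificationCallback = typing.Callable[[Connection, int, str, str], typing.Optional[typing.Awaitable[None]]]  # (connection, pid, channel, payload)
521
LogMessageCallback = typing.Callable[[Connection, PostgresLogMessage], typing.Optional[typing.Awaitable[None]]]  # (connection, message)
522
TerminationCallback = typing.Callable[[Connection], typing.Optional[typing.Awaitable[None]]]  # (connection)
523
QueryLoggerCallback = typing.Callable[[LoggedQuery], typing.Optional[typing.Awaitable[None]]]  # (record)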
524
525
# Log message attributes
526
class PostgresLogMessage:
527
"""PostgreSQL server log message."""
528
severity: str # Message severity
529
message: str # Primary message text
530
detail: str # Additional details
531
hint: str # Suggested action
532
position: str # Error position (if applicable)
533
context: str # Error context
534
sqlstate: str # SQLSTATE code (if applicable)
535
```