docs/listeners-notifications.md

# Listeners and Notifications

Support for PostgreSQL's LISTEN/NOTIFY functionality, server log message handling, query logging, and connection termination callbacks for real-time event processing and monitoring.

## Capabilities

### PostgreSQL LISTEN/NOTIFY

Asynchronous messaging system for real-time notifications between database sessions.

```python { .api }
async def add_listener(self, channel: str, callback: callable) -> None:
    """
    Add a listener for PostgreSQL notifications on the specified channel.

    Parameters:
        channel: Channel name to listen on
        callback: Function to call when notification received (channel, payload)
    """

async def remove_listener(self, channel: str, callback: callable) -> None:
    """
    Remove a notification listener from the specified channel.

    Parameters:
        channel: Channel name
        callback: Callback function to remove
    """
```

#### Example Usage

```python
import asyncio

# Notification callback
async def order_notification_handler(channel, payload):
    """Handle new order notifications."""
    print(f"Received notification on {channel}: {payload}")

    # Parse payload (typically JSON)
    import json
    data = json.loads(payload)

    if data.get('action') == 'new_order':
        print(f"New order #{data['order_id']} from customer {data['customer_id']}")
        # Process new order...
    elif data.get('action') == 'order_updated':
        print(f"Order #{data['order_id']} status changed to {data['status']}")
        # Update order status...

# Set up listener
await conn.add_listener('order_events', order_notification_handler)

# PostgreSQL trigger that sends notifications
await conn.execute("""
    CREATE OR REPLACE FUNCTION notify_order_events()
    RETURNS trigger AS $$
    BEGIN
        PERFORM pg_notify(
            'order_events',
            json_build_object(
                'action', TG_OP,
                'order_id', NEW.id,
                'customer_id', NEW.customer_id,
                'status', NEW.status,
                'timestamp', extract(epoch from now())
            )::text
        );
        RETURN NEW;
    END;
    $$ LANGUAGE plpgsql;

    CREATE TRIGGER order_events_trigger
        AFTER INSERT OR UPDATE ON orders
        FOR EACH ROW EXECUTE FUNCTION notify_order_events();
""")

# Keep connection alive to receive notifications
try:
    while True:
        await asyncio.sleep(1)
except KeyboardInterrupt:
    await conn.remove_listener('order_events', order_notification_handler)
```

#### Multiple Listeners and Channels

```python
# Multiple handlers for same channel
async def email_notification_handler(channel, payload):
    """Send email notifications."""
    data = json.loads(payload)
    await send_email_notification(data)

async def sms_notification_handler(channel, payload):
    """Send SMS notifications."""
    data = json.loads(payload)
    if data.get('priority') == 'high':
        await send_sms_notification(data)

# Add multiple listeners to same channel
await conn.add_listener('user_events', email_notification_handler)
await conn.add_listener('user_events', sms_notification_handler)

# Listen on multiple channels
channels = ['order_events', 'user_events', 'inventory_events', 'payment_events']
for channel in channels:
    await conn.add_listener(channel, general_event_handler)

# Event dispatcher pattern
event_handlers = {
    'order_events': [process_order_event, log_order_event],
    'user_events': [update_user_cache, send_welcome_email],
    'inventory_events': [update_stock_levels, reorder_check]
}

async def dispatch_event(channel, payload):
    """Dispatch events to registered handlers."""
    handlers = event_handlers.get(channel, [])
    for handler in handlers:
        try:
            await handler(json.loads(payload))
        except Exception as e:
            print(f"Error in handler {handler.__name__}: {e}")

for channel in event_handlers:
    await conn.add_listener(channel, dispatch_event)
```

### Server Log Message Listeners

Monitor PostgreSQL server log messages for debugging, monitoring, and alerting.

```python { .api }
def add_log_listener(self, callback: callable) -> None:
    """
    Add a listener for PostgreSQL log messages.

    Parameters:
        callback: Function to call when log message received (log_message)
    """

def remove_log_listener(self, callback: callable) -> None:
    """
    Remove a log message listener.

    Parameters:
        callback: Callback function to remove
    """
```

#### Example Usage

```python
# Log message handler
def log_message_handler(log_message):
    """Handle PostgreSQL log messages."""
    print(f"PostgreSQL Log [{log_message.severity}]: {log_message.message}")

    # Access log message details
    if hasattr(log_message, 'detail') and log_message.detail:
        print(f"Detail: {log_message.detail}")

    if hasattr(log_message, 'hint') and log_message.hint:
        print(f"Hint: {log_message.hint}")

    # Alert on warnings and errors
    if log_message.severity in ['WARNING', 'ERROR', 'FATAL']:
        send_alert(f"PostgreSQL {log_message.severity}: {log_message.message}")

# Add log listener
conn.add_log_listener(log_message_handler)

# Advanced log processing
class LogProcessor:
    def __init__(self):
        self.error_count = 0
        self.warning_count = 0

    def process_log(self, log_message):
        """Process and categorize log messages."""

        if log_message.severity == 'ERROR':
            self.error_count += 1
            self.handle_error(log_message)
        elif log_message.severity == 'WARNING':
            self.warning_count += 1
            self.handle_warning(log_message)
        elif log_message.severity == 'NOTICE':
            self.handle_notice(log_message)

    def handle_error(self, log_message):
        """Handle error log messages."""
        print(f"ERROR: {log_message.message}")
        # Send to error tracking system

    def handle_warning(self, log_message):
        """Handle warning log messages."""
        print(f"WARNING: {log_message.message}")
        # Log to monitoring system

    def handle_notice(self, log_message):
        """Handle notice log messages."""
        print(f"NOTICE: {log_message.message}")

processor = LogProcessor()
conn.add_log_listener(processor.process_log)
```

### Connection Termination Listeners

Monitor connection lifecycle events and handle cleanup operations.

```python { .api }
def add_termination_listener(self, callback: callable) -> None:
    """
    Add a listener that will be called when the connection is terminated.

    Parameters:
        callback: Function to call on connection termination
    """

def remove_termination_listener(self, callback: callable) -> None:
    """
    Remove a connection termination listener.

    Parameters:
        callback: Callback function to remove
    """
```

#### Example Usage

```python
# Termination handler
def connection_terminated_handler():
    """Handle connection termination."""
    print("Database connection terminated")

    # Cleanup operations
    cleanup_resources()
    notify_monitoring_system("db_connection_lost")

    # Attempt reconnection
    asyncio.create_task(reconnect_database())

# Add termination listener
conn.add_termination_listener(connection_terminated_handler)

# Connection monitor with reconnection
class ConnectionMonitor:
    def __init__(self, dsn):
        self.dsn = dsn
        self.connection = None
        self.reconnect_attempts = 0
        self.max_reconnect_attempts = 5

    async def setup_connection(self):
        """Setup connection with termination monitoring."""
        self.connection = await asyncpg.connect(self.dsn)
        self.connection.add_termination_listener(self.on_connection_lost)
        self.reconnect_attempts = 0

    def on_connection_lost(self):
        """Handle connection loss and initiate reconnection."""
        print("Connection lost, attempting to reconnect...")
        asyncio.create_task(self.reconnect())

    async def reconnect(self):
        """Attempt to re-establish connection."""
        if self.reconnect_attempts >= self.max_reconnect_attempts:
            print("Max reconnection attempts exceeded")
            return

        self.reconnect_attempts += 1

        try:
            await asyncio.sleep(2 ** self.reconnect_attempts) # Exponential backoff
            await self.setup_connection()
            print("Successfully reconnected to database")

        except Exception as e:
            print(f"Reconnection attempt {self.reconnect_attempts} failed: {e}")
            asyncio.create_task(self.reconnect())

monitor = ConnectionMonitor('postgresql://user:pass@localhost/db')
await monitor.setup_connection()
```

### Query Logging

Monitor and log SQL query execution for debugging, performance analysis, and auditing.

```python { .api }
def add_query_logger(self, callback: callable) -> None:
    """
    Add a logger that will be called when queries are executed.

    Parameters:
        callback: Function to call on query execution (query, args, duration)
    """

def remove_query_logger(self, callback: callable) -> None:
    """
    Remove a query logger callback.

    Parameters:
        callback: Callback function to remove
    """

def query_logger(self, callback: callable):
    """
    Context manager that temporarily adds a query logger.

    Parameters:
        callback: Function to call on query execution

    Returns:
        Context manager
    """
```

#### Example Usage

```python
import time

# Basic query logger
def simple_query_logger(query, args, duration):
    """Log all queries with execution time."""
    print(f"Query executed in {duration:.3f}s: {query}")
    if args:
        print(f" Args: {args}")

# Add permanent query logger
conn.add_query_logger(simple_query_logger)

# Advanced query logger with filtering and metrics
class QueryAnalyzer:
    def __init__(self):
        self.total_queries = 0
        self.slow_queries = []
        self.query_stats = {}

    def log_query(self, query, args, duration):
        """Log and analyze query performance."""
        self.total_queries += 1

        # Track slow queries (> 1 second)
        if duration > 1.0:
            self.slow_queries.append({
                'query': query,
                'args': args,
                'duration': duration,
                'timestamp': time.time()
            })
            print(f"SLOW QUERY ({duration:.3f}s): {query}")

        # Query statistics
        query_type = query.strip().upper().split()[0]
        if query_type not in self.query_stats:
            self.query_stats[query_type] = {'count': 0, 'total_time': 0}

        self.query_stats[query_type]['count'] += 1
        self.query_stats[query_type]['total_time'] += duration

    def get_stats(self):
        """Get query execution statistics."""
        stats = {
            'total_queries': self.total_queries,
            'slow_queries_count': len(self.slow_queries),
            'by_type': {}
        }

        for query_type, data in self.query_stats.items():
            stats['by_type'][query_type] = {
                'count': data['count'],
                'avg_duration': data['total_time'] / data['count'],
                'total_time': data['total_time']
            }

        return stats

analyzer = QueryAnalyzer()
conn.add_query_logger(analyzer.log_query)

# Temporary query logging with context manager
def debug_query_logger(query, args, duration):
    """Detailed query logger for debugging."""
    print(f"DEBUG - Query: {query}")
    print(f"DEBUG - Args: {args}")
    print(f"DEBUG - Duration: {duration:.6f}s")
    print("DEBUG - " + "-" * 50)

# Use temporarily
with conn.query_logger(debug_query_logger):
    # All queries in this block will be logged
    users = await conn.fetch("SELECT * FROM users WHERE active = $1", True)
    await conn.execute("UPDATE users SET last_seen = $1 WHERE id = $2",
                       datetime.now(), user_id)
```

### Real-time Event Processing

Combine listeners for comprehensive real-time event processing systems.

```python
class EventProcessor:
    """Comprehensive event processing system."""

    def __init__(self, connection):
        self.conn = connection
        self.event_queue = asyncio.Queue()
        self.processing_task = None

    async def setup(self):
        """Setup all listeners and start processing."""

        # Database notifications
        await self.conn.add_listener('app_events', self.handle_notification)

        # Query logging for audit trail
        self.conn.add_query_logger(self.log_query)

        # Connection monitoring
        self.conn.add_termination_listener(self.handle_connection_loss)

        # Start event processing task
        self.processing_task = asyncio.create_task(self.process_events())

    async def handle_notification(self, channel, payload):
        """Handle PostgreSQL notifications."""
        event = {
            'type': 'notification',
            'channel': channel,
            'payload': json.loads(payload),
            'timestamp': time.time()
        }
        await self.event_queue.put(event)

    def log_query(self, query, args, duration):
        """Log queries for audit trail."""
        if duration > 0.5: # Only log slow queries
            event = {
                'type': 'slow_query',
                'query': query,
                'args': args,
                'duration': duration,
                'timestamp': time.time()
            }
            asyncio.create_task(self.event_queue.put(event))

    def handle_connection_loss(self):
        """Handle connection termination."""
        event = {
            'type': 'connection_lost',
            'timestamp': time.time()
        }
        asyncio.create_task(self.event_queue.put(event))

    async def process_events(self):
        """Process events from the queue."""
        while True:
            try:
                event = await self.event_queue.get()
                await self.dispatch_event(event)
                self.event_queue.task_done()
            except Exception as e:
                print(f"Error processing event: {e}")

    async def dispatch_event(self, event):
        """Dispatch events to appropriate handlers."""
        event_type = event['type']

        if event_type == 'notification':
            await self.process_notification(event)
        elif event_type == 'slow_query':
            await self.process_slow_query(event)
        elif event_type == 'connection_lost':
            await self.process_connection_loss(event)

    async def process_notification(self, event):
        """Process database notifications."""
        payload = event['payload']

        if payload.get('entity') == 'order':
            await self.handle_order_event(payload)
        elif payload.get('entity') == 'user':
            await self.handle_user_event(payload)

    async def handle_order_event(self, payload):
        """Handle order-related events."""
        action = payload.get('action')
        order_id = payload.get('order_id')

        if action == 'created':
            print(f"Processing new order: {order_id}")
            # Send confirmation email, update inventory, etc.
        elif action == 'cancelled':
            print(f"Processing order cancellation: {order_id}")
            # Refund payment, restore inventory, etc.

# Usage
processor = EventProcessor(conn)
await processor.setup()

# Keep processing events
try:
    await asyncio.sleep(float('inf')) # Run forever
except KeyboardInterrupt:
    print("Shutting down event processor...")
    if processor.processing_task:
        processor.processing_task.cancel()
```

## Types

```python { .api }
# Callback type signatures
NotificationCallback = typing.Callable[[str, str], typing.Awaitable[None]]
LogMessageCallback = typing.Callable[[typing.Any], None]
TerminationCallback = typing.Callable[[], None]
QueryLoggerCallback = typing.Callable[[str, typing.Tuple, float], None]

# Log message attributes
class PostgresLogMessage:
    """PostgreSQL server log message."""
    severity: str  # Message severity
    message: str   # Primary message text
    detail: str    # Additional details
    hint: str      # Suggested action
    position: str  # Error position (if applicable)
    context: str   # Error context
    sqlstate: str  # SQLSTATE code (if applicable)
```