0
# Notifications
1
2
PostgreSQL LISTEN/NOTIFY support for real-time messaging and pub/sub patterns between database clients.
3
4
## Capabilities
5
6
### Channel Listening
7
8
Subscribe to PostgreSQL notification channels to receive real-time messages from other database clients.
9
10
```javascript { .api }
11
/**
12
* Listen for notifications on a channel
13
* @param channel - Channel name to listen on
14
* @param fn - Callback function to handle notifications
15
* @param onlisten - Optional callback when listening starts
16
* @returns ListenRequest object for managing the subscription
17
*/
18
listen(
19
channel: string,
20
fn: (payload: string) => void,
21
onlisten?: () => void
22
): ListenRequest;
23
```
24
25
**Usage Examples:**
26
27
```javascript
28
// Basic channel listening
29
const listener = sql.listen('user_updates', (payload) => {
30
console.log('User update received:', payload);
31
const data = JSON.parse(payload);
32
handleUserUpdate(data);
33
});
34
35
// Listen with confirmation callback
36
sql.listen('order_status',
37
(payload) => {
38
const order = JSON.parse(payload);
39
updateOrderDisplay(order);
40
},
41
() => {
42
console.log('Now listening for order status updates');
43
}
44
);
45
46
// Multiple channel listeners
47
sql.listen('chat_messages', handleChatMessage);
48
sql.listen('system_alerts', handleSystemAlert);
49
sql.listen('user_presence', handlePresenceUpdate);
50
```
51
52
### Sending Notifications
53
54
Send notifications to channels that other clients can receive.
55
56
```javascript { .api }
57
/**
58
* Send notification to a channel
59
* @param channel - Channel name to send to
60
* @param payload - Message payload (string)
61
* @returns Promise resolving when notification is sent
62
*/
63
notify(channel: string, payload: string): Promise<void>;
64
```
65
66
**Usage Examples:**
67
68
```javascript
69
// Send simple notification
70
await sql.notify('user_updates', 'User profile changed');
71
72
// Send structured data as JSON
73
const userUpdate = {
74
userId: 123,
75
action: 'profile_update',
76
timestamp: Date.now()
77
};
78
await sql.notify('user_updates', JSON.stringify(userUpdate));
79
80
// Notify about database changes
81
await sql.notify('inventory_change', JSON.stringify({
82
productId: product.id,
83
oldQuantity: oldQty,
84
newQuantity: newQty,
85
operation: 'sale'
86
}));
87
```
88
89
## ListenRequest Interface
90
91
Manage active notification subscriptions with control methods.
92
93
```javascript { .api }
94
interface ListenRequest {
95
/** Channel being listened to */
96
channel: string;
97
98
/** Stop listening to this channel */
99
unlisten(): Promise<void>;
100
}
101
```
102
103
**Usage Examples:**
104
105
```javascript
106
// Store listener reference for later cleanup
107
const orderListener = sql.listen('orders', handleOrder);
108
109
// Stop listening when component unmounts
110
componentWillUnmount() {
111
orderListener.unlisten();
112
}
113
114
// Conditional listening
115
let priceListener = null;
116
if (user.subscribedToPriceUpdates) {
117
priceListener = sql.listen('price_changes', handlePriceChange);
118
}
119
120
// Clean up conditional listener
121
if (priceListener) {
122
await priceListener.unlisten();
123
}
124
```
125
126
## Advanced Notification Patterns
127
128
### Event-Driven Database Changes
129
130
Use database triggers to automatically send notifications when data changes.
131
132
```sql
133
-- Create trigger function to send notifications
134
CREATE OR REPLACE FUNCTION notify_user_changes()
135
RETURNS TRIGGER AS $$
136
BEGIN
137
IF TG_OP = 'INSERT' THEN
138
PERFORM pg_notify('user_changes', json_build_object(
139
'operation', 'insert',
140
'user_id', NEW.id,
141
'data', row_to_json(NEW)
142
)::text);
143
ELSIF TG_OP = 'UPDATE' THEN
144
PERFORM pg_notify('user_changes', json_build_object(
145
'operation', 'update',
146
'user_id', NEW.id,
147
'old_data', row_to_json(OLD),
148
'new_data', row_to_json(NEW)
149
)::text);
150
ELSIF TG_OP = 'DELETE' THEN
151
PERFORM pg_notify('user_changes', json_build_object(
152
'operation', 'delete',
153
'user_id', OLD.id,
154
'data', row_to_json(OLD)
155
)::text);
156
END IF;
157
RETURN NULL;
158
END;
159
$$ LANGUAGE plpgsql;
160
161
-- Create triggers
162
CREATE TRIGGER user_changes_trigger
163
AFTER INSERT OR UPDATE OR DELETE ON users
164
FOR EACH ROW EXECUTE FUNCTION notify_user_changes();
165
```
166
167
**Usage Examples:**
168
169
```javascript
170
// Listen for database-driven notifications
171
sql.listen('user_changes', (payload) => {
172
const change = JSON.parse(payload);
173
174
switch (change.operation) {
175
case 'insert':
176
console.log('New user created:', change.data);
177
addUserToCache(change.data);
178
break;
179
180
case 'update':
181
console.log('User updated:', change.user_id);
182
updateUserInCache(change.new_data);
183
break;
184
185
case 'delete':
186
console.log('User deleted:', change.user_id);
187
removeUserFromCache(change.user_id);
188
break;
189
}
190
});
191
192
// The triggers will automatically send notifications for any changes:
193
await sql`INSERT INTO users (name, email) VALUES (${name}, ${email})`;
194
// Automatically triggers notification
195
196
await sql`UPDATE users SET email = ${newEmail} WHERE id = ${userId}`;
197
// Automatically triggers notification
198
199
await sql`DELETE FROM users WHERE id = ${userId}`;
200
// Automatically triggers notification
201
```
202
203
### Real-time Application Features
204
205
Implement real-time features using PostgreSQL notifications.
206
207
```javascript
208
// Real-time chat system
209
class ChatRoom {
210
constructor(roomId) {
211
this.roomId = roomId;
212
this.setupListeners();
213
}
214
215
setupListeners() {
216
// Listen for new messages in this room
217
this.messageListener = sql.listen(`chat_${this.roomId}`, (payload) => {
218
const message = JSON.parse(payload);
219
this.displayMessage(message);
220
});
221
222
// Listen for user presence updates
223
this.presenceListener = sql.listen(`presence_${this.roomId}`, (payload) => {
224
const presence = JSON.parse(payload);
225
this.updateUserPresence(presence);
226
});
227
}
228
229
async sendMessage(userId, text) {
230
// Store message in database
231
await sql`
232
INSERT INTO messages (room_id, user_id, text, created_at)
233
VALUES (${this.roomId}, ${userId}, ${text}, NOW())
234
`;
235
236
// Notify other clients
237
await sql.notify(`chat_${this.roomId}`, JSON.stringify({
238
userId,
239
text,
240
timestamp: Date.now()
241
}));
242
}
243
244
async updatePresence(userId, status) {
245
await sql.notify(`presence_${this.roomId}`, JSON.stringify({
246
userId,
247
status,
248
timestamp: Date.now()
249
}));
250
}
251
252
cleanup() {
253
this.messageListener.unlisten();
254
this.presenceListener.unlisten();
255
}
256
}
257
258
// Usage
259
const chatRoom = new ChatRoom(123);
260
await chatRoom.sendMessage(456, 'Hello everyone!');
261
```
262
263
### Notification Routing
264
265
Create sophisticated routing systems for different types of notifications.
266
267
```javascript
268
class NotificationRouter {
269
constructor() {
270
this.handlers = new Map();
271
this.setupGlobalListener();
272
}
273
274
setupGlobalListener() {
275
// Single listener for all notifications with routing
276
sql.listen('app_notifications', (payload) => {
277
try {
278
const notification = JSON.parse(payload);
279
this.routeNotification(notification);
280
} catch (error) {
281
console.error('Invalid notification payload:', payload);
282
}
283
});
284
}
285
286
routeNotification(notification) {
287
const { type, target, data } = notification;
288
const key = `${type}:${target}`;
289
290
if (this.handlers.has(key)) {
291
this.handlers.get(key).forEach(handler => {
292
handler(data);
293
});
294
}
295
}
296
297
subscribe(type, target, handler) {
298
const key = `${type}:${target}`;
299
if (!this.handlers.has(key)) {
300
this.handlers.set(key, []);
301
}
302
this.handlers.get(key).push(handler);
303
}
304
305
async publish(type, target, data) {
306
await sql.notify('app_notifications', JSON.stringify({
307
type,
308
target,
309
data,
310
timestamp: Date.now()
311
}));
312
}
313
}
314
315
// Usage
316
const router = new NotificationRouter();
317
318
// Subscribe to specific notification types
319
router.subscribe('order', 'status_change', (data) => {
320
updateOrderStatus(data.orderId, data.status);
321
});
322
323
router.subscribe('inventory', 'low_stock', (data) => {
324
showLowStockAlert(data.productId, data.quantity);
325
});
326
327
router.subscribe('user', 'login', (data) => {
328
logUserActivity(data.userId, 'login');
329
});
330
331
// Publish notifications
332
await router.publish('order', 'status_change', {
333
orderId: 12345,
334
status: 'shipped'
335
});
336
337
await router.publish('inventory', 'low_stock', {
338
productId: 67890,
339
quantity: 5
340
});
341
```
342
343
## Error Handling
344
345
Handle connection issues and notification delivery errors gracefully.
346
347
```javascript { .api }
348
// Configure notification error handling
349
const sql = postgres(connectionConfig, {
350
onnotify: (channel, payload) => {
351
console.log(`Notification on ${channel}:`, payload);
352
},
353
354
onclose: (connectionId) => {
355
console.log(`Connection ${connectionId} closed, notifications stopped`);
356
// Implement reconnection logic
357
}
358
});
359
```
360
361
**Usage Examples:**
362
363
```javascript
364
// Robust notification listener with error handling
365
class RobustListener {
366
constructor(channel, handler) {
367
this.channel = channel;
368
this.handler = handler;
369
this.listener = null;
370
this.reconnectAttempts = 0;
371
this.maxReconnectAttempts = 5;
372
373
this.connect();
374
}
375
376
connect() {
377
try {
378
this.listener = sql.listen(this.channel,
379
(payload) => {
380
this.reconnectAttempts = 0; // Reset on successful message
381
this.handler(payload);
382
},
383
() => {
384
console.log(`Connected to channel: ${this.channel}`);
385
this.reconnectAttempts = 0;
386
}
387
);
388
} catch (error) {
389
console.error(`Failed to connect to ${this.channel}:`, error);
390
this.scheduleReconnect();
391
}
392
}
393
394
scheduleReconnect() {
395
if (this.reconnectAttempts < this.maxReconnectAttempts) {
396
this.reconnectAttempts++;
397
const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
398
399
setTimeout(() => {
400
console.log(`Reconnecting to ${this.channel} (attempt ${this.reconnectAttempts})`);
401
this.connect();
402
}, delay);
403
} else {
404
console.error(`Max reconnection attempts reached for ${this.channel}`);
405
}
406
}
407
408
async stop() {
409
if (this.listener) {
410
await this.listener.unlisten();
411
}
412
}
413
}
414
415
// Usage
416
const robustListener = new RobustListener('critical_alerts', (payload) => {
417
handleCriticalAlert(JSON.parse(payload));
418
});
419
```
420
421
## Performance Considerations
422
423
### Connection Management
424
425
Optimize notification performance by managing connections efficiently.
426
427
```javascript
428
// Use dedicated connection for notifications
429
const notificationSql = postgres(connectionConfig, {
430
max: 1, // Single connection for notifications
431
idle_timeout: 0, // Keep connection alive
432
});
433
434
// Use separate connection pool for queries
435
const querySql = postgres(connectionConfig, {
436
max: 10, // Connection pool for regular queries
437
});
438
439
// Setup notifications on dedicated connection
440
notificationSql.listen('events', handleEvent);
441
442
// Use query connection for database operations
443
const users = await querySql`SELECT * FROM users`;
444
```
445
446
### Payload Optimization
447
448
Optimize notification payloads for better performance.
449
450
```javascript
451
// Efficient payload design
452
await sql.notify('user_update', JSON.stringify({
453
id: user.id, // Minimal identifier
454
type: 'profile', // Change type
455
timestamp: Date.now() // When it happened
456
}));
457
458
// Avoid large payloads - use identifiers instead
459
await sql.notify('large_data_update', JSON.stringify({
460
table: 'documents',
461
id: document.id,
462
// Don't include full document content
463
}));
464
465
// Handler fetches details as needed
466
sql.listen('large_data_update', async (payload) => {
467
const { table, id } = JSON.parse(payload);
468
const fullData = await sql`SELECT * FROM ${sql(table)} WHERE id = ${id}`;
469
processUpdate(fullData[0]);
470
});
471
```