0
# Replication
1
2
Logical replication functionality for real-time data streaming and change data capture from PostgreSQL databases.
3
4
## Capabilities
5
6
### Subscription Management
7
8
Subscribe to logical replication events to receive real-time data changes from PostgreSQL.
9
10
```javascript { .api }
11
/**
12
* Subscribe to logical replication events
13
* @param event - Event pattern to subscribe to (table.operation or wildcard)
14
* @param callback - Function called for each replication event
15
* @param onsubscribe - Optional callback when subscription is established
16
* @param onerror - Optional error handler
17
* @returns Promise resolving to subscription handle
18
*/
19
subscribe(
20
event: string,
21
callback: (row: Row | null, info: ReplicationEvent) => void,
22
onsubscribe?: () => void,
23
onerror?: (error: Error) => void
24
): Promise<SubscriptionHandle>;
25
```
26
27
**Usage Examples:**
28
29
```javascript
30
// Subscribe to all changes on a specific table
31
const subscription = await sql.subscribe(
32
'users',
33
(row, info) => {
34
console.log(`User ${info.command}:`, row);
35
handleUserChange(row, info);
36
},
37
() => console.log('Subscribed to user changes'),
38
(error) => console.error('Subscription error:', error)
39
);
40
41
// Subscribe to specific operations
42
await sql.subscribe('products.insert', (row, info) => {
43
console.log('New product added:', row);
44
invalidateProductCache();
45
});
46
47
await sql.subscribe('orders.update', (row, info) => {
48
console.log('Order updated:', row);
49
updateOrderDisplay(row);
50
});
51
52
// Subscribe to all events (wildcard)
53
await sql.subscribe('*', (row, info) => {
54
console.log(`${info.relation.schema}.${info.relation.table} ${info.command}`);
55
logDatabaseChange(info);
56
});
57
```
58
59
## Event Patterns
60
61
### Table-Specific Subscriptions
62
63
Subscribe to changes on specific tables with optional operation filtering.
64
65
```javascript { .api }
66
// Pattern formats:
67
// 'table' - All operations on table
68
// 'table.insert' - Only INSERT operations
69
// 'table.update' - Only UPDATE operations
70
// 'table.delete' - Only DELETE operations
71
// 'schema.table' - Fully qualified table name
72
// '*' - All tables and operations
73
```
74
75
**Usage Examples:**
76
77
```javascript
78
// Monitor user registration
79
await sql.subscribe('users.insert', (user, info) => {
80
sendWelcomeEmail(user.email);
81
updateUserCount();
82
});
83
84
// Track order status changes
85
await sql.subscribe('orders.update', (order, info) => {
86
if (info.old && info.old.status !== order.status) {
87
notifyStatusChange(order.id, order.status);
88
}
89
});
90
91
// Monitor product deletions
92
await sql.subscribe('products.delete', (product, info) => {
93
removeFromSearchIndex(product.id);
94
clearProductCache(product.id);
95
});
96
97
// Cross-schema monitoring
98
await sql.subscribe('inventory.stock_levels', (stock, info) => {
99
if (stock.quantity < stock.reorder_point) {
100
createReorderAlert(stock.product_id);
101
}
102
});
103
```
104
105
### Advanced Pattern Matching
106
107
Use sophisticated patterns to filter replication events.
108
109
```javascript
110
// Subscribe with key-based filtering
111
await sql.subscribe('users.update=id', (user, info) => {
112
// Only updates where primary key changed (rare but possible)
113
handleUserIdChange(user, info.old);
114
});
115
116
// Schema-specific subscriptions
117
await sql.subscribe('public.*', (row, info) => {
118
// All changes in public schema
119
auditPublicSchemaChange(row, info);
120
});
121
122
await sql.subscribe('audit.*', (row, info) => {
123
// All changes in audit schema
124
forwardToComplianceSystem(row, info);
125
});
126
```
127
128
## ReplicationEvent Interface
129
130
Comprehensive information about each replication event.
131
132
```javascript { .api }
133
interface ReplicationEvent {
134
/** Type of operation: 'insert', 'update', or 'delete' */
135
command: 'insert' | 'update' | 'delete';
136
137
/** Information about the table */
138
relation: RelationInfo;
139
140
/** Whether this change involves the primary key */
141
key?: boolean;
142
143
/** Previous row data for UPDATE and DELETE operations */
144
old?: Row | null;
145
}
146
147
interface RelationInfo {
148
/** Table OID */
149
oid: number;
150
151
/** Schema name */
152
schema: string;
153
154
/** Table name */
155
table: string;
156
157
/** Column information */
158
columns: ColumnInfo[];
159
}
160
161
interface ColumnInfo {
162
/** Column name */
163
name: string;
164
165
/** PostgreSQL type OID */
166
type: number;
167
168
/** Type modifier */
169
modifier: number;
170
171
/** Whether column is part of replica identity */
172
key: boolean;
173
}
174
```
175
176
**Usage Examples:**
177
178
```javascript
179
// Detailed event processing
180
await sql.subscribe('*', (row, info) => {
181
const { command, relation, key, old } = info;
182
183
console.log(`Operation: ${command}`);
184
console.log(`Table: ${relation.schema}.${relation.table}`);
185
console.log(`Key change: ${key || false}`);
186
187
switch (command) {
188
case 'insert':
189
console.log('New row:', row);
190
break;
191
192
case 'update':
193
console.log('New row:', row);
194
console.log('Old row:', old);
195
196
// Compare specific fields
197
const nameChanged = old?.name !== row?.name;
198
if (nameChanged) {
199
handleNameChange(row, old);
200
}
201
break;
202
203
case 'delete':
204
console.log('Deleted row:', old);
205
break;
206
}
207
});
208
```
209
210
## SubscriptionHandle Interface
211
212
Manage active replication subscriptions.
213
214
```javascript { .api }
215
interface SubscriptionHandle {
216
/** Stop the subscription */
217
unsubscribe(): Promise<void>;
218
219
/** Check if subscription is active */
220
active: boolean;
221
}
222
```
223
224
**Usage Examples:**
225
226
```javascript
227
// Store subscription references for cleanup
228
const subscriptions = [];
229
230
subscriptions.push(
231
await sql.subscribe('users', handleUserChange)
232
);
233
234
subscriptions.push(
235
await sql.subscribe('orders', handleOrderChange)
236
);
237
238
// Clean up all subscriptions
239
async function cleanup() {
240
for (const subscription of subscriptions) {
241
if (subscription.active) {
242
await subscription.unsubscribe();
243
}
244
}
245
subscriptions.length = 0;
246
}
247
248
// Handle process shutdown
249
process.on('SIGTERM', cleanup);
250
process.on('SIGINT', cleanup);
251
```
252
253
## Real-time Data Synchronization
254
255
### Cache Invalidation
256
257
Automatically invalidate caches when data changes.
258
259
```javascript
260
class SmartCache {
261
constructor() {
262
this.cache = new Map();
263
this.setupReplication();
264
}
265
266
async setupReplication() {
267
// Invalidate user cache on changes
268
await sql.subscribe('users', (user, info) => {
269
const userId = user?.id || info.old?.id;
270
this.cache.delete(`user:${userId}`);
271
console.log(`Invalidated cache for user ${userId}`);
272
});
273
274
// Invalidate product cache
275
await sql.subscribe('products', (product, info) => {
276
const productId = product?.id || info.old?.id;
277
this.cache.delete(`product:${productId}`);
278
279
// Also invalidate category cache
280
const categoryId = product?.category_id || info.old?.category_id;
281
this.cache.delete(`category:${categoryId}`);
282
});
283
}
284
285
async getUser(id) {
286
const key = `user:${id}`;
287
if (this.cache.has(key)) {
288
return this.cache.get(key);
289
}
290
291
const user = await sql`SELECT * FROM users WHERE id = ${id}`;
292
this.cache.set(key, user[0]);
293
return user[0];
294
}
295
}
296
297
const smartCache = new SmartCache();
298
```
299
300
### Event Sourcing
301
302
Implement event sourcing patterns using replication events.
303
304
```javascript
305
class EventStore {
306
constructor() {
307
this.events = [];
308
this.setupEventCapture();
309
}
310
311
async setupEventCapture() {
312
await sql.subscribe('*', (row, info) => {
313
const event = {
314
id: generateEventId(),
315
timestamp: new Date(),
316
aggregate: `${info.relation.schema}.${info.relation.table}`,
317
aggregateId: this.extractId(row, info),
318
eventType: info.command,
319
data: row,
320
previousData: info.old,
321
metadata: {
322
relation: info.relation,
323
keyChange: info.key
324
}
325
};
326
327
this.events.push(event);
328
this.processEvent(event);
329
});
330
}
331
332
extractId(row, info) {
333
// Extract ID from current or old row
334
return row?.id || info.old?.id;
335
}
336
337
processEvent(event) {
338
// Process event for projections, notifications, etc.
339
console.log(`Event: ${event.eventType} on ${event.aggregate}`);
340
341
// Update read models
342
this.updateProjections(event);
343
344
// Send notifications
345
this.notifySubscribers(event);
346
}
347
348
updateProjections(event) {
349
// Update materialized views or denormalized data
350
switch (event.aggregate) {
351
case 'public.orders':
352
this.updateOrderSummary(event);
353
break;
354
case 'public.users':
355
this.updateUserStats(event);
356
break;
357
}
358
}
359
}
360
361
const eventStore = new EventStore();
362
```
363
364
### Multi-Service Synchronization
365
366
Keep multiple services synchronized with database changes.
367
368
```javascript
369
class ServiceSynchronizer {
370
constructor() {
371
this.services = {
372
search: new SearchService(),
373
cache: new CacheService(),
374
analytics: new AnalyticsService()
375
};
376
377
this.setupSynchronization();
378
}
379
380
async setupSynchronization() {
381
// Synchronize user data across services
382
await sql.subscribe('users', async (user, info) => {
383
const { command } = info;
384
const userId = user?.id || info.old?.id;
385
386
try {
387
switch (command) {
388
case 'insert':
389
await Promise.all([
390
this.services.search.indexUser(user),
391
this.services.cache.cacheUser(user),
392
this.services.analytics.trackUserCreation(user)
393
]);
394
break;
395
396
case 'update':
397
await Promise.all([
398
this.services.search.updateUser(user),
399
this.services.cache.updateUser(user),
400
this.services.analytics.trackUserUpdate(user, info.old)
401
]);
402
break;
403
404
case 'delete':
405
await Promise.all([
406
this.services.search.removeUser(userId),
407
this.services.cache.removeUser(userId),
408
this.services.analytics.trackUserDeletion(info.old)
409
]);
410
break;
411
}
412
} catch (error) {
413
console.error(`Service sync error for user ${userId}:`, error);
414
// Implement retry logic or dead letter queue
415
}
416
});
417
418
// Product synchronization
419
await sql.subscribe('products', async (product, info) => {
420
await this.syncProduct(product, info);
421
});
422
}
423
424
async syncProduct(product, info) {
425
const productId = product?.id || info.old?.id;
426
427
// Update search index
428
if (info.command === 'delete') {
429
await this.services.search.removeProduct(productId);
430
} else {
431
await this.services.search.indexProduct(product);
432
}
433
434
// Update recommendations
435
await this.services.analytics.updateRecommendations(product, info);
436
437
// Clear related caches
438
await this.services.cache.clearProductCaches(productId);
439
}
440
}
441
442
const synchronizer = new ServiceSynchronizer();
443
```
444
445
## Configuration and Setup
446
447
### Prerequisites
448
449
Set up PostgreSQL for logical replication.
450
451
```sql
452
-- Enable logical replication in postgresql.conf
453
-- wal_level = logical
454
-- max_replication_slots = 4
455
-- max_wal_senders = 4
456
457
-- Create publication for tables you want to replicate
458
CREATE PUBLICATION app_changes FOR ALL TABLES;
459
460
-- Or create publication for specific tables
461
CREATE PUBLICATION user_changes FOR TABLE users, profiles;
462
463
-- Create replication slot
464
SELECT pg_create_logical_replication_slot('app_slot', 'pgoutput');
465
466
-- Grant replication permissions
467
ALTER USER myuser WITH REPLICATION;
468
```
469
470
### Connection Configuration
471
472
Configure postgres.js for replication access.
473
474
```javascript { .api }
475
// Replication requires specific connection configuration
476
const sql = postgres(connectionConfig, {
477
// Connection must have replication privileges
478
replication: 'database',
479
480
// Publication name (created above)
481
publication: 'app_changes',
482
483
// Replication slot name (created above)
484
slot: 'app_slot',
485
486
// Additional options
487
max: 1, // Replication uses dedicated connection
488
idle_timeout: 0, // Keep connection alive
489
});
490
```
491
492
**Usage Examples:**
493
494
```javascript
495
// Production replication setup
496
const replicationSql = postgres({
497
host: 'localhost',
498
database: 'myapp',
499
username: 'replication_user',
500
password: process.env.REPLICATION_PASSWORD,
501
replication: 'database',
502
publication: 'app_changes',
503
slot: 'myapp_slot',
504
max: 1,
505
idle_timeout: 0
506
});
507
508
// Start monitoring all changes
509
await replicationSql.subscribe('*', (row, info) => {
510
console.log(`Change detected: ${info.relation.table} ${info.command}`);
511
processChange(row, info);
512
});
513
```
514
515
### Error Handling and Resilience
516
517
Handle replication errors and connection issues.
518
519
```javascript
520
class ResilientReplication {
521
constructor(config) {
522
this.config = config;
523
this.subscriptions = new Map();
524
this.reconnectAttempts = 0;
525
this.maxReconnectAttempts = 10;
526
}
527
528
async connect() {
529
try {
530
this.sql = postgres(this.config);
531
await this.restoreSubscriptions();
532
this.reconnectAttempts = 0;
533
} catch (error) {
534
console.error('Replication connection failed:', error);
535
await this.scheduleReconnect();
536
}
537
}
538
539
async subscribe(pattern, handler) {
540
try {
541
const subscription = await this.sql.subscribe(
542
pattern,
543
handler,
544
() => console.log(`Subscribed to ${pattern}`),
545
(error) => this.handleSubscriptionError(pattern, error)
546
);
547
548
this.subscriptions.set(pattern, { handler, subscription });
549
return subscription;
550
} catch (error) {
551
console.error(`Subscription failed for ${pattern}:`, error);
552
throw error;
553
}
554
}
555
556
async handleSubscriptionError(pattern, error) {
557
console.error(`Subscription error for ${pattern}:`, error);
558
559
// Remove failed subscription
560
this.subscriptions.delete(pattern);
561
562
// Trigger reconnection
563
await this.scheduleReconnect();
564
}
565
566
async restoreSubscriptions() {
567
for (const [pattern, { handler }] of this.subscriptions) {
568
await this.subscribe(pattern, handler);
569
}
570
}
571
572
async scheduleReconnect() {
573
if (this.reconnectAttempts < this.maxReconnectAttempts) {
574
this.reconnectAttempts++;
575
const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
576
577
setTimeout(() => {
578
console.log(`Reconnecting... (attempt ${this.reconnectAttempts})`);
579
this.connect();
580
}, delay);
581
} else {
582
console.error('Max reconnection attempts reached');
583
}
584
}
585
}
586
587
// Usage
588
const replication = new ResilientReplication({
589
host: 'localhost',
590
database: 'myapp',
591
username: 'replication_user',
592
replication: 'database',
593
publication: 'app_changes',
594
slot: 'myapp_slot'
595
});
596
597
await replication.connect();
598
await replication.subscribe('users', handleUserChanges);
599
```