Fastest full featured PostgreSQL client for Node.js and Deno with tagged template literals, transactions, streaming, and logical replication support
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Logical replication functionality for real-time data streaming and change data capture from PostgreSQL databases.
Subscribe to logical replication events to receive real-time data changes from PostgreSQL.
/**
* Subscribe to logical replication events
* @param event - Event pattern to subscribe to (table.operation or wildcard)
* @param callback - Function called for each replication event
* @param onsubscribe - Optional callback when subscription is established
* @param onerror - Optional error handler
* @returns Promise resolving to subscription handle
*/
subscribe(
event: string,
callback: (row: Row | null, info: ReplicationEvent) => void,
onsubscribe?: () => void,
onerror?: (error: Error) => void
): Promise<SubscriptionHandle>;

Usage Examples:
// Subscribe to all changes on a specific table
const subscription = await sql.subscribe(
'users',
(row, info) => {
console.log(`User ${info.command}:`, row);
handleUserChange(row, info);
},
() => console.log('Subscribed to user changes'),
(error) => console.error('Subscription error:', error)
);
// Subscribe to specific operations
await sql.subscribe('products.insert', (row, info) => {
console.log('New product added:', row);
invalidateProductCache();
});
await sql.subscribe('orders.update', (row, info) => {
console.log('Order updated:', row);
updateOrderDisplay(row);
});
// Subscribe to all events (wildcard)
await sql.subscribe('*', (row, info) => {
console.log(`${info.relation.schema}.${info.relation.table} ${info.command}`);
logDatabaseChange(info);
});

Subscribe to changes on specific tables with optional operation filtering.
// Pattern formats:
// 'table' - All operations on table
// 'table.insert' - Only INSERT operations
// 'table.update' - Only UPDATE operations
// 'table.delete' - Only DELETE operations
// 'schema.table' - Fully qualified table name
// '*' - All tables and operations

Usage Examples:
// Monitor user registration
await sql.subscribe('users.insert', (user, info) => {
sendWelcomeEmail(user.email);
updateUserCount();
});
// Track order status changes
await sql.subscribe('orders.update', (order, info) => {
if (info.old && info.old.status !== order.status) {
notifyStatusChange(order.id, order.status);
}
});
// Monitor product deletions
await sql.subscribe('products.delete', (product, info) => {
removeFromSearchIndex(product.id);
clearProductCache(product.id);
});
// Cross-schema monitoring
await sql.subscribe('inventory.stock_levels', (stock, info) => {
if (stock.quantity < stock.reorder_point) {
createReorderAlert(stock.product_id);
}
});

Use sophisticated patterns to filter replication events.
// Subscribe with key-based filtering
await sql.subscribe('users.update=id', (user, info) => {
// Only updates where primary key changed (rare but possible)
handleUserIdChange(user, info.old);
});
// Schema-specific subscriptions
await sql.subscribe('public.*', (row, info) => {
// All changes in public schema
auditPublicSchemaChange(row, info);
});
await sql.subscribe('audit.*', (row, info) => {
// All changes in audit schema
forwardToComplianceSystem(row, info);
});

Comprehensive information about each replication event.
/**
 * Metadata describing a single replication event. An object of this shape is
 * passed as the second argument (`info`) to the callback given to `subscribe`.
 */
interface ReplicationEvent {
  /** Type of operation: 'insert', 'update', or 'delete' */
  command: 'insert' | 'update' | 'delete';
  /** Information about the table */
  relation: RelationInfo;
  /** Whether this change involves the primary key */
  key?: boolean;
  /**
   * Previous row data for UPDATE and DELETE operations.
   * Optional and nullable, so read it defensively (e.g. `info.old?.id`).
   */
  old?: Row | null;
}
/**
 * Identifies the table a replication event refers to. Exposed on each event
 * as `ReplicationEvent.relation`.
 */
interface RelationInfo {
  /** Table OID */
  oid: number;
  /** Schema name */
  schema: string;
  /** Table name */
  table: string;
  /** Column information */
  columns: ColumnInfo[];
}
interface ColumnInfo {
/** Column name */
name: string;
/** PostgreSQL type OID */
type: number;
/** Type modifier */
modifier: number;
/** Whether column is part of replica identity */
key: boolean;
}

Usage Examples:
// Detailed event processing
await sql.subscribe('*', (row, info) => {
const { command, relation, key, old } = info;
console.log(`Operation: ${command}`);
console.log(`Table: ${relation.schema}.${relation.table}`);
console.log(`Key change: ${key || false}`);
switch (command) {
case 'insert':
console.log('New row:', row);
break;
case 'update':
console.log('New row:', row);
console.log('Old row:', old);
// Compare specific fields
const nameChanged = old?.name !== row?.name;
if (nameChanged) {
handleNameChange(row, old);
}
break;
case 'delete':
console.log('Deleted row:', old);
break;
}
});

Manage active replication subscriptions.
interface SubscriptionHandle {
/** Stop the subscription */
unsubscribe(): Promise<void>;
/** Check if subscription is active */
active: boolean;
}

Usage Examples:
// Store subscription references for cleanup
const subscriptions = [];
subscriptions.push(
await sql.subscribe('users', handleUserChange)
);
subscriptions.push(
await sql.subscribe('orders', handleOrderChange)
);
// Clean up all subscriptions
/**
 * Stop every tracked subscription and empty the `subscriptions` registry.
 *
 * Each active handle is unsubscribed independently, so one failing
 * `unsubscribe()` neither prevents the remaining handles from being stopped
 * nor leaves stale entries in the registry. Failures are logged rather than
 * thrown because this function is registered as a process signal handler,
 * where a rejected promise would surface as an unhandled rejection.
 *
 * @returns Promise that resolves once every unsubscribe attempt has settled
 */
async function cleanup() {
  // Detach the handles up front so a repeated signal cannot process them twice.
  const pending = subscriptions.splice(0, subscriptions.length);

  const results = await Promise.allSettled(
    pending
      .filter((subscription) => subscription.active)
      // The async wrapper turns a synchronous throw into a rejection as well.
      .map(async (subscription) => subscription.unsubscribe())
  );

  for (const result of results) {
    if (result.status === 'rejected') {
      console.error('Failed to unsubscribe:', result.reason);
    }
  }
}
// Handle process shutdown
process.on('SIGTERM', cleanup);
process.on('SIGINT', cleanup);

Automatically invalidate caches when data changes.
/**
 * In-memory cache whose entries are invalidated by replication events.
 *
 * Keys have the form `user:<id>`, `product:<id>` and `category:<id>`.
 */
class SmartCache {
  /**
   * Create an empty cache and start the replication subscriptions that keep
   * it fresh.
   */
  constructor() {
    this.cache = new Map();
    // A constructor cannot await, so attach a handler: a failed subscription
    // is reported instead of surfacing as an unhandled rejection.
    this.setupReplication().catch((error) => {
      console.error('Cache replication setup failed:', error);
    });
  }

  /**
   * Subscribe to `users` and `products` changes and drop the matching cache
   * entries whenever a row changes.
   *
   * @returns Promise that resolves once both subscriptions are established
   */
  async setupReplication() {
    // Invalidate user cache on changes
    await sql.subscribe('users', (user, info) => {
      // `??` rather than `||` so a legitimate falsy id such as 0 is kept.
      const userId = user?.id ?? info.old?.id;
      this.cache.delete(`user:${userId}`);
      console.log(`Invalidated cache for user ${userId}`);
    });

    // Invalidate product cache
    await sql.subscribe('products', (product, info) => {
      const productId = product?.id ?? info.old?.id;
      this.cache.delete(`product:${productId}`);

      // A product moved between categories makes BOTH the previous and the
      // current category stale, so invalidate each distinct id.
      const categoryIds = new Set([product?.category_id, info.old?.category_id]);
      for (const categoryId of categoryIds) {
        if (categoryId != null) {
          this.cache.delete(`category:${categoryId}`);
        }
      }
    });
  }

  /**
   * Return the user with the given id, querying the database only on a
   * cache miss.
   *
   * @param id - Value matched against `users.id`
   * @returns The first matching row, or `undefined` when the query is empty
   */
  async getUser(id) {
    const key = `user:${id}`;
    if (this.cache.has(key)) {
      return this.cache.get(key);
    }
    const [user] = await sql`SELECT * FROM users WHERE id = ${id}`;
    this.cache.set(key, user);
    return user;
  }
}
const smartCache = new SmartCache();

Implement event sourcing patterns using replication events.
/**
 * Records every replication event as an event-sourcing style entry and fans
 * it out to projections and notifications.
 *
 * NOTE(review): `events` grows without bound for the life of the process;
 * persist or trim it before using this pattern on a busy database.
 */
class EventStore {
  /** Create an empty store and start capturing replication events. */
  constructor() {
    this.events = [];
    // A constructor cannot await, so report a failed subscription here
    // instead of letting it become an unhandled rejection.
    this.setupEventCapture().catch((error) => {
      console.error('Event capture setup failed:', error);
    });
  }

  /**
   * Subscribe to all replication events, append each one to `events`, then
   * process it.
   *
   * @returns Promise that resolves once the subscription is established
   */
  async setupEventCapture() {
    await sql.subscribe('*', (row, info) => {
      const event = {
        id: generateEventId(),
        timestamp: new Date(),
        aggregate: `${info.relation.schema}.${info.relation.table}`,
        aggregateId: this.extractId(row, info),
        eventType: info.command,
        data: row,
        previousData: info.old,
        metadata: {
          relation: info.relation,
          keyChange: info.key
        }
      };
      this.events.push(event);
      // The event is already recorded; a failing projection or notification
      // must not throw out of the replication callback.
      try {
        this.processEvent(event);
      } catch (error) {
        console.error(
          `Failed to process ${event.eventType} event on ${event.aggregate}:`,
          error
        );
      }
    });
  }

  /**
   * Extract the aggregate id from the current row, falling back to the old
   * row (e.g. when `row` is null).
   *
   * @param row - Current row, possibly null
   * @param info - Replication event metadata
   * @returns The id, or `undefined` when neither row carries one
   */
  extractId(row, info) {
    // `??` rather than `||` so a legitimate falsy id such as 0 is kept.
    return row?.id ?? info.old?.id;
  }

  /**
   * Process a captured event: log it, update read models, then notify.
   *
   * @param event - Entry built by `setupEventCapture`
   */
  processEvent(event) {
    console.log(`Event: ${event.eventType} on ${event.aggregate}`);
    // Update read models
    this.updateProjections(event);
    // Send notifications
    this.notifySubscribers(event);
  }

  /**
   * Route the event to the projection responsible for its table.
   * Events for other aggregates are ignored.
   *
   * @param event - Entry built by `setupEventCapture`
   */
  updateProjections(event) {
    switch (event.aggregate) {
      case 'public.orders':
        this.updateOrderSummary(event);
        break;
      case 'public.users':
        this.updateUserStats(event);
        break;
    }
  }

  /**
   * Extension point: notify downstream consumers about an event.
   * No-op by default so the store works before a notifier is wired in.
   */
  notifySubscribers(event) {}

  /**
   * Extension point: refresh order read models.
   * No-op by default; override with the real projection.
   */
  updateOrderSummary(event) {}

  /**
   * Extension point: refresh user read models.
   * No-op by default; override with the real projection.
   */
  updateUserStats(event) {}
}
const eventStore = new EventStore();

Keep multiple services synchronized with database changes.
/**
 * Mirrors `users` and `products` changes into the search, cache and
 * analytics services.
 */
class ServiceSynchronizer {
  /** Instantiate the downstream services and start the subscriptions. */
  constructor() {
    this.services = {
      search: new SearchService(),
      cache: new CacheService(),
      analytics: new AnalyticsService()
    };
    // A constructor cannot await, so report a failed subscription here
    // instead of letting it become an unhandled rejection.
    this.setupSynchronization().catch((error) => {
      console.error('Service synchronization setup failed:', error);
    });
  }

  /**
   * Subscribe to `users` and `products` changes. Both handlers catch and log
   * service failures so an error never escapes the replication callback.
   *
   * @returns Promise that resolves once both subscriptions are established
   */
  async setupSynchronization() {
    // Synchronize user data across services
    await sql.subscribe('users', async (user, info) => {
      const { command } = info;
      // `??` rather than `||` so a legitimate falsy id such as 0 is kept.
      const userId = user?.id ?? info.old?.id;
      try {
        switch (command) {
          case 'insert':
            await Promise.all([
              this.services.search.indexUser(user),
              this.services.cache.cacheUser(user),
              this.services.analytics.trackUserCreation(user)
            ]);
            break;
          case 'update':
            await Promise.all([
              this.services.search.updateUser(user),
              this.services.cache.updateUser(user),
              this.services.analytics.trackUserUpdate(user, info.old)
            ]);
            break;
          case 'delete':
            await Promise.all([
              this.services.search.removeUser(userId),
              this.services.cache.removeUser(userId),
              this.services.analytics.trackUserDeletion(info.old)
            ]);
            break;
        }
      } catch (error) {
        console.error(`Service sync error for user ${userId}:`, error);
        // Implement retry logic or dead letter queue
      }
    });

    // Product synchronization
    await sql.subscribe('products', async (product, info) => {
      // Same policy as the user handler: log, never reject from the callback.
      try {
        await this.syncProduct(product, info);
      } catch (error) {
        const productId = product?.id ?? info.old?.id;
        console.error(`Service sync error for product ${productId}:`, error);
      }
    });
  }

  /**
   * Propagate one product change to search, recommendations and caches, in
   * that order.
   *
   * @param product - Current row, or null when it is unavailable
   * @param info - Replication event metadata
   * @returns Promise that rejects if any downstream service call fails
   */
  async syncProduct(product, info) {
    const productId = product?.id ?? info.old?.id;
    // Update search index
    if (info.command === 'delete') {
      await this.services.search.removeProduct(productId);
    } else {
      await this.services.search.indexProduct(product);
    }
    // Update recommendations
    await this.services.analytics.updateRecommendations(product, info);
    // Clear related caches
    await this.services.cache.clearProductCaches(productId);
  }
}
const synchronizer = new ServiceSynchronizer();

Set up PostgreSQL for logical replication.
-- Enable logical replication in postgresql.conf
-- wal_level = logical
-- max_replication_slots = 4
-- max_wal_senders = 4
-- Create publication for tables you want to replicate
CREATE PUBLICATION app_changes FOR ALL TABLES;
-- Or create publication for specific tables
CREATE PUBLICATION user_changes FOR TABLE users, profiles;
-- Create replication slot
SELECT pg_create_logical_replication_slot('app_slot', 'pgoutput');
-- Grant replication permissions
ALTER USER myuser WITH REPLICATION;

Configure postgres.js for replication access.
// Replication requires specific connection configuration
const sql = postgres(connectionConfig, {
// Connection must have replication privileges
replication: 'database',
// Publication name (created above)
publication: 'app_changes',
// Replication slot name (created above)
slot: 'app_slot',
// Additional options
max: 1, // Replication uses dedicated connection
idle_timeout: 0, // Keep connection alive
});

Usage Examples:
// Production replication setup
const replicationSql = postgres({
host: 'localhost',
database: 'myapp',
username: 'replication_user',
password: process.env.REPLICATION_PASSWORD,
replication: 'database',
publication: 'app_changes',
slot: 'myapp_slot',
max: 1,
idle_timeout: 0
});
// Start monitoring all changes
await replicationSql.subscribe('*', (row, info) => {
console.log(`Change detected: ${info.relation.table} ${info.command}`);
processChange(row, info);
});

Handle replication errors and connection issues.
/**
 * Wrapper around a postgres.js client that remembers its subscriptions and
 * re-establishes them, with exponential backoff, after a failure.
 */
class ResilientReplication {
  /**
   * @param config - Options passed unchanged to `postgres()` on every connect
   */
  constructor(config) {
    this.config = config;
    // pattern -> { handler, subscription }; entries survive failures so they
    // can be restored on the next successful connect.
    this.subscriptions = new Map();
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 10;
    // Pending reconnect timer, if any; prevents stacking several timers.
    this.reconnectTimer = null;
  }

  /**
   * Create a fresh client and restore every remembered subscription.
   * Never rejects: a failure is logged and a reconnect is scheduled.
   */
  async connect() {
    try {
      // Release the previous client first so reconnects do not leak connections.
      await this.closeCurrentClient();
      this.sql = postgres(this.config);
      await this.restoreSubscriptions();
      this.reconnectAttempts = 0;
    } catch (error) {
      console.error('Replication connection failed:', error);
      await this.scheduleReconnect();
    }
  }

  /**
   * Close the current client, if there is one. Close failures are logged
   * and otherwise ignored because the client is being discarded anyway.
   */
  async closeCurrentClient() {
    const previous = this.sql;
    if (!previous) {
      return;
    }
    // Clear the reference before closing so late errors emitted by the old
    // client are recognised as stale and ignored.
    this.sql = undefined;
    try {
      await previous.end({ timeout: 5 });
    } catch (error) {
      console.error('Failed to close previous replication client:', error);
    }
  }

  /**
   * Subscribe to a pattern and remember it for restoration after reconnects.
   *
   * @param pattern - Event pattern passed to `sql.subscribe`
   * @param handler - Callback invoked for each replication event
   * @returns The subscription handle returned by the client
   * @throws Rethrows whatever the underlying subscribe call throws
   */
  async subscribe(pattern, handler) {
    const client = this.sql;
    try {
      const subscription = await client.subscribe(
        pattern,
        handler,
        () => console.log(`Subscribed to ${pattern}`),
        (error) =>
          // Ignore errors from a client that has since been replaced.
          client === this.sql
            ? this.handleSubscriptionError(pattern, error)
            : undefined
      );
      this.subscriptions.set(pattern, { handler, subscription });
      return subscription;
    } catch (error) {
      console.error(`Subscription failed for ${pattern}:`, error);
      throw error;
    }
  }

  /**
   * React to a runtime subscription error: discard the dead handle but keep
   * the handler registered, then schedule a reconnect that restores it.
   *
   * @param pattern - Pattern whose subscription failed
   * @param error - Error reported by the client
   */
  async handleSubscriptionError(pattern, error) {
    console.error(`Subscription error for ${pattern}:`, error);
    // Deleting the entry here would mean the reconnect has nothing to
    // restore, silently dropping this subscription for good.
    const entry = this.subscriptions.get(pattern);
    if (entry) {
      entry.subscription = null;
    }
    // Trigger reconnection
    await this.scheduleReconnect();
  }

  /** Re-subscribe every remembered pattern on the current client. */
  async restoreSubscriptions() {
    // Iterate over a snapshot: subscribe() writes back into the map.
    for (const [pattern, { handler }] of [...this.subscriptions]) {
      await this.subscribe(pattern, handler);
    }
  }

  /**
   * Schedule one reconnect attempt using exponential backoff capped at 30
   * seconds. Does nothing if an attempt is already pending; logs and gives
   * up once `maxReconnectAttempts` has been reached.
   */
  async scheduleReconnect() {
    if (this.reconnectTimer) {
      return;
    }
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error('Max reconnection attempts reached');
      return;
    }
    this.reconnectAttempts++;
    const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      console.log(`Reconnecting... (attempt ${this.reconnectAttempts})`);
      // connect() handles its own failures, so the promise can be dropped.
      void this.connect();
    }, delay);
  }
}
// Usage
const replication = new ResilientReplication({
host: 'localhost',
database: 'myapp',
username: 'replication_user',
replication: 'database',
publication: 'app_changes',
slot: 'myapp_slot'
});
await replication.connect();
await replication.subscribe('users', handleUserChanges);

Install with Tessl CLI
npx tessl i tessl/npm-postgres