CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/npm-postgres

Fastest full-featured PostgreSQL client for Node.js and Deno, with tagged template literals, transactions, streaming, and logical replication support

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/replication.md

Replication

Logical replication functionality for real-time data streaming and change data capture from PostgreSQL databases.

Capabilities

Subscription Management

Subscribe to logical replication events to receive real-time data changes from PostgreSQL.

/**
 * Subscribe to logical replication events.
 *
 * @param event - Event pattern selecting which changes are delivered
 *   (a table, a table plus operation, or the `*` wildcard).
 *   NOTE(review): upstream postgres.js documents the pattern grammar as
 *   `command:schema.table=primary_key` (e.g. `insert:users`) — confirm the
 *   exact syntax against the installed version.
 * @param callback - Called once per replication event. `row` is typed
 *   `Row | null`, so handlers must tolerate a missing row; `info` carries the
 *   command, the relation and, when available, the previous row.
 * @param onsubscribe - Optional callback invoked when the subscription is
 *   established.
 * @param onerror - Optional handler for errors raised by the subscription.
 * @returns Promise resolving to a handle used to stop the subscription.
 */
subscribe(
  event: string,
  callback: (row: Row | null, info: ReplicationEvent) => void,
  onsubscribe?: () => void,
  onerror?: (error: Error) => void
): Promise<SubscriptionHandle>;

Usage Examples:

// Subscribe to every change (insert, update, delete) on one table.
const subscription = await sql.subscribe(
  'users',
  (row, info) => {
    console.log(`User ${info.command}:`, row);
    handleUserChange(row, info);
  },
  () => console.log('Subscribed to user changes'),
  (error) => console.error('Subscription error:', error)
);

// Subscribe to a single operation. postgres.js patterns put the command
// first, separated by a colon: `command:schema.table=primary_key`.
// (A pattern such as 'products.insert' would be read as schema "products",
// table "insert" and never fire.)
await sql.subscribe('insert:products', (row, info) => {
  console.log('New product added:', row);
  invalidateProductCache();
});

await sql.subscribe('update:orders', (row, info) => {
  console.log('Order updated:', row);
  updateOrderDisplay(row);
});

// Subscribe to all events on all tables (wildcard).
await sql.subscribe('*', (row, info) => {
  console.log(`${info.relation.schema}.${info.relation.table} ${info.command}`);
  logDatabaseChange(info);
});

Event Patterns

Table-Specific Subscriptions

Subscribe to changes on specific tables with optional operation filtering.

// Pattern formats:
// NOTE(review): upstream postgres.js documents the grammar as
//   [command:][schema.]table[=primary_key]
// with the command FIRST and separated by ':' — confirm against the README
// of the installed version before relying on the 'table.operation' form.
// 'table'           - All operations on table
// 'insert:table'    - Only INSERT operations
// 'update:table'    - Only UPDATE operations
// 'delete:table'    - Only DELETE operations
// 'schema.table'    - Fully qualified table name
// 'table=1'         - Only the row whose primary key equals 1
// '*'               - All tables and operations

Usage Examples:

// NOTE: `row` is typed `Row | null` in the subscribe signature, so every
// handler below guards against a missing row before dereferencing it.
// Patterns use the postgres.js form `command:schema.table`.

// Monitor user registration
await sql.subscribe('insert:users', (user, info) => {
  if (!user) return;
  sendWelcomeEmail(user.email);
  updateUserCount();
});

// Track order status changes
await sql.subscribe('update:orders', (order, info) => {
  // `info.old` is optional, so only compare when the previous row is available.
  if (order && info.old && info.old.status !== order.status) {
    notifyStatusChange(order.id, order.status);
  }
});

// Monitor product deletions
await sql.subscribe('delete:products', (product, info) => {
  // Depending on how the deleted tuple is delivered it may arrive as `row`
  // or as `info.old`; accept either.
  const removed = product ?? info.old;
  if (!removed) return;
  removeFromSearchIndex(removed.id);
  clearProductCache(removed.id);
});

// Cross-schema monitoring (schema-qualified table name)
await sql.subscribe('inventory.stock_levels', (stock, info) => {
  if (stock && stock.quantity < stock.reorder_point) {
    createReorderAlert(stock.product_id);
  }
});

Advanced Pattern Matching

Narrow a subscription further, for example to updates that touch the primary key or to the tables of a single schema.

// React only to updates that touch the primary key (rare but possible).
// The `key` flag on the event reports whether the change involves the key.
// NOTE(review): in upstream postgres.js a `=value` suffix (e.g.
// 'update:users=1') selects the row with that primary-key VALUE; it is not a
// "key changed" filter — confirm against the README.
await sql.subscribe('update:users', (user, info) => {
  if (!info.key) return;
  handleUserIdChange(user, info.old);
});

// Schema-specific handling.
// NOTE(review): patterns appear to match exact `schema.table` names only, so
// a per-schema wildcard such as 'public.*' would never fire. Subscribe to
// everything and route on the schema reported with each event instead.
await sql.subscribe('*', (row, info) => {
  switch (info.relation.schema) {
    case 'public':
      // All changes in public schema
      auditPublicSchemaChange(row, info);
      break;

    case 'audit':
      // All changes in audit schema
      forwardToComplianceSystem(row, info);
      break;
  }
});

ReplicationEvent Interface

Metadata passed as the second argument of the subscription callback for each replication event.

/**
 * Metadata delivered as the second argument of a `subscribe` callback.
 */
interface ReplicationEvent {
  /** Operation that produced the event: 'insert', 'update', or 'delete'. */
  command: 'insert' | 'update' | 'delete';
  
  /** Table the change belongs to (see RelationInfo). */
  relation: RelationInfo;
  
  /**
   * Whether this change involves the primary key. Optional, so it may be
   * absent on some events.
   * NOTE(review): confirm exactly when the flag is set — presumably when the
   * old tuple carries only the replica-identity key columns.
   */
  key?: boolean;
  
  /**
   * Previous row data for UPDATE and DELETE operations. Optional and
   * nullable: handlers must cope with it being missing.
   * NOTE(review): upstream typings appear to attach `old` to update events
   * only — verify before relying on it for deletes.
   */
  old?: Row | null;
}

/**
 * Describes the table a replication event refers to.
 * NOTE(review): verify the member list against the typings shipped with the
 * installed postgres.js version.
 */
interface RelationInfo {
  /** Table OID */
  oid: number;
  
  /** Schema name, e.g. `public` */
  schema: string;
  
  /** Table name without the schema prefix */
  table: string;
  
  /** Column descriptors for the table (see ColumnInfo) */
  columns: ColumnInfo[];
}

/**
 * Describes a single column of a replicated table.
 * NOTE(review): verify the member names against the typings shipped with the
 * installed postgres.js version.
 */
interface ColumnInfo {
  /** Column name */
  name: string;
  
  /** PostgreSQL type OID of the column */
  type: number;
  
  /** Type modifier (presumably the column's `atttypmod` — TODO confirm) */
  modifier: number;
  
  /** Whether column is part of replica identity */
  key: boolean;
}

Usage Examples:

// Detailed event processing
await sql.subscribe('*', (row, info) => {
  const { command, relation, key, old } = info;

  console.log(`Operation: ${command}`);
  console.log(`Table: ${relation.schema}.${relation.table}`);
  console.log(`Key change: ${key ?? false}`);

  switch (command) {
    case 'insert':
      console.log('New row:', row);
      break;

    // Braces give the `const` below its own scope instead of leaking it
    // into the other case clauses.
    case 'update': {
      console.log('New row:', row);
      console.log('Old row:', old);

      // Compare specific fields. Both versions are required: `old` is
      // optional, and comparing against a missing row would report a
      // change that never happened.
      const nameChanged = Boolean(old && row) && old.name !== row.name;
      if (nameChanged) {
        handleNameChange(row, old);
      }
      break;
    }

    case 'delete':
      // The deleted tuple may be delivered as `row` or as `old`.
      console.log('Deleted row:', row ?? old);
      break;
  }
});

SubscriptionHandle Interface

Manage active replication subscriptions.

/**
 * Handle returned by `subscribe`, used to stop a subscription.
 * NOTE(review): verify both members against the typings shipped with the
 * installed postgres.js version (in particular whether `active` exists and
 * whether `unsubscribe` returns a promise).
 */
interface SubscriptionHandle {
  /** Stop the subscription; the returned promise settles once it has stopped */
  unsubscribe(): Promise<void>;
  
  /** Check if subscription is active */
  active: boolean;
}

Usage Examples:

// Keep every subscription handle so it can be stopped later.
const subscriptions = [];

// Subscribe one table at a time, in order, recording each handle as soon as
// its subscription has been established.
for (const [table, onChange] of [
  ['users', handleUserChange],
  ['orders', handleOrderChange],
]) {
  const handle = await sql.subscribe(table, onChange);
  subscriptions.push(handle);
}

// Clean up all subscriptions
async function cleanup() {
  for (const subscription of subscriptions) {
    if (subscription.active) {
      await subscription.unsubscribe();
    }
  }
  subscriptions.length = 0;
}

// Handle process shutdown.
// Registering a SIGTERM/SIGINT listener replaces Node's default "terminate"
// behavior, so the process must exit explicitly once cleanup has settled.
// `once` means a second signal falls back to the default and forces exit.
const shutdown = () => {
  Promise.resolve()
    .then(() => cleanup())
    .catch((error) => {
      console.error('Cleanup failed during shutdown:', error);
      process.exitCode = 1;
    })
    .finally(() => process.exit());
};

process.once('SIGTERM', shutdown);
process.once('SIGINT', shutdown);

Real-time Data Synchronization

Cache Invalidation

Automatically invalidate caches when data changes.

class SmartCache {
  constructor() {
    this.cache = new Map();
    this.setupReplication();
  }
  
  async setupReplication() {
    // Invalidate user cache on changes
    await sql.subscribe('users', (user, info) => {
      const userId = user?.id || info.old?.id;
      this.cache.delete(`user:${userId}`);
      console.log(`Invalidated cache for user ${userId}`);
    });
    
    // Invalidate product cache
    await sql.subscribe('products', (product, info) => {
      const productId = product?.id || info.old?.id;
      this.cache.delete(`product:${productId}`);
      
      // Also invalidate category cache
      const categoryId = product?.category_id || info.old?.category_id;
      this.cache.delete(`category:${categoryId}`);
    });
  }
  
  async getUser(id) {
    const key = `user:${id}`;
    if (this.cache.has(key)) {
      return this.cache.get(key);
    }
    
    const user = await sql`SELECT * FROM users WHERE id = ${id}`;
    this.cache.set(key, user[0]);
    return user[0];
  }
}

const smartCache = new SmartCache();

Event Sourcing

Implement event sourcing patterns using replication events.

class EventStore {
  constructor() {
    this.events = [];
    this.setupEventCapture();
  }
  
  async setupEventCapture() {
    await sql.subscribe('*', (row, info) => {
      const event = {
        id: generateEventId(),
        timestamp: new Date(),
        aggregate: `${info.relation.schema}.${info.relation.table}`,
        aggregateId: this.extractId(row, info),
        eventType: info.command,
        data: row,
        previousData: info.old,
        metadata: {
          relation: info.relation,
          keyChange: info.key
        }
      };
      
      this.events.push(event);
      this.processEvent(event);
    });
  }
  
  extractId(row, info) {
    // Extract ID from current or old row
    return row?.id || info.old?.id;
  }
  
  processEvent(event) {
    // Process event for projections, notifications, etc.
    console.log(`Event: ${event.eventType} on ${event.aggregate}`);
    
    // Update read models
    this.updateProjections(event);
    
    // Send notifications
    this.notifySubscribers(event);
  }
  
  updateProjections(event) {
    // Update materialized views or denormalized data
    switch (event.aggregate) {
      case 'public.orders':
        this.updateOrderSummary(event);
        break;
      case 'public.users':
        this.updateUserStats(event);
        break;
    }
  }
}

const eventStore = new EventStore();

Multi-Service Synchronization

Keep multiple services synchronized with database changes.

class ServiceSynchronizer {
  constructor() {
    this.services = {
      search: new SearchService(),
      cache: new CacheService(),
      analytics: new AnalyticsService()
    };
    
    this.setupSynchronization();
  }
  
  async setupSynchronization() {
    // Synchronize user data across services
    await sql.subscribe('users', async (user, info) => {
      const { command } = info;
      const userId = user?.id || info.old?.id;
      
      try {
        switch (command) {
          case 'insert':
            await Promise.all([
              this.services.search.indexUser(user),
              this.services.cache.cacheUser(user),
              this.services.analytics.trackUserCreation(user)
            ]);
            break;
            
          case 'update':
            await Promise.all([
              this.services.search.updateUser(user),
              this.services.cache.updateUser(user),
              this.services.analytics.trackUserUpdate(user, info.old)
            ]);
            break;
            
          case 'delete':
            await Promise.all([
              this.services.search.removeUser(userId),
              this.services.cache.removeUser(userId),
              this.services.analytics.trackUserDeletion(info.old)
            ]);
            break;
        }
      } catch (error) {
        console.error(`Service sync error for user ${userId}:`, error);
        // Implement retry logic or dead letter queue
      }
    });
    
    // Product synchronization
    await sql.subscribe('products', async (product, info) => {
      await this.syncProduct(product, info);
    });
  }
  
  async syncProduct(product, info) {
    const productId = product?.id || info.old?.id;
    
    // Update search index
    if (info.command === 'delete') {
      await this.services.search.removeProduct(productId);
    } else {
      await this.services.search.indexProduct(product);
    }
    
    // Update recommendations
    await this.services.analytics.updateRecommendations(product, info);
    
    // Clear related caches
    await this.services.cache.clearProductCaches(productId);
  }
}

const synchronizer = new ServiceSynchronizer();

Configuration and Setup

Prerequisites

Set up PostgreSQL for logical replication.

-- Enable logical replication in postgresql.conf
-- NOTE(review): changing wal_level normally requires a server restart — confirm
-- for your PostgreSQL version and hosting environment.
-- wal_level = logical
-- max_replication_slots = 4
-- max_wal_senders = 4

-- Create publication for tables you want to replicate
-- (FOR ALL TABLES also covers tables created later)
CREATE PUBLICATION app_changes FOR ALL TABLES;

-- Or create publication for specific tables
CREATE PUBLICATION user_changes FOR TABLE users, profiles;

-- Create replication slot
-- NOTE(review): a logical slot that is never consumed makes the server retain
-- WAL indefinitely. Upstream postgres.js appears to create its own temporary
-- slot when subscribing — confirm this slot is actually used, otherwise skip
-- this step or drop the slot with pg_drop_replication_slot('app_slot').
SELECT pg_create_logical_replication_slot('app_slot', 'pgoutput');

-- Grant replication permissions
ALTER USER myuser WITH REPLICATION;

Connection Configuration

Configure postgres.js for replication access.

// Replication requires specific connection configuration
const sql = postgres(connectionConfig, {
  // Connection must have replication privileges
  // NOTE(review): upstream postgres.js appears to open its own replication
  // connection inside `subscribe()`; confirm this top-level option is
  // recognized before relying on it.
  replication: 'database',

  // Publication name (created above). The postgres.js option is the plural
  // `publications`; it defaults to 'alltables' when omitted.
  publications: 'app_changes',

  // Replication slot name (created above)
  // NOTE(review): confirm that a `slot` option is supported by the installed
  // version; upstream appears to generate a temporary slot automatically.
  slot: 'app_slot',

  // Additional options
  max: 1, // Replication uses dedicated connection
  idle_timeout: 0, // Keep connection alive
});

Usage Examples:

// Production replication setup
const replicationSql = postgres({
  host: 'localhost',
  database: 'myapp',
  username: 'replication_user',
  // Read the secret from the environment; never hard-code it.
  password: process.env.REPLICATION_PASSWORD,
  // NOTE(review): confirm `replication` and `slot` are recognized options in
  // the installed postgres.js version.
  replication: 'database',
  // The postgres.js option is the plural `publications`.
  publications: 'app_changes',
  slot: 'myapp_slot',
  max: 1,
  idle_timeout: 0
});

// Start monitoring all changes
await replicationSql.subscribe(
  '*',
  (row, info) => {
    console.log(`Change detected: ${info.relation.table} ${info.command}`);
    processChange(row, info);
  },
  undefined,
  // Without an error handler a broken replication stream goes unnoticed.
  (error) => console.error('Replication subscription error:', error)
);

Error Handling and Resilience

Handle replication errors and connection issues.

class ResilientReplication {
  constructor(config) {
    this.config = config;
    this.subscriptions = new Map();
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 10;
  }
  
  async connect() {
    try {
      this.sql = postgres(this.config);
      await this.restoreSubscriptions();
      this.reconnectAttempts = 0;
    } catch (error) {
      console.error('Replication connection failed:', error);
      await this.scheduleReconnect();
    }
  }
  
  async subscribe(pattern, handler) {
    try {
      const subscription = await this.sql.subscribe(
        pattern,
        handler,
        () => console.log(`Subscribed to ${pattern}`),
        (error) => this.handleSubscriptionError(pattern, error)
      );
      
      this.subscriptions.set(pattern, { handler, subscription });
      return subscription;
    } catch (error) {
      console.error(`Subscription failed for ${pattern}:`, error);
      throw error;
    }
  }
  
  async handleSubscriptionError(pattern, error) {
    console.error(`Subscription error for ${pattern}:`, error);
    
    // Remove failed subscription
    this.subscriptions.delete(pattern);
    
    // Trigger reconnection
    await this.scheduleReconnect();
  }
  
  async restoreSubscriptions() {
    for (const [pattern, { handler }] of this.subscriptions) {
      await this.subscribe(pattern, handler);
    }
  }
  
  async scheduleReconnect() {
    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      this.reconnectAttempts++;
      const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
      
      setTimeout(() => {
        console.log(`Reconnecting... (attempt ${this.reconnectAttempts})`);
        this.connect();
      }, delay);
    } else {
      console.error('Max reconnection attempts reached');
    }
  }
}

// Usage
const replication = new ResilientReplication({
  host: 'localhost',
  database: 'myapp',
  username: 'replication_user',
  // NOTE(review): confirm `replication` and `slot` are recognized options in
  // the installed postgres.js version.
  replication: 'database',
  // The postgres.js option is the plural `publications`.
  publications: 'app_changes',
  slot: 'myapp_slot'
});

// connect() must complete before subscribe() is called.
await replication.connect();
await replication.subscribe('users', handleUserChanges);

Install with Tessl CLI

npx tessl i tessl/npm-postgres

docs

connections.md

errors.md

index.md

large-objects.md

notifications.md

query-processing.md

querying.md

replication.md

transactions.md

types.md

tile.json