CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-postgres

Fastest full featured PostgreSQL client for Node.js and Deno with tagged template literals, transactions, streaming, and logical replication support

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

notifications.mddocs/

Notifications

PostgreSQL LISTEN/NOTIFY support for real-time messaging and pub/sub patterns between database clients.

Capabilities

Channel Listening

Subscribe to PostgreSQL notification channels to receive real-time messages from other database clients.

/**
 * Listen for notifications on a channel
 * @param channel - Channel name to listen on
 * @param fn - Callback function to handle notifications
 * @param onlisten - Optional callback when listening starts
 * @returns ListenRequest object for managing the subscription
 */
listen(
  channel: string, 
  fn: (payload: string) => void, 
  onlisten?: () => void
): ListenRequest;

Usage Examples:

// Basic channel listening
const listener = sql.listen('user_updates', (payload) => {
  console.log('User update received:', payload);
  const data = JSON.parse(payload);
  handleUserUpdate(data);
});

// Listen with confirmation callback
sql.listen('order_status', 
  (payload) => {
    const order = JSON.parse(payload);
    updateOrderDisplay(order);
  },
  () => {
    console.log('Now listening for order status updates');
  }
);

// Multiple channel listeners
sql.listen('chat_messages', handleChatMessage);
sql.listen('system_alerts', handleSystemAlert);
sql.listen('user_presence', handlePresenceUpdate);

Sending Notifications

Send notifications to channels that other clients can receive.

/**
 * Send notification to a channel
 * @param channel - Channel name to send to
 * @param payload - Message payload (string)
 * @returns Promise resolving when notification is sent
 */
notify(channel: string, payload: string): Promise<void>;

Usage Examples:

// Send simple notification
await sql.notify('user_updates', 'User profile changed');

// Send structured data as JSON
const userUpdate = {
  userId: 123,
  action: 'profile_update',
  timestamp: Date.now()
};
await sql.notify('user_updates', JSON.stringify(userUpdate));

// Notify about database changes
await sql.notify('inventory_change', JSON.stringify({
  productId: product.id,
  oldQuantity: oldQty,
  newQuantity: newQty,
  operation: 'sale'
}));

ListenRequest Interface

Manage active notification subscriptions with control methods.

interface ListenRequest {
  /** Channel being listened to */
  channel: string;
  
  /** Stop listening to this channel */
  unlisten(): Promise<void>;
}

Usage Examples:

// Store listener reference for later cleanup
const orderListener = sql.listen('orders', handleOrder);

// Stop listening when component unmounts
componentWillUnmount() {
  orderListener.unlisten();
}

// Conditional listening
let priceListener = null;
if (user.subscribedToPriceUpdates) {
  priceListener = sql.listen('price_changes', handlePriceChange);
}

// Clean up conditional listener
if (priceListener) {
  await priceListener.unlisten();
}

Advanced Notification Patterns

Event-Driven Database Changes

Use database triggers to automatically send notifications when data changes.

-- Create trigger function to send notifications
CREATE OR REPLACE FUNCTION notify_user_changes()
RETURNS TRIGGER AS $$
BEGIN
  IF TG_OP = 'INSERT' THEN
    PERFORM pg_notify('user_changes', json_build_object(
      'operation', 'insert',
      'user_id', NEW.id,
      'data', row_to_json(NEW)
    )::text);
  ELSIF TG_OP = 'UPDATE' THEN
    PERFORM pg_notify('user_changes', json_build_object(
      'operation', 'update', 
      'user_id', NEW.id,
      'old_data', row_to_json(OLD),
      'new_data', row_to_json(NEW)
    )::text);
  ELSIF TG_OP = 'DELETE' THEN
    PERFORM pg_notify('user_changes', json_build_object(
      'operation', 'delete',
      'user_id', OLD.id,
      'data', row_to_json(OLD)
    )::text);
  END IF;
  RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- Create triggers
CREATE TRIGGER user_changes_trigger
  AFTER INSERT OR UPDATE OR DELETE ON users
  FOR EACH ROW EXECUTE FUNCTION notify_user_changes();

Usage Examples:

// Listen for database-driven notifications
sql.listen('user_changes', (payload) => {
  const change = JSON.parse(payload);
  
  switch (change.operation) {
    case 'insert':
      console.log('New user created:', change.data);
      addUserToCache(change.data);
      break;
      
    case 'update':
      console.log('User updated:', change.user_id);
      updateUserInCache(change.new_data);
      break;
      
    case 'delete':
      console.log('User deleted:', change.user_id);
      removeUserFromCache(change.user_id);
      break;
  }
});

// The triggers will automatically send notifications for any changes:
await sql`INSERT INTO users (name, email) VALUES (${name}, ${email})`;
// Automatically triggers notification

await sql`UPDATE users SET email = ${newEmail} WHERE id = ${userId}`;
// Automatically triggers notification

await sql`DELETE FROM users WHERE id = ${userId}`;
// Automatically triggers notification

Real-time Application Features

Implement real-time features using PostgreSQL notifications.

// Real-time chat system
class ChatRoom {
  constructor(roomId) {
    this.roomId = roomId;
    this.setupListeners();
  }
  
  setupListeners() {
    // Listen for new messages in this room
    this.messageListener = sql.listen(`chat_${this.roomId}`, (payload) => {
      const message = JSON.parse(payload);
      this.displayMessage(message);
    });
    
    // Listen for user presence updates
    this.presenceListener = sql.listen(`presence_${this.roomId}`, (payload) => {
      const presence = JSON.parse(payload);
      this.updateUserPresence(presence);
    });
  }
  
  async sendMessage(userId, text) {
    // Store message in database
    await sql`
      INSERT INTO messages (room_id, user_id, text, created_at)
      VALUES (${this.roomId}, ${userId}, ${text}, NOW())
    `;
    
    // Notify other clients
    await sql.notify(`chat_${this.roomId}`, JSON.stringify({
      userId,
      text,
      timestamp: Date.now()
    }));
  }
  
  async updatePresence(userId, status) {
    await sql.notify(`presence_${this.roomId}`, JSON.stringify({
      userId,
      status,
      timestamp: Date.now()
    }));
  }
  
  cleanup() {
    this.messageListener.unlisten();
    this.presenceListener.unlisten();
  }
}

// Usage
const chatRoom = new ChatRoom(123);
await chatRoom.sendMessage(456, 'Hello everyone!');

Notification Routing

Create sophisticated routing systems for different types of notifications.

class NotificationRouter {
  constructor() {
    this.handlers = new Map();
    this.setupGlobalListener();
  }
  
  setupGlobalListener() {
    // Single listener for all notifications with routing
    sql.listen('app_notifications', (payload) => {
      try {
        const notification = JSON.parse(payload);
        this.routeNotification(notification);
      } catch (error) {
        console.error('Invalid notification payload:', payload);
      }
    });
  }
  
  routeNotification(notification) {
    const { type, target, data } = notification;
    const key = `${type}:${target}`;
    
    if (this.handlers.has(key)) {
      this.handlers.get(key).forEach(handler => {
        handler(data);
      });
    }
  }
  
  subscribe(type, target, handler) {
    const key = `${type}:${target}`;
    if (!this.handlers.has(key)) {
      this.handlers.set(key, []);
    }
    this.handlers.get(key).push(handler);
  }
  
  async publish(type, target, data) {
    await sql.notify('app_notifications', JSON.stringify({
      type,
      target, 
      data,
      timestamp: Date.now()
    }));
  }
}

// Usage
const router = new NotificationRouter();

// Subscribe to specific notification types
router.subscribe('order', 'status_change', (data) => {
  updateOrderStatus(data.orderId, data.status);
});

router.subscribe('inventory', 'low_stock', (data) => {
  showLowStockAlert(data.productId, data.quantity);
});

router.subscribe('user', 'login', (data) => {
  logUserActivity(data.userId, 'login');
});

// Publish notifications
await router.publish('order', 'status_change', {
  orderId: 12345,
  status: 'shipped'
});

await router.publish('inventory', 'low_stock', {
  productId: 67890,
  quantity: 5
});

Error Handling

Handle connection issues and notification delivery errors gracefully.

// Configure notification error handling
const sql = postgres(connectionConfig, {
  onnotify: (channel, payload) => {
    console.log(`Notification on ${channel}:`, payload);
  },
  
  onclose: (connectionId) => {
    console.log(`Connection ${connectionId} closed, notifications stopped`);
    // Implement reconnection logic
  }
});

Usage Examples:

// Robust notification listener with error handling
class RobustListener {
  constructor(channel, handler) {
    this.channel = channel;
    this.handler = handler;
    this.listener = null;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 5;
    
    this.connect();
  }
  
  connect() {
    try {
      this.listener = sql.listen(this.channel, 
        (payload) => {
          this.reconnectAttempts = 0; // Reset on successful message
          this.handler(payload);
        },
        () => {
          console.log(`Connected to channel: ${this.channel}`);
          this.reconnectAttempts = 0;
        }
      );
    } catch (error) {
      console.error(`Failed to connect to ${this.channel}:`, error);
      this.scheduleReconnect();
    }
  }
  
  scheduleReconnect() {
    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      this.reconnectAttempts++;
      const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
      
      setTimeout(() => {
        console.log(`Reconnecting to ${this.channel} (attempt ${this.reconnectAttempts})`);
        this.connect();
      }, delay);
    } else {
      console.error(`Max reconnection attempts reached for ${this.channel}`);
    }
  }
  
  async stop() {
    if (this.listener) {
      await this.listener.unlisten();
    }
  }
}

// Usage
const robustListener = new RobustListener('critical_alerts', (payload) => {
  handleCriticalAlert(JSON.parse(payload));
});

Performance Considerations

Connection Management

Optimize notification performance by managing connections efficiently.

// Use dedicated connection for notifications
const notificationSql = postgres(connectionConfig, {
  max: 1, // Single connection for notifications
  idle_timeout: 0, // Keep connection alive
});

// Use separate connection pool for queries
const querySql = postgres(connectionConfig, {
  max: 10, // Connection pool for regular queries
});

// Setup notifications on dedicated connection
notificationSql.listen('events', handleEvent);

// Use query connection for database operations
const users = await querySql`SELECT * FROM users`;

Payload Optimization

Optimize notification payloads for better performance.

// Efficient payload design
await sql.notify('user_update', JSON.stringify({
  id: user.id,           // Minimal identifier
  type: 'profile',       // Change type
  timestamp: Date.now()  // When it happened
}));

// Avoid large payloads - use identifiers instead
await sql.notify('large_data_update', JSON.stringify({
  table: 'documents',
  id: document.id,
  // Don't include full document content
}));

// Handler fetches details as needed
sql.listen('large_data_update', async (payload) => {
  const { table, id } = JSON.parse(payload);
  const fullData = await sql`SELECT * FROM ${sql(table)} WHERE id = ${id}`;
  processUpdate(fullData[0]);
});

Install with Tessl CLI

npx tessl i tessl/npm-postgres

docs

connections.md

errors.md

index.md

large-objects.md

notifications.md

query-processing.md

querying.md

replication.md

transactions.md

types.md

tile.json