Fastest full-featured PostgreSQL client for Node.js and Deno with tagged template literals, transactions, streaming, and logical replication support
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
PostgreSQL LISTEN/NOTIFY support for real-time messaging and pub/sub patterns between database clients.
Subscribe to PostgreSQL notification channels to receive real-time messages from other database clients.
/**
 * Subscribe to notifications published on a PostgreSQL channel.
 *
 * @param channel - Channel name to listen on
 * @param fn - Callback invoked with the payload string of each notification
 * @param onlisten - Optional callback invoked when listening starts
 * @returns ListenRequest object for managing the subscription.
 *   NOTE(review): the upstream postgres.js typings declare ListenRequest as
 *   promise-like, so callers may need to `await` it before calling
 *   `unlisten()` — confirm against the installed version.
 */
listen(
  channel: string,
  fn: (payload: string) => void,
  onlisten?: () => void
): ListenRequest;

Usage Examples:
// Basic channel listening
const listener = sql.listen('user_updates', (payload) => {
console.log('User update received:', payload);
const data = JSON.parse(payload);
handleUserUpdate(data);
});
// Listen with confirmation callback
sql.listen('order_status',
(payload) => {
const order = JSON.parse(payload);
updateOrderDisplay(order);
},
() => {
console.log('Now listening for order status updates');
}
);
// Multiple channel listeners
sql.listen('chat_messages', handleChatMessage);
sql.listen('system_alerts', handleSystemAlert);
sql.listen('user_presence', handlePresenceUpdate);Send notifications to channels that other clients can receive.
/**
 * Send a notification to a channel.
 *
 * @param channel - Channel name to send to
 * @param payload - Message payload (string). In PostgreSQL's default
 *   configuration a NOTIFY payload must be shorter than 8000 bytes.
 * @returns Promise resolving when notification is sent
 */
notify(channel: string, payload: string): Promise<void>;

Usage Examples:
// Send simple notification
await sql.notify('user_updates', 'User profile changed');
// Send structured data as JSON
const userUpdate = {
userId: 123,
action: 'profile_update',
timestamp: Date.now()
};
await sql.notify('user_updates', JSON.stringify(userUpdate));
// Notify about database changes
await sql.notify('inventory_change', JSON.stringify({
productId: product.id,
oldQuantity: oldQty,
newQuantity: newQty,
operation: 'sale'
}));Manage active notification subscriptions with control methods.
/**
 * Handle for an active channel subscription.
 *
 * NOTE(review): the upstream postgres.js typings declare ListenRequest as a
 * promise that resolves to an object exposing `state` and `unlisten()`, with
 * no `channel` field — confirm this shape against the installed version.
 */
interface ListenRequest {
  /** Channel being listened to */
  channel: string;
  /** Stop listening to this channel */
  unlisten(): Promise<void>;
}

Usage Examples:
// Store listener reference for later cleanup
const orderListener = sql.listen('orders', handleOrder);
// Stop listening when component unmounts
componentWillUnmount() {
orderListener.unlisten();
}
// Conditional listening
let priceListener = null;
if (user.subscribedToPriceUpdates) {
priceListener = sql.listen('price_changes', handlePriceChange);
}
// Clean up conditional listener
if (priceListener) {
await priceListener.unlisten();
}Use database triggers to automatically send notifications when data changes.
-- Create trigger function to send notifications.
-- Publishes a JSON description of each row change on the 'user_changes'
-- channel; the 'operation' field is 'insert', 'update' or 'delete'.
-- NOTE(review): NOTIFY payloads must stay under 8000 bytes in PostgreSQL's
-- default configuration; row_to_json of a wide row could exceed that —
-- confirm that rows in users stay small.
CREATE OR REPLACE FUNCTION notify_user_changes()
RETURNS TRIGGER AS $$
BEGIN
  IF TG_OP = 'INSERT' THEN
    PERFORM pg_notify('user_changes', json_build_object(
      'operation', 'insert',
      'user_id', NEW.id,
      'data', row_to_json(NEW)
    )::text);
  ELSIF TG_OP = 'UPDATE' THEN
    -- Updates carry both row images so listeners can diff them.
    PERFORM pg_notify('user_changes', json_build_object(
      'operation', 'update',
      'user_id', NEW.id,
      'old_data', row_to_json(OLD),
      'new_data', row_to_json(NEW)
    )::text);
  ELSIF TG_OP = 'DELETE' THEN
    -- Only OLD exists for a deleted row.
    PERFORM pg_notify('user_changes', json_build_object(
      'operation', 'delete',
      'user_id', OLD.id,
      'data', row_to_json(OLD)
    )::text);
  END IF;
  -- The trigger below fires AFTER the row change, so no row is handed back.
  RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- Create triggers
CREATE TRIGGER user_changes_trigger
  AFTER INSERT OR UPDATE OR DELETE ON users
  FOR EACH ROW EXECUTE FUNCTION notify_user_changes();

Usage Examples:
// Listen for database-driven notifications
sql.listen('user_changes', (payload) => {
const change = JSON.parse(payload);
switch (change.operation) {
case 'insert':
console.log('New user created:', change.data);
addUserToCache(change.data);
break;
case 'update':
console.log('User updated:', change.user_id);
updateUserInCache(change.new_data);
break;
case 'delete':
console.log('User deleted:', change.user_id);
removeUserFromCache(change.user_id);
break;
}
});
// The triggers will automatically send notifications for any changes:
await sql`INSERT INTO users (name, email) VALUES (${name}, ${email})`;
// Automatically triggers notification
await sql`UPDATE users SET email = ${newEmail} WHERE id = ${userId}`;
// Automatically triggers notification
await sql`DELETE FROM users WHERE id = ${userId}`;
// Automatically triggers notificationImplement real-time features using PostgreSQL notifications.
// Real-time chat system
class ChatRoom {
constructor(roomId) {
this.roomId = roomId;
this.setupListeners();
}
setupListeners() {
// Listen for new messages in this room
this.messageListener = sql.listen(`chat_${this.roomId}`, (payload) => {
const message = JSON.parse(payload);
this.displayMessage(message);
});
// Listen for user presence updates
this.presenceListener = sql.listen(`presence_${this.roomId}`, (payload) => {
const presence = JSON.parse(payload);
this.updateUserPresence(presence);
});
}
async sendMessage(userId, text) {
// Store message in database
await sql`
INSERT INTO messages (room_id, user_id, text, created_at)
VALUES (${this.roomId}, ${userId}, ${text}, NOW())
`;
// Notify other clients
await sql.notify(`chat_${this.roomId}`, JSON.stringify({
userId,
text,
timestamp: Date.now()
}));
}
async updatePresence(userId, status) {
await sql.notify(`presence_${this.roomId}`, JSON.stringify({
userId,
status,
timestamp: Date.now()
}));
}
cleanup() {
this.messageListener.unlisten();
this.presenceListener.unlisten();
}
}
// Usage
const chatRoom = new ChatRoom(123);
await chatRoom.sendMessage(456, 'Hello everyone!');Create sophisticated routing systems for different types of notifications.
class NotificationRouter {
constructor() {
this.handlers = new Map();
this.setupGlobalListener();
}
setupGlobalListener() {
// Single listener for all notifications with routing
sql.listen('app_notifications', (payload) => {
try {
const notification = JSON.parse(payload);
this.routeNotification(notification);
} catch (error) {
console.error('Invalid notification payload:', payload);
}
});
}
routeNotification(notification) {
const { type, target, data } = notification;
const key = `${type}:${target}`;
if (this.handlers.has(key)) {
this.handlers.get(key).forEach(handler => {
handler(data);
});
}
}
subscribe(type, target, handler) {
const key = `${type}:${target}`;
if (!this.handlers.has(key)) {
this.handlers.set(key, []);
}
this.handlers.get(key).push(handler);
}
async publish(type, target, data) {
await sql.notify('app_notifications', JSON.stringify({
type,
target,
data,
timestamp: Date.now()
}));
}
}
// Usage
const router = new NotificationRouter();
// Subscribe to specific notification types
router.subscribe('order', 'status_change', (data) => {
updateOrderStatus(data.orderId, data.status);
});
router.subscribe('inventory', 'low_stock', (data) => {
showLowStockAlert(data.productId, data.quantity);
});
router.subscribe('user', 'login', (data) => {
logUserActivity(data.userId, 'login');
});
// Publish notifications
await router.publish('order', 'status_change', {
orderId: 12345,
status: 'shipped'
});
await router.publish('inventory', 'low_stock', {
productId: 67890,
quantity: 5
});Handle connection issues and notification delivery errors gracefully.
// Configure notification error handling
const sql = postgres(connectionConfig, {
onnotify: (channel, payload) => {
console.log(`Notification on ${channel}:`, payload);
},
onclose: (connectionId) => {
console.log(`Connection ${connectionId} closed, notifications stopped`);
// Implement reconnection logic
}
});Usage Examples:
// Robust notification listener with error handling
class RobustListener {
constructor(channel, handler) {
this.channel = channel;
this.handler = handler;
this.listener = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
this.connect();
}
connect() {
try {
this.listener = sql.listen(this.channel,
(payload) => {
this.reconnectAttempts = 0; // Reset on successful message
this.handler(payload);
},
() => {
console.log(`Connected to channel: ${this.channel}`);
this.reconnectAttempts = 0;
}
);
} catch (error) {
console.error(`Failed to connect to ${this.channel}:`, error);
this.scheduleReconnect();
}
}
scheduleReconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
setTimeout(() => {
console.log(`Reconnecting to ${this.channel} (attempt ${this.reconnectAttempts})`);
this.connect();
}, delay);
} else {
console.error(`Max reconnection attempts reached for ${this.channel}`);
}
}
async stop() {
if (this.listener) {
await this.listener.unlisten();
}
}
}
// Usage
const robustListener = new RobustListener('critical_alerts', (payload) => {
handleCriticalAlert(JSON.parse(payload));
});Optimize notification performance by managing connections efficiently.
// Use dedicated connection for notifications
const notificationSql = postgres(connectionConfig, {
max: 1, // Single connection for notifications
idle_timeout: 0, // Keep connection alive
});
// Use separate connection pool for queries
const querySql = postgres(connectionConfig, {
max: 10, // Connection pool for regular queries
});
// Setup notifications on dedicated connection
notificationSql.listen('events', handleEvent);
// Use query connection for database operations
const users = await querySql`SELECT * FROM users`;Optimize notification payloads for better performance.
// Efficient payload design
await sql.notify('user_update', JSON.stringify({
id: user.id, // Minimal identifier
type: 'profile', // Change type
timestamp: Date.now() // When it happened
}));
// Avoid large payloads - use identifiers instead
await sql.notify('large_data_update', JSON.stringify({
table: 'documents',
id: document.id,
// Don't include full document content
}));
// Handler fetches details as needed
sql.listen('large_data_update', async (payload) => {
const { table, id } = JSON.parse(payload);
const fullData = await sql`SELECT * FROM ${sql(table)} WHERE id = ${id}`;
processUpdate(fullData[0]);
});Install with Tessl CLI
npx tessl i tessl/npm-postgres