or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

database-operations.md, extensions.md, filesystem-storage.md, index.md, live-queries.md, sql-templates.md, vector-operations.md, worker-support.md
tile.json

docs/live-queries.md

Live Queries

Real-time query results that automatically update when underlying data changes, enabling reactive applications with PostgreSQL.

Import

import { live } from "@electric-sql/pglite/live";

Setup

import { PGlite } from "@electric-sql/pglite";
import { live } from "@electric-sql/pglite/live";

// Registering the `live` extension at creation time is what makes
// `db.live` available on the resulting instance.
const db = await PGlite.create({ extensions: { live } });

Capabilities

Live Query Interface

Main interface for creating and managing live queries.

/**
 * Entry points for creating live queries. All three methods share the same
 * argument list and are asynchronous.
 *
 * NOTE(review): `key` is described below as an optional cache key. Upstream
 * PGlite documentation appears to treat the third argument of `query` as a
 * results callback, and `key` on `incrementalQuery`/`changes` as the required
 * name of the column used to diff rows — confirm against the installed
 * version before relying on these signatures.
 */
interface LiveNamespace {
  /**
   * Create a live query that updates when data changes.
   * @typeParam T - Shape of a result row; defaults to `Record<string, any>`
   * @param query - SQL query string
   * @param params - Query parameters (optional)
   * @param key - Optional cache key for query
   * @returns Promise resolving to a LiveQuery instance
   */
  query<T = Record<string, any>>(
    query: string,
    params?: any[],
    key?: string
  ): Promise<LiveQuery<T>>;

  /**
   * Create an incremental live query with efficient updates. Takes the same
   * arguments and resolves to the same handle type as `query`.
   * @typeParam T - Shape of a result row; defaults to `Record<string, any>`
   * @param query - SQL query string
   * @param params - Query parameters (optional)
   * @param key - Optional cache key for query
   * @returns Promise resolving to a LiveQuery instance with incremental updates
   */
  incrementalQuery<T = Record<string, any>>(
    query: string,
    params?: any[],
    key?: string
  ): Promise<LiveQuery<T>>;

  /**
   * Subscribe to data changes without initial results. Unlike the two query
   * methods, the returned handle delivers `Change` records rather than
   * result sets.
   * @typeParam T - Shape of a row carried by each change; defaults to `Record<string, any>`
   * @param query - SQL query string
   * @param params - Query parameters (optional)
   * @param key - Optional cache key for query
   * @returns Promise resolving to a LiveChanges instance
   */
  changes<T = Record<string, any>>(
    query: string,
    params?: any[],
    key?: string
  ): Promise<LiveChanges<T>>;
}

Live Query Results

Object representing a live query with methods to access and manage updates.

/**
 * Handle for a running live query: exposes the first result set and lets
 * callers listen for, force, or stop updates.
 *
 * NOTE(review): upstream PGlite appears to declare `refresh` as resolving to
 * `void` (results are delivered to subscribers) and `subscribe` as returning
 * nothing — confirm the return types below against the installed version.
 *
 * @typeParam T - Shape of a result row; defaults to `Record<string, any>`
 */
interface LiveQuery<T = Record<string, any>> {
  /** Initial query results; read-only on the handle */
  readonly initialResults: Results<T>;

  /**
   * Manually refresh the query results
   * @returns Promise resolving to the updated query results
   */
  refresh(): Promise<Results<T>>;

  /**
   * Stop listening for updates and cleanup
   * @returns Promise that resolves once the query has been released
   */
  unsubscribe(): Promise<void>;

  /**
   * Subscribe to query result changes
   * @param callback - Function called with the new results when they change
   * @returns Function to unsubscribe this callback (synchronous, no arguments)
   */
  subscribe(callback: (results: Results<T>) => void): () => void;
}

Live Changes Tracking

Interface for tracking specific data changes without full result sets.

/**
 * Handle for a change subscription: delivers batches of `Change` records
 * instead of full result sets.
 *
 * @typeParam T - Shape of a row carried by each change; defaults to `Record<string, any>`
 */
interface LiveChanges<T = Record<string, any>> {
  /**
   * Subscribe to data changes
   * @param callback - Function called with an array of changes when data changes
   * @returns Function to unsubscribe this callback
   */
  subscribe(callback: (changes: Change<T>[]) => void): () => void;

  /**
   * Stop listening for changes
   * @returns Promise that resolves once the subscription has been released
   */
  unsubscribe(): Promise<void>;
}

/**
 * A single row-level change. Discriminated union: switch on `__op__`
 * ('INSERT' | 'UPDATE' | 'DELETE') to narrow to the matching variant.
 */
type Change<T> = InsertChange<T> | UpdateChange<T> | DeleteChange<T>;

/** Change record for an inserted row; carries only the `new` row. */
interface InsertChange<T> {
  /** Presumably the names of the columns affected by this change — TODO confirm */
  __changed_columns__: string[];
  /** Discriminant identifying this variant of `Change` */
  __op__: 'INSERT';
  /** Presumably the name of the table the row belongs to — TODO confirm */
  __table__: string;
  /** The inserted row */
  new: T;
}

/** Change record for an updated row; carries both the `old` and `new` row. */
interface UpdateChange<T> {
  /** Presumably the names of the columns whose values changed — TODO confirm */
  __changed_columns__: string[];
  /** Discriminant identifying this variant of `Change` */
  __op__: 'UPDATE';
  /** Presumably the name of the table the row belongs to — TODO confirm */
  __table__: string;
  /** The row as it was before the update */
  old: T;
  /** The row as it is after the update */
  new: T;
}

/** Change record for a deleted row; carries only the `old` row. */
interface DeleteChange<T> {
  /** Presumably the names of the columns affected by this change — TODO confirm */
  __changed_columns__: string[];
  /** Discriminant identifying this variant of `Change` */
  __op__: 'DELETE';
  /** Presumably the name of the table the row belonged to — TODO confirm */
  __table__: string;
  /** The row as it was before deletion */
  old: T;
}

Live Query Results with Pagination

Extended results interface with pagination support.

/**
 * `Results<T>` extended with optional pagination metadata. Every added field
 * is optional, so a plain `Results<T>` value also satisfies this interface.
 *
 * NOTE(review): upstream PGlite appears to name the pagination fields
 * `totalCount`, `offset` and `limit` — confirm these field names against the
 * installed version.
 *
 * @typeParam T - Shape of a result row; defaults to `Record<string, any>`
 */
interface LiveQueryResults<T = Record<string, any>> extends Results<T> {
  /** Total number of rows across all pages */
  totalRows?: number;
  /** Current page number (0-based) */
  page?: number;
  /** Number of rows per page */
  pageSize?: number;
  /** Whether there are more pages available */
  hasMore?: boolean;
}

PGlite with Live Extension

Type definition for PGlite instance with live query capability.

/**
 * A `PGliteInterface` that additionally exposes the live-query API under
 * the `live` property.
 */
interface PGliteWithLive extends PGliteInterface {
  /** Live query namespace */
  live: LiveNamespace;
}

Types

/**
 * Extension configuration for live queries. Pass it under `extensions` when
 * creating a PGlite instance to make the `live` namespace available.
 *
 * Written as an ambient declaration: it documents the exported value's type
 * only, and a plain `const` without an initializer is not valid TypeScript.
 */
declare const live: Extension<LiveNamespace>;

Usage Examples:

import { PGlite } from "@electric-sql/pglite";
import { live } from "@electric-sql/pglite/live";

// Setup database with live queries
const db = await PGlite.create({
  extensions: {
    live,
  },
});

// Create the table that the live queries below observe
await db.exec(`
  CREATE TABLE IF NOT EXISTS messages (
    id SERIAL PRIMARY KEY,
    text TEXT,
    user_id INTEGER,
    created_at TIMESTAMP DEFAULT NOW()
  );
`);

// Basic live query
const liveQuery = await db.live.query(
  "SELECT * FROM messages ORDER BY created_at DESC LIMIT 10"
);

console.log("Initial results:", liveQuery.initialResults.rows);

// Subscribe to changes; keep the returned function so this listener can be
// removed during cleanup
const unsubscribe = liveQuery.subscribe((results) => {
  console.log("Updated results:", results.rows);
});

// Insert data - subscribers will be notified
await db.query("INSERT INTO messages (text, user_id) VALUES ($1, $2)", [
  "Hello, live queries!",
  1
]);

// Parameterized live query
// NOTE(review): upstream PGlite appears to take a results callback as the
// third argument of live.query — confirm that a string key is accepted here.
const userMessages = await db.live.query(
  "SELECT * FROM messages WHERE user_id = $1 ORDER BY created_at DESC",
  [1],
  "user-messages-1" // Cache key
);

// Incremental live query for better performance
// NOTE(review): upstream PGlite appears to require `key` here and to use it
// as the column for matching rows between runs, hence the primary key "id".
const incrementalQuery = await db.live.incrementalQuery(
  "SELECT * FROM messages ORDER BY created_at DESC LIMIT 100",
  [],
  "id"
);

// Subscribe to changes only (no initial results); keyed on "id" for the same
// reason as the incremental query above
const changesSubscription = await db.live.changes(
  "SELECT * FROM messages WHERE user_id = $1",
  [1],
  "id"
);

changesSubscription.subscribe((changes) => {
  for (const change of changes) {
    // `__op__` is the discriminant of the Change union, so each case sees
    // only the fields that exist for that operation
    switch (change.__op__) {
      case 'INSERT':
        console.log("New message:", change.new);
        break;
      case 'UPDATE':
        console.log("Updated message:", change.old, "->", change.new);
        break;
      case 'DELETE':
        console.log("Deleted message:", change.old);
        break;
    }
  }
});

// Cleanup after 30 seconds
setTimeout(async () => {
  // Nothing awaits a timer callback, so failures must be handled here;
  // otherwise they surface as unhandled promise rejections.
  try {
    unsubscribe();
    // The handles are independent: release them concurrently so that one
    // failure does not prevent the others from being attempted. The basic
    // live query is released too, not just its listener.
    await Promise.all([
      liveQuery.unsubscribe(),
      userMessages.unsubscribe(),
      incrementalQuery.unsubscribe(),
      changesSubscription.unsubscribe(),
    ]);
  } catch (error: unknown) {
    console.error("Failed to clean up live queries:", error);
  }
}, 30000);

// Manual refresh (runs immediately; the cleanup above only fires once the
// 30-second timer elapses)
const refreshedResults = await liveQuery.refresh();
console.log("Manually refreshed:", refreshedResults.rows);