Real-time query results that automatically update when underlying data changes, enabling reactive applications with PostgreSQL.
import { live } from "@electric-sql/pglite/live";

import { PGlite } from "@electric-sql/pglite";
import { live } from "@electric-sql/pglite/live";
// Create the database with the live extension registered under `extensions`.
const db = await PGlite.create({ extensions: { live } });
// Now db.live is available

Main interface for creating and managing live queries.
interface LiveNamespace {
  /**
   * Creates a live query that updates when data changes.
   *
   * @param query - SQL query string.
   * @param params - Query parameters.
   * @param key - Optional cache key for the query.
   * @returns A promise resolving to the live query handle.
   */
  query<T = Record<string, any>>(
    query: string,
    params?: any[],
    key?: string
  ): Promise<LiveQuery<T>>;

  /**
   * Creates an incremental live query with efficient updates.
   *
   * NOTE(review): the meaning of `key` should be confirmed against the
   * installed PGlite version; it may name a key column rather than a cache key.
   *
   * @param query - SQL query string.
   * @param params - Query parameters.
   * @param key - Optional cache key for the query.
   * @returns A promise resolving to a live query handle with incremental updates.
   */
  incrementalQuery<T = Record<string, any>>(
    query: string,
    params?: any[],
    key?: string
  ): Promise<LiveQuery<T>>;

  /**
   * Subscribes to data changes without initial results.
   *
   * NOTE(review): the meaning of `key` should be confirmed against the
   * installed PGlite version; it may name a key column rather than a cache key.
   *
   * @param query - SQL query string.
   * @param params - Query parameters.
   * @param key - Optional cache key for the query.
   * @returns A promise resolving to the live changes handle.
   */
  changes<T = Record<string, any>>(
    query: string,
    params?: any[],
    key?: string
  ): Promise<LiveChanges<T>>;
}

/** Object representing a live query with methods to access and manage updates. */
interface LiveQuery<T = Record<string, any>> {
  /** Initial query results. */
  readonly initialResults: Results<T>;

  /**
   * Manually refreshes the query results.
   *
   * @returns A promise resolving to the updated query results.
   */
  refresh(): Promise<Results<T>>;

  /**
   * Stops listening for updates and cleans up.
   */
  unsubscribe(): Promise<void>;

  /**
   * Subscribes to query result changes.
   *
   * @param callback - Function called when results change.
   * @returns Function that unsubscribes the callback.
   */
  subscribe(callback: (results: Results<T>) => void): () => void;
}

/** Interface for tracking specific data changes without full result sets. */
interface LiveChanges<T = Record<string, any>> {
  /**
   * Stops listening for changes.
   */
  unsubscribe(): Promise<void>;

  /**
   * Registers a listener for data changes.
   *
   * @param callback - Invoked with the list of changes whenever data changes.
   * @returns Function that unsubscribes the callback.
   */
  subscribe(callback: (changes: Change<T>[]) => void): () => void;
}
/**
 * A single row-level change, discriminated by its `__op__` field.
 */
type Change<T> = InsertChange<T> | UpdateChange<T> | DeleteChange<T>;

/** Change variant tagged `'INSERT'`; carries only the `new` row. */
interface InsertChange<T> {
  __changed_columns__: string[];
  __op__: 'INSERT';
  __table__: string;
  new: T;
}

/** Change variant tagged `'UPDATE'`; carries both the `old` and the `new` row. */
interface UpdateChange<T> {
  __changed_columns__: string[];
  __op__: 'UPDATE';
  __table__: string;
  old: T;
  new: T;
}

/** Change variant tagged `'DELETE'`; carries only the `old` row. */
interface DeleteChange<T> {
  __changed_columns__: string[];
  __op__: 'DELETE';
  __table__: string;
  old: T;
}

/** Extended results interface with pagination support. */
interface LiveQueryResults<T = Record<string, any>> extends Results<T> {
  /** Total number of rows across all pages. */
  totalRows?: number;

  /** Current page number (0-based). */
  page?: number;

  /** Number of rows per page. */
  pageSize?: number;

  /** Whether there are more pages available. */
  hasMore?: boolean;
}

/** Type definition for PGlite instance with live query capability. */
interface PGliteWithLive extends PGliteInterface {
  /** Namespace exposing the live query methods. */
  live: LiveNamespace;
}

/** Extension configuration for live queries */
const live: Extension<LiveNamespace>;

Usage Examples:
import { PGlite } from "@electric-sql/pglite";
import { live } from "@electric-sql/pglite/live";
// Setup database with live queries
const db = await PGlite.create({
  extensions: {
    live,
  },
});

// Create tables
await db.exec(`
CREATE TABLE IF NOT EXISTS messages (
id SERIAL PRIMARY KEY,
text TEXT,
user_id INTEGER,
created_at TIMESTAMP DEFAULT NOW()
);
`);

// Basic live query
const liveQuery = await db.live.query(
  "SELECT * FROM messages ORDER BY created_at DESC LIMIT 10"
);
console.log("Initial results:", liveQuery.initialResults.rows);

// Subscribe to changes; keep the returned function so the callback can be
// removed again during cleanup.
const unsubscribe = liveQuery.subscribe((results) => {
  console.log("Updated results:", results.rows);
});

// Insert data - subscribers will be notified
await db.query("INSERT INTO messages (text, user_id) VALUES ($1, $2)", [
  "Hello, live queries!",
  1,
]);

// Parameterized live query
const userMessages = await db.live.query(
  "SELECT * FROM messages WHERE user_id = $1 ORDER BY created_at DESC",
  [1],
  "user-messages-1" // Cache key
);

// Incremental live query for better performance
const incrementalQuery = await db.live.incrementalQuery(
  "SELECT * FROM messages ORDER BY created_at DESC LIMIT 100"
);

// Subscribe to changes only (no initial results)
const changesSubscription = await db.live.changes(
  "SELECT * FROM messages WHERE user_id = $1",
  [1]
);
changesSubscription.subscribe((changes) => {
  for (const change of changes) {
    // `__op__` discriminates the change, so `old` / `new` are only read on
    // the variants that declare them.
    switch (change.__op__) {
      case 'INSERT':
        console.log("New message:", change.new);
        break;
      case 'UPDATE':
        console.log("Updated message:", change.old, "->", change.new);
        break;
      case 'DELETE':
        console.log("Deleted message:", change.old);
        break;
    }
  }
});

// Cleanup after 30 seconds
setTimeout(async () => {
  try {
    // Remove the result callback, then release every live handle created
    // above, including `liveQuery` itself so it stops listening for updates.
    unsubscribe();
    await liveQuery.unsubscribe();
    await userMessages.unsubscribe();
    await incrementalQuery.unsubscribe();
    await changesSubscription.unsubscribe();
  } catch (error: unknown) {
    // The timer ignores the promise returned by this callback, so failures
    // must be reported here instead of becoming an unhandled rejection.
    console.error("Failed to clean up live queries:", error);
  }
}, 30000);

// Manual refresh (issued immediately, before the cleanup timer above fires)
const refreshedResults = await liveQuery.refresh();
console.log("Manually refreshed:", refreshedResults.rows);