PostgreSQL client with SSH tunnel support and streaming cursors.
import { PostgresWarehouseClient } from '@lightdash/warehouses';
const client = new PostgresWarehouseClient({
type: 'postgres',
host: 'localhost',
port: 5432,
user: 'myuser',
password: 'mypassword',
dbname: 'analytics',
schema: 'public',
sslmode: 'disable',
});
await client.test();$1, $2 placeholderspg librarytimeoutSeconds)NUMERIC → number, INT8 → BigIntpg library connection pooling for efficient resource managementtimeoutSeconds credential field)pg-cursor for memory-efficient result streamingPostgreSQL types are automatically converted to JavaScript types:
NUMERIC, DECIMAL → JavaScript number (via parseFloat)INT8, BIGINT → JavaScript BigIntDimensionType for Lightdash compatibilitygetAllTables() returns BASE TABLEs only (excludes materialized views)getCatalog() includes materialized views (PostgreSQL 12+ only)pg_%, information_schema// PostgreSQL uses positional placeholders: $1, $2, $3, etc.
await client.streamQuery(
'SELECT * FROM users WHERE created_at >= $1 AND status = $2',
(data) => { /* process rows */ },
{
values: ['2023-01-01', 'active'], // Positional parameters
tags: {} // Required
}
);Edge Cases:
$1, $2, $3 (not $0)const client = new PostgresWarehouseClient({
type: 'postgres',
host: 'internal-db.local', // Internal hostname (not publicly accessible)
port: 5432,
user: 'dbuser',
password: 'dbpass',
dbname: 'warehouse',
schema: 'public',
sslmode: 'disable', // SSL mode for DB connection
// SSH Tunnel configuration
useSshTunnel: true,
sshTunnelHost: 'bastion.example.com', // Bastion/jump host
sshTunnelPort: 22, // Optional (defaults to 22)
sshTunnelUser: 'sshuser',
sshTunnelPrivateKey: '-----BEGIN RSA PRIVATE KEY-----\n...', // PEM format
// sshTunnelPassphrase: 'passphrase', // Optional if key is encrypted
});
// SSH tunnel automatically created on first query, closed when client disposedEdge Cases:
sshTunnelHost, then tunnels to host:portssh-keygen -p -m PEM)const client = new PostgresWarehouseClient({
type: 'postgres',
host: 'my-db.abc123.us-east-1.rds.amazonaws.com',
port: 5432,
user: 'admin',
password: 'password',
dbname: 'production',
schema: 'public',
sslmode: 'verify-full', // Uses built-in AWS RDS certificates
});import { PostgresSqlBuilder } from '@lightdash/warehouses';
const builder = new PostgresSqlBuilder();
builder.getFieldQuoteChar(); // '"'
builder.escapeString("O'Reilly"); // "O''Reilly"
builder.concatString('"first"', "' '", '"last"'); // ("first" || ' ' || "last")Percentile SQL: Uses PERCENTILE_CONT (exact)
// Median: PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY col)
// 95th: PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY col)Average SQL: Casts to DOUBLE PRECISION
// AVG(col::DOUBLE PRECISION)| Mode | Description |
|---|---|
disable | No SSL |
no-verify | SSL without certificate verification |
allow | Try SSL, fallback to plain |
prefer | Try SSL first, fallback to plain |
require | Require SSL |
verify-ca | Require SSL with CA verification |
verify-full | Require SSL with full verification |
import { PostgresTypes } from '@lightdash/warehouses';
enum PostgresTypes {
INTEGER, INT, BIGINT, SMALLINT, SERIAL, BIGSERIAL,
NUMERIC, DECIMAL, REAL, FLOAT, DOUBLE_PRECISION, MONEY,
BOOLEAN, BOOL, DATE, TIME, TIMESTAMP, TIMESTAMPTZ,
CHAR, VARCHAR, TEXT, JSON, JSONB,
}NUMERIC → JavaScript number (via parseFloat)INT8 (BIGINT) → JavaScript BigIntPostgresClient<T> is the exported generic base class that provides shared functionality for PostgreSQL and Redshift.
Important: Unlike ClickhouseWarehouseClient which is NOT exported, PostgresClient IS exported from @lightdash/warehouses. Most users should use the concrete implementations (PostgresWarehouseClient or RedshiftWarehouseClient), but PostgresClient is available for advanced use cases requiring generic PostgreSQL-compatible client functionality.
import { PostgresClient } from '@lightdash/warehouses';
import type { CreatePostgresLikeCredentials, WarehouseCatalog } from '@lightdash/common';
import type { PoolConfig } from 'pg';
/**
* Generic base class for PostgreSQL-compatible warehouse clients
* Provides shared functionality for PostgreSQL and Redshift
* @template T - Credentials type extending CreatePostgresLikeCredentials
*/
class PostgresClient<T extends CreatePostgresLikeCredentials> extends WarehouseBaseClient<T> {
/**
* PostgreSQL connection pool configuration
* Includes connection string, SSL settings, and pool parameters
*/
config: PoolConfig;
/**
* Initialize PostgreSQL-compatible client
* @param credentials - Credentials for PostgreSQL or Redshift
* @param config - pg.PoolConfig with connection string and SSL configuration
*/
constructor(credentials: T, config: PoolConfig);
/**
* Stream query results using server-side cursors
* Executes query with DECLARE CURSOR and fetches in batches
* @param sql - SQL query string with optional $1, $2 parameter placeholders
* @param streamCallback - Callback invoked for each batch of results
* @param options - Query options with parameter values, tags, and timezone
* @returns Promise resolving when all results are streamed
*/
streamQuery(
sql: string,
streamCallback: (data: WarehouseResults) => void,
options: {
values?: any[];
tags?: Record<string, string>;
timezone?: string;
}
): Promise<void>;
/**
* Get table metadata for specific tables
* Queries information_schema for columns, data types, and constraints
* @param requests - Array of table identifiers (database, schema, table)
* @returns Promise resolving to nested catalog structure
*/
getCatalog(
requests: Array<{ database: string; schema: string; table: string }>
): Promise<WarehouseCatalog>;
/**
* Get all BASE TABLEs in the database
* Queries information_schema.tables for BASE TABLE type only
* Note: Materialized views are only included in getCatalog(), not getAllTables()
* @returns Promise resolving to array of table metadata
*/
getAllTables(): Promise<Array<{ database: string; schema: string; table: string }>>;
/**
* Parse PostgreSQL database errors into WarehouseQueryError
* Converts CTE references and position markers to line numbers
* @param error - PostgreSQL error from pg driver
* @param query - Optional SQL query for enhanced error messages
* @returns Enhanced error with line numbers and context
*/
parseError(error: Error, query?: string): Error;
}Implementation Details:
pg library connection pooling for efficient resource managementnumber, INT8 → BigInt)$1, $2, ... placeholdersUsage: Most users should use concrete implementations (PostgresWarehouseClient or RedshiftWarehouseClient), but PostgresClient is available for advanced use cases requiring generic PostgreSQL-compatible client functionality.
TypeScript:
/**
* Convert pg query result field types to DimensionType
*
* Maps PostgreSQL OID types to Lightdash dimension types using internal
* type mapping tables. This is a static utility method that can be called
* without instantiating a client.
*
* @param fields - Field metadata from pg QueryResult
* - Array of field objects from pg query result
* - Each field contains: name (string), dataTypeID (number/OID)
* - dataTypeID is PostgreSQL's internal type identifier (OID)
* - Examples: 23=INT4, 1043=VARCHAR, 1114=TIMESTAMP
* - Field metadata is available on every QueryResult object
*
* @returns Record mapping field names to dimension types
* - Object where keys are column names (strings)
* - Values are objects with single property 'type' (DimensionType)
* - Structure: { [columnName]: { type: DimensionType } }
* - All fields from input are included in output
* - Unknown OIDs default to DimensionType.STRING
*
* @example
* import { PostgresWarehouseClient } from '@lightdash/warehouses';
* import { Pool } from 'pg';
*
* const pool = new Pool({ ... });
* const result = await pool.query('SELECT * FROM users');
* const fields = PostgresWarehouseClient.convertQueryResultFields(result.fields);
* console.log(fields);
* // { user_id: { type: 'number' }, email: { type: 'string' }, created_at: { type: 'timestamp' } }
*
* @static
* @memberof PostgresWarehouseClient
*/
static convertQueryResultFields(
fields: QueryResult<any>['fields']
): Record<string, { type: DimensionType }>;CommonJS:
const { PostgresWarehouseClient } = require('@lightdash/warehouses');
const { Pool } = require('pg');
/**
* @typedef {import('@lightdash/common').DimensionType} DimensionType
* @typedef {import('pg').QueryResult} QueryResult
*/
// Usage
const pool = new Pool({ /* config */ });
const result = await pool.query('SELECT * FROM users');
const fields = PostgresWarehouseClient.convertQueryResultFields(result.fields);
console.log(fields);
// { user_id: { type: 'number' }, email: { type: 'string' }, created_at: { type: 'timestamp' } }Type Mappings (by PostgreSQL OID):
DimensionType.STRINGDimensionType.NUMBERDimensionType.BOOLEANDimensionType.DATEDimensionType.TIMESTAMPDimensionType.STRINGconst error = client.parseError(pgError, query);
// Extracts line numbers and adds contextimport { type CreatePostgresCredentials, type CreatePostgresLikeCredentials, WarehouseTypes, WeekDay } from '@lightdash/common';
/**
* PostgreSQL warehouse credentials
*/
interface CreatePostgresCredentials extends CreatePostgresLikeCredentials {
/** Warehouse type discriminator */
type: 'postgres' | WarehouseTypes.POSTGRES;
/** PostgreSQL server hostname or IP */
host: string;
/** PostgreSQL server port (default: 5432) */
port: number;
/** Database username */
user: string;
/** Database password */
password: string;
/** Database name */
dbname: string;
/** Default schema for queries */
schema: string;
/** SSL mode: disable, no-verify, allow, prefer, require, verify-ca, verify-full */
sslmode: 'disable' | 'no-verify' | 'allow' | 'prefer' | 'require' | 'verify-ca' | 'verify-full';
// Optional fields
requireUserCredentials?: boolean;
threads?: number; // Number of concurrent query threads
keepalivesIdle?: number; // TCP keepalive idle time in seconds
searchPath?: string; // PostgreSQL search_path setting
role?: string; // PostgreSQL role to SET ROLE after connection
timeoutSeconds?: number; // Query timeout in seconds (default: 300)
sslcertFileName?: string;
sslcert?: string | null;
sslkeyFileName?: string;
sslkey?: string | null;
sslrootcertFileName?: string;
sslrootcert?: string | null;
useSshTunnel?: boolean;
sshTunnelHost?: string;
sshTunnelPort?: number;
sshTunnelUser?: string;
sshTunnelPublicKey?: string;
sshTunnelPrivateKey?: string;
startOfWeek?: WeekDay | null;
}
/**
* Base interface for PostgreSQL-like credentials
* Shared by PostgreSQL and Redshift
*/
interface CreatePostgresLikeCredentials {
host: string;
port: number;
user: string;
password: string;
dbname: string;
schema: string;
sslmode: string;
useSshTunnel?: boolean;
sshTunnelHost?: string;
sshTunnelPort?: number;
sshTunnelUser?: string;
sshTunnelPublicKey?: string;
sshTunnelPrivateKey?: string;
startOfWeek?: WeekDay | null;
}timeoutSeconds)getAllTables() returns BASE TABLEs only (not materialized views)getCatalog() includes materialized views (PostgreSQL 12+)pg_%, information_schema) automatically filtered