PostgreSQL warehouse client with SSH tunnel support, streaming cursors, parameterized queries, and SSL/TLS configuration including AWS RDS certificate bundles.
Generic base class for PostgreSQL-compatible warehouse clients (PostgreSQL and Redshift). Provides core PostgreSQL wire protocol implementation with connection pooling, streaming cursors, and error handling.
PostgresClient vs PostgresWarehouseClient:
- PostgresClient is the generic base class that implements core PostgreSQL wire protocol functionality
- PostgresWarehouseClient extends PostgresClient and provides PostgreSQL-specific configuration (SSL modes, connection setup)
- RedshiftWarehouseClient also extends PostgresClient with Redshift-specific configuration
- Use PostgresWarehouseClient or RedshiftWarehouseClient for typical database connections
- Use PostgresClient directly only when building custom PostgreSQL-compatible warehouse clients or when you need access to the generic base class methods

Note: PostgresClient IS exported from @lightdash/warehouses and can be imported directly. It's exported via the standard export * from './warehouseClients/PostgresWarehouseClient' statement in the package index.
// Both PostgresClient and PostgresWarehouseClient are directly exported from the package
import { PostgresClient, PostgresWarehouseClient } from "@lightdash/warehouses";
import type { CreatePostgresLikeCredentials } from "@lightdash/common";
import type { PoolConfig } from "pg";
/**
* Generic base class for PostgreSQL-compatible warehouse clients
* Extends WarehouseBaseClient with PostgreSQL-specific functionality
* @template T - Credentials type extending CreatePostgresLikeCredentials
* @note PostgresClient IS directly exported from @lightdash/warehouses via export * from the PostgresWarehouseClient module.
* However, most users should use the concrete implementations (PostgresWarehouseClient
* or RedshiftWarehouseClient) instead.
*/
class PostgresClient<T extends CreatePostgresLikeCredentials> extends WarehouseBaseClient<T> {
/**
* PostgreSQL connection pool configuration
* Includes connection string, SSL settings, and pool parameters
*/
config: PoolConfig;
/**
* Initialize PostgreSQL-compatible client
* @param credentials - Credentials for PostgreSQL or Redshift
* @param config - pg.PoolConfig with connection string and SSL configuration
*/
constructor(credentials: T, config: PoolConfig);
/**
* Stream query results using server-side cursors
* Executes query with DECLARE CURSOR and fetches in batches
* @param sql - SQL query string with optional $1, $2 parameter placeholders
* @param streamCallback - Callback invoked for each batch of results
* @param options - Query options with parameter values, tags, and timezone
* @returns Promise resolving when all results are streamed
*/
streamQuery(
sql: string,
streamCallback: (data: WarehouseResults) => void,
options: {
values?: AnyType[];
tags?: Record<string, string>;
timezone?: string;
}
): Promise<void>;
/**
* Get table metadata for specific tables
* Queries information_schema for columns, data types, and constraints
* @param requests - Array of table identifiers (database, schema, table)
* @returns Promise resolving to nested catalog structure
*/
getCatalog(
requests: Array<{ database: string; schema: string; table: string }>
): Promise<WarehouseCatalog>;
/**
* Get all BASE TABLEs in the database
* Queries information_schema.tables for BASE TABLE type only
* Excludes system tables from information_schema and pg_catalog
* Note: This method does NOT accept parameters despite base interface definition
* Note: Materialized views are only included in getCatalog(), not getAllTables()
* @returns Promise resolving to array of table metadata
*/
getAllTables(): Promise<
Array<{
database: string;
schema: string;
table: string;
}>
>;
/**
* Get field metadata for specific table
* @param tableName - Name of the table
* @param schema - Optional schema name (uses credentials schema if omitted)
* @param database - Optional database name (PostgreSQL catalog)
* @param tags - Optional tags for query tracking
* @returns Promise resolving to catalog with field information
*/
getFields(
tableName: string,
schema?: string,
database?: string,
tags?: Record<string, string>
): Promise<WarehouseCatalog>;
/**
* Parse PostgreSQL database errors into WarehouseQueryError
* Converts CTE references and position markers to line numbers
* @param error - PostgreSQL error from pg driver
* @param query - Optional SQL query for enhanced error messages (defaults to empty string if not provided)
* @returns Enhanced error with line numbers and context
*/
parseError(error: pg.DatabaseError, query?: string): WarehouseQueryError;
/**
* Parse raw query results into warehouse catalog structure
* Inherited from WarehouseBaseClient, transforms rows into nested catalog format
* @param rows - Raw query result rows containing database/schema/table/column metadata
* @param mapFieldType - Function to map warehouse-specific types to DimensionType
* @returns Nested catalog structure organized by database/schema/table/column
*/
parseWarehouseCatalog(
rows: Record<string, AnyType>[],
mapFieldType: (type: string) => DimensionType
): WarehouseCatalog;
/**
* Convert pg query result field types to DimensionType
* Maps PostgreSQL OID types (numeric data type identifiers) to Lightdash dimension types
*
* Note: This is primarily an internal utility method used by the client implementation.
* Users typically only need this when working with raw pg driver results outside of
* the standard streamQuery()/runQuery() methods, which handle type conversion automatically.
*
* @param fields - Field metadata from pg QueryResult (array of { name, dataTypeID } objects)
* @returns Record mapping field names to dimension types
* @example
* ```typescript
* import { PostgresWarehouseClient } from '@lightdash/warehouses';
* import type { QueryResult } from 'pg';
*
* // After executing a raw pg query
* const pgResult: QueryResult = await pgPool.query('SELECT * FROM users');
*
* // Convert field types to Lightdash dimension types
* const fieldTypes = PostgresWarehouseClient.convertQueryResultFields(pgResult.fields);
* console.log(fieldTypes);
* // {
* // user_id: { type: DimensionType.NUMBER },
* // email: { type: DimensionType.STRING },
* // created_at: { type: DimensionType.TIMESTAMP }
* // }
* ```
*/
static convertQueryResultFields(
fields: QueryResult<AnyType>["fields"]
): Record<string, { type: DimensionType }>;
}

Implementation Details:
- Uses the pg library's connection pooling for efficient resource management
- Registers custom type parsers (types.setTypeParser):
  - NUMERIC (OID 1700) → JavaScript number (via parseFloat)
  - INT8 / BIGINT (OID 20) → JavaScript BigInt (native BigInt type)
- Parameterized queries use $1, $2, ... placeholders
- SSH tunnels are handled by the SshTunnel class (automatic when useSshTunnel: true)
- Query timeout is configurable (timeoutSeconds in credentials)

Usage Examples:
// Note: PostgresClient IS exported, but most users should use concrete implementations
// Use PostgresWarehouseClient or RedshiftWarehouseClient for typical use cases
import { PostgresClient, PostgresWarehouseClient } from "@lightdash/warehouses";
import type { CreatePostgresCredentials } from "@lightdash/common";
// Create PostgresWarehouseClient (which extends PostgresClient)
const client = new PostgresWarehouseClient({
type: "postgres",
host: "localhost",
port: 5432,
user: "dbuser",
password: "dbpass",
dbname: "analytics",
schema: "public",
sslmode: "disable",
startOfWeek: null,
});
// Use static method to convert field types (inherited from PostgresClient)
const fieldTypes = PostgresWarehouseClient.convertQueryResultFields(pgResult.fields);
console.log(fieldTypes);
// { user_id: { type: 'number' }, created_at: { type: 'timestamp' } }PostgreSQL warehouse client implementation with SSL modes and connection pooling.
/**
* PostgreSQL warehouse client with SSH tunnel support
* Supports multiple SSL modes including AWS RDS certificate verification
* Extends PostgresClient base class with all standard WarehouseClient methods
*/
class PostgresWarehouseClient extends PostgresClient<CreatePostgresCredentials> {
/**
* Initialize PostgreSQL client with credentials
* Configures SSL based on sslmode, includes AWS RDS CA certificates
* @param credentials - PostgreSQL credentials with host, port, database, and SSL config
*/
constructor(credentials: CreatePostgresCredentials);
}

Public Methods:
PostgresWarehouseClient implements all standard WarehouseClient methods:
- test() - Test database connection
- streamQuery() - Stream query results with cursors
- getCatalog() - Get table metadata for specific tables
- getAllTables() - Get all tables in the database
- getFields() - Get column information for a table
- parseError() - Parse PostgreSQL errors with enhanced messages
- parseWarehouseCatalog() - Parse raw rows into warehouse catalog structure
- runQuery() - NOT RECOMMENDED: Execute query and return all results at once (use streamQuery() instead to avoid loading all results into memory)
- executeAsyncQuery() - Execute async query (fallback to streaming)
- getAsyncQueryResults() - NOT SUPPORTED: Throws NotImplementedError (inherited from base class - native pagination not supported)

Note: The runQuery() method is not recommended for new code. Use streamQuery() for all implementations to avoid memory issues with large result sets.
See Core Types for complete WarehouseClient interface documentation.
Usage Examples:
import { PostgresWarehouseClient } from "@lightdash/warehouses";
// Basic connection without SSL
const client = new PostgresWarehouseClient({
type: "postgres",
host: "localhost",
port: 5432,
user: "myuser",
password: "mypassword",
dbname: "analytics",
schema: "public",
sslmode: "disable",
startOfWeek: null,
});
// AWS RDS with SSL verification
const rdsClient = new PostgresWarehouseClient({
type: "postgres",
host: "my-db.abc123.us-east-1.rds.amazonaws.com",
port: 5432,
user: "admin",
password: "secure-password",
dbname: "production",
schema: "analytics",
sslmode: "verify-full",
startOfWeek: null,
});
// With SSH tunnel
const tunnelClient = new PostgresWarehouseClient({
type: "postgres",
host: "internal-db.local",
port: 5432,
user: "dbuser",
password: "dbpass",
dbname: "warehouse",
schema: "reporting",
sslmode: "disable",
startOfWeek: null,
useSshTunnel: true,
sshTunnelHost: "bastion.example.com",
sshTunnelPort: 22,
sshTunnelUser: "sshuser",
sshTunnelPrivateKey: "-----BEGIN RSA PRIVATE KEY-----\n...",
});
// Test connection
await client.test();
// Stream query results with cursor
await client.streamQuery(
"SELECT * FROM users WHERE created_at >= $1 ORDER BY created_at",
(data) => {
console.log("Received batch:", data.rows.length, "rows");
data.rows.forEach((row) => {
console.log(`User ${row.id}: ${row.email}`);
});
},
{
values: ["2023-01-01"],
tags: { source: "lightdash" },
}
);
// Parameterized query with positional parameters ($1, $2, etc.)
const results = await client.runQuery(
"SELECT COUNT(*) as total FROM orders WHERE status = $1 AND created_at >= $2",
{ source: "analytics" },
"UTC",
["completed", "2023-01-01"]
);
console.log("Total orders:", results.rows[0].total);
// Get all tables excluding system schemas
const tables = await client.getAllTables();
console.log("Tables:", tables.map((t) => t.table).join(", "));
// Get table fields
const catalog = await client.getFields("users", "public");
const columns = catalog["current_database"]["public"]["users"];
Object.entries(columns).forEach(([col, type]) => {
console.log(`${col}: ${type}`);
});

SQL builder for PostgreSQL-specific syntax.
/**
* PostgreSQL SQL builder with double-quote quoting and PERCENTILE_CONT
*/
class PostgresSqlBuilder extends WarehouseBaseSqlBuilder {
/** Warehouse type identifier */
type = WarehouseTypes.POSTGRES;
/**
* Get DBT adapter type for PostgreSQL
* @returns SupportedDbtAdapter.POSTGRES
*/
getAdapterType(): SupportedDbtAdapter;
/**
* Get escape string quote character for PostgreSQL
* @returns Single quote character "'"
*/
getEscapeStringQuoteChar(): string;
/**
* Generate SQL for metric aggregation
* Uses PERCENTILE_CONT for percentile/median metrics
* Note: AVERAGE metrics are cast to DOUBLE PRECISION in PostgreSQL
* @param sql - Column SQL expression
* @param metric - Metric definition
* @returns SQL string for the metric aggregation
*/
getMetricSql(sql: string, metric: Metric): string;
/**
* Generate SQL for string concatenation
* Uses PostgreSQL || operator
* @param args - String expressions to concatenate
* @returns SQL concatenation expression
*/
concatString(...args: string[]): string;
/**
* Escape string value for PostgreSQL
* Doubles single quotes for escaping
* Also normalizes unicode characters, escapes backslashes, strips SQL comments (-- and /* */), and removes null bytes
* @param value - Raw string value
* @returns Escaped string safe for PostgreSQL query
*/
escapeString(value: string): string;
}

Usage Examples:
import { PostgresSqlBuilder } from "@lightdash/warehouses";
const builder = new PostgresSqlBuilder();
// Get quote characters
const fieldQuote = builder.getFieldQuoteChar(); // '"'
const fieldName = `${fieldQuote}user_id${fieldQuote}`; // "user_id"
// Escape string with single quotes
const escaped = builder.escapeString("O'Reilly's Book");
// Result: 'O''Reilly''s Book'
// Generate concatenation SQL
const concatSql = builder.concatString('"first_name"', "' '", '"last_name"');
// Result: "first_name" || ' ' || "last_name"
// Generate metric SQL
const medianSql = builder.getMetricSql("price", {
type: MetricType.MEDIAN,
sql: "price",
});
// Result: 'PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY price)'
const percentileSql = builder.getMetricSql("response_time", {
type: MetricType.PERCENTILE,
sql: "response_time",
percentile: 95,
});
// Result: 'PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY response_time)'

PostgreSQL data type constants for field type checking.
// Import from @lightdash/warehouses
import { PostgresTypes } from "@lightdash/warehouses";
/**
* PostgreSQL data type enum
* Covers standard PostgreSQL types and aliases
*/
enum PostgresTypes {
// Integer types
INTEGER = "integer",
INT = "int",
INT2 = "int2",
INT4 = "int4",
INT8 = "int8",
SMALLINT = "smallint",
BIGINT = "bigint",
SMALLSERIAL = "smallserial",
SERIAL = "serial",
SERIAL2 = "serial2",
SERIAL4 = "serial4",
SERIAL8 = "serial8",
BIGSERIAL = "bigserial",
// Numeric types
MONEY = "money",
NUMERIC = "numeric",
DECIMAL = "decimal",
REAL = "real",
FLOAT = "float",
FLOAT4 = "float4",
FLOAT8 = "float8",
DOUBLE_PRECISION = "double precision",
// Boolean
BOOLEAN = "boolean",
BOOL = "bool",
// Date/Time types
DATE = "date",
TIME = "time",
TIME_TZ = "timetz",
TIME_WITHOUT_TIME_ZONE = "time without time zone",
TIMESTAMP = "timestamp",
TIMESTAMP_TZ = "timestamptz",
TIMESTAMP_WITHOUT_TIME_ZONE = "timestamp without time zone",
// String types
CHAR = "char",
CHARACTER = "character",
NCHAR = "nchar",
BPCHAR = "bpchar",
VARCHAR = "varchar",
CHARACTER_VARYING = "character varying",
NVARCHAR = "nvarchar",
TEXT = "text",
// JSON types
JSON = "json",
JSONB = "jsonb",
}

The following methods are inherited from WarehouseBaseClient and WarehouseSqlBuilder, and are available on all PostgreSQL client instances:
/**
* Execute query and return all results in memory
* @deprecated Use streamQuery() to avoid loading all results into memory
*/
runQuery(
sql: string,
tags?: Record<string, string>,
timezone?: string,
values?: AnyType[],
queryParams?: Record<string, AnyType>
): Promise<WarehouseResults>;
/** Get the warehouse adapter type */
getAdapterType(): SupportedDbtAdapter;
/** Get string quote character (single quote) */
getStringQuoteChar(): string;
/** Get escape character for string quotes (backslash in the generic base builder;
 * note PostgresSqlBuilder overrides this to return a single quote — see getEscapeStringQuoteChar above) */
getEscapeStringQuoteChar(): string;
/** Get field/identifier quote character (double quote for PostgreSQL) */
getFieldQuoteChar(): string;
/** Get floating-point type name (DOUBLE PRECISION for PostgreSQL) */
getFloatingType(): string;
/**
* Generate SQL for metric aggregation
* @param sql - Column SQL expression
* @param metric - Metric definition with aggregation type
* @returns SQL string for the metric aggregation
*/
getMetricSql(sql: string, metric: Metric): string;
/** Concatenate strings using warehouse-specific SQL */
concatString(...args: string[]): string;
/** Escape string for SQL safety */
escapeString(value: string): string;
/** Get configured week start day */
getStartOfWeek(): WeekDay | null | undefined;
/** Parse catalog rows into nested structure */
parseWarehouseCatalog(
rows: Record<string, AnyType>[],
mapFieldType: (type: string) => DimensionType
): WarehouseCatalog;

/**
* PostgreSQL warehouse credentials
*/
interface CreatePostgresCredentials extends CreatePostgresLikeCredentials {
/** Warehouse type discriminator */
type: 'postgres';
/** PostgreSQL server hostname or IP */
host: string;
/** PostgreSQL server port (default: 5432) */
port: number;
/** Database username */
user: string;
/** Database password */
password: string;
/** Database name */
dbname: string;
/** Default schema for queries */
schema: string;
/** Number of concurrent query threads (optional) */
threads?: number;
/** TCP keepalive idle time in seconds (optional) */
keepalivesIdle?: number;
/** PostgreSQL search_path setting (optional) */
searchPath?: string;
/** PostgreSQL role to SET ROLE after connection (optional) */
role?: string;
/** Query timeout in seconds (optional) */
timeoutSeconds?: number;
/** SSL mode: disable, no-verify, allow, prefer, require, verify-ca, verify-full */
sslmode: 'disable' | 'no-verify' | 'allow' | 'prefer' | 'require' | 'verify-ca' | 'verify-full';
/** SSL client certificate filename (for reference) (optional) */
sslcertFileName?: string;
/** SSL client certificate file content (PEM format) (optional) */
sslcert?: string | null;
/** SSL client private key filename (for reference) (optional) */
sslkeyFileName?: string;
/** SSL client private key file content (PEM format) (optional) */
sslkey?: string | null;
/** SSL root certificate filename (for reference) (optional) */
sslrootcertFileName?: string;
/** SSL root certificate file content (PEM format) (optional) */
sslrootcert?: string | null;
/** Enable SSH tunnel (optional) */
useSshTunnel?: boolean;
/** SSH tunnel hostname (optional) */
sshTunnelHost?: string;
/** SSH tunnel port (default: 22) (optional) */
sshTunnelPort?: number;
/** SSH tunnel username (optional) */
sshTunnelUser?: string;
/** SSH tunnel public key (if using key-based auth) (optional) */
sshTunnelPublicKey?: string;
/** SSH tunnel private key (PEM format, required if useSshTunnel is true) (optional) */
sshTunnelPrivateKey?: string;
/** Week start day configuration (optional) */
startOfWeek?: WeekDay | null;
/** Require user-provided credentials for queries (optional) */
requireUserCredentials?: boolean;
}
/**
* Base interface for PostgreSQL-like credentials
* Shared by PostgreSQL and Redshift
*/
interface CreatePostgresLikeCredentials {
host: string;
port: number;
user: string;
password: string;
dbname: string;
schema: string;
sslmode: string;
useSshTunnel?: boolean;
sshTunnelHost?: string;
sshTunnelPort?: number;
sshTunnelUser?: string;
sshTunnelPublicKey?: string;
sshTunnelPrivateKey?: string;
startOfWeek?: WeekDay | null;
}
/**
* PostgreSQL connection pool configuration
*/
interface PoolConfig { // pg.PoolConfig, from the 'pg' package (shown here for reference)
host: string;
port: number;
user: string;
password: string;
database: string;
ssl?: boolean | object;
[key: string]: any;
}