PostgreSQL Client

PostgreSQL warehouse client with SSH tunnel support, streaming cursors, parameterized queries, and SSL/TLS configuration including AWS RDS certificate bundles.

Capabilities

PostgresClient Base Class

Generic base class for PostgreSQL-compatible warehouse clients (PostgreSQL and Redshift). Provides core PostgreSQL wire protocol implementation with connection pooling, streaming cursors, and error handling.

PostgresClient vs PostgresWarehouseClient:

  • PostgresClient is the generic base class that implements core PostgreSQL wire protocol functionality
  • PostgresWarehouseClient extends PostgresClient and provides PostgreSQL-specific configuration (SSL modes, connection setup)
  • RedshiftWarehouseClient also extends PostgresClient with Redshift-specific configuration
  • Most users should use PostgresWarehouseClient or RedshiftWarehouseClient for typical database connections
  • Use PostgresClient directly only when building custom PostgreSQL-compatible warehouse clients or when you need access to the generic base class methods

Note: PostgresClient IS exported from @lightdash/warehouses and can be imported directly. It's exported via the standard export * from './warehouseClients/PostgresWarehouseClient' statement in the package index.

// Both PostgresClient and PostgresWarehouseClient are directly exported from the package
import { PostgresClient, PostgresWarehouseClient } from "@lightdash/warehouses";
import type { CreatePostgresLikeCredentials } from "@lightdash/common";
import type { PoolConfig } from "pg";

/**
 * Generic base class for PostgreSQL-compatible warehouse clients
 * Extends WarehouseBaseClient with PostgreSQL-specific functionality
 * @template T - Credentials type extending CreatePostgresLikeCredentials
 * @note PostgresClient IS directly exported from @lightdash/warehouses via export * from the PostgresWarehouseClient module.
 *       However, most users should use the concrete implementations (PostgresWarehouseClient
 *       or RedshiftWarehouseClient) instead.
 */
class PostgresClient<T extends CreatePostgresLikeCredentials> extends WarehouseBaseClient<T> {
  /**
   * PostgreSQL connection pool configuration
   * Includes connection string, SSL settings, and pool parameters
   */
  config: PoolConfig;

  /**
   * Initialize PostgreSQL-compatible client
   * @param credentials - Credentials for PostgreSQL or Redshift
   * @param config - pg.PoolConfig with connection string and SSL configuration
   */
  constructor(credentials: T, config: PoolConfig);

  /**
   * Stream query results using server-side cursors
   * Executes query with DECLARE CURSOR and fetches in batches
   * @param sql - SQL query string with optional $1, $2 parameter placeholders
   * @param streamCallback - Callback invoked for each batch of results
   * @param options - Query options with parameter values, tags, and timezone
   * @returns Promise resolving when all results are streamed
   */
  streamQuery(
    sql: string,
    streamCallback: (data: WarehouseResults) => void,
    options: {
      values?: AnyType[];
      tags?: Record<string, string>;
      timezone?: string;
    }
  ): Promise<void>;

  /**
   * Get table metadata for specific tables
   * Queries information_schema for columns, data types, and constraints
   * @param requests - Array of table identifiers (database, schema, table)
   * @returns Promise resolving to nested catalog structure
   */
  getCatalog(
    requests: Array<{ database: string; schema: string; table: string }>
  ): Promise<WarehouseCatalog>;

  /**
   * Get all BASE TABLEs in the database
   * Queries information_schema.tables for BASE TABLE type only
   * Excludes system tables from information_schema and pg_catalog
   * Note: This method does NOT accept parameters, despite the base interface definition
   * Note: Materialized views are only included in getCatalog(), not getAllTables()
   * @returns Promise resolving to array of table metadata
   */
  getAllTables(): Promise<
    Array<{
      database: string;
      schema: string;
      table: string;
    }>
  >;

  /**
   * Get field metadata for specific table
   * @param tableName - Name of the table
   * @param schema - Optional schema name (uses credentials schema if omitted)
   * @param database - Optional database name (PostgreSQL catalog)
   * @param tags - Optional tags for query tracking
   * @returns Promise resolving to catalog with field information
   */
  getFields(
    tableName: string,
    schema?: string,
    database?: string,
    tags?: Record<string, string>
  ): Promise<WarehouseCatalog>;

  /**
   * Parse PostgreSQL database errors into WarehouseQueryError
   * Converts CTE references and position markers to line numbers
   * @param error - PostgreSQL error from pg driver
   * @param query - Optional SQL query for enhanced error messages (defaults to empty string if not provided)
   * @returns Enhanced error with line numbers and context
   */
  parseError(error: pg.DatabaseError, query?: string): WarehouseQueryError;

  /**
   * Parse raw query results into warehouse catalog structure
   * Inherited from WarehouseBaseClient, transforms rows into nested catalog format
   * @param rows - Raw query result rows containing database/schema/table/column metadata
   * @param mapFieldType - Function to map warehouse-specific types to DimensionType
   * @returns Nested catalog structure organized by database/schema/table/column
   */
  parseWarehouseCatalog(
    rows: Record<string, AnyType>[],
    mapFieldType: (type: string) => DimensionType
  ): WarehouseCatalog;

  /**
   * Convert pg query result field types to DimensionType
   * Maps PostgreSQL OID types (numeric data type identifiers) to Lightdash dimension types
   *
   * Note: This is primarily an internal utility method used by the client implementation.
   * Users typically only need this when working with raw pg driver results outside of
   * the standard streamQuery()/runQuery() methods, which handle type conversion automatically.
   *
   * @param fields - Field metadata from pg QueryResult (array of { name, dataTypeID } objects)
   * @returns Record mapping field names to dimension types
   * @example
   * ```typescript
   * import { PostgresWarehouseClient } from '@lightdash/warehouses';
   * import type { QueryResult } from 'pg';
   *
   * // After executing a raw pg query
   * const pgResult: QueryResult = await pgPool.query('SELECT * FROM users');
   *
   * // Convert field types to Lightdash dimension types
   * const fieldTypes = PostgresWarehouseClient.convertQueryResultFields(pgResult.fields);
   * console.log(fieldTypes);
   * // {
   * //   user_id: { type: DimensionType.NUMBER },
   * //   email: { type: DimensionType.STRING },
   * //   created_at: { type: DimensionType.TIMESTAMP }
   * // }
   * ```
   */
  static convertQueryResultFields(
    fields: QueryResult<AnyType>["fields"]
  ): Record<string, { type: DimensionType }>;
}
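The position-marker conversion that parseError() performs can be sketched as a small helper. This is an illustrative sketch, not the package's actual implementation; the real method also resolves CTE references and builds richer error messages.

```typescript
// Hypothetical sketch of mapping a PostgreSQL error `position` (a 1-based
// character offset into the query text) to a line number, as parseError()
// does when enhancing errors. Not the actual package implementation.
function positionToLine(query: string, position: number): number {
  // Count how many lines precede the error position
  return query.slice(0, position - 1).split("\n").length;
}

const sql = "SELECT id,\n       bad_column\nFROM users";
// pg's DatabaseError exposes `position` as a string; parse it before use
const line = positionToLine(sql, sql.indexOf("bad_column") + 1); // line 2
```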

Implementation Details:

  • Uses pg library connection pooling for efficient resource management
  • Implements server-side cursors for memory-efficient result streaming
  • Automatically handles type conversions (configured globally via pg types.setTypeParser):
    • NUMERIC (OID 1700) → JavaScript number (via parseFloat)
    • INT8 / BIGINT (OID 20) → JavaScript BigInt (native BigInt type)
    • All other types use default pg driver parsing (dates, timestamps, booleans, etc.)
    • These conversions apply to ALL PostgreSQL-based clients (PostgreSQL, Redshift)
  • Supports parameterized queries with $1, $2, ... placeholders
  • SSH tunnel support via SshTunnel class (automatic when useSshTunnel: true)
  • Connection timeout: 5 seconds. Query timeout: 5 minutes (configurable via timeoutSeconds in credentials)
  • Materialized view detection: PostgreSQL 12+ versions only
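The NUMERIC and INT8 conversions listed above can be illustrated with two standalone parser functions. This is a sketch only; the package registers equivalent parsers globally through pg's types.setTypeParser rather than exposing functions like these.

```typescript
// Sketch of the global type conversions described above. In the real package
// these are registered via pg's types.setTypeParser for OID 1700 (NUMERIC)
// and OID 20 (INT8/BIGINT); these functions only mirror that behavior.
const parseNumeric = (value: string): number => parseFloat(value); // NUMERIC → number
const parseInt8 = (value: string): bigint => BigInt(value);        // INT8 → BigInt

const price = parseNumeric("19.99");         // JavaScript number (may lose precision)
const bigId = parseInt8("9007199254740993"); // BigInt preserves values beyond 2^53
```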

Usage Examples:

// Note: PostgresClient IS exported, but most users should use concrete implementations
// Use PostgresWarehouseClient or RedshiftWarehouseClient for typical use cases
import { PostgresClient, PostgresWarehouseClient } from "@lightdash/warehouses";
import type { CreatePostgresCredentials } from "@lightdash/common";

// Create PostgresWarehouseClient (which extends PostgresClient)
const client = new PostgresWarehouseClient({
  type: "postgres",
  host: "localhost",
  port: 5432,
  user: "dbuser",
  password: "dbpass",
  dbname: "analytics",
  schema: "public",
  sslmode: "disable",
  startOfWeek: null,
});

// Use static method to convert field types (inherited from PostgresClient)
const fieldTypes = PostgresWarehouseClient.convertQueryResultFields(pgResult.fields);
console.log(fieldTypes);
// { user_id: { type: 'number' }, created_at: { type: 'timestamp' } }

PostgresWarehouseClient Class

PostgreSQL warehouse client implementation with SSL modes and connection pooling.

/**
 * PostgreSQL warehouse client with SSH tunnel support
 * Supports multiple SSL modes including AWS RDS certificate verification
 * Extends PostgresClient base class with all standard WarehouseClient methods
 */
class PostgresWarehouseClient extends PostgresClient<CreatePostgresCredentials> {
  /**
   * Initialize PostgreSQL client with credentials
   * Configures SSL based on sslmode, includes AWS RDS CA certificates
   * @param credentials - PostgreSQL credentials with host, port, database, and SSL config
   */
  constructor(credentials: CreatePostgresCredentials);
}

Public Methods:

PostgresWarehouseClient implements all standard WarehouseClient methods:

  • test() - Test database connection
  • streamQuery() - Stream query results with cursors
  • getCatalog() - Get table metadata for specific tables
  • getAllTables() - Get all tables in the database
  • getFields() - Get column information for a table
  • parseError() - Parse PostgreSQL errors with enhanced messages
  • parseWarehouseCatalog() - Parse raw rows into warehouse catalog structure
  • runQuery() - NOT RECOMMENDED: executes the query and returns all results at once (use streamQuery() instead to avoid loading all results into memory)
  • executeAsyncQuery() - Execute async query (falls back to streaming)
  • getAsyncQueryResults() - NOT SUPPORTED: throws NotImplementedError (inherited from base class; native pagination is not supported)
  • Plus all WarehouseSqlBuilder methods (getAdapterType, getFieldQuoteChar, escapeString, etc.)

Note: The runQuery() method is not recommended for new code. Use streamQuery() instead to avoid memory issues with large result sets.

See Core Types for complete WarehouseClient interface documentation.

Usage Examples:

import { PostgresWarehouseClient } from "@lightdash/warehouses";

// Basic connection without SSL
const client = new PostgresWarehouseClient({
  type: "postgres",
  host: "localhost",
  port: 5432,
  user: "myuser",
  password: "mypassword",
  dbname: "analytics",
  schema: "public",
  sslmode: "disable",
  startOfWeek: null,
});

// AWS RDS with SSL verification
const rdsClient = new PostgresWarehouseClient({
  type: "postgres",
  host: "my-db.abc123.us-east-1.rds.amazonaws.com",
  port: 5432,
  user: "admin",
  password: "secure-password",
  dbname: "production",
  schema: "analytics",
  sslmode: "verify-full",
  startOfWeek: null,
});

// With SSH tunnel
const tunnelClient = new PostgresWarehouseClient({
  type: "postgres",
  host: "internal-db.local",
  port: 5432,
  user: "dbuser",
  password: "dbpass",
  dbname: "warehouse",
  schema: "reporting",
  sslmode: "disable",
  startOfWeek: null,
  useSshTunnel: true,
  sshTunnelHost: "bastion.example.com",
  sshTunnelPort: 22,
  sshTunnelUser: "sshuser",
  sshTunnelPrivateKey: "-----BEGIN RSA PRIVATE KEY-----\n...",
});

// Test connection
await client.test();

// Stream query results with cursor
await client.streamQuery(
  "SELECT * FROM users WHERE created_at >= $1 ORDER BY created_at",
  (data) => {
    console.log("Received batch:", data.rows.length, "rows");
    data.rows.forEach((row) => {
      console.log(`User ${row.id}: ${row.email}`);
    });
  },
  {
    values: ["2023-01-01"],
    tags: { source: "lightdash" },
  }
);

// Parameterized query with positional parameters ($1, $2, etc.)
const results = await client.runQuery(
  "SELECT COUNT(*) as total FROM orders WHERE status = $1 AND created_at >= $2",
  { source: "analytics" },
  "UTC",
  ["completed", "2023-01-01"]
);
console.log("Total orders:", results.rows[0].total);

// Get all tables excluding system schemas
const tables = await client.getAllTables();
console.log("Tables:", tables.map((t) => t.table).join(", "));

// Get table fields
const catalog = await client.getFields("users", "public");
const columns = catalog["current_database"]["public"]["users"];
Object.entries(columns).forEach(([col, type]) => {
  console.log(`${col}: ${type}`);
});

PostgreSQL SQL Builder

SQL builder for PostgreSQL-specific syntax.

/**
 * PostgreSQL SQL builder with double-quote quoting and PERCENTILE_CONT
 */
class PostgresSqlBuilder extends WarehouseBaseSqlBuilder {
  /** Warehouse type identifier */
  type = WarehouseTypes.POSTGRES;

  /**
   * Get DBT adapter type for PostgreSQL
   * @returns SupportedDbtAdapter.POSTGRES
   */
  getAdapterType(): SupportedDbtAdapter;

  /**
   * Get escape string quote character for PostgreSQL
   * @returns Single quote character "'"
   */
  getEscapeStringQuoteChar(): string;

  /**
   * Generate SQL for metric aggregation
   * Uses PERCENTILE_CONT for percentile/median metrics
   * Note: AVERAGE metrics are cast to DOUBLE PRECISION in PostgreSQL
   * @param sql - Column SQL expression
   * @param metric - Metric definition
   * @returns SQL string for the metric aggregation
   */
  getMetricSql(sql: string, metric: Metric): string;

  /**
   * Generate SQL for string concatenation
   * Uses PostgreSQL || operator
   * @param args - String expressions to concatenate
   * @returns SQL concatenation expression
   */
  concatString(...args: string[]): string;

  /**
   * Escape string value for PostgreSQL
   * Doubles single quotes for escaping
   * Also normalizes unicode characters, escapes backslashes, strips SQL comments (line comments starting with -- and block comments), and removes null bytes
   * @param value - Raw string value
   * @returns Escaped string safe for PostgreSQL query
   */
  escapeString(value: string): string;
}

Usage Examples:

import { PostgresSqlBuilder } from "@lightdash/warehouses";

const builder = new PostgresSqlBuilder();

// Get quote characters
const fieldQuote = builder.getFieldQuoteChar(); // '"'
const fieldName = `${fieldQuote}user_id${fieldQuote}`; // "user_id"

// Escape string with single quotes
const escaped = builder.escapeString("O'Reilly's Book");
// Result: 'O''Reilly''s Book'

// Generate concatenation SQL
const concatSql = builder.concatString('"first_name"', "' '", '"last_name"');
// Result: "first_name" || ' ' || "last_name"

// Generate metric SQL
const medianSql = builder.getMetricSql("price", {
  type: MetricType.MEDIAN,
  sql: "price",
});
// Result: 'PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY price)'

const percentileSql = builder.getMetricSql("response_time", {
  type: MetricType.PERCENTILE,
  sql: "response_time",
  percentile: 95,
});
// Result: 'PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY response_time)'
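The quote-doubling part of escapeString() shown above can be sketched in isolation. Note this is only one step of the behavior documented earlier; the real method also normalizes unicode, escapes backslashes, strips SQL comments, and removes null bytes.

```typescript
// Minimal sketch of single-quote doubling only; not the full escapeString()
// behavior described earlier in this document.
const doubleSingleQuotes = (value: string): string => value.replace(/'/g, "''");

const escapedValue = doubleSingleQuotes("O'Reilly's Book"); // O''Reilly''s Book
```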

PostgreSQL Types

PostgreSQL data type constants for field type checking.

// Import from @lightdash/warehouses
import { PostgresTypes } from "@lightdash/warehouses";

/**
 * PostgreSQL data type enum
 * Covers standard PostgreSQL types and aliases
 */

enum PostgresTypes {
  // Integer types
  INTEGER = "integer",
  INT = "int",
  INT2 = "int2",
  INT4 = "int4",
  INT8 = "int8",
  SMALLINT = "smallint",
  BIGINT = "bigint",
  SMALLSERIAL = "smallserial",
  SERIAL = "serial",
  SERIAL2 = "serial2",
  SERIAL4 = "serial4",
  SERIAL8 = "serial8",
  BIGSERIAL = "bigserial",

  // Numeric types
  MONEY = "money",
  NUMERIC = "numeric",
  DECIMAL = "decimal",
  REAL = "real",
  FLOAT = "float",
  FLOAT4 = "float4",
  FLOAT8 = "float8",
  DOUBLE_PRECISION = "double precision",

  // Boolean
  BOOLEAN = "boolean",
  BOOL = "bool",

  // Date/Time types
  DATE = "date",
  TIME = "time",
  TIME_TZ = "timetz",
  TIME_WITHOUT_TIME_ZONE = "time without time zone",
  TIMESTAMP = "timestamp",
  TIMESTAMP_TZ = "timestamptz",
  TIMESTAMP_WITHOUT_TIME_ZONE = "timestamp without time zone",

  // String types
  CHAR = "char",
  CHARACTER = "character",
  NCHAR = "nchar",
  BPCHAR = "bpchar",
  VARCHAR = "varchar",
  CHARACTER_VARYING = "character varying",
  NVARCHAR = "nvarchar",
  TEXT = "text",

  // JSON types
  JSON = "json",
  JSONB = "jsonb",
}
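As an illustration of how these enum values can be used, a hypothetical classifier (not part of the package API) might group them into broad categories using the string values above:

```typescript
// Hypothetical helper, not exported by @lightdash/warehouses: classify a
// PostgresTypes string value into a broad category.
type Category = "number" | "string" | "boolean" | "datetime" | "json" | "other";

const NUMBER_TYPES = new Set([
  "integer", "int", "int2", "int4", "int8", "smallint", "bigint",
  "smallserial", "serial", "serial2", "serial4", "serial8", "bigserial",
  "money", "numeric", "decimal", "real", "float", "float4", "float8",
  "double precision",
]);

const STRING_TYPES = new Set([
  "char", "character", "nchar", "bpchar", "varchar", "character varying",
  "nvarchar", "text",
]);

function categorize(pgType: string): Category {
  const t = pgType.toLowerCase();
  if (NUMBER_TYPES.has(t)) return "number";
  if (t === "boolean" || t === "bool") return "boolean";
  // Covers date, time, timetz, timestamp, timestamptz, and the verbose aliases
  if (t === "date" || t.startsWith("time")) return "datetime";
  if (t === "json" || t === "jsonb") return "json";
  if (STRING_TYPES.has(t)) return "string";
  return "other";
}
```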

Inherited Methods

The following methods are inherited from WarehouseBaseClient and WarehouseSqlBuilder, and are available on all PostgreSQL client instances:

/**
 * Execute query and return all results in memory
 * @deprecated Use streamQuery() to avoid loading all results into memory
 */
runQuery(
  sql: string,
  tags?: Record<string, string>,
  timezone?: string,
  values?: AnyType[],
  queryParams?: Record<string, AnyType>
): Promise<WarehouseResults>;

/** Get the warehouse adapter type */
getAdapterType(): SupportedDbtAdapter;

/** Get string quote character (single quote) */
getStringQuoteChar(): string;

/** Get escape character for string quotes (single quote for PostgreSQL) */
getEscapeStringQuoteChar(): string;

/** Get field/identifier quote character (double quote for PostgreSQL) */
getFieldQuoteChar(): string;

/** Get floating-point type name (DOUBLE PRECISION for PostgreSQL) */
getFloatingType(): string;

/**
 * Generate SQL for metric aggregation
 * @param sql - Column SQL expression
 * @param metric - Metric definition with aggregation type
 * @returns SQL string for the metric aggregation
 */
getMetricSql(sql: string, metric: Metric): string;

/** Concatenate strings using warehouse-specific SQL */
concatString(...args: string[]): string;

/** Escape string for SQL safety */
escapeString(value: string): string;

/** Get configured week start day */
getStartOfWeek(): WeekDay | null | undefined;

/** Parse catalog rows into nested structure */
parseWarehouseCatalog(
  rows: Record<string, AnyType>[],
  mapFieldType: (type: string) => DimensionType
): WarehouseCatalog;
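As a sketch of concatString() for PostgreSQL, which the builder docs above say uses the || operator, the behavior can be approximated as follows. This is illustrative only; the actual method lives on the builder classes and may differ in argument handling.

```typescript
// Illustrative approximation of PostgreSQL string concatenation via ||,
// matching the concatString() result shown in the builder usage examples.
const concatWithPipes = (...args: string[]): string => args.join(" || ");

const fullName = concatWithPipes('"first_name"', "' '", '"last_name"');
// "first_name" || ' ' || "last_name"
```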

Types

/**
 * PostgreSQL warehouse credentials
 */
interface CreatePostgresCredentials extends CreatePostgresLikeCredentials {
  /** Warehouse type discriminator */
  type: 'postgres';
  /** PostgreSQL server hostname or IP */
  host: string;
  /** PostgreSQL server port (default: 5432) */
  port: number;
  /** Database username */
  user: string;
  /** Database password */
  password: string;
  /** Database name */
  dbname: string;
  /** Default schema for queries */
  schema: string;
  /** Number of concurrent query threads (optional) */
  threads?: number;
  /** TCP keepalive idle time in seconds (optional) */
  keepalivesIdle?: number;
  /** PostgreSQL search_path setting (optional) */
  searchPath?: string;
  /** PostgreSQL role to SET ROLE after connection (optional) */
  role?: string;
  /** Query timeout in seconds (optional) */
  timeoutSeconds?: number;
  /** SSL mode: disable, no-verify, allow, prefer, require, verify-ca, verify-full */
  sslmode: 'disable' | 'no-verify' | 'allow' | 'prefer' | 'require' | 'verify-ca' | 'verify-full';
  /** SSL client certificate filename (for reference) (optional) */
  sslcertFileName?: string;
  /** SSL client certificate file content (PEM format) (optional) */
  sslcert?: string | null;
  /** SSL client private key filename (for reference) (optional) */
  sslkeyFileName?: string;
  /** SSL client private key file content (PEM format) (optional) */
  sslkey?: string | null;
  /** SSL root certificate filename (for reference) (optional) */
  sslrootcertFileName?: string;
  /** SSL root certificate file content (PEM format) (optional) */
  sslrootcert?: string | null;
  /** Enable SSH tunnel (optional) */
  useSshTunnel?: boolean;
  /** SSH tunnel hostname (optional) */
  sshTunnelHost?: string;
  /** SSH tunnel port (default: 22) (optional) */
  sshTunnelPort?: number;
  /** SSH tunnel username (optional) */
  sshTunnelUser?: string;
  /** SSH tunnel public key (if using key-based auth) (optional) */
  sshTunnelPublicKey?: string;
  /** SSH tunnel private key (PEM format, required if useSshTunnel is true) (optional) */
  sshTunnelPrivateKey?: string;
  /** Week start day configuration (optional) */
  startOfWeek?: WeekDay | null;
  /** Require user-provided credentials for queries (optional) */
  requireUserCredentials?: boolean;
}

/**
 * Base interface for PostgreSQL-like credentials
 * Shared by PostgreSQL and Redshift
 */
interface CreatePostgresLikeCredentials {
  host: string;
  port: number;
  user: string;
  password: string;
  dbname: string;
  schema: string;
  sslmode: string;
  useSshTunnel?: boolean;
  sshTunnelHost?: string;
  sshTunnelPort?: number;
  sshTunnelUser?: string;
  sshTunnelPublicKey?: string;
  sshTunnelPrivateKey?: string;
  startOfWeek?: WeekDay | null;
}

/**
 * PostgreSQL connection pool configuration
 * (simplified view of the pg library's PoolConfig interface)
 */
interface PoolConfig {
  host: string;
  port: number;
  user: string;
  password: string;
  database: string;
  ssl?: boolean | object;
  [key: string]: any;
}

PostgreSQL-Specific Features

  • SSH Tunnel Support: Secure connections through SSH bastion hosts
  • SSL/TLS Modes: Multiple SSL modes including certificate verification
  • AWS RDS Certificates: Built-in AWS RDS CA certificate bundles for SSL verification
  • Streaming Cursors: Efficient streaming of large result sets using pg-cursor
  • Parameterized Queries: Support for both positional ($1, $2) and named (:param) parameters
  • Connection Pooling: Built-in connection pooling for performance
  • Materialized Views: Support for PostgreSQL 12+ materialized views in catalog
  • Error Line Numbers: Extract line and character position from PostgreSQL errors
  • System Schema Filtering: Automatically filters pg_% and information_schema from table listings
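The system-schema filtering in the last bullet can be sketched as a predicate. This is a hypothetical helper mirroring the described behavior, not a package export.

```typescript
// Hypothetical predicate matching the filtering described above: excludes
// information_schema and any schema whose name starts with pg_ (e.g. pg_catalog).
const isSystemSchema = (schema: string): boolean =>
  schema === "information_schema" || schema.startsWith("pg_");

const userSchemas = ["public", "pg_catalog", "information_schema", "analytics"]
  .filter((s) => !isSystemSchema(s)); // ["public", "analytics"]
```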