Common interface for 7 warehouse types: BigQuery, Snowflake, Redshift, Databricks, PostgreSQL, Trino, and ClickHouse.
Lightdash provides a unified warehouse abstraction layer that behaves consistently across multiple data warehouses. This abstraction enables consistent query execution, SQL generation, and catalog discovery across every supported warehouse.
The warehouse integration layer serves as the foundation for Lightdash's ability to run analytics queries across diverse data platforms while providing a consistent developer experience.
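For example, the same query-execution code can target any supported warehouse through the WarehouseClient interface. A minimal sketch (obtaining a concrete client instance is backend-specific and not shown here):

import { type WarehouseClient, QueryExecutionContext } from '@lightdash/common';
// Works unchanged for BigQuery, Snowflake, PostgreSQL, and the rest:
// only the client implementation behind the interface differs.
async function countRows(client: WarehouseClient, table: string): Promise<number> {
    const q = client.getFieldQuoteChar();
    const results = await client.runQuery(
        `SELECT COUNT(*) AS row_count FROM ${q}${table}${q}`,
        { query_context: QueryExecutionContext.EXPLORE }
    );
    return Number(results.rows[0]?.row_count ?? 0);
}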
import {
// Warehouse Types
WarehouseTypes,
type CreateWarehouseCredentials,
type WarehouseCredentials,
type WarehouseClient,
type WarehouseSqlBuilder,
// Query Execution
type RunQueryTags,
type WarehouseResults,
type WarehouseExecuteAsyncQueryArgs,
type WarehouseExecuteAsyncQuery,
type WarehouseGetAsyncQueryResultsArgs,
type WarehouseGetAsyncQueryResults,
// Catalog & Schema
type WarehouseCatalog,
type WarehouseTableSchema,
type WarehouseTables,
type WarehouseTablesCatalog,
// Pagination
type WarehousePaginationArgs,
// Partitioning
PartitionType,
type PartitionColumn,
// Authentication Types
BigqueryAuthenticationType,
SnowflakeAuthenticationType,
DatabricksAuthenticationType,
// Specific Credentials
type CreateBigqueryCredentials,
type CreateSnowflakeCredentials,
type CreatePostgresCredentials,
type CreateDatabricksCredentials,
type CreateRedshiftCredentials,
type CreateTrinoCredentials,
type CreateClickhouseCredentials,
// Connection Configurations
type SshTunnelConfiguration,
type SslConfiguration,
// Utility Functions
getFieldQuoteChar,
getAggregatedField,
maybeOverrideWarehouseConnection,
mergeWarehouseCredentials,
// API Response Types
type ApiWarehouseCatalog,
type ApiWarehouseTablesCatalog,
type ApiWarehouseTableFields,
// Context
QueryExecutionContext,
// DBT Adapters
SupportedDbtAdapter,
} from '@lightdash/common';

Lightdash supports 7 production data warehouses:
enum WarehouseTypes {
BIGQUERY = 'bigquery',
POSTGRES = 'postgres',
REDSHIFT = 'redshift',
SNOWFLAKE = 'snowflake',
DATABRICKS = 'databricks',
TRINO = 'trino',
CLICKHOUSE = 'clickhouse',
}

Each warehouse type corresponds to a dbt adapter:
enum SupportedDbtAdapter {
BIGQUERY = 'bigquery',
DATABRICKS = 'databricks',
SNOWFLAKE = 'snowflake',
REDSHIFT = 'redshift',
POSTGRES = 'postgres',
TRINO = 'trino',
CLICKHOUSE = 'clickhouse',
}

Base interface for warehouse-specific SQL generation.
interface WarehouseSqlBuilder {
/**
* Get the start of week day for this warehouse
* @returns WeekDay enum value, null, or undefined
*/
getStartOfWeek(): WeekDay | null | undefined;
/**
* Get the warehouse adapter type
* @returns SupportedDbtAdapter enum value
*/
getAdapterType(): SupportedDbtAdapter;
/**
* Get the character used to quote string literals in SQL
* @returns Quote character (typically single quote)
*/
getStringQuoteChar(): string;
/**
* Get the character used to escape string quotes
* @returns Escape character for strings
*/
getEscapeStringQuoteChar(): string;
/**
* Get the character used to quote field/column names
* @returns Quote character (backtick for BigQuery/Databricks, double quote for others)
*/
getFieldQuoteChar(): string;
/**
* Get the SQL type name for floating point numbers
* @returns Type name like 'FLOAT' or 'DOUBLE'
*/
getFloatingType(): string;
/**
* Generate warehouse-specific SQL for a metric
* @param sql - Base SQL expression
* @param metric - Metric definition with type and formatting
* @returns Warehouse-specific SQL string
*/
getMetricSql(sql: string, metric: Metric): string;
/**
* Concatenate strings using warehouse-specific syntax
* @param args - String expressions to concatenate
* @returns SQL concatenation expression
*/
concatString(...args: string[]): string;
/**
* Escape a string value for safe SQL inclusion
* @param value - String to escape
* @returns Escaped string safe for SQL
*/
escapeString(value: string): string;
}

Main interface for warehouse operations; it extends WarehouseSqlBuilder.
interface WarehouseClient extends WarehouseSqlBuilder {
/**
* Warehouse credentials for connection
*/
credentials: CreateWarehouseCredentials;
/**
* Test the warehouse connection
* @throws Error if connection fails
* @returns Promise that resolves when connection succeeds
*/
test(): Promise<void>;
/**
* Get catalog information for specified database objects
* @param config - Array of database/schema/table specifications
* @returns Promise resolving to warehouse catalog with schema information
*/
getCatalog(
config: Array<{
database: string;
schema: string;
table: string;
}>
): Promise<WarehouseCatalog>;
/**
* Get async query results with pagination support
* @param args - Query arguments including query ID and pagination settings
* @param rowFormatter - Optional function to format each row
* @returns Promise resolving to paginated query results
*/
getAsyncQueryResults<TFormattedRow extends Record<string, unknown>>(
args: WarehouseGetAsyncQueryResultsArgs,
rowFormatter?: (row: Record<string, unknown>) => TFormattedRow
): Promise<WarehouseGetAsyncQueryResults<TFormattedRow>>;
/**
* Stream query results to avoid loading all data into memory
* @param query - SQL query to execute
* @param streamCallback - Callback invoked for each batch of results
* @param options - Query options including values, tags (required), and timezone
* @returns Promise that resolves when streaming completes
*/
streamQuery(
query: string,
streamCallback: (data: WarehouseResults) => void,
options: {
values?: AnyType[];
tags: Record<string, string>;
timezone?: string;
}
): Promise<void>;
/**
* Execute async query with optional results streaming
* @param args - Async query execution arguments with SQL, tags, timezone, and parameters
* @param resultsStreamCallback - Optional callback for streaming results as they arrive
* @returns Promise resolving to query ID, metadata, total rows, and duration
*/
executeAsyncQuery(
args: WarehouseExecuteAsyncQueryArgs,
resultsStreamCallback?: (
rows: WarehouseResults['rows'],
fields: WarehouseResults['fields']
) => void
): Promise<WarehouseExecuteAsyncQuery>;
/**
* Run a query and return all results
* @deprecated Use streamQuery() instead to avoid loading all results into memory
* @param sql - SQL query to execute
* @param tags - Query tracking tags for logging (required)
* @param timezone - Optional timezone for date/time values
* @param values - Optional parameter values for parameterized queries
* @returns Promise resolving to query results with fields and rows
*/
runQuery(
sql: string,
tags: Record<string, string>,
timezone?: string,
values?: AnyType[]
): Promise<WarehouseResults>;
/**
* Get all tables in the warehouse
* @param schema - Optional schema filter
* @param tags - Optional query tags for tracking
* @returns Promise resolving to list of tables with database, schema, table names, and partition info
*/
getAllTables(
schema?: string,
tags?: Record<string, string>
): Promise<WarehouseTables>;
/**
* Get field information for a specific table
* @param tableName - Name of the table
* @param schema - Optional schema name
* @param database - Optional database name
* @param tags - Optional query tags for tracking
* @returns Promise resolving to table schema with column types
*/
getFields(
tableName: string,
schema?: string,
database?: string,
tags?: Record<string, string>
): Promise<WarehouseCatalog>;
/**
* Parse warehouse catalog rows into standardized format
* @param rows - Raw catalog rows from warehouse
* @param mapFieldType - Function to map warehouse types to Lightdash dimension types
* @returns Structured warehouse catalog
*/
parseWarehouseCatalog(
rows: Record<string, AnyType>[],
mapFieldType: (type: string) => DimensionType
): WarehouseCatalog;
/**
* Parse and standardize warehouse-specific errors
* @param error - Original error from warehouse
* @returns Standardized error object
*/
parseError(error: Error): Error;
/**
* Escape a string value for safe SQL inclusion
* @param value - String to escape
* @returns Escaped string safe for SQL
*/
escapeString(value: string): string;
}

Types for query results and catalog information.
type WarehouseResults = {
fields: Record<string, { type: DimensionType }>;
rows: Record<string, AnyType>[];
};
type WarehouseTableSchema = {
[column: string]: DimensionType;
};
type WarehouseCatalog = {
[database: string]: {
[schema: string]: {
[table: string]: WarehouseTableSchema;
};
};
};
type WarehouseTables = Array<{
database: string;
schema: string;
table: string;
partitionColumn?: PartitionColumn;
}>;
type WarehouseTablesCatalog = {
[database: string]: {
[schema: string]: {
[table: string]: { partitionColumn?: PartitionColumn };
};
};
};

Tags for query execution tracking and logging.
type RunQueryTags = {
project_uuid?: string;
user_uuid?: string;
organization_uuid?: string;
chart_uuid?: string;
dashboard_uuid?: string;
explore_name?: string;
query_context: QueryExecutionContext;
};

The query_context field uses the QueryExecutionContext enum:
enum QueryExecutionContext {
DASHBOARD = 'dashboardView',
AUTOREFRESHED_DASHBOARD = 'autorefreshedDashboard',
EXPLORE = 'exploreView',
FILTER_AUTOCOMPLETE = 'filterAutocomplete',
CHART = 'chartView',
CHART_HISTORY = 'chartHistory',
SQL_CHART = 'sqlChartView',
SQL_RUNNER = 'sqlRunner',
VIEW_UNDERLYING_DATA = 'viewUnderlyingData',
ALERT = 'alert',
SCHEDULED_DELIVERY = 'scheduledDelivery',
CSV = 'csvDownload',
GSHEETS = 'gsheets',
SCHEDULED_GSHEETS_CHART = 'scheduledGsheetsChart',
// ... additional contexts
}

type WarehousePaginationArgs = {
page: number;
pageSize: number;
};

type WarehouseExecuteAsyncQueryArgs = {
tags: Record<string, string>;
timezone?: string;
values?: AnyType[]; // same as queryParams but in array form
queryParams?: Record<string, AnyType>; // same as values but in object form
sql: string;
};
type WarehouseExecuteAsyncQuery = {
queryId: string | null;
queryMetadata: WarehouseQueryMetadata | null;
totalRows: number;
durationMs: number;
};
type WarehouseGetAsyncQueryResultsArgs = WarehousePaginationArgs & {
sql: string;
queryId: string | null;
queryMetadata: WarehouseQueryMetadata | null;
};
type WarehouseGetAsyncQueryResults<TFormattedRow extends Record<string, unknown>> = {
queryId: string | null;
fields: Record<string, { type: DimensionType }>;
pageCount: number;
totalRows: number;
rows: TFormattedRow[];
};

Support for partitioned tables.
enum PartitionType {
DATE = 'DATE',
RANGE = 'RANGE',
}
type PartitionColumn = {
partitionType: PartitionType;
field: string;
};

Types for warehouse-related API endpoints.
type ApiWarehouseCatalog = {
status: 'ok';
results: WarehouseCatalog;
};
type ApiWarehouseTablesCatalog = {
status: 'ok';
results: WarehouseTablesCatalog;
};
type ApiWarehouseTableFields = {
status: 'ok';
results: WarehouseTableSchema;
};

Each warehouse type has specific credential configurations with multiple authentication methods.
enum BigqueryAuthenticationType {
SSO = 'sso',
PRIVATE_KEY = 'private_key',
ADC = 'adc', // Application Default Credentials
}
type CreateBigqueryCredentials = {
type: WarehouseTypes.BIGQUERY;
project: string;
dataset: string;
threads?: number;
timeoutSeconds: number | undefined;
priority: 'interactive' | 'batch' | undefined;
authenticationType?: BigqueryAuthenticationType;
keyfileContents: Record<string, string>; // used for both sso and private key
requireUserCredentials?: boolean;
retries: number | undefined;
location: string | undefined;
maximumBytesBilled: number | undefined;
startOfWeek?: WeekDay | null;
executionProject?: string;
};
type BigqueryCredentials = Omit<CreateBigqueryCredentials, SensitiveCredentialsFieldNames>;

Authentication Methods:
- BigqueryAuthenticationType.SSO: single sign-on; resulting credentials are stored in keyfileContents
- BigqueryAuthenticationType.PRIVATE_KEY: service account key file supplied via keyfileContents
- BigqueryAuthenticationType.ADC: Application Default Credentials from the environment
Example Configuration:
const bigqueryConfig: CreateBigqueryCredentials = {
type: WarehouseTypes.BIGQUERY,
project: 'my-gcp-project',
dataset: 'analytics',
authenticationType: BigqueryAuthenticationType.PRIVATE_KEY,
keyfileContents: {
type: 'service_account',
project_id: 'my-gcp-project',
private_key_id: 'key-id',
private_key: '-----BEGIN PRIVATE KEY-----\n...',
client_email: 'service-account@project.iam.gserviceaccount.com',
// ... other service account fields
},
timeoutSeconds: 300,
priority: 'interactive',
location: 'US',
startOfWeek: WeekDay.MONDAY,
};

enum SnowflakeAuthenticationType {
PASSWORD = 'password',
PRIVATE_KEY = 'private_key',
SSO = 'sso',
EXTERNAL_BROWSER = 'external_browser',
}
type CreateSnowflakeCredentials = {
type: WarehouseTypes.SNOWFLAKE;
account: string;
user: string;
password?: string;
requireUserCredentials?: boolean;
privateKey?: string;
privateKeyPass?: string;
authenticationType?: SnowflakeAuthenticationType;
refreshToken?: string; // Refresh token for sso, used to generate new access token
token?: string; // Access token for sso, has low expiry time
role?: string;
database: string;
warehouse: string;
schema: string;
threads?: number;
clientSessionKeepAlive?: boolean;
queryTag?: string;
accessUrl?: string;
startOfWeek?: WeekDay | null;
quotedIdentifiersIgnoreCase?: boolean;
disableTimestampConversion?: boolean; // Disable timestamp conversion to UTC - only disable if all timestamp values are already in UTC
override?: boolean;
organizationWarehouseCredentialsUuid?: string;
};
type SnowflakeCredentials = Omit<CreateSnowflakeCredentials, SensitiveCredentialsFieldNames>;

Authentication Methods:
- SnowflakeAuthenticationType.PASSWORD: username and password
- SnowflakeAuthenticationType.PRIVATE_KEY: key-pair authentication via privateKey and privateKeyPass
- SnowflakeAuthenticationType.SSO: OAuth single sign-on using token and refreshToken
- SnowflakeAuthenticationType.EXTERNAL_BROWSER: browser-based external authentication
Example Configuration:
const snowflakeConfig: CreateSnowflakeCredentials = {
type: WarehouseTypes.SNOWFLAKE,
account: 'xy12345.us-east-1',
user: 'lightdash_service',
authenticationType: SnowflakeAuthenticationType.PASSWORD,
password: 'secure-password',
database: 'ANALYTICS_DB',
warehouse: 'COMPUTE_WH',
schema: 'PUBLIC',
role: 'LIGHTDASH_ROLE',
clientSessionKeepAlive: true,
queryTag: 'lightdash',
startOfWeek: WeekDay.SUNDAY,
quotedIdentifiersIgnoreCase: true,
};

type SslConfiguration = {
sslmode?: string;
sslcertFileName?: string;
sslcert?: string | null; // file content
sslkeyFileName?: string;
sslkey?: string | null; // file content
sslrootcertFileName?: string;
sslrootcert?: string | null; // file content
};
type CreatePostgresCredentials = SshTunnelConfiguration & SslConfiguration & {
type: WarehouseTypes.POSTGRES;
host: string;
user: string;
password: string;
requireUserCredentials?: boolean;
port: number;
dbname: string;
schema: string;
threads?: number;
keepalivesIdle?: number;
searchPath?: string;
role?: string;
startOfWeek?: WeekDay | null;
timeoutSeconds?: number;
};
type PostgresCredentials = Omit<CreatePostgresCredentials, SensitiveCredentialsFieldNames>;

Example Configuration:
const postgresConfig: CreatePostgresCredentials = {
type: WarehouseTypes.POSTGRES,
host: 'postgres.example.com',
port: 5432,
user: 'analytics_user',
password: 'secure-password',
dbname: 'analytics',
schema: 'public',
sslmode: 'require',
keepalivesIdle: 30,
timeoutSeconds: 300,
startOfWeek: WeekDay.MONDAY,
};

enum DatabricksAuthenticationType {
PERSONAL_ACCESS_TOKEN = 'personal_access_token',
OAUTH_M2M = 'oauth_m2m',
OAUTH_U2M = 'oauth_u2m',
}
type CreateDatabricksCredentials = {
type: WarehouseTypes.DATABRICKS;
catalog?: string;
database: string; // actually schema, but kept for backwards compatibility
serverHostName: string;
httpPath: string;
authenticationType?: DatabricksAuthenticationType;
personalAccessToken?: string; // optional when using OAuth
refreshToken?: string; // refresh token for OAuth, used to generate new access token
token?: string; // access token for OAuth, has low expiry time (1 hour)
oauthClientId?: string; // OAuth M2M client ID (Service Principal)
oauthClientSecret?: string; // OAuth M2M client secret (Service Principal)
requireUserCredentials?: boolean;
startOfWeek?: WeekDay | null;
compute?: Array<{
name: string;
httpPath: string;
}>;
};
type DatabricksCredentials = Omit<CreateDatabricksCredentials, SensitiveCredentialsFieldNames>;

Authentication Methods:
- DatabricksAuthenticationType.PERSONAL_ACCESS_TOKEN: personal access token via personalAccessToken
- DatabricksAuthenticationType.OAUTH_M2M: machine-to-machine OAuth using oauthClientId and oauthClientSecret (Service Principal)
- DatabricksAuthenticationType.OAUTH_U2M: user-to-machine OAuth using token and refreshToken
Example Configuration:
const databricksConfig: CreateDatabricksCredentials = {
type: WarehouseTypes.DATABRICKS,
serverHostName: 'dbc-12345678-abcd.cloud.databricks.com',
httpPath: '/sql/1.0/warehouses/abc123def456',
authenticationType: DatabricksAuthenticationType.PERSONAL_ACCESS_TOKEN,
personalAccessToken: 'dapi1234567890abcdef',
catalog: 'main',
database: 'default', // this is actually the schema
startOfWeek: WeekDay.MONDAY,
compute: [
{
name: 'SQL Warehouse',
httpPath: '/sql/1.0/warehouses/abc123def456',
},
],
};

type CreateRedshiftCredentials = SshTunnelConfiguration & {
type: WarehouseTypes.REDSHIFT;
host: string;
user: string;
password: string;
requireUserCredentials?: boolean;
port: number;
dbname: string;
schema: string;
threads?: number;
keepalivesIdle?: number;
sslmode?: string;
ra3Node?: boolean;
startOfWeek?: WeekDay | null;
timeoutSeconds?: number;
};
type RedshiftCredentials = Omit<CreateRedshiftCredentials, SensitiveCredentialsFieldNames>;

Example Configuration:
const redshiftConfig: CreateRedshiftCredentials = {
type: WarehouseTypes.REDSHIFT,
host: 'redshift-cluster.abc123.us-east-1.redshift.amazonaws.com',
port: 5439,
user: 'analytics_user',
password: 'secure-password',
dbname: 'analytics',
schema: 'public',
sslmode: 'require',
ra3Node: true,
keepalivesIdle: 30,
timeoutSeconds: 300,
startOfWeek: WeekDay.MONDAY,
};

type CreateTrinoCredentials = {
type: WarehouseTypes.TRINO;
host: string;
user: string;
password: string;
requireUserCredentials?: boolean;
port: number;
dbname: string;
schema: string;
http_scheme: string;
source?: string;
startOfWeek?: WeekDay | null;
};
type TrinoCredentials = Omit<CreateTrinoCredentials, SensitiveCredentialsFieldNames>;

Example Configuration:
const trinoConfig: CreateTrinoCredentials = {
type: WarehouseTypes.TRINO,
host: 'trino.example.com',
port: 8080,
user: 'analytics_user',
password: 'secure-password',
dbname: 'hive',
schema: 'default',
http_scheme: 'https',
source: 'lightdash',
startOfWeek: WeekDay.MONDAY,
};

type CreateClickhouseCredentials = {
type: WarehouseTypes.CLICKHOUSE;
host: string;
user: string;
password: string;
requireUserCredentials?: boolean;
port: number;
schema: string;
secure?: boolean;
startOfWeek?: WeekDay | null;
timeoutSeconds?: number;
};
type ClickhouseCredentials = Omit<CreateClickhouseCredentials, SensitiveCredentialsFieldNames>;

Example Configuration:
const clickhouseConfig: CreateClickhouseCredentials = {
type: WarehouseTypes.CLICKHOUSE,
host: 'clickhouse.example.com',
port: 8123,
user: 'analytics_user',
password: 'secure-password',
schema: 'default',
secure: true,
timeoutSeconds: 300,
startOfWeek: WeekDay.MONDAY,
};

Union types for warehouse credentials support all 7 warehouse types.
type CreateWarehouseCredentials =
| CreateRedshiftCredentials
| CreateBigqueryCredentials
| CreatePostgresCredentials
| CreateSnowflakeCredentials
| CreateDatabricksCredentials
| CreateTrinoCredentials
| CreateClickhouseCredentials;
type WarehouseCredentials =
| SnowflakeCredentials
| RedshiftCredentials
| PostgresCredentials
| BigqueryCredentials
| DatabricksCredentials
| TrinoCredentials
| ClickhouseCredentials;

CreateWarehouseCredentials is used when creating or updating warehouse connections and includes all sensitive credential fields.
WarehouseCredentials is used in API responses and has sensitive fields omitted (see Sensitive Credentials section below).
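Because both unions are discriminated on the type field, warehouse-specific handling can be written as an exhaustive switch. A minimal sketch (the returned descriptions are illustrative, not part of the Lightdash API):

import { WarehouseTypes, type CreateWarehouseCredentials } from '@lightdash/common';
// Narrowing on the `type` discriminator gives access to warehouse-specific fields.
function describeConnection(credentials: CreateWarehouseCredentials): string {
    switch (credentials.type) {
        case WarehouseTypes.BIGQUERY:
            return `BigQuery project ${credentials.project}, dataset ${credentials.dataset}`;
        case WarehouseTypes.SNOWFLAKE:
            return `Snowflake account ${credentials.account}, database ${credentials.database}`;
        case WarehouseTypes.DATABRICKS:
            return `Databricks host ${credentials.serverHostName}`;
        case WarehouseTypes.POSTGRES:
        case WarehouseTypes.REDSHIFT:
            return `${credentials.type} host ${credentials.host}, database ${credentials.dbname}`;
        case WarehouseTypes.TRINO:
            return `Trino host ${credentials.host}, database ${credentials.dbname}`;
        case WarehouseTypes.CLICKHOUSE:
            return `ClickHouse host ${credentials.host}, schema ${credentials.schema}`;
        default: {
            // Exhaustiveness check: fails to compile if a new warehouse type is added.
            const exhaustive: never = credentials;
            return exhaustive;
        }
    }
}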
SSH tunneling support for secure connections through bastion hosts.
type SshTunnelConfiguration = {
useSshTunnel?: boolean;
sshTunnelHost?: string;
sshTunnelPort?: number;
sshTunnelUser?: string;
sshTunnelPublicKey?: string;
sshTunnelPrivateKey?: string;
};

Example with SSH Tunnel:
const postgresWithSsh: CreatePostgresCredentials = {
type: WarehouseTypes.POSTGRES,
host: 'internal-postgres.local',
port: 5432,
user: 'analytics_user',
password: 'secure-password',
dbname: 'analytics',
schema: 'public',
// SSH tunnel configuration
useSshTunnel: true,
sshTunnelHost: 'bastion.example.com',
sshTunnelPort: 22,
sshTunnelUser: 'ssh-user',
sshTunnelPrivateKey: '-----BEGIN RSA PRIVATE KEY-----\n...',
};

Lightdash protects sensitive credential fields by automatically omitting them from API responses.
const sensitiveCredentialsFieldNames = [
'user',
'password',
'keyfileContents',
'personalAccessToken',
'privateKey',
'privateKeyPass',
'sshTunnelPrivateKey',
'sslcert',
'sslkey',
'sslrootcert',
'token',
'refreshToken',
'oauthClientId',
'oauthClientSecret',
] as const;
type SensitiveCredentialsFieldNames = typeof sensitiveCredentialsFieldNames[number];

Field Categories:
Authentication Secrets (never returned in API responses):
- password: Database passwords
- keyfileContents: BigQuery service account JSON
- personalAccessToken: Databricks personal access tokens
- privateKey, privateKeyPass: Snowflake private key authentication
- token, refreshToken: OAuth access and refresh tokens
- oauthClientId, oauthClientSecret: OAuth client credentials

Connection Security (protected):
- sshTunnelPrivateKey: SSH tunnel private keys
- sslcert, sslkey, sslrootcert: SSL certificate files

User Identifiers (protected):
- user: Database username (considered PII)

Type System:
The sensitiveCredentialsFieldNames array drives the TypeScript types:
- Create types (CreateXxxCredentials): include all fields, including sensitive ones
- Response types (XxxCredentials): use Omit<CreateXxxCredentials, SensitiveCredentialsFieldNames>
- ProjectModel.get() filters credentials before returning them

Example:
// When creating a connection (includes sensitive fields)
const createCreds: CreateBigqueryCredentials = {
type: WarehouseTypes.BIGQUERY,
project: 'my-project',
dataset: 'my-dataset',
keyfileContents: { /* service account JSON */ },
timeoutSeconds: 300,
priority: 'interactive',
location: 'US',
retries: 3,
maximumBytesBilled: 1000000000,
};
// When retrieving a connection (sensitive fields omitted)
const responseCreds: BigqueryCredentials = {
type: WarehouseTypes.BIGQUERY,
project: 'my-project',
dataset: 'my-dataset',
// keyfileContents is NOT included in response
timeoutSeconds: 300,
priority: 'interactive',
location: 'US',
retries: 3,
maximumBytesBilled: 1000000000,
};

Security Best Practices:
When adding new warehouse authentication methods, always:
- Add any new sensitive fields to the sensitiveCredentialsFieldNames array in /packages/common/src/types/projects.ts
- Use ProjectModel.getWithSensitiveFields() only for internal operations that need credentials

Get the character used to quote field names for a warehouse type.
/**
* Get field quote character for a warehouse type
* @deprecated Use WarehouseSqlBuilder.getFieldQuoteChar() instead
* @param warehouseType - Warehouse type enum value
* @returns Quote character (backtick for BigQuery/Databricks, double quote for others)
*/
function getFieldQuoteChar(warehouseType: WarehouseTypes | undefined): string;

Returns:
- ` (backtick) for BigQuery and Databricks
- " (double quote) for Snowflake, Redshift, PostgreSQL, Trino, and ClickHouse
- " (double quote) if the warehouse type is undefined

Example:
import { getFieldQuoteChar, WarehouseTypes } from '@lightdash/common';
const bigqueryQuote = getFieldQuoteChar(WarehouseTypes.BIGQUERY); // "`"
const snowflakeQuote = getFieldQuoteChar(WarehouseTypes.SNOWFLAKE); // "\""
const postgresQuote = getFieldQuoteChar(WarehouseTypes.POSTGRES); // "\""

Generate warehouse-specific SQL for aggregation functions.
/**
* Generate aggregated field SQL with warehouse-specific syntax
* @param warehouseSqlBuilder - Warehouse SQL builder interface
* @param aggregation - Aggregation option (SUM, AVG, COUNT, MIN, MAX, ANY, etc.)
* @param reference - Field reference/name to aggregate
* @returns SQL expression for the aggregation
*/
function getAggregatedField(
warehouseSqlBuilder: WarehouseSqlBuilder,
aggregation: VizAggregationOptions,
reference: string
): string;

Special Handling:
- PostgreSQL: (ARRAY_AGG(field))[1] for ANY aggregation (v16+ has ANY_VALUE)
- ClickHouse: any(field) for ANY aggregation
- Other warehouses: ANY_VALUE(field) for ANY aggregation

Example:
import { getAggregatedField, VizAggregationOptions } from '@lightdash/common';
// For BigQuery/Snowflake/Redshift/Trino/Databricks
const sumExpr = getAggregatedField(
warehouseClient,
VizAggregationOptions.SUM,
'revenue'
); // 'SUM("revenue")'
const anyExpr = getAggregatedField(
warehouseClient,
VizAggregationOptions.ANY,
'status'
); // 'ANY_VALUE("status")'
// For PostgreSQL
const postgresAnyExpr = getAggregatedField(
postgresClient,
VizAggregationOptions.ANY,
'status'
); // '(ARRAY_AGG("status"))[1]'
// For ClickHouse
const clickhouseAnyExpr = getAggregatedField(
clickhouseClient,
VizAggregationOptions.ANY,
'status'
); // 'any("status")'Override warehouse connection schema if provided.
/**
* Maybe override warehouse connection schema
* @param connection - Warehouse credentials
* @param overrides - Schema override (uses 'dataset' field for BigQuery)
* @returns Updated warehouse credentials
*/
function maybeOverrideWarehouseConnection<T extends WarehouseCredentials>(
connection: T,
overrides: { schema?: string }
): T;

Behavior:
- BigQuery: maps the schema override to the dataset field
- All other warehouses: updates the schema field directly
- Returns the connection unchanged if no schema is provided

Example:
import { maybeOverrideWarehouseConnection, WarehouseTypes } from '@lightdash/common';
const bigqueryConn: BigqueryCredentials = {
type: WarehouseTypes.BIGQUERY,
project: 'my-project',
dataset: 'production',
// ...
};
const overridden = maybeOverrideWarehouseConnection(bigqueryConn, {
schema: 'staging',
});
// overridden.dataset === 'staging'
const postgresConn: PostgresCredentials = {
type: WarehouseTypes.POSTGRES,
host: 'localhost',
schema: 'public',
// ...
};
const overriddenPg = maybeOverrideWarehouseConnection(postgresConn, {
schema: 'analytics',
});
// overriddenPg.schema === 'analytics'

Merge new warehouse credentials with base credentials, preserving advanced settings.
/**
* Merge warehouse credentials while preserving advanced settings
* @param baseCredentials - Base credentials with advanced settings
* @param newCredentials - New credentials to merge
* @returns Merged warehouse credentials
*/
function mergeWarehouseCredentials<T extends CreateWarehouseCredentials>(
baseCredentials: T,
newCredentials: T
): T;

Behavior:
- Returns newCredentials as-is if warehouse types don't match
- Returns newCredentials if warehouse names differ (different SSO/role configs)
- Otherwise merges newCredentials on top of the advanced settings from baseCredentials
- Preserves requireUserCredentials from baseCredentials (security setting)
- Omits authenticationType from base credentials to avoid conflicts

Use Case: This is useful when creating preview projects where you want to use new connection details (like from dbt profiles) but preserve advanced configuration from the parent project.
Example:
import { mergeWarehouseCredentials, WarehouseTypes } from '@lightdash/common';
const baseCredentials: CreateSnowflakeCredentials = {
type: WarehouseTypes.SNOWFLAKE,
account: 'xy12345',
user: 'service_account',
password: 'old-password',
database: 'ANALYTICS',
warehouse: 'COMPUTE_WH',
schema: 'PUBLIC',
requireUserCredentials: true,
clientSessionKeepAlive: true,
quotedIdentifiersIgnoreCase: true,
startOfWeek: WeekDay.MONDAY,
};
const newCredentials: CreateSnowflakeCredentials = {
type: WarehouseTypes.SNOWFLAKE,
account: 'xy12345',
user: 'new_service_account',
password: 'new-password',
database: 'ANALYTICS',
warehouse: 'COMPUTE_WH',
schema: 'STAGING',
};
const merged = mergeWarehouseCredentials(baseCredentials, newCredentials);
// Result:
// {
// type: WarehouseTypes.SNOWFLAKE,
// account: 'xy12345',
// user: 'new_service_account',
// password: 'new-password',
// database: 'ANALYTICS',
// warehouse: 'COMPUTE_WH',
// schema: 'STAGING',
// requireUserCredentials: true, // preserved from base
// clientSessionKeepAlive: true, // preserved from base
// quotedIdentifiersIgnoreCase: true, // preserved from base
// startOfWeek: WeekDay.MONDAY, // preserved from base
// }

import { type WarehouseClient } from '@lightdash/common';
async function testWarehouseConnection(client: WarehouseClient) {
try {
await client.test();
console.log('Connection successful');
console.log(`Connected to ${client.getAdapterType()} warehouse`);
} catch (error) {
console.error('Connection failed:', error.message);
const parsedError = client.parseError(error);
throw parsedError;
}
}

import {
type WarehouseClient,
type RunQueryTags,
QueryExecutionContext,
} from '@lightdash/common';
async function runQuery(client: WarehouseClient, sql: string) {
const tags: RunQueryTags = {
project_uuid: 'project-123',
user_uuid: 'user-456',
organization_uuid: 'org-789',
query_context: QueryExecutionContext.EXPLORE,
};
const results = await client.runQuery(sql, tags, 'America/New_York');
console.log(`Returned ${results.rows.length} rows`);
console.log('Fields:', Object.keys(results.fields));
// Display field types
Object.entries(results.fields).forEach(([name, info]) => {
console.log(` ${name}: ${info.type}`);
});
return results;
}

import { type WarehouseClient, type WarehouseResults, QueryExecutionContext } from '@lightdash/common';
async function streamLargeQuery(client: WarehouseClient, sql: string) {
const allRows: Record<string, unknown>[] = [];
let fields: WarehouseResults['fields'] | undefined;
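// Note: this example accumulates every chunk in memory for simplicity;
// in practice, process or write out each chunk inside the callback instead.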
await client.streamQuery(
sql,
(chunk) => {
console.log(`Received ${chunk.rows.length} rows`);
if (!fields) {
fields = chunk.fields;
}
allRows.push(...chunk.rows);
},
{
tags: {
project_uuid: 'project-123',
query_context: QueryExecutionContext.CSV,
},
timezone: 'UTC',
}
);
console.log(`Total rows: ${allRows.length}`);
console.log(`Fields: ${Object.keys(fields || {}).join(', ')}`);
return { rows: allRows, fields: fields! };
}

import { type WarehouseClient, QueryExecutionContext } from '@lightdash/common';
async function runParameterizedQuery(client: WarehouseClient) {
const sql = `
SELECT customer_id, name, total_orders
FROM customers
WHERE created_at >= ?
AND country = ?
ORDER BY total_orders DESC
LIMIT ?
`;
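// Note: positional '?' placeholders are shown for illustration; the exact
// placeholder syntax for parameterized queries depends on the warehouse driver.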
const results = await client.runQuery(
sql,
{ query_context: QueryExecutionContext.EXPLORE },
'America/New_York',
['2024-01-01', 'USA', 100] // parameter values
);
return results;
}

import { type WarehouseClient } from '@lightdash/common';
async function exploreCatalog(client: WarehouseClient) {
const catalog = await client.getCatalog([
{ database: 'analytics', schema: 'public', table: 'customers' },
{ database: 'analytics', schema: 'public', table: 'orders' },
]);
Object.entries(catalog).forEach(([dbName, schemas]) => {
Object.entries(schemas).forEach(([schemaName, tables]) => {
console.log(`${dbName}.${schemaName}:`);
Object.entries(tables).forEach(([tableName, columns]) => {
console.log(` ${tableName} (${Object.keys(columns).length} columns)`);
Object.entries(columns).forEach(([columnName, columnType]) => {
console.log(` ${columnName}: ${columnType}`);
});
});
});
});
}

import { type WarehouseClient, QueryExecutionContext } from '@lightdash/common';
async function listTables(client: WarehouseClient, schema?: string) {
const tables = await client.getAllTables(schema, {
project_uuid: 'project-123',
query_context: QueryExecutionContext.EXPLORE,
});
console.log(`Found ${tables.length} tables`);
tables.forEach((table) => {
console.log(`${table.database}.${table.schema}.${table.table}`);
if (table.partitionColumn) {
console.log(` Partitioned by: ${table.partitionColumn.field} (${table.partitionColumn.partitionType})`);
}
});
return tables;
}

import { type WarehouseClient, QueryExecutionContext } from '@lightdash/common';
async function getTableSchema(
client: WarehouseClient,
tableName: string,
schema?: string,
database?: string
) {
const catalog = await client.getFields(tableName, schema, database, {
project_uuid: 'project-123',
query_context: QueryExecutionContext.EXPLORE,
});
// Navigate to the specific table
const db = Object.keys(catalog)[0];
const sch = Object.keys(catalog[db])[0];
const tableSchema = catalog[db][sch][tableName];
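// Note: tableSchema will be undefined if the warehouse did not return the requested table.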
console.log(`Schema for ${tableName}:`);
Object.entries(tableSchema).forEach(([columnName, columnType]) => {
console.log(` ${columnName}: ${columnType}`);
});
return tableSchema;
}

import { type WarehouseClient } from '@lightdash/common';
async function runAsyncQuery(client: WarehouseClient, sql: string) {
// Start async query
const { queryId, queryMetadata, totalRows, durationMs } = await client.executeAsyncQuery({
sql,
tags: { query_type: 'long_running' },
timezone: 'UTC',
});
console.log(`Query started: ${queryId}`);
console.log(`Duration: ${durationMs}ms`);
console.log(`Total rows: ${totalRows}`);
// Get first page of results
const results = await client.getAsyncQueryResults({
sql,
queryId,
queryMetadata,
page: 1,
pageSize: 100,
});
console.log(`Page 1 of ${results.pageCount}: ${results.rows.length} rows`);
return results;
}

import { type WarehouseClient } from '@lightdash/common';
async function runAsyncQueryWithStreaming(client: WarehouseClient, sql: string) {
const allRows: Record<string, unknown>[] = [];
const result = await client.executeAsyncQuery(
{
sql,
tags: { query_type: 'export' },
timezone: 'UTC',
},
// Stream callback receives results as they arrive
(rows, fields) => {
console.log(`Received ${rows.length} rows`);
console.log(`Fields: ${Object.keys(fields).join(', ')}`);
allRows.push(...rows);
}
);
console.log(`Query complete: ${result.totalRows} total rows`);
console.log(`Duration: ${result.durationMs}ms`);
console.log(`Streamed ${allRows.length} rows`);
return { result, rows: allRows };
}

import { type WarehouseClient } from '@lightdash/common';
async function getAllPaginatedResults(
client: WarehouseClient,
sql: string,
pageSize: number = 1000
) {
// Start query
const { queryId, queryMetadata } = await client.executeAsyncQuery({
sql,
tags: { query_type: 'export' },
});
// Get first page to determine total pages
const firstPage = await client.getAsyncQueryResults({
sql,
queryId,
queryMetadata,
page: 1,
pageSize,
});
const allRows = [...firstPage.rows];
console.log(`Page 1/${firstPage.pageCount}: ${firstPage.rows.length} rows`);
// Get remaining pages
for (let page = 2; page <= firstPage.pageCount; page++) {
const pageResults = await client.getAsyncQueryResults({
sql,
queryId,
queryMetadata,
page,
pageSize,
});
allRows.push(...pageResults.rows);
console.log(`Page ${page}/${firstPage.pageCount}: ${pageResults.rows.length} rows`);
}
console.log(`Total rows: ${allRows.length}`);
return {
fields: firstPage.fields,
rows: allRows,
};
}

import { type WarehouseSqlBuilder } from '@lightdash/common';
function buildWarehouseSpecificSQL(builder: WarehouseSqlBuilder) {
const fieldQuote = builder.getFieldQuoteChar();
const stringQuote = builder.getStringQuoteChar();
// Build a query with proper quoting
const sql = `
SELECT
${fieldQuote}customer_id${fieldQuote},
${fieldQuote}name${fieldQuote},
${builder.concatString(
`${fieldQuote}first_name${fieldQuote}`,
`${stringQuote} ${stringQuote}`,
`${fieldQuote}last_name${fieldQuote}`
)} AS full_name
FROM ${fieldQuote}customers${fieldQuote}
WHERE ${fieldQuote}status${fieldQuote} = ${stringQuote}${builder.escapeString('active')}${stringQuote}
`;
return sql;
}

import { type WarehouseClient, QueryExecutionContext } from '@lightdash/common';
async function safeQueryExecution(client: WarehouseClient, sql: string) {
try {
const results = await client.runQuery(
sql,
{ query_context: QueryExecutionContext.EXPLORE }
);
return { success: true, results };
} catch (error) {
// Parse warehouse-specific error to standardized format
const parsedError = client.parseError(error);
console.error('Query failed:', parsedError.message);
console.error('Warehouse type:', client.getAdapterType());
return {
success: false,
error: parsedError.message,
warehouseType: client.getAdapterType(),
};
}
}
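The catalog and query APIs compose naturally. Here is a hedged sketch that discovers every table in a schema and fetches column types for each one (merging the per-table catalogs into a single structure is omitted for brevity):

import {
    type WarehouseClient,
    type WarehouseCatalog,
    QueryExecutionContext,
} from '@lightdash/common';
// One getAllTables call to discover tables, then one getFields call per table.
async function describeSchema(
    client: WarehouseClient,
    schema: string
): Promise<WarehouseCatalog[]> {
    const tags = { query_context: QueryExecutionContext.EXPLORE };
    const tables = await client.getAllTables(schema, tags);
    return Promise.all(
        tables.map((table) =>
            client.getFields(table.table, table.schema, table.database, tags)
        )
    );
}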