Snowflake warehouse client with multiple authentication methods (password, key pair, OAuth, external browser), native async query support with result pagination, session configuration for timezone and week start, and comprehensive error handling.
The SnowflakeWarehouseClient class implements the WarehouseClient interface, with native async queries and result pagination.
/**
* Snowflake warehouse client with native async query and pagination support
* Supports multiple authentication methods: password, key pair, OAuth, external browser
*/
class SnowflakeWarehouseClient implements WarehouseClient {
/**
* Initialize Snowflake client with credentials
* Supports multiple authentication methods via credentials configuration
* @param credentials - Snowflake credentials with account, database, warehouse, and auth
*/
constructor(credentials: CreateSnowflakeCredentials);
/** Connection options for Snowflake SDK (from snowflake-sdk package) */
connectionOptions: ConnectionOptions;
/**
* Flag indicating whether to ignore case for quoted identifiers
* When true, quoted identifiers are treated case-insensitively
* Optional - may be undefined
*/
quotedIdentifiersIgnoreCase?: boolean;
/**
* Test the database connection
* Executes a simple query (SELECT 1) to verify connectivity and credentials
* @returns Promise that resolves if connection is successful
* @throws {WarehouseConnectionError} If connection fails
* @throws {WarehouseQueryError} If test query fails
*/
test(): Promise<void>;
/**
* Stream query results with timezone and week start session configuration
* Sets TIMEZONE and WEEK_START session parameters before query execution
* @param sql - SQL query string to execute
* @param streamCallback - Callback invoked for each batch of results
* @param options - Query execution options (tags, timezone, values, etc.)
* @returns Promise that resolves when streaming completes
*/
streamQuery(
sql: string,
streamCallback: (data: WarehouseResults) => void,
options: {
values?: AnyType[];
tags?: Record<string, string>;
timezone?: string;
}
): Promise<void>;
/**
* Execute async query with native Snowflake support
* Returns query ID for later result retrieval
* @param args - Async query arguments (sql, tags, timezone, etc.)
* @param resultsStreamCallback - Callback for streaming results immediately
* Results are streamed in batches during execution. The callback receives row data and field metadata.
* For pagination-based retrieval after execution, use getAsyncQueryResults() with the returned queryId.
* @returns Promise resolving to query metadata with query ID
*/
executeAsyncQuery(
args: WarehouseExecuteAsyncQueryArgs,
resultsStreamCallback: (rows: WarehouseResults['rows'], fields: WarehouseResults['fields']) => void
): Promise<WarehouseExecuteAsyncQuery>;
/**
* Retrieve results from previously executed async query with pagination
* Supports page-based pagination for large result sets
* @param args - Query ID and optional pagination parameters (page, pageSize)
* @param rowFormatter - Optional function to transform each row
* @returns Promise resolving to paginated query results
*/
getAsyncQueryResults<T = Record<string, unknown>>(
args: WarehouseGetAsyncQueryResultsArgs,
rowFormatter?: (row: Record<string, unknown>) => T
): Promise<WarehouseGetAsyncQueryResults<T>>;
/**
* Get catalog metadata for specific tables
* Uses SHOW COLUMNS command for efficient metadata retrieval
* @param requests - Array of table identifiers (database, schema, table)
* @returns Promise resolving to nested catalog structure
*/
getCatalog(
requests: Array<{ database: string; schema: string; table: string }>
): Promise<WarehouseCatalog>;
/**
* Get all tables in the warehouse
* Queries information_schema.tables
* Note: Snowflake implementation does not accept schema/tags parameters
* (though the base WarehouseClient interface defines them as optional)
* @returns Promise resolving to array of table metadata
*/
getAllTables(): Promise<WarehouseTables>;
/**
* Get field metadata for specific table
* Uses SHOW COLUMNS command
* @param tableName - Name of the table
* @param schema - Optional schema name (uses credentials schema if omitted)
* @param database - Optional database name (uses credentials database if omitted)
* @param tags - Optional tags for query tracking
* @returns Promise resolving to catalog with table's column definitions
*/
getFields(
tableName: string,
schema?: string,
database?: string,
tags?: Record<string, string>
): Promise<WarehouseCatalog>;
/**
* Parse Snowflake errors with enhanced messages
* Handles specific error codes: authentication (002003, 390318, 390144), timeout, cancellation
* Extracts line and character position information from COMPILATION errors
* Formats "does not exist or not authorized" errors with custom message template
* @param error - Snowflake SnowflakeError object
* @param query - Query string for context (default: empty string)
* @returns Enhanced Error with specific message and line number information when available
*/
parseError(error: SnowflakeError, query: string = ''): Error;
}
Usage Examples:
import { SnowflakeWarehouseClient } from '@lightdash/warehouses';
import { WeekDay } from '@lightdash/common';
// Using password authentication
const clientWithPassword = new SnowflakeWarehouseClient({
type: 'snowflake',
account: 'my-account', // Account identifier (may include region, e.g., 'my-account.us-east-1')
user: 'myuser',
password: 'mypassword',
database: 'ANALYTICS',
warehouse: 'COMPUTE_WH',
schema: 'PUBLIC',
role: 'ANALYST',
clientSessionKeepAlive: true,
startOfWeek: WeekDay.MONDAY,
});
// Using key pair authentication
const clientWithKeyPair = new SnowflakeWarehouseClient({
type: 'snowflake',
account: 'my-account',
user: 'myuser',
database: 'ANALYTICS',
warehouse: 'COMPUTE_WH',
schema: 'PUBLIC',
privateKey: '-----BEGIN PRIVATE KEY-----\n...',
authenticationType: 'private_key',
startOfWeek: null,
});
// Using OAuth SSO (with refresh token and access token)
const clientWithOAuth = new SnowflakeWarehouseClient({
type: 'snowflake',
account: 'my-account',
user: 'myuser',
database: 'ANALYTICS',
warehouse: 'COMPUTE_WH',
schema: 'PUBLIC',
token: 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...', // Short-lived access token
refreshToken: 'refresh_token_here', // For renewing access token
authenticationType: 'sso',
startOfWeek: null,
});
// Using external browser authentication (interactive)
const clientWithBrowser = new SnowflakeWarehouseClient({
type: 'snowflake',
account: 'my-account',
user: 'myuser',
database: 'ANALYTICS',
warehouse: 'COMPUTE_WH',
schema: 'PUBLIC',
authenticationType: 'external_browser',
startOfWeek: null,
});
// Test connection
await clientWithPassword.test();
// Stream query with timezone and week start
await clientWithPassword.streamQuery(
`
SELECT
DATE_TRUNC('week', order_date) as week,
SUM(amount) as weekly_total
FROM orders
WHERE order_date >= ?
GROUP BY week
ORDER BY week
`,
(data) => {
console.log('Received batch:', data.rows.length, 'rows');
data.rows.forEach(row => {
console.log(`Week ${row.week}: $${row.weekly_total}`);
});
},
{
timezone: 'America/Los_Angeles',
values: ['2023-01-01'],
tags: { source: 'lightdash', report: 'weekly_sales' },
}
);
// Execute async query (the results stream callback is required by the signature)
const result = await clientWithPassword.executeAsyncQuery(
  {
    sql: 'SELECT * FROM large_table WHERE status = ?',
    tags: { source: 'analytics' },
    timezone: 'UTC',
    values: ['active'],
  },
  (rows) => {
    console.log('Streamed batch of', rows.length, 'rows');
  }
);
console.log('Query ID:', result.queryId);
console.log('Total rows:', result.totalRows);
// Retrieve paginated results
const page1 = await clientWithPassword.getAsyncQueryResults({
sql: 'SELECT * FROM large_table ORDER BY id',
queryId: result.queryId!,
queryMetadata: result.queryMetadata,
page: 1,
pageSize: 1000,
});
console.log('Returned rows:', page1.rows.length);
console.log('Total pages:', page1.pageCount);
console.log('Total rows:', page1.totalRows);
// Get next page
if (page1.pageCount > 1) {
const page2 = await clientWithPassword.getAsyncQueryResults({
sql: 'SELECT * FROM large_table ORDER BY id',
queryId: result.queryId!,
queryMetadata: result.queryMetadata,
page: 2,
pageSize: 1000,
});
}
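// Get catalog metadata for specific tables (getCatalog) and column metadata
// for a single table (getFields). Illustrative calls only: both methods are
// declared on the class above, but the exact nesting of the returned
// WarehouseCatalog (database -> schema -> table -> columns) is assumed here.
const catalog = await clientWithPassword.getCatalog([
  { database: 'ANALYTICS', schema: 'PUBLIC', table: 'ORDERS' },
]);
const orderColumns = await clientWithPassword.getFields(
  'ORDERS', // table name
  'PUBLIC', // schema; falls back to the credentials schema if omitted
  'ANALYTICS', // database; falls back to the credentials database if omitted
  { source: 'lightdash' } // optional tags for query tracking
);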
// Get all tables
const tables = await clientWithPassword.getAllTables();
tables.forEach(table => {
console.log(`${table.database}.${table.schema}.${table.table}`);
});
SQL builder for Snowflake-specific syntax.
/**
* Snowflake SQL builder with double-quote quoting and PERCENTILE_CONT
*/
class SnowflakeSqlBuilder implements WarehouseSqlBuilder {
/** Warehouse type identifier */
readonly type = WarehouseTypes.SNOWFLAKE;
/**
* Get DBT adapter type for Snowflake
* @returns SupportedDbtAdapter.SNOWFLAKE
*/
getAdapterType(): SupportedDbtAdapter;
/**
* Generate SQL for metric aggregation
* Uses PERCENTILE_CONT for percentile/median metrics
* @param sql - Column SQL expression
* @param metric - Metric definition
* @returns SQL string for the metric aggregation
*/
getMetricSql(sql: string, metric: Metric): string;
/**
* Escape string value for Snowflake
* Performs:
* - Unicode normalization
* - Single quote doubling ('→'')
* - Backslash escaping (\→\\)
* - SQL comment removal (line comments starting with -- and block comments)
* - Null byte removal
* @param value - Raw string value
* @returns Escaped string safe for Snowflake query
*/
escapeString(value: string): string;
}
Usage Examples:
import { SnowflakeSqlBuilder } from '@lightdash/warehouses';
const builder = new SnowflakeSqlBuilder();
// Get quote characters
const fieldQuote = builder.getFieldQuoteChar(); // '"'
const fieldName = `${fieldQuote}ORDER_ID${fieldQuote}`; // "ORDER_ID"
// Escape string with single quotes
const escaped = builder.escapeString("Winter's Sale");
// Result: 'Winter''s Sale'
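// Additional escapeString behavior (expected results are inferred from the
// escaping rules listed above and are assumptions, not verified output):
const escapedBackslash = builder.escapeString('C:\\temp'); // backslashes doubled
const escapedComment = builder.escapeString('1; -- drop table'); // trailing SQL comment removed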
// Generate metric SQL
const medianSql = builder.getMetricSql('price', {
type: MetricType.MEDIAN,
sql: 'price',
});
// Result: 'PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY price)'
const percentileSql = builder.getMetricSql('latency', {
type: MetricType.PERCENTILE,
sql: 'latency',
percentile: 99,
});
// Result: 'PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY latency)'
Snowflake data type constants for field type checking.
/**
* Snowflake data type enum
* Covers all standard Snowflake types including semi-structured data
*/
enum SnowflakeTypes {
// Numeric types
NUMBER = 'NUMBER',
DECIMAL = 'DECIMAL',
NUMERIC = 'NUMERIC',
INTEGER = 'INTEGER',
INT = 'INT',
BIGINT = 'BIGINT',
SMALLINT = 'SMALLINT',
FLOAT = 'FLOAT',
FLOAT4 = 'FLOAT4',
FLOAT8 = 'FLOAT8',
DOUBLE = 'DOUBLE',
DOUBLE_PRECISION = 'DOUBLE PRECISION',
REAL = 'REAL',
FIXED = 'FIXED',
// String types
STRING = 'STRING',
TEXT = 'TEXT',
// Boolean
BOOLEAN = 'BOOLEAN',
// Date/Time types
DATE = 'DATE',
DATETIME = 'DATETIME',
TIME = 'TIME',
TIMESTAMP = 'TIMESTAMP',
TIMESTAMP_LTZ = 'TIMESTAMP_LTZ',
TIMESTAMP_NTZ = 'TIMESTAMP_NTZ',
TIMESTAMP_TZ = 'TIMESTAMP_TZ',
// Semi-structured types
VARIANT = 'VARIANT',
OBJECT = 'OBJECT',
ARRAY = 'ARRAY',
// Geospatial
GEOGRAPHY = 'GEOGRAPHY',
}
Utility function for converting Snowflake type strings to Lightdash dimension types.
/**
* Convert Snowflake type string to DimensionType
* Exported from @lightdash/warehouses for mapping Snowflake types
*
* ⚠️ CRITICAL REQUIREMENT: Input type string MUST be UPPERCASE
*
* @param type - Snowflake type string (MUST be uppercase, e.g., 'NUMBER', 'VARCHAR(100)', 'TIMESTAMP_TZ')
* @returns Corresponding DimensionType (NUMBER, DATE, TIMESTAMP, BOOLEAN, or STRING)
*
* @example
* // ✅ CORRECT - uppercase type
* mapFieldType('NUMBER') // Returns DimensionType.NUMBER
* mapFieldType('VARCHAR(100)') // Returns DimensionType.STRING
* mapFieldType('TIMESTAMP_TZ') // Returns DimensionType.TIMESTAMP
*
* @example
* // ❌ WRONG - lowercase will fail to match and return STRING
* mapFieldType('number') // Returns DimensionType.STRING (incorrect!)
* mapFieldType('timestamp_tz') // Returns DimensionType.STRING (incorrect!)
*/
function mapFieldType(type: string): DimensionType;
⚠️ CRITICAL REQUIREMENT - UPPERCASE INPUT ONLY:
The mapFieldType function requires uppercase type strings. It uses a regex /^[A-Z]+/ to extract the base type name.
Valid inputs: 'NUMBER', 'VARCHAR(100)', 'TIMESTAMP_TZ'. Invalid inputs: 'number', 'varchar(100)', 'timestamp_tz'.
Error Behavior:
A ParseError is thrown with the message "Cannot understand type from Snowflake: {type}" by the normaliseSnowflakeType() function, which validates input before type mapping. For parameterized types like VARCHAR(100), only the uppercase prefix (VARCHAR) is extracted and matched.
Type Mapping Rules:
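The full mapping table is not reproduced here. The sketch below is a minimal illustration based on the SnowflakeTypes enum and the DimensionType return values documented above; the grouping of TIME and the semi-structured and geospatial types under STRING is an assumption, and the fallback behavior is simplified (see Error Behavior above).
import { DimensionType } from '@lightdash/common';

// Illustrative sketch only - not the library's actual implementation.
const sketchMapFieldType = (type: string): DimensionType => {
  // Extract the uppercase base type, e.g. 'TIMESTAMP_TZ' -> 'TIMESTAMP',
  // 'VARCHAR(100)' -> 'VARCHAR' (mirrors the /^[A-Z]+/ regex mentioned above)
  const baseType = type.match(/^[A-Z]+/)?.[0] ?? '';
  switch (baseType) {
    case 'NUMBER':
    case 'DECIMAL':
    case 'NUMERIC':
    case 'INTEGER':
    case 'INT':
    case 'BIGINT':
    case 'SMALLINT':
    case 'FLOAT': // also covers FLOAT4/FLOAT8 via the base-type regex
    case 'DOUBLE': // also covers DOUBLE PRECISION
    case 'REAL':
    case 'FIXED':
      return DimensionType.NUMBER;
    case 'DATE':
      return DimensionType.DATE;
    case 'DATETIME':
    case 'TIMESTAMP': // covers TIMESTAMP_LTZ / TIMESTAMP_NTZ / TIMESTAMP_TZ
      return DimensionType.TIMESTAMP;
    case 'BOOLEAN':
      return DimensionType.BOOLEAN;
    default:
      // STRING, TEXT, TIME, VARIANT, OBJECT, ARRAY, GEOGRAPHY, VARCHAR, etc. (assumed)
      return DimensionType.STRING;
  }
};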
The following methods are inherited from WarehouseBaseClient and WarehouseSqlBuilder, and are available on all Snowflake client instances:
/**
* Execute query and return all results in memory
* @deprecated Use streamQuery() to avoid loading all results into memory
*/
runQuery(
sql: string,
tags?: Record<string, string>,
timezone?: string,
values?: AnyType[],
queryParams?: Record<string, AnyType>
): Promise<WarehouseResults>;
/** Get the warehouse adapter type */
getAdapterType(): SupportedDbtAdapter;
/** Get string quote character (single quote) */
getStringQuoteChar(): string;
/** Get escape character for string quotes (backslash) */
getEscapeStringQuoteChar(): string;
/** Get field/identifier quote character (double quote for Snowflake) */
getFieldQuoteChar(): string;
/** Get floating-point type name (FLOAT for Snowflake) */
getFloatingType(): string;
/**
* Generate SQL for metric aggregation
* @param sql - Column SQL expression
* @param metric - Metric definition with aggregation type
* @returns SQL string for the metric aggregation
*/
getMetricSql(sql: string, metric: Metric): string;
/** Concatenate strings using warehouse-specific SQL */
concatString(...args: string[]): string;
/** Escape string for SQL safety */
escapeString(value: string): string;
/** Get configured week start day */
getStartOfWeek(): WeekDay | null | undefined;
/** Parse catalog rows into nested structure */
parseWarehouseCatalog(
rows: Record<string, AnyType>[],
mapFieldType: (type: string) => DimensionType
): WarehouseCatalog;
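Illustrative calls of some inherited helpers on the client instance created earlier (return values are shown only where the documentation above states them):
const adapter = clientWithPassword.getAdapterType(); // SupportedDbtAdapter.SNOWFLAKE
const idQuote = clientWithPassword.getFieldQuoteChar(); // '"' (double quote)
const weekStart = clientWithPassword.getStartOfWeek(); // WeekDay.MONDAY for the credentials above
// concatString returns warehouse-specific concatenation SQL; the exact SQL
// emitted for Snowflake is not shown here.
const fullNameSql = clientWithPassword.concatString('first_name', "' '", 'last_name');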
/**
* Snowflake warehouse credentials
* Uses flat structure with optional fields for different authentication methods
*/
interface CreateSnowflakeCredentials {
/** Warehouse type discriminator */
type: 'snowflake';
/** Snowflake account identifier */
account: string;
/** Username for authentication */
user: string;
/** Database name */
database: string;
/** Virtual warehouse name */
warehouse: string;
/** Schema name */
schema: string;
/** Password for password authentication (optional) */
password?: string;
/** Private key in PEM format for private key authentication (optional) */
privateKey?: string;
/** Private key passphrase (optional) */
privateKeyPass?: string;
/** Authentication type selector (optional) */
authenticationType?: SnowflakeAuthenticationType;
/** OAuth refresh token for SSO authentication (optional) */
refreshToken?: string;
/** OAuth access token for SSO authentication (short-lived) (optional) */
token?: string;
/** Optional role for session */
role?: string;
/** Number of concurrent threads (optional, default: 4) */
threads?: number;
/** Keep client session alive between queries (optional, default: false) */
clientSessionKeepAlive?: boolean;
/** Custom query tag for tracking (optional) */
queryTag?: string;
/** Custom Snowflake access URL (optional) */
accessUrl?: string;
/** Week start day configuration (optional) */
startOfWeek?: WeekDay | null;
/** When true, quoted identifiers are treated case-insensitively (optional, default: false; Snowflake uppercases unquoted identifiers by default) */
quotedIdentifiersIgnoreCase?: boolean;
/** Disable automatic timestamp conversion to UTC - only disable if all timestamp values are already in UTC (optional, default: false) */
disableTimestampConversion?: boolean;
/** Override warehouse connection settings (optional, default: false) */
override?: boolean;
/** UUID reference for organization-level warehouse credentials (optional) */
organizationWarehouseCredentialsUuid?: string;
/** Require user to provide their own credentials (optional, default: false) */
requireUserCredentials?: boolean;
}
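For example, key pair authentication with an encrypted private key and a custom query tag (placeholder values, following the constructor examples earlier in this section):
const clientWithEncryptedKey = new SnowflakeWarehouseClient({
  type: 'snowflake',
  account: 'my-account',
  user: 'myuser',
  database: 'ANALYTICS',
  warehouse: 'COMPUTE_WH',
  schema: 'PUBLIC',
  authenticationType: 'private_key',
  privateKey: '-----BEGIN ENCRYPTED PRIVATE KEY-----\n...',
  privateKeyPass: 'my-key-passphrase',
  queryTag: 'lightdash',
  startOfWeek: null,
});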
/**
* Snowflake authentication types
*/
enum SnowflakeAuthenticationType {
PASSWORD = 'password',
PRIVATE_KEY = 'private_key',
SSO = 'sso',
EXTERNAL_BROWSER = 'external_browser',
}
/**
* Snowflake connection options
*/
interface ConnectionOptions {
account: string;
username?: string;
password?: string;
authenticator?: string;
token?: string;
privateKey?: string;
database: string;
warehouse: string;
schema: string;
role?: string;
clientSessionKeepAlive?: boolean;
[key: string]: any;
}
/**
* Snowflake error object (from snowflake-sdk)
*/
interface SnowflakeError extends Error {
code?: string;
sqlState?: string;
data?: {
type?: string;
[key: string]: any;
};
}
The Snowflake client provides enhanced error messages for common scenarios:
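For example, a failing query surfaces as a regular Error whose message has already been enhanced by parseError (a minimal sketch; exact wording depends on the underlying Snowflake error):
try {
  await clientWithPassword.streamQuery('SELECT * FROM missing_table', () => {}, {});
} catch (error) {
  // For COMPILATION errors the message includes line and character position;
  // for "does not exist or not authorized" errors the message can be
  // customized via SNOWFLAKE_UNAUTHORIZED_ERROR_MESSAGE (see below).
  console.error((error as Error).message);
}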
The client supports custom error messages for unauthorized access errors using an environment variable:
SNOWFLAKE_UNAUTHORIZED_ERROR_MESSAGE supports the following placeholders:
{snowflakeTable} - The table name from the error
{snowflakeSchema} - The schema name from the error
Example:
export SNOWFLAKE_UNAUTHORIZED_ERROR_MESSAGE="Access denied to {snowflakeSchema}.{snowflakeTable}. Please contact your administrator."
The Snowflake client respects the following environment variables:
SNOWFLAKE_SDK_LOG_LEVEL: Control Snowflake SDK logging verbosity.
Accepted values: 'ERROR', 'WARN', 'INFO', 'DEBUG', 'TRACE'. Default: 'ERROR' (only log errors).
Example: export SNOWFLAKE_SDK_LOG_LEVEL='DEBUG'
SNOWFLAKE_UNAUTHORIZED_ERROR_MESSAGE: Customize error messages for unauthorized table access.
Supported placeholders: {snowflakeTable}, {snowflakeSchema}
Session Configuration:
The following session parameters are set before query execution:
TIMEZONE - set from the timezone parameter (default: 'UTC')
WEEK_START - set from the startOfWeek configuration (1-7, Monday-Sunday); only applied if configured
QUERY_TAG - set from the tags parameter (JSON stringified); only applied if tags are provided
QUOTED_IDENTIFIERS_IGNORE_CASE - set to FALSE to avoid casing inconsistencies between Snowflake and Lightdash
When connecting with authenticator: 'EXTERNALBROWSER' (external browser SSO), the client caches the connection promise in the externalBrowserConnectionPromise private property.
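A minimal sketch of the effect of that caching, assuming the cached promise is shared by concurrent calls (illustrative only):
// Both queries reuse the cached external browser connection promise, so the
// interactive browser login should not be triggered twice (assumed behavior
// based on the caching described above).
await Promise.all([
  clientWithBrowser.streamQuery('SELECT 1', () => {}, {}),
  clientWithBrowser.streamQuery('SELECT 2', () => {}, {}),
]);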