Databricks warehouse client with Unity Catalog support, multiple OAuth authentication methods (M2M and U2M), and SQL warehouse integration.
The implementation supports Unity Catalog three-part naming (catalog.schema.table) and flexible authentication.
/**
* Databricks warehouse client with Unity Catalog support
* Supports personal access tokens, OAuth M2M, and OAuth U2M authentication
*/
class DatabricksWarehouseClient implements WarehouseClient {
/**
* Initialize Databricks client with credentials
* Supports multiple authentication methods via credentials type
* @param credentials - Databricks credentials with host, HTTP path, and authentication
*/
constructor(credentials: CreateDatabricksCredentials);
/** Current schema/database name from credentials */
schema: string;
/** Unity Catalog catalog name (optional, from credentials) */
catalog?: string;
/** Connection options for Databricks SQL connector (from @databricks/sql) */
connectionOptions: ConnectionOptions;
/**
* Test the database connection
* Executes a simple query (SELECT 1) to verify connectivity and credentials
* Inherited from WarehouseBaseClient
* @returns Promise that resolves if connection is successful
* @throws {WarehouseConnectionError} If connection fails
* @throws {WarehouseQueryError} If test query fails
*/
test(): Promise<void>;
/**
* Stream query results with timezone support
* Sets timezone via session configuration before query execution
* @param sql - SQL query string to execute
* @param streamCallback - Callback invoked for each batch of results
* @param options - Query execution options (tags, timezone, values, etc.)
* @returns Promise that resolves when streaming completes
*/
streamQuery(
sql: string,
streamCallback: (data: WarehouseResults) => void,
options: {
values?: AnyType[];
tags?: Record<string, string>;
timezone?: string;
}
): Promise<void>;
/**
* Run query and return all results at once
* Inherited from WarehouseBaseClient
* @deprecated Use streamQuery() instead to avoid loading all results into memory
* @param sql - SQL query string
* @param tags - Optional tags for query tracking
* @param timezone - Optional timezone for query execution
* @param values - Optional positional parameter values
* @param queryParams - Optional named query parameters
* @returns Promise resolving to complete query results
*/
runQuery(
sql: string,
tags?: Record<string, string>,
timezone?: string,
values?: AnyType[],
queryParams?: Record<string, AnyType>
): Promise<WarehouseResults>;
/**
* Execute async query using streaming fallback
* Databricks client uses the base implementation which streams all results
* immediately via the callback. There is no native async query support.
* @param args - Async query arguments (sql, tags, timezone, values)
* @param resultsStreamCallback - Callback for streaming results (REQUIRED)
* @returns Promise resolving to query metadata (queryId will be null)
*/
executeAsyncQuery(
args: WarehouseExecuteAsyncQueryArgs,
resultsStreamCallback: (rows: WarehouseResults['rows'], fields: WarehouseResults['fields']) => void
): Promise<WarehouseExecuteAsyncQuery>;
/**
* Get catalog metadata for specific tables
* Uses system.columns metadata table in Unity Catalog
* @param requests - Array of table identifiers (database=catalog, schema, table)
* @returns Promise resolving to nested catalog structure
*/
getCatalog(
requests: Array<{ database: string; schema: string; table: string }>
): Promise<WarehouseCatalog>;
/**
* Get all MANAGED tables across all schemas in the catalog
* Queries information_schema.tables for tables with type 'MANAGED'
* Uses catalog and database configured in credentials
* Note: This method does NOT accept schema or tags parameters despite base interface definition
* @returns Promise resolving to array of table metadata (database, schema, table)
*/
getAllTables(): Promise<Array<{
database: string;
schema: string;
table: string;
}>>;
/**
* Get BASE TABLE type tables in specific schema
* Queries information_schema.tables for tables with type 'BASE TABLE'
* @param schema - Optional schema name (uses credentials schema if omitted)
* @param tags - Optional tags for query tracking
* @returns Promise resolving to catalog with tables in schema
*/
getTables(
schema?: string,
tags?: Record<string, string>
): Promise<WarehouseCatalog>;
/**
* Get field metadata for specific table
* @param tableName - Name of the table
* @param schema - Optional schema name (uses credentials schema if omitted)
* @param database - Optional catalog name (uses credentials catalog if omitted)
* @param tags - Optional tags for query tracking
* @returns Promise resolving to catalog with table's column definitions
*/
getFields(
tableName: string,
schema?: string,
database?: string,
tags?: Record<string, string>
): Promise<WarehouseCatalog>;
/**
* Parse warehouse errors into WarehouseQueryError
* Uses inherited base implementation from WarehouseBaseClient
* @param error - Error object from Databricks
* @param query - Optional SQL query for context (defaults to empty string)
* @returns WarehouseQueryError with enhanced error message
*/
parseError(error: Error, query?: string): Error;
}
Usage Examples:
import { DatabricksWarehouseClient } from '@lightdash/warehouses';
// Using personal access token
const clientWithToken = new DatabricksWarehouseClient({
type: 'databricks',
serverHostName: 'my-workspace.cloud.databricks.com',
httpPath: '/sql/1.0/warehouses/abc123',
database: 'analytics',
catalog: 'production',
personalAccessToken: 'dapi1234567890abcdef',
startOfWeek: null,
});
// Using OAuth M2M (machine-to-machine)
const clientWithM2M = new DatabricksWarehouseClient({
type: 'databricks',
serverHostName: 'my-workspace.cloud.databricks.com',
httpPath: '/sql/1.0/warehouses/abc123',
database: 'analytics',
catalog: 'production',
oauthClientId: 'client-id-123',
oauthClientSecret: 'client-secret-456',
startOfWeek: null,
});
// Using OAuth U2M (user-to-machine) with refresh token
const clientWithU2M = new DatabricksWarehouseClient({
type: 'databricks',
serverHostName: 'my-workspace.cloud.databricks.com',
httpPath: '/sql/1.0/warehouses/abc123',
database: 'analytics',
catalog: 'production',
oauthClientId: 'client-id-123',
refreshToken: 'refresh-token-789',
startOfWeek: null,
});
// Test connection
await clientWithToken.test();
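// A hedged sketch (not from the library docs): surfacing connection
// failures from test(), and normalizing an error with parseError().
try {
  await clientWithToken.test();
  console.log('Connection OK');
} catch (error) {
  // test() rejects with WarehouseConnectionError or WarehouseQueryError
  const parsed = clientWithToken.parseError(error as Error, 'SELECT 1');
  console.error('Connection failed:', parsed.message);
}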
// Stream query results with timezone
await clientWithToken.streamQuery(
"SELECT * FROM production.analytics.sales WHERE date >= '2023-01-01'",
(data) => {
console.log('Received batch:', data.rows.length, 'rows');
data.rows.forEach(row => {
console.log(`Sale: $${row.amount} on ${row.date}`);
});
},
{
timezone: 'America/Los_Angeles',
tags: { source: 'lightdash' },
}
);
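// A hedged sketch of executeAsyncQuery(): Databricks has no native async
// query support, so the base implementation streams all results through
// the (required) callback and resolves with queryId set to null. The exact
// field layout of WarehouseExecuteAsyncQueryArgs is assumed from the
// signature documented above.
const asyncResult = await clientWithToken.executeAsyncQuery(
  {
    sql: 'SELECT order_id, amount FROM production.analytics.sales',
    tags: { source: 'lightdash' },
    timezone: 'UTC',
    values: [],
  },
  (rows) => {
    console.log('Streamed batch of', rows.length, 'rows');
  }
);
console.log('queryId:', asyncResult.queryId); // null for Databricks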
// Get all tables with Unity Catalog naming
const tables = await clientWithToken.getAllTables();
tables.forEach(table => {
console.log(`${table.database}.${table.schema}.${table.table}`);
});
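// A hedged sketch of the related metadata lookups:
// getCatalog() takes explicit table identifiers (database = catalog name)
const tableCatalog = await clientWithToken.getCatalog([
  { database: 'production', schema: 'analytics', table: 'sales' },
]);
// getTables() lists BASE TABLE entries in a single schema
const schemaTables = await clientWithToken.getTables('analytics', {
  source: 'lightdash',
});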
// Get fields for specific table
const catalog = await clientWithToken.getFields(
'sales',
'analytics',
'production'
);
const columns = catalog['production']['analytics']['sales'];
Object.entries(columns).forEach(([col, type]) => {
console.log(`${col}: ${type}`);
});
Helper functions for OAuth authentication flows with Databricks.
/**
* Exchange OAuth M2M credentials for access token
* Uses client_credentials grant type for machine-to-machine authentication
* Note: Always uses scope 'sql' for Databricks SQL warehouse access
* @param host - Databricks workspace host (e.g., 'my-workspace.cloud.databricks.com')
* @param clientId - OAuth client ID
* @param clientSecret - OAuth client secret
* @returns Promise resolving to access token and optional refresh token
* Note: refreshToken is typically NOT returned for M2M (client_credentials) flow
* as M2M access tokens can be re-issued by calling this function again
* @throws {Error} If network request fails, OAuth server returns an error, or response parsing fails
*/
async function exchangeDatabricksOAuthCredentials(
host: string,
clientId: string,
clientSecret: string
): Promise<{
accessToken: string;
refreshToken?: string; // Usually not present for M2M flow
}>;
/**
* Refresh OAuth U2M access token using refresh token
* Uses refresh_token grant type for user-to-machine authentication
* @param host - Databricks workspace host
* @param clientId - OAuth client ID
* @param refreshToken - OAuth refresh token from initial authorization
* @returns Promise resolving to new access token, refresh token, and expiry
* - `accessToken`: New OAuth access token for API requests
* - `refreshToken`: New refresh token (may be different from input)
* - `expiresIn`: Token lifetime in SECONDS (not milliseconds) - typically 3600 (1 hour)
* @throws {Error} If network request fails, OAuth server returns an error, or response parsing fails
*/
async function refreshDatabricksOAuthToken(
host: string,
clientId: string,
refreshToken: string
): Promise<{
accessToken: string;
refreshToken: string;
expiresIn: number; // Token lifetime in SECONDS
}>;
Usage Examples:
import {
exchangeDatabricksOAuthCredentials,
refreshDatabricksOAuthToken,
} from '@lightdash/warehouses';
// Exchange M2M credentials for access token
const m2mTokens = await exchangeDatabricksOAuthCredentials(
'my-workspace.cloud.databricks.com',
'client-id-123',
'client-secret-456'
);
console.log('Access token:', m2mTokens.accessToken);
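// Hedged sketch: the M2M flow usually returns no refresh token; to renew,
// simply call exchangeDatabricksOAuthCredentials() again. Passing the
// short-lived access token via the credentials `token` field is an
// ASSUMPTION based on the CreateDatabricksCredentials type documented below.
const m2mClient = new DatabricksWarehouseClient({
  type: 'databricks',
  serverHostName: 'my-workspace.cloud.databricks.com',
  httpPath: '/sql/1.0/warehouses/abc123',
  database: 'analytics',
  token: m2mTokens.accessToken,
  startOfWeek: null,
});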
// Refresh U2M access token
const refreshedTokens = await refreshDatabricksOAuthToken(
'my-workspace.cloud.databricks.com',
'client-id-123',
'existing-refresh-token'
);
console.log('New access token:', refreshedTokens.accessToken);
console.log('New refresh token:', refreshedTokens.refreshToken);
console.log('Expires in:', refreshedTokens.expiresIn, 'seconds');
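// Hedged sketch: proactively rotate the token before it expires.
// expiresIn is in SECONDS, so convert to milliseconds and keep a buffer.
const refreshEarlyMs = (refreshedTokens.expiresIn - 300) * 1000; // 5 min early
setTimeout(async () => {
  const next = await refreshDatabricksOAuthToken(
    'my-workspace.cloud.databricks.com',
    'client-id-123',
    refreshedTokens.refreshToken // always use the NEWEST refresh token
  );
  console.log('Rotated token; expires in', next.expiresIn, 'seconds');
}, refreshEarlyMs);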
// Use refreshed token in credentials
const client = new DatabricksWarehouseClient({
type: 'databricks',
serverHostName: 'my-workspace.cloud.databricks.com',
httpPath: '/sql/1.0/warehouses/abc123',
database: 'analytics',
oauthClientId: 'client-id-123',
refreshToken: refreshedTokens.refreshToken,
startOfWeek: null,
});
SQL builder for Databricks-specific syntax.
/**
* Databricks SQL builder with backtick quoting and PERCENTILE function
*/
class DatabricksSqlBuilder implements WarehouseSqlBuilder {
/** Warehouse type identifier */
readonly type = WarehouseTypes.DATABRICKS;
/**
* Get DBT adapter type for Databricks
* @returns SupportedDbtAdapter.DATABRICKS
*/
getAdapterType(): SupportedDbtAdapter;
/**
* Get field quote character for Databricks
* @returns Backtick character '`'
*/
getFieldQuoteChar(): string;
/**
* Generate SQL for metric aggregation
* Uses PERCENTILE function for percentile/median metrics
* @param sql - Column SQL expression
* @param metric - Metric definition
* @returns SQL string for the metric aggregation
*/
getMetricSql(sql: string, metric: Metric): string;
/**
* Escape string value for Databricks
* Uses Spark/Databricks-specific escaping rules
* @param value - Raw string value
* @returns Escaped string safe for Databricks query
*/
escapeString(value: string): string;
}
Usage Examples:
import { DatabricksSqlBuilder } from '@lightdash/warehouses';
const builder = new DatabricksSqlBuilder();
// Get quote characters
const fieldQuote = builder.getFieldQuoteChar(); // '`'
const fieldName = `${fieldQuote}transaction_id${fieldQuote}`; // `transaction_id`
// Escape string with special characters
const escaped = builder.escapeString("O'Reilly");
// Result: 'O\'Reilly'
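// Other inherited builder helpers (documented in the inherited-methods list
// further below); the concatString output shown is an assumption:
const stringQuote = builder.getStringQuoteChar(); // "'"
const floatType = builder.getFloatingType(); // 'DOUBLE'
const fullName = builder.concatString('first_name', "' '", 'last_name');
// e.g. CONCAT(first_name, ' ', last_name)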
// Generate metric SQL
const medianSql = builder.getMetricSql('price', {
type: MetricType.MEDIAN,
sql: 'price',
});
// Result: 'PERCENTILE(price, 0.5)'
const percentileSql = builder.getMetricSql('latency', {
type: MetricType.PERCENTILE,
sql: 'latency',
percentile: 99,
});
// Result: 'PERCENTILE(latency, 0.99)'
The following methods are inherited from WarehouseBaseClient and WarehouseSqlBuilder, and are available on all Databricks client instances:
/**
* Execute query and return all results in memory
* @deprecated Use streamQuery() to avoid loading all results into memory
*/
runQuery(
sql: string,
tags?: Record<string, string>,
timezone?: string,
values?: AnyType[],
queryParams?: Record<string, AnyType>
): Promise<WarehouseResults>;
/** Get the warehouse adapter type */
getAdapterType(): SupportedDbtAdapter;
/** Get string quote character (single quote) */
getStringQuoteChar(): string;
/** Get escape character for string quotes (backslash) */
getEscapeStringQuoteChar(): string;
/** Get field/identifier quote character (backtick for Databricks) */
getFieldQuoteChar(): string;
/** Get floating-point type name (DOUBLE for Databricks) */
getFloatingType(): string;
/**
* Generate SQL for metric aggregation
* @param sql - Column SQL expression
* @param metric - Metric definition with aggregation type
* @returns SQL string for the metric aggregation
*/
getMetricSql(sql: string, metric: Metric): string;
/** Concatenate strings using warehouse-specific SQL */
concatString(...args: string[]): string;
/** Escape string for SQL safety */
escapeString(value: string): string;
/** Get configured week start day */
getStartOfWeek(): WeekDay | null | undefined;
/** Parse catalog rows into nested structure */
parseWarehouseCatalog(
rows: Record<string, AnyType>[],
mapFieldType: (type: string) => DimensionType
): WarehouseCatalog;
/**
* Databricks authentication type
* Determines which authentication method to use for connecting to Databricks
*/
enum DatabricksAuthenticationType {
/** Personal Access Token authentication */
PERSONAL_ACCESS_TOKEN = 'personal_access_token',
/** OAuth Machine-to-Machine authentication */
OAUTH_M2M = 'oauth_m2m',
/** OAuth User-to-Machine authentication */
OAUTH_U2M = 'oauth_u2m',
}
/**
* Databricks warehouse credentials
* Supports three authentication methods: personal access token, OAuth M2M, and OAuth U2M
*/
interface CreateDatabricksCredentials {
/** Warehouse type discriminator */
type: 'databricks';
/** Unity Catalog catalog name (optional, for three-part naming) */
catalog?: string;
/** Database/schema name (note: called 'database' for historical reasons, represents schema) */
database: string;
/** Databricks workspace hostname (without https://) */
serverHostName: string;
/** SQL warehouse HTTP path (e.g., '/sql/1.0/warehouses/abc123') */
httpPath: string;
/** Authentication type (personal_access_token, oauth_m2m, or oauth_u2m) (optional) */
authenticationType?: DatabricksAuthenticationType;
/** Personal access token (optional, used when authenticationType is personal_access_token) */
personalAccessToken?: string;
/** OAuth refresh token (optional, used when authenticationType is oauth_u2m) */
refreshToken?: string;
/** OAuth access token (optional, short-lived token for OAuth) */
token?: string;
/** OAuth client ID (optional, used for OAuth M2M and U2M) */
oauthClientId?: string;
/** OAuth client secret (optional, used for OAuth M2M) */
oauthClientSecret?: string;
/** Require user credentials flag (optional) */
requireUserCredentials?: boolean;
/** Week start day configuration (optional) */
startOfWeek?: WeekDay | null;
/** Optional array of compute endpoints (warehouses) */
compute?: Array<{
name: string;
httpPath: string;
}>;
}
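A hedged sketch of a credentials object that selects OAuth M2M explicitly via authenticationType (values are placeholders), followed by reading the derived connection options:
// Credentials with an explicit authentication type
const m2mCredentials: CreateDatabricksCredentials = {
  type: 'databricks',
  serverHostName: 'my-workspace.cloud.databricks.com',
  httpPath: '/sql/1.0/warehouses/abc123',
  database: 'analytics',
  catalog: 'production',
  authenticationType: DatabricksAuthenticationType.OAUTH_M2M,
  oauthClientId: 'client-id-123',
  oauthClientSecret: 'client-secret-456',
  startOfWeek: null,
};
const explicitClient = new DatabricksWarehouseClient(m2mCredentials);
console.log(explicitClient.connectionOptions.host); // derived from serverHostName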
/**
* Databricks connection options for SQL connector
* Note: This interface is defined in @databricks/sql package
*/
interface ConnectionOptions {
host: string;
path: string;
token?: string;
authType?: string;
socketTimeout?: number;
[key: string]: any;
}
The following enum is used internally by the Databricks client for type mapping but is NOT exported from the package. It is included here for reference when working with Databricks data types:
/**
* Internal Databricks data type enum (NOT EXPORTED)
* Used for mapping Databricks SQL types to Lightdash dimension types
* These types are handled automatically by the client - no need to reference directly
*/
enum DatabricksTypes {
BOOLEAN = 'BOOLEAN',
BYTE = 'BYTE',
TINYINT = 'TINYINT',
SHORT = 'SHORT',
SMALLINT = 'SMALLINT',
INT = 'INT',
INTEGER = 'INTEGER',
LONG = 'LONG',
BIGINT = 'BIGINT',
FLOAT = 'FLOAT',
REAL = 'REAL',
DOUBLE = 'DOUBLE',
DATE = 'DATE',
TIMESTAMP = 'TIMESTAMP',
STRING = 'STRING',
BINARY = 'BINARY',
DECIMAL = 'DECIMAL',
DEC = 'DEC',
NUMERIC = 'NUMERIC',
INTERVAL = 'INTERVAL',
ARRAY = 'ARRAY',
STRUCT = 'STRUCT',
MAP = 'MAP',
CHAR = 'CHAR',
VARCHAR = 'VARCHAR',
}
The Databricks client respects the following environment variables:
DATABRICKS_ENABLE_TIMEOUTS: Set to the string 'true' (case-sensitive) to enable socket and query timeouts. Any other value (including '1', 'True', or boolean true) leaves timeouts disabled.
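For example, set it in the process environment before constructing the client (a minimal sketch):
// The value must be exactly the string 'true' to enable timeouts
process.env.DATABRICKS_ENABLE_TIMEOUTS = 'true';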