Google Cloud BigQuery warehouse client with native async query support, job-based execution, OAuth integration, and comprehensive error handling with line-number parsing. The client extends the base warehouse client and adds BigQuery-specific features.
/**
* BigQuery warehouse client with native async query support
* Supports both Application Default Credentials and service account authentication
*/
class BigqueryWarehouseClient implements WarehouseClient {
/**
* Initialize BigQuery client with credentials
* @param credentials - BigQuery credentials with project ID, location, and authentication
*/
constructor(credentials: CreateBigqueryCredentials);
/**
* The underlying BigQuery client instance
* Provides access to the native @google-cloud/bigquery client for advanced operations
*/
client: BigQuery;
/**
* The BigQuery credentials used to initialize this client
* Contains project, dataset, authentication, and configuration settings
*/
credentials: CreateBigqueryCredentials;
/**
* Stream query results using BigQuery jobs API
* Results are streamed in batches to handle large datasets efficiently
* Note: BigQuery does NOT support named query parameters (queryParams)
* Note: timezone parameter is currently not implemented and will be ignored
* @param query - SQL query string to execute
* @param streamCallback - Callback invoked for each batch of results
* @param options - Query execution options (values for positional params, tags, timezone)
* @returns Promise that resolves when streaming completes
*/
streamQuery(
query: string,
streamCallback: (data: WarehouseResults) => void,
options: {
values?: AnyType[];
tags?: Record<string, string>;
timezone?: string; // Currently not implemented - parameter is ignored
}
): Promise<void>;
/**
* Execute async query with native BigQuery job support
* Returns job ID for later result retrieval
*
* IMPORTANT: BigQuery requires the callback parameter. Unlike the base
* WarehouseClient interface, which makes the callback optional for warehouses
* with native async pagination, BigQuery streams all results through the
* callback in real time as the job runs. The base interface's type signature
* does not enforce this, but it is a runtime requirement: if no callback is
* provided, results are lost as BigQuery streams them during execution.
*
* Note: timezone parameter in args is currently not implemented and will be ignored
* Note: queryParams field exists in WarehouseExecuteAsyncQueryArgs type but BigQuery does NOT support named parameters
*
* @param args - Async query arguments (sql, tags, timezone, etc.)
* @param resultsStreamCallback - Callback for streaming results (REQUIRED for BigQuery)
* @returns Promise resolving to query metadata with job ID and location
*/
executeAsyncQuery(
args: WarehouseExecuteAsyncQueryArgs,
resultsStreamCallback: (
rows: WarehouseResults["rows"],
fields: WarehouseResults["fields"]
) => void
): Promise<WarehouseExecuteAsyncQuery>;
/**
* Get catalog metadata for specific tables
* Batch fetches table schemas using BigQuery API
* @param requests - Array of table identifiers (database=project, schema=dataset, table)
* @returns Promise resolving to nested catalog structure
*/
getCatalog(
requests: Array<{ database: string; schema: string; table: string }>
): Promise<WarehouseCatalog>;
/**
* Get all tables in the project with partition column information
* Note: BigQuery's getAllTables() does NOT accept schema or tags parameters
* @returns Promise resolving to array of table metadata with partition columns
*/
getAllTables(): Promise<WarehouseTables>;
/**
* Get field metadata for specific table
* @param tableName - Name of the table
* @param schema - Dataset name (required)
* @param database - Project ID (optional, uses credentials project if omitted)
* @returns Promise resolving to catalog with table's column definitions
*/
getFields(tableName: string, schema: string, database?: string): Promise<WarehouseCatalog>;
/**
* Parse BigQuery errors with enhanced messages and line numbers
* Handles specific error types: accessDenied, invalidQuery, stopped, quotaExceeded, etc.
* @param error - BigQuery IErrorProto error object
* @param query - Query string for line number context (defaults to empty string)
* @returns WarehouseQueryError with specific message and line number
*/
parseError(error: bigquery.IErrorProto, query: string = ""): WarehouseQueryError;
/**
* Test warehouse connection by executing a simple query
* Inherited from WarehouseBaseClient
* @returns Promise that resolves if connection is successful, rejects otherwise
*/
test(): Promise<void>;
/**
* Run query and return all results at once
* @deprecated Use streamQuery() instead to avoid loading all results into memory
* Inherited from WarehouseBaseClient
* @param sql - SQL query string
* @param tags - Tags for query tracking
* @param timezone - Optional timezone for query execution
* @param values - Optional positional parameter values
* @returns Promise resolving to complete query results
*/
runQuery(
sql: string,
tags: Record<string, string>,
timezone?: string,
values?: AnyType[]
): Promise<WarehouseResults>;
/**
* ❌ NOT IMPLEMENTED for BigQuery
*
* **Architectural Reason:** BigQuery streams all results immediately during executeAsyncQuery() via the callback.
* There is no separate result retrieval step - results are delivered in a single stream
* as they become available. This is due to how the BigQuery API works internally.
*
* This method exists to satisfy the WarehouseClient interface but will always throw an error for BigQuery.
* To work with BigQuery results, you MUST:
* 1. Provide a callback to executeAsyncQuery()
* 2. Process results as they stream in the callback
*
* @param args - Query ID and pagination parameters
* @param rowFormatter - Optional row transformation function
* @returns This method never returns successfully
* @throws {NotImplementedError} Always throws - BigQuery does not support pagination-based result retrieval
*/
getAsyncQueryResults<TFormattedRow extends Record<string, unknown> = Record<string, unknown>>(
args: WarehouseGetAsyncQueryResultsArgs,
rowFormatter?: (row: Record<string, unknown>) => TFormattedRow
): Promise<WarehouseGetAsyncQueryResults<TFormattedRow>>;
/**
* Parse warehouse catalog from raw query results
* Inherited from WarehouseBaseClient
* @param rows - Raw rows from catalog query
* @param mapFieldType - Function to map warehouse-specific types to DimensionType
* @returns Parsed warehouse catalog structure
*/
parseWarehouseCatalog(
rows: Record<string, AnyType>[],
mapFieldType: (type: string) => DimensionType
): WarehouseCatalog;
}
Usage Examples:
import { BigqueryWarehouseClient } from "@lightdash/warehouses";
// Using service account credentials
const client = new BigqueryWarehouseClient({
type: "bigquery",
project: "my-project-id",
dataset: "my_dataset",
location: "US",
timeoutSeconds: 300,
priority: "interactive",
retries: 3,
maximumBytesBilled: 1000000000,
keyfileContents: {
type: "service_account",
project_id: "my-project-id",
private_key: "-----BEGIN PRIVATE KEY-----\n...",
client_email: "service-account@my-project.iam.gserviceaccount.com",
},
});
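The client also supports Application Default Credentials (per the class doc above). A minimal sketch, with the caveat that the exact BigqueryAuthenticationType member name is an assumption and keyfileContents is presumed empty in ADC mode:
// Using Application Default Credentials (ADC): illustrative sketch
const adcClient = new BigqueryWarehouseClient({
  type: "bigquery",
  project: "my-project-id",
  dataset: "my_dataset",
  location: "US",
  timeoutSeconds: 300,
  priority: "interactive",
  retries: 3,
  maximumBytesBilled: undefined,
  keyfileContents: {}, // assumed empty when relying on ADC
  // authenticationType: "adc" (assumed BigqueryAuthenticationType member)
});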
// Execute async query with job ID and stream results
const result = await client.executeAsyncQuery(
{
sql: "SELECT * FROM `my-dataset.my-table` WHERE date >= ?",
tags: { source: "lightdash", user_id: "123" },
// Note: timezone parameter is currently not implemented in BigQuery client
values: ["2023-01-01"], // Positional parameters
},
(rows, fields) => {
// Handle results as they stream in
console.log("Received batch:", rows.length, "rows");
// Process rows...
}
);
console.log("Job ID:", result.queryId);
console.log("Total rows:", result.totalRows);
console.log("Duration:", result.durationMs, "ms");
// Stream query results
await client.streamQuery(
"SELECT user_id, SUM(amount) as total FROM transactions GROUP BY user_id",
(data) => {
console.log("Received batch:", data.rows.length, "rows");
data.rows.forEach((row) => {
console.log(`User ${row.user_id}: $${row.total}`);
});
},
{ tags: { source: "lightdash" } }
);
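Column metadata for a single table comes from getFields(); note that schema maps to the BigQuery dataset and database to the project:
// Get column definitions for one table
const tableCatalog = await client.getFields(
  "my_table", // table name
  "my_dataset", // schema = BigQuery dataset (required)
  "my-project-id" // database = project (optional, defaults to credentials project)
);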
// Get all tables with partition info
const tables = await client.getAllTables();
tables.forEach((table) => {
console.log(`${table.database}.${table.schema}.${table.table}`);
if (table.partitionColumn) {
console.log(` Partitioned by: ${table.partitionColumn}`);
}
});
The following methods are inherited from WarehouseBaseClient and WarehouseSqlBuilder, and are available on all BigQuery client instances:
/**
* Execute query and return all results in memory
* @deprecated Use streamQuery() to avoid loading all results into memory
*/
runQuery(
sql: string,
tags?: Record<string, string>,
timezone?: string,
values?: AnyType[],
queryParams?: Record<string, AnyType>
): Promise<WarehouseResults>;
/** Get the warehouse adapter type */
getAdapterType(): SupportedDbtAdapter;
/** Get string quote character (single quote) */
getStringQuoteChar(): string;
/** Get escape character for string quotes (backslash) */
getEscapeStringQuoteChar(): string;
/** Get field/identifier quote character (backtick for BigQuery) */
getFieldQuoteChar(): string;
/** Get floating-point type name (FLOAT64 for BigQuery) */
getFloatingType(): string;
/**
* Generate SQL for metric aggregation
* @param sql - Column SQL expression
* @param metric - Metric definition with aggregation type
* @returns SQL string for the metric aggregation
*/
getMetricSql(sql: string, metric: Metric): string;
/** Concatenate strings using warehouse-specific SQL */
concatString(...args: string[]): string;
/** Escape string for SQL safety */
escapeString(value: string): string;
/** Get configured week start day */
getStartOfWeek(): WeekDay | null | undefined;
/** Parse catalog rows into nested structure */
parseWarehouseCatalog(
rows: Record<string, AnyType>[],
mapFieldType: (type: string) => DimensionType
): WarehouseCatalog;
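A quick sketch exercising a few of these inherited helpers on a client instance; the concatString() output shown is indicative only, and the escapeString() result follows the examples later in this document:
// Inherited SQL-builder helpers in action
const quote = client.getFieldQuoteChar(); // '`'
const field = `${quote}user_id${quote}`; // `user_id`
const fullName = client.concatString("first_name", "' '", "last_name");
// e.g. CONCAT(first_name, ' ', last_name) (exact SQL shape is an assumption)
const escaped = client.escapeString("O'Reilly"); // "O\'Reilly" (backslash-escaped)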
Static utility methods for BigQuery operations.
/**
* Sanitize labels for BigQuery requirements
* Labels must be lowercase, max 60 chars per key, max 60 chars per value, alphanumeric + underscore + hyphen
*
* **Return Value Behavior:**
* - If `labels` is `undefined` → returns `undefined` (not an empty object)
* - If `labels` is an empty object `{}` → returns empty object `{}`
* - If `labels` has entries → returns sanitized object
*
* @param labels - Raw label key-value pairs (optional)
* @returns Sanitized labels or undefined if input is undefined
*/
static sanitizeLabelsWithValues(
labels?: Record<string, string>
): Record<string, string> | undefined;
/**
* Extract field type information from BigQuery query response
* Maps BigQuery types to Lightdash DimensionType
* @param response - BigQuery QueryRowsResponse with schema metadata
* @returns Field metadata mapping column names to types
*/
static getFieldsFromResponse(
response: QueryRowsResponse[2] | undefined
): Record<string, { type: DimensionType }>;
/**
* Get available datasets using OAuth refresh token
* For UI dataset selection without service account
* @param projectId - BigQuery project ID
* @param refresh_token - OAuth refresh token
* @returns Promise resolving to array of dataset metadata
*/
static getDatabases(
projectId: string,
refresh_token: string
): Promise<BigqueryDataset[]>;
/**
* Get metadata for a specific table in a dataset
* Fetches table schema including fields and their types
* @param dataset - BigQuery Dataset instance
* @param table - Table name
* @returns Promise resolving to tuple of [projectId, datasetId, tableName, schema]
*/
static getTableMetadata(
dataset: Dataset,
table: string
): Promise<[string, string, string, TableSchema]>;
/**
* Type guard to check if error is a BigQuery error with error details
* @param error - Error object to check
* @returns True if error contains BigQuery error details
*/
static isBigqueryError(error: unknown): error is BigqueryError;
Usage Examples:
import { BigqueryWarehouseClient } from "@lightdash/warehouses";
// Sanitize labels for BigQuery
const sanitized = BigqueryWarehouseClient.sanitizeLabelsWithValues({
"User-ID": "123",
"Very-Long-Label-Name-That-Exceeds-The-Maximum-Length-Allowed-By-BigQuery": "value",
UPPERCASE: "VALUE",
});
// Result: { 'user-id': '123', 'very-long-label-name-that-exceeds-the-maximum-length-allowed': 'value', 'uppercase': 'value' }
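The type guard pairs naturally with parseError() when handling query failures. A sketch, assuming client is an existing BigqueryWarehouseClient instance:
// Narrow an unknown error to a BigQuery error before parsing it
try {
  await client.runQuery("SELEC 1", { source: "lightdash" });
} catch (e) {
  if (BigqueryWarehouseClient.isBigqueryError(e)) {
    // e.errors is bigquery.IErrorProto[]; parse the first entry
    throw client.parseError(e.errors[0], "SELEC 1");
  }
  throw e;
}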
// Get datasets using OAuth (for UI)
const datasets = await BigqueryWarehouseClient.getDatabases("my-project-id", "oauth-refresh-token");
datasets.forEach((ds) => {
console.log(`${ds.datasetId} (${ds.location})`);
});
SQL builder for BigQuery-specific syntax.
/**
* BigQuery SQL builder with backtick quoting and APPROX_QUANTILES for percentiles
*/
class BigquerySqlBuilder implements WarehouseSqlBuilder {
/** Warehouse type identifier */
readonly type = WarehouseTypes.BIGQUERY;
/**
* Get DBT adapter type for BigQuery
* @returns SupportedDbtAdapter.BIGQUERY
*/
getAdapterType(): SupportedDbtAdapter;
/**
* Get field quote character for BigQuery
* @returns Backtick character '`'
*/
getFieldQuoteChar(): string;
/**
* Get floating-point type name for BigQuery
* @returns 'FLOAT64'
*/
getFloatingType(): string;
/**
* Generate SQL for metric aggregation
* Uses APPROX_QUANTILES for percentile/median metrics
* @param sql - Column SQL expression
* @param metric - Metric definition
* @returns SQL string for the metric aggregation
*/
getMetricSql(sql: string, metric: Metric): string;
/**
* Escape string value for BigQuery
* Uses backslash escaping, removes SQL comments and null bytes
* @param value - Raw string value
* @returns Escaped string safe for BigQuery query
*/
escapeString(value: string): string;
}
Usage Examples:
import { BigquerySqlBuilder } from "@lightdash/warehouses";
const builder = new BigquerySqlBuilder();
// Get quote characters
const fieldQuote = builder.getFieldQuoteChar(); // '`'
const fieldName = `${fieldQuote}user_id${fieldQuote}`; // `user_id`
// Escape string with special characters
const escaped = builder.escapeString("O'Reilly /* comment */ \x00");
// Result: 'O\'Reilly '
// Generate metric SQL
const medianSql = builder.getMetricSql("revenue", {
type: MetricType.MEDIAN,
sql: "revenue",
});
// Result: 'APPROX_QUANTILES(revenue, 100)[OFFSET(50)]'
const percentileSql = builder.getMetricSql("response_time", {
type: MetricType.PERCENTILE,
sql: "response_time",
percentile: 95,
});
// Result: 'APPROX_QUANTILES(response_time, 100)[OFFSET(95)]'
BigQuery data type constants for field type checking.
/**
* BigQuery data type enum
* Covers all standard BigQuery types including nested structures
* Exported from @lightdash/warehouses for type checking and schema validation
*/
enum BigqueryFieldType {
STRING = "STRING",
INTEGER = "INTEGER",
BYTES = "BYTES",
INT64 = "INT64",
FLOAT = "FLOAT",
FLOAT64 = "FLOAT64",
BOOLEAN = "BOOLEAN",
BOOL = "BOOL",
TIMESTAMP = "TIMESTAMP",
DATE = "DATE",
TIME = "TIME",
DATETIME = "DATETIME",
GEOGRAPHY = "GEOGRAPHY",
NUMERIC = "NUMERIC",
BIGNUMERIC = "BIGNUMERIC",
RECORD = "RECORD",
STRUCT = "STRUCT",
ARRAY = "ARRAY",
}
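Because the enum is exported, it can drive schema checks directly; the isNumericField helper below is illustrative, not part of the package:
import { BigqueryFieldType } from "@lightdash/warehouses";
// Illustrative helper: classify a raw schema type string as numeric
const isNumericField = (type: string): boolean =>
  [
    BigqueryFieldType.INTEGER,
    BigqueryFieldType.INT64,
    BigqueryFieldType.FLOAT,
    BigqueryFieldType.FLOAT64,
    BigqueryFieldType.NUMERIC,
    BigqueryFieldType.BIGNUMERIC,
  ].includes(type as BigqueryFieldType);
isNumericField("FLOAT64"); // true
isNumericField("STRING"); // false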
/**
 * BigQuery warehouse credentials
*/
interface CreateBigqueryCredentials {
/** Warehouse type discriminator */
type: "bigquery";
/** GCP project ID */
project: string;
/** BigQuery dataset name */
dataset: string;
/** Optional number of concurrent threads */
threads?: number;
/** Query timeout in seconds (REQUIRED field, value can be undefined) */
timeoutSeconds: number | undefined;
/** Query priority: 'interactive' (default, fast) or 'batch' (slower, cheaper) (REQUIRED field, value can be undefined) */
priority: "interactive" | "batch" | undefined;
/** Authentication type: ADC, private key, or SSO */
authenticationType?: BigqueryAuthenticationType;
/** Service account key file contents (JSON object with service account credentials) */
keyfileContents: Record<string, string>;
/** Require user credentials flag */
requireUserCredentials?: boolean;
/** Number of retries for transient failures (REQUIRED field, value can be undefined) */
retries: number | undefined;
/** BigQuery location/region (e.g., 'US', 'EU', 'us-west1') (REQUIRED field, value can be undefined) */
location: string | undefined;
/** Maximum bytes billed limit (query fails if exceeded) (REQUIRED field, value can be undefined) */
maximumBytesBilled: number | undefined;
/** Week start day configuration */
startOfWeek?: WeekDay | null;
/** Optional execution project (defaults to main project if not specified) */
executionProject?: string;
}
Job Configuration Options Behavior:
The following credential fields are applied to the BigQuery job configuration when executing queries:
- maximumBytesBilled: Enforces a hard limit on query cost
- priority: Controls query execution priority
  - 'interactive' (default): Fast execution, higher cost, limited concurrency
  - 'batch': Slower execution (queued), lower cost, unlimited concurrency
- timeoutSeconds: Query timeout in seconds
  - Converted to milliseconds (timeoutSeconds * 1000) for the BigQuery API
- executionProject: Project used for billing and quota
  - Overrides the project field for billing
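For orientation, a hypothetical sketch of how these fields could map onto job options; the option names are modeled on @google-cloud/bigquery and are assumptions, not the package's actual implementation:
// Hypothetical mapping sketch (assumed option names)
function toJobOptions(credentials: CreateBigqueryCredentials, sql: string) {
  return {
    query: sql,
    location: credentials.location,
    priority: credentials.priority === "batch" ? "BATCH" : "INTERACTIVE",
    maximumBytesBilled: credentials.maximumBytesBilled?.toString(), // API expects a string
    jobTimeoutMs:
      credentials.timeoutSeconds !== undefined
        ? String(credentials.timeoutSeconds * 1000) // seconds to milliseconds
        : undefined,
  };
}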
/**
* BigQuery dataset metadata from OAuth API
*/
interface BigqueryDataset {
/** Project ID containing the dataset */
projectId: string;
/** Dataset ID */
datasetId: string;
/** Dataset location/region (may be undefined) */
location: string | undefined;
}
/**
* Query execution options for streamQuery()
*/
interface QueryOptions {
/** Labels/tags for query tracking in BigQuery (optional) */
tags?: Record<string, string>;
/** Timezone for query session (not used by BigQuery) */
timezone?: string;
/** Positional query parameters */
values?: AnyType[];
}
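A short sketch of passing these options to streamQuery(), using positional ? placeholders via values, since BigQuery does not support named parameters:
await client.streamQuery(
  "SELECT event_id FROM `my_dataset.events` WHERE created_at >= ?",
  (data) => {
    console.log("Received batch:", data.rows.length, "rows");
  },
  {
    values: ["2024-01-01"], // positional parameters bound to ? placeholders
    tags: { source: "lightdash" }, // applied as BigQuery job labels (cf. sanitizeLabelsWithValues)
    // timezone is accepted but currently ignored by the BigQuery client
  }
);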
The following types are used internally by the BigQuery client and are NOT exported from the package:
/**
* BigQuery error object with error details
* INTERNAL TYPE - NOT EXPORTED from @lightdash/warehouses
* Used by the isBigqueryError() static method for type checking
*/
type BigqueryError = {
/** Array of error details from BigQuery API */
errors: bigquery.IErrorProto[];
};
/**
* Table schema structure from BigQuery metadata
* INTERNAL TYPE - NOT EXPORTED from @lightdash/warehouses
*/
type TableSchema = {
/** Array of field definitions with name and type */
fields: Array<{ name: string; type: string }>;
};
/**
* External types from @google-cloud/bigquery package:
* - Dataset: BigQuery dataset instance for table operations
* - QueryRowsResponse: Response from BigQuery query execution with schema metadata
* - bigquery.IErrorProto: BigQuery API error protocol buffer interface
*
* These types are part of the @google-cloud/bigquery dependency and are used in
* static method signatures. Import them from '@google-cloud/bigquery' when needed:
*
* import { Dataset, QueryRowsResponse } from '@google-cloud/bigquery';
* import bigquery from '@google-cloud/bigquery/build/src/types';
*/
The BigQuery client provides enhanced error messages for common error types:
Line number parsing extracts context from errors like: "1:34 Expected end of input but got keyword SELECT"
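As an illustration, a sketch of feeding such an error through parseError(); reason and message are standard bigquery.IErrorProto fields, and the returned WarehouseQueryError carries the enhanced message:
// Sketch: parsing an invalidQuery error that embeds a "line:column" prefix
const queryError = client.parseError(
  {
    reason: "invalidQuery",
    message: "1:34 Expected end of input but got keyword SELECT",
  },
  "SELECT user_id FROM users LIMIT 1 SELECT" // query provides line-number context
);
console.log(queryError.message); // enhanced, human-readable message with line number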