Google Cloud Platform integration components for the Dagster data orchestration framework.
—
Comprehensive BigQuery integration for Dagster providing data warehousing capabilities, I/O managers for reading and writing BigQuery tables, operations for data loading and querying, and resources for BigQuery client management with full authentication support.
Configurable resource for BigQuery client management with project specification and authentication.
# Pythonic (ConfigurableResource) BigQuery resource: configure project,
# location and credentials, then call get_client() to obtain a client.
class BigQueryResource(ConfigurableResource):
"""Resource for BigQuery client management."""
project: Optional[str] # GCP project ID; presumably falls back to ambient GCP config when None — confirm
location: Optional[str] # Default location for jobs/datasets/tables
gcp_credentials: Optional[str] # Base64-encoded service account credentials (decoded before use — confirm)
# Spec stub — body not shown here; returns an iterator/context manager that
# yields an authenticated google.cloud.bigquery.Client (see return annotation).
def get_client(self) -> Iterator[bigquery.Client]:
"""Context manager yielding authenticated BigQuery client."""Legacy resource factory:
# Legacy @resource-style factory equivalent of BigQueryResource; its config
# schema is derived directly from BigQueryResource, and it yields a
# bigquery.Client (see the return annotation).
@resource(
config_schema=BigQueryResource.to_config_schema(),
description="Dagster resource for connecting to BigQuery"
)
def bigquery_resource(context) -> Iterator[bigquery.Client]:
"""Legacy BigQuery resource factory that yields a BigQuery client."""Configurable I/O manager factory for BigQuery table storage and retrieval.
# Abstract I/O-manager factory: subclasses provide type_handlers() for the
# concrete in-memory types they support; create_io_manager() builds the
# runtime instance from this configuration.
class BigQueryIOManager(ConfigurableIOManagerFactory):
"""Base class for BigQuery I/O managers."""
project: str # GCP project ID (required, unlike BigQueryResource.project)
dataset: Optional[str] # Default BigQuery dataset
location: Optional[str] # GCP location
gcp_credentials: Optional[str] # Base64-encoded credentials
temporary_gcs_bucket: Optional[str] # Temporary GCS bucket for large operations
timeout: Optional[float] # Query timeout for Pandas operations
# Abstract (per docstring): handlers translating between in-memory types
# (e.g. pandas DataFrames) and BigQuery tables.
def type_handlers(self) -> Sequence[DbTypeHandler]:
"""Abstract method to define type handlers."""
# Type used when loading an input without an explicit annotation; may be None.
def default_load_type(self) -> Optional[type]:
"""Default type for loading data."""
def create_io_manager(self, context) -> Generator:
"""Creates the actual I/O manager instance."""
# Functional counterpart of BigQueryIOManager for code that works with
# IOManagerDefinition objects rather than ConfigurableIOManagerFactory.
def build_bigquery_io_manager(
type_handlers: Sequence[DbTypeHandler],
default_load_type: Optional[type] = None
) -> IOManagerDefinition:
"""Factory function for creating BigQuery I/O manager definitions."""Operations for importing data from various sources into BigQuery.
# Requires the "bigquery" resource; load options (destination table etc.,
# presumably — confirm against define_bigquery_load_config) come from op config.
@op(
required_resource_keys={"bigquery"},
config_schema=define_bigquery_load_config()
)
def import_df_to_bq(context, df: DataFrame) -> None:
"""Import Pandas DataFrame to BigQuery table."""
# File-based variant of import_df_to_bq: takes a local file path instead of
# an in-memory DataFrame; shares the same load config schema.
@op(
required_resource_keys={"bigquery"},
config_schema=define_bigquery_load_config()
)
def import_file_to_bq(context, path: str) -> None:
"""Import local file to BigQuery table."""
# GCS-based variant: takes a list of Cloud Storage paths; shares the same
# load config schema as the other import ops.
@op(
required_resource_keys={"bigquery"},
config_schema=define_bigquery_load_config()
)
def import_gcs_paths_to_bq(context, paths: List[str]) -> None:
"""Import GCS files to BigQuery table."""Operations for executing SQL queries against BigQuery.
# Op factory: builds an OpDefinition that executes the given SQL queries and
# returns their results as a List[DataFrame] (see docstring below).
def bq_op_for_queries(sql_queries: List[str]) -> OpDefinition:
"""
Creates an op that executes BigQuery SQL queries.
Parameters:
- sql_queries: List of SQL queries to execute
Returns:
Op function that returns List[DataFrame]
"""Operations for BigQuery dataset lifecycle management.
# Dataset-DDL op; accepted config keys are documented in the docstring below.
@op(
required_resource_keys={"bigquery"},
config_schema=define_bigquery_create_dataset_config()
)
def bq_create_dataset(context) -> None:
"""
Create BigQuery dataset.
Config:
- dataset: str - Dataset identifier
- exists_ok: bool - Whether to ignore "already exists" errors
"""
# Destructive counterpart of bq_create_dataset; note delete_contents controls
# whether contained tables are removed too (see Config below).
@op(
required_resource_keys={"bigquery"},
config_schema=define_bigquery_delete_dataset_config()
)
def bq_delete_dataset(context) -> None:
"""
Delete BigQuery dataset.
Config:
- dataset: str - Dataset identifier
- delete_contents: bool - Whether to delete tables in dataset
- not_found_ok: bool - Whether to ignore "not found" errors
"""Helper functions for BigQuery operations.
# Read-only helper that takes a raw bigquery.Client (not a Dagster resource);
# e.g. usable for table-freshness checks. Returns one timestamp per table id.
def fetch_last_updated_timestamps(
client: bigquery.Client,
dataset_id: str,
table_ids: Sequence[str]
) -> Mapping[str, datetime]:
"""
Get last updated timestamps for BigQuery tables.
Parameters:
- client: BigQuery client
- dataset_id: Dataset ID
- table_ids: List of table IDs
Returns:
Mapping of table ID to timestamp
"""BigQuery-specific types, enums, and configuration schemas.
class BigQueryError(Exception):
    """Base exception for BigQuery-related errors raised by this package."""
class BigQueryLoadSource(Enum):
    """Supported sources for BigQuery load operations."""

    DataFrame = "DataFrame"  # load from an in-memory pandas DataFrame
    GCS = "GCS"              # load from files staged in Google Cloud Storage
    File = "File"            # load from a local file path
class BQCreateDisposition(Enum):
    """Table creation behavior for load/query jobs."""

    CREATE_IF_NEEDED = "CREATE_IF_NEEDED"  # create the table when it does not exist
    CREATE_NEVER = "CREATE_NEVER"          # fail rather than create a missing table
class BQWriteDisposition(Enum):
    """Write behavior when the destination table already exists."""

    WRITE_TRUNCATE = "WRITE_TRUNCATE"  # overwrite existing table data
    WRITE_APPEND = "WRITE_APPEND"      # append to existing table data
    WRITE_EMPTY = "WRITE_EMPTY"        # only write if the table is empty
class BQSchemaUpdateOption(Enum):
    """Allowed destination-schema updates during a load job."""

    ALLOW_FIELD_ADDITION = "ALLOW_FIELD_ADDITION"      # permit adding new fields
    ALLOW_FIELD_RELAXATION = "ALLOW_FIELD_RELAXATION"  # permit REQUIRED -> NULLABLE relaxation
class BQPriority(Enum):
    """Query priority levels."""

    BATCH = "BATCH"              # queued batch-priority execution
    INTERACTIVE = "INTERACTIVE"  # immediate interactive-priority execution
class BQEncoding(Enum):
    """File encoding options for load jobs."""

    UTF_8 = "UTF-8"
    ISO_8859_1 = "ISO-8859-1"
class BQSourceFormat(Enum):
"""Source file format options."""
CSV = "CSV"
NEWLINE_DELIMITED_JSON = "NEWLINE_DELIMITED_JSON"
AVRO = "AVRO"
PARQUET = "PARQUET"
DATASTORE_BACKUP = "DATASTORE_BACKUP"Validation scalars for BigQuery identifiers.
# Dagster config scalars: each accepts a string and returns it after
# validating it as a BigQuery identifier (exact rules not shown here).
def Table(table_name: str) -> str:
"""Validates BigQuery table identifiers."""
def Dataset(dataset_name: str) -> str:
"""Validates BigQuery dataset identifiers."""Functions that define configuration schemas for BigQuery operations.
# Config-schema builders used by the ops above (e.g. import_df_to_bq,
# bq_create_dataset); each returns the ConfigSchema for one operation family.
def define_bigquery_query_config() -> ConfigSchema:
"""Configuration for query operations."""
def define_bigquery_load_config() -> ConfigSchema:
"""Configuration for load operations."""
def define_bigquery_create_dataset_config() -> ConfigSchema:
"""Configuration for dataset creation."""
def define_bigquery_delete_dataset_config() -> ConfigSchema:
"""Configuration for dataset deletion."""from dagster import asset, Definitions
from dagster_gcp import BigQueryResource
@asset
def customer_data(bigquery: BigQueryResource):
    """Fetch customers with an order in the last 30 days as a DataFrame."""
    with bigquery.get_client() as client:
        query = """
        SELECT customer_id, order_count, total_spent
        FROM `project.analytics.customer_summary`
        WHERE last_order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
        """
        return client.query(query).to_dataframe()
# Wire the asset to a configured BigQueryResource under the "bigquery" key.
defs = Definitions(
    assets=[customer_data],
    resources={
        "bigquery": BigQueryResource(
            project="my-gcp-project",
            location="US",
        )
    },
)

from dagster import asset, Definitions
from dagster_gcp import BigQueryIOManager, BigQueryResource
@asset
def processed_orders():
    """Example asset whose return value is persisted by the BigQuery I/O manager."""
    # This will be stored in BigQuery
    return pd.DataFrame({
        'order_id': [1, 2, 3],
        'amount': [100.0, 250.0, 75.0],
        'processed_at': [datetime.now()] * 3
    })
# Register the I/O manager under the default "io_manager" key so asset
# outputs are written to the "analytics" dataset.
defs = Definitions(
    assets=[processed_orders],
    resources={
        "io_manager": BigQueryIOManager(
            project="my-gcp-project",
            dataset="analytics",
        )
    },
)

from dagster import job, op
from dagster_gcp import import_df_to_bq, BigQueryResource
import pandas as pd
@op
def create_sample_data():
    """Produce a tiny sample DataFrame for the load_data_job example."""
    return pd.DataFrame({
        'id': [1, 2, 3],
        'value': ['a', 'b', 'c']
    })
@job(
resource_defs={
"bigquery": BigQueryResource(project="my-project")
}
)
def load_data_job():
df = create_sample_data()
import_df_to_bq(df)Install with Tessl CLI
npx tessl i tessl/pypi-dagster-gcp