CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-dagster-gcp

Google Cloud Platform integration components for the Dagster data orchestration framework.

Pending
Overview
Eval results
Files

bigquery.mddocs/

BigQuery Integration

Comprehensive BigQuery integration for Dagster providing data warehousing capabilities, I/O managers for reading and writing BigQuery tables, operations for data loading and querying, and resources for BigQuery client management with full authentication support.

Capabilities

BigQuery Resource

Configurable resource for BigQuery client management with project specification and authentication.

class BigQueryResource(ConfigurableResource):
    """Resource for BigQuery client management."""
    project: Optional[str]  # GCP project ID
    location: Optional[str]  # Default location for jobs/datasets/tables
    gcp_credentials: Optional[str]  # Base64-encoded service account credentials
    
    def get_client(self) -> Iterator[bigquery.Client]:
        """Context manager yielding authenticated BigQuery client."""

Legacy resource factory:

@resource(
    config_schema=BigQueryResource.to_config_schema(),
    description="Dagster resource for connecting to BigQuery"
)
def bigquery_resource(context) -> Iterator[bigquery.Client]:
    """Legacy BigQuery resource factory that yields a BigQuery client."""

I/O Manager

Configurable I/O manager factory for BigQuery table storage and retrieval.

class BigQueryIOManager(ConfigurableIOManagerFactory):
    """Base class for BigQuery I/O managers."""
    project: str  # GCP project ID
    dataset: Optional[str]  # Default BigQuery dataset
    location: Optional[str]  # GCP location
    gcp_credentials: Optional[str]  # Base64-encoded credentials
    temporary_gcs_bucket: Optional[str]  # Temporary GCS bucket for large operations
    timeout: Optional[float]  # Query timeout for Pandas operations
    
    def type_handlers(self) -> Sequence[DbTypeHandler]:
        """Abstract method to define type handlers."""
    
    def default_load_type(self) -> Optional[type]:
        """Default type for loading data."""
    
    def create_io_manager(self, context) -> Generator:
        """Creates the actual I/O manager instance."""

def build_bigquery_io_manager(
    type_handlers: Sequence[DbTypeHandler],
    default_load_type: Optional[type] = None
) -> IOManagerDefinition:
    """Factory function for creating BigQuery I/O manager definitions."""

Data Loading Operations

Operations for importing data from various sources into BigQuery.

@op(
    required_resource_keys={"bigquery"},
    config_schema=define_bigquery_load_config()
)
def import_df_to_bq(context, df: DataFrame) -> None:
    """Import Pandas DataFrame to BigQuery table."""

@op(
    required_resource_keys={"bigquery"}, 
    config_schema=define_bigquery_load_config()
)
def import_file_to_bq(context, path: str) -> None:
    """Import local file to BigQuery table."""

@op(
    required_resource_keys={"bigquery"},
    config_schema=define_bigquery_load_config()
)
def import_gcs_paths_to_bq(context, paths: List[str]) -> None:
    """Import GCS files to BigQuery table."""

Query Operations

Operations for executing SQL queries against BigQuery.

def bq_op_for_queries(sql_queries: List[str]) -> OpDefinition:
    """
    Creates an op that executes BigQuery SQL queries.
    
    Parameters:
    - sql_queries: List of SQL queries to execute
    
    Returns:
    Op function that returns List[DataFrame]
    """

Dataset Management Operations

Operations for BigQuery dataset lifecycle management.

@op(
    required_resource_keys={"bigquery"},
    config_schema=define_bigquery_create_dataset_config()
)
def bq_create_dataset(context) -> None:
    """
    Create BigQuery dataset.
    
    Config:
    - dataset: str - Dataset identifier  
    - exists_ok: bool - Whether to ignore "already exists" errors
    """

@op(
    required_resource_keys={"bigquery"},
    config_schema=define_bigquery_delete_dataset_config()
)
def bq_delete_dataset(context) -> None:
    """
    Delete BigQuery dataset.
    
    Config:
    - dataset: str - Dataset identifier
    - delete_contents: bool - Whether to delete tables in dataset
    - not_found_ok: bool - Whether to ignore "not found" errors
    """

Utility Functions

Helper functions for BigQuery operations.

def fetch_last_updated_timestamps(
    client: bigquery.Client,
    dataset_id: str, 
    table_ids: Sequence[str]
) -> Mapping[str, datetime]:
    """
    Get last updated timestamps for BigQuery tables.
    
    Parameters:
    - client: BigQuery client
    - dataset_id: Dataset ID
    - table_ids: List of table IDs
    
    Returns:
    Mapping of table ID to timestamp
    """

Types and Configuration

BigQuery-specific types, enums, and configuration schemas.

class BigQueryError(Exception):
    """Exception class for BigQuery-related errors."""

class BigQueryLoadSource(Enum):
    """Enum for BigQuery load sources."""
    DataFrame = "DataFrame"
    GCS = "GCS" 
    File = "File"

class BQCreateDisposition(Enum):
    """Table creation behavior."""
    CREATE_IF_NEEDED = "CREATE_IF_NEEDED"
    CREATE_NEVER = "CREATE_NEVER"

class BQWriteDisposition(Enum):
    """Write behavior for existing tables."""
    WRITE_TRUNCATE = "WRITE_TRUNCATE"
    WRITE_APPEND = "WRITE_APPEND"
    WRITE_EMPTY = "WRITE_EMPTY"

class BQSchemaUpdateOption(Enum):
    """Schema update options."""
    ALLOW_FIELD_ADDITION = "ALLOW_FIELD_ADDITION"
    ALLOW_FIELD_RELAXATION = "ALLOW_FIELD_RELAXATION"

class BQPriority(Enum):
    """Query priority levels."""
    BATCH = "BATCH"
    INTERACTIVE = "INTERACTIVE"

class BQEncoding(Enum):
    """File encoding options."""
    UTF_8 = "UTF-8"
    ISO_8859_1 = "ISO-8859-1"

class BQSourceFormat(Enum):
    """Source file format options."""
    CSV = "CSV"
    NEWLINE_DELIMITED_JSON = "NEWLINE_DELIMITED_JSON"
    AVRO = "AVRO"
    PARQUET = "PARQUET"
    DATASTORE_BACKUP = "DATASTORE_BACKUP"

Configuration Scalars

Validation scalars for BigQuery identifiers.

def Table(table_name: str) -> str:
    """Validates BigQuery table identifiers."""

def Dataset(dataset_name: str) -> str:
    """Validates BigQuery dataset identifiers."""

Configuration Functions

Functions that define configuration schemas for BigQuery operations.

def define_bigquery_query_config() -> ConfigSchema:
    """Configuration for query operations."""

def define_bigquery_load_config() -> ConfigSchema:
    """Configuration for load operations."""

def define_bigquery_create_dataset_config() -> ConfigSchema:
    """Configuration for dataset creation."""

def define_bigquery_delete_dataset_config() -> ConfigSchema:
    """Configuration for dataset deletion."""

Usage Examples

Basic Resource Usage

from dagster import asset, Definitions
from dagster_gcp import BigQueryResource

@asset
def customer_data(bigquery: BigQueryResource):
    with bigquery.get_client() as client:
        query = """
        SELECT customer_id, order_count, total_spent
        FROM `project.analytics.customer_summary` 
        WHERE last_order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
        """
        return client.query(query).to_dataframe()

defs = Definitions(
    assets=[customer_data],
    resources={
        "bigquery": BigQueryResource(
            project="my-gcp-project",
            location="US"
        )
    }
)

I/O Manager Usage

from dagster import asset, Definitions
from dagster_gcp import BigQueryIOManager, BigQueryResource

@asset
def processed_orders():
    # This will be stored in BigQuery
    return pd.DataFrame({
        'order_id': [1, 2, 3],
        'amount': [100.0, 250.0, 75.0],
        'processed_at': [datetime.now()] * 3
    })

defs = Definitions(
    assets=[processed_orders],
    resources={
        "io_manager": BigQueryIOManager(
            project="my-gcp-project",
            dataset="analytics"
        )
    }
)

Data Loading Operations

from dagster import job, op
from dagster_gcp import import_df_to_bq, BigQueryResource
import pandas as pd

@op
def create_sample_data():
    return pd.DataFrame({
        'id': [1, 2, 3],
        'value': ['a', 'b', 'c']
    })

@job(
    resource_defs={
        "bigquery": BigQueryResource(project="my-project")
    }
)
def load_data_job():
    df = create_sample_data()
    import_df_to_bq(df)

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-gcp

docs

bigquery.md

dataproc.md

gcs.md

index.md

pipes.md

tile.json