tessl/pypi-dagster-gcp

Google Cloud Platform integration components for the Dagster data orchestration framework.

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/dagster-gcp@0.27.x

To install, run

npx @tessl/cli install tessl/pypi-dagster-gcp@0.27.0

Dagster GCP

Google Cloud Platform integration components for the Dagster data orchestration framework. This package provides resources, I/O managers, operations, and utilities for building data pipelines on GCP services, including BigQuery, Google Cloud Storage, and Dataproc, along with Dagster Pipes support for communicating with external processes.

Package Information

  • Package Name: dagster-gcp
  • Language: Python
  • Installation: pip install dagster-gcp

Core Imports

import dagster_gcp

Commonly used imports:

from dagster_gcp import (
    BigQueryResource, BigQueryIOManager,
    GCSResource, GCSPickleIOManager,
    DataprocResource, configurable_dataproc_op
)

Pipes components (imported from submodule):

from dagster_gcp.pipes import (
    PipesDataprocJobClient,
    PipesGCSContextInjector,
    PipesGCSMessageReader,
    PipesGCSLogReader
)

Basic Usage

from dagster import asset, Definitions
from dagster_gcp import BigQueryResource, GCSResource, GCSPickleIOManager

@asset
def my_data_asset(bigquery: BigQueryResource):
    with bigquery.get_client() as client:
        query = "SELECT * FROM `project.dataset.table` LIMIT 100"
        df = client.query(query).to_dataframe()
        return df

defs = Definitions(
    assets=[my_data_asset],
    resources={
        "bigquery": BigQueryResource(project="my-gcp-project"),
        "gcs": GCSResource(project="my-gcp-project"),
        "io_manager": GCSPickleIOManager(
            gcs_bucket="my-data-bucket",
            gcs=GCSResource(project="my-gcp-project")
        )
    }
)

Architecture

The dagster-gcp package follows Dagster's resource and I/O manager patterns while providing GCP-specific implementations:

  • Resources: Configurable connections to GCP services (BigQuery, GCS, Dataproc)
  • I/O Managers: Data storage and retrieval using GCP storage services
  • Operations: Pre-built ops for common GCP tasks (data loading, job execution)
  • Pipes Integration: External process communication through GCP services

Each integration supports both modern ConfigurableResource patterns and legacy resource factory functions for backward compatibility.
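
For example, the BigQuery connection can be declared either way. A brief sketch (the project name is a placeholder; bigquery_resource is the legacy factory counterpart of BigQueryResource):

from dagster import Definitions
from dagster_gcp import BigQueryResource, bigquery_resource

# Modern pattern: a typed ConfigurableResource, configured via constructor arguments
modern_defs = Definitions(
    assets=[],
    resources={"bigquery": BigQueryResource(project="my-gcp-project")},
)

# Legacy pattern: a resource factory function, configured via a config dict
legacy_bigquery = bigquery_resource.configured({"project": "my-gcp-project"})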

Capabilities

BigQuery Integration

Data warehousing operations including I/O managers for BigQuery tables, operations for data loading and querying, and resources for BigQuery client management with authentication support.

class BigQueryResource(ConfigurableResource):
    project: Optional[str]
    location: Optional[str] 
    gcp_credentials: Optional[str]
    
    def get_client(self) -> Iterator[bigquery.Client]: ...

class BigQueryIOManager(ConfigurableIOManagerFactory):
    project: str
    dataset: Optional[str]
    location: Optional[str]
    gcp_credentials: Optional[str]
    temporary_gcs_bucket: Optional[str]
    timeout: Optional[float]
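
BigQueryIOManager is a factory base class; concrete I/O managers plug in type handlers for specific dataframe libraries. A minimal sketch, assuming the companion dagster-gcp-pandas package and its BigQueryPandasIOManager:

import pandas as pd
from dagster import Definitions, asset
from dagster_gcp_pandas import BigQueryPandasIOManager

@asset
def daily_users() -> pd.DataFrame:
    # The I/O manager writes this DataFrame to a BigQuery table named after the asset
    return pd.DataFrame({"user_id": [1, 2, 3], "visits": [4, 5, 6]})

defs = Definitions(
    assets=[daily_users],
    resources={
        "io_manager": BigQueryPandasIOManager(
            project="my-gcp-project",
            dataset="analytics",
        )
    },
)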

Google Cloud Storage (GCS)

File storage and management including I/O managers for pickled objects, file managers for direct GCS operations, compute log management, and sensor utilities for GCS-based data processing.

class GCSResource(ConfigurableResource):
    project: Optional[str]
    
    def get_client(self) -> storage.Client: ...

class GCSPickleIOManager(ConfigurableIOManager):
    gcs: ResourceDependency[GCSResource]
    gcs_bucket: str
    gcs_prefix: str = "dagster"
    
    def load_input(self, context) -> Any: ...
    def handle_output(self, context, obj) -> None: ...
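
With GCSPickleIOManager bound as the io_manager, values passed between assets are pickled to gs://<gcs_bucket>/<gcs_prefix>/... automatically. A minimal sketch (bucket and project names are placeholders):

from dagster import Definitions, asset
from dagster_gcp import GCSPickleIOManager, GCSResource

@asset
def raw_numbers() -> list:
    return [1, 2, 3]

@asset
def doubled(raw_numbers: list) -> list:
    # The input is loaded by unpickling raw_numbers' output from GCS
    return [n * 2 for n in raw_numbers]

defs = Definitions(
    assets=[raw_numbers, doubled],
    resources={
        "io_manager": GCSPickleIOManager(
            gcs=GCSResource(project="my-gcp-project"),
            gcs_bucket="my-data-bucket",
            gcs_prefix="dagster",  # the default, shown for clarity
        )
    },
)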

Dataproc Integration

Apache Spark cluster management and job execution including resources for Dataproc cluster lifecycle management, operations for submitting and monitoring Spark jobs, and comprehensive configuration support for cluster and job parameters.

class DataprocResource(ConfigurableResource):
    project_id: str
    region: str
    cluster_name: str
    labels: Optional[dict[str, str]]
    cluster_config_yaml_path: Optional[str]
    cluster_config_json_path: Optional[str]
    cluster_config_dict: Optional[dict]
    
    def get_client(self) -> DataprocClient: ...

@op
def configurable_dataproc_op(
    dataproc: DataprocResource,
    config: DataprocOpConfig
) -> Any: ...
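
A hedged sketch of wiring the resource and op into a job; the cluster_config_dict keys follow Dataproc's ClusterConfig schema, and the op's DataprocOpConfig (job parameters) is supplied via run config at launch time:

from dagster import job
from dagster_gcp import DataprocResource, configurable_dataproc_op

dataproc = DataprocResource(
    project_id="my-gcp-project",
    region="us-central1",
    cluster_name="transient-spark-cluster",
    # Keys mirror the Dataproc ClusterConfig API; values are placeholders
    cluster_config_dict={"master_config": {"num_instances": 1}},
)

@job(resource_defs={"dataproc": dataproc})
def spark_job():
    configurable_dataproc_op()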

Pipes Integration

External process communication through GCP services including clients for running workloads on Dataproc, context injectors for passing data via GCS, and message readers for collecting results and logs from external processes.

class PipesDataprocJobClient(PipesClient):
    client: JobControllerClient
    context_injector: PipesContextInjector
    message_reader: PipesMessageReader
    forward_termination: bool = True
    poll_interval: float = 5.0
    
    def run(self, context, submit_job_params, extras) -> PipesClientCompletedInvocation: ...

class PipesGCSContextInjector(PipesContextInjector):
    bucket: str
    client: GCSClient
    key_prefix: Optional[str]
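
A hedged sketch of launching a PySpark workload through Pipes. The shape of submit_job_params is assumed to mirror Dataproc's SubmitJobRequest, and gs://my-bucket/pipes_job.py stands in for a script instrumented with dagster-pipes:

from dagster import AssetExecutionContext, asset
from dagster_gcp.pipes import (
    PipesDataprocJobClient,
    PipesGCSContextInjector,
    PipesGCSMessageReader,
)
from google.cloud import storage
from google.cloud.dataproc_v1 import JobControllerClient

gcs_client = storage.Client()
pipes_dataproc = PipesDataprocJobClient(
    client=JobControllerClient(
        client_options={"api_endpoint": "us-central1-dataproc.googleapis.com:443"}
    ),
    context_injector=PipesGCSContextInjector(bucket="my-pipes-bucket", client=gcs_client),
    message_reader=PipesGCSMessageReader(bucket="my-pipes-bucket", client=gcs_client),
)

@asset
def spark_asset(context: AssetExecutionContext):
    return pipes_dataproc.run(
        context=context,
        submit_job_params={
            "project_id": "my-gcp-project",
            "region": "us-central1",
            "job": {"pyspark_job": {"main_python_file_uri": "gs://my-bucket/pipes_job.py"}},
        },
    ).get_materialize_result()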

Types

Core Types

# File handle for GCS objects
class GCSFileHandle(FileHandle):
    @property
    def gcs_bucket(self) -> str: ...
    
    @property
    def gcs_key(self) -> str: ...
    
    @property
    def gcs_path(self) -> str: ...
    
    @property
    def path_desc(self) -> str: ...

# BigQuery error handling
class BigQueryError(Exception): ...

# Dataproc error handling  
class DataprocError(Exception): ...
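
A small sketch of these types in use; the GCSFileHandle constructor arguments and the gs:// path format are assumptions based on the properties above, and run_bigquery_query is a hypothetical helper:

from dagster_gcp import BigQueryError, GCSFileHandle

handle = GCSFileHandle(gcs_bucket="my-bucket", gcs_key="reports/2024/output.pkl")
print(handle.gcs_path)  # expected: gs://my-bucket/reports/2024/output.pkl

try:
    run_bigquery_query()  # hypothetical helper that issues a BigQuery job
except BigQueryError as err:
    # BigQueryError and DataprocError wrap failures from the underlying GCP calls
    print(f"BigQuery operation failed: {err}")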