Google Cloud Platform integration components for the Dagster data orchestration framework.
Install with the Tessl CLI: npx @tessl/cli install tessl/pypi-dagster-gcp@0.27.0

This package provides resources, I/O managers, operations, and utilities for building data pipelines that leverage GCP services, including BigQuery, Google Cloud Storage (GCS), and Dataproc. It also provides Dagster Pipes integrations for communicating with external processes.
Install with pip:

pip install dagster-gcp

Import the package:

import dagster_gcp

Common specific imports:
from dagster_gcp import (
BigQueryResource, BigQueryIOManager,
GCSResource, GCSPickleIOManager,
DataprocResource, configurable_dataproc_op
)

Pipes components (imported from the dagster_gcp.pipes submodule):
from dagster_gcp.pipes import (
PipesDataprocJobClient,
PipesGCSContextInjector,
PipesGCSMessageReader,
PipesGCSLogReader
)

Basic usage:

from dagster import asset, Definitions
from dagster_gcp import BigQueryResource, GCSResource, GCSPickleIOManager
@asset
def my_data_asset(bigquery: BigQueryResource):
    """Return the first 100 rows of `project.dataset.table` as a DataFrame.

    The query runs on a client obtained from the ``bigquery`` resource; the
    client's context manager stays open until the DataFrame is produced.
    """
    sql = "SELECT * FROM `project.dataset.table` LIMIT 100"
    with bigquery.get_client() as client:
        return client.query(sql).to_dataframe()
# Single project ID shared by every GCP resource in this example.
GCP_PROJECT = "my-gcp-project"

# Build the GCS resource once. The same instance is registered as the
# top-level "gcs" resource and as the I/O manager's `gcs` dependency,
# instead of constructing two identical GCSResource objects.
gcs_resource = GCSResource(project=GCP_PROJECT)

defs = Definitions(
    assets=[my_data_asset],
    resources={
        "bigquery": BigQueryResource(project=GCP_PROJECT),
        "gcs": gcs_resource,
        "io_manager": GCSPickleIOManager(
            gcs_bucket="my-data-bucket",
            gcs=gcs_resource,
        ),
    },
)

# The dagster-gcp package follows Dagster's resource and I/O manager patterns
# while providing GCP-specific implementations for BigQuery, Google Cloud
# Storage, Dataproc, and Pipes (each covered below).
Each integration supports both modern ConfigurableResource patterns and legacy resource factory functions for backward compatibility.
BigQuery: data warehousing support, including I/O managers for BigQuery tables, ops for loading and querying data, and resources for managing BigQuery clients with authentication support.
from contextlib import contextmanager


class BigQueryResource(ConfigurableResource):
    """Resource that provides BigQuery clients.

    Every configuration field is optional and defaults to ``None``, so
    ``BigQueryResource(project="my-gcp-project")`` is a complete configuration.
    """

    # GCP project ID, e.g. "my-gcp-project".
    project: Optional[str] = None
    # BigQuery location; left unset when None.
    location: Optional[str] = None
    # Credentials for the client. The expected encoding is not shown here;
    # presumably base64-encoded service-account JSON -- confirm.
    gcp_credentials: Optional[str] = None

    # Decorated as a context manager because callers use it as
    # `with resource.get_client() as client:`; the Iterator return type alone
    # does not make the method usable in a `with` statement.
    @contextmanager
    def get_client(self) -> Iterator[bigquery.Client]:
        """Yield a ``bigquery.Client`` for the duration of the ``with`` block."""
        ...
class BigQueryIOManager(ConfigurableIOManagerFactory):
    """I/O manager factory for storing and loading asset data as BigQuery tables.

    Only ``project`` is required. Every ``Optional`` field defaults to ``None``
    and may be omitted.
    """

    # GCP project ID (required).
    project: str
    # Dataset to use; left unset when None.
    dataset: Optional[str] = None
    # BigQuery location; left unset when None.
    location: Optional[str] = None
    # Credentials for the client; encoding not shown here -- confirm.
    gcp_credentials: Optional[str] = None
    # GCS bucket for temporary data; presumably used to stage loads -- confirm.
    temporary_gcs_bucket: Optional[str] = None
    # Timeout for BigQuery operations; units presumably seconds -- confirm.
    timeout: Optional[float] = None


# --- Google Cloud Storage (GCS) ---
# File storage and management, including I/O managers for pickled objects,
# file managers for direct GCS operations, compute log management, and sensor
# utilities for GCS-based data processing.
class GCSResource(ConfigurableResource):
    """Resource that provides a Google Cloud Storage ``storage.Client``."""

    # GCP project ID; optional, defaults to None so it can be omitted.
    project: Optional[str] = None

    def get_client(self) -> storage.Client:
        """Return a ``storage.Client``."""
        ...
class GCSPickleIOManager(ConfigurableIOManager):
    """I/O manager that stores outputs as pickled objects in a GCS bucket."""

    # GCS resource used to reach the bucket (nested resource dependency).
    gcs: ResourceDependency[GCSResource]
    # Bucket that objects are written to and read from (required).
    gcs_bucket: str
    # Key prefix under which objects are stored within the bucket.
    gcs_prefix: str = "dagster"

    def load_input(self, context) -> Any:
        """Load and return the stored object for the input described by ``context``."""
        ...

    def handle_output(self, context, obj) -> None:
        """Store ``obj`` for the output described by ``context``."""
        ...


# --- Dataproc ---
# Apache Spark cluster management and job execution, including resources for
# Dataproc cluster lifecycle management, operations for submitting and
# monitoring Spark jobs, and comprehensive configuration support for cluster
# and job parameters.
class DataprocResource(ConfigurableResource):
    """Resource describing a Dataproc cluster and providing a ``DataprocClient``.

    ``project_id``, ``region`` and ``cluster_name`` are required; the other
    fields are optional and default to ``None``.
    """

    project_id: str
    region: str
    cluster_name: str
    # Key/value labels; presumably applied to the cluster -- confirm.
    labels: Optional[dict[str, str]] = None
    # Alternative sources for the cluster configuration (YAML file, JSON file,
    # or an in-memory dict). NOTE(review): presumably at most one should be
    # set -- confirm precedence.
    cluster_config_yaml_path: Optional[str] = None
    cluster_config_json_path: Optional[str] = None
    cluster_config_dict: Optional[dict] = None

    def get_client(self) -> DataprocClient:
        """Return a ``DataprocClient`` for this cluster configuration."""
        ...
@op
def configurable_dataproc_op(
    dataproc: DataprocResource,
    config: DataprocOpConfig,
) -> Any:
    """Op that runs a Dataproc job using the ``dataproc`` resource and ``config``.

    NOTE(review): the submission logic is not shown in this listing. Per the
    Dataproc section summary, it submits and monitors Spark jobs -- confirm
    the exact signature against the installed version.
    """
    ...


# --- Pipes ---
# External process communication through GCP services, including clients for
# running workloads on Dataproc, context injectors for passing data via GCS,
# and message readers for collecting results and logs from external processes.
class PipesDataprocJobClient(PipesClient):
    """Pipes client that runs a workload as a Dataproc job."""

    # Dataproc job controller client used to submit the job.
    client: JobControllerClient
    # Passes the Pipes context to the external job (e.g. via GCS).
    context_injector: PipesContextInjector
    # Collects messages emitted by the external job.
    message_reader: PipesMessageReader
    # Presumably cancels the Dataproc job when the Dagster run is terminated -- confirm.
    forward_termination: bool = True
    # Interval between job-status polls; units presumably seconds -- confirm.
    poll_interval: float = 5.0

    def run(self, context, submit_job_params, extras) -> PipesClientCompletedInvocation:
        """Run the Dataproc job described by ``submit_job_params``.

        Returns a ``PipesClientCompletedInvocation``.

        NOTE(review): other Dagster Pipes clients take these arguments
        keyword-only, with ``extras`` optional -- confirm against the
        installed version.
        """
        ...
class PipesGCSContextInjector(PipesContextInjector):
    """Context injector that passes the Pipes context to external processes via GCS."""

    # Bucket that holds the injected context.
    bucket: str
    # GCS client; presumably used to write the context object -- confirm.
    client: GCSClient
    # Optional key prefix for objects written to ``bucket``.
    key_prefix: Optional[str]


# File handle for GCS objects
class GCSFileHandle(FileHandle):
    """File handle pointing at an object stored in Google Cloud Storage."""

    @property
    def gcs_bucket(self) -> str:
        """Name of the bucket that holds the object."""
        ...

    @property
    def gcs_key(self) -> str:
        """Key of the object within ``gcs_bucket``."""
        ...

    @property
    def gcs_path(self) -> str:
        """Full path of the object; presumably ``gs://<bucket>/<key>`` -- confirm."""
        ...

    @property
    def path_desc(self) -> str:
        """Path description required by ``FileHandle``; presumably equals ``gcs_path`` -- confirm."""
        ...
# BigQuery error handling
class BigQueryError(Exception):
    """Exception type for BigQuery-related errors in dagster-gcp."""
# Dataproc error handling
class DataprocError(Exception):
    """Exception type for Dataproc-related errors in dagster-gcp."""