Google Cloud Platform integration components for the Dagster data orchestration framework.
Apache Spark cluster management and job execution through Google Cloud Dataproc. Provides resources for Dataproc cluster lifecycle management, ops for submitting and monitoring Spark jobs, and configuration support for cluster and job parameters.
Configurable resource for Dataproc cluster management and client access.
class DataprocResource(ConfigurableResource):
    """Resource for Dataproc cluster management."""

    project_id: str                            # GCP project ID
    region: str                                # GCP region
    cluster_name: str                          # Cluster name
    labels: Optional[dict[str, str]]           # Cluster labels
    cluster_config_yaml_path: Optional[str]    # Path to YAML config
    cluster_config_json_path: Optional[str]    # Path to JSON config
    cluster_config_dict: Optional[dict]        # Inline cluster config

    def get_client(self) -> DataprocClient:
        """Create Dataproc client."""
@resource(
    config_schema=define_dataproc_create_cluster_config(),
    description="Manage a Dataproc cluster resource",
)
def dataproc_resource(context) -> DataprocClient:
    """Legacy Dataproc resource factory that returns a DataprocClient."""
Lower-level client for direct Dataproc API interactions.

class DataprocClient:
    """Lower-level client for Dataproc API interactions."""

    def create_cluster(self) -> None:
        """Create Dataproc cluster."""

    def delete_cluster(self) -> None:
        """Delete Dataproc cluster."""

    def submit_job(self, job_details: dict) -> str:
        """
        Submit job to cluster.

        Parameters:
        - job_details: Job configuration dictionary

        Returns:
        Job ID string
        """

    def get_job(self, job_id: str) -> dict:
        """Get job status and details."""

    def wait_for_job(self, job_id: str, wait_timeout: int) -> dict:
        """Wait for job completion with timeout."""

    def cluster_context_manager(self):
        """Context manager for temporary clusters."""
Operations for executing jobs on Dataproc clusters.

class DataprocOpConfig(Config):
    """Configuration class for Dataproc operations."""

    job_timeout_in_seconds: int = 1200    # Job timeout
    job_scoped_cluster: bool = True       # Whether to create a temporary cluster
    project_id: str                       # GCP project ID
    region: str                           # GCP region
    job_config: dict[str, Any]            # Dataproc job configuration
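
When job_scoped_cluster is left at its default of True, the op provisions a temporary cluster for the run; setting it to False targets an existing cluster instead. A minimal sketch of the latter, assuming the cluster named in the DataprocResource already exists (the script path below is illustrative):

from dagster_gcp import DataprocOpConfig

existing_cluster_config = DataprocOpConfig(
    project_id="my-gcp-project",
    region="us-central1",
    job_scoped_cluster=False,  # reuse the cluster managed by the DataprocResource
    job_config={
        "pyspark_job": {"main_python_file_uri": "gs://my-bucket/scripts/analysis.py"}
    },
)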
@op(
    required_resource_keys={"dataproc"},
    config_schema=DATAPROC_CONFIG_SCHEMA,
)
def dataproc_op(context) -> Any:
    """Legacy op for executing Dataproc jobs."""
@op
def configurable_dataproc_op(
    dataproc: DataprocResource,
    config: DataprocOpConfig,
) -> Any:
    """
    Modern configurable op for executing Dataproc jobs.

    Parameters:
    - dataproc: Dataproc resource
    - config: Operation configuration
    """
Functions for defining Dataproc configuration schemas.

def define_dataproc_create_cluster_config() -> ConfigSchema:
    """Configuration schema for cluster creation."""

def define_dataproc_submit_job_config() -> ConfigSchema:
    """Configuration schema for job submission."""
Dataproc-specific types and error handling.

class DataprocError(Exception):
    """Exception class for Dataproc-related errors."""
Basic usage: running a PySpark job with configurable_dataproc_op.

from dagster import Definitions, RunConfig, job
from dagster_gcp import DataprocOpConfig, DataprocResource, configurable_dataproc_op

@job
def spark_analysis_job():
    # configurable_dataproc_op is used directly inside a job; the alias gives
    # this invocation a descriptive name to target in run config.
    configurable_dataproc_op.alias("run_spark_analysis")()

defs = Definitions(
    jobs=[spark_analysis_job],
    resources={
        "dataproc": DataprocResource(
            project_id="my-gcp-project",
            region="us-central1",
            cluster_name="analysis-cluster",
        )
    },
)

# Job parameters are supplied as run config when the job is launched, e.g.
# spark_analysis_job.execute_in_process(run_config=run_config).
run_config = RunConfig(
    ops={
        "run_spark_analysis": DataprocOpConfig(
            project_id="my-gcp-project",
            region="us-central1",
            job_config={
                "pyspark_job": {
                    "main_python_file_uri": "gs://my-bucket/scripts/analysis.py",
                    "args": ["--input", "gs://my-bucket/data/", "--output", "gs://my-bucket/results/"],
                }
            },
        )
    }
)
Custom cluster configuration via cluster_config_dict.

from dagster import Definitions
from dagster_gcp import DataprocResource
# Define cluster configuration
cluster_config = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {
            "boot_disk_type": "pd-standard",
            "boot_disk_size_gb": 100,
        },
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {
            "boot_disk_type": "pd-standard",
            "boot_disk_size_gb": 100,
        },
    },
    "software_config": {
        "image_version": "2.0-debian10",
        "properties": {
            "spark:spark.sql.adaptive.enabled": "true",
            "spark:spark.sql.adaptive.coalescePartitions.enabled": "true",
        },
    },
}

defs = Definitions(
    resources={
        "dataproc": DataprocResource(
            project_id="my-gcp-project",
            region="us-central1",
            cluster_name="custom-cluster",
            cluster_config_dict=cluster_config,
            labels={"environment": "production", "team": "data-eng"},
        )
    },
)
A larger ETL example with additional Python files, jars, and arguments.

from dagster import Config, Definitions, RunConfig, job
from dagster_gcp import DataprocOpConfig, DataprocResource, configurable_dataproc_op

class SparkJobConfig(Config):
    """Paths and tuning parameters passed to the PySpark script."""
    input_path: str
    output_path: str
    num_partitions: int = 10

@job
def etl_pipeline():
    configurable_dataproc_op.alias("process_large_dataset")()

defs = Definitions(
    jobs=[etl_pipeline],
    resources={
        "dataproc": DataprocResource(
            project_id="my-gcp-project",
            region="us-central1",
            cluster_name="etl-cluster",
        )
    },
)

# Build the Dataproc job arguments from the script-level configuration.
spark_config = SparkJobConfig(
    input_path="gs://my-bucket/raw-data/",
    output_path="gs://my-bucket/processed-data/",
    num_partitions=20,
)

run_config = RunConfig(
    ops={
        "process_large_dataset": DataprocOpConfig(
            project_id="my-gcp-project",
            region="us-central1",
            job_timeout_in_seconds=3600,
            job_config={
                "pyspark_job": {
                    "main_python_file_uri": "gs://my-bucket/scripts/etl.py",
                    "python_file_uris": [
                        "gs://my-bucket/scripts/utils.py",
                        "gs://my-bucket/scripts/transforms.py",
                    ],
                    "jar_file_uris": [
                        "gs://my-bucket/jars/spark-bigquery-connector.jar",
                    ],
                    "args": [
                        "--input", spark_config.input_path,
                        "--output", spark_config.output_path,
                        "--partitions", str(spark_config.num_partitions),
                    ],
                }
            },
        )
    }
)
Running a batch job on a temporary, job-scoped cluster.

from dagster import job
from dagster_gcp import DataprocOpConfig, configurable_dataproc_op

@job
def nightly_batch_job():
    configurable_dataproc_op.alias("batch_processing")()

# Configuration with temporary cluster
batch_config = DataprocOpConfig(
    project_id="my-gcp-project",
    region="us-central1",
    job_scoped_cluster=True,        # creates a temporary cluster for this run
    job_timeout_in_seconds=7200,    # 2 hours
    job_config={
        "pyspark_job": {
            "main_python_file_uri": "gs://my-bucket/batch/nightly_process.py"
        }
    },
)

# Supplied at launch, e.g.:
# nightly_batch_job.execute_in_process(
#     run_config=RunConfig(ops={"batch_processing": batch_config})
# )
Submitting a custom job directly through the DataprocClient.

from dagster import op
from dagster_gcp import DataprocResource

@op
def submit_custom_job(dataproc: DataprocResource, job_details: dict):
    client = dataproc.get_client()
    # Submit job and get job ID
    job_id = client.submit_job(job_details)
    # Wait for completion
    result = client.wait_for_job(job_id, wait_timeout=1800)
    return {
        "job_id": job_id,
        "status": result.get("status"),
        "output_uri": result.get("driver_output_resource_uri"),
    }

Install with Tessl CLI
npx tessl i tessl/pypi-dagster-gcp