CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-dagster-gcp

Google Cloud Platform integration components for the Dagster data orchestration framework.

Pending
Overview
Eval results
Files

dataproc.mddocs/

Dataproc Integration

Apache Spark cluster management and job execution through Google Cloud Dataproc. Provides a resource for Dataproc cluster lifecycle management, ops for submitting and monitoring Spark jobs, and configuration schemas for cluster and job parameters.

Capabilities

Dataproc Resource

Configurable resource for Dataproc cluster management and client access.

class DataprocResource(ConfigurableResource):
    """Resource for Dataproc cluster management.

    Identifies a cluster by project, region and name. The cluster
    configuration may optionally be given as a YAML file path, a JSON file
    path, or an inline dict. Only ``project_id``, ``region`` and
    ``cluster_name`` are required; the optional fields default to ``None``
    so they can be omitted when constructing the resource.
    """
    project_id: str  # GCP project ID
    region: str  # GCP region
    cluster_name: str  # Cluster name
    labels: Optional[dict[str, str]] = None  # Cluster labels
    cluster_config_yaml_path: Optional[str] = None  # Path to YAML config
    cluster_config_json_path: Optional[str] = None  # Path to JSON config
    cluster_config_dict: Optional[dict] = None  # Inline cluster config

    def get_client(self) -> DataprocClient:
        """Create Dataproc client."""

@resource(
    config_schema=define_dataproc_create_cluster_config(),
    description="Manage a Dataproc cluster resource"
)
def dataproc_resource(context) -> DataprocClient:
    """Legacy Dataproc resource factory that returns a DataprocClient.

    Configured with the schema produced by
    ``define_dataproc_create_cluster_config()``.

    Parameters:
    - context: Resource initialization context (presumably carries the
      validated resource config — TODO confirm)

    Returns:
    DataprocClient
    """

Dataproc Client

Lower-level client for direct Dataproc API interactions.

class DataprocClient:
    """Lower-level client for Dataproc API interactions."""
    
    def create_cluster(self) -> None:
        """Create Dataproc cluster."""
    
    def delete_cluster(self) -> None:
        """Delete Dataproc cluster."""
    
    def submit_job(self, job_details: dict) -> str:
        """
        Submit job to cluster.
        
        Parameters:
        - job_details: Job configuration dictionary
        
        Returns:
        Job ID string
        """
    
    def get_job(self, job_id: str) -> dict:
        """
        Get job status and details.

        Parameters:
        - job_id: ID of the job to look up (presumably the value returned
          by submit_job — TODO confirm)

        Returns:
        Dictionary with the job's status and details
        """
    
    def wait_for_job(self, job_id: str, wait_timeout: int) -> dict:
        """
        Wait for job completion with timeout.

        Parameters:
        - job_id: ID of the job to wait for
        - wait_timeout: Maximum time to wait (presumably in seconds —
          TODO confirm)

        Returns:
        Dictionary describing the job
        """
    
    def cluster_context_manager(self):
        """
        Context manager for temporary clusters.

        NOTE(review): presumably creates the cluster on entry and deletes
        it on exit — confirm against the implementation.
        """

Operations

Operations for executing jobs on Dataproc clusters.

class DataprocOpConfig(Config):
    """Configuration class for Dataproc operations.

    ``project_id``, ``region`` and ``job_config`` have no defaults and must
    be supplied; the timeout and cluster-scoping fields are optional.
    """
    job_timeout_in_seconds: int = 1200  # Job timeout in seconds (default 1200, i.e. 20 minutes)
    job_scoped_cluster: bool = True  # Whether to create temporary cluster (default True)
    project_id: str  # GCP project ID (required)
    region: str  # GCP region (required)
    job_config: dict[str, Any]  # Dataproc job configuration (required)

@op(
    required_resource_keys={"dataproc"},
    config_schema=DATAPROC_CONFIG_SCHEMA
)
def dataproc_op(context) -> Any:
    """Legacy op for executing Dataproc jobs.

    Requires a resource bound to the ``dataproc`` key and is configured
    through ``DATAPROC_CONFIG_SCHEMA``.

    Parameters:
    - context: Op execution context
    """

@op
def configurable_dataproc_op(
    dataproc: DataprocResource,
    config: DataprocOpConfig
) -> Any:
    """
    Modern configurable op for executing Dataproc jobs.

    This is itself an op (it is declared with ``@op``), not a decorator:
    invoke it inside a job rather than applying it to another function.
    
    Parameters:
    - dataproc: Dataproc resource
    - config: Operation configuration
    """

Configuration Functions

Functions for defining Dataproc configuration schemas.

def define_dataproc_create_cluster_config() -> ConfigSchema:
    """Configuration schema for cluster creation.

    Used as the ``config_schema`` of the legacy ``dataproc_resource``.
    """

def define_dataproc_submit_job_config() -> ConfigSchema:
    """Configuration schema for job submission.

    Returns:
    ConfigSchema describing the job submission configuration
    """

Types and Exceptions

Dataproc-specific types and error handling.

class DataprocError(Exception):
    """Error type for failures related to Dataproc."""

Usage Examples

Basic Spark Job Execution

from dagster import Definitions, RunConfig, job
from dagster_gcp import DataprocOpConfig, DataprocResource, configurable_dataproc_op

# configurable_dataproc_op is already an op definition, so it cannot be
# applied as a decorator. Alias it to give this invocation its own op name.
run_spark_analysis = configurable_dataproc_op.alias("spark_analysis")


# Op config is attached to the job as run config, keyed by the op name
# (Definitions has no `ops` argument).
# NOTE(review): confirm the key naming inside job_config against the
# Dataproc job submission request format.
@job(
    config=RunConfig(
        ops={
            "spark_analysis": DataprocOpConfig(
                project_id="my-gcp-project",
                region="us-central1",
                job_config={
                    "pyspark_job": {
                        "main_python_file_uri": "gs://my-bucket/scripts/analysis.py",
                        "args": ["--input", "gs://my-bucket/data/", "--output", "gs://my-bucket/results/"]
                    }
                }
            )
        }
    )
)
def spark_analysis_job():
    """Execute Spark job for data analysis."""
    run_spark_analysis()


defs = Definitions(
    jobs=[spark_analysis_job],
    resources={
        "dataproc": DataprocResource(
            project_id="my-gcp-project",
            region="us-central1",
            cluster_name="analysis-cluster"
        )
    }
)

Cluster with Custom Configuration

from dagster import Definitions
from dagster_gcp import DataprocResource

# Define cluster configuration
def _instance_group(num_instances):
    """Build one instance-group config; master and workers differ only in count."""
    return {
        "num_instances": num_instances,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {
            "boot_disk_type": "pd-standard",
            "boot_disk_size_gb": 100,
        },
    }


# Spark properties applied through the cluster's software configuration
_spark_properties = {
    "spark:spark.sql.adaptive.enabled": "true",
    "spark:spark.sql.adaptive.coalescePartitions.enabled": "true",
}

# One master, two workers, all on the same machine type and disk layout
cluster_config = {
    "master_config": _instance_group(1),
    "worker_config": _instance_group(2),
    "software_config": {
        "image_version": "2.0-debian10",
        "properties": _spark_properties,
    },
}

# Resource for the custom cluster, using the inline cluster_config dict
custom_cluster_resource = DataprocResource(
    project_id="my-gcp-project",
    region="us-central1",
    cluster_name="custom-cluster",
    cluster_config_dict=cluster_config,
    labels={"environment": "production", "team": "data-eng"},
)

# Register the resource under the "dataproc" key
defs = Definitions(resources={"dataproc": custom_cluster_resource})

PySpark Job with Dependencies

from dagster import Config, Definitions, RunConfig, job
from dagster_gcp import DataprocOpConfig, DataprocResource, configurable_dataproc_op


class SparkJobConfig(Config):
    """Job-specific settings that are turned into PySpark arguments."""
    input_path: str
    output_path: str
    num_partitions: int = 10


# Only the parameter named `config` is treated as op config, so the
# job-specific settings are folded into the PySpark arguments rather than
# passed to the op as a second config object.
spark_job = SparkJobConfig(
    input_path="gs://my-bucket/raw-data/",
    output_path="gs://my-bucket/processed-data/",
    num_partitions=20
)

# configurable_dataproc_op is already an op definition, so it cannot be
# applied as a decorator. Alias it to give this invocation its own op name.
process_large_dataset = configurable_dataproc_op.alias("large_dataset_processor")


# Op config is attached to the job as run config, keyed by the op name
# (Definitions has no `ops` argument).
@job(
    config=RunConfig(
        ops={
            "large_dataset_processor": DataprocOpConfig(
                project_id="my-gcp-project",
                region="us-central1",
                job_config={
                    "pyspark_job": {
                        "main_python_file_uri": "gs://my-bucket/scripts/etl.py",
                        "python_file_uris": [
                            "gs://my-bucket/scripts/utils.py",
                            "gs://my-bucket/scripts/transforms.py"
                        ],
                        "jar_file_uris": [
                            "gs://my-bucket/jars/spark-bigquery-connector.jar"
                        ],
                        "args": [
                            "--input", spark_job.input_path,
                            "--output", spark_job.output_path,
                            "--partitions", str(spark_job.num_partitions)
                        ]
                    }
                },
                job_timeout_in_seconds=3600
            )
        }
    )
)
def etl_pipeline():
    """Process large dataset with PySpark."""
    process_large_dataset()


defs = Definitions(
    jobs=[etl_pipeline],
    resources={
        "dataproc": DataprocResource(
            project_id="my-gcp-project",
            region="us-central1",
            cluster_name="etl-cluster"
        )
    }
)

Temporary Cluster for Job

from dagster import RunConfig, job
from dagster_gcp import DataprocOpConfig, configurable_dataproc_op

# Configuration with temporary cluster
batch_config = DataprocOpConfig(
    project_id="my-gcp-project",
    region="us-central1",
    job_scoped_cluster=True,  # Creates temporary cluster
    job_timeout_in_seconds=7200,  # 2 hours
    job_config={
        "pyspark_job": {
            "main_python_file_uri": "gs://my-bucket/batch/nightly_process.py"
        }
    }
)

# configurable_dataproc_op is already an op definition, so it cannot be
# applied as a decorator. Alias it to give this invocation its own op name.
batch_processing_job = configurable_dataproc_op.alias("batch_processing_job")


# Attach batch_config to the job so it is actually used. The job still needs
# a DataprocResource bound to the "dataproc" resource key when it is run.
@job(config=RunConfig(ops={"batch_processing_job": batch_config}))
def nightly_batch_job():
    """Run batch processing on temporary cluster."""
    batch_processing_job()

Direct Client Usage

from dagster import op, In
from dagster_gcp import DataprocResource

@op
def submit_custom_job(dataproc: DataprocResource, job_details: dict):
    """Submit a job through the Dataproc client and summarize the result.

    Returns a dict holding the job ID plus the "status" and
    "driver_output_resource_uri" entries read from the wait_for_job result.
    """
    dataproc_client = dataproc.get_client()

    # Submit, then block until the job finishes (wait_timeout=1800)
    submitted_id = dataproc_client.submit_job(job_details)
    finished = dataproc_client.wait_for_job(submitted_id, wait_timeout=1800)

    # NOTE(review): the key names read below assume the shape of the job
    # dict returned by wait_for_job — confirm against the client.
    summary = {"job_id": submitted_id}
    summary["status"] = finished.get("status")
    summary["output_uri"] = finished.get("driver_output_resource_uri")
    return summary

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-gcp

docs

bigquery.md

dataproc.md

gcs.md

index.md

pipes.md

tile.json