CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-dagster-gcp

Google Cloud Platform integration components for the Dagster data orchestration framework.

Pending
Overview
Eval results
Files

pipes.mddocs/

Pipes Integration

External process communication through GCP services enabling Dagster to orchestrate and monitor workloads running outside the Dagster process. Includes clients for running workloads on Dataproc, context injectors for passing data via GCS, and message readers for collecting results and logs from external processes.

Capabilities

Dataproc Job Client

Pipes client for executing workloads on Google Cloud Dataproc in Job mode with full pipes protocol support.

class PipesDataprocJobClient(PipesClient):
    """Pipes client for running workloads on Dataproc in Job mode."""
    
    def __init__(
        self,
        message_reader: PipesMessageReader,  # required: collects messages/results emitted by the external process
        client: Optional[JobControllerClient] = None,  # Dataproc job controller; presumably a default client is constructed when omitted — confirm
        context_injector: Optional[PipesContextInjector] = None,  # how Dagster context/params reach the job (e.g. PipesGCSContextInjector)
        forward_termination: bool = True,  # presumably cancels the Dataproc job when the Dagster run terminates — confirm against implementation
        poll_interval: float = 5.0  # seconds between job-status polls
    ): ...
    
    def run(
        self,
        context,  # Dagster execution context (asset/op)
        submit_job_params: SubmitJobParams,
        extras: Optional[dict] = None
    ) -> PipesClientCompletedInvocation:
        """
        Execute Dataproc job with pipes protocol.

        Blocks until the job finishes, polling every ``poll_interval`` seconds.
        
        Parameters:
        - context: Dagster execution context
        - submit_job_params: Job submission parameters (see SubmitJobParams below)
        - extras: Additional parameters made available to the external process
        
        Returns:
        Completed invocation with results and metadata
        """

class SubmitJobParams(TypedDict):
    """Type definition for Dataproc job submission parameters.""" 
    project_id: str
    region: str
    job: dict  # Dataproc job configuration
    job_id: Optional[str]
    request_id: Optional[str]

Context Injectors

Context injectors enable passing execution context and parameters to external processes via GCP services.

class PipesGCSContextInjector(PipesContextInjector):
    """Injects pipes context via temporary GCS objects."""
    bucket: str  # GCS bucket name
    client: GCSClient  # GCS client (google.cloud.storage.Client)
    key_prefix: Optional[str]  # Optional key prefix for the uploaded context object
    
    def inject_context(self, context) -> Iterator[PipesParams]:
        """
        Context manager for context injection.

        Uploads the serialized context to a GCS object under ``bucket``
        ("temporary" per the class docstring — presumably deleted on exit;
        confirm against the implementation).
        
        Yields:
        PipesParams containing context information for external process
        """
    
    def no_messages_debug_text(self) -> str:
        """Debug text for troubleshooting when no messages received."""

Message Readers

Message readers collect results, logs, and metadata from external processes via GCS.

class PipesGCSMessageReader(PipesBlobStoreMessageReader):
    """Reads pipes messages from GCS bucket."""
    interval: float = 10  # Polling interval in seconds
    bucket: str  # GCS bucket name
    client: GCSClient  # GCS client (google.cloud.storage.Client)
    log_readers: Optional[Sequence[PipesLogReader]]  # Associated log readers, e.g. PipesGCSLogReader
    include_stdio_in_messages: bool = False  # Whether to include stdout/stderr in the pipes message stream
    
    # The four methods below implement the PipesBlobStoreMessageReader
    # polling protocol: get params -> poll readability -> fetch chunks.
    def get_params(self) -> dict:
        """Get parameters for message reading (passed to the external process)."""
    
    def messages_are_readable(self, params: dict) -> bool:
        """Check if messages are available for reading."""
    
    def download_messages_chunk(self, index: int, params: dict) -> dict:
        """Download message chunk ``index`` from GCS."""
    
    def no_messages_debug_text(self) -> str:
        """Debug text for troubleshooting when no messages received."""

class PipesGCSLogReader(PipesChunkedLogReader):
    """Reads log files from GCS with chunked streaming."""
    bucket: str  # GCS bucket name
    key: str  # Object key for logs
    client: GCSClient  # GCS client (google.cloud.storage.Client)
    interval: float = 10  # Polling interval in seconds
    target_stream: IO[str]  # Target stream for output (e.g. sys.stdout)
    decode_fn: Callable[[bytes], str]  # Decoding function for logs (see utility functions below)
    debug_info: Optional[str]  # Debug information
    
    def target_is_readable(self, params: dict) -> bool:
        """Check if the target GCS object is readable yet."""
    
    def download_log_chunk(self, params: dict) -> bytes:
        """Download the next log chunk from GCS (decoded via ``decode_fn``)."""

Utility Functions

Helper functions for log processing and decoding.

def default_log_decode_fn(contents: bytes) -> str:
    """Default UTF-8 decoding for log contents.

    Parameters:
    - contents: raw bytes of a log chunk

    Returns:
    The chunk decoded as UTF-8 text.
    """
    return contents.decode("utf-8")

def gzip_log_decode_fn(contents: bytes) -> str:
    """Gzip decompression and UTF-8 decoding for compressed logs.

    Parameters:
    - contents: gzip-compressed bytes of a log chunk

    Returns:
    The decompressed chunk decoded as UTF-8 text.
    """
    import gzip  # local import keeps the example snippet self-contained

    return gzip.decompress(contents).decode("utf-8")

Usage Examples

Basic Dataproc Pipes Job

from dagster import asset, Definitions
from dagster_gcp.pipes import (
    PipesDataprocJobClient,
    PipesGCSMessageReader,
    PipesGCSContextInjector
)
from google.cloud.dataproc_v1 import JobControllerClient
from google.cloud import storage

@asset
def spark_data_processing(
    context,  # Dagster injects the execution context for the first parameter named "context"
    pipes_dataproc_client: PipesDataprocJobClient
) -> dict:
    """Execute Spark job via Pipes and return results.

    Fix: the original example passed ``context=context`` but never declared
    ``context`` — it must be an asset parameter, not a free name.
    """
    
    submit_job_params = {
        "project_id": "my-gcp-project",
        "region": "us-central1",
        "job": {
            "placement": {"cluster_name": "my-cluster"},
            "pyspark_job": {
                "main_python_file_uri": "gs://my-bucket/scripts/pipes_job.py",
                "args": ["--input", "gs://my-bucket/data/input.csv"]
            }
        }
    }
    
    # run() blocks until the Dataproc job completes, then exposes results.
    return pipes_dataproc_client.run(
        context=context,
        submit_job_params=submit_job_params
    ).get_results()

# Wire the pipes client as a resource so assets can request it by name.
defs = Definitions(
    assets=[spark_data_processing],
    resources={
        "pipes_dataproc_client": PipesDataprocJobClient(
            client=JobControllerClient(),
            # Context is injected and messages are read via the same bucket;
            # any bucket both Dagster and the job can access works.
            context_injector=PipesGCSContextInjector(
                bucket="my-pipes-bucket",
                client=storage.Client()
            ),
            message_reader=PipesGCSMessageReader(
                bucket="my-pipes-bucket", 
                client=storage.Client()
            )
        )
    }
)

Advanced Pipes Configuration with Logging

# Fix: Definitions is used below but was missing from the import list.
from dagster import Definitions, asset, get_dagster_logger
from dagster_gcp.pipes import (
    PipesDataprocJobClient,
    PipesGCSMessageReader, 
    PipesGCSContextInjector,
    PipesGCSLogReader
)
from google.cloud.dataproc_v1 import JobControllerClient
from google.cloud import storage
import sys

@asset
def ml_training_pipeline(
    context,  # Dagster injects the execution context for the first parameter named "context"
    pipes_dataproc_client: PipesDataprocJobClient
) -> dict:
    """Execute ML training pipeline with comprehensive logging.

    Fix: the original example passed ``context=context`` but never declared
    ``context`` — it must be an asset parameter.
    """
    
    # Configure job with custom cluster
    submit_job_params = {
        "project_id": "my-ml-project",
        "region": "us-central1",
        "job": {
            "placement": {"cluster_name": "ml-training-cluster"},
            "pyspark_job": {
                "main_python_file_uri": "gs://my-ml-bucket/training/train_model.py",
                "python_file_uris": [
                    "gs://my-ml-bucket/training/data_utils.py",
                    "gs://my-ml-bucket/training/model_utils.py"
                ],
                "args": [
                    "--data-path", "gs://my-ml-bucket/datasets/",
                    "--model-output", "gs://my-ml-bucket/models/",
                    "--epochs", "100"
                ]
            }
        }
    }
    
    result = pipes_dataproc_client.run(
        context=context,
        submit_job_params=submit_job_params,
        extras={"model_version": "v2.1"}  # extras are surfaced to the external process
    )
    
    # NOTE(review): get_metadata() and .duration are not part of the
    # PipesClientCompletedInvocation interface documented above — confirm
    # they exist in the installed dagster version before relying on them.
    return {
        "model_metrics": result.get_results(),
        "training_logs": result.get_metadata(),
        "job_duration": result.duration
    }

# Configure with log readers
log_reader = PipesGCSLogReader(
    bucket="my-ml-bucket",
    key="logs/training.log",  # object key the external process writes its logs to
    client=storage.Client(),
    target_stream=sys.stdout,  # stream the remote logs into this process's stdout
    interval=5.0  # poll GCS for new log chunks every 5 seconds
)

defs = Definitions(
    assets=[ml_training_pipeline],
    resources={
        "pipes_dataproc_client": PipesDataprocJobClient(
            client=JobControllerClient(),
            context_injector=PipesGCSContextInjector(
                bucket="my-ml-bucket",
                key_prefix="pipes/context",  # namespace the temporary context objects
                client=storage.Client()
            ),
            message_reader=PipesGCSMessageReader(
                bucket="my-ml-bucket",
                client=storage.Client(),
                log_readers=[log_reader],  # attach the GCS log reader configured above
                include_stdio_in_messages=True,  # forward stdout/stderr through the message stream
                interval=5.0
            ),
            forward_termination=True,
            poll_interval=10.0  # check Dataproc job status every 10 seconds
        )
    }
)

Multiple External Processes

from dagster import asset, multi_asset, AssetOut
from dagster_gcp.pipes import PipesDataprocJobClient

@multi_asset(
    outs={
        "processed_data": AssetOut(),
        "model_predictions": AssetOut(), 
        "quality_metrics": AssetOut()
    }
)
def batch_ml_pipeline(
    context,  # Dagster injects the execution context for the first parameter named "context"
    pipes_dataproc_client: PipesDataprocJobClient
) -> tuple:
    """Execute multiple ML tasks sequentially via Pipes.

    Fixes: the original example used an undefined ``context`` name, and its
    docstring claimed the jobs run "in parallel" — each run() call blocks
    until its job completes, so the three jobs run one after another.
    """
    
    # Data processing job
    processing_params = {
        "project_id": "my-project",
        "region": "us-central1", 
        "job": {
            "placement": {"cluster_name": "processing-cluster"},
            "pyspark_job": {
                "main_python_file_uri": "gs://my-bucket/jobs/data_processing.py"
            }
        }
    }
    
    # Model inference job  
    inference_params = {
        "project_id": "my-project",
        "region": "us-central1",
        "job": {
            "placement": {"cluster_name": "inference-cluster"},
            "pyspark_job": {
                "main_python_file_uri": "gs://my-bucket/jobs/model_inference.py"
            }
        }
    }
    
    # Quality assessment job
    quality_params = {
        "project_id": "my-project", 
        "region": "us-central1",
        "job": {
            "placement": {"cluster_name": "quality-cluster"},
            "pyspark_job": {
                "main_python_file_uri": "gs://my-bucket/jobs/quality_assessment.py"
            }
        }
    }
    
    # Execute jobs one at a time and collect results
    processing_result = pipes_dataproc_client.run(context, processing_params)
    inference_result = pipes_dataproc_client.run(context, inference_params)
    quality_result = pipes_dataproc_client.run(context, quality_params)
    
    return (
        processing_result.get_results(),
        inference_result.get_results(),
        quality_result.get_results()
    )

Custom Context Injection

from dagster import asset, Config
from dagster_gcp.pipes import PipesDataprocJobClient, PipesGCSContextInjector

class DataProcessingConfig(Config):
    """Run configuration schema for the daily data-processing asset."""
    batch_date: str  # date of the batch to process (format set by the job script)
    processing_mode: str  # processing mode flag forwarded to the job via --mode
    output_format: str  # output format flag forwarded to the job via --format

@asset
def daily_data_processing(
    context,  # Dagster injects the execution context for the first parameter named "context"
    pipes_dataproc_client: PipesDataprocJobClient,
    config: DataProcessingConfig
) -> dict:
    """Process daily data with custom configuration.

    Fix: the original example passed ``context=context`` but never declared
    ``context`` — it must be an asset parameter.
    """
    
    # The context injector will automatically pass config to external process
    submit_job_params = {
        "project_id": "my-project",
        "region": "us-central1",
        "job": {
            "placement": {"cluster_name": "daily-processing"},
            "pyspark_job": {
                "main_python_file_uri": "gs://my-bucket/jobs/daily_processor.py",
                "args": [
                    "--batch-date", config.batch_date,
                    "--mode", config.processing_mode,
                    "--format", config.output_format
                ]
            }
        }
    }
    
    return pipes_dataproc_client.run(
        context=context,
        submit_job_params=submit_job_params
    ).get_results()

Error Handling and Debugging

from dagster import asset, get_dagster_logger
from dagster_gcp.pipes import PipesDataprocJobClient
from dagster import Failure

@asset
def robust_data_pipeline(
    context,  # Dagster injects the execution context for the first parameter named "context"
    pipes_dataproc_client: PipesDataprocJobClient
) -> dict:
    """Data pipeline with comprehensive error handling.

    Fix: the original example passed ``context=context`` but never declared
    ``context`` — it must be an asset parameter.
    """
    
    logger = get_dagster_logger()
    
    try:
        submit_job_params = {
            "project_id": "my-project",
            "region": "us-central1",
            "job": {
                "placement": {"cluster_name": "robust-cluster"},
                "pyspark_job": {
                    "main_python_file_uri": "gs://my-bucket/jobs/robust_pipeline.py"
                }
            }
        }
        
        result = pipes_dataproc_client.run(
            context=context,
            submit_job_params=submit_job_params
        )
        
        # NOTE(review): a ``success`` attribute is not part of the
        # PipesClientCompletedInvocation interface documented above —
        # confirm against the installed dagster version.
        if not result.success:
            logger.error(f"Pipeline failed: {result.get_metadata()}")
            raise Failure("Data pipeline execution failed")
        
        logger.info(f"Pipeline completed successfully: {result.get_results()}")
        return result.get_results()
        
    except Exception as e:
        logger.error(f"Unexpected error in pipeline: {str(e)}")
        
        # Get debug information from the context injector
        debug_text = pipes_dataproc_client.context_injector.no_messages_debug_text()
        logger.error(f"Debug info: {debug_text}")
        
        # Chain the original exception so the root cause stays visible.
        raise Failure(f"Pipeline failed with error: {str(e)}") from e

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-gcp

docs

bigquery.md

dataproc.md

gcs.md

index.md

pipes.md

tile.json