Google Cloud Platform integration components for the Dagster data orchestration framework.

External process communication through GCP services enables Dagster to orchestrate and monitor workloads running outside the Dagster process. This package includes a client for running workloads on Dataproc, context injectors for passing data via GCS, and message readers for collecting results and logs from external processes.

Pipes client for executing workloads on Google Cloud Dataproc in Job mode with full pipes protocol support.
class PipesDataprocJobClient(PipesClient):
    """Pipes client for running workloads on Dataproc in Job mode."""

    def __init__(
        self,
        message_reader: PipesMessageReader,  # required: collects pipes messages/logs from the external process
        client: Optional[JobControllerClient] = None,  # Dataproc job controller; presumably a default client is built when None — confirm
        context_injector: Optional[PipesContextInjector] = None,  # passes Dagster context/extras to the job
        forward_termination: bool = True,  # presumably cancels the Dataproc job when the Dagster run terminates — confirm
        poll_interval: float = 5.0  # seconds between Dataproc job-status polls
    ): ...

    def run(
        self,
        context,  # Dagster execution context (asset/op execution context)
        submit_job_params: SubmitJobParams,  # see SubmitJobParams TypedDict below
        extras: Optional[dict] = None  # extra values surfaced to the external process via pipes
    ) -> PipesClientCompletedInvocation:
        """
        Execute Dataproc job with pipes protocol.

        Blocks until the submitted job completes, streaming pipes messages
        through the configured message_reader.

        Parameters:
        - context: Dagster execution context
        - submit_job_params: Job submission parameters
        - extras: Additional parameters
        Returns:
        Completed invocation with results and metadata
        """
class SubmitJobParams(TypedDict):
    """Type definition for Dataproc job submission parameters.

    Mirrors the arguments of the Dataproc ``SubmitJobRequest``.
    """

    project_id: str  # GCP project that owns the Dataproc resources
    region: str  # Dataproc region, e.g. "us-central1"
    job: dict  # Dataproc job configuration (Job proto as a dict)
    job_id: Optional[str]  # explicit job id; presumably generated by Dataproc when None — confirm
    request_id: Optional[str]  # idempotency token for retried submissions


# Context injectors enable passing execution context and parameters to
# external processes via GCP services.
class PipesGCSContextInjector(PipesContextInjector):
    """Injects pipes context via temporary GCS objects."""

    bucket: str  # GCS bucket name
    client: GCSClient  # GCS client used to write the context object
    key_prefix: Optional[str]  # optional key prefix for the temporary object

    def inject_context(self, context) -> Iterator[PipesParams]:
        """
        Context manager for context injection.

        Writes the serialized context to a temporary GCS object and cleans
        it up on exit.

        Yields:
            PipesParams containing context information for external process
        """

    def no_messages_debug_text(self) -> str:
        """Debug text for troubleshooting when no messages received."""


# Message readers collect results, logs, and metadata from external
# processes via GCS.
class PipesGCSMessageReader(PipesBlobStoreMessageReader):
    """Reads pipes messages from GCS bucket."""

    interval: float = 10  # Polling interval in seconds
    bucket: str  # GCS bucket name
    client: GCSClient  # GCS client used to poll/download message chunks
    log_readers: Optional[Sequence[PipesLogReader]]  # Associated log readers, started alongside message reading
    include_stdio_in_messages: bool = False  # Whether to include stdout/stderr in the pipes message stream

    def get_params(self) -> dict:
        """Get parameters for message reading (handed to the external process)."""

    def messages_are_readable(self, params: dict) -> bool:
        """Check if messages are available for reading."""

    def download_messages_chunk(self, index: int, params: dict) -> dict:
        """Download message chunk `index` from GCS."""

    def no_messages_debug_text(self) -> str:
        """Debug text for troubleshooting when no messages received."""
class PipesGCSLogReader(PipesChunkedLogReader):
    """Reads log files from GCS with chunked streaming."""

    bucket: str  # GCS bucket name
    key: str  # object key of the log file
    client: GCSClient  # GCS client used to download chunks
    interval: float = 10  # polling interval in seconds
    target_stream: IO[str]  # stream the decoded log text is written to
    decode_fn: Callable[[bytes], str]  # bytes -> str decoder (see helpers below)
    debug_info: Optional[str]  # optional debug information

    def target_is_readable(self, params: dict) -> bool:
        """Check if the target log object is readable."""

    def download_log_chunk(self, params: dict) -> bytes:
        """Download the next log chunk from GCS."""


# Helper functions for log processing and decoding.
def default_log_decode_fn(contents: bytes) -> str:
    """Default UTF-8 decoding for log contents.

    Parameters:
        contents: raw bytes downloaded from GCS.
    Returns:
        The contents decoded as UTF-8 text.
    """
    return contents.decode("utf-8")
def gzip_log_decode_fn(contents: bytes) -> str:
    """Gzip decompression and UTF-8 decoding for compressed logs.

    Parameters:
        contents: gzip-compressed bytes downloaded from GCS.
    Returns:
        The decompressed contents decoded as UTF-8 text.
    """
    import gzip  # local import keeps the helper section dependency-free at module scope

    return gzip.decompress(contents).decode("utf-8")


# --- Example: basic Dataproc pipes asset ---
from dagster import asset, Definitions
from dagster import AssetExecutionContext, asset
from dagster_gcp.pipes import (
    PipesDataprocJobClient,
    PipesGCSMessageReader,
    PipesGCSContextInjector,
)
from google.cloud.dataproc_v1 import JobControllerClient
from google.cloud import storage


@asset
def spark_data_processing(
    context: AssetExecutionContext,  # was missing in the original: `context` was an undefined name
    pipes_dataproc_client: PipesDataprocJobClient,
) -> dict:
    """Execute Spark job via Pipes and return results."""
    submit_job_params = {
        "project_id": "my-gcp-project",
        "region": "us-central1",
        "job": {
            "placement": {"cluster_name": "my-cluster"},
            "pyspark_job": {
                "main_python_file_uri": "gs://my-bucket/scripts/pipes_job.py",
                "args": ["--input", "gs://my-bucket/data/input.csv"],
            },
        },
    }
    # Forward the execution context so the external process can report back.
    return pipes_dataproc_client.run(
        context=context,
        submit_job_params=submit_job_params,
    ).get_results()
# Wire the pipes client as a resource; both injector and reader share one bucket.
defs = Definitions(
    assets=[spark_data_processing],
    resources={
        "pipes_dataproc_client": PipesDataprocJobClient(
            client=JobControllerClient(),
            context_injector=PipesGCSContextInjector(
                bucket="my-pipes-bucket",
                client=storage.Client(),
            ),
            message_reader=PipesGCSMessageReader(
                bucket="my-pipes-bucket",
                client=storage.Client(),
            ),
        )
    },
)


# --- Example: ML training pipeline with log streaming ---
from dagster import asset, get_dagster_logger
from dagster import AssetExecutionContext, asset
from dagster_gcp.pipes import (
    PipesDataprocJobClient,
    PipesGCSMessageReader,
    PipesGCSContextInjector,
    PipesGCSLogReader,
)
from google.cloud.dataproc_v1 import JobControllerClient
from google.cloud import storage
import sys


@asset
def ml_training_pipeline(
    context: AssetExecutionContext,  # was missing in the original: `context` was an undefined name
    pipes_dataproc_client: PipesDataprocJobClient,
) -> dict:
    """Execute ML training pipeline with comprehensive logging."""
    # Configure job with custom cluster
    submit_job_params = {
        "project_id": "my-ml-project",
        "region": "us-central1",
        "job": {
            "placement": {"cluster_name": "ml-training-cluster"},
            "pyspark_job": {
                "main_python_file_uri": "gs://my-ml-bucket/training/train_model.py",
                "python_file_uris": [
                    "gs://my-ml-bucket/training/data_utils.py",
                    "gs://my-ml-bucket/training/model_utils.py",
                ],
                "args": [
                    "--data-path", "gs://my-ml-bucket/datasets/",
                    "--model-output", "gs://my-ml-bucket/models/",
                    "--epochs", "100",
                ],
            },
        },
    }
    result = pipes_dataproc_client.run(
        context=context,
        submit_job_params=submit_job_params,
        extras={"model_version": "v2.1"},  # surfaced to the external process via pipes
    )
    # NOTE(review): `get_metadata()` and `.duration` are not part of the
    # PipesClientCompletedInvocation surface documented above — confirm these
    # members exist before relying on this example.
    return {
        "model_metrics": result.get_results(),
        "training_logs": result.get_metadata(),
        "job_duration": result.duration,
    }
# Configure with log readers
log_reader = PipesGCSLogReader(
    bucket="my-ml-bucket",
    key="logs/training.log",
    client=storage.Client(),
    target_stream=sys.stdout,
    interval=5.0,
)

# `Definitions` was used but never imported in this example in the original.
from dagster import Definitions

defs = Definitions(
    assets=[ml_training_pipeline],
    resources={
        "pipes_dataproc_client": PipesDataprocJobClient(
            client=JobControllerClient(),
            context_injector=PipesGCSContextInjector(
                bucket="my-ml-bucket",
                key_prefix="pipes/context",
                client=storage.Client(),
            ),
            message_reader=PipesGCSMessageReader(
                bucket="my-ml-bucket",
                client=storage.Client(),
                log_readers=[log_reader],  # stream training.log alongside pipes messages
                include_stdio_in_messages=True,
                interval=5.0,
            ),
            forward_termination=True,
            poll_interval=10.0,
        )
    },
)


# --- Example: fanning out multiple Dataproc jobs from one multi_asset ---
from dagster import asset, multi_asset, AssetOut
from dagster import AssetExecutionContext, AssetOut, multi_asset
from dagster_gcp.pipes import PipesDataprocJobClient


@multi_asset(
    outs={
        "processed_data": AssetOut(),
        "model_predictions": AssetOut(),
        "quality_metrics": AssetOut(),
    }
)
def batch_ml_pipeline(
    context: AssetExecutionContext,  # was missing in the original: `context` was an undefined name
    pipes_dataproc_client: PipesDataprocJobClient,
) -> tuple:
    """Execute multiple ML tasks via Pipes and return their results."""
    # Data processing job
    processing_params = {
        "project_id": "my-project",
        "region": "us-central1",
        "job": {
            "placement": {"cluster_name": "processing-cluster"},
            "pyspark_job": {
                "main_python_file_uri": "gs://my-bucket/jobs/data_processing.py"
            },
        },
    }
    # Model inference job
    inference_params = {
        "project_id": "my-project",
        "region": "us-central1",
        "job": {
            "placement": {"cluster_name": "inference-cluster"},
            "pyspark_job": {
                "main_python_file_uri": "gs://my-bucket/jobs/model_inference.py"
            },
        },
    }
    # Quality assessment job
    quality_params = {
        "project_id": "my-project",
        "region": "us-central1",
        "job": {
            "placement": {"cluster_name": "quality-cluster"},
            "pyspark_job": {
                "main_python_file_uri": "gs://my-bucket/jobs/quality_assessment.py"
            },
        },
    }
    # Execute jobs and collect results.
    # NOTE(review): each run() blocks until its job completes, so these three
    # submissions execute sequentially, not in parallel.
    processing_result = pipes_dataproc_client.run(context, processing_params)
    inference_result = pipes_dataproc_client.run(context, inference_params)
    quality_result = pipes_dataproc_client.run(context, quality_params)
    return (
        processing_result.get_results(),
        inference_result.get_results(),
        quality_result.get_results(),
    )


# --- Example: run-time configuration passed through to the job ---
from dagster import asset, Config
from dagster import AssetExecutionContext, Config, asset
from dagster_gcp.pipes import PipesDataprocJobClient, PipesGCSContextInjector


class DataProcessingConfig(Config):
    batch_date: str  # e.g. "2024-01-31"
    processing_mode: str
    output_format: str


@asset
def daily_data_processing(
    context: AssetExecutionContext,  # was missing in the original: `context` was an undefined name
    pipes_dataproc_client: PipesDataprocJobClient,
    config: DataProcessingConfig,
) -> dict:
    """Process daily data with custom configuration."""
    # The context injector will automatically pass config to external process
    submit_job_params = {
        "project_id": "my-project",
        "region": "us-central1",
        "job": {
            "placement": {"cluster_name": "daily-processing"},
            "pyspark_job": {
                "main_python_file_uri": "gs://my-bucket/jobs/daily_processor.py",
                "args": [
                    "--batch-date", config.batch_date,
                    "--mode", config.processing_mode,
                    "--format", config.output_format,
                ],
            },
        },
    }
    return pipes_dataproc_client.run(
        context=context,
        submit_job_params=submit_job_params,
    ).get_results()


# --- Example: error handling around a pipes invocation ---
from dagster import asset, get_dagster_logger
from dagster_gcp.pipes import PipesDataprocJobClient
from dagster import AssetExecutionContext, Failure, asset, get_dagster_logger


@asset
def robust_data_pipeline(
    context: AssetExecutionContext,  # was missing in the original: `context` was an undefined name
    pipes_dataproc_client: PipesDataprocJobClient,
) -> dict:
    """Data pipeline with comprehensive error handling."""
    logger = get_dagster_logger()
    try:
        submit_job_params = {
            "project_id": "my-project",
            "region": "us-central1",
            "job": {
                "placement": {"cluster_name": "robust-cluster"},
                "pyspark_job": {
                    "main_python_file_uri": "gs://my-bucket/jobs/robust_pipeline.py"
                },
            },
        }
        result = pipes_dataproc_client.run(
            context=context,
            submit_job_params=submit_job_params,
        )
        # NOTE(review): `.success` is not part of the invocation surface
        # documented above — confirm the attribute exists.
        if not result.success:
            logger.error(f"Pipeline failed: {result.get_metadata()}")
            raise Failure("Data pipeline execution failed")
        logger.info(f"Pipeline completed successfully: {result.get_results()}")
        return result.get_results()
    except Failure:
        # Deliberate failures propagate as-is; the original re-wrapped them
        # via the broad `except Exception` below, losing the original message.
        raise
    except Exception as e:
        logger.error(f"Unexpected error in pipeline: {str(e)}")
        # Get debug information; context_injector is Optional, so guard None.
        if pipes_dataproc_client.context_injector is not None:
            debug_text = pipes_dataproc_client.context_injector.no_messages_debug_text()
            logger.error(f"Debug info: {debug_text}")
        # Chain the cause so the underlying traceback is preserved.
        raise Failure(f"Pipeline failed with error: {str(e)}") from e


# Install with Tessl CLI:
#   npx tessl i tessl/pypi-dagster-gcp