# Pipes Integration

External process communication through GCP services, enabling Dagster to orchestrate and monitor workloads that run outside the Dagster process. The integration includes a Pipes client for running workloads on Dataproc, a context injector for passing data via GCS, and message readers for collecting results and logs from external processes.

## Capabilities

### Dataproc Job Client

Pipes client for executing workloads on Google Cloud Dataproc in Job mode, with full Pipes protocol support.

```python { .api }
class PipesDataprocJobClient(PipesClient):
    """Pipes client for running workloads on Dataproc in Job mode."""

    def __init__(
        self,
        message_reader: PipesMessageReader,
        client: Optional[JobControllerClient] = None,
        context_injector: Optional[PipesContextInjector] = None,
        forward_termination: bool = True,
        poll_interval: float = 5.0
    ): ...

    def run(
        self,
        context,
        submit_job_params: SubmitJobParams,
        extras: Optional[dict] = None
    ) -> PipesClientCompletedInvocation:
        """
        Execute Dataproc job with pipes protocol.

        Parameters:
        - context: Dagster execution context
        - submit_job_params: Job submission parameters
        - extras: Additional parameters

        Returns:
        Completed invocation with results and metadata
        """

class SubmitJobParams(TypedDict):
    """Type definition for Dataproc job submission parameters."""
    project_id: str
    region: str
    job: dict  # Dataproc job configuration
    job_id: Optional[str]
    request_id: Optional[str]
```
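
`SubmitJobParams` mirrors the arguments of Dataproc's `SubmitJobRequest`. Below is a minimal sketch of a parameter dict that also fills the optional `job_id` and `request_id` fields (useful for deduplicating retried submissions on the Dataproc side); all values are illustrative and the field names follow the `SubmitJobParams` definition above.

```python
# Illustrative parameter dict following the SubmitJobParams definition above.
submit_job_params: dict = {
    "project_id": "my-gcp-project",
    "region": "us-central1",
    "job": {
        "placement": {"cluster_name": "my-cluster"},
        "pyspark_job": {
            "main_python_file_uri": "gs://my-bucket/scripts/pipes_job.py",
        },
    },
    # Optional: pin the Dataproc job ID and attach a request ID so a retried
    # submission of the same request is not run twice.
    "job_id": "pipes-job-20240101",
    "request_id": "pipes-job-20240101-attempt-1",
}
```
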
### Context Injectors

Context injectors pass execution context and parameters to an external process via GCP services; the external-process side of this handshake is sketched under Usage Examples below.

```python { .api }
class PipesGCSContextInjector(PipesContextInjector):
    """Injects pipes context via temporary GCS objects."""
    bucket: str  # GCS bucket name
    client: GCSClient  # GCS client
    key_prefix: Optional[str]  # Optional key prefix

    def inject_context(self, context) -> Iterator[PipesParams]:
        """
        Context manager for context injection.

        Yields:
        PipesParams containing context information for external process
        """

    def no_messages_debug_text(self) -> str:
        """Debug text for troubleshooting when no messages received."""
```
### Message Readers

Message readers collect results, logs, and metadata from external processes via GCS.

```python { .api }
class PipesGCSMessageReader(PipesBlobStoreMessageReader):
    """Reads pipes messages from GCS bucket."""
    interval: float = 10  # Polling interval in seconds
    bucket: str  # GCS bucket name
    client: GCSClient  # GCS client
    log_readers: Optional[Sequence[PipesLogReader]]  # Associated log readers
    include_stdio_in_messages: bool = False  # Whether to include stdout/stderr

    def get_params(self) -> dict:
        """Get parameters for message reading."""

    def messages_are_readable(self, params: dict) -> bool:
        """Check if messages are available for reading."""

    def download_messages_chunk(self, index: int, params: dict) -> dict:
        """Download message chunk from GCS."""

    def no_messages_debug_text(self) -> str:
        """Debug text for troubleshooting when no messages received."""

class PipesGCSLogReader(PipesChunkedLogReader):
    """Reads log files from GCS with chunked streaming."""
    bucket: str  # GCS bucket name
    key: str  # Object key for logs
    client: GCSClient  # GCS client
    interval: float = 10  # Polling interval in seconds
    target_stream: IO[str]  # Target stream for output
    decode_fn: Callable[[bytes], str]  # Decoding function for logs
    debug_info: Optional[str]  # Debug information

    def target_is_readable(self, params: dict) -> bool:
        """Check if target is readable."""

    def download_log_chunk(self, params: dict) -> bytes:
        """Download log chunk from GCS."""
```
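
Stdout and stderr from the external process can reach Dagster either by attaching `log_readers` (shown in the usage examples below) or by forwarding them over the message channel with `include_stdio_in_messages=True`. A minimal reader using the message-channel route is sketched here; note that the message writer on the external side generally has to opt in to forwarding stdio as well, and the bucket name is illustrative.

```python
from google.cloud import storage
from dagster_gcp.pipes import PipesGCSMessageReader

# Forward the external process's stdout/stderr through the Pipes message
# channel rather than streaming separate log objects from GCS.
message_reader = PipesGCSMessageReader(
    bucket="my-pipes-bucket",
    client=storage.Client(),
    include_stdio_in_messages=True,
    interval=10,
)
```
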
### Utility Functions

Helper functions for log processing and decoding.

```python { .api }
def default_log_decode_fn(contents: bytes) -> str:
    """Default UTF-8 decoding for log contents."""

def gzip_log_decode_fn(contents: bytes) -> str:
    """Gzip decompression and UTF-8 decoding for compressed logs."""
```
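
For example, if the external process writes its logs to GCS as gzip-compressed objects, a `PipesGCSLogReader` can be pointed at them with `gzip_log_decode_fn` as its `decode_fn`. The bucket and key below are illustrative; confirm that both names are importable from `dagster_gcp.pipes` in your installed version.

```python
import sys

from google.cloud import storage
from dagster_gcp.pipes import PipesGCSLogReader, gzip_log_decode_fn

# Stream a gzip-compressed log object from GCS into the Dagster process's
# stdout, decompressing and UTF-8 decoding the downloaded contents.
compressed_log_reader = PipesGCSLogReader(
    bucket="my-pipes-bucket",
    key="logs/driver.log.gz",
    client=storage.Client(),
    target_stream=sys.stdout,
    decode_fn=gzip_log_decode_fn,
    interval=10,
)
```
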
## Usage Examples

### Basic Dataproc Pipes Job

```python
from dagster import AssetExecutionContext, Definitions, MaterializeResult, asset
from dagster_gcp.pipes import (
    PipesDataprocJobClient,
    PipesGCSContextInjector,
    PipesGCSMessageReader,
)
from google.cloud import storage
from google.cloud.dataproc_v1 import JobControllerClient


@asset
def spark_data_processing(
    context: AssetExecutionContext,
    pipes_dataproc_client: PipesDataprocJobClient,
) -> MaterializeResult:
    """Execute a Spark job via Pipes and return its materialization result."""

    submit_job_params = {
        "project_id": "my-gcp-project",
        "region": "us-central1",
        "job": {
            "placement": {"cluster_name": "my-cluster"},
            "pyspark_job": {
                "main_python_file_uri": "gs://my-bucket/scripts/pipes_job.py",
                "args": ["--input", "gs://my-bucket/data/input.csv"],
            },
        },
    }

    return pipes_dataproc_client.run(
        context=context,
        submit_job_params=submit_job_params,
    ).get_materialize_result()


defs = Definitions(
    assets=[spark_data_processing],
    resources={
        "pipes_dataproc_client": PipesDataprocJobClient(
            # Dataproc job submission goes through the regional endpoint.
            client=JobControllerClient(
                client_options={"api_endpoint": "us-central1-dataproc.googleapis.com:443"}
            ),
            context_injector=PipesGCSContextInjector(
                bucket="my-pipes-bucket",
                client=storage.Client(),
            ),
            message_reader=PipesGCSMessageReader(
                bucket="my-pipes-bucket",
                client=storage.Client(),
            ),
        )
    },
)
```
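
The example above only covers the Dagster side. The submitted script (`gs://my-bucket/scripts/pipes_job.py`) must open a Pipes session so the injected context can be loaded from GCS and messages can be written back for the message reader. The following is a sketch of what that script might look like; the `PipesGCSContextLoader` and `PipesGCSMessageWriter` names are assumptions about the `dagster-pipes` package installed on the cluster, so verify them against your environment.

```python
# pipes_job.py -- runs on the Dataproc cluster, not inside the Dagster process.
# The GCS loader/writer class names below are assumptions; check the
# dagster-pipes version installed on the cluster for the exact names.
from dagster_pipes import PipesGCSContextLoader, PipesGCSMessageWriter, open_dagster_pipes
from google.cloud import storage


def main() -> None:
    gcs_client = storage.Client()
    with open_dagster_pipes(
        context_loader=PipesGCSContextLoader(client=gcs_client),
        message_writer=PipesGCSMessageWriter(client=gcs_client),
    ) as pipes:
        pipes.log.info("Starting Spark work on Dataproc")

        # ... run the actual Spark job here ...
        row_count = 42  # placeholder result

        # Report a materialization back to Dagster with some metadata; this is
        # what get_materialize_result() on the Dagster side picks up.
        pipes.report_asset_materialization(metadata={"row_count": row_count})


if __name__ == "__main__":
    main()
```
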
### Advanced Pipes Configuration with Logging

```python
import sys

from dagster import AssetExecutionContext, Definitions, MaterializeResult, asset
from dagster_gcp.pipes import (
    PipesDataprocJobClient,
    PipesGCSContextInjector,
    PipesGCSLogReader,
    PipesGCSMessageReader,
)
from google.cloud import storage
from google.cloud.dataproc_v1 import JobControllerClient


@asset
def ml_training_pipeline(
    context: AssetExecutionContext,
    pipes_dataproc_client: PipesDataprocJobClient,
) -> MaterializeResult:
    """Execute an ML training pipeline with comprehensive logging."""

    # Configure the job for the training cluster
    submit_job_params = {
        "project_id": "my-ml-project",
        "region": "us-central1",
        "job": {
            "placement": {"cluster_name": "ml-training-cluster"},
            "pyspark_job": {
                "main_python_file_uri": "gs://my-ml-bucket/training/train_model.py",
                "python_file_uris": [
                    "gs://my-ml-bucket/training/data_utils.py",
                    "gs://my-ml-bucket/training/model_utils.py",
                ],
                "args": [
                    "--data-path", "gs://my-ml-bucket/datasets/",
                    "--model-output", "gs://my-ml-bucket/models/",
                    "--epochs", "100",
                ],
            },
        },
    }

    result = pipes_dataproc_client.run(
        context=context,
        submit_job_params=submit_job_params,
        extras={"model_version": "v2.1"},
    )

    # Metrics reported by the external process arrive as materialization metadata.
    return result.get_materialize_result()


# Stream the training log from GCS into the Dagster process's stdout.
log_reader = PipesGCSLogReader(
    bucket="my-ml-bucket",
    key="logs/training.log",
    client=storage.Client(),
    target_stream=sys.stdout,
    interval=5.0,
)

defs = Definitions(
    assets=[ml_training_pipeline],
    resources={
        "pipes_dataproc_client": PipesDataprocJobClient(
            client=JobControllerClient(
                client_options={"api_endpoint": "us-central1-dataproc.googleapis.com:443"}
            ),
            context_injector=PipesGCSContextInjector(
                bucket="my-ml-bucket",
                key_prefix="pipes/context",
                client=storage.Client(),
            ),
            message_reader=PipesGCSMessageReader(
                bucket="my-ml-bucket",
                client=storage.Client(),
                log_readers=[log_reader],
                include_stdio_in_messages=True,
                interval=5.0,
            ),
            forward_termination=True,
            poll_interval=10.0,
        )
    },
)
```
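
The `extras` dict passed to `run()` above surfaces on the Pipes context inside the external process. Below is a minimal illustration of reading it in `train_model.py`; in the GCS-backed setup you would also pass the GCS context loader and message writer as in the earlier `pipes_job.py` sketch.

```python
from dagster_pipes import open_dagster_pipes

# Read the extras supplied by PipesDataprocJobClient.run(..., extras=...).
with open_dagster_pipes() as pipes:
    model_version = pipes.extras.get("model_version", "unversioned")
    pipes.log.info(f"Training model version {model_version}")
    pipes.report_asset_materialization(metadata={"model_version": model_version})
```
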
### Multiple External Processes

```python
from dagster import AssetExecutionContext, AssetOut, multi_asset
from dagster_gcp.pipes import PipesDataprocJobClient


@multi_asset(
    outs={
        "processed_data": AssetOut(),
        "model_predictions": AssetOut(),
        "quality_metrics": AssetOut(),
    }
)
def batch_ml_pipeline(
    context: AssetExecutionContext,
    pipes_dataproc_client: PipesDataprocJobClient,
) -> tuple:
    """Execute several Dataproc jobs via Pipes and collect one result per asset."""

    # Data processing job
    processing_params = {
        "project_id": "my-project",
        "region": "us-central1",
        "job": {
            "placement": {"cluster_name": "processing-cluster"},
            "pyspark_job": {
                "main_python_file_uri": "gs://my-bucket/jobs/data_processing.py"
            },
        },
    }

    # Model inference job
    inference_params = {
        "project_id": "my-project",
        "region": "us-central1",
        "job": {
            "placement": {"cluster_name": "inference-cluster"},
            "pyspark_job": {
                "main_python_file_uri": "gs://my-bucket/jobs/model_inference.py"
            },
        },
    }

    # Quality assessment job
    quality_params = {
        "project_id": "my-project",
        "region": "us-central1",
        "job": {
            "placement": {"cluster_name": "quality-cluster"},
            "pyspark_job": {
                "main_python_file_uri": "gs://my-bucket/jobs/quality_assessment.py"
            },
        },
    }

    # Execute the jobs one after another and collect their results
    processing_result = pipes_dataproc_client.run(
        context=context, submit_job_params=processing_params
    )
    inference_result = pipes_dataproc_client.run(
        context=context, submit_job_params=inference_params
    )
    quality_result = pipes_dataproc_client.run(
        context=context, submit_job_params=quality_params
    )

    return (
        processing_result.get_materialize_result(),
        inference_result.get_materialize_result(),
        quality_result.get_materialize_result(),
    )
```
### Custom Context Injection

```python
from dagster import AssetExecutionContext, Config, MaterializeResult, asset
from dagster_gcp.pipes import PipesDataprocJobClient


class DataProcessingConfig(Config):
    batch_date: str
    processing_mode: str
    output_format: str


@asset
def daily_data_processing(
    context: AssetExecutionContext,
    pipes_dataproc_client: PipesDataprocJobClient,
    config: DataProcessingConfig,
) -> MaterializeResult:
    """Process daily data with custom configuration."""

    # Run config is forwarded explicitly as job arguments; the context injector
    # carries the Pipes context itself (run metadata plus any `extras`).
    submit_job_params = {
        "project_id": "my-project",
        "region": "us-central1",
        "job": {
            "placement": {"cluster_name": "daily-processing"},
            "pyspark_job": {
                "main_python_file_uri": "gs://my-bucket/jobs/daily_processor.py",
                "args": [
                    "--batch-date", config.batch_date,
                    "--mode", config.processing_mode,
                    "--format", config.output_format,
                ],
            },
        },
    }

    return pipes_dataproc_client.run(
        context=context,
        submit_job_params=submit_job_params,
    ).get_materialize_result()
```
### Error Handling and Debugging

```python
from dagster import AssetExecutionContext, Failure, MaterializeResult, asset, get_dagster_logger
from dagster_gcp.pipes import PipesDataprocJobClient


@asset
def robust_data_pipeline(
    context: AssetExecutionContext,
    pipes_dataproc_client: PipesDataprocJobClient,
) -> MaterializeResult:
    """Data pipeline with explicit error handling."""

    logger = get_dagster_logger()

    submit_job_params = {
        "project_id": "my-project",
        "region": "us-central1",
        "job": {
            "placement": {"cluster_name": "robust-cluster"},
            "pyspark_job": {
                "main_python_file_uri": "gs://my-bucket/jobs/robust_pipeline.py"
            },
        },
    }

    try:
        # run() raises if the Dataproc job fails or the external process
        # reports an error, so reaching the return means the job succeeded.
        result = pipes_dataproc_client.run(
            context=context,
            submit_job_params=submit_job_params,
        )
    except Exception as e:
        logger.error(f"Dataproc pipes job failed: {e}")
        # If no Pipes messages were received at all, the message reader's
        # no_messages_debug_text() output (see the API above) is the first
        # place to look: it describes the GCS location being polled.
        raise Failure(f"Pipeline failed with error: {e}") from e

    logger.info("Pipeline completed successfully")
    return result.get_materialize_result()
```