# Pipes Integration

Dagster Pipes provides a bidirectional communication system for executing external code on Databricks, with full context injection and result collection. It enables seamless integration between Dagster orchestration and external Databricks workloads, supporting both standard and serverless environments.

## Capabilities

### PipesDatabricksClient

Main client for running external code on Databricks with bidirectional communication through the Dagster Pipes protocol.

```python { .api }
class PipesDatabricksClient(BasePipesDatabricksClient):
    """Pipes client for running external code on Databricks with bidirectional communication."""

    def __init__(
        self,
        client: WorkspaceClient,
        context_injector: Optional[PipesContextInjector] = None,
        message_reader: Optional[PipesMessageReader] = None,
        poll_interval_seconds: float = 5,
        forward_termination: bool = True,
    ):
        """
        Initialize the Pipes Databricks client.

        Parameters:
        - client: Databricks WorkspaceClient for API interactions
        - context_injector: Component for injecting Dagster context into the external process
        - message_reader: Component for reading messages from the external process
        - poll_interval_seconds: How often to poll for job completion
        - forward_termination: Whether to forward termination signals to the external process
        """

    def run(
        self,
        *,
        context: Union[OpExecutionContext, AssetExecutionContext],
        extras: Optional[PipesExtras] = None,
        **kwargs,
    ) -> PipesClientCompletedInvocation:
        """
        Execute external code on Databricks with bidirectional communication.

        Parameters:
        - context: Dagster execution context (op or asset)
        - extras: Additional context data to pass to the external process
        - **kwargs: Additional arguments including task and cluster configuration

        Returns:
        PipesClientCompletedInvocation: Results from the external process execution
        """
```
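
In many cases only the `client` argument needs to be supplied explicitly. The sketch below assumes the DBFS-based context injector and message reader documented later in this section are used when the optional arguments are omitted; verify that behavior against your installed version. The notebook path and cluster id are placeholders.

```python
from dagster import AssetExecutionContext, asset
from databricks.sdk import WorkspaceClient
from dagster_databricks import PipesDatabricksClient


@asset
def minimal_pipes_asset(context: AssetExecutionContext):
    # Credentials are resolved from the environment (e.g. DATABRICKS_HOST / DATABRICKS_TOKEN).
    pipes_client = PipesDatabricksClient(client=WorkspaceClient())

    # Task and cluster configuration are forwarded via **kwargs as documented above.
    return pipes_client.run(
        context=context,
        task={"notebook_task": {"notebook_path": "/Workspace/Shared/example_notebook"}},
        cluster={"existing": "your-cluster-id"},
    ).get_results()
```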

### Context Injection

Component for injecting Dagster context data into external Databricks processes via DBFS.

```python { .api }
class PipesDbfsContextInjector(PipesContextInjector):
    """A context injector that injects context into a Databricks job by writing a JSON file to DBFS."""

    def __init__(self, *, client: WorkspaceClient):
        """
        Initialize the DBFS context injector.

        Parameters:
        - client: Databricks WorkspaceClient for DBFS operations
        """

    def inject_context(self, context: PipesContextData) -> Iterator[PipesParams]:
        """
        Inject context into the external environment by writing it as JSON to an
        automatically generated DBFS temporary file and exposing the path to that file.

        Parameters:
        - context: The context data to inject

        Yields:
        PipesParams: Parameters that the external process can use to locate and
        load the injected context data
        """
```
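
The injector covers only the Dagster side of the handshake; the external process is expected to read the uploaded JSON back with the matching `PipesDbfsContextLoader` from `dagster_pipes` (see the script examples below). A brief sketch of pairing the injector with the client, where the workspace host and token are placeholders:

```python
from databricks.sdk import WorkspaceClient
from dagster_databricks import PipesDatabricksClient, PipesDbfsContextInjector

workspace_client = WorkspaceClient(
    host="https://your-workspace.cloud.databricks.com",  # placeholder host
    token="<personal-access-token>",                     # placeholder token
)

pipes_client = PipesDatabricksClient(
    client=workspace_client,
    # Writes the serialized Dagster context to a temporary JSON file on DBFS;
    # the job locates it through the parameters yielded by inject_context().
    context_injector=PipesDbfsContextInjector(client=workspace_client),
)
```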

### Message Reading

Component for reading messages and logs from external Databricks processes via DBFS.

```python { .api }
class PipesDbfsMessageReader(PipesBlobStoreMessageReader):
    """Message reader that reads messages by periodically reading message chunks from an
    automatically-generated temporary directory on DBFS."""

    def __init__(
        self,
        *,
        interval: float = 10,
        client: WorkspaceClient,
        include_stdio_in_messages: bool = False,
        log_readers: Optional[Sequence[PipesLogReader]] = None,
    ):
        """
        Initialize the DBFS message reader.

        Parameters:
        - interval: Interval in seconds between attempts to download a chunk
        - client: Databricks WorkspaceClient for DBFS operations
        - include_stdio_in_messages: Whether to send stdout/stderr to Dagster via Pipes messages
        - log_readers: Additional log readers for logs on DBFS
        """

    def get_params(self) -> Iterator[PipesParams]:
        """
        Get parameters for the external process to write messages.

        Yields:
        PipesParams: Parameters including the DBFS path for message writing
        """

    def messages_are_readable(self, params: PipesParams) -> bool:
        """
        Check if messages are available to read from DBFS.

        Parameters:
        - params: Parameters from get_params()

        Returns:
        bool: True if messages can be read
        """

    def download_messages_chunk(self, index: int, params: PipesParams) -> Optional[str]:
        """
        Download a specific message chunk from DBFS.

        Parameters:
        - index: Index of the message chunk to download
        - params: Parameters from get_params()

        Returns:
        Optional[str]: Message chunk content, or None if not available
        """
```

### Log Reading

Component for reading execution logs from DBFS files, typically used for capturing stdout/stderr from Databricks clusters.

```python { .api }
class PipesDbfsLogReader(PipesChunkedLogReader):
    """Reader that reads a log file from DBFS."""

    def __init__(
        self,
        *,
        interval: float = 10,
        remote_log_name: Literal["stdout", "stderr"],
        target_stream: TextIO,
        client: WorkspaceClient,
        debug_info: Optional[str] = None,
    ):
        """
        Initialize the DBFS log reader.

        Parameters:
        - interval: Interval in seconds between attempts to download a log chunk
        - remote_log_name: The name of the log file to read ("stdout" or "stderr")
        - target_stream: The stream to which log chunks are forwarded once read
        - client: Databricks WorkspaceClient for DBFS operations
        - debug_info: Optional message containing debug information about the log reader
        """
```
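
To mirror a cluster's driver logs into the Dagster process, `PipesDbfsLogReader` instances are typically passed to `PipesDbfsMessageReader` through `log_readers`; a sketch is below (cluster log delivery to DBFS must be enabled on the Databricks side for these files to exist):

```python
import sys

from databricks.sdk import WorkspaceClient
from dagster_databricks import PipesDbfsLogReader, PipesDbfsMessageReader

workspace_client = WorkspaceClient()  # credentials resolved from the environment

message_reader = PipesDbfsMessageReader(
    client=workspace_client,
    log_readers=[
        # Forward the driver's stdout and stderr to the corresponding local streams.
        PipesDbfsLogReader(
            client=workspace_client,
            remote_log_name="stdout",
            target_stream=sys.stdout,
        ),
        PipesDbfsLogReader(
            client=workspace_client,
            remote_log_name="stderr",
            target_stream=sys.stderr,
        ),
    ],
)
```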

### Serverless Support

Client variant for Databricks serverless environments with specialized handling.

```python { .api }
class PipesDatabricksServerlessClient(BasePipesDatabricksClient):
    """Pipes client for Databricks serverless environments."""
```
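
Construction is assumed to mirror the standard client; the keyword argument below is an assumption rather than a confirmed signature, so check the installed release before relying on it.

```python
from databricks.sdk import WorkspaceClient
from dagster_databricks import PipesDatabricksServerlessClient

# Assumed to accept a WorkspaceClient like PipesDatabricksClient; serverless jobs
# manage compute automatically, so no cluster specification is passed to run().
serverless_client = PipesDatabricksServerlessClient(client=WorkspaceClient())
```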

## Usage Examples

### Basic Pipes Setup

```python
from dagster import AssetExecutionContext, EnvVar, asset
from dagster_databricks import (
    DatabricksClientResource,
    PipesDatabricksClient,
    PipesDbfsContextInjector,
    PipesDbfsMessageReader,
)

databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",
    token=EnvVar("DATABRICKS_TOKEN"),
)


@asset(required_resource_keys={"databricks"})
def my_databricks_asset(context: AssetExecutionContext):
    workspace_client = context.resources.databricks.workspace_client
    client = PipesDatabricksClient(
        client=workspace_client,
        context_injector=PipesDbfsContextInjector(client=workspace_client),
        message_reader=PipesDbfsMessageReader(client=workspace_client),
    )

    return client.run(
        context=context,
        task={
            "notebook_task": {
                "notebook_path": "/Workspace/Users/user@example.com/my_notebook",
                "base_parameters": {
                    "input_table": "raw_data",
                    "output_table": "processed_data",
                },
            }
        },
        cluster={"existing": "your-cluster-id"},
    ).get_results()
```
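
For `context.resources.databricks` to resolve inside the asset, the resource defined above must be bound under the `databricks` key, for example:

```python
from dagster import Definitions

defs = Definitions(
    assets=[my_databricks_asset],
    resources={"databricks": databricks_resource},
)
```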

### Advanced Pipes Configuration

```python
@asset(required_resource_keys={"databricks"})
def advanced_databricks_processing(context: AssetExecutionContext):
    workspace_client = context.resources.databricks.workspace_client

    # Configure message reader with logging
    message_reader = PipesDbfsMessageReader(
        client=workspace_client,
        interval=5,  # Check for messages every 5 seconds
        include_stdio_in_messages=True,  # Include stdout/stderr
    )

    # Configure context injector
    context_injector = PipesDbfsContextInjector(client=workspace_client)

    client = PipesDatabricksClient(
        client=workspace_client,
        context_injector=context_injector,
        message_reader=message_reader,
        poll_interval_seconds=10,
        forward_termination=True,
    )

    # Run with comprehensive configuration
    result = client.run(
        context=context,
        extras={"custom_param": "custom_value"},  # Additional context
        task={
            "spark_python_task": {
                "python_file": "dbfs:/FileStore/scripts/my_script.py",
                "parameters": ["--mode", "production", "--debug", "false"],
            }
        },
        cluster={
            "new": {
                "spark_version": "11.3.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 2,
                "custom_tags": {"project": "data-pipeline"},
            }
        },
        libraries=[
            {"pypi": {"package": "dagster-pipes"}},
            {"pypi": {"package": "pandas>=1.5.0"}},
        ],
        timeout_seconds=3600,
    )

    return result.get_results()
```

### External Script Integration

Example of an external Python script that uses Pipes for communication:

```python
# my_script.py - runs on Databricks
from dagster_pipes import PipesDbfsContextLoader, PipesDbfsMessageWriter, open_dagster_pipes


def main():
    # Initialize Pipes communication
    with open_dagster_pipes(
        context_loader=PipesDbfsContextLoader(),
        message_writer=PipesDbfsMessageWriter(),
    ) as pipes:
        # The pipes object exposes the Dagster context (asset key, partition, extras, ...)
        pipes.log.info(f"Starting external processing with extras: {pipes.extras}")

        # Perform work
        import pandas as pd

        df = pd.read_parquet("/dbfs/data/input.parquet")

        # Report progress
        pipes.log.info(f"Loaded {len(df)} rows")

        # Process data
        result_df = df.groupby("category").agg({"value": "sum"})

        # Save results
        result_df.to_parquet("/dbfs/data/output.parquet")

        # Report asset materialization
        pipes.report_asset_materialization(
            asset_key="processed_data",
            metadata={
                "num_rows": len(result_df),
                "num_categories": int(result_df.index.nunique()),
            },
        )

        # The return value stays local to the script; results flow back via Pipes above
        return {"status": "success", "rows_processed": len(df)}


if __name__ == "__main__":
    main()
```

### Notebook Integration

Example of using Pipes in a Databricks notebook:

```python
# Cell 1: Initialize Pipes
from dagster_pipes import PipesDbfsContextLoader, PipesDbfsMessageWriter, open_dagster_pipes

pipes = open_dagster_pipes(
    context_loader=PipesDbfsContextLoader(),
    message_writer=PipesDbfsMessageWriter(),
).__enter__()

# Cell 2: Log back to Dagster via the Pipes context
pipes.log.info("Notebook execution started")

# Cell 3: Data processing
import pandas as pd

# Load data
df = spark.read.table("my_catalog.my_schema.input_table").toPandas()
pipes.log.info(f"Loaded {len(df)} rows from input table")

# Process data
df["processed_value"] = df["raw_value"] * 2
result_df = df.groupby("category", as_index=False).agg(
    total_value=("processed_value", "sum"),
    mean_value=("processed_value", "mean"),
    row_count=("processed_value", "count"),
)

# Cell 4: Save and report results
# Save to a Delta table
spark.createDataFrame(result_df).write.mode("overwrite").saveAsTable(
    "my_catalog.my_schema.output_table"
)

# Report to Dagster
pipes.report_asset_materialization(
    asset_key="processed_output",
    metadata={
        "num_categories": len(result_df),
        "total_value": float(result_df["total_value"].sum()),
    },
)

pipes.log.info("Processing completed successfully")

# Cell 5: Cleanup
pipes.__exit__(None, None, None)
```

### Error Handling

```python
@asset(required_resource_keys={"databricks"})
def robust_databricks_processing(context: AssetExecutionContext):
    workspace_client = context.resources.databricks.workspace_client
    client = PipesDatabricksClient(
        client=workspace_client,
        context_injector=PipesDbfsContextInjector(client=workspace_client),
        message_reader=PipesDbfsMessageReader(
            client=workspace_client,
            include_stdio_in_messages=True,  # Capture stdout/stderr for debugging
        ),
    )

    try:
        # run() polls the Databricks job and raises if the external process fails
        result = client.run(
            context=context,
            task={
                "notebook_task": {
                    "notebook_path": "/path/to/notebook",
                }
            },
            cluster={"existing": "cluster-id"},
            timeout_seconds=1800,  # 30 minute timeout
        )
        return result.get_results()

    except Exception as e:
        context.log.error(f"Databricks execution failed: {e}")
        # Logs captured via include_stdio_in_messages are available for debugging
        raise
```