# Dagster Databricks

A comprehensive integration library for connecting the Dagster data orchestration framework with the Databricks analytics platform. It enables users to execute Dagster ops and assets on Databricks clusters through several execution patterns, including a PySpark step launcher, a Databricks job runner, and Dagster Pipes integration.

## Package Information

- **Package Name**: dagster-databricks
- **Language**: Python
- **Installation**: `pip install dagster-databricks`
## Core Imports

```python
from dagster_databricks import (
    DatabricksClient,
    DatabricksError,
    DatabricksJobRunner,
    DatabricksPySparkStepLauncher,
    databricks_pyspark_step_launcher,
    PipesDatabricksClient,
    PipesDbfsContextInjector,
    PipesDbfsMessageReader,
    PipesDbfsLogReader,
    DatabricksClientResource,
    databricks_client,
    create_databricks_run_now_op,
    create_databricks_submit_run_op,
)
```
## Basic Usage

```python
from dagster import job, op
from dagster_databricks import (
    DatabricksClientResource,
    create_databricks_run_now_op,
    PipesDatabricksClient,
    PipesDbfsContextInjector,
    PipesDbfsMessageReader,
)

# Define the Databricks resource
databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",
    token="your-access-token",
)

# Create an op that triggers an existing Databricks job
run_databricks_job = create_databricks_run_now_op(
    databricks_job_id=123,
    databricks_job_configuration={
        "python_params": ["--input", "table1", "--output", "table2"]
    },
)

# Use Pipes for bidirectional communication with code running on Databricks
@op(required_resource_keys={"databricks"})
def process_with_pipes(context):
    workspace_client = context.resources.databricks.get_client().workspace_client
    client = PipesDatabricksClient(
        client=workspace_client,
        context_injector=PipesDbfsContextInjector(client=workspace_client),
        message_reader=PipesDbfsMessageReader(client=workspace_client),
    )

    return client.run(
        context=context,
        task={
            "notebook_task": {
                "notebook_path": "/path/to/notebook",
                "base_parameters": {"param1": "value1"},
            }
        },
        cluster={"existing": "cluster-id"},
    )

@job(resource_defs={"databricks": databricks_resource})
def my_databricks_job():
    run_databricks_job()
    process_with_pipes()
```
## Architecture

The dagster-databricks integration provides multiple execution patterns:

- **Direct Client API**: Low-level access to the Databricks REST API through `DatabricksClient`
- **Job Runner**: High-level job submission and monitoring via `DatabricksJobRunner`
- **Step Launcher**: Execute individual Dagster ops on Databricks clusters using `DatabricksPySparkStepLauncher`
- **Pipes Integration**: Bidirectional communication with external Databricks processes through `PipesDatabricksClient`
- **Op Factories**: Pre-built ops for common Databricks workflows using `create_databricks_run_now_op` and `create_databricks_submit_run_op`

This layered approach lets you choose the level of abstraction that fits your workflow: the client and job runner for fine-grained control, the step launcher and op factories for turnkey orchestration, and Pipes when code running on Databricks needs to report context and results back to Dagster.
## Capabilities

### Core Client API

Low-level Databricks REST API client providing authentication, job management, file operations, and run monitoring. Supports multiple authentication methods, including personal access tokens (PAT), OAuth, and Azure service principals.
```python { .api }
class DatabricksClient:
    def __init__(
        self,
        host: Optional[str] = None,
        token: Optional[str] = None,
        oauth_client_id: Optional[str] = None,
        oauth_client_secret: Optional[str] = None,
        azure_client_id: Optional[str] = None,
        azure_client_secret: Optional[str] = None,
        azure_tenant_id: Optional[str] = None,
        workspace_id: Optional[str] = None,
    ): ...

    @property
    def workspace_client(self) -> WorkspaceClient: ...

    def read_file(self, dbfs_path: str, block_size: int = 1024**2) -> bytes: ...
    def put_file(self, file_obj: IO, dbfs_path: str, overwrite: bool = False, block_size: int = 1024**2) -> None: ...
    def get_run_state(self, databricks_run_id: int) -> DatabricksRunState: ...
    def wait_for_run_to_complete(
        self,
        logger: logging.Logger,
        databricks_run_id: int,
        poll_interval_sec: float,
        max_wait_time_sec: float,
        verbose_logs: bool = True,
    ) -> None: ...
```
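A minimal sketch of direct client usage based on the signatures above; the host, token, DBFS path, and run ID are placeholders:

```python
import io
import logging

from dagster_databricks import DatabricksClient

# Placeholder credentials; in practice, read these from environment variables or a secrets manager.
client = DatabricksClient(
    host="https://your-workspace.cloud.databricks.com",
    token="your-access-token",
)

# Upload a small file to DBFS and read it back.
client.put_file(io.BytesIO(b"hello"), "dbfs:/tmp/example.txt", overwrite=True)
contents = client.read_file("dbfs:/tmp/example.txt")

# Poll an existing run (ID 42 is illustrative) until it reaches a terminal state.
client.wait_for_run_to_complete(
    logger=logging.getLogger(__name__),
    databricks_run_id=42,
    poll_interval_sec=10,
    max_wait_time_sec=3600,
)
print(client.get_run_state(42).is_successful())
```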
[Core Client API](./core-client.md)
### Job Management

High-level job submission, monitoring, and log retrieval functionality through `DatabricksJobRunner`. Handles job configuration, library installation, cluster management, and execution lifecycle.
```python { .api }
class DatabricksJobRunner:
    def __init__(
        self,
        host: Optional[str] = None,
        token: Optional[str] = None,
        oauth_client_id: Optional[str] = None,
        oauth_client_secret: Optional[str] = None,
        azure_client_id: Optional[str] = None,
        azure_client_secret: Optional[str] = None,
        azure_tenant_id: Optional[str] = None,
        poll_interval_sec: float = 5,
        max_wait_time_sec: float = 86400,
    ): ...

    @property
    def client(self) -> DatabricksClient: ...

    def submit_run(self, run_config: Mapping[str, Any], task: Mapping[str, Any]) -> int: ...
    def retrieve_logs_for_run_id(self, log: logging.Logger, databricks_run_id: int) -> Optional[tuple[Optional[str], Optional[str]]]: ...
```
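A sketch of submitting a one-off run through the job runner, based on the signatures above. The split of the payload between `run_config` (run-level settings) and `task` (the work to execute), as well as the specific keys used, are assumptions to verify against the library's config schema and your workspace:

```python
import logging

from dagster_databricks import DatabricksJobRunner

logger = logging.getLogger(__name__)

runner = DatabricksJobRunner(
    host="https://your-workspace.cloud.databricks.com",
    token="your-access-token",
    poll_interval_sec=10,
    max_wait_time_sec=3600,
)

# Assumed payload shape: run-level settings in run_config, the task itself in task.
run_id = runner.submit_run(
    run_config={
        "run_name": "example-run",
        "cluster": {"existing": "your-cluster-id"},
    },
    task={"notebook_task": {"notebook_path": "/path/to/notebook"}},
)

# Block until the run terminates, then fetch stdout/stderr from cluster logs if configured.
runner.client.wait_for_run_to_complete(
    logger=logger,
    databricks_run_id=run_id,
    poll_interval_sec=10,
    max_wait_time_sec=3600,
)
logs = runner.retrieve_logs_for_run_id(logger, run_id)
```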
[Job Management](./job-management.md)
### PySpark Step Launcher

Step launcher that executes individual Dagster ops on Databricks clusters using PySpark. Provides cluster provisioning, code packaging, dependency management, and result collection.
```python { .api }
class DatabricksPySparkStepLauncher:
    """Step launcher for running PySpark steps on Databricks clusters."""

def databricks_pyspark_step_launcher(init_context: InitResourceContext) -> DatabricksPySparkStepLauncher: ...

class DatabricksConfig:
    """Configuration schema for Databricks step launcher."""
```
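A sketch of attaching the step launcher to an op so that the op's compute runs on Databricks. The resource keys and configuration fields (`run_config`, `databricks_host`, `databricks_token`, `local_pipeline_package_path`, and the cluster spec inside `run_config`) follow the upstream Dagster example and should be treated as assumptions to check against the launcher's config schema:

```python
from dagster import graph, op
from dagster_databricks import databricks_pyspark_step_launcher
from dagster_pyspark import pyspark_resource

# Assumed configuration fields; verify names and shapes against the launcher's config schema.
step_launcher = databricks_pyspark_step_launcher.configured({
    "run_config": {
        "run_name": "example-step",
        "cluster": {
            "new": {
                "size": {"num_workers": 1},
                "spark_version": "13.3.x-scala2.12",
                "nodes": {"node_types": {"node_type_id": "i3.xlarge"}},
            }
        },
    },
    "databricks_host": {"env": "DATABRICKS_HOST"},
    "databricks_token": {"env": "DATABRICKS_TOKEN"},
    "local_pipeline_package_path": ".",
})

# Ops that declare the step launcher resource key have their steps launched on Databricks.
@op(required_resource_keys={"pyspark_step_launcher", "pyspark"})
def count_rows(context) -> int:
    df = context.resources.pyspark.spark_session.range(100)
    return df.count()

@graph
def remote_graph():
    count_rows()

remote_job = remote_graph.to_job(
    resource_defs={
        "pyspark_step_launcher": step_launcher,
        "pyspark": pyspark_resource,
    }
)
```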
[PySpark Step Launcher](./pyspark-step-launcher.md)
### Pipes Integration

Bidirectional communication system for executing external code on Databricks with full context injection and result collection. Supports both standard and serverless Databricks environments.
```python { .api }
class PipesDatabricksClient(BasePipesDatabricksClient):
    def __init__(
        self,
        client: WorkspaceClient,
        context_injector: Optional[PipesContextInjector] = None,
        message_reader: Optional[PipesMessageReader] = None,
        poll_interval_seconds: float = 5,
        forward_termination: bool = True,
    ): ...

    def run(
        self,
        *,
        context: Union[OpExecutionContext, AssetExecutionContext],
        extras: Optional[PipesExtras] = None,
        **kwargs,
    ): ...

class PipesDbfsContextInjector(PipesContextInjector):
    def __init__(self, *, client: WorkspaceClient): ...

class PipesDbfsMessageReader(PipesBlobStoreMessageReader):
    def __init__(
        self,
        *,
        interval: float = 10,
        client: WorkspaceClient,
        include_stdio_in_messages: bool = False,
        log_readers: Optional[Sequence[PipesLogReader]] = None,
    ): ...

class PipesDbfsLogReader(PipesChunkedLogReader):
    def __init__(
        self,
        *,
        interval: float = 10,
        remote_log_name: Literal["stdout", "stderr"],
        target_stream: TextIO,
        client: WorkspaceClient,
        debug_info: Optional[str] = None,
    ): ...
```
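A hedged sketch of launching a notebook task from an asset via Pipes. The `jobs.SubmitTask` payload (from `databricks-sdk`) and the trailing `get_materialize_result()` call follow the pattern in Dagster's Pipes documentation; treat the task shape, cluster ID, and notebook path as placeholders:

```python
from dagster import AssetExecutionContext, asset
from databricks.sdk.service import jobs

from dagster_databricks import PipesDatabricksClient

@asset
def databricks_notebook_asset(
    context: AssetExecutionContext, pipes_databricks: PipesDatabricksClient
):
    # Illustrative submit-run task: execute a notebook on an existing cluster.
    task = jobs.SubmitTask.from_dict({
        "task_key": "dagster-pipes-task",
        "existing_cluster_id": "your-cluster-id",
        "notebook_task": {
            "notebook_path": "/path/to/notebook",
            "base_parameters": {"param1": "value1"},
        },
    })
    return pipes_databricks.run(
        task=task,
        context=context,
        extras={"sample_rate": 1.0},
    ).get_materialize_result()

# Illustrative resource wiring; WorkspaceClient() picks up credentials from the environment:
#
#     from dagster import Definitions
#     from databricks.sdk import WorkspaceClient
#
#     defs = Definitions(
#         assets=[databricks_notebook_asset],
#         resources={"pipes_databricks": PipesDatabricksClient(client=WorkspaceClient())},
#     )
```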
[Pipes Integration](./pipes-integration.md)
### Resource Management

Configurable resources for Databricks client management with support for multiple authentication methods and automatic credential handling.
```python { .api }
class DatabricksClientResource(ConfigurableResource):
    host: Optional[str] = None
    token: Optional[str] = None
    oauth_credentials: Optional[OauthCredentials] = None
    azure_credentials: Optional[AzureServicePrincipalCredentials] = None
    workspace_id: Optional[str] = None

    def get_client(self) -> DatabricksClient: ...

def databricks_client(init_context) -> DatabricksClient: ...

class OauthCredentials:
    client_id: str
    client_secret: str

class AzureServicePrincipalCredentials:
    azure_client_id: str
    azure_client_secret: str
    azure_tenant_id: str
```
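A brief sketch of configuring the resource from environment variables and using it inside an asset; `EnvVar` comes from core Dagster, and the variable names and run ID are placeholders:

```python
from dagster import Definitions, EnvVar, asset
from dagster_databricks import DatabricksClientResource

databricks = DatabricksClientResource(
    host=EnvVar("DATABRICKS_HOST"),
    token=EnvVar("DATABRICKS_TOKEN"),
)

@asset
def databricks_run_status(databricks: DatabricksClientResource) -> str:
    # get_client() returns a DatabricksClient; run ID 42 is illustrative.
    state = databricks.get_client().get_run_state(42)
    return str(state)

defs = Definitions(
    assets=[databricks_run_status],
    resources={"databricks": databricks},
)
```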
[Resource Management](./resource-management.md)
### Op Factories

Factory functions for creating pre-configured ops that handle common Databricks workflows, including running existing jobs and submitting one-time tasks.
```python { .api }
def create_databricks_run_now_op(
    databricks_job_id: int,
    databricks_job_configuration: Optional[dict] = None,
    poll_interval_seconds: float = 10,
    max_wait_time_seconds: float = 86400,
    name: Optional[str] = None,
    databricks_resource_key: str = "databricks",
) -> OpDefinition: ...

def create_databricks_submit_run_op(
    databricks_job_configuration: dict,
    poll_interval_seconds: float = 10,
    max_wait_time_seconds: float = 86400,
    name: Optional[str] = None,
    databricks_resource_key: str = "databricks",
) -> OpDefinition: ...
```
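A sketch that wires a submit-run op into a job with the `databricks` resource it expects by default; the cluster spec, notebook path, and credentials are placeholders, and the exact `databricks_job_configuration` shape should be checked against the Databricks runs-submit API:

```python
from dagster import job
from dagster_databricks import DatabricksClientResource, create_databricks_submit_run_op

submit_notebook_run = create_databricks_submit_run_op(
    databricks_job_configuration={
        "new_cluster": {
            "spark_version": "13.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 2,
        },
        "notebook_task": {"notebook_path": "/path/to/notebook"},
    },
    name="submit_notebook_run",
)

@job(
    resource_defs={
        "databricks": DatabricksClientResource(
            host="https://your-workspace.cloud.databricks.com",
            token="your-access-token",
        )
    }
)
def nightly_databricks_job():
    submit_notebook_run()
```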
[Op Factories](./op-factories.md)
## Types

Core type definitions used throughout the Databricks integration:
```python { .api }
class DatabricksRunState(NamedTuple):
    life_cycle_state: Optional[DatabricksRunLifeCycleState]
    result_state: Optional[DatabricksRunResultState]
    state_message: Optional[str]

    def has_terminated(self) -> bool: ...
    def is_skipped(self) -> bool: ...
    def is_successful(self) -> bool: ...

    @classmethod
    def from_databricks(cls, run_state: jobs.RunState) -> DatabricksRunState: ...

class DatabricksRunResultState(str, Enum):
    CANCELED = "CANCELED"
    FAILED = "FAILED"
    SUCCESS = "SUCCESS"
    TIMEDOUT = "TIMEDOUT"

    def is_successful(self) -> bool: ...

class DatabricksRunLifeCycleState(str, Enum):
    BLOCKED = "BLOCKED"
    INTERNAL_ERROR = "INTERNAL_ERROR"
    QUEUED = "QUEUED"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SKIPPED = "SKIPPED"
    TERMINATED = "TERMINATED"
    TERMINATING = "TERMINATING"
    WAITING_FOR_RETRY = "WAITING_FOR_RETRY"

    def has_terminated(self) -> bool: ...
    def is_skipped(self) -> bool: ...

class DatabricksError(Exception):
    """Custom exception for Databricks-related errors."""
```
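A short sketch of interpreting a run state with these types, using `get_run_state` from the Core Client API; the host, token, and run ID are placeholders:

```python
from dagster_databricks import DatabricksClient, DatabricksError

client = DatabricksClient(
    host="https://your-workspace.cloud.databricks.com",
    token="your-access-token",
)

state = client.get_run_state(42)  # illustrative run ID
if state.has_terminated():
    if state.is_successful():
        print("Run succeeded")
    elif state.is_skipped():
        print("Run was skipped")
    else:
        raise DatabricksError(f"Run failed: {state.state_message}")
else:
    print(f"Run still in progress: {state.life_cycle_state}")
```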