# Job Management

The DatabricksJobRunner provides high-level job submission, monitoring, and log retrieval. It handles the complete lifecycle of a Databricks job, including configuration, library installation, cluster management, and execution monitoring.

## Capabilities

### DatabricksJobRunner

High-level interface for submitting and managing Databricks jobs with comprehensive configuration options and automatic monitoring.

```python { .api }
class DatabricksJobRunner:
    """Submits jobs created using Dagster config to Databricks, and monitors their progress."""

    def __init__(
        self,
        host: Optional[str] = None,
        token: Optional[str] = None,
        oauth_client_id: Optional[str] = None,
        oauth_client_secret: Optional[str] = None,
        azure_client_id: Optional[str] = None,
        azure_client_secret: Optional[str] = None,
        azure_tenant_id: Optional[str] = None,
        poll_interval_sec: float = 5,
        max_wait_time_sec: float = 86400,
    ):
        """
        Initialize the Databricks job runner.

        Parameters:
        - host: Databricks workspace URL
        - token: Personal access token for authentication
        - oauth_client_id: OAuth client ID for service principal authentication
        - oauth_client_secret: OAuth client secret for service principal authentication
        - azure_client_id: Azure service principal client ID
        - azure_client_secret: Azure service principal client secret
        - azure_tenant_id: Azure tenant ID
        - poll_interval_sec: How often, in seconds, to poll Databricks for run status
        - max_wait_time_sec: How long to wait for a run to complete before failing (default 24 hours)
        """
```
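
Token-based authentication is shown in the usage examples below; the OAuth and Azure parameters support service-principal authentication instead. A minimal instantiation sketch, assuming an OAuth service principal; the workspace URL and credential values are placeholders:

```python
from dagster_databricks import DatabricksJobRunner

# Placeholder workspace URL and service-principal credentials.
runner = DatabricksJobRunner(
    host="https://your-workspace.cloud.databricks.com",
    oauth_client_id="your-service-principal-client-id",
    oauth_client_secret="your-service-principal-client-secret",
    poll_interval_sec=10,     # check run status every 10 seconds
    max_wait_time_sec=7200,   # fail the run if it takes longer than 2 hours
)
```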

### Client Access

Access to the underlying DatabricksClient for advanced operations.

```python { .api }
@property
def client(self) -> DatabricksClient:
    """Return the underlying DatabricksClient object."""
```
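
For operations the runner does not expose directly, the wrapped client can be reused. A minimal sketch; the `workspace_client` attribute and the `jobs.get_run` call are assumptions about the underlying Databricks SDK surface, not part of the runner's documented API:

```python
# Reuse the runner's authenticated connection for lower-level calls.
databricks_client = runner.client

# Assumption: DatabricksClient wraps the Databricks SDK WorkspaceClient,
# which exposes the full Jobs API (e.g. fetching run metadata).
workspace_client = databricks_client.workspace_client
run = workspace_client.jobs.get_run(run_id)  # run_id returned by submit_run
print(run.state)
```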

### Job Submission

Submit new Databricks runs with comprehensive configuration options including cluster management, library installation, and task specification.

```python { .api }
def submit_run(self, run_config: Mapping[str, Any], task: Mapping[str, Any]) -> int:
    """
    Submit a new run using the 'Runs submit' API.

    Parameters:
    - run_config: Configuration for the run including cluster, libraries, and settings
    - task: Task specification (notebook_task, spark_python_task, etc.)

    Returns:
    int: The run ID of the submitted job

    Raises:
    DatabricksError: If job submission fails
    """
```

### Log Retrieval

Retrieve execution logs from completed Databricks runs for debugging and monitoring.

```python { .api }
def retrieve_logs_for_run_id(
    self,
    log: logging.Logger,
    databricks_run_id: int,
) -> Optional[tuple[Optional[str], Optional[str]]]:
    """
    Retrieve the stdout and stderr logs for a run.

    Parameters:
    - log: Logger for status messages
    - databricks_run_id: ID of the completed run

    Returns:
    Optional[tuple[Optional[str], Optional[str]]]: (stdout, stderr) logs, or None if not available
    """

def wait_for_dbfs_logs(
    self,
    log: logging.Logger,
    prefix: str,
    cluster_id: str,
    filename: str,
    waiter_delay: int = 10,
    waiter_max_attempts: int = 10,
) -> Optional[str]:
    """
    Attempt to get logs from DBFS with retry logic.

    Parameters:
    - log: Logger for status messages
    - prefix: DBFS prefix path for logs
    - cluster_id: Databricks cluster ID
    - filename: Log filename (stdout/stderr)
    - waiter_delay: Delay between retry attempts in seconds
    - waiter_max_attempts: Maximum number of retry attempts

    Returns:
    Optional[str]: Log content, or None if retrieval fails
    """
```
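
When cluster log delivery writes to DBFS, `wait_for_dbfs_logs` can poll for a single log file. A minimal sketch; the DBFS prefix and cluster ID are placeholders and are assumed to match the cluster's log delivery configuration:

```python
import logging

logger = logging.getLogger(__name__)

# Placeholders: the prefix and cluster ID depend on how cluster log
# delivery was configured for the run.
stdout_log = runner.wait_for_dbfs_logs(
    logger,
    prefix="dbfs:/cluster-logs",    # hypothetical log delivery prefix
    cluster_id="existing-cluster-id",
    filename="stdout",
    waiter_delay=15,                # seconds between attempts
    waiter_max_attempts=20,
)
if stdout_log is not None:
    print(stdout_log)
else:
    logger.warning("stdout log not yet available on DBFS")
```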

## Run Configuration Structure

The `run_config` parameter for `submit_run` supports comprehensive job configuration:

### Cluster Configuration

```python
# Using an existing cluster
run_config = {
    "cluster": {
        "existing": "cluster-id-here"
    }
}

# Using a new cluster
run_config = {
    "cluster": {
        "new": {
            "nodes": {
                "node_types": {
                    "node_type_id": "i3.xlarge",
                    "driver_node_type_id": "i3.xlarge"  # optional
                }
            },
            "size": {
                "num_workers": 2
                # OR autoscaling:
                # "autoscale": {"min_workers": 1, "max_workers": 5}
            },
            "spark_version": "11.3.x-scala2.12",
            "custom_tags": {"project": "my-project"}
        }
    }
}
```

### Library Configuration

```python
run_config = {
    "libraries": [
        {"pypi": {"package": "pandas==1.5.0"}},
        {"pypi": {"package": "numpy>=1.20.0"}},
        {"maven": {"coordinates": "org.apache.spark:spark-sql_2.12:3.3.0"}},
        {"jar": "s3://my-bucket/my-jar.jar"}
    ],
    "install_default_libraries": True  # Automatically install Dagster dependencies
}
```

### Task Configuration

```python
# Notebook task
task = {
    "notebook_task": {
        "notebook_path": "/Users/user@example.com/MyNotebook",
        "base_parameters": {"param1": "value1", "param2": "value2"}
    }
}

# Python task
task = {
    "spark_python_task": {
        "python_file": "s3://my-bucket/my-script.py",
        "parameters": ["--input", "table1", "--output", "table2"]
    }
}

# JAR task
task = {
    "spark_jar_task": {
        "main_class_name": "com.example.MyMainClass",
        "parameters": ["arg1", "arg2"]
    }
}
```

## Usage Examples

### Basic Job Submission

```python
from dagster_databricks import DatabricksJobRunner

runner = DatabricksJobRunner(
    host="https://your-workspace.cloud.databricks.com",
    token="your-access-token",
    poll_interval_sec=10,
    max_wait_time_sec=3600
)

run_config = {
    "run_name": "My Dagster Job",
    "cluster": {"existing": "existing-cluster-id"},
    "libraries": [
        {"pypi": {"package": "pandas==1.5.0"}}
    ]
}

task = {
    "notebook_task": {
        "notebook_path": "/Users/user@example.com/DataProcessing",
        "base_parameters": {
            "input_table": "raw_data",
            "output_table": "processed_data"
        }
    }
}

# Submit and get the run ID
run_id = runner.submit_run(run_config, task)
print(f"Submitted job with run ID: {run_id}")
```

### Advanced Configuration with New Cluster

```python
run_config = {
    "run_name": "Advanced Processing Job",
    "cluster": {
        "new": {
            "nodes": {
                "node_types": {
                    "node_type_id": "i3.xlarge",
                    "driver_node_type_id": "i3.2xlarge"
                }
            },
            "size": {
                "autoscale": {"min_workers": 1, "max_workers": 10}
            },
            "spark_version": "11.3.x-scala2.12",
            "custom_tags": {
                "project": "data-pipeline",
                "environment": "production"
            }
        }
    },
    "libraries": [
        {"pypi": {"package": "scikit-learn==1.1.0"}},
        {"pypi": {"package": "boto3==1.24.0"}}
    ],
    "timeout_seconds": 7200,  # 2-hour timeout
    "email_notifications": {
        "on_start": ["admin@company.com"],
        "on_success": ["admin@company.com"],
        "on_failure": ["admin@company.com", "oncall@company.com"]
    }
}

task = {
    "spark_python_task": {
        "python_file": "s3://my-bucket/ml-pipeline.py",
        "parameters": [
            "--model", "random-forest",
            "--data-path", "s3://data-bucket/training-data/",
            "--output-path", "s3://model-bucket/models/"
        ]
    }
}

run_id = runner.submit_run(run_config, task)
```

### Log Retrieval

```python
import logging

logger = logging.getLogger(__name__)

# Once the run has completed, retrieve its logs
logs = runner.retrieve_logs_for_run_id(logger, run_id)
if logs:
    stdout, stderr = logs
    if stdout:
        print("STDOUT:", stdout)
    if stderr:
        print("STDERR:", stderr)
else:
    print("Logs not available")
```