# Op Factories

Factory functions for creating pre-configured ops that handle common Databricks workflows: running existing jobs and submitting one-time runs. These factories provide standardized patterns for integrating Databricks job execution into Dagster pipelines.

## Capabilities

### Run Existing Job Op Factory

Factory function that creates an op for running an existing Databricks job by job ID.

```python { .api }
def create_databricks_run_now_op(
    databricks_job_id: int,
    databricks_job_configuration: Optional[dict] = None,
    poll_interval_seconds: float = 10,
    max_wait_time_seconds: float = 86400,
    name: Optional[str] = None,
    databricks_resource_key: str = "databricks",
) -> OpDefinition:
    """
    Creates an op that launches an existing Databricks job.

    Parameters:
    - databricks_job_id: The ID of the Databricks job to be executed
    - databricks_job_configuration: Configuration for triggering a new job run (job parameters, etc.)
    - poll_interval_seconds: How often to poll the Databricks API to check job status
    - max_wait_time_seconds: How long to wait for the job to finish before raising an error
    - name: The name of the op (defaults to _databricks_run_now_op)
    - databricks_resource_key: The name of the resource key used by this op

    Returns:
    OpDefinition: An op definition to run the Databricks job
    """
```
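
A minimal sketch of using this factory, assuming the default polling settings and the default `databricks` resource key; the job ID and workspace host below are placeholders:

```python
from dagster import EnvVar, job
from dagster_databricks import DatabricksClientResource, create_databricks_run_now_op

# Only the Databricks job ID is required; polling and naming defaults apply.
run_nightly_job = create_databricks_run_now_op(databricks_job_id=42, name="run_nightly_job")

@job(
    resource_defs={
        "databricks": DatabricksClientResource(
            host="https://your-workspace.cloud.databricks.com",  # placeholder workspace URL
            token=EnvVar("DATABRICKS_TOKEN"),
        )
    }
)
def nightly_job():
    run_nightly_job()
```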

### Submit New Job Op Factory

Factory function that creates an op for submitting one-time Databricks job runs with full task configuration.

```python { .api }
def create_databricks_submit_run_op(
    databricks_job_configuration: dict,
    poll_interval_seconds: float = 10,
    max_wait_time_seconds: float = 86400,
    name: Optional[str] = None,
    databricks_resource_key: str = "databricks",
) -> OpDefinition:
    """
    Creates an op that submits a one-time run of a set of tasks on Databricks.

    Parameters:
    - databricks_job_configuration: Configuration for submitting a one-time run (cluster, task, etc.)
    - poll_interval_seconds: How often to poll the Databricks API to check job status
    - max_wait_time_seconds: How long to wait for the job to finish before raising an error
    - name: The name of the op (defaults to _databricks_submit_run_op)
    - databricks_resource_key: The name of the resource key used by this op

    Returns:
    OpDefinition: An op definition to submit a one-time run on Databricks
    """
```
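
As a minimal sketch of the submit variant: a one-time run configured as a single notebook task on an existing cluster (the cluster ID, notebook path, and op name below are placeholders):

```python
from dagster_databricks import create_databricks_submit_run_op

# Minimal one-time run: one notebook task on an existing cluster (placeholder values).
submit_notebook_run = create_databricks_submit_run_op(
    databricks_job_configuration={
        "run_name": "adhoc-notebook-run",
        "existing_cluster_id": "your-cluster-id",
        "notebook_task": {"notebook_path": "/Workspace/Shared/AdhocNotebook"},
    },
    name="submit_notebook_run",
)
```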

## Configuration Options

Both op factories support runtime configuration through their generated ops:

### Polling Configuration

```python { .api }
class DatabricksRunNowOpConfig:
    """Runtime configuration for run_now ops."""
    poll_interval_seconds: float = 10
    max_wait_time_seconds: float = 86400

class DatabricksSubmitRunOpConfig:
    """Runtime configuration for submit_run ops."""
    poll_interval_seconds: float = 10
    max_wait_time_seconds: float = 86400
```
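
If these config classes are importable from `dagster_databricks` in your installed version (an assumption worth verifying), the same values can be supplied at launch time through `RunConfig` instead of a raw config dict; a sketch, using a hypothetical op name:

```python
from dagster import RunConfig
from dagster_databricks import DatabricksRunNowOpConfig  # assumed export; verify against your version

# Override polling behavior for a hypothetical op named "my_databricks_op" at launch time.
run_config = RunConfig(
    ops={
        "my_databricks_op": DatabricksRunNowOpConfig(
            poll_interval_seconds=30,
            max_wait_time_seconds=7200,
        )
    }
)
```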

## Usage Examples

### Running Existing Databricks Job

```python
from dagster import EnvVar, job
from dagster_databricks import create_databricks_run_now_op, DatabricksClientResource

# Define the resource
databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",
    token=EnvVar("DATABRICKS_TOKEN"),
)

# Create op for an existing job
run_etl_job = create_databricks_run_now_op(
    databricks_job_id=12345,
    databricks_job_configuration={
        "python_params": [
            "--input", "raw_data_table",
            "--output", "processed_data_table",
            "--date", "2024-01-15"
        ],
        "jar_params": [],
        "notebook_params": {
            "environment": "production"
        }
    },
    poll_interval_seconds=30,
    max_wait_time_seconds=7200,  # 2 hours
    name="run_daily_etl"
)

@job(resource_defs={"databricks": databricks_resource})
def daily_etl_pipeline():
    run_etl_job()
```

### Submitting One-Time Job

```python
from dagster import job
from dagster_databricks import create_databricks_submit_run_op

# Create op for one-time job submission
submit_ml_training = create_databricks_submit_run_op(
    databricks_job_configuration={
        "run_name": "ML Model Training",
        "new_cluster": {
            "spark_version": "11.3.x-cpu-ml-scala2.12",
            "node_type_id": "m5d.xlarge",
            "num_workers": 4,
            "custom_tags": {
                "project": "ml-pipeline",
                "environment": "production"
            }
        },
        "libraries": [
            {"pypi": {"package": "scikit-learn==1.1.0"}},
            {"pypi": {"package": "mlflow>=2.0.0"}},
            {"pypi": {"package": "pandas>=1.5.0"}}
        ],
        "spark_python_task": {
            "python_file": "s3://ml-scripts/train_model.py",
            "parameters": [
                "--model-type", "random-forest",
                "--data-path", "s3://data-bucket/training-data/",
                "--output-path", "s3://model-bucket/models/",
                "--max-depth", "10",
                "--n-estimators", "100"
            ]
        },
        "timeout_seconds": 14400,  # 4 hours (Databricks-side run timeout)
        "email_notifications": {
            "on_success": ["ml-team@company.com"],
            "on_failure": ["ml-team@company.com", "oncall@company.com"]
        }
    },
    poll_interval_seconds=60,
    max_wait_time_seconds=18000,  # 5 hours (how long the op polls before raising an error)
    name="train_ml_model"
)

# Reuses the databricks_resource defined in the previous example
@job(resource_defs={"databricks": databricks_resource})
def ml_training_pipeline():
    submit_ml_training()
```
167
168
### Notebook-Based Workflow
169
170
```python
171
# Create op for notebook execution
172
run_analysis_notebook = create_databricks_submit_run_op(
173
databricks_job_configuration={
174
"run_name": "Daily Analysis Report",
175
"existing_cluster_id": "analysis-cluster-id",
176
"notebook_task": {
177
"notebook_path": "/Workspace/Users/analyst@company.com/DailyAnalysis",
178
"base_parameters": {
179
"report_date": "{{ ds }}", # Can use templating
180
"output_format": "html",
181
"include_charts": "true"
182
}
183
},
184
"libraries": [
185
{"pypi": {"package": "plotly>=5.0.0"}},
186
{"pypi": {"package": "seaborn>=0.11.0"}}
187
]
188
},
189
name="daily_analysis"
190
)
191
192
@job(resource_defs={"databricks": databricks_resource})
193
def reporting_pipeline():
194
run_analysis_notebook()
195
```

### JAR-Based Job

```python
# Create op for Scala/Java JAR execution
run_spark_jar = create_databricks_submit_run_op(
    databricks_job_configuration={
        "run_name": "Spark JAR Processing",
        "new_cluster": {
            "spark_version": "11.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 8
        },
        "spark_jar_task": {
            "main_class_name": "com.company.DataProcessor",
            "parameters": [
                "--input-path", "s3://input-bucket/data/",
                "--output-path", "s3://output-bucket/processed/",
                "--partition-date", "2024-01-15"
            ]
        },
        "libraries": [
            {"jar": "s3://jars-bucket/data-processor-1.0.jar"},
            {"maven": {"coordinates": "org.apache.spark:spark-sql_2.12:3.3.0"}}
        ]
    },
    name="spark_jar_processor"
)
```
225
226
### Multi-Op Pipeline
227
228
```python
229
from dagster import job, op
230
231
# Create multiple ops for different stages
232
extract_data_op = create_databricks_run_now_op(
233
databricks_job_id=100, # Existing extraction job
234
name="extract_data"
235
)
236
237
transform_data_op = create_databricks_submit_run_op(
238
databricks_job_configuration={
239
"existing_cluster_id": "transform-cluster",
240
"notebook_task": {
241
"notebook_path": "/ETL/Transform",
242
"base_parameters": {"stage": "transform"}
243
}
244
},
245
name="transform_data"
246
)
247
248
load_data_op = create_databricks_run_now_op(
249
databricks_job_id=101, # Existing loading job
250
name="load_data"
251
)
252
253
@job(resource_defs={"databricks": databricks_resource})
254
def etl_pipeline():
255
# Chain the operations
256
extract_result = extract_data_op()
257
transform_result = transform_data_op(extract_result)
258
load_data_op(transform_result)
259
```
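
The chaining above passes only execution order between steps, never data: the generated ops are expected to expose a `Nothing`-typed `start_after` input (an assumption about the installed dagster-databricks version). With that assumption, the same dependency can be wired explicitly by keyword, reusing the ops and resource defined above:

```python
@job(resource_defs={"databricks": databricks_resource})
def etl_pipeline_explicit():
    # Ordering only: the Nothing-typed start_after input carries no data between runs.
    transformed = transform_data_op(start_after=extract_data_op())
    load_data_op(start_after=transformed)
```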

### Runtime Configuration

```python
from dagster import job, RunConfig

# Op with runtime configuration
flexible_job_op = create_databricks_run_now_op(
    databricks_job_id=200,
    name="flexible_databricks_job"
)

# Job that accepts runtime config
@job(resource_defs={"databricks": databricks_resource})
def configurable_job():
    flexible_job_op()

# Execute with custom polling settings
if __name__ == "__main__":
    run_config = RunConfig(
        ops={
            "flexible_databricks_job": {
                "config": {
                    "poll_interval_seconds": 5,  # Poll every 5 seconds
                    "max_wait_time_seconds": 1800  # 30 minute timeout
                }
            }
        }
    )

    result = configurable_job.execute_in_process(run_config=run_config)
```

### Error Handling and Monitoring

```python
from dagster import job, op, In, Out, Nothing, OpExecutionContext

# Custom op that wraps the factory output with additional logic
def create_monitored_databricks_op(job_id: int, name: str):
    base_op = create_databricks_run_now_op(
        databricks_job_id=job_id,
        name=name
    )

    @op(
        name=f"monitored_{name}",
        ins={"start_after": In(Nothing)},
        out=Out(),
        required_resource_keys={"databricks"},  # so the wrapped op can reach the Databricks resource
    )
    def monitored_op(context: OpExecutionContext):
        try:
            # Log start
            context.log.info(f"Starting Databricks job {job_id}")

            # Execute the base op
            result = base_op(context)

            # Additional monitoring/logging
            context.log.info(f"Databricks job {job_id} completed successfully")

            return result

        except Exception as e:
            context.log.error(f"Databricks job {job_id} failed: {str(e)}")
            # Could add alerting, retry logic, etc.
            raise

    return monitored_op

# Use the enhanced op
enhanced_op = create_monitored_databricks_op(300, "critical_job")

@job(resource_defs={"databricks": databricks_resource})
def monitored_pipeline():
    enhanced_op()
```