# Job Management

The Databricks provider offers job management operators for executing workloads on Databricks clusters: submitting one-time runs, triggering existing jobs, and executing notebooks, each with comprehensive parameter support.

## Core Operators

### DatabricksSubmitRunOperator

Submit one-time runs to Databricks with flexible task configurations and cluster management.

```python { .api }
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

class DatabricksSubmitRunOperator(BaseOperator):
    def __init__(
        self,
        *,
        tasks: list[dict[str, Any]] | None = None,
        spark_jar_task: dict[str, Any] | None = None,
        notebook_task: dict[str, Any] | None = None,
        spark_python_task: dict[str, Any] | None = None,
        spark_submit_task: dict[str, Any] | None = None,
        pipeline_task: dict[str, Any] | None = None,
        python_wheel_task: dict[str, Any] | None = None,
        dbt_task: dict[str, Any] | None = None,
        sql_task: dict[str, Any] | None = None,
        new_cluster: dict[str, Any] | None = None,
        existing_cluster_id: str | None = None,
        job_clusters: list[dict[str, Any]] | None = None,
        libraries: list[dict[str, Any]] | None = None,
        run_name: str | None = None,
        timeout_seconds: int | None = None,
        databricks_conn_id: str = "databricks_default",
        polling_period_seconds: int = 30,
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        databricks_retry_args: dict[str, Any] | None = None,
        do_xcom_push: bool = True,
        idempotency_token: str | None = None,
        access_control_list: list[dict[str, Any]] | None = None,
        wait_for_termination: bool = True,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        git_source: dict[str, Any] | None = None,
        **kwargs
    ) -> None:
        """
        Submit a one-time run to Databricks.

        Args:
            tasks: List of tasks to execute in the run
            spark_jar_task: Configuration for Spark JAR task
            notebook_task: Configuration for notebook task
            spark_python_task: Configuration for Spark Python task
            spark_submit_task: Configuration for Spark submit task
            pipeline_task: Configuration for Delta Live Tables pipeline task
            python_wheel_task: Configuration for Python wheel task
            dbt_task: Configuration for dbt task
            sql_task: Configuration for SQL task
            new_cluster: New cluster configuration for the run
            existing_cluster_id: ID of existing cluster to use
            job_clusters: Job cluster configurations
            libraries: Libraries to install on the cluster
            run_name: Name for the run (defaults to the Airflow task_id)
            timeout_seconds: Maximum time to wait for run completion
            databricks_conn_id: Airflow connection ID for Databricks
            polling_period_seconds: Seconds between status polls
            databricks_retry_limit: Number of retries for API calls
            databricks_retry_delay: Seconds between retries
            databricks_retry_args: Additional retry configuration
            do_xcom_push: Whether to push run metadata to XCom
            idempotency_token: Token to ensure idempotent execution
            access_control_list: Access control permissions for the run
            wait_for_termination: Whether to wait for run completion
            deferrable: Whether to use deferrable execution
            git_source: Git source configuration for code
        """
```

### DatabricksRunNowOperator

Trigger existing Databricks jobs with parameter overrides and monitoring.

```python { .api }
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

class DatabricksRunNowOperator(BaseOperator):
    def __init__(
        self,
        *,
        job_id: int | None = None,
        job_name: str | None = None,
        notebook_params: dict[str, str] | None = None,
        python_params: list[str] | None = None,
        spark_submit_params: list[str] | None = None,
        jar_params: list[str] | None = None,
        sql_params: dict[str, str] | None = None,
        dbt_commands: list[str] | None = None,
        python_named_params: dict[str, str] | None = None,
        pipeline_params: dict[str, str] | None = None,
        wait_for_termination: bool = True,
        timeout_seconds: int | None = None,
        databricks_conn_id: str = "databricks_default",
        polling_period_seconds: int = 30,
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        do_xcom_push: bool = True,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        **kwargs
    ) -> None:
        """
        Trigger an existing Databricks job.

        Args:
            job_id: Databricks job ID to trigger
            job_name: Databricks job name to trigger (alternative to job_id)
            notebook_params: Parameters for notebook tasks
            python_params: Parameters for Python tasks
            spark_submit_params: Parameters for Spark submit tasks
            jar_params: Parameters for JAR tasks
            sql_params: Parameters for SQL tasks
            dbt_commands: Commands for dbt tasks
            python_named_params: Named parameters for Python tasks
            pipeline_params: Parameters for pipeline tasks
            wait_for_termination: Whether to wait for job completion
            timeout_seconds: Maximum time to wait for job completion
            databricks_conn_id: Airflow connection ID for Databricks
            polling_period_seconds: Seconds between status polls
            databricks_retry_limit: Number of retries for API calls
            databricks_retry_delay: Seconds between retries
            do_xcom_push: Whether to push run metadata to XCom
            deferrable: Whether to use deferrable execution
        """
```

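Either `job_id` or `job_name` can identify the job; the provider resolves a name to an ID, and supplying both raises an error. A minimal sketch of triggering by name, assuming a hypothetical job called `nightly-refresh` exists in the workspace:

```python
# Sketch: trigger by job name instead of job ID.
# The job name and parameters here are placeholders.
trigger_by_name = DatabricksRunNowOperator(
    task_id='trigger_nightly_refresh',
    job_name='nightly-refresh',  # resolved to a job_id via the Jobs API
    notebook_params={'run_date': '{{ ds }}'},
    databricks_conn_id='databricks_default'
)
```
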
### DatabricksNotebookOperator

Execute Databricks notebooks with parameter support and workspace or Git sources.

```python { .api }
from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator

class DatabricksNotebookOperator(BaseOperator):
    def __init__(
        self,
        *,
        notebook_path: str,
        source: str = "WORKSPACE",
        base_parameters: dict[str, str] | None = None,
        new_cluster: dict[str, Any] | None = None,
        existing_cluster_id: str | None = None,
        job_cluster_key: str | None = None,
        libraries: list[dict[str, Any]] | None = None,
        run_name: str | None = None,
        timeout_seconds: int | None = None,
        databricks_conn_id: str = "databricks_default",
        polling_period_seconds: int = 30,
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        do_xcom_push: bool = True,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        wait_for_termination: bool = True,
        **kwargs
    ) -> None:
        """
        Execute a Databricks notebook.

        Args:
            notebook_path: Path to the notebook in the Databricks workspace or repo
            source: Source type - "WORKSPACE" or "GIT"
            base_parameters: Parameters to pass to the notebook
            new_cluster: New cluster configuration for notebook execution
            existing_cluster_id: ID of existing cluster to use
            job_cluster_key: Key of job cluster to use (for workflow contexts)
            libraries: Libraries to install on the cluster
            run_name: Name for the notebook run
            timeout_seconds: Maximum time to wait for notebook completion
            databricks_conn_id: Airflow connection ID for Databricks
            polling_period_seconds: Seconds between status polls
            databricks_retry_limit: Number of retries for API calls
            databricks_retry_delay: Seconds between retries
            do_xcom_push: Whether to push run metadata to XCom
            deferrable: Whether to use deferrable execution
            wait_for_termination: Whether to wait for notebook completion
        """
```

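When no long-lived cluster is available, a notebook can run on a cluster provisioned just for that run. A minimal sketch, with a hypothetical notebook path and placeholder cluster settings:

```python
# Sketch: run a workspace notebook on a new job cluster.
ad_hoc_notebook = DatabricksNotebookOperator(
    task_id='ad_hoc_notebook',
    notebook_path='/Shared/etl/clean_events',  # hypothetical path
    source='WORKSPACE',
    base_parameters={'run_date': '{{ ds }}'},
    new_cluster={
        'spark_version': '12.2.x-scala2.12',
        'node_type_id': 'i3.large',
        'num_workers': 2
    }
)
```
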
## Usage Examples

### Basic Spark Job Submission

Execute a Python script on a new cluster:

```python
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

spark_job = DatabricksSubmitRunOperator(
    task_id='run_data_processing',
    spark_python_task={
        'python_file': 'dbfs:/mnt/scripts/process_data.py',
        'parameters': [
            '--input-path', '/data/raw/{{ ds }}',
            '--output-path', '/data/processed/{{ ds }}',
            '--partition-count', '10'
        ]
    },
    new_cluster={
        'spark_version': '12.2.x-scala2.12',
        'node_type_id': 'i3.xlarge',
        'num_workers': 5,
        'spark_conf': {
            'spark.sql.adaptive.enabled': 'true',
            'spark.sql.adaptive.coalescePartitions.enabled': 'true'
        }
    },
    libraries=[
        {'pypi': {'package': 'pandas==1.5.0'}},
        {'pypi': {'package': 'numpy>=1.21.0'}}
    ],
    timeout_seconds=7200,
    databricks_conn_id='databricks_production'
)
```

### JAR Task with Custom Configuration

Run a Scala JAR with specific parameters:

```python
jar_job = DatabricksSubmitRunOperator(
    task_id='run_scala_etl',
    spark_jar_task={
        'main_class_name': 'com.company.etl.DataProcessor',
        'parameters': [
            '--config', 'production.conf',
            '--date', '{{ ds }}',
            '--batch-size', '1000'
        ]
    },
    libraries=[
        {'jar': 'dbfs:/mnt/jars/data-processor-1.2.3.jar'},
        {'maven': {'coordinates': 'org.apache.kafka:kafka-clients:3.0.0'}}
    ],
    existing_cluster_id='0123-456789-etl001',
    run_name='ETL Process {{ ds }}',
    idempotency_token='etl_{{ ds }}_{{ dag_run.run_id }}'
)
```

247
248
### Notebook Execution with Parameters
249
250
Execute a notebook with dynamic parameters:
251
252
```python
253
notebook_task = DatabricksNotebookOperator(
254
task_id='run_analysis_notebook',
255
notebook_path='/Shared/Analytics/Daily Report',
256
base_parameters={
257
'report_date': '{{ ds }}',
258
'customer_segment': 'premium',
259
'output_format': 'parquet',
260
'include_charts': 'true'
261
},
262
existing_cluster_id='0123-456789-analytics',
263
libraries=[
264
{'pypi': {'package': 'matplotlib'}},
265
{'pypi': {'package': 'seaborn'}}
266
],
267
timeout_seconds=1800
268
)
269
```
270
271
### Triggering Existing Jobs
272
273
Trigger a pre-configured Databricks job:
274
275
```python
276
trigger_job = DatabricksRunNowOperator(
277
task_id='trigger_daily_pipeline',
278
job_id=123456,
279
notebook_params={
280
'input_date': '{{ ds }}',
281
'refresh_mode': 'incremental'
282
},
283
python_params=['--verbose', '--config=prod'],
284
wait_for_termination=True,
285
timeout_seconds=3600
286
)
287
```
288
289
### SQL Task Execution
290
291
Submit a SQL task as part of a job run:
292
293
```python
294
sql_job = DatabricksSubmitRunOperator(
295
task_id='run_sql_aggregation',
296
sql_task={
297
'query': {
298
'query_id': 'abc123-def456-789' # Reference to saved query
299
},
300
'warehouse_id': 'warehouse-xyz789',
301
'parameters': {
302
'start_date': '{{ ds }}',
303
'end_date': '{{ next_ds }}'
304
}
305
},
306
run_name='SQL Aggregation {{ ds }}',
307
timeout_seconds=1200
308
)
309
```
310
311
### Deferrable Execution
312
313
Use deferrable mode for long-running jobs:
314
315
```python
316
long_running_job = DatabricksSubmitRunOperator(
317
task_id='long_ml_training',
318
spark_python_task={
319
'python_file': 'dbfs:/mnt/ml/train_model.py',
320
'parameters': ['--epochs', '100', '--data-path', '/data/training']
321
},
322
new_cluster={
323
'spark_version': '12.2.x-cpu-ml-scala2.12',
324
'node_type_id': 'i3.2xlarge',
325
'num_workers': 8
326
},
327
timeout_seconds=14400, # 4 hours
328
deferrable=True, # Use async execution
329
polling_period_seconds=60 # Check every minute
330
)
331
```
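The same pattern applies to `DatabricksRunNowOperator`, which accepts the same `deferrable` flag; a brief sketch with a hypothetical job ID:

```python
# Sketch: defer while an existing job runs.
deferred_trigger = DatabricksRunNowOperator(
    task_id='trigger_long_pipeline',
    job_id=987654,
    deferrable=True,  # frees the worker slot while the run is polled asynchronously
    polling_period_seconds=120,
    timeout_seconds=21600  # 6 hours
)
```
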
## Advanced Features

### Job Clusters

Define reusable cluster configurations within job submissions:

```python
job_with_clusters = DatabricksSubmitRunOperator(
    task_id='multi_cluster_job',
    tasks=[
        {
            'task_key': 'extract',
            'job_cluster_key': 'extract_cluster',
            'spark_python_task': {
                'python_file': 'dbfs:/scripts/extract.py'
            }
        },
        {
            'task_key': 'transform',
            'job_cluster_key': 'transform_cluster',
            'depends_on': [{'task_key': 'extract'}],
            'spark_python_task': {
                'python_file': 'dbfs:/scripts/transform.py'
            }
        }
    ],
    job_clusters=[
        {
            'job_cluster_key': 'extract_cluster',
            'new_cluster': {
                'spark_version': '12.2.x-scala2.12',
                'node_type_id': 'i3.large',
                'num_workers': 2
            }
        },
        {
            'job_cluster_key': 'transform_cluster',
            'new_cluster': {
                'spark_version': '12.2.x-scala2.12',
                'node_type_id': 'i3.xlarge',
                'num_workers': 8
            }
        }
    ]
)
```

379
380
### Access Control
381
382
Set permissions for job runs:
383
384
```python
385
secured_job = DatabricksSubmitRunOperator(
386
task_id='secured_data_job',
387
notebook_task={
388
'notebook_path': '/Secure/Financial Analysis'
389
},
390
existing_cluster_id='secure-cluster-001',
391
access_control_list=[
392
{
393
'user_name': 'analyst@company.com',
394
'permission_level': 'CAN_VIEW'
395
},
396
{
397
'group_name': 'data-engineers',
398
'permission_level': 'CAN_MANAGE_RUN'
399
}
400
]
401
)
402
```
403
404
### Git Source Integration
405
406
Execute code directly from Git repositories:
407
408
```python
409
git_job = DatabricksSubmitRunOperator(
410
task_id='run_from_git',
411
notebook_task={
412
'notebook_path': 'notebooks/data_pipeline.py',
413
'source': 'GIT'
414
},
415
git_source={
416
'git_url': 'https://github.com/company/data-pipelines.git',
417
'git_branch': 'main',
418
'git_provider': 'gitHub'
419
},
420
existing_cluster_id='dev-cluster-001'
421
)
422
```
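For reproducible runs, the `git_source` block can reference a tag or commit instead of a branch (the Jobs API accepts `git_tag` or `git_commit` in place of `git_branch`). A sketch with a hypothetical repository and tag:

```python
# Sketch: pin the run to a release tag instead of a branch.
release_job = DatabricksSubmitRunOperator(
    task_id='run_release_pipeline',
    notebook_task={
        'notebook_path': 'notebooks/data_pipeline.py',
        'source': 'GIT'
    },
    git_source={
        'git_url': 'https://github.com/company/data-pipelines.git',
        'git_provider': 'gitHub',
        'git_tag': 'v1.4.0'  # or 'git_commit': '<sha>' for an exact commit
    },
    existing_cluster_id='dev-cluster-001'
)
```
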
## Error Handling and Monitoring

### XCom Integration

Job runs push metadata to XCom automatically for use by downstream tasks:

```python
# The run pushes run_id and run_page_url to XCom
job_run = DatabricksSubmitRunOperator(
    task_id='data_processing',
    spark_python_task={'python_file': 'dbfs:/scripts/process.py'},
    existing_cluster_id='cluster-001',
    do_xcom_push=True  # Default is True
)

# A downstream task can access the run information
def get_job_results(**context):
    run_id = context['ti'].xcom_pull(task_ids='data_processing', key='run_id')
    run_url = context['ti'].xcom_pull(task_ids='data_processing', key='run_page_url')
    print(f"Job {run_id} completed. View at: {run_url}")
```

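To wire the consumer into the DAG, wrap it in a standard `PythonOperator` and order it after the Databricks task; a brief sketch:

```python
from airflow.operators.python import PythonOperator

# Sketch: run the XCom consumer after the Databricks task completes.
report_results = PythonOperator(
    task_id='report_results',
    python_callable=get_job_results
)

job_run >> report_results
```
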
445
446
### Timeout and Retry Configuration
447
448
Configure robust error handling:
449
450
```python
451
resilient_job = DatabricksSubmitRunOperator(
452
task_id='resilient_processing',
453
spark_python_task={'python_file': 'dbfs:/scripts/process.py'},
454
existing_cluster_id='cluster-001',
455
timeout_seconds=3600, # 1 hour job timeout
456
databricks_retry_limit=5, # Retry API calls 5 times
457
databricks_retry_delay=30, # Wait 30 seconds between retries
458
databricks_retry_args={
459
'stop_max_attempt_number': 3,
460
'wait_exponential_multiplier': 1000
461
}
462
)
463
```
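Note that the `databricks_retry_*` settings govern retries of the underlying REST calls, not the run itself. To re-submit a failed run, combine them with Airflow's task-level retries; a sketch using standard `BaseOperator` arguments:

```python
from datetime import timedelta

# Sketch: Airflow task retries re-submit the run if it fails outright,
# while databricks_retry_limit only retries the underlying API calls.
retried_job = DatabricksSubmitRunOperator(
    task_id='retried_processing',
    spark_python_task={'python_file': 'dbfs:/scripts/process.py'},
    existing_cluster_id='cluster-001',
    databricks_retry_limit=5,
    retries=2,  # standard Airflow task retries
    retry_delay=timedelta(minutes=10)
)
```
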
The job management operators provide comprehensive control over Databricks job execution, with support for all major task types, cluster configurations, and monitoring. They integrate with Airflow's templating, XCom, and error-handling systems.