# Job Management

The Databricks provider supplies job management operators for running work on Databricks clusters: submitting one-time runs, triggering existing jobs, and executing notebooks, all with full parameter support.

## Core Operators

### DatabricksSubmitRunOperator

Submit one-time runs to Databricks with flexible task configurations and cluster management.

```python { .api }
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

class DatabricksSubmitRunOperator(BaseOperator):
    def __init__(
        self,
        *,
        tasks: list[dict[str, Any]] | None = None,
        spark_jar_task: dict[str, Any] | None = None,
        notebook_task: dict[str, Any] | None = None,
        spark_python_task: dict[str, Any] | None = None,
        spark_submit_task: dict[str, Any] | None = None,
        pipeline_task: dict[str, Any] | None = None,
        python_wheel_task: dict[str, Any] | None = None,
        dbt_task: dict[str, Any] | None = None,
        sql_task: dict[str, Any] | None = None,
        new_cluster: dict[str, Any] | None = None,
        existing_cluster_id: str | None = None,
        job_clusters: list[dict[str, Any]] | None = None,
        libraries: list[dict[str, Any]] | None = None,
        run_name: str | None = None,
        timeout_seconds: int | None = None,
        databricks_conn_id: str = "databricks_default",
        polling_period_seconds: int = 30,
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        databricks_retry_args: dict[str, Any] | None = None,
        do_xcom_push: bool = True,
        idempotency_token: str | None = None,
        access_control_list: list[dict[str, Any]] | None = None,
        wait_for_termination: bool = True,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        git_source: dict[str, Any] | None = None,
        **kwargs
    ) -> None:
        """
        Submit a one-time run to Databricks.

        Args:
            tasks: List of tasks to execute in the run
            spark_jar_task: Configuration for Spark JAR task
            notebook_task: Configuration for notebook task
            spark_python_task: Configuration for Spark Python task
            spark_submit_task: Configuration for Spark submit task
            pipeline_task: Configuration for Delta Live Tables pipeline task
            python_wheel_task: Configuration for Python wheel task
            dbt_task: Configuration for dbt task
            sql_task: Configuration for SQL task
            new_cluster: New cluster configuration for the run
            existing_cluster_id: ID of existing cluster to use
            job_clusters: Job cluster configurations
            libraries: Libraries to install on the cluster
            run_name: Name for the run (defaults to Airflow task name)
            timeout_seconds: Maximum time to wait for job completion
            databricks_conn_id: Airflow connection ID for Databricks
            polling_period_seconds: Seconds between status polls
            databricks_retry_limit: Number of retries for API calls
            databricks_retry_delay: Seconds between retries
            databricks_retry_args: Additional retry configuration
            do_xcom_push: Whether to push run metadata to XCom
            idempotency_token: Token to ensure idempotent execution
            access_control_list: Access control permissions for the run
            wait_for_termination: Whether to wait for run completion
            deferrable: Whether to use deferrable execution
            git_source: Git source configuration for code
        """
```

### DatabricksRunNowOperator

Trigger existing Databricks jobs with parameter overrides and monitoring.

```python { .api }
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

class DatabricksRunNowOperator(BaseOperator):
    def __init__(
        self,
        *,
        job_id: int | None = None,
        job_name: str | None = None,
        notebook_params: dict[str, str] | None = None,
        python_params: list[str] | None = None,
        spark_submit_params: list[str] | None = None,
        jar_params: list[str] | None = None,
        sql_params: dict[str, str] | None = None,
        dbt_commands: list[str] | None = None,
        python_named_params: dict[str, str] | None = None,
        pipeline_params: dict[str, str] | None = None,
        wait_for_termination: bool = True,
        timeout_seconds: int | None = None,
        databricks_conn_id: str = "databricks_default",
        polling_period_seconds: int = 30,
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        do_xcom_push: bool = True,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        **kwargs
    ) -> None:
        """
        Trigger an existing Databricks job.

        Args:
            job_id: Databricks job ID to trigger
            job_name: Databricks job name to trigger (alternative to job_id)
            notebook_params: Parameters for notebook tasks
            python_params: Parameters for Python tasks
            spark_submit_params: Parameters for Spark submit tasks
            jar_params: Parameters for JAR tasks
            sql_params: Parameters for SQL tasks
            dbt_commands: Commands for dbt tasks
            python_named_params: Named parameters for Python tasks
            pipeline_params: Parameters for pipeline tasks
            wait_for_termination: Whether to wait for job completion
            timeout_seconds: Maximum time to wait for job completion
            databricks_conn_id: Airflow connection ID for Databricks
            polling_period_seconds: Seconds between status polls
            databricks_retry_limit: Number of retries for API calls
            databricks_retry_delay: Seconds between retries
            do_xcom_push: Whether to push run metadata to XCom
            deferrable: Whether to use deferrable execution
        """
```

### DatabricksNotebookOperator

Execute Databricks notebooks with parameter support and source management.

```python { .api }
from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator

class DatabricksNotebookOperator(BaseOperator):
    def __init__(
        self,
        *,
        notebook_path: str,
        source: str = "WORKSPACE",
        base_parameters: dict[str, str] | None = None,
        new_cluster: dict[str, Any] | None = None,
        existing_cluster_id: str | None = None,
        job_cluster_key: str | None = None,
        libraries: list[dict[str, Any]] | None = None,
        run_name: str | None = None,
        timeout_seconds: int | None = None,
        databricks_conn_id: str = "databricks_default",
        polling_period_seconds: int = 30,
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        do_xcom_push: bool = True,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        wait_for_termination: bool = True,
        **kwargs
    ) -> None:
        """
        Execute a Databricks notebook.

        Args:
            notebook_path: Path to the notebook in Databricks workspace or repo
            source: Source type - "WORKSPACE" or "GIT"
            base_parameters: Parameters to pass to the notebook
            new_cluster: New cluster configuration for notebook execution
            existing_cluster_id: ID of existing cluster to use
            job_cluster_key: Key of job cluster to use (for workflow contexts)
            libraries: Libraries to install on the cluster
            run_name: Name for the notebook run
            timeout_seconds: Maximum time to wait for notebook completion
            databricks_conn_id: Airflow connection ID for Databricks
            polling_period_seconds: Seconds between status polls
            databricks_retry_limit: Number of retries for API calls
            databricks_retry_delay: Seconds between retries
            do_xcom_push: Whether to push run metadata to XCom
            deferrable: Whether to use deferrable execution
            wait_for_termination: Whether to wait for notebook completion
        """
```

## Usage Examples

### Basic Spark Job Submission

Execute a Python script on a new cluster:

```python
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

spark_job = DatabricksSubmitRunOperator(
    task_id='run_data_processing',
    spark_python_task={
        'python_file': 'dbfs:/mnt/scripts/process_data.py',
        'parameters': [
            '--input-path', '/data/raw/{{ ds }}',
            '--output-path', '/data/processed/{{ ds }}',
            '--partition-count', '10'
        ]
    },
    new_cluster={
        'spark_version': '12.2.x-scala2.12',
        'node_type_id': 'i3.xlarge',
        'num_workers': 5,
        'spark_conf': {
            'spark.sql.adaptive.enabled': 'true',
            'spark.sql.adaptive.coalescePartitions.enabled': 'true'
        }
    },
    libraries=[
        {'pypi': {'package': 'pandas==1.5.0'}},
        {'pypi': {'package': 'numpy>=1.21.0'}}
    ],
    timeout_seconds=7200,
    databricks_conn_id='databricks_production'
)
```

### JAR Task with Custom Configuration

Run a Scala JAR with specific parameters:

```python
jar_job = DatabricksSubmitRunOperator(
    task_id='run_scala_etl',
    spark_jar_task={
        'main_class_name': 'com.company.etl.DataProcessor',
        'parameters': [
            '--config', 'production.conf',
            '--date', '{{ ds }}',
            '--batch-size', '1000'
        ]
    },
    libraries=[
        {'jar': 'dbfs:/mnt/jars/data-processor-1.2.3.jar'},
        {'maven': {'coordinates': 'org.apache.kafka:kafka-clients:3.0.0'}}
    ],
    existing_cluster_id='0123-456789-etl001',
    run_name='ETL Process {{ ds }}',
    idempotency_token='etl_{{ ds }}_{{ dag_run.run_id }}'
)
```

### Notebook Execution with Parameters

Execute a notebook with dynamic parameters:

```python
notebook_task = DatabricksNotebookOperator(
    task_id='run_analysis_notebook',
    notebook_path='/Shared/Analytics/Daily Report',
    base_parameters={
        'report_date': '{{ ds }}',
        'customer_segment': 'premium',
        'output_format': 'parquet',
        'include_charts': 'true'
    },
    existing_cluster_id='0123-456789-analytics',
    libraries=[
        {'pypi': {'package': 'matplotlib'}},
        {'pypi': {'package': 'seaborn'}}
    ],
    timeout_seconds=1800
)
```

### Triggering Existing Jobs

Trigger a pre-configured Databricks job:

```python
trigger_job = DatabricksRunNowOperator(
    task_id='trigger_daily_pipeline',
    job_id=123456,
    notebook_params={
        'input_date': '{{ ds }}',
        'refresh_mode': 'incremental'
    },
    python_params=['--verbose', '--config=prod'],
    wait_for_termination=True,
    timeout_seconds=3600
)
```
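Jobs can also be addressed by name instead of numeric ID, using the `job_name` parameter from the signature above. A minimal sketch, assuming the job name is unique in the workspace (the name and parameters here are illustrative):

```python
trigger_by_name = DatabricksRunNowOperator(
    task_id='trigger_nightly_refresh',
    job_name='nightly-refresh',  # hypothetical job name; resolved to a job ID by the operator
    notebook_params={'run_date': '{{ ds }}'},
    wait_for_termination=True
)
```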

### SQL Task Execution

Submit a SQL task as part of a job run:

```python
sql_job = DatabricksSubmitRunOperator(
    task_id='run_sql_aggregation',
    sql_task={
        'query': {
            'query_id': 'abc123-def456-789'  # Reference to saved query
        },
        'warehouse_id': 'warehouse-xyz789',
        'parameters': {
            'start_date': '{{ ds }}',
            'end_date': '{{ next_ds }}'
        }
    },
    run_name='SQL Aggregation {{ ds }}',
    timeout_seconds=1200
)
```

### Deferrable Execution

Use deferrable mode to free the worker slot while long-running jobs execute:

```python
long_running_job = DatabricksSubmitRunOperator(
    task_id='long_ml_training',
    spark_python_task={
        'python_file': 'dbfs:/mnt/ml/train_model.py',
        'parameters': ['--epochs', '100', '--data-path', '/data/training']
    },
    new_cluster={
        'spark_version': '12.2.x-cpu-ml-scala2.12',
        'node_type_id': 'i3.2xlarge',
        'num_workers': 8
    },
    timeout_seconds=14400,  # 4 hours
    deferrable=True,  # Use async execution
    polling_period_seconds=60  # Check every minute
)
```

## Advanced Features

### Job Clusters

Define reusable cluster configurations within job submissions:

```python
job_with_clusters = DatabricksSubmitRunOperator(
    task_id='multi_cluster_job',
    tasks=[
        {
            'task_key': 'extract',
            'job_cluster_key': 'extract_cluster',
            'spark_python_task': {
                'python_file': 'dbfs:/scripts/extract.py'
            }
        },
        {
            'task_key': 'transform',
            'job_cluster_key': 'transform_cluster',
            'depends_on': [{'task_key': 'extract'}],
            'spark_python_task': {
                'python_file': 'dbfs:/scripts/transform.py'
            }
        }
    ],
    job_clusters=[
        {
            'job_cluster_key': 'extract_cluster',
            'new_cluster': {
                'spark_version': '12.2.x-scala2.12',
                'node_type_id': 'i3.large',
                'num_workers': 2
            }
        },
        {
            'job_cluster_key': 'transform_cluster',
            'new_cluster': {
                'spark_version': '12.2.x-scala2.12',
                'node_type_id': 'i3.xlarge',
                'num_workers': 8
            }
        }
    ]
)
```

### Access Control

Set permissions for job runs:

```python
secured_job = DatabricksSubmitRunOperator(
    task_id='secured_data_job',
    notebook_task={
        'notebook_path': '/Secure/Financial Analysis'
    },
    existing_cluster_id='secure-cluster-001',
    access_control_list=[
        {
            'user_name': 'analyst@company.com',
            'permission_level': 'CAN_VIEW'
        },
        {
            'group_name': 'data-engineers',
            'permission_level': 'CAN_MANAGE_RUN'
        }
    ]
)
```

### Git Source Integration

Execute code directly from Git repositories:

```python
git_job = DatabricksSubmitRunOperator(
    task_id='run_from_git',
    notebook_task={
        'notebook_path': 'notebooks/data_pipeline.py',
        'source': 'GIT'
    },
    git_source={
        'git_url': 'https://github.com/company/data-pipelines.git',
        'git_branch': 'main',
        'git_provider': 'gitHub'
    },
    existing_cluster_id='dev-cluster-001'
)
```

## Error Handling and Monitoring

### XCom Integration

Job runs automatically push their metadata to XCom for use by downstream tasks:

```python
# Job run pushes run_id, job_id, and run_page_url to XCom
job_run = DatabricksSubmitRunOperator(
    task_id='data_processing',
    spark_python_task={'python_file': 'dbfs:/scripts/process.py'},
    existing_cluster_id='cluster-001',
    do_xcom_push=True  # Default is True
)

# Downstream task can access run information
def get_job_results(**context):
    run_id = context['ti'].xcom_pull(task_ids='data_processing', key='run_id')
    run_url = context['ti'].xcom_pull(task_ids='data_processing', key='run_page_url')
    print(f"Job {run_id} completed. View at: {run_url}")
```
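To actually run `get_job_results` after the Databricks run, it can be wrapped in a standard Python task. A minimal sketch; the task name and wiring are illustrative and assume the `job_run` operator defined above:

```python
from airflow.operators.python import PythonOperator

# Hypothetical downstream task that reads the XCom values pushed by job_run
report_results = PythonOperator(
    task_id='report_results',
    python_callable=get_job_results,
)

job_run >> report_results
```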

### Timeout and Retry Configuration

Configure robust error handling:

```python
resilient_job = DatabricksSubmitRunOperator(
    task_id='resilient_processing',
    spark_python_task={'python_file': 'dbfs:/scripts/process.py'},
    existing_cluster_id='cluster-001',
    timeout_seconds=3600,  # 1 hour job timeout
    databricks_retry_limit=5,  # Retry API calls 5 times
    databricks_retry_delay=30,  # Wait 30 seconds between retries
    databricks_retry_args={
        'stop_max_attempt_number': 3,
        'wait_exponential_multiplier': 1000
    }
)
```

The job management operators give fine-grained control over Databricks job execution, covering the major task types, cluster configurations, and monitoring options, and they integrate with Airflow's templating, XCom, and error-handling mechanisms.
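
As a closing illustration, here is a minimal sketch of how these operators might be wired into a DAG; the DAG ID, schedule, script paths, cluster ID, and job ID are illustrative, and the `schedule` argument assumes a recent Airflow 2.x release:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks import (
    DatabricksRunNowOperator,
    DatabricksSubmitRunOperator,
)

# Hypothetical daily pipeline: submit a one-time preprocessing run,
# then trigger an existing downstream Databricks job.
with DAG(
    dag_id='databricks_job_management_example',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
) as dag:
    preprocess = DatabricksSubmitRunOperator(
        task_id='preprocess',
        spark_python_task={
            'python_file': 'dbfs:/scripts/preprocess.py',
            'parameters': ['--date', '{{ ds }}'],  # Jinja-templated by Airflow
        },
        existing_cluster_id='cluster-001',
    )

    publish = DatabricksRunNowOperator(
        task_id='publish',
        job_id=123456,
        notebook_params={'input_date': '{{ ds }}'},
    )

    preprocess >> publish
```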