# Op Factories

Factory functions for creating pre-configured ops that handle common Databricks workflows including running existing jobs and submitting one-time tasks. These factories provide standardized patterns for integrating Databricks job execution into Dagster pipelines.

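For orientation, a minimal sketch of wiring a factory-created op into a Dagster code location follows; the job ID, names, and environment variable are illustrative, and the resource can equally be bound directly on the job as in the examples below.

```python
from dagster import Definitions, EnvVar, job
from dagster_databricks import DatabricksClientResource, create_databricks_run_now_op

# The factory returns a ready-to-use op bound to an existing Databricks job (ID is illustrative)
sync_warehouse = create_databricks_run_now_op(databricks_job_id=1234, name="sync_warehouse")

@job
def warehouse_sync_job():
    sync_warehouse()

# The generated op looks up the Databricks client under the "databricks" resource key by default
defs = Definitions(
    jobs=[warehouse_sync_job],
    resources={
        "databricks": DatabricksClientResource(
            host="https://your-workspace.cloud.databricks.com",
            token=EnvVar("DATABRICKS_TOKEN"),
        )
    },
)
```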

## Capabilities

### Run Existing Job Op Factory

Factory function that creates an op for running existing Databricks jobs by job ID.

```python { .api }
def create_databricks_run_now_op(
    databricks_job_id: int,
    databricks_job_configuration: Optional[dict] = None,
    poll_interval_seconds: float = 10,
    max_wait_time_seconds: float = 86400,
    name: Optional[str] = None,
    databricks_resource_key: str = "databricks",
) -> OpDefinition:
    """
    Creates an op that launches an existing Databricks job.

    Parameters:
    - databricks_job_id: The ID of the Databricks job to be executed
    - databricks_job_configuration: Configuration for triggering a new job run (job parameters, etc.)
    - poll_interval_seconds: How often to poll the Databricks API to check job status
    - max_wait_time_seconds: How long to wait for the job to finish before raising an error
    - name: The name of the op (defaults to _databricks_run_now_op)
    - databricks_resource_key: The name of the resource key used by this op

    Returns:
    OpDefinition: An op definition that runs the Databricks job
    """
```

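In current releases of dagster-databricks, the op returned by this factory also exposes a `Nothing`-typed `start_after` input, which is how the Multi-Op Pipeline example below chains runs; treat the exact input name as an assumption for your installed version. A minimal sketch:

```python
from dagster import EnvVar, job, op
from dagster_databricks import DatabricksClientResource, create_databricks_run_now_op

databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",
    token=EnvVar("DATABRICKS_TOKEN"),
)

# Only the job ID is required; all other factory arguments keep their defaults (illustrative ID)
refresh_tables = create_databricks_run_now_op(databricks_job_id=42, name="refresh_tables")

@op
def validate_inputs() -> None:
    """Hypothetical upstream step; exists only to establish ordering."""

@job(resource_defs={"databricks": databricks_resource})
def refresh_pipeline():
    # Sequence the Databricks run after the upstream op via the Nothing-typed input
    refresh_tables(start_after=validate_inputs())
```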

### Submit New Job Op Factory

Factory function that creates an op for submitting one-time Databricks job runs with full task configuration.

```python { .api }
def create_databricks_submit_run_op(
    databricks_job_configuration: dict,
    poll_interval_seconds: float = 10,
    max_wait_time_seconds: float = 86400,
    name: Optional[str] = None,
    databricks_resource_key: str = "databricks",
) -> OpDefinition:
    """
    Creates an op that submits a one-time run of a set of tasks on Databricks.

    Parameters:
    - databricks_job_configuration: Configuration for submitting a one-time run (cluster, task, etc.)
    - poll_interval_seconds: How often to poll the Databricks API to check job status
    - max_wait_time_seconds: How long to wait for the job to finish before raising an error
    - name: The name of the op (defaults to _databricks_submit_run_op)
    - databricks_resource_key: The name of the resource key used by this op

    Returns:
    OpDefinition: An op definition to submit a one-time run on Databricks
    """
```

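Because both factories accept `databricks_resource_key`, ops in the same job can target different workspaces by binding each key to its own client resource. A sketch of that pattern, with hypothetical workspace hosts and token variables:

```python
from dagster import EnvVar, job
from dagster_databricks import DatabricksClientResource, create_databricks_submit_run_op

# Two hypothetical workspaces, each bound to its own resource key
dev_databricks = DatabricksClientResource(
    host="https://dev-workspace.cloud.databricks.com",
    token=EnvVar("DEV_DATABRICKS_TOKEN"),
)
prod_databricks = DatabricksClientResource(
    host="https://prod-workspace.cloud.databricks.com",
    token=EnvVar("PROD_DATABRICKS_TOKEN"),
)

# The same one-time run payload, submitted to each workspace by a differently keyed op
smoke_test_config = {
    "run_name": "Smoke Test",
    "existing_cluster_id": "your-cluster-id",
    "notebook_task": {"notebook_path": "/Shared/smoke_test"},
}

smoke_test_dev = create_databricks_submit_run_op(
    databricks_job_configuration=smoke_test_config,
    databricks_resource_key="databricks_dev",
    name="smoke_test_dev",
)
smoke_test_prod = create_databricks_submit_run_op(
    databricks_job_configuration=smoke_test_config,
    databricks_resource_key="databricks_prod",
    name="smoke_test_prod",
)

@job(resource_defs={"databricks_dev": dev_databricks, "databricks_prod": prod_databricks})
def cross_workspace_smoke_tests():
    smoke_test_dev()
    smoke_test_prod()
```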

## Configuration Options

Both op factories support runtime configuration through their generated ops:

### Polling Configuration

```python { .api }
class DatabricksRunNowOpConfig:
    """Runtime configuration for run_now ops."""
    poll_interval_seconds: float = 10
    max_wait_time_seconds: float = 86400


class DatabricksSubmitRunOpConfig:
    """Runtime configuration for submit_run ops."""
    poll_interval_seconds: float = 10
    max_wait_time_seconds: float = 86400
```

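These fields are exposed as config on the generated ops, so polling behavior can also be adjusted at launch time. A minimal sketch using the plain-dict form of run config (the `RunConfig` class form appears under Runtime Configuration below); the op, job, and resource names are illustrative:

```python
from dagster import EnvVar, job
from dagster_databricks import DatabricksClientResource, create_databricks_run_now_op

databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",
    token=EnvVar("DATABRICKS_TOKEN"),
)

nightly_refresh = create_databricks_run_now_op(
    databricks_job_id=12345,  # illustrative job ID
    name="nightly_refresh",
)

@job(resource_defs={"databricks": databricks_resource})
def nightly_refresh_job():
    nightly_refresh()

if __name__ == "__main__":
    # Set polling behavior for this launch via the op's config schema
    nightly_refresh_job.execute_in_process(
        run_config={
            "ops": {
                "nightly_refresh": {
                    "config": {
                        "poll_interval_seconds": 5,
                        "max_wait_time_seconds": 900,
                    }
                }
            }
        }
    )
```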

## Usage Examples

### Running Existing Databricks Job

```python
from dagster import EnvVar, job
from dagster_databricks import create_databricks_run_now_op, DatabricksClientResource

# Define the resource
databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",
    token=EnvVar("DATABRICKS_TOKEN"),
)

# Create op for existing job
run_etl_job = create_databricks_run_now_op(
    databricks_job_id=12345,
    databricks_job_configuration={
        "python_params": [
            "--input", "raw_data_table",
            "--output", "processed_data_table",
            "--date", "2024-01-15",
        ],
        "jar_params": [],
        "notebook_params": {
            "environment": "production"
        },
    },
    poll_interval_seconds=30,
    max_wait_time_seconds=7200,  # 2 hours
    name="run_daily_etl",
)

@job(resource_defs={"databricks": databricks_resource})
def daily_etl_pipeline():
    run_etl_job()
```

### Submitting One-Time Job

```python
from dagster import job
from dagster_databricks import create_databricks_submit_run_op, DatabricksClientResource

# Create op for one-time job submission
submit_ml_training = create_databricks_submit_run_op(
    databricks_job_configuration={
        "run_name": "ML Model Training",
        "new_cluster": {
            "spark_version": "11.3.x-cpu-ml-scala2.12",
            "node_type_id": "m5d.xlarge",
            "num_workers": 4,
            "custom_tags": {
                "project": "ml-pipeline",
                "environment": "production"
            }
        },
        "libraries": [
            {"pypi": {"package": "scikit-learn==1.1.0"}},
            {"pypi": {"package": "mlflow>=2.0.0"}},
            {"pypi": {"package": "pandas>=1.5.0"}}
        ],
        "spark_python_task": {
            "python_file": "s3://ml-scripts/train_model.py",
            "parameters": [
                "--model-type", "random-forest",
                "--data-path", "s3://data-bucket/training-data/",
                "--output-path", "s3://model-bucket/models/",
                "--max-depth", "10",
                "--n-estimators", "100"
            ]
        },
        "timeout_seconds": 14400,  # 4 hours
        "email_notifications": {
            "on_success": ["ml-team@company.com"],
            "on_failure": ["ml-team@company.com", "oncall@company.com"]
        }
    },
    poll_interval_seconds=60,
    max_wait_time_seconds=18000,  # 5 hours
    name="train_ml_model"
)

@job(resource_defs={"databricks": databricks_resource})
def ml_training_pipeline():
    submit_ml_training()
```

### Notebook-Based Workflow

```python
# Create op for notebook execution
run_analysis_notebook = create_databricks_submit_run_op(
    databricks_job_configuration={
        "run_name": "Daily Analysis Report",
        "existing_cluster_id": "analysis-cluster-id",
        "notebook_task": {
            "notebook_path": "/Workspace/Users/analyst@company.com/DailyAnalysis",
            "base_parameters": {
                "report_date": "{{ ds }}",  # placeholder; substitute a concrete date or your own templating
                "output_format": "html",
                "include_charts": "true"
            }
        },
        "libraries": [
            {"pypi": {"package": "plotly>=5.0.0"}},
            {"pypi": {"package": "seaborn>=0.11.0"}}
        ]
    },
    name="daily_analysis"
)

@job(resource_defs={"databricks": databricks_resource})
def reporting_pipeline():
    run_analysis_notebook()
```

### JAR-Based Job

```python
# Create op for Scala/Java JAR execution
run_spark_jar = create_databricks_submit_run_op(
    databricks_job_configuration={
        "run_name": "Spark JAR Processing",
        "new_cluster": {
            "spark_version": "11.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 8
        },
        "spark_jar_task": {
            "main_class_name": "com.company.DataProcessor",
            "parameters": [
                "--input-path", "s3://input-bucket/data/",
                "--output-path", "s3://output-bucket/processed/",
                "--partition-date", "2024-01-15"
            ]
        },
        "libraries": [
            {"jar": "s3://jars-bucket/data-processor-1.0.jar"},
            {"maven": {"coordinates": "org.apache.spark:spark-sql_2.12:3.3.0"}}
        ]
    },
    name="spark_jar_processor"
)
```

### Multi-Op Pipeline

```python
from dagster import job

# Create multiple ops for different stages
extract_data_op = create_databricks_run_now_op(
    databricks_job_id=100,  # Existing extraction job
    name="extract_data"
)

transform_data_op = create_databricks_submit_run_op(
    databricks_job_configuration={
        "existing_cluster_id": "transform-cluster",
        "notebook_task": {
            "notebook_path": "/ETL/Transform",
            "base_parameters": {"stage": "transform"}
        }
    },
    name="transform_data"
)

load_data_op = create_databricks_run_now_op(
    databricks_job_id=101,  # Existing loading job
    name="load_data"
)

@job(resource_defs={"databricks": databricks_resource})
def etl_pipeline():
    # Chain the operations
    extract_result = extract_data_op()
    transform_result = transform_data_op(extract_result)
    load_data_op(transform_result)
```

### Runtime Configuration

```python
from dagster import job, RunConfig

# Op with runtime configuration
flexible_job_op = create_databricks_run_now_op(
    databricks_job_id=200,
    name="flexible_databricks_job"
)

# Job that accepts runtime config
@job(resource_defs={"databricks": databricks_resource})
def configurable_job():
    flexible_job_op()

# Execute with custom polling settings
if __name__ == "__main__":
    run_config = RunConfig(
        ops={
            "flexible_databricks_job": {
                "config": {
                    "poll_interval_seconds": 5,  # Poll every 5 seconds
                    "max_wait_time_seconds": 1800  # 30 minute timeout
                }
            }
        }
    )

    result = configurable_job.execute_in_process(run_config=run_config)
```

### Error Handling and Monitoring

```python
from dagster import job, op, In, Nothing, Out, OpExecutionContext

# Custom op factory that wraps the generated op with additional logic
def create_monitored_databricks_op(job_id: int, name: str):
    base_op = create_databricks_run_now_op(
        databricks_job_id=job_id,
        name=name
    )

    @op(
        name=f"monitored_{name}",
        ins={"start_after": In(Nothing)},
        out=Out(int),
        required_resource_keys={"databricks"},
    )
    def monitored_op(context: OpExecutionContext):
        try:
            # Log start
            context.log.info(f"Starting Databricks job {job_id}")

            # Invoke the factory-generated op directly, passing this op's context;
            # the "databricks" resource declared above makes the client available to it
            result = base_op(context)

            # Additional monitoring/logging
            context.log.info(f"Databricks job {job_id} completed successfully")

            return result

        except Exception as e:
            context.log.error(f"Databricks job {job_id} failed: {str(e)}")
            # Could add alerting, retry logic, etc.
            raise

    return monitored_op

# Use the enhanced op
enhanced_op = create_monitored_databricks_op(300, "critical_job")

@job(resource_defs={"databricks": databricks_resource})
def monitored_pipeline():
    enhanced_op()
```