# PySpark Step Launcher

Step launcher that executes individual Dagster ops on Databricks clusters using PySpark. It handles cluster provisioning, code packaging, dependency management, and result collection, so Dagster ops can run on Databricks compute without additional glue code.

## Capabilities

### DatabricksPySparkStepLauncher

Step launcher resource that runs individual Dagster ops on Databricks clusters with automatic code packaging and dependency management.

```python { .api }
class DatabricksPySparkStepLauncher:
    """Step launcher for running PySpark steps on Databricks clusters."""
```
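
An op opts into the launcher by requiring the resource key under which the launcher is bound; ops that do not require it execute in the default local environment. A minimal sketch, assuming the launcher is bound under the key `"step_launcher"`:

```python
from dagster import op

@op(required_resource_keys={"step_launcher"})
def remote_op():
    # Executed on the Databricks cluster because the op requires the
    # step launcher resource; ops without this requirement run locally.
    return 42
```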

### Resource Factory

Resource function that creates and configures the Databricks PySpark step launcher.

```python { .api }
def databricks_pyspark_step_launcher(init_context: InitResourceContext) -> DatabricksPySparkStepLauncher:
    """
    Create a DatabricksPySparkStepLauncher resource from configuration.

    Parameters:
    - init_context: Dagster resource initialization context

    Returns:
        DatabricksPySparkStepLauncher: Configured step launcher instance
    """
```
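
As an alternative to baking configuration in with `.configured(...)`, the factory can be bound unconfigured and supplied its config at launch time through run config. A minimal sketch, assuming a job that binds the resource under the key `"step_launcher"`:

```python
from dagster import job, op
from dagster_databricks import databricks_pyspark_step_launcher

@op(required_resource_keys={"step_launcher"})
def remote_op():
    return 42

@job(resource_defs={"step_launcher": databricks_pyspark_step_launcher})
def configurable_job():
    remote_op()

# Resource config is provided per run instead of at definition time.
result = configurable_job.execute_in_process(
    run_config={
        "resources": {
            "step_launcher": {
                "config": {
                    "run_config": {"cluster": {"existing": "your-cluster-id"}},
                    "databricks_host": "https://your-workspace.cloud.databricks.com",
                    "databricks_token": {"env": "DATABRICKS_TOKEN"},
                }
            }
        }
    }
)
```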

### Configuration Schema

Configuration class defining all available options for the step launcher.

```python { .api }
class DatabricksConfig:
    """Configuration schema for Databricks step launcher."""
```

## Configuration Options

The step launcher is configured through its resource definition. The sections below cover the main groups of options.

### Basic Configuration

```python
from dagster import job, op
from dagster_databricks import databricks_pyspark_step_launcher

@op(required_resource_keys={"step_launcher"})
def my_op():
    # Requiring the "step_launcher" resource routes this op to Databricks.
    ...

@job(
    resource_defs={
        "step_launcher": databricks_pyspark_step_launcher.configured({
            "run_config": {
                "cluster": {
                    "existing": "your-cluster-id"
                }
            },
            "databricks_host": "https://your-workspace.cloud.databricks.com",
            "databricks_token": {"env": "DATABRICKS_TOKEN"}
        })
    }
)
def my_databricks_job():
    my_op()
```

### Run Configuration Structure

The `run_config` section defines cluster and job execution parameters:

```python
{
    "run_config": {
        # Cluster configuration (required)
        "cluster": {
            "existing": "cluster-id"  # Use existing cluster
            # OR create new cluster:
            # "new": {
            #     "nodes": {
            #         "node_types": {
            #             "node_type_id": "i3.xlarge"
            #         }
            #     },
            #     "size": {"num_workers": 2},
            #     "spark_version": "11.3.x-scala2.12"
            # }
        },

        # Additional libraries to install
        "libraries": [
            {"pypi": {"package": "pandas==1.5.0"}},
            {"pypi": {"package": "scikit-learn>=1.0.0"}}
        ],

        # Whether to install default Dagster libraries
        "install_default_libraries": True,

        # Job timeout and naming
        "timeout_seconds": 3600,
        "run_name": "Dagster Step Execution",

        # Notifications
        "email_notifications": {
            "on_failure": ["admin@company.com"]
        }
    }
}
```
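
In the resource definition, this dictionary sits alongside the authentication and storage options shown below. A minimal sketch of how the pieces fit together (the values are placeholders):

```python
launcher = databricks_pyspark_step_launcher.configured({
    "run_config": {
        "cluster": {"existing": "cluster-id"},
        "libraries": [{"pypi": {"package": "pandas==1.5.0"}}],
        "timeout_seconds": 3600,
    },
    "databricks_host": "https://your-workspace.cloud.databricks.com",
    "databricks_token": {"env": "DATABRICKS_TOKEN"},
})
```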

### Authentication Configuration

Multiple authentication methods are supported:

```python
# Personal Access Token
{
    "databricks_host": "https://your-workspace.cloud.databricks.com",
    "databricks_token": {"env": "DATABRICKS_TOKEN"}
}

# OAuth Service Principal
{
    "databricks_host": "https://your-workspace.cloud.databricks.com",
    "oauth_credentials": {
        "client_id": {"env": "DATABRICKS_CLIENT_ID"},
        "client_secret": {"env": "DATABRICKS_CLIENT_SECRET"}
    }
}

# Azure Service Principal
{
    "databricks_host": "https://your-workspace.cloud.databricks.com",
    "azure_credentials": {
        "azure_client_id": {"env": "AZURE_CLIENT_ID"},
        "azure_client_secret": {"env": "AZURE_CLIENT_SECRET"},
        "azure_tenant_id": {"env": "AZURE_TENANT_ID"}
    }
}
```
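
The `{"env": ...}` values are resolved from environment variables in the process that launches the run. A small sketch that fails fast when the expected variables are not set (the variable name matches the token-based example above):

```python
import os

REQUIRED_ENV_VARS = ["DATABRICKS_TOKEN"]

missing = [name for name in REQUIRED_ENV_VARS if not os.getenv(name)]
if missing:
    # Surfacing this before launch avoids a confusing failure inside the
    # step launcher when the resource config cannot be resolved.
    raise RuntimeError(f"Missing required environment variables: {missing}")
```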
141
142
### Storage Configuration
143
144
Configure where code and data are stored during execution:
145
146
```python
147
{
148
"storage": {
149
"s3": {
150
"s3_bucket": "my-dagster-bucket",
151
"s3_prefix": "dagster-runs/"
152
}
153
# OR DBFS storage:
154
# "dbfs": {
155
# "dbfs_prefix": "/dagster-runs/"
156
# }
157
}
158
}
159
```
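
Both the launching process and the Databricks cluster need read/write access to this location, since step code and results move through it. A quick sketch, assuming `boto3` is installed and AWS credentials are configured locally, to confirm the bucket and prefix are writable before kicking off a run:

```python
import boto3

S3_BUCKET = "my-dagster-bucket"  # matches "s3_bucket" above
S3_PREFIX = "dagster-runs/"      # matches "s3_prefix" above

s3 = boto3.client("s3")
# Write and delete a zero-byte marker object to verify permissions.
marker_key = f"{S3_PREFIX}permission-check"
s3.put_object(Bucket=S3_BUCKET, Key=marker_key, Body=b"")
s3.delete_object(Bucket=S3_BUCKET, Key=marker_key)
print("Storage location is writable")
```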

### Environment and Secrets

Configure environment variables and secret injection:

```python
{
    "env_variables": {
        "SPARK_CONF_DIR": "/opt/spark/conf",
        "PYTHONPATH": "/custom/python/path"
    },
    "secrets_to_env_variables": {
        "API_KEY": {
            "scope": "my-secret-scope",
            "key": "api-key"
        }
    }
}
```
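
On the cluster, both kinds of values surface as ordinary environment variables, so op code can read them with `os.environ`. A short sketch using the `API_KEY` mapping above:

```python
import os

from dagster import op

@op(required_resource_keys={"step_launcher"})
def call_external_api():
    # "API_KEY" is populated on the Databricks cluster from the
    # my-secret-scope/api-key secret configured above.
    api_key = os.environ["API_KEY"]
    # ... use api_key with your client of choice ...
    return len(api_key)
```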
179
180
## Usage Examples
181
182
### Basic Step Launcher Setup
183
184
```python
185
from dagster import job, op, Config
186
from dagster_databricks import databricks_pyspark_step_launcher
187
188
@op
189
def process_data():
190
import pandas as pd
191
192
# This code runs on Databricks cluster
193
df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
194
result = df.sum().to_dict()
195
return result
196
197
@job(
198
resource_defs={
199
"step_launcher": databricks_pyspark_step_launcher.configured({
200
"run_config": {
201
"cluster": {"existing": "your-cluster-id"}
202
},
203
"databricks_host": "https://your-workspace.cloud.databricks.com",
204
"databricks_token": {"env": "DATABRICKS_TOKEN"}
205
})
206
}
207
)
208
def my_databricks_job():
209
process_data()
210
```
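
The job can then be launched like any other Dagster job, for example from a script during development; ops that require the launcher are still shipped to Databricks. A minimal sketch, assuming `DATABRICKS_TOKEN` is set in the launching environment:

```python
if __name__ == "__main__":
    # Executes the job in this process; steps of ops that require
    # "step_launcher" are submitted to Databricks and awaited.
    result = my_databricks_job.execute_in_process()
    print("Success:", result.success)
```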

### Advanced Configuration with New Cluster

```python
step_launcher_config = {
    "run_config": {
        "cluster": {
            "new": {
                "nodes": {
                    "node_types": {
                        "node_type_id": "i3.xlarge",
                        "driver_node_type_id": "i3.2xlarge"
                    }
                },
                "size": {
                    "autoscale": {"min_workers": 1, "max_workers": 5}
                },
                "spark_version": "11.3.x-scala2.12",
                "custom_tags": {
                    "project": "ml-pipeline",
                    "cost-center": "data-science"
                }
            }
        },
        "libraries": [
            {"pypi": {"package": "scikit-learn==1.1.0"}},
            {"pypi": {"package": "mlflow>=2.0.0"}},
            {"maven": {"coordinates": "org.apache.spark:spark-sql_2.12:3.3.0"}}
        ],
        "timeout_seconds": 7200,
        "email_notifications": {
            "on_failure": ["data-team@company.com"]
        }
    },
    "databricks_host": "https://your-workspace.cloud.databricks.com",
    "oauth_credentials": {
        "client_id": {"env": "DATABRICKS_CLIENT_ID"},
        "client_secret": {"env": "DATABRICKS_CLIENT_SECRET"}
    },
    "storage": {
        "s3": {
            "s3_bucket": "my-dagster-storage",
            "s3_prefix": "step-launcher-runs/"
        }
    },
    "env_variables": {
        "MLFLOW_TRACKING_URI": "databricks"
    }
}

@job(
    resource_defs={
        "step_launcher": databricks_pyspark_step_launcher.configured(step_launcher_config)
    }
)
def advanced_ml_pipeline():
    # train_model, evaluate_model, and deploy_model are ops defined elsewhere;
    # each must require the "step_launcher" resource key to run on Databricks.
    train_model()
    evaluate_model()
    deploy_model()
```

### PySpark Operations

```python
@op(required_resource_keys={"step_launcher"})
def spark_data_processing():
    from pyspark.sql import SparkSession

    # SparkSession is automatically available on Databricks
    spark = SparkSession.builder.getOrCreate()

    # Read data from various sources
    df = spark.read.format("delta").load("/path/to/delta/table")

    # Perform transformations
    result_df = df.groupBy("category").agg({"amount": "sum"})

    # Write results
    result_df.write.format("delta").mode("overwrite").save("/path/to/output")

    return result_df.count()

@op(required_resource_keys={"step_launcher"})
def ml_training():
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split
    import mlflow

    # Load data
    df = pd.read_parquet("/dbfs/data/training-data.parquet")

    # Prepare features
    X = df.drop("target", axis=1)
    y = df["target"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    # Train model with MLflow tracking
    with mlflow.start_run():
        model = RandomForestClassifier(n_estimators=100)
        model.fit(X_train, y_train)

        accuracy = model.score(X_test, y_test)
        mlflow.log_metric("accuracy", accuracy)
        mlflow.sklearn.log_model(model, "model")

    return accuracy
```
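
Because each step runs as its own Databricks job, step outputs are serialized between steps rather than shared in memory, so returning a Spark DataFrame from one op to another generally will not work. A common pattern, sketched below, is to return small values such as storage paths and have the downstream op re-read the data:

```python
@op(required_resource_keys={"step_launcher"})
def write_aggregates():
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.read.format("delta").load("/path/to/delta/table")
    out_path = "/path/to/aggregates"
    df.groupBy("category").count().write.format("delta").mode("overwrite").save(out_path)
    # Return only the path; the downstream step re-reads the Delta table.
    return out_path

@op(required_resource_keys={"step_launcher"})
def summarize(aggregates_path: str):
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    return spark.read.format("delta").load(aggregates_path).count()
```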

### Local Development Considerations

When developing ops that should also run locally, guard Spark-specific imports so the same op can fall back to a non-Spark implementation:

```python
@op
def data_processing_op():
    try:
        # Use Spark if available (on Databricks)
        from pyspark.sql import SparkSession

        spark = SparkSession.builder.getOrCreate()
        # Spark-based processing; process_with_spark is a placeholder helper
        return process_with_spark(spark)
    except ImportError:
        # Fallback for local development; process_with_pandas is a
        # placeholder helper built on pandas
        import pandas as pd

        return process_with_pandas()
```
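
Another option is to swap the step launcher itself between environments, so local runs never touch Databricks. A sketch of that idea, using an environment variable chosen for this example (`DAGSTER_DEPLOYMENT`) and a placeholder launcher config:

```python
import os

from dagster import ResourceDefinition, job
from dagster_databricks import databricks_pyspark_step_launcher

databricks_config = {
    "run_config": {"cluster": {"existing": "your-cluster-id"}},
    "databricks_host": "https://your-workspace.cloud.databricks.com",
    "databricks_token": {"env": "DATABRICKS_TOKEN"},
}

if os.getenv("DAGSTER_DEPLOYMENT") == "databricks":
    # Remote execution: steps are launched on the Databricks cluster.
    step_launcher = databricks_pyspark_step_launcher.configured(databricks_config)
else:
    # Local development: a no-op resource keeps the key resolvable
    # while steps execute in the local process.
    step_launcher = ResourceDefinition.none_resource()

@job(resource_defs={"step_launcher": step_launcher})
def data_processing_job():
    # For the Databricks path, data_processing_op must also declare
    # required_resource_keys={"step_launcher"} so its step uses the launcher.
    data_processing_op()
```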