# PySpark Step Launcher

Step launcher that executes individual Dagster ops on Databricks clusters using PySpark. This component provides cluster provisioning, code packaging, dependency management, and result collection for seamless integration of Dagster ops with Databricks compute resources.

## Capabilities

### DatabricksPySparkStepLauncher

Step launcher resource that runs individual Dagster ops on Databricks clusters with automatic code packaging and dependency management.

```python { .api }
class DatabricksPySparkStepLauncher:
    """Step launcher for running PySpark steps on Databricks clusters."""
```

### Resource Factory

Resource function that creates and configures the Databricks PySpark step launcher.

```python { .api }
def databricks_pyspark_step_launcher(init_context: InitResourceContext) -> DatabricksPySparkStepLauncher:
    """
    Create a DatabricksPySparkStepLauncher resource from configuration.

    Parameters:
    - init_context: Dagster resource initialization context

    Returns:
        DatabricksPySparkStepLauncher: Configured step launcher instance
    """
```

### Configuration Schema

Configuration class defining all available options for the step launcher.

```python { .api }
class DatabricksConfig:
    """Configuration schema for Databricks step launcher."""
```
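As a quick orientation, the top-level keys accepted by the launcher are summarized below. This is an illustrative sketch compiled from the options used throughout this document, not the authoritative schema; see the Configuration Options section for the full structure of each key.

```python
# Illustrative summary of the top-level configuration keys used in this document.
STEP_LAUNCHER_CONFIG_KEYS = [
    "run_config",                # cluster, libraries, timeout, run name, notifications
    "databricks_host",           # workspace URL
    "databricks_token",          # personal access token authentication
    "oauth_credentials",         # OAuth service principal authentication
    "azure_credentials",         # Azure service principal authentication
    "storage",                   # S3 or DBFS staging location for code and results
    "env_variables",             # plain environment variables for the step
    "secrets_to_env_variables",  # Databricks secrets injected as environment variables
]
```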

## Configuration Options

The step launcher supports comprehensive configuration through the resource definition:

### Basic Configuration

```python
from dagster import job
from dagster_databricks import databricks_pyspark_step_launcher

@job(
    resource_defs={
        "step_launcher": databricks_pyspark_step_launcher.configured({
            "run_config": {
                "cluster": {
                    "existing": "your-cluster-id"
                }
            },
            "databricks_host": "https://your-workspace.cloud.databricks.com",
            "databricks_token": {"env": "DATABRICKS_TOKEN"}
        })
    }
)
def my_databricks_job():
    my_op()
```
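Only ops that require the step launcher resource are actually executed on Databricks; other ops in the same job run in the normal Dagster environment. A minimal sketch of the `my_op` referenced above (the op body is a placeholder):

```python
from dagster import op

@op(required_resource_keys={"step_launcher"})  # opt this op into the Databricks step launcher
def my_op():
    # Placeholder body; whatever runs here executes on the configured Databricks cluster.
    return 42
```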

### Run Configuration Structure

The `run_config` section defines cluster and job execution parameters:

```python
{
    "run_config": {
        # Cluster configuration (required)
        "cluster": {
            "existing": "cluster-id"  # Use existing cluster
            # OR create a new cluster:
            # "new": {
            #     "nodes": {
            #         "node_types": {
            #             "node_type_id": "i3.xlarge"
            #         }
            #     },
            #     "size": {"num_workers": 2},
            #     "spark_version": "11.3.x-scala2.12"
            # }
        },

        # Additional libraries to install
        "libraries": [
            {"pypi": {"package": "pandas==1.5.0"}},
            {"pypi": {"package": "scikit-learn>=1.0.0"}}
        ],

        # Whether to install default Dagster libraries
        "install_default_libraries": True,

        # Job timeout and naming
        "timeout_seconds": 3600,
        "run_name": "Dagster Step Execution",

        # Notifications
        "email_notifications": {
            "on_failure": ["admin@company.com"]
        }
    }
}
```

### Authentication Configuration

Multiple authentication methods are supported:

```python
# Personal Access Token
{
    "databricks_host": "https://your-workspace.cloud.databricks.com",
    "databricks_token": {"env": "DATABRICKS_TOKEN"}
}

# OAuth Service Principal
{
    "databricks_host": "https://your-workspace.cloud.databricks.com",
    "oauth_credentials": {
        "client_id": {"env": "DATABRICKS_CLIENT_ID"},
        "client_secret": {"env": "DATABRICKS_CLIENT_SECRET"}
    }
}

# Azure Service Principal
{
    "databricks_host": "https://your-workspace.cloud.databricks.com",
    "azure_credentials": {
        "azure_client_id": {"env": "AZURE_CLIENT_ID"},
        "azure_client_secret": {"env": "AZURE_CLIENT_SECRET"},
        "azure_tenant_id": {"env": "AZURE_TENANT_ID"}
    }
}
```

### Storage Configuration

Configure where code and data are stored during execution:

```python
{
    "storage": {
        "s3": {
            "s3_bucket": "my-dagster-bucket",
            "s3_prefix": "dagster-runs/"
        }
        # OR DBFS storage:
        # "dbfs": {
        #     "dbfs_prefix": "/dagster-runs/"
        # }
    }
}
```

### Environment and Secrets

Configure environment variables and secret injection:

```python
{
    "env_variables": {
        "SPARK_CONF_DIR": "/opt/spark/conf",
        "PYTHONPATH": "/custom/python/path"
    },
    "secrets_to_env_variables": {
        "API_KEY": {
            "scope": "my-secret-scope",
            "key": "api-key"
        }
    }
}
```
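Inside the op, injected variables are read like any other environment variable. A small sketch assuming the `API_KEY` mapping configured above (the op itself is hypothetical):

```python
import os

from dagster import op

@op(required_resource_keys={"step_launcher"})
def call_external_api():
    # API_KEY was injected from the Databricks secret scope configured above;
    # SPARK_CONF_DIR and PYTHONPATH are available the same way.
    api_key = os.environ["API_KEY"]
    return len(api_key)  # placeholder use of the secret
```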

179

180

## Usage Examples

181

182

### Basic Step Launcher Setup

183

184

```python
from dagster import job, op
from dagster_databricks import databricks_pyspark_step_launcher

@op(required_resource_keys={"step_launcher"})  # require the launcher so this op runs on Databricks
def process_data():
    import pandas as pd

    # This code runs on the Databricks cluster
    df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
    result = df.sum().to_dict()
    return result

@job(
    resource_defs={
        "step_launcher": databricks_pyspark_step_launcher.configured({
            "run_config": {
                "cluster": {"existing": "your-cluster-id"}
            },
            "databricks_host": "https://your-workspace.cloud.databricks.com",
            "databricks_token": {"env": "DATABRICKS_TOKEN"}
        })
    }
)
def my_databricks_job():
    process_data()
```
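The job can then be launched like any other Dagster job, for example directly from Python. A minimal sketch, assuming `DATABRICKS_TOKEN` is set in the launching process:

```python
if __name__ == "__main__":
    # Submits the step to Databricks via the configured step launcher and
    # reports step events back to the local process.
    result = my_databricks_job.execute_in_process()
    assert result.success
```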

211

212

### Advanced Configuration with New Cluster

213

214

```python
step_launcher_config = {
    "run_config": {
        "cluster": {
            "new": {
                "nodes": {
                    "node_types": {
                        "node_type_id": "i3.xlarge",
                        "driver_node_type_id": "i3.2xlarge"
                    }
                },
                "size": {
                    "autoscale": {"min_workers": 1, "max_workers": 5}
                },
                "spark_version": "11.3.x-scala2.12",
                "custom_tags": {
                    "project": "ml-pipeline",
                    "cost-center": "data-science"
                }
            }
        },
        "libraries": [
            {"pypi": {"package": "scikit-learn==1.1.0"}},
            {"pypi": {"package": "mlflow>=2.0.0"}},
            {"maven": {"coordinates": "org.apache.spark:spark-sql_2.12:3.3.0"}}
        ],
        "timeout_seconds": 7200,
        "email_notifications": {
            "on_failure": ["data-team@company.com"]
        }
    },
    "databricks_host": "https://your-workspace.cloud.databricks.com",
    "oauth_credentials": {
        "client_id": {"env": "DATABRICKS_CLIENT_ID"},
        "client_secret": {"env": "DATABRICKS_CLIENT_SECRET"}
    },
    "storage": {
        "s3": {
            "s3_bucket": "my-dagster-storage",
            "s3_prefix": "step-launcher-runs/"
        }
    },
    "env_variables": {
        "MLFLOW_TRACKING_URI": "databricks"
    }
}

@job(
    resource_defs={
        "step_launcher": databricks_pyspark_step_launcher.configured(step_launcher_config)
    }
)
def advanced_ml_pipeline():
    # train_model, evaluate_model, and deploy_model are ops defined elsewhere,
    # each requiring the "step_launcher" resource so their steps run on Databricks.
    train_model()
    evaluate_model()
    deploy_model()
```

271

272

### PySpark Operations

273

274

```python
from dagster import op

@op(required_resource_keys={"step_launcher"})
def spark_data_processing():
    from pyspark.sql import SparkSession

    # SparkSession is automatically available on Databricks
    spark = SparkSession.builder.getOrCreate()

    # Read data from various sources
    df = spark.read.format("delta").load("/path/to/delta/table")

    # Perform transformations
    result_df = df.groupBy("category").agg({"amount": "sum"})

    # Write results
    result_df.write.format("delta").mode("overwrite").save("/path/to/output")

    return result_df.count()

@op(required_resource_keys={"step_launcher"})
def ml_training():
    import mlflow
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split

    # Load data
    df = pd.read_parquet("/dbfs/data/training-data.parquet")

    # Prepare features
    X = df.drop("target", axis=1)
    y = df["target"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    # Train model with MLflow tracking
    with mlflow.start_run():
        model = RandomForestClassifier(n_estimators=100)
        model.fit(X_train, y_train)

        accuracy = model.score(X_test, y_test)
        mlflow.log_metric("accuracy", accuracy)
        mlflow.sklearn.log_model(model, "model")

    return accuracy
```

319

320

### Local Development Considerations

321

322

When developing ops that will also run outside Databricks, guard Databricks-only dependencies so the same code works in local development:

323

324

```python
from dagster import op

@op
def data_processing_op():
    try:
        # Use Spark if available (i.e., when running on Databricks)
        from pyspark.sql import SparkSession

        spark = SparkSession.builder.getOrCreate()
        # Spark-based processing (process_with_spark is a placeholder helper)
        return process_with_spark(spark)
    except ImportError:
        # Fallback for local development without PySpark installed
        # (process_with_pandas is a placeholder helper built on pandas)
        return process_with_pandas()
```
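A complementary pattern is to keep a single graph and attach the step launcher only in the Databricks deployment, so local runs never need a cluster. A sketch assuming the op and resource key used in the basic example above, with `ResourceDefinition.none_resource()` standing in for the launcher locally:

```python
from dagster import graph, ResourceDefinition
from dagster_databricks import databricks_pyspark_step_launcher

@graph
def processing_graph():
    process_data()  # the op from the basic example above

# Local job: the "step_launcher" key is satisfied by a no-op resource,
# so the op runs in the local process.
local_job = processing_graph.to_job(
    name="processing_local",
    resource_defs={"step_launcher": ResourceDefinition.none_resource()},
)

# Databricks job: the same graph, but steps are launched on the cluster.
databricks_job = processing_graph.to_job(
    name="processing_databricks",
    resource_defs={
        "step_launcher": databricks_pyspark_step_launcher.configured({
            "run_config": {"cluster": {"existing": "your-cluster-id"}},
            "databricks_host": "https://your-workspace.cloud.databricks.com",
            "databricks_token": {"env": "DATABRICKS_TOKEN"},
        })
    },
)
```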