# Job Management

High-level job submission, monitoring, and log retrieval through the DatabricksJobRunner. This component handles the complete lifecycle of a Databricks job, including configuration, library installation, cluster management, and execution monitoring.

## Capabilities

### DatabricksJobRunner

High-level interface for submitting and managing Databricks jobs with comprehensive configuration options and automatic monitoring.

```python { .api }
class DatabricksJobRunner:
    """Submits jobs created using Dagster config to Databricks, and monitors their progress."""

    def __init__(
        self,
        host: Optional[str] = None,
        token: Optional[str] = None,
        oauth_client_id: Optional[str] = None,
        oauth_client_secret: Optional[str] = None,
        azure_client_id: Optional[str] = None,
        azure_client_secret: Optional[str] = None,
        azure_tenant_id: Optional[str] = None,
        poll_interval_sec: float = 5,
        max_wait_time_sec: float = 86400,
    ):
        """
        Initialize the Databricks job runner.

        Parameters:
        - host: Databricks workspace URL
        - token: Personal access token for authentication
        - oauth_client_id: OAuth client ID for service principal authentication
        - oauth_client_secret: OAuth client secret for service principal authentication
        - azure_client_id: Azure service principal client ID
        - azure_client_secret: Azure service principal client secret
        - azure_tenant_id: Azure tenant ID
        - poll_interval_sec: How often to poll Databricks for run status
        - max_wait_time_sec: How long to wait for a run to complete before failing (default 24 hours)
        """
```
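
Token authentication is shown in the usage examples below; the OAuth and Azure parameters support service-principal authentication instead. A minimal sketch, assuming placeholder workspace URLs and credential values:

```python
# A minimal sketch of service-principal authentication; all credential values
# below are placeholders, not working secrets.
runner = DatabricksJobRunner(
    host="https://your-workspace.cloud.databricks.com",
    oauth_client_id="your-service-principal-client-id",
    oauth_client_secret="your-service-principal-client-secret",
)

# On Azure Databricks, use the azure_* parameters instead:
runner = DatabricksJobRunner(
    host="https://adb-1234567890123456.7.azuredatabricks.net",
    azure_client_id="your-azure-sp-client-id",
    azure_client_secret="your-azure-sp-client-secret",
    azure_tenant_id="your-azure-tenant-id",
)
```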

### Client Access

Access to the underlying DatabricksClient for advanced operations.

```python { .api }
@property
def client(self) -> DatabricksClient:
    """Return the underlying DatabricksClient object."""
```
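
Reusing the runner's configured client avoids building a second authenticated connection. A brief sketch; the `workspace_client` attribute is an assumption based on the client described in core-client.md, so verify it there:

```python
# A sketch of dropping down to the wrapped client for operations the runner
# does not expose. workspace_client (the Databricks SDK WorkspaceClient) is an
# assumption; see core-client.md for the client's actual surface.
client = runner.client
for cluster in client.workspace_client.clusters.list():
    print(cluster.cluster_id, cluster.state)
```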

### Job Submission

Submit new Databricks runs with comprehensive configuration options including cluster management, library installation, and task specification.

```python { .api }
def submit_run(self, run_config: Mapping[str, Any], task: Mapping[str, Any]) -> int:
    """
    Submit a new run using the 'Runs submit' API.

    Parameters:
    - run_config: Configuration for the run including cluster, libraries, and settings
    - task: Task specification (notebook_task, spark_python_task, etc.)

    Returns:
    int: The run ID of the submitted job

    Raises:
    DatabricksError: If job submission fails
    """
```
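
Because `submit_run` raises on failure, callers usually wrap it. A minimal sketch, assuming `DatabricksError` here is the Databricks SDK's exception type:

```python
# A minimal sketch of handling a failed submission. The import path assumes
# DatabricksError is the Databricks SDK's base error type.
from databricks.sdk.errors import DatabricksError

try:
    run_id = runner.submit_run(run_config, task)
except DatabricksError as exc:
    # Surface the workspace's error message, e.g. an invalid cluster spec.
    print(f"Submission failed: {exc}")
    raise
```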

### Log Retrieval

Retrieve execution logs from completed Databricks runs for debugging and monitoring.

```python { .api }
def retrieve_logs_for_run_id(
    self,
    log: logging.Logger,
    databricks_run_id: int
) -> Optional[tuple[Optional[str], Optional[str]]]:
    """
    Retrieve the stdout and stderr logs for a run.

    Parameters:
    - log: Logger for status messages
    - databricks_run_id: ID of the completed run

    Returns:
    Optional[tuple[Optional[str], Optional[str]]]: (stdout, stderr) logs or None if not available
    """

def wait_for_dbfs_logs(
    self,
    log: logging.Logger,
    prefix: str,
    cluster_id: str,
    filename: str,
    waiter_delay: int = 10,
    waiter_max_attempts: int = 10,
) -> Optional[str]:
    """
    Attempt to get logs from DBFS with retry logic.

    Parameters:
    - log: Logger for status messages
    - prefix: DBFS prefix path for logs
    - cluster_id: Databricks cluster ID
    - filename: Log filename (stdout/stderr)
    - waiter_delay: Delay between retry attempts in seconds
    - waiter_max_attempts: Maximum number of retry attempts

    Returns:
    Optional[str]: Log content or None if retrieval fails
    """
```
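
`wait_for_dbfs_logs` is useful when the cluster delivers logs to DBFS and the files may not appear immediately after the run finishes. A sketch; the prefix is a placeholder and must match the cluster's configured log delivery destination:

```python
# A sketch of polling DBFS for a run's stdout. The prefix below is a
# placeholder; it must match the cluster's log delivery configuration.
import logging

logger = logging.getLogger(__name__)

stdout = runner.wait_for_dbfs_logs(
    logger,
    prefix="dbfs:/cluster-logs",
    cluster_id="your-cluster-id",
    filename="stdout",
    waiter_delay=15,         # seconds between attempts
    waiter_max_attempts=20,  # allow up to 5 minutes for logs to land
)
if stdout is None:
    logger.warning("stdout log not found under the DBFS prefix")
```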

## Run Configuration Structure

The `run_config` parameter for `submit_run` supports comprehensive job configuration:

### Cluster Configuration

```python
# Using an existing cluster
run_config = {
    "cluster": {
        "existing": "cluster-id-here"
    }
}

# Using a new cluster
run_config = {
    "cluster": {
        "new": {
            "nodes": {
                "node_types": {
                    "node_type_id": "i3.xlarge",
                    "driver_node_type_id": "i3.xlarge"  # optional
                }
            },
            "size": {
                "num_workers": 2
                # OR autoscaling:
                # "autoscale": {"min_workers": 1, "max_workers": 5}
            },
            "spark_version": "11.3.x-scala2.12",
            "custom_tags": {"project": "my-project"}
        }
    }
}
```

### Library Configuration

```python
run_config = {
    "libraries": [
        {"pypi": {"package": "pandas==1.5.0"}},
        {"pypi": {"package": "numpy>=1.20.0"}},
        {"maven": {"coordinates": "org.apache.spark:spark-sql_2.12:3.3.0"}},
        {"jar": "s3://my-bucket/my-jar.jar"}
    ],
    "install_default_libraries": True  # Automatically install dagster dependencies
}
```

### Task Configuration

```python
# Notebook task
task = {
    "notebook_task": {
        "notebook_path": "/Users/user@example.com/MyNotebook",
        "base_parameters": {"param1": "value1", "param2": "value2"}
    }
}

# Python task
task = {
    "spark_python_task": {
        "python_file": "s3://my-bucket/my-script.py",
        "parameters": ["--input", "table1", "--output", "table2"]
    }
}

# JAR task
task = {
    "spark_jar_task": {
        "main_class_name": "com.example.MyMainClass",
        "parameters": ["arg1", "arg2"]
    }
}
```

## Usage Examples

### Basic Job Submission

```python
from dagster_databricks import DatabricksJobRunner

runner = DatabricksJobRunner(
    host="https://your-workspace.cloud.databricks.com",
    token="your-access-token",
    poll_interval_sec=10,
    max_wait_time_sec=3600
)

run_config = {
    "run_name": "My Dagster Job",
    "cluster": {"existing": "existing-cluster-id"},
    "libraries": [
        {"pypi": {"package": "pandas==1.5.0"}}
    ]
}

task = {
    "notebook_task": {
        "notebook_path": "/Users/user@example.com/DataProcessing",
        "base_parameters": {
            "input_table": "raw_data",
            "output_table": "processed_data"
        }
    }
}

# Submit and get run ID
run_id = runner.submit_run(run_config, task)
print(f"Submitted job with run ID: {run_id}")
```

233

234

### Advanced Configuration with New Cluster

235

236

```python
run_config = {
    "run_name": "Advanced Processing Job",
    "cluster": {
        "new": {
            "nodes": {
                "node_types": {
                    "node_type_id": "i3.xlarge",
                    "driver_node_type_id": "i3.2xlarge"
                }
            },
            "size": {
                "autoscale": {"min_workers": 1, "max_workers": 10}
            },
            "spark_version": "11.3.x-scala2.12",
            "custom_tags": {
                "project": "data-pipeline",
                "environment": "production"
            }
        }
    },
    "libraries": [
        {"pypi": {"package": "scikit-learn==1.1.0"}},
        {"pypi": {"package": "boto3==1.24.0"}}
    ],
    "timeout_seconds": 7200,  # 2 hour timeout
    "email_notifications": {
        "on_start": ["admin@company.com"],
        "on_success": ["admin@company.com"],
        "on_failure": ["admin@company.com", "oncall@company.com"]
    }
}

task = {
    "spark_python_task": {
        "python_file": "s3://my-bucket/ml-pipeline.py",
        "parameters": [
            "--model", "random-forest",
            "--data-path", "s3://data-bucket/training-data/",
            "--output-path", "s3://model-bucket/models/"
        ]
    }
}

run_id = runner.submit_run(run_config, task)
```

### Log Retrieval

```python
import logging

logger = logging.getLogger(__name__)

# Wait for job completion (automatic in submit_run)
# Then retrieve logs
logs = runner.retrieve_logs_for_run_id(logger, run_id)
if logs:
    stdout, stderr = logs
    if stdout:
        print("STDOUT:", stdout)
    if stderr:
        print("STDERR:", stderr)
else:
    print("Logs not available")
```