# Dagster Databricks

A comprehensive integration library for connecting the Dagster data orchestration framework with the Databricks analytics platform. It enables Dagster ops and assets to execute on Databricks clusters through multiple execution patterns, including a PySpark step launcher, a Databricks job runner, and Dagster Pipes integration.

## Package Information

- **Package Name**: dagster-databricks
- **Language**: Python
- **Installation**: `pip install dagster-databricks`

## Core Imports

```python
from dagster_databricks import (
    DatabricksClient,
    DatabricksError,
    DatabricksJobRunner,
    DatabricksPySparkStepLauncher,
    databricks_pyspark_step_launcher,
    PipesDatabricksClient,
    PipesDbfsContextInjector,
    PipesDbfsMessageReader,
    PipesDbfsLogReader,
    DatabricksClientResource,
    databricks_client,
    create_databricks_run_now_op,
    create_databricks_submit_run_op,
)
```

## Basic Usage

```python
from dagster import job, op
from dagster_databricks import (
    DatabricksClientResource,
    create_databricks_run_now_op,
    PipesDatabricksClient,
    PipesDbfsContextInjector,
    PipesDbfsMessageReader,
)

# Define the Databricks resource
databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",
    token="your-access-token",
)

# Create an op that triggers an existing Databricks job
run_databricks_job = create_databricks_run_now_op(
    databricks_job_id=123,
    databricks_job_configuration={
        "python_params": ["--input", "table1", "--output", "table2"]
    },
)

# Use Pipes for bidirectional communication
@op(required_resource_keys={"databricks"})
def process_with_pipes(context):
    # get_client() returns a DatabricksClient; its workspace_client property
    # exposes the underlying Databricks SDK client.
    workspace = context.resources.databricks.get_client().workspace_client
    client = PipesDatabricksClient(
        client=workspace,
        context_injector=PipesDbfsContextInjector(client=workspace),
        message_reader=PipesDbfsMessageReader(client=workspace),
    )

    return client.run(
        context=context,
        task={
            "notebook_task": {
                "notebook_path": "/path/to/notebook",
                "base_parameters": {"param1": "value1"},
            }
        },
        cluster={"existing": "cluster-id"},
    )

@job(resource_defs={"databricks": databricks_resource})
def my_databricks_job():
    run_databricks_job()
    process_with_pipes()
```

## Architecture

The dagster-databricks integration provides multiple execution patterns:

- **Direct Client API**: Low-level access to the Databricks REST API through `DatabricksClient`
- **Job Runner**: High-level job submission and monitoring via `DatabricksJobRunner`
- **Step Launcher**: Execute individual Dagster ops on Databricks clusters using `DatabricksPySparkStepLauncher`
- **Pipes Integration**: Bidirectional communication with external Databricks processes through `PipesDatabricksClient`
- **Op Factories**: Pre-built ops for common Databricks workflows using `create_databricks_run_now_op` and `create_databricks_submit_run_op`

This multi-layered approach supports workflow orchestration across Databricks environments while leaving room to choose the integration pattern that best fits each workload.

## Capabilities

### Core Client API

Low-level Databricks REST API client providing authentication, job management, file operations, and run monitoring capabilities. Supports multiple authentication methods, including personal access tokens (PAT), OAuth, and Azure service principals.

```python { .api }
class DatabricksClient:
    def __init__(
        self,
        host: Optional[str] = None,
        token: Optional[str] = None,
        oauth_client_id: Optional[str] = None,
        oauth_client_secret: Optional[str] = None,
        azure_client_id: Optional[str] = None,
        azure_client_secret: Optional[str] = None,
        azure_tenant_id: Optional[str] = None,
        workspace_id: Optional[str] = None,
    ): ...

    @property
    def workspace_client(self) -> WorkspaceClient: ...

    def read_file(self, dbfs_path: str, block_size: int = 1024**2) -> bytes: ...
    def put_file(self, file_obj: IO, dbfs_path: str, overwrite: bool = False, block_size: int = 1024**2) -> None: ...
    def get_run_state(self, databricks_run_id: int) -> DatabricksRunState: ...
    def wait_for_run_to_complete(
        self,
        logger: logging.Logger,
        databricks_run_id: int,
        poll_interval_sec: float,
        max_wait_time_sec: float,
        verbose_logs: bool = True,
    ) -> None: ...
```
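
A minimal usage sketch based on the signatures above; the host, token, DBFS path, and run ID are placeholders:

```python
import io
import logging

from dagster_databricks import DatabricksClient

# Placeholder credentials; in practice read them from env vars or a secrets manager.
client = DatabricksClient(
    host="https://your-workspace.cloud.databricks.com",
    token="your-access-token",
)

# Upload a small file to DBFS and read it back (path format is a placeholder).
client.put_file(io.BytesIO(b"hello"), "dbfs:/tmp/dagster-example.txt", overwrite=True)
contents = client.read_file("dbfs:/tmp/dagster-example.txt")

# Check on an existing run (12345 is a placeholder run ID) and wait for completion.
state = client.get_run_state(databricks_run_id=12345)
if not state.has_terminated():
    client.wait_for_run_to_complete(
        logger=logging.getLogger(__name__),
        databricks_run_id=12345,
        poll_interval_sec=10,
        max_wait_time_sec=3600,
    )
```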

[Core Client API](./core-client.md)

### Job Management

High-level job submission, monitoring, and log retrieval through `DatabricksJobRunner`. Handles job configuration, library installation, cluster management, and the execution lifecycle.

```python { .api }
class DatabricksJobRunner:
    def __init__(
        self,
        host: Optional[str] = None,
        token: Optional[str] = None,
        oauth_client_id: Optional[str] = None,
        oauth_client_secret: Optional[str] = None,
        azure_client_id: Optional[str] = None,
        azure_client_secret: Optional[str] = None,
        azure_tenant_id: Optional[str] = None,
        poll_interval_sec: float = 5,
        max_wait_time_sec: float = 86400,
    ): ...

    @property
    def client(self) -> DatabricksClient: ...

    def submit_run(self, run_config: Mapping[str, Any], task: Mapping[str, Any]) -> int: ...
    def retrieve_logs_for_run_id(self, log: logging.Logger, databricks_run_id: int) -> Optional[tuple[Optional[str], Optional[str]]]: ...
```
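
A hedged sketch of submitting and monitoring a one-off run with the runner. The exact shape of the `run_config` and `task` payloads is documented in [Job Management](./job-management.md); the keys and values below are illustrative assumptions:

```python
import logging

from dagster_databricks import DatabricksJobRunner

logger = logging.getLogger(__name__)

runner = DatabricksJobRunner(
    host="https://your-workspace.cloud.databricks.com",
    token="your-access-token",
    poll_interval_sec=10,
)

# Submit a one-off notebook run; returns the Databricks run ID.
run_id = runner.submit_run(
    run_config={
        "run_name": "example-run",
        "cluster": {"existing": "cluster-id"},  # placeholder cluster ID
    },
    task={"notebook_task": {"notebook_path": "/path/to/notebook"}},
)

# Block until the run finishes, then try to pull stdout/stderr logs.
runner.client.wait_for_run_to_complete(
    logger=logger,
    databricks_run_id=run_id,
    poll_interval_sec=10,
    max_wait_time_sec=86400,
)
logs = runner.retrieve_logs_for_run_id(logger, run_id)
```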

[Job Management](./job-management.md)

### PySpark Step Launcher

Step launcher that executes individual Dagster ops on Databricks clusters using PySpark. Provides cluster provisioning, code packaging, dependency management, and result collection.

```python { .api }
class DatabricksPySparkStepLauncher:
    """Step launcher for running PySpark steps on Databricks clusters."""

def databricks_pyspark_step_launcher(init_context: InitResourceContext) -> DatabricksPySparkStepLauncher: ...

class DatabricksConfig:
    """Configuration schema for Databricks step launcher."""
```
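
A hedged sketch of wiring the step launcher into a job so an op's compute runs on Databricks. The config keys shown (`run_config`, `databricks_host`, `databricks_token`, `local_dagster_job_package_path`, `staging_prefix`) and their values are assumptions; see [PySpark Step Launcher](./pyspark-step-launcher.md) for the authoritative schema:

```python
from dagster import job, op
from dagster_databricks import databricks_pyspark_step_launcher

# Config keys and values below are illustrative placeholders, not the verified schema.
step_launcher = databricks_pyspark_step_launcher.configured(
    {
        "run_config": {
            "run_name": "example-step",
            "cluster": {"existing": "cluster-id"},  # placeholder cluster ID
        },
        "databricks_host": "https://your-workspace.cloud.databricks.com",
        "databricks_token": "your-access-token",
        "local_dagster_job_package_path": ".",
        "staging_prefix": "/dagster-staging",
    }
)

@op(required_resource_keys={"pyspark_step_launcher"})
def heavy_transform(context):
    # With the step launcher resource attached, this op's body executes on the Databricks cluster.
    context.log.info("running on Databricks")

@job(resource_defs={"pyspark_step_launcher": step_launcher})
def databricks_backed_job():
    heavy_transform()
```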

[PySpark Step Launcher](./pyspark-step-launcher.md)

### Pipes Integration

Bidirectional communication system for executing external code on Databricks with full context injection and result collection. Supports both standard and serverless Databricks environments.

```python { .api }
class PipesDatabricksClient(BasePipesDatabricksClient):
    def __init__(
        self,
        client: WorkspaceClient,
        context_injector: Optional[PipesContextInjector] = None,
        message_reader: Optional[PipesMessageReader] = None,
        poll_interval_seconds: float = 5,
        forward_termination: bool = True,
    ): ...

    def run(
        self,
        *,
        context: Union[OpExecutionContext, AssetExecutionContext],
        extras: Optional[PipesExtras] = None,
        **kwargs,
    ): ...

class PipesDbfsContextInjector(PipesContextInjector):
    def __init__(self, *, client: WorkspaceClient): ...

class PipesDbfsMessageReader(PipesBlobStoreMessageReader):
    def __init__(
        self,
        *,
        interval: float = 10,
        client: WorkspaceClient,
        include_stdio_in_messages: bool = False,
        log_readers: Optional[Sequence[PipesLogReader]] = None,
    ): ...

class PipesDbfsLogReader(PipesChunkedLogReader):
    def __init__(
        self,
        *,
        interval: float = 10,
        remote_log_name: Literal["stdout", "stderr"],
        target_stream: TextIO,
        client: WorkspaceClient,
        debug_info: Optional[str] = None,
    ): ...
```
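
A sketch of wiring the optional DBFS log readers into the message reader so the remote driver's stdout/stderr are streamed back while the run is polled. It assumes a `databricks-sdk` `WorkspaceClient` with placeholder credentials and that cluster log delivery to DBFS is enabled on the Databricks side:

```python
import sys

from databricks.sdk import WorkspaceClient
from dagster_databricks import (
    PipesDatabricksClient,
    PipesDbfsContextInjector,
    PipesDbfsLogReader,
    PipesDbfsMessageReader,
)

workspace = WorkspaceClient(
    host="https://your-workspace.cloud.databricks.com",
    token="your-access-token",
)

pipes_client = PipesDatabricksClient(
    client=workspace,
    context_injector=PipesDbfsContextInjector(client=workspace),
    message_reader=PipesDbfsMessageReader(
        client=workspace,
        # Stream the remote driver's stdout/stderr into the local process streams.
        log_readers=[
            PipesDbfsLogReader(client=workspace, remote_log_name="stdout", target_stream=sys.stdout),
            PipesDbfsLogReader(client=workspace, remote_log_name="stderr", target_stream=sys.stderr),
        ],
    ),
)
```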

[Pipes Integration](./pipes-integration.md)

### Resource Management

Configurable resources for Databricks client management with support for multiple authentication methods and automatic credential handling.

```python { .api }
class DatabricksClientResource(ConfigurableResource):
    host: Optional[str] = None
    token: Optional[str] = None
    oauth_credentials: Optional[OauthCredentials] = None
    azure_credentials: Optional[AzureServicePrincipalCredentials] = None
    workspace_id: Optional[str] = None

    def get_client(self) -> DatabricksClient: ...

def databricks_client(init_context) -> DatabricksClient: ...

class OauthCredentials:
    client_id: str
    client_secret: str

class AzureServicePrincipalCredentials:
    azure_client_id: str
    azure_client_secret: str
    azure_tenant_id: str
```
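
A sketch of configuring the resource with OAuth credentials instead of a token and pulling the underlying SDK client inside an asset. It assumes `OauthCredentials` is importable from the package top level and uses placeholder credential values:

```python
from dagster import Definitions, asset
from dagster_databricks import DatabricksClientResource, OauthCredentials

databricks = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",
    oauth_credentials=OauthCredentials(
        client_id="your-oauth-client-id",
        client_secret="your-oauth-client-secret",
    ),
)

@asset
def cluster_inventory(databricks: DatabricksClientResource) -> list[str]:
    # get_client() returns a DatabricksClient; workspace_client exposes the Databricks SDK client.
    workspace = databricks.get_client().workspace_client
    return [c.cluster_name for c in workspace.clusters.list()]

defs = Definitions(assets=[cluster_inventory], resources={"databricks": databricks})
```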

[Resource Management](./resource-management.md)

### Op Factories

Factory functions for creating pre-configured ops that handle common Databricks workflows including running existing jobs and submitting one-time tasks.

```python { .api }
def create_databricks_run_now_op(
    databricks_job_id: int,
    databricks_job_configuration: Optional[dict] = None,
    poll_interval_seconds: float = 10,
    max_wait_time_seconds: float = 86400,
    name: Optional[str] = None,
    databricks_resource_key: str = "databricks",
) -> OpDefinition: ...

def create_databricks_submit_run_op(
    databricks_job_configuration: dict,
    poll_interval_seconds: float = 10,
    max_wait_time_seconds: float = 86400,
    name: Optional[str] = None,
    databricks_resource_key: str = "databricks",
) -> OpDefinition: ...
```
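
A sketch pairing `create_databricks_submit_run_op` with the client resource (the factory looks the resource up under `databricks_resource_key`, which defaults to `"databricks"`). The configuration dict is assumed to follow the Databricks Jobs "submit run" payload; the cluster ID and notebook path are placeholders:

```python
from dagster import job
from dagster_databricks import DatabricksClientResource, create_databricks_submit_run_op

submit_notebook_run = create_databricks_submit_run_op(
    databricks_job_configuration={
        "run_name": "one-off-notebook-run",
        "tasks": [
            {
                "task_key": "notebook",
                "notebook_task": {"notebook_path": "/path/to/notebook"},
                "existing_cluster_id": "cluster-id",  # placeholder
            }
        ],
    },
    name="submit_notebook_run",
)

@job(
    resource_defs={
        "databricks": DatabricksClientResource(
            host="https://your-workspace.cloud.databricks.com",
            token="your-access-token",
        )
    }
)
def one_off_notebook_job():
    submit_notebook_run()
```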

[Op Factories](./op-factories.md)

## Types

Core type definitions used throughout the Databricks integration:

```python { .api }
class DatabricksRunState(NamedTuple):
    life_cycle_state: Optional[DatabricksRunLifeCycleState]
    result_state: Optional[DatabricksRunResultState]
    state_message: Optional[str]

    def has_terminated(self) -> bool: ...
    def is_skipped(self) -> bool: ...
    def is_successful(self) -> bool: ...

    @classmethod
    def from_databricks(cls, run_state: jobs.RunState) -> DatabricksRunState: ...

class DatabricksRunResultState(str, Enum):
    CANCELED = "CANCELED"
    FAILED = "FAILED"
    SUCCESS = "SUCCESS"
    TIMEDOUT = "TIMEDOUT"

    def is_successful(self) -> bool: ...

class DatabricksRunLifeCycleState(str, Enum):
    BLOCKED = "BLOCKED"
    INTERNAL_ERROR = "INTERNAL_ERROR"
    QUEUED = "QUEUED"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SKIPPED = "SKIPPED"
    TERMINATED = "TERMINATED"
    TERMINATING = "TERMINATING"
    WAITING_FOR_RETRY = "WAITING_FOR_RETRY"

    def has_terminated(self) -> bool: ...
    def is_skipped(self) -> bool: ...

class DatabricksError(Exception):
    """Custom exception for Databricks-related errors."""
```
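
These types surface when polling runs directly; a small sketch of branching on a run's state, with placeholder credentials and run ID:

```python
from dagster_databricks import DatabricksClient, DatabricksError

client = DatabricksClient(
    host="https://your-workspace.cloud.databricks.com",
    token="your-access-token",
)

state = client.get_run_state(databricks_run_id=12345)  # placeholder run ID
if state.has_terminated():
    if state.is_successful():
        print("run succeeded")
    elif state.is_skipped():
        print("run was skipped")
    else:
        raise DatabricksError(f"run failed: {state.state_message}")
else:
    print(f"run still in progress: {state.life_cycle_state}")
```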