# Pipes Integration

Bidirectional communication system for executing external code on Databricks with full context injection and result collection. Dagster Pipes enables seamless integration between Dagster orchestration and external Databricks workloads, supporting both standard and serverless environments.

## Capabilities

### PipesDatabricksClient

Main client for running external code on Databricks with bidirectional communication through the Dagster Pipes protocol.

```python { .api }
class PipesDatabricksClient(BasePipesDatabricksClient):
    """Pipes client for running external code on Databricks with bidirectional communication."""

    def __init__(
        self,
        client: WorkspaceClient,
        context_injector: Optional[PipesContextInjector] = None,
        message_reader: Optional[PipesMessageReader] = None,
        poll_interval_seconds: float = 5,
        forward_termination: bool = True,
    ):
        """
        Initialize the Pipes Databricks client.

        Parameters:
        - client: Databricks WorkspaceClient for API interactions
        - context_injector: Component for injecting Dagster context into external process
        - message_reader: Component for reading messages from external process
        - poll_interval_seconds: How often to poll for job completion
        - forward_termination: Whether to forward termination signals to external process
        """

    def run(
        self,
        *,
        context: Union[OpExecutionContext, AssetExecutionContext],
        extras: Optional[PipesExtras] = None,
        **kwargs,
    ) -> PipesClientCompletedInvocation:
        """
        Execute external code on Databricks with bidirectional communication.

        Parameters:
        - context: Dagster execution context (op or asset)
        - extras: Additional context data to pass to external process
        - **kwargs: Additional arguments including task and cluster configuration

        Returns:
        PipesClientCompletedInvocation: Results from external process execution
        """
```
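
A minimal construction sketch, assuming `DATABRICKS_HOST` and `DATABRICKS_TOKEN` are set so `WorkspaceClient()` can authenticate, and using a placeholder notebook path and cluster ID. It also assumes that when `context_injector` and `message_reader` are omitted, the client falls back to the DBFS-based components described below; the `task`/`cluster` keyword convention follows the usage examples later in this document.

```python
from dagster import AssetExecutionContext, asset
from databricks.sdk import WorkspaceClient

from dagster_databricks import PipesDatabricksClient


@asset
def minimal_pipes_asset(context: AssetExecutionContext):
    # Construct the client directly from a WorkspaceClient; the context injector
    # and message reader are assumed to default to the DBFS-based components.
    pipes_client = PipesDatabricksClient(client=WorkspaceClient())

    # Run a notebook task on an existing cluster and return its results.
    return pipes_client.run(
        context=context,
        task={"notebook_task": {"notebook_path": "/Workspace/Shared/example_notebook"}},  # placeholder path
        cluster={"existing": "your-cluster-id"},  # placeholder cluster ID
    ).get_results()
```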

### Context Injection

Component for injecting Dagster context data into external Databricks processes via DBFS.

```python { .api }
class PipesDbfsContextInjector(PipesContextInjector):
    """A context injector that injects context into a Databricks job by writing a JSON file to DBFS."""

    def __init__(self, *, client: WorkspaceClient):
        """
        Initialize the DBFS context injector.

        Parameters:
        - client: Databricks WorkspaceClient for DBFS operations
        """

    def inject_context(self, context: PipesContextData) -> Iterator[PipesParams]:
        """
        Inject context to external environment by writing it to an automatically-generated
        DBFS temporary file as JSON and exposing the path to the file.

        Parameters:
        - context: The context data to inject

        Yields:
        PipesParams: Parameters that can be used by the external process to locate and
        load the injected context data
        """
```
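
On the Databricks side, the injected file is consumed with the matching `PipesDbfsContextLoader` from `dagster-pipes` (shown in full in the external script example below). A minimal sketch of that pairing:

```python
# Runs inside the Databricks job; the loader reads the DBFS path advertised
# by inject_context() and reconstructs the Dagster context.
from dagster_pipes import PipesDbfsContextLoader, open_dagster_pipes

with open_dagster_pipes(context_loader=PipesDbfsContextLoader()) as pipes:
    pipes.log.info("Dagster context loaded from DBFS")
```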

### Message Reading

Component for reading messages and logs from external Databricks processes via DBFS.

```python { .api }
class PipesDbfsMessageReader(PipesBlobStoreMessageReader):
    """Message reader that reads messages by periodically reading message chunks from an
    automatically-generated temporary directory on DBFS."""

    def __init__(
        self,
        *,
        interval: float = 10,
        client: WorkspaceClient,
        include_stdio_in_messages: bool = False,
        log_readers: Optional[Sequence[PipesLogReader]] = None,
    ):
        """
        Initialize the DBFS message reader.

        Parameters:
        - interval: Interval in seconds between attempts to download a chunk
        - client: Databricks WorkspaceClient for DBFS operations
        - include_stdio_in_messages: Whether to send stdout/stderr to Dagster via Pipes messages
        - log_readers: Additional log readers for logs on DBFS
        """

    def get_params(self) -> Iterator[PipesParams]:
        """
        Get parameters for the external process to write messages.

        Yields:
        PipesParams: Parameters including DBFS path for message writing
        """

    def messages_are_readable(self, params: PipesParams) -> bool:
        """
        Check if messages are available to read from DBFS.

        Parameters:
        - params: Parameters from get_params()

        Returns:
        bool: True if messages can be read
        """

    def download_messages_chunk(self, index: int, params: PipesParams) -> Optional[str]:
        """
        Download a specific message chunk from DBFS.

        Parameters:
        - index: Index of the message chunk to download
        - params: Parameters from get_params()

        Returns:
        Optional[str]: Message chunk content or None if not available
        """
```
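
The `get_params`, `messages_are_readable`, and `download_messages_chunk` methods are driven by the Dagster-side Pipes machinery; user code only constructs the reader and passes it to the client. Inside the Databricks job, messages are produced with the matching `PipesDbfsMessageWriter` from `dagster-pipes`, as in this short sketch (the same pattern appears in the full script example below):

```python
# Runs inside the Databricks job; log lines and reported events are written
# as message chunks to the DBFS directory advertised by get_params().
from dagster_pipes import PipesDbfsContextLoader, PipesDbfsMessageWriter, open_dagster_pipes

with open_dagster_pipes(
    context_loader=PipesDbfsContextLoader(),
    message_writer=PipesDbfsMessageWriter(),
) as pipes:
    pipes.log.info("This log line is streamed back to Dagster as a message chunk")
```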

### Log Reading

Component for reading execution logs from DBFS files, typically used for capturing stdout/stderr from Databricks clusters.

```python { .api }
class PipesDbfsLogReader(PipesChunkedLogReader):
    """Reader that reads a log file from DBFS."""

    def __init__(
        self,
        *,
        interval: float = 10,
        remote_log_name: Literal["stdout", "stderr"],
        target_stream: TextIO,
        client: WorkspaceClient,
        debug_info: Optional[str] = None,
    ):
        """
        Initialize the DBFS log reader.

        Parameters:
        - interval: Interval in seconds between attempts to download a log chunk
        - remote_log_name: The name of the log file to read ("stdout" or "stderr")
        - target_stream: The stream to which to forward log chunks that have been read
        - client: Databricks WorkspaceClient for DBFS operations
        - debug_info: Optional message containing debug information about the log reader
        """
```
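
A wiring sketch that forwards the driver's stdout and stderr into the Dagster process by attaching log readers to the message reader. `workspace_client` is a placeholder for an already-constructed `WorkspaceClient`, and this assumes the cluster's log delivery is configured to write to DBFS, since the readers poll those log files.

```python
import sys

from dagster_databricks import PipesDbfsLogReader, PipesDbfsMessageReader

message_reader = PipesDbfsMessageReader(
    client=workspace_client,
    log_readers=[
        # Mirror the remote driver stdout/stderr into the local streams.
        PipesDbfsLogReader(
            client=workspace_client, remote_log_name="stdout", target_stream=sys.stdout
        ),
        PipesDbfsLogReader(
            client=workspace_client, remote_log_name="stderr", target_stream=sys.stderr
        ),
    ],
)
```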

### Serverless Support

Client variant for Databricks serverless environments with specialized handling.

```python { .api }
class PipesDatabricksServerlessClient(BasePipesDatabricksClient):
    """Pipes client for Databricks serverless environments."""
```
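
A usage sketch, assuming the serverless client accepts a `WorkspaceClient` and exposes the same `run` interface as `PipesDatabricksClient`; the notebook path is a placeholder. Because serverless compute is managed by Databricks, no cluster configuration is passed.

```python
from dagster import AssetExecutionContext, asset
from databricks.sdk import WorkspaceClient

from dagster_databricks import PipesDatabricksServerlessClient


@asset
def serverless_databricks_asset(context: AssetExecutionContext):
    # Serverless jobs run on Databricks-managed compute, so only the task is specified.
    client = PipesDatabricksServerlessClient(client=WorkspaceClient())
    return client.run(
        context=context,
        task={"notebook_task": {"notebook_path": "/Workspace/Shared/serverless_notebook"}},  # placeholder path
    ).get_results()
```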

## Usage Examples

### Basic Pipes Setup

```python
from dagster import asset, AssetExecutionContext
from dagster_databricks import (
    PipesDatabricksClient,
    PipesDbfsContextInjector,
    PipesDbfsMessageReader,
    DatabricksClientResource,
)

databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",
    token={"env": "DATABRICKS_TOKEN"},
)


@asset(required_resource_keys={"databricks"})
def my_databricks_asset(context: AssetExecutionContext):
    client = PipesDatabricksClient(
        client=context.resources.databricks.workspace_client,
        context_injector=PipesDbfsContextInjector(
            client=context.resources.databricks.workspace_client
        ),
        message_reader=PipesDbfsMessageReader(
            client=context.resources.databricks.workspace_client
        ),
    )

    return client.run(
        context=context,
        task={
            "notebook_task": {
                "notebook_path": "/Workspace/Users/user@example.com/my_notebook",
                "base_parameters": {
                    "input_table": "raw_data",
                    "output_table": "processed_data",
                },
            }
        },
        cluster={"existing": "your-cluster-id"},
    ).get_results()
```
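
The asset above reads the workspace client from a resource bound under the key `databricks`, so the resource must be registered with that name; a minimal wiring sketch:

```python
from dagster import Definitions

# Bind the resource defined above under the "databricks" key used by the asset.
defs = Definitions(
    assets=[my_databricks_asset],
    resources={"databricks": databricks_resource},
)
```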

### Advanced Pipes Configuration

```python
@asset(required_resource_keys={"databricks"})
def advanced_databricks_processing(context: AssetExecutionContext):
    # Configure message reader with logging
    message_reader = PipesDbfsMessageReader(
        client=context.resources.databricks.workspace_client,
        interval=5,  # Check for messages every 5 seconds
        include_stdio_in_messages=True,  # Include stdout/stderr
    )

    # Configure context injector
    context_injector = PipesDbfsContextInjector(
        client=context.resources.databricks.workspace_client
    )

    client = PipesDatabricksClient(
        client=context.resources.databricks.workspace_client,
        context_injector=context_injector,
        message_reader=message_reader,
        poll_interval_seconds=10,
        forward_termination=True,
    )

    # Run with comprehensive configuration
    result = client.run(
        context=context,
        extras={"custom_param": "custom_value"},  # Additional context
        task={
            "spark_python_task": {
                "python_file": "dbfs:/FileStore/scripts/my_script.py",
                "parameters": ["--mode", "production", "--debug", "false"],
            }
        },
        cluster={
            "new": {
                "spark_version": "11.3.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 2,
                "custom_tags": {"project": "data-pipeline"},
            }
        },
        libraries=[
            {"pypi": {"package": "dagster-pipes"}},
            {"pypi": {"package": "pandas>=1.5.0"}},
        ],
        timeout_seconds=3600,
    )

    return result.get_results()
```
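
On the Databricks side, values passed through `extras` are available from the Pipes session; a short sketch of how the script launched above might read them:

```python
# Inside dbfs:/FileStore/scripts/my_script.py
from dagster_pipes import PipesDbfsContextLoader, PipesDbfsMessageWriter, open_dagster_pipes

with open_dagster_pipes(
    context_loader=PipesDbfsContextLoader(),
    message_writer=PipesDbfsMessageWriter(),
) as pipes:
    custom_param = pipes.get_extra("custom_param")  # "custom_value" from the extras above
    pipes.log.info(f"Received custom_param={custom_param}")
```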

### External Script Integration

Example of an external Python script that uses Pipes for communication:

```python
# my_script.py - runs on Databricks
from dagster_pipes import open_dagster_pipes, PipesDbfsContextLoader, PipesDbfsMessageWriter


def main():
    # Initialize Pipes communication
    with open_dagster_pipes(
        context_loader=PipesDbfsContextLoader(),
        message_writer=PipesDbfsMessageWriter(),
    ) as pipes:
        # The `pipes` object exposes the Dagster context (extras, asset key, logging)

        # Log messages back to Dagster
        pipes.log.info("Starting external processing")

        # Perform work
        import pandas as pd

        df = pd.read_parquet("/dbfs/data/input.parquet")

        # Report progress
        pipes.log.info(f"Loaded {len(df)} rows")

        # Process data
        result_df = df.groupby("category").agg({"value": "sum"})

        # Save results
        result_df.to_parquet("/dbfs/data/output.parquet")

        # Report asset materialization
        pipes.report_asset_materialization(
            asset_key="processed_data",
            metadata={
                "num_rows": len(result_df),
                "num_categories": result_df.index.nunique(),  # "category" is the groupby index
            },
        )

        # Return final results
        return {"status": "success", "rows_processed": len(df)}


if __name__ == "__main__":
    main()
```

### Notebook Integration

Example of using Pipes in a Databricks notebook:

```python
# Cell 1: Initialize Pipes
from dagster_pipes import open_dagster_pipes, PipesDbfsContextLoader, PipesDbfsMessageWriter

context_loader = PipesDbfsContextLoader()
message_writer = PipesDbfsMessageWriter()

pipes = open_dagster_pipes(
    context_loader=context_loader,
    message_writer=message_writer,
)

# Cell 2: Access Dagster context
# The `pipes` object exposes the Dagster context (extras, asset key, logging)
pipes.log.info("Notebook execution started")

# Cell 3: Data processing
import pandas as pd

# Load data
df = spark.read.table("my_catalog.my_schema.input_table").toPandas()
pipes.log.info(f"Loaded {len(df)} rows from input table")

# Process data
df["processed_value"] = df["raw_value"] * 2
result_df = df.groupby("category").agg({"processed_value": ["sum", "mean", "count"]})
# Flatten the aggregated MultiIndex columns so the frame can be written back via Spark
result_df.columns = ["_".join(col) for col in result_df.columns]
result_df = result_df.reset_index()

# Cell 4: Save and report results
# Save to Delta table
spark.createDataFrame(result_df).write.mode("overwrite").saveAsTable("my_catalog.my_schema.output_table")

# Report to Dagster
pipes.report_asset_materialization(
    asset_key="processed_output",
    metadata={
        "num_categories": len(result_df),
        "total_value": float(result_df["processed_value_sum"].sum()),
    },
)

pipes.log.info("Processing completed successfully")

# Cell 5: Cleanup - flush buffered messages and close the Pipes session
pipes.close()
```

### Error Handling

```python
@asset(required_resource_keys={"databricks"})
def robust_databricks_processing(context: AssetExecutionContext):
    client = PipesDatabricksClient(
        client=context.resources.databricks.workspace_client,
        context_injector=PipesDbfsContextInjector(
            client=context.resources.databricks.workspace_client
        ),
        message_reader=PipesDbfsMessageReader(
            client=context.resources.databricks.workspace_client,
            include_stdio_in_messages=True,  # Capture errors in logs
        ),
    )

    try:
        result = client.run(
            context=context,
            task={
                "notebook_task": {
                    "notebook_path": "/path/to/notebook",
                }
            },
            cluster={"existing": "cluster-id"},
            timeout_seconds=1800,  # 30 minute timeout
        )

        # A failed external process surfaces as an exception raised by run()
        return result.get_results()

    except Exception as e:
        context.log.error(f"Databricks execution failed: {str(e)}")
        # Optionally retrieve logs for debugging
        raise
```