# Pipes Integration

External process communication through GCP services, enabling Dagster to orchestrate and monitor workloads running outside the Dagster process. Includes clients for running workloads on Dataproc, context injectors for passing data via GCS, and message readers for collecting results and logs from external processes.

## Capabilities

### Dataproc Job Client

Pipes client for executing workloads on Google Cloud Dataproc in Job mode with full pipes protocol support.

```python { .api }
class PipesDataprocJobClient(PipesClient):
    """Pipes client for running workloads on Dataproc in Job mode."""

    def __init__(
        self,
        message_reader: PipesMessageReader,
        client: Optional[JobControllerClient] = None,
        context_injector: Optional[PipesContextInjector] = None,
        forward_termination: bool = True,
        poll_interval: float = 5.0
    ): ...

    def run(
        self,
        context,
        submit_job_params: SubmitJobParams,
        extras: Optional[dict] = None
    ) -> PipesClientCompletedInvocation:
        """
        Execute Dataproc job with pipes protocol.

        Parameters:
        - context: Dagster execution context
        - submit_job_params: Job submission parameters
        - extras: Additional parameters

        Returns:
        Completed invocation with results and metadata
        """

class SubmitJobParams(TypedDict):
    """Type definition for Dataproc job submission parameters."""
    project_id: str
    region: str
    job: dict  # Dataproc job configuration
    job_id: Optional[str]
    request_id: Optional[str]
```

### Context Injectors

Context injectors enable passing execution context and parameters to external processes via GCP services.

```python { .api }
class PipesGCSContextInjector(PipesContextInjector):
    """Injects pipes context via temporary GCS objects."""
    bucket: str  # GCS bucket name
    client: GCSClient  # GCS client
    key_prefix: Optional[str]  # Optional key prefix

    def inject_context(self, context) -> Iterator[PipesParams]:
        """
        Context manager for context injection.

        Yields:
        PipesParams containing context information for the external process
        """

    def no_messages_debug_text(self) -> str:
        """Debug text for troubleshooting when no messages received."""
```

### Message Readers

Message readers collect results, logs, and metadata from external processes via GCS.

```python { .api }
class PipesGCSMessageReader(PipesBlobStoreMessageReader):
    """Reads pipes messages from a GCS bucket."""
    interval: float = 10  # Polling interval in seconds
    bucket: str  # GCS bucket name
    client: GCSClient  # GCS client
    log_readers: Optional[Sequence[PipesLogReader]]  # Associated log readers
    include_stdio_in_messages: bool = False  # Whether to include stdout/stderr

    def get_params(self) -> dict:
        """Get parameters for message reading."""

    def messages_are_readable(self, params: dict) -> bool:
        """Check if messages are available for reading."""

    def download_messages_chunk(self, index: int, params: dict) -> dict:
        """Download a message chunk from GCS."""

    def no_messages_debug_text(self) -> str:
        """Debug text for troubleshooting when no messages received."""

class PipesGCSLogReader(PipesChunkedLogReader):
    """Reads log files from GCS with chunked streaming."""
    bucket: str  # GCS bucket name
    key: str  # Object key for logs
    client: GCSClient  # GCS client
    interval: float = 10  # Polling interval in seconds
    target_stream: IO[str]  # Target stream for output
    decode_fn: Callable[[bytes], str]  # Decoding function for logs
    debug_info: Optional[str]  # Debug information

    def target_is_readable(self, params: dict) -> bool:
        """Check if target is readable."""

    def download_log_chunk(self, params: dict) -> bytes:
        """Download a log chunk from GCS."""
```

### Utility Functions

Helper functions for log processing and decoding.

```python { .api }
def default_log_decode_fn(contents: bytes) -> str:
    """Default UTF-8 decoding for log contents."""

def gzip_log_decode_fn(contents: bytes) -> str:
    """Gzip decompression and UTF-8 decoding for compressed logs."""
```
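
For example, a Dataproc driver log that lands in GCS gzip-compressed can be streamed through a `PipesGCSLogReader` by swapping in the gzip decoder. A minimal sketch, assuming these helpers are importable from `dagster_gcp.pipes` alongside the reader (the bucket and object key below are illustrative):

```python
import sys

from dagster_gcp.pipes import PipesGCSLogReader, gzip_log_decode_fn
from google.cloud import storage

# Stream a gzip-compressed log object from GCS to stdout;
# decode_fn handles decompression and UTF-8 decoding of each chunk.
compressed_log_reader = PipesGCSLogReader(
    bucket="my-pipes-bucket",      # illustrative bucket
    key="logs/driver.log.gz",      # illustrative object key
    client=storage.Client(),
    target_stream=sys.stdout,
    decode_fn=gzip_log_decode_fn,  # use default_log_decode_fn for plain UTF-8 logs
    interval=10,
)
```

A reader configured this way is passed to `PipesGCSMessageReader` via its `log_readers` argument, as in the advanced example below.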

## Usage Examples

### Basic Dataproc Pipes Job

```python
from dagster import AssetExecutionContext, Definitions, asset
from dagster_gcp.pipes import (
    PipesDataprocJobClient,
    PipesGCSMessageReader,
    PipesGCSContextInjector
)
from google.cloud.dataproc_v1 import JobControllerClient
from google.cloud import storage

@asset
def spark_data_processing(
    context: AssetExecutionContext,
    pipes_dataproc_client: PipesDataprocJobClient
) -> dict:
    """Execute Spark job via Pipes and return results."""

    submit_job_params = {
        "project_id": "my-gcp-project",
        "region": "us-central1",
        "job": {
            "placement": {"cluster_name": "my-cluster"},
            "pyspark_job": {
                "main_python_file_uri": "gs://my-bucket/scripts/pipes_job.py",
                "args": ["--input", "gs://my-bucket/data/input.csv"]
            }
        }
    }

    return pipes_dataproc_client.run(
        context=context,
        submit_job_params=submit_job_params
    ).get_results()

defs = Definitions(
    assets=[spark_data_processing],
    resources={
        "pipes_dataproc_client": PipesDataprocJobClient(
            client=JobControllerClient(),
            context_injector=PipesGCSContextInjector(
                bucket="my-pipes-bucket",
                client=storage.Client()
            ),
            message_reader=PipesGCSMessageReader(
                bucket="my-pipes-bucket",
                client=storage.Client()
            )
        )
    }
)
```
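
The example above points Dataproc at `gs://my-bucket/scripts/pipes_job.py`. A minimal sketch of what that external script could look like, assuming the `dagster-pipes` package is installed on the cluster and provides GCS-backed counterparts to the injector and message reader configured here (the `PipesGCSContextLoader` and `PipesGCSMessageWriter` names and their `client` argument are assumptions that may differ by version):

```python
# pipes_job.py -- sketch of the external script submitted to Dataproc.
# The GCS loader/writer class names below are assumptions; check the
# dagster-pipes API for your version before relying on them.
from dagster_pipes import (
    PipesGCSContextLoader,   # assumed counterpart to PipesGCSContextInjector
    PipesGCSMessageWriter,   # assumed counterpart to PipesGCSMessageReader
    open_dagster_pipes,
)
from google.cloud import storage


def main():
    gcs_client = storage.Client()
    with open_dagster_pipes(
        context_loader=PipesGCSContextLoader(client=gcs_client),
        message_writer=PipesGCSMessageWriter(client=gcs_client),
    ) as pipes:
        pipes.log.info("Starting Spark processing")
        # ... run the actual Spark work here ...
        row_count = 42  # placeholder result
        pipes.report_asset_materialization(metadata={"row_count": row_count})


if __name__ == "__main__":
    main()
```

Messages and logs written this way are what the `PipesGCSMessageReader` on the Dagster side polls for.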

### Advanced Pipes Configuration with Logging

```python
from dagster import AssetExecutionContext, Definitions, asset, get_dagster_logger
from dagster_gcp.pipes import (
    PipesDataprocJobClient,
    PipesGCSMessageReader,
    PipesGCSContextInjector,
    PipesGCSLogReader
)
from google.cloud.dataproc_v1 import JobControllerClient
from google.cloud import storage
import sys

@asset
def ml_training_pipeline(
    context: AssetExecutionContext,
    pipes_dataproc_client: PipesDataprocJobClient
) -> dict:
    """Execute ML training pipeline with comprehensive logging."""

    # Configure job with custom cluster
    submit_job_params = {
        "project_id": "my-ml-project",
        "region": "us-central1",
        "job": {
            "placement": {"cluster_name": "ml-training-cluster"},
            "pyspark_job": {
                "main_python_file_uri": "gs://my-ml-bucket/training/train_model.py",
                "python_file_uris": [
                    "gs://my-ml-bucket/training/data_utils.py",
                    "gs://my-ml-bucket/training/model_utils.py"
                ],
                "args": [
                    "--data-path", "gs://my-ml-bucket/datasets/",
                    "--model-output", "gs://my-ml-bucket/models/",
                    "--epochs", "100"
                ]
            }
        }
    }

    result = pipes_dataproc_client.run(
        context=context,
        submit_job_params=submit_job_params,
        extras={"model_version": "v2.1"}
    )

    return {
        "model_metrics": result.get_results(),
        "training_logs": result.get_metadata(),
        "job_duration": result.duration
    }

# Configure with log readers
log_reader = PipesGCSLogReader(
    bucket="my-ml-bucket",
    key="logs/training.log",
    client=storage.Client(),
    target_stream=sys.stdout,
    interval=5.0
)

defs = Definitions(
    assets=[ml_training_pipeline],
    resources={
        "pipes_dataproc_client": PipesDataprocJobClient(
            client=JobControllerClient(),
            context_injector=PipesGCSContextInjector(
                bucket="my-ml-bucket",
                key_prefix="pipes/context",
                client=storage.Client()
            ),
            message_reader=PipesGCSMessageReader(
                bucket="my-ml-bucket",
                client=storage.Client(),
                log_readers=[log_reader],
                include_stdio_in_messages=True,
                interval=5.0
            ),
            forward_termination=True,
            poll_interval=10.0
        )
    }
)
```

### Multiple External Processes

```python
from dagster import AssetExecutionContext, AssetOut, multi_asset
from dagster_gcp.pipes import PipesDataprocJobClient

@multi_asset(
    outs={
        "processed_data": AssetOut(),
        "model_predictions": AssetOut(),
        "quality_metrics": AssetOut()
    }
)
def batch_ml_pipeline(
    context: AssetExecutionContext,
    pipes_dataproc_client: PipesDataprocJobClient
) -> tuple:
    """Execute multiple ML tasks via Pipes (the jobs below run sequentially)."""

    # Data processing job
    processing_params = {
        "project_id": "my-project",
        "region": "us-central1",
        "job": {
            "placement": {"cluster_name": "processing-cluster"},
            "pyspark_job": {
                "main_python_file_uri": "gs://my-bucket/jobs/data_processing.py"
            }
        }
    }

    # Model inference job
    inference_params = {
        "project_id": "my-project",
        "region": "us-central1",
        "job": {
            "placement": {"cluster_name": "inference-cluster"},
            "pyspark_job": {
                "main_python_file_uri": "gs://my-bucket/jobs/model_inference.py"
            }
        }
    }

    # Quality assessment job
    quality_params = {
        "project_id": "my-project",
        "region": "us-central1",
        "job": {
            "placement": {"cluster_name": "quality-cluster"},
            "pyspark_job": {
                "main_python_file_uri": "gs://my-bucket/jobs/quality_assessment.py"
            }
        }
    }

    # Execute jobs and collect results
    processing_result = pipes_dataproc_client.run(context, processing_params)
    inference_result = pipes_dataproc_client.run(context, inference_params)
    quality_result = pipes_dataproc_client.run(context, quality_params)

    return (
        processing_result.get_results(),
        inference_result.get_results(),
        quality_result.get_results()
    )
```

### Custom Context Injection

```python
from dagster import AssetExecutionContext, Config, asset
from dagster_gcp.pipes import PipesDataprocJobClient

class DataProcessingConfig(Config):
    batch_date: str
    processing_mode: str
    output_format: str

@asset
def daily_data_processing(
    context: AssetExecutionContext,
    pipes_dataproc_client: PipesDataprocJobClient,
    config: DataProcessingConfig
) -> dict:
    """Process daily data with custom configuration."""

    # Config values are forwarded to the external job as CLI arguments; the
    # configured context injector separately makes the Dagster run context
    # available to the external process.
    submit_job_params = {
        "project_id": "my-project",
        "region": "us-central1",
        "job": {
            "placement": {"cluster_name": "daily-processing"},
            "pyspark_job": {
                "main_python_file_uri": "gs://my-bucket/jobs/daily_processor.py",
                "args": [
                    "--batch-date", config.batch_date,
                    "--mode", config.processing_mode,
                    "--format", config.output_format
                ]
            }
        }
    }

    return pipes_dataproc_client.run(
        context=context,
        submit_job_params=submit_job_params
    ).get_results()
```
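
An alternative to flattening configuration into CLI arguments is to pass it through the `extras` parameter of `run`, documented above, so it travels with the injected context. A brief sketch that reuses the names from the example above; the `pipes.extras` access on the external side is an assumption about the `dagster-pipes` context object and is shown only as comments:

```python
# Dagster side: forward config through the pipes context instead of CLI args.
result = pipes_dataproc_client.run(
    context=context,
    submit_job_params=submit_job_params,
    extras={
        "batch_date": config.batch_date,
        "processing_mode": config.processing_mode,
        "output_format": config.output_format,
    },
)

# External side (inside the Dataproc job) -- sketch, names are assumptions:
#   from dagster_pipes import open_dagster_pipes
#   with open_dagster_pipes(...) as pipes:
#       batch_date = pipes.extras["batch_date"]
```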

### Error Handling and Debugging

```python
from dagster import AssetExecutionContext, Failure, asset, get_dagster_logger
from dagster_gcp.pipes import PipesDataprocJobClient

@asset
def robust_data_pipeline(
    context: AssetExecutionContext,
    pipes_dataproc_client: PipesDataprocJobClient
) -> dict:
    """Data pipeline with comprehensive error handling."""

    logger = get_dagster_logger()

    try:
        submit_job_params = {
            "project_id": "my-project",
            "region": "us-central1",
            "job": {
                "placement": {"cluster_name": "robust-cluster"},
                "pyspark_job": {
                    "main_python_file_uri": "gs://my-bucket/jobs/robust_pipeline.py"
                }
            }
        }

        result = pipes_dataproc_client.run(
            context=context,
            submit_job_params=submit_job_params
        )

        if not result.success:
            logger.error(f"Pipeline failed: {result.get_metadata()}")
            raise Failure("Data pipeline execution failed")

        logger.info(f"Pipeline completed successfully: {result.get_results()}")
        return result.get_results()

    except Failure:
        # Let intentional failures propagate unchanged.
        raise

    except Exception as e:
        logger.error(f"Unexpected error in pipeline: {str(e)}")

        # Get debug information
        debug_text = pipes_dataproc_client.context_injector.no_messages_debug_text()
        logger.error(f"Debug info: {debug_text}")

        raise Failure(f"Pipeline failed with error: {str(e)}")
```