# Dataproc Integration

Apache Spark cluster management and job execution through Google Cloud Dataproc. Provides resources for cluster lifecycle management, operations for submitting and monitoring Spark jobs, and configuration support for cluster and job parameters.

## Capabilities

### Dataproc Resource

Configurable resource for Dataproc cluster management and client access.

```python { .api }
class DataprocResource(ConfigurableResource):
    """Resource for Dataproc cluster management."""
    project_id: str  # GCP project ID
    region: str  # GCP region
    cluster_name: str  # Cluster name
    labels: Optional[dict[str, str]]  # Cluster labels
    cluster_config_yaml_path: Optional[str]  # Path to YAML config
    cluster_config_json_path: Optional[str]  # Path to JSON config
    cluster_config_dict: Optional[dict]  # Inline cluster config

    def get_client(self) -> DataprocClient:
        """Create Dataproc client."""

@resource(
    config_schema=define_dataproc_create_cluster_config(),
    description="Manage a Dataproc cluster resource"
)
def dataproc_resource(context) -> DataprocClient:
    """Legacy Dataproc resource factory that returns a DataprocClient."""
```
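
A minimal sketch of constructing the resource and obtaining a client, using only the fields and `get_client()` method listed above; the project, region, cluster name, and labels are placeholder values.

```python
from dagster_gcp import DataprocResource

# Placeholder project, region, and cluster values.
dataproc = DataprocResource(
    project_id="my-gcp-project",
    region="us-central1",
    cluster_name="example-cluster",
    labels={"team": "data-eng"},
)

# The returned DataprocClient exposes the lower-level cluster and job methods.
client = dataproc.get_client()
```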

### Dataproc Client

Lower-level client for direct Dataproc API interactions.

```python { .api }
class DataprocClient:
    """Lower-level client for Dataproc API interactions."""

    def create_cluster(self) -> None:
        """Create Dataproc cluster."""

    def delete_cluster(self) -> None:
        """Delete Dataproc cluster."""

    def submit_job(self, job_details: dict) -> str:
        """
        Submit job to cluster.

        Parameters:
        - job_details: Job configuration dictionary

        Returns:
            Job ID string
        """

    def get_job(self, job_id: str) -> dict:
        """Get job status and details."""

    def wait_for_job(self, job_id: str, wait_timeout: int) -> dict:
        """Wait for job completion with timeout."""

    def cluster_context_manager(self):
        """Context manager for temporary clusters."""
```
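
A brief sketch of running a job on a temporary cluster through `cluster_context_manager()`, using the client methods listed above. The script URI is a placeholder, and the exact shape of the `job_details` dict is an assumption; consult the Dataproc jobs API for the full schema.

```python
from dagster_gcp import DataprocResource

client = DataprocResource(
    project_id="my-gcp-project",
    region="us-central1",
    cluster_name="ephemeral-cluster",
).get_client()

# The context manager handles the temporary cluster:
# create on entry, tear down on exit.
with client.cluster_context_manager():
    # Placeholder PySpark job; the job_details schema is assumed here.
    job_id = client.submit_job(
        {"pyspark_job": {"main_python_file_uri": "gs://my-bucket/scripts/report.py"}}
    )
    result = client.wait_for_job(job_id, wait_timeout=1800)
```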

### Operations

Operations for executing jobs on Dataproc clusters.

```python { .api }
class DataprocOpConfig(Config):
    """Configuration class for Dataproc operations."""
    job_timeout_in_seconds: int = 1200  # Job timeout
    job_scoped_cluster: bool = True  # Whether to create temporary cluster
    project_id: str  # GCP project ID
    region: str  # GCP region
    job_config: dict[str, Any]  # Dataproc job configuration

@op(
    required_resource_keys={"dataproc"},
    config_schema=DATAPROC_CONFIG_SCHEMA
)
def dataproc_op(context) -> Any:
    """Legacy op for executing Dataproc jobs."""

@op
def configurable_dataproc_op(
    dataproc: DataprocResource,
    config: DataprocOpConfig
) -> Any:
    """
    Modern configurable op for executing Dataproc jobs.

    Parameters:
    - dataproc: Dataproc resource
    - config: Operation configuration
    """
```
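
A minimal sketch of calling `configurable_dataproc_op` inside a job and supplying `DataprocOpConfig` at launch time; it assumes Dagster's `RunConfig` and `execute_in_process` helpers, and the project, bucket, and cluster names are placeholders.

```python
from dagster import RunConfig, job
from dagster_gcp import DataprocOpConfig, DataprocResource, configurable_dataproc_op

@job
def wordcount_job():
    configurable_dataproc_op()

# Provide the op config and the "dataproc" resource when launching the run.
result = wordcount_job.execute_in_process(
    run_config=RunConfig(
        ops={
            "configurable_dataproc_op": DataprocOpConfig(
                project_id="my-gcp-project",
                region="us-central1",
                job_config={
                    "pyspark_job": {
                        "main_python_file_uri": "gs://my-bucket/scripts/wordcount.py"
                    }
                },
            )
        }
    ),
    resources={
        "dataproc": DataprocResource(
            project_id="my-gcp-project",
            region="us-central1",
            cluster_name="wordcount-cluster",
        )
    },
)
```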

### Configuration Functions

Functions for defining Dataproc configuration schemas.

```python { .api }
def define_dataproc_create_cluster_config() -> ConfigSchema:
    """Configuration schema for cluster creation."""

def define_dataproc_submit_job_config() -> ConfigSchema:
    """Configuration schema for job submission."""
```
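
A sketch of reusing one of these schema builders in a legacy-style op, mirroring the `@resource` usage shown under Dataproc Resource. The op name and body are hypothetical, and it assumes `define_dataproc_submit_job_config` is importable from `dagster_gcp` and that the resulting op config can be forwarded to `submit_job` as-is.

```python
from dagster import op
from dagster_gcp import define_dataproc_submit_job_config

# Hypothetical op whose config is validated against the package's
# job-submission schema.
@op(
    required_resource_keys={"dataproc"},
    config_schema=define_dataproc_submit_job_config(),
)
def submit_dataproc_job(context):
    # The legacy "dataproc" resource yields a DataprocClient (see above);
    # forwarding op_config directly is an assumption.
    job_id = context.resources.dataproc.submit_job(context.op_config)
    return context.resources.dataproc.wait_for_job(job_id, wait_timeout=1200)
```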

### Types and Exceptions

Dataproc-specific types and error handling.

```python { .api }
class DataprocError(Exception):
    """Exception class for Dataproc-related errors."""
```
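
A short sketch of catching `DataprocError` around submission and polling, assuming the exception is raised by the client methods above and is importable from `dagster_gcp`; the retry count and timeout are placeholders.

```python
from dagster_gcp import DataprocError

def submit_with_retry(client, job_details: dict, attempts: int = 2) -> dict:
    """Submit a Dataproc job, retrying on Dataproc-specific failures."""
    last_error = None
    for _ in range(attempts):
        try:
            job_id = client.submit_job(job_details)
            return client.wait_for_job(job_id, wait_timeout=1800)
        except DataprocError as err:
            last_error = err
    raise last_error
```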

## Usage Examples

### Basic Spark Job Execution

```python
from dagster import op, job, Definitions
from dagster_gcp import DataprocResource, configurable_dataproc_op, DataprocOpConfig

@configurable_dataproc_op
def run_spark_analysis(dataproc: DataprocResource, config: DataprocOpConfig):
    """Execute Spark job for data analysis."""
    pass  # Job execution handled by the op decorator

@job
def spark_analysis_job():
    run_spark_analysis()

defs = Definitions(
    jobs=[spark_analysis_job],
    resources={
        "dataproc": DataprocResource(
            project_id="my-gcp-project",
            region="us-central1",
            cluster_name="analysis-cluster"
        )
    },
    ops=[
        run_spark_analysis.configured(
            DataprocOpConfig(
                project_id="my-gcp-project",
                region="us-central1",
                job_config={
                    "pyspark_job": {
                        "main_python_file_uri": "gs://my-bucket/scripts/analysis.py",
                        "args": ["--input", "gs://my-bucket/data/", "--output", "gs://my-bucket/results/"]
                    }
                }
            ),
            name="spark_analysis"
        )
    ]
)
```

### Cluster with Custom Configuration

```python
from dagster import Definitions
from dagster_gcp import DataprocResource

# Define cluster configuration
cluster_config = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {
            "boot_disk_type": "pd-standard",
            "boot_disk_size_gb": 100
        }
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {
            "boot_disk_type": "pd-standard",
            "boot_disk_size_gb": 100
        }
    },
    "software_config": {
        "image_version": "2.0-debian10",
        "properties": {
            "spark:spark.sql.adaptive.enabled": "true",
            "spark:spark.sql.adaptive.coalescePartitions.enabled": "true"
        }
    }
}

defs = Definitions(
    resources={
        "dataproc": DataprocResource(
            project_id="my-gcp-project",
            region="us-central1",
            cluster_name="custom-cluster",
            cluster_config_dict=cluster_config,
            labels={"environment": "production", "team": "data-eng"}
        )
    }
)
```
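
The same cluster definition can be kept outside Python and referenced by path via the `cluster_config_yaml_path` field shown in the Dataproc Resource section; the file path below is a placeholder.

```python
from dagster_gcp import DataprocResource

# Load the cluster definition from a YAML file instead of an inline dict.
dataproc = DataprocResource(
    project_id="my-gcp-project",
    region="us-central1",
    cluster_name="custom-cluster",
    cluster_config_yaml_path="configs/dataproc_cluster.yaml",  # placeholder path
)
```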

### PySpark Job with Dependencies

```python
from dagster import op, job, Config, Definitions
from dagster_gcp import DataprocResource, configurable_dataproc_op, DataprocOpConfig

class SparkJobConfig(Config):
    input_path: str
    output_path: str
    num_partitions: int = 10

@configurable_dataproc_op
def process_large_dataset(
    dataproc: DataprocResource,
    config: DataprocOpConfig,
    job_config: SparkJobConfig
):
    """Process large dataset with PySpark."""
    pass

@job
def etl_pipeline():
    process_large_dataset()

defs = Definitions(
    jobs=[etl_pipeline],
    resources={
        "dataproc": DataprocResource(
            project_id="my-gcp-project",
            region="us-central1",
            cluster_name="etl-cluster"
        )
    },
    ops=[
        process_large_dataset.configured(
            {
                "dataproc_config": DataprocOpConfig(
                    project_id="my-gcp-project",
                    region="us-central1",
                    job_config={
                        "pyspark_job": {
                            "main_python_file_uri": "gs://my-bucket/scripts/etl.py",
                            "python_file_uris": [
                                "gs://my-bucket/scripts/utils.py",
                                "gs://my-bucket/scripts/transforms.py"
                            ],
                            "jar_file_uris": [
                                "gs://my-bucket/jars/spark-bigquery-connector.jar"
                            ],
                            "args": [
                                "--input", "gs://my-bucket/raw-data/",
                                "--output", "gs://my-bucket/processed-data/",
                                "--partitions", "20"
                            ]
                        }
                    },
                    job_timeout_in_seconds=3600
                ),
                "job_config": SparkJobConfig(
                    input_path="gs://my-bucket/raw-data/",
                    output_path="gs://my-bucket/processed-data/",
                    num_partitions=20
                )
            },
            name="large_dataset_processor"
        )
    ]
)
```

### Temporary Cluster for Job

```python
from dagster import op, job
from dagster_gcp import DataprocResource, configurable_dataproc_op, DataprocOpConfig

@configurable_dataproc_op
def batch_processing_job(dataproc: DataprocResource, config: DataprocOpConfig):
    """Run batch processing on temporary cluster."""
    pass

@job
def nightly_batch_job():
    batch_processing_job()

# Configuration with temporary cluster
batch_config = DataprocOpConfig(
    project_id="my-gcp-project",
    region="us-central1",
    job_scoped_cluster=True,  # Creates temporary cluster
    job_timeout_in_seconds=7200,  # 2 hours
    job_config={
        "pyspark_job": {
            "main_python_file_uri": "gs://my-bucket/batch/nightly_process.py"
        }
    }
)
```
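
Continuing the example above, one way `batch_config` might be attached to the op at launch is via run config; this sketch assumes Dagster's `RunConfig` and `execute_in_process` helpers accept the config object keyed by op name, and the cluster name is a placeholder.

```python
from dagster import RunConfig
from dagster_gcp import DataprocResource

# Supply batch_config (and the dataproc resource) when launching the job.
result = nightly_batch_job.execute_in_process(
    run_config=RunConfig(ops={"batch_processing_job": batch_config}),
    resources={
        "dataproc": DataprocResource(
            project_id="my-gcp-project",
            region="us-central1",
            cluster_name="nightly-batch-cluster",  # placeholder name
        )
    },
)
```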

### Direct Client Usage

312

313

```python

314

from dagster import op, In

315

from dagster_gcp import DataprocResource

316

317

@op

318

def submit_custom_job(dataproc: DataprocResource, job_details: dict):

319

client = dataproc.get_client()

320

321

# Submit job and get job ID

322

job_id = client.submit_job(job_details)

323

324

# Wait for completion

325

result = client.wait_for_job(job_id, wait_timeout=1800)

326

327

return {

328

"job_id": job_id,

329

"status": result.get("status"),

330

"output_uri": result.get("driver_output_resource_uri")

331

}

332

```