# Dataproc Integration

Apache Spark cluster management and job execution through Google Cloud Dataproc. Provides resources for Dataproc cluster lifecycle management, operations for submitting and monitoring Spark jobs, and configuration support for cluster and job parameters.

## Capabilities

### Dataproc Resource

Configurable resource for Dataproc cluster management and client access.

```python { .api }
class DataprocResource(ConfigurableResource):
    """Resource for Dataproc cluster management."""

    project_id: str  # GCP project ID
    region: str  # GCP region
    cluster_name: str  # Cluster name
    labels: Optional[dict[str, str]]  # Cluster labels
    cluster_config_yaml_path: Optional[str]  # Path to YAML config
    cluster_config_json_path: Optional[str]  # Path to JSON config
    cluster_config_dict: Optional[dict]  # Inline cluster config

    def get_client(self) -> DataprocClient:
        """Create Dataproc client."""

@resource(
    config_schema=define_dataproc_create_cluster_config(),
    description="Manage a Dataproc cluster resource"
)
def dataproc_resource(context) -> DataprocClient:
    """Legacy Dataproc resource factory that returns a DataprocClient."""
```

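For orientation, the following minimal sketch constructs the resource and binds it under the `dataproc` key that the ops in this integration expect; the project, region, cluster name, and labels are placeholder values.

```python
from dagster import Definitions
from dagster_gcp import DataprocResource

defs = Definitions(
    resources={
        "dataproc": DataprocResource(
            project_id="my-gcp-project",      # GCP project ID (placeholder)
            region="us-central1",             # GCP region (placeholder)
            cluster_name="analysis-cluster",  # Cluster name (placeholder)
            labels={"team": "data-eng"},      # Optional cluster labels
        )
    },
)
```
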
### Dataproc Client

Lower-level client for direct Dataproc API interactions.

```python { .api }
class DataprocClient:
    """Lower-level client for Dataproc API interactions."""

    def create_cluster(self) -> None:
        """Create Dataproc cluster."""

    def delete_cluster(self) -> None:
        """Delete Dataproc cluster."""

    def submit_job(self, job_details: dict) -> str:
        """
        Submit job to cluster.

        Parameters:
        - job_details: Job configuration dictionary

        Returns:
        Job ID string
        """

    def get_job(self, job_id: str) -> dict:
        """Get job status and details."""

    def wait_for_job(self, job_id: str, wait_timeout: int) -> dict:
        """Wait for job completion with timeout."""

    def cluster_context_manager(self):
        """Context manager for temporary clusters."""
```

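The client methods compose into a create-run-tear-down flow. The sketch below is one plausible use of `cluster_context_manager`, assuming the cluster exists inside the `with` block and is deleted on exit; the job details and timeout are placeholders.

```python
from dagster_gcp import DataprocResource

def run_job_on_temporary_cluster(dataproc: DataprocResource, job_details: dict) -> dict:
    client = dataproc.get_client()
    # Assumed semantics: the cluster is created on entry and deleted on exit.
    with client.cluster_context_manager():
        job_id = client.submit_job(job_details)
        return client.wait_for_job(job_id, wait_timeout=1800)
```
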
### Operations

Operations for executing jobs on Dataproc clusters.

```python { .api }
class DataprocOpConfig(Config):
    """Configuration class for Dataproc operations."""

    job_timeout_in_seconds: int = 1200  # Job timeout
    job_scoped_cluster: bool = True  # Whether to create a temporary cluster
    project_id: str  # GCP project ID
    region: str  # GCP region
    job_config: dict[str, Any]  # Dataproc job configuration

@op(
    required_resource_keys={"dataproc"},
    config_schema=DATAPROC_CONFIG_SCHEMA
)
def dataproc_op(context) -> Any:
    """Legacy op for executing Dataproc jobs."""

@op
def configurable_dataproc_op(
    dataproc: DataprocResource,
    config: DataprocOpConfig
) -> Any:
    """
    Modern configurable op for executing Dataproc jobs.

    Parameters:
    - dataproc: Dataproc resource
    - config: Operation configuration
    """
```

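Only the modern `configurable_dataproc_op` appears in the usage examples below; for completeness, here is a minimal sketch of wiring the legacy pair, assuming `dataproc_op` and `dataproc_resource` are exported from `dagster_gcp` and configured via run config matching `DATAPROC_CONFIG_SCHEMA`.

```python
from dagster import job
from dagster_gcp import dataproc_op, dataproc_resource

# Legacy, config-schema based wiring: both the resource and the op are configured
# through run config whose shape follows define_dataproc_create_cluster_config()
# and DATAPROC_CONFIG_SCHEMA.
@job(resource_defs={"dataproc": dataproc_resource})
def legacy_dataproc_job():
    dataproc_op()
```
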
### Configuration Functions

Functions for defining Dataproc configuration schemas.

```python { .api }
def define_dataproc_create_cluster_config() -> ConfigSchema:
    """Configuration schema for cluster creation."""

def define_dataproc_submit_job_config() -> ConfigSchema:
    """Configuration schema for job submission."""
```

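These factories back the legacy resource and op shown earlier; a custom legacy-style op can reuse them as well. The sketch below is hypothetical: it assumes `define_dataproc_submit_job_config` is importable from `dagster_gcp` and that the validated op config can be passed directly to the client's `submit_job`.

```python
from dagster import op
from dagster_gcp import define_dataproc_submit_job_config  # import location is an assumption

@op(
    required_resource_keys={"dataproc"},
    config_schema=define_dataproc_submit_job_config(),
)
def submit_preconfigured_job(context) -> str:
    # The legacy resource provides a DataprocClient; the validated op config is used
    # as the job payload (an assumption for this sketch).
    return context.resources.dataproc.submit_job(context.op_config)
```
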
### Types and Exceptions

Dataproc-specific types and error handling.

```python { .api }
class DataprocError(Exception):
    """Exception class for Dataproc-related errors."""
```

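A sketch of defensive use, assuming `DataprocError` is raised when a job fails or times out; the timeout and message wording are placeholders.

```python
from dagster import op
from dagster_gcp import DataprocError, DataprocResource

@op
def submit_with_error_handling(dataproc: DataprocResource, job_details: dict) -> str:
    client = dataproc.get_client()
    job_id = client.submit_job(job_details)
    try:
        client.wait_for_job(job_id, wait_timeout=1800)
    except DataprocError as err:
        # Re-raise with enough context to locate the job in the Dataproc console.
        raise DataprocError(f"Dataproc job {job_id} failed or timed out: {err}") from err
    return job_id
```
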
## Usage Examples

### Basic Spark Job Execution

```python
from dagster import Definitions, RunConfig, job
from dagster_gcp import DataprocOpConfig, DataprocResource, configurable_dataproc_op

# Spark job parameters, supplied as the op's default run config
spark_analysis_config = DataprocOpConfig(
    project_id="my-gcp-project",
    region="us-central1",
    job_config={
        "pyspark_job": {
            "main_python_file_uri": "gs://my-bucket/scripts/analysis.py",
            "args": ["--input", "gs://my-bucket/data/", "--output", "gs://my-bucket/results/"],
        }
    },
)

@job(config=RunConfig(ops={"configurable_dataproc_op": spark_analysis_config}))
def spark_analysis_job():
    """Execute a Spark job for data analysis."""
    # The op submits the configured job and waits for it to finish.
    configurable_dataproc_op()

defs = Definitions(
    jobs=[spark_analysis_job],
    resources={
        "dataproc": DataprocResource(
            project_id="my-gcp-project",
            region="us-central1",
            cluster_name="analysis-cluster",
        )
    },
)
```

### Cluster with Custom Configuration

```python
from dagster import Definitions
from dagster_gcp import DataprocResource

# Define cluster configuration
cluster_config = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {
            "boot_disk_type": "pd-standard",
            "boot_disk_size_gb": 100,
        },
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {
            "boot_disk_type": "pd-standard",
            "boot_disk_size_gb": 100,
        },
    },
    "software_config": {
        "image_version": "2.0-debian10",
        "properties": {
            "spark:spark.sql.adaptive.enabled": "true",
            "spark:spark.sql.adaptive.coalescePartitions.enabled": "true",
        },
    },
}

defs = Definitions(
    resources={
        "dataproc": DataprocResource(
            project_id="my-gcp-project",
            region="us-central1",
            cluster_name="custom-cluster",
            cluster_config_dict=cluster_config,
            labels={"environment": "production", "team": "data-eng"},
        )
    }
)
```

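The same cluster shape can live outside Python and be referenced through `cluster_config_yaml_path` instead of `cluster_config_dict`. This sketch assumes the YAML file mirrors the dictionary above one-to-one; the file path is a placeholder.

```python
from dagster import Definitions
from dagster_gcp import DataprocResource

defs = Definitions(
    resources={
        "dataproc": DataprocResource(
            project_id="my-gcp-project",
            region="us-central1",
            cluster_name="custom-cluster",
            # Assumed: the YAML file at this placeholder path contains the same keys as
            # cluster_config_dict above (master_config, worker_config, software_config).
            cluster_config_yaml_path="configs/custom_cluster.yaml",
            labels={"environment": "production", "team": "data-eng"},
        )
    }
)
```
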
### PySpark Job with Dependencies

```python
from dagster import Config, Definitions, RunConfig, job
from dagster_gcp import DataprocOpConfig, DataprocResource, configurable_dataproc_op

class SparkJobConfig(Config):
    input_path: str
    output_path: str
    num_partitions: int = 10

def build_etl_op_config(spark: SparkJobConfig) -> DataprocOpConfig:
    """Translate high-level Spark settings into the Dataproc op configuration."""
    return DataprocOpConfig(
        project_id="my-gcp-project",
        region="us-central1",
        job_timeout_in_seconds=3600,
        job_config={
            "pyspark_job": {
                "main_python_file_uri": "gs://my-bucket/scripts/etl.py",
                "python_file_uris": [
                    "gs://my-bucket/scripts/utils.py",
                    "gs://my-bucket/scripts/transforms.py",
                ],
                "jar_file_uris": [
                    "gs://my-bucket/jars/spark-bigquery-connector.jar",
                ],
                "args": [
                    "--input", spark.input_path,
                    "--output", spark.output_path,
                    "--partitions", str(spark.num_partitions),
                ],
            }
        },
    )

etl_op_config = build_etl_op_config(
    SparkJobConfig(
        input_path="gs://my-bucket/raw-data/",
        output_path="gs://my-bucket/processed-data/",
        num_partitions=20,
    )
)

@job(config=RunConfig(ops={"configurable_dataproc_op": etl_op_config}))
def etl_pipeline():
    """Process a large dataset with PySpark on Dataproc."""
    configurable_dataproc_op()

defs = Definitions(
    jobs=[etl_pipeline],
    resources={
        "dataproc": DataprocResource(
            project_id="my-gcp-project",
            region="us-central1",
            cluster_name="etl-cluster",
        )
    },
)
```

### Temporary Cluster for Job

```python
from dagster import RunConfig, job
from dagster_gcp import DataprocOpConfig, configurable_dataproc_op

# Configuration that runs the job on a temporary, job-scoped cluster
batch_config = DataprocOpConfig(
    project_id="my-gcp-project",
    region="us-central1",
    job_scoped_cluster=True,  # Creates a temporary cluster and tears it down afterwards
    job_timeout_in_seconds=7200,  # 2 hours
    job_config={
        "pyspark_job": {
            "main_python_file_uri": "gs://my-bucket/batch/nightly_process.py"
        }
    },
)

@job(config=RunConfig(ops={"configurable_dataproc_op": batch_config}))
def nightly_batch_job():
    """Run batch processing on a temporary cluster."""
    configurable_dataproc_op()

# The "dataproc" resource must still be provided in Definitions, as in the earlier examples.
```

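Since this job is intended to run nightly, one way to trigger it is a standard Dagster schedule; the cron expression and cluster name below are example values.

```python
from dagster import Definitions, ScheduleDefinition
from dagster_gcp import DataprocResource

# Example schedule: run the batch job every night at 02:00 (in the schedule's timezone).
nightly_schedule = ScheduleDefinition(job=nightly_batch_job, cron_schedule="0 2 * * *")

defs = Definitions(
    jobs=[nightly_batch_job],
    schedules=[nightly_schedule],
    resources={
        "dataproc": DataprocResource(
            project_id="my-gcp-project",
            region="us-central1",
            cluster_name="nightly-batch-cluster",  # example name
        )
    },
)
```
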
### Direct Client Usage

```python
from dagster import op
from dagster_gcp import DataprocResource

@op
def submit_custom_job(dataproc: DataprocResource, job_details: dict) -> dict:
    client = dataproc.get_client()

    # Submit job and get job ID
    job_id = client.submit_job(job_details)

    # Wait for completion
    result = client.wait_for_job(job_id, wait_timeout=1800)

    return {
        "job_id": job_id,
        "status": result.get("status"),
        "output_uri": result.get("driver_output_resource_uri"),
    }
```

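For a non-blocking variant, `get_job` can be polled instead of `wait_for_job`. The sketch below assumes the returned dictionary follows the Dataproc Job resource shape, so the `status`/`state` keys are assumptions rather than a documented contract.

```python
from dagster import op
from dagster_gcp import DataprocResource

@op
def check_job_status(dataproc: DataprocResource, job_id: str) -> str:
    client = dataproc.get_client()
    job = client.get_job(job_id)
    # Assumed response shape: {"status": {"state": "RUNNING" | "DONE" | ...}, ...}
    return job.get("status", {}).get("state", "UNKNOWN")
```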