# EMR Big Data Processing

Integration with Amazon EMR (Elastic MapReduce) for big data processing workflows. This module provides cluster management, PySpark step execution, job orchestration, and EMR state monitoring for large-scale data processing tasks.

## Capabilities

### EMR Job Runner

Manage EMR clusters and execute big data processing jobs with comprehensive cluster lifecycle management.
```python { .api }
class EmrJobRunner:
    """
    Manages EMR job execution and cluster operations.
    """

    def __init__(
        self,
        region: str,
        cluster_id: Optional[str] = None,
        **kwargs
    ): ...

    def run_job_flow(
        self,
        job_flow_overrides: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> str:
        """
        Start a new EMR cluster and run a job flow.

        Parameters:
            job_flow_overrides: Custom EMR cluster configuration
            **kwargs: Additional EMR RunJobFlow parameters

        Returns:
            str: EMR cluster ID
        """

    def add_job_flow_steps(
        self,
        cluster_id: str,
        steps: List[Dict[str, Any]]
    ) -> List[str]:
        """
        Add steps to an existing EMR cluster.

        Parameters:
            cluster_id: EMR cluster identifier
            steps: List of EMR step configurations

        Returns:
            List[str]: List of step IDs
        """

    def wait_for_completion(
        self,
        cluster_id: str,
        timeout_seconds: int = 3600
    ) -> bool:
        """
        Wait for an EMR cluster to complete all steps.

        Parameters:
            cluster_id: EMR cluster identifier
            timeout_seconds: Maximum wait time in seconds

        Returns:
            bool: True if completed successfully, False on timeout
        """

    def terminate_cluster(self, cluster_id: str) -> bool:
        """
        Terminate an EMR cluster.

        Parameters:
            cluster_id: EMR cluster identifier

        Returns:
            bool: True if termination was initiated successfully
        """

    def get_cluster_status(self, cluster_id: str) -> EmrClusterState:
        """
        Get the current status of an EMR cluster.

        Parameters:
            cluster_id: EMR cluster identifier

        Returns:
            EmrClusterState: Current cluster state
        """
```
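
Taken together, these methods cover the full cluster lifecycle: start a cluster, submit steps, wait for them, and tear the cluster down. The following is a minimal sketch assuming the API above; the `job_flow_overrides` value is a placeholder (a real cluster needs the full configuration shown under Usage Examples), and the bucket and script names are hypothetical.

```python
from dagster_aws.emr import EmrJobRunner

runner = EmrJobRunner(region="us-west-2")

# Start a cluster. The config here is a placeholder; see
# "Basic EMR Cluster Management" below for a complete job flow config.
cluster_id = runner.run_job_flow(job_flow_overrides={"Name": "lifecycle-demo"})

# Submit a single spark-submit step (bucket and script are hypothetical).
runner.add_job_flow_steps(
    cluster_id,
    [
        {
            "Name": "Example step",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": ["spark-submit", "s3://my-scripts-bucket/job.py"],
            },
        }
    ],
)

# Block until all steps finish (or the timeout elapses), then tear down.
finished = runner.wait_for_completion(cluster_id, timeout_seconds=1800)
runner.terminate_cluster(cluster_id)
```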

### EMR PySpark Step Launcher

Execute PySpark applications as EMR steps with automatic step configuration and monitoring.

```python { .api }
def emr_pyspark_step_launcher(
    cluster_id: str,
    s3_bucket: str,
    deploy_local_pyspark_deps: bool = True,
    staging_bucket: Optional[str] = None,
    wait_for_logs: bool = True,
    local_job_package_path: Optional[str] = None,
    action_on_failure: str = "TERMINATE_CLUSTER",
    spark_config: Optional[Dict[str, str]] = None,
    region_name: Optional[str] = None,
    **kwargs
) -> StepLauncherDefinition:
    """
    Step launcher for executing PySpark applications on EMR.

    Parameters:
        cluster_id: EMR cluster ID to run the step on
        s3_bucket: S3 bucket for staging PySpark dependencies
        deploy_local_pyspark_deps: Whether to deploy local dependencies
        staging_bucket: S3 bucket for staging job artifacts
        wait_for_logs: Whether to wait for CloudWatch logs
        local_job_package_path: Path to the local job package
        action_on_failure: Action to take on step failure
        spark_config: Spark configuration parameters
        region_name: AWS region name
        **kwargs: Additional step launcher configuration

    Returns:
        StepLauncherDefinition: Configured EMR PySpark step launcher
    """
```

### EMR State Management

Enumerations and constants for managing EMR cluster and step lifecycles.

```python { .api }
class EmrClusterState(Enum):
    """
    Enumeration of possible EMR cluster states.
    """
    STARTING = "STARTING"
    BOOTSTRAPPING = "BOOTSTRAPPING"
    RUNNING = "RUNNING"
    WAITING = "WAITING"
    TERMINATING = "TERMINATING"
    TERMINATED = "TERMINATED"
    TERMINATED_WITH_ERRORS = "TERMINATED_WITH_ERRORS"

class EmrStepState(Enum):
    """
    Enumeration of possible EMR step states.
    """
    PENDING = "PENDING"
    CANCEL_PENDING = "CANCEL_PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    CANCELLED = "CANCELLED"
    FAILED = "FAILED"
    INTERRUPTED = "INTERRUPTED"

# Cluster state constants
EMR_CLUSTER_DONE_STATES: Set[EmrClusterState] = {
    EmrClusterState.TERMINATED,
    EmrClusterState.TERMINATED_WITH_ERRORS
}

EMR_CLUSTER_TERMINATED_STATES: Set[EmrClusterState] = {
    EmrClusterState.TERMINATING,
    EmrClusterState.TERMINATED,
    EmrClusterState.TERMINATED_WITH_ERRORS
}
```
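
These state sets are convenient for polling. Below is a short sketch built on the `get_cluster_status` method documented above; the `wait_until_done` helper and the 30-second poll interval are illustrative, not part of the module.

```python
import time

from dagster_aws.emr import (
    EMR_CLUSTER_DONE_STATES,
    EmrClusterState,
    EmrJobRunner,
)

def wait_until_done(runner: EmrJobRunner, cluster_id: str, poll_seconds: int = 30) -> bool:
    """Poll the cluster until it reaches a terminal state."""
    while True:
        state = runner.get_cluster_status(cluster_id)
        if state in EMR_CLUSTER_DONE_STATES:
            # TERMINATED is a clean shutdown; TERMINATED_WITH_ERRORS is not.
            return state == EmrClusterState.TERMINATED
        time.sleep(poll_seconds)
```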

### EMR Exception Handling

Exception classes for EMR-specific error handling and operation failures.

```python { .api }
class EmrError(Exception):
    """
    Exception raised for EMR-related errors.

    Covers cluster failures, step failures, timeout errors,
    and other EMR-specific operational issues.
    """

    def __init__(self, message: str, cluster_id: Optional[str] = None): ...
```
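
A typical handling pattern is sketched below. Note that reading `cluster_id` back off the exception is an assumption; only the constructor signature is documented above.

```python
from dagster_aws.emr import EmrError, EmrJobRunner

runner = EmrJobRunner(region="us-west-2")

try:
    runner.add_job_flow_steps("j-XXXXXXXXXX", steps=[])
except EmrError as e:
    # Assumption: the optional cluster_id passed to the constructor is
    # stored as an attribute on the exception instance.
    cluster = getattr(e, "cluster_id", None)
    print(f"EMR operation failed (cluster={cluster}): {e}")
    raise
```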

## Usage Examples

### Basic EMR Cluster Management

```python
from dagster import op, job, Definitions
from dagster_aws.emr import EmrJobRunner, EmrError

@op
def create_emr_cluster():
    """Create and configure an EMR cluster for big data processing."""
    runner = EmrJobRunner(region="us-west-2")

    job_flow_config = {
        "Name": "Dagster EMR Cluster",
        "ReleaseLabel": "emr-6.9.0",
        "Instances": {
            "InstanceGroups": [
                {
                    "Name": "Master nodes",
                    "Market": "ON_DEMAND",
                    "InstanceRole": "MASTER",
                    "InstanceType": "m5.xlarge",
                    "InstanceCount": 1,
                },
                {
                    "Name": "Worker nodes",
                    "Market": "ON_DEMAND",
                    "InstanceRole": "CORE",
                    "InstanceType": "m5.xlarge",
                    "InstanceCount": 2,
                }
            ],
            "Ec2KeyName": "my-key-pair",
            "KeepJobFlowAliveWhenNoSteps": True,
        },
        "Applications": [{"Name": "Spark"}, {"Name": "Hadoop"}],
        "ServiceRole": "EMR_DefaultRole",
        "JobFlowRole": "EMR_EC2_DefaultRole",
    }

    try:
        cluster_id = runner.run_job_flow(job_flow_overrides=job_flow_config)
        return cluster_id
    except EmrError as e:
        raise Exception(f"Failed to create EMR cluster: {e}") from e

@job
def emr_cluster_job():
    create_emr_cluster()

defs = Definitions(jobs=[emr_cluster_job])
```
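
Because `KeepJobFlowAliveWhenNoSteps` is set, the cluster remains in the `WAITING` state after startup so that steps can be submitted later (for example, via `add_job_flow_steps`); without it, EMR terminates the cluster as soon as its step queue is empty.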

### PySpark Step Execution

```python
from dagster import op, job, Definitions
from dagster_aws.emr import emr_pyspark_step_launcher

# Configure the PySpark step launcher
pyspark_launcher = emr_pyspark_step_launcher.configured({
    "cluster_id": "j-XXXXXXXXXX",  # Existing EMR cluster
    "s3_bucket": "my-emr-bucket",
    "deploy_local_pyspark_deps": True,
    "wait_for_logs": True,
    "spark_config": {
        "spark.executor.memory": "4g",
        "spark.executor.cores": "2",
        "spark.default.parallelism": "100"
    }
})

@op
def data_processing_step():
    """PySpark data processing operation."""
    # This op is executed as a PySpark application on EMR
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("DataProcessing").getOrCreate()

    # Read data from S3
    df = spark.read.parquet("s3://my-data-bucket/input/")

    # Process data
    processed_df = df.groupBy("category").sum("amount")

    # Write results back to S3
    processed_df.write.mode("overwrite").parquet("s3://my-data-bucket/output/")

    spark.stop()
    return "Processing complete"

@job(step_launcher_def=pyspark_launcher)
def pyspark_processing_job():
    data_processing_step()

defs = Definitions(jobs=[pyspark_processing_job])
```

### Advanced EMR Workflow

```python
from dagster import op, job, Definitions
from dagster_aws.emr import EmrJobRunner, EmrError

@op
def provision_emr_cluster():
    """Provision an EMR cluster with custom configuration."""
    runner = EmrJobRunner(region="us-east-1")

    cluster_config = {
        "Name": "Advanced Processing Cluster",
        "ReleaseLabel": "emr-6.9.0",
        "Instances": {
            "InstanceFleets": [
                {
                    "Name": "Master Fleet",
                    "InstanceFleetType": "MASTER",
                    "TargetOnDemandCapacity": 1,
                    "InstanceTypeConfigs": [
                        {
                            "InstanceType": "m5.2xlarge",
                            "EbsConfiguration": {
                                "EbsBlockDeviceConfigs": [
                                    {
                                        "VolumeSpecification": {
                                            "VolumeType": "gp2",
                                            "SizeInGB": 100
                                        },
                                        "VolumesPerInstance": 1
                                    }
                                ]
                            }
                        }
                    ]
                }
            ],
            "Ec2SubnetId": "subnet-12345",
            "EmrManagedMasterSecurityGroup": "sg-master",
            "EmrManagedSlaveSecurityGroup": "sg-slave"
        },
        "Applications": [
            {"Name": "Spark"},
            {"Name": "Hadoop"},
            {"Name": "Hive"}
        ],
        "Configurations": [
            {
                "Classification": "spark-defaults",
                "Properties": {
                    "spark.sql.adaptive.enabled": "true",
                    "spark.sql.adaptive.coalescePartitions.enabled": "true"
                }
            }
        ]
    }

    cluster_id = runner.run_job_flow(job_flow_overrides=cluster_config)

    # Wait for the cluster to be ready
    if not runner.wait_for_completion(cluster_id, timeout_seconds=1800):
        raise EmrError("Cluster provisioning timed out", cluster_id)

    return cluster_id

@op
def run_data_processing(cluster_id: str) -> str:
    """Execute data processing steps on the EMR cluster."""
    runner = EmrJobRunner(region="us-east-1")

    processing_steps = [
        {
            "Name": "Data Ingestion",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "spark-submit",
                    "--deploy-mode", "cluster",
                    "s3://my-scripts-bucket/ingest_data.py"
                ]
            }
        },
        {
            "Name": "Data Transformation",
            "ActionOnFailure": "TERMINATE_CLUSTER",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "spark-submit",
                    "--deploy-mode", "cluster",
                    "--conf", "spark.executor.instances=4",
                    "s3://my-scripts-bucket/transform_data.py"
                ]
            }
        }
    ]

    runner.add_job_flow_steps(cluster_id, processing_steps)

    # Monitor step completion
    if not runner.wait_for_completion(cluster_id, timeout_seconds=3600):
        raise EmrError("Data processing steps timed out", cluster_id)

    # Return the cluster ID so that cleanup can consume this op's output,
    # guaranteeing it runs only after processing completes
    return cluster_id

@op
def cleanup_cluster(cluster_id: str):
    """Terminate the EMR cluster after processing."""
    runner = EmrJobRunner(region="us-east-1")

    if runner.terminate_cluster(cluster_id):
        return f"Cluster {cluster_id} termination initiated"
    else:
        raise EmrError("Failed to terminate cluster", cluster_id)

@job
def advanced_emr_workflow():
    cluster_id = provision_emr_cluster()
    # Wiring cleanup to run_data_processing's output enforces ordering;
    # passing cluster_id directly would let cleanup run concurrently.
    cleanup_cluster(run_data_processing(cluster_id))

defs = Definitions(jobs=[advanced_emr_workflow])
```