# EMR Cluster Management

Amazon EMR (Elastic MapReduce) integration for big data processing and analytics workloads. Provides cluster lifecycle management, job execution, and monitoring capabilities for Hadoop, Spark, and other big data frameworks running on AWS.

## Capabilities

### EMR Hook

Core EMR client providing comprehensive cluster and job management functionality.

```python { .api }
class EmrHook(AwsBaseHook):
    def __init__(self, aws_conn_id: str = 'aws_default', emr_conn_id: str = None, **kwargs):
        """
        Initialize EMR Hook.

        Parameters:
        - aws_conn_id: AWS connection ID
        - emr_conn_id: EMR-specific connection ID
        """

    def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list) -> str:
        """
        Get cluster ID by name and states.

        Parameters:
        - emr_cluster_name: Name of the EMR cluster
        - cluster_states: List of acceptable cluster states

        Returns:
        Cluster ID if found
        """

    def create_job_flow(self, job_flow_overrides: dict = None, **kwargs) -> str:
        """
        Create EMR cluster (job flow).

        Parameters:
        - job_flow_overrides: Configuration overrides for cluster creation

        Returns:
        Cluster ID
        """

    def add_job_flow_steps(self, job_flow_id: str, steps: list, **kwargs) -> list:
        """
        Add steps to a running EMR cluster.

        Parameters:
        - job_flow_id: EMR cluster ID
        - steps: List of step configurations

        Returns:
        List of step IDs
        """

    def describe_cluster(self, job_flow_id: str) -> dict:
        """
        Get EMR cluster details.

        Parameters:
        - job_flow_id: EMR cluster ID

        Returns:
        Cluster configuration and status
        """

    def describe_step(self, job_flow_id: str, step_id: str) -> dict:
        """
        Get EMR step details.

        Parameters:
        - job_flow_id: EMR cluster ID
        - step_id: Step ID

        Returns:
        Step configuration and status
        """

    def list_steps(self, job_flow_id: str, step_states: list = None, step_ids: list = None) -> list:
        """
        List steps in an EMR cluster.

        Parameters:
        - job_flow_id: EMR cluster ID
        - step_states: Filter by step states
        - step_ids: Filter by specific step IDs

        Returns:
        List of step details
        """

    def terminate_job_flow(self, job_flow_id: str) -> None:
        """
        Terminate an EMR cluster.

        Parameters:
        - job_flow_id: EMR cluster ID
        """

    def modify_cluster(self, job_flow_id: str, step_concurrency_level: int) -> None:
        """
        Modify EMR cluster configuration.

        Parameters:
        - job_flow_id: EMR cluster ID
        - step_concurrency_level: Number of concurrent steps
        """

    def get_job_flow_state(self, job_flow_id: str) -> str:
        """
        Get EMR cluster state.

        Parameters:
        - job_flow_id: EMR cluster ID

        Returns:
        Current cluster state
        """
```
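
The hook can also be driven directly from Python task code. A minimal sketch, assuming the import path shown; the cluster name, connection ID, and state list are illustrative values:

```python
from airflow.providers.amazon.aws.hooks.emr import EmrHook

def log_cluster_state():
    hook = EmrHook(aws_conn_id='aws_default')

    # Restrict the lookup to clusters that are still alive.
    cluster_id = hook.get_cluster_id_by_name(
        emr_cluster_name='data-processing-cluster',
        cluster_states=['STARTING', 'BOOTSTRAPPING', 'RUNNING', 'WAITING'],
    )

    # get_job_flow_state returns the bare state string, e.g. 'WAITING'.
    print(cluster_id, hook.get_job_flow_state(cluster_id))
```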

### EMR Operators

Task implementations for EMR cluster and job management operations.

```python { .api }
class EmrCreateJobFlowOperator(BaseOperator):
    def __init__(self, job_flow_overrides: dict = None, aws_conn_id: str = 'aws_default', emr_conn_id: str = None, **kwargs):
        """
        Create EMR cluster.

        Parameters:
        - job_flow_overrides: Cluster configuration overrides
        - aws_conn_id: AWS connection ID
        - emr_conn_id: EMR-specific connection ID
        """

class EmrTerminateJobFlowOperator(BaseOperator):
    def __init__(self, job_flow_id: str, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Terminate EMR cluster.

        Parameters:
        - job_flow_id: EMR cluster ID
        - aws_conn_id: AWS connection ID
        """

class EmrAddStepsOperator(BaseOperator):
    def __init__(self, job_flow_id: str, steps: list = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Add steps to EMR cluster.

        Parameters:
        - job_flow_id: EMR cluster ID
        - steps: List of step configurations
        - aws_conn_id: AWS connection ID
        """

class EmrModifyClusterOperator(BaseOperator):
    def __init__(self, job_flow_id: str, step_concurrency_level: int, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Modify EMR cluster configuration.

        Parameters:
        - job_flow_id: EMR cluster ID
        - step_concurrency_level: Number of concurrent steps
        - aws_conn_id: AWS connection ID
        """

class EmrContainerOperator(BaseOperator):
    def __init__(self, name: str, virtual_cluster_id: str, execution_role_arn: str, release_label: str, job_driver: dict, configuration_overrides: dict = None, aws_conn_id: str = 'aws_default', poll_interval: int = 30, max_tries: int = None, **kwargs):
        """
        Run job on EMR on EKS.

        Parameters:
        - name: Job name
        - virtual_cluster_id: EMR on EKS virtual cluster ID
        - execution_role_arn: IAM role ARN for job execution
        - release_label: EMR release version
        - job_driver: Job driver configuration
        - configuration_overrides: Additional configuration overrides
        - aws_conn_id: AWS connection ID
        - poll_interval: Polling interval in seconds
        - max_tries: Maximum number of polling attempts
        """

class EmrServerlessCreateApplicationOperator(BaseOperator):
    def __init__(self, release_label: str, job_type: str, name: str = None, initial_capacity: dict = None, maximum_capacity: dict = None, tags: dict = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Create EMR Serverless application.

        Parameters:
        - release_label: EMR release version
        - job_type: Type of job ('SPARK' or 'HIVE')
        - name: Application name
        - initial_capacity: Initial capacity configuration
        - maximum_capacity: Maximum capacity configuration
        - tags: Resource tags
        - aws_conn_id: AWS connection ID
        """

class EmrServerlessStartJobOperator(BaseOperator):
    def __init__(self, application_id: str, execution_role_arn: str, job_driver: dict, configuration_overrides: dict = None, name: str = None, tags: dict = None, aws_conn_id: str = 'aws_default', wait_for_completion: bool = True, **kwargs):
        """
        Start EMR Serverless job.

        Parameters:
        - application_id: EMR Serverless application ID
        - execution_role_arn: IAM role ARN for job execution
        - job_driver: Job driver configuration
        - configuration_overrides: Additional configuration overrides
        - name: Job name
        - tags: Job tags
        - aws_conn_id: AWS connection ID
        - wait_for_completion: Wait for job completion
        """

class EmrServerlessDeleteApplicationOperator(BaseOperator):
    def __init__(self, application_id: str, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Delete EMR Serverless application.

        Parameters:
        - application_id: EMR Serverless application ID
        - aws_conn_id: AWS connection ID
        """
```
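
As a brief illustration of `EmrModifyClusterOperator`, a sketch that raises step concurrency on a running cluster so queued steps can execute in parallel; the XCom pull assumes a `create_cluster` task like the one in the usage examples below:

```python
from airflow.providers.amazon.aws.operators.emr import EmrModifyClusterOperator

# Allow up to four steps to run concurrently on the cluster.
increase_concurrency = EmrModifyClusterOperator(
    task_id='increase_concurrency',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
    step_concurrency_level=4,
)
```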

### EMR Sensors

Monitoring tasks that wait for EMR cluster states and job completion.

```python { .api }
class EmrJobFlowSensor(BaseSensorOperator):
    def __init__(self, job_flow_id: str, target_states: list = None, failed_states: list = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Wait for EMR cluster to reach target state.

        Parameters:
        - job_flow_id: EMR cluster ID
        - target_states: List of acceptable target states
        - failed_states: List of states considered as failures
        - aws_conn_id: AWS connection ID
        """

class EmrStepSensor(BaseSensorOperator):
    def __init__(self, job_flow_id: str, step_id: str, target_states: list = None, failed_states: list = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Wait for EMR step completion.

        Parameters:
        - job_flow_id: EMR cluster ID
        - step_id: Step ID to monitor
        - target_states: List of acceptable target states
        - failed_states: List of states considered as failures
        - aws_conn_id: AWS connection ID
        """

class EmrContainerSensor(BaseSensorOperator):
    def __init__(self, job_id: str, aws_conn_id: str = 'aws_default', poll_interval: int = 60, **kwargs):
        """
        Wait for EMR on EKS job completion.

        Parameters:
        - job_id: EMR on EKS job ID
        - aws_conn_id: AWS connection ID
        - poll_interval: Polling interval in seconds
        """

class EmrServerlessJobSensor(BaseSensorOperator):
    def __init__(self, application_id: str, job_run_id: str, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Wait for EMR Serverless job completion.

        Parameters:
        - application_id: EMR Serverless application ID
        - job_run_id: Job run ID
        - aws_conn_id: AWS connection ID
        """
```
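
For EMR on EKS jobs submitted elsewhere in a DAG, `EmrContainerSensor` separates submission from waiting. A sketch, assuming the submitting task pushes the job run ID as its XCom return value:

```python
from airflow.providers.amazon.aws.sensors.emr import EmrContainerSensor

# Poll the EMR on EKS job every two minutes until it completes.
watch_container_job = EmrContainerSensor(
    task_id='watch_container_job',
    job_id="{{ task_instance.xcom_pull(task_ids='run_emr_eks_job') }}",
    poll_interval=120,
)
```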

### EMR Triggers

Asynchronous triggers for efficient EMR monitoring without blocking workers.

```python { .api }
class EmrJobFlowTrigger(BaseTrigger):
    def __init__(self, job_flow_id: str, target_states: list = None, failed_states: list = None, aws_conn_id: str = 'aws_default', poll_interval: int = 60, **kwargs):
        """
        Asynchronous trigger for EMR cluster state monitoring.

        Parameters:
        - job_flow_id: EMR cluster ID
        - target_states: List of acceptable target states
        - failed_states: List of states considered as failures
        - aws_conn_id: AWS connection ID
        - poll_interval: Polling interval in seconds
        """

class EmrStepTrigger(BaseTrigger):
    def __init__(self, job_flow_id: str, step_id: str, target_states: list = None, failed_states: list = None, aws_conn_id: str = 'aws_default', poll_interval: int = 30, **kwargs):
        """
        Asynchronous trigger for EMR step monitoring.

        Parameters:
        - job_flow_id: EMR cluster ID
        - step_id: Step ID to monitor
        - target_states: List of acceptable target states
        - failed_states: List of states considered as failures
        - aws_conn_id: AWS connection ID
        - poll_interval: Polling interval in seconds
        """

class EmrContainerTrigger(BaseTrigger):
    def __init__(self, job_id: str, aws_conn_id: str = 'aws_default', poll_interval: int = 60, **kwargs):
        """
        Asynchronous trigger for EMR on EKS job monitoring.

        Parameters:
        - job_id: EMR on EKS job ID
        - aws_conn_id: AWS connection ID
        - poll_interval: Polling interval in seconds
        """
```
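
Triggers are consumed by deferring from an operator or sensor rather than being instantiated as standalone tasks. A minimal sketch of that pattern; the trigger module path and the event payload shape are assumptions, not confirmed by this document:

```python
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.triggers.emr import EmrStepTrigger  # module path assumed

class DeferrableEmrStepWait(BaseOperator):
    """Illustrative task that frees its worker slot while an EMR step runs."""

    def __init__(self, job_flow_id: str, step_id: str, **kwargs):
        super().__init__(**kwargs)
        self.job_flow_id = job_flow_id
        self.step_id = step_id

    def execute(self, context):
        # Hand polling to the triggerer process instead of blocking a worker.
        self.defer(
            trigger=EmrStepTrigger(
                job_flow_id=self.job_flow_id,
                step_id=self.step_id,
                target_states=['COMPLETED'],
                failed_states=['FAILED', 'CANCELLED'],
                poll_interval=30,
            ),
            method_name='execute_complete',
        )

    def execute_complete(self, context, event=None):
        # Event shape is assumed: the trigger reports a terminal status.
        if event and event.get('status') != 'success':
            raise AirflowException(f'EMR step did not complete: {event}')
```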

## Usage Examples

### Basic EMR Cluster Workflow

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrCreateJobFlowOperator,
    EmrAddStepsOperator,
    EmrTerminateJobFlowOperator
)
from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor, EmrStepSensor

dag = DAG('emr_spark_job', start_date=datetime(2023, 1, 1))

# Cluster configuration
JOB_FLOW_OVERRIDES = {
    'Name': 'data-processing-cluster',
    'ReleaseLabel': 'emr-6.10.0',
    'Applications': [{'Name': 'Spark'}, {'Name': 'Hadoop'}],
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master nodes',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1,
            },
            {
                'Name': 'Core nodes',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'CORE',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 2,
            }
        ],
        'Ec2KeyName': 'my-key-pair',
        # Keep the cluster alive so steps can be added after creation;
        # it is terminated explicitly at the end of the DAG.
        'KeepJobFlowAliveWhenNoSteps': True,
    },
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
    'LogUri': 's3://my-emr-logs/',
}

# Spark job steps
SPARK_STEPS = [
    {
        'Name': 'Data Processing Job',
        'ActionOnFailure': 'TERMINATE_CLUSTER',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--class', 'com.example.DataProcessor',
                's3://my-spark-jobs/data-processor.jar',
                '--input', 's3://my-data/input/{{ ds }}/',
                '--output', 's3://my-data/output/{{ ds }}/',
                '--date', '{{ ds }}'
            ],
        },
    }
]

# Create cluster
create_cluster = EmrCreateJobFlowOperator(
    task_id='create_cluster',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    dag=dag
)

# Wait for cluster to be ready
wait_for_cluster = EmrJobFlowSensor(
    task_id='wait_for_cluster',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
    target_states=['WAITING'],
    dag=dag
)

# Add processing steps
add_steps = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
    steps=SPARK_STEPS,
    dag=dag
)

# Wait for steps to complete
wait_for_step = EmrStepSensor(
    task_id='wait_for_step',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
    target_states=['COMPLETED'],
    dag=dag
)

# Tear down the cluster whether or not the step succeeded
terminate_cluster = EmrTerminateJobFlowOperator(
    task_id='terminate_cluster',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
    trigger_rule='all_done',
    dag=dag
)

create_cluster >> wait_for_cluster >> add_steps >> wait_for_step >> terminate_cluster
```
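
One refinement worth considering: both sensors accept `failed_states`, which fails the task as soon as the cluster or step reaches a terminal error state instead of waiting out the sensor timeout. A sketch using the sensor already imported above and the cluster states listed in the Types section below:

```python
# Fail fast if the cluster terminates instead of reaching WAITING.
wait_for_cluster_strict = EmrJobFlowSensor(
    task_id='wait_for_cluster_strict',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
    target_states=['WAITING'],
    failed_states=['TERMINATED', 'TERMINATED_WITH_ERRORS'],
    dag=dag
)
```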

### EMR Serverless Job

```python
from airflow.providers.amazon.aws.operators.emr import (
    EmrServerlessCreateApplicationOperator,
    EmrServerlessStartJobOperator,
    EmrServerlessDeleteApplicationOperator
)

# Create serverless application
create_app = EmrServerlessCreateApplicationOperator(
    task_id='create_serverless_app',
    release_label='emr-6.10.0',
    job_type='SPARK',
    name='data-processing-app',
    initial_capacity={
        'DRIVER': {
            'workerCount': 1,
            'workerConfiguration': {
                'cpu': '2 vCPU',
                'memory': '4 GB'
            }
        },
        'EXECUTOR': {
            'workerCount': 4,
            'workerConfiguration': {
                'cpu': '4 vCPU',
                'memory': '8 GB'
            }
        }
    },
    maximum_capacity={
        'DRIVER': {
            'workerCount': 1,
            'workerConfiguration': {
                'cpu': '2 vCPU',
                'memory': '4 GB'
            }
        },
        'EXECUTOR': {
            'workerCount': 10,
            'workerConfiguration': {
                'cpu': '4 vCPU',
                'memory': '8 GB'
            }
        }
    },
    dag=dag
)

# Run Spark job
run_job = EmrServerlessStartJobOperator(
    task_id='run_spark_job',
    application_id="{{ task_instance.xcom_pull(task_ids='create_serverless_app', key='application_id') }}",
    execution_role_arn='arn:aws:iam::123456789012:role/EMRServerlessExecutionRole',
    job_driver={
        'sparkSubmit': {
            'entryPoint': 's3://my-spark-jobs/data-processor.py',
            'entryPointArguments': [
                '--input', 's3://my-data/input/{{ ds }}/',
                '--output', 's3://my-data/output/{{ ds }}/'
            ],
            'sparkSubmitParameters': '--conf spark.sql.adaptive.enabled=true'
        }
    },
    configuration_overrides={
        'monitoringConfiguration': {
            's3MonitoringConfiguration': {
                'logUri': 's3://my-emr-serverless-logs/'
            }
        }
    },
    wait_for_completion=True,
    dag=dag
)

# Clean up application
delete_app = EmrServerlessDeleteApplicationOperator(
    task_id='delete_serverless_app',
    application_id="{{ task_instance.xcom_pull(task_ids='create_serverless_app', key='application_id') }}",
    dag=dag
)

create_app >> run_job >> delete_app
```
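
When the start operator runs with `wait_for_completion=False`, completion can instead be monitored as a separate task with `EmrServerlessJobSensor`. A sketch, assuming `run_spark_job` pushes its job run ID as its XCom return value:

```python
from airflow.providers.amazon.aws.sensors.emr import EmrServerlessJobSensor

# Wait for the serverless job started by run_spark_job.
wait_for_serverless_job = EmrServerlessJobSensor(
    task_id='wait_for_serverless_job',
    application_id="{{ task_instance.xcom_pull(task_ids='create_serverless_app', key='application_id') }}",
    job_run_id="{{ task_instance.xcom_pull(task_ids='run_spark_job') }}",
    dag=dag
)
```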

### EMR on EKS Job

```python
from airflow.providers.amazon.aws.operators.emr import EmrContainerOperator

# Run job on EMR on EKS
emr_eks_job = EmrContainerOperator(
    task_id='run_emr_eks_job',
    name='data-processing-job',
    virtual_cluster_id='abc123def456',
    execution_role_arn='arn:aws:iam::123456789012:role/EMRContainersExecutionRole',
    release_label='emr-6.10.0-latest',
    job_driver={
        'sparkSubmitJobDriver': {
            'entryPoint': 's3://my-spark-jobs/data-processor.py',
            'entryPointArguments': [
                '--input-path', 's3://my-data/input/{{ ds }}/',
                '--output-path', 's3://my-data/output/{{ ds }}/'
            ],
            'sparkSubmitParameters': '--conf spark.sql.adaptive.enabled=true --conf spark.sql.adaptive.coalescePartitions.enabled=true'
        }
    },
    configuration_overrides={
        'applicationConfiguration': [
            {
                'classification': 'spark-defaults',
                'properties': {
                    'spark.sql.adaptive.enabled': 'true',
                    'spark.sql.adaptive.coalescePartitions.enabled': 'true',
                    'spark.kubernetes.container.image': 'my-account.dkr.ecr.us-east-1.amazonaws.com/spark:latest'
                }
            }
        ],
        'monitoringConfiguration': {
            's3MonitoringConfiguration': {
                'logUri': 's3://my-emr-eks-logs/'
            }
        }
    },
    dag=dag
)
```
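
Because `EmrContainerOperator` polls internally, its worst-case wait is roughly `poll_interval × max_tries`. A sketch that caps waiting at about an hour (120 tries × 30 s); all values are illustrative:

```python
# Bound the operator's internal polling at ~1 hour: 120 tries x 30 s.
bounded_eks_job = EmrContainerOperator(
    task_id='bounded_eks_job',
    name='data-processing-job',
    virtual_cluster_id='abc123def456',
    execution_role_arn='arn:aws:iam::123456789012:role/EMRContainersExecutionRole',
    release_label='emr-6.10.0-latest',
    job_driver={
        'sparkSubmitJobDriver': {
            'entryPoint': 's3://my-spark-jobs/data-processor.py'
        }
    },
    poll_interval=30,
    max_tries=120,
    dag=dag
)
```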
564
565
## Types
566
567
```python { .api }
# EMR cluster states
class EmrClusterState:
    STARTING = 'STARTING'
    BOOTSTRAPPING = 'BOOTSTRAPPING'
    RUNNING = 'RUNNING'
    WAITING = 'WAITING'
    TERMINATING = 'TERMINATING'
    TERMINATED = 'TERMINATED'
    TERMINATED_WITH_ERRORS = 'TERMINATED_WITH_ERRORS'

# EMR step states
class EmrStepState:
    PENDING = 'PENDING'
    CANCEL_PENDING = 'CANCEL_PENDING'
    RUNNING = 'RUNNING'
    COMPLETED = 'COMPLETED'
    CANCELLED = 'CANCELLED'
    FAILED = 'FAILED'
    INTERRUPTED = 'INTERRUPTED'

# EMR instance types
class EmrInstanceType:
    M5_LARGE = 'm5.large'
    M5_XLARGE = 'm5.xlarge'
    M5_2XLARGE = 'm5.2xlarge'
    M5_4XLARGE = 'm5.4xlarge'
    M5_8XLARGE = 'm5.8xlarge'
    C5_LARGE = 'c5.large'
    C5_XLARGE = 'c5.xlarge'
    R5_LARGE = 'r5.large'
    R5_XLARGE = 'r5.xlarge'

# Job flow configuration
class JobFlowConfig:
    name: str
    release_label: str
    applications: list
    instances: dict
    steps: list = None
    bootstrap_actions: list = None
    configurations: list = None
    service_role: str
    job_flow_role: str
    security_configuration: str = None
    auto_scaling_role: str = None
    scale_down_behavior: str = None
    custom_ami_id: str = None
    ebs_root_volume_size: int = None
    repo_upgrade_on_boot: str = None
    kerberos_attributes: dict = None
    step_concurrency_level: int = 1
    managed_scaling_policy: dict = None
    placement_group_configs: list = None
    auto_termination_policy: dict = None
    os_release_label: str = None
    log_uri: str = None
    log_encryption_kms_key_id: str = None
    additional_info: str = None
    tags: list = None

# Instance group configuration
class InstanceGroupConfig:
    name: str
    instance_role: str  # 'MASTER', 'CORE', 'TASK'
    instance_type: str
    instance_count: int
    market: str = 'ON_DEMAND'  # 'ON_DEMAND', 'SPOT'
    bid_price: str = None
    ebs_configuration: dict = None
    auto_scaling_policy: dict = None
    configurations: list = None
    custom_ami_id: str = None

# EMR step configuration
class StepConfig:
    name: str
    action_on_failure: str  # 'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE'
    hadoop_jar_step: dict

# EMR Serverless configuration
class EmrServerlessConfig:
    release_label: str
    job_type: str  # 'SPARK', 'HIVE'
    name: str = None
    initial_capacity: dict = None
    maximum_capacity: dict = None
    auto_start_configuration: dict = None
    auto_stop_configuration: dict = None
    network_configuration: dict = None
    tags: dict = None

# Worker configuration for EMR Serverless
class WorkerConfiguration:
    cpu: str  # e.g., '2 vCPU', '4 vCPU'
    memory: str  # e.g., '4 GB', '8 GB'
    disk: str = None  # e.g., '20 GB'
```
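
Since the sensors take plain state strings, the constant classes above can replace raw literals in task definitions. A sketch, assuming `EmrStepState` is importable from wherever these types live in your project:

```python
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor

# EmrStepState is the constants class defined above; its import path
# is project-specific and assumed here.
wait_for_step_typed = EmrStepSensor(
    task_id='wait_for_step_typed',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
    target_states=[EmrStepState.COMPLETED],
    failed_states=[EmrStepState.FAILED, EmrStepState.CANCELLED],
)
```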