Apache Airflow provider package offering comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines
Amazon EMR (Elastic MapReduce) integration for big data processing and analytics workloads. Provides cluster lifecycle management, job execution, and monitoring capabilities for Hadoop, Spark, and other big data frameworks running on AWS.
Core EMR client providing comprehensive cluster and job management functionality.
class EmrHook(AwsBaseHook):
def __init__(self, aws_conn_id: str = 'aws_default', emr_conn_id: str = 'emr_default', **kwargs):
"""
Initialize EMR Hook.
Parameters:
- aws_conn_id: AWS connection ID
- emr_conn_id: EMR-specific connection ID
"""
def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list) -> str:
"""
Get cluster ID by name and states.
Parameters:
- emr_cluster_name: Name of the EMR cluster
- cluster_states: List of acceptable cluster states
Returns:
Cluster ID if found
"""
def create_job_flow(self, job_flow_overrides: dict = None, **kwargs) -> str:
"""
Create EMR cluster (job flow).
Parameters:
- job_flow_overrides: Configuration overrides for cluster creation
Returns:
Cluster ID
"""
def add_job_flow_steps(self, job_flow_id: str, steps: list, **kwargs) -> list:
"""
Add steps to running EMR cluster.
Parameters:
- job_flow_id: EMR cluster ID
- steps: List of step configurations
Returns:
List of step IDs
"""
def describe_cluster(self, job_flow_id: str) -> dict:
"""
Get EMR cluster details.
Parameters:
- job_flow_id: EMR cluster ID
Returns:
Cluster configuration and status
"""
def describe_step(self, job_flow_id: str, step_id: str) -> dict:
"""
Get EMR step details.
Parameters:
- job_flow_id: EMR cluster ID
- step_id: Step ID
Returns:
Step configuration and status
"""
def list_steps(self, job_flow_id: str, step_states: list = None, step_ids: list = None) -> list:
"""
List steps in EMR cluster.
Parameters:
- job_flow_id: EMR cluster ID
- step_states: Filter by step states
- step_ids: Filter by specific step IDs
Returns:
List of step details
"""
def terminate_job_flow(self, job_flow_id: str) -> None:
"""
Terminate EMR cluster.
Parameters:
- job_flow_id: EMR cluster ID
"""
def modify_cluster(self, job_flow_id: str, step_concurrency_level: int) -> None:
"""
Modify EMR cluster configuration.
Parameters:
- job_flow_id: EMR cluster ID
- step_concurrency_level: Number of concurrent steps
"""
def get_job_flow_state(self, job_flow_id: str) -> str:
"""
Get EMR cluster state.
Parameters:
- job_flow_id: EMR cluster ID
Returns:
Current cluster state
"""
def check_query_status(self, job_id: str) -> str:
"""
Check the status of an EMR on EKS job run.
Parameters:
- job_id: EMR on EKS job run ID
Returns:
Current job run state
"""
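The hook is handy inside Python tasks when you need finer-grained control than the operators below provide. A minimal sketch using only the method surface documented above; the cluster name and connection ID are illustrative:
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.emr import EmrHook

@task
def terminate_idle_cluster():
    # Resolve the cluster ID from its name, then terminate it only if idle
    hook = EmrHook(aws_conn_id='aws_default')
    cluster_id = hook.get_cluster_id_by_name(
        emr_cluster_name='data-processing-cluster',  # illustrative name
        cluster_states=['WAITING', 'RUNNING'],
    )
    if cluster_id and hook.get_job_flow_state(cluster_id) == 'WAITING':
        hook.terminate_job_flow(cluster_id)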
"""Task implementations for EMR cluster and job management operations.
class EmrCreateJobFlowOperator(BaseOperator):
def __init__(self, job_flow_overrides: dict = None, aws_conn_id: str = 'aws_default', emr_conn_id: str = 'emr_default', **kwargs):
"""
Create EMR cluster.
Parameters:
- job_flow_overrides: Cluster configuration overrides
- aws_conn_id: AWS connection ID
- emr_conn_id: EMR-specific connection ID
"""
class EmrTerminateJobFlowOperator(BaseOperator):
def __init__(self, job_flow_id: str, aws_conn_id: str = 'aws_default', **kwargs):
"""
Terminate EMR cluster.
Parameters:
- job_flow_id: EMR cluster ID
- aws_conn_id: AWS connection ID
"""
class EmrAddStepsOperator(BaseOperator):
def __init__(self, job_flow_id: str, steps: list = None, aws_conn_id: str = 'aws_default', **kwargs):
"""
Add steps to EMR cluster.
Parameters:
- job_flow_id: EMR cluster ID
- steps: List of step configurations
- aws_conn_id: AWS connection ID
"""
class EmrModifyClusterOperator(BaseOperator):
def __init__(self, job_flow_id: str, step_concurrency_level: int, aws_conn_id: str = 'aws_default', **kwargs):
"""
Modify EMR cluster configuration.
Parameters:
- job_flow_id: EMR cluster ID
- step_concurrency_level: Number of concurrent steps
- aws_conn_id: AWS connection ID
"""
class EmrContainerOperator(BaseOperator):
def __init__(self, name: str, virtual_cluster_id: str, execution_role_arn: str, release_label: str, job_driver: dict, configuration_overrides: dict = None, aws_conn_id: str = 'aws_default', poll_interval: int = 30, max_tries: int = None, **kwargs):
"""
Run job on EMR on EKS.
Parameters:
- name: Job name
- virtual_cluster_id: EMR on EKS virtual cluster ID
- execution_role_arn: IAM role ARN for job execution
- release_label: EMR release version
- job_driver: Job driver configuration
- configuration_overrides: Additional configuration overrides
- aws_conn_id: AWS connection ID
- poll_interval: Polling interval in seconds
- max_tries: Maximum number of polling attempts
"""
class EmrServerlessCreateApplicationOperator(BaseOperator):
def __init__(self, release_label: str, job_type: str, name: str = None, initial_capacity: dict = None, maximum_capacity: dict = None, tags: dict = None, aws_conn_id: str = 'aws_default', **kwargs):
"""
Create EMR Serverless application.
Parameters:
- release_label: EMR release version
- job_type: Type of job ('SPARK' or 'HIVE')
- name: Application name
- initial_capacity: Initial capacity configuration
- maximum_capacity: Maximum capacity configuration
- tags: Resource tags
- aws_conn_id: AWS connection ID
"""
class EmrServerlessStartJobOperator(BaseOperator):
def __init__(self, application_id: str, execution_role_arn: str, job_driver: dict, configuration_overrides: dict = None, name: str = None, tags: dict = None, aws_conn_id: str = 'aws_default', wait_for_completion: bool = True, **kwargs):
"""
Start EMR Serverless job.
Parameters:
- application_id: EMR Serverless application ID
- execution_role_arn: IAM role ARN for job execution
- job_driver: Job driver configuration
- configuration_overrides: Additional configuration overrides
- name: Job name
- tags: Job tags
- aws_conn_id: AWS connection ID
- wait_for_completion: Wait for job completion
"""
class EmrServerlessDeleteApplicationOperator(BaseOperator):
def __init__(self, application_id: str, aws_conn_id: str = 'aws_default', **kwargs):
"""
Delete EMR Serverless application.
Parameters:
- application_id: EMR Serverless application ID
- aws_conn_id: AWS connection ID
"""Monitoring tasks that wait for EMR cluster states and job completion.
class EmrJobFlowSensor(BaseSensorOperator):
def __init__(self, job_flow_id: str, target_states: list = None, failed_states: list = None, aws_conn_id: str = 'aws_default', **kwargs):
"""
Wait for EMR cluster to reach target state.
Parameters:
- job_flow_id: EMR cluster ID
- target_states: List of acceptable target states
- failed_states: List of states considered as failures
- aws_conn_id: AWS connection ID
"""
class EmrStepSensor(BaseSensorOperator):
def __init__(self, job_flow_id: str, step_id: str, target_states: list = None, failed_states: list = None, aws_conn_id: str = 'aws_default', **kwargs):
"""
Wait for EMR step completion.
Parameters:
- job_flow_id: EMR cluster ID
- step_id: Step ID to monitor
- target_states: List of acceptable target states
- failed_states: List of states considered as failures
- aws_conn_id: AWS connection ID
"""
class EmrContainerSensor(BaseSensorOperator):
def __init__(self, job_id: str, aws_conn_id: str = 'aws_default', poll_interval: int = 60, **kwargs):
"""
Wait for EMR on EKS job completion.
Parameters:
- job_id: EMR on EKS job ID
- aws_conn_id: AWS connection ID
- poll_interval: Polling interval in seconds
"""
class EmrServerlessJobSensor(BaseSensorOperator):
def __init__(self, application_id: str, job_run_id: str, aws_conn_id: str = 'aws_default', **kwargs):
"""
Wait for EMR Serverless job completion.
Parameters:
- application_id: EMR Serverless application ID
- job_run_id: Job run ID
- aws_conn_id: AWS connection ID
"""
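EmrServerlessJobSensor pairs naturally with EmrServerlessStartJobOperator when the latter runs with wait_for_completion=False. A sketch using the parameters documented above; the upstream task IDs are illustrative and assume each operator returns its ID as the XCom return value:
from airflow.providers.amazon.aws.sensors.emr import EmrServerlessJobSensor

# Poll a job started upstream with wait_for_completion=False
wait_for_serverless_job = EmrServerlessJobSensor(
    task_id='wait_for_serverless_job',
    application_id="{{ task_instance.xcom_pull(task_ids='create_serverless_app') }}",
    job_run_id="{{ task_instance.xcom_pull(task_ids='run_spark_job') }}",
)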
"""Asynchronous triggers for efficient EMR monitoring without blocking workers.
class EmrJobFlowTrigger(BaseTrigger):
def __init__(self, job_flow_id: str, target_states: list = None, failed_states: list = None, aws_conn_id: str = 'aws_default', poll_interval: int = 60, **kwargs):
"""
Asynchronous trigger for EMR cluster state monitoring.
Parameters:
- job_flow_id: EMR cluster ID
- target_states: List of acceptable target states
- failed_states: List of states considered as failures
- aws_conn_id: AWS connection ID
- poll_interval: Polling interval in seconds
"""
class EmrStepTrigger(BaseTrigger):
def __init__(self, job_flow_id: str, step_id: str, target_states: list = None, failed_states: list = None, aws_conn_id: str = 'aws_default', poll_interval: int = 30, **kwargs):
"""
Asynchronous trigger for EMR step monitoring.
Parameters:
- job_flow_id: EMR cluster ID
- step_id: Step ID to monitor
- target_states: List of acceptable target states
- failed_states: List of states considered as failures
- aws_conn_id: AWS connection ID
- poll_interval: Polling interval in seconds
"""
class EmrContainerTrigger(BaseTrigger):
def __init__(self, job_id: str, aws_conn_id: str = 'aws_default', poll_interval: int = 60, **kwargs):
"""
Asynchronous trigger for EMR on EKS job monitoring.
Parameters:
- job_id: EMR on EKS job ID
- aws_conn_id: AWS connection ID
- poll_interval: Polling interval in seconds
"""
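Triggers are normally consumed through an operator's or sensor's deferral path rather than declared in a DAG. A minimal sketch of a custom sensor deferring to EmrStepTrigger; the sensor class, callback name, and event payload shape are illustrative assumptions, and upstream provider releases may name their trigger classes differently:
from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.triggers.emr import EmrStepTrigger
from airflow.sensors.base import BaseSensorOperator

class DeferredEmrStepSensor(BaseSensorOperator):
    def __init__(self, job_flow_id: str, step_id: str, **kwargs):
        super().__init__(**kwargs)
        self.job_flow_id = job_flow_id
        self.step_id = step_id

    def execute(self, context):
        # Hand polling off to the triggerer so no worker slot is held
        self.defer(
            trigger=EmrStepTrigger(
                job_flow_id=self.job_flow_id,
                step_id=self.step_id,
                target_states=['COMPLETED'],
                failed_states=['FAILED', 'CANCELLED'],
                poll_interval=30,
            ),
            method_name='execute_complete',
        )

    def execute_complete(self, context, event=None):
        # Runs on a worker once the trigger fires with a terminal state
        if event and event.get('status') == 'failure':  # assumed payload shape
            raise AirflowException(f'EMR step failed: {event}')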
"""from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
EmrCreateJobFlowOperator,
EmrAddStepsOperator,
EmrTerminateJobFlowOperator
)
from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor, EmrStepSensor
dag = DAG('emr_spark_job', start_date=datetime(2023, 1, 1))
# Cluster configuration
JOB_FLOW_OVERRIDES = {
'Name': 'data-processing-cluster',
'ReleaseLabel': 'emr-6.10.0',
'Applications': [{'Name': 'Spark'}, {'Name': 'Hadoop'}],
'Instances': {
'InstanceGroups': [
{
'Name': 'Master nodes',
'Market': 'ON_DEMAND',
'InstanceRole': 'MASTER',
'InstanceType': 'm5.xlarge',
'InstanceCount': 1,
},
{
'Name': 'Core nodes',
'Market': 'ON_DEMAND',
'InstanceRole': 'CORE',
'InstanceType': 'm5.xlarge',
'InstanceCount': 2,
}
],
'Ec2KeyName': 'my-key-pair',
'KeepJobFlowAliveWhenNoSteps': True,  # keep the cluster alive so it reaches WAITING and can accept steps
},
'JobFlowRole': 'EMR_EC2_DefaultRole',
'ServiceRole': 'EMR_DefaultRole',
'LogUri': 's3://my-emr-logs/',
}
# Spark job steps
SPARK_STEPS = [
{
'Name': 'Data Processing Job',
'ActionOnFailure': 'TERMINATE_CLUSTER',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': [
'spark-submit',
'--class', 'com.example.DataProcessor',
's3://my-spark-jobs/data-processor.jar',
'--input', 's3://my-data/input/{{ ds }}/',
'--output', 's3://my-data/output/{{ ds }}/',
'--date', '{{ ds }}'
],
},
}
]
# Create cluster
create_cluster = EmrCreateJobFlowOperator(
task_id='create_cluster',
job_flow_overrides=JOB_FLOW_OVERRIDES,
dag=dag
)
# Wait for cluster to be ready
wait_for_cluster = EmrJobFlowSensor(
task_id='wait_for_cluster',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
target_states=['WAITING'],
dag=dag
)
# Add processing steps
add_steps = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
steps=SPARK_STEPS,
dag=dag
)
# Wait for steps to complete
wait_for_step = EmrStepSensor(
task_id='wait_for_step',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
target_states=['COMPLETED'],
dag=dag
)
# Terminate the cluster once the step has finished
terminate_cluster = EmrTerminateJobFlowOperator(
task_id='terminate_cluster',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
dag=dag
)
create_cluster >> wait_for_cluster >> add_steps >> wait_for_step >> terminate_cluster
Usage example: running a Spark job on EMR Serverless.
from airflow.providers.amazon.aws.operators.emr import (
EmrServerlessCreateApplicationOperator,
EmrServerlessStartJobOperator,
EmrServerlessDeleteApplicationOperator
)
# Create serverless application
create_app = EmrServerlessCreateApplicationOperator(
task_id='create_serverless_app',
release_label='emr-6.10.0',
job_type='SPARK',
name='data-processing-app',
initial_capacity={
'DRIVER': {
'workerCount': 1,
'workerConfiguration': {
'cpu': '2 vCPU',
'memory': '4 GB'
}
},
'EXECUTOR': {
'workerCount': 4,
'workerConfiguration': {
'cpu': '4 vCPU',
'memory': '8 GB'
}
}
},
maximum_capacity={
'DRIVER': {
'workerCount': 1,
'workerConfiguration': {
'cpu': '2 vCPU',
'memory': '4 GB'
}
},
'EXECUTOR': {
'workerCount': 10,
'workerConfiguration': {
'cpu': '4 vCPU',
'memory': '8 GB'
}
}
},
dag=dag
)
# Run Spark job
run_job = EmrServerlessStartJobOperator(
task_id='run_spark_job',
application_id="{{ task_instance.xcom_pull(task_ids='create_serverless_app', key='application_id') }}",
execution_role_arn='arn:aws:iam::123456789012:role/EMRServerlessExecutionRole',
job_driver={
'sparkSubmit': {
'entryPoint': 's3://my-spark-jobs/data-processor.py',
'entryPointArguments': [
'--input', 's3://my-data/input/{{ ds }}/',
'--output', 's3://my-data/output/{{ ds }}/'
],
'sparkSubmitParameters': '--conf spark.sql.adaptive.enabled=true'
}
},
configuration_overrides={
'monitoringConfiguration': {
's3MonitoringConfiguration': {
'logUri': 's3://my-emr-serverless-logs/'
}
}
},
wait_for_completion=True,
dag=dag
)
# Clean up application
delete_app = EmrServerlessDeleteApplicationOperator(
task_id='delete_serverless_app',
application_id="{{ task_instance.xcom_pull(task_ids='create_serverless_app', key='application_id') }}",
dag=dag
)
create_app >> run_job >> delete_app
Usage example: running a containerized Spark job on EMR on EKS.
from airflow.providers.amazon.aws.operators.emr import EmrContainerOperator
# Run job on EMR on EKS
emr_eks_job = EmrContainerOperator(
task_id='run_emr_eks_job',
name='data-processing-job',
virtual_cluster_id='abc123def456',
execution_role_arn='arn:aws:iam::123456789012:role/EMRContainersExecutionRole',
release_label='emr-6.10.0-latest',
job_driver={
'sparkSubmitJobDriver': {
'entryPoint': 's3://my-spark-jobs/data-processor.py',
'entryPointArguments': [
'--input-path', 's3://my-data/input/{{ ds }}/',
'--output-path', 's3://my-data/output/{{ ds }}/'
],
'sparkSubmitParameters': '--conf spark.sql.adaptive.enabled=true --conf spark.sql.adaptive.coalescePartitions.enabled=true'
}
},
configuration_overrides={
'applicationConfiguration': [
{
'classification': 'spark-defaults',
'properties': {
'spark.sql.adaptive.enabled': 'true',
'spark.sql.adaptive.coalescePartitions.enabled': 'true',
'spark.kubernetes.container.image': 'my-account.dkr.ecr.us-east-1.amazonaws.com/spark:latest'
}
}
],
'monitoringConfiguration': {
's3MonitoringConfiguration': {
'logUri': 's3://my-emr-eks-logs/'
}
}
},
dag=dag
)
# EMR cluster states
class EmrClusterState:
STARTING = 'STARTING'
BOOTSTRAPPING = 'BOOTSTRAPPING'
RUNNING = 'RUNNING'
WAITING = 'WAITING'
TERMINATING = 'TERMINATING'
TERMINATED = 'TERMINATED'
TERMINATED_WITH_ERRORS = 'TERMINATED_WITH_ERRORS'
# EMR step states
class EmrStepState:
PENDING = 'PENDING'
CANCEL_PENDING = 'CANCEL_PENDING'
RUNNING = 'RUNNING'
COMPLETED = 'COMPLETED'
CANCELLED = 'CANCELLED'
FAILED = 'FAILED'
INTERRUPTED = 'INTERRUPTED'
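These state strings are what the sensors above accept. For example, an EmrStepSensor that treats every terminal state other than COMPLETED as a failure:
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor

# Fail fast on any unsuccessful terminal state
watch_step = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
    target_states=['COMPLETED'],
    failed_states=['FAILED', 'CANCELLED', 'INTERRUPTED'],
)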
# EMR instance types
class EmrInstanceType:
M5_LARGE = 'm5.large'
M5_XLARGE = 'm5.xlarge'
M5_2XLARGE = 'm5.2xlarge'
M5_4XLARGE = 'm5.4xlarge'
M5_8XLARGE = 'm5.8xlarge'
C5_LARGE = 'c5.large'
C5_XLARGE = 'c5.xlarge'
R5_LARGE = 'r5.large'
R5_XLARGE = 'r5.xlarge'
# Job flow configuration
class JobFlowConfig:
name: str
release_label: str
applications: list
instances: dict
steps: list = None
bootstrap_actions: list = None
configurations: list = None
service_role: str
job_flow_role: str
security_configuration: str = None
auto_scaling_role: str = None
scale_down_behavior: str = None
custom_ami_id: str = None
ebs_root_volume_size: int = None
repo_upgrade_on_boot: str = None
kerberos_attributes: dict = None
step_concurrency_level: int = 1
managed_scaling_policy: dict = None
placement_group_configs: list = None
auto_termination_policy: dict = None
os_release_label: str = None
log_uri: str = None
log_encryption_kms_key_id: str = None
additional_info: str = None
tags: list = None
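The scaling-related JobFlowConfig fields map to same-named keys of the EMR RunJobFlow API and can be passed through job_flow_overrides. A hedged sketch of the managed scaling and auto-termination pieces; the capacity numbers and timeout are illustrative:
SCALING_OVERRIDES = {
    # Let EMR scale the cluster between 2 and 10 instances
    'ManagedScalingPolicy': {
        'ComputeLimits': {
            'UnitType': 'Instances',
            'MinimumCapacityUnits': 2,
            'MaximumCapacityUnits': 10,
        }
    },
    # Terminate after one hour of idleness (seconds)
    'AutoTerminationPolicy': {'IdleTimeout': 3600},
    'StepConcurrencyLevel': 2,
}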
# Instance group configuration
class InstanceGroupConfig:
name: str
instance_role: str # 'MASTER', 'CORE', 'TASK'
instance_type: str
instance_count: int
market: str = 'ON_DEMAND' # 'ON_DEMAND', 'SPOT'
bid_price: str = None
ebs_configuration: dict = None
auto_scaling_policy: dict = None
configurations: list = None
custom_ami_id: str = None
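Spot capacity is not shown in the cluster example above. A sketch of a task instance group bidding on Spot, in the InstanceGroups shape used by RunJobFlow; the bid price is illustrative:
SPOT_TASK_GROUP = {
    'Name': 'Task nodes',
    'Market': 'SPOT',
    'BidPrice': '0.10',  # illustrative maximum USD per instance-hour
    'InstanceRole': 'TASK',
    'InstanceType': 'm5.xlarge',
    'InstanceCount': 4,
}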
# EMR step configuration
class StepConfig:
name: str
action_on_failure: str # 'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE'
hadoop_jar_step: dict
# EMR Serverless configuration
class EmrServerlessConfig:
release_label: str
job_type: str # 'SPARK', 'HIVE'
name: str = None
initial_capacity: dict = None
maximum_capacity: dict = None
auto_start_configuration: dict = None
auto_stop_configuration: dict = None
network_configuration: dict = None
tags: dict = None
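The auto-start and auto-stop fields mirror the EMR Serverless CreateApplication API. A minimal sketch at the boto3 level, assuming the service's camelCase keys:
import boto3

client = boto3.client('emr-serverless')
response = client.create_application(
    name='data-processing-app',
    releaseLabel='emr-6.10.0',
    type='SPARK',
    # Start on job submission; stop after 15 idle minutes
    autoStartConfiguration={'enabled': True},
    autoStopConfiguration={'enabled': True, 'idleTimeoutMinutes': 15},
)
application_id = response['applicationId']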
# Worker configuration for EMR Serverless
class WorkerConfiguration:
cpu: str # e.g., '2 vCPU', '4 vCPU'
memory: str # e.g., '4 GB', '8 GB'
disk: str = None # e.g., '20 GB'
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-amazon