CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-apache-airflow-providers-amazon

Apache Airflow provider package that provides comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/emr-clusters.md

EMR Cluster Management

Amazon EMR (Elastic MapReduce) integration for big data processing and analytics workloads. Provides cluster lifecycle management, job execution, and monitoring capabilities for Hadoop, Spark, and other big data frameworks running on AWS.

Capabilities

EMR Hook

Core EMR client providing comprehensive cluster and job management functionality.

class EmrHook(AwsBaseHook):
    """
    Core EMR client providing cluster (job flow) lifecycle, step management,
    and state-query functionality on top of the base AWS hook.
    """

    def __init__(self, aws_conn_id: str = 'aws_default', emr_conn_id: str | None = None, **kwargs):
        """
        Initialize EMR Hook.

        Parameters:
        - aws_conn_id: AWS connection ID
        - emr_conn_id: EMR-specific connection ID (optional; None means no
          EMR-specific connection is used)
        """

    def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list) -> str:
        """
        Get cluster ID by name and states.

        Parameters:
        - emr_cluster_name: Name of the EMR cluster
        - cluster_states: List of acceptable cluster states

        Returns:
        Cluster ID if found
        """

    def create_job_flow(self, job_flow_overrides: dict | None = None, **kwargs) -> str:
        """
        Create EMR cluster (job flow).

        Parameters:
        - job_flow_overrides: Configuration overrides for cluster creation
          (optional)

        Returns:
        Cluster ID
        """

    def add_job_flow_steps(self, job_flow_id: str, steps: list, **kwargs) -> list:
        """
        Add steps to running EMR cluster.

        Parameters:
        - job_flow_id: EMR cluster ID
        - steps: List of step configurations

        Returns:
        List of step IDs
        """

    def describe_cluster(self, job_flow_id: str) -> dict:
        """
        Get EMR cluster details.

        Parameters:
        - job_flow_id: EMR cluster ID

        Returns:
        Cluster configuration and status
        """

    def describe_step(self, job_flow_id: str, step_id: str) -> dict:
        """
        Get EMR step details.

        Parameters:
        - job_flow_id: EMR cluster ID
        - step_id: Step ID

        Returns:
        Step configuration and status
        """

    def list_steps(self, job_flow_id: str, step_states: list | None = None, step_ids: list | None = None) -> list:
        """
        List steps in EMR cluster.

        Parameters:
        - job_flow_id: EMR cluster ID
        - step_states: Filter by step states (optional)
        - step_ids: Filter by specific step IDs (optional)

        Returns:
        List of step details
        """

    def terminate_job_flow(self, job_flow_id: str) -> None:
        """
        Terminate EMR cluster.

        Parameters:
        - job_flow_id: EMR cluster ID
        """

    def modify_cluster(self, job_flow_id: str, step_concurrency_level: int) -> None:
        """
        Modify EMR cluster configuration.

        Parameters:
        - job_flow_id: EMR cluster ID
        - step_concurrency_level: Number of concurrent steps
        """

    def get_job_flow_state(self, job_flow_id: str) -> str:
        """
        Get EMR cluster state.

        Parameters:
        - job_flow_id: EMR cluster ID

        Returns:
        Current cluster state
        """

    # NOTE(review): this method takes a *Qubole* connection ID and appears
    # unrelated to EMR — it looks misplaced on this hook; verify against the
    # actual provider package before relying on it.
    def check_query_output(self, qubole_conn_id: str, command_id: str) -> str:
        """
        Check query output status.

        Parameters:
        - qubole_conn_id: Qubole connection ID
        - command_id: Command ID

        Returns:
        Query output status
        """

EMR Operators

Task implementations for EMR cluster and job management operations.

class EmrCreateJobFlowOperator(BaseOperator):
    def __init__(self, job_flow_overrides: dict | None = None, aws_conn_id: str = 'aws_default', emr_conn_id: str | None = None, **kwargs):
        """
        Create EMR cluster.

        Parameters:
        - job_flow_overrides: Cluster configuration overrides (optional)
        - aws_conn_id: AWS connection ID
        - emr_conn_id: EMR-specific connection ID (optional)
        """

class EmrTerminateJobFlowOperator(BaseOperator):
    def __init__(self, job_flow_id: str, aws_conn_id: str = 'aws_default', **kwargs):
        """Terminate an existing EMR cluster (job flow).

        Args:
            job_flow_id: ID of the EMR cluster to terminate.
            aws_conn_id: Airflow connection ID used for AWS credentials.
        """

class EmrAddStepsOperator(BaseOperator):
    def __init__(self, job_flow_id: str, steps: list | None = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Add steps to EMR cluster.

        Parameters:
        - job_flow_id: EMR cluster ID
        - steps: List of step configurations (optional)
        - aws_conn_id: AWS connection ID
        """

class EmrModifyClusterOperator(BaseOperator):
    def __init__(self, job_flow_id: str, step_concurrency_level: int, aws_conn_id: str = 'aws_default', **kwargs):
        """Change the configuration of a running EMR cluster.

        Args:
            job_flow_id: ID of the EMR cluster to modify.
            step_concurrency_level: How many steps may run concurrently.
            aws_conn_id: Airflow connection ID used for AWS credentials.
        """

class EmrContainerOperator(BaseOperator):
    def __init__(self, name: str, virtual_cluster_id: str, execution_role_arn: str, release_label: str, job_driver: dict, configuration_overrides: dict | None = None, aws_conn_id: str = 'aws_default', poll_interval: int = 30, max_tries: int | None = None, **kwargs):
        """
        Run job on EMR on EKS.

        Parameters:
        - name: Job name
        - virtual_cluster_id: EMR on EKS virtual cluster ID
        - execution_role_arn: IAM role ARN for job execution
        - release_label: EMR release version
        - job_driver: Job driver configuration
        - configuration_overrides: Additional configuration overrides (optional)
        - aws_conn_id: AWS connection ID
        - poll_interval: Polling interval in seconds
        - max_tries: Maximum number of polling attempts (None means unbounded)
        """

class EmrServerlessCreateApplicationOperator(BaseOperator):
    def __init__(self, release_label: str, job_type: str, name: str | None = None, initial_capacity: dict | None = None, maximum_capacity: dict | None = None, tags: dict | None = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Create EMR Serverless application.

        Parameters:
        - release_label: EMR release version
        - job_type: Type of job ('SPARK' or 'HIVE')
        - name: Application name (optional)
        - initial_capacity: Initial capacity configuration (optional)
        - maximum_capacity: Maximum capacity configuration (optional)
        - tags: Resource tags (optional)
        - aws_conn_id: AWS connection ID
        """

class EmrServerlessStartJobOperator(BaseOperator):
    def __init__(self, application_id: str, execution_role_arn: str, job_driver: dict, configuration_overrides: dict | None = None, name: str | None = None, tags: dict | None = None, aws_conn_id: str = 'aws_default', wait_for_completion: bool = True, **kwargs):
        """
        Start EMR Serverless job.

        Parameters:
        - application_id: EMR Serverless application ID
        - execution_role_arn: IAM role ARN for job execution
        - job_driver: Job driver configuration
        - configuration_overrides: Additional configuration overrides (optional)
        - name: Job name (optional)
        - tags: Job tags (optional)
        - aws_conn_id: AWS connection ID
        - wait_for_completion: Wait for job completion
        """

class EmrServerlessDeleteApplicationOperator(BaseOperator):
    def __init__(self, application_id: str, aws_conn_id: str = 'aws_default', **kwargs):
        """Delete an EMR Serverless application.

        Args:
            application_id: ID of the EMR Serverless application to delete.
            aws_conn_id: Airflow connection ID used for AWS credentials.
        """

EMR Sensors

Monitoring tasks that wait for EMR cluster states and job completion.

class EmrJobFlowSensor(BaseSensorOperator):
    def __init__(self, job_flow_id: str, target_states: list | None = None, failed_states: list | None = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Wait for EMR cluster to reach target state.

        Parameters:
        - job_flow_id: EMR cluster ID
        - target_states: List of acceptable target states (optional)
        - failed_states: List of states considered as failures (optional)
        - aws_conn_id: AWS connection ID
        """

class EmrStepSensor(BaseSensorOperator):
    def __init__(self, job_flow_id: str, step_id: str, target_states: list | None = None, failed_states: list | None = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Wait for EMR step completion.

        Parameters:
        - job_flow_id: EMR cluster ID
        - step_id: Step ID to monitor
        - target_states: List of acceptable target states (optional)
        - failed_states: List of states considered as failures (optional)
        - aws_conn_id: AWS connection ID
        """

class EmrContainerSensor(BaseSensorOperator):
    def __init__(self, job_id: str, aws_conn_id: str = 'aws_default', poll_interval: int = 60, **kwargs):
        """Block until an EMR on EKS job finishes.

        Args:
            job_id: ID of the EMR on EKS job to watch.
            aws_conn_id: Airflow connection ID used for AWS credentials.
            poll_interval: Seconds to wait between status checks.
        """

class EmrServerlessJobSensor(BaseSensorOperator):
    def __init__(self, application_id: str, job_run_id: str, aws_conn_id: str = 'aws_default', **kwargs):
        """Block until an EMR Serverless job run finishes.

        Args:
            application_id: ID of the EMR Serverless application.
            job_run_id: ID of the job run to watch.
            aws_conn_id: Airflow connection ID used for AWS credentials.
        """

EMR Triggers

Asynchronous triggers for efficient EMR monitoring without blocking workers.

class EmrJobFlowTrigger(BaseTrigger):
    def __init__(self, job_flow_id: str, target_states: list | None = None, failed_states: list | None = None, aws_conn_id: str = 'aws_default', poll_interval: int = 60, **kwargs):
        """
        Asynchronous trigger for EMR cluster state monitoring.

        Parameters:
        - job_flow_id: EMR cluster ID
        - target_states: List of acceptable target states (optional)
        - failed_states: List of states considered as failures (optional)
        - aws_conn_id: AWS connection ID
        - poll_interval: Polling interval in seconds
        """

class EmrStepTrigger(BaseTrigger):
    def __init__(self, job_flow_id: str, step_id: str, target_states: list | None = None, failed_states: list | None = None, aws_conn_id: str = 'aws_default', poll_interval: int = 30, **kwargs):
        """
        Asynchronous trigger for EMR step monitoring.

        Parameters:
        - job_flow_id: EMR cluster ID
        - step_id: Step ID to monitor
        - target_states: List of acceptable target states (optional)
        - failed_states: List of states considered as failures (optional)
        - aws_conn_id: AWS connection ID
        - poll_interval: Polling interval in seconds
        """

class EmrContainerTrigger(BaseTrigger):
    def __init__(self, job_id: str, aws_conn_id: str = 'aws_default', poll_interval: int = 60, **kwargs):
        """Deferrable trigger that polls an EMR on EKS job without holding a worker.

        Args:
            job_id: ID of the EMR on EKS job to watch.
            aws_conn_id: Airflow connection ID used for AWS credentials.
            poll_interval: Seconds to wait between status checks.
        """

Usage Examples

Basic EMR Cluster Workflow

from datetime import datetime  # fix: datetime was used below but never imported

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrCreateJobFlowOperator,
    EmrAddStepsOperator,
    EmrTerminateJobFlowOperator
)
from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor, EmrStepSensor

dag = DAG('emr_spark_job', start_date=datetime(2023, 1, 1))

# Cluster configuration
JOB_FLOW_OVERRIDES = {
    'Name': 'data-processing-cluster',
    'ReleaseLabel': 'emr-6.10.0',
    'Applications': [{'Name': 'Spark'}, {'Name': 'Hadoop'}],
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master nodes',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1,
            },
            {
                'Name': 'Core nodes',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'CORE',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 2,
            }
        ],
        'Ec2KeyName': 'my-key-pair',
        # fix: must be True here — the cluster is created with no steps, so with
        # False it would auto-terminate and never reach WAITING for the sensor,
        # nor accept the steps added later.
        'KeepJobFlowAliveWhenNoSteps': True,
    },
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
    'LogUri': 's3://my-emr-logs/',
}

# Spark job steps ({{ ds }} is the Airflow logical-date template variable)
SPARK_STEPS = [
    {
        'Name': 'Data Processing Job',
        'ActionOnFailure': 'TERMINATE_CLUSTER',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--class', 'com.example.DataProcessor',
                's3://my-spark-jobs/data-processor.jar',
                '--input', 's3://my-data/input/{{ ds }}/',
                '--output', 's3://my-data/output/{{ ds }}/',
                '--date', '{{ ds }}'
            ],
        },
    }
]

# Create cluster
create_cluster = EmrCreateJobFlowOperator(
    task_id='create_cluster',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    dag=dag
)

# Wait for cluster to be ready (cluster ID is pushed to XCom by create_cluster)
wait_for_cluster = EmrJobFlowSensor(
    task_id='wait_for_cluster',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
    target_states=['WAITING'],
    dag=dag
)

# Add processing steps
add_steps = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
    steps=SPARK_STEPS,
    dag=dag
)

# Wait for the first added step to complete
wait_for_step = EmrStepSensor(
    task_id='wait_for_step',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
    target_states=['COMPLETED'],
    dag=dag
)

create_cluster >> wait_for_cluster >> add_steps >> wait_for_step

EMR Serverless Job

from airflow.providers.amazon.aws.operators.emr import (
    EmrServerlessCreateApplicationOperator,
    EmrServerlessStartJobOperator,
    EmrServerlessDeleteApplicationOperator
)

# Capacity settings, pulled out so the operator calls stay readable.
INITIAL_CAPACITY = {
    'DRIVER': {
        'workerCount': 1,
        'workerConfiguration': {
            'cpu': '2 vCPU',
            'memory': '4 GB'
        }
    },
    'EXECUTOR': {
        'workerCount': 4,
        'workerConfiguration': {
            'cpu': '4 vCPU',
            'memory': '8 GB'
        }
    }
}

MAXIMUM_CAPACITY = {
    'DRIVER': {
        'workerCount': 1,
        'workerConfiguration': {
            'cpu': '2 vCPU',
            'memory': '4 GB'
        }
    },
    'EXECUTOR': {
        'workerCount': 10,
        'workerConfiguration': {
            'cpu': '4 vCPU',
            'memory': '8 GB'
        }
    }
}

SPARK_JOB_DRIVER = {
    'sparkSubmit': {
        'entryPoint': 's3://my-spark-jobs/data-processor.py',
        'entryPointArguments': [
            '--input', 's3://my-data/input/{{ ds }}/',
            '--output', 's3://my-data/output/{{ ds }}/'
        ],
        'sparkSubmitParameters': '--conf spark.sql.adaptive.enabled=true'
    }
}

MONITORING_OVERRIDES = {
    'monitoringConfiguration': {
        's3MonitoringConfiguration': {
            'logUri': 's3://my-emr-serverless-logs/'
        }
    }
}

# NOTE(review): downstream tasks pull the application ID with key='application_id';
# confirm the create operator pushes under that key rather than 'return_value'.
APP_ID_TEMPLATE = "{{ task_instance.xcom_pull(task_ids='create_serverless_app', key='application_id') }}"

# Create serverless application
create_app = EmrServerlessCreateApplicationOperator(
    task_id='create_serverless_app',
    release_label='emr-6.10.0',
    job_type='SPARK',
    name='data-processing-app',
    initial_capacity=INITIAL_CAPACITY,
    maximum_capacity=MAXIMUM_CAPACITY,
    dag=dag
)

# Run Spark job
run_job = EmrServerlessStartJobOperator(
    task_id='run_spark_job',
    application_id=APP_ID_TEMPLATE,
    execution_role_arn='arn:aws:iam::123456789012:role/EMRServerlessExecutionRole',
    job_driver=SPARK_JOB_DRIVER,
    configuration_overrides=MONITORING_OVERRIDES,
    wait_for_completion=True,
    dag=dag
)

# Clean up application
delete_app = EmrServerlessDeleteApplicationOperator(
    task_id='delete_serverless_app',
    application_id=APP_ID_TEMPLATE,
    dag=dag
)

create_app >> run_job >> delete_app

EMR on EKS Job

from airflow.providers.amazon.aws.operators.emr import EmrContainerOperator

# Spark driver definition for the containerized job ({{ ds }} is templated by Airflow).
EKS_JOB_DRIVER = {
    'sparkSubmitJobDriver': {
        'entryPoint': 's3://my-spark-jobs/data-processor.py',
        'entryPointArguments': [
            '--input-path', 's3://my-data/input/{{ ds }}/',
            '--output-path', 's3://my-data/output/{{ ds }}/'
        ],
        'sparkSubmitParameters': '--conf spark.sql.adaptive.enabled=true --conf spark.sql.adaptive.coalescePartitions.enabled=true'
    }
}

# Spark defaults plus S3 log shipping for the virtual cluster.
EKS_CONFIGURATION_OVERRIDES = {
    'applicationConfiguration': [
        {
            'classification': 'spark-defaults',
            'properties': {
                'spark.sql.adaptive.enabled': 'true',
                'spark.sql.adaptive.coalescePartitions.enabled': 'true',
                'spark.kubernetes.container.image': 'my-account.dkr.ecr.us-east-1.amazonaws.com/spark:latest'
            }
        }
    ],
    'monitoringConfiguration': {
        's3MonitoringConfiguration': {
            'logUri': 's3://my-emr-eks-logs/'
        }
    }
}

# Run job on EMR on EKS
emr_eks_job = EmrContainerOperator(
    task_id='run_emr_eks_job',
    name='data-processing-job',
    virtual_cluster_id='abc123def456',
    execution_role_arn='arn:aws:iam::123456789012:role/EMRContainersExecutionRole',
    release_label='emr-6.10.0-latest',
    job_driver=EKS_JOB_DRIVER,
    configuration_overrides=EKS_CONFIGURATION_OVERRIDES,
    dag=dag
)

Types

# EMR cluster states
class EmrClusterState:
    """String constants for the lifecycle states an EMR cluster can report."""
    STARTING = 'STARTING'
    BOOTSTRAPPING = 'BOOTSTRAPPING'
    RUNNING = 'RUNNING'
    WAITING = 'WAITING'  # idle and ready to accept steps
    TERMINATING = 'TERMINATING'
    TERMINATED = 'TERMINATED'
    TERMINATED_WITH_ERRORS = 'TERMINATED_WITH_ERRORS'

# EMR step states
class EmrStepState:
    """String constants for the states an individual EMR step can report."""
    PENDING = 'PENDING'
    CANCEL_PENDING = 'CANCEL_PENDING'
    RUNNING = 'RUNNING'
    COMPLETED = 'COMPLETED'
    CANCELLED = 'CANCELLED'
    FAILED = 'FAILED'
    INTERRUPTED = 'INTERRUPTED'

# EMR instance types
class EmrInstanceType:
    """Convenience constants for common EC2 instance types used in EMR clusters."""
    # General purpose (m5)
    M5_LARGE = 'm5.large'
    M5_XLARGE = 'm5.xlarge'
    M5_2XLARGE = 'm5.2xlarge'
    M5_4XLARGE = 'm5.4xlarge'
    M5_8XLARGE = 'm5.8xlarge'
    # Compute optimized (c5)
    C5_LARGE = 'c5.large'
    C5_XLARGE = 'c5.xlarge'
    # Memory optimized (r5)
    R5_LARGE = 'r5.large'
    R5_XLARGE = 'r5.xlarge'

# Job flow configuration
class JobFlowConfig:
    name: str
    release_label: str
    applications: list
    instances: dict
    steps: list = None
    bootstrap_actions: list = None
    configurations: list = None
    service_role: str
    job_flow_role: str
    security_configuration: str = None
    auto_scaling_role: str = None
    scale_down_behavior: str = None
    custom_ami_id: str = None
    ebs_root_volume_size: int = None
    repo_upgrade_on_boot: str = None
    kerberos_attributes: dict = None
    step_concurrency_level: int = 1
    managed_scaling_policy: dict = None
    placement_group_configs: list = None
    auto_termination_policy: dict = None
    os_release_label: str = None
    log_uri: str = None
    log_encryption_kms_key_id: str = None
    additional_info: str = None
    tags: list = None

# Instance group configuration
class InstanceGroupConfig:
    name: str
    instance_role: str  # 'MASTER', 'CORE', 'TASK'
    instance_type: str
    instance_count: int
    market: str = 'ON_DEMAND'  # 'ON_DEMAND', 'SPOT'
    bid_price: str = None
    ebs_configuration: dict = None
    auto_scaling_policy: dict = None
    configurations: list = None
    custom_ami_id: str = None

# EMR step configuration
class StepConfig:
    """Declarative shape of a single EMR step definition."""
    name: str
    action_on_failure: str  # 'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE'
    hadoop_jar_step: dict  # e.g. {'Jar': ..., 'Args': [...]}

# EMR Serverless configuration
class EmrServerlessConfig:
    release_label: str
    job_type: str  # 'SPARK', 'HIVE'
    name: str = None
    initial_capacity: dict = None
    maximum_capacity: dict = None
    auto_start_configuration: dict = None
    auto_stop_configuration: dict = None
    network_configuration: dict = None
    tags: dict = None

# Worker configuration for EMR Serverless
class WorkerConfiguration:
    cpu: str  # e.g., '2 vCPU', '4 vCPU'
    memory: str  # e.g., '4 GB', '8 GB'
    disk: str = None  # e.g., '20 GB'

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-amazon

docs

athena-analytics.md

authentication.md

batch-processing.md

data-transfers.md

dms-migration.md

dynamodb-nosql.md

ecs-containers.md

eks-kubernetes.md

emr-clusters.md

glue-processing.md

index.md

lambda-functions.md

messaging-sns-sqs.md

rds-databases.md

redshift-warehouse.md

s3-storage.md

sagemaker-ml.md

tile.json