CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-apache-airflow-providers-amazon

Apache Airflow provider package that provides comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/ecs-containers.md

ECS Container Orchestration

Amazon ECS (Elastic Container Service) integration for running containerized applications and tasks. Provides task execution, service management, and cluster operations for both EC2 and Fargate launch types.

Capabilities

ECS Hook

Core ECS client providing container orchestration and task management functionality.

class EcsHook(AwsBaseHook):
    """Core ECS client wrapper providing container orchestration and task management."""

    def __init__(self, aws_conn_id: str = 'aws_default', region_name: str | None = None, **kwargs):
        """
        Initialize ECS Hook.

        Parameters:
        - aws_conn_id: AWS connection ID
        - region_name: AWS region name (None defers to the connection's configured region)
        """

    def run_task(self, task_definition: str, cluster: str, overrides: dict | None = None, count: int = 1, started_by: str | None = None, group: str | None = None, placement_constraints: list | None = None, placement_strategy: list | None = None, platform_version: str | None = None, network_configuration: dict | None = None, tags: list | None = None, enable_execute_command: bool = False, propagate_tags: str | None = None, reference_id: str | None = None, **kwargs) -> str:
        """
        Run ECS task.

        Parameters:
        - task_definition: Task definition ARN or family:revision
        - cluster: ECS cluster name or ARN
        - overrides: Task definition overrides
        - count: Number of tasks to run
        - started_by: Optional started_by tag
        - group: Task group
        - placement_constraints: Task placement constraints
        - placement_strategy: Task placement strategy
        - platform_version: Fargate platform version
        - network_configuration: Network configuration for awsvpc mode
        - tags: Task tags
        - enable_execute_command: Enable ECS Exec
        - propagate_tags: Tag propagation ('TASK_DEFINITION', 'SERVICE', 'NONE')
        - reference_id: Reference ID for task

        Returns:
        Task ARN

        Note (review): when ``count`` > 1 the underlying API starts several
        tasks; a single-ARN return suggests only one is surfaced — confirm.
        """

    def describe_tasks(self, cluster: str, tasks: list, include: list | None = None) -> dict:
        """
        Get ECS task details.

        Parameters:
        - cluster: ECS cluster name or ARN
        - tasks: List of task ARNs or IDs
        - include: Additional task information to include

        Returns:
        Task descriptions
        """

    def describe_task_definition(self, task_definition: str, include: list | None = None) -> dict:
        """
        Get task definition details.

        Parameters:
        - task_definition: Task definition ARN or family:revision
        - include: Additional information to include

        Returns:
        Task definition description
        """

    def list_tasks(self, cluster: str | None = None, container_instance: str | None = None, family: str | None = None, started_by: str | None = None, service_name: str | None = None, desired_status: str | None = None, launch_type: str | None = None) -> list:
        """
        List ECS tasks.

        All parameters are optional filters; omitting them lists every task
        visible to the connection.

        Parameters:
        - cluster: ECS cluster name or ARN
        - container_instance: Container instance ARN or ID
        - family: Task definition family
        - started_by: Started by filter
        - service_name: Service name filter
        - desired_status: Task status filter
        - launch_type: Launch type filter ('EC2', 'FARGATE')

        Returns:
        List of task ARNs
        """

    def stop_task(self, cluster: str, task: str, reason: str | None = None) -> dict:
        """
        Stop running ECS task.

        Parameters:
        - cluster: ECS cluster name or ARN
        - task: Task ARN or ID
        - reason: Reason for stopping task

        Returns:
        Stop task response
        """

    def describe_clusters(self, clusters: list | None = None, include: list | None = None) -> dict:
        """
        Describe ECS clusters.

        Parameters:
        - clusters: List of cluster names or ARNs
        - include: Additional cluster information to include

        Returns:
        Cluster descriptions
        """

    def describe_services(self, cluster: str, services: list, include: list | None = None) -> dict:
        """
        Describe ECS services.

        Parameters:
        - cluster: ECS cluster name or ARN
        - services: List of service names or ARNs
        - include: Additional service information to include

        Returns:
        Service descriptions
        """

    def get_task_logs(self, task_arn: str, container_name: str | None = None, start_time: int | None = None, end_time: int | None = None, next_token: str | None = None) -> dict:
        """
        Get CloudWatch logs for ECS task.

        Parameters:
        - task_arn: Task ARN
        - container_name: Container name filter
        - start_time: Log start time (Unix timestamp)
        - end_time: Log end time (Unix timestamp)
        - next_token: Pagination token

        Returns:
        Task logs
        """

    def wait_until_task_stopped(self, cluster: str, tasks: list, max_attempts: int = 100, delay: int = 15) -> None:
        """
        Wait until ECS tasks are stopped.

        Parameters:
        - cluster: ECS cluster name or ARN
        - tasks: List of task ARNs
        - max_attempts: Maximum wait attempts
        - delay: Delay between attempts in seconds
        """

    def wait_until_task_running(self, cluster: str, tasks: list, max_attempts: int = 100, delay: int = 15) -> None:
        """
        Wait until ECS tasks are running.

        Parameters:
        - cluster: ECS cluster name or ARN
        - tasks: List of task ARNs
        - max_attempts: Maximum wait attempts
        - delay: Delay between attempts in seconds
        """

ECS Operators

Task implementations for ECS container operations.

class EcsRunTaskOperator(BaseOperator):
    """Airflow operator that launches a single ECS task run."""

    # NOTE(review): ``tags`` is typed ``dict`` here but ``EcsHook.run_task``
    # takes ``tags: list`` — one of the two signatures is likely wrong; verify
    # against the installed provider version.
    def __init__(self, task_definition: str, cluster: str, overrides: dict | None = None, aws_conn_id: str = 'aws_default', region_name: str | None = None, launch_type: str = 'EC2', capacity_provider_strategy: list | None = None, platform_version: str | None = None, group: str | None = None, placement_constraints: list | None = None, placement_strategy: list | None = None, network_configuration: dict | None = None, tags: dict | None = None, awslogs_group: str | None = None, awslogs_region: str | None = None, awslogs_stream_prefix: str | None = None, reattach: bool = False, number_logs_exception: int = 10, **kwargs):
        """
        Run ECS task.

        Parameters:
        - task_definition: Task definition ARN or family:revision
        - cluster: ECS cluster name or ARN
        - overrides: Task definition overrides
        - aws_conn_id: AWS connection ID
        - region_name: AWS region name
        - launch_type: Launch type ('EC2', 'FARGATE')
        - capacity_provider_strategy: Capacity provider strategy
        - platform_version: Fargate platform version
        - group: Task group
        - placement_constraints: Task placement constraints
        - placement_strategy: Task placement strategy
        - network_configuration: Network configuration
        - tags: Task tags
        - awslogs_group: CloudWatch log group
        - awslogs_region: CloudWatch log region
        - awslogs_stream_prefix: Log stream prefix
        - reattach: Reattach to existing task
        - number_logs_exception: Number of log lines for exceptions
        """

class EcsOperator(BaseOperator):
    """General ECS task operator.

    NOTE(review): appears to be a reduced-parameter variant of
    ``EcsRunTaskOperator`` (presumably the legacy name) — confirm against the
    provider changelog before recommending one over the other.
    """

    def __init__(self, task_definition: str, cluster: str, overrides: dict | None = None, aws_conn_id: str = 'aws_default', region_name: str | None = None, launch_type: str = 'EC2', **kwargs):
        """
        General ECS task operator.

        Parameters:
        - task_definition: Task definition ARN or family:revision
        - cluster: ECS cluster name or ARN
        - overrides: Task definition overrides
        - aws_conn_id: AWS connection ID
        - region_name: AWS region name
        - launch_type: Launch type ('EC2', 'FARGATE')
        """

ECS Sensors

Monitoring tasks for ECS task states and service health.

class EcsTaskSensor(BaseSensorOperator):
    """Sensor that waits for an ECS task to reach a terminal state."""

    # NOTE(review): naming the ECS task parameter ``task_id`` collides with
    # BaseOperator's own ``task_id`` (the Airflow DAG task id), so both cannot
    # be passed together — the released provider exposes the ECS task under a
    # different name; verify before use.
    def __init__(self, task_id: str, cluster_name: str, aws_conn_id: str = 'aws_default', region_name: str | None = None, **kwargs):
        """
        Wait for ECS task completion.

        Parameters:
        - task_id: ECS task ID or ARN
        - cluster_name: ECS cluster name
        - aws_conn_id: AWS connection ID
        - region_name: AWS region name
        """

ECS Triggers

Asynchronous triggers for ECS task monitoring.

class EcsTaskTrigger(BaseTrigger):
    """Deferrable trigger that polls an ECS task until it reaches a target state."""

    def __init__(self, cluster: str, task_arn: str, target_state: str = 'STOPPED', aws_conn_id: str = 'aws_default', poll_interval: int = 60, **kwargs):
        """
        Asynchronous trigger for ECS task state monitoring.

        Parameters:
        - cluster: ECS cluster name or ARN
        - task_arn: Task ARN to monitor
        - target_state: Target task state (defaults to 'STOPPED', i.e. completion)
        - aws_conn_id: AWS connection ID
        - poll_interval: Polling interval in seconds
        """

Usage Examples

Container Task Execution

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator

# FIX: ``datetime`` was used below without being imported.
dag = DAG('ecs_batch_job', start_date=datetime(2023, 1, 1))

# Task definition overrides: inject the execution date ({{ ds }}) into the
# container environment and command so each DAG run processes its own paths.
task_overrides = {
    'containerOverrides': [
        {
            'name': 'data-processor',
            'environment': [
                {'name': 'INPUT_PATH', 'value': 's3://data-bucket/input/{{ ds }}/'},
                {'name': 'OUTPUT_PATH', 'value': 's3://data-bucket/output/{{ ds }}/'},
                {'name': 'PROCESSING_DATE', 'value': '{{ ds }}'}
            ],
            'cpu': 2048,
            'memory': 4096,
            'command': [
                'python', 'process_data.py',
                '--date', '{{ ds }}',
                '--input-path', 's3://data-bucket/input/{{ ds }}/',
                '--output-path', 's3://data-bucket/output/{{ ds }}/'
            ]
        }
    ]
}

# Run data processing task on Fargate with awsvpc networking and CloudWatch logs.
run_processor = EcsRunTaskOperator(
    task_id='run_data_processor',
    task_definition='data-processing-task:1',
    cluster='data-processing-cluster',
    launch_type='FARGATE',
    overrides=task_overrides,
    network_configuration={
        'awsvpcConfiguration': {
            'subnets': ['subnet-12345678', 'subnet-87654321'],
            'securityGroups': ['sg-abcdef123'],
            'assignPublicIp': 'ENABLED'
        }
    },
    platform_version='1.4.0',
    awslogs_group='/ecs/data-processing',
    awslogs_region='us-east-1',
    awslogs_stream_prefix='data-processor',
    tags={
        'Environment': 'production',
        'Project': 'data-pipeline',
        'Date': '{{ ds }}'
    },
    dag=dag
)

Multi-Container Batch Processing

# Parallel processing with multiple containers

def _partition_overrides(partition):
    """Build the container-override payload for one partition of the batch."""
    return {
        'containerOverrides': [
            {
                'name': 'batch-processor',
                'environment': [
                    {'name': 'PARTITION_ID', 'value': str(partition)},
                    {'name': 'TOTAL_PARTITIONS', 'value': '4'},
                    {'name': 'INPUT_PREFIX', 'value': f's3://data-bucket/partitioned/{partition}/'},
                    {'name': 'OUTPUT_PREFIX', 'value': f's3://results-bucket/partition-{partition}/'}
                ]
            }
        ]
    }

def _batch_network_configuration():
    """Private-subnet network settings shared by every batch task."""
    return {
        'awsvpcConfiguration': {
            'subnets': ['subnet-12345678'],
            'securityGroups': ['sg-batch123'],
            'assignPublicIp': 'DISABLED'
        }
    }

# Fan out: one ECS task per data partition.
parallel_processors = [
    EcsRunTaskOperator(
        task_id=f'process_partition_{partition}',
        task_definition='batch-processing-task:2',
        cluster='batch-cluster',
        launch_type='FARGATE',
        overrides=_partition_overrides(partition),
        network_configuration=_batch_network_configuration(),
        dag=dag
    )
    for partition in range(4)
]

# Fan in: aggregate the partition outputs into a single result file.
aggregate_results = EcsRunTaskOperator(
    task_id='aggregate_results',
    task_definition='result-aggregator:1',
    cluster='batch-cluster',
    launch_type='FARGATE',
    overrides={
        'containerOverrides': [
            {
                'name': 'aggregator',
                'environment': [
                    {'name': 'INPUT_PARTITIONS', 'value': '4'},
                    {'name': 'INPUT_PREFIX', 'value': 's3://results-bucket/'},
                    {'name': 'FINAL_OUTPUT', 'value': 's3://final-results/{{ ds }}/aggregated.json'}
                ]
            }
        ]
    },
    network_configuration=_batch_network_configuration(),
    dag=dag
)

# Dependencies: all parallel processors must complete before aggregation
parallel_processors >> aggregate_results

ML Model Training with ECS

# ML training task with GPU support
training_overrides = {
    'containerOverrides': [
        {
            'name': 'ml-trainer',
            'environment': [
                {'name': 'TRAINING_DATA', 'value': 's3://ml-data/training/{{ ds }}/'},
                {'name': 'MODEL_OUTPUT', 'value': 's3://ml-models/trained/{{ ds }}/'},
                {'name': 'EPOCHS', 'value': '100'},
                {'name': 'BATCH_SIZE', 'value': '32'},
                {'name': 'LEARNING_RATE', 'value': '0.001'}
            ],
            'cpu': 4096,
            'memory': 16384,
            # FIX: ECS container overrides have no 'gpu' key; GPUs are requested
            # via resourceRequirements (consistent with the ContainerOverride
            # type documented in this file and the AWS RunTask API).
            'resourceRequirements': [{'type': 'GPU', 'value': '1'}]
        }
    ]
}

train_model = EcsRunTaskOperator(
    task_id='train_ml_model',
    task_definition='ml-training-gpu:3',
    cluster='ml-training-cluster',
    launch_type='EC2',  # Use EC2 for GPU instances
    overrides=training_overrides,
    placement_constraints=[
        {
            'type': 'memberOf',
            'expression': 'attribute:ecs.instance-type =~ p3.*'  # GPU instances
        }
    ],
    awslogs_group='/ecs/ml-training',
    dag=dag
)

Service Health Monitoring

from airflow.providers.amazon.aws.sensors.ecs import EcsTaskSensor

# Wait for long-running task completion.
# FIX: the original passed ``task_id`` twice — once as the Airflow DAG task id
# and once as the ECS task ARN — which is a SyntaxError (duplicate keyword
# argument). The ECS task reference is passed under a separate name here; in
# the released provider the sensor exposes it as ``task`` — confirm against
# the installed provider version.
wait_for_training = EcsTaskSensor(
    task_id='wait_for_model_training',  # Airflow DAG task id
    task='{{ task_instance.xcom_pull(task_ids="train_ml_model") }}',  # ECS task ARN from upstream run
    cluster_name='ml-training-cluster',
    timeout=7200,  # 2 hours
    poke_interval=300,  # Check every 5 minutes
    dag=dag
)

train_model >> wait_for_training

Types

# ECS task states
class EcsTaskState:
    """String constants for ECS task lifecycle states."""
    PROVISIONING = 'PROVISIONING'
    PENDING = 'PENDING'
    ACTIVATING = 'ACTIVATING'
    RUNNING = 'RUNNING'
    DEACTIVATING = 'DEACTIVATING'
    STOPPING = 'STOPPING'
    DEPROVISIONING = 'DEPROVISIONING'
    STOPPED = 'STOPPED'

# Launch types
class EcsLaunchType:
    """String constants for ECS launch types."""
    EC2 = 'EC2'
    FARGATE = 'FARGATE'
    EXTERNAL = 'EXTERNAL'

# Network modes
class EcsNetworkMode:
    """String constants for ECS task-definition network modes."""
    BRIDGE = 'bridge'
    HOST = 'host'
    AWSVPC = 'awsvpc'
    NONE = 'none'

# Task definition configuration
class TaskDefinitionConfig:
    """Shape of an ECS task-definition registration payload."""
    family: str
    task_role_arn: str | None = None
    execution_role_arn: str | None = None
    network_mode: str = 'bridge'
    # Required: list of ContainerDefinition-shaped entries.
    container_definitions: list
    volumes: list | None = None
    # NOTE(review): the AWS API field is ``requiresCompatibilities`` (plural);
    # this singular name may be a transcription error — confirm.
    requires_compatibility: list | None = None
    cpu: str | None = None
    memory: str | None = None
    tags: list | None = None
    pid_mode: str | None = None
    ipc_mode: str | None = None
    proxy_configuration: dict | None = None
    inference_accelerators: list | None = None
    ephemeral_storage: dict | None = None
    runtime_platform: dict | None = None

# Container definition
class ContainerDefinition:
    """Shape of one container entry inside a task definition."""
    name: str
    image: str
    cpu: int = 0
    memory: int | None = None
    memory_reservation: int | None = None
    links: list | None = None
    port_mappings: list | None = None
    essential: bool = True
    entry_point: list | None = None
    command: list | None = None
    environment: list | None = None
    environment_files: list | None = None
    mount_points: list | None = None
    volumes_from: list | None = None
    linux_parameters: dict | None = None
    secrets: list | None = None
    depends_on: list | None = None
    start_timeout: int | None = None
    stop_timeout: int | None = None
    hostname: str | None = None
    user: str | None = None
    working_directory: str | None = None
    disable_networking: bool | None = None
    privileged: bool | None = None
    readonly_root_filesystem: bool | None = None
    dns_servers: list | None = None
    dns_search_domains: list | None = None
    extra_hosts: list | None = None
    docker_security_options: list | None = None
    interactive: bool | None = None
    pseudo_terminal: bool | None = None
    docker_labels: dict | None = None
    ulimits: list | None = None
    log_configuration: dict | None = None
    health_check: dict | None = None
    system_controls: list | None = None
    resource_requirements: list | None = None
    firelens_configuration: dict | None = None

# Network configuration
class NetworkConfiguration:
    """Wrapper holding the awsvpc network settings for a task."""
    awsvpc_configuration: dict

class AwsVpcConfiguration:
    """VPC networking settings used with the awsvpc network mode."""
    subnets: list
    security_groups: list | None = None
    assign_public_ip: str = 'DISABLED'  # 'ENABLED' or 'DISABLED'

# Placement constraint
class PlacementConstraint:
    """Constraint limiting which container instances may run the task."""
    type: str  # 'distinctInstance' or 'memberOf'
    expression: str | None = None  # cluster-query expression; used with 'memberOf'

# Placement strategy
class PlacementStrategy:
    """Strategy for distributing tasks across container instances."""
    type: str  # 'random', 'spread', 'binpack'
    field: str | None = None  # attribute the strategy applies to (e.g. instance attribute, cpu, memory)

# Task overrides
class TaskOverride:
    """Per-run overrides applied on top of a registered task definition."""
    container_overrides: list = None
    cpu: str | None = None
    inference_accelerator_overrides: list | None = None
    execution_role_arn: str | None = None
    memory: str | None = None
    task_role_arn: str | None = None
    ephemeral_storage: dict | None = None

# Container override
class ContainerOverride:
    """Per-run overrides for a single container within a task."""
    name: str | None = None
    command: list | None = None
    environment: list | None = None
    environment_files: list | None = None
    cpu: int | None = None
    memory: int | None = None
    memory_reservation: int | None = None
    resource_requirements: list | None = None  # e.g. [{'type': 'GPU', 'value': '1'}]

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-amazon

docs

athena-analytics.md

authentication.md

batch-processing.md

data-transfers.md

dms-migration.md

dynamodb-nosql.md

ecs-containers.md

eks-kubernetes.md

emr-clusters.md

glue-processing.md

index.md

lambda-functions.md

messaging-sns-sqs.md

rds-databases.md

redshift-warehouse.md

s3-storage.md

sagemaker-ml.md

tile.json