Apache Airflow provider package that provides comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines
—
Quality: pending review — best-practices conformance has not yet been assessed.
Impact: pending — no eval scenarios have been run.
Amazon ECS (Elastic Container Service) integration for running containerized applications and tasks. Provides task execution, service management, and cluster operations for both EC2 and Fargate launch types.
Core ECS client providing container orchestration and task management functionality.
class EcsHook(AwsBaseHook):
    """Core ECS client providing container orchestration and task management.

    Wraps the AWS ECS API for running and inspecting tasks, services and
    clusters on both EC2 and Fargate launch types.
    """

    def __init__(self, aws_conn_id: str = 'aws_default', region_name: str = None, **kwargs):
        """Initialize the ECS hook.

        :param aws_conn_id: AWS connection ID.
        :param region_name: AWS region name; ``None`` falls back to the
            connection's configured default.
        """

    def run_task(self, task_definition: str, cluster: str, overrides: dict = None,
                 count: int = 1, started_by: str = None, group: str = None,
                 placement_constraints: list = None, placement_strategy: list = None,
                 platform_version: str = None, network_configuration: dict = None,
                 tags: list = None, enable_execute_command: bool = False,
                 propagate_tags: str = None, reference_id: str = None, **kwargs) -> str:
        """Run an ECS task.

        :param task_definition: Task definition ARN or ``family:revision``.
        :param cluster: ECS cluster name or ARN.
        :param overrides: Task definition overrides.
        :param count: Number of tasks to run.
        :param started_by: Optional started-by tag.
        :param group: Task group.
        :param placement_constraints: Task placement constraints.
        :param placement_strategy: Task placement strategy.
        :param platform_version: Fargate platform version.
        :param network_configuration: Network configuration for awsvpc mode.
        :param tags: Task tags.
        :param enable_execute_command: Enable ECS Exec.
        :param propagate_tags: Tag propagation ('TASK_DEFINITION', 'SERVICE', 'NONE').
        :param reference_id: Reference ID for the task.
        :return: Task ARN.
        """

    def describe_tasks(self, cluster: str, tasks: list, include: list = None) -> dict:
        """Get ECS task details.

        :param cluster: ECS cluster name or ARN.
        :param tasks: List of task ARNs or IDs.
        :param include: Additional task information to include.
        :return: Task descriptions.
        """

    def describe_task_definition(self, task_definition: str, include: list = None) -> dict:
        """Get task definition details.

        :param task_definition: Task definition ARN or ``family:revision``.
        :param include: Additional information to include.
        :return: Task definition description.
        """

    def list_tasks(self, cluster: str = None, container_instance: str = None,
                   family: str = None, started_by: str = None, service_name: str = None,
                   desired_status: str = None, launch_type: str = None) -> list:
        """List ECS tasks matching the given filters.

        :param cluster: ECS cluster name or ARN.
        :param container_instance: Container instance ARN or ID.
        :param family: Task definition family.
        :param started_by: Started-by filter.
        :param service_name: Service name filter.
        :param desired_status: Task status filter.
        :param launch_type: Launch type filter ('EC2', 'FARGATE').
        :return: List of task ARNs.
        """

    def stop_task(self, cluster: str, task: str, reason: str = None) -> dict:
        """Stop a running ECS task.

        :param cluster: ECS cluster name or ARN.
        :param task: Task ARN or ID.
        :param reason: Reason for stopping the task.
        :return: Stop-task response.
        """

    def describe_clusters(self, clusters: list = None, include: list = None) -> dict:
        """Describe ECS clusters.

        :param clusters: List of cluster names or ARNs.
        :param include: Additional cluster information to include.
        :return: Cluster descriptions.
        """

    def describe_services(self, cluster: str, services: list, include: list = None) -> dict:
        """Describe ECS services.

        :param cluster: ECS cluster name or ARN.
        :param services: List of service names or ARNs.
        :param include: Additional service information to include.
        :return: Service descriptions.
        """

    def get_task_logs(self, task_arn: str, container_name: str = None,
                      start_time: int = None, end_time: int = None,
                      next_token: str = None) -> dict:
        """Get CloudWatch logs for an ECS task.

        :param task_arn: Task ARN.
        :param container_name: Container name filter.
        :param start_time: Log start time (Unix timestamp).
        :param end_time: Log end time (Unix timestamp).
        :param next_token: Pagination token.
        :return: Task logs.
        """

    def wait_until_task_stopped(self, cluster: str, tasks: list,
                                max_attempts: int = 100, delay: int = 15) -> None:
        """Block until the given ECS tasks reach the STOPPED state.

        :param cluster: ECS cluster name or ARN.
        :param tasks: List of task ARNs.
        :param max_attempts: Maximum wait attempts.
        :param delay: Delay between attempts in seconds.
        """

    def wait_until_task_running(self, cluster: str, tasks: list,
                                max_attempts: int = 100, delay: int = 15) -> None:
        """Block until the given ECS tasks reach the RUNNING state.

        :param cluster: ECS cluster name or ARN.
        :param tasks: List of task ARNs.
        :param max_attempts: Maximum wait attempts.
        :param delay: Delay between attempts in seconds.
        """

# Task implementations for ECS container operations.
class EcsRunTaskOperator(BaseOperator):
    """Run an ECS task and (optionally) stream its CloudWatch logs."""

    def __init__(self, task_definition: str, cluster: str, overrides: dict = None,
                 aws_conn_id: str = 'aws_default', region_name: str = None,
                 launch_type: str = 'EC2', capacity_provider_strategy: list = None,
                 platform_version: str = None, group: str = None,
                 placement_constraints: list = None, placement_strategy: list = None,
                 network_configuration: dict = None, tags: dict = None,
                 awslogs_group: str = None, awslogs_region: str = None,
                 awslogs_stream_prefix: str = None, reattach: bool = False,
                 number_logs_exception: int = 10, **kwargs):
        """Run an ECS task.

        :param task_definition: Task definition ARN or ``family:revision``.
        :param cluster: ECS cluster name or ARN.
        :param overrides: Task definition overrides.
        :param aws_conn_id: AWS connection ID.
        :param region_name: AWS region name.
        :param launch_type: Launch type ('EC2', 'FARGATE').
        :param capacity_provider_strategy: Capacity provider strategy.
        :param platform_version: Fargate platform version.
        :param group: Task group.
        :param placement_constraints: Task placement constraints.
        :param placement_strategy: Task placement strategy.
        :param network_configuration: Network configuration.
        :param tags: Task tags.
        :param awslogs_group: CloudWatch log group.
        :param awslogs_region: CloudWatch log region.
        :param awslogs_stream_prefix: Log stream prefix.
        :param reattach: Reattach to an existing task instead of starting a new one.
        :param number_logs_exception: Number of log lines included in exception messages.
        """
class EcsOperator(BaseOperator):
    """General-purpose ECS task operator."""

    def __init__(self, task_definition: str, cluster: str, overrides: dict = None,
                 aws_conn_id: str = 'aws_default', region_name: str = None,
                 launch_type: str = 'EC2', **kwargs):
        """Run an ECS task.

        :param task_definition: Task definition ARN or ``family:revision``.
        :param cluster: ECS cluster name or ARN.
        :param overrides: Task definition overrides.
        :param aws_conn_id: AWS connection ID.
        :param region_name: AWS region name.
        :param launch_type: Launch type ('EC2', 'FARGATE').
        """

# Monitoring tasks for ECS task states and service health.
class EcsTaskSensor(BaseSensorOperator):
    """Sensor that waits for an ECS task to complete."""

    def __init__(self, task_id: str, cluster_name: str,
                 aws_conn_id: str = 'aws_default', region_name: str = None, **kwargs):
        """Wait for ECS task completion.

        NOTE(review): `task_id` here denotes the monitored ECS task, but it
        collides with Airflow's own BaseOperator `task_id` (the DAG node id),
        so both cannot be passed by keyword — the real provider exposes the
        monitored task under a different parameter name; confirm against the
        installed provider version.

        :param task_id: ECS task ID or ARN to monitor.
        :param cluster_name: ECS cluster name.
        :param aws_conn_id: AWS connection ID.
        :param region_name: AWS region name.
        """

# Asynchronous triggers for ECS task monitoring.
class EcsTaskTrigger(BaseTrigger):
    """Asynchronous (deferrable) trigger for ECS task state monitoring."""

    def __init__(self, cluster: str, task_arn: str, target_state: str = 'STOPPED',
                 aws_conn_id: str = 'aws_default', poll_interval: int = 60, **kwargs):
        """Monitor an ECS task until it reaches the target state.

        :param cluster: ECS cluster name or ARN.
        :param task_arn: Task ARN to monitor.
        :param target_state: Target task state (defaults to 'STOPPED').
        :param aws_conn_id: AWS connection ID.
        :param poll_interval: Polling interval in seconds.
        """

from airflow import DAG
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
# fix: `datetime` was used below without ever being imported (NameError)
from datetime import datetime

dag = DAG('ecs_batch_job', start_date=datetime(2023, 1, 1))

# Task definition overrides: the {{ ds }} placeholders are Airflow templates
# rendered to the logical date at run time.
task_overrides = {
    'containerOverrides': [
        {
            'name': 'data-processor',
            'environment': [
                {'name': 'INPUT_PATH', 'value': 's3://data-bucket/input/{{ ds }}/'},
                {'name': 'OUTPUT_PATH', 'value': 's3://data-bucket/output/{{ ds }}/'},
                {'name': 'PROCESSING_DATE', 'value': '{{ ds }}'}
            ],
            'cpu': 2048,
            'memory': 4096,
            'command': [
                'python', 'process_data.py',
                '--date', '{{ ds }}',
                '--input-path', 's3://data-bucket/input/{{ ds }}/',
                '--output-path', 's3://data-bucket/output/{{ ds }}/'
            ]
        }
    ]
}

# Run data processing task on Fargate with awsvpc networking and
# CloudWatch log streaming.
run_processor = EcsRunTaskOperator(
    task_id='run_data_processor',
    task_definition='data-processing-task:1',
    cluster='data-processing-cluster',
    launch_type='FARGATE',
    overrides=task_overrides,
    network_configuration={
        'awsvpcConfiguration': {
            'subnets': ['subnet-12345678', 'subnet-87654321'],
            'securityGroups': ['sg-abcdef123'],
            'assignPublicIp': 'ENABLED'
        }
    },
    platform_version='1.4.0',
    awslogs_group='/ecs/data-processing',
    awslogs_region='us-east-1',
    awslogs_stream_prefix='data-processor',
    tags={
        'Environment': 'production',
        'Project': 'data-pipeline',
        'Date': '{{ ds }}'
    },
    dag=dag
)

# Parallel processing with multiple containers
parallel_processors = []
for partition in range(4):
    # fix: renamed the loop-local overrides dict so it no longer shadows the
    # module-level `task_overrides` defined for the single-processor example
    partition_overrides = {
        'containerOverrides': [
            {
                'name': 'batch-processor',
                'environment': [
                    {'name': 'PARTITION_ID', 'value': str(partition)},
                    {'name': 'TOTAL_PARTITIONS', 'value': '4'},
                    {'name': 'INPUT_PREFIX', 'value': f's3://data-bucket/partitioned/{partition}/'},
                    {'name': 'OUTPUT_PREFIX', 'value': f's3://results-bucket/partition-{partition}/'}
                ]
            }
        ]
    }
    processor = EcsRunTaskOperator(
        task_id=f'process_partition_{partition}',
        task_definition='batch-processing-task:2',
        cluster='batch-cluster',
        launch_type='FARGATE',
        overrides=partition_overrides,
        network_configuration={
            'awsvpcConfiguration': {
                'subnets': ['subnet-12345678'],
                'securityGroups': ['sg-batch123'],
                'assignPublicIp': 'DISABLED'
            }
        },
        dag=dag
    )
    parallel_processors.append(processor)

# Aggregation task after parallel processing
aggregate_overrides = {
    'containerOverrides': [
        {
            'name': 'aggregator',
            'environment': [
                {'name': 'INPUT_PARTITIONS', 'value': '4'},
                {'name': 'INPUT_PREFIX', 'value': 's3://results-bucket/'},
                {'name': 'FINAL_OUTPUT', 'value': 's3://final-results/{{ ds }}/aggregated.json'}
            ]
        }
    ]
}

aggregate_results = EcsRunTaskOperator(
    task_id='aggregate_results',
    task_definition='result-aggregator:1',
    cluster='batch-cluster',
    launch_type='FARGATE',
    overrides=aggregate_overrides,
    network_configuration={
        'awsvpcConfiguration': {
            'subnets': ['subnet-12345678'],
            'securityGroups': ['sg-batch123'],
            'assignPublicIp': 'DISABLED'
        }
    },
    dag=dag
)

# Dependencies: all parallel processors must complete before aggregation
# (list >> task fan-in is supported by Airflow's dependency operators)
parallel_processors >> aggregate_results

# ML training task with GPU support
training_overrides = {
    'containerOverrides': [
        {
            'name': 'ml-trainer',
            'environment': [
                {'name': 'TRAINING_DATA', 'value': 's3://ml-data/training/{{ ds }}/'},
                {'name': 'MODEL_OUTPUT', 'value': 's3://ml-models/trained/{{ ds }}/'},
                {'name': 'EPOCHS', 'value': '100'},
                {'name': 'BATCH_SIZE', 'value': '32'},
                {'name': 'LEARNING_RATE', 'value': '0.001'}
            ],
            'cpu': 4096,
            'memory': 16384,
            'gpu': 1
        }
    ]
}

train_model = EcsRunTaskOperator(
    task_id='train_ml_model',
    task_definition='ml-training-gpu:3',
    cluster='ml-training-cluster',
    launch_type='EC2',  # Use EC2 for GPU instances
    overrides=training_overrides,
    placement_constraints=[
        {
            'type': 'memberOf',
            'expression': 'attribute:ecs.instance-type =~ p3.*'  # GPU instances
        }
    ],
    awslogs_group='/ecs/ml-training',
    dag=dag
)

from airflow.providers.amazon.aws.sensors.ecs import EcsTaskSensor
# Wait for long-running task completion
wait_for_training = EcsTaskSensor(
    task_id='wait_for_model_training',
    # fix: the original passed `task_id` twice (keyword argument repeated is a
    # SyntaxError). The monitored ECS task ARN goes in the sensor's `task`
    # parameter per the Airflow ECS sensor API — confirm against the installed
    # provider version.
    task='{{ task_instance.xcom_pull(task_ids="train_ml_model") }}',
    cluster_name='ml-training-cluster',
    timeout=7200,  # 2 hours
    poke_interval=300,  # Check every 5 minutes
    dag=dag
)

train_model >> wait_for_training

# ECS task states
class EcsTaskState:
    """Lifecycle states ECS reports for a task, listed in rough chronological order."""

    PROVISIONING = 'PROVISIONING'      # resources being provisioned before launch
    PENDING = 'PENDING'                # task accepted, not yet started
    ACTIVATING = 'ACTIVATING'          # containers starting up
    RUNNING = 'RUNNING'                # task is running
    DEACTIVATING = 'DEACTIVATING'      # task being removed from service
    STOPPING = 'STOPPING'              # containers shutting down
    DEPROVISIONING = 'DEPROVISIONING'  # resources being released
    STOPPED = 'STOPPED'                # terminal state
# Launch types
class EcsLaunchType:
    """Compute launch types available for ECS tasks and services."""

    EC2 = 'EC2'            # customer-managed EC2 container instances
    FARGATE = 'FARGATE'    # serverless compute
    EXTERNAL = 'EXTERNAL'  # on-premises instances (ECS Anywhere)
# Network modes
class EcsNetworkMode:
    """Docker networking modes usable in an ECS task definition."""

    BRIDGE = 'bridge'  # Docker's default bridge network
    HOST = 'host'      # containers share the host network stack
    AWSVPC = 'awsvpc'  # task gets its own ENI (required for Fargate)
    NONE = 'none'      # no external networking
# Task definition configuration
class TaskDefinitionConfig:
    """Field reference for an ECS task definition (shape of RegisterTaskDefinition input)."""

    family: str                          # task definition family name (required)
    task_role_arn: str = None            # IAM role assumed by the task
    execution_role_arn: str = None       # IAM role used to pull images / write logs
    network_mode: str = 'bridge'         # one of EcsNetworkMode values
    container_definitions: list          # list of container definitions (required)
    volumes: list = None
    requires_compatibility: list = None  # e.g. ['EC2'], ['FARGATE']
    cpu: str = None                      # task-level CPU units (string form)
    memory: str = None                   # task-level memory (string form)
    tags: list = None
    pid_mode: str = None
    ipc_mode: str = None
    proxy_configuration: dict = None
    inference_accelerators: list = None
    ephemeral_storage: dict = None
    runtime_platform: dict = None
# Container definition
class ContainerDefinition:
    """Field reference for a single container inside an ECS task definition."""

    # Identity and image
    name: str   # container name (required)
    image: str  # container image URI (required)

    # Resource allocation
    cpu: int = 0                    # CPU units reserved for this container
    memory: int = None              # hard memory limit (MiB)
    memory_reservation: int = None  # soft memory limit (MiB)

    # Networking and linking
    links: list = None
    port_mappings: list = None

    # Startup behavior
    essential: bool = True  # task stops if an essential container stops
    entry_point: list = None
    command: list = None
    environment: list = None
    environment_files: list = None

    # Storage
    mount_points: list = None
    volumes_from: list = None

    # Runtime configuration
    linux_parameters: dict = None
    secrets: list = None
    depends_on: list = None
    start_timeout: int = None
    stop_timeout: int = None
    hostname: str = None
    user: str = None
    working_directory: str = None
    disable_networking: bool = None
    privileged: bool = None
    readonly_root_filesystem: bool = None

    # DNS and hosts
    dns_servers: list = None
    dns_search_domains: list = None
    extra_hosts: list = None

    # Security and interaction
    docker_security_options: list = None
    interactive: bool = None
    pseudo_terminal: bool = None
    docker_labels: dict = None
    ulimits: list = None

    # Observability
    log_configuration: dict = None
    health_check: dict = None
    system_controls: list = None
    resource_requirements: list = None
    firelens_configuration: dict = None
# Network configuration
class NetworkConfiguration:
    """Field reference for task network configuration (awsvpc mode)."""

    awsvpc_configuration: dict  # VPC subnets / security groups / public IP settings
class AwsVpcConfiguration:
    """Field reference for the awsvpcConfiguration block of a network configuration."""

    subnets: list                       # VPC subnet IDs (required)
    security_groups: list = None        # security group IDs
    assign_public_ip: str = 'DISABLED'  # 'ENABLED' or 'DISABLED'
# Placement constraint
class PlacementConstraint:
    """Field reference for a task placement constraint."""

    type: str              # 'distinctInstance' or 'memberOf'
    expression: str = None  # cluster query expression (for 'memberOf')
# Placement strategy
class PlacementStrategy:
    """Field reference for a task placement strategy."""

    type: str         # 'random', 'spread', 'binpack'
    field: str = None  # attribute to spread/binpack on
# Task overrides
class TaskOverride:
    """Field reference for task-level overrides passed to RunTask."""

    container_overrides: list = None  # per-container overrides (see ContainerOverride)
    cpu: str = None
    inference_accelerator_overrides: list = None
    execution_role_arn: str = None
    memory: str = None
    task_role_arn: str = None
    ephemeral_storage: dict = None
# Container override
class ContainerOverride:
    """Field reference for per-container overrides within a task override."""

    name: str = None  # name of the container to override
    command: list = None
    environment: list = None
    environment_files: list = None
    cpu: int = None
    memory: int = None
    memory_reservation: int = None
    # fix: the original value was garbled extraction residue
    # ("NoneInstall with Tessl CLI") — the default is None
    resource_requirements: list = None

# Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-amazon