0
# ECS Container Orchestration
1
2
Amazon ECS (Elastic Container Service) integration for running containerized applications and tasks. Provides task execution, service management, and cluster operations for both EC2 and Fargate launch types.
3
4
## Capabilities
5
6
### ECS Hook
7
8
Core ECS client providing container orchestration and task management functionality.
9
10
```python { .api }
11
class EcsHook(AwsBaseHook):
12
def __init__(self, aws_conn_id: str = 'aws_default', region_name: str = None, **kwargs):
13
"""
14
Initialize ECS Hook.
15
16
Parameters:
17
- aws_conn_id: AWS connection ID
18
- region_name: AWS region name
19
"""
20
21
def run_task(self, task_definition: str, cluster: str, overrides: dict = None, count: int = 1, started_by: str = None, group: str = None, placement_constraints: list = None, placement_strategy: list = None, platform_version: str = None, network_configuration: dict = None, tags: list = None, enable_execute_command: bool = False, propagate_tags: str = None, reference_id: str = None, **kwargs) -> str:
22
"""
23
Run ECS task.
24
25
Parameters:
26
- task_definition: Task definition ARN or family:revision
27
- cluster: ECS cluster name or ARN
28
- overrides: Task definition overrides
29
- count: Number of tasks to run
30
- started_by: Optional started_by tag
31
- group: Task group
32
- placement_constraints: Task placement constraints
33
- placement_strategy: Task placement strategy
34
- platform_version: Fargate platform version
35
- network_configuration: Network configuration for awsvpc mode
36
- tags: Task tags
37
- enable_execute_command: Enable ECS Exec
38
- propagate_tags: Tag propagation ('TASK_DEFINITION', 'SERVICE', 'NONE')
39
- reference_id: Reference ID for task
40
41
Returns:
42
Task ARN
43
"""
44
45
def describe_tasks(self, cluster: str, tasks: list, include: list = None) -> dict:
46
"""
47
Get ECS task details.
48
49
Parameters:
50
- cluster: ECS cluster name or ARN
51
- tasks: List of task ARNs or IDs
52
- include: Additional task information to include
53
54
Returns:
55
Task descriptions
56
"""
57
58
def describe_task_definition(self, task_definition: str, include: list = None) -> dict:
59
"""
60
Get task definition details.
61
62
Parameters:
63
- task_definition: Task definition ARN or family:revision
64
- include: Additional information to include
65
66
Returns:
67
Task definition description
68
"""
69
70
def list_tasks(self, cluster: str = None, container_instance: str = None, family: str = None, started_by: str = None, service_name: str = None, desired_status: str = None, launch_type: str = None) -> list:
71
"""
72
List ECS tasks.
73
74
Parameters:
75
- cluster: ECS cluster name or ARN
76
- container_instance: Container instance ARN or ID
77
- family: Task definition family
78
- started_by: Started by filter
79
- service_name: Service name filter
80
- desired_status: Task status filter
81
- launch_type: Launch type filter ('EC2', 'FARGATE')
82
83
Returns:
84
List of task ARNs
85
"""
86
87
def stop_task(self, cluster: str, task: str, reason: str = None) -> dict:
88
"""
89
Stop running ECS task.
90
91
Parameters:
92
- cluster: ECS cluster name or ARN
93
- task: Task ARN or ID
94
- reason: Reason for stopping task
95
96
Returns:
97
Stop task response
98
"""
99
100
def describe_clusters(self, clusters: list = None, include: list = None) -> dict:
101
"""
102
Describe ECS clusters.
103
104
Parameters:
105
- clusters: List of cluster names or ARNs
106
- include: Additional cluster information to include
107
108
Returns:
109
Cluster descriptions
110
"""
111
112
def describe_services(self, cluster: str, services: list, include: list = None) -> dict:
113
"""
114
Describe ECS services.
115
116
Parameters:
117
- cluster: ECS cluster name or ARN
118
- services: List of service names or ARNs
119
- include: Additional service information to include
120
121
Returns:
122
Service descriptions
123
"""
124
125
def get_task_logs(self, task_arn: str, container_name: str = None, start_time: int = None, end_time: int = None, next_token: str = None) -> dict:
126
"""
127
Get CloudWatch logs for ECS task.
128
129
Parameters:
130
- task_arn: Task ARN
131
- container_name: Container name filter
132
- start_time: Log start time (Unix timestamp)
133
- end_time: Log end time (Unix timestamp)
134
- next_token: Pagination token
135
136
Returns:
137
Task logs
138
"""
139
140
def wait_until_task_stopped(self, cluster: str, tasks: list, max_attempts: int = 100, delay: int = 15) -> None:
141
"""
142
Wait until ECS tasks are stopped.
143
144
Parameters:
145
- cluster: ECS cluster name or ARN
146
- tasks: List of task ARNs
147
- max_attempts: Maximum wait attempts
148
- delay: Delay between attempts in seconds
149
"""
150
151
def wait_until_task_running(self, cluster: str, tasks: list, max_attempts: int = 100, delay: int = 15) -> None:
152
"""
153
Wait until ECS tasks are running.
154
155
Parameters:
156
- cluster: ECS cluster name or ARN
157
- tasks: List of task ARNs
158
- max_attempts: Maximum wait attempts
159
- delay: Delay between attempts in seconds
160
"""
161
```
162
163
### ECS Operators
164
165
Task implementations for ECS container operations.
166
167
```python { .api }
168
class EcsRunTaskOperator(BaseOperator):
169
def __init__(self, task_definition: str, cluster: str, overrides: dict = None, aws_conn_id: str = 'aws_default', region_name: str = None, launch_type: str = 'EC2', capacity_provider_strategy: list = None, platform_version: str = None, group: str = None, placement_constraints: list = None, placement_strategy: list = None, network_configuration: dict = None, tags: dict = None, awslogs_group: str = None, awslogs_region: str = None, awslogs_stream_prefix: str = None, reattach: bool = False, number_logs_exception: int = 10, **kwargs):
170
"""
171
Run ECS task.
172
173
Parameters:
174
- task_definition: Task definition ARN or family:revision
175
- cluster: ECS cluster name or ARN
176
- overrides: Task definition overrides
177
- aws_conn_id: AWS connection ID
178
- region_name: AWS region name
179
- launch_type: Launch type ('EC2', 'FARGATE')
180
- capacity_provider_strategy: Capacity provider strategy
181
- platform_version: Fargate platform version
182
- group: Task group
183
- placement_constraints: Task placement constraints
184
- placement_strategy: Task placement strategy
185
- network_configuration: Network configuration
186
- tags: Task tags
187
- awslogs_group: CloudWatch log group
188
- awslogs_region: CloudWatch log region
189
- awslogs_stream_prefix: Log stream prefix
190
- reattach: Reattach to existing task
191
- number_logs_exception: Number of log lines for exceptions
192
"""
193
194
class EcsOperator(BaseOperator):
195
def __init__(self, task_definition: str, cluster: str, overrides: dict = None, aws_conn_id: str = 'aws_default', region_name: str = None, launch_type: str = 'EC2', **kwargs):
196
"""
197
General ECS task operator.
198
199
Parameters:
200
- task_definition: Task definition ARN or family:revision
201
- cluster: ECS cluster name or ARN
202
- overrides: Task definition overrides
203
- aws_conn_id: AWS connection ID
204
- region_name: AWS region name
205
- launch_type: Launch type ('EC2', 'FARGATE')
206
"""
207
```
208
209
### ECS Sensors
210
211
Monitoring tasks for ECS task states and service health.
212
213
```python { .api }
214
class EcsTaskSensor(BaseSensorOperator):
215
def __init__(self, task_id: str, cluster_name: str, aws_conn_id: str = 'aws_default', region_name: str = None, **kwargs):
216
"""
217
Wait for ECS task completion.
218
219
Parameters:
220
- task_id: ECS task ID or ARN
221
- cluster_name: ECS cluster name
222
- aws_conn_id: AWS connection ID
223
- region_name: AWS region name
224
"""
225
```
226
227
### ECS Triggers
228
229
Asynchronous triggers for ECS task monitoring.
230
231
```python { .api }
232
class EcsTaskTrigger(BaseTrigger):
233
def __init__(self, cluster: str, task_arn: str, target_state: str = 'STOPPED', aws_conn_id: str = 'aws_default', poll_interval: int = 60, **kwargs):
234
"""
235
Asynchronous trigger for ECS task state monitoring.
236
237
Parameters:
238
- cluster: ECS cluster name or ARN
239
- task_arn: Task ARN to monitor
240
- target_state: Target task state
241
- aws_conn_id: AWS connection ID
242
- poll_interval: Polling interval in seconds
243
"""
244
```
245
246
## Usage Examples
247
248
### Container Task Execution
249
250
```python
251
from datetime import datetime
from airflow import DAG
252
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
253
254
dag = DAG('ecs_batch_job', start_date=datetime(2023, 1, 1))
255
256
# Task definition overrides
257
task_overrides = {
258
'containerOverrides': [
259
{
260
'name': 'data-processor',
261
'environment': [
262
{'name': 'INPUT_PATH', 'value': 's3://data-bucket/input/{{ ds }}/'},
263
{'name': 'OUTPUT_PATH', 'value': 's3://data-bucket/output/{{ ds }}/'},
264
{'name': 'PROCESSING_DATE', 'value': '{{ ds }}'}
265
],
266
'cpu': 2048,
267
'memory': 4096,
268
'command': [
269
'python', 'process_data.py',
270
'--date', '{{ ds }}',
271
'--input-path', 's3://data-bucket/input/{{ ds }}/',
272
'--output-path', 's3://data-bucket/output/{{ ds }}/'
273
]
274
}
275
]
276
}
277
278
# Run data processing task
279
run_processor = EcsRunTaskOperator(
280
task_id='run_data_processor',
281
task_definition='data-processing-task:1',
282
cluster='data-processing-cluster',
283
launch_type='FARGATE',
284
overrides=task_overrides,
285
network_configuration={
286
'awsvpcConfiguration': {
287
'subnets': ['subnet-12345678', 'subnet-87654321'],
288
'securityGroups': ['sg-abcdef123'],
289
'assignPublicIp': 'ENABLED'
290
}
291
},
292
platform_version='1.4.0',
293
awslogs_group='/ecs/data-processing',
294
awslogs_region='us-east-1',
295
awslogs_stream_prefix='data-processor',
296
tags={
297
'Environment': 'production',
298
'Project': 'data-pipeline',
299
'Date': '{{ ds }}'
300
},
301
dag=dag
302
)
303
```
304
305
### Multi-Container Batch Processing
306
307
```python
308
# Parallel processing with multiple containers
309
parallel_processors = []
310
311
for partition in range(4):
312
task_overrides = {
313
'containerOverrides': [
314
{
315
'name': 'batch-processor',
316
'environment': [
317
{'name': 'PARTITION_ID', 'value': str(partition)},
318
{'name': 'TOTAL_PARTITIONS', 'value': '4'},
319
{'name': 'INPUT_PREFIX', 'value': f's3://data-bucket/partitioned/{partition}/'},
320
{'name': 'OUTPUT_PREFIX', 'value': f's3://results-bucket/partition-{partition}/'}
321
]
322
}
323
]
324
}
325
326
processor = EcsRunTaskOperator(
327
task_id=f'process_partition_{partition}',
328
task_definition='batch-processing-task:2',
329
cluster='batch-cluster',
330
launch_type='FARGATE',
331
overrides=task_overrides,
332
network_configuration={
333
'awsvpcConfiguration': {
334
'subnets': ['subnet-12345678'],
335
'securityGroups': ['sg-batch123'],
336
'assignPublicIp': 'DISABLED'
337
}
338
},
339
dag=dag
340
)
341
342
parallel_processors.append(processor)
343
344
# Aggregation task after parallel processing
345
aggregate_overrides = {
346
'containerOverrides': [
347
{
348
'name': 'aggregator',
349
'environment': [
350
{'name': 'INPUT_PARTITIONS', 'value': '4'},
351
{'name': 'INPUT_PREFIX', 'value': 's3://results-bucket/'},
352
{'name': 'FINAL_OUTPUT', 'value': 's3://final-results/{{ ds }}/aggregated.json'}
353
]
354
}
355
]
356
}
357
358
aggregate_results = EcsRunTaskOperator(
359
task_id='aggregate_results',
360
task_definition='result-aggregator:1',
361
cluster='batch-cluster',
362
launch_type='FARGATE',
363
overrides=aggregate_overrides,
364
network_configuration={
365
'awsvpcConfiguration': {
366
'subnets': ['subnet-12345678'],
367
'securityGroups': ['sg-batch123'],
368
'assignPublicIp': 'DISABLED'
369
}
370
},
371
dag=dag
372
)
373
374
# Dependencies: all parallel processors must complete before aggregation
375
parallel_processors >> aggregate_results
376
```
377
378
### ML Model Training with ECS
379
380
```python
381
# ML training task with GPU support
382
training_overrides = {
383
'containerOverrides': [
384
{
385
'name': 'ml-trainer',
386
'environment': [
387
{'name': 'TRAINING_DATA', 'value': 's3://ml-data/training/{{ ds }}/'},
388
{'name': 'MODEL_OUTPUT', 'value': 's3://ml-models/trained/{{ ds }}/'},
389
{'name': 'EPOCHS', 'value': '100'},
390
{'name': 'BATCH_SIZE', 'value': '32'},
391
{'name': 'LEARNING_RATE', 'value': '0.001'}
392
],
393
'cpu': 4096,
394
'memory': 16384,
395
'resourceRequirements': [{'type': 'GPU', 'value': '1'}]
396
}
397
]
398
}
399
400
train_model = EcsRunTaskOperator(
401
task_id='train_ml_model',
402
task_definition='ml-training-gpu:3',
403
cluster='ml-training-cluster',
404
launch_type='EC2', # Use EC2 for GPU instances
405
overrides=training_overrides,
406
placement_constraints=[
407
{
408
'type': 'memberOf',
409
'expression': 'attribute:ecs.instance-type =~ p3.*' # GPU instances
410
}
411
],
412
awslogs_group='/ecs/ml-training',
413
dag=dag
414
)
415
```
416
417
### Service Health Monitoring
418
419
```python
420
from airflow.providers.amazon.aws.sensors.ecs import EcsTaskSensor
421
422
# Wait for long-running task completion
423
wait_for_training = EcsTaskSensor(
424
task_id='wait_for_model_training',
425
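task_id='{{ task_instance.xcom_pull(task_ids="train_ml_model") }}',  # NOTE(review): `task_id` is passed twice in this call (a SyntaxError); the ECS task ID/ARN needs a distinct keyword - confirm the sensor's actual parameter name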
426
cluster_name='ml-training-cluster',
427
timeout=7200, # 2 hours
428
poke_interval=300, # Check every 5 minutes
429
dag=dag
430
)
431
432
train_model >> wait_for_training
433
```
434
435
## Types
436
437
```python { .api }
438
# ECS task states
439
class EcsTaskState:
440
PROVISIONING = 'PROVISIONING'
441
PENDING = 'PENDING'
442
ACTIVATING = 'ACTIVATING'
443
RUNNING = 'RUNNING'
444
DEACTIVATING = 'DEACTIVATING'
445
STOPPING = 'STOPPING'
446
DEPROVISIONING = 'DEPROVISIONING'
447
STOPPED = 'STOPPED'
448
449
# Launch types
450
class EcsLaunchType:
451
EC2 = 'EC2'
452
FARGATE = 'FARGATE'
453
EXTERNAL = 'EXTERNAL'
454
455
# Network modes
456
class EcsNetworkMode:
457
BRIDGE = 'bridge'
458
HOST = 'host'
459
AWSVPC = 'awsvpc'
460
NONE = 'none'
461
462
# Task definition configuration
463
class TaskDefinitionConfig:
464
family: str
465
task_role_arn: str = None
466
execution_role_arn: str = None
467
network_mode: str = 'bridge'
468
container_definitions: list
469
volumes: list = None
470
requires_compatibility: list = None
471
cpu: str = None
472
memory: str = None
473
tags: list = None
474
pid_mode: str = None
475
ipc_mode: str = None
476
proxy_configuration: dict = None
477
inference_accelerators: list = None
478
ephemeral_storage: dict = None
479
runtime_platform: dict = None
480
481
# Container definition
482
class ContainerDefinition:
483
name: str
484
image: str
485
cpu: int = 0
486
memory: int = None
487
memory_reservation: int = None
488
links: list = None
489
port_mappings: list = None
490
essential: bool = True
491
entry_point: list = None
492
command: list = None
493
environment: list = None
494
environment_files: list = None
495
mount_points: list = None
496
volumes_from: list = None
497
linux_parameters: dict = None
498
secrets: list = None
499
depends_on: list = None
500
start_timeout: int = None
501
stop_timeout: int = None
502
hostname: str = None
503
user: str = None
504
working_directory: str = None
505
disable_networking: bool = None
506
privileged: bool = None
507
readonly_root_filesystem: bool = None
508
dns_servers: list = None
509
dns_search_domains: list = None
510
extra_hosts: list = None
511
docker_security_options: list = None
512
interactive: bool = None
513
pseudo_terminal: bool = None
514
docker_labels: dict = None
515
ulimits: list = None
516
log_configuration: dict = None
517
health_check: dict = None
518
system_controls: list = None
519
resource_requirements: list = None
520
firelens_configuration: dict = None
521
522
# Network configuration
523
class NetworkConfiguration:
524
awsvpc_configuration: dict
525
526
class AwsVpcConfiguration:
527
subnets: list
528
security_groups: list = None
529
assign_public_ip: str = 'DISABLED' # 'ENABLED' or 'DISABLED'
530
531
# Placement constraint
532
class PlacementConstraint:
533
type: str # 'distinctInstance' or 'memberOf'
534
expression: str = None
535
536
# Placement strategy
537
class PlacementStrategy:
538
type: str # 'random', 'spread', 'binpack'
539
field: str = None
540
541
# Task overrides
542
class TaskOverride:
543
container_overrides: list = None
544
cpu: str = None
545
inference_accelerator_overrides: list = None
546
execution_role_arn: str = None
547
memory: str = None
548
task_role_arn: str = None
549
ephemeral_storage: dict = None
550
551
# Container override
552
class ContainerOverride:
553
name: str = None
554
command: list = None
555
environment: list = None
556
environment_files: list = None
557
cpu: int = None
558
memory: int = None
559
memory_reservation: int = None
560
resource_requirements: list = None
561
```