or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

athena-analytics.mdauthentication.mdbatch-processing.mddata-transfers.mddms-migration.mddynamodb-nosql.mdecs-containers.mdeks-kubernetes.mdemr-clusters.mdglue-processing.mdindex.mdlambda-functions.mdmessaging-sns-sqs.mdrds-databases.mdredshift-warehouse.mds3-storage.mdsagemaker-ml.md

ecs-containers.mddocs/

0

# ECS Container Orchestration

1

2

Amazon ECS (Elastic Container Service) integration for running containerized applications and tasks. Provides task execution, service management, and cluster operations for both EC2 and Fargate launch types.

3

4

## Capabilities

5

6

### ECS Hook

7

8

Core ECS client providing container orchestration and task management functionality.

9

10

```python { .api }

11

class EcsHook(AwsBaseHook):

12

def __init__(self, aws_conn_id: str = 'aws_default', region_name: str = None, **kwargs):

13

"""

14

Initialize ECS Hook.

15

16

Parameters:

17

- aws_conn_id: AWS connection ID

18

- region_name: AWS region name

19

"""

20

21

def run_task(self, task_definition: str, cluster: str, overrides: dict = None, count: int = 1, started_by: str = None, group: str = None, placement_constraints: list = None, placement_strategy: list = None, platform_version: str = None, network_configuration: dict = None, tags: list = None, enable_execute_command: bool = False, propagate_tags: str = None, reference_id: str = None, **kwargs) -> str:

22

"""

23

Run ECS task.

24

25

Parameters:

26

- task_definition: Task definition ARN or family:revision

27

- cluster: ECS cluster name or ARN

28

- overrides: Task definition overrides

29

- count: Number of tasks to run

30

- started_by: Optional started_by tag

31

- group: Task group

32

- placement_constraints: Task placement constraints

33

- placement_strategy: Task placement strategy

34

- platform_version: Fargate platform version

35

- network_configuration: Network configuration for awsvpc mode

36

- tags: Task tags

37

- enable_execute_command: Enable ECS Exec

38

- propagate_tags: Tag propagation ('TASK_DEFINITION', 'SERVICE', 'NONE')

39

- reference_id: Reference ID for task

40

41

Returns:

42

Task ARN

43

"""

44

45

def describe_tasks(self, cluster: str, tasks: list, include: list = None) -> dict:

46

"""

47

Get ECS task details.

48

49

Parameters:

50

- cluster: ECS cluster name or ARN

51

- tasks: List of task ARNs or IDs

52

- include: Additional task information to include

53

54

Returns:

55

Task descriptions

56

"""

57

58

def describe_task_definition(self, task_definition: str, include: list = None) -> dict:

59

"""

60

Get task definition details.

61

62

Parameters:

63

- task_definition: Task definition ARN or family:revision

64

- include: Additional information to include

65

66

Returns:

67

Task definition description

68

"""

69

70

def list_tasks(self, cluster: str = None, container_instance: str = None, family: str = None, started_by: str = None, service_name: str = None, desired_status: str = None, launch_type: str = None) -> list:

71

"""

72

List ECS tasks.

73

74

Parameters:

75

- cluster: ECS cluster name or ARN

76

- container_instance: Container instance ARN or ID

77

- family: Task definition family

78

- started_by: Started by filter

79

- service_name: Service name filter

80

- desired_status: Task status filter

81

- launch_type: Launch type filter ('EC2', 'FARGATE')

82

83

Returns:

84

List of task ARNs

85

"""

86

87

def stop_task(self, cluster: str, task: str, reason: str = None) -> dict:

88

"""

89

Stop running ECS task.

90

91

Parameters:

92

- cluster: ECS cluster name or ARN

93

- task: Task ARN or ID

94

- reason: Reason for stopping task

95

96

Returns:

97

Stop task response

98

"""

99

100

def describe_clusters(self, clusters: list = None, include: list = None) -> dict:

101

"""

102

Describe ECS clusters.

103

104

Parameters:

105

- clusters: List of cluster names or ARNs

106

- include: Additional cluster information to include

107

108

Returns:

109

Cluster descriptions

110

"""

111

112

def describe_services(self, cluster: str, services: list, include: list = None) -> dict:

113

"""

114

Describe ECS services.

115

116

Parameters:

117

- cluster: ECS cluster name or ARN

118

- services: List of service names or ARNs

119

- include: Additional service information to include

120

121

Returns:

122

Service descriptions

123

"""

124

125

def get_task_logs(self, task_arn: str, container_name: str = None, start_time: int = None, end_time: int = None, next_token: str = None) -> dict:

126

"""

127

Get CloudWatch logs for ECS task.

128

129

Parameters:

130

- task_arn: Task ARN

131

- container_name: Container name filter

132

- start_time: Log start time (Unix timestamp)

133

- end_time: Log end time (Unix timestamp)

134

- next_token: Pagination token

135

136

Returns:

137

Task logs

138

"""

139

140

def wait_until_task_stopped(self, cluster: str, tasks: list, max_attempts: int = 100, delay: int = 15) -> None:

141

"""

142

Wait until ECS tasks are stopped.

143

144

Parameters:

145

- cluster: ECS cluster name or ARN

146

- tasks: List of task ARNs

147

- max_attempts: Maximum wait attempts

148

- delay: Delay between attempts in seconds

149

"""

150

151

def wait_until_task_running(self, cluster: str, tasks: list, max_attempts: int = 100, delay: int = 15) -> None:

152

"""

153

Wait until ECS tasks are running.

154

155

Parameters:

156

- cluster: ECS cluster name or ARN

157

- tasks: List of task ARNs

158

- max_attempts: Maximum wait attempts

159

- delay: Delay between attempts in seconds

160

"""

161

```

162

163

### ECS Operators

164

165

Task implementations for ECS container operations.

166

167

```python { .api }

168

class EcsRunTaskOperator(BaseOperator):

169

def __init__(self, task_definition: str, cluster: str, overrides: dict = None, aws_conn_id: str = 'aws_default', region_name: str = None, launch_type: str = 'EC2', capacity_provider_strategy: list = None, platform_version: str = None, group: str = None, placement_constraints: list = None, placement_strategy: list = None, network_configuration: dict = None, tags: dict = None, awslogs_group: str = None, awslogs_region: str = None, awslogs_stream_prefix: str = None, reattach: bool = False, number_logs_exception: int = 10, **kwargs):

170

"""

171

Run ECS task.

172

173

Parameters:

174

- task_definition: Task definition ARN or family:revision

175

- cluster: ECS cluster name or ARN

176

- overrides: Task definition overrides

177

- aws_conn_id: AWS connection ID

178

- region_name: AWS region name

179

- launch_type: Launch type ('EC2', 'FARGATE')

180

- capacity_provider_strategy: Capacity provider strategy

181

- platform_version: Fargate platform version

182

- group: Task group

183

- placement_constraints: Task placement constraints

184

- placement_strategy: Task placement strategy

185

- network_configuration: Network configuration

186

- tags: Task tags

187

- awslogs_group: CloudWatch log group

188

- awslogs_region: CloudWatch log region

189

- awslogs_stream_prefix: Log stream prefix

190

- reattach: Reattach to existing task

191

- number_logs_exception: Number of log lines for exceptions

192

"""

193

194

class EcsOperator(BaseOperator):

195

def __init__(self, task_definition: str, cluster: str, overrides: dict = None, aws_conn_id: str = 'aws_default', region_name: str = None, launch_type: str = 'EC2', **kwargs):

196

"""

197

General ECS task operator.

198

199

Parameters:

200

- task_definition: Task definition ARN or family:revision

201

- cluster: ECS cluster name or ARN

202

- overrides: Task definition overrides

203

- aws_conn_id: AWS connection ID

204

- region_name: AWS region name

205

- launch_type: Launch type ('EC2', 'FARGATE')

206

"""

207

```

208

209

### ECS Sensors

210

211

Monitoring tasks for ECS task states and service health.

212

213

```python { .api }

214

class EcsTaskSensor(BaseSensorOperator):

215

def __init__(self, task_id: str, cluster_name: str, aws_conn_id: str = 'aws_default', region_name: str = None, **kwargs):

216

"""

217

Wait for ECS task completion.

218

219

Parameters:

220

- task_id: ECS task ID or ARN

221

- cluster_name: ECS cluster name

222

- aws_conn_id: AWS connection ID

223

- region_name: AWS region name

224

"""

225

```

226

227

### ECS Triggers

228

229

Asynchronous triggers for ECS task monitoring.

230

231

```python { .api }

232

class EcsTaskTrigger(BaseTrigger):

233

def __init__(self, cluster: str, task_arn: str, target_state: str = 'STOPPED', aws_conn_id: str = 'aws_default', poll_interval: int = 60, **kwargs):

234

"""

235

Asynchronous trigger for ECS task state monitoring.

236

237

Parameters:

238

- cluster: ECS cluster name or ARN

239

- task_arn: Task ARN to monitor

240

- target_state: Target task state

241

- aws_conn_id: AWS connection ID

242

- poll_interval: Polling interval in seconds

243

"""

244

```

245

246

## Usage Examples

247

248

### Container Task Execution

249

250

```python

251

from airflow import DAG

252

from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator

253

254

dag = DAG('ecs_batch_job', start_date=datetime(2023, 1, 1))

255

256

# Task definition overrides

257

task_overrides = {

258

'containerOverrides': [

259

{

260

'name': 'data-processor',

261

'environment': [

262

{'name': 'INPUT_PATH', 'value': 's3://data-bucket/input/{{ ds }}/'},

263

{'name': 'OUTPUT_PATH', 'value': 's3://data-bucket/output/{{ ds }}/'},

264

{'name': 'PROCESSING_DATE', 'value': '{{ ds }}'}

265

],

266

'cpu': 2048,

267

'memory': 4096,

268

'command': [

269

'python', 'process_data.py',

270

'--date', '{{ ds }}',

271

'--input-path', 's3://data-bucket/input/{{ ds }}/',

272

'--output-path', 's3://data-bucket/output/{{ ds }}/'

273

]

274

}

275

]

276

}

277

278

# Run data processing task

279

run_processor = EcsRunTaskOperator(

280

task_id='run_data_processor',

281

task_definition='data-processing-task:1',

282

cluster='data-processing-cluster',

283

launch_type='FARGATE',

284

overrides=task_overrides,

285

network_configuration={

286

'awsvpcConfiguration': {

287

'subnets': ['subnet-12345678', 'subnet-87654321'],

288

'securityGroups': ['sg-abcdef123'],

289

'assignPublicIp': 'ENABLED'

290

}

291

},

292

platform_version='1.4.0',

293

awslogs_group='/ecs/data-processing',

294

awslogs_region='us-east-1',

295

awslogs_stream_prefix='data-processor',

296

tags={

297

'Environment': 'production',

298

'Project': 'data-pipeline',

299

'Date': '{{ ds }}'

300

},

301

dag=dag

302

)

303

```

304

305

### Multi-Container Batch Processing

306

307

```python

308

# Parallel processing with multiple containers

309

parallel_processors = []

310

311

for partition in range(4):

312

task_overrides = {

313

'containerOverrides': [

314

{

315

'name': 'batch-processor',

316

'environment': [

317

{'name': 'PARTITION_ID', 'value': str(partition)},

318

{'name': 'TOTAL_PARTITIONS', 'value': '4'},

319

{'name': 'INPUT_PREFIX', 'value': f's3://data-bucket/partitioned/{partition}/'},

320

{'name': 'OUTPUT_PREFIX', 'value': f's3://results-bucket/partition-{partition}/'}

321

]

322

}

323

]

324

}

325

326

processor = EcsRunTaskOperator(

327

task_id=f'process_partition_{partition}',

328

task_definition='batch-processing-task:2',

329

cluster='batch-cluster',

330

launch_type='FARGATE',

331

overrides=task_overrides,

332

network_configuration={

333

'awsvpcConfiguration': {

334

'subnets': ['subnet-12345678'],

335

'securityGroups': ['sg-batch123'],

336

'assignPublicIp': 'DISABLED'

337

}

338

},

339

dag=dag

340

)

341

342

parallel_processors.append(processor)

343

344

# Aggregation task after parallel processing

345

aggregate_overrides = {

346

'containerOverrides': [

347

{

348

'name': 'aggregator',

349

'environment': [

350

{'name': 'INPUT_PARTITIONS', 'value': '4'},

351

{'name': 'INPUT_PREFIX', 'value': 's3://results-bucket/'},

352

{'name': 'FINAL_OUTPUT', 'value': 's3://final-results/{{ ds }}/aggregated.json'}

353

]

354

}

355

]

356

}

357

358

aggregate_results = EcsRunTaskOperator(

359

task_id='aggregate_results',

360

task_definition='result-aggregator:1',

361

cluster='batch-cluster',

362

launch_type='FARGATE',

363

overrides=aggregate_overrides,

364

network_configuration={

365

'awsvpcConfiguration': {

366

'subnets': ['subnet-12345678'],

367

'securityGroups': ['sg-batch123'],

368

'assignPublicIp': 'DISABLED'

369

}

370

},

371

dag=dag

372

)

373

374

# Dependencies: all parallel processors must complete before aggregation

375

parallel_processors >> aggregate_results

376

```

377

378

### ML Model Training with ECS

379

380

```python

381

# ML training task with GPU support

382

training_overrides = {

383

'containerOverrides': [

384

{

385

'name': 'ml-trainer',

386

'environment': [

387

{'name': 'TRAINING_DATA', 'value': 's3://ml-data/training/{{ ds }}/'},

388

{'name': 'MODEL_OUTPUT', 'value': 's3://ml-models/trained/{{ ds }}/'},

389

{'name': 'EPOCHS', 'value': '100'},

390

{'name': 'BATCH_SIZE', 'value': '32'},

391

{'name': 'LEARNING_RATE', 'value': '0.001'}

392

],

393

'cpu': 4096,

394

'memory': 16384,

395

'gpu': 1

396

}

397

]

398

}

399

400

train_model = EcsRunTaskOperator(

401

task_id='train_ml_model',

402

task_definition='ml-training-gpu:3',

403

cluster='ml-training-cluster',

404

launch_type='EC2', # Use EC2 for GPU instances

405

overrides=training_overrides,

406

placement_constraints=[

407

{

408

'type': 'memberOf',

409

'expression': 'attribute:ecs.instance-type =~ p3.*' # GPU instances

410

}

411

],

412

awslogs_group='/ecs/ml-training',

413

dag=dag

414

)

415

```

416

417

### Service Health Monitoring

418

419

```python

420

from airflow.providers.amazon.aws.sensors.ecs import EcsTaskSensor

421

422

# Wait for long-running task completion

423

wait_for_training = EcsTaskSensor(

424

task_id='wait_for_model_training',

425

task_id='{{ task_instance.xcom_pull(task_ids="train_ml_model") }}',

426

cluster_name='ml-training-cluster',

427

timeout=7200, # 2 hours

428

poke_interval=300, # Check every 5 minutes

429

dag=dag

430

)

431

432

train_model >> wait_for_training

433

```

434

435

## Types

436

437

```python { .api }

438

# ECS task states

439

class EcsTaskState:

440

PROVISIONING = 'PROVISIONING'

441

PENDING = 'PENDING'

442

ACTIVATING = 'ACTIVATING'

443

RUNNING = 'RUNNING'

444

DEACTIVATING = 'DEACTIVATING'

445

STOPPING = 'STOPPING'

446

DEPROVISIONING = 'DEPROVISIONING'

447

STOPPED = 'STOPPED'

448

449

# Launch types

450

class EcsLaunchType:

451

EC2 = 'EC2'

452

FARGATE = 'FARGATE'

453

EXTERNAL = 'EXTERNAL'

454

455

# Network modes

456

class EcsNetworkMode:

457

BRIDGE = 'bridge'

458

HOST = 'host'

459

AWSVPC = 'awsvpc'

460

NONE = 'none'

461

462

# Task definition configuration

463

class TaskDefinitionConfig:

464

family: str

465

task_role_arn: str = None

466

execution_role_arn: str = None

467

network_mode: str = 'bridge'

468

container_definitions: list

469

volumes: list = None

470

requires_compatibility: list = None

471

cpu: str = None

472

memory: str = None

473

tags: list = None

474

pid_mode: str = None

475

ipc_mode: str = None

476

proxy_configuration: dict = None

477

inference_accelerators: list = None

478

ephemeral_storage: dict = None

479

runtime_platform: dict = None

480

481

# Container definition

482

class ContainerDefinition:

483

name: str

484

image: str

485

cpu: int = 0

486

memory: int = None

487

memory_reservation: int = None

488

links: list = None

489

port_mappings: list = None

490

essential: bool = True

491

entry_point: list = None

492

command: list = None

493

environment: list = None

494

environment_files: list = None

495

mount_points: list = None

496

volumes_from: list = None

497

linux_parameters: dict = None

498

secrets: list = None

499

depends_on: list = None

500

start_timeout: int = None

501

stop_timeout: int = None

502

hostname: str = None

503

user: str = None

504

working_directory: str = None

505

disable_networking: bool = None

506

privileged: bool = None

507

readonly_root_filesystem: bool = None

508

dns_servers: list = None

509

dns_search_domains: list = None

510

extra_hosts: list = None

511

docker_security_options: list = None

512

interactive: bool = None

513

pseudo_terminal: bool = None

514

docker_labels: dict = None

515

ulimits: list = None

516

log_configuration: dict = None

517

health_check: dict = None

518

system_controls: list = None

519

resource_requirements: list = None

520

firelens_configuration: dict = None

521

522

# Network configuration

523

class NetworkConfiguration:

524

awsvpc_configuration: dict

525

526

class AwsVpcConfiguration:

527

subnets: list

528

security_groups: list = None

529

assign_public_ip: str = 'DISABLED' # 'ENABLED' or 'DISABLED'

530

531

# Placement constraint

532

class PlacementConstraint:

533

type: str # 'distinctInstance' or 'memberOf'

534

expression: str = None

535

536

# Placement strategy

537

class PlacementStrategy:

538

type: str # 'random', 'spread', 'binpack'

539

field: str = None

540

541

# Task overrides

542

class TaskOverride:

543

container_overrides: list = None

544

cpu: str = None

545

inference_accelerator_overrides: list = None

546

execution_role_arn: str = None

547

memory: str = None

548

task_role_arn: str = None

549

ephemeral_storage: dict = None

550

551

# Container override

552

class ContainerOverride:

553

name: str = None

554

command: list = None

555

environment: list = None

556

environment_files: list = None

557

cpu: int = None

558

memory: int = None

559

memory_reservation: int = None

560

resource_requirements: list = None

561

```