0
# ECS Container Orchestration
1
2
Execute Dagster jobs and ops on Amazon ECS (Elastic Container Service) clusters with full support for task configuration, networking, scaling, and container management. This integration allows Dagster to launch each run as an ECS task and to execute individual ops as separate ECS tasks.
3
4
## Capabilities
5
6
### ECS Run Launcher
7
8
Launch Dagster runs as ECS tasks, providing scalable and isolated execution environments for data pipelines.
9
10
```python { .api }
11
class EcsRunLauncher(RunLauncher, ConfigurableClass):
12
"""
13
Run launcher that executes Dagster runs as ECS tasks.
14
15
Configuration options include ECS cluster settings, networking,
16
task definitions, resource allocation, and security configuration.
17
"""
18
19
def __init__(self, inst_data=None):
20
"""
21
Initialize ECS run launcher with configuration data.
22
23
Configuration includes:
24
cluster: ECS cluster name
25
subnets: List of subnet IDs
26
security_group_ids: List of security group IDs
27
task_definition: ECS task definition ARN (optional)
28
task_role_arn: ARN of the IAM task role assumed by the containers in the task (grants the run's code its AWS permissions)
29
execution_role_arn: ARN of the IAM task execution role used by the ECS agent (e.g., to pull images, fetch secrets, and write logs)
30
cpu: CPU units for task
31
memory: Memory (MB) for task
32
secrets: AWS Secrets Manager secrets
33
secrets_tag: Tag for secret discovery
34
env_vars: Environment variables
35
include_sidecars: Include sidecar containers
36
use_current_ecs_task_config: Inherit current task config
37
run_ecs_tags: Tags for ECS tasks
38
propagate_tags: Tag propagation configuration
39
task_definition_prefix: Prefix for task definitions
40
region_name: AWS region
41
"""
42
43
def launch_run(self, context: LaunchRunContext) -> DagsterRun:
44
"""
45
Launch a Dagster run as an ECS task.
46
47
Parameters:
48
context: Launch context containing run information
49
50
Returns:
51
DagsterRun: The launched run
52
"""
53
54
def can_terminate(self, run_id: str) -> bool:
55
"""
56
Check if run can be terminated.
57
58
Parameters:
59
run_id: ID of the run to check
60
61
Returns:
62
bool: Whether the run can be terminated
63
"""
64
65
def terminate(self, run_id: str) -> bool:
66
"""
67
Terminate a running ECS task.
68
69
Parameters:
70
run_id: ID of the run to terminate
71
72
Returns:
73
bool: Whether termination was successful
74
"""
75
76
def check_run_worker_health(self, run: DagsterRun) -> CheckRunHealthResult:
77
"""
78
Check health status of ECS task running the job.
79
80
Parameters:
81
run: Dagster run to check
82
83
Returns:
84
CheckRunHealthResult: Health check result
85
"""
86
87
def get_image_for_run(self, run: DagsterRun) -> str:
88
"""
89
Get container image for the run.
90
91
Parameters:
92
run: Dagster run
93
94
Returns:
95
str: Container image URI
96
"""
97
```
98
99
### ECS Executor
100
101
Execute individual ops as separate ECS tasks, enabling distributed computation and parallel processing.
102
103
```python { .api }
104
def ecs_executor(
105
cluster: str,
106
subnets: List[str],
107
security_group_ids: List[str],
108
task_definition: Optional[str] = None,
109
task_role_arn: Optional[str] = None,
110
execution_role_arn: Optional[str] = None,
111
assign_public_ip: bool = False,
112
cpu: Optional[str] = None,
113
memory: Optional[str] = None,
114
ephemeral_storage: Optional[int] = None,
115
region_name: Optional[str] = None,
116
**kwargs
117
) -> ExecutorDefinition:
118
"""
119
Executor that runs ops as individual ECS tasks.
120
121
Parameters:
122
cluster: ECS cluster name or ARN
123
subnets: List of subnet IDs for task networking
124
security_group_ids: List of security group IDs
125
task_definition: ECS task definition ARN (optional)
126
task_role_arn: ARN of the IAM task role assumed by the containers in the task (grants op code its AWS permissions)
127
execution_role_arn: ARN of the IAM task execution role used by the ECS agent (e.g., to pull images, fetch secrets, and write logs)
128
assign_public_ip: Whether to assign public IP to tasks
129
cpu: CPU units for tasks (e.g., "256", "512", "1024")
130
memory: Memory for tasks in MB (e.g., "512", "1024", "2048")
131
ephemeral_storage: Ephemeral storage in GiB
132
region_name: AWS region name
133
**kwargs: Additional configuration options
134
135
Returns:
136
ExecutorDefinition: Configured ECS executor
137
"""
138
```
139
140
### ECS Exception Handling
141
142
Exception classes for ECS-specific error handling and timeout management.
143
144
```python { .api }
145
class EcsEventualConsistencyTimeout(Exception):
146
"""
147
Exception raised when ECS operations timeout due to eventual consistency delays.
148
149
ECS has eventual consistency for some operations, and this exception is raised
150
when operations don't complete within the expected timeframe.
151
"""
152
153
class EcsNoTasksFound(Exception):
154
"""
155
Exception raised when no ECS tasks are found for a given run.
156
"""
157
158
class RetryableEcsException(Exception):
159
"""
160
Base class for retryable ECS exceptions.
161
"""
162
```
163
164
## Usage Examples
165
166
### Basic ECS Run Launcher Configuration
167
168
```python
169
from dagster import job, op, Definitions
170
from dagster_aws.ecs import EcsRunLauncher
171
172
@op
173
def hello_world():
174
return "Hello from ECS!"
175
176
@job
177
def my_job():
178
hello_world()
179
180
# Configure ECS run launcher
181
ecs_run_launcher = EcsRunLauncher(
182
cluster="my-dagster-cluster",
183
subnets=["subnet-12345", "subnet-67890"],
184
security_group_ids=["sg-abcdef"],
185
task_role_arn="arn:aws:iam::123456789012:role/DagsterEcsTaskRole",
186
execution_role_arn="arn:aws:iam::123456789012:role/DagsterEcsExecutionRole",
187
cpu="512",
188
memory="1024",
189
region_name="us-west-2"
190
)
191
192
defs = Definitions(
193
jobs=[my_job],
194
run_launcher=ecs_run_launcher
195
)
196
```
197
198
### ECS Executor for Distributed Ops
199
200
```python
201
from dagster import job, op, Definitions
202
from dagster_aws.ecs import ecs_executor
203
204
@op
205
def extract_data():
206
# Extract data operation
207
return "extracted_data"
208
209
@op
210
def transform_data(data):
211
# Transform data operation
212
return f"transformed_{data}"
213
214
@op
215
def load_data(data):
216
# Load data operation
217
print(f"Loading {data}")
218
219
@job(
220
executor_def=ecs_executor.configured({
221
"cluster": "my-dagster-cluster",
222
"subnets": ["subnet-12345", "subnet-67890"],
223
"security_group_ids": ["sg-abcdef"],
224
"task_definition": "arn:aws:ecs:us-west-2:123456789012:task-definition/dagster-task:1",
225
"cpu": "1024",
226
"memory": "2048",
227
"assign_public_ip": True
228
})
229
)
230
def etl_job():
231
data = extract_data()
232
transformed = transform_data(data)
233
load_data(transformed)
234
235
defs = Definitions(jobs=[etl_job])
236
```
237
238
### Custom Task Definition with ECS
239
240
```python
241
from dagster import job, op, Definitions
242
from dagster_aws.ecs import ecs_executor
243
244
# Using custom task definition with specific container configuration
245
custom_ecs_executor = ecs_executor.configured({
246
"cluster": "production-cluster",
247
"subnets": ["subnet-prod-1", "subnet-prod-2"],
248
"security_group_ids": ["sg-prod-dagster"],
249
"task_definition": "arn:aws:ecs:us-east-1:123456789012:task-definition/dagster-prod:5",
250
"task_role_arn": "arn:aws:iam::123456789012:role/DagsterTaskRole",
251
"execution_role_arn": "arn:aws:iam::123456789012:role/DagsterExecutionRole",
252
"assign_public_ip": False,
253
"region_name": "us-east-1"
254
})
255
256
@op
257
def cpu_intensive_operation():
258
# Perform CPU-intensive computation
259
result = sum(i**2 for i in range(1000000))
260
return result
261
262
@job(executor_def=custom_ecs_executor)
263
def compute_job():
264
cpu_intensive_operation()
265
266
defs = Definitions(jobs=[compute_job])
267
```
268
269
### Error Handling with ECS
270
271
```python
272
from dagster import op, job, Definitions, RetryPolicy
273
from dagster_aws.ecs import ecs_executor, EcsEventualConsistencyTimeout
274
275
@op(retry_policy=RetryPolicy(max_retries=3))
276
def resilient_operation():
277
try:
278
# Operation that might fail due to ECS eventual consistency
279
return "success"
280
except EcsEventualConsistencyTimeout as e:
281
# Handle ECS-specific timeout errors
282
raise Exception(f"ECS operation timed out: {e}") from e
283
284
@job(executor_def=ecs_executor.configured({
285
"cluster": "my-cluster",
286
"subnets": ["subnet-123"],
287
"security_group_ids": ["sg-456"]
288
}))
289
def resilient_job():
290
resilient_operation()
291
292
defs = Definitions(jobs=[resilient_job])
293
```