or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

athena-queries.md, cloudwatch-logging.md, ecr-integration.md, ecs-orchestration.md, emr-processing.md, index.md, parameter-store.md, pipes-orchestration.md, rds-operations.md, redshift-integration.md, s3-storage.md, secrets-management.md

docs/ecs-orchestration.md

# ECS Container Orchestration

Execute Dagster jobs and ops on Amazon ECS (Elastic Container Service) clusters, with control over task definitions, networking, resource allocation, and container configuration. The integration lets Dagster launch each run as an ECS task and, with the ECS executor, run individual ops as separate ECS tasks.

## Capabilities

### ECS Run Launcher

Launch each Dagster run as its own ECS task, giving every run an isolated execution environment.

```python { .api }

class EcsRunLauncher(RunLauncher, ConfigurableClass):
    """Run launcher that executes each Dagster run as an ECS task.

    Because it is a ``ConfigurableClass``, the launcher is not constructed
    directly in user code. Dagster builds it from the ``run_launcher`` section
    of the instance configuration (``dagster.yaml``) and hands the serialized
    settings to ``__init__`` as ``inst_data``.
    """

    def __init__(self, inst_data=None):
        """Initialize the launcher from instance configuration data.

        Args:
            inst_data: Configuration data supplied by the Dagster instance.

        Configuration options (set under ``run_launcher.config`` in
        ``dagster.yaml``, not passed as keyword arguments):
            task_definition: ECS task definition ARN (optional).
            task_role_arn: IAM role assumed by the run's containers.
            execution_role_arn: IAM role ECS uses to start the task.
            run_resources: ``cpu`` and ``memory`` for the run task.
            run_task_kwargs: Extra arguments for the ECS ``RunTask`` API
                call, e.g. ``cluster`` and ``networkConfiguration``
                (subnets and security groups).
            secrets: AWS Secrets Manager secrets to expose to the run.
            secrets_tag: Tag used to discover Secrets Manager secrets.
            env_vars: Environment variables for the run task.
            include_sidecars: Whether to include sidecar containers.
            use_current_ecs_task_config: Whether to inherit settings from
                the ECS task the launcher itself runs in.
            run_ecs_tags: Tags applied to launched ECS tasks.
            propagate_tags: Tag propagation configuration.
            task_definition_prefix: Prefix for registered task definitions.
        """

    def launch_run(self, context: LaunchRunContext) -> DagsterRun:
        """Launch a Dagster run as an ECS task.

        Args:
            context: Launch context carrying the run to start.

        Returns:
            DagsterRun: The launched run.
        """

    def can_terminate(self, run_id: str) -> bool:
        """Report whether the run with ``run_id`` can be terminated.

        Args:
            run_id: ID of the run to check.

        Returns:
            bool: True if the run can be terminated.
        """

    def terminate(self, run_id: str) -> bool:
        """Stop the ECS task executing the run with ``run_id``.

        Args:
            run_id: ID of the run to terminate.

        Returns:
            bool: True if termination succeeded.
        """

    def check_run_worker_health(self, run: DagsterRun) -> CheckRunHealthResult:
        """Check the health of the ECS task executing ``run``.

        Args:
            run: Dagster run to check.

        Returns:
            CheckRunHealthResult: Result of the health check.
        """

    def get_image_for_run(self, run: DagsterRun) -> str:
        """Return the container image used for ``run``.

        Args:
            run: Dagster run.

        Returns:
            str: Container image URI.
        """

```

### ECS Executor

Execute individual ops as separate ECS tasks, so that independent ops can run in parallel across the cluster.

```python { .api }

def ecs_executor(
    cluster: str,
    subnets: List[str],
    security_group_ids: List[str],
    task_definition: Optional[str] = None,
    task_role_arn: Optional[str] = None,
    execution_role_arn: Optional[str] = None,
    assign_public_ip: bool = False,
    cpu: Optional[str] = None,
    memory: Optional[str] = None,
    ephemeral_storage: Optional[int] = None,
    region_name: Optional[str] = None,
    **kwargs,
) -> ExecutorDefinition:
    """Run each op of a job in its own ECS task.

    The signature lists the executor's configuration fields. In job code the
    executor is attached as a definition and configured with a mapping, e.g.
    ``ecs_executor.configured({"cluster": ..., "subnets": [...], ...})``.

    Args:
        cluster: Name or ARN of the ECS cluster.
        subnets: Subnet IDs used for task networking.
        security_group_ids: Security group IDs attached to the tasks.
        task_definition: ARN of the ECS task definition to use (optional).
        task_role_arn: ARN of the IAM role assumed by the task.
        execution_role_arn: ARN of the IAM role ECS uses to run the task.
        assign_public_ip: Whether tasks receive a public IP address.
        cpu: CPU units per task, e.g. "256", "512", "1024".
        memory: Memory per task in MB, e.g. "512", "1024", "2048".
        ephemeral_storage: Ephemeral storage per task, in GB.
        region_name: AWS region name.
        **kwargs: Any further configuration options.

    Returns:
        ExecutorDefinition: The configured ECS executor.
    """

```

### ECS Exception Handling

Exception classes raised for ECS-specific failures, such as eventual-consistency timeouts and runs with no matching ECS task.

```python { .api }

class EcsEventualConsistencyTimeout(Exception):
    """Signals that an ECS operation timed out waiting on eventual consistency.

    Some ECS operations are only eventually consistent; this error is raised
    when such an operation does not complete within the expected timeframe.
    """

class EcsNoTasksFound(Exception):
    """Signals that no ECS task could be found for a given run."""

class RetryableEcsException(Exception):
    """Common base class for ECS errors that are safe to retry."""

```

## Usage Examples

### Basic ECS Run Launcher Configuration

```python

from dagster import Definitions, job, op

# EcsRunLauncher is a ConfigurableClass: the Dagster instance builds it from
# dagster.yaml, and Definitions has no ``run_launcher`` argument. Configure the
# launcher at the instance level instead, for example:
#
#   run_launcher:
#     module: dagster_aws.ecs
#     class: EcsRunLauncher
#     config:
#       task_role_arn: arn:aws:iam::123456789012:role/DagsterEcsTaskRole
#       execution_role_arn: arn:aws:iam::123456789012:role/DagsterEcsExecutionRole
#       run_resources:
#         cpu: "512"
#         memory: "1024"
#       run_task_kwargs:
#         cluster: my-dagster-cluster
#         networkConfiguration:
#           awsvpcConfiguration:
#             subnets: ["subnet-12345", "subnet-67890"]
#             securityGroups: ["sg-abcdef"]
#
# Set the AWS region (e.g. us-west-2) through the standard AWS configuration,
# such as the AWS_REGION environment variable.


@op
def hello_world():
    return "Hello from ECS!"


@job
def my_job():
    hello_world()


# Runs of my_job are launched by whatever run launcher the instance configures.
defs = Definitions(jobs=[my_job])

```

### ECS Executor for Distributed Ops

```python

from dagster import Definitions, job, op
from dagster_aws.ecs import ecs_executor


@op
def extract_data():
    # Produce the raw input for the pipeline.
    return "extracted_data"


@op
def transform_data(data):
    # Derive the transformed value from the upstream output.
    return f"transformed_{data}"


@op
def load_data(data):
    # Stand-in for writing to a real sink.
    print(f"Loading {data}")


@job(
    executor_def=ecs_executor.configured(
        {
            "cluster": "my-dagster-cluster",
            "subnets": ["subnet-12345", "subnet-67890"],
            "security_group_ids": ["sg-abcdef"],
            "task_definition": "arn:aws:ecs:us-west-2:123456789012:task-definition/dagster-task:1",
            "cpu": "1024",
            "memory": "2048",
            "assign_public_ip": True,
        }
    )
)
def etl_job():
    # extract -> transform -> load; each op runs as its own ECS task.
    load_data(transform_data(extract_data()))


defs = Definitions(jobs=[etl_job])

```

### Custom Task Definition with ECS

```python

from dagster import Definitions, job, op
from dagster_aws.ecs import ecs_executor

# Executor pinned to a specific task definition and IAM roles, launching
# tasks without public IPs in the production subnets.
custom_ecs_executor = ecs_executor.configured(
    {
        "cluster": "production-cluster",
        "subnets": ["subnet-prod-1", "subnet-prod-2"],
        "security_group_ids": ["sg-prod-dagster"],
        "task_definition": "arn:aws:ecs:us-east-1:123456789012:task-definition/dagster-prod:5",
        "task_role_arn": "arn:aws:iam::123456789012:role/DagsterTaskRole",
        "execution_role_arn": "arn:aws:iam::123456789012:role/DagsterExecutionRole",
        "assign_public_ip": False,
        "region_name": "us-east-1",
    }
)


@op
def cpu_intensive_operation():
    # Deliberately CPU-bound work: the sum of squares of 0..999,999.
    total = sum(n * n for n in range(1_000_000))
    return total


@job(executor_def=custom_ecs_executor)
def compute_job():
    cpu_intensive_operation()


defs = Definitions(jobs=[compute_job])

```

### Error Handling with ECS

```python

from dagster import Definitions, RetryPolicy, job, op
from dagster_aws.ecs import EcsEventualConsistencyTimeout, ecs_executor


@op(retry_policy=RetryPolicy(max_retries=3))
def resilient_operation():
    # The RetryPolicy re-runs this op up to 3 times after a failure, which
    # absorbs transient ECS eventual-consistency delays.
    try:
        # Placeholder: put the ECS-dependent work that may time out here.
        return "success"
    except EcsEventualConsistencyTimeout as e:
        # The exception defines no ``timeout_seconds`` attribute, so report its
        # message instead. Chain it so the original traceback is preserved.
        raise Exception(f"ECS operation timed out: {e}") from e


@job(
    executor_def=ecs_executor.configured(
        {
            "cluster": "my-cluster",
            "subnets": ["subnet-123"],
            "security_group_ids": ["sg-456"],
        }
    )
)
def resilient_job():
    resilient_operation()


defs = Definitions(jobs=[resilient_job])

293

```