Package for AWS-specific Dagster framework solid and resource components.
---
Execute Dagster jobs and ops on Amazon ECS (Elastic Container Service) clusters with full support for task configuration, networking, scaling, and container management. This integration allows Dagster to launch runs as ECS tasks and execute ops distributed across ECS containers.
Launch Dagster runs as ECS tasks, providing scalable and isolated execution environments for data pipelines.
class EcsRunLauncher(RunLauncher, ConfigurableClass):
"""
Run launcher that executes Dagster runs as ECS tasks.
Configuration options include ECS cluster settings, networking,
task definitions, resource allocation, and security configuration.
"""
def __init__(self, inst_data=None):
"""
Initialize ECS run launcher with configuration data.
Configuration includes:
cluster: ECS cluster name
subnets: List of subnet IDs
security_group_ids: List of security group IDs
task_definition: ECS task definition ARN (optional)
task_role_arn: IAM role for task execution
execution_role_arn: IAM role for ECS task execution
cpu: CPU units for task
memory: Memory (MB) for task
secrets: AWS Secrets Manager secrets
secrets_tag: Tag for secret discovery
env_vars: Environment variables
include_sidecars: Include sidecar containers
use_current_ecs_task_config: Inherit current task config
run_ecs_tags: Tags for ECS tasks
propagate_tags: Tag propagation configuration
task_definition_prefix: Prefix for task definitions
region_name: AWS region
"""
def launch_run(self, context: LaunchRunContext) -> DagsterRun:
"""
Launch a Dagster run as an ECS task.
Parameters:
context: Launch context containing run information
Returns:
DagsterRun: The launched run
"""
def can_terminate(self, run_id: str) -> bool:
"""
Check if run can be terminated.
Parameters:
run_id: ID of the run to check
Returns:
bool: Whether the run can be terminated
"""
def terminate(self, run_id: str) -> bool:
"""
Terminate a running ECS task.
Parameters:
run_id: ID of the run to terminate
Returns:
bool: Whether termination was successful
"""
def check_run_worker_health(self, run: DagsterRun) -> CheckRunHealthResult:
"""
Check health status of ECS task running the job.
Parameters:
run: Dagster run to check
Returns:
CheckRunHealthResult: Health check result
"""
def get_image_for_run(self, run: DagsterRun) -> str:
"""
Get container image for the run.
Parameters:
run: Dagster run
Returns:
str: Container image URI
"""Execute individual ops as separate ECS tasks, enabling distributed computation and parallel processing.
def ecs_executor(
cluster: str,
subnets: List[str],
security_group_ids: List[str],
task_definition: Optional[str] = None,
task_role_arn: Optional[str] = None,
execution_role_arn: Optional[str] = None,
assign_public_ip: bool = False,
cpu: Optional[str] = None,
memory: Optional[str] = None,
ephemeral_storage: Optional[int] = None,
region_name: Optional[str] = None,
**kwargs
) -> ExecutorDefinition:
"""
Executor that runs ops as individual ECS tasks.
Parameters:
cluster: ECS cluster name or ARN
subnets: List of subnet IDs for task networking
security_group_ids: List of security group IDs
task_definition: ECS task definition ARN (optional)
task_role_arn: IAM role ARN for task execution
execution_role_arn: IAM role ARN for ECS task execution
assign_public_ip: Whether to assign public IP to tasks
cpu: CPU units for tasks (e.g., "256", "512", "1024")
memory: Memory for tasks in MB (e.g., "512", "1024", "2048")
ephemeral_storage: Ephemeral storage in GB
region_name: AWS region name
**kwargs: Additional configuration options
Returns:
ExecutorDefinition: Configured ECS executor
"""Exception classes for ECS-specific error handling and timeout management.
class EcsEventualConsistencyTimeout(Exception):
    """Exception raised when ECS operations timeout due to eventual consistency delays.

    ECS has eventual consistency for some operations, and this exception is raised
    when operations don't complete within the expected timeframe.
    """


class EcsNoTasksFound(Exception):
    """Exception raised when no ECS tasks are found for a given run."""


class RetryableEcsException(Exception):
    """Base class for retryable ECS exceptions."""

from dagster import job, op, Definitions
from dagster_aws.ecs import EcsRunLauncher


@op
def hello_world():
    return "Hello from ECS!"


@job
def my_job():
    hello_world()


# Configure ECS run launcher
ecs_run_launcher = EcsRunLauncher(
    cluster="my-dagster-cluster",
    subnets=["subnet-12345", "subnet-67890"],
    security_group_ids=["sg-abcdef"],
    task_role_arn="arn:aws:iam::123456789012:role/DagsterEcsTaskRole",
    execution_role_arn="arn:aws:iam::123456789012:role/DagsterEcsExecutionRole",
    cpu="512",
    memory="1024",
    region_name="us-west-2"
)

defs = Definitions(
    jobs=[my_job],
    run_launcher=ecs_run_launcher
)

from dagster import job, op, Definitions
from dagster_aws.ecs import ecs_executor


@op
def extract_data():
    # Extract data operation
    return "extracted_data"


@op
def transform_data(data):
    # Transform data operation
    return f"transformed_{data}"


@op
def load_data(data):
    # Load data operation
    print(f"Loading {data}")


@job(
    executor_def=ecs_executor.configured({
        "cluster": "my-dagster-cluster",
        "subnets": ["subnet-12345", "subnet-67890"],
        "security_group_ids": ["sg-abcdef"],
        "task_definition": "arn:aws:ecs:us-west-2:123456789012:task-definition/dagster-task:1",
        "cpu": "1024",
        "memory": "2048",
        "assign_public_ip": True
    })
)
def etl_job():
    data = extract_data()
    transformed = transform_data(data)
    load_data(transformed)


defs = Definitions(jobs=[etl_job])

from dagster import job, op, Definitions
from dagster_aws.ecs import ecs_executor

# Using custom task definition with specific container configuration
custom_ecs_executor = ecs_executor.configured({
    "cluster": "production-cluster",
    "subnets": ["subnet-prod-1", "subnet-prod-2"],
    "security_group_ids": ["sg-prod-dagster"],
    "task_definition": "arn:aws:ecs:us-east-1:123456789012:task-definition/dagster-prod:5",
    "task_role_arn": "arn:aws:iam::123456789012:role/DagsterTaskRole",
    "execution_role_arn": "arn:aws:iam::123456789012:role/DagsterExecutionRole",
    "assign_public_ip": False,
    "region_name": "us-east-1"
})


@op
def cpu_intensive_operation():
    # Perform CPU-intensive computation
    result = sum(i**2 for i in range(1000000))
    return result


@job(executor_def=custom_ecs_executor)
def compute_job():
    cpu_intensive_operation()


defs = Definitions(jobs=[compute_job])

from dagster import op, job, Definitions, RetryPolicy
from dagster_aws.ecs import ecs_executor, EcsEventualConsistencyTimeout
@op(retry_policy=RetryPolicy(max_retries=3))
def resilient_operation():
try:
# Operation that might fail due to ECS eventual consistency
return "success"
except EcsEventualConsistencyTimeout as e:
# Handle ECS-specific timeout errors
raise Exception(f"ECS operation timed out after {e.timeout_seconds} seconds")
@job(executor_def=ecs_executor.configured({
"cluster": "my-cluster",
"subnets": ["subnet-123"],
"security_group_ids": ["sg-456"]
}))
def resilient_job():
resilient_operation()
defs = Definitions(jobs=[resilient_job])Install with Tessl CLI
npx tessl i tessl/pypi-dagster-aws