CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-dagster-aws

Package of AWS-specific Dagster components: ops, resources, run launchers, and executors.

Pending
Overview
Eval results
Files

docs/ecs-orchestration.md

ECS Container Orchestration

Execute Dagster jobs and ops on Amazon ECS (Elastic Container Service) clusters, with support for task configuration, networking, scaling, and container management. This integration lets Dagster launch each run as an ECS task and execute individual ops as separate ECS tasks.

Capabilities

ECS Run Launcher

Launch Dagster runs as ECS tasks, providing scalable and isolated execution environments for data pipelines.

class EcsRunLauncher(RunLauncher, ConfigurableClass):
    """
    Run launcher that executes each Dagster run as an ECS task.

    Its options cover the ECS cluster, networking, task definition,
    resource allocation (CPU and memory), and security (IAM roles and
    secrets). The individual keys are listed under ``__init__``.
    """

    def __init__(self, inst_data=None):
        """
        Initialize the ECS run launcher with configuration data.

        NOTE(review): this stub's signature shows only ``inst_data``; how the
        options below are supplied is not visible here. Confirm against the
        dagster-aws API reference before passing them as keyword arguments.

        Configuration includes:
            cluster: ECS cluster name
            subnets: List of subnet IDs
            security_group_ids: List of security group IDs
            task_definition: ECS task definition ARN (optional)
            task_role_arn: IAM role assumed by the containers in the task
                (the ECS "task role")
            execution_role_arn: IAM role used by ECS itself to start the
                task, e.g. to pull the image and ship logs (the ECS
                "task execution role")
            cpu: CPU units for the task
            memory: Memory (MB) for the task
            secrets: AWS Secrets Manager secrets
            secrets_tag: Tag used to discover secrets
            env_vars: Environment variables
            include_sidecars: Include sidecar containers
            use_current_ecs_task_config: Inherit the current task's config
            run_ecs_tags: Tags for ECS tasks
            propagate_tags: Tag propagation configuration
            task_definition_prefix: Prefix for task definitions
            region_name: AWS region
        """

    def launch_run(self, context: LaunchRunContext) -> DagsterRun:
        """
        Launch a Dagster run as an ECS task.

        Parameters:
            context: Launch context containing the run information

        Returns:
            DagsterRun: The launched run
        """

    def can_terminate(self, run_id: str) -> bool:
        """
        Report whether the given run can be terminated.

        Parameters:
            run_id: ID of the run to check

        Returns:
            bool: Whether the run can be terminated
        """

    def terminate(self, run_id: str) -> bool:
        """
        Terminate the ECS task of a running run.

        Parameters:
            run_id: ID of the run to terminate

        Returns:
            bool: Whether termination was successful
        """

    def check_run_worker_health(self, run: DagsterRun) -> CheckRunHealthResult:
        """
        Check the health status of the ECS task running the given run.

        Parameters:
            run: Dagster run to check

        Returns:
            CheckRunHealthResult: Health check result
        """

    def get_image_for_run(self, run: DagsterRun) -> str:
        """
        Get the container image used for the run.

        Parameters:
            run: Dagster run

        Returns:
            str: Container image URI
        """

ECS Executor

Execute individual ops as separate ECS tasks, enabling distributed computation and parallel processing.

def ecs_executor(
    cluster: str,
    subnets: List[str], 
    security_group_ids: List[str],
    task_definition: Optional[str] = None,
    task_role_arn: Optional[str] = None,
    execution_role_arn: Optional[str] = None,
    assign_public_ip: bool = False,
    cpu: Optional[str] = None,
    memory: Optional[str] = None,
    ephemeral_storage: Optional[int] = None,
    region_name: Optional[str] = None,
    **kwargs
) -> ExecutorDefinition:
    """
    Executor that runs each op as its own ECS task.

    Parameters:
        cluster: ECS cluster name or ARN
        subnets: List of subnet IDs for task networking
        security_group_ids: List of security group IDs
        task_definition: ECS task definition ARN (optional)
        task_role_arn: ARN of the IAM role assumed by the containers in the
            task (the ECS "task role")
        execution_role_arn: ARN of the IAM role used by ECS itself to start
            the task, e.g. to pull the image and ship logs (the ECS
            "task execution role")
        assign_public_ip: Whether to assign a public IP to tasks
        cpu: CPU units for tasks, given as a string (e.g., "256", "512", "1024")
        memory: Memory for tasks in MB, given as a string
            (e.g., "512", "1024", "2048")
        ephemeral_storage: Ephemeral storage in GB
        region_name: AWS region name
        **kwargs: Additional configuration options

    Returns:
        ExecutorDefinition: Configured ECS executor
    """

ECS Exception Handling

Exception classes for ECS-specific error handling and timeout management.

class EcsEventualConsistencyTimeout(Exception):
    """
    Raised when an ECS operation times out because of eventual consistency.

    Some ECS operations are eventually consistent; this exception signals
    that one did not complete within the expected timeframe. The stub defines
    no attributes beyond those of ``Exception``.
    """

class EcsNoTasksFound(Exception):
    """
    Raised when no ECS tasks are found for a given run.
    """

class RetryableEcsException(Exception):
    """
    Base class for ECS exceptions that are safe to retry.

    NOTE(review): the other ECS exception stubs on this page derive from
    ``Exception`` directly, not from this class -- confirm which exceptions
    actually subclass it.
    """

Usage Examples

Basic ECS Run Launcher Configuration

from dagster import job, op, Definitions
from dagster_aws.ecs import EcsRunLauncher

@op
def hello_world():
    """Trivial op used to demonstrate a run launched on ECS."""
    return "Hello from ECS!"


@job
def my_job():
    """Single-op job to be launched through the ECS run launcher."""
    hello_world()


# The run launcher is a setting of the Dagster *instance*, not of the code
# location: `Definitions` has no `run_launcher` argument, and the documented
# `EcsRunLauncher.__init__` takes only `inst_data`, so it cannot be built
# with keyword arguments here. Configure it in the instance's dagster.yaml:
#
#   run_launcher:
#     module: dagster_aws.ecs
#     class: EcsRunLauncher
#     config:
#       task_definition:
#         task_role_arn: "arn:aws:iam::123456789012:role/DagsterEcsTaskRole"
#         execution_role_arn: "arn:aws:iam::123456789012:role/DagsterEcsExecutionRole"
#       run_resources:
#         cpu: "512"
#         memory: "1024"
#       run_task_kwargs:
#         cluster: my-dagster-cluster
#         networkConfiguration:
#           awsvpcConfiguration:
#             subnets: ["subnet-12345", "subnet-67890"]
#             securityGroups: ["sg-abcdef"]
#
# The AWS region (us-west-2 in this example) is taken from the process's
# standard AWS configuration, e.g. the AWS_DEFAULT_REGION environment variable.
# NOTE(review): the key names follow the dagster-aws EcsRunLauncher config
# schema; confirm them against the reference for the installed version.
defs = Definitions(jobs=[my_job])

ECS Executor for Distributed Ops

from dagster import job, op, Definitions
from dagster_aws.ecs import ecs_executor

@op
def extract_data():
    """Extract step: produce the raw payload."""
    return "extracted_data"


@op
def transform_data(data):
    """Transform step: prefix the extracted payload."""
    prefix = "transformed_"
    return f"{prefix}{data}"


@op
def load_data(data):
    """Load step: print the payload that would be loaded."""
    message = f"Loading {data}"
    print(message)


# Executor settings shared by every op of the ETL job.
_ETL_EXECUTOR_CONFIG = {
    "cluster": "my-dagster-cluster",
    "subnets": ["subnet-12345", "subnet-67890"],
    "security_group_ids": ["sg-abcdef"],
    "task_definition": "arn:aws:ecs:us-west-2:123456789012:task-definition/dagster-task:1",
    "cpu": "1024",
    "memory": "2048",
    "assign_public_ip": True,
}


@job(executor_def=ecs_executor.configured(_ETL_EXECUTOR_CONFIG))
def etl_job():
    """Wire the ops as extract -> transform -> load."""
    load_data(transform_data(extract_data()))


defs = Definitions(jobs=[etl_job])

Custom Task Definition with ECS

from dagster import job, op, Definitions
from dagster_aws.ecs import ecs_executor

# Executor settings pointing at a pre-registered, custom task definition.
_PROD_EXECUTOR_CONFIG = {
    "cluster": "production-cluster",
    "subnets": ["subnet-prod-1", "subnet-prod-2"],
    "security_group_ids": ["sg-prod-dagster"],
    "task_definition": "arn:aws:ecs:us-east-1:123456789012:task-definition/dagster-prod:5",
    "task_role_arn": "arn:aws:iam::123456789012:role/DagsterTaskRole",
    "execution_role_arn": "arn:aws:iam::123456789012:role/DagsterExecutionRole",
    "assign_public_ip": False,
    "region_name": "us-east-1",
}

custom_ecs_executor = ecs_executor.configured(_PROD_EXECUTOR_CONFIG)


@op
def cpu_intensive_operation():
    """Sum the squares of the integers below one million."""
    return sum(i * i for i in range(1_000_000))


@job(executor_def=custom_ecs_executor)
def compute_job():
    """Run the CPU-heavy op through the custom ECS executor."""
    cpu_intensive_operation()


defs = Definitions(jobs=[compute_job])

Error Handling with ECS

from dagster import op, job, Definitions, RetryPolicy
from dagster_aws.ecs import ecs_executor, EcsEventualConsistencyTimeout

@op(retry_policy=RetryPolicy(max_retries=3))
def resilient_operation():
    """Example op that is retried up to three times via its RetryPolicy.

    An ECS eventual-consistency timeout is re-raised as a generic exception
    with the original error chained as its cause.
    """
    try:
        # Operation that might fail due to ECS eventual consistency
        return "success"
    except EcsEventualConsistencyTimeout as e:
        # The exception defines no `timeout_seconds` attribute, so reading it
        # here would raise AttributeError and mask the real error. Report the
        # exception's own message and keep it as the cause instead.
        raise Exception(f"ECS operation timed out: {e}") from e


@job(executor_def=ecs_executor.configured({
    "cluster": "my-cluster",
    "subnets": ["subnet-123"],
    "security_group_ids": ["sg-456"]
}))
def resilient_job():
    """Job that runs the retrying op on the ECS executor."""
    resilient_operation()


defs = Definitions(jobs=[resilient_job])

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-aws

docs

athena-queries.md

cloudwatch-logging.md

ecr-integration.md

ecs-orchestration.md

emr-processing.md

index.md

parameter-store.md

pipes-orchestration.md

rds-operations.md

redshift-integration.md

s3-storage.md

secrets-management.md

tile.json