CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-dagster-aws

Package for AWS-specific Dagster framework solid and resource components.

Pending
Overview
Eval results
Files

docs/pipes-orchestration.md

Pipes External Process Orchestration

Orchestrate external processes running on AWS compute services through Dagster Pipes, the protocol Dagster uses to pass context to an external process and to read its messages and logs back. This lets the work run on AWS while Dagster keeps observability over the run and the data it produces.

Capabilities

Pipes Clients

Clients for orchestrating external processes on different AWS compute services.

class PipesECSClient(PipesClient):
    """
    Pipes client for running external processes on ECS.

    Signature-only reference stub: method bodies are omitted.

    Constructor parameters (all optional):
        client: Client object used to talk to ECS; defaults to None.
            NOTE(review): presumably a boto3 ECS client -- confirm.
        context_injector: Context injector that passes Dagster context to
            the external process; defaults to None.
        message_reader: Message reader that collects messages from the
            external process; defaults to None.
        forward_termination: Defaults to True.
            NOTE(review): presumably controls whether terminating the
            Dagster run also stops the ECS task -- confirm.
    """
    
    # Cluster, task definition and networking settings are NOT constructor
    # parameters; only the four arguments below are accepted.
    def __init__(
        self,
        client=None,
        context_injector=None,
        message_reader=None,
        forward_termination=True
    ): ...
    
    def run(
        self,
        context: OpExecutionContext,
        extras: Optional[Dict] = None,
        **kwargs
    ) -> PipesExecutionResult:
        """
        Execute external process on ECS.
        
        Parameters:
            context: Dagster execution context
            extras: Additional context for external process
            **kwargs: ECS task configuration overrides
            
        Returns:
            PipesExecutionResult: Execution results

        NOTE(review): upstream dagster-aws documents run() as keyword-only
        and takes the boto3 ``run_task`` arguments in a ``run_task_params``
        dict -- confirm this stub against the installed version.
        """

class PipesLambdaClient(PipesClient):
    """
    Pipes client for running external processes on Lambda.

    Signature-only reference stub: no constructor signature is listed for
    this class and the body of run() is omitted.
    """
    
    def run(
        self,
        function_name: str,
        event: Dict,
        context: OpExecutionContext,
        **kwargs
    ) -> PipesExecutionResult:
        """
        Execute external process on Lambda.

        ``function_name``, ``event`` and ``context`` have no defaults, so
        all three must be supplied on every call.
        
        Parameters:
            function_name: Lambda function name
            event: Event payload for Lambda function
            context: Dagster execution context
            **kwargs: Additional Lambda invocation parameters
            
        Returns:
            PipesExecutionResult: Execution results
        """

class PipesGlueClient(PipesClient):
    """
    Pipes client for running external processes on AWS Glue.

    Signature-only reference stub: no constructor or run() signature is
    listed for this class.
    """

class PipesEMRClient(PipesClient):
    """
    Pipes client for running external processes on EMR.

    Signature-only reference stub: no constructor or run() signature is
    listed for this class.
    """

class PipesEMRContainersClient(PipesClient):
    """
    Pipes client for running external processes on EMR on EKS.

    Signature-only reference stub: no constructor or run() signature is
    listed for this class.
    """

class PipesEMRServerlessClient(PipesClient):
    """
    Pipes client for running external processes on EMR Serverless.

    Signature-only reference stub: no constructor or run() signature is
    listed for this class.
    """

Context Injectors

Inject Dagster context into external processes running on AWS services.

class PipesS3ContextInjector(PipesContextInjector):
    """
    Inject context via S3 for external processes.

    Signature-only reference stub: method bodies are omitted.

    Constructor parameters (keyword-only, both required):
        bucket: Name of the S3 bucket, as a string.
        client: Client object used to access S3.
            NOTE(review): presumably a boto3 S3 client -- confirm.
    """
    
    # The bare `*` makes both arguments keyword-only; neither has a default.
    def __init__(
        self,
        *,
        bucket: str,
        client
    ): ...
    
    # Takes the Dagster execution context and is annotated to return the
    # PipesContextData for the external process.
    def inject_context(
        self,
        context: OpExecutionContext
    ) -> PipesContextData: ...

class PipesLambdaEventContextInjector(PipesContextInjector):
    """
    Inject context via Lambda event payload.

    Signature-only reference stub: no constructor or method signatures are
    listed for this class.
    """

Message Readers

Read messages and logs from external processes running on AWS services.

class PipesS3MessageReader(PipesMessageReader):
    """
    Read messages from S3 for external processes.

    Signature-only reference stub: the constructor body is omitted.

    Constructor parameters:
        bucket: Name of the S3 bucket, as a string (required).
        key_prefix: String prefix; defaults to "dagster-pipes".
            NOTE(review): presumably the S3 key prefix under which
            messages are stored -- confirm.
        **kwargs: Additional keyword arguments; their meaning is not
            specified here.
    """
    
    def __init__(
        self,
        bucket: str,
        key_prefix: str = "dagster-pipes",
        **kwargs
    ): ...

class PipesS3LogReader(PipesLogReader):
    """
    Read logs from S3 for external processes.

    This is a log reader (it subclasses PipesLogReader), not a
    PipesMessageReader. Signature-only reference stub: no constructor or
    method signatures are listed for this class.
    """

class PipesCloudWatchMessageReader(PipesMessageReader):
    """
    Read messages from CloudWatch logs.

    Signature-only reference stub: the constructor body is omitted.

    Constructor parameters:
        log_group: CloudWatch log group, as a string (required).
        log_stream_prefix: String prefix; defaults to "" (empty).
            NOTE(review): presumably restricts reading to log streams
            whose names start with this prefix -- confirm.
        **kwargs: Additional keyword arguments; their meaning is not
            specified here.
    """
    
    def __init__(
        self,
        log_group: str,
        log_stream_prefix: str = "",
        **kwargs
    ): ...

class PipesCloudWatchLogReader(PipesLogReader):
    """
    Read logs from CloudWatch.

    This is a log reader (it subclasses PipesLogReader), not a
    PipesMessageReader. Signature-only reference stub: no constructor or
    method signatures are listed for this class.
    """

class PipesLambdaLogsMessageReader(PipesMessageReader):
    """
    Read messages from Lambda execution logs.

    Signature-only reference stub: no constructor or method signatures are
    listed for this class.
    """

Usage Examples

ECS Pipes Client

from dagster import op, job, Definitions
from dagster_aws.pipes import (
    PipesECSClient,
    PipesS3ContextInjector,
    PipesCloudWatchMessageReader
)
import boto3  # AWS SDK; supplies the S3 client PipesS3ContextInjector requires

# The client constructor accepts only `client`, `context_injector`,
# `message_reader` and `forward_termination`. Per-launch task settings
# (cluster, task definition, networking, overrides) are passed to run().
ecs_pipes_client = PipesECSClient(
    # PipesS3ContextInjector takes exactly two keyword-only arguments,
    # `bucket` and `client`; it has no `key_prefix` parameter.
    context_injector=PipesS3ContextInjector(
        bucket="my-pipes-bucket",
        client=boto3.client("s3")
    ),
    # `message_reader` must be a PipesMessageReader. PipesCloudWatchLogReader
    # is a PipesLogReader and cannot be used here.
    message_reader=PipesCloudWatchMessageReader(
        log_group="/aws/ecs/my-external-task"
    )
)

@op
def run_external_processing(context, pipes_client: PipesECSClient):
    """
    Run external data processing on ECS via Pipes.

    Parameters:
        context: Dagster execution context for this op.
        pipes_client: ECS Pipes client, supplied from the job's
            ``pipes_client`` resource.

    Returns:
        The value returned by ``PipesECSClient.run``.
    """
    return pipes_client.run(
        context=context,
        extras={"input_path": "s3://data-bucket/input/"},
        # Arguments for the ECS RunTask API call, using boto3's key names.
        run_task_params={
            "cluster": "my-dagster-cluster",
            "taskDefinition": "my-external-task",
            "networkConfiguration": {
                "awsvpcConfiguration": {
                    "subnets": ["subnet-12345"],
                    "securityGroups": ["sg-67890"]
                }
            },
            "overrides": {
                "cpu": "1024",
                "memory": "2048"
            }
        }
    )

@job(
    resource_defs={
        # The key must equal the op's resource parameter name
        # (`pipes_client`), otherwise the resource cannot be resolved.
        "pipes_client": ecs_pipes_client
    }
)
def external_processing_job():
    run_external_processing()

defs = Definitions(jobs=[external_processing_job])

Lambda Pipes Client

from dagster import op, job, Definitions
from dagster_aws.pipes import (
    PipesLambdaClient,
    PipesLambdaEventContextInjector,
    PipesLambdaLogsMessageReader
)

# Context travels to the function inside the Lambda event payload, and
# messages are read back from the Lambda execution logs.
lambda_pipes_client = PipesLambdaClient(
    context_injector=PipesLambdaEventContextInjector(),
    message_reader=PipesLambdaLogsMessageReader()
)

@op
def invoke_lambda_function(context, pipes_client: PipesLambdaClient):
    """
    Invoke Lambda function via Pipes for serverless processing.

    Parameters:
        context: Dagster execution context for this op.
        pipes_client: Lambda Pipes client, supplied from the job's
            ``pipes_client`` resource.

    Returns:
        The value returned by ``PipesLambdaClient.run``.
    """
    return pipes_client.run(
        context=context,
        function_name="my-data-processor",
        # run() names the payload parameter `event`; passing `payload`
        # would leave the required `event` argument missing.
        event={
            "input_bucket": "source-data",
            "output_bucket": "processed-data"
        }
    )

@job(
    resource_defs={
        # The key must equal the op's resource parameter name
        # (`pipes_client`), otherwise the resource cannot be resolved.
        "pipes_client": lambda_pipes_client
    }
)
def serverless_processing_job():
    invoke_lambda_function()

defs = Definitions(jobs=[serverless_processing_job])

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-aws

docs

athena-queries.md

cloudwatch-logging.md

ecr-integration.md

ecs-orchestration.md

emr-processing.md

index.md

parameter-store.md

pipes-orchestration.md

rds-operations.md

redshift-integration.md

s3-storage.md

secrets-management.md

tile.json