Package of AWS-specific Dagster framework op (solid) and resource components.

Orchestrate external processes running on various AWS services using Dagster's
Pipes protocol for subprocess communication. This enables seamless integration
with AWS compute services while maintaining observability and data flow.

Clients for orchestrating external processes on different AWS compute services:
class PipesECSClient(PipesClient):
    """Pipes client for running external processes as ECS tasks.

    NOTE(review): interface stub — implementation bodies are elided.
    """

    def __init__(
        self,
        client=None,              # boto3 ECS client; presumably created lazily when None — TODO confirm
        context_injector=None,    # PipesContextInjector; default behavior not visible here
        message_reader=None,      # PipesMessageReader; default behavior not visible here
        forward_termination=True  # presumably cancels the ECS task when the Dagster run terminates — confirm
    ): ...

    def run(
        self,
        context: OpExecutionContext,
        extras: Optional[Dict] = None,
        **kwargs
    ) -> PipesExecutionResult:
        """Execute an external process on ECS.

        Parameters:
            context: Dagster execution context.
            extras: Additional context passed to the external process.
            **kwargs: ECS task configuration overrides.

        Returns:
            PipesExecutionResult: Execution results reported by the external process.
        """
class PipesLambdaClient(PipesClient):
    """Pipes client for running external processes on AWS Lambda.

    NOTE(review): interface stub — implementation bodies are elided.
    """

    def run(
        self,
        function_name: str,
        event: Dict,
        context: OpExecutionContext,
        **kwargs
    ) -> PipesExecutionResult:
        """Execute an external process on Lambda.

        Parameters:
            function_name: Name of the Lambda function to invoke.
            event: Event payload delivered to the Lambda function.
            context: Dagster execution context.
            **kwargs: Additional Lambda invocation parameters.

        Returns:
            PipesExecutionResult: Execution results reported by the external process.
        """
class PipesGlueClient(PipesClient):
    """Pipes client for running external processes on AWS Glue (interface stub)."""
class PipesEMRClient(PipesClient):
    """Pipes client for running external processes on EMR (interface stub)."""
class PipesEMRContainersClient(PipesClient):
    """Pipes client for running external processes on EMR on EKS (interface stub)."""
class PipesEMRServerlessClient(PipesClient):
    """Pipes client for running external processes on EMR Serverless (interface stub)."""


# Inject Dagster context into external processes running on AWS services:
class PipesS3ContextInjector(PipesContextInjector):
    """Inject Dagster context into external processes via an S3 object.

    NOTE(review): interface stub — implementation bodies are elided.
    """

    def __init__(
        self,
        *,
        bucket: str,  # S3 bucket where the serialized context is written
        client        # boto3 S3 client — required (no default), unlike the Pipes clients above
    ): ...

    def inject_context(
        self,
        context: OpExecutionContext
    ) -> PipesContextData: ...
class PipesLambdaEventContextInjector(PipesContextInjector):
    """Inject Dagster context via the Lambda event payload (interface stub)."""


# Read messages and logs from external processes running on AWS services:
class PipesS3MessageReader(PipesMessageReader):
    """Read Pipes messages from S3 for external processes.

    NOTE(review): interface stub — implementation bodies are elided.
    """

    def __init__(
        self,
        bucket: str,                        # S3 bucket polled for message objects
        key_prefix: str = "dagster-pipes",  # key prefix under which messages are written
        **kwargs
    ): ...
class PipesS3LogReader(PipesLogReader):
    """Read logs from S3 for external processes (interface stub)."""
class PipesCloudWatchMessageReader(PipesMessageReader):
    """Read Pipes messages from CloudWatch logs.

    NOTE(review): interface stub — implementation bodies are elided.
    """

    def __init__(
        self,
        log_group: str,                # CloudWatch log group to read messages from
        log_stream_prefix: str = "",   # presumably filters streams by prefix; "" matches all — confirm
        **kwargs
    ): ...
class PipesCloudWatchLogReader(PipesLogReader):
    """Read logs from CloudWatch (interface stub; log reader, not a message reader)."""
class PipesLambdaLogsMessageReader(PipesMessageReader):
    """Read Pipes messages from Lambda execution logs (interface stub)."""


# --- Example: orchestrating an external process on ECS via Pipes ---
from dagster import op, job, Definitions
from dagster_aws.pipes import (
    PipesECSClient,
    PipesS3ContextInjector,
    PipesCloudWatchMessageReader,  # was PipesCloudWatchLogReader — that is a log reader, not a message reader
)
import boto3

# Per the signatures above, the PipesECSClient constructor takes only Pipes
# plumbing (context injector / message reader); ECS task configuration is
# supplied per-run through ``run(**kwargs)``.
ecs_pipes_client = PipesECSClient(
    context_injector=PipesS3ContextInjector(
        bucket="my-pipes-bucket",
        client=boto3.client("s3"),  # PipesS3ContextInjector requires an S3 client (keyword-only)
    ),
    message_reader=PipesCloudWatchMessageReader(
        log_group="/aws/ecs/my-external-task",
    ),
)


@op
def run_external_processing(context, pipes_ecs_client: PipesECSClient):
    """Run external data processing on ECS via Pipes.

    The resource parameter is named ``pipes_ecs_client`` to match the
    resource key declared on the job, so Dagster can inject it.
    """
    return pipes_ecs_client.run(
        context=context,
        extras={"input_path": "s3://data-bucket/input/"},
        # ECS task configuration overrides are passed through ``run``'s **kwargs.
        cluster="my-dagster-cluster",
        task_definition="my-external-task",
        subnets=["subnet-12345"],
        security_group_ids=["sg-67890"],
        task_overrides={
            "cpu": "1024",
            "memory": "2048",
        },
    )


@job(
    resource_defs={
        "pipes_ecs_client": ecs_pipes_client,
    }
)
def external_processing_job():
    run_external_processing()


defs = Definitions(jobs=[external_processing_job])

# --- Example: orchestrating an external process on AWS Lambda via Pipes ---
from dagster import op, job, Definitions
from dagster_aws.pipes import (
    PipesLambdaClient,
    PipesLambdaEventContextInjector,
    PipesLambdaLogsMessageReader,
)

lambda_pipes_client = PipesLambdaClient(
    context_injector=PipesLambdaEventContextInjector(),
    message_reader=PipesLambdaLogsMessageReader(),
)


@op
def invoke_lambda_function(context, pipes_lambda_client: PipesLambdaClient):
    """Invoke a Lambda function via Pipes for serverless processing.

    The resource parameter is named ``pipes_lambda_client`` to match the
    resource key declared on the job, so Dagster can inject it.
    """
    return pipes_lambda_client.run(
        context=context,
        function_name="my-data-processor",
        # ``PipesLambdaClient.run`` takes the payload as ``event``, not ``payload``.
        event={
            "input_bucket": "source-data",
            "output_bucket": "processed-data",
        },
    )


@job(
    resource_defs={
        "pipes_lambda_client": lambda_pipes_client,
    }
)
def serverless_processing_job():
    invoke_lambda_function()


defs = Definitions(jobs=[serverless_processing_job])

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-dagster-aws