Package for AWS-specific Dagster framework solid and resource components.

```bash
npx @tessl/cli install tessl/pypi-dagster-aws@0.27.0
```

```bash
pip install dagster-aws
```

This library provides AWS service integrations for the Dagster data orchestration framework, enabling developers to build data pipelines that interact with AWS services including S3, ECS, EMR, Redshift, Athena, RDS, CloudWatch, Secrets Manager, SSM, ECR, and more.

The package is organized by AWS service, with each service module exporting its components:
```python
# S3 functionality
from dagster_aws.s3 import S3Resource, s3_resource, S3PickleIOManager

# ECS functionality
from dagster_aws.ecs import EcsRunLauncher, ecs_executor

# EMR functionality
from dagster_aws.emr import EmrJobRunner, emr_pyspark_step_launcher

# Redshift functionality
from dagster_aws.redshift import RedshiftResource, redshift_resource

# Other services
from dagster_aws.athena import AthenaResource
from dagster_aws.cloudwatch import cloudwatch_logger
from dagster_aws.secretsmanager import SecretsManagerResource
from dagster_aws.ssm import ParameterStoreResource
from dagster_aws.pipes import PipesECSClient, PipesS3ContextInjector
```

A typical deployment wires these together in Dagster definitions:

```python
from dagster import Definitions, asset
from dagster_aws.s3 import S3Resource, S3PickleIOManager
from dagster_aws.ecs import ecs_executor

# Configure the S3 resource
s3_resource = S3Resource(region_name="us-west-2")

# Configure the S3 I/O manager for asset storage
s3_io_manager = S3PickleIOManager(
    s3_resource=s3_resource,
    s3_bucket="my-data-bucket",
)

@asset
def my_data_asset():
    return [1, 2, 3, 4, 5]

# Define the deployment with AWS resources and the ECS executor
# for distributed computation
defs = Definitions(
    assets=[my_data_asset],
    resources={
        "s3": s3_resource,
        "io_manager": s3_io_manager,
    },
    # Definitions accepts a single executor definition
    executor=ecs_executor.configured({
        "cluster": "my-dagster-cluster",
        "subnets": ["subnet-12345"],
        "security_group_ids": ["sg-67890"],
    }),
)
```

Dagster AWS follows a service-oriented architecture where each AWS service integration lives in its own module. The library provides three main types of components: resources that wrap AWS service clients, I/O managers for asset storage, and run launchers/executors for remote compute.
All resources inherit from Dagster's ConfigurableResource and can be configured with AWS credentials, regions, and service-specific settings.

Comprehensive S3 integration for data storage, file management, compute logs, and I/O operations. Includes specialized I/O managers for different data formats and use cases.
```python
class S3Resource(ResourceWithBoto3Configuration):
    def get_client(self): ...

class S3PickleIOManager(ConfigurableIOManager):
    def load_input(self, context): ...
    def handle_output(self, context, obj): ...

def s3_resource(**kwargs) -> S3Resource: ...
def s3_pickle_io_manager(**kwargs): ...
```
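For example, an asset can request the S3 resource and use the underlying boto3 client directly. A minimal sketch; the bucket and key are placeholders:

```python
from dagster import Definitions, asset
from dagster_aws.s3 import S3Resource

@asset
def raw_report(s3: S3Resource) -> None:
    # get_client() returns a configured boto3 S3 client
    s3.get_client().put_object(
        Bucket="my-data-bucket",  # placeholder bucket
        Key="reports/raw.csv",
        Body=b"id,value\n1,42\n",
    )

defs = Definitions(
    assets=[raw_report],
    resources={"s3": S3Resource(region_name="us-west-2")},
)
```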
Execute Dagster jobs and ops on Amazon ECS clusters with full support for task configuration, networking, and scaling.

```python
class EcsRunLauncher(RunLauncher):
    def launch_run(self, context): ...

def ecs_executor(**kwargs): ...

class EcsEventualConsistencyTimeout(Exception): ...
```
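The executor can also be attached to a single job rather than the whole deployment. A minimal sketch, assuming the cluster and task definition come from the deployment's run launcher configuration:

```python
from dagster import job, op
from dagster_aws.ecs import ecs_executor

@op
def crunch() -> int:
    return sum(range(10))

# Each op in this job runs in its own ECS task.
@job(executor_def=ecs_executor)
def ecs_job():
    crunch()
```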
Integrate with Amazon EMR for big data processing workflows, including PySpark step execution and cluster management.

```python
class EmrJobRunner:
    def run_job_flow(self, **kwargs): ...
    def wait_for_completion(self): ...

def emr_pyspark_step_launcher(**kwargs): ...

class EmrError(Exception): ...
class EmrClusterState(Enum): ...
class EmrStepState(Enum): ...
```
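A hedged sketch of driving the runner directly, following the surface above; the cluster configuration mirrors boto3's EMR `run_job_flow` request shape, and every value is a placeholder:

```python
from dagster_aws.emr import EmrJobRunner

runner = EmrJobRunner(region="us-west-2")

# Mirrors boto3 EMR run_job_flow kwargs; all values are placeholders.
cluster_config = {
    "Name": "dagster-emr-example",
    "ReleaseLabel": "emr-7.1.0",
    "Instances": {
        "MasterInstanceType": "m5.xlarge",
        "SlaveInstanceType": "m5.xlarge",
        "InstanceCount": 3,
    },
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}

# Per the surface above, runner.run_job_flow(...) submits the cluster;
# exact arguments may differ by version, so check your install.
```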
Connect to and execute queries against Amazon Redshift clusters with connection pooling and query optimization.

```python
class RedshiftResource(ResourceWithBoto3Configuration):
    def get_connection(self): ...
    def execute_query(self, query: str): ...

def redshift_resource(**kwargs) -> RedshiftResource: ...

class RedshiftError(Exception): ...
```
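A minimal sketch following the surface above (the table name is a placeholder; some released versions expose this under a client-style resource, so verify the class name in your install):

```python
from dagster import asset
from dagster_aws.redshift import RedshiftResource

@asset
def event_count(redshift: RedshiftResource) -> None:
    # execute_query follows the surface sketched above
    redshift.execute_query("SELECT COUNT(*) FROM events")
```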
Execute serverless SQL queries against data in S3 using Amazon Athena, with result management and query optimization.

```python
class AthenaResource(ResourceWithBoto3Configuration):
    def execute_query(self, query: str): ...
    def get_query_results(self, execution_id: str): ...

def athena_resource(**kwargs) -> AthenaResource: ...

class AthenaError(Exception): ...
class AthenaTimeout(Exception): ...
```
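A minimal sketch following the surface above (the query and table are placeholders):

```python
from dagster import asset
from dagster_aws.athena import AthenaResource

@asset
def daily_totals(athena: AthenaResource) -> None:
    # execute_query follows the surface sketched above
    athena.execute_query("SELECT day, SUM(amount) FROM sales GROUP BY day")
```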
Send Dagster logs to Amazon CloudWatch for centralized log management and monitoring.

```python
def cloudwatch_logger(**kwargs): ...
```
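A minimal sketch of attaching the logger to a job; the log group and stream are supplied through logger config at launch time:

```python
from dagster import job, op
from dagster_aws.cloudwatch import cloudwatch_logger

@op
def emit():
    pass

# Makes the CloudWatch logger available to this job under the "cloudwatch" key.
@job(logger_defs={"cloudwatch": cloudwatch_logger})
def logged_job():
    emit()
```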
Integrate with AWS Secrets Manager for secure credential and configuration management within Dagster pipelines.

```python
class SecretsManagerResource(ResourceWithBoto3Configuration):
    def get_secret(self, secret_id: str): ...

def secretsmanager_resource(**kwargs) -> SecretsManagerResource: ...

def get_secrets_from_arns(arns: list) -> dict: ...
def get_tagged_secrets(tags: dict) -> dict: ...
```
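A minimal sketch following the surface above (the secret id is a placeholder):

```python
from dagster import asset
from dagster_aws.secretsmanager import SecretsManagerResource

@asset
def call_external_api(secrets: SecretsManagerResource) -> None:
    # get_secret follows the surface sketched above
    token = secrets.get_secret("prod/api-token")  # placeholder secret id
    assert token is not None
```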
Access AWS Systems Manager Parameter Store for configuration management and secure parameter storage.

```python
class ParameterStoreResource(ResourceWithBoto3Configuration):
    def get_parameter(self, name: str): ...
    def get_parameters_by_path(self, path: str): ...

class SSMResource(ResourceWithBoto3Configuration): ...

def parameter_store_resource(**kwargs) -> ParameterStoreResource: ...
def ssm_resource(**kwargs) -> SSMResource: ...
```
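A minimal sketch following the surface above (parameter names are placeholders):

```python
from dagster import asset
from dagster_aws.ssm import ParameterStoreResource

@asset
def app_config(params: ParameterStoreResource) -> None:
    # Both calls follow the surface sketched above
    threshold = params.get_parameter("/app/threshold")
    settings = params.get_parameters_by_path("/app/settings/")
```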
Interact with Amazon ECR for container image management in containerized Dagster workflows.

```python
class ECRPublicResource(ResourceWithBoto3Configuration):
    def get_authorization_token(self): ...

def ecr_public_resource(**kwargs) -> ECRPublicResource: ...
```
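A one-line sketch following the surface above; the returned token is typically fed to `docker login`:

```python
from dagster_aws.ecr import ECRPublicResource

# get_authorization_token follows the surface sketched above
token = ECRPublicResource().get_authorization_token()
```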
Connect to and manage Amazon RDS instances for relational database operations within Dagster pipelines.

```python
class RDSResource(ResourceWithBoto3Configuration):
    def get_connection(self): ...
```
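A hedged sketch following the surface above, assuming `get_connection` returns a DB-API style connection (the cursor/execute calls are assumptions, not confirmed API):

```python
from dagster import asset
from dagster_aws.rds import RDSResource

@asset
def orders_probe(rds: RDSResource) -> None:
    conn = rds.get_connection()
    # Assumed DB-API style connection; method names may differ by version.
    cursor = conn.cursor()
    cursor.execute("SELECT 1")
```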
Orchestrate external processes running on various AWS services using Dagster's Pipes protocol for subprocess communication.

```python
class PipesECSClient(PipesClient):
    def run(self, context, **kwargs): ...

class PipesLambdaClient(PipesClient):
    def run(self, context, **kwargs): ...

class PipesS3ContextInjector(PipesContextInjector):
    def inject_context(self, context): ...

class PipesCloudWatchLogReader(PipesLogReader):
    def read_logs(self): ...
```
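A minimal sketch of invoking an external AWS Lambda function through Pipes (the function name and payload are placeholders):

```python
import boto3
from dagster import AssetExecutionContext, Definitions, asset
from dagster_aws.pipes import PipesLambdaClient

@asset
def lambda_result(context: AssetExecutionContext, pipes_lambda: PipesLambdaClient):
    # Runs the external function and relays its Pipes events back to Dagster
    return pipes_lambda.run(
        context=context,
        function_name="my-pipes-fn",   # placeholder
        event={"date": "2024-01-01"},  # placeholder payload
    ).get_materialize_result()

defs = Definitions(
    assets=[lambda_result],
    resources={"pipes_lambda": PipesLambdaClient(client=boto3.client("lambda"))},
)
```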
"""
Base resource class for AWS services using boto3 with standard configuration options.
"""
region_name: Optional[str] = None
max_attempts: int = 5
profile_name: Optional[str] = None
use_ssl: bool = True
endpoint_url: Optional[str] = None
verify: Optional[bool] = True
aws_access_key_id: Optional[str] = None
aws_secret_access_key: Optional[str] = None
aws_session_token: Optional[str] = None
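Because every service resource shares this configuration surface, credentials, regions, and endpoints are set uniformly. For example (values are placeholders):

```python
from dagster_aws.s3 import S3Resource

# Base-class options apply to any service resource.
s3 = S3Resource(
    region_name="us-east-1",
    profile_name="analytics",              # named AWS profile (placeholder)
    endpoint_url="http://localhost:9000",  # e.g. a local S3-compatible endpoint
    max_attempts=10,
)
```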
```python
class ResourceWithS3Configuration(ConfigurableResource):
    """
    Base resource class for S3-specific services with S3-focused configuration options.
    """
    use_unsigned_session: bool = False
    region_name: Optional[str] = None
    endpoint_url: Optional[str] = None
    max_attempts: int = 5
    profile_name: Optional[str] = None
    use_ssl: bool = True
    verify: Optional[bool] = None
    aws_access_key_id: Optional[str] = None
    aws_secret_access_key: Optional[str] = None
    aws_session_token: Optional[str] = None
```
```python
def construct_boto_client_retry_config(max_attempts: int) -> dict:
    """
    Construct retry configuration for boto3 clients.

    Parameters:
        max_attempts: Maximum number of retry attempts

    Returns:
        dict: Boto3 retry configuration
    """
```