tessl/pypi-dagster-aws

Describes pkg:pypi/dagster-aws@0.27.x

To install, run

npx @tessl/cli install tessl/pypi-dagster-aws@0.27.0


Dagster AWS

Package for AWS-specific Dagster framework solids and resource components. This library provides AWS service integrations for the Dagster data orchestration framework, enabling developers to build data pipelines that interact with AWS services including S3, ECS, EMR, Redshift, Athena, RDS, CloudWatch, Secrets Manager, SSM, ECR, and more.

Package Information

  • Package Name: dagster-aws
  • Language: Python
  • Installation: pip install dagster-aws

Core Imports

The package is organized by AWS service, with each service module exporting its components:

# S3 functionality
from dagster_aws.s3 import S3Resource, s3_resource, S3PickleIOManager

# ECS functionality
from dagster_aws.ecs import EcsRunLauncher, ecs_executor

# EMR functionality
from dagster_aws.emr import EmrJobRunner, emr_pyspark_step_launcher

# Redshift functionality
from dagster_aws.redshift import RedshiftResource, redshift_resource

# Other services
from dagster_aws.athena import AthenaResource
from dagster_aws.cloudwatch import cloudwatch_logger
from dagster_aws.secretsmanager import SecretsManagerResource
from dagster_aws.ssm import ParameterStoreResource
from dagster_aws.pipes import PipesECSClient, PipesS3ContextInjector

Basic Usage

from dagster import Definitions, asset
from dagster_aws.s3 import S3Resource, S3PickleIOManager
from dagster_aws.ecs import ecs_executor

# Configure S3 resource
s3_resource = S3Resource(region_name="us-west-2")

# Configure S3 I/O manager for asset storage
s3_io_manager = S3PickleIOManager(
    s3_resource=s3_resource,
    s3_bucket="my-data-bucket"
)

# Define a simple asset; its output is stored via the S3 I/O manager
@asset
def my_data_asset():
    return [1, 2, 3, 4, 5]

# Define deployment with AWS resources
defs = Definitions(
    assets=[my_data_asset],
    resources={
        "s3": s3_resource,
        "io_manager": s3_io_manager,
    },
    executor=ecs_executor.configured({
        "cluster": "my-dagster-cluster",
        "subnets": ["subnet-12345"],
        "security_group_ids": ["sg-67890"]
    }),
)

Architecture

Dagster AWS follows a service-oriented architecture where each AWS service integration is contained in its own module. The library provides four main types of components:

  • Resources: Configured connections to AWS services (S3Resource, RedshiftResource, etc.)
  • I/O Managers: Handle data storage and retrieval (S3PickleIOManager, etc.)
  • Executors/Launchers: Manage job execution on AWS infrastructure (EcsRunLauncher, ecs_executor)
  • Pipes Clients: Enable external process orchestration (PipesECSClient, PipesLambdaClient, etc.)

All resources inherit from Dagster's ConfigurableResource and can be configured with AWS credentials, regions, and service-specific settings.
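
For example, a resource can be configured with an explicit region and retry budget using the shared fields from ResourceWithBoto3Configuration (see Common Types below); the profile name here is a placeholder:

from dagster_aws.s3 import S3Resource

# region_name, profile_name, and max_attempts come from
# ResourceWithBoto3Configuration; "data-team" is a placeholder profile
s3 = S3Resource(
    region_name="us-east-1",
    profile_name="data-team",
    max_attempts=10,
)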

Capabilities

S3 Storage and File Management

Comprehensive S3 integration for data storage, file management, compute logs, and I/O operations. Includes specialized I/O managers for different data formats and use cases.

class S3Resource(ResourceWithBoto3Configuration):
    def get_client(self): ...

class S3PickleIOManager(ConfigurableIOManager):
    def load_input(self, context): ...
    def handle_output(self, context, obj): ...

def s3_resource(**kwargs) -> S3Resource: ...
def s3_pickle_io_manager(**kwargs): ...
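
A minimal sketch of using the resource inside an asset; the bucket and key are hypothetical, and get_client() is assumed to return a standard boto3 S3 client:

from dagster import asset
from dagster_aws.s3 import S3Resource

@asset
def upload_report(s3: S3Resource) -> None:
    # Write a small object using the underlying boto3 client
    s3.get_client().put_object(
        Bucket="my-data-bucket",      # hypothetical bucket
        Key="reports/latest.txt",
        Body=b"report contents",
    )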

S3 Storage

ECS Container Orchestration

Execute Dagster jobs and ops on Amazon ECS clusters with full support for task configuration, networking, and scaling.

class EcsRunLauncher(RunLauncher):
    def launch_run(self, context): ...

def ecs_executor(**kwargs): ...

class EcsEventualConsistencyTimeout(Exception): ...
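
As a sketch, the executor can also be attached to a single job via executor_def (the op and job names are illustrative); run-level settings such as cluster and networking are configured as in the Basic Usage example above:

from dagster import job, op
from dagster_aws.ecs import ecs_executor

@op
def crunch_numbers():
    return sum(range(100))

# Each op in this job is launched as its own ECS task
@job(executor_def=ecs_executor)
def ecs_job():
    crunch_numbers()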

ECS Orchestration

EMR Big Data Processing

Integrate with Amazon EMR for big data processing workflows, including PySpark step execution and cluster management.

class EmrJobRunner:
    def run_job_flow(self, **kwargs): ...
    def wait_for_completion(self): ...

def emr_pyspark_step_launcher(**kwargs): ...

class EmrError(Exception): ...
class EmrClusterState(Enum): ...
class EmrStepState(Enum): ...
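
A sketch based on the stubs above; run_job_flow is shown taking RunJobFlow-style keyword arguments, and all cluster settings are placeholders:

from dagster_aws.emr import EmrJobRunner

runner = EmrJobRunner(region="us-west-2")

# Submit a transient cluster and block until it finishes,
# per the run_job_flow/wait_for_completion stubs above
runner.run_job_flow(
    Name="nightly-spark",             # hypothetical cluster name
    ReleaseLabel="emr-6.15.0",
    Instances={"InstanceCount": 3},
)
runner.wait_for_completion()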

EMR Processing

Redshift Data Warehousing

Connect to and execute queries against Amazon Redshift clusters with connection pooling and query optimization.

class RedshiftResource(ResourceWithBoto3Configuration):
    def get_connection(self): ...
    def execute_query(self, query: str): ...

def redshift_resource(**kwargs) -> RedshiftResource: ...

class RedshiftError(Exception): ...
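
A minimal sketch against the interface above; the query is illustrative, and connection details (host, credentials) would be supplied via the resource's configuration:

from dagster import asset
from dagster_aws.redshift import RedshiftResource

@asset
def event_counts(redshift: RedshiftResource):
    # execute_query follows the stub above; "events" is a hypothetical table
    return redshift.execute_query("SELECT count(*) FROM events")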

Redshift Integration

Athena Query Service

Execute serverless SQL queries against data in S3 using Amazon Athena with result management and query optimization.

class AthenaResource(ResourceWithBoto3Configuration):
    def execute_query(self, query: str): ...
    def get_query_results(self, execution_id: str): ...

def athena_resource(**kwargs) -> AthenaResource: ...

class AthenaError(Exception): ...
class AthenaTimeout(Exception): ...
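
A sketch assuming execute_query returns a query execution id that can be passed to get_query_results (the stubs above do not state the return type):

from dagster_aws.athena import AthenaResource

def sample_logs(athena: AthenaResource):
    # "logs" is a hypothetical Glue/Athena table
    execution_id = athena.execute_query("SELECT * FROM logs LIMIT 10")
    return athena.get_query_results(execution_id)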

Athena Queries

CloudWatch Logging

Send Dagster logs to Amazon CloudWatch for centralized log management and monitoring.

def cloudwatch_logger(**kwargs): ...
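
A minimal sketch of attaching the logger to a job; log group and stream settings are supplied through logger config at launch time and are not shown here:

from dagster import job, op
from dagster_aws.cloudwatch import cloudwatch_logger

@op
def do_work():
    return 42

@job(logger_defs={"cloudwatch": cloudwatch_logger})
def logged_job():
    do_work()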

CloudWatch Logging

Secrets Management

Integrate with AWS Secrets Manager for secure credential and configuration management within Dagster pipelines.

class SecretsManagerResource(ResourceWithBoto3Configuration):
    def get_secret(self, secret_id: str): ...

def secretsmanager_resource(**kwargs) -> SecretsManagerResource: ...
def get_secrets_from_arns(arns: list) -> dict: ...
def get_tagged_secrets(tags: dict) -> dict: ...
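
A sketch using get_secret inside an asset; the secret id is a placeholder:

from dagster import asset
from dagster_aws.secretsmanager import SecretsManagerResource

@asset
def api_payload(secrets: SecretsManagerResource):
    # get_secret follows the stub above; the id is hypothetical
    return secrets.get_secret("prod/service/api-key")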

Secrets Management

Parameter Store Configuration

Access AWS Systems Manager Parameter Store for configuration management and secure parameter storage.

class ParameterStoreResource(ResourceWithBoto3Configuration):
    def get_parameter(self, name: str): ...
    def get_parameters_by_path(self, path: str): ...

class SSMResource(ResourceWithBoto3Configuration): ...

def parameter_store_resource(**kwargs) -> ParameterStoreResource: ...
def ssm_resource(**kwargs) -> SSMResource: ...
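
A sketch reading configuration by path prefix; the parameter path is a placeholder:

from dagster_aws.ssm import ParameterStoreResource

def load_config(params: ParameterStoreResource):
    # get_parameters_by_path follows the stub above
    return params.get_parameters_by_path("/dagster/prod/")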

Parameter Store

Container Registry Integration

Interact with Amazon ECR for container image management in containerized Dagster workflows.

class ECRPublicResource(ResourceWithBoto3Configuration):
    def get_authorization_token(self): ...

def ecr_public_resource(**kwargs) -> ECRPublicResource: ...
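
A sketch of fetching a registry token, e.g. to feed into docker login; the region is a placeholder:

from dagster_aws.ecr import ECRPublicResource

ecr = ECRPublicResource(region_name="us-east-1")
token = ecr.get_authorization_token()  # usable for registry authentication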

ECR Integration

RDS Database Operations

Connect to and manage Amazon RDS instances for relational database operations within Dagster pipelines.

class RDSResource(ResourceWithBoto3Configuration):
    def get_connection(self): ...
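
A sketch assuming get_connection returns a DB-API style connection (the stub above does not specify the connection type); the module path and table name are likewise illustrative:

from dagster_aws.rds import RDSResource

def list_users(rds: RDSResource):
    conn = rds.get_connection()
    with conn.cursor() as cur:
        cur.execute("SELECT id, name FROM users")  # hypothetical table
        return cur.fetchall()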

RDS Operations

Pipes External Process Orchestration

Orchestrate external processes running on various AWS services using Dagster's Pipes protocol for subprocess communication.

class PipesECSClient(PipesClient):
    def run(self, context, **kwargs): ...

class PipesLambdaClient(PipesClient):
    def run(self, context, **kwargs): ...

class PipesS3ContextInjector(PipesContextInjector):
    def inject_context(self, context): ...

class PipesCloudWatchLogReader(PipesLogReader):
    def read_logs(self): ...
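
A minimal sketch of launching an ECS task from an asset with PipesECSClient; the task definition and cluster names are placeholders, and get_materialize_result() is assumed to be available on the completed invocation as in Dagster's Pipes protocol:

import boto3
from dagster import AssetExecutionContext, Definitions, asset
from dagster_aws.pipes import PipesECSClient

@asset
def external_task(context: AssetExecutionContext, pipes_ecs_client: PipesECSClient):
    # run() starts the task, relays Pipes messages, and waits for completion
    return pipes_ecs_client.run(
        context=context,
        run_task_params={"taskDefinition": "my-task", "cluster": "my-cluster"},
    ).get_materialize_result()

defs = Definitions(
    assets=[external_task],
    resources={"pipes_ecs_client": PipesECSClient(client=boto3.client("ecs"))},
)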

Pipes Orchestration

Common Types

class ResourceWithBoto3Configuration(ConfigurableResource):
    """
    Base resource class for AWS services using boto3 with standard configuration options.
    """
    region_name: Optional[str] = None
    max_attempts: int = 5
    profile_name: Optional[str] = None
    use_ssl: bool = True
    endpoint_url: Optional[str] = None
    verify: Optional[bool] = True
    aws_access_key_id: Optional[str] = None
    aws_secret_access_key: Optional[str] = None
    aws_session_token: Optional[str] = None

class ResourceWithS3Configuration(ConfigurableResource):
    """
    Base resource class for S3-specific services with S3-focused configuration options.
    """
    use_unsigned_session: bool = False
    region_name: Optional[str] = None
    endpoint_url: Optional[str] = None
    max_attempts: int = 5
    profile_name: Optional[str] = None
    use_ssl: bool = True
    verify: Optional[bool] = None
    aws_access_key_id: Optional[str] = None
    aws_secret_access_key: Optional[str] = None
    aws_session_token: Optional[str] = None

def construct_boto_client_retry_config(max_attempts: int) -> dict:
    """
    Construct retry configuration for boto3 clients.
    
    Parameters:
        max_attempts: Maximum number of retry attempts
        
    Returns:
        dict: Boto3 retry configuration
    """