tessl/pypi-dagster-aws

Package for AWS-specific Dagster framework solid and resource components.


S3 Storage and File Management

Comprehensive Amazon S3 integration for Dagster, providing data storage, file management, compute log storage, and I/O operations. The S3 module includes I/O managers for different serialization formats, file management utilities, and a compute log manager.

Capabilities

S3 Resource

Core S3 resource providing configured access to Amazon S3 services with automatic credential management and retry logic.

class S3Resource(ResourceWithS3Configuration):
    """
    Resource for interacting with Amazon S3.
    
    Inherits S3-specific configuration options from ResourceWithS3Configuration.
    """
    def get_client(self):
        """
        Get configured boto3 S3 client.
        
        Returns:
            boto3.client: Configured S3 client
        """

def s3_resource(**kwargs) -> S3Resource:
    """
    Factory function to create S3Resource.
    
    Parameters:
        **kwargs: Configuration parameters for S3Resource
        
    Returns:
        S3Resource: Configured S3 resource
    """
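Inside an op or asset, `get_client()` returns a boto3 S3 client, so the surrounding code only needs an object exposing the boto3 surface. As a rough sketch (with a stub client standing in for the real boto3 client, so it runs without AWS), a helper that lists keys under a prefix might look like:

```python
# Hedged sketch: code written against get_client() only needs the boto3
# method it calls. Here list_keys() uses list_objects_v2, and a stub
# client stands in for the real boto3 S3 client for illustration.
def list_keys(s3_client, bucket: str, prefix: str = "") -> list:
    """Return object keys in `bucket` starting with `prefix`."""
    resp = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
    return [obj["Key"] for obj in resp.get("Contents", [])]

class _StubS3Client:
    """Stand-in for boto3's S3 client (illustration only)."""
    def list_objects_v2(self, Bucket, Prefix=""):
        data = {"logs/a.txt", "logs/b.txt", "data/c.csv"}
        keys = sorted(k for k in data if k.startswith(Prefix))
        return {"Contents": [{"Key": k} for k in keys]}

client = _StubS3Client()
keys = list_keys(client, "my-bucket", prefix="logs/")
```

In production code, `client` would come from `S3Resource(...).get_client()` instead of the stub.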

S3 I/O Managers

I/O managers for storing and retrieving Dagster assets and op outputs in Amazon S3, supporting different serialization formats.

class S3PickleIOManager(ConfigurableIOManager):
    """
    I/O manager that stores and retrieves objects in S3 using pickle serialization.
    """
    s3_resource: S3Resource
    s3_bucket: str
    s3_prefix: str = ""
    
    def load_input(self, context):
        """
        Load input from S3.
        
        Parameters:
            context: Input loading context
            
        Returns:
            Any: Deserialized object from S3
        """
    
    def handle_output(self, context, obj):
        """
        Store output to S3.
        
        Parameters:
            context: Output handling context
            obj: Object to serialize and store
        """

class PickledObjectS3IOManager(S3PickleIOManager):
    """
    Legacy alias for S3PickleIOManager.
    """

class ConfigurablePickledObjectS3IOManager(ConfigurableIOManager):
    """
    Deprecated alias for S3PickleIOManager, retained for backwards compatibility.
    """

def s3_pickle_io_manager(**kwargs):
    """
    Factory function for S3 pickle I/O manager.
    
    Parameters:
        s3_resource: S3Resource instance
        s3_bucket: S3 bucket name
        s3_prefix: Optional S3 key prefix
        
    Returns:
        IOManagerDefinition: Configured S3 pickle I/O manager
    """
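Conceptually, the pickle I/O manager serializes each output under a key derived from the output's path (prefix plus asset/op identifiers) and deserializes it on load. The following is an illustrative in-memory sketch of that round trip, not the library's implementation; a dict stands in for the S3 bucket and the key scheme is a simplifying assumption:

```python
# Illustrative sketch of the handle_output/load_input contract:
# pickle the object under a prefix-derived key, unpickle it on load.
# A dict stands in for the S3 bucket.
import pickle

class InMemoryPickleIOManager:
    def __init__(self, prefix=""):
        self.prefix = prefix
        self.store = {}  # stands in for the S3 bucket

    def _key(self, path_parts):
        # Join prefix and path parts into an S3-style key (assumed scheme)
        return "/".join([self.prefix.rstrip("/"), *path_parts]).lstrip("/")

    def handle_output(self, path_parts, obj):
        self.store[self._key(path_parts)] = pickle.dumps(obj)

    def load_input(self, path_parts):
        return pickle.loads(self.store[self._key(path_parts)])

mgr = InMemoryPickleIOManager(prefix="dagster-outputs")
mgr.handle_output(["my_dataset"], [1, 2, 3])
```

The real S3PickleIOManager does the same round trip against `s3_bucket` via the configured `s3_resource`.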

S3 File Management

File management utilities for handling file uploads, downloads, and operations within Dagster pipelines.

class S3FileHandle:
    """
    Handle representing a file stored in S3.
    """
    s3_bucket: str
    s3_key: str
    
    def __init__(self, s3_bucket: str, s3_key: str): ...

class S3FileManager:
    """
    Utility for managing files in S3.
    """
    def __init__(self, s3_session, s3_bucket: str, s3_base_key: str): ...
    
    def copy_handle_to_local_temp(self, file_handle: S3FileHandle) -> str:
        """
        Copy S3 file to local temporary file.
        
        Parameters:
            file_handle: S3 file handle
            
        Returns:
            str: Path to local temp file
        """
    
    def read(self, file_handle, mode="rb"):
        """
        Context manager for reading files.
        
        Parameters:
            file_handle: S3 file handle
            mode: File read mode
            
        Returns:
            Context manager for file reading
        """
    
    def read_data(self, file_handle) -> bytes:
        """
        Read file data as bytes.
        
        Parameters:
            file_handle: S3 file handle
            
        Returns:
            bytes: File content as bytes
        """
    
    def write_data(self, data: bytes, ext=None) -> S3FileHandle:
        """
        Write bytes data to S3.
        
        Parameters:
            data: Bytes data to write
            ext: Optional file extension
            
        Returns:
            S3FileHandle: Handle to written file
        """
    
    def write(self, file_obj, mode="wb", ext=None) -> S3FileHandle:
        """
        Write file object to S3.
        
        Parameters:
            file_obj: File-like object to write
            mode: Write mode
            ext: Optional file extension
            
        Returns:
            S3FileHandle: Handle to written file
        """
    
    def get_full_key(self, file_key: str) -> str:
        """
        Get full S3 key with prefix.
        
        Parameters:
            file_key: Base file key
            
        Returns:
            str: Full S3 key with prefix
        """
    
    def delete_local_temp(self): ...
    def upload_file(self, file_path: str, key: str) -> S3FileHandle: ...

class S3FileManagerResource(S3FileManager, ConfigurableResource):
    """
    Configurable S3 file manager resource.
    """

def s3_file_manager(**kwargs):
    """
    Factory function for S3 file manager.
    
    Returns:
        ResourceDefinition: Configured S3 file manager resource
    """

S3 Compute Log Manager

Manages compute logs storage in S3 for Dagster run execution logs.

class S3ComputeLogManager:
    """
    Compute log manager that stores logs in S3.
    """
    def __init__(self, bucket: str, local_dir=None, inst_data=None, 
                 prefix="dagster", use_ssl=True, verify=True, 
                 verify_cert_path=None, endpoint_url=None, 
                 skip_empty_files=False, upload_interval=None, 
                 upload_extra_args=None, show_url_only=False, region=None): ...
    
    def get_log_data(self, log_key: str) -> str:
        """
        Retrieve log data from S3.
        
        Parameters:
            log_key: Log identifier
            
        Returns:
            str: Log content
        """
    
    def delete_logs(self, log_key=None, prefix=None):
        """
        Delete logs from S3.
        
        Parameters:
            log_key: Specific log key to delete
            prefix: Prefix for bulk deletion
        """
    
    def download_url_for_type(self, log_key: str, io_type: str) -> str:
        """
        Get download URL for log type.
        
        Parameters:
            log_key: Log identifier
            io_type: Log type (stdout/stderr)
            
        Returns:
            str: Presigned download URL
        """
    
    def cloud_storage_has_logs(self, log_key: str, io_type: str, partial=False) -> bool:
        """
        Check if logs exist in cloud storage.
        
        Parameters:
            log_key: Log identifier
            io_type: Log type
            partial: Check for partial logs
            
        Returns:
            bool: True if logs exist
        """
    
    def upload_log(self, log_key: str, log_data: str): ...
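S3ComputeLogManager is typically enabled through the Dagster instance's `dagster.yaml` rather than constructed in code. A minimal configuration matching the constructor parameters above (bucket name is a placeholder):

```yaml
# dagster.yaml (Dagster instance configuration)
compute_logs:
  module: dagster_aws.s3.compute_log_manager
  class: S3ComputeLogManager
  config:
    bucket: "my-compute-log-bucket"
    prefix: "dagster"
    skip_empty_files: true
```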

S3 Operations and Utilities

Utility functions and operations for working with S3 within Dagster pipelines.

class S3Coordinate:
    """
    Coordinate for S3 file location.
    """
    bucket: str
    key: str

def file_handle_to_s3(context, file_handle) -> S3Coordinate:
    """
    Convert file handle to S3 coordinate.
    
    Parameters:
        context: Dagster execution context
        file_handle: File handle to convert
        
    Returns:
        S3Coordinate: S3 location coordinate
    """
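An S3Coordinate is just a (bucket, key) pair identifying an object. A small helper (hypothetical, not part of dagster_aws) shows how such a coordinate maps to an `s3://` URI:

```python
# S3Coordinate modeled as a named (bucket, key) pair, plus a hypothetical
# helper that renders it as an s3:// URI.
from typing import NamedTuple

class Coordinate(NamedTuple):
    bucket: str
    key: str

def to_s3_uri(coord: Coordinate) -> str:
    return f"s3://{coord.bucket}/{coord.key}"

coord = Coordinate("my-bucket", "data/input.csv")
```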

class S3Callback:
    """
    Callback utilities for S3 operations.
    """
    def __init__(self, logger, bucket: str, key: str, filename: str, size: int): ...
    def __call__(self, bytes_amount: int): ...
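The callback follows boto3's transfer-callback convention: it is invoked with the number of bytes moved in each chunk and tracks cumulative progress against the total size. A pure-Python sketch of that idea (the log message format is an assumption):

```python
# Sketch of the S3Callback pattern: boto3 transfer calls invoke the
# callback per chunk with bytes_amount; the callback accumulates a
# running total and logs percent complete.
class ProgressCallback:
    def __init__(self, logger, bucket, key, filename, size):
        self._logger = logger
        self._size = size
        self._desc = f"{filename} -> s3://{bucket}/{key}"
        self._seen = 0

    def __call__(self, bytes_amount):
        self._seen += bytes_amount
        pct = 100 * self._seen / self._size if self._size else 100
        self._logger(f"{self._desc}: {pct:.0f}%")

messages = []
cb = ProgressCallback(messages.append, "my-bucket", "data/input.csv",
                      "input.csv", size=200)
cb(50)   # first chunk
cb(150)  # final chunk
```

boto3 accepts such a callable via the `Callback` argument to `upload_file`/`download_file`.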

S3 Testing Utilities

Mock S3 resources and utilities for testing Dagster pipelines without actual S3 dependencies.

class S3FakeSession:
    """
    Fake S3 session for testing purposes.
    """
    def client(self, service_name: str): ...

def create_s3_fake_resource(**kwargs):
    """
    Create fake S3 resource for testing.
    
    Returns:
        ResourceDefinition: Mock S3 resource for testing
    """
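The idea behind the fake session is a dict-backed client exposing just enough of the boto3 surface to exercise pipeline logic without AWS. A minimal sketch (method shapes here are simplified assumptions, not S3FakeSession's exact API):

```python
# Dict-backed fake of the boto3 S3 client surface (put_object/get_object
# only), illustrating how S3-dependent logic can be tested without AWS.
class FakeS3Client:
    def __init__(self):
        self.buckets = {}

    def put_object(self, Bucket, Key, Body):
        self.buckets.setdefault(Bucket, {})[Key] = Body

    def get_object(self, Bucket, Key):
        return {"Body": self.buckets[Bucket][Key]}

client = FakeS3Client()
client.put_object(Bucket="test-bucket", Key="a.txt", Body=b"payload")
```

In tests, `create_s3_fake_resource()` plays this role so ops written against the S3 resource run unchanged.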

Usage Examples

Basic S3 I/O Manager Setup

from dagster import Definitions, asset
from dagster_aws.s3 import S3Resource, s3_pickle_io_manager

# Configure S3 resource (in practice, prefer IAM roles or environment
# variables over hardcoding credentials)
s3_resource_def = S3Resource(
    region_name="us-west-2",
    aws_access_key_id="your-access-key",
    aws_secret_access_key="your-secret-key"
)

# Configure I/O manager
s3_io_manager = s3_pickle_io_manager.configured({
    "s3_bucket": "my-dagster-bucket",
    "s3_prefix": "dagster-outputs/"
})

@asset
def my_dataset():
    return [1, 2, 3, 4, 5]

defs = Definitions(
    assets=[my_dataset],
    resources={
        "s3": s3_resource_def,
        "io_manager": s3_io_manager
    }
)

S3 File Management

from dagster import op, job
from dagster_aws.s3 import S3FileManagerResource

@op(required_resource_keys={"s3_file_manager"})
def process_file(context):
    file_manager = context.resources.s3_file_manager
    
    # Upload a file
    file_handle = file_manager.upload_file("/local/path/data.csv", "data/input.csv")
    
    # Copy to local temp for processing
    local_path = file_manager.copy_handle_to_local_temp(file_handle)
    
    # Process file...
    
    # Clean up temp file
    file_manager.delete_local_temp()

@job(
    resource_defs={
        "s3_file_manager": S3FileManagerResource(
            s3_bucket="my-bucket",
            region_name="us-west-2"
        )
    }
)
def file_processing_job():
    process_file()

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-aws
