Package for AWS-specific Dagster framework solid and resource components.
---
Comprehensive Amazon S3 integration for Dagster providing data storage, file management, compute logs, and I/O operations. The S3 module includes I/O managers for different data formats, file management utilities, and compute log storage.
Core S3 resource providing configured access to Amazon S3 services with automatic credential management and retry logic.
class S3Resource(ResourceWithS3Configuration):
"""
Resource for interacting with Amazon S3.
Inherits S3-specific configuration options from ResourceWithS3Configuration.
"""
def get_client(self):
"""
Get configured boto3 S3 client.
Returns:
boto3.client: Configured S3 client
"""
def s3_resource(**kwargs) -> S3Resource:
"""
Factory function to create S3Resource.
Parameters:
**kwargs: Configuration parameters for S3Resource
Returns:
S3Resource: Configured S3 resource
"""

I/O managers for storing and retrieving Dagster assets and op outputs in Amazon S3, supporting different serialization formats.
class S3PickleIOManager(ConfigurableIOManager):
"""
I/O manager that stores and retrieves objects in S3 using pickle serialization.
"""
s3_resource: S3Resource
s3_bucket: str
s3_prefix: str = ""
def load_input(self, context):
"""
Load input from S3.
Parameters:
context: Input loading context
Returns:
Any: Deserialized object from S3
"""
def handle_output(self, context, obj):
"""
Store output to S3.
Parameters:
context: Output handling context
obj: Object to serialize and store
"""
class PickledObjectS3IOManager(S3PickleIOManager):
"""
Legacy alias for S3PickleIOManager.
"""
class ConfigurablePickledObjectS3IOManager(ConfigurableIOManager):
"""
Configurable version of S3 pickle I/O manager.
"""
def s3_pickle_io_manager(**kwargs):
"""
Factory function for S3 pickle I/O manager.
Parameters:
s3_resource: S3Resource instance
s3_bucket: S3 bucket name
s3_prefix: Optional S3 key prefix
Returns:
IOManagerDefinition: Configured S3 pickle I/O manager
"""

File management utilities for handling file uploads, downloads, and operations within Dagster pipelines.
class S3FileHandle:
"""
Handle representing a file stored in S3.
"""
s3_bucket: str
s3_key: str
def __init__(self, s3_bucket: str, s3_key: str): ...
class S3FileManager:
"""
Utility for managing files in S3.
"""
def __init__(self, s3_session, s3_bucket: str, s3_base_key: str): ...
def copy_handle_to_local_temp(self, file_handle: S3FileHandle) -> str:
"""
Copy S3 file to local temporary file.
Parameters:
file_handle: S3 file handle
Returns:
str: Path to local temp file
"""
def read(self, file_handle, mode="rb"):
"""
Context manager for reading files.
Parameters:
file_handle: S3 file handle
mode: File read mode
Returns:
Context manager for file reading
"""
def read_data(self, file_handle) -> bytes:
"""
Read file data as bytes.
Parameters:
file_handle: S3 file handle
Returns:
bytes: File content as bytes
"""
def write_data(self, data: bytes, ext=None) -> S3FileHandle:
"""
Write bytes data to S3.
Parameters:
data: Bytes data to write
ext: Optional file extension
Returns:
S3FileHandle: Handle to written file
"""
def write(self, file_obj, mode="wb", ext=None) -> S3FileHandle:
"""
Write file object to S3.
Parameters:
file_obj: File-like object to write
mode: Write mode
ext: Optional file extension
Returns:
S3FileHandle: Handle to written file
"""
def get_full_key(self, file_key: str) -> str:
"""
Get full S3 key with prefix.
Parameters:
file_key: Base file key
Returns:
str: Full S3 key with prefix
"""
def delete_local_temp(self): ...
def upload_file(self, file_path: str, key: str) -> S3FileHandle: ...
class S3FileManagerResource(S3FileManager, ConfigurableResource):
"""
Configurable S3 file manager resource.
"""
def s3_file_manager(**kwargs):
"""
Factory function for S3 file manager.
Returns:
ResourceDefinition: Configured S3 file manager resource
"""

Manages compute logs storage in S3 for Dagster run execution logs.
class S3ComputeLogManager:
"""
Compute log manager that stores logs in S3.
"""
def __init__(self, bucket: str, local_dir=None, inst_data=None,
prefix="dagster", use_ssl=True, verify=True,
verify_cert_path=None, endpoint_url=None,
skip_empty_files=False, upload_interval=None,
upload_extra_args=None, show_url_only=False, region=None): ...
def get_log_data(self, log_key: str) -> str:
"""
Retrieve log data from S3.
Parameters:
log_key: Log identifier
Returns:
str: Log content
"""
def delete_logs(self, log_key=None, prefix=None):
"""
Delete logs from S3.
Parameters:
log_key: Specific log key to delete
prefix: Prefix for bulk deletion
"""
def download_url_for_type(self, log_key: str, io_type: str) -> str:
"""
Get download URL for log type.
Parameters:
log_key: Log identifier
io_type: Log type (stdout/stderr)
Returns:
str: Presigned download URL
"""
def cloud_storage_has_logs(self, log_key: str, io_type: str, partial=False) -> bool:
"""
Check if logs exist in cloud storage.
Parameters:
log_key: Log identifier
io_type: Log type
partial: Check for partial logs
Returns:
bool: True if logs exist
"""
def upload_log(self, log_key: str, log_data: str): ...

Utility functions and operations for working with S3 within Dagster pipelines.
class S3Coordinate:
"""
Coordinate for S3 file location.
"""
bucket: str
key: str
def file_handle_to_s3(context, file_handle) -> S3Coordinate:
"""
Convert file handle to S3 coordinate.
Parameters:
context: Dagster execution context
file_handle: File handle to convert
Returns:
S3Coordinate: S3 location coordinate
"""
class S3Callback:
"""
Callback utilities for S3 operations.
"""
def __init__(self, logger, bucket: str, key: str, filename: str, size: int): ...
def __call__(self, bytes_amount: int): ...

Mock S3 resources and utilities for testing Dagster pipelines without actual S3 dependencies.
class S3FakeSession:
"""
Fake S3 session for testing purposes.
"""
def client(self, service_name: str): ...
def create_s3_fake_resource(**kwargs):
"""
Create fake S3 resource for testing.
Returns:
ResourceDefinition: Mock S3 resource for testing
"""

Example: configuring an S3 resource and pickle I/O manager for assets.

from dagster import Definitions, asset
from dagster_aws.s3 import S3Resource, s3_pickle_io_manager
# Configure S3 resource
s3_resource_def = S3Resource(
region_name="us-west-2",
aws_access_key_id="your-access-key",
aws_secret_access_key="your-secret-key"
)
# Configure I/O manager
s3_io_manager = s3_pickle_io_manager.configured({
"s3_bucket": "my-dagster-bucket",
"s3_prefix": "dagster-outputs/"
})
@asset
def my_dataset():
return [1, 2, 3, 4, 5]
defs = Definitions(
assets=[my_dataset],
resources={
"s3": s3_resource_def,
"io_manager": s3_io_manager
}
)

Example: using the S3 file manager resource within a job.

from dagster import op, job
from dagster_aws.s3 import S3FileManagerResource
@op(required_resource_keys={"s3_file_manager"})
def process_file(context):
file_manager = context.resources.s3_file_manager
# Upload a file
file_handle = file_manager.upload_file("/local/path/data.csv", "data/input.csv")
# Copy to local temp for processing
local_path = file_manager.copy_handle_to_local_temp(file_handle)
# Process file...
# Clean up temp file
file_manager.delete_local_temp()
@job(
resource_defs={
"s3_file_manager": S3FileManagerResource(
s3_bucket="my-bucket",
region_name="us-west-2"
)
}
)
def file_processing_job():
    process_file()

Install with Tessl CLI
npx tessl i tessl/pypi-dagster-aws