S3 Utilities

Helper utilities for uploading and downloading files to/from Amazon S3.

Capabilities

S3Uploader

Static methods for uploading files and directories to S3.

class S3Uploader:
    """
    Static methods for uploading data to Amazon S3.

    Static Methods:
        upload(local_path, desired_s3_uri, kms_key=None, sagemaker_session=None, callback=None) -> str
            Upload file or directory to S3.
            
            Parameters:
                local_path: str - Path (absolute or relative) of local file or directory (required)
                desired_s3_uri: str - Desired S3 location (required)
                    - For file: prefix to which filename will be added
                    - For directory: prefix for all files
                kms_key: Optional[str] - KMS key ARN for encryption
                sagemaker_session: Optional[Session] - SageMaker session
                callback: Optional[callable] - Progress callback function(bytes_transferred)
            
            Returns:
                str: S3 URI of uploaded file(s)
            
            Raises:
                FileNotFoundError: If local_path doesn't exist
                ClientError: S3 upload errors
            
            Notes:
                - Directories uploaded recursively
                - Preserves directory structure
                - Skips hidden files (.* files)
                - Large files use multipart upload
        
        upload_string_as_file_body(body, desired_s3_uri, kms_key=None, sagemaker_session=None) -> str
            Upload string content as file to S3.
            
            Parameters:
                body: str - String content to upload (required)
                desired_s3_uri: str - Full S3 URI for the file (required)
                    - Includes filename: "s3://bucket/path/file.txt"
                kms_key: Optional[str] - KMS key for encryption
                sagemaker_session: Optional[Session] - Session
            
            Returns:
                str: S3 URI of uploaded file
            
            Raises:
                ValueError: If desired_s3_uri malformed
                ClientError: S3 upload errors
        
        upload_bytes(b, s3_uri, kms_key=None, sagemaker_session=None) -> str
            Upload bytes to S3.
            
            Parameters:
                b: Union[bytes, io.BytesIO] - Bytes or BytesIO object (required)
                s3_uri: str - S3 URI to upload to (required)
                kms_key: Optional[str] - KMS key for encryption
                sagemaker_session: Optional[Session] - Session
            
            Returns:
                str: S3 URI of uploaded file
            
            Raises:
                ValueError: If s3_uri malformed
                ClientError: S3 upload errors

    Notes:
        - All methods use boto3 S3 client under the hood
        - Large uploads automatically use multipart
        - KMS encryption applied server-side
        - Requires s3:PutObject permission
    """

Usage:

from sagemaker.core.s3 import S3Uploader

# Upload single file
s3_uri = S3Uploader.upload(
    local_path="./model.tar.gz",
    desired_s3_uri="s3://my-bucket/models/"
)
print(f"Uploaded to: {s3_uri}")
# Result: "s3://my-bucket/models/model.tar.gz"

# Upload directory recursively
s3_uri = S3Uploader.upload(
    local_path="./training_data/",
    desired_s3_uri="s3://my-bucket/data/"
)
print(f"Uploaded directory to: {s3_uri}")
# Uploads all files, preserves structure

# Upload with encryption
s3_uri = S3Uploader.upload(
    local_path="./sensitive_data.csv",
    desired_s3_uri="s3://my-bucket/secure/",
    kms_key="arn:aws:kms:us-west-2:123456789012:key/abc-123"
)

# Upload with progress callback
def progress_callback(bytes_transferred):
    print(f"Transferred: {bytes_transferred / 1024 / 1024:.1f} MB", end='\r')

s3_uri = S3Uploader.upload(
    local_path="./large_file.bin",
    desired_s3_uri="s3://bucket/uploads/",
    callback=progress_callback
)

Upload String Content:

import json

# Upload JSON configuration
config = {
    "epochs": 10,
    "batch_size": 32,
    "learning_rate": 0.001
}

s3_uri = S3Uploader.upload_string_as_file_body(
    body=json.dumps(config, indent=2),
    desired_s3_uri="s3://my-bucket/config/training.json"
)

# Upload text file
s3_uri = S3Uploader.upload_string_as_file_body(
    body="Training started at 2024-01-15 10:00:00\nStatus: In Progress",
    desired_s3_uri="s3://my-bucket/logs/status.txt"
)

Upload Binary Data:

import io

# Upload bytes
data = b"Binary data content"
s3_uri = S3Uploader.upload_bytes(
    b=data,
    s3_uri="s3://my-bucket/binary/data.bin"
)

# Upload BytesIO stream
bytes_io = io.BytesIO(b"Stream content")
s3_uri = S3Uploader.upload_bytes(
    b=bytes_io,
    s3_uri="s3://my-bucket/streams/data.bin"
)

# Upload image from memory
from PIL import Image

image = Image.open("photo.jpg")
buffer = io.BytesIO()
image.save(buffer, format="JPEG")

s3_uri = S3Uploader.upload_bytes(
    b=buffer.getvalue(),
    s3_uri="s3://bucket/images/photo.jpg"
)

S3Downloader

Static methods for downloading files and directories from S3.

class S3Downloader:
    """
    Static methods for downloading data from Amazon S3.

    Static Methods:
        download(s3_uri, local_path, kms_key=None, sagemaker_session=None) -> List[str]
            Download files from S3 to local machine.
            
            Parameters:
                s3_uri: str - S3 URI to download from (required)
                    - File: "s3://bucket/file.txt"
                    - Directory: "s3://bucket/prefix/"
                local_path: str - Local path to download to (required)
                kms_key: Optional[str] - KMS key for decryption
                sagemaker_session: Optional[Session] - Session
            
            Returns:
                List[str]: List of local paths of downloaded files
            
            Raises:
                ValueError: If s3_uri malformed
                ClientError: S3 download errors (NoSuchKey, AccessDenied)
            
            Notes:
                - Creates local directories as needed
                - Preserves S3 directory structure
                - Overwrites existing local files
        
        read_file(s3_uri, sagemaker_session=None) -> str
            Read S3 file content as string.
            
            Parameters:
                s3_uri: str - S3 URI of file to read (required)
                sagemaker_session: Optional[Session] - Session
            
            Returns:
                str: File content as UTF-8 string
            
            Raises:
                ClientError: If file doesn't exist or not accessible
                UnicodeDecodeError: If file not valid UTF-8
        
        read_bytes(s3_uri, sagemaker_session=None) -> bytes
            Read S3 object as bytes.
            
            Parameters:
                s3_uri: str - S3 URI of object to read (required)
                sagemaker_session: Optional[Session] - Session
            
            Returns:
                bytes: Object content as bytes
            
            Raises:
                ClientError: If object doesn't exist or not accessible
        
        list(s3_uri, sagemaker_session=None) -> List[str]
            List contents of S3 location.
            
            Parameters:
                s3_uri: str - S3 base URI to list objects in (required)
                sagemaker_session: Optional[Session] - Session
            
            Returns:
                List[str]: List of S3 URIs (full paths)
            
            Raises:
                ValueError: If s3_uri malformed
                ClientError: S3 list errors

    Notes:
        - All methods handle S3 pagination automatically
        - Large files streamed efficiently
        - Requires s3:GetObject permission
    """

Usage:

from botocore.exceptions import ClientError
from sagemaker.core.s3 import S3Downloader

# Download single file
try:
    local_files = S3Downloader.download(
        s3_uri="s3://my-bucket/models/model.tar.gz",
        local_path="./downloaded_models/"
    )
    print(f"Downloaded: {local_files}")
    # Result: ["./downloaded_models/model.tar.gz"]
    
except ClientError as e:
    error_code = e.response['Error']['Code']
    if error_code == 'NoSuchKey':
        print("File not found in S3")
    elif error_code == 'AccessDenied':
        print("Access denied - check IAM permissions")
    else:
        raise

# Download directory
local_files = S3Downloader.download(
    s3_uri="s3://my-bucket/data/",
    local_path="./local_data/"
)
print(f"Downloaded {len(local_files)} files")

# Download with decryption
local_files = S3Downloader.download(
    s3_uri="s3://my-bucket/secure/data.enc",
    local_path="./decrypted/",
    kms_key="arn:aws:kms:us-west-2:123456789012:key/abc-123"
)

Read File Directly:

import json

# Read text file without downloading
content = S3Downloader.read_file(
    s3_uri="s3://my-bucket/config/settings.txt"
)
print(f"Content: {content}")

# Read JSON file
json_content = S3Downloader.read_file(
    s3_uri="s3://my-bucket/config/training.json"
)
config = json.loads(json_content)
print(f"Config: {config}")

# Read CSV as string
csv_content = S3Downloader.read_file(
    s3_uri="s3://bucket/data.csv"
)
lines = csv_content.splitlines()
print(f"CSV has {len(lines)} lines")

Read Binary Data:

# Read binary file
binary_data = S3Downloader.read_bytes(
    s3_uri="s3://my-bucket/models/weights.bin"
)

# Write bytes to a local file
with open("local_weights.bin", "wb") as f:
    f.write(binary_data)

# Read image
image_bytes = S3Downloader.read_bytes(
    s3_uri="s3://bucket/images/photo.jpg"
)

from PIL import Image
import io

image = Image.open(io.BytesIO(image_bytes))
image.show()

List S3 Contents:

# List files in bucket prefix
files = S3Downloader.list(
    s3_uri="s3://my-bucket/models/"
)

print(f"Found {len(files)} files:")
for file_uri in files[:10]:
    print(f"  - {file_uri}")

# List with pattern matching (client-side)
all_files = S3Downloader.list(s3_uri="s3://bucket/data/")
csv_files = [f for f in all_files if f.endswith('.csv')]
json_files = [f for f in all_files if f.endswith('.json')]

print(f"CSV files: {len(csv_files)}")
print(f"JSON files: {len(json_files)}")

Utility Functions

parse_s3_url

def parse_s3_url(url: str) -> Tuple[str, str]:
    """
    Parse S3 URL into bucket and key.

    Parameters:
        url: str - S3 URL (required)
            - Must have s3:// scheme
            - Format: "s3://bucket-name/key/path"

    Returns:
        Tuple[str, str]: (bucket_name, key)
            - bucket_name: S3 bucket name
            - key: S3 object key

    Raises:
        ValueError: If URL scheme is not 's3'

    Example:
        bucket, key = parse_s3_url("s3://my-bucket/path/to/file.txt")
        # bucket = "my-bucket"
        # key = "path/to/file.txt"
    """

Usage:

from sagemaker.core.s3 import parse_s3_url

# Parse S3 URL
try:
    bucket, key = parse_s3_url("s3://my-bucket/models/model.tar.gz")
    print(f"Bucket: {bucket}")  # "my-bucket"
    print(f"Key: {key}")  # "models/model.tar.gz"
    
    # Use with boto3
    import boto3
    s3 = boto3.client('s3')
    
    # Download directly
    s3.download_file(bucket, key, 'local-model.tar.gz')
    
except ValueError as e:
    print(f"Invalid S3 URL: {e}")

# Parse fails for non-S3 URLs
try:
    bucket, key = parse_s3_url("https://example.com/file.txt")
except ValueError as e:
    print(f"Error: {e}")  # "URL scheme must be 's3'"

is_s3_url

def is_s3_url(url: str) -> bool:
    """
    Check if URL is an S3 URL.

    Parameters:
        url: str - URL to check (required)

    Returns:
        bool: True if URL has s3:// scheme, False otherwise

    Example:
        is_s3_url("s3://my-bucket/file.txt")  # True
        is_s3_url("https://example.com")  # False
        is_s3_url("/local/path")  # False
    """

Usage:

from sagemaker.core.s3 import is_s3_url, S3Downloader

# Conditional data loading
def load_data(data_path):
    """Load data from S3 or local path."""
    if is_s3_url(data_path):
        # Download from S3
        data = S3Downloader.read_file(data_path)
    else:
        # Read from local file
        with open(data_path, 'r') as f:
            data = f.read()
    
    return data

# Works with both S3 and local paths
s3_data = load_data("s3://bucket/data.csv")
local_data = load_data("./local_data.csv")

s3_path_join

def s3_path_join(*args, with_end_slash: bool = False) -> str:
    """
    Join S3 path components with forward slash.

    Similar to os.path.join() but for S3 paths. Handles:
    - Preserves "s3://" prefix if present
    - Removes duplicate slashes (except in s3://)
    - Removes leading/trailing slashes (except s3:// and optional end slash)
    - Skips empty or None arguments

    Parameters:
        *args: str - Strings to join
        with_end_slash: bool - Append trailing slash (default: False)

    Returns:
        str: Joined S3 path

    Example:
        s3_path_join("s3://", "bucket", "path", "file.txt")
        # Result: "s3://bucket/path/file.txt"
        
        s3_path_join("s3://", "bucket", "folder", with_end_slash=True)
        # Result: "s3://bucket/folder/"
    
    Notes:
        - Handles malformed inputs gracefully
        - Removes duplicate slashes automatically
        - None values skipped
        - Empty strings skipped
    """

Usage:

from sagemaker.core.s3 import s3_path_join

# Basic joining
path = s3_path_join("s3://", "my-bucket", "path", "to", "file.txt")
print(path)  # "s3://my-bucket/path/to/file.txt"

# Handles extra slashes
path = s3_path_join("s3://", "//my-bucket/", "/path//", "file.txt")
print(path)  # "s3://my-bucket/path/file.txt"

# With trailing slash for directories
path = s3_path_join("s3://my-bucket", "folder", with_end_slash=True)
print(path)  # "s3://my-bucket/folder/"

# Skips empty values
path = s3_path_join("s3://", "bucket", None, "", "file.txt")
print(path)  # "s3://bucket/file.txt"

# Build dynamic paths
bucket = "my-ml-bucket"
prefix = "experiments"
experiment_id = "exp-001"
model_name = "resnet50"
version = "v1"

model_path = s3_path_join(
    "s3://", bucket, prefix, experiment_id, "models", model_name, version
)
print(model_path)
# "s3://my-ml-bucket/experiments/exp-001/models/resnet50/v1"

# Construct paths safely
def build_model_path(bucket, experiment, model, version):
    return s3_path_join(
        "s3://",
        bucket,
        "models",
        experiment,
        model,
        f"v{version}",
        with_end_slash=True
    )

path = build_model_path("ml-bucket", "churn-prediction", "xgboost", 3)
# "s3://ml-bucket/models/churn-prediction/xgboost/v3/"

determine_bucket_and_prefix

def determine_bucket_and_prefix(
    bucket: Optional[str] = None,
    key_prefix: Optional[str] = None,
    sagemaker_session: Optional[Session] = None
) -> Tuple[str, str]:
    """
    Determine correct S3 bucket and prefix to use.

    If bucket is not provided, uses session's default bucket
    and appends default prefix.

    Parameters:
        bucket: Optional[str] - S3 bucket name
            - If None: uses default bucket (sagemaker-{region}-{account})
        key_prefix: Optional[str] - S3 key prefix
            - Appended to default prefix if using default bucket
        sagemaker_session: Optional[Session] - SageMaker session

    Returns:
        Tuple[str, str]: (bucket, key_prefix)
            - bucket: Final bucket name
            - key_prefix: Final key prefix

    Example:
        # With default bucket
        bucket, prefix = determine_bucket_and_prefix(key_prefix="my-models")
        # bucket = "sagemaker-us-west-2-123456789012"
        # prefix = "sagemaker/my-models"
        
        # With custom bucket
        bucket, prefix = determine_bucket_and_prefix(
            bucket="my-custom-bucket",
            key_prefix="models"
        )
        # bucket = "my-custom-bucket"
        # prefix = "models"
    
    Notes:
        - Default bucket automatically created if doesn't exist
        - Default prefix: "sagemaker/"
        - Useful for consistent path management
    """

Usage:

from sagemaker.core.s3 import determine_bucket_and_prefix, s3_path_join
from sagemaker.core.helper.session_helper import Session

session = Session()

# Use default bucket
bucket, prefix = determine_bucket_and_prefix(
    key_prefix="experiments/exp-001",
    sagemaker_session=session
)

print(f"Bucket: {bucket}")  # "sagemaker-us-west-2-123456789012"
print(f"Prefix: {prefix}")  # "sagemaker/experiments/exp-001"

# Build full path
full_path = s3_path_join("s3://", bucket, prefix, "model.tar.gz")
print(f"Full path: {full_path}")

# Use custom bucket
bucket, prefix = determine_bucket_and_prefix(
    bucket="my-ml-bucket",
    key_prefix="production/models",
    sagemaker_session=session
)

print(f"Custom bucket: {bucket}")  # "my-ml-bucket"
print(f"Custom prefix: {prefix}")  # "production/models"

Complete Example

import json

from botocore.exceptions import ClientError
from sagemaker.core.s3 import (
    S3Uploader, S3Downloader,
    s3_path_join, is_s3_url, parse_s3_url
)

# Build S3 paths
bucket = "my-ml-bucket"
experiment_id = "exp-2024-01-15"
model_name = "customer-churn-classifier"

# Training data path
train_path = s3_path_join("s3://", bucket, "data", experiment_id, "train.csv")
val_path = s3_path_join("s3://", bucket, "data", experiment_id, "val.csv")

# Upload training data
train_uri = S3Uploader.upload(
    local_path="./data/train.csv",
    desired_s3_uri=s3_path_join("s3://", bucket, "data", experiment_id, with_end_slash=True)
)

val_uri = S3Uploader.upload(
    local_path="./data/val.csv",
    desired_s3_uri=s3_path_join("s3://", bucket, "data", experiment_id, with_end_slash=True)
)

print(f"Data uploaded:")
print(f"  Train: {train_uri}")
print(f"  Val: {val_uri}")

# Train model (pseudo-code)
train_model(train_uri, val_uri)

# Model saved to S3 by SageMaker
model_uri = s3_path_join(
    "s3://", bucket, "models", experiment_id, model_name, "model.tar.gz"
)

# List all models in experiment
models = S3Downloader.list(
    s3_uri=s3_path_join("s3://", bucket, "models", experiment_id, with_end_slash=True)
)

print(f"\nModels in experiment:")
for model_uri in models:
    if is_s3_url(model_uri):
        bucket_name, key = parse_s3_url(model_uri)
        filename = key.split('/')[-1]
        print(f"  - {filename}")

# Download best model for local testing
best_model_uri = models[0]  # Assume first is best

local_model_files = S3Downloader.download(
    s3_uri=best_model_uri,
    local_path="./inference_model/"
)

print(f"\nDownloaded model to: {local_model_files}")

# Read model metadata
metadata_uri = s3_path_join(
    "s3://", bucket, "models", experiment_id, model_name, "metadata.json"
)

try:
    metadata = S3Downloader.read_file(s3_uri=metadata_uri)
    metadata_dict = json.loads(metadata)
    print(f"\nModel metadata: {metadata_dict}")
except ClientError:
    print("No metadata file found")

Advanced Usage

Parallel Upload/Download

from concurrent.futures import ThreadPoolExecutor
from sagemaker.core.s3 import S3Uploader

# Upload multiple files in parallel
files_to_upload = [
    ("./data/file1.csv", "s3://bucket/data/"),
    ("./data/file2.csv", "s3://bucket/data/"),
    ("./data/file3.csv", "s3://bucket/data/"),
]

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [
        executor.submit(S3Uploader.upload, local_path, s3_prefix)
        for local_path, s3_prefix in files_to_upload
    ]
    
    results = [future.result() for future in futures]

print(f"Uploaded {len(results)} files")

Progress Tracking

import os

def upload_with_progress(local_path, s3_uri):
    """Upload file with progress bar."""
    file_size = os.path.getsize(local_path)
    bytes_uploaded = [0]
    
    def progress_callback(bytes_transferred):
        bytes_uploaded[0] = bytes_transferred
        percent = (bytes_transferred / file_size) * 100
        print(f"\rProgress: {percent:.1f}% ({bytes_transferred}/{file_size} bytes)", end='')
    
    result_uri = S3Uploader.upload(
        local_path=local_path,
        desired_s3_uri=s3_uri,
        callback=progress_callback
    )
    
    print()  # New line after progress
    return result_uri

# Upload large file with progress
s3_uri = upload_with_progress(
    "./large_dataset.parquet",
    "s3://bucket/datasets/"
)

Conditional S3 Operations

from sagemaker.core.s3 import is_s3_url, S3Downloader, S3Uploader

def ensure_in_s3(path, s3_destination):
    """
    Ensure data is in S3, upload if local.
    
    Args:
        path: Local path or S3 URI
        s3_destination: S3 URI destination if upload needed
    
    Returns:
        str: S3 URI of data
    """
    if is_s3_url(path):
        # Already in S3
        return path
    else:
        # Upload to S3
        print(f"Uploading {path} to S3...")
        s3_uri = S3Uploader.upload(
            local_path=path,
            desired_s3_uri=s3_destination
        )
        return s3_uri

# Use in training pipeline
train_data_uri = ensure_in_s3(
    "./local_train_data.csv",
    "s3://bucket/training-data/"
)

val_data_uri = ensure_in_s3(
    "s3://bucket/existing-val-data.csv",  # Already in S3
    "s3://bucket/training-data/"
)

# Both paths are now guaranteed to be S3 URIs
# (illustrative; assumes a configured trainer and an InputData class from the training workflow)
trainer.train(input_data_config=[
    InputData(channel_name="training", data_source=train_data_uri),
    InputData(channel_name="validation", data_source=val_data_uri)
])

Validation and Constraints

S3 Constraints

  • Bucket names: 3-63 characters, lowercase, no underscores, globally unique
  • Object keys: Maximum 1024 characters
  • Object size: Maximum 5 TB per object
  • Multipart upload: handled automatically above the 8 MB threshold (see Upload Constraints)
  • List operation: Maximum 1000 objects per call (pagination automatic)
  • Request rate: 3,500 PUT/COPY/POST/DELETE, 5,500 GET/HEAD per second per prefix
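
These limits can be checked client-side before calling S3Uploader. Below is a minimal pre-flight sketch; the validate_s3_uri helper is hypothetical (not part of the library) and the bucket-name pattern is a simplification of the full AWS rules:

import re
from sagemaker.core.s3 import parse_s3_url

# Hypothetical pre-flight checks against the S3 limits listed above.
BUCKET_RE = re.compile(r"^[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]$")  # 3-63 chars, lowercase, no underscores

def validate_s3_uri(s3_uri: str) -> None:
    bucket, key = parse_s3_url(s3_uri)
    if not BUCKET_RE.match(bucket):
        raise ValueError(f"Invalid bucket name: {bucket!r}")
    if len(key) > 1024:
        raise ValueError("Object key exceeds 1024 characters")

validate_s3_uri("s3://my-bucket/data/train.csv")  # passes silently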

Upload Constraints

  • Maximum file size: 5 TB (via multipart upload)
  • Multipart threshold: 8 MB (automatic)
  • Maximum parts: 10,000 per object
  • Part size: 5 MB - 5 GB
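
S3Uploader does not expose transfer tuning directly; if you need to control the multipart threshold or part size yourself, one option is a plain boto3 upload with an explicit TransferConfig (a sketch; bucket and key names are placeholders):

import boto3
from boto3.s3.transfer import TransferConfig

# Tune multipart behavior explicitly (values are illustrative).
config = TransferConfig(
    multipart_threshold=64 * 1024 * 1024,  # switch to multipart above 64 MB
    multipart_chunksize=32 * 1024 * 1024,  # 32 MB parts (must be 5 MB - 5 GB)
    max_concurrency=8,                     # parallel part uploads
)

s3 = boto3.client("s3")
s3.upload_file(
    Filename="./large_dataset.parquet",
    Bucket="my-bucket",
    Key="datasets/large_dataset.parquet",
    Config=config,
)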

Download Constraints

  • Maximum bandwidth: Network-dependent
  • Concurrent downloads: Limited by connection pool
  • Large file handling: Streamed automatically
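
Mirroring the parallel upload pattern shown under Advanced Usage, multiple objects can also be downloaded concurrently with the documented S3Downloader.download; a sketch (paths are placeholders):

from concurrent.futures import ThreadPoolExecutor
from sagemaker.core.s3 import S3Downloader

# Download several objects concurrently; each call returns a list of local paths.
uris_to_download = [
    "s3://bucket/data/file1.csv",
    "s3://bucket/data/file2.csv",
    "s3://bucket/data/file3.csv",
]

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [
        executor.submit(S3Downloader.download, uri, "./local_data/")
        for uri in uris_to_download
    ]
    downloaded = [path for future in futures for path in future.result()]

print(f"Downloaded {len(downloaded)} files")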

Common Error Scenarios

  1. NoSuchBucket:
    • Cause: Bucket doesn't exist
    • Solution: Create bucket or fix bucket name
  2. AccessDenied:
    • Cause: Insufficient IAM permissions
    • Solution: Add s3:GetObject, s3:PutObject permissions
  3. NoSuchKey:
    • Cause: Object doesn't exist at specified key
    • Solution: Verify S3 URI, check object exists with list()
  4. InvalidObjectState:
    • Cause: Object in Glacier storage class
    • Solution: Restore from Glacier before accessing
  5. RequestTimeout:
    • Cause: Network issues or large file
    • Solution: Retry with exponential backoff (see the retry sketch after this list)
  6. Path Traversal in Keys:
    • Cause: Using ../ in keys
    • Solution: Use absolute paths, validate user input
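
For transient failures such as RequestTimeout, a small retry wrapper with exponential backoff is usually enough. A sketch follows; the retry_download helper is hypothetical and the set of retryable error codes is an assumption:

import time

from botocore.exceptions import ClientError
from sagemaker.core.s3 import S3Downloader

# Hypothetical helper: retry transient S3 errors with exponential backoff.
RETRYABLE = {"RequestTimeout", "SlowDown", "ThrottlingException"}

def retry_download(s3_uri, local_path, max_attempts=4):
    for attempt in range(1, max_attempts + 1):
        try:
            return S3Downloader.download(s3_uri=s3_uri, local_path=local_path)
        except ClientError as e:
            code = e.response["Error"]["Code"]
            if code not in RETRYABLE or attempt == max_attempts:
                raise  # permanent error (e.g. NoSuchKey, AccessDenied) or out of retries
            time.sleep(2 ** attempt)  # back off: 2s, 4s, 8s, ...

files = retry_download("s3://my-bucket/data/train.csv", "./local_data/")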