Helper utilities for uploading and downloading files to/from Amazon S3.
Static methods for uploading files and directories to S3.
class S3Uploader:
"""
Static methods for uploading data to Amazon S3.
Static Methods:
upload(local_path, desired_s3_uri, kms_key=None, sagemaker_session=None, callback=None) -> str
Upload file or directory to S3.
Parameters:
local_path: str - Path (absolute or relative) of local file or directory (required)
desired_s3_uri: str - Desired S3 location (required)
- For file: prefix to which filename will be added
- For directory: prefix for all files
kms_key: Optional[str] - KMS key ARN for encryption
sagemaker_session: Optional[Session] - SageMaker session
callback: Optional[callable] - Progress callback function(bytes_transferred)
Returns:
str: S3 URI of uploaded file(s)
Raises:
FileNotFoundError: If local_path doesn't exist
ClientError: S3 upload errors
Notes:
- Directories uploaded recursively
- Preserves directory structure
- Skips hidden files (.* files)
- Large files use multipart upload
upload_string_as_file_body(body, desired_s3_uri, kms_key=None, sagemaker_session=None) -> str
Upload string content as file to S3.
Parameters:
body: str - String content to upload (required)
desired_s3_uri: str - Full S3 URI for the file (required)
- Includes filename: "s3://bucket/path/file.txt"
kms_key: Optional[str] - KMS key for encryption
sagemaker_session: Optional[Session] - Session
Returns:
str: S3 URI of uploaded file
Raises:
ValueError: If desired_s3_uri malformed
ClientError: S3 upload errors
upload_bytes(b, s3_uri, kms_key=None, sagemaker_session=None) -> str
Upload bytes to S3.
Parameters:
b: Union[bytes, io.BytesIO] - Bytes or BytesIO object (required)
s3_uri: str - S3 URI to upload to (required)
kms_key: Optional[str] - KMS key for encryption
sagemaker_session: Optional[Session] - Session
Returns:
str: S3 URI of uploaded file
Raises:
ValueError: If s3_uri malformed
ClientError: S3 upload errors
Notes:
- All methods use boto3 S3 client under the hood
- Large uploads automatically use multipart
- KMS encryption applied server-side
- Requires s3:PutObject permission
"""Usage:
from sagemaker.core.s3 import S3Uploader
# Upload single file
s3_uri = S3Uploader.upload(
local_path="./model.tar.gz",
desired_s3_uri="s3://my-bucket/models/"
)
print(f"Uploaded to: {s3_uri}")
# Result: "s3://my-bucket/models/model.tar.gz"
# Upload directory recursively
s3_uri = S3Uploader.upload(
local_path="./training_data/",
desired_s3_uri="s3://my-bucket/data/"
)
print(f"Uploaded directory to: {s3_uri}")
# Uploads all files, preserves structure
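# Hypothetical mapping (assuming directory contents land directly under the prefix,
# as noted above):
#   ./training_data/train.csv       -> s3://my-bucket/data/train.csv
#   ./training_data/splits/val.csv  -> s3://my-bucket/data/splits/val.csv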
# Upload with encryption
s3_uri = S3Uploader.upload(
local_path="./sensitive_data.csv",
desired_s3_uri="s3://my-bucket/secure/",
kms_key="arn:aws:kms:us-west-2:123456789012:key/abc-123"
)
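# Handle the errors documented in Raises above (hedged sketch; paths and bucket are placeholders)
from botocore.exceptions import ClientError
try:
    s3_uri = S3Uploader.upload(
        local_path="./model.tar.gz",
        desired_s3_uri="s3://my-bucket/models/"
    )
except FileNotFoundError:
    print("Local path does not exist")
except ClientError as e:
    print(f"Upload failed: {e.response['Error']['Code']}")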
# Upload with progress callback
def progress_callback(bytes_transferred):
print(f"Transferred: {bytes_transferred / 1024 / 1024:.1f} MB", end='\r')
s3_uri = S3Uploader.upload(
local_path="./large_file.bin",
desired_s3_uri="s3://bucket/uploads/",
callback=progress_callback
)
Upload String Content:
import json
# Upload JSON configuration
config = {
"epochs": 10,
"batch_size": 32,
"learning_rate": 0.001
}
s3_uri = S3Uploader.upload_string_as_file_body(
body=json.dumps(config, indent=2),
desired_s3_uri="s3://my-bucket/config/training.json"
)
# Upload text file
s3_uri = S3Uploader.upload_string_as_file_body(
body="Training started at 2024-01-15 10:00:00\nStatus: In Progress",
desired_s3_uri="s3://my-bucket/logs/status.txt"
)
Upload Binary Data:
import io
# Upload bytes
data = b"Binary data content"
s3_uri = S3Uploader.upload_bytes(
b=data,
s3_uri="s3://my-bucket/binary/data.bin"
)
# Upload BytesIO stream
bytes_io = io.BytesIO(b"Stream content")
s3_uri = S3Uploader.upload_bytes(
b=bytes_io,
s3_uri="s3://my-bucket/streams/data.bin"
)
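# upload_bytes also suits serialized in-memory objects; a sketch using the
# standard-library pickle module (payload and key names are illustrative)
import pickle
payload = {"threshold": 0.5, "labels": ["churn", "retain"]}
s3_uri = S3Uploader.upload_bytes(
    b=pickle.dumps(payload),
    s3_uri="s3://my-bucket/artifacts/payload.pkl"
)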
# Upload image from memory
from PIL import Image
import io
image = Image.open("photo.jpg")
buffer = io.BytesIO()
image.save(buffer, format="JPEG")
buffer.seek(0)
s3_uri = S3Uploader.upload_bytes(
b=buffer.getvalue(),
s3_uri="s3://bucket/images/photo.jpg"
)
Static methods for downloading files and directories from S3.
class S3Downloader:
"""
Static methods for downloading data from Amazon S3.
Static Methods:
download(s3_uri, local_path, kms_key=None, sagemaker_session=None) -> List[str]
Download files from S3 to local machine.
Parameters:
s3_uri: str - S3 URI to download from (required)
- File: "s3://bucket/file.txt"
- Directory: "s3://bucket/prefix/"
local_path: str - Local path to download to (required)
kms_key: Optional[str] - KMS key for decryption
sagemaker_session: Optional[Session] - Session
Returns:
List[str]: List of local paths of downloaded files
Raises:
ValueError: If s3_uri malformed
ClientError: S3 download errors (NoSuchKey, AccessDenied)
Notes:
- Creates local directories as needed
- Preserves S3 directory structure
- Overwrites existing local files
read_file(s3_uri, sagemaker_session=None) -> str
Read S3 file content as string.
Parameters:
s3_uri: str - S3 URI of file to read (required)
sagemaker_session: Optional[Session] - Session
Returns:
str: File content as UTF-8 string
Raises:
ClientError: If file doesn't exist or not accessible
UnicodeDecodeError: If file not valid UTF-8
read_bytes(s3_uri, sagemaker_session=None) -> bytes
Read S3 object as bytes.
Parameters:
s3_uri: str - S3 URI of object to read (required)
sagemaker_session: Optional[Session] - Session
Returns:
bytes: Object content as bytes
Raises:
ClientError: If object doesn't exist or not accessible
list(s3_uri, sagemaker_session=None) -> List[str]
List contents of S3 location.
Parameters:
s3_uri: str - S3 base URI to list objects in (required)
sagemaker_session: Optional[Session] - Session
Returns:
List[str]: List of S3 URIs (full paths)
Raises:
ValueError: If s3_uri malformed
ClientError: S3 list errors
Notes:
- All methods handle S3 pagination automatically
- Large files streamed efficiently
- Requires s3:GetObject permission
"""Usage:
from sagemaker.core.s3 import S3Downloader
from botocore.exceptions import ClientError
# Download single file
try:
local_files = S3Downloader.download(
s3_uri="s3://my-bucket/models/model.tar.gz",
local_path="./downloaded_models/"
)
print(f"Downloaded: {local_files}")
# Result: ["./downloaded_models/model.tar.gz"]
except ClientError as e:
error_code = e.response['Error']['Code']
if error_code == 'NoSuchKey':
print("File not found in S3")
elif error_code == 'AccessDenied':
print("Access denied - check IAM permissions")
else:
raise
# Download directory
local_files = S3Downloader.download(
s3_uri="s3://my-bucket/data/",
local_path="./local_data/"
)
print(f"Downloaded {len(local_files)} files")
# Download with decryption
local_files = S3Downloader.download(
s3_uri="s3://my-bucket/secure/data.enc",
local_path="./decrypted/",
kms_key="arn:aws:kms:us-west-2:123456789012:key/abc-123"
)
Read File Directly:
import json
# Read text file without downloading
content = S3Downloader.read_file(
s3_uri="s3://my-bucket/config/settings.txt"
)
print(f"Content: {content}")
# Read JSON file
json_content = S3Downloader.read_file(
s3_uri="s3://my-bucket/config/training.json"
)
config = json.loads(json_content)
print(f"Config: {config}")
# Read CSV as string
csv_content = S3Downloader.read_file(
s3_uri="s3://bucket/data.csv"
)
lines = csv_content.split('\n')
print(f"CSV has {len(lines)} lines")Read Binary Data:
# Read binary file
binary_data = S3Downloader.read_bytes(
s3_uri="s3://my-bucket/models/weights.bin"
)
# Process bytes
with open("local_weights.bin", "wb") as f:
f.write(binary_data)
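# read_file raises UnicodeDecodeError for non-UTF-8 content (see above); a hedged
# fallback sketch that retries with read_bytes (URI is illustrative)
uri = "s3://my-bucket/data/unknown_encoding.txt"
try:
    text = S3Downloader.read_file(s3_uri=uri)
except UnicodeDecodeError:
    text = S3Downloader.read_bytes(s3_uri=uri).decode("latin-1")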
# Read image
image_bytes = S3Downloader.read_bytes(
s3_uri="s3://bucket/images/photo.jpg"
)
from PIL import Image
import io
image = Image.open(io.BytesIO(image_bytes))
image.show()
List S3 Contents:
# List files in bucket prefix
files = S3Downloader.list(
s3_uri="s3://my-bucket/models/"
)
print(f"Found {len(files)} files:")
for file_uri in files[:10]:
print(f" - {file_uri}")
# List with pattern matching (client-side)
all_files = S3Downloader.list(s3_uri="s3://bucket/data/")
csv_files = [f for f in all_files if f.endswith('.csv')]
json_files = [f for f in all_files if f.endswith('.json')]
print(f"CSV files: {len(csv_files)}")
print(f"JSON files: {len(json_files)}")def parse_s3_url(url: str) -> Tuple[str, str]:
"""
Parse S3 URL into bucket and key.
Parameters:
url: str - S3 URL (required)
- Must have s3:// scheme
- Format: "s3://bucket-name/key/path"
Returns:
Tuple[str, str]: (bucket_name, key)
- bucket_name: S3 bucket name
- key: S3 object key
Raises:
ValueError: If URL scheme is not 's3'
Example:
bucket, key = parse_s3_url("s3://my-bucket/path/to/file.txt")
# bucket = "my-bucket"
# key = "path/to/file.txt"
"""Usage:
from sagemaker.core.s3 import parse_s3_url
# Parse S3 URL
try:
bucket, key = parse_s3_url("s3://my-bucket/models/model.tar.gz")
print(f"Bucket: {bucket}") # "my-bucket"
print(f"Key: {key}") # "models/model.tar.gz"
# Use with boto3
import boto3
s3 = boto3.client('s3')
# Download directly
s3.download_file(bucket, key, 'local-model.tar.gz')
except ValueError as e:
print(f"Invalid S3 URL: {e}")
# Parse fails for non-S3 URLs
try:
bucket, key = parse_s3_url("https://example.com/file.txt")
except ValueError as e:
print(f"Error: {e}") # "URL scheme must be 's3'"def is_s3_url(url: str) -> bool:
"""
Check if URL is an S3 URL.
Parameters:
url: str - URL to check (required)
Returns:
bool: True if URL has s3:// scheme, False otherwise
Example:
is_s3_url("s3://my-bucket/file.txt") # True
is_s3_url("https://example.com") # False
is_s3_url("/local/path") # False
"""Usage:
from sagemaker.core.s3 import is_s3_url, S3Downloader
# Conditional data loading
def load_data(data_path):
"""Load data from S3 or local path."""
if is_s3_url(data_path):
# Download from S3
data = S3Downloader.read_file(data_path)
else:
# Read from local file
with open(data_path, 'r') as f:
data = f.read()
return data
# Works with both S3 and local paths
s3_data = load_data("s3://bucket/data.csv")
local_data = load_data("./local_data.csv")
def s3_path_join(*args, with_end_slash: bool = False) -> str:
"""
Join S3 path components with forward slash.
Similar to os.path.join() but for S3 paths. Handles:
- Preserves "s3://" prefix if present
- Removes duplicate slashes (except in s3://)
- Removes leading/trailing slashes (except s3:// and optional end slash)
- Skips empty or None arguments
Parameters:
*args: str - Strings to join
with_end_slash: bool - Append trailing slash (default: False)
Returns:
str: Joined S3 path
Example:
s3_path_join("s3://", "bucket", "path", "file.txt")
# Result: "s3://bucket/path/file.txt"
s3_path_join("s3://", "bucket", "folder", with_end_slash=True)
# Result: "s3://bucket/folder/"
Notes:
- Handles malformed inputs gracefully
- Removes duplicate slashes automatically
- None values skipped
- Empty strings skipped
"""Usage:
from sagemaker.core.s3 import s3_path_join
# Basic joining
path = s3_path_join("s3://", "my-bucket", "path", "to", "file.txt")
print(path) # "s3://my-bucket/path/to/file.txt"
# Handles extra slashes
path = s3_path_join("s3://", "//my-bucket/", "/path//", "file.txt")
print(path) # "s3://my-bucket/path/file.txt"
# With trailing slash for directories
path = s3_path_join("s3://my-bucket", "folder", with_end_slash=True)
print(path) # "s3://my-bucket/folder/"
# Skips empty values
path = s3_path_join("s3://", "bucket", None, "", "file.txt")
print(path) # "s3://bucket/file.txt"
# Build dynamic paths
bucket = "my-ml-bucket"
prefix = "experiments"
experiment_id = "exp-001"
model_name = "resnet50"
version = "v1"
model_path = s3_path_join(
"s3://", bucket, prefix, experiment_id, "models", model_name, version
)
print(model_path)
# "s3://my-ml-bucket/experiments/exp-001/models/resnet50/v1"
# Construct paths safely
def build_model_path(bucket, experiment, model, version):
return s3_path_join(
"s3://",
bucket,
"models",
experiment,
model,
f"v{version}",
with_end_slash=True
)
path = build_model_path("ml-bucket", "churn-prediction", "xgboost", 3)
# "s3://ml-bucket/models/churn-prediction/xgboost/v3/"def determine_bucket_and_prefix(
bucket: Optional[str] = None,
key_prefix: Optional[str] = None,
sagemaker_session: Optional[Session] = None
) -> Tuple[str, str]:
"""
Determine correct S3 bucket and prefix to use.
If bucket is not provided, uses session's default bucket
and appends default prefix.
Parameters:
bucket: Optional[str] - S3 bucket name
- If None: uses default bucket (sagemaker-{region}-{account})
key_prefix: Optional[str] - S3 key prefix
- Appended to default prefix if using default bucket
sagemaker_session: Optional[Session] - SageMaker session
Returns:
Tuple[str, str]: (bucket, key_prefix)
- bucket: Final bucket name
- key_prefix: Final key prefix
Example:
# With default bucket
bucket, prefix = determine_bucket_and_prefix(key_prefix="my-models")
# bucket = "sagemaker-us-west-2-123456789012"
# prefix = "sagemaker/my-models"
# With custom bucket
bucket, prefix = determine_bucket_and_prefix(
bucket="my-custom-bucket",
key_prefix="models"
)
# bucket = "my-custom-bucket"
# prefix = "models"
Notes:
- Default bucket automatically created if doesn't exist
- Default prefix: "sagemaker/"
- Useful for consistent path management
"""Usage:
from sagemaker.core.s3 import determine_bucket_and_prefix, s3_path_join
from sagemaker.core.helper.session_helper import Session
session = Session()
# Use default bucket
bucket, prefix = determine_bucket_and_prefix(
key_prefix="experiments/exp-001",
sagemaker_session=session
)
print(f"Bucket: {bucket}") # "sagemaker-us-west-2-123456789012"
print(f"Prefix: {prefix}") # "sagemaker/experiments/exp-001"
# Build full path
full_path = s3_path_join("s3://", bucket, prefix, "model.tar.gz")
print(f"Full path: {full_path}")
# Use custom bucket
bucket, prefix = determine_bucket_and_prefix(
bucket="my-ml-bucket",
key_prefix="production/models",
sagemaker_session=session
)
print(f"Custom bucket: {bucket}") # "my-ml-bucket"
print(f"Custom prefix: {prefix}") # "production/models"from sagemaker.core.s3 import (
S3Uploader, S3Downloader,
s3_path_join, is_s3_url, parse_s3_url
)
# Build S3 paths
bucket = "my-ml-bucket"
experiment_id = "exp-2024-01-15"
model_name = "customer-churn-classifier"
# Training data path
train_path = s3_path_join("s3://", bucket, "data", experiment_id, "train.csv")
val_path = s3_path_join("s3://", bucket, "data", experiment_id, "val.csv")
# Upload training data
train_uri = S3Uploader.upload(
local_path="./data/train.csv",
desired_s3_uri=s3_path_join("s3://", bucket, "data", experiment_id, with_end_slash=True)
)
val_uri = S3Uploader.upload(
local_path="./data/val.csv",
desired_s3_uri=s3_path_join("s3://", bucket, "data", experiment_id, with_end_slash=True)
)
print(f"Data uploaded:")
print(f" Train: {train_uri}")
print(f" Val: {val_uri}")
# Train model (pseudo-code)
train_model(train_uri, val_uri)
# Model saved to S3 by SageMaker
model_uri = s3_path_join(
"s3://", bucket, "models", experiment_id, model_name, "model.tar.gz"
)
# List all models in experiment
models = S3Downloader.list(
s3_uri=s3_path_join("s3://", bucket, "models", experiment_id, with_end_slash=True)
)
print(f"\nModels in experiment:")
for model_uri in models:
if is_s3_url(model_uri):
bucket_name, key = parse_s3_url(model_uri)
filename = key.split('/')[-1]
print(f" - {filename}")
# Download best model for local testing
best_model_uri = models[0] # Assume first is best
local_model_files = S3Downloader.download(
s3_uri=best_model_uri,
local_path="./inference_model/"
)
print(f"\nDownloaded model to: {local_model_files}")
# Read model metadata
metadata_uri = s3_path_join(
"s3://", bucket, "models", experiment_id, model_name, "metadata.json"
)
try:
metadata = S3Downloader.read_file(s3_uri=metadata_uri)
metadata_dict = json.loads(metadata)
print(f"\nModel metadata: {metadata_dict}")
except ClientError:
print("No metadata file found")from concurrent.futures import ThreadPoolExecutor
from sagemaker.core.s3 import S3Uploader
# Upload multiple files in parallel
files_to_upload = [
("./data/file1.csv", "s3://bucket/data/"),
("./data/file2.csv", "s3://bucket/data/"),
("./data/file3.csv", "s3://bucket/data/"),
]
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [
executor.submit(S3Uploader.upload, local_path, s3_prefix)
for local_path, s3_prefix in files_to_upload
]
results = [future.result() for future in futures]
print(f"Uploaded {len(results)} files")import os
def upload_with_progress(local_path, s3_uri):
"""Upload file with progress bar."""
file_size = os.path.getsize(local_path)
bytes_uploaded = [0]
def progress_callback(bytes_transferred):
bytes_uploaded[0] = bytes_transferred
percent = (bytes_transferred / file_size) * 100
print(f"\rProgress: {percent:.1f}% ({bytes_transferred}/{file_size} bytes)", end='')
result_uri = S3Uploader.upload(
local_path=local_path,
desired_s3_uri=s3_uri,
callback=progress_callback
)
print() # New line after progress
return result_uri
# Upload large file with progress
s3_uri = upload_with_progress(
"./large_dataset.parquet",
"s3://bucket/datasets/"
)
Conditional Upload:
from sagemaker.core.s3 import is_s3_url, S3Downloader, S3Uploader
def ensure_in_s3(path, s3_destination):
"""
Ensure data is in S3, upload if local.
Args:
path: Local path or S3 URI
s3_destination: S3 URI destination if upload needed
Returns:
str: S3 URI of data
"""
if is_s3_url(path):
# Already in S3
return path
else:
# Upload to S3
print(f"Uploading {path} to S3...")
s3_uri = S3Uploader.upload(
local_path=path,
desired_s3_uri=s3_destination
)
return s3_uri
# Use in training pipeline
train_data_uri = ensure_in_s3(
"./local_train_data.csv",
"s3://bucket/training-data/"
)
val_data_uri = ensure_in_s3(
"s3://bucket/existing-val-data.csv", # Already in S3
"s3://bucket/training-data/"
)
# Both paths now guaranteed to be S3 URIs
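# Pseudo-code: trainer and InputData are assumed to be defined elsewhere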
trainer.train(input_data_config=[
InputData(channel_name="training", data_source=train_data_uri),
InputData(channel_name="validation", data_source=val_data_uri)
])
Common Errors and Security Notes:
NoSuchBucket: The bucket in the URI does not exist or is not accessible from the current account/region.
AccessDenied: The caller lacks the required IAM permissions (s3:GetObject, s3:PutObject, or KMS permissions for encrypted objects).
NoSuchKey: No object exists at the given key; verify the URI with S3Downloader.list.
InvalidObjectState: The object is archived (for example in S3 Glacier) and must be restored before it can be read.
RequestTimeout: The request to S3 timed out; retry the operation (boto3 retries most transient errors automatically).
Path Traversal in Keys: Treat "../" segments in user-supplied key components as unsafe; normalize or reject them before joining paths with s3_path_join.
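A minimal sketch of dispatching on these error codes with botocore's ClientError (the URI and messages are illustrative, not part of the library API):
from botocore.exceptions import ClientError
from sagemaker.core.s3 import S3Downloader
def read_with_diagnostics(s3_uri):
    """Read an S3 object, translating common error codes into messages."""
    try:
        return S3Downloader.read_bytes(s3_uri=s3_uri)
    except ClientError as e:
        code = e.response['Error']['Code']
        if code in ('NoSuchBucket', 'NoSuchKey'):
            print(f"Not found: {s3_uri}")
        elif code == 'AccessDenied':
            print("Check IAM permissions (s3:GetObject, plus KMS permissions for encrypted objects)")
        elif code == 'InvalidObjectState':
            print("Object is archived; restore it before reading")
        elif code == 'RequestTimeout':
            print("Request timed out; retry")
        else:
            raise
        return None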