Pathlib-style classes for cloud storage services that provide seamless access to AWS S3, Google Cloud Storage, and Azure Blob Storage with familiar filesystem operations.
—
Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run.
Complete AWS S3 support with advanced features including multipart uploads, transfer acceleration, custom endpoints, and S3-specific metadata access. This implementation provides full compatibility with AWS S3 and S3-compatible services.
S3-specific path implementation with access to S3 metadata and operations.
class S3Path(CloudPath):
    """AWS S3 path implementation.

    Pathlib-style path for objects addressed by ``s3://bucket/key`` URIs,
    exposing S3-specific metadata on top of the generic CloudPath API.
    """

    @property
    def bucket(self) -> str:
        """
        S3 bucket name.

        Returns:
            Bucket name from the S3 URI
        """

    @property
    def key(self) -> str:
        """
        S3 object key (path within bucket).

        Returns:
            Object key string
        """

    @property
    def etag(self) -> str:
        """
        S3 object ETag identifier.

        Returns:
            ETag string for the object
        """


# S3 client for authentication and service configuration.
class S3Client:
    """AWS S3 client with comprehensive configuration options."""

    def __init__(
        self,
        aws_access_key_id: str = None,
        aws_secret_access_key: str = None,
        aws_session_token: str = None,
        no_sign_request: bool = False,
        botocore_session=None,
        profile_name: str = None,
        boto3_session=None,
        file_cache_mode: "FileCacheMode" = None,  # quoted forward ref: type is defined elsewhere in the package
        local_cache_dir: str = None,
        endpoint_url: str = None,
        boto3_transfer_config=None,
        content_type_method=None,
        extra_args: dict = None,
    ):
        """
        Initialize S3 client.

        Args:
            aws_access_key_id: AWS access key ID
            aws_secret_access_key: AWS secret access key
            aws_session_token: AWS session token for temporary credentials
            no_sign_request: Make unsigned requests (for public buckets)
            botocore_session: Custom botocore session
            profile_name: AWS profile name from credentials file
            boto3_session: Custom boto3 session
            file_cache_mode: Cache management strategy
            local_cache_dir: Local directory for file cache
            endpoint_url: Custom S3 endpoint URL
            boto3_transfer_config: Transfer configuration for multipart uploads
            content_type_method: Function to determine MIME types
            extra_args: Additional arguments for S3 operations
        """


# Example usage (this import was fused onto the docstring's closing quotes):
# from cloudpathlib import S3Path, S3Client
# Create S3 path (uses default client)
s3_path = S3Path("s3://my-bucket/data/file.txt")

# Access S3-specific properties
print(f"Bucket: {s3_path.bucket}")  # "my-bucket"
print(f"Key: {s3_path.key}")  # "data/file.txt"

# Check if object exists
if s3_path.exists():
    print(f"ETag: {s3_path.etag}")

# Configure S3 client with credentials
client = S3Client(
    aws_access_key_id="your-access-key",
    aws_secret_access_key="your-secret-key"
)

# Set as default client
client.set_as_default_client()

# Use with paths
s3_path = S3Path("s3://my-bucket/file.txt")  # Uses configured client

# Use AWS profile from ~/.aws/credentials
client = S3Client(profile_name="my-profile")
client.set_as_default_client()

# Create paths using profile
s3_path = S3Path("s3://my-bucket/data.json")
content = s3_path.read_text()

# Use temporary credentials with session token
client = S3Client(
    aws_access_key_id="temp-access-key",
    aws_secret_access_key="temp-secret-key",
    aws_session_token="session-token"
)

# Work with temporary credentials
s3_path = S3Path("s3://secure-bucket/confidential.txt", client=client)

# Access public S3 buckets without credentials
client = S3Client(no_sign_request=True)

# Work with public data
public_path = S3Path("s3://public-bucket/open-data.csv", client=client)
data = public_path.read_text()

# Use S3-compatible services (MinIO, Ceph, etc.)
client = S3Client(
    endpoint_url="https://s3.my-company.com",
    aws_access_key_id="minio-access-key",
    aws_secret_access_key="minio-secret-key"
)

# Work with custom endpoint
s3_path = S3Path("s3://internal-bucket/file.txt", client=client)

# TransferConfig must be imported explicitly; `boto3.s3.transfer` is not
# reachable as an attribute after a bare `import boto3`.
from boto3.s3.transfer import TransferConfig

# Configure transfer settings for large files
transfer_config = TransferConfig(
    multipart_threshold=25 * 1024 * 1024,  # 25MB (1024 * 25 as originally written is only 25KB)
    max_concurrency=10,
    multipart_chunksize=25 * 1024 * 1024,
    use_threads=True
)
client = S3Client(boto3_transfer_config=transfer_config)

# Upload large file with optimized settings
large_file = S3Path("s3://my-bucket/large-file.zip", client=client)
large_file.upload_from("local-large-file.zip")

# Upload with specific storage class
client = S3Client(extra_args={"StorageClass": "GLACIER"})

# Upload file to Glacier
s3_path = S3Path("s3://archive-bucket/archive.tar", client=client)
s3_path.upload_from("data.tar")

# Upload with different storage classes
storage_classes = {
    "standard": S3Client(extra_args={"StorageClass": "STANDARD"}),
    "ia": S3Client(extra_args={"StorageClass": "STANDARD_IA"}),
    "glacier": S3Client(extra_args={"StorageClass": "GLACIER"}),
    "deep_archive": S3Client(extra_args={"StorageClass": "DEEP_ARCHIVE"})
}

# Use appropriate storage class
file_path = S3Path("s3://my-bucket/backup.zip", client=storage_classes["glacier"])

# Configure server-side encryption
client = S3Client(extra_args={
    "ServerSideEncryption": "AES256"
})

# Upload encrypted file
encrypted_path = S3Path("s3://secure-bucket/encrypted.txt", client=client)
encrypted_path.write_text("Sensitive data")

# Use KMS encryption
kms_client = S3Client(extra_args={
    "ServerSideEncryption": "aws:kms",
    "SSEKMSKeyId": "your-kms-key-id"
})

# Upload with metadata
client = S3Client(extra_args={
    "Metadata": {
        "Author": "Data Team",
        "Project": "Analytics",
        "Version": "1.0"
    },
    "Tagging": "Environment=Production&Department=Analytics"
})
s3_path = S3Path("s3://my-bucket/report.pdf", client=client)
s3_path.upload_from("monthly-report.pdf")

# Generate presigned URLs for S3
s3_path = S3Path("s3://private-bucket/document.pdf")

# Download URL (valid for 1 hour)
download_url = s3_path.as_url(presign=True, expire_seconds=3600)
print(f"Download: {download_url}")

# Share with expiration
share_url = s3_path.as_url(presign=True, expire_seconds=86400)  # 24 hours
print(f"Share URL: {share_url}")

# Note: S3 Select requires direct boto3 usage
# This is an example of extending S3Path for advanced operations
class ExtendedS3Path(S3Path):
    """S3Path subclass that adds an S3 Select query helper via the raw boto3 client."""

    def select_object_content(self, expression, input_serialization, output_serialization):
        """Perform S3 Select query on object."""
        s3_api = self.client.boto3_session.client('s3')
        response = s3_api.select_object_content(
            Bucket=self.bucket,
            Key=self.key,
            Expression=expression,
            ExpressionType='SQL',
            InputSerialization=input_serialization,
            OutputSerialization=output_serialization
        )
        # The response payload is an event stream; only 'Records' events carry data.
        for event in response['Payload']:
            records = event.get('Records')
            if records is not None:
                yield records['Payload'].decode('utf-8')
# Usage
csv_path = ExtendedS3Path("s3://data-bucket/large-dataset.csv")
query = "SELECT * FROM S3Object s WHERE s.category = 'important'"
for chunk in csv_path.select_object_content(
    expression=query,
    # FileHeaderInfo accepts only NONE | USE | IGNORE (uppercase); 'Use' is rejected by S3.
    input_serialization={'CSV': {'FileHeaderInfo': 'USE'}},
    output_serialization={'CSV': {}}
):
    process_chunk(chunk)

# Upload multiple files efficiently
import concurrent.futures
from pathlib import Path


def upload_file(local_path, s3_base):
    """Upload one local file under s3_base and return the resulting S3Path."""
    s3_path = s3_base / local_path.name
    s3_path.upload_from(local_path)
    return s3_path


# Parallel uploads
local_files = list(Path("data/").glob("*.csv"))
s3_base = S3Path("s3://my-bucket/csv-data/")
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(upload_file, f, s3_base) for f in local_files]
    for future in concurrent.futures.as_completed(futures):
        s3_path = future.result()
        print(f"Uploaded: {s3_path}")

# Work with different lifecycle stages
from datetime import datetime, timedelta  # required by the cutoff computation below


def get_storage_class_client(storage_class):
    """Build an S3Client whose uploads use the given storage class."""
    return S3Client(extra_args={"StorageClass": storage_class})


# Archive old files
cutoff_date = datetime.now() - timedelta(days=365)
archive_client = get_storage_class_client("GLACIER")
for s3_file in S3Path("s3://my-bucket/logs/").rglob("*.log"):
    if s3_file.stat().st_mtime < cutoff_date.timestamp():
        # Re-write the same key through a client configured for GLACIER storage
        archive_path = S3Path(str(s3_file), client=archive_client)
        s3_file.copy(archive_path)
        print(f"Archived: {s3_file}")

# Work with buckets in different regions
import boto3
import botocore.exceptions

from cloudpathlib import (
    CloudPathFileNotFoundError,
    MissingCredentialsError,
    InvalidPrefixError
)

# S3Client does not accept region_name directly (see its __init__ signature);
# select the region through a custom boto3 session instead.
us_east_client = S3Client(
    aws_access_key_id="key",
    aws_secret_access_key="secret",
    boto3_session=boto3.Session(region_name="us-east-1")
)
eu_west_client = S3Client(
    aws_access_key_id="key",
    aws_secret_access_key="secret",
    boto3_session=boto3.Session(region_name="eu-west-1")
)

# Copy between regions
source = S3Path("s3://us-bucket/data.txt", client=us_east_client)
destination = S3Path("s3://eu-bucket/data.txt", client=eu_west_client)
source.copy(destination)

try:
    s3_path = S3Path("s3://nonexistent-bucket/file.txt")
    content = s3_path.read_text()
except CloudPathFileNotFoundError:
    print("S3 object not found")
except botocore.exceptions.NoCredentialsError:
    print("AWS credentials not configured")
except botocore.exceptions.BotoCoreError as e:
    print(f"AWS error: {e}")

# Install with Tessl CLI
npx tessl i tessl/pypi-cloudpathlib