Pathlib-style classes for cloud storage services that provide seamless access to AWS S3, Google Cloud Storage, and Azure Blob Storage with familiar filesystem operations.
Quality: Pending — a best-practices review has not yet been performed.
Impact: Pending — no eval scenarios have been run.
Base client functionality for authentication, caching configuration, and cloud service connection management. The client system provides a unified interface for managing connections to different cloud providers while handling authentication, caching, and service-specific configurations.
Abstract base class that defines the common interface for all cloud clients.
class Client:
"""Base class for all cloud storage clients."""
def __init__(
self,
file_cache_mode: FileCacheMode = None,
local_cache_dir: str = None,
content_type_method = None
):
"""
Initialize base client.
Args:
file_cache_mode: Cache management strategy
local_cache_dir: Local directory for file caching
content_type_method: Function to determine MIME types
"""
@classmethod
def get_default_client(cls):
"""
Get the default client instance for this client type.
Returns:
Default client instance or None if not set
"""
def set_as_default_client(self) -> None:
"""
Set this client as the default for its type.
All paths created without explicit client will use this client.
"""
def CloudPath(
self,
cloud_path: str,
*parts: str
) -> "CloudPath":
"""
Create CloudPath associated with this client.
Args:
cloud_path: Cloud storage URI
*parts: Additional path segments
Returns:
CloudPath instance using this client
"""
def clear_cache(self) -> None:
"""
Clear all cached files for this client.
"""
@property
def file_cache_mode(self) -> FileCacheMode:
"""Cache management mode for this client."""
@property
def content_type_method(self):
"""Function used to determine MIME types."""from cloudpathlib import S3Client, GSClient, AzureBlobClient, CloudPath
# Configure default clients for each provider
s3_client = S3Client(
aws_access_key_id="your-key",
aws_secret_access_key="your-secret"
)
s3_client.set_as_default_client()
gs_client = GSClient(
application_credentials="path/to/service-account.json"
)
gs_client.set_as_default_client()
azure_client = AzureBlobClient(
connection_string="your-connection-string"
)
azure_client.set_as_default_client()
# Now all paths use the configured default clients
s3_path = CloudPath("s3://my-bucket/file.txt") # Uses s3_client
gs_path = CloudPath("gs://my-bucket/file.txt") # Uses gs_client
azure_path = CloudPath("az://my-container/file.txt") # Uses azure_client
# Check which client is being used
print(f"S3 client: {s3_path.client}")
print(f"GS client: {gs_path.client}")

# Configure different clients for different environments
prod_s3_client = S3Client(
    profile_name="production",
    file_cache_mode=FileCacheMode.persistent,
)
dev_s3_client = S3Client(
    profile_name="development",
    file_cache_mode=FileCacheMode.tmp_dir,
)

# Option 1: pass the client explicitly when building a path
prod_path = CloudPath("s3://prod-bucket/data.txt", client=prod_s3_client)
dev_path = CloudPath("s3://dev-bucket/data.txt", client=dev_s3_client)

# Option 2: use the client's own CloudPath factory method
prod_path = prod_s3_client.CloudPath("s3://prod-bucket/data.txt")
dev_path = dev_s3_client.CloudPath("s3://dev-bucket/data.txt")

from cloudpathlib import FileCacheMode
import tempfile

# Configure a client with a persistent on-disk cache
cache_dir = "/tmp/cloudpathlib-cache"
client = S3Client(
    file_cache_mode=FileCacheMode.persistent,
    local_cache_dir=cache_dir,
)

# Paths created with this client inherit its cache configuration
path = CloudPath("s3://my-bucket/large-file.dat", client=client)

# First access downloads and caches; subsequent reads hit the cache
content = path.read_bytes()  # Downloads and caches
content = path.read_bytes()  # Uses cached version

# Cache can be cleared per client or per individual path
client.clear_cache()
path.clear_cache()

import mimetypes
def custom_content_type(path):
    """Return a MIME type for *path*, with overrides for data formats.

    Custom suffix mappings take precedence; otherwise the stdlib guess is
    used, falling back to application/octet-stream when unknown.
    """
    name = str(path)
    if name.endswith('.parquet'):
        return 'application/octet-stream'
    if name.endswith('.jsonl'):
        return 'application/x-jsonlines'
    guessed, _ = mimetypes.guess_type(name)
    return guessed or 'application/octet-stream'
# Configure client with custom content type detection
client = S3Client(content_type_method=custom_content_type)

# Uploads through this client use the custom MIME type detection
path = CloudPath("s3://my-bucket/data.parquet", client=client)
path.upload_from("local_data.parquet")  # Uses custom content type


class CloudClientFactory:
    """Factory for creating configured cloud clients."""

    @staticmethod
    def create_s3_client(environment="production"):
        """Create an S3 client configured for the given environment."""
        if environment == "production":
            return S3Client(
                profile_name="prod",
                file_cache_mode=FileCacheMode.persistent,
                local_cache_dir="/var/cache/cloudpathlib",
            )
        if environment == "development":
            return S3Client(
                profile_name="dev",
                file_cache_mode=FileCacheMode.tmp_dir,
            )
        if environment == "testing":
            return S3Client(
                no_sign_request=True,  # For public buckets
                file_cache_mode=FileCacheMode.close_file,
            )
        raise ValueError(f"Unknown environment: {environment}")

    @staticmethod
    def create_gs_client(environment="production"):
        """Create a GCS client configured for the given environment."""
        if environment == "production":
            return GSClient(
                project="my-prod-project",
                file_cache_mode=FileCacheMode.persistent,
            )
        if environment == "development":
            return GSClient(
                application_credentials="dev-service-account.json",
                file_cache_mode=FileCacheMode.tmp_dir,
            )
        raise ValueError(f"Unknown environment: {environment}")


# Usage
import os

env = os.getenv("ENVIRONMENT", "development")
s3_client = CloudClientFactory.create_s3_client(env)
s3_client.set_as_default_client()
gs_client = CloudClientFactory.create_gs_client(env)
gs_client.set_as_default_client()

import os
from cloudpathlib import S3Client, GSClient, FileCacheMode


def configure_clients_from_env():
    """Build S3 and GCS clients from environment variables and install
    each as the default client for its type.

    Returns:
        Tuple of (s3_client, gs_client).
    """
    # Environment variable -> S3Client keyword argument
    s3_env_vars = {
        "AWS_ACCESS_KEY_ID": "aws_access_key_id",
        "AWS_SECRET_ACCESS_KEY": "aws_secret_access_key",
        "AWS_PROFILE": "profile_name",
        "S3_ENDPOINT_URL": "endpoint_url",
    }
    s3_config = {
        kwarg: os.getenv(var)
        for var, kwarg in s3_env_vars.items()
        if os.getenv(var)
    }

    # Cache configuration shared by both providers
    cache_mode = os.getenv("CLOUDPATHLIB_CACHE_MODE", "tmp_dir")
    cache_dir = os.getenv("CLOUDPATHLIB_CACHE_DIR")
    s3_config["file_cache_mode"] = FileCacheMode(cache_mode)
    if cache_dir:
        s3_config["local_cache_dir"] = cache_dir

    s3_client = S3Client(**s3_config)
    s3_client.set_as_default_client()

    # Environment variable -> GSClient keyword argument
    gs_env_vars = {
        "GOOGLE_APPLICATION_CREDENTIALS": "application_credentials",
        "GCP_PROJECT": "project",
    }
    gs_config = {
        kwarg: os.getenv(var)
        for var, kwarg in gs_env_vars.items()
        if os.getenv(var)
    }
    gs_config["file_cache_mode"] = FileCacheMode(cache_mode)
    if cache_dir:
        gs_config["local_cache_dir"] = cache_dir

    gs_client = GSClient(**gs_config)
    gs_client.set_as_default_client()

    return s3_client, gs_client


# Configure from environment
s3_client, gs_client = configure_clients_from_env()


class TemporaryClient:
    """Context manager that temporarily installs *client* as the default.

    Caveat: if no default existed before entry, the temporary client
    remains the default after exit — the client API provides no way to
    unset a default once one is registered.
    """

    def __init__(self, client):
        self.client = client
        self.original_default = None  # captured on __enter__

    def __enter__(self):
        # Save the current default so it can be restored on exit
        self.original_default = self.client.__class__.get_default_client()
        # Install the temporary default
        self.client.set_as_default_client()
        return self.client

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Fix: identity test instead of truthiness, so any real prior
        # default object is restored even if it happens to be falsy
        if self.original_default is not None:
            self.original_default.set_as_default_client()


# Usage
temp_client = S3Client(profile_name="temporary-profile")
with TemporaryClient(temp_client):
    # Inside the context, paths use the temporary client
    path = CloudPath("s3://temp-bucket/file.txt")
    content = path.read_text()
# Outside the context, the original default is restored


def check_client_connectivity(client, test_uri="s3://test-bucket/"):
    """Check whether *client* can reach its cloud service.

    Fix: the probe URI is now a parameter — the previous hard-coded
    "s3://test-bucket/" was wrong for GCS/Azure clients. The default
    preserves the old behavior for S3 callers.

    Args:
        client: Cloud client exposing a CloudPath factory method.
        test_uri: URI to list as the connectivity probe; pass a
            provider-appropriate URI (e.g. "gs://...", "az://...").

    Returns:
        Tuple of (success: bool, message: str).
    """
    try:
        # Listing exercises both authentication and network connectivity
        probe_path = client.CloudPath(test_uri)
        list(probe_path.iterdir())
        return True, "Connection successful"
    except Exception as e:  # broad by design: report any failure as text
        return False, str(e)
# Probe every provider's configured default client
clients = {
    "S3": S3Client.get_default_client(),
    "GCS": GSClient.get_default_client(),
    "Azure": AzureBlobClient.get_default_client(),
}
for name, client in clients.items():
    if not client:
        print(f"{name} client: Not configured")
        continue
    is_healthy, message = check_client_connectivity(client)
    print(f"{name} client: {'✓' if is_healthy else '✗'} {message}")

import tempfile
import tempfile
import shutil
from pathlib import Path


class ManagedCacheClient:
    """Client wrapper that keeps the local file cache under a size budget.

    Args:
        base_client: Client whose cache directory and mode are managed.
        max_cache_size_mb: Approximate cache size ceiling in megabytes.
    """

    def __init__(self, base_client, max_cache_size_mb=1000):
        self.base_client = base_client
        self.max_cache_size_mb = max_cache_size_mb
        self.cache_dir = Path(tempfile.mkdtemp(prefix="cloudpath_"))
        # NOTE(review): assumes the client honors post-construction mutation
        # of these attributes — confirm against the client implementation
        self.base_client.local_cache_dir = str(self.cache_dir)
        self.base_client.file_cache_mode = FileCacheMode.persistent

    def get_cache_size_mb(self):
        """Return the current cache size in MB (scans the cache directory)."""
        total_bytes = sum(
            f.stat().st_size for f in self.cache_dir.rglob('*') if f.is_file()
        )
        return total_bytes / (1024 * 1024)

    def cleanup_old_files(self):
        """Evict least-recently-modified files until under the size limit.

        Fix: the running size is decremented per deletion instead of
        rescanning and stat-ing the entire cache directory after every
        unlink (previously O(n^2) in the number of cached files).
        """
        current_size = self.get_cache_size_mb()
        if current_size <= self.max_cache_size_mb:
            return
        # (path, mtime, size_mb) for every cached file, oldest first
        cached_files = sorted(
            (
                (f, f.stat().st_mtime, f.stat().st_size / (1024 * 1024))
                for f in self.cache_dir.rglob('*')
                if f.is_file()
            ),
            key=lambda entry: entry[1],
        )
        for file_path, _, size_mb in cached_files:
            file_path.unlink()
            current_size -= size_mb
            if current_size <= self.max_cache_size_mb:
                break

    def CloudPath(self, *args, **kwargs):
        """Create a CloudPath, trimming the cache first."""
        self.cleanup_old_files()
        return self.base_client.CloudPath(*args, **kwargs)

    def __del__(self):
        """Best-effort removal of the temporary cache directory."""
        # ignore_errors + broad guard: during interpreter shutdown, module
        # globals (shutil, Path internals) may already be torn down
        try:
            if self.cache_dir.exists():
                shutil.rmtree(self.cache_dir, ignore_errors=True)
        except Exception:
            pass
# Usage
base_s3_client = S3Client(profile_name="default")
managed_client = ManagedCacheClient(base_s3_client, max_cache_size_mb=500)

# Paths created through the wrapper get size-managed caching for free
path = managed_client.CloudPath("s3://large-data-bucket/dataset.csv")
data = path.read_text()  # Cached with size management


class MultiRegionS3Client:
    """Wrapper managing one S3 client per configured region."""

    def __init__(self, regions, credentials):
        self.credentials = credentials
        # One client per region, all sharing the same credentials
        self.clients = {
            region: S3Client(region_name=region, **credentials)
            for region in regions
        }

    def get_client_for_bucket(self, bucket_name):
        """Pick the client whose region name appears in *bucket_name*.

        Simplified heuristic based on a bucket naming convention;
        resolving the true bucket region would require a boto3 lookup.
        Falls back to the first configured region's client.
        """
        for region, client in self.clients.items():
            if region in bucket_name:
                return client
        return next(iter(self.clients.values()))

    def CloudPath(self, path_str):
        """Create a CloudPath using the region-appropriate client."""
        bucket_name = path_str.split('/')[2]  # "s3://bucket/key" -> bucket
        return self.get_client_for_bucket(bucket_name).CloudPath(path_str)


# Usage
multi_region_client = MultiRegionS3Client(
    regions=["us-east-1", "us-west-2", "eu-west-1"],
    credentials={
        "aws_access_key_id": "your-key",
        "aws_secret_access_key": "your-secret",
    },
)

# Automatically uses appropriate regional client
us_path = multi_region_client.CloudPath("s3://us-east-1-bucket/data.txt")
eu_path = multi_region_client.CloudPath("s3://eu-west-1-bucket/data.txt")

import time
from collections import defaultdict
class MonitoringClient:
    """Client wrapper that tracks usage metrics for created paths."""

    def __init__(self, base_client):
        self.base_client = base_client
        self.metrics = defaultdict(int)           # event counters
        self.operation_times = defaultdict(list)  # durations per operation

    def CloudPath(self, *args, **kwargs):
        """Create a CloudPath wrapped for operation monitoring."""
        self.metrics["paths_created"] += 1
        wrapped = self.base_client.CloudPath(*args, **kwargs)
        return MonitoredCloudPath(wrapped, self)

    def record_operation(self, operation, duration):
        """Record one occurrence of *operation* taking *duration* seconds."""
        self.metrics[f"{operation}_count"] += 1
        self.operation_times[operation].append(duration)

    def get_metrics(self):
        """Return counters plus per-operation average and total times."""
        summary = dict(self.metrics)
        for operation, times in self.operation_times.items():
            if not times:
                continue
            total = sum(times)
            summary[f"{operation}_avg_time"] = total / len(times)
            summary[f"{operation}_total_time"] = total
        return summary
class MonitoredCloudPath:
    """CloudPath wrapper that reports operation timings to a monitor.

    Fix: read_text/write_text previously duplicated identical timing and
    error-accounting boilerplate; it is factored into _timed.
    """

    def __init__(self, path, monitor):
        self.path = path
        self.monitor = monitor

    def _timed(self, operation, func, *args):
        """Run *func(*args)*, recording duration (or an error event)."""
        start_time = time.time()
        try:
            result = func(*args)
        except Exception:
            # Errors are counted under "<op>_error"; duration is meaningless
            self.monitor.record_operation(f"{operation}_error", 0)
            raise
        self.monitor.record_operation(operation, time.time() - start_time)
        return result

    def read_text(self):
        return self._timed("read_text", self.path.read_text)

    def write_text(self, data):
        return self._timed("write_text", self.path.write_text, data)

    # Delegate every other attribute to the wrapped path
    def __getattr__(self, name):
        return getattr(self.path, name)
# Usage
base_client = S3Client()
monitoring_client = MonitoringClient(base_client)

# Every path operation is timed and counted
path = monitoring_client.CloudPath("s3://my-bucket/file.txt")
path.write_text("Hello, world!")
content = path.read_text()

# Inspect the collected metrics
metrics = monitoring_client.get_metrics()
print(f"Operations performed: {metrics}")

Install with Tessl CLI

npx tessl i tessl/pypi-cloudpathlib