CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-cloudpathlib

Pathlib-style classes for cloud storage services that provide seamless access to AWS S3, Google Cloud Storage, and Azure Blob Storage with familiar filesystem operations.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/client-management.md

Client Management

Base client functionality for authentication, caching configuration, and cloud service connection management. The client system provides a unified interface for managing connections to different cloud providers while handling authentication, caching, and service-specific configurations.

Capabilities

Base Client Class

Abstract base class that defines the common interface for all cloud clients.

class Client:
    """Base class for all cloud storage clients.

    Defines the provider-agnostic interface shared by the concrete clients
    (S3Client, GSClient, AzureBlobClient): per-class default-client
    registration, CloudPath construction bound to a specific client, and
    local file-cache configuration. Method bodies are omitted here — this
    block documents the interface only.
    """
    
    def __init__(
        self,
        file_cache_mode: "FileCacheMode | None" = None,
        local_cache_dir: "str | None" = None,
        content_type_method: "Callable | None" = None
    ):
        """
        Initialize base client.
        
        Args:
            file_cache_mode: Cache management strategy; None selects the
                library's default mode.
            local_cache_dir: Local directory for file caching; None lets
                the client choose a temporary location.
            content_type_method: Function mapping a path to a MIME type
                string, used when uploading files.
        """
    
    @classmethod
    def get_default_client(cls) -> "Client | None":
        """
        Get the default client instance for this client type.
        
        Each concrete subclass tracks its own default, so e.g. the S3 and
        GCS defaults are independent of each other.
        
        Returns:
            Default client instance or None if not set
        """
    
    def set_as_default_client(self) -> None:
        """
        Set this client as the default for its type.
        
        All paths created without explicit client will use this client.
        """
    
    def CloudPath(
        self,
        cloud_path: str,
        *parts: str
    ) -> "CloudPath":
        """
        Create CloudPath associated with this client.
        
        Args:
            cloud_path: Cloud storage URI (e.g. "s3://bucket/key")
            *parts: Additional path segments joined onto the URI
            
        Returns:
            CloudPath instance using this client
        """
    
    def clear_cache(self) -> None:
        """
        Clear all cached files for this client.
        """
    
    @property
    def file_cache_mode(self) -> "FileCacheMode":
        """Cache management mode for this client."""
    
    @property
    def content_type_method(self):
        """Function used to determine MIME types."""

Usage Examples

Default Client Management

# Example: register one default client per provider; paths created from bare
# URIs then resolve to the matching provider default automatically.
from cloudpathlib import S3Client, GSClient, AzureBlobClient, CloudPath

# Configure default clients for each provider.
# NOTE: literal credentials are for illustration only — in real code load
# them from environment variables or a secrets manager.
s3_client = S3Client(
    aws_access_key_id="your-key",
    aws_secret_access_key="your-secret"
)
s3_client.set_as_default_client()

gs_client = GSClient(
    application_credentials="path/to/service-account.json"
)
gs_client.set_as_default_client()

azure_client = AzureBlobClient(
    connection_string="your-connection-string"
)
azure_client.set_as_default_client()

# Now all paths use the configured default clients
s3_path = CloudPath("s3://my-bucket/file.txt")     # Uses s3_client
gs_path = CloudPath("gs://my-bucket/file.txt")     # Uses gs_client
azure_path = CloudPath("az://my-container/file.txt") # Uses azure_client

# Check which client is being used
print(f"S3 client: {s3_path.client}")
print(f"GS client: {gs_path.client}")

Multiple Client Configurations

# Configure different clients for different environments.
# Example: two clients for the same provider, distinguished by AWS profile
# and cache strategy.
prod_s3_client = S3Client(
    profile_name="production",
    file_cache_mode=FileCacheMode.persistent
)

dev_s3_client = S3Client(
    profile_name="development",
    file_cache_mode=FileCacheMode.tmp_dir
)

# Use specific clients explicitly
prod_path = CloudPath("s3://prod-bucket/data.txt", client=prod_s3_client)
dev_path = CloudPath("s3://dev-bucket/data.txt", client=dev_s3_client)

# Or create paths using client method
# (equivalent to passing client= above — both bind the path to that client)
prod_path = prod_s3_client.CloudPath("s3://prod-bucket/data.txt")
dev_path = dev_s3_client.CloudPath("s3://dev-bucket/data.txt")

Cache Management

from cloudpathlib import FileCacheMode
import tempfile
# NOTE(review): tempfile is imported but never used in this snippet.

# Configure client with persistent cache
# (persistent mode keeps downloaded files in cache_dir across sessions)
cache_dir = "/tmp/cloudpathlib-cache"
client = S3Client(
    file_cache_mode=FileCacheMode.persistent,
    local_cache_dir=cache_dir
)

# Create paths with configured caching
path = CloudPath("s3://my-bucket/large-file.dat", client=client)

# File is cached locally on first access
content = path.read_bytes()  # Downloads and caches
content = path.read_bytes()  # Uses cached version

# Clear cache for specific client
client.clear_cache()

# Clear cache for specific path
path.clear_cache()

Content Type Detection

import mimetypes

def custom_content_type(path):
    """Determine the MIME type for *path*, with project-specific overrides.

    Extensions not covered by ``mimetypes`` (``.parquet``, ``.jsonl``) are
    mapped explicitly, and matching is case-insensitive (so ``.JSONL`` is
    handled too — the previous ``endswith`` checks were case-sensitive).

    Args:
        path: String or path-like object whose suffix determines the type.

    Returns:
        A MIME type string; falls back to ``application/octet-stream`` when
        nothing can be guessed (never returns None).
    """
    name = str(path)
    lowered = name.lower()

    # Custom mappings take priority over mimetypes' guess.
    overrides = (
        (".parquet", "application/octet-stream"),
        (".jsonl", "application/x-jsonlines"),
    )
    for suffix, mime in overrides:
        if lowered.endswith(suffix):
            return mime

    guessed, _ = mimetypes.guess_type(name)
    return guessed or "application/octet-stream"

# Configure client with custom content type detection
client = S3Client(content_type_method=custom_content_type)

# Uploads will use custom MIME type detection
# (the client calls content_type_method for each file it uploads)
path = CloudPath("s3://my-bucket/data.parquet", client=client)
path.upload_from("local_data.parquet")  # Uses custom content type

Client Factory Pattern

class CloudClientFactory:
    """Factory for creating configured cloud clients.

    Each create_* method maps an environment name onto a preconfigured
    client through a builder table; unknown names raise ValueError.
    """

    @staticmethod
    def create_s3_client(environment="production"):
        """Create S3 client for specific environment."""
        builders = {
            "production": lambda: S3Client(
                profile_name="prod",
                file_cache_mode=FileCacheMode.persistent,
                local_cache_dir="/var/cache/cloudpathlib"
            ),
            "development": lambda: S3Client(
                profile_name="dev",
                file_cache_mode=FileCacheMode.tmp_dir
            ),
            "testing": lambda: S3Client(
                no_sign_request=True,  # For public buckets
                file_cache_mode=FileCacheMode.close_file
            ),
        }
        builder = builders.get(environment)
        if builder is None:
            raise ValueError(f"Unknown environment: {environment}")
        return builder()

    @staticmethod
    def create_gs_client(environment="production"):
        """Create GCS client for specific environment."""
        builders = {
            "production": lambda: GSClient(
                project="my-prod-project",
                file_cache_mode=FileCacheMode.persistent
            ),
            "development": lambda: GSClient(
                application_credentials="dev-service-account.json",
                file_cache_mode=FileCacheMode.tmp_dir
            ),
        }
        builder = builders.get(environment)
        if builder is None:
            raise ValueError(f"Unknown environment: {environment}")
        return builder()

# Usage: pick the environment from ENVIRONMENT (defaults to "development").
# NOTE(review): create_gs_client has no "testing" branch, so running with
# ENVIRONMENT=testing raises ValueError on the GCS line below.
import os
env = os.getenv("ENVIRONMENT", "development")

s3_client = CloudClientFactory.create_s3_client(env)
s3_client.set_as_default_client()

gs_client = CloudClientFactory.create_gs_client(env)
gs_client.set_as_default_client()

Configuration from Environment

import os
from cloudpathlib import S3Client, GSClient, FileCacheMode

def configure_clients_from_env():
    """Configure default S3 and GCS clients from environment variables.

    Reads the AWS/GCP credential variables plus the shared cache settings
    (CLOUDPATHLIB_CACHE_MODE, CLOUDPATHLIB_CACHE_DIR), builds one client
    per provider, and registers each as its class's default.

    Returns:
        Tuple of (s3_client, gs_client).
    """
    # Shared cache settings, applied to both providers. Parse the mode once
    # instead of once per provider as before.
    cache_mode = FileCacheMode(os.getenv("CLOUDPATHLIB_CACHE_MODE", "tmp_dir"))
    cache_dir = os.getenv("CLOUDPATHLIB_CACHE_DIR")

    def _base_config():
        # Fresh dict per provider so the two configs never share state.
        config = {"file_cache_mode": cache_mode}
        if cache_dir:
            config["local_cache_dir"] = cache_dir
        return config

    # S3 client configuration: copy only env vars that are set and non-empty.
    # The walrus avoids reading each variable twice.
    s3_config = _base_config()
    for env_var, kwarg in (
        ("AWS_ACCESS_KEY_ID", "aws_access_key_id"),
        ("AWS_SECRET_ACCESS_KEY", "aws_secret_access_key"),
        ("AWS_PROFILE", "profile_name"),
        ("S3_ENDPOINT_URL", "endpoint_url"),
    ):
        if value := os.getenv(env_var):
            s3_config[kwarg] = value

    s3_client = S3Client(**s3_config)
    s3_client.set_as_default_client()

    # GCS client configuration.
    gs_config = _base_config()
    for env_var, kwarg in (
        ("GOOGLE_APPLICATION_CREDENTIALS", "application_credentials"),
        ("GCP_PROJECT", "project"),
    ):
        if value := os.getenv(env_var):
            gs_config[kwarg] = value

    gs_client = GSClient(**gs_config)
    gs_client.set_as_default_client()

    return s3_client, gs_client

# Configure from environment
# (also registers both returned clients as their class defaults)
s3_client, gs_client = configure_clients_from_env()

Client Context Managers

class TemporaryClient:
    """Context manager that temporarily swaps in a default client.

    On entry the given client becomes its class's default; on exit the
    previously registered default is restored.
    """

    def __init__(self, client):
        """
        Args:
            client: Client to register as the temporary default.
        """
        self.client = client
        self.original_default = None  # captured on __enter__

    def __enter__(self):
        # Save whatever default is currently registered for this client
        # class (may be None) before installing the temporary one.
        self.original_default = self.client.__class__.get_default_client()
        self.client.set_as_default_client()
        return self.client

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Restore the saved default. Compare against None explicitly so a
        # falsy-but-valid client object would still be restored (the old
        # truthiness check would have skipped it).
        if self.original_default is not None:
            self.original_default.set_as_default_client()
        # NOTE(review): if there was no prior default, the temporary client
        # stays registered — clearing a default needs provider-specific API
        # not shown here. Returning None means exceptions are not suppressed.

# Usage: temporarily route default-client lookups to another profile
temp_client = S3Client(profile_name="temporary-profile")

with TemporaryClient(temp_client):
    # Inside context, paths use temporary client
    path = CloudPath("s3://temp-bucket/file.txt")
    content = path.read_text()

# Outside context, original default is restored

Client Health Checks

def check_client_connectivity(client, test_uri="s3://test-bucket/"):
    """Check whether *client* can reach its cloud service.

    Performs a directory listing, which exercises both authentication and
    network connectivity.

    Args:
        client: A cloudpathlib-style client exposing CloudPath().
        test_uri: Cloud URI to list. Defaults to the original hard-coded S3
            bucket for backward compatibility; pass a provider-appropriate
            URI (e.g. "gs://...") when checking non-S3 clients.

    Returns:
        Tuple (ok, message): (True, "Connection successful") on success,
        otherwise (False, str(error)).
    """
    try:
        # Listing forces a round trip; an empty listing still succeeds.
        test_path = client.CloudPath(test_uri)
        list(test_path.iterdir())
        return True, "Connection successful"
    except Exception as e:  # broad by design: any failure means "unhealthy"
        return False, str(e)

# Check all configured clients
# NOTE(review): check_client_connectivity lists a hard-coded s3:// URI, so
# the GCS and Azure checks below will report failures unless the function is
# given a provider-appropriate URI.
clients = {
    "S3": S3Client.get_default_client(),
    "GCS": GSClient.get_default_client(),
    "Azure": AzureBlobClient.get_default_client()
}

for name, client in clients.items():
    if client:
        is_healthy, message = check_client_connectivity(client)
        print(f"{name} client: {'✓' if is_healthy else '✗'} {message}")
    else:
        print(f"{name} client: Not configured")

Advanced Cache Configuration

import tempfile
import shutil
from pathlib import Path

class ManagedCacheClient:
    """Client wrapper with advanced cache management.

    Points the wrapped client at a dedicated temporary cache directory and
    evicts the oldest cached files whenever the cache grows beyond
    *max_cache_size_mb*.
    """

    def __init__(self, base_client, max_cache_size_mb=1000):
        """
        Args:
            base_client: Underlying cloudpathlib client to wrap.
            max_cache_size_mb: Soft cache-size budget in megabytes.
        """
        self.base_client = base_client
        self.max_cache_size_mb = max_cache_size_mb
        self.cache_dir = Path(tempfile.mkdtemp(prefix="cloudpath_"))

        # Configure client with managed cache directory
        self.base_client.local_cache_dir = str(self.cache_dir)
        self.base_client.file_cache_mode = FileCacheMode.persistent

    def get_cache_size_mb(self):
        """Get current cache size in MB."""
        total_size = sum(
            f.stat().st_size for f in self.cache_dir.rglob('*') if f.is_file()
        )
        return total_size / (1024 * 1024)

    def cleanup_old_files(self):
        """Remove oldest cached files until the cache fits the budget."""
        current_size = self.get_cache_size_mb()
        if current_size <= self.max_cache_size_mb:
            return

        # Snapshot (path, mtime, size) once. The previous version re-scanned
        # the whole tree after every single deletion, making eviction O(n^2).
        cached_files = [
            (f, f.stat().st_mtime, f.stat().st_size)
            for f in self.cache_dir.rglob('*')
            if f.is_file()
        ]

        # Oldest first (LRU-ish by modification time).
        cached_files.sort(key=lambda item: item[1])

        for file_path, _, size_bytes in cached_files:
            file_path.unlink()
            current_size -= size_bytes / (1024 * 1024)
            if current_size <= self.max_cache_size_mb:
                break

    def CloudPath(self, *args, **kwargs):
        """Create CloudPath, opportunistically evicting old cache entries."""
        self.cleanup_old_files()
        return self.base_client.CloudPath(*args, **kwargs)

    def __del__(self):
        """Best-effort removal of the temporary cache directory.

        NOTE(review): __del__ is not guaranteed to run (e.g. at interpreter
        shutdown); an explicit close() or context manager would be safer.
        """
        # getattr guard: __init__ may have raised before cache_dir was set.
        cache_dir = getattr(self, "cache_dir", None)
        if cache_dir is not None and cache_dir.exists():
            shutil.rmtree(cache_dir, ignore_errors=True)

# Usage: wrap a real client; the wrapper evicts old cache entries before
# each new path is created.
base_s3_client = S3Client(profile_name="default")
managed_client = ManagedCacheClient(base_s3_client, max_cache_size_mb=500)

# Paths automatically benefit from managed caching
path = managed_client.CloudPath("s3://large-data-bucket/dataset.csv")
data = path.read_text()  # Cached with size management

Multi-Region Client Setup

class MultiRegionS3Client:
    """Wrapper for managing S3 clients across multiple regions.

    Holds one S3Client per region (all sharing the same credentials) and
    routes each path to a client chosen from its bucket name.
    """

    def __init__(self, regions, credentials):
        # One client per region, built from the shared credential kwargs.
        self.credentials = credentials
        self.clients = {
            region: S3Client(region_name=region, **credentials)
            for region in regions
        }

    def get_client_for_bucket(self, bucket_name):
        """Get appropriate client for bucket based on region.

        Simplified heuristic: assumes the region name appears inside the
        bucket name. A production version would look up the bucket's real
        region via boto3.
        """
        matched = next(
            (
                client
                for region, client in self.clients.items()
                if region in bucket_name
            ),
            None,
        )
        if matched is not None:
            return matched
        # No region matched — fall back to the first configured client.
        return next(iter(self.clients.values()))

    def CloudPath(self, path_str):
        """Create CloudPath with region-appropriate client."""
        # "s3://bucket/key" splits on "/" into ['s3:', '', 'bucket', ...].
        bucket_name = path_str.split('/')[2]
        return self.get_client_for_bucket(bucket_name).CloudPath(path_str)

# Usage
# One client per region, sharing credentials; bucket names embed the region.
multi_region_client = MultiRegionS3Client(
    regions=["us-east-1", "us-west-2", "eu-west-1"],
    credentials={
        "aws_access_key_id": "your-key",
        "aws_secret_access_key": "your-secret"
    }
)

# Automatically uses appropriate regional client
# (matching is by substring of the bucket name — see get_client_for_bucket)
us_path = multi_region_client.CloudPath("s3://us-east-1-bucket/data.txt")
eu_path = multi_region_client.CloudPath("s3://eu-west-1-bucket/data.txt")

Client Monitoring and Metrics

import time
from collections import defaultdict

class MonitoringClient:
    """Client wrapper that tracks usage metrics.

    Counts paths created and, via record_operation(), accumulates per-
    operation counts and durations that get_metrics() summarizes.
    """

    def __init__(self, base_client):
        self.base_client = base_client
        self.metrics = defaultdict(int)            # counters
        self.operation_times = defaultdict(list)   # durations per operation

    def CloudPath(self, *args, **kwargs):
        """Create monitored CloudPath."""
        self.metrics["paths_created"] += 1
        wrapped = self.base_client.CloudPath(*args, **kwargs)
        return MonitoredCloudPath(wrapped, self)

    def record_operation(self, operation, duration):
        """Record one occurrence of *operation* taking *duration* seconds."""
        self.metrics[f"{operation}_count"] += 1
        self.operation_times[operation].append(duration)

    def get_metrics(self):
        """Return counters plus average/total durations per operation."""
        summary = dict(self.metrics)
        for operation, durations in self.operation_times.items():
            if not durations:
                continue
            total = sum(durations)
            summary[f"{operation}_avg_time"] = total / len(durations)
            summary[f"{operation}_total_time"] = total
        return summary

class MonitoredCloudPath:
    """CloudPath wrapper that tracks operation counts and durations.

    Wraps a CloudPath and reports timing to a monitor via its
    record_operation() hook; any attribute not defined here is delegated to
    the wrapped path.
    """

    def __init__(self, path, monitor):
        self.path = path        # wrapped CloudPath
        self.monitor = monitor  # object exposing record_operation(op, dur)

    def _timed(self, operation, func, *args):
        """Run *func*(*args), recording its duration under *operation*.

        Failures are recorded under "<operation>_error" with the elapsed
        time (the previous version logged 0, hiding slow failures) and
        re-raised unchanged.
        """
        start_time = time.time()
        try:
            result = func(*args)
        except Exception:
            self.monitor.record_operation(f"{operation}_error", time.time() - start_time)
            raise
        self.monitor.record_operation(operation, time.time() - start_time)
        return result

    def read_text(self):
        """Timed wrapper around the underlying read_text()."""
        return self._timed("read_text", self.path.read_text)

    def write_text(self, data):
        """Timed wrapper around the underlying write_text()."""
        return self._timed("write_text", self.path.write_text, data)

    # Delegate all other attribute access to the wrapped path.
    def __getattr__(self, name):
        return getattr(self.path, name)

# Usage
base_client = S3Client()
monitoring_client = MonitoringClient(base_client)

# All operations are monitored
# (each CloudPath() call returns a MonitoredCloudPath wrapper)
path = monitoring_client.CloudPath("s3://my-bucket/file.txt")
path.write_text("Hello, world!")
content = path.read_text()

# Check metrics
metrics = monitoring_client.get_metrics()
print(f"Operations performed: {metrics}")

Install with Tessl CLI

npx tessl i tessl/pypi-cloudpathlib

docs

anypath.md

azure-integration.md

client-management.md

cloud-operations.md

configuration.md

core-operations.md

directory-operations.md

exceptions.md

file-io.md

gcs-integration.md

http-support.md

index.md

patching.md

s3-integration.md

tile.json