CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-cloudpathlib

Pathlib-style classes for cloud storage services that provide seamless access to AWS S3, Google Cloud Storage, and Azure Blob Storage with familiar filesystem operations.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/gcs-integration.md

Google Cloud Storage Integration

Full Google Cloud Storage support with service account authentication, custom retry policies, concurrent downloads, and GCS-specific features. This implementation provides comprehensive access to Google Cloud Storage capabilities through a pathlib-compatible interface.

Capabilities

GSPath Class

GCS-specific path implementation with access to Google Cloud Storage metadata.

class GSPath(CloudPath):
    """Google Cloud Storage path implementation.

    Pathlib-style path for ``gs://`` URIs with GCS-specific object
    metadata accessors.
    """
    
    # NOTE: documentation stubs — property bodies live in the actual
    # cloudpathlib implementation.
    @property
    def bucket(self) -> str:
        """
        GCS bucket name.
        
        Returns:
            Bucket name component of the ``gs://`` URI.
        """
    
    @property
    def blob(self) -> str:
        """
        GCS object name (the path within the bucket, after the bucket name).
        
        Returns:
            Object name string.
        """
    
    @property
    def etag(self) -> str:
        """
        GCS object ETag identifier.
        
        Returns:
            ETag string for the object.
        """
    
    @property
    def md5(self) -> str:
        """
        MD5 hash of the object content.
        
        Returns:
            MD5 hash string.
        """

GSClient Class

Google Cloud Storage client with comprehensive authentication and configuration options.

class GSClient:
    """Google Cloud Storage client.

    Supplies authentication, caching, and transfer configuration for
    GSPath operations. Provide at most one credential source:
    ``application_credentials``, ``credentials``, ``storage_client``, or
    none to use Application Default Credentials.
    """
    
    # NOTE: documentation stub — the method body lives in the actual
    # cloudpathlib implementation. Annotations are quoted unions because
    # these parameters all default to None (PEP 484 implicit Optional is
    # deprecated).
    def __init__(
        self,
        application_credentials: "str | None" = None,
        credentials = None,
        project: "str | None" = None,
        storage_client = None,
        file_cache_mode: "FileCacheMode | None" = None,
        local_cache_dir: "str | None" = None,
        content_type_method = None,
        download_chunks_concurrently_kwargs: "dict | None" = None,
        timeout: "float | None" = None,
        retry = None
    ):
        """
        Initialize GCS client.
        
        Args:
            application_credentials: Path to a service account JSON key file.
            credentials: Pre-built google.auth credentials object.
            project: GCP project ID.
            storage_client: Custom google.cloud.storage.Client instance.
            file_cache_mode: Cache management strategy.
            local_cache_dir: Local directory for the file cache.
            content_type_method: Function used to determine MIME types.
            download_chunks_concurrently_kwargs: Concurrent download settings.
            timeout: Request timeout in seconds.
            retry: google.api_core retry policy for failed requests.
        """

Usage Examples

Basic GCS Operations

from cloudpathlib import GSPath, GSClient

# Create GCS path (uses default client)
gs_path = GSPath("gs://my-bucket/data/file.txt")

# Access GCS-specific properties (parsed from the gs:// URI)
print(f"Bucket: {gs_path.bucket}")     # "my-bucket"
print(f"Blob: {gs_path.blob}")         # "data/file.txt"

# Check if object exists and get metadata
# NOTE(review): exists()/etag/md5 presumably issue GCS API calls — avoid
# calling them in tight loops.
if gs_path.exists():
    print(f"ETag: {gs_path.etag}")
    print(f"MD5: {gs_path.md5}")

Service Account Authentication

# Use service account key file
client = GSClient(application_credentials="path/to/service-account.json")
# Make this client the process-wide default for new GSPath instances
client.set_as_default_client()

# Create paths using service account
gs_path = GSPath("gs://my-bucket/data.json")
content = gs_path.read_text()

Credentials Object Authentication

from google.oauth2 import service_account

# Load credentials from service account
credentials = service_account.Credentials.from_service_account_file(
    "service-account.json",
    # cloud-platform scope covers all GCP APIs the account is authorized for
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

client = GSClient(
    credentials=credentials,
    project="my-gcp-project"
)

# Passing client= explicitly avoids relying on the process-wide default
gs_path = GSPath("gs://my-bucket/file.txt", client=client)

Application Default Credentials

# Use Application Default Credentials (ADC)
# Works when running on GCE, Cloud Run, or after
# `gcloud auth application-default login` on a workstation
client = GSClient(project="my-gcp-project")

# ADC automatically handles authentication
gs_path = GSPath("gs://my-bucket/data.csv", client=client)
data = gs_path.read_text()

Custom Storage Client

from google.cloud import storage

# Create custom storage client with specific settings
# (assumes `credentials` was created earlier, e.g. via
# service_account.Credentials.from_service_account_file)
storage_client = storage.Client(
    project="my-project",
    credentials=credentials
)

client = GSClient(storage_client=storage_client)

# Use custom client
gs_path = GSPath("gs://my-bucket/file.txt", client=client)

Concurrent Downloads

# Configure concurrent download settings
# Configure concurrent download settings
# NOTE(review): these kwargs are presumably forwarded to
# google-cloud-storage's transfer manager — confirm supported keys
# against the installed library version.
client = GSClient(
    download_chunks_concurrently_kwargs={
        "max_workers": 8,
        "chunk_size": 1024 * 1024  # 1MB chunks
    }
)

# Download large file with concurrent chunks
large_file = GSPath("gs://my-bucket/large-dataset.zip", client=client)
large_file.download_to("local-dataset.zip")

Timeout and Retry Configuration

from google.api_core import retry
import google.api_core.exceptions

# Configure custom retry policy: exponential backoff starting at 1s,
# doubling each attempt, capped at 10s between attempts.
custom_retry = retry.Retry(
    initial=1.0,
    maximum=10.0,
    multiplier=2.0,
    # Only retry on transient server-side conditions
    predicate=retry.if_exception_type(
        google.api_core.exceptions.ServiceUnavailable,
        google.api_core.exceptions.TooManyRequests
    )
)

client = GSClient(
    timeout=60.0,          # 60 second timeout
    retry=custom_retry     # Custom retry policy
)

# Operations use configured timeout and retry
gs_path = GSPath("gs://my-bucket/important.txt", client=client)

Storage Classes

# Upload with specific storage class
def upload_with_storage_class(local_path, gs_path, storage_class):
    """Upload a local file to *gs_path*, tagging it with *storage_class*.

    cloudpathlib does not expose storage classes directly, so the upload
    goes through the underlying google-cloud-storage blob object.
    """
    bucket_obj = gs_path.client.storage_client.bucket(gs_path.bucket)
    target_blob = bucket_obj.blob(gs_path.blob)
    target_blob.storage_class = storage_class  # applied at upload time

    with open(local_path, "rb") as source:
        target_blob.upload_from_file(source)

# Usage examples
gs_path = GSPath("gs://my-bucket/archive.zip")
upload_with_storage_class("data.zip", gs_path, "COLDLINE")

# Standard GCS storage classes, ordered from hot to cold access tiers
storage_classes = ["STANDARD", "NEARLINE", "COLDLINE", "ARCHIVE"]

Lifecycle Management

# Work with object lifecycle
def archive_old_files(bucket_name, days_old=365):
    """Move objects older than *days_old* days to the ARCHIVE storage class.

    Args:
        bucket_name: Name of the GCS bucket to scan.
        days_old: Minimum age in days for an object to be archived.
    """
    from datetime import datetime, timedelta

    cutoff_date = datetime.now() - timedelta(days=days_old)
    bucket_path = GSPath(f"gs://{bucket_name}/")

    for gs_file in bucket_path.rglob("*"):
        if not gs_file.is_file():
            continue
        stats = gs_file.stat()
        # NOTE(review): st_mtime is interpreted in local time here —
        # confirm it matches the timezone convention of the stat values.
        if datetime.fromtimestamp(stats.st_mtime) < cutoff_date:
            blob = gs_file.client.storage_client.bucket(gs_file.bucket).blob(gs_file.blob)
            # BUG FIX: setting blob.storage_class and calling patch() does
            # not change an object's storage class — GCS requires an object
            # rewrite, which update_storage_class() performs.
            blob.update_storage_class("ARCHIVE")
            print(f"Archived: {gs_file}")

# Usage: archive everything older than one year (the default)
archive_old_files("my-backup-bucket")

Signed URLs

from datetime import datetime, timedelta

# Generate signed URLs for temporary access
# NOTE(review): URL signing requires signing-capable credentials (e.g. a
# service account key) — confirm for the deployed environment.
gs_path = GSPath("gs://private-bucket/confidential.pdf")

# Generate download URL (valid for 1 hour)
download_url = gs_path.as_url(presign=True, expire_seconds=3600)
print(f"Download URL: {download_url}")

# Generate upload URL
upload_path = GSPath("gs://uploads-bucket/new-file.txt")
upload_url = upload_path.as_url(presign=True, expire_seconds=1800)  # 30 minutes

Metadata Operations

# Access and modify object metadata
def set_custom_metadata(gs_path, metadata_dict):
    """Attach custom key/value metadata to the object at *gs_path*."""
    bucket_obj = gs_path.client.storage_client.bucket(gs_path.bucket)
    target = bucket_obj.blob(gs_path.blob)
    target.metadata = metadata_dict
    target.patch()  # push only the changed fields to the service

def get_custom_metadata(gs_path):
    """Return the custom metadata dict stored on the object at *gs_path*."""
    bucket_obj = gs_path.client.storage_client.bucket(gs_path.bucket)
    target = bucket_obj.blob(gs_path.blob)
    target.reload()  # refresh properties from the service
    return target.metadata

# Usage
gs_path = GSPath("gs://my-bucket/document.pdf")

# Set metadata (custom metadata keys and values are free-form strings)
set_custom_metadata(gs_path, {
    "author": "Data Team",
    "project": "Analytics",
    "version": "1.0"
})

# Read metadata
metadata = get_custom_metadata(gs_path)
print(f"Metadata: {metadata}")

Batch Operations

import concurrent.futures
from pathlib import Path

def upload_file_parallel(local_path, gs_base):
    """Upload one local file under *gs_base* and return the cloud path."""
    destination = gs_base / local_path.name
    destination.upload_from(local_path)
    return destination

# Parallel upload of multiple files
local_files = list(Path("data/").glob("*.json"))
gs_base = GSPath("gs://my-bucket/json-data/")

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(upload_file_parallel, f, gs_base) for f in local_files]
    
    # as_completed yields futures in completion order, not submission order
    for future in concurrent.futures.as_completed(futures):
        try:
            gs_path = future.result()
            print(f"Uploaded: {gs_path}")
        except Exception as e:
            # One failed upload should not abort the rest of the batch
            print(f"Upload failed: {e}")

Object Versioning

# Work with object versions (requires versioned bucket)
def list_object_versions(gs_path):
    """Return metadata for every generation of the object at *gs_path*.

    Requires object versioning to be enabled on the bucket. Results are
    ordered newest-first by creation time.
    """
    bucket = gs_path.client.storage_client.bucket(gs_path.bucket)

    versions = [
        {
            "generation": blob.generation,
            "time_created": blob.time_created,
            "size": blob.size,
            "etag": blob.etag,
        }
        # list_blobs uses a prefix match, so filter to exact name matches
        for blob in bucket.list_blobs(prefix=gs_path.blob, versions=True)
        if blob.name == gs_path.blob
    ]

    versions.sort(key=lambda v: v["time_created"], reverse=True)
    return versions

# Usage (bucket must have object versioning enabled)
gs_path = GSPath("gs://versioned-bucket/important.txt")
versions = list_object_versions(gs_path)
for version in versions:
    print(f"Generation {version['generation']}: {version['time_created']}")

Cross-Project Operations

# Work with buckets in different projects
# Work with buckets in different projects, each with its own credentials
project_a_client = GSClient(
    project="project-a",
    application_credentials="project-a-credentials.json"
)

project_b_client = GSClient(
    project="project-b", 
    application_credentials="project-b-credentials.json"
)

# Copy between projects
source = GSPath("gs://project-a-bucket/data.txt", client=project_a_client)
destination = GSPath("gs://project-b-bucket/data.txt", client=project_b_client)

# NOTE(review): a cross-client copy presumably streams through this
# machine rather than doing a server-side rewrite — verify if bandwidth
# matters.
source.copy(destination)

Streaming Operations

# Stream large files without downloading entirely
def process_large_csv(gs_path):
    """Stream-process a CSV object row by row without a full download."""
    import csv

    with gs_path.open('r') as stream:
        for row_num, row in enumerate(csv.DictReader(stream)):
            process_row(row)

            # Progress heartbeat every 10k rows (also fires on row 0)
            if row_num % 10000 == 0:
                print(f"Processed {row_num} rows")

# Usage: rows are processed as they stream from GCS
large_csv = GSPath("gs://data-bucket/huge-dataset.csv")
process_large_csv(large_csv)

IAM and Permissions

# Check object permissions (requires direct client usage)
def check_object_permissions(gs_path, permissions):
    """Return the subset of *permissions* granted to the current credentials.

    Falls back to an empty list when the IAM check itself fails.
    """
    storage_client = gs_path.client.storage_client
    blob = storage_client.bucket(gs_path.bucket).blob(gs_path.blob)

    try:
        return blob.test_iam_permissions(permissions)
    except Exception as e:  # broad by design: report and degrade gracefully
        print(f"Permission check failed: {e}")
        return []

# Usage
gs_path = GSPath("gs://my-bucket/file.txt")
permissions = ["storage.objects.get", "storage.objects.delete"]
allowed = check_object_permissions(gs_path, permissions)
print(f"Allowed permissions: {allowed}")

Error Handling

from cloudpathlib import (
    CloudPathFileNotFoundError,
    MissingCredentialsError
)
from google.api_core import exceptions
import google.auth.exceptions

try:
    gs_path = GSPath("gs://nonexistent-bucket/file.txt")
    content = gs_path.read_text()
except CloudPathFileNotFoundError:
    # Raised by cloudpathlib when the object does not exist
    print("GCS object not found")
except google.auth.exceptions.DefaultCredentialsError:
    # No ADC / service-account credentials could be resolved
    print("GCP credentials not configured")
except exceptions.PermissionDenied:
    # Credentials resolved but lack access to the bucket/object
    print("Access denied")
except exceptions.GoogleAPIError as e:
    # Catch-all for other GCP API failures; kept last so the more
    # specific handlers above win
    print(f"GCP API error: {e}")

Performance Optimization

# Optimize for large file operations
# Optimize for large file operations
client = GSClient(
    download_chunks_concurrently_kwargs={
        "max_workers": 16,           # More concurrent workers
        "chunk_size": 8 * 1024 * 1024  # 8MB chunks
    },
    timeout=300.0,  # 5 minute timeout for large operations
)

# Configure client for high-throughput operations
gs_path = GSPath("gs://big-data-bucket/huge-file.dat", client=client)

# Performance monitoring (wall-clock timing; prefer time.perf_counter()
# for finer-grained measurements)
import time
start_time = time.time()
gs_path.download_to("local-huge-file.dat")
duration = time.time() - start_time
print(f"Download completed in {duration:.2f} seconds")

Install with Tessl CLI

npx tessl i tessl/pypi-cloudpathlib

docs

anypath.md

azure-integration.md

client-management.md

cloud-operations.md

configuration.md

core-operations.md

directory-operations.md

exceptions.md

file-io.md

gcs-integration.md

http-support.md

index.md

patching.md

s3-integration.md

tile.json