Pathlib-style classes for cloud storage services that provide seamless access to AWS S3, Google Cloud Storage, and Azure Blob Storage with familiar filesystem operations.
—
Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run.
Full Google Cloud Storage support with service account authentication, custom retry policies, concurrent downloads, and GCS-specific features. This implementation provides comprehensive access to Google Cloud Storage capabilities through a pathlib-compatible interface.
GCS-specific path implementation with access to Google Cloud Storage metadata.
class GSPath(CloudPath):
    """Google Cloud Storage path implementation.

    Pathlib-style path for ``gs://`` URIs. Extends ``CloudPath`` with
    read-only properties exposing GCS-specific object metadata.
    """

    @property
    def bucket(self) -> str:
        """
        GCS bucket name.

        Returns:
            Bucket name from the GCS URI
        """

    @property
    def blob(self) -> str:
        """
        GCS object name (path within bucket).

        Returns:
            Object name string
        """

    @property
    def etag(self) -> str:
        """
        GCS object ETag identifier.

        Returns:
            ETag string for the object
        """

    @property
    def md5(self) -> str:
        """
        MD5 hash of the object content.

        Returns:
            MD5 hash string
        """

Google Cloud Storage client with comprehensive authentication and configuration options.
class GSClient:
    """Google Cloud Storage client.

    Holds authentication, caching, and transfer configuration used by
    ``GSPath`` instances bound to it.
    """

    def __init__(
        self,
        application_credentials: str = None,
        credentials=None,
        project: str = None,
        storage_client=None,
        file_cache_mode: FileCacheMode = None,
        local_cache_dir: str = None,
        content_type_method=None,
        download_chunks_concurrently_kwargs: dict = None,
        timeout: float = None,
        retry=None
    ):
        """
        Initialize GCS client.

        Args:
            application_credentials: Path to service account JSON file
            credentials: Google auth credentials object
            project: GCP project ID
            storage_client: Custom google.cloud.storage.Client instance
            file_cache_mode: Cache management strategy
            local_cache_dir: Local directory for file cache
            content_type_method: Function to determine MIME types
            download_chunks_concurrently_kwargs: Concurrent download settings
            timeout: Request timeout in seconds
            retry: Retry policy for failed requests
        """

from cloudpathlib import GSPath, GSClient
# Create GCS path (uses default client)
gs_path = GSPath("gs://my-bucket/data/file.txt")

# Access GCS-specific properties
print(f"Bucket: {gs_path.bucket}")  # "my-bucket"
print(f"Blob: {gs_path.blob}")  # "data/file.txt"

# Check if object exists and get metadata
if gs_path.exists():
    print(f"ETag: {gs_path.etag}")
    print(f"MD5: {gs_path.md5}")

# Use service account key file
client = GSClient(application_credentials="path/to/service-account.json")
client.set_as_default_client()

# Create paths using service account (default client applies from here on)
gs_path = GSPath("gs://my-bucket/data.json")
content = gs_path.read_text()

from google.oauth2 import service_account
# Load credentials from service account
credentials = service_account.Credentials.from_service_account_file(
    "service-account.json",
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
client = GSClient(
    credentials=credentials,
    project="my-gcp-project"
)
# Bind the path to this explicit client rather than the default one
gs_path = GSPath("gs://my-bucket/file.txt", client=client)

# Use Application Default Credentials (ADC)
# Works when running on GCE, Cloud Run, or with gcloud auth
client = GSClient(project="my-gcp-project")

# ADC automatically handles authentication
gs_path = GSPath("gs://my-bucket/data.csv", client=client)
data = gs_path.read_text()

from google.cloud import storage
# Create custom storage client with specific settings
storage_client = storage.Client(
    project="my-project",
    credentials=credentials
)
# Wrap a pre-built google.cloud.storage.Client instead of having
# GSClient construct one internally
client = GSClient(storage_client=storage_client)

# Use custom client
gs_path = GSPath("gs://my-bucket/file.txt", client=client)

# Configure concurrent download settings
client = GSClient(
    download_chunks_concurrently_kwargs={
        "max_workers": 8,
        "chunk_size": 1024 * 1024  # 1MB chunks
    }
)

# Download large file with concurrent chunks
large_file = GSPath("gs://my-bucket/large-dataset.zip", client=client)
large_file.download_to("local-dataset.zip")

from google.api_core import retry
import google.api_core.exceptions

# Configure custom retry policy: exponential backoff (1s -> 10s, x2),
# retrying only on transient server-side errors
custom_retry = retry.Retry(
    initial=1.0,
    maximum=10.0,
    multiplier=2.0,
    predicate=retry.if_exception_type(
        google.api_core.exceptions.ServiceUnavailable,
        google.api_core.exceptions.TooManyRequests
    )
)
client = GSClient(
    timeout=60.0,  # 60 second timeout
    retry=custom_retry  # Custom retry policy
)

# Operations use configured timeout and retry
gs_path = GSPath("gs://my-bucket/important.txt", client=client)

# Upload with specific storage class
def upload_with_storage_class(local_path, gs_path, storage_class):
    """Upload a local file to *gs_path* with the given GCS storage class.

    Storage class cannot be set through the pathlib-style API, so this
    drops down to the underlying google-cloud-storage blob object.
    """
    bucket = gs_path.client.storage_client.bucket(gs_path.bucket)
    target_blob = bucket.blob(gs_path.blob)
    target_blob.storage_class = storage_class
    with open(local_path, 'rb') as source:
        target_blob.upload_from_file(source)


# Usage examples
gs_path = GSPath("gs://my-bucket/archive.zip")
upload_with_storage_class("data.zip", gs_path, "COLDLINE")

# Different storage classes
storage_classes = ["STANDARD", "NEARLINE", "COLDLINE", "ARCHIVE"]

# Work with object lifecycle
def archive_old_files(bucket_name, days_old=365):
    """Move objects older than *days_old* days to the ARCHIVE storage class.

    Walks every object under ``gs://{bucket_name}/`` and patches the
    storage class of those whose modification time predates the cutoff.
    """
    from datetime import datetime, timedelta

    cutoff = datetime.now() - timedelta(days=days_old)
    root = GSPath(f"gs://{bucket_name}/")
    for entry in root.rglob("*"):
        if not entry.is_file():
            continue
        if datetime.fromtimestamp(entry.stat().st_mtime) >= cutoff:
            continue
        # Storage class changes require the underlying blob API.
        blob = entry.client.storage_client.bucket(entry.bucket).blob(entry.blob)
        blob.storage_class = "ARCHIVE"
        blob.patch()
        print(f"Archived: {entry}")


# Usage
archive_old_files("my-backup-bucket")

from datetime import datetime, timedelta
# Generate signed URLs for temporary access
gs_path = GSPath("gs://private-bucket/confidential.pdf")

# Generate download URL (valid for 1 hour)
download_url = gs_path.as_url(presign=True, expire_seconds=3600)
print(f"Download URL: {download_url}")

# Generate upload URL
upload_path = GSPath("gs://uploads-bucket/new-file.txt")
upload_url = upload_path.as_url(presign=True, expire_seconds=1800)  # 30 minutes

# Access and modify object metadata
def set_custom_metadata(gs_path, metadata_dict):
    """Attach *metadata_dict* as custom metadata to the object at *gs_path*."""
    storage_client = gs_path.client.storage_client
    target = storage_client.bucket(gs_path.bucket).blob(gs_path.blob)
    target.metadata = metadata_dict
    target.patch()


def get_custom_metadata(gs_path):
    """Return the custom metadata dict stored on the object at *gs_path*."""
    storage_client = gs_path.client.storage_client
    target = storage_client.bucket(gs_path.bucket).blob(gs_path.blob)
    target.reload()  # refresh from the server before reading
    return target.metadata


# Usage
gs_path = GSPath("gs://my-bucket/document.pdf")

# Set metadata
set_custom_metadata(gs_path, {
    "author": "Data Team",
    "project": "Analytics",
    "version": "1.0"
})

# Read metadata
metadata = get_custom_metadata(gs_path)
print(f"Metadata: {metadata}")

import concurrent.futures
from pathlib import Path


def upload_file_parallel(local_path, gs_base):
    """Upload one local file into *gs_base* and return the resulting GSPath."""
    destination = gs_base / local_path.name
    destination.upload_from(local_path)
    return destination


# Parallel upload of multiple files
local_files = list(Path("data/").glob("*.json"))
gs_base = GSPath("gs://my-bucket/json-data/")

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = [
        executor.submit(upload_file_parallel, source, gs_base)
        for source in local_files
    ]
    # Report each upload as it finishes, without aborting the batch on error
    for future in concurrent.futures.as_completed(futures):
        try:
            uploaded = future.result()
            print(f"Uploaded: {uploaded}")
        except Exception as e:
            print(f"Upload failed: {e}")

# Work with object versions (requires versioned bucket)
def list_object_versions(gs_path):
    """Return metadata for every stored version of *gs_path*, newest first."""
    bucket = gs_path.client.storage_client.bucket(gs_path.bucket)
    # The prefix listing may match other objects; keep exact name matches only.
    versions = [
        {
            "generation": blob.generation,
            "time_created": blob.time_created,
            "size": blob.size,
            "etag": blob.etag
        }
        for blob in bucket.list_blobs(prefix=gs_path.blob, versions=True)
        if blob.name == gs_path.blob
    ]
    versions.sort(key=lambda v: v["time_created"], reverse=True)
    return versions


# Usage
gs_path = GSPath("gs://versioned-bucket/important.txt")
versions = list_object_versions(gs_path)
for version in versions:
    print(f"Generation {version['generation']}: {version['time_created']}")

# Work with buckets in different projects
# One client per project, each with its own credentials
project_a_client = GSClient(
    project="project-a",
    application_credentials="project-a-credentials.json"
)
project_b_client = GSClient(
    project="project-b",
    application_credentials="project-b-credentials.json"
)

# Copy between projects
source = GSPath("gs://project-a-bucket/data.txt", client=project_a_client)
destination = GSPath("gs://project-b-bucket/data.txt", client=project_b_client)
source.copy(destination)

# Stream large files without downloading entirely
def process_large_csv(gs_path):
    """Stream a large CSV from GCS row by row without a full download."""
    import csv

    with gs_path.open('r') as handle:
        for row_num, row in enumerate(csv.DictReader(handle)):
            process_row(row)
            # Progress heartbeat every 10k rows
            if row_num % 10000 == 0:
                print(f"Processed {row_num} rows")


# Usage
large_csv = GSPath("gs://data-bucket/huge-dataset.csv")
process_large_csv(large_csv)

# Check object permissions (requires direct client usage)
def check_object_permissions(gs_path, permissions):
    """Return the subset of *permissions* granted to the current credentials.

    Best-effort: if the IAM test call itself fails, the error is printed
    and an empty list is returned instead of raising.
    """
    storage_client = gs_path.client.storage_client
    bucket = storage_client.bucket(gs_path.bucket)
    blob = bucket.blob(gs_path.blob)
    try:
        return blob.test_iam_permissions(permissions)
    except Exception as e:
        print(f"Permission check failed: {e}")
        return []


# Usage
gs_path = GSPath("gs://my-bucket/file.txt")
permissions = ["storage.objects.get", "storage.objects.delete"]
allowed = check_object_permissions(gs_path, permissions)
print(f"Allowed permissions: {allowed}")

from cloudpathlib import (
    CloudPathFileNotFoundError,
    MissingCredentialsError
)
from google.api_core import exceptions
import google.auth.exceptions

# Handle the common failure modes in order of specificity
try:
    gs_path = GSPath("gs://nonexistent-bucket/file.txt")
    content = gs_path.read_text()
except CloudPathFileNotFoundError:
    print("GCS object not found")
except google.auth.exceptions.DefaultCredentialsError:
    print("GCP credentials not configured")
except exceptions.PermissionDenied:
    print("Access denied")
except exceptions.GoogleAPIError as e:
    print(f"GCP API error: {e}")

# Optimize for large file operations
client = GSClient(
    download_chunks_concurrently_kwargs={
        "max_workers": 16,  # More concurrent workers
        "chunk_size": 8 * 1024 * 1024  # 8MB chunks
    },
    timeout=300.0,  # 5 minute timeout for large operations
)

# Configure client for high-throughput operations
gs_path = GSPath("gs://big-data-bucket/huge-file.dat", client=client)

# Performance monitoring
import time

start_time = time.time()
gs_path.download_to("local-huge-file.dat")
duration = time.time() - start_time
print(f"Download completed in {duration:.2f} seconds")

Install with Tessl CLI
npx tessl i tessl/pypi-cloudpathlib