Python asyncio client library for Google Cloud Storage with full CRUD operations and streaming support
—
The Storage class is the main entry point for interacting with Google Cloud Storage. It handles authentication, session management, and provides direct access to all storage operations including bucket management, object manipulation, and metadata operations.
Create a Storage client with optional authentication and configuration parameters.
def __init__(self, *, service_file=None, token=None, session=None, api_root=None):
"""
Initialize Storage client.
Parameters:
- service_file (str, optional): Path to service account JSON file
- token (Token, optional): Pre-configured authentication token
- session (aiohttp.ClientSession, optional): Custom HTTP session
- api_root (str, optional): Custom API root URL for testing/emulators
"""Usage Example:
# Default initialization (uses Application Default Credentials)
storage = Storage()
# With service account file
storage = Storage(service_file='/path/to/service-account.json')
# For testing with emulator
storage = Storage(api_root='http://localhost:8080')

The Storage client supports the async context manager protocol for automatic session cleanup.
async def __aenter__(self):
"""Enter async context manager, returns self."""
async def __aexit__(self, *args):
"""Exit async context manager, closes session."""
async def close(self):
"""Manually close the session."""Usage Example:
# Recommended: automatic cleanup
async with Storage() as storage:
# Use storage client
pass
# Manual cleanup when needed
storage = Storage()
try:
# Use storage client
pass
finally:
await storage.close()

Operations for listing and managing buckets at the project level.
async def list_buckets(self, project, *, params=None, headers=None, session=None, timeout=10):
"""
List all buckets in a project.
Parameters:
- project (str): Google Cloud project ID
- params (dict, optional): Additional query parameters
- headers (dict, optional): Custom HTTP headers
- session (aiohttp.ClientSession, optional): Custom session
- timeout (int): Request timeout in seconds
Returns:
List[Bucket]: List of bucket instances
"""
def get_bucket(self, bucket_name):
"""
Get a bucket instance for operations.
Parameters:
- bucket_name (str): Name of the bucket
Returns:
Bucket: Bucket instance for further operations
"""
async def get_bucket_metadata(self, bucket, *, params=None, headers=None, session=None, timeout=10):
"""
Get metadata for a specific bucket.
Parameters:
- bucket (str): Bucket name
- params (dict, optional): Additional query parameters
- headers (dict, optional): Custom HTTP headers
- session (aiohttp.ClientSession, optional): Custom session
- timeout (int): Request timeout in seconds
Returns:
Dict[str, Any]: Bucket metadata
"""Usage Example:
async with Storage() as storage:
# List all buckets in project
buckets = await storage.list_buckets('my-project-id')
# Get bucket instance
bucket = storage.get_bucket('my-bucket')
# Get bucket metadata
metadata = await storage.get_bucket_metadata('my-bucket')

Download objects from Cloud Storage with various options for streaming, metadata-only retrieval, and file output.
async def download(self, bucket, object_name, *, headers=None, timeout=10, session=None):
"""
Download object content as bytes.
Parameters:
- bucket (str): Bucket name
- object_name (str): Object name/path
- headers (dict, optional): Custom HTTP headers for range requests, etc.
- timeout (int): Request timeout in seconds
- session (aiohttp.ClientSession, optional): Custom session
Returns:
bytes: Object content
"""
async def download_to_filename(self, bucket, object_name, filename, **kwargs):
"""
Download object directly to a local file.
Parameters:
- bucket (str): Bucket name
- object_name (str): Object name/path
- filename (str): Local file path to write to
- **kwargs: Additional arguments passed to download()
Returns:
None
"""
async def download_metadata(self, bucket, object_name, *, headers=None, session=None, timeout=10):
"""
Get object metadata without downloading content.
Parameters:
- bucket (str): Bucket name
- object_name (str): Object name/path
- headers (dict, optional): Custom HTTP headers
- session (aiohttp.ClientSession, optional): Custom session
- timeout (int): Request timeout in seconds
Returns:
Dict[str, Any]: Object metadata
"""
async def download_stream(self, bucket, object_name, *, headers=None, timeout=10, session=None):
"""
Download object as a stream for large files.
Parameters:
- bucket (str): Bucket name
- object_name (str): Object name/path
- headers (dict, optional): Custom HTTP headers for range requests
- timeout (int): Request timeout in seconds
- session (aiohttp.ClientSession, optional): Custom session
Returns:
StreamResponse: Async stream for reading content
"""Usage Example:
async with Storage() as storage:
# Download entire file
content = await storage.download('my-bucket', 'file.txt')
# Download to local file
await storage.download_to_filename('my-bucket', 'large-file.zip', '/tmp/download.zip')
# Get metadata only
metadata = await storage.download_metadata('my-bucket', 'file.txt')
print(f"File size: {metadata['size']} bytes")
# Stream large file
async with storage.download_stream('my-bucket', 'huge-file.dat') as stream:
while True:
chunk = await stream.read(8192)
if not chunk:
break
# Process chunk

Upload objects to Cloud Storage with support for different upload methods, compression, and metadata.
async def upload(self, bucket, object_name, file_data, *, content_type=None, parameters=None, headers=None, metadata=None, session=None, force_resumable_upload=None, zipped=False, timeout=30):
"""
Upload data to an object.
Parameters:
- bucket (str): Bucket name
- object_name (str): Object name/path
- file_data (bytes or file-like): Data to upload
- content_type (str, optional): MIME type of the content
- parameters (dict, optional): Additional upload parameters
- headers (dict, optional): Custom HTTP headers
- metadata (dict, optional): Object metadata key-value pairs
- session (aiohttp.ClientSession, optional): Custom session
- force_resumable_upload (bool, optional): Force resumable upload method
- zipped (bool): Whether to gzip compress the data
- timeout (int): Request timeout in seconds
Returns:
Dict[str, Any]: Upload response metadata
"""
async def upload_from_filename(self, bucket, object_name, filename, **kwargs):
"""
Upload a local file to an object.
Parameters:
- bucket (str): Bucket name
- object_name (str): Object name/path
- filename (str): Local file path to upload
- **kwargs: Additional arguments passed to upload()
Returns:
Dict[str, Any]: Upload response metadata
"""Usage Example:
async with Storage() as storage:
# Upload bytes data
data = b"Hello, Cloud Storage!"
result = await storage.upload('my-bucket', 'greeting.txt', data,
content_type='text/plain')
# Upload with metadata
metadata = {'author': 'user', 'version': '1.0'}
result = await storage.upload('my-bucket', 'data.json', json_data,
content_type='application/json',
metadata=metadata)
# Upload from local file
result = await storage.upload_from_filename('my-bucket', 'backup.zip',
'/path/to/backup.zip')
# Upload with compression
result = await storage.upload('my-bucket', 'large-text.txt', text_data,
zipped=True, content_type='text/plain')

Additional operations for managing objects including copying, deleting, listing, and metadata updates.
async def copy(self, bucket, object_name, destination_bucket, *, new_name=None, metadata=None, params=None, headers=None, timeout=10, session=None):
"""
Copy an object to another location.
Parameters:
- bucket (str): Source bucket name
- object_name (str): Source object name/path
- destination_bucket (str): Destination bucket name
- new_name (str, optional): New object name, defaults to original name
- metadata (dict, optional): New metadata for copied object
- params (dict, optional): Additional query parameters
- headers (dict, optional): Custom HTTP headers
- timeout (int): Request timeout in seconds
- session (aiohttp.ClientSession, optional): Custom session
Returns:
Dict[str, Any]: Copy operation response
"""
async def delete(self, bucket, object_name, *, timeout=10, params=None, headers=None, session=None):
"""
Delete an object.
Parameters:
- bucket (str): Bucket name
- object_name (str): Object name/path
- timeout (int): Request timeout in seconds
- params (dict, optional): Additional query parameters
- headers (dict, optional): Custom HTTP headers
- session (aiohttp.ClientSession, optional): Custom session
Returns:
str: Deletion response
"""
async def list_objects(self, bucket, *, params=None, headers=None, session=None, timeout=10):
"""
List objects in a bucket.
Parameters:
- bucket (str): Bucket name
- params (dict, optional): Query parameters (prefix, delimiter, maxResults, etc.)
- headers (dict, optional): Custom HTTP headers
- session (aiohttp.ClientSession, optional): Custom session
- timeout (int): Request timeout in seconds
Returns:
Dict[str, Any]: List response with objects and metadata
"""
async def patch_metadata(self, bucket, object_name, metadata, *, params=None, headers=None, session=None, timeout=10):
"""
Update object metadata.
Parameters:
- bucket (str): Bucket name
- object_name (str): Object name/path
- metadata (dict): Metadata key-value pairs to update
- params (dict, optional): Additional query parameters
- headers (dict, optional): Custom HTTP headers
- session (aiohttp.ClientSession, optional): Custom session
- timeout (int): Request timeout in seconds
Returns:
Dict[str, Any]: Updated object metadata
"""
async def compose(self, bucket, object_name, source_object_names, *, content_type=None, params=None, headers=None, session=None, timeout=10):
"""
Compose multiple objects into a single object.
Parameters:
- bucket (str): Bucket name
- object_name (str): Name for the composed object
- source_object_names (List[str]): List of source object names to compose
- content_type (str, optional): MIME type for composed object
- params (dict, optional): Additional query parameters
- headers (dict, optional): Custom HTTP headers
- session (aiohttp.ClientSession, optional): Custom session
- timeout (int): Request timeout in seconds
Returns:
Dict[str, Any]: Compose operation response
"""Usage Example:
async with Storage() as storage:
# Copy object to another bucket
await storage.copy('source-bucket', 'file.txt', 'dest-bucket',
new_name='copied-file.txt')
# Delete object
await storage.delete('my-bucket', 'old-file.txt')
# List objects with prefix
params = {'prefix': 'logs/', 'maxResults': 100}
result = await storage.list_objects('my-bucket', params=params)
# Update metadata
metadata = {'status': 'processed', 'last_modified': '2023-01-01'}
await storage.patch_metadata('my-bucket', 'data.json', metadata)
# Compose multiple objects
parts = ['part1.txt', 'part2.txt', 'part3.txt']
await storage.compose('my-bucket', 'combined.txt', parts,
content_type='text/plain')

Install with Tessl CLI
npx tessl i tessl/pypi-gcloud-aio-storage