Python asyncio client library for Google Cloud Storage with full CRUD operations and streaming support
npx @tessl/cli install tessl/pypi-gcloud-aio-storage@9.6.0An asyncio-compatible Python client library for Google Cloud Storage that provides full CRUD operations for buckets and blobs with streaming support for large files, parallel upload capabilities, and built-in session management. Designed for high-performance cloud storage operations with modern async/await patterns.
pip install gcloud-aio-storagefrom gcloud.aio.storage import Storage, Bucket, Blob, StreamResponse, SCOPESimport asyncio
from gcloud.aio.storage import Storage
async def main():
# Initialize storage client
async with Storage() as storage:
# List buckets
buckets = await storage.list_buckets('my-project')
# Upload a file
with open('local-file.txt', 'rb') as f:
file_data = f.read()
result = await storage.upload('my-bucket', 'remote-file.txt', file_data)
# Download a file
content = await storage.download('my-bucket', 'remote-file.txt')
# Stream download for large files
async with storage.download_stream('my-bucket', 'large-file.txt') as stream:
while True:
chunk = await stream.read(8192)
if not chunk:
break
# Process chunk
# Can also be used without context manager
storage = Storage()
try:
content = await storage.download('my-bucket', 'file.txt')
finally:
await storage.close()
asyncio.run(main())The library follows a hierarchical structure that mirrors Google Cloud Storage's organization:
This design enables both direct operations through the Storage client and object-oriented manipulation through Bucket and Blob instances, providing flexibility for different usage patterns while maintaining efficient session reuse.
The library supports both production and testing environments through built-in emulator support via the STORAGE_EMULATOR_HOST environment variable, and provides dual compatibility with both asyncio (aiohttp) and synchronous (requests) HTTP clients through the gcloud-rest-* variants.
Core storage client functionality for bucket management, object operations, and session handling. Provides direct access to all Cloud Storage operations with automatic authentication and session management.
class Storage:
def __init__(self, *, service_file=None, token=None, session=None, api_root=None): ...
async def list_buckets(self, project, *, params=None, headers=None, session=None, timeout=10): ...
def get_bucket(self, bucket_name): ...
async def copy(self, bucket, object_name, destination_bucket, *, new_name=None, metadata=None, params=None, headers=None, timeout=10, session=None): ...
async def delete(self, bucket, object_name, *, timeout=10, params=None, headers=None, session=None): ...
async def download(self, bucket, object_name, *, headers=None, timeout=10, session=None): ...
async def download_to_filename(self, bucket, object_name, filename, **kwargs): ...
async def download_metadata(self, bucket, object_name, *, headers=None, session=None, timeout=10): ...
async def download_stream(self, bucket, object_name, *, headers=None, timeout=10, session=None): ...
async def list_objects(self, bucket, *, params=None, headers=None, session=None, timeout=10): ...
async def upload(self, bucket, object_name, file_data, *, content_type=None, parameters=None, headers=None, metadata=None, session=None, force_resumable_upload=None, zipped=False, timeout=30): ...
async def upload_from_filename(self, bucket, object_name, filename, **kwargs): ...
async def compose(self, bucket, object_name, source_object_names, *, content_type=None, params=None, headers=None, session=None, timeout=10): ...
async def patch_metadata(self, bucket, object_name, metadata, *, params=None, headers=None, session=None, timeout=10): ...
async def get_bucket_metadata(self, bucket, *, params=None, headers=None, session=None, timeout=10): ...
async def close(self): ...
async def __aenter__(self): ...
async def __aexit__(self, *args): ...Bucket-level operations for managing Cloud Storage buckets and their contained blobs. Provides a container abstraction that simplifies working with groups of related objects.
class Bucket:
def __init__(self, storage, name): ...
async def get_blob(self, blob_name, timeout=10, session=None): ...
async def blob_exists(self, blob_name, session=None): ...
async def list_blobs(self, prefix='', match_glob='', delimiter='', session=None): ...
def new_blob(self, blob_name): ...
async def get_metadata(self, params=None, session=None): ...Individual object operations for Cloud Storage blobs including content manipulation, metadata management, and signed URL generation for secure access.
class Blob:
def __init__(self, bucket, name, metadata): ...
async def download(self, timeout=10, session=None, auto_decompress=True): ...
async def upload(self, data, content_type=None, session=None): ...
async def get_signed_url(self, expiration, headers=None, query_params=None, http_method='GET', iam_client=None, service_account_email=None, token=None, session=None): ...
@staticmethod
def get_pem_signature(str_to_sign, private_key): ...
@staticmethod
async def get_iam_api_signature(str_to_sign, iam_client, service_account_email, session): ...Efficient streaming functionality for handling large files without loading entire contents into memory. Supports both upload and download streaming with automatic chunk management.
class StreamResponse:
def __init__(self, response): ...
async def read(self, size=-1): ...
@property
def content_length(self): ...
async def __aenter__(self): ...
async def __aexit__(self, *exc_info): ...SCOPES = ['https://www.googleapis.com/auth/devstorage.read_write']
DEFAULT_TIMEOUT = 10
MAX_CONTENT_LENGTH_SIMPLE_UPLOAD = 5242880 # 5MB
HOST = 'storage.googleapis.com' # Can be overridden via STORAGE_EMULATOR_HOST
PKCS1_MARKER = ('-----BEGIN RSA PRIVATE KEY-----', '-----END RSA PRIVATE KEY-----')
PKCS8_MARKER = ('-----BEGIN PRIVATE KEY-----', '-----END PRIVATE KEY-----')from enum import Enum
class UploadType(Enum):
SIMPLE = 1
RESUMABLE = 2
MULTIPART = 3
class PemKind(Enum):
INVALID = -1
PKCS1 = 0
PKCS8 = 1