tessl/pypi-gcloud-aio-storage

Python asyncio client library for Google Cloud Storage with full CRUD operations and streaming support

Storage Client

The Storage class is the main entry point for interacting with Google Cloud Storage. It handles authentication and session management, and provides direct access to all storage operations: bucket management, object manipulation, and metadata operations.

Capabilities

Client Initialization

Create a Storage client with optional authentication and configuration parameters.

def __init__(self, *, service_file=None, token=None, session=None, api_root=None):
    """
    Initialize Storage client.
    
    Parameters:
    - service_file (str, optional): Path to service account JSON file
    - token (Token, optional): Pre-configured authentication token
    - session (aiohttp.ClientSession, optional): Custom HTTP session
    - api_root (str, optional): Custom API root URL for testing/emulators
    """

Usage Example:

# Default initialization (uses Application Default Credentials)
storage = Storage()

# With service account file
storage = Storage(service_file='/path/to/service-account.json')

# For testing with emulator
storage = Storage(api_root='http://localhost:8080')

Context Manager Support

The Storage client supports the async context manager protocol for automatic session cleanup.

async def __aenter__(self):
    """Enter async context manager, returns self."""

async def __aexit__(self, *args):
    """Exit async context manager, closes session."""
    
async def close(self):
    """Manually close the session."""

Usage Example:

# Recommended: automatic cleanup
async with Storage() as storage:
    # Use storage client
    pass

# Manual cleanup when needed
storage = Storage()
try:
    # Use storage client
    pass
finally:
    await storage.close()
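
The lifecycle above is ordinary Python async context management; a minimal stand-in class (hypothetical, not part of the library) shows the protocol the Storage client implements:

```python
import asyncio

class ManagedClient:
    """Hypothetical stand-in mirroring the Storage client's lifecycle."""

    def __init__(self):
        self.closed = False

    async def close(self):
        # In the real client, close() releases the underlying aiohttp session.
        self.closed = True

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        await self.close()

async def main():
    async with ManagedClient() as client:
        # Session is open inside the block
        assert not client.closed
    return client

client = asyncio.run(main())
assert client.closed  # closed on exit, even if the block raised
```

Because `__aexit__` runs on both normal exit and exceptions, the `async with` form is safer than manual `close()` calls.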

Bucket Management

Operations for listing and managing buckets at the project level.

async def list_buckets(self, project, *, params=None, headers=None, session=None, timeout=10):
    """
    List all buckets in a project.
    
    Parameters:
    - project (str): Google Cloud project ID
    - params (dict, optional): Additional query parameters
    - headers (dict, optional): Custom HTTP headers
    - session (aiohttp.ClientSession, optional): Custom session
    - timeout (int): Request timeout in seconds
    
    Returns:
    List[Bucket]: List of bucket instances
    """

def get_bucket(self, bucket_name):
    """
    Get a bucket instance for operations.
    
    Parameters:
    - bucket_name (str): Name of the bucket
    
    Returns:
    Bucket: Bucket instance for further operations
    """

async def get_bucket_metadata(self, bucket, *, params=None, headers=None, session=None, timeout=10):
    """
    Get metadata for a specific bucket.
    
    Parameters:
    - bucket (str): Bucket name
    - params (dict, optional): Additional query parameters
    - headers (dict, optional): Custom HTTP headers  
    - session (aiohttp.ClientSession, optional): Custom session
    - timeout (int): Request timeout in seconds
    
    Returns:
    Dict[str, Any]: Bucket metadata
    """

Usage Example:

async with Storage() as storage:
    # List all buckets in project
    buckets = await storage.list_buckets('my-project-id')
    
    # Get bucket instance
    bucket = storage.get_bucket('my-bucket')
    
    # Get bucket metadata
    metadata = await storage.get_bucket_metadata('my-bucket')

Object Download Operations

Download objects from Cloud Storage with various options for streaming, metadata-only retrieval, and file output.

async def download(self, bucket, object_name, *, headers=None, timeout=10, session=None):
    """
    Download object content as bytes.
    
    Parameters:
    - bucket (str): Bucket name
    - object_name (str): Object name/path
    - headers (dict, optional): Custom HTTP headers for range requests, etc.
    - timeout (int): Request timeout in seconds
    - session (aiohttp.ClientSession, optional): Custom session
    
    Returns:
    bytes: Object content
    """

async def download_to_filename(self, bucket, object_name, filename, **kwargs):
    """
    Download object directly to a local file.
    
    Parameters:
    - bucket (str): Bucket name
    - object_name (str): Object name/path
    - filename (str): Local file path to write to
    - **kwargs: Additional arguments passed to download()
    
    Returns:
    None
    """

async def download_metadata(self, bucket, object_name, *, headers=None, session=None, timeout=10):
    """
    Get object metadata without downloading content.
    
    Parameters:
    - bucket (str): Bucket name
    - object_name (str): Object name/path
    - headers (dict, optional): Custom HTTP headers
    - session (aiohttp.ClientSession, optional): Custom session
    - timeout (int): Request timeout in seconds
    
    Returns:
    Dict[str, Any]: Object metadata
    """

async def download_stream(self, bucket, object_name, *, headers=None, timeout=10, session=None):
    """
    Download object as a stream for large files.
    
    Parameters:
    - bucket (str): Bucket name
    - object_name (str): Object name/path
    - headers (dict, optional): Custom HTTP headers for range requests
    - timeout (int): Request timeout in seconds
    - session (aiohttp.ClientSession, optional): Custom session
    
    Returns:
    StreamResponse: Async stream for reading content
    """

Usage Example:

async with Storage() as storage:
    # Download entire file
    content = await storage.download('my-bucket', 'file.txt')
    
    # Download to local file
    await storage.download_to_filename('my-bucket', 'large-file.zip', '/tmp/download.zip')
    
    # Get metadata only
    metadata = await storage.download_metadata('my-bucket', 'file.txt')
    print(f"File size: {metadata['size']} bytes")
    
    # Stream large file
    stream = await storage.download_stream('my-bucket', 'huge-file.dat')
    while True:
        chunk = await stream.read(8192)
        if not chunk:
            break
        # Process each chunk as it arrives
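
The chunked read loop above is the core streaming pattern. It can be exercised without network access using an in-memory stand-in for the stream object (`FakeStream` below is hypothetical; only its `read(n)` coroutine mirrors the real StreamResponse):

```python
import asyncio
import io

class FakeStream:
    """Hypothetical in-memory stand-in for a StreamResponse."""

    def __init__(self, data: bytes):
        self._buf = io.BytesIO(data)

    async def read(self, n: int) -> bytes:
        return self._buf.read(n)

async def read_all(stream, chunk_size: int = 8192) -> bytes:
    # Accumulate fixed-size chunks until read() returns b"" (end of stream).
    chunks = []
    while True:
        chunk = await stream.read(chunk_size)
        if not chunk:
            break
        chunks.append(chunk)
    return b"".join(chunks)

data = b"x" * 20_000  # larger than one chunk, so the loop iterates
result = asyncio.run(read_all(FakeStream(data)))
assert result == data
```

In practice you would write or forward each chunk instead of accumulating it, which keeps memory use bounded regardless of object size.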

Object Upload Operations

Upload objects to Cloud Storage with support for different upload methods, compression, and metadata.

async def upload(self, bucket, object_name, file_data, *, content_type=None, parameters=None, headers=None, metadata=None, session=None, force_resumable_upload=None, zipped=False, timeout=30):
    """
    Upload data to an object.
    
    Parameters:
    - bucket (str): Bucket name
    - object_name (str): Object name/path
    - file_data (bytes or file-like): Data to upload
    - content_type (str, optional): MIME type of the content
    - parameters (dict, optional): Additional upload parameters
    - headers (dict, optional): Custom HTTP headers
    - metadata (dict, optional): Object metadata key-value pairs
    - session (aiohttp.ClientSession, optional): Custom session
    - force_resumable_upload (bool, optional): Force resumable upload method
    - zipped (bool): Whether to gzip compress the data
    - timeout (int): Request timeout in seconds
    
    Returns:
    Dict[str, Any]: Upload response metadata
    """

async def upload_from_filename(self, bucket, object_name, filename, **kwargs):
    """
    Upload a local file to an object.
    
    Parameters:
    - bucket (str): Bucket name
    - object_name (str): Object name/path
    - filename (str): Local file path to upload
    - **kwargs: Additional arguments passed to upload()
    
    Returns:
    Dict[str, Any]: Upload response metadata
    """

Usage Example:

async with Storage() as storage:
    # Upload bytes data
    data = b"Hello, Cloud Storage!"
    result = await storage.upload('my-bucket', 'greeting.txt', data,
                                  content_type='text/plain')
    
    # Upload with metadata
    json_data = b'{"status": "ok"}'
    metadata = {'author': 'user', 'version': '1.0'}
    result = await storage.upload('my-bucket', 'data.json', json_data,
                                  content_type='application/json',
                                  metadata=metadata)
    
    # Upload from local file
    result = await storage.upload_from_filename('my-bucket', 'backup.zip', 
                                                '/path/to/backup.zip')
    
    # Upload with compression
    text_data = b"Some compressible text content.\n" * 100
    result = await storage.upload('my-bucket', 'large-text.txt', text_data,
                                  zipped=True, content_type='text/plain')
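
With `zipped=True` the client gzip-compresses the payload before sending it, so the stored object is served with gzip content encoding. The size trade-off can be sketched with the standard library (the exact headers the client sets are an implementation detail not shown here):

```python
import gzip

# Repetitive text compresses well; binary formats like zip or jpeg do not.
text_data = b"log line: request handled\n" * 200
compressed = gzip.compress(text_data)

assert gzip.decompress(compressed) == text_data  # lossless round-trip
assert len(compressed) < len(text_data)          # worthwhile for text payloads
```

Compression trades CPU time for bandwidth, so it pays off mainly for large, text-like payloads.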

Object Management Operations

Additional operations for managing objects including copying, deleting, listing, and metadata updates.

async def copy(self, bucket, object_name, destination_bucket, *, new_name=None, metadata=None, params=None, headers=None, timeout=10, session=None):
    """
    Copy an object to another location.
    
    Parameters:
    - bucket (str): Source bucket name
    - object_name (str): Source object name/path
    - destination_bucket (str): Destination bucket name
    - new_name (str, optional): New object name, defaults to original name
    - metadata (dict, optional): New metadata for copied object
    - params (dict, optional): Additional query parameters
    - headers (dict, optional): Custom HTTP headers
    - timeout (int): Request timeout in seconds
    - session (aiohttp.ClientSession, optional): Custom session
    
    Returns:
    Dict[str, Any]: Copy operation response
    """

async def delete(self, bucket, object_name, *, timeout=10, params=None, headers=None, session=None):
    """
    Delete an object.
    
    Parameters:
    - bucket (str): Bucket name
    - object_name (str): Object name/path
    - timeout (int): Request timeout in seconds
    - params (dict, optional): Additional query parameters
    - headers (dict, optional): Custom HTTP headers
    - session (aiohttp.ClientSession, optional): Custom session
    
    Returns:
    str: Deletion response
    """

async def list_objects(self, bucket, *, params=None, headers=None, session=None, timeout=10):
    """
    List objects in a bucket.
    
    Parameters:
    - bucket (str): Bucket name
    - params (dict, optional): Query parameters (prefix, delimiter, maxResults, etc.)
    - headers (dict, optional): Custom HTTP headers
    - session (aiohttp.ClientSession, optional): Custom session
    - timeout (int): Request timeout in seconds
    
    Returns:
    Dict[str, Any]: List response with objects and metadata
    """

async def patch_metadata(self, bucket, object_name, metadata, *, params=None, headers=None, session=None, timeout=10):
    """
    Update object metadata.
    
    Parameters:
    - bucket (str): Bucket name
    - object_name (str): Object name/path
    - metadata (dict): Metadata key-value pairs to update
    - params (dict, optional): Additional query parameters
    - headers (dict, optional): Custom HTTP headers
    - session (aiohttp.ClientSession, optional): Custom session
    - timeout (int): Request timeout in seconds
    
    Returns:
    Dict[str, Any]: Updated object metadata
    """

async def compose(self, bucket, object_name, source_object_names, *, content_type=None, params=None, headers=None, session=None, timeout=10):
    """
    Compose multiple objects into a single object.
    
    Parameters:
    - bucket (str): Bucket name
    - object_name (str): Name for the composed object
    - source_object_names (List[str]): List of source object names to compose
    - content_type (str, optional): MIME type for composed object
    - params (dict, optional): Additional query parameters
    - headers (dict, optional): Custom HTTP headers
    - session (aiohttp.ClientSession, optional): Custom session
    - timeout (int): Request timeout in seconds
    
    Returns:
    Dict[str, Any]: Compose operation response
    """

Usage Example:

async with Storage() as storage:
    # Copy object to another bucket
    await storage.copy('source-bucket', 'file.txt', 'dest-bucket', 
                       new_name='copied-file.txt')
    
    # Delete object
    await storage.delete('my-bucket', 'old-file.txt')
    
    # List objects with prefix
    params = {'prefix': 'logs/', 'maxResults': 100}
    result = await storage.list_objects('my-bucket', params=params)
    
    # Update metadata
    metadata = {'status': 'processed', 'last_modified': '2023-01-01'}
    await storage.patch_metadata('my-bucket', 'data.json', metadata)
    
    # Compose multiple objects
    parts = ['part1.txt', 'part2.txt', 'part3.txt']
    await storage.compose('my-bucket', 'combined.txt', parts,
                          content_type='text/plain')
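
`list_objects` returns a single page of the JSON API list response, so large buckets require following `nextPageToken`. A hedged pagination sketch, where `fetch_page` stands in for calling `storage.list_objects` with the token merged into `params`:

```python
import asyncio

async def list_all(fetch_page):
    """Collect items across pages. fetch_page(token) must return a dict
    shaped like the JSON API list response ('items', 'nextPageToken')."""
    items, token = [], None
    while True:
        resp = await fetch_page(token)
        items.extend(resp.get("items", []))
        token = resp.get("nextPageToken")
        if not token:
            return items

# In-memory pages standing in for successive list_objects responses.
PAGES = {
    None: {"items": [{"name": "a"}, {"name": "b"}], "nextPageToken": "t1"},
    "t1": {"items": [{"name": "c"}]},
}

async def fetch_page(token):
    return PAGES[token]

names = [o["name"] for o in asyncio.run(list_all(fetch_page))]
assert names == ["a", "b", "c"]
```

Against the real API, `fetch_page` would call `storage.list_objects(bucket, params={**params, 'pageToken': token})` when a token is present.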

Install with Tessl CLI

npx tessl i tessl/pypi-gcloud-aio-storage
