tessl/pypi-gcloud-aio-storage

Python asyncio client library for Google Cloud Storage with full CRUD operations and streaming support

docs/blob-management.md
Blob Management

The Blob class represents individual objects stored in Google Cloud Storage, providing methods for content manipulation, metadata management, and secure access through signed URLs. Blob instances are created through Bucket operations and maintain references to their parent bucket and storage client.

Capabilities

Blob Initialization

Blob instances are typically created through Bucket methods like get_blob() or new_blob() rather than direct instantiation.

def __init__(self, bucket, name, metadata):
    """
    Initialize a Blob instance.
    
    Parameters:
    - bucket (Bucket): Parent bucket instance
    - name (str): Blob name/path
    - metadata (Dict[str, Any]): Blob metadata from GCS
    
    Attributes:
    - bucket (Bucket): Reference to parent bucket
    - name (str): Blob name/path
    - metadata (Dict[str, Any]): Raw blob metadata from GCS
    - size (int): Blob size in bytes (from metadata)
    - chunk_size (int): Download chunk size for streaming
    """

Usage Example:

from gcloud.aio.storage import Storage

async with Storage() as storage:
    bucket = storage.get_bucket('my-bucket')
    
    # Get existing blob with metadata
    blob = await bucket.get_blob('existing-file.txt')
    
    # Create new blob instance (without metadata)
    new_blob = bucket.new_blob('new-file.txt')
    
    # Access blob properties
    print(f"Blob name: {blob.name}")
    print(f"Blob size: {blob.size} bytes")
    print(f"Parent bucket: {blob.bucket.name}")

Content Download Operations

Download blob content with options for automatic decompression and different return formats.

async def download(self, timeout=10, session=None, auto_decompress=True):
    """
    Download blob content.
    
    Parameters:
    - timeout (int): Request timeout in seconds
    - session (aiohttp.ClientSession, optional): Custom session
    - auto_decompress (bool): Automatically decompress gzip content
    
    Returns:
    bytes: Blob content (decompressed bytes when auto_decompress is True and the object is gzip-encoded)
    """

Usage Example:

async with Storage() as storage:
    bucket = storage.get_bucket('my-bucket')
    blob = await bucket.get_blob('data.json')
    
    # Download content as bytes
    content = await blob.download()
    
    # Download with custom timeout
    content = await blob.download(timeout=30)
    
    # Download without auto-decompression for gzipped content
    raw_content = await blob.download(auto_decompress=False)
    
    # Convert to string if needed
    if isinstance(content, bytes):
        text_content = content.decode('utf-8')

Content Upload Operations

Upload data to the blob with content type specification and metadata handling.

async def upload(self, data, content_type=None, session=None):
    """
    Upload data to the blob.
    
    Parameters:
    - data (bytes or file-like): Data to upload
    - content_type (str, optional): MIME type of the content
    - session (aiohttp.ClientSession, optional): Custom session
    
    Returns:
    Dict[str, Any]: Upload response metadata
    """

Usage Example:

async with Storage() as storage:
    bucket = storage.get_bucket('my-bucket')
    
    # Create new blob and upload data
    blob = bucket.new_blob('new-document.pdf')
    with open('/local/path/document.pdf', 'rb') as f:
        pdf_data = f.read()
    
    result = await blob.upload(pdf_data, content_type='application/pdf')
    
    # Upload text content
    text_blob = bucket.new_blob('notes.txt')
    text_data = "Important notes here".encode('utf-8')
    result = await text_blob.upload(text_data, content_type='text/plain')
    
    # Upload JSON data
    import json
    json_blob = bucket.new_blob('config.json')
    json_data = json.dumps({'setting': 'value'}).encode('utf-8')
    result = await json_blob.upload(json_data, content_type='application/json')
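The upload response is the object resource returned by the GCS JSON API, in which numeric fields such as size arrive as decimal strings. A minimal sketch of validating that the reported size matches what was sent; check_upload is a hypothetical helper, not part of the library:

```python
def check_upload(result, expected_size):
    # The GCS object resource reports `size` as a decimal string,
    # so convert before comparing against the local byte count.
    actual = int(result.get('size', -1))
    if actual != expected_size:
        raise ValueError(f'upload size mismatch: got {actual}, expected {expected_size}')
    return result['name']
```

For example, after `result = await text_blob.upload(text_data, ...)` above, `check_upload(result, len(text_data))` would confirm the object landed intact.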

Signed URL Generation

Generate temporary, secure URLs for accessing blob content without authentication credentials.

async def get_signed_url(self, expiration, headers=None, query_params=None, http_method='GET', iam_client=None, service_account_email=None, token=None, session=None):
    """
    Generate a signed URL for the blob.
    
    Parameters:
    - expiration (int or datetime): URL expiration (seconds from now or datetime object)
    - headers (dict, optional): Headers to include in the signed request
    - query_params (dict, optional): Query parameters to include
    - http_method (str): HTTP method the URL will be used with
    - iam_client (IAMClient, optional): IAM client for signature generation
    - service_account_email (str, optional): Service account for IAM signing
    - token (Token, optional): Authentication token
    - session (aiohttp.ClientSession, optional): Custom session
    
    Returns:
    str: Signed URL for blob access
    """

Usage Example:

from datetime import datetime, timedelta

async with Storage() as storage:
    bucket = storage.get_bucket('my-bucket')
    blob = await bucket.get_blob('private-document.pdf')
    
    # Generate URL valid for 1 hour (3600 seconds)
    url = await blob.get_signed_url(3600)
    print(f"Temporary URL: {url}")
    
    # Generate URL with specific expiration datetime
    expiry = datetime.now() + timedelta(hours=2)
    url = await blob.get_signed_url(expiry)
    
    # Generate URL for POST uploads with custom headers
    headers = {'Content-Type': 'application/pdf'}
    query_params = {'uploadType': 'media'}
    upload_url = await blob.get_signed_url(
        3600, 
        headers=headers,
        query_params=query_params,
        http_method='POST'
    )
    
    # Generate URL using IAM service account
    url = await blob.get_signed_url(
        3600,
        service_account_email='my-service@project.iam.gserviceaccount.com'
    )

Cryptographic Signature Methods

Static methods for generating cryptographic signatures used in signed URL creation.

@staticmethod
def get_pem_signature(str_to_sign, private_key):
    """
    Generate PEM-based signature for string signing.
    
    Parameters:
    - str_to_sign (str): String to be signed
    - private_key (str): PEM-formatted private key
    
    Returns:
    bytes: Cryptographic signature
    """

@staticmethod
async def get_iam_api_signature(str_to_sign, iam_client, service_account_email, session):
    """
    Generate signature using Google IAM API.
    
    Parameters:
    - str_to_sign (str): String to be signed
    - iam_client (IAMClient): IAM client instance
    - service_account_email (str): Service account email
    - session (aiohttp.ClientSession): HTTP session
    
    Returns:
    bytes: Cryptographic signature from IAM API
    """

Usage Example:

# These are typically used internally by get_signed_url()
# but can be used directly for custom signing scenarios

# Using PEM key directly
private_key_pem = """-----BEGIN PRIVATE KEY-----
MIIEvQ...
-----END PRIVATE KEY-----"""

signature = Blob.get_pem_signature("string to sign", private_key_pem)

# Using IAM API for signing (requires IAM client setup)
from gcloud.aio.auth import IAMClient

import aiohttp

async with aiohttp.ClientSession() as session:
    async with IAMClient() as iam_client:
        signature = await Blob.get_iam_api_signature(
            "string to sign",
            iam_client,
            "service-account@project.iam.gserviceaccount.com",
            session
        )

Common Usage Patterns

File Processing Pipeline

import json

async with Storage() as storage:
    bucket = storage.get_bucket('processing-bucket')
    
    # Get input blob
    input_blob = await bucket.get_blob('input/data.csv')
    csv_content = await input_blob.download()
    
    # Process data (process_csv is an application-defined function)
    processed_data = process_csv(csv_content)
    
    # Upload processed result
    output_blob = bucket.new_blob('output/processed-data.json')
    json_data = json.dumps(processed_data).encode('utf-8')
    await output_blob.upload(json_data, content_type='application/json')

Secure File Sharing

async with Storage() as storage:
    bucket = storage.get_bucket('shared-files')
    blob = await bucket.get_blob('confidential-report.pdf')
    
    # Generate short-lived URL for sharing
    secure_url = await blob.get_signed_url(
        3600,  # 1 hour expiration
        http_method='GET'
    )
    
    # Send URL to authorized user
    send_secure_link(user_email, secure_url)

Backup and Archival

async with Storage() as storage:
    source_bucket = storage.get_bucket('live-data')
    archive_bucket = storage.get_bucket('archive')
    
    # Get blob from live bucket
    live_blob = await source_bucket.get_blob('important-data.db')
    content = await live_blob.download()
    
    # Create archived copy with timestamp
    from datetime import datetime
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    archive_blob = archive_bucket.new_blob(f'backups/data_{timestamp}.db')
    await archive_blob.upload(content, content_type='application/octet-stream')

Install with Tessl CLI

npx tessl i tessl/pypi-gcloud-aio-storage
