CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-gcloud-aio-storage

Python asyncio client library for Google Cloud Storage with full CRUD operations and streaming support

Pending
Overview
Eval results
Files

docs/streaming-operations.md

Streaming Operations

The StreamResponse class provides efficient streaming functionality for handling large files without loading entire contents into memory. It serves as a wrapper around HTTP responses that enables chunk-by-chunk processing of download streams, making it ideal for processing large objects or when memory usage is a concern.

Capabilities

Stream Initialization

StreamResponse instances are created by the Storage client's download_stream() method and should not be instantiated directly.

def __init__(self, response) -> None:
    """
    Initialize a StreamResponse wrapper.

    Application code should not call this directly; instances are
    produced by the Storage client's download_stream() method.

    Parameters:
    - response (aiohttp.ClientResponse): HTTP response object to wrap

    Attributes:
    - response: Underlying HTTP response object
    """

Usage Example:

# NOTE: `async with` is only valid inside a coroutine (or an async-aware
# REPL); it is shown at top level here for brevity.
async with Storage() as storage:
    # Get streaming response (don't instantiate StreamResponse directly)
    # NOTE(review): confirm against the installed gcloud-aio-storage version
    # whether download_stream() has to be awaited before use.
    stream = storage.download_stream('my-bucket', 'large-file.dat')
    
    # StreamResponse is created internally by download_stream(); entering it
    # with `async with` binds the reader used for processing.
    async with stream as stream_reader:
        # Use stream_reader for processing
        pass

Content Length Information

The content_length property exposes the total size of the streamed object in bytes, when that size is available.

@property
def content_length(self):
    """
    Get the total content length of the stream.

    The size may be unavailable, so check the value for truthiness before
    using it (for example as the divisor in a progress calculation).

    Returns:
    int: Content length in bytes, or None if not available
    """

Usage Example:

async with Storage() as storage:
    async with storage.download_stream('my-bucket', 'video.mp4') as stream:
        total_size = stream.content_length
        if total_size:
            print(f"Downloading {total_size:,} bytes")
            
        bytes_read = 0
        while True:
            chunk = await stream.read(8192)
            if not chunk:
                break
            bytes_read += len(chunk)
            
            # Show progress
            if total_size:
                progress = (bytes_read / total_size) * 100
                print(f"Progress: {progress:.1f}% ({bytes_read:,}/{total_size:,} bytes)")

Stream Reading

Read data from the stream in configurable chunk sizes.

async def read(self, size: int = -1) -> bytes:
    """
    Read data from the stream.

    Call repeatedly with a positive size to consume the stream chunk by
    chunk; stop when an empty bytes object comes back.

    Parameters:
    - size (int): Number of bytes to read. -1 reads all remaining data

    Returns:
    bytes: Data chunk read from stream, empty bytes when stream is exhausted
    """

Usage Example:

async with Storage() as storage:
    async with storage.download_stream('my-bucket', 'large-dataset.csv') as stream:
        # Read in 64KB chunks
        chunk_size = 65536
        
        while True:
            chunk = await stream.read(chunk_size)
            if not chunk:  # End of stream
                break
                
            # Process chunk (e.g., parse CSV rows)
            process_data_chunk(chunk)
        
        # Read all remaining data at once (not recommended for large files)
        # remaining_data = await stream.read(-1)

Context Manager Support

StreamResponse supports the async context manager protocol, so its resources are cleaned up automatically when the `async with` block exits.

async def __aenter__(self):
    """
    Enter async context manager.

    Invoked implicitly by `async with ... as stream:`; the value returned
    here is what gets bound to the `as` target.

    Returns:
    StreamResponse: Self for use in context
    """

async def __aexit__(self, *exc_info) -> None:
    """
    Exit async context manager, cleaning up resources.

    Invoked implicitly when the `async with` block ends, whether it
    finished normally or was left because of an exception.

    Parameters:
    - exc_info: Exception information if context exited due to exception

    Returns:
    None
    """

Usage Example:

async with Storage() as storage:
    # Recommended: automatic cleanup with context manager
    async with storage.download_stream('my-bucket', 'file.dat') as stream:
        while True:
            chunk = await stream.read(8192)
            if not chunk:
                break
            # Process chunk
    # Stream automatically closed here
    
    # Two-step form (not recommended): obtain the stream object first and
    # enter it afterwards. Cleanup is still performed by the inner
    # `async with`, so the surrounding try/finally has nothing left to
    # release and is shown only for contrast.
    stream = storage.download_stream('my-bucket', 'file.dat')
    try:
        async with stream as stream_reader:
            # Use stream_reader
            pass
    finally:
        # Nothing to do: the context manager above already cleaned up
        pass

Common Usage Patterns

Large File Download with Progress Tracking

import asyncio
from pathlib import Path

async def download_with_progress(storage, bucket_name, object_name, local_path):
    """
    Stream an object to a local file, reporting progress on stdout.

    Parameters:
    - storage: Storage client used to open the download stream
    - bucket_name (str): Bucket that holds the object
    - object_name (str): Name of the object to download
    - local_path (str): Destination file; created or overwritten

    Returns:
    None
    """
    read_size = 1024 * 1024  # 1MB per read

    async with storage.download_stream(bucket_name, object_name) as stream:
        expected = stream.content_length
        received = 0

        with open(local_path, 'wb') as sink:
            chunk = await stream.read(read_size)
            while chunk:  # empty bytes => stream exhausted
                sink.write(chunk)
                received += len(chunk)

                # A percentage can only be shown when the total is known.
                if expected:
                    percent = (received / expected) * 100
                    print(f"\rDownloading: {percent:.1f}% complete", end='', flush=True)

                chunk = await stream.read(read_size)

        print(f"\nDownload complete: {received:,} bytes")

# Usage
# NOTE: `async with` / `await` are only valid inside a coroutine (or an
# async-aware REPL). In a script, put this in an `async def main()` and
# start it with `asyncio.run(main())`.
async with Storage() as storage:
    await download_with_progress(
        storage, 
        'my-bucket', 
        'large-video.mp4', 
        '/tmp/downloaded-video.mp4'
    )

Streaming Data Processing

import json
import asyncio

def _decode_json_line(raw):
    """Return (record, None) for a UTF-8 JSON line, or (None, error) if it cannot be parsed."""
    try:
        return json.loads(raw.decode('utf-8')), None
    except (UnicodeDecodeError, json.JSONDecodeError) as e:
        # Bad bytes and bad JSON are both "unparseable line" to the caller.
        return None, e


async def process_streaming_json_lines(storage, bucket_name, object_name):
    """
    Process JSON Lines file without loading it entirely into memory.

    The object is read in 8KB chunks. Complete lines are parsed and handed
    to process_record() as soon as they are available; only the trailing
    partial line is kept between reads. Blank lines are skipped. A line
    that is not valid UTF-8 or not valid JSON produces a warning on stdout
    and is skipped, so one corrupt line does not abort the whole stream.

    Parameters:
    - storage: Storage client used to open the download stream
    - bucket_name (str): Bucket that holds the object
    - object_name (str): Name of the JSON Lines object

    Returns:
    None
    """

    async with storage.download_stream(bucket_name, object_name) as stream:
        buffer = b''

        while True:
            chunk = await stream.read(8192)
            if not chunk:
                break

            buffer += chunk

            # Everything before the last newline is complete; the remainder
            # (possibly empty) is an unfinished line and stays buffered.
            *lines, buffer = buffer.split(b'\n')

            for line in lines:
                if not line.strip():  # Skip empty lines
                    continue
                record, error = _decode_json_line(line)
                if error is not None:
                    print(f"Warning: Could not parse line: {line} ({error})")
                    continue
                # Called outside the parse guard so that errors raised by
                # the record handler are never mistaken for parse failures.
                await process_record(record)

        # The stream may end without a trailing newline.
        if buffer.strip():
            record, error = _decode_json_line(buffer)
            if error is not None:
                print(f"Warning: Could not parse final record: {buffer}")
            else:
                await process_record(record)

async def process_record(record):
    """
    Process individual JSON record.

    Placeholder handler: prints the record's 'id' (or 'unknown' when the
    key is absent) and sleeps briefly to stand in for real work.

    Parameters:
    - record (dict): Parsed JSON object for one line
    """
    # Your processing logic here
    print(f"Processing: {record.get('id', 'unknown')}")
    await asyncio.sleep(0.01)  # Simulate processing time

# Usage
# NOTE: `async with` / `await` are only valid inside a coroutine (or an
# async-aware REPL). In a script, put this in an `async def main()` and
# start it with `asyncio.run(main())`.
async with Storage() as storage:
    await process_streaming_json_lines(storage, 'data-bucket', 'large-dataset.jsonl')

Concurrent Stream Processing

import asyncio
from typing import List

async def parallel_stream_download(storage, bucket_name, object_names: List[str]):
    """
    Stream several objects from one bucket to local files concurrently.

    Each object is written to /tmp under its object name with every '/'
    replaced by '_'.

    Parameters:
    - storage: Storage client used to open the download streams
    - bucket_name (str): Bucket that holds all the objects
    - object_names (List[str]): Names of the objects to download

    Returns:
    list: Local file paths, in the same order as object_names
    """

    async def download_single(object_name):
        # Flatten the object path so nested names land directly in /tmp.
        target = f"/tmp/{object_name.replace('/', '_')}"

        async with storage.download_stream(bucket_name, object_name) as stream:
            with open(target, 'wb') as sink:
                written = 0

                chunk = await stream.read(64 * 1024)  # 64KB per read
                while chunk:
                    sink.write(chunk)
                    written += len(chunk)
                    chunk = await stream.read(64 * 1024)

                print(f"Downloaded {object_name}: {written:,} bytes")
                return target

    # One coroutine per object; gather() runs them concurrently and keeps
    # the results in input order.
    return await asyncio.gather(*(download_single(name) for name in object_names))

# Usage
# NOTE: `async with` / `await` are only valid inside a coroutine (or an
# async-aware REPL). In a script, put this in an `async def main()` and
# start it with `asyncio.run(main())`.
async with Storage() as storage:
    files_to_download = [
        'datasets/2023/data-01.csv',
        'datasets/2023/data-02.csv', 
        'datasets/2023/data-03.csv'
    ]
    
    # Paths come back in the same order as files_to_download
    paths = await parallel_stream_download(storage, 'my-bucket', files_to_download)
    print(f"All files downloaded to: {paths}")

Memory-Efficient Data Transformation

import gzip
import asyncio

async def transform_and_reupload(storage, source_bucket, source_object, dest_bucket, dest_object):
    """
    Transform data while streaming from source to destination.

    The source object is read in 32KB chunks; each chunk is upper-cased and
    fed straight into a gzip compressor. The uncompressed data is therefore
    never held in memory as a whole. Only the compressed output is
    buffered, because it is uploaded as a single bytes payload.

    Parameters:
    - storage: Storage client used for both the download and the upload
    - source_bucket (str): Bucket to read from
    - source_object (str): Object to read
    - dest_bucket (str): Bucket to write to
    - dest_object (str): Object name for the gzip-compressed result

    Returns:
    Whatever storage.upload() returns for the uploaded object
    """
    # Local import: the surrounding example only imports gzip and asyncio.
    import io

    compressed_buffer = io.BytesIO()
    total_bytes = 0  # size of the transformed data before compression

    # Download stream from source
    async with storage.download_stream(source_bucket, source_object) as source_stream:

        # Closing the GzipFile writes the gzip trailer but leaves the
        # underlying BytesIO open, so its contents can be read afterwards.
        with gzip.GzipFile(fileobj=compressed_buffer, mode='wb') as compressor:
            while True:
                chunk = await source_stream.read(32 * 1024)  # 32KB chunks
                if not chunk:
                    break

                # Transform data (example: convert to uppercase)
                transformed_chunk = chunk.upper()

                # Compress immediately so every chunk ends up in the result
                # and none has to be retained uncompressed.
                compressor.write(transformed_chunk)
                total_bytes += len(transformed_chunk)

        compressed_data = compressed_buffer.getvalue()

        # Upload transformed result
        result = await storage.upload(
            dest_bucket,
            dest_object,
            compressed_data,
            content_type='application/gzip'
        )

        print(f"Transformed {total_bytes:,} bytes to {len(compressed_data):,} bytes")
        return result

# Usage
# NOTE: `async with` / `await` are only valid inside a coroutine (or an
# async-aware REPL). In a script, put this in an `async def main()` and
# start it with `asyncio.run(main())`.
async with Storage() as storage:
    await transform_and_reupload(
        storage,
        'input-bucket', 'raw-data.txt',    # source bucket, source object
        'output-bucket', 'processed-data.txt.gz'    # destination bucket, destination object
    )

Performance Considerations

Optimal Chunk Sizes

# Recommended chunk sizes for different scenarios.
# NOTE: these assignments are alternatives - keep exactly one. Run as
# written, each line overwrites the previous one, so the loop at the bottom
# would use the last value (4KB).

# Network-limited environments (slower connections)
chunk_size = 8 * 1024  # 8KB

# Balanced performance (most common)
chunk_size = 64 * 1024  # 64KB

# High-bandwidth, processing-intensive tasks
chunk_size = 1024 * 1024  # 1MB

# Memory-constrained environments
chunk_size = 4 * 1024  # 4KB

# `storage` is an already-open Storage client, as in the other examples.
async with storage.download_stream('bucket', 'file') as stream:
    while True:
        chunk = await stream.read(chunk_size)
        if not chunk:
            break
        # Process with appropriate chunk size

Resource Management

async with Storage() as storage:
    # Always use context managers for automatic cleanup
    async with storage.download_stream('bucket', 'large-file') as stream:
        # Stream resources are automatically managed
        while True:
            chunk = await stream.read(8192)
            if not chunk:
                break
            # Process chunk
    # Resources automatically released here

Install with Tessl CLI

npx tessl i tessl/pypi-gcloud-aio-storage

docs

blob-management.md

bucket-operations.md

index.md

storage-client.md

streaming-operations.md

tile.json