Python asyncio client library for Google Cloud Storage with full CRUD operations and streaming support
—
The StreamResponse class provides efficient streaming functionality for handling large files without loading entire contents into memory. It serves as a wrapper around HTTP responses that enables chunk-by-chunk processing of download streams, making it ideal for processing large objects or when memory usage is a concern.
StreamResponse instances are created by the Storage client's download_stream() method and should not be instantiated directly.
def __init__(self, response):
"""
Initialize a StreamResponse wrapper.
Parameters:
- response (aiohttp.ClientResponse): HTTP response object
Attributes:
- response: Underlying HTTP response object
"""Usage Example:
async with Storage() as storage:
# Get streaming response (don't instantiate directly)
stream = storage.download_stream('my-bucket', 'large-file.dat')
# StreamResponse is created internally by download_stream()
async with stream as stream_reader:
# Use stream_reader for processing
pass

Access to the total content length of the streamed object.
@property
def content_length(self):
"""
Get the total content length of the stream.
Returns:
int: Content length in bytes, or None if not available
"""Usage Example:
async with Storage() as storage:
async with storage.download_stream('my-bucket', 'video.mp4') as stream:
total_size = stream.content_length
if total_size:
print(f"Downloading {total_size:,} bytes")
bytes_read = 0
while True:
chunk = await stream.read(8192)
if not chunk:
break
bytes_read += len(chunk)
# Show progress
if total_size:
progress = (bytes_read / total_size) * 100
print(f"Progress: {progress:.1f}% ({bytes_read:,}/{total_size:,} bytes)")Read data from the stream in configurable chunk sizes.
async def read(self, size=-1):
"""
Read data from the stream.
Parameters:
- size (int): Number of bytes to read. -1 reads all remaining data
Returns:
bytes: Data chunk read from stream, empty bytes when stream is exhausted
"""Usage Example:
async with Storage() as storage:
async with storage.download_stream('my-bucket', 'large-dataset.csv') as stream:
# Read in 64KB chunks
chunk_size = 65536
while True:
chunk = await stream.read(chunk_size)
if not chunk: # End of stream
break
# Process chunk (e.g., parse CSV rows)
process_data_chunk(chunk)
# Read all remaining data at once (not recommended for large files)
# remaining_data = await stream.read(-1)

StreamResponse supports async context manager protocol for automatic resource cleanup.
async def __aenter__(self):
"""
Enter async context manager.
Returns:
StreamResponse: Self for use in context
"""
async def __aexit__(self, *exc_info):
"""
Exit async context manager, cleaning up resources.
Parameters:
- exc_info: Exception information if context exited due to exception
Returns:
None
"""Usage Example:
async with Storage() as storage:
# Recommended: automatic cleanup with context manager
async with storage.download_stream('my-bucket', 'file.dat') as stream:
while True:
chunk = await stream.read(8192)
if not chunk:
break
# Process chunk
# Stream automatically closed here
# Manual management (not recommended)
stream = storage.download_stream('my-bucket', 'file.dat')
try:
async with stream as stream_reader:
# Use stream_reader
pass
finally:
# Cleanup handled by context manager
pass

import asyncio
from pathlib import Path
async def download_with_progress(storage, bucket_name, object_name, local_path):
    """Stream an object from GCS into a local file, printing percentage progress.

    Progress percentages are only printed when the server reports a
    content length; the final byte count is always printed.
    """
    async with storage.download_stream(bucket_name, object_name) as stream:
        expected = stream.content_length
        written = 0
        with open(local_path, 'wb') as out:
            # Priming read, then loop until the stream yields empty bytes.
            chunk = await stream.read(1024 * 1024)  # 1MB chunks
            while chunk:
                out.write(chunk)
                written += len(chunk)
                if expected:
                    pct = (written / expected) * 100
                    print(f"\rDownloading: {pct:.1f}% complete", end='', flush=True)
                chunk = await stream.read(1024 * 1024)
        print(f"\nDownload complete: {written:,} bytes")
# Usage
async with Storage() as storage:
await download_with_progress(
storage,
'my-bucket',
'large-video.mp4',
'/tmp/downloaded-video.mp4'
)

import json
import asyncio
async def process_streaming_json_lines(storage, bucket_name, object_name):
    """Process a JSON Lines object record-by-record without loading it all into memory."""
    async with storage.download_stream(bucket_name, object_name) as stream:
        pending = b''
        while True:
            data = await stream.read(8192)
            if not data:
                # Stream exhausted: handle whatever is left in the buffer.
                if pending.strip():
                    try:
                        await process_record(json.loads(pending.decode('utf-8')))
                    except json.JSONDecodeError:
                        print(f"Warning: Could not parse final record: {pending}")
                break
            pending += data
            # Drain every complete newline-terminated record from the buffer.
            while b'\n' in pending:
                raw, pending = pending.split(b'\n', 1)
                if not raw.strip():  # Skip empty lines
                    continue
                try:
                    await process_record(json.loads(raw.decode('utf-8')))
                except json.JSONDecodeError as err:
                    print(f"Warning: Could not parse line: {raw} ({err})")
async def process_record(record):
    """Handle one decoded JSON record (placeholder processing logic)."""
    # Your processing logic here
    identifier = record.get('id', 'unknown')
    print(f"Processing: {identifier}")
    await asyncio.sleep(0.01)  # Simulate processing time
# Usage
async with Storage() as storage:
await process_streaming_json_lines(storage, 'data-bucket', 'large-dataset.jsonl')

import asyncio
from typing import List
async def parallel_stream_download(storage, bucket_name, object_names: List[str]):
    """Download several large objects concurrently, streaming each to /tmp."""
    async def fetch_one(object_name):
        # Flatten the object path so it is a single /tmp filename.
        destination = f"/tmp/{object_name.replace('/', '_')}"
        async with storage.download_stream(bucket_name, object_name) as stream:
            with open(destination, 'wb') as out:
                total = 0
                chunk = await stream.read(64 * 1024)  # 64KB chunks
                while chunk:
                    out.write(chunk)
                    total += len(chunk)
                    chunk = await stream.read(64 * 1024)
        print(f"Downloaded {object_name}: {total:,} bytes")
        return destination

    # Run all downloads concurrently and collect the local paths.
    return await asyncio.gather(*(fetch_one(name) for name in object_names))
# Usage
async with Storage() as storage:
files_to_download = [
'datasets/2023/data-01.csv',
'datasets/2023/data-02.csv',
'datasets/2023/data-03.csv'
]
paths = await parallel_stream_download(storage, 'my-bucket', files_to_download)
print(f"All files downloaded to: {paths}")import gzip
import asyncio
async def transform_and_reupload(storage, source_bucket, source_object, dest_bucket, dest_object):
    """Stream an object, transform it, then gzip-compress and upload the result.

    Reads the source in 32KB chunks, uppercases each chunk, and uploads the
    gzip-compressed concatenation of ALL transformed chunks.

    Parameters:
    - storage: client providing download_stream() and upload()
    - source_bucket, source_object: object to read
    - dest_bucket, dest_object: destination for the compressed result

    Returns:
        Whatever storage.upload() returns.
    """
    # Download stream from source
    async with storage.download_stream(source_bucket, source_object) as source_stream:
        transformed_chunks = []
        while True:
            chunk = await source_stream.read(32 * 1024)  # 32KB chunks
            if not chunk:
                break
            # Transform data (example: convert to uppercase)
            transformed_chunks.append(chunk.upper())
        # BUG FIX: the previous version joined and then DISCARDED the
        # accumulated chunks every 10 iterations without uploading them,
        # so everything except the final partial batch was silently lost.
        # All transformed chunks are now retained until the single upload.
        final_data = b''.join(transformed_chunks)
        # Compress before uploading
        compressed_data = gzip.compress(final_data)
        # Upload transformed result
        result = await storage.upload(
            dest_bucket,
            dest_object,
            compressed_data,
            content_type='application/gzip'
        )
        print(f"Transformed {len(final_data):,} bytes to {len(compressed_data):,} bytes")
        return result
# Usage
async with Storage() as storage:
await transform_and_reupload(
storage,
'input-bucket', 'raw-data.txt',
'output-bucket', 'processed-data.txt.gz'
)

# Recommended chunk sizes for different scenarios:
# Network-limited environments (slower connections)
chunk_size = 8 * 1024 # 8KB
# Balanced performance (most common)
chunk_size = 64 * 1024 # 64KB
# High-bandwidth, processing-intensive tasks
chunk_size = 1024 * 1024 # 1MB
# Memory-constrained environments
chunk_size = 4 * 1024 # 4KB
async with storage.download_stream('bucket', 'file') as stream:
while True:
chunk = await stream.read(chunk_size)
if not chunk:
break
# Process with appropriate chunk size

async with Storage() as storage:
# Always use context managers for automatic cleanup
async with storage.download_stream('bucket', 'large-file') as stream:
# Stream resources are automatically managed
while True:
chunk = await stream.read(8192)
if not chunk:
break
# Process chunk
# Resources automatically released here

Install with Tessl CLI
npx tessl i tessl/pypi-gcloud-aio-storage