Python asyncio client library for Google Cloud Storage with full CRUD operations and streaming support
---
The Blob class represents individual objects stored in Google Cloud Storage, providing methods for content manipulation, metadata management, and secure access through signed URLs. Blob instances are created through Bucket operations and maintain references to their parent bucket and storage client.
Blob instances are typically created through Bucket methods like get_blob() or new_blob() rather than direct instantiation.
def __init__(self, bucket, name, metadata):
"""
Initialize a Blob instance.
Parameters:
- bucket (Bucket): Parent bucket instance
- name (str): Blob name/path
- metadata (Dict[str, Any]): Blob metadata from GCS
Attributes:
- bucket (Bucket): Reference to parent bucket
- name (str): Blob name/path
- size (int): Blob size in bytes (from metadata)
- chunk_size (int): Download chunk size for streaming
"""Usage Example:
async with Storage() as storage:
bucket = storage.get_bucket('my-bucket')
# Get existing blob with metadata
blob = await bucket.get_blob('existing-file.txt')
# Create new blob instance (without metadata)
new_blob = bucket.new_blob('new-file.txt')
# Access blob properties
print(f"Blob name: {blob.name}")
print(f"Blob size: {blob.size} bytes")
print(f"Parent bucket: {blob.bucket.name}")Download blob content with options for automatic decompression and different return formats.
async def download(self, timeout=10, session=None, auto_decompress=True):
"""
Download blob content.
Parameters:
- timeout (int): Request timeout in seconds
- session (aiohttp.ClientSession, optional): Custom session
- auto_decompress (bool): Automatically decompress gzip content
Returns:
Any: Blob content (typically bytes, but can be decompressed text)
"""Usage Example:
async with Storage() as storage:
bucket = storage.get_bucket('my-bucket')
blob = await bucket.get_blob('data.json')
# Download content as bytes
content = await blob.download()
# Download with custom timeout
content = await blob.download(timeout=30)
# Download without auto-decompression for gzipped content
raw_content = await blob.download(auto_decompress=False)
# Convert to string if needed
if isinstance(content, bytes):
text_content = content.decode('utf-8')

Upload data to the blob with content type specification and metadata handling.
async def upload(self, data, content_type=None, session=None):
"""
Upload data to the blob.
Parameters:
- data (bytes or file-like): Data to upload
- content_type (str, optional): MIME type of the content
- session (aiohttp.ClientSession, optional): Custom session
Returns:
Dict[str, Any]: Upload response metadata
"""Usage Example:
async with Storage() as storage:
bucket = storage.get_bucket('my-bucket')
# Create new blob and upload data
blob = bucket.new_blob('new-document.pdf')
with open('/local/path/document.pdf', 'rb') as f:
pdf_data = f.read()
result = await blob.upload(pdf_data, content_type='application/pdf')
# Upload text content
text_blob = bucket.new_blob('notes.txt')
text_data = "Important notes here".encode('utf-8')
result = await text_blob.upload(text_data, content_type='text/plain')
# Upload JSON data
json_blob = bucket.new_blob('config.json')
import json
json_data = json.dumps({'setting': 'value'}).encode('utf-8')
result = await json_blob.upload(json_data, content_type='application/json')

Generate temporary, secure URLs for accessing blob content without authentication credentials.
async def get_signed_url(self, expiration, headers=None, query_params=None, http_method='GET', iam_client=None, service_account_email=None, token=None, session=None):
"""
Generate a signed URL for the blob.
Parameters:
- expiration (int or datetime): URL expiration (seconds from now or datetime object)
- headers (dict, optional): Headers to include in the signed request
- query_params (dict, optional): Query parameters to include
- http_method (str): HTTP method the URL will be used with
- iam_client (IAMClient, optional): IAM client for signature generation
- service_account_email (str, optional): Service account for IAM signing
- token (Token, optional): Authentication token
- session (aiohttp.ClientSession, optional): Custom session
Returns:
str: Signed URL for blob access
"""Usage Example:
from datetime import datetime, timedelta
async with Storage() as storage:
bucket = storage.get_bucket('my-bucket')
blob = await bucket.get_blob('private-document.pdf')
# Generate URL valid for 1 hour (3600 seconds)
url = await blob.get_signed_url(3600)
print(f"Temporary URL: {url}")
# Generate URL with specific expiration datetime
expiry = datetime.now() + timedelta(hours=2)
url = await blob.get_signed_url(expiry)
# Generate URL for POST uploads with custom headers
headers = {'Content-Type': 'application/pdf'}
query_params = {'uploadType': 'media'}
upload_url = await blob.get_signed_url(
3600,
headers=headers,
query_params=query_params,
http_method='POST'
)
# Generate URL using IAM service account
url = await blob.get_signed_url(
3600,
service_account_email='my-service@project.iam.gserviceaccount.com'
)

Static methods for generating cryptographic signatures used in signed URL creation.
@staticmethod
def get_pem_signature(str_to_sign, private_key):
"""
Generate PEM-based signature for string signing.
Parameters:
- str_to_sign (str): String to be signed
- private_key (str): PEM-formatted private key
Returns:
bytes: Cryptographic signature
"""
@staticmethod
async def get_iam_api_signature(str_to_sign, iam_client, service_account_email, session):
"""
Generate signature using Google IAM API.
Parameters:
- str_to_sign (str): String to be signed
- iam_client (IAMClient): IAM client instance
- service_account_email (str): Service account email
- session (aiohttp.ClientSession): HTTP session
Returns:
bytes: Cryptographic signature from IAM API
"""Usage Example:
# These are typically used internally by get_signed_url()
# but can be used directly for custom signing scenarios
# Using PEM key directly
private_key_pem = """-----BEGIN PRIVATE KEY-----
MIIEvQ...
-----END PRIVATE KEY-----"""
signature = Blob.get_pem_signature("string to sign", private_key_pem)
# Using IAM API for signing (requires IAM client setup)
from gcloud.aio.auth import IAMClient
async with IAMClient() as iam_client:
signature = await Blob.get_iam_api_signature(
"string to sign",
iam_client,
"service-account@project.iam.gserviceaccount.com",
session
)

Usage Example — download, process, and re-upload data within one Storage session:

async with Storage() as storage:
bucket = storage.get_bucket('processing-bucket')
# Get input blob
input_blob = await bucket.get_blob('input/data.csv')
csv_content = await input_blob.download()
# Process data
processed_data = process_csv(csv_content)
# Upload processed result
output_blob = bucket.new_blob('output/processed-data.json')
json_data = json.dumps(processed_data).encode('utf-8')
await output_blob.upload(json_data, content_type='application/json')

Usage Example — share a private file through a short-lived signed URL:

async with Storage() as storage:
bucket = storage.get_bucket('shared-files')
blob = await bucket.get_blob('confidential-report.pdf')
# Generate short-lived URL for sharing
secure_url = await blob.get_signed_url(
3600, # 1 hour expiration
http_method='GET'
)
# Send URL to authorized user
send_secure_link(user_email, secure_url)

Usage Example — archive a blob to another bucket under a timestamped name:

async with Storage() as storage:
source_bucket = storage.get_bucket('live-data')
archive_bucket = storage.get_bucket('archive')
# Get blob from live bucket
live_blob = await source_bucket.get_blob('important-data.db')
content = await live_blob.download()
# Create archived copy with timestamp
from datetime import datetime
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
archive_blob = archive_bucket.new_blob(f'backups/data_{timestamp}.db')
await archive_blob.upload(content, content_type='application/octet-stream')

Install with Tessl CLI
npx tessl i tessl/pypi-gcloud-aio-storage