Microsoft Azure Blob Storage Client Library for Python providing comprehensive APIs for blob storage operations.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Asynchronous versions of all client classes for high-performance, concurrent operations. The async clients provide identical APIs with async/await patterns, enabling scalable applications that can handle many concurrent storage operations efficiently.
All client classes have async equivalents in the azure.storage.blob.aio module with identical method signatures but using async/await patterns.
# Async imports
from azure.storage.blob.aio import BlobServiceClient, ContainerClient, BlobClient, BlobLeaseClient
# All async classes support the same methods as sync versions
class BlobServiceClient:
"""Async version of BlobServiceClient."""
async def __aenter__(self) -> 'BlobServiceClient': ...
async def __aexit__(self, *args) -> None: ...
# All sync methods are available as async methods
class ContainerClient:
"""Async version of ContainerClient."""
async def __aenter__(self) -> 'ContainerClient': ...
async def __aexit__(self, *args) -> None: ...
# All sync methods are available as async methods
class BlobClient:
"""Async version of BlobClient."""
async def __aenter__(self) -> 'BlobClient': ...
async def __aexit__(self, *args) -> None: ...
# All sync methods are available as async methods
class BlobLeaseClient:
"""Async version of BlobLeaseClient."""
# All sync methods are available as async methods

All async clients support context management for automatic resource cleanup.
# Context manager support
async with BlobServiceClient(account_url, credential) as service_client:
# Client is automatically closed when exiting context
async for container in service_client.list_containers():
print(container.name)

Account-level operations with async support for high-throughput scenarios.
class BlobServiceClient:
# Client creation and authentication
async def __aenter__(self) -> 'BlobServiceClient': ...
async def __aexit__(self, *args) -> None: ...
# Service configuration (async versions of sync methods)
async def get_account_information(self, **kwargs) -> dict: ...
async def get_service_properties(self, **kwargs) -> dict: ...
async def set_service_properties(self, analytics_logging=None, hour_metrics=None, minute_metrics=None, cors=None, target_version=None, delete_retention_policy=None, static_website=None, **kwargs) -> None: ...
async def get_service_stats(self, **kwargs) -> dict: ...
async def get_user_delegation_key(self, key_start_time, key_expiry_time, **kwargs) -> 'UserDelegationKey': ...
# Container management
def list_containers(self, name_starts_with=None, include_metadata=False, **kwargs) -> AsyncItemPaged[ContainerProperties]: ...
async def create_container(self, name: str, metadata=None, public_access=None, **kwargs) -> ContainerClient: ...
async def delete_container(self, container: str, **kwargs) -> None: ...
async def undelete_container(self, deleted_container_name: str, deleted_container_version: str, **kwargs) -> ContainerClient: ...
# Cross-container queries
def find_blobs_by_tags(self, filter_expression: str, **kwargs) -> AsyncItemPaged['FilteredBlob']: ...
# Client factory methods
def get_container_client(self, container: str) -> 'ContainerClient': ...
def get_blob_client(self, container: str, blob: str, snapshot=None) -> 'BlobClient': ...

Container-level operations with async support for concurrent blob management.
class ContainerClient:
# Context management
async def __aenter__(self) -> 'ContainerClient': ...
async def __aexit__(self, *args) -> None: ...
# Container lifecycle
async def create_container(self, metadata=None, public_access=None, **kwargs) -> dict: ...
async def delete_container(self, **kwargs) -> None: ...
async def exists(self, **kwargs) -> bool: ...
# Properties and metadata
async def get_container_properties(self, **kwargs) -> ContainerProperties: ...
async def set_container_metadata(self, metadata=None, **kwargs) -> dict: ...
async def get_container_access_policy(self, **kwargs) -> dict: ...
async def set_container_access_policy(self, signed_identifiers=None, public_access=None, **kwargs) -> dict: ...
# Leasing
async def acquire_lease(self, lease_duration=-1, lease_id=None, **kwargs) -> BlobLeaseClient: ...
# Blob listing (returns async iterators)
def list_blobs(self, name_starts_with=None, include=None, **kwargs) -> AsyncItemPaged[BlobProperties]: ...
def list_blob_names(self, **kwargs) -> AsyncItemPaged[str]: ...
def walk_blobs(self, name_starts_with=None, include=None, delimiter='/', **kwargs) -> AsyncItemPaged[Union[BlobProperties, BlobPrefix]]: ...
def find_blobs_by_tags(self, filter_expression: str, **kwargs) -> AsyncItemPaged['FilteredBlob']: ...
# Blob operations
async def upload_blob(self, name: str, data, blob_type='BlockBlob', **kwargs) -> BlobClient: ...
async def download_blob(self, blob: str, offset=None, length=None, **kwargs) -> StorageStreamDownloader: ...
async def delete_blob(self, blob: str, delete_snapshots=None, **kwargs) -> None: ...
# Batch operations (return async iterators)
def delete_blobs(self, *blobs, **kwargs) -> AsyncIterator[HttpResponse]: ...
def set_standard_blob_tier_blobs(self, *blobs, **kwargs) -> AsyncIterator[HttpResponse]: ...
def set_premium_page_blob_tier_blobs(self, *blobs, **kwargs) -> AsyncIterator[HttpResponse]: ...
# Client factory and account info
def get_blob_client(self, blob: str, snapshot=None) -> 'BlobClient': ...
async def get_account_information(self, **kwargs) -> dict: ...

Individual blob operations with async support for high-throughput data transfer.
class BlobClient:
# Context management
async def __aenter__(self) -> 'BlobClient': ...
async def __aexit__(self, *args) -> None: ...
# Basic operations
async def upload_blob(self, data, blob_type='BlockBlob', **kwargs) -> dict: ...
async def download_blob(self, offset=None, length=None, **kwargs) -> StorageStreamDownloader: ...
async def delete_blob(self, delete_snapshots=None, **kwargs) -> None: ...
async def exists(self, **kwargs) -> bool: ...
async def undelete_blob(self, **kwargs) -> None: ...
# Properties and metadata
async def get_blob_properties(self, **kwargs) -> BlobProperties: ...
async def set_blob_metadata(self, metadata=None, **kwargs) -> dict: ...
async def set_http_headers(self, content_settings=None, **kwargs) -> dict: ...
async def set_blob_tags(self, tags=None, **kwargs) -> dict: ...
async def get_blob_tags(self, **kwargs) -> dict: ...
# Snapshots and versioning
async def create_snapshot(self, **kwargs) -> dict: ...
# Copy operations
async def start_copy_from_url(self, source_url: str, **kwargs) -> dict: ...
async def abort_copy(self, copy_id: str, **kwargs) -> None: ...
# Leasing
async def acquire_lease(self, lease_duration=-1, lease_id=None, **kwargs) -> BlobLeaseClient: ...
# Tier management
async def set_standard_blob_tier(self, standard_blob_tier, **kwargs) -> None: ...
async def set_premium_page_blob_tier(self, premium_page_blob_tier, **kwargs) -> None: ...
# Block blob operations
async def stage_block(self, block_id: str, data, **kwargs) -> None: ...
async def stage_block_from_url(self, block_id: str, source_url: str, **kwargs) -> None: ...
async def get_block_list(self, **kwargs) -> BlockList: ...
async def commit_block_list(self, block_list, **kwargs) -> dict: ...
# Page blob operations
async def create_page_blob(self, size: int, **kwargs) -> dict: ...
async def upload_page(self, page, offset: int, **kwargs) -> dict: ...
async def upload_pages_from_url(self, source_url: str, offset: int, source_offset: int, **kwargs) -> dict: ...
async def clear_page(self, offset: int, length: int, **kwargs) -> dict: ...
async def get_page_ranges(self, **kwargs) -> PageRanges: ...
async def resize_blob(self, size: int, **kwargs) -> dict: ...
async def set_sequence_number(self, sequence_number_action, sequence_number=None, **kwargs) -> dict: ...
# Append blob operations
async def create_append_blob(self, **kwargs) -> dict: ...
async def append_block(self, data, **kwargs) -> dict: ...
async def append_block_from_url(self, copy_source_url: str, **kwargs) -> dict: ...
async def seal_append_blob(self, **kwargs) -> dict: ...
# Query operations
async def query_blob(self, query_expression: str, **kwargs) -> BlobQueryReader: ...
# Immutability and legal hold
async def set_immutability_policy(self, **kwargs) -> dict: ...
async def delete_immutability_policy(self, **kwargs) -> dict: ...
async def set_legal_hold(self, legal_hold: bool, **kwargs) -> dict: ...
# Account information
async def get_account_information(self, **kwargs) -> dict: ...

Lease management with async support for concurrent lease operations.
class BlobLeaseClient:
# All lease operations as async methods
async def acquire(self, lease_duration=-1, **kwargs) -> None: ...
async def renew(self, **kwargs) -> None: ...
async def release(self, **kwargs) -> None: ...
async def change(self, proposed_lease_id: str, **kwargs) -> None: ...
async def break_lease(self, lease_break_period=None, **kwargs) -> int: ...

Convenient async helper functions for common operations.
async def upload_blob_to_url(blob_url: str, data, credential=None, **kwargs) -> dict:
"""
Async upload data to a blob URL.
Args:
blob_url (str): Complete URL to the blob
data: Data to upload
credential: Optional credential for authentication
Returns:
dict: Upload response with ETag and last modified time
"""
async def download_blob_from_url(blob_url: str, output, credential=None, **kwargs) -> None:
"""
Async download blob from URL to file or stream.
Args:
blob_url (str): Complete URL to the blob
output: File path or stream to write to
credential: Optional credential for authentication
"""import asyncio
from azure.storage.blob.aio import BlobServiceClient
async def basic_operations():
# Create async service client
async with BlobServiceClient.from_connection_string(conn_str) as service_client:
# List containers asynchronously
async for container in service_client.list_containers():
print(f"Container: {container.name}")
# Create container
container_client = await service_client.create_container("my-async-container")
# Upload blob
async with container_client.get_blob_client("test.txt") as blob_client:
await blob_client.upload_blob("Hello, async world!")
# Download blob
downloader = await blob_client.download_blob()
content = await downloader.readall()
print(content.decode())
asyncio.run(basic_operations())

import asyncio
from azure.storage.blob.aio import BlobServiceClient
async def concurrent_uploads():
async with BlobServiceClient.from_connection_string(conn_str) as service_client:
container_client = service_client.get_container_client("my-container")
# Upload multiple blobs concurrently
async def upload_blob(name, data):
blob_client = container_client.get_blob_client(name)
return await blob_client.upload_blob(data, overwrite=True)
# Create multiple upload tasks
tasks = [
upload_blob(f"file{i}.txt", f"Content of file {i}")
for i in range(10)
]
# Execute all uploads concurrently
results = await asyncio.gather(*tasks)
print(f"Uploaded {len(results)} blobs concurrently")
asyncio.run(concurrent_uploads())

async def process_all_blobs():
async with BlobServiceClient.from_connection_string(conn_str) as service_client:
# Process all containers
async for container in service_client.list_containers():
print(f"Processing container: {container.name}")
container_client = service_client.get_container_client(container.name)
# Process all blobs in container
async for blob in container_client.list_blobs():
print(f" Blob: {blob.name}, Size: {blob.size}")
# Download and process blob content if needed
if blob.size < 1024: # Small blobs only
blob_client = container_client.get_blob_client(blob.name)
downloader = await blob_client.download_blob()
content = await downloader.readall()
# Process content...
asyncio.run(process_all_blobs())

async def batch_operations():
async with BlobServiceClient.from_connection_string(conn_str) as service_client:
container_client = service_client.get_container_client("my-container")
# Batch delete with async iteration
blobs_to_delete = ["file1.txt", "file2.txt", "file3.txt"]
async for response in container_client.delete_blobs(*blobs_to_delete):
if response.status_code == 202:
print("Blob deleted successfully")
else:
print(f"Delete failed: {response.status_code}")
# Batch tier change with async iteration
tier_changes = [
("large-file1.dat", StandardBlobTier.COOL),
("large-file2.dat", StandardBlobTier.COOL),
("old-backup.zip", StandardBlobTier.ARCHIVE)
]
async for response in container_client.set_standard_blob_tier_blobs(*tier_changes):
print(f"Tier change status: {response.status_code}")
asyncio.run(batch_operations())

from azure.core.exceptions import ResourceNotFoundError, HttpResponseError
async def async_error_handling():
async with BlobServiceClient.from_connection_string(conn_str) as service_client:
try:
# Attempt operation that might fail
blob_client = service_client.get_blob_client("nonexistent-container", "test.txt")
await blob_client.upload_blob("test data")
except ResourceNotFoundError:
print("Container does not exist")
# Create container and retry
container_client = service_client.get_container_client("nonexistent-container")
await container_client.create_container()
await blob_client.upload_blob("test data")
except HttpResponseError as e:
print(f"HTTP error occurred: {e.status_code} - {e.message}")
except Exception as e:
print(f"Unexpected error: {e}")
asyncio.run(async_error_handling())

import asyncio
from azure.storage.blob.aio import BlobServiceClient
# Limit concurrent operations to avoid overwhelming the service
async def controlled_concurrency():
semaphore = asyncio.Semaphore(10) # Max 10 concurrent operations
async def upload_with_semaphore(container_client, name, data):
async with semaphore:
blob_client = container_client.get_blob_client(name)
return await blob_client.upload_blob(data, overwrite=True)
async with BlobServiceClient.from_connection_string(conn_str) as service_client:
container_client = service_client.get_container_client("my-container")
# Create many upload tasks with concurrency control
tasks = [
upload_with_semaphore(container_client, f"file{i}.txt", f"Content {i}")
for i in range(100)
]
results = await asyncio.gather(*tasks)
print(f"Completed {len(results)} uploads with controlled concurrency")
asyncio.run(controlled_concurrency())

# Recommended: Use async context managers for automatic cleanup
async def recommended_pattern():
async with BlobServiceClient.from_connection_string(conn_str) as service_client:
async with service_client.get_container_client("my-container") as container_client:
async with container_client.get_blob_client("test.txt") as blob_client:
await blob_client.upload_blob("data")
# All clients automatically closed when exiting context
# Also supported: Manual client management
async def manual_pattern():
service_client = BlobServiceClient.from_connection_string(conn_str)
try:
container_client = service_client.get_container_client("my-container")
blob_client = container_client.get_blob_client("test.txt")
await blob_client.upload_blob("data")
finally:
await service_client.close() # Manual cleanup required
asyncio.run(recommended_pattern())

Install with Tessl CLI
npx tessl i tessl/pypi-azure-storage-blob