CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-azure-storage-blob

Microsoft Azure Blob Storage Client Library for Python providing comprehensive APIs for blob storage operations.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

async-operations.mddocs/

Async Operations

Asynchronous versions of all client classes for high-performance, concurrent operations. The async clients provide identical APIs with async/await patterns, enabling scalable applications that can handle many concurrent storage operations efficiently.

Capabilities

Async Client Classes

All client classes have async equivalents in the azure.storage.blob.aio module with identical method signatures but using async/await patterns.

# Async imports
from azure.storage.blob.aio import BlobServiceClient, ContainerClient, BlobClient, BlobLeaseClient

# All async classes support the same methods as sync versions
class BlobServiceClient:
    """Async version of BlobServiceClient."""
    async def __aenter__(self) -> 'BlobServiceClient': ...
    async def __aexit__(self, *args) -> None: ...
    # All sync methods are available as async methods

class ContainerClient:
    """Async version of ContainerClient."""
    async def __aenter__(self) -> 'ContainerClient': ...
    async def __aexit__(self, *args) -> None: ...
    # All sync methods are available as async methods

class BlobClient:
    """Async version of BlobClient."""
    async def __aenter__(self) -> 'BlobClient': ...
    async def __aexit__(self, *args) -> None: ...
    # All sync methods are available as async methods

class BlobLeaseClient:
    """Async version of BlobLeaseClient."""
    # All sync methods are available as async methods

Async Context Management

All async clients support context management for automatic resource cleanup.

# Context manager support
async with BlobServiceClient(account_url, credential) as service_client:
    # Client is automatically closed when exiting context
    async for container in service_client.list_containers():
        print(container.name)

Async Service Client Operations

Account-level operations with async support for high-throughput scenarios.

class BlobServiceClient:
    # Client creation and authentication
    async def __aenter__(self) -> 'BlobServiceClient': ...
    async def __aexit__(self, *args) -> None: ...
    
    # Service configuration (async versions of sync methods)
    async def get_account_information(self, **kwargs) -> dict: ...
    async def get_service_properties(self, **kwargs) -> dict: ...
    async def set_service_properties(self, analytics_logging=None, hour_metrics=None, minute_metrics=None, cors=None, target_version=None, delete_retention_policy=None, static_website=None, **kwargs) -> None: ...
    async def get_service_stats(self, **kwargs) -> dict: ...
    async def get_user_delegation_key(self, key_start_time, key_expiry_time, **kwargs) -> 'UserDelegationKey': ...
    
    # Container management  
    def list_containers(self, name_starts_with=None, include_metadata=False, **kwargs) -> AsyncItemPaged[ContainerProperties]: ...
    async def create_container(self, name: str, metadata=None, public_access=None, **kwargs) -> ContainerClient: ...
    async def delete_container(self, container: str, **kwargs) -> None: ...
    async def undelete_container(self, deleted_container_name: str, deleted_container_version: str, **kwargs) -> ContainerClient: ...
    
    # Cross-container queries
    def find_blobs_by_tags(self, filter_expression: str, **kwargs) -> AsyncItemPaged['FilteredBlob']: ...
    
    # Client factory methods
    def get_container_client(self, container: str) -> 'ContainerClient': ...
    def get_blob_client(self, container: str, blob: str, snapshot=None) -> 'BlobClient': ...

Async Container Operations

Container-level operations with async support for concurrent blob management.

class ContainerClient:
    # Context management
    async def __aenter__(self) -> 'ContainerClient': ...
    async def __aexit__(self, *args) -> None: ...
    
    # Container lifecycle
    async def create_container(self, metadata=None, public_access=None, **kwargs) -> dict: ...
    async def delete_container(self, **kwargs) -> None: ...
    async def exists(self, **kwargs) -> bool: ...
    
    # Properties and metadata
    async def get_container_properties(self, **kwargs) -> ContainerProperties: ...
    async def set_container_metadata(self, metadata=None, **kwargs) -> dict: ...
    async def get_container_access_policy(self, **kwargs) -> dict: ...
    async def set_container_access_policy(self, signed_identifiers=None, public_access=None, **kwargs) -> dict: ...
    
    # Leasing
    async def acquire_lease(self, lease_duration=-1, lease_id=None, **kwargs) -> BlobLeaseClient: ...
    
    # Blob listing (returns async iterators)
    def list_blobs(self, name_starts_with=None, include=None, **kwargs) -> AsyncItemPaged[BlobProperties]: ...
    def list_blob_names(self, **kwargs) -> AsyncItemPaged[str]: ...
    def walk_blobs(self, name_starts_with=None, include=None, delimiter='/', **kwargs) -> AsyncItemPaged[Union[BlobProperties, BlobPrefix]]: ...
    def find_blobs_by_tags(self, filter_expression: str, **kwargs) -> AsyncItemPaged['FilteredBlob']: ...
    
    # Blob operations
    async def upload_blob(self, name: str, data, blob_type='BlockBlob', **kwargs) -> BlobClient: ...
    async def download_blob(self, blob: str, offset=None, length=None, **kwargs) -> StorageStreamDownloader: ...
    async def delete_blob(self, blob: str, delete_snapshots=None, **kwargs) -> None: ...
    
    # Batch operations (return async iterators)
    def delete_blobs(self, *blobs, **kwargs) -> AsyncIterator[HttpResponse]: ...
    def set_standard_blob_tier_blobs(self, *blobs, **kwargs) -> AsyncIterator[HttpResponse]: ...
    def set_premium_page_blob_tier_blobs(self, *blobs, **kwargs) -> AsyncIterator[HttpResponse]: ...
    
    # Client factory and account info
    def get_blob_client(self, blob: str, snapshot=None) -> 'BlobClient': ...
    async def get_account_information(self, **kwargs) -> dict: ...

Async Blob Operations

Individual blob operations with async support for high-throughput data transfer.

class BlobClient:
    # Context management
    async def __aenter__(self) -> 'BlobClient': ...
    async def __aexit__(self, *args) -> None: ...
    
    # Basic operations
    async def upload_blob(self, data, blob_type='BlockBlob', **kwargs) -> dict: ...
    async def download_blob(self, offset=None, length=None, **kwargs) -> StorageStreamDownloader: ...
    async def delete_blob(self, delete_snapshots=None, **kwargs) -> None: ...
    async def exists(self, **kwargs) -> bool: ...
    async def undelete_blob(self, **kwargs) -> None: ...
    
    # Properties and metadata
    async def get_blob_properties(self, **kwargs) -> BlobProperties: ...
    async def set_blob_metadata(self, metadata=None, **kwargs) -> dict: ...
    async def set_http_headers(self, content_settings=None, **kwargs) -> dict: ...
    async def set_blob_tags(self, tags=None, **kwargs) -> dict: ...
    async def get_blob_tags(self, **kwargs) -> dict: ...
    
    # Snapshots and versioning
    async def create_snapshot(self, **kwargs) -> dict: ...
    
    # Copy operations
    async def start_copy_from_url(self, source_url: str, **kwargs) -> dict: ...
    async def abort_copy(self, copy_id: str, **kwargs) -> None: ...
    
    # Leasing
    async def acquire_lease(self, lease_duration=-1, lease_id=None, **kwargs) -> BlobLeaseClient: ...
    
    # Tier management
    async def set_standard_blob_tier(self, standard_blob_tier, **kwargs) -> None: ...
    async def set_premium_page_blob_tier(self, premium_page_blob_tier, **kwargs) -> None: ...
    
    # Block blob operations
    async def stage_block(self, block_id: str, data, **kwargs) -> None: ...
    async def stage_block_from_url(self, block_id: str, source_url: str, **kwargs) -> None: ...
    async def get_block_list(self, **kwargs) -> BlockList: ...
    async def commit_block_list(self, block_list, **kwargs) -> dict: ...
    
    # Page blob operations
    async def create_page_blob(self, size: int, **kwargs) -> dict: ...
    async def upload_page(self, page, offset: int, **kwargs) -> dict: ...
    async def upload_pages_from_url(self, source_url: str, offset: int, source_offset: int, **kwargs) -> dict: ...
    async def clear_page(self, offset: int, length: int, **kwargs) -> dict: ...
    async def get_page_ranges(self, **kwargs) -> PageRanges: ...
    async def resize_blob(self, size: int, **kwargs) -> dict: ...
    async def set_sequence_number(self, sequence_number_action, sequence_number=None, **kwargs) -> dict: ...
    
    # Append blob operations
    async def create_append_blob(self, **kwargs) -> dict: ...
    async def append_block(self, data, **kwargs) -> dict: ...
    async def append_block_from_url(self, copy_source_url: str, **kwargs) -> dict: ...
    async def seal_append_blob(self, **kwargs) -> dict: ...
    
    # Query operations
    async def query_blob(self, query_expression: str, **kwargs) -> BlobQueryReader: ...
    
    # Immutability and legal hold
    async def set_immutability_policy(self, **kwargs) -> dict: ...
    async def delete_immutability_policy(self, **kwargs) -> dict: ...
    async def set_legal_hold(self, legal_hold: bool, **kwargs) -> dict: ...
    
    # Account information
    async def get_account_information(self, **kwargs) -> dict: ...

Async Lease Operations

Lease management with async support for concurrent lease operations.

class BlobLeaseClient:
    # All lease operations as async methods
    async def acquire(self, lease_duration=-1, **kwargs) -> None: ...
    async def renew(self, **kwargs) -> None: ...
    async def release(self, **kwargs) -> None: ...
    async def change(self, proposed_lease_id: str, **kwargs) -> None: ...
    async def break_lease(self, lease_break_period=None, **kwargs) -> int: ...

Async Utility Functions

Convenient async helper functions for common operations.

async def upload_blob_to_url(blob_url: str, data, credential=None, **kwargs) -> dict:
    """
    Async upload data to a blob URL.
    
    Args:
        blob_url (str): Complete URL to the blob
        data: Data to upload
        credential: Optional credential for authentication
        
    Returns:
        dict: Upload response with ETag and last modified time
    """

async def download_blob_from_url(blob_url: str, output, credential=None, **kwargs) -> None:
    """
    Async download blob from URL to file or stream.
    
    Args:
        blob_url (str): Complete URL to the blob
        output: File path or stream to write to
        credential: Optional credential for authentication
    """

Async Usage Patterns

Basic Async Operations

import asyncio
from azure.storage.blob.aio import BlobServiceClient

async def basic_operations():
    # Create async service client
    async with BlobServiceClient.from_connection_string(conn_str) as service_client:
        # List containers asynchronously  
        async for container in service_client.list_containers():
            print(f"Container: {container.name}")
        
        # Create container
        container_client = await service_client.create_container("my-async-container")
        
        # Upload blob
        async with container_client.get_blob_client("test.txt") as blob_client:
            await blob_client.upload_blob("Hello, async world!")
            
            # Download blob
            downloader = await blob_client.download_blob()
            content = await downloader.readall()
            print(content.decode())

asyncio.run(basic_operations())

Concurrent Operations

import asyncio
from azure.storage.blob.aio import BlobServiceClient

async def concurrent_uploads():
    async with BlobServiceClient.from_connection_string(conn_str) as service_client:
        container_client = service_client.get_container_client("my-container")
        
        # Upload multiple blobs concurrently
        async def upload_blob(name, data):
            blob_client = container_client.get_blob_client(name)
            return await blob_client.upload_blob(data, overwrite=True)
        
        # Create multiple upload tasks
        tasks = [
            upload_blob(f"file{i}.txt", f"Content of file {i}")
            for i in range(10)
        ]
        
        # Execute all uploads concurrently
        results = await asyncio.gather(*tasks)
        print(f"Uploaded {len(results)} blobs concurrently")

asyncio.run(concurrent_uploads())

Async Iteration

async def process_all_blobs():
    async with BlobServiceClient.from_connection_string(conn_str) as service_client:
        # Process all containers
        async for container in service_client.list_containers():
            print(f"Processing container: {container.name}")
            
            container_client = service_client.get_container_client(container.name)
            
            # Process all blobs in container
            async for blob in container_client.list_blobs():
                print(f"  Blob: {blob.name}, Size: {blob.size}")
                
                # Download and process blob content if needed
                if blob.size < 1024:  # Small blobs only
                    blob_client = container_client.get_blob_client(blob.name)
                    downloader = await blob_client.download_blob()
                    content = await downloader.readall()
                    # Process content...

asyncio.run(process_all_blobs())

Async Batch Operations

async def batch_operations():
    async with BlobServiceClient.from_connection_string(conn_str) as service_client:
        container_client = service_client.get_container_client("my-container")
        
        # Batch delete with async iteration
        blobs_to_delete = ["file1.txt", "file2.txt", "file3.txt"]
        async for response in container_client.delete_blobs(*blobs_to_delete):
            if response.status_code == 202:
                print("Blob deleted successfully")
            else:
                print(f"Delete failed: {response.status_code}")
        
        # Batch tier change with async iteration
        tier_changes = [
            ("large-file1.dat", StandardBlobTier.COOL),
            ("large-file2.dat", StandardBlobTier.COOL),
            ("old-backup.zip", StandardBlobTier.ARCHIVE)
        ]
        async for response in container_client.set_standard_blob_tier_blobs(*tier_changes):
            print(f"Tier change status: {response.status_code}")

asyncio.run(batch_operations())

Error Handling in Async Operations

from azure.core.exceptions import ResourceNotFoundError, HttpResponseError

async def async_error_handling():
    async with BlobServiceClient.from_connection_string(conn_str) as service_client:
        try:
            # Attempt operation that might fail
            blob_client = service_client.get_blob_client("nonexistent-container", "test.txt")
            await blob_client.upload_blob("test data")
            
        except ResourceNotFoundError:
            print("Container does not exist")
            # Create container and retry
            container_client = service_client.get_container_client("nonexistent-container")
            await container_client.create_container()
            await blob_client.upload_blob("test data")
            
        except HttpResponseError as e:
            print(f"HTTP error occurred: {e.status_code} - {e.message}")
            
        except Exception as e:
            print(f"Unexpected error: {e}")

asyncio.run(async_error_handling())

Performance Considerations

Concurrency Control

import asyncio
from azure.storage.blob.aio import BlobServiceClient

# Limit concurrent operations to avoid overwhelming the service
async def controlled_concurrency():
    semaphore = asyncio.Semaphore(10)  # Max 10 concurrent operations
    
    async def upload_with_semaphore(container_client, name, data):
        async with semaphore:
            blob_client = container_client.get_blob_client(name)
            return await blob_client.upload_blob(data, overwrite=True)
    
    async with BlobServiceClient.from_connection_string(conn_str) as service_client:
        container_client = service_client.get_container_client("my-container")
        
        # Create many upload tasks with concurrency control
        tasks = [
            upload_with_semaphore(container_client, f"file{i}.txt", f"Content {i}")
            for i in range(100)
        ]
        
        results = await asyncio.gather(*tasks)
        print(f"Completed {len(results)} uploads with controlled concurrency")

asyncio.run(controlled_concurrency())

Async Context Management Best Practices

# Recommended: Use async context managers for automatic cleanup
async def recommended_pattern():
    async with BlobServiceClient.from_connection_string(conn_str) as service_client:
        async with service_client.get_container_client("my-container") as container_client:
            async with container_client.get_blob_client("test.txt") as blob_client:
                await blob_client.upload_blob("data")
                # All clients automatically closed when exiting context

# Also supported: Manual client management
async def manual_pattern():
    service_client = BlobServiceClient.from_connection_string(conn_str)
    try:
        container_client = service_client.get_container_client("my-container")
        blob_client = container_client.get_blob_client("test.txt")
        await blob_client.upload_blob("data")
    finally:
        await service_client.close()  # Manual cleanup required

asyncio.run(recommended_pattern())

Install with Tessl CLI

npx tessl i tessl/pypi-azure-storage-blob

docs

async-operations.md

blob-client.md

blob-types-tiers.md

container-client.md

index.md

sas-generation.md

service-client.md

utility-functions.md

tile.json