Microsoft Azure Blob Storage Client Library for Python providing comprehensive APIs for blob storage operations.
—
Quality: Pending (best-practices review not yet performed)
Impact: Pending (no eval scenarios have been run)
Individual blob operations including upload, download, properties management, and blob-type specific operations. The BlobClient provides comprehensive blob management for all Azure Blob Storage blob types (Block, Page, and Append blobs).
Create BlobClient instances with various authentication methods and from different sources including URLs and connection strings.
class BlobClient:
def __init__(self, account_url: str, container_name: str, blob_name: str, snapshot: Optional[Union[str, Dict[str, Any]]] = None, credential: Optional[Union[str, Dict[str, str], AzureNamedKeyCredential, AzureSasCredential, TokenCredential]] = None, **kwargs):
"""
Create a BlobClient.
Args:
account_url (str): The URL to the storage account
container_name (str): Name of the container
blob_name (str): Name of the blob
snapshot (Optional[Union[str, Dict[str, Any]]]): Snapshot identifier for blob snapshot or snapshot properties dict
credential (Optional[Union[str, Dict[str, str], AzureNamedKeyCredential, AzureSasCredential, TokenCredential]]): The credentials for authentication. Can be:
- str: Account key or SAS token string
- dict: Account name and key mapping
- AzureNamedKeyCredential: Named key credential
- AzureSasCredential: SAS credential
- TokenCredential: Azure AD token credential
- None: For anonymous access or SAS URLs
"""
@classmethod
def from_blob_url(cls, blob_url: str, credential=None, **kwargs) -> 'BlobClient':
"""
Create a BlobClient from a blob URL.
Args:
blob_url (str): Complete URL to the blob
credential: Optional credential for authentication
Returns:
BlobClient: Configured client instance
"""
@classmethod
def from_connection_string(cls, conn_str: str, container_name: str, blob_name: str, **kwargs) -> 'BlobClient':
"""
Create a BlobClient from a connection string.
Args:
conn_str (str): Azure Storage connection string
container_name (str): Name of the container
blob_name (str): Name of the blob
Returns:
BlobClient: Configured client instance
"""Core blob operations including upload, download, delete, and existence checking.
def upload_blob(self, data: Union[bytes, str, Iterable[AnyStr], IO[bytes]], blob_type: Union[str, BlobType] = BlobType.BLOCKBLOB, length: Optional[int] = None, metadata: Optional[Dict[str, str]] = None, **kwargs) -> Dict[str, Any]:
"""
Upload data to create or replace a blob.
Args:
data: Data to upload (bytes, str, or file-like object)
blob_type (BlobType): Type of blob to create
length (int, optional): Number of bytes to upload
metadata (dict, optional): Blob metadata
Keyword Args:
content_type (str, optional): MIME content type
content_encoding (str, optional): Content encoding
content_language (str, optional): Content language
content_disposition (str, optional): Content disposition
cache_control (str, optional): Cache control header
content_md5 (bytes, optional): MD5 hash of content
validate_content (bool): Validate content integrity during upload
max_concurrency (int): Maximum concurrent uploads for large blobs
overwrite (bool): Whether to overwrite existing blob (default True)
standard_blob_tier (StandardBlobTier, optional): Access tier for standard storage
premium_page_blob_tier (PremiumPageBlobTier, optional): Tier for premium page blobs
tags (dict, optional): Blob index tags as key-value pairs
encryption_scope (str, optional): Encryption scope for blob
cpk (CustomerProvidedEncryptionKey, optional): Customer-provided encryption key
Returns:
dict: Upload response with ETag, last modified time, and other properties
"""
def download_blob(self, offset=None, length=None, **kwargs) -> StorageStreamDownloader:
"""
Download blob content.
Args:
offset (int, optional): Start byte position for partial download
length (int, optional): Number of bytes to download
Keyword Args:
version_id (str, optional): Blob version to download
validate_content (bool): Validate content integrity during download
max_concurrency (int): Maximum concurrent downloads for large blobs
encoding (str, optional): Text encoding for string content
cpk (CustomerProvidedEncryptionKey, optional): Customer-provided encryption key
Returns:
StorageStreamDownloader: Streaming downloader with content and properties
"""
def delete_blob(self, delete_snapshots=None, **kwargs) -> None:
"""
Delete the blob.
Args:
delete_snapshots (str, optional): How to handle snapshots:
- 'include': Delete blob and all snapshots
- 'only': Delete snapshots but not the blob
Keyword Args:
version_id (str, optional): Specific version to delete
lease (BlobLeaseClient or str, optional): Required if blob has active lease
if_modified_since (datetime, optional): Delete only if modified since this time
if_unmodified_since (datetime, optional): Delete only if not modified since this time
etag (str, optional): Delete only if ETag matches
match_condition (MatchConditions, optional): ETag matching condition
"""
def exists(self, **kwargs) -> bool:
"""
Check whether the blob exists.
Returns:
bool: True if blob exists, False otherwise
"""
def undelete_blob(self, **kwargs) -> None:
"""
Restore a soft-deleted blob.
Note: Only works if soft delete is enabled on the account
"""Retrieve and modify blob properties, metadata, HTTP headers, and tags.
def get_blob_properties(self, **kwargs) -> BlobProperties:
"""
Get blob properties and metadata.
Keyword Args:
version_id (str, optional): Specific blob version
snapshot (str, optional): Blob snapshot identifier
lease (BlobLeaseClient or str, optional): Required if blob has active lease
if_modified_since (datetime, optional): Return only if modified since this time
if_unmodified_since (datetime, optional): Return only if not modified since this time
etag (str, optional): Return only if ETag matches
match_condition (MatchConditions, optional): ETag matching condition
cpk (CustomerProvidedEncryptionKey, optional): Customer-provided encryption key
Returns:
BlobProperties: Comprehensive blob properties and metadata
"""
def set_blob_metadata(self, metadata=None, **kwargs) -> dict:
"""
Set blob metadata.
Args:
metadata (dict, optional): Metadata as key-value pairs
Returns:
dict: Response with ETag and last modified time
"""
def set_http_headers(self, content_settings=None, **kwargs) -> dict:
"""
Set HTTP headers for the blob.
Args:
content_settings (ContentSettings, optional): HTTP header values
Returns:
dict: Response with ETag and last modified time
"""
def set_blob_tags(self, tags=None, **kwargs) -> dict:
"""
Set blob index tags.
Args:
tags (dict, optional): Tags as key-value pairs
Returns:
dict: Response with ETag and last modified time
"""
def get_blob_tags(self, **kwargs) -> dict:
"""
Get blob index tags.
Returns:
dict: Blob tags as key-value pairs
"""Create and manage blob snapshots for point-in-time copies.
def create_snapshot(self, **kwargs) -> dict:
"""
Create a snapshot of the blob.
Keyword Args:
metadata (dict, optional): Metadata for the snapshot
lease (BlobLeaseClient or str, optional): Required if blob has active lease
if_modified_since (datetime, optional): Create only if modified since this time
if_unmodified_since (datetime, optional): Create only if not modified since this time
etag (str, optional): Create only if ETag matches
match_condition (MatchConditions, optional): ETag matching condition
cpk (CustomerProvidedEncryptionKey, optional): Customer-provided encryption key
Returns:
dict: Snapshot information including snapshot ID and timestamp
"""Copy blobs between locations with progress tracking and cancellation support.
def start_copy_from_url(self, source_url: str, **kwargs) -> dict:
"""
Start copying a blob from a source URL.
Args:
source_url (str): URL of the source blob to copy
Keyword Args:
metadata (dict, optional): Metadata for the destination blob
incremental_copy (bool): Whether to perform incremental copy (page blobs only)
source_lease (BlobLeaseClient or str, optional): Lease for source blob
destination_lease (BlobLeaseClient or str, optional): Lease for destination blob
source_if_modified_since (datetime, optional): Copy only if source modified since
source_if_unmodified_since (datetime, optional): Copy only if source not modified since
source_etag (str, optional): Copy only if source ETag matches
source_match_condition (MatchConditions, optional): Source ETag matching condition
destination_if_modified_since (datetime, optional): Replace only if destination modified since
destination_if_unmodified_since (datetime, optional): Replace only if destination not modified since
destination_etag (str, optional): Replace only if destination ETag matches
destination_match_condition (MatchConditions, optional): Destination ETag matching condition
standard_blob_tier (StandardBlobTier, optional): Destination access tier
premium_page_blob_tier (PremiumPageBlobTier, optional): Destination premium tier
rehydrate_priority (RehydratePriority, optional): Priority for archive rehydration
source_authorization (str, optional): Authorization header for source
tags (dict, optional): Blob index tags for destination
Returns:
dict: Copy operation details including copy ID and status
"""
def abort_copy(self, copy_id: str, **kwargs) -> None:
"""
Abort an ongoing copy operation.
Args:
copy_id (str): Copy operation ID to abort
"""Acquire and manage leases on blobs to prevent concurrent modifications.
def acquire_lease(self, lease_duration=-1, lease_id=None, **kwargs) -> BlobLeaseClient:
"""
Acquire a lease on the blob.
Args:
lease_duration (int): Lease duration in seconds (-1 for infinite)
lease_id (str, optional): Proposed lease identifier
Returns:
BlobLeaseClient: Lease client for managing the lease
"""Set and modify access tiers for cost optimization and performance requirements.
def set_standard_blob_tier(self, standard_blob_tier, **kwargs) -> None:
"""
Set the access tier for a standard storage blob.
Args:
standard_blob_tier (StandardBlobTier): Target access tier
Keyword Args:
rehydrate_priority (RehydratePriority, optional): Priority for archive rehydration
lease (BlobLeaseClient or str, optional): Required if blob has active lease
version_id (str, optional): Specific version to modify
"""
def set_premium_page_blob_tier(self, premium_page_blob_tier, **kwargs) -> None:
"""
Set the tier for a premium page blob.
Args:
premium_page_blob_tier (PremiumPageBlobTier): Target premium tier
"""Block blob specific operations for staging and committing blocks for large uploads.
def stage_block(self, block_id: str, data, **kwargs) -> None:
"""
Stage a block for a block blob.
Args:
block_id (str): Base64-encoded block identifier
data: Block data to stage
Keyword Args:
length (int, optional): Number of bytes to upload
validate_content (bool): Validate block integrity
cpk (CustomerProvidedEncryptionKey, optional): Customer-provided encryption key
encryption_scope (str, optional): Encryption scope
"""
def stage_block_from_url(self, block_id: str, source_url: str, **kwargs) -> None:
"""
Stage a block from a source URL.
Args:
block_id (str): Base64-encoded block identifier
source_url (str): URL of source data
Keyword Args:
source_offset (int, optional): Byte offset in source
source_length (int, optional): Number of bytes to copy
source_content_md5 (bytes, optional): MD5 hash of source range
cpk (CustomerProvidedEncryptionKey, optional): Customer-provided encryption key
encryption_scope (str, optional): Encryption scope
"""
def get_block_list(self, **kwargs) -> BlockList:
"""
Get the list of blocks for a block blob.
Keyword Args:
block_list_type (str): Type of blocks to return:
- 'committed': Only committed blocks
- 'uncommitted': Only uncommitted blocks
- 'all': Both committed and uncommitted blocks
snapshot (str, optional): Blob snapshot identifier
lease (BlobLeaseClient or str, optional): Required if blob has active lease
Returns:
BlockList: Lists of committed and uncommitted blocks
"""
def commit_block_list(self, block_list, **kwargs) -> dict:
"""
Commit a list of staged blocks to create or update a block blob.
Args:
block_list (list): List of BlobBlock objects or block IDs
Keyword Args:
content_type (str, optional): MIME content type
content_encoding (str, optional): Content encoding
content_language (str, optional): Content language
content_disposition (str, optional): Content disposition
cache_control (str, optional): Cache control header
content_md5 (bytes, optional): MD5 hash of content
metadata (dict, optional): Blob metadata
validate_content (bool): Validate content integrity
standard_blob_tier (StandardBlobTier, optional): Access tier
tags (dict, optional): Blob index tags
immutability_policy (ImmutabilityPolicy, optional): Immutability policy
legal_hold (bool, optional): Legal hold status
cpk (CustomerProvidedEncryptionKey, optional): Customer-provided encryption key
encryption_scope (str, optional): Encryption scope
Returns:
dict: Commit response with ETag, last modified time, and other properties
"""Page blob specific operations for random access, sparse data, and virtual machine disks.
def create_page_blob(self, size: int, **kwargs) -> dict:
"""
Create a page blob of specified size.
Args:
size (int): Size of the page blob in bytes (must be multiple of 512)
Keyword Args:
content_type (str, optional): MIME content type
content_encoding (str, optional): Content encoding
content_language (str, optional): Content language
content_disposition (str, optional): Content disposition
cache_control (str, optional): Cache control header
metadata (dict, optional): Blob metadata
premium_page_blob_tier (PremiumPageBlobTier, optional): Access tier
sequence_number (int, optional): Initial sequence number
tags (dict, optional): Blob index tags
immutability_policy (ImmutabilityPolicy, optional): Immutability policy
legal_hold (bool, optional): Legal hold status
cpk (CustomerProvidedEncryptionKey, optional): Customer-provided encryption key
encryption_scope (str, optional): Encryption scope
Returns:
dict: Creation response with ETag, last modified time, and sequence number
"""
def upload_page(self, page, offset: int, **kwargs) -> dict:
"""
Upload a page to a page blob.
Args:
page: Page data to upload (must be multiple of 512 bytes)
offset (int): Byte offset where page starts (must be multiple of 512)
Keyword Args:
length (int, optional): Number of bytes to upload
validate_content (bool): Validate page integrity
lease (BlobLeaseClient or str, optional): Required if blob has active lease
if_sequence_number_less_than_or_equal_to (int, optional): Conditional update
if_sequence_number_less_than (int, optional): Conditional update
if_sequence_number_equal_to (int, optional): Conditional update
if_modified_since (datetime, optional): Upload only if modified since
if_unmodified_since (datetime, optional): Upload only if not modified since
etag (str, optional): Upload only if ETag matches
match_condition (MatchConditions, optional): ETag matching condition
cpk (CustomerProvidedEncryptionKey, optional): Customer-provided encryption key
encryption_scope (str, optional): Encryption scope
Returns:
dict: Upload response with ETag, last modified time, and sequence number
"""
def upload_pages_from_url(self, source_url: str, offset: int, source_offset: int, **kwargs) -> dict:
"""
Upload pages to a page blob from a source URL.
Args:
source_url (str): URL of source data
offset (int): Destination byte offset (must be multiple of 512)
source_offset (int): Source byte offset (must be multiple of 512)
Keyword Args:
length (int): Number of bytes to copy (must be multiple of 512)
source_content_md5 (bytes, optional): MD5 hash of source range
content_md5 (bytes, optional): MD5 hash for validation
lease (BlobLeaseClient or str, optional): Required if blob has active lease
if_sequence_number_less_than_or_equal_to (int, optional): Conditional update
if_sequence_number_less_than (int, optional): Conditional update
if_sequence_number_equal_to (int, optional): Conditional update
if_modified_since (datetime, optional): Upload only if modified since
if_unmodified_since (datetime, optional): Upload only if not modified since
etag (str, optional): Upload only if ETag matches
match_condition (MatchConditions, optional): ETag matching condition
source_if_modified_since (datetime, optional): Copy only if source modified since
source_if_unmodified_since (datetime, optional): Copy only if source not modified since
source_etag (str, optional): Copy only if source ETag matches
source_match_condition (MatchConditions, optional): Source ETag matching condition
cpk (CustomerProvidedEncryptionKey, optional): Customer-provided encryption key
encryption_scope (str, optional): Encryption scope
Returns:
dict: Upload response with ETag, last modified time, and sequence number
"""
def clear_page(self, offset: int, length: int, **kwargs) -> dict:
"""
Clear a range of pages in a page blob.
Args:
offset (int): Start byte position to clear (must be multiple of 512)
length (int): Number of bytes to clear (must be multiple of 512)
Returns:
dict: Clear response with ETag, last modified time, and sequence number
"""
def get_page_ranges(self, **kwargs) -> PageRanges:
"""
Get the list of valid page ranges for a page blob.
Keyword Args:
offset (int, optional): Start byte position
length (int, optional): Number of bytes to query
previous_snapshot (str, optional): Previous snapshot for diff
snapshot (str, optional): Blob snapshot identifier
lease (BlobLeaseClient or str, optional): Required if blob has active lease
if_modified_since (datetime, optional): Return only if modified since
if_unmodified_since (datetime, optional): Return only if not modified since
etag (str, optional): Return only if ETag matches
match_condition (MatchConditions, optional): ETag matching condition
Returns:
PageRanges: List of page ranges with start and end offsets
"""
def resize_blob(self, size: int, **kwargs) -> dict:
"""
Resize a page blob.
Args:
size (int): New size in bytes (must be multiple of 512)
Returns:
dict: Resize response with ETag, last modified time, and sequence number
"""
def set_sequence_number(self, sequence_number_action, sequence_number=None, **kwargs) -> dict:
"""
Set the sequence number for a page blob.
Args:
sequence_number_action (SequenceNumberAction): Action to perform
sequence_number (int, optional): New sequence number (required for UPDATE and MAX actions)
Returns:
dict: Response with ETag, last modified time, and sequence number
"""Append blob specific operations for log files and streaming data scenarios.
def create_append_blob(self, **kwargs) -> dict:
"""
Create an append blob.
Keyword Args:
content_type (str, optional): MIME content type
content_encoding (str, optional): Content encoding
content_language (str, optional): Content language
content_disposition (str, optional): Content disposition
cache_control (str, optional): Cache control header
metadata (dict, optional): Blob metadata
tags (dict, optional): Blob index tags
immutability_policy (ImmutabilityPolicy, optional): Immutability policy
legal_hold (bool, optional): Legal hold status
cpk (CustomerProvidedEncryptionKey, optional): Customer-provided encryption key
encryption_scope (str, optional): Encryption scope
Returns:
dict: Creation response with ETag and last modified time
"""
def append_block(self, data, **kwargs) -> dict:
"""
Append data to an append blob.
Args:
data: Data to append
Keyword Args:
length (int, optional): Number of bytes to append
validate_content (bool): Validate block integrity
max_size (int, optional): Maximum allowed blob size
append_position (int, optional): Expected current blob size
lease (BlobLeaseClient or str, optional): Required if blob has active lease
if_modified_since (datetime, optional): Append only if modified since
if_unmodified_since (datetime, optional): Append only if not modified since
etag (str, optional): Append only if ETag matches
match_condition (MatchConditions, optional): ETag matching condition
cpk (CustomerProvidedEncryptionKey, optional): Customer-provided encryption key
encryption_scope (str, optional): Encryption scope
Returns:
dict: Append response with ETag, last modified time, and append offset
"""
def append_block_from_url(self, copy_source_url: str, **kwargs) -> dict:
"""
Append data to an append blob from a source URL.
Args:
copy_source_url (str): URL of source data
Keyword Args:
source_offset (int, optional): Byte offset in source
source_length (int, optional): Number of bytes to copy
source_content_md5 (bytes, optional): MD5 hash of source range
max_size (int, optional): Maximum allowed blob size
append_position (int, optional): Expected current blob size
lease (BlobLeaseClient or str, optional): Required if blob has active lease
if_modified_since (datetime, optional): Append only if modified since
if_unmodified_since (datetime, optional): Append only if not modified since
etag (str, optional): Append only if ETag matches
match_condition (MatchConditions, optional): ETag matching condition
source_if_modified_since (datetime, optional): Copy only if source modified since
source_if_unmodified_since (datetime, optional): Copy only if source not modified since
source_etag (str, optional): Copy only if source ETag matches
source_match_condition (MatchConditions, optional): Source ETag matching condition
cpk (CustomerProvidedEncryptionKey, optional): Customer-provided encryption key
encryption_scope (str, optional): Encryption scope
Returns:
dict: Append response with ETag, last modified time, and append offset
"""
def seal_append_blob(self, **kwargs) -> dict:
"""
Seal an append blob to make it read-only.
Returns:
dict: Seal response with ETag and last modified time
"""Execute SQL-like queries on blob content for structured data processing.
def query_blob(self, query_expression: str, **kwargs) -> BlobQueryReader:
"""
Execute a SQL query on blob content.
Args:
query_expression (str): SQL query to execute on blob data
Keyword Args:
blob_format (QuickQueryDialect): Input data format configuration
output_format (QuickQueryDialect): Output data format configuration
on_error (callable, optional): Error handler function
blob_query_error_callback (callable, optional): Query error callback
snapshot (str, optional): Blob snapshot identifier
lease (BlobLeaseClient or str, optional): Required if blob has active lease
if_modified_since (datetime, optional): Query only if modified since
if_unmodified_since (datetime, optional): Query only if not modified since
etag (str, optional): Query only if ETag matches
match_condition (MatchConditions, optional): ETag matching condition
cpk (CustomerProvidedEncryptionKey, optional): Customer-provided encryption key
Returns:
BlobQueryReader: Query results with streaming capabilities
"""Manage data immutability policies and legal holds for compliance scenarios.
def set_immutability_policy(self, **kwargs) -> dict:
"""
Set an immutability policy on the blob.
Keyword Args:
immutability_policy (ImmutabilityPolicy): Policy configuration
if_unmodified_since (datetime, optional): Set only if not modified since
Returns:
dict: Response with immutability policy details
"""
def delete_immutability_policy(self, **kwargs) -> dict:
"""
Delete the immutability policy from the blob.
Returns:
dict: Response confirming policy deletion
"""
def set_legal_hold(self, legal_hold: bool, **kwargs) -> dict:
"""
Set or clear legal hold on the blob.
Args:
legal_hold (bool): Whether to set (True) or clear (False) legal hold
Returns:
dict: Response with legal hold status
"""Retrieve storage account information through the blob client.
def get_account_information(self, **kwargs) -> dict:
"""
Get information about the storage account.
Returns:
dict: Account information including account kind and SKU
"""class StorageStreamDownloader:
"""Streaming downloader for blob content."""
def readall(self) -> bytes: ...
def readinto(self, stream) -> int: ...
def download_to_stream(self, stream) -> None: ...
def chunks(self) -> Iterator[bytes]: ...
def content_as_bytes(self) -> bytes: ...
def content_as_text(self, encoding: str = 'utf-8') -> str: ...
class BlobProperties:
"""Comprehensive blob properties."""
name: str
container: str
snapshot: str
blob_type: BlobType
last_modified: datetime
etag: str
size: int
content_type: str
content_encoding: str
content_language: str
content_disposition: str
cache_control: str
content_md5: bytes
metadata: dict
lease: LeaseProperties
copy: CopyProperties
creation_time: datetime
archive_status: str
rehydrate_priority: str
encryption_key_sha256: str
encryption_scope: str
request_server_encrypted: bool
object_replication_source_properties: list
object_replication_destination_policy: str
tag_count: int
tags: dict
immutability_policy: ImmutabilityPolicy
has_legal_hold: bool
has_versions_only: bool
class ContentSettings:
"""HTTP headers for blob content."""
content_type: str
content_encoding: str
content_language: str
content_disposition: str
cache_control: str
content_md5: bytes
class BlobBlock:
"""Block information for block blobs."""
block_id: str
state: BlockState
class BlockList:
"""Block list for block blobs."""
committed_blocks: list[BlobBlock]
uncommitted_blocks: list[BlobBlock]
class PageRange:
"""Page range for page blobs."""
start: int
end: int
class PageRanges:
"""Page ranges collection."""
page_ranges: list[PageRange]
clear_ranges: list[PageRange]
class CopyProperties:
"""Blob copy operation properties."""
id: str
source: str
status: str
progress: str
completion_time: datetime
status_description: str
incremental_copy: bool
destination_snapshot: str
class ImmutabilityPolicy:
"""Blob immutability policy."""
expiry_time: datetime
policy_mode: BlobImmutabilityPolicyMode
class CustomerProvidedEncryptionKey:
"""Customer-managed encryption key."""
key_value: str
key_hash: str
algorithm: str
class BlobQueryReader:
"""SQL query results reader."""
def readall(self) -> bytes: ...
def readinto(self, stream) -> int: ...

Install with Tessl CLI:
npx tessl i tessl/pypi-azure-storage-blob