Microsoft Azure Blob Storage Client Library for Python providing comprehensive APIs for blob storage operations.
—
Quality: Pending — does it follow best practices?
Impact: Pending — no eval scenarios have been run.
Convenient helper functions for common blob operations without requiring explicit client instantiation. These functions provide simplified access for basic upload and download scenarios, making them ideal for simple scripts and one-off operations.
Upload data directly to a blob URL without creating a client instance. This function handles client creation, upload, and cleanup automatically.
def upload_blob_to_url(blob_url: str, data, credential=None, **kwargs) -> dict:
"""
Upload data directly to a blob URL.
Args:
blob_url (str): Complete URL to the destination blob
data: Data to upload (bytes, str, or file-like object)
credential: Optional credential for authentication. Can be:
- str: Account key or SAS token string
- dict: Account name and key mapping
- AzureNamedKeyCredential: Named key credential
- AzureSasCredential: SAS credential
- TokenCredential: Azure AD token credential
- None: For SAS URLs or public access
Keyword Args:
overwrite (bool): Whether to overwrite existing blob (default: False)
max_concurrency (int): Maximum concurrent uploads for large blobs
length (int, optional): Number of bytes to upload
metadata (dict, optional): Blob metadata as key-value pairs
validate_content (bool): Validate content integrity during upload
encoding (str, optional): Text encoding if data is string (default: UTF-8)
Returns:
dict: Upload response containing:
- etag: Entity tag of uploaded blob
- last_modified: Last modified timestamp
- content_md5: MD5 hash of content (if calculated)
- client_request_id: Request ID for tracking
- request_id: Server request ID
- version: Blob service version
- date: Response date
Raises:
ResourceExistsError: If blob exists and overwrite=False
HttpResponseError: For other service errors
"""Download blob content from a URL to a local file or stream without creating a client instance.
def download_blob_from_url(blob_url: str, output, credential=None, **kwargs) -> None:
"""
Download blob content from URL to file or stream.
Args:
blob_url (str): Complete URL to the source blob
output: Download destination. Can be:
- str: Local file path to write to
- file-like object: Stream to write to (must have 'write' method)
credential: Optional credential for authentication. Can be:
- str: Account key or SAS token string
- dict: Account name and key mapping
- AzureNamedKeyCredential: Named key credential
- AzureSasCredential: SAS credential
- TokenCredential: Azure AD token credential
- None: For SAS URLs or public access
Keyword Args:
overwrite (bool): Whether to overwrite existing file (default: False)
max_concurrency (int): Maximum concurrent downloads for large blobs
offset (int, optional): Start byte position for partial download
length (int, optional): Number of bytes to download
validate_content (bool): Validate content integrity during download
Returns:
None
Raises:
ValueError: If file exists and overwrite=False
ResourceNotFoundError: If blob does not exist
HttpResponseError: For other service errors
"""Asynchronous versions of the utility functions for concurrent operations.
# Available in azure.storage.blob.aio module
async def upload_blob_to_url(blob_url: str, data, credential=None, **kwargs) -> dict:
    """
    Async version of upload_blob_to_url.

    Same parameters and return value as sync version.
    """
async def download_blob_from_url(blob_url: str, output, credential=None, **kwargs) -> None:
    """
    Async version of download_blob_from_url.

    Same parameters as sync version.
    """

from azure.storage.blob import upload_blob_to_url
# Upload string data to blob with SAS token in URL
blob_url = "https://account.blob.core.windows.net/container/file.txt?sp=w&st=..."
upload_blob_to_url(blob_url, "Hello, World!")

# Upload binary data with separate credential
blob_url = "https://account.blob.core.windows.net/container/image.jpg"
with open("local_image.jpg", "rb") as data:
    upload_blob_to_url(
        blob_url,
        data,
        credential="account_key_here",
        overwrite=True,
        metadata={"source": "camera", "date": "2023-01-01"}
    )

# Upload with Azure AD credential
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
# NOTE(review): pdf_data is assumed to be bytes loaded elsewhere — not shown here
upload_blob_to_url(
    "https://account.blob.core.windows.net/container/document.pdf",
    pdf_data,
    credential=credential,
    overwrite=True
)

from azure.storage.blob import download_blob_from_url
# Download to local file
blob_url = "https://account.blob.core.windows.net/container/data.csv?sp=r&st=..."
download_blob_from_url(blob_url, "local_data.csv")

# Download to stream
blob_url = "https://account.blob.core.windows.net/container/log.txt"
with open("downloaded_log.txt", "wb") as file_handle:
    download_blob_from_url(
        blob_url,
        file_handle,
        credential="account_key_here",
        max_concurrency=4
    )

# Download partial content
download_blob_from_url(
    blob_url,
    "first_1mb.dat",
    credential=credential,
    offset=0,
    length=1024*1024  # First 1MB only
)

from azure.storage.blob import upload_blob_to_url, download_blob_from_url
from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError, HttpResponseError

# Upload with error handling
try:
    upload_blob_to_url(blob_url, data, overwrite=False)
except ResourceExistsError:
    print("Blob already exists. Use overwrite=True to replace.")
except HttpResponseError as e:
    print(f"Upload failed: {e.status_code} - {e.message}")

# Download with error handling
try:
    download_blob_from_url(blob_url, "output.txt", overwrite=False)
except ValueError as e:
    print(f"File operation error: {e}")
except ResourceNotFoundError:
    print("Blob not found")
except HttpResponseError as e:
    print(f"Download failed: {e.status_code} - {e.message}")

import asyncio
from azure.storage.blob.aio import upload_blob_to_url, download_blob_from_url

async def async_operations():
    # Async upload
    await upload_blob_to_url(
        "https://account.blob.core.windows.net/container/async_file.txt",
        "Async upload data",
        credential=credential
    )
    # Async download
    await download_blob_from_url(
        "https://account.blob.core.windows.net/container/source.txt",
        "async_downloaded.txt",
        credential=credential
    )

# Concurrent operations
async def concurrent_uploads():
    urls_and_data = [
        ("https://account.blob.core.windows.net/container/file1.txt", "Data 1"),
        ("https://account.blob.core.windows.net/container/file2.txt", "Data 2"),
        ("https://account.blob.core.windows.net/container/file3.txt", "Data 3"),
    ]
    # Upload all files concurrently
    tasks = [
        upload_blob_to_url(url, data, credential=credential, overwrite=True)
        for url, data in urls_and_data
    ]
    results = await asyncio.gather(*tasks)
    print(f"Uploaded {len(results)} files concurrently")

asyncio.run(concurrent_uploads())

# Upload large file with progress tracking
def upload_large_file_with_progress():
    """Upload a large local file with parallel block transfer and content validation."""
    # NOTE(review): this callback is defined but never wired up — the utility
    # functions do not accept a progress callback (see note inside the call).
    def progress_callback(bytes_transferred, total_bytes):
        percentage = (bytes_transferred / total_bytes) * 100
        print(f"Upload progress: {percentage:.1f}%")

    with open("large_file.zip", "rb") as data:
        result = upload_blob_to_url(
            blob_url,
            data,
            credential=credential,
            overwrite=True,
            max_concurrency=8,  # Parallel uploads
            validate_content=True,  # Verify integrity
            # Note: progress_callback not directly supported in utility functions
            # Use BlobClient for advanced progress tracking
        )
    return result
# Conditional upload based on blob existence
def conditional_upload():
    """Upload the blob only when it is new or its size differs from the local data."""
    try:
        # Try upload without overwrite
        upload_blob_to_url(blob_url, data, overwrite=False)
        print("New blob uploaded")
    except ResourceExistsError:
        # Blob exists, decide whether to update
        from azure.storage.blob import BlobClient
        blob_client = BlobClient.from_blob_url(blob_url, credential=credential)
        properties = blob_client.get_blob_properties()
        if properties.size != len(data):
            upload_blob_to_url(blob_url, data, overwrite=True)
            print("Blob updated with new content")
        else:
            print("Blob unchanged, skipping upload")
# Download with retry logic
def download_with_retry(max_retries=3):
for attempt in range(max_retries):
try:
download_blob_from_url(blob_url, output_file, credential=credential)
print("Download successful")
break
except HttpResponseError as e:
if e.status_code >= 500 and attempt < max_retries - 1:
print(f"Server error, retrying... (attempt {attempt + 1})")
time.sleep(2 ** attempt) # Exponential backoff
else:
raiseIdeal for:
Examples:
# Quick backup script
def backup_file(local_path, backup_url):
    """Upload the file at local_path to backup_url, replacing any existing blob."""
    with open(local_path, "rb") as data:
        upload_blob_to_url(backup_url, data, overwrite=True)
# Simple data processing pipeline
def process_remote_data(source_url, output_path):
# Download data
download_blob_from_url(source_url, "temp_data.csv")
# Process data (your processing logic here)
processed_data = process_csv("temp_data.csv")
# Upload results
upload_blob_to_url(output_path, processed_data, overwrite=True)Use BlobClient/ContainerClient/BlobServiceClient for:
Migration from utility functions to clients:
# Utility function approach
upload_blob_to_url(blob_url, data, credential=credential)
# Equivalent using BlobClient
blob_client = BlobClient.from_blob_url(blob_url, credential=credential)
blob_client.upload_blob(data, overwrite=True)
blob_client.close() # Or use context managerInstall with Tessl CLI
npx tessl i tessl/pypi-azure-storage-blob