Pathlib-style classes for cloud storage services that provide seamless access to AWS S3, Google Cloud Storage, and Azure Blob Storage with familiar filesystem operations.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Azure Blob Storage support with Azure Active Directory authentication, hierarchical namespace support for ADLS Gen2, and Azure-specific blob operations. This implementation provides comprehensive access to Azure Blob Storage and Azure Data Lake Storage Gen2 capabilities.
Azure Blob Storage-specific path implementation with access to Azure metadata and ADLS Gen2 support.
class AzureBlobPath(CloudPath):
"""Azure Blob Storage path implementation."""
@property
def container(self) -> str:
"""
Azure container name.
Returns:
Container name from the Azure URI
"""
@property
def blob(self) -> str:
"""
Blob name (path within container).
Returns:
Blob name string
"""
@property
def etag(self) -> str:
"""
Azure blob ETag identifier.
Returns:
ETag string for the blob
"""
@property
def md5(self) -> str:
"""
MD5 hash of the blob content.
Returns:
MD5 hash string
"""

Azure Blob Storage client with comprehensive authentication and configuration options.
class AzureBlobClient:
"""Azure Blob Storage client."""
def __init__(
self,
account_url: str = None,
credential = None,
connection_string: str = None,
blob_service_client = None,
data_lake_client = None,
file_cache_mode: FileCacheMode = None,
local_cache_dir: str = None,
content_type_method = None
):
"""
Initialize Azure Blob client.
Args:
account_url: Azure storage account URL
credential: Azure credential object (various types supported)
connection_string: Azure storage connection string
blob_service_client: Custom BlobServiceClient instance
data_lake_client: Custom DataLakeServiceClient for ADLS Gen2
file_cache_mode: Cache management strategy
local_cache_dir: Local directory for file cache
content_type_method: Function to determine MIME types
"""

from cloudpathlib import AzureBlobPath, AzureBlobClient
# Create Azure path (uses default client)
az_path = AzureBlobPath("az://my-container/data/file.txt")
# Access Azure-specific properties
print(f"Container: {az_path.container}") # "my-container"
print(f"Blob: {az_path.blob}") # "data/file.txt"
# Check if blob exists and get metadata
if az_path.exists():
print(f"ETag: {az_path.etag}")
print(f"MD5: {az_path.md5}")

# Use connection string from Azure portal
connection_string = (
"DefaultEndpointsProtocol=https;"
"AccountName=mystorageaccount;"
"AccountKey=myaccountkey;"
"EndpointSuffix=core.windows.net"
)
client = AzureBlobClient(connection_string=connection_string)
client.set_as_default_client()
# Create paths using connection string
az_path = AzureBlobPath("az://my-container/data.json")
content = az_path.read_text()

from azure.storage.blob import BlobServiceClient
# Create client with account key
account_url = "https://mystorageaccount.blob.core.windows.net"
account_key = "your-account-key"
client = AzureBlobClient(
account_url=account_url,
credential=account_key
)
az_path = AzureBlobPath("az://my-container/file.txt", client=client)

from azure.identity import DefaultAzureCredential, ClientSecretCredential
# Use default Azure credential (recommended for production)
credential = DefaultAzureCredential()
client = AzureBlobClient(
account_url="https://mystorageaccount.blob.core.windows.net",
credential=credential
)
# Or use service principal
credential = ClientSecretCredential(
tenant_id="your-tenant-id",
client_id="your-client-id",
client_secret="your-client-secret"
)
client = AzureBlobClient(
account_url="https://mystorageaccount.blob.core.windows.net",
credential=credential
)

from azure.identity import ManagedIdentityCredential
# Use managed identity (for Azure VMs, App Service, etc.)
credential = ManagedIdentityCredential()
client = AzureBlobClient(
account_url="https://mystorageaccount.blob.core.windows.net",
credential=credential
)
# Use with specific client ID
credential = ManagedIdentityCredential(client_id="your-managed-identity-client-id")

# Use Shared Access Signature token
sas_token = "your-sas-token"
account_url = f"https://mystorageaccount.blob.core.windows.net?{sas_token}"
client = AzureBlobClient(account_url=account_url)
az_path = AzureBlobPath("az://my-container/file.txt", client=client)

from azure.storage.filedatalake import DataLakeServiceClient
# ADLS Gen2 with hierarchical namespace support
dfs_client = DataLakeServiceClient(
account_url="https://mystorageaccount.dfs.core.windows.net",
credential=DefaultAzureCredential()
)
client = AzureBlobClient(data_lake_client=dfs_client)
# ADLS Gen2 supports true directory operations
adls_path = AzureBlobPath("az://filesystem/directory/", client=client)
adls_path.mkdir(parents=True, exist_ok=True)
# Create files in directory structure
file_path = adls_path / "data.txt"
file_path.write_text("ADLS Gen2 content")

# Upload with specific access tier
def upload_with_tier(local_path, az_path, tier):
    """Upload a local file to *az_path* with a specific Azure access tier.

    Args:
        local_path: Path of the local file to upload.
        az_path: Destination AzureBlobPath.
        tier: Access tier name, e.g. "Hot", "Cool", or "Archive".
    """
    service = az_path.client.blob_service_client
    blob_client = service.get_blob_client(container=az_path.container, blob=az_path.blob)
    with open(local_path, 'rb') as stream:
        blob_client.upload_blob(stream, standard_blob_tier=tier, overwrite=True)
# Usage examples
az_path = AzureBlobPath("az://my-container/archive.zip")
upload_with_tier("data.zip", az_path, "Archive") # Cold storage
# Different access tiers: Hot, Cool, Archive
tiers = ["Hot", "Cool", "Archive"]

# Set custom metadata
def set_blob_metadata(az_path, metadata_dict):
    """Attach custom metadata key/value pairs to the blob at *az_path*."""
    service = az_path.client.blob_service_client
    target = service.get_blob_client(container=az_path.container, blob=az_path.blob)
    target.set_blob_metadata(metadata=metadata_dict)
def get_blob_metadata(az_path):
    """Fetch blob metadata and full properties for *az_path*.

    Returns:
        Tuple of (metadata dict, BlobProperties object).
    """
    service = az_path.client.blob_service_client
    source = service.get_blob_client(container=az_path.container, blob=az_path.blob)
    props = source.get_blob_properties()
    return props.metadata, props
# Usage
az_path = AzureBlobPath("az://my-container/document.pdf")
# Set metadata
set_blob_metadata(az_path, {
"author": "Data Team",
"project": "Analytics",
"version": "1.0"
})
# Read metadata and properties
metadata, properties = get_blob_metadata(az_path)
print(f"Metadata: {metadata}")
print(f"Content Type: {properties.content_settings.content_type}")
print(f"Last Modified: {properties.last_modified}")

from azure.storage.blob import BlobLeaseClient
def acquire_blob_lease(az_path, lease_duration=60):
    """Acquire an exclusive lease on the blob at *az_path*.

    Args:
        az_path: AzureBlobPath of the blob to lock.
        lease_duration: Lease length in seconds (default 60).

    Returns:
        Tuple of (lease client, lease id); call ``release()`` on the
        client when done to free the lease.
    """
    service = az_path.client.blob_service_client
    target = service.get_blob_client(container=az_path.container, blob=az_path.blob)
    lease = BlobLeaseClient(target)
    acquired_id = lease.acquire(lease_duration=lease_duration)
    return lease, acquired_id
# Usage
az_path = AzureBlobPath("az://my-container/critical-file.txt")
lease_client, lease_id = acquire_blob_lease(az_path)
try:
# Perform operations with exclusive access
content = az_path.read_text()
modified_content = content + "\nModified with lease"
az_path.write_text(modified_content)
finally:
# Always release lease
lease_client.release()

import concurrent.futures
from pathlib import Path
def upload_file_to_azure(local_path, az_base):
    """Upload one local file under *az_base*, keeping its filename.

    Returns:
        The AzureBlobPath the file was uploaded to.
    """
    destination = az_base / local_path.name
    destination.upload_from(local_path)
    return destination
# Parallel upload
local_files = list(Path("data/").glob("*.csv"))
az_base = AzureBlobPath("az://my-container/csv-data/")
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(upload_file_to_azure, f, az_base) for f in local_files]
for future in concurrent.futures.as_completed(futures):
try:
az_path = future.result()
print(f"Uploaded: {az_path}")
except Exception as e:
print(f"Upload failed: {e}")

from datetime import datetime
def create_blob_snapshot(az_path):
    """Create a point-in-time snapshot of the blob and return its snapshot id."""
    service = az_path.client.blob_service_client
    target = service.get_blob_client(container=az_path.container, blob=az_path.blob)
    result = target.create_snapshot()
    return result['snapshot']
def list_blob_snapshots(az_path):
    """List every snapshot of the blob at *az_path*, newest first.

    Returns:
        List of dicts with 'snapshot', 'last_modified', and 'size' keys,
        sorted by last-modified time in descending order.
    """
    container_client = az_path.client.blob_service_client.get_container_client(
        az_path.container
    )
    listing = container_client.list_blobs(
        name_starts_with=az_path.blob, include=['snapshots']
    )
    # Only entries for this exact blob name that carry a snapshot id count.
    snapshots = [
        {
            'snapshot': entry.snapshot,
            'last_modified': entry.last_modified,
            'size': entry.size,
        }
        for entry in listing
        if entry.name == az_path.blob and entry.snapshot
    ]
    snapshots.sort(key=lambda item: item['last_modified'], reverse=True)
    return snapshots
# Usage
az_path = AzureBlobPath("az://my-container/important.txt")
# Create snapshot before modification
snapshot_id = create_blob_snapshot(az_path)
print(f"Created snapshot: {snapshot_id}")
# List all snapshots
snapshots = list_blob_snapshots(az_path)
for snapshot in snapshots:
print(f"Snapshot {snapshot['snapshot']}: {snapshot['last_modified']}")

def create_container(container_name, client):
    """Create *container_name* with anonymous public read access to blobs.

    Failures (e.g. the container already exists) are printed rather than
    raised, since this is a best-effort example helper.
    """
    container_client = client.blob_service_client.get_container_client(container_name)
    try:
        container_client.create_container(public_access='blob')
    except Exception as e:
        print(f"Container creation failed: {e}")
    else:
        print(f"Created container: {container_name}")
def list_containers(client):
    """Enumerate every container in the storage account.

    Returns:
        List of dicts with 'name', 'last_modified', and 'public_access' keys.
    """
    return [
        {
            'name': entry.name,
            'last_modified': entry.last_modified,
            'public_access': entry.public_access,
        }
        for entry in client.blob_service_client.list_containers()
    ]
# Usage
client = AzureBlobClient(connection_string=connection_string)
create_container("new-container", client)
containers = list_containers(client)

# Work with geo-replicated storage accounts
primary_client = AzureBlobClient(
account_url="https://mystorageaccount.blob.core.windows.net",
credential=credential
)
secondary_client = AzureBlobClient(
account_url="https://mystorageaccount-secondary.blob.core.windows.net",
credential=credential
)
# Read from secondary region (read-access geo-redundant storage)
primary_path = AzureBlobPath("az://my-container/data.txt", client=primary_client)
secondary_path = AzureBlobPath("az://my-container/data.txt", client=secondary_client)
try:
content = primary_path.read_text()
except Exception:
# Fallback to secondary region
content = secondary_path.read_text()

# Work with Azure Event Grid for blob events
def setup_blob_monitoring(az_path):
    """Enumerate blob operations that would trigger Azure Event Grid events.

    Note: Event Grid subscriptions are configured in the Azure portal; this
    helper only describes operations that can fire blob events.

    Bug fixed: the original built the list by *calling* each operation, so
    merely enumerating them wrote, copied, and then deleted the blob
    (``unlink()``). The operations are now returned as deferred callables,
    so nothing executes until a caller explicitly invokes one.

    Args:
        az_path: AzureBlobPath the operations would apply to.

    Returns:
        List of (event_name, callable) pairs; invoking a callable performs
        the operation and would emit the named event.
    """
    return [
        ("BlobCreated", lambda: az_path.write_text("New content")),
        ("BlobCreated", lambda: az_path.copy(az_path.with_suffix('.bak'))),
        ("BlobDeleted", lambda: az_path.unlink()),
    ]
# Usage
az_path = AzureBlobPath("az://monitored-container/file.txt")
setup_blob_monitoring(az_path)

# Configure for high-throughput operations
from azure.storage.blob import BlobServiceClient
from azure.core.pipeline.transport import RequestsTransport
# Custom transport with connection pooling
transport = RequestsTransport(
connection_pool_maxsize=100,
connection_pool_block=False
)
blob_service_client = BlobServiceClient(
account_url="https://mystorageaccount.blob.core.windows.net",
credential=credential,
transport=transport
)
client = AzureBlobClient(blob_service_client=blob_service_client)
# Performance monitoring
import time
az_path = AzureBlobPath("az://my-container/large-file.dat", client=client)
start_time = time.time()
az_path.download_to("local-large-file.dat")
duration = time.time() - start_time
print(f"Download completed in {duration:.2f} seconds")

from cloudpathlib import (
CloudPathFileNotFoundError,
MissingCredentialsError
)
from azure.core.exceptions import (
AzureError,
ResourceNotFoundError,
ClientAuthenticationError
)
try:
az_path = AzureBlobPath("az://nonexistent-container/file.txt")
content = az_path.read_text()
except CloudPathFileNotFoundError:
print("Azure blob not found")
except ClientAuthenticationError:
print("Azure authentication failed")
except ResourceNotFoundError:
print("Azure resource not found")
except AzureError as e:
print(f"Azure error: {e}")

Install with Tessl CLI
npx tessl i tessl/pypi-cloudpathlib