CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-cloudpathlib

Pathlib-style classes for cloud storage services that provide seamless access to AWS S3, Google Cloud Storage, and Azure Blob Storage with familiar filesystem operations.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/azure-integration.md

Azure Blob Storage Integration

Azure Blob Storage support with Azure Active Directory authentication, hierarchical namespace support for ADLS Gen2, and Azure-specific blob operations. This implementation provides comprehensive access to Azure Blob Storage and Azure Data Lake Storage Gen2 capabilities.

Capabilities

AzureBlobPath Class

Azure Blob Storage-specific path implementation with access to Azure metadata and ADLS Gen2 support.

class AzureBlobPath(CloudPath):
    """Azure Blob Storage path implementation.

    A CloudPath subclass for ``az://`` URIs, exposing Azure-specific
    metadata (container, blob name, ETag, MD5) alongside the usual
    pathlib-style operations.
    """

    @property
    def container(self) -> str:
        """
        Azure container name.

        Returns:
            Container name from the Azure URI, e.g. "my-container" for
            "az://my-container/data/file.txt"
        """

    @property
    def blob(self) -> str:
        """
        Blob name (path within container).

        Returns:
            Blob name string, e.g. "data/file.txt" for
            "az://my-container/data/file.txt"
        """

    @property
    def etag(self) -> str:
        """
        Azure blob ETag identifier.

        Returns:
            ETag string for the blob as reported by the Azure service
        """

    @property
    def md5(self) -> str:
        """
        MD5 hash of the blob content.

        Returns:
            MD5 hash string as reported by the Azure service
        """

AzureBlobClient Class

Azure Blob Storage client with comprehensive authentication and configuration options.

class AzureBlobClient:
    """Azure Blob Storage client.

    Holds the authenticated connection to a storage account; attach it to
    paths explicitly (``AzureBlobPath(..., client=client)``) or register it
    with ``set_as_default_client()``.
    """

    def __init__(
        self,
        account_url: str | None = None,
        credential = None,
        connection_string: str | None = None,
        blob_service_client = None,
        data_lake_client = None,
        file_cache_mode: FileCacheMode | None = None,
        local_cache_dir: str | None = None,
        content_type_method = None
    ) -> None:
        """
        Initialize Azure Blob client.

        Args:
            account_url: Azure storage account URL
            credential: Azure credential object (various types supported)
            connection_string: Azure storage connection string
            blob_service_client: Custom BlobServiceClient instance
            data_lake_client: Custom DataLakeServiceClient for ADLS Gen2
            file_cache_mode: Cache management strategy
            local_cache_dir: Local directory for file cache
            content_type_method: Function to determine MIME types

        NOTE(review): the usage examples each pass exactly one of
        account_url / connection_string / blob_service_client /
        data_lake_client — presumably these are alternative connection
        mechanisms; confirm precedence against upstream cloudpathlib docs.
        """

Usage Examples

Basic Azure Blob Operations

from cloudpathlib import AzureBlobPath, AzureBlobClient

# Create Azure path (uses default client)
az_path = AzureBlobPath("az://my-container/data/file.txt")

# Access Azure-specific properties
print(f"Container: {az_path.container}")   # "my-container"
print(f"Blob: {az_path.blob}")             # "data/file.txt"

# Check if blob exists and get metadata
if az_path.exists():
    print(f"ETag: {az_path.etag}")
    print(f"MD5: {az_path.md5}")

Connection String Authentication

# Use connection string from Azure portal
connection_string = (
    "DefaultEndpointsProtocol=https;"
    "AccountName=mystorageaccount;"
    "AccountKey=myaccountkey;"
    "EndpointSuffix=core.windows.net"
)

client = AzureBlobClient(connection_string=connection_string)
client.set_as_default_client()

# Create paths using connection string
az_path = AzureBlobPath("az://my-container/data.json")
content = az_path.read_text()

Account Key Authentication

from azure.storage.blob import BlobServiceClient

# Create client with account key
account_url = "https://mystorageaccount.blob.core.windows.net"
account_key = "your-account-key"

client = AzureBlobClient(
    account_url=account_url,
    credential=account_key
)

az_path = AzureBlobPath("az://my-container/file.txt", client=client)

Azure Active Directory Authentication

from azure.identity import DefaultAzureCredential, ClientSecretCredential

# Use default Azure credential (recommended for production)
credential = DefaultAzureCredential()
client = AzureBlobClient(
    account_url="https://mystorageaccount.blob.core.windows.net",
    credential=credential
)

# Or use service principal
credential = ClientSecretCredential(
    tenant_id="your-tenant-id",
    client_id="your-client-id",
    client_secret="your-client-secret"
)

client = AzureBlobClient(
    account_url="https://mystorageaccount.blob.core.windows.net",
    credential=credential
)

Managed Identity Authentication

from azure.identity import ManagedIdentityCredential

# Use managed identity (for Azure VMs, App Service, etc.)
credential = ManagedIdentityCredential()
client = AzureBlobClient(
    account_url="https://mystorageaccount.blob.core.windows.net",
    credential=credential
)

# Use with specific client ID
credential = ManagedIdentityCredential(client_id="your-managed-identity-client-id")

SAS Token Authentication

# Use Shared Access Signature token
sas_token = "your-sas-token"
account_url = f"https://mystorageaccount.blob.core.windows.net?{sas_token}"

client = AzureBlobClient(account_url=account_url)

az_path = AzureBlobPath("az://my-container/file.txt", client=client)

Azure Data Lake Storage Gen2 (ADLS Gen2)

from azure.storage.filedatalake import DataLakeServiceClient

# ADLS Gen2 with hierarchical namespace support
dfs_client = DataLakeServiceClient(
    account_url="https://mystorageaccount.dfs.core.windows.net",
    credential=DefaultAzureCredential()
)

client = AzureBlobClient(data_lake_client=dfs_client)

# ADLS Gen2 supports true directory operations
adls_path = AzureBlobPath("az://filesystem/directory/", client=client)
adls_path.mkdir(parents=True, exist_ok=True)

# Create files in directory structure
file_path = adls_path / "data.txt"
file_path.write_text("ADLS Gen2 content")

Blob Tiers and Storage Classes

# Upload with specific access tier
def upload_with_tier(local_path, az_path, tier):
    """Upload a local file to az_path, requesting the given blob access tier.

    Args:
        local_path: path of the local file to upload
        az_path: destination AzureBlobPath
        tier: standard blob tier name (e.g. "Hot", "Cool", "Archive")
    """
    # Resolve the low-level SDK client for the destination blob first;
    # it does not depend on the local file being open.
    blob_client = az_path.client.blob_service_client.get_blob_client(
        container=az_path.container,
        blob=az_path.blob
    )
    with open(local_path, 'rb') as source:
        blob_client.upload_blob(source, standard_blob_tier=tier, overwrite=True)

# Usage examples
az_path = AzureBlobPath("az://my-container/archive.zip")
upload_with_tier("data.zip", az_path, "Archive")  # Cold storage

# Different access tiers: Hot, Cool, Archive
tiers = ["Hot", "Cool", "Archive"]

Blob Metadata and Properties

# Set custom metadata
def set_blob_metadata(az_path, metadata_dict):
    """Attach the given metadata dict to the Azure blob at az_path."""
    service = az_path.client.blob_service_client
    target = service.get_blob_client(container=az_path.container, blob=az_path.blob)
    target.set_blob_metadata(metadata=metadata_dict)

def get_blob_metadata(az_path):
    """Return (metadata, properties) for the Azure blob at az_path."""
    service = az_path.client.blob_service_client
    target = service.get_blob_client(container=az_path.container, blob=az_path.blob)
    props = target.get_blob_properties()
    # Expose the metadata dict separately for convenience; the full
    # properties object is still returned for callers that need more.
    return props.metadata, props

# Usage
az_path = AzureBlobPath("az://my-container/document.pdf")

# Set metadata
set_blob_metadata(az_path, {
    "author": "Data Team",
    "project": "Analytics",
    "version": "1.0"
})

# Read metadata and properties
metadata, properties = get_blob_metadata(az_path)
print(f"Metadata: {metadata}")
print(f"Content Type: {properties.content_settings.content_type}")
print(f"Last Modified: {properties.last_modified}")

Lease Operations

from azure.storage.blob import BlobLeaseClient

def acquire_blob_lease(az_path, lease_duration=60):
    """Acquire an exclusive lease on the blob at az_path.

    Args:
        az_path: AzureBlobPath of the blob to lease
        lease_duration: lease length in seconds

    Returns:
        (lease_client, lease_id) — release via lease_client.release()
    """
    service = az_path.client.blob_service_client
    target = service.get_blob_client(container=az_path.container, blob=az_path.blob)
    lease = BlobLeaseClient(target)
    lease_id = lease.acquire(lease_duration=lease_duration)
    return lease, lease_id

# Usage
az_path = AzureBlobPath("az://my-container/critical-file.txt")
lease_client, lease_id = acquire_blob_lease(az_path)

try:
    # Perform operations with exclusive access
    content = az_path.read_text()
    modified_content = content + "\nModified with lease"
    az_path.write_text(modified_content)
finally:
    # Always release lease
    lease_client.release()

Batch Operations

import concurrent.futures
from pathlib import Path

def upload_file_to_azure(local_path, az_base):
    """Upload one local file under az_base and return the resulting cloud path."""
    destination = az_base / local_path.name
    destination.upload_from(local_path)
    return destination

# Parallel upload
local_files = list(Path("data/").glob("*.csv"))
az_base = AzureBlobPath("az://my-container/csv-data/")

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(upload_file_to_azure, f, az_base) for f in local_files]
    
    for future in concurrent.futures.as_completed(futures):
        try:
            az_path = future.result()
            print(f"Uploaded: {az_path}")
        except Exception as e:
            print(f"Upload failed: {e}")

Snapshot Operations

from datetime import datetime

def create_blob_snapshot(az_path):
    """Snapshot the blob at az_path and return the new snapshot id string."""
    service = az_path.client.blob_service_client
    target = service.get_blob_client(container=az_path.container, blob=az_path.blob)
    # create_snapshot returns a dict-like result; the id lives under 'snapshot'.
    return target.create_snapshot()['snapshot']

def list_blob_snapshots(az_path):
    """Return snapshot records for the blob at az_path, newest first.

    Each record is a dict with 'snapshot', 'last_modified', and 'size'.
    """
    container_client = az_path.client.blob_service_client.get_container_client(
        az_path.container
    )
    # The listing is prefix-based, so filter down to exact-name matches
    # that actually carry a snapshot id.
    listing = container_client.list_blobs(
        name_starts_with=az_path.blob, include=['snapshots']
    )
    records = [
        {
            'snapshot': item.snapshot,
            'last_modified': item.last_modified,
            'size': item.size,
        }
        for item in listing
        if item.name == az_path.blob and item.snapshot
    ]
    records.sort(key=lambda rec: rec['last_modified'], reverse=True)
    return records

# Usage
az_path = AzureBlobPath("az://my-container/important.txt")

# Create snapshot before modification
snapshot_id = create_blob_snapshot(az_path)
print(f"Created snapshot: {snapshot_id}")

# List all snapshots
snapshots = list_blob_snapshots(az_path)
for snapshot in snapshots:
    print(f"Snapshot {snapshot['snapshot']}: {snapshot['last_modified']}")

Container Operations

def create_container(container_name, client):
    """Create container_name with blob-level public read access, printing the outcome."""
    container_client = client.blob_service_client.get_container_client(container_name)
    try:
        container_client.create_container(public_access='blob')
    except Exception as exc:
        # Broad catch is intentional for this best-effort example:
        # report the failure instead of propagating it.
        print(f"Container creation failed: {exc}")
    else:
        print(f"Created container: {container_name}")

def list_containers(client):
    """Return name/last_modified/public_access records for every container."""
    return [
        {
            'name': entry.name,
            'last_modified': entry.last_modified,
            'public_access': entry.public_access,
        }
        for entry in client.blob_service_client.list_containers()
    ]

# Usage
client = AzureBlobClient(connection_string=connection_string)
create_container("new-container", client)
containers = list_containers(client)

Cross-Region Replication

# Work with geo-replicated storage accounts
primary_client = AzureBlobClient(
    account_url="https://mystorageaccount.blob.core.windows.net",
    credential=credential
)

secondary_client = AzureBlobClient(
    account_url="https://mystorageaccount-secondary.blob.core.windows.net",
    credential=credential
)

# Read from secondary region (read-access geo-redundant storage)
primary_path = AzureBlobPath("az://my-container/data.txt", client=primary_client)
secondary_path = AzureBlobPath("az://my-container/data.txt", client=secondary_client)

try:
    content = primary_path.read_text()
except Exception:
    # Fallback to secondary region
    content = secondary_path.read_text()

Event Grid Integration

# Work with Azure Event Grid for blob events
def setup_blob_monitoring(az_path):
    """Example of how blob operations can trigger events.

    Note: Event Grid setup requires Azure portal configuration.
    This only demonstrates which blob operations emit events — it
    actually performs them (writes, copies, then deletes the blob).
    """
    blob_operations = []
    blob_operations.append(az_path.write_text("New content"))          # BlobCreated
    blob_operations.append(az_path.copy(az_path.with_suffix('.bak')))  # BlobCreated
    blob_operations.append(az_path.unlink())                           # BlobDeleted
    return blob_operations

# Usage
az_path = AzureBlobPath("az://monitored-container/file.txt")
setup_blob_monitoring(az_path)

Performance Optimization

# Configure for high-throughput operations
from azure.storage.blob import BlobServiceClient
from azure.core.pipeline.transport import RequestsTransport

# Custom transport with connection pooling
transport = RequestsTransport(
    connection_pool_maxsize=100,
    connection_pool_block=False
)

blob_service_client = BlobServiceClient(
    account_url="https://mystorageaccount.blob.core.windows.net",
    credential=credential,
    transport=transport
)

client = AzureBlobClient(blob_service_client=blob_service_client)

# Performance monitoring
import time
az_path = AzureBlobPath("az://my-container/large-file.dat", client=client)

start_time = time.time()
az_path.download_to("local-large-file.dat")
duration = time.time() - start_time
print(f"Download completed in {duration:.2f} seconds")

Error Handling

from cloudpathlib import (
    CloudPathFileNotFoundError,
    MissingCredentialsError
)
from azure.core.exceptions import (
    AzureError,
    ResourceNotFoundError,
    ClientAuthenticationError
)

try:
    az_path = AzureBlobPath("az://nonexistent-container/file.txt")
    content = az_path.read_text()
except CloudPathFileNotFoundError:
    print("Azure blob not found")
except ClientAuthenticationError:
    print("Azure authentication failed")
except ResourceNotFoundError:
    print("Azure resource not found")
except AzureError as e:
    print(f"Azure error: {e}")

Install with Tessl CLI

npx tessl i tessl/pypi-cloudpathlib

docs

anypath.md

azure-integration.md

client-management.md

cloud-operations.md

configuration.md

core-operations.md

directory-operations.md

exceptions.md

file-io.md

gcs-integration.md

http-support.md

index.md

patching.md

s3-integration.md

tile.json