tessl/pypi-apache-airflow-providers-microsoft-azure

Provider package for Microsoft Azure integrations with Apache Airflow

Azure Blob Storage

This page covers the Azure Blob Storage integration: a hook for blob operations and container management, operators for deletion and file transfers, sensors and triggers for monitoring, an fsspec filesystem, and a task log handler. Both synchronous and asynchronous (deferrable) interfaces are available.

Capabilities

Blob Storage Hook

The primary interface for Azure Blob Storage operations, providing authenticated connections and core blob functionality.

class WasbHook(BaseHook):
    """
    Hook for Azure Blob Storage (WASB) operations.
    
    Provides methods for blob operations, container management, and file transfers.
    Supports multiple authentication methods and connection configurations.
    """
    
    def get_conn(self) -> BlobServiceClient:
        """Get authenticated Azure Blob Service client."""
    
    def check_for_blob(self, container_name: str, blob_name: str, **kwargs) -> bool:
        """
        Check if a blob exists in the specified container.
        
        Args:
            container_name (str): Name of the container
            blob_name (str): Name of the blob to check
            
        Returns:
            bool: True if blob exists, False otherwise
        """
    
    def load_file(
        self, 
        file_path: str, 
        container_name: str, 
        blob_name: str, 
        **kwargs
    ) -> None:
        """
        Upload a local file to Azure Blob Storage.
        
        Args:
            file_path (str): Path to local file
            container_name (str): Target container name  
            blob_name (str): Target blob name
        """
    
    def load_string(
        self,
        string_data: str,
        container_name: str,
        blob_name: str,
        **kwargs
    ) -> None:
        """
        Upload string data to Azure Blob Storage.
        
        Args:
            string_data (str): String data to upload
            container_name (str): Target container name
            blob_name (str): Target blob name
        """
    
    def check_for_prefix(self, container_name: str, prefix: str, **kwargs) -> bool:
        """
        Check if any blobs exist with the given prefix.
        
        Args:
            container_name (str): Name of the container
            prefix (str): Prefix to search for
            
        Returns:
            bool: True if blobs with prefix exist, False otherwise
        """
    
    def get_blobs_list(
        self,
        container_name: str,
        prefix: str | None = None,
        include: list | None = None,
        delimiter: str = "/",
        **kwargs
    ) -> list:
        """
        List blobs in container with optional prefix filtering.
        
        Args:
            container_name (str): Name of the container
            prefix (str, optional): Filter blobs by prefix
            include (list, optional): Additional properties to include
            delimiter (str): Delimiter for blob hierarchy
            
        Returns:
            list: List of blob names
        """
    
    def get_blobs_list_recursive(
        self,
        container_name: str,
        prefix: str | None = None,
        include: list | None = None,
        endswith: str = "",
        **kwargs
    ) -> list:
        """
        Recursively list all blobs in container.
        
        Args:
            container_name (str): Name of the container
            prefix (str, optional): Filter blobs by prefix
            include (list, optional): Additional properties to include
            endswith (str): Filter blobs ending with string
            
        Returns:
            list: List of all blob names recursively
        """
    
    def read_file(self, container_name: str, blob_name: str, **kwargs) -> str:
        """
        Download blob content as text.
        
        Args:
            container_name (str): Name of the container
            blob_name (str): Name of the blob
            
        Returns:
            str: Blob content decoded as text
        """
    
    def get_file(self, file_path: str, container_name: str, blob_name: str, **kwargs) -> None:
        """
        Download blob to local file.
        
        Args:
            file_path (str): Local file path to save blob
            container_name (str): Name of the container
            blob_name (str): Name of the blob
        """
    
    def upload(
        self,
        container_name: str,
        blob_name: str,
        data: Any,
        blob_type: str = "BlockBlob",
        length: int | None = None,
        **kwargs
    ) -> dict[str, Any]:
        """
        Upload data to blob with advanced options.
        
        Args:
            container_name (str): Name of the container
            blob_name (str): Name of the blob
            data: Data to upload
            blob_type (str): Type of blob (BlockBlob, PageBlob, AppendBlob)
            length (int, optional): Length of data
            
        Returns:
            dict: Upload response metadata
        """
    
    def download(
        self,
        container_name: str,
        blob_name: str,
        offset: int | None = None,
        length: int | None = None,
        **kwargs
    ) -> StorageStreamDownloader:
        """
        Download blob with range support.
        
        Args:
            container_name (str): Name of the container
            blob_name (str): Name of the blob
            offset (int, optional): Start offset for partial download
            length (int, optional): Number of bytes to download
            
        Returns:
            StorageStreamDownloader: Stream downloader object
        """
    
    def create_container(self, container_name: str, **kwargs) -> None:
        """
        Create a new container.
        
        Args:
            container_name (str): Name of the container to create
        """
    
    def delete_container(self, container_name: str, **kwargs) -> None:
        """
        Delete an existing container.
        
        Args:
            container_name (str): Name of the container to delete
        """
    
    def delete_blobs(self, container_name: str, *blobs, **kwargs) -> None:
        """
        Delete multiple blobs from container.
        
        Args:
            container_name (str): Name of the container
            *blobs: Variable number of blob names to delete
        """
    
    def copy_blobs(
        self,
        source_container_name: str,
        source_blob_name: str,
        destination_container_name: str,
        destination_blob_name: str | None = None,
    ) -> None:
        """
        Copy blob from source to destination.
        
        Args:
            source_container_name (str): Source container name
            source_blob_name (str): Source blob name
            destination_container_name (str): Destination container name
            destination_blob_name (str, optional): Destination blob name
        """
    
    def delete_file(
        self,
        container_name: str,
        blob_name: str,
        is_prefix: bool = False,
        ignore_if_missing: bool = False,
        **kwargs
    ) -> None:
        """
        Delete blob(s) from container.
        
        Args:
            container_name (str): Name of the container
            blob_name (str): Name of the blob or prefix
            is_prefix (bool): Whether to delete all blobs with prefix
            ignore_if_missing (bool): Don't raise error if blob doesn't exist
        """

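The difference between `get_blobs_list` (one hierarchy level, split on `delimiter`) and `get_blobs_list_recursive` (every blob under the prefix, optionally filtered by `endswith`) can be illustrated on a plain list of blob names. The sketch below is a pure-Python approximation of that behaviour, not the hook's implementation. `list_one_level` and `list_recursive` are hypothetical helper names, and the sketch assumes that deeper paths collapse into a single `dir/` entry, as delimiter-based listing in Azure does.

```python
def list_one_level(names, prefix="", delimiter="/"):
    """Return entries directly under `prefix`; deeper paths collapse to 'dir/' entries."""
    entries = []
    for name in sorted(names):
        if not name.startswith(prefix):
            continue
        rest = name[len(prefix):]
        head, sep, _ = rest.partition(delimiter)
        # If a delimiter follows, report the virtual directory instead of the blob.
        entry = prefix + head + sep if sep else name
        if entry not in entries:
            entries.append(entry)
    return entries


def list_recursive(names, prefix="", endswith=""):
    """Return every blob name under `prefix`, optionally filtered by suffix."""
    return sorted(n for n in names if n.startswith(prefix) and n.endswith(endswith))


# Hypothetical container content, used only for illustration.
names = ["raw/2024/a.csv", "raw/2024/b.json", "raw/readme.txt", "top.txt"]

top_level = list_one_level(names)
under_raw = list_one_level(names, prefix="raw/")
csv_files = list_recursive(names, prefix="raw/", endswith=".csv")
```

Here `top_level` holds the virtual directory `raw/` plus `top.txt`, while `csv_files` reaches through every level below `raw/`.
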
Async Blob Storage Hook

Async version of the WASB hook for deferrable operations and high-performance scenarios.

class WasbAsyncHook(WasbHook):
    """
    Async hook for Azure Blob Storage operations.
    
    Provides async methods for blob operations with improved performance
    for deferrable tasks and high-concurrency scenarios.
    """
    
    async def get_async_conn(self) -> AsyncBlobServiceClient:
        """Get async Azure Blob Service client."""
    
    async def check_for_blob_async(self, container_name: str, blob_name: str, **kwargs) -> bool:
        """Async version of check_for_blob."""
    
    async def get_blobs_list_async(
        self,
        container_name: str,
        prefix: str | None = None,
        include: list | None = None,
        delimiter: str = "/",
        **kwargs
    ) -> list:
        """Async version of get_blobs_list."""
    
    async def check_for_prefix_async(self, container_name: str, prefix: str, **kwargs) -> bool:
        """Async version of check_for_prefix."""

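One reason to prefer the async methods is that several existence checks can run concurrently on one event loop. The following is a minimal sketch of that idea using `asyncio.gather`. `FakeAsyncHook` is a hypothetical in-memory stand-in that only borrows the `check_for_blob_async` name from the listing above, so the example runs without Airflow or Azure credentials.

```python
import asyncio


class FakeAsyncHook:
    """Hypothetical stand-in exposing one async method name from the listing."""

    def __init__(self, existing):
        self._existing = set(existing)  # {(container_name, blob_name), ...}

    async def check_for_blob_async(self, container_name, blob_name, **kwargs):
        await asyncio.sleep(0)  # yield control, as a real network call would
        return (container_name, blob_name) in self._existing


async def check_many(hook, container_name, blob_names):
    """Check several blobs concurrently and map each name to its result."""
    results = await asyncio.gather(
        *(hook.check_for_blob_async(container_name, name) for name in blob_names)
    )
    return dict(zip(blob_names, results))


hook = FakeAsyncHook({("input-data", "a.csv"), ("input-data", "c.csv")})
status = asyncio.run(check_many(hook, "input-data", ["a.csv", "b.csv", "c.csv"]))
```

`asyncio.gather` preserves the order of its arguments, which is what makes the `zip` with `blob_names` safe.
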
Blob Deletion Operator

Operator for deleting blobs from Azure Blob Storage containers.

class WasbDeleteBlobOperator(BaseOperator):
    """
    Delete blobs from Azure Blob Storage.
    
    Supports deleting single blobs or multiple blobs using prefix matching.
    """
    
    def __init__(
        self,
        container_name: str,
        blob_name: str,
        wasb_conn_id: str = "wasb_default",
        is_prefix: bool = False,
        ignore_if_missing: bool = False,
        **kwargs
    ):
        """
        Initialize blob deletion operator.
        
        Args:
            container_name (str): Azure container name
            blob_name (str): Blob name or prefix to delete
            wasb_conn_id (str): Airflow connection ID for Azure Blob Storage
            is_prefix (bool): Whether to delete all blobs with this prefix  
            ignore_if_missing (bool): Don't fail if blob doesn't exist
        """

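The interaction between `is_prefix` and `ignore_if_missing` is easier to see on a concrete set of names. The sketch below models the documented semantics on an in-memory set. It is an illustration rather than the operator's code: `select_blobs_to_delete` is a hypothetical helper, and `LookupError` stands in for whatever exception the provider raises.

```python
def select_blobs_to_delete(existing, blob_name, is_prefix=False, ignore_if_missing=False):
    """Return the blob names a delete call would target."""
    if is_prefix:
        # Treat blob_name as a prefix and target every matching blob.
        targets = sorted(n for n in existing if n.startswith(blob_name))
    elif blob_name in existing:
        targets = [blob_name]
    else:
        targets = []
    if not targets and not ignore_if_missing:
        raise LookupError(f"Blob(s) not found: {blob_name}")
    return targets


# Hypothetical container content, used only for illustration.
existing = {"logs/2024-01-01.txt", "logs/2024-01-02.txt", "data/export.csv"}

by_prefix = select_blobs_to_delete(existing, "logs/", is_prefix=True)
single = select_blobs_to_delete(existing, "data/export.csv")
missing_ok = select_blobs_to_delete(existing, "nope.txt", ignore_if_missing=True)
```

With `ignore_if_missing=False` (the default), the last call would raise instead of returning an empty list.
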
Blob Existence Sensor

Sensor that waits for a blob to exist in Azure Blob Storage.

class WasbBlobSensor(BaseSensorOperator):
    """
    Sensor that waits for a blob to exist in Azure Blob Storage.
    
    Polls the blob storage container at regular intervals until the specified
    blob is found or timeout is reached.
    """
    
    def __init__(
        self,
        container_name: str,
        blob_name: str,
        wasb_conn_id: str = "wasb_default", 
        check_options: dict | None = None,
        **kwargs
    ):
        """
        Initialize blob sensor.
        
        Args:
            container_name (str): Azure container name to monitor
            blob_name (str): Blob name to wait for
            wasb_conn_id (str): Airflow connection ID for Azure Blob Storage
            check_options (dict): Additional options for blob checking
        """
    
    def poke(self, context: dict) -> bool:
        """Check if the blob exists."""

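The polling behaviour described for the sensor (call `poke` at regular intervals until it succeeds or the timeout is reached) can be sketched as a plain loop. This is a simplified model, not Airflow's scheduler logic. `poll_until` is a hypothetical helper, and the `sleep` parameter is injectable so the example finishes instantly.

```python
import time


def poll_until(poke, poke_interval=30.0, timeout=300.0, sleep=time.sleep):
    """Call `poke` until it returns True; return the simulated seconds waited."""
    waited = 0.0
    while True:
        if poke():
            return waited
        if waited + poke_interval > timeout:
            raise TimeoutError(f"condition not met within {timeout} seconds")
        sleep(poke_interval)
        waited += poke_interval


# The blob "appears" on the third check; no real sleeping happens here.
answers = iter([False, False, True])
waited = poll_until(lambda: next(answers), poke_interval=30, timeout=300, sleep=lambda s: None)
```

In a real DAG the `timeout` and `poke_interval` arguments of the sensor play the roles of the two parameters above.
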
Blob Prefix Sensor

Sensor that waits for blobs matching a prefix pattern.

class WasbPrefixSensor(BaseSensorOperator):
    """
    Sensor that waits for blobs with a specific prefix in Azure Blob Storage.
    
    Useful for waiting for multiple files or files with unknown exact names
    but known prefix patterns.
    """
    
    def __init__(
        self,
        container_name: str,
        prefix: str,
        wasb_conn_id: str = "wasb_default",
        **kwargs  
    ):
        """
        Initialize prefix sensor.
        
        Args:
            container_name (str): Azure container name to monitor
            prefix (str): Blob name prefix to match
            wasb_conn_id (str): Airflow connection ID for Azure Blob Storage
        """

Async Blob Triggers

Deferrable triggers for blob monitoring that don't block worker slots.

class WasbBlobSensorTrigger(BaseTrigger):
    """Async trigger for blob existence monitoring."""
    
    def __init__(
        self,
        container_name: str,
        blob_name: str, 
        wasb_conn_id: str,
        poke_interval: float = 60,
        **kwargs
    ):
        """
        Initialize blob sensor trigger.
        
        Args:
            container_name (str): Container name to monitor
            blob_name (str): Blob name to wait for
            wasb_conn_id (str): Connection ID
            poke_interval (float): Polling interval in seconds
        """

class WasbPrefixSensorTrigger(BaseTrigger):
    """Async trigger for blob prefix monitoring."""
    
    def __init__(
        self,
        container_name: str,
        prefix: str,
        wasb_conn_id: str, 
        poke_interval: float = 60,
        **kwargs
    ):
        """
        Initialize prefix sensor trigger.
        
        Args:
            container_name (str): Container name to monitor  
            prefix (str): Blob prefix to match
            wasb_conn_id (str): Connection ID
            poke_interval (float): Polling interval in seconds
        """

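A trigger differs from a sensor's `poke` loop in that it runs as an async generator on the triggerer and emits an event once the condition holds. The sketch below shows that shape only. `BlobTriggerSketch` is a hypothetical class, a plain dict stands in for Airflow's event object, and the `check` callable replaces the real call to the async hook.

```python
import asyncio


class BlobTriggerSketch:
    """Hypothetical model of a polling trigger; not the provider's class."""

    def __init__(self, check, poke_interval=60.0):
        self.check = check  # async callable returning bool
        self.poke_interval = poke_interval

    async def run(self):
        # Poll without blocking the event loop, then emit a single event.
        while True:
            if await self.check():
                yield {"status": "success", "message": "blob found"}
                return
            await asyncio.sleep(self.poke_interval)


async def first_event(trigger):
    """Return the first event the trigger produces."""
    async for event in trigger.run():
        return event


answers = iter([False, True])


async def fake_check():
    return next(answers)


event = asyncio.run(first_event(BlobTriggerSketch(fake_check, poke_interval=0.01)))
```

Because the wait happens in `asyncio.sleep`, many such triggers can share one process, which is what frees the worker slot.
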
File Transfer Operators

Transfer data between local filesystem and Azure Blob Storage.

class LocalFilesystemToWasbOperator(BaseOperator):
    """Transfer files from local filesystem to Azure Blob Storage."""
    
    def __init__(
        self,
        file_path: str,
        container_name: str, 
        blob_name: str,
        wasb_conn_id: str = "wasb_default",
        create_container: bool = False,
        **kwargs
    ):
        """
        Initialize local to WASB transfer operator.
        
        Args:
            file_path (str): Local file path to upload
            container_name (str): Target container name
            blob_name (str): Target blob name
            wasb_conn_id (str): Connection ID  
            create_container (bool): Create container if it doesn't exist
        """

class SFTPToWasbOperator(BaseOperator):
    """Transfer files from SFTP server to Azure Blob Storage."""
    
    def __init__(
        self,
        sftp_source_path: str,
        container_name: str,
        blob_name: str, 
        sftp_conn_id: str = "sftp_default",
        wasb_conn_id: str = "wasb_default",
        **kwargs
    ):
        """
        Initialize SFTP to WASB transfer operator.
        
        Args:
            sftp_source_path (str): Source file path on SFTP server
            container_name (str): Target container name  
            blob_name (str): Target blob name
            sftp_conn_id (str): SFTP connection ID
            wasb_conn_id (str): WASB connection ID
        """

class S3ToAzureBlobStorageOperator(BaseOperator):
    """Transfer objects from AWS S3 to Azure Blob Storage."""
    
    def __init__(
        self,
        s3_source_key: str,
        container_name: str,
        blob_name: str,
        s3_bucket: str | None = None,
        aws_conn_id: str = "aws_default", 
        wasb_conn_id: str = "wasb_default",
        **kwargs
    ):
        """
        Initialize S3 to Azure Blob transfer operator.
        
        Args:
            s3_source_key (str): S3 object key to transfer
            container_name (str): Target Azure container name
            blob_name (str): Target blob name  
            s3_bucket (str): Source S3 bucket name
            aws_conn_id (str): AWS connection ID
            wasb_conn_id (str): WASB connection ID
        """

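Transfers from a remote source commonly stage the data in a temporary local file and then hand that file to `load_file`. The sketch below illustrates the staging pattern under that assumption. It is not taken from the operators' source: `staged_transfer`, `RecordingHook`, and the `fetch` callable are all hypothetical names.

```python
import os
import tempfile


class RecordingHook:
    """Hypothetical stand-in that stores uploaded file content in memory."""

    def __init__(self):
        self.blobs = {}
        self.seen_paths = []

    def load_file(self, file_path, container_name, blob_name, **kwargs):
        self.seen_paths.append(file_path)
        with open(file_path, "rb") as handle:
            self.blobs[(container_name, blob_name)] = handle.read()


def staged_transfer(fetch, hook, container_name, blob_name):
    """Fetch into a temporary file, upload it, and always clean up."""
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp_path = tmp.name
    try:
        fetch(tmp_path)  # e.g. an SFTP or S3 download in a real transfer
        hook.load_file(tmp_path, container_name, blob_name)
    finally:
        os.remove(tmp_path)


def fake_fetch(path):
    # Pretend to download a remote file by writing fixed bytes.
    with open(path, "wb") as handle:
        handle.write(b"id,value\n1,2\n")


hook = RecordingHook()
staged_transfer(fake_fetch, hook, "landing", "report.csv")
```

The `finally` block removes the staging file even when the fetch or the upload fails.
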
Azure Blob Filesystem Interface

fsspec-compatible filesystem interface for Azure Blob Storage.

def get_fs(conn_id: str | None, storage_options: dict[str, Any] | None = None) -> AbstractFileSystem:
    """
    Create Azure Blob FileSystem (fsspec-compatible).
    
    Args:
        conn_id (str): Airflow connection ID for Azure storage
        storage_options (dict): Additional storage configuration options
        
    Returns:
        AbstractFileSystem: fsspec filesystem for Azure Blob Storage
    """

Logging Handler

Task log handler that writes Airflow logs to Azure Blob Storage.

class WasbTaskHandler(FileTaskHandler):
    """Airflow task handler that writes logs to Azure Blob Storage."""
    
    def __init__(
        self,
        base_log_folder: str,
        wasb_log_folder: str,
        wasb_container: str,
        filename_template: str | None = None,
        **kwargs
    ):
        """
        Initialize WASB task handler.
        
        Args:
            base_log_folder (str): Base folder for local logs
            wasb_log_folder (str): WASB folder for remote logs  
            wasb_container (str): WASB container for logs
            filename_template (str): Log filename template
        """

class WasbRemoteLogIO:
    """Low-level I/O operations for WASB logging."""
    
    def upload_log(self, log_content: str, remote_log_location: str) -> None:
        """Upload log content to WASB."""
    
    def download_log(self, remote_log_location: str) -> str:
        """Download log content from WASB."""

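Uploading to an existing block blob replaces its content, so appending to a remote log generally means reading the old content, concatenating, and uploading the result. The sketch below shows that read-modify-write step. `InMemoryLogIO` is a hypothetical stand-in that borrows the `upload_log` and `download_log` names from the listing above, and `append_log` and the log location are illustrative.

```python
class InMemoryLogIO:
    """Hypothetical stand-in exposing the two method names listed above."""

    def __init__(self):
        self._store = {}

    def upload_log(self, log_content, remote_log_location):
        # Overwrites any existing content, like a block blob upload.
        self._store[remote_log_location] = log_content

    def download_log(self, remote_log_location):
        return self._store.get(remote_log_location, "")


def append_log(log_io, new_lines, remote_log_location):
    """Append by downloading the old log, concatenating, and re-uploading."""
    old = log_io.download_log(remote_log_location)
    combined = f"{old}\n{new_lines}" if old else new_lines
    log_io.upload_log(combined, remote_log_location)


log_io = InMemoryLogIO()
location = "wasb-logs/dag_id=example/attempt=1.log"  # illustrative path
append_log(log_io, "first try", location)
append_log(log_io, "second try", location)
```
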
Usage Examples

Basic Blob Operations

from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

# Initialize hook
wasb_hook = WasbHook(wasb_conn_id='azure_default')

# Upload a file
wasb_hook.load_file(
    file_path='/path/to/local/file.txt',
    container_name='my-container', 
    blob_name='uploaded-file.txt'
)

# Check if blob exists
exists = wasb_hook.check_for_blob(
    container_name='my-container',
    blob_name='uploaded-file.txt'
)

# Download blob content
content = wasb_hook.read_file(
    container_name='my-container',
    blob_name='uploaded-file.txt'
)

# Delete blob
wasb_hook.delete_file(
    container_name='my-container', 
    blob_name='uploaded-file.txt'
)
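
Call sequences like the one above can be exercised without an Azure account by passing a test double in place of the hook. The sketch below assumes only the method names and argument order shown in the hook listing. `FakeWasbHook` and `upload_if_absent` are hypothetical names introduced for illustration.

```python
class FakeWasbHook:
    """Hypothetical in-memory double mimicking a few WasbHook method names."""

    def __init__(self):
        self.blobs = {}  # (container_name, blob_name) -> str

    def check_for_blob(self, container_name, blob_name, **kwargs):
        return (container_name, blob_name) in self.blobs

    def load_string(self, string_data, container_name, blob_name, **kwargs):
        self.blobs[(container_name, blob_name)] = string_data

    def delete_file(self, container_name, blob_name, **kwargs):
        del self.blobs[(container_name, blob_name)]


def upload_if_absent(hook, container_name, blob_name, text):
    """Upload text only when the blob is missing; return True if uploaded."""
    if hook.check_for_blob(container_name, blob_name):
        return False
    hook.load_string(text, container_name, blob_name)
    return True


hook = FakeWasbHook()
first = upload_if_absent(hook, "my-container", "report.txt", "v1")
second = upload_if_absent(hook, "my-container", "report.txt", "v2")
```

Because `upload_if_absent` only relies on method names, the same function accepts a real hook instance in a DAG.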

Using Sensors in DAGs

from airflow import DAG
from airflow.providers.microsoft.azure.sensors.wasb import WasbBlobSensor, WasbPrefixSensor
from datetime import datetime

dag = DAG(
    'blob_monitoring_example',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',  # named `schedule_interval` on Airflow versions before 2.4
)

# Wait for specific blob
wait_for_file = WasbBlobSensor(
    task_id='wait_for_input_file',
    container_name='input-data',
    blob_name='daily-export.csv', 
    wasb_conn_id='azure_default',
    timeout=300,
    poke_interval=30,
    dag=dag
)

# Wait for files with prefix
wait_for_batch = WasbPrefixSensor(
    task_id='wait_for_batch_files',
    container_name='batch-data',
    prefix='batch_2024_',
    wasb_conn_id='azure_default', 
    dag=dag
)

Connection Configuration

Azure Blob Storage connections support multiple authentication methods:

Connection Type: wasb

Authentication Options:

  • Account Key: Use storage account name and key
  • SAS Token: Use Shared Access Signature
  • Managed Identity: Use Azure managed identity
  • Service Principal: Use client credentials

Connection Fields:

  • account_name: Azure storage account name
  • account_key: Storage account access key (for key auth)
  • sas_token: Shared access signature (for SAS auth)
  • client_id: Service principal client ID
  • client_secret: Service principal secret
  • tenant_id: Azure tenant ID
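
To make the relationship between the fields and the authentication options concrete, the sketch below classifies a dictionary of connection fields by which option it can satisfy. It is a hypothetical validation helper, not how the provider resolves credentials. The precedence order and the fallback to managed identity are assumptions made for the example.

```python
def detect_auth_method(fields):
    """Guess which documented auth option a set of connection fields supports."""
    present = {key for key, value in fields.items() if value}
    # Assumed precedence: service principal, then account key, then SAS token.
    if {"client_id", "client_secret", "tenant_id"} <= present:
        return "service_principal"
    if "account_key" in present:
        return "account_key"
    if "sas_token" in present:
        return "sas_token"
    # Assumption: with no secret supplied, managed identity is the remaining option.
    return "managed_identity"


# Placeholder values only; none of these are real credentials.
key_auth = detect_auth_method({"account_name": "acct", "account_key": "placeholder"})
sas_auth = detect_auth_method({"account_name": "acct", "sas_token": "placeholder"})
sp_auth = detect_auth_method(
    {"account_name": "acct", "client_id": "id", "client_secret": "s", "tenant_id": "t"}
)
identity_auth = detect_auth_method({"account_name": "acct"})
```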

Error Handling

# Custom exceptions for blob operations
class AzureBlobStorageException(AirflowException):
    """Base exception for Azure Blob Storage operations."""

class BlobNotFound(AzureBlobStorageException):
    """Raised when a blob is not found."""
    
class ContainerNotFound(AzureBlobStorageException):
    """Raised when a container is not found."""

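Assuming the hierarchy listed above, a caller can handle a missing blob specifically while letting other storage errors propagate. The sketch below defines local stand-in classes with the same names so it runs on its own; check that these classes exist in the installed provider version before importing them. `read_or_default` is a hypothetical helper.

```python
class AirflowExceptionStandIn(Exception):
    """Local stand-in for Airflow's base exception."""


class AzureBlobStorageException(AirflowExceptionStandIn):
    """Stand-in mirroring the base exception listed above."""


class BlobNotFound(AzureBlobStorageException):
    """Stand-in mirroring the blob-not-found exception."""


class ContainerNotFound(AzureBlobStorageException):
    """Stand-in mirroring the container-not-found exception."""


def read_or_default(read, default=""):
    """Return read(), or `default` when only the blob is missing."""
    try:
        return read()
    except BlobNotFound:
        return default


def missing_blob():
    raise BlobNotFound("daily-export.csv")


def missing_container():
    raise ContainerNotFound("input-data")


fallback = read_or_default(missing_blob, default="<empty>")

try:
    read_or_default(missing_container)
    escaped = None
except AzureBlobStorageException as exc:
    # The container error is not swallowed; the base class still catches it here.
    escaped = type(exc).__name__
```
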
Together, these hooks, operators, sensors, and triggers cover blob storage tasks in Airflow workflows, from single-file uploads and downloads to multi-step data pipelines that wait on, transfer, and clean up blobs.

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-microsoft-azure
