Provider package for Microsoft Azure integrations with Apache Airflow
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive Azure Blob Storage integration providing full blob operations, container management, file transfers, and monitoring capabilities. Supports both synchronous and asynchronous operations with extensive configuration options.
The primary interface for Azure Blob Storage operations, providing authenticated connections and core blob functionality.
class WasbHook(AzureBaseHook):
"""
Hook for Azure Blob Storage (WASB) operations.
Provides methods for blob operations, container management, and file transfers.
Supports multiple authentication methods and connection configurations.
"""
def get_conn(self) -> BlobServiceClient:
"""Get authenticated Azure Blob Service client."""
def check_for_blob(self, container_name: str, blob_name: str, **kwargs) -> bool:
"""
Check if a blob exists in the specified container.
Args:
container_name (str): Name of the container
blob_name (str): Name of the blob to check
Returns:
bool: True if blob exists, False otherwise
"""
def load_file(
self,
file_path: str,
container_name: str,
blob_name: str,
**kwargs
) -> None:
"""
Upload a local file to Azure Blob Storage.
Args:
file_path (str): Path to local file
container_name (str): Target container name
blob_name (str): Target blob name
"""
def load_string(
self,
string_data: str,
container_name: str,
blob_name: str,
**kwargs
) -> None:
"""
Upload string data to Azure Blob Storage.
Args:
string_data (str): String data to upload
container_name (str): Target container name
blob_name (str): Target blob name
"""
def check_for_prefix(self, container_name: str, prefix: str, **kwargs) -> bool:
"""
Check if any blobs exist with the given prefix.
Args:
container_name (str): Name of the container
prefix (str): Prefix to search for
Returns:
bool: True if blobs with prefix exist, False otherwise
"""
def get_blobs_list(
self,
container_name: str,
prefix: str | None = None,
include: list | None = None,
delimiter: str = "/",
**kwargs
) -> list:
"""
List blobs in container with optional prefix filtering.
Args:
container_name (str): Name of the container
prefix (str, optional): Filter blobs by prefix
include (list, optional): Additional properties to include
delimiter (str): Delimiter for blob hierarchy
Returns:
list: List of blob names
"""
def get_blobs_list_recursive(
self,
container_name: str,
prefix: str | None = None,
include: list | None = None,
endswith: str = "",
**kwargs
) -> list:
"""
Recursively list all blobs in container.
Args:
container_name (str): Name of the container
prefix (str, optional): Filter blobs by prefix
include (list, optional): Additional properties to include
endswith (str): Filter blobs ending with string
Returns:
list: List of all blob names recursively
"""
def read_file(self, container_name: str, blob_name: str, **kwargs) -> bytes:
"""
Download blob content as bytes.
Args:
container_name (str): Name of the container
blob_name (str): Name of the blob
Returns:
bytes: Blob content as bytes
"""
def get_file(self, file_path: str, container_name: str, blob_name: str, **kwargs) -> None:
"""
Download blob to local file.
Args:
file_path (str): Local file path to save blob
container_name (str): Name of the container
blob_name (str): Name of the blob
"""
def upload(
self,
container_name: str,
blob_name: str,
data: Any,
blob_type: str = "BlockBlob",
length: int | None = None,
**kwargs
) -> dict[str, Any]:
"""
Upload data to blob with advanced options.
Args:
container_name (str): Name of the container
blob_name (str): Name of the blob
data: Data to upload
blob_type (str): Type of blob (BlockBlob, PageBlob, AppendBlob)
length (int, optional): Length of data
Returns:
dict: Upload response metadata
"""
def download(
self,
container_name: str,
blob_name: str,
offset: int | None = None,
length: int | None = None,
**kwargs
) -> StorageStreamDownloader:
"""
Download blob with range support.
Args:
container_name (str): Name of the container
blob_name (str): Name of the blob
offset (int, optional): Start offset for partial download
length (int, optional): Number of bytes to download
Returns:
StorageStreamDownloader: Stream downloader object
"""
def create_container(self, container_name: str, **kwargs) -> None:
"""
Create a new container.
Args:
container_name (str): Name of the container to create
"""
def delete_container(self, container_name: str, **kwargs) -> None:
"""
Delete an existing container.
Args:
container_name (str): Name of the container to delete
"""
def delete_blobs(self, container_name: str, *blobs, **kwargs) -> None:
"""
Delete multiple blobs from container.
Args:
container_name (str): Name of the container
*blobs: Variable number of blob names to delete
"""
def copy_blobs(
self,
source_container: str,
source_blob: str,
destination_container: str,
destination_blob: str,
**kwargs
) -> None:
"""
Copy blob from source to destination.
Args:
source_container (str): Source container name
source_blob (str): Source blob name
destination_container (str): Destination container name
destination_blob (str): Destination blob name
"""
def delete_file(
self,
container_name: str,
blob_name: str,
is_prefix: bool = False,
ignore_if_missing: bool = False,
**kwargs
) -> None:
"""
Delete blob(s) from container.
Args:
container_name (str): Name of the container
blob_name (str): Name of the blob or prefix
is_prefix (bool): Whether to delete all blobs with prefix
ignore_if_missing (bool): Don't raise error if blob doesn't exist
"""Async version of the WASB hook for deferrable operations and high-performance scenarios.
class WasbAsyncHook(WasbHook):
"""
Async hook for Azure Blob Storage operations.
Provides async methods for blob operations with improved performance
for deferrable tasks and high-concurrency scenarios.
"""
async def get_async_conn(self) -> AsyncBlobServiceClient:
"""Get async Azure Blob Service client."""
async def check_for_blob_async(self, container_name: str, blob_name: str, **kwargs) -> bool:
"""Async version of check_for_blob."""
async def get_blobs_list_async(
self,
container_name: str,
prefix: str | None = None,
include: list | None = None,
delimiter: str = "/",
**kwargs
) -> list:
"""Async version of get_blobs_list."""
async def check_for_prefix_async(self, container_name: str, prefix: str, **kwargs) -> bool:
"""Async version of check_for_prefix."""
def load_string(
self,
string_data: str,
container_name: str,
blob_name: str,
**kwargs
) -> None:
"""
Upload string data as a blob.
Args:
string_data (str): String content to upload
container_name (str): Target container name
blob_name (str): Target blob name
"""
def read_file(self, container_name: str, blob_name: str, **kwargs) -> bytes:
"""
Download blob content as bytes.
Args:
container_name (str): Container name
blob_name (str): Blob name
Returns:
bytes: Blob content
"""
def delete_file(
self,
container_name: str,
blob_name: str,
is_prefix: bool = False,
ignore_if_missing: bool = False
) -> None:
"""
Delete a blob from the container.
Args:
container_name (str): Container name
blob_name (str): Blob name or prefix
is_prefix (bool): Whether to delete all blobs with this prefix
ignore_if_missing (bool): Don't raise error if blob doesn't exist
"""
def get_file(
self,
file_path: str,
container_name: str,
blob_name: str,
**kwargs
) -> None:
"""
Download a blob to local file.
Args:
file_path (str): Local file path to save to
container_name (str): Container name
blob_name (str): Blob name
"""
def create_container(self, container_name: str, **kwargs) -> None:
"""
Create a new container.
Args:
container_name (str): Name of container to create
"""
def delete_container(self, container_name: str, **kwargs) -> None:
"""
Delete a container and all its blobs.
Args:
container_name (str): Name of container to delete
"""Asynchronous version of the blob storage hook for non-blocking operations.
class WasbAsyncHook(WasbHook):
"""Async hook for Azure Blob Storage operations."""
async def get_conn(self) -> BlobServiceClient:
"""Get authenticated async Azure Blob Service client."""
async def check_for_blob(self, container_name: str, blob_name: str) -> bool:
"""Async check if a blob exists."""Operator for deleting blobs from Azure Blob Storage containers.
class WasbDeleteBlobOperator(BaseOperator):
"""
Delete blobs from Azure Blob Storage.
Supports deleting single blobs or multiple blobs using prefix matching.
"""
def __init__(
self,
container_name: str,
blob_name: str,
wasb_conn_id: str = "wasb_default",
is_prefix: bool = False,
ignore_if_missing: bool = False,
**kwargs
):
"""
Initialize blob deletion operator.
Args:
container_name (str): Azure container name
blob_name (str): Blob name or prefix to delete
wasb_conn_id (str): Airflow connection ID for Azure Blob Storage
is_prefix (bool): Whether to delete all blobs with this prefix
ignore_if_missing (bool): Don't fail if blob doesn't exist
"""Sensor that waits for a blob to exist in Azure Blob Storage.
class WasbBlobSensor(BaseSensorOperator):
"""
Sensor that waits for a blob to exist in Azure Blob Storage.
Polls the blob storage container at regular intervals until the specified
blob is found or timeout is reached.
"""
def __init__(
self,
container_name: str,
blob_name: str,
wasb_conn_id: str = "wasb_default",
check_options: dict | None = None,
**kwargs
):
"""
Initialize blob sensor.
Args:
container_name (str): Azure container name to monitor
blob_name (str): Blob name to wait for
wasb_conn_id (str): Airflow connection ID for Azure Blob Storage
check_options (dict): Additional options for blob checking
"""
def poke(self, context: dict) -> bool:
"""Check if the blob exists."""Sensor that waits for blobs matching a prefix pattern.
class WasbPrefixSensor(BaseSensorOperator):
"""
Sensor that waits for blobs with a specific prefix in Azure Blob Storage.
Useful for waiting for multiple files or files with unknown exact names
but known prefix patterns.
"""
def __init__(
self,
container_name: str,
prefix: str,
wasb_conn_id: str = "wasb_default",
**kwargs
):
"""
Initialize prefix sensor.
Args:
container_name (str): Azure container name to monitor
prefix (str): Blob name prefix to match
wasb_conn_id (str): Airflow connection ID for Azure Blob Storage
"""Deferrable triggers for blob monitoring that don't block worker slots.
class WasbBlobSensorTrigger(BaseTrigger):
"""Async trigger for blob existence monitoring."""
def __init__(
self,
container_name: str,
blob_name: str,
wasb_conn_id: str,
poke_interval: float = 60,
**kwargs
):
"""
Initialize blob sensor trigger.
Args:
container_name (str): Container name to monitor
blob_name (str): Blob name to wait for
wasb_conn_id (str): Connection ID
poke_interval (float): Polling interval in seconds
"""
class WasbPrefixSensorTrigger(BaseTrigger):
"""Async trigger for blob prefix monitoring."""
def __init__(
self,
container_name: str,
prefix: str,
wasb_conn_id: str,
poke_interval: float = 60,
**kwargs
):
"""
Initialize prefix sensor trigger.
Args:
container_name (str): Container name to monitor
prefix (str): Blob prefix to match
wasb_conn_id (str): Connection ID
poke_interval (float): Polling interval in seconds
"""Transfer data between local filesystem and Azure Blob Storage.
class LocalFilesystemToWasbOperator(BaseOperator):
"""Transfer files from local filesystem to Azure Blob Storage."""
def __init__(
self,
file_path: str,
container_name: str,
blob_name: str,
wasb_conn_id: str = "wasb_default",
create_container: bool = False,
**kwargs
):
"""
Initialize local to WASB transfer operator.
Args:
file_path (str): Local file path to upload
container_name (str): Target container name
blob_name (str): Target blob name
wasb_conn_id (str): Connection ID
create_container (bool): Create container if it doesn't exist
"""
class SFTPToWasbOperator(BaseOperator):
"""Transfer files from SFTP server to Azure Blob Storage."""
def __init__(
self,
sftp_source_path: str,
container_name: str,
blob_name: str,
sftp_conn_id: str = "sftp_default",
wasb_conn_id: str = "wasb_default",
**kwargs
):
"""
Initialize SFTP to WASB transfer operator.
Args:
sftp_source_path (str): Source file path on SFTP server
container_name (str): Target container name
blob_name (str): Target blob name
sftp_conn_id (str): SFTP connection ID
wasb_conn_id (str): WASB connection ID
"""
class S3ToAzureBlobStorageOperator(BaseOperator):
"""Transfer objects from AWS S3 to Azure Blob Storage."""
def __init__(
self,
s3_source_key: str,
container_name: str,
blob_name: str,
s3_bucket: str | None = None,
aws_conn_id: str = "aws_default",
wasb_conn_id: str = "wasb_default",
**kwargs
):
"""
Initialize S3 to Azure Blob transfer operator.
Args:
s3_source_key (str): S3 object key to transfer
container_name (str): Target Azure container name
blob_name (str): Target blob name
s3_bucket (str): Source S3 bucket name
aws_conn_id (str): AWS connection ID
wasb_conn_id (str): WASB connection ID
"""fsspec-compatible filesystem interface for Azure Blob Storage.
def get_fs(conn_id: str | None, storage_options: dict[str, Any] | None = None) -> AbstractFileSystem:
"""
Create Azure Blob FileSystem (fsspec-compatible).
Args:
conn_id (str): Airflow connection ID for Azure storage
storage_options (dict): Additional storage configuration options
Returns:
AbstractFileSystem: fsspec filesystem for Azure Blob Storage
"""Task log handler that writes Airflow logs to Azure Blob Storage.
class WasbTaskHandler(FileTaskHandler):
"""Airflow task handler that writes logs to Azure Blob Storage."""
def __init__(
self,
base_log_folder: str,
wasb_log_folder: str,
wasb_container: str,
filename_template: str | None = None,
**kwargs
):
"""
Initialize WASB task handler.
Args:
base_log_folder (str): Base folder for local logs
wasb_log_folder (str): WASB folder for remote logs
wasb_container (str): WASB container for logs
filename_template (str): Log filename template
"""
class WasbRemoteLogIO:
"""Low-level I/O operations for WASB logging."""
def upload_log(self, log_content: str, remote_log_location: str) -> None:
"""Upload log content to WASB."""
def download_log(self, remote_log_location: str) -> str:
"""Download log content from WASB."""from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
# Initialize hook
wasb_hook = WasbHook(wasb_conn_id='azure_default')
# Upload a file
wasb_hook.load_file(
file_path='/path/to/local/file.txt',
container_name='my-container',
blob_name='uploaded-file.txt'
)
# Check if blob exists
exists = wasb_hook.check_for_blob(
container_name='my-container',
blob_name='uploaded-file.txt'
)
# Download blob content
content = wasb_hook.read_file(
container_name='my-container',
blob_name='uploaded-file.txt'
)
# Delete blob
wasb_hook.delete_file(
container_name='my-container',
blob_name='uploaded-file.txt'
)
from airflow import DAG
from airflow.providers.microsoft.azure.sensors.wasb import WasbBlobSensor, WasbPrefixSensor
from datetime import datetime
dag = DAG(
'blob_monitoring_example',
start_date=datetime(2024, 1, 1),
schedule_interval='@daily'
)
# Wait for specific blob
wait_for_file = WasbBlobSensor(
task_id='wait_for_input_file',
container_name='input-data',
blob_name='daily-export.csv',
wasb_conn_id='azure_default',
timeout=300,
poke_interval=30,
dag=dag
)
# Wait for files with prefix
wait_for_batch = WasbPrefixSensor(
task_id='wait_for_batch_files',
container_name='batch-data',
prefix='batch_2024_',
wasb_conn_id='azure_default',
dag=dag
)
Azure Blob Storage connections support multiple authentication methods:
Connection Type: wasb
Authentication Options:
Connection Fields:
account_name: Azure storage account name
account_key: Storage account access key (for key auth)
sas_token: Shared access signature (for SAS auth)
client_id: Service principal client ID
client_secret: Service principal secret
tenant_id: Azure tenant ID
# Custom exceptions for blob operations
class AzureBlobStorageException(AirflowException):
"""Base exception for Azure Blob Storage operations."""
class BlobNotFound(AzureBlobStorageException):
"""Raised when a blob is not found."""
class ContainerNotFound(AzureBlobStorageException):
"""Raised when a container is not found."""The Azure Blob Storage integration provides comprehensive functionality for managing blob storage operations within Airflow workflows, with support for both simple file operations and complex data pipeline scenarios.
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-microsoft-azure