Provider package for Microsoft Azure integrations with Apache Airflow
npx @tessl/cli install tessl/pypi-apache-airflow-providers-microsoft-azure@12.6.0

A comprehensive Apache Airflow provider package that enables seamless integration with Microsoft Azure cloud services. This provider offers operators, hooks, sensors, and triggers for orchestrating and automating Azure-based workflows across a wide range of Azure services, including Azure Batch, Blob Storage, Container Instances, Cosmos DB, Data Explorer, Data Lake Storage, Data Factory, Key Vault, Service Bus, and Synapse Analytics.
pip install apache-airflow-providers-microsoft-azure

Base Azure functionality:
from airflow.providers.microsoft.azure.hooks.base_azure import AzureBaseHook

Common service-specific imports:
# Azure Blob Storage
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
from airflow.providers.microsoft.azure.operators.wasb_delete_blob import WasbDeleteBlobOperator
from airflow.providers.microsoft.azure.sensors.wasb import WasbBlobSensor
# Azure Data Factory
from airflow.providers.microsoft.azure.hooks.data_factory import AzureDataFactoryHook
from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
# Azure Cosmos DB
from airflow.providers.microsoft.azure.hooks.cosmos import AzureCosmosDBHook
from airflow.providers.microsoft.azure.operators.cosmos import AzureCosmosInsertDocumentOperator

Example DAG that waits for an input blob and then cleans up a processed blob:

from airflow import DAG
from airflow.providers.microsoft.azure.operators.wasb_delete_blob import WasbDeleteBlobOperator
from airflow.providers.microsoft.azure.sensors.wasb import WasbBlobSensor
from datetime import datetime, timedelta
# Define DAG
dag = DAG(
    'azure_workflow_example',
    default_args={
        'owner': 'data-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Example Azure workflow',
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
# Wait for a blob to exist
wait_for_file = WasbBlobSensor(
    task_id='wait_for_input_file',
    container_name='input-data',
    blob_name='daily_data.csv',
    wasb_conn_id='wasb_default',
    dag=dag,
)
# Delete processed blob
cleanup_blob = WasbDeleteBlobOperator(
    task_id='cleanup_processed_file',
    container_name='processed-data',
    blob_name='processed_data.csv',
    wasb_conn_id='wasb_default',
    dag=dag,
)
wait_for_file >> cleanup_blob

The Azure provider follows Airflow's standard provider architecture with distinct component types: hooks, operators, sensors, triggers, and transfer operators.
All components support multiple authentication methods including service principals, managed identities, workload identity federation, and connection strings.
Complete Azure Blob Storage integration with extensive blob operations, container management, and data transfer capabilities. Supports both sync and async operations.
class WasbHook(BaseHook):
    """Hook for Azure Blob Storage operations."""
    def get_conn(self) -> BlobServiceClient: ...
    def check_for_blob(self, container_name: str, blob_name: str) -> bool: ...
    def load_file(self, file_path: str, container_name: str, blob_name: str) -> None: ...
    def load_string(self, string_data: str, container_name: str, blob_name: str) -> None: ...
    def read_file(self, container_name: str, blob_name: str) -> bytes: ...
    def delete_file(self, container_name: str, blob_name: str) -> None: ...
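A minimal usage sketch inside a TaskFlow task, assuming a configured wasb_default connection; the container, blob, and local path names are placeholders:

from airflow.decorators import task
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

@task
def upload_report(local_path: str) -> None:
    # "wasb_default" is the provider's default Blob Storage connection id
    hook = WasbHook(wasb_conn_id="wasb_default")
    hook.load_file(file_path=local_path, container_name="reports", blob_name="daily/report.csv")
    # Confirm the blob landed before downstream tasks consume it
    if not hook.check_for_blob(container_name="reports", blob_name="daily/report.csv"):
        raise RuntimeError("Upload to Blob Storage did not complete")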
Execute and monitor Azure Data Factory pipelines with comprehensive pipeline management, run monitoring, and status tracking capabilities.

class AzureDataFactoryHook(BaseHook):
    """Hook for Azure Data Factory operations."""
    def get_conn(self) -> DataFactoryManagementClient: ...
    def run_pipeline(self, pipeline_name: str, resource_group_name: str, factory_name: str, **config: Any) -> CreateRunResponse: ...
    def get_pipeline_run(self, run_id: str, resource_group_name: str, factory_name: str) -> PipelineRun: ...
    def cancel_pipeline_run(self, run_id: str, resource_group_name: str, factory_name: str) -> None: ...
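A hedged example of triggering a Data Factory pipeline with the operator imported earlier; the pipeline, resource group, and factory names are placeholders:

from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator

run_adf_pipeline = AzureDataFactoryRunPipelineOperator(
    task_id="run_daily_ingest",
    azure_data_factory_conn_id="azure_data_factory_default",
    pipeline_name="daily_ingest",             # placeholder pipeline name
    resource_group_name="my-resource-group",  # placeholder resource group
    factory_name="my-data-factory",           # placeholder factory
    wait_for_termination=True,                # block until the run reaches a terminal status
)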
Comprehensive Azure Cosmos DB integration supporting database and collection management, document operations, and query execution across all Cosmos DB APIs.

class AzureCosmosDBHook(BaseHook):
    """Hook for Azure Cosmos DB operations."""
    def get_conn(self) -> CosmosClient: ...
    def create_database(self, database_name: str) -> None: ...
    def create_collection(self, collection_name: str, database_name: str) -> None: ...
    def upsert_document(self, document: dict, database_name: str, collection_name: str) -> dict: ...
    def get_document(self, document_id: str, database_name: str, collection_name: str) -> dict: ...
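Example sketch of inserting a document with the Cosmos DB operator; the database, collection, and document values are placeholders:

from airflow.providers.microsoft.azure.operators.cosmos import AzureCosmosInsertDocumentOperator

insert_event = AzureCosmosInsertDocumentOperator(
    task_id="insert_processing_event",
    azure_cosmos_conn_id="azure_cosmos_default",
    database_name="events",        # placeholder database
    collection_name="daily",       # placeholder collection/container
    document={"id": "2024-01-01", "status": "processed"},
)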
Support for both Azure Data Lake Storage Gen1 and Gen2 with file system operations, directory management, and data upload/download capabilities.

class AzureDataLakeHook(BaseHook):
    """Hook for Azure Data Lake Storage Gen1."""
    def get_conn(self) -> core.AzureDLFileSystem: ...
    def upload_file(self, local_path: str, remote_path: str, overwrite: bool = True) -> None: ...
    def download_file(self, local_path: str, remote_path: str, overwrite: bool = True) -> None: ...

class AzureDataLakeStorageV2Hook(BaseHook):
    """Hook for Azure Data Lake Storage Gen2."""
    def get_conn(self) -> DataLakeServiceClient: ...
    def create_file_system(self, file_system_name: str) -> None: ...
    def upload_file(self, file_system_name: str, file_name: str, file_path: str) -> DataLakeFileClient: ...
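A minimal sketch of an upload to Data Lake Storage Gen2 inside a task; the adls_default connection id, file system, and paths are assumptions:

from airflow.decorators import task
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeStorageV2Hook

@task
def stage_raw_file(local_path: str) -> None:
    hook = AzureDataLakeStorageV2Hook(adls_conn_id="adls_default")
    # Upload the local file into the "raw" file system (container) under input/
    hook.upload_file(file_system_name="raw", file_name="input/data.csv", file_path=local_path)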
Complete Azure Service Bus integration with queue and topic management, message operations, and subscription handling for reliable messaging scenarios.

class AdminClientHook(BaseAzureServiceBusHook):
    """Hook for Azure Service Bus administrative operations."""
    def create_queue(self, queue_name: str, **kwargs: Any) -> None: ...
    def create_topic(self, topic_name: str, **kwargs: Any) -> None: ...
    def create_subscription(self, topic_name: str, subscription_name: str, **kwargs: Any) -> None: ...

class MessageHook(BaseAzureServiceBusHook):
    """Hook for Azure Service Bus message operations."""
    def send_message(self, queue_name: str, message: str | ServiceBusMessage, **kwargs: Any) -> None: ...
    def receive_message(self, queue_name: str, **kwargs: Any) -> list[ServiceBusReceivedMessage]: ...
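A minimal sketch of sending a message with MessageHook; the connection id and queue name are placeholders, and the message body is passed positionally to match the signature sketched above:

from airflow.decorators import task
from airflow.providers.microsoft.azure.hooks.asb import MessageHook

@task
def notify_downstream() -> None:
    hook = MessageHook(azure_service_bus_conn_id="azure_service_bus_default")
    # Queue name first, then the message body
    hook.send_message("processed-files", "daily_data.csv processed")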
Container orchestration capabilities including Azure Container Instances, Container Registry, and Container Volume management for containerized workloads.

class AzureContainerInstanceHook(BaseHook):
    """Hook for Azure Container Instances management."""
    def get_conn(self) -> ContainerInstanceManagementClient: ...
    def create_or_update(self, resource_group_name: str, container_group_name: str, container_group: ContainerGroup) -> ContainerGroup: ...
    def get_logs(self, resource_group_name: str, container_group_name: str, container_name: str) -> str: ...
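A hedged example of running a container group with the provider's AzureContainerInstancesOperator; the resource group, name, image, and region are placeholders:

from airflow.providers.microsoft.azure.operators.container_instances import AzureContainerInstancesOperator

run_container = AzureContainerInstancesOperator(
    task_id="run_processing_container",
    ci_conn_id="azure_default",
    registry_conn_id=None,                # set a registry connection to pull private images
    resource_group="my-resource-group",   # placeholder resource group
    name="daily-processing",              # container group name
    image="mcr.microsoft.com/azuredocs/aci-helloworld",
    region="westeurope",
    cpu=1.0,
    memory_in_gb=1.5,
)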
Execute Spark jobs and manage pipeline operations on Azure Synapse Analytics for big data processing and analytics workloads.

class AzureSynapseHook(BaseAzureSynapseHook):
    """Hook for Azure Synapse Spark operations."""
    def get_conn(self) -> SparkClient: ...
    def run_spark_job(self, payload: dict) -> dict: ...
    def get_job_run_status(self, job_id: int) -> str: ...

class AzureSynapsePipelineHook(BaseAzureSynapseHook):
    """Hook for Azure Synapse Pipeline operations."""
    def run_pipeline(self, pipeline_name: str, **config: Any) -> CreateRunResponse: ...
    def get_pipeline_run_status(self, run_id: str) -> str: ...
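A hedged sketch of submitting a Spark batch job with AzureSynapseRunSparkBatchOperator; the Spark pool name and storage path are placeholders, and the job options object comes from the azure-synapse-spark SDK:

from azure.synapse.spark.models import SparkBatchJobOptions
from airflow.providers.microsoft.azure.operators.synapse import AzureSynapseRunSparkBatchOperator

# Placeholder job definition; sizing fields (driver/executor memory, cores, executor count)
# can be set on SparkBatchJobOptions as required by the target Spark pool.
job_payload = SparkBatchJobOptions(
    name="daily-aggregation",
    file="abfss://jobs@mystorage.dfs.core.windows.net/aggregate.py",
)

run_spark_batch = AzureSynapseRunSparkBatchOperator(
    task_id="run_spark_batch",
    azure_synapse_conn_id="azure_synapse_default",
    spark_pool="dailybatchpool",  # placeholder Spark pool name
    payload=job_payload,
)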
Access Microsoft Graph API for Microsoft 365 services integration with support for various Graph API endpoints and operations.

class KiotaRequestAdapterHook(BaseHook):
    """Hook for Microsoft Graph API using Kiota request adapter."""
    def get_conn(self) -> RequestAdapter: ...
    def test_connection(self) -> tuple[bool, str]: ...
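A hedged sketch of calling the Graph API through the provider's MSGraphAsyncOperator, which builds on this hook; the connection id and endpoint are placeholders:

from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator

list_users = MSGraphAsyncOperator(
    task_id="list_users",
    conn_id="msgraph_default",  # placeholder Graph API connection id
    url="users",                # relative Graph endpoint to request
)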
Specialized operators for transferring data between Azure services and external systems including local filesystem, SFTP, Oracle databases, and AWS S3.

class LocalFilesystemToWasbOperator(BaseOperator):
    """Transfer files from local filesystem to Azure Blob Storage."""
    def __init__(self, file_path: str, container_name: str, blob_name: str, **kwargs): ...

class S3ToAzureBlobStorageOperator(BaseOperator):
    """Transfer objects from AWS S3 to Azure Blob Storage."""
    def __init__(self, s3_source_key: str, container_name: str, blob_name: str, **kwargs): ...
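Example sketch of a local-to-Blob transfer; the local path, container, and blob names are placeholders:

from airflow.providers.microsoft.azure.transfers.local_to_wasb import LocalFilesystemToWasbOperator

upload_to_blob = LocalFilesystemToWasbOperator(
    task_id="upload_daily_extract",
    wasb_conn_id="wasb_default",
    file_path="/tmp/daily_extract.csv",  # placeholder local file
    container_name="input-data",
    blob_name="daily_data.csv",
)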
Execute KQL queries and manage connections to Azure Data Explorer clusters for real-time analytics on large volumes of data.

class AzureDataExplorerHook(BaseHook):
    """Hook for Azure Data Explorer (Kusto) operations."""
    def get_conn(self) -> KustoClient: ...
    def run_query(self, query: str, database: str, options: dict | None = None) -> KustoResponseDataSet: ...

class AzureDataExplorerQueryOperator(BaseOperator):
    """Operator for querying Azure Data Explorer (Kusto)."""
    def __init__(self, *, query: str, database: str, **kwargs): ...
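Example sketch of a KQL query task; the database name and query are placeholders:

from airflow.providers.microsoft.azure.operators.adx import AzureDataExplorerQueryOperator

count_recent_events = AzureDataExplorerQueryOperator(
    task_id="count_recent_events",
    azure_data_explorer_conn_id="azure_data_explorer_default",
    database="telemetry",  # placeholder database
    query="Events | where Timestamp > ago(1d) | count",
)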
Manage Power BI datasets, trigger refreshes, and monitor workspace operations through Microsoft Graph API integration.

class PowerBIHook(KiotaRequestAdapterHook):
    """Hook for Power BI operations via Microsoft Graph API."""
    async def trigger_dataset_refresh(self, dataset_id: str, group_id: str, **kwargs) -> str: ...
    async def get_refresh_details_by_refresh_id(self, dataset_id: str, group_id: str, dataset_refresh_id: str) -> dict: ...

class PowerBIDatasetRefreshOperator(BaseOperator):
    """Refreshes a Power BI dataset."""
    def __init__(self, *, dataset_id: str, group_id: str, **kwargs): ...
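A hedged sketch of triggering a dataset refresh; the dataset and workspace (group) ids are placeholders:

from airflow.providers.microsoft.azure.operators.powerbi import PowerBIDatasetRefreshOperator

refresh_sales_dataset = PowerBIDatasetRefreshOperator(
    task_id="refresh_sales_dataset",
    dataset_id="<dataset-guid>",  # placeholder dataset id
    group_id="<workspace-guid>",  # placeholder workspace (group) id
)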
Create and manage compute pools, jobs, and tasks for large-scale parallel and high-performance computing applications in the cloud.

class AzureBatchHook(BaseHook):
    """Hook for Azure Batch APIs."""
    def get_conn(self) -> BatchServiceClient: ...
    def create_pool(self, pool: PoolAddParameter) -> None: ...
    def create_job(self, job: JobAddParameter) -> None: ...
    def add_single_task_to_job(self, job_id: str, task: TaskAddParameter) -> None: ...

class AzureBatchOperator(BaseOperator):
    """Executes a job on Azure Batch Service."""
    def __init__(self, *, batch_pool_id: str, batch_job_id: str, batch_task_command_line: str, **kwargs): ...
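A hedged sketch of running a Batch task; the pool, job, and task identifiers and the VM size are placeholders, and the VM image/OS parameters required when the operator must create the pool itself are omitted here:

from airflow.providers.microsoft.azure.operators.batch import AzureBatchOperator

run_batch_task = AzureBatchOperator(
    task_id="run_batch_task",
    azure_batch_conn_id="azure_batch_default",
    batch_pool_id="daily-pool",
    batch_pool_vm_size="Standard_D2s_v3",
    batch_job_id="daily-job",
    batch_task_id="process-task",
    batch_task_command_line="python process.py",
)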
Manage file shares, directories, and files within Azure Storage with SMB protocol support and REST API operations.

class AzureFileShareHook(BaseHook):
    """Hook for Azure File Share operations."""
    def create_share(self, share_name: str, **kwargs) -> bool: ...
    def create_directory(self, **kwargs) -> Any: ...
    def load_file(self, file_path: str, **kwargs) -> None: ...
    def get_file(self, file_path: str, **kwargs) -> None: ...
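A minimal sketch of uploading a local file to a file share; the constructor arguments shown here (share_name, file_path, and the connection id) are assumptions based on the sketch above and may vary by provider version:

from airflow.decorators import task
from airflow.providers.microsoft.azure.hooks.fileshare import AzureFileShareHook

@task
def publish_report(local_path: str) -> None:
    hook = AzureFileShareHook(
        azure_fileshare_conn_id="azure_fileshare_default",
        share_name="reports",          # placeholder share
        file_path="daily/report.csv",  # placeholder target path within the share
    )
    # Upload the local file to the configured share/path
    hook.load_file(file_path=local_path)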
The provider supports multiple Azure authentication mechanisms, including service principals, managed identities, workload identity federation, and connection strings.

All Azure services use Airflow connections for configuration. The provider supports 18+ different connection types for various Azure services, each with specific configuration requirements and authentication options.
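A minimal sketch of supplying a service-principal connection through an environment variable (connections can also come from the metadata database or a secrets backend); the extra field names below follow the base azure connection type and are placeholders, since each connection type documents its own fields:

import json
import os

os.environ["AIRFLOW_CONN_AZURE_DEFAULT"] = json.dumps(
    {
        "conn_type": "azure",
        "login": "<client_id>",
        "password": "<client_secret>",
        "extra": {"tenantId": "<tenant_id>", "subscriptionId": "<subscription_id>"},
    }
)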
# Base Azure connection information
class AzureBaseHook(BaseHook):
    conn_name_attr: str = "azure_conn_id"
    default_conn_name: str = "azure_default"
    conn_type: str = "azure"

# Common authentication credentials
AzureCredentials = Union[
    ServicePrincipal,
    ManagedIdentity,
    WorkloadIdentity,
    DefaultAzureCredential,
]
# Pipeline run statuses
class AzureDataFactoryPipelineRunStatus:
    QUEUED: str = "Queued"
    IN_PROGRESS: str = "InProgress"
    SUCCEEDED: str = "Succeeded"
    FAILED: str = "Failed"
    CANCELLED: str = "Cancelled"

# Synapse job statuses
class AzureSynapseSparkBatchRunStatus:
    NOT_STARTED: str = "not_started"
    STARTING: str = "starting"
    RUNNING: str = "running"
    IDLE: str = "idle"
    BUSY: str = "busy"
    SHUTTING_DOWN: str = "shutting_down"
    ERROR: str = "error"
    DEAD: str = "dead"
    KILLED: str = "killed"
    SUCCESS: str = "success"
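The status constants can be used instead of raw strings when polling runs. A sketch using the Data Factory constants (the resource group and factory names are placeholders):

from airflow.decorators import task
from airflow.providers.microsoft.azure.hooks.data_factory import (
    AzureDataFactoryHook,
    AzureDataFactoryPipelineRunStatus,
)

@task
def pipeline_run_succeeded(run_id: str) -> bool:
    hook = AzureDataFactoryHook(azure_data_factory_conn_id="azure_data_factory_default")
    run = hook.get_pipeline_run(
        run_id=run_id,
        resource_group_name="my-resource-group",
        factory_name="my-data-factory",
    )
    # Compare against the documented constant rather than the literal "Succeeded"
    return run.status == AzureDataFactoryPipelineRunStatus.SUCCEEDED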