tessl/pypi-apache-airflow-providers-microsoft-azure

Provider package for Microsoft Azure integrations with Apache Airflow

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/apache-airflow-providers-microsoft-azure@12.6.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-microsoft-azure@12.6.0

Apache Airflow Microsoft Azure Provider

A comprehensive Apache Airflow provider package that enables seamless integration with Microsoft Azure cloud services. This provider offers operators, hooks, sensors, and triggers for orchestrating and automating Azure-based workflows across a wide range of Azure services including Azure Batch, Blob Storage, Container Instances, Cosmos DB, Data Explorer, Data Lake Storage, Data Factory, Key Vault, Service Bus, and Synapse Analytics.

Package Information

  • Package Name: apache-airflow-providers-microsoft-azure
  • Language: Python
  • Installation: pip install apache-airflow-providers-microsoft-azure
  • Provider Type: Apache Airflow Provider Package
  • License: Apache-2.0
  • Supported Azure Services: 19+ services including Batch, Blob Storage, Container services, Cosmos DB, Data Explorer, Data Factory, File Share, PowerBI, and more

Core Imports

Base Azure functionality:

from airflow.providers.microsoft.azure.hooks.base_azure import AzureBaseHook

Common service-specific imports:

# Azure Blob Storage
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
from airflow.providers.microsoft.azure.operators.wasb_delete_blob import WasbDeleteBlobOperator
from airflow.providers.microsoft.azure.sensors.wasb import WasbBlobSensor

# Azure Data Factory
from airflow.providers.microsoft.azure.hooks.data_factory import AzureDataFactoryHook
from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator

# Azure Cosmos DB
from airflow.providers.microsoft.azure.hooks.cosmos import AzureCosmosDBHook
from airflow.providers.microsoft.azure.operators.cosmos import AzureCosmosInsertDocumentOperator

Basic Usage

from airflow import DAG
from airflow.providers.microsoft.azure.operators.wasb_delete_blob import WasbDeleteBlobOperator
from airflow.providers.microsoft.azure.sensors.wasb import WasbBlobSensor
from datetime import datetime, timedelta

# Define DAG
dag = DAG(
    'azure_workflow_example',
    default_args={
        'owner': 'data-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    description='Example Azure workflow',
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Wait for a blob to exist
wait_for_file = WasbBlobSensor(
    task_id='wait_for_input_file',
    container_name='input-data',
    blob_name='daily_data.csv',
    wasb_conn_id='wasb_default',
    dag=dag
)

# Delete processed blob
cleanup_blob = WasbDeleteBlobOperator(
    task_id='cleanup_processed_file',
    container_name='processed-data',
    blob_name='processed_data.csv',
    wasb_conn_id='wasb_default',
    dag=dag
)

wait_for_file >> cleanup_blob

Architecture

The Azure provider follows Airflow's standard provider architecture with distinct component types:

  • Hooks: Authenticated connections to Azure services, handling credentials and API clients
  • Operators: Task executors that perform actions on Azure resources (create, delete, run, etc.)
  • Sensors: Monitors that wait for specific conditions in Azure services
  • Triggers: Async/deferrable components for long-running operations
  • Transfers: Specialized operators for moving data between Azure and other systems

All components support multiple authentication methods including service principals, managed identities, workload identity federation, and connection strings.
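
The trigger components listed above enable deferrable mode for long-running waits, freeing worker slots while the triggerer polls Azure. A minimal sketch, assuming a recent provider version where WasbBlobSensor exposes a deferrable flag and an Airflow triggerer process is running:

from airflow.providers.microsoft.azure.sensors.wasb import WasbBlobSensor

# Hand the wait over to the triggerer instead of occupying a worker slot.
wait_for_blob = WasbBlobSensor(
    task_id="wait_for_blob_deferrable",
    container_name="input-data",
    blob_name="daily_data.csv",
    wasb_conn_id="wasb_default",
    deferrable=True,  # requires the Airflow triggerer to be running
)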

Capabilities

Azure Blob Storage (WASB)

Complete Azure Blob Storage integration with extensive blob operations, container management, and data transfer capabilities. Supports both sync and async operations.

class WasbHook(BaseHook):
    """Hook for Azure Blob Storage operations."""
    
    def get_conn(self) -> BlobServiceClient: ...
    def check_for_blob(self, container_name: str, blob_name: str) -> bool: ...
    def load_file(self, file_path: str, container_name: str, blob_name: str) -> None: ...
    def load_string(self, string_data: str, container_name: str, blob_name: str) -> None: ...
    def read_file(self, container_name: str, blob_name: str) -> str: ...
    def delete_file(self, container_name: str, blob_name: str) -> None: ...
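
A short sketch using the methods listed above inside a Python callable (for example a @task-decorated function); the connection id, container, and paths are placeholders:

from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

def archive_report(**context):
    # Connect using an Airflow connection of type "wasb" (placeholder connection id).
    hook = WasbHook(wasb_conn_id="wasb_default")

    # Upload a local file, then verify it landed.
    hook.load_file("/tmp/report.csv", container_name="reports", blob_name="2024/report.csv")
    if hook.check_for_blob(container_name="reports", blob_name="2024/report.csv"):
        print("upload confirmed")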

Azure Data Factory

Execute and monitor Azure Data Factory pipelines with comprehensive pipeline management, run monitoring, and status tracking capabilities.

class AzureDataFactoryHook(BaseHook):
    """Hook for Azure Data Factory operations."""
    
    def get_conn(self) -> DataFactoryManagementClient: ...
    def run_pipeline(self, pipeline_name: str, resource_group_name: str, factory_name: str, **config: Any) -> CreateRunResponse: ...
    def get_pipeline_run(self, run_id: str, resource_group_name: str, factory_name: str) -> PipelineRun: ...
    def cancel_pipeline_run(self, run_id: str, resource_group_name: str, factory_name: str) -> None: ...
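
A hedged sketch of triggering a pipeline with AzureDataFactoryRunPipelineOperator; the connection id, pipeline, factory, and resource group names are placeholders, and parameter names should be checked against the installed provider version:

from airflow.providers.microsoft.azure.operators.data_factory import (
    AzureDataFactoryRunPipelineOperator,
)

run_pipeline = AzureDataFactoryRunPipelineOperator(
    task_id="run_adf_pipeline",
    azure_data_factory_conn_id="azure_data_factory_default",  # placeholder connection id
    pipeline_name="copy_daily_sales",        # placeholder pipeline
    resource_group_name="analytics-rg",      # placeholder resource group
    factory_name="analytics-adf",            # placeholder factory
    wait_for_termination=True,               # block until the run reaches a terminal state
)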

Azure Cosmos DB

Comprehensive Azure Cosmos DB integration supporting database and collection management, document operations, and query execution across all Cosmos DB APIs.

class AzureCosmosDBHook(BaseHook):
    """Hook for Azure Cosmos DB operations."""
    
    def get_conn(self) -> CosmosClient: ...
    def create_database(self, database_name: str) -> None: ...
    def create_collection(self, collection_name: str, database_name: str) -> None: ...
    def upsert_document(self, document: dict, database_name: str, collection_name: str) -> dict: ...
    def get_document(self, document_id: str, database_name: str, collection_name: str) -> dict: ...
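
A sketch of the insert operator (imported under Core Imports above); the database, collection, document, and connection id are placeholders:

from airflow.providers.microsoft.azure.operators.cosmos import AzureCosmosInsertDocumentOperator

insert_event = AzureCosmosInsertDocumentOperator(
    task_id="insert_event_document",
    database_name="events-db",          # placeholder database
    collection_name="daily-events",     # placeholder collection/container
    document={"id": "2024-01-01", "status": "processed"},
    azure_cosmos_conn_id="azure_cosmos_default",
)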

Azure Data Lake Storage

Support for both Azure Data Lake Storage Gen1 and Gen2 with file system operations, directory management, and data upload/download capabilities.

class AzureDataLakeHook(BaseHook):
    """Hook for Azure Data Lake Storage Gen1."""
    
    def get_conn(self) -> core.AzureDLFileSystem: ...
    def upload_file(self, local_path: str, remote_path: str, overwrite: bool = True) -> None: ...
    def download_file(self, local_path: str, remote_path: str, overwrite: bool = True) -> None: ...

class AzureDataLakeStorageV2Hook(BaseHook):
    """Hook for Azure Data Lake Storage Gen2."""
    
    def get_conn(self) -> DataLakeServiceClient: ...
    def create_file_system(self, file_system_name: str) -> None: ...
    def upload_file(self, file_system_name: str, file_name: str, file_path: str) -> DataLakeFileClient: ...
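
A sketch using the Gen2 hook methods listed above to create a file system and upload a file; the connection id (the parameter name adls_conn_id is an assumption), file system, and paths are placeholders:

from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeStorageV2Hook

def stage_raw_data(**context):
    # "adls_default" is a placeholder Airflow connection id for ADLS Gen2.
    hook = AzureDataLakeStorageV2Hook(adls_conn_id="adls_default")

    # Create the container-level file system, then upload a local file into it.
    hook.create_file_system("raw-zone")
    hook.upload_file(
        file_system_name="raw-zone",
        file_name="sales/2024-01-01.csv",
        file_path="/tmp/sales.csv",
    )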

Azure Service Bus

Complete Azure Service Bus integration with queue and topic management, message operations, and subscription handling for reliable messaging scenarios.

class AdminClientHook(BaseAzureServiceBusHook):
    """Hook for Azure Service Bus administrative operations."""
    
    def create_queue(self, queue_name: str, **kwargs: Any) -> None: ...
    def create_topic(self, topic_name: str, **kwargs: Any) -> None: ...
    def create_subscription(self, topic_name: str, subscription_name: str, **kwargs: Any) -> None: ...

class MessageHook(BaseAzureServiceBusHook):
    """Hook for Azure Service Bus message operations."""
    
    def send_message(self, queue_name: str, message: str | ServiceBusMessage, **kwargs: Any) -> None: ...
    def receive_message(self, queue_name: str, **kwargs: Any) -> list[ServiceBusReceivedMessage]: ...
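
A sketch combining the two hooks above to create a queue and send a message; the connection id and queue name are placeholders:

from airflow.providers.microsoft.azure.hooks.asb import AdminClientHook, MessageHook

def enqueue_order(**context):
    conn_id = "azure_service_bus_default"  # placeholder connection id

    # Administrative operation: create the queue (fails if it already exists).
    AdminClientHook(azure_service_bus_conn_id=conn_id).create_queue("orders-queue")

    # Data-plane operation: send a single message to the queue.
    MessageHook(azure_service_bus_conn_id=conn_id).send_message("orders-queue", "order-12345")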

Azure Container Services

Container orchestration capabilities including Azure Container Instances, Container Registry, and Container Volume management for containerized workloads.

class AzureContainerInstanceHook(BaseHook):
    """Hook for Azure Container Instances management."""
    
    def get_conn(self) -> ContainerInstanceManagementClient: ...
    def create_or_update(self, resource_group_name: str, container_group_name: str, container_group: ContainerGroup) -> ContainerGroup: ...
    def get_logs(self, resource_group_name: str, container_group_name: str, container_name: str) -> str: ...
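
The provider also ships AzureContainerInstancesOperator for running a container group as a task. A hedged sketch; connection ids, resource group, image, and region are placeholders, and parameter names should be verified against the installed provider version:

from airflow.providers.microsoft.azure.operators.container_instances import (
    AzureContainerInstancesOperator,
)

run_container = AzureContainerInstancesOperator(
    task_id="run_batch_container",
    ci_conn_id="azure_default",        # placeholder Azure connection
    registry_conn_id=None,             # set this for private container registries
    resource_group="my-resource-group",
    name="airflow-aci-demo",
    image="mcr.microsoft.com/azuredocs/aci-helloworld",
    region="westeurope",
    cpu=1.0,
    memory_in_gb=1.5,
)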

Azure Synapse Analytics

Execute Spark jobs and manage pipeline operations on Azure Synapse Analytics for big data processing and analytics workloads.

class AzureSynapseHook(BaseAzureSynapseHook):
    """Hook for Azure Synapse Spark operations."""
    
    def get_conn(self) -> SparkClient: ...
    def run_spark_job(self, payload: dict) -> dict: ...
    def get_job_run_status(self, job_id: int) -> str: ...

class AzureSynapsePipelineHook(BaseAzureSynapseHook):
    """Hook for Azure Synapse Pipeline operations."""
    
    def run_pipeline(self, pipeline_name: str, **config: Any) -> CreateRunResponse: ...
    def get_pipeline_run_status(self, run_id: str) -> str: ...
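
A hedged sketch of submitting a Spark batch job with AzureSynapseRunSparkBatchOperator, which builds on the Spark hook above; the connection id, pool, file path, and sizing values are placeholders, and both the operator parameters and the SparkBatchJobOptions fields should be verified against the installed provider and azure-synapse-spark SDK versions:

from azure.synapse.spark.models import SparkBatchJobOptions
from airflow.providers.microsoft.azure.operators.synapse import AzureSynapseRunSparkBatchOperator

spark_job = AzureSynapseRunSparkBatchOperator(
    task_id="run_synapse_spark_job",
    azure_synapse_conn_id="azure_synapse_default",   # placeholder connection id
    spark_pool="smallpool",                          # placeholder Spark pool
    payload=SparkBatchJobOptions(
        name="daily-aggregation",
        file="abfss://jobs@mystorageaccount.dfs.core.windows.net/aggregate.py",
        driver_memory="4g",
        driver_cores=2,
        executor_memory="4g",
        executor_cores=2,
        executor_count=2,
    ),
)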

Microsoft Graph API

Access Microsoft Graph API for Microsoft 365 services integration with support for various Graph API endpoints and operations.

class KiotaRequestAdapterHook(BaseHook):
    """Hook for Microsoft Graph API using Kiota request adapter."""
    
    def get_conn(self) -> RequestAdapter: ...
    def test_connection(self) -> tuple[bool, str]: ...
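
The hook is commonly consumed through MSGraphAsyncOperator, which issues a request against a Graph endpoint. A hedged sketch; the connection id and endpoint are placeholders, and parameter names should be checked against the installed provider version:

from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator

# Call the Graph "users" endpoint (placeholder URL and connection id).
list_users = MSGraphAsyncOperator(
    task_id="list_graph_users",
    conn_id="msgraph_default",
    url="users",
)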

Data Transfer Operations

Specialized operators for transferring data between Azure services and external systems including local filesystem, SFTP, Oracle databases, and AWS S3.

class LocalFilesystemToWasbOperator(BaseOperator):
    """Transfer files from local filesystem to Azure Blob Storage."""
    
    def __init__(self, file_path: str, container_name: str, blob_name: str, **kwargs): ...

class S3ToAzureBlobStorageOperator(BaseOperator):
    """Transfer objects from AWS S3 to Azure Blob Storage."""
    
    def __init__(self, s3_source_key: str, container_name: str, blob_name: str, **kwargs): ...
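
A sketch of the local-filesystem transfer using the parameters listed above; the paths, container, and connection id are placeholders:

from airflow.providers.microsoft.azure.transfers.local_to_wasb import LocalFilesystemToWasbOperator

# Upload a local file to Azure Blob Storage.
upload_report = LocalFilesystemToWasbOperator(
    task_id="upload_report_to_blob",
    file_path="/tmp/report.csv",
    container_name="reports",
    blob_name="2024/report.csv",
    wasb_conn_id="wasb_default",
)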

Azure Data Explorer (ADX)

Execute KQL queries and manage connections to Azure Data Explorer clusters for real-time analytics on large volumes of data.

class AzureDataExplorerHook(BaseHook):
    """Hook for Azure Data Explorer (Kusto) operations."""
    
    def get_conn(self) -> KustoClient: ...
    def run_query(self, query: str, database: str, options: dict | None = None) -> KustoResponseDataSet: ...

class AzureDataExplorerQueryOperator(BaseOperator):
    """Operator for querying Azure Data Explorer (Kusto)."""
    
    def __init__(self, *, query: str, database: str, **kwargs): ...
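
A sketch of the query operator above; the KQL query, database, and connection id are placeholders:

from airflow.providers.microsoft.azure.operators.adx import AzureDataExplorerQueryOperator

# Run a KQL query against a placeholder ADX database.
adx_query = AzureDataExplorerQueryOperator(
    task_id="count_recent_events",
    query="Events | where Timestamp > ago(1d) | count",
    database="telemetry",
    azure_data_explorer_conn_id="azure_data_explorer_default",
)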

Microsoft Power BI

Manage Power BI datasets, trigger refreshes, and monitor workspace operations through Microsoft Graph API integration.

class PowerBIHook(KiotaRequestAdapterHook):
    """Hook for Power BI operations via Microsoft Graph API."""
    
    async def trigger_dataset_refresh(self, dataset_id: str, group_id: str, **kwargs) -> str: ...
    async def get_refresh_details_by_refresh_id(self, dataset_id: str, group_id: str, dataset_refresh_id: str) -> dict: ...

class PowerBIDatasetRefreshOperator(BaseOperator):
    """Refreshes a Power BI dataset."""
    
    def __init__(self, *, dataset_id: str, group_id: str, **kwargs): ...
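
A sketch of the refresh operator above; the dataset and workspace (group) ids are placeholders:

from airflow.providers.microsoft.azure.operators.powerbi import PowerBIDatasetRefreshOperator

# Trigger a refresh of one dataset in one workspace (placeholder GUIDs).
refresh_sales_dataset = PowerBIDatasetRefreshOperator(
    task_id="refresh_sales_dataset",
    dataset_id="00000000-0000-0000-0000-000000000000",
    group_id="11111111-1111-1111-1111-111111111111",
)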

Azure Batch

Create and manage compute pools, jobs, and tasks for large-scale parallel and high-performance computing applications in the cloud.

class AzureBatchHook(BaseHook):
    """Hook for Azure Batch APIs."""
    
    def get_conn(self) -> BatchServiceClient: ...
    def create_pool(self, pool: PoolAddParameter) -> None: ...
    def create_job(self, job: JobAddParameter) -> None: ...
    def add_single_task_to_job(self, job_id: str, task: TaskAddParameter) -> None: ...

class AzureBatchOperator(BaseOperator):
    """Executes a job on Azure Batch Service."""
    
    def __init__(self, *, batch_pool_id: str, batch_job_id: str, batch_task_command_line: str, **kwargs): ...
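
A hedged sketch of AzureBatchOperator; the ids, VM size, and command line are placeholders, and if the pool has to be created the operator also requires VM image parameters (verify the full required set against the installed provider version):

from airflow.providers.microsoft.azure.operators.batch import AzureBatchOperator

# Run a single command-line task on Azure Batch (placeholder ids and sizing).
batch_task = AzureBatchOperator(
    task_id="run_batch_task",
    batch_pool_id="etl-pool",
    batch_pool_vm_size="Standard_D2s_v3",
    batch_job_id="daily-etl-job",
    batch_task_id="transform-sales",
    batch_task_command_line="python /mnt/scripts/transform.py",
    azure_batch_conn_id="azure_batch_default",
)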

Azure File Share

Manage file shares, directories, and files within Azure Storage with SMB protocol support and REST API operations.

class AzureFileShareHook(BaseHook):
    """Hook for Azure File Share operations."""
    
    def create_share(self, share_name: str, **kwargs) -> bool: ...
    def create_directory(self, **kwargs) -> Any: ...
    def load_file(self, file_path: str, **kwargs) -> None: ...
    def get_file(self, file_path: str, **kwargs) -> None: ...
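
A hedged sketch following the method sketch above; the connection id, share, and paths are placeholders, and the exact constructor and method arguments vary across provider versions, so verify them before use:

from airflow.providers.microsoft.azure.hooks.fileshare import AzureFileShareHook

def publish_to_file_share(**context):
    # Placeholder connection id; the constructor arguments share_name and file_path
    # are an assumption and may not match every provider version.
    hook = AzureFileShareHook(
        azure_fileshare_conn_id="azure_fileshare_default",
        share_name="reports",
        file_path="2024/report.csv",
    )

    hook.create_share("reports")       # create the target share
    hook.load_file("/tmp/report.csv")  # upload the local file to the configured share path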

Authentication Methods

The provider supports multiple Azure authentication mechanisms:

  • Service Principal: Using client ID, client secret, and tenant ID
  • Managed Identity: Azure managed identity for resources
  • Workload Identity: Workload identity federation for Kubernetes
  • DefaultAzureCredential: Azure SDK default credential chain
  • Connection String: Service-specific connection strings
  • Account Key: Storage account key authentication
  • SAS Token: Shared Access Signature tokens
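
As one illustration, a service principal can be supplied through a standard Airflow connection, here defined as a JSON-valued environment variable; the connection id is a placeholder and the extra field names are an assumption to verify against each service's connection documentation:

import json
import os

# Connection "azure_default" of type "azure" (see Type Definitions below) backed by a
# service principal. The extra keys are assumptions; confirm them in the provider docs.
os.environ["AIRFLOW_CONN_AZURE_DEFAULT"] = json.dumps({
    "conn_type": "azure",
    "login": "<client_id>",
    "password": "<client_secret>",
    "extra": {
        "tenantId": "<tenant_id>",
        "subscriptionId": "<subscription_id>",
    },
})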

Connection Configuration

All Azure services use Airflow connections for configuration. The provider supports 18+ different connection types for various Azure services, each with specific configuration requirements and authentication options.

Type Definitions

# Base Azure connection information
class AzureBaseHook(BaseHook):
    conn_name_attr: str = "azure_conn_id"
    default_conn_name: str = "azure_default"
    conn_type: str = "azure"

# Common authentication credentials
AzureCredentials = Union[
    ServicePrincipal,
    ManagedIdentity, 
    WorkloadIdentity,
    DefaultAzureCredential
]

# Pipeline run statuses
class AzureDataFactoryPipelineRunStatus:
    QUEUED: str = "Queued"
    IN_PROGRESS: str = "InProgress" 
    SUCCEEDED: str = "Succeeded"
    FAILED: str = "Failed"
    CANCELLED: str = "Cancelled"

# Synapse job statuses  
class AzureSynapseSparkBatchRunStatus:
    NOT_STARTED: str = "not_started"
    STARTING: str = "starting"
    RUNNING: str = "running"
    IDLE: str = "idle"
    BUSY: str = "busy"
    SHUTTING_DOWN: str = "shutting_down"
    ERROR: str = "error"
    DEAD: str = "dead"
    KILLED: str = "killed"
    SUCCESS: str = "success"
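
A brief sketch tying these constants to the Data Factory hook shown earlier; the connection id, resource group, and factory name are placeholders:

from airflow.providers.microsoft.azure.hooks.data_factory import (
    AzureDataFactoryHook,
    AzureDataFactoryPipelineRunStatus,
)

def pipeline_run_succeeded(run_id: str) -> bool:
    hook = AzureDataFactoryHook(azure_data_factory_conn_id="azure_data_factory_default")
    run = hook.get_pipeline_run(
        run_id=run_id,
        resource_group_name="analytics-rg",   # placeholder resource group
        factory_name="analytics-adf",         # placeholder factory
    )
    # Compare the reported status against the documented terminal value.
    return run.status == AzureDataFactoryPipelineRunStatus.SUCCEEDED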