tessl/pypi-apache-airflow-providers-microsoft-azure

Provider package for Microsoft Azure integrations with Apache Airflow

docs/data-factory.md

Azure Data Factory

Execute and monitor Azure Data Factory pipelines, with support for pipeline runs, run monitoring, status tracking, and factory management (create, update, delete). Both synchronous and asynchronous operations are supported for long-running pipeline executions.

Capabilities

Data Factory Hook

Primary interface for Azure Data Factory operations, providing authenticated connections and pipeline management functionality.

class AzureDataFactoryHook(BaseHook):
    """
    Hook for Azure Data Factory operations.
    
    Provides methods for pipeline execution, monitoring, and factory management.
    Supports multiple authentication methods and connection configurations.
    """
    
    def get_conn(self) -> DataFactoryManagementClient:
        """Get authenticated Azure Data Factory Management client."""
    
    def refresh_conn(self) -> DataFactoryManagementClient:
        """Refresh the Data Factory Management client connection."""
    
    def get_factory(
        self, 
        resource_group_name: str, 
        factory_name: str, 
        **config: Any
    ) -> Factory | None:
        """
        Get Azure Data Factory details.
        
        Args:
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            **config: Additional configuration parameters
            
        Returns:
            Factory: Data Factory details or None if not found
        """
    
    def update_factory(
        self,
        factory: Factory,
        resource_group_name: str, 
        factory_name: str,
        **config: Any
    ) -> Factory:
        """
        Update Azure Data Factory configuration.
        
        Args:
            factory (Factory): Factory configuration object
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            **config: Additional configuration parameters
            
        Returns:
            Factory: Updated factory details
        """
    
    def create_factory(
        self,
        factory: Factory,
        resource_group_name: str,
        factory_name: str, 
        **config: Any
    ) -> Factory:
        """
        Create a new Azure Data Factory.
        
        Args:
            factory (Factory): Factory configuration object
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            **config: Additional configuration parameters
            
        Returns:
            Factory: Created factory details
        """
    
    def delete_factory(
        self,
        resource_group_name: str,
        factory_name: str,
        **config: Any
    ) -> None:
        """
        Delete Azure Data Factory.
        
        Args:
            resource_group_name (str): Azure resource group name  
            factory_name (str): Data Factory name to delete
            **config: Additional configuration parameters
        """
    
    def run_pipeline(
        self,
        pipeline_name: str,
        resource_group_name: str,
        factory_name: str,
        reference_pipeline_run_id: str | None = None,
        is_recovery: bool | None = None,
        start_activity_name: str | None = None,
        start_from_failure: bool | None = None,
        parameters: dict[str, Any] | None = None,
        **config: Any
    ) -> CreateRunResponse:
        """
        Execute Azure Data Factory pipeline.
        
        Args:
            pipeline_name (str): Name of pipeline to execute
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            reference_pipeline_run_id (str): Reference run ID for recovery
            is_recovery (bool): Whether this is a recovery run
            start_activity_name (str): Activity to start from
            start_from_failure (bool): Start from previous failure point
            parameters (dict): Pipeline parameters
            **config: Additional configuration parameters
            
        Returns:
            CreateRunResponse: Pipeline run details with run ID
        """
    
    def get_pipeline_run(
        self,
        run_id: str,
        resource_group_name: str,
        factory_name: str,
        **config: Any
    ) -> PipelineRun:
        """
        Get pipeline run details and status.
        
        Args:
            run_id (str): Pipeline run ID
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            **config: Additional configuration parameters
            
        Returns:
            PipelineRun: Pipeline run details and status
        """
    
    def get_pipeline_run_status(
        self,
        run_id: str,
        resource_group_name: str,
        factory_name: str,
        **config: Any
    ) -> str:
        """
        Get current status of pipeline run.
        
        Args:
            run_id (str): Pipeline run ID
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            **config: Additional configuration parameters
            
        Returns:
            str: Current pipeline run status
        """
    
    def wait_for_pipeline_run_status(
        self,
        run_id: str,
        expected_statuses: str | set[str],
        resource_group_name: str,
        factory_name: str,
        check_interval: int = 60,
        timeout: int = 60 * 60 * 24 * 7,
        **config: Any
    ) -> bool:
        """
        Wait for pipeline run to reach expected status.
        
        Args:
            run_id (str): Pipeline run ID
            expected_statuses (str | set[str]): Expected status(es) to wait for
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            check_interval (int): Polling interval in seconds
            timeout (int): Maximum wait time in seconds
            **config: Additional configuration parameters
            
        Returns:
            bool: True if expected status reached, False if timeout
        """
    
    def cancel_pipeline_run(
        self,
        run_id: str,
        resource_group_name: str,
        factory_name: str,
        **config: Any
    ) -> None:
        """
        Cancel a running pipeline.
        
        Args:
            run_id (str): Pipeline run ID to cancel
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            **config: Additional configuration parameters
        """
    
    def test_connection(self) -> tuple[bool, str]:
        """Test the Data Factory connection."""

Async Data Factory Hook

Asynchronous version for non-blocking operations.

class AzureDataFactoryAsyncHook(AzureDataFactoryHook):
    """Async hook for Azure Data Factory operations."""
    
    async def get_conn(self) -> DataFactoryManagementClient:
        """Get authenticated async Data Factory Management client."""
    
    async def get_pipeline_run_status(
        self,
        run_id: str,
        resource_group_name: str,
        factory_name: str
    ) -> str:
        """Async get pipeline run status."""

Pipeline Run Operator

Operator for executing Azure Data Factory pipelines.

class AzureDataFactoryRunPipelineOperator(BaseOperator):
    """
    Execute Azure Data Factory pipeline.
    
    Runs a pipeline and optionally waits for completion with status monitoring.
    """
    
    def __init__(
        self,
        pipeline_name: str,
        resource_group_name: str,
        factory_name: str,
        azure_data_factory_conn_id: str = "azure_data_factory_default",
        wait_for_termination: bool = True,
        reference_pipeline_run_id: str | None = None,
        is_recovery: bool | None = None,
        start_activity_name: str | None = None,
        start_from_failure: bool | None = None,
        parameters: dict[str, Any] | None = None,
        timeout: int = 60 * 60 * 24 * 7,
        check_interval: int = 60,
        deferrable: bool = False,
        **kwargs
    ):
        """
        Initialize Data Factory pipeline operator.
        
        Args:
            pipeline_name (str): Name of pipeline to execute
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            azure_data_factory_conn_id (str): Airflow connection ID
            wait_for_termination (bool): Wait for pipeline completion
            reference_pipeline_run_id (str): Reference run for recovery
            is_recovery (bool): Whether this is a recovery run
            start_activity_name (str): Activity to start from
            start_from_failure (bool): Start from failure point
            parameters (dict): Pipeline parameters
            timeout (int): Maximum wait time in seconds
            check_interval (int): Status check interval in seconds
            deferrable (bool): Use async execution
        """

Pipeline Status Sensor

Sensor that monitors Azure Data Factory pipeline execution status.

class AzureDataFactoryPipelineRunStatusSensor(BaseSensorOperator):
    """
    Sensor that waits for Azure Data Factory pipeline run to reach target status.
    
    Monitors a pipeline run until it reaches one of the specified target statuses
    or times out.
    """
    
    def __init__(
        self,
        run_id: str,
        resource_group_name: str,
        factory_name: str,
        azure_data_factory_conn_id: str = "azure_data_factory_default",
        target_status: str | list[str] = AzureDataFactoryPipelineRunStatus.SUCCEEDED,
        **kwargs
    ):
        """
        Initialize Data Factory pipeline status sensor.
        
        Args:
            run_id (str): Pipeline run ID to monitor
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            azure_data_factory_conn_id (str): Airflow connection ID
            target_status (str | list[str]): Target status(es) to wait for
        """
    
    def poke(self, context: dict) -> bool:
        """Check if pipeline run has reached target status."""

Async Pipeline Triggers

Deferrable triggers for pipeline monitoring.

class AzureDataFactoryTrigger(BaseTrigger):
    """General async trigger for Azure Data Factory operations."""
    
    def __init__(
        self,
        run_id: str,
        resource_group_name: str,
        factory_name: str,
        conn_id: str,
        end_time: datetime,
        check_interval: int = 60,
        **kwargs
    ):
        """
        Initialize Data Factory trigger.
        
        Args:
            run_id (str): Pipeline run ID
            resource_group_name (str): Resource group name
            factory_name (str): Factory name
            conn_id (str): Connection ID
            end_time (datetime): Maximum end time
            check_interval (int): Polling interval in seconds
        """

class ADFPipelineRunStatusSensorTrigger(BaseTrigger):
    """Async trigger for ADF pipeline run status monitoring."""
    
    def __init__(
        self,
        run_id: str,
        resource_group_name: str,
        factory_name: str,
        target_status: str | list[str],
        conn_id: str,
        poke_interval: int = 60,
        **kwargs
    ):
        """
        Initialize pipeline status sensor trigger.
        
        Args:
            run_id (str): Pipeline run ID
            resource_group_name (str): Resource group name  
            factory_name (str): Factory name
            target_status (str | list[str]): Target status(es)
            conn_id (str): Connection ID
            poke_interval (int): Polling interval in seconds
        """

Pipeline Run Link

Extra link for viewing pipeline runs in Azure portal.

class AzureDataFactoryPipelineRunLink(BaseOperatorLink):
    """Link to Azure Data Factory pipeline run in Azure portal."""
    
    name: str = "Monitor Pipeline Run"
    
    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str:
        """Get link URL to pipeline run in Azure portal."""

Status Constants and Exceptions

class AzureDataFactoryPipelineRunStatus:
    """Azure Data Factory pipeline run status constants."""
    
    QUEUED: str = "Queued"
    IN_PROGRESS: str = "InProgress"
    SUCCEEDED: str = "Succeeded" 
    FAILED: str = "Failed"
    CANCELLING: str = "Cancelling"
    CANCELLED: str = "Cancelled"

class AzureDataFactoryPipelineRunException(AirflowException):
    """Exception raised for Data Factory pipeline run errors."""

Usage Examples

Basic Pipeline Execution

from airflow.providers.microsoft.azure.hooks.data_factory import AzureDataFactoryHook

# Initialize hook
adf_hook = AzureDataFactoryHook(azure_data_factory_conn_id='adf_default')

# Run pipeline
run_response = adf_hook.run_pipeline(
    pipeline_name='MyDataPipeline',
    resource_group_name='my-resource-group',
    factory_name='my-data-factory',
    parameters={'inputPath': '/data/input/', 'outputPath': '/data/output/'}
)

# Get run ID
run_id = run_response.run_id

# Monitor pipeline status
status = adf_hook.get_pipeline_run_status(
    run_id=run_id,
    resource_group_name='my-resource-group', 
    factory_name='my-data-factory'
)

# Wait for completion
success = adf_hook.wait_for_pipeline_run_status(
    run_id=run_id,
    expected_statuses=['Succeeded'],
    resource_group_name='my-resource-group',
    factory_name='my-data-factory',
    check_interval=30,
    timeout=1800  # 30 minutes
)

Using in Airflow DAGs

from airflow import DAG
from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
from datetime import datetime, timedelta

dag = DAG(
    'adf_pipeline_example',
    default_args={
        'owner': 'data-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    description='Execute ADF pipeline',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1)
)

# Execute pipeline
run_pipeline = AzureDataFactoryRunPipelineOperator(
    task_id='run_etl_pipeline',
    pipeline_name='ETL_Pipeline', 
    resource_group_name='analytics-rg',
    factory_name='analytics-adf',
    parameters={
        'source_table': 'raw_data',
        'target_table': 'processed_data',
        'batch_date': '{{ ds }}'
    },
    wait_for_termination=False,  # Don't wait, use sensor instead
    azure_data_factory_conn_id='adf_default',
    dag=dag
)

# Monitor pipeline with sensor (alternative approach)
wait_for_completion = AzureDataFactoryPipelineRunStatusSensor(
    task_id='wait_for_pipeline_completion',
    run_id='{{ ti.xcom_pull(task_ids="run_etl_pipeline", key="run_id") }}',
    resource_group_name='analytics-rg',
    factory_name='analytics-adf',
    target_status='Succeeded',
    timeout=3600,  # 1 hour timeout
    poke_interval=60,  # Check every minute
    dag=dag
)

run_pipeline >> wait_for_completion

Deferrable Pipeline Execution

# Using deferrable mode for long-running pipelines
run_pipeline_async = AzureDataFactoryRunPipelineOperator(
    task_id='run_long_pipeline',
    pipeline_name='LongRunningETL',
    resource_group_name='analytics-rg', 
    factory_name='analytics-adf',
    deferrable=True,  # Use async execution
    timeout=7200,  # 2 hours
    check_interval=120,  # Check every 2 minutes
    dag=dag
)

Connection Configuration

Azure Data Factory connections require specific authentication and resource information.

Connection Type: azure_data_factory

Required Fields:

  • resource_group_name: Azure resource group containing the Data Factory
  • factory_name: Name of the Azure Data Factory
  • subscription_id: Azure subscription ID

Authentication Options:

  • Service Principal: Use client credentials
  • Managed Identity: Use Azure managed identity
  • DefaultAzureCredential: Use Azure SDK default credential chain

Connection Fields:

  • client_id: Service principal client ID
  • client_secret: Service principal secret
  • tenant_id: Azure tenant ID
  • subscription_id: Azure subscription ID
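One common way to supply these fields is an Airflow connection defined in an environment variable using the JSON connection format. The values below are placeholders, and the exact extra-field keys vary by provider version, so check them against your installed provider's connection documentation before relying on this sketch:

```shell
# Hypothetical placeholder values; replace with your own credentials and resources.
export AIRFLOW_CONN_ADF_DEFAULT='{
  "conn_type": "azure_data_factory",
  "login": "<client_id>",
  "password": "<client_secret>",
  "extra": {
    "tenantId": "<tenant_id>",
    "subscriptionId": "<subscription_id>",
    "resource_group_name": "<resource_group>",
    "factory_name": "<factory_name>"
  }
}'
```

The variable name after `AIRFLOW_CONN_` (lowercased) becomes the connection ID, so this one is referenced as `adf_default`.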

Error Handling

The integration includes error handling for pipeline execution failures, authentication issues, and resource-access problems. Exceptions such as `AzureDataFactoryPipelineRunException` inherit from standard Airflow exception classes and provide detailed messages for troubleshooting failed runs.

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-microsoft-azure
