Provider package for Microsoft Azure integrations with Apache Airflow
Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run
Execute and monitor Azure Data Factory pipelines with comprehensive pipeline management, run monitoring, status tracking, and factory management capabilities. Supports both synchronous and asynchronous operations for long-running pipeline executions.
Primary interface for Azure Data Factory operations, providing authenticated connections and pipeline management functionality.
class AzureDataFactoryHook(BaseHook):
"""
Hook for Azure Data Factory operations.
Provides methods for pipeline execution, monitoring, and factory management.
Supports multiple authentication methods and connection configurations.
"""
def get_conn(self) -> DataFactoryManagementClient:
"""Get authenticated Azure Data Factory Management client."""
def refresh_conn(self) -> DataFactoryManagementClient:
"""Refresh the Data Factory Management client connection."""
def get_factory(
self,
resource_group_name: str,
factory_name: str,
**config: Any
) -> Factory | None:
"""
Get Azure Data Factory details.
Args:
resource_group_name (str): Azure resource group name
factory_name (str): Data Factory name
**config: Additional configuration parameters
Returns:
Factory: Data Factory details or None if not found
"""
def update_factory(
self,
factory: Factory,
resource_group_name: str,
factory_name: str,
**config: Any
) -> Factory:
"""
Update Azure Data Factory configuration.
Args:
factory (Factory): Factory configuration object
resource_group_name (str): Azure resource group name
factory_name (str): Data Factory name
**config: Additional configuration parameters
Returns:
Factory: Updated factory details
"""
def create_factory(
self,
factory: Factory,
resource_group_name: str,
factory_name: str,
**config: Any
) -> Factory:
"""
Create a new Azure Data Factory.
Args:
factory (Factory): Factory configuration object
resource_group_name (str): Azure resource group name
factory_name (str): Data Factory name
**config: Additional configuration parameters
Returns:
Factory: Created factory details
"""
def delete_factory(
self,
resource_group_name: str,
factory_name: str,
**config: Any
) -> None:
"""
Delete Azure Data Factory.
Args:
resource_group_name (str): Azure resource group name
factory_name (str): Data Factory name to delete
**config: Additional configuration parameters
"""
def run_pipeline(
self,
pipeline_name: str,
resource_group_name: str,
factory_name: str,
reference_pipeline_run_id: str | None = None,
is_recovery: bool | None = None,
start_activity_name: str | None = None,
start_from_failure: bool | None = None,
parameters: dict[str, Any] | None = None,
**config: Any
) -> CreateRunResponse:
"""
Execute Azure Data Factory pipeline.
Args:
pipeline_name (str): Name of pipeline to execute
resource_group_name (str): Azure resource group name
factory_name (str): Data Factory name
reference_pipeline_run_id (str): Reference run ID for recovery
is_recovery (bool): Whether this is a recovery run
start_activity_name (str): Activity to start from
start_from_failure (bool): Start from previous failure point
parameters (dict): Pipeline parameters
**config: Additional configuration parameters
Returns:
CreateRunResponse: Pipeline run details with run ID
"""
def get_pipeline_run(
self,
run_id: str,
resource_group_name: str,
factory_name: str,
**config: Any
) -> PipelineRun:
"""
Get pipeline run details and status.
Args:
run_id (str): Pipeline run ID
resource_group_name (str): Azure resource group name
factory_name (str): Data Factory name
**config: Additional configuration parameters
Returns:
PipelineRun: Pipeline run details and status
"""
def get_pipeline_run_status(
self,
run_id: str,
resource_group_name: str,
factory_name: str,
**config: Any
) -> str:
"""
Get current status of pipeline run.
Args:
run_id (str): Pipeline run ID
resource_group_name (str): Azure resource group name
factory_name (str): Data Factory name
**config: Additional configuration parameters
Returns:
str: Current pipeline run status
"""
def wait_for_pipeline_run_status(
self,
run_id: str,
expected_statuses: str | set[str],
resource_group_name: str,
factory_name: str,
check_interval: int = 60,
timeout: int = 60 * 60 * 24 * 7,
**config: Any
) -> bool:
"""
Wait for pipeline run to reach expected status.
Args:
run_id (str): Pipeline run ID
expected_statuses (str | set[str]): Expected status(es) to wait for
resource_group_name (str): Azure resource group name
factory_name (str): Data Factory name
check_interval (int): Polling interval in seconds
timeout (int): Maximum wait time in seconds
**config: Additional configuration parameters
Returns:
bool: True if expected status reached, False if timeout
"""
def cancel_pipeline_run(
self,
run_id: str,
resource_group_name: str,
factory_name: str,
**config: Any
) -> None:
"""
Cancel a running pipeline.
Args:
run_id (str): Pipeline run ID to cancel
resource_group_name (str): Azure resource group name
factory_name (str): Data Factory name
**config: Additional configuration parameters
"""
def test_connection(self) -> tuple[bool, str]:
"""Test the Data Factory connection."""Asynchronous version for non-blocking operations.
class AzureDataFactoryAsyncHook(AzureDataFactoryHook):
"""Async hook for Azure Data Factory operations."""
async def get_conn(self) -> DataFactoryManagementClient:
"""Get authenticated async Data Factory Management client."""
async def get_pipeline_run_status(
self,
run_id: str,
resource_group_name: str,
factory_name: str
) -> str:
"""Async get pipeline run status."""Operator for executing Azure Data Factory pipelines.
class AzureDataFactoryRunPipelineOperator(BaseOperator):
"""
Execute Azure Data Factory pipeline.
Runs a pipeline and optionally waits for completion with status monitoring.
"""
def __init__(
self,
pipeline_name: str,
resource_group_name: str,
factory_name: str,
azure_data_factory_conn_id: str = "azure_data_factory_default",
wait_for_termination: bool = True,
reference_pipeline_run_id: str | None = None,
is_recovery: bool | None = None,
start_activity_name: str | None = None,
start_from_failure: bool | None = None,
parameters: dict[str, Any] | None = None,
timeout: int = 60 * 60 * 24 * 7,
check_interval: int = 60,
deferrable: bool = False,
**kwargs
):
"""
Initialize Data Factory pipeline operator.
Args:
pipeline_name (str): Name of pipeline to execute
resource_group_name (str): Azure resource group name
factory_name (str): Data Factory name
azure_data_factory_conn_id (str): Airflow connection ID
wait_for_termination (bool): Wait for pipeline completion
reference_pipeline_run_id (str): Reference run for recovery
is_recovery (bool): Whether this is a recovery run
start_activity_name (str): Activity to start from
start_from_failure (bool): Start from failure point
parameters (dict): Pipeline parameters
timeout (int): Maximum wait time in seconds
check_interval (int): Status check interval in seconds
deferrable (bool): Use async execution
"""Sensor that monitors Azure Data Factory pipeline execution status.
class AzureDataFactoryPipelineRunStatusSensor(BaseSensorOperator):
"""
Sensor that waits for Azure Data Factory pipeline run to reach target status.
Monitors a pipeline run until it reaches one of the specified target statuses
or times out.
"""
def __init__(
self,
run_id: str,
resource_group_name: str,
factory_name: str,
azure_data_factory_conn_id: str = "azure_data_factory_default",
target_status: str | list[str] = AzureDataFactoryPipelineRunStatus.SUCCEEDED,
**kwargs
):
"""
Initialize Data Factory pipeline status sensor.
Args:
run_id (str): Pipeline run ID to monitor
resource_group_name (str): Azure resource group name
factory_name (str): Data Factory name
azure_data_factory_conn_id (str): Airflow connection ID
target_status (str | list[str]): Target status(es) to wait for
"""
def poke(self, context: dict) -> bool:
"""Check if pipeline run has reached target status."""Deferrable triggers for pipeline monitoring.
class AzureDataFactoryTrigger(BaseTrigger):
    """General async trigger for Azure Data Factory operations."""

    def __init__(
        self,
        run_id: str,
        resource_group_name: str,
        factory_name: str,
        conn_id: str,
        end_time: datetime,
        check_interval: int = 60,
        **kwargs,
    ):
        """
        Initialize the Data Factory trigger.

        Args:
            run_id (str): Pipeline run ID
            resource_group_name (str): Resource group name
            factory_name (str): Factory name
            conn_id (str): Connection ID
            end_time (datetime): Maximum end time
            check_interval (int): Polling interval in seconds
        """
class ADFPipelineRunStatusSensorTrigger(BaseTrigger):
"""Async trigger for ADF pipeline run status monitoring."""
def __init__(
self,
run_id: str,
resource_group_name: str,
factory_name: str,
target_status: str | list[str],
conn_id: str,
poke_interval: int = 60,
**kwargs
):
"""
Initialize pipeline status sensor trigger.
Args:
run_id (str): Pipeline run ID
resource_group_name (str): Resource group name
factory_name (str): Factory name
target_status (str | list[str]): Target status(es)
conn_id (str): Connection ID
poke_interval (int): Polling interval in seconds
"""Extra link for viewing pipeline runs in Azure portal.
class AzureDataFactoryPipelineRunLink(BaseOperatorLink):
    """Link to Azure Data Factory pipeline run in Azure portal."""

    # Display name of the extra link shown in the Airflow UI.
    name: str = "Monitor Pipeline Run"

    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str:
        """Get the link URL to the pipeline run in the Azure portal."""


class AzureDataFactoryPipelineRunStatus:
    """Azure Data Factory pipeline run status constants."""

    QUEUED: str = "Queued"
    IN_PROGRESS: str = "InProgress"
    SUCCEEDED: str = "Succeeded"
    FAILED: str = "Failed"
    CANCELLING: str = "Cancelling"
    CANCELLED: str = "Cancelled"


class AzureDataFactoryPipelineRunException(AirflowException):
    """Exception raised for Data Factory pipeline run errors."""


from airflow.providers.microsoft.azure.hooks.data_factory import AzureDataFactoryHook
# Initialize hook
adf_hook = AzureDataFactoryHook(azure_data_factory_conn_id='adf_default')

# Run pipeline
run_response = adf_hook.run_pipeline(
    pipeline_name='MyDataPipeline',
    resource_group_name='my-resource-group',
    factory_name='my-data-factory',
    parameters={'inputPath': '/data/input/', 'outputPath': '/data/output/'}
)

# Get run ID
run_id = run_response.run_id

# Monitor pipeline status
status = adf_hook.get_pipeline_run_status(
    run_id=run_id,
    resource_group_name='my-resource-group',
    factory_name='my-data-factory'
)

# Wait for completion.
# The signature declares `expected_statuses: str | set[str]`, so pass a set
# (or a single string), not a list.
success = adf_hook.wait_for_pipeline_run_status(
    run_id=run_id,
    expected_statuses={'Succeeded'},
    resource_group_name='my-resource-group',
    factory_name='my-data-factory',
    check_interval=30,
    timeout=1800  # 30 minutes
)

from airflow import DAG
from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
from datetime import datetime, timedelta

dag = DAG(
    'adf_pipeline_example',
    default_args={
        'owner': 'data-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    description='Execute ADF pipeline',
    schedule_interval='@daily',  # NOTE(review): Airflow 2.4+ prefers `schedule=`
    start_date=datetime(2024, 1, 1)
)

# Execute pipeline
run_pipeline = AzureDataFactoryRunPipelineOperator(
    task_id='run_etl_pipeline',
    pipeline_name='ETL_Pipeline',
    resource_group_name='analytics-rg',
    factory_name='analytics-adf',
    parameters={
        'source_table': 'raw_data',
        'target_table': 'processed_data',
        'batch_date': '{{ ds }}'
    },
    wait_for_termination=False,  # Don't wait, use sensor instead
    azure_data_factory_conn_id='adf_default',
    dag=dag
)

# Monitor pipeline with sensor (alternative approach).
# The operator pushes the run ID to XCom under the key "run_id";
# pull that key explicitly rather than the default "return_value".
wait_for_completion = AzureDataFactoryPipelineRunStatusSensor(
    task_id='wait_for_pipeline_completion',
    run_id='{{ ti.xcom_pull(task_ids="run_etl_pipeline", key="run_id") }}',
    resource_group_name='analytics-rg',
    factory_name='analytics-adf',
    target_status='Succeeded',
    timeout=3600,  # 1 hour timeout
    poke_interval=60,  # Check every minute
    dag=dag
)

run_pipeline >> wait_for_completion

# Using deferrable mode for long-running pipelines
run_pipeline_async = AzureDataFactoryRunPipelineOperator(
task_id='run_long_pipeline',
pipeline_name='LongRunningETL',
resource_group_name='analytics-rg',
factory_name='analytics-adf',
deferrable=True, # Use async execution
timeout=7200, # 2 hours
check_interval=120, # Check every 2 minutes
dag=dag
)Azure Data Factory connections require specific authentication and resource information.
Connection Type: azure_data_factory
Required Fields:
resource_group_name: Azure resource group containing the Data Factory
factory_name: Name of the Azure Data Factory
subscription_id: Azure subscription ID

Authentication Options:
Connection Fields:
client_id: Service principal client ID
client_secret: Service principal secret
tenant_id: Azure tenant ID
subscription_id: Azure subscription ID

The Azure Data Factory integration includes comprehensive error handling for pipeline execution failures, authentication issues, and resource access problems. All exceptions inherit from standard Airflow exception classes and provide detailed error messages for troubleshooting pipeline issues.
Azure Data Factory integration provides robust pipeline orchestration capabilities with support for complex ETL workflows, parameter passing, error handling, and comprehensive monitoring within Airflow environments.
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-microsoft-azure