
tessl/pypi-apache-airflow-providers-microsoft-azure

Provider package for Microsoft Azure integrations with Apache Airflow

Azure Synapse Analytics

Run Spark batch jobs and Synapse pipelines on Azure Synapse Analytics from Airflow. The provider includes hooks and operators for both Spark batch processing and pipeline orchestration.

Capabilities

Base Synapse Hook

Base hook that provides connection management and the functionality shared by the Spark and pipeline hooks.

class BaseAzureSynapseHook(BaseHook):
    """
    Base hook for Azure Synapse Analytics operations.
    
    Provides common functionality and connection management for Synapse
    Spark and pipeline operations.
    """
    
    def get_conn(self) -> Any:
        """
        Get authenticated Azure Synapse client.
        
        Returns:
            Any: Synapse client instance
        """
    
    def test_connection(self) -> tuple[bool, str]:
        """
        Test the Azure Synapse Analytics connection.
        
        Returns:
            tuple[bool, str]: Success status and message
        """

Synapse Spark Hook

Hook for submitting Spark batch jobs to Azure Synapse Analytics and monitoring their execution.

class AzureSynapseHook(BaseAzureSynapseHook):
    """
    Hook for Azure Synapse Spark operations.
    
    Provides methods for submitting Spark jobs, monitoring execution,
    and managing Spark batch sessions on Azure Synapse Analytics.
    """
    
    def get_conn(self) -> SparkClient:
        """
        Get authenticated Synapse Spark client.
        
        Returns:
            SparkClient: Synapse Spark client instance
        """
    
    def run_spark_job(
        self,
        payload: dict[str, Any],
        **kwargs: Any
    ) -> dict[str, Any]:
        """
        Submit a Spark job to Azure Synapse Analytics.
        
        Args:
            payload (dict[str, Any]): Spark job configuration including:
                - name: Job name
                - file: Main application file (jar, py, etc.)
                - className: Main class name (for Scala/Java)
                - args: Application arguments
                - conf: Spark configuration
                - executorCount: Number of executors
                - executorCores: Cores per executor
                - executorMemory: Memory per executor
                - driverCores: Driver cores
                - driverMemory: Driver memory
            **kwargs: Additional job submission parameters
            
        Returns:
            dict[str, Any]: Job submission response with job ID and status
        """
    
    def get_job_run_status(
        self,
        job_id: int,
        **kwargs: Any
    ) -> str:
        """
        Get the current status of a Spark job.
        
        Args:
            job_id (int): Spark job ID
            **kwargs: Additional parameters
            
        Returns:
            str: Current job status (not_started, starting, running, idle, busy, 
                 shutting_down, error, dead, killed, success)
        """
    
    def wait_for_job_run_status(
        self,
        job_id: int,
        expected_statuses: list[str],
        check_interval: int = 30,
        timeout: int = 3600
    ) -> bool:
        """
        Wait for Spark job to reach expected status.
        
        Args:
            job_id (int): Spark job ID
            expected_statuses (list[str]): List of acceptable statuses
            check_interval (int): Check interval in seconds (default: 30)
            timeout (int): Timeout in seconds (default: 3600)
            
        Returns:
            bool: True if job reached expected status, False if timeout
        """
    
    def cancel_job_run(
        self,
        job_id: int,
        **kwargs: Any
    ) -> None:
        """
        Cancel a running Spark job.
        
        Args:
            job_id (int): Spark job ID to cancel
            **kwargs: Additional parameters
        """
    
    def get_job_logs(
        self,
        job_id: int,
        **kwargs: Any
    ) -> dict[str, Any]:
        """
        Get logs for a Spark job.
        
        Args:
            job_id (int): Spark job ID
            **kwargs: Additional parameters
            
        Returns:
            dict[str, Any]: Job logs including stdout, stderr, and driver logs
        """
    
    def list_spark_pools(self) -> list[dict[str, Any]]:
        """
        List available Spark pools in the workspace.
        
        Returns:
            list[dict[str, Any]]: List of Spark pool configurations
        """
    
    def get_spark_pool_details(
        self,
        spark_pool_name: str
    ) -> dict[str, Any]:
        """
        Get details of a specific Spark pool.
        
        Args:
            spark_pool_name (str): Name of the Spark pool
            
        Returns:
            dict[str, Any]: Spark pool configuration and status
        """

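The contract described for wait_for_job_run_status (return True when an expected status is reached, False on timeout) can be illustrated with a minimal polling sketch. This is not the provider's implementation: `wait_for_status`, `get_status`, `sleep`, and `clock` are hypothetical names introduced here so the loop can run without a Synapse workspace.

```python
import time
from typing import Callable, Iterable


def wait_for_status(
    get_status: Callable[[], str],
    expected_statuses: Iterable[str],
    check_interval: float = 30,
    timeout: float = 3600,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Poll get_status() until it returns an expected status or the timeout elapses."""
    expected = set(expected_statuses)
    deadline = clock() + timeout
    while True:
        if get_status() in expected:
            return True
        if clock() >= deadline:
            return False
        sleep(check_interval)


# Scripted status sequence standing in for real calls to the hook.
statuses = iter(["not_started", "starting", "running", "success"])
reached = wait_for_status(
    lambda: next(statuses),
    expected_statuses=["success", "error", "dead", "killed"],
    check_interval=0,
    timeout=5,
)
print(reached)
```

Passing every terminal status as an expected status, as the error-handling example later in this page does, means a True result only says the job stopped. The caller still has to inspect the final status to tell success from failure.
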
Synapse Pipeline Hook

Hook for running Azure Synapse pipelines and monitoring their runs.

class AzureSynapsePipelineHook(BaseAzureSynapseHook):
    """
    Hook for Azure Synapse Pipeline operations.
    
    Provides methods for running pipelines, monitoring execution,
    and managing pipeline runs on Azure Synapse Analytics.
    """
    
    def get_conn(self) -> ArtifactsClient:
        """
        Get authenticated Synapse artifacts client.
        
        Returns:
            ArtifactsClient: Synapse artifacts client instance
        """
    
    def run_pipeline(
        self,
        pipeline_name: str,
        **config: Any
    ) -> CreateRunResponse:
        """
        Run a pipeline in Azure Synapse Analytics.
        
        Args:
            pipeline_name (str): Name of the pipeline to run
            **config: Pipeline run configuration including:
                - parameters: Pipeline parameters
                - reference_pipeline_run_id: Reference run ID
                - is_recovery: Whether this is a recovery run
                
        Returns:
            CreateRunResponse: Pipeline run response with run ID
        """
    
    def get_pipeline_run(
        self,
        run_id: str,
        **kwargs: Any
    ) -> PipelineRun:
        """
        Get details of a pipeline run.
        
        Args:
            run_id (str): Pipeline run ID
            **kwargs: Additional parameters
            
        Returns:
            PipelineRun: Pipeline run details including status and metadata
        """
    
    def get_pipeline_run_status(
        self,
        run_id: str,
        **kwargs: Any
    ) -> str:
        """
        Get the current status of a pipeline run.
        
        Args:
            run_id (str): Pipeline run ID
            **kwargs: Additional parameters
            
        Returns:
            str: Current pipeline status (Queued, InProgress, Succeeded, Failed, Cancelled)
        """
    
    def refresh_conn(self) -> ArtifactsClient:
        """
        Refresh the Synapse artifacts connection.
        
        Returns:
            ArtifactsClient: Refreshed artifacts client instance
        """
    
    def wait_for_pipeline_run_status(
        self,
        run_id: str,
        expected_statuses: list[str],
        check_interval: int = 60,
        timeout: int = 7200
    ) -> bool:
        """
        Wait for pipeline run to reach expected status.
        
        Args:
            run_id (str): Pipeline run ID
            expected_statuses (list[str]): List of acceptable statuses
            check_interval (int): Check interval in seconds (default: 60)
            timeout (int): Timeout in seconds (default: 7200)
            
        Returns:
            bool: True if pipeline reached expected status, False if timeout
        """
    
    def cancel_run_pipeline(
        self,
        run_id: str,
        **kwargs: Any
    ) -> None:
        """
        Cancel a running pipeline.
        
        Args:
            run_id (str): Pipeline run ID to cancel
            **kwargs: Additional parameters
        """
    
    def get_pipeline_activities(
        self,
        run_id: str,
        **kwargs: Any
    ) -> list[dict[str, Any]]:
        """
        Get activity runs for a pipeline run.
        
        Args:
            run_id (str): Pipeline run ID
            **kwargs: Additional parameters including:
                - activity_name: Filter by activity name
                - activity_type: Filter by activity type
                
        Returns:
            list[dict[str, Any]]: List of activity run details
        """
    
    def get_pipeline_details(
        self,
        pipeline_name: str
    ) -> dict[str, Any]:
        """
        Get pipeline definition and metadata.
        
        Args:
            pipeline_name (str): Name of the pipeline
            
        Returns:
            dict[str, Any]: Pipeline definition and configuration
        """
    
    def list_pipelines(self) -> list[dict[str, Any]]:
        """
        List all pipelines in the workspace.
        
        Returns:
            list[dict[str, Any]]: List of pipeline definitions
        """

Synapse Analytics Operators

Operators that run Spark batch jobs and Synapse pipelines as Airflow tasks.

Spark Batch Operator

class AzureSynapseRunSparkBatchOperator(BaseOperator):
    """
    Runs Spark batch jobs on Azure Synapse Analytics.
    
    Supports running Spark applications with custom configurations,
    resource allocation, and dependency management.
    """
    
    def __init__(
        self,
        *,
        azure_synapse_conn_id: str = "azure_synapse_default",
        spark_pool_name: str,
        payload: dict[str, Any],
        timeout: int = 60 * 60 * 24 * 7,
        check_interval: int = 60,
        **kwargs
    ):
        """
        Initialize Synapse Spark batch operator.
        
        Args:
            azure_synapse_conn_id (str): Airflow connection ID for Synapse
            spark_pool_name (str): Name of the Spark pool to use
            payload (dict[str, Any]): Spark job configuration
            timeout (int): Job timeout in seconds (default: 7 days)
            check_interval (int): Status check interval in seconds (default: 60)
        """
    
    def execute(self, context: Context) -> dict[str, Any]:
        """
        Execute Spark batch job on Synapse Analytics.
        
        Args:
            context (Context): Airflow task context
            
        Returns:
            dict[str, Any]: Job execution results and metadata
        """
    
    def on_kill(self) -> None:
        """Cancel running Spark job on task termination."""

Pipeline Operator

class AzureSynapseRunPipelineOperator(BaseOperator):
    """
    Runs pipelines on Azure Synapse Analytics.
    
    Supports executing Synapse pipelines with parameter passing
    and comprehensive monitoring capabilities.
    """
    
    def __init__(
        self,
        *,
        pipeline_name: str,
        azure_synapse_conn_id: str = "azure_synapse_default",
        pipeline_timeout: int = 60 * 60 * 24 * 7,
        check_interval: int = 60,
        **pipeline_run_parameters: Any
    ):
        """
        Initialize Synapse pipeline operator.
        
        Args:
            pipeline_name (str): Name of the pipeline to run
            azure_synapse_conn_id (str): Airflow connection ID for Synapse
            pipeline_timeout (int): Pipeline timeout in seconds (default: 7 days)
            check_interval (int): Status check interval in seconds (default: 60)
            **pipeline_run_parameters: Parameters to pass to the pipeline
        """
    
    def execute(self, context: Context) -> str:
        """
        Execute pipeline on Synapse Analytics.
        
        Args:
            context (Context): Airflow task context
            
        Returns:
            str: Pipeline run ID
        """
    
    def on_kill(self) -> None:
        """Cancel running pipeline on task termination."""

Supporting Classes

Status Constants and Exceptions

class AzureSynapseSparkBatchRunStatus:
    """Constants for Synapse Spark job statuses."""
    
    NOT_STARTED: str = "not_started"
    STARTING: str = "starting"
    RUNNING: str = "running"
    IDLE: str = "idle"
    BUSY: str = "busy"
    SHUTTING_DOWN: str = "shutting_down"
    ERROR: str = "error"
    DEAD: str = "dead"
    KILLED: str = "killed"
    SUCCESS: str = "success"

class AzureSynapsePipelineRunStatus:
    """Constants for Synapse pipeline run statuses."""
    
    QUEUED: str = "Queued"
    IN_PROGRESS: str = "InProgress"
    SUCCEEDED: str = "Succeeded"
    FAILED: str = "Failed"
    CANCELLED: str = "Cancelled"

class AzureSynapsePipelineRunException(Exception):
    """Custom exception for Synapse pipeline operations."""
    pass
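
When polling, it helps to separate "the run has stopped" from "the run succeeded". The sketch below groups the status strings listed above into terminal sets. The names `SPARK_TERMINAL_STATUSES`, `PIPELINE_TERMINAL_STATUSES`, `is_finished`, and `raise_unless` are illustrative helpers defined here, not attributes documented on this page.

```python
# Terminal statuses, taken from the status strings listed in this document.
SPARK_TERMINAL_STATUSES = frozenset({"success", "error", "dead", "killed"})
PIPELINE_TERMINAL_STATUSES = frozenset({"Succeeded", "Failed", "Cancelled"})


def is_finished(status: str, terminal_statuses: frozenset) -> bool:
    """Return True once a run has stopped, whether or not it succeeded."""
    return status in terminal_statuses


def raise_unless(status: str, success_status: str) -> None:
    """Raise if a finished run ended in anything other than the success status."""
    if status != success_status:
        raise RuntimeError(f"Run ended with status: {status}")


print(is_finished("running", SPARK_TERMINAL_STATUSES))
print(is_finished("Cancelled", PIPELINE_TERMINAL_STATUSES))
raise_unless("Succeeded", "Succeeded")  # no exception
```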

Extra Links

class AzureSynapsePipelineRunLink(BaseOperatorLink):
    """
    Link to a Synapse pipeline run in Azure Synapse Studio.
    
    Provides direct access to pipeline run details and monitoring
    in the Azure Synapse Studio interface.
    """
    
    name: str = "Azure Synapse Pipeline Run"
    
    def get_link(
        self,
        operator: BaseOperator,
        dttm: datetime | None = None,
        **kwargs: Any
    ) -> str:
        """
        Generate link to Azure Synapse pipeline run.
        
        Args:
            operator (BaseOperator): Airflow operator instance
            dttm (datetime | None): Execution date
            **kwargs: Additional parameters
            
        Returns:
            str: URL to the pipeline run in Azure Synapse Studio
        """

Usage Examples

Basic Spark Job Execution

from airflow import DAG
from airflow.providers.microsoft.azure.operators.synapse import AzureSynapseRunSparkBatchOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def process_spark_results(**context):
    """Process results from Spark job execution."""
    result = context['task_instance'].xcom_pull(task_ids='run_data_analysis')
    
    print(f"Spark job result: {result}")
    job_id = result.get('job_id')
    status = result.get('status')
    
    if status == 'success':
        print(f"Spark job {job_id} completed successfully")
    else:
        print(f"Spark job {job_id} failed with status: {status}")
    
    return result

dag = DAG(
    'synapse_spark_workflow',
    default_args={
        'owner': 'analytics-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=5)
    },
    description='Synapse Spark data analysis workflow',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Configure Spark job payload
spark_payload = {
    "name": "daily-data-analysis",
    "file": "abfss://data@mystorageaccount.dfs.core.windows.net/scripts/analyze_data.py",
    "args": [
        "--input-path", "abfss://data@mystorageaccount.dfs.core.windows.net/input/",
        "--output-path", "abfss://data@mystorageaccount.dfs.core.windows.net/output/",
        "--date", "{{ ds }}"
    ],
    "conf": {
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
    },
    "executorCount": 4,
    "executorCores": 4,
    "executorMemory": "8g",
    "driverCores": 2,
    "driverMemory": "4g"
}

# Run Spark job
run_analysis = AzureSynapseRunSparkBatchOperator(
    task_id='run_data_analysis',
    azure_synapse_conn_id='synapse_conn',
    spark_pool_name='analytics-pool',
    payload=spark_payload,
    timeout=3600,  # 1 hour timeout
    check_interval=30,  # Check every 30 seconds
    dag=dag
)

# Process results
process_results = PythonOperator(
    task_id='process_results',
    python_callable=process_spark_results,
    dag=dag
)

run_analysis >> process_results
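
Payload dictionaries like the one above are easy to get subtly wrong when several DAGs build them by hand. A small builder can enforce the two fields every example on this page sets (name and file) and drop unset options. `build_spark_payload` is a hypothetical helper, and treating name and file as required is an assumption based on the examples here.

```python
from typing import Any


def build_spark_payload(name: str, file: str, **options: Any) -> dict[str, Any]:
    """Build a Spark job payload dict, omitting options that are None."""
    if not name or not file:
        raise ValueError("Both 'name' and 'file' must be non-empty")
    payload: dict[str, Any] = {"name": name, "file": file}
    # Keep the camelCase keys used in this document (executorCount, driverMemory, ...).
    payload.update({key: value for key, value in options.items() if value is not None})
    return payload


payload = build_spark_payload(
    "daily-data-analysis",
    "abfss://data@mystorageaccount.dfs.core.windows.net/scripts/analyze_data.py",
    executorCount=4,
    executorMemory="8g",
    className=None,  # dropped: Python jobs have no main class
)
print(sorted(payload))
```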

Advanced Spark Configuration

from airflow import DAG
from airflow.providers.microsoft.azure.operators.synapse import AzureSynapseRunSparkBatchOperator
from airflow.providers.microsoft.azure.hooks.synapse import AzureSynapseHook
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def setup_spark_environment():
    """Set up Spark environment and validate configuration."""
    hook = AzureSynapseHook(azure_synapse_conn_id='synapse_conn')
    
    # List available Spark pools
    pools = hook.list_spark_pools()
    print(f"Available Spark pools: {[pool['name'] for pool in pools]}")
    
    # Get details of the target pool
    pool_details = hook.get_spark_pool_details('ml-processing-pool')
    print(f"Pool configuration: {pool_details}")
    
    return pool_details

def monitor_spark_job(**context):
    """Monitor Spark job execution with detailed logging."""
    hook = AzureSynapseHook(azure_synapse_conn_id='synapse_conn')
    
    # Get job ID from previous task
    job_result = context['task_instance'].xcom_pull(task_ids='run_ml_training')
    job_id = job_result['job_id']
    
    # Get detailed logs
    logs = hook.get_job_logs(job_id)
    
    print("=== Spark Driver Logs ===")
    print(logs.get('driverLogs', 'No driver logs available'))
    
    print("=== Spark Executor Logs ===") 
    print(logs.get('executorLogs', 'No executor logs available'))
    
    # Final status check
    final_status = hook.get_job_run_status(job_id)
    print(f"Final job status: {final_status}")
    
    return {
        'job_id': job_id,
        'final_status': final_status,
        'logs_available': bool(logs.get('driverLogs') or logs.get('executorLogs'))
    }

dag = DAG(
    'advanced_spark_ml_workflow',
    default_args={
        'owner': 'ml-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=10)
    },
    description='Advanced Spark ML workflow with monitoring',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Setup environment
setup_env = PythonOperator(
    task_id='setup_environment',
    python_callable=setup_spark_environment,
    dag=dag
)

# Advanced ML training job configuration
ml_payload = {
    "name": "ml-model-training-{{ ds_nodash }}",
    "file": "abfss://ml@mlstorageaccount.dfs.core.windows.net/scripts/train_model.py",
    "className": None,  # Python job
    "args": [
        "--training-data", "abfss://ml@mlstorageaccount.dfs.core.windows.net/data/training/{{ ds }}/",
        "--model-output", "abfss://ml@mlstorageaccount.dfs.core.windows.net/models/{{ ds_nodash }}/",
        "--algorithm", "random_forest",
        "--cross-validation", "5",
        "--feature-selection", "true"
    ],
    "jars": [
        "abfss://ml@mlstorageaccount.dfs.core.windows.net/libs/spark-ml-extended.jar"
    ],
    "pyFiles": [
        "abfss://ml@mlstorageaccount.dfs.core.windows.net/libs/ml_utils.py",
        "abfss://ml@mlstorageaccount.dfs.core.windows.net/libs/feature_engineering.py"
    ],
    "files": [
        "abfss://ml@mlstorageaccount.dfs.core.windows.net/config/ml_config.json"
    ],
    "conf": {
        # Performance tuning
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
        "spark.sql.adaptive.skewJoin.enabled": "true",
        
        # Memory management
        "spark.sql.execution.arrow.pyspark.enabled": "true",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.sql.execution.arrow.maxRecordsPerBatch": "10000",
        
        # ML specific configurations
        "spark.ml.stage.parallelism": "4",
        "spark.sql.adaptive.advisoryPartitionSizeInBytes": "128MB",
        
        # Checkpointing for long-running ML jobs
        "spark.sql.streaming.checkpointLocation": "abfss://ml@mlstorageaccount.dfs.core.windows.net/checkpoints/{{ ds_nodash }}/"
    },
    "executorCount": 8,
    "executorCores": 4,
    "executorMemory": "16g",
    "driverCores": 4,
    "driverMemory": "8g",
    "tags": {
        "project": "ml-pipeline",
        "environment": "production",
        "model_type": "random_forest"
    }
}

# Run ML training job
run_training = AzureSynapseRunSparkBatchOperator(
    task_id='run_ml_training',
    azure_synapse_conn_id='synapse_conn',
    spark_pool_name='ml-processing-pool',
    payload=ml_payload,
    timeout=7200,  # 2 hours timeout for ML training
    check_interval=60,  # Check every minute
    dag=dag
)

# Monitor job execution
monitor_job = PythonOperator(
    task_id='monitor_training_job',
    python_callable=monitor_spark_job,
    dag=dag
)

setup_env >> run_training >> monitor_job

Pipeline Orchestration

from airflow import DAG
from airflow.providers.microsoft.azure.operators.synapse import AzureSynapseRunPipelineOperator
from airflow.providers.microsoft.azure.hooks.synapse import AzureSynapsePipelineHook
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def setup_pipeline_parameters(**context):
    """Set up dynamic parameters for pipeline execution."""
    # Use the run's logical date so retries and backfills produce the same paths
    execution_date = context['ds']
    
    parameters = {
        "processingDate": execution_date,
        "inputDataPath": f"abfss://data@datastorageaccount.dfs.core.windows.net/raw/{execution_date}/",
        "outputDataPath": f"abfss://data@datastorageaccount.dfs.core.windows.net/processed/{execution_date}/",
        "batchSize": "1000",
        "parallelism": "4",
        "retryAttempts": "3"
    }
    
    print(f"Pipeline parameters: {parameters}")
    return parameters

def monitor_pipeline_activities(**context):
    """Monitor individual activities within the pipeline run."""
    hook = AzureSynapsePipelineHook(azure_synapse_conn_id='synapse_conn')
    
    # Get pipeline run ID from previous task
    run_id = context['task_instance'].xcom_pull(task_ids='run_etl_pipeline')
    
    # Get activity runs
    activities = hook.get_pipeline_activities(run_id)
    
    print(f"Pipeline run {run_id} activities:")
    for activity in activities:
        print(f"- {activity['activityName']}: {activity['status']} "
              f"(Duration: {activity.get('durationInMs', 0)}ms)")
        
        if activity['status'] == 'Failed':
            print(f"  Error: {activity.get('error', {}).get('message', 'Unknown error')}")
    
    # Get overall pipeline status
    pipeline_run = hook.get_pipeline_run(run_id)
    
    return {
        'run_id': run_id,
        'status': pipeline_run.status,
        'duration_ms': pipeline_run.duration_in_ms,
        'activities': len(activities),
        'failed_activities': len([a for a in activities if a['status'] == 'Failed'])
    }

def validate_pipeline_outputs(**context):
    """Validate pipeline execution results."""
    monitoring_result = context['task_instance'].xcom_pull(task_ids='monitor_activities')
    
    if monitoring_result['failed_activities'] > 0:
        raise ValueError(f"Pipeline has {monitoring_result['failed_activities']} failed activities")
    
    if monitoring_result['status'] != 'Succeeded':
        raise ValueError(f"Pipeline failed with status: {monitoring_result['status']}")
    
    print(f"Pipeline validation passed. Duration: {monitoring_result['duration_ms']}ms")
    return monitoring_result

dag = DAG(
    'synapse_pipeline_workflow',
    default_args={
        'owner': 'data-engineering-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    description='Synapse pipeline ETL workflow',
    schedule_interval=timedelta(hours=6),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Setup parameters
setup_params = PythonOperator(
    task_id='setup_parameters',
    python_callable=setup_pipeline_parameters,
    dag=dag
)

# Run ETL pipeline
run_pipeline = AzureSynapseRunPipelineOperator(
    task_id='run_etl_pipeline',
    pipeline_name='data-processing-etl',
    azure_synapse_conn_id='synapse_conn',
    pipeline_timeout=3600,  # 1 hour timeout
    check_interval=30,  # Check every 30 seconds
    # Dynamic parameters from previous task
    processingDate="{{ task_instance.xcom_pull(task_ids='setup_parameters')['processingDate'] }}",
    inputDataPath="{{ task_instance.xcom_pull(task_ids='setup_parameters')['inputDataPath'] }}",
    outputDataPath="{{ task_instance.xcom_pull(task_ids='setup_parameters')['outputDataPath'] }}",
    batchSize="{{ task_instance.xcom_pull(task_ids='setup_parameters')['batchSize'] }}",
    dag=dag
)

# Monitor activities
monitor_activities = PythonOperator(
    task_id='monitor_activities',
    python_callable=monitor_pipeline_activities,
    dag=dag
)

# Validate outputs
validate_outputs = PythonOperator(
    task_id='validate_outputs',
    python_callable=validate_pipeline_outputs,
    dag=dag
)

setup_params >> run_pipeline >> monitor_activities >> validate_outputs
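
The counting logic in monitor_pipeline_activities can be pulled into a pure function so it can be tested without a Synapse connection. The sketch assumes activity runs arrive as dictionaries with the activityName, status, and durationInMs keys used above. `summarize_activities` and the sample records are made up for illustration.

```python
from typing import Any


def summarize_activities(activities: list[dict[str, Any]]) -> dict[str, Any]:
    """Summarize activity runs: totals, failures, and combined duration."""
    failed = [a for a in activities if a.get("status") == "Failed"]
    return {
        "activities": len(activities),
        "failed_activities": len(failed),
        "failed_names": [a.get("activityName", "<unknown>") for a in failed],
        "total_duration_ms": sum(a.get("durationInMs", 0) for a in activities),
    }


# Invented sample records, shaped like the dictionaries read in the example above.
sample = [
    {"activityName": "CopyRaw", "status": "Succeeded", "durationInMs": 1200},
    {"activityName": "Transform", "status": "Failed", "durationInMs": 300},
]
summary = summarize_activities(sample)
print(summary)
```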

Complex Workflow with Multiple Synapse Services

from airflow import DAG
from airflow.providers.microsoft.azure.operators.synapse import (
    AzureSynapseRunSparkBatchOperator,
    AzureSynapseRunPipelineOperator
)
from airflow.providers.microsoft.azure.hooks.synapse import (
    AzureSynapseHook,
    AzureSynapsePipelineHook
)
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.dummy import DummyOperator
from datetime import datetime, timedelta

def check_data_availability():
    """Check if input data is available for processing."""
    # This would typically check Azure Data Lake or other data sources
    # For this example, we'll simulate the check
    
    import random
    data_available = random.choice([True, False])
    
    if data_available:
        print("Input data is available, proceeding with processing")
        return 'data_preprocessing'
    else:
        print("Input data not available, skipping processing")
        return 'skip_processing'

def choose_processing_method(**context):
    """Choose between Spark job or pipeline based on data size."""
    # This would typically analyze data characteristics
    # For this example, we'll simulate the decision
    
    import random
    data_size = random.choice(['small', 'large'])
    
    if data_size == 'large':
        print("Large dataset detected, using Spark batch processing")
        return 'spark_processing'
    else:
        print("Small dataset detected, using pipeline processing")
        return 'pipeline_processing'

dag = DAG(
    'complex_synapse_workflow',
    default_args={
        'owner': 'analytics-platform-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=3)
    },
    description='Complex Synapse workflow with conditional processing',
    schedule_interval=timedelta(hours=4),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Check data availability
check_data = BranchPythonOperator(
    task_id='check_data_availability',
    python_callable=check_data_availability,
    dag=dag
)

# Data preprocessing (common step)
preprocessing_payload = {
    "name": "data-preprocessing-{{ ds_nodash }}",
    "file": "abfss://analytics@analyticsstorage.dfs.core.windows.net/scripts/preprocess.py",
    "args": ["--date", "{{ ds }}", "--validate", "true"],
    "executorCount": 2,
    "executorCores": 2,
    "executorMemory": "4g",
    "driverMemory": "2g"
}

data_preprocessing = AzureSynapseRunSparkBatchOperator(
    task_id='data_preprocessing',
    azure_synapse_conn_id='synapse_conn',
    spark_pool_name='preprocessing-pool',
    payload=preprocessing_payload,
    timeout=1800,
    dag=dag
)

# Choose processing method
choose_method = BranchPythonOperator(
    task_id='choose_processing_method',
    python_callable=choose_processing_method,
    dag=dag
)

# Spark processing branch
spark_payload = {
    "name": "large-data-processing-{{ ds_nodash }}",
    "file": "abfss://analytics@analyticsstorage.dfs.core.windows.net/scripts/spark_processing.py",
    "args": [
        "--input", "abfss://analytics@analyticsstorage.dfs.core.windows.net/preprocessed/{{ ds }}/",
        "--output", "abfss://analytics@analyticsstorage.dfs.core.windows.net/results/{{ ds }}/"
    ],
    "executorCount": 8,
    "executorCores": 4,
    "executorMemory": "16g",
    "driverMemory": "8g",
    "conf": {
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true"
    }
}

spark_processing = AzureSynapseRunSparkBatchOperator(
    task_id='spark_processing',
    azure_synapse_conn_id='synapse_conn',
    spark_pool_name='large-processing-pool',
    payload=spark_payload,
    timeout=3600,
    dag=dag
)

# Pipeline processing branch
pipeline_processing = AzureSynapseRunPipelineOperator(
    task_id='pipeline_processing',
    pipeline_name='small-data-pipeline',
    azure_synapse_conn_id='synapse_conn',
    pipeline_timeout=1800,
    inputPath="abfss://analytics@analyticsstorage.dfs.core.windows.net/preprocessed/{{ ds }}/",
    outputPath="abfss://analytics@analyticsstorage.dfs.core.windows.net/results/{{ ds }}/",
    dag=dag
)

# Skip processing (when no data available)
skip_processing = DummyOperator(
    task_id='skip_processing',
    dag=dag
)

# Join point for all branches
join_processing = DummyOperator(
    task_id='join_processing',
    trigger_rule='none_failed_min_one_success',
    dag=dag
)

# Post-processing
postprocessing_payload = {
    "name": "postprocessing-{{ ds_nodash }}",
    "file": "abfss://analytics@analyticsstorage.dfs.core.windows.net/scripts/postprocess.py",
    "args": [
        "--results-path", "abfss://analytics@analyticsstorage.dfs.core.windows.net/results/{{ ds }}/",
        "--final-output", "abfss://analytics@analyticsstorage.dfs.core.windows.net/final/{{ ds }}/"
    ],
    "executorCount": 2,
    "executorCores": 2,
    "executorMemory": "4g"
}

postprocessing = AzureSynapseRunSparkBatchOperator(
    task_id='postprocessing',
    azure_synapse_conn_id='synapse_conn',
    spark_pool_name='postprocessing-pool',
    payload=postprocessing_payload,
    timeout=900,
    dag=dag
)

# Set up dependencies
check_data >> [data_preprocessing, skip_processing]
data_preprocessing >> choose_method
choose_method >> [spark_processing, pipeline_processing]
[spark_processing, pipeline_processing, skip_processing] >> join_processing
join_processing >> postprocessing
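
The branch callables above use random.choice as a placeholder. A real decision would usually be a deterministic function of something measurable, such as input size. The sketch below returns the same task IDs used in this DAG. `choose_processing_task` and the 10 GiB default threshold are assumptions chosen for illustration.

```python
def choose_processing_task(
    input_size_bytes: int,
    large_threshold_bytes: int = 10 * 1024**3,  # hypothetical 10 GiB cutoff
) -> str:
    """Return the task_id of the branch to follow for the given input size."""
    if input_size_bytes < 0:
        raise ValueError("input_size_bytes must be non-negative")
    if input_size_bytes >= large_threshold_bytes:
        return "spark_processing"
    return "pipeline_processing"


print(choose_processing_task(50 * 1024**3))
print(choose_processing_task(200 * 1024**2))
```

Inside the DAG, the BranchPythonOperator callable would measure the input (for example by listing the preprocessed folder) and pass the result to this function.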

Connection Configuration

Synapse Analytics Connection (azure_synapse)

Configure Azure Synapse Analytics connections in Airflow:

# Connection configuration for Synapse Analytics
{
    "conn_id": "azure_synapse_default",
    "conn_type": "azure_synapse",
    "host": "myworkspace.dev.azuresynapse.net",  # Synapse workspace URL
    "extra": {
        "subscriptionId": "your-subscription-id",
        "resourceGroupName": "your-resource-group",
        "workspaceName": "your-workspace-name",
        "tenantId": "your-tenant-id",
        "clientId": "your-client-id",
        "clientSecret": "your-client-secret"
    }
}
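
One way to supply such a connection without the UI is an environment variable. Airflow 2.3 and later accept a JSON-serialized connection in a variable named AIRFLOW_CONN_ followed by the upper-cased connection ID. The sketch below builds that pair from the fields shown above. `connection_env_var` is a hypothetical helper, and the extra keys are copied from this page rather than verified against the provider.

```python
import json


def connection_env_var(conn_id: str, conn_type: str, host: str, extra: dict) -> tuple[str, str]:
    """Return the (name, value) of an environment variable defining the connection."""
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps({"conn_type": conn_type, "host": host, "extra": extra})
    return name, value


name, value = connection_env_var(
    conn_id="azure_synapse_default",
    conn_type="azure_synapse",
    host="myworkspace.dev.azuresynapse.net",
    extra={"tenantId": "your-tenant-id", "clientId": "your-client-id"},
)
print(name)
```

Secrets such as clientSecret are better injected from a secrets backend than written into files that hold this variable.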

Authentication Methods

The connection's extra field selects one of several authentication methods:

  1. Service Principal Authentication:

    extra = {
        "tenantId": "your-tenant-id",
        "clientId": "your-client-id",
        "clientSecret": "your-client-secret"
    }
  2. Managed Identity Authentication:

    extra = {
        "managedIdentityClientId": "your-managed-identity-client-id"
    }
  3. Azure CLI Authentication:

    extra = {
        "use_azure_cli": True
    }
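
Because the three methods are selected purely by which extra keys are present, a misconfigured connection can end up with none or several of them set. The check below reports which methods a given extra dictionary fully configures. `configured_auth_methods` is an illustrative helper keyed on the field names listed above. It does not reproduce how the provider itself chooses a credential.

```python
def configured_auth_methods(extra: dict) -> list[str]:
    """List the authentication methods that the extra dict fully configures."""
    methods = []
    if all(extra.get(key) for key in ("tenantId", "clientId", "clientSecret")):
        methods.append("service_principal")
    if extra.get("managedIdentityClientId"):
        methods.append("managed_identity")
    if extra.get("use_azure_cli"):
        methods.append("azure_cli")
    return methods


print(configured_auth_methods({"use_azure_cli": True}))
print(configured_auth_methods({"tenantId": "t", "clientId": "c"}))  # secret missing
```

A deployment check could fail fast when the returned list does not contain exactly one entry.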

Error Handling

Common Exception Patterns

from airflow.providers.microsoft.azure.hooks.synapse import (
    AzureSynapseHook, 
    AzureSynapsePipelineHook,
    AzureSynapsePipelineRunException
)

def robust_synapse_operations():
    """Demonstrate error handling patterns for Synapse operations."""
    
    spark_hook = AzureSynapseHook(azure_synapse_conn_id='synapse_conn')
    pipeline_hook = AzureSynapsePipelineHook(azure_synapse_conn_id='synapse_conn')
    
    # Spark job error handling
    try:
        job_result = spark_hook.run_spark_job({
            "name": "test-job",
            "file": "test.py"
        })
        
        job_id = job_result['job_id']
        
        # Wait for completion with timeout
        success = spark_hook.wait_for_job_run_status(
            job_id=job_id,
            expected_statuses=['success', 'error', 'dead', 'killed'],
            timeout=3600
        )
        
        if not success:
            print("Job timed out, cancelling...")
            spark_hook.cancel_job_run(job_id)
            raise TimeoutError("Spark job timed out")
        
        # Check final status
        final_status = spark_hook.get_job_run_status(job_id)
        if final_status != 'success':
            # Get logs for debugging
            logs = spark_hook.get_job_logs(job_id)
            print(f"Job failed with status: {final_status}")
            print(f"Error logs: {logs.get('stderr', 'No error logs')}")
            raise RuntimeError(f"Spark job failed with status: {final_status}")
            
    except Exception as e:
        print(f"Spark job error: {e}")
        raise
    
    # Pipeline error handling
    try:
        run_response = pipeline_hook.run_pipeline("test-pipeline")
        run_id = run_response.run_id
        
        # Monitor pipeline with error handling
        success = pipeline_hook.wait_for_pipeline_run_status(
            run_id=run_id,
            expected_statuses=['Succeeded', 'Failed', 'Cancelled'],
            timeout=7200
        )
        
        if not success:
            print("Pipeline timed out, attempting to cancel...")
            try:
                pipeline_hook.cancel_run_pipeline(run_id)
            except Exception as cancel_error:
                print(f"Failed to cancel pipeline: {cancel_error}")
            raise TimeoutError("Pipeline timed out")
        
        # Check final status and get activity details
        pipeline_run = pipeline_hook.get_pipeline_run(run_id)
        if pipeline_run.status == 'Failed':
            activities = pipeline_hook.get_pipeline_activities(run_id)
            failed_activities = [a for a in activities if a['status'] == 'Failed']
            
            print(f"Pipeline failed. Failed activities: {len(failed_activities)}")
            for activity in failed_activities:
                print(f"- {activity['activityName']}: {activity.get('error', {}).get('message', 'Unknown error')}")
            
            raise AzureSynapsePipelineRunException(f"Pipeline failed with {len(failed_activities)} failed activities")
            
    except AzureSynapsePipelineRunException:
        raise
    except Exception as e:
        print(f"Pipeline error: {e}")
        raise
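Both halves of the example above follow the same shape: poll until a terminal status appears, and treat a timeout as a separate outcome that triggers cancellation. The sketch below isolates that polling loop with only the standard library. `wait_for_terminal_status` and its parameters are hypothetical names for illustration, and the status source is simulated rather than a real Synapse call.

```python
import time
from typing import Callable, Iterable, Optional


def wait_for_terminal_status(
    get_status: Callable[[], str],
    terminal_statuses: Iterable[str],
    timeout: float,
    check_interval: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Optional[str]:
    """Poll get_status until it returns a terminal status.

    Returns the terminal status, or None if the timeout elapses first.
    """
    terminal = set(terminal_statuses)
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status in terminal:
            return status
        if clock() >= deadline:
            return None  # caller decides whether to cancel the run
        sleep(check_interval)


# Simulated job that reports two intermediate states before succeeding
statuses = iter(["not_started", "running", "success"])
final = wait_for_terminal_status(
    get_status=lambda: next(statuses),
    terminal_statuses={"success", "error", "dead", "killed"},
    timeout=60,
    sleep=lambda seconds: None,  # skip real waiting in this demonstration
)
print(final)
```

Returning `None` on timeout, instead of raising inside the loop, keeps the cancel-then-raise decision in the calling code, as in the example above.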

Connection Testing

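from airflow.providers.microsoft.azure.hooks.synapse import (
    AzureSynapseHook,
    AzureSynapsePipelineHook,
)

def test_synapse_connections():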
    """Test Synapse Analytics connections and capabilities."""
    
    # Test Spark hook
    try:
        spark_hook = AzureSynapseHook(azure_synapse_conn_id='synapse_conn')
        success, message = spark_hook.test_connection()
        
        if success:
            print("Synapse Spark connection successful")
            
            # List available pools
            pools = spark_hook.list_spark_pools()
            print(f"Available Spark pools: {[p['name'] for p in pools]}")
        else:
            print(f"Synapse Spark connection failed: {message}")
            
    except Exception as e:
        print(f"Synapse Spark connection test failed: {e}")
    
    # Test Pipeline hook
    try:
        pipeline_hook = AzureSynapsePipelineHook(azure_synapse_conn_id='synapse_conn')
        success, message = pipeline_hook.test_connection()
        
        if success:
            print("Synapse Pipeline connection successful")
            
            # List available pipelines
            pipelines = pipeline_hook.list_pipelines()
            print(f"Available pipelines: {[p['name'] for p in pipelines]}")
        else:
            print(f"Synapse Pipeline connection failed: {message}")
            
    except Exception as e:
        print(f"Synapse Pipeline connection test failed: {e}")
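The two checks above share one pattern: call something that returns a `(success, message)` pair and treat a raised exception as a failure. A minimal sketch of that pattern as a reusable function follows. `run_connection_checks` is a hypothetical helper, and the lambdas stand in for real `test_connection()` calls so the example runs without Airflow installed.

```python
from typing import Callable


def run_connection_checks(
    checks: dict[str, Callable[[], tuple[bool, str]]],
) -> dict[str, tuple[bool, str]]:
    """Run each named check and collect (success, message) results."""
    results = {}
    for name, check in checks.items():
        try:
            results[name] = check()
        except Exception as exc:  # a raised error counts as a failed check
            results[name] = (False, f"{type(exc).__name__}: {exc}")
    return results


def failing_check() -> tuple[bool, str]:
    """Stand-in for a hook whose connection test raises."""
    raise ConnectionError("workspace unreachable")


report = run_connection_checks({
    "spark": lambda: (True, "connection ok"),
    "pipeline": failing_check,
})
for name, (ok, message) in report.items():
    print(f"{name}: {'OK' if ok else 'FAILED'} - {message}")
```

In a real DAG the dictionary values would be bound methods such as `spark_hook.test_connection`.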

Performance Considerations

Optimizing Spark Jobs

def optimize_spark_configuration():
    """Demonstrate Spark optimization techniques for Synapse."""
    
    # Example starting points for different workload types; tune them to the pool's node size
    
    # ETL/Data Processing workload
    etl_config = {
        "executorCount": 8,
        "executorCores": 4,
        "executorMemory": "16g",
        "driverCores": 2,
        "driverMemory": "8g",
        "conf": {
            "spark.sql.adaptive.enabled": "true",
            "spark.sql.adaptive.coalescePartitions.enabled": "true",
            "spark.sql.adaptive.skewJoin.enabled": "true",
            "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
            "spark.sql.execution.arrow.pyspark.enabled": "true"
        }
    }
    
    # Machine Learning workload
    ml_config = {
        "executorCount": 6,
        "executorCores": 6,
        "executorMemory": "24g",
        "driverCores": 4,
        "driverMemory": "12g",
        "conf": {
            "spark.ml.stage.parallelism": "6",
            "spark.sql.execution.arrow.maxRecordsPerBatch": "10000",
            "spark.sql.adaptive.enabled": "true",
            "spark.sql.adaptive.localShuffleReader.enabled": "true"
        }
    }
    
    # Streaming workload
    streaming_config = {
        "executorCount": 4,
        "executorCores": 2,
        "executorMemory": "8g",
        "driverMemory": "4g",
        "conf": {
            "spark.streaming.backpressure.enabled": "true",
            "spark.sql.streaming.checkpointLocation": "/checkpoint/path",
            "spark.sql.adaptive.enabled": "false"  # Disable for streaming
        }
    }
    
    return {
        'etl': etl_config,
        'ml': ml_config,
        'streaming': streaming_config
    }

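import time
from datetime import datetime

from airflow.providers.microsoft.azure.hooks.synapse import AzureSynapseHook

def implement_spark_monitoring():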
    """Implement comprehensive Spark job monitoring."""
    
    def monitor_spark_job_detailed(job_id: int):
        """Detailed monitoring of Spark job execution."""
        hook = AzureSynapseHook(azure_synapse_conn_id='synapse_conn')
        
        monitoring_data = {
            'job_id': job_id,
            'status_history': [],
            'duration_seconds': 0,
            'resource_usage': {}
        }
        
        start_time = datetime.now()
        
        stuck_check_done = False
        
        while True:
            current_status = hook.get_job_run_status(job_id)
            monitoring_data['status_history'].append({
                'timestamp': datetime.now(),
                'status': current_status
            })
            
            # Record elapsed time before the terminal check so the final duration is captured
            monitoring_data['duration_seconds'] = (datetime.now() - start_time).total_seconds()
            
            # Terminal statuses
            if current_status in ['success', 'error', 'dead', 'killed']:
                break
            
            # Check for stuck jobs once, the first time the run exceeds 1 hour
            if monitoring_data['duration_seconds'] > 3600 and not stuck_check_done:
                stuck_check_done = True
                print("Job appears to be stuck, investigating...")
                logs = hook.get_job_logs(job_id)
                if 'OutOfMemoryError' in logs.get('stderr', ''):
                    print("OutOfMemoryError detected - job needs more memory")
                
            time.sleep(30)  # Check every 30 seconds
        
        # Get final logs and metrics
        final_logs = hook.get_job_logs(job_id)
        monitoring_data['final_logs'] = final_logs
        monitoring_data['final_status'] = current_status
        
        return monitoring_data
    
    return monitor_spark_job_detailed
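When choosing among the workload configurations above, it helps to know how many cores and how much memory a payload requests in total, so the figure can be compared with the Spark pool's capacity. The sketch below does that arithmetic for the ETL configuration. `parse_memory_gb` and `requested_resources` are hypothetical helpers, the default of one driver core when `driverCores` is absent is an assumption, and memory overhead added by Spark is not counted.

```python
def parse_memory_gb(value: str) -> float:
    """Convert a Spark memory string such as '16g' or '512m' to gigabytes."""
    units = {"m": 1 / 1024, "g": 1.0, "t": 1024.0}
    number, unit = value[:-1], value[-1].lower()
    if unit not in units:
        raise ValueError(f"Unsupported memory unit in {value!r}")
    return float(number) * units[unit]


def requested_resources(config: dict) -> dict:
    """Sum the cores and memory requested by the executors and the driver."""
    executors = config["executorCount"]
    # Assumption: one driver core when the payload does not set driverCores
    cores = executors * config["executorCores"] + config.get("driverCores", 1)
    memory_gb = (
        executors * parse_memory_gb(config["executorMemory"])
        + parse_memory_gb(config["driverMemory"])
    )
    return {"total_cores": cores, "total_memory_gb": memory_gb}


# Same values as the ETL configuration shown above
etl_config = {
    "executorCount": 8,
    "executorCores": 4,
    "executorMemory": "16g",
    "driverCores": 2,
    "driverMemory": "8g",
}
print(requested_resources(etl_config))  # 8*4 + 2 = 34 cores, 8*16 + 8 = 136 GB
```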

This page covers the Azure Synapse Analytics integration in the Apache Airflow Microsoft Azure provider: Spark job execution, pipeline orchestration, monitoring, and performance tuning.
