Provider package for Microsoft Azure integrations with Apache Airflow
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Execute Spark jobs and manage pipeline operations on Azure Synapse Analytics for big data processing and analytics workloads. Provides comprehensive integration for both Spark batch processing and pipeline orchestration capabilities.
Foundation hook for Azure Synapse Analytics operations providing common functionality and connection management.
class BaseAzureSynapseHook(BaseHook):
"""
Base hook for Azure Synapse Analytics operations.
Provides common functionality and connection management for Synapse
Spark and pipeline operations.
"""
def get_conn(self) -> Any:
"""
Get authenticated Azure Synapse client.
Returns:
Any: Synapse client instance
"""
def test_connection(self) -> tuple[bool, str]:
"""
Test the Azure Synapse Analytics connection.
Returns:
tuple[bool, str]: Success status and message
"""Hook for Azure Synapse Spark operations providing Spark job execution and monitoring capabilities.
class AzureSynapseHook(BaseAzureSynapseHook):
"""
Hook for Azure Synapse Spark operations.
Provides methods for submitting Spark jobs, monitoring execution,
and managing Spark batch sessions on Azure Synapse Analytics.
"""
def get_conn(self) -> SparkClient:
"""
Get authenticated Synapse Spark client.
Returns:
SparkClient: Synapse Spark client instance
"""
def run_spark_job(
self,
payload: dict[str, Any],
**kwargs: Any
) -> dict[str, Any]:
"""
Submit a Spark job to Azure Synapse Analytics.
Args:
payload (dict[str, Any]): Spark job configuration including:
- name: Job name
- file: Main application file (jar, py, etc.)
- className: Main class name (for Scala/Java)
- args: Application arguments
- conf: Spark configuration
- executorCount: Number of executors
- executorCores: Cores per executor
- executorMemory: Memory per executor
- driverCores: Driver cores
- driverMemory: Driver memory
**kwargs: Additional job submission parameters
Returns:
dict[str, Any]: Job submission response with job ID and status
"""
def get_job_run_status(
self,
job_id: int,
**kwargs: Any
) -> str:
"""
Get the current status of a Spark job.
Args:
job_id (int): Spark job ID
**kwargs: Additional parameters
Returns:
str: Current job status (not_started, starting, running, idle, busy,
shutting_down, error, dead, killed, success)
"""
def wait_for_job_run_status(
self,
job_id: int,
expected_statuses: list[str],
check_interval: int = 30,
timeout: int = 3600
) -> bool:
"""
Wait for Spark job to reach expected status.
Args:
job_id (int): Spark job ID
expected_statuses (list[str]): List of acceptable statuses
check_interval (int): Check interval in seconds (default: 30)
timeout (int): Timeout in seconds (default: 3600)
Returns:
bool: True if job reached expected status, False if timeout
"""
def cancel_job_run(
self,
job_id: int,
**kwargs: Any
) -> None:
"""
Cancel a running Spark job.
Args:
job_id (int): Spark job ID to cancel
**kwargs: Additional parameters
"""
def get_job_logs(
self,
job_id: int,
**kwargs: Any
) -> dict[str, Any]:
"""
Get logs for a Spark job.
Args:
job_id (int): Spark job ID
**kwargs: Additional parameters
Returns:
dict[str, Any]: Job logs including stdout, stderr, and driver logs
"""
def list_spark_pools(self) -> list[dict[str, Any]]:
"""
List available Spark pools in the workspace.
Returns:
list[dict[str, Any]]: List of Spark pool configurations
"""
def get_spark_pool_details(
self,
spark_pool_name: str
) -> dict[str, Any]:
"""
Get details of a specific Spark pool.
Args:
spark_pool_name (str): Name of the Spark pool
Returns:
dict[str, Any]: Spark pool configuration and status
"""Hook for Azure Synapse Pipeline operations providing pipeline execution and monitoring capabilities.
class AzureSynapsePipelineHook(BaseAzureSynapseHook):
"""
Hook for Azure Synapse Pipeline operations.
Provides methods for running pipelines, monitoring execution,
and managing pipeline runs on Azure Synapse Analytics.
"""
def get_conn(self) -> ArtifactsClient:
"""
Get authenticated Synapse artifacts client.
Returns:
ArtifactsClient: Synapse artifacts client instance
"""
def run_pipeline(
self,
pipeline_name: str,
**config: Any
) -> CreateRunResponse:
"""
Run a pipeline in Azure Synapse Analytics.
Args:
pipeline_name (str): Name of the pipeline to run
**config: Pipeline run configuration including:
- parameters: Pipeline parameters
- reference_pipeline_run_id: Reference run ID
- is_recovery: Whether this is a recovery run
Returns:
CreateRunResponse: Pipeline run response with run ID
"""
def get_pipeline_run(
self,
run_id: str,
**kwargs: Any
) -> PipelineRun:
"""
Get details of a pipeline run.
Args:
run_id (str): Pipeline run ID
**kwargs: Additional parameters
Returns:
PipelineRun: Pipeline run details including status and metadata
"""
def get_pipeline_run_status(
self,
run_id: str,
**kwargs: Any
) -> str:
"""
Get the current status of a pipeline run.
Args:
run_id (str): Pipeline run ID
**kwargs: Additional parameters
Returns:
str: Current pipeline status (Queued, InProgress, Succeeded, Failed, Cancelled)
"""
def refresh_conn(self) -> ArtifactsClient:
"""
Refresh the Synapse artifacts connection.
Returns:
ArtifactsClient: Refreshed artifacts client instance
"""
def wait_for_pipeline_run_status(
self,
run_id: str,
expected_statuses: list[str],
check_interval: int = 60,
timeout: int = 7200
) -> bool:
"""
Wait for pipeline run to reach expected status.
Args:
run_id (str): Pipeline run ID
expected_statuses (list[str]): List of acceptable statuses
check_interval (int): Check interval in seconds (default: 60)
timeout (int): Timeout in seconds (default: 7200)
Returns:
bool: True if pipeline reached expected status, False if timeout
"""
def cancel_run_pipeline(
self,
run_id: str,
**kwargs: Any
) -> None:
"""
Cancel a running pipeline.
Args:
run_id (str): Pipeline run ID to cancel
**kwargs: Additional parameters
"""
def get_pipeline_activities(
self,
run_id: str,
**kwargs: Any
) -> list[dict[str, Any]]:
"""
Get activity runs for a pipeline run.
Args:
run_id (str): Pipeline run ID
**kwargs: Additional parameters including:
- activity_name: Filter by activity name
- activity_type: Filter by activity type
Returns:
list[dict[str, Any]]: List of activity run details
"""
def get_pipeline_details(
self,
pipeline_name: str
) -> dict[str, Any]:
"""
Get pipeline definition and metadata.
Args:
pipeline_name (str): Name of the pipeline
Returns:
dict[str, Any]: Pipeline definition and configuration
"""
def list_pipelines(self) -> list[dict[str, Any]]:
"""
List all pipelines in the workspace.
Returns:
list[dict[str, Any]]: List of pipeline definitions
"""Execute Azure Synapse Analytics operations as Airflow tasks with comprehensive Spark and pipeline management capabilities.
class AzureSynapseRunSparkBatchOperator(BaseOperator):
    """
    Runs Spark batch jobs on Azure Synapse Analytics.

    Supports running Spark applications with custom configurations,
    resource allocation, and dependency management.
    """

    def __init__(
        self,
        *,
        azure_synapse_conn_id: str = "azure_synapse_default",
        spark_pool_name: str,
        payload: dict[str, Any],
        timeout: int = 60 * 60 * 24 * 7,
        check_interval: int = 60,
        **kwargs: Any
    ) -> None:
        """
        Initialize Synapse Spark batch operator.

        Args:
            azure_synapse_conn_id (str): Airflow connection ID for Synapse
            spark_pool_name (str): Name of the Spark pool to use
            payload (dict[str, Any]): Spark job configuration
            timeout (int): Job timeout in seconds (default: 7 days)
            check_interval (int): Status check interval in seconds (default: 60)
        """

    def execute(self, context: Context) -> dict[str, Any]:
        """
        Execute Spark batch job on Synapse Analytics.

        Blocks until the job reaches a terminal state or the configured
        timeout elapses (presumably polling every ``check_interval``
        seconds — confirm against the hook implementation).

        Args:
            context (Context): Airflow task context

        Returns:
            dict[str, Any]: Job execution results and metadata
        """

    def on_kill(self) -> None:
        """Cancel running Spark job on task termination."""
class AzureSynapseRunPipelineOperator(BaseOperator):
"""
Runs pipelines on Azure Synapse Analytics.
Supports executing Synapse pipelines with parameter passing
and comprehensive monitoring capabilities.
"""
def __init__(
self,
*,
pipeline_name: str,
azure_synapse_conn_id: str = "azure_synapse_default",
pipeline_timeout: int = 60 * 60 * 24 * 7,
check_interval: int = 60,
**pipeline_run_parameters: Any
):
"""
Initialize Synapse pipeline operator.
Args:
pipeline_name (str): Name of the pipeline to run
azure_synapse_conn_id (str): Airflow connection ID for Synapse
pipeline_timeout (int): Pipeline timeout in seconds (default: 7 days)
check_interval (int): Status check interval in seconds (default: 60)
**pipeline_run_parameters: Parameters to pass to the pipeline
"""
def execute(self, context: Context) -> str:
"""
Execute pipeline on Synapse Analytics.
Args:
context (Context): Airflow task context
Returns:
str: Pipeline run ID
"""
def on_kill(self) -> None:
"""Cancel running pipeline on task termination."""class AzureSynapseSparkBatchRunStatus:
"""Constants for Synapse Spark job statuses."""
NOT_STARTED: str = "not_started"
STARTING: str = "starting"
RUNNING: str = "running"
IDLE: str = "idle"
BUSY: str = "busy"
SHUTTING_DOWN: str = "shutting_down"
ERROR: str = "error"
DEAD: str = "dead"
KILLED: str = "killed"
SUCCESS: str = "success"
class AzureSynapsePipelineRunStatus:
    """Constants for Synapse pipeline run statuses."""

    # Lifecycle states as reported by the Synapse pipeline-run API.
    QUEUED: str = "Queued"           # run accepted, waiting to start
    IN_PROGRESS: str = "InProgress"  # run currently executing
    SUCCEEDED: str = "Succeeded"     # terminal: completed successfully
    FAILED: str = "Failed"           # terminal: completed with errors
    CANCELLED: str = "Cancelled"     # terminal: cancelled by a caller
class AzureSynapsePipelineRunException(Exception):
"""Custom exception for Synapse pipeline operations."""
pass

class AzureSynapsePipelineRunLink(BaseOperatorLink):
"""
Link to Synapse pipeline run in Azure portal.
Provides direct access to pipeline run details and monitoring
in the Azure Synapse Studio interface.
"""
name: str = "Azure Synapse Pipeline Run"
def get_link(
self,
operator: BaseOperator,
dttm: datetime | None = None,
**kwargs: Any
) -> str:
"""
Generate link to Azure Synapse pipeline run.
Args:
operator (BaseOperator): Airflow operator instance
dttm (datetime | None): Execution date
**kwargs: Additional parameters
Returns:
str: URL to Synapse pipeline run in Azure portal
"""from airflow import DAG
from airflow.providers.microsoft.azure.operators.synapse import AzureSynapseRunSparkBatchOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def process_spark_results(**context):
    """Log the outcome of the upstream Spark job and pass its result through."""
    ti = context['task_instance']
    outcome = ti.xcom_pull(task_ids='run_data_analysis')
    print(f"Spark job result: {outcome}")
    batch_id = outcome.get('job_id')
    state = outcome.get('status')
    # One log line summarizing success or failure of the upstream job.
    summary = (
        f"Spark job {batch_id} completed successfully"
        if state == 'success'
        else f"Spark job {batch_id} failed with status: {state}"
    )
    print(summary)
    return outcome
dag = DAG(
'synapse_spark_workflow',
default_args={
'owner': 'analytics-team',
'retries': 2,
'retry_delay': timedelta(minutes=5)
},
description='Synapse Spark data analysis workflow',
schedule_interval=timedelta(days=1),
start_date=datetime(2024, 1, 1),
catchup=False
)
# Configure Spark job payload
spark_payload = {
"name": "daily-data-analysis",
"file": "abfss://data@mystorageaccount.dfs.core.windows.net/scripts/analyze_data.py",
"args": [
"--input-path", "abfss://data@mystorageaccount.dfs.core.windows.net/input/",
"--output-path", "abfss://data@mystorageaccount.dfs.core.windows.net/output/",
"--date", "{{ ds }}"
],
"conf": {
"spark.sql.adaptive.enabled": "true",
"spark.sql.adaptive.coalescePartitions.enabled": "true",
"spark.serializer": "org.apache.spark.serializer.KryoSerializer"
},
"executorCount": 4,
"executorCores": 4,
"executorMemory": "8g",
"driverCores": 2,
"driverMemory": "4g"
}
# Run Spark job
run_analysis = AzureSynapseRunSparkBatchOperator(
task_id='run_data_analysis',
azure_synapse_conn_id='synapse_conn',
spark_pool_name='analytics-pool',
payload=spark_payload,
timeout=3600, # 1 hour timeout
check_interval=30, # Check every 30 seconds
dag=dag
)
# Process results
process_results = PythonOperator(
task_id='process_results',
python_callable=process_spark_results,
dag=dag
)
run_analysis >> process_results

from airflow import DAG
from airflow.providers.microsoft.azure.operators.synapse import AzureSynapseRunSparkBatchOperator
from airflow.providers.microsoft.azure.hooks.synapse import AzureSynapseHook
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def setup_spark_environment():
    """Validate the Synapse Spark setup and return the target pool's details."""
    synapse = AzureSynapseHook(azure_synapse_conn_id='synapse_conn')
    # Surface every pool in the workspace in the task logs for visibility.
    available = synapse.list_spark_pools()
    print(f"Available Spark pools: {[entry['name'] for entry in available]}")
    # Fetch the configuration of the pool this workflow will run on.
    target_pool = synapse.get_spark_pool_details('ml-processing-pool')
    print(f"Pool configuration: {target_pool}")
    return target_pool
def monitor_spark_job(**context):
    """Dump driver/executor logs for the upstream Spark job and report status."""
    synapse = AzureSynapseHook(azure_synapse_conn_id='synapse_conn')
    # The training task pushed its submission result to XCom.
    upstream = context['task_instance'].xcom_pull(task_ids='run_ml_training')
    batch_id = upstream['job_id']
    # Pull whatever logs the service has collected so far.
    log_bundle = synapse.get_job_logs(batch_id)
    print("=== Spark Driver Logs ===")
    print(log_bundle.get('driverLogs', 'No driver logs available'))
    print("=== Spark Executor Logs ===")
    print(log_bundle.get('executorLogs', 'No executor logs available'))
    # Re-query the service for the job's current state.
    status = synapse.get_job_run_status(batch_id)
    print(f"Final job status: {status}")
    has_logs = bool(log_bundle.get('driverLogs') or log_bundle.get('executorLogs'))
    return {
        'job_id': batch_id,
        'final_status': status,
        'logs_available': has_logs
    }
dag = DAG(
'advanced_spark_ml_workflow',
default_args={
'owner': 'ml-team',
'retries': 1,
'retry_delay': timedelta(minutes=10)
},
description='Advanced Spark ML workflow with monitoring',
schedule_interval=timedelta(days=1),
start_date=datetime(2024, 1, 1),
catchup=False
)
# Setup environment
setup_env = PythonOperator(
task_id='setup_environment',
python_callable=setup_spark_environment,
dag=dag
)
# Advanced ML training job configuration
ml_payload = {
"name": "ml-model-training-{{ ds_nodash }}",
"file": "abfss://ml@mlstorageaccount.dfs.core.windows.net/scripts/train_model.py",
"className": None, # Python job
"args": [
"--training-data", "abfss://ml@mlstorageaccount.dfs.core.windows.net/data/training/{{ ds }}/",
"--model-output", "abfss://ml@mlstorageaccount.dfs.core.windows.net/models/{{ ds_nodash }}/",
"--algorithm", "random_forest",
"--cross-validation", "5",
"--feature-selection", "true"
],
"jars": [
"abfss://ml@mlstorageaccount.dfs.core.windows.net/libs/spark-ml-extended.jar"
],
"pyFiles": [
"abfss://ml@mlstorageaccount.dfs.core.windows.net/libs/ml_utils.py",
"abfss://ml@mlstorageaccount.dfs.core.windows.net/libs/feature_engineering.py"
],
"files": [
"abfss://ml@mlstorageaccount.dfs.core.windows.net/config/ml_config.json"
],
"conf": {
# Performance tuning
"spark.sql.adaptive.enabled": "true",
"spark.sql.adaptive.coalescePartitions.enabled": "true",
"spark.sql.adaptive.skewJoin.enabled": "true",
# Memory management
"spark.sql.execution.arrow.pyspark.enabled": "true",
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.sql.execution.arrow.maxRecordsPerBatch": "10000",
# ML specific configurations
"spark.ml.stage.parallelism": "4",
"spark.sql.adaptive.advisoryPartitionSizeInBytes": "128MB",
# Checkpointing for long-running ML jobs
"spark.sql.streaming.checkpointLocation": "abfss://ml@mlstorageaccount.dfs.core.windows.net/checkpoints/{{ ds_nodash }}/"
},
"executorCount": 8,
"executorCores": 4,
"executorMemory": "16g",
"driverCores": 4,
"driverMemory": "8g",
"tags": {
"project": "ml-pipeline",
"environment": "production",
"model_type": "random_forest"
}
}
# Run ML training job
run_training = AzureSynapseRunSparkBatchOperator(
task_id='run_ml_training',
azure_synapse_conn_id='synapse_conn',
spark_pool_name='ml-processing-pool',
payload=ml_payload,
timeout=7200, # 2 hours timeout for ML training
check_interval=60, # Check every minute
dag=dag
)
# Monitor job execution
monitor_job = PythonOperator(
task_id='monitor_training_job',
python_callable=monitor_spark_job,
dag=dag
)
setup_env >> run_training >> monitor_job

from airflow import DAG
from airflow.providers.microsoft.azure.operators.synapse import AzureSynapseRunPipelineOperator
from airflow.providers.microsoft.azure.hooks.synapse import AzureSynapsePipelineHook
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def setup_pipeline_parameters(**context):
    """
    Build runtime parameters for the Synapse pipeline run.

    Prefers Airflow's logical date (``ds`` in the task context) so reruns
    and backfills are idempotent; falls back to the current date when the
    function is called without a task context.

    Args:
        **context: Optional Airflow task context; ``ds`` is used if present.

    Returns:
        dict[str, str]: Parameter name -> value mapping for the pipeline.
    """
    # datetime.now() makes re-executions non-reproducible; use the DAG's
    # logical date when Airflow provides one.
    execution_date = context.get('ds') or datetime.now().strftime('%Y-%m-%d')
    parameters = {
        "processingDate": execution_date,
        "inputDataPath": f"abfss://data@datastorageaccount.dfs.core.windows.net/raw/{execution_date}/",
        "outputDataPath": f"abfss://data@datastorageaccount.dfs.core.windows.net/processed/{execution_date}/",
        "batchSize": "1000",
        "parallelism": "4",
        "retryAttempts": "3"
    }
    print(f"Pipeline parameters: {parameters}")
    return parameters
def monitor_pipeline_activities(**context):
    """Report per-activity status for the pipeline run and summarize it."""
    synapse = AzureSynapsePipelineHook(azure_synapse_conn_id='synapse_conn')
    # The ETL task pushed its run ID to XCom.
    run_id = context['task_instance'].xcom_pull(task_ids='run_etl_pipeline')
    activity_runs = synapse.get_pipeline_activities(run_id)
    print(f"Pipeline run {run_id} activities:")
    for item in activity_runs:
        print(f"- {item['activityName']}: {item['status']} "
              f"(Duration: {item.get('durationInMs', 0)}ms)")
        if item['status'] == 'Failed':
            print(f" Error: {item.get('error', {}).get('message', 'Unknown error')}")
    # Overall run metadata for the summary returned to downstream tasks.
    run_details = synapse.get_pipeline_run(run_id)
    failed = [entry for entry in activity_runs if entry['status'] == 'Failed']
    return {
        'run_id': run_id,
        'status': run_details.status,
        'duration_ms': run_details.duration_in_ms,
        'activities': len(activity_runs),
        'failed_activities': len(failed)
    }
def validate_pipeline_outputs(**context):
    """Fail the task unless the monitored pipeline run fully succeeded."""
    summary = context['task_instance'].xcom_pull(task_ids='monitor_activities')
    failed_count = summary['failed_activities']
    if failed_count > 0:
        raise ValueError(f"Pipeline has {failed_count} failed activities")
    run_status = summary['status']
    if run_status != 'Succeeded':
        raise ValueError(f"Pipeline failed with status: {run_status}")
    print(f"Pipeline validation passed. Duration: {summary['duration_ms']}ms")
    return summary
dag = DAG(
'synapse_pipeline_workflow',
default_args={
'owner': 'data-engineering-team',
'retries': 1,
'retry_delay': timedelta(minutes=5)
},
description='Synapse pipeline ETL workflow',
schedule_interval=timedelta(hours=6),
start_date=datetime(2024, 1, 1),
catchup=False
)
# Setup parameters
setup_params = PythonOperator(
task_id='setup_parameters',
python_callable=setup_pipeline_parameters,
dag=dag
)
# Run ETL pipeline
run_pipeline = AzureSynapseRunPipelineOperator(
task_id='run_etl_pipeline',
pipeline_name='data-processing-etl',
azure_synapse_conn_id='synapse_conn',
pipeline_timeout=3600, # 1 hour timeout
check_interval=30, # Check every 30 seconds
# Dynamic parameters from previous task
processingDate="{{ task_instance.xcom_pull(task_ids='setup_parameters')['processingDate'] }}",
inputDataPath="{{ task_instance.xcom_pull(task_ids='setup_parameters')['inputDataPath'] }}",
outputDataPath="{{ task_instance.xcom_pull(task_ids='setup_parameters')['outputDataPath'] }}",
batchSize="{{ task_instance.xcom_pull(task_ids='setup_parameters')['batchSize'] }}",
dag=dag
)
# Monitor activities
monitor_activities = PythonOperator(
task_id='monitor_activities',
python_callable=monitor_pipeline_activities,
dag=dag
)
# Validate outputs
validate_outputs = PythonOperator(
task_id='validate_outputs',
python_callable=validate_pipeline_outputs,
dag=dag
)
setup_params >> run_pipeline >> monitor_activities >> validate_outputs

from airflow import DAG
from airflow.providers.microsoft.azure.operators.synapse import (
AzureSynapseRunSparkBatchOperator,
AzureSynapseRunPipelineOperator
)
from airflow.providers.microsoft.azure.hooks.synapse import (
AzureSynapseHook,
AzureSynapsePipelineHook
)
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.dummy import DummyOperator
from datetime import datetime, timedelta
def check_data_availability():
    """Branch on whether input data exists (simulated for this example)."""
    # A real implementation would query Azure Data Lake or another source;
    # the outcome is randomized here so both branches get exercised.
    import random
    if random.choice([True, False]):
        print("Input data is available, proceeding with processing")
        return 'data_preprocessing'
    print("Input data not available, skipping processing")
    return 'skip_processing'
def choose_processing_method(**context):
    """Pick the processing branch based on (simulated) dataset size."""
    # Real code would inspect the preprocessed data's characteristics;
    # the decision is randomized here for demonstration purposes.
    import random
    is_large = random.choice(['small', 'large']) == 'large'
    if is_large:
        print("Large dataset detected, using Spark batch processing")
        return 'spark_processing'
    print("Small dataset detected, using pipeline processing")
    return 'pipeline_processing'
dag = DAG(
'complex_synapse_workflow',
default_args={
'owner': 'analytics-platform-team',
'retries': 2,
'retry_delay': timedelta(minutes=3)
},
description='Complex Synapse workflow with conditional processing',
schedule_interval=timedelta(hours=4),
start_date=datetime(2024, 1, 1),
catchup=False
)
# Check data availability
check_data = BranchPythonOperator(
task_id='check_data_availability',
python_callable=check_data_availability,
dag=dag
)
# Data preprocessing (common step)
preprocessing_payload = {
"name": "data-preprocessing-{{ ds_nodash }}",
"file": "abfss://analytics@analyticsstorage.dfs.core.windows.net/scripts/preprocess.py",
"args": ["--date", "{{ ds }}", "--validate", "true"],
"executorCount": 2,
"executorCores": 2,
"executorMemory": "4g",
"driverMemory": "2g"
}
data_preprocessing = AzureSynapseRunSparkBatchOperator(
task_id='data_preprocessing',
azure_synapse_conn_id='synapse_conn',
spark_pool_name='preprocessing-pool',
payload=preprocessing_payload,
timeout=1800,
dag=dag
)
# Choose processing method
choose_method = BranchPythonOperator(
task_id='choose_processing_method',
python_callable=choose_processing_method,
dag=dag
)
# Spark processing branch
spark_payload = {
"name": "large-data-processing-{{ ds_nodash }}",
"file": "abfss://analytics@analyticsstorage.dfs.core.windows.net/scripts/spark_processing.py",
"args": [
"--input", "abfss://analytics@analyticsstorage.dfs.core.windows.net/preprocessed/{{ ds }}/",
"--output", "abfss://analytics@analyticsstorage.dfs.core.windows.net/results/{{ ds }}/"
],
"executorCount": 8,
"executorCores": 4,
"executorMemory": "16g",
"driverMemory": "8g",
"conf": {
"spark.sql.adaptive.enabled": "true",
"spark.sql.adaptive.coalescePartitions.enabled": "true"
}
}
spark_processing = AzureSynapseRunSparkBatchOperator(
task_id='spark_processing',
azure_synapse_conn_id='synapse_conn',
spark_pool_name='large-processing-pool',
payload=spark_payload,
timeout=3600,
dag=dag
)
# Pipeline processing branch
pipeline_processing = AzureSynapseRunPipelineOperator(
task_id='pipeline_processing',
pipeline_name='small-data-pipeline',
azure_synapse_conn_id='synapse_conn',
pipeline_timeout=1800,
inputPath="abfss://analytics@analyticsstorage.dfs.core.windows.net/preprocessed/{{ ds }}/",
outputPath="abfss://analytics@analyticsstorage.dfs.core.windows.net/results/{{ ds }}/",
dag=dag
)
# Skip processing (when no data available)
skip_processing = DummyOperator(
task_id='skip_processing',
dag=dag
)
# Join point for all branches
join_processing = DummyOperator(
task_id='join_processing',
trigger_rule='none_failed_or_skipped',
dag=dag
)
# Post-processing
postprocessing_payload = {
"name": "postprocessing-{{ ds_nodash }}",
"file": "abfss://analytics@analyticsstorage.dfs.core.windows.net/scripts/postprocess.py",
"args": [
"--results-path", "abfss://analytics@analyticsstorage.dfs.core.windows.net/results/{{ ds }}/",
"--final-output", "abfss://analytics@analyticsstorage.dfs.core.windows.net/final/{{ ds }}/"
],
"executorCount": 2,
"executorCores": 2,
"executorMemory": "4g"
}
postprocessing = AzureSynapseRunSparkBatchOperator(
task_id='postprocessing',
azure_synapse_conn_id='synapse_conn',
spark_pool_name='postprocessing-pool',
payload=postprocessing_payload,
timeout=900,
dag=dag
)
# Set up dependencies
check_data >> [data_preprocessing, skip_processing]
data_preprocessing >> choose_method
choose_method >> [spark_processing, pipeline_processing]
[spark_processing, pipeline_processing, skip_processing] >> join_processing
join_processing >> postprocessing

Configure Azure Synapse Analytics connections in Airflow:
# Connection configuration for Synapse Analytics
{
"conn_id": "azure_synapse_default",
"conn_type": "azure_synapse",
"host": "myworkspace.dev.azuresynapse.net", # Synapse workspace URL
"extra": {
"subscriptionId": "your-subscription-id",
"resourceGroupName": "your-resource-group",
"workspaceName": "your-workspace-name",
"tenantId": "your-tenant-id",
"clientId": "your-client-id",
"clientSecret": "your-client-secret"
}
}

Azure Synapse Analytics supports multiple authentication methods:
Service Principal Authentication:
extra = {
"tenantId": "your-tenant-id",
"clientId": "your-client-id",
"clientSecret": "your-client-secret"
}

Managed Identity Authentication:
extra = {
"managedIdentityClientId": "your-managed-identity-client-id"
}

Azure CLI Authentication:
extra = {
"use_azure_cli": True
}

from airflow.providers.microsoft.azure.hooks.synapse import (
AzureSynapseHook,
AzureSynapsePipelineHook,
AzureSynapsePipelineRunException
)
def robust_synapse_operations():
"""Demonstrate error handling patterns for Synapse operations."""
spark_hook = AzureSynapseHook(azure_synapse_conn_id='synapse_conn')
pipeline_hook = AzureSynapsePipelineHook(azure_synapse_conn_id='synapse_conn')
# Spark job error handling
try:
job_result = spark_hook.run_spark_job({
"name": "test-job",
"file": "test.py"
})
job_id = job_result['job_id']
# Wait for completion with timeout
success = spark_hook.wait_for_job_run_status(
job_id=job_id,
expected_statuses=['success', 'error', 'dead', 'killed'],
timeout=3600
)
if not success:
print("Job timed out, cancelling...")
spark_hook.cancel_job_run(job_id)
raise TimeoutError("Spark job timed out")
# Check final status
final_status = spark_hook.get_job_run_status(job_id)
if final_status != 'success':
# Get logs for debugging
logs = spark_hook.get_job_logs(job_id)
print(f"Job failed with status: {final_status}")
print(f"Error logs: {logs.get('stderr', 'No error logs')}")
raise RuntimeError(f"Spark job failed with status: {final_status}")
except Exception as e:
print(f"Spark job error: {e}")
raise
# Pipeline error handling
try:
run_response = pipeline_hook.run_pipeline("test-pipeline")
run_id = run_response.run_id
# Monitor pipeline with error handling
success = pipeline_hook.wait_for_pipeline_run_status(
run_id=run_id,
expected_statuses=['Succeeded', 'Failed', 'Cancelled'],
timeout=7200
)
if not success:
print("Pipeline timed out, attempting to cancel...")
try:
pipeline_hook.cancel_run_pipeline(run_id)
except Exception as cancel_error:
print(f"Failed to cancel pipeline: {cancel_error}")
raise TimeoutError("Pipeline timed out")
# Check final status and get activity details
pipeline_run = pipeline_hook.get_pipeline_run(run_id)
if pipeline_run.status == 'Failed':
activities = pipeline_hook.get_pipeline_activities(run_id)
failed_activities = [a for a in activities if a['status'] == 'Failed']
print(f"Pipeline failed. Failed activities: {len(failed_activities)}")
for activity in failed_activities:
print(f"- {activity['activityName']}: {activity.get('error', {}).get('message', 'Unknown error')}")
raise AzureSynapsePipelineRunException(f"Pipeline failed with {len(failed_activities)} failed activities")
except AzureSynapsePipelineRunException:
raise
except Exception as e:
print(f"Pipeline error: {e}")
raise

def test_synapse_connections():
"""Test Synapse Analytics connections and capabilities."""
# Test Spark hook
try:
spark_hook = AzureSynapseHook(azure_synapse_conn_id='synapse_conn')
success, message = spark_hook.test_connection()
if success:
print("Synapse Spark connection successful")
# List available pools
pools = spark_hook.list_spark_pools()
print(f"Available Spark pools: {[p['name'] for p in pools]}")
else:
print(f"Synapse Spark connection failed: {message}")
except Exception as e:
print(f"Synapse Spark connection test failed: {e}")
# Test Pipeline hook
try:
pipeline_hook = AzureSynapsePipelineHook(azure_synapse_conn_id='synapse_conn')
success, message = pipeline_hook.test_connection()
if success:
print("Synapse Pipeline connection successful")
# List available pipelines
pipelines = pipeline_hook.list_pipelines()
print(f"Available pipelines: {[p['name'] for p in pipelines]}")
else:
print(f"Synapse Pipeline connection failed: {message}")
except Exception as e:
print(f"Synapse Pipeline connection test failed: {e}")def optimize_spark_configuration():
"""Demonstrate Spark optimization techniques for Synapse."""
# Optimized configuration for different workload types
# ETL/Data Processing workload
etl_config = {
"executorCount": 8,
"executorCores": 4,
"executorMemory": "16g",
"driverCores": 2,
"driverMemory": "8g",
"conf": {
"spark.sql.adaptive.enabled": "true",
"spark.sql.adaptive.coalescePartitions.enabled": "true",
"spark.sql.adaptive.skewJoin.enabled": "true",
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.sql.execution.arrow.pyspark.enabled": "true"
}
}
# Machine Learning workload
ml_config = {
"executorCount": 6,
"executorCores": 6,
"executorMemory": "24g",
"driverCores": 4,
"driverMemory": "12g",
"conf": {
"spark.ml.stage.parallelism": "6",
"spark.sql.execution.arrow.maxRecordsPerBatch": "10000",
"spark.sql.adaptive.enabled": "true",
"spark.sql.adaptive.localShuffleReader.enabled": "true"
}
}
# Streaming workload
streaming_config = {
"executorCount": 4,
"executorCores": 2,
"executorMemory": "8g",
"driverMemory": "4g",
"conf": {
"spark.streaming.backpressure.enabled": "true",
"spark.sql.streaming.checkpointLocation": "/checkpoint/path",
"spark.sql.adaptive.enabled": "false" # Disable for streaming
}
}
return {
'etl': etl_config,
'ml': ml_config,
'streaming': streaming_config
}
def implement_spark_monitoring():
"""Implement comprehensive Spark job monitoring."""
def monitor_spark_job_detailed(job_id: int):
"""Detailed monitoring of Spark job execution."""
hook = AzureSynapseHook(azure_synapse_conn_id='synapse_conn')
monitoring_data = {
'job_id': job_id,
'status_history': [],
'duration_seconds': 0,
'resource_usage': {}
}
start_time = datetime.now()
while True:
current_status = hook.get_job_run_status(job_id)
monitoring_data['status_history'].append({
'timestamp': datetime.now(),
'status': current_status
})
# Terminal statuses
if current_status in ['success', 'error', 'dead', 'killed']:
break
# Check for stuck jobs
monitoring_data['duration_seconds'] = (datetime.now() - start_time).total_seconds()
if monitoring_data['duration_seconds'] > 3600: # 1 hour
print("Job appears to be stuck, investigating...")
logs = hook.get_job_logs(job_id)
if 'OutOfMemoryError' in logs.get('stderr', ''):
print("OutOfMemoryError detected - job needs more memory")
time.sleep(30) # Check every 30 seconds
# Get final logs and metrics
final_logs = hook.get_job_logs(job_id)
monitoring_data['final_logs'] = final_logs
monitoring_data['final_status'] = current_status
return monitoring_data
return monitor_spark_job_detailed

This comprehensive documentation covers all Azure Synapse Analytics capabilities in the Apache Airflow Microsoft Azure Provider, including Spark job execution, pipeline orchestration, monitoring, and performance optimization techniques.
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-microsoft-azure