# Azure Synapse Analytics

Execute Spark jobs and manage pipeline operations on Azure Synapse Analytics for big data processing and analytics workloads. Provides integration for both Spark batch processing and pipeline orchestration.

## Capabilities

### Base Synapse Hook

Foundation hook for Azure Synapse Analytics operations, providing common functionality and connection management.

```python { .api }
class BaseAzureSynapseHook(BaseHook):
    """
    Base hook for Azure Synapse Analytics operations.

    Provides common functionality and connection management for Synapse
    Spark and pipeline operations.
    """

    def get_conn(self) -> Any:
        """
        Get authenticated Azure Synapse client.

        Returns:
            Any: Synapse client instance
        """

    def test_connection(self) -> tuple[bool, str]:
        """
        Test the Azure Synapse Analytics connection.

        Returns:
            tuple[bool, str]: Success status and message
        """
```
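
A minimal sketch of exercising the shared connection plumbing through a concrete subclass, assuming the `azure_synapse_conn_id` constructor argument and the `synapse_conn` connection ID used in the usage examples later in this document:

```python
from airflow.providers.microsoft.azure.hooks.synapse import AzureSynapseHook

# Hedged sketch: verify connectivity before submitting any work. Assumes a
# connection named 'synapse_conn' is already configured in Airflow.
hook = AzureSynapseHook(azure_synapse_conn_id="synapse_conn")
ok, message = hook.test_connection()
if not ok:
    raise RuntimeError(f"Synapse connection check failed: {message}")
print(f"Connection OK: {message}")
```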

### Synapse Spark Hook

Hook for Azure Synapse Spark operations providing Spark job execution and monitoring capabilities.

```python { .api }
class AzureSynapseHook(BaseAzureSynapseHook):
    """
    Hook for Azure Synapse Spark operations.

    Provides methods for submitting Spark jobs, monitoring execution,
    and managing Spark batch sessions on Azure Synapse Analytics.
    """

    def get_conn(self) -> SparkClient:
        """
        Get authenticated Synapse Spark client.

        Returns:
            SparkClient: Synapse Spark client instance
        """

    def run_spark_job(
        self,
        payload: dict[str, Any],
        **kwargs: Any
    ) -> dict[str, Any]:
        """
        Submit a Spark job to Azure Synapse Analytics.

        Args:
            payload (dict[str, Any]): Spark job configuration including:
                - name: Job name
                - file: Main application file (jar, py, etc.)
                - className: Main class name (for Scala/Java)
                - args: Application arguments
                - conf: Spark configuration
                - executorCount: Number of executors
                - executorCores: Cores per executor
                - executorMemory: Memory per executor
                - driverCores: Driver cores
                - driverMemory: Driver memory
            **kwargs: Additional job submission parameters

        Returns:
            dict[str, Any]: Job submission response with job ID and status
        """

    def get_job_run_status(
        self,
        job_id: int,
        **kwargs: Any
    ) -> str:
        """
        Get the current status of a Spark job.

        Args:
            job_id (int): Spark job ID
            **kwargs: Additional parameters

        Returns:
            str: Current job status (not_started, starting, running, idle, busy,
                shutting_down, error, dead, killed, success)
        """

    def wait_for_job_run_status(
        self,
        job_id: int,
        expected_statuses: list[str],
        check_interval: int = 30,
        timeout: int = 3600
    ) -> bool:
        """
        Wait for Spark job to reach expected status.

        Args:
            job_id (int): Spark job ID
            expected_statuses (list[str]): List of acceptable statuses
            check_interval (int): Check interval in seconds (default: 30)
            timeout (int): Timeout in seconds (default: 3600)

        Returns:
            bool: True if job reached expected status, False if timeout
        """

    def cancel_job_run(
        self,
        job_id: int,
        **kwargs: Any
    ) -> None:
        """
        Cancel a running Spark job.

        Args:
            job_id (int): Spark job ID to cancel
            **kwargs: Additional parameters
        """

    def get_job_logs(
        self,
        job_id: int,
        **kwargs: Any
    ) -> dict[str, Any]:
        """
        Get logs for a Spark job.

        Args:
            job_id (int): Spark job ID
            **kwargs: Additional parameters

        Returns:
            dict[str, Any]: Job logs including stdout, stderr, and driver logs
        """

    def list_spark_pools(self) -> list[dict[str, Any]]:
        """
        List available Spark pools in the workspace.

        Returns:
            list[dict[str, Any]]: List of Spark pool configurations
        """

    def get_spark_pool_details(
        self,
        spark_pool_name: str
    ) -> dict[str, Any]:
        """
        Get details of a specific Spark pool.

        Args:
            spark_pool_name (str): Name of the Spark pool

        Returns:
            dict[str, Any]: Spark pool configuration and status
        """
```
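
A hedged sketch of submitting a small batch and polling it to a terminal state with the methods summarized above; the storage path and connection ID are placeholders, and the `job_id` key follows the return description of `run_spark_job` rather than a confirmed schema:

```python
from airflow.providers.microsoft.azure.hooks.synapse import AzureSynapseHook

# Submit a small PySpark batch and wait for it to finish (illustrative only).
hook = AzureSynapseHook(azure_synapse_conn_id="synapse_conn")
submission = hook.run_spark_job(
    payload={
        "name": "adhoc-validation",
        "file": "abfss://data@mystorageaccount.dfs.core.windows.net/scripts/validate.py",
        "executorCount": 2,
        "executorCores": 2,
        "executorMemory": "4g",
        "driverMemory": "2g",
    }
)
job_id = submission["job_id"]  # assumed key, per the return description above
finished = hook.wait_for_job_run_status(
    job_id=job_id,
    expected_statuses=["success", "error", "dead", "killed"],
    check_interval=30,
    timeout=1800,
)
if not finished or hook.get_job_run_status(job_id) != "success":
    # Surface stderr to help diagnose the failure.
    print(hook.get_job_logs(job_id).get("stderr", "no stderr captured"))
```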

### Synapse Pipeline Hook

Hook for Azure Synapse Pipeline operations providing pipeline execution and monitoring capabilities.

```python { .api }
class AzureSynapsePipelineHook(BaseAzureSynapseHook):
    """
    Hook for Azure Synapse Pipeline operations.

    Provides methods for running pipelines, monitoring execution,
    and managing pipeline runs on Azure Synapse Analytics.
    """

    def get_conn(self) -> ArtifactsClient:
        """
        Get authenticated Synapse artifacts client.

        Returns:
            ArtifactsClient: Synapse artifacts client instance
        """

    def run_pipeline(
        self,
        pipeline_name: str,
        **config: Any
    ) -> CreateRunResponse:
        """
        Run a pipeline in Azure Synapse Analytics.

        Args:
            pipeline_name (str): Name of the pipeline to run
            **config: Pipeline run configuration including:
                - parameters: Pipeline parameters
                - reference_pipeline_run_id: Reference run ID
                - is_recovery: Whether this is a recovery run

        Returns:
            CreateRunResponse: Pipeline run response with run ID
        """

    def get_pipeline_run(
        self,
        run_id: str,
        **kwargs: Any
    ) -> PipelineRun:
        """
        Get details of a pipeline run.

        Args:
            run_id (str): Pipeline run ID
            **kwargs: Additional parameters

        Returns:
            PipelineRun: Pipeline run details including status and metadata
        """

    def get_pipeline_run_status(
        self,
        run_id: str,
        **kwargs: Any
    ) -> str:
        """
        Get the current status of a pipeline run.

        Args:
            run_id (str): Pipeline run ID
            **kwargs: Additional parameters

        Returns:
            str: Current pipeline status (Queued, InProgress, Succeeded, Failed, Cancelled)
        """

    def refresh_conn(self) -> ArtifactsClient:
        """
        Refresh the Synapse artifacts connection.

        Returns:
            ArtifactsClient: Refreshed artifacts client instance
        """

    def wait_for_pipeline_run_status(
        self,
        run_id: str,
        expected_statuses: list[str],
        check_interval: int = 60,
        timeout: int = 7200
    ) -> bool:
        """
        Wait for pipeline run to reach expected status.

        Args:
            run_id (str): Pipeline run ID
            expected_statuses (list[str]): List of acceptable statuses
            check_interval (int): Check interval in seconds (default: 60)
            timeout (int): Timeout in seconds (default: 7200)

        Returns:
            bool: True if pipeline reached expected status, False if timeout
        """

    def cancel_run_pipeline(
        self,
        run_id: str,
        **kwargs: Any
    ) -> None:
        """
        Cancel a running pipeline.

        Args:
            run_id (str): Pipeline run ID to cancel
            **kwargs: Additional parameters
        """

    def get_pipeline_activities(
        self,
        run_id: str,
        **kwargs: Any
    ) -> list[dict[str, Any]]:
        """
        Get activity runs for a pipeline run.

        Args:
            run_id (str): Pipeline run ID
            **kwargs: Additional parameters including:
                - activity_name: Filter by activity name
                - activity_type: Filter by activity type

        Returns:
            list[dict[str, Any]]: List of activity run details
        """

    def get_pipeline_details(
        self,
        pipeline_name: str
    ) -> dict[str, Any]:
        """
        Get pipeline definition and metadata.

        Args:
            pipeline_name (str): Name of the pipeline

        Returns:
            dict[str, Any]: Pipeline definition and configuration
        """

    def list_pipelines(self) -> list[dict[str, Any]]:
        """
        List all pipelines in the workspace.

        Returns:
            list[dict[str, Any]]: List of pipeline definitions
        """
```
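
A hedged sketch of triggering a pipeline and waiting for a terminal state with the methods summarized above; the pipeline name, parameters, and connection ID are placeholders:

```python
from airflow.providers.microsoft.azure.hooks.synapse import AzureSynapsePipelineHook

# Trigger a pipeline run and report failed activities (illustrative only).
hook = AzureSynapsePipelineHook(azure_synapse_conn_id="synapse_conn")
run = hook.run_pipeline(
    "data-processing-etl",
    parameters={"processingDate": "2024-01-01"},
)
reached = hook.wait_for_pipeline_run_status(
    run_id=run.run_id,
    expected_statuses=["Succeeded", "Failed", "Cancelled"],
    check_interval=60,
    timeout=3600,
)
status = hook.get_pipeline_run_status(run.run_id)
if not reached or status != "Succeeded":
    for activity in hook.get_pipeline_activities(run.run_id):
        print(activity.get("activityName"), activity.get("status"))
```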

## Synapse Analytics Operators

Execute Azure Synapse Analytics operations as Airflow tasks, covering both Spark batch execution and pipeline management.

### Spark Batch Operator

```python { .api }
class AzureSynapseRunSparkBatchOperator(BaseOperator):
    """
    Runs Spark batch jobs on Azure Synapse Analytics.

    Supports running Spark applications with custom configurations,
    resource allocation, and dependency management.
    """

    def __init__(
        self,
        *,
        azure_synapse_conn_id: str = "azure_synapse_default",
        spark_pool_name: str,
        payload: dict[str, Any],
        timeout: int = 60 * 60 * 24 * 7,
        check_interval: int = 60,
        **kwargs
    ):
        """
        Initialize Synapse Spark batch operator.

        Args:
            azure_synapse_conn_id (str): Airflow connection ID for Synapse
            spark_pool_name (str): Name of the Spark pool to use
            payload (dict[str, Any]): Spark job configuration
            timeout (int): Job timeout in seconds (default: 7 days)
            check_interval (int): Status check interval in seconds (default: 60)
        """

    def execute(self, context: Context) -> dict[str, Any]:
        """
        Execute Spark batch job on Synapse Analytics.

        Args:
            context (Context): Airflow task context

        Returns:
            dict[str, Any]: Job execution results and metadata
        """

    def on_kill(self) -> None:
        """Cancel running Spark job on task termination."""


class AzureSynapseRunPipelineOperator(BaseOperator):
    """
    Runs pipelines on Azure Synapse Analytics.

    Supports executing Synapse pipelines with parameter passing
    and comprehensive monitoring capabilities.
    """

    def __init__(
        self,
        *,
        pipeline_name: str,
        azure_synapse_conn_id: str = "azure_synapse_default",
        pipeline_timeout: int = 60 * 60 * 24 * 7,
        check_interval: int = 60,
        **pipeline_run_parameters: Any
    ):
        """
        Initialize Synapse pipeline operator.

        Args:
            pipeline_name (str): Name of the pipeline to run
            azure_synapse_conn_id (str): Airflow connection ID for Synapse
            pipeline_timeout (int): Pipeline timeout in seconds (default: 7 days)
            check_interval (int): Status check interval in seconds (default: 60)
            **pipeline_run_parameters: Parameters to pass to the pipeline
        """

    def execute(self, context: Context) -> str:
        """
        Execute pipeline on Synapse Analytics.

        Args:
            context (Context): Airflow task context

        Returns:
            str: Pipeline run ID
        """

    def on_kill(self) -> None:
        """Cancel running pipeline on task termination."""
```

## Supporting Classes

### Status Constants and Exceptions

```python { .api }
class AzureSynapseSparkBatchRunStatus:
    """Constants for Synapse Spark job statuses."""

    NOT_STARTED: str = "not_started"
    STARTING: str = "starting"
    RUNNING: str = "running"
    IDLE: str = "idle"
    BUSY: str = "busy"
    SHUTTING_DOWN: str = "shutting_down"
    ERROR: str = "error"
    DEAD: str = "dead"
    KILLED: str = "killed"
    SUCCESS: str = "success"


class AzureSynapsePipelineRunStatus:
    """Constants for Synapse pipeline run statuses."""

    QUEUED: str = "Queued"
    IN_PROGRESS: str = "InProgress"
    SUCCEEDED: str = "Succeeded"
    FAILED: str = "Failed"
    CANCELLED: str = "Cancelled"


class AzureSynapsePipelineRunException(Exception):
    """Custom exception for Synapse pipeline operations."""
    pass
```
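
Using the constants instead of bare strings keeps status checks aligned with the hook API above. A brief hedged sketch, assuming the status classes are importable alongside the hooks; the terminal-state grouping is built here for convenience, not an attribute the classes expose:

```python
from airflow.providers.microsoft.azure.hooks.synapse import (
    AzureSynapseHook,
    AzureSynapseSparkBatchRunStatus,
)

# Terminal Spark batch states, grouped locally (assumption: this list is not
# shipped with the provider, so we assemble it from the constants above).
TERMINAL_SPARK_STATES = [
    AzureSynapseSparkBatchRunStatus.SUCCESS,
    AzureSynapseSparkBatchRunStatus.ERROR,
    AzureSynapseSparkBatchRunStatus.DEAD,
    AzureSynapseSparkBatchRunStatus.KILLED,
]

hook = AzureSynapseHook(azure_synapse_conn_id="synapse_conn")
hook.wait_for_job_run_status(job_id=42, expected_statuses=TERMINAL_SPARK_STATES)
```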

### Extra Links

```python { .api }
class AzureSynapsePipelineRunLink(BaseOperatorLink):
    """
    Link to Synapse pipeline run in Azure portal.

    Provides direct access to pipeline run details and monitoring
    in the Azure Synapse Studio interface.
    """

    name: str = "Azure Synapse Pipeline Run"

    def get_link(
        self,
        operator: BaseOperator,
        dttm: datetime | None = None,
        **kwargs: Any
    ) -> str:
        """
        Generate link to Azure Synapse pipeline run.

        Args:
            operator (BaseOperator): Airflow operator instance
            dttm (datetime | None): Execution date
            **kwargs: Additional parameters

        Returns:
            str: URL to Synapse pipeline run in Azure portal
        """
```
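
Operators surface a link like this through Airflow's `operator_extra_links` mechanism. A hedged sketch of wiring it onto a custom operator, assuming the link class is importable from the operators module alongside the pipeline operator (the built-in pipeline operator may already register it):

```python
from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.operators.synapse import AzureSynapsePipelineRunLink


class MySynapsePipelineOperator(BaseOperator):
    # Registering the link makes "Azure Synapse Pipeline Run" show up as a
    # button on the task instance view in the Airflow UI.
    operator_extra_links = (AzureSynapsePipelineRunLink(),)
```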

## Usage Examples

### Basic Spark Job Execution

```python
from airflow import DAG
from airflow.providers.microsoft.azure.operators.synapse import AzureSynapseRunSparkBatchOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta


def process_spark_results(**context):
    """Process results from Spark job execution."""
    result = context['task_instance'].xcom_pull(task_ids='run_data_analysis')

    print(f"Spark job result: {result}")
    job_id = result.get('job_id')
    status = result.get('status')

    if status == 'success':
        print(f"Spark job {job_id} completed successfully")
    else:
        print(f"Spark job {job_id} failed with status: {status}")

    return result


dag = DAG(
    'synapse_spark_workflow',
    default_args={
        'owner': 'analytics-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=5)
    },
    description='Synapse Spark data analysis workflow',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Configure Spark job payload
spark_payload = {
    "name": "daily-data-analysis",
    "file": "abfss://data@mystorageaccount.dfs.core.windows.net/scripts/analyze_data.py",
    "args": [
        "--input-path", "abfss://data@mystorageaccount.dfs.core.windows.net/input/",
        "--output-path", "abfss://data@mystorageaccount.dfs.core.windows.net/output/",
        "--date", "{{ ds }}"
    ],
    "conf": {
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
    },
    "executorCount": 4,
    "executorCores": 4,
    "executorMemory": "8g",
    "driverCores": 2,
    "driverMemory": "4g"
}

# Run Spark job
run_analysis = AzureSynapseRunSparkBatchOperator(
    task_id='run_data_analysis',
    azure_synapse_conn_id='synapse_conn',
    spark_pool_name='analytics-pool',
    payload=spark_payload,
    timeout=3600,  # 1 hour timeout
    check_interval=30,  # Check every 30 seconds
    dag=dag
)

# Process results
process_results = PythonOperator(
    task_id='process_results',
    python_callable=process_spark_results,
    dag=dag
)

run_analysis >> process_results
```

### Advanced Spark Configuration

```python
from airflow import DAG
from airflow.providers.microsoft.azure.operators.synapse import AzureSynapseRunSparkBatchOperator
from airflow.providers.microsoft.azure.hooks.synapse import AzureSynapseHook
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta


def setup_spark_environment():
    """Set up Spark environment and validate configuration."""
    hook = AzureSynapseHook(azure_synapse_conn_id='synapse_conn')

    # List available Spark pools
    pools = hook.list_spark_pools()
    print(f"Available Spark pools: {[pool['name'] for pool in pools]}")

    # Get details of the target pool
    pool_details = hook.get_spark_pool_details('ml-processing-pool')
    print(f"Pool configuration: {pool_details}")

    return pool_details


def monitor_spark_job(**context):
    """Monitor Spark job execution with detailed logging."""
    hook = AzureSynapseHook(azure_synapse_conn_id='synapse_conn')

    # Get job ID from previous task
    job_result = context['task_instance'].xcom_pull(task_ids='run_ml_training')
    job_id = job_result['job_id']

    # Get detailed logs
    logs = hook.get_job_logs(job_id)

    print("=== Spark Driver Logs ===")
    print(logs.get('driverLogs', 'No driver logs available'))

    print("=== Spark Executor Logs ===")
    print(logs.get('executorLogs', 'No executor logs available'))

    # Final status check
    final_status = hook.get_job_run_status(job_id)
    print(f"Final job status: {final_status}")

    return {
        'job_id': job_id,
        'final_status': final_status,
        'logs_available': bool(logs.get('driverLogs') or logs.get('executorLogs'))
    }


dag = DAG(
    'advanced_spark_ml_workflow',
    default_args={
        'owner': 'ml-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=10)
    },
    description='Advanced Spark ML workflow with monitoring',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Setup environment
setup_env = PythonOperator(
    task_id='setup_environment',
    python_callable=setup_spark_environment,
    dag=dag
)

# Advanced ML training job configuration
ml_payload = {
    "name": "ml-model-training-{{ ds_nodash }}",
    "file": "abfss://ml@mlstorageaccount.dfs.core.windows.net/scripts/train_model.py",
    "className": None,  # Python job
    "args": [
        "--training-data", "abfss://ml@mlstorageaccount.dfs.core.windows.net/data/training/{{ ds }}/",
        "--model-output", "abfss://ml@mlstorageaccount.dfs.core.windows.net/models/{{ ds_nodash }}/",
        "--algorithm", "random_forest",
        "--cross-validation", "5",
        "--feature-selection", "true"
    ],
    "jars": [
        "abfss://ml@mlstorageaccount.dfs.core.windows.net/libs/spark-ml-extended.jar"
    ],
    "pyFiles": [
        "abfss://ml@mlstorageaccount.dfs.core.windows.net/libs/ml_utils.py",
        "abfss://ml@mlstorageaccount.dfs.core.windows.net/libs/feature_engineering.py"
    ],
    "files": [
        "abfss://ml@mlstorageaccount.dfs.core.windows.net/config/ml_config.json"
    ],
    "conf": {
        # Performance tuning
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
        "spark.sql.adaptive.skewJoin.enabled": "true",

        # Memory management
        "spark.sql.execution.arrow.pyspark.enabled": "true",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.sql.execution.arrow.maxRecordsPerBatch": "10000",

        # ML specific configurations
        "spark.ml.stage.parallelism": "4",
        "spark.sql.adaptive.advisoryPartitionSizeInBytes": "128MB",

        # Checkpointing for long-running ML jobs
        "spark.sql.streaming.checkpointLocation": "abfss://ml@mlstorageaccount.dfs.core.windows.net/checkpoints/{{ ds_nodash }}/"
    },
    "executorCount": 8,
    "executorCores": 4,
    "executorMemory": "16g",
    "driverCores": 4,
    "driverMemory": "8g",
    "tags": {
        "project": "ml-pipeline",
        "environment": "production",
        "model_type": "random_forest"
    }
}

# Run ML training job
run_training = AzureSynapseRunSparkBatchOperator(
    task_id='run_ml_training',
    azure_synapse_conn_id='synapse_conn',
    spark_pool_name='ml-processing-pool',
    payload=ml_payload,
    timeout=7200,  # 2 hours timeout for ML training
    check_interval=60,  # Check every minute
    dag=dag
)

# Monitor job execution
monitor_job = PythonOperator(
    task_id='monitor_training_job',
    python_callable=monitor_spark_job,
    dag=dag
)

setup_env >> run_training >> monitor_job
```

### Pipeline Orchestration

```python
from airflow import DAG
from airflow.providers.microsoft.azure.operators.synapse import AzureSynapseRunPipelineOperator
from airflow.providers.microsoft.azure.hooks.synapse import AzureSynapsePipelineHook
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta


def setup_pipeline_parameters():
    """Set up dynamic parameters for pipeline execution."""
    execution_date = datetime.now().strftime('%Y-%m-%d')

    parameters = {
        "processingDate": execution_date,
        "inputDataPath": f"abfss://data@datastorageaccount.dfs.core.windows.net/raw/{execution_date}/",
        "outputDataPath": f"abfss://data@datastorageaccount.dfs.core.windows.net/processed/{execution_date}/",
        "batchSize": "1000",
        "parallelism": "4",
        "retryAttempts": "3"
    }

    print(f"Pipeline parameters: {parameters}")
    return parameters


def monitor_pipeline_activities(**context):
    """Monitor individual activities within the pipeline run."""
    hook = AzureSynapsePipelineHook(azure_synapse_conn_id='synapse_conn')

    # Get pipeline run ID from previous task
    run_id = context['task_instance'].xcom_pull(task_ids='run_etl_pipeline')

    # Get activity runs
    activities = hook.get_pipeline_activities(run_id)

    print(f"Pipeline run {run_id} activities:")
    for activity in activities:
        print(f"- {activity['activityName']}: {activity['status']} "
              f"(Duration: {activity.get('durationInMs', 0)}ms)")

        if activity['status'] == 'Failed':
            print(f"  Error: {activity.get('error', {}).get('message', 'Unknown error')}")

    # Get overall pipeline status
    pipeline_run = hook.get_pipeline_run(run_id)

    return {
        'run_id': run_id,
        'status': pipeline_run.status,
        'duration_ms': pipeline_run.duration_in_ms,
        'activities': len(activities),
        'failed_activities': len([a for a in activities if a['status'] == 'Failed'])
    }


def validate_pipeline_outputs(**context):
    """Validate pipeline execution results."""
    monitoring_result = context['task_instance'].xcom_pull(task_ids='monitor_activities')

    if monitoring_result['failed_activities'] > 0:
        raise ValueError(f"Pipeline has {monitoring_result['failed_activities']} failed activities")

    if monitoring_result['status'] != 'Succeeded':
        raise ValueError(f"Pipeline failed with status: {monitoring_result['status']}")

    print(f"Pipeline validation passed. Duration: {monitoring_result['duration_ms']}ms")
    return monitoring_result


dag = DAG(
    'synapse_pipeline_workflow',
    default_args={
        'owner': 'data-engineering-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    description='Synapse pipeline ETL workflow',
    schedule_interval=timedelta(hours=6),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Setup parameters
setup_params = PythonOperator(
    task_id='setup_parameters',
    python_callable=setup_pipeline_parameters,
    dag=dag
)

# Run ETL pipeline
run_pipeline = AzureSynapseRunPipelineOperator(
    task_id='run_etl_pipeline',
    pipeline_name='data-processing-etl',
    azure_synapse_conn_id='synapse_conn',
    pipeline_timeout=3600,  # 1 hour timeout
    check_interval=30,  # Check every 30 seconds
    # Dynamic parameters from previous task
    processingDate="{{ task_instance.xcom_pull(task_ids='setup_parameters')['processingDate'] }}",
    inputDataPath="{{ task_instance.xcom_pull(task_ids='setup_parameters')['inputDataPath'] }}",
    outputDataPath="{{ task_instance.xcom_pull(task_ids='setup_parameters')['outputDataPath'] }}",
    batchSize="{{ task_instance.xcom_pull(task_ids='setup_parameters')['batchSize'] }}",
    dag=dag
)

# Monitor activities
monitor_activities = PythonOperator(
    task_id='monitor_activities',
    python_callable=monitor_pipeline_activities,
    dag=dag
)

# Validate outputs
validate_outputs = PythonOperator(
    task_id='validate_outputs',
    python_callable=validate_pipeline_outputs,
    dag=dag
)

setup_params >> run_pipeline >> monitor_activities >> validate_outputs
```

### Complex Workflow with Multiple Synapse Services

```python
from airflow import DAG
from airflow.providers.microsoft.azure.operators.synapse import (
    AzureSynapseRunSparkBatchOperator,
    AzureSynapseRunPipelineOperator
)
from airflow.providers.microsoft.azure.hooks.synapse import (
    AzureSynapseHook,
    AzureSynapsePipelineHook
)
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.dummy import DummyOperator
from datetime import datetime, timedelta


def check_data_availability():
    """Check if input data is available for processing."""
    # This would typically check Azure Data Lake or other data sources.
    # For this example, we'll simulate the check.
    import random
    data_available = random.choice([True, False])

    if data_available:
        print("Input data is available, proceeding with processing")
        return 'data_preprocessing'
    else:
        print("Input data not available, skipping processing")
        return 'skip_processing'


def choose_processing_method(**context):
    """Choose between Spark job or pipeline based on data size."""
    # This would typically analyze data characteristics.
    # For this example, we'll simulate the decision.
    import random
    data_size = random.choice(['small', 'large'])

    if data_size == 'large':
        print("Large dataset detected, using Spark batch processing")
        return 'spark_processing'
    else:
        print("Small dataset detected, using pipeline processing")
        return 'pipeline_processing'


dag = DAG(
    'complex_synapse_workflow',
    default_args={
        'owner': 'analytics-platform-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=3)
    },
    description='Complex Synapse workflow with conditional processing',
    schedule_interval=timedelta(hours=4),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Check data availability
check_data = BranchPythonOperator(
    task_id='check_data_availability',
    python_callable=check_data_availability,
    dag=dag
)

# Data preprocessing (common step)
preprocessing_payload = {
    "name": "data-preprocessing-{{ ds_nodash }}",
    "file": "abfss://analytics@analyticsstorage.dfs.core.windows.net/scripts/preprocess.py",
    "args": ["--date", "{{ ds }}", "--validate", "true"],
    "executorCount": 2,
    "executorCores": 2,
    "executorMemory": "4g",
    "driverMemory": "2g"
}

data_preprocessing = AzureSynapseRunSparkBatchOperator(
    task_id='data_preprocessing',
    azure_synapse_conn_id='synapse_conn',
    spark_pool_name='preprocessing-pool',
    payload=preprocessing_payload,
    timeout=1800,
    dag=dag
)

# Choose processing method
choose_method = BranchPythonOperator(
    task_id='choose_processing_method',
    python_callable=choose_processing_method,
    dag=dag
)

# Spark processing branch
spark_payload = {
    "name": "large-data-processing-{{ ds_nodash }}",
    "file": "abfss://analytics@analyticsstorage.dfs.core.windows.net/scripts/spark_processing.py",
    "args": [
        "--input", "abfss://analytics@analyticsstorage.dfs.core.windows.net/preprocessed/{{ ds }}/",
        "--output", "abfss://analytics@analyticsstorage.dfs.core.windows.net/results/{{ ds }}/"
    ],
    "executorCount": 8,
    "executorCores": 4,
    "executorMemory": "16g",
    "driverMemory": "8g",
    "conf": {
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true"
    }
}

spark_processing = AzureSynapseRunSparkBatchOperator(
    task_id='spark_processing',
    azure_synapse_conn_id='synapse_conn',
    spark_pool_name='large-processing-pool',
    payload=spark_payload,
    timeout=3600,
    dag=dag
)

# Pipeline processing branch
pipeline_processing = AzureSynapseRunPipelineOperator(
    task_id='pipeline_processing',
    pipeline_name='small-data-pipeline',
    azure_synapse_conn_id='synapse_conn',
    pipeline_timeout=1800,
    inputPath="abfss://analytics@analyticsstorage.dfs.core.windows.net/preprocessed/{{ ds }}/",
    outputPath="abfss://analytics@analyticsstorage.dfs.core.windows.net/results/{{ ds }}/",
    dag=dag
)

# Skip processing (when no data available)
skip_processing = DummyOperator(
    task_id='skip_processing',
    dag=dag
)

# Join point for all branches
join_processing = DummyOperator(
    task_id='join_processing',
    trigger_rule='none_failed_or_skipped',
    dag=dag
)

# Post-processing
postprocessing_payload = {
    "name": "postprocessing-{{ ds_nodash }}",
    "file": "abfss://analytics@analyticsstorage.dfs.core.windows.net/scripts/postprocess.py",
    "args": [
        "--results-path", "abfss://analytics@analyticsstorage.dfs.core.windows.net/results/{{ ds }}/",
        "--final-output", "abfss://analytics@analyticsstorage.dfs.core.windows.net/final/{{ ds }}/"
    ],
    "executorCount": 2,
    "executorCores": 2,
    "executorMemory": "4g"
}

postprocessing = AzureSynapseRunSparkBatchOperator(
    task_id='postprocessing',
    azure_synapse_conn_id='synapse_conn',
    spark_pool_name='postprocessing-pool',
    payload=postprocessing_payload,
    timeout=900,
    dag=dag
)

# Set up dependencies
check_data >> [data_preprocessing, skip_processing]
data_preprocessing >> choose_method
choose_method >> [spark_processing, pipeline_processing]
[spark_processing, pipeline_processing, skip_processing] >> join_processing
join_processing >> postprocessing
```

## Connection Configuration

### Synapse Analytics Connection (`azure_synapse`)

Configure Azure Synapse Analytics connections in Airflow:

```python
# Connection configuration for Synapse Analytics
{
    "conn_id": "azure_synapse_default",
    "conn_type": "azure_synapse",
    "host": "myworkspace.dev.azuresynapse.net",  # Synapse workspace URL
    "extra": {
        "subscriptionId": "your-subscription-id",
        "resourceGroupName": "your-resource-group",
        "workspaceName": "your-workspace-name",
        "tenantId": "your-tenant-id",
        "clientId": "your-client-id",
        "clientSecret": "your-client-secret"
    }
}
```
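
One common way to register such a connection is through an environment variable. A hedged sketch using Airflow's `AIRFLOW_CONN_{CONN_ID}` convention with a JSON-serialized connection (supported in Airflow 2.3+); all IDs and secrets below are placeholders:

```python
import json
import os

# Export the connection before the scheduler/worker starts so Airflow can
# resolve it by conn_id 'azure_synapse_default'. Field names mirror the
# example above; values are placeholders.
os.environ["AIRFLOW_CONN_AZURE_SYNAPSE_DEFAULT"] = json.dumps(
    {
        "conn_type": "azure_synapse",
        "host": "myworkspace.dev.azuresynapse.net",
        "extra": {
            "tenantId": "your-tenant-id",
            "clientId": "your-client-id",
            "clientSecret": "your-client-secret",
        },
    }
)
```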

### Authentication Methods

Azure Synapse Analytics supports multiple authentication methods; a credential-mapping sketch follows the list:

1. **Service Principal Authentication**:
   ```python
   extra = {
       "tenantId": "your-tenant-id",
       "clientId": "your-client-id",
       "clientSecret": "your-client-secret"
   }
   ```

2. **Managed Identity Authentication**:
   ```python
   extra = {
       "managedIdentityClientId": "your-managed-identity-client-id"
   }
   ```

3. **Azure CLI Authentication**:
   ```python
   extra = {
       "use_azure_cli": True
   }
   ```
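
How these extras translate into Azure credentials is an implementation detail of the hook. For orientation only, a hedged sketch of the rough mapping onto `azure.identity` credential types; the provider's actual resolution logic and precedence may differ:

```python
from azure.identity import (
    AzureCliCredential,
    ClientSecretCredential,
    ManagedIdentityCredential,
)


def credential_from_extra(extra: dict):
    """Rough, illustrative mapping from connection extras to a credential."""
    if extra.get("use_azure_cli"):
        # Reuses whatever identity `az login` established locally.
        return AzureCliCredential()
    if "managedIdentityClientId" in extra:
        # User-assigned managed identity on the host running Airflow.
        return ManagedIdentityCredential(client_id=extra["managedIdentityClientId"])
    # Fall back to an explicit service principal.
    return ClientSecretCredential(
        tenant_id=extra["tenantId"],
        client_id=extra["clientId"],
        client_secret=extra["clientSecret"],
    )
```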

## Error Handling

### Common Exception Patterns

```python
from airflow.providers.microsoft.azure.hooks.synapse import (
    AzureSynapseHook,
    AzureSynapsePipelineHook,
    AzureSynapsePipelineRunException
)


def robust_synapse_operations():
    """Demonstrate error handling patterns for Synapse operations."""

    spark_hook = AzureSynapseHook(azure_synapse_conn_id='synapse_conn')
    pipeline_hook = AzureSynapsePipelineHook(azure_synapse_conn_id='synapse_conn')

    # Spark job error handling
    try:
        job_result = spark_hook.run_spark_job({
            "name": "test-job",
            "file": "test.py"
        })

        job_id = job_result['job_id']

        # Wait for completion with timeout
        success = spark_hook.wait_for_job_run_status(
            job_id=job_id,
            expected_statuses=['success', 'error', 'dead', 'killed'],
            timeout=3600
        )

        if not success:
            print("Job timed out, cancelling...")
            spark_hook.cancel_job_run(job_id)
            raise TimeoutError("Spark job timed out")

        # Check final status
        final_status = spark_hook.get_job_run_status(job_id)
        if final_status != 'success':
            # Get logs for debugging
            logs = spark_hook.get_job_logs(job_id)
            print(f"Job failed with status: {final_status}")
            print(f"Error logs: {logs.get('stderr', 'No error logs')}")
            raise RuntimeError(f"Spark job failed with status: {final_status}")

    except Exception as e:
        print(f"Spark job error: {e}")
        raise

    # Pipeline error handling
    try:
        run_response = pipeline_hook.run_pipeline("test-pipeline")
        run_id = run_response.run_id

        # Monitor pipeline with error handling
        success = pipeline_hook.wait_for_pipeline_run_status(
            run_id=run_id,
            expected_statuses=['Succeeded', 'Failed', 'Cancelled'],
            timeout=7200
        )

        if not success:
            print("Pipeline timed out, attempting to cancel...")
            try:
                pipeline_hook.cancel_run_pipeline(run_id)
            except Exception as cancel_error:
                print(f"Failed to cancel pipeline: {cancel_error}")
            raise TimeoutError("Pipeline timed out")

        # Check final status and get activity details
        pipeline_run = pipeline_hook.get_pipeline_run(run_id)
        if pipeline_run.status == 'Failed':
            activities = pipeline_hook.get_pipeline_activities(run_id)
            failed_activities = [a for a in activities if a['status'] == 'Failed']

            print(f"Pipeline failed. Failed activities: {len(failed_activities)}")
            for activity in failed_activities:
                print(f"- {activity['activityName']}: {activity.get('error', {}).get('message', 'Unknown error')}")

            raise AzureSynapsePipelineRunException(f"Pipeline failed with {len(failed_activities)} failed activities")

    except AzureSynapsePipelineRunException:
        raise
    except Exception as e:
        print(f"Pipeline error: {e}")
        raise
```

### Connection Testing

```python
from airflow.providers.microsoft.azure.hooks.synapse import (
    AzureSynapseHook,
    AzureSynapsePipelineHook,
)


def test_synapse_connections():
    """Test Synapse Analytics connections and capabilities."""

    # Test Spark hook
    try:
        spark_hook = AzureSynapseHook(azure_synapse_conn_id='synapse_conn')
        success, message = spark_hook.test_connection()

        if success:
            print("Synapse Spark connection successful")

            # List available pools
            pools = spark_hook.list_spark_pools()
            print(f"Available Spark pools: {[p['name'] for p in pools]}")
        else:
            print(f"Synapse Spark connection failed: {message}")

    except Exception as e:
        print(f"Synapse Spark connection test failed: {e}")

    # Test Pipeline hook
    try:
        pipeline_hook = AzureSynapsePipelineHook(azure_synapse_conn_id='synapse_conn')
        success, message = pipeline_hook.test_connection()

        if success:
            print("Synapse Pipeline connection successful")

            # List available pipelines
            pipelines = pipeline_hook.list_pipelines()
            print(f"Available pipelines: {[p['name'] for p in pipelines]}")
        else:
            print(f"Synapse Pipeline connection failed: {message}")

    except Exception as e:
        print(f"Synapse Pipeline connection test failed: {e}")
```
1180
1181
## Performance Considerations
1182
1183
### Optimizing Spark Jobs
1184
1185
```python
import time
from datetime import datetime

from airflow.providers.microsoft.azure.hooks.synapse import AzureSynapseHook


def optimize_spark_configuration():
    """Demonstrate Spark optimization techniques for Synapse."""

    # Optimized configuration for different workload types

    # ETL/Data Processing workload
    etl_config = {
        "executorCount": 8,
        "executorCores": 4,
        "executorMemory": "16g",
        "driverCores": 2,
        "driverMemory": "8g",
        "conf": {
            "spark.sql.adaptive.enabled": "true",
            "spark.sql.adaptive.coalescePartitions.enabled": "true",
            "spark.sql.adaptive.skewJoin.enabled": "true",
            "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
            "spark.sql.execution.arrow.pyspark.enabled": "true"
        }
    }

    # Machine Learning workload
    ml_config = {
        "executorCount": 6,
        "executorCores": 6,
        "executorMemory": "24g",
        "driverCores": 4,
        "driverMemory": "12g",
        "conf": {
            "spark.ml.stage.parallelism": "6",
            "spark.sql.execution.arrow.maxRecordsPerBatch": "10000",
            "spark.sql.adaptive.enabled": "true",
            "spark.sql.adaptive.localShuffleReader.enabled": "true"
        }
    }

    # Streaming workload
    streaming_config = {
        "executorCount": 4,
        "executorCores": 2,
        "executorMemory": "8g",
        "driverMemory": "4g",
        "conf": {
            "spark.streaming.backpressure.enabled": "true",
            "spark.sql.streaming.checkpointLocation": "/checkpoint/path",
            "spark.sql.adaptive.enabled": "false"  # Disable for streaming
        }
    }

    return {
        'etl': etl_config,
        'ml': ml_config,
        'streaming': streaming_config
    }


def implement_spark_monitoring():
    """Implement comprehensive Spark job monitoring."""

    def monitor_spark_job_detailed(job_id: int):
        """Detailed monitoring of Spark job execution."""
        hook = AzureSynapseHook(azure_synapse_conn_id='synapse_conn')

        monitoring_data = {
            'job_id': job_id,
            'status_history': [],
            'duration_seconds': 0,
            'resource_usage': {}
        }

        start_time = datetime.now()

        while True:
            current_status = hook.get_job_run_status(job_id)
            monitoring_data['status_history'].append({
                'timestamp': datetime.now(),
                'status': current_status
            })

            # Terminal statuses
            if current_status in ['success', 'error', 'dead', 'killed']:
                break

            # Check for stuck jobs
            monitoring_data['duration_seconds'] = (datetime.now() - start_time).total_seconds()
            if monitoring_data['duration_seconds'] > 3600:  # 1 hour
                print("Job appears to be stuck, investigating...")
                logs = hook.get_job_logs(job_id)
                if 'OutOfMemoryError' in logs.get('stderr', ''):
                    print("OutOfMemoryError detected - job needs more memory")

            time.sleep(30)  # Check every 30 seconds

        # Get final logs and metrics
        final_logs = hook.get_job_logs(job_id)
        monitoring_data['final_logs'] = final_logs
        monitoring_data['final_status'] = current_status

        return monitoring_data

    return monitor_spark_job_detailed
```

This documentation covers the Azure Synapse Analytics capabilities of the Apache Airflow Microsoft Azure Provider: Spark job execution, pipeline orchestration, monitoring, and performance optimization.