# Azure Data Factory

Execute and monitor Azure Data Factory pipelines, with support for pipeline execution, run monitoring, status tracking, and factory management. Both synchronous and asynchronous operation modes are available for long-running pipeline executions.

## Capabilities

### Data Factory Hook

Primary interface for Azure Data Factory operations, providing authenticated connections and pipeline management functionality.
```python { .api }
class AzureDataFactoryHook(BaseHook):
    """
    Hook for Azure Data Factory operations.

    Provides methods for pipeline execution, monitoring, and factory management.
    Supports multiple authentication methods and connection configurations.
    """

    def get_conn(self) -> DataFactoryManagementClient:
        """Get an authenticated Azure Data Factory Management client."""

    def refresh_conn(self) -> DataFactoryManagementClient:
        """Refresh the Data Factory Management client connection."""

    def get_factory(
        self,
        resource_group_name: str,
        factory_name: str,
        **config: Any,
    ) -> Factory | None:
        """
        Get Azure Data Factory details.

        Args:
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            **config: Additional configuration parameters

        Returns:
            Factory: Data Factory details, or None if not found
        """

    def update_factory(
        self,
        factory: Factory,
        resource_group_name: str,
        factory_name: str,
        **config: Any,
    ) -> Factory:
        """
        Update Azure Data Factory configuration.

        Args:
            factory (Factory): Factory configuration object
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            **config: Additional configuration parameters

        Returns:
            Factory: Updated factory details
        """

    def create_factory(
        self,
        factory: Factory,
        resource_group_name: str,
        factory_name: str,
        **config: Any,
    ) -> Factory:
        """
        Create a new Azure Data Factory.

        Args:
            factory (Factory): Factory configuration object
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            **config: Additional configuration parameters

        Returns:
            Factory: Created factory details
        """

    def delete_factory(
        self,
        resource_group_name: str,
        factory_name: str,
        **config: Any,
    ) -> None:
        """
        Delete an Azure Data Factory.

        Args:
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name to delete
            **config: Additional configuration parameters
        """

    def run_pipeline(
        self,
        pipeline_name: str,
        resource_group_name: str,
        factory_name: str,
        reference_pipeline_run_id: str | None = None,
        is_recovery: bool | None = None,
        start_activity_name: str | None = None,
        start_from_failure: bool | None = None,
        parameters: dict[str, Any] | None = None,
        **config: Any,
    ) -> CreateRunResponse:
        """
        Execute an Azure Data Factory pipeline.

        Args:
            pipeline_name (str): Name of the pipeline to execute
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            reference_pipeline_run_id (str): Reference run ID for recovery
            is_recovery (bool): Whether this is a recovery run
            start_activity_name (str): Activity to start from
            start_from_failure (bool): Start from the previous failure point
            parameters (dict): Pipeline parameters
            **config: Additional configuration parameters

        Returns:
            CreateRunResponse: Pipeline run details, including the run ID
        """

    def get_pipeline_run(
        self,
        run_id: str,
        resource_group_name: str,
        factory_name: str,
        **config: Any,
    ) -> PipelineRun:
        """
        Get pipeline run details and status.

        Args:
            run_id (str): Pipeline run ID
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            **config: Additional configuration parameters

        Returns:
            PipelineRun: Pipeline run details and status
        """

    def get_pipeline_run_status(
        self,
        run_id: str,
        resource_group_name: str,
        factory_name: str,
        **config: Any,
    ) -> str:
        """
        Get the current status of a pipeline run.

        Args:
            run_id (str): Pipeline run ID
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            **config: Additional configuration parameters

        Returns:
            str: Current pipeline run status
        """

    def wait_for_pipeline_run_status(
        self,
        run_id: str,
        expected_statuses: str | set[str],
        resource_group_name: str,
        factory_name: str,
        check_interval: int = 60,
        timeout: int = 60 * 60 * 24 * 7,
        **config: Any,
    ) -> bool:
        """
        Wait for a pipeline run to reach an expected status.

        Args:
            run_id (str): Pipeline run ID
            expected_statuses (str | set[str]): Expected status(es) to wait for
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            check_interval (int): Polling interval in seconds
            timeout (int): Maximum wait time in seconds
            **config: Additional configuration parameters

        Returns:
            bool: True if an expected status was reached, False on timeout
        """

    def cancel_pipeline_run(
        self,
        run_id: str,
        resource_group_name: str,
        factory_name: str,
        **config: Any,
    ) -> None:
        """
        Cancel a running pipeline.

        Args:
            run_id (str): Pipeline run ID to cancel
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            **config: Additional configuration parameters
        """

    def test_connection(self) -> tuple[bool, str]:
        """Test the Data Factory connection."""
```
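
The `wait_for_pipeline_run_status` method is essentially a poll loop around `get_pipeline_run_status`. The sketch below illustrates that logic only: `get_status` stands in for the real hook call, and `wait_for_status`, the `TERMINAL` set, and the shortened intervals are illustrative, not the provider's implementation.

```python
import time

# Statuses from which a run can no longer progress.
TERMINAL = {"Succeeded", "Failed", "Cancelled"}

def wait_for_status(get_status, expected, check_interval=1.0, timeout=10.0):
    """Poll get_status() until it returns a value in `expected`, or time out."""
    expected = {expected} if isinstance(expected, str) else set(expected)
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status()
        if status in expected:
            return True
        if status in TERMINAL:
            # Reached a terminal state that is not the one we wanted.
            return False
        time.sleep(check_interval)
    return False

# Simulate a run that goes Queued -> InProgress -> Succeeded.
statuses = iter(["Queued", "InProgress", "Succeeded"])
print(wait_for_status(lambda: next(statuses), "Succeeded", check_interval=0.01))  # prints: True
```

Short-circuiting on terminal states matters in practice: without it, a failed run would keep the loop spinning until the full timeout elapses.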

### Async Data Factory Hook

Asynchronous version of the hook for non-blocking operations.

```python { .api }
class AzureDataFactoryAsyncHook(AzureDataFactoryHook):
    """Async hook for Azure Data Factory operations."""

    async def get_conn(self) -> DataFactoryManagementClient:
        """Get an authenticated async Data Factory Management client."""

    async def get_pipeline_run_status(
        self,
        run_id: str,
        resource_group_name: str,
        factory_name: str,
    ) -> str:
        """Get a pipeline run's status asynchronously."""
```
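
With the async hook, the same status polling can run on the event loop without blocking a worker thread. A minimal sketch, where `fake_fetch` stands in for `AzureDataFactoryAsyncHook.get_pipeline_run_status` (the helper names here are illustrative):

```python
import asyncio

async def wait_for_status(fetch_status, expected: set, check_interval=0.01):
    """Await fetch_status() until it returns a value in `expected`."""
    while True:
        status = await fetch_status()
        if status in expected:
            return status
        await asyncio.sleep(check_interval)

async def main():
    statuses = iter(["Queued", "InProgress", "Succeeded"])

    async def fake_fetch():
        # Stand-in for: await hook.get_pipeline_run_status(run_id, rg, factory)
        return next(statuses)

    return await wait_for_status(fake_fetch, {"Succeeded", "Failed"})

print(asyncio.run(main()))  # prints: Succeeded
```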

### Pipeline Run Operator

Operator for executing Azure Data Factory pipelines.

```python { .api }
class AzureDataFactoryRunPipelineOperator(BaseOperator):
    """
    Execute an Azure Data Factory pipeline.

    Runs a pipeline and optionally waits for completion with status monitoring.
    """

    def __init__(
        self,
        pipeline_name: str,
        resource_group_name: str,
        factory_name: str,
        azure_data_factory_conn_id: str = "azure_data_factory_default",
        wait_for_termination: bool = True,
        reference_pipeline_run_id: str | None = None,
        is_recovery: bool | None = None,
        start_activity_name: str | None = None,
        start_from_failure: bool | None = None,
        parameters: dict[str, Any] | None = None,
        timeout: int = 60 * 60 * 24 * 7,
        check_interval: int = 60,
        deferrable: bool = False,
        **kwargs,
    ):
        """
        Initialize the Data Factory pipeline operator.

        Args:
            pipeline_name (str): Name of the pipeline to execute
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            azure_data_factory_conn_id (str): Airflow connection ID
            wait_for_termination (bool): Wait for pipeline completion
            reference_pipeline_run_id (str): Reference run for recovery
            is_recovery (bool): Whether this is a recovery run
            start_activity_name (str): Activity to start from
            start_from_failure (bool): Start from the failure point
            parameters (dict): Pipeline parameters
            timeout (int): Maximum wait time in seconds
            check_interval (int): Status check interval in seconds
            deferrable (bool): Use deferrable (async) execution
        """
```

### Pipeline Status Sensor

Sensor that monitors Azure Data Factory pipeline execution status.

```python { .api }
class AzureDataFactoryPipelineRunStatusSensor(BaseSensorOperator):
    """
    Sensor that waits for an Azure Data Factory pipeline run to reach a target status.

    Monitors a pipeline run until it reaches one of the specified target statuses
    or times out.
    """

    def __init__(
        self,
        run_id: str,
        resource_group_name: str,
        factory_name: str,
        azure_data_factory_conn_id: str = "azure_data_factory_default",
        target_status: str | list[str] = AzureDataFactoryPipelineRunStatus.SUCCEEDED,
        **kwargs,
    ):
        """
        Initialize the Data Factory pipeline status sensor.

        Args:
            run_id (str): Pipeline run ID to monitor
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            azure_data_factory_conn_id (str): Airflow connection ID
            target_status (str | list[str]): Target status(es) to wait for
        """

    def poke(self, context: dict) -> bool:
        """Check whether the pipeline run has reached the target status."""
```

### Async Pipeline Triggers

Deferrable triggers for pipeline monitoring.

```python { .api }
class AzureDataFactoryTrigger(BaseTrigger):
    """General async trigger for Azure Data Factory operations."""

    def __init__(
        self,
        run_id: str,
        resource_group_name: str,
        factory_name: str,
        conn_id: str,
        end_time: datetime,
        check_interval: int = 60,
        **kwargs,
    ):
        """
        Initialize the Data Factory trigger.

        Args:
            run_id (str): Pipeline run ID
            resource_group_name (str): Resource group name
            factory_name (str): Factory name
            conn_id (str): Connection ID
            end_time (datetime): Maximum end time
            check_interval (int): Polling interval in seconds
        """


class ADFPipelineRunStatusSensorTrigger(BaseTrigger):
    """Async trigger for ADF pipeline run status monitoring."""

    def __init__(
        self,
        run_id: str,
        resource_group_name: str,
        factory_name: str,
        target_status: str | list[str],
        conn_id: str,
        poke_interval: int = 60,
        **kwargs,
    ):
        """
        Initialize the pipeline status sensor trigger.

        Args:
            run_id (str): Pipeline run ID
            resource_group_name (str): Resource group name
            factory_name (str): Factory name
            target_status (str | list[str]): Target status(es)
            conn_id (str): Connection ID
            poke_interval (int): Polling interval in seconds
        """
```
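
Deferrable triggers follow Airflow's standard contract: `serialize` returns a classpath and constructor kwargs so the triggerer process can rehydrate the object, and `run` is an async generator that yields a `TriggerEvent` when the watched condition is met. The sketch below is a simplified illustration of that pattern, not the provider's code: it uses local stand-ins for `TriggerEvent` (the real class lives in `airflow.triggers.base`) and injects a `fetch_status` coroutine where the real trigger would call the async hook.

```python
import asyncio

class TriggerEvent:  # stand-in for airflow.triggers.base.TriggerEvent
    def __init__(self, payload):
        self.payload = payload

class StatusTrigger:  # simplified, hypothetical stand-in for ADFPipelineRunStatusSensorTrigger
    def __init__(self, run_id, target_status, poke_interval=0.01):
        self.run_id = run_id
        self.target_status = target_status
        self.poke_interval = poke_interval

    def serialize(self):
        # (classpath, kwargs) -- lets the triggerer re-create this object.
        return (
            "example.StatusTrigger",
            {"run_id": self.run_id, "target_status": self.target_status},
        )

    async def run(self, fetch_status):
        # The real trigger's run() takes no arguments and calls the async hook;
        # here the status source is injected to keep the sketch self-contained.
        while True:
            status = await fetch_status()
            if status == self.target_status:
                yield TriggerEvent({"run_id": self.run_id, "status": status})
                return
            await asyncio.sleep(self.poke_interval)

async def demo():
    statuses = iter(["InProgress", "Succeeded"])

    async def fake_fetch():
        return next(statuses)

    trigger = StatusTrigger("run-123", "Succeeded")
    async for event in trigger.run(fake_fetch):
        return event.payload

print(asyncio.run(demo()))
```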

### Pipeline Run Link

Extra link for viewing pipeline runs in the Azure portal.

```python { .api }
class AzureDataFactoryPipelineRunLink(BaseOperatorLink):
    """Link to an Azure Data Factory pipeline run in the Azure portal."""

    name: str = "Monitor Pipeline Run"

    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str:
        """Get the URL of the pipeline run in the Azure portal."""
```

### Status Constants and Exceptions

```python { .api }
class AzureDataFactoryPipelineRunStatus:
    """Azure Data Factory pipeline run status constants."""

    QUEUED: str = "Queued"
    IN_PROGRESS: str = "InProgress"
    SUCCEEDED: str = "Succeeded"
    FAILED: str = "Failed"
    CANCELLING: str = "Cancelling"
    CANCELLED: str = "Cancelled"


class AzureDataFactoryPipelineRunException(AirflowException):
    """Exception raised for Data Factory pipeline run errors."""
```
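
Since the status constants are plain strings, they compose naturally into sets for terminal-state checks. A small sketch (the constants are restated locally so the snippet runs standalone; the `TERMINAL_STATUSES` set and `is_terminal` helper are illustrative):

```python
class AzureDataFactoryPipelineRunStatus:
    # Restated from the provider for a self-contained example.
    QUEUED = "Queued"
    IN_PROGRESS = "InProgress"
    SUCCEEDED = "Succeeded"
    FAILED = "Failed"
    CANCELLING = "Cancelling"
    CANCELLED = "Cancelled"

# States from which a run cannot transition further.
TERMINAL_STATUSES = {
    AzureDataFactoryPipelineRunStatus.SUCCEEDED,
    AzureDataFactoryPipelineRunStatus.FAILED,
    AzureDataFactoryPipelineRunStatus.CANCELLED,
}

def is_terminal(status: str) -> bool:
    """Return True once a run can no longer change state."""
    return status in TERMINAL_STATUSES

print(is_terminal("InProgress"))  # prints: False
print(is_terminal("Succeeded"))   # prints: True
```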

## Usage Examples

### Basic Pipeline Execution

```python
from airflow.providers.microsoft.azure.hooks.data_factory import AzureDataFactoryHook

# Initialize the hook
adf_hook = AzureDataFactoryHook(azure_data_factory_conn_id='adf_default')

# Run a pipeline
run_response = adf_hook.run_pipeline(
    pipeline_name='MyDataPipeline',
    resource_group_name='my-resource-group',
    factory_name='my-data-factory',
    parameters={'inputPath': '/data/input/', 'outputPath': '/data/output/'},
)

# Get the run ID
run_id = run_response.run_id

# Check the current pipeline status
status = adf_hook.get_pipeline_run_status(
    run_id=run_id,
    resource_group_name='my-resource-group',
    factory_name='my-data-factory',
)

# Wait for completion
success = adf_hook.wait_for_pipeline_run_status(
    run_id=run_id,
    expected_statuses={'Succeeded'},
    resource_group_name='my-resource-group',
    factory_name='my-data-factory',
    check_interval=30,
    timeout=1800,  # 30 minutes
)
```

### Using in Airflow DAGs

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor

dag = DAG(
    'adf_pipeline_example',
    default_args={
        'owner': 'data-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Execute ADF pipeline',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
)

# Execute the pipeline
run_pipeline = AzureDataFactoryRunPipelineOperator(
    task_id='run_etl_pipeline',
    pipeline_name='ETL_Pipeline',
    resource_group_name='analytics-rg',
    factory_name='analytics-adf',
    parameters={
        'source_table': 'raw_data',
        'target_table': 'processed_data',
        'batch_date': '{{ ds }}',
    },
    wait_for_termination=False,  # Don't wait; use the sensor instead
    azure_data_factory_conn_id='adf_default',
    dag=dag,
)

# Monitor the run with a sensor; the operator pushes the run ID to XCom
wait_for_completion = AzureDataFactoryPipelineRunStatusSensor(
    task_id='wait_for_pipeline_completion',
    run_id="{{ task_instance.xcom_pull(task_ids='run_etl_pipeline', key='run_id') }}",
    resource_group_name='analytics-rg',
    factory_name='analytics-adf',
    target_status='Succeeded',
    timeout=3600,  # 1 hour timeout
    poke_interval=60,  # Check every minute
    dag=dag,
)

run_pipeline >> wait_for_completion
```

### Deferrable Pipeline Execution

```python
# Use deferrable mode for long-running pipelines
run_pipeline_async = AzureDataFactoryRunPipelineOperator(
    task_id='run_long_pipeline',
    pipeline_name='LongRunningETL',
    resource_group_name='analytics-rg',
    factory_name='analytics-adf',
    deferrable=True,  # Defer to the triggerer instead of occupying a worker slot
    timeout=7200,  # 2 hours
    check_interval=120,  # Check every 2 minutes
    dag=dag,
)
```

## Connection Configuration

Azure Data Factory connections require specific authentication and resource information.

**Connection Type**: `azure_data_factory`

**Required Fields**:
- `resource_group_name`: Azure resource group containing the Data Factory
- `factory_name`: Name of the Azure Data Factory
- `subscription_id`: Azure subscription ID

**Authentication Options**:
- **Service Principal**: Use client credentials
- **Managed Identity**: Use an Azure managed identity
- **DefaultAzureCredential**: Use the Azure SDK default credential chain

**Connection Fields**:
- `client_id`: Service principal client ID
- `client_secret`: Service principal secret
- `tenant_id`: Azure tenant ID
- `subscription_id`: Azure subscription ID
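
Connections can also be supplied via environment variables. Exact extra-field key names vary across provider versions, so treat the snippet below as a sketch: it assembles a hypothetical `AIRFLOW_CONN_ADF_DEFAULT` value with the service-principal fields carried in the JSON `extra` blob (all values and extra keys here are illustrative, not authoritative).

```python
import json
from urllib.parse import quote

# Hypothetical values -- replace with your own.
client_id = "my-client-id"
client_secret = "my-secret"
extra = {
    "tenantId": "my-tenant-id",            # illustrative extra-field key names;
    "subscriptionId": "my-subscription-id",  # check your provider version's docs
    "resource_group_name": "analytics-rg",
    "factory_name": "analytics-adf",
}

# Airflow accepts a URI with the extras packed into a single __extra__ parameter.
conn_uri = (
    f"azure-data-factory://{quote(client_id)}:{quote(client_secret)}@"
    f"?__extra__={quote(json.dumps(extra))}"
)
print(conn_uri.split("?")[0])  # prints: azure-data-factory://my-client-id:my-secret@
```

Exporting this as `AIRFLOW_CONN_ADF_DEFAULT` makes it available under the connection ID `adf_default`.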

## Error Handling

The integration raises exceptions for pipeline execution failures, authentication issues, and resource access problems. All exceptions inherit from standard Airflow exception classes and carry detailed messages for troubleshooting pipeline issues.
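
In task code, the typical pattern is to catch the provider's run exception around blocking calls. The sketch below uses local stand-ins for `AirflowException` and `AzureDataFactoryPipelineRunException` so it runs without Airflow installed, and `check_run` is a hypothetical helper, not a provider API:

```python
class AirflowException(Exception):  # stand-in for airflow.exceptions.AirflowException
    pass

class AzureDataFactoryPipelineRunException(AirflowException):
    """Stand-in mirroring the provider's pipeline-run exception."""

def check_run(status: str) -> str:
    # Raise if the run ended in a non-successful terminal state.
    if status in {"Failed", "Cancelled"}:
        raise AzureDataFactoryPipelineRunException(f"Pipeline run ended with status {status!r}")
    return status

try:
    check_run("Failed")
except AzureDataFactoryPipelineRunException as exc:
    print(f"handled: {exc}")  # prints: handled: Pipeline run ended with status 'Failed'
```

Because the exception subclasses `AirflowException`, an uncaught failure still marks the task as failed in the usual way.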

Together, these hooks, operators, sensors, and triggers support complex ETL workflows in Airflow, with parameter passing, recovery runs, deferrable execution, and pipeline run monitoring.