Provider package for Microsoft Azure integrations with Apache Airflow
npx @tessl/cli install tessl/pypi-apache-airflow-providers-microsoft-azure@12.6.00
# Apache Airflow Microsoft Azure Provider

A comprehensive Apache Airflow provider package that enables seamless integration with Microsoft Azure cloud services. This provider offers operators, hooks, sensors, and triggers for orchestrating and automating Azure-based workflows across a wide range of Azure services including Azure Batch, Blob Storage, Container Instances, Cosmos DB, Data Explorer, Data Lake Storage, Data Factory, Key Vault, Service Bus, and Synapse Analytics.

## Package Information

- **Package Name**: apache-airflow-providers-microsoft-azure
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-microsoft-azure`
- **Provider Type**: Apache Airflow Provider Package
- **License**: Apache-2.0
- **Supported Azure Services**: 19+ services including Batch, Blob Storage, Container services, Cosmos DB, Data Explorer, Data Factory, File Share, PowerBI, and more
## Core Imports

Base Azure functionality:

```python
from airflow.providers.microsoft.azure.hooks.base_azure import AzureBaseHook
```

Common service-specific imports:

```python
# Azure Blob Storage
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
from airflow.providers.microsoft.azure.operators.wasb_delete_blob import WasbDeleteBlobOperator
from airflow.providers.microsoft.azure.sensors.wasb import WasbBlobSensor

# Azure Data Factory
from airflow.providers.microsoft.azure.hooks.data_factory import AzureDataFactoryHook
from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator

# Azure Cosmos DB
from airflow.providers.microsoft.azure.hooks.cosmos import AzureCosmosDBHook
from airflow.providers.microsoft.azure.operators.cosmos import AzureCosmosInsertDocumentOperator
```

## Basic Usage
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.microsoft.azure.operators.wasb_delete_blob import WasbDeleteBlobOperator
from airflow.providers.microsoft.azure.sensors.wasb import WasbBlobSensor

# Define the DAG
dag = DAG(
    'azure_workflow_example',
    default_args={
        'owner': 'data-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Example Azure workflow',
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

# Wait for a blob to exist
wait_for_file = WasbBlobSensor(
    task_id='wait_for_input_file',
    container_name='input-data',
    blob_name='daily_data.csv',
    wasb_conn_id='wasb_default',
    dag=dag,
)

# Delete the processed blob
cleanup_blob = WasbDeleteBlobOperator(
    task_id='cleanup_processed_file',
    container_name='processed-data',
    blob_name='processed_data.csv',
    wasb_conn_id='wasb_default',
    dag=dag,
)

wait_for_file >> cleanup_blob
```

## Architecture

The Azure provider follows Airflow's standard provider architecture with distinct component types:

- **Hooks**: Authenticated connections to Azure services, handling credentials and API clients
- **Operators**: Task executors that perform actions on Azure resources (create, delete, run, etc.)
- **Sensors**: Monitors that wait for specific conditions in Azure services
- **Triggers**: Async/deferrable components for long-running operations
- **Transfers**: Specialized operators for moving data between Azure and other systems

Authentication is handled by the hooks, which support multiple methods including service principals, managed identities, workload identity federation, and connection strings.

## Capabilities

### Azure Blob Storage (WASB)

Complete Azure Blob Storage integration with extensive blob operations, container management, and data transfer capabilities. Supports both sync and async operations.

```python { .api }
class WasbHook(BaseHook):
    """Hook for Azure Blob Storage operations."""

    def get_conn(self) -> BlobServiceClient: ...
    def check_for_blob(self, container_name: str, blob_name: str) -> bool: ...
    def load_file(self, file_path: str, container_name: str, blob_name: str) -> None: ...
    def load_string(self, string_data: str, container_name: str, blob_name: str) -> None: ...
    def read_file(self, container_name: str, blob_name: str) -> bytes: ...
    def delete_file(self, container_name: str, blob_name: str) -> None: ...
```
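
A minimal usage sketch with the hook above, assuming a configured `wasb_default` connection; the container and paths are placeholders:

```python
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

# Upload a local file only if the target blob does not already exist.
hook = WasbHook(wasb_conn_id="wasb_default")
if not hook.check_for_blob(container_name="reports", blob_name="2024/summary.csv"):
    hook.load_file(
        file_path="/tmp/summary.csv",
        container_name="reports",
        blob_name="2024/summary.csv",
    )
```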

[Azure Blob Storage](./blob-storage.md)

### Azure Data Factory

Execute and monitor Azure Data Factory pipelines with comprehensive pipeline management, run monitoring, and status tracking capabilities.

```python { .api }
class AzureDataFactoryHook(BaseHook):
    """Hook for Azure Data Factory operations."""

    def get_conn(self) -> DataFactoryManagementClient: ...
    def run_pipeline(self, pipeline_name: str, resource_group_name: str, factory_name: str, **config: Any) -> CreateRunResponse: ...
    def get_pipeline_run(self, run_id: str, resource_group_name: str, factory_name: str) -> PipelineRun: ...
    def cancel_pipeline_run(self, run_id: str, resource_group_name: str, factory_name: str) -> None: ...
```
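
In a DAG, pipeline runs are usually triggered with the run-pipeline operator; the resource group, factory, and pipeline names below are placeholders:

```python
from airflow.providers.microsoft.azure.operators.data_factory import (
    AzureDataFactoryRunPipelineOperator,
)

# Trigger an ADF pipeline and wait for it to reach a terminal state.
run_pipeline = AzureDataFactoryRunPipelineOperator(
    task_id="run_adf_pipeline",
    azure_data_factory_conn_id="azure_data_factory_default",
    pipeline_name="daily_ingest",
    resource_group_name="analytics-rg",
    factory_name="my-data-factory",
    wait_for_termination=True,
)
```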

[Azure Data Factory](./data-factory.md)

### Azure Cosmos DB

Comprehensive Azure Cosmos DB integration supporting database and collection management, document operations, and query execution across all Cosmos DB APIs.

```python { .api }
class AzureCosmosDBHook(BaseHook):
    """Hook for Azure Cosmos DB operations."""

    def get_conn(self) -> CosmosClient: ...
    def create_database(self, database_name: str) -> None: ...
    def create_collection(self, collection_name: str, database_name: str) -> None: ...
    def upsert_document(self, document: dict, database_name: str, collection_name: str) -> dict: ...
    def get_document(self, document_id: str, database_name: str, collection_name: str) -> dict: ...
```
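
A short sketch inserting a document from a task; the database and collection names are placeholders and must exist (or be created upstream):

```python
from airflow.providers.microsoft.azure.operators.cosmos import (
    AzureCosmosInsertDocumentOperator,
)

# Write a marker document recording a completed run.
insert_marker = AzureCosmosInsertDocumentOperator(
    task_id="insert_run_marker",
    database_name="airflow",
    collection_name="run_log",
    document={"id": "2024-01-01", "status": "complete"},
    azure_cosmos_conn_id="azure_cosmos_default",
)
```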

[Azure Cosmos DB](./cosmos-db.md)

### Azure Data Lake Storage

Support for both Azure Data Lake Storage Gen1 and Gen2 with file system operations, directory management, and data upload/download capabilities.

```python { .api }
class AzureDataLakeHook(BaseHook):
    """Hook for Azure Data Lake Storage Gen1."""

    def get_conn(self) -> core.AzureDLFileSystem: ...
    def upload_file(self, local_path: str, remote_path: str, overwrite: bool = True) -> None: ...
    def download_file(self, local_path: str, remote_path: str, overwrite: bool = True) -> None: ...
```

```python { .api }
class AzureDataLakeStorageV2Hook(BaseHook):
    """Hook for Azure Data Lake Storage Gen2."""

    def get_conn(self) -> DataLakeServiceClient: ...
    def create_file_system(self, file_system_name: str) -> None: ...
    def upload_file(self, file_system_name: str, file_name: str, file_path: str) -> DataLakeFileClient: ...
```
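
A Gen2 upload sketch, assuming the hook's default `adls_default` connection ID; the file system and paths are placeholders:

```python
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeStorageV2Hook

# Upload a local JSON file into an existing "raw" file system.
hook = AzureDataLakeStorageV2Hook(adls_conn_id="adls_default")
hook.upload_file(
    file_system_name="raw",
    file_name="events/2024-01-01.json",
    file_path="/tmp/events.json",
)
```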

[Azure Data Lake Storage](./data-lake-storage.md)

### Azure Service Bus

Complete Azure Service Bus integration with queue and topic management, message operations, and subscription handling for reliable messaging scenarios.

```python { .api }
class AdminClientHook(BaseAzureServiceBusHook):
    """Hook for Azure Service Bus administrative operations."""

    def create_queue(self, queue_name: str, **kwargs: Any) -> None: ...
    def create_topic(self, topic_name: str, **kwargs: Any) -> None: ...
    def create_subscription(self, topic_name: str, subscription_name: str, **kwargs: Any) -> None: ...
```

```python { .api }
class MessageHook(BaseAzureServiceBusHook):
    """Hook for Azure Service Bus message operations."""

    def send_message(self, queue_name: str, message: str | ServiceBusMessage, **kwargs: Any) -> None: ...
    def receive_message(self, queue_name: str, **kwargs: Any) -> list[ServiceBusReceivedMessage]: ...
```
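
In DAGs, messaging is typically done through the provider's Service Bus operators, which build on these hooks; a sketch with a placeholder queue name:

```python
from airflow.providers.microsoft.azure.operators.asb import (
    AzureServiceBusSendMessageOperator,
)

# Publish a completion notice to a queue using the default Service Bus connection.
notify = AzureServiceBusSendMessageOperator(
    task_id="notify_downstream",
    queue_name="ingest-complete",
    message="daily load finished",
)
```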

[Azure Service Bus](./service-bus.md)

### Azure Container Services

Container orchestration capabilities including Azure Container Instances, Container Registry, and Container Volume management for containerized workloads.

```python { .api }
class AzureContainerInstanceHook(BaseHook):
    """Hook for Azure Container Instances management."""

    def get_conn(self) -> ContainerInstanceManagementClient: ...
    def create_or_update(self, resource_group_name: str, container_group_name: str, container_group: ContainerGroup) -> ContainerGroup: ...
    def get_logs(self, resource_group_name: str, container_group_name: str, container_name: str) -> str: ...
```
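
A common pattern is running a one-off container with `AzureContainerInstancesOperator`; the resource group, region, and image below are placeholders:

```python
from airflow.providers.microsoft.azure.operators.container_instances import (
    AzureContainerInstancesOperator,
)

# Run a public container image as a one-shot ACI container group.
run_container = AzureContainerInstancesOperator(
    task_id="run_etl_container",
    ci_conn_id="azure_default",
    registry_conn_id=None,  # public image; no registry credentials needed
    resource_group="etl-rg",
    name="nightly-etl",
    image="mcr.microsoft.com/azuredocs/aci-helloworld",
    region="westeurope",
)
```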

[Azure Container Services](./container-services.md)

### Azure Synapse Analytics

Execute Spark jobs and manage pipeline operations on Azure Synapse Analytics for big data processing and analytics workloads.

```python { .api }
class AzureSynapseHook(BaseAzureSynapseHook):
    """Hook for Azure Synapse Spark operations."""

    def get_conn(self) -> SparkClient: ...
    def run_spark_job(self, payload: dict) -> dict: ...
    def get_job_run_status(self, job_id: int) -> str: ...
```

```python { .api }
class AzureSynapsePipelineHook(BaseAzureSynapseHook):
    """Hook for Azure Synapse Pipeline operations."""

    def run_pipeline(self, pipeline_name: str, **config: Any) -> CreateRunResponse: ...
    def get_pipeline_run_status(self, run_id: str) -> str: ...
```
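
A hedged sketch against the Spark hook API above, assuming the connection carries the workspace and Spark pool details; the payload fields mirror a Spark batch job request, and all names are placeholders:

```python
from airflow.providers.microsoft.azure.hooks.synapse import AzureSynapseHook

# Submit a PySpark batch job and check its status.
hook = AzureSynapseHook(azure_synapse_conn_id="azure_synapse_default")
job = hook.run_spark_job(
    payload={
        "name": "nightly-aggregation",
        "file": "abfss://jobs@mylake.dfs.core.windows.net/aggregate.py",
        "numExecutors": 2,
    }
)
status = hook.get_job_run_status(job_id=job["id"])
```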

[Azure Synapse Analytics](./synapse-analytics.md)

### Microsoft Graph API

Access the Microsoft Graph API to integrate with Microsoft 365 services, with support for arbitrary Graph endpoints and operations.

```python { .api }
class KiotaRequestAdapterHook(BaseHook):
    """Hook for Microsoft Graph API using Kiota request adapter."""

    def get_conn(self) -> RequestAdapter: ...
    def test_connection(self) -> tuple[bool, str]: ...
```
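
Graph calls are usually made with `MSGraphAsyncOperator`, which builds on this hook; the connection ID below is a placeholder for an Azure AD app registration:

```python
from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator

# List users via the standard "users" Graph endpoint.
list_users = MSGraphAsyncOperator(
    task_id="list_users",
    conn_id="msgraph_default",
    url="users",
)
```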

[Microsoft Graph API](./microsoft-graph.md)

### Data Transfer Operations

Specialized operators for transferring data between Azure services and external systems including local filesystem, SFTP, Oracle databases, and AWS S3.

```python { .api }
class LocalFilesystemToWasbOperator(BaseOperator):
    """Transfer files from local filesystem to Azure Blob Storage."""

    def __init__(self, file_path: str, container_name: str, blob_name: str, **kwargs): ...
```

```python { .api }
class S3ToAzureBlobStorageOperator(BaseOperator):
    """Transfer objects from AWS S3 to Azure Blob Storage."""

    def __init__(self, s3_source_key: str, container_name: str, blob_name: str, **kwargs): ...
```
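
For example, uploading a locally produced file to Blob Storage; the paths and container are placeholders:

```python
from airflow.providers.microsoft.azure.transfers.local_to_wasb import (
    LocalFilesystemToWasbOperator,
)

# Copy a local report into the "reports" container.
upload_report = LocalFilesystemToWasbOperator(
    task_id="upload_report",
    file_path="/tmp/report.csv",
    container_name="reports",
    blob_name="daily/report.csv",
    wasb_conn_id="wasb_default",
)
```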

[Data Transfer Operations](./data-transfers.md)

### Azure Data Explorer (ADX)

Execute KQL queries and manage connections to Azure Data Explorer clusters for real-time analytics on large volumes of data.

```python { .api }
class AzureDataExplorerHook(BaseHook):
    """Hook for Azure Data Explorer (Kusto) operations."""

    def get_conn(self) -> KustoClient: ...
    def run_query(self, query: str, database: str, options: dict | None = None) -> KustoResponseDataSet: ...
```

```python { .api }
class AzureDataExplorerQueryOperator(BaseOperator):
    """Operator for querying Azure Data Explorer (Kusto)."""

    def __init__(self, *, query: str, database: str, **kwargs): ...
```
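
A query task sketch; the database and table names are placeholders:

```python
from airflow.providers.microsoft.azure.operators.adx import AzureDataExplorerQueryOperator

# Count yesterday's events with a KQL query.
count_events = AzureDataExplorerQueryOperator(
    task_id="count_recent_events",
    query="Events | where Timestamp > ago(1d) | count",
    database="telemetry",
    azure_data_explorer_conn_id="azure_data_explorer_default",
)
```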

[Azure Data Explorer](./azure-data-explorer.md)

### Microsoft Power BI

Manage Power BI datasets, trigger refreshes, and monitor workspace operations through Microsoft Graph API integration.

```python { .api }
class PowerBIHook(KiotaRequestAdapterHook):
    """Hook for Power BI operations via Microsoft Graph API."""

    async def trigger_dataset_refresh(self, dataset_id: str, group_id: str, **kwargs) -> str: ...
    async def get_refresh_details_by_refresh_id(self, dataset_id: str, group_id: str, dataset_refresh_id: str) -> dict: ...
```

```python { .api }
class PowerBIDatasetRefreshOperator(BaseOperator):
    """Refreshes a Power BI dataset."""

    def __init__(self, *, dataset_id: str, group_id: str, **kwargs): ...
```
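
A refresh task sketch; real workspace (group) and dataset GUIDs are required in place of the placeholders:

```python
from airflow.providers.microsoft.azure.operators.powerbi import (
    PowerBIDatasetRefreshOperator,
)

# Kick off a dataset refresh in a given workspace.
refresh_sales = PowerBIDatasetRefreshOperator(
    task_id="refresh_sales_dataset",
    dataset_id="<dataset-guid>",
    group_id="<workspace-guid>",
)
```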

[Microsoft Power BI](./powerbi.md)

### Azure Batch

Create and manage compute pools, jobs, and tasks for large-scale parallel and high-performance computing applications in the cloud.

```python { .api }
class AzureBatchHook(BaseHook):
    """Hook for Azure Batch APIs."""

    def get_conn(self) -> BatchServiceClient: ...
    def create_pool(self, pool: PoolAddParameter) -> None: ...
    def create_job(self, job: JobAddParameter) -> None: ...
    def add_single_task_to_job(self, job_id: str, task: TaskAddParameter) -> None: ...
```

```python { .api }
class AzureBatchOperator(BaseOperator):
    """Executes a job on Azure Batch Service."""

    def __init__(self, *, batch_pool_id: str, batch_job_id: str, batch_task_command_line: str, **kwargs): ...
```
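
A hedged operator sketch; all IDs are placeholders, and depending on whether the pool already exists, additional VM-image arguments may be required:

```python
from airflow.providers.microsoft.azure.operators.batch import AzureBatchOperator

# Run a single command-line task on an existing Batch pool.
run_task = AzureBatchOperator(
    task_id="run_batch_task",
    batch_pool_id="processing-pool",
    batch_pool_vm_size="Standard_D2s_v3",
    batch_job_id="nightly-job",
    batch_task_id="transform-task",
    batch_task_command_line="/bin/bash -c 'python transform.py'",
)
```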

[Azure Batch](./azure-batch.md)

### Azure File Share

Manage file shares, directories, and files within Azure Storage with SMB protocol support and REST API operations.

```python { .api }
class AzureFileShareHook(BaseHook):
    """Hook for Azure File Share operations."""

    def create_share(self, share_name: str, **kwargs) -> bool: ...
    def create_directory(self, **kwargs) -> Any: ...
    def load_file(self, file_path: str, **kwargs) -> None: ...
    def get_file(self, file_path: str, **kwargs) -> None: ...
```
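
A hedged upload sketch, assuming the hook is configured with the share and remote path at construction time; all names are placeholders:

```python
from airflow.providers.microsoft.azure.hooks.fileshare import AzureFileShareHook

# Upload a local file to a file share using the default connection.
hook = AzureFileShareHook(
    azure_fileshare_conn_id="azure_fileshare_default",
    share_name="exports",
    file_path="daily/export.csv",
)
hook.load_file(file_path="/tmp/export.csv")
```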

[Azure File Share](./azure-file-share.md)

## Authentication Methods

The provider supports multiple Azure authentication mechanisms:

- **Service Principal**: Using client ID, client secret, and tenant ID
- **Managed Identity**: Azure managed identity for resources
- **Workload Identity**: Workload identity federation for Kubernetes
- **DefaultAzureCredential**: Azure SDK default credential chain
- **Connection String**: Service-specific connection strings
- **Account Key**: Storage account key authentication
- **SAS Token**: Shared Access Signature tokens

## Connection Configuration

All Azure services use Airflow connections for configuration. The provider defines 18+ connection types for the various Azure services, each with its own configuration fields and authentication options. Connections can be created through the Airflow UI, the CLI, a secrets backend, or environment variables.
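
For illustration, a hedged sketch of a service-principal connection supplied as a JSON environment variable; the exact extra-field names vary by connection type, so check the provider's connection documentation for the service you use:

```python
import json
import os

# Hypothetical values: client ID as login, client secret as password,
# tenant and subscription IDs in the extras.
os.environ["AIRFLOW_CONN_AZURE_DEFAULT"] = json.dumps(
    {
        "conn_type": "azure",
        "login": "<client-id>",
        "password": "<client-secret>",
        "extra": {
            "tenantId": "<tenant-id>",
            "subscriptionId": "<subscription-id>",
        },
    }
)
```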

## Type Definitions

```python { .api }
# Base Azure connection information
class AzureBaseHook(BaseHook):
    conn_name_attr: str = "azure_conn_id"
    default_conn_name: str = "azure_default"
    conn_type: str = "azure"

# Common authentication credentials
AzureCredentials = Union[
    ServicePrincipal,
    ManagedIdentity,
    WorkloadIdentity,
    DefaultAzureCredential,
]

# Pipeline run statuses
class AzureDataFactoryPipelineRunStatus:
    QUEUED: str = "Queued"
    IN_PROGRESS: str = "InProgress"
    SUCCEEDED: str = "Succeeded"
    FAILED: str = "Failed"
    CANCELLED: str = "Cancelled"

# Synapse job statuses
class AzureSynapseSparkBatchRunStatus:
    NOT_STARTED: str = "not_started"
    STARTING: str = "starting"
    RUNNING: str = "running"
    IDLE: str = "idle"
    BUSY: str = "busy"
    SHUTTING_DOWN: str = "shutting_down"
    ERROR: str = "error"
    DEAD: str = "dead"
    KILLED: str = "killed"
    SUCCESS: str = "success"
```
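
The status constants are handy when polling runs by hand; a short sketch with placeholder resource names:

```python
from airflow.providers.microsoft.azure.hooks.data_factory import (
    AzureDataFactoryHook,
    AzureDataFactoryPipelineRunStatus,
)

# Fetch a pipeline run and compare its status against the provider's constants.
hook = AzureDataFactoryHook(azure_data_factory_conn_id="azure_data_factory_default")
run = hook.get_pipeline_run(
    run_id="<run-id>",
    resource_group_name="analytics-rg",
    factory_name="my-data-factory",
)
if run.status == AzureDataFactoryPipelineRunStatus.SUCCEEDED:
    print("Pipeline run finished successfully")
```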