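# Azure Blob Storage

Azure Blob Storage integration for Apache Airflow. It provides:

- a hook for blob and container operations, and an async hook for deferrable tasks
- an operator for deleting blobs, and operators for transferring files from local disk, SFTP and S3
- sensors and triggers for monitoring blob arrival
- an fsspec-compatible filesystem
- a task log handler that stores logs in Blob Storage

Both synchronous and asynchronous (deferrable) operation are supported.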

## Capabilities

### Blob Storage Hook

The primary interface for Azure Blob Storage operations, providing authenticated connections and core blob functionality.

```python { .api }
class WasbHook(BaseHook):
    """
    Hook for Azure Blob Storage (WASB) operations.

    Provides methods for blob operations, container management, and file transfers.
    Supports multiple authentication methods and connection configurations.
    """

    def get_conn(self) -> BlobServiceClient:
        """Get authenticated Azure Blob Service client."""

    def check_for_blob(self, container_name: str, blob_name: str, **kwargs) -> bool:
        """
        Check if a blob exists in the specified container.

        Args:
            container_name (str): Name of the container
            blob_name (str): Name of the blob to check

        Returns:
            bool: True if blob exists, False otherwise
        """

    def load_file(
        self,
        file_path: str,
        container_name: str,
        blob_name: str,
        **kwargs
    ) -> None:
        """
        Upload a local file to Azure Blob Storage.

        Args:
            file_path (str): Path to local file
            container_name (str): Target container name
            blob_name (str): Target blob name
        """

    def load_string(
        self,
        string_data: str,
        container_name: str,
        blob_name: str,
        **kwargs
    ) -> None:
        """
        Upload string data to Azure Blob Storage.

        Args:
            string_data (str): String data to upload
            container_name (str): Target container name
            blob_name (str): Target blob name
        """

    def check_for_prefix(self, container_name: str, prefix: str, **kwargs) -> bool:
        """
        Check if any blobs exist with the given prefix.

        Args:
            container_name (str): Name of the container
            prefix (str): Prefix to search for

        Returns:
            bool: True if blobs with prefix exist, False otherwise
        """

    def get_blobs_list(
        self,
        container_name: str,
        prefix: str | None = None,
        include: list | None = None,
        delimiter: str = "/",
        **kwargs
    ) -> list:
        """
        List blobs in a container, optionally filtered by prefix.

        Args:
            container_name (str): Name of the container
            prefix (str, optional): Only list blobs whose names start with this prefix
            include (list, optional): Additional properties to include
            delimiter (str): Separator between levels of the virtual directory hierarchy

        Returns:
            list: Names of matching blobs; with a delimiter, deeper levels are
                collapsed into virtual-directory prefixes
        """

    def get_blobs_list_recursive(
        self,
        container_name: str,
        prefix: str | None = None,
        include: list | None = None,
        endswith: str = "",
        **kwargs
    ) -> list:
        """
        Recursively list all blobs in a container.

        Args:
            container_name (str): Name of the container
            prefix (str, optional): Filter blobs by prefix
            include (list, optional): Additional properties to include
            endswith (str): Only return blobs whose names end with this string

        Returns:
            list: Names of all matching blobs at every level
        """

    def read_file(self, container_name: str, blob_name: str, **kwargs) -> str:
        """
        Download blob content and return it as text.

        Args:
            container_name (str): Name of the container
            blob_name (str): Name of the blob

        Returns:
            str: Blob content decoded as text
        """

    def get_file(self, file_path: str, container_name: str, blob_name: str, **kwargs) -> None:
        """
        Download blob to local file.

        Args:
            file_path (str): Local file path to save blob
            container_name (str): Name of the container
            blob_name (str): Name of the blob
        """

    def upload(
        self,
        container_name: str,
        blob_name: str,
        data: Any,
        blob_type: str = "BlockBlob",
        length: int | None = None,
        **kwargs
    ) -> dict[str, Any]:
        """
        Upload data to blob with advanced options.

        Args:
            container_name (str): Name of the container
            blob_name (str): Name of the blob
            data: Data to upload
            blob_type (str): Type of blob (BlockBlob, PageBlob, AppendBlob)
            length (int, optional): Length of data

        Returns:
            dict: Upload response metadata
        """

    def download(
        self,
        container_name: str,
        blob_name: str,
        offset: int | None = None,
        length: int | None = None,
        **kwargs
    ) -> StorageStreamDownloader:
        """
        Download blob with range support.

        Args:
            container_name (str): Name of the container
            blob_name (str): Name of the blob
            offset (int, optional): Start offset for partial download
            length (int, optional): Number of bytes to download

        Returns:
            StorageStreamDownloader: Stream downloader object
        """

    def create_container(self, container_name: str, **kwargs) -> None:
        """
        Create a new container.

        Args:
            container_name (str): Name of the container to create
        """

    def delete_container(self, container_name: str, **kwargs) -> None:
        """
        Delete an existing container.

        Args:
            container_name (str): Name of the container to delete
        """

    def delete_blobs(self, container_name: str, *blobs, **kwargs) -> None:
        """
        Delete multiple blobs from container.

        Args:
            container_name (str): Name of the container
            *blobs: Variable number of blob names to delete
        """

    def copy_blobs(
        self,
        source_container: str,
        source_blob: str,
        destination_container: str,
        destination_blob: str,
        **kwargs
    ) -> None:
        """
        Copy blob from source to destination.

        Args:
            source_container (str): Source container name
            source_blob (str): Source blob name
            destination_container (str): Destination container name
            destination_blob (str): Destination blob name
        """

    def delete_file(
        self,
        container_name: str,
        blob_name: str,
        is_prefix: bool = False,
        ignore_if_missing: bool = False,
        **kwargs
    ) -> None:
        """
        Delete blob(s) from container.

        Args:
            container_name (str): Name of the container
            blob_name (str): Name of the blob or prefix
            is_prefix (bool): Whether to delete all blobs with prefix
            ignore_if_missing (bool): Don't raise error if blob doesn't exist
        """
```
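
The difference between `get_blobs_list` and `get_blobs_list_recursive` can be modelled over a plain list of blob names. `get_blobs_list` walks one level of the virtual hierarchy, split on `delimiter`. `get_blobs_list_recursive` returns every blob under the prefix, optionally filtered by `endswith`. The sketch below is a hypothetical stdlib-only model, not the hook's implementation. It assumes walk-style listing semantics, where names that continue past the next delimiter collapse into a single virtual-directory prefix. It also uses `""` in place of the hook's `prefix=None` default.

```python
def list_one_level(names: list[str], prefix: str = "", delimiter: str = "/") -> list[str]:
    """Model of a delimited listing: blobs directly under `prefix`, plus one
    virtual-directory entry (ending in `delimiter`) per deeper sub-folder."""
    result: list[str] = []
    for name in sorted(names):
        if not name.startswith(prefix):
            continue
        remainder = name[len(prefix):]
        cut = remainder.find(delimiter) if delimiter else -1
        # Names that continue past the next delimiter collapse into that folder prefix.
        entry = name if cut == -1 else prefix + remainder[: cut + len(delimiter)]
        if entry not in result:
            result.append(entry)
    return result


def list_recursive(names: list[str], prefix: str = "", endswith: str = "") -> list[str]:
    """Model of a recursive listing: every blob under `prefix` at any depth whose
    name ends with `endswith` (the empty default matches everything)."""
    return [n for n in sorted(names) if n.startswith(prefix) and n.endswith(endswith)]


blobs = [
    "raw/2024/01/a.csv",
    "raw/2024/01/b.json",
    "raw/2024/02/c.csv",
    "raw/readme.txt",
]
print(list_one_level(blobs, prefix="raw/"))
# ['raw/2024/', 'raw/readme.txt']
print(list_recursive(blobs, prefix="raw/2024/", endswith=".csv"))
# ['raw/2024/01/a.csv', 'raw/2024/02/c.csv']
```

A delimited listing is cheap for browsing one "folder" at a time. A recursive listing is the one to use when every file under a prefix must be processed.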

### Async Blob Storage Hook

Asynchronous version of the WASB hook, used by deferrable sensors and triggers and for high-concurrency workloads.

```python { .api }
class WasbAsyncHook(WasbHook):
    """
    Async hook for Azure Blob Storage operations.

    Provides async methods for blob operations, intended for deferrable
    tasks and high-concurrency scenarios.
    """

    async def get_async_conn(self) -> AsyncBlobServiceClient:
        """Get async Azure Blob Service client."""

    async def check_for_blob_async(self, container_name: str, blob_name: str, **kwargs) -> bool:
        """Async version of check_for_blob."""

    async def get_blobs_list_async(
        self,
        container_name: str,
        prefix: str | None = None,
        include: list | None = None,
        delimiter: str = "/",
        **kwargs
    ) -> list:
        """Async version of get_blobs_list."""

    async def check_for_prefix_async(self, container_name: str, prefix: str, **kwargs) -> bool:
        """Async version of check_for_prefix."""
```
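
The async methods are coroutines, so several existence checks can run concurrently with `asyncio.gather` instead of one after another. The sketch below uses a hypothetical `fake_check_for_blob_async` coroutine as a stand-in for `WasbAsyncHook.check_for_blob_async`, so it runs without Azure credentials. The simulated latency and the `EXISTING` set are illustrative assumptions.

```python
import asyncio

# Hypothetical in-memory container contents standing in for real storage.
EXISTING = {("input", "a.csv"), ("input", "b.csv")}


async def fake_check_for_blob_async(container_name: str, blob_name: str) -> bool:
    """Stand-in for an async existence check; sleeps to simulate network latency."""
    await asyncio.sleep(0.01)
    return (container_name, blob_name) in EXISTING


async def check_many(container_name: str, blob_names: list[str]) -> dict[str, bool]:
    # All checks are awaited together; gather returns results in input order.
    results = await asyncio.gather(
        *(fake_check_for_blob_async(container_name, name) for name in blob_names)
    )
    return dict(zip(blob_names, results))


print(asyncio.run(check_many("input", ["a.csv", "b.csv", "c.csv"])))
# {'a.csv': True, 'b.csv': True, 'c.csv': False}
```

With a real hook, the same pattern applies inside a trigger or other async code. Total wait time is then roughly one round trip rather than one per blob.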

### Blob Deletion Operator

Operator for deleting blobs from Azure Blob Storage containers.

```python { .api }
class WasbDeleteBlobOperator(BaseOperator):
    """
    Delete blobs from Azure Blob Storage.

    Supports deleting single blobs or multiple blobs using prefix matching.
    """

    def __init__(
        self,
        container_name: str,
        blob_name: str,
        wasb_conn_id: str = "wasb_default",
        is_prefix: bool = False,
        ignore_if_missing: bool = False,
        **kwargs
    ):
        """
        Initialize blob deletion operator.

        Args:
            container_name (str): Azure container name
            blob_name (str): Blob name or prefix to delete
            wasb_conn_id (str): Airflow connection ID for Azure Blob Storage
            is_prefix (bool): Whether to delete all blobs with this prefix
            ignore_if_missing (bool): Don't fail if blob doesn't exist
        """
```
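
The interaction between `is_prefix` and `ignore_if_missing` is easiest to see on an in-memory container. The sketch below is a hypothetical model of the behaviour described above. With `is_prefix=True`, every blob whose name starts with `blob_name` is removed. An empty match raises unless `ignore_if_missing=True`. `delete_blobs_model` and `BlobsNotFoundError` are illustrative names, not functions or exceptions exported by the provider.

```python
class BlobsNotFoundError(Exception):
    """Hypothetical error raised when nothing matches and ignore_if_missing is False."""


def delete_blobs_model(
    container: set[str],
    blob_name: str,
    is_prefix: bool = False,
    ignore_if_missing: bool = False,
) -> list[str]:
    """Remove matching names from `container` in place and return what was deleted."""
    if is_prefix:
        # Literal prefix match: "logs/" removes every blob under that virtual folder.
        targets = sorted(name for name in container if name.startswith(blob_name))
    else:
        targets = [blob_name] if blob_name in container else []
    if not targets and not ignore_if_missing:
        raise BlobsNotFoundError(f"Blob(s) not found: {blob_name}")
    container.difference_update(targets)
    return targets


container = {"logs/a.txt", "logs/b.txt", "data.csv"}
print(delete_blobs_model(container, "logs/", is_prefix=True))  # ['logs/a.txt', 'logs/b.txt']
print(delete_blobs_model(container, "missing.csv", ignore_if_missing=True))  # []
```

Setting `ignore_if_missing=True` makes a cleanup task idempotent, so a retry after a partial run does not fail.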

### Blob Existence Sensor

Sensor that waits for a blob to exist in Azure Blob Storage.

```python { .api }
class WasbBlobSensor(BaseSensorOperator):
    """
    Sensor that waits for a blob to exist in Azure Blob Storage.

    Polls the blob storage container at regular intervals until the specified
    blob is found or the timeout is reached.
    """

    def __init__(
        self,
        container_name: str,
        blob_name: str,
        wasb_conn_id: str = "wasb_default",
        check_options: dict | None = None,
        **kwargs
    ):
        """
        Initialize blob sensor.

        Args:
            container_name (str): Azure container name to monitor
            blob_name (str): Blob name to wait for
            wasb_conn_id (str): Airflow connection ID for Azure Blob Storage
            check_options (dict): Additional options for blob checking
        """

    def poke(self, context: dict) -> bool:
        """Check if the blob exists."""
```
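
Conceptually, a sensor in its default (non-deferrable) mode calls `poke` every `poke_interval` seconds until it returns `True` or `timeout` elapses. The sketch below models that loop in plain Python. It takes an injectable clock and sleep function so it can be tested instantly. `wait_for`, `SensorTimeout` and the fake clock are hypothetical. They only approximate `BaseSensorOperator`, which also handles reschedule mode, soft failure and exponential backoff.

```python
import time
from typing import Callable


class SensorTimeout(Exception):
    """Hypothetical stand-in for the timeout error raised by a real sensor."""


def wait_for(
    poke: Callable[[], bool],
    timeout: float,
    poke_interval: float,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call `poke` until it returns True and return how many pokes it took.

    Raises SensorTimeout once `timeout` seconds have passed without success.
    """
    started = clock()
    attempts = 0
    while True:
        attempts += 1
        if poke():
            return attempts
        if clock() - started >= timeout:
            raise SensorTimeout(f"condition not met within {timeout} seconds")
        sleep(poke_interval)


# Simulated time: "sleeping" only advances a counter, so the demo runs instantly.
now = [0.0]


def fake_clock() -> float:
    return now[0]


def fake_sleep(seconds: float) -> None:
    now[0] += seconds


arrivals = iter([False, False, True])  # the blob "appears" on the third poke
print(wait_for(lambda: next(arrivals), timeout=300, poke_interval=30,
               clock=fake_clock, sleep=fake_sleep))  # 3
```

With `timeout=300` and `poke_interval=30`, as in the DAG example in this document, a blob that never arrives costs up to eleven pokes before the task fails.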

### Blob Prefix Sensor

Sensor that waits for blobs matching a prefix pattern.

```python { .api }
class WasbPrefixSensor(BaseSensorOperator):
    """
    Sensor that waits for blobs with a specific prefix in Azure Blob Storage.

    Useful for waiting for multiple files or files with unknown exact names
    but known prefix patterns.
    """

    def __init__(
        self,
        container_name: str,
        prefix: str,
        wasb_conn_id: str = "wasb_default",
        **kwargs
    ):
        """
        Initialize prefix sensor.

        Args:
            container_name (str): Azure container name to monitor
            prefix (str): Blob name prefix to match
            wasb_conn_id (str): Airflow connection ID for Azure Blob Storage
        """
```
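
Prefix matching on blob names is a literal "starts with" comparison, not a glob or regular expression. A value such as `batch_2024_*` would therefore only match names that contain a literal `*`. The stdlib illustration below uses a hypothetical `prefix_matches` helper, not the sensor's code. It also shows one way to get wildcard behaviour: list by a literal prefix, then filter client-side with `fnmatch`.

```python
import fnmatch


def prefix_matches(blob_names: list[str], prefix: str) -> list[str]:
    """Literal prefix match, as used when listing blobs by name prefix."""
    return [name for name in blob_names if name.startswith(prefix)]


names = ["batch_2024_01.parquet", "batch_2024_02.parquet", "batch_2023_12.parquet"]
print(prefix_matches(names, "batch_2024_"))   # ['batch_2024_01.parquet', 'batch_2024_02.parquet']
print(prefix_matches(names, "batch_2024_*"))  # [] because '*' is not a wildcard here

# Wildcard semantics, if needed, come from filtering a literal-prefix listing locally.
december = [n for n in prefix_matches(names, "batch_") if fnmatch.fnmatchcase(n, "*_12.parquet")]
print(december)  # ['batch_2023_12.parquet']
```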

### Async Blob Triggers

Deferrable triggers for blob monitoring that don't block worker slots.

```python { .api }
class WasbBlobSensorTrigger(BaseTrigger):
    """Async trigger for blob existence monitoring."""

    def __init__(
        self,
        container_name: str,
        blob_name: str,
        wasb_conn_id: str,
        poke_interval: float = 60,
        **kwargs
    ):
        """
        Initialize blob sensor trigger.

        Args:
            container_name (str): Container name to monitor
            blob_name (str): Blob name to wait for
            wasb_conn_id (str): Connection ID
            poke_interval (float): Polling interval in seconds
        """

class WasbPrefixSensorTrigger(BaseTrigger):
    """Async trigger for blob prefix monitoring."""

    def __init__(
        self,
        container_name: str,
        prefix: str,
        wasb_conn_id: str,
        poke_interval: float = 60,
        **kwargs
    ):
        """
        Initialize prefix sensor trigger.

        Args:
            container_name (str): Container name to monitor
            prefix (str): Blob prefix to match
            wasb_conn_id (str): Connection ID
            poke_interval (float): Polling interval in seconds
        """
```
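
A trigger's `run` method is an async generator. It polls until its condition holds, then yields a single event, and the deferred task resumes with that event's payload. The sketch below is a hypothetical, dependency-free model of that pattern. `poll_until_found` and the `{"status": ..., "message": ...}` payload shape are assumptions. The real triggers yield Airflow `TriggerEvent` objects.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable


async def poll_until_found(
    check: Callable[[], Awaitable[bool]],
    poke_interval: float,
) -> AsyncIterator[dict]:
    """Await `check` every `poke_interval` seconds; yield one event once it succeeds."""
    while True:
        if await check():
            yield {"status": "success", "message": "condition met"}
            return
        # asyncio.sleep yields control, so one event loop can serve many triggers.
        await asyncio.sleep(poke_interval)


async def demo() -> list[dict]:
    remaining = [False, False, True]  # hypothetical results of successive checks

    async def check() -> bool:
        return remaining.pop(0)

    return [event async for event in poll_until_found(check, poke_interval=0.01)]


print(asyncio.run(demo()))  # [{'status': 'success', 'message': 'condition met'}]
```

Waiting happens in `await asyncio.sleep`, not in a blocking worker process. That is why a deferred sensor does not hold a worker slot while it waits.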

### File Transfer Operators

Transfer data between local filesystem and Azure Blob Storage.

```python { .api }
class LocalFilesystemToWasbOperator(BaseOperator):
    """Transfer files from local filesystem to Azure Blob Storage."""

    def __init__(
        self,
        file_path: str,
        container_name: str,
        blob_name: str,
        wasb_conn_id: str = "wasb_default",
        create_container: bool = False,
        **kwargs
    ):
        """
        Initialize local to WASB transfer operator.

        Args:
            file_path (str): Local file path to upload
            container_name (str): Target container name
            blob_name (str): Target blob name
            wasb_conn_id (str): Connection ID
            create_container (bool): Create container if it doesn't exist
        """

class SFTPToWasbOperator(BaseOperator):
    """Transfer files from SFTP server to Azure Blob Storage."""

    def __init__(
        self,
        sftp_source_path: str,
        container_name: str,
        blob_name: str,
        sftp_conn_id: str = "sftp_default",
        wasb_conn_id: str = "wasb_default",
        **kwargs
    ):
        """
        Initialize SFTP to WASB transfer operator.

        Args:
            sftp_source_path (str): Source file path on SFTP server
            container_name (str): Target container name
            blob_name (str): Target blob name
            sftp_conn_id (str): SFTP connection ID
            wasb_conn_id (str): WASB connection ID
        """

class S3ToAzureBlobStorageOperator(BaseOperator):
    """Transfer objects from AWS S3 to Azure Blob Storage."""

    def __init__(
        self,
        s3_source_key: str,
        container_name: str,
        blob_name: str,
        s3_bucket: str | None = None,
        aws_conn_id: str = "aws_default",
        wasb_conn_id: str = "wasb_default",
        **kwargs
    ):
        """
        Initialize S3 to Azure Blob transfer operator.

        Args:
            s3_source_key (str): S3 object key to transfer
            container_name (str): Target Azure container name
            blob_name (str): Target blob name
            s3_bucket (str): Source S3 bucket name
            aws_conn_id (str): AWS connection ID
            wasb_conn_id (str): WASB connection ID
        """
```
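
`create_container` only matters when the target container may not exist yet. The sketch below models that decision on an in-memory store:

- upload into an existing container;
- create the container first when `create_container=True`;
- otherwise fail.

`InMemoryBlobStore` and `upload_local_file` are hypothetical names. The sketch assumes the real operator delegates the upload to `WasbHook.load_file`. The exact error raised for a missing container would come from the Azure SDK, not the `LookupError` used here.

```python
import os
import tempfile


class InMemoryBlobStore:
    """Hypothetical stand-in for a storage account: container -> {blob name: bytes}."""

    def __init__(self) -> None:
        self.containers: dict[str, dict[str, bytes]] = {}

    def create_container(self, name: str) -> None:
        self.containers.setdefault(name, {})


def upload_local_file(
    store: InMemoryBlobStore,
    file_path: str,
    container_name: str,
    blob_name: str,
    create_container: bool = False,
) -> None:
    """Copy a local file into the store, creating the container only if allowed."""
    if container_name not in store.containers:
        if not create_container:
            raise LookupError(f"container {container_name!r} does not exist")
        store.create_container(container_name)
    with open(file_path, "rb") as fh:
        store.containers[container_name][blob_name] = fh.read()


store = InMemoryBlobStore()
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "report.csv")
    with open(path, "wb") as fh:
        fh.write(b"id,value\n1,42\n")
    upload_local_file(store, path, "exports", "2024/report.csv", create_container=True)

print(store.containers["exports"]["2024/report.csv"])  # b'id,value\n1,42\n'
```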

### Azure Blob Filesystem Interface

fsspec-compatible filesystem interface for Azure Blob Storage.

```python { .api }
def get_fs(conn_id: str | None, storage_options: dict[str, Any] | None = None) -> AbstractFileSystem:
    """
    Create Azure Blob FileSystem (fsspec-compatible).

    Args:
        conn_id (str): Airflow connection ID for Azure storage
        storage_options (dict): Additional storage configuration options

    Returns:
        AbstractFileSystem: fsspec filesystem for Azure Blob Storage
    """
```
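
Blob locations are often written as Hadoop-style URLs of the form `wasbs://<container>@<account>.blob.core.windows.net/<path>`. The stdlib sketch below splits such a URL into account, container and blob path using a hypothetical `parse_wasb_url` helper. It accepts only the public-cloud host suffix. The pieces map onto the hook's `container_name` and `blob_name` arguments. Joining them as `container/path` assumes that the fsspec filesystem returned by `get_fs` addresses blobs that way.

```python
from urllib.parse import urlsplit


def parse_wasb_url(url: str) -> tuple[str, str, str]:
    """Return (account, container, blob_path) from a wasb:// or wasbs:// URL."""
    parts = urlsplit(url)
    if parts.scheme not in ("wasb", "wasbs"):
        raise ValueError(f"not a wasb URL: {url}")
    container = parts.username  # the text before '@' in the network location
    host = parts.hostname or ""
    if not container or not host.endswith(".blob.core.windows.net"):
        raise ValueError(f"expected <container>@<account>.blob.core.windows.net: {url}")
    account = host.split(".", 1)[0]
    return account, container, parts.path.lstrip("/")


account, container, blob = parse_wasb_url(
    "wasbs://raw-data@mystorageacct.blob.core.windows.net/2024/01/events.json"
)
print(account, container, blob)  # mystorageacct raw-data 2024/01/events.json
print(f"{container}/{blob}")     # raw-data/2024/01/events.json
```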

### Logging Handler

Task log handler that writes Airflow logs to Azure Blob Storage.

```python { .api }
class WasbTaskHandler(FileTaskHandler):
    """Airflow task handler that writes logs to Azure Blob Storage."""

    def __init__(
        self,
        base_log_folder: str,
        wasb_log_folder: str,
        wasb_container: str,
        filename_template: str | None = None,
        **kwargs
    ):
        """
        Initialize WASB task handler.

        Args:
            base_log_folder (str): Base folder for local logs
            wasb_log_folder (str): WASB folder for remote logs
            wasb_container (str): WASB container for logs
            filename_template (str): Log filename template
        """

class WasbRemoteLogIO:
    """Low-level I/O operations for WASB logging."""

    def upload_log(self, log_content: str, remote_log_location: str) -> None:
        """Upload log content to WASB."""

    def download_log(self, remote_log_location: str) -> str:
        """Download log content from WASB."""
```
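
A remote log handler of this kind needs two small pieces of logic:

- Build the remote location by joining `wasb_log_folder` with the task's relative log path.
- When appending, concatenate the existing remote log with the new content.

The sketch below models both steps with an in-memory dict standing in for the container. `remote_log_location` and `append_log` are hypothetical helpers. The newline separator is an assumption. The relative path follows Airflow's usual `dag_id=…/run_id=…/task_id=…/attempt=N.log` layout, with illustrative values.

```python
import posixpath


def remote_log_location(wasb_log_folder: str, relative_path: str) -> str:
    """Join the remote base folder and the task's relative log path with '/'."""
    # posixpath keeps '/' separators even on Windows, matching blob naming.
    return posixpath.join(wasb_log_folder, relative_path)


def append_log(store: dict[str, str], location: str, new_content: str) -> None:
    """Append to an existing remote log, or create it if absent."""
    old = store.get(location)
    store[location] = new_content if old is None else f"{old}\n{new_content}"


logs: dict[str, str] = {}
loc = remote_log_location(
    "wasb-airflow-logs",
    "dag_id=etl/run_id=manual__2024-01-01/task_id=extract/attempt=1.log",
)
append_log(logs, loc, "first chunk")
append_log(logs, loc, "second chunk")
print(loc)  # wasb-airflow-logs/dag_id=etl/run_id=manual__2024-01-01/task_id=extract/attempt=1.log
print(logs[loc])  # the two chunks, on separate lines
```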

## Usage Examples

### Basic Blob Operations

```python
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

# Initialize hook
wasb_hook = WasbHook(wasb_conn_id='azure_default')

# Upload a file
wasb_hook.load_file(
    file_path='/path/to/local/file.txt',
    container_name='my-container',
    blob_name='uploaded-file.txt'
)

# Check if blob exists
exists = wasb_hook.check_for_blob(
    container_name='my-container',
    blob_name='uploaded-file.txt'
)

# Download blob content
content = wasb_hook.read_file(
    container_name='my-container',
    blob_name='uploaded-file.txt'
)

# Delete blob
wasb_hook.delete_file(
    container_name='my-container',
    blob_name='uploaded-file.txt'
)
```

### Using Sensors in DAGs

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.microsoft.azure.sensors.wasb import WasbBlobSensor, WasbPrefixSensor

dag = DAG(
    'blob_monitoring_example',
    start_date=datetime(2024, 1, 1),
    schedule='@daily'  # `schedule` replaces the deprecated `schedule_interval` argument
)

# Wait for specific blob
wait_for_file = WasbBlobSensor(
    task_id='wait_for_input_file',
    container_name='input-data',
    blob_name='daily-export.csv',
    wasb_conn_id='azure_default',
    timeout=300,
    poke_interval=30,
    dag=dag
)

# Wait for files with prefix
wait_for_batch = WasbPrefixSensor(
    task_id='wait_for_batch_files',
    container_name='batch-data',
    prefix='batch_2024_',
    wasb_conn_id='azure_default',
    dag=dag
)
```

## Connection Configuration

Azure Blob Storage connections support multiple authentication methods:

**Connection Type**: `wasb`

**Authentication Options**:
- **Account Key**: Use storage account name and key
- **SAS Token**: Use Shared Access Signature
- **Managed Identity**: Use Azure managed identity
- **Service Principal**: Use client credentials

**Connection Fields**:
- `account_name`: Azure storage account name
- `account_key`: Storage account access key (for key auth)
- `sas_token`: Shared access signature (for SAS auth)
- `client_id`: Service principal client ID
- `client_secret`: Service principal secret
- `tenant_id`: Azure tenant ID
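
For account-key authentication, Azure SDKs also accept a storage connection string. It packs the account name and key into `key=value` pairs separated by semicolons. The sketch below assembles and parses one with the stdlib. The helper names are hypothetical. How such a string is stored in the Airflow connection (for example, in an extra field) depends on the provider version, so check the connection form before relying on it.

```python
def build_connection_string(
    account_name: str, account_key: str, endpoint_suffix: str = "core.windows.net"
) -> str:
    """Assemble an Azure Storage connection string for account-key auth."""
    parts = {
        "DefaultEndpointsProtocol": "https",
        "AccountName": account_name,
        "AccountKey": account_key,
        "EndpointSuffix": endpoint_suffix,
    }
    return ";".join(f"{key}={value}" for key, value in parts.items())


def parse_connection_string(conn_str: str) -> dict[str, str]:
    """Split on ';' then on the first '=' only, because base64 keys often end in '='."""
    return dict(item.split("=", 1) for item in conn_str.split(";") if item)


conn_str = build_connection_string("mystorageacct", "c2VjcmV0LWtleQ==")
print(conn_str)
# DefaultEndpointsProtocol=https;AccountName=mystorageacct;AccountKey=c2VjcmV0LWtleQ==;EndpointSuffix=core.windows.net
print(parse_connection_string(conn_str)["AccountKey"])  # c2VjcmV0LWtleQ==
```

Treat the account key like a password: keep it in a secrets backend or an encrypted connection field rather than in DAG code.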

## Error Handling

```python { .api }
# Custom exceptions for blob operations
class AzureBlobStorageException(AirflowException):
    """Base exception for Azure Blob Storage operations."""

class BlobNotFound(AzureBlobStorageException):
    """Raised when a blob is not found."""

class ContainerNotFound(AzureBlobStorageException):
    """Raised when a container is not found."""
```

Together, these components support both simple file operations and multi-step data pipelines that read from or write to Azure Blob Storage within Airflow workflows.