# Azure Data Lake Storage

Comprehensive Azure Data Lake Storage integration supporting both Gen1 and Gen2, with complete file system operations, directory management, data upload/download capabilities, and an fsspec-compatible filesystem interface.

## Capabilities

### Data Lake Storage Gen1 Hook

Primary interface for Azure Data Lake Storage Gen1 operations, providing authenticated connections and file system functionality.

```python { .api }
class AzureDataLakeHook(BaseHook):
    """
    Hook for Azure Data Lake Storage Gen1 operations.

    Provides methods for file operations, directory management, and data transfers.
    Supports Azure Active Directory authentication and connection configurations.
    """

    def get_conn(self) -> core.AzureDLFileSystem:
        """
        Get authenticated Azure Data Lake Storage Gen1 client.

        Returns:
            core.AzureDLFileSystem: ADLS Gen1 client instance
        """

    def check_for_file(self, file_path: str) -> bool:
        """
        Check if a file exists in Azure Data Lake Storage.

        Args:
            file_path (str): Path to the file to check

        Returns:
            bool: True if file exists, False otherwise
        """

    def upload_file(
        self,
        local_path: str,
        remote_path: str,
        nthreads: int = 64,
        overwrite: bool = True,
        buffersize: int = 4194304,
        blocksize: int = 4194304
    ) -> None:
        """
        Upload a local file to Azure Data Lake Storage.

        Args:
            local_path (str): Path to local file
            remote_path (str): Target path in ADLS
            nthreads (int): Number of threads for upload (default: 64)
            overwrite (bool): Whether to overwrite existing file (default: True)
            buffersize (int): Buffer size for upload (default: 4194304)
            blocksize (int): Block size for upload (default: 4194304)
        """

    def download_file(
        self,
        local_path: str,
        remote_path: str,
        nthreads: int = 64,
        overwrite: bool = True,
        buffersize: int = 4194304,
        blocksize: int = 4194304
    ) -> None:
        """
        Download a file from Azure Data Lake Storage to local system.

        Args:
            local_path (str): Local destination path
            remote_path (str): Source path in ADLS
            nthreads (int): Number of threads for download (default: 64)
            overwrite (bool): Whether to overwrite existing local file (default: True)
            buffersize (int): Buffer size for download (default: 4194304)
            blocksize (int): Block size for download (default: 4194304)
        """

    def list(self, path: str) -> list:
        """
        List directory contents in Azure Data Lake Storage.

        Args:
            path (str): Directory path to list

        Returns:
            list: List of files and directories in the path
        """

    def remove(
        self,
        path: str,
        recursive: bool = False,
        ignore_not_found: bool = True
    ) -> None:
        """
        Remove file or directory from Azure Data Lake Storage.

        Args:
            path (str): Path to file or directory to remove
            recursive (bool): Whether to remove directories recursively (default: False)
            ignore_not_found (bool): Don't raise error if path doesn't exist (default: True)
        """
```

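A minimal sketch of how these Gen1 hook methods compose inside a task callable. The connection ID `adls_gen1_conn` and all paths are illustrative assumptions, not values defined by the provider.

```python
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook


def archive_and_clean(ds: str) -> None:
    # Assumes an Airflow connection named "adls_gen1_conn" (hypothetical) pointing at an ADLS Gen1 store.
    hook = AzureDataLakeHook(azure_data_lake_conn_id="adls_gen1_conn")

    # Upload today's export, overwriting any previous run's output.
    hook.upload_file(
        local_path=f"/tmp/export_{ds}.csv",
        remote_path=f"/archive/{ds}/export.csv",
        overwrite=True,
    )

    # Remove the staging directory once the archived copy exists; ignore_not_found
    # keeps the task idempotent if staging was already cleaned up.
    if hook.check_for_file(f"/archive/{ds}/export.csv"):
        hook.remove("/staging/export", recursive=True, ignore_not_found=True)
```
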
### Data Lake Storage Gen2 Hook

Advanced interface for Azure Data Lake Storage Gen2 operations with hierarchical namespace support and enhanced capabilities.

```python { .api }
class AzureDataLakeStorageV2Hook(BaseHook):
    """
    Hook for Azure Data Lake Storage Gen2 operations.

    Provides methods for file system operations, directory management, and data transfers.
    Supports multiple authentication methods including managed identity and service principal.
    """

    def get_conn(self) -> DataLakeServiceClient:
        """
        Get authenticated Azure Data Lake Storage Gen2 service client.

        Returns:
            DataLakeServiceClient: ADLS Gen2 service client instance
        """

    def create_file_system(self, file_system_name: str) -> None:
        """
        Create a new file system (container) in Azure Data Lake Storage Gen2.

        Args:
            file_system_name (str): Name of the file system to create
        """

    def get_file_system(self, file_system: FileSystemProperties | str) -> FileSystemClient:
        """
        Get file system client for operations within a specific file system.

        Args:
            file_system (FileSystemProperties | str): File system name or properties

        Returns:
            FileSystemClient: Client for file system operations
        """

    def create_directory(
        self,
        file_system_name: FileSystemProperties | str,
        directory_name: str,
        metadata: dict[str, str] | None = None,
        **kwargs
    ) -> DataLakeDirectoryClient:
        """
        Create a directory in the specified file system.

        Args:
            file_system_name (FileSystemProperties | str): File system name or properties
            directory_name (str): Name of the directory to create
            metadata (dict[str, str] | None): Optional metadata for the directory
            **kwargs: Additional arguments

        Returns:
            DataLakeDirectoryClient: Client for directory operations
        """

    def get_directory_client(
        self,
        file_system_name: FileSystemProperties | str,
        directory_name: str
    ) -> DataLakeDirectoryClient:
        """
        Get directory client for operations within a specific directory.

        Args:
            file_system_name (FileSystemProperties | str): File system name or properties
            directory_name (str): Directory name

        Returns:
            DataLakeDirectoryClient: Client for directory operations
        """

    def create_file(
        self,
        file_system_name: FileSystemProperties | str,
        file_name: str
    ) -> DataLakeFileClient:
        """
        Create a new file in the specified file system.

        Args:
            file_system_name (FileSystemProperties | str): File system name or properties
            file_name (str): Name of the file to create

        Returns:
            DataLakeFileClient: Client for file operations
        """

    def upload_file(
        self,
        file_system_name: FileSystemProperties | str,
        file_name: str,
        file_path: str,
        overwrite: bool = False,
        **kwargs
    ) -> DataLakeFileClient:
        """
        Upload a local file to Azure Data Lake Storage Gen2.

        Args:
            file_system_name (FileSystemProperties | str): File system name or properties
            file_name (str): Target file name in ADLS
            file_path (str): Path to local file
            overwrite (bool): Whether to overwrite existing file (default: False)
            **kwargs: Additional arguments

        Returns:
            DataLakeFileClient: Client for the uploaded file
        """

    def upload_file_to_directory(
        self,
        file_system_name: FileSystemProperties | str,
        directory_name: str,
        file_name: str,
        file_path: str,
        overwrite: bool = False,
        **kwargs
    ) -> DataLakeFileClient:
        """
        Upload a local file to a specific directory in Azure Data Lake Storage Gen2.

        Args:
            file_system_name (FileSystemProperties | str): File system name or properties
            directory_name (str): Target directory name
            file_name (str): Target file name
            file_path (str): Path to local file
            overwrite (bool): Whether to overwrite existing file (default: False)
            **kwargs: Additional arguments

        Returns:
            DataLakeFileClient: Client for the uploaded file
        """

    def list_files_directory(
        self,
        file_system_name: FileSystemProperties | str,
        directory_name: str | None = None
    ) -> list:
        """
        List files in a directory within the file system.

        Args:
            file_system_name (FileSystemProperties | str): File system name or properties
            directory_name (str | None): Directory to list (None for root)

        Returns:
            list: List of files and directories
        """

    def list_file_system(
        self,
        prefix: str | None = None,
        include_metadata: bool = False
    ) -> list:
        """
        List all file systems in the storage account.

        Args:
            prefix (str | None): Filter file systems by prefix
            include_metadata (bool): Whether to include metadata (default: False)

        Returns:
            list: List of file systems
        """

    def delete_file_system(self, file_system_name: FileSystemProperties | str) -> None:
        """
        Delete a file system from Azure Data Lake Storage Gen2.

        Args:
            file_system_name (FileSystemProperties | str): File system name or properties
        """

    def delete_directory(
        self,
        file_system_name: FileSystemProperties | str,
        directory_name: str
    ) -> None:
        """
        Delete a directory from the specified file system.

        Args:
            file_system_name (FileSystemProperties | str): File system name or properties
            directory_name (str): Directory name to delete
        """

    def test_connection(self) -> tuple[bool, str]:
        """
        Test the Azure Data Lake Storage Gen2 connection.

        Returns:
            tuple[bool, str]: Success status and message
        """
```

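For reference, a minimal sketch of the account-level methods that the usage examples below do not exercise (`test_connection`, `list_file_system`, `delete_directory`). The connection ID `adls_v2_conn` and the names used are illustrative assumptions.

```python
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeStorageV2Hook


def audit_storage_account() -> None:
    # Assumes an Airflow connection named "adls_v2_conn" (hypothetical) for the target account.
    hook = AzureDataLakeStorageV2Hook(adls_conn_id="adls_v2_conn")

    # Fail fast if credentials or the account URL are misconfigured.
    ok, message = hook.test_connection()
    if not ok:
        raise RuntimeError(f"ADLS Gen2 connection check failed: {message}")

    # Enumerate file systems whose names start with a project prefix.
    for file_system in hook.list_file_system(prefix="analytics-"):
        print(f"Found file system: {file_system}")

    # Drop a scratch directory in a known file system, assuming it exists.
    hook.delete_directory(file_system_name="data-lake", directory_name="tmp/scratch")
```
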
### Data Lake Storage Operators

Execute Azure Data Lake Storage operations as Airflow tasks with comprehensive file and directory management capabilities.

```python { .api }
312
class ADLSCreateObjectOperator(BaseOperator):
313
"""
314
Creates objects in Azure Data Lake Storage.
315
316
Supports creating both files and directories with configurable options
317
and metadata.
318
"""
319
320
def __init__(
321
self,
322
*,
323
azure_data_lake_conn_id: str = "azure_data_lake_default",
324
path: str,
325
data: Any = None,
326
length: int | None = None,
327
**kwargs
328
):
329
"""
330
Initialize ADLS create object operator.
331
332
Args:
333
azure_data_lake_conn_id (str): Airflow connection ID for ADLS
334
path (str): Path to create in ADLS
335
data (Any): Data to write to the object
336
length (int | None): Length of data to write
337
"""
338
339
class ADLSDeleteOperator(BaseOperator):
340
"""
341
Deletes objects from Azure Data Lake Storage.
342
343
Supports deleting files and directories with recursive deletion
344
and error handling options.
345
"""
346
347
def __init__(
348
self,
349
*,
350
azure_data_lake_conn_id: str = "azure_data_lake_default",
351
path: str,
352
recursive: bool = False,
353
ignore_not_found: bool = True,
354
**kwargs
355
):
356
"""
357
Initialize ADLS delete operator.
358
359
Args:
360
azure_data_lake_conn_id (str): Airflow connection ID for ADLS
361
path (str): Path to delete from ADLS
362
recursive (bool): Whether to delete directories recursively
363
ignore_not_found (bool): Don't raise error if path doesn't exist
364
"""
365
366
class ADLSListOperator(BaseOperator):
367
"""
368
Lists objects in Azure Data Lake Storage.
369
370
Provides directory listing capabilities with filtering and
371
detailed file information retrieval.
372
"""
373
374
def __init__(
375
self,
376
*,
377
azure_data_lake_conn_id: str = "azure_data_lake_default",
378
path: str,
379
**kwargs
380
):
381
"""
382
Initialize ADLS list operator.
383
384
Args:
385
azure_data_lake_conn_id (str): Airflow connection ID for ADLS
386
path (str): Path to list in ADLS
387
"""
388
```
389
390
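The usage examples below exercise `ADLSListOperator` and `ADLSDeleteOperator`. As a complement, here is a minimal hedged sketch of `ADLSCreateObjectOperator` following the constructor documented above; the connection ID, path, and payload are illustrative assumptions.

```python
from airflow.providers.microsoft.azure.operators.adls import ADLSCreateObjectOperator

# Writes a small JSON marker object to ADLS; the path and connection ID are hypothetical.
create_marker = ADLSCreateObjectOperator(
    task_id="write_run_marker",
    azure_data_lake_conn_id="azure_data_lake_default",
    path="markers/{{ ds }}/run.json",
    data=b'{"status": "started"}',
)
```
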
### Filesystem Interface

Provides an fsspec-compatible filesystem interface for Azure Data Lake Storage integration with data processing frameworks.

```python { .api }
def get_fs(
    conn_id: str | None,
    storage_options: dict[str, Any] | None = None
) -> AbstractFileSystem:
    """
    Create Azure Blob FileSystem (fsspec-compatible) for Data Lake Storage.

    Supports both ADLS Gen1 and Gen2 with automatic protocol detection
    and credential management.

    Args:
        conn_id (str | None): Airflow connection ID for ADLS configuration
        storage_options (dict[str, Any] | None): Additional storage options

    Returns:
        AbstractFileSystem: fsspec-compatible filesystem interface

    Supported Schemes:
        - abfs: Azure Data Lake Storage Gen2
        - abfss: Azure Data Lake Storage Gen2 (secure)
        - adl: Azure Data Lake Storage Gen1
    """
```

## Usage Examples

### Basic File Operations with ADLS Gen1

```python
import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook

def upload_to_adls():
    """Upload a file to Azure Data Lake Storage Gen1."""
    hook = AzureDataLakeHook(azure_data_lake_conn_id='adls_conn')

    # Upload local file
    hook.upload_file(
        local_path='/tmp/data.csv',
        remote_path='/raw/data.csv',
        overwrite=True
    )

    # Verify upload
    if hook.check_for_file('/raw/data.csv'):
        print("File uploaded successfully")

def process_adls_directory():
    """Process files in an ADLS directory."""
    hook = AzureDataLakeHook(azure_data_lake_conn_id='adls_conn')

    # List directory contents (returned as remote paths)
    files = hook.list('/raw/')
    print(f"Found {len(files)} files")

    # Download each CSV file for processing
    for remote_path in files:
        if remote_path.endswith('.csv'):
            hook.download_file(
                local_path=f"/tmp/{os.path.basename(remote_path)}",
                remote_path=remote_path
            )

dag = DAG(
    'adls_gen1_workflow',
    default_args={
        'owner': 'data-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    description='ADLS Gen1 data processing workflow',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

upload_task = PythonOperator(
    task_id='upload_to_adls',
    python_callable=upload_to_adls,
    dag=dag
)

process_task = PythonOperator(
    task_id='process_directory',
    python_callable=process_adls_directory,
    dag=dag
)

upload_task >> process_task
```

### Advanced Operations with ADLS Gen2

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeStorageV2Hook
from airflow.providers.microsoft.azure.operators.adls import (
    ADLSCreateObjectOperator,
    ADLSListOperator,
    ADLSDeleteOperator
)

def setup_adls_structure():
    """Set up the directory structure in ADLS Gen2."""
    hook = AzureDataLakeStorageV2Hook(adls_conn_id='adls_v2_conn')

    # Create the file system if it doesn't exist
    try:
        hook.create_file_system('data-lake')
    except Exception as e:
        print(f"File system may already exist: {e}")

    # Create the directory structure
    directories = ['raw', 'processed', 'archive']
    for directory in directories:
        hook.create_directory(
            file_system_name='data-lake',
            directory_name=directory,
            metadata={'created_by': 'airflow', 'purpose': 'data_processing'}
        )

def upload_with_metadata():
    """Upload a file with custom metadata to ADLS Gen2."""
    hook = AzureDataLakeStorageV2Hook(adls_conn_id='adls_v2_conn')

    # Upload to a specific directory
    file_client = hook.upload_file_to_directory(
        file_system_name='data-lake',
        directory_name='raw',
        file_name='sales_data.json',
        file_path='/tmp/sales_data.json',
        overwrite=True
    )

    # Set custom metadata
    file_client.set_metadata({
        'source': 'sales_system',
        'format': 'json',
        'upload_date': datetime.now().isoformat()
    })

def list_and_process():
    """List files and process them."""
    hook = AzureDataLakeStorageV2Hook(adls_conn_id='adls_v2_conn')

    # List files in the raw directory
    files = hook.list_files_directory(
        file_system_name='data-lake',
        directory_name='raw'
    )

    for file_name in files:
        print(f"Processing file: {file_name}")
        # File processing logic here

    # Move processed files to archive
    # (Implementation would involve download, process, upload to processed/)

dag = DAG(
    'adls_gen2_advanced_workflow',
    default_args={
        'owner': 'data-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=3)
    },
    description='Advanced ADLS Gen2 workflow with directory management',
    schedule_interval=timedelta(hours=6),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Set up directory structure
setup_task = PythonOperator(
    task_id='setup_directories',
    python_callable=setup_adls_structure,
    dag=dag
)

# Upload data files
upload_task = PythonOperator(
    task_id='upload_with_metadata',
    python_callable=upload_with_metadata,
    dag=dag
)

# List and process files
list_task = ADLSListOperator(
    task_id='list_raw_files',
    azure_data_lake_conn_id='adls_v2_conn',
    path='raw/',
    dag=dag
)

process_task = PythonOperator(
    task_id='process_files',
    python_callable=list_and_process,
    dag=dag
)

# Clean up old files
cleanup_task = ADLSDeleteOperator(
    task_id='cleanup_old_files',
    azure_data_lake_conn_id='adls_v2_conn',
    path='archive/old/',
    recursive=True,
    ignore_not_found=True,
    dag=dag
)

setup_task >> upload_task >> list_task >> process_task >> cleanup_task
```

### Filesystem Interface Usage

```python
import pandas as pd

from airflow.providers.microsoft.azure.fs.adls import get_fs

def use_fsspec_interface():
    """Use the fsspec interface for data processing."""
    # Get a filesystem instance
    fs = get_fs(
        conn_id='adls_v2_conn',
        storage_options={'account_name': 'mystorageaccount'}
    )

    # Use with pandas for direct file access
    df = pd.read_csv(
        'abfs://data-lake/raw/sales_data.csv',
        storage_options={'account_name': 'mystorageaccount'}
    )

    # Process data
    processed_df = df.groupby('region').sum()

    # Write back using fsspec
    processed_df.to_csv(
        'abfs://data-lake/processed/sales_summary.csv',
        storage_options={'account_name': 'mystorageaccount'}
    )

    # List files using fsspec
    files = fs.ls('abfs://data-lake/processed/')
    print(f"Processed files: {files}")
```

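Because the pandas calls above pass credentials through `storage_options` rather than through the `fs` object, here is a short hedged sketch of working with the returned `AbstractFileSystem` directly; the paths are the same illustrative ones used above.

```python
def stream_with_fs():
    """Read and write through the fsspec filesystem object itself."""
    fs = get_fs(conn_id='adls_v2_conn')

    # Open a remote file like a local one; authentication comes from the Airflow connection.
    with fs.open('data-lake/raw/sales_data.csv', 'rb') as src:
        header = src.readline()

    # Write a small status file back to the processed area.
    with fs.open('data-lake/processed/_header.txt', 'wb') as dst:
        dst.write(header)
```
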
## Connection Configuration

### ADLS Gen1 Connection (`azure_data_lake`)

Configure Azure Data Lake Storage Gen1 connections in Airflow:

```python
# Connection configuration for ADLS Gen1
{
    "conn_id": "azure_data_lake_default",
    "conn_type": "azure_data_lake",
    "host": "mydatalake.azuredatalakestore.net",
    "extra": {
        "tenant_id": "your-tenant-id",
        "client_id": "your-client-id",
        "client_secret": "your-client-secret"
    }
}
```

### ADLS Gen2 Connection (`adls`)

Configure Azure Data Lake Storage Gen2 connections in Airflow:

```python
# Connection configuration for ADLS Gen2
{
    "conn_id": "adls_default",
    "conn_type": "adls",
    "login": "mystorageaccount",  # Storage account name
    "extra": {
        "account_url": "https://mystorageaccount.dfs.core.windows.net",
        "tenant_id": "your-tenant-id",
        "client_id": "your-client-id",
        "client_secret": "your-client-secret"
    }
}
```

### Authentication Methods

Both ADLS Gen1 and Gen2 support multiple authentication methods, configured through the connection's `extra` field (a programmatic setup sketch follows this list):

1. **Service Principal Authentication**:
   ```python
   extra = {
       "tenant_id": "your-tenant-id",
       "client_id": "your-client-id",
       "client_secret": "your-client-secret"
   }
   ```

2. **Managed Identity Authentication**:
   ```python
   extra = {
       "managed_identity_client_id": "your-managed-identity-client-id"
   }
   ```

3. **Account Key Authentication** (Gen2 only):
   ```python
   extra = {
       "account_key": "your-storage-account-key"
   }
   ```

4. **SAS Token Authentication** (Gen2 only):
   ```python
   extra = {
       "sas_token": "your-sas-token"
   }
   ```

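For local testing, a minimal sketch of defining such a connection programmatically. This assumes Airflow's support for JSON-formatted `AIRFLOW_CONN_*` environment variables (available in recent Airflow versions); the connection ID is hypothetical and every credential value is a placeholder.

```python
import json
import os

# Hypothetical connection ID "adls_v2_conn"; Airflow resolves AIRFLOW_CONN_<CONN_ID> at runtime.
os.environ["AIRFLOW_CONN_ADLS_V2_CONN"] = json.dumps(
    {
        "conn_type": "adls",
        "login": "mystorageaccount",
        "extra": {
            "tenant_id": "your-tenant-id",
            "client_id": "your-client-id",
            "client_secret": "your-client-secret",
        },
    }
)
```
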
## Error Handling

### Common Exception Patterns

```python
from azure.core.exceptions import ResourceNotFoundError, ResourceExistsError

from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeStorageV2Hook

def robust_file_operations():
    """Demonstrate error handling patterns."""
    hook = AzureDataLakeStorageV2Hook(adls_conn_id='adls_conn')

    try:
        # Attempt file operation
        hook.upload_file(
            file_system_name='data-lake',
            file_name='data.csv',
            file_path='/tmp/data.csv'
        )
    except ResourceExistsError:
        print("File already exists, skipping upload")
    except ResourceNotFoundError:
        print("File system doesn't exist, creating it first")
        hook.create_file_system('data-lake')
        # Retry upload
        hook.upload_file(
            file_system_name='data-lake',
            file_name='data.csv',
            file_path='/tmp/data.csv'
        )
    except Exception as e:
        print(f"Unexpected error: {e}")
        raise
```

### Connection Testing

```python
from airflow.providers.microsoft.azure.hooks.data_lake import (
    AzureDataLakeHook,
    AzureDataLakeStorageV2Hook,
)

def test_adls_connections():
    """Test both ADLS Gen1 and Gen2 connections."""

    # Test Gen1 connection by authenticating and listing the root path
    try:
        gen1_hook = AzureDataLakeHook(azure_data_lake_conn_id='adls_gen1_conn')
        gen1_hook.get_conn()
        gen1_hook.list('/')
        print("ADLS Gen1 connection successful")
    except Exception as e:
        print(f"ADLS Gen1 connection failed: {e}")

    # Test Gen2 connection using the hook's built-in check
    try:
        gen2_hook = AzureDataLakeStorageV2Hook(adls_conn_id='adls_gen2_conn')
        success, message = gen2_hook.test_connection()
        print(f"ADLS Gen2 connection {'succeeded' if success else 'failed'}: {message}")
    except Exception as e:
        print(f"ADLS Gen2 connection failed: {e}")
```

## Performance Considerations

### Optimizing File Operations

```python
from airflow.providers.microsoft.azure.hooks.data_lake import (
    AzureDataLakeHook,
    AzureDataLakeStorageV2Hook,
)

def optimized_bulk_upload():
    """Optimize bulk file uploads to ADLS."""
    hook = AzureDataLakeHook(azure_data_lake_conn_id='adls_conn')

    # Use more threads and larger buffers for large files
    hook.upload_file(
        local_path='/tmp/large_file.csv',
        remote_path='/data/large_file.csv',
        nthreads=128,        # Increase threads for better throughput
        buffersize=8388608,  # 8 MB buffer for large files
        blocksize=8388608    # 8 MB blocks
    )

def batch_directory_operations():
    """Batch operations for better performance."""
    hook = AzureDataLakeStorageV2Hook(adls_conn_id='adls_v2_conn')

    # Get the file system client once and reuse it
    fs_client = hook.get_file_system('data-lake')

    # Batch multiple uploads through the shared client
    files_to_upload = ['file1.csv', 'file2.json', 'file3.parquet']

    for filename in files_to_upload:
        file_client = fs_client.create_file(f'batch/{filename}')
        with open(f'/tmp/{filename}', 'rb') as data:
            file_client.upload_data(data, overwrite=True)
```

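Where many small files must move, parallelizing uploads within a task can also help. Below is a minimal sketch using the standard library's `ThreadPoolExecutor` with the Gen2 hook documented above; the connection ID, file system, and paths are illustrative assumptions, and it presumes the underlying client tolerates concurrent uploads (create one hook per worker if in doubt).

```python
from concurrent.futures import ThreadPoolExecutor

from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeStorageV2Hook


def parallel_uploads(local_files: list[str]) -> None:
    """Upload several local files concurrently to the same ADLS Gen2 directory."""
    hook = AzureDataLakeStorageV2Hook(adls_conn_id='adls_v2_conn')

    def upload_one(local_path: str) -> str:
        file_name = local_path.rsplit('/', 1)[-1]
        hook.upload_file_to_directory(
            file_system_name='data-lake',
            directory_name='raw',
            file_name=file_name,
            file_path=local_path,
            overwrite=True,
        )
        return file_name

    # A small pool keeps memory and connection usage bounded.
    with ThreadPoolExecutor(max_workers=8) as pool:
        for uploaded in pool.map(upload_one, local_files):
            print(f"Uploaded {uploaded}")
```
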
This documentation covers the Azure Data Lake Storage capabilities of the Apache Airflow Microsoft Azure provider, including both Gen1 and Gen2 implementations, the filesystem interface, and practical usage patterns for data lake operations.