Provider package for Microsoft Azure integrations with Apache Airflow
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive Azure Data Lake Storage integration supporting both Gen1 and Gen2 with complete file system operations, directory management, data upload/download capabilities, and filesystem interface compatibility.
Primary interface for Azure Data Lake Storage Gen1 operations, providing authenticated connections and file system functionality.
class AzureDataLakeHook(BaseHook):
"""
Hook for Azure Data Lake Storage Gen1 operations.
Provides methods for file operations, directory management, and data transfers.
Supports Azure Active Directory authentication and connection configurations.
"""
def get_conn(self) -> core.AzureDLFileSystem:
"""
Get authenticated Azure Data Lake Storage Gen1 client.
Returns:
core.AzureDLFileSystem: ADLS Gen1 client instance
"""
def check_for_file(self, file_path: str) -> bool:
"""
Check if a file exists in Azure Data Lake Storage.
Args:
file_path (str): Path to the file to check
Returns:
bool: True if file exists, False otherwise
"""
def upload_file(
self,
local_path: str,
remote_path: str,
nthreads: int = 64,
overwrite: bool = True,
buffersize: int = 4194304,
blocksize: int = 4194304
) -> None:
"""
Upload a local file to Azure Data Lake Storage.
Args:
local_path (str): Path to local file
remote_path (str): Target path in ADLS
nthreads (int): Number of threads for upload (default: 64)
overwrite (bool): Whether to overwrite existing file (default: True)
buffersize (int): Buffer size for upload (default: 4194304)
blocksize (int): Block size for upload (default: 4194304)
"""
def download_file(
self,
local_path: str,
remote_path: str,
nthreads: int = 64,
overwrite: bool = True,
buffersize: int = 4194304,
blocksize: int = 4194304
) -> None:
"""
Download a file from Azure Data Lake Storage to local system.
Args:
local_path (str): Local destination path
remote_path (str): Source path in ADLS
nthreads (int): Number of threads for download (default: 64)
overwrite (bool): Whether to overwrite existing local file (default: True)
buffersize (int): Buffer size for download (default: 4194304)
blocksize (int): Block size for download (default: 4194304)
"""
def list(self, path: str) -> list:
"""
List directory contents in Azure Data Lake Storage.
Args:
path (str): Directory path to list
Returns:
list: List of files and directories in the path
"""
def remove(
self,
path: str,
recursive: bool = False,
ignore_not_found: bool = True
) -> None:
"""
Remove file or directory from Azure Data Lake Storage.
Args:
path (str): Path to file or directory to remove
recursive (bool): Whether to remove directories recursively (default: False)
ignore_not_found (bool): Don't raise error if path doesn't exist (default: True)
"""

Advanced interface for Azure Data Lake Storage Gen2 operations with hierarchical namespace support and enhanced capabilities.
class AzureDataLakeStorageV2Hook(BaseHook):
"""
Hook for Azure Data Lake Storage Gen2 operations.
Provides methods for file system operations, directory management, and data transfers.
Supports multiple authentication methods including managed identity and service principal.
"""
def get_conn(self) -> DataLakeServiceClient:
"""
Get authenticated Azure Data Lake Storage Gen2 service client.
Returns:
DataLakeServiceClient: ADLS Gen2 service client instance
"""
def create_file_system(self, file_system_name: str) -> None:
"""
Create a new file system (container) in Azure Data Lake Storage Gen2.
Args:
file_system_name (str): Name of the file system to create
"""
def get_file_system(self, file_system: FileSystemProperties | str) -> FileSystemClient:
"""
Get file system client for operations within a specific file system.
Args:
file_system (FileSystemProperties | str): File system name or properties
Returns:
FileSystemClient: Client for file system operations
"""
def create_directory(
self,
file_system_name: FileSystemProperties | str,
directory_name: str,
metadata: dict[str, str] | None = None,
**kwargs
) -> DataLakeDirectoryClient:
"""
Create a directory in the specified file system.
Args:
file_system_name (FileSystemProperties | str): File system name or properties
directory_name (str): Name of the directory to create
metadata (dict[str, str] | None): Optional metadata for the directory
**kwargs: Additional arguments
Returns:
DataLakeDirectoryClient: Client for directory operations
"""
def get_directory_client(
self,
file_system_name: FileSystemProperties | str,
directory_name: str
) -> DataLakeDirectoryClient:
"""
Get directory client for operations within a specific directory.
Args:
file_system_name (FileSystemProperties | str): File system name or properties
directory_name (str): Directory name
Returns:
DataLakeDirectoryClient: Client for directory operations
"""
def create_file(
self,
file_system_name: FileSystemProperties | str,
file_name: str
) -> DataLakeFileClient:
"""
Create a new file in the specified file system.
Args:
file_system_name (FileSystemProperties | str): File system name or properties
file_name (str): Name of the file to create
Returns:
DataLakeFileClient: Client for file operations
"""
def upload_file(
self,
file_system_name: FileSystemProperties | str,
file_name: str,
file_path: str,
overwrite: bool = False,
**kwargs
) -> DataLakeFileClient:
"""
Upload a local file to Azure Data Lake Storage Gen2.
Args:
file_system_name (FileSystemProperties | str): File system name or properties
file_name (str): Target file name in ADLS
file_path (str): Path to local file
overwrite (bool): Whether to overwrite existing file (default: False)
**kwargs: Additional arguments
Returns:
DataLakeFileClient: Client for the uploaded file
"""
def upload_file_to_directory(
self,
file_system_name: FileSystemProperties | str,
directory_name: str,
file_name: str,
file_path: str,
overwrite: bool = False,
**kwargs
) -> DataLakeFileClient:
"""
Upload a local file to a specific directory in Azure Data Lake Storage Gen2.
Args:
file_system_name (FileSystemProperties | str): File system name or properties
directory_name (str): Target directory name
file_name (str): Target file name
file_path (str): Path to local file
overwrite (bool): Whether to overwrite existing file (default: False)
**kwargs: Additional arguments
Returns:
DataLakeFileClient: Client for the uploaded file
"""
def list_files_directory(
self,
file_system_name: FileSystemProperties | str,
directory_name: str | None = None
) -> list:
"""
List files in a directory within the file system.
Args:
file_system_name (FileSystemProperties | str): File system name or properties
directory_name (str | None): Directory to list (None for root)
Returns:
list: List of files and directories
"""
def list_file_system(
self,
prefix: str | None = None,
include_metadata: bool = False
) -> list:
"""
List all file systems in the storage account.
Args:
prefix (str | None): Filter file systems by prefix
include_metadata (bool): Whether to include metadata (default: False)
Returns:
list: List of file systems
"""
def delete_file_system(self, file_system_name: FileSystemProperties | str) -> None:
"""
Delete a file system from Azure Data Lake Storage Gen2.
Args:
file_system_name (FileSystemProperties | str): File system name or properties
"""
def delete_directory(
self,
file_system_name: FileSystemProperties | str,
directory_name: str
) -> None:
"""
Delete a directory from the specified file system.
Args:
file_system_name (FileSystemProperties | str): File system name or properties
directory_name (str): Directory name to delete
"""
def test_connection(self) -> tuple[bool, str]:
"""
Test the Azure Data Lake Storage Gen2 connection.
Returns:
tuple[bool, str]: Success status and message
"""

Execute Azure Data Lake Storage operations as Airflow tasks with comprehensive file and directory management capabilities.
class ADLSCreateObjectOperator(BaseOperator):
"""
Creates objects in Azure Data Lake Storage.
Supports creating both files and directories with configurable options
and metadata.
"""
def __init__(
self,
*,
azure_data_lake_conn_id: str = "azure_data_lake_default",
path: str,
data: Any = None,
length: int | None = None,
**kwargs
):
"""
Initialize ADLS create object operator.
Args:
azure_data_lake_conn_id (str): Airflow connection ID for ADLS
path (str): Path to create in ADLS
data (Any): Data to write to the object
length (int | None): Length of data to write
"""
class ADLSDeleteOperator(BaseOperator):
"""
Deletes objects from Azure Data Lake Storage.
Supports deleting files and directories with recursive deletion
and error handling options.
"""
def __init__(
self,
*,
azure_data_lake_conn_id: str = "azure_data_lake_default",
path: str,
recursive: bool = False,
ignore_not_found: bool = True,
**kwargs
):
"""
Initialize ADLS delete operator.
Args:
azure_data_lake_conn_id (str): Airflow connection ID for ADLS
path (str): Path to delete from ADLS
recursive (bool): Whether to delete directories recursively
ignore_not_found (bool): Don't raise error if path doesn't exist
"""
class ADLSListOperator(BaseOperator):
"""
Lists objects in Azure Data Lake Storage.
Provides directory listing capabilities with filtering and
detailed file information retrieval.
"""
def __init__(
self,
*,
azure_data_lake_conn_id: str = "azure_data_lake_default",
path: str,
**kwargs
):
"""
Initialize ADLS list operator.
Args:
azure_data_lake_conn_id (str): Airflow connection ID for ADLS
path (str): Path to list in ADLS
"""

Provides an fsspec-compatible filesystem interface for Azure Data Lake Storage integration with data processing frameworks.
def get_fs(
conn_id: str | None,
storage_options: dict[str, Any] | None = None
) -> AbstractFileSystem:
"""
Create Azure Blob FileSystem (fsspec-compatible) for Data Lake Storage.
Supports both ADLS Gen1 and Gen2 with automatic protocol detection
and credential management.
Args:
conn_id (str | None): Airflow connection ID for ADLS configuration
storage_options (dict[str, Any] | None): Additional storage options
Returns:
AbstractFileSystem: fsspec-compatible filesystem interface
Supported Schemes:
- abfs: Azure Data Lake Storage Gen2
- abfss: Azure Data Lake Storage Gen2 (secure)
- adl: Azure Data Lake Storage Gen1
"""

from airflow import DAG
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def upload_to_adls():
    """Push ``/tmp/data.csv`` to ADLS Gen1 and report whether it landed.

    The file is written to ``/raw/data.csv`` with overwrite enabled, then the
    remote path is checked and a confirmation is printed when it is found.
    """
    source = '/tmp/data.csv'
    target = '/raw/data.csv'
    adls = AzureDataLakeHook(azure_data_lake_conn_id='adls_conn')

    # Transfer the local file; overwrite=True replaces an existing remote copy.
    adls.upload_file(local_path=source, remote_path=target, overwrite=True)

    # Only announce success once the remote file is actually visible.
    uploaded = adls.check_for_file(target)
    if uploaded:
        print("File uploaded successfully")
def process_adls_directory():
    """Download every CSV file found under ``/raw/`` in ADLS Gen1 to ``/tmp``.

    NOTE(review): ``AzureDataLakeHook.list`` is understood to return remote
    paths as plain strings (it delegates to the ADLS client's walk/glob), so
    entries are used directly instead of being indexed with ``['name']`` --
    confirm against the installed provider version.
    """
    hook = AzureDataLakeHook(azure_data_lake_conn_id='adls_conn')
    # List directory contents
    remote_paths = hook.list('/raw/')
    print(f"Found {len(remote_paths)} files")
    # Download and process each file
    for remote_path in remote_paths:
        if not remote_path.endswith('.csv'):
            continue
        # Use only the final path component locally so a remote path such as
        # "raw/data.csv" does not require a "/tmp/raw/" directory to exist.
        file_name = remote_path.rsplit('/', 1)[-1]
        hook.download_file(
            local_path=f"/tmp/{file_name}",
            remote_path=remote_path,
        )
# Gen1 workflow definition: runs once a day, without backfilling missed runs.
dag = DAG(
    'adls_gen1_workflow',
    default_args={
        'owner': 'data-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='ADLS Gen1 data processing workflow',
    # `schedule` supersedes the deprecated `schedule_interval` argument
    # (available since Airflow 2.4; `schedule_interval` is gone in Airflow 3).
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
# Uploads /tmp/data.csv to the lake.
upload_task = PythonOperator(
    task_id='upload_to_adls',
    python_callable=upload_to_adls,
    dag=dag,
)
# Lists /raw/ and downloads the CSV files it contains.
process_task = PythonOperator(
    task_id='process_directory',
    python_callable=process_adls_directory,
    dag=dag,
)
upload_task >> process_task

from airflow import DAG
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeStorageV2Hook
from airflow.providers.microsoft.azure.operators.adls import (
ADLSCreateObjectOperator,
ADLSListOperator,
ADLSDeleteOperator
)
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def setup_adls_structure():
    """Set up the ``data-lake`` file system and its directory layout in ADLS Gen2.

    Creates the file system (tolerating the case where it already exists) and
    then creates the ``raw``, ``processed`` and ``archive`` directories, each
    tagged with the same metadata.
    """
    # Imported locally so this example is self-contained.
    from azure.core.exceptions import ResourceExistsError

    hook = AzureDataLakeStorageV2Hook(adls_conn_id='adls_v2_conn')
    # Create file system if it doesn't exist
    try:
        hook.create_file_system('data-lake')
    except ResourceExistsError as e:
        # Only "already exists" is expected here; authentication or network
        # errors must propagate instead of being reported as a benign case.
        print(f"File system may already exist: {e}")
    # Create directory structure
    directories = ['raw', 'processed', 'archive']
    for directory in directories:
        hook.create_directory(
            file_system_name='data-lake',
            directory_name=directory,
            metadata={'created_by': 'airflow', 'purpose': 'data_processing'},
        )
def upload_with_metadata():
    """Upload ``/tmp/sales_data.json`` into ``raw`` and tag it with metadata.

    The file is uploaded (overwriting any existing copy) to the ``data-lake``
    file system, after which ``source``, ``format`` and ``upload_date``
    metadata entries are set on the returned file client.
    """
    adls = AzureDataLakeStorageV2Hook(adls_conn_id='adls_v2_conn')

    # Upload into the target directory and keep the client for the new file.
    uploaded_file = adls.upload_file_to_directory(
        file_system_name='data-lake',
        directory_name='raw',
        file_name='sales_data.json',
        file_path='/tmp/sales_data.json',
        overwrite=True,
    )

    # Build the metadata after the upload so the timestamp reflects completion.
    tags = {
        'source': 'sales_system',
        'format': 'json',
        'upload_date': datetime.now().isoformat(),
    }
    uploaded_file.set_metadata(tags)
def list_and_process():
    """List the files under ``raw`` in the ``data-lake`` file system and report each.

    NOTE(review): the provider hook's ``list_files_directory`` is understood
    to return path names as plain strings, so each entry is printed directly
    rather than indexed with ``['name']`` -- confirm against the installed
    provider version.
    """
    hook = AzureDataLakeStorageV2Hook(adls_conn_id='adls_v2_conn')
    # List files in raw directory
    file_names = hook.list_files_directory(
        file_system_name='data-lake',
        directory_name='raw',
    )
    for file_name in file_names:
        print(f"Processing file: {file_name}")
        # File processing logic here
        # Move processed file to archive
        # (Implementation would involve download, process, upload to processed/)
# Gen2 workflow definition: runs every six hours, without backfilling missed runs.
dag = DAG(
    'adls_gen2_advanced_workflow',
    default_args={
        'owner': 'data-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=3),
    },
    description='Advanced ADLS Gen2 workflow with directory management',
    # `schedule` supersedes the deprecated `schedule_interval` argument
    # (available since Airflow 2.4; `schedule_interval` is gone in Airflow 3).
    schedule=timedelta(hours=6),
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
# Setup directory structure
setup_task = PythonOperator(
    task_id='setup_directories',
    python_callable=setup_adls_structure,
    dag=dag,
)
# Upload data files
upload_task = PythonOperator(
    task_id='upload_with_metadata',
    python_callable=upload_with_metadata,
    dag=dag,
)
# List and process files
# NOTE(review): ADLSListOperator and ADLSDeleteOperator take an
# `azure_data_lake_conn_id`; confirm that the Gen2 connection 'adls_v2_conn'
# is valid for them, or point them at an `azure_data_lake` connection.
list_task = ADLSListOperator(
    task_id='list_raw_files',
    azure_data_lake_conn_id='adls_v2_conn',
    path='raw/',
    dag=dag,
)
process_task = PythonOperator(
    task_id='process_files',
    python_callable=list_and_process,
    dag=dag,
)
# Clean up old files
cleanup_task = ADLSDeleteOperator(
    task_id='cleanup_old_files',
    azure_data_lake_conn_id='adls_v2_conn',
    path='archive/old/',
    recursive=True,
    ignore_not_found=True,
    dag=dag,
)
setup_task >> upload_task >> list_task >> process_task >> cleanup_task

from airflow.providers.microsoft.azure.fs.adls import get_fs
import pandas as pd
def use_fsspec_interface():
"""Use fsspec interface for data processing."""
# Get filesystem instance
fs = get_fs(
conn_id='adls_v2_conn',
storage_options={'account_name': 'mystorageaccount'}
)
# Use with pandas for direct file access
df = pd.read_csv('abfs://data-lake/raw/sales_data.csv', storage_options={'account_name': 'mystorageaccount'})
# Process data
processed_df = df.groupby('region').sum()
# Write back using fsspec
processed_df.to_csv('abfs://data-lake/processed/sales_summary.csv', storage_options={'account_name': 'mystorageaccount'})
# List files using fsspec
files = fs.ls('abfs://data-lake/processed/')
print(f"Processed files: {files}")azure_data_lake)Configure Azure Data Lake Storage Gen1 connections in Airflow:
# Connection configuration for ADLS Gen1
{
"conn_id": "azure_data_lake_default",
"conn_type": "azure_data_lake",
"host": "mydatalake.azuredatalakestore.net",
"extra": {
"tenant_id": "your-tenant-id",
"client_id": "your-client-id",
"client_secret": "your-client-secret"
}
}

Connection type (adls)

Configure Azure Data Lake Storage Gen2 connections in Airflow:
# Connection configuration for ADLS Gen2
{
"conn_id": "adls_default",
"conn_type": "adls",
"login": "mystorageaccount", # Storage account name
"extra": {
"account_url": "https://mystorageaccount.dfs.core.windows.net",
"tenant_id": "your-tenant-id",
"client_id": "your-client-id",
"client_secret": "your-client-secret"
}
}

Both ADLS Gen1 and Gen2 support multiple authentication methods:
Service Principal Authentication:
extra = {
"tenant_id": "your-tenant-id",
"client_id": "your-client-id",
"client_secret": "your-client-secret"
}

Managed Identity Authentication:
extra = {
"managed_identity_client_id": "your-managed-identity-client-id"
}

Account Key Authentication (Gen2 only):
extra = {
"account_key": "your-storage-account-key"
}

SAS Token Authentication (Gen2 only):
extra = {
"sas_token": "your-sas-token"
}

from azure.core.exceptions import ResourceNotFoundError, ResourceExistsError
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeStorageV2Hook
def robust_file_operations():
"""Demonstrate error handling patterns."""
hook = AzureDataLakeStorageV2Hook(adls_conn_id='adls_conn')
try:
# Attempt file operation
hook.upload_file(
file_system_name='data-lake',
file_name='data.csv',
file_path='/tmp/data.csv'
)
except ResourceExistsError:
print("File already exists, skipping upload")
except ResourceNotFoundError:
print("File system doesn't exist, creating it first")
hook.create_file_system('data-lake')
# Retry upload
hook.upload_file(
file_system_name='data-lake',
file_name='data.csv',
file_path='/tmp/data.csv'
)
except Exception as e:
print(f"Unexpected error: {e}")
raisedef test_adls_connections():
"""Test both ADLS Gen1 and Gen2 connections."""
# Test Gen1 connection
try:
gen1_hook = AzureDataLakeHook(azure_data_lake_conn_id='adls_gen1_conn')
client = gen1_hook.get_conn()
files = gen1_hook.list('/')
print("ADLS Gen1 connection successful")
except Exception as e:
print(f"ADLS Gen1 connection failed: {e}")
# Test Gen2 connection
try:
gen2_hook = AzureDataLakeStorageV2Hook(adls_conn_id='adls_gen2_conn')
success, message = gen2_hook.test_connection()
print(f"ADLS Gen2 connection: {message}")
except Exception as e:
print(f"ADLS Gen2 connection failed: {e}")def optimized_bulk_upload():
"""Optimize bulk file uploads to ADLS."""
hook = AzureDataLakeHook(azure_data_lake_conn_id='adls_conn')
# Use multiple threads for large files
hook.upload_file(
local_path='/tmp/large_file.csv',
remote_path='/data/large_file.csv',
nthreads=128, # Increase threads for better performance
buffersize=8388608, # 8MB buffer for large files
blocksize=8388608
)
def batch_directory_operations():
"""Batch operations for better performance."""
hook = AzureDataLakeStorageV2Hook(adls_conn_id='adls_v2_conn')
# Get file system client once
fs_client = hook.get_file_system('data-lake')
# Batch multiple operations
files_to_upload = ['file1.csv', 'file2.json', 'file3.parquet']
for filename in files_to_upload:
file_client = hook.create_file('data-lake', f'batch/{filename}')
with open(f'/tmp/{filename}', 'rb') as data:
file_client.upload_data(data, overwrite=True)This comprehensive documentation covers all Azure Data Lake Storage capabilities in the Apache Airflow Microsoft Azure Provider, including both Gen1 and Gen2 implementations, filesystem interfaces, and practical usage patterns for data lake operations.
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-microsoft-azure