tessl/pypi-apache-airflow-providers-microsoft-azure

Provider package for Microsoft Azure integrations with Apache Airflow

Azure Data Lake Storage

Azure Data Lake Storage integration for both Gen1 and Gen2, covering file system operations, directory management, data upload/download, and an fsspec-compatible filesystem interface.

Capabilities

Data Lake Storage Gen1 Hook

Primary interface for Azure Data Lake Storage Gen1 operations, providing authenticated connections and file system functionality.

class AzureDataLakeHook(BaseHook):
    """
    Hook for Azure Data Lake Storage Gen1 operations.
    
    Provides methods for file operations, directory management, and data transfers.
    Supports Azure Active Directory authentication and connection configurations.
    """
    
    def get_conn(self) -> core.AzureDLFileSystem:
        """
        Get authenticated Azure Data Lake Storage Gen1 client.
        
        Returns:
            core.AzureDLFileSystem: ADLS Gen1 client instance
        """
    
    def check_for_file(self, file_path: str) -> bool:
        """
        Check if a file exists in Azure Data Lake Storage.
        
        Args:
            file_path (str): Path to the file to check
            
        Returns:
            bool: True if file exists, False otherwise
        """
    
    def upload_file(
        self,
        local_path: str,
        remote_path: str,
        nthreads: int = 64,
        overwrite: bool = True,
        buffersize: int = 4194304,
        blocksize: int = 4194304
    ) -> None:
        """
        Upload a local file to Azure Data Lake Storage.
        
        Args:
            local_path (str): Path to local file
            remote_path (str): Target path in ADLS
            nthreads (int): Number of threads for upload (default: 64)
            overwrite (bool): Whether to overwrite existing file (default: True)
            buffersize (int): Buffer size for upload (default: 4194304)
            blocksize (int): Block size for upload (default: 4194304)
        """
    
    def download_file(
        self,
        local_path: str,
        remote_path: str,
        nthreads: int = 64,
        overwrite: bool = True,
        buffersize: int = 4194304,
        blocksize: int = 4194304
    ) -> None:
        """
        Download a file from Azure Data Lake Storage to local system.
        
        Args:
            local_path (str): Local destination path
            remote_path (str): Source path in ADLS
            nthreads (int): Number of threads for download (default: 64)
            overwrite (bool): Whether to overwrite existing local file (default: True)
            buffersize (int): Buffer size for download (default: 4194304)
            blocksize (int): Block size for download (default: 4194304)
        """
    
    def list(self, path: str) -> list:
        """
        List directory contents in Azure Data Lake Storage.
        
        Args:
            path (str): Directory path to list
            
        Returns:
            list: List of files and directories in the path
        """
    
    def remove(
        self,
        path: str,
        recursive: bool = False,
        ignore_not_found: bool = True
    ) -> None:
        """
        Remove file or directory from Azure Data Lake Storage.
        
        Args:
            path (str): Path to file or directory to remove
            recursive (bool): Whether to remove directories recursively (default: False)
            ignore_not_found (bool): Don't raise error if path doesn't exist (default: True)
        """

Data Lake Storage Gen2 Hook

Interface for Azure Data Lake Storage Gen2 operations with hierarchical namespace support, covering file system (container), directory, and file management.

class AzureDataLakeStorageV2Hook(BaseHook):
    """
    Hook for Azure Data Lake Storage Gen2 operations.
    
    Provides methods for file system operations, directory management, and data transfers.
    Supports multiple authentication methods including managed identity and service principal.
    """
    
    def get_conn(self) -> DataLakeServiceClient:
        """
        Get authenticated Azure Data Lake Storage Gen2 service client.
        
        Returns:
            DataLakeServiceClient: ADLS Gen2 service client instance
        """
    
    def create_file_system(self, file_system_name: str) -> None:
        """
        Create a new file system (container) in Azure Data Lake Storage Gen2.
        
        Args:
            file_system_name (str): Name of the file system to create
        """
    
    def get_file_system(self, file_system: FileSystemProperties | str) -> FileSystemClient:
        """
        Get file system client for operations within a specific file system.
        
        Args:
            file_system (FileSystemProperties | str): File system name or properties
            
        Returns:
            FileSystemClient: Client for file system operations
        """
    
    def create_directory(
        self,
        file_system_name: FileSystemProperties | str,
        directory_name: str,
        metadata: dict[str, str] | None = None,
        **kwargs
    ) -> DataLakeDirectoryClient:
        """
        Create a directory in the specified file system.
        
        Args:
            file_system_name (FileSystemProperties | str): File system name or properties
            directory_name (str): Name of the directory to create
            metadata (dict[str, str] | None): Optional metadata for the directory
            **kwargs: Additional arguments
            
        Returns:
            DataLakeDirectoryClient: Client for directory operations
        """
    
    def get_directory_client(
        self,
        file_system_name: FileSystemProperties | str,
        directory_name: str
    ) -> DataLakeDirectoryClient:
        """
        Get directory client for operations within a specific directory.
        
        Args:
            file_system_name (FileSystemProperties | str): File system name or properties
            directory_name (str): Directory name
            
        Returns:
            DataLakeDirectoryClient: Client for directory operations
        """
    
    def create_file(
        self,
        file_system_name: FileSystemProperties | str,
        file_name: str
    ) -> DataLakeFileClient:
        """
        Create a new file in the specified file system.
        
        Args:
            file_system_name (FileSystemProperties | str): File system name or properties
            file_name (str): Name of the file to create
            
        Returns:
            DataLakeFileClient: Client for file operations
        """
    
    def upload_file(
        self,
        file_system_name: FileSystemProperties | str,
        file_name: str,
        file_path: str,
        overwrite: bool = False,
        **kwargs
    ) -> DataLakeFileClient:
        """
        Upload a local file to Azure Data Lake Storage Gen2.
        
        Args:
            file_system_name (FileSystemProperties | str): File system name or properties
            file_name (str): Target file name in ADLS
            file_path (str): Path to local file
            overwrite (bool): Whether to overwrite existing file (default: False)
            **kwargs: Additional arguments
            
        Returns:
            DataLakeFileClient: Client for the uploaded file
        """
    
    def upload_file_to_directory(
        self,
        file_system_name: FileSystemProperties | str,
        directory_name: str,
        file_name: str,
        file_path: str,
        overwrite: bool = False,
        **kwargs
    ) -> DataLakeFileClient:
        """
        Upload a local file to a specific directory in Azure Data Lake Storage Gen2.
        
        Args:
            file_system_name (FileSystemProperties | str): File system name or properties
            directory_name (str): Target directory name
            file_name (str): Target file name
            file_path (str): Path to local file
            overwrite (bool): Whether to overwrite existing file (default: False)
            **kwargs: Additional arguments
            
        Returns:
            DataLakeFileClient: Client for the uploaded file
        """
    
    def list_files_directory(
        self,
        file_system_name: FileSystemProperties | str,
        directory_name: str | None = None
    ) -> list:
        """
        List files in a directory within the file system.
        
        Args:
            file_system_name (FileSystemProperties | str): File system name or properties
            directory_name (str | None): Directory to list (None for root)
            
        Returns:
            list: List of files and directories
        """
    
    def list_file_system(
        self,
        prefix: str | None = None,
        include_metadata: bool = False
    ) -> list:
        """
        List all file systems in the storage account.
        
        Args:
            prefix (str | None): Filter file systems by prefix
            include_metadata (bool): Whether to include metadata (default: False)
            
        Returns:
            list: List of file systems
        """
    
    def delete_file_system(self, file_system_name: FileSystemProperties | str) -> None:
        """
        Delete a file system from Azure Data Lake Storage Gen2.
        
        Args:
            file_system_name (FileSystemProperties | str): File system name or properties
        """
    
    def delete_directory(
        self,
        file_system_name: FileSystemProperties | str,
        directory_name: str
    ) -> None:
        """
        Delete a directory from the specified file system.
        
        Args:
            file_system_name (FileSystemProperties | str): File system name or properties
            directory_name (str): Directory name to delete
        """
    
    def test_connection(self) -> tuple[bool, str]:
        """
        Test the Azure Data Lake Storage Gen2 connection.
        
        Returns:
            tuple[bool, str]: Success status and message
        """

Data Lake Storage Operators

Execute Azure Data Lake Storage operations as Airflow tasks with comprehensive file and directory management capabilities.

class ADLSCreateObjectOperator(BaseOperator):
    """
    Creates objects in Azure Data Lake Storage.
    
    Supports creating both files and directories with configurable options
    and metadata.
    """
    
    def __init__(
        self,
        *,
        azure_data_lake_conn_id: str = "azure_data_lake_default",
        path: str,
        data: Any = None,
        length: int | None = None,
        **kwargs
    ):
        """
        Initialize ADLS create object operator.
        
        Args:
            azure_data_lake_conn_id (str): Airflow connection ID for ADLS
            path (str): Path to create in ADLS
            data (Any): Data to write to the object
            length (int | None): Length of data to write
        """

class ADLSDeleteOperator(BaseOperator):
    """
    Deletes objects from Azure Data Lake Storage.
    
    Supports deleting files and directories with recursive deletion
    and error handling options.
    """
    
    def __init__(
        self,
        *,
        azure_data_lake_conn_id: str = "azure_data_lake_default",
        path: str,
        recursive: bool = False,
        ignore_not_found: bool = True,
        **kwargs
    ):
        """
        Initialize ADLS delete operator.
        
        Args:
            azure_data_lake_conn_id (str): Airflow connection ID for ADLS
            path (str): Path to delete from ADLS
            recursive (bool): Whether to delete directories recursively
            ignore_not_found (bool): Don't raise error if path doesn't exist
        """

class ADLSListOperator(BaseOperator):
    """
    Lists objects in Azure Data Lake Storage.
    
    Provides directory listing capabilities with filtering and
    detailed file information retrieval.
    """
    
    def __init__(
        self,
        *,
        azure_data_lake_conn_id: str = "azure_data_lake_default",
        path: str,
        **kwargs
    ):
        """
        Initialize ADLS list operator.
        
        Args:
            azure_data_lake_conn_id (str): Airflow connection ID for ADLS
            path (str): Path to list in ADLS
        """

Filesystem Interface

Provides an fsspec-compatible filesystem interface for integrating Azure Data Lake Storage with data processing frameworks.

def get_fs(
    conn_id: str | None,
    storage_options: dict[str, Any] | None = None
) -> AbstractFileSystem:
    """
    Create Azure Blob FileSystem (fsspec-compatible) for Data Lake Storage.
    
    Supports both ADLS Gen1 and Gen2 with automatic protocol detection
    and credential management.
    
    Args:
        conn_id (str | None): Airflow connection ID for ADLS configuration
        storage_options (dict[str, Any] | None): Additional storage options
        
    Returns:
        AbstractFileSystem: fsspec-compatible filesystem interface
        
    Supported Schemes:
        - abfs: Azure Data Lake Storage Gen2
        - abfss: Azure Data Lake Storage Gen2 (secure)
        - adl: Azure Data Lake Storage Gen1
    """
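
The URL scheme determines which storage generation the filesystem targets. A minimal sketch of that mapping (the `adls_generation` helper and `SCHEME_GENERATIONS` table are illustrative only, not part of the provider API):

```python
from urllib.parse import urlparse

# Hypothetical helper mirroring the scheme table above.
SCHEME_GENERATIONS = {
    "abfs": "gen2",   # Azure Data Lake Storage Gen2
    "abfss": "gen2",  # Gen2 over TLS
    "adl": "gen1",    # Azure Data Lake Storage Gen1
}

def adls_generation(url: str) -> str:
    """Return the ADLS generation a URL targets, based on its scheme."""
    scheme = urlparse(url).scheme
    try:
        return SCHEME_GENERATIONS[scheme]
    except KeyError:
        raise ValueError(f"Unsupported ADLS scheme: {scheme!r}")
```

In practice `get_fs` handles this dispatch internally; the sketch only shows why the scheme in paths like `abfss://container@account.dfs.core.windows.net/...` matters.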

Usage Examples

Basic File Operations with ADLS Gen1

from airflow import DAG
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def upload_to_adls():
    """Upload file to Azure Data Lake Storage Gen1."""
    hook = AzureDataLakeHook(azure_data_lake_conn_id='adls_conn')
    
    # Upload local file
    hook.upload_file(
        local_path='/tmp/data.csv',
        remote_path='/raw/data.csv',
        overwrite=True
    )
    
    # Verify upload
    if hook.check_for_file('/raw/data.csv'):
        print("File uploaded successfully")

def process_adls_directory():
    """Process files in ADLS directory."""
    hook = AzureDataLakeHook(azure_data_lake_conn_id='adls_conn')
    
    # List directory contents
    files = hook.list('/raw/')
    print(f"Found {len(files)} files")
    
    # Download each CSV file; list() returns remote paths as strings
    for remote_path in files:
        if remote_path.endswith('.csv'):
            hook.download_file(
                local_path=f"/tmp/{remote_path.split('/')[-1]}",
                remote_path=remote_path
            )

dag = DAG(
    'adls_gen1_workflow',
    default_args={
        'owner': 'data-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    description='ADLS Gen1 data processing workflow',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

upload_task = PythonOperator(
    task_id='upload_to_adls',
    python_callable=upload_to_adls,
    dag=dag
)

process_task = PythonOperator(
    task_id='process_directory',
    python_callable=process_adls_directory,
    dag=dag
)

upload_task >> process_task

Advanced Operations with ADLS Gen2

from airflow import DAG
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeStorageV2Hook
from airflow.providers.microsoft.azure.operators.adls import (
    ADLSCreateObjectOperator,
    ADLSListOperator,
    ADLSDeleteOperator
)
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def setup_adls_structure():
    """Set up directory structure in ADLS Gen2."""
    hook = AzureDataLakeStorageV2Hook(adls_conn_id='adls_v2_conn')
    
    # Create file system if it doesn't exist
    try:
        hook.create_file_system('data-lake')
    except Exception as e:
        print(f"File system may already exist: {e}")
    
    # Create directory structure
    directories = ['raw', 'processed', 'archive']
    for directory in directories:
        hook.create_directory(
            file_system_name='data-lake',
            directory_name=directory,
            metadata={'created_by': 'airflow', 'purpose': 'data_processing'}
        )

def upload_with_metadata():
    """Upload file with custom metadata to ADLS Gen2."""
    hook = AzureDataLakeStorageV2Hook(adls_conn_id='adls_v2_conn')
    
    # Upload to specific directory
    file_client = hook.upload_file_to_directory(
        file_system_name='data-lake',
        directory_name='raw',
        file_name='sales_data.json',
        file_path='/tmp/sales_data.json',
        overwrite=True
    )
    
    # Set custom metadata
    file_client.set_metadata({
        'source': 'sales_system',
        'format': 'json',
        'upload_date': datetime.now().isoformat()
    })

def list_and_process():
    """List files and process them."""
    hook = AzureDataLakeStorageV2Hook(adls_conn_id='adls_v2_conn')
    
    # List files in raw directory
    files = hook.list_files_directory(
        file_system_name='data-lake',
        directory_name='raw'
    )
    
    # list_files_directory() returns path names as strings
    for file_path in files:
        print(f"Processing file: {file_path}")
        # File processing logic here
        
        # Move processed file to archive
        # (Implementation would involve download, process, upload to processed/)

dag = DAG(
    'adls_gen2_advanced_workflow',
    default_args={
        'owner': 'data-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=3)
    },
    description='Advanced ADLS Gen2 workflow with directory management',
    schedule_interval=timedelta(hours=6),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Setup directory structure
setup_task = PythonOperator(
    task_id='setup_directories',
    python_callable=setup_adls_structure,
    dag=dag
)

# Upload data files
upload_task = PythonOperator(
    task_id='upload_with_metadata',
    python_callable=upload_with_metadata,
    dag=dag
)

# List and process files
list_task = ADLSListOperator(
    task_id='list_raw_files',
    azure_data_lake_conn_id='adls_v2_conn',
    path='raw/',
    dag=dag
)

process_task = PythonOperator(
    task_id='process_files',
    python_callable=list_and_process,
    dag=dag
)

# Clean up old files
cleanup_task = ADLSDeleteOperator(
    task_id='cleanup_old_files',
    azure_data_lake_conn_id='adls_v2_conn',
    path='archive/old/',
    recursive=True,
    ignore_not_found=True,
    dag=dag
)

setup_task >> upload_task >> list_task >> process_task >> cleanup_task

Filesystem Interface Usage

from airflow.providers.microsoft.azure.fs.adls import get_fs
import pandas as pd

def use_fsspec_interface():
    """Use fsspec interface for data processing."""
    # Get filesystem instance
    fs = get_fs(
        conn_id='adls_v2_conn',
        storage_options={'account_name': 'mystorageaccount'}
    )
    
    # Use with pandas for direct file access
    df = pd.read_csv(
        'abfs://data-lake/raw/sales_data.csv',
        storage_options={'account_name': 'mystorageaccount'}
    )
    
    # Process data
    processed_df = df.groupby('region').sum()
    
    # Write back using fsspec
    processed_df.to_csv(
        'abfs://data-lake/processed/sales_summary.csv',
        storage_options={'account_name': 'mystorageaccount'}
    )
    
    # List files using fsspec
    files = fs.ls('abfs://data-lake/processed/')
    print(f"Processed files: {files}")

Connection Configuration

ADLS Gen1 Connection (azure_data_lake)

Configure Azure Data Lake Storage Gen1 connections in Airflow:

# Connection configuration for ADLS Gen1
{
    "conn_id": "azure_data_lake_default",
    "conn_type": "azure_data_lake",
    "host": "mydatalake.azuredatalakestore.net",
    "extra": {
        "tenant_id": "your-tenant-id",
        "client_id": "your-client-id",
        "client_secret": "your-client-secret"
    }
}

ADLS Gen2 Connection (adls)

Configure Azure Data Lake Storage Gen2 connections in Airflow:

# Connection configuration for ADLS Gen2
{
    "conn_id": "adls_default", 
    "conn_type": "adls",
    "login": "mystorageaccount",  # Storage account name
    "extra": {
        "account_url": "https://mystorageaccount.dfs.core.windows.net",
        "tenant_id": "your-tenant-id",
        "client_id": "your-client-id", 
        "client_secret": "your-client-secret"
    }
}
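
Connections like the one above can also be supplied through an `AIRFLOW_CONN_<CONN_ID>` environment variable; since Airflow 2.3 the value may be a JSON-serialized connection. A sketch with placeholder credentials (substitute your real tenant, client, and secret values):

```python
import json
import os

# JSON-serialized ADLS Gen2 connection; all credential values are placeholders.
conn = {
    "conn_type": "adls",
    "login": "mystorageaccount",  # Storage account name
    "extra": {
        "account_url": "https://mystorageaccount.dfs.core.windows.net",
        "tenant_id": "your-tenant-id",
        "client_id": "your-client-id",
        "client_secret": "your-client-secret",
    },
}

# Airflow resolves conn_id "adls_default" from this variable at runtime.
os.environ["AIRFLOW_CONN_ADLS_DEFAULT"] = json.dumps(conn)
```

Environment-variable connections are convenient for containerized deployments, though secrets backends are preferable for production credentials.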

Authentication Methods

Both ADLS Gen1 and Gen2 support multiple authentication methods:

  1. Service Principal Authentication:

    extra = {
        "tenant_id": "your-tenant-id",
        "client_id": "your-client-id", 
        "client_secret": "your-client-secret"
    }
  2. Managed Identity Authentication:

    extra = {
        "managed_identity_client_id": "your-managed-identity-client-id"
    }
  3. Account Key Authentication (Gen2 only):

    extra = {
        "account_key": "your-storage-account-key"
    }
  4. SAS Token Authentication (Gen2 only):

    extra = {
        "sas_token": "your-sas-token"
    }
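
The choice between these methods is driven by which keys appear in the connection's `extra` field. A simplified, hypothetical sketch of that dispatch (the provider's actual selection logic and precedence may differ):

```python
# Hypothetical helper illustrating how connection extras map to an
# authentication method; not part of the provider API.
def select_auth_method(extra: dict) -> str:
    if "tenant_id" in extra and "client_id" in extra and "client_secret" in extra:
        return "service_principal"
    if "managed_identity_client_id" in extra:
        return "managed_identity"
    if "account_key" in extra:
        return "account_key"  # Gen2 only
    if "sas_token" in extra:
        return "sas_token"    # Gen2 only
    # With no explicit credentials, Azure SDKs typically fall back to
    # DefaultAzureCredential (environment, managed identity, CLI, ...).
    return "default_azure_credential"
```

When multiple credential sets are present, explicit service-principal credentials are the most predictable choice; keeping the `extra` dict minimal avoids ambiguity.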

Error Handling

Common Exception Patterns

from azure.core.exceptions import ResourceNotFoundError, ResourceExistsError
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeStorageV2Hook

def robust_file_operations():
    """Demonstrate error handling patterns."""
    hook = AzureDataLakeStorageV2Hook(adls_conn_id='adls_conn')
    
    try:
        # Attempt file operation
        hook.upload_file(
            file_system_name='data-lake',
            file_name='data.csv',
            file_path='/tmp/data.csv'
        )
    except ResourceExistsError:
        print("File already exists, skipping upload")
    except ResourceNotFoundError:
        print("File system doesn't exist, creating it first")
        hook.create_file_system('data-lake')
        # Retry upload
        hook.upload_file(
            file_system_name='data-lake',
            file_name='data.csv', 
            file_path='/tmp/data.csv'
        )
    except Exception as e:
        print(f"Unexpected error: {e}")
        raise

Connection Testing

def test_adls_connections():
    """Test both ADLS Gen1 and Gen2 connections."""
    
    # Test Gen1 connection
    try:
        gen1_hook = AzureDataLakeHook(azure_data_lake_conn_id='adls_gen1_conn')
        client = gen1_hook.get_conn()
        files = gen1_hook.list('/')
        print("ADLS Gen1 connection successful")
    except Exception as e:
        print(f"ADLS Gen1 connection failed: {e}")
    
    # Test Gen2 connection
    try:
        gen2_hook = AzureDataLakeStorageV2Hook(adls_conn_id='adls_gen2_conn')
        success, message = gen2_hook.test_connection()
        print(f"ADLS Gen2 connection: {message}")
    except Exception as e:
        print(f"ADLS Gen2 connection failed: {e}")

Performance Considerations

Optimizing File Operations

def optimized_bulk_upload():
    """Optimize bulk file uploads to ADLS."""
    hook = AzureDataLakeHook(azure_data_lake_conn_id='adls_conn')
    
    # Use multiple threads for large files
    hook.upload_file(
        local_path='/tmp/large_file.csv',
        remote_path='/data/large_file.csv',
        nthreads=128,  # Increase threads for better performance
        buffersize=8388608,  # 8MB buffer for large files
        blocksize=8388608
    )

def batch_directory_operations():
    """Batch operations for better performance."""
    hook = AzureDataLakeStorageV2Hook(adls_conn_id='adls_v2_conn')
    
    # Get file system client once
    fs_client = hook.get_file_system('data-lake')
    
    # Batch multiple operations
    files_to_upload = ['file1.csv', 'file2.json', 'file3.parquet']
    
    for filename in files_to_upload:
        file_client = hook.create_file('data-lake', f'batch/{filename}')
        with open(f'/tmp/{filename}', 'rb') as data:
            file_client.upload_data(data, overwrite=True)

This documentation covers the Azure Data Lake Storage capabilities of the Apache Airflow Microsoft Azure provider: Gen1 and Gen2 hooks, ADLS operators, the fsspec filesystem interface, and practical usage patterns for data lake operations.

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-microsoft-azure
