tessl/pypi-apache-airflow-providers-microsoft-azure

Provider package for Microsoft Azure integrations with Apache Airflow

docs/data-transfers.md

Data Transfer Operations

Specialized operators for transferring data between Azure services and external systems, including the local filesystem, SFTP servers, Oracle databases, and AWS S3. Each operator wraps the download and upload steps in a single Airflow task, so transfers get the scheduler's normal retry and failure handling.
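All of the operators below resolve credentials through Airflow connections. One minimal way to register the connections used in the examples (IDs such as `azure_blob_conn` and `adls_conn` are illustrative, as are the account names) is through `AIRFLOW_CONN_<ID>` environment variables, which Airflow 2.3+ accepts in JSON form:

```python
import json
import os

# Hypothetical connection IDs matching the examples in this document.
# Airflow resolves AIRFLOW_CONN_<CONN_ID> environment variables at runtime;
# since Airflow 2.3 the value may be a JSON document instead of a URI.
os.environ["AIRFLOW_CONN_AZURE_BLOB_CONN"] = json.dumps({
    "conn_type": "wasb",
    "login": "mystorageaccount",          # storage account name (example)
    "password": "<storage-account-key>",  # secret elided
})
os.environ["AIRFLOW_CONN_ADLS_CONN"] = json.dumps({
    "conn_type": "azure_data_lake",       # verify against your provider version
    "login": "<client-id>",               # service principal (example)
    "password": "<client-secret>",
    "extra": json.dumps({"tenant": "<tenant-id>", "account_name": "mydatalake"}),
})
```

In production these values would come from a secrets backend rather than hard-coded environment variables.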

Capabilities

Local to Azure Transfers

Transfer data from the local filesystem to Azure services, with options controlling overwrite behavior, container and directory creation, and upload parallelism.

class LocalFilesystemToADLSOperator(BaseOperator):
    """
    Transfers files from local filesystem to Azure Data Lake Storage.
    
    Supports uploading local files to both ADLS Gen1 and Gen2 with
    configurable options for overwrite, directory creation, and metadata.
    """
    
    def __init__(
        self,
        *,
        local_path: str,
        remote_path: str,
        azure_data_lake_conn_id: str = "azure_data_lake_default",
        overwrite: bool = True,
        nthreads: int = 64,
        buffersize: int = 4194304,
        blocksize: int = 4194304,
        **kwargs
    ):
        """
        Initialize local to ADLS transfer operator.
        
        Args:
            local_path (str): Path to local file or directory
            remote_path (str): Target path in Azure Data Lake Storage
            azure_data_lake_conn_id (str): Airflow connection ID for ADLS
            overwrite (bool): Whether to overwrite existing files (default: True)
            nthreads (int): Number of threads for upload (default: 64)
            buffersize (int): Buffer size for upload (default: 4194304)
            blocksize (int): Block size for upload (default: 4194304)
        """
    
    def execute(self, context: Context) -> dict[str, Any]:
        """
        Execute file transfer from local filesystem to ADLS.
        
        Args:
            context (Context): Airflow task context
            
        Returns:
            dict[str, Any]: Transfer results including file count and sizes
        """

class LocalFilesystemToWasbOperator(BaseOperator):
    """
    Transfers files from local filesystem to Azure Blob Storage.
    
    Supports uploading local files to Azure Blob Storage with configurable
    options for container creation, overwrite behavior, and metadata.
    """
    
    def __init__(
        self,
        *,
        file_path: str,
        container_name: str,
        blob_name: str,
        azure_conn_id: str = "wasb_default",
        create_container: bool = False,
        overwrite: bool = True,
        content_settings: dict[str, Any] | None = None,
        metadata: dict[str, str] | None = None,
        **kwargs
    ):
        """
        Initialize local to Azure Blob Storage transfer operator.
        
        Args:
            file_path (str): Path to local file
            container_name (str): Target container name in Azure Blob Storage
            blob_name (str): Target blob name
            azure_conn_id (str): Airflow connection ID for Azure Blob Storage
            create_container (bool): Whether to create container if it doesn't exist
            overwrite (bool): Whether to overwrite existing blob (default: True)
            content_settings (dict[str, Any] | None): Blob content settings
            metadata (dict[str, str] | None): Blob metadata
        """
    
    def execute(self, context: Context) -> str:
        """
        Execute file transfer from local filesystem to Azure Blob Storage.
        
        Args:
            context (Context): Airflow task context
            
        Returns:
            str: Blob URL of uploaded file
        """

Database to Azure Transfers

Transfer data from database systems to Azure services by executing a SQL query and uploading the result set as a delimited file.

class OracleToAzureDataLakeOperator(BaseOperator):
    """
    Transfers data from Oracle database to Azure Data Lake Storage.
    
    Executes Oracle queries and uploads results to ADLS with support for
    various data formats, partitioning, and incremental transfers.
    """
    
    def __init__(
        self,
        *,
        filename: str,
        azure_data_lake_conn_id: str,
        oracle_conn_id: str,
        sql: str,
        sql_params: dict[str, Any] | None = None,
        delimiter: str = "\t",
        encoding: str = "utf-8",
        quotechar: str = '"',
        quoting: int = csv.QUOTE_MINIMAL,
        **kwargs
    ):
        """
        Initialize Oracle to Azure Data Lake transfer operator.
        
        Args:
            filename (str): Target filename in Azure Data Lake Storage
            azure_data_lake_conn_id (str): Airflow connection ID for ADLS
            oracle_conn_id (str): Airflow connection ID for Oracle database
            sql (str): SQL query to execute on Oracle database
            sql_params (dict[str, Any] | None): Parameters for SQL query
            delimiter (str): Field delimiter for output file (default: tab)
            encoding (str): File encoding (default: "utf-8")
            quotechar (str): Quote character for CSV (default: '"')
            quoting (int): Quoting behavior (default: csv.QUOTE_MINIMAL)
        """
    
    def execute(self, context: Context) -> str:
        """
        Execute data transfer from Oracle to Azure Data Lake Storage.
        
        Args:
            context (Context): Airflow task context
            
        Returns:
            str: Path to uploaded file in ADLS
        """

Cloud-to-Cloud Transfers

Transfer data between different cloud services, handling authentication against both the source system (SFTP or AWS S3) and the Azure target.

class SFTPToWasbOperator(BaseOperator):
    """
    Transfers files from SFTP server to Azure Blob Storage.
    
    Downloads files from SFTP servers and uploads them to Azure Blob Storage
    with support for directory traversal, file filtering, and batch processing.
    """
    
    def __init__(
        self,
        *,
        sftp_source_path: str,
        container_name: str,
        blob_name: str,
        sftp_conn_id: str = "sftp_default",
        wasb_conn_id: str = "wasb_default",
        create_container: bool = False,
        overwrite: bool = True,
        move_object: bool = False,
        **kwargs
    ):
        """
        Initialize SFTP to Azure Blob Storage transfer operator.
        
        Args:
            sftp_source_path (str): Path to source file on SFTP server
            container_name (str): Target container name in Azure Blob Storage
            blob_name (str): Target blob name
            sftp_conn_id (str): Airflow connection ID for SFTP server
            wasb_conn_id (str): Airflow connection ID for Azure Blob Storage
            create_container (bool): Whether to create container if it doesn't exist
            overwrite (bool): Whether to overwrite existing blob (default: True)
            move_object (bool): Whether to delete source file after transfer
        """
    
    def execute(self, context: Context) -> str:
        """
        Execute file transfer from SFTP to Azure Blob Storage.
        
        Args:
            context (Context): Airflow task context
            
        Returns:
            str: Blob URL of transferred file
        """

class S3ToAzureBlobStorageOperator(BaseOperator):
    """
    Transfers objects from AWS S3 to Azure Blob Storage.
    
    Downloads objects from AWS S3 and uploads them to Azure Blob Storage
    with support for large files, batch processing, and metadata preservation.
    """
    
    def __init__(
        self,
        *,
        s3_source_key: str,
        container_name: str,
        blob_name: str,
        s3_bucket: str | None = None,
        aws_conn_id: str = "aws_default",
        wasb_conn_id: str = "wasb_default",
        create_container: bool = False,
        overwrite: bool = True,
        s3_verify: bool | str | None = None,
        s3_extra_args: dict[str, Any] | None = None,
        wasb_extra_args: dict[str, Any] | None = None,
        **kwargs
    ):
        """
        Initialize AWS S3 to Azure Blob Storage transfer operator.
        
        Args:
            s3_source_key (str): Source object key in AWS S3
            container_name (str): Target container name in Azure Blob Storage
            blob_name (str): Target blob name
            s3_bucket (str | None): Source S3 bucket name
            aws_conn_id (str): Airflow connection ID for AWS S3
            wasb_conn_id (str): Airflow connection ID for Azure Blob Storage
            create_container (bool): Whether to create container if it doesn't exist
            overwrite (bool): Whether to overwrite existing blob (default: True)
            s3_verify (bool | str | None): S3 SSL verification configuration
            s3_extra_args (dict[str, Any] | None): Additional S3 arguments
            wasb_extra_args (dict[str, Any] | None): Additional WASB arguments
        """
    
    def execute(self, context: Context) -> str:
        """
        Execute object transfer from AWS S3 to Azure Blob Storage.
        
        Args:
            context (Context): Airflow task context
            
        Returns:
            str: Blob URL of transferred object
        """

Supporting Exception Classes

Custom exception classes for handling transfer operation errors and edge cases.

class TooManyFilesToMoveException(Exception):
    """
    Exception for bulk transfer limits.
    
    Raised when transfer operations exceed configured limits for
    batch processing or concurrent file transfers.
    """
    pass

class InvalidAzureBlobParameters(Exception):
    """
    Exception for invalid blob parameters.
    
    Raised when blob storage parameters are invalid or
    incompatible with the operation being performed.
    """
    pass

class InvalidKeyComponents(Exception):
    """
    Exception for invalid key components.
    
    Raised when file path or key components are invalid
    for the target storage system.
    """
    pass
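These exceptions can drive retry decisions in a wrapper around a transfer. The sketch below re-declares two of the classes locally so it runs without the provider installed; in a real DAG you would import them from the provider's transfer modules instead:

```python
# Stand-ins mirroring the provider's exception classes, re-declared here
# so the sketch is self-contained; import the real ones in a DAG.
class TooManyFilesToMoveException(Exception):
    pass

class InvalidAzureBlobParameters(Exception):
    pass

def classify_transfer_error(exc: Exception) -> str:
    """Map a transfer exception onto a retry decision."""
    if isinstance(exc, TooManyFilesToMoveException):
        return "split-batch"   # re-run as one task per file
    if isinstance(exc, InvalidAzureBlobParameters):
        return "fail-fast"     # configuration error; retrying will not help
    return "retry"             # transient errors are worth another attempt

print(classify_transfer_error(TooManyFilesToMoveException()))  # split-batch
```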

Usage Examples

Local to Azure Transfers

from airflow import DAG
from airflow.providers.microsoft.azure.transfers.local_to_wasb import LocalFilesystemToWasbOperator
from airflow.providers.microsoft.azure.transfers.local_to_adls import LocalFilesystemToADLSOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import os

def prepare_local_files():
    """Prepare local files for transfer to Azure."""
    # Create sample data files
    data_dir = '/tmp/data_export'
    os.makedirs(data_dir, exist_ok=True)
    
    # Generate sample CSV file
    import csv
    csv_file = os.path.join(data_dir, 'sales_data.csv')
    with open(csv_file, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['date', 'product', 'sales', 'region'])
        for i in range(1000):
            writer.writerow([
                f'2024-01-{(i % 31) + 1:02d}',
                f'Product_{i % 10}',
                f'{1000 + (i * 10)}',
                f'Region_{i % 5}'
            ])
    
    # Generate sample JSON file
    import json
    json_file = os.path.join(data_dir, 'customer_data.json')
    customer_data = {
        'customers': [
            {'id': i, 'name': f'Customer_{i}', 'email': f'customer{i}@example.com'}
            for i in range(100)
        ]
    }
    with open(json_file, 'w') as f:
        json.dump(customer_data, f, indent=2)
    
    return {
        'csv_file': csv_file,
        'json_file': json_file,
        'data_dir': data_dir
    }

def verify_transfers(**context):
    """Verify that files were transferred successfully."""
    from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
    
    wasb_hook = WasbHook(wasb_conn_id='azure_blob_conn')
    
    # Check if files exist in blob storage
    files_to_check = [
        ('data-exports', 'sales/sales_data.csv'),
        ('data-exports', 'customers/customer_data.json')
    ]
    
    results = {}
    for container, blob_name in files_to_check:
        exists = wasb_hook.check_for_blob(container, blob_name)
        results[blob_name] = exists
        print(f"File {blob_name}: {'✓ Found' if exists else '✗ Not found'}")
    
    if all(results.values()):
        print("All files transferred successfully!")
    else:
        raise ValueError("Some files were not transferred successfully")
    
    return results

dag = DAG(
    'local_to_azure_transfers',
    default_args={
        'owner': 'data-transfer-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=5)
    },
    description='Transfer local files to Azure services',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Prepare local files
prep_files = PythonOperator(
    task_id='prepare_files',
    python_callable=prepare_local_files,
    dag=dag
)

# Transfer CSV to Azure Blob Storage
transfer_csv = LocalFilesystemToWasbOperator(
    task_id='transfer_csv_to_blob',
    file_path='/tmp/data_export/sales_data.csv',
    container_name='data-exports',
    blob_name='sales/sales_data.csv',
    azure_conn_id='azure_blob_conn',
    create_container=True,
    overwrite=True,
    metadata={
        'source': 'local_filesystem',
        'export_date': '{{ ds }}',
        'file_type': 'csv'
    },
    dag=dag
)

# Transfer JSON to Azure Data Lake Storage
transfer_json = LocalFilesystemToADLSOperator(
    task_id='transfer_json_to_adls',
    local_path='/tmp/data_export/customer_data.json',
    remote_path='/exports/customers/customer_data_{{ ds_nodash }}.json',
    azure_data_lake_conn_id='adls_conn',
    overwrite=True,
    dag=dag
)

# Verify transfers
verify_files = PythonOperator(
    task_id='verify_transfers',
    python_callable=verify_transfers,
    dag=dag
)

prep_files >> [transfer_csv, transfer_json] >> verify_files

Database to Azure Transfers

from airflow import DAG
from airflow.providers.microsoft.azure.transfers.oracle_to_azure_data_lake import OracleToAzureDataLakeOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def prepare_oracle_queries():
    """Prepare Oracle queries for data extraction."""
    queries = {
        'sales_summary': """
            SELECT 
                TO_CHAR(order_date, 'YYYY-MM-DD') as order_date,
                product_category,
                COUNT(*) as order_count,
                SUM(order_total) as total_sales,
                AVG(order_total) as avg_order_value
            FROM sales_orders 
            WHERE order_date >= TO_DATE('{{ ds }}', 'YYYY-MM-DD')
                AND order_date < TO_DATE('{{ next_ds }}', 'YYYY-MM-DD')
            GROUP BY TO_CHAR(order_date, 'YYYY-MM-DD'), product_category
            ORDER BY order_date, product_category
        """,
        
        'customer_activity': """
            SELECT 
                c.customer_id,
                c.customer_name,
                c.email,
                COUNT(o.order_id) as order_count,
                SUM(o.order_total) as total_spent,
                MAX(o.order_date) as last_order_date
            FROM customers c
            LEFT JOIN sales_orders o ON c.customer_id = o.customer_id
            WHERE c.created_date <= TO_DATE('{{ ds }}', 'YYYY-MM-DD')
            GROUP BY c.customer_id, c.customer_name, c.email
            HAVING COUNT(o.order_id) > 0
            ORDER BY total_spent DESC
        """,
        
        'inventory_status': """
            SELECT 
                p.product_id,
                p.product_name,
                p.category,
                i.current_stock,
                i.reserved_stock,
                i.available_stock,
                TO_CHAR(i.last_updated, 'YYYY-MM-DD HH24:MI:SS') as last_updated
            FROM products p
            JOIN inventory i ON p.product_id = i.product_id
            WHERE i.last_updated >= TO_DATE('{{ ds }}', 'YYYY-MM-DD')
            ORDER BY p.category, p.product_name
        """
    }
    
    return queries

def validate_extracted_data(**context):
    """Validate extracted data in Azure Data Lake Storage."""
    from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
    
    adls_hook = AzureDataLakeHook(azure_data_lake_conn_id='adls_conn')
    
    # Check extracted files; '{{ ds }}' is left as literal text here
    # (plain strings, not f-strings) and substituted from the context below
    extracted_files = [
        '/oracle_exports/{{ ds }}/sales_summary.tsv',
        '/oracle_exports/{{ ds }}/customer_activity.tsv',
        '/oracle_exports/{{ ds }}/inventory_status.tsv'
    ]
    
    validation_results = {}
    
    for file_path in extracted_files:
        rendered_path = file_path.replace('{{ ds }}', context['ds'])
        
        if adls_hook.check_for_file(rendered_path):
            # Get file size and row count; strip the trailing newline so
            # the header-adjusted count is accurate
            file_content = adls_hook.get_conn().cat(rendered_path)
            row_count = len(file_content.decode('utf-8').strip().split('\n')) - 1  # subtract header
            file_size = len(file_content)
            
            validation_results[rendered_path] = {
                'exists': True,
                'size_bytes': file_size,
                'row_count': row_count
            }
            print(f"✓ {rendered_path}: {row_count} rows, {file_size} bytes")
        else:
            validation_results[rendered_path] = {
                'exists': False,
                'size_bytes': 0,
                'row_count': 0
            }
            print(f"✗ {rendered_path}: File not found")
    
    # Validate minimum data requirements
    min_requirements = {
        'sales_summary.tsv': 1,    # At least 1 row
        'customer_activity.tsv': 10,  # At least 10 customers
        'inventory_status.tsv': 50   # At least 50 products
    }
    
    validation_passed = True
    for file_path, result in validation_results.items():
        file_name = file_path.split('/')[-1]
        min_rows = min_requirements.get(file_name, 0)
        
        if not result['exists'] or result['row_count'] < min_rows:
            validation_passed = False
            print(f"Validation failed for {file_name}: Expected >= {min_rows} rows, got {result['row_count']}")
    
    if not validation_passed:
        raise ValueError("Data validation failed")
    
    return validation_results

dag = DAG(
    'oracle_to_azure_transfer',
    default_args={
        'owner': 'data-engineering-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=10)
    },
    description='Extract data from Oracle to Azure Data Lake Storage',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Prepare queries
prep_queries = PythonOperator(
    task_id='prepare_queries',
    python_callable=prepare_oracle_queries,
    dag=dag
)

# Extract sales summary data
extract_sales = OracleToAzureDataLakeOperator(
    task_id='extract_sales_summary',
    filename='/oracle_exports/{{ ds }}/sales_summary.tsv',
    azure_data_lake_conn_id='adls_conn',
    oracle_conn_id='oracle_conn',
    sql="""
        SELECT 
            TO_CHAR(order_date, 'YYYY-MM-DD') as order_date,
            product_category,
            COUNT(*) as order_count,
            SUM(order_total) as total_sales,
            AVG(order_total) as avg_order_value
        FROM sales_orders 
        WHERE order_date >= TO_DATE('{{ ds }}', 'YYYY-MM-DD')
            AND order_date < TO_DATE('{{ next_ds }}', 'YYYY-MM-DD')
        GROUP BY TO_CHAR(order_date, 'YYYY-MM-DD'), product_category
        ORDER BY order_date, product_category
    """,
    delimiter='\t',
    encoding='utf-8',
    dag=dag
)

# Extract customer activity data
extract_customers = OracleToAzureDataLakeOperator(
    task_id='extract_customer_activity',
    filename='/oracle_exports/{{ ds }}/customer_activity.tsv',
    azure_data_lake_conn_id='adls_conn',
    oracle_conn_id='oracle_conn',
    sql="""
        SELECT 
            c.customer_id,
            c.customer_name,
            c.email,
            COUNT(o.order_id) as order_count,
            SUM(o.order_total) as total_spent,
            MAX(o.order_date) as last_order_date
        FROM customers c
        LEFT JOIN sales_orders o ON c.customer_id = o.customer_id
        WHERE c.created_date <= TO_DATE('{{ ds }}', 'YYYY-MM-DD')
        GROUP BY c.customer_id, c.customer_name, c.email
        HAVING COUNT(o.order_id) > 0
        ORDER BY total_spent DESC
    """,
    delimiter='\t',
    encoding='utf-8',
    dag=dag
)

# Extract inventory data
extract_inventory = OracleToAzureDataLakeOperator(
    task_id='extract_inventory_status',
    filename='/oracle_exports/{{ ds }}/inventory_status.tsv',
    azure_data_lake_conn_id='adls_conn',
    oracle_conn_id='oracle_conn',
    sql="""
        SELECT 
            p.product_id,
            p.product_name,
            p.category,
            i.current_stock,
            i.reserved_stock,
            i.available_stock,
            TO_CHAR(i.last_updated, 'YYYY-MM-DD HH24:MI:SS') as last_updated
        FROM products p
        JOIN inventory i ON p.product_id = i.product_id
        WHERE i.last_updated >= TO_DATE('{{ ds }}', 'YYYY-MM-DD')
        ORDER BY p.category, p.product_name
    """,
    delimiter='\t',
    encoding='utf-8',
    dag=dag
)

# Validate extracted data
validate_data = PythonOperator(
    task_id='validate_data',
    python_callable=validate_extracted_data,
    dag=dag
)

prep_queries >> [extract_sales, extract_customers, extract_inventory] >> validate_data

Cloud-to-Cloud Transfers

from airflow import DAG
from airflow.providers.microsoft.azure.transfers.s3_to_wasb import S3ToAzureBlobStorageOperator
from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SFTPToWasbOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def discover_source_files(**context):
    """Discover files available for transfer from various sources."""
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
    from airflow.providers.sftp.hooks.sftp import SFTPHook
    
    # Jinja templates are not rendered inside Python callables, so build
    # the date-partitioned paths from the task context instead of '{{ ds }}'
    ds = context['ds']
    
    # Discover S3 files
    s3_hook = S3Hook(aws_conn_id='aws_conn')
    s3_files = s3_hook.list_keys(
        bucket_name='source-data-bucket',
        prefix=f'daily-exports/{ds}/',
        delimiter=''
    )
    
    # Discover SFTP files
    sftp_hook = SFTPHook(ssh_conn_id='sftp_conn')
    sftp_files = sftp_hook.list_directory(f'/exports/{ds}/')
    
    return {
        's3_files': s3_files or [],
        'sftp_files': sftp_files or []
    }

def monitor_transfer_progress(**context):
    """Monitor transfer progress and generate summary."""
    from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
    
    wasb_hook = WasbHook(wasb_conn_id='azure_blob_conn')
    
    # Check transferred files
    containers_to_check = ['s3-transfers', 'sftp-transfers']
    transfer_summary = {}
    
    for container in containers_to_check:
        try:
            # f"{{ ds }}" would yield the literal text "{ ds }"; use the
            # execution date from the task context instead
            blobs = wasb_hook.get_container_client(container).list_blobs(
                name_starts_with=f"{context['ds']}/"
            )
            
            blob_list = []
            total_size = 0
            
            for blob in blobs:
                blob_info = {
                    'name': blob.name,
                    'size': blob.size,
                    'last_modified': blob.last_modified.isoformat() if blob.last_modified else None
                }
                blob_list.append(blob_info)
                total_size += blob.size or 0
            
            transfer_summary[container] = {
                'file_count': len(blob_list),
                'total_size_bytes': total_size,
                'files': blob_list
            }
            
            print(f"Container {container}: {len(blob_list)} files, {total_size:,} bytes")
            
        except Exception as e:
            print(f"Error checking container {container}: {e}")
            transfer_summary[container] = {
                'error': str(e),
                'file_count': 0,
                'total_size_bytes': 0
            }
    
    return transfer_summary

def cleanup_source_files(**context):
    """Clean up source files after successful transfer (optional)."""
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
    
    # Only cleanup if all transfers were successful
    transfer_summary = context['task_instance'].xcom_pull(task_ids='monitor_progress')
    
    total_files_transferred = sum(
        container_info.get('file_count', 0) 
        for container_info in transfer_summary.values()
    )
    
    if total_files_transferred > 0:
        print(f"Successfully transferred {total_files_transferred} files")
        
        # Optional: Delete source files from S3 after successful transfer
        # Uncomment the following lines if cleanup is desired
        """
        s3_hook = S3Hook(aws_conn_id='aws_conn')
        source_files = context['task_instance'].xcom_pull(task_ids='discover_files')['s3_files']
        
        for s3_key in source_files:
            try:
                s3_hook.delete_objects(
                    bucket='source-data-bucket',
                    keys=[s3_key]
                )
                print(f"Deleted source file: {s3_key}")
            except Exception as e:
                print(f"Failed to delete {s3_key}: {e}")
        """
    
    return total_files_transferred

dag = DAG(
    'cloud_to_azure_transfers',
    default_args={
        'owner': 'integration-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=5)
    },
    description='Transfer files from various cloud sources to Azure',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Discover source files
discover_files = PythonOperator(
    task_id='discover_files',
    python_callable=discover_source_files,
    dag=dag
)

# Transfer from AWS S3 to Azure Blob Storage
transfer_s3_files = S3ToAzureBlobStorageOperator(
    task_id='transfer_s3_data',
    s3_source_key='daily-exports/{{ ds }}/sales_data.csv',
    s3_bucket='source-data-bucket',
    container_name='s3-transfers',
    blob_name='{{ ds }}/sales_data.csv',
    aws_conn_id='aws_conn',
    wasb_conn_id='azure_blob_conn',
    create_container=True,
    overwrite=True,
    s3_extra_args={
        'ServerSideEncryption': 'AES256'
    },
    wasb_extra_args={
        'content_settings': {
            'content_type': 'text/csv',
            'cache_control': 'no-cache'
        },
        'metadata': {
            'source': 's3',
            'transfer_date': '{{ ds }}',
            'original_bucket': 'source-data-bucket'
        }
    },
    dag=dag
)

# Transfer from SFTP to Azure Blob Storage
transfer_sftp_files = SFTPToWasbOperator(
    task_id='transfer_sftp_data',
    sftp_source_path='/exports/{{ ds }}/inventory_update.json',
    container_name='sftp-transfers',
    blob_name='{{ ds }}/inventory_update.json',
    sftp_conn_id='sftp_conn',
    wasb_conn_id='azure_blob_conn',
    create_container=True,
    overwrite=True,
    move_object=False,  # Keep original file on SFTP server
    dag=dag
)

# Transfer additional S3 files with pattern matching
transfer_s3_logs = S3ToAzureBlobStorageOperator(
    task_id='transfer_s3_logs',
    s3_source_key='logs/{{ ds }}/application.log',
    s3_bucket='source-data-bucket',
    container_name='s3-transfers',
    blob_name='{{ ds }}/logs/application.log',
    aws_conn_id='aws_conn',
    wasb_conn_id='azure_blob_conn',
    create_container=True,
    overwrite=True,
    dag=dag
)

# Monitor transfer progress
monitor_progress = PythonOperator(
    task_id='monitor_progress',
    python_callable=monitor_transfer_progress,
    dag=dag
)

# Optional cleanup
cleanup_sources = PythonOperator(
    task_id='cleanup_sources',
    python_callable=cleanup_source_files,
    dag=dag
)

# Set up dependencies
discover_files >> [transfer_s3_files, transfer_sftp_files, transfer_s3_logs]
[transfer_s3_files, transfer_sftp_files, transfer_s3_logs] >> monitor_progress >> cleanup_sources
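A note on the `{{ ds }}` macro used throughout these examples: Airflow's Jinja engine substitutes it only in an operator's templated fields (such as `blob_name` or `sql`). Inside a PythonOperator callable the string is never rendered, so the value must be read from the task context. A minimal illustration of the distinction, runnable without Airflow:

```python
# '{{ ds }}' is just literal text inside Python code; Airflow's Jinja
# engine only substitutes it in templated operator fields.
template = "daily-exports/{{ ds }}/"
print(template)                    # prints the braces unchanged

# Inside a callable, build the path from the context dict instead:
context = {"ds": "2024-01-15"}     # stand-in for the Airflow task context
prefix = f"daily-exports/{context['ds']}/"
print(prefix)                      # daily-exports/2024-01-15/
```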

Batch Transfer Operations

from airflow import DAG
from airflow.providers.microsoft.azure.transfers.s3_to_wasb import S3ToAzureBlobStorageOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def create_dynamic_transfer_tasks(**context):
    """Create transfer job descriptors dynamically based on available files."""
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
    
    s3_hook = S3Hook(aws_conn_id='aws_conn')
    
    # List all files in the source bucket for the current date
    source_prefix = f"batch-export/{context['ds']}/"
    files = s3_hook.list_keys(
        bucket_name='batch-data-source',
        prefix=source_prefix,
        delimiter=''
    )
    
    if not files:
        print("No files found for transfer")
        return []
    
    # Filter files by type and size
    transfer_jobs = []
    
    for file_key in files:
        file_info = s3_hook.get_key(file_key, bucket_name='batch-data-source')
        # boto3's S3.Object exposes content_length (ObjectSummary uses .size)
        file_size = file_info.content_length if file_info else 0
        file_name = file_key.split('/')[-1]
        
        # Skip very large files (>1GB) or very small files (<1KB)
        if file_size > 1024**3 or file_size < 1024:
            print(f"Skipping {file_name}: size {file_size} bytes")
            continue
        
        # Determine target container based on file type
        file_extension = file_name.split('.')[-1].lower()
        container_mapping = {
            'csv': 'structured-data',
            'json': 'json-data',
            'xml': 'xml-data',
            'txt': 'text-data',
            'parquet': 'columnar-data'
        }
        
        target_container = container_mapping.get(file_extension, 'unclassified-data')
        
        transfer_job = {
            'source_key': file_key,
            'target_container': target_container,
            'target_blob': f"{context['ds']}/{file_name}",
            'file_size': file_size,
            'file_type': file_extension
        }
        
        transfer_jobs.append(transfer_job)
    
    print(f"Created {len(transfer_jobs)} transfer jobs")
    return transfer_jobs
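The extension-to-container routing above can be factored into a small pure function, which makes the rule unit-testable outside Airflow. The container names below are the hypothetical ones used in this example:

```python
# Pure routing rule used by the batch job above: pick a target container
# from a file name's extension. Container names are example values.
CONTAINER_BY_EXTENSION = {
    'csv': 'structured-data',
    'json': 'json-data',
    'xml': 'xml-data',
    'txt': 'text-data',
    'parquet': 'columnar-data',
}

def route_container(file_name: str) -> str:
    """Return the target container for a file, defaulting to 'unclassified-data'."""
    extension = file_name.rsplit('.', 1)[-1].lower() if '.' in file_name else ''
    return CONTAINER_BY_EXTENSION.get(extension, 'unclassified-data')

print(route_container('orders.CSV'))   # structured-data
print(route_container('notes'))        # unclassified-data
```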

def execute_batch_transfers(**context):
    """Execute batch transfers with error handling and retries."""
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
    from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
    
    transfer_jobs = context['task_instance'].xcom_pull(task_ids='create_transfer_jobs')
    
    if not transfer_jobs:
        print("No transfer jobs to execute")
        return {'completed': 0, 'failed': 0, 'skipped': 0}
    
    s3_hook = S3Hook(aws_conn_id='aws_conn')
    wasb_hook = WasbHook(wasb_conn_id='azure_blob_conn')
    
    results = {
        'completed': 0,
        'failed': 0,
        'skipped': 0,
        'transfer_details': []
    }
    
    for job in transfer_jobs:
        try:
            source_key = job['source_key']
            target_container = job['target_container']
            target_blob = job['target_blob']
            
            print(f"Transferring {source_key} -> {target_container}/{target_blob}")
            
            # Check if target already exists
            if wasb_hook.check_for_blob(target_container, target_blob):
                print(f"Target blob already exists, skipping: {target_blob}")
                results['skipped'] += 1
                continue
            
            # Create container if it doesn't exist
            try:
                wasb_hook.create_container(target_container)
            except Exception:
                pass  # Container might already exist
            
            # Download from S3 (reads the whole object into memory;
            # fine for small files, stream large objects instead)
            s3_object = s3_hook.get_key(source_key, bucket_name='batch-data-source')
            file_content = s3_object.get()['Body'].read()
            
            # Upload to Azure Blob Storage
            wasb_hook.load_bytes(
                data=file_content,
                container_name=target_container,
                blob_name=target_blob,
                overwrite=True
            )
            
            # Set metadata on the uploaded blob via the underlying BlobClient
            blob_client = wasb_hook.get_conn().get_blob_client(
                container=target_container,
                blob=target_blob
            )
            blob_client.set_blob_metadata(metadata={
                'source': 's3',
                'source_bucket': 'batch-data-source',
                'source_key': source_key,
                'transfer_date': context['ds'],
                'file_size': str(job['file_size']),
                'file_type': job['file_type']
            })
            
            results['completed'] += 1
            results['transfer_details'].append({
                'source': source_key,
                'target': f"{target_container}/{target_blob}",
                'status': 'completed',
                'size_bytes': job['file_size']
            })
            
            print(f"Successfully transferred: {source_key}")
            
        except Exception as e:
            print(f"Failed to transfer {job['source_key']}: {e}")
            results['failed'] += 1
            results['transfer_details'].append({
                'source': job['source_key'],
                'target': f"{job['target_container']}/{job['target_blob']}",
                'status': 'failed',
                'error': str(e)
            })
    
    print(f"Batch transfer completed: {results['completed']} successful, "
          f"{results['failed']} failed, {results['skipped']} skipped")
    
    return results

dag = DAG(
    'batch_transfer_workflow',
    default_args={
        'owner': 'batch-processing-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    description='Batch transfer workflow for multiple files',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Create dynamic transfer jobs
create_jobs = PythonOperator(
    task_id='create_transfer_jobs',
    python_callable=create_dynamic_transfer_tasks,
    dag=dag
)

# Execute batch transfers
execute_transfers = PythonOperator(
    task_id='execute_batch_transfers',
    python_callable=execute_batch_transfers,
    dag=dag
)

create_jobs >> execute_transfers
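The loop in execute_batch_transfers abandons a job on its first exception and relies on Airflow's task-level retries (default_args) to rerun the whole batch. A small retry wrapper can add per-job retries instead; this is a sketch, not a provider utility, and the attempt/backoff defaults are illustrative:

```python
import time

def with_retries(fn, attempts: int = 3, backoff_seconds: float = 2.0):
    """Call fn(); on exception, retry with linear backoff.

    `attempts` and `backoff_seconds` are illustrative defaults,
    not provider settings.
    """
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception as e:  # demo: retry any failure
            last_error = e
            if attempt < attempts:
                time.sleep(backoff_seconds * attempt)
    raise last_error
```

Inside the job loop, the download/upload pair could then be wrapped as `with_retries(lambda: wasb_hook.load_bytes(...))` so one flaky upload does not fail the entire batch task.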

Error Handling and Best Practices

Transfer Operation Error Handling

from airflow.providers.microsoft.azure.transfers.s3_to_wasb import (
    S3ToAzureBlobStorageOperator,
    TooManyFilesToMoveException,
    InvalidAzureBlobParameters,
    InvalidKeyComponents
)
from airflow.exceptions import AirflowException

def robust_transfer_with_error_handling():
    """Demonstrate comprehensive error handling for transfer operations."""
    
    try:
        # Construct the operator; the exceptions below are raised when
        # Airflow's execution engine calls operator.execute(context)
        operator = S3ToAzureBlobStorageOperator(
            task_id='safe_transfer',
            s3_bucket='batch-data-source',
            s3_key='large-dataset/data.csv',
            container_name='target-container',
            blob_name='processed-data.csv',
            aws_conn_id='aws_conn',
            wasb_conn_id='azure_conn'
        )
        
        # This would be called by Airflow's execution engine
        # result = operator.execute(context)
        
    except TooManyFilesToMoveException as e:
        print(f"Too many files in transfer operation: {e}")
        # Implement chunking or batch processing
        
    except InvalidAzureBlobParameters as e:
        print(f"Invalid blob parameters: {e}")
        # Validate and correct blob parameters
        
    except InvalidKeyComponents as e:
        print(f"Invalid key components: {e}")
        # Validate and correct file path components
        
    except Exception as e:
        print(f"Unexpected transfer error: {e}")
        raise AirflowException(f"Transfer failed: {e}")

def implement_transfer_validation():
    """Implement validation patterns for transfer operations."""
    
    def validate_source_file(source_path: str, min_size: int = 1024) -> bool:
        """Validate source file before transfer."""
        import os
        
        if not os.path.exists(source_path):
            raise FileNotFoundError(f"Source file not found: {source_path}")
        
        file_size = os.path.getsize(source_path)
        if file_size < min_size:
            raise ValueError(f"File too small: {file_size} bytes < {min_size} bytes")
        
        return True
    
    def validate_target_parameters(container_name: str, blob_name: str) -> bool:
        """Validate target parameters."""
        if not container_name or len(container_name) < 3:
            raise InvalidAzureBlobParameters("Container name must be at least 3 characters")
        
        if not blob_name or blob_name.startswith('/'):
            raise InvalidKeyComponents("Blob name cannot start with '/'")
        
        return True
    
    def validate_transfer_result(source_size: int, target_size: int, tolerance: float = 0.01) -> bool:
        """Validate transfer result by comparing sizes."""
        if abs(source_size - target_size) > (source_size * tolerance):
            raise ValueError(f"Size mismatch: source={source_size}, target={target_size}")
        
        return True
    
    return {
        'validate_source': validate_source_file,
        'validate_target': validate_target_parameters,
        'validate_result': validate_transfer_result
    }
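The default 1% tolerance in validate_transfer_result translates into a concrete byte budget. For example, a 500 MiB source allows roughly 5 MiB of size drift before the check raises (sizes below are hypothetical):

```python
# Illustrating the default tolerance used by validate_transfer_result.
source_size = 500 * 1024 * 1024              # 500 MiB source object
tolerance = 0.01                             # default 1% tolerance
allowed_delta = source_size * tolerance      # maximum accepted size drift

target_ok = source_size - 1024 * 1024        # 1 MiB smaller: accepted
target_bad = source_size - 10 * 1024 * 1024  # 10 MiB smaller: rejected

assert abs(source_size - target_ok) <= allowed_delta
assert abs(source_size - target_bad) > allowed_delta
```

A percentage tolerance like this catches truncated uploads while still allowing small legitimate differences (for example, metadata or encoding overhead).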

def implement_transfer_monitoring():
    """Implement monitoring for transfer operations."""
    
    class TransferMonitor:
        def __init__(self):
            self.transfer_stats = {
                'start_time': None,
                'end_time': None,
                'bytes_transferred': 0,
                'transfer_rate_mbps': 0,
                'status': 'pending'
            }
        
        def start_transfer(self):
            """Mark transfer start time."""
            import time
            self.transfer_stats['start_time'] = time.time()
            self.transfer_stats['status'] = 'in_progress'
        
        def update_progress(self, bytes_transferred: int):
            """Update transfer progress."""
            self.transfer_stats['bytes_transferred'] = bytes_transferred
            
            if self.transfer_stats['start_time']:
                import time
                elapsed_time = time.time() - self.transfer_stats['start_time']
                if elapsed_time > 0:
                    rate_bps = bytes_transferred / elapsed_time
                    self.transfer_stats['transfer_rate_mbps'] = rate_bps / (1024 * 1024)
        
        def complete_transfer(self):
            """Mark transfer completion."""
            import time
            self.transfer_stats['end_time'] = time.time()
            self.transfer_stats['status'] = 'completed'
            
            if self.transfer_stats['start_time']:
                total_time = self.transfer_stats['end_time'] - self.transfer_stats['start_time']
                print(f"Transfer completed in {total_time:.2f} seconds")
                print(f"Average rate: {self.transfer_stats['transfer_rate_mbps']:.2f} MB/s")
        
        def get_stats(self):
            """Get transfer statistics."""
            return self.transfer_stats.copy()
    
    return TransferMonitor
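The throughput figure TransferMonitor reports is simply bytes over elapsed wall-clock seconds, scaled to MiB. The same arithmetic in isolation, with a simulated sequence of chunk uploads (chunk sizes are hypothetical):

```python
import time

start = time.monotonic()
bytes_transferred = 0
for chunk in (8 * 1024 * 1024,) * 4:            # pretend four 8 MiB chunks moved
    bytes_transferred += chunk
elapsed = max(time.monotonic() - start, 1e-9)   # guard against a zero interval
rate_mb_s = bytes_transferred / elapsed / (1024 * 1024)
```

Using a monotonic clock for interval measurement avoids rate glitches if the system clock is adjusted mid-transfer.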

Performance Optimization

Optimizing Transfer Operations

def optimize_large_file_transfers():
    """Optimize transfers for large files."""
    
    # Configuration for large file transfers
    large_file_config = {
        'chunk_size': 64 * 1024 * 1024,  # 64MB chunks
        'max_connections': 10,            # Parallel connections
        'timeout': 300,                   # 5 minute timeout per chunk
        'retry_attempts': 3,              # Retry failed chunks
        'use_compression': True           # Compress during transfer
    }
    
    # Configuration for small file batch transfers
    batch_config = {
        'batch_size': 100,                # Files per batch
        'parallel_batches': 5,            # Concurrent batches
        'batch_timeout': 600,             # 10 minute timeout per batch
        'skip_existing': True             # Skip existing files
    }
    
    return {
        'large_files': large_file_config,
        'batch_processing': batch_config
    }
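With the 64 MB chunk size above, any object maps to a predictable number of chunks (ceiling division), which bounds the cost of a retry to one chunk rather than the whole object. A quick check with a hypothetical 10 GiB object:

```python
import math

chunk_size = 64 * 1024 * 1024     # matches large_file_config['chunk_size']
object_size = 10 * 1024 ** 3      # hypothetical 10 GiB object

chunks = math.ceil(object_size / chunk_size)
# With retry_attempts=3, the worst case per failure is three re-sends of a
# single 64 MB chunk, not a re-send of the whole 10 GiB object.
```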

def implement_transfer_caching():
    """Implement caching for frequently transferred files."""
    
    class TransferCache:
        def __init__(self):
            self.cache = {}
            self.cache_ttl = 3600  # 1 hour TTL
        
        def get_cached_transfer(self, source_path: str) -> dict | None:
            """Get cached transfer information."""
            import time
            
            if source_path in self.cache:
                cache_entry = self.cache[source_path]
                if time.time() - cache_entry['timestamp'] < self.cache_ttl:
                    return cache_entry['data']
                else:
                    del self.cache[source_path]
            
            return None
        
        def cache_transfer_result(self, source_path: str, result: dict):
            """Cache transfer result."""
            import time
            
            self.cache[source_path] = {
                'timestamp': time.time(),
                'data': result
            }
        
        def clear_cache(self):
            """Clear transfer cache."""
            self.cache.clear()
    
    return TransferCache

def implement_parallel_transfers():
    """Implement parallel transfer processing."""
    
    import concurrent.futures
    import threading
    
    class ParallelTransferManager:
        def __init__(self, max_workers: int = 5):
            self.max_workers = max_workers
            self.transfer_results = {}
            self.lock = threading.Lock()
        
        def transfer_file(self, transfer_config: dict) -> dict:
            """Transfer a single file."""
            try:
                # Simulate file transfer logic
                source = transfer_config['source']
                target = transfer_config['target']
                
                # Actual transfer implementation would go here
                result = {
                    'source': source,
                    'target': target,
                    'status': 'success',
                    'size_bytes': transfer_config.get('size', 0)
                }
                
                with self.lock:
                    self.transfer_results[source] = result
                
                return result
                
            except Exception as e:
                error_result = {
                    'source': transfer_config['source'],
                    'status': 'failed',
                    'error': str(e)
                }
                
                with self.lock:
                    self.transfer_results[transfer_config['source']] = error_result
                
                return error_result
        
        def execute_parallel_transfers(self, transfer_configs: list[dict]) -> dict:
            """Execute multiple transfers in parallel."""
            
            with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                # Submit all transfer tasks
                future_to_config = {
                    executor.submit(self.transfer_file, config): config
                    for config in transfer_configs
                }
                
                # Wait for completion
                for future in concurrent.futures.as_completed(future_to_config):
                    config = future_to_config[future]
                    try:
                        result = future.result()
                        print(f"Completed: {config['source']} -> {result['status']}")
                    except Exception as e:
                        print(f"Failed: {config['source']} -> {e}")
            
            # Return summary
            successful = sum(1 for r in self.transfer_results.values() if r['status'] == 'success')
            failed = len(self.transfer_results) - successful
            
            return {
                'total': len(transfer_configs),
                'successful': successful,
                'failed': failed,
                'results': self.transfer_results
            }
    
    return ParallelTransferManager
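The submit/as_completed fan-out used by ParallelTransferManager can be reproduced standalone. In this sketch the transfer itself is a stand-in (sources and the failure flag are hypothetical), but the error-isolation pattern is the same: one failed job is recorded without cancelling the others.

```python
import concurrent.futures

def transfer_file(config: dict) -> dict:
    # Stand-in for the real copy: succeed unless the config flags a failure.
    if config.get("fail"):
        raise OSError(f"cannot reach {config['source']}")
    return {"source": config["source"], "status": "success"}

configs = [
    {"source": "s3://bucket/a.csv"},
    {"source": "s3://bucket/b.csv"},
    {"source": "s3://bucket/c.csv", "fail": True},
]

results = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = {executor.submit(transfer_file, c): c for c in configs}
    for future in concurrent.futures.as_completed(futures):
        cfg = futures[future]
        try:
            results[cfg["source"]] = future.result()
        except Exception as e:
            results[cfg["source"]] = {
                "source": cfg["source"],
                "status": "failed",
                "error": str(e),
            }

successful = sum(1 for r in results.values() if r["status"] == "success")
```

Collecting results in the coordinating thread, as here, also avoids the lock that ParallelTransferManager needs when workers write to shared state themselves.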

This documentation covers the data transfer capabilities of the Apache Airflow Microsoft Azure provider: local-to-Azure transfers, database-to-Azure transfers, cloud-to-cloud transfers, error handling patterns, and performance optimization techniques.

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-microsoft-azure
