Provider package for Microsoft Azure integrations with Apache Airflow
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Specialized operators for transferring data between Azure services and external systems including local filesystem, SFTP servers, Oracle databases, and AWS S3. Provides comprehensive data movement capabilities with error handling and performance optimization.
Transfer data from local filesystem to Azure services with comprehensive file handling and upload capabilities.
class LocalFilesystemToADLSOperator(BaseOperator):
    """
    Transfers files from local filesystem to Azure Data Lake Storage.

    Supports uploading local files to both ADLS Gen1 and Gen2 with
    configurable options for overwrite, directory creation, and metadata.

    NOTE(review): this is an interface stub — the implementation body is not
    shown in this document. All constructor arguments are keyword-only.
    """

    def __init__(
        self,
        *,
        local_path: str,
        remote_path: str,
        azure_data_lake_conn_id: str = "azure_data_lake_default",
        overwrite: bool = True,
        nthreads: int = 64,
        buffersize: int = 4194304,
        blocksize: int = 4194304,
        **kwargs
    ):
        """
        Initialize local to ADLS transfer operator.

        Args:
            local_path (str): Path to local file or directory
            remote_path (str): Target path in Azure Data Lake Storage
            azure_data_lake_conn_id (str): Airflow connection ID for ADLS
            overwrite (bool): Whether to overwrite existing files (default: True)
            nthreads (int): Number of threads for upload (default: 64)
            buffersize (int): Buffer size for upload (default: 4194304, i.e. 4 MiB)
            blocksize (int): Block size for upload (default: 4194304, i.e. 4 MiB)
            **kwargs: Forwarded to BaseOperator.
        """

    def execute(self, context: Context) -> dict[str, Any]:
        """
        Execute file transfer from local filesystem to ADLS.

        Args:
            context (Context): Airflow task context

        Returns:
            dict[str, Any]: Transfer results including file count and sizes
        """
class LocalFilesystemToWasbOperator(BaseOperator):
    """
    Transfers files from local filesystem to Azure Blob Storage.

    Supports uploading local files to Azure Blob Storage with configurable
    options for container creation, overwrite behavior, and metadata.

    NOTE(review): interface stub — implementation body not shown here.
    All constructor arguments are keyword-only.
    """

    def __init__(
        self,
        *,
        file_path: str,
        container_name: str,
        blob_name: str,
        azure_conn_id: str = "wasb_default",
        create_container: bool = False,
        overwrite: bool = True,
        content_settings: dict[str, Any] | None = None,
        metadata: dict[str, str] | None = None,
        **kwargs
    ):
        """
        Initialize local to Azure Blob Storage transfer operator.

        Args:
            file_path (str): Path to local file
            container_name (str): Target container name in Azure Blob Storage
            blob_name (str): Target blob name
            azure_conn_id (str): Airflow connection ID for Azure Blob Storage
            create_container (bool): Whether to create container if it doesn't exist (default: False)
            overwrite (bool): Whether to overwrite existing blob (default: True)
            content_settings (dict[str, Any] | None): Blob content settings
            metadata (dict[str, str] | None): Blob metadata
            **kwargs: Forwarded to BaseOperator.
        """

    def execute(self, context: Context) -> str:
        """
        Execute file transfer from local filesystem to Azure Blob Storage.

        Args:
            context (Context): Airflow task context

        Returns:
            str: Blob URL of uploaded file
        """

Transfer data from database systems to Azure services with query execution and data transformation capabilities.
class OracleToAzureDataLakeOperator(BaseOperator):
    """
    Transfers data from Oracle database to Azure Data Lake Storage.

    Executes Oracle queries and uploads results to ADLS with support for
    various data formats, partitioning, and incremental transfers.

    NOTE(review): interface stub — implementation body not shown here.
    All constructor arguments are keyword-only; the CSV-style parameters
    (delimiter/quotechar/quoting) suggest results are written as delimited
    text — confirm against the provider implementation.
    """

    def __init__(
        self,
        *,
        filename: str,
        azure_data_lake_conn_id: str,
        oracle_conn_id: str,
        sql: str,
        sql_params: dict[str, Any] | None = None,
        delimiter: str = "\t",
        encoding: str = "utf-8",
        quotechar: str = '"',
        quoting: int = csv.QUOTE_MINIMAL,
        **kwargs
    ):
        """
        Initialize Oracle to Azure Data Lake transfer operator.

        Args:
            filename (str): Target filename in Azure Data Lake Storage
            azure_data_lake_conn_id (str): Airflow connection ID for ADLS
            oracle_conn_id (str): Airflow connection ID for Oracle database
            sql (str): SQL query to execute on Oracle database
            sql_params (dict[str, Any] | None): Parameters for SQL query
            delimiter (str): Field delimiter for output file (default: tab)
            encoding (str): File encoding (default: "utf-8")
            quotechar (str): Quote character for CSV (default: '"')
            quoting (int): Quoting behavior (default: csv.QUOTE_MINIMAL)
            **kwargs: Forwarded to BaseOperator.
        """

    def execute(self, context: Context) -> str:
        """
        Execute data transfer from Oracle to Azure Data Lake Storage.

        Args:
            context (Context): Airflow task context

        Returns:
            str: Path to uploaded file in ADLS
        """

Transfer data between different cloud services with comprehensive protocol support and authentication handling.
class SFTPToWasbOperator(BaseOperator):
    """
    Transfers files from SFTP server to Azure Blob Storage.

    Downloads files from SFTP servers and uploads them to Azure Blob Storage
    with support for directory traversal, file filtering, and batch processing.

    NOTE(review): interface stub — implementation body not shown here.
    All constructor arguments are keyword-only.
    """

    def __init__(
        self,
        *,
        sftp_source_path: str,
        container_name: str,
        blob_name: str,
        sftp_conn_id: str = "sftp_default",
        wasb_conn_id: str = "wasb_default",
        create_container: bool = False,
        overwrite: bool = True,
        move_object: bool = False,
        **kwargs
    ):
        """
        Initialize SFTP to Azure Blob Storage transfer operator.

        Args:
            sftp_source_path (str): Path to source file on SFTP server
            container_name (str): Target container name in Azure Blob Storage
            blob_name (str): Target blob name
            sftp_conn_id (str): Airflow connection ID for SFTP server
            wasb_conn_id (str): Airflow connection ID for Azure Blob Storage
            create_container (bool): Whether to create container if it doesn't exist (default: False)
            overwrite (bool): Whether to overwrite existing blob (default: True)
            move_object (bool): Whether to delete source file after transfer (default: False)
            **kwargs: Forwarded to BaseOperator.
        """

    def execute(self, context: Context) -> str:
        """
        Execute file transfer from SFTP to Azure Blob Storage.

        Args:
            context (Context): Airflow task context

        Returns:
            str: Blob URL of transferred file
        """
class S3ToAzureBlobStorageOperator(BaseOperator):
    """
    Transfers objects from AWS S3 to Azure Blob Storage.

    Downloads objects from AWS S3 and uploads them to Azure Blob Storage
    with support for large files, batch processing, and metadata preservation.

    NOTE(review): interface stub — implementation body not shown here.
    All constructor arguments are keyword-only.
    """

    def __init__(
        self,
        *,
        s3_source_key: str,
        container_name: str,
        blob_name: str,
        s3_bucket: str | None = None,
        aws_conn_id: str = "aws_default",
        wasb_conn_id: str = "wasb_default",
        create_container: bool = False,
        overwrite: bool = True,
        s3_verify: bool | str | None = None,
        s3_extra_args: dict[str, Any] | None = None,
        wasb_extra_args: dict[str, Any] | None = None,
        **kwargs
    ):
        """
        Initialize AWS S3 to Azure Blob Storage transfer operator.

        Args:
            s3_source_key (str): Source object key in AWS S3
            container_name (str): Target container name in Azure Blob Storage
            blob_name (str): Target blob name
            s3_bucket (str | None): Source S3 bucket name; presumably the key
                may also carry the bucket when this is None — confirm upstream
            aws_conn_id (str): Airflow connection ID for AWS S3
            wasb_conn_id (str): Airflow connection ID for Azure Blob Storage
            create_container (bool): Whether to create container if it doesn't exist (default: False)
            overwrite (bool): Whether to overwrite existing blob (default: True)
            s3_verify (bool | str | None): S3 SSL verification configuration
            s3_extra_args (dict[str, Any] | None): Additional S3 arguments
            wasb_extra_args (dict[str, Any] | None): Additional WASB arguments
            **kwargs: Forwarded to BaseOperator.
        """

    def execute(self, context: Context) -> str:
        """
        Execute object transfer from AWS S3 to Azure Blob Storage.

        Args:
            context (Context): Airflow task context

        Returns:
            str: Blob URL of transferred object
        """

Custom exception classes for handling transfer operation errors and edge cases.
class TooManyFilesToMoveException(Exception):
    """
    Exception for bulk transfer limits.

    Raised when a transfer operation exceeds the configured limit for
    batch processing or for concurrent file transfers.
    """
class InvalidAzureBlobParameters(Exception):
    """
    Exception for invalid blob parameters.

    Raised when blob storage parameters are invalid or are incompatible
    with the operation being performed.
    """
class InvalidKeyComponents(Exception):
    """
    Exception for invalid key components.

    Raised when file path or key components are invalid
    for the target storage system.
    """

    pass


# Fix: the original source fused `pass` with the next example's import
# ("passfrom airflow import DAG"); the statements are now separated.
from airflow import DAG
from airflow.providers.microsoft.azure.transfers.local_to_wasb import LocalFilesystemToWasbOperator
from airflow.providers.microsoft.azure.transfers.local_to_adls import LocalFilesystemToADLSOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import os
def prepare_local_files():
    """Prepare local files for transfer to Azure.

    Generates a sample sales CSV (header + 1000 rows) and a sample customer
    JSON (100 records) under /tmp/data_export.

    Returns:
        dict: Paths of the generated artifacts ('csv_file', 'json_file',
        'data_dir').
    """
    import csv
    import json

    data_dir = '/tmp/data_export'
    os.makedirs(data_dir, exist_ok=True)

    # Sample CSV: header row followed by 1000 synthetic sales rows.
    csv_file = os.path.join(data_dir, 'sales_data.csv')
    with open(csv_file, 'w', newline='') as handle:
        writer = csv.writer(handle)
        writer.writerow(['date', 'product', 'sales', 'region'])
        writer.writerows(
            [
                f'2024-01-{(row % 31) + 1:02d}',
                f'Product_{row % 10}',
                f'{1000 + (row * 10)}',
                f'Region_{row % 5}',
            ]
            for row in range(1000)
        )

    # Sample JSON: 100 synthetic customer records.
    json_file = os.path.join(data_dir, 'customer_data.json')
    payload = {
        'customers': [
            {'id': n, 'name': f'Customer_{n}', 'email': f'customer{n}@example.com'}
            for n in range(100)
        ]
    }
    with open(json_file, 'w') as handle:
        json.dump(payload, handle, indent=2)

    return {
        'csv_file': csv_file,
        'json_file': json_file,
        'data_dir': data_dir,
    }
def verify_transfers(**context):
    """Verify that files were transferred successfully.

    Checks Azure Blob Storage for each blob the upstream transfer tasks
    should have produced.

    Raises:
        ValueError: If any expected blob is missing.

    Returns:
        dict: Blob name -> bool existence flags.
    """
    from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

    hook = WasbHook(wasb_conn_id='azure_blob_conn')

    # (container, blob) pairs that must exist after the upstream transfers.
    expected_blobs = [
        ('data-exports', 'sales/sales_data.csv'),
        ('data-exports', 'customers/customer_data.json'),
    ]

    results = {}
    for container, blob in expected_blobs:
        found = hook.check_for_blob(container, blob)
        results[blob] = found
        print(f"File {blob}: {'✓ Found' if found else '✗ Not found'}")

    if not all(results.values()):
        raise ValueError("Some files were not transferred successfully")
    print("All files transferred successfully!")
    return results
# --- DAG: daily transfer of locally generated files to Azure Blob Storage / ADLS ---
dag = DAG(
    'local_to_azure_transfers',
    default_args={
        'owner': 'data-transfer-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=5)
    },
    description='Transfer local files to Azure services',
    schedule_interval=timedelta(days=1),  # NOTE(review): `schedule_interval` is the pre-2.4 name; newer Airflow prefers `schedule` — confirm target version
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Prepare local files
prep_files = PythonOperator(
    task_id='prepare_files',
    python_callable=prepare_local_files,
    dag=dag
)

# Transfer CSV to Azure Blob Storage
transfer_csv = LocalFilesystemToWasbOperator(
    task_id='transfer_csv_to_blob',
    file_path='/tmp/data_export/sales_data.csv',
    container_name='data-exports',
    blob_name='sales/sales_data.csv',
    azure_conn_id='azure_blob_conn',
    create_container=True,
    overwrite=True,
    metadata={
        'source': 'local_filesystem',
        'export_date': '{{ ds }}',  # Jinja template — rendered to the logical date at runtime
        'file_type': 'csv'
    },
    dag=dag
)

# Transfer JSON to Azure Data Lake Storage
transfer_json = LocalFilesystemToADLSOperator(
    task_id='transfer_json_to_adls',
    local_path='/tmp/data_export/customer_data.json',
    remote_path='/exports/customers/customer_data_{{ ds_nodash }}.json',
    azure_data_lake_conn_id='adls_conn',
    overwrite=True,
    dag=dag
)

# Verify transfers
verify_files = PythonOperator(
    task_id='verify_transfers',
    python_callable=verify_transfers,
    dag=dag
)
# Fix: the original line fused this dependency statement with the next
# example's import ("... >> verify_filesfrom airflow import DAG").
prep_files >> [transfer_csv, transfer_json] >> verify_files

from airflow import DAG
from airflow.providers.microsoft.azure.transfers.oracle_to_azure_data_lake import OracleToAzureDataLakeOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def prepare_oracle_queries():
    """Prepare Oracle queries for data extraction.

    Returns:
        dict[str, str]: Export name -> Oracle SQL text. The '{{ ds }}' /
        '{{ next_ds }}' placeholders are Jinja templates; they are only
        rendered when the SQL is passed through a templated operator field.
    """
    queries = {
        # Daily sales rollup for the half-open [ds, next_ds) window.
        'sales_summary': """
            SELECT
                TO_CHAR(order_date, 'YYYY-MM-DD') as order_date,
                product_category,
                COUNT(*) as order_count,
                SUM(order_total) as total_sales,
                AVG(order_total) as avg_order_value
            FROM sales_orders
            WHERE order_date >= TO_DATE('{{ ds }}', 'YYYY-MM-DD')
            AND order_date < TO_DATE('{{ next_ds }}', 'YYYY-MM-DD')
            GROUP BY TO_CHAR(order_date, 'YYYY-MM-DD'), product_category
            ORDER BY order_date, product_category
        """,
        # Per-customer lifetime activity, limited to customers with orders.
        'customer_activity': """
            SELECT
                c.customer_id,
                c.customer_name,
                c.email,
                COUNT(o.order_id) as order_count,
                SUM(o.order_total) as total_spent,
                MAX(o.order_date) as last_order_date
            FROM customers c
            LEFT JOIN sales_orders o ON c.customer_id = o.customer_id
            WHERE c.created_date <= TO_DATE('{{ ds }}', 'YYYY-MM-DD')
            GROUP BY c.customer_id, c.customer_name, c.email
            HAVING COUNT(o.order_id) > 0
            ORDER BY total_spent DESC
        """,
        # Inventory rows updated since the logical date.
        'inventory_status': """
            SELECT
                p.product_id,
                p.product_name,
                p.category,
                i.current_stock,
                i.reserved_stock,
                i.available_stock,
                TO_CHAR(i.last_updated, 'YYYY-MM-DD HH24:MI:SS') as last_updated
            FROM products p
            JOIN inventory i ON p.product_id = i.product_id
            WHERE i.last_updated >= TO_DATE('{{ ds }}', 'YYYY-MM-DD')
            ORDER BY p.category, p.product_name
        """
    }
    return queries
def validate_extracted_data(**context):
    """Validate extracted data in Azure Data Lake Storage.

    Checks that each expected export exists, records its size and data-row
    count, and enforces per-file minimum row counts.

    Fixes vs. the original:
    - The path templates were f-strings, so ``{{ ds }}`` collapsed to the
      literal ``{ ds }`` at definition time and the later
      ``.replace('{{ ds }}', ...)`` could never match. Plain strings are
      used so the placeholder survives until the explicit replace.
    - Row counting drops empty lines (e.g. a trailing newline) before
      subtracting the header, so the count reflects actual data rows.

    Raises:
        ValueError: If any file is missing or below its minimum row count.

    Returns:
        dict: Per-file existence, size, and row-count details.
    """
    from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook

    adls_hook = AzureDataLakeHook(azure_data_lake_conn_id='adls_conn')

    # Plain strings (no f-prefix) — the '{{ ds }}' placeholder is replaced below.
    extracted_files = [
        '/oracle_exports/{{ ds }}/sales_summary.tsv',
        '/oracle_exports/{{ ds }}/customer_activity.tsv',
        '/oracle_exports/{{ ds }}/inventory_status.tsv'
    ]

    validation_results = {}
    for file_path in extracted_files:
        rendered_path = file_path.replace('{{ ds }}', context['ds'])
        if adls_hook.check_for_file(rendered_path):
            # Get file size and data-row count.
            file_content = adls_hook.get_conn().cat(rendered_path)
            data_lines = [line for line in file_content.decode('utf-8').split('\n') if line]
            row_count = max(len(data_lines) - 1, 0)  # exclude the header row
            file_size = len(file_content)
            validation_results[rendered_path] = {
                'exists': True,
                'size_bytes': file_size,
                'row_count': row_count
            }
            print(f"✓ {rendered_path}: {row_count} rows, {file_size} bytes")
        else:
            validation_results[rendered_path] = {
                'exists': False,
                'size_bytes': 0,
                'row_count': 0
            }
            print(f"✗ {rendered_path}: File not found")

    # Minimum acceptable data volumes per export.
    min_requirements = {
        'sales_summary.tsv': 1,  # At least 1 row
        'customer_activity.tsv': 10,  # At least 10 customers
        'inventory_status.tsv': 50  # At least 50 products
    }

    validation_passed = True
    for file_path, result in validation_results.items():
        file_name = file_path.split('/')[-1]
        min_rows = min_requirements.get(file_name, 0)
        if not result['exists'] or result['row_count'] < min_rows:
            validation_passed = False
            print(f"Validation failed for {file_name}: Expected >= {min_rows} rows, got {result['row_count']}")

    if not validation_passed:
        raise ValueError("Data validation failed")
    return validation_results
dag = DAG(
'oracle_to_azure_transfer',
default_args={
'owner': 'data-engineering-team',
'retries': 3,
'retry_delay': timedelta(minutes=10)
},
description='Extract data from Oracle to Azure Data Lake Storage',
schedule_interval=timedelta(days=1),
start_date=datetime(2024, 1, 1),
catchup=False
)
# Prepare queries
prep_queries = PythonOperator(
task_id='prepare_queries',
python_callable=prepare_oracle_queries,
dag=dag
)
# Extract sales summary data
extract_sales = OracleToAzureDataLakeOperator(
task_id='extract_sales_summary',
filename='/oracle_exports/{{ ds }}/sales_summary.tsv',
azure_data_lake_conn_id='adls_conn',
oracle_conn_id='oracle_conn',
sql="""
SELECT
TO_CHAR(order_date, 'YYYY-MM-DD') as order_date,
product_category,
COUNT(*) as order_count,
SUM(order_total) as total_sales,
AVG(order_total) as avg_order_value
FROM sales_orders
WHERE order_date >= TO_DATE('{{ ds }}', 'YYYY-MM-DD')
AND order_date < TO_DATE('{{ next_ds }}', 'YYYY-MM-DD')
GROUP BY TO_CHAR(order_date, 'YYYY-MM-DD'), product_category
ORDER BY order_date, product_category
""",
delimiter='\t',
encoding='utf-8',
dag=dag
)
# Extract customer activity data
extract_customers = OracleToAzureDataLakeOperator(
task_id='extract_customer_activity',
filename='/oracle_exports/{{ ds }}/customer_activity.tsv',
azure_data_lake_conn_id='adls_conn',
oracle_conn_id='oracle_conn',
sql="""
SELECT
c.customer_id,
c.customer_name,
c.email,
COUNT(o.order_id) as order_count,
SUM(o.order_total) as total_spent,
MAX(o.order_date) as last_order_date
FROM customers c
LEFT JOIN sales_orders o ON c.customer_id = o.customer_id
WHERE c.created_date <= TO_DATE('{{ ds }}', 'YYYY-MM-DD')
GROUP BY c.customer_id, c.customer_name, c.email
HAVING COUNT(o.order_id) > 0
ORDER BY total_spent DESC
""",
delimiter='\t',
encoding='utf-8',
dag=dag
)
# Extract inventory data
extract_inventory = OracleToAzureDataLakeOperator(
task_id='extract_inventory_status',
filename='/oracle_exports/{{ ds }}/inventory_status.tsv',
azure_data_lake_conn_id='adls_conn',
oracle_conn_id='oracle_conn',
sql="""
SELECT
p.product_id,
p.product_name,
p.category,
i.current_stock,
i.reserved_stock,
i.available_stock,
TO_CHAR(i.last_updated, 'YYYY-MM-DD HH24:MI:SS') as last_updated
FROM products p
JOIN inventory i ON p.product_id = i.product_id
WHERE i.last_updated >= TO_DATE('{{ ds }}', 'YYYY-MM-DD')
ORDER BY p.category, p.product_name
""",
delimiter='\t',
encoding='utf-8',
dag=dag
)
# Validate extracted data
validate_data = PythonOperator(
task_id='validate_data',
python_callable=validate_extracted_data,
dag=dag
)
# Fix: the original line fused this dependency statement with the next
# example's import ("... >> validate_datafrom airflow import DAG").
prep_queries >> [extract_sales, extract_customers, extract_inventory] >> validate_data

from airflow import DAG
from airflow.providers.microsoft.azure.transfers.s3_to_wasb import S3ToAzureBlobStorageOperator
from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SFTPToWasbOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def discover_source_files(**context):
    """Discover files available for transfer from various sources.

    Fix: the original hard-coded '{{ ds }}' inside the Python callable.
    Jinja templates are only rendered for templated operator fields, never
    inside ``python_callable`` bodies, so the literal text ``{{ ds }}`` was
    being sent to S3/SFTP. The logical date is now read from the task
    context; accepting ``**context`` is backward compatible with the old
    zero-argument call.

    Args:
        **context: Airflow task context (uses 'ds' when present).

    Returns:
        dict: {'s3_files': [...], 'sftp_files': [...]} discovered entries.
    """
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
    from airflow.providers.sftp.hooks.sftp import SFTPHook

    # Fall back to the literal placeholder so a bare call without context
    # still behaves like the previous implementation.
    ds = context.get('ds', '{{ ds }}')

    # Discover S3 files
    s3_hook = S3Hook(aws_conn_id='aws_conn')
    s3_files = s3_hook.list_keys(
        bucket_name='source-data-bucket',
        prefix=f'daily-exports/{ds}/',
        delimiter=''
    )

    # Discover SFTP files
    sftp_hook = SFTPHook(ssh_conn_id='sftp_conn')
    sftp_files = sftp_hook.list_directory(f'/exports/{ds}/')

    return {
        's3_files': s3_files or [],
        'sftp_files': sftp_files or []
    }
def monitor_transfer_progress(**context):
    """Monitor transfer progress and generate summary.

    Fix: the blob prefix was the f-string ``f"{{ ds }}/"``; doubled braces
    escape inside f-strings, so this produced the literal ``{ ds }/`` and
    never matched any blob. The logical date from the task context is used
    instead.

    Returns:
        dict: Per-container file counts, total bytes, and blob details, or
        an 'error' entry when a container could not be listed.
    """
    from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

    wasb_hook = WasbHook(wasb_conn_id='azure_blob_conn')

    # Check transferred files
    containers_to_check = ['s3-transfers', 'sftp-transfers']
    transfer_summary = {}
    for container in containers_to_check:
        try:
            blobs = wasb_hook.get_container_client(container).list_blobs(
                name_starts_with=f"{context['ds']}/"
            )
            blob_list = []
            total_size = 0
            for blob in blobs:
                blob_info = {
                    'name': blob.name,
                    'size': blob.size,
                    'last_modified': blob.last_modified.isoformat() if blob.last_modified else None
                }
                blob_list.append(blob_info)
                total_size += blob.size or 0
            transfer_summary[container] = {
                'file_count': len(blob_list),
                'total_size_bytes': total_size,
                'files': blob_list
            }
            print(f"Container {container}: {len(blob_list)} files, {total_size:,} bytes")
        except Exception as e:
            # Best-effort monitoring: record the failure instead of aborting.
            print(f"Error checking container {container}: {e}")
            transfer_summary[container] = {
                'error': str(e),
                'file_count': 0,
                'total_size_bytes': 0
            }
    return transfer_summary
def cleanup_source_files(**context):
    """Clean up source files after successful transfer (optional).

    Reads the summary produced by the 'monitor_progress' task and reports
    how many files were moved; the actual S3 deletion is deliberately left
    disabled in the quoted snippet below.

    Fixes vs. the original:
    - Tolerates a missing/empty XCom (e.g. upstream failure) instead of
      crashing on ``None.values()``.
    - Dropped the unused function-level import of S3Hook; the disabled
      snippet imports it locally if ever enabled.

    Returns:
        int: Total number of files reported as transferred.
    """
    transfer_summary = context['task_instance'].xcom_pull(task_ids='monitor_progress') or {}
    total_files_transferred = sum(
        container_info.get('file_count', 0)
        for container_info in transfer_summary.values()
    )
    if total_files_transferred > 0:
        print(f"Successfully transferred {total_files_transferred} files")
    # Optional: Delete source files from S3 after successful transfer
    # Uncomment the following lines if cleanup is desired
    """
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
    s3_hook = S3Hook(aws_conn_id='aws_conn')
    source_files = context['task_instance'].xcom_pull(task_ids='discover_files')['s3_files']
    for s3_key in source_files:
        try:
            s3_hook.delete_objects(
                bucket='source-data-bucket',
                keys=[s3_key]
            )
            print(f"Deleted source file: {s3_key}")
        except Exception as e:
            print(f"Failed to delete {s3_key}: {e}")
    """
    return total_files_transferred
# --- DAG: daily fan-out copying S3 objects and SFTP files into Azure Blob Storage ---
dag = DAG(
    'cloud_to_azure_transfers',
    default_args={
        'owner': 'integration-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=5)
    },
    description='Transfer files from various cloud sources to Azure',
    schedule_interval=timedelta(days=1),  # NOTE(review): pre-2.4 parameter name; newer Airflow prefers `schedule`
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Discover source files
discover_files = PythonOperator(
    task_id='discover_files',
    python_callable=discover_source_files,
    dag=dag
)

# Transfer from AWS S3 to Azure Blob Storage.
# '{{ ds }}' fields are Jinja templates rendered by the operator at runtime.
transfer_s3_files = S3ToAzureBlobStorageOperator(
    task_id='transfer_s3_data',
    s3_source_key='daily-exports/{{ ds }}/sales_data.csv',
    s3_bucket='source-data-bucket',
    container_name='s3-transfers',
    blob_name='{{ ds }}/sales_data.csv',
    aws_conn_id='aws_conn',
    wasb_conn_id='azure_blob_conn',
    create_container=True,
    overwrite=True,
    s3_extra_args={
        'ServerSideEncryption': 'AES256'
    },
    wasb_extra_args={
        'content_settings': {
            'content_type': 'text/csv',
            'cache_control': 'no-cache'
        },
        'metadata': {
            'source': 's3',
            'transfer_date': '{{ ds }}',
            'original_bucket': 'source-data-bucket'
        }
    },
    dag=dag
)

# Transfer from SFTP to Azure Blob Storage
transfer_sftp_files = SFTPToWasbOperator(
    task_id='transfer_sftp_data',
    sftp_source_path='/exports/{{ ds }}/inventory_update.json',
    container_name='sftp-transfers',
    blob_name='{{ ds }}/inventory_update.json',
    sftp_conn_id='sftp_conn',
    wasb_conn_id='azure_blob_conn',
    create_container=True,
    overwrite=True,
    move_object=False,  # Keep original file on SFTP server
    dag=dag
)

# Transfer additional S3 files with pattern matching
transfer_s3_logs = S3ToAzureBlobStorageOperator(
    task_id='transfer_s3_logs',
    s3_source_key='logs/{{ ds }}/application.log',
    s3_bucket='source-data-bucket',
    container_name='s3-transfers',
    blob_name='{{ ds }}/logs/application.log',
    aws_conn_id='aws_conn',
    wasb_conn_id='azure_blob_conn',
    create_container=True,
    overwrite=True,
    dag=dag
)

# Monitor transfer progress
monitor_progress = PythonOperator(
    task_id='monitor_progress',
    python_callable=monitor_transfer_progress,
    dag=dag
)

# Optional cleanup
cleanup_sources = PythonOperator(
    task_id='cleanup_sources',
    python_callable=cleanup_source_files,
    dag=dag
)

# Set up dependencies
discover_files >> [transfer_s3_files, transfer_sftp_files, transfer_s3_logs]
# Fix: the original line fused this dependency statement with the next
# example's import ("... >> cleanup_sourcesfrom airflow import DAG").
[transfer_s3_files, transfer_sftp_files, transfer_s3_logs] >> monitor_progress >> cleanup_sources

from airflow import DAG
from airflow.providers.microsoft.azure.transfers.s3_to_wasb import S3ToAzureBlobStorageOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def create_dynamic_transfer_tasks(**context):
    """Create transfer tasks dynamically based on available files.

    Lists the S3 objects exported for the current logical date, filters
    out files above 1 GB or below 1 KB, and builds one transfer-job dict
    per eligible file. The target container is derived from the file
    extension.

    Fixes vs. the original:
    - Removed the unused ``airflow.models.TaskInstance`` import.
    - Hoisted the loop-invariant extension->container mapping out of the
      per-file loop.

    Returns:
        list[dict]: Transfer-job descriptors ('source_key',
        'target_container', 'target_blob', 'file_size', 'file_type');
        empty when nothing is eligible.
    """
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    s3_hook = S3Hook(aws_conn_id='aws_conn')

    # List all files in the source bucket for the current date
    source_prefix = f"batch-export/{context['ds']}/"
    files = s3_hook.list_keys(
        bucket_name='batch-data-source',
        prefix=source_prefix,
        delimiter=''
    )
    if not files:
        print("No files found for transfer")
        return []

    # Determine target container based on file type; unknown extensions
    # are routed to 'unclassified-data'.
    container_mapping = {
        'csv': 'structured-data',
        'json': 'json-data',
        'xml': 'xml-data',
        'txt': 'text-data',
        'parquet': 'columnar-data'
    }

    transfer_jobs = []
    for file_key in files:
        file_info = s3_hook.get_key(file_key, bucket_name='batch-data-source')
        file_size = file_info.size if file_info else 0
        file_name = file_key.split('/')[-1]

        # Skip very large files (>1GB) or very small files (<1KB)
        if file_size > 1024**3 or file_size < 1024:
            print(f"Skipping {file_name}: size {file_size} bytes")
            continue

        file_extension = file_name.split('.')[-1].lower()
        transfer_jobs.append({
            'source_key': file_key,
            'target_container': container_mapping.get(file_extension, 'unclassified-data'),
            'target_blob': f"{context['ds']}/{file_name}",
            'file_size': file_size,
            'file_type': file_extension
        })

    print(f"Created {len(transfer_jobs)} transfer jobs")
    return transfer_jobs
def execute_batch_transfers(**context):
    """Execute batch transfers with error handling and retries.

    Pulls the job list produced by the 'create_transfer_jobs' task from
    XCom and copies each S3 object to Azure Blob Storage, skipping blobs
    that already exist. Per-file failures are recorded rather than
    aborting the whole batch.

    Returns:
        dict: Counts ('completed', 'failed', 'skipped') plus per-file
        'transfer_details' (absent when there were no jobs).
    """
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
    from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

    transfer_jobs = context['task_instance'].xcom_pull(task_ids='create_transfer_jobs')
    if not transfer_jobs:
        print("No transfer jobs to execute")
        return {'completed': 0, 'failed': 0, 'skipped': 0}

    s3_hook = S3Hook(aws_conn_id='aws_conn')
    wasb_hook = WasbHook(wasb_conn_id='azure_blob_conn')

    results = {
        'completed': 0,
        'failed': 0,
        'skipped': 0,
        'transfer_details': []
    }

    for job in transfer_jobs:
        try:
            source_key = job['source_key']
            target_container = job['target_container']
            target_blob = job['target_blob']
            print(f"Transferring {source_key} -> {target_container}/{target_blob}")

            # Check if target already exists
            if wasb_hook.check_for_blob(target_container, target_blob):
                print(f"Target blob already exists, skipping: {target_blob}")
                results['skipped'] += 1
                continue

            # Create container if it doesn't exist
            try:
                wasb_hook.create_container(target_container)
            except Exception:
                pass  # Container might already exist

            # Download from S3.
            # NOTE(review): the whole object is buffered in memory — acceptable
            # given the <1GB cap enforced by the job-creation task.
            s3_object = s3_hook.get_key(source_key, bucket_name='batch-data-source')
            file_content = s3_object.get()['Body'].read()

            # Upload to Azure Blob Storage
            wasb_hook.load_bytes(
                data=file_content,
                container_name=target_container,
                blob_name=target_blob,
                overwrite=True
            )

            # Set metadata describing the transfer provenance
            wasb_hook.get_blob_client(
                container=target_container,
                blob=target_blob
            ).set_blob_metadata({
                'source': 's3',
                'source_bucket': 'batch-data-source',
                'source_key': source_key,
                'transfer_date': context['ds'],
                'file_size': str(job['file_size']),
                'file_type': job['file_type']
            })

            results['completed'] += 1
            results['transfer_details'].append({
                'source': source_key,
                'target': f"{target_container}/{target_blob}",
                'status': 'completed',
                'size_bytes': job['file_size']
            })
            print(f"Successfully transferred: {source_key}")
        except Exception as e:
            # Record the failure and continue with the remaining jobs.
            print(f"Failed to transfer {job['source_key']}: {e}")
            results['failed'] += 1
            results['transfer_details'].append({
                'source': job['source_key'],
                'target': f"{job['target_container']}/{job['target_blob']}",
                'status': 'failed',
                'error': str(e)
            })

    print(f"Batch transfer completed: {results['completed']} successful, "
          f"{results['failed']} failed, {results['skipped']} skipped")
    return results
# --- DAG: daily batch transfer of dynamically discovered S3 files to Azure ---
dag = DAG(
    'batch_transfer_workflow',
    default_args={
        'owner': 'batch-processing-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    description='Batch transfer workflow for multiple files',
    schedule_interval=timedelta(days=1),  # NOTE(review): pre-2.4 parameter name; newer Airflow prefers `schedule`
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Create dynamic transfer jobs
create_jobs = PythonOperator(
    task_id='create_transfer_jobs',
    python_callable=create_dynamic_transfer_tasks,
    dag=dag
)

# Execute batch transfers
execute_transfers = PythonOperator(
    task_id='execute_batch_transfers',
    python_callable=execute_batch_transfers,
    dag=dag
)
# Fix: the original line fused this dependency statement with the next
# section's import ("create_jobs >> execute_transfersfrom airflow...").
create_jobs >> execute_transfers

from airflow.providers.microsoft.azure.transfers.s3_to_wasb import (
    S3ToAzureBlobStorageOperator,
    TooManyFilesToMoveException,
    InvalidAzureBlobParameters,
    InvalidKeyComponents
)
from airflow.exceptions import AirflowException
def robust_transfer_with_error_handling():
    """Demonstrate comprehensive error handling for transfer operations.

    NOTE(review): the operator is only constructed here — the execute()
    call is commented out — so the transfer-specific except clauses below
    can only fire once that call is enabled; as written, only constructor
    errors are caught.
    """
    try:
        # Example of handling specific transfer exceptions
        operator = S3ToAzureBlobStorageOperator(
            task_id='safe_transfer',
            s3_source_key='large-dataset/data.csv',
            container_name='target-container',
            blob_name='processed-data.csv',
            aws_conn_id='aws_conn',
            wasb_conn_id='azure_conn'
        )
        # This would be called by Airflow's execution engine
        # result = operator.execute(context)
    except TooManyFilesToMoveException as e:
        print(f"Too many files in transfer operation: {e}")
        # Implement chunking or batch processing
    except InvalidAzureBlobParameters as e:
        print(f"Invalid blob parameters: {e}")
        # Validate and correct blob parameters
    except InvalidKeyComponents as e:
        print(f"Invalid key components: {e}")
        # Validate and correct file path components
    except Exception as e:
        print(f"Unexpected transfer error: {e}")
        # Wrap anything unexpected in Airflow's task-failure exception type.
        raise AirflowException(f"Transfer failed: {e}")
def implement_transfer_validation():
    """Implement validation patterns for transfer operations.

    Returns:
        dict: Callables keyed 'validate_source', 'validate_target', and
        'validate_result'.
    """

    def validate_source_file(source_path: str, min_size: int = 1024) -> bool:
        """Validate source file before transfer."""
        import os
        if not os.path.exists(source_path):
            raise FileNotFoundError(f"Source file not found: {source_path}")
        size = os.path.getsize(source_path)
        if size < min_size:
            raise ValueError(f"File too small: {size} bytes < {min_size} bytes")
        return True

    def validate_target_parameters(container_name: str, blob_name: str) -> bool:
        """Validate target parameters."""
        if len(container_name or '') < 3:
            raise InvalidAzureBlobParameters("Container name must be at least 3 characters")
        if not blob_name or blob_name[:1] == '/':
            raise InvalidKeyComponents("Blob name cannot start with '/'")
        return True

    def validate_transfer_result(source_size: int, target_size: int, tolerance: float = 0.01) -> bool:
        """Validate transfer result by comparing sizes."""
        drift = abs(source_size - target_size)
        if drift > (source_size * tolerance):
            raise ValueError(f"Size mismatch: source={source_size}, target={target_size}")
        return True

    return {
        'validate_source': validate_source_file,
        'validate_target': validate_target_parameters,
        'validate_result': validate_transfer_result,
    }
def implement_transfer_monitoring():
    """Implement monitoring for transfer operations.

    Returns:
        type: The TransferMonitor class, which tracks timing, bytes moved,
        and average throughput for a single transfer.
    """

    class TransferMonitor:
        def __init__(self):
            # Raw statistics; read via get_stats().
            self.transfer_stats = {
                'start_time': None,
                'end_time': None,
                'bytes_transferred': 0,
                'transfer_rate_mbps': 0,
                'status': 'pending'
            }

        def start_transfer(self):
            """Mark transfer start time."""
            import time
            self.transfer_stats['start_time'] = time.time()
            self.transfer_stats['status'] = 'in_progress'

        def update_progress(self, bytes_transferred: int):
            """Update transfer progress and recompute the average rate."""
            self.transfer_stats['bytes_transferred'] = bytes_transferred
            if self.transfer_stats['start_time']:
                import time
                elapsed_time = time.time() - self.transfer_stats['start_time']
                if elapsed_time > 0:
                    rate_bps = bytes_transferred / elapsed_time
                    self.transfer_stats['transfer_rate_mbps'] = rate_bps / (1024 * 1024)

        def complete_transfer(self):
            """Mark transfer completion and report timing/throughput."""
            import time
            self.transfer_stats['end_time'] = time.time()
            self.transfer_stats['status'] = 'completed'
            if self.transfer_stats['start_time']:
                total_time = self.transfer_stats['end_time'] - self.transfer_stats['start_time']
                print(f"Transfer completed in {total_time:.2f} seconds")
                print(f"Average rate: {self.transfer_stats['transfer_rate_mbps']:.2f} MB/s")

        def get_stats(self):
            """Get a copy of the transfer statistics."""
            return self.transfer_stats.copy()

    # Fix: the original source fused this return with the next function's
    # def line ("return TransferMonitordef optimize_large_file_transfers():").
    return TransferMonitor


def optimize_large_file_transfers():
    """Optimize transfers for large files.

    Returns:
        dict: Tuning profiles — 'large_files' for chunked single-file
        transfers and 'batch_processing' for many small files.
    """
    # Configuration for large file transfers
    large_file_config = {
        'chunk_size': 64 * 1024 * 1024,  # 64MB chunks
        'max_connections': 10,  # Parallel connections
        'timeout': 300,  # 5 minute timeout per chunk
        'retry_attempts': 3,  # Retry failed chunks
        'use_compression': True  # Compress during transfer
    }
    # Configuration for small file batch transfers
    batch_config = {
        'batch_size': 100,  # Files per batch
        'parallel_batches': 5,  # Concurrent batches
        'batch_timeout': 600,  # 10 minute timeout per batch
        'skip_existing': True  # Skip existing files
    }
    return {
        'large_files': large_file_config,
        'batch_processing': batch_config
    }
def implement_transfer_caching():
"""Implement caching for frequently transferred files."""
class TransferCache:
def __init__(self):
self.cache = {}
self.cache_ttl = 3600 # 1 hour TTL
def get_cached_transfer(self, source_path: str) -> dict | None:
"""Get cached transfer information."""
import time
if source_path in self.cache:
cache_entry = self.cache[source_path]
if time.time() - cache_entry['timestamp'] < self.cache_ttl:
return cache_entry['data']
else:
del self.cache[source_path]
return None
def cache_transfer_result(self, source_path: str, result: dict):
"""Cache transfer result."""
import time
self.cache[source_path] = {
'timestamp': time.time(),
'data': result
}
def clear_cache(self):
"""Clear transfer cache."""
self.cache.clear()
return TransferCache
def implement_parallel_transfers():
    """Implement parallel transfer processing.

    Returns:
        type: ParallelTransferManager — fans transfer configs out over a
        thread pool and aggregates per-file results.
    """
    import concurrent.futures
    import threading

    class ParallelTransferManager:
        def __init__(self, max_workers: int = 5):
            self.max_workers = max_workers
            self.transfer_results = {}
            # Guards transfer_results, which worker threads mutate concurrently.
            self.lock = threading.Lock()

        def transfer_file(self, transfer_config: dict) -> dict:
            """Transfer a single file described by transfer_config."""
            try:
                # Simulate file transfer logic
                source = transfer_config['source']
                target = transfer_config['target']
                # Actual transfer implementation would go here
                result = {
                    'source': source,
                    'target': target,
                    'status': 'success',
                    'size_bytes': transfer_config.get('size', 0)
                }
                with self.lock:
                    self.transfer_results[source] = result
                return result
            except Exception as e:
                error_result = {
                    'source': transfer_config['source'],
                    'status': 'failed',
                    'error': str(e)
                }
                with self.lock:
                    self.transfer_results[transfer_config['source']] = error_result
                return error_result

        def execute_parallel_transfers(self, transfer_configs: list[dict]) -> dict:
            """Execute multiple transfers in parallel and return a summary."""
            with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                # Submit all transfer tasks
                future_to_config = {
                    executor.submit(self.transfer_file, config): config
                    for config in transfer_configs
                }
                # Wait for completion
                for future in concurrent.futures.as_completed(future_to_config):
                    config = future_to_config[future]
                    try:
                        result = future.result()
                        print(f"Completed: {config['source']} -> {result['status']}")
                    except Exception as e:
                        print(f"Failed: {config['source']} -> {e}")

            # Return summary
            successful = sum(1 for r in self.transfer_results.values() if r['status'] == 'success')
            failed = len(self.transfer_results) - successful
            return {
                'total': len(transfer_configs),
                'successful': successful,
                'failed': failed,
                'results': self.transfer_results
            }

    # Fix: the original source fused this return with the document's closing
    # paragraph ("return ParallelTransferManagerThis comprehensive...").
    return ParallelTransferManager

# This comprehensive documentation covers all data transfer capabilities in the Apache Airflow Microsoft Azure Provider, including local-to-Azure transfers, database-to-Azure transfers, cloud-to-cloud transfers, error handling patterns, and performance optimization techniques.
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-microsoft-azure