# Data Transfer Operations

Specialized operators for transferring data between Azure services and external systems, including the local filesystem, SFTP servers, Oracle databases, and AWS S3. These operators provide comprehensive data movement capabilities with error handling and performance optimization.

## Capabilities

### Local to Azure Transfers

Transfer data from the local filesystem to Azure services with comprehensive file handling and upload capabilities.

```python { .api }
class LocalFilesystemToADLSOperator(BaseOperator):
    """
    Transfers files from the local filesystem to Azure Data Lake Storage.

    Supports uploading local files to both ADLS Gen1 and Gen2 with
    configurable options for overwrite, directory creation, and metadata.
    """

    def __init__(
        self,
        *,
        local_path: str,
        remote_path: str,
        azure_data_lake_conn_id: str = "azure_data_lake_default",
        overwrite: bool = True,
        nthreads: int = 64,
        buffersize: int = 4194304,
        blocksize: int = 4194304,
        **kwargs
    ):
        """
        Initialize the local-to-ADLS transfer operator.

        Args:
            local_path (str): Path to the local file or directory
            remote_path (str): Target path in Azure Data Lake Storage
            azure_data_lake_conn_id (str): Airflow connection ID for ADLS
            overwrite (bool): Whether to overwrite existing files (default: True)
            nthreads (int): Number of threads for the upload (default: 64)
            buffersize (int): Buffer size in bytes for the upload (default: 4194304)
            blocksize (int): Block size in bytes for the upload (default: 4194304)
        """

    def execute(self, context: Context) -> dict[str, Any]:
        """
        Execute the file transfer from the local filesystem to ADLS.

        Args:
            context (Context): Airflow task context

        Returns:
            dict[str, Any]: Transfer results, including file count and sizes
        """


class LocalFilesystemToWasbOperator(BaseOperator):
    """
    Transfers files from the local filesystem to Azure Blob Storage.

    Supports uploading local files to Azure Blob Storage with configurable
    options for container creation, overwrite behavior, and metadata.
    """

    def __init__(
        self,
        *,
        file_path: str,
        container_name: str,
        blob_name: str,
        azure_conn_id: str = "wasb_default",
        create_container: bool = False,
        overwrite: bool = True,
        content_settings: dict[str, Any] | None = None,
        metadata: dict[str, str] | None = None,
        **kwargs
    ):
        """
        Initialize the local-to-Azure Blob Storage transfer operator.

        Args:
            file_path (str): Path to the local file
            container_name (str): Target container name in Azure Blob Storage
            blob_name (str): Target blob name
            azure_conn_id (str): Airflow connection ID for Azure Blob Storage
            create_container (bool): Whether to create the container if it doesn't exist
            overwrite (bool): Whether to overwrite an existing blob (default: True)
            content_settings (dict[str, Any] | None): Blob content settings
            metadata (dict[str, str] | None): Blob metadata
        """

    def execute(self, context: Context) -> str:
        """
        Execute the file transfer from the local filesystem to Azure Blob Storage.

        Args:
            context (Context): Airflow task context

        Returns:
            str: Blob URL of the uploaded file
        """
```
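For orientation, here is a minimal sketch of what the ADLS upload reduces to at the hook level. It assumes a configured `azure_data_lake_default` connection; the helper function and paths are illustrative, not part of the provider.

```python
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook


def upload_local_file_to_adls(local_path: str, remote_path: str) -> None:
    """Upload a single local file to ADLS, mirroring the operator's defaults."""
    hook = AzureDataLakeHook(azure_data_lake_conn_id="azure_data_lake_default")
    # The nthreads/overwrite/buffersize/blocksize arguments correspond to the
    # operator parameters documented above.
    hook.upload_file(
        local_path=local_path,
        remote_path=remote_path,
        nthreads=64,
        overwrite=True,
        buffersize=4194304,
        blocksize=4194304,
    )
```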
### Database to Azure Transfers

Transfer data from database systems to Azure services with query execution and data transformation capabilities.

```python { .api }
class OracleToAzureDataLakeOperator(BaseOperator):
    """
    Transfers data from Oracle database to Azure Data Lake Storage.

    Executes Oracle queries and uploads results to ADLS with support for
    various data formats, partitioning, and incremental transfers.
    """

    def __init__(
        self,
        *,
        filename: str,
        azure_data_lake_conn_id: str,
        oracle_conn_id: str,
        sql: str,
        sql_params: dict[str, Any] | None = None,
        delimiter: str = "\t",
        encoding: str = "utf-8",
        quotechar: str = '"',
        quoting: int = csv.QUOTE_MINIMAL,
        **kwargs
    ):
        """
        Initialize Oracle to Azure Data Lake transfer operator.

        Args:
            filename (str): Target filename in Azure Data Lake Storage
            azure_data_lake_conn_id (str): Airflow connection ID for ADLS
            oracle_conn_id (str): Airflow connection ID for Oracle database
            sql (str): SQL query to execute on Oracle database
            sql_params (dict[str, Any] | None): Parameters for SQL query
            delimiter (str): Field delimiter for output file (default: tab)
            encoding (str): File encoding (default: "utf-8")
            quotechar (str): Quote character for CSV (default: '"')
            quoting (int): Quoting behavior (default: csv.QUOTE_MINIMAL)
        """

    def execute(self, context: Context) -> str:
        """
        Execute data transfer from Oracle to Azure Data Lake Storage.

        Args:
            context (Context): Airflow task context

        Returns:
            str: Path to uploaded file in ADLS
        """
```
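The docstring above summarizes the flow; the sketch below shows one way that flow can be expressed directly with hooks (query with `OracleHook`, spool the cursor to a delimited temp file, upload with `AzureDataLakeHook`). It is a simplified illustration rather than the operator's actual implementation; connection IDs and the helper name are placeholders.

```python
import csv
import os
from tempfile import TemporaryDirectory

from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
from airflow.providers.oracle.hooks.oracle import OracleHook


def oracle_query_to_adls(sql: str, filename: str, remote_dir: str) -> str:
    """Run an Oracle query and upload the result as a TSV file to ADLS."""
    oracle_hook = OracleHook(oracle_conn_id="oracle_conn")
    adls_hook = AzureDataLakeHook(azure_data_lake_conn_id="adls_conn")
    remote_path = f"{remote_dir.rstrip('/')}/{filename}"

    with oracle_hook.get_conn() as conn, TemporaryDirectory(prefix="oracle_to_adls_") as tmp:
        cursor = conn.cursor()
        cursor.execute(sql)
        local_path = os.path.join(tmp, filename)
        with open(local_path, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f, delimiter="\t", quoting=csv.QUOTE_MINIMAL)
            writer.writerow(col[0] for col in cursor.description)  # header row
            writer.writerows(cursor)  # remaining rows straight from the cursor
        cursor.close()
        adls_hook.upload_file(local_path=local_path, remote_path=remote_path)

    return remote_path
```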
### Cloud-to-Cloud Transfers

Transfer data between different cloud services with comprehensive protocol support and authentication handling.

```python { .api }
class SFTPToWasbOperator(BaseOperator):
    """
    Transfers files from SFTP server to Azure Blob Storage.

    Downloads files from SFTP servers and uploads them to Azure Blob Storage
    with support for directory traversal, file filtering, and batch processing.
    """

    def __init__(
        self,
        *,
        sftp_source_path: str,
        container_name: str,
        blob_name: str,
        sftp_conn_id: str = "sftp_default",
        wasb_conn_id: str = "wasb_default",
        create_container: bool = False,
        overwrite: bool = True,
        move_object: bool = False,
        **kwargs
    ):
        """
        Initialize SFTP to Azure Blob Storage transfer operator.

        Args:
            sftp_source_path (str): Path to source file on SFTP server
            container_name (str): Target container name in Azure Blob Storage
            blob_name (str): Target blob name
            sftp_conn_id (str): Airflow connection ID for SFTP server
            wasb_conn_id (str): Airflow connection ID for Azure Blob Storage
            create_container (bool): Whether to create container if it doesn't exist
            overwrite (bool): Whether to overwrite existing blob (default: True)
            move_object (bool): Whether to delete source file after transfer
        """

    def execute(self, context: Context) -> str:
        """
        Execute file transfer from SFTP to Azure Blob Storage.

        Args:
            context (Context): Airflow task context

        Returns:
            str: Blob URL of transferred file
        """


class S3ToAzureBlobStorageOperator(BaseOperator):
    """
    Transfers objects from AWS S3 to Azure Blob Storage.

    Downloads objects from AWS S3 and uploads them to Azure Blob Storage
    with support for large files, batch processing, and metadata preservation.
    """

    def __init__(
        self,
        *,
        s3_source_key: str,
        container_name: str,
        blob_name: str,
        s3_bucket: str | None = None,
        aws_conn_id: str = "aws_default",
        wasb_conn_id: str = "wasb_default",
        create_container: bool = False,
        overwrite: bool = True,
        s3_verify: bool | str | None = None,
        s3_extra_args: dict[str, Any] | None = None,
        wasb_extra_args: dict[str, Any] | None = None,
        **kwargs
    ):
        """
        Initialize AWS S3 to Azure Blob Storage transfer operator.

        Args:
            s3_source_key (str): Source object key in AWS S3
            container_name (str): Target container name in Azure Blob Storage
            blob_name (str): Target blob name
            s3_bucket (str | None): Source S3 bucket name
            aws_conn_id (str): Airflow connection ID for AWS S3
            wasb_conn_id (str): Airflow connection ID for Azure Blob Storage
            create_container (bool): Whether to create container if it doesn't exist
            overwrite (bool): Whether to overwrite existing blob (default: True)
            s3_verify (bool | str | None): S3 SSL verification configuration
            s3_extra_args (dict[str, Any] | None): Additional S3 arguments
            wasb_extra_args (dict[str, Any] | None): Additional WASB arguments
        """

    def execute(self, context: Context) -> str:
        """
        Execute object transfer from AWS S3 to Azure Blob Storage.

        Args:
            context (Context): Airflow task context

        Returns:
            str: Blob URL of transferred object
        """
```
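As a rough sketch of the S3-to-Blob flow, the snippet below stages the object on local disk with `S3Hook` and then pushes it with `WasbHook`. Bucket, key, container, and connection IDs are placeholders, and this illustrates the pattern rather than the operator's exact implementation.

```python
from tempfile import TemporaryDirectory

from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook


def s3_object_to_blob(s3_key: str, bucket: str, container: str, blob_name: str) -> None:
    """Copy a single S3 object into Azure Blob Storage via a local temp file."""
    s3_hook = S3Hook(aws_conn_id="aws_default")
    wasb_hook = WasbHook(wasb_conn_id="wasb_default")

    with TemporaryDirectory(prefix="s3_to_wasb_") as tmp_dir:
        # download_file returns the path of the temporary file it wrote
        local_path = s3_hook.download_file(key=s3_key, bucket_name=bucket, local_path=tmp_dir)
        wasb_hook.load_file(
            file_path=local_path,
            container_name=container,
            blob_name=blob_name,
            create_container=True,
            overwrite=True,  # forwarded to the underlying blob upload call
        )
```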
## Supporting Exception Classes

Custom exception classes for handling transfer operation errors and edge cases.

```python { .api }
class TooManyFilesToMoveException(Exception):
    """
    Exception for bulk transfer limits.

    Raised when transfer operations exceed configured limits for
    batch processing or concurrent file transfers.
    """
    pass


class InvalidAzureBlobParameters(Exception):
    """
    Exception for invalid blob parameters.

    Raised when blob storage parameters are invalid or
    incompatible with the operation being performed.
    """
    pass


class InvalidKeyComponents(Exception):
    """
    Exception for invalid key components.

    Raised when file path or key components are invalid
    for the target storage system.
    """
    pass
```
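The checks below are illustrative only; they show the kinds of conditions these exceptions are meant to signal (a hypothetical `MAX_FILES_PER_TRANSFER` limit, a single blob name receiving multiple source keys, a missing container name), not the provider's actual validation logic.

```python
from airflow.providers.microsoft.azure.transfers.s3_to_wasb import (
    InvalidAzureBlobParameters,
    InvalidKeyComponents,
    TooManyFilesToMoveException,
)

MAX_FILES_PER_TRANSFER = 500  # hypothetical limit for this example


def preflight_check(s3_keys: list[str], container_name: str, blob_name: str | None) -> None:
    """Raise a transfer-specific exception when a request is clearly invalid."""
    if len(s3_keys) > MAX_FILES_PER_TRANSFER:
        raise TooManyFilesToMoveException(
            f"{len(s3_keys)} files requested; limit is {MAX_FILES_PER_TRANSFER}"
        )
    if not container_name:
        raise InvalidAzureBlobParameters("container_name must be provided")
    if blob_name and len(s3_keys) > 1:
        # A single blob name cannot receive more than one source object.
        raise InvalidKeyComponents(
            f"blob_name={blob_name!r} given, but {len(s3_keys)} source keys matched"
        )
```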
## Usage Examples

### Local to Azure Transfers

```python
from airflow import DAG
from airflow.providers.microsoft.azure.transfers.local_to_wasb import LocalFilesystemToWasbOperator
from airflow.providers.microsoft.azure.transfers.local_to_adls import LocalFilesystemToADLSOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import os

def prepare_local_files():
    """Prepare local files for transfer to Azure."""
    # Create sample data files
    data_dir = '/tmp/data_export'
    os.makedirs(data_dir, exist_ok=True)

    # Generate sample CSV file
    import csv
    csv_file = os.path.join(data_dir, 'sales_data.csv')
    with open(csv_file, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['date', 'product', 'sales', 'region'])
        for i in range(1000):
            writer.writerow([
                f'2024-01-{(i % 31) + 1:02d}',
                f'Product_{i % 10}',
                f'{1000 + (i * 10)}',
                f'Region_{i % 5}'
            ])

    # Generate sample JSON file
    import json
    json_file = os.path.join(data_dir, 'customer_data.json')
    customer_data = {
        'customers': [
            {'id': i, 'name': f'Customer_{i}', 'email': f'customer{i}@example.com'}
            for i in range(100)
        ]
    }
    with open(json_file, 'w') as f:
        json.dump(customer_data, f, indent=2)

    return {
        'csv_file': csv_file,
        'json_file': json_file,
        'data_dir': data_dir
    }

def verify_transfers(**context):
    """Verify that files were transferred successfully."""
    from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

    wasb_hook = WasbHook(wasb_conn_id='azure_blob_conn')

    # Check if files exist in blob storage
    files_to_check = [
        ('data-exports', 'sales/sales_data.csv'),
        ('data-exports', 'customers/customer_data.json')
    ]

    results = {}
    for container, blob_name in files_to_check:
        exists = wasb_hook.check_for_blob(container, blob_name)
        results[blob_name] = exists
        print(f"File {blob_name}: {'✓ Found' if exists else '✗ Not found'}")

    if all(results.values()):
        print("All files transferred successfully!")
    else:
        raise ValueError("Some files were not transferred successfully")

    return results

dag = DAG(
    'local_to_azure_transfers',
    default_args={
        'owner': 'data-transfer-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=5)
    },
    description='Transfer local files to Azure services',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Prepare local files
prep_files = PythonOperator(
    task_id='prepare_files',
    python_callable=prepare_local_files,
    dag=dag
)

# Transfer CSV to Azure Blob Storage
transfer_csv = LocalFilesystemToWasbOperator(
    task_id='transfer_csv_to_blob',
    file_path='/tmp/data_export/sales_data.csv',
    container_name='data-exports',
    blob_name='sales/sales_data.csv',
    azure_conn_id='azure_blob_conn',
    create_container=True,
    overwrite=True,
    metadata={
        'source': 'local_filesystem',
        'export_date': '{{ ds }}',
        'file_type': 'csv'
    },
    dag=dag
)

# Transfer JSON to Azure Data Lake Storage
transfer_json = LocalFilesystemToADLSOperator(
    task_id='transfer_json_to_adls',
    local_path='/tmp/data_export/customer_data.json',
    remote_path='/exports/customers/customer_data_{{ ds_nodash }}.json',
    azure_data_lake_conn_id='adls_conn',
    overwrite=True,
    dag=dag
)

# Verify transfers
verify_files = PythonOperator(
    task_id='verify_transfers',
    python_callable=verify_transfers,
    dag=dag
)

prep_files >> [transfer_csv, transfer_json] >> verify_files
```
### Database to Azure Transfers

```python
from airflow import DAG
from airflow.providers.microsoft.azure.transfers.oracle_to_azure_data_lake import OracleToAzureDataLakeOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def prepare_oracle_queries():
    """Prepare Oracle queries for data extraction."""
    queries = {
        'sales_summary': """
            SELECT
                TO_CHAR(order_date, 'YYYY-MM-DD') as order_date,
                product_category,
                COUNT(*) as order_count,
                SUM(order_total) as total_sales,
                AVG(order_total) as avg_order_value
            FROM sales_orders
            WHERE order_date >= TO_DATE('{{ ds }}', 'YYYY-MM-DD')
              AND order_date < TO_DATE('{{ next_ds }}', 'YYYY-MM-DD')
            GROUP BY TO_CHAR(order_date, 'YYYY-MM-DD'), product_category
            ORDER BY order_date, product_category
        """,

        'customer_activity': """
            SELECT
                c.customer_id,
                c.customer_name,
                c.email,
                COUNT(o.order_id) as order_count,
                SUM(o.order_total) as total_spent,
                MAX(o.order_date) as last_order_date
            FROM customers c
            LEFT JOIN sales_orders o ON c.customer_id = o.customer_id
            WHERE c.created_date <= TO_DATE('{{ ds }}', 'YYYY-MM-DD')
            GROUP BY c.customer_id, c.customer_name, c.email
            HAVING COUNT(o.order_id) > 0
            ORDER BY total_spent DESC
        """,

        'inventory_status': """
            SELECT
                p.product_id,
                p.product_name,
                p.category,
                i.current_stock,
                i.reserved_stock,
                i.available_stock,
                TO_CHAR(i.last_updated, 'YYYY-MM-DD HH24:MI:SS') as last_updated
            FROM products p
            JOIN inventory i ON p.product_id = i.product_id
            WHERE i.last_updated >= TO_DATE('{{ ds }}', 'YYYY-MM-DD')
            ORDER BY p.category, p.product_name
        """
    }

    return queries

def validate_extracted_data(**context):
    """Validate extracted data in Azure Data Lake Storage."""
    from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook

    adls_hook = AzureDataLakeHook(azure_data_lake_conn_id='adls_conn')

    # Check extracted files (plain strings, not f-strings, so the
    # '{{ ds }}' placeholder survives and can be substituted below)
    extracted_files = [
        '/oracle_exports/{{ ds }}/sales_summary.tsv',
        '/oracle_exports/{{ ds }}/customer_activity.tsv',
        '/oracle_exports/{{ ds }}/inventory_status.tsv'
    ]

    validation_results = {}

    for file_path in extracted_files:
        rendered_path = file_path.replace('{{ ds }}', context['ds'])

        if adls_hook.check_for_file(rendered_path):
            # Get file size and row count
            file_content = adls_hook.get_conn().cat(rendered_path)
            row_count = len(file_content.decode('utf-8').split('\n')) - 1  # Subtract header
            file_size = len(file_content)

            validation_results[rendered_path] = {
                'exists': True,
                'size_bytes': file_size,
                'row_count': row_count
            }
            print(f"✓ {rendered_path}: {row_count} rows, {file_size} bytes")

        else:
            validation_results[rendered_path] = {
                'exists': False,
                'size_bytes': 0,
                'row_count': 0
            }
            print(f"✗ {rendered_path}: File not found")

    # Validate minimum data requirements
    min_requirements = {
        'sales_summary.tsv': 1,  # At least 1 row
        'customer_activity.tsv': 10,  # At least 10 customers
        'inventory_status.tsv': 50  # At least 50 products
    }

    validation_passed = True
    for file_path, result in validation_results.items():
        file_name = file_path.split('/')[-1]
        min_rows = min_requirements.get(file_name, 0)

        if not result['exists'] or result['row_count'] < min_rows:
            validation_passed = False
            print(f"Validation failed for {file_name}: Expected >= {min_rows} rows, got {result['row_count']}")

    if not validation_passed:
        raise ValueError("Data validation failed")

    return validation_results

dag = DAG(
    'oracle_to_azure_transfer',
    default_args={
        'owner': 'data-engineering-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=10)
    },
    description='Extract data from Oracle to Azure Data Lake Storage',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Prepare queries
prep_queries = PythonOperator(
    task_id='prepare_queries',
    python_callable=prepare_oracle_queries,
    dag=dag
)

# Extract sales summary data
extract_sales = OracleToAzureDataLakeOperator(
    task_id='extract_sales_summary',
    filename='/oracle_exports/{{ ds }}/sales_summary.tsv',
    azure_data_lake_conn_id='adls_conn',
    oracle_conn_id='oracle_conn',
    sql="""
        SELECT
            TO_CHAR(order_date, 'YYYY-MM-DD') as order_date,
            product_category,
            COUNT(*) as order_count,
            SUM(order_total) as total_sales,
            AVG(order_total) as avg_order_value
        FROM sales_orders
        WHERE order_date >= TO_DATE('{{ ds }}', 'YYYY-MM-DD')
          AND order_date < TO_DATE('{{ next_ds }}', 'YYYY-MM-DD')
        GROUP BY TO_CHAR(order_date, 'YYYY-MM-DD'), product_category
        ORDER BY order_date, product_category
    """,
    delimiter='\t',
    encoding='utf-8',
    dag=dag
)

# Extract customer activity data
extract_customers = OracleToAzureDataLakeOperator(
    task_id='extract_customer_activity',
    filename='/oracle_exports/{{ ds }}/customer_activity.tsv',
    azure_data_lake_conn_id='adls_conn',
    oracle_conn_id='oracle_conn',
    sql="""
        SELECT
            c.customer_id,
            c.customer_name,
            c.email,
            COUNT(o.order_id) as order_count,
            SUM(o.order_total) as total_spent,
            MAX(o.order_date) as last_order_date
        FROM customers c
        LEFT JOIN sales_orders o ON c.customer_id = o.customer_id
        WHERE c.created_date <= TO_DATE('{{ ds }}', 'YYYY-MM-DD')
        GROUP BY c.customer_id, c.customer_name, c.email
        HAVING COUNT(o.order_id) > 0
        ORDER BY total_spent DESC
    """,
    delimiter='\t',
    encoding='utf-8',
    dag=dag
)

# Extract inventory data
extract_inventory = OracleToAzureDataLakeOperator(
    task_id='extract_inventory_status',
    filename='/oracle_exports/{{ ds }}/inventory_status.tsv',
    azure_data_lake_conn_id='adls_conn',
    oracle_conn_id='oracle_conn',
    sql="""
        SELECT
            p.product_id,
            p.product_name,
            p.category,
            i.current_stock,
            i.reserved_stock,
            i.available_stock,
            TO_CHAR(i.last_updated, 'YYYY-MM-DD HH24:MI:SS') as last_updated
        FROM products p
        JOIN inventory i ON p.product_id = i.product_id
        WHERE i.last_updated >= TO_DATE('{{ ds }}', 'YYYY-MM-DD')
        ORDER BY p.category, p.product_name
    """,
    delimiter='\t',
    encoding='utf-8',
    dag=dag
)

# Validate extracted data
validate_data = PythonOperator(
    task_id='validate_data',
    python_callable=validate_extracted_data,
    dag=dag
)

prep_queries >> [extract_sales, extract_customers, extract_inventory] >> validate_data
```
### Cloud-to-Cloud Transfers

```python
from airflow import DAG
from airflow.providers.microsoft.azure.transfers.s3_to_wasb import S3ToAzureBlobStorageOperator
from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SFTPToWasbOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def discover_source_files(**context):
    """Discover files available for transfer from various sources."""
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
    from airflow.providers.sftp.hooks.sftp import SFTPHook

    # Jinja templates are not rendered inside python callables,
    # so build the date-based paths from the task context instead.
    ds = context['ds']

    # Discover S3 files
    s3_hook = S3Hook(aws_conn_id='aws_conn')
    s3_files = s3_hook.list_keys(
        bucket_name='source-data-bucket',
        prefix=f'daily-exports/{ds}/',
        delimiter=''
    )

    # Discover SFTP files
    sftp_hook = SFTPHook(ssh_conn_id='sftp_conn')
    sftp_files = sftp_hook.list_directory(f'/exports/{ds}/')

    return {
        's3_files': s3_files or [],
        'sftp_files': sftp_files or []
    }

def monitor_transfer_progress(**context):
    """Monitor transfer progress and generate summary."""
    from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

    wasb_hook = WasbHook(wasb_conn_id='azure_blob_conn')

    # Check transferred files
    containers_to_check = ['s3-transfers', 'sftp-transfers']
    transfer_summary = {}

    for container in containers_to_check:
        try:
            container_client = wasb_hook.get_conn().get_container_client(container)
            blobs = container_client.list_blobs(
                name_starts_with=f"{context['ds']}/"
            )

            blob_list = []
            total_size = 0

            for blob in blobs:
                blob_info = {
                    'name': blob.name,
                    'size': blob.size,
                    'last_modified': blob.last_modified.isoformat() if blob.last_modified else None
                }
                blob_list.append(blob_info)
                total_size += blob.size or 0

            transfer_summary[container] = {
                'file_count': len(blob_list),
                'total_size_bytes': total_size,
                'files': blob_list
            }

            print(f"Container {container}: {len(blob_list)} files, {total_size:,} bytes")

        except Exception as e:
            print(f"Error checking container {container}: {e}")
            transfer_summary[container] = {
                'error': str(e),
                'file_count': 0,
                'total_size_bytes': 0
            }

    return transfer_summary

def cleanup_source_files(**context):
    """Clean up source files after successful transfer (optional)."""
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    # Only clean up if all transfers were successful
    transfer_summary = context['task_instance'].xcom_pull(task_ids='monitor_progress')

    total_files_transferred = sum(
        container_info.get('file_count', 0)
        for container_info in transfer_summary.values()
    )

    if total_files_transferred > 0:
        print(f"Successfully transferred {total_files_transferred} files")

        # Optional: Delete source files from S3 after successful transfer
        # Uncomment the following lines if cleanup is desired
        """
        s3_hook = S3Hook(aws_conn_id='aws_conn')
        source_files = context['task_instance'].xcom_pull(task_ids='discover_files')['s3_files']

        for s3_key in source_files:
            try:
                s3_hook.delete_objects(
                    bucket='source-data-bucket',
                    keys=[s3_key]
                )
                print(f"Deleted source file: {s3_key}")
            except Exception as e:
                print(f"Failed to delete {s3_key}: {e}")
        """

    return total_files_transferred

dag = DAG(
    'cloud_to_azure_transfers',
    default_args={
        'owner': 'integration-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=5)
    },
    description='Transfer files from various cloud sources to Azure',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Discover source files
discover_files = PythonOperator(
    task_id='discover_files',
    python_callable=discover_source_files,
    dag=dag
)

# Transfer from AWS S3 to Azure Blob Storage
transfer_s3_files = S3ToAzureBlobStorageOperator(
    task_id='transfer_s3_data',
    s3_source_key='daily-exports/{{ ds }}/sales_data.csv',
    s3_bucket='source-data-bucket',
    container_name='s3-transfers',
    blob_name='{{ ds }}/sales_data.csv',
    aws_conn_id='aws_conn',
    wasb_conn_id='azure_blob_conn',
    create_container=True,
    overwrite=True,
    s3_extra_args={
        'ServerSideEncryption': 'AES256'
    },
    wasb_extra_args={
        'content_settings': {
            'content_type': 'text/csv',
            'cache_control': 'no-cache'
        },
        'metadata': {
            'source': 's3',
            'transfer_date': '{{ ds }}',
            'original_bucket': 'source-data-bucket'
        }
    },
    dag=dag
)

# Transfer from SFTP to Azure Blob Storage
transfer_sftp_files = SFTPToWasbOperator(
    task_id='transfer_sftp_data',
    sftp_source_path='/exports/{{ ds }}/inventory_update.json',
    container_name='sftp-transfers',
    blob_name='{{ ds }}/inventory_update.json',
    sftp_conn_id='sftp_conn',
    wasb_conn_id='azure_blob_conn',
    create_container=True,
    overwrite=True,
    move_object=False,  # Keep the original file on the SFTP server
    dag=dag
)

# Transfer S3 application logs
transfer_s3_logs = S3ToAzureBlobStorageOperator(
    task_id='transfer_s3_logs',
    s3_source_key='logs/{{ ds }}/application.log',
    s3_bucket='source-data-bucket',
    container_name='s3-transfers',
    blob_name='{{ ds }}/logs/application.log',
    aws_conn_id='aws_conn',
    wasb_conn_id='azure_blob_conn',
    create_container=True,
    overwrite=True,
    dag=dag
)

# Monitor transfer progress
monitor_progress = PythonOperator(
    task_id='monitor_progress',
    python_callable=monitor_transfer_progress,
    dag=dag
)

# Optional cleanup
cleanup_sources = PythonOperator(
    task_id='cleanup_sources',
    python_callable=cleanup_source_files,
    dag=dag
)

# Set up dependencies
discover_files >> [transfer_s3_files, transfer_sftp_files, transfer_s3_logs]
[transfer_s3_files, transfer_sftp_files, transfer_s3_logs] >> monitor_progress >> cleanup_sources
```
### Batch Transfer Operations

```python
from airflow import DAG
from airflow.providers.microsoft.azure.transfers.s3_to_wasb import S3ToAzureBlobStorageOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def create_dynamic_transfer_tasks(**context):
    """Create transfer tasks dynamically based on available files."""
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    s3_hook = S3Hook(aws_conn_id='aws_conn')

    # List all files in the source bucket for the current date
    source_prefix = f"batch-export/{context['ds']}/"
    files = s3_hook.list_keys(
        bucket_name='batch-data-source',
        prefix=source_prefix,
        delimiter=''
    )

    if not files:
        print("No files found for transfer")
        return []

    # Filter files by type and size
    transfer_jobs = []

    for file_key in files:
        file_info = s3_hook.get_key(file_key, bucket_name='batch-data-source')
        file_size = file_info.content_length if file_info else 0
        file_name = file_key.split('/')[-1]

        # Skip very large files (>1GB) or very small files (<1KB)
        if file_size > 1024**3 or file_size < 1024:
            print(f"Skipping {file_name}: size {file_size} bytes")
            continue

        # Determine target container based on file type
        file_extension = file_name.split('.')[-1].lower()
        container_mapping = {
            'csv': 'structured-data',
            'json': 'json-data',
            'xml': 'xml-data',
            'txt': 'text-data',
            'parquet': 'columnar-data'
        }

        target_container = container_mapping.get(file_extension, 'unclassified-data')

        transfer_job = {
            'source_key': file_key,
            'target_container': target_container,
            'target_blob': f"{context['ds']}/{file_name}",
            'file_size': file_size,
            'file_type': file_extension
        }

        transfer_jobs.append(transfer_job)

    print(f"Created {len(transfer_jobs)} transfer jobs")
    return transfer_jobs

def execute_batch_transfers(**context):
    """Execute batch transfers with error handling and retries."""
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
    from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

    transfer_jobs = context['task_instance'].xcom_pull(task_ids='create_transfer_jobs')

    if not transfer_jobs:
        print("No transfer jobs to execute")
        return {'completed': 0, 'failed': 0, 'skipped': 0}

    s3_hook = S3Hook(aws_conn_id='aws_conn')
    wasb_hook = WasbHook(wasb_conn_id='azure_blob_conn')

    results = {
        'completed': 0,
        'failed': 0,
        'skipped': 0,
        'transfer_details': []
    }

    for job in transfer_jobs:
        try:
            source_key = job['source_key']
            target_container = job['target_container']
            target_blob = job['target_blob']

            print(f"Transferring {source_key} -> {target_container}/{target_blob}")

            # Check if target already exists
            if wasb_hook.check_for_blob(target_container, target_blob):
                print(f"Target blob already exists, skipping: {target_blob}")
                results['skipped'] += 1
                continue

            # Create container if it doesn't exist
            try:
                wasb_hook.create_container(target_container)
            except Exception:
                pass  # Container might already exist

            # Download from S3
            s3_object = s3_hook.get_key(source_key, bucket_name='batch-data-source')
            file_content = s3_object.get()['Body'].read()

            # Upload to Azure Blob Storage
            wasb_hook.upload(
                container_name=target_container,
                blob_name=target_blob,
                data=file_content,
                overwrite=True
            )

            # Set metadata on the uploaded blob
            wasb_hook.get_conn().get_blob_client(
                container=target_container,
                blob=target_blob
            ).set_blob_metadata({
                'source': 's3',
                'source_bucket': 'batch-data-source',
                'source_key': source_key,
                'transfer_date': context['ds'],
                'file_size': str(job['file_size']),
                'file_type': job['file_type']
            })

            results['completed'] += 1
            results['transfer_details'].append({
                'source': source_key,
                'target': f"{target_container}/{target_blob}",
                'status': 'completed',
                'size_bytes': job['file_size']
            })

            print(f"Successfully transferred: {source_key}")

        except Exception as e:
            print(f"Failed to transfer {job['source_key']}: {e}")
            results['failed'] += 1
            results['transfer_details'].append({
                'source': job['source_key'],
                'target': f"{job['target_container']}/{job['target_blob']}",
                'status': 'failed',
                'error': str(e)
            })

    print(f"Batch transfer completed: {results['completed']} successful, "
          f"{results['failed']} failed, {results['skipped']} skipped")

    return results

dag = DAG(
    'batch_transfer_workflow',
    default_args={
        'owner': 'batch-processing-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    description='Batch transfer workflow for multiple files',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Create dynamic transfer jobs
create_jobs = PythonOperator(
    task_id='create_transfer_jobs',
    python_callable=create_dynamic_transfer_tasks,
    dag=dag
)

# Execute batch transfers
execute_transfers = PythonOperator(
    task_id='execute_batch_transfers',
    python_callable=execute_batch_transfers,
    dag=dag
)

create_jobs >> execute_transfers
```
## Error Handling and Best Practices

### Transfer Operation Error Handling

```python
from airflow.providers.microsoft.azure.transfers.s3_to_wasb import (
    S3ToAzureBlobStorageOperator,
    TooManyFilesToMoveException,
    InvalidAzureBlobParameters,
    InvalidKeyComponents
)
from airflow.exceptions import AirflowException

def robust_transfer_with_error_handling():
    """Demonstrate comprehensive error handling for transfer operations."""

    try:
        # Example of handling specific transfer exceptions
        operator = S3ToAzureBlobStorageOperator(
            task_id='safe_transfer',
            s3_source_key='large-dataset/data.csv',
            container_name='target-container',
            blob_name='processed-data.csv',
            aws_conn_id='aws_conn',
            wasb_conn_id='azure_conn'
        )

        # This would be called by Airflow's execution engine
        # result = operator.execute(context)

    except TooManyFilesToMoveException as e:
        print(f"Too many files in transfer operation: {e}")
        # Implement chunking or batch processing

    except InvalidAzureBlobParameters as e:
        print(f"Invalid blob parameters: {e}")
        # Validate and correct blob parameters

    except InvalidKeyComponents as e:
        print(f"Invalid key components: {e}")
        # Validate and correct file path components

    except Exception as e:
        print(f"Unexpected transfer error: {e}")
        raise AirflowException(f"Transfer failed: {e}")

def implement_transfer_validation():
    """Implement validation patterns for transfer operations."""

    def validate_source_file(source_path: str, min_size: int = 1024) -> bool:
        """Validate source file before transfer."""
        import os

        if not os.path.exists(source_path):
            raise FileNotFoundError(f"Source file not found: {source_path}")

        file_size = os.path.getsize(source_path)
        if file_size < min_size:
            raise ValueError(f"File too small: {file_size} bytes < {min_size} bytes")

        return True

    def validate_target_parameters(container_name: str, blob_name: str) -> bool:
        """Validate target parameters."""
        if not container_name or len(container_name) < 3:
            raise InvalidAzureBlobParameters("Container name must be at least 3 characters")

        if not blob_name or blob_name.startswith('/'):
            raise InvalidKeyComponents("Blob name cannot start with '/'")

        return True

    def validate_transfer_result(source_size: int, target_size: int, tolerance: float = 0.01) -> bool:
        """Validate transfer result by comparing sizes."""
        if abs(source_size - target_size) > (source_size * tolerance):
            raise ValueError(f"Size mismatch: source={source_size}, target={target_size}")

        return True

    return {
        'validate_source': validate_source_file,
        'validate_target': validate_target_parameters,
        'validate_result': validate_transfer_result
    }

def implement_transfer_monitoring():
    """Implement monitoring for transfer operations."""

    class TransferMonitor:
        def __init__(self):
            self.transfer_stats = {
                'start_time': None,
                'end_time': None,
                'bytes_transferred': 0,
                'transfer_rate_mbps': 0,
                'status': 'pending'
            }

        def start_transfer(self):
            """Mark transfer start time."""
            import time
            self.transfer_stats['start_time'] = time.time()
            self.transfer_stats['status'] = 'in_progress'

        def update_progress(self, bytes_transferred: int):
            """Update transfer progress."""
            self.transfer_stats['bytes_transferred'] = bytes_transferred

            if self.transfer_stats['start_time']:
                import time
                elapsed_time = time.time() - self.transfer_stats['start_time']
                if elapsed_time > 0:
                    rate_bps = bytes_transferred / elapsed_time
                    self.transfer_stats['transfer_rate_mbps'] = rate_bps / (1024 * 1024)

        def complete_transfer(self):
            """Mark transfer completion."""
            import time
            self.transfer_stats['end_time'] = time.time()
            self.transfer_stats['status'] = 'completed'

            if self.transfer_stats['start_time']:
                total_time = self.transfer_stats['end_time'] - self.transfer_stats['start_time']
                print(f"Transfer completed in {total_time:.2f} seconds")
                print(f"Average rate: {self.transfer_stats['transfer_rate_mbps']:.2f} MB/s")

        def get_stats(self):
            """Get transfer statistics."""
            return self.transfer_stats.copy()

    return TransferMonitor
```
## Performance Optimization

### Optimizing Transfer Operations

```python
def optimize_large_file_transfers():
    """Optimize transfers for large files."""

    # Configuration for large file transfers
    large_file_config = {
        'chunk_size': 64 * 1024 * 1024,  # 64MB chunks
        'max_connections': 10,  # Parallel connections
        'timeout': 300,  # 5 minute timeout per chunk
        'retry_attempts': 3,  # Retry failed chunks
        'use_compression': True  # Compress during transfer
    }

    # Configuration for small file batch transfers
    batch_config = {
        'batch_size': 100,  # Files per batch
        'parallel_batches': 5,  # Concurrent batches
        'batch_timeout': 600,  # 10 minute timeout per batch
        'skip_existing': True  # Skip existing files
    }

    return {
        'large_files': large_file_config,
        'batch_processing': batch_config
    }

def implement_transfer_caching():
    """Implement caching for frequently transferred files."""

    class TransferCache:
        def __init__(self):
            self.cache = {}
            self.cache_ttl = 3600  # 1 hour TTL

        def get_cached_transfer(self, source_path: str) -> dict | None:
            """Get cached transfer information."""
            import time

            if source_path in self.cache:
                cache_entry = self.cache[source_path]
                if time.time() - cache_entry['timestamp'] < self.cache_ttl:
                    return cache_entry['data']
                else:
                    del self.cache[source_path]

            return None

        def cache_transfer_result(self, source_path: str, result: dict):
            """Cache transfer result."""
            import time

            self.cache[source_path] = {
                'timestamp': time.time(),
                'data': result
            }

        def clear_cache(self):
            """Clear transfer cache."""
            self.cache.clear()

    return TransferCache

def implement_parallel_transfers():
    """Implement parallel transfer processing."""

    import concurrent.futures
    import threading

    class ParallelTransferManager:
        def __init__(self, max_workers: int = 5):
            self.max_workers = max_workers
            self.transfer_results = {}
            self.lock = threading.Lock()

        def transfer_file(self, transfer_config: dict) -> dict:
            """Transfer a single file."""
            try:
                # Simulate file transfer logic
                source = transfer_config['source']
                target = transfer_config['target']

                # Actual transfer implementation would go here
                result = {
                    'source': source,
                    'target': target,
                    'status': 'success',
                    'size_bytes': transfer_config.get('size', 0)
                }

                with self.lock:
                    self.transfer_results[source] = result

                return result

            except Exception as e:
                error_result = {
                    'source': transfer_config['source'],
                    'status': 'failed',
                    'error': str(e)
                }

                with self.lock:
                    self.transfer_results[transfer_config['source']] = error_result

                return error_result

        def execute_parallel_transfers(self, transfer_configs: list[dict]) -> dict:
            """Execute multiple transfers in parallel."""

            with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                # Submit all transfer tasks
                future_to_config = {
                    executor.submit(self.transfer_file, config): config
                    for config in transfer_configs
                }

                # Wait for completion
                for future in concurrent.futures.as_completed(future_to_config):
                    config = future_to_config[future]
                    try:
                        result = future.result()
                        print(f"Completed: {config['source']} -> {result['status']}")
                    except Exception as e:
                        print(f"Failed: {config['source']} -> {e}")

            # Return summary
            successful = sum(1 for r in self.transfer_results.values() if r['status'] == 'success')
            failed = len(self.transfer_results) - successful

            return {
                'total': len(transfer_configs),
                'successful': successful,
                'failed': failed,
                'results': self.transfer_results
            }

    return ParallelTransferManager
```
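As one concrete way to apply settings like `large_file_config`, the sketch below tunes block size and upload concurrency through the `azure-storage-blob` SDK directly; the connection string, container, and blob names are placeholders.

```python
from azure.storage.blob import BlobServiceClient


def upload_large_file(conn_str: str, container: str, blob_name: str, file_path: str) -> None:
    """Upload a large local file using tuned block size and parallelism."""
    service = BlobServiceClient.from_connection_string(
        conn_str,
        max_block_size=64 * 1024 * 1024,       # 64MB blocks, as in large_file_config
        max_single_put_size=64 * 1024 * 1024,  # fall back to block upload above 64MB
    )
    blob_client = service.get_blob_client(container=container, blob=blob_name)
    with open(file_path, "rb") as data:
        blob_client.upload_blob(
            data,
            overwrite=True,
            max_concurrency=10,  # parallel block uploads (cf. max_connections)
            timeout=300,
        )
```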
This documentation covers the data transfer capabilities of the Apache Airflow Microsoft Azure Provider: local-to-Azure transfers, database-to-Azure transfers, cloud-to-cloud transfers, error handling patterns, and performance optimization techniques.