# Data Transfer Operations

Specialized operators for transferring data between Azure services and external systems including local filesystem, SFTP servers, Oracle databases, and AWS S3. Provides comprehensive data movement capabilities with error handling and performance optimization.

## Capabilities

### Local to Azure Transfers

Transfer data from local filesystem to Azure services with comprehensive file handling and upload capabilities.

```python { .api }
class LocalFilesystemToADLSOperator(BaseOperator):
    """
    Transfers files from local filesystem to Azure Data Lake Storage.

    Supports uploading local files to both ADLS Gen1 and Gen2 with
    configurable options for overwrite, directory creation, and metadata.
    """

    def __init__(
        self,
        *,
        local_path: str,
        remote_path: str,
        azure_data_lake_conn_id: str = "azure_data_lake_default",
        overwrite: bool = True,
        nthreads: int = 64,
        buffersize: int = 4194304,
        blocksize: int = 4194304,
        **kwargs
    ):
        """
        Initialize local to ADLS transfer operator.

        Args:
            local_path (str): Path to local file or directory
            remote_path (str): Target path in Azure Data Lake Storage
            azure_data_lake_conn_id (str): Airflow connection ID for ADLS
            overwrite (bool): Whether to overwrite existing files (default: True)
            nthreads (int): Number of threads for upload (default: 64)
            buffersize (int): Buffer size for upload (default: 4194304)
            blocksize (int): Block size for upload (default: 4194304)
        """

    def execute(self, context: Context) -> dict[str, Any]:
        """
        Execute file transfer from local filesystem to ADLS.

        Args:
            context (Context): Airflow task context

        Returns:
            dict[str, Any]: Transfer results including file count and sizes
        """

class LocalFilesystemToWasbOperator(BaseOperator):
    """
    Transfers files from local filesystem to Azure Blob Storage.

    Supports uploading local files to Azure Blob Storage with configurable
    options for container creation, overwrite behavior, and metadata.
    """

    def __init__(
        self,
        *,
        file_path: str,
        container_name: str,
        blob_name: str,
        azure_conn_id: str = "wasb_default",
        create_container: bool = False,
        overwrite: bool = True,
        content_settings: dict[str, Any] | None = None,
        metadata: dict[str, str] | None = None,
        **kwargs
    ):
        """
        Initialize local to Azure Blob Storage transfer operator.

        Args:
            file_path (str): Path to local file
            container_name (str): Target container name in Azure Blob Storage
            blob_name (str): Target blob name
            azure_conn_id (str): Airflow connection ID for Azure Blob Storage
            create_container (bool): Whether to create container if it doesn't exist
            overwrite (bool): Whether to overwrite existing blob (default: True)
            content_settings (dict[str, Any] | None): Blob content settings
            metadata (dict[str, str] | None): Blob metadata
        """

    def execute(self, context: Context) -> str:
        """
        Execute file transfer from local filesystem to Azure Blob Storage.

        Args:
            context (Context): Airflow task context

        Returns:
            str: Blob URL of uploaded file
        """
```
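
The usage examples later in this document show these operators inside full DAGs; as a minimal sketch, the snippet below uploads a single file to ADLS and tunes the `nthreads`, `buffersize`, and `blocksize` parameters documented above. The DAG name, connection ID, paths, and tuning values are placeholders, not recommendations.

```python
# A minimal sketch (paths, connection ID, and tuning values are placeholders).
from datetime import datetime

from airflow import DAG
from airflow.providers.microsoft.azure.transfers.local_to_adls import LocalFilesystemToADLSOperator

with DAG(
    "adls_upload_example",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    upload_report = LocalFilesystemToADLSOperator(
        task_id="upload_report",
        local_path="/tmp/reports/daily_report.csv",       # placeholder local file
        remote_path="/landing/reports/daily_report.csv",  # placeholder ADLS path
        azure_data_lake_conn_id="azure_data_lake_default",
        overwrite=True,
        nthreads=32,                 # fewer upload threads than the default 64
        buffersize=8 * 1024 * 1024,  # 8 MiB upload buffer
        blocksize=8 * 1024 * 1024,   # 8 MiB blocks
    )
```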

### Database to Azure Transfers

Transfer data from database systems to Azure services with query execution and data transformation capabilities.

```python { .api }
class OracleToAzureDataLakeOperator(BaseOperator):
    """
    Transfers data from Oracle database to Azure Data Lake Storage.

    Executes Oracle queries and uploads results to ADLS with support for
    various data formats, partitioning, and incremental transfers.
    """

    def __init__(
        self,
        *,
        filename: str,
        azure_data_lake_conn_id: str,
        oracle_conn_id: str,
        sql: str,
        sql_params: dict[str, Any] | None = None,
        delimiter: str = "\t",
        encoding: str = "utf-8",
        quotechar: str = '"',
        quoting: int = csv.QUOTE_MINIMAL,
        **kwargs
    ):
        """
        Initialize Oracle to Azure Data Lake transfer operator.

        Args:
            filename (str): Target filename in Azure Data Lake Storage
            azure_data_lake_conn_id (str): Airflow connection ID for ADLS
            oracle_conn_id (str): Airflow connection ID for Oracle database
            sql (str): SQL query to execute on Oracle database
            sql_params (dict[str, Any] | None): Parameters for SQL query
            delimiter (str): Field delimiter for output file (default: tab)
            encoding (str): File encoding (default: "utf-8")
            quotechar (str): Quote character for CSV (default: '"')
            quoting (int): Quoting behavior (default: csv.QUOTE_MINIMAL)
        """

    def execute(self, context: Context) -> str:
        """
        Execute data transfer from Oracle to Azure Data Lake Storage.

        Args:
            context (Context): Airflow task context

        Returns:
            str: Path to uploaded file in ADLS
        """
```
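
The `sql_params` argument accepts bind variables, which avoids interpolating values directly into the SQL text. A brief sketch follows; the DAG name, table, columns, connection IDs, and target filename are placeholders.

```python
# A sketch of parameterized extraction with sql_params; all names are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.providers.microsoft.azure.transfers.oracle_to_azure_data_lake import OracleToAzureDataLakeOperator

with DAG(
    "oracle_bind_params_example",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    extract_recent_orders = OracleToAzureDataLakeOperator(
        task_id="extract_recent_orders",
        filename="/oracle_exports/recent_orders.csv",
        azure_data_lake_conn_id="azure_data_lake_default",
        oracle_conn_id="oracle_default",
        sql="""
            SELECT order_id, customer_id, order_total
            FROM sales_orders
            WHERE order_date >= :start_date
        """,
        sql_params={"start_date": "2024-01-01"},  # passed as an Oracle bind variable
        delimiter=",",  # write comma-separated output instead of the default tab
    )
```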

### Cloud-to-Cloud Transfers

Transfer data between different cloud services with comprehensive protocol support and authentication handling.

```python { .api }
class SFTPToWasbOperator(BaseOperator):
    """
    Transfers files from SFTP server to Azure Blob Storage.

    Downloads files from SFTP servers and uploads them to Azure Blob Storage
    with support for directory traversal, file filtering, and batch processing.
    """

    def __init__(
        self,
        *,
        sftp_source_path: str,
        container_name: str,
        blob_name: str,
        sftp_conn_id: str = "sftp_default",
        wasb_conn_id: str = "wasb_default",
        create_container: bool = False,
        overwrite: bool = True,
        move_object: bool = False,
        **kwargs
    ):
        """
        Initialize SFTP to Azure Blob Storage transfer operator.

        Args:
            sftp_source_path (str): Path to source file on SFTP server
            container_name (str): Target container name in Azure Blob Storage
            blob_name (str): Target blob name
            sftp_conn_id (str): Airflow connection ID for SFTP server
            wasb_conn_id (str): Airflow connection ID for Azure Blob Storage
            create_container (bool): Whether to create container if it doesn't exist
            overwrite (bool): Whether to overwrite existing blob (default: True)
            move_object (bool): Whether to delete source file after transfer
        """

    def execute(self, context: Context) -> str:
        """
        Execute file transfer from SFTP to Azure Blob Storage.

        Args:
            context (Context): Airflow task context

        Returns:
            str: Blob URL of transferred file
        """

class S3ToAzureBlobStorageOperator(BaseOperator):
    """
    Transfers objects from AWS S3 to Azure Blob Storage.

    Downloads objects from AWS S3 and uploads them to Azure Blob Storage
    with support for large files, batch processing, and metadata preservation.
    """

    def __init__(
        self,
        *,
        s3_source_key: str,
        container_name: str,
        blob_name: str,
        s3_bucket: str | None = None,
        aws_conn_id: str = "aws_default",
        wasb_conn_id: str = "wasb_default",
        create_container: bool = False,
        overwrite: bool = True,
        s3_verify: bool | str | None = None,
        s3_extra_args: dict[str, Any] | None = None,
        wasb_extra_args: dict[str, Any] | None = None,
        **kwargs
    ):
        """
        Initialize AWS S3 to Azure Blob Storage transfer operator.

        Args:
            s3_source_key (str): Source object key in AWS S3
            container_name (str): Target container name in Azure Blob Storage
            blob_name (str): Target blob name
            s3_bucket (str | None): Source S3 bucket name
            aws_conn_id (str): Airflow connection ID for AWS S3
            wasb_conn_id (str): Airflow connection ID for Azure Blob Storage
            create_container (bool): Whether to create container if it doesn't exist
            overwrite (bool): Whether to overwrite existing blob (default: True)
            s3_verify (bool | str | None): S3 SSL verification configuration
            s3_extra_args (dict[str, Any] | None): Additional S3 arguments
            wasb_extra_args (dict[str, Any] | None): Additional WASB arguments
        """

    def execute(self, context: Context) -> str:
        """
        Execute object transfer from AWS S3 to Azure Blob Storage.

        Args:
            context (Context): Airflow task context

        Returns:
            str: Blob URL of transferred object
        """
```
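
The `move_object` flag on `SFTPToWasbOperator` determines whether the source file is removed from the SFTP server after a successful upload, turning the copy into a move. A minimal sketch follows; the DAG name, connection IDs, container, and paths are placeholders.

```python
# A sketch of a move (rather than copy) from SFTP to Blob Storage; all names are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SFTPToWasbOperator

with DAG(
    "sftp_move_example",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    archive_feed = SFTPToWasbOperator(
        task_id="archive_feed",
        sftp_source_path="/outbound/feed.csv",
        container_name="sftp-archive",
        blob_name="feeds/feed.csv",
        sftp_conn_id="sftp_default",
        wasb_conn_id="wasb_default",
        create_container=True,
        overwrite=True,
        move_object=True,  # delete the source file on the SFTP server after upload
    )
```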

## Supporting Exception Classes

Custom exception classes for handling transfer operation errors and edge cases.

```python { .api }
class TooManyFilesToMoveException(Exception):
    """
    Exception for bulk transfer limits.

    Raised when transfer operations exceed configured limits for
    batch processing or concurrent file transfers.
    """
    pass

class InvalidAzureBlobParameters(Exception):
    """
    Exception for invalid blob parameters.

    Raised when blob storage parameters are invalid or
    incompatible with the operation being performed.
    """
    pass

class InvalidKeyComponents(Exception):
    """
    Exception for invalid key components.

    Raised when file path or key components are invalid
    for the target storage system.
    """
    pass
```

## Usage Examples

### Local to Azure Transfers

```python
from airflow import DAG
from airflow.providers.microsoft.azure.transfers.local_to_wasb import LocalFilesystemToWasbOperator
from airflow.providers.microsoft.azure.transfers.local_to_adls import LocalFilesystemToADLSOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import os

def prepare_local_files():
    """Prepare local files for transfer to Azure."""
    # Create sample data files
    data_dir = '/tmp/data_export'
    os.makedirs(data_dir, exist_ok=True)

    # Generate sample CSV file
    import csv
    csv_file = os.path.join(data_dir, 'sales_data.csv')
    with open(csv_file, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['date', 'product', 'sales', 'region'])
        for i in range(1000):
            writer.writerow([
                f'2024-01-{(i % 31) + 1:02d}',
                f'Product_{i % 10}',
                f'{1000 + (i * 10)}',
                f'Region_{i % 5}'
            ])

    # Generate sample JSON file
    import json
    json_file = os.path.join(data_dir, 'customer_data.json')
    customer_data = {
        'customers': [
            {'id': i, 'name': f'Customer_{i}', 'email': f'customer{i}@example.com'}
            for i in range(100)
        ]
    }
    with open(json_file, 'w') as f:
        json.dump(customer_data, f, indent=2)

    return {
        'csv_file': csv_file,
        'json_file': json_file,
        'data_dir': data_dir
    }

def verify_transfers(**context):
    """Verify that files were transferred successfully."""
    from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
    from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook

    results = {}

    # Check the CSV uploaded to Azure Blob Storage
    wasb_hook = WasbHook(wasb_conn_id='azure_blob_conn')
    csv_blob = 'sales/sales_data.csv'
    results[csv_blob] = wasb_hook.check_for_blob('data-exports', csv_blob)
    print(f"Blob {csv_blob}: {'✓ Found' if results[csv_blob] else '✗ Not found'}")

    # Check the JSON uploaded to Azure Data Lake Storage
    adls_hook = AzureDataLakeHook(azure_data_lake_conn_id='adls_conn')
    json_path = f"/exports/customers/customer_data_{context['ds_nodash']}.json"
    results[json_path] = adls_hook.check_for_file(json_path)
    print(f"ADLS file {json_path}: {'✓ Found' if results[json_path] else '✗ Not found'}")

    if all(results.values()):
        print("All files transferred successfully!")
    else:
        raise ValueError("Some files were not transferred successfully")

    return results

dag = DAG(
    'local_to_azure_transfers',
    default_args={
        'owner': 'data-transfer-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=5)
    },
    description='Transfer local files to Azure services',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Prepare local files
prep_files = PythonOperator(
    task_id='prepare_files',
    python_callable=prepare_local_files,
    dag=dag
)

# Transfer CSV to Azure Blob Storage
transfer_csv = LocalFilesystemToWasbOperator(
    task_id='transfer_csv_to_blob',
    file_path='/tmp/data_export/sales_data.csv',
    container_name='data-exports',
    blob_name='sales/sales_data.csv',
    azure_conn_id='azure_blob_conn',
    create_container=True,
    overwrite=True,
    metadata={
        'source': 'local_filesystem',
        'export_date': '{{ ds }}',
        'file_type': 'csv'
    },
    dag=dag
)

# Transfer JSON to Azure Data Lake Storage
transfer_json = LocalFilesystemToADLSOperator(
    task_id='transfer_json_to_adls',
    local_path='/tmp/data_export/customer_data.json',
    remote_path='/exports/customers/customer_data_{{ ds_nodash }}.json',
    azure_data_lake_conn_id='adls_conn',
    overwrite=True,
    dag=dag
)

# Verify transfers
verify_files = PythonOperator(
    task_id='verify_transfers',
    python_callable=verify_transfers,
    dag=dag
)

prep_files >> [transfer_csv, transfer_json] >> verify_files
```

### Database to Azure Transfer

```python
from airflow import DAG
from airflow.providers.microsoft.azure.transfers.oracle_to_azure_data_lake import OracleToAzureDataLakeOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def prepare_oracle_queries():
    """Prepare Oracle queries for data extraction."""
    queries = {
        'sales_summary': """
            SELECT
                TO_CHAR(order_date, 'YYYY-MM-DD') as order_date,
                product_category,
                COUNT(*) as order_count,
                SUM(order_total) as total_sales,
                AVG(order_total) as avg_order_value
            FROM sales_orders
            WHERE order_date >= TO_DATE('{{ ds }}', 'YYYY-MM-DD')
              AND order_date < TO_DATE('{{ next_ds }}', 'YYYY-MM-DD')
            GROUP BY TO_CHAR(order_date, 'YYYY-MM-DD'), product_category
            ORDER BY order_date, product_category
        """,

        'customer_activity': """
            SELECT
                c.customer_id,
                c.customer_name,
                c.email,
                COUNT(o.order_id) as order_count,
                SUM(o.order_total) as total_spent,
                MAX(o.order_date) as last_order_date
            FROM customers c
            LEFT JOIN sales_orders o ON c.customer_id = o.customer_id
            WHERE c.created_date <= TO_DATE('{{ ds }}', 'YYYY-MM-DD')
            GROUP BY c.customer_id, c.customer_name, c.email
            HAVING COUNT(o.order_id) > 0
            ORDER BY total_spent DESC
        """,

        'inventory_status': """
            SELECT
                p.product_id,
                p.product_name,
                p.category,
                i.current_stock,
                i.reserved_stock,
                i.available_stock,
                TO_CHAR(i.last_updated, 'YYYY-MM-DD HH24:MI:SS') as last_updated
            FROM products p
            JOIN inventory i ON p.product_id = i.product_id
            WHERE i.last_updated >= TO_DATE('{{ ds }}', 'YYYY-MM-DD')
            ORDER BY p.category, p.product_name
        """
    }

    return queries

def validate_extracted_data(**context):
    """Validate extracted data in Azure Data Lake Storage."""
    from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook

    adls_hook = AzureDataLakeHook(azure_data_lake_conn_id='adls_conn')

    # Check extracted files
    extracted_files = [
        '/oracle_exports/{{ ds }}/sales_summary.tsv',
        '/oracle_exports/{{ ds }}/customer_activity.tsv',
        '/oracle_exports/{{ ds }}/inventory_status.tsv'
    ]

    validation_results = {}

    for file_path in extracted_files:
        rendered_path = file_path.replace('{{ ds }}', context['ds'])

        if adls_hook.check_for_file(rendered_path):
            # Get file size and row count
            file_content = adls_hook.get_conn().cat(rendered_path)
            row_count = len(file_content.decode('utf-8').split('\n')) - 1  # Subtract header
            file_size = len(file_content)

            validation_results[rendered_path] = {
                'exists': True,
                'size_bytes': file_size,
                'row_count': row_count
            }
            print(f"✓ {rendered_path}: {row_count} rows, {file_size} bytes")
        else:
            validation_results[rendered_path] = {
                'exists': False,
                'size_bytes': 0,
                'row_count': 0
            }
            print(f"✗ {rendered_path}: File not found")

    # Validate minimum data requirements
    min_requirements = {
        'sales_summary.tsv': 1,       # At least 1 row
        'customer_activity.tsv': 10,  # At least 10 customers
        'inventory_status.tsv': 50    # At least 50 products
    }

    validation_passed = True
    for file_path, result in validation_results.items():
        file_name = file_path.split('/')[-1]
        min_rows = min_requirements.get(file_name, 0)

        if not result['exists'] or result['row_count'] < min_rows:
            validation_passed = False
            print(f"Validation failed for {file_name}: Expected >= {min_rows} rows, got {result['row_count']}")

    if not validation_passed:
        raise ValueError("Data validation failed")

    return validation_results

dag = DAG(
    'oracle_to_azure_transfer',
    default_args={
        'owner': 'data-engineering-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=10)
    },
    description='Extract data from Oracle to Azure Data Lake Storage',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Prepare queries
prep_queries = PythonOperator(
    task_id='prepare_queries',
    python_callable=prepare_oracle_queries,
    dag=dag
)

# Extract sales summary data
extract_sales = OracleToAzureDataLakeOperator(
    task_id='extract_sales_summary',
    filename='/oracle_exports/{{ ds }}/sales_summary.tsv',
    azure_data_lake_conn_id='adls_conn',
    oracle_conn_id='oracle_conn',
    sql="""
        SELECT
            TO_CHAR(order_date, 'YYYY-MM-DD') as order_date,
            product_category,
            COUNT(*) as order_count,
            SUM(order_total) as total_sales,
            AVG(order_total) as avg_order_value
        FROM sales_orders
        WHERE order_date >= TO_DATE('{{ ds }}', 'YYYY-MM-DD')
          AND order_date < TO_DATE('{{ next_ds }}', 'YYYY-MM-DD')
        GROUP BY TO_CHAR(order_date, 'YYYY-MM-DD'), product_category
        ORDER BY order_date, product_category
    """,
    delimiter='\t',
    encoding='utf-8',
    dag=dag
)

# Extract customer activity data
extract_customers = OracleToAzureDataLakeOperator(
    task_id='extract_customer_activity',
    filename='/oracle_exports/{{ ds }}/customer_activity.tsv',
    azure_data_lake_conn_id='adls_conn',
    oracle_conn_id='oracle_conn',
    sql="""
        SELECT
            c.customer_id,
            c.customer_name,
            c.email,
            COUNT(o.order_id) as order_count,
            SUM(o.order_total) as total_spent,
            MAX(o.order_date) as last_order_date
        FROM customers c
        LEFT JOIN sales_orders o ON c.customer_id = o.customer_id
        WHERE c.created_date <= TO_DATE('{{ ds }}', 'YYYY-MM-DD')
        GROUP BY c.customer_id, c.customer_name, c.email
        HAVING COUNT(o.order_id) > 0
        ORDER BY total_spent DESC
    """,
    delimiter='\t',
    encoding='utf-8',
    dag=dag
)

# Extract inventory data
extract_inventory = OracleToAzureDataLakeOperator(
    task_id='extract_inventory_status',
    filename='/oracle_exports/{{ ds }}/inventory_status.tsv',
    azure_data_lake_conn_id='adls_conn',
    oracle_conn_id='oracle_conn',
    sql="""
        SELECT
            p.product_id,
            p.product_name,
            p.category,
            i.current_stock,
            i.reserved_stock,
            i.available_stock,
            TO_CHAR(i.last_updated, 'YYYY-MM-DD HH24:MI:SS') as last_updated
        FROM products p
        JOIN inventory i ON p.product_id = i.product_id
        WHERE i.last_updated >= TO_DATE('{{ ds }}', 'YYYY-MM-DD')
        ORDER BY p.category, p.product_name
    """,
    delimiter='\t',
    encoding='utf-8',
    dag=dag
)

# Validate extracted data
validate_data = PythonOperator(
    task_id='validate_data',
    python_callable=validate_extracted_data,
    dag=dag
)

prep_queries >> [extract_sales, extract_customers, extract_inventory] >> validate_data
```

### Cloud-to-Cloud Transfers

```python
from airflow import DAG
from airflow.providers.microsoft.azure.transfers.s3_to_wasb import S3ToAzureBlobStorageOperator
from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SFTPToWasbOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def discover_source_files(**context):
    """Discover files available for transfer from various sources."""
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
    from airflow.providers.sftp.hooks.sftp import SFTPHook

    ds = context['ds']

    # Discover S3 files
    s3_hook = S3Hook(aws_conn_id='aws_conn')
    s3_files = s3_hook.list_keys(
        bucket_name='source-data-bucket',
        prefix=f'daily-exports/{ds}/',
        delimiter=''
    )

    # Discover SFTP files
    sftp_hook = SFTPHook(ssh_conn_id='sftp_conn')
    sftp_files = sftp_hook.list_directory(f'/exports/{ds}/')

    return {
        's3_files': s3_files or [],
        'sftp_files': sftp_files or []
    }

def monitor_transfer_progress(**context):
    """Monitor transfer progress and generate summary."""
    from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

    wasb_hook = WasbHook(wasb_conn_id='azure_blob_conn')

    # Check transferred files
    containers_to_check = ['s3-transfers', 'sftp-transfers']
    transfer_summary = {}

    for container in containers_to_check:
        try:
            blobs = wasb_hook.get_container_client(container).list_blobs(
                name_starts_with=f"{context['ds']}/"
            )

            blob_list = []
            total_size = 0

            for blob in blobs:
                blob_info = {
                    'name': blob.name,
                    'size': blob.size,
                    'last_modified': blob.last_modified.isoformat() if blob.last_modified else None
                }
                blob_list.append(blob_info)
                total_size += blob.size or 0

            transfer_summary[container] = {
                'file_count': len(blob_list),
                'total_size_bytes': total_size,
                'files': blob_list
            }

            print(f"Container {container}: {len(blob_list)} files, {total_size:,} bytes")

        except Exception as e:
            print(f"Error checking container {container}: {e}")
            transfer_summary[container] = {
                'error': str(e),
                'file_count': 0,
                'total_size_bytes': 0
            }

    return transfer_summary

def cleanup_source_files(**context):
    """Clean up source files after successful transfer (optional)."""
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    # Only clean up if all transfers were successful
    transfer_summary = context['task_instance'].xcom_pull(task_ids='monitor_progress')

    total_files_transferred = sum(
        container_info.get('file_count', 0)
        for container_info in transfer_summary.values()
    )

    if total_files_transferred > 0:
        print(f"Successfully transferred {total_files_transferred} files")

    # Optional: Delete source files from S3 after successful transfer
    # Uncomment the following lines if cleanup is desired
    """
    s3_hook = S3Hook(aws_conn_id='aws_conn')
    source_files = context['task_instance'].xcom_pull(task_ids='discover_files')['s3_files']

    for s3_key in source_files:
        try:
            s3_hook.delete_objects(
                bucket='source-data-bucket',
                keys=[s3_key]
            )
            print(f"Deleted source file: {s3_key}")
        except Exception as e:
            print(f"Failed to delete {s3_key}: {e}")
    """

    return total_files_transferred

dag = DAG(
    'cloud_to_azure_transfers',
    default_args={
        'owner': 'integration-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=5)
    },
    description='Transfer files from various cloud sources to Azure',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Discover source files
discover_files = PythonOperator(
    task_id='discover_files',
    python_callable=discover_source_files,
    dag=dag
)

# Transfer from AWS S3 to Azure Blob Storage
transfer_s3_files = S3ToAzureBlobStorageOperator(
    task_id='transfer_s3_data',
    s3_source_key='daily-exports/{{ ds }}/sales_data.csv',
    s3_bucket='source-data-bucket',
    container_name='s3-transfers',
    blob_name='{{ ds }}/sales_data.csv',
    aws_conn_id='aws_conn',
    wasb_conn_id='azure_blob_conn',
    create_container=True,
    overwrite=True,
    s3_extra_args={
        'ServerSideEncryption': 'AES256'
    },
    wasb_extra_args={
        'content_settings': {
            'content_type': 'text/csv',
            'cache_control': 'no-cache'
        },
        'metadata': {
            'source': 's3',
            'transfer_date': '{{ ds }}',
            'original_bucket': 'source-data-bucket'
        }
    },
    dag=dag
)

# Transfer from SFTP to Azure Blob Storage
transfer_sftp_files = SFTPToWasbOperator(
    task_id='transfer_sftp_data',
    sftp_source_path='/exports/{{ ds }}/inventory_update.json',
    container_name='sftp-transfers',
    blob_name='{{ ds }}/inventory_update.json',
    sftp_conn_id='sftp_conn',
    wasb_conn_id='azure_blob_conn',
    create_container=True,
    overwrite=True,
    move_object=False,  # Keep original file on SFTP server
    dag=dag
)

# Transfer additional S3 log files
transfer_s3_logs = S3ToAzureBlobStorageOperator(
    task_id='transfer_s3_logs',
    s3_source_key='logs/{{ ds }}/application.log',
    s3_bucket='source-data-bucket',
    container_name='s3-transfers',
    blob_name='{{ ds }}/logs/application.log',
    aws_conn_id='aws_conn',
    wasb_conn_id='azure_blob_conn',
    create_container=True,
    overwrite=True,
    dag=dag
)

# Monitor transfer progress
monitor_progress = PythonOperator(
    task_id='monitor_progress',
    python_callable=monitor_transfer_progress,
    dag=dag
)

# Optional cleanup
cleanup_sources = PythonOperator(
    task_id='cleanup_sources',
    python_callable=cleanup_source_files,
    dag=dag
)

# Set up dependencies
discover_files >> [transfer_s3_files, transfer_sftp_files, transfer_s3_logs]
[transfer_s3_files, transfer_sftp_files, transfer_s3_logs] >> monitor_progress >> cleanup_sources
```

### Batch Transfer Operations

```python
from airflow import DAG
from airflow.providers.microsoft.azure.transfers.s3_to_wasb import S3ToAzureBlobStorageOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def create_dynamic_transfer_tasks(**context):
    """Create transfer tasks dynamically based on available files."""
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    s3_hook = S3Hook(aws_conn_id='aws_conn')

    # List all files in the source bucket for the current date
    source_prefix = f"batch-export/{context['ds']}/"
    files = s3_hook.list_keys(
        bucket_name='batch-data-source',
        prefix=source_prefix,
        delimiter=''
    )

    if not files:
        print("No files found for transfer")
        return []

    # Filter files by type and size
    transfer_jobs = []

    for file_key in files:
        file_info = s3_hook.get_key(file_key, bucket_name='batch-data-source')
        file_size = file_info.size if file_info else 0
        file_name = file_key.split('/')[-1]

        # Skip very large files (>1GB) or very small files (<1KB)
        if file_size > 1024**3 or file_size < 1024:
            print(f"Skipping {file_name}: size {file_size} bytes")
            continue

        # Determine target container based on file type
        file_extension = file_name.split('.')[-1].lower()
        container_mapping = {
            'csv': 'structured-data',
            'json': 'json-data',
            'xml': 'xml-data',
            'txt': 'text-data',
            'parquet': 'columnar-data'
        }

        target_container = container_mapping.get(file_extension, 'unclassified-data')

        transfer_job = {
            'source_key': file_key,
            'target_container': target_container,
            'target_blob': f"{context['ds']}/{file_name}",
            'file_size': file_size,
            'file_type': file_extension
        }

        transfer_jobs.append(transfer_job)

    print(f"Created {len(transfer_jobs)} transfer jobs")
    return transfer_jobs

def execute_batch_transfers(**context):
    """Execute batch transfers with error handling and retries."""
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
    from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

    transfer_jobs = context['task_instance'].xcom_pull(task_ids='create_transfer_jobs')

    if not transfer_jobs:
        print("No transfer jobs to execute")
        return {'completed': 0, 'failed': 0, 'skipped': 0}

    s3_hook = S3Hook(aws_conn_id='aws_conn')
    wasb_hook = WasbHook(wasb_conn_id='azure_blob_conn')

    results = {
        'completed': 0,
        'failed': 0,
        'skipped': 0,
        'transfer_details': []
    }

    for job in transfer_jobs:
        try:
            source_key = job['source_key']
            target_container = job['target_container']
            target_blob = job['target_blob']

            print(f"Transferring {source_key} -> {target_container}/{target_blob}")

            # Check if target already exists
            if wasb_hook.check_for_blob(target_container, target_blob):
                print(f"Target blob already exists, skipping: {target_blob}")
                results['skipped'] += 1
                continue

            # Create container if it doesn't exist
            try:
                wasb_hook.create_container(target_container)
            except Exception:
                pass  # Container might already exist

            # Download from S3
            s3_object = s3_hook.get_key(source_key, bucket_name='batch-data-source')
            file_content = s3_object.get()['Body'].read()

            # Upload to Azure Blob Storage
            wasb_hook.load_bytes(
                data=file_content,
                container_name=target_container,
                blob_name=target_blob,
                overwrite=True
            )

            # Set metadata
            wasb_hook.get_blob_client(
                container=target_container,
                blob=target_blob
            ).set_blob_metadata({
                'source': 's3',
                'source_bucket': 'batch-data-source',
                'source_key': source_key,
                'transfer_date': context['ds'],
                'file_size': str(job['file_size']),
                'file_type': job['file_type']
            })

            results['completed'] += 1
            results['transfer_details'].append({
                'source': source_key,
                'target': f"{target_container}/{target_blob}",
                'status': 'completed',
                'size_bytes': job['file_size']
            })

            print(f"Successfully transferred: {source_key}")

        except Exception as e:
            print(f"Failed to transfer {job['source_key']}: {e}")
            results['failed'] += 1
            results['transfer_details'].append({
                'source': job['source_key'],
                'target': f"{job['target_container']}/{job['target_blob']}",
                'status': 'failed',
                'error': str(e)
            })

    print(f"Batch transfer completed: {results['completed']} successful, "
          f"{results['failed']} failed, {results['skipped']} skipped")

    return results

dag = DAG(
    'batch_transfer_workflow',
    default_args={
        'owner': 'batch-processing-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    description='Batch transfer workflow for multiple files',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Create dynamic transfer jobs
create_jobs = PythonOperator(
    task_id='create_transfer_jobs',
    python_callable=create_dynamic_transfer_tasks,
    dag=dag
)

# Execute batch transfers
execute_transfers = PythonOperator(
    task_id='execute_batch_transfers',
    python_callable=execute_batch_transfers,
    dag=dag
)

create_jobs >> execute_transfers
```

## Error Handling and Best Practices

### Transfer Operation Error Handling

```python
from airflow.providers.microsoft.azure.transfers.s3_to_wasb import (
    S3ToAzureBlobStorageOperator,
    TooManyFilesToMoveException,
    InvalidAzureBlobParameters,
    InvalidKeyComponents
)
from airflow.exceptions import AirflowException

def robust_transfer_with_error_handling():
    """Demonstrate comprehensive error handling for transfer operations."""

    try:
        # Example of handling specific transfer exceptions
        operator = S3ToAzureBlobStorageOperator(
            task_id='safe_transfer',
            s3_source_key='large-dataset/data.csv',
            container_name='target-container',
            blob_name='processed-data.csv',
            aws_conn_id='aws_conn',
            wasb_conn_id='azure_conn'
        )

        # This would be called by Airflow's execution engine
        # result = operator.execute(context)

    except TooManyFilesToMoveException as e:
        print(f"Too many files in transfer operation: {e}")
        # Implement chunking or batch processing

    except InvalidAzureBlobParameters as e:
        print(f"Invalid blob parameters: {e}")
        # Validate and correct blob parameters

    except InvalidKeyComponents as e:
        print(f"Invalid key components: {e}")
        # Validate and correct file path components

    except Exception as e:
        print(f"Unexpected transfer error: {e}")
        raise AirflowException(f"Transfer failed: {e}")

def implement_transfer_validation():
    """Implement validation patterns for transfer operations."""

    def validate_source_file(source_path: str, min_size: int = 1024) -> bool:
        """Validate source file before transfer."""
        import os

        if not os.path.exists(source_path):
            raise FileNotFoundError(f"Source file not found: {source_path}")

        file_size = os.path.getsize(source_path)
        if file_size < min_size:
            raise ValueError(f"File too small: {file_size} bytes < {min_size} bytes")

        return True

    def validate_target_parameters(container_name: str, blob_name: str) -> bool:
        """Validate target parameters."""
        if not container_name or len(container_name) < 3:
            raise InvalidAzureBlobParameters("Container name must be at least 3 characters")

        if not blob_name or blob_name.startswith('/'):
            raise InvalidKeyComponents("Blob name cannot start with '/'")

        return True

    def validate_transfer_result(source_size: int, target_size: int, tolerance: float = 0.01) -> bool:
        """Validate transfer result by comparing sizes."""
        if abs(source_size - target_size) > (source_size * tolerance):
            raise ValueError(f"Size mismatch: source={source_size}, target={target_size}")

        return True

    return {
        'validate_source': validate_source_file,
        'validate_target': validate_target_parameters,
        'validate_result': validate_transfer_result
    }

def implement_transfer_monitoring():
    """Implement monitoring for transfer operations."""

    class TransferMonitor:
        def __init__(self):
            self.transfer_stats = {
                'start_time': None,
                'end_time': None,
                'bytes_transferred': 0,
                'transfer_rate_mbps': 0,
                'status': 'pending'
            }

        def start_transfer(self):
            """Mark transfer start time."""
            import time
            self.transfer_stats['start_time'] = time.time()
            self.transfer_stats['status'] = 'in_progress'

        def update_progress(self, bytes_transferred: int):
            """Update transfer progress."""
            self.transfer_stats['bytes_transferred'] = bytes_transferred

            if self.transfer_stats['start_time']:
                import time
                elapsed_time = time.time() - self.transfer_stats['start_time']
                if elapsed_time > 0:
                    rate_bps = bytes_transferred / elapsed_time
                    self.transfer_stats['transfer_rate_mbps'] = rate_bps / (1024 * 1024)

        def complete_transfer(self):
            """Mark transfer completion."""
            import time
            self.transfer_stats['end_time'] = time.time()
            self.transfer_stats['status'] = 'completed'

            if self.transfer_stats['start_time']:
                total_time = self.transfer_stats['end_time'] - self.transfer_stats['start_time']
                print(f"Transfer completed in {total_time:.2f} seconds")
                print(f"Average rate: {self.transfer_stats['transfer_rate_mbps']:.2f} MB/s")

        def get_stats(self):
            """Get transfer statistics."""
            return self.transfer_stats.copy()

    return TransferMonitor
```

## Performance Optimization

### Optimizing Transfer Operations

```python
def optimize_large_file_transfers():
    """Optimize transfers for large files."""

    # Configuration for large file transfers
    large_file_config = {
        'chunk_size': 64 * 1024 * 1024,  # 64MB chunks
        'max_connections': 10,           # Parallel connections
        'timeout': 300,                  # 5 minute timeout per chunk
        'retry_attempts': 3,             # Retry failed chunks
        'use_compression': True          # Compress during transfer
    }

    # Configuration for small file batch transfers
    batch_config = {
        'batch_size': 100,      # Files per batch
        'parallel_batches': 5,  # Concurrent batches
        'batch_timeout': 600,   # 10 minute timeout per batch
        'skip_existing': True   # Skip existing files
    }

    return {
        'large_files': large_file_config,
        'batch_processing': batch_config
    }

def implement_transfer_caching():
    """Implement caching for frequently transferred files."""

    class TransferCache:
        def __init__(self):
            self.cache = {}
            self.cache_ttl = 3600  # 1 hour TTL

        def get_cached_transfer(self, source_path: str) -> dict | None:
            """Get cached transfer information."""
            import time

            if source_path in self.cache:
                cache_entry = self.cache[source_path]
                if time.time() - cache_entry['timestamp'] < self.cache_ttl:
                    return cache_entry['data']
                else:
                    del self.cache[source_path]

            return None

        def cache_transfer_result(self, source_path: str, result: dict):
            """Cache transfer result."""
            import time

            self.cache[source_path] = {
                'timestamp': time.time(),
                'data': result
            }

        def clear_cache(self):
            """Clear transfer cache."""
            self.cache.clear()

    return TransferCache

def implement_parallel_transfers():
    """Implement parallel transfer processing."""

    import concurrent.futures
    import threading

    class ParallelTransferManager:
        def __init__(self, max_workers: int = 5):
            self.max_workers = max_workers
            self.transfer_results = {}
            self.lock = threading.Lock()

        def transfer_file(self, transfer_config: dict) -> dict:
            """Transfer a single file."""
            try:
                # Simulate file transfer logic
                source = transfer_config['source']
                target = transfer_config['target']

                # Actual transfer implementation would go here
                result = {
                    'source': source,
                    'target': target,
                    'status': 'success',
                    'size_bytes': transfer_config.get('size', 0)
                }

                with self.lock:
                    self.transfer_results[source] = result

                return result

            except Exception as e:
                error_result = {
                    'source': transfer_config['source'],
                    'status': 'failed',
                    'error': str(e)
                }

                with self.lock:
                    self.transfer_results[transfer_config['source']] = error_result

                return error_result

        def execute_parallel_transfers(self, transfer_configs: list[dict]) -> dict:
            """Execute multiple transfers in parallel."""

            with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                # Submit all transfer tasks
                future_to_config = {
                    executor.submit(self.transfer_file, config): config
                    for config in transfer_configs
                }

                # Wait for completion
                for future in concurrent.futures.as_completed(future_to_config):
                    config = future_to_config[future]
                    try:
                        result = future.result()
                        print(f"Completed: {config['source']} -> {result['status']}")
                    except Exception as e:
                        print(f"Failed: {config['source']} -> {e}")

            # Return summary
            successful = sum(1 for r in self.transfer_results.values() if r['status'] == 'success')
            failed = len(self.transfer_results) - successful

            return {
                'total': len(transfer_configs),
                'successful': successful,
                'failed': failed,
                'results': self.transfer_results
            }

    return ParallelTransferManager
```
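
The configuration dictionaries above are illustrative profiles rather than operator arguments; for the operators in this module, the concrete tuning knobs are the upload parameters documented earlier, such as `nthreads`, `buffersize`, and `blocksize` on `LocalFilesystemToADLSOperator`. A hedged sketch of applying such a profile follows; the values, paths, DAG name, and connection ID are placeholders.

```python
# A sketch of mapping an illustrative large-file profile onto the documented
# LocalFilesystemToADLSOperator upload parameters; all values are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.providers.microsoft.azure.transfers.local_to_adls import LocalFilesystemToADLSOperator

large_file_profile = {
    "nthreads": 16,                  # parallel upload threads
    "buffersize": 16 * 1024 * 1024,  # 16 MiB internal buffer
    "blocksize": 16 * 1024 * 1024,   # 16 MiB blocks written per request
}

with DAG(
    "large_file_upload_example",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    upload_large_export = LocalFilesystemToADLSOperator(
        task_id="upload_large_export",
        local_path="/data/exports/large_export.parquet",
        remote_path="/landing/exports/large_export.parquet",
        azure_data_lake_conn_id="azure_data_lake_default",
        overwrite=True,
        **large_file_profile,
    )
```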

This documentation covers the data transfer capabilities of the Apache Airflow Microsoft Azure Provider, including local-to-Azure transfers, database-to-Azure transfers, cloud-to-cloud transfers, error handling patterns, and performance optimization techniques.