
# Azure Data Lake Storage

Comprehensive Azure Data Lake Storage integration supporting both Gen1 and Gen2, with full file system operations, directory management, data upload/download, and an fsspec-compatible filesystem interface.

## Capabilities

### Data Lake Storage Gen1 Hook

Primary interface for Azure Data Lake Storage Gen1 operations, providing authenticated connections and file system functionality.

```python { .api }
class AzureDataLakeHook(BaseHook):
    """
    Hook for Azure Data Lake Storage Gen1 operations.

    Provides methods for file operations, directory management, and data transfers.
    Supports Azure Active Directory authentication and connection configurations.
    """

    def get_conn(self) -> core.AzureDLFileSystem:
        """
        Get authenticated Azure Data Lake Storage Gen1 client.

        Returns:
            core.AzureDLFileSystem: ADLS Gen1 client instance
        """

    def check_for_file(self, file_path: str) -> bool:
        """
        Check if a file exists in Azure Data Lake Storage.

        Args:
            file_path (str): Path to the file to check

        Returns:
            bool: True if file exists, False otherwise
        """

    def upload_file(
        self,
        local_path: str,
        remote_path: str,
        nthreads: int = 64,
        overwrite: bool = True,
        buffersize: int = 4194304,
        blocksize: int = 4194304
    ) -> None:
        """
        Upload a local file to Azure Data Lake Storage.

        Args:
            local_path (str): Path to local file
            remote_path (str): Target path in ADLS
            nthreads (int): Number of threads for upload (default: 64)
            overwrite (bool): Whether to overwrite existing file (default: True)
            buffersize (int): Buffer size for upload in bytes (default: 4194304)
            blocksize (int): Block size for upload in bytes (default: 4194304)
        """

    def download_file(
        self,
        local_path: str,
        remote_path: str,
        nthreads: int = 64,
        overwrite: bool = True,
        buffersize: int = 4194304,
        blocksize: int = 4194304
    ) -> None:
        """
        Download a file from Azure Data Lake Storage to the local system.

        Args:
            local_path (str): Local destination path
            remote_path (str): Source path in ADLS
            nthreads (int): Number of threads for download (default: 64)
            overwrite (bool): Whether to overwrite existing local file (default: True)
            buffersize (int): Buffer size for download in bytes (default: 4194304)
            blocksize (int): Block size for download in bytes (default: 4194304)
        """

    def list(self, path: str) -> list:
        """
        List directory contents in Azure Data Lake Storage.

        Args:
            path (str): Directory path to list

        Returns:
            list: List of files and directories in the path
        """

    def remove(
        self,
        path: str,
        recursive: bool = False,
        ignore_not_found: bool = True
    ) -> None:
        """
        Remove a file or directory from Azure Data Lake Storage.

        Args:
            path (str): Path to file or directory to remove
            recursive (bool): Whether to remove directories recursively (default: False)
            ignore_not_found (bool): Don't raise error if path doesn't exist (default: True)
        """
```
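
As a quick illustration of the Gen1 hook on its own, the minimal sketch below checks for a remote file and removes it if present. The connection ID `adls_gen1_conn` and the remote path are placeholders; the calls follow the method signatures listed above.

```python
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook


def clean_up_stale_export(remote_path: str = "/raw/stale_export.csv") -> None:
    """Remove a stale file from ADLS Gen1 if it exists (illustrative sketch)."""
    # 'adls_gen1_conn' is a placeholder Airflow connection of type azure_data_lake.
    hook = AzureDataLakeHook(azure_data_lake_conn_id="adls_gen1_conn")

    if hook.check_for_file(remote_path):
        # recursive=False because a single file is expected, not a directory tree.
        hook.remove(remote_path, recursive=False, ignore_not_found=True)
```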

### Data Lake Storage Gen2 Hook

Advanced interface for Azure Data Lake Storage Gen2 operations with hierarchical namespace support and enhanced capabilities.

```python { .api }
class AzureDataLakeStorageV2Hook(BaseHook):
    """
    Hook for Azure Data Lake Storage Gen2 operations.

    Provides methods for file system operations, directory management, and data transfers.
    Supports multiple authentication methods including managed identity and service principal.
    """

    def get_conn(self) -> DataLakeServiceClient:
        """
        Get authenticated Azure Data Lake Storage Gen2 service client.

        Returns:
            DataLakeServiceClient: ADLS Gen2 service client instance
        """

    def create_file_system(self, file_system_name: str) -> None:
        """
        Create a new file system (container) in Azure Data Lake Storage Gen2.

        Args:
            file_system_name (str): Name of the file system to create
        """

    def get_file_system(self, file_system: FileSystemProperties | str) -> FileSystemClient:
        """
        Get file system client for operations within a specific file system.

        Args:
            file_system (FileSystemProperties | str): File system name or properties

        Returns:
            FileSystemClient: Client for file system operations
        """

    def create_directory(
        self,
        file_system_name: FileSystemProperties | str,
        directory_name: str,
        metadata: dict[str, str] | None = None,
        **kwargs
    ) -> DataLakeDirectoryClient:
        """
        Create a directory in the specified file system.

        Args:
            file_system_name (FileSystemProperties | str): File system name or properties
            directory_name (str): Name of the directory to create
            metadata (dict[str, str] | None): Optional metadata for the directory
            **kwargs: Additional arguments

        Returns:
            DataLakeDirectoryClient: Client for directory operations
        """

    def get_directory_client(
        self,
        file_system_name: FileSystemProperties | str,
        directory_name: str
    ) -> DataLakeDirectoryClient:
        """
        Get directory client for operations within a specific directory.

        Args:
            file_system_name (FileSystemProperties | str): File system name or properties
            directory_name (str): Directory name

        Returns:
            DataLakeDirectoryClient: Client for directory operations
        """

    def create_file(
        self,
        file_system_name: FileSystemProperties | str,
        file_name: str
    ) -> DataLakeFileClient:
        """
        Create a new file in the specified file system.

        Args:
            file_system_name (FileSystemProperties | str): File system name or properties
            file_name (str): Name of the file to create

        Returns:
            DataLakeFileClient: Client for file operations
        """

    def upload_file(
        self,
        file_system_name: FileSystemProperties | str,
        file_name: str,
        file_path: str,
        overwrite: bool = False,
        **kwargs
    ) -> DataLakeFileClient:
        """
        Upload a local file to Azure Data Lake Storage Gen2.

        Args:
            file_system_name (FileSystemProperties | str): File system name or properties
            file_name (str): Target file name in ADLS
            file_path (str): Path to local file
            overwrite (bool): Whether to overwrite existing file (default: False)
            **kwargs: Additional arguments

        Returns:
            DataLakeFileClient: Client for the uploaded file
        """

    def upload_file_to_directory(
        self,
        file_system_name: FileSystemProperties | str,
        directory_name: str,
        file_name: str,
        file_path: str,
        overwrite: bool = False,
        **kwargs
    ) -> DataLakeFileClient:
        """
        Upload a local file to a specific directory in Azure Data Lake Storage Gen2.

        Args:
            file_system_name (FileSystemProperties | str): File system name or properties
            directory_name (str): Target directory name
            file_name (str): Target file name
            file_path (str): Path to local file
            overwrite (bool): Whether to overwrite existing file (default: False)
            **kwargs: Additional arguments

        Returns:
            DataLakeFileClient: Client for the uploaded file
        """

    def list_files_directory(
        self,
        file_system_name: FileSystemProperties | str,
        directory_name: str | None = None
    ) -> list:
        """
        List files in a directory within the file system.

        Args:
            file_system_name (FileSystemProperties | str): File system name or properties
            directory_name (str | None): Directory to list (None for root)

        Returns:
            list: List of files and directories
        """

    def list_file_system(
        self,
        prefix: str | None = None,
        include_metadata: bool = False
    ) -> list:
        """
        List all file systems in the storage account.

        Args:
            prefix (str | None): Filter file systems by prefix
            include_metadata (bool): Whether to include metadata (default: False)

        Returns:
            list: List of file systems
        """

    def delete_file_system(self, file_system_name: FileSystemProperties | str) -> None:
        """
        Delete a file system from Azure Data Lake Storage Gen2.

        Args:
            file_system_name (FileSystemProperties | str): File system name or properties
        """

    def delete_directory(
        self,
        file_system_name: FileSystemProperties | str,
        directory_name: str
    ) -> None:
        """
        Delete a directory from the specified file system.

        Args:
            file_system_name (FileSystemProperties | str): File system name or properties
            directory_name (str): Directory name to delete
        """

    def test_connection(self) -> tuple[bool, str]:
        """
        Test the Azure Data Lake Storage Gen2 connection.

        Returns:
            tuple[bool, str]: Success status and message
        """
```
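
A short sketch of the Gen2 hook's connection test and storage-account-level listing, which the usage examples later in this document do not cover. The connection ID and the `data-` prefix are placeholders; the calls follow the API block above.

```python
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeStorageV2Hook


def report_file_systems() -> None:
    """Verify the ADLS Gen2 connection and print matching file systems (sketch)."""
    # 'adls_v2_conn' is a placeholder connection ID of type adls.
    hook = AzureDataLakeStorageV2Hook(adls_conn_id="adls_v2_conn")

    ok, message = hook.test_connection()
    if not ok:
        raise RuntimeError(f"ADLS Gen2 connection failed: {message}")

    # List only file systems whose names start with the given prefix.
    for file_system in hook.list_file_system(prefix="data-"):
        print(f"Found file system: {file_system}")
```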

### Data Lake Storage Operators

Execute Azure Data Lake Storage operations as Airflow tasks with comprehensive file and directory management capabilities.

```python { .api }
class ADLSCreateObjectOperator(BaseOperator):
    """
    Creates objects in Azure Data Lake Storage.

    Supports creating both files and directories with configurable options
    and metadata.
    """

    def __init__(
        self,
        *,
        azure_data_lake_conn_id: str = "azure_data_lake_default",
        path: str,
        data: Any = None,
        length: int | None = None,
        **kwargs
    ):
        """
        Initialize ADLS create object operator.

        Args:
            azure_data_lake_conn_id (str): Airflow connection ID for ADLS
            path (str): Path to create in ADLS
            data (Any): Data to write to the object
            length (int | None): Length of data to write
        """


class ADLSDeleteOperator(BaseOperator):
    """
    Deletes objects from Azure Data Lake Storage.

    Supports deleting files and directories with recursive deletion
    and error handling options.
    """

    def __init__(
        self,
        *,
        azure_data_lake_conn_id: str = "azure_data_lake_default",
        path: str,
        recursive: bool = False,
        ignore_not_found: bool = True,
        **kwargs
    ):
        """
        Initialize ADLS delete operator.

        Args:
            azure_data_lake_conn_id (str): Airflow connection ID for ADLS
            path (str): Path to delete from ADLS
            recursive (bool): Whether to delete directories recursively
            ignore_not_found (bool): Don't raise error if path doesn't exist
        """


class ADLSListOperator(BaseOperator):
    """
    Lists objects in Azure Data Lake Storage.

    Provides directory listing capabilities with filtering and
    detailed file information retrieval.
    """

    def __init__(
        self,
        *,
        azure_data_lake_conn_id: str = "azure_data_lake_default",
        path: str,
        **kwargs
    ):
        """
        Initialize ADLS list operator.

        Args:
            azure_data_lake_conn_id (str): Airflow connection ID for ADLS
            path (str): Path to list in ADLS
        """
```
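
The Gen2 example later in this document uses the list and delete operators; the sketch below shows the create-object operator declared directly in a DAG. The DAG ID, connection ID, and target path are placeholders, and the constructor arguments follow the API block above.

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.microsoft.azure.operators.adls import ADLSCreateObjectOperator

with DAG(
    dag_id="adls_create_marker_file",  # placeholder DAG ID
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Writes a small marker object through the 'azure_data_lake_default' connection.
    create_marker = ADLSCreateObjectOperator(
        task_id="create_marker",
        azure_data_lake_conn_id="azure_data_lake_default",
        path="raw/_SUCCESS",  # placeholder target path
        data=b"ok",
    )
```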

### Filesystem Interface

Provides an fsspec-compatible filesystem interface for Azure Data Lake Storage integration with data processing frameworks.

```python { .api }
def get_fs(
    conn_id: str | None,
    storage_options: dict[str, Any] | None = None
) -> AbstractFileSystem:
    """
    Create Azure Blob FileSystem (fsspec-compatible) for Data Lake Storage.

    Supports both ADLS Gen1 and Gen2 with automatic protocol detection
    and credential management.

    Args:
        conn_id (str | None): Airflow connection ID for ADLS configuration
        storage_options (dict[str, Any] | None): Additional storage options

    Returns:
        AbstractFileSystem: fsspec-compatible filesystem interface

    Supported Schemes:
        - abfs: Azure Data Lake Storage Gen2
        - abfss: Azure Data Lake Storage Gen2 (secure)
        - adl: Azure Data Lake Storage Gen1
    """
```
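
Because the returned object is a standard fsspec filesystem, generic fsspec methods such as `ls`, `exists`, and `open` work directly. A minimal sketch, assuming a Gen2 connection named `adls_v2_conn` and a container called `data-lake`:

```python
from airflow.providers.microsoft.azure.fs.adls import get_fs

# Build the filesystem from an Airflow connection (IDs and paths are placeholders).
fs = get_fs(conn_id="adls_v2_conn")

# Standard fsspec calls: list a prefix and stream a file without downloading it first.
if fs.exists("data-lake/raw"):
    for entry in fs.ls("data-lake/raw"):
        print(entry)

with fs.open("data-lake/raw/sales_data.csv", "rb") as handle:
    first_line = handle.readline()
```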

## Usage Examples

### Basic File Operations with ADLS Gen1

```python
import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook


def upload_to_adls():
    """Upload file to Azure Data Lake Storage Gen1."""
    hook = AzureDataLakeHook(azure_data_lake_conn_id='adls_conn')

    # Upload local file
    hook.upload_file(
        local_path='/tmp/data.csv',
        remote_path='/raw/data.csv',
        overwrite=True
    )

    # Verify upload
    if hook.check_for_file('/raw/data.csv'):
        print("File uploaded successfully")


def process_adls_directory():
    """Process files in ADLS directory."""
    hook = AzureDataLakeHook(azure_data_lake_conn_id='adls_conn')

    # List directory contents (each entry is a remote path)
    files = hook.list('/raw/')
    print(f"Found {len(files)} files")

    # Download and process each file
    for remote_path in files:
        if remote_path.endswith('.csv'):
            hook.download_file(
                local_path=f"/tmp/{os.path.basename(remote_path)}",
                remote_path=remote_path
            )


dag = DAG(
    'adls_gen1_workflow',
    default_args={
        'owner': 'data-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    description='ADLS Gen1 data processing workflow',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

upload_task = PythonOperator(
    task_id='upload_to_adls',
    python_callable=upload_to_adls,
    dag=dag
)

process_task = PythonOperator(
    task_id='process_directory',
    python_callable=process_adls_directory,
    dag=dag
)

upload_task >> process_task
```

### Advanced Operations with ADLS Gen2

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeStorageV2Hook
from airflow.providers.microsoft.azure.operators.adls import (
    ADLSDeleteOperator,
    ADLSListOperator,
)


def setup_adls_structure():
    """Set up directory structure in ADLS Gen2."""
    hook = AzureDataLakeStorageV2Hook(adls_conn_id='adls_v2_conn')

    # Create file system if it doesn't exist
    try:
        hook.create_file_system('data-lake')
    except Exception as e:
        print(f"File system may already exist: {e}")

    # Create directory structure
    directories = ['raw', 'processed', 'archive']
    for directory in directories:
        hook.create_directory(
            file_system_name='data-lake',
            directory_name=directory,
            metadata={'created_by': 'airflow', 'purpose': 'data_processing'}
        )


def upload_with_metadata():
    """Upload file with custom metadata to ADLS Gen2."""
    hook = AzureDataLakeStorageV2Hook(adls_conn_id='adls_v2_conn')

    # Upload to specific directory
    file_client = hook.upload_file_to_directory(
        file_system_name='data-lake',
        directory_name='raw',
        file_name='sales_data.json',
        file_path='/tmp/sales_data.json',
        overwrite=True
    )

    # Set custom metadata
    file_client.set_metadata({
        'source': 'sales_system',
        'format': 'json',
        'upload_date': datetime.now().isoformat()
    })


def list_and_process():
    """List files and process them."""
    hook = AzureDataLakeStorageV2Hook(adls_conn_id='adls_v2_conn')

    # List files in raw directory (each entry is a remote path)
    files = hook.list_files_directory(
        file_system_name='data-lake',
        directory_name='raw'
    )

    for file_path in files:
        print(f"Processing file: {file_path}")
        # File processing logic here

    # Move processed files to archive
    # (Implementation would involve download, process, upload to processed/)


dag = DAG(
    'adls_gen2_advanced_workflow',
    default_args={
        'owner': 'data-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=3)
    },
    description='Advanced ADLS Gen2 workflow with directory management',
    schedule_interval=timedelta(hours=6),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Setup directory structure
setup_task = PythonOperator(
    task_id='setup_directories',
    python_callable=setup_adls_structure,
    dag=dag
)

# Upload data files
upload_task = PythonOperator(
    task_id='upload_with_metadata',
    python_callable=upload_with_metadata,
    dag=dag
)

# List and process files
list_task = ADLSListOperator(
    task_id='list_raw_files',
    azure_data_lake_conn_id='adls_v2_conn',
    path='raw/',
    dag=dag
)

process_task = PythonOperator(
    task_id='process_files',
    python_callable=list_and_process,
    dag=dag
)

# Clean up old files
cleanup_task = ADLSDeleteOperator(
    task_id='cleanup_old_files',
    azure_data_lake_conn_id='adls_v2_conn',
    path='archive/old/',
    recursive=True,
    ignore_not_found=True,
    dag=dag
)

setup_task >> upload_task >> list_task >> process_task >> cleanup_task
```

### Filesystem Interface Usage

```python
import pandas as pd

from airflow.providers.microsoft.azure.fs.adls import get_fs


def use_fsspec_interface():
    """Use fsspec interface for data processing."""
    # Get filesystem instance from the Airflow connection
    fs = get_fs(
        conn_id='adls_v2_conn',
        storage_options={'account_name': 'mystorageaccount'}
    )

    # Read a file with pandas directly through the filesystem
    with fs.open('data-lake/raw/sales_data.csv') as source:
        df = pd.read_csv(source)

    # Process data
    processed_df = df.groupby('region').sum()

    # Write the result back through the same filesystem
    with fs.open('data-lake/processed/sales_summary.csv', 'w') as target:
        processed_df.to_csv(target)

    # List files using fsspec
    files = fs.ls('data-lake/processed/')
    print(f"Processed files: {files}")
```

## Connection Configuration

### ADLS Gen1 Connection (`azure_data_lake`)

Configure Azure Data Lake Storage Gen1 connections in Airflow:

```python
# Connection configuration for ADLS Gen1
{
    "conn_id": "azure_data_lake_default",
    "conn_type": "azure_data_lake",
    "host": "mydatalake.azuredatalakestore.net",
    "extra": {
        "tenant_id": "your-tenant-id",
        "client_id": "your-client-id",
        "client_secret": "your-client-secret"
    }
}
```
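
One way to register such a connection programmatically is through Airflow's `Connection` model and a metadata-database session. This is a minimal sketch (connections are more commonly created via the UI, CLI, or environment variables); field names mirror the dictionary above and all values are placeholders.

```python
import json

from airflow.models import Connection
from airflow.utils.session import create_session

# Build the Gen1 connection object; extras are stored as a JSON string.
gen1_conn = Connection(
    conn_id="azure_data_lake_default",
    conn_type="azure_data_lake",
    host="mydatalake.azuredatalakestore.net",
    extra=json.dumps(
        {
            "tenant_id": "your-tenant-id",
            "client_id": "your-client-id",
            "client_secret": "your-client-secret",
        }
    ),
)

# Persist the connection in the Airflow metadata database.
with create_session() as session:
    session.add(gen1_conn)
```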

658

659

### ADLS Gen2 Connection (`adls`)

660

661

Configure Azure Data Lake Storage Gen2 connections in Airflow:

662

663

```python
# Connection configuration for ADLS Gen2
{
    "conn_id": "adls_default",
    "conn_type": "adls",
    "login": "mystorageaccount",  # Storage account name
    "extra": {
        "account_url": "https://mystorageaccount.dfs.core.windows.net",
        "tenant_id": "your-tenant-id",
        "client_id": "your-client-id",
        "client_secret": "your-client-secret"
    }
}
```

677

678

### Authentication Methods

679

680

Both ADLS Gen1 and Gen2 support multiple authentication methods:

681

682

1. **Service Principal Authentication**:

   ```python
   extra = {
       "tenant_id": "your-tenant-id",
       "client_id": "your-client-id",
       "client_secret": "your-client-secret"
   }
   ```

2. **Managed Identity Authentication**:

   ```python
   extra = {
       "managed_identity_client_id": "your-managed-identity-client-id"
   }
   ```

3. **Account Key Authentication** (Gen2 only):

   ```python
   extra = {
       "account_key": "your-storage-account-key"
   }
   ```

4. **SAS Token Authentication** (Gen2 only):

   ```python
   extra = {
       "sas_token": "your-sas-token"
   }
   ```
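
These settings can also be injected without touching the metadata database by setting an `AIRFLOW_CONN_<CONN_ID>` environment variable. A sketch using Airflow's JSON connection format (which requires Airflow 2.3 or newer); the connection ID and credential values are placeholders:

```python
import json
import os

# Airflow resolves connections from AIRFLOW_CONN_<CONN_ID> environment variables.
os.environ["AIRFLOW_CONN_ADLS_DEFAULT"] = json.dumps(
    {
        "conn_type": "adls",
        "login": "mystorageaccount",
        "extra": {
            "tenant_id": "your-tenant-id",
            "client_id": "your-client-id",
            "client_secret": "your-client-secret",
        },
    }
)
```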

711

712

## Error Handling

713

714

### Common Exception Patterns

715

716

```python
from azure.core.exceptions import ResourceNotFoundError, ResourceExistsError
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeStorageV2Hook


def robust_file_operations():
    """Demonstrate error handling patterns."""
    hook = AzureDataLakeStorageV2Hook(adls_conn_id='adls_conn')

    try:
        # Attempt file operation
        hook.upload_file(
            file_system_name='data-lake',
            file_name='data.csv',
            file_path='/tmp/data.csv'
        )
    except ResourceExistsError:
        print("File already exists, skipping upload")
    except ResourceNotFoundError:
        print("File system doesn't exist, creating it first")
        hook.create_file_system('data-lake')
        # Retry upload
        hook.upload_file(
            file_system_name='data-lake',
            file_name='data.csv',
            file_path='/tmp/data.csv'
        )
    except Exception as e:
        print(f"Unexpected error: {e}")
        raise
```

### Connection Testing

```python
from airflow.providers.microsoft.azure.hooks.data_lake import (
    AzureDataLakeHook,
    AzureDataLakeStorageV2Hook,
)


def test_adls_connections():
    """Test both ADLS Gen1 and Gen2 connections."""

    # Test Gen1 connection by authenticating and listing the root directory
    try:
        gen1_hook = AzureDataLakeHook(azure_data_lake_conn_id='adls_gen1_conn')
        gen1_hook.get_conn()
        gen1_hook.list('/')
        print("ADLS Gen1 connection successful")
    except Exception as e:
        print(f"ADLS Gen1 connection failed: {e}")

    # Test Gen2 connection using the hook's built-in test
    try:
        gen2_hook = AzureDataLakeStorageV2Hook(adls_conn_id='adls_gen2_conn')
        success, message = gen2_hook.test_connection()
        print(f"ADLS Gen2 connection {'succeeded' if success else 'failed'}: {message}")
    except Exception as e:
        print(f"ADLS Gen2 connection failed: {e}")
```

## Performance Considerations

### Optimizing File Operations

```python
from airflow.providers.microsoft.azure.hooks.data_lake import (
    AzureDataLakeHook,
    AzureDataLakeStorageV2Hook,
)


def optimized_bulk_upload():
    """Optimize bulk file uploads to ADLS."""
    hook = AzureDataLakeHook(azure_data_lake_conn_id='adls_conn')

    # Use multiple threads and larger buffers for large files
    hook.upload_file(
        local_path='/tmp/large_file.csv',
        remote_path='/data/large_file.csv',
        nthreads=128,        # Increase threads for better throughput
        buffersize=8388608,  # 8MB buffer for large files
        blocksize=8388608
    )


def batch_directory_operations():
    """Batch operations for better performance."""
    hook = AzureDataLakeStorageV2Hook(adls_conn_id='adls_v2_conn')

    # Get the file system client once and reuse it for every upload
    fs_client = hook.get_file_system('data-lake')

    # Batch multiple operations
    files_to_upload = ['file1.csv', 'file2.json', 'file3.parquet']

    for filename in files_to_upload:
        file_client = fs_client.create_file(f'batch/{filename}')
        with open(f'/tmp/{filename}', 'rb') as data:
            file_client.upload_data(data, overwrite=True)
```

This comprehensive documentation covers all Azure Data Lake Storage capabilities in the Apache Airflow Microsoft Azure Provider, including both Gen1 and Gen2 implementations, filesystem interfaces, and practical usage patterns for data lake operations.