# Azure Blob Storage

Comprehensive Azure Blob Storage integration providing full blob operations, container management, file transfers, and monitoring capabilities. Supports both synchronous and asynchronous operations with extensive configuration options.

## Capabilities

### Blob Storage Hook

The primary interface for Azure Blob Storage operations, providing authenticated connections and core blob functionality.

```python { .api }
class WasbHook(AzureBaseHook):
    """
    Hook for Azure Blob Storage (WASB) operations.

    Provides methods for blob operations, container management, and file transfers.
    Supports multiple authentication methods and connection configurations.
    """

    def get_conn(self) -> BlobServiceClient:
        """Get authenticated Azure Blob Service client."""

    def check_for_blob(self, container_name: str, blob_name: str, **kwargs) -> bool:
        """
        Check if a blob exists in the specified container.

        Args:
            container_name (str): Name of the container
            blob_name (str): Name of the blob to check

        Returns:
            bool: True if blob exists, False otherwise
        """

    def load_file(
        self,
        file_path: str,
        container_name: str,
        blob_name: str,
        **kwargs
    ) -> None:
        """
        Upload a local file to Azure Blob Storage.

        Args:
            file_path (str): Path to local file
            container_name (str): Target container name
            blob_name (str): Target blob name
        """

    def load_string(
        self,
        string_data: str,
        container_name: str,
        blob_name: str,
        **kwargs
    ) -> None:
        """
        Upload string data to Azure Blob Storage.

        Args:
            string_data (str): String data to upload
            container_name (str): Target container name
            blob_name (str): Target blob name
        """

    def check_for_prefix(self, container_name: str, prefix: str, **kwargs) -> bool:
        """
        Check if any blobs exist with the given prefix.

        Args:
            container_name (str): Name of the container
            prefix (str): Prefix to search for

        Returns:
            bool: True if blobs with prefix exist, False otherwise
        """

    def get_blobs_list(
        self,
        container_name: str,
        prefix: str | None = None,
        include: list | None = None,
        delimiter: str = "/",
        **kwargs
    ) -> list:
        """
        List blobs in container with optional prefix filtering.

        Args:
            container_name (str): Name of the container
            prefix (str, optional): Filter blobs by prefix
            include (list, optional): Additional properties to include
            delimiter (str): Delimiter for blob hierarchy

        Returns:
            list: List of blob names
        """

    def get_blobs_list_recursive(
        self,
        container_name: str,
        prefix: str | None = None,
        include: list | None = None,
        endswith: str = "",
        **kwargs
    ) -> list:
        """
        Recursively list all blobs in container.

        Args:
            container_name (str): Name of the container
            prefix (str, optional): Filter blobs by prefix
            include (list, optional): Additional properties to include
            endswith (str): Filter blobs ending with string

        Returns:
            list: List of all blob names recursively
        """

    def read_file(self, container_name: str, blob_name: str, **kwargs) -> bytes:
        """
        Download blob content as bytes.

        Args:
            container_name (str): Name of the container
            blob_name (str): Name of the blob

        Returns:
            bytes: Blob content as bytes
        """

    def get_file(self, file_path: str, container_name: str, blob_name: str, **kwargs) -> None:
        """
        Download blob to local file.

        Args:
            file_path (str): Local file path to save blob
            container_name (str): Name of the container
            blob_name (str): Name of the blob
        """

    def upload(
        self,
        container_name: str,
        blob_name: str,
        data: Any,
        blob_type: str = "BlockBlob",
        length: int | None = None,
        **kwargs
    ) -> dict[str, Any]:
        """
        Upload data to blob with advanced options.

        Args:
            container_name (str): Name of the container
            blob_name (str): Name of the blob
            data: Data to upload
            blob_type (str): Type of blob (BlockBlob, PageBlob, AppendBlob)
            length (int, optional): Length of data

        Returns:
            dict: Upload response metadata
        """

    def download(
        self,
        container_name: str,
        blob_name: str,
        offset: int | None = None,
        length: int | None = None,
        **kwargs
    ) -> StorageStreamDownloader:
        """
        Download blob with range support.

        Args:
            container_name (str): Name of the container
            blob_name (str): Name of the blob
            offset (int, optional): Start offset for partial download
            length (int, optional): Number of bytes to download

        Returns:
            StorageStreamDownloader: Stream downloader object
        """

    def create_container(self, container_name: str, **kwargs) -> None:
        """
        Create a new container.

        Args:
            container_name (str): Name of the container to create
        """

    def delete_container(self, container_name: str, **kwargs) -> None:
        """
        Delete an existing container.

        Args:
            container_name (str): Name of the container to delete
        """

    def delete_blobs(self, container_name: str, *blobs, **kwargs) -> None:
        """
        Delete multiple blobs from container.

        Args:
            container_name (str): Name of the container
            *blobs: Variable number of blob names to delete
        """

    def copy_blobs(
        self,
        source_container: str,
        source_blob: str,
        destination_container: str,
        destination_blob: str,
        **kwargs
    ) -> None:
        """
        Copy blob from source to destination.

        Args:
            source_container (str): Source container name
            source_blob (str): Source blob name
            destination_container (str): Destination container name
            destination_blob (str): Destination blob name
        """

    def delete_file(
        self,
        container_name: str,
        blob_name: str,
        is_prefix: bool = False,
        ignore_if_missing: bool = False,
        **kwargs
    ) -> None:
        """
        Delete blob(s) from container.

        Args:
            container_name (str): Name of the container
            blob_name (str): Name of the blob or prefix
            is_prefix (bool): Whether to delete all blobs with prefix
            ignore_if_missing (bool): Don't raise error if blob doesn't exist
        """
```
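
The difference between `get_blobs_list` (which takes a `delimiter`) and `get_blobs_list_recursive` (which takes `endswith`) is easiest to see on a small in-memory example. The sketch below is a pure-Python model of the two listing styles, not the hook's implementation. The function names `list_one_level` and `list_recursive` and the sample blob names are hypothetical, and it assumes that delimiter-based listing collapses deeper paths into a single `dir/` entry, as hierarchical listing in Azure Blob Storage does.

```python
def list_one_level(names, prefix="", delimiter="/"):
    """Model of delimiter-based listing: one hierarchy level under `prefix`."""
    entries = []
    for name in sorted(names):
        if not name.startswith(prefix):
            continue
        rest = name[len(prefix):]
        head, sep, _ = rest.partition(delimiter)
        # A name that still contains the delimiter is collapsed to its "virtual directory".
        entry = prefix + head + sep if sep else name
        if entry not in entries:
            entries.append(entry)
    return entries


def list_recursive(names, prefix="", endswith=""):
    """Model of recursive listing: every name under `prefix`, filtered by suffix."""
    return [n for n in sorted(names) if n.startswith(prefix) and n.endswith(endswith)]


# Hypothetical container contents.
blobs = ["raw/2024/a.csv", "raw/2024/b.json", "raw/readme.txt", "tmp/x.csv"]

one_level = list_one_level(blobs, prefix="raw/")
recursive_csv = list_recursive(blobs, prefix="raw/", endswith=".csv")

print(one_level)      # ['raw/2024/', 'raw/readme.txt']
print(recursive_csv)  # ['raw/2024/a.csv']
```

With a delimiter, everything below `raw/2024/` shows up as one entry; the recursive form returns the full blob names instead.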

### Async Blob Storage Hook

Async version of the WASB hook for deferrable operations and high-performance scenarios.

```python { .api }
class WasbAsyncHook(WasbHook):
    """
    Async hook for Azure Blob Storage operations.

    Provides async methods for blob operations with improved performance
    for deferrable tasks and high-concurrency scenarios.
    """

    async def get_async_conn(self) -> AsyncBlobServiceClient:
        """Get async Azure Blob Service client."""

    async def check_for_blob_async(self, container_name: str, blob_name: str, **kwargs) -> bool:
        """Async version of check_for_blob."""

    async def get_blobs_list_async(
        self,
        container_name: str,
        prefix: str | None = None,
        include: list | None = None,
        delimiter: str = "/",
        **kwargs
    ) -> list:
        """Async version of get_blobs_list."""

    async def check_for_prefix_async(self, container_name: str, prefix: str, **kwargs) -> bool:
        """Async version of check_for_prefix."""
```
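
One reason to reach for the async methods is that several existence checks can run concurrently. The following is a minimal sketch of that pattern, assuming only the `check_for_blob_async` signature shown above. `check_many` and `FakeAsyncHook` are hypothetical names; the fake hook stands in for `WasbAsyncHook` so the example runs without Airflow or Azure credentials.

```python
import asyncio


async def check_many(hook, container_name, blob_names):
    """Check several blobs concurrently; returns {blob_name: exists}."""
    results = await asyncio.gather(
        *(hook.check_for_blob_async(container_name, name) for name in blob_names)
    )
    return dict(zip(blob_names, results))


class FakeAsyncHook:
    """Hypothetical stand-in for WasbAsyncHook, backed by an in-memory set."""

    def __init__(self, existing):
        self.existing = set(existing)

    async def check_for_blob_async(self, container_name, blob_name, **kwargs):
        await asyncio.sleep(0)  # yield control, as a real network call would
        return (container_name, blob_name) in self.existing


fake = FakeAsyncHook({("input-data", "a.csv")})
found = asyncio.run(check_many(fake, "input-data", ["a.csv", "b.csv"]))
print(found)  # {'a.csv': True, 'b.csv': False}
```

In a real deployment the `hook` argument would be a `WasbAsyncHook` instance; nothing else in `check_many` would need to change.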


### Blob Deletion Operator

Operator for deleting blobs from Azure Blob Storage containers.

```python { .api }
class WasbDeleteBlobOperator(BaseOperator):
    """
    Delete blobs from Azure Blob Storage.

    Supports deleting single blobs or multiple blobs using prefix matching.
    """

    def __init__(
        self,
        container_name: str,
        blob_name: str,
        wasb_conn_id: str = "wasb_default",
        is_prefix: bool = False,
        ignore_if_missing: bool = False,
        **kwargs
    ):
        """
        Initialize blob deletion operator.

        Args:
            container_name (str): Azure container name
            blob_name (str): Blob name or prefix to delete
            wasb_conn_id (str): Airflow connection ID for Azure Blob Storage
            is_prefix (bool): Whether to delete all blobs with this prefix
            ignore_if_missing (bool): Don't fail if blob doesn't exist
        """
```
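
How `is_prefix` and `ignore_if_missing` interact can be modeled in a few lines. This is a sketch of the behavior described in the argument list, not the operator's code: `resolve_deletions` is a hypothetical helper, the container is a plain list of names, and `LookupError` stands in for whatever exception the operator actually raises.

```python
def resolve_deletions(existing, blob_name, is_prefix=False, ignore_if_missing=False):
    """Return the blob names a delete call would target, or raise if none match."""
    if is_prefix:
        # Prefix mode: every blob whose name starts with `blob_name` is a target.
        targets = [name for name in existing if name.startswith(blob_name)]
    else:
        # Exact mode: only the named blob, if it exists.
        targets = [blob_name] if blob_name in existing else []
    if not targets and not ignore_if_missing:
        raise LookupError(f"Blob(s) not found: {blob_name}")
    return targets


# Hypothetical container contents.
container = ["logs/2024-01-01.txt", "logs/2024-01-02.txt", "data.csv"]

by_prefix = resolve_deletions(container, "logs/", is_prefix=True)
missing_ok = resolve_deletions(container, "nope.txt", ignore_if_missing=True)

print(by_prefix)   # ['logs/2024-01-01.txt', 'logs/2024-01-02.txt']
print(missing_ok)  # []
```

Without `ignore_if_missing=True`, the second call would raise instead of returning an empty list.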

### Blob Existence Sensor

Sensor that waits for a blob to exist in Azure Blob Storage.

```python { .api }
class WasbBlobSensor(BaseSensorOperator):
    """
    Sensor that waits for a blob to exist in Azure Blob Storage.

    Polls the blob storage container at regular intervals until the specified
    blob is found or timeout is reached.
    """

    def __init__(
        self,
        container_name: str,
        blob_name: str,
        wasb_conn_id: str = "wasb_default",
        check_options: dict | None = None,
        **kwargs
    ):
        """
        Initialize blob sensor.

        Args:
            container_name (str): Azure container name to monitor
            blob_name (str): Blob name to wait for
            wasb_conn_id (str): Airflow connection ID for Azure Blob Storage
            check_options (dict): Additional options for blob checking
        """

    def poke(self, context: dict) -> bool:
        """Check if the blob exists."""
```
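
The polling behavior described in the docstring (call `poke` at regular intervals until it succeeds or the timeout is reached) can be illustrated with a small loop. This is a simplified model rather than Airflow's scheduler logic: `wait_for` is a hypothetical function, and it returns `False` on timeout where Airflow would fail the task. A fake clock is injected so the example runs instantly.

```python
import time


def wait_for(poke, timeout, poke_interval, clock=time.monotonic, sleep=time.sleep):
    """Call `poke()` every `poke_interval` seconds until it returns True or `timeout` elapses."""
    deadline = clock() + timeout
    while True:
        if poke():
            return True
        if clock() + poke_interval > deadline:
            return False  # the next poke would land past the deadline
        sleep(poke_interval)


# Simulated run: the blob "appears" on the third poke.
now = [0.0]
poke_times = []


def fake_poke():
    poke_times.append(now[0])
    return len(poke_times) >= 3


def fake_sleep(seconds):
    now[0] += seconds  # advance the fake clock instead of sleeping


succeeded = wait_for(fake_poke, timeout=300, poke_interval=30,
                     clock=lambda: now[0], sleep=fake_sleep)
print(succeeded, poke_times)  # True [0.0, 30.0, 60.0]
```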

### Blob Prefix Sensor

Sensor that waits for blobs matching a prefix pattern.

```python { .api }
class WasbPrefixSensor(BaseSensorOperator):
    """
    Sensor that waits for blobs with a specific prefix in Azure Blob Storage.

    Useful for waiting for multiple files or files with unknown exact names
    but known prefix patterns.
    """

    def __init__(
        self,
        container_name: str,
        prefix: str,
        wasb_conn_id: str = "wasb_default",
        **kwargs
    ):
        """
        Initialize prefix sensor.

        Args:
            container_name (str): Azure container name to monitor
            prefix (str): Blob name prefix to match
            wasb_conn_id (str): Airflow connection ID for Azure Blob Storage
        """
```

### Async Blob Triggers

Deferrable triggers for blob monitoring that don't block worker slots.

```python { .api }
class WasbBlobSensorTrigger(BaseTrigger):
    """Async trigger for blob existence monitoring."""

    def __init__(
        self,
        container_name: str,
        blob_name: str,
        wasb_conn_id: str,
        poke_interval: float = 60,
        **kwargs
    ):
        """
        Initialize blob sensor trigger.

        Args:
            container_name (str): Container name to monitor
            blob_name (str): Blob name to wait for
            wasb_conn_id (str): Connection ID
            poke_interval (float): Polling interval in seconds
        """

class WasbPrefixSensorTrigger(BaseTrigger):
    """Async trigger for blob prefix monitoring."""

    def __init__(
        self,
        container_name: str,
        prefix: str,
        wasb_conn_id: str,
        poke_interval: float = 60,
        **kwargs
    ):
        """
        Initialize prefix sensor trigger.

        Args:
            container_name (str): Container name to monitor
            prefix (str): Blob prefix to match
            wasb_conn_id (str): Connection ID
            poke_interval (float): Polling interval in seconds
        """
```
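
A trigger avoids holding a worker slot because its waiting happens as `await` points on an event loop. The sketch below shows that polling shape under stated assumptions: `wait_for_blob` and `FlakyHook` are hypothetical names, the hook is duck-typed against `check_for_blob_async`, and a real trigger would emit an event from its `run` method rather than return a count.

```python
import asyncio


async def wait_for_blob(hook, container_name, blob_name, poke_interval=60):
    """Poll without blocking the event loop; returns the number of checks made."""
    checks = 0
    while True:
        checks += 1
        if await hook.check_for_blob_async(container_name, blob_name):
            return checks
        # Other coroutines can run while this one waits for the next poll.
        await asyncio.sleep(poke_interval)


class FlakyHook:
    """Hypothetical stand-in for WasbAsyncHook: the blob shows up on the third check."""

    def __init__(self):
        self.calls = 0

    async def check_for_blob_async(self, container_name, blob_name, **kwargs):
        self.calls += 1
        return self.calls >= 3


checks_needed = asyncio.run(
    wait_for_blob(FlakyHook(), "input-data", "daily-export.csv", poke_interval=0)
)
print(checks_needed)  # 3
```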

### File Transfer Operators

Transfer data between local filesystem and Azure Blob Storage.

```python { .api }
class LocalFilesystemToWasbOperator(BaseOperator):
    """Transfer files from local filesystem to Azure Blob Storage."""

    def __init__(
        self,
        file_path: str,
        container_name: str,
        blob_name: str,
        wasb_conn_id: str = "wasb_default",
        create_container: bool = False,
        **kwargs
    ):
        """
        Initialize local to WASB transfer operator.

        Args:
            file_path (str): Local file path to upload
            container_name (str): Target container name
            blob_name (str): Target blob name
            wasb_conn_id (str): Connection ID
            create_container (bool): Create container if it doesn't exist
        """

class SFTPToWasbOperator(BaseOperator):
    """Transfer files from SFTP server to Azure Blob Storage."""

    def __init__(
        self,
        sftp_source_path: str,
        container_name: str,
        blob_name: str,
        sftp_conn_id: str = "sftp_default",
        wasb_conn_id: str = "wasb_default",
        **kwargs
    ):
        """
        Initialize SFTP to WASB transfer operator.

        Args:
            sftp_source_path (str): Source file path on SFTP server
            container_name (str): Target container name
            blob_name (str): Target blob name
            sftp_conn_id (str): SFTP connection ID
            wasb_conn_id (str): WASB connection ID
        """

class S3ToAzureBlobStorageOperator(BaseOperator):
    """Transfer objects from AWS S3 to Azure Blob Storage."""

    def __init__(
        self,
        s3_source_key: str,
        container_name: str,
        blob_name: str,
        s3_bucket: str | None = None,
        aws_conn_id: str = "aws_default",
        wasb_conn_id: str = "wasb_default",
        **kwargs
    ):
        """
        Initialize S3 to Azure Blob transfer operator.

        Args:
            s3_source_key (str): S3 object key to transfer
            container_name (str): Target Azure container name
            blob_name (str): Target blob name
            s3_bucket (str): Source S3 bucket name
            aws_conn_id (str): AWS connection ID
            wasb_conn_id (str): WASB connection ID
        """
```

### Azure Blob Filesystem Interface

fsspec-compatible filesystem interface for Azure Blob Storage.

```python { .api }
def get_fs(conn_id: str | None, storage_options: dict[str, Any] | None = None) -> AbstractFileSystem:
    """
    Create Azure Blob FileSystem (fsspec-compatible).

    Args:
        conn_id (str): Airflow connection ID for Azure storage
        storage_options (dict): Additional storage configuration options

    Returns:
        AbstractFileSystem: fsspec filesystem for Azure Blob Storage
    """
```

### Logging Handler

Task log handler that writes Airflow logs to Azure Blob Storage.

```python { .api }
class WasbTaskHandler(FileTaskHandler):
    """Airflow task handler that writes logs to Azure Blob Storage."""

    def __init__(
        self,
        base_log_folder: str,
        wasb_log_folder: str,
        wasb_container: str,
        filename_template: str | None = None,
        **kwargs
    ):
        """
        Initialize WASB task handler.

        Args:
            base_log_folder (str): Base folder for local logs
            wasb_log_folder (str): WASB folder for remote logs
            wasb_container (str): WASB container for logs
            filename_template (str): Log filename template
        """

class WasbRemoteLogIO:
    """Low-level I/O operations for WASB logging."""

    def upload_log(self, log_content: str, remote_log_location: str) -> None:
        """Upload log content to WASB."""

    def download_log(self, remote_log_location: str) -> str:
        """Download log content from WASB."""
```

## Usage Examples

### Basic Blob Operations

```python
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

# Initialize hook
wasb_hook = WasbHook(wasb_conn_id='azure_default')

# Upload a file
wasb_hook.load_file(
    file_path='/path/to/local/file.txt',
    container_name='my-container',
    blob_name='uploaded-file.txt'
)

# Check if blob exists
exists = wasb_hook.check_for_blob(
    container_name='my-container',
    blob_name='uploaded-file.txt'
)

# Download blob content
content = wasb_hook.read_file(
    container_name='my-container',
    blob_name='uploaded-file.txt'
)

# Delete blob
wasb_hook.delete_file(
    container_name='my-container',
    blob_name='uploaded-file.txt'
)
```

### Using Sensors in DAGs

```python
from airflow import DAG
from airflow.providers.microsoft.azure.sensors.wasb import WasbBlobSensor, WasbPrefixSensor
from datetime import datetime

dag = DAG(
    'blob_monitoring_example',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily'
)

# Wait for specific blob
wait_for_file = WasbBlobSensor(
    task_id='wait_for_input_file',
    container_name='input-data',
    blob_name='daily-export.csv',
    wasb_conn_id='azure_default',
    timeout=300,
    poke_interval=30,
    dag=dag
)

# Wait for files with prefix
wait_for_batch = WasbPrefixSensor(
    task_id='wait_for_batch_files',
    container_name='batch-data',
    prefix='batch_2024_',
    wasb_conn_id='azure_default',
    dag=dag
)
```

## Connection Configuration

Azure Blob Storage connections support multiple authentication methods:

**Connection Type**: `wasb`

**Authentication Options**:
- **Account Key**: Use storage account name and key
- **SAS Token**: Use Shared Access Signature
- **Managed Identity**: Use Azure managed identity
- **Service Principal**: Use client credentials

**Connection Fields**:
- `account_name`: Azure storage account name
- `account_key`: Storage account access key (for key auth)
- `sas_token`: Shared access signature (for SAS auth)
- `client_id`: Service principal client ID
- `client_secret`: Service principal secret
- `tenant_id`: Azure tenant ID
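
As a quick sanity check before creating a connection, the field list above can be mapped to the authentication options it enables. The sketch below is illustrative only: `AUTH_REQUIREMENTS` and `usable_auth_methods` are hypothetical names, the grouping of fields per option is an assumption drawn from the two lists above, and it does not reproduce how the provider itself resolves credentials.

```python
# Fields assumed to be required for each option (an assumption, not the provider's rules).
AUTH_REQUIREMENTS = {
    "account_key": {"account_name", "account_key"},
    "sas_token": {"account_name", "sas_token"},
    "service_principal": {"account_name", "client_id", "client_secret", "tenant_id"},
}


def usable_auth_methods(fields):
    """Return the options whose assumed required fields are all present and non-empty."""
    present = {key for key, value in fields.items() if value}
    methods = [name for name, needed in AUTH_REQUIREMENTS.items() if needed <= present]
    # Managed identity carries no secret, so the account name alone is treated as enough here.
    if "account_name" in present:
        methods.append("managed_identity")
    return methods


# Hypothetical connection values; the SAS token is a placeholder.
conn_fields = {"account_name": "mystorageacct", "sas_token": "<sas-token>", "client_id": ""}
methods = usable_auth_methods(conn_fields)
print(methods)  # ['sas_token', 'managed_identity']
```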

## Error Handling

```python { .api }
# Custom exceptions for blob operations
class AzureBlobStorageException(AirflowException):
    """Base exception for Azure Blob Storage operations."""

class BlobNotFound(AzureBlobStorageException):
    """Raised when a blob is not found."""

class ContainerNotFound(AzureBlobStorageException):
    """Raised when a container is not found."""
```

The Azure Blob Storage integration provides comprehensive functionality for managing blob storage operations within Airflow workflows, with support for both simple file operations and complex data pipeline scenarios.