
# Azure Data Factory


Execute and monitor Azure Data Factory pipelines, with support for pipeline run management, status tracking, and factory administration. Both synchronous and asynchronous operations are supported for long-running pipeline executions.


## Capabilities


### Data Factory Hook


Primary interface for Azure Data Factory operations, providing authenticated connections and pipeline management functionality.


```python { .api }
class AzureDataFactoryHook(BaseHook):
    """
    Hook for Azure Data Factory operations.

    Provides methods for pipeline execution, monitoring, and factory management.
    Supports multiple authentication methods and connection configurations.
    """

    def get_conn(self) -> DataFactoryManagementClient:
        """Get authenticated Azure Data Factory Management client."""

    def refresh_conn(self) -> DataFactoryManagementClient:
        """Refresh the Data Factory Management client connection."""

    def get_factory(
        self,
        resource_group_name: str,
        factory_name: str,
        **config: Any
    ) -> Factory | None:
        """
        Get Azure Data Factory details.

        Args:
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            **config: Additional configuration parameters

        Returns:
            Factory: Data Factory details or None if not found
        """

    def update_factory(
        self,
        factory: Factory,
        resource_group_name: str,
        factory_name: str,
        **config: Any
    ) -> Factory:
        """
        Update Azure Data Factory configuration.

        Args:
            factory (Factory): Factory configuration object
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            **config: Additional configuration parameters

        Returns:
            Factory: Updated factory details
        """

    def create_factory(
        self,
        factory: Factory,
        resource_group_name: str,
        factory_name: str,
        **config: Any
    ) -> Factory:
        """
        Create a new Azure Data Factory.

        Args:
            factory (Factory): Factory configuration object
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            **config: Additional configuration parameters

        Returns:
            Factory: Created factory details
        """

    def delete_factory(
        self,
        resource_group_name: str,
        factory_name: str,
        **config: Any
    ) -> None:
        """
        Delete Azure Data Factory.

        Args:
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name to delete
            **config: Additional configuration parameters
        """

    def run_pipeline(
        self,
        pipeline_name: str,
        resource_group_name: str,
        factory_name: str,
        reference_pipeline_run_id: str | None = None,
        is_recovery: bool | None = None,
        start_activity_name: str | None = None,
        start_from_failure: bool | None = None,
        parameters: dict[str, Any] | None = None,
        **config: Any
    ) -> CreateRunResponse:
        """
        Execute Azure Data Factory pipeline.

        Args:
            pipeline_name (str): Name of pipeline to execute
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            reference_pipeline_run_id (str): Reference run ID for recovery
            is_recovery (bool): Whether this is a recovery run
            start_activity_name (str): Activity to start from
            start_from_failure (bool): Start from previous failure point
            parameters (dict): Pipeline parameters
            **config: Additional configuration parameters

        Returns:
            CreateRunResponse: Pipeline run details with run ID
        """

    def get_pipeline_run(
        self,
        run_id: str,
        resource_group_name: str,
        factory_name: str,
        **config: Any
    ) -> PipelineRun:
        """
        Get pipeline run details and status.

        Args:
            run_id (str): Pipeline run ID
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            **config: Additional configuration parameters

        Returns:
            PipelineRun: Pipeline run details and status
        """

    def get_pipeline_run_status(
        self,
        run_id: str,
        resource_group_name: str,
        factory_name: str,
        **config: Any
    ) -> str:
        """
        Get current status of pipeline run.

        Args:
            run_id (str): Pipeline run ID
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            **config: Additional configuration parameters

        Returns:
            str: Current pipeline run status
        """

    def wait_for_pipeline_run_status(
        self,
        run_id: str,
        expected_statuses: str | set[str],
        resource_group_name: str,
        factory_name: str,
        check_interval: int = 60,
        timeout: int = 60 * 60 * 24 * 7,
        **config: Any
    ) -> bool:
        """
        Wait for pipeline run to reach expected status.

        Args:
            run_id (str): Pipeline run ID
            expected_statuses (str | set[str]): Expected status(es) to wait for
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            check_interval (int): Polling interval in seconds
            timeout (int): Maximum wait time in seconds
            **config: Additional configuration parameters

        Returns:
            bool: True if expected status reached, False if timeout
        """

    def cancel_pipeline_run(
        self,
        run_id: str,
        resource_group_name: str,
        factory_name: str,
        **config: Any
    ) -> None:
        """
        Cancel a running pipeline.

        Args:
            run_id (str): Pipeline run ID to cancel
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            **config: Additional configuration parameters
        """

    def test_connection(self) -> tuple[bool, str]:
        """Test the Data Factory connection."""
```
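
Factory management calls follow the same pattern as the pipeline methods. A minimal sketch, assuming an `adf_default` Airflow connection and placeholder resource names:

```python
from airflow.providers.microsoft.azure.hooks.data_factory import AzureDataFactoryHook

hook = AzureDataFactoryHook(azure_data_factory_conn_id="adf_default")

# Verify credentials before doing any work
ok, message = hook.test_connection()
if not ok:
    raise RuntimeError(f"Data Factory connection failed: {message}")

# Fetch factory details; returns None when the factory does not exist
factory = hook.get_factory(
    resource_group_name="my-resource-group",  # placeholder
    factory_name="my-data-factory",  # placeholder
)
```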


### Async Data Factory Hook


Asynchronous version for non-blocking operations.


```python { .api }
class AzureDataFactoryAsyncHook(AzureDataFactoryHook):
    """Async hook for Azure Data Factory operations."""

    async def get_conn(self) -> DataFactoryManagementClient:
        """Get authenticated async Data Factory Management client."""

    async def get_pipeline_run_status(
        self,
        run_id: str,
        resource_group_name: str,
        factory_name: str
    ) -> str:
        """Async get pipeline run status."""
```
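
A minimal sketch of polling a run's status without blocking the event loop, again with placeholder connection and resource names:

```python
import asyncio

from airflow.providers.microsoft.azure.hooks.data_factory import AzureDataFactoryAsyncHook

async def poll_status(run_id: str) -> str:
    hook = AzureDataFactoryAsyncHook(azure_data_factory_conn_id="adf_default")
    # Awaiting the call yields control while the HTTP request is in flight
    return await hook.get_pipeline_run_status(
        run_id=run_id,
        resource_group_name="my-resource-group",  # placeholder
        factory_name="my-data-factory",  # placeholder
    )

status = asyncio.run(poll_status("<run-id>"))
```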


### Pipeline Run Operator


Operator for executing Azure Data Factory pipelines.


```python { .api }
class AzureDataFactoryRunPipelineOperator(BaseOperator):
    """
    Execute Azure Data Factory pipeline.

    Runs a pipeline and optionally waits for completion with status monitoring.
    """

    def __init__(
        self,
        pipeline_name: str,
        resource_group_name: str,
        factory_name: str,
        azure_data_factory_conn_id: str = "azure_data_factory_default",
        wait_for_termination: bool = True,
        reference_pipeline_run_id: str | None = None,
        is_recovery: bool | None = None,
        start_activity_name: str | None = None,
        start_from_failure: bool | None = None,
        parameters: dict[str, Any] | None = None,
        timeout: int = 60 * 60 * 24 * 7,
        check_interval: int = 60,
        deferrable: bool = False,
        **kwargs
    ):
        """
        Initialize Data Factory pipeline operator.

        Args:
            pipeline_name (str): Name of pipeline to execute
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            azure_data_factory_conn_id (str): Airflow connection ID
            wait_for_termination (bool): Wait for pipeline completion
            reference_pipeline_run_id (str): Reference run for recovery
            is_recovery (bool): Whether this is a recovery run
            start_activity_name (str): Activity to start from
            start_from_failure (bool): Start from failure point
            parameters (dict): Pipeline parameters
            timeout (int): Maximum wait time in seconds
            check_interval (int): Status check interval in seconds
            deferrable (bool): Use async execution
        """
```


### Pipeline Status Sensor


Sensor that monitors Azure Data Factory pipeline execution status.


```python { .api }
class AzureDataFactoryPipelineRunStatusSensor(BaseSensorOperator):
    """
    Sensor that waits for Azure Data Factory pipeline run to reach target status.

    Monitors a pipeline run until it reaches one of the specified target statuses
    or times out.
    """

    def __init__(
        self,
        run_id: str,
        resource_group_name: str,
        factory_name: str,
        azure_data_factory_conn_id: str = "azure_data_factory_default",
        target_status: str | list[str] = AzureDataFactoryPipelineRunStatus.SUCCEEDED,
        **kwargs
    ):
        """
        Initialize Data Factory pipeline status sensor.

        Args:
            run_id (str): Pipeline run ID to monitor
            resource_group_name (str): Azure resource group name
            factory_name (str): Data Factory name
            azure_data_factory_conn_id (str): Airflow connection ID
            target_status (str | list[str]): Target status(es) to wait for
        """

    def poke(self, context: dict) -> bool:
        """Check if pipeline run has reached target status."""
```


### Async Pipeline Triggers


Deferrable triggers for pipeline monitoring.


```python { .api }
class AzureDataFactoryTrigger(BaseTrigger):
    """General async trigger for Azure Data Factory operations."""

    def __init__(
        self,
        run_id: str,
        resource_group_name: str,
        factory_name: str,
        conn_id: str,
        end_time: datetime,
        check_interval: int = 60,
        **kwargs
    ):
        """
        Initialize Data Factory trigger.

        Args:
            run_id (str): Pipeline run ID
            resource_group_name (str): Resource group name
            factory_name (str): Factory name
            conn_id (str): Connection ID
            end_time (datetime): Maximum end time
            check_interval (int): Polling interval in seconds
        """


class ADFPipelineRunStatusSensorTrigger(BaseTrigger):
    """Async trigger for ADF pipeline run status monitoring."""

    def __init__(
        self,
        run_id: str,
        resource_group_name: str,
        factory_name: str,
        target_status: str | list[str],
        conn_id: str,
        poke_interval: int = 60,
        **kwargs
    ):
        """
        Initialize pipeline status sensor trigger.

        Args:
            run_id (str): Pipeline run ID
            resource_group_name (str): Resource group name
            factory_name (str): Factory name
            target_status (str | list[str]): Target status(es)
            conn_id (str): Connection ID
            poke_interval (int): Polling interval in seconds
        """
```
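
Triggers are normally created for you by the deferrable operator and sensor, but a custom deferrable operator can hand control to one directly. The sketch below follows the constructor documented above; the operator class name is hypothetical, and the event payload shape in `execute_complete` is an assumption to adapt to the trigger's actual events:

```python
from datetime import datetime, timedelta, timezone
from typing import Any

from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.hooks.data_factory import AzureDataFactoryHook
from airflow.providers.microsoft.azure.triggers.data_factory import AzureDataFactoryTrigger


class DeferredPipelineRunOperator(BaseOperator):
    """Hypothetical operator: start a run, then defer until it finishes."""

    def execute(self, context: dict) -> None:
        hook = AzureDataFactoryHook(azure_data_factory_conn_id="adf_default")
        run = hook.run_pipeline(
            pipeline_name="ETL_Pipeline",
            resource_group_name="analytics-rg",
            factory_name="analytics-adf",
        )
        # Free the worker slot; the triggerer polls the run asynchronously
        self.defer(
            trigger=AzureDataFactoryTrigger(
                run_id=run.run_id,
                resource_group_name="analytics-rg",
                factory_name="analytics-adf",
                conn_id="adf_default",
                end_time=datetime.now(timezone.utc) + timedelta(hours=2),
                check_interval=60,
            ),
            method_name="execute_complete",
        )

    def execute_complete(self, context: dict, event: dict[str, Any]) -> None:
        # Resumed once the trigger fires its final event; payload keys assumed
        if event.get("status") != "success":
            raise RuntimeError(event.get("message", "pipeline run failed"))
```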


### Pipeline Run Link


Extra link for viewing pipeline runs in Azure portal.


```python { .api }
class AzureDataFactoryPipelineRunLink(BaseOperatorLink):
    """Link to Azure Data Factory pipeline run in Azure portal."""

    name: str = "Monitor Pipeline Run"

    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str:
        """Get link URL to pipeline run in Azure portal."""
```
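
Operators surface this link by declaring it in the standard `operator_extra_links` attribute. A brief sketch with a hypothetical operator, assuming the link class is importable from the provider's operators module:

```python
from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.operators.data_factory import (
    AzureDataFactoryPipelineRunLink,
)


class MyAdfOperator(BaseOperator):
    # Airflow renders a "Monitor Pipeline Run" button on task instances
    operator_extra_links = (AzureDataFactoryPipelineRunLink(),)
```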


### Status Constants and Exceptions


```python { .api }
class AzureDataFactoryPipelineRunStatus:
    """Azure Data Factory pipeline run status constants."""

    QUEUED: str = "Queued"
    IN_PROGRESS: str = "InProgress"
    SUCCEEDED: str = "Succeeded"
    FAILED: str = "Failed"
    CANCELLING: str = "Cancelling"
    CANCELLED: str = "Cancelled"


class AzureDataFactoryPipelineRunException(AirflowException):
    """Exception raised for Data Factory pipeline run errors."""
```
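
These constants pair naturally with `wait_for_pipeline_run_status`, for example to wait for any terminal state rather than success alone. A minimal sketch with placeholder resource names:

```python
from airflow.providers.microsoft.azure.hooks.data_factory import (
    AzureDataFactoryHook,
    AzureDataFactoryPipelineRunStatus,
)

hook = AzureDataFactoryHook(azure_data_factory_conn_id="adf_default")

# Any terminal state ends the wait, not just success
terminal_statuses = {
    AzureDataFactoryPipelineRunStatus.SUCCEEDED,
    AzureDataFactoryPipelineRunStatus.FAILED,
    AzureDataFactoryPipelineRunStatus.CANCELLED,
}
finished = hook.wait_for_pipeline_run_status(
    run_id="<run-id>",
    expected_statuses=terminal_statuses,
    resource_group_name="my-resource-group",  # placeholder
    factory_name="my-data-factory",  # placeholder
)
```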


## Usage Examples


### Basic Pipeline Execution


```python
from airflow.providers.microsoft.azure.hooks.data_factory import AzureDataFactoryHook

# Initialize hook
adf_hook = AzureDataFactoryHook(azure_data_factory_conn_id='adf_default')

# Run pipeline
run_response = adf_hook.run_pipeline(
    pipeline_name='MyDataPipeline',
    resource_group_name='my-resource-group',
    factory_name='my-data-factory',
    parameters={'inputPath': '/data/input/', 'outputPath': '/data/output/'}
)

# Get run ID
run_id = run_response.run_id

# Monitor pipeline status
status = adf_hook.get_pipeline_run_status(
    run_id=run_id,
    resource_group_name='my-resource-group',
    factory_name='my-data-factory'
)

# Wait for completion (expected_statuses takes a string or a set of statuses)
success = adf_hook.wait_for_pipeline_run_status(
    run_id=run_id,
    expected_statuses={'Succeeded'},
    resource_group_name='my-resource-group',
    factory_name='my-data-factory',
    check_interval=30,
    timeout=1800  # 30 minutes
)
```


### Using in Airflow DAGs


```python
from airflow import DAG
from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
from datetime import datetime, timedelta

dag = DAG(
    'adf_pipeline_example',
    default_args={
        'owner': 'data-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    description='Execute ADF pipeline',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1)
)

# Execute pipeline
run_pipeline = AzureDataFactoryRunPipelineOperator(
    task_id='run_etl_pipeline',
    pipeline_name='ETL_Pipeline',
    resource_group_name='analytics-rg',
    factory_name='analytics-adf',
    parameters={
        'source_table': 'raw_data',
        'target_table': 'processed_data',
        'batch_date': '{{ ds }}'
    },
    wait_for_termination=False,  # Don't wait; use the sensor instead
    azure_data_factory_conn_id='adf_default',
    dag=dag
)

# Monitor the run with a sensor; the operator pushes the run ID to XCom under the "run_id" key
wait_for_completion = AzureDataFactoryPipelineRunStatusSensor(
    task_id='wait_for_pipeline_completion',
    run_id='{{ ti.xcom_pull(task_ids="run_etl_pipeline", key="run_id") }}',
    resource_group_name='analytics-rg',
    factory_name='analytics-adf',
    target_status='Succeeded',
    timeout=3600,  # 1 hour timeout
    poke_interval=60,  # Check every minute
    dag=dag
)

run_pipeline >> wait_for_completion
```


### Deferrable Pipeline Execution


```python
# Using deferrable mode for long-running pipelines
run_pipeline_async = AzureDataFactoryRunPipelineOperator(
    task_id='run_long_pipeline',
    pipeline_name='LongRunningETL',
    resource_group_name='analytics-rg',
    factory_name='analytics-adf',
    deferrable=True,  # Use async execution via the triggerer
    timeout=7200,  # 2 hours
    check_interval=120,  # Check every 2 minutes
    dag=dag
)
```

513

514

## Connection Configuration

515

516

Azure Data Factory connections require specific authentication and resource information; a programmatic setup sketch follows the field lists below.

517

518

**Connection Type**: `azure_data_factory`

**Required Fields**:
- `resource_group_name`: Azure resource group containing the Data Factory
- `factory_name`: Name of the Azure Data Factory
- `subscription_id`: Azure subscription ID

**Authentication Options**:
- **Service Principal**: Use client credentials
- **Managed Identity**: Use Azure managed identity
- **DefaultAzureCredential**: Use Azure SDK default credential chain

**Connection Fields**:
- `client_id`: Service principal client ID
- `client_secret`: Service principal secret
- `tenant_id`: Azure tenant ID
- `subscription_id`: Azure subscription ID
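
As an illustration, a connection can also be created programmatically rather than through the Airflow UI or CLI. This is a sketch only; the extra-field keys below mirror the fields listed above but are an assumption and may differ across provider versions:

```python
import json

from airflow.models.connection import Connection

# Hypothetical extra-field keys; check your provider version's connection form
adf_conn = Connection(
    conn_id="adf_default",
    conn_type="azure_data_factory",
    login="<client_id>",
    password="<client_secret>",
    extra=json.dumps({
        "tenantId": "<tenant_id>",
        "subscriptionId": "<subscription_id>",
        "resource_group_name": "my-resource-group",
        "factory_name": "my-data-factory",
    }),
)
```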

## Error Handling


The Azure Data Factory integration includes comprehensive error handling for pipeline execution failures, authentication issues, and resource access problems. All exceptions inherit from standard Airflow exception classes and carry detailed messages for troubleshooting.
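
As a sketch, hook-level calls can be wrapped to handle these exceptions explicitly (resource names are placeholders):

```python
from airflow.providers.microsoft.azure.hooks.data_factory import (
    AzureDataFactoryHook,
    AzureDataFactoryPipelineRunException,
)

hook = AzureDataFactoryHook(azure_data_factory_conn_id="adf_default")

try:
    hook.wait_for_pipeline_run_status(
        run_id="<run-id>",
        expected_statuses="Succeeded",
        resource_group_name="my-resource-group",
        factory_name="my-data-factory",
        timeout=1800,
    )
except AzureDataFactoryPipelineRunException as err:
    # Raised for Data Factory pipeline run errors (see the exception above)
    print(f"Pipeline run did not succeed: {err}")
```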


Together, these hooks, operators, sensors, and triggers provide robust pipeline orchestration for Azure Data Factory within Airflow, with support for complex ETL workflows, parameter passing, error handling, and run monitoring.