# Azure Synapse Analytics

Execute Spark jobs and manage pipeline operations on Azure Synapse Analytics for big data processing and analytics workloads. This integration covers both Spark batch processing and pipeline orchestration.

## Capabilities

### Base Synapse Hook

Foundation hook for Azure Synapse Analytics operations, providing common functionality and connection management.

```python { .api }
class BaseAzureSynapseHook(BaseHook):
    """
    Base hook for Azure Synapse Analytics operations.

    Provides common functionality and connection management for Synapse
    Spark and pipeline operations.
    """

    def get_conn(self) -> Any:
        """
        Get authenticated Azure Synapse client.

        Returns:
            Any: Synapse client instance
        """

    def test_connection(self) -> tuple[bool, str]:
        """
        Test the Azure Synapse Analytics connection.

        Returns:
            tuple[bool, str]: Success status and message
        """
```
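
Both concrete hooks inherit `test_connection()` from this base class, so a connection can be verified before any work is submitted. A minimal sketch, assuming an Airflow connection with ID `synapse_conn` already exists:

```python
# Minimal sketch: verify the Synapse connection before submitting work.
# The connection ID "synapse_conn" is a placeholder assumption.
from airflow.providers.microsoft.azure.hooks.synapse import AzureSynapseHook

hook = AzureSynapseHook(azure_synapse_conn_id="synapse_conn")
ok, message = hook.test_connection()
print(f"Connection OK: {ok} ({message})")
```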

### Synapse Spark Hook

Hook for Azure Synapse Spark operations providing Spark job execution and monitoring capabilities.

```python { .api }
class AzureSynapseHook(BaseAzureSynapseHook):
    """
    Hook for Azure Synapse Spark operations.

    Provides methods for submitting Spark jobs, monitoring execution,
    and managing Spark batch sessions on Azure Synapse Analytics.
    """

    def get_conn(self) -> SparkClient:
        """
        Get authenticated Synapse Spark client.

        Returns:
            SparkClient: Synapse Spark client instance
        """

    def run_spark_job(
        self,
        payload: dict[str, Any],
        **kwargs: Any
    ) -> dict[str, Any]:
        """
        Submit a Spark job to Azure Synapse Analytics.

        Args:
            payload (dict[str, Any]): Spark job configuration including:
                - name: Job name
                - file: Main application file (jar, py, etc.)
                - className: Main class name (for Scala/Java)
                - args: Application arguments
                - conf: Spark configuration
                - executorCount: Number of executors
                - executorCores: Cores per executor
                - executorMemory: Memory per executor
                - driverCores: Driver cores
                - driverMemory: Driver memory
            **kwargs: Additional job submission parameters

        Returns:
            dict[str, Any]: Job submission response with job ID and status
        """

    def get_job_run_status(
        self,
        job_id: int,
        **kwargs: Any
    ) -> str:
        """
        Get the current status of a Spark job.

        Args:
            job_id (int): Spark job ID
            **kwargs: Additional parameters

        Returns:
            str: Current job status (not_started, starting, running, idle, busy,
                shutting_down, error, dead, killed, success)
        """

    def wait_for_job_run_status(
        self,
        job_id: int,
        expected_statuses: list[str],
        check_interval: int = 30,
        timeout: int = 3600
    ) -> bool:
        """
        Wait for Spark job to reach expected status.

        Args:
            job_id (int): Spark job ID
            expected_statuses (list[str]): List of acceptable statuses
            check_interval (int): Check interval in seconds (default: 30)
            timeout (int): Timeout in seconds (default: 3600)

        Returns:
            bool: True if job reached expected status, False if timeout
        """

    def cancel_job_run(
        self,
        job_id: int,
        **kwargs: Any
    ) -> None:
        """
        Cancel a running Spark job.

        Args:
            job_id (int): Spark job ID to cancel
            **kwargs: Additional parameters
        """

    def get_job_logs(
        self,
        job_id: int,
        **kwargs: Any
    ) -> dict[str, Any]:
        """
        Get logs for a Spark job.

        Args:
            job_id (int): Spark job ID
            **kwargs: Additional parameters

        Returns:
            dict[str, Any]: Job logs including stdout, stderr, and driver logs
        """

    def list_spark_pools(self) -> list[dict[str, Any]]:
        """
        List available Spark pools in the workspace.

        Returns:
            list[dict[str, Any]]: List of Spark pool configurations
        """

    def get_spark_pool_details(
        self,
        spark_pool_name: str
    ) -> dict[str, Any]:
        """
        Get details of a specific Spark pool.

        Args:
            spark_pool_name (str): Name of the Spark pool

        Returns:
            dict[str, Any]: Spark pool configuration and status
        """
```
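
The hook can also be driven directly, outside an operator, for ad-hoc or scripted use. A minimal sketch of the submit/poll/log cycle described above; the connection ID and the `abfss://` path are placeholder assumptions:

```python
# Minimal sketch: submit a Spark batch job via the hook and block until it finishes.
# The connection ID and the abfss:// path below are placeholder assumptions.
from airflow.providers.microsoft.azure.hooks.synapse import AzureSynapseHook

hook = AzureSynapseHook(azure_synapse_conn_id="synapse_conn")

submission = hook.run_spark_job({
    "name": "adhoc-report",
    "file": "abfss://data@examplestorage.dfs.core.windows.net/scripts/report.py",
    "executorCount": 2,
    "executorCores": 2,
    "executorMemory": "4g",
})
job_id = submission["job_id"]

# Wait until the job reaches a terminal state, then inspect the outcome.
finished = hook.wait_for_job_run_status(
    job_id=job_id,
    expected_statuses=["success", "error", "dead", "killed"],
    check_interval=30,
    timeout=1800,
)
if not finished or hook.get_job_run_status(job_id) != "success":
    print(hook.get_job_logs(job_id).get("stderr", "no logs available"))
```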

### Synapse Pipeline Hook

Hook for Azure Synapse Pipeline operations providing pipeline execution and monitoring capabilities.

```python { .api }
class AzureSynapsePipelineHook(BaseAzureSynapseHook):
    """
    Hook for Azure Synapse Pipeline operations.

    Provides methods for running pipelines, monitoring execution,
    and managing pipeline runs on Azure Synapse Analytics.
    """

    def get_conn(self) -> ArtifactsClient:
        """
        Get authenticated Synapse artifacts client.

        Returns:
            ArtifactsClient: Synapse artifacts client instance
        """

    def run_pipeline(
        self,
        pipeline_name: str,
        **config: Any
    ) -> CreateRunResponse:
        """
        Run a pipeline in Azure Synapse Analytics.

        Args:
            pipeline_name (str): Name of the pipeline to run
            **config: Pipeline run configuration including:
                - parameters: Pipeline parameters
                - reference_pipeline_run_id: Reference run ID
                - is_recovery: Whether this is a recovery run

        Returns:
            CreateRunResponse: Pipeline run response with run ID
        """

    def get_pipeline_run(
        self,
        run_id: str,
        **kwargs: Any
    ) -> PipelineRun:
        """
        Get details of a pipeline run.

        Args:
            run_id (str): Pipeline run ID
            **kwargs: Additional parameters

        Returns:
            PipelineRun: Pipeline run details including status and metadata
        """

    def get_pipeline_run_status(
        self,
        run_id: str,
        **kwargs: Any
    ) -> str:
        """
        Get the current status of a pipeline run.

        Args:
            run_id (str): Pipeline run ID
            **kwargs: Additional parameters

        Returns:
            str: Current pipeline status (Queued, InProgress, Succeeded, Failed, Cancelled)
        """

    def refresh_conn(self) -> ArtifactsClient:
        """
        Refresh the Synapse artifacts connection.

        Returns:
            ArtifactsClient: Refreshed artifacts client instance
        """

    def wait_for_pipeline_run_status(
        self,
        run_id: str,
        expected_statuses: list[str],
        check_interval: int = 60,
        timeout: int = 7200
    ) -> bool:
        """
        Wait for pipeline run to reach expected status.

        Args:
            run_id (str): Pipeline run ID
            expected_statuses (list[str]): List of acceptable statuses
            check_interval (int): Check interval in seconds (default: 60)
            timeout (int): Timeout in seconds (default: 7200)

        Returns:
            bool: True if pipeline reached expected status, False if timeout
        """

    def cancel_run_pipeline(
        self,
        run_id: str,
        **kwargs: Any
    ) -> None:
        """
        Cancel a running pipeline.

        Args:
            run_id (str): Pipeline run ID to cancel
            **kwargs: Additional parameters
        """

    def get_pipeline_activities(
        self,
        run_id: str,
        **kwargs: Any
    ) -> list[dict[str, Any]]:
        """
        Get activity runs for a pipeline run.

        Args:
            run_id (str): Pipeline run ID
            **kwargs: Additional parameters including:
                - activity_name: Filter by activity name
                - activity_type: Filter by activity type

        Returns:
            list[dict[str, Any]]: List of activity run details
        """

    def get_pipeline_details(
        self,
        pipeline_name: str
    ) -> dict[str, Any]:
        """
        Get pipeline definition and metadata.

        Args:
            pipeline_name (str): Name of the pipeline

        Returns:
            dict[str, Any]: Pipeline definition and configuration
        """

    def list_pipelines(self) -> list[dict[str, Any]]:
        """
        List all pipelines in the workspace.

        Returns:
            list[dict[str, Any]]: List of pipeline definitions
        """
```
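
As with the Spark hook, the pipeline hook can be used directly for scripted runs. A minimal sketch tying together `run_pipeline`, `wait_for_pipeline_run_status`, and `get_pipeline_activities`; the connection ID, pipeline name, and parameter values are placeholder assumptions:

```python
# Minimal sketch: trigger a Synapse pipeline via the hook and inspect its activities.
# The connection ID, pipeline name, and parameters are placeholder assumptions.
from airflow.providers.microsoft.azure.hooks.synapse import AzureSynapsePipelineHook

hook = AzureSynapsePipelineHook(azure_synapse_conn_id="synapse_conn")

run = hook.run_pipeline("nightly-refresh", parameters={"processingDate": "2024-01-01"})

# Block until the run reaches a terminal state (or the timeout expires).
hook.wait_for_pipeline_run_status(
    run_id=run.run_id,
    expected_statuses=["Succeeded", "Failed", "Cancelled"],
    check_interval=60,
    timeout=3600,
)

# Print a one-line summary per activity in the run.
for activity in hook.get_pipeline_activities(run.run_id):
    print(activity["activityName"], activity["status"])
```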

## Synapse Analytics Operators

Execute Azure Synapse Analytics operations as Airflow tasks with comprehensive Spark and pipeline management capabilities.

### Spark Batch Operator

```python { .api }
class AzureSynapseRunSparkBatchOperator(BaseOperator):
    """
    Runs Spark batch jobs on Azure Synapse Analytics.

    Supports running Spark applications with custom configurations,
    resource allocation, and dependency management.
    """

    def __init__(
        self,
        *,
        azure_synapse_conn_id: str = "azure_synapse_default",
        spark_pool_name: str,
        payload: dict[str, Any],
        timeout: int = 60 * 60 * 24 * 7,
        check_interval: int = 60,
        **kwargs
    ):
        """
        Initialize Synapse Spark batch operator.

        Args:
            azure_synapse_conn_id (str): Airflow connection ID for Synapse
            spark_pool_name (str): Name of the Spark pool to use
            payload (dict[str, Any]): Spark job configuration
            timeout (int): Job timeout in seconds (default: 7 days)
            check_interval (int): Status check interval in seconds (default: 60)
        """

    def execute(self, context: Context) -> dict[str, Any]:
        """
        Execute Spark batch job on Synapse Analytics.

        Args:
            context (Context): Airflow task context

        Returns:
            dict[str, Any]: Job execution results and metadata
        """

    def on_kill(self) -> None:
        """Cancel running Spark job on task termination."""
```

### Pipeline Operator

```python { .api }
class AzureSynapseRunPipelineOperator(BaseOperator):
    """
    Runs pipelines on Azure Synapse Analytics.

    Supports executing Synapse pipelines with parameter passing
    and comprehensive monitoring capabilities.
    """

    def __init__(
        self,
        *,
        pipeline_name: str,
        azure_synapse_conn_id: str = "azure_synapse_default",
        pipeline_timeout: int = 60 * 60 * 24 * 7,
        check_interval: int = 60,
        **pipeline_run_parameters: Any
    ):
        """
        Initialize Synapse pipeline operator.

        Args:
            pipeline_name (str): Name of the pipeline to run
            azure_synapse_conn_id (str): Airflow connection ID for Synapse
            pipeline_timeout (int): Pipeline timeout in seconds (default: 7 days)
            check_interval (int): Status check interval in seconds (default: 60)
            **pipeline_run_parameters: Parameters to pass to the pipeline
        """

    def execute(self, context: Context) -> str:
        """
        Execute pipeline on Synapse Analytics.

        Args:
            context (Context): Airflow task context

        Returns:
            str: Pipeline run ID
        """

    def on_kill(self) -> None:
        """Cancel running pipeline on task termination."""
```
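
Both operators plug into a DAG like any other Airflow task. A minimal single-task sketch is shown here (the connection ID and pipeline name are placeholders); complete end-to-end workflows follow under Usage Examples.

```python
# Minimal sketch: one-task DAG that triggers a Synapse pipeline.
# The DAG ID, connection ID, and pipeline name are placeholder assumptions.
from datetime import datetime

from airflow import DAG
from airflow.providers.microsoft.azure.operators.synapse import AzureSynapseRunPipelineOperator

with DAG(
    "synapse_minimal_example",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    run_nightly = AzureSynapseRunPipelineOperator(
        task_id="run_pipeline",
        pipeline_name="nightly-refresh",
        azure_synapse_conn_id="azure_synapse_default",
    )
```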

## Supporting Classes

### Status Constants and Exceptions

```python { .api }
class AzureSynapseSparkBatchRunStatus:
    """Constants for Synapse Spark job statuses."""

    NOT_STARTED: str = "not_started"
    STARTING: str = "starting"
    RUNNING: str = "running"
    IDLE: str = "idle"
    BUSY: str = "busy"
    SHUTTING_DOWN: str = "shutting_down"
    ERROR: str = "error"
    DEAD: str = "dead"
    KILLED: str = "killed"
    SUCCESS: str = "success"


class AzureSynapsePipelineRunStatus:
    """Constants for Synapse pipeline run statuses."""

    QUEUED: str = "Queued"
    IN_PROGRESS: str = "InProgress"
    SUCCEEDED: str = "Succeeded"
    FAILED: str = "Failed"
    CANCELLED: str = "Cancelled"


class AzureSynapsePipelineRunException(Exception):
    """Custom exception for Synapse pipeline operations."""
    pass
```
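
These constants can stand in for the hard-coded status strings used in the examples throughout this document. A small sketch, assuming a connection `synapse_conn`, an already-submitted job ID, and that the constants are importable from the hooks module (import path assumed):

```python
# Sketch: use the status constants instead of raw strings when polling a job.
# The import path, connection ID, and job_id value are placeholder assumptions.
from airflow.providers.microsoft.azure.hooks.synapse import (
    AzureSynapseHook,
    AzureSynapseSparkBatchRunStatus,
)

TERMINAL_SPARK_STATUSES = [
    AzureSynapseSparkBatchRunStatus.SUCCESS,
    AzureSynapseSparkBatchRunStatus.ERROR,
    AzureSynapseSparkBatchRunStatus.DEAD,
    AzureSynapseSparkBatchRunStatus.KILLED,
]

hook = AzureSynapseHook(azure_synapse_conn_id="synapse_conn")
hook.wait_for_job_run_status(
    job_id=42,  # placeholder job ID
    expected_statuses=TERMINAL_SPARK_STATUSES,
    timeout=1800,
)
```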

### Extra Links

```python { .api }
class AzureSynapsePipelineRunLink(BaseOperatorLink):
    """
    Link to Synapse pipeline run in Azure portal.

    Provides direct access to pipeline run details and monitoring
    in the Azure Synapse Studio interface.
    """

    name: str = "Azure Synapse Pipeline Run"

    def get_link(
        self,
        operator: BaseOperator,
        dttm: datetime | None = None,
        **kwargs: Any
    ) -> str:
        """
        Generate link to Azure Synapse pipeline run.

        Args:
            operator (BaseOperator): Airflow operator instance
            dttm (datetime | None): Execution date
            **kwargs: Additional parameters

        Returns:
            str: URL to Synapse pipeline run in Azure portal
        """
```
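
In Airflow, an extra link is surfaced by listing it in an operator's `operator_extra_links`; the provider's pipeline operator typically registers this link itself, so the subclass below is only an illustrative sketch (the import path for the link class is an assumption and may differ by provider version).

```python
# Sketch: expose the pipeline-run link on a custom operator subclass (illustrative only).
# The import path for AzureSynapsePipelineRunLink is an assumption; adjust it to where
# the link class lives in your provider version.
from airflow.providers.microsoft.azure.operators.synapse import (
    AzureSynapsePipelineRunLink,
    AzureSynapseRunPipelineOperator,
)


class MySynapsePipelineOperator(AzureSynapseRunPipelineOperator):
    # Airflow renders one extra button per entry on the task instance page.
    operator_extra_links = (AzureSynapsePipelineRunLink(),)
```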

## Usage Examples

### Basic Spark Job Execution

```python
from airflow import DAG
from airflow.providers.microsoft.azure.operators.synapse import AzureSynapseRunSparkBatchOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def process_spark_results(**context):
    """Process results from Spark job execution."""
    result = context['task_instance'].xcom_pull(task_ids='run_data_analysis')

    print(f"Spark job result: {result}")
    job_id = result.get('job_id')
    status = result.get('status')

    if status == 'success':
        print(f"Spark job {job_id} completed successfully")
    else:
        print(f"Spark job {job_id} failed with status: {status}")

    return result

dag = DAG(
    'synapse_spark_workflow',
    default_args={
        'owner': 'analytics-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=5)
    },
    description='Synapse Spark data analysis workflow',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Configure Spark job payload
spark_payload = {
    "name": "daily-data-analysis",
    "file": "abfss://data@mystorageaccount.dfs.core.windows.net/scripts/analyze_data.py",
    "args": [
        "--input-path", "abfss://data@mystorageaccount.dfs.core.windows.net/input/",
        "--output-path", "abfss://data@mystorageaccount.dfs.core.windows.net/output/",
        "--date", "{{ ds }}"
    ],
    "conf": {
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
    },
    "executorCount": 4,
    "executorCores": 4,
    "executorMemory": "8g",
    "driverCores": 2,
    "driverMemory": "4g"
}

# Run Spark job
run_analysis = AzureSynapseRunSparkBatchOperator(
    task_id='run_data_analysis',
    azure_synapse_conn_id='synapse_conn',
    spark_pool_name='analytics-pool',
    payload=spark_payload,
    timeout=3600,  # 1 hour timeout
    check_interval=30,  # Check every 30 seconds
    dag=dag
)

# Process results
process_results = PythonOperator(
    task_id='process_results',
    python_callable=process_spark_results,
    dag=dag
)

run_analysis >> process_results
```

### Advanced Spark Configuration

```python
from airflow import DAG
from airflow.providers.microsoft.azure.operators.synapse import AzureSynapseRunSparkBatchOperator
from airflow.providers.microsoft.azure.hooks.synapse import AzureSynapseHook
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def setup_spark_environment():
    """Set up Spark environment and validate configuration."""
    hook = AzureSynapseHook(azure_synapse_conn_id='synapse_conn')

    # List available Spark pools
    pools = hook.list_spark_pools()
    print(f"Available Spark pools: {[pool['name'] for pool in pools]}")

    # Get details of the target pool
    pool_details = hook.get_spark_pool_details('ml-processing-pool')
    print(f"Pool configuration: {pool_details}")

    return pool_details

def monitor_spark_job(**context):
    """Monitor Spark job execution with detailed logging."""
    hook = AzureSynapseHook(azure_synapse_conn_id='synapse_conn')

    # Get job ID from previous task
    job_result = context['task_instance'].xcom_pull(task_ids='run_ml_training')
    job_id = job_result['job_id']

    # Get detailed logs
    logs = hook.get_job_logs(job_id)

    print("=== Spark Driver Logs ===")
    print(logs.get('driverLogs', 'No driver logs available'))

    print("=== Spark Executor Logs ===")
    print(logs.get('executorLogs', 'No executor logs available'))

    # Final status check
    final_status = hook.get_job_run_status(job_id)
    print(f"Final job status: {final_status}")

    return {
        'job_id': job_id,
        'final_status': final_status,
        'logs_available': bool(logs.get('driverLogs') or logs.get('executorLogs'))
    }

dag = DAG(
    'advanced_spark_ml_workflow',
    default_args={
        'owner': 'ml-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=10)
    },
    description='Advanced Spark ML workflow with monitoring',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Setup environment
setup_env = PythonOperator(
    task_id='setup_environment',
    python_callable=setup_spark_environment,
    dag=dag
)

# Advanced ML training job configuration
ml_payload = {
    "name": "ml-model-training-{{ ds_nodash }}",
    "file": "abfss://ml@mlstorageaccount.dfs.core.windows.net/scripts/train_model.py",
    "className": None,  # Python job
    "args": [
        "--training-data", "abfss://ml@mlstorageaccount.dfs.core.windows.net/data/training/{{ ds }}/",
        "--model-output", "abfss://ml@mlstorageaccount.dfs.core.windows.net/models/{{ ds_nodash }}/",
        "--algorithm", "random_forest",
        "--cross-validation", "5",
        "--feature-selection", "true"
    ],
    "jars": [
        "abfss://ml@mlstorageaccount.dfs.core.windows.net/libs/spark-ml-extended.jar"
    ],
    "pyFiles": [
        "abfss://ml@mlstorageaccount.dfs.core.windows.net/libs/ml_utils.py",
        "abfss://ml@mlstorageaccount.dfs.core.windows.net/libs/feature_engineering.py"
    ],
    "files": [
        "abfss://ml@mlstorageaccount.dfs.core.windows.net/config/ml_config.json"
    ],
    "conf": {
        # Performance tuning
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
        "spark.sql.adaptive.skewJoin.enabled": "true",

        # Memory management
        "spark.sql.execution.arrow.pyspark.enabled": "true",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.sql.execution.arrow.maxRecordsPerBatch": "10000",

        # ML specific configurations
        "spark.ml.stage.parallelism": "4",
        "spark.sql.adaptive.advisoryPartitionSizeInBytes": "128MB",

        # Checkpointing for long-running ML jobs
        "spark.sql.streaming.checkpointLocation": "abfss://ml@mlstorageaccount.dfs.core.windows.net/checkpoints/{{ ds_nodash }}/"
    },
    "executorCount": 8,
    "executorCores": 4,
    "executorMemory": "16g",
    "driverCores": 4,
    "driverMemory": "8g",
    "tags": {
        "project": "ml-pipeline",
        "environment": "production",
        "model_type": "random_forest"
    }
}

# Run ML training job
run_training = AzureSynapseRunSparkBatchOperator(
    task_id='run_ml_training',
    azure_synapse_conn_id='synapse_conn',
    spark_pool_name='ml-processing-pool',
    payload=ml_payload,
    timeout=7200,  # 2 hours timeout for ML training
    check_interval=60,  # Check every minute
    dag=dag
)

# Monitor job execution
monitor_job = PythonOperator(
    task_id='monitor_training_job',
    python_callable=monitor_spark_job,
    dag=dag
)

setup_env >> run_training >> monitor_job
```

### Pipeline Orchestration

```python
from airflow import DAG
from airflow.providers.microsoft.azure.operators.synapse import AzureSynapseRunPipelineOperator
from airflow.providers.microsoft.azure.hooks.synapse import AzureSynapsePipelineHook
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def setup_pipeline_parameters():
    """Set up dynamic parameters for pipeline execution."""
    execution_date = datetime.now().strftime('%Y-%m-%d')

    parameters = {
        "processingDate": execution_date,
        "inputDataPath": f"abfss://data@datastorageaccount.dfs.core.windows.net/raw/{execution_date}/",
        "outputDataPath": f"abfss://data@datastorageaccount.dfs.core.windows.net/processed/{execution_date}/",
        "batchSize": "1000",
        "parallelism": "4",
        "retryAttempts": "3"
    }

    print(f"Pipeline parameters: {parameters}")
    return parameters

def monitor_pipeline_activities(**context):
    """Monitor individual activities within the pipeline run."""
    hook = AzureSynapsePipelineHook(azure_synapse_conn_id='synapse_conn')

    # Get pipeline run ID from previous task
    run_id = context['task_instance'].xcom_pull(task_ids='run_etl_pipeline')

    # Get activity runs
    activities = hook.get_pipeline_activities(run_id)

    print(f"Pipeline run {run_id} activities:")
    for activity in activities:
        print(f"- {activity['activityName']}: {activity['status']} "
              f"(Duration: {activity.get('durationInMs', 0)}ms)")

        if activity['status'] == 'Failed':
            print(f"  Error: {activity.get('error', {}).get('message', 'Unknown error')}")

    # Get overall pipeline status
    pipeline_run = hook.get_pipeline_run(run_id)

    return {
        'run_id': run_id,
        'status': pipeline_run.status,
        'duration_ms': pipeline_run.duration_in_ms,
        'activities': len(activities),
        'failed_activities': len([a for a in activities if a['status'] == 'Failed'])
    }

def validate_pipeline_outputs(**context):
    """Validate pipeline execution results."""
    monitoring_result = context['task_instance'].xcom_pull(task_ids='monitor_activities')

    if monitoring_result['failed_activities'] > 0:
        raise ValueError(f"Pipeline has {monitoring_result['failed_activities']} failed activities")

    if monitoring_result['status'] != 'Succeeded':
        raise ValueError(f"Pipeline failed with status: {monitoring_result['status']}")

    print(f"Pipeline validation passed. Duration: {monitoring_result['duration_ms']}ms")
    return monitoring_result

dag = DAG(
    'synapse_pipeline_workflow',
    default_args={
        'owner': 'data-engineering-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    description='Synapse pipeline ETL workflow',
    schedule_interval=timedelta(hours=6),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Setup parameters
setup_params = PythonOperator(
    task_id='setup_parameters',
    python_callable=setup_pipeline_parameters,
    dag=dag
)

# Run ETL pipeline
run_pipeline = AzureSynapseRunPipelineOperator(
    task_id='run_etl_pipeline',
    pipeline_name='data-processing-etl',
    azure_synapse_conn_id='synapse_conn',
    pipeline_timeout=3600,  # 1 hour timeout
    check_interval=30,  # Check every 30 seconds
    # Dynamic parameters from previous task
    processingDate="{{ task_instance.xcom_pull(task_ids='setup_parameters')['processingDate'] }}",
    inputDataPath="{{ task_instance.xcom_pull(task_ids='setup_parameters')['inputDataPath'] }}",
    outputDataPath="{{ task_instance.xcom_pull(task_ids='setup_parameters')['outputDataPath'] }}",
    batchSize="{{ task_instance.xcom_pull(task_ids='setup_parameters')['batchSize'] }}",
    dag=dag
)

# Monitor activities
monitor_activities = PythonOperator(
    task_id='monitor_activities',
    python_callable=monitor_pipeline_activities,
    dag=dag
)

# Validate outputs
validate_outputs = PythonOperator(
    task_id='validate_outputs',
    python_callable=validate_pipeline_outputs,
    dag=dag
)

setup_params >> run_pipeline >> monitor_activities >> validate_outputs
```

### Complex Workflow with Multiple Synapse Services

```python
from airflow import DAG
from airflow.providers.microsoft.azure.operators.synapse import (
    AzureSynapseRunSparkBatchOperator,
    AzureSynapseRunPipelineOperator
)
from airflow.providers.microsoft.azure.hooks.synapse import (
    AzureSynapseHook,
    AzureSynapsePipelineHook
)
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.dummy import DummyOperator
from datetime import datetime, timedelta

def check_data_availability():
    """Check if input data is available for processing."""
    # This would typically check Azure Data Lake or other data sources.
    # For this example, we'll simulate the check.
    import random
    data_available = random.choice([True, False])

    if data_available:
        print("Input data is available, proceeding with processing")
        return 'data_preprocessing'
    else:
        print("Input data not available, skipping processing")
        return 'skip_processing'

def choose_processing_method(**context):
    """Choose between Spark job or pipeline based on data size."""
    # This would typically analyze data characteristics.
    # For this example, we'll simulate the decision.
    import random
    data_size = random.choice(['small', 'large'])

    if data_size == 'large':
        print("Large dataset detected, using Spark batch processing")
        return 'spark_processing'
    else:
        print("Small dataset detected, using pipeline processing")
        return 'pipeline_processing'

dag = DAG(
    'complex_synapse_workflow',
    default_args={
        'owner': 'analytics-platform-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=3)
    },
    description='Complex Synapse workflow with conditional processing',
    schedule_interval=timedelta(hours=4),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Check data availability
check_data = BranchPythonOperator(
    task_id='check_data_availability',
    python_callable=check_data_availability,
    dag=dag
)

# Data preprocessing (common step)
preprocessing_payload = {
    "name": "data-preprocessing-{{ ds_nodash }}",
    "file": "abfss://analytics@analyticsstorage.dfs.core.windows.net/scripts/preprocess.py",
    "args": ["--date", "{{ ds }}", "--validate", "true"],
    "executorCount": 2,
    "executorCores": 2,
    "executorMemory": "4g",
    "driverMemory": "2g"
}

data_preprocessing = AzureSynapseRunSparkBatchOperator(
    task_id='data_preprocessing',
    azure_synapse_conn_id='synapse_conn',
    spark_pool_name='preprocessing-pool',
    payload=preprocessing_payload,
    timeout=1800,
    dag=dag
)

# Choose processing method
choose_method = BranchPythonOperator(
    task_id='choose_processing_method',
    python_callable=choose_processing_method,
    dag=dag
)

# Spark processing branch
spark_payload = {
    "name": "large-data-processing-{{ ds_nodash }}",
    "file": "abfss://analytics@analyticsstorage.dfs.core.windows.net/scripts/spark_processing.py",
    "args": [
        "--input", "abfss://analytics@analyticsstorage.dfs.core.windows.net/preprocessed/{{ ds }}/",
        "--output", "abfss://analytics@analyticsstorage.dfs.core.windows.net/results/{{ ds }}/"
    ],
    "executorCount": 8,
    "executorCores": 4,
    "executorMemory": "16g",
    "driverMemory": "8g",
    "conf": {
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true"
    }
}

spark_processing = AzureSynapseRunSparkBatchOperator(
    task_id='spark_processing',
    azure_synapse_conn_id='synapse_conn',
    spark_pool_name='large-processing-pool',
    payload=spark_payload,
    timeout=3600,
    dag=dag
)

# Pipeline processing branch
pipeline_processing = AzureSynapseRunPipelineOperator(
    task_id='pipeline_processing',
    pipeline_name='small-data-pipeline',
    azure_synapse_conn_id='synapse_conn',
    pipeline_timeout=1800,
    inputPath="abfss://analytics@analyticsstorage.dfs.core.windows.net/preprocessed/{{ ds }}/",
    outputPath="abfss://analytics@analyticsstorage.dfs.core.windows.net/results/{{ ds }}/",
    dag=dag
)

# Skip processing (when no data available)
skip_processing = DummyOperator(
    task_id='skip_processing',
    dag=dag
)

# Join point for all branches
join_processing = DummyOperator(
    task_id='join_processing',
    trigger_rule='none_failed_or_skipped',
    dag=dag
)

# Post-processing
postprocessing_payload = {
    "name": "postprocessing-{{ ds_nodash }}",
    "file": "abfss://analytics@analyticsstorage.dfs.core.windows.net/scripts/postprocess.py",
    "args": [
        "--results-path", "abfss://analytics@analyticsstorage.dfs.core.windows.net/results/{{ ds }}/",
        "--final-output", "abfss://analytics@analyticsstorage.dfs.core.windows.net/final/{{ ds }}/"
    ],
    "executorCount": 2,
    "executorCores": 2,
    "executorMemory": "4g"
}

postprocessing = AzureSynapseRunSparkBatchOperator(
    task_id='postprocessing',
    azure_synapse_conn_id='synapse_conn',
    spark_pool_name='postprocessing-pool',
    payload=postprocessing_payload,
    timeout=900,
    dag=dag
)

# Set up dependencies
check_data >> [data_preprocessing, skip_processing]
data_preprocessing >> choose_method
choose_method >> [spark_processing, pipeline_processing]
[spark_processing, pipeline_processing, skip_processing] >> join_processing
join_processing >> postprocessing
```

## Connection Configuration

### Synapse Analytics Connection (`azure_synapse`)

Configure Azure Synapse Analytics connections in Airflow:

```python
# Connection configuration for Synapse Analytics
{
    "conn_id": "azure_synapse_default",
    "conn_type": "azure_synapse",
    "host": "myworkspace.dev.azuresynapse.net",  # Synapse workspace URL
    "extra": {
        "subscriptionId": "your-subscription-id",
        "resourceGroupName": "your-resource-group",
        "workspaceName": "your-workspace-name",
        "tenantId": "your-tenant-id",
        "clientId": "your-client-id",
        "clientSecret": "your-client-secret"
    }
}
```
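
The same connection can also be created programmatically rather than through the UI, for example to export it as an `AIRFLOW_CONN_AZURE_SYNAPSE_DEFAULT` environment variable. A hedged sketch using Airflow's `Connection` model; every value below is a placeholder:

```python
# Sketch: build the connection programmatically and print the URI that could be
# exported as AIRFLOW_CONN_AZURE_SYNAPSE_DEFAULT. All values are placeholders.
import json

from airflow.models.connection import Connection

conn = Connection(
    conn_id="azure_synapse_default",
    conn_type="azure_synapse",
    host="myworkspace.dev.azuresynapse.net",
    extra=json.dumps(
        {
            "subscriptionId": "your-subscription-id",
            "resourceGroupName": "your-resource-group",
            "workspaceName": "your-workspace-name",
            "tenantId": "your-tenant-id",
            "clientId": "your-client-id",
            "clientSecret": "your-client-secret",
        }
    ),
)
print(conn.get_uri())
```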

### Authentication Methods

Azure Synapse Analytics supports multiple authentication methods:

1. **Service Principal Authentication**:

   ```python
   extra = {
       "tenantId": "your-tenant-id",
       "clientId": "your-client-id",
       "clientSecret": "your-client-secret"
   }
   ```

2. **Managed Identity Authentication**:

   ```python
   extra = {
       "managedIdentityClientId": "your-managed-identity-client-id"
   }
   ```

3. **Azure CLI Authentication**:

   ```python
   extra = {
       "use_azure_cli": True
   }
   ```

## Error Handling

### Common Exception Patterns

```python
from airflow.providers.microsoft.azure.hooks.synapse import (
    AzureSynapseHook,
    AzureSynapsePipelineHook,
    AzureSynapsePipelineRunException
)

def robust_synapse_operations():
    """Demonstrate error handling patterns for Synapse operations."""

    spark_hook = AzureSynapseHook(azure_synapse_conn_id='synapse_conn')
    pipeline_hook = AzureSynapsePipelineHook(azure_synapse_conn_id='synapse_conn')

    # Spark job error handling
    try:
        job_result = spark_hook.run_spark_job({
            "name": "test-job",
            "file": "test.py"
        })

        job_id = job_result['job_id']

        # Wait for completion with timeout
        success = spark_hook.wait_for_job_run_status(
            job_id=job_id,
            expected_statuses=['success', 'error', 'dead', 'killed'],
            timeout=3600
        )

        if not success:
            print("Job timed out, cancelling...")
            spark_hook.cancel_job_run(job_id)
            raise TimeoutError("Spark job timed out")

        # Check final status
        final_status = spark_hook.get_job_run_status(job_id)
        if final_status != 'success':
            # Get logs for debugging
            logs = spark_hook.get_job_logs(job_id)
            print(f"Job failed with status: {final_status}")
            print(f"Error logs: {logs.get('stderr', 'No error logs')}")
            raise RuntimeError(f"Spark job failed with status: {final_status}")

    except Exception as e:
        print(f"Spark job error: {e}")
        raise

    # Pipeline error handling
    try:
        run_response = pipeline_hook.run_pipeline("test-pipeline")
        run_id = run_response.run_id

        # Monitor pipeline with error handling
        success = pipeline_hook.wait_for_pipeline_run_status(
            run_id=run_id,
            expected_statuses=['Succeeded', 'Failed', 'Cancelled'],
            timeout=7200
        )

        if not success:
            print("Pipeline timed out, attempting to cancel...")
            try:
                pipeline_hook.cancel_run_pipeline(run_id)
            except Exception as cancel_error:
                print(f"Failed to cancel pipeline: {cancel_error}")
            raise TimeoutError("Pipeline timed out")

        # Check final status and get activity details
        pipeline_run = pipeline_hook.get_pipeline_run(run_id)
        if pipeline_run.status == 'Failed':
            activities = pipeline_hook.get_pipeline_activities(run_id)
            failed_activities = [a for a in activities if a['status'] == 'Failed']

            print(f"Pipeline failed. Failed activities: {len(failed_activities)}")
            for activity in failed_activities:
                print(f"- {activity['activityName']}: {activity.get('error', {}).get('message', 'Unknown error')}")

            raise AzureSynapsePipelineRunException(f"Pipeline failed with {len(failed_activities)} failed activities")

    except AzureSynapsePipelineRunException:
        raise
    except Exception as e:
        print(f"Pipeline error: {e}")
        raise
```

### Connection Testing

```python
from airflow.providers.microsoft.azure.hooks.synapse import (
    AzureSynapseHook,
    AzureSynapsePipelineHook
)

def test_synapse_connections():
    """Test Synapse Analytics connections and capabilities."""

    # Test Spark hook
    try:
        spark_hook = AzureSynapseHook(azure_synapse_conn_id='synapse_conn')
        success, message = spark_hook.test_connection()

        if success:
            print("Synapse Spark connection successful")

            # List available pools
            pools = spark_hook.list_spark_pools()
            print(f"Available Spark pools: {[p['name'] for p in pools]}")
        else:
            print(f"Synapse Spark connection failed: {message}")

    except Exception as e:
        print(f"Synapse Spark connection test failed: {e}")

    # Test Pipeline hook
    try:
        pipeline_hook = AzureSynapsePipelineHook(azure_synapse_conn_id='synapse_conn')
        success, message = pipeline_hook.test_connection()

        if success:
            print("Synapse Pipeline connection successful")

            # List available pipelines
            pipelines = pipeline_hook.list_pipelines()
            print(f"Available pipelines: {[p['name'] for p in pipelines]}")
        else:
            print(f"Synapse Pipeline connection failed: {message}")

    except Exception as e:
        print(f"Synapse Pipeline connection test failed: {e}")
```

## Performance Considerations

### Optimizing Spark Jobs

```python
import time
from datetime import datetime

from airflow.providers.microsoft.azure.hooks.synapse import AzureSynapseHook

def optimize_spark_configuration():
    """Demonstrate Spark optimization techniques for Synapse."""

    # Optimized configuration for different workload types

    # ETL/Data Processing workload
    etl_config = {
        "executorCount": 8,
        "executorCores": 4,
        "executorMemory": "16g",
        "driverCores": 2,
        "driverMemory": "8g",
        "conf": {
            "spark.sql.adaptive.enabled": "true",
            "spark.sql.adaptive.coalescePartitions.enabled": "true",
            "spark.sql.adaptive.skewJoin.enabled": "true",
            "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
            "spark.sql.execution.arrow.pyspark.enabled": "true"
        }
    }

    # Machine Learning workload
    ml_config = {
        "executorCount": 6,
        "executorCores": 6,
        "executorMemory": "24g",
        "driverCores": 4,
        "driverMemory": "12g",
        "conf": {
            "spark.ml.stage.parallelism": "6",
            "spark.sql.execution.arrow.maxRecordsPerBatch": "10000",
            "spark.sql.adaptive.enabled": "true",
            "spark.sql.adaptive.localShuffleReader.enabled": "true"
        }
    }

    # Streaming workload
    streaming_config = {
        "executorCount": 4,
        "executorCores": 2,
        "executorMemory": "8g",
        "driverMemory": "4g",
        "conf": {
            "spark.streaming.backpressure.enabled": "true",
            "spark.sql.streaming.checkpointLocation": "/checkpoint/path",
            "spark.sql.adaptive.enabled": "false"  # Disable for streaming
        }
    }

    return {
        'etl': etl_config,
        'ml': ml_config,
        'streaming': streaming_config
    }

def implement_spark_monitoring():
    """Implement comprehensive Spark job monitoring."""

    def monitor_spark_job_detailed(job_id: int):
        """Detailed monitoring of Spark job execution."""
        hook = AzureSynapseHook(azure_synapse_conn_id='synapse_conn')

        monitoring_data = {
            'job_id': job_id,
            'status_history': [],
            'duration_seconds': 0,
            'resource_usage': {}
        }

        start_time = datetime.now()

        while True:
            current_status = hook.get_job_run_status(job_id)
            monitoring_data['status_history'].append({
                'timestamp': datetime.now(),
                'status': current_status
            })

            # Terminal statuses
            if current_status in ['success', 'error', 'dead', 'killed']:
                break

            # Check for stuck jobs
            monitoring_data['duration_seconds'] = (datetime.now() - start_time).total_seconds()
            if monitoring_data['duration_seconds'] > 3600:  # 1 hour
                print("Job appears to be stuck, investigating...")
                logs = hook.get_job_logs(job_id)
                if 'OutOfMemoryError' in logs.get('stderr', ''):
                    print("OutOfMemoryError detected - job needs more memory")

            time.sleep(30)  # Check every 30 seconds

        # Get final logs and metrics
        final_logs = hook.get_job_logs(job_id)
        monitoring_data['final_logs'] = final_logs
        monitoring_data['final_status'] = current_status

        return monitoring_data

    return monitor_spark_job_detailed
```

This comprehensive documentation covers all Azure Synapse Analytics capabilities in the Apache Airflow Microsoft Azure Provider, including Spark job execution, pipeline orchestration, monitoring, and performance optimization techniques.