
# Core Framework

Foundation classes and utilities that provide the essential framework for Apache Airflow operator development, state management, error handling, and workflow control. These components form the building blocks for creating custom operators and managing task execution.

## Capabilities

### Base Operator Framework

Abstract foundation class that all operators inherit from, providing core task functionality, dependency management, templating support, and DAG integration.

```python { .api }
class BaseOperator:
    def __init__(
        self,
        task_id,
        owner,
        email=None,
        email_on_retry=True,
        email_on_failure=True,
        retries=0,
        retry_delay=timedelta(seconds=300),
        start_date=None,
        end_date=None,
        schedule_interval=None,
        depends_on_past=False,
        wait_for_downstream=False,
        dag=None,
        params=None,
        default_args=None,
        adhoc=False,
        priority_weight=1,
        queue='default',
        pool=None,
        sla=None,
        execution_timeout=None,
        on_failure_callback=None,
        on_success_callback=None,
        on_retry_callback=None,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        **kwargs
    ):
        """
        Abstract base class for all operators. Contains recursive methods for DAG crawling behavior.

        Key Parameters:
        - task_id (str): Unique, meaningful identifier for the task
        - owner (str): Owner of the task (unix username recommended)
        - retries (int): Number of retries before task failure
        - retry_delay (timedelta): Delay between retries
        - start_date (datetime): Task start date
        - depends_on_past (bool): Task instance depends on success of previous schedule
        - dag (DAG): The DAG this task belongs to
        - trigger_rule (str): Rule for triggering task based on upstream states
        - pool (str): Resource pool to use for task execution
        - priority_weight (int): Priority weight for task scheduling
        """

    template_fields = ()
    template_ext = ()
    ui_color = '#fff'
    ui_fgcolor = '#000'

    def execute(self, context):
        """
        Execute the task logic (must be implemented by subclasses).

        Parameters:
        - context (dict): Task execution context containing runtime information

        Raises:
        - NotImplementedError: If not implemented by subclass
        """

    def pre_execute(self, context):
        """Hook called before task execution."""

    def post_execute(self, context, result):
        """Hook called after task execution."""

    def on_kill(self):
        """Override to perform cleanup when task is killed."""

    def set_upstream(self, task_or_task_list):
        """Set upstream task dependencies."""

    def set_downstream(self, task_or_task_list):
        """Set downstream task dependencies."""

    def __rshift__(self, other):
        """Implement >> operator for task dependencies."""

    def __lshift__(self, other):
        """Implement << operator for task dependencies."""
```

**Usage Example**:

```python
from airflow.models import BaseOperator
from airflow.utils import apply_defaults
from datetime import datetime, timedelta

class CustomOperator(BaseOperator):
    # Define template fields and UI colors
    template_fields = ('input_path', 'output_path')
    ui_color = '#87CEEB'

    @apply_defaults
    def __init__(
        self,
        input_path,
        output_path,
        processing_options=None,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.input_path = input_path
        self.output_path = output_path
        self.processing_options = processing_options or {}

    def execute(self, context):
        """Implement custom task logic."""
        print(f"Processing {self.input_path} -> {self.output_path}")
        print(f"Execution date: {context['ds']}")

        # Custom processing logic here
        result = self._process_data()

        # Return value for XCom
        return result

    def _process_data(self):
        # Implementation details
        return "Processing completed"

    def on_kill(self):
        """Cleanup when task is killed."""
        print("Task was killed, performing cleanup")

# Using the custom operator
custom_task = CustomOperator(
    task_id='custom_processing',
    input_path='/data/input/{{ ds }}',    # Templated
    output_path='/data/output/{{ ds }}',  # Templated
    processing_options={'threads': 4},
    retries=2,
    retry_delay=timedelta(minutes=5),
    dag=dag
)
```
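
The dependency helpers documented on `BaseOperator` (`set_upstream`, `set_downstream`, and the `>>`/`<<` operators) are equivalent ways of wiring tasks together. A minimal sketch, assuming an existing `dag` object and using `DummyOperator` purely as a placeholder:

```python
from airflow.operators.dummy_operator import DummyOperator

# Illustrative placeholder tasks; any BaseOperator subclass is wired the same way
extract = DummyOperator(task_id='extract', dag=dag)
transform = DummyOperator(task_id='transform', dag=dag)
load = DummyOperator(task_id='load', dag=dag)
report = DummyOperator(task_id='report', dag=dag)

# Explicit method calls
transform.set_upstream(extract)    # extract runs before transform
transform.set_downstream(load)     # load runs after transform

# Equivalent bitshift shorthand for another edge
load >> report                     # same as load.set_downstream(report)
```

The resulting dependency chain is `extract -> transform -> load -> report`; which spelling you use is a matter of readability.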

### Task State Management

Constants and utilities for managing task instance states throughout the execution lifecycle with color coding for UI display.

```python { .api }
class State:
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    SHUTDOWN = "shutdown"
    FAILED = "failed"
    UP_FOR_RETRY = "up_for_retry"
    UPSTREAM_FAILED = "upstream_failed"
    SKIPPED = "skipped"
    NONE = "none"

    @classmethod
    def color(cls, state):
        """
        Get UI color for a given state.

        Parameters:
        - state (str): State name

        Returns:
        - str: Color string for UI display
        """

    @classmethod
    def runnable(cls):
        """
        Get list of states that are considered runnable.

        Returns:
        - list: List of runnable state values
        """
```

**Usage Examples**:

```python
from airflow.models import BaseOperator
from airflow.utils import AirflowException, State, apply_defaults

def check_task_state(**context):
    task_instance = context['ti']

    # Check current state
    if task_instance.state == State.RUNNING:
        print("Task is currently running")
    elif task_instance.state == State.SUCCESS:
        print("Task completed successfully")
    elif task_instance.state == State.FAILED:
        print("Task failed")

    # Get UI color for state
    color = State.color(task_instance.state)
    print(f"UI color for state '{task_instance.state}': {color}")

    # Check if state is runnable
    runnable_states = State.runnable()
    if task_instance.state in runnable_states:
        print("Task is in a runnable state")

# Custom state checking in operators
class StateAwareOperator(BaseOperator):
    @apply_defaults
    def __init__(self, check_upstream_states=False, **kwargs):
        super().__init__(**kwargs)
        self.check_upstream_states = check_upstream_states

    def execute(self, context):
        if self.check_upstream_states:
            dag = context['dag']
            execution_date = context['execution_date']

            for upstream_task_id in self.upstream_task_ids:
                ti = dag.get_task(upstream_task_id).get_task_instance(
                    execution_date=execution_date
                )

                if ti.state != State.SUCCESS:
                    raise AirflowException(
                        f"Upstream task {upstream_task_id} is in state {ti.state}"
                    )

        # Continue with task execution
        return "Task completed"
```
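
The same constants are useful inside the lifecycle callbacks accepted by `BaseOperator.__init__` (`on_failure_callback`, `on_retry_callback`, `on_success_callback`). A minimal sketch, reusing the `StateAwareOperator` and imports from the example above and assuming an existing `dag`:

```python
def log_final_state(context):
    """Illustrative on_failure_callback: report the task instance's state and its UI color."""
    ti = context['ti']
    print(f"Task {ti.task_id} ended in state '{ti.state}' "
          f"(UI color: {State.color(ti.state)})")

guarded_task = StateAwareOperator(
    task_id='guarded_step',
    check_upstream_states=True,
    retries=1,
    on_failure_callback=log_final_state,
    dag=dag
)
```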

### Trigger Rules for Task Dependencies

Constants defining when tasks should be triggered based on upstream task completion states, enabling complex workflow control patterns.

```python { .api }
class TriggerRule:
    ALL_SUCCESS = 'all_success'
    ALL_FAILED = 'all_failed'
    ALL_DONE = 'all_done'
    ONE_SUCCESS = 'one_success'
    ONE_FAILED = 'one_failed'
    DUMMY = 'dummy'
```

**Usage Examples**:

```python
from airflow.utils import TriggerRule
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

# Task that runs only if all upstream tasks succeed (default)
success_task = DummyOperator(
    task_id='all_success_task',
    trigger_rule=TriggerRule.ALL_SUCCESS,  # Default behavior
    dag=dag
)

# Task that runs if any upstream task fails (error handling)
error_handler = PythonOperator(
    task_id='error_handler',
    python_callable=handle_errors,
    trigger_rule=TriggerRule.ONE_FAILED,
    dag=dag
)

# Task that runs regardless of upstream task states (cleanup)
cleanup_task = DummyOperator(
    task_id='cleanup',
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag
)

# Task that runs as soon as any upstream task succeeds
continue_task = DummyOperator(
    task_id='continue_processing',
    trigger_rule=TriggerRule.ONE_SUCCESS,
    dag=dag
)

# Always run (ignore dependencies)
monitoring_task = PythonOperator(
    task_id='monitoring',
    python_callable=send_metrics,
    trigger_rule=TriggerRule.DUMMY,
    dag=dag
)

# Complex workflow with multiple trigger rules
task_a = DummyOperator(task_id='task_a', dag=dag)
task_b = DummyOperator(task_id='task_b', dag=dag)
task_c = DummyOperator(task_id='task_c', dag=dag)

# Success path - continues only if all upstream succeed
success_path = DummyOperator(
    task_id='success_path',
    trigger_rule=TriggerRule.ALL_SUCCESS,
    dag=dag
)

# Failure path - handles any upstream failures
failure_path = PythonOperator(
    task_id='failure_path',
    python_callable=handle_failure,
    trigger_rule=TriggerRule.ONE_FAILED,
    dag=dag
)

# Cleanup - always runs at the end
final_cleanup = DummyOperator(
    task_id='final_cleanup',
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag
)

# Set dependencies
[task_a, task_b, task_c] >> success_path
[task_a, task_b, task_c] >> failure_path
[success_path, failure_path] >> final_cleanup
```

### Exception Handling

Base exception class for Airflow-specific errors with proper error propagation and logging integration.

```python { .api }
class AirflowException(Exception):
    """
    Base exception class for Airflow-specific errors.

    All Airflow operators and hooks should raise this exception type
    for proper error handling by the scheduler and executor.
    """
    pass
```

**Usage Examples**:

```python
from airflow.models import BaseOperator
from airflow.utils import AirflowException, apply_defaults

def validate_input_data(**context):
    data_path = f"/data/{context['ds']}"

    # Check if data exists
    import os
    if not os.path.exists(data_path):
        raise AirflowException(f"Input data not found at {data_path}")

    # Check data quality
    import pandas as pd
    df = pd.read_csv(data_path)

    if df.empty:
        raise AirflowException(f"Data file {data_path} is empty")

    if df.isnull().sum().sum() > len(df) * 0.1:  # >10% missing values
        raise AirflowException("Data quality check failed: too many missing values")

    return f"Data validation passed for {len(df)} records"

def safe_api_call(url, **context):
    import requests

    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()

        if not response.json():
            raise AirflowException("API returned empty response")

        return response.json()

    except requests.exceptions.Timeout:
        raise AirflowException(f"API call to {url} timed out after 30 seconds")
    except requests.exceptions.ConnectionError:
        raise AirflowException(f"Failed to connect to API at {url}")
    except requests.exceptions.HTTPError as e:
        raise AirflowException(f"HTTP error {e.response.status_code}: {e.response.text}")
    except Exception as e:
        raise AirflowException(f"Unexpected error calling API: {str(e)}")

# Custom operator with proper exception handling
class DataValidationOperator(BaseOperator):
    @apply_defaults
    def __init__(self, validation_rules=None, **kwargs):
        super().__init__(**kwargs)
        self.validation_rules = validation_rules or {}

    def execute(self, context):
        try:
            # Perform validation
            result = self._validate_data(context)

            if not result['valid']:
                raise AirflowException(
                    f"Data validation failed: {result['errors']}"
                )

            return result

        except AirflowException:
            # Re-raise Airflow errors (including the validation failure above) unchanged
            raise
        except FileNotFoundError as e:
            raise AirflowException(f"Required file not found: {e}")
        except ValueError as e:
            raise AirflowException(f"Data validation error: {e}")
        except Exception as e:
            # Wrap unexpected exceptions
            raise AirflowException(f"Validation failed unexpectedly: {str(e)}")

    def _validate_data(self, context):
        # Implementation details
        return {'valid': True, 'errors': []}
```
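
Raising `AirflowException` marks the task instance as failed, which in turn drives the retry settings (`retries`, `retry_delay`) and failure callbacks configured on `BaseOperator`. A minimal sketch wiring the `validate_input_data` callable above into a `PythonOperator`, assuming an existing `dag`; `provide_context=True` is assumed to be supported here (older Airflow versions require it to pass the execution context into the callable):

```python
from datetime import timedelta
from airflow.operators.python_operator import PythonOperator

validation_task = PythonOperator(
    task_id='validate_input_data',
    python_callable=validate_input_data,
    provide_context=True,              # pass the execution context as **kwargs
    retries=3,                         # each AirflowException triggers up to 3 retries
    retry_delay=timedelta(minutes=2),
    dag=dag
)
```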

### Default Arguments Decorator

Function decorator that automatically applies default arguments from DAG configuration, enabling consistent operator parameter management across workflows.

```python { .api }
def apply_defaults(func):
    """
    Function decorator that looks for an argument named "default_args" and fills
    unspecified arguments from it.

    Features:
    - Searches for "default_args" parameter and applies missing arguments
    - Provides specific information about missing arguments for debugging
    - Enforces keyword argument usage when initializing operators
    - Integrates with DAG-level default arguments

    Parameters:
    - func (callable): Function to decorate (typically __init__ method)

    Returns:
    - callable: Decorated function with default argument application
    """
```

**Usage Examples**:

```python
from airflow.utils import apply_defaults
from airflow.models import BaseOperator, DAG
from datetime import datetime, timedelta

class MyCustomOperator(BaseOperator):
    @apply_defaults
    def __init__(
        self,
        my_param,
        optional_param=None,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.my_param = my_param
        self.optional_param = optional_param

    def execute(self, context):
        return f"Executed with {self.my_param}"

# DAG with default arguments
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email_on_retry': False,
    'email': ['admin@example.com']
}

dag = DAG(
    'example_dag',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    start_date=datetime(2023, 1, 1)
)

# Operator automatically inherits default_args
task1 = MyCustomOperator(
    task_id='task1',
    my_param='value1',
    dag=dag
    # owner, retries, retry_delay, etc. are applied automatically
)

# Override specific defaults
task2 = MyCustomOperator(
    task_id='task2',
    my_param='value2',
    retries=5,                # Override default retries
    owner='specific_owner',   # Override default owner
    dag=dag
)

# Complex operator with multiple parameter types
class AdvancedOperator(BaseOperator):
    template_fields = ('input_template', 'output_template')

    @apply_defaults
    def __init__(
        self,
        input_path,
        output_path,
        processing_config=None,
        input_template=None,
        output_template=None,
        validation_enabled=True,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.input_path = input_path
        self.output_path = output_path
        self.processing_config = processing_config or {}
        self.input_template = input_template
        self.output_template = output_template
        self.validation_enabled = validation_enabled

    def execute(self, context):
        # Implementation uses all parameters
        print(f"Processing {self.input_path} -> {self.output_path}")
        print(f"Config: {self.processing_config}")
        return "Processing complete"

# Enhanced default arguments with custom parameters
enhanced_defaults = {
    'owner': 'data_pipeline',
    'retries': 3,
    'retry_delay': timedelta(minutes=10),
    'processing_config': {'threads': 4, 'memory_limit': '2GB'},
    'validation_enabled': True,
    'email_on_failure': True
}

enhanced_dag = DAG(
    'enhanced_pipeline',
    default_args=enhanced_defaults,
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1)
)

# Operator inherits both standard and custom defaults
advanced_task = AdvancedOperator(
    task_id='advanced_processing',
    input_path='/data/{{ ds }}',
    output_path='/processed/{{ ds }}',
    dag=enhanced_dag
    # All default_args are applied automatically
)
```
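
Because `apply_defaults` enforces keyword arguments and reports missing ones by name, misconfigured operators fail at DAG-definition time rather than at run time. A small illustrative sketch, reusing `MyCustomOperator` and `dag` from the example above; the exact exception types and messages vary by version, so treat the comments as assumptions:

```python
# Positional arguments are rejected by the decorator; use keywords instead.
try:
    bad_task = MyCustomOperator('value1', task_id='bad_task', dag=dag)
except Exception as e:  # typically an AirflowException about keyword-only initialization
    print(f"Rejected positional arguments: {e}")

# A required argument that is neither passed nor present in default_args
# is called out by name, which simplifies debugging DAG files.
try:
    incomplete_task = MyCustomOperator(task_id='incomplete_task', dag=dag)
except Exception as e:  # missing 'my_param' is reported in the error
    print(f"Missing argument detected: {e}")
```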

## Framework Integration Patterns

### Custom Operator Development

```python
from airflow.models import BaseOperator
from airflow.utils import apply_defaults, AirflowException
from airflow.hooks.base_hook import BaseHook

class DatabaseETLOperator(BaseOperator):
    """
    Custom operator that combines multiple framework components.
    """
    template_fields = ('source_sql', 'target_table')
    ui_color = '#4CAF50'

    @apply_defaults
    def __init__(
        self,
        source_conn_id,
        target_conn_id,
        source_sql,
        target_table,
        chunk_size=10000,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.source_conn_id = source_conn_id
        self.target_conn_id = target_conn_id
        self.source_sql = source_sql
        self.target_table = target_table
        self.chunk_size = chunk_size

    def execute(self, context):
        try:
            # Use hooks for database connectivity
            source_hook = BaseHook.get_hook(self.source_conn_id)
            target_hook = BaseHook.get_hook(self.target_conn_id)

            # Extract data
            data = source_hook.get_records(self.source_sql)

            if not data:
                raise AirflowException("No data returned from source query")

            # Load data in chunks
            for i in range(0, len(data), self.chunk_size):
                chunk = data[i:i + self.chunk_size]
                target_hook.insert_rows(
                    table=self.target_table,
                    rows=chunk
                )

            return f"Loaded {len(data)} records to {self.target_table}"

        except Exception as e:
            raise AirflowException(f"ETL operation failed: {str(e)}")

    def on_kill(self):
        # Cleanup resources when task is killed
        print("ETL operation was killed, performing cleanup")
```
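
A possible instantiation of the operator above, assuming connections named `postgres_source` and `warehouse_target` are configured in Airflow and that a `dag` object already exists (both names are placeholders):

```python
etl_task = DatabaseETLOperator(
    task_id='orders_to_warehouse',
    source_conn_id='postgres_source',      # hypothetical connection IDs
    target_conn_id='warehouse_target',
    source_sql="SELECT * FROM orders WHERE order_date = '{{ ds }}'",  # rendered via template_fields
    target_table='warehouse.orders_staging',
    chunk_size=5000,
    retries=2,
    dag=dag
)
```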

### State and Trigger Rule Combinations

```python
# Complex workflow with multiple paths and trigger rules
def create_robust_workflow():
    # Data processing tasks
    extract_task = PythonOperator(
        task_id='extract_data',
        python_callable=extract_data,
        dag=dag
    )

    validate_task = PythonOperator(
        task_id='validate_data',
        python_callable=validate_data,
        dag=dag
    )

    transform_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data,
        dag=dag
    )

    # Success path
    load_task = PythonOperator(
        task_id='load_data',
        python_callable=load_data,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        dag=dag
    )

    # Error handling
    error_notification = EmailOperator(
        task_id='error_notification',
        to=['admin@example.com'],
        subject='Pipeline Failed - {{ ds }}',
        html_content='Pipeline failed at task {{ ti.task_id }}',
        trigger_rule=TriggerRule.ONE_FAILED,
        dag=dag
    )

    # Cleanup (always runs)
    cleanup_task = BashOperator(
        task_id='cleanup',
        bash_command='rm -rf /tmp/pipeline_{{ ds }}',
        trigger_rule=TriggerRule.ALL_DONE,
        dag=dag
    )

    # Set up dependencies
    extract_task >> validate_task >> transform_task >> load_task
    [extract_task, validate_task, transform_task, load_task] >> error_notification
    [load_task, error_notification] >> cleanup_task

    return dag
```