# Core Framework

Foundation classes and utilities that provide the essential framework for Apache Airflow operator development, state management, error handling, and workflow control. These components form the building blocks for creating custom operators and managing task execution.

## Capabilities

### Base Operator Framework

Abstract foundation class that all operators inherit from, providing core task functionality, dependency management, templating support, and DAG integration.

```python { .api }
class BaseOperator:
    def __init__(
        self,
        task_id,
        owner,
        email=None,
        email_on_retry=True,
        email_on_failure=True,
        retries=0,
        retry_delay=timedelta(seconds=300),
        start_date=None,
        end_date=None,
        schedule_interval=None,
        depends_on_past=False,
        wait_for_downstream=False,
        dag=None,
        params=None,
        default_args=None,
        adhoc=False,
        priority_weight=1,
        queue='default',
        pool=None,
        sla=None,
        execution_timeout=None,
        on_failure_callback=None,
        on_success_callback=None,
        on_retry_callback=None,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        **kwargs
    ):
        """
        Abstract base class for all operators. Contains recursive methods for DAG crawling behavior.

        Key Parameters:
        - task_id (str): Unique, meaningful identifier for the task
        - owner (str): Owner of the task (unix username recommended)
        - retries (int): Number of retries before task failure
        - retry_delay (timedelta): Delay between retries
        - start_date (datetime): Task start date
        - depends_on_past (bool): Task instance depends on success of previous schedule
        - dag (DAG): The DAG this task belongs to
        - trigger_rule (str): Rule for triggering task based on upstream states
        - pool (str): Resource pool to use for task execution
        - priority_weight (int): Priority weight for task scheduling
        """

    template_fields = ()
    template_ext = ()
    ui_color = '#fff'
    ui_fgcolor = '#000'

    def execute(self, context):
        """
        Execute the task logic (must be implemented by subclasses).

        Parameters:
        - context (dict): Task execution context containing runtime information

        Raises:
        - NotImplementedError: If not implemented by subclass
        """

    def pre_execute(self, context):
        """Hook called before task execution."""

    def post_execute(self, context, result):
        """Hook called after task execution."""

    def on_kill(self):
        """Override to perform cleanup when task is killed."""

    def set_upstream(self, task_or_task_list):
        """Set upstream task dependencies."""

    def set_downstream(self, task_or_task_list):
        """Set downstream task dependencies."""

    def __rshift__(self, other):
        """Implement >> operator for task dependencies."""

    def __lshift__(self, other):
        """Implement << operator for task dependencies."""
```
**Usage Example**:

```python
from airflow.models import BaseOperator
from airflow.utils import apply_defaults
from datetime import datetime, timedelta

class CustomOperator(BaseOperator):
    # Define template fields and UI colors
    template_fields = ('input_path', 'output_path')
    ui_color = '#87CEEB'

    @apply_defaults
    def __init__(
        self,
        input_path,
        output_path,
        processing_options=None,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.input_path = input_path
        self.output_path = output_path
        self.processing_options = processing_options or {}

    def execute(self, context):
        """Implement custom task logic."""
        print(f"Processing {self.input_path} -> {self.output_path}")
        print(f"Execution date: {context['ds']}")

        # Custom processing logic here
        result = self._process_data()

        # Return value for XCom
        return result

    def _process_data(self):
        # Implementation details
        return "Processing completed"

    def on_kill(self):
        """Cleanup when task is killed."""
        print("Task was killed, performing cleanup")

# Using the custom operator
custom_task = CustomOperator(
    task_id='custom_processing',
    input_path='/data/input/{{ ds }}',    # Templated
    output_path='/data/output/{{ ds }}',  # Templated
    processing_options={'threads': 4},
    retries=2,
    retry_delay=timedelta(minutes=5),
    dag=dag
)
```
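Task dependencies for custom operators are declared with the same helpers listed in the API block above. The snippet below is a minimal sketch assuming the `CustomOperator` class and `dag` object from the previous example; the `extract`/`load` task names are purely illustrative.

```python
# Minimal dependency-wiring sketch; CustomOperator and `dag` come from the
# example above and are assumptions of this snippet.
extract = CustomOperator(
    task_id='extract',
    input_path='/data/raw/{{ ds }}',
    output_path='/data/staged/{{ ds }}',
    dag=dag
)
load = CustomOperator(
    task_id='load',
    input_path='/data/staged/{{ ds }}',
    output_path='/data/final/{{ ds }}',
    dag=dag
)

# The following three statements all declare the same dependency,
# extract -> load; any one of them is sufficient on its own.
extract.set_downstream(load)
load.set_upstream(extract)
extract >> load
```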
### Task State Management

Constants and utilities for managing task instance states throughout the execution lifecycle, with color coding for UI display.

```python { .api }
class State:
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    SHUTDOWN = "shutdown"
    FAILED = "failed"
    UP_FOR_RETRY = "up_for_retry"
    UPSTREAM_FAILED = "upstream_failed"
    SKIPPED = "skipped"
    NONE = "none"

    @classmethod
    def color(cls, state):
        """
        Get UI color for a given state.

        Parameters:
        - state (str): State name

        Returns:
        - str: Color string for UI display
        """

    @classmethod
    def runnable(cls):
        """
        Get list of states that are considered runnable.

        Returns:
        - list: List of runnable state values
        """
```
**Usage Examples**:

```python
from airflow.models import BaseOperator, TaskInstance
from airflow.utils import AirflowException, State, apply_defaults

def check_task_state(**context):
    task_instance = context['ti']

    # Check current state
    if task_instance.state == State.RUNNING:
        print("Task is currently running")
    elif task_instance.state == State.SUCCESS:
        print("Task completed successfully")
    elif task_instance.state == State.FAILED:
        print("Task failed")

    # Get UI color for state
    color = State.color(task_instance.state)
    print(f"UI color for state '{task_instance.state}': {color}")

    # Check if state is runnable
    runnable_states = State.runnable()
    if task_instance.state in runnable_states:
        print("Task is in a runnable state")

# Custom state checking in operators
class StateAwareOperator(BaseOperator):
    @apply_defaults
    def __init__(self, check_upstream_states=False, **kwargs):
        super().__init__(**kwargs)
        self.check_upstream_states = check_upstream_states

    def execute(self, context):
        if self.check_upstream_states:
            dag = context['dag']
            execution_date = context['execution_date']

            for upstream_task_id in self.upstream_task_ids:
                # Build a TaskInstance for the upstream task and read its
                # current state from the metadata database
                upstream_task = dag.get_task(upstream_task_id)
                ti = TaskInstance(upstream_task, execution_date)
                state = ti.current_state()

                if state != State.SUCCESS:
                    raise AirflowException(
                        f"Upstream task {upstream_task_id} is in state {state}"
                    )

        # Continue with task execution
        return "Task completed"
```
### Trigger Rules for Task Dependencies

Constants defining when tasks should be triggered based on upstream task completion states, enabling complex workflow control patterns.

```python { .api }
class TriggerRule:
    ALL_SUCCESS = 'all_success'
    ALL_FAILED = 'all_failed'
    ALL_DONE = 'all_done'
    ONE_SUCCESS = 'one_success'
    ONE_FAILED = 'one_failed'
    DUMMY = 'dummy'
```
**Usage Examples**:

```python
from airflow.utils import TriggerRule
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

# Placeholder callables referenced by the tasks below
def handle_errors(**context):
    print("Handling upstream failure")

def handle_failure(**context):
    print("Running failure path")

def send_metrics(**context):
    print("Emitting pipeline metrics")

# Task that runs only if all upstream tasks succeed (default)
success_task = DummyOperator(
    task_id='all_success_task',
    trigger_rule=TriggerRule.ALL_SUCCESS,  # Default behavior
    dag=dag
)

# Task that runs if any upstream task fails (error handling)
error_handler = PythonOperator(
    task_id='error_handler',
    python_callable=handle_errors,
    trigger_rule=TriggerRule.ONE_FAILED,
    dag=dag
)

# Task that runs regardless of upstream task states (cleanup)
cleanup_task = DummyOperator(
    task_id='cleanup',
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag
)

# Task that runs as soon as at least one upstream task succeeds
continue_task = DummyOperator(
    task_id='continue_processing',
    trigger_rule=TriggerRule.ONE_SUCCESS,
    dag=dag
)

# Trigger at will, regardless of upstream states
monitoring_task = PythonOperator(
    task_id='monitoring',
    python_callable=send_metrics,
    trigger_rule=TriggerRule.DUMMY,
    dag=dag
)

# Complex workflow with multiple trigger rules
task_a = DummyOperator(task_id='task_a', dag=dag)
task_b = DummyOperator(task_id='task_b', dag=dag)
task_c = DummyOperator(task_id='task_c', dag=dag)

# Success path - continues only if all upstream tasks succeed
success_path = DummyOperator(
    task_id='success_path',
    trigger_rule=TriggerRule.ALL_SUCCESS,
    dag=dag
)

# Failure path - handles any upstream failure
failure_path = PythonOperator(
    task_id='failure_path',
    python_callable=handle_failure,
    trigger_rule=TriggerRule.ONE_FAILED,
    dag=dag
)

# Cleanup - always runs at the end
final_cleanup = DummyOperator(
    task_id='final_cleanup',
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag
)

# Set dependencies
[task_a, task_b, task_c] >> success_path
[task_a, task_b, task_c] >> failure_path
[success_path, failure_path] >> final_cleanup
```
### Exception Handling

Base exception class for Airflow-specific errors with proper error propagation and logging integration.

```python { .api }
class AirflowException(Exception):
    """
    Base exception class for Airflow-specific errors.

    All Airflow operators and hooks should raise this exception type
    for proper error handling by the scheduler and executor.
    """
    pass
```
**Usage Examples**:

```python
from airflow.models import BaseOperator
from airflow.utils import AirflowException, apply_defaults

def validate_input_data(**context):
    data_path = f"/data/{context['ds']}"

    # Check if data exists
    import os
    if not os.path.exists(data_path):
        raise AirflowException(f"Input data not found at {data_path}")

    # Check data quality
    import pandas as pd
    df = pd.read_csv(data_path)

    if df.empty:
        raise AirflowException(f"Data file {data_path} is empty")

    if df.isnull().sum().sum() > df.size * 0.1:  # >10% missing values
        raise AirflowException("Data quality check failed: too many missing values")

    return f"Data validation passed for {len(df)} records"

def safe_api_call(url, **context):
    import requests

    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()

        if not response.json():
            raise AirflowException("API returned empty response")

        return response.json()

    except requests.exceptions.Timeout:
        raise AirflowException(f"API call to {url} timed out after 30 seconds")
    except requests.exceptions.ConnectionError:
        raise AirflowException(f"Failed to connect to API at {url}")
    except requests.exceptions.HTTPError as e:
        raise AirflowException(f"HTTP error {e.response.status_code}: {e.response.text}")
    except AirflowException:
        # Re-raise Airflow errors unchanged
        raise
    except Exception as e:
        raise AirflowException(f"Unexpected error calling API: {str(e)}")

# Custom operator with proper exception handling
class DataValidationOperator(BaseOperator):
    @apply_defaults
    def __init__(self, validation_rules=None, **kwargs):
        super().__init__(**kwargs)
        self.validation_rules = validation_rules or {}

    def execute(self, context):
        try:
            # Perform validation
            result = self._validate_data(context)

            if not result['valid']:
                raise AirflowException(
                    f"Data validation failed: {result['errors']}"
                )

            return result

        except FileNotFoundError as e:
            raise AirflowException(f"Required file not found: {e}")
        except ValueError as e:
            raise AirflowException(f"Data validation error: {e}")
        except AirflowException:
            # Re-raise Airflow errors unchanged
            raise
        except Exception as e:
            # Wrap unexpected exceptions
            raise AirflowException(f"Validation failed unexpectedly: {str(e)}")

    def _validate_data(self, context):
        # Implementation details
        return {'valid': True, 'errors': []}
```
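The validation helpers above are plain callables; to run them as tasks they still need to be wrapped in operators. A minimal sketch using `PythonOperator` follows; the task ids, URL, and `dag` object are assumptions of this example.

```python
from airflow.operators.python_operator import PythonOperator

# Hypothetical wiring of the callables above into tasks; `dag` is assumed
# to exist. provide_context=True passes the runtime context (ds, ti, ...)
# into the **context keyword arguments.
validate_task = PythonOperator(
    task_id='validate_input_data',
    python_callable=validate_input_data,
    provide_context=True,
    dag=dag
)

api_task = PythonOperator(
    task_id='safe_api_call',
    python_callable=safe_api_call,
    op_kwargs={'url': 'https://example.com/api/data'},
    provide_context=True,
    dag=dag
)

validate_task >> api_task
```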
### Default Arguments Decorator

Function decorator that automatically applies default arguments from DAG configuration, enabling consistent operator parameter management across workflows.

```python { .api }
def apply_defaults(func):
    """
    Function decorator that looks for an argument named "default_args" and fills
    unspecified arguments from it.

    Features:
    - Searches for "default_args" parameter and applies missing arguments
    - Provides specific information about missing arguments for debugging
    - Enforces keyword argument usage when initializing operators
    - Integrates with DAG-level default arguments

    Parameters:
    - func (callable): Function to decorate (typically __init__ method)

    Returns:
    - callable: Decorated function with default argument application
    """
```
**Usage Examples**:

```python
from airflow import DAG
from airflow.models import BaseOperator
from airflow.utils import apply_defaults
from datetime import datetime, timedelta

class MyCustomOperator(BaseOperator):
    @apply_defaults
    def __init__(
        self,
        my_param,
        optional_param=None,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.my_param = my_param
        self.optional_param = optional_param

    def execute(self, context):
        return f"Executed with {self.my_param}"

# DAG with default arguments
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email_on_retry': False,
    'email': ['admin@example.com']
}

dag = DAG(
    'example_dag',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    start_date=datetime(2023, 1, 1)
)

# Operator automatically inherits default_args
task1 = MyCustomOperator(
    task_id='task1',
    my_param='value1',
    dag=dag
    # owner, retries, retry_delay, etc. are applied automatically
)

# Override specific defaults
task2 = MyCustomOperator(
    task_id='task2',
    my_param='value2',
    retries=5,               # Override default retries
    owner='specific_owner',  # Override default owner
    dag=dag
)

# Complex operator with multiple parameter types
class AdvancedOperator(BaseOperator):
    template_fields = ('input_template', 'output_template')

    @apply_defaults
    def __init__(
        self,
        input_path,
        output_path,
        processing_config=None,
        input_template=None,
        output_template=None,
        validation_enabled=True,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.input_path = input_path
        self.output_path = output_path
        self.processing_config = processing_config or {}
        self.input_template = input_template
        self.output_template = output_template
        self.validation_enabled = validation_enabled

    def execute(self, context):
        # Implementation uses all parameters
        print(f"Processing {self.input_path} -> {self.output_path}")
        print(f"Config: {self.processing_config}")
        return "Processing complete"

# Enhanced default arguments with custom parameters
enhanced_defaults = {
    'owner': 'data_pipeline',
    'start_date': datetime(2023, 1, 1),
    'retries': 3,
    'retry_delay': timedelta(minutes=10),
    'processing_config': {'threads': 4, 'memory_limit': '2GB'},
    'validation_enabled': True,
    'email_on_failure': True
}

enhanced_dag = DAG(
    'enhanced_pipeline',
    default_args=enhanced_defaults,
    schedule_interval='@daily'
)

# Operator inherits both standard and custom defaults
advanced_task = AdvancedOperator(
    task_id='advanced_processing',
    input_path='/data/{{ ds }}',
    output_path='/processed/{{ ds }}',
    dag=enhanced_dag
    # All default_args are applied automatically
)
```
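Beyond filling in defaults, `apply_defaults` enforces keyword-argument construction and reports missing required arguments, as listed in the API description above. The sketch below illustrates that behavior against the `MyCustomOperator` and `dag` from the previous example; the exact exception messages depend on the Airflow version in use.

```python
from airflow.utils import AirflowException

# Illustrative only; exact error messages vary between Airflow versions.
try:
    # Positional arguments are rejected by apply_defaults
    MyCustomOperator('bad_task_id', my_param='x', dag=dag)
except AirflowException as e:
    print(f"Keyword-argument enforcement: {e}")

try:
    # 'my_param' is required and is not present in default_args
    MyCustomOperator(task_id='incomplete_task', dag=dag)
except AirflowException as e:
    print(f"Missing-argument reporting: {e}")
```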
## Framework Integration Patterns

### Custom Operator Development

```python
from airflow.models import BaseOperator
from airflow.utils import apply_defaults, AirflowException
from airflow.hooks.base_hook import BaseHook

class DatabaseETLOperator(BaseOperator):
    """
    Custom operator that combines multiple framework components.
    """
    template_fields = ('source_sql', 'target_table')
    ui_color = '#4CAF50'

    @apply_defaults
    def __init__(
        self,
        source_conn_id,
        target_conn_id,
        source_sql,
        target_table,
        chunk_size=10000,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.source_conn_id = source_conn_id
        self.target_conn_id = target_conn_id
        self.source_sql = source_sql
        self.target_table = target_table
        self.chunk_size = chunk_size

    def execute(self, context):
        try:
            # Use hooks for database connectivity; get_records/insert_rows
            # assume DB-API style hooks behind these connection ids
            source_hook = BaseHook.get_hook(self.source_conn_id)
            target_hook = BaseHook.get_hook(self.target_conn_id)

            # Extract data
            data = source_hook.get_records(self.source_sql)

            if not data:
                raise AirflowException("No data returned from source query")

            # Load data in chunks
            for i in range(0, len(data), self.chunk_size):
                chunk = data[i:i + self.chunk_size]
                target_hook.insert_rows(
                    table=self.target_table,
                    rows=chunk
                )

            return f"Loaded {len(data)} records to {self.target_table}"

        except AirflowException:
            # Re-raise Airflow errors unchanged
            raise
        except Exception as e:
            raise AirflowException(f"ETL operation failed: {str(e)}")

    def on_kill(self):
        # Cleanup resources when task is killed
        print("ETL operation was killed, performing cleanup")
```
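The operator is then used like any other task. The snippet below is a hypothetical instantiation; the connection ids, SQL, target table, and `dag` object are placeholders rather than values defined elsewhere in this document.

```python
# Hypothetical usage of DatabaseETLOperator; connection ids, table name,
# and `dag` are placeholders for illustration.
daily_orders_etl = DatabaseETLOperator(
    task_id='daily_orders_etl',
    source_conn_id='orders_source_db',
    target_conn_id='warehouse_db',
    source_sql="SELECT * FROM orders WHERE order_date = '{{ ds }}'",
    target_table='staging_orders',
    chunk_size=5000,
    retries=2,
    dag=dag
)
```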
### State and Trigger Rule Combinations

```python
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils import TriggerRule

# Complex workflow with multiple paths and trigger rules.
# Assumes `dag` and the extract/validate/transform/load callables
# are defined elsewhere.
def create_robust_workflow():
    # Data processing tasks
    extract_task = PythonOperator(
        task_id='extract_data',
        python_callable=extract_data,
        dag=dag
    )

    validate_task = PythonOperator(
        task_id='validate_data',
        python_callable=validate_data,
        dag=dag
    )

    transform_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data,
        dag=dag
    )

    # Success path
    load_task = PythonOperator(
        task_id='load_data',
        python_callable=load_data,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        dag=dag
    )

    # Error handling
    error_notification = EmailOperator(
        task_id='error_notification',
        to=['admin@example.com'],
        subject='Pipeline Failed - {{ ds }}',
        html_content='Pipeline run for {{ ds }} failed upstream of this notification',
        trigger_rule=TriggerRule.ONE_FAILED,
        dag=dag
    )

    # Cleanup (always runs)
    cleanup_task = BashOperator(
        task_id='cleanup',
        bash_command='rm -rf /tmp/pipeline_{{ ds }}',
        trigger_rule=TriggerRule.ALL_DONE,
        dag=dag
    )

    # Set up dependencies
    extract_task >> validate_task >> transform_task >> load_task
    [extract_task, validate_task, transform_task, load_task] >> error_notification
    [load_task, error_notification] >> cleanup_task

    return dag
```