# Core Operators

Essential operators for task execution that form the building blocks of Apache Airflow workflows. These operators cover the common execution patterns: shell commands, Python functions, workflow control, notifications, database queries, HTTP calls, sub-workflows, and sensors.

## Capabilities

### Bash Command Execution

Execute shell commands, scripts, and system operations, with support for environment variables and stdout capture.

```python { .api }
class BashOperator(BaseOperator):
    def __init__(
        self,
        bash_command,
        xcom_push=False,
        env=None,
        **kwargs
    ):
        """
        Execute a Bash script, command, or set of commands.

        Parameters:
        - bash_command (str): The command, set of commands, or reference to a
          bash script to execute (script files must end in '.sh' or '.bash')
        - xcom_push (bool): If True, the last line written to stdout is pushed
          to an XCom when the bash command completes
        - env (dict): If not None, defines the environment variables for the
          new process instead of inheriting the current process environment
        - **kwargs: Additional BaseOperator parameters
        """

    template_fields = ('bash_command',)
    template_ext = ('.sh', '.bash')
    ui_color = '#f0ede4'

    def execute(self, context): ...
    def on_kill(self): ...
```

**Usage Example**:

```python
from airflow.operators.bash_operator import BashOperator

# Simple command execution
bash_task = BashOperator(
    task_id='run_bash_script',
    bash_command='echo "Processing started at $(date)"',
    dag=dag
)

# Script execution with environment variables; because the command ends in
# '.sh', the script file is resolved and rendered as a Jinja template
script_task = BashOperator(
    task_id='run_data_script',
    bash_command='/path/to/process_data.sh',
    env={'DATA_PATH': '/tmp/data', 'LOG_LEVEL': 'INFO'},
    xcom_push=True,  # Capture the script's last stdout line
    dag=dag
)

# Templated command using Airflow variables
templated_task = BashOperator(
    task_id='templated_command',
    bash_command='echo "Processing data for {{ ds }}"',
    dag=dag
)
```

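When `xcom_push=True`, the captured line can be read downstream with `xcom_pull`. A minimal sketch, assuming the `script_task` from the example above:

```python
from airflow.operators.python_operator import PythonOperator

# Pull the last stdout line that script_task pushed to XCom
def read_script_output(**context):
    output = context['ti'].xcom_pull(task_ids='run_data_script')
    print(f"Script reported: {output}")

read_output = PythonOperator(
    task_id='read_script_output',
    python_callable=read_script_output,
    provide_context=True,
    dag=dag
)

script_task >> read_output
```
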
### Python Function Execution

Execute Python callables with parameter passing, context injection, and template support for dynamic task execution.

```python { .api }
class PythonOperator(BaseOperator):
    def __init__(
        self,
        python_callable,
        op_args=None,
        op_kwargs=None,
        provide_context=False,
        templates_dict=None,
        templates_exts=None,
        **kwargs
    ):
        """
        Executes a Python callable.

        Parameters:
        - python_callable (callable): A reference to an object that is callable
        - op_args (list, default=None): List of positional arguments that will get
          unpacked when calling your callable
        - op_kwargs (dict, default=None): Dictionary of keyword arguments that will
          get unpacked in your function
        - provide_context (bool): If True, Airflow passes the task context to your
          function as keyword arguments
        - templates_dict (dict): Dictionary where values are templates that will get
          templated by the Airflow engine
        - templates_exts (list): List of file extensions to resolve while processing
          templated fields
        - **kwargs: Additional BaseOperator parameters
        """

    template_fields = ('templates_dict',)
    template_ext = tuple()
    ui_color = '#ffefeb'

    def execute(self, context): ...
```

**Usage Examples**:

```python
from airflow.operators.python_operator import PythonOperator

# Simple function execution
def my_python_function():
    print("Task executed successfully")
    return "Success"

python_task = PythonOperator(
    task_id='run_python_function',
    python_callable=my_python_function,
    dag=dag
)

# Function with parameters
def process_data(input_path, output_path, **kwargs):
    print(f"Processing {input_path} -> {output_path}")
    # Processing logic here
    return f"Processed {input_path}"

process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    op_args=['/tmp/input'],
    op_kwargs={'output_path': '/tmp/output'},
    dag=dag
)

# Function with Airflow context
def context_aware_function(**context):
    execution_date = context['ds']
    task_instance = context['ti']
    dag = context['dag']

    print(f"Execution date: {execution_date}")
    print(f"Task ID: {task_instance.task_id}")
    return f"Processed for {execution_date}"

context_task = PythonOperator(
    task_id='context_function',
    python_callable=context_aware_function,
    provide_context=True,
    dag=dag
)
```

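The `templates_dict` parameter is the PythonOperator's hook into Jinja templating: its values are rendered before execution and, with `provide_context=True`, arrive in the callable as the `templates_dict` keyword argument. A minimal sketch:

```python
# Values in templates_dict are rendered by Jinja before the callable runs
def report_for_date(templates_dict=None, **context):
    print(f"Running report for {templates_dict['query_date']}")

templated_python = PythonOperator(
    task_id='templated_python',
    python_callable=report_for_date,
    templates_dict={'query_date': '{{ ds }}'},
    provide_context=True,
    dag=dag
)
```
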
### Workflow Branching

Select a single execution path at runtime: the branching callable inspects conditions and returns the task_id of the downstream branch to follow.

```python { .api }
class BranchPythonOperator(PythonOperator):
    def __init__(self, **kwargs):
        """
        Allows a workflow to "branch", following a single path after the execution
        of this task.

        The python_callable should return the task_id of the branch to follow. The
        returned task_id must point to a task directly downstream from this operator.
        All other directly downstream tasks are marked with a state of "skipped".
        """

    def execute(self, context): ...
```

**Usage Example**:

```python
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator

def choose_branch(**context):
    # Decision logic based on context or external conditions
    from datetime import datetime
    execution_date = context['ds']

    # Example: branch based on day of week
    date_obj = datetime.strptime(execution_date, '%Y-%m-%d')
    if date_obj.weekday() < 5:  # Monday-Friday
        return 'weekday_processing'
    else:  # Weekend
        return 'weekend_processing'

branch_task = BranchPythonOperator(
    task_id='branch_decision',
    python_callable=choose_branch,
    provide_context=True,
    dag=dag
)

weekday_task = DummyOperator(
    task_id='weekday_processing',
    dag=dag
)

weekend_task = DummyOperator(
    task_id='weekend_processing',
    dag=dag
)

# Set up branching
branch_task >> [weekday_task, weekend_task]
```

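Rejoining branches needs care: a task downstream of both branches would, under the default `all_success` trigger rule, be skipped along with the branch that was not taken. A sketch of the usual workaround, continuing the example above:

```python
# Join the two branches; 'one_success' fires as soon as either branch
# completes, instead of the default 'all_success'
join_task = DummyOperator(
    task_id='join_branches',
    trigger_rule='one_success',
    dag=dag
)

[weekday_task, weekend_task] >> join_task
```
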
### Conditional Workflow Continuation

Halt workflow execution when a runtime condition is not met, marking all downstream tasks as skipped.

```python { .api }
class ShortCircuitOperator(PythonOperator):
    def __init__(self, **kwargs):
        """
        Allows a workflow to continue only if a condition is met. Otherwise, the
        workflow "short-circuits" and downstream tasks are skipped.

        The python_callable should return True to continue or False to short-circuit.
        All downstream tasks are marked with a state of "skipped" when the condition
        is False.
        """

    def execute(self, context): ...
```

**Usage Example**:

```python
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.dummy_operator import DummyOperator

def check_data_availability(**context):
    # Check if required data is available
    import os
    data_path = f"/data/{context['ds']}"

    if os.path.exists(data_path) and os.listdir(data_path):
        print(f"Data available for {context['ds']}")
        return True
    else:
        print(f"No data available for {context['ds']}, skipping downstream tasks")
        return False

condition_check = ShortCircuitOperator(
    task_id='check_data',
    python_callable=check_data_availability,
    provide_context=True,
    dag=dag
)

# These tasks will be skipped if condition_check returns False
downstream_task1 = DummyOperator(task_id='process_data', dag=dag)
downstream_task2 = DummyOperator(task_id='generate_report', dag=dag)

condition_check >> [downstream_task1, downstream_task2]
```

### Workflow Placeholders and Grouping

Provide structural elements for DAG organization without performing actual work; useful for workflow visualization and dependency management.

```python { .api }
class DummyOperator(BaseOperator):
    def __init__(self, **kwargs):
        """
        Operator that does literally nothing. It can be used to group tasks in a DAG.
        """

    template_fields = tuple()
    ui_color = '#e8f7e4'

    def execute(self, context): ...
```

**Usage Example**:

```python
from airflow.operators.dummy_operator import DummyOperator

# Workflow structure and grouping
start_task = DummyOperator(
    task_id='workflow_start',
    dag=dag
)

data_processing_start = DummyOperator(
    task_id='data_processing_start',
    dag=dag
)

data_processing_end = DummyOperator(
    task_id='data_processing_end',
    dag=dag
)

workflow_end = DummyOperator(
    task_id='workflow_end',
    dag=dag
)

# Create the workflow structure
# (task1, task2, and task3 stand in for real processing tasks defined elsewhere)
start_task >> data_processing_start
data_processing_start >> [task1, task2, task3]  # Parallel processing
[task1, task2, task3] >> data_processing_end
data_processing_end >> workflow_end
```

### Email Notifications

Send email notifications with template support for dynamic content and file attachments.

```python { .api }
class EmailOperator(BaseOperator):
    def __init__(
        self,
        to,
        subject,
        html_content,
        files=None,
        **kwargs
    ):
        """
        Sends an email.

        Parameters:
        - to (str or list): List of emails to send the email to
          (comma- or semicolon-delimited if a string)
        - subject (str): Subject line for the email (templated)
        - html_content (str): Content of the email (templated); HTML markup is allowed
        - files (list, default=None): File names to attach to the email
        - **kwargs: Additional BaseOperator parameters
        """

    template_fields = ('subject', 'html_content')
    template_ext = ('.html',)
    ui_color = '#e6faf9'

    def execute(self, context): ...
```

**Usage Examples**:

```python
from airflow.operators.email_operator import EmailOperator

# Simple notification
email_task = EmailOperator(
    task_id='send_notification',
    to=['admin@example.com', 'team@example.com'],
    subject='Workflow Completed Successfully',
    html_content='<h2>Daily ETL process completed at {{ ts }}</h2>',
    dag=dag
)

# Detailed report with attachments
report_email = EmailOperator(
    task_id='send_report',
    to='reports@example.com',
    subject='Daily Report - {{ ds }}',
    html_content='''
    <h1>Daily Processing Report</h1>
    <p>Execution Date: {{ ds }}</p>
    <p>Task Instance: {{ ti.task_id }}</p>
    <p>DAG: {{ dag.dag_id }}</p>
    <h2>Summary</h2>
    <p>All tasks completed successfully.</p>
    ''',
    files=['/tmp/daily_report.pdf', '/tmp/data_summary.csv'],
    dag=dag
)

# Failure alert built on demand, e.g. from a task's on_failure_callback.
# The operator is constructed without dag=dag so it is never registered
# as a task; execute() is invoked directly with the callback's context.
def send_failure_email(context):
    EmailOperator(
        task_id='failure_email',
        to=['alerts@example.com'],
        subject=f'ALERT: Task Failed - {context["task_instance"].task_id}',
        html_content=f'''
        <h1>Task Failure Alert</h1>
        <p><strong>Task:</strong> {context["task_instance"].task_id}</p>
        <p><strong>DAG:</strong> {context["dag"].dag_id}</p>
        <p><strong>Execution Date:</strong> {context["ds"]}</p>
        <p>Please investigate the failure immediately.</p>
        ''',
    ).execute(context)
```

### Database Operations

Execute SQL queries and statements against various database systems with connection management, parameter binding, and transaction control.

#### MySQL Operations

Execute SQL code in MySQL databases with connection management and parameter support.

```python { .api }
class MySqlOperator(BaseOperator):
    def __init__(
        self,
        sql,
        mysql_conn_id='mysql_default',
        parameters=None,
        **kwargs
    ):
        """
        Executes SQL code in a specific MySQL database.

        Parameters:
        - sql (str|list): SQL statement, list of statements, or reference to a
          template file (must end in '.sql')
        - mysql_conn_id (str): Reference to a MySQL connection ID
        - parameters (dict): Parameters to bind to the SQL query
        - **kwargs: Additional BaseOperator parameters
        """

    template_fields = ('sql',)
    template_ext = ('.sql',)
    ui_color = '#ededed'

    def execute(self, context): ...
```

#### PostgreSQL Operations

Execute SQL code in PostgreSQL databases with autocommit control and parameter binding.

```python { .api }
class PostgresOperator(BaseOperator):
    def __init__(
        self,
        sql,
        postgres_conn_id='postgres_default',
        autocommit=False,
        parameters=None,
        **kwargs
    ):
        """
        Executes SQL code in a specific PostgreSQL database.

        Parameters:
        - sql (str|list): SQL statement, list of statements, or reference to a
          template file (must end in '.sql')
        - postgres_conn_id (str): Reference to a PostgreSQL connection ID
        - autocommit (bool): Enable autocommit for the SQL execution
        - parameters (dict): Parameters to bind to the SQL query
        - **kwargs: Additional BaseOperator parameters
        """

    template_fields = ('sql',)
    template_ext = ('.sql',)
    ui_color = '#ededed'

    def execute(self, context): ...
```

#### SQLite Operations

Execute SQL code in SQLite databases for lightweight, file-based database work.

```python { .api }
class SqliteOperator(BaseOperator):
    def __init__(
        self,
        sql,
        sqlite_conn_id='sqlite_default',
        parameters=None,
        **kwargs
    ):
        """
        Executes SQL code in a specific SQLite database.

        Parameters:
        - sql (str|list): SQL statement, list of statements, or reference to a
          template file (must end in '.sql')
        - sqlite_conn_id (str): Reference to a SQLite connection ID
        - parameters (dict): Parameters to bind to the SQL query
        - **kwargs: Additional BaseOperator parameters
        """

    template_fields = ('sql',)
    template_ext = ('.sql',)
    ui_color = '#ededed'

    def execute(self, context): ...
```

**Usage Examples**:

```python
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.sqlite_operator import SqliteOperator

# MySQL query execution
mysql_task = MySqlOperator(
    task_id='run_mysql_query',
    mysql_conn_id='mysql_prod',
    sql='''
        INSERT INTO daily_stats (date, record_count, avg_value)
        SELECT '{{ ds }}', COUNT(*), AVG(value)
        FROM transactions
        WHERE DATE(created_at) = '{{ ds }}'
    ''',
    dag=dag
)

# PostgreSQL with bound parameters. Note that only the sql field is templated:
# Jinja values like {{ ts }} belong in the SQL string, while runtime values
# are bound by the driver through parameters.
postgres_task = PostgresOperator(
    task_id='update_user_stats',
    postgres_conn_id='postgres_warehouse',
    sql='''
        UPDATE user_metrics
        SET last_login = '{{ ts }}',
            login_count = login_count + 1
        WHERE user_id = %(user_id)s
    ''',
    parameters={'user_id': 12345},
    autocommit=True,
    dag=dag
)

# SQLite script file (rendered as a template because of the .sql extension)
sqlite_task = SqliteOperator(
    task_id='local_db_cleanup',
    sqlite_conn_id='sqlite_local',
    sql='/path/to/cleanup_script.sql',
    dag=dag
)
```

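Since `sql` also accepts a list, several statements can run in order as one task. A sketch, using hypothetical `daily_summary` and `transactions` tables:

```python
# Multiple statements executed sequentially by a single task;
# table names are illustrative
rebuild_summary = PostgresOperator(
    task_id='rebuild_daily_summary',
    postgres_conn_id='postgres_warehouse',
    sql=[
        "DELETE FROM daily_summary WHERE summary_date = '{{ ds }}'",
        '''
        INSERT INTO daily_summary (summary_date, total)
        SELECT '{{ ds }}', COUNT(*)
        FROM transactions
        WHERE DATE(created_at) = '{{ ds }}'
        ''',
    ],
    dag=dag
)
```
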
### HTTP Operations

Execute HTTP requests against external APIs and web services, with response validation and customizable request parameters.

```python { .api }
class SimpleHttpOperator(BaseOperator):
    def __init__(
        self,
        endpoint,
        method='POST',
        data=None,
        headers=None,
        response_check=None,
        extra_options=None,
        http_conn_id='http_default',
        **kwargs
    ):
        """
        Calls an endpoint on an HTTP system to execute an action.

        Parameters:
        - endpoint (str): The relative part of the full URL
        - method (str): HTTP method to use (default: 'POST')
        - data (dict): Data to pass (POST/PUT body or URL parameters for GET)
        - headers (dict): HTTP headers to add to the request
        - response_check (callable): Function to validate the response
          (returns True on success, False otherwise)
        - extra_options (dict): Extra options for the requests library
          (timeout, SSL verification, etc.)
        - http_conn_id (str): Reference to an HTTP connection ID
        - **kwargs: Additional BaseOperator parameters
        """

    template_fields = ('endpoint',)
    template_ext = ()
    ui_color = '#f4a460'

    def execute(self, context): ...
```

**Usage Example**:

```python
import json

from airflow.operators.http_operator import SimpleHttpOperator

# POST request with a JSON body, serialized explicitly (a plain dict
# would be form-encoded by the requests library)
api_call = SimpleHttpOperator(
    task_id='api_post',
    http_conn_id='api_server',
    endpoint='/v1/process',
    method='POST',
    data=json.dumps({'job_id': '{{ dag_run.run_id }}', 'date': '{{ ds }}'}),
    headers={'Content-Type': 'application/json'},
    dag=dag
)

# GET request with response validation
def check_status_code(response):
    return response.status_code == 200

status_check = SimpleHttpOperator(
    task_id='health_check',
    http_conn_id='service_api',
    endpoint='/health',
    method='GET',
    response_check=check_status_code,
    dag=dag
)
```

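The `extra_options` dict is passed through to the underlying requests call, which is where timeouts or SSL settings go. A sketch with a hypothetical connection and endpoint:

```python
# Fail fast instead of hanging on a slow upstream service
report_request = SimpleHttpOperator(
    task_id='report_request',
    http_conn_id='reporting_api',   # hypothetical connection ID
    endpoint='/v1/report',          # hypothetical endpoint
    method='GET',
    extra_options={'timeout': 30},  # seconds, forwarded to requests
    dag=dag
)
```
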
### Workflow Composition

Create complex workflows by embedding sub-DAGs within parent DAGs for modular and reusable workflow components.

```python { .api }
class SubDagOperator(BaseOperator):
    def __init__(
        self,
        subdag,
        executor=DEFAULT_EXECUTOR,
        **kwargs
    ):
        """
        Execute a sub-DAG as part of a larger workflow.

        By convention, a sub-DAG's dag_id should be prefixed by its parent's dag_id
        and a dot, as in 'parent.child'.

        Parameters:
        - subdag (DAG): The DAG object to run as a subdag of the current DAG
        - executor (BaseExecutor): Executor to use for the sub-DAG
        - **kwargs: Additional BaseOperator parameters (must include 'dag')
        """

    template_fields = tuple()
    ui_color = '#555'
    ui_fgcolor = '#fff'

    def execute(self, context): ...
```

625
626
**Usage Example**:
627
628
```python
629
from airflow.operators.subdag_operator import SubDagOperator
630
from airflow import DAG
631
from datetime import datetime, timedelta
632
633
# Define the sub-DAG
634
def create_subdag(parent_dag_id, child_dag_id, start_date, schedule_interval):
635
subdag = DAG(
636
dag_id=f'{parent_dag_id}.{child_dag_id}',
637
start_date=start_date,
638
schedule_interval=schedule_interval,
639
)
640
641
# Add tasks to sub-DAG
642
task1 = DummyOperator(task_id='subtask1', dag=subdag)
643
task2 = DummyOperator(task_id='subtask2', dag=subdag)
644
task1 >> task2
645
646
return subdag
647
648
# Use SubDagOperator in main DAG
649
subdag_task = SubDagOperator(
650
task_id='parallel_processing',
651
subdag=create_subdag(
652
parent_dag_id='main_dag',
653
child_dag_id='parallel_processing',
654
start_date=datetime(2023, 1, 1),
655
schedule_interval='@daily'
656
),
657
dag=dag
658
)
659
```
### Sensor Operations

Monitor external systems and wait for conditions to be met before proceeding with downstream tasks. Sensors periodically check their condition and succeed once the criteria are satisfied.

#### Base Sensor Framework

Abstract foundation for all sensor operators, providing the polling loop, timeout handling, and a configurable check interval.

```python { .api }
class BaseSensorOperator(BaseOperator):
    def __init__(
        self,
        poke_interval=60,
        timeout=60*60*24*7,
        **kwargs
    ):
        """
        Base class for all sensor operators, which keep executing at an interval
        until their criteria are met.

        Parameters:
        - poke_interval (int): Time in seconds between checks (default: 60)
        - timeout (int): Time in seconds before the task times out and fails
          (default: 7 days)
        - **kwargs: Additional BaseOperator parameters
        """

    ui_color = '#e6f1f2'

    def poke(self, context):
        """Override this method to define the sensor's condition check."""

    def execute(self, context): ...
```

#### SQL-based Sensors

Monitor database conditions by executing a SQL query repeatedly until it returns a result.

```python { .api }
class SqlSensor(BaseSensorOperator):
    def __init__(
        self,
        conn_id,
        sql,
        **kwargs
    ):
        """
        Runs a SQL statement until a criteria is met. Succeeds when the query
        returns a non-empty, non-zero result.

        Parameters:
        - conn_id (str): The connection ID to run the sensor against
        - sql (str): SQL statement to execute; must return at least one
          non-zero/non-empty cell for the sensor to pass
        - **kwargs: Additional BaseSensorOperator parameters
        """

    template_fields = ('sql',)
    template_ext = ('.hql', '.sql')

    def poke(self, context): ...
```

#### File System Sensors

Monitor file systems for the presence of files or directories before proceeding with workflow execution.

```python { .api }
class HdfsSensor(BaseSensorOperator):
    def __init__(
        self,
        filepath,
        hdfs_conn_id='hdfs_default',
        **kwargs
    ):
        """
        Waits for a file or folder to appear in HDFS.

        Parameters:
        - filepath (str): Path to the file or directory in HDFS
        - hdfs_conn_id (str): Reference to an HDFS connection ID
        - **kwargs: Additional BaseSensorOperator parameters
        """

    template_fields = ('filepath',)

    def poke(self, context): ...

class WebHdfsSensor(BaseSensorOperator):
    def __init__(
        self,
        filepath,
        webhdfs_conn_id='webhdfs_default',
        **kwargs
    ):
        """
        Waits for a file or folder to appear in HDFS via the WebHDFS REST API.

        Parameters:
        - filepath (str): Path to the file or directory in HDFS
        - webhdfs_conn_id (str): Reference to a WebHDFS connection ID
        - **kwargs: Additional BaseSensorOperator parameters
        """

    template_fields = ('filepath',)

    def poke(self, context): ...
```

**Usage Examples**:

```python
from airflow.operators.sensors import BaseSensorOperator, SqlSensor, HdfsSensor

# Custom sensor implementation
class DataReadySensor(BaseSensorOperator):
    template_fields = ('data_path',)  # render {{ ds }} in data_path

    def __init__(self, data_path, **kwargs):
        super().__init__(**kwargs)
        self.data_path = data_path

    def poke(self, context):
        import os
        return os.path.exists(self.data_path) and bool(os.listdir(self.data_path))

data_sensor = DataReadySensor(
    task_id='wait_for_data',
    data_path='/data/{{ ds }}',
    poke_interval=30,  # Check every 30 seconds
    timeout=3600,      # Time out after 1 hour
    dag=dag
)

# SQL sensor for database monitoring
db_sensor = SqlSensor(
    task_id='wait_for_records',
    conn_id='postgres_prod',
    sql='''
        SELECT COUNT(*)
        FROM transactions
        WHERE DATE(created_at) = '{{ ds }}'
          AND status = 'completed'
    ''',
    poke_interval=300,  # Check every 5 minutes
    dag=dag
)

# HDFS file sensor
file_sensor = HdfsSensor(
    task_id='wait_for_hdfs_file',
    filepath='/data/raw/{{ ds }}/input.parquet',
    hdfs_conn_id='hdfs_cluster',
    poke_interval=60,
    dag=dag
)

# Chain sensors ahead of a processing task defined elsewhere
data_sensor >> db_sensor >> file_sensor >> processing_task
```

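WebHdfsSensor follows the same wait-for-file pattern when the cluster is reached over the WebHDFS REST API rather than a direct HDFS client; a short sketch with a hypothetical connection ID:

```python
from airflow.operators.sensors import WebHdfsSensor

# Same pattern as HdfsSensor, but over the WebHDFS REST API
web_file_sensor = WebHdfsSensor(
    task_id='wait_for_webhdfs_file',
    filepath='/data/raw/{{ ds }}/input.parquet',
    webhdfs_conn_id='webhdfs_cluster',  # hypothetical connection ID
    poke_interval=60,
    dag=dag
)
```
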
## Template Support

Most operators support Jinja templating for dynamic content in their templated fields:

```python
# Template variables available in operators
templated_bash = BashOperator(
    task_id='templated_bash',
    bash_command='echo "Processing {{ ds }} in DAG {{ dag.dag_id }}"',
    dag=dag
)

templated_email = EmailOperator(
    task_id='templated_email',
    to=['admin@example.com'],
    subject='Report for {{ ds }}',
    html_content='''
    <h1>Report Generated</h1>
    <p>Date: {{ ds }}</p>
    <p>Timestamp: {{ ts }}</p>
    <p>Previous Date: {{ prev_ds }}</p>
    <p>Next Date: {{ next_ds }}</p>
    ''',
    dag=dag
)
```