
# Core Operators

Essential operators for task execution that form the building blocks of Apache Airflow workflows. These operators handle common execution patterns including shell commands, Python functions, workflow control, and notifications.

## Capabilities

### Bash Command Execution

Execute shell commands, scripts, and system operations with environment variable support and output capture capabilities.

```python { .api }
class BashOperator(BaseOperator):
    def __init__(
        self,
        bash_command,
        xcom_push=False,
        env=None,
        **kwargs
    ):
        """
        Execute a Bash script, command or set of commands.

        Parameters:
        - bash_command (str): The command, set of commands or reference to a bash script (must be '.sh') to be executed
        - xcom_push (bool): If True, the last line written to stdout will be pushed to an XCom when the bash command completes
        - env (dict): If not None, defines environment variables for the new process instead of inheriting the current process environment
        - **kwargs: Additional BaseOperator parameters
        """

    template_fields = ('bash_command',)
    template_ext = ('.sh', '.bash')
    ui_color = '#f0ede4'

    def execute(self, context): ...
    def on_kill(self): ...
```

**Usage Example**:

```python
from airflow.operators.bash_operator import BashOperator

# Simple command execution
bash_task = BashOperator(
    task_id='run_bash_script',
    bash_command='echo "Processing started at $(date)"',
    dag=dag
)

# Script execution with environment variables
script_task = BashOperator(
    task_id='run_data_script',
    bash_command='/path/to/process_data.sh',
    env={'DATA_PATH': '/tmp/data', 'LOG_LEVEL': 'INFO'},
    xcom_push=True,  # Capture script output
    dag=dag
)

# Templated command using Airflow variables
templated_task = BashOperator(
    task_id='templated_command',
    bash_command='echo "Processing data for {{ ds }}"',
    dag=dag
)
```
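
With `xcom_push=True`, the last stdout line from `run_data_script` becomes available to downstream tasks via XCom. A minimal sketch of consuming it in a later, hypothetical task (assuming the tasks above and default XCom behavior):

```python
# Hypothetical follow-up task: consumes the XCom pushed by 'run_data_script'
report_task = BashOperator(
    task_id='report_script_output',
    bash_command="echo Script output: {{ ti.xcom_pull(task_ids='run_data_script') }}",
    dag=dag
)

script_task >> report_task
```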

### Python Function Execution

Execute Python callables with parameter passing, context injection, and template support for dynamic task execution.

```python { .api }
class PythonOperator(BaseOperator):
    def __init__(
        self,
        python_callable,
        op_args=None,
        op_kwargs=None,
        provide_context=False,
        templates_dict=None,
        templates_exts=None,
        **kwargs
    ):
        """
        Executes a Python callable.

        Parameters:
        - python_callable (callable): A reference to an object that is callable
        - op_args (list, default=None): List of positional arguments that will get unpacked when calling your callable
        - op_kwargs (dict, default=None): Dictionary of keyword arguments that will get unpacked in your function
        - provide_context (bool): If True, Airflow will pass keyword arguments that can be used in your function
        - templates_dict (dict): Dictionary where values are templates that will get templated by Airflow engine
        - templates_exts (list): List of file extensions to resolve while processing templated fields
        - **kwargs: Additional BaseOperator parameters
        """

    template_fields = ('templates_dict',)
    template_ext = tuple()
    ui_color = '#ffefeb'

    def execute(self, context): ...
```

**Usage Examples**:

```python
from airflow.operators.python_operator import PythonOperator

# Simple function execution
def my_python_function():
    print("Task executed successfully")
    return "Success"

python_task = PythonOperator(
    task_id='run_python_function',
    python_callable=my_python_function,
    dag=dag
)

# Function with parameters
def process_data(input_path, output_path, **kwargs):
    print(f"Processing {input_path} -> {output_path}")
    # Processing logic here
    return f"Processed {input_path}"

process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    op_args=['/tmp/input'],
    op_kwargs={'output_path': '/tmp/output'},
    dag=dag
)

# Function with Airflow context
def context_aware_function(**context):
    execution_date = context['ds']
    task_instance = context['ti']
    dag = context['dag']

    print(f"Execution date: {execution_date}")
    print(f"Task ID: {task_instance.task_id}")
    return f"Processed for {execution_date}"

context_task = PythonOperator(
    task_id='context_function',
    python_callable=context_aware_function,
    provide_context=True,
    dag=dag
)
```
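
In most Airflow versions the value returned by `python_callable` is pushed to XCom automatically, so downstream callables can read it. A minimal sketch assuming that behavior and the `run_python_function` task above (the downstream task is hypothetical):

```python
# Hypothetical downstream task: pulls the return value of 'run_python_function'
def report_result(**context):
    result = context['ti'].xcom_pull(task_ids='run_python_function')
    print(f"Upstream returned: {result}")

report_task = PythonOperator(
    task_id='report_result',
    python_callable=report_result,
    provide_context=True,
    dag=dag
)

python_task >> report_task
```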

### Workflow Branching

Control workflow execution paths based on runtime conditions with dynamic task selection and conditional execution.

```python { .api }
class BranchPythonOperator(PythonOperator):
    def __init__(self, **kwargs):
        """
        Allows a workflow to "branch" or follow a single path following the execution of this task.

        The python_callable should return the task_id to follow. The returned task_id should point
        to a task directly downstream from this operator. All other "branches" or directly
        downstream tasks are marked with a state of "skipped".
        """

    def execute(self, context): ...
```

**Usage Example**:

```python
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator

def choose_branch(**context):
    # Decision logic based on context or external conditions
    execution_date = context['ds']

    # Example: Branch based on day of week
    from datetime import datetime
    date_obj = datetime.strptime(execution_date, '%Y-%m-%d')

    if date_obj.weekday() < 5:  # Monday-Friday
        return 'weekday_processing'
    else:  # Weekend
        return 'weekend_processing'

branch_task = BranchPythonOperator(
    task_id='branch_decision',
    python_callable=choose_branch,
    provide_context=True,
    dag=dag
)

weekday_task = DummyOperator(
    task_id='weekday_processing',
    dag=dag
)

weekend_task = DummyOperator(
    task_id='weekend_processing',
    dag=dag
)

# Set up branching
branch_task >> [weekday_task, weekend_task]
```
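
If the branches later converge on a common downstream task, that task usually needs a non-default `trigger_rule` so the branch that was skipped does not skip it as well. A minimal sketch with a hypothetical join task:

```python
# Hypothetical join task: runs once either branch has succeeded.
# With the default 'all_success' rule, the skipped branch would typically
# cause this task to be skipped too.
join_task = DummyOperator(
    task_id='join_branches',
    trigger_rule='one_success',
    dag=dag
)

[weekday_task, weekend_task] >> join_task
```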

### Conditional Workflow Continuation

Continue workflow execution only when a condition is met; otherwise downstream tasks are skipped.

```python { .api }
class ShortCircuitOperator(PythonOperator):
    def __init__(self, **kwargs):
        """
        Allows a workflow to continue only if a condition is met. Otherwise, the workflow
        "short-circuits" and downstream tasks are skipped.

        The python_callable should return True to continue or False to short-circuit.
        Any downstream tasks are marked with a state of "skipped" when the condition is False.
        """

    def execute(self, context): ...
```

**Usage Example**:

```python
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.dummy_operator import DummyOperator

def check_data_availability(**context):
    # Check if required data is available
    import os
    data_path = f"/data/{context['ds']}"

    if os.path.exists(data_path) and os.listdir(data_path):
        print(f"Data available for {context['ds']}")
        return True
    else:
        print(f"No data available for {context['ds']}, skipping downstream tasks")
        return False

condition_check = ShortCircuitOperator(
    task_id='check_data',
    python_callable=check_data_availability,
    provide_context=True,
    dag=dag
)

# These tasks will be skipped if condition_check returns False
downstream_task1 = DummyOperator(task_id='process_data', dag=dag)
downstream_task2 = DummyOperator(task_id='generate_report', dag=dag)

condition_check >> [downstream_task1, downstream_task2]
```

### Workflow Placeholders and Grouping

Provide structural elements for DAG organization without performing actual work, useful for workflow visualization and dependency management.

```python { .api }
class DummyOperator(BaseOperator):
    def __init__(self, **kwargs):
        """
        Operator that does literally nothing. It can be used to group tasks in a DAG.
        """

    template_fields = tuple()
    ui_color = '#e8f7e4'

    def execute(self, context): ...
```

**Usage Example**:

```python
from airflow.operators.dummy_operator import DummyOperator

# Workflow structure and grouping
start_task = DummyOperator(
    task_id='workflow_start',
    dag=dag
)

data_processing_start = DummyOperator(
    task_id='data_processing_start',
    dag=dag
)

data_processing_end = DummyOperator(
    task_id='data_processing_end',
    dag=dag
)

workflow_end = DummyOperator(
    task_id='workflow_end',
    dag=dag
)

# Create workflow structure
# (task1, task2 and task3 stand for processing tasks defined elsewhere in the DAG)
start_task >> data_processing_start
data_processing_start >> [task1, task2, task3]  # Parallel processing
[task1, task2, task3] >> data_processing_end
data_processing_end >> workflow_end
```

### Email Notifications

Send email notifications with template support for dynamic content and file attachments.

```python { .api }
class EmailOperator(BaseOperator):
    def __init__(
        self,
        to,
        subject,
        html_content,
        files=None,
        **kwargs
    ):
        """
        Sends an email.

        Parameters:
        - to (str or list): List of emails to send the email to (comma or semicolon delimited if string)
        - subject (str): Subject line for the email (templated)
        - html_content (str): Content of the email (templated), html markup is allowed
        - files (list, default=None): File names to attach in email
        - **kwargs: Additional BaseOperator parameters
        """

    template_fields = ('subject', 'html_content')
    template_ext = ('.html',)
    ui_color = '#e6faf9'

    def execute(self, context): ...
```

**Usage Examples**:

```python
from airflow.operators.email_operator import EmailOperator

# Simple notification
email_task = EmailOperator(
    task_id='send_notification',
    to=['admin@example.com', 'team@example.com'],
    subject='Workflow Completed Successfully',
    html_content='<h2>Daily ETL process completed at {{ ts }}</h2>',
    dag=dag
)

# Detailed report with attachments
report_email = EmailOperator(
    task_id='send_report',
    to='reports@example.com',
    subject='Daily Report - {{ ds }}',
    html_content='''
    <h1>Daily Processing Report</h1>
    <p>Execution Date: {{ ds }}</p>
    <p>Task Instance: {{ ti.task_id }}</p>
    <p>DAG: {{ dag.dag_id }}</p>
    <h2>Summary</h2>
    <p>All tasks completed successfully.</p>
    ''',
    files=['/tmp/daily_report.pdf', '/tmp/data_summary.csv'],
    dag=dag
)

# Building and sending an alert email immediately (e.g. from a failure callback)
# rather than registering it as a scheduled task
def send_failure_email(**context):
    return EmailOperator(
        task_id='failure_email',
        to=['alerts@example.com'],
        subject=f'ALERT: Task Failed - {context["task_instance"].task_id}',
        html_content=f'''
        <h1>Task Failure Alert</h1>
        <p><strong>Task:</strong> {context["task_instance"].task_id}</p>
        <p><strong>DAG:</strong> {context["dag"].dag_id}</p>
        <p><strong>Execution Date:</strong> {context["ds"]}</p>
        <p>Please investigate the failure immediately.</p>
        ''',
        dag=dag
    ).execute(context)
```

### Database Operations

Execute SQL queries and statements against various database systems with connection management, parameter binding, and transaction control.

#### MySQL Operations

Execute SQL code in MySQL databases with connection management and parameter support.

```python { .api }
class MySqlOperator(BaseOperator):
    def __init__(
        self,
        sql,
        mysql_conn_id='mysql_default',
        parameters=None,
        **kwargs
    ):
        """
        Executes SQL code in a specific MySQL database.

        Parameters:
        - sql (str|list): SQL statement, list of statements, or reference to template file (.sql)
        - mysql_conn_id (str): Reference to MySQL connection ID
        - parameters (dict): Parameters for SQL query binding
        - **kwargs: Additional BaseOperator parameters
        """

    template_fields = ('sql',)
    template_ext = ('.sql',)
    ui_color = '#ededed'

    def execute(self, context): ...
```

#### PostgreSQL Operations

Execute SQL code in PostgreSQL databases with autocommit control and parameter binding.

```python { .api }
class PostgresOperator(BaseOperator):
    def __init__(
        self,
        sql,
        postgres_conn_id='postgres_default',
        autocommit=False,
        parameters=None,
        **kwargs
    ):
        """
        Executes SQL code in a specific PostgreSQL database.

        Parameters:
        - sql (str|list): SQL statement, list of statements, or reference to template file (.sql)
        - postgres_conn_id (str): Reference to PostgreSQL connection ID
        - autocommit (bool): Enable autocommit for the SQL execution
        - parameters (dict): Parameters for SQL query binding
        - **kwargs: Additional BaseOperator parameters
        """

    template_fields = ('sql',)
    template_ext = ('.sql',)
    ui_color = '#ededed'

    def execute(self, context): ...
```

#### SQLite Operations

Execute SQL code in SQLite databases with lightweight database operations.

```python { .api }
class SqliteOperator(BaseOperator):
    def __init__(
        self,
        sql,
        sqlite_conn_id='sqlite_default',
        parameters=None,
        **kwargs
    ):
        """
        Executes SQL code in a specific SQLite database.

        Parameters:
        - sql (str|list): SQL statement, list of statements, or reference to template file (.sql)
        - sqlite_conn_id (str): Reference to SQLite connection ID
        - parameters (dict): Parameters for SQL query binding
        - **kwargs: Additional BaseOperator parameters
        """

    template_fields = ('sql',)
    template_ext = ('.sql',)
    ui_color = '#ededed'

    def execute(self, context): ...
```

**Usage Examples**:

```python
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.sqlite_operator import SqliteOperator

# MySQL query execution
mysql_task = MySqlOperator(
    task_id='run_mysql_query',
    mysql_conn_id='mysql_prod',
    sql='''
        INSERT INTO daily_stats (date, record_count, avg_value)
        SELECT '{{ ds }}', COUNT(*), AVG(value)
        FROM transactions
        WHERE DATE(created_at) = '{{ ds }}'
    ''',
    dag=dag
)

# PostgreSQL with parameters
postgres_task = PostgresOperator(
    task_id='update_user_stats',
    postgres_conn_id='postgres_warehouse',
    sql='''
        UPDATE user_metrics
        SET last_login = %(login_time)s,
            login_count = login_count + 1
        WHERE user_id = %(user_id)s
    ''',
    parameters={'login_time': '{{ ts }}', 'user_id': 12345},
    autocommit=True,
    dag=dag
)

# SQLite file operations
sqlite_task = SqliteOperator(
    task_id='local_db_cleanup',
    sqlite_conn_id='sqlite_local',
    sql='/path/to/cleanup_script.sql',
    dag=dag
)
```

### HTTP Operations

Execute HTTP requests against external APIs and web services with response validation and customizable request parameters.

```python { .api }
class SimpleHttpOperator(BaseOperator):
    def __init__(
        self,
        endpoint,
        method='POST',
        data=None,
        headers=None,
        response_check=None,
        extra_options=None,
        http_conn_id='http_default',
        **kwargs
    ):
        """
        Calls an endpoint on an HTTP system to execute an action.

        Parameters:
        - endpoint (str): The relative part of the full URL
        - method (str): HTTP method to use (default: 'POST')
        - data (dict): Data to pass (POST/PUT data or URL params for GET)
        - headers (dict): HTTP headers to add to the request
        - response_check (callable): Function to validate response (returns True/False)
        - extra_options (dict): Extra options for requests library (timeout, ssl, etc.)
        - http_conn_id (str): Reference to HTTP connection ID
        - **kwargs: Additional BaseOperator parameters
        """

    template_fields = ('endpoint',)
    template_ext = ()
    ui_color = '#f4a460'

    def execute(self, context): ...
```

**Usage Example**:

```python
from airflow.operators.http_operator import SimpleHttpOperator

# POST request with data
api_call = SimpleHttpOperator(
    task_id='api_post',
    http_conn_id='api_server',
    endpoint='/v1/process',
    method='POST',
    data={'job_id': '{{ dag_run.run_id }}', 'date': '{{ ds }}'},
    headers={'Content-Type': 'application/json'},
    dag=dag
)

# GET request with response validation
def check_status_code(response):
    return response.status_code == 200

status_check = SimpleHttpOperator(
    task_id='health_check',
    http_conn_id='service_api',
    endpoint='/health',
    method='GET',
    response_check=check_status_code,
    dag=dag
)
```
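
`extra_options` is forwarded to the underlying requests call, which is where request-level settings such as timeouts go, and `response_check` can inspect the response body as well as the status code. A minimal sketch assuming the `service_api` connection above and a hypothetical JSON health endpoint that returns a `status` field:

```python
# Hypothetical variant: bounded timeout plus a body-level check
def healthy(response):
    # response is a requests.Response object
    return response.json().get('status') == 'ok'

strict_health_check = SimpleHttpOperator(
    task_id='strict_health_check',
    http_conn_id='service_api',
    endpoint='/health',
    method='GET',
    response_check=healthy,
    extra_options={'timeout': 30},  # seconds, passed through to the requests call
    dag=dag
)
```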

### Workflow Composition

Create complex workflows by embedding sub-DAGs within parent DAGs for modular and reusable workflow components.

```python { .api }
class SubDagOperator(BaseOperator):
    def __init__(
        self,
        subdag,
        executor=DEFAULT_EXECUTOR,
        **kwargs
    ):
        """
        Execute a sub-DAG as part of a larger workflow.

        By convention, a sub-DAG's dag_id should be prefixed by its parent and a dot,
        as in 'parent.child'.

        Parameters:
        - subdag (DAG): The DAG object to run as a subdag of the current DAG
        - executor (BaseExecutor): Executor to use for the sub-DAG
        - **kwargs: Additional BaseOperator parameters (must include 'dag')
        """

    template_fields = tuple()
    ui_color = '#555'
    ui_fgcolor = '#fff'

    def execute(self, context): ...
```

**Usage Example**:

```python
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow import DAG
from datetime import datetime

# Define the sub-DAG
def create_subdag(parent_dag_id, child_dag_id, start_date, schedule_interval):
    subdag = DAG(
        dag_id=f'{parent_dag_id}.{child_dag_id}',
        start_date=start_date,
        schedule_interval=schedule_interval,
    )

    # Add tasks to the sub-DAG
    task1 = DummyOperator(task_id='subtask1', dag=subdag)
    task2 = DummyOperator(task_id='subtask2', dag=subdag)
    task1 >> task2

    return subdag

# Use SubDagOperator in the main DAG
subdag_task = SubDagOperator(
    task_id='parallel_processing',
    subdag=create_subdag(
        parent_dag_id='main_dag',
        child_dag_id='parallel_processing',
        start_date=datetime(2023, 1, 1),
        schedule_interval='@daily'
    ),
    dag=dag
)
```

### Sensor Operations

Monitor external systems and wait for conditions to be met before proceeding with downstream tasks. Sensors periodically check conditions and succeed when criteria are satisfied.

#### Base Sensor Framework

Abstract foundation for all sensor operators providing polling mechanism, timeout handling, and configurable check intervals.

```python { .api }
class BaseSensorOperator(BaseOperator):
    def __init__(
        self,
        poke_interval=60,
        timeout=60*60*24*7,
        **kwargs
    ):
        """
        Base class for all sensor operators that keep executing at intervals until criteria is met.

        Parameters:
        - poke_interval (int): Time in seconds between each check (default: 60)
        - timeout (int): Time in seconds before the task times out and fails (default: 7 days)
        - **kwargs: Additional BaseOperator parameters
        """

    ui_color = '#e6f1f2'

    def poke(self, context):
        """Override this method to define sensor condition check logic."""

    def execute(self, context): ...
```

#### SQL-based Sensors

Monitor database conditions by executing SQL queries until specified criteria are met.

```python { .api }
class SqlSensor(BaseSensorOperator):
    def __init__(
        self,
        conn_id,
        sql,
        **kwargs
    ):
        """
        Runs a SQL statement until criteria is met. Succeeds when SQL returns non-empty, non-zero result.

        Parameters:
        - conn_id (str): The connection ID to run the sensor against
        - sql (str): SQL statement to execute. Must return at least one non-zero/non-empty cell to pass
        - **kwargs: Additional BaseSensorOperator parameters
        """

    template_fields = ('sql',)
    template_ext = ('.hql', '.sql')

    def poke(self, context): ...
```

#### File System Sensors

Monitor file systems for the presence of files or directories before proceeding with workflow execution.

```python { .api }
class HdfsSensor(BaseSensorOperator):
    def __init__(
        self,
        filepath,
        hdfs_conn_id='hdfs_default',
        **kwargs
    ):
        """
        Waits for a file or folder to appear in HDFS.

        Parameters:
        - filepath (str): Path to file or directory in HDFS
        - hdfs_conn_id (str): Reference to HDFS connection ID
        - **kwargs: Additional BaseSensorOperator parameters
        """

    template_fields = ('filepath',)

    def poke(self, context): ...


class WebHdfsSensor(BaseSensorOperator):
    def __init__(
        self,
        filepath,
        webhdfs_conn_id='webhdfs_default',
        **kwargs
    ):
        """
        Waits for a file or folder to appear in HDFS via WebHDFS API.

        Parameters:
        - filepath (str): Path to file or directory in HDFS
        - webhdfs_conn_id (str): Reference to WebHDFS connection ID
        - **kwargs: Additional BaseSensorOperator parameters
        """

    template_fields = ('filepath',)

    def poke(self, context): ...
```

**Usage Examples**:

```python
from airflow.operators.sensors import BaseSensorOperator, SqlSensor, HdfsSensor

# Custom sensor implementation
class DataReadySensor(BaseSensorOperator):
    # Declare data_path as templated so '{{ ds }}' below is rendered
    template_fields = ('data_path',)

    def __init__(self, data_path, **kwargs):
        super().__init__(**kwargs)
        self.data_path = data_path

    def poke(self, context):
        import os
        return os.path.exists(self.data_path) and bool(os.listdir(self.data_path))

data_sensor = DataReadySensor(
    task_id='wait_for_data',
    data_path='/data/{{ ds }}',
    poke_interval=30,  # Check every 30 seconds
    timeout=3600,      # Timeout after 1 hour
    dag=dag
)

# SQL sensor for database monitoring
db_sensor = SqlSensor(
    task_id='wait_for_records',
    conn_id='postgres_prod',
    sql='''
        SELECT COUNT(*)
        FROM transactions
        WHERE DATE(created_at) = '{{ ds }}'
        AND status = 'completed'
    ''',
    poke_interval=300,  # Check every 5 minutes
    dag=dag
)

# HDFS file sensor
file_sensor = HdfsSensor(
    task_id='wait_for_hdfs_file',
    filepath='/data/raw/{{ ds }}/input.parquet',
    hdfs_conn_id='hdfs_cluster',
    poke_interval=60,
    dag=dag
)

# Chain sensors ahead of a processing task defined elsewhere in the DAG
data_sensor >> db_sensor >> file_sensor >> processing_task
```
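
`WebHdfsSensor` follows the same pattern when HDFS is reached through the WebHDFS/HttpFS gateway instead of the native client. A minimal sketch assuming a hypothetical `webhdfs_cluster` connection:

```python
from airflow.operators.sensors import WebHdfsSensor

web_file_sensor = WebHdfsSensor(
    task_id='wait_for_hdfs_file_webhdfs',
    filepath='/data/raw/{{ ds }}/input.parquet',
    webhdfs_conn_id='webhdfs_cluster',  # hypothetical connection ID
    poke_interval=60,
    dag=dag
)
```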

## Template Support

Most operators support Jinja templating for dynamic content:

```python
from airflow.operators.bash_operator import BashOperator
from airflow.operators.email_operator import EmailOperator

# Template variables available in operators
templated_bash = BashOperator(
    task_id='templated_bash',
    bash_command='echo "Processing {{ ds }} in DAG {{ dag.dag_id }}"',
    dag=dag
)

templated_email = EmailOperator(
    task_id='templated_email',
    to=['admin@example.com'],
    subject='Report for {{ ds }}',
    html_content='''
    <h1>Report Generated</h1>
    <p>Date: {{ ds }}</p>
    <p>Timestamp: {{ ts }}</p>
    <p>Previous Date: {{ prev_ds }}</p>
    <p>Next Date: {{ next_ds }}</p>
    ''',
    dag=dag
)
```
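
Fields listed in an operator's `template_ext` are also rendered from files: when a templated field's value ends with one of those extensions (such as `.sql` or `.sh`), Airflow typically loads the file, resolved relative to the DAG folder or `template_searchpath`, and renders it as a Jinja template. A minimal sketch assuming a hypothetical `sql/daily_stats.sql` file alongside the DAG:

```python
from airflow.operators.postgres_operator import PostgresOperator

# The .sql extension matches PostgresOperator.template_ext, so the file's
# contents are rendered with the same template variables ({{ ds }}, {{ ts }}, ...)
templated_sql = PostgresOperator(
    task_id='templated_sql_file',
    postgres_conn_id='postgres_warehouse',
    sql='sql/daily_stats.sql',  # hypothetical path, resolved relative to the DAG folder
    dag=dag
)
```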