# Task Operators

Task definition and execution including BaseOperator, task decorators, dynamic task mapping, and task instance management. Tasks represent individual units of work within DAGs.

## Capabilities

### Base Operator

Foundation class for all Airflow operators, providing core task functionality and lifecycle management.

```python { .api }
class BaseOperator:
    def __init__(
        self,
        task_id: str,
        owner: str = "airflow",
        email: Optional[Union[str, List[str]]] = None,
        email_on_retry: bool = True,
        email_on_failure: bool = True,
        retries: Optional[int] = None,
        retry_delay: timedelta = timedelta(seconds=300),
        retry_exponential_backoff: bool = False,
        max_retry_delay: Optional[timedelta] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        depends_on_past: bool = False,
        wait_for_downstream: bool = False,
        dag: Optional[DAG] = None,
        params: Optional[Dict[str, Any]] = None,
        default_args: Optional[Dict[str, Any]] = None,
        pool: Optional[str] = None,
        pool_slots: int = 1,
        queue: Optional[str] = None,
        priority_weight: int = 1,
        weight_rule: str = "downstream",
        sla: Optional[timedelta] = None,
        execution_timeout: Optional[timedelta] = None,
        on_execute_callback: Optional[Callable] = None,
        on_failure_callback: Optional[Callable] = None,
        on_success_callback: Optional[Callable] = None,
        on_retry_callback: Optional[Callable] = None,
        trigger_rule: str = "all_success",
        resources: Optional[Dict[str, Any]] = None,
        run_as_user: Optional[str] = None,
        task_concurrency: Optional[int] = None,  # deprecated; use max_active_tis_per_dag
        max_active_tis_per_dag: Optional[int] = None,
        executor_config: Optional[Dict[str, Any]] = None,
        do_xcom_push: bool = True,
        inlets: Optional[List[Any]] = None,
        outlets: Optional[List[Any]] = None,
        task_group: Optional[TaskGroup] = None,
        doc: Optional[str] = None,
        doc_md: Optional[str] = None,
        doc_json: Optional[str] = None,
        doc_yaml: Optional[str] = None,
        doc_rst: Optional[str] = None,
        wait_for_past_depends_before_skipping: bool = False,
        max_active_tis_per_dagrun: Optional[int] = None,
        map_index_template: Optional[str] = None,
        multiple_outputs: bool = False,
        task_display_name: Optional[str] = None,
        logger_name: Optional[str] = None,
        allow_nested_operators: bool = True,
        **kwargs
    ):
        """
        Base operator for all Airflow tasks.

        Args:
            task_id: Unique identifier for the task
            owner: Owner of the task
            retries: Number of retries when the task fails
            retry_delay: Delay between retries
            start_date: When the task should start being scheduled
            end_date: When the task should stop being scheduled
            depends_on_past: Whether the task depends on the success of its previous run
            pool: Resource pool for task execution
            priority_weight: Task priority for execution ordering
            trigger_rule: Rule for triggering the task based on upstream task states
            execution_timeout: Maximum runtime before the task times out
        """

    def execute(self, context: Context) -> Any:
        """Execute the task logic. Must be implemented by subclasses."""

    def on_kill(self) -> None:
        """Called for cleanup when the task is killed."""

    def defer(self, trigger: BaseTrigger, method_name: str, **kwargs) -> None:
        """Defer task execution to a trigger."""

    def resume_execution(self, context: Context, event: Dict[str, Any]) -> Any:
        """Resume execution after deferral."""

    def render_template_fields(
        self,
        context: Context,
        jinja_env: Optional[jinja2.Environment] = None
    ) -> None:
        """Render Jinja templates in task fields."""
```
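
In practice, custom operators subclass `BaseOperator` and implement `execute`. A minimal sketch (the `HelloOperator` name and its message payload are illustrative, not part of Airflow):

```python
from airflow.models.baseoperator import BaseOperator

class HelloOperator(BaseOperator):
    """Hypothetical operator that greets a name."""

    def __init__(self, name: str, **kwargs):
        super().__init__(**kwargs)  # forwards task_id, retries, etc.
        self.name = name

    def execute(self, context):
        # The return value is pushed to XCom because do_xcom_push defaults to True
        message = f"Hello {self.name}!"
        print(message)
        return message

# Inside a DAG definition:
# hello = HelloOperator(task_id="hello", name="Airflow", retries=2)
```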

### Task Decorator

Modern approach to task definition using decorators for cleaner, more Pythonic task creation.

```python { .api }
@task(
    task_id: Optional[str] = None,
    python_callable: Optional[Callable] = None,
    op_args: Optional[List[Any]] = None,
    op_kwargs: Optional[Dict[str, Any]] = None,
    templates_dict: Optional[Dict[str, Any]] = None,
    templates_exts: Optional[List[str]] = None,
    show_return_value_in_logs: bool = True,
    **kwargs
) -> Callable:
    """
    Decorator to create a task from a Python function.

    Args:
        task_id: Unique identifier (auto-generated from the function name if not provided)
        python_callable: The Python function to execute
        op_args: Positional arguments to pass to the function
        op_kwargs: Keyword arguments to pass to the function
        templates_dict: Dictionary of templates to render
        templates_exts: File extensions to resolve as templates
        show_return_value_in_logs: Whether to log the return value

    Returns:
        Decorated function that returns task output
    """

@task.setup(
    task_id: Optional[str] = None,
    **kwargs
) -> Callable:
    """
    Decorator for setup tasks that run before other tasks.

    Args:
        task_id: Unique identifier
        **kwargs: Additional task arguments

    Returns:
        Decorated function for a setup task
    """

@task.teardown(
    task_id: Optional[str] = None,
    **kwargs
) -> Callable:
    """
    Decorator for teardown tasks that run after other tasks.

    Args:
        task_id: Unique identifier
        **kwargs: Additional task arguments

    Returns:
        Decorated function for a teardown task
    """
```

Usage example:

```python
from airflow.decorators import dag, task
from datetime import datetime

@dag(dag_id='task_decorator_example', start_date=datetime(2024, 1, 1))
def task_decorator_example():
    @task
    def extract_data(source: str) -> dict:
        """Extract data from source."""
        return {'data': f'extracted from {source}', 'count': 100}

    @task
    def transform_data(data: dict) -> dict:
        """Transform the data."""
        return {
            'transformed_data': data['data'].upper(),
            'processed_count': data['count'] * 2
        }

    @task.setup
    def setup_environment():
        """Setup task that runs first."""
        print("Setting up environment")

    @task.teardown
    def cleanup():
        """Cleanup task that runs last."""
        print("Cleaning up")

    # Define dependencies: extract -> transform is implied by data passing;
    # setup and teardown must be wired in explicitly
    raw_data = extract_data('database')
    processed_data = transform_data(raw_data)
    setup_environment() >> raw_data
    processed_data >> cleanup()

dag_instance = task_decorator_example()
```

### Dynamic Task Mapping

Create tasks dynamically at runtime based on input data or external conditions.

```python { .api }
class MappedOperator:
    """
    Operator created through dynamic task mapping.

    Attributes:
        task_id: Base task identifier
        operator_class: Original operator class
        mapped_op_kwargs: Mapped operator arguments
        partial_kwargs: Static operator arguments
    """
    task_id: str
    operator_class: type
    mapped_op_kwargs: Dict[str, Any]
    partial_kwargs: Dict[str, Any]

    def expand(self, **mapped_kwargs) -> 'MappedOperator':
        """
        Expand operator with mapped arguments.

        Args:
            **mapped_kwargs: Arguments to map over

        Returns:
            MappedOperator instance
        """

    def partial(self, **partial_kwargs) -> 'MappedOperator':
        """
        Set static arguments for mapped operator.

        Args:
            **partial_kwargs: Static arguments

        Returns:
            Partially configured MappedOperator
        """
```

Usage example:

```python
from airflow.decorators import dag, task
from datetime import datetime
from typing import List

@dag(dag_id='dynamic_mapping_example', start_date=datetime(2024, 1, 1))
def dynamic_mapping_example():
    @task
    def get_file_list() -> List[str]:
        """Get list of files to process."""
        return ['file1.csv', 'file2.csv', 'file3.csv']

    @task
    def process_file(filename: str) -> str:
        """Process a single file."""
        return f"processed {filename}"

    # Dynamic mapping - creates one task per file
    files = get_file_list()
    process_file.expand(filename=files)

dag_instance = dynamic_mapping_example()
```
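
For classic (non-TaskFlow) operators, the same mapping is expressed with `partial()` for static arguments and `expand()` for mapped ones. A sketch using `BashOperator`, assumed to run inside a DAG definition:

```python
from airflow.operators.bash import BashOperator

# One task instance is created per bash_command at runtime;
# task_id and retries stay constant across all mapped instances.
BashOperator.partial(task_id="run_cmd", retries=1).expand(
    bash_command=["echo a", "echo b", "echo c"]
)
```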

### Task Instance

Represents a specific execution of a task within a DAG run.

```python { .api }
class TaskInstance:
    """
    ORM model for task instance execution.

    Attributes:
        task_id: Task identifier
        dag_id: DAG identifier
        run_id: DAG run identifier
        map_index: Index for dynamically mapped task instances (-1 if not mapped)
        execution_date: Execution date
        start_date: When the task started
        end_date: When the task ended
        duration: Task execution duration
        state: Current task state
        try_number: Current retry attempt
        max_tries: Maximum retry attempts
        hostname: Worker hostname
        unixname: Unix username
        job_id: Job identifier
        pool: Resource pool
        pool_slots: Number of pool slots used
        queue: Execution queue
        priority_weight: Task priority
        operator: Operator class name
        queued_dttm: When the task was queued
        pid: Process ID
        executor_config: Executor configuration
        external_executor_id: External executor identifier
        trigger_id: Trigger identifier (for deferred tasks)
        next_method: Next method to call
        next_kwargs: Arguments for the next method
    """
    id: Optional[UUID]
    task_id: str
    dag_id: str
    run_id: str
    map_index: int
    execution_date: datetime
    start_date: Optional[datetime]
    end_date: Optional[datetime]
    duration: Optional[float]
    state: Optional[str]
    try_number: int
    max_tries: int
    hostname: str
    unixname: str
    job_id: Optional[int]
    pool: str
    pool_slots: int
    queue: str
    priority_weight: int
    operator: str
    queued_dttm: Optional[datetime]
    pid: Optional[int]
    executor_config: Optional[Dict]
    external_executor_id: Optional[str]
    trigger_id: Optional[int]
    next_method: Optional[str]
    next_kwargs: Optional[Dict]

    def clear_task_instances(
        self,
        tis: List['TaskInstance'],
        session: Session = None,
        dag: Optional[DAG] = None
    ) -> None:
        """Clear task instances for retry."""

    def get_task_instance(
        self,
        task_id: str,
        execution_date: datetime,
        session: Session = None
    ) -> Optional['TaskInstance']:
        """Get a task instance by task ID and execution date."""
```
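
At runtime, the current `TaskInstance` is available through the execution context; TaskFlow injects it when the function declares a `ti` argument. A short sketch:

```python
from airflow.decorators import task

@task
def inspect_ti(ti=None):
    # Airflow injects the running TaskInstance for a declared `ti` argument
    print(ti.task_id, ti.dag_id, ti.run_id, ti.try_number)
    # XCom reads and writes also go through the task instance
    ti.xcom_push(key="note", value="inspected")
```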

### Task Dependencies

Manage dependencies between tasks using various trigger rules and patterns.

```python { .api }
def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]) -> None:
    """
    Chain tasks or lists of tasks sequentially. A single task adjacent to a
    list is linked to every task in that list; adjacent lists must have the
    same length and are linked pairwise.

    Args:
        *tasks: Tasks (or lists of tasks) to chain in order
    """

def chain_linear(*tasks: Union[BaseOperator, Sequence[BaseOperator]]) -> None:
    """
    Chain tasks so that every task in each argument depends on every task in
    the previous argument, producing full connections between successive groups.

    Args:
        *tasks: Tasks (or lists of tasks) to chain
    """

def cross_downstream(
    from_tasks: Sequence[BaseOperator],
    to_tasks: Sequence[BaseOperator]
) -> None:
    """
    Create dependencies from all tasks in from_tasks to all tasks in to_tasks.

    Args:
        from_tasks: Source tasks
        to_tasks: Target tasks
    """
```

Usage example:

```python
from airflow.decorators import dag, task
from airflow.models.baseoperator import chain
from datetime import datetime

@dag(dag_id='dependencies_example', start_date=datetime(2024, 1, 1))
def dependencies_example():
    @task
    def start():
        return "started"

    @task
    def process_a():
        return "a"

    @task
    def process_b():
        return "b"

    @task
    def combine():
        return "combined"

    @task
    def end():
        return "ended"

    # Instantiate tasks
    start_task = start()
    process_a_task = process_a()
    process_b_task = process_b()
    combine_task = combine()
    end_task = end()

    # Set up dependencies: start fans out to a and b, which fan in to combine
    chain(start_task, [process_a_task, process_b_task], combine_task, end_task)

dag_instance = dependencies_example()
```
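
`cross_downstream` covers the many-to-many case that `chain` does not express in one call. A sketch using `EmptyOperator` placeholders (the `extract_*`/`load_*` task names are illustrative), assumed to run inside a DAG definition:

```python
from airflow.models.baseoperator import cross_downstream
from airflow.operators.empty import EmptyOperator

extract_a, extract_b = (EmptyOperator(task_id=f"extract_{s}") for s in "ab")
load_x, load_y = (EmptyOperator(task_id=f"load_{s}") for s in "xy")

# Creates extract_a >> load_x, extract_a >> load_y,
#         extract_b >> load_x, extract_b >> load_y
cross_downstream([extract_a, extract_b], [load_x, load_y])
```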

### Task States and Lifecycle

Task execution states and lifecycle management.

```python { .api }
from airflow.utils.state import TaskInstanceState

# Task States
class TaskInstanceState:
    """Task instance states."""
    NONE: str = "none"
    SCHEDULED: str = "scheduled"
    QUEUED: str = "queued"
    RUNNING: str = "running"
    SUCCESS: str = "success"
    SHUTDOWN: str = "shutdown"
    RESTARTING: str = "restarting"
    FAILED: str = "failed"
    UP_FOR_RETRY: str = "up_for_retry"
    UP_FOR_RESCHEDULE: str = "up_for_reschedule"
    UPSTREAM_FAILED: str = "upstream_failed"
    SKIPPED: str = "skipped"
    REMOVED: str = "removed"
    DEFERRED: str = "deferred"

# Trigger Rules
TRIGGER_RULES = [
    "all_success",                  # All upstream tasks succeeded
    "all_failed",                   # All upstream tasks failed
    "all_done",                     # All upstream tasks completed (success or failure)
    "one_success",                  # At least one upstream task succeeded
    "one_failed",                   # At least one upstream task failed
    "none_failed",                  # No upstream tasks failed
    "none_failed_min_one_success",  # No failures and at least one success
    "none_skipped",                 # No upstream tasks skipped
    "always",                       # Always run regardless of upstream state
]
```
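
Trigger rules are set per task via the `trigger_rule` argument. A sketch of a summary task that runs whatever the upstream outcome:

```python
from airflow.decorators import task

@task(trigger_rule="all_done")
def report_status():
    # Runs once every upstream task has finished,
    # whether it succeeded or failed
    print("pipeline finished")
```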

## Types

```python { .api }
from typing import Union, Optional, List, Dict, Callable, Any, Sequence, Literal
from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.models.taskgroup import TaskGroup
from airflow.utils.context import Context

TaskState = Literal[
    "none", "scheduled", "queued", "running", "success",
    "shutdown", "restarting", "failed", "up_for_retry",
    "up_for_reschedule", "upstream_failed", "skipped",
    "removed", "deferred"
]

TriggerRule = Literal[
    "all_success", "all_failed", "all_done", "one_success",
    "one_failed", "none_failed", "none_failed_min_one_success",
    "none_skipped", "always"
]
```