# Apache Airflow

Programmatically author, schedule and monitor data pipelines

npx @tessl/cli install tessl/pypi-apache-airflow@3.0.00

Apache Airflow is a comprehensive platform for programmatically authoring, scheduling, and monitoring data workflows and pipelines. It lets developers define workflows as directed acyclic graphs (DAGs) of tasks with rich dependency management, and its scheduler executes those tasks on an array of workers while respecting the specified dependencies.

## Package Information

- **Package Name**: apache-airflow
- **Package Type**: pypi
- **Language**: Python
- **Installation**: `pip install apache-airflow`
- **Version**: 3.0.6

## Core Imports

```python
import airflow
from airflow import DAG, Asset, XComArg
```

Common imports for working with DAGs and tasks:

```python
from airflow import DAG
from airflow.decorators import dag, task
from airflow.models import BaseOperator
```

## Basic Usage

```python
from datetime import datetime, timedelta
from airflow.decorators import dag, task

# Define default arguments shared by all tasks in the DAG
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Create DAG using the decorator
@dag(
    dag_id='example_workflow',
    default_args=default_args,
    description='A simple example DAG',
    schedule=timedelta(days=1),
    catchup=False,
    tags=['example'],
)
def example_workflow():

    @task
    def extract_data():
        """Extract data from source"""
        return {'data': [1, 2, 3, 4, 5]}

    @task
    def transform_data(data):
        """Transform the extracted data"""
        return {'transformed': [x * 2 for x in data['data']]}

    @task
    def load_data(data):
        """Load transformed data"""
        print(f"Loading: {data}")
        return True

    # Define task dependencies
    raw_data = extract_data()
    transformed_data = transform_data(raw_data)
    load_data(transformed_data)

# Instantiate the DAG
dag_instance = example_workflow()
```

## Architecture

Apache Airflow follows a distributed architecture with several key components:

- **Scheduler**: Orchestrates task execution based on dependencies and schedules
- **Executor**: Runs tasks on worker nodes (Local, Celery, Kubernetes, etc.)
- **Web Server**: Provides the web UI for monitoring and managing workflows
- **Worker Nodes**: Execute individual tasks in the workflow
- **Metadata Database**: Stores DAG definitions, task states, and execution history
- **Task SDK**: Core definitions and utilities for task execution (new in 3.0)

The platform transforms workflow definitions into versionable, testable, and collaborative code, making it ideal for data engineering teams building complex data processing pipelines, ETL workflows, machine learning orchestration, and automated task scheduling.

## Capabilities

### DAG Management

Core functionality for defining, scheduling, and managing directed acyclic graphs of tasks, including DAG decorators, task groups, and dependency management.

```python { .api }
class DAG:
    def __init__(
        self,
        dag_id: str,
        description: Optional[str] = None,
        schedule: Optional[Union[str, datetime.timedelta]] = None,
        start_date: Optional[datetime.datetime] = None,
        end_date: Optional[datetime.datetime] = None,
        **kwargs
    ): ...

@dag(
    dag_id: str,
    description: Optional[str] = None,
    schedule: Optional[Union[str, timedelta]] = None,
    start_date: Optional[datetime] = None,
    **kwargs
) -> Callable: ...
```

[DAG Management](./dag-management.md)

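For illustration, a minimal sketch of the same API using the context-manager style of DAG definition; the `dag_id`, schedule, and task names here are arbitrary examples, not part of the package.

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

# Hypothetical DAG defined with the context-manager style; tasks created
# inside the "with" block are automatically registered on the DAG.
with DAG(
    dag_id="dag_management_sketch",
    description="Context-manager style DAG definition",
    schedule=timedelta(hours=6),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    @task
    def start():
        return "ready"

    @task
    def finish(signal):
        print(f"received: {signal}")

    # Passing the upstream return value also wires the dependency.
    finish(start())
```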
### Task Operators

Task definition and execution, including BaseOperator, task decorators, dynamic task mapping, and task instance management.

```python { .api }
class BaseOperator:
    def __init__(
        self,
        task_id: str,
        owner: str = "airflow",
        retries: int = None,
        retry_delay: timedelta = None,
        **kwargs
    ): ...

@task(
    task_id: Optional[str] = None,
    python_callable: Optional[Callable] = None,
    **kwargs
) -> Callable: ...
```

[Task Operators](./task-operators.md)

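As a sketch, a custom operator built on BaseOperator; the operator name and its `recipient` parameter are made up for illustration.

```python
from airflow.models import BaseOperator


class GreetOperator(BaseOperator):
    """Hypothetical operator that greets a configurable recipient."""

    def __init__(self, recipient: str, **kwargs):
        super().__init__(**kwargs)
        self.recipient = recipient

    def execute(self, context):
        # Whatever execute() returns is pushed to XCom automatically.
        message = f"Hello, {self.recipient}!"
        self.log.info(message)
        return message


# Inside a DAG:       GreetOperator(task_id="greet", recipient="world")
# Dynamic mapping:    GreetOperator.partial(task_id="greet").expand(recipient=["a", "b"])
```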
### Assets and Scheduling

Asset-driven scheduling system for creating data-aware workflows, including asset definitions, timetables, and dependency management.

```python { .api }
class Asset:
    def __init__(
        self,
        uri: str,
        name: Optional[str] = None,
        group: Optional[str] = None,
        extra: Optional[Dict[str, Any]] = None
    ): ...

class AssetAlias:
    def __init__(self, name: str): ...
```

[Assets and Scheduling](./assets-scheduling.md)

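A minimal sketch of asset-driven scheduling: a producer task declares the asset as an outlet, and a consumer DAG uses the asset as its schedule. The asset URI and DAG ids are hypothetical.

```python
from datetime import datetime

from airflow import DAG, Asset
from airflow.decorators import task

# Hypothetical asset URI; any stable identifier for the data works.
raw_events = Asset("s3://example-bucket/raw/events.json")

with DAG(
    dag_id="produce_events",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
):
    @task(outlets=[raw_events])
    def write_events():
        ...  # finishing successfully marks the asset as updated

    write_events()

# The consumer runs whenever the asset above is updated.
with DAG(
    dag_id="consume_events",
    schedule=[raw_events],
    start_date=datetime(2024, 1, 1),
    catchup=False,
):
    @task
    def process_events():
        ...

    process_events()
```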
167
168
### Configuration Management
169
170
System configuration, variables, parameters, and connection management for workflow orchestration.
171
172
```python { .api }
173
class Variable:
174
@classmethod
175
def get(cls, key: str, default_var: Any = None) -> Any: ...
176
177
@classmethod
178
def set(cls, key: str, value: Any) -> None: ...
179
180
class Param:
181
def __init__(
182
self,
183
default: Any = None,
184
description: Optional[str] = None,
185
**kwargs
186
): ...
187
```
188
189
[Configuration](./configuration.md)
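A sketch of Variables and Params in use. Variables live in the metadata database, so this assumes a configured Airflow environment; the variable key, DAG id, and param name are invented for the example.

```python
from datetime import datetime

from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.models.param import Param

# "etl_batch_size" is a made-up key; get/set hit the metadata database.
Variable.set("etl_batch_size", 500)
batch_size = Variable.get("etl_batch_size", default_var=100)


@dag(
    dag_id="configurable_workflow",   # hypothetical DAG id
    schedule=None,
    start_date=datetime(2024, 1, 1),
    params={"threshold": Param(10, description="Minimum record count")},
)
def configurable_workflow():
    @task
    def check(params=None):
        # "params" is injected from the task context at run time.
        return params["threshold"]

    check()


configurable_workflow()
```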
190
191
### Cross-Communication (XCom)
192
193
Cross-communication system for sharing data between tasks including XComArg, custom backends, and serialization.
194
195
```python { .api }
196
class XComArg:
197
def __init__(
198
self,
199
operator: BaseOperator,
200
key: str = None
201
): ...
202
203
class XCom:
204
@classmethod
205
def get_one(
206
cls,
207
task_id: str,
208
dag_id: str,
209
key: str = None,
210
execution_date: datetime = None
211
) -> Any: ...
212
```
213
214
[Cross-Communication](./xcom.md)
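A sketch of explicit XCom push/pull via the task instance injected into a TaskFlow function; the DAG id, key, and values are placeholders.

```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(dag_id="xcom_sketch", schedule=None, start_date=datetime(2024, 1, 1))
def xcom_sketch():
    @task
    def push_count(ti=None):
        # Explicit push under a custom key; plain return values are also
        # pushed automatically under the default "return_value" key.
        ti.xcom_push(key="row_count", value=42)

    @task
    def pull_count(ti=None):
        count = ti.xcom_pull(task_ids="push_count", key="row_count")
        print(f"rows seen upstream: {count}")

    push_count() >> pull_count()


xcom_sketch()
```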
215
216
### Executors
217
218
Task execution engines including LocalExecutor, CeleryExecutor, KubernetesExecutor, and custom executor development.
219
220
```python { .api }
221
class BaseExecutor:
222
def __init__(self, parallelism: int = 32): ...
223
224
def execute_async(
225
self,
226
key: TaskInstanceKey,
227
command: CommandType,
228
queue: Optional[str] = None
229
) -> None: ...
230
231
class LocalExecutor(BaseExecutor):
232
def __init__(self, parallelism: int = 0): ...
233
```
234
235
[Executors](./executors.md)
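A skeletal sketch of a custom executor following the interface shown above. The class name is invented, and production executors implement considerably more (queue management, state reporting, clean shutdown); treat this as an outline, and note the exact base-class signatures can vary between Airflow versions.

```python
from airflow.executors.base_executor import BaseExecutor


class EchoExecutor(BaseExecutor):
    """Hypothetical executor that only logs what it is asked to run."""

    def execute_async(self, key, command, queue=None, executor_config=None):
        # A real executor would hand the command to a worker here (a subprocess,
        # a Celery queue, a Kubernetes pod, ...) and later report the task's
        # terminal state back to the scheduler.
        self.log.info("would run %s via %s", key, command)

    def sync(self):
        # Called on each scheduler heartbeat to reconcile running task states.
        pass
```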
236
237
### CLI and Utilities
238
239
Command-line interface, context utilities, dependency management, and workflow orchestration helpers.
240
241
```python { .api }
242
def get_current_context() -> Context: ...
243
244
def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]) -> None: ...
245
246
def cross_downstream(
247
from_tasks: Sequence[BaseOperator],
248
to_tasks: Sequence[BaseOperator]
249
) -> None: ...
250
```
251
252
[CLI and Utilities](./cli-utilities.md)
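A sketch of `chain` wiring a linear dependency; the import path below is the long-standing `airflow.models.baseoperator` location (newer releases may expose these helpers elsewhere), and the DAG and task ids are arbitrary.

```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.models.baseoperator import chain

with DAG(dag_id="chain_sketch", schedule=None, start_date=datetime(2024, 1, 1)):

    @task
    def step(name: str):
        return name

    # Four instances of the same task, then a linear chain: a -> b -> c -> d
    a = step.override(task_id="a")("a")
    b = step.override(task_id="b")("b")
    c = step.override(task_id="c")("c")
    d = step.override(task_id="d")("d")

    chain(a, b, c, d)
    # cross_downstream([a, b], [c, d]) would instead fan [a, b] into [c, d].
```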
253
254
### Exception Handling
255
256
Comprehensive exception hierarchy for error handling, timeout management, and workflow recovery.
257
258
```python { .api }
259
class AirflowException(Exception): ...
260
261
class AirflowTaskTimeout(AirflowException): ...
262
263
class AirflowSensorTimeout(AirflowException): ...
264
265
class AirflowRescheduleException(AirflowException): ...
266
```
267
268
[Exception Handling](./exceptions.md)
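A small sketch of raising these exceptions from a task; the task and its validation logic are invented, and `AirflowSkipException` is an additional member of `airflow.exceptions` not listed in the block above.

```python
from airflow.decorators import task
from airflow.exceptions import AirflowException, AirflowSkipException


@task(retries=2)
def validate(records: list):
    if records is None:
        # Fails the task; the scheduler retries it up to `retries` times.
        raise AirflowException("no records received")
    if not records:
        # Marks the task as skipped rather than failed.
        raise AirflowSkipException("nothing to validate")
    return len(records)
```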
269
270
### Database Models
271
272
ORM models for DAGs, tasks, runs, connections, and metadata storage with SQLAlchemy integration.
273
274
```python { .api }
275
class DagModel:
276
dag_id: str
277
is_active: bool
278
last_parsed_time: datetime
279
next_dagrun: datetime
280
281
class TaskInstance:
282
task_id: str
283
dag_id: str
284
execution_date: datetime
285
state: str
286
```
287
288
[Database Models](./database-models.md)
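A sketch of querying these models directly. This assumes code that runs where the metadata database is reachable (maintenance or scheduler-side scripts, not task code, which Airflow 3 isolates from the database); the `create_session` helper and the queried DAG id are assumptions for the example.

```python
from airflow.models import DagModel, TaskInstance
from airflow.utils.session import create_session

with create_session() as session:
    # Look up the DAG record for the example DAG defined earlier.
    dag_row = (
        session.query(DagModel)
        .filter(DagModel.dag_id == "example_workflow")
        .one_or_none()
    )
    # Count task instances of that DAG currently in the "failed" state.
    failed_count = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == "example_workflow",
            TaskInstance.state == "failed",
        )
        .count()
    )
```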
289
290
### Extensions and Providers
291
292
Plugin system, provider packages, operator links, notifications, and custom component development.
293
294
```python { .api }
295
class BaseOperatorLink:
296
name: str = None
297
298
def get_link(
299
self,
300
operator: BaseOperator,
301
dttm: datetime
302
) -> str: ...
303
304
class BaseNotifier:
305
def __init__(self, **kwargs): ...
306
307
def notify(self, context: Context) -> None: ...
308
```
309
310
[Extensions and Providers](./extensions.md)
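A sketch of a custom notifier built on BaseNotifier; the class name and prefix parameter are invented, and the import path shown is the one used by recent Airflow releases.

```python
from airflow.notifications.basenotifier import BaseNotifier


class LogNotifier(BaseNotifier):
    """Hypothetical notifier that only logs; a real one might post to Slack,
    e-mail, or a webhook."""

    def __init__(self, prefix: str = "[airflow]"):
        super().__init__()
        self.prefix = prefix

    def notify(self, context):
        dag_id = context["dag"].dag_id
        self.log.info("%s callback fired for DAG %s", self.prefix, dag_id)


# Attach it wherever callbacks are accepted, e.g.:
#   DAG(..., on_failure_callback=LogNotifier(prefix="[alerts]"))
```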