
tessl/pypi-apache-airflow

Programmatically author, schedule and monitor data pipelines

Workspace: tessl
Visibility: Public
Describes: pypipkg:pypi/apache-airflow@3.0.x

To install, run:

npx @tessl/cli install tessl/pypi-apache-airflow@3.0.0

# Apache Airflow


Apache Airflow is a comprehensive platform for programmatically authoring, scheduling, and monitoring data workflows and pipelines. It enables developers to define workflows as directed acyclic graphs (DAGs) of tasks with rich dependency management, featuring a powerful scheduler that executes tasks across worker arrays while respecting specified dependencies.


## Package Information


- **Package Name**: apache-airflow
- **Package Type**: pypi
- **Language**: Python
- **Installation**: `pip install apache-airflow`
- **Version**: 3.0.6

## Core Imports

```python
import airflow
from airflow import DAG, Asset, XComArg
```

Common imports for working with DAGs and tasks:

```python
from airflow import DAG
from airflow.decorators import dag, task
from airflow.models import BaseOperator
```


## Basic Usage


```python
from datetime import datetime, timedelta

from airflow.decorators import dag, task

# Define default arguments applied to every task
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Create DAG using the decorator API
@dag(
    dag_id='example_workflow',
    default_args=default_args,
    description='A simple example DAG',
    schedule=timedelta(days=1),
    catchup=False,
    tags=['example'],
)
def example_workflow():

    @task
    def extract_data():
        """Extract data from source"""
        return {'data': [1, 2, 3, 4, 5]}

    @task
    def transform_data(data):
        """Transform the extracted data"""
        return {'transformed': [x * 2 for x in data['data']]}

    @task
    def load_data(data):
        """Load transformed data"""
        print(f"Loading: {data}")
        return True

    # Define task dependencies through data flow
    raw_data = extract_data()
    transformed_data = transform_data(raw_data)
    load_data(transformed_data)

# Instantiate the DAG
dag_instance = example_workflow()
```
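
For local debugging, the instantiated DAG can be exercised without a running scheduler; a minimal sketch using `DAG.test()`, which runs the DAG in-process:

```python
# Run one in-process execution of the DAG for debugging
if __name__ == "__main__":
    dag_instance.test()
```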


## Architecture


Apache Airflow follows a distributed architecture with several key components:

- **Scheduler**: Orchestrates task execution based on dependencies and schedules
- **Executor**: Runs tasks on worker nodes (Local, Celery, Kubernetes, etc.)
- **Web Server**: Provides the web UI for monitoring and managing workflows
- **Worker Nodes**: Execute individual tasks in the workflow
- **Metadata Database**: Stores DAG definitions, task states, and execution history
- **Task SDK**: Core definitions and utilities for task execution (new in 3.0)


The platform transforms workflow definitions into versionable, testable, and collaborative code, making it ideal for data engineering teams building complex data processing pipelines, ETL workflows, machine learning orchestration, and automated task scheduling.
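
The components above are wired together through Airflow's configuration. As a minimal sketch (assuming a configured deployment; the printed values depend on your `airflow.cfg`), the executor and metadata database are both selected there:

```python
from airflow.configuration import conf

# Which executor the scheduler hands tasks to, e.g. "LocalExecutor"
print(conf.get("core", "executor"))

# Connection string for the metadata database
print(conf.get("database", "sql_alchemy_conn"))
```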


## Capabilities


### DAG Management


Core functionality for defining, scheduling, and managing directed acyclic graphs of tasks including DAG decorators, task groups, and dependency management.


```python { .api }
class DAG:
    def __init__(
        self,
        dag_id: str,
        description: Optional[str] = None,
        schedule: Optional[Union[str, datetime.timedelta]] = None,
        start_date: Optional[datetime.datetime] = None,
        end_date: Optional[datetime.datetime] = None,
        **kwargs
    ): ...

@dag(
    dag_id: str,
    description: Optional[str] = None,
    schedule: Optional[Union[str, timedelta]] = None,
    start_date: Optional[datetime] = None,
    **kwargs
) -> Callable: ...
```
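
As a usage sketch, the class-based API is commonly used as a context manager, which attaches tasks defined inside the block to the DAG (the DAG id and schedule here are illustrative):

```python
from datetime import datetime, timedelta

from airflow import DAG

with DAG(
    dag_id="nightly_reports",
    description="Nightly reporting pipeline",
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    ...  # operators instantiated here are attached to this DAG
```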


[DAG Management](./dag-management.md)


### Task Operators


Task definition and execution including BaseOperator, task decorators, dynamic task mapping, and task instance management.


```python { .api }
class BaseOperator:
    def __init__(
        self,
        task_id: str,
        owner: str = "airflow",
        retries: Optional[int] = None,
        retry_delay: Optional[timedelta] = None,
        **kwargs
    ): ...

@task(
    task_id: Optional[str] = None,
    python_callable: Optional[Callable] = None,
    **kwargs
) -> Callable: ...
```
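
A brief sketch of the decorator form, including dynamic task mapping via `.expand()` (the task body and inputs are illustrative):

```python
from airflow.decorators import task

@task(retries=2)
def square(x: int) -> int:
    return x * x

# Inside a DAG definition, expand() creates one mapped task instance per input:
# squared = square.expand(x=[1, 2, 3])
```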

145

146

[Task Operators](./task-operators.md)

147

148

### Assets and Scheduling

149

150

Asset-driven scheduling system for creating data-aware workflows, including asset definitions, timetables, and dependency management.

151

152

```python { .api }
class Asset:
    def __init__(
        self,
        uri: str,
        name: Optional[str] = None,
        group: Optional[str] = None,
        extra: Optional[Dict[str, Any]] = None
    ): ...

class AssetAlias:
    def __init__(self, name: str): ...
```
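
A sketch of asset-driven scheduling: a producer task declares the asset as an outlet, and a consumer DAG uses the asset as its schedule (the URI and DAG ids are illustrative):

```python
from airflow import Asset
from airflow.decorators import dag, task

raw_orders = Asset("s3://data-lake/raw/orders")

@dag(schedule=None)
def produce_orders():
    @task(outlets=[raw_orders])
    def write_orders():
        ...  # a successful run marks raw_orders as updated

    write_orders()

@dag(schedule=[raw_orders])  # runs whenever raw_orders is updated
def process_orders():
    @task
    def read_orders():
        ...

    read_orders()

produce_orders()
process_orders()
```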


[Assets and Scheduling](./assets-scheduling.md)


### Configuration Management


System configuration, variables, parameters, and connection management for workflow orchestration.


```python { .api }
class Variable:
    @classmethod
    def get(cls, key: str, default_var: Any = None) -> Any: ...

    @classmethod
    def set(cls, key: str, value: Any) -> None: ...

class Param:
    def __init__(
        self,
        default: Any = None,
        description: Optional[str] = None,
        **kwargs
    ): ...
```
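
For example, reading and writing Variables (the key names are illustrative; `Variable.get` falls back to `default_var` when the key is absent):

```python
from airflow.models import Variable

# Read with a fallback so a missing key doesn't fail the task
endpoint = Variable.get("api_endpoint", default_var="https://example.com/api")

# Persist a value in the metadata database
Variable.set("last_sync", "2024-01-01")
```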


[Configuration](./configuration.md)


### Cross-Communication (XCom)


Cross-communication system for sharing data between tasks including XComArg, custom backends, and serialization.


```python { .api }
class XComArg:
    def __init__(
        self,
        operator: BaseOperator,
        key: Optional[str] = None
    ): ...

class XCom:
    @classmethod
    def get_one(
        cls,
        task_id: str,
        dag_id: str,
        key: Optional[str] = None,
        execution_date: Optional[datetime] = None
    ) -> Any: ...
```
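
In TaskFlow code, XCom passing is implicit: a task's return value is pushed to XCom, and handing it to another task creates an `XComArg` dependency (a minimal sketch):

```python
from airflow.decorators import task

@task
def producer():
    # Pushed to XCom under the default "return_value" key
    return {"rows": 42}

@task
def consumer(payload: dict):
    print(payload["rows"])

# Inside a DAG definition, wiring the calls creates the dependency:
# consumer(producer())
```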


[Cross-Communication](./xcom.md)


### Executors


Task execution engines including LocalExecutor, CeleryExecutor, KubernetesExecutor, and custom executor development.


```python { .api }
class BaseExecutor:
    def __init__(self, parallelism: int = 32): ...

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = None
    ) -> None: ...

class LocalExecutor(BaseExecutor):
    def __init__(self, parallelism: int = 0): ...
```
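
A minimal custom-executor sketch built on the `execute_async` hook above; `InlineExecutor` is hypothetical, and a real executor would dispatch `command` to worker infrastructure rather than just logging it:

```python
from airflow.executors.base_executor import BaseExecutor

class InlineExecutor(BaseExecutor):
    """Illustrative executor that logs instead of dispatching work."""

    def execute_async(self, key, command, queue=None, executor_config=None):
        # A real implementation would hand `command` to a worker here
        self.log.info("would execute %s: %s", key, command)
```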


[Executors](./executors.md)


### CLI and Utilities


Command-line interface, context utilities, dependency management, and workflow orchestration helpers.


```python { .api }
def get_current_context() -> Context: ...

def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]) -> None: ...

def cross_downstream(
    from_tasks: Sequence[BaseOperator],
    to_tasks: Sequence[BaseOperator]
) -> None: ...
```
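
As a usage sketch, `chain()` below is equivalent to `extract >> transform >> load`; the `EmptyOperator` import path is an assumption for the 3.0 standard provider and may differ in other versions:

```python
from datetime import datetime

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.providers.standard.operators.empty import EmptyOperator

with DAG(dag_id="chain_demo", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    extract = EmptyOperator(task_id="extract")
    transform = EmptyOperator(task_id="transform")
    load = EmptyOperator(task_id="load")

    # Same as: extract >> transform >> load
    chain(extract, transform, load)
```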


[CLI and Utilities](./cli-utilities.md)


### Exception Handling


Comprehensive exception hierarchy for error handling, timeout management, and workflow recovery.


```python { .api }
class AirflowException(Exception): ...

class AirflowTaskTimeout(AirflowException): ...

class AirflowSensorTimeout(AirflowException): ...

class AirflowRescheduleException(AirflowException): ...
```
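
For example, raising `AirflowException` from task code fails the current try and triggers any configured retries (the validation logic is illustrative):

```python
from airflow.decorators import task
from airflow.exceptions import AirflowException

@task(retries=2)
def validate(payload: dict) -> dict:
    if not payload.get("rows"):
        # Fails this attempt; the scheduler retries up to `retries` times
        raise AirflowException("payload contained no rows")
    return payload
```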


[Exception Handling](./exceptions.md)


### Database Models


ORM models for DAGs, tasks, runs, connections, and metadata storage with SQLAlchemy integration.


```python { .api }
class DagModel:
    dag_id: str
    is_active: bool
    last_parsed_time: datetime
    next_dagrun: datetime

class TaskInstance:
    task_id: str
    dag_id: str
    execution_date: datetime
    state: str
```
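
A sketch of reading these models through the SQLAlchemy session helper; this assumes server-side code with direct access to the metadata database (in Airflow 3, task code is expected to go through the API rather than query the database directly):

```python
from airflow.models import DagModel
from airflow.utils.session import create_session

with create_session() as session:
    row = session.query(DagModel).filter(DagModel.dag_id == "example_workflow").one_or_none()
    if row is not None:
        print(row.is_active, row.next_dagrun)
```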


[Database Models](./database-models.md)


### Extensions and Providers


Plugin system, provider packages, operator links, notifications, and custom component development.


```python { .api }
class BaseOperatorLink:
    name: Optional[str] = None

    def get_link(
        self,
        operator: BaseOperator,
        dttm: datetime
    ) -> str: ...

class BaseNotifier:
    def __init__(self, **kwargs): ...

    def notify(self, context: Context) -> None: ...
```
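
As a sketch, a custom notifier subclasses `BaseNotifier` and implements `notify`; it can then be attached to DAG or task callbacks (`LogNotifier` is hypothetical):

```python
from airflow.notifications.basenotifier import BaseNotifier

class LogNotifier(BaseNotifier):
    """Illustrative notifier that prints instead of paging anyone."""

    def notify(self, context):
        ti = context["task_instance"]
        print(f"Task {ti.task_id} in DAG {ti.dag_id} reached state {ti.state}")

# Usage (inside a DAG definition): on_failure_callback=LogNotifier()
```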


[Extensions and Providers](./extensions.md)