or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

assets-scheduling.md, cli-utilities.md, configuration.md, dag-management.md, database-models.md, exceptions.md, executors.md, extensions.md, index.md, task-operators.md, xcom.md

extensions.mddocs/

0

# Extensions and Providers

1

2

Plugin system, provider packages, operator links, notifications, and custom component development. Airflow's extensibility system allows custom operators, hooks, sensors, and integrations.

3

4

## Capabilities

5

6

### Provider System

7

8

Framework for extending Airflow with additional functionality.

9

10

```python { .api }

11

class BaseOperatorLink:
    """Base class for operator links in the web UI."""

    # Link name; defaults to None on this base class.
    name: Optional[str] = None

    def get_link(
        self,
        operator: BaseOperator,
        dttm: datetime,
        ti_key: Optional[TaskInstanceKey] = None,
    ) -> str:
        """
        Get link URL for operator instance.

        Args:
            operator: Operator instance
            dttm: Execution datetime
            ti_key: Task instance key; may be omitted (defaults to None)

        Returns:
            Link URL
        """

33

34

class ProviderInfo:
    """Information about a provider package."""

    # Annotation-only fields: no default values are assigned here.
    provider_name: str
    package_name: str
    version: str
    description: str
    connection_types: List[str]
    extra_links: List[str]

43

```

44

45

### Notification System

46

47

Send notifications for DAG and task events.

48

49

```python { .api }

50

class BaseNotifier:
    """Base class for notification backends."""

    def __init__(self, **kwargs):
        """Initialize notifier with configuration."""

    def notify(self, context: Context) -> None:
        """
        Send notification.

        Args:
            context: Task execution context
        """

63

64

class EmailNotifier(BaseNotifier):
    """Email notification backend."""

    def __init__(
        self,
        to: List[str],
        subject: Optional[str] = None,
        html_content: Optional[str] = None,
        **kwargs
    ):
        """
        Initialize email notifier.

        Args:
            to: Recipient email addresses
            subject: Email subject template; may be omitted (defaults to None)
            html_content: HTML email content; may be omitted (defaults to None)
            **kwargs: Additional keyword arguments
        """

82

83

class SlackNotifier(BaseNotifier):
    """Slack notification backend."""

    def __init__(
        self,
        slack_conn_id: str,
        channel: Optional[str] = None,
        username: Optional[str] = None,
        **kwargs
    ):
        """
        Initialize Slack notifier.

        Args:
            slack_conn_id: Slack connection ID
            channel: Slack channel; may be omitted (defaults to None)
            username: Bot username; may be omitted (defaults to None)
            **kwargs: Additional keyword arguments
        """

101

```

102

103

### Plugin Framework

104

105

Develop custom Airflow plugins.

106

107

```python { .api }

108

class AirflowPlugin:
    """Base class for Airflow plugins."""

    # Plugin name; defaults to None on this base class.
    name: Optional[str] = None

    # NOTE: the list defaults below live on the class, so each one is shared
    # by every subclass that does not assign its own list. Override by
    # assignment (as in `operators = [...]`) rather than mutating in place.
    operators: List[type] = []
    sensors: List[type] = []
    hooks: List[type] = []
    executors: List[type] = []
    macros: List[Any] = []
    admin_views: List[Any] = []
    flask_blueprints: List[Any] = []
    menu_links: List[Any] = []
    appbuilder_views: List[Any] = []
    appbuilder_menu_items: List[Any] = []
    global_operator_extra_links: List[BaseOperatorLink] = []
    operator_extra_links: List[BaseOperatorLink] = []

124

125

# Example plugin

126

class CustomPlugin(AirflowPlugin):
    """Example plugin that overrides a subset of the AirflowPlugin attributes."""

    name = "custom_plugin"
    operators = [CustomOperator]
    hooks = [CustomHook]
    macros = [custom_macro_function]

131

```

132

133

### Custom Operators

134

135

Develop custom task operators.

136

137

```python { .api }

138

class CustomOperator(BaseOperator):
    """Example custom operator implementation."""

    template_fields = ['param1', 'param2']
    template_ext = ['.sql', '.txt']
    ui_color = '#ffcccc'

    def __init__(
        self,
        param1: str,
        param2: Optional[str] = None,
        **kwargs
    ):
        """
        Initialize custom operator.

        Args:
            param1: Required parameter
            param2: Optional parameter
            **kwargs: Forwarded unchanged to the parent initializer
        """
        super().__init__(**kwargs)
        self.param1 = param1
        self.param2 = param2

    def execute(self, context: Context) -> Any:
        """
        Execute custom logic.

        Args:
            context: Task execution context

        Returns:
            Task result; in this example, a string built from param1 and param2
        """
        # Custom implementation
        return f"Executed with {self.param1}, {self.param2}"

174

```

175

176

### Listener Framework

177

178

Event listeners for monitoring workflow execution.

179

180

```python { .api }

181

def on_dag_run_running(
    dag_run: DagRun,
    msg: str
) -> None:
    """
    Called when DAG run starts.

    Args:
        dag_run: DAG run instance
        msg: Event message
    """

192

193

def on_dag_run_success(
    dag_run: DagRun,
    msg: str
) -> None:
    """
    Called when DAG run succeeds.

    Args:
        dag_run: DAG run instance
        msg: Event message
    """

204

205

def on_dag_run_failed(
    dag_run: DagRun,
    msg: str
) -> None:
    """
    Called when DAG run fails.

    Args:
        dag_run: DAG run instance
        msg: Event message
    """

216

217

def on_task_instance_running(
    previous_state: str,
    task_instance: TaskInstance,
    session: Session
) -> None:
    """
    Called when task instance starts running.

    Args:
        previous_state: Previous task state
        task_instance: Task instance
        session: Database session
    """

230

231

def on_task_instance_success(
    previous_state: str,
    task_instance: TaskInstance,
    session: Session
) -> None:
    """
    Called when task instance succeeds.

    Args:
        previous_state: Previous task state
        task_instance: Task instance
        session: Database session
    """

244

245

def on_task_instance_failed(
    previous_state: str,
    task_instance: TaskInstance,
    session: Session
) -> None:
    """
    Called when task instance fails.

    Args:
        previous_state: Previous task state
        task_instance: Task instance
        session: Database session
    """

258

```

259

260

## Types

261

262

```python { .api }

263

from typing import List, Dict, Any, Optional, Callable

264

from datetime import datetime

265

from airflow.models.baseoperator import BaseOperator

266

from airflow.models.taskinstance import TaskInstance, TaskInstanceKey

267

from airflow.models.dagrun import DagRun

268

from airflow.utils.context import Context

269

270

# Plain aliases: each name is bound directly to the existing type object.
PluginComponent = type  # alias of the builtin `type`
NotificationChannel = str  # alias of the builtin `str`
EventListener = Callable  # alias of `typing.Callable` (unparameterized)

273

```