# Extensions and Providers

Airflow's extensibility layer covers the plugin system, provider packages, operator links, notifications, and listeners. It lets you add custom operators, hooks, sensors, and integrations.

## Capabilities

### Provider System

Base classes and metadata for extending Airflow through provider packages and operator links.

```python { .api }
class BaseOperatorLink:
    """Base class for operator links in the web UI."""

    name: Optional[str] = None

    def get_link(
        self,
        operator: BaseOperator,
        dttm: datetime,
        ti_key: Optional[TaskInstanceKey] = None
    ) -> str:
        """
        Get link URL for operator instance.

        Args:
            operator: Operator instance
            dttm: Execution datetime
            ti_key: Task instance key

        Returns:
            Link URL
        """

class ProviderInfo:
    """Information about a provider package."""

    provider_name: str
    package_name: str
    version: str
    description: str
    connection_types: List[str]
    extra_links: List[str]
```
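
As a rough illustration of how a subclass might implement `get_link`, the sketch below builds a URL to a hypothetical log-search page. So that it runs without Airflow installed, `BaseOperatorLink` and the operator are replaced by minimal stand-ins that mirror only the interface listed above. `LogSearchLink`, `FakeOperator`, and the `logs.example.com` URL are assumptions made for this example, not part of Airflow.

```python
from datetime import datetime
from typing import Any, Optional
from urllib.parse import urlencode


class BaseOperatorLink:
    """Stand-in mirroring the interface above (not the real Airflow class)."""

    name: str = ""

    def get_link(self, operator: Any, dttm: datetime, ti_key: Optional[Any] = None) -> str:
        raise NotImplementedError


class FakeOperator:
    """Hypothetical operator exposing only the attribute the link needs."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id


class LogSearchLink(BaseOperatorLink):
    """Hypothetical link to an external log-search page for a task."""

    name = "Log Search"

    def get_link(self, operator: Any, dttm: datetime, ti_key: Optional[Any] = None) -> str:
        # Encode the task ID and run timestamp as query parameters.
        query = urlencode({"task": operator.task_id, "at": dttm.isoformat()})
        return f"https://logs.example.com/search?{query}"


link = LogSearchLink()
url = link.get_link(FakeOperator("load_orders"), datetime(2024, 1, 1, 12, 0))
print(url)
```

In a real deployment the subclass would inherit from Airflow's own `BaseOperatorLink` and be attached to an operator or registered through a plugin.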

### Notification System

Notifier classes that send notifications for DAG and task events.

```python { .api }
class BaseNotifier:
    """Base class for notification backends."""

    def __init__(self, **kwargs):
        """Initialize notifier with configuration."""

    def notify(self, context: Context) -> None:
        """
        Send notification.

        Args:
            context: Task execution context
        """

class EmailNotifier(BaseNotifier):
    """Email notification backend."""

    def __init__(
        self,
        to: List[str],
        subject: Optional[str] = None,
        html_content: Optional[str] = None,
        **kwargs
    ):
        """
        Initialize email notifier.

        Args:
            to: Recipient email addresses
            subject: Email subject template
            html_content: HTML email content
        """

class SlackNotifier(BaseNotifier):
    """Slack notification backend."""

    def __init__(
        self,
        slack_conn_id: str,
        channel: Optional[str] = None,
        username: Optional[str] = None,
        **kwargs
    ):
        """
        Initialize Slack notifier.

        Args:
            slack_conn_id: Slack connection ID
            channel: Slack channel
            username: Bot username
        """
```
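
A custom backend could follow the same shape as the notifiers above: accept configuration in `__init__` and do the work in `notify`. The sketch below is one possible layout, assuming a stand-in `BaseNotifier` so it runs without Airflow installed. `InMemoryNotifier` and the `task_id` and `state` context keys are hypothetical and chosen only for this example; a real task context is structured differently.

```python
from typing import Any, Dict, List


class BaseNotifier:
    """Stand-in mirroring the interface above (not the real Airflow class)."""

    def __init__(self, **kwargs: Any) -> None:
        pass

    def notify(self, context: Dict[str, Any]) -> None:
        raise NotImplementedError


class InMemoryNotifier(BaseNotifier):
    """Hypothetical notifier that records messages instead of sending them."""

    def __init__(self, prefix: str = "[airflow]", **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.prefix = prefix
        self.sent: List[str] = []

    def notify(self, context: Dict[str, Any]) -> None:
        # These keys are assumptions of the sketch, not real context keys.
        task_id = context.get("task_id", "unknown")
        state = context.get("state", "unknown")
        self.sent.append(f"{self.prefix} task {task_id} finished as {state}")


notifier = InMemoryNotifier(prefix="[demo]")
notifier.notify({"task_id": "load_orders", "state": "failed"})
print(notifier.sent[0])
```

In Airflow itself, a notifier instance is typically passed as a callback, for example as a task's `on_failure_callback`.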

### Plugin Framework

Base class for packaging custom components as an Airflow plugin.

```python { .api }
class AirflowPlugin:
    """Base class for Airflow plugins."""

    name: Optional[str] = None
    operators: List[type] = []
    sensors: List[type] = []
    hooks: List[type] = []
    executors: List[type] = []
    macros: List[Any] = []
    admin_views: List[Any] = []
    flask_blueprints: List[Any] = []
    menu_links: List[Any] = []
    appbuilder_views: List[Any] = []
    appbuilder_menu_items: List[Any] = []
    global_operator_extra_links: List[BaseOperatorLink] = []
    operator_extra_links: List[BaseOperatorLink] = []

# Example plugin
class CustomPlugin(AirflowPlugin):
    name = "custom_plugin"
    operators = [CustomOperator]
    hooks = [CustomHook]
    macros = [custom_macro_function]
```
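
The example plugin above refers to components defined elsewhere. For a version that runs on its own, the sketch below registers a single macro. It assumes a stand-in `AirflowPlugin` that lists only the attributes used here; `days_between` and `DateMacrosPlugin` are hypothetical names introduced for illustration.

```python
from datetime import date
from typing import Any, List


class AirflowPlugin:
    """Stand-in with only the attributes this sketch uses (not the real class)."""

    name: str = ""
    macros: List[Any] = []


def days_between(start: str, end: str) -> int:
    """Hypothetical macro: whole days between two ISO dates (YYYY-MM-DD)."""
    return (date.fromisoformat(end) - date.fromisoformat(start)).days


class DateMacrosPlugin(AirflowPlugin):
    """Hypothetical plugin exposing one macro function."""

    name = "date_macros"
    macros = [days_between]


# Call the registered macro directly to check its behaviour.
macro = DateMacrosPlugin.macros[0]
print(macro("2024-01-01", "2024-01-31"))
```

With a real installation, a module like this would normally live in the plugins folder under `$AIRFLOW_HOME/plugins` so that Airflow can discover it.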

### Custom Operators

Template for developing custom task operators by subclassing `BaseOperator`.

```python { .api }
class CustomOperator(BaseOperator):
    """Example custom operator implementation."""

    # Attributes rendered as templates before execute() runs
    template_fields = ['param1', 'param2']
    # File extensions whose contents are loaded and rendered as templates
    template_ext = ['.sql', '.txt']
    ui_color = '#ffcccc'

    def __init__(
        self,
        param1: str,
        param2: Optional[str] = None,
        **kwargs
    ):
        """
        Initialize custom operator.

        Args:
            param1: Required parameter
            param2: Optional parameter
        """
        super().__init__(**kwargs)
        self.param1 = param1
        self.param2 = param2

    def execute(self, context: Context) -> Any:
        """
        Execute custom logic.

        Args:
            context: Task execution context

        Returns:
            Task result
        """
        # Custom implementation
        return f"Executed with {self.param1}, {self.param2}"
```
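
To make the pattern above concrete, the following sketch fills in `execute` with a small piece of real logic. It assumes a stand-in `BaseOperator` that stores only `task_id`, so it runs without Airflow and does no template rendering. `WordCountOperator` and its parameters are hypothetical.

```python
from typing import Any, Dict, Optional


class BaseOperator:
    """Stand-in (not the real Airflow class); it stores only task_id."""

    def __init__(self, task_id: str, **kwargs: Any) -> None:
        self.task_id = task_id

    def execute(self, context: Dict[str, Any]) -> Any:
        raise NotImplementedError


class WordCountOperator(BaseOperator):
    """Hypothetical operator that counts the words in a text parameter."""

    template_fields = ["text"]
    ui_color = "#ffcccc"

    def __init__(self, text: str, min_length: Optional[int] = None, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.text = text
        self.min_length = min_length

    def execute(self, context: Dict[str, Any]) -> int:
        words = self.text.split()
        if self.min_length is not None:
            # Keep only words that meet the minimum length.
            words = [w for w in words if len(w) >= self.min_length]
        return len(words)


task = WordCountOperator(
    task_id="count_words",
    text="extend airflow with a custom operator",
    min_length=4,
)
result = task.execute(context={})
print(result)
```

In Airflow, the value returned from `execute` is pushed to XCom by default, which is how downstream tasks would read this count.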

### Listener Framework

Hook functions called on DAG run and task instance state changes, for monitoring workflow execution.

```python { .api }
def on_dag_run_running(
    dag_run: DagRun,
    msg: str
) -> None:
    """
    Called when a DAG run starts.

    Args:
        dag_run: DAG run instance
        msg: Event message
    """

def on_dag_run_success(
    dag_run: DagRun,
    msg: str
) -> None:
    """
    Called when a DAG run succeeds.

    Args:
        dag_run: DAG run instance
        msg: Event message
    """

def on_dag_run_failed(
    dag_run: DagRun,
    msg: str
) -> None:
    """
    Called when a DAG run fails.

    Args:
        dag_run: DAG run instance
        msg: Event message
    """

def on_task_instance_running(
    previous_state: str,
    task_instance: TaskInstance,
    session: Session
) -> None:
    """
    Called when a task instance starts running.

    Args:
        previous_state: Previous task state
        task_instance: Task instance
        session: Database session
    """

def on_task_instance_success(
    previous_state: str,
    task_instance: TaskInstance,
    session: Session
) -> None:
    """
    Called when a task instance succeeds.

    Args:
        previous_state: Previous task state
        task_instance: Task instance
        session: Database session
    """

def on_task_instance_failed(
    previous_state: str,
    task_instance: TaskInstance,
    session: Session
) -> None:
    """
    Called when a task instance fails.

    Args:
        previous_state: Previous task state
        task_instance: Task instance
        session: Database session
    """
```
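
A listener module might implement only the hooks it cares about. The sketch below records DAG run outcomes in memory, using the `on_dag_run_success` and `on_dag_run_failed` signatures listed above. To stay runnable without Airflow, it assumes a no-op stand-in for the `hookimpl` marker and a hypothetical `FakeDagRun` object; the `events` list is also an invention of this example.

```python
from typing import Callable, List, Tuple


def hookimpl(func: Callable) -> Callable:
    """No-op stand-in for Airflow's hookimpl marker (assumption of this sketch)."""
    return func


class FakeDagRun:
    """Hypothetical DAG run exposing only dag_id."""

    def __init__(self, dag_id: str) -> None:
        self.dag_id = dag_id


# Hypothetical in-memory event log; a real listener might emit metrics instead.
events: List[Tuple[str, str]] = []


@hookimpl
def on_dag_run_success(dag_run: FakeDagRun, msg: str) -> None:
    events.append(("success", dag_run.dag_id))


@hookimpl
def on_dag_run_failed(dag_run: FakeDagRun, msg: str) -> None:
    events.append(("failed", dag_run.dag_id))


# Simulate the scheduler invoking the hooks.
on_dag_run_success(FakeDagRun("daily_sales"), msg="")
on_dag_run_failed(FakeDagRun("hourly_sync"), msg="task failed")
print(events)
```

In a real deployment the functions would be decorated with `hookimpl` from `airflow.listeners`, and the module would be registered through a plugin's `listeners` attribute.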

## Types

```python { .api }
from typing import List, Dict, Any, Optional, Callable
from datetime import datetime
from sqlalchemy.orm import Session
from airflow.models.baseoperator import BaseOperator
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
from airflow.models.dagrun import DagRun
from airflow.utils.context import Context

PluginComponent = type
NotificationChannel = str
EventListener = Callable
```