0
# Notifier Compatibility
1
2
This module provides a version-compatible base notifier class for creating custom notification handlers that work across different Airflow versions. It offers a consistent interface for building notification systems regardless of the underlying Airflow version.
3
4
## Capabilities
5
6
### Base Notifier Class
7
8
Version-compatible base class for implementing custom notifiers.
9
10
```python { .api }
11
class BaseNotifier:
    """
    Base notifier class that is compatible across Airflow versions.

    Resolves to ``airflow.sdk.bases.notifier.BaseNotifier`` on Airflow 3.0+
    and to ``airflow.notifications.basenotifier.BaseNotifier`` on
    Airflow < 3.0.

    Subclass it to implement custom notification handlers that need to
    work across different Airflow versions.
    """
21
```
22
23
## Usage Examples
24
25
```python
26
from airflow.providers.common.compat.notifier import BaseNotifier
27
from airflow.configuration import conf
28
import smtplib
29
from email.mime.text import MIMEText
30
from email.mime.multipart import MIMEMultipart
31
32
# Custom email notifier
33
class EmailNotifier(BaseNotifier):
    """Custom email notifier that works across Airflow versions.

    Sends a plain-text summary of the task instance found in the callback
    context through an SMTP server, using STARTTLS followed by a login.
    """

    def __init__(
        self,
        smtp_host: str,
        smtp_port: int,
        username: str,
        password: str,
        to_addr: str = "admin@company.com",
        timeout: float = 30.0,
    ):
        """Store the SMTP connection settings.

        Args:
            smtp_host: Host name of the SMTP server.
            smtp_port: Port of the SMTP server.
            username: SMTP login; also used as the sender address.
            password: SMTP password.
            to_addr: Recipient address of the notification.
            timeout: Socket timeout in seconds for the SMTP connection, so
                an unreachable server cannot block the callback forever.
        """
        super().__init__()
        self.smtp_host = smtp_host
        self.smtp_port = smtp_port
        self.username = username
        self.password = password
        self.to_addr = to_addr
        self.timeout = timeout

    def notify(self, context):
        """Send email notification.

        Delivery failures are printed and swallowed (best effort) so that a
        broken mail server does not raise out of the callback.

        Args:
            context: Airflow callback context; must provide ``dag`` and
                ``task_instance``.
        """
        # Extract information from context
        ti = context['task_instance']
        dag_id = context['dag'].dag_id
        task_id = ti.task_id
        state = ti.state

        # Airflow 3 no longer provides ``execution_date`` in the context, so
        # prefer ``logical_date`` and only fall back to the legacy key.
        run_date = context.get('logical_date')
        if run_date is None:
            run_date = context.get('execution_date')

        # Create email message
        msg = MIMEMultipart()
        msg['From'] = self.username
        msg['To'] = self.to_addr
        msg['Subject'] = f"Airflow Task {state}: {dag_id}.{task_id}"

        # Join explicit lines instead of using a triple-quoted literal so the
        # source code's indentation never leaks into the email body.
        body = "\n".join([
            f"DAG: {dag_id}",
            f"Task: {task_id}",
            f"Execution Date: {run_date}",
            f"State: {state}",
            "",
            f"Log URL: {ti.log_url}",
        ])
        msg.attach(MIMEText(body, 'plain'))
        text = msg.as_string()

        # Send email. The context manager issues QUIT and closes the socket
        # even when STARTTLS, login or sending fails.
        try:
            with smtplib.SMTP(self.smtp_host, self.smtp_port, timeout=self.timeout) as server:
                server.starttls()
                server.login(self.username, self.password)
                server.sendmail(self.username, self.to_addr, text)
        except (smtplib.SMTPException, OSError) as e:
            print(f"Failed to send email notification: {e}")
        else:
            print(f"Email notification sent for {dag_id}.{task_id}")
79
80
# Custom Slack notifier
81
class SlackNotifier(BaseNotifier):
    """Custom Slack notifier that works across Airflow versions.

    Posts a single-attachment message describing the task instance to a
    Slack incoming webhook.
    """

    def __init__(self, webhook_url: str, channel: str = "#airflow", timeout: float = 10.0):
        """Store the webhook settings.

        Args:
            webhook_url: Slack incoming-webhook URL to post to.
            channel: Channel name placed in the payload.
            timeout: Seconds to wait for Slack before giving up, so a stalled
                HTTP request cannot block the callback forever.
        """
        super().__init__()
        self.webhook_url = webhook_url
        self.channel = channel
        self.timeout = timeout

    def notify(self, context):
        """Send Slack notification.

        Delivery failures are printed and swallowed (best effort).

        Args:
            context: Airflow callback context; must provide ``dag`` and
                ``task_instance``.
        """
        import requests

        # Extract information from context
        ti = context['task_instance']
        dag_id = context['dag'].dag_id
        task_id = ti.task_id

        # Airflow 3 no longer provides ``execution_date`` in the context, so
        # prefer ``logical_date`` and only fall back to the legacy key.
        run_date = context.get('logical_date')
        if run_date is None:
            run_date = context.get('execution_date')

        # The state may be unset; fall back to a label so ``.upper()`` is safe.
        state = ti.state or "unknown"
        state_label = state.upper()

        # Prepare Slack message
        color = "good" if state == "success" else "danger"
        attachment = {
            "color": color,
            "title": f"Task {state_label}: {dag_id}.{task_id}",
            "fields": [
                {"title": "DAG", "value": dag_id, "short": True},
                {"title": "Task", "value": task_id, "short": True},
                {"title": "Execution Date", "value": str(run_date), "short": True},
                {"title": "State", "value": state_label, "short": True},
            ],
            "footer": "Airflow",
        }
        # ``start_date`` can be None (task never started); only add the
        # timestamp when there is one, as whole epoch seconds.
        started = ti.start_date
        if started is not None:
            attachment["ts"] = int(started.timestamp())

        payload = {
            "channel": self.channel,
            "username": "Airflow Bot",
            "icon_emoji": ":airplane:",
            "attachments": [attachment],
        }

        # Send to Slack. ``json=`` serialises the payload and sets the
        # Content-Type header; ``timeout`` bounds the wait.
        try:
            response = requests.post(self.webhook_url, json=payload, timeout=self.timeout)
            response.raise_for_status()
        except requests.RequestException as e:
            print(f"Failed to send Slack notification: {e}")
        else:
            print(f"Slack notification sent for {dag_id}.{task_id}")
132
133
# Use notifiers in DAGs
134
from airflow import DAG
135
from airflow.operators.python import PythonOperator
136
from datetime import datetime, timedelta
137
138
# Configure notifiers (the credentials and webhook URL below are placeholders)
email_notifier = EmailNotifier(
    "smtp.company.com",
    587,
    username="airflow@company.com",
    password="secret_password",
)

slack_notifier = SlackNotifier(
    "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
    channel="#data-alerts",
)
150
151
def task_success_callback(context):
    """Notify the email and Slack channels, in that order, about a successful task."""
    for notifier in (email_notifier, slack_notifier):
        notifier.notify(context)
155
156
def task_failure_callback(context):
    """Notify the email and Slack channels, in that order, about a failed task."""
    for notifier in (email_notifier, slack_notifier):
        notifier.notify(context)
160
161
# Create DAG with notifications.
# ``schedule`` is used instead of ``schedule_interval``: the latter was removed
# in Airflow 3.0, while ``schedule`` is accepted by Airflow 2.4+ and 3.x.
dag = DAG(
    "example_with_notifications",
    start_date=datetime(2024, 1, 1),
    schedule=timedelta(days=1),
    catchup=False,
    # Callbacks placed in default_args are applied to every task of the DAG.
    default_args={
        'on_success_callback': task_success_callback,
        'on_failure_callback': task_failure_callback,
    },
)
172
173
def my_task_function():
    """Print a start message, wait two seconds to simulate work, and return a status string."""
    from time import sleep

    print("Executing important task...")
    sleep(2)  # Simulate some work
    return "Task completed"
179
180
# Wrap the callable in an operator attached to the DAG.
task = PythonOperator(
    dag=dag,
    task_id="important_task",
    python_callable=my_task_function,
)
185
186
# Multi-channel notifier
187
class MultiChannelNotifier(BaseNotifier):
    """Fan a single notification out to several wrapped notifiers."""

    def __init__(self, notifiers: list[BaseNotifier]):
        """Keep the notifiers that ``notify`` will call, in the given order."""
        super().__init__()
        self.notifiers = notifiers

    def notify(self, context):
        """Pass ``context`` to each wrapped notifier in turn.

        An exception raised by one notifier is printed and does not stop the
        remaining notifiers from being called.
        """
        for channel in self.notifiers:
            try:
                channel.notify(context)
            except Exception as err:
                print(f"Notifier {type(channel).__name__} failed: {err}")
201
202
# Use multi-channel notifier: email first, then Slack
multi_notifier = MultiChannelNotifier([email_notifier, slack_notifier])
207
208
def multi_channel_callback(context):
    """Forward the callback context to ``multi_notifier``."""
    multi_notifier.notify(context)
210
```