or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

asset-management.mdindex.mdlineage-entities.mdnotifier-compatibility.mdopenlineage-integration.mdprovider-verification.mdsecurity-permissions.mdstandard-components.mdversion-compatibility.md

notifier-compatibility.mddocs/

0

# Notifier Compatibility

1

2

Version-compatible base notifier class for creating custom notification handlers that work across different Airflow versions. This module provides a consistent interface for building notification systems regardless of the underlying Airflow version.

3

4

## Capabilities

5

6

### Base Notifier Class

7

8

Version-compatible base class for implementing custom notifiers.

9

10

```python { .api }

11

class BaseNotifier:

12

"""

13

Version-compatible base notifier class.

14

15

Maps to airflow.sdk.bases.notifier.BaseNotifier in Airflow 3.0+

16

Maps to airflow.notifications.basenotifier.BaseNotifier in Airflow < 3.0

17

18

Use this class as the base for implementing custom notification handlers

19

that need to work across different Airflow versions.

20

"""

21

```

22

23

## Usage Examples

24

25

```python

26

from airflow.providers.common.compat.notifier import BaseNotifier

27

from airflow.configuration import conf

28

import smtplib

29

from email.mime.text import MIMEText

30

from email.mime.multipart import MIMEMultipart

31

32

# Custom email notifier

33

class EmailNotifier(BaseNotifier):

34

"""Custom email notifier that works across Airflow versions."""

35

36

def __init__(self, smtp_host: str, smtp_port: int, username: str, password: str):

37

super().__init__()

38

self.smtp_host = smtp_host

39

self.smtp_port = smtp_port

40

self.username = username

41

self.password = password

42

43

def notify(self, context):

44

"""Send email notification."""

45

# Extract information from context

46

dag_id = context['dag'].dag_id

47

task_id = context['task_instance'].task_id

48

execution_date = context['execution_date']

49

state = context['task_instance'].state

50

51

# Create email message

52

msg = MIMEMultipart()

53

msg['From'] = self.username

54

msg['To'] = "admin@company.com"

55

msg['Subject'] = f"Airflow Task {state}: {dag_id}.{task_id}"

56

57

body = f"""

58

DAG: {dag_id}

59

Task: {task_id}

60

Execution Date: {execution_date}

61

State: {state}

62

63

Log URL: {context['task_instance'].log_url}

64

"""

65

66

msg.attach(MIMEText(body, 'plain'))

67

68

# Send email

69

try:

70

server = smtplib.SMTP(self.smtp_host, self.smtp_port)

71

server.starttls()

72

server.login(self.username, self.password)

73

text = msg.as_string()

74

server.sendmail(self.username, "admin@company.com", text)

75

server.quit()

76

print(f"Email notification sent for {dag_id}.{task_id}")

77

except Exception as e:

78

print(f"Failed to send email notification: {e}")

79

80

# Custom Slack notifier

81

class SlackNotifier(BaseNotifier):

82

"""Custom Slack notifier that works across Airflow versions."""

83

84

def __init__(self, webhook_url: str, channel: str = "#airflow"):

85

super().__init__()

86

self.webhook_url = webhook_url

87

self.channel = channel

88

89

def notify(self, context):

90

"""Send Slack notification."""

91

import requests

92

import json

93

94

# Extract information from context

95

dag_id = context['dag'].dag_id

96

task_id = context['task_instance'].task_id

97

execution_date = context['execution_date']

98

state = context['task_instance'].state

99

100

# Prepare Slack message

101

color = "good" if state == "success" else "danger"

102

103

payload = {

104

"channel": self.channel,

105

"username": "Airflow Bot",

106

"icon_emoji": ":airplane:",

107

"attachments": [{

108

"color": color,

109

"title": f"Task {state.upper()}: {dag_id}.{task_id}",

110

"fields": [

111

{"title": "DAG", "value": dag_id, "short": True},

112

{"title": "Task", "value": task_id, "short": True},

113

{"title": "Execution Date", "value": str(execution_date), "short": True},

114

{"title": "State", "value": state.upper(), "short": True}

115

],

116

"footer": "Airflow",

117

"ts": context['task_instance'].start_date.timestamp()

118

}]

119

}

120

121

# Send to Slack

122

try:

123

response = requests.post(

124

self.webhook_url,

125

data=json.dumps(payload),

126

headers={'Content-Type': 'application/json'}

127

)

128

response.raise_for_status()

129

print(f"Slack notification sent for {dag_id}.{task_id}")

130

except Exception as e:

131

print(f"Failed to send Slack notification: {e}")

132

133

# Use notifiers in DAGs

134

from airflow import DAG

135

from airflow.operators.python import PythonOperator

136

from datetime import datetime, timedelta

137

138

# Configure notifiers

139

email_notifier = EmailNotifier(

140

smtp_host="smtp.company.com",

141

smtp_port=587,

142

username="airflow@company.com",

143

password="secret_password"

144

)

145

146

slack_notifier = SlackNotifier(

147

webhook_url="https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",

148

channel="#data-alerts"

149

)

150

151

def task_success_callback(context):

152

"""Success callback using notifiers."""

153

email_notifier.notify(context)

154

slack_notifier.notify(context)

155

156

def task_failure_callback(context):

157

"""Failure callback using notifiers."""

158

email_notifier.notify(context)

159

slack_notifier.notify(context)

160

161

# Create DAG with notifications

162

dag = DAG(

163

"example_with_notifications",

164

start_date=datetime(2024, 1, 1),

165

schedule_interval=timedelta(days=1),

166

catchup=False,

167

default_args={

168

'on_success_callback': task_success_callback,

169

'on_failure_callback': task_failure_callback,

170

}

171

)

172

173

def my_task_function():

174

print("Executing important task...")

175

# Simulate some work

176

import time

177

time.sleep(2)

178

return "Task completed"

179

180

task = PythonOperator(

181

task_id="important_task",

182

python_callable=my_task_function,

183

dag=dag

184

)

185

186

# Multi-channel notifier

187

class MultiChannelNotifier(BaseNotifier):

188

"""Notifier that sends to multiple channels."""

189

190

def __init__(self, notifiers: list[BaseNotifier]):

191

super().__init__()

192

self.notifiers = notifiers

193

194

def notify(self, context):

195

"""Send notification to all configured channels."""

196

for notifier in self.notifiers:

197

try:

198

notifier.notify(context)

199

except Exception as e:

200

print(f"Notifier {type(notifier).__name__} failed: {e}")

201

202

# Use multi-channel notifier

203

multi_notifier = MultiChannelNotifier([

204

email_notifier,

205

slack_notifier

206

])

207

208

def multi_channel_callback(context):

209

multi_notifier.notify(context)

210

```