Common Compatibility Provider - providing compatibility code for previous Airflow versions
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Version-compatible base notifier class for creating custom notification handlers that work across different Airflow versions. This module provides a consistent interface for building notification systems regardless of the underlying Airflow version.
Version-compatible base class for implementing custom notifiers.
class BaseNotifier:
    """
    Version-compatible base notifier class.

    Maps to airflow.sdk.bases.notifier.BaseNotifier in Airflow 3.0+
    Maps to airflow.notifications.basenotifier.BaseNotifier in Airflow < 3.0

    Use this class as the base for implementing custom notification handlers
    that need to work across different Airflow versions.
    """


# The stub above only illustrates the API surface. This import rebinds the
# name ``BaseNotifier`` to the class shipped by the compat provider, which is
# what the example notifiers below subclass.
from airflow.providers.common.compat.notifier import BaseNotifier
from airflow.configuration import conf
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
# Custom email notifier
class EmailNotifier(BaseNotifier):
    """Custom email notifier that works across Airflow versions.

    Sends a plain-text summary of a task instance over SMTP, upgrading the
    connection with STARTTLS and authenticating before delivery.

    Args:
        smtp_host: Hostname of the SMTP server.
        smtp_port: Port of the SMTP server.
        username: SMTP login; also used as the ``From`` address.
        password: SMTP password.
        recipient: Address the notification is delivered to.
    """

    def __init__(
        self,
        smtp_host: str,
        smtp_port: int,
        username: str,
        password: str,
        recipient: str = "admin@company.com",
    ):
        super().__init__()
        self.smtp_host = smtp_host
        self.smtp_port = smtp_port
        self.username = username
        self.password = password
        self.recipient = recipient

    def notify(self, context):
        """Send email notification.

        Delivery is best-effort: any error raised while talking to the SMTP
        server is reported on stdout and not re-raised.

        Args:
            context: Airflow callback context; must provide ``dag`` and
                ``task_instance``.
        """
        # Extract information from context
        task_instance = context['task_instance']
        dag_id = context['dag'].dag_id
        task_id = task_instance.task_id
        # ``execution_date`` is no longer part of the context on Airflow 3;
        # prefer ``logical_date`` and fall back for older versions.
        execution_date = context.get('logical_date') or context.get('execution_date')
        state = task_instance.state

        # Create email message
        msg = MIMEMultipart()
        msg['From'] = self.username
        msg['To'] = self.recipient
        msg['Subject'] = f"Airflow Task {state}: {dag_id}.{task_id}"
        body = (
            "\n"
            f"DAG: {dag_id}\n"
            f"Task: {task_id}\n"
            f"Execution Date: {execution_date}\n"
            f"State: {state}\n"
            f"Log URL: {task_instance.log_url}\n"
        )
        msg.attach(MIMEText(body, 'plain'))

        # Send email
        try:
            # The context manager closes the connection even when STARTTLS,
            # login or delivery fails, so no socket is leaked on error.
            with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
                server.starttls()
                server.login(self.username, self.password)
                server.sendmail(self.username, self.recipient, msg.as_string())
            print(f"Email notification sent for {dag_id}.{task_id}")
        except Exception as e:
            # A failed notification must not fail the task callback itself.
            print(f"Failed to send email notification: {e}")
# Custom Slack notifier
class SlackNotifier(BaseNotifier):
    """Custom Slack notifier that works across Airflow versions.

    Posts a single-attachment message describing a task instance to a Slack
    incoming webhook.

    Args:
        webhook_url: Slack incoming-webhook URL to post to.
        channel: Channel named in the payload.
        timeout: Seconds to wait for Slack before giving up.
    """

    def __init__(self, webhook_url: str, channel: str = "#airflow", timeout: float = 10.0):
        super().__init__()
        self.webhook_url = webhook_url
        self.channel = channel
        self.timeout = timeout

    def notify(self, context):
        """Send Slack notification.

        Delivery is best-effort: request errors (including non-2xx responses)
        are reported on stdout and not re-raised.

        Args:
            context: Airflow callback context; must provide ``dag`` and
                ``task_instance``.
        """
        import requests

        # Extract information from context
        task_instance = context['task_instance']
        dag_id = context['dag'].dag_id
        task_id = task_instance.task_id
        # ``execution_date`` is no longer part of the context on Airflow 3;
        # prefer ``logical_date`` and fall back for older versions.
        execution_date = context.get('logical_date') or context.get('execution_date')
        state = task_instance.state

        # Prepare Slack message
        color = "good" if state == "success" else "danger"
        attachment = {
            "color": color,
            "title": f"Task {state.upper()}: {dag_id}.{task_id}",
            "fields": [
                {"title": "DAG", "value": dag_id, "short": True},
                {"title": "Task", "value": task_id, "short": True},
                {"title": "Execution Date", "value": str(execution_date), "short": True},
                {"title": "State", "value": state.upper(), "short": True},
            ],
            "footer": "Airflow",
        }
        # A task instance that never started has no start_date; omit the
        # timestamp instead of crashing before anything is sent.
        started_at = task_instance.start_date
        if started_at is not None:
            attachment["ts"] = started_at.timestamp()

        payload = {
            "channel": self.channel,
            "username": "Airflow Bot",
            "icon_emoji": ":airplane:",
            "attachments": [attachment],
        }

        # Send to Slack
        try:
            # ``json=`` serialises the payload and sets the JSON content type;
            # the timeout keeps an unresponsive endpoint from blocking forever.
            response = requests.post(
                self.webhook_url,
                json=payload,
                timeout=self.timeout,
            )
            response.raise_for_status()
            print(f"Slack notification sent for {dag_id}.{task_id}")
        except Exception as e:
            # A failed notification must not fail the task callback itself.
            print(f"Failed to send Slack notification: {e}")
# Use notifiers in DAGs
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# Configure notifiers
# NOTE(review): the SMTP password and webhook URL below are inline
# placeholders; presumably a real deployment loads them from a secrets
# store rather than from source — confirm before reuse.
email_notifier = EmailNotifier(
    username="airflow@company.com",
    password="secret_password",
    smtp_host="smtp.company.com",
    smtp_port=587,
)
slack_notifier = SlackNotifier(
    channel="#data-alerts",
    webhook_url="https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
)
def task_success_callback(context):
    """Success callback using notifiers.

    Forwards the callback context to the email notifier first and then to
    the Slack notifier.
    """
    for channel in (email_notifier, slack_notifier):
        channel.notify(context)
def task_failure_callback(context):
    """Failure callback using notifiers.

    Forwards the callback context to the email notifier first and then to
    the Slack notifier.
    """
    for channel in (email_notifier, slack_notifier):
        channel.notify(context)
# Create DAG with notifications
# ``schedule`` replaces ``schedule_interval``, which is deprecated since
# Airflow 2.4 and rejected by Airflow 3; using it keeps this example loadable
# on every version the compat provider targets.
dag = DAG(
    "example_with_notifications",
    start_date=datetime(2024, 1, 1),
    schedule=timedelta(days=1),
    catchup=False,
    # default_args apply the callbacks to every task added to this DAG.
    default_args={
        'on_success_callback': task_success_callback,
        'on_failure_callback': task_failure_callback,
    },
)
def my_task_function(work_seconds: float = 2):
    """Run the example task: announce, pause, and report completion.

    Args:
        work_seconds: How long to sleep to simulate work. Defaults to the
            two seconds the example has always used.

    Returns:
        The string ``"Task completed"``.
    """
    import time

    print("Executing important task...")
    # Simulate some work
    time.sleep(work_seconds)
    return "Task completed"
# The example DAG's only task; it picks up the callbacks from default_args.
task = PythonOperator(
    dag=dag,
    python_callable=my_task_function,
    task_id="important_task",
)
# Multi-channel notifier
class MultiChannelNotifier(BaseNotifier):
    """Fan one notification out to several wrapped notifiers."""

    def __init__(self, notifiers: list[BaseNotifier]):
        super().__init__()
        self.notifiers = notifiers

    def notify(self, context):
        """Call ``notify`` on each wrapped notifier, in order.

        An exception from one notifier is printed and does not stop the
        remaining notifiers from running.
        """
        for channel in self.notifiers:
            try:
                channel.notify(context)
            except Exception as exc:
                channel_name = type(channel).__name__
                print(f"Notifier {channel_name} failed: {exc}")
# Use multi-channel notifier: email is tried first, then Slack.
multi_notifier = MultiChannelNotifier([email_notifier, slack_notifier])
def multi_channel_callback(context):
    """Callback that forwards the context to the multi-channel notifier."""
    multi_notifier.notify(context)


# Install with Tessl CLI
# npx tessl i tessl/pypi-apache-airflow-providers-common-compat