CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-apache-airflow-providers-common-compat

Common Compatibility Provider - providing compatibility code for previous Airflow versions

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/notifier-compatibility.md

Notifier Compatibility

Version-compatible base notifier class for creating custom notification handlers that work across different Airflow versions. This module provides a consistent interface for building notification systems regardless of the underlying Airflow version.

Capabilities

Base Notifier Class

Version-compatible base class for implementing custom notifiers.

class BaseNotifier:
    """
    Version-compatible base notifier class.

    This name is an alias that resolves to whichever ``BaseNotifier`` the
    installed Airflow release provides:

    * ``airflow.sdk.bases.notifier.BaseNotifier`` in Airflow 3.0+
    * ``airflow.notifications.basenotifier.BaseNotifier`` in Airflow < 3.0

    NOTE(review): the exact version at which the import switches is decided
    by the provider package; confirm it against the installed release.

    Use this class as the base for implementing custom notification handlers
    that need to work across different Airflow versions. Subclasses override
    ``notify(context)`` to deliver the notification.
    """

Usage Examples

from airflow.providers.common.compat.notifier import BaseNotifier
from airflow.configuration import conf
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

# Custom email notifier
class EmailNotifier(BaseNotifier):
    """Custom email notifier that works across Airflow versions.

    Sends a plain-text summary of the task instance found in the callback
    context over SMTP, upgrading the connection with STARTTLS before login.

    Args:
        smtp_host: Hostname of the SMTP server.
        smtp_port: Port of the SMTP server.
        username: SMTP login name; also used as the ``From`` address.
        password: SMTP password.
        recipient: Address the notification is delivered to.
        timeout: Socket timeout in seconds for the SMTP session, so an
            unreachable mail server cannot block the callback forever.
    """

    def __init__(
        self,
        smtp_host: str,
        smtp_port: int,
        username: str,
        password: str,
        recipient: str = "admin@company.com",
        timeout: float = 30.0,
    ):
        super().__init__()
        self.smtp_host = smtp_host
        self.smtp_port = smtp_port
        self.username = username
        self.password = password
        self.recipient = recipient
        self.timeout = timeout

    def notify(self, context):
        """Send an email describing the task instance in ``context``.

        Delivery is best-effort: SMTP and network failures are reported on
        stdout and never propagate to the caller.
        """
        # Extract information from context
        task_instance = context['task_instance']
        dag_id = context['dag'].dag_id
        task_id = task_instance.task_id
        state = task_instance.state
        # Airflow 3 removed ``execution_date`` from the context; prefer
        # ``logical_date`` and only fall back to the legacy key when needed.
        execution_date = context.get('logical_date')
        if execution_date is None:
            execution_date = context.get('execution_date')

        # Create email message
        msg = MIMEMultipart()
        msg['From'] = self.username
        msg['To'] = self.recipient
        msg['Subject'] = f"Airflow Task {state}: {dag_id}.{task_id}"

        # Build the body line by line so the email does not carry the
        # source-code indentation of a triple-quoted literal.
        body = "\n".join([
            f"DAG: {dag_id}",
            f"Task: {task_id}",
            f"Execution Date: {execution_date}",
            f"State: {state}",
            "",
            f"Log URL: {task_instance.log_url}",
            "",
        ])
        msg.attach(MIMEText(body, 'plain'))
        text = msg.as_string()

        # Send email
        try:
            # The context manager issues QUIT and closes the socket even when
            # STARTTLS, login or delivery fails part-way through.
            with smtplib.SMTP(self.smtp_host, self.smtp_port, timeout=self.timeout) as server:
                server.starttls()
                server.login(self.username, self.password)
                server.sendmail(self.username, [self.recipient], text)
        except (smtplib.SMTPException, OSError) as e:
            print(f"Failed to send email notification: {e}")
        else:
            print(f"Email notification sent for {dag_id}.{task_id}")

# Custom Slack notifier
class SlackNotifier(BaseNotifier):
    """Custom Slack notifier that works across Airflow versions.

    Posts a single-attachment message describing the task instance to a
    Slack incoming webhook.

    Args:
        webhook_url: Slack incoming-webhook URL the payload is posted to.
        channel: Channel name placed in the payload.
        timeout: Seconds to wait for Slack before giving up; without it a
            stalled connection would block the callback indefinitely.
    """

    def __init__(self, webhook_url: str, channel: str = "#airflow", timeout: float = 10.0):
        super().__init__()
        self.webhook_url = webhook_url
        self.channel = channel
        self.timeout = timeout

    def notify(self, context):
        """Send a Slack message describing the task instance in ``context``.

        Delivery is best-effort: HTTP and network failures are reported on
        stdout and never propagate to the caller.
        """
        import requests

        # Extract information from context
        task_instance = context['task_instance']
        dag_id = context['dag'].dag_id
        task_id = task_instance.task_id
        state = task_instance.state
        # Airflow 3 removed ``execution_date`` from the context; prefer
        # ``logical_date`` and only fall back to the legacy key when needed.
        execution_date = context.get('logical_date')
        if execution_date is None:
            execution_date = context.get('execution_date')

        # Prepare Slack message
        color = "good" if state == "success" else "danger"

        attachment = {
            "color": color,
            "title": f"Task {state.upper()}: {dag_id}.{task_id}",
            "fields": [
                {"title": "DAG", "value": dag_id, "short": True},
                {"title": "Task", "value": task_id, "short": True},
                {"title": "Execution Date", "value": str(execution_date), "short": True},
                {"title": "State", "value": state.upper(), "short": True},
            ],
            "footer": "Airflow",
        }
        # Only add the timestamp when the task instance has a start date, so a
        # missing value does not abort the whole notification.
        start_date = task_instance.start_date
        if start_date is not None:
            attachment["ts"] = start_date.timestamp()

        payload = {
            "channel": self.channel,
            "username": "Airflow Bot",
            "icon_emoji": ":airplane:",
            "attachments": [attachment],
        }

        # Send to Slack
        try:
            # ``json=`` serialises the payload and sets the Content-Type header.
            response = requests.post(
                self.webhook_url,
                json=payload,
                timeout=self.timeout,
            )
            response.raise_for_status()
        except requests.RequestException as e:
            print(f"Failed to send Slack notification: {e}")
        else:
            print(f"Slack notification sent for {dag_id}.{task_id}")

# Use notifiers in DAGs
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Configure notifiers
# NOTE: the host, account and password below are illustrative placeholders.
# Real credentials should not be committed in DAG files; load them from a
# secrets store or the environment instead.
email_notifier = EmailNotifier(
    smtp_host="smtp.company.com",
    smtp_port=587,
    username="airflow@company.com",  # also used as the From address
    password="secret_password"
)

# The webhook URL is a placeholder; substitute the incoming-webhook URL
# issued for your own Slack workspace.
slack_notifier = SlackNotifier(
    webhook_url="https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
    channel="#data-alerts"
)

def task_success_callback(context):
    """Forward a successful task's context to the email and Slack notifiers."""
    # Email goes first, then Slack, matching the order they were configured in.
    for channel in (email_notifier, slack_notifier):
        channel.notify(context)

def task_failure_callback(context):
    """Forward a failed task's context to the email and Slack notifiers."""
    # Email goes first, then Slack, matching the order they were configured in.
    for channel in (email_notifier, slack_notifier):
        channel.notify(context)

# Create DAG with notifications
# ``schedule`` is used instead of ``schedule_interval``: the latter was
# deprecated in Airflow 2.4 and removed in Airflow 3, so only ``schedule``
# keeps this example working across both major versions.
dag = DAG(
    "example_with_notifications",
    start_date=datetime(2024, 1, 1),
    schedule=timedelta(days=1),
    catchup=False,
    # Callbacks listed in default_args are applied to every task in the DAG.
    default_args={
        'on_success_callback': task_success_callback,
        'on_failure_callback': task_failure_callback,
    },
)

def my_task_function():
    """Announce the task, pause for two seconds, and return a completion message."""
    from time import sleep

    print("Executing important task...")
    sleep(2)  # stand-in for real work
    outcome = "Task completed"
    return outcome

# Register the example callable as a task on the DAG created above. The
# success/failure callbacks come from that DAG's default_args, so none are
# passed here.
task = PythonOperator(
    task_id="important_task",
    python_callable=my_task_function,
    dag=dag
)

# Multi-channel notifier
class MultiChannelNotifier(BaseNotifier):
    """Composite notifier that relays one context to several notifiers.

    Each wrapped notifier is attempted in list order; a failure in one is
    printed and does not stop the remaining ones from running.
    """

    def __init__(self, notifiers: list[BaseNotifier]):
        super().__init__()
        self.notifiers = notifiers

    def notify(self, context):
        """Relay ``context`` to every wrapped notifier in turn."""
        for channel in self.notifiers:
            try:
                channel.notify(context)
            except Exception as exc:
                # Isolate the failure so later channels are still attempted.
                channel_name = type(channel).__name__
                print(f"Notifier {channel_name} failed: {exc}")

# Use multi-channel notifier
# Wrap the two notifiers configured earlier; they are invoked in list order
# (email first, then Slack).
multi_notifier = MultiChannelNotifier([
    email_notifier,
    slack_notifier
])

def multi_channel_callback(context):
    """Task callback that hands the context to the shared multi-channel notifier."""
    dispatch = multi_notifier.notify
    dispatch(context)

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-common-compat

docs

asset-management.md

index.md

lineage-entities.md

notifier-compatibility.md

openlineage-integration.md

provider-verification.md

security-permissions.md

standard-components.md

version-compatibility.md

tile.json