or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

hooks.md, index.md, notifications.md, operators.md, transfers.md
tile.json

docs/notifications.md

Notifications

Integration with Airflow's notification system for sending alerts and status updates to Slack channels and webhooks. These notifiers can be used with Airflow's built-in notification framework for task failures, successes, and custom events.

Capabilities

Slack API Notifier

Sends notifications to Slack channels using the Slack Web API with support for rich formatting, attachments, and Block Kit integration.

class SlackNotifier(BaseNotifier):
    """
    Slack notifier for sending alerts via the Slack Web API.

    Integrates with Airflow's notification system to send
    formatted messages with rich content support (legacy
    attachments and Block Kit blocks).
    """

    # Constructor arguments that accept Jinja templates (the usage examples
    # template text, blocks and attachments). icon_url and the connection /
    # transport settings are not listed here.
    template_fields = ("text", "channel", "username", "attachments", "blocks")

    def __init__(
        self,
        *,
        slack_conn_id: str = SlackHook.default_conn_name,
        text: str = "This is a default message",
        channel: str = "#general",
        username: str = "Airflow",
        icon_url: str = ICON_URL,
        attachments: Sequence = (),
        blocks: Sequence = (),
        base_url: str | None = None,
        proxy: str | None = None,
        timeout: int | None = None,
        retry_handlers: list[RetryHandler] | None = None,
        unfurl_links: bool = True,
        unfurl_media: bool = True,
    ) -> None:
        """
        Initialize Slack API notifier.

        All arguments are keyword-only and every one has a default, so
        ``SlackNotifier()`` is a valid call.

        Args:
            slack_conn_id: Airflow connection ID for Slack API
                (defaults to ``SlackHook.default_conn_name``)
            text: Message text (defaults to "This is a default message")
            channel: Target Slack channel (defaults to "#general")
            username: Bot username for messages (defaults to "Airflow")
            icon_url: Icon URL for bot avatar (defaults to ``ICON_URL``)
            attachments: Legacy message attachments (empty by default)
            blocks: Block Kit blocks for rich formatting (empty by default)
            base_url: Custom Slack API base URL
            proxy: Proxy server URL
            timeout: Request timeout in seconds
            retry_handlers: Custom retry handlers
            unfurl_links: Auto-unfurl links in messages (on by default)
            unfurl_media: Auto-unfurl media in messages (on by default)
        """

    @cached_property
    def hook(self) -> SlackHook:
        """Return the SlackHook instance used by this notifier (cached property)."""

    def notify(self, context) -> None:
        """
        Send notification message to Slack.

        Args:
            context: Airflow task context containing execution information
        """

Usage Examples

from airflow.providers.slack.notifications.slack import SlackNotifier

# Minimal setup: one templated line of text posted to a single channel
slack_notifier = SlackNotifier(
    channel="#airflow-alerts",
    username="Airflow Notifier",
    text="Task {{ task_instance.task_id }} completed",
    slack_conn_id="slack_api_default",
)

# Rich failure notification built from Block Kit blocks.
# Every "text" value below is a Jinja template rendered against the task
# context when the callback fires.
failure_blocks = [
    {
        "type": "header",
        "text": {
            "type": "plain_text",
            "text": "🔴 Task Failure Alert"
        }
    },
    {
        "type": "section",
        "fields": [
            {
                "type": "mrkdwn",
                "text": "*DAG:*\n{{ dag.dag_id }}"
            },
            {
                "type": "mrkdwn",
                "text": "*Task:*\n{{ task_instance.task_id }}"
            },
            {
                "type": "mrkdwn",
                "text": "*Execution Date:*\n{{ ds }}"
            },
            {
                "type": "mrkdwn",
                "text": "*Duration:*\n{{ task_instance.duration }} seconds"
            }
        ]
    },
    {
        "type": "section",
        "text": {
            "type": "mrkdwn",
            # The raised exception is exposed as the top-level context variable
            # `exception` in failure callbacks; TaskInstance has no `exception`
            # attribute, so `task_instance.exception` would render empty.
            "text": "*Error:*\n```{{ exception }}```"
        }
    },
    {
        "type": "actions",
        "elements": [
            {
                "type": "button",
                "text": {
                    "type": "plain_text",
                    "text": "View Logs"
                },
                "url": "{{ task_instance.log_url }}"
            }
        ]
    }
]

# `text` is still supplied alongside the blocks.
rich_failure_notifier = SlackNotifier(
    text='Task failure - see details in blocks',
    channel='#critical-alerts',
    blocks=failure_blocks,
    username='Critical Alert Bot'
)

# Success notification using a single legacy attachment with four short fields
success_attachments = [
    {
        "color": "good",
        "title": "Task Completed Successfully",
        "fields": [
            {"title": "DAG", "value": "{{ dag.dag_id }}", "short": True},
            {"title": "Task", "value": "{{ task_instance.task_id }}", "short": True},
            {"title": "Duration", "value": "{{ task_instance.duration }}s", "short": True},
            {"title": "End Time", "value": "{{ task_instance.end_date }}", "short": True},
        ],
        "footer": "Airflow Notification",
        "ts": "{{ task_instance.end_date.timestamp() | int }}",
    },
]

success_notifier = SlackNotifier(
    channel="#success-notifications",
    attachments=success_attachments,
    text="✅ Task completed successfully",
)

# Using in DAG with notification framework
from datetime import datetime  # required for start_date below

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    'example_with_notifications',
    start_date=datetime(2023, 1, 1),
    # `schedule` replaces the deprecated `schedule_interval` argument
    # (deprecated in Airflow 2.4, removed in Airflow 3).
    schedule=None,
    # Callbacks placed in default_args are applied to every task in the DAG.
    default_args={
        'on_failure_callback': [rich_failure_notifier],
        'on_success_callback': [success_notifier]
    }
) as dag:

    task = BashOperator(
        task_id='example_task',
        bash_command='echo "Hello World"'
    )

Slack Webhook Notifier

Sends notifications using Slack Incoming Webhooks with support for both synchronous and asynchronous operations.

class SlackWebhookNotifier(BaseNotifier):
    """
    Slack webhook notifier for sending alerts via Incoming Webhooks.

    Simpler alternative to the API notifier that uses webhook URLs
    for message delivery with async support.
    """

    # Constructor arguments that accept Jinja templates. Unlike SlackNotifier,
    # the connection ID, proxy and timeout are templated here as well.
    template_fields = ("slack_webhook_conn_id", "text", "attachments", "blocks", "proxy", "timeout")

    def __init__(
        self,
        *,
        slack_webhook_conn_id: str = SlackWebhookHook.default_conn_name,
        text: str,
        blocks: list | None = None,
        unfurl_links: bool | None = None,
        unfurl_media: bool | None = None,
        proxy: str | None = None,
        timeout: int | None = None,
        attachments: list | None = None,
        retry_handlers: list[RetryHandler] | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize Slack webhook notifier.

        All arguments are keyword-only. ``text`` is the only one without a
        default and must always be supplied.

        Args:
            slack_webhook_conn_id: Airflow connection ID for webhook
                (defaults to ``SlackWebhookHook.default_conn_name``)
            text: Message text content (required)
            blocks: Block Kit blocks for rich formatting
            unfurl_links: Auto-unfurl links in messages (unset by default)
            unfurl_media: Auto-unfurl media in messages (unset by default)
            proxy: Proxy server URL
            timeout: Request timeout in seconds
            attachments: Legacy message attachments
            retry_handlers: Custom retry handlers
            **kwargs: Additional webhook parameters
        """

    @cached_property
    def hook(self) -> SlackWebhookHook:
        """Return the SlackWebhookHook instance used by this notifier (cached property)."""

    def notify(self, context) -> None:
        """
        Send notification message via webhook.

        Args:
            context: Airflow task context containing execution information
        """

Usage Examples

from airflow.providers.slack.notifications.slack_webhook import SlackWebhookNotifier

# Simple webhook notification
webhook_notifier = SlackWebhookNotifier(
    text="🚀 DAG {{ dag.dag_id }} started execution at {{ ts }}",
    slack_webhook_conn_id="slack_webhook_default",
)

# Detailed failure notification (adjacent literals form one multi-line message)
failure_webhook = SlackWebhookNotifier(
    text=(
        "❌ Task {{ task_instance.task_id }} failed in DAG {{ dag.dag_id }}\n"
        "Execution Date: {{ ds }}\n"
        "Log URL: {{ task_instance.log_url }}"
    ),
    slack_webhook_conn_id="alerts_webhook",
)

# Block Kit webhook notification: a summary section followed by a context line
block_notification = SlackWebhookNotifier(
    blocks=[
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": (
                    "🎯 *DAG Execution Summary*\n\n"
                    "*DAG:* {{ dag.dag_id }}\n"
                    "*Run ID:* {{ run_id }}\n"
                    "*Status:* {% if task_instance.state == 'success' %}✅ Success{% else %}❌ Failed{% endif %}"
                ),
            },
        },
        {
            "type": "context",
            "elements": [
                {
                    "type": "mrkdwn",
                    "text": "Execution Date: {{ ds }} | Duration: {{ task_instance.duration }}s",
                },
            ],
        },
    ],
    text="DAG execution update",
    slack_webhook_conn_id="webhook_conn",
)

# High-volume webhook notification
volume_notifier = SlackWebhookNotifier(
    text="Batch processing completed: {{ task_instance.task_id }}",
    slack_webhook_conn_id="high_volume_webhook",
)

# Using in DAG
from datetime import datetime  # required for start_date below

from airflow import DAG
from airflow.operators.python import PythonOperator


def example_task():
    """Placeholder callable for the example; returns a fixed status string."""
    return "Task completed"


with DAG(
    'webhook_notification_example',
    start_date=datetime(2023, 1, 1),
    # `schedule` replaces the deprecated `schedule_interval` argument
    # (deprecated in Airflow 2.4, removed in Airflow 3).
    schedule='@daily'
) as dag:

    # Callbacks are attached to this task only, not to the whole DAG.
    task = PythonOperator(
        task_id='data_processing',
        python_callable=example_task,
        on_failure_callback=[failure_webhook],
        on_success_callback=[webhook_notifier]
    )

Notification Aliases

Convenient aliases for the notifier classes, for quick notifier setup:

# Alias for SlackNotifier: the same class under a function-style name,
# so it accepts exactly the same keyword arguments.
send_slack_notification = SlackNotifier

# Alias for SlackWebhookNotifier: likewise the same class, not a wrapper function.
send_slack_webhook_notification = SlackWebhookNotifier

Usage Examples

from airflow.providers.slack.notifications.slack import send_slack_notification
from airflow.providers.slack.notifications.slack_webhook import send_slack_webhook_notification

# The aliases are called exactly like the classes they point to
api_alert = send_slack_notification(
    channel="#alerts",
    text="Alert from {{ dag.dag_id }}",
)

webhook_alert = send_slack_webhook_notification(
    text="Webhook alert from {{ dag.dag_id }}",
    slack_webhook_conn_id="webhook_conn",
)

Integration with Airflow Callbacks

Task-Level Callbacks

from airflow.operators.bash import BashOperator

# Task with success/failure/retry notifications.
# NOTE: failure_notifier and retry_notifier are placeholders that are not
# defined in the preceding examples; substitute your own SlackNotifier or
# SlackWebhookNotifier instances.
task_with_notifications = BashOperator(
    task_id='monitored_task',
    bash_command='echo "Processing data..."',
    on_success_callback=[success_notifier],
    on_failure_callback=[failure_notifier],
    on_retry_callback=[retry_notifier]
)

DAG-Level Callbacks

# DAG with default notifications
from datetime import datetime  # required for start_date below

from airflow import DAG

# NOTE: dag_failure_notifier and dag_success_notifier are placeholders for
# notifier instances you define yourself.
with DAG(
    'monitored_dag',
    start_date=datetime(2023, 1, 1),
    # `schedule` replaces the deprecated `schedule_interval` argument
    # (deprecated in Airflow 2.4, removed in Airflow 3).
    schedule='@daily',
    default_args={
        'on_failure_callback': [dag_failure_notifier],
        'on_success_callback': [dag_success_notifier]
    }
) as dag:
    # All tasks inherit default callbacks
    pass

Custom Context Usage

# Access full task context in notifications.
# Slack mrkdwn marks bold with single asterisks (*bold*); Markdown-style
# double asterisks are not rendered as bold.
context_notifier = SlackNotifier(
    text="""
🔄 *Task Update*

• *DAG*: {{ dag.dag_id }}
• *Task*: {{ task_instance.task_id }}
• *State*: {{ task_instance.state }}
• *Try Number*: {{ task_instance.try_number }}
• *Max Tries*: {{ task_instance.max_tries }}
• *Start Date*: {{ task_instance.start_date }}
• *End Date*: {{ task_instance.end_date }}
• *Duration*: {{ task_instance.duration }} seconds
• *Log URL*: {{ task_instance.log_url }}

{% if task_instance.state == 'failed' %}
*Error*: {{ exception }}
{% endif %}
    """,
    channel='#detailed-monitoring'
)
# The raised exception is the top-level context variable `exception` in
# failure callbacks; TaskInstance has no `exception` attribute.

Advanced Features

Conditional Notifications

# Notification whose wording depends on run conditions.
# The message is always sent (the template has an `else` branch); only the
# text changes: long-running (> 3600 s), succeeded after retries, or normal.
conditional_notifier = SlackNotifier(
    text="""
{% if task_instance.duration > 3600 %}
⏰ Long-running task alert: {{ task_instance.task_id }} took {{ task_instance.duration }} seconds
{% elif task_instance.try_number > 1 %}
🔄 Task {{ task_instance.task_id }} succeeded after {{ task_instance.try_number }} attempts
{% else %}
✅ Task {{ task_instance.task_id }} completed normally
{% endif %}
    """,
    channel='#performance-alerts'
)

Rich Data Formatting

# Format complex data in notifications.
# Values are pulled from XCom at render time, with a fallback shown when a key is absent.
data_summary_notifier = SlackNotifier(
    text='Data processing summary available in thread',
    blocks=[
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                # Use the templated run date rather than an f-string with
                # datetime.now(): the f-string would be evaluated once, when
                # the DAG file is parsed, not when the notification is sent.
                "text": "*Daily Data Summary - {{ ds }}*"
            }
        },
        {
            "type": "section",
            "fields": [
                {
                    "type": "mrkdwn",
                    "text": "*Records Processed:*\n{{ ti.xcom_pull(key='records_processed') or 'N/A' }}"
                },
                {
                    "type": "mrkdwn",
                    "text": "*Processing Time:*\n{{ task_instance.duration }}s"
                },
                {
                    "type": "mrkdwn",
                    "text": "*Success Rate:*\n{{ ti.xcom_pull(key='success_rate') or 'N/A' }}%"
                },
                {
                    "type": "mrkdwn",
                    "text": "*Error Count:*\n{{ ti.xcom_pull(key='error_count') or '0' }}"
                }
            ]
        }
    ]
)

Constants

# Default value of SlackNotifier's icon_url argument (the bot avatar image).
ICON_URL: str = "https://raw.githubusercontent.com/apache/airflow/main/airflow-core/src/airflow/ui/public/pin_100.png"

Default icon URL used by SlackNotifier for bot avatar.

Error Handling

Notifiers include comprehensive error handling for:

  • Connection failures: Automatic retry with configurable handlers
  • Invalid webhook URLs: Clear error messages for malformed URLs
  • Rate limiting: Built-in rate limit handling from slack_sdk
  • Network timeouts: Configurable timeout settings
  • Message formatting errors: Validation of Block Kit and attachment formats
  • Template rendering errors: Proper error reporting for template failures