Integration with Airflow's notification system for sending alerts and status updates to Slack channels and webhooks. These notifiers can be used with Airflow's built-in notification framework for task failures, successes, and custom events.
Sends notifications to Slack channels using the Slack Web API with support for rich formatting, attachments, and Block Kit integration.
class SlackNotifier(BaseNotifier):
"""
Slack notifier for sending alerts via Slack Web API.
Integrates with Airflow's notification system to send
formatted messages with rich content support.
"""
template_fields = ("text", "channel", "username", "attachments", "blocks")
def __init__(
self,
*,
slack_conn_id: str = SlackHook.default_conn_name,
text: str = "This is a default message",
channel: str = "#general",
username: str = "Airflow",
icon_url: str = ICON_URL,
attachments: Sequence = (),
blocks: Sequence = (),
base_url: str | None = None,
proxy: str | None = None,
timeout: int | None = None,
retry_handlers: list[RetryHandler] | None = None,
unfurl_links: bool = True,
unfurl_media: bool = True,
):
"""
Initialize Slack API notifier.
Args:
slack_conn_id: Airflow connection ID for Slack API
text: Default message text
channel: Target Slack channel
username: Bot username for messages
icon_url: Icon URL for bot avatar
attachments: Legacy message attachments
blocks: Block Kit blocks for rich formatting
base_url: Custom Slack API base URL
proxy: Proxy server URL
timeout: Request timeout in seconds
retry_handlers: Custom retry handlers
unfurl_links: Auto-unfurl links in messages
unfurl_media: Auto-unfurl media in messages
"""
@cached_property
def hook(self) -> SlackHook:
"""Returns SlackHook instance."""
def notify(self, context):
"""
Send notification message to Slack.
Args:
context: Airflow task context containing execution information
"""

from airflow.providers.slack.notifications.slack import SlackNotifier
# Basic notification setup
slack_notifier = SlackNotifier(
slack_conn_id='slack_api_default',
text='Task {{ task_instance.task_id }} completed',
channel='#airflow-alerts',
username='Airflow Notifier'
)
# Rich notification with blocks
failure_blocks = [
{
"type": "header",
"text": {
"type": "plain_text",
"text": "🔴 Task Failure Alert"
}
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": "*DAG:*\n{{ dag.dag_id }}"
},
{
"type": "mrkdwn",
"text": "*Task:*\n{{ task_instance.task_id }}"
},
{
"type": "mrkdwn",
"text": "*Execution Date:*\n{{ ds }}"
},
{
"type": "mrkdwn",
"text": "*Duration:*\n{{ task_instance.duration }} seconds"
}
]
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Error:*\n```{{ task_instance.exception }}```"
}
},
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {
"type": "plain_text",
"text": "View Logs"
},
"url": "{{ task_instance.log_url }}"
}
]
}
]
rich_failure_notifier = SlackNotifier(
text='Task failure - see details in blocks',
channel='#critical-alerts',
blocks=failure_blocks,
username='Critical Alert Bot'
)
# Success notification with attachments
success_attachments = [
{
"color": "good",
"title": "Task Completed Successfully",
"fields": [
{
"title": "DAG",
"value": "{{ dag.dag_id }}",
"short": True
},
{
"title": "Task",
"value": "{{ task_instance.task_id }}",
"short": True
},
{
"title": "Duration",
"value": "{{ task_instance.duration }}s",
"short": True
},
{
"title": "End Time",
"value": "{{ task_instance.end_date }}",
"short": True
}
],
"footer": "Airflow Notification",
"ts": "{{ task_instance.end_date.timestamp() | int }}"
}
]
success_notifier = SlackNotifier(
text='✅ Task completed successfully',
channel='#success-notifications',
attachments=success_attachments
)
# Using in DAG with notification framework
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
'example_with_notifications',
start_date=datetime(2023, 1, 1),
schedule_interval=None,
default_args={
'on_failure_callback': [rich_failure_notifier],
'on_success_callback': [success_notifier]
}
) as dag:
task = BashOperator(
task_id='example_task',
bash_command='echo "Hello World"'
)

Sends notifications using Slack Incoming Webhooks with support for both synchronous and asynchronous operations.
class SlackWebhookNotifier(BaseNotifier):
"""
Slack webhook notifier for sending alerts via Incoming Webhooks.
Simpler alternative to API notifier that uses webhook URLs
for message delivery with async support.
"""
template_fields = ("slack_webhook_conn_id", "text", "attachments", "blocks", "proxy", "timeout")
def __init__(
self,
*,
slack_webhook_conn_id: str = SlackWebhookHook.default_conn_name,
text: str,
blocks: list | None = None,
unfurl_links: bool | None = None,
unfurl_media: bool | None = None,
proxy: str | None = None,
timeout: int | None = None,
attachments: list | None = None,
retry_handlers: list[RetryHandler] | None = None,
**kwargs,
):
"""
Initialize Slack webhook notifier.
Args:
slack_webhook_conn_id: Airflow connection ID for webhook
text: Message text content
blocks: Block Kit blocks for rich formatting
unfurl_links: Auto-unfurl links in messages
unfurl_media: Auto-unfurl media in messages
proxy: Proxy server URL
timeout: Request timeout in seconds
attachments: Legacy message attachments
retry_handlers: Custom retry handlers
**kwargs: Additional webhook parameters
"""
@cached_property
def hook(self) -> SlackWebhookHook:
"""Returns SlackWebhookHook instance."""
def notify(self, context):
"""
Send notification message via webhook.
Args:
context: Airflow task context containing execution information
"""

from airflow.providers.slack.notifications.slack_webhook import SlackWebhookNotifier
# Simple webhook notification
webhook_notifier = SlackWebhookNotifier(
slack_webhook_conn_id='slack_webhook_default',
text='🚀 DAG {{ dag.dag_id }} started execution at {{ ts }}'
)
# Detailed failure notification
failure_webhook = SlackWebhookNotifier(
slack_webhook_conn_id='alerts_webhook',
text='❌ Task {{ task_instance.task_id }} failed in DAG {{ dag.dag_id }}\n'
'Execution Date: {{ ds }}\n'
'Log URL: {{ task_instance.log_url }}'
)
# Block Kit webhook notification
block_notification = SlackWebhookNotifier(
slack_webhook_conn_id='webhook_conn',
text='DAG execution update',
blocks=[
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "🎯 *DAG Execution Summary*\n\n"
"*DAG:* {{ dag.dag_id }}\n"
"*Run ID:* {{ run_id }}\n"
"*Status:* {% if task_instance.state == 'success' %}✅ Success{% else %}❌ Failed{% endif %}"
}
},
{
"type": "context",
"elements": [
{
"type": "mrkdwn",
"text": "Execution Date: {{ ds }} | Duration: {{ task_instance.duration }}s"
}
]
}
]
)
# High-volume webhook notification
volume_notifier = SlackWebhookNotifier(
slack_webhook_conn_id='high_volume_webhook',
text='Batch processing completed: {{ task_instance.task_id }}'
)
# Using in DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
def example_task():
return "Task completed"
with DAG(
'webhook_notification_example',
start_date=datetime(2023, 1, 1),
schedule_interval='@daily'
) as dag:
task = PythonOperator(
task_id='data_processing',
python_callable=example_task,
on_failure_callback=[failure_webhook],
on_success_callback=[webhook_notifier]
)

Convenient alias functions for quick notifier setup:
# Alias for SlackNotifier
send_slack_notification = SlackNotifier
# Alias for SlackWebhookNotifier
send_slack_webhook_notification = SlackWebhookNotifier

from airflow.providers.slack.notifications.slack import send_slack_notification
from airflow.providers.slack.notifications.slack_webhook import send_slack_webhook_notification
# Using aliases for cleaner code
api_alert = send_slack_notification(
text='Alert from {{ dag.dag_id }}',
channel='#alerts'
)
webhook_alert = send_slack_webhook_notification(
slack_webhook_conn_id='webhook_conn',
text='Webhook alert from {{ dag.dag_id }}'
)

from airflow.operators.bash import BashOperator
# Task with success/failure notifications
task_with_notifications = BashOperator(
task_id='monitored_task',
bash_command='echo "Processing data..."',
on_success_callback=[success_notifier],
on_failure_callback=[failure_notifier],
on_retry_callback=[retry_notifier]
)

# DAG with default notifications
with DAG(
'monitored_dag',
start_date=datetime(2023, 1, 1),
schedule_interval='@daily',
default_args={
'on_failure_callback': [dag_failure_notifier],
'on_success_callback': [dag_success_notifier]
}
) as dag:
# All tasks inherit default callbacks
pass

# Access full task context in notifications
context_notifier = SlackNotifier(
text="""
🔄 *Task Update*
• **DAG**: {{ dag.dag_id }}
• **Task**: {{ task_instance.task_id }}
• **State**: {{ task_instance.state }}
• **Try Number**: {{ task_instance.try_number }}
• **Max Tries**: {{ task_instance.max_tries }}
• **Start Date**: {{ task_instance.start_date }}
• **End Date**: {{ task_instance.end_date }}
• **Duration**: {{ task_instance.duration }} seconds
• **Log URL**: {{ task_instance.log_url }}
{% if task_instance.state == 'failed' %}
**Error**: {{ task_instance.exception }}
{% endif %}
""",
channel='#detailed-monitoring'
)

# Notification whose message content varies with task conditions
# (a message is always sent; the template only selects which text to render)
conditional_notifier = SlackNotifier(
text="""
{% if task_instance.duration > 3600 %}
⏰ Long-running task alert: {{ task_instance.task_id }} took {{ task_instance.duration }} seconds
{% elif task_instance.try_number > 1 %}
🔄 Task {{ task_instance.task_id }} succeeded after {{ task_instance.try_number }} attempts
{% else %}
✅ Task {{ task_instance.task_id }} completed normally
{% endif %}
""",
channel='#performance-alerts'
)

# Format complex data in notifications
data_summary_notifier = SlackNotifier(
text='Data processing summary available in thread',
blocks=[
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*Daily Data Summary - {datetime.now().strftime('%Y-%m-%d')}*"
}
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": "*Records Processed:*\n{{ ti.xcom_pull(key='records_processed') or 'N/A' }}"
},
{
"type": "mrkdwn",
"text": "*Processing Time:*\n{{ task_instance.duration }}s"
},
{
"type": "mrkdwn",
"text": "*Success Rate:*\n{{ ti.xcom_pull(key='success_rate') or 'N/A' }}%"
},
{
"type": "mrkdwn",
"text": "*Error Count:*\n{{ ti.xcom_pull(key='error_count') or '0' }}"
}
]
}
]
)

ICON_URL: str = "https://raw.githubusercontent.com/apache/airflow/main/airflow-core/src/airflow/ui/public/pin_100.png"

Default icon URL used by SlackNotifier for the bot avatar.
Notifiers include comprehensive error handling for: