Task execution components for posting messages, uploading files, and sending webhook notifications within Airflow workflows. All operators support Airflow's templating system and integrate with the task execution framework.
Posts messages to Slack channels using the Slack Web API with support for rich formatting, attachments, and Block Kit.
class SlackAPIPostOperator(SlackAPIOperator):
"""
Posts messages to Slack channel using Slack Web API.
Supports rich text formatting, attachments, Block Kit blocks,
and custom message styling.
"""
template_fields = ("username", "text", "attachments", "blocks", "channel")
ui_color = "#FFBA40"
def __init__(
self,
channel: str = "#general",
username: str = "Airflow",
text: str = (
"No message has been set.\n"
"Here is a cat video instead\n"
"https://www.youtube.com/watch?v=J---aiyznGQ"
),
icon_url: str = "https://raw.githubusercontent.com/apache/airflow/main/airflow-core/src/airflow/ui/public/pin_100.png",
blocks: list | None = None,
attachments: list | None = None,
slack_conn_id: str = SlackHook.default_conn_name,
method: str = "chat.postMessage",
api_params: dict | None = None,
base_url: str | None = None,
proxy: str | None = None,
timeout: int | None = None,
retry_handlers: list[RetryHandler] | None = None,
**kwargs,
) -> None:
"""
Initialize SlackAPIPostOperator.
Args:
channel: Slack channel name or ID
username: Bot username for the message
text: Message text content
icon_url: URL for bot icon
blocks: Block Kit blocks for rich formatting
attachments: Legacy message attachments
slack_conn_id: Airflow connection ID
method: Slack API method (default: chat.postMessage)
api_params: Additional API parameters
base_url: Custom Slack API base URL
proxy: Proxy server URL
timeout: Request timeout in seconds
retry_handlers: Custom retry handlers
**kwargs: Additional operator arguments
"""
def construct_api_call_params(self) -> Any:
"""Construct parameters for chat.postMessage API call."""

# Simple text message
send_message = SlackAPIPostOperator(
task_id='send_alert',
slack_conn_id='slack_api_default',
channel='#alerts',
text='Task {{ task_instance.task_id }} completed at {{ ts }}',
username='Airflow Alert Bot'
)
# Message with Block Kit formatting
blocks = [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Task Status Update*\n:white_check_mark: Task completed successfully"
}
},
{
"type": "context",
"elements": [
{
"type": "mrkdwn",
"text": "DAG: {{ dag.dag_id }} | Task: {{ task_instance.task_id }}"
}
]
}
]
send_rich_message = SlackAPIPostOperator(
task_id='send_rich_alert',
channel='#general',
blocks=blocks,
text='Fallback text for notifications'
)
# Using legacy attachments
attachments = [
{
"color": "good",
"title": "Process Completed",
"text": "All data processing tasks finished successfully",
"fields": [
{"title": "Duration", "value": "45 minutes", "short": True},
{"title": "Records", "value": "10,543", "short": True}
]
}
]
send_attachment = SlackAPIPostOperator(
task_id='send_report',
channel='#reports',
text='Daily processing report',
attachments=attachments
)

Uploads files to Slack channels with support for multiple file formats, custom metadata, and both file paths and content strings.
class SlackAPIFileOperator(SlackAPIOperator):
"""
Uploads files to Slack channels using Slack Web API.
Supports file uploads from local paths or string content,
with customizable metadata and multiple channel targeting.
"""
template_fields = ("channels", "initial_comment", "filename", "filetype", "content", "title", "snippet_type")
ui_color = "#44BEDF"
def __init__(
self,
channels: str | Sequence[str] | None = None,
initial_comment: str | None = None,
filename: str | None = None,
filetype: str | None = None,
content: str | None = None,
title: str | None = None,
method_version: Literal["v1", "v2"] | None = None,
snippet_type: str | None = None,
slack_conn_id: str = SlackHook.default_conn_name,
method: str = "files.upload",
api_params: dict | None = None,
base_url: str | None = None,
proxy: str | None = None,
timeout: int | None = None,
retry_handlers: list[RetryHandler] | None = None,
**kwargs,
) -> None:
"""
Initialize SlackAPIFileOperator.
Args:
channels: Channel names or IDs for file upload
initial_comment: Comment to add with the file
filename: Local file path to upload
filetype: File type hint for Slack
content: File content as string (alternative to filename)
title: Display title for the file
method_version: API version ("v1" or "v2")
snippet_type: Syntax highlighting type for code files
slack_conn_id: Airflow connection ID
method: Slack API method (files.upload or files.upload_v2)
api_params: Additional API parameters
base_url: Custom Slack API base URL
proxy: Proxy server URL
timeout: Request timeout in seconds
retry_handlers: Custom retry handlers
**kwargs: Additional operator arguments
"""
def execute(self, context: Context):
"""Execute the file upload operation."""

# Upload file from local path
upload_report = SlackAPIFileOperator(
task_id='upload_daily_report',
slack_conn_id='slack_api_default',
channels=['#reports', '#management'],
filename='/tmp/daily_report_{{ ds }}.csv',
title='Daily Sales Report - {{ ds }}',
initial_comment='Here is the daily sales report for {{ ds }}',
filetype='csv'
)
# Upload content as file
upload_log = SlackAPIFileOperator(
task_id='upload_error_log',
channels='#alerts',
content='{{ task_instance.log }}',
filename='error_log_{{ ts_nodash }}.txt',
title='Task Error Log',
initial_comment='Error log for failed task {{ task_instance.task_id }}',
snippet_type='text'
)
# Upload code snippet with syntax highlighting
upload_code = SlackAPIFileOperator(
task_id='share_sql_query',
channels='#data-team',
content="""
SELECT
date_trunc('day', created_at) as date,
count(*) as daily_count
FROM user_events
WHERE created_at >= '{{ ds }}'
GROUP BY 1
ORDER BY 1;
""",
filename='daily_query.sql',
title='Daily User Events Query',
snippet_type='sql',
method_version='v2'
)

Posts messages using Slack Incoming Webhooks, providing a simpler integration method without requiring a full Slack app.
class SlackWebhookOperator(BaseOperator):
"""
Posts messages to Slack using Incoming Webhooks.
Simpler alternative to API operators that doesn't require
a full Slack app, just webhook configuration.
"""
template_fields = ("message", "attachments", "blocks", "channel", "username", "proxy")
def __init__(
self,
*,
slack_webhook_conn_id,
message: str = "",
attachments: list | None = None,
blocks: list | None = None,
channel: str | None = None,
username: str | None = None,
icon_emoji: str | None = None,
icon_url: str | None = None,
proxy: str | None = None,
timeout: int | None = None,
retry_handlers: list[RetryHandler] | None = None,
**kwargs,
) -> None:
"""
Initialize SlackWebhookOperator.
Args:
slack_webhook_conn_id: Airflow connection ID for webhook
message: Message text content
attachments: Legacy message attachments
blocks: Block Kit blocks for rich formatting
channel: Override default webhook channel
username: Override default webhook username
icon_emoji: Custom emoji icon (e.g., ':robot_face:')
icon_url: Custom icon URL
proxy: Proxy server URL
timeout: Request timeout in seconds
retry_handlers: Custom retry handlers
**kwargs: Additional operator arguments
"""
@cached_property
def hook(self) -> SlackWebhookHook:
"""Returns SlackWebhookHook instance."""
def execute(self, context: Context) -> None:
"""Execute the webhook operation."""

# Simple webhook message
send_webhook = SlackWebhookOperator(
task_id='notify_completion',
slack_webhook_conn_id='slack_webhook_default',
message='Workflow {{ dag.dag_id }} completed successfully! ✅',
username='Airflow Webhook Bot',
icon_emoji=':robot_face:'
)
# Webhook with custom channel override
send_alert = SlackWebhookOperator(
task_id='send_failure_alert',
slack_webhook_conn_id='slack_webhook_default',
message='❌ Task {{ task_instance.task_id }} failed in DAG {{ dag.dag_id }}',
channel='#critical-alerts',
username='Alert Bot'
)
# Webhook with Block Kit blocks
failure_blocks = [
{
"type": "header",
"text": {
"type": "plain_text",
"text": "🚨 Task Failure Alert"
}
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": "*DAG:*\n{{ dag.dag_id }}"
},
{
"type": "mrkdwn",
"text": "*Task:*\n{{ task_instance.task_id }}"
},
{
"type": "mrkdwn",
"text": "*Execution Date:*\n{{ ds }}"
},
{
"type": "mrkdwn",
"text": "*Log URL:*\n<{{ task_instance.log_url }}|View Logs>"
}
]
}
]
send_failure_details = SlackWebhookOperator(
task_id='detailed_failure_alert',
slack_webhook_conn_id='slack_webhook_default',
blocks=failure_blocks,
message='Task failure - see details above'
)

Abstract base class for Slack API operators providing common functionality and interface.
class SlackAPIOperator(BaseOperator, ABC):
"""
Abstract base class for Slack API operators.
Provides common functionality for API authentication,
connection management, and error handling.
"""
def __init__(
self,
*,
slack_conn_id: str = SlackHook.default_conn_name,
method: str,
api_params: dict | None = None,
base_url: str | None = None,
proxy: str | None = None,
timeout: int | None = None,
retry_handlers: list[RetryHandler] | None = None,
**kwargs,
) -> None:
"""
Initialize base Slack API operator.
Args:
slack_conn_id: Airflow connection ID
method: Slack API method name
api_params: Additional API parameters
base_url: Custom Slack API base URL
proxy: Proxy server URL
timeout: Request timeout in seconds
retry_handlers: Custom retry handlers
**kwargs: Additional operator arguments
"""
@cached_property
def hook(self) -> SlackHook:
"""Returns SlackHook instance."""
@abstractmethod
def construct_api_call_params(self) -> Any:
"""Construct API call parameters (must be implemented by subclasses)."""
def execute(self, context: Context):
"""Execute the Slack API operation."""

All operators support Airflow templating for dynamic content:
SlackAPIPostOperator template fields:
- username: Bot username
- text: Message text
- attachments: Message attachments
- blocks: Block Kit blocks
- channel: Target channel

SlackAPIFileOperator template fields:
- channels: Target channels
- initial_comment: File comment
- filename: File path
- filetype: File type
- content: File content
- title: File title
- snippet_type: Syntax highlighting

SlackWebhookOperator template fields:
- message: Message text
- attachments: Message attachments
- blocks: Block Kit blocks
- channel: Target channel
- username: Bot username
- proxy: Proxy URL

All operators include comprehensive error handling: