Slack services integration for Apache Airflow including Slack API and Slack Incoming Webhook support
npx @tessl/cli install tessl/pypi-apache-airflow-providers-slack@9.2.0A comprehensive Slack integration library for Apache Airflow that enables seamless communication and notification through Slack services. This provider supports both Slack Web API and Incoming Webhooks, offering hooks for connections, operators for task execution, transfer operators for data movement, and notification classes for workflow status updates.
pip install apache-airflow-providers-slackfrom airflow.providers.slack.hooks.slack import SlackHook
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from airflow.providers.slack.operators.slack import SlackAPIPostOperator, SlackAPIFileOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.providers.slack.notifications.slack import SlackNotifier
from airflow.providers.slack.notifications.slack_webhook import SlackWebhookNotifierfrom airflow import DAG
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
from datetime import datetime
with DAG(
'slack_example',
start_date=datetime(2023, 1, 1),
schedule_interval=None,
) as dag:
send_slack_message = SlackAPIPostOperator(
task_id='send_message',
slack_conn_id='slack_api_default',
channel='#general',
text='Hello from Airflow! 🚀',
username='Airflow Bot',
)from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
send_webhook_message = SlackWebhookOperator(
task_id='send_webhook',
slack_webhook_conn_id='slack_webhook_default',
message='Workflow completed successfully!',
channel='#alerts',
)from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackApiFileOperator
sql_to_slack = SqlToSlackApiFileOperator(
task_id='send_report',
sql="SELECT * FROM sales WHERE date = '{{ ds }}'",
sql_conn_id='postgres_default',
slack_conn_id='slack_api_default',
slack_filename='daily_sales_report.csv',
slack_channels=['#reports'],
slack_initial_comment='Daily sales report attached 📊',
)The Apache Airflow Providers Slack package follows Airflow's extensible architecture with four main component types:
All components support Airflow's templating system, connection management, and error handling patterns.
Core connection and communication interfaces for Slack services, providing authenticated access to Slack Web API and Incoming Webhooks with comprehensive method coverage.
class SlackHook(BaseHook):
def __init__(
self,
*,
slack_conn_id: str = "slack_api_default",
base_url: str | None = None,
timeout: int | None = None,
proxy: str | None = None,
retry_handlers: list[RetryHandler] | None = None,
**extra_client_args: Any,
) -> None: ...
def call(self, api_method: str, **kwargs) -> SlackResponse: ...
def send_file_v2(
self,
*,
channel_id: str | None = None,
file_uploads: FileUploadTypeDef | list[FileUploadTypeDef],
initial_comment: str | None = None
) -> SlackResponse: ...
def get_channel_id(self, channel_name: str) -> str: ...
class SlackWebhookHook(BaseHook):
def __init__(
self,
*,
slack_webhook_conn_id: str,
timeout: int | None = None,
proxy: str | None = None,
retry_handlers: list[RetryHandler] | None = None,
**extra_client_args: Any,
): ...
def send(
self,
*,
text: str | None = None,
blocks: list[dict[str, Any]] | None = None,
attachments: list[dict[str, Any]] | None = None,
**kwargs
): ...Task execution components for posting messages, uploading files, and sending webhook notifications within Airflow workflows.
class SlackAPIPostOperator(SlackAPIOperator):
def __init__(
self,
channel: str = "#general",
username: str = "Airflow",
text: str = "No message has been set...",
icon_url: str = "https://raw.githubusercontent.com/apache/airflow/main/airflow-core/src/airflow/ui/public/pin_100.png",
blocks: list | None = None,
attachments: list | None = None,
**kwargs,
) -> None: ...
class SlackAPIFileOperator(SlackAPIOperator):
def __init__(
self,
channels: str | Sequence[str] | None = None,
initial_comment: str | None = None,
filename: str | None = None,
content: str | None = None,
title: str | None = None,
method_version: Literal["v1", "v2"] | None = None,
**kwargs,
) -> None: ...
class SlackWebhookOperator(BaseOperator):
def __init__(
self,
*,
slack_webhook_conn_id,
message: str = "",
blocks: list | None = None,
channel: str | None = None,
username: str | None = None,
**kwargs,
) -> None: ...Data transfer operators for executing SQL queries and sending results to Slack channels in various formats (CSV, JSON, HTML).
class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
def __init__(
self,
*,
sql: str,
sql_conn_id: str,
slack_conn_id: str = SlackHook.default_conn_name,
slack_filename: str,
slack_channels: str | Sequence[str] | None = None,
slack_initial_comment: str | None = None,
slack_method_version: Literal["v1", "v2"] | None = None,
df_kwargs: dict | None = None,
action_on_empty_df: Literal["send", "skip", "error"] = "send",
**kwargs,
): ...
class SqlToSlackWebhookOperator(BaseSqlToSlackOperator):
def __init__(
self,
*,
sql: str,
sql_conn_id: str,
slack_webhook_conn_id: str | None = None,
slack_channel: str | None = None,
slack_message: str,
results_df_name: str = "results_df",
**kwargs,
) -> None: ...Integration with Airflow's notification system for sending alerts and status updates to Slack channels and webhooks.
class SlackNotifier(BaseNotifier):
def __init__(
self,
*,
slack_conn_id: str = SlackHook.default_conn_name,
text: str = "This is a default message",
channel: str = "#general",
username: str = "Airflow",
attachments: Sequence = (),
blocks: Sequence = (),
**kwargs,
): ...
def notify(self, context): ...
class SlackWebhookNotifier(BaseNotifier):
def __init__(
self,
*,
slack_webhook_conn_id: str = SlackWebhookHook.default_conn_name,
text: str,
blocks: list | None = None,
attachments: list | None = None,
**kwargs,
): ...
def notify(self, context): ...
async def async_notify(self, context): ...# Connection ID: slack_api_default
# Connection Type: slack
# Host: (leave empty)
# Extra: {"token": "xoxb-your-bot-token"}# Connection ID: slack_webhook_default
# Connection Type: slackwebhook
# Host: hooks.slack.com
# Password: /services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXXfrom typing import TypedDict, Optional
class FileUploadTypeDef(TypedDict, total=False):
file: Optional[str] # Path to file
content: Optional[str] # File contents as string
filename: Optional[str] # Display filename
title: Optional[str] # File title
alt_txt: Optional[str] # Alt text for images
snippet_type: Optional[str] # Syntax highlighting type
# Slack response types (from slack_sdk)
SlackResponse = dict[str, Any]