Core connection and communication interfaces for Slack services. Hooks provide authenticated access to Slack Web API and Incoming Webhooks with comprehensive method coverage, connection management, and error handling.
Provides authenticated access to Slack's Web API using the slack_sdk.WebClient, supporting all Slack API methods including messaging, file uploads, channel management, and user operations.
class SlackHook(BaseHook):
"""Hook for Slack Web API operations using slack_sdk.WebClient."""
conn_name_attr = "slack_conn_id"
default_conn_name = "slack_api_default"
conn_type = "slack"
hook_name = "Slack API"
def __init__(
self,
*,
slack_conn_id: str = "slack_api_default",
base_url: str | None = None,
timeout: int | None = None,
proxy: str | None = None,
retry_handlers: list[RetryHandler] | None = None,
**extra_client_args: Any,
) -> None:
"""
Initialize SlackHook.
Args:
slack_conn_id: Airflow connection ID for Slack API
base_url: Custom Slack API base URL
timeout: Request timeout in seconds
proxy: Proxy server URL
retry_handlers: Custom retry handlers for requests
**extra_client_args: Additional arguments for WebClient
"""
@cached_property
def client(self) -> WebClient:
"""Returns the underlying slack_sdk.WebClient instance."""
def get_conn(self) -> WebClient:
"""Returns the underlying slack_sdk.WebClient instance."""
def call(self, api_method: str, **kwargs) -> SlackResponse:
"""
Call any Slack Web API method.
Args:
api_method: Slack API method name (e.g., 'chat.postMessage')
**kwargs: API method parameters
Returns:
SlackResponse: API response dictionary
"""
def send_file_v2(
self,
*,
channel_id: str | None = None,
file_uploads: FileUploadTypeDef | list[FileUploadTypeDef],
initial_comment: str | None = None
) -> SlackResponse:
"""
Send files using Slack's files.upload_v2 API.
Args:
channel_id: Channel ID to upload to
file_uploads: File upload specifications
initial_comment: Comment to add with files
Returns:
SlackResponse: Upload response
"""
def send_file_v1_to_v2(
self,
*,
channels: str | Sequence[str] | None = None,
file: str | Path | None = None,
content: str | None = None,
filename: str | None = None,
initial_comment: str | None = None,
title: str | None = None,
snippet_type: str | None = None
) -> list[SlackResponse]:
"""
Compatibility method for transitioning from files.upload v1 to v2.
Args:
channels: Channel names or IDs
file: File path to upload
content: File content as string
filename: Display filename
initial_comment: Comment to add with file
title: File title
snippet_type: Syntax highlighting type
Returns:
list[SlackResponse]: List of upload responses
"""
def get_channel_id(self, channel_name: str) -> str:
"""
Get channel ID from channel name.
Args:
channel_name: Channel name (with or without #)
Returns:
str: Channel ID
Raises:
SlackApiError: If channel not found
"""
def test_connection(self) -> tuple[bool, str]:
"""
Test the Slack API connection.
Returns:
tuple[bool, str]: (success, message)
"""
@classmethod
def get_connection_form_widgets(cls) -> dict[str, Any]:
"""Return connection form widgets for Airflow UI."""
@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
"""Return custom UI field behavior for connections."""# Basic API call
hook = SlackHook(slack_conn_id='my_slack_conn')
response = hook.call('chat.postMessage',
channel='#general',
text='Hello from Airflow!')
# File upload
hook.send_file_v2(
channel_id='C1234567890',
file_uploads=[{
'file': '/path/to/report.csv',
'title': 'Daily Report',
'initial_comment': 'Here is today\'s report'
}]
)
# Get channel ID
channel_id = hook.get_channel_id('#general')Wrapper around slack_sdk.WebhookClient for Slack Incoming Webhooks, providing both synchronous and asynchronous message sending capabilities.
class SlackWebhookHook(BaseHook):
"""Hook for Slack Incoming Webhooks using slack_sdk.WebhookClient."""
conn_name_attr = "slack_webhook_conn_id"
default_conn_name = "slack_default"
conn_type = "slackwebhook"
hook_name = "Slack Incoming Webhook"
def __init__(
self,
*,
slack_webhook_conn_id: str,
timeout: int | None = None,
proxy: str | None = None,
retry_handlers: list[RetryHandler] | None = None,
**extra_client_args: Any,
):
"""
Initialize SlackWebhookHook.
Args:
slack_webhook_conn_id: Airflow connection ID for webhook
timeout: Request timeout in seconds
proxy: Proxy server URL
retry_handlers: Custom retry handlers
**extra_client_args: Additional WebhookClient arguments
"""
@cached_property
def client(self) -> WebhookClient:
"""Returns the underlying WebhookClient instance."""
def get_conn(self) -> WebhookClient:
"""Returns the underlying WebhookClient instance."""
def send_dict(
self,
body: dict[str, Any] | str,
*,
headers: dict[str, str] | None = None
):
"""
Send dictionary or JSON string to webhook.
Args:
body: Message data as dict or JSON string
headers: Additional HTTP headers
"""
def send(
self,
*,
text: str | None = None,
blocks: list[dict[str, Any]] | None = None,
response_type: str | None = None,
replace_original: bool | None = None,
delete_original: bool | None = None,
unfurl_links: bool | None = None,
unfurl_media: bool | None = None,
headers: dict[str, str] | None = None,
attachments: list[dict[str, Any]] | None = None,
**kwargs
):
"""
Send message to Slack webhook.
Args:
text: Message text
blocks: Block Kit blocks
response_type: Response type ('in_channel' or 'ephemeral')
replace_original: Replace original message
delete_original: Delete original message
unfurl_links: Auto-unfurl links
unfurl_media: Auto-unfurl media
headers: Additional HTTP headers
attachments: Legacy message attachments
**kwargs: Additional message parameters
"""
def send_text(
self,
text: str,
*,
unfurl_links: bool | None = None,
unfurl_media: bool | None = None,
headers: dict[str, str] | None = None
):
"""
Send simple text message to webhook.
Args:
text: Message text
unfurl_links: Auto-unfurl links
unfurl_media: Auto-unfurl media
headers: Additional HTTP headers
"""
@classmethod
def get_connection_form_widgets(cls) -> dict[str, Any]:
"""Return connection form widgets for Airflow UI."""
@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
"""Return custom UI field behavior for connections."""# Basic webhook message
hook = SlackWebhookHook(slack_webhook_conn_id='webhook_conn')
hook.send(text='Process completed successfully!',
channel='#alerts')
# Message with blocks
blocks = [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Alert:* Process failed"
}
}
]
hook.send(blocks=blocks)
# Simple text message
hook.send_text('Simple text message')def check_webhook_response(func: Callable) -> Callable:
"""
Decorator to validate webhook responses.
Args:
func: Function that makes webhook calls
Returns:
Callable: Decorated function with response validation
"""
async def async_check_webhook_response(func: Callable) -> Callable:
"""
Async decorator to validate webhook responses.
Args:
func: Async function that makes webhook calls
Returns:
Callable: Decorated async function with response validation
"""LEGACY_INTEGRATION_PARAMS: tuple = (
"channel",
"username",
"icon_emoji",
"icon_url"
)xoxb-)# Connection ID: slack_api_default
# Connection Type: slack
# Extra: {"token": "xoxb-your-bot-token-here"}# Connection ID: slack_webhook_default
# Connection Type: slackwebhook
# Host: hooks.slack.com
# Password: /services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXXAll hooks include proper error handling for common scenarios: