Hooks

Hooks are the core connection and communication interfaces for Slack services. They provide authenticated access to the Slack Web API and to Incoming Webhooks, and they take care of connection management and error handling.

Capabilities

Slack Web API Hook

Provides authenticated access to Slack's Web API using the slack_sdk.WebClient, supporting all Slack API methods including messaging, file uploads, channel management, and user operations.

class SlackHook(BaseHook):
    """Hook for Slack Web API operations using slack_sdk.WebClient."""
    
    conn_name_attr = "slack_conn_id"
    default_conn_name = "slack_api_default"
    conn_type = "slack"
    hook_name = "Slack API"
    
    def __init__(
        self,
        *,
        slack_conn_id: str = "slack_api_default",
        base_url: str | None = None,
        timeout: int | None = None,
        proxy: str | None = None,
        retry_handlers: list[RetryHandler] | None = None,
        **extra_client_args: Any,
    ) -> None:
        """
        Initialize SlackHook.
        
        Args:
            slack_conn_id: Airflow connection ID for Slack API
            base_url: Custom Slack API base URL
            timeout: Request timeout in seconds
            proxy: Proxy server URL
            retry_handlers: Custom retry handlers for requests
            **extra_client_args: Additional arguments for WebClient
        """
        
    @cached_property
    def client(self) -> WebClient:
        """Returns the underlying slack_sdk.WebClient instance."""
        
    def get_conn(self) -> WebClient:
        """Returns the underlying slack_sdk.WebClient instance."""
        
    def call(self, api_method: str, **kwargs) -> SlackResponse:
        """
        Call any Slack Web API method.
        
        Args:
            api_method: Slack API method name (e.g., 'chat.postMessage')
            **kwargs: Keyword arguments passed through to WebClient.api_call (e.g., json=, data=, params=)
            
        Returns:
            SlackResponse: API response object
        """
        
    def send_file_v2(
        self,
        *,
        channel_id: str | None = None,
        file_uploads: FileUploadTypeDef | list[FileUploadTypeDef],
        initial_comment: str | None = None
    ) -> SlackResponse:
        """
        Send files using Slack's files.upload_v2 API.
        
        Args:
            channel_id: Channel ID to upload to
            file_uploads: File upload specifications
            initial_comment: Comment to add with files
            
        Returns:
            SlackResponse: Upload response
        """
        
    def send_file_v1_to_v2(
        self,
        *,
        channels: str | Sequence[str] | None = None,
        file: str | Path | None = None,
        content: str | None = None,
        filename: str | None = None,
        initial_comment: str | None = None,
        title: str | None = None,
        snippet_type: str | None = None
    ) -> list[SlackResponse]:
        """
        Compatibility method for transitioning from files.upload v1 to v2.
        
        Args:
            channels: Channel names or IDs
            file: File path to upload
            content: File content as string
            filename: Display filename
            initial_comment: Comment to add with file
            title: File title
            snippet_type: Syntax highlighting type
            
        Returns:
            list[SlackResponse]: List of upload responses
        """
        
    def get_channel_id(self, channel_name: str) -> str:
        """
        Get channel ID from channel name.
        
        Args:
            channel_name: Channel name (with or without #)
            
        Returns:
            str: Channel ID
            
        Raises:
            SlackApiError: If channel not found
        """
        
    def test_connection(self) -> tuple[bool, str]:
        """
        Test the Slack API connection.
        
        Returns:
            tuple[bool, str]: (success, message)
        """
        
    @classmethod
    def get_connection_form_widgets(cls) -> dict[str, Any]:
        """Return connection form widgets for Airflow UI."""
        
    @classmethod
    def get_ui_field_behaviour(cls) -> dict[str, Any]:
        """Return custom UI field behavior for connections."""

Usage Examples

# Basic API call (method parameters go in the json payload)
hook = SlackHook(slack_conn_id='my_slack_conn')
response = hook.call('chat.postMessage',
                     json={'channel': '#general',
                           'text': 'Hello from Airflow!'})

# File upload (initial_comment is an argument of send_file_v2, not a file_uploads key)
hook.send_file_v2(
    channel_id='C1234567890',
    file_uploads=[{
        'file': '/path/to/report.csv',
        'title': 'Daily Report'
    }],
    initial_comment="Here is today's report"
)

# Get channel ID
channel_id = hook.get_channel_id('#general')
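
The lookup and upload calls can be combined into one helper. This is a minimal sketch, not part of the provider: `upload_report` and `StubHook` are hypothetical names, and `hook` is assumed to be any object exposing `get_channel_id` and `send_file_v2` with the signatures documented above (for example a `SlackHook`). The stub records calls so the sketch runs without Airflow or a Slack workspace.

```python
from pathlib import Path
from typing import Any


def upload_report(hook: Any, channel_name: str, path: str, comment: str) -> Any:
    """Resolve a channel name to its ID, then upload one file to that channel."""
    channel_id = hook.get_channel_id(channel_name)
    file_spec = {
        "file": path,
        "filename": Path(path).name,  # display name shown in Slack
        "title": Path(path).stem,
    }
    return hook.send_file_v2(
        channel_id=channel_id,
        file_uploads=[file_spec],
        initial_comment=comment,
    )


class StubHook:
    """Hypothetical stand-in that records calls instead of contacting Slack."""

    def __init__(self) -> None:
        self.uploads = []

    def get_channel_id(self, channel_name: str) -> str:
        return "C1234567890"

    def send_file_v2(self, *, channel_id=None, file_uploads, initial_comment=None):
        call = {
            "channel_id": channel_id,
            "file_uploads": file_uploads,
            "initial_comment": initial_comment,
        }
        self.uploads.append(call)
        return call


stub = StubHook()
result = upload_report(stub, "#general", "/path/to/report.csv", "Here is today's report")
```

With a real connection, the stub would be replaced by `SlackHook(slack_conn_id='my_slack_conn')`.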

Slack Webhook Hook

Wrapper around slack_sdk.WebhookClient for Slack Incoming Webhooks, providing both synchronous and asynchronous message sending capabilities.

class SlackWebhookHook(BaseHook):
    """Hook for Slack Incoming Webhooks using slack_sdk.WebhookClient."""
    
    conn_name_attr = "slack_webhook_conn_id"
    default_conn_name = "slack_default"
    conn_type = "slackwebhook"
    hook_name = "Slack Incoming Webhook"
    
    def __init__(
        self,
        *,
        slack_webhook_conn_id: str,
        timeout: int | None = None,
        proxy: str | None = None,
        retry_handlers: list[RetryHandler] | None = None,
        **extra_client_args: Any,
    ):
        """
        Initialize SlackWebhookHook.
        
        Args:
            slack_webhook_conn_id: Airflow connection ID for webhook
            timeout: Request timeout in seconds
            proxy: Proxy server URL  
            retry_handlers: Custom retry handlers
            **extra_client_args: Additional WebhookClient arguments
        """
        
    @cached_property
    def client(self) -> WebhookClient:
        """Returns the underlying WebhookClient instance."""
        
    def get_conn(self) -> WebhookClient:
        """Returns the underlying WebhookClient instance."""
        
    def send_dict(
        self, 
        body: dict[str, Any] | str, 
        *, 
        headers: dict[str, str] | None = None
    ):
        """
        Send dictionary or JSON string to webhook.
        
        Args:
            body: Message data as dict or JSON string
            headers: Additional HTTP headers
        """
        
    def send(
        self,
        *,
        text: str | None = None,
        blocks: list[dict[str, Any]] | None = None,
        response_type: str | None = None,
        replace_original: bool | None = None,
        delete_original: bool | None = None,
        unfurl_links: bool | None = None,
        unfurl_media: bool | None = None,
        headers: dict[str, str] | None = None,
        attachments: list[dict[str, Any]] | None = None,
        **kwargs
    ):
        """
        Send message to Slack webhook.
        
        Args:
            text: Message text
            blocks: Block Kit blocks
            response_type: Response type ('in_channel' or 'ephemeral')
            replace_original: Replace original message
            delete_original: Delete original message
            unfurl_links: Auto-unfurl links
            unfurl_media: Auto-unfurl media
            headers: Additional HTTP headers
            attachments: Legacy message attachments
            **kwargs: Additional message parameters
        """
        
    def send_text(
        self, 
        text: str, 
        *, 
        unfurl_links: bool | None = None,
        unfurl_media: bool | None = None,
        headers: dict[str, str] | None = None
    ):
        """
        Send simple text message to webhook.
        
        Args:
            text: Message text
            unfurl_links: Auto-unfurl links
            unfurl_media: Auto-unfurl media  
            headers: Additional HTTP headers
        """
        
    @classmethod
    def get_connection_form_widgets(cls) -> dict[str, Any]:
        """Return connection form widgets for Airflow UI."""
        
    @classmethod
    def get_ui_field_behaviour(cls) -> dict[str, Any]:
        """Return custom UI field behavior for connections."""

Usage Examples

# Basic webhook message
# ('channel' is a legacy parameter; it only takes effect with legacy integration webhooks)
hook = SlackWebhookHook(slack_webhook_conn_id='webhook_conn')
hook.send(text='Process completed successfully!',
          channel='#alerts')

# Message with blocks
blocks = [
    {
        "type": "section",
        "text": {
            "type": "mrkdwn",
            "text": "*Alert:* Process failed"
        }
    }
]
hook.send(blocks=blocks)

# Simple text message
hook.send_text('Simple text message')
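
Block payloads like the one above are plain lists of dictionaries, so they can be built by a small function. The sketch below is illustrative only: `build_alert_blocks` is a hypothetical helper, and the DAG and task names are made up.

```python
from typing import Any


def build_alert_blocks(title: str, details: dict[str, str]) -> list[dict[str, Any]]:
    """Build a header section plus one field per detail, in Block Kit format."""
    blocks: list[dict[str, Any]] = [
        {"type": "section", "text": {"type": "mrkdwn", "text": f"*Alert:* {title}"}},
    ]
    if details:
        blocks.append(
            {
                "type": "section",
                "fields": [
                    {"type": "mrkdwn", "text": f"*{key}:* {value}"}
                    for key, value in details.items()
                ],
            }
        )
    return blocks


# Hypothetical values, for illustration only.
blocks = build_alert_blocks("Process failed", {"DAG": "daily_etl", "Task": "load"})
# With a configured hook: hook.send(text="Alert: Process failed", blocks=blocks)
```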

Utility Functions

def check_webhook_response(func: Callable) -> Callable:
    """
    Decorator to validate webhook responses.
    
    Args:
        func: Function that makes webhook calls
        
    Returns:
        Callable: Decorated function with response validation
    """

def async_check_webhook_response(func: Callable) -> Callable:
    """
    Decorator to validate webhook responses returned by async functions.
    
    Args:
        func: Async function that makes webhook calls
        
    Returns:
        Callable: Decorated async function with response validation
    """
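
To illustrate what response-validating decorators of this kind might look like, here is a minimal sketch. It is not the provider's implementation: `WebhookError`, `FakeResponse`, `ensure_ok`, and `async_ensure_ok` are hypothetical names, and treating any status code other than 200 as a failure is an assumption. In this sketch both decorators are ordinary functions; the async variant returns a coroutine wrapper that awaits the decorated function.

```python
import asyncio
from dataclasses import dataclass
from functools import wraps
from typing import Callable


class WebhookError(Exception):
    """Hypothetical error raised when a webhook call does not succeed."""


@dataclass
class FakeResponse:
    """Hypothetical stand-in for a webhook response object."""

    status_code: int
    body: str


def _ensure_ok(resp: FakeResponse) -> None:
    # Assumption: only HTTP 200 counts as success.
    if resp.status_code != 200:
        raise WebhookError(f"Webhook call failed: {resp.status_code} {resp.body}")


def ensure_ok(func: Callable) -> Callable:
    """Validate the response returned by a synchronous function."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        resp = func(*args, **kwargs)
        _ensure_ok(resp)
        return resp

    return wrapper


def async_ensure_ok(func: Callable) -> Callable:
    """Validate the response returned by a coroutine function."""

    @wraps(func)
    async def wrapper(*args, **kwargs):
        resp = await func(*args, **kwargs)
        _ensure_ok(resp)
        return resp

    return wrapper


@ensure_ok
def send_ok() -> FakeResponse:
    return FakeResponse(200, "ok")


@async_ensure_ok
async def send_bad() -> FakeResponse:
    return FakeResponse(404, "no_service")


first = send_ok()
try:
    asyncio.run(send_bad())
except WebhookError as err:
    failure = str(err)
```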

Constants

LEGACY_INTEGRATION_PARAMS: tuple = (
    "channel",
    "username", 
    "icon_emoji",
    "icon_url"
)
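
These names correspond to payload fields that Slack honours only for legacy incoming-webhook integrations; webhooks created through a Slack app post to the channel and identity chosen at install time. As a sketch of how calling code might use the constant, the hypothetical helper `split_legacy_params` below separates such keys from the rest of a payload. The tuple is redefined locally so the example is self-contained.

```python
LEGACY_INTEGRATION_PARAMS = ("channel", "username", "icon_emoji", "icon_url")


def split_legacy_params(params: dict) -> tuple[dict, dict]:
    """Separate legacy-only keys from the rest of a webhook payload."""
    legacy = {k: v for k, v in params.items() if k in LEGACY_INTEGRATION_PARAMS}
    modern = {k: v for k, v in params.items() if k not in LEGACY_INTEGRATION_PARAMS}
    return modern, legacy


modern, legacy = split_legacy_params(
    {"text": "Process completed", "channel": "#alerts", "username": "airflow"}
)
```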

Connection Configuration

Slack API Connection Setup

  1. Create a Slack App at https://api.slack.com/apps
  2. Install the app to your workspace
  3. Copy the Bot User OAuth Token (starts with xoxb-)
  4. Create Airflow connection:
# Connection ID: slack_api_default
# Connection Type: slack
# Password: xoxb-your-bot-token-here

Slack Webhook Connection Setup

  1. Create an Incoming Webhook in your Slack App
  2. Copy the webhook URL
  3. Create Airflow connection:
# Connection ID: slack_webhook_default
# Connection Type: slackwebhook  
# Host: hooks.slack.com
# Password: /services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX
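
Airflow can also read connections from environment variables named `AIRFLOW_CONN_<CONN_ID>` (upper-cased), and since Airflow 2.3 the value may be a JSON document. As a sketch, assuming that mechanism and the placeholder values from the steps above, the webhook connection could be serialized like this:

```python
import json
import os

conn_id = "slack_webhook_default"
connection = {
    "conn_type": "slackwebhook",
    "host": "hooks.slack.com",
    # Placeholder path from the steps above; substitute your own webhook path.
    "password": "/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX",
}

env_name = f"AIRFLOW_CONN_{conn_id.upper()}"
env_value = json.dumps(connection)

# Setting the variable in-process is only for illustration; in a deployment it
# would normally be exported in the environment of the scheduler and workers.
os.environ[env_name] = env_value
```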

Error Handling

The hooks cover the following common failure scenarios:

  • Connection failures: Automatic retry with handlers configurable through retry_handlers
  • API rate limits: Built-in rate limit handling from slack_sdk
  • Invalid tokens: Clear error messages for authentication issues
  • Network timeouts: Timeout configurable through the timeout argument
  • Channel not found: Specific error handling for channel operations
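
Calling code may still want to turn an API error into a logged, non-fatal result. The following is a sketch under assumptions: `safe_call`, `StubApiError`, and `FailingHook` are hypothetical names, `hook` is any object with the `call` method documented above, and in real use `error_type` would typically be `slack_sdk.errors.SlackApiError`. A stub error class is used here so the example runs without slack_sdk.

```python
import logging
from typing import Any, Optional

log = logging.getLogger(__name__)


class StubApiError(Exception):
    """Hypothetical stand-in for the error type raised by the Slack client."""


class FailingHook:
    """Hypothetical hook whose call() always fails, to exercise the error path."""

    def call(self, api_method: str, **kwargs: Any) -> Any:
        raise StubApiError(f"{api_method}: channel_not_found")


def safe_call(
    hook: Any, api_method: str, error_type: type = Exception, **kwargs: Any
) -> Optional[Any]:
    """Call a Slack API method; log and return None if error_type is raised."""
    try:
        return hook.call(api_method, **kwargs)
    except error_type as err:
        log.warning("Slack call %s failed: %s", api_method, err)
        return None


outcome = safe_call(
    FailingHook(),
    "chat.postMessage",
    error_type=StubApiError,
    json={"channel": "#missing", "text": "hello"},
)
```

Returning `None` suits notifications that should never fail a task; re-raising inside the `except` block is the better choice when delivery is required.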