or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

hooks.mdindex.mdnotifications.mdoperators.mdtransfers.md
tile.json

operators.mddocs/

Operators

Task execution components for posting messages, uploading files, and sending webhook notifications within Airflow workflows. All operators support Airflow's templating system and integrate with the task execution framework.

Capabilities

Slack API Post Operator

Posts messages to Slack channels using the Slack Web API with support for rich formatting, attachments, and Block Kit.

class SlackAPIPostOperator(SlackAPIOperator):
    """
    Posts messages to Slack channel using Slack Web API.
    
    Supports rich text formatting, attachments, Block Kit blocks,
    and custom message styling.
    """
    
    template_fields = ("username", "text", "attachments", "blocks", "channel")
    ui_color = "#FFBA40"
    
    def __init__(
        self,
        channel: str = "#general",
        username: str = "Airflow",
        text: str = (
            "No message has been set.\n"
            "Here is a cat video instead\n"
            "https://www.youtube.com/watch?v=J---aiyznGQ"
        ),
        icon_url: str = "https://raw.githubusercontent.com/apache/airflow/main/airflow-core/src/airflow/ui/public/pin_100.png",
        blocks: list | None = None,
        attachments: list | None = None,
        slack_conn_id: str = SlackHook.default_conn_name,
        method: str = "chat.postMessage",
        api_params: dict | None = None,
        base_url: str | None = None,
        proxy: str | None = None,
        timeout: int | None = None,
        retry_handlers: list[RetryHandler] | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize SlackAPIPostOperator.
        
        Args:
            channel: Slack channel name or ID
            username: Bot username for the message  
            text: Message text content
            icon_url: URL for bot icon
            blocks: Block Kit blocks for rich formatting
            attachments: Legacy message attachments
            slack_conn_id: Airflow connection ID
            method: Slack API method (default: chat.postMessage)
            api_params: Additional API parameters
            base_url: Custom Slack API base URL
            proxy: Proxy server URL
            timeout: Request timeout in seconds
            retry_handlers: Custom retry handlers
            **kwargs: Additional operator arguments
        """
        
    def construct_api_call_params(self) -> Any:
        """Construct parameters for chat.postMessage API call."""

Usage Examples

# Simple text message
send_message = SlackAPIPostOperator(
    task_id='send_alert',
    slack_conn_id='slack_api_default',
    channel='#alerts',
    text='Task {{ task_instance.task_id }} completed at {{ ts }}',
    username='Airflow Alert Bot'
)

# Message with Block Kit formatting
blocks = [
    {
        "type": "section",
        "text": {
            "type": "mrkdwn", 
            "text": "*Task Status Update*\n:white_check_mark: Task completed successfully"
        }
    },
    {
        "type": "context",
        "elements": [
            {
                "type": "mrkdwn",
                "text": "DAG: {{ dag.dag_id }} | Task: {{ task_instance.task_id }}"
            }
        ]
    }
]

send_rich_message = SlackAPIPostOperator(
    task_id='send_rich_alert',
    channel='#general',
    blocks=blocks,
    text='Fallback text for notifications'
)

# Using legacy attachments
attachments = [
    {
        "color": "good",
        "title": "Process Completed",
        "text": "All data processing tasks finished successfully",
        "fields": [
            {"title": "Duration", "value": "45 minutes", "short": True},
            {"title": "Records", "value": "10,543", "short": True}
        ]
    }
]

send_attachment = SlackAPIPostOperator(
    task_id='send_report',
    channel='#reports',
    text='Daily processing report',
    attachments=attachments
)

Slack API File Operator

Uploads files to Slack channels with support for multiple file formats, custom metadata, and both file paths and content strings.

class SlackAPIFileOperator(SlackAPIOperator):
    """
    Uploads files to Slack channels using Slack Web API.
    
    Supports file uploads from local paths or string content,
    with customizable metadata and multiple channel targeting.
    """
    
    template_fields = ("channels", "initial_comment", "filename", "filetype", "content", "title", "snippet_type")
    ui_color = "#44BEDF"
    
    def __init__(
        self,
        channels: str | Sequence[str] | None = None,
        initial_comment: str | None = None,
        filename: str | None = None,
        filetype: str | None = None,
        content: str | None = None,
        title: str | None = None,
        method_version: Literal["v1", "v2"] | None = None,
        snippet_type: str | None = None,
        slack_conn_id: str = SlackHook.default_conn_name,
        method: str = "files.upload",
        api_params: dict | None = None,
        base_url: str | None = None,
        proxy: str | None = None,
        timeout: int | None = None,
        retry_handlers: list[RetryHandler] | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize SlackAPIFileOperator.
        
        Args:
            channels: Channel names or IDs for file upload
            initial_comment: Comment to add with the file
            filename: Local file path to upload
            filetype: File type hint for Slack
            content: File content as string (alternative to filename)
            title: Display title for the file
            method_version: API version ("v1" or "v2")
            snippet_type: Syntax highlighting type for code files
            slack_conn_id: Airflow connection ID
            method: Slack API method (files.upload or files.upload_v2)
            api_params: Additional API parameters
            base_url: Custom Slack API base URL
            proxy: Proxy server URL
            timeout: Request timeout in seconds
            retry_handlers: Custom retry handlers
            **kwargs: Additional operator arguments
        """
        
    def execute(self, context: Context):
        """Execute the file upload operation."""

Usage Examples

# Upload file from local path
upload_report = SlackAPIFileOperator(
    task_id='upload_daily_report',
    slack_conn_id='slack_api_default',
    channels=['#reports', '#management'],
    filename='/tmp/daily_report_{{ ds }}.csv',
    title='Daily Sales Report - {{ ds }}',
    initial_comment='Here is the daily sales report for {{ ds }}',
    filetype='csv'
)

# Upload content as file
upload_log = SlackAPIFileOperator(
    task_id='upload_error_log',
    channels='#alerts',
    content='{{ task_instance.log }}',
    filename='error_log_{{ ts_nodash }}.txt',
    title='Task Error Log',
    initial_comment='Error log for failed task {{ task_instance.task_id }}',
    snippet_type='text'
)

# Upload code snippet with syntax highlighting
upload_code = SlackAPIFileOperator(
    task_id='share_sql_query',
    channels='#data-team',
    content="""
    SELECT 
        date_trunc('day', created_at) as date,
        count(*) as daily_count
    FROM user_events 
    WHERE created_at >= '{{ ds }}'
    GROUP BY 1
    ORDER BY 1;
    """,
    filename='daily_query.sql',
    title='Daily User Events Query',
    snippet_type='sql',
    method_version='v2'
)

Slack Webhook Operator

Posts messages using Slack Incoming Webhooks, providing a simpler integration method without requiring a full Slack app.

class SlackWebhookOperator(BaseOperator):
    """
    Posts messages to Slack using Incoming Webhooks.
    
    Simpler alternative to API operators that doesn't require
    a full Slack app, just webhook configuration.
    """
    
    template_fields = ("message", "attachments", "blocks", "channel", "username", "proxy")
    
    def __init__(
        self,
        *,
        slack_webhook_conn_id,
        message: str = "",
        attachments: list | None = None,
        blocks: list | None = None,
        channel: str | None = None,
        username: str | None = None,
        icon_emoji: str | None = None,
        icon_url: str | None = None,
        proxy: str | None = None,
        timeout: int | None = None,
        retry_handlers: list[RetryHandler] | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize SlackWebhookOperator.
        
        Args:
            slack_webhook_conn_id: Airflow connection ID for webhook
            message: Message text content
            attachments: Legacy message attachments
            blocks: Block Kit blocks for rich formatting
            channel: Override default webhook channel
            username: Override default webhook username
            icon_emoji: Custom emoji icon (e.g., ':robot_face:')
            icon_url: Custom icon URL
            proxy: Proxy server URL
            timeout: Request timeout in seconds
            retry_handlers: Custom retry handlers
            **kwargs: Additional operator arguments
        """
        
    @cached_property
    def hook(self) -> SlackWebhookHook:
        """Returns SlackWebhookHook instance."""
        
    def execute(self, context: Context) -> None:
        """Execute the webhook operation."""

Usage Examples

# Simple webhook message
send_webhook = SlackWebhookOperator(
    task_id='notify_completion',
    slack_webhook_conn_id='slack_webhook_default',
    message='Workflow {{ dag.dag_id }} completed successfully! ✅',
    username='Airflow Webhook Bot',
    icon_emoji=':robot_face:'
)

# Webhook with custom channel override
send_alert = SlackWebhookOperator(
    task_id='send_failure_alert',
    slack_webhook_conn_id='slack_webhook_default',
    message='❌ Task {{ task_instance.task_id }} failed in DAG {{ dag.dag_id }}',
    channel='#critical-alerts',
    username='Alert Bot'
)

# Webhook with Block Kit blocks
failure_blocks = [
    {
        "type": "header",
        "text": {
            "type": "plain_text",
            "text": "🚨 Task Failure Alert"
        }
    },
    {
        "type": "section",
        "fields": [
            {
                "type": "mrkdwn",
                "text": "*DAG:*\n{{ dag.dag_id }}"
            },
            {
                "type": "mrkdwn", 
                "text": "*Task:*\n{{ task_instance.task_id }}"
            },
            {
                "type": "mrkdwn",
                "text": "*Execution Date:*\n{{ ds }}"
            },
            {
                "type": "mrkdwn",
                "text": "*Log URL:*\n<{{ task_instance.log_url }}|View Logs>"
            }
        ]
    }
]

send_failure_details = SlackWebhookOperator(
    task_id='detailed_failure_alert',
    slack_webhook_conn_id='slack_webhook_default',
    blocks=failure_blocks,
    message='Task failure - see details above'
)

Base Slack API Operator

Abstract base class for Slack API operators providing common functionality and interface.

class SlackAPIOperator(BaseOperator, ABC):
    """
    Abstract base class for Slack API operators.
    
    Provides common functionality for API authentication,
    connection management, and error handling.
    """
    
    def __init__(
        self,
        *,
        slack_conn_id: str = SlackHook.default_conn_name,
        method: str,
        api_params: dict | None = None,
        base_url: str | None = None,
        proxy: str | None = None,
        timeout: int | None = None,
        retry_handlers: list[RetryHandler] | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize base Slack API operator.
        
        Args:
            slack_conn_id: Airflow connection ID
            method: Slack API method name
            api_params: Additional API parameters
            base_url: Custom Slack API base URL
            proxy: Proxy server URL
            timeout: Request timeout in seconds
            retry_handlers: Custom retry handlers
            **kwargs: Additional operator arguments
        """
        
    @cached_property
    def hook(self) -> SlackHook:
        """Returns SlackHook instance."""
        
    @abstractmethod
    def construct_api_call_params(self) -> Any:
        """Construct API call parameters (must be implemented by subclasses)."""
        
    def execute(self, context: Context):
        """Execute the Slack API operation."""

Template Fields

All operators support Airflow templating for dynamic content:

SlackAPIPostOperator Template Fields

  • username: Bot username
  • text: Message text
  • attachments: Message attachments
  • blocks: Block Kit blocks
  • channel: Target channel

SlackAPIFileOperator Template Fields

  • channels: Target channels
  • initial_comment: File comment
  • filename: File path
  • filetype: File type
  • content: File content
  • title: File title
  • snippet_type: Syntax highlighting

SlackWebhookOperator Template Fields

  • message: Message text
  • attachments: Message attachments
  • blocks: Block Kit blocks
  • channel: Target channel
  • username: Bot username
  • proxy: Proxy URL

Error Handling

All operators include comprehensive error handling:

  • Connection failures: Automatic retry with exponential backoff
  • API rate limits: Built-in rate limit handling
  • Invalid parameters: Clear validation error messages
  • Network timeouts: Configurable timeout with fallback
  • Authentication errors: Detailed error messages for token issues