Transfers

Data transfer operators that execute SQL queries and deliver the results to Slack, either as uploaded files (CSV, JSON, or HTML) or as templated messages sent through an Incoming Webhook. They connect database systems to Slack for automated data reporting and alerting.

Capabilities

SQL to Slack API File Transfer

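Executes an SQL query and uploads the results as a file to one or more Slack channels through the Slack Web API. The output format (CSV, JSON, or HTML) is chosen from the extension of slack_filename, and df_kwargs are passed to the matching pandas writer (to_csv, to_json, or to_html).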

class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
    """
    Execute SQL query and send results as file to Slack via API.
    
    Supports CSV, JSON, and HTML output formats with customizable
    file metadata and multiple channel targeting.
    """
    
    template_fields = ("sql", "slack_channels", "slack_filename", "slack_initial_comment", "slack_title")
    template_ext = (".sql", ".jinja", ".j2")
    template_fields_renderers = {"sql": "sql", "slack_message": "jinja"}
    SUPPORTED_FILE_FORMATS = ("csv", "json", "html")
    
    def __init__(
        self,
        *,
        sql: str,
        sql_conn_id: str,
        sql_hook_params: dict | None = None,
        parameters: list | tuple | Mapping[str, Any] | None = None,
        slack_conn_id: str = SlackHook.default_conn_name,
        slack_filename: str,
        slack_channels: str | Sequence[str] | None = None,
        slack_initial_comment: str | None = None,
        slack_title: str | None = None,
        slack_base_url: str | None = None,
        slack_method_version: Literal["v1", "v2"] | None = None,
        df_kwargs: dict | None = None,
        action_on_empty_df: Literal["send", "skip", "error"] = "send",
        slack_proxy: str | None = None,
        slack_timeout: int | None = None,
        slack_retry_handlers: list[RetryHandler] | None = None,
        **kwargs,
    ):
        """
        Initialize SQL to Slack API file transfer operator.
        
        Args:
            sql: SQL query to execute
            sql_conn_id: Airflow connection ID for database
            sql_hook_params: Additional parameters for SQL hook
            parameters: Query parameters for SQL execution
            slack_conn_id: Airflow connection ID for Slack API
            slack_filename: Name for uploaded file
            slack_channels: Channel names or IDs for file upload
            slack_initial_comment: Comment to add with file
            slack_title: Display title for file
            slack_base_url: Custom Slack API base URL
            slack_method_version: File upload API version ("v1" or "v2")
            df_kwargs: Additional pandas DataFrame arguments
            action_on_empty_df: Action when query returns no results
            slack_proxy: Proxy server URL for Slack requests
            slack_timeout: Request timeout for Slack API
            slack_retry_handlers: Custom retry handlers for Slack
            **kwargs: Additional operator arguments
        """
        
    @cached_property
    def slack_hook(self) -> SlackHook:
        """Returns SlackHook instance."""
        
    def execute(self, context: Context) -> None:
        """Execute the SQL query and upload results to Slack."""

Usage Examples

# Daily sales report as CSV
daily_sales_report = SqlToSlackApiFileOperator(
    task_id='send_daily_sales',
    sql="""
        SELECT 
            DATE(order_date) as date,
            COUNT(*) as total_orders,
            SUM(amount) as total_revenue,
            AVG(amount) as avg_order_value
        FROM orders 
        WHERE DATE(order_date) = '{{ ds }}'
        GROUP BY DATE(order_date)
    """,
    sql_conn_id='postgres_default',
    slack_conn_id='slack_api_default',
    slack_filename='daily_sales_{{ ds }}.csv',
    slack_channels=['#sales-reports', '#management'],
    slack_title='Daily Sales Report - {{ ds }}',
    slack_initial_comment='📊 Daily sales metrics for {{ ds }}',
    df_kwargs={'index': False}
)

# Error report as JSON with conditional sending
error_report = SqlToSlackApiFileOperator(
    task_id='send_error_report',
    sql="""
        SELECT 
            error_type,
            COUNT(*) as error_count,
            MIN(created_at) as first_occurrence,
            MAX(created_at) as last_occurrence
        FROM error_logs 
        WHERE DATE(created_at) = '{{ ds }}'
        GROUP BY error_type
        HAVING COUNT(*) > 5
    """,
    sql_conn_id='mysql_default',
    slack_filename='errors_{{ ds }}.json',
    slack_channels='#alerts',
    slack_title='High-Frequency Errors - {{ ds }}',
    slack_initial_comment='🚨 Errors occurring more than 5 times today',
    action_on_empty_df='skip',  # Don't send if no high-frequency errors
    df_kwargs={'orient': 'records', 'date_format': 'iso'}
)

# User analytics as HTML table
user_analytics = SqlToSlackApiFileOperator(
    task_id='send_user_analytics',
    sql="""
        SELECT 
            user_segment,
            COUNT(DISTINCT user_id) as active_users,
            AVG(session_duration) as avg_session_minutes,
            SUM(page_views) as total_page_views
        FROM user_analytics 
        WHERE DATE(session_date) = '{{ ds }}'
        GROUP BY user_segment
        ORDER BY active_users DESC
    """,
    sql_conn_id='redshift_default',
    slack_filename='user_analytics_{{ ds }}.html',
    slack_channels='#product',
    slack_title='Daily User Analytics - {{ ds }}',
    slack_initial_comment='📈 User engagement metrics by segment',
    df_kwargs={'escape': False, 'table_id': 'analytics-table'}
)

# Monthly summary: dates are rendered by Jinja in sql; parameters carries only
# static bind values because it is not a templated field
monthly_summary = SqlToSlackApiFileOperator(
    task_id='monthly_summary',
    sql="""
        SELECT
            product_category,
            SUM(revenue) as total_revenue,
            COUNT(DISTINCT customer_id) as unique_customers
        FROM sales
        WHERE EXTRACT(YEAR FROM sale_date) = {{ macros.ds_format(ds, "%Y-%m-%d", "%Y") | int }}
          AND EXTRACT(MONTH FROM sale_date) = {{ macros.ds_format(ds, "%Y-%m-%d", "%m") | int }}
        GROUP BY product_category
        ORDER BY total_revenue DESC
        LIMIT %(top_n)s
    """,
    parameters={'top_n': 20},
    sql_conn_id='postgres_default',
    slack_filename='monthly_summary_{{ macros.ds_format(ds, "%Y-%m-%d", "%Y_%m") }}.csv',
    slack_channels='#finance',
    action_on_empty_df='error',  # Fail if no data found
    slack_method_version='v2'
)
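Only the attributes listed in template_fields are rendered by Jinja, so a {{ ... }} expression passed through parameters (or any other non-templated argument) reaches the database as literal text. A minimal, hypothetical check like the one below can flag that mistake in a DAG unit test; it inspects plain keyword arguments and is not part of the Slack provider.

```python
from typing import Any, Iterable

# Copied from the SqlToSlackApiFileOperator definition above.
API_FILE_TEMPLATE_FIELDS = (
    "sql", "slack_channels", "slack_filename", "slack_initial_comment", "slack_title",
)


def _contains_jinja(value: Any) -> bool:
    """Recursively look for Jinja markers in strings, sequences, and mappings."""
    if isinstance(value, str):
        return "{{" in value or "{%" in value
    if isinstance(value, dict):
        return any(_contains_jinja(v) for v in value.values())
    if isinstance(value, (list, tuple)):
        return any(_contains_jinja(v) for v in value)
    return False


def find_untemplated_jinja(kwargs: dict[str, Any], template_fields: Iterable[str]) -> list[str]:
    """Return argument names that contain Jinja syntax but will never be rendered."""
    templated = set(template_fields)
    return sorted(
        name for name, value in kwargs.items()
        if name not in templated and _contains_jinja(value)
    )


if __name__ == "__main__":
    kwargs = {
        "sql": "SELECT 1 WHERE day = '{{ ds }}'",
        "parameters": {"month": "{{ macros.ds_format(ds, '%Y-%m-%d', '%m') }}"},
        "slack_filename": "summary_{{ ds }}.csv",
    }
    print(find_untemplated_jinja(kwargs, API_FILE_TEMPLATE_FIELDS))  # ['parameters']
```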

SQL to Slack Webhook Transfer

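Executes an SQL query and posts a message built from the results to a Slack Incoming Webhook. The slack_message text is a Jinja template rendered after the query runs, with the results available as a pandas DataFrame, which makes this operator a good fit for alerts and short summaries.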

class SqlToSlackWebhookOperator(BaseSqlToSlackOperator):
    """
    Execute SQL query and send results to Slack via webhook.
    
    Formats query results into Slack message content with
    templating support for custom message formatting.
    """
    
    template_fields = ("sql", "slack_message")
    template_ext = (".sql", ".jinja", ".j2")
    template_fields_renderers = {"sql": "sql", "slack_message": "jinja"}
    times_rendered = 0
    
    def __init__(
        self,
        *,
        sql: str,
        sql_conn_id: str,
        slack_webhook_conn_id: str | None = None,
        sql_hook_params: dict | None = None,
        slack_channel: str | None = None,
        slack_message: str,
        results_df_name: str = "results_df",
        parameters: list | tuple | Mapping[str, Any] | None = None,
        slack_proxy: str | None = None,
        slack_timeout: int | None = None,
        slack_retry_handlers: list[RetryHandler] | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize SQL to Slack webhook transfer operator.
        
        Args:
            sql: SQL query to execute
            sql_conn_id: Airflow connection ID for database
            slack_webhook_conn_id: Airflow connection ID for webhook
            sql_hook_params: Additional parameters for SQL hook
            slack_channel: Override the webhook's default channel (legacy webhooks only; Slack app webhooks always post to the channel they were created for)
            slack_message: Message template with access to query results
            results_df_name: Variable name for DataFrame in template context
            parameters: Query parameters for SQL execution
            slack_proxy: Proxy server URL for webhook requests
            slack_timeout: Request timeout for webhook
            slack_retry_handlers: Custom retry handlers for webhook
            **kwargs: Additional operator arguments
        """
        
    def render_template_fields(self, context, jinja_env=None) -> None:
        """Render template fields, skipping slack_message on the first pass until query results are in the context."""
        
    def execute(self, context: Context) -> None:
        """Execute SQL query and send formatted results to webhook."""
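slack_message can only be rendered once the query has run, because it refers to the results DataFrame. The operator therefore leaves slack_message alone on the first template pass and renders it after the results are added to the context under results_df_name. The simplified simulation below mimics that ordering; str.format stands in for Jinja, a list of dicts stands in for the DataFrame, and TwoPassRenderer and run_query are hypothetical names.

```python
from typing import Any, Callable


class TwoPassRenderer:
    """Toy model of deferred rendering: slack_message waits for query results."""

    template_fields = ("sql", "slack_message")

    def __init__(self, sql: str, slack_message: str, results_df_name: str = "results_df") -> None:
        self.sql = sql
        self.slack_message = slack_message
        self.results_df_name = results_df_name
        self.times_rendered = 0

    def render_template_fields(self, context: dict[str, Any]) -> None:
        # First pass: results are not available yet, so slack_message is skipped.
        fields = [
            f for f in self.template_fields
            if f != "slack_message" or self.times_rendered > 0
        ]
        for field in fields:
            setattr(self, field, getattr(self, field).format(**context))
        self.times_rendered += 1

    def execute(self, context: dict[str, Any], run_query: Callable[[str], Any]) -> str:
        results = run_query(self.sql)            # stand-in for the SQL hook call
        context[self.results_df_name] = results  # expose results to the template
        self.render_template_fields(context)     # renders slack_message after the first pass
        return self.slack_message                # the real operator sends this text


if __name__ == "__main__":
    op = TwoPassRenderer(
        sql="SELECT COUNT(*) AS n FROM orders WHERE day = '{ds}'",
        slack_message="Orders on {ds}: {results_df[0][n]}",
    )
    ctx = {"ds": "2024-01-31"}
    op.render_template_fields(ctx)  # Airflow renders once before execute()
    print(op.execute(ctx, run_query=lambda sql: [{"n": 42}]))  # Orders on 2024-01-31: 42
```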

Usage Examples

# Daily metrics summary
daily_metrics = SqlToSlackWebhookOperator(
    task_id='send_daily_metrics',
    sql="""
        SELECT 
            COUNT(*) as total_orders,
            COALESCE(SUM(amount), 0) as total_revenue,
            COALESCE(AVG(amount), 0) as avg_order_value,
            COUNT(DISTINCT customer_id) as unique_customers
        FROM orders 
        WHERE DATE(created_at) = '{{ ds }}'
    """,
    sql_conn_id='postgres_default',
    slack_webhook_conn_id='slack_webhook_default',
    slack_message="""
📊 *Daily Metrics for {{ ds }}*

• Total Orders: {{ results_df.iloc[0]['total_orders'] }}
• Total Revenue: ${{ "%.2f"|format(results_df.iloc[0]['total_revenue']) }}
• Average Order Value: ${{ "%.2f"|format(results_df.iloc[0]['avg_order_value']) }}
• Unique Customers: {{ results_df.iloc[0]['unique_customers'] }}

{% if results_df.iloc[0]['total_orders'] > 100 %}
🎉 Great day! We exceeded 100 orders!
{% elif results_df.iloc[0]['total_orders'] < 50 %}
⚠️ Low order volume today - investigate potential issues
{% endif %}
    """,
    slack_channel='#daily-reports'
)
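On a day with no orders, SUM and AVG return NULL, and "%.2f"|format(...) then fails on None. Besides guarding in SQL with COALESCE, one option is a small formatting function registered as a custom Jinja filter through the DAG's user_defined_filters argument. The money function below is a hypothetical sketch; check that your Airflow version passes DAG-level filters into this operator's template environment before relying on it.

```python
import math
from decimal import Decimal
from typing import Union

Number = Union[int, float, Decimal, None]


def money(value: Number, currency: str = "$") -> str:
    """Format a currency amount, treating SQL NULL (None) or NaN as zero."""
    if value is None or (isinstance(value, float) and math.isnan(value)):
        value = 0
    return f"{currency}{value:,.2f}"


# Hypothetical registration in a DAG file:
#   with DAG(..., user_defined_filters={"money": money}):
#       ...
# and in slack_message:
#   {{ results_df.iloc[0]['total_revenue'] | money }}

if __name__ == "__main__":
    print(money(1234.5))             # $1,234.50
    print(money(Decimal("19.999")))  # $20.00
    print(money(None))               # $0.00
```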

# Alert for high error rates
error_alert = SqlToSlackWebhookOperator(
    task_id='error_rate_alert',
    sql="""
        SELECT 
            COUNT(*) as error_count,
            COUNT(*) * 100.0 / (
                SELECT COUNT(*) FROM requests 
                WHERE DATE(created_at) = '{{ ds }}'
            ) as error_rate_percent
        FROM errors 
        WHERE DATE(created_at) = '{{ ds }}'
    """,
    sql_conn_id='mysql_default',
    slack_webhook_conn_id='alerts_webhook',
    slack_message="""
{% set error_count = results_df.iloc[0]['error_count'] %}
{% set error_rate = results_df.iloc[0]['error_rate_percent'] %}

{% if error_rate > 5.0 %}
🚨 *HIGH ERROR RATE ALERT*

Date: {{ ds }}
Error Count: {{ error_count }}
Error Rate: {{ "%.2f"|format(error_rate) }}%

This exceeds our 5% threshold. Immediate investigation required!

<https://monitoring.company.com/errors?date={{ ds }}|View Error Dashboard>
{% elif error_rate > 2.0 %}
⚠️ *Elevated Error Rate*

Date: {{ ds }}
Error Count: {{ error_count }}
Error Rate: {{ "%.2f"|format(error_rate) }}%

Monitor closely for potential issues.
{% else %}
✅ Error rates normal for {{ ds }} ({{ "%.2f"|format(error_rate) }}%)
{% endif %}
    """,
    slack_channel='#alerts'
)

# Top products report with table formatting
top_products = SqlToSlackWebhookOperator(
    task_id='top_products_report',
    sql="""
        SELECT 
            product_name,
            SUM(quantity) as total_sold,
            SUM(revenue) as total_revenue
        FROM product_sales 
        WHERE DATE(sale_date) = '{{ ds }}'
        GROUP BY product_name
        ORDER BY total_revenue DESC
        LIMIT 10
    """,
    sql_conn_id='postgres_default',
    slack_webhook_conn_id='reports_webhook',
    slack_message="""
🏆 *Top 10 Products by Revenue - {{ ds }}*
{% for idx, row in results_df.iterrows() %}
{{ loop.index }}. {{ row['product_name'][:30].ljust(30) }} | ${{ "%8.2f"|format(row['total_revenue']) }} | {{ "%4d"|format(row['total_sold']) }} sold
{%- endfor %}

Total products analyzed: {{ results_df.shape[0] }}
    """,
    results_df_name='results_df'
)
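The per-row line in the top products template truncates and pads names to 30 characters and right-aligns revenue and units. The stdlib sketch below applies the same formatting to a list of dicts standing in for results_df.iterrows(), which makes it easy to preview the layout outside Airflow. format_top_products is a hypothetical helper; the padding only lines up when Slack shows the text in a monospace block.

```python
from typing import Iterable, Mapping


def format_top_products(rows: Iterable[Mapping[str, object]]) -> str:
    """Render rows the same way as the Jinja loop in the top products message."""
    lines = []
    for index, row in enumerate(rows, start=1):  # loop.index is 1-based in Jinja
        name = str(row["product_name"])[:30].ljust(30)
        revenue = "%8.2f" % row["total_revenue"]
        sold = "%4d" % row["total_sold"]
        lines.append(f"{index}. {name} | ${revenue} | {sold} sold")
    return "\n".join(lines)


if __name__ == "__main__":
    sample = [
        {"product_name": "Espresso Machine", "total_revenue": 1299.0, "total_sold": 3},
        {"product_name": "Extra-long product name that gets truncated",
         "total_revenue": 45.5, "total_sold": 12},
    ]
    print(format_top_products(sample))
```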

Base SQL to Slack Operator

Abstract base class providing common functionality for SQL to Slack transfer operations.

class BaseSqlToSlackOperator(BaseOperator, ABC):
    """
    Base class for SQL to Slack transfer operators.
    
    Provides common SQL execution and connection management
    functionality for transferring query results to Slack.
    """
    
    def __init__(
        self,
        *,
        sql: str,
        sql_conn_id: str,
        sql_hook_params: dict | None = None,
        parameters: list | tuple | Mapping[str, Any] | None = None,
        slack_proxy: str | None = None,
        slack_timeout: int | None = None,
        slack_retry_handlers: list[RetryHandler] | None = None,
        **kwargs,
    ):
        """
        Initialize base SQL to Slack operator.
        
        Args:
            sql: SQL query to execute
            sql_conn_id: Airflow connection ID for database
            sql_hook_params: Additional parameters for SQL hook
            parameters: Query parameters for SQL execution
            slack_proxy: Proxy server URL for Slack requests
            slack_timeout: Request timeout for Slack operations
            slack_retry_handlers: Custom retry handlers for Slack
            **kwargs: Additional operator arguments
        """
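The base class turns sql, sql_conn_id, and parameters into a pandas DataFrame through the database hook. The sketch below reproduces that flow with the standard library's sqlite3 module standing in for the Airflow hook and a list of dicts standing in for the DataFrame; get_query_results is a hypothetical name. Placeholder style is driver-specific: sqlite3 uses :name, while the Postgres and MySQL drivers commonly used with Airflow use %(name)s.

```python
import sqlite3
from typing import Any, Mapping, Sequence, Union

Params = Union[Sequence[Any], Mapping[str, Any], None]


def get_query_results(conn: sqlite3.Connection, sql: str,
                      parameters: Params = None) -> list[dict[str, Any]]:
    """Run a query with bound parameters and return rows as dicts (DataFrame stand-in)."""
    cursor = conn.execute(sql, parameters if parameters is not None else ())
    columns = [col[0] for col in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE sales (product_category TEXT, revenue REAL)")
    conn.executemany(
        "INSERT INTO sales VALUES (?, ?)",
        [("books", 10.0), ("books", 5.0), ("games", 30.0)],
    )
    rows = get_query_results(
        conn,
        "SELECT product_category, SUM(revenue) AS total_revenue FROM sales "
        "GROUP BY product_category ORDER BY total_revenue DESC LIMIT :top_n",
        {"top_n": 1},
    )
    print(rows)  # [{'product_category': 'games', 'total_revenue': 30.0}]
```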

File Format Support

CSV Format Options

# CSV with custom formatting
df_kwargs = {
    'index': False,           # Exclude row indices
    'sep': ',',              # Column separator  
    'encoding': 'utf-8',     # Text encoding
    'float_format': '%.2f',  # Number formatting
    'date_format': '%Y-%m-%d' # Date formatting
}

JSON Format Options

# JSON with different orientations
df_kwargs = {
    'orient': 'records',      # Array of objects
    'date_format': 'iso',     # ISO date format
    'indent': 2,              # Pretty printing
    'force_ascii': False      # Unicode support
}

HTML Format Options

# HTML table with styling
df_kwargs = {
    'escape': False,          # Allow HTML in data
    'table_id': 'data-table', # CSS table ID
    'classes': 'table table-striped', # CSS classes
    'border': 0               # Table border width
}
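Everything in df_kwargs is passed as keyword arguments to the pandas writer that matches the file format (DataFrame.to_csv, to_json, or to_html), so each key must be valid for that method: orient works for JSON but would make to_csv raise a TypeError. The sketch below shows that dispatch with a hypothetical write_results function; a small stub class stands in for a DataFrame so the example runs without pandas.

```python
from __future__ import annotations

from typing import Any

# Output format -> pandas DataFrame method that writes it.
WRITERS = {"csv": "to_csv", "json": "to_json", "html": "to_html"}


def write_results(df: Any, path: str, file_format: str,
                  df_kwargs: dict[str, Any] | None = None) -> None:
    """Forward df_kwargs unchanged to the writer for file_format."""
    try:
        method_name = WRITERS[file_format]
    except KeyError:
        raise ValueError(f"Unsupported format {file_format!r}") from None
    getattr(df, method_name)(path, **(df_kwargs or {}))


class RecordingFrame:
    """Stub that records writer calls instead of writing files."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, str, dict[str, Any]]] = []

    def __getattr__(self, name: str):
        if name not in WRITERS.values():
            raise AttributeError(name)
        return lambda path, **kwargs: self.calls.append((name, path, kwargs))


if __name__ == "__main__":
    frame = RecordingFrame()
    write_results(frame, "/tmp/errors.json", "json", {"orient": "records", "date_format": "iso"})
    print(frame.calls)
```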

Error Handling and Edge Cases

Empty Result Handling

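# Behavior when the query returns no rows (SqlToSlackApiFileOperator only)
action_on_empty_df='send'    # Default: upload the file even if it is empty
action_on_empty_df='skip'    # Mark the task skipped (AirflowSkipException); nothing is uploaded
action_on_empty_df='error'   # Fail the task; nothing is uploaded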
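The three action_on_empty_df values amount to a small decision made before anything is uploaded. The sketch below models it with a hypothetical SkipTask exception standing in for Airflow's AirflowSkipException; in a real task, raising that exception marks the task instance as skipped rather than failed. check_empty_result is likewise a hypothetical name.

```python
class SkipTask(Exception):
    """Stand-in for airflow.exceptions.AirflowSkipException."""


VALID_ACTIONS = ("send", "skip", "error")


def check_empty_result(row_count: int, action_on_empty_df: str = "send") -> bool:
    """Return True if the result should be uploaded; raise for skip/error on empty results."""
    if action_on_empty_df not in VALID_ACTIONS:
        raise ValueError(
            f"action_on_empty_df must be one of {VALID_ACTIONS}, got {action_on_empty_df!r}"
        )
    if row_count > 0 or action_on_empty_df == "send":
        return True
    if action_on_empty_df == "skip":
        raise SkipTask("Query returned no rows; skipping upload.")
    raise ValueError("Query returned no rows; failing as configured.")


if __name__ == "__main__":
    print(check_empty_result(0))          # True: an empty file is still sent
    print(check_empty_result(12, "skip")) # True: non-empty results are always sent
```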

Template Context

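SqlToSlackWebhookOperator exposes the query results to the slack_message template:

  • results_df: pandas DataFrame with the query results (rename it with results_df_name)
  • tabulate: a Jinja filter for rendering the DataFrame as a text table, e.g. {{ results_df | tabulate(tablefmt="pretty", headers="keys") }}
  • Standard Airflow context variables (ds, task_instance, etc.)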

Performance Considerations

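  • Large Result Sets: The full result set is loaded into a pandas DataFrame in worker memory before it is written or rendered, so filter, aggregate, or LIMIT in SQL
  • File Uploads: Slack has deprecated the legacy files.upload method used by slack_method_version="v1"; prefer "v2" for new DAGs
  • Query Optimization: Return only the columns and rows the report needs; the query runs on every task execution
  • Database Connections: Each run opens its own connection through the hook built from sql_conn_id; pass hook-specific options through sql_hook_params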
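Because the whole result set is materialized in memory before it is written or rendered, one defensive pattern is to cap the row count in SQL. The hypothetical cap_rows helper below wraps a single SELECT statement in a subquery with a LIMIT; the subquery alias is required by PostgreSQL and MySQL, and the example runs against SQLite for portability. Standard SQL does not promise that an ORDER BY inside a subquery survives the outer query, so when order matters, put the LIMIT directly in the original query instead.

```python
import sqlite3


def cap_rows(sql: str, max_rows: int) -> str:
    """Wrap a single SELECT statement so it returns at most max_rows rows."""
    if max_rows < 1:
        raise ValueError("max_rows must be positive")
    inner = sql.strip().rstrip(";").rstrip()  # a trailing semicolon would break the subquery
    return f"SELECT * FROM ({inner}) AS capped LIMIT {int(max_rows)}"


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (id INTEGER)")
    conn.executemany("INSERT INTO events VALUES (?)", [(i,) for i in range(1000)])
    rows = conn.execute(cap_rows("SELECT id FROM events;", 50)).fetchall()
    print(len(rows))  # 50
```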