Data transfer operators for executing SQL queries and sending results to Slack channels in various formats. These operators bridge the gap between database systems and Slack communication, enabling automated data reporting and alerting.
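Executes SQL queries and uploads the results as files to Slack channels using the Slack Web API. Supports multiple output formats (CSV, JSON, HTML) and options for the file name, title, initial comment, target channels, upload API version, and behavior when the query returns no rows.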
class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
    """
    Execute a SQL query and upload the results to Slack as a file via the Web API.

    The result set is exported in one of ``SUPPORTED_FILE_FORMATS`` (CSV, JSON
    or HTML) and uploaded to one or more channels, optionally with a title and
    an initial comment.
    """

    template_fields = ("sql", "slack_channels", "slack_filename", "slack_initial_comment", "slack_title")
    template_ext = (".sql", ".jinja", ".j2")
    # Renderers are declared only for fields this operator actually has. The
    # former "slack_message" entry was copied from the webhook operator; this
    # operator has no ``slack_message`` argument or template field.
    template_fields_renderers = {"sql": "sql"}
    SUPPORTED_FILE_FORMATS = ("csv", "json", "html")

    def __init__(
        self,
        *,
        sql: str,
        sql_conn_id: str,
        sql_hook_params: dict | None = None,
        parameters: list | tuple | Mapping[str, Any] | None = None,
        slack_conn_id: str = SlackHook.default_conn_name,
        slack_filename: str,
        slack_channels: str | Sequence[str] | None = None,
        slack_initial_comment: str | None = None,
        slack_title: str | None = None,
        slack_base_url: str | None = None,
        slack_method_version: Literal["v1", "v2"] | None = None,
        df_kwargs: dict | None = None,
        action_on_empty_df: Literal["send", "skip", "error"] = "send",
        slack_proxy: str | None = None,
        slack_timeout: int | None = None,
        slack_retry_handlers: list[RetryHandler] | None = None,
        **kwargs,
    ):
        """
        Initialize the SQL-to-Slack-API file transfer operator.

        Args:
            sql: SQL query to execute (templated).
            sql_conn_id: Airflow connection ID for the database.
            sql_hook_params: Additional parameters for the SQL hook.
            parameters: Query parameters for SQL execution. This is not a
                template field, so Jinja expressions placed here are not
                rendered; template values into ``sql`` instead.
            slack_conn_id: Airflow connection ID for the Slack Web API.
            slack_filename: Name of the uploaded file (templated).
                NOTE(review): the examples choose the output format through
                the file extension (.csv / .json / .html); confirm against the
                implementation.
            slack_channels: Channel name(s) or ID(s) to upload to (templated);
                a single string or a sequence of strings.
            slack_initial_comment: Comment posted with the file (templated).
            slack_title: Display title for the file (templated).
            slack_base_url: Custom Slack API base URL.
            slack_method_version: File upload API version, "v1" or "v2".
            df_kwargs: Extra keyword arguments for the DataFrame export (the
                examples pass options such as ``index``, ``orient`` and
                ``escape``).
            action_on_empty_df: What to do when the query returns no rows:
                "send" always uploads, "skip" skips the upload, and "error"
                fails the task.
            slack_proxy: Proxy server URL for Slack requests.
            slack_timeout: Request timeout for the Slack API.
            slack_retry_handlers: Custom retry handlers for the Slack client.
            **kwargs: Additional operator arguments.
        """

    @cached_property
    def slack_hook(self) -> SlackHook:
        """Returns SlackHook instance."""

    def execute(self, context: Context) -> None:
        """Execute the SQL query and upload results to Slack."""


# Daily sales report as CSV
daily_sales_report = SqlToSlackApiFileOperator(
    task_id="send_daily_sales",
    sql_conn_id="postgres_default",
    slack_conn_id="slack_api_default",
    sql="""
SELECT
DATE(order_date) as date,
COUNT(*) as total_orders,
SUM(amount) as total_revenue,
AVG(amount) as avg_order_value
FROM orders
WHERE DATE(order_date) = '{{ ds }}'
GROUP BY DATE(order_date)
""",
    # Two destinations: the uploaded file goes to both channels.
    slack_channels=["#sales-reports", "#management"],
    slack_filename="daily_sales_{{ ds }}.csv",
    slack_title="Daily Sales Report - {{ ds }}",
    slack_initial_comment="📊 Daily sales metrics for {{ ds }}",
    df_kwargs={"index": False},  # leave the DataFrame row index out of the CSV
)
# Error report as JSON; nothing is uploaded on days without frequent errors.
# slack_conn_id is omitted, so the operator's default Slack connection is used.
error_report = SqlToSlackApiFileOperator(
    task_id="send_error_report",
    sql_conn_id="mysql_default",
    sql="""
SELECT
error_type,
COUNT(*) as error_count,
MIN(created_at) as first_occurrence,
MAX(created_at) as last_occurrence
FROM error_logs
WHERE DATE(created_at) = '{{ ds }}'
GROUP BY error_type
HAVING COUNT(*) > 5
""",
    slack_channels="#alerts",
    slack_filename="errors_{{ ds }}.json",
    slack_title="High-Frequency Errors - {{ ds }}",
    slack_initial_comment="🚨 Errors occurring more than 5 times today",
    df_kwargs={"orient": "records", "date_format": "iso"},
    # The HAVING filter can legitimately yield no rows; skip the upload then.
    action_on_empty_df="skip",
)
# User analytics rendered as an HTML table
user_analytics = SqlToSlackApiFileOperator(
    task_id="send_user_analytics",
    sql_conn_id="redshift_default",
    sql="""
SELECT
user_segment,
COUNT(DISTINCT user_id) as active_users,
AVG(session_duration) as avg_session_minutes,
SUM(page_views) as total_page_views
FROM user_analytics
WHERE DATE(session_date) = '{{ ds }}'
GROUP BY user_segment
ORDER BY active_users DESC
""",
    slack_channels="#product",
    slack_filename="user_analytics_{{ ds }}.html",
    slack_title="Daily User Analytics - {{ ds }}",
    slack_initial_comment="📈 User engagement metrics by segment",
    # escape=False writes cell values into the HTML unescaped; only appropriate
    # when the queried data is trusted.
    df_kwargs={"escape": False, "table_id": "analytics-table"},
)
# Monthly summary for the run's month.
# The year and month are rendered directly into `sql`: `parameters` is not one
# of this operator's template_fields, so a Jinja expression placed there would
# be sent to the database verbatim (and a hard-coded year silently goes stale).
monthly_summary = SqlToSlackApiFileOperator(
    task_id='monthly_summary',
    sql="""
SELECT
product_category,
SUM(revenue) as total_revenue,
COUNT(DISTINCT customer_id) as unique_customers
FROM sales
WHERE EXTRACT(YEAR FROM sale_date) = {{ macros.ds_format(ds, "%Y-%m-%d", "%Y") | int }}
AND EXTRACT(MONTH FROM sale_date) = {{ macros.ds_format(ds, "%Y-%m-%d", "%m") | int }}
GROUP BY product_category
ORDER BY total_revenue DESC
""",
    sql_conn_id='postgres_default',
    slack_filename='monthly_summary_{{ macros.ds_format(ds, "%Y-%m-%d", "%Y_%m") }}.csv',
    slack_channels='#finance',
    action_on_empty_df='error',  # Fail if no data found
    slack_method_version='v2',
)
# Executes SQL queries and sends formatted results as messages to Slack Incoming
# Webhooks. Ideal for alerts, summaries, and formatted data sharing.
class SqlToSlackWebhookOperator(BaseSqlToSlackOperator):
    """
    Run a SQL query and post the outcome to Slack through an Incoming Webhook.

    The query result is made available to the ``slack_message`` Jinja
    template (under the name given by ``results_df_name``), so the message
    text can be built from the data.
    """

    template_ext = (".sql", ".jinja", ".j2")
    template_fields = ("sql", "slack_message")
    template_fields_renderers = {"sql": "sql", "slack_message": "jinja"}
    # NOTE(review): presumably counts calls to render_template_fields; confirm
    # against the implementation.
    times_rendered = 0

    def __init__(
        self,
        *,
        sql: str,
        sql_conn_id: str,
        slack_webhook_conn_id: str | None = None,
        sql_hook_params: dict | None = None,
        slack_channel: str | None = None,
        slack_message: str,
        results_df_name: str = "results_df",
        parameters: list | tuple | Mapping[str, Any] | None = None,
        slack_proxy: str | None = None,
        slack_timeout: int | None = None,
        slack_retry_handlers: list[RetryHandler] | None = None,
        **kwargs,
    ) -> None:
        """
        Configure the SQL-to-Slack-webhook transfer.

        Args:
            sql: Query to run (templated).
            sql_conn_id: Airflow connection ID of the database.
            slack_webhook_conn_id: Airflow connection ID of the Slack webhook.
            sql_hook_params: Additional parameters for the SQL hook.
            slack_channel: Channel that overrides the webhook's default one.
            slack_message: Message template (templated) that can reference the
                query results.
            results_df_name: Name under which the results DataFrame is exposed
                to the message template.
            parameters: Query parameters for SQL execution.
            slack_proxy: Proxy server URL for webhook requests.
            slack_timeout: Request timeout for the webhook.
            slack_retry_handlers: Custom retry handlers for the webhook client.
            **kwargs: Additional operator arguments.
        """

    def render_template_fields(self, context, jinja_env=None) -> None:
        """Render the template fields with the query results in scope."""

    def execute(self, context: Context) -> None:
        """Run the query and post the rendered message to the webhook."""


# Daily metrics summary
daily_metrics = SqlToSlackWebhookOperator(
    task_id='send_daily_metrics',
    # SUM/AVG over zero rows return NULL. COALESCE keeps the "%.2f" format
    # filters in the message from receiving a missing value (None/NaN) on a
    # day without orders — exactly the case the low-volume branch reports on.
    sql="""
SELECT
COUNT(*) as total_orders,
COALESCE(SUM(amount), 0) as total_revenue,
COALESCE(AVG(amount), 0) as avg_order_value,
COUNT(DISTINCT customer_id) as unique_customers
FROM orders
WHERE DATE(created_at) = '{{ ds }}'
""",
    sql_conn_id='postgres_default',
    slack_webhook_conn_id='slack_webhook_default',
    slack_message="""
📊 *Daily Metrics for {{ ds }}*
• Total Orders: {{ results_df.iloc[0]['total_orders'] }}
• Total Revenue: ${{ "%.2f"|format(results_df.iloc[0]['total_revenue']) }}
• Average Order Value: ${{ "%.2f"|format(results_df.iloc[0]['avg_order_value']) }}
• Unique Customers: {{ results_df.iloc[0]['unique_customers'] }}
{% if results_df.iloc[0]['total_orders'] > 100 %}
🎉 Great day! We exceeded 100 orders!
{% elif results_df.iloc[0]['total_orders'] < 50 %}
⚠️ Low order volume today - investigate potential issues
{% endif %}
""",
    slack_channel='#daily-reports',
)
# Alert for high error rates.
# NULLIF turns a zero request count into NULL instead of dividing by zero, and
# COALESCE maps that NULL to 0 so the `error_rate > 5.0` comparison and the
# "%.2f" filter in the template always get a number. A day with no recorded
# requests therefore reports a 0% error rate.
error_alert = SqlToSlackWebhookOperator(
    task_id='error_rate_alert',
    sql="""
SELECT
COUNT(*) as error_count,
COALESCE(COUNT(*) * 100.0 / NULLIF((
SELECT COUNT(*) FROM requests
WHERE DATE(created_at) = '{{ ds }}'
), 0), 0) as error_rate_percent
FROM errors
WHERE DATE(created_at) = '{{ ds }}'
""",
    sql_conn_id='mysql_default',
    slack_webhook_conn_id='alerts_webhook',
    slack_message="""
{% set error_count = results_df.iloc[0]['error_count'] %}
{% set error_rate = results_df.iloc[0]['error_rate_percent'] %}
{% if error_rate > 5.0 %}
🚨 *HIGH ERROR RATE ALERT*
Date: {{ ds }}
Error Count: {{ error_count }}
Error Rate: {{ "%.2f"|format(error_rate) }}%
This exceeds our 5% threshold. Immediate investigation required!
<https://monitoring.company.com/errors?date={{ ds }}|View Error Dashboard>
{% elif error_rate > 2.0 %}
⚠️ *Elevated Error Rate*
Date: {{ ds }}
Error Count: {{ error_count }}
Error Rate: {{ "%.2f"|format(error_rate) }}%
Monitor closely for potential issues.
{% else %}
✅ Error rates normal for {{ ds }} ({{ "%.2f"|format(error_rate) }}%)
{% endif %}
""",
    slack_channel='#alerts',
)
# Top products report with table formatting
top_products = SqlToSlackWebhookOperator(
    task_id='top_products_report',
    sql="""
SELECT
product_name,
SUM(quantity) as total_sold,
SUM(revenue) as total_revenue
FROM product_sales
WHERE DATE(sale_date) = '{{ ds }}'
GROUP BY product_name
ORDER BY total_revenue DESC
LIMIT 10
""",
    sql_conn_id='postgres_default',
    slack_webhook_conn_id='reports_webhook',
    # Each product goes on its own line inside a ``` block. The ljust / %8.2f
    # padding only lines up in Slack's monospace code formatting, and without
    # the newlines every row would be rendered on one line. `-%}` strips the
    # newline after the loop tags so no blank lines appear between rows.
    slack_message="""
🏆 *Top 10 Products by Revenue - {{ ds }}*
```
{% for idx, row in results_df.iterrows() -%}
{{ loop.index }}. {{ row['product_name'][:30].ljust(30) }} | ${{ "%8.2f"|format(row['total_revenue']) }} | {{ "%4d"|format(row['total_sold']) }} sold
{% endfor -%}
```
Total products analyzed: {{ results_df.shape[0] }}
""",
    results_df_name='results_df',  # the default name, spelled out for illustration
)
# Abstract base class providing common functionality for SQL to Slack transfer operations.
class BaseSqlToSlackOperator(BaseOperator, ABC):
    """
    Shared base for operators that run a SQL query and deliver the result to Slack.

    Accepts the arguments common to every SQL-to-Slack transfer: the query and
    its parameters, the database connection and hook settings, and the Slack
    transport options (proxy, timeout, retry handlers).
    """

    def __init__(
        self,
        *,
        sql: str,
        sql_conn_id: str,
        sql_hook_params: dict | None = None,
        parameters: list | tuple | Mapping[str, Any] | None = None,
        slack_proxy: str | None = None,
        slack_timeout: int | None = None,
        slack_retry_handlers: list[RetryHandler] | None = None,
        **kwargs,
    ):
        """
        Set up the arguments shared by all SQL-to-Slack operators.

        Args:
            sql: Query to run.
            sql_conn_id: Airflow connection ID of the database.
            sql_hook_params: Additional parameters for the SQL hook.
            parameters: Query parameters for SQL execution.
            slack_proxy: Proxy server URL for Slack requests.
            slack_timeout: Request timeout for Slack operations.
            slack_retry_handlers: Custom retry handlers for the Slack client.
            **kwargs: Additional operator arguments.
        """


# CSV with custom formatting
df_kwargs = dict(
    index=False,  # drop the row index column
    sep=",",  # field separator
    encoding="utf-8",  # output text encoding
    float_format="%.2f",  # two decimal places for floats
    date_format="%Y-%m-%d",  # how dates are written
)
# JSON with different orientations
df_kwargs = dict(
    orient="records",  # a list of row objects
    date_format="iso",  # ISO-8601 timestamps
    indent=2,  # pretty-printed output
    force_ascii=False,  # keep non-ASCII characters as-is
)
# HTML table with styling
df_kwargs = dict(
    escape=False,  # cell values are not HTML-escaped; only for trusted data
    table_id="data-table",  # id attribute of the <table>
    classes="table table-striped",  # CSS classes on the <table>
    border=0,  # table border width
)
# Configure behavior for empty query results
action_on_empty_df = "send"  # always send the file/message
action_on_empty_df = "skip"  # skip sending when there are no rows
action_on_empty_df = "error"  # fail the task when there are no rows
# SQL to Slack webhook operators provide query results in template context:
- `results_df`: a pandas DataFrame containing the query results. Use `results_df_name` to expose it under a different variable name.
- Choose `slack_method_version` based on file size and requirements.