tessl/pypi-apache-airflow-providers-snowflake

Provider package apache-airflow-providers-snowflake for Apache Airflow

SQL Operators and Data Quality

Task operators for executing SQL commands, performing data quality checks, and managing database operations with built-in validation and monitoring capabilities. These operators provide the core task execution layer for Snowflake operations in Airflow DAGs.

Capabilities

SQL API Operator

Primary operator for executing multiple SQL statements using Snowflake's SQL API, with support for asynchronous execution, deferrable mode, and comprehensive parameter binding.

class SnowflakeSqlApiOperator(SQLExecuteQueryOperator):
    """
    Execute multiple SQL statements using Snowflake SQL API.
    Supports asynchronous execution and deferrable mode for efficient resource utilization.
    """
    
    LIFETIME = timedelta(minutes=59)      # JWT Token lifetime
    RENEWAL_DELTA = timedelta(minutes=54) # JWT Token renewal time
    template_fields: Sequence[str]        # Includes snowflake_conn_id and SQL fields
    conn_id_field = "snowflake_conn_id"
    
    def __init__(
        self,
        *,
        snowflake_conn_id: str = "snowflake_default",
        warehouse: str | None = None,
        database: str | None = None,
        role: str | None = None,
        schema: str | None = None,
        authenticator: str | None = None,
        session_parameters: dict[str, Any] | None = None,
        poll_interval: int = 5,
        statement_count: int = 0,
        token_life_time: timedelta = LIFETIME,
        token_renewal_delta: timedelta = RENEWAL_DELTA,
        bindings: dict[str, Any] | None = None,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        snowflake_api_retry_args: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Initialize SQL API operator.
        
        Parameters:
        - snowflake_conn_id: Snowflake connection ID
        - warehouse: Snowflake warehouse name
        - database: Snowflake database name
        - role: Snowflake role name
        - schema: Snowflake schema name
        - authenticator: Authentication method
        - session_parameters: Session-level parameters
        - poll_interval: Polling interval for async execution (seconds)
        - statement_count: Number of SQL statements in the request (0 allows any number)
        - token_life_time: JWT token lifetime
        - token_renewal_delta: JWT token renewal interval
        - bindings: Parameter bindings for SQL statements
        - deferrable: Enable deferrable execution mode
        - snowflake_api_retry_args: API retry configuration
        """

Execution Methods

def execute(self, context: Context) -> None:
    """
    Execute the SQL statements.
    
    Parameters:
    - context: Airflow task execution context
    """

def poll_on_queries(self):
    """
    Poll on requested queries for completion status.
    Used in synchronous execution mode.
    """

def execute_complete(
    self, 
    context: Context, 
    event: dict[str, str | list[str]] | None = None
) -> None:
    """
    Callback method when trigger fires in deferrable mode.
    
    Parameters:
    - context: Airflow task execution context
    - event: Event data from trigger
    """

Data Quality Check Operators

Specialized operators for performing data quality validations and monitoring database state with configurable thresholds and alerting.

Basic Check Operator

class SnowflakeCheckOperator(SQLCheckOperator):
    """
    Perform a check against Snowflake database.
    Expects a SQL query that returns a single row; the check fails if any value in that row is falsy.
    """
    
    template_fields: Sequence[str] = ["sql", "snowflake_conn_id"]
    template_ext: Sequence[str] = (".sql",)
    ui_color = "#ededed"
    conn_id_field = "snowflake_conn_id"
    
    def __init__(
        self,
        *,
        sql: str,
        snowflake_conn_id: str = "snowflake_default",
        parameters: Iterable | Mapping[str, Any] | None = None,
        warehouse: str | None = None,
        database: str | None = None,
        role: str | None = None,
        schema: str | None = None,
        authenticator: str | None = None,
        session_parameters: dict | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize check operator.
        
        Parameters:
        - sql: SQL query returning single boolean result
        - snowflake_conn_id: Snowflake connection ID
        - parameters: Query parameters for parameterized SQL
        - warehouse: Snowflake warehouse name
        - database: Snowflake database name
        - role: Snowflake role name
        - schema: Snowflake schema name
        - authenticator: Authentication method
        - session_parameters: Session-level parameters
        """

Value Check Operator

class SnowflakeValueCheckOperator(SQLValueCheckOperator):
    """
    Perform a simple check using SQL code against a specified value.
    Supports tolerance levels for numeric comparisons.
    """
    
    template_fields: Sequence[str] = ["sql", "pass_value", "snowflake_conn_id"]
    conn_id_field = "snowflake_conn_id"
    
    def __init__(
        self,
        *,
        sql: str,
        pass_value: Any,
        tolerance: Any = None,
        snowflake_conn_id: str = "snowflake_default",
        parameters: Iterable | Mapping[str, Any] | None = None,
        warehouse: str | None = None,
        database: str | None = None,
        role: str | None = None,
        schema: str | None = None,
        authenticator: str | None = None,
        session_parameters: dict | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize value check operator.
        
        Parameters:
        - sql: SQL query returning single value for comparison
        - pass_value: Expected value for comparison
        - tolerance: Tolerance for numeric comparisons (absolute or percentage)
        - snowflake_conn_id: Snowflake connection ID
        - parameters: Query parameters for parameterized SQL
        - warehouse: Snowflake warehouse name
        - database: Snowflake database name
        - role: Snowflake role name
        - schema: Snowflake schema name
        - authenticator: Authentication method
        - session_parameters: Session-level parameters
        """

Interval Check Operator

class SnowflakeIntervalCheckOperator(SQLIntervalCheckOperator):
    """
    Check that metrics are within tolerance of values from days_back before.
    Useful for detecting anomalies in time series data.
    """
    
    template_fields: Sequence[str] = ["table", "metrics_thresholds", "snowflake_conn_id"]
    conn_id_field = "snowflake_conn_id"
    
    def __init__(
        self,
        *,
        table: str,
        metrics_thresholds: dict,
        date_filter_column: str = "ds",
        days_back: SupportsAbs[int] = -7,
        snowflake_conn_id: str = "snowflake_default",
        warehouse: str | None = None,
        database: str | None = None,
        role: str | None = None,
        schema: str | None = None,
        authenticator: str | None = None,
        session_parameters: dict | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize interval check operator.
        
        Parameters:
        - table: Table name to check
        - metrics_thresholds: Dictionary mapping metrics (SQL expressions such as COUNT(*)) to the maximum allowed ratio between the current and historical values
        - date_filter_column: Column name for date filtering
        - days_back: Number of days back for comparison (negative integer)
        - snowflake_conn_id: Snowflake connection ID
        - warehouse: Snowflake warehouse name
        - database: Snowflake database name
        - role: Snowflake role name
        - schema: Snowflake schema name
        - authenticator: Authentication method
        - session_parameters: Session-level parameters
        """

Usage Examples

Basic SQL Execution

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator
from datetime import datetime, timedelta

with DAG(
    'snowflake_sql_example',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False
) as dag:
    
    # Execute multiple SQL statements
    create_and_load = SnowflakeSqlApiOperator(
        task_id='create_and_load_data',
        snowflake_conn_id='snowflake_prod',
        sql='''
            -- Create staging table
            CREATE OR REPLACE TABLE staging.daily_sales AS
            SELECT 
                date_trunc('day', transaction_date) as sale_date,
                region,
                SUM(amount) as total_sales,
                COUNT(*) as transaction_count
            FROM raw.transactions
            WHERE transaction_date >= '{{ ds }}'
              AND transaction_date < '{{ next_ds }}'
            GROUP BY 1, 2;
            
            -- Update summary table
            MERGE INTO analytics.sales_summary s
            USING staging.daily_sales ds ON s.sale_date = ds.sale_date AND s.region = ds.region
            WHEN MATCHED THEN UPDATE SET
                total_sales = ds.total_sales,
                transaction_count = ds.transaction_count
            WHEN NOT MATCHED THEN INSERT (sale_date, region, total_sales, transaction_count)
                VALUES (ds.sale_date, ds.region, ds.total_sales, ds.transaction_count);
        ''',
        statement_count=2,
        warehouse='ANALYTICS_WH',
        database='ANALYTICS_DB'
    )

Asynchronous Execution

from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

# Large data processing with deferrable execution
process_large_dataset = SnowflakeSqlApiOperator(
    task_id='process_large_dataset',
    snowflake_conn_id='snowflake_prod',
    sql='''
        CREATE OR REPLACE TABLE analytics.customer_360 AS
        SELECT 
            c.customer_id,
            c.customer_name,
            COUNT(DISTINCT o.order_id) as total_orders,
            SUM(o.order_amount) as lifetime_value,
            MAX(o.order_date) as last_order_date,
            AVG(o.order_amount) as avg_order_value
        FROM customers c
        LEFT JOIN orders o ON c.customer_id = o.customer_id
        WHERE o.order_date >= '2020-01-01'
        GROUP BY c.customer_id, c.customer_name;
    ''',
    statement_count=1,
    deferrable=True,  # Enable async execution
    poll_interval=30,  # Check every 30 seconds
    warehouse='HEAVY_COMPUTE_WH'
)

Data Quality Checks

from airflow.providers.snowflake.operators.snowflake import (
    SnowflakeCheckOperator,
    SnowflakeValueCheckOperator,
    SnowflakeIntervalCheckOperator
)

# Basic data quality check
data_freshness_check = SnowflakeCheckOperator(
    task_id='check_data_freshness',
    snowflake_conn_id='snowflake_prod',
    sql='''
        SELECT COUNT(*) > 0
        FROM raw.transactions
        WHERE date_trunc('day', transaction_date) = '{{ ds }}'
    ''',
    warehouse='ANALYTICS_WH'
)

# Value validation with tolerance
revenue_check = SnowflakeValueCheckOperator(
    task_id='validate_daily_revenue',
    snowflake_conn_id='snowflake_prod',
    sql='''
        SELECT SUM(amount)
        FROM raw.transactions
        WHERE date_trunc('day', transaction_date) = '{{ ds }}'
    ''',
    pass_value=50000,  # Expected daily revenue
    tolerance=0.1,     # Pass if the result is within ±10% of pass_value
    warehouse='ANALYTICS_WH'
)

# Time series anomaly detection
anomaly_check = SnowflakeIntervalCheckOperator(
    task_id='detect_sales_anomalies',
    snowflake_conn_id='snowflake_prod',
    table='analytics.daily_sales_summary',
    metrics_thresholds={
        'SUM(total_sales)': 1.2,  # current/historical ratio must stay below 1.2 (~±20%)
        'COUNT(*)': 1.3           # row-count ratio must stay below 1.3 (~±30%)
    },
    date_filter_column='sale_date',
    days_back=-7,  # Compare with same day last week
    warehouse='ANALYTICS_WH'
)
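
These checks are typically chained so that downstream processing only runs once every validation passes:

data_freshness_check >> revenue_check >> anomaly_check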

Parameterized Queries

from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

# Using parameter bindings
parameterized_query = SnowflakeSqlApiOperator(
    task_id='parameterized_analysis',
    snowflake_conn_id='snowflake_prod',
    sql='''
        SELECT 
            region,
            COUNT(*) as customer_count,
            AVG(lifetime_value) as avg_ltv
        FROM analytics.customer_360
        WHERE lifetime_value >= ?
          AND last_order_date >= ?
        GROUP BY region
        ORDER BY avg_ltv DESC;
    ''',
    statement_count=1,
    bindings={
        # Snowflake SQL API bind format: positional key -> {"type": ..., "value": ...}
        '1': {'type': 'FIXED', 'value': '1000'},       # Minimum lifetime value
        '2': {'type': 'TEXT', 'value': '2024-01-01'}   # Minimum last order date
    },
    warehouse='ANALYTICS_WH'
)

Error Handling and Retry Configuration

from datetime import timedelta

from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

# Custom retry configuration for API calls
robust_operation = SnowflakeSqlApiOperator(
    task_id='robust_data_processing',
    snowflake_conn_id='snowflake_prod',
    sql='''
        CREATE OR REPLACE TABLE staging.processed_data AS
        SELECT * FROM raw.data_stream
        WHERE processed_at IS NULL;
        
        UPDATE raw.data_stream 
        SET processed_at = CURRENT_TIMESTAMP()
        WHERE id IN (SELECT id FROM staging.processed_data);
    ''',
    statement_count=2,
    snowflake_api_retry_args={
        # Options forwarded to the SQL API hook's retry logic; the accepted
        # keys depend on the provider version and are shown here illustratively
        'retries': 3,
        'backoff_factor': 2,
        'status_forcelist': [500, 502, 503, 504]
    },
    # Airflow task retry configuration
    retries=2,
    retry_delay=timedelta(minutes=5),
    warehouse='PROCESSING_WH'
)

Template Variables

All operators support Airflow's template variables and macros:

  • {{ ds }}: Execution date as YYYY-MM-DD
  • {{ ts }}: Execution timestamp
  • {{ next_ds }}: Next execution date
  • {{ params }}: User-defined parameters
  • {{ var.value.variable_name }}: Airflow variables
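
A sketch combining several of these in one task (the table name and params value are illustrative):

from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

# {{ ds }} and {{ params.region }} are rendered by Airflow before submission
templated_cleanup = SnowflakeSqlApiOperator(
    task_id='templated_cleanup',
    snowflake_conn_id='snowflake_prod',
    sql='''
        DELETE FROM staging.daily_sales
        WHERE sale_date < '{{ ds }}'
          AND region = '{{ params.region }}';
    ''',
    statement_count=1,
    params={'region': 'EMEA'}  # referenced as {{ params.region }} in the SQL
)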

Performance Optimization

Warehouse Management

from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

# Use appropriate warehouse sizes
large_compute_task = SnowflakeSqlApiOperator(
    task_id='heavy_processing',
    warehouse='X_LARGE_WH',  # Scale up for heavy workloads
    sql='SELECT * FROM huge_table_join_operation',
    statement_count=1
)

# Auto-suspend warehouses after use
cleanup_task = SnowflakeSqlApiOperator(
    task_id='suspend_warehouse',
    sql='ALTER WAREHOUSE X_LARGE_WH SUSPEND',
    statement_count=1
)

large_compute_task >> cleanup_task

Connection Pooling

Airflow tasks typically run in separate processes, so connections cannot be shared across tasks. For high-frequency operations, reduce connection overhead by batching statements so that a single task reuses one connection, rather than opening a new connection per statement, as sketched below.
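
A minimal sketch using the provider's SnowflakeHook (the staging tables are illustrative):

from airflow.decorators import task
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

@task
def run_maintenance_batch():
    hook = SnowflakeHook(snowflake_conn_id='snowflake_prod')
    # hook.run() opens a single connection and executes every statement over it
    hook.run([
        "DELETE FROM staging.tmp_events WHERE event_date < CURRENT_DATE - 30",
        "INSERT INTO staging.tmp_events SELECT * FROM raw.events WHERE loaded_at IS NULL",
    ])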

Error Handling

All operators provide comprehensive error handling with detailed exception information:

  • SQL Execution Errors: Syntax errors, constraint violations, permission issues
  • Connection Errors: Authentication failures, network timeouts, warehouse suspension
  • Resource Errors: Warehouse capacity limits, query complexity limits
  • API Errors: Rate limiting, malformed requests, service unavailable

Error messages include Snowflake-specific error codes and suggestions for resolution.
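
At the task level these failures surface as ordinary Airflow exceptions, so standard retry and callback settings apply; a minimal sketch (the callback and check query are illustrative):

from datetime import timedelta

from airflow.providers.snowflake.operators.snowflake import SnowflakeCheckOperator

def notify_on_failure(context):
    # Illustrative callback; context['exception'] carries the underlying error
    print(f"Task {context['task_instance'].task_id} failed: {context.get('exception')}")

guarded_check = SnowflakeCheckOperator(
    task_id='guarded_check',
    snowflake_conn_id='snowflake_prod',
    sql='SELECT COUNT(*) > 0 FROM raw.transactions',
    retries=2,
    retry_delay=timedelta(minutes=5),
    on_failure_callback=notify_on_failure
)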
