Provider package apache-airflow-providers-snowflake for Apache Airflow
Task operators for executing SQL commands, performing data quality checks, and managing database operations with built-in validation and monitoring capabilities. These operators provide the core task execution layer for Snowflake operations in Airflow DAGs.
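A minimal sketch of that execute-then-validate pattern, assuming a snowflake_default connection and illustrative table names (each operator is covered in detail below):

from datetime import datetime

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import (
    SnowflakeCheckOperator,
    SnowflakeSqlApiOperator,
)

with DAG(
    'snowflake_minimal_example',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    # Execute a SQL statement against Snowflake
    build = SnowflakeSqlApiOperator(
        task_id='build_table',
        snowflake_conn_id='snowflake_default',
        sql="CREATE OR REPLACE TABLE staging.example AS SELECT 1 AS n",
        statement_count=1,
    )
    # Validate the result; the task fails if the query returns a falsy row
    validate = SnowflakeCheckOperator(
        task_id='validate_table',
        snowflake_conn_id='snowflake_default',
        sql="SELECT COUNT(*) > 0 FROM staging.example",
    )
    build >> validate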
Primary operator for executing multiple SQL statements using Snowflake's SQL API, with support for asynchronous execution, deferrable mode, and comprehensive parameter binding.
class SnowflakeSqlApiOperator(SQLExecuteQueryOperator):
"""
Execute multiple SQL statements using Snowflake SQL API.
Supports asynchronous execution and deferrable mode for efficient resource utilization.
"""
LIFETIME = timedelta(minutes=59) # JWT Token lifetime
RENEWAL_DELTA = timedelta(minutes=54) # JWT Token renewal time
template_fields: Sequence[str] # Includes snowflake_conn_id and SQL fields
conn_id_field = "snowflake_conn_id"
def __init__(
self,
*,
snowflake_conn_id: str = "snowflake_default",
warehouse: str | None = None,
database: str | None = None,
role: str | None = None,
schema: str | None = None,
authenticator: str | None = None,
session_parameters: dict[str, Any] | None = None,
poll_interval: int = 5,
statement_count: int = 0,
token_life_time: timedelta = LIFETIME,
token_renewal_delta: timedelta = RENEWAL_DELTA,
bindings: dict[str, Any] | None = None,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
snowflake_api_retry_args: dict[str, Any] | None = None,
**kwargs: Any,
) -> None:
"""
Initialize SQL API operator.
Parameters:
- snowflake_conn_id: Snowflake connection ID
- warehouse: Snowflake warehouse name
- database: Snowflake database name
- role: Snowflake role name
- schema: Snowflake schema name
- authenticator: Authentication method
- session_parameters: Session-level parameters
- poll_interval: Polling interval for async execution (seconds)
- statement_count: Number of SQL statements (0 for auto-detect)
- token_life_time: JWT token lifetime
- token_renewal_delta: JWT token renewal interval
- bindings: Parameter bindings for SQL statements
- deferrable: Enable deferrable execution mode
- snowflake_api_retry_args: API retry configuration
"""def execute(self, context: Context) -> None:
"""
Execute the SQL statements.
Parameters:
- context: Airflow task execution context
"""
def poll_on_queries(self):
"""
Poll on requested queries for completion status.
Used in synchronous execution mode.
"""
def execute_complete(
self,
context: Context,
event: dict[str, str | list[str]] | None = None
) -> None:
"""
Callback method when trigger fires in deferrable mode.
Parameters:
- context: Airflow task execution context
- event: Event data from trigger
"""Specialized operators for performing data quality validations and monitoring database state with configurable thresholds and alerting.
class SnowflakeCheckOperator(SQLCheckOperator):
"""
Perform a check against Snowflake database.
Expects a SQL query that returns a single row for boolean evaluation.
"""
template_fields: Sequence[str] = ["sql", "snowflake_conn_id"]
template_ext: Sequence[str] = (".sql",)
ui_color = "#ededed"
conn_id_field = "snowflake_conn_id"
def __init__(
self,
*,
sql: str,
snowflake_conn_id: str = "snowflake_default",
parameters: Iterable | Mapping[str, Any] | None = None,
warehouse: str | None = None,
database: str | None = None,
role: str | None = None,
schema: str | None = None,
authenticator: str | None = None,
session_parameters: dict | None = None,
**kwargs,
) -> None:
"""
Initialize check operator.
Parameters:
- sql: SQL query returning single boolean result
- snowflake_conn_id: Snowflake connection ID
- parameters: Query parameters for parameterized SQL
- warehouse: Snowflake warehouse name
- database: Snowflake database name
- role: Snowflake role name
- schema: Snowflake schema name
- authenticator: Authentication method
- session_parameters: Session-level parameters
"""class SnowflakeValueCheckOperator(SQLValueCheckOperator):
"""
Perform a simple check using SQL code against a specified value.
Supports tolerance levels for numeric comparisons.
"""
template_fields: Sequence[str] = ["sql", "pass_value", "snowflake_conn_id"]
conn_id_field = "snowflake_conn_id"
def __init__(
self,
*,
sql: str,
pass_value: Any,
tolerance: Any = None,
snowflake_conn_id: str = "snowflake_default",
parameters: Iterable | Mapping[str, Any] | None = None,
warehouse: str | None = None,
database: str | None = None,
role: str | None = None,
schema: str | None = None,
authenticator: str | None = None,
session_parameters: dict | None = None,
**kwargs,
) -> None:
"""
Initialize value check operator.
Parameters:
- sql: SQL query returning single value for comparison
- pass_value: Expected value for comparison
- tolerance: Tolerance for numeric comparisons (absolute or percentage)
- snowflake_conn_id: Snowflake connection ID
- parameters: Query parameters for parameterized SQL
- warehouse: Snowflake warehouse name
- database: Snowflake database name
- role: Snowflake role name
- schema: Snowflake schema name
- authenticator: Authentication method
- session_parameters: Session-level parameters
"""class SnowflakeIntervalCheckOperator(SQLIntervalCheckOperator):
"""
Check that metrics are within tolerance of values from days_back before.
Useful for detecting anomalies in time series data.
"""
template_fields: Sequence[str] = ["table", "metrics_thresholds", "snowflake_conn_id"]
conn_id_field = "snowflake_conn_id"
def __init__(
self,
*,
table: str,
metrics_thresholds: dict,
date_filter_column: str = "ds",
days_back: SupportsAbs[int] = -7,
snowflake_conn_id: str = "snowflake_default",
warehouse: str | None = None,
database: str | None = None,
role: str | None = None,
schema: str | None = None,
authenticator: str | None = None,
session_parameters: dict | None = None,
**kwargs,
) -> None:
"""
Initialize interval check operator.
Parameters:
- table: Table name to check
- metrics_thresholds: Dictionary of metric_name -> maximum allowed ratio between the current value and the value from days_back
- date_filter_column: Column name for date filtering
- days_back: Number of days back for comparison (negative integer)
- snowflake_conn_id: Snowflake connection ID
- warehouse: Snowflake warehouse name
- database: Snowflake database name
- role: Snowflake role name
- schema: Snowflake schema name
- authenticator: Authentication method
- session_parameters: Session-level parameters
"""from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator
from datetime import datetime, timedelta
with DAG(
'snowflake_sql_example',
start_date=datetime(2024, 1, 1),
schedule_interval='@daily',
catchup=False
) as dag:
# Execute multiple SQL statements
create_and_load = SnowflakeSqlApiOperator(
task_id='create_and_load_data',
snowflake_conn_id='snowflake_prod',
sql='''
-- Create staging table
CREATE OR REPLACE TABLE staging.daily_sales AS
SELECT
date_trunc('day', transaction_date) as sale_date,
region,
SUM(amount) as total_sales,
COUNT(*) as transaction_count
FROM raw.transactions
WHERE transaction_date >= '{{ ds }}'
AND transaction_date < '{{ next_ds }}'
GROUP BY 1, 2;
-- Update summary table
MERGE INTO analytics.sales_summary s
USING staging.daily_sales ds ON s.sale_date = ds.sale_date AND s.region = ds.region
WHEN MATCHED THEN UPDATE SET
total_sales = ds.total_sales,
transaction_count = ds.transaction_count
WHEN NOT MATCHED THEN INSERT (sale_date, region, total_sales, transaction_count)
VALUES (ds.sale_date, ds.region, ds.total_sales, ds.transaction_count);
''',
statement_count=2,
warehouse='ANALYTICS_WH',
database='ANALYTICS_DB'
)

from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator
# Large data processing with deferrable execution
process_large_dataset = SnowflakeSqlApiOperator(
task_id='process_large_dataset',
snowflake_conn_id='snowflake_prod',
sql='''
CREATE OR REPLACE TABLE analytics.customer_360 AS
SELECT
c.customer_id,
c.customer_name,
COUNT(DISTINCT o.order_id) as total_orders,
SUM(o.order_amount) as lifetime_value,
MAX(o.order_date) as last_order_date,
AVG(o.order_amount) as avg_order_value
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id
WHERE o.order_date >= '2020-01-01'
GROUP BY c.customer_id, c.customer_name;
''',
statement_count=1,
deferrable=True, # Enable async execution
poll_interval=30, # Check every 30 seconds
warehouse='HEAVY_COMPUTE_WH'
)

from airflow.providers.snowflake.operators.snowflake import (
SnowflakeCheckOperator,
SnowflakeValueCheckOperator,
SnowflakeIntervalCheckOperator
)
# Basic data quality check
data_freshness_check = SnowflakeCheckOperator(
task_id='check_data_freshness',
snowflake_conn_id='snowflake_prod',
sql='''
SELECT COUNT(*) > 0
FROM raw.transactions
WHERE date_trunc('day', transaction_date) = '{{ ds }}'
''',
warehouse='ANALYTICS_WH'
)
# Value validation with tolerance
revenue_check = SnowflakeValueCheckOperator(
task_id='validate_daily_revenue',
snowflake_conn_id='snowflake_prod',
sql='''
SELECT SUM(amount)
FROM raw.transactions
WHERE date_trunc('day', transaction_date) = '{{ ds }}'
''',
pass_value=50000, # Expected daily revenue
tolerance=0.1, # Accept results within ±10% of pass_value
warehouse='ANALYTICS_WH'
)
# Time series anomaly detection
anomaly_check = SnowflakeIntervalCheckOperator(
task_id='detect_sales_anomalies',
snowflake_conn_id='snowflake_prod',
table='analytics.daily_sales_summary',
metrics_thresholds={
'total_sales': 1.25, # current vs. 7-days-ago ratio must stay under 1.25 (~±20%)
'order_count': 1.43 # current vs. 7-days-ago ratio must stay under 1.43 (~±30%)
},
date_filter_column='sale_date',
days_back=-7, # Compare with same day last week
warehouse='ANALYTICS_WH'
)

from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator
# Using parameter bindings
parameterized_query = SnowflakeSqlApiOperator(
task_id='parameterized_analysis',
snowflake_conn_id='snowflake_prod',
sql='''
SELECT
region,
COUNT(*) as customer_count,
AVG(lifetime_value) as avg_ltv
FROM analytics.customer_360
WHERE lifetime_value >= ?
AND last_order_date >= ?
GROUP BY region
ORDER BY avg_ltv DESC;
''',
statement_count=1,
# Snowflake SQL API expects each binding as a {'type': ..., 'value': ...} object
bindings={
'1': {'type': 'FIXED', 'value': '1000'}, # Minimum lifetime value
'2': {'type': 'TEXT', 'value': '2024-01-01'} # Minimum last order date
},
warehouse='ANALYTICS_WH'
)

from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator
# Custom retry configuration for API calls
robust_operation = SnowflakeSqlApiOperator(
task_id='robust_data_processing',
snowflake_conn_id='snowflake_prod',
sql='''
CREATE OR REPLACE TABLE staging.processed_data AS
SELECT * FROM raw.data_stream
WHERE processed_at IS NULL;
UPDATE raw.data_stream
SET processed_at = CURRENT_TIMESTAMP()
WHERE id IN (SELECT id FROM staging.processed_data);
''',
statement_count=2,
snowflake_api_retry_args={
'retries': 3,
'backoff_factor': 2,
'status_forcelist': [500, 502, 503, 504]
},
# Airflow task retry configuration
retries=2,
retry_delay=timedelta(minutes=5),
warehouse='PROCESSING_WH'
)

All operators support Airflow's template variables and macros:

{{ ds }}: Execution date as YYYY-MM-DD
{{ ts }}: Execution timestamp
{{ next_ds }}: Next execution date
{{ params }}: User-defined parameters
{{ var.value.variable_name }}: Airflow variables

# Use appropriate warehouse sizes
large_compute_task = SnowflakeSqlApiOperator(
task_id='heavy_processing',
warehouse='X_LARGE_WH', # Scale up for heavy workloads
sql='SELECT * FROM huge_table_join_operation',
statement_count=1
)
# Auto-suspend warehouses after use
cleanup_task = SnowflakeSqlApiOperator(
task_id='suspend_warehouse',
sql='ALTER WAREHOUSE X_LARGE_WH SUSPEND',
statement_count=1
)
large_compute_task >> cleanup_task

Use connection pooling for high-frequency operations by reusing connections across tasks in the same worker process.
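One way to apply this, sketched with a shared snowflake_prod connection and illustrative maintenance SQL: point every task at the same connection ID so credentials and session settings are defined once, and batch high-frequency statements into a single SnowflakeSqlApiOperator call rather than many short tasks.

from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

# Both maintenance statements run through one API call and one shared connection
# definition instead of two separate tasks each opening their own session.
batched_maintenance = SnowflakeSqlApiOperator(
    task_id='batched_maintenance',
    snowflake_conn_id='snowflake_prod',  # single shared connection definition
    sql='''
    DELETE FROM staging.daily_sales WHERE sale_date < DATEADD(day, -30, CURRENT_DATE());
    DELETE FROM staging.daily_orders WHERE order_date < DATEADD(day, -30, CURRENT_DATE());
    ''',
    statement_count=2,
    warehouse='ANALYTICS_WH',
)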
All operators provide comprehensive error handling with detailed exception information: error messages include Snowflake-specific error codes and suggestions for resolution.
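These failures surface like any other Airflow task error, so standard task-level handling applies. A minimal sketch, with an illustrative callback name:

from datetime import timedelta

from airflow.providers.snowflake.operators.snowflake import SnowflakeCheckOperator


def notify_on_failure(context):
    # context['exception'] carries the error raised by the operator,
    # including the Snowflake error details described above.
    task_id = context['task_instance'].task_id
    print(f"Task {task_id} failed: {context.get('exception')}")


guarded_check = SnowflakeCheckOperator(
    task_id='guarded_freshness_check',
    snowflake_conn_id='snowflake_prod',
    sql='SELECT COUNT(*) > 0 FROM raw.transactions',
    retries=2,                         # retry transient failures
    retry_delay=timedelta(minutes=2),
    on_failure_callback=notify_on_failure,
)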
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-snowflake