Provider package apache-airflow-providers-snowflake for Apache Airflow
npx @tessl/cli install tessl/pypi-apache-airflow-providers-snowflake@6.5.0

A comprehensive provider package for integrating Apache Airflow with Snowflake, the cloud data warehouse platform. This provider enables complete data pipeline orchestration with Snowflake, including database operations, Snowpark integration, data transfers from cloud storage, and asynchronous execution patterns.
pip install apache-airflow-providers-snowflake

from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.providers.snowflake.hooks.snowflake_sql_api import SnowflakeSqlApiHook
from airflow.providers.snowflake.operators.snowflake import (
SnowflakeCheckOperator,
SnowflakeValueCheckOperator,
SnowflakeIntervalCheckOperator,
SnowflakeSqlApiOperator
)
from airflow.providers.snowflake.operators.snowpark import SnowparkOperator
from airflow.providers.snowflake.decorators.snowpark import snowpark_task
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator
from airflow.providers.snowflake.triggers.snowflake_trigger import SnowflakeSqlApiTrigger

from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
def process_data(**context):
    hook = SnowflakeHook(snowflake_conn_id='my_snowflake_conn')
    # Execute SQL query and fetch all rows via the result handler
    result = hook.run(
        sql="SELECT * FROM sales WHERE date >= '2024-01-01'",
        handler=lambda cursor: cursor.fetchall()
    )
    return result

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator
from datetime import datetime
with DAG('snowflake_example', start_date=datetime(2024, 1, 1)) as dag:
    create_table = SnowflakeSqlApiOperator(
        task_id='create_sales_table',
        snowflake_conn_id='snowflake_default',
        sql='''
            CREATE TABLE IF NOT EXISTS sales (
                id INT,
                amount DECIMAL(10,2),
                date DATE
            )
        ''',
        statement_count=1
    )

from airflow.providers.snowflake.decorators.snowpark import snowpark_task
@snowpark_task
def process_with_snowpark(session):
    # Snowpark session is automatically injected
    df = session.sql("SELECT * FROM raw_data")
    # Transform data using the Snowpark DataFrame API
    transformed_df = df.filter(df.col("status") == "active")
    # Write back to Snowflake
    transformed_df.write.save_as_table("processed_data", mode="overwrite")
    return transformed_df.count()
The provider supports both traditional SQL execution patterns and modern Snowpark Python workflows, enabling comprehensive data engineering pipelines within Apache Airflow's orchestration framework. It is organized into several key components:
Database Connections and Hooks

Core connectivity layer providing both standard database connections and Snowflake SQL API integration. Supports multiple authentication methods, connection pooling, and session management.
class SnowflakeHook(DbApiHook):
    def __init__(self, snowflake_conn_id: str = "snowflake_default", **kwargs): ...
    def get_conn(self) -> SnowflakeConnection: ...
    def run(self, sql: str | Iterable[str], **kwargs): ...
    def get_snowpark_session(self): ...

class SnowflakeSqlApiHook(SnowflakeHook):
    def __init__(self, snowflake_conn_id: str, **kwargs): ...
    def execute_query(self, sql: str, statement_count: int, **kwargs) -> list[str]: ...
    def wait_for_query(self, query_id: str, **kwargs) -> dict[str, str | list[str]]: ...
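For illustration, the SQL API hook can submit a multi-statement request and poll it to completion. A minimal sketch based on the hook methods listed above, assuming a configured my_snowflake_conn connection and placeholder table names:

from airflow.providers.snowflake.hooks.snowflake_sql_api import SnowflakeSqlApiHook

def refresh_sales(**context):
    hook = SnowflakeSqlApiHook(snowflake_conn_id="my_snowflake_conn")
    # Submit two statements in one request; the SQL API returns one query id per statement
    query_ids = hook.execute_query(
        sql="TRUNCATE TABLE sales_staging; INSERT INTO sales_staging SELECT * FROM raw_sales;",
        statement_count=2,
    )
    # Block until each statement reaches a terminal state
    for query_id in query_ids:
        hook.wait_for_query(query_id)
    return query_ids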
SQL Operators and Data Quality

Task operators for executing SQL commands, performing data quality checks, and managing database operations with built-in validation and monitoring capabilities.
class SnowflakeSqlApiOperator(SQLExecuteQueryOperator):
    def __init__(self, *, snowflake_conn_id: str = "snowflake_default", **kwargs): ...

class SnowflakeCheckOperator(SQLCheckOperator):
    def __init__(self, *, sql: str, snowflake_conn_id: str = "snowflake_default", **kwargs): ...

class SnowflakeValueCheckOperator(SQLValueCheckOperator):
    def __init__(self, *, sql: str, pass_value: Any, **kwargs): ...
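As an example, a value check can guard a pipeline against unexpected row counts. A sketch with placeholder thresholds:

from airflow.providers.snowflake.operators.snowflake import SnowflakeValueCheckOperator

# Fail the task if the observed count deviates from pass_value by more than the tolerance (10%)
validate_row_count = SnowflakeValueCheckOperator(
    task_id="validate_row_count",
    snowflake_conn_id="snowflake_default",
    sql="SELECT COUNT(*) FROM sales WHERE date >= '2024-01-01'",
    pass_value=10000,
    tolerance=0.1,
)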
Snowpark Integration

Native Snowpark Python integration enabling DataFrame-based data processing workflows directly within Airflow tasks with automatic session management.
class SnowparkOperator(PythonOperator):
    def __init__(self, *, python_callable: Callable, snowflake_conn_id: str = "snowflake_default", **kwargs): ...

def snowpark_task(python_callable: Callable | None = None, **kwargs) -> TaskDecorator: ...
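The operator form mirrors the decorator shown earlier: the callable declares a session parameter and the operator supplies the Snowpark session for it. A minimal sketch with hypothetical table names:

from airflow.providers.snowflake.operators.snowpark import SnowparkOperator

def aggregate_orders(session):
    # The Snowpark session is injected because the callable declares a 'session' parameter
    counts = session.table("orders").group_by("customer_id").count()
    counts.write.save_as_table("orders_by_customer", mode="overwrite")

aggregate = SnowparkOperator(
    task_id="aggregate_orders",
    snowflake_conn_id="snowflake_default",
    python_callable=aggregate_orders,
)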
Data Transfers from Cloud Storage

Specialized operators for efficient bulk data loading from cloud storage services (S3, GCS, Azure Blob) into Snowflake using COPY INTO operations.

class CopyFromExternalStageToSnowflakeOperator(BaseOperator):
    def __init__(
        self, *,
        table: str,
        stage: str,
        file_format: str,
        snowflake_conn_id: str = "snowflake_default",
        **kwargs
    ): ...
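For example, files already staged in cloud storage can be bulk loaded with a single task. A sketch assuming an existing external stage named my_s3_stage and a CSV file format:

from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator

load_sales = CopyFromExternalStageToSnowflakeOperator(
    task_id="load_sales_from_stage",
    snowflake_conn_id="snowflake_default",
    table="sales",
    stage="my_s3_stage",
    file_format="(TYPE = 'CSV', FIELD_DELIMITER = ',', SKIP_HEADER = 1)",
    pattern=".*sales.*[.]csv",
)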
Triggers and Deferrable Execution

Deferrable task execution through triggers, enabling efficient resource utilization for long-running Snowflake operations without blocking worker slots.

class SnowflakeSqlApiTrigger(BaseTrigger):
    def __init__(
        self,
        poll_interval: float,
        query_ids: list[str],
        snowflake_conn_id: str,
        **kwargs
    ): ...
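Deferral is normally driven from the operator side rather than by instantiating the trigger directly: with deferrable=True, the SQL API operator hands the wait over to SnowflakeSqlApiTrigger. A sketch assuming the operator's deferrable and poll_interval options and placeholder SQL:

from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

refresh_marts = SnowflakeSqlApiOperator(
    task_id="refresh_marts",
    snowflake_conn_id="snowflake_default",
    sql="CALL refresh_all_marts()",
    statement_count=1,
    deferrable=True,     # release the worker slot while Snowflake executes the call
    poll_interval=30,    # seconds between status checks while deferred
)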
Utilities

Helper functions for parameter formatting, authentication token management, OpenLineage integration, and Snowpark session injection.

def enclose_param(param: str) -> str: ...
def inject_session_into_op_kwargs(python_callable: Callable, op_kwargs: dict, session: Session | None) -> dict: ...

class JWTGenerator:
    def __init__(self, account: str, user: str, private_key: Any, **kwargs): ...
    def get_token(self) -> str | None: ...
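A small illustration of the parameter-formatting helper, assuming it follows Snowflake's single-quote escaping rules; the import path shown is where the helper lives in recent provider versions and should be treated as illustrative:

from airflow.providers.snowflake.utils.common import enclose_param

# Wrap a raw value in single quotes, doubling embedded quotes so it can be spliced into SQL safely
stage_path = enclose_param("s3://bucket/loads/o'clock/")
# expected result: 's3://bucket/loads/o''clock/' (including the outer quotes)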
Connection Configuration

The provider uses Airflow connections with connection type snowflake. Key connection parameters include warehouse, database, role, and authenticator, typically supplied through the connection's extra fields.

All operators and hooks provide comprehensive error handling with detailed exception information. Common exceptions include connection timeouts, authentication failures, and SQL execution errors that carry specific Snowflake error codes and messages.
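As an illustration of the connection setup described above, a connection can be created programmatically and exported as a URI; every value below is a placeholder:

import json
from airflow.models.connection import Connection

snowflake_conn = Connection(
    conn_id="my_snowflake_conn",
    conn_type="snowflake",
    login="AIRFLOW_USER",
    password="change-me",
    schema="PUBLIC",
    extra=json.dumps({
        "account": "xy12345.eu-west-1",
        "warehouse": "COMPUTE_WH",
        "database": "ANALYTICS",
        "role": "AIRFLOW_ROLE",
        "authenticator": "snowflake",
    }),
)
# The URI form can be stored in an AIRFLOW_CONN_MY_SNOWFLAKE_CONN environment variable
print(snowflake_conn.get_uri())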