Apache Airflow Snowflake Provider

A provider package for integrating Apache Airflow with Snowflake, the cloud data warehouse platform. It enables end-to-end data pipeline orchestration with Snowflake, including database operations, Snowpark integration, data transfers from cloud storage, and asynchronous execution patterns.

Package Information

  • Package Name: apache-airflow-providers-snowflake
  • Language: Python
  • Python Version: Requires >=3.10, <3.14
  • Installation: pip install apache-airflow-providers-snowflake
  • Apache Airflow Version: Requires 2.10.0+

Core Imports

from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.providers.snowflake.hooks.snowflake_sql_api import SnowflakeSqlApiHook
from airflow.providers.snowflake.operators.snowflake import (
    SnowflakeCheckOperator,
    SnowflakeValueCheckOperator,
    SnowflakeIntervalCheckOperator,
    SnowflakeSqlApiOperator
)
from airflow.providers.snowflake.operators.snowpark import SnowparkOperator
from airflow.providers.snowflake.decorators.snowpark import snowpark_task
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator
from airflow.providers.snowflake.triggers.snowflake_trigger import SnowflakeSqlApiTrigger

Basic Usage

Simple SQL Execution with Hook

from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

def process_data(**context):
    hook = SnowflakeHook(snowflake_conn_id='my_snowflake_conn')
    
    # Execute SQL query
    result = hook.run(
        sql="SELECT * FROM sales WHERE date >= '2024-01-01'",
        handler=lambda cursor: cursor.fetchall()
    )
    
    return result

Basic Operator Usage

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator
from datetime import datetime

with DAG('snowflake_example', start_date=datetime(2024, 1, 1)) as dag:
    
    create_table = SnowflakeSqlApiOperator(
        task_id='create_sales_table',
        snowflake_conn_id='snowflake_default',
        sql='''
            CREATE TABLE IF NOT EXISTS sales (
                id INT,
                amount DECIMAL(10,2),
                date DATE
            )
        ''',
        statement_count=1
    )

Snowpark Integration

from airflow.providers.snowflake.decorators.snowpark import snowpark_task

@snowpark_task
def process_with_snowpark(session):
    # Snowpark session is automatically injected
    df = session.sql("SELECT * FROM raw_data")
    
    # Transform data using Snowpark DataFrame API
    transformed_df = df.filter(df.col("status") == "active")
    
    # Write back to Snowflake
    transformed_df.write.save_as_table("processed_data", mode="overwrite")
    
    return transformed_df.count()

Architecture

The provider is organized into several key components:

  • Hooks: Core connection and execution layer for database operations and API interactions
  • Operators: Task implementations for common Snowflake operations and data quality checks
  • Decorators: Python task decorators for native Snowpark integration
  • Transfers: Specialized operators for bulk data loading from cloud storage
  • Triggers: Asynchronous execution support for deferrable tasks
  • Utils: Helper functions for parameter handling, authentication, and lineage tracking

The provider supports both traditional SQL execution patterns and modern Snowpark Python workflows, enabling comprehensive data engineering pipelines within Apache Airflow's orchestration framework.
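
The sketch below illustrates how these pieces can be combined in a single DAG, chaining a SQL API operator with a Snowpark-decorated task. The DAG id, connection id, and table names are placeholders, not values the provider supplies.

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator
from airflow.providers.snowflake.decorators.snowpark import snowpark_task
from datetime import datetime

with DAG('snowflake_pipeline_sketch', start_date=datetime(2024, 1, 1)) as dag:

    # Traditional SQL step: stage raw rows with a single SQL API statement
    load_raw = SnowflakeSqlApiOperator(
        task_id='load_raw',
        snowflake_conn_id='snowflake_default',
        sql="CREATE TABLE IF NOT EXISTS raw_data AS SELECT * FROM source_table",
        statement_count=1,
    )

    # Snowpark step: transform the staged rows with the DataFrame API
    @snowpark_task
    def transform(session):
        active = session.table("raw_data").filter("status = 'active'")
        active.write.save_as_table("processed_data", mode="overwrite")

    load_raw >> transform()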

Capabilities

Database Connections and Hooks

Core connectivity layer providing both standard database connections and Snowflake SQL API integration. Supports multiple authentication methods, connection pooling, and session management.

class SnowflakeHook(DbApiHook):
    def __init__(self, snowflake_conn_id: str = "snowflake_default", **kwargs): ...
    def get_conn(self) -> SnowflakeConnection: ...
    def run(self, sql: str | Iterable[str], **kwargs): ...
    def get_snowpark_session(self): ...

class SnowflakeSqlApiHook(SnowflakeHook):
    def __init__(self, snowflake_conn_id: str, **kwargs): ...
    def execute_query(self, sql: str, statement_count: int, **kwargs) -> list[str]: ...
    def wait_for_query(self, query_id: str, **kwargs) -> dict[str, str | list[str]]: ...
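
A usage sketch for the SQL API hook, assuming the placeholder connection id and SQL shown: submit a multi-statement request, then poll each returned query id until it completes.

from airflow.providers.snowflake.hooks.snowflake_sql_api import SnowflakeSqlApiHook

def run_via_sql_api():
    hook = SnowflakeSqlApiHook(snowflake_conn_id='my_snowflake_conn')

    # Submit both statements through the Snowflake SQL API; the hook
    # returns the query ids Snowflake assigned to them
    query_ids = hook.execute_query(
        sql="CREATE TABLE IF NOT EXISTS staging (id INT); INSERT INTO staging VALUES (1);",
        statement_count=2,
    )

    # Block until each statement reaches a terminal state
    for query_id in query_ids:
        status = hook.wait_for_query(query_id)
        print(query_id, status)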

SQL Operators and Data Quality

Task operators for executing SQL commands, performing data quality checks, and managing database operations with built-in validation and monitoring capabilities.

class SnowflakeSqlApiOperator(SQLExecuteQueryOperator):
    def __init__(self, *, snowflake_conn_id: str = "snowflake_default", **kwargs): ...

class SnowflakeCheckOperator(SQLCheckOperator):
    def __init__(self, *, sql: str, snowflake_conn_id: str = "snowflake_default", **kwargs): ...

class SnowflakeValueCheckOperator(SQLValueCheckOperator):
    def __init__(self, *, sql: str, pass_value: Any, **kwargs): ...
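
An illustrative data quality pattern built on these operators; the table name, expected row count, and tolerance are assumptions, not values the provider supplies.

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import (
    SnowflakeCheckOperator,
    SnowflakeValueCheckOperator,
)
from datetime import datetime

with DAG('snowflake_quality_checks', start_date=datetime(2024, 1, 1)) as dag:

    # Fails the task if the first row of the result is falsy (e.g. COUNT(*) = 0)
    table_not_empty = SnowflakeCheckOperator(
        task_id='sales_not_empty',
        snowflake_conn_id='snowflake_default',
        sql="SELECT COUNT(*) FROM sales",
    )

    # Fails if COUNT(*) deviates from pass_value by more than the tolerance
    row_count_near_expected = SnowflakeValueCheckOperator(
        task_id='sales_row_count',
        snowflake_conn_id='snowflake_default',
        sql="SELECT COUNT(*) FROM sales",
        pass_value=10000,
        tolerance=0.1,
    )

    table_not_empty >> row_count_near_expected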

Snowpark Integration

Native Snowpark Python integration enabling DataFrame-based data processing workflows directly within Airflow tasks with automatic session management.

class SnowparkOperator(PythonOperator):
    def __init__(self, *, python_callable: Callable, snowflake_conn_id: str = "snowflake_default", **kwargs): ...

def snowpark_task(python_callable: Callable | None = None, **kwargs) -> TaskDecorator: ...
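
A minimal sketch of SnowparkOperator with an explicitly passed callable; a Snowpark session is injected when the callable declares a session parameter. The task id, connection id, and table names are placeholders.

from airflow import DAG
from airflow.providers.snowflake.operators.snowpark import SnowparkOperator
from datetime import datetime

def aggregate_orders(session):
    # The operator injects a Snowpark session into the callable
    counts = session.table("orders").group_by("customer_id").count()
    counts.write.save_as_table("orders_per_customer", mode="overwrite")

with DAG('snowpark_operator_example', start_date=datetime(2024, 1, 1)) as dag:

    aggregate = SnowparkOperator(
        task_id='aggregate_orders',
        snowflake_conn_id='snowflake_default',
        python_callable=aggregate_orders,
    )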

Data Transfer Operations

Specialized operators for efficient bulk data loading from cloud storage services (S3, GCS, Azure Blob) into Snowflake using COPY INTO operations.

class CopyFromExternalStageToSnowflakeOperator(BaseOperator):
    def __init__(
        self, *, 
        table: str, 
        stage: str, 
        file_format: str,
        snowflake_conn_id: str = "snowflake_default",
        **kwargs
    ): ...
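
A hedged example of a COPY INTO load; it assumes an external stage named my_s3_stage has already been created in Snowflake, and the file format and file pattern shown are placeholders.

from airflow import DAG
from airflow.providers.snowflake.transfers.copy_into_snowflake import (
    CopyFromExternalStageToSnowflakeOperator,
)
from datetime import datetime

with DAG('load_from_s3_stage', start_date=datetime(2024, 1, 1)) as dag:

    # Runs COPY INTO sales FROM @my_s3_stage, matching only the given file pattern
    load_sales = CopyFromExternalStageToSnowflakeOperator(
        task_id='load_sales_from_stage',
        snowflake_conn_id='snowflake_default',
        table='sales',
        stage='my_s3_stage',
        file_format="(type = 'CSV', field_delimiter = ',', skip_header = 1)",
        pattern=r'.*sales.*\.csv',
    )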

Asynchronous Execution

Deferrable task execution through triggers, enabling efficient resource utilization for long-running Snowflake operations without blocking worker slots.

class SnowflakeSqlApiTrigger(BaseTrigger):
    def __init__(
        self,
        poll_interval: float,
        query_ids: list[str],
        snowflake_conn_id: str,
        **kwargs
    ): ...
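
The trigger is normally used indirectly: running SnowflakeSqlApiOperator with deferrable=True hands polling off to the triggerer via SnowflakeSqlApiTrigger. A sketch, with placeholder SQL and poll interval:

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator
from datetime import datetime

with DAG('long_running_snowflake', start_date=datetime(2024, 1, 1)) as dag:

    # Defers to SnowflakeSqlApiTrigger instead of holding a worker slot while waiting
    rebuild_mart = SnowflakeSqlApiOperator(
        task_id='rebuild_sales_mart',
        snowflake_conn_id='snowflake_default',
        sql="CREATE OR REPLACE TABLE sales_mart AS SELECT * FROM sales",
        statement_count=1,
        deferrable=True,
        poll_interval=30,
    )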

Utility Functions

Helper functions for parameter formatting, authentication token management, OpenLineage integration, and Snowpark session injection.

def enclose_param(param: str) -> str: ...
def inject_session_into_op_kwargs(python_callable: Callable, op_kwargs: dict, session: Session | None) -> dict: ...

class JWTGenerator:
    def __init__(self, account: str, user: str, private_key: Any, **kwargs): ...
    def get_token(self) -> str | None: ...
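
A small sketch of enclose_param, which quotes a string so it can be embedded safely in generated SQL; the import path and sample value below are assumptions based on the provider's utils layout.

from airflow.providers.snowflake.utils.common import enclose_param

# Double any embedded single quotes and wrap the value in quotes so it can be
# interpolated into generated SQL (e.g. COPY INTO file patterns)
quoted = enclose_param("O'Brien")
print(quoted)  # 'O''Brien'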

Connection Configuration

The provider uses Airflow connections with connection type snowflake. Required connection parameters:

  • Host: Snowflake account identifier
  • Login: Username
  • Password: Password or private key
  • Schema: Default schema
  • Extra: JSON with additional parameters like warehouse, database, role, authenticator
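
One way to supply these fields is a JSON-serialized connection in an environment variable (supported by Airflow 2.3+); every value below is a placeholder.

import json
import os

# Placeholder credentials; in practice this is set in the scheduler/worker
# environment or stored in a secrets backend
os.environ["AIRFLOW_CONN_MY_SNOWFLAKE_CONN"] = json.dumps({
    "conn_type": "snowflake",
    "login": "airflow_user",
    "password": "********",
    "schema": "PUBLIC",
    "extra": {
        "account": "xy12345.eu-west-1",
        "warehouse": "COMPUTE_WH",
        "database": "ANALYTICS",
        "role": "LOADER",
    },
})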

Error Handling

All operators and hooks surface detailed exception information. Common failures include connection timeouts, authentication errors, and SQL execution errors; the raised exceptions carry the specific Snowflake error codes and messages.
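
A hedged sketch of handling these failures around a hook call; the exception classes come from the underlying snowflake-connector-python package, and the query and connection id are placeholders.

from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from snowflake.connector.errors import DatabaseError, ProgrammingError

def safe_query():
    hook = SnowflakeHook(snowflake_conn_id='my_snowflake_conn')
    try:
        return hook.run(
            sql="SELECT COUNT(*) FROM sales",
            handler=lambda cursor: cursor.fetchone(),
        )
    except ProgrammingError as err:
        # SQL compilation and object-not-found errors carry Snowflake error codes
        print(f"SQL error {err.errno}: {err.msg}")
        raise
    except DatabaseError as err:
        # Connection- and authentication-level failures
        print(f"Database error: {err}")
        raise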