# Apache Airflow Snowflake Provider

A comprehensive provider package for integrating Apache Airflow with Snowflake, the cloud data warehouse platform. This provider enables complete data pipeline orchestration with Snowflake, including database operations, Snowpark integration, data transfers from cloud storage, and asynchronous execution patterns.

## Package Information

- **Package Name**: apache-airflow-providers-snowflake
- **Language**: Python
- **Python Version**: >=3.10, <3.14
- **Installation**: `pip install apache-airflow-providers-snowflake`
- **Apache Airflow Version**: 2.10.0+

## Core Imports

```python
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.providers.snowflake.hooks.snowflake_sql_api import SnowflakeSqlApiHook
from airflow.providers.snowflake.operators.snowflake import (
    SnowflakeCheckOperator,
    SnowflakeValueCheckOperator,
    SnowflakeIntervalCheckOperator,
    SnowflakeSqlApiOperator,
)
from airflow.providers.snowflake.operators.snowpark import SnowparkOperator
from airflow.providers.snowflake.decorators.snowpark import snowpark_task
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator
from airflow.providers.snowflake.triggers.snowflake_trigger import SnowflakeSqlApiTrigger
```

## Basic Usage

### Simple SQL Execution with Hook

```python
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

def process_data(**context):
    hook = SnowflakeHook(snowflake_conn_id='my_snowflake_conn')

    # Execute a SQL query and fetch all rows via the handler
    result = hook.run(
        sql="SELECT * FROM sales WHERE date >= '2024-01-01'",
        handler=lambda cursor: cursor.fetchall(),
    )

    return result
```

### Basic Operator Usage

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

with DAG('snowflake_example', start_date=datetime(2024, 1, 1)) as dag:

    create_table = SnowflakeSqlApiOperator(
        task_id='create_sales_table',
        snowflake_conn_id='snowflake_default',
        sql='''
            CREATE TABLE IF NOT EXISTS sales (
                id INT,
                amount DECIMAL(10,2),
                date DATE
            )
        ''',
        statement_count=1,
    )
```

### Snowpark Integration

```python
from airflow.providers.snowflake.decorators.snowpark import snowpark_task

@snowpark_task
def process_with_snowpark(session):
    # The Snowpark session is injected automatically
    df = session.sql("SELECT * FROM raw_data")

    # Transform data using the Snowpark DataFrame API
    transformed_df = df.filter(df.col("status") == "active")

    # Write the result back to Snowflake
    transformed_df.write.save_as_table("processed_data", mode="overwrite")

    return transformed_df.count()
```

## Architecture

The provider is organized into several key components:

- **Hooks**: Core connection and execution layer for database operations and API interactions
- **Operators**: Task implementations for common Snowflake operations and data quality checks
- **Decorators**: Python task decorators for native Snowpark integration
- **Transfers**: Specialized operators for bulk data loading from cloud storage
- **Triggers**: Asynchronous execution support for deferrable tasks
- **Utils**: Helper functions for parameter handling, authentication, and lineage tracking

The provider supports both traditional SQL execution patterns and modern Snowpark Python workflows, enabling comprehensive data engineering pipelines within Apache Airflow's orchestration framework.

## Capabilities

### Database Connections and Hooks

Core connectivity layer providing both standard database connections and Snowflake SQL API integration. Supports multiple authentication methods, connection pooling, and session management.

```python { .api }
class SnowflakeHook(DbApiHook):
    def __init__(self, snowflake_conn_id: str = "snowflake_default", **kwargs): ...
    def get_conn(self) -> SnowflakeConnection: ...
    def run(self, sql: str | Iterable[str], **kwargs): ...
    def get_snowpark_session(self): ...

class SnowflakeSqlApiHook(SnowflakeHook):
    def __init__(self, snowflake_conn_id: str, **kwargs): ...
    def execute_query(self, sql: str, statement_count: int, **kwargs) -> list[str]: ...
    def wait_for_query(self, query_id: str, **kwargs) -> dict[str, str | list[str]]: ...
```

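A minimal sketch of multi-statement execution through the SQL API hook, based on the signatures above; the connection id, table, and SQL are placeholders:

```python
from airflow.providers.snowflake.hooks.snowflake_sql_api import SnowflakeSqlApiHook

hook = SnowflakeSqlApiHook(snowflake_conn_id="snowflake_default")

# Submit two statements in one request; the SQL API returns one query id per statement.
query_ids = hook.execute_query(
    sql="CREATE TABLE IF NOT EXISTS t (id INT); INSERT INTO t VALUES (1);",
    statement_count=2,
)

# Block until each statement has finished.
for query_id in query_ids:
    hook.wait_for_query(query_id)
```
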
[Database Connections and Hooks](./hooks.md)

### SQL Operators and Data Quality

Task operators for executing SQL commands, performing data quality checks, and managing database operations with built-in validation and monitoring capabilities.

```python { .api }
class SnowflakeSqlApiOperator(SQLExecuteQueryOperator):
    def __init__(self, *, snowflake_conn_id: str = "snowflake_default", **kwargs): ...

class SnowflakeCheckOperator(SQLCheckOperator):
    def __init__(self, *, sql: str, snowflake_conn_id: str = "snowflake_default", **kwargs): ...

class SnowflakeValueCheckOperator(SQLValueCheckOperator):
    def __init__(self, *, sql: str, pass_value: Any, **kwargs): ...
```

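A sketch of the check operators in use; the `sales` table and expected counts are placeholder assumptions, and `tolerance` comes from the underlying `SQLValueCheckOperator`:

```python
from airflow.providers.snowflake.operators.snowflake import (
    SnowflakeCheckOperator,
    SnowflakeValueCheckOperator,
)

# Fails the task if the query returns no rows or any value in the first row is falsy.
check_not_empty = SnowflakeCheckOperator(
    task_id="check_sales_not_empty",
    sql="SELECT COUNT(*) FROM sales",  # hypothetical table
    snowflake_conn_id="snowflake_default",
)

# Fails the task if the result deviates from pass_value by more than the tolerance.
check_row_count = SnowflakeValueCheckOperator(
    task_id="check_expected_rows",
    sql="SELECT COUNT(*) FROM sales WHERE date = CURRENT_DATE",
    pass_value=1000,  # expected count (placeholder)
    tolerance=0.1,    # accept a 10% deviation
)
```
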
[SQL Operators and Data Quality](./operators.md)

### Snowpark Integration

Native Snowpark Python integration enabling DataFrame-based data processing workflows directly within Airflow tasks with automatic session management.

```python { .api }
class SnowparkOperator(PythonOperator):
    def __init__(self, *, python_callable: Callable, snowflake_conn_id: str = "snowflake_default", **kwargs): ...

def snowpark_task(python_callable: Callable | None = None, **kwargs) -> TaskDecorator: ...
```

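The operator form, as a sketch mirroring the decorator example earlier; the table name is a placeholder:

```python
from airflow.providers.snowflake.operators.snowpark import SnowparkOperator

def count_active_rows(session):
    # The operator injects a Snowpark session into the callable.
    df = session.table("raw_data")  # hypothetical table
    return df.filter(df["status"] == "active").count()

count_task = SnowparkOperator(
    task_id="count_active_rows",
    python_callable=count_active_rows,
    snowflake_conn_id="snowflake_default",
)
```
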
[Snowpark Integration](./snowpark.md)

### Data Transfer Operations

Specialized operators for efficient bulk data loading from cloud storage services (S3, GCS, Azure Blob) into Snowflake using COPY INTO operations.

```python { .api }
class CopyFromExternalStageToSnowflakeOperator(BaseOperator):
    def __init__(
        self, *,
        table: str,
        stage: str,
        file_format: str,
        snowflake_conn_id: str = "snowflake_default",
        **kwargs
    ): ...
```

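A sketch of loading staged CSV files; the stage and table names are placeholders, and the stage is assumed to already point at the external bucket:

```python
from airflow.providers.snowflake.transfers.copy_into_snowflake import (
    CopyFromExternalStageToSnowflakeOperator,
)

# Runs COPY INTO sales FROM @my_s3_stage with the given inline file format.
load_sales = CopyFromExternalStageToSnowflakeOperator(
    task_id="load_sales_from_stage",
    table="sales",                                  # target table (placeholder)
    stage="my_s3_stage",                            # existing external stage (placeholder)
    file_format="(TYPE = 'CSV', SKIP_HEADER = 1)",  # inline file format definition
    snowflake_conn_id="snowflake_default",
)
```
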
[Data Transfer Operations](./transfers.md)

### Asynchronous Execution

Deferrable task execution through triggers, enabling efficient resource utilization for long-running Snowflake operations without blocking worker slots.

```python { .api }
class SnowflakeSqlApiTrigger(BaseTrigger):
    def __init__(
        self,
        poll_interval: float,
        query_ids: list[str],
        snowflake_conn_id: str,
        **kwargs
    ): ...
```

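In practice the trigger is usually reached by running `SnowflakeSqlApiOperator` in deferrable mode rather than being instantiated directly; a sketch, assuming the operator accepts `deferrable` and `poll_interval` keyword arguments:

```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

# With deferrable=True the task submits the query, releases its worker slot,
# and resumes once SnowflakeSqlApiTrigger reports the query as finished.
long_running = SnowflakeSqlApiOperator(
    task_id="long_running_transform",
    snowflake_conn_id="snowflake_default",
    sql="CALL refresh_all_aggregates()",  # hypothetical stored procedure
    statement_count=1,
    deferrable=True,
    poll_interval=30,  # seconds between SQL API status checks
)
```
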
[Asynchronous Execution](./triggers.md)

### Utility Functions

Helper functions for parameter formatting, authentication token management, OpenLineage integration, and Snowpark session injection.

```python { .api }
def enclose_param(param: str) -> str: ...
def inject_session_into_op_kwargs(python_callable: Callable, op_kwargs: dict, session: Session | None) -> dict: ...

class JWTGenerator:
    def __init__(self, account: str, user: str, private_key: Any, **kwargs): ...
    def get_token(self) -> str | None: ...
```

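For example, `enclose_param` wraps a string in single quotes and escapes embedded quotes so it can be spliced safely into generated SQL; a sketch, assuming the import path `airflow.providers.snowflake.utils.common`:

```python
from airflow.providers.snowflake.utils.common import enclose_param

# Single quotes inside the value are doubled, Snowflake's escape convention.
print(enclose_param("O'Reilly"))     # 'O''Reilly'
print(enclose_param("plain_value"))  # 'plain_value'
```
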
[Utility Functions](./utils.md)

## Connection Configuration

The provider uses Airflow connections with the connection type `snowflake`. Key connection parameters (see the example after this list):

- **Host**: Snowflake account identifier
- **Login**: Username
- **Password**: Password or private key
- **Schema**: Default schema
- **Extra**: JSON with additional parameters such as `warehouse`, `database`, `role`, and `authenticator`

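A sketch of defining such a connection through an environment variable using Airflow's JSON connection format; every value below is a placeholder:

```python
import json
import os

# Airflow resolves connections from AIRFLOW_CONN_<CONN_ID> environment variables.
os.environ["AIRFLOW_CONN_SNOWFLAKE_DEFAULT"] = json.dumps(
    {
        "conn_type": "snowflake",
        "login": "my_user",
        "password": "my_password",
        "schema": "public",
        "extra": {
            "account": "abc12345",
            "warehouse": "compute_wh",
            "database": "analytics",
            "role": "analyst",
        },
    }
)
```
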
## Error Handling

All operators and hooks surface detailed exception information. Common failure modes include connection timeouts, authentication failures, and SQL execution errors, each reported with the specific Snowflake error code and message, as in the sketch below.

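A sketch of catching the underlying connector exceptions around a hook call; the table name is a placeholder:

```python
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from snowflake.connector.errors import DatabaseError, ProgrammingError

hook = SnowflakeHook(snowflake_conn_id="snowflake_default")

try:
    hook.run("SELECT * FROM nonexistent_table")  # hypothetical table
except ProgrammingError as err:
    # SQL compilation and execution errors carry a Snowflake errno and message.
    print(f"SQL error {err.errno}: {err.msg}")
except DatabaseError as err:
    # Broader category covering connection and authentication failures.
    print(f"Database error: {err}")
```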