
# Apache Airflow Snowflake Provider

A comprehensive provider package for integrating Apache Airflow with Snowflake, the cloud data warehouse platform. This provider enables complete data pipeline orchestration with Snowflake, including database operations, Snowpark integration, data transfers from cloud storage, and asynchronous execution patterns.

## Package Information

- **Package Name**: apache-airflow-providers-snowflake
- **Language**: Python
- **Python Version**: Requires >=3.10, <3.14
- **Installation**: `pip install apache-airflow-providers-snowflake`
- **Apache Airflow Version**: Requires 2.10.0+

## Core Imports

```python
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.providers.snowflake.hooks.snowflake_sql_api import SnowflakeSqlApiHook
from airflow.providers.snowflake.operators.snowflake import (
    SnowflakeCheckOperator,
    SnowflakeValueCheckOperator,
    SnowflakeIntervalCheckOperator,
    SnowflakeSqlApiOperator,
)
from airflow.providers.snowflake.operators.snowpark import SnowparkOperator
from airflow.providers.snowflake.decorators.snowpark import snowpark_task
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator
from airflow.providers.snowflake.triggers.snowflake_trigger import SnowflakeSqlApiTrigger
```

## Basic Usage

### Simple SQL Execution with Hook

```python
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

def process_data(**context):
    hook = SnowflakeHook(snowflake_conn_id='my_snowflake_conn')

    # Execute SQL query
    result = hook.run(
        sql="SELECT * FROM sales WHERE date >= '2024-01-01'",
        handler=lambda cursor: cursor.fetchall(),
    )

    return result
```

### Basic Operator Usage

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

with DAG('snowflake_example', start_date=datetime(2024, 1, 1)) as dag:

    create_table = SnowflakeSqlApiOperator(
        task_id='create_sales_table',
        snowflake_conn_id='snowflake_default',
        sql='''
            CREATE TABLE IF NOT EXISTS sales (
                id INT,
                amount DECIMAL(10,2),
                date DATE
            )
        ''',
        statement_count=1,
    )
```

### Snowpark Integration

```python
from airflow.providers.snowflake.decorators.snowpark import snowpark_task

@snowpark_task
def process_with_snowpark(session):
    # Snowpark session is automatically injected
    df = session.sql("SELECT * FROM raw_data")

    # Transform data using Snowpark DataFrame API
    transformed_df = df.filter(df.col("status") == "active")

    # Write back to Snowflake
    transformed_df.write.save_as_table("processed_data", mode="overwrite")

    return transformed_df.count()
```

## Architecture

The provider is organized into several key components:

- **Hooks**: Core connection and execution layer for database operations and API interactions
- **Operators**: Task implementations for common Snowflake operations and data quality checks
- **Decorators**: Python task decorators for native Snowpark integration
- **Transfers**: Specialized operators for bulk data loading from cloud storage
- **Triggers**: Asynchronous execution support for deferrable tasks
- **Utils**: Helper functions for parameter handling, authentication, and lineage tracking

The provider supports both traditional SQL execution patterns and modern Snowpark Python workflows, enabling comprehensive data engineering pipelines within Apache Airflow's orchestration framework.

## Capabilities

### Database Connections and Hooks

Core connectivity layer providing both standard database connections and Snowflake SQL API integration. Supports multiple authentication methods, connection pooling, and session management.

```python { .api }
class SnowflakeHook(DbApiHook):
    def __init__(self, snowflake_conn_id: str = "snowflake_default", **kwargs): ...
    def get_conn(self) -> SnowflakeConnection: ...
    def run(self, sql: str | Iterable[str], **kwargs): ...
    def get_snowpark_session(self): ...

class SnowflakeSqlApiHook(SnowflakeHook):
    def __init__(self, snowflake_conn_id: str, **kwargs): ...
    def execute_query(self, sql: str, statement_count: int, **kwargs) -> list[str]: ...
    def wait_for_query(self, query_id: str, **kwargs) -> dict[str, str | list[str]]: ...
```

[Database Connections and Hooks](./hooks.md)

### SQL Operators and Data Quality

Task operators for executing SQL commands, performing data quality checks, and managing database operations with built-in validation and monitoring capabilities.

```python { .api }
class SnowflakeSqlApiOperator(SQLExecuteQueryOperator):
    def __init__(self, *, snowflake_conn_id: str = "snowflake_default", **kwargs): ...

class SnowflakeCheckOperator(SQLCheckOperator):
    def __init__(self, *, sql: str, snowflake_conn_id: str = "snowflake_default", **kwargs): ...

class SnowflakeValueCheckOperator(SQLValueCheckOperator):
    def __init__(self, *, sql: str, pass_value: Any, **kwargs): ...
```

[SQL Operators and Data Quality](./operators.md)

### Snowpark Integration

Native Snowpark Python integration enabling DataFrame-based data processing workflows directly within Airflow tasks with automatic session management.

```python { .api }
class SnowparkOperator(PythonOperator):
    def __init__(self, *, python_callable: Callable, snowflake_conn_id: str = "snowflake_default", **kwargs): ...

def snowpark_task(python_callable: Callable | None = None, **kwargs) -> TaskDecorator: ...
```

[Snowpark Integration](./snowpark.md)

### Data Transfer Operations

Specialized operators for efficient bulk data loading from cloud storage services (S3, GCS, Azure Blob) into Snowflake using COPY INTO operations.

```python { .api }
class CopyFromExternalStageToSnowflakeOperator(BaseOperator):
    def __init__(
        self, *,
        table: str,
        stage: str,
        file_format: str,
        snowflake_conn_id: str = "snowflake_default",
        **kwargs
    ): ...
```

[Data Transfer Operations](./transfers.md)
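
Conceptually, the operator assembles a `COPY INTO` statement from its arguments. The sketch below is a hypothetical illustration of that assembly, not the provider's actual code; `build_copy_into` and all table/stage names are placeholders, and the real operator supports additional options such as file lists, patterns, and copy options.

```python
# Hypothetical sketch of how a COPY INTO statement is assembled from the
# operator's arguments; names are placeholders, not the provider's code.
def build_copy_into(table: str, stage: str, file_format: str, prefix: str = "") -> str:
    return (
        f"COPY INTO {table} "
        f"FROM @{stage}/{prefix} "
        f"FILE_FORMAT=({file_format})"
    )

sql = build_copy_into("sales", "my_s3_stage", "TYPE=CSV", prefix="2024/")
print(sql)
# COPY INTO sales FROM @my_s3_stage/2024/ FILE_FORMAT=(TYPE=CSV)
```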

### Asynchronous Execution

Deferrable task execution through triggers, enabling efficient resource utilization for long-running Snowflake operations without blocking worker slots.

```python { .api }
class SnowflakeSqlApiTrigger(BaseTrigger):
    def __init__(
        self,
        poll_interval: float,
        query_ids: list[str],
        snowflake_conn_id: str,
        **kwargs
    ): ...
```

[Asynchronous Execution](./triggers.md)
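
Conceptually, the trigger polls each query's status at `poll_interval` until all queries leave the running state. The self-contained asyncio sketch below illustrates that polling loop under stated assumptions; `poll_queries` and the `get_status` callable are stand-ins for the trigger's internal SQL API status checks, not the provider's actual implementation.

```python
import asyncio

async def poll_queries(query_ids, get_status, poll_interval=0.01):
    """Poll each query until it leaves the 'running' state, then report statuses."""
    pending = set(query_ids)
    statuses = {}
    while pending:
        for qid in list(pending):
            status = get_status(qid)  # stand-in for a SQL API status call
            if status != "running":
                statuses[qid] = status
                pending.discard(qid)
        if pending:
            # Sleeping yields the event loop, which is what frees the worker
            # slot in the real deferrable-execution model.
            await asyncio.sleep(poll_interval)
    return statuses

# Stand-in status function: every query reports success immediately.
print(sorted(asyncio.run(poll_queries(["q1", "q2"], lambda qid: "success")).items()))
# [('q1', 'success'), ('q2', 'success')]
```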

### Utility Functions

Helper functions for parameter formatting, authentication token management, OpenLineage integration, and Snowpark session injection.

```python { .api }
def enclose_param(param: str) -> str: ...
def inject_session_into_op_kwargs(python_callable: Callable, op_kwargs: dict, session: Session | None) -> dict: ...

class JWTGenerator:
    def __init__(self, account: str, user: str, private_key: Any, **kwargs): ...
    def get_token(self) -> str | None: ...
```

[Utility Functions](./utils.md)
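
For illustration, quoting a string parameter for safe embedding in SQL follows standard single-quote doubling. The stand-in implementation below is an assumption about what `enclose_param` does, based on its description above; the provider's own version may differ in details.

```python
# Stand-in sketch of enclose_param: wrap a value in single quotes,
# doubling any embedded quotes (standard SQL string escaping).
def enclose_param(param: str) -> str:
    escaped = param.replace("'", "''")
    return f"'{escaped}'"

print(enclose_param("O'Reilly"))  # 'O''Reilly'
```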

## Connection Configuration

The provider uses Airflow connections with connection type `snowflake`. Required connection parameters:

- **Host**: Snowflake account identifier
- **Login**: Username
- **Password**: Password or private key
- **Schema**: Default schema
- **Extra**: JSON with additional parameters like `warehouse`, `database`, `role`, `authenticator`
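
As a hedged example, recent Airflow versions (2.3+) accept connections defined as JSON in an `AIRFLOW_CONN_<CONN_ID>` environment variable; all credential values below are placeholders, and the exact accepted shape should be checked against your Airflow version's connection documentation.

```python
import json
import os

# Placeholder credentials; replace with your own values. Airflow 2.3+ can
# parse AIRFLOW_CONN_* environment variables given in JSON form.
conn = {
    "conn_type": "snowflake",
    "login": "my_user",
    "password": "my_password",
    "schema": "my_schema",
    "extra": {
        "account": "my_account",
        "warehouse": "my_warehouse",
        "database": "my_database",
        "role": "my_role",
    },
}
os.environ["AIRFLOW_CONN_SNOWFLAKE_DEFAULT"] = json.dumps(conn)
```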

## Error Handling

All operators and hooks provide comprehensive error handling with detailed exception information. Common exceptions include connection timeouts, authentication failures, and SQL execution errors with specific Snowflake error codes and messages.