# Snowpark Integration

Native Snowpark Python integration that enables DataFrame-based data processing workflows directly within Airflow tasks. It provides automatic Snowpark session management, seamless integration with Airflow's task execution model, and Python-based data transformations that run directly in Snowflake's compute environment.

## Capabilities

### Snowpark Operator

Operator for executing Python functions with Snowpark integration, automatically injecting a configured Snowpark session into the callable function.

```python { .api }
class SnowparkOperator(PythonOperator):
    """
    Execute a Python callable containing Snowpark code.
    Automatically injects a Snowpark session configured with connection parameters.
    """

    def __init__(
        self,
        *,
        snowflake_conn_id: str = "snowflake_default",
        python_callable: Callable,
        op_args: Collection[Any] | None = None,
        op_kwargs: Mapping[str, Any] | None = None,
        templates_dict: dict[str, Any] | None = None,
        templates_exts: Sequence[str] | None = None,
        show_return_value_in_logs: bool = True,
        warehouse: str | None = None,
        database: str | None = None,
        schema: str | None = None,
        role: str | None = None,
        authenticator: str | None = None,
        session_parameters: dict | None = None,
        **kwargs,
    ):
        """
        Initialize Snowpark operator.

        Parameters:
        - snowflake_conn_id: Snowflake connection ID
        - python_callable: Python function to execute with a Snowpark session
        - op_args: Positional arguments for python_callable
        - op_kwargs: Keyword arguments for python_callable
        - templates_dict: Dictionary of templates for Jinja templating
        - templates_exts: File extensions to apply Jinja templating to
        - show_return_value_in_logs: Show the function return value in logs
        - warehouse: Snowflake warehouse name
        - database: Snowflake database name
        - schema: Snowflake schema name
        - role: Snowflake role name
        - authenticator: Authentication method
        - session_parameters: Session-level parameters
        """
```
#### Execution Method

```python { .api }
def execute_callable(self):
    """
    Execute the callable with Snowpark session injection.
    Automatically provides the 'session' parameter to the callable if it is defined in the signature.

    Returns:
        Result of python_callable execution
    """
```
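
The injection is conceptually a signature check: if the wrapped callable declares a `session` parameter, the operator passes its Snowpark session as that keyword argument; otherwise the callable runs unchanged. The snippet below is a minimal sketch of that idea, not the provider's actual implementation; `call_with_optional_session` and `row_count` are illustrative names.

```python
import inspect
from typing import Any, Callable


def call_with_optional_session(func: Callable[..., Any], session: Any, *args: Any, **kwargs: Any) -> Any:
    """Pass `session` to `func` only when its signature declares a 'session' parameter."""
    if "session" in inspect.signature(func).parameters:
        kwargs = {**kwargs, "session": session}
    return func(*args, **kwargs)


def row_count(session, table_name: str) -> int:
    # At runtime `session` would be a snowflake.snowpark.Session.
    return session.table(table_name).count()


# call_with_optional_session(row_count, snowpark_session, table_name="raw.sales_transactions")
```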

### Task Decorator

Decorator function for converting regular Python functions into Snowpark-enabled Airflow tasks with automatic session management.

```python { .api }
def snowpark_task(
    python_callable: Callable | None = None,
    multiple_outputs: bool | None = None,
    **kwargs,
) -> TaskDecorator:
    """
    Decorator to wrap a function containing Snowpark code into an Airflow operator.

    Parameters:
    - python_callable: Function to be decorated (auto-provided when used as a decorator)
    - multiple_outputs: Enable multiple outputs for XCom
    - **kwargs: Additional arguments passed to SnowparkOperator

    Returns:
        TaskDecorator for creating Snowpark tasks
    """
```

### Internal Decorator Class

```python { .api }
class _SnowparkDecoratedOperator(DecoratedOperator, SnowparkOperator):
    """
    Internal decorated operator for Snowpark tasks.
    Combines DecoratedOperator functionality with Snowpark session management.
    """

    custom_operator_name = "@task.snowpark"
```
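
Because `custom_operator_name` is `@task.snowpark`, the decorator is typically also reachable through Airflow's `task` namespace once the provider is installed. The sketch below assumes your provider version registers the decorator there and uses `COMPUTE_WH` as a placeholder warehouse; if the attribute is not available in your environment, import `snowpark_task` directly as shown in the usage examples below.

```python
from airflow.decorators import task


@task.snowpark(snowflake_conn_id="snowflake_default", warehouse="COMPUTE_WH")
def count_transactions(session):
    # `session` is the injected Snowpark session.
    return session.table("raw.sales_transactions").count()
```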

## Usage Examples

### Basic Snowpark Operator Usage

```python
from airflow import DAG
from airflow.providers.snowflake.operators.snowpark import SnowparkOperator
from datetime import datetime


def process_sales_data(session, **context):
    """
    Process sales data using the Snowpark DataFrame API.

    Args:
        session: Snowpark session (automatically injected)
        **context: Airflow context variables
    """
    # Read data using Snowpark
    raw_sales = session.table("raw.sales_transactions")

    # Transform data using the DataFrame API
    daily_sales = (
        raw_sales
        .filter(raw_sales.col("transaction_date") >= context['ds'])
        .filter(raw_sales.col("transaction_date") < context['next_ds'])
        .group_by("region", "product_category")
        .agg({
            "amount": "sum",
            "transaction_id": "count"
        })
        .with_column_renamed("SUM(AMOUNT)", "total_sales")
        .with_column_renamed("COUNT(TRANSACTION_ID)", "transaction_count")
    )

    # Write results back to Snowflake
    daily_sales.write.save_as_table(
        "analytics.daily_sales_summary",
        mode="append"
    )

    # Return metrics for downstream tasks
    total_records = daily_sales.count()
    return {"processed_records": total_records}


with DAG(
    'snowpark_processing_example',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False
) as dag:

    process_data = SnowparkOperator(
        task_id='process_daily_sales',
        snowflake_conn_id='snowflake_prod',
        python_callable=process_sales_data,
        warehouse='ANALYTICS_WH',
        database='ANALYTICS_DB',
        schema='PUBLIC'
    )
```
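
Because `SnowparkOperator` extends `PythonOperator`, the dictionary returned by `process_sales_data` is pushed to XCom for downstream tasks and, with `show_return_value_in_logs` left at its default of `True`, is also printed in the task log.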

### Task Decorator Usage

```python
from airflow import DAG
from airflow.providers.snowflake.decorators.snowpark import snowpark_task
from datetime import datetime
from snowflake.snowpark.functions import col, when, datediff, current_date


@snowpark_task(
    snowflake_conn_id='snowflake_prod',
    warehouse='ML_WH',
    database='FEATURE_STORE'
)
def create_ml_features(session, **context):
    """
    Create machine learning features using Snowpark.

    Args:
        session: Snowpark session (automatically injected)
    """
    # Load base tables
    customers = session.table("raw.customers")
    orders = session.table("raw.orders")

    # Create feature engineering pipeline
    # (join on the shared column name so later references to customer_id stay unambiguous)
    customer_features = (
        customers
        .join(orders, "customer_id", "left")
        .group_by("customer_id", "customer_segment", "registration_date")
        .agg({
            "order_amount": "sum",
            "order_id": "count",
            "order_date": "max"
        })
        .with_column_renamed("SUM(ORDER_AMOUNT)", "lifetime_value")
        .with_column_renamed("COUNT(ORDER_ID)", "total_orders")
        .with_column_renamed("MAX(ORDER_DATE)", "last_order_date")
    )

    # Add derived features one column at a time
    enriched_features = (
        customer_features
        .with_column(
            "customer_tier",
            when(col("total_orders") > 10, "high_value")
            .when(col("total_orders") > 5, "medium_value")
            .otherwise("low_value")
        )
        .with_column(
            "days_since_last_order",
            datediff("day", col("last_order_date"), current_date())
        )
        .with_column(
            "avg_order_value",
            col("lifetime_value") / col("total_orders")
        )
    )

    # Save feature table
    enriched_features.write.save_as_table(
        "features.customer_features_v1",
        mode="overwrite"
    )

    return enriched_features.count()


@snowpark_task(
    snowflake_conn_id='snowflake_prod',
    warehouse='ML_WH'
)
def train_model_features(session, **context):
    """
    Prepare the churn training dataset using the Snowpark DataFrame API.
    """
    # Load feature table
    features = session.table("features.customer_features_v1")

    # Prepare training data with labels
    training_data = (
        features
        .filter(col("days_since_last_order") <= 365)  # Active customers only
        .with_column(
            "will_churn",
            when(col("days_since_last_order") > 90, 1).otherwise(0)
        )
        .select([
            "customer_id",
            "lifetime_value",
            "total_orders",
            "avg_order_value",
            "days_since_last_order",
            "will_churn"
        ])
    )

    # Save training dataset
    training_data.write.save_as_table(
        "ml.churn_training_data",
        mode="overwrite"
    )

    return training_data.count()


with DAG(
    'ml_feature_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@weekly',
    catchup=False
) as dag:

    # Tasks are automatically created from decorated functions
    features_task = create_ml_features()
    training_task = train_model_features()

    features_task >> training_task
```

### Complex Data Processing Pipeline

```python
from airflow.providers.snowflake.decorators.snowpark import snowpark_task
from snowflake.snowpark.functions import col, when, sum as spark_sum, count, avg, max as spark_max


@snowpark_task(
    snowflake_conn_id='snowflake_prod',
    warehouse='ETL_WH',
    multiple_outputs=True
)
def comprehensive_etl_process(session, **context):
    """
    Comprehensive ETL process using the Snowpark DataFrame API.
    """
    execution_date = context['ds']

    # 1. Data Quality Checks
    raw_data = session.table("raw.transaction_stream")

    quality_metrics = {
        'total_records': raw_data.count(),
        'null_customer_ids': raw_data.filter(col("customer_id").is_null()).count(),
        'invalid_amounts': raw_data.filter(col("amount") <= 0).count()
    }

    # 2. Data Cleaning and Transformation
    clean_data = (
        raw_data
        .filter(col("customer_id").is_not_null())
        .filter(col("amount") > 0)
        .filter(col("transaction_date") >= execution_date)
        .with_column(
            "amount_category",
            when(col("amount") >= 1000, "high")
            .when(col("amount") >= 100, "medium")
            .otherwise("low")
        )
    )

    # 3. Aggregation and Business Logic
    customer_summary = (
        clean_data
        .group_by("customer_id", "amount_category")
        .agg({
            "amount": "sum",
            "transaction_id": "count"
        })
        .with_column_renamed("SUM(AMOUNT)", "total_spent")
        .with_column_renamed("COUNT(TRANSACTION_ID)", "transaction_count")
    )

    # 4. Advanced Analytics
    # (a pivot takes a single aggregate, so pivot on total spend per category)
    pivot_summary = customer_summary.pivot(
        "amount_category",
        ["high", "medium", "low"]
    ).sum("total_spent")

    # 5. Write Results to Multiple Tables

    # Clean transactional data
    clean_data.write.save_as_table(
        f"staging.clean_transactions_{execution_date.replace('-', '_')}",
        mode="overwrite"
    )

    # Customer summaries
    customer_summary.write.save_as_table(
        "analytics.customer_transaction_summary",
        mode="append"
    )

    # Pivot analysis
    pivot_summary.write.save_as_table(
        "analytics.spending_category_analysis",
        mode="append"
    )

    # Return comprehensive metrics
    return {
        'quality_metrics': quality_metrics,
        'processed_customers': customer_summary.select("customer_id").distinct().count(),
        'clean_records': clean_data.count(),
        'summary_records': customer_summary.count()
    }


@snowpark_task(
    snowflake_conn_id='snowflake_prod',
    warehouse='ANALYTICS_WH'
)
def generate_business_reports(session, processed_metrics, **context):
    """
    Generate business reports using processed data.

    Args:
        processed_metrics: Output from the previous Snowpark task
    """
    execution_date = context['ds']

    # Create executive summary report
    summary_data = session.sql(f"""
        SELECT
            '{execution_date}' as report_date,
            COUNT(DISTINCT customer_id) as active_customers,
            SUM(total_spent) as total_revenue,
            AVG(total_spent) as avg_customer_spend,
            SUM(transaction_count) as total_transactions
        FROM analytics.customer_transaction_summary
        WHERE DATE(created_at) = '{execution_date}'
    """)

    # Save executive dashboard data
    summary_data.write.save_as_table(
        "reports.daily_executive_summary",
        mode="append"
    )

    return {
        'report_generated': True,
        'input_metrics': processed_metrics
    }
```

### Session Configuration and Advanced Features

```python
from airflow.providers.snowflake.decorators.snowpark import snowpark_task
from snowflake.snowpark.functions import col


@snowpark_task(
    snowflake_conn_id='snowflake_prod',
    warehouse='HEAVY_COMPUTE_WH',
    session_parameters={
        'QUERY_TAG': 'airflow_snowpark_processing',
        'MULTI_STATEMENT_COUNT': 5,
        'AUTOCOMMIT': True
    }
)
def advanced_snowpark_processing(session, **context):
    """
    Advanced Snowpark processing with custom session configuration.
    """
    # Disable the result cache so every run re-executes its queries
    session.sql("ALTER SESSION SET USE_CACHED_RESULT = FALSE").collect()

    # Use Snowflake ML functions if the package is available
    try:
        # NOTE: availability and exact API of anomaly detection helpers depend on
        # the installed snowflake-ml version; treat this branch as illustrative.
        from snowflake.ml.functions import detect_anomalies
        from snowflake.snowpark import Window

        # Load time series data
        ts_data = session.table("analytics.daily_metrics")

        # Detect anomalies per metric type, ordered by date
        anomaly_window = Window.partition_by(col("metric_type")).order_by(col("date"))
        anomaly_results = ts_data.select(
            "*",
            detect_anomalies(col("metric_value")).over(anomaly_window).alias("is_anomaly")
        )

        # Save anomaly detection results
        anomaly_results.write.save_as_table(
            "ml.anomaly_detection_results",
            mode="append"
        )

    except ImportError:
        # Fallback to statistical anomaly detection in plain SQL
        session.sql("""
            CREATE OR REPLACE TABLE ml.anomaly_detection_results AS
            SELECT *,
                CASE WHEN ABS(metric_value - AVG(metric_value) OVER (
                        PARTITION BY metric_type
                        ORDER BY date
                        ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING
                    )) > 2 * STDDEV(metric_value) OVER (
                        PARTITION BY metric_type
                        ORDER BY date
                        ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING
                    ) THEN TRUE ELSE FALSE END as is_anomaly
            FROM analytics.daily_metrics
        """).collect()

    return {"anomaly_detection_completed": True}
```

## Session Management

The Snowpark integration automatically handles:

- **Session Creation**: Configured with connection parameters (see the sketch below)
- **Authentication**: Uses Airflow connection credentials
- **Resource Management**: Automatic session cleanup after task completion
- **Error Handling**: Comprehensive exception handling with cleanup
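
The following is a minimal sketch of what this lifecycle conceptually looks like: build a `snowflake.snowpark.Session` from connection properties, hand it to the task callable, and always close it afterwards. The `conn_config` dictionary is a placeholder for the parameters the operator resolves from the Airflow connection; this is not the provider's actual implementation.

```python
from typing import Any, Callable

from snowflake.snowpark import Session


def run_with_snowpark_session(callable_: Callable[..., Any], conn_config: dict, **op_kwargs: Any) -> Any:
    """Create a Snowpark session, run the callable with it, and always clean up."""
    # conn_config stands in for values from the Airflow connection,
    # e.g. account, user, password/key, role, warehouse, database, schema.
    session = Session.builder.configs(conn_config).create()
    try:
        return callable_(session=session, **op_kwargs)
    finally:
        # Resource management: close the session even if the callable raised.
        session.close()
```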

## DataFrame Operations

Snowpark provides a rich DataFrame API for data processing; a combined sketch follows the lists below.

### Data Loading
- `session.table()`: Load existing tables
- `session.sql()`: Execute SQL and return a DataFrame
- `session.read.options().csv()`: Read from staged files

### Transformations
- `filter()`: Filter rows based on conditions
- `select()`: Select specific columns
- `group_by().agg()`: Grouping and aggregation
- `join()`: Join operations between DataFrames
- `pivot()`: Pivot table operations
- `with_column()`: Add computed columns

### Actions
- `collect()`: Materialize DataFrame results
- `count()`: Count rows
- `write.save_as_table()`: Save to a Snowflake table
- `show()`: Display sample data
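
A compact sketch tying the three groups together. It reuses table names from the examples above as placeholders and assumes an already-created session, such as the one injected into a task callable.

```python
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, sum as sum_


def summarize_daily_sales(session: Session, ds: str) -> int:
    # Data loading: start from an existing table
    raw_sales = session.table("raw.sales_transactions")

    # Transformations: filter, group, aggregate, derive a column
    summary = (
        raw_sales
        .filter(col("transaction_date") == ds)
        .group_by("region")
        .agg(sum_(col("amount")).alias("total_sales"))
        .with_column("is_large_region", col("total_sales") > 100000)
    )

    # Actions: persist the result and materialize a row count
    summary.write.save_as_table("analytics.daily_sales_summary", mode="append")
    return summary.count()
```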

## Error Handling

Snowpark integration provides comprehensive error handling:

- **Connection Errors**: Session creation failures, authentication issues
- **DataFrame Errors**: Invalid operations, schema mismatches, resource limits
- **SQL Errors**: Embedded SQL execution failures within Snowpark operations
- **Resource Errors**: Warehouse capacity, memory limitations

All errors include detailed stack traces and Snowflake-specific error information for troubleshooting.
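
Inside a task callable you can also handle Snowflake errors explicitly when you want to attach context before Airflow records the failure. The sketch below catches Snowpark's `SnowparkSQLException` (raised for SQL failures surfaced through the DataFrame API); treat it as an optional pattern rather than required boilerplate.

```python
from snowflake.snowpark import Session
from snowflake.snowpark.exceptions import SnowparkSQLException


def load_with_context(session: Session, table_name: str) -> int:
    try:
        return session.table(table_name).count()
    except SnowparkSQLException as exc:
        # Re-raise with extra context; Airflow marks the task as failed
        # and the underlying Snowflake error message stays in the log.
        raise RuntimeError(f"Snowpark query against {table_name} failed") from exc
```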