# Snowpark Integration

Native Snowpark Python integration that enables DataFrame-based data processing workflows directly within Airflow tasks. It provides automatic Snowpark session management, integrates with Airflow's task execution model, and lets Python data transformations run in Snowflake's compute environment.

## Capabilities

### Snowpark Operator

Operator that executes a Python function with Snowpark integration, automatically injecting a configured Snowpark session into the callable.

```python { .api }
class SnowparkOperator(PythonOperator):
    """
    Execute a Python function containing Snowpark Python code.

    Automatically injects a Snowpark session configured with connection parameters.
    """

    def __init__(
        self,
        *,
        snowflake_conn_id: str = "snowflake_default",
        python_callable: Callable,
        op_args: Collection[Any] | None = None,
        op_kwargs: Mapping[str, Any] | None = None,
        templates_dict: dict[str, Any] | None = None,
        templates_exts: Sequence[str] | None = None,
        show_return_value_in_logs: bool = True,
        warehouse: str | None = None,
        database: str | None = None,
        schema: str | None = None,
        role: str | None = None,
        authenticator: str | None = None,
        session_parameters: dict | None = None,
        **kwargs,
    ):
        """
        Initialize the Snowpark operator.

        Parameters:
        - snowflake_conn_id: Snowflake connection ID
        - python_callable: Python function to execute with a Snowpark session
        - op_args: Positional arguments for python_callable
        - op_kwargs: Keyword arguments for python_callable
        - templates_dict: Dictionary of templates for Jinja templating
        - templates_exts: File extensions on which to apply Jinja templating
        - show_return_value_in_logs: Show the function's return value in logs
        - warehouse: Snowflake warehouse name
        - database: Snowflake database name
        - schema: Snowflake schema name
        - role: Snowflake role name
        - authenticator: Authentication method
        - session_parameters: Session-level parameters
        """
```
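
A minimal instantiation sketch (the connection ID and table name below are placeholders); a complete DAG example appears under Usage Examples:

```python
from airflow.providers.snowflake.operators.snowpark import SnowparkOperator


def row_count(session):
    # 'session' is injected because it appears in the callable's signature
    return session.table("raw.sales_transactions").count()  # placeholder table


count_rows = SnowparkOperator(
    task_id="count_rows",
    snowflake_conn_id="snowflake_default",  # placeholder connection
    python_callable=row_count,
)
```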

#### Execution Method

```python { .api }
def execute_callable(self):
    """
    Execute the callable with Snowpark session injection.

    Automatically provides 'session' parameter to callable if defined in signature.

    Returns:
        Result of python_callable execution
    """
```
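
The provider's actual implementation is not reproduced here; as a rough sketch of the idea, signature-based injection can be done with `inspect` (the helper name `_maybe_inject_session` is hypothetical):

```python
import inspect


def _maybe_inject_session(python_callable, op_kwargs, session):
    """Add 'session' to the kwargs only if the callable's signature declares it (illustrative)."""
    if "session" in inspect.signature(python_callable).parameters:
        return {**op_kwargs, "session": session}
    return op_kwargs
```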

### Task Decorator

Decorator that converts a regular Python function into a Snowpark-enabled Airflow task with automatic session management.

```python { .api }
def snowpark_task(
    python_callable: Callable | None = None,
    multiple_outputs: bool | None = None,
    **kwargs,
) -> TaskDecorator:
    """
    Decorator to wrap a function containing Snowpark code into an Airflow operator.

    Parameters:
    - python_callable: Function to be decorated (auto-provided when used as a decorator)
    - multiple_outputs: Enable multiple outputs for XCom
    - **kwargs: Additional arguments passed to SnowparkOperator

    Returns:
        TaskDecorator for creating Snowpark tasks
    """
```

### Internal Decorator Class

```python { .api }
class _SnowparkDecoratedOperator(DecoratedOperator, SnowparkOperator):
    """
    Internal decorated operator for Snowpark tasks.

    Combines DecoratedOperator functionality with Snowpark session management.
    """

    custom_operator_name = "@task.snowpark"
```
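
Because `custom_operator_name` is `@task.snowpark`, the decorator is also expected to be reachable through Airflow's `task` namespace once the provider is installed; a brief sketch (the warehouse and table names are placeholders):

```python
from airflow.decorators import task


@task.snowpark(snowflake_conn_id="snowflake_default", warehouse="COMPUTE_WH")
def count_orders(session):
    # 'session' is injected by the operator
    return session.table("raw.orders").count()  # placeholder table
```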

## Usage Examples

### Basic Snowpark Operator Usage

```python
from airflow import DAG
from airflow.providers.snowflake.operators.snowpark import SnowparkOperator
from datetime import datetime


def process_sales_data(session, **context):
    """
    Process sales data using Snowpark DataFrame API.

    Args:
        session: Snowpark session (automatically injected)
        **context: Airflow context variables
    """
    # Read data using Snowpark
    raw_sales = session.table("raw.sales_transactions")

    # Transform data using DataFrame API
    daily_sales = (
        raw_sales
        .filter(raw_sales.col("transaction_date") >= context['ds'])
        .filter(raw_sales.col("transaction_date") < context['next_ds'])
        .group_by("region", "product_category")
        .agg({
            "amount": "sum",
            "transaction_id": "count"
        })
        .with_column_renamed("SUM(AMOUNT)", "total_sales")
        .with_column_renamed("COUNT(TRANSACTION_ID)", "transaction_count")
    )

    # Write results back to Snowflake
    daily_sales.write.save_as_table(
        "analytics.daily_sales_summary",
        mode="append"
    )

    # Return metrics for downstream tasks
    total_records = daily_sales.count()
    return {"processed_records": total_records}


with DAG(
    'snowpark_processing_example',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False
) as dag:

    process_data = SnowparkOperator(
        task_id='process_daily_sales',
        snowflake_conn_id='snowflake_prod',
        python_callable=process_sales_data,
        warehouse='ANALYTICS_WH',
        database='ANALYTICS_DB',
        schema='PUBLIC'
    )
```

### Task Decorator Usage

```python
from airflow import DAG
from airflow.providers.snowflake.decorators.snowpark import snowpark_task
from datetime import datetime


@snowpark_task(
    snowflake_conn_id='snowflake_prod',
    warehouse='ML_WH',
    database='FEATURE_STORE'
)
def create_ml_features(session, **context):
    """
    Create machine learning features using Snowpark.

    Args:
        session: Snowpark session (automatically injected)
    """
    from snowflake.snowpark.functions import col, when, datediff, current_date

    # Load base tables
    customers = session.table("raw.customers")
    orders = session.table("raw.orders")

    # Create feature engineering pipeline
    customer_features = (
        customers
        .join(orders, customers.col("customer_id") == orders.col("customer_id"), "left")
        .group_by("customer_id", "customer_segment", "registration_date")
        .agg({
            "order_amount": "sum",
            "order_id": "count",
            "order_date": "max"
        })
        .with_column_renamed("SUM(ORDER_AMOUNT)", "lifetime_value")
        .with_column_renamed("COUNT(ORDER_ID)", "total_orders")
        .with_column_renamed("MAX(ORDER_DATE)", "last_order_date")
    )

    # Add derived features
    enriched_features = (
        customer_features
        .with_column(
            "customer_tier",
            when(col("total_orders") > 10, "high_value")
            .when(col("total_orders") > 5, "medium_value")
            .otherwise("low_value")
        )
        .with_column(
            "days_since_last_order",
            datediff("day", col("last_order_date"), current_date())
        )
        .with_column(
            "avg_order_value",
            col("lifetime_value") / col("total_orders")
        )
    )

    # Save feature table
    enriched_features.write.save_as_table(
        "features.customer_features_v1",
        mode="overwrite"
    )

    return enriched_features.count()


@snowpark_task(
    snowflake_conn_id='snowflake_prod',
    warehouse='ML_WH'
)
def train_model_features(session, **context):
    """
    Prepare the training dataset using Snowpark.
    """
    from snowflake.snowpark.functions import col, when

    # Load feature table
    features = session.table("features.customer_features_v1")

    # Prepare training data with labels
    training_data = (
        features
        .filter(col("days_since_last_order") <= 365)  # Active customers only
        .with_column(
            "will_churn",
            when(col("days_since_last_order") > 90, 1).otherwise(0)
        )
        .select([
            "customer_id",
            "lifetime_value",
            "total_orders",
            "avg_order_value",
            "days_since_last_order",
            "will_churn"
        ])
    )

    # Save training dataset
    training_data.write.save_as_table(
        "ml.churn_training_data",
        mode="overwrite"
    )

    return training_data.count()


with DAG(
    'ml_feature_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@weekly',
    catchup=False
) as dag:

    # Tasks are automatically created from the decorated functions
    features_task = create_ml_features()
    training_task = train_model_features()

    features_task >> training_task
```

### Complex Data Processing Pipeline

```python
from airflow.providers.snowflake.decorators.snowpark import snowpark_task
from snowflake.snowpark.functions import col, when


@snowpark_task(
    snowflake_conn_id='snowflake_prod',
    warehouse='ETL_WH',
    multiple_outputs=True
)
def comprehensive_etl_process(session, **context):
    """
    Comprehensive ETL process using the Snowpark DataFrame API.
    """
    execution_date = context['ds']

    # 1. Data Quality Checks
    raw_data = session.table("raw.transaction_stream")

    quality_metrics = {
        'total_records': raw_data.count(),
        'null_customer_ids': raw_data.filter(col("customer_id").is_null()).count(),
        'invalid_amounts': raw_data.filter(col("amount") <= 0).count()
    }

    # 2. Data Cleaning and Transformation
    clean_data = (
        raw_data
        .filter(col("customer_id").is_not_null())
        .filter(col("amount") > 0)
        .filter(col("transaction_date") >= execution_date)
        .with_column(
            "amount_category",
            when(col("amount") >= 1000, "high")
            .when(col("amount") >= 100, "medium")
            .otherwise("low")
        )
    )

    # 3. Aggregation and Business Logic
    customer_summary = (
        clean_data
        .group_by("customer_id", "amount_category")
        .agg({
            "amount": "sum",
            "transaction_id": "count"
        })
        .with_column_renamed("SUM(AMOUNT)", "total_spent")
        .with_column_renamed("COUNT(TRANSACTION_ID)", "transaction_count")
    )

    # 4. Advanced Analytics (Snowflake PIVOT takes a single aggregate)
    pivot_summary = customer_summary.pivot(
        "amount_category",
        ["high", "medium", "low"]
    ).agg({"total_spent": "sum"})

    # 5. Write Results to Multiple Tables

    # Clean transactional data
    clean_data.write.save_as_table(
        f"staging.clean_transactions_{execution_date.replace('-', '_')}",
        mode="overwrite"
    )

    # Customer summaries
    customer_summary.write.save_as_table(
        "analytics.customer_transaction_summary",
        mode="append"
    )

    # Pivot analysis
    pivot_summary.write.save_as_table(
        "analytics.spending_category_analysis",
        mode="append"
    )

    # Return comprehensive metrics
    return {
        'quality_metrics': quality_metrics,
        'processed_customers': customer_summary.select("customer_id").distinct().count(),
        'clean_records': clean_data.count(),
        'summary_records': customer_summary.count()
    }


@snowpark_task(
    snowflake_conn_id='snowflake_prod',
    warehouse='ANALYTICS_WH'
)
def generate_business_reports(session, processed_metrics, **context):
    """
    Generate business reports from the processed data.

    Args:
        processed_metrics: Output from the previous Snowpark task
    """
    execution_date = context['ds']

    # Create executive summary report
    summary_data = session.sql(f"""
        SELECT
            '{execution_date}' as report_date,
            COUNT(DISTINCT customer_id) as active_customers,
            SUM(total_spent) as total_revenue,
            AVG(total_spent) as avg_customer_spend,
            SUM(transaction_count) as total_transactions
        FROM analytics.customer_transaction_summary
        WHERE DATE(created_at) = '{execution_date}'
    """)

    # Save executive dashboard data
    summary_data.write.save_as_table(
        "reports.daily_executive_summary",
        mode="append"
    )

    return {
        'report_generated': True,
        'input_metrics': processed_metrics
    }
```

### Session Configuration and Advanced Features

```python
from airflow.providers.snowflake.decorators.snowpark import snowpark_task
from snowflake.snowpark import Window
from snowflake.snowpark.functions import col


@snowpark_task(
    snowflake_conn_id='snowflake_prod',
    warehouse='HEAVY_COMPUTE_WH',
    session_parameters={
        'QUERY_TAG': 'airflow_snowpark_processing',
        'MULTI_STATEMENT_COUNT': 5,
        'AUTOCOMMIT': True
    }
)
def advanced_snowpark_processing(session, **context):
    """
    Advanced Snowpark processing with custom session configuration.
    """
    # Disable result caching so each run executes fresh queries
    session.sql("ALTER SESSION SET USE_CACHED_RESULT = FALSE").collect()

    # Use Snowpark ML functions (if available)
    try:
        from snowflake.ml.functions import detect_anomalies

        # Load time series data
        ts_data = session.table("analytics.daily_metrics")

        # Detect anomalies per metric type using a window over the time axis
        metric_window = Window.partition_by(col("metric_type")).order_by(col("date"))
        anomaly_results = ts_data.with_column(
            "is_anomaly",
            detect_anomalies(col("metric_value")).over(metric_window)
        )

        # Save anomaly detection results
        anomaly_results.write.save_as_table(
            "ml.anomaly_detection_results",
            mode="append"
        )

    except ImportError:
        # Fall back to statistical anomaly detection in plain SQL
        session.sql("""
            CREATE OR REPLACE TABLE ml.anomaly_detection_results AS
            SELECT *,
                CASE WHEN ABS(metric_value - AVG(metric_value) OVER (
                    PARTITION BY metric_type
                    ORDER BY date
                    ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING
                )) > 2 * STDDEV(metric_value) OVER (
                    PARTITION BY metric_type
                    ORDER BY date
                    ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING
                ) THEN TRUE ELSE FALSE END as is_anomaly
            FROM analytics.daily_metrics
        """).collect()

    return {"anomaly_detection_completed": True}
```

## Session Management

The Snowpark integration automatically handles the following (a sketch of the equivalent manual session lifecycle appears after the list):

- **Session Creation**: Configured with connection parameters
- **Authentication**: Uses Airflow connection credentials
- **Resource Management**: Automatic session cleanup after task completion
- **Error Handling**: Exception handling with session cleanup on failure
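
For comparison, here is a minimal sketch of the session lifecycle the operator manages for you, using placeholder connection parameters in place of the Airflow connection:

```python
from snowflake.snowpark import Session

# Placeholder connection parameters -- normally supplied by the Airflow connection.
connection_parameters = {
    "account": "my_account",
    "user": "my_user",
    "password": "my_password",
    "warehouse": "COMPUTE_WH",
    "database": "ANALYTICS_DB",
    "schema": "PUBLIC",
}

session = Session.builder.configs(connection_parameters).create()
try:
    row_count = session.table("raw.sales_transactions").count()
finally:
    # The operator performs this cleanup automatically after the task finishes.
    session.close()
```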

## DataFrame Operations

Snowpark provides a rich DataFrame API for data processing; a short example combining the operations follows the lists below.

### Data Loading

- `session.table()`: Load existing tables
- `session.sql()`: Execute SQL and return a DataFrame
- `session.read.options().csv()`: Read staged files

### Transformations

- `filter()`: Filter rows based on conditions
- `select()`: Select specific columns
- `group_by().agg()`: Grouping and aggregation
- `join()`: Join operations between DataFrames
- `pivot()`: Pivot table operations
- `with_column()`: Add computed columns

### Actions

- `collect()`: Materialize DataFrame results
- `count()`: Count rows
- `write.save_as_table()`: Save to a Snowflake table
- `show()`: Display sample data
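
A compact illustration combining the operations above (table and column names are placeholders):

```python
from snowflake.snowpark.functions import col


def summarize_orders(session):
    orders = session.table("raw.orders")              # data loading
    summary = (
        orders
        .filter(col("amount") > 0)                    # transformation
        .group_by("region")
        .agg({"amount": "sum"})
        .with_column_renamed("SUM(AMOUNT)", "total_amount")
    )
    summary.show()                                    # action: preview sample rows
    summary.write.save_as_table("analytics.order_totals", mode="overwrite")  # action: persist
    return summary.count()                            # action: count rows
```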

## Error Handling

Errors raised from Snowpark tasks fall into a few broad categories (an example of catching them inside a task follows this section):

- **Connection Errors**: Session creation failures, authentication issues
- **DataFrame Errors**: Invalid operations, schema mismatches, resource limits
- **SQL Errors**: Failures of SQL executed from within Snowpark operations
- **Resource Errors**: Warehouse capacity and memory limitations

All errors include detailed stack traces and Snowflake-specific error information for troubleshooting.
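
As an illustration, SQL-level failures inside a Snowpark callable can be caught and re-raised with context; this sketch assumes Snowpark's `SnowparkSQLException` and uses a placeholder table name:

```python
from airflow.exceptions import AirflowException
from snowflake.snowpark.exceptions import SnowparkSQLException


def load_with_error_context(session, table_name="raw.sales_transactions"):
    try:
        return session.table(table_name).count()
    except SnowparkSQLException as exc:
        # Surface the Snowflake error details in the Airflow task failure.
        raise AirflowException(f"Snowpark query against {table_name} failed: {exc}") from exc
```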