
# SQL Operators and Data Quality


Task operators for executing SQL commands, performing data quality checks, and managing database operations with built-in validation and monitoring capabilities. These operators provide the core task execution layer for Snowflake operations in Airflow DAGs.


## Capabilities


### SQL API Operator


Primary operator for executing multiple SQL statements using Snowflake's SQL API, with support for asynchronous execution, deferrable mode, and comprehensive parameter binding.

```python { .api }
class SnowflakeSqlApiOperator(SQLExecuteQueryOperator):
    """
    Execute multiple SQL statements using Snowflake SQL API.

    Supports asynchronous execution and deferrable mode for efficient resource utilization.
    """

    LIFETIME = timedelta(minutes=59)  # JWT Token lifetime
    RENEWAL_DELTA = timedelta(minutes=54)  # JWT Token renewal time
    template_fields: Sequence[str]  # Includes snowflake_conn_id and SQL fields
    conn_id_field = "snowflake_conn_id"

    def __init__(
        self,
        *,
        snowflake_conn_id: str = "snowflake_default",
        warehouse: str | None = None,
        database: str | None = None,
        role: str | None = None,
        schema: str | None = None,
        authenticator: str | None = None,
        session_parameters: dict[str, Any] | None = None,
        poll_interval: int = 5,
        statement_count: int = 0,
        token_life_time: timedelta = LIFETIME,
        token_renewal_delta: timedelta = RENEWAL_DELTA,
        bindings: dict[str, Any] | None = None,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        snowflake_api_retry_args: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Initialize SQL API operator.

        Parameters:
        - snowflake_conn_id: Snowflake connection ID
        - warehouse: Snowflake warehouse name
        - database: Snowflake database name
        - role: Snowflake role name
        - schema: Snowflake schema name
        - authenticator: Authentication method
        - session_parameters: Session-level parameters
        - poll_interval: Polling interval for async execution (seconds)
        - statement_count: Number of SQL statements (0 for auto-detect)
        - token_life_time: JWT token lifetime
        - token_renewal_delta: JWT token renewal interval
        - bindings: Parameter bindings for SQL statements
        - deferrable: Enable deferrable execution mode
        - snowflake_api_retry_args: API retry configuration
        """
```

#### Execution Methods

```python { .api }
def execute(self, context: Context) -> None:
    """
    Execute the SQL statements.

    Parameters:
    - context: Airflow task execution context
    """

def poll_on_queries(self):
    """
    Poll on requested queries for completion status.

    Used in synchronous execution mode.
    """

def execute_complete(
    self,
    context: Context,
    event: dict[str, str | list[str]] | None = None,
) -> None:
    """
    Callback method when trigger fires in deferrable mode.

    Parameters:
    - context: Airflow task execution context
    - event: Event data from trigger
    """
```

### Data Quality Check Operators

Specialized operators for performing data quality validations and monitoring database state with configurable thresholds and alerting.

#### Basic Check Operator

```python { .api }
class SnowflakeCheckOperator(SQLCheckOperator):
    """
    Perform a check against Snowflake database.

    Expects a SQL query that returns a single row for boolean evaluation.
    """

    template_fields: Sequence[str] = ["sql", "snowflake_conn_id"]
    template_ext: Sequence[str] = (".sql",)
    ui_color = "#ededed"
    conn_id_field = "snowflake_conn_id"

    def __init__(
        self,
        *,
        sql: str,
        snowflake_conn_id: str = "snowflake_default",
        parameters: Iterable | Mapping[str, Any] | None = None,
        warehouse: str | None = None,
        database: str | None = None,
        role: str | None = None,
        schema: str | None = None,
        authenticator: str | None = None,
        session_parameters: dict | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize check operator.

        Parameters:
        - sql: SQL query returning single boolean result
        - snowflake_conn_id: Snowflake connection ID
        - parameters: Query parameters for parameterized SQL
        - warehouse: Snowflake warehouse name
        - database: Snowflake database name
        - role: Snowflake role name
        - schema: Snowflake schema name
        - authenticator: Authentication method
        - session_parameters: Session-level parameters
        """
```

#### Value Check Operator

```python { .api }
class SnowflakeValueCheckOperator(SQLValueCheckOperator):
    """
    Perform a simple check using SQL code against a specified value.

    Supports tolerance levels for numeric comparisons.
    """

    template_fields: Sequence[str] = ["sql", "pass_value", "snowflake_conn_id"]
    conn_id_field = "snowflake_conn_id"

    def __init__(
        self,
        *,
        sql: str,
        pass_value: Any,
        tolerance: Any = None,
        snowflake_conn_id: str = "snowflake_default",
        parameters: Iterable | Mapping[str, Any] | None = None,
        warehouse: str | None = None,
        database: str | None = None,
        role: str | None = None,
        schema: str | None = None,
        authenticator: str | None = None,
        session_parameters: dict | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize value check operator.

        Parameters:
        - sql: SQL query returning single value for comparison
        - pass_value: Expected value for comparison
        - tolerance: Tolerance for numeric comparisons, as a fraction of pass_value (e.g. 0.1 = ±10%)
        - snowflake_conn_id: Snowflake connection ID
        - parameters: Query parameters for parameterized SQL
        - warehouse: Snowflake warehouse name
        - database: Snowflake database name
        - role: Snowflake role name
        - schema: Snowflake schema name
        - authenticator: Authentication method
        - session_parameters: Session-level parameters
        """
```

#### Interval Check Operator

```python { .api }
class SnowflakeIntervalCheckOperator(SQLIntervalCheckOperator):
    """
    Check that metrics are within tolerance of values from days_back before.

    Useful for detecting anomalies in time series data.
    """

    template_fields: Sequence[str] = ["table", "metrics_thresholds", "snowflake_conn_id"]
    conn_id_field = "snowflake_conn_id"

    def __init__(
        self,
        *,
        table: str,
        metrics_thresholds: dict,
        date_filter_column: str = "ds",
        days_back: SupportsAbs[int] = -7,
        snowflake_conn_id: str = "snowflake_default",
        warehouse: str | None = None,
        database: str | None = None,
        role: str | None = None,
        schema: str | None = None,
        authenticator: str | None = None,
        session_parameters: dict | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize interval check operator.

        Parameters:
        - table: Table name to check
        - metrics_thresholds: Dictionary of maximum allowed ratios, indexed by metric name
        - date_filter_column: Column name for date filtering
        - days_back: Number of days back for comparison (negative integer)
        - snowflake_conn_id: Snowflake connection ID
        - warehouse: Snowflake warehouse name
        - database: Snowflake database name
        - role: Snowflake role name
        - schema: Snowflake schema name
        - authenticator: Authentication method
        - session_parameters: Session-level parameters
        """
```

## Usage Examples

### Basic SQL Execution

```python
from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator
from datetime import datetime, timedelta

with DAG(
    'snowflake_sql_example',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False
) as dag:

    # Execute multiple SQL statements
    create_and_load = SnowflakeSqlApiOperator(
        task_id='create_and_load_data',
        snowflake_conn_id='snowflake_prod',
        sql='''
            -- Create staging table
            CREATE OR REPLACE TABLE staging.daily_sales AS
            SELECT
                date_trunc('day', transaction_date) as sale_date,
                region,
                SUM(amount) as total_sales,
                COUNT(*) as transaction_count
            FROM raw.transactions
            WHERE transaction_date >= '{{ ds }}'
              AND transaction_date < '{{ next_ds }}'
            GROUP BY 1, 2;

            -- Update summary table
            MERGE INTO analytics.sales_summary s
            USING staging.daily_sales ds ON s.sale_date = ds.sale_date AND s.region = ds.region
            WHEN MATCHED THEN UPDATE SET
                total_sales = ds.total_sales,
                transaction_count = ds.transaction_count
            WHEN NOT MATCHED THEN INSERT (sale_date, region, total_sales, transaction_count)
            VALUES (ds.sale_date, ds.region, ds.total_sales, ds.transaction_count);
        ''',
        statement_count=2,
        warehouse='ANALYTICS_WH',
        database='ANALYTICS_DB'
    )
```

### Asynchronous Execution

```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

# Large data processing with deferrable execution
process_large_dataset = SnowflakeSqlApiOperator(
    task_id='process_large_dataset',
    snowflake_conn_id='snowflake_prod',
    sql='''
        CREATE OR REPLACE TABLE analytics.customer_360 AS
        SELECT
            c.customer_id,
            c.customer_name,
            COUNT(DISTINCT o.order_id) as total_orders,
            SUM(o.order_amount) as lifetime_value,
            MAX(o.order_date) as last_order_date,
            AVG(o.order_amount) as avg_order_value
        FROM customers c
        LEFT JOIN orders o ON c.customer_id = o.customer_id
        WHERE o.order_date >= '2020-01-01'
        GROUP BY c.customer_id, c.customer_name;
    ''',
    statement_count=1,
    deferrable=True,   # Enable async execution
    poll_interval=30,  # Check every 30 seconds
    warehouse='HEAVY_COMPUTE_WH'
)
```

### Data Quality Checks

```python
from airflow.providers.snowflake.operators.snowflake import (
    SnowflakeCheckOperator,
    SnowflakeValueCheckOperator,
    SnowflakeIntervalCheckOperator
)

# Basic data quality check
data_freshness_check = SnowflakeCheckOperator(
    task_id='check_data_freshness',
    snowflake_conn_id='snowflake_prod',
    sql='''
        SELECT COUNT(*) > 0
        FROM raw.transactions
        WHERE date_trunc('day', transaction_date) = '{{ ds }}'
    ''',
    warehouse='ANALYTICS_WH'
)

# Value validation with tolerance
revenue_check = SnowflakeValueCheckOperator(
    task_id='validate_daily_revenue',
    snowflake_conn_id='snowflake_prod',
    sql='''
        SELECT SUM(amount)
        FROM raw.transactions
        WHERE date_trunc('day', transaction_date) = '{{ ds }}'
    ''',
    pass_value=50000,  # Expected daily revenue
    tolerance=0.1,     # Allow ±10% deviation from pass_value
    warehouse='ANALYTICS_WH'
)

# Time series anomaly detection
anomaly_check = SnowflakeIntervalCheckOperator(
    task_id='detect_sales_anomalies',
    snowflake_conn_id='snowflake_prod',
    table='analytics.daily_sales_summary',
    metrics_thresholds={
        'total_sales': 1.2,  # Fail if current/historical ratio exceeds 1.2 (~±20%)
        'order_count': 1.3   # Fail if the ratio exceeds 1.3 (~±30%)
    },
    date_filter_column='sale_date',
    days_back=-7,  # Compare with same day last week
    warehouse='ANALYTICS_WH'
)
```

### Parameterized Queries

```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

# Using parameter bindings
parameterized_query = SnowflakeSqlApiOperator(
    task_id='parameterized_analysis',
    snowflake_conn_id='snowflake_prod',
    sql='''
        SELECT
            region,
            COUNT(*) as customer_count,
            AVG(lifetime_value) as avg_ltv
        FROM analytics.customer_360
        WHERE lifetime_value >= ?
          AND last_order_date >= ?
        GROUP BY region
        ORDER BY avg_ltv DESC;
    ''',
    statement_count=1,
    # Snowflake SQL API bindings are objects with a type and a string value
    bindings={
        '1': {'type': 'FIXED', 'value': '1000'},      # Minimum lifetime value
        '2': {'type': 'TEXT', 'value': '2024-01-01'}  # Minimum last order date
    },
    warehouse='ANALYTICS_WH'
)
```

### Error Handling and Retry Configuration

```python
from datetime import timedelta

from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

# Custom retry configuration for API calls
robust_operation = SnowflakeSqlApiOperator(
    task_id='robust_data_processing',
    snowflake_conn_id='snowflake_prod',
    sql='''
        CREATE OR REPLACE TABLE staging.processed_data AS
        SELECT * FROM raw.data_stream
        WHERE processed_at IS NULL;

        UPDATE raw.data_stream
        SET processed_at = CURRENT_TIMESTAMP()
        WHERE id IN (SELECT id FROM staging.processed_data);
    ''',
    statement_count=2,
    snowflake_api_retry_args={
        'retries': 3,
        'backoff_factor': 2,
        'status_forcelist': [500, 502, 503, 504]
    },
    # Airflow task retry configuration
    retries=2,
    retry_delay=timedelta(minutes=5),
    warehouse='PROCESSING_WH'
)
```

## Template Variables

All operators support Airflow's template variables and macros:

- `{{ ds }}`: Execution date as YYYY-MM-DD
- `{{ ts }}`: Execution timestamp
- `{{ next_ds }}`: Next execution date
- `{{ params }}`: User-defined parameters
- `{{ var.value.variable_name }}`: Airflow variables
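
A minimal sketch of templating in practice (the table name, connection ID, and `target_region` Variable are hypothetical); Jinja expressions in `sql` are rendered before the statements are submitted:

```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

# Hypothetical cleanup task: '{{ ds }}' and '{{ var.value.target_region }}'
# are rendered by Airflow's templating engine before execution.
templated_cleanup = SnowflakeSqlApiOperator(
    task_id='templated_cleanup',
    snowflake_conn_id='snowflake_prod',
    sql='''
        DELETE FROM staging.daily_sales
        WHERE sale_date = '{{ ds }}'
          AND region = '{{ var.value.target_region }}';
    ''',
    statement_count=1
)
```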

## Performance Optimization

### Warehouse Management

```python
# Use appropriate warehouse sizes
large_compute_task = SnowflakeSqlApiOperator(
    task_id='heavy_processing',
    warehouse='X_LARGE_WH',  # Scale up for heavy workloads
    sql='SELECT * FROM huge_table_join_operation',
    statement_count=1
)

# Explicitly suspend the warehouse after use
cleanup_task = SnowflakeSqlApiOperator(
    task_id='suspend_warehouse',
    sql='ALTER WAREHOUSE X_LARGE_WH SUSPEND',
    statement_count=1
)

large_compute_task >> cleanup_task
```

### Connection Pooling

Reduce connection overhead for high-frequency operations by reusing a single session for many statements instead of opening a new connection per query — for example, by running several statements through one hook inside a single task, as sketched below.
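
A minimal sketch of this pattern, assuming a `snowflake_prod` connection and a hypothetical `raw.transactions` table; one `SnowflakeHook` connection serves several statements inside a single task:

```python
from airflow.decorators import task
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

@task
def run_frequent_queries():
    """Reuse one Snowflake session for several small statements."""
    hook = SnowflakeHook(snowflake_conn_id='snowflake_prod')
    conn = hook.get_conn()  # opens a single Snowflake session
    try:
        cur = conn.cursor()
        for stmt in (
            "SELECT COUNT(*) FROM raw.transactions",
            "SELECT MAX(transaction_date) FROM raw.transactions",
        ):
            cur.execute(stmt)  # each statement reuses the same session
            print(cur.fetchone())
    finally:
        conn.close()
```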

## Error Handling

All operators provide comprehensive error handling with detailed exception information:


- **SQL Execution Errors**: Syntax errors, constraint violations, permission issues
- **Connection Errors**: Authentication failures, network timeouts, warehouse suspension
- **Resource Errors**: Warehouse capacity limits, query complexity limits
- **API Errors**: Rate limiting, malformed requests, service unavailable

Error messages include Snowflake-specific error codes and suggestions for resolution.
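
As a hedged sketch (the callback and task names are hypothetical), a standard Airflow `on_failure_callback` can surface these error details for alerting:

```python
import logging

from airflow.providers.snowflake.operators.snowflake import SnowflakeCheckOperator

def notify_on_failure(context):
    """Log the exception, including the Snowflake error message, for the failed task."""
    exc = context.get("exception")  # the exception that failed the task
    task_id = context["task_instance"].task_id
    logging.error("Task %s failed with Snowflake error: %s", task_id, exc)

freshness_check_with_alerting = SnowflakeCheckOperator(
    task_id='check_with_alerting',
    snowflake_conn_id='snowflake_prod',
    sql='SELECT COUNT(*) > 0 FROM raw.transactions',
    on_failure_callback=notify_on_failure
)
```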