# Asynchronous Execution

Deferrable task execution through triggers, enabling efficient resource utilization for long-running Snowflake operations without blocking worker slots. This capability allows Airflow workers to handle other tasks while Snowflake queries execute, improving overall system throughput and resource efficiency.

## Capabilities

### SQL API Trigger

Trigger for polling Snowflake SQL API query status in deferrable mode, providing asynchronous monitoring of long-running query execution with configurable polling intervals and comprehensive status reporting.

```python { .api }
class SnowflakeSqlApiTrigger(BaseTrigger):
    """
    Trigger for polling Snowflake SQL API query status in deferrable mode.
    Monitors query execution progress and triggers task completion when queries finish.
    """

    def __init__(
        self,
        poll_interval: float,
        query_ids: list[str],
        snowflake_conn_id: str,
        token_life_time: timedelta,
        token_renewal_delta: timedelta,
    ):
        """
        Initialize SQL API trigger.

        Parameters:
        - poll_interval: Polling interval in seconds for checking query status
        - query_ids: List of Snowflake query IDs to monitor
        - snowflake_conn_id: Snowflake connection ID for API access
        - token_life_time: JWT token lifetime for authentication
        - token_renewal_delta: JWT token renewal interval
        """
```

#### Core Methods

```python { .api }
def serialize(self) -> tuple[str, dict[str, Any]]:
    """
    Serialize trigger arguments and classpath for persistence.

    Returns:
        Tuple of (classpath, serialized_arguments) for trigger reconstruction
    """

async def run(self) -> AsyncIterator[TriggerEvent]:
    """
    Wait for Snowflake queries to complete and yield trigger events.
    Continuously polls query status until all queries complete or fail.

    Yields:
        TriggerEvent objects containing query status and completion information
    """

async def get_query_status(self, query_id: str) -> dict[str, Any]:
    """
    Get query status asynchronously from Snowflake SQL API.

    Parameters:
    - query_id: Snowflake query ID to check

    Returns:
        Dictionary containing query status, metadata, and execution details
    """

def _set_context(self, context):
    """
    Set trigger context (no-op implementation for compatibility).

    Parameters:
    - context: Trigger execution context
    """
```
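
For reference, a deferrable operator hands control to this trigger from its `execute` method via `self.defer()` and resumes in a completion callback once the trigger fires an event. The sketch below is a minimal illustration, not the provider's implementation: `submit_queries` is a hypothetical helper standing in for the SQL API submission step, and the poll interval and token timedeltas are illustrative values.

```python
from datetime import timedelta

from airflow.exceptions import AirflowException
from airflow.models.baseoperator import BaseOperator
from airflow.providers.snowflake.triggers.snowflake_trigger import SnowflakeSqlApiTrigger


class DeferrableSnowflakeExample(BaseOperator):
    """Illustrative operator that defers to SnowflakeSqlApiTrigger."""

    def execute(self, context):
        # Hypothetical helper: submits the statements through the SQL API
        # and returns their Snowflake query IDs.
        query_ids = self.submit_queries()

        # Release the worker slot; the triggerer polls until the queries finish.
        self.defer(
            trigger=SnowflakeSqlApiTrigger(
                poll_interval=30,
                query_ids=query_ids,
                snowflake_conn_id="snowflake_default",
                token_life_time=timedelta(minutes=59),
                token_renewal_delta=timedelta(minutes=54),
            ),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None):
        # Invoked with the TriggerEvent payload once polling finishes.
        if event is None or event.get("status") == "error":
            raise AirflowException(f"Snowflake query execution failed: {event}")
        self.log.info("Queries completed: %s", event)
```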

## Usage Examples

### Basic Deferrable SQL Execution

```python
from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator
from datetime import datetime, timedelta

with DAG(
    'deferrable_snowflake_example',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False
) as dag:

    # Long-running data processing with deferrable execution
    heavy_processing = SnowflakeSqlApiOperator(
        task_id='heavy_data_processing',
        snowflake_conn_id='snowflake_prod',
        sql='''
            -- Large table creation and transformation
            CREATE OR REPLACE TABLE analytics.customer_360_view AS
            SELECT
                c.customer_id,
                c.customer_name,
                c.registration_date,
                COUNT(DISTINCT o.order_id) as total_orders,
                SUM(o.order_amount) as lifetime_value,
                AVG(o.order_amount) as avg_order_value,
                MAX(o.order_date) as last_order_date,
                MIN(o.order_date) as first_order_date,
                COUNT(DISTINCT DATE_TRUNC('month', o.order_date)) as active_months
            FROM customers c
            LEFT JOIN orders o ON c.customer_id = o.customer_id
            WHERE c.registration_date >= '2020-01-01'
            GROUP BY c.customer_id, c.customer_name, c.registration_date;

            -- Create summary statistics
            CREATE OR REPLACE TABLE analytics.customer_segments AS
            SELECT
                CASE
                    WHEN lifetime_value >= 10000 THEN 'High Value'
                    WHEN lifetime_value >= 1000 THEN 'Medium Value'
                    ELSE 'Low Value'
                END as segment,
                COUNT(*) as customer_count,
                AVG(lifetime_value) as avg_segment_value,
                AVG(total_orders) as avg_orders_per_customer
            FROM analytics.customer_360_view
            GROUP BY 1;
        ''',
        statement_count=2,
        deferrable=True,   # Enable deferrable execution
        poll_interval=30,  # Check status every 30 seconds
        warehouse='X_LARGE_WH',
        database='ANALYTICS'
    )
```

### Multiple Query Monitoring

```python
from datetime import datetime

from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

# Multiple independent long-running operations
parallel_processing = [
    SnowflakeSqlApiOperator(
        task_id=f'process_region_{region}',
        snowflake_conn_id='snowflake_prod',
        sql=f'''
            CREATE OR REPLACE TABLE analytics.regional_summary_{region.lower()} AS
            SELECT
                DATE_TRUNC('month', order_date) as month,
                COUNT(*) as total_orders,
                SUM(order_amount) as total_revenue,
                COUNT(DISTINCT customer_id) as unique_customers
            FROM orders
            WHERE region = '{region}'
              AND order_date >= '2023-01-01'
            GROUP BY 1
            ORDER BY 1;
        ''',
        statement_count=1,
        deferrable=True,
        poll_interval=15,  # Faster polling for smaller queries
        warehouse='LARGE_WH',
        session_parameters={
            'QUERY_TAG': f'regional_processing_{region}_{datetime.now().isoformat()}'
        }
    )
    for region in ['NORTH', 'SOUTH', 'EAST', 'WEST']
]

# The regional tasks have no dependencies on each other, so they run in
# parallel, and deferrable execution keeps them from blocking worker slots.
```

### Complex ETL Pipeline with Deferrable Tasks

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator

with DAG(
    'deferrable_etl_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    max_active_runs=1
) as dag:

    # Stage 1: Data ingestion (synchronous - typically fast)
    ingest_data = CopyFromExternalStageToSnowflakeOperator(
        task_id='ingest_raw_data',
        table='raw.daily_transactions',
        stage='@s3_data_stage',
        prefix='transactions/{{ ds }}/',
        file_format='csv_transactions',
        warehouse='LOADING_WH'
    )

    # Stage 2: Heavy data processing (deferrable)
    process_transactions = SnowflakeSqlApiOperator(
        task_id='process_transactions',
        snowflake_conn_id='snowflake_prod',
        sql='''
            -- Clean and standardize transaction data
            CREATE OR REPLACE TABLE staging.clean_transactions AS
            SELECT
                transaction_id,
                customer_id,
                UPPER(TRIM(product_category)) as product_category,
                ROUND(transaction_amount, 2) as transaction_amount,
                transaction_date,
                CASE
                    WHEN payment_method IN ('CREDIT', 'DEBIT', 'CASH') THEN payment_method
                    ELSE 'OTHER'
                END as payment_method_clean
            FROM raw.daily_transactions
            WHERE transaction_amount > 0
              AND customer_id IS NOT NULL
              AND transaction_date = '{{ ds }}';

            -- Create enriched transaction view with customer data
            CREATE OR REPLACE TABLE staging.enriched_transactions AS
            SELECT
                t.*,
                c.customer_segment,
                c.customer_tier,
                c.registration_date,
                DATEDIFF('day', c.registration_date, t.transaction_date) as days_since_registration
            FROM staging.clean_transactions t
            JOIN dim.customers c ON t.customer_id = c.customer_id;

            -- Aggregate daily metrics by segment
            CREATE OR REPLACE TABLE analytics.daily_segment_metrics AS
            SELECT
                '{{ ds }}' as metric_date,
                customer_segment,
                product_category,
                payment_method_clean,
                COUNT(*) as transaction_count,
                SUM(transaction_amount) as total_revenue,
                AVG(transaction_amount) as avg_transaction_value,
                COUNT(DISTINCT customer_id) as unique_customers
            FROM staging.enriched_transactions
            GROUP BY customer_segment, product_category, payment_method_clean;
        ''',
        statement_count=3,
        deferrable=True,
        poll_interval=20,
        warehouse='HEAVY_PROCESSING_WH',
        token_life_time=timedelta(hours=2)  # Extended token lifetime for long operations
    )

    # Stage 3: ML feature generation (deferrable)
    generate_ml_features = SnowflakeSqlApiOperator(
        task_id='generate_ml_features',
        snowflake_conn_id='snowflake_prod',
        sql='''
            -- Generate rolling window features
            CREATE OR REPLACE TABLE ml.customer_features_{{ ds | replace('-', '_') }} AS
            SELECT
                customer_id,
                '{{ ds }}' as feature_date,
                -- 7-day rolling features
                COUNT(*) OVER (
                    PARTITION BY customer_id
                    ORDER BY transaction_date
                    RANGE BETWEEN INTERVAL '7 days' PRECEDING AND CURRENT ROW
                ) as transactions_7d,
                SUM(transaction_amount) OVER (
                    PARTITION BY customer_id
                    ORDER BY transaction_date
                    RANGE BETWEEN INTERVAL '7 days' PRECEDING AND CURRENT ROW
                ) as revenue_7d,
                -- 30-day rolling features
                COUNT(*) OVER (
                    PARTITION BY customer_id
                    ORDER BY transaction_date
                    RANGE BETWEEN INTERVAL '30 days' PRECEDING AND CURRENT ROW
                ) as transactions_30d,
                SUM(transaction_amount) OVER (
                    PARTITION BY customer_id
                    ORDER BY transaction_date
                    RANGE BETWEEN INTERVAL '30 days' PRECEDING AND CURRENT ROW
                ) as revenue_30d,
                -- Recency features
                DATEDIFF('day',
                    LAG(transaction_date) OVER (PARTITION BY customer_id ORDER BY transaction_date),
                    transaction_date
                ) as days_since_last_transaction
            FROM staging.enriched_transactions
            ORDER BY customer_id, transaction_date;

            -- Update master feature table
            MERGE INTO ml.customer_features_master m
            USING ml.customer_features_{{ ds | replace('-', '_') }} f
            ON m.customer_id = f.customer_id AND m.feature_date = f.feature_date
            WHEN MATCHED THEN UPDATE SET
                transactions_7d = f.transactions_7d,
                revenue_7d = f.revenue_7d,
                transactions_30d = f.transactions_30d,
                revenue_30d = f.revenue_30d,
                days_since_last_transaction = f.days_since_last_transaction
            WHEN NOT MATCHED THEN INSERT (
                customer_id, feature_date, transactions_7d, revenue_7d,
                transactions_30d, revenue_30d, days_since_last_transaction
            ) VALUES (
                f.customer_id, f.feature_date, f.transactions_7d, f.revenue_7d,
                f.transactions_30d, f.revenue_30d, f.days_since_last_transaction
            );
        ''',
        statement_count=2,
        deferrable=True,
        poll_interval=25,
        warehouse='ML_WH'
    )

    # Stage 4: Data quality validation (synchronous - fast)
    validate_results = SnowflakeSqlApiOperator(
        task_id='validate_processing_results',
        snowflake_conn_id='snowflake_prod',
        sql='''
            -- Validate record counts match expectations
            SELECT
                CASE
                    WHEN staging_count > 0 AND
                         staging_count = analytics_count AND
                         ml_count > 0
                    THEN 'PASSED'
                    ELSE 'FAILED'
                END as validation_result
            FROM (
                SELECT
                    (SELECT COUNT(*) FROM staging.clean_transactions) as staging_count,
                    (SELECT SUM(transaction_count) FROM analytics.daily_segment_metrics WHERE metric_date = '{{ ds }}') as analytics_count,
                    (SELECT COUNT(DISTINCT customer_id) FROM ml.customer_features_master WHERE feature_date = '{{ ds }}') as ml_count
            );
        ''',
        statement_count=1,
        warehouse='VALIDATION_WH'
    )

    # Define pipeline dependencies
    ingest_data >> process_transactions >> generate_ml_features >> validate_results
```

### Custom Trigger Event Handling

```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

class CustomSnowflakeOperator(SnowflakeSqlApiOperator):
    """Custom operator with enhanced trigger event handling."""

    def execute_complete(self, context, event=None):
        """Custom completion handler with detailed logging."""

        # Extract query results from trigger event
        if event and 'query_results' in event:
            query_results = event['query_results']

            # Log execution statistics
            for query_id, result in query_results.items():
                if result.get('status') == 'SUCCESS':
                    execution_time = result.get('execution_time_ms', 0) / 1000
                    rows_affected = result.get('rows_affected', 0)

                    self.log.info(
                        f"Query {query_id} completed successfully: "
                        f"{rows_affected} rows affected in {execution_time:.2f} seconds"
                    )

                    # Store metrics for monitoring
                    context['task_instance'].xcom_push(
                        key=f'query_{query_id}_metrics',
                        value={
                            'execution_time_seconds': execution_time,
                            'rows_affected': rows_affected,
                            'status': 'SUCCESS'
                        }
                    )
                else:
                    self.log.error(f"Query {query_id} failed: {result.get('error_message', 'Unknown error')}")
                    raise Exception(f"Query execution failed: {result.get('error_message')}")

        # Call parent completion handler
        super().execute_complete(context, event)

# Usage of custom operator
custom_deferrable_task = CustomSnowflakeOperator(
    task_id='custom_processing_with_metrics',
    sql='SELECT COUNT(*) FROM large_table WHERE date >= CURRENT_DATE - 30',
    statement_count=1,
    deferrable=True,
    poll_interval=10
)
```

## Trigger Event Structure

The SnowflakeSqlApiTrigger yields TriggerEvent objects with the following structure:

```python
TriggerEvent({
    "status": "success" | "error",
    "query_results": {
        "query_id_1": {
            "status": "SUCCESS" | "FAILED" | "RUNNING",
            "execution_time_ms": 1234,
            "rows_affected": 567,
            "error_message": "...",  # Only present on failure
            "query_text": "...",
            "warehouse": "..."
        },
        # ... additional query results
    },
    "message": "All queries completed successfully" | "Error details"
})
```

## Performance Considerations

### Polling Intervals
- **Short intervals (5-15s)**: For queries expected to complete quickly
- **Medium intervals (30-60s)**: For typical data processing tasks
- **Long intervals (2-5 minutes)**: For very long-running operations (hours)

### Token Management
- Set appropriate `token_life_time` for expected query duration
- Use `token_renewal_delta` to ensure tokens don't expire during execution (see the sketch after this list)
- Monitor token usage for long-running pipelines
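
A minimal sketch of the two token settings on a deferrable operator is shown below. The durations, connection ID, and stored procedure name are illustrative assumptions rather than provider defaults; the point they illustrate is that the renewal delta should be comfortably shorter than the token lifetime.

```python
from datetime import timedelta

from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

# Illustrative values: the token lifetime covers the expected runtime and the
# renewal delta triggers a refresh well before the token can expire mid-poll.
long_running_rebuild = SnowflakeSqlApiOperator(
    task_id="long_running_rebuild",
    snowflake_conn_id="snowflake_prod",           # assumed connection ID
    sql="CALL analytics.rebuild_customer_360()",  # hypothetical long-running procedure
    statement_count=1,
    deferrable=True,
    poll_interval=60,
    token_life_time=timedelta(hours=2),
    token_renewal_delta=timedelta(minutes=90),
)
```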

### Resource Optimization
- Deferrable tasks free up worker slots for other work
- Use appropriate warehouse sizes for the workload
- Consider warehouse auto-suspend settings for cost optimization

## Monitoring and Troubleshooting

### Query Status Monitoring
```sql
-- Monitor long-running queries
SELECT
    query_id,
    query_text,
    user_name,
    warehouse_name,
    start_time,
    end_time,
    total_elapsed_time,
    execution_status
FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY())
WHERE start_time >= DATEADD(hour, -24, CURRENT_TIMESTAMP())
  AND execution_status IN ('RUNNING', 'QUEUED')
ORDER BY start_time DESC;
```

### Trigger Event Logging
All trigger events are logged with detailed information about query execution, including:
- Query IDs and status
- Execution times and resource usage
- Error messages and troubleshooting information
- Warehouse and session details

## Error Handling

The deferrable execution system provides comprehensive error handling:

### Query Execution Errors
- SQL syntax errors, permission issues, resource constraints
- Detailed error messages with Snowflake error codes
- Automatic task failure with appropriate error context

### Trigger Infrastructure Errors
- Network connectivity issues during polling
- Authentication failures and token expiration
- Polling timeout and retry logic (a retry-configuration sketch follows this list)
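
One common mitigation for transient, trigger-side failures is ordinary task-level retrying. The sketch below uses standard Airflow retry parameters with illustrative values, an assumed connection ID, and a placeholder statement; it is a starting point, not a recommended configuration.

```python
from datetime import timedelta

from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

# Retries cover transient failures (network blips, token expiry) encountered
# while the deferred task is polling; execution_timeout bounds the task overall.
resilient_processing = SnowflakeSqlApiOperator(
    task_id="resilient_processing",
    snowflake_conn_id="snowflake_prod",                 # assumed connection ID
    sql="SELECT COUNT(*) FROM raw.daily_transactions",  # placeholder statement
    statement_count=1,
    deferrable=True,
    poll_interval=30,
    retries=3,                             # standard Airflow task-level retries
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,
    execution_timeout=timedelta(hours=4),
)
```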

### Resource Management Errors
- Warehouse suspension during query execution
- Query queue limits and concurrency restrictions
- Memory and compute resource exhaustion

All errors include detailed logging, Snowflake query IDs for investigation, and clear guidance for resolution.