
# SQL Triggers

SQL triggers enable asynchronous execution of SQL statements without occupying Airflow worker slots. Long-running database operations are handed off to Airflow's triggerer component, which monitors many of them concurrently and efficiently.

## Capabilities


### SQL Execute Query Trigger


Executes SQL statements asynchronously using Airflow's trigger mechanism.

```python { .api }
class SQLExecuteQueryTrigger:
    """
    Executes SQL code asynchronously.

    Args:
        sql (str or list): SQL statement(s) to execute
        conn_id (str): Database connection ID
        hook_params (dict, optional): Additional hook parameters
        **kwargs: Additional trigger arguments
    """

    def __init__(self, sql, conn_id, hook_params=None, **kwargs):
        pass

    def serialize(self):
        """
        Serialize trigger configuration for storage.

        Returns:
            tuple: (class_path, kwargs) for trigger reconstruction
        """
        pass

    def get_hook(self):
        """
        Get database hook for connection.

        Returns:
            Database hook instance
        """
        pass

    async def run(self):
        """
        Execute the SQL asynchronously.

        Yields:
            TriggerEvent: Results or status updates
        """
        pass
```


## Usage Examples


### Basic Asynchronous SQL Execution

```python
from airflow.models import BaseOperator
from airflow.providers.common.sql.triggers.sql import SQLExecuteQueryTrigger


class AsyncSQLOperator(BaseOperator):
    def __init__(self, sql, conn_id, **kwargs):
        super().__init__(**kwargs)
        self.sql = sql
        self.conn_id = conn_id

    def execute(self, context):
        # Defer to the trigger for async execution
        self.defer(
            trigger=SQLExecuteQueryTrigger(
                sql=self.sql,
                conn_id=self.conn_id
            ),
            method_name='execute_complete'
        )

    def execute_complete(self, context, event):
        # Handle trigger completion
        if event['status'] == 'success':
            self.log.info(f"SQL executed successfully: {event['results']}")
            return event['results']
        else:
            raise Exception(f"SQL execution failed: {event['error']}")


# Use the async operator
async_sql = AsyncSQLOperator(
    task_id='async_sql_execution',
    sql='SELECT * FROM large_table WHERE date = {{ ds }}',
    conn_id='my_database'
)
```


### Long-Running Query with Progress Updates

```python
from airflow.triggers.base import TriggerEvent
from airflow.providers.common.sql.triggers.sql import SQLExecuteQueryTrigger


class LongRunningQueryTrigger(SQLExecuteQueryTrigger):
    """Custom trigger with progress reporting."""

    async def run(self):
        hook = self.get_hook()

        try:
            # Start the query
            self.log.info("Starting long-running query...")
            yield TriggerEvent({'status': 'started', 'message': 'Query started'})

            # Execute the query (this could take a long time)
            results = hook.run(self.sql)

            # Report completion
            yield TriggerEvent({
                'status': 'success',
                'results': results,
                'message': f'Query completed with {len(results) if results else 0} results'
            })

        except Exception as e:
            yield TriggerEvent({
                'status': 'error',
                'error': str(e),
                'message': f'Query failed: {e}'
            })


# Run the long query; note that AsyncSQLOperator as written above always
# constructs SQLExecuteQueryTrigger, so wiring in LongRunningQueryTrigger
# would require passing the trigger class through the operator.
custom_trigger_task = AsyncSQLOperator(
    task_id='long_query',
    sql='SELECT * FROM very_large_table ORDER BY created_date',
    conn_id='my_database'
)
```


### Batch Processing with Triggers

```python
class BatchSQLTrigger(SQLExecuteQueryTrigger):
    """Trigger for batch SQL processing."""

    def __init__(self, sql_statements, conn_id, batch_size=100, **kwargs):
        super().__init__(sql_statements, conn_id, **kwargs)
        self.batch_size = batch_size

    async def run(self):
        hook = self.get_hook()
        processed = 0  # defined before the try block so the except clause can report it

        try:
            total_statements = len(self.sql)

            # Process in batches
            for i in range(0, total_statements, self.batch_size):
                batch = self.sql[i:i + self.batch_size]

                # Execute the batch
                for stmt in batch:
                    hook.run(stmt, autocommit=True)
                    processed += 1

                # Report progress
                yield TriggerEvent({
                    'status': 'progress',
                    'processed': processed,
                    'total': total_statements,
                    'message': f'Processed {processed}/{total_statements} statements'
                })

            # Report completion
            yield TriggerEvent({
                'status': 'success',
                'processed': processed,
                'message': f'All {processed} statements processed successfully'
            })

        except Exception as e:
            yield TriggerEvent({
                'status': 'error',
                'processed': processed,
                'error': str(e),
                'message': f'Batch processing failed at statement {processed}: {e}'
            })


# Use the batch trigger
batch_statements = [
    "INSERT INTO logs (message) VALUES ('Batch item 1')",
    "INSERT INTO logs (message) VALUES ('Batch item 2')",
    # ... many more statements
]

batch_task = AsyncSQLOperator(
    task_id='batch_processing',
    sql=batch_statements,
    conn_id='my_database'
)
```


### Conditional Async Execution

```python
class ConditionalSQLTrigger(SQLExecuteQueryTrigger):
    """Conditionally execute SQL based on query results."""

    def __init__(self, check_sql, execute_sql, conn_id, condition_func=None, **kwargs):
        super().__init__(execute_sql, conn_id, **kwargs)
        self.check_sql = check_sql
        self.condition_func = condition_func or (lambda x: bool(x))

    async def run(self):
        hook = self.get_hook()

        try:
            # First, check the condition
            check_results = hook.get_records(self.check_sql)

            if self.condition_func(check_results):
                # Condition met, execute the main SQL
                yield TriggerEvent({
                    'status': 'executing',
                    'message': 'Condition met, executing SQL'
                })

                results = hook.run(self.sql)

                yield TriggerEvent({
                    'status': 'success',
                    'results': results,
                    'message': 'SQL executed successfully'
                })
            else:
                # Condition not met, skip execution
                yield TriggerEvent({
                    'status': 'skipped',
                    'message': 'Condition not met, skipping SQL execution'
                })

        except Exception as e:
            yield TriggerEvent({
                'status': 'error',
                'error': str(e),
                'message': f'Conditional execution failed: {e}'
            })


# A task for the conditional pattern; as above, AsyncSQLOperator would need
# to be extended to construct ConditionalSQLTrigger instead of the default.
conditional_task = AsyncSQLOperator(
    task_id='conditional_sql',
    sql='UPDATE inventory SET processed = true WHERE date = {{ ds }}',
    conn_id='my_database'
)
```


### Monitoring Long-Running Operations

```python
from airflow.providers.common.sql.sensors.sql import SqlSensor


class AsyncSQLSensor(SqlSensor):
    """SQL sensor using async triggers."""

    def execute(self, context):
        # Use the trigger for async monitoring
        self.defer(
            trigger=SQLExecuteQueryTrigger(
                sql=self.sql,
                conn_id=self.conn_id,
                hook_params=self.hook_params
            ),
            method_name='execute_complete'
        )

    def execute_complete(self, context, event):
        if event['status'] == 'success':
            results = event['results']

            # Apply success criteria
            if self.success and callable(self.success):
                if self.success(results):
                    self.log.info("Sensor condition met")
                    return True
                else:
                    # Condition not met, defer again (self.defer raises TaskDeferred)
                    self.defer(
                        trigger=SQLExecuteQueryTrigger(
                            sql=self.sql,
                            conn_id=self.conn_id,
                            hook_params=self.hook_params
                        ),
                        method_name='execute_complete'
                    )

            return results
        else:
            raise Exception(f"Sensor failed: {event['error']}")


# Use the async sensor
async_sensor = AsyncSQLSensor(
    task_id='async_wait_for_data',
    conn_id='my_database',
    sql="SELECT COUNT(*) FROM processing_queue WHERE status = 'pending'",
    success=lambda x: x[0][0] == 0,  # Wait for the queue to be empty
    poke_interval=60
)
```


## Trigger Configuration


### Hook Parameters

```python
# Customize hook behavior through hook_params
trigger = SQLExecuteQueryTrigger(
    sql='SELECT * FROM data',
    conn_id='my_conn',
    hook_params={
        'schema': 'custom_schema',
        'autocommit': True,
        'isolation_level': 'READ_COMMITTED'
    }
)
```


### Serialization


Triggers must be serializable to be stored and reconstructed by the triggerer:

```python
# The serialize method returns the class path and arguments
class_path, kwargs = trigger.serialize()

# This allows the triggerer to reconstruct the trigger:
# trigger_class = import_from_path(class_path)
# reconstructed_trigger = trigger_class(**kwargs)
```
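
As a concrete sketch of that round trip (reusing the `trigger` instance from above), Airflow's `import_string` utility can load the class from its dotted path and rebuild the trigger much as the triggerer does:

```python
from airflow.utils.module_loading import import_string

# Serialize, then rebuild the trigger from its class path and kwargs
class_path, kwargs = trigger.serialize()
trigger_class = import_string(class_path)
reconstructed_trigger = trigger_class(**kwargs)
```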


## Benefits of Using Triggers

1. **Non-blocking**: Triggers don't occupy worker slots while waiting
2. **Scalable**: A single triggerer can handle many concurrent triggers
3. **Resource efficient**: Reduces worker resource consumption
4. **Progress reporting**: Triggers can yield intermediate status updates (see the sketch below)
5. **Error handling**: Structured error reporting and recovery
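
Outside a running deployment, that progress-reporting behavior can be exercised directly: a trigger's `run()` method is an async generator, so its events can be drained with plain `asyncio`. This is a minimal local-testing sketch, reusing the `SQLExecuteQueryTrigger` import and the `my_database` connection ID from the examples above:

```python
import asyncio

from airflow.providers.common.sql.triggers.sql import SQLExecuteQueryTrigger


async def drain(trigger):
    # Collect every TriggerEvent the trigger yields
    return [event async for event in trigger.run()]


trigger = SQLExecuteQueryTrigger(sql='SELECT 1', conn_id='my_database')
for event in asyncio.run(drain(trigger)):
    print(event.payload)  # each TriggerEvent carries its dict as .payload
```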