# SQL Operators

SQL operators execute database operations as tasks within Airflow DAGs. This includes query execution, data validation, conditional workflows, and data transfers between databases.

## Capabilities

### Base SQL Operator

Foundation class providing database hook functionality for SQL operators.

```python { .api }
class BaseSQLOperator:
    """
    Base class providing DB hook functionality for SQL operators.
    """

    def get_hook(self):
        """
        Get hook for connection.

        Returns:
            Database hook instance
        """

    def get_db_hook(self):
        """
        Get database hook.

        Returns:
            Database hook instance
        """
```

### SQL Query Execution

Execute SQL queries and commands within Airflow tasks.

```python { .api }
class SQLExecuteQueryOperator:
    """
    Executes SQL code in a database.

    Args:
        sql (str or list): SQL query/queries to execute (templated)
        autocommit (bool): Auto-commit mode for queries (default: False)
        parameters (Mapping or Iterable, optional): Query parameters (templated)
        handler (callable, optional): Result handler function (default: fetch_all_handler)
        output_processor (callable, optional): Function to process results
        conn_id (str, optional): Database connection ID
        database (str, optional): Database name to override connection default
        split_statements (bool, optional): Split multiple statements
        return_last (bool): Return only last query result (default: True)
        show_return_value_in_logs (bool): Log returned values (default: False)
        requires_result_fetch (bool): Ensure results are fetched (default: False)
        **kwargs: Additional operator arguments
    """

    def __init__(self, *, sql, autocommit=False, parameters=None, handler=None,
                 output_processor=None, conn_id=None, database=None, split_statements=None,
                 return_last=True, show_return_value_in_logs=False, requires_result_fetch=False, **kwargs):
        pass
```

### Data Quality Check Operators

Operators for validating data quality and consistency.

```python { .api }
class SQLCheckOperator:
    """
    Performs checks using SQL statements.

    Args:
        sql (str): SQL query that should return a single row (templated)
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """

    def __init__(self, sql, conn_id, **kwargs):
        pass

class SQLValueCheckOperator:
    """
    Checks that SQL query returns expected value.

    Args:
        sql (str): SQL query to execute (templated)
        pass_value: Expected value for comparison
        tolerance (float, optional): Tolerance for numeric comparisons
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """

    def __init__(self, sql, pass_value, tolerance=None, conn_id=None, **kwargs):
        pass

class SQLColumnCheckOperator:
    """
    Performs data quality checks on table columns.

    Args:
        table (str): Table name to check
        column_mapping (dict): Column checks mapping
        partition_clause (str, optional): SQL partition clause
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """

    def __init__(self, table, column_mapping, partition_clause=None, conn_id=None, **kwargs):
        pass

class SQLTableCheckOperator:
    """
    Performs data quality checks on tables.

    Args:
        table (str): Table name to check
        checks (dict): Table-level checks mapping
        partition_clause (str, optional): SQL partition clause
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """

    def __init__(self, table, checks, partition_clause=None, conn_id=None, **kwargs):
        pass

class SQLIntervalCheckOperator:
    """
    Checks data over time intervals.

    Args:
        table (str): Table name to check
        metrics_thresholds (dict): Metrics and threshold definitions
        date_filter_column (str): Column for date filtering
        days_back (int): Number of days to look back
        ratio_formula (str): Formula for ratio calculations
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """

    def __init__(self, table, metrics_thresholds, date_filter_column='ds',
                 days_back=-7, ratio_formula='max_over_min', conn_id=None, **kwargs):
        pass

class SQLThresholdCheckOperator:
    """
    Checks if metrics are within thresholds.

    Args:
        sql (str): SQL query returning metrics (templated)
        min_threshold (float): Minimum threshold value
        max_threshold (float): Maximum threshold value
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """

    def __init__(self, sql, min_threshold, max_threshold, conn_id=None, **kwargs):
        pass
```

### Conditional Workflow Operators

Operators for implementing conditional logic based on SQL query results.

```python { .api }
class BranchSQLOperator:
    """
    Branches workflow based on SQL query results.

    Args:
        sql (str): SQL query to execute (templated)
        follow_task_ids_if_true (list): Task IDs to follow if condition is true
        follow_task_ids_if_false (list): Task IDs to follow if condition is false
        conn_id (str): Database connection ID
        **kwargs: Additional operator arguments
    """

    def __init__(self, sql, follow_task_ids_if_true, follow_task_ids_if_false,
                 conn_id=None, **kwargs):
        pass
```

### Data Transfer Operators

Operators for transferring data between different database connections.

```python { .api }
class GenericTransfer:
    """
    Transfers data between different database connections.

    Args:
        sql (str): SQL query for source data (templated)
        destination_table (str): Target table name (templated)
        source_conn_id (str): Source connection ID (templated)
        destination_conn_id (str): Destination connection ID (templated)
        source_hook_params (dict, optional): Source hook parameters
        destination_hook_params (dict, optional): Destination hook parameters
        preoperator (str or list, optional): SQL to execute before transfer (templated)
        insert_args (dict, optional): Additional arguments for insert operation (templated)
        page_size (int, optional): Number of records for paginated mode
        **kwargs: Additional operator arguments
    """

    def __init__(self, *, sql, destination_table, source_conn_id, destination_conn_id,
                 source_hook_params=None, destination_hook_params=None, preoperator=None,
                 insert_args=None, page_size=None, **kwargs):
        pass
```

### Utility Functions

```python { .api }
def default_output_processor(results):
    """
    Default output processor for query results.

    Args:
        results: Raw query results

    Returns:
        Any: Processed results
    """
```

## Usage Examples

### Basic SQL Execution

```python
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

sql_task = SQLExecuteQueryOperator(
    task_id='execute_sql',
    conn_id='my_postgres_conn',
    sql='INSERT INTO logs (message, timestamp) VALUES (%(msg)s, %(ts)s)',
    parameters={'msg': 'Task completed', 'ts': '2023-01-01 12:00:00'},
    autocommit=True
)
```

### Data Quality Checks

```python
from airflow.providers.common.sql.operators.sql import (
    SQLCheckOperator,
    SQLValueCheckOperator,
    SQLColumnCheckOperator
)

# Check that query returns non-empty result
check_data = SQLCheckOperator(
    task_id='check_data',
    conn_id='my_database',
    sql="SELECT COUNT(*) FROM users WHERE created_date = '{{ ds }}'"
)

# Check specific value
value_check = SQLValueCheckOperator(
    task_id='check_total',
    conn_id='my_database',
    sql="SELECT SUM(amount) FROM orders WHERE date = '{{ ds }}'",
    pass_value=10000,
    tolerance=0.1  # 10% tolerance
)

# Column quality checks
column_check = SQLColumnCheckOperator(
    task_id='check_columns',
    conn_id='my_database',
    table='users',
    column_mapping={
        'id': {'null_check': {'equal_to': 0}},
        'email': {'unique_check': {'equal_to': 0}},
        'age': {'min': {'greater_than': 0}, 'max': {'less_than': 150}}
    }
)
```

### Conditional Workflows

```python
from airflow.providers.common.sql.operators.sql import BranchSQLOperator

branch_task = BranchSQLOperator(
    task_id='check_condition',
    conn_id='my_database',
    sql="SELECT COUNT(*) FROM new_orders WHERE date = '{{ ds }}'",
    follow_task_ids_if_true=['process_orders'],
    follow_task_ids_if_false=['send_notification']
)
```

### Data Transfer

```python
from airflow.providers.common.sql.operators.generic_transfer import GenericTransfer

transfer_task = GenericTransfer(
    task_id='transfer_data',
    sql="SELECT * FROM source_table WHERE date = '{{ ds }}'",
    destination_table='target_table',
    source_conn_id='source_db',
    destination_conn_id='target_db',
    preoperator='TRUNCATE TABLE target_table'
)
```

307

308

### Multiple SQL Statements

309

310

```python

311

sql_multi = SQLExecuteQueryOperator(

312

task_id='multi_sql',

313

conn_id='my_database',

314

sql=[

315

'UPDATE inventory SET last_updated = NOW()',

316

'INSERT INTO audit_log (action, timestamp) VALUES ("inventory_update", NOW())',

317

'CALL update_stats_procedure()'

318

],

319

split_statements=True,

320

autocommit=True

321

)

322

```