docs/sensors.md

# SQL Sensors

1

2

SQL sensors monitor database conditions and states by periodically executing SQL queries until specified criteria are met. They enable data-driven workflow orchestration based on database changes.

3

4

## Capabilities

5

6

### SQL Sensor

7

8

Monitors database state by repeatedly executing SQL queries until success criteria are met.

9

10

```python { .api }

11

class SqlSensor:

12

"""

13

Runs SQL statement repeatedly until criteria is met.

14

15

Args:

16

conn_id (str): Database connection ID

17

sql (str): SQL statement to execute (templated)

18

parameters (Mapping, optional): Query parameters (templated)

19

success (callable, optional): Success criteria function

20

failure (callable, optional): Failure criteria function

21

selector (callable): Function to transform result row (default: itemgetter(0))

22

fail_on_empty (bool): Fail if query returns no rows (default: False)

23

hook_params (Mapping, optional): Additional hook parameters

24

**kwargs: Additional sensor arguments (poke_interval, timeout, etc.)

25

"""

26

27

def __init__(self, *, conn_id, sql, parameters=None, success=None, failure=None,

28

selector=None, fail_on_empty=False, hook_params=None, **kwargs):

29

pass

30

31

def poke(self, context):

32

"""

33

Check if sensor condition is met.

34

35

Args:

36

context (dict): Airflow task context

37

38

Returns:

39

bool: True if condition is met, False otherwise

40

"""

41

pass

42

```

43

44

## Usage Examples

45

46

### Basic Record Monitoring

47

48

```python

49

from airflow.providers.common.sql.sensors.sql import SqlSensor

50

51

# Wait for new records to appear

52

file_sensor = SqlSensor(

53

task_id='wait_for_new_data',

54

conn_id='my_database',

55

sql="SELECT COUNT(*) FROM uploads WHERE date = '{{ ds }}' AND status = 'completed'",

56

success=lambda x: x[0][0] > 0, # Success when count > 0

57

poke_interval=60, # Check every minute

58

timeout=3600 # Timeout after 1 hour

59

)

60

```

61

62

### Data Quality Monitoring

63

64

```python

65

# Wait for data quality checks to pass

66

quality_sensor = SqlSensor(

67

task_id='wait_for_quality',

68

conn_id='my_database',

69

sql='''

70

SELECT

71

SUM(CASE WHEN email IS NULL THEN 1 ELSE 0 END) as null_emails,

72

SUM(CASE WHEN age < 0 OR age > 150 THEN 1 ELSE 0 END) as invalid_ages

73

FROM users

74

WHERE created_date = '{{ ds }}'

75

''',

76

success=lambda x: x[0][0] == 0 and x[0][1] == 0, # No null emails or invalid ages

77

poke_interval=300, # Check every 5 minutes

78

timeout=7200 # Timeout after 2 hours

79

)

80

```

81

82

### Complex Condition Monitoring

83

84

```python

85

# Wait for processing to complete with custom success criteria

86

def check_processing_complete(records):

87

"""Check if all processing stages are complete."""

88

if not records:

89

return False

90

91

row = records[0]

92

pending_count = row[0]

93

error_count = row[1]

94

95

# Success when no pending items and no errors

96

return pending_count == 0 and error_count == 0

97

98

processing_sensor = SqlSensor(

99

task_id='wait_for_processing',

100

conn_id='my_database',

101

sql='''

102

SELECT

103

COUNT(*) FILTER (WHERE status = 'pending') as pending,

104

COUNT(*) FILTER (WHERE status = 'error') as errors

105

FROM job_queue

106

WHERE batch_id = %(batch_id)s

107

''',

108

parameters={'batch_id': '{{ dag_run.conf.batch_id }}'},

109

success=check_processing_complete,

110

poke_interval=120, # Check every 2 minutes

111

timeout=10800 # Timeout after 3 hours

112

)

113

```

114

115

### Monitoring with Failure Conditions

116

117

```python

118

# Monitor with both success and failure criteria

119

def check_success(records):

120

return records and records[0][0] >= 1000 # At least 1000 processed

121

122

def check_failure(records):

123

return records and records[0][1] > 10 # More than 10 errors

124

125

monitor_sensor = SqlSensor(

126

task_id='monitor_batch_job',

127

conn_id='my_database',

128

sql='''

129

SELECT

130

SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,

131

SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed

132

FROM batch_processing

133

WHERE job_id = {{ params.job_id }}

134

''',

135

success=check_success,

136

failure=check_failure, # Fail task if too many errors

137

poke_interval=180,

138

timeout=14400

139

)

140

```

141

142

### Parameterized Monitoring

143

144

```python

145

# Use templated parameters for dynamic monitoring

146

dynamic_sensor = SqlSensor(

147

task_id='wait_for_threshold',

148

conn_id='my_database',

149

sql='''

150

SELECT COUNT(*)

151

FROM events

152

WHERE event_type = %(event_type)s

153

AND timestamp >= %(start_time)s

154

AND timestamp <= %(end_time)s

155

''',

156

parameters={

157

'event_type': '{{ dag_run.conf.event_type }}',

158

'start_time': '{{ ds }} 00:00:00',

159

'end_time': '{{ ds }} 23:59:59'

160

},

161

success=lambda x: x[0][0] >= 100, # At least 100 events

162

poke_interval=300

163

)

164

```

165

166

### Database Connection Monitoring

167

168

```python

169

# Monitor database connectivity and basic health

170

health_sensor = SqlSensor(

171

task_id='check_db_health',

172

conn_id='my_database',

173

sql='SELECT 1', # Simple connectivity check

174

success=lambda x: x is not None and len(x) > 0,

175

failure=lambda x: x is None,

176

poke_interval=30,

177

timeout=300

178

)

179

```

180

181

## Success and Failure Criteria Functions

182

183

Success and failure criteria are callable functions that receive the query results and return a boolean:

184

185

```python

186

def success_criteria(records):

187

"""

188

Define success condition.

189

190

Args:

191

records (list): Query result records (list of tuples)

192

193

Returns:

194

bool: True if success condition is met

195

"""

196

return len(records) > 0 and records[0][0] > threshold

197

198

def failure_criteria(records):

199

"""

200

Define failure condition.

201

202

Args:

203

records (list): Query result records (list of tuples)

204

205

Returns:

206

bool: True if failure condition is met (task should fail)

207

"""

208

return records and records[0][1] > error_threshold

209

```

210

211

## Common Patterns

212

213

### Count-based Monitoring

214

```python

215

# Wait for specific record count

216

success=lambda x: x and x[0][0] >= expected_count

217

```

218

219

### Threshold Monitoring

220

```python

221

# Wait for metric to exceed threshold

222

success=lambda x: x and x[0][0] > threshold_value

223

```

224

225

### Status-based Monitoring

226

```python

227

# Wait for specific status

228

success=lambda x: x and x[0][0] == 'COMPLETED'

229

```

230

231

### Empty Result Handling

232

```python

233

# Handle cases where query might return no results

234

success=lambda x: x is not None and len(x) > 0 and x[0][0] > 0

235

```