
# GitHub Sensors

GitHub Sensors monitor GitHub resources and trigger downstream tasks when conditions are met. This module provides base sensor classes and specialized sensors for common GitHub monitoring scenarios.

## Capabilities


### GithubSensor

Base sensor for monitoring any GitHub resource using PyGithub methods.

```python { .api }
class GithubSensor(BaseSensorOperator):
    """
    Base GithubSensor which can monitor for any change.

    Executes specified PyGithub method and evaluates result through
    optional result processor to determine sensor condition.
    """

    def __init__(
        self,
        *,
        method_name: str,
        github_conn_id: str = "github_default",
        method_params: dict | None = None,
        result_processor: Callable | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize GitHub sensor.

        Parameters:
        - method_name: Method name from PyGithub to be executed
        - github_conn_id: Reference to pre-defined GitHub Connection
        - method_params: Parameters for the method_name
        - result_processor: Function that returns boolean and acts as sensor response
        - **kwargs: Additional BaseSensorOperator parameters (timeout, poke_interval, etc.)
        """

    def poke(self, context: Context) -> bool:
        """
        Check GitHub resource state and return boolean result.

        Executes GitHub method with parameters and optionally
        processes result through result_processor function.

        Note: method_params must not be None when poke() is called, or TypeError will occur.

        Parameters:
        - context: Airflow task execution context

        Returns:
        bool: True if condition is met, False otherwise
        """
```

### BaseGithubRepositorySensor

Base sensor for repository-level monitoring operations.

```python { .api }
class BaseGithubRepositorySensor(GithubSensor):
    """
    Base GitHub sensor at Repository level.

    Pre-configured to use get_repo method with repository_name parameter.
    Designed to be subclassed for specific repository monitoring scenarios.
    """

    def __init__(
        self,
        *,
        github_conn_id: str = "github_default",
        repository_name: str | None = None,
        result_processor: Callable | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize repository sensor.

        Parameters:
        - github_conn_id: Reference to pre-defined GitHub Connection
        - repository_name: Full qualified name of repository to monitor (e.g., "apache/airflow")
        - result_processor: Function to process repository object and return boolean
        - **kwargs: Additional BaseSensorOperator parameters
        """

    def poke(self, context: Context) -> bool:
        """
        Check sensor status; sensors deriving from this class should override.

        Base implementation raises AirflowException requiring override.

        Raises:
        AirflowException: Must be overridden in subclasses
        """
```

### GithubTagSensor

Specialized sensor for monitoring tag creation in repositories.

```python { .api }
class GithubTagSensor(BaseGithubRepositorySensor):
    """
    Monitor a GitHub repository for tag creation.

    Checks if specified tag exists in the repository's tags.
    """

    # Template fields for dynamic tag name substitution
    template_fields = ("tag_name",)

    def __init__(
        self,
        *,
        github_conn_id: str = "github_default",
        tag_name: str | None = None,
        repository_name: str | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize tag sensor.

        Parameters:
        - github_conn_id: Reference to pre-defined GitHub Connection
        - tag_name: Name of the tag to be monitored
        - repository_name: Full qualified name of repository (e.g., "apache/airflow")
        - **kwargs: Additional BaseSensorOperator parameters
        """

    def poke(self, context: Context) -> bool:
        """
        Check for tag existence in repository.

        Logs progress and delegates to parent GithubSensor.poke().

        Parameters:
        - context: Airflow task execution context

        Returns:
        bool: True if tag exists in repository, False otherwise
        """

    def tag_checker(self, repo: Any) -> bool | None:
        """
        Check existence of tag in repository.

        Parameters:
        - repo: PyGithub Repository object

        Returns:
        bool | None: True if tag exists, False if not, None on error

        Raises:
        AirflowException: If GitHub API call fails
        """
```

## Usage Examples

### Basic Sensor Usage

```python
from airflow.providers.github.sensors.github import GithubSensor

# Monitor for repository existence
repo_sensor = GithubSensor(
    task_id='wait_for_repo',
    method_name='get_repo',
    method_params={'full_name_or_id': 'apache/airflow'},
    timeout=300,
    poke_interval=30,
    dag=dag
)
```
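`method_name` is not limited to `get_repo`: any method on the PyGithub client can drive the sensor, and without a `result_processor` the sensor succeeds as soon as the call's return value is truthy (as in the example above). A minimal sketch of a non-repository check follows; the user login is a placeholder, not part of the provider's shipped examples:

```python
from airflow.providers.github.sensors.github import GithubSensor

def has_public_repos(user):
    """Succeed once the account has at least one public repository."""
    return user.public_repos > 0

# Hypothetical example: monitor a user account instead of a repository
user_sensor = GithubSensor(
    task_id='wait_for_public_repos',
    method_name='get_user',
    method_params={'login': 'some-github-user'},  # placeholder login
    result_processor=has_public_repos,
    timeout=600,
    poke_interval=60,
    dag=dag
)
```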

### Tag Monitoring

```python
from airflow.providers.github.sensors.github import GithubTagSensor

# Wait for specific tag to be created
tag_sensor = GithubTagSensor(
    task_id='wait_for_release_tag',
    repository_name='apache/airflow',
    tag_name='v2.10.0',
    timeout=1800,      # 30 minutes
    poke_interval=60,  # Check every minute
    dag=dag
)

# Wait for templated tag name
dynamic_tag_sensor = GithubTagSensor(
    task_id='wait_for_dynamic_tag',
    repository_name='apache/airflow',
    tag_name='{{ dag_run.conf["expected_tag"] }}',  # Templated
    timeout=600,
    poke_interval=30,
    dag=dag
)
```

### Custom Repository Monitoring

```python
def check_open_issues(repo):
    """Check if repository has fewer than 100 open issues."""
    return repo.open_issues_count < 100

issues_sensor = GithubSensor(
    task_id='monitor_issue_count',
    method_name='get_repo',
    method_params={'full_name_or_id': 'apache/airflow'},
    result_processor=check_open_issues,
    timeout=300,
    poke_interval=60,
    dag=dag
)
```

### Release Monitoring

```python
from datetime import datetime, timedelta, timezone

from github import GithubException

def check_new_release(repo):
    """Check if repository has a release in the last 24 hours."""
    # PyGithub 2.x returns timezone-aware datetimes, so compare against an aware "now"
    cutoff = datetime.now(timezone.utc) - timedelta(hours=24)

    try:
        latest_release = repo.get_latest_release()
        return latest_release.created_at >= cutoff
    except GithubException:
        return False  # No releases found

release_sensor = GithubSensor(
    task_id='wait_for_new_release',
    method_name='get_repo',
    method_params={'full_name_or_id': 'apache/airflow'},
    result_processor=check_new_release,
    timeout=3600,
    poke_interval=300,  # Check every 5 minutes
    dag=dag
)
```

### Pull Request Monitoring

```python
from github import GithubException

def check_pr_merged(repo):
    """Check if specific PR is merged."""
    try:
        pr = repo.get_pull(123)  # PR number 123
        return pr.merged
    except GithubException:
        return False

pr_sensor = GithubSensor(
    task_id='wait_for_pr_merge',
    method_name='get_repo',
    method_params={'full_name_or_id': 'apache/airflow'},
    result_processor=check_pr_merged,
    timeout=1800,
    poke_interval=120,
    dag=dag
)
```

### Organization Member Monitoring

```python
from github import GithubException

def check_user_membership(org):
    """Check if a given user is a member of the organization."""
    try:
        # Organization.get_members() yields NamedUser objects; compare by login
        return any(member.login == 'username' for member in org.get_members())
    except GithubException:
        return False

member_sensor = GithubSensor(
    task_id='check_org_membership',
    method_name='get_organization',
    method_params={'login': 'apache'},
    result_processor=check_user_membership,
    timeout=300,
    poke_interval=60,
    dag=dag
)
```

### Complex Repository State Monitoring

```python
from datetime import datetime, timezone

from github import GithubException

def check_repo_health(repo):
    """Check multiple repository health indicators."""
    try:
        # Multiple conditions for repo health
        has_readme = any(f.name.lower().startswith('readme') for f in repo.get_contents(''))
        has_license = repo.license is not None
        # PyGithub 2.x returns timezone-aware datetimes
        recent_activity = (datetime.now(timezone.utc) - repo.updated_at).days < 30

        return has_readme and has_license and recent_activity
    except GithubException:
        return False

health_sensor = GithubSensor(
    task_id='monitor_repo_health',
    method_name='get_repo',
    method_params={'full_name_or_id': 'apache/airflow'},
    result_processor=check_repo_health,
    timeout=600,
    poke_interval=300,
    dag=dag
)
```

## Custom Sensor Implementation

Extend `BaseGithubRepositorySensor` for specific repository monitoring:

```python
from airflow.providers.github.hooks.github import GithubHook
from airflow.providers.github.sensors.github import BaseGithubRepositorySensor

class GithubStarsSensor(BaseGithubRepositorySensor):
    """Monitor repository for minimum number of stars."""

    def __init__(self, min_stars: int, **kwargs):
        super().__init__(**kwargs)
        self.min_stars = min_stars

    def poke(self, context):
        """Check if repository has minimum stars."""
        hook = GithubHook(github_conn_id=self.github_conn_id)
        repo = hook.client.get_repo(self.repository_name)

        current_stars = repo.stargazers_count
        self.log.info(f"Repository has {current_stars} stars, need {self.min_stars}")

        return current_stars >= self.min_stars

# Usage
stars_sensor = GithubStarsSensor(
    task_id='wait_for_popularity',
    repository_name='apache/airflow',
    min_stars=1000,
    timeout=3600,
    dag=dag
)
```
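As a design note: overriding `poke()` as above gives full control over hook usage and logging, while for simple attribute checks the plain `GithubSensor` with a `result_processor` (as in the earlier examples) is usually sufficient and avoids defining a new class.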

## Error Handling

GitHub API failures inside a sensor surface as exceptions, which Airflow then handles according to the task's retry, timeout, and sensor configuration:

```python
from airflow.exceptions import AirflowException

# GitHub API errors are caught and logged
try:
    result = sensor.poke(context)
except AirflowException as e:
    # Sensor will retry based on configuration
    if "rate limit" in str(e).lower():
        print("Rate limit exceeded, will retry")
    elif "404" in str(e):
        print("Repository not found")
```
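If a missing resource should keep the sensor poking rather than fail the task, a `result_processor` can catch PyGithub's exceptions itself. A minimal sketch, assuming the repository is reachable and using a placeholder branch name:

```python
from github import GithubException

from airflow.providers.github.sensors.github import GithubSensor

def branch_exists(repo):
    """Return True once the release branch exists; keep poking on 404s/API errors."""
    try:
        repo.get_branch('release-branch')  # placeholder branch name
        return True
    except GithubException:
        return False

branch_sensor = GithubSensor(
    task_id='wait_for_release_branch',
    method_name='get_repo',
    method_params={'full_name_or_id': 'apache/airflow'},
    result_processor=branch_exists,
    timeout=1800,
    poke_interval=120,
    dag=dag
)
```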

## Sensor Configuration

Common sensor parameters:

- **timeout**: Maximum time to wait (seconds)
- **poke_interval**: Time between checks (seconds)
- **soft_fail**: Mark the task as skipped instead of failed on sensor timeout
- **mode**: 'poke' (default) or 'reschedule'

```python
sensor = GithubTagSensor(
    task_id='wait_for_tag',
    repository_name='apache/airflow',
    tag_name='v2.10.0',
    timeout=3600,        # Wait up to 1 hour
    poke_interval=300,   # Check every 5 minutes
    soft_fail=True,      # Skip this task instead of failing it on timeout
    mode='reschedule',   # Free up worker slot between checks
    dag=dag
)
```