# GitHub Sensors

GitHub sensors monitor GitHub resources and trigger downstream tasks when conditions are met. This module provides a base sensor class and specialized sensors for common GitHub monitoring scenarios.

## Capabilities

### GithubSensor

Base sensor for monitoring any GitHub resource using PyGithub methods.
```python { .api }
class GithubSensor(BaseSensorOperator):
    """
    Base GithubSensor which can monitor for any change.

    Executes the specified PyGithub method and evaluates the result through an
    optional result processor to determine the sensor condition.
    """

    def __init__(
        self,
        *,
        method_name: str,
        github_conn_id: str = "github_default",
        method_params: dict | None = None,
        result_processor: Callable | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize the GitHub sensor.

        Parameters:
        - method_name: Method name from PyGithub to be executed
        - github_conn_id: Reference to a pre-defined GitHub Connection
        - method_params: Parameters for method_name
        - result_processor: Function that returns a boolean and acts as the sensor response
        - **kwargs: Additional BaseSensorOperator parameters (timeout, poke_interval, etc.)
        """

    def poke(self, context: Context) -> bool:
        """
        Check GitHub resource state and return a boolean result.

        Executes the GitHub method with the given parameters and optionally
        processes the result through the result_processor function.

        Note: method_params must not be None when poke() is called, or a
        TypeError will occur.

        Parameters:
        - context: Airflow task execution context

        Returns:
        bool: True if the condition is met, False otherwise
        """
```

### BaseGithubRepositorySensor

Base sensor for repository-level monitoring operations.

```python { .api }
class BaseGithubRepositorySensor(GithubSensor):
    """
    Base GitHub sensor at the repository level.

    Pre-configured to use the get_repo method with the repository_name parameter.
    Designed to be subclassed for specific repository monitoring scenarios.
    """

    def __init__(
        self,
        *,
        github_conn_id: str = "github_default",
        repository_name: str | None = None,
        result_processor: Callable | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize the repository sensor.

        Parameters:
        - github_conn_id: Reference to a pre-defined GitHub Connection
        - repository_name: Fully qualified name of the repository to monitor (e.g., "apache/airflow")
        - result_processor: Function to process the repository object and return a boolean
        - **kwargs: Additional BaseSensorOperator parameters
        """

    def poke(self, context: Context) -> bool:
        """
        Check sensor status; subclasses should override this method.

        The base implementation raises AirflowException, requiring an override.

        Raises:
        AirflowException: Must be overridden in subclasses
        """
```

### GithubTagSensor

Specialized sensor for monitoring tag creation in repositories.

```python { .api }
class GithubTagSensor(BaseGithubRepositorySensor):
    """
    Monitor a GitHub repository for tag creation.

    Checks whether the specified tag exists among the repository's tags.
    """

    # Template fields for dynamic tag name substitution
    template_fields = ("tag_name",)

    def __init__(
        self,
        *,
        github_conn_id: str = "github_default",
        tag_name: str | None = None,
        repository_name: str | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize the tag sensor.

        Parameters:
        - github_conn_id: Reference to a pre-defined GitHub Connection
        - tag_name: Name of the tag to be monitored
        - repository_name: Fully qualified name of the repository (e.g., "apache/airflow")
        - **kwargs: Additional BaseSensorOperator parameters
        """

    def poke(self, context: Context) -> bool:
        """
        Check for tag existence in the repository.

        Logs progress and delegates to the parent GithubSensor.poke().

        Parameters:
        - context: Airflow task execution context

        Returns:
        bool: True if the tag exists in the repository, False otherwise
        """

    def tag_checker(self, repo: Any) -> bool | None:
        """
        Check the existence of the tag in the repository.

        Parameters:
        - repo: PyGithub Repository object

        Returns:
        bool | None: True if the tag exists, False if not, None on error

        Raises:
        AirflowException: If the GitHub API call fails
        """
```

## Usage Examples

### Basic Sensor Usage

```python
from airflow.providers.github.sensors.github import GithubSensor

# Monitor for repository existence
repo_sensor = GithubSensor(
    task_id='wait_for_repo',
    method_name='get_repo',
    method_params={'full_name_or_id': 'apache/airflow'},
    timeout=300,
    poke_interval=30,
    dag=dag,
)
```

### Tag Monitoring

```python
from airflow.providers.github.sensors.github import GithubTagSensor

# Wait for a specific tag to be created
tag_sensor = GithubTagSensor(
    task_id='wait_for_release_tag',
    repository_name='apache/airflow',
    tag_name='v2.10.0',
    timeout=1800,      # 30 minutes
    poke_interval=60,  # Check every minute
    dag=dag,
)

# Wait for a templated tag name
dynamic_tag_sensor = GithubTagSensor(
    task_id='wait_for_dynamic_tag',
    repository_name='apache/airflow',
    tag_name='{{ dag_run.conf["expected_tag"] }}',  # Templated
    timeout=600,
    poke_interval=30,
    dag=dag,
)
```

### Custom Repository Monitoring

```python
def check_open_issues(repo):
    """Check if the repository has fewer than 100 open issues."""
    return repo.open_issues_count < 100


issues_sensor = GithubSensor(
    task_id='monitor_issue_count',
    method_name='get_repo',
    method_params={'full_name_or_id': 'apache/airflow'},
    result_processor=check_open_issues,
    timeout=300,
    poke_interval=60,
    dag=dag,
)
```

### Release Monitoring

```python
from datetime import datetime, timedelta, timezone


def check_new_release(repo):
    """Check if the repository has a release from the last 24 hours."""
    # Recent PyGithub versions return timezone-aware datetimes,
    # so compare against an aware cutoff.
    cutoff = datetime.now(timezone.utc) - timedelta(hours=24)
    try:
        latest_release = repo.get_latest_release()
        return latest_release.created_at >= cutoff
    except Exception:
        return False  # No releases found


release_sensor = GithubSensor(
    task_id='wait_for_new_release',
    method_name='get_repo',
    method_params={'full_name_or_id': 'apache/airflow'},
    result_processor=check_new_release,
    timeout=3600,
    poke_interval=300,  # Check every 5 minutes
    dag=dag,
)
```

### Pull Request Monitoring

```python
def check_pr_merged(repo):
    """Check if a specific PR is merged."""
    try:
        pr = repo.get_pull(123)  # PR number 123
        return pr.merged
    except Exception:
        return False


pr_sensor = GithubSensor(
    task_id='wait_for_pr_merge',
    method_name='get_repo',
    method_params={'full_name_or_id': 'apache/airflow'},
    result_processor=check_pr_merged,
    timeout=1800,
    poke_interval=120,
    dag=dag,
)
```

### Organization Member Monitoring

```python
def check_user_membership(org):
    """Check if a user is a member of the organization."""
    # PyGithub's Organization has no get_member(login) method;
    # compare logins from get_members() instead.
    try:
        return any(member.login == 'username' for member in org.get_members())
    except Exception:
        return False


member_sensor = GithubSensor(
    task_id='check_org_membership',
    method_name='get_organization',
    method_params={'login': 'apache'},
    result_processor=check_user_membership,
    timeout=300,
    poke_interval=60,
    dag=dag,
)
```

### Complex Repository State Monitoring

```python
from datetime import datetime, timezone


def check_repo_health(repo):
    """Check multiple repository health indicators."""
    try:
        # Multiple conditions for repo health
        has_readme = any(f.name.lower().startswith('readme') for f in repo.get_contents(''))
        has_license = repo.get_license() is not None  # raises 404 if no license file
        recent_activity = (datetime.now(timezone.utc) - repo.updated_at).days < 30

        return has_readme and has_license and recent_activity
    except Exception:
        return False


health_sensor = GithubSensor(
    task_id='monitor_repo_health',
    method_name='get_repo',
    method_params={'full_name_or_id': 'apache/airflow'},
    result_processor=check_repo_health,
    timeout=600,
    poke_interval=300,
    dag=dag,
)
```

## Custom Sensor Implementation

Extend `BaseGithubRepositorySensor` for specific repository monitoring:

```python
from airflow.providers.github.hooks.github import GithubHook
from airflow.providers.github.sensors.github import BaseGithubRepositorySensor


class GithubStarsSensor(BaseGithubRepositorySensor):
    """Monitor a repository for a minimum number of stars."""

    def __init__(self, min_stars: int, **kwargs):
        super().__init__(**kwargs)
        self.min_stars = min_stars

    def poke(self, context):
        """Check if the repository has the minimum number of stars."""
        hook = GithubHook(github_conn_id=self.github_conn_id)
        repo = hook.client.get_repo(self.repository_name)

        current_stars = repo.stargazers_count
        self.log.info("Repository has %s stars, need %s", current_stars, self.min_stars)

        return current_stars >= self.min_stars


# Usage
stars_sensor = GithubStarsSensor(
    task_id='wait_for_popularity',
    repository_name='apache/airflow',
    min_stars=1000,
    timeout=3600,
    dag=dag,
)
```

## Error Handling

Sensors handle GitHub API exceptions gracefully:

```python
# GitHub API errors are caught and logged
try:
    result = sensor.poke(context)
except AirflowException as e:
    # The sensor will retry based on its configuration
    if "rate limit" in str(e).lower():
        print("Rate limit exceeded, will retry")
    elif "404" in str(e):
        print("Repository not found")
```

## Sensor Configuration

Common sensor parameters:

- **timeout**: Maximum time to wait (seconds)
- **poke_interval**: Time between checks (seconds)
- **soft_fail**: Mark the task as skipped instead of failed on sensor timeout
- **mode**: 'poke' (default) or 'reschedule'

```python
sensor = GithubTagSensor(
    task_id='wait_for_tag',
    repository_name='apache/airflow',
    tag_name='v2.10.0',
    timeout=3600,        # Wait up to 1 hour
    poke_interval=300,   # Check every 5 minutes
    soft_fail=True,      # Skip instead of failing the task on timeout
    mode='reschedule',   # Free up the worker slot between checks
    dag=dag,
)
```