Apache Airflow provider package for seamless GitHub integration through hooks, operators, and sensors
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
GitHub Sensors monitor GitHub resources and trigger downstream tasks when conditions are met. Provides base sensor classes and specialized sensors for common GitHub monitoring scenarios.
Base sensor for monitoring any GitHub resource using PyGithub methods.
class GithubSensor(BaseSensorOperator):
"""
Base GithubSensor which can monitor for any change.
Executes specified PyGithub method and evaluates result through
optional result processor to determine sensor condition.
"""
def __init__(
self,
*,
method_name: str,
github_conn_id: str = "github_default",
method_params: dict | None = None,
result_processor: Callable | None = None,
**kwargs,
) -> None:
"""
Initialize GitHub sensor.
Parameters:
- method_name: Method name from PyGithub to be executed
- github_conn_id: Reference to pre-defined GitHub Connection
- method_params: Parameters for the method_name
- result_processor: Function that returns boolean and acts as sensor response
- **kwargs: Additional BaseSensorOperator parameters (timeout, poke_interval, etc.)
"""
def poke(self, context: Context) -> bool:
"""
Check GitHub resource state and return boolean result.
Executes GitHub method with parameters and optionally
processes result through result_processor function.
Note: method_params must not be None when poke() is called, or TypeError will occur.
Parameters:
- context: Airflow task execution context
Returns:
bool: True if condition is met, False otherwise
"""Base sensor for repository-level monitoring operations.
class BaseGithubRepositorySensor(GithubSensor):
"""
Base GitHub sensor at Repository level.
Pre-configured to use get_repo method with repository_name parameter.
Designed to be subclassed for specific repository monitoring scenarios.
"""
def __init__(
self,
*,
github_conn_id: str = "github_default",
repository_name: str | None = None,
result_processor: Callable | None = None,
**kwargs,
) -> None:
"""
Initialize repository sensor.
Parameters:
- github_conn_id: Reference to pre-defined GitHub Connection
- repository_name: Full qualified name of repository to monitor (e.g., "apache/airflow")
- result_processor: Function to process repository object and return boolean
- **kwargs: Additional BaseSensorOperator parameters
"""
def poke(self, context: Context) -> bool:
"""
Check sensor status; sensors deriving this class should override.
Base implementation raises AirflowException requiring override.
Raises:
AirflowException: Must be overridden in subclasses
"""Specialized sensor for monitoring tag creation in repositories.
class GithubTagSensor(BaseGithubRepositorySensor):
"""
Monitor a GitHub repository for tag creation.
Checks if specified tag exists in the repository's tags.
"""
# Template fields for dynamic tag name substitution
template_fields = ("tag_name",)
def __init__(
self,
*,
github_conn_id: str = "github_default",
tag_name: str | None = None,
repository_name: str | None = None,
**kwargs,
) -> None:
"""
Initialize tag sensor.
Parameters:
- github_conn_id: Reference to pre-defined GitHub Connection
- tag_name: Name of the tag to be monitored
- repository_name: Full qualified name of repository (e.g., "apache/airflow")
- **kwargs: Additional BaseSensorOperator parameters
"""
def poke(self, context: Context) -> bool:
"""
Check for tag existence in repository.
Logs progress and delegates to parent GithubSensor.poke().
Parameters:
- context: Airflow task execution context
Returns:
bool: True if tag exists in repository, False otherwise
"""
def tag_checker(self, repo: Any) -> bool | None:
"""
Check existence of tag in repository.
Parameters:
- repo: PyGithub Repository object
Returns:
bool | None: True if tag exists, False if not, None on error
Raises:
AirflowException: If GitHub API call fails
"""from airflow.providers.github.sensors.github import GithubSensor
# Monitor for repository existence
repo_sensor = GithubSensor(
task_id='wait_for_repo',
method_name='get_repo',
method_params={'full_name_or_id': 'apache/airflow'},
timeout=300,
poke_interval=30,
dag=dag
)

from airflow.providers.github.sensors.github import GithubTagSensor
# Wait for specific tag to be created
tag_sensor = GithubTagSensor(
task_id='wait_for_release_tag',
repository_name='apache/airflow',
tag_name='v2.10.0',
timeout=1800, # 30 minutes
poke_interval=60, # Check every minute
dag=dag
)
# Wait for templated tag name
dynamic_tag_sensor = GithubTagSensor(
task_id='wait_for_dynamic_tag',
repository_name='apache/airflow',
tag_name='{{ dag_run.conf["expected_tag"] }}', # Templated
timeout=600,
poke_interval=30,
dag=dag
)

def check_open_issues(repo):
"""Check if repository has fewer than 100 open issues."""
return repo.open_issues_count < 100
issues_sensor = GithubSensor(
task_id='monitor_issue_count',
method_name='get_repo',
method_params={'full_name_or_id': 'apache/airflow'},
result_processor=check_open_issues,
timeout=300,
poke_interval=60,
dag=dag
)

def check_new_release(repo):
"""Check if repository has a release in the last 24 hours."""
from datetime import datetime, timedelta
cutoff = datetime.now() - timedelta(hours=24)
try:
latest_release = repo.get_latest_release()
return latest_release.created_at >= cutoff
except:
return False # No releases found
release_sensor = GithubSensor(
task_id='wait_for_new_release',
method_name='get_repo',
method_params={'full_name_or_id': 'apache/airflow'},
result_processor=check_new_release,
timeout=3600,
poke_interval=300, # Check every 5 minutes
dag=dag
)

def check_pr_merged(repo):
"""Check if specific PR is merged."""
try:
pr = repo.get_pull(123) # PR number 123
return pr.merged
except:
return False
pr_sensor = GithubSensor(
task_id='wait_for_pr_merge',
method_name='get_repo',
method_params={'full_name_or_id': 'apache/airflow'},
result_processor=check_pr_merged,
timeout=1800,
poke_interval=120,
dag=dag
)

def check_user_membership(org):
"""Check if user is member of organization."""
try:
return org.has_in_members(org.get_member('username'))
except:
return False
member_sensor = GithubSensor(
task_id='check_org_membership',
method_name='get_organization',
method_params={'login': 'apache'},
result_processor=check_user_membership,
timeout=300,
poke_interval=60,
dag=dag
)

def check_repo_health(repo):
"""Check multiple repository health indicators."""
try:
# Multiple conditions for repo health
has_readme = any(f.name.lower().startswith('readme') for f in repo.get_contents(''))
has_license = repo.license is not None
recent_activity = (datetime.now() - repo.updated_at).days < 30
return has_readme and has_license and recent_activity
except:
return False
health_sensor = GithubSensor(
task_id='monitor_repo_health',
method_name='get_repo',
method_params={'full_name_or_id': 'apache/airflow'},
result_processor=check_repo_health,
timeout=600,
poke_interval=300,
dag=dag
)

Extend BaseGithubRepositorySensor for specific repository monitoring:
from airflow.providers.github.sensors.github import BaseGithubRepositorySensor
class GithubStarsSensor(BaseGithubRepositorySensor):
    """Monitor a GitHub repository until it reaches a minimum number of stars."""

    def __init__(self, min_stars: int, **kwargs) -> None:
        """
        Initialize the stars sensor.

        Parameters:
        - min_stars: Minimum stargazer count required for the sensor to succeed
        - **kwargs: Forwarded to BaseGithubRepositorySensor
          (github_conn_id, repository_name, timeout, poke_interval, etc.)
        """
        super().__init__(**kwargs)
        self.min_stars = min_stars

    def poke(self, context) -> bool:
        """
        Check whether the repository's stargazer count has reached min_stars.

        Parameters:
        - context: Airflow task execution context (not used by this check)

        Returns:
        bool: True once stargazers_count >= min_stars, False otherwise
        """
        hook = GithubHook(github_conn_id=self.github_conn_id)
        repo = hook.client.get_repo(self.repository_name)
        current_stars = repo.stargazers_count
        # Lazy %-style args: the message is only formatted if the log level emits it.
        self.log.info("Repository has %s stars, need %s", current_stars, self.min_stars)
        return current_stars >= self.min_stars
# Usage
stars_sensor = GithubStarsSensor(
task_id='wait_for_popularity',
repository_name='apache/airflow',
min_stars=1000,
timeout=3600,
dag=dag
)

Sensors handle GitHub API exceptions gracefully:
# GitHub API errors are caught and logged
try:
result = sensor.poke(context)
except AirflowException as e:
# Sensor will retry based on configuration
if "rate limit" in str(e).lower():
print("Rate limit exceeded, will retry")
elif "404" in str(e):
print("Repository not found")Common sensor parameters:
sensor = GithubTagSensor(
task_id='wait_for_tag',
repository_name='apache/airflow',
tag_name='v2.10.0',
timeout=3600, # Wait up to 1 hour
poke_interval=300, # Check every 5 minutes
soft_fail=True, # Don't fail entire DAG on timeout
mode='reschedule', # Free up worker slot between checks
dag=dag
)

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-github