tessl/pypi-apache-airflow-providers-github

Apache Airflow provider package for seamless GitHub integration through hooks, operators, and sensors

GitHub Sensors

GitHub sensors monitor GitHub resources and trigger downstream tasks when a condition is met. This module provides a base sensor class plus specialized sensors for common GitHub monitoring scenarios.

Capabilities

GithubSensor

Base sensor for monitoring any GitHub resource using PyGithub methods.

class GithubSensor(BaseSensorOperator):
    """
    Base GithubSensor which can monitor for any change.
    
    Executes specified PyGithub method and evaluates result through
    optional result processor to determine sensor condition.
    """
    
    def __init__(
        self,
        *,
        method_name: str,
        github_conn_id: str = "github_default",
        method_params: dict | None = None,
        result_processor: Callable | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize GitHub sensor.
        
        Parameters:
        - method_name: Method name from PyGithub to be executed
        - github_conn_id: Reference to pre-defined GitHub Connection
        - method_params: Parameters for the method_name
        - result_processor: Function that returns boolean and acts as sensor response
        - **kwargs: Additional BaseSensorOperator parameters (timeout, poke_interval, etc.)
        """
    
    def poke(self, context: Context) -> bool:
        """
        Check GitHub resource state and return boolean result.
        
        Executes GitHub method with parameters and optionally 
        processes result through result_processor function.
        
        Note: method_params must not be None when poke() is called; otherwise a
        TypeError is raised when the parameters are unpacked into the PyGithub call.
        
        Parameters:
        - context: Airflow task execution context
        
        Returns:
        bool: True if condition is met, False otherwise
        """

BaseGithubRepositorySensor

Base sensor for repository-level monitoring operations.

class BaseGithubRepositorySensor(GithubSensor):
    """
    Base GitHub sensor at Repository level.
    
    Pre-configured to use get_repo method with repository_name parameter.
    Designed to be subclassed for specific repository monitoring scenarios.
    """
    
    def __init__(
        self,
        *,
        github_conn_id: str = "github_default",
        repository_name: str | None = None,
        result_processor: Callable | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize repository sensor.
        
        Parameters:
        - github_conn_id: Reference to pre-defined GitHub Connection
        - repository_name: Fully qualified name of the repository to monitor (e.g., "apache/airflow")
        - result_processor: Function to process repository object and return boolean
        - **kwargs: Additional BaseSensorOperator parameters
        """
    
    def poke(self, context: Context) -> bool:
        """
        Check sensor status; sensors deriving from this class should override this method.
        
        Base implementation raises AirflowException requiring override.
        
        Raises:
        AirflowException: Must be overridden in subclasses
        """

GithubTagSensor

Specialized sensor for monitoring tag creation in repositories.

class GithubTagSensor(BaseGithubRepositorySensor):
    """
    Monitor a GitHub repository for tag creation.
    
    Checks if specified tag exists in the repository's tags.
    """
    
    # Template fields for dynamic tag name substitution
    template_fields = ("tag_name",)
    
    def __init__(
        self,
        *,
        github_conn_id: str = "github_default",
        tag_name: str | None = None,
        repository_name: str | None = None,
        **kwargs,
    ) -> None:
        """
        Initialize tag sensor.
        
        Parameters:
        - github_conn_id: Reference to pre-defined GitHub Connection
        - tag_name: Name of the tag to be monitored
        - repository_name: Fully qualified name of the repository (e.g., "apache/airflow")
        - **kwargs: Additional BaseSensorOperator parameters
        """
    
    def poke(self, context: Context) -> bool:
        """
        Check for tag existence in repository.
        
        Logs progress and delegates to parent GithubSensor.poke().
        
        Parameters:
        - context: Airflow task execution context
        
        Returns:
        bool: True if tag exists in repository, False otherwise
        """
    
    def tag_checker(self, repo: Any) -> bool | None:
        """
        Check existence of tag in repository.
        
        Parameters:
        - repo: PyGithub Repository object
        
        Returns:
        bool | None: True if tag exists, False if not, None on error
        
        Raises:
        AirflowException: If GitHub API call fails
        """

Usage Examples

Basic Sensor Usage

from airflow.providers.github.sensors.github import GithubSensor

# Monitor for repository existence
repo_sensor = GithubSensor(
    task_id='wait_for_repo',
    method_name='get_repo',
    method_params={'full_name_or_id': 'apache/airflow'},
    timeout=300,
    poke_interval=30,
    dag=dag
)

Tag Monitoring

from airflow.providers.github.sensors.github import GithubTagSensor

# Wait for specific tag to be created
tag_sensor = GithubTagSensor(
    task_id='wait_for_release_tag',
    repository_name='apache/airflow',
    tag_name='v2.10.0',
    timeout=1800,  # 30 minutes
    poke_interval=60,  # Check every minute
    dag=dag
)

# Wait for templated tag name
dynamic_tag_sensor = GithubTagSensor(
    task_id='wait_for_dynamic_tag',
    repository_name='apache/airflow',
    tag_name='{{ dag_run.conf["expected_tag"] }}',  # Templated
    timeout=600,
    poke_interval=30,
    dag=dag
)

Custom Repository Monitoring

def check_open_issues(repo):
    """Check if repository has fewer than 100 open issues."""
    return repo.open_issues_count < 100

issues_sensor = GithubSensor(
    task_id='monitor_issue_count',
    method_name='get_repo',
    method_params={'full_name_or_id': 'apache/airflow'},
    result_processor=check_open_issues,
    timeout=300,
    poke_interval=60,
    dag=dag
)

Release Monitoring

def check_new_release(repo):
    """Check if repository has a release in the last 24 hours."""
    from datetime import datetime, timedelta, timezone

    # PyGithub 2.x returns timezone-aware datetimes, so compare in UTC
    cutoff = datetime.now(timezone.utc) - timedelta(hours=24)

    try:
        latest_release = repo.get_latest_release()
        return latest_release.created_at >= cutoff
    except Exception:
        return False  # get_latest_release raises if there are no releases

release_sensor = GithubSensor(
    task_id='wait_for_new_release',
    method_name='get_repo',
    method_params={'full_name_or_id': 'apache/airflow'},
    result_processor=check_new_release,
    timeout=3600,
    poke_interval=300,  # Check every 5 minutes
    dag=dag
)

Pull Request Monitoring

def check_pr_merged(repo):
    """Check if a specific PR has been merged."""
    try:
        pr = repo.get_pull(123)  # PR number 123
        return pr.merged
    except Exception:
        return False

pr_sensor = GithubSensor(
    task_id='wait_for_pr_merge',
    method_name='get_repo',
    method_params={'full_name_or_id': 'apache/airflow'},
    result_processor=check_pr_merged,
    timeout=1800,
    poke_interval=120,
    dag=dag
)

Organization Member Monitoring

def check_user_membership(org):
    """Check if a user is a member of the organization."""
    try:
        # Iterate the organization's members; has_in_members() would need a
        # NamedUser from the top-level client, which isn't available here
        return any(member.login == 'username' for member in org.get_members())
    except Exception:
        return False

member_sensor = GithubSensor(
    task_id='check_org_membership',
    method_name='get_organization',
    method_params={'login': 'apache'},
    result_processor=check_user_membership,
    timeout=300,
    poke_interval=60,
    dag=dag
)

Complex Repository State Monitoring

def check_repo_health(repo):
    """Check multiple repository health indicators."""
    from datetime import datetime, timezone

    try:
        # Multiple conditions for repo health
        has_readme = any(f.name.lower().startswith('readme') for f in repo.get_contents(''))
        has_license = repo.license is not None
        # updated_at is timezone-aware in PyGithub 2.x, so compare in UTC
        recent_activity = (datetime.now(timezone.utc) - repo.updated_at).days < 30

        return has_readme and has_license and recent_activity
    except Exception:
        return False

health_sensor = GithubSensor(
    task_id='monitor_repo_health',
    method_name='get_repo',
    method_params={'full_name_or_id': 'apache/airflow'},
    result_processor=check_repo_health,
    timeout=600,
    poke_interval=300,
    dag=dag
)

Custom Sensor Implementation

Extend BaseGithubRepositorySensor for specific repository monitoring:

from airflow.providers.github.hooks.github import GithubHook
from airflow.providers.github.sensors.github import BaseGithubRepositorySensor

class GithubStarsSensor(BaseGithubRepositorySensor):
    """Monitor repository for a minimum number of stars."""
    
    def __init__(self, min_stars: int, **kwargs):
        super().__init__(**kwargs)
        self.min_stars = min_stars
    
    def poke(self, context):
        """Check if the repository has reached the minimum star count."""
        hook = GithubHook(github_conn_id=self.github_conn_id)
        # The base class stores repository_name in method_params as
        # "full_name_or_id"; it is not kept as a separate attribute
        repo = hook.client.get_repo(self.method_params["full_name_or_id"])
        
        current_stars = repo.stargazers_count
        self.log.info("Repository has %d stars, need %d", current_stars, self.min_stars)
        
        return current_stars >= self.min_stars

# Usage
stars_sensor = GithubStarsSensor(
    task_id='wait_for_popularity',
    repository_name='apache/airflow',
    min_stars=1000,
    timeout=3600,
    dag=dag
)

Error Handling

GitHub API failures inside a sensor surface as AirflowException; the illustrative snippet below shows how they can be distinguished:

from airflow.exceptions import AirflowException

# GitHub API errors are wrapped in AirflowException and logged; the sensor
# keeps poking until its timeout, per its configuration
try:
    result = sensor.poke(context)
except AirflowException as e:
    if "rate limit" in str(e).lower():
        print("Rate limit exceeded, will retry")
    elif "404" in str(e):
        print("Repository not found")

Sensor Configuration

Common sensor parameters:

  • timeout: Maximum time to wait for the condition (seconds)
  • poke_interval: Time between checks (seconds)
  • soft_fail: Mark the task as skipped instead of failed on timeout
  • mode: 'poke' (default) holds a worker slot between checks; 'reschedule' frees it

sensor = GithubTagSensor(
    task_id='wait_for_tag',
    repository_name='apache/airflow',
    tag_name='v2.10.0',
    timeout=3600,        # Wait up to 1 hour
    poke_interval=300,   # Check every 5 minutes
    soft_fail=True,      # Skip this task instead of failing on timeout
    mode='reschedule',   # Free up worker slot between checks
    dag=dag
)

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-github
