Apache Airflow provider package for seamless GitHub integration through hooks, operators, and sensors
—
Quality: Pending (does it follow best practices?)
Impact: Pending (no eval scenarios have been run)
The GitHub operator runs GitHub API operations as Airflow tasks. It calls PyGithub SDK methods dynamically and supports templated parameters and result processing.
Generic operator for executing any GitHub API method through the PyGithub client.
class GithubOperator(BaseOperator):
    """
    Run one GitHub API call as an Airflow task.

    Built on GitHub's Python SDK, PyGithub (https://github.com/PyGithub/PyGithub):
    any method exposed by the PyGithub client can be invoked by name with the
    supplied arguments.
    """

    # ``github_method_args`` is a templated field, so its values may contain
    # Airflow template expressions that are substituted before execution.
    template_fields = ("github_method_args",)

    def __init__(
        self,
        *,
        github_method: str,
        github_conn_id: str = "github_default",
        github_method_args: dict | None = None,
        result_processor: Callable | None = None,
        **kwargs,
    ) -> None:
        """
        Configure the GitHub operator.

        :param github_method: Name of the PyGithub client method to call.
        :param github_conn_id: ID of a pre-defined GitHub connection.
        :param github_method_args: Arguments for ``github_method`` (templated).
        :param result_processor: Optional callable that post-processes the
            response returned by the GitHub API.
        :param kwargs: Additional ``BaseOperator`` parameters.
        """
        ...  # Body not shown in this API reference.

    def execute(self, context: Context) -> Any:
        """
        Call the configured GitHub method and return its result.

        A ``GithubHook`` is created and its client obtained, then ``github_method``
        is called on that client. If ``result_processor`` is set, the result is
        passed through it before being returned.

        :param context: Airflow task execution context.
        :return: The GitHub API method's result, optionally processed.
        :raises AirflowException: If the GitHub operation fails or the method
            does not exist.
        """
        ...  # Body not shown in this API reference.


from airflow.providers.github.operators.github import GithubOperator
# Get the authenticated user's information (no method arguments needed).
# NOTE(review): with no result_processor, the raw PyGithub object is the task's
# return value; confirm your XCom backend can serialize it, or add a
# result_processor that extracts plain fields.
get_user = GithubOperator(
    dag=dag,
    task_id="get_github_user",
    github_method="get_user",
)

# Get a specific repository by its "owner/name" identifier.
get_repo = GithubOperator(
    dag=dag,
    task_id="get_repository",
    github_method="get_repo",
    github_method_args=dict(full_name_or_id="apache/airflow"),
)

# List user repositories
list_repos = GithubOperator(
    task_id='list_repositories',
    github_method='get_user',
    result_processor=lambda user: [repo.name for repo in user.get_repos()],
    dag=dag,
)
# Get repository issues.
# Each open issue is reduced to a small dict of plain values. This matches the
# other examples here, which return names rather than raw PyGithub objects,
# so the task result stays easy to pass on.
# Note: GitHub's issues API also returns pull requests, so open PRs appear too.
get_issues = GithubOperator(
    task_id='get_repo_issues',
    github_method='get_repo',
    github_method_args={'full_name_or_id': 'apache/airflow'},
    result_processor=lambda repo: [
        {'number': issue.number, 'title': issue.title}
        for issue in repo.get_issues(state='open')
    ],
    dag=dag,
)
# Get repository tags (names only).
list_tags = GithubOperator(
    task_id='list_repo_tags',
    github_method='get_repo',
    github_method_args={'full_name_or_id': 'apache/airflow'},
    result_processor=lambda repo: [tag.name for tag in repo.get_tags()],
    dag=dag,
)
# Get organization
# NOTE(review): like any call without a result_processor, this returns the raw
# PyGithub organization object; confirm it can be stored as the task result.
get_org = GithubOperator(
    dag=dag,
    task_id="get_organization",
    github_method="get_organization",
    github_method_args=dict(login="apache"),
)

# List organization repositories (names only).
org_repos = GithubOperator(
    dag=dag,
    task_id="list_org_repos",
    github_method="get_organization",
    github_method_args=dict(login="apache"),
    result_processor=lambda org: [repository.name for repository in org.get_repos()],
)

# Use templated arguments with Airflow context
templated_operation = GithubOperator(
    dag=dag,
    task_id="templated_github_call",
    github_method="get_repo",
    # github_method_args is a templated field: the repository name is taken
    # from the triggering DAG run's conf at run time.
    github_method_args=dict(full_name_or_id='{{ dag_run.conf["repo_name"] }}'),
    # Return only the star count.
    result_processor=lambda repository: repository.stargazers_count,
)

import logging
def process_repo_info(repo):
    """Extract a summary of a repository and log it.

    Args:
        repo: Repository object (e.g. the result of ``github_method='get_repo'``).
            Its ``name``, ``stargazers_count``, ``forks_count``, ``language`` and
            ``open_issues_count`` attributes are read.

    Returns:
        dict: Mapping with keys ``name``, ``stars``, ``forks``, ``language`` and
        ``open_issues``.
    """
    info = {
        'name': repo.name,
        'stars': repo.stargazers_count,
        'forks': repo.forks_count,
        'language': repo.language,
        'open_issues': repo.open_issues_count,
    }
    # Lazy %-style arguments: the message is only formatted if INFO is enabled.
    logging.info("Repository info: %s", info)
    return info
analyze_repo = GithubOperator(
    task_id='analyze_repository',
    github_method='get_repo',
    github_method_args={'full_name_or_id': 'apache/airflow'},
    result_processor=process_repo_info,
    dag=dag,
)


def get_recent_releases(repo):
    """Summarize the repository's releases created within the last 30 days.

    Args:
        repo: Repository object (e.g. the result of ``github_method='get_repo'``);
            its ``get_releases()`` is iterated.

    Returns:
        list[dict]: One dict per recent release with keys ``tag``, ``name`` and
        ``created`` (ISO 8601 timestamp including its UTC offset).
    """
    from datetime import datetime, timedelta, timezone

    # Compare using timezone-aware UTC datetimes:
    # - PyGithub 2.x returns aware ``created_at`` values, and comparing those
    #   with a naive ``datetime.now()`` raises TypeError.
    # - PyGithub 1.x returned naive values already in UTC, which a local-time
    #   ``datetime.now()`` would skew by the host's UTC offset.
    cutoff_date = datetime.now(timezone.utc) - timedelta(days=30)
    releases = []
    for release in repo.get_releases():
        created_at = release.created_at
        if created_at.tzinfo is None:
            # Naive timestamps (older PyGithub) are UTC; make that explicit.
            created_at = created_at.replace(tzinfo=timezone.utc)
        if created_at >= cutoff_date:
            releases.append({
                'tag': release.tag_name,
                'name': release.name,
                'created': created_at.isoformat(),
            })
    return releases


recent_releases = GithubOperator(
    task_id='get_recent_releases',
    github_method='get_repo',
    github_method_args={'full_name_or_id': 'apache/airflow'},
    result_processor=get_recent_releases,
    dag=dag,
)
# The operator can call any method available on the PyGithub Github client.
# Common methods include:
- `get_user()`: get the authenticated user
- `get_user(login)`: get a specific user by login
- `get_repo(full_name_or_id)`: get a specific repository
- `search_repositories(query)`: search repositories
- `get_organization(login)`: get an organization
- `search_users(query)`: search users

The operator wraps GitHub API exceptions:
# GitHub API errors are caught and re-raised as AirflowException
try:
    result = operator.execute(context)
except AirflowException as e:
    # Handle known GitHub API failures by the HTTP status code that appears in
    # the wrapped error message (assumes the message includes it).
    message = str(e)
    if "404" in message:
        print("Resource not found")
    elif "403" in message:
        print("Access forbidden - check token permissions")
    else:
        # Any other failure is unexpected: re-raise instead of silently
        # swallowing it.
        raise
# Raised exception: AirflowException with GitHub error details
# Install with Tessl CLI:
npx tessl i tessl/pypi-apache-airflow-providers-github