Jenkins provider for Apache Airflow that enables workflow orchestration and automation with Jenkins CI/CD systems
npx @tessl/cli install tessl/pypi-apache-airflow-backport-providers-jenkins@2021.3.0Jenkins integration provider for Apache Airflow 1.10.*, enabling workflow orchestration and automation with Jenkins CI/CD systems. This backport provider allows Airflow users to trigger Jenkins jobs, monitor build status, and integrate Jenkins operations into their data pipelines and workflow DAGs.
pip install apache-airflow-backport-providers-jenkinsfrom airflow.providers.jenkins.hooks.jenkins import JenkinsHook
from airflow.providers.jenkins.operators.jenkins_job_trigger import JenkinsJobTriggerOperator
from distutils.util import strtoboolfrom datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.jenkins.operators.jenkins_job_trigger import JenkinsJobTriggerOperator
# Define DAG
default_args = {
"owner": "airflow",
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
with DAG(
"jenkins_example",
default_args=default_args,
start_date=datetime(2021, 1, 1),
schedule_interval=None
) as dag:
# Trigger a Jenkins job
trigger_build = JenkinsJobTriggerOperator(
task_id="trigger_jenkins_job",
jenkins_connection_id="jenkins_default",
job_name="my-build-job",
parameters={"branch": "main", "environment": "staging"},
allowed_jenkins_states=["SUCCESS"],
sleep_time=10
)Before using the Jenkins provider, configure a Jenkins connection in Airflow:
"true" for HTTPS, "false" for HTTP (default: false)Low-level Jenkins server connection and authentication handling through the JenkinsHook class.
class JenkinsHook(BaseHook):
conn_name_attr = 'conn_id'
default_conn_name = 'jenkins_default'
conn_type = 'jenkins'
hook_name = 'Jenkins'
def __init__(self, conn_id: str = default_conn_name) -> None: ...
def get_jenkins_server(self) -> jenkins.Jenkins: ...The JenkinsHook manages connections to Jenkins servers with automatic HTTPS/HTTP detection based on connection configuration. It provides access to the underlying python-jenkins library for advanced operations.
from airflow.providers.jenkins.hooks.jenkins import JenkinsHook
# Initialize hook with connection
hook = JenkinsHook("my_jenkins_connection")
# Get Jenkins server instance for direct API access
jenkins_server = hook.get_jenkins_server()
# Use jenkins server for advanced operations
job_info = jenkins_server.get_job_info("my-job")
build_info = jenkins_server.get_build_info("my-job", 42)Automated Jenkins job execution with parameter passing, status monitoring, and state validation through the JenkinsJobTriggerOperator.
class JenkinsJobTriggerOperator(BaseOperator):
template_fields = ('parameters',)
template_ext = ('.json',)
ui_color = '#f9ec86'
def __init__(
self,
*,
jenkins_connection_id: str,
job_name: str,
parameters: Optional[Union[str, Dict, List]] = "",
sleep_time: int = 10,
max_try_before_job_appears: int = 10,
allowed_jenkins_states: Optional[Iterable[str]] = None,
**kwargs
): ...
def execute(self, context: Mapping[Any, Any]) -> Optional[str]: ...
def build_job(self, jenkins_server: jenkins.Jenkins, params: Optional[Union[str, Dict, List]] = "") -> Optional[JenkinsRequest]: ...
def poll_job_in_queue(self, location: str, jenkins_server: jenkins.Jenkins) -> int: ...
def get_hook(self) -> JenkinsHook: ...The operator handles the complete Jenkins job lifecycle: triggering builds with parameters, polling the queue until execution starts, monitoring job progress, and validating final job state against allowed outcomes.
Basic job triggering:
trigger_job = JenkinsJobTriggerOperator(
task_id="trigger_build",
jenkins_connection_id="jenkins_prod",
job_name="deploy-application"
)Parametrized job with custom success states:
trigger_with_params = JenkinsJobTriggerOperator(
task_id="trigger_test_job",
jenkins_connection_id="jenkins_test",
job_name="run-tests",
parameters={
"test_suite": "integration",
"parallel_jobs": 4,
"timeout": 3600
},
allowed_jenkins_states=["SUCCESS", "UNSTABLE"],
sleep_time=15
)Using JSON string parameters:
trigger_from_file = JenkinsJobTriggerOperator(
task_id="trigger_from_config",
jenkins_connection_id="jenkins_default",
job_name="process-data",
parameters='{"input_path": "/data/input", "output_format": "parquet"}',
max_try_before_job_appears=20
)Utility functions for advanced Jenkins API interactions with proper error handling.
def jenkins_request_with_headers(
jenkins_server: jenkins.Jenkins,
req: requests.Request
) -> Optional[JenkinsRequest]: ...Executes Jenkins requests returning both response body and headers, essential for obtaining queue location information during job triggering.
from typing import Any, Dict, Iterable, List, Mapping, Optional, Union
from requests import Request
import jenkins
# Type aliases
JenkinsRequest = Mapping[str, Any]
ParamType = Optional[Union[str, Dict, List]]
# Jenkins library types
Jenkins = jenkins.Jenkins
JenkinsException = jenkins.JenkinsException
NotFoundException = jenkins.NotFoundException
TimeoutException = jenkins.TimeoutException
EmptyResponseException = jenkins.EmptyResponseExceptionThe provider handles various Jenkins-specific exceptions:
Common error scenarios:
try:
result = trigger_job.execute(context)
except AirflowException as e:
# Handle Airflow-specific errors (missing connection, job failures)
print(f"Job execution failed: {e}")
except jenkins.JenkinsException as e:
# Handle Jenkins API errors (authentication, invalid job parameters)
print(f"Jenkins API error: {e}")Combine JenkinsJobTriggerOperator with custom operators for artifact handling:
from airflow.operators.python import PythonOperator
from requests import Request
def download_artifact(**context):
hook = JenkinsHook("jenkins_default")
jenkins_server = hook.get_jenkins_server()
# Get job URL from previous task
job_url = context['task_instance'].xcom_pull(task_ids='trigger_job')
artifact_url = f"{job_url}artifact/build-output.zip"
# Download artifact using Jenkins API
request = Request(method='GET', url=artifact_url)
response = jenkins_server.jenkins_open(request)
return response
# Task sequence
trigger_job = JenkinsJobTriggerOperator(
task_id="trigger_job",
jenkins_connection_id="jenkins_default",
job_name="build-package"
)
download_artifacts = PythonOperator(
task_id="download_artifacts",
python_callable=download_artifact
)
trigger_job >> download_artifactsTrigger multiple related Jenkins jobs with dependency management:
# Trigger build job
build_job = JenkinsJobTriggerOperator(
task_id="build_application",
jenkins_connection_id="jenkins_ci",
job_name="build-app",
parameters={"branch": "{{ dag_run.conf.get('branch', 'main') }}"}
)
# Trigger test job after build
test_job = JenkinsJobTriggerOperator(
task_id="run_tests",
jenkins_connection_id="jenkins_ci",
job_name="test-app",
parameters={"build_number": "{{ ti.xcom_pull(task_ids='build_application') }}"},
allowed_jenkins_states=["SUCCESS", "UNSTABLE"]
)
# Trigger deployment after successful tests
deploy_job = JenkinsJobTriggerOperator(
task_id="deploy_application",
jenkins_connection_id="jenkins_prod",
job_name="deploy-app",
parameters={"environment": "production"}
)
build_job >> test_job >> deploy_job