or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md
tile.json

tessl/pypi-apache-airflow-backport-providers-jenkins

Jenkins provider for Apache Airflow that enables workflow orchestration and automation with Jenkins CI/CD systems

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/apache-airflow-backport-providers-jenkins@2021.3.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-backport-providers-jenkins@2021.3.0

index.mddocs/

Apache Airflow Backport Providers Jenkins

Jenkins integration provider for Apache Airflow 1.10.*, enabling workflow orchestration and automation with Jenkins CI/CD systems. This backport provider allows Airflow users to trigger Jenkins jobs, monitor build status, and integrate Jenkins operations into their data pipelines and workflow DAGs.

Package Information

  • Package Name: apache-airflow-backport-providers-jenkins
  • Package Type: pypi
  • Language: Python
  • Installation: pip install apache-airflow-backport-providers-jenkins
  • Version: 2021.3.3
  • Python Support: Python 3.6+

Core Imports

from airflow.providers.jenkins.hooks.jenkins import JenkinsHook
from airflow.providers.jenkins.operators.jenkins_job_trigger import JenkinsJobTriggerOperator
from distutils.util import strtobool

Basic Usage

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.jenkins.operators.jenkins_job_trigger import JenkinsJobTriggerOperator

# Define DAG
default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "jenkins_example",
    default_args=default_args,
    start_date=datetime(2021, 1, 1),
    schedule_interval=None
) as dag:
    
    # Trigger a Jenkins job
    trigger_build = JenkinsJobTriggerOperator(
        task_id="trigger_jenkins_job",
        jenkins_connection_id="jenkins_default",
        job_name="my-build-job",
        parameters={"branch": "main", "environment": "staging"},
        allowed_jenkins_states=["SUCCESS"],
        sleep_time=10
    )

Connection Configuration

Before using the Jenkins provider, configure a Jenkins connection in Airflow:

  1. Connection Type: jenkins
  2. Host: Jenkins server hostname (e.g., jenkins.company.com)
  3. Port: Jenkins server port (typically 8080)
  4. Login: Jenkins username
  5. Password: Jenkins password or API token
  6. Extra: "true" for HTTPS, "false" for HTTP (default: false)

Capabilities

Jenkins Connection Management

Low-level Jenkins server connection and authentication handling through the JenkinsHook class.

class JenkinsHook(BaseHook):
    conn_name_attr = 'conn_id'
    default_conn_name = 'jenkins_default'
    conn_type = 'jenkins'
    hook_name = 'Jenkins'
    
    def __init__(self, conn_id: str = default_conn_name) -> None: ...
    def get_jenkins_server(self) -> jenkins.Jenkins: ...

The JenkinsHook manages connections to Jenkins servers with automatic HTTPS/HTTP detection based on connection configuration. It provides access to the underlying python-jenkins library for advanced operations.

Usage Example

from airflow.providers.jenkins.hooks.jenkins import JenkinsHook

# Initialize hook with connection
hook = JenkinsHook("my_jenkins_connection")

# Get Jenkins server instance for direct API access
jenkins_server = hook.get_jenkins_server()

# Use jenkins server for advanced operations
job_info = jenkins_server.get_job_info("my-job")
build_info = jenkins_server.get_build_info("my-job", 42)

Job Triggering and Monitoring

Automated Jenkins job execution with parameter passing, status monitoring, and state validation through the JenkinsJobTriggerOperator.

class JenkinsJobTriggerOperator(BaseOperator):
    template_fields = ('parameters',)
    template_ext = ('.json',)
    ui_color = '#f9ec86'
    
    def __init__(
        self,
        *,
        jenkins_connection_id: str,
        job_name: str,
        parameters: Optional[Union[str, Dict, List]] = "",
        sleep_time: int = 10,
        max_try_before_job_appears: int = 10,
        allowed_jenkins_states: Optional[Iterable[str]] = None,
        **kwargs
    ): ...
    
    def execute(self, context: Mapping[Any, Any]) -> Optional[str]: ...
    def build_job(self, jenkins_server: jenkins.Jenkins, params: Optional[Union[str, Dict, List]] = "") -> Optional[JenkinsRequest]: ...
    def poll_job_in_queue(self, location: str, jenkins_server: jenkins.Jenkins) -> int: ...
    def get_hook(self) -> JenkinsHook: ...

The operator handles the complete Jenkins job lifecycle: triggering builds with parameters, polling the queue until execution starts, monitoring job progress, and validating final job state against allowed outcomes.

Parameters

  • jenkins_connection_id (str): Airflow connection ID for Jenkins server
  • job_name (str): Name of Jenkins job to trigger
  • parameters (str|Dict|List, optional): Job parameters as JSON string, dictionary, or list (templated)
  • sleep_time (int, default=10): Seconds to wait between status checks (minimum 1)
  • max_try_before_job_appears (int, default=10): Maximum attempts to find job in queue
  • allowed_jenkins_states (Iterable[str], optional): Acceptable job completion states (default: ['SUCCESS'])

Usage Examples

Basic job triggering:

trigger_job = JenkinsJobTriggerOperator(
    task_id="trigger_build",
    jenkins_connection_id="jenkins_prod",
    job_name="deploy-application"
)

Parametrized job with custom success states:

trigger_with_params = JenkinsJobTriggerOperator(
    task_id="trigger_test_job",
    jenkins_connection_id="jenkins_test",
    job_name="run-tests",
    parameters={
        "test_suite": "integration",
        "parallel_jobs": 4,
        "timeout": 3600
    },
    allowed_jenkins_states=["SUCCESS", "UNSTABLE"],
    sleep_time=15
)

Using JSON string parameters:

trigger_from_file = JenkinsJobTriggerOperator(
    task_id="trigger_from_config",
    jenkins_connection_id="jenkins_default",
    job_name="process-data",
    parameters='{"input_path": "/data/input", "output_format": "parquet"}',
    max_try_before_job_appears=20
)

Helper Functions

Utility functions for advanced Jenkins API interactions with proper error handling.

def jenkins_request_with_headers(
    jenkins_server: jenkins.Jenkins, 
    req: requests.Request
) -> Optional[JenkinsRequest]: ...

Executes Jenkins requests returning both response body and headers, essential for obtaining queue location information during job triggering.

Types

from typing import Any, Dict, Iterable, List, Mapping, Optional, Union
from requests import Request
import jenkins

# Type aliases
JenkinsRequest = Mapping[str, Any]
ParamType = Optional[Union[str, Dict, List]]

# Jenkins library types  
Jenkins = jenkins.Jenkins
JenkinsException = jenkins.JenkinsException
NotFoundException = jenkins.NotFoundException
TimeoutException = jenkins.TimeoutException
EmptyResponseException = jenkins.EmptyResponseException

Error Handling

The provider handles various Jenkins-specific exceptions:

  • jenkins.JenkinsException: General Jenkins API errors (authentication, invalid parameters)
  • jenkins.NotFoundException: Job or build not found (404 errors)
  • jenkins.TimeoutException: Request timeout during API calls
  • jenkins.EmptyResponseException: Empty response from Jenkins server
  • AirflowException: Airflow-specific errors (missing parameters, job execution failures)

Common error scenarios:

try:
    result = trigger_job.execute(context)
except AirflowException as e:
    # Handle Airflow-specific errors (missing connection, job failures)
    print(f"Job execution failed: {e}")
except jenkins.JenkinsException as e:
    # Handle Jenkins API errors (authentication, invalid job parameters)
    print(f"Jenkins API error: {e}")

Advanced Usage Patterns

Artifact Retrieval

Combine JenkinsJobTriggerOperator with custom operators for artifact handling:

from airflow.operators.python import PythonOperator
from requests import Request

def download_artifact(**context):
    hook = JenkinsHook("jenkins_default")
    jenkins_server = hook.get_jenkins_server()
    
    # Get job URL from previous task
    job_url = context['task_instance'].xcom_pull(task_ids='trigger_job')
    artifact_url = f"{job_url}artifact/build-output.zip"
    
    # Download artifact using Jenkins API
    request = Request(method='GET', url=artifact_url)
    response = jenkins_server.jenkins_open(request)
    
    return response

# Task sequence
trigger_job = JenkinsJobTriggerOperator(
    task_id="trigger_job",
    jenkins_connection_id="jenkins_default", 
    job_name="build-package"
)

download_artifacts = PythonOperator(
    task_id="download_artifacts",
    python_callable=download_artifact
)

trigger_job >> download_artifacts

Multiple Job Coordination

Trigger multiple related Jenkins jobs with dependency management:

# Trigger build job
build_job = JenkinsJobTriggerOperator(
    task_id="build_application",
    jenkins_connection_id="jenkins_ci",
    job_name="build-app",
    parameters={"branch": "{{ dag_run.conf.get('branch', 'main') }}"}
)

# Trigger test job after build
test_job = JenkinsJobTriggerOperator(
    task_id="run_tests", 
    jenkins_connection_id="jenkins_ci",
    job_name="test-app",
    parameters={"build_number": "{{ ti.xcom_pull(task_ids='build_application') }}"},
    allowed_jenkins_states=["SUCCESS", "UNSTABLE"]
)

# Trigger deployment after successful tests
deploy_job = JenkinsJobTriggerOperator(
    task_id="deploy_application",
    jenkins_connection_id="jenkins_prod", 
    job_name="deploy-app",
    parameters={"environment": "production"}
)

build_job >> test_job >> deploy_job