Jenkins provider for Apache Airflow that enables workflow orchestration and automation with Jenkins CI/CD systems
npx @tessl/cli install tessl/pypi-apache-airflow-backport-providers-jenkins@2021.3.00
# Apache Airflow Backport Providers Jenkins

Jenkins integration provider for Apache Airflow 1.10.*, enabling workflow orchestration and automation with Jenkins CI/CD systems. This backport provider allows Airflow users to trigger Jenkins jobs, monitor build status, and integrate Jenkins operations into their data pipelines and workflow DAGs.

## Package Information

- **Package Name**: apache-airflow-backport-providers-jenkins
- **Package Type**: pypi
- **Language**: Python
- **Installation**: `pip install apache-airflow-backport-providers-jenkins`
- **Version**: 2021.3.3
- **Python Support**: Python 3.6+

## Core Imports

```python
from airflow.providers.jenkins.hooks.jenkins import JenkinsHook
from airflow.providers.jenkins.operators.jenkins_job_trigger import JenkinsJobTriggerOperator
from distutils.util import strtobool
```

## Basic Usage

```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.jenkins.operators.jenkins_job_trigger import JenkinsJobTriggerOperator

# Define DAG
default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "jenkins_example",
    default_args=default_args,
    start_date=datetime(2021, 1, 1),
    schedule_interval=None
) as dag:

    # Trigger a Jenkins job
    trigger_build = JenkinsJobTriggerOperator(
        task_id="trigger_jenkins_job",
        jenkins_connection_id="jenkins_default",
        job_name="my-build-job",
        parameters={"branch": "main", "environment": "staging"},
        allowed_jenkins_states=["SUCCESS"],
        sleep_time=10
    )
```

## Connection Configuration

Before using the Jenkins provider, configure a Jenkins connection in Airflow:

1. **Connection Type**: jenkins
2. **Host**: Jenkins server hostname (e.g., jenkins.company.com)
3. **Port**: Jenkins server port (typically 8080)
4. **Login**: Jenkins username
5. **Password**: Jenkins password or API token
6. **Extra**: the string `true` for HTTPS or `false` for HTTP (default: `false` when left empty)
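
How these fields might combine into a server URL can be sketched as follows. This is a simplified, standalone illustration rather than the hook's source: `parse_bool` and `build_jenkins_url` are hypothetical helpers, and the accepted spellings roughly mirror those of `distutils.util.strtobool`.

```python
def parse_bool(value: str) -> bool:
    """Interpret a connection Extra value roughly the way strtobool would."""
    normalized = value.strip().lower()
    if normalized in ("y", "yes", "t", "true", "on", "1"):
        return True
    if normalized in ("n", "no", "f", "false", "off", "0"):
        return False
    raise ValueError(f"invalid truth value {value!r}")


def build_jenkins_url(host: str, port: int, extra: str = "") -> str:
    """Build the base URL from connection fields (hypothetical helper)."""
    # An empty Extra is treated as "false", i.e. plain HTTP.
    use_https = parse_bool(extra or "false")
    scheme = "https" if use_https else "http"
    return f"{scheme}://{host}:{port}"


print(build_jenkins_url("jenkins.company.com", 8080))
print(build_jenkins_url("jenkins.company.com", 8443, "true"))
```

Under these assumptions, a value that is not a recognised boolean word (for example a JSON object) raises `ValueError` instead of silently falling back to HTTP.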

## Capabilities

### Jenkins Connection Management

Low-level Jenkins server connection and authentication handling through the JenkinsHook class.

```python { .api }
class JenkinsHook(BaseHook):
    conn_name_attr = 'conn_id'
    default_conn_name = 'jenkins_default'
    conn_type = 'jenkins'
    hook_name = 'Jenkins'

    def __init__(self, conn_id: str = default_conn_name) -> None: ...
    def get_jenkins_server(self) -> jenkins.Jenkins: ...
```

The JenkinsHook manages the connection to a Jenkins server, choosing HTTPS or HTTP from the connection's Extra field. Its `get_jenkins_server()` method returns the underlying python-jenkins client for advanced operations.

#### Usage Example

```python
from airflow.providers.jenkins.hooks.jenkins import JenkinsHook

# Initialize hook with connection
hook = JenkinsHook("my_jenkins_connection")

# Get Jenkins server instance for direct API access
jenkins_server = hook.get_jenkins_server()

# Use jenkins server for advanced operations
job_info = jenkins_server.get_job_info("my-job")
build_info = jenkins_server.get_build_info("my-job", 42)
```

### Job Triggering and Monitoring

Automated Jenkins job execution with parameter passing, status monitoring, and state validation through the JenkinsJobTriggerOperator.

```python { .api }
class JenkinsJobTriggerOperator(BaseOperator):
    template_fields = ('parameters',)
    template_ext = ('.json',)
    ui_color = '#f9ec86'

    def __init__(
        self,
        *,
        jenkins_connection_id: str,
        job_name: str,
        parameters: Optional[Union[str, Dict, List]] = "",
        sleep_time: int = 10,
        max_try_before_job_appears: int = 10,
        allowed_jenkins_states: Optional[Iterable[str]] = None,
        **kwargs
    ): ...

    def execute(self, context: Mapping[Any, Any]) -> Optional[str]: ...
    def build_job(self, jenkins_server: jenkins.Jenkins, params: Optional[Union[str, Dict, List]] = "") -> Optional[JenkinsRequest]: ...
    def poll_job_in_queue(self, location: str, jenkins_server: jenkins.Jenkins) -> int: ...
    def get_hook(self) -> JenkinsHook: ...
```

The operator handles the complete Jenkins job lifecycle: triggering builds with parameters, polling the queue until execution starts, monitoring job progress, and validating final job state against allowed outcomes.
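
The monitoring and validation steps of that lifecycle can be sketched without a live server. The example below is an illustration under stated assumptions, not the operator's code: `FakeJenkinsServer` and `wait_for_build` are hypothetical names, and the stub only imitates the `building` and `result` keys found in the build info that python-jenkins returns.

```python
import time
from typing import Iterable, Optional


class FakeJenkinsServer:
    """Stand-in for a Jenkins client; reports 'building' for the first few polls."""

    def __init__(self, polls_until_done: int, result: str) -> None:
        self._remaining = polls_until_done
        self._result = result

    def get_build_info(self, job_name: str, build_number: int) -> dict:
        if self._remaining > 0:
            self._remaining -= 1
            return {"building": True, "result": None}
        return {"building": False, "result": self._result}


def wait_for_build(server, job_name: str, build_number: int,
                   sleep_time: float = 10,
                   allowed_states: Optional[Iterable[str]] = None) -> str:
    """Poll until the build finishes, then validate its result (illustrative)."""
    allowed = list(allowed_states) if allowed_states else ["SUCCESS"]
    while True:
        info = server.get_build_info(job_name, build_number)
        if not info["building"] and info["result"] is not None:
            break
        time.sleep(sleep_time)
    if info["result"] not in allowed:
        raise RuntimeError(
            f"Build ended in state {info['result']}, expected one of {allowed}"
        )
    return info["result"]


server = FakeJenkinsServer(polls_until_done=2, result="UNSTABLE")
print(wait_for_build(server, "run-tests", 42, sleep_time=0,
                     allowed_states=["SUCCESS", "UNSTABLE"]))
```

The sketch raises a plain `RuntimeError` for a disallowed state; the real operator reports failures through Airflow's own exception types.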

#### Parameters

- **jenkins_connection_id** (str): Airflow connection ID for Jenkins server
- **job_name** (str): Name of Jenkins job to trigger
- **parameters** (str|Dict|List, optional): Job parameters as JSON string, dictionary, or list (templated)
- **sleep_time** (int, default=10): Seconds to wait between status checks (minimum 1)
- **max_try_before_job_appears** (int, default=10): Maximum attempts to find job in queue
- **allowed_jenkins_states** (Iterable[str], optional): Acceptable job completion states (default: ['SUCCESS'])
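
The accepted shapes of `parameters` and the `sleep_time` minimum can be illustrated with a small standalone sketch. Everything here is an assumption made for illustration: `normalize_parameters` and `effective_sleep_time` are hypothetical helpers, string input is parsed with `json.loads` (the operator's own parsing may differ), and the list form is taken to be a list of name/value pairs.

```python
import json
from typing import Dict, List, Optional, Union
from urllib.parse import urlencode

ParamType = Optional[Union[str, Dict, List]]


def normalize_parameters(parameters: ParamType) -> Optional[Union[Dict, List]]:
    """Return parameters as a dict or list, or None when nothing was given."""
    if isinstance(parameters, str) and parameters:
        # Assumption: string input is JSON, as described in the list above.
        parameters = json.loads(parameters)
    return parameters or None


def effective_sleep_time(sleep_time: int) -> int:
    """Clamp the polling interval to the documented minimum of 1 second."""
    return max(1, sleep_time)


as_dict = normalize_parameters({"branch": "main", "environment": "staging"})
as_json = normalize_parameters('{"branch": "main", "environment": "staging"}')
as_list = normalize_parameters([("branch", "main"), ("environment", "staging")])

# All three shapes encode to the same query string.
for params in (as_dict, as_json, as_list):
    print(urlencode(params))

# An empty value means "no parameters"; a zero interval is raised to 1.
print(normalize_parameters(""), effective_sleep_time(0))
```

Encoding with `urlencode` is used only to show that the three shapes carry the same information once normalized.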

#### Usage Examples

Basic job triggering:

```python
trigger_job = JenkinsJobTriggerOperator(
    task_id="trigger_build",
    jenkins_connection_id="jenkins_prod",
    job_name="deploy-application"
)
```

Parametrized job with custom success states:

```python
trigger_with_params = JenkinsJobTriggerOperator(
    task_id="trigger_test_job",
    jenkins_connection_id="jenkins_test",
    job_name="run-tests",
    parameters={
        "test_suite": "integration",
        "parallel_jobs": 4,
        "timeout": 3600
    },
    allowed_jenkins_states=["SUCCESS", "UNSTABLE"],
    sleep_time=15
)
```

Using JSON string parameters:

```python
trigger_from_file = JenkinsJobTriggerOperator(
    task_id="trigger_from_config",
    jenkins_connection_id="jenkins_default",
    job_name="process-data",
    parameters='{"input_path": "/data/input", "output_format": "parquet"}',
    max_try_before_job_appears=20
)
```

### Helper Functions

Utility functions for advanced Jenkins API interactions with proper error handling.

```python { .api }
def jenkins_request_with_headers(
    jenkins_server: jenkins.Jenkins,
    req: requests.Request
) -> Optional[JenkinsRequest]: ...
```

Executes Jenkins requests returning both response body and headers, essential for obtaining queue location information during job triggering.
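
To show why the headers matter, here is a minimal sketch of how a queue location could be turned into a build number. It assumes the returned mapping has `body` and `headers` keys and that the `Location` header points at a Jenkins queue item; `queue_item_api_url` and `build_number_from_queue_item` are hypothetical helpers, and the JSON payloads are hand-written samples rather than captured responses.

```python
import json
from typing import Mapping, Optional


def queue_item_api_url(headers: Mapping[str, str]) -> str:
    """Turn the Location header of a build request into a queue-item JSON URL."""
    location = headers["Location"]
    return location.rstrip("/") + "/api/json"


def build_number_from_queue_item(body: str) -> Optional[int]:
    """Return the build number once the queued item has started, else None."""
    item = json.loads(body)
    executable = item.get("executable")
    return executable["number"] if executable else None


# Hypothetical response shaped like {'body': ..., 'headers': ...}.
response = {
    "body": "",
    "headers": {"Location": "http://jenkins.company.com:8080/queue/item/123/"},
}
print(queue_item_api_url(response["headers"]))

# A started item carries an "executable" entry; a waiting item does not.
print(build_number_from_queue_item('{"executable": {"number": 42}}'))
print(build_number_from_queue_item('{"why": "Waiting for next available executor"}'))
```

A caller would repeat the second step, sleeping between attempts, until a build number appears or the attempt limit is reached.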
191
192
## Types
193
194
```python { .api }
195
from typing import Any, Dict, Iterable, List, Mapping, Optional, Union
196
from requests import Request
197
import jenkins
198
199
# Type aliases
200
JenkinsRequest = Mapping[str, Any]
201
ParamType = Optional[Union[str, Dict, List]]
202
203
# Jenkins library types
204
Jenkins = jenkins.Jenkins
205
JenkinsException = jenkins.JenkinsException
206
NotFoundException = jenkins.NotFoundException
207
TimeoutException = jenkins.TimeoutException
208
EmptyResponseException = jenkins.EmptyResponseException
209
```
210
211
## Error Handling
212
213
The provider handles various Jenkins-specific exceptions:
214
215
- **jenkins.JenkinsException**: General Jenkins API errors (authentication, invalid parameters)
216
- **jenkins.NotFoundException**: Job or build not found (404 errors)
217
- **jenkins.TimeoutException**: Request timeout during API calls
218
- **jenkins.EmptyResponseException**: Empty response from Jenkins server
219
- **AirflowException**: Airflow-specific errors (missing parameters, job execution failures)
220
221
Common error scenarios:
222
223
```python
224
try:
225
result = trigger_job.execute(context)
226
except AirflowException as e:
227
# Handle Airflow-specific errors (missing connection, job failures)
228
print(f"Job execution failed: {e}")
229
except jenkins.JenkinsException as e:
230
# Handle Jenkins API errors (authentication, invalid job parameters)
231
print(f"Jenkins API error: {e}")
232
```
233
234
## Advanced Usage Patterns
235
236
### Artifact Retrieval
237
238
Combine JenkinsJobTriggerOperator with custom operators for artifact handling:
239
240
```python
241
from airflow.operators.python import PythonOperator
242
from requests import Request
243
244
def download_artifact(**context):
245
hook = JenkinsHook("jenkins_default")
246
jenkins_server = hook.get_jenkins_server()
247
248
# Get job URL from previous task
249
job_url = context['task_instance'].xcom_pull(task_ids='trigger_job')
250
artifact_url = f"{job_url}artifact/build-output.zip"
251
252
# Download artifact using Jenkins API
253
request = Request(method='GET', url=artifact_url)
254
response = jenkins_server.jenkins_open(request)
255
256
return response
257
258
# Task sequence
259
trigger_job = JenkinsJobTriggerOperator(
260
task_id="trigger_job",
261
jenkins_connection_id="jenkins_default",
262
job_name="build-package"
263
)
264
265
download_artifacts = PythonOperator(
266
task_id="download_artifacts",
267
python_callable=download_artifact
268
)
269
270
trigger_job >> download_artifacts
271
```

### Multiple Job Coordination

Trigger multiple related Jenkins jobs with dependency management:

```python
# Trigger build job
build_job = JenkinsJobTriggerOperator(
    task_id="build_application",
    jenkins_connection_id="jenkins_ci",
    job_name="build-app",
    parameters={"branch": "{{ dag_run.conf.get('branch', 'main') }}"}
)

# Trigger test job after build
# The XCom value pushed by the build task is the operator's return value (the build URL)
test_job = JenkinsJobTriggerOperator(
    task_id="run_tests",
    jenkins_connection_id="jenkins_ci",
    job_name="test-app",
    parameters={"build_url": "{{ ti.xcom_pull(task_ids='build_application') }}"},
    allowed_jenkins_states=["SUCCESS", "UNSTABLE"]
)

# Trigger deployment after successful tests
deploy_job = JenkinsJobTriggerOperator(
    task_id="deploy_application",
    jenkins_connection_id="jenkins_prod",
    job_name="deploy-app",
    parameters={"environment": "production"}
)

build_job >> test_job >> deploy_job
```