tessl/pypi-apache-airflow-backport-providers-jenkins

Jenkins provider for Apache Airflow that enables workflow orchestration and automation with Jenkins CI/CD systems

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: pypipkg:pypi/apache-airflow-backport-providers-jenkins@2021.3.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-backport-providers-jenkins@2021.3.0

# Apache Airflow Backport Providers Jenkins

Jenkins integration provider for Apache Airflow 1.10.*, enabling workflow orchestration and automation with Jenkins CI/CD systems. This backport provider allows Airflow users to trigger Jenkins jobs, monitor build status, and integrate Jenkins operations into their data pipelines and workflow DAGs.

## Package Information

- **Package Name**: apache-airflow-backport-providers-jenkins
- **Package Type**: pypi
- **Language**: Python
- **Installation**: `pip install apache-airflow-backport-providers-jenkins`
- **Version**: 2021.3.3
- **Python Support**: Python 3.6+

## Core Imports

```python
from airflow.providers.jenkins.hooks.jenkins import JenkinsHook
from airflow.providers.jenkins.operators.jenkins_job_trigger import JenkinsJobTriggerOperator

# Used by JenkinsHook to read the connection's Extra field as a boolean
from distutils.util import strtobool
```

## Basic Usage

```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.jenkins.operators.jenkins_job_trigger import JenkinsJobTriggerOperator

# Define DAG
default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "jenkins_example",
    default_args=default_args,
    start_date=datetime(2021, 1, 1),
    schedule_interval=None
) as dag:

    # Trigger a Jenkins job
    trigger_build = JenkinsJobTriggerOperator(
        task_id="trigger_jenkins_job",
        jenkins_connection_id="jenkins_default",
        job_name="my-build-job",
        parameters={"branch": "main", "environment": "staging"},
        allowed_jenkins_states=["SUCCESS"],
        sleep_time=10
    )
```

## Connection Configuration

Before using the Jenkins provider, configure a Jenkins connection in Airflow:

1. **Connection Type**: jenkins
2. **Host**: Jenkins server hostname (e.g., jenkins.company.com)
3. **Port**: Jenkins server port (typically 8080)
4. **Login**: Jenkins username
5. **Password**: Jenkins password or API token
6. **Extra**: `"true"` for HTTPS, `"false"` for HTTP (default: false)
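
As a rough illustration of how these fields combine, the sketch below builds a server base URL from the host, port, and Extra flag. It is not the provider's code: `parse_bool` and `build_jenkins_url` are hypothetical helper names, and the set of accepted truthy strings is an assumption modelled on `strtobool`-style parsing.

```python
def parse_bool(value: str) -> bool:
    """Interpret common truthy/falsy strings (hypothetical helper)."""
    normalized = value.strip().lower()
    if normalized in ("y", "yes", "t", "true", "on", "1"):
        return True
    if normalized in ("n", "no", "f", "false", "off", "0"):
        return False
    raise ValueError(f"invalid truth value: {value!r}")


def build_jenkins_url(host: str, port: int, extra: str = "") -> str:
    """Build the base URL; an empty Extra falls back to plain HTTP."""
    use_https = parse_bool(extra) if extra else False
    scheme = "https" if use_https else "http"
    return f"{scheme}://{host}:{port}"


# Extra set to "true" selects HTTPS; leaving it empty selects HTTP
print(build_jenkins_url("jenkins.company.com", 8080, "true"))
print(build_jenkins_url("jenkins.company.com", 8080))
```

Rejecting unrecognised Extra values with an error, rather than silently falling back to HTTP, is a design choice made for this sketch so that a typo in the connection is noticed early.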

## Capabilities

### Jenkins Connection Management

Low-level Jenkins server connection and authentication handling through the JenkinsHook class.

```python { .api }
class JenkinsHook(BaseHook):
    conn_name_attr = 'conn_id'
    default_conn_name = 'jenkins_default'
    conn_type = 'jenkins'
    hook_name = 'Jenkins'

    def __init__(self, conn_id: str = default_conn_name) -> None: ...
    def get_jenkins_server(self) -> jenkins.Jenkins: ...
```

The JenkinsHook manages connections to Jenkins servers, choosing HTTPS or HTTP according to the connection's Extra field. Its `get_jenkins_server()` method returns the underlying python-jenkins client for advanced operations.

#### Usage Example

```python
from airflow.providers.jenkins.hooks.jenkins import JenkinsHook

# Initialize hook with connection
hook = JenkinsHook("my_jenkins_connection")

# Get Jenkins server instance for direct API access
jenkins_server = hook.get_jenkins_server()

# Use jenkins server for advanced operations
job_info = jenkins_server.get_job_info("my-job")
build_info = jenkins_server.get_build_info("my-job", 42)
```

### Job Triggering and Monitoring

Automated Jenkins job execution with parameter passing, status monitoring, and state validation through the JenkinsJobTriggerOperator.

```python { .api }
class JenkinsJobTriggerOperator(BaseOperator):
    template_fields = ('parameters',)
    template_ext = ('.json',)
    ui_color = '#f9ec86'

    def __init__(
        self,
        *,
        jenkins_connection_id: str,
        job_name: str,
        parameters: Optional[Union[str, Dict, List]] = "",
        sleep_time: int = 10,
        max_try_before_job_appears: int = 10,
        allowed_jenkins_states: Optional[Iterable[str]] = None,
        **kwargs
    ): ...

    def execute(self, context: Mapping[Any, Any]) -> Optional[str]: ...
    def build_job(self, jenkins_server: jenkins.Jenkins, params: Optional[Union[str, Dict, List]] = "") -> Optional[JenkinsRequest]: ...
    def poll_job_in_queue(self, location: str, jenkins_server: jenkins.Jenkins) -> int: ...
    def get_hook(self) -> JenkinsHook: ...
```

The operator handles the complete Jenkins job lifecycle: triggering builds with parameters, polling the queue until execution starts, monitoring job progress, and validating final job state against allowed outcomes.
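
The lifecycle described above can be sketched without a Jenkins server. The example below is only an illustration of the control flow, not the operator's implementation: `FakeJenkins` and `run_job` are hypothetical names, the scripted responses are made up, and the response shapes (`executable`, `number`, `result`, `url`) are assumptions modelled on what Jenkins typically reports for queue items and builds.

```python
import time
from typing import Iterable, List, Optional


class FakeJenkins:
    """Hypothetical stand-in that replays scripted queue and build responses."""

    def __init__(self, queue_items: List[dict], build_infos: List[dict]) -> None:
        self._queue_items = list(queue_items)
        self._build_infos = list(build_infos)

    def get_queue_item(self) -> dict:
        # Keep returning the last scripted response once the script runs out
        return self._queue_items.pop(0) if len(self._queue_items) > 1 else self._queue_items[0]

    def get_build_info(self, number: int) -> dict:
        return self._build_infos.pop(0) if len(self._build_infos) > 1 else self._build_infos[0]


def run_job(server: FakeJenkins, allowed_states: Optional[Iterable[str]] = None,
            sleep_time: float = 0.0, max_tries: int = 10) -> str:
    allowed = list(allowed_states) if allowed_states else ["SUCCESS"]

    # 1. Poll the queue until the item has been assigned a build number
    build_number = None
    for _ in range(max_tries):
        executable = server.get_queue_item().get("executable")
        if executable:
            build_number = executable["number"]
            break
        time.sleep(sleep_time)
    if build_number is None:
        raise RuntimeError("job never left the queue")

    # 2. Poll the build until a result is reported
    while True:
        info = server.get_build_info(build_number)
        if info.get("result") is not None:
            break
        time.sleep(sleep_time)

    # 3. Validate the final state against the allowed outcomes
    if info["result"] not in allowed:
        raise RuntimeError(f"build finished with state {info['result']}")
    return info["url"]


server = FakeJenkins(
    queue_items=[{}, {"executable": {"number": 7}}],
    build_infos=[{"result": None},
                 {"result": "SUCCESS", "url": "http://jenkins.example/job/demo/7/"}],
)
print(run_job(server))
```

Bounding only the queue phase with `max_tries` mirrors the role of `max_try_before_job_appears`; the build phase in this sketch waits indefinitely, so in practice a task-level timeout would be the place to cap it.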

#### Parameters

- **jenkins_connection_id** (str): Airflow connection ID for Jenkins server
- **job_name** (str): Name of Jenkins job to trigger
- **parameters** (str|Dict|List, optional): Job parameters as JSON string, dictionary, or list (templated)
- **sleep_time** (int, default=10): Seconds to wait between status checks (minimum 1)
- **max_try_before_job_appears** (int, default=10): Maximum attempts to find job in queue
- **allowed_jenkins_states** (Iterable[str], optional): Acceptable job completion states (default: ['SUCCESS'])
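
To make the accepted `parameters` shapes and the `sleep_time` minimum concrete, here is a small sketch of one way such inputs could be normalised. `normalize_parameters` and `clamp_sleep_time` are hypothetical helpers written for this illustration, and parsing strings with `ast.literal_eval` is an assumption; note that this approach would not accept JSON text containing `true`, `false`, or `null`.

```python
import ast
from typing import Dict, List, Optional, Union

ParamType = Optional[Union[str, Dict, List]]


def normalize_parameters(parameters: ParamType) -> Union[Dict, List, None]:
    """Turn a parameter string into a Python object; pass dicts and lists through."""
    if isinstance(parameters, str):
        # An empty string means "no parameters"
        if not parameters.strip():
            return None
        # literal_eval only parses literals, so no code in the string is executed
        return ast.literal_eval(parameters)
    return parameters


def clamp_sleep_time(sleep_time: int) -> int:
    """Enforce the documented minimum of one second between status checks."""
    return max(sleep_time, 1)


print(normalize_parameters('{"input_path": "/data/input", "output_format": "parquet"}'))
print(clamp_sleep_time(0))
```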

#### Usage Examples

Basic job triggering:

```python
trigger_job = JenkinsJobTriggerOperator(
    task_id="trigger_build",
    jenkins_connection_id="jenkins_prod",
    job_name="deploy-application"
)
```

Parametrized job with custom success states:

```python
trigger_with_params = JenkinsJobTriggerOperator(
    task_id="trigger_test_job",
    jenkins_connection_id="jenkins_test",
    job_name="run-tests",
    parameters={
        "test_suite": "integration",
        "parallel_jobs": 4,
        "timeout": 3600
    },
    allowed_jenkins_states=["SUCCESS", "UNSTABLE"],
    sleep_time=15
)
```

Using JSON string parameters:

```python
trigger_from_file = JenkinsJobTriggerOperator(
    task_id="trigger_from_config",
    jenkins_connection_id="jenkins_default",
    job_name="process-data",
    parameters='{"input_path": "/data/input", "output_format": "parquet"}',
    max_try_before_job_appears=20
)
```

### Helper Functions

Utility functions for advanced Jenkins API interactions with proper error handling.

```python { .api }
def jenkins_request_with_headers(
    jenkins_server: jenkins.Jenkins,
    req: requests.Request
) -> Optional[JenkinsRequest]: ...
```

Executes Jenkins requests returning both response body and headers, essential for obtaining queue location information during job triggering.
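
The sketch below shows how a queue location might be read from such a result. It assumes the returned mapping has `body` and `headers` keys and that Jenkins supplies the queue item URL in a `Location` header; `queue_item_url` is a hypothetical helper, and a plain `dict` is used here, so header lookup is case-sensitive in this example.

```python
from typing import Any, Mapping, Optional


def queue_item_url(response: Optional[Mapping[str, Any]]) -> Optional[str]:
    """Return the queue item's JSON API URL from a {'body', 'headers'} mapping."""
    if not response:
        return None
    location = response.get("headers", {}).get("Location")
    if not location:
        return None
    # Jenkins exposes a JSON view of a resource under "<resource>/api/json"
    return location.rstrip("/") + "/api/json"


# Illustrative response; the URL is made up for the example
response = {"body": "", "headers": {"Location": "http://jenkins.example/queue/item/12/"}}
print(queue_item_url(response))
```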

## Types

```python { .api }
from typing import Any, Dict, Iterable, List, Mapping, Optional, Union
from requests import Request
import jenkins

# Type aliases
JenkinsRequest = Mapping[str, Any]
ParamType = Optional[Union[str, Dict, List]]

# Jenkins library types
Jenkins = jenkins.Jenkins
JenkinsException = jenkins.JenkinsException
NotFoundException = jenkins.NotFoundException
TimeoutException = jenkins.TimeoutException
EmptyResponseException = jenkins.EmptyResponseException
```

## Error Handling

The provider handles various Jenkins-specific exceptions:

- **jenkins.JenkinsException**: General Jenkins API errors (authentication, invalid parameters)
- **jenkins.NotFoundException**: Job or build not found (404 errors)
- **jenkins.TimeoutException**: Request timeout during API calls
- **jenkins.EmptyResponseException**: Empty response from Jenkins server
- **AirflowException**: Airflow-specific errors (missing parameters, job execution failures)

Common error scenarios:

```python
import jenkins
from airflow.exceptions import AirflowException

# `trigger_job` is a JenkinsJobTriggerOperator; `context` is the task context
try:
    result = trigger_job.execute(context)
except AirflowException as e:
    # Handle Airflow-specific errors (missing connection, job failures)
    print(f"Job execution failed: {e}")
except jenkins.JenkinsException as e:
    # Handle Jenkins API errors (authentication, invalid job parameters)
    print(f"Jenkins API error: {e}")
```

## Advanced Usage Patterns

### Artifact Retrieval

Combine JenkinsJobTriggerOperator with custom operators for artifact handling:

```python
# Airflow 1.10 import path for PythonOperator
from airflow.operators.python_operator import PythonOperator
from airflow.providers.jenkins.hooks.jenkins import JenkinsHook
from airflow.providers.jenkins.operators.jenkins_job_trigger import JenkinsJobTriggerOperator
from requests import Request

def download_artifact(**context):
    hook = JenkinsHook("jenkins_default")
    jenkins_server = hook.get_jenkins_server()

    # Get the build URL pushed to XCom by the trigger task
    job_url = context['task_instance'].xcom_pull(task_ids='trigger_job')
    artifact_url = f"{job_url}artifact/build-output.zip"

    # Download artifact using Jenkins API; jenkins_request returns the full
    # response object, so the binary content is preserved
    request = Request(method='GET', url=artifact_url)
    response = jenkins_server.jenkins_request(request)

    # Save locally and pass only the path through XCom (the path is illustrative)
    local_path = "/tmp/build-output.zip"
    with open(local_path, "wb") as f:
        f.write(response.content)
    return local_path

# Task sequence
trigger_job = JenkinsJobTriggerOperator(
    task_id="trigger_job",
    jenkins_connection_id="jenkins_default",
    job_name="build-package"
)

download_artifacts = PythonOperator(
    task_id="download_artifacts",
    python_callable=download_artifact,
    provide_context=True  # required on Airflow 1.10 to receive the context kwargs
)

trigger_job >> download_artifacts
```

### Multiple Job Coordination

Trigger multiple related Jenkins jobs with dependency management:

```python
# Trigger build job
build_job = JenkinsJobTriggerOperator(
    task_id="build_application",
    jenkins_connection_id="jenkins_ci",
    job_name="build-app",
    parameters={"branch": "{{ dag_run.conf.get('branch', 'main') }}"}
)

# Trigger test job after build; the trigger operator pushes the build URL
# (not a build number) to XCom, so it is passed on as such
test_job = JenkinsJobTriggerOperator(
    task_id="run_tests",
    jenkins_connection_id="jenkins_ci",
    job_name="test-app",
    parameters={"build_url": "{{ ti.xcom_pull(task_ids='build_application') }}"},
    allowed_jenkins_states=["SUCCESS", "UNSTABLE"]
)

# Trigger deployment after successful tests
deploy_job = JenkinsJobTriggerOperator(
    task_id="deploy_application",
    jenkins_connection_id="jenkins_prod",
    job_name="deploy-app",
    parameters={"environment": "production"}
)

build_job >> test_job >> deploy_job
```