or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

api-integration.md · decorators.md · executors.md · index.md · job-management.md · monitoring.md · pod-operations.md · resource-management.md · spark-integration.md

docs/job-management.md

# Job Management

Create, monitor, and manage Kubernetes Jobs for batch processing, parallel execution, and complex workload orchestration. Jobs provide reliable execution with automatic retry, parallel processing, and completion tracking.

## Capabilities

### Job Execution

Execute batch workloads as Kubernetes Jobs with support for parallel processing, completion tracking, and automatic cleanup.

```python { .api }
class KubernetesJobOperator(KubernetesPodOperator):
    """
    Executes a Kubernetes Job.

    Args:
        name (str): Name of the job
        image (str): Docker image to run
        namespace (str): Kubernetes namespace. Defaults to 'default'
        parallelism (int, optional): Number of parallel pods
        completions (int, optional): Number of successful completions needed
        backoff_limit (int, optional): Number of retries before marking job failed
        ttl_seconds_after_finished (int, optional): TTL for cleanup after completion
        active_deadline_seconds (int, optional): Maximum duration for job execution
        suspend (bool): Whether to suspend the job. Default: False
        manual_selector (bool): Manually manage pod selector. Default: False
        completion_mode (str, optional): Completion mode ('NonIndexed' or 'Indexed')
        pod_failure_policy (dict, optional): Pod failure policy configuration
        **kwargs: Additional arguments passed to KubernetesPodOperator
    """
    def __init__(
        self,
        name: str,
        image: str,
        namespace: str = "default",
        parallelism: int | None = None,
        completions: int | None = None,
        backoff_limit: int | None = None,
        ttl_seconds_after_finished: int | None = None,
        active_deadline_seconds: int | None = None,
        suspend: bool = False,
        manual_selector: bool = False,
        completion_mode: str | None = None,
        pod_failure_policy: dict | None = None,
        **kwargs
    ): ...

    def execute(self, context: Context) -> Any:
        """Execute the job."""
        ...

    def build_job_request_obj(self, context: Context) -> V1Job:
        """Build Kubernetes job request object."""
        ...

    def create_job_request_obj(self, context: Context) -> V1Job:
        """Create job from pod template."""
        ...
```

### Job Deletion

Delete existing Kubernetes Jobs with optional cleanup of associated pods and resources.

```python { .api }
class KubernetesDeleteJobOperator(BaseOperator):
    """
    Delete a Kubernetes Job.

    Args:
        name (str): Name of the job to delete
        namespace (str): Kubernetes namespace. Defaults to 'default'
        cluster_context (str, optional): Kubernetes cluster context
        config_file (str, optional): Path to kubeconfig file
        in_cluster (bool, optional): Use in-cluster configuration
        delete_on_success (bool): Delete job on successful completion. Default: True
        delete_on_failure (bool): Delete job on failure. Default: False
        propagation_policy (str): Deletion propagation policy. Default: 'Background'
    """
    def __init__(
        self,
        name: str,
        namespace: str = "default",
        cluster_context: str | None = None,
        config_file: str | None = None,
        in_cluster: bool | None = None,
        delete_on_success: bool = True,
        delete_on_failure: bool = False,
        propagation_policy: str = "Background",
        **kwargs
    ): ...

    def execute(self, context: Context) -> Any:
        """Delete the job."""
        ...
```

### Job Patching

Update existing Kubernetes Jobs with new specifications or configuration changes.

```python { .api }
class KubernetesPatchJobOperator(BaseOperator):
    """
    Patch a Kubernetes Job.

    Args:
        name (str): Name of the job to patch
        namespace (str): Kubernetes namespace
        body (dict): Patch body as dictionary
        cluster_context (str, optional): Kubernetes cluster context
        config_file (str, optional): Path to kubeconfig file
        in_cluster (bool, optional): Use in-cluster configuration
    """
    def __init__(
        self,
        name: str,
        namespace: str,
        body: dict,
        cluster_context: str | None = None,
        config_file: str | None = None,
        in_cluster: bool | None = None,
        **kwargs
    ): ...

    def execute(self, context: Context) -> Any:
        """Patch the job."""
        ...
```

### Job Triggers

Asynchronous monitoring of Kubernetes Job completion status with triggers for deferrable execution.

```python { .api }
class KubernetesJobTrigger(BaseTrigger):
    """
    Trigger for monitoring Kubernetes Job completion.

    Args:
        job_name (str): Name of the job to monitor
        job_namespace (str): Namespace of the job
        cluster_context (str, optional): Kubernetes cluster context
        config_file (str, optional): Path to kubeconfig file
        in_cluster (bool): Use in-cluster configuration. Default: True
        get_logs (bool): Retrieve job logs. Default: True
        startup_timeout (int): Startup timeout in seconds. Default: 120
    """
    def __init__(
        self,
        job_name: str,
        job_namespace: str,
        cluster_context: str | None = None,
        config_file: str | None = None,
        in_cluster: bool = True,
        get_logs: bool = True,
        startup_timeout: int = 120,
        **kwargs
    ): ...

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Monitor job execution asynchronously."""
        ...
```

## Usage Examples

### Basic Job Execution

```python
from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator

# Simple batch job
batch_job = KubernetesJobOperator(
    task_id='batch_processing',
    name='data-processor',
    namespace='default',
    image='batch-processor:latest',
    cmds=['python', 'process_batch.py'],
    completions=1,
    parallelism=1,
    dag=dag
)
```

### Parallel Job Processing

```python
# Parallel processing job
parallel_job = KubernetesJobOperator(
    task_id='parallel_processing',
    name='parallel-processor',
    namespace='default',
    image='parallel-processor:latest',
    cmds=['python', 'parallel_process.py'],
    completions=10,   # Need 10 successful completions
    parallelism=3,    # Run 3 pods in parallel
    backoff_limit=2,  # Allow 2 retries per pod
    dag=dag
)
```

### Job with Cleanup Configuration

```python
# Job with automatic cleanup
cleanup_job = KubernetesJobOperator(
    task_id='cleanup_job',
    name='cleanup-processor',
    namespace='default',
    image='cleanup:latest',
    ttl_seconds_after_finished=3600,  # Clean up after 1 hour
    active_deadline_seconds=1800,     # Max 30 minutes execution
    dag=dag
)
```

### Job with Failure Policy

```python
# Job with custom failure handling
failure_policy_job = KubernetesJobOperator(
    task_id='resilient_job',
    name='resilient-processor',
    namespace='default',
    image='resilient:latest',
    backoff_limit=5,
    pod_failure_policy={
        'rules': [
            {
                'action': 'FailJob',
                'onExitCodes': {
                    'containerName': 'main',
                    'operator': 'In',
                    'values': [1, 2, 3]
                }
            },
            {
                'action': 'Ignore',
                'onPodConditions': [
                    {
                        'type': 'DisruptionTarget'
                    }
                ]
            }
        ]
    },
    dag=dag
)
```

### Job Deletion Task

```python
from airflow.providers.cncf.kubernetes.operators.job import KubernetesDeleteJobOperator

# Clean up completed job
delete_job = KubernetesDeleteJobOperator(
    task_id='delete_completed_job',
    name='data-processor',
    namespace='default',
    delete_on_success=True,
    propagation_policy='Foreground',  # Wait for pods to be deleted
    dag=dag
)
```

266

267

### Job Status Monitoring

268

269

```python

270

from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator

271

272

# Job with monitoring

273

monitored_job = KubernetesJobOperator(

274

task_id='monitored_job',

275

name='monitored-processor',

276

namespace='default',

277

image='processor:latest',

278

deferrable=True, # Use async monitoring

279

get_logs=True,

280

log_events_on_failure=True,

281

dag=dag

282

)

283

```

### Indexed Job Processing

```python
# Indexed job for array processing
indexed_job = KubernetesJobOperator(
    task_id='indexed_processing',
    name='indexed-processor',
    namespace='default',
    image='array-processor:latest',
    cmds=['python', 'process_index.py'],
    completions=10,
    parallelism=3,
    completion_mode='Indexed',  # Each pod gets a completion index
    env_vars=[
        V1EnvVar(
            name='JOB_COMPLETION_INDEX',
            value_from=V1EnvVarSource(
                field_ref=V1ObjectFieldSelector(
                    # NOTE: the downward API fieldPath requires single quotes
                    # around a subscripted annotation key
                    field_path="metadata.annotations['batch.kubernetes.io/job-completion-index']"
                )
            )
        )
    ],
    dag=dag
)
```

### Job Patching Example

```python
from airflow.providers.cncf.kubernetes.operators.job import KubernetesPatchJobOperator

# Suspend a running job
patch_job = KubernetesPatchJobOperator(
    task_id='suspend_job',
    name='long-running-job',
    namespace='default',
    body={
        'spec': {
            'suspend': True
        }
    },
    dag=dag
)
```