or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

api-integration.md, decorators.md, executors.md, index.md, job-management.md, monitoring.md, pod-operations.md, resource-management.md, spark-integration.md

api-integration.md (docs/)

0

# API Integration

1

2

Connect to Kubernetes clusters with comprehensive API access supporting both synchronous and asynchronous operations. The hooks provide low-level access to Kubernetes resources and enable custom integrations.

3

4

## Capabilities

5

6

### Kubernetes Hook

7

8

Synchronous hook for interacting with Kubernetes API, supporting cluster connections, resource management, and pod operations.

9

10

```python { .api }

11

class KubernetesHook(BaseHook):

12

"""

13

Creates Kubernetes API connection with multiple configuration options.

14

15

Args:

16

conn_id (str): Airflow connection ID. Default: 'kubernetes_default'

17

client_configuration (Configuration, optional): Kubernetes client configuration

18

cluster_context (str, optional): Kubernetes cluster context

19

config_file (str, optional): Path to kubeconfig file

20

in_cluster (bool, optional): Use in-cluster configuration

21

disable_verify_ssl (bool): Disable SSL verification. Default: False

22

disable_tcp_keepalive (bool): Disable TCP keepalive. Default: False

23

"""

24

def __init__(

25

self,

26

conn_id: str = "kubernetes_default",

27

client_configuration: Configuration | None = None,

28

cluster_context: str | None = None,

29

config_file: str | None = None,

30

in_cluster: bool | None = None,

31

disable_verify_ssl: bool = False,

32

disable_tcp_keepalive: bool = False,

33

**kwargs

34

): ...

35

36

def get_conn(self) -> ApiClient:

37

"""Returns kubernetes api session."""

38

...

39

40

def create_custom_object(

41

self,

42

group: str,

43

version: str,

44

plural: str,

45

body: dict,

46

namespace: str | None = None

47

) -> dict:

48

"""Create custom resource definition object."""

49

...

50

51

def get_pod(self, name: str, namespace: str) -> V1Pod:

52

"""Read pod object from kubernetes API."""

53

...

54

55

def create_job(self, job: V1Job, **kwargs) -> V1Job:

56

"""Run Job."""

57

...

58

59

def get_pod_logs(

60

self,

61

pod_name: str,

62

container: str | None = "",

63

namespace: str | None = None,

64

) -> str:

65

"""Retrieve a container's log from the specified pod."""

66

...

67

68

def get_pod_log_stream(

69

self,

70

pod_name: str,

71

container: str | None = "",

72

namespace: str | None = None,

73

) -> tuple[watch.Watch, Generator[str, None, None]]:

74

"""Retrieve a log stream for a container in a kubernetes pod."""

75

...

76

77

def get_namespaced_pod_list(

78

self,

79

label_selector: str | None = "",

80

namespace: str | None = None,

81

**kwargs

82

) -> list[V1Pod]:

83

"""Get list of pods in a namespace."""

84

...

85

```

86

87

### Asynchronous Kubernetes Hook

88

89

Asynchronous hook for non-blocking Kubernetes API operations, enabling efficient concurrent processing and improved performance.

90

91

```python { .api }

92

class AsyncKubernetesHook(KubernetesHook):

93

"""

94

Asynchronous Kubernetes API hook for non-blocking operations.

95

96

Args:

97

config_dict (dict, optional): Configuration dictionary

98

All other parameters inherited from KubernetesHook

99

"""

100

def __init__(

101

self,

102

config_dict: dict | None = None,

103

*args,

104

**kwargs

105

): ...

106

107

async def get_conn(self) -> AsyncApiClient:

108

"""Returns asynchronous kubernetes api session."""

109

...

110

111

async def get_pod(self, name: str, namespace: str) -> V1Pod:

112

"""Get pod's object asynchronously."""

113

...

114

115

def get_custom_object(

116

self,

117

group: str,

118

version: str,

119

plural: str,

120

name: str,

121

namespace: str | None = None

122

) -> dict:

123

"""Get custom resource definition object."""

124

...

125

126

def delete_custom_object(

127

self,

128

group: str,

129

version: str,

130

plural: str,

131

name: str,

132

namespace: str | None = None,

133

body: V1DeleteOptions | None = None

134

) -> dict:

135

"""Delete custom resource definition object."""

136

...

137

138

def get_namespace(self) -> str:

139

"""Return the namespace defined in connection."""

140

...

141

142

def get_pod_log_stream(

143

self,

144

pod_name: str,

145

container: str | None = None,

146

namespace: str | None = None,

147

**kwargs

148

):

149

"""Retrieve log stream for container in pod."""

150

...

151

152

def get_pod_logs(

153

self,

154

pod_name: str,

155

container: str | None = None,

156

namespace: str | None = None,

157

**kwargs

158

) -> str:

159

"""Retrieve container's log from pod."""

160

...

161

162

def get_pod(self, name: str, namespace: str) -> V1Pod:

163

"""Read pod object from kubernetes API."""

164

...

165

166

def create_job(self, job: V1Job) -> V1Job:

167

"""Run Kubernetes Job."""

168

...

169

170

def get_job(self, job_name: str, namespace: str) -> V1Job:

171

"""Get Job of specified name and namespace."""

172

...

173

174

def get_job_status(self, job_name: str, namespace: str) -> V1JobStatus:

175

"""Get job status."""

176

...

177

178

def wait_until_job_complete(

179

self,

180

job_name: str,

181

namespace: str,

182

timeout: int = 3600

183

) -> V1Job:

184

"""Block until job is complete or failed."""

185

...

186

187

def is_job_complete(self, job: V1Job) -> bool:

188

"""Check if job is complete."""

189

...

190

191

def is_job_failed(self, job: V1Job) -> bool:

192

"""Check if job failed."""

193

...

194

195

def is_job_successful(self, job: V1Job) -> bool:

196

"""Check if job completed successfully."""

197

...

198

199

def apply_from_yaml_file(

200

self,

201

yaml_file: str | None = None,

202

yaml_objects: list[dict] | None = None,

203

verbose: bool = False,

204

namespace: str = "default"

205

) -> list[dict]:

206

"""Perform action from yaml file."""

207

...

208

209

def test_connection(self) -> tuple[bool, str]:

210

"""Test the connection."""

211

...

212

```

213

214

### Asynchronous Kubernetes Hook (Async Operations)

215

216

Asynchronous hook for non-blocking Kubernetes operations, supporting concurrent resource management and monitoring.

217

218

```python { .api }

219

class AsyncKubernetesHook(KubernetesHook):

220

"""

221

Hook to use Kubernetes SDK asynchronously.

222

223

Inherits all configuration from KubernetesHook with async operation support.

224

"""

225

def __init__(self, **kwargs): ...

226

227

@asynccontextmanager

228

async def get_conn(self) -> AsyncGenerator[ApiClient, None]:

229

"""Async context manager for API client."""

230

...

231

232

async def get_pod(self, name: str, namespace: str) -> V1Pod:

233

"""Get pod's object asynchronously."""

234

...

235

236

async def delete_pod(self, name: str, namespace: str) -> V1Status:

237

"""Delete pod's object asynchronously."""

238

...

239

240

async def read_logs(

241

self,

242

name: str,

243

namespace: str,

244

container: str | None = None,

245

follow: bool = False,

246

**kwargs

247

) -> str:

248

"""Read logs inside the pod."""

249

...

250

251

async def get_job_status(self, job_name: str, namespace: str) -> V1JobStatus:

252

"""Get job's status object asynchronously."""

253

...

254

255

async def wait_until_job_complete(

256

self,

257

job_name: str,

258

namespace: str,

259

timeout: int = 3600,

260

check_interval: int = 10

261

) -> V1Job:

262

"""Wait until job complete asynchronously."""

263

...

264

265

async def wait_until_container_complete(

266

self,

267

pod_name: str,

268

namespace: str,

269

container_name: str | None = None,

270

timeout: int = 3600

271

) -> None:

272

"""Wait for container completion."""

273

...

274

275

async def wait_until_container_started(

276

self,

277

pod_name: str,

278

namespace: str,

279

container_name: str | None = None,

280

timeout: int = 120

281

) -> None:

282

"""Wait for container to start."""

283

...

284

```

285

286

### Hook Constants

287

288

Constants used by the Kubernetes hooks for configuration and status checking.

289

290

```python { .api }

291

# Resource loading constant

292

LOADING_KUBE_CONFIG_FILE_RESOURCE: str

293

294

# Job status condition types

295

JOB_FINAL_STATUS_CONDITION_TYPES: list[str]

296

JOB_STATUS_CONDITION_TYPES: list[str]

297

```

298

299

## Usage Examples

300

301

### Basic Hook Usage

302

303

```python

304

from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook

305

306

# Connect to Kubernetes cluster

307

hook = KubernetesHook(

308

conn_id='kubernetes_default',

309

in_cluster=False,

310

config_file='/path/to/kubeconfig'

311

)

312

313

# Get API client

314

client = hook.get_conn()

315

316

# Test connection

317

is_connected, message = hook.test_connection()

318

print(f"Connection status: {is_connected}, Message: {message}")

319

```

320

321

### Pod Operations with Hook

322

323

```python

324

# Get pod information

325

pod = hook.get_pod(name='my-pod', namespace='default')

326

print(f"Pod status: {pod.status.phase}")

327

328

# Get pod logs

329

logs = hook.get_pod_logs(

330

pod_name='my-pod',

331

namespace='default',

332

container='main-container'

333

)

334

print(logs)

335

336

# Stream pod logs

337

_watcher, log_stream = hook.get_pod_log_stream(

338

pod_name='my-pod',

339

namespace='default'

340

)

341

for log_line in log_stream:

342

print(log_line)

343

```

344

345

### Job Management with Hook

346

347

```python

348

from kubernetes.client import V1Job, V1JobSpec, V1PodTemplateSpec, V1PodSpec, V1Container

349

350

# Create job specification

351

job_spec = V1Job(

352

api_version='batch/v1',

353

kind='Job',

354

metadata={'name': 'my-job', 'namespace': 'default'},

355

spec=V1JobSpec(

356

template=V1PodTemplateSpec(

357

spec=V1PodSpec(

358

containers=[

359

V1Container(

360

name='worker',

361

image='busybox:latest',

362

command=['sh', '-c', 'echo "Job completed"']

363

)

364

],

365

restart_policy='Never'

366

)

367

)

368

)

369

)

370

371

# Create and monitor job

372

created_job = hook.create_job(job_spec)

373

print(f"Job created: {created_job.metadata.name}")

374

375

# Wait for completion

376

completed_job = hook.wait_until_job_complete(

377

job_name='my-job',

378

namespace='default',

379

timeout=600

380

)

381

382

# Check job status

383

if hook.is_job_successful(completed_job):

384

print("Job completed successfully")

385

elif hook.is_job_failed(completed_job):

386

print("Job failed")

387

```

388

389

### Custom Resource Operations

390

391

```python

392

# Create custom resource

393

custom_resource = {

394

'apiVersion': 'apps.example.com/v1',

395

'kind': 'MyApp',

396

'metadata': {

397

'name': 'my-app-instance',

398

'namespace': 'default'

399

},

400

'spec': {

401

'replicas': 3,

402

'image': 'my-app:latest'

403

}

404

}

405

406

# Create the custom resource

407

created_cr = hook.create_custom_object(

408

group='apps.example.com',

409

version='v1',

410

plural='myapps',

411

body=custom_resource,

412

namespace='default'

413

)

414

415

# Get custom resource

416

cr = hook.get_custom_object(

417

group='apps.example.com',

418

version='v1',

419

plural='myapps',

420

name='my-app-instance',

421

namespace='default'

422

)

423

424

# Delete custom resource

425

hook.delete_custom_object(

426

group='apps.example.com',

427

version='v1',

428

plural='myapps',

429

name='my-app-instance',

430

namespace='default'

431

)

432

```

433

434

### YAML Resource Management

435

436

```python

437

# Apply resources from YAML file

438

applied_resources = hook.apply_from_yaml_file(

439

yaml_file='/path/to/resources.yaml',

440

namespace='default',

441

verbose=True

442

)

443

444

# Apply resources from YAML objects

445

yaml_objects = [

446

{

447

'apiVersion': 'v1',

448

'kind': 'ConfigMap',

449

'metadata': {'name': 'my-config', 'namespace': 'default'},

450

'data': {'key': 'value'}

451

}

452

]

453

454

hook.apply_from_yaml_file(

455

yaml_objects=yaml_objects,

456

namespace='default'

457

)

458

```

459

460

### Asynchronous Operations

461

462

```python

463

import asyncio

464

from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook

465

466

async def async_operations():

467

# Create async hook

468

async_hook = AsyncKubernetesHook(

469

conn_id='kubernetes_default',

470

in_cluster=True

471

)

472

473

# Use async context manager for connection

474

async with async_hook.get_conn() as client:

475

# Get pod asynchronously

476

pod = await async_hook.get_pod(

477

name='my-pod',

478

namespace='default'

479

)

480

481

# Read logs asynchronously

482

logs = await async_hook.read_logs(

483

name='my-pod',

484

namespace='default',

485

container='main',

486

follow=False

487

)

488

489

# Monitor job completion

490

completed_job = await async_hook.wait_until_job_complete(

491

job_name='my-job',

492

namespace='default',

493

timeout=600,

494

check_interval=5

495

)

496

497

# Wait for container to start

498

await async_hook.wait_until_container_started(

499

pod_name='my-pod',

500

namespace='default',

501

container_name='main',

502

timeout=120

503

)

504

505

# Run async operations

506

asyncio.run(async_operations())

507

```

508

509

### Multi-cluster Configuration

510

511

```python

512

# Connect to different clusters

513

prod_hook = KubernetesHook(

514

conn_id='k8s_prod',

515

cluster_context='production-cluster',

516

config_file='/home/user/.kube/config'

517

)

518

519

staging_hook = KubernetesHook(

520

conn_id='k8s_staging',

521

cluster_context='staging-cluster',

522

config_file='/home/user/.kube/config'

523

)

524

525

# In-cluster configuration

526

in_cluster_hook = KubernetesHook(

527

conn_id='k8s_in_cluster',

528

in_cluster=True

529

)

530

```

531

532

### Connection Testing and Debugging

533

534

```python

535

# Test connection with detailed output

536

is_connected, message = hook.test_connection()

537

if not is_connected:

538

print(f"Connection failed: {message}")

539

else:

540

print("Successfully connected to Kubernetes cluster")

541

542

# Get namespace from connection

543

namespace = hook.get_namespace()

544

print(f"Default namespace: {namespace}")

545

546

# Disable SSL verification for testing

547

test_hook = KubernetesHook(

548

conn_id='k8s_test',

549

disable_verify_ssl=True,

550

disable_tcp_keepalive=True

551

)

552

```