or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-apis.mdconfiguration.mddynamic-client.mdindex.mdleader-election.mdstreaming.mdutils.mdwatch.md

watch.mddocs/

0

# Watch

1

2

Event streaming for monitoring resource changes in real-time. Efficiently tracks additions, modifications, deletions, and errors for any Kubernetes resource, providing a reactive interface for building controllers and monitoring applications.

3

4

## Capabilities

5

6

### Watch Class

7

8

Primary interface for streaming Kubernetes resource change events.

9

10

```python { .api }

11

class Watch:

12

def __init__(self, return_type=None):

13

"""

14

Initialize watch instance for streaming resource events.

15

16

Parameters:

17

- return_type: type, expected return type for automatic deserialization

18

"""

19

20

def stop(self):

21

"""

22

Stop the watch stream and clean up resources.

23

"""

24

25

def get_return_type(self, func):

26

"""

27

Determine the return type for event objects from API function.

28

29

Parameters:

30

- func: callable, Kubernetes API function that supports watching

31

32

Returns:

33

- type: Expected return type for event objects

34

"""

35

36

def get_watch_argument_name(self, func):

37

"""

38

Get the watch parameter name for the API function.

39

40

Parameters:

41

- func: callable, Kubernetes API function

42

43

Returns:

44

- str: Parameter name for watch flag (usually 'watch')

45

"""

46

47

def unmarshal_event(self, data, return_type):

48

"""

49

Parse raw watch event data into structured event object.

50

51

Parameters:

52

- data: str, raw JSON event data from watch stream

53

- return_type: type, expected object type for deserialization

54

55

Returns:

56

- dict: Parsed event with 'type' and 'object' fields

57

"""

58

59

async def stream(self, func, *args, **kwargs):

60

"""

61

Start streaming events from Kubernetes API list operation.

62

63

Parameters:

64

- func: callable, API function that supports watch parameter

65

- *args: positional arguments passed to API function

66

- **kwargs: keyword arguments passed to API function

67

68

Yields:

69

- dict: Event objects with structure:

70

{

71

'type': str, # 'ADDED', 'MODIFIED', 'DELETED', 'ERROR'

72

'object': object # Kubernetes resource object or error status

73

}

74

"""

75

```

76

77

### Stream Base Class

78

79

Base class for event stream implementations.

80

81

```python { .api }

82

class Stream:

83

def __init__(self, func, *args, **kwargs):

84

"""

85

Base class for event stream wrappers.

86

87

Parameters:

88

- func: callable, API function for streaming

89

- *args: positional arguments for API function

90

- **kwargs: keyword arguments for API function

91

"""

92

93

async def __aiter__(self):

94

"""Async iterator interface for streaming events."""

95

96

async def __anext__(self):

97

"""Get next event from stream."""

98

```

99

100

### Constants

101

102

Important constants used in watch operations.

103

104

```python { .api }

105

TYPE_LIST_SUFFIX: str = "List" # Suffix used for inferring object types from list operations

106

```

107

108

## Usage Examples

109

110

### Basic Resource Watching

111

112

```python

113

import asyncio

114

from kubernetes_asyncio import client, config, watch

115

116

async def watch_pods():

117

await config.load_config()

118

v1 = client.CoreV1Api()

119

w = watch.Watch()

120

121

try:

122

print("Starting to watch pods in default namespace...")

123

124

async for event in w.stream(v1.list_namespaced_pod, namespace="default"):

125

event_type = event['type']

126

pod = event['object']

127

128

print(f"{event_type}: Pod {pod.metadata.name}")

129

print(f" Namespace: {pod.metadata.namespace}")

130

print(f" Phase: {pod.status.phase}")

131

print(f" Node: {pod.spec.node_name or 'Not scheduled'}")

132

print("---")

133

134

# Example: Stop watching after specific condition

135

if event_type == "DELETED" and pod.metadata.name == "target-pod":

136

print("Target pod deleted, stopping watch")

137

break

138

139

finally:

140

w.stop()

141

await v1.api_client.close()

142

143

asyncio.run(watch_pods())

144

```

145

146

### Watching with Resource Version

147

148

```python

149

async def watch_with_resource_version():

150

await config.load_config()

151

v1 = client.CoreV1Api()

152

w = watch.Watch()

153

154

try:

155

# Get current resource version

156

pod_list = await v1.list_namespaced_pod(namespace="default")

157

resource_version = pod_list.metadata.resource_version

158

159

print(f"Starting watch from resource version: {resource_version}")

160

161

# Watch from specific resource version to avoid missing events

162

async for event in w.stream(

163

v1.list_namespaced_pod,

164

namespace="default",

165

resource_version=resource_version,

166

timeout_seconds=300 # 5 minute timeout

167

):

168

event_type = event['type']

169

pod = event['object']

170

171

print(f"{event_type}: {pod.metadata.name} (rv: {pod.metadata.resource_version})")

172

173

except asyncio.TimeoutError:

174

print("Watch timed out")

175

finally:

176

w.stop()

177

await v1.api_client.close()

178

179

asyncio.run(watch_with_resource_version())

180

```

181

182

### Watching Multiple Resource Types

183

184

```python

185

async def watch_multiple_resources():

186

await config.load_config()

187

v1 = client.CoreV1Api()

188

apps_v1 = client.AppsV1Api()

189

190

async def watch_pods():

191

w = watch.Watch()

192

try:

193

async for event in w.stream(v1.list_namespaced_pod, namespace="default"):

194

pod = event['object']

195

print(f"POD {event['type']}: {pod.metadata.name}")

196

finally:

197

w.stop()

198

199

async def watch_deployments():

200

w = watch.Watch()

201

try:

202

async for event in w.stream(apps_v1.list_namespaced_deployment, namespace="default"):

203

deployment = event['object']

204

print(f"DEPLOYMENT {event['type']}: {deployment.metadata.name}")

205

finally:

206

w.stop()

207

208

# Watch both resource types concurrently

209

try:

210

await asyncio.gather(

211

watch_pods(),

212

watch_deployments()

213

)

214

finally:

215

await v1.api_client.close()

216

await apps_v1.api_client.close()

217

218

# asyncio.run(watch_multiple_resources()) # Uncomment to run

219

```

220

221

### Watching with Label Selectors

222

223

```python

224

async def watch_with_selectors():

225

await config.load_config()

226

v1 = client.CoreV1Api()

227

w = watch.Watch()

228

229

try:

230

# Watch only pods with specific labels

231

async for event in w.stream(

232

v1.list_namespaced_pod,

233

namespace="default",

234

label_selector="app=nginx,env=production"

235

):

236

event_type = event['type']

237

pod = event['object']

238

239

print(f"{event_type}: {pod.metadata.name}")

240

print(f" Labels: {pod.metadata.labels}")

241

242

# Handle specific events

243

if event_type == "ADDED":

244

print(" -> New nginx production pod created")

245

elif event_type == "MODIFIED":

246

print(f" -> Pod updated, phase: {pod.status.phase}")

247

elif event_type == "DELETED":

248

print(" -> Nginx production pod removed")

249

250

finally:

251

w.stop()

252

await v1.api_client.close()

253

254

asyncio.run(watch_with_selectors())

255

```

256

257

### Building a Simple Controller

258

259

```python

260

async def simple_controller():

261

await config.load_config()

262

v1 = client.CoreV1Api()

263

w = watch.Watch()

264

265

try:

266

print("Starting simple pod controller...")

267

268

async for event in w.stream(

269

v1.list_namespaced_pod,

270

namespace="default",

271

label_selector="managed-by=simple-controller"

272

):

273

event_type = event['type']

274

pod = event['object']

275

276

if event_type == "ADDED":

277

await handle_pod_added(v1, pod)

278

elif event_type == "MODIFIED":

279

await handle_pod_modified(v1, pod)

280

elif event_type == "DELETED":

281

await handle_pod_deleted(v1, pod)

282

elif event_type == "ERROR":

283

print(f"Watch error: {pod}")

284

break

285

286

finally:

287

w.stop()

288

await v1.api_client.close()

289

290

async def handle_pod_added(v1, pod):

291

print(f"Controller: Managing new pod {pod.metadata.name}")

292

293

# Example: Add finalizer to pod

294

if not pod.metadata.finalizers:

295

patch_body = {

296

"metadata": {

297

"finalizers": ["simple-controller/cleanup"]

298

}

299

}

300

301

await v1.patch_namespaced_pod(

302

name=pod.metadata.name,

303

namespace=pod.metadata.namespace,

304

body=patch_body

305

)

306

print(f" -> Added finalizer to {pod.metadata.name}")

307

308

async def handle_pod_modified(v1, pod):

309

print(f"Controller: Pod {pod.metadata.name} modified, phase: {pod.status.phase}")

310

311

# Example: React to pod phase changes

312

if pod.status.phase == "Failed":

313

# Clean up or restart failed pod

314

print(f" -> Pod {pod.metadata.name} failed, taking corrective action")

315

316

async def handle_pod_deleted(v1, pod):

317

print(f"Controller: Pod {pod.metadata.name} deleted")

318

319

# Example: Cleanup external resources

320

if pod.metadata.finalizers and "simple-controller/cleanup" in pod.metadata.finalizers:

321

print(f" -> Cleaning up resources for {pod.metadata.name}")

322

323

# Remove finalizer after cleanup

324

patch_body = {

325

"metadata": {

326

"finalizers": [f for f in pod.metadata.finalizers

327

if f != "simple-controller/cleanup"]

328

}

329

}

330

331

await v1.patch_namespaced_pod(

332

name=pod.metadata.name,

333

namespace=pod.metadata.namespace,

334

body=patch_body

335

)

336

337

# asyncio.run(simple_controller()) # Uncomment to run

338

```

339

340

### Error Handling and Reconnection

341

342

```python

343

async def resilient_watch():

344

await config.load_config()

345

v1 = client.CoreV1Api()

346

347

retry_count = 0

348

max_retries = 5

349

resource_version = None

350

351

while retry_count < max_retries:

352

w = watch.Watch()

353

354

try:

355

print(f"Starting watch (attempt {retry_count + 1})")

356

357

# Start from last known resource version to avoid duplicates

358

watch_kwargs = {"namespace": "default"}

359

if resource_version:

360

watch_kwargs["resource_version"] = resource_version

361

362

async for event in w.stream(v1.list_namespaced_pod, **watch_kwargs):

363

event_type = event['type']

364

365

if event_type == "ERROR":

366

error_status = event['object']

367

print(f"Watch error: {error_status}")

368

369

# Handle specific error types

370

if error_status.code == 410: # Gone - resource version too old

371

print("Resource version expired, restarting watch")

372

resource_version = None

373

break

374

else:

375

raise Exception(f"Watch error: {error_status.message}")

376

377

else:

378

pod = event['object']

379

resource_version = pod.metadata.resource_version

380

print(f"{event_type}: {pod.metadata.name} (rv: {resource_version})")

381

382

# Reset retry count on successful events

383

retry_count = 0

384

385

except Exception as e:

386

print(f"Watch failed: {e}")

387

retry_count += 1

388

389

if retry_count < max_retries:

390

wait_time = min(2 ** retry_count, 30) # Exponential backoff, max 30s

391

print(f"Retrying in {wait_time} seconds...")

392

await asyncio.sleep(wait_time)

393

else:

394

print("Max retries exceeded, giving up")

395

break

396

397

finally:

398

w.stop()

399

400

await v1.api_client.close()

401

402

asyncio.run(resilient_watch())

403

```

404

405

### Watching Cluster-Wide Resources

406

407

```python

408

async def watch_cluster_wide():

409

await config.load_config()

410

v1 = client.CoreV1Api()

411

w = watch.Watch()

412

413

try:

414

# Watch all pods across all namespaces

415

print("Watching all pods cluster-wide...")

416

417

async for event in w.stream(v1.list_pod_for_all_namespaces):

418

event_type = event['type']

419

pod = event['object']

420

421

print(f"{event_type}: {pod.metadata.namespace}/{pod.metadata.name}")

422

423

# Example: Monitor pods in system namespaces

424

if pod.metadata.namespace.startswith("kube-"):

425

print(f" -> System pod event in {pod.metadata.namespace}")

426

427

# Example: Track resource usage patterns

428

if event_type == "ADDED" and pod.spec.resources:

429

requests = pod.spec.resources.requests or {}

430

limits = pod.spec.resources.limits or {}

431

print(f" -> Resource requests: {requests}")

432

print(f" -> Resource limits: {limits}")

433

434

finally:

435

w.stop()

436

await v1.api_client.close()

437

438

asyncio.run(watch_cluster_wide())

439

```