or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-workloads.md autoscaling.md configuration.md core-resources.md custom-resources.md dynamic-client.md index.md leader-election.md networking.md rbac-security.md resource-watching.md storage.md streaming-operations.md utilities.md

docs/resource-watching.md

0

# Resource Watching

1

2

Event streaming for monitoring changes to Kubernetes resources in real time. Watch operations can be filtered, and the resulting events processed to trigger automated responses to cluster state changes, enabling reactive applications and monitoring systems.

3

4

## Capabilities

5

6

### Watch Events

7

8

Stream real-time events for changes to any Kubernetes resource, including creations, updates, and deletions.

9

10

```python { .api }

11

class Watch:

12

def stream(

13

self,

14

func,

15

*args,

16

timeout_seconds: int = None,

17

resource_version: str = None,

18

**kwargs

19

):

20

"""

21

Stream events from a Kubernetes API watch endpoint.

22

23

Parameters:

24

- func: API function to watch (e.g., v1.list_namespaced_pod)

25

- *args: Arguments for the API function

26

- timeout_seconds: Timeout for watch operation

27

- resource_version: Resource version to start watching from

28

- **kwargs: Additional arguments for API function

29

30

Yields:

31

Event dictionaries with 'type' and 'object' keys

32

"""

33

34

def stop(self) -> None:

35

"""Stop the watch operation."""

36

```

37

38

### Event Types

39

40

Watch events contain information about the type of change and the affected resource.

41

42

```python { .api }

43

# Event structure returned by watch.stream()

44

{

45

"type": str, # ADDED, MODIFIED, DELETED, ERROR

46

"object": dict, # The Kubernetes resource object

47

"raw_object": dict # Raw object from API response

48

}

49

```

50

51

## Usage Examples

52

53

### Watching Pods

54

55

```python

56

from kubernetes import client, config, watch

57

58

config.load_kube_config()

59

v1 = client.CoreV1Api()

60

w = watch.Watch()

61

62

# Watch all pod events in default namespace

63

print("Watching for pod events...")

64

for event in w.stream(v1.list_namespaced_pod, namespace="default"):

65

event_type = event['type']

66

pod = event['object']

67

pod_name = pod.metadata.name

68

69

print(f"Event: {event_type} - Pod: {pod_name}")

70

71

if event_type == "ADDED":

72

print(f" Pod {pod_name} was created")

73

elif event_type == "MODIFIED":

74

print(f" Pod {pod_name} was updated")

75

print(f" Status: {pod.status.phase}")

76

elif event_type == "DELETED":

77

print(f" Pod {pod_name} was deleted")

78

elif event_type == "ERROR":

79

print(f" Error occurred: {pod}")

80

break

81

```

82

83

### Watching with Label Selectors

84

85

```python

86

from kubernetes import client, config, watch

87

88

config.load_kube_config()

89

v1 = client.CoreV1Api()

90

w = watch.Watch()

91

92

# Watch only pods with specific labels

93

print("Watching for app=nginx pods...")

94

for event in w.stream(

95

v1.list_namespaced_pod,

96

namespace="default",

97

label_selector="app=nginx"

98

):

99

event_type = event['type']

100

pod = event['object']

101

102

print(f"{event_type}: {pod.metadata.name}")

103

104

# Process specific events

105

if event_type == "MODIFIED":

106

# Check if pod is ready

107

conditions = pod.status.conditions or []

108

ready_condition = next(

109

(c for c in conditions if c.type == "Ready"),

110

None

111

)

112

if ready_condition and ready_condition.status == "True":

113

print(f" Pod {pod.metadata.name} is ready!")

114

```

115

116

### Watching Deployments

117

118

```python

119

from kubernetes import client, config, watch

120

121

config.load_kube_config()

122

apps_v1 = client.AppsV1Api()

123

w = watch.Watch()

124

125

# Watch deployment rollouts

126

print("Watching deployment changes...")

127

for event in w.stream(apps_v1.list_namespaced_deployment, namespace="default"):

128

event_type = event['type']

129

deployment = event['object']

130

deployment_name = deployment.metadata.name

131

132

if event_type == "MODIFIED":

133

status = deployment.status

134

if status:

135

ready_replicas = status.ready_replicas or 0

136

desired_replicas = deployment.spec.replicas or 0

137

138

print(f"Deployment {deployment_name}: {ready_replicas}/{desired_replicas} ready")

139

140

# Check if rollout is complete

141

conditions = status.conditions or []

142

progressing = next(

143

(c for c in conditions if c.type == "Progressing"),

144

None

145

)

146

if progressing and progressing.reason == "NewReplicaSetAvailable":

147

print(f" Rollout complete for {deployment_name}")

148

```

149

150

### Watching with Timeout

151

152

```python

153

from kubernetes import client, config, watch

154

import time

155

156

config.load_kube_config()

157

v1 = client.CoreV1Api()

158

w = watch.Watch()

159

160

# Watch with timeout to avoid infinite loops

161

print("Watching pods for 60 seconds...")

162

start_time = time.time()

163

164

for event in w.stream(

165

v1.list_namespaced_pod,

166

namespace="default",

167

timeout_seconds=60

168

):

169

event_type = event['type']

170

pod = event['object']

171

172

elapsed = time.time() - start_time

173

print(f"[{elapsed:.1f}s] {event_type}: {pod.metadata.name}")

174

175

# Handle an expired resource version (410 Gone)

176

if event_type == "ERROR":

177

error_obj = event['object']

178

if hasattr(error_obj, 'code') and error_obj.code == 410:

179

print("Watch expired, need to restart with new resource version")

180

break

181

182

print("Watch completed")

183

```

184

185

### Watching from Specific Resource Version

186

187

```python

188

from kubernetes import client, config, watch

189

190

config.load_kube_config()

191

v1 = client.CoreV1Api()

192

w = watch.Watch()

193

194

# Get current resource version

195

pod_list = v1.list_namespaced_pod(namespace="default")

196

resource_version = pod_list.metadata.resource_version

197

198

print(f"Starting watch from resource version: {resource_version}")

199

200

# Watch from specific point in time

201

for event in w.stream(

202

v1.list_namespaced_pod,

203

namespace="default",

204

resource_version=resource_version

205

):

206

event_type = event['type']

207

pod = event['object']

208

209

print(f"{event_type}: {pod.metadata.name}")

210

211

# Update resource version for next watch

212

if hasattr(pod.metadata, 'resource_version'):

213

resource_version = pod.metadata.resource_version

214

```

215

216

### Watching Custom Resources

217

218

```python

219

from kubernetes import client, config, watch, dynamic

220

221

config.load_kube_config()

222

dyn_client = dynamic.DynamicClient(client.ApiClient())

223

w = watch.Watch()

224

225

# Get custom resource definition

226

my_resource = dyn_client.resources.get(

227

api_version="mycompany.io/v1",

228

kind="MyCustomResource"

229

)

230

231

# Watch custom resource events

232

print("Watching custom resources...")

233

for event in w.stream(my_resource.get, namespace="default"):

234

event_type = event['type']

235

obj = event['object']

236

237

print(f"{event_type}: {obj.metadata.name}")

238

239

if event_type == "MODIFIED":

240

# Access custom fields

241

if hasattr(obj, 'status') and obj.status:

242

print(f" Status: {obj.status.get('phase', 'Unknown')}")

243

```

244

245

### Watching Multiple Resources

246

247

```python

248

from kubernetes import client, config, watch

249

import threading

250

import queue

251

252

config.load_kube_config()

253

v1 = client.CoreV1Api()

254

apps_v1 = client.AppsV1Api()

255

256

# Create event queue for multiple watchers

257

event_queue = queue.Queue()

258

259

def watch_pods():

260

w = watch.Watch()

261

for event in w.stream(v1.list_namespaced_pod, namespace="default"):

262

event['resource_type'] = 'Pod'

263

event_queue.put(event)

264

265

def watch_deployments():

266

w = watch.Watch()

267

for event in w.stream(apps_v1.list_namespaced_deployment, namespace="default"):

268

event['resource_type'] = 'Deployment'

269

event_queue.put(event)

270

271

# Start watchers in separate threads

272

pod_thread = threading.Thread(target=watch_pods, daemon=True)

273

deployment_thread = threading.Thread(target=watch_deployments, daemon=True)

274

275

pod_thread.start()

276

deployment_thread.start()

277

278

# Process events from queue

279

print("Watching pods and deployments...")

280

try:

281

while True:

282

event = event_queue.get(timeout=1)

283

resource_type = event['resource_type']

284

event_type = event['type']

285

obj = event['object']

286

287

print(f"{resource_type} {event_type}: {obj.metadata.name}")

288

289

except queue.Empty:

290

print("No events received")

291

except KeyboardInterrupt:

292

print("Stopping watchers...")

293

```

294

295

### Error Handling and Reconnection

296

297

```python

298

from kubernetes import client, config, watch

299

from kubernetes.client.rest import ApiException

300

import time

301

302

config.load_kube_config()

303

v1 = client.CoreV1Api()

304

305

def watch_with_reconnect():

306

"""Watch pods with automatic reconnection on errors."""

307

resource_version = None

308

309

while True:

310

try:

311

w = watch.Watch()

312

print(f"Starting watch from resource version: {resource_version}")

313

314

for event in w.stream(

315

v1.list_namespaced_pod,

316

namespace="default",

317

resource_version=resource_version,

318

timeout_seconds=300 # 5 minute timeout

319

):

320

event_type = event['type']

321

322

if event_type == "ERROR":

323

error_obj = event['object']

324

print(f"Watch error: {error_obj}")

325

326

# Handle resource version too old error

327

if hasattr(error_obj, 'code') and error_obj.code == 410:

328

print("Resource version expired, restarting watch")

329

resource_version = None

330

break

331

else:

332

raise Exception(f"Watch error: {error_obj}")

333

334

pod = event['object']

335

print(f"{event_type}: {pod.metadata.name}")

336

337

# Update resource version

338

resource_version = pod.metadata.resource_version

339

340

except ApiException as e:

341

print(f"API exception: {e}")

342

time.sleep(5) # Wait before reconnecting

343

344

except Exception as e:

345

print(f"Unexpected error: {e}")

346

time.sleep(5)

347

348

except KeyboardInterrupt:

349

print("Stopping watch...")

350

break

351

352

# Start watching with reconnection

353

watch_with_reconnect()

354

```