# Monitoring and Observability

Real-time monitoring tools for watching resource changes, streaming logs, collecting metrics, and implementing the informer pattern for efficient resource caching and event handling.

## Capabilities

### Watch Class

Watch Kubernetes resources for real-time changes using the Kubernetes watch API.

```typescript { .api }
11
/**
 * Client for the Kubernetes watch API: reports changes to the resources
 * served at a given API path.
 */
class Watch {
  /** Server-side close indicator */
  static readonly SERVER_SIDE_CLOSE: object;

  constructor(config: KubeConfig);

  /**
   * Start watching an API path for resource changes.
   *
   * @param path - API path to watch (e.g., '/api/v1/namespaces/default/pods')
   * @param queryParams - Query parameters used to filter the watch
   * @param callback - Invoked once per change event
   * @param done - Invoked when the watch ends or errors
   * @returns Promise resolving to an AbortController that cancels the watch
   */
  watch(
    path: string,
    queryParams: Record<string, string | number | boolean | undefined>,
    callback: (phase: string, apiObj: any, watchObj?: any) => void,
    done: (err: any) => void
  ): Promise<AbortController>;
}
35
```
36
37
**Usage Examples:**
38
39
```typescript
40
import { KubeConfig, Watch } from '@kubernetes/client-node';
41
42
const kc = new KubeConfig();
43
kc.loadFromDefault();
44
45
const watch = new Watch(kc);
46
47
/**
 * Watch pods labelled `app=web` in the `default` namespace, log every event,
 * and cancel the watch after 30 seconds.
 */
const watchPods = async () => {
  // Logs the event itself, then a phase-specific message.
  const onPodEvent = (phase: string, pod: any) => {
    console.log(`Pod ${phase}:`, pod.metadata?.name);
    if (phase === 'ADDED') {
      console.log('New pod created');
    } else if (phase === 'MODIFIED') {
      console.log('Pod updated, status:', pod.status?.phase);
    } else if (phase === 'DELETED') {
      console.log('Pod deleted');
    } else if (phase === 'ERROR') {
      console.error('Watch error:', pod);
    }
  };

  // Called once when the watch terminates, with or without an error.
  const onWatchDone = (err: any) => {
    if (!err) {
      console.log('Watch ended normally');
      return;
    }
    console.error('Watch ended with error:', err);
  };

  try {
    const controller = await watch.watch(
      '/api/v1/namespaces/default/pods',
      { labelSelector: 'app=web' },
      onPodEvent,
      onWatchDone
    );

    // Cancel watch after 30 seconds
    setTimeout(() => {
      controller.abort();
    }, 30000);
  } catch (error) {
    console.error('Failed to start watch:', error);
  }
};
90
91
/**
 * Watch deployments in the `default` namespace and, after an error, resume
 * from the most recently observed resourceVersion.
 *
 * The returned promise settles once the first watch has been established
 * (or has failed to start), so callers can handle start-up failures.
 */
const watchDeployments = async () => {
  let resourceVersion: string | undefined;

  const startWatch = async (): Promise<void> => {
    await watch.watch(
      '/apis/apps/v1/namespaces/default/deployments',
      {
        resourceVersion,
        watch: true
      },
      (phase, deployment, watchObj) => {
        if (phase === 'ERROR') {
          console.error('Watch error event:', deployment);
          // NOTE(review): assumes ERROR events carry a Status object. A 410
          // ("Gone") code means the stored resourceVersion has expired, so
          // drop it; otherwise every restart would fail the same way.
          if (deployment?.code === 410) {
            resourceVersion = undefined;
          }
          return;
        }

        console.log(`Deployment ${phase}:`, deployment.metadata?.name);

        // Update resource version for reconnection
        const latest = watchObj?.object?.metadata?.resourceVersion;
        if (latest) {
          resourceVersion = latest;
        }
      },
      (err) => {
        if (err && err !== Watch.SERVER_SIDE_CLOSE) {
          console.error('Watch error, restarting:', err);
          scheduleRestart();
        }
      }
    );
  };

  // Restart after a short delay; a failed restart is logged and retried
  // instead of surfacing as an unhandled promise rejection.
  const scheduleRestart = () => {
    setTimeout(() => {
      startWatch().catch((restartError) => {
        console.error('Failed to restart watch:', restartError);
        scheduleRestart();
      });
    }, 1000);
  };

  await startWatch();
};
122
123
/**
 * Watch pods, services and deployments in the `default` namespace at once,
 * and cancel every watch when the process receives SIGINT.
 */
const watchNamespace = async () => {
  // Open one watch for a single API path.
  const startOne = (path: string) =>
    watch.watch(
      path,
      {},
      (phase, obj) => {
        console.log(`${obj.kind} ${phase}:`, obj.metadata?.name);
      },
      (err) => {
        if (err) console.error(`Watch error for ${path}:`, err);
      }
    );

  const controllers = await Promise.all(
    [
      '/api/v1/namespaces/default/pods',
      '/api/v1/namespaces/default/services',
      '/apis/apps/v1/namespaces/default/deployments'
    ].map((path) => startOne(path))
  );

  // Cancel all watches
  process.on('SIGINT', () => {
    for (const controller of controllers) {
      controller.abort();
    }
  });
};
153
```

### Log Class

Stream logs from pod containers with various filtering and formatting options.

```typescript { .api }
160
/**
 * Client that streams container logs from a pod.
 */
class Log {
  constructor(config: KubeConfig);

  /**
   * Stream the logs of one container into a writable stream.
   *
   * @param namespace - Namespace of the pod
   * @param podName - Name of the pod
   * @param containerName - Name of the container within the pod
   * @param stream - Writable stream that receives the log output
   * @param options - Log streaming options
   * @returns Promise resolving to an AbortController that cancels the stream
   */
  log(
    namespace: string,
    podName: string,
    containerName: string,
    stream: Writable,
    options?: LogOptions
  ): Promise<AbortController>;
}
183
184
/** Options accepted by `Log.log`. */
interface LogOptions {
  /** Keep the stream open and follow new output (tail -f behavior) */
  follow?: boolean;

  /** Maximum number of bytes to return */
  limitBytes?: number;

  /** Pretty-print the output */
  pretty?: boolean;

  /** Return logs from the previous container instance */
  previous?: boolean;

  /** Relative time, in seconds before now, to start from */
  sinceSeconds?: number;

  /** Absolute time (RFC3339) to start streaming from */
  sinceTime?: string;

  /** Number of lines from the end of the log to show */
  tailLines?: number;

  /** Include timestamps in the log output */
  timestamps?: boolean;
}
202
203
/**
 * Utility that copies log options onto a set of URL search parameters.
 *
 * @param options - Log options to add
 * @param searchParams - Search parameters that receive the options
 */
function AddOptionsToSearchParams(
  options: LogOptions,
  searchParams: URLSearchParams
): void;
207
```
208
209
**Usage Examples:**
210
211
```typescript
212
import { KubeConfig, Log } from '@kubernetes/client-node';
213
import * as fs from 'fs';
214
215
const kc = new KubeConfig();
216
kc.loadFromDefault();
217
218
const logClient = new Log(kc);
219
220
/**
 * Follow the logs of `my-container` in pod `default/my-pod` on stdout,
 * starting from the last 100 lines, until the process receives SIGINT.
 */
const streamLogs = async () => {
  const options = {
    follow: true,
    timestamps: true,
    tailLines: 100
  };

  try {
    const controller = await logClient.log(
      'default',
      'my-pod',
      'my-container',
      process.stdout,
      options
    );

    console.log('Streaming logs... Press Ctrl+C to stop');

    // Abort the stream before exiting on Ctrl+C.
    const stopAndExit = () => {
      controller.abort();
      process.exit(0);
    };
    process.on('SIGINT', stopAndExit);
  } catch (error) {
    console.error('Log streaming error:', error);
  }
};
245
246
/**
 * Save the last hour of logs from `default/my-pod` (`my-container`) to
 * `./pod-logs.txt`.
 *
 * `log()` resolves as soon as streaming starts, not when it completes, so
 * the file must not be ended (or reported as saved) until the write stream
 * itself has finished.
 */
const saveLogs = async () => {
  const logFile = fs.createWriteStream('./pod-logs.txt');

  // Subscribe before streaming begins so neither event can be missed.
  // NOTE(review): relies on the client ending the writable once the log
  // data is exhausted (default pipe behaviour) - confirm for your version.
  const written = new Promise<void>((resolve, reject) => {
    logFile.once('finish', resolve);
    logFile.once('error', reject);
  });

  try {
    // Promise.all attaches handlers to both promises immediately, so a
    // failure on either side cannot become an unhandled rejection.
    await Promise.all([
      logClient.log(
        'default',
        'my-pod',
        'my-container',
        logFile,
        {
          sinceSeconds: 3600, // Last hour
          timestamps: true
        }
      ),
      written
    ]);

    console.log('Logs saved to pod-logs.txt');
  } catch (error) {
    console.error('Failed to save logs:', error);
    // Release the file handle; the stream will not finish on its own.
    logFile.destroy();
  }
};
269
270
/**
 * Print the last 50 log lines of the previous instance of `app-container`
 * in pod `default/crashed-pod`.
 */
const getPreviousLogs = async () => {
  const options = { previous: true, tailLines: 50 };

  try {
    await logClient.log('default', 'crashed-pod', 'app-container', process.stdout, options);
  } catch (error) {
    console.error('Failed to get previous logs:', error);
  }
};
287
288
/**
 * Print the last 20 timestamped log lines of each container in
 * `default/multi-container-pod`, one container after another.
 */
const multiContainerLogs = async () => {
  // Request the logs of one container; a failure is logged, not rethrown.
  const printContainerLogs = async (container: string) => {
    console.log(`\n=== Logs for container: ${container} ===`);

    try {
      await logClient.log('default', 'multi-container-pod', container, process.stdout, {
        tailLines: 20,
        timestamps: true
      });
    } catch (error) {
      console.error(`Error getting logs for ${container}:`, error);
    }
  };

  // Sequential on purpose: each request is awaited before the next starts.
  const containers = ['web', 'sidecar', 'init'];
  for (let index = 0; index < containers.length; index += 1) {
    await printContainerLogs(containers[index]);
  }
};
311
```

### Informer System

Implement the informer pattern for efficient resource watching and caching with event-driven updates.

```typescript { .api }
318
/**
 * Build an informer that watches and caches the resources at an API path.
 *
 * @param kubeconfig - Kubernetes configuration
 * @param path - API path to watch
 * @param listPromiseFn - Function that lists the resources
 * @param labelSelector - Optional label selector used for filtering
 * @returns The informer instance
 */
function makeInformer<T>(
  kubeconfig: KubeConfig,
  path: string,
  listPromiseFn: ListPromise<T>,
  labelSelector?: string
): Informer<T>;
332
333
/** Event-driven view over a watched set of resources. */
interface Informer<T> {
  /** Start the informer */
  start(): Promise<void>;

  /** Stop the informer */
  stop(): Promise<void>;

  /** Register an event handler for the given verb */
  on(verb: string, fn: ObjectCallback<T>): void;

  /** Unregister a previously registered event handler */
  off(verb: string, fn: ObjectCallback<T>): void;
}
343
344
/** Handler that receives a single resource object. */
type ObjectCallback<T> = (obj: T) => void;

/** Handler that receives an optional error. */
type ErrorCallback = (err?: any) => void;

/** Handler that receives a list of objects and a resource version. */
type ListCallback<T> = (list: T[], ResourceVersion: string) => void;

/** Zero-argument function that resolves to a Kubernetes list object. */
type ListPromise<T> = () => Promise<KubernetesListObject<T>>;
348
349
// Event type constants, passed as the `verb` argument of `on()` / `off()`
const ADD: string;
const UPDATE: string;
const CHANGE: string;
const DELETE: string;
const CONNECT: string;
const ERROR: string;
356
```
357
358
**Usage Examples:**
359
360
```typescript
361
import {
  KubeConfig,
  AppsV1Api,
  CoreV1Api,
  V1Deployment,
  makeInformer,
  ADD,
  UPDATE,
  DELETE,
  ERROR
} from '@kubernetes/client-node';
370
371
const kc = new KubeConfig();
372
kc.loadFromDefault();
373
374
const k8sApi = kc.makeApiClient(CoreV1Api);
375
376
/**
 * Create an informer for pods labelled `app=web` in the `default` namespace
 * and attach logging handlers for add, update, delete and error events.
 *
 * The same label selector is given to both the list call and the informer.
 * If only the watch were filtered, the initial list would seed the cache
 * with pods that the watch never reports on.
 *
 * @returns The configured informer (not yet started)
 */
const createPodInformer = () => {
  const namespace = 'default';
  const labelSelector = 'app=web';

  const informer = makeInformer(
    kc,
    `/api/v1/namespaces/${namespace}/pods`,
    // Request-object form, matching a list call that resolves directly to
    // the list object as `ListPromise` requires.
    () => k8sApi.listNamespacedPod({ namespace, labelSelector }),
    labelSelector
  );

  // Register event handlers
  informer.on(ADD, (pod) => {
    console.log('Pod added:', pod.metadata?.name);
    console.log('Status:', pod.status?.phase);
  });

  informer.on(UPDATE, (pod) => {
    console.log('Pod updated:', pod.metadata?.name);
    console.log('New status:', pod.status?.phase);
  });

  informer.on(DELETE, (pod) => {
    console.log('Pod deleted:', pod.metadata?.name);
  });

  informer.on(ERROR, (err) => {
    console.error('Informer error:', err);
  });

  return informer;
};
406
407
/**
 * Start the pod informer and schedule it to stop 60 seconds later.
 * A failure to start is logged rather than rethrown.
 */
const runInformer = async () => {
  const informer = createPodInformer();

  // Stops the informer and reports it; run from the timer below.
  const shutDown = async () => {
    await informer.stop();
    console.log('Informer stopped');
  };

  try {
    await informer.start();
    console.log('Informer started successfully');

    // Stop informer after 60 seconds
    setTimeout(shutDown, 60000);
  } catch (error) {
    console.error('Failed to start informer:', error);
  }
};
425
426
/**
 * Create an informer for deployments in the `default` namespace that logs
 * readiness on add/update and the name on delete.
 *
 * @returns The configured informer (not yet started)
 */
const createDeploymentInformer = () => {
  // Build the API client once rather than on every (re)list.
  const appsApi = kc.makeApiClient(AppsV1Api);

  const informer = makeInformer(
    kc,
    '/apis/apps/v1/namespaces/default/deployments',
    () => appsApi.listNamespacedDeployment({ namespace: 'default' })
  );

  // Shared handler for add and update events.
  const handleDeploymentChange = (deployment: V1Deployment) => {
    const name = deployment.metadata?.name;
    // `??` keeps the fallback for missing values only.
    const replicas = deployment.status?.replicas ?? 0;
    const ready = deployment.status?.readyReplicas ?? 0;

    console.log(`Deployment ${name}: ${ready}/${replicas} ready`);

    if (ready < replicas) {
      console.log('Deployment not fully ready');
    }
  };

  informer.on(ADD, handleDeploymentChange);
  informer.on(UPDATE, handleDeploymentChange);

  informer.on(DELETE, (deployment) => {
    console.log('Deployment deleted:', deployment.metadata?.name);
  });

  return informer;
};
455
```

### Metrics Classes

Collect resource usage metrics from nodes and pods for monitoring and analysis.

```typescript { .api }
462
/**
 * Client for collecting resource metrics from a Kubernetes cluster.
 */
class Metrics {
  constructor(config: KubeConfig);

  /**
   * Fetch resource metrics for nodes.
   *
   * @returns Promise resolving to the node metrics list
   */
  getNodeMetrics(): Promise<NodeMetricsList>;

  /**
   * Fetch resource metrics for pods.
   *
   * @param namespace - Optional namespace filter
   * @returns Promise resolving to the pod metrics list
   */
  getPodMetrics(namespace?: string): Promise<PodMetricsList>;
}
481
482
/** CPU and memory usage, expressed as Kubernetes quantity strings. */
interface Usage {
  /** CPU usage (e.g., "250m" for 250 millicores) */
  cpu: string;
  /** Memory usage (e.g., "128Mi" for 128 mebibytes) */
  memory: string;
}

/** Usage reported for one container. */
interface ContainerMetric {
  /** Container name */
  name: string;
  /** Resource usage */
  usage: Usage;
}

/** Metrics sample for one pod. */
interface PodMetric {
  /** Pod metadata */
  metadata: {
    name: string;
    namespace: string;
    selfLink: string;
    creationTimestamp: string;
  };
  /** Timestamp of metrics collection */
  timestamp: string;
  /** Metrics collection window */
  window: string;
  /** Container metrics */
  containers: ContainerMetric[];
}

/** Metrics sample for one node. */
interface NodeMetric {
  /** Node metadata */
  metadata: {
    name: string;
    selfLink: string;
    creationTimestamp: string;
  };
  /** Timestamp of metrics collection */
  timestamp: string;
  /** Metrics collection window */
  window: string;
  /** Node resource usage */
  usage: Usage;
}

/** List wrapper returned by `Metrics.getPodMetrics`. */
interface PodMetricsList {
  kind: string;
  apiVersion: string;
  metadata: object;
  items: PodMetric[];
}

/** List wrapper returned by `Metrics.getNodeMetrics`. */
interface NodeMetricsList {
  kind: string;
  apiVersion: string;
  metadata: object;
  items: NodeMetric[];
}
540
```
541
542
**Usage Examples:**
543
544
```typescript
545
import { KubeConfig, Metrics } from '@kubernetes/client-node';
546
547
const kc = new KubeConfig();
548
kc.loadFromDefault();
549
550
const metrics = new Metrics(kc);
551
552
/**
 * Fetch metrics for all nodes and print each node's CPU and memory usage.
 * Failures are logged rather than rethrown.
 */
const getNodeMetrics = async () => {
  try {
    const { items } = await metrics.getNodeMetrics();

    console.log('Node Metrics:');
    for (const node of items) {
      console.log(`Node: ${node.metadata.name}`);
      console.log(`CPU: ${node.usage.cpu}`);
      console.log(`Memory: ${node.usage.memory}`);
      console.log('---');
    }
  } catch (error) {
    console.error('Failed to get node metrics:', error);
  }
};
568
569
/**
 * Fetch pod metrics for the `default` namespace and print per-container
 * CPU and memory usage. Failures are logged rather than rethrown.
 */
const getPodMetrics = async () => {
  try {
    const { items } = await metrics.getPodMetrics('default');

    console.log('Pod Metrics:');
    for (const pod of items) {
      console.log(`Pod: ${pod.metadata.name}`);
      console.log(`Namespace: ${pod.metadata.namespace}`);
      console.log('Containers:');

      for (const container of pod.containers) {
        console.log(`  ${container.name}:`);
        console.log(`    CPU: ${container.usage.cpu}`);
        console.log(`    Memory: ${container.usage.memory}`);
      }
      console.log('---');
    }
  } catch (error) {
    console.error('Failed to get pod metrics:', error);
  }
};
591
592
/**
 * Every 30 seconds, sample node and pod metrics and print the summed pod CPU
 * usage next to the summed node CPU usage. Stops after 5 minutes.
 *
 * Both sums go through one unit-aware conversion. Reading node values as
 * nanocores but container values as millicores would make the ratio
 * meaningless whenever the two report in the same unit.
 */
const monitorResources = async () => {
  // Convert a CPU quantity string to millicores. Supports the "n" (nano),
  // "u" (micro) and "m" (milli) suffixes and bare core counts; anything
  // unparseable counts as 0.
  const cpuToMillicores = (quantity: string): number => {
    const value = parseFloat(quantity);
    if (Number.isNaN(value)) return 0;
    if (quantity.endsWith('n')) return value / 1e6;
    if (quantity.endsWith('u')) return value / 1e3;
    if (quantity.endsWith('m')) return value;
    return value * 1000;
  };

  const interval = setInterval(async () => {
    try {
      const [nodeMetrics, podMetrics] = await Promise.all([
        metrics.getNodeMetrics(),
        metrics.getPodMetrics()
      ]);

      // Total CPU in use as reported per node. This is usage, not
      // allocatable capacity.
      const totalNodeCpu = nodeMetrics.items.reduce(
        (sum, node) => sum + cpuToMillicores(node.usage.cpu),
        0
      );

      // Total CPU in use summed over every container of every pod.
      const totalPodCpu = podMetrics.items.reduce(
        (sum, pod) =>
          sum +
          pod.containers.reduce(
            (containerSum, container) => containerSum + cpuToMillicores(container.usage.cpu),
            0
          ),
        0
      );

      console.log(`Cluster CPU Usage: ${totalPodCpu.toFixed(0)}m / ${totalNodeCpu.toFixed(0)}m`);

      // Avoid printing NaN/Infinity when no node usage was reported.
      const utilization =
        totalNodeCpu > 0 ? `${((totalPodCpu / totalNodeCpu) * 100).toFixed(2)}%` : 'n/a';
      console.log(`CPU Utilization: ${utilization}`);
    } catch (error) {
      console.error('Metrics collection error:', error);
    }
  }, 30000); // Every 30 seconds

  // Stop monitoring after 5 minutes
  setTimeout(() => {
    clearInterval(interval);
    console.log('Resource monitoring stopped');
  }, 300000);
};
629
```

### Top Classes

Resource usage analysis similar to `kubectl top` with detailed capacity and usage information.

```typescript { .api }
636
/**
 * Capacity, requests and limits for one resource.
 */
class ResourceUsage {
  /** Total resource capacity */
  Capacity: number | bigint;
  /** Total requested resources */
  RequestTotal: number | bigint;
  /** Total resource limits */
  LimitTotal: number | bigint;
}

/**
 * Current usage, requests and limits for one resource.
 */
class CurrentResourceUsage {
  /** Current resource usage */
  CurrentUsage: number | bigint;
  /** Total requested resources */
  RequestTotal: number | bigint;
  /** Total resource limits */
  LimitTotal: number | bigint;
}

/**
 * Resource status of a node.
 */
class NodeStatus {
  /** Node object */
  Node: V1Node;
  /** CPU usage information */
  CPU: ResourceUsage;
  /** Memory usage information */
  Memory: ResourceUsage;
}

/**
 * Resource status of a container.
 */
class ContainerStatus {
  /** Container name */
  name: string;
  /** CPU usage */
  CPUUsage: CurrentResourceUsage;
  /** Memory usage */
  MemoryUsage: CurrentResourceUsage;
}

/**
 * Resource status of a pod.
 */
class PodStatus {
  /** Pod object */
  Pod: V1Pod;
  /** Container statuses */
  Containers: ContainerStatus[];
}
693
```
694
695
**Usage Examples:**
696
697
```typescript
698
import { KubeConfig, CoreV1Api, Metrics } from '@kubernetes/client-node';
699
700
const kc = new KubeConfig();
701
kc.loadFromDefault();
702
703
// Utility functions for resource analysis

// Leading decimal number (optionally with an exponent, e.g. "1e3"), followed
// by whatever suffix remains.
const QUANTITY_PATTERN = /^(\d+(?:\.\d+)?(?:[eE][+-]?\d+)?)(.*)$/;

// Multipliers for Kubernetes quantity suffixes. A Map avoids accidental hits
// on Object.prototype keys such as "constructor".
const QUANTITY_MULTIPLIERS: ReadonlyMap<string, number> = new Map<string, number>([
  ['n', 1e-9], ['u', 1e-6], ['m', 1e-3],
  ['k', 1e3], ['M', 1e6], ['G', 1e9], ['T', 1e12], ['P', 1e15], ['E', 1e18],
  ['Ki', 1024], ['Mi', 1024 ** 2], ['Gi', 1024 ** 3],
  ['Ti', 1024 ** 4], ['Pi', 1024 ** 5], ['Ei', 1024 ** 6]
]);

/**
 * Parse a Kubernetes resource quantity (e.g., "100m", "1Gi", "1e3") into a
 * plain number in base units (cores for CPU, bytes for memory).
 *
 * @param quantity - Quantity string as reported by the API
 * @returns The numeric value; 0 when the string does not start with a
 *   number. An unrecognised suffix is treated as a multiplier of 1.
 */
const parseResourceQuantity = (quantity: string): number => {
  const match = QUANTITY_PATTERN.exec(quantity);
  if (!match) return 0;

  const value = parseFloat(match[1] ?? '');
  const suffix = (match[2] ?? '').trim();

  // "1E" is one exa, while "1E3" is exponent notation: the pattern only
  // consumes "E" as an exponent marker when digits follow it.
  return value * (QUANTITY_MULTIPLIERS.get(suffix) ?? 1);
};
719
720
/**
 * Print a `kubectl top`-style report: CPU (millicores) and memory (Mi) in
 * use for every node that has metrics, then for every pod.
 * Failures are logged rather than rethrown.
 */
const analyzeClusterResources = async () => {
  const coreApi = kc.makeApiClient(CoreV1Api);
  const metrics = new Metrics(kc);

  // Render a core count as millicores and a byte count as mebibytes.
  const formatCpu = (cores: number) => `${(cores * 1000).toFixed(0)}m`;
  const formatMemory = (bytes: number) => `${(bytes / 1024 ** 2).toFixed(0)}Mi`;

  try {
    const [nodes, nodeMetrics, podMetrics] = await Promise.all([
      coreApi.listNode(),
      metrics.getNodeMetrics(),
      metrics.getPodMetrics()
    ]);

    console.log('CLUSTER RESOURCE ANALYSIS');
    console.log('=========================');

    // Analyze nodes
    console.log('\nNODES:');
    console.log('NAME\t\tCPU\t\tMEMORY');

    // The list call resolves directly to the list object, consistent with
    // the other list calls in this document, so there is no `.body` wrapper.
    nodes.items.forEach(node => {
      const nodeName = node.metadata?.name || 'unknown';
      const nodeMetric = nodeMetrics.items.find(m => m.metadata.name === nodeName);

      // Nodes without a metrics sample are skipped.
      if (nodeMetric) {
        const cpuUsage = parseResourceQuantity(nodeMetric.usage.cpu);
        const memoryUsage = parseResourceQuantity(nodeMetric.usage.memory);

        console.log(`${nodeName}\t${formatCpu(cpuUsage)}\t\t${formatMemory(memoryUsage)}`);
      }
    });

    // Analyze pods
    console.log('\nPODS:');
    console.log('NAMESPACE\tNAME\t\tCPU\t\tMEMORY');

    podMetrics.items.forEach(pod => {
      const namespace = pod.metadata.namespace;
      const name = pod.metadata.name;

      // Pod totals are the sum over all of its containers.
      const totalCpu = pod.containers.reduce(
        (sum, container) => sum + parseResourceQuantity(container.usage.cpu),
        0
      );
      const totalMemory = pod.containers.reduce(
        (sum, container) => sum + parseResourceQuantity(container.usage.memory),
        0
      );

      console.log(`${namespace}\t\t${name}\t${formatCpu(totalCpu)}\t\t${formatMemory(totalMemory)}`);
    });
  } catch (error) {
    console.error('Resource analysis failed:', error);
  }
};
774
```

### Cache System

In-memory caching for Kubernetes objects with automatic synchronization through the informer pattern.

```typescript { .api }
781
/**
 * Read-only lookup interface over cached Kubernetes objects.
 */
interface ObjectCache<T> {
  /** Look up one object by name and optional namespace */
  get(name: string, namespace?: string): T | undefined;

  /** List all objects, optionally restricted to one namespace */
  list(namespace?: string): ReadonlyArray<T>;
}
790
791
/**
 * Internal cache structure: a map of string keys to maps of string keys to
 * objects.
 */
type CacheMap<T> = Map<string, Map<string, T>>;
795
796
/**
 * Combined list/watch with caching: serves as both an object cache and an
 * informer.
 */
class ListWatch<T> implements ObjectCache<T>, Informer<T> {
  constructor(
    path: string,
    watch: Watch,
    listFn: ListPromise<T>,
    autoStart?: boolean,
    labelSelector?: string,
    fieldSelector?: string
  );

  // Informer methods
  start(): Promise<void>;
  stop(): Promise<void>;
  on(verb: string, fn: ObjectCallback<T>): void;
  off(verb: string, fn: ObjectCallback<T>): void;

  // ObjectCache methods
  get(name: string, namespace?: string): T | undefined;
  list(namespace?: string): ReadonlyArray<T>;
}
819
```
820
821
**Usage Examples:**
822
823
```typescript
824
import { KubeConfig, CoreV1Api, Watch, ListWatch } from '@kubernetes/client-node';
825
826
const kc = new KubeConfig();
827
kc.loadFromDefault();
828
829
/**
 * Create an auto-starting, cached list/watch over pods labelled `app=web`
 * in the `default` namespace.
 *
 * @returns The ListWatch, usable both as an informer and as an object cache
 */
const createPodCache = () => {
  const watch = new Watch(kc);
  const coreApi = kc.makeApiClient(CoreV1Api);

  const namespace = 'default';
  const labelSelector = 'app=web';

  const podCache = new ListWatch(
    `/api/v1/namespaces/${namespace}/pods`,
    watch,
    // The list must apply the same selector as the watch; otherwise the
    // cache is seeded with pods the watch will never update or remove.
    () => coreApi.listNamespacedPod({ namespace, labelSelector }),
    true, // auto-start
    labelSelector
  );

  // The client's event verbs are lower-case ('add' is the value of the
  // exported ADD constant); 'ADD' is not a registered verb.
  podCache.on('add', (pod) => {
    console.log('Pod cached:', pod.metadata?.name);
  });

  return podCache;
};
849
850
/**
 * Demonstrate the cache read API: look up one pod, count all cached pods,
 * and count the pods cached for the `production` namespace.
 *
 * @param cache - Pod cache to query
 */
const queryCachedPods = (cache: ObjectCache<V1Pod>) => {
  // Get specific pod
  const found = cache.get('my-pod', 'default');
  if (found) {
    console.log('Found pod:', found.metadata?.name);
  }

  // List all cached pods
  const total = cache.list().length;
  console.log(`Total cached pods: ${total}`);

  // List pods in specific namespace
  const inProduction = cache.list('production').length;
  console.log(`Pods in production: ${inProduction}`);
};
866
```