0
# Streaming Operations
1
2
Execute commands in running pods, stream container logs, and establish port forwarding connections. Essential for debugging, log analysis, and establishing secure connections to pod services through the Kubernetes API server.
3
4
## Capabilities
5
6
### Command Execution
7
8
Execute commands inside running containers and stream the results with support for stdin, stdout, stderr, and TTY.
9
10
```python { .api }
11
def stream(
    api_method,
    *args,
    stdin: bool = False,
    stdout: bool = True,
    stderr: bool = True,
    tty: bool = False,
    **kwargs
):
    """
    Call a Kubernetes API method over a WebSocket connection.

    Intended for exec/attach-style endpoints such as
    CoreV1Api.connect_get_namespaced_pod_exec.

    Parameters:
    - api_method: Bound API method to invoke
      (e.g. v1.connect_get_namespaced_pod_exec)
    - *args: Positional arguments forwarded to api_method
    - stdin: Enable stdin stream
    - stdout: Enable stdout stream
    - stderr: Enable stderr stream
    - tty: Allocate TTY
    - **kwargs: Remaining keyword arguments forwarded to api_method
      (name, namespace, command, container, ...). Pass
      _preload_content=False to receive the live connection object
      instead of the collected output.

    Returns:
    The collected command output, or, with _preload_content=False, a
    stream object for reading/writing data
    """
36
```
37
38
### Port Forwarding
39
40
Establish port forwarding connections to pods for accessing services running inside containers.
41
42
```python { .api }
43
def portforward(
    api_instance,
    name: str,
    namespace: str,
    ports: str,
    **kwargs
):
    """
    Create port forwarding connection to a pod.

    Parameters:
    - api_instance: Bound API method that opens the connection
      (e.g. v1.connect_get_namespaced_pod_portforward)
    - name: Pod name
    - namespace: Pod namespace
    - ports: Pod port(s) to forward, as a comma-separated string
      (e.g. "80" or "80,443"). These are ports inside the pod; no local
      port is opened.

    Returns:
    PortForward connection object; use its socket(port) method to obtain
    a socket-like object connected to the given pod port
    """
62
```
63
64
## Usage Examples
65
66
### Executing Commands in Pods
67
68
```python
69
from kubernetes import client, config
70
from kubernetes.stream import stream
71
72
config.load_kube_config()
v1 = client.CoreV1Api()

# One-shot, non-interactive command: no stdin and no TTY are needed
exec_command = ['/bin/sh', '-c', 'echo "Hello from pod"']

# Without _preload_content=False the call blocks until the command has
# finished and hands back everything it printed
resp = stream(
    v1.connect_get_namespaced_pod_exec,
    name="my-pod",
    namespace="default",
    command=exec_command,
    stdin=False,
    stdout=True,
    stderr=True,
    tty=False,
)

print("Command output:", resp, sep="\n")
91
```
92
93
### Interactive Shell Session
94
95
```python
96
from kubernetes import client, config
97
from kubernetes.stream import stream
98
import sys
99
import select
100
import termios
101
import tty
102
103
config.load_kube_config()
104
v1 = client.CoreV1Api()
105
106
def interactive_shell(pod_name, namespace="default", container=None):
    """Create interactive shell session with pod.

    Puts the local terminal into raw mode, runs /bin/bash in the pod with a
    TTY, and relays keystrokes and output until the remote shell exits.
    POSIX only (relies on termios and select on stdin).

    Parameters:
    - pod_name: Pod name
    - namespace: Pod namespace
    - container: Container name, or None to let the API server choose
    """
    import os

    stdin_fd = sys.stdin.fileno()

    # Store original terminal settings so they can always be restored
    old_tty = termios.tcgetattr(stdin_fd)
    resp = None

    try:
        # Raw mode delivers each keystroke immediately and leaves echoing
        # and line editing to the remote TTY
        tty.setraw(stdin_fd)

        # Start exec with TTY
        resp = stream(
            v1.connect_get_namespaced_pod_exec,
            name=pod_name,
            namespace=namespace,
            container=container,
            command=['/bin/bash'],
            stderr=True,
            stdin=True,
            stdout=True,
            tty=True,
            _preload_content=False,
        )

        while resp.is_open():
            # Non-blocking poll of the connection; the short wait happens
            # in select() below so keystrokes are forwarded promptly
            resp.update(timeout=0)

            # Flush explicitly: prompts do not end in a newline, so they
            # would otherwise sit in the output buffer
            if resp.peek_stdout():
                sys.stdout.write(resp.read_stdout())
                sys.stdout.flush()
            if resp.peek_stderr():
                sys.stderr.write(resp.read_stderr())
                sys.stderr.flush()

            readable, _, _ = select.select([stdin_fd], [], [], 0.05)
            if readable:
                # Read straight from the descriptor: a buffered
                # sys.stdin.read(1) can swallow pending bytes that
                # select() would then no longer report
                data = os.read(stdin_fd, 1024)
                if not data:
                    # Local stdin reached end-of-file
                    break
                resp.write_stdin(data.decode('utf-8', errors='replace'))
    finally:
        if resp is not None:
            resp.close()
        # Restore terminal settings
        termios.tcsetattr(stdin_fd, termios.TCSADRAIN, old_tty)
152
153
# Start interactive shell
154
interactive_shell("my-pod", "default")
155
```
156
157
### Streaming Container Logs
158
159
```python
160
from kubernetes import client, config
161
from kubernetes.stream import stream
162
163
config.load_kube_config()
164
v1 = client.CoreV1Api()
165
166
# Stream logs from a container
167
def stream_logs(pod_name, namespace="default", container=None, follow=True):
    """Stream logs from a pod container and print them line by line.

    The log endpoint is a plain HTTP endpoint, so it is called directly
    rather than through the WebSocket stream() helper. With
    _preload_content=False the call returns the raw HTTP response, whose
    body is consumed incrementally.

    Parameters:
    - pod_name: Pod name
    - namespace: Pod namespace
    - container: Container name, or None for the pod's default container
    - follow: Keep the connection open and print new lines as they arrive
    """
    log_response = v1.read_namespaced_pod_log(
        name=pod_name,
        namespace=namespace,
        container=container,
        follow=follow,
        _preload_content=False,
    )

    # Chunks are not aligned to line boundaries, so keep the unfinished
    # tail until the rest of the line arrives
    pending = b""

    try:
        for chunk in log_response.stream():
            *complete_lines, pending = (pending + chunk).split(b"\n")
            for raw_line in complete_lines:
                print(raw_line.decode('utf-8', errors='replace').rstrip())

        # Final line without a trailing newline
        if pending:
            print(pending.decode('utf-8', errors='replace').rstrip())
    except KeyboardInterrupt:
        print("\nLog streaming stopped")
    finally:
        # Close on every exit path, not only on Ctrl+C
        log_response.close()
185
186
# Stream logs
187
stream_logs("my-pod", "default", follow=True)
188
```
189
190
### Port Forwarding to Pod
191
192
```python
193
from kubernetes import client, config
194
from kubernetes.stream import portforward
195
import requests
196
import threading
197
import time
198
199
config.load_kube_config()
200
v1 = client.CoreV1Api()
201
202
def create_port_forward(pod_name, namespace, local_port, pod_port):
    """Create port forward connection.

    portforward() does not open a local port: it takes pod port numbers
    and returns an object exposing a socket per pod port. This function
    therefore listens on 127.0.0.1:local_port itself and relays every
    accepted connection to pod_port through a fresh portforward()
    connection.

    Parameters:
    - pod_name: Pod name
    - namespace: Pod namespace
    - local_port: Local TCP port to listen on
    - pod_port: Port inside the pod to forward to

    Returns:
    The daemon thread running the accept loop. The listener is already
    bound when this function returns.
    """
    import select
    import socket

    def relay(client_sock):
        """Copy bytes in both directions between one client and the pod."""
        pf = None
        try:
            pf = portforward(
                v1.connect_get_namespaced_pod_portforward,
                pod_name,
                namespace,
                ports=str(pod_port),
            )
            pod_sock = pf.socket(pod_port)
            peers = {client_sock: pod_sock, pod_sock: client_sock}

            while True:
                readable, _, _ = select.select(list(peers), [], [])
                for source in readable:
                    data = source.recv(65536)
                    if not data:
                        # Either side closed its end
                        return
                    peers[source].sendall(data)
        except Exception as e:
            print(f"Port forwarding error: {e}")
        finally:
            client_sock.close()
            if pf is not None:
                pf.close()

    # Bind before returning so callers can connect immediately; early
    # connections wait in the listen backlog
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(("127.0.0.1", local_port))
    listener.listen()

    def port_forward_thread():
        with listener:
            while True:
                try:
                    client_sock, _ = listener.accept()
                except OSError as e:
                    print(f"Port forwarding error: {e}")
                    return
                threading.Thread(
                    target=relay, args=(client_sock,), daemon=True
                ).start()

    # Start port forwarding in background thread
    pf_thread = threading.Thread(target=port_forward_thread, daemon=True)
    pf_thread.start()

    print(f"Port forwarding active: localhost:{local_port} -> {pod_name}:{pod_port}")

    return pf_thread
232
233
# Example: Forward local port 8080 to pod port 80
234
pf_thread = create_port_forward("nginx-pod", "default", 8080, 80)
235
236
# Use the forwarded connection
237
try:
238
response = requests.get("http://localhost:8080")
239
print(f"Response: {response.status_code}")
240
print(response.text[:200])
241
except requests.RequestException as e:
242
print(f"Request failed: {e}")
243
```
244
245
### Copying Files To/From Pods
246
247
```python
248
from kubernetes import client, config
249
from kubernetes.stream import stream
250
import os
251
import tarfile
252
import tempfile
253
254
config.load_kube_config()
255
v1 = client.CoreV1Api()
256
257
def copy_file_to_pod(pod_name, namespace, local_file, pod_path, container=None):
    """Copy file from local system to pod.

    The file is packed into a tar archive and piped to `tar xf -` running
    in the pod, so the container image must provide `tar`.

    Parameters:
    - pod_name: Pod name
    - namespace: Pod namespace
    - local_file: Path of the local file to send
    - pod_path: Destination path inside the pod. A trailing "/" means
      "keep the local file name inside this directory".
    - container: Container name, or None for the pod's default container
    """
    import posixpath

    # Pod paths are POSIX paths regardless of the local operating system
    pod_dir = posixpath.dirname(pod_path) or '.'
    pod_file = posixpath.basename(pod_path) or os.path.basename(local_file)

    # Create tar archive of the file. The member is stored under its
    # destination name so it is extracted as pod_path even when the local
    # file name differs.
    with tempfile.NamedTemporaryFile() as tar_buffer:
        with tarfile.open(fileobj=tar_buffer, mode='w') as tar:
            tar.add(local_file, arcname=pod_file)

        tar_buffer.seek(0)
        tar_data = tar_buffer.read()

    # Execute tar command to extract file in pod
    exec_command = ['tar', 'xf', '-', '-C', pod_dir]

    resp = stream(
        v1.connect_get_namespaced_pod_exec,
        name=pod_name,
        namespace=namespace,
        container=container,
        command=exec_command,
        stderr=True,
        stdin=True,
        stdout=True,
        tty=False,
        _preload_content=False,
    )

    try:
        # Send tar data to pod
        resp.write_stdin(tar_data)
    finally:
        resp.close()

    print(f"File copied to {pod_name}:{pod_path}")
289
290
def copy_file_from_pod(pod_name, namespace, pod_path, local_file, container=None):
    """Copy file from pod to local system.

    Runs `tar cf -` in the pod, collects the archive from stdout and writes
    the archived file to local_file. The container image must provide
    `tar`.

    Parameters:
    - pod_name: Pod name
    - namespace: Pod namespace
    - pod_path: Path of the file inside the pod
    - local_file: Local path the file is written to
    - container: Container name, or None for the pod's default container

    Raises:
    - tarfile.ReadError: the pod produced no valid archive
      (e.g. pod_path does not exist)
    - FileNotFoundError: the archive contains no regular file
    """
    import io
    import posixpath
    import shutil

    # Archive relative to the parent directory so the member is stored
    # under the bare file name instead of the full pod path
    pod_dir = posixpath.dirname(pod_path) or '.'
    pod_file = posixpath.basename(pod_path)
    exec_command = ['tar', 'cf', '-', '-C', pod_dir, pod_file]

    resp = stream(
        v1.connect_get_namespaced_pod_exec,
        name=pod_name,
        namespace=namespace,
        container=container,
        command=exec_command,
        stderr=True,
        stdin=False,
        stdout=True,
        tty=False,
        # Tar output is binary; the default text mode would decode it as
        # UTF-8 and corrupt it. Needs a client release that supports the
        # `binary` option of stream().
        binary=True,
        _preload_content=False,
    )

    # Read tar data
    tar_chunks = []
    error_chunks = []
    try:
        while resp.is_open():
            resp.update(timeout=1)
            if resp.peek_stdout():
                tar_chunks.append(resp.read_stdout())
            if resp.peek_stderr():
                error_chunks.append(resp.read_stderr())
    finally:
        resp.close()

    if error_chunks:
        remote_errors = b"".join(error_chunks).decode('utf-8', errors='replace')
        print(f"tar reported: {remote_errors.rstrip()}")

    # Write out only the expected regular file instead of calling
    # extractall(): member names come from the pod and must not be able to
    # choose where files land locally
    with tarfile.open(fileobj=io.BytesIO(b"".join(tar_chunks)), mode='r') as tar:
        member = next((m for m in tar if m.isfile()), None)
        if member is None:
            raise FileNotFoundError(
                f"No regular file found in archive of {pod_name}:{pod_path}"
            )
        with tar.extractfile(member) as source, open(local_file, 'wb') as target:
            shutil.copyfileobj(source, target)

    print(f"File copied from {pod_name}:{pod_path} to {local_file}")
325
326
# Example usage
327
copy_file_to_pod("my-pod", "default", "/local/file.txt", "/tmp/file.txt")
328
copy_file_from_pod("my-pod", "default", "/tmp/output.txt", "/local/output.txt")
329
```
330
331
### Monitoring Pod Resource Usage
332
333
```python
334
from kubernetes import client, config
335
from kubernetes.stream import stream
336
import json
337
import time
338
339
config.load_kube_config()
340
v1 = client.CoreV1Api()
341
342
def monitor_pod_resources(pod_name, namespace="default", container=None):
    """Print memory and load figures from inside a container until interrupted.

    Runs a shell loop in the container that prints the date, the MemTotal
    and MemAvailable lines of /proc/meminfo and /proc/loadavg every five
    seconds, and echoes that output locally. This reads /proc directly and
    does not use the metrics API. Stop with Ctrl+C.

    Parameters:
    - pod_name: Pod name
    - namespace: Pod namespace
    - container: Container name, or None for the pod's default container
    """
    exec_command = [
        'sh', '-c',
        'while true; do '
        'echo "=== $(date) ==="; '
        'cat /proc/meminfo | grep -E "MemTotal|MemAvailable"; '
        'cat /proc/loadavg; '
        'sleep 5; '
        'done'
    ]

    resp = stream(
        v1.connect_get_namespaced_pod_exec,
        name=pod_name,
        namespace=namespace,
        container=container,
        command=exec_command,
        stderr=True,
        stdin=False,
        stdout=True,
        tty=False,
        _preload_content=False,
    )

    print(f"Monitoring resources for {pod_name}")

    try:
        while resp.is_open():
            # update() itself waits up to one second for new data, so no
            # extra sleep is needed
            resp.update(timeout=1)
            if resp.peek_stdout():
                # Output is already text in the default (non-binary) mode
                print(resp.read_stdout(), end='')
    except KeyboardInterrupt:
        print("\nMonitoring stopped")
    finally:
        # Close on every exit path, not only on Ctrl+C
        resp.close()
383
384
# Monitor pod resources
385
monitor_pod_resources("my-pod", "default")
386
```
387
388
### Debug Pod with Ephemeral Container
389
390
```python
391
from kubernetes import client, config
392
from kubernetes.stream import stream
393
394
config.load_kube_config()
395
v1 = client.CoreV1Api()
396
397
def debug_pod_with_ephemeral(pod_name, namespace="default", startup_timeout=60):
    """Debug pod using ephemeral container (Kubernetes 1.23+).

    Adds a busybox container named "debugger" that targets the pod's first
    container, waits for it to run, then opens a line-based shell session
    in it. Lines typed locally are sent to the remote shell; type 'exit'
    or press Ctrl+C to leave. POSIX only (uses select on stdin).

    Parameters:
    - pod_name: Pod name
    - namespace: Pod namespace
    - startup_timeout: Seconds to wait for the debug container to start

    Raises:
    - TimeoutError: the debug container is not running within
      startup_timeout seconds
    """
    import select
    import sys
    import time

    debugger_name = "debugger"

    # Get existing pod
    pod = v1.read_namespaced_pod(name=pod_name, namespace=namespace)

    # Existing ephemeral containers as plain dicts. They are sent back
    # unchanged along with the new one so the request is valid whether the
    # server merges the list or replaces it.
    existing = v1.api_client.sanitize_for_serialization(
        pod.spec.ephemeral_containers or []
    )

    if any(c.get("name") == debugger_name for c in existing):
        # Ephemeral containers cannot be added twice; reuse the existing one
        print(f"Ephemeral container already present in {pod_name}")
    else:
        ephemeral_container = {
            "name": debugger_name,
            "image": "busybox:latest",
            "command": ["/bin/sh"],
            "stdin": True,
            "tty": True,
            "targetContainerName": pod.spec.containers[0].name
        }

        # Ephemeral containers can only be changed through the dedicated
        # subresource; a regular pod patch that adds one is rejected
        v1.patch_namespaced_pod_ephemeralcontainers(
            name=pod_name,
            namespace=namespace,
            body={"spec": {"ephemeralContainers": existing + [ephemeral_container]}},
        )

        print(f"Ephemeral container added to {pod_name}")

    # Exec fails until the container is actually running (the image may
    # still be pulling), so wait for it
    deadline = time.monotonic() + startup_timeout
    while True:
        pod = v1.read_namespaced_pod(name=pod_name, namespace=namespace)
        statuses = pod.status.ephemeral_container_statuses or []
        if any(
            s.name == debugger_name and s.state and s.state.running
            for s in statuses
        ):
            break
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Ephemeral container in {pod_name} did not start "
                f"within {startup_timeout} seconds"
            )
        time.sleep(1)

    # Connect to the ephemeral container
    resp = stream(
        v1.connect_get_namespaced_pod_exec,
        name=pod_name,
        namespace=namespace,
        container=debugger_name,
        command=["/bin/sh"],
        stderr=True,
        stdin=True,
        stdout=True,
        tty=True,
        _preload_content=False,
    )

    print("Connected to debug container. Type 'exit' to quit.")

    # Interactive session with debug container
    stdin_open = True
    try:
        while resp.is_open():
            resp.update(timeout=0.1)

            # Output is already text in the default (non-binary) mode;
            # flush because prompts do not end in a newline
            if resp.peek_stdout():
                print(resp.read_stdout(), end='', flush=True)
            if resp.peek_stderr():
                print(resp.read_stderr(), end='', flush=True)

            # Forward complete lines typed locally to the remote shell
            if stdin_open and select.select([sys.stdin], [], [], 0)[0]:
                line = sys.stdin.readline()
                if line:
                    resp.write_stdin(line)
                else:
                    # Local stdin reached end-of-file; keep showing output
                    stdin_open = False
    except KeyboardInterrupt:
        print("\nDebug session ended")
    finally:
        # Close on every exit path, not only on Ctrl+C
        resp.close()
456
457
# Debug pod with ephemeral container
458
debug_pod_with_ephemeral("problematic-pod", "default")
459
```