0
# Streaming
1
2
WebSocket-based streaming for exec sessions, port forwarding, and attach operations. Provides real-time bidirectional communication with containers running in Kubernetes pods, enabling interactive terminal sessions and data streaming.
3
4
## Capabilities
5
6
### WebSocket API Client
7
8
Enhanced API client with WebSocket support for streaming operations.
9
10
```python { .api }
11
class WsApiClient(ApiClient):
12
def __init__(self, configuration=None, header_name=None, header_value=None,
13
cookie=None, pool_threads=1, heartbeat=None):
14
"""
15
WebSocket-enabled API client for streaming operations.
16
17
Parameters:
18
- configuration: Configuration, client configuration
19
- header_name: str, custom header name for authentication
20
- header_value: str, custom header value for authentication
21
- cookie: str, cookie string for authentication
22
- pool_threads: int, number of threads for connection pooling
23
- heartbeat: int, WebSocket heartbeat interval in seconds
24
"""
25
26
async def request(self, method, url, query_params=None, headers=None, body=None,
27
post_params=None, _preload_content=True, _request_timeout=None):
28
"""
29
Make WebSocket request for streaming operations.
30
31
Parameters:
32
- method: str, HTTP method (GET for WebSocket upgrade)
33
- url: str, WebSocket URL
34
- query_params: dict, query parameters for WebSocket connection
35
- headers: dict, additional headers
36
- body: bytes, initial data to send
37
- post_params: dict, POST parameters (unused for WebSocket)
38
- _preload_content: bool, whether to preload response content
39
- _request_timeout: int, request timeout in seconds
40
41
Returns:
42
- WsResponse: WebSocket response wrapper
43
"""
44
45
def parse_error_data(self, data):
46
"""
47
Parse error channel data from WebSocket stream.
48
49
Parameters:
50
- data: bytes, raw error data from ERROR_CHANNEL
51
52
Returns:
53
- dict: Parsed error information
54
"""
55
```
56
57
### WebSocket Response Handling
58
59
Response wrapper for WebSocket streaming operations.
60
61
```python { .api }
62
class WsResponse:
63
def __init__(self, websocket):
64
"""
65
WebSocket response wrapper.
66
67
Parameters:
68
- websocket: WebSocket connection object
69
"""
70
71
async def read_channel(self, timeout=None):
72
"""
73
Read data from WebSocket with channel information.
74
75
Parameters:
76
- timeout: int, read timeout in seconds
77
78
Returns:
79
- tuple: (channel_number, data) where channel indicates data type
80
"""
81
82
async def write_channel(self, channel, data):
83
"""
84
Write data to specific WebSocket channel.
85
86
Parameters:
87
- channel: int, target channel number
88
- data: bytes, data to write
89
"""
90
91
async def close(self):
92
"""Close WebSocket connection."""
93
```
94
95
### Channel Constants
96
97
WebSocket channel identifiers for different data streams in exec/attach operations.
98
99
```python { .api }
100
# Standard I/O channels
101
STDIN_CHANNEL: int = 0 # Standard input to container
102
STDOUT_CHANNEL: int = 1 # Standard output from container
103
STDERR_CHANNEL: int = 2 # Standard error from container
104
ERROR_CHANNEL: int = 3 # Error information and status
105
RESIZE_CHANNEL: int = 4 # Terminal resize events
106
```
107
108
### Utility Functions
109
110
Helper functions for WebSocket URL handling and connection setup.
111
112
```python { .api }
113
def get_websocket_url(url):
114
"""
115
Convert HTTP/HTTPS URL to WebSocket URL.
116
117
Parameters:
118
- url: str, HTTP or HTTPS URL
119
120
Returns:
121
- str: Corresponding WebSocket URL (ws:// or wss://)
122
"""
123
```
124
125
## Usage Examples
126
127
### Container Exec Session
128
129
```python
130
import asyncio
131
from kubernetes_asyncio import client, config, stream
132
133
async def exec_in_pod():
134
await config.load_config()
135
136
# Create WebSocket-enabled client
137
ws_client = stream.WsApiClient()
138
v1 = client.CoreV1Api(ws_client)
139
140
try:
141
# Execute command in pod
142
exec_command = ['/bin/sh', '-c', 'echo "Hello from container"; ls -la']
143
144
response = await v1.connect_get_namespaced_pod_exec(
145
name="my-pod",
146
namespace="default",
147
command=exec_command,
148
stderr=True,
149
stdin=False,
150
stdout=True,
151
tty=False
152
)
153
154
# Read output from exec session
155
while True:
156
try:
157
channel, data = await response.read_channel(timeout=10)
158
159
if channel == stream.STDOUT_CHANNEL:
160
print(f"STDOUT: {data.decode('utf-8')}", end="")
161
elif channel == stream.STDERR_CHANNEL:
162
print(f"STDERR: {data.decode('utf-8')}", end="")
163
elif channel == stream.ERROR_CHANNEL:
164
error_info = ws_client.parse_error_data(data)
165
if error_info.get('status') == 'Success':
166
print("Command completed successfully")
167
break
168
else:
169
print(f"Error: {error_info}")
170
break
171
172
except asyncio.TimeoutError:
173
print("Exec session timed out")
174
break
175
176
finally:
177
await response.close()
178
await ws_client.close()
179
180
asyncio.run(exec_in_pod())
181
```
182
183
### Interactive Terminal Session
184
185
```python
186
import sys
187
import select
188
import termios
189
import tty
190
from kubernetes_asyncio import client, config, stream
191
192
async def interactive_shell():
193
await config.load_config()
194
195
ws_client = stream.WsApiClient()
196
v1 = client.CoreV1Api(ws_client)
197
198
try:
199
# Start interactive shell
200
response = await v1.connect_get_namespaced_pod_exec(
201
name="my-pod",
202
namespace="default",
203
command=['/bin/bash'],
204
stderr=True,
205
stdin=True,
206
stdout=True,
207
tty=True
208
)
209
210
# Set terminal to raw mode for interactive session
211
old_settings = termios.tcgetattr(sys.stdin)
212
tty.setraw(sys.stdin.fileno())
213
214
try:
215
# Handle bidirectional communication
216
async def read_output():
217
while True:
218
try:
219
channel, data = await response.read_channel(timeout=0.1)
220
221
if channel == stream.STDOUT_CHANNEL:
222
sys.stdout.write(data.decode('utf-8'))
223
sys.stdout.flush()
224
elif channel == stream.STDERR_CHANNEL:
225
sys.stderr.write(data.decode('utf-8'))
226
sys.stderr.flush()
227
elif channel == stream.ERROR_CHANNEL:
228
error_info = ws_client.parse_error_data(data)
229
if error_info.get('status') != 'Success':
230
print(f"\nError: {error_info}")
231
break
232
233
except asyncio.TimeoutError:
234
continue
235
236
async def send_input():
237
while True:
238
# Check for input without blocking
239
if select.select([sys.stdin], [], [], 0.1)[0]:
240
char = sys.stdin.read(1)
241
if char:
242
await response.write_channel(stream.STDIN_CHANNEL, char.encode('utf-8'))
243
await asyncio.sleep(0.01)
244
245
# Run both input and output handlers concurrently
246
await asyncio.gather(read_output(), send_input())
247
248
finally:
249
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
250
251
finally:
252
await response.close()
253
await ws_client.close()
254
255
# asyncio.run(interactive_shell()) # Uncomment to run
256
```
257
258
### Container Attach
259
260
```python
261
async def attach_to_pod():
262
await config.load_config()
263
264
ws_client = stream.WsApiClient()
265
v1 = client.CoreV1Api(ws_client)
266
267
try:
268
# Attach to running container
269
response = await v1.connect_get_namespaced_pod_attach(
270
name="my-pod",
271
namespace="default",
272
container="my-container", # Optional: specify container
273
stderr=True,
274
stdin=True,
275
stdout=True,
276
tty=True
277
)
278
279
# Send initial command
280
command = "/bin/bash\n"
281
await response.write_channel(stream.STDIN_CHANNEL, command.encode('utf-8'))
282
283
# Read responses
284
timeout_count = 0
285
while timeout_count < 5: # Exit after 5 timeouts
286
try:
287
channel, data = await response.read_channel(timeout=2)
288
289
if channel == stream.STDOUT_CHANNEL:
290
print(f"Output: {data.decode('utf-8')}", end="")
291
timeout_count = 0 # Reset timeout counter
292
elif channel == stream.STDERR_CHANNEL:
293
print(f"Error: {data.decode('utf-8')}", end="")
294
timeout_count = 0
295
elif channel == stream.ERROR_CHANNEL:
296
error_info = ws_client.parse_error_data(data)
297
print(f"Status: {error_info}")
298
break
299
300
except asyncio.TimeoutError:
301
timeout_count += 1
302
print("No output received...")
303
304
finally:
305
await response.close()
306
await ws_client.close()
307
308
asyncio.run(attach_to_pod())
309
```
310
311
### Port Forwarding
312
313
```python
314
async def port_forward():
315
await config.load_config()
316
317
ws_client = stream.WsApiClient()
318
v1 = client.CoreV1Api(ws_client)
319
320
try:
321
# Set up port forwarding
322
response = await v1.connect_get_namespaced_pod_portforward(
323
name="my-pod",
324
namespace="default",
325
ports="8080" # Forward port 8080
326
)
327
328
print("Port forwarding established on port 8080")
329
330
# Handle port forwarding data
331
while True:
332
try:
333
channel, data = await response.read_channel(timeout=30)
334
335
if channel == stream.STDOUT_CHANNEL:
336
# Handle forwarded data from port 8080
337
print(f"Received {len(data)} bytes from port 8080")
338
# Process or forward data as needed
339
elif channel == stream.ERROR_CHANNEL:
340
error_info = ws_client.parse_error_data(data)
341
if error_info.get('status') != 'Success':
342
print(f"Port forward error: {error_info}")
343
break
344
345
except asyncio.TimeoutError:
346
print("Port forwarding timeout - connection may be idle")
347
continue
348
349
finally:
350
await response.close()
351
await ws_client.close()
352
353
# asyncio.run(port_forward()) # Uncomment to run
354
```
355
356
### Terminal Resize Handling
357
358
```python
359
import signal
360
import struct
361
import fcntl
362
import termios
363
364
async def exec_with_resize():
365
await config.load_config()
366
367
ws_client = stream.WsApiClient()
368
v1 = client.CoreV1Api(ws_client)
369
370
try:
371
response = await v1.connect_get_namespaced_pod_exec(
372
name="my-pod",
373
namespace="default",
374
command=['/bin/bash'],
375
stderr=True,
376
stdin=True,
377
stdout=True,
378
tty=True
379
)
380
381
# Get initial terminal size
382
def get_terminal_size():
383
s = struct.pack('HHHH', 0, 0, 0, 0)
384
x = fcntl.ioctl(sys.stdout.fileno(), termios.TIOCGWINSZ, s)
385
return struct.unpack('HHHH', x)[:2] # rows, cols
386
387
# Send initial terminal size
388
rows, cols = get_terminal_size()
389
resize_data = f'{{"Width":{cols},"Height":{rows}}}'
390
await response.write_channel(stream.RESIZE_CHANNEL, resize_data.encode('utf-8'))
391
392
# Handle terminal resize signals
393
def handle_resize(signum, frame):
394
rows, cols = get_terminal_size()
395
resize_data = f'{{"Width":{cols},"Height":{rows}}}'
396
397
# Note: In real implementation, you'd need to queue this
398
# for the async event loop to process
399
print(f"Terminal resized to {cols}x{rows}")
400
401
signal.signal(signal.SIGWINCH, handle_resize)
402
403
# Continue with normal exec session handling...
404
# (Similar to interactive_shell example above)
405
406
finally:
407
await response.close()
408
await ws_client.close()
409
410
# asyncio.run(exec_with_resize()) # Uncomment to run
411
```
412
413
### File Upload via Exec
414
415
```python
416
import base64
417
418
async def upload_file_to_pod():
419
await config.load_config()
420
421
ws_client = stream.WsApiClient()
422
v1 = client.CoreV1Api(ws_client)
423
424
try:
425
# Read local file
426
with open("/local/path/to/file.txt", "rb") as f:
427
file_content = f.read()
428
429
# Base64 encode for safe transmission
430
encoded_content = base64.b64encode(file_content).decode('utf-8')
431
432
# Create command to decode and write file in pod
433
command = [
434
'/bin/sh', '-c',
435
f'echo "{encoded_content}" | base64 -d > /remote/path/to/file.txt && echo "File uploaded successfully"'
436
]
437
438
response = await v1.connect_get_namespaced_pod_exec(
439
name="my-pod",
440
namespace="default",
441
command=command,
442
stderr=True,
443
stdin=False,
444
stdout=True,
445
tty=False
446
)
447
448
# Monitor upload progress
449
while True:
450
try:
451
channel, data = await response.read_channel(timeout=30)
452
453
if channel == stream.STDOUT_CHANNEL:
454
output = data.decode('utf-8')
455
print(f"Output: {output}")
456
elif channel == stream.STDERR_CHANNEL:
457
error = data.decode('utf-8')
458
print(f"Error: {error}")
459
elif channel == stream.ERROR_CHANNEL:
460
error_info = ws_client.parse_error_data(data)
461
if error_info.get('status') == 'Success':
462
print("File upload completed")
463
break
464
else:
465
print(f"Upload failed: {error_info}")
466
break
467
468
except asyncio.TimeoutError:
469
print("Upload timeout")
470
break
471
472
finally:
473
await response.close()
474
await ws_client.close()
475
476
asyncio.run(upload_file_to_pod())
477
```