Asynchronous Python client library for the Kubernetes API providing async/await support for all Kubernetes operations
—
WebSocket-based streaming for exec sessions, port forwarding, and attach operations. Provides real-time bidirectional communication with containers running in Kubernetes pods, enabling interactive terminal sessions and data streaming.
Enhanced API client with WebSocket support for streaming operations.
class WsApiClient(ApiClient):
    """API client with WebSocket support for streaming operations."""

    def __init__(self, configuration=None, header_name=None, header_value=None,
                 cookie=None, pool_threads=1, heartbeat=None):
        """
        WebSocket-enabled API client for streaming operations.

        Parameters:
        - configuration: Configuration, client configuration
        - header_name: str, custom header name for authentication
        - header_value: str, custom header value for authentication
        - cookie: str, cookie string for authentication
        - pool_threads: int, number of threads for connection pooling
        - heartbeat: int, WebSocket heartbeat interval in seconds
        """

    async def request(self, method, url, query_params=None, headers=None, body=None,
                      post_params=None, _preload_content=True, _request_timeout=None):
        """
        Make WebSocket request for streaming operations.

        Parameters:
        - method: str, HTTP method (GET for WebSocket upgrade)
        - url: str, WebSocket URL
        - query_params: dict, query parameters for WebSocket connection
        - headers: dict, additional headers
        - body: bytes, initial data to send
        - post_params: dict, POST parameters (unused for WebSocket)
        - _preload_content: bool, whether to preload response content
        - _request_timeout: int, request timeout in seconds

        Returns:
        - WsResponse: WebSocket response wrapper
        """

    def parse_error_data(self, data):
        """
        Parse error channel data from WebSocket stream.

        Parameters:
        - data: bytes, raw error data from ERROR_CHANNEL

        Returns:
        - dict: Parsed error information
        """


# Response wrapper for WebSocket streaming operations.
class WsResponse:
    """Response wrapper around a WebSocket connection used for streaming."""

    def __init__(self, websocket):
        """
        WebSocket response wrapper.

        Parameters:
        - websocket: WebSocket connection object
        """

    async def read_channel(self, timeout=None):
        """
        Read data from WebSocket with channel information.

        Parameters:
        - timeout: int, read timeout in seconds

        Returns:
        - tuple: (channel_number, data) where channel indicates data type
        """

    async def write_channel(self, channel, data):
        """
        Write data to specific WebSocket channel.

        Parameters:
        - channel: int, target channel number
        - data: bytes, data to write
        """

    async def close(self):
        """Close WebSocket connection."""


# WebSocket channel identifiers for different data streams in exec/attach operations.
# Standard I/O channels
STDIN_CHANNEL: int = 0 # Standard input to container
STDOUT_CHANNEL: int = 1 # Standard output from container
STDERR_CHANNEL: int = 2 # Standard error from container
ERROR_CHANNEL: int = 3 # Error information and status
RESIZE_CHANNEL: int = 4 # Terminal resize events

# Helper functions for WebSocket URL handling and connection setup.
def get_websocket_url(url):
    """
    Convert HTTP/HTTPS URL to WebSocket URL.

    Parameters:
    - url: str, HTTP or HTTPS URL

    Returns:
    - str: Corresponding WebSocket URL (ws:// or wss://)
    """


import asyncio
from kubernetes_asyncio import client, config, stream
async def exec_in_pod():
    """
    Run a one-shot shell command in a pod and print what it writes.

    Output frames are printed as they arrive. The loop stops when a status
    frame shows up on the error channel or when no frame is received for
    10 seconds. Both the exec stream and the client are closed on the way out.
    """
    await config.load_config()
    # Create WebSocket-enabled client
    ws_client = stream.WsApiClient()
    v1 = client.CoreV1Api(ws_client)
    # Bound before the try so the cleanup below is safe even when the
    # connection attempt itself raises.
    response = None
    try:
        # Execute command in pod
        exec_command = ['/bin/sh', '-c', 'echo "Hello from container"; ls -la']
        response = await v1.connect_get_namespaced_pod_exec(
            name="my-pod",
            namespace="default",
            command=exec_command,
            stderr=True,
            stdin=False,
            stdout=True,
            tty=False,
        )
        # Read output from exec session
        while True:
            try:
                channel, data = await response.read_channel(timeout=10)
            except asyncio.TimeoutError:
                print("Exec session timed out")
                break
            if channel == stream.STDOUT_CHANNEL:
                print(f"STDOUT: {data.decode('utf-8')}", end="")
            elif channel == stream.STDERR_CHANNEL:
                print(f"STDERR: {data.decode('utf-8')}", end="")
            elif channel == stream.ERROR_CHANNEL:
                # A frame on the error channel ends the session either way.
                error_info = ws_client.parse_error_data(data)
                if error_info.get('status') == 'Success':
                    print("Command completed successfully")
                else:
                    print(f"Error: {error_info}")
                break
    finally:
        # Close the client even if closing the stream fails.
        try:
            if response is not None:
                await response.close()
        finally:
            await ws_client.close()


asyncio.run(exec_in_pod())

import sys
import select
import termios
import tty
from kubernetes_asyncio import client, config, stream
async def interactive_shell():
    """
    Open an interactive /bin/bash session in a pod, wired to the local terminal.

    The local terminal is switched to raw mode for the duration of the
    session and restored afterwards. Keystrokes are forwarded to the
    container's stdin; stdout/stderr frames are written to the local
    streams. The session ends when a status frame arrives on the error
    channel or when local stdin reaches end-of-file.
    """
    import os  # local import: unbuffered reads from the stdin descriptor

    await config.load_config()
    ws_client = stream.WsApiClient()
    v1 = client.CoreV1Api(ws_client)
    # Bound before the try so the cleanup below is safe even when the
    # connection attempt itself raises.
    response = None
    try:
        # Start interactive shell
        response = await v1.connect_get_namespaced_pod_exec(
            name="my-pod",
            namespace="default",
            command=['/bin/bash'],
            stderr=True,
            stdin=True,
            stdout=True,
            tty=True,
        )

        loop = asyncio.get_running_loop()
        stdin_fd = sys.stdin.fileno()
        typed = asyncio.Queue()

        def on_stdin_ready():
            # Called by the event loop when stdin is readable, so nothing
            # here blocks. os.read bypasses Python's buffered reader, which
            # would otherwise hold back bytes that select() can no longer see.
            chunk = os.read(stdin_fd, 1024)
            if not chunk:
                # EOF keeps the descriptor "readable" forever; stop watching it.
                loop.remove_reader(stdin_fd)
            typed.put_nowait(chunk)

        async def read_output():
            while True:
                try:
                    channel, data = await response.read_channel(timeout=0.1)
                except asyncio.TimeoutError:
                    continue
                if channel == stream.STDOUT_CHANNEL:
                    sys.stdout.write(data.decode('utf-8'))
                    sys.stdout.flush()
                elif channel == stream.STDERR_CHANNEL:
                    sys.stderr.write(data.decode('utf-8'))
                    sys.stderr.flush()
                elif channel == stream.ERROR_CHANNEL:
                    # A status frame marks the end of the session.
                    error_info = ws_client.parse_error_data(data)
                    if error_info.get('status') != 'Success':
                        print(f"\nError: {error_info}")
                    break

        async def send_input():
            while True:
                chunk = await typed.get()
                if not chunk:  # empty read == EOF on local stdin
                    break
                await response.write_channel(stream.STDIN_CHANNEL, chunk)

        # Set terminal to raw mode for interactive session
        old_settings = termios.tcgetattr(stdin_fd)
        tty.setraw(stdin_fd)
        try:
            loop.add_reader(stdin_fd, on_stdin_ready)
            # Run both handlers concurrently and stop as soon as either one
            # finishes; waiting for both would hang after the shell exits
            # because the input side never ends on its own.
            workers = [
                asyncio.ensure_future(read_output()),
                asyncio.ensure_future(send_input()),
            ]
            try:
                finished, _ = await asyncio.wait(
                    workers, return_when=asyncio.FIRST_COMPLETED
                )
                for task in finished:
                    task.result()  # surface any exception from the handler
            finally:
                for task in workers:
                    task.cancel()
                await asyncio.gather(*workers, return_exceptions=True)
        finally:
            loop.remove_reader(stdin_fd)
            termios.tcsetattr(stdin_fd, termios.TCSADRAIN, old_settings)
    finally:
        # Close the client even if closing the stream fails.
        try:
            if response is not None:
                await response.close()
        finally:
            await ws_client.close()
# asyncio.run(interactive_shell())  # Uncomment to run


async def attach_to_pod():
    """
    Attach to a running container, send one command and print the replies.

    Reading stops when a status frame arrives on the error channel or after
    five consecutive 2-second read timeouts. Both the attach stream and the
    client are closed on the way out.
    """
    await config.load_config()
    ws_client = stream.WsApiClient()
    v1 = client.CoreV1Api(ws_client)
    # Bound before the try so the cleanup below is safe even when the
    # connection attempt itself raises.
    response = None
    try:
        # Attach to running container
        response = await v1.connect_get_namespaced_pod_attach(
            name="my-pod",
            namespace="default",
            container="my-container",  # Optional: specify container
            stderr=True,
            stdin=True,
            stdout=True,
            tty=True,
        )
        # Send initial command
        command = "/bin/bash\n"
        await response.write_channel(stream.STDIN_CHANNEL, command.encode('utf-8'))
        # Read responses
        timeout_count = 0
        while timeout_count < 5:  # Exit after 5 timeouts
            try:
                channel, data = await response.read_channel(timeout=2)
            except asyncio.TimeoutError:
                timeout_count += 1
                print("No output received...")
                continue
            if channel == stream.STDOUT_CHANNEL:
                print(f"Output: {data.decode('utf-8')}", end="")
                timeout_count = 0  # Reset timeout counter
            elif channel == stream.STDERR_CHANNEL:
                print(f"Error: {data.decode('utf-8')}", end="")
                timeout_count = 0
            elif channel == stream.ERROR_CHANNEL:
                error_info = ws_client.parse_error_data(data)
                print(f"Status: {error_info}")
                break
    finally:
        # Close the client even if closing the stream fails.
        try:
            if response is not None:
                await response.close()
        finally:
            await ws_client.close()
asyncio.run(attach_to_pod())


async def port_forward():
    """
    Open a port-forward stream to port 8080 of a pod and report traffic.

    Each data frame is reported by size. Read timeouts (30 seconds) are
    treated as an idle connection and do not end the loop; a status frame on
    the error channel does. Both the stream and the client are closed on the
    way out.
    """
    await config.load_config()
    ws_client = stream.WsApiClient()
    v1 = client.CoreV1Api(ws_client)
    # Bound before the try so the cleanup below is safe even when the
    # connection attempt itself raises.
    response = None
    try:
        # Set up port forwarding
        response = await v1.connect_get_namespaced_pod_portforward(
            name="my-pod",
            namespace="default",
            ports="8080",  # Forward port 8080
        )
        print("Port forwarding established on port 8080")
        # Handle port forwarding data
        # NOTE(review): the channel constants used below are the exec/attach
        # ones; confirm that the port-forward stream uses the same numbering.
        while True:
            try:
                channel, data = await response.read_channel(timeout=30)
            except asyncio.TimeoutError:
                print("Port forwarding timeout - connection may be idle")
                continue
            if channel == stream.STDOUT_CHANNEL:
                # Handle forwarded data from port 8080
                print(f"Received {len(data)} bytes from port 8080")
                # Process or forward data as needed
            elif channel == stream.ERROR_CHANNEL:
                # A status frame marks the end of the forwarding session.
                error_info = ws_client.parse_error_data(data)
                if error_info.get('status') != 'Success':
                    print(f"Port forward error: {error_info}")
                break
    finally:
        # Close the client even if closing the stream fails.
        try:
            if response is not None:
                await response.close()
        finally:
            await ws_client.close()
# asyncio.run(port_forward())  # Uncomment to run
import signal
import struct
import fcntl
import termios
async def exec_with_resize():
    """
    Start a TTY exec session and keep the remote terminal size in sync.

    The current size of the local terminal is sent on the resize channel
    right after connecting, and again every time the process receives
    SIGWINCH. The signal handler is removed and the stream and client are
    closed on the way out.
    """
    await config.load_config()
    ws_client = stream.WsApiClient()
    v1 = client.CoreV1Api(ws_client)
    loop = asyncio.get_running_loop()
    # Both are set before the try so the cleanup below knows what to undo
    # even when the connection attempt itself raises.
    response = None
    resize_hooked = False
    # Strong references to in-flight resize tasks so they are not
    # garbage-collected before they finish.
    resize_tasks = set()
    try:
        response = await v1.connect_get_namespaced_pod_exec(
            name="my-pod",
            namespace="default",
            command=['/bin/bash'],
            stderr=True,
            stdin=True,
            stdout=True,
            tty=True,
        )

        # Get initial terminal size
        def get_terminal_size():
            s = struct.pack('HHHH', 0, 0, 0, 0)
            x = fcntl.ioctl(sys.stdout.fileno(), termios.TIOCGWINSZ, s)
            return struct.unpack('HHHH', x)[:2]  # rows, cols

        async def send_terminal_size():
            rows, cols = get_terminal_size()
            resize_data = f'{{"Width":{cols},"Height":{rows}}}'
            await response.write_channel(stream.RESIZE_CHANNEL, resize_data.encode('utf-8'))
            return rows, cols

        # Send initial terminal size
        await send_terminal_size()

        async def forward_resize():
            rows, cols = await send_terminal_size()
            print(f"Terminal resized to {cols}x{rows}")

        # Handle terminal resize signals
        def handle_resize():
            # add_signal_handler runs this callback inside the event loop,
            # so it can schedule the coroutine that performs the write.
            task = loop.create_task(forward_resize())
            resize_tasks.add(task)
            task.add_done_callback(resize_tasks.discard)

        loop.add_signal_handler(signal.SIGWINCH, handle_resize)
        resize_hooked = True
        # Continue with normal exec session handling...
        # (Similar to interactive_shell example above)
    finally:
        if resize_hooked:
            # Stop reacting to resizes before the stream goes away.
            loop.remove_signal_handler(signal.SIGWINCH)
        for task in resize_tasks:
            task.cancel()
        # Close the client even if closing the stream fails.
        try:
            if response is not None:
                await response.close()
        finally:
            await ws_client.close()
# asyncio.run(exec_with_resize())  # Uncomment to run
import base64
async def upload_file_to_pod(local_path="/local/path/to/file.txt",
                             remote_path="/remote/path/to/file.txt"):
    """
    Copy a local file into a pod by piping base64 text through `base64 -d`.

    The file content is embedded in the command line of a one-shot exec
    call, so the whole file is read into memory and sent in one go.
    NOTE(review): command-line length limits presumably make this suitable
    for small files only — confirm before using on large payloads.

    Parameters:
    - local_path: str, path of the file to read on this machine
    - remote_path: str, destination path inside the container

    Reading stops when a status frame arrives on the error channel or when
    no frame is received for 30 seconds.
    """
    import shlex  # local import: quoting of the remote path for the shell

    await config.load_config()
    ws_client = stream.WsApiClient()
    v1 = client.CoreV1Api(ws_client)
    # Bound before the try so the cleanup below is safe even when reading
    # the file or connecting raises.
    response = None
    try:
        # Read local file
        with open(local_path, "rb") as f:
            file_content = f.read()
        # Base64 encode for safe transmission
        encoded_content = base64.b64encode(file_content).decode('utf-8')
        # Create command to decode and write file in pod. The destination is
        # shell-quoted because it is interpolated into a `sh -c` script.
        command = [
            '/bin/sh', '-c',
            f'echo "{encoded_content}" | base64 -d > {shlex.quote(remote_path)}'
            ' && echo "File uploaded successfully"',
        ]
        response = await v1.connect_get_namespaced_pod_exec(
            name="my-pod",
            namespace="default",
            command=command,
            stderr=True,
            stdin=False,
            stdout=True,
            tty=False,
        )
        # Monitor upload progress
        while True:
            try:
                channel, data = await response.read_channel(timeout=30)
            except asyncio.TimeoutError:
                print("Upload timeout")
                break
            if channel == stream.STDOUT_CHANNEL:
                output = data.decode('utf-8')
                print(f"Output: {output}")
            elif channel == stream.STDERR_CHANNEL:
                error = data.decode('utf-8')
                print(f"Error: {error}")
            elif channel == stream.ERROR_CHANNEL:
                # A frame on the error channel ends the session either way.
                error_info = ws_client.parse_error_data(data)
                if error_info.get('status') == 'Success':
                    print("File upload completed")
                else:
                    print(f"Upload failed: {error_info}")
                break
    finally:
        # Close the client even if closing the stream fails.
        try:
            if response is not None:
                await response.close()
        finally:
            await ws_client.close()
asyncio.run(upload_file_to_pod())
Install with Tessl CLI
npx tessl i tessl/pypi-kubernetes-asyncio