Python client library for interacting with Kubernetes clusters through the Kubernetes API
—
Execute commands in running pods, stream container logs, and establish port forwarding connections. Essential for debugging, log analysis, and establishing secure connections to pod services through the Kubernetes API server.
Execute commands inside running containers and stream the results with support for stdin, stdout, stderr, and TTY.
def stream(
ws_client,
channel: str,
*args,
stdin: bool = False,
stdout: bool = True,
stderr: bool = True,
tty: bool = False,
**kwargs
):
"""
Stream command execution or log output from pods.
Parameters:
- ws_client: WebSocket client connection
- channel: Channel type (stdin, stdout, stderr, error, resize)
- *args: Additional arguments
- stdin: Enable stdin stream
- stdout: Enable stdout stream
- stderr: Enable stderr stream
- tty: Allocate TTY
Returns:
Stream object for reading/writing data
"""

Establish port forwarding connections to pods for accessing services running inside containers.
def portforward(
    api_instance,
    name: str,
    namespace: str,
    ports: list,
    **kwargs
):
    """
    Create port forwarding connection to a pod.

    Parameters:
    - api_instance: CoreV1Api instance
    - name: Pod name
    - namespace: Pod namespace
    - ports: List of port mappings
    - **kwargs: Additional keyword arguments

    Returns:
    PortForward connection object

    NOTE(review): the usage example later in this document passes an API
    method (``v1.connect_get_namespaced_pod_portforward``) as the first
    argument and ``ports`` as a string -- confirm which form the installed
    library expects.
    """


from kubernetes import client, config
from kubernetes.stream import stream
# Load the local kubeconfig and build the CoreV1 client used by this example
config.load_kube_config()
v1 = client.CoreV1Api()

# Execute a simple command
exec_command = ['/bin/sh', '-c', 'echo "Hello from pod"']
resp = stream(
    v1.connect_get_namespaced_pod_exec,
    name="my-pod",
    namespace="default",
    command=exec_command,
    stderr=True,
    stdin=False,
    stdout=True,
    tty=False,
)
print("Command output:")
print(resp)


# Next example: interactive shell session
from kubernetes import client, config
from kubernetes.stream import stream
import sys
import select
import termios
import tty

config.load_kube_config()
v1 = client.CoreV1Api()
def interactive_shell(pod_name, namespace="default", container=None):
    """
    Create interactive shell session with pod.

    Switches the local terminal to raw mode, runs ``/bin/bash`` in the pod
    through the exec API with a TTY, relays local keystrokes to the remote
    stdin and writes remote stdout/stderr to the local terminal. The exec
    session is closed and the terminal settings are restored on every exit
    path.

    Parameters:
    - pod_name: Pod name
    - namespace: Pod namespace
    - container: Container name forwarded to the exec call (None is passed through)
    """
    # Function-scope import: only the raw-stdin read below needs it.
    import os

    stdin_fd = sys.stdin.fileno()
    # Store original terminal settings
    old_tty = termios.tcgetattr(sys.stdin)
    resp = None
    try:
        # Set terminal to raw mode
        tty.setraw(stdin_fd)
        # Start exec with TTY
        exec_command = ['/bin/bash']
        resp = stream(
            v1.connect_get_namespaced_pod_exec,
            name=pod_name,
            namespace=namespace,
            container=container,
            command=exec_command,
            stderr=True,
            stdin=True,
            stdout=True,
            tty=True,
            _preload_content=False,
        )
        # Handle input/output
        while resp.is_open():
            resp.update(timeout=1)
            # Check for input. Read straight from the file descriptor:
            # sys.stdin.read(1) can pull several bytes into Python's buffer,
            # after which select() no longer reports them, so multi-byte keys
            # (arrow keys, pasted text) would stall until the next keypress.
            if select.select([sys.stdin], [], [], 0)[0]:
                typed = os.read(stdin_fd, 1024).decode('utf-8', errors='replace')
                if typed:
                    resp.write_stdin(typed)
            # Read output. Flush explicitly: a shell prompt has no trailing
            # newline, so buffered output would otherwise stay invisible.
            if resp.peek_stdout():
                sys.stdout.write(resp.read_stdout())
                sys.stdout.flush()
            if resp.peek_stderr():
                sys.stderr.write(resp.read_stderr())
                sys.stderr.flush()
    finally:
        try:
            if resp is not None:
                resp.close()
        finally:
            # Restore terminal settings even if closing the session fails
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
# Start interactive shell
interactive_shell("my-pod", "default")


# Next example: streaming container logs
from kubernetes import client, config
from kubernetes.stream import stream

config.load_kube_config()
v1 = client.CoreV1Api()
# Stream logs from a container
def stream_logs(pod_name, namespace="default", container=None, follow=True):
    """
    Stream logs from a pod container.

    Prints each received piece of log output to stdout until the server ends
    the response or the user presses Ctrl-C. The HTTP response is always
    closed on exit.

    Parameters:
    - pod_name: Pod name
    - namespace: Pod namespace
    - container: Container name forwarded to the log call (None is passed through)
    - follow: Keep the connection open and stream new log output
    """
    # The log endpoint is called directly rather than through the
    # kubernetes.stream.stream() wrapper: that wrapper is for the WebSocket
    # exec/attach calls, and the client it returns has no .stream() iterator.
    # With _preload_content=False the API call returns the raw HTTP response.
    w = v1.read_namespaced_pod_log(
        name=pod_name,
        namespace=namespace,
        container=container,
        follow=follow,
        _preload_content=False,
    )
    try:
        for line in w.stream():
            # Pieces are raw bytes and may end inside a multi-byte character;
            # 'replace' keeps the loop from crashing on such a boundary.
            print(line.decode('utf-8', errors='replace').rstrip())
    except KeyboardInterrupt:
        print("\nLog streaming stopped")
    finally:
        # Release the connection on every exit path, not only on Ctrl-C
        w.close()
# Stream logs
stream_logs("my-pod", "default", follow=True)


# Next example: port forwarding
from kubernetes import client, config
from kubernetes.stream import portforward
import requests
import threading
import time

config.load_kube_config()
v1 = client.CoreV1Api()
def create_port_forward(pod_name, namespace, local_port, pod_port):
    """
    Create port forward connection.

    Opens a TCP listener on 127.0.0.1:<local_port> and, for every accepted
    connection, opens a port-forward session to <pod_port> of the pod and
    copies bytes in both directions until either side closes.

    The library's portforward() does not listen on a local port by itself;
    it hands back in-process sockets for the requested pod ports. Its
    ``ports`` argument is a comma-separated string of pod port numbers, so a
    "local:remote" mapping is not a valid value.

    Parameters:
    - pod_name: Pod name
    - namespace: Pod namespace
    - local_port: Local TCP port to listen on
    - pod_port: Port inside the pod to forward to

    Returns:
    The daemon thread running the accept loop

    Raises:
    OSError if the local port cannot be bound
    """
    # Function-scope imports: only this relay needs them.
    import select
    import socket

    pod_port = int(pod_port)

    def relay(client_sock):
        """Pipe bytes between one local client and the pod port."""
        pf = None
        try:
            pf = portforward(
                v1.connect_get_namespaced_pod_portforward,
                name=pod_name,
                namespace=namespace,
                ports=str(pod_port),
            )
            pod_sock = pf.socket(pod_port)
            pod_sock.setblocking(True)
            peers = {client_sock: pod_sock, pod_sock: client_sock}
            while True:
                readable, _, _ = select.select(list(peers), [], [])
                for source in readable:
                    data = source.recv(65536)
                    if not data:
                        # One side closed: end this forwarded connection
                        return
                    peers[source].sendall(data)
        except Exception as e:
            # Thread boundary: report instead of dying silently
            print(f"Port forwarding error: {e}")
        finally:
            if pf is not None:
                pf.close()
            client_sock.close()

    # Bind in the caller's thread so a port clash surfaces immediately and
    # the listener is ready as soon as this function returns.
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        listener.bind(("127.0.0.1", local_port))
        listener.listen()
    except OSError:
        listener.close()
        raise

    def port_forward_thread():
        """Accept local connections and relay each one on its own thread."""
        while True:
            try:
                client_sock, _ = listener.accept()
            except OSError:
                # Listener was closed
                return
            threading.Thread(target=relay, args=(client_sock,), daemon=True).start()

    # Start port forwarding in background thread
    pf_thread = threading.Thread(target=port_forward_thread, daemon=True)
    pf_thread.start()
    print(f"Port forwarding active: localhost:{local_port} -> {pod_name}:{pod_port}")
    return pf_thread
# Example: Forward local port 8080 to pod port 80
pf_thread = create_port_forward("nginx-pod", "default", 8080, 80)

# Use the forwarded connection
try:
    response = requests.get("http://localhost:8080")
    print(f"Response: {response.status_code}")
    print(response.text[:200])
except requests.RequestException as e:
    print(f"Request failed: {e}")


# Next example: copying files to and from a pod
from kubernetes import client, config
from kubernetes.stream import stream
import os
import tarfile
import tempfile

config.load_kube_config()
v1 = client.CoreV1Api()
def copy_file_to_pod(pod_name, namespace, local_file, pod_path, container=None):
    """
    Copy file from local system to pod.

    Packs ``local_file`` into a tar archive, runs ``tar xf -`` in the pod
    and writes the archive to its stdin. The archive member is named after
    the last component of ``pod_path`` so the file lands at ``pod_path``
    itself; if ``pod_path`` ends with a slash, the local file name is kept.

    Parameters:
    - pod_name: Pod name
    - namespace: Pod namespace
    - local_file: Path of the file on the local system
    - pod_path: Destination path inside the pod
    - container: Container name forwarded to the exec call (None is passed through)
    """
    # Pod paths are always POSIX paths, whatever the local OS is.
    import posixpath

    # An empty dirname (bare file name) would make `tar -C ''` fail.
    target_dir = posixpath.dirname(pod_path) or '.'
    target_name = posixpath.basename(pod_path) or os.path.basename(local_file)

    # Create tar archive of the file
    with tempfile.NamedTemporaryFile() as tar_buffer:
        with tarfile.open(fileobj=tar_buffer, mode='w') as tar:
            tar.add(local_file, arcname=target_name)
        # The archive is complete only once the TarFile has been closed
        tar_buffer.seek(0)
        tar_data = tar_buffer.read()

    # Execute tar command to extract file in pod
    exec_command = ['tar', 'xf', '-', '-C', target_dir]
    resp = stream(
        v1.connect_get_namespaced_pod_exec,
        name=pod_name,
        namespace=namespace,
        container=container,
        command=exec_command,
        stderr=True,
        stdin=True,
        stdout=True,
        tty=False,
        _preload_content=False,
    )
    try:
        # Send tar data to pod
        resp.write_stdin(tar_data)
        # Give the remote tar a moment to respond and surface anything it
        # wrote to stderr instead of discarding it (best effort).
        resp.update(timeout=1)
        if resp.peek_stderr():
            print(f"tar reported: {resp.read_stderr().rstrip()}")
    finally:
        resp.close()
    print(f"File copied to {pod_name}:{pod_path}")
def copy_file_from_pod(pod_name, namespace, pod_path, local_file, container=None):
    """
    Copy file from pod to local system.

    Runs ``tar cf -`` in the pod, collects the archive from the exec
    session's stdout as raw bytes and writes the first regular file in the
    archive to ``local_file``.

    Parameters:
    - pod_name: Pod name
    - namespace: Pod namespace
    - pod_path: Path of the file inside the pod
    - local_file: Destination path on the local system
    - container: Container name forwarded to the exec call (None is passed through)

    Raises:
    FileNotFoundError if the received archive contains no regular file
    tarfile.TarError if the received data is not a valid archive
    """
    # Function-scope imports: only this helper needs them.
    import io
    import posixpath
    import shutil

    # Archive relative to the parent directory so the member is just the
    # file name rather than the absolute path with its leading slash removed.
    source_dir = posixpath.dirname(pod_path) or '.'
    source_name = posixpath.basename(pod_path)

    # Create tar archive of the file in pod
    exec_command = ['tar', 'cf', '-', '-C', source_dir, source_name]
    resp = stream(
        v1.connect_get_namespaced_pod_exec,
        name=pod_name,
        namespace=namespace,
        container=container,
        command=exec_command,
        stderr=True,
        stdin=False,
        stdout=True,
        tty=False,
        # Without binary=True the channels are decoded to str, which both
        # corrupts the archive and cannot be joined as bytes.
        binary=True,
        _preload_content=False,
    )

    # Read tar data; collect pieces and join once instead of growing a
    # bytes object in the loop.
    chunks = []
    try:
        while resp.is_open():
            resp.update(timeout=1)
            if resp.peek_stdout():
                chunks.append(resp.read_stdout())
    finally:
        resp.close()

    # Extract file from tar data. Only the file's content is copied to the
    # requested destination; member paths from the pod are never used as
    # local paths (unlike extractall on remote data).
    with tarfile.open(fileobj=io.BytesIO(b''.join(chunks)), mode='r') as tar:
        member = next((m for m in tar.getmembers() if m.isfile()), None)
        if member is None:
            raise FileNotFoundError(
                f"No regular file found in archive of {pod_name}:{pod_path}"
            )
        with tar.extractfile(member) as source, open(local_file, 'wb') as target:
            shutil.copyfileobj(source, target)
    print(f"File copied from {pod_name}:{pod_path} to {local_file}")
# Example usage
copy_file_to_pod("my-pod", "default", "/local/file.txt", "/tmp/file.txt")
copy_file_from_pod("my-pod", "default", "/tmp/output.txt", "/local/output.txt")


# Next example: monitoring pod resources
from kubernetes import client, config
from kubernetes.stream import stream
import json
import time

config.load_kube_config()
v1 = client.CoreV1Api()
def monitor_pod_resources(pod_name, namespace="default", container=None):
    """
    Monitor pod resource usage using kubectl top equivalent.

    Runs a shell loop in the pod that prints a timestamp, the MemTotal and
    MemAvailable lines of /proc/meminfo and /proc/loadavg every five
    seconds, and echoes that output locally until the session ends or the
    user presses Ctrl-C.

    Parameters:
    - pod_name: Pod name
    - namespace: Pod namespace
    - container: Container name forwarded to the exec call (None is passed through)
    """
    # Use metrics from /proc or system commands
    exec_command = [
        'sh', '-c',
        'while true; do '
        'echo "=== $(date) ==="; '
        'cat /proc/meminfo | grep -E "MemTotal|MemAvailable"; '
        'cat /proc/loadavg; '
        'sleep 5; '
        'done'
    ]
    resp = stream(
        v1.connect_get_namespaced_pod_exec,
        name=pod_name,
        namespace=namespace,
        container=container,
        command=exec_command,
        stderr=True,
        stdin=False,
        stdout=True,
        tty=False,
        _preload_content=False,
    )
    print(f"Monitoring resources for {pod_name}")
    try:
        while resp.is_open():
            # update() already waits up to one second for data, so no extra
            # sleep is needed to pace the loop.
            resp.update(timeout=1)
            if resp.peek_stdout():
                # read_stdout() returns text here (the session is not opened
                # in binary mode), so it must not be decoded again.
                print(resp.read_stdout(), end='')
    except KeyboardInterrupt:
        print("\nMonitoring stopped")
    finally:
        # Close the exec session on every exit path, not only on Ctrl-C
        resp.close()
# Monitor pod resources
monitor_pod_resources("my-pod", "default")


# Next example: debugging with an ephemeral container
from kubernetes import client, config
from kubernetes.stream import stream

config.load_kube_config()
v1 = client.CoreV1Api()
def debug_pod_with_ephemeral(pod_name, namespace="default"):
    """
    Debug pod using ephemeral container (Kubernetes 1.23+).

    Adds a busybox ephemeral container named "debugger" that targets the
    pod's first container, waits for it to start, then opens a shell in it.
    Remote output is echoed locally and lines typed locally are forwarded to
    the remote shell.

    Parameters:
    - pod_name: Pod name
    - namespace: Pod namespace

    Raises:
    TimeoutError if the debug container is not running within 60 seconds
    """
    # Function-scope imports: only this helper needs them.
    import select
    import sys
    import time

    # Get existing pod
    pod = v1.read_namespaced_pod(name=pod_name, namespace=namespace)

    # Add ephemeral container for debugging
    ephemeral_container = {
        "name": "debugger",
        "image": "busybox:latest",
        "command": ["/bin/sh"],
        "stdin": True,
        "tty": True,
        "targetContainerName": pod.spec.containers[0].name
    }

    # Ephemeral containers can only be added through the pod's
    # `ephemeralcontainers` subresource; a regular pod patch may not change
    # that field. Sending only the new entry leaves existing ones untouched.
    v1.patch_namespaced_pod_ephemeralcontainers(
        name=pod_name,
        namespace=namespace,
        body={"spec": {"ephemeralContainers": [ephemeral_container]}},
    )
    print(f"Ephemeral container added to {pod_name}")

    # The container has to be pulled and started before exec can reach it.
    deadline = time.monotonic() + 60
    while True:
        current = v1.read_namespaced_pod(name=pod_name, namespace=namespace)
        statuses = current.status.ephemeral_container_statuses or []
        if any(
            s.name == "debugger" and s.state is not None and s.state.running
            for s in statuses
        ):
            break
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Ephemeral container 'debugger' in {pod_name} did not start"
            )
        time.sleep(1)

    # Connect to the ephemeral container
    resp = stream(
        v1.connect_get_namespaced_pod_exec,
        name=pod_name,
        namespace=namespace,
        container="debugger",
        command=["/bin/sh"],
        stderr=True,
        stdin=True,
        stdout=True,
        tty=True,
        _preload_content=False,
    )
    print("Connected to debug container. Type 'exit' to quit.")

    # Interactive session with debug container
    try:
        while resp.is_open():
            resp.update(timeout=1)
            # Forward complete lines typed locally to the remote shell
            if select.select([sys.stdin], [], [], 0)[0]:
                typed = sys.stdin.readline()
                if not typed:
                    # Local stdin reached EOF: end the session
                    break
                resp.write_stdin(typed)
            # read_stdout()/read_stderr() return text here (the session is
            # not opened in binary mode), so they must not be decoded again.
            if resp.peek_stdout():
                print(resp.read_stdout(), end='', flush=True)
            if resp.peek_stderr():
                print(resp.read_stderr(), end='', flush=True)
    except KeyboardInterrupt:
        print("\nDebug session ended")
    finally:
        # Close the exec session on every exit path, not only on Ctrl-C
        resp.close()
# Debug pod with ephemeral container
debug_pod_with_ephemeral("problematic-pod", "default")

Install with Tessl CLI
npx tessl i tessl/pypi-kubernetes