CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-kubernetes

Python client library for interacting with Kubernetes clusters through the Kubernetes API

Pending
Overview
Eval results
Files

docs/streaming-operations.md

Streaming Operations

Execute commands in running pods, stream container logs, and establish port forwarding connections. Essential for debugging, log analysis, and establishing secure connections to pod services through the Kubernetes API server.

Capabilities

Command Execution

Execute commands inside running containers and stream the results with support for stdin, stdout, stderr, and TTY.

def stream(
    ws_client,
    channel: str,
    *args,
    stdin: bool = False,
    stdout: bool = True,
    stderr: bool = True,
    tty: bool = False,
    **kwargs
):
    """
    Stream command execution or log output from pods.

    Parameters:
    - ws_client: WebSocket client connection
    - channel: Channel type (stdin, stdout, stderr, error, resize)
    - *args: Additional arguments
    - stdin: Enable stdin stream
    - stdout: Enable stdout stream
    - stderr: Enable stderr stream
    - tty: Allocate TTY
    - **kwargs: Additional keyword arguments. The usage examples in this
      document pass name, namespace, container, command and
      _preload_content this way.

    Returns:
    Stream object for reading/writing data

    NOTE(review): every usage example in this document calls
    stream(<bound CoreV1Api method>, name=..., namespace=..., ...) and never
    passes a WebSocket client or a channel name, so the first two parameters
    documented above look inconsistent with actual usage -- confirm against
    the kubernetes.stream source before relying on this signature.
    NOTE(review): the examples print the return value directly when
    _preload_content is not given, and use is_open()/update()/read_stdout()
    on it when _preload_content=False -- presumably the return type depends
    on that flag; verify.
    """

Port Forwarding

Establish port forwarding connections to pods for accessing services running inside containers.

def portforward(
    api_instance,
    name: str,
    namespace: str,
    ports: list,
    **kwargs
):
    """
    Create port forwarding connection to a pod.

    Parameters:
    - api_instance: CoreV1Api instance
    - name: Pod name
    - namespace: Pod namespace
    - ports: List of port mappings
    - **kwargs: Additional keyword arguments

    Returns:
    PortForward connection object

    NOTE(review): the usage example in this document passes the bound method
    v1.connect_get_namespaced_pod_portforward (not a CoreV1Api instance) as
    the first argument and a string for ports, which disagrees with the
    annotations above -- confirm the real signature and the expected format
    of ports against the kubernetes.stream source.
    """

Usage Examples

Executing Commands in Pods

from kubernetes import client, config
from kubernetes.stream import stream

config.load_kube_config()
v1 = client.CoreV1Api()

# Execute a simple command
exec_command = ['/bin/sh', '-c', 'echo "Hello from pod"']

resp = stream(
    v1.connect_get_namespaced_pod_exec,
    name="my-pod",
    namespace="default",
    command=exec_command,
    stderr=True,
    stdin=False,
    stdout=True,
    tty=False
)

print("Command output:")
print(resp)

Interactive Shell Session

from kubernetes import client, config
from kubernetes.stream import stream
import sys
import select
import termios
import tty

config.load_kube_config()
v1 = client.CoreV1Api()

def interactive_shell(pod_name, namespace="default", container=None):
    """Create interactive shell session with pod.

    Puts the local terminal into raw mode, starts /bin/bash in the pod with a
    TTY and relays local keystrokes to the remote stdin and remote
    stdout/stderr to the local terminal until the remote side closes.

    Parameters:
    - pod_name: Name of the pod to exec into
    - namespace: Namespace of the pod
    - container: Container name, or None to let the API server choose

    The exec stream is always closed and the original terminal settings are
    always restored, even if the session fails part-way.
    """

    # Store original terminal settings
    old_tty = termios.tcgetattr(sys.stdin)
    resp = None

    try:
        # Set terminal to raw mode
        tty.setraw(sys.stdin.fileno())

        # Start exec with TTY
        exec_command = ['/bin/bash']
        resp = stream(
            v1.connect_get_namespaced_pod_exec,
            name=pod_name,
            namespace=namespace,
            container=container,
            command=exec_command,
            stderr=True,
            stdin=True,
            stdout=True,
            tty=True,
            _preload_content=False
        )

        # Handle input/output
        while resp.is_open():
            resp.update(timeout=1)

            # Check for input without blocking (zero timeout)
            readable, _, _ = select.select([sys.stdin], [], [], 0)
            if readable:
                input_char = sys.stdin.read(1)
                if input_char:
                    resp.write_stdin(input_char)

            # Relay output. Write and flush explicitly: shell prompts do not
            # end in a newline, so buffered print() output would not appear
            # until the next line break.
            if resp.peek_stdout():
                sys.stdout.write(resp.read_stdout())
                sys.stdout.flush()
            if resp.peek_stderr():
                sys.stderr.write(resp.read_stderr())
                sys.stderr.flush()

    finally:
        try:
            # Close the stream on every exit path, not only the normal one
            if resp is not None:
                resp.close()
        finally:
            # Restore terminal settings
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)

# Start interactive shell
interactive_shell("my-pod", "default")

Streaming Container Logs

from kubernetes import client, config
from kubernetes.stream import stream

config.load_kube_config()
v1 = client.CoreV1Api()

# Stream logs from a container
def stream_logs(pod_name, namespace="default", container=None, follow=True):
    """Stream logs from a pod container.

    Parameters:
    - pod_name: Name of the pod
    - namespace: Namespace of the pod
    - container: Container name, or None for the pod's default container
    - follow: Keep the connection open and print new log output as it arrives

    Stops cleanly on Ctrl-C. The HTTP response is closed on every exit path.
    """

    # Pod logs are served over plain HTTP, so call the API method directly
    # instead of going through the WebSocket stream() helper. With
    # _preload_content=False the client returns the raw HTTP response, whose
    # stream() method yields the body incrementally as bytes.
    w = v1.read_namespaced_pod_log(
        name=pod_name,
        namespace=namespace,
        container=container,
        follow=follow,
        _preload_content=False
    )

    try:
        for line in w.stream():
            # errors='replace': a chunk boundary may split a multi-byte
            # character, which must not abort the whole log stream
            print(line.decode('utf-8', errors='replace').rstrip())
    except KeyboardInterrupt:
        print("\nLog streaming stopped")
    finally:
        w.close()

# Stream logs
stream_logs("my-pod", "default", follow=True)

Port Forwarding to Pod

from kubernetes import client, config
from kubernetes.stream import portforward
import requests
import threading
import time

config.load_kube_config()
v1 = client.CoreV1Api()

def create_port_forward(pod_name, namespace, local_port, pod_port):
    """Create port forward connection.

    Listens on 127.0.0.1:local_port in a background daemon thread and relays
    every accepted TCP connection to pod_port of the given pod through the
    Kubernetes API server.

    Parameters:
    - pod_name: Name of the target pod
    - namespace: Namespace of the target pod
    - local_port: Local TCP port to listen on
    - pod_port: Port inside the pod to forward to

    Returns:
    The started background thread running the listener. Errors are printed
    from the background threads rather than raised to the caller.
    """
    # Function-scope imports: only this example needs them
    import select
    import socket

    listening = threading.Event()

    def handle_client(client_sock):
        """Relay bytes between one local client and a fresh pod connection."""
        pf = None
        try:
            # portforward() does not open a local port itself; it returns an
            # object exposing one socket per requested pod port. The ports
            # argument names pod ports only (e.g. "80"), not "local:remote".
            pf = portforward(
                v1.connect_get_namespaced_pod_portforward,
                name=pod_name,
                namespace=namespace,
                ports=str(pod_port)
            )
            pod_sock = pf.socket(pod_port)
            pod_sock.setblocking(True)

            while True:
                readable, _, _ = select.select([client_sock, pod_sock], [], [])
                for src in readable:
                    dst = pod_sock if src is client_sock else client_sock
                    data = src.recv(4096)
                    if not data:
                        # Either side closed: tear the pair down
                        return
                    dst.sendall(data)
        except Exception as e:
            print(f"Port forwarding error: {e}")
        finally:
            client_sock.close()
            if pf is not None:
                pf.close()

    # Start port forwarding in background thread
    def port_forward_thread():
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            listener.bind(("127.0.0.1", local_port))
            listener.listen(5)

            print(f"Port forwarding active: localhost:{local_port} -> {pod_name}:{pod_port}")
            listening.set()

            # One relay thread per local connection
            while True:
                client_sock, _ = listener.accept()
                threading.Thread(
                    target=handle_client,
                    args=(client_sock,),
                    daemon=True
                ).start()

        except Exception as e:
            print(f"Port forwarding error: {e}")
        finally:
            listener.close()
            # Unblock the caller even when the listener failed to start
            listening.set()

    # Start port forwarding
    pf_thread = threading.Thread(target=port_forward_thread, daemon=True)
    pf_thread.start()

    # Wait until the listener is accepting connections (or has failed),
    # instead of sleeping for a fixed time
    listening.wait(timeout=2)

    return pf_thread

# Example: Forward local port 8080 to pod port 80
pf_thread = create_port_forward("nginx-pod", "default", 8080, 80)

# Use the forwarded connection
try:
    response = requests.get("http://localhost:8080")
    print(f"Response: {response.status_code}")
    print(response.text[:200])
except requests.RequestException as e:
    print(f"Request failed: {e}")

Copying Files To/From Pods

from kubernetes import client, config
from kubernetes.stream import stream
import os
import tarfile
import tempfile

config.load_kube_config()
v1 = client.CoreV1Api()

def copy_file_to_pod(pod_name, namespace, local_file, pod_path, container=None):
    """Copy file from local system to pod.

    Packs local_file into an in-memory tar archive and pipes it to
    `tar xf -` running inside the pod, so the container image must provide
    a tar binary.

    Parameters:
    - pod_name: Name of the target pod
    - namespace: Namespace of the target pod
    - local_file: Path of the local file to send
    - pod_path: Destination file path inside the pod
    - container: Container name, or None to let the API server choose
    """
    # Function-scope imports: only this example needs them
    import io
    import posixpath

    # Pod paths are POSIX paths regardless of the local operating system.
    # An empty dirname (bare file name) means the container's working dir.
    pod_dir = posixpath.dirname(pod_path) or '.'
    # Name the archive member after the destination so the file lands at
    # pod_path even when its basename differs from the local one.
    member_name = posixpath.basename(pod_path) or os.path.basename(local_file)

    # Create tar archive of the file
    tar_buffer = io.BytesIO()
    with tarfile.open(fileobj=tar_buffer, mode='w') as tar:
        tar.add(local_file, arcname=member_name)
    tar_data = tar_buffer.getvalue()

    # Execute tar command to extract file in pod
    exec_command = ['tar', 'xf', '-', '-C', pod_dir]

    resp = stream(
        v1.connect_get_namespaced_pod_exec,
        name=pod_name,
        namespace=namespace,
        container=container,
        command=exec_command,
        stderr=True,
        stdin=True,
        stdout=True,
        tty=False,
        _preload_content=False
    )

    errors = []
    try:
        # Send tar data to pod
        # NOTE(review): writing bytes to stdin presumably needs a client
        # version whose write_stdin accepts binary data -- confirm.
        resp.write_stdin(tar_data)

        # Wait for tar to finish before closing; closing right after the
        # write can cut the extraction short. tar exits once it has read the
        # end-of-archive marker, which closes the stream.
        while resp.is_open():
            resp.update(timeout=1)
            if resp.peek_stdout():
                resp.read_stdout()
            if resp.peek_stderr():
                errors.append(resp.read_stderr())
    finally:
        resp.close()

    if errors:
        print(f"Copy to {pod_name}:{pod_path} reported errors: {''.join(errors)}")
    else:
        print(f"File copied to {pod_name}:{pod_path}")

def copy_file_from_pod(pod_name, namespace, pod_path, local_file, container=None):
    """Copy file from pod to local system.

    Runs `tar cf -` inside the pod, collects the archive from stdout and
    writes the single archived file to local_file. The container image must
    provide a tar binary.

    Parameters:
    - pod_name: Name of the source pod
    - namespace: Namespace of the source pod
    - pod_path: Path of the file inside the pod
    - local_file: Local destination file path
    - container: Container name, or None to let the API server choose

    Raises:
    - tarfile.ReadError: if no valid archive was received
    - KeyError: if the archive does not contain the requested file
    - tarfile.ExtractError: if pod_path is not a regular file
    """
    # Function-scope imports: only this example needs them
    import io
    import posixpath
    import shutil

    # Archive relative to the file's directory so the member is stored under
    # its bare name instead of the full pod path.
    pod_dir = posixpath.dirname(pod_path) or '.'
    member_name = posixpath.basename(pod_path)

    # Create tar archive of the file in pod
    exec_command = ['tar', 'cf', '-', '-C', pod_dir, member_name]

    resp = stream(
        v1.connect_get_namespaced_pod_exec,
        name=pod_name,
        namespace=namespace,
        container=container,
        command=exec_command,
        stderr=True,
        stdin=False,
        stdout=True,
        tty=False,
        _preload_content=False,
        # Tar output is binary; without this the client decodes frames to
        # text, which corrupts the archive.
        # NOTE(review): requires a client version whose stream() accepts
        # binary=True -- confirm against the installed version.
        binary=True
    )

    # Read tar data
    chunks = []
    try:
        while resp.is_open():
            resp.update(timeout=1)
            if resp.peek_stdout():
                chunks.append(resp.read_stdout())
            if resp.peek_stderr():
                # Drain stderr so diagnostics do not pile up unread
                resp.read_stderr()
        # Pick up anything still buffered after the stream closed
        if resp.peek_stdout():
            chunks.append(resp.read_stdout())
    finally:
        resp.close()

    tar_data = b''.join(chunks)

    # Extract exactly the requested member and write it to local_file.
    # extractall() would recreate archive paths under the target directory
    # and trusts member names supplied by the remote side.
    with tarfile.open(fileobj=io.BytesIO(tar_data), mode='r') as tar:
        source = tar.extractfile(member_name)
        if source is None:
            raise tarfile.ExtractError(f"{pod_path} is not a regular file")
        with source, open(local_file, 'wb') as target:
            shutil.copyfileobj(source, target)

    print(f"File copied from {pod_name}:{pod_path} to {local_file}")

# Example usage
copy_file_to_pod("my-pod", "default", "/local/file.txt", "/tmp/file.txt")
copy_file_from_pod("my-pod", "default", "/tmp/output.txt", "/local/output.txt")

Monitoring Pod Resource Usage

from kubernetes import client, config
from kubernetes.stream import stream
import json
import time

config.load_kube_config()
v1 = client.CoreV1Api()

def monitor_pod_resources(pod_name, namespace="default", container=None):
    """Monitor pod resource usage by polling /proc inside the container.

    Runs a shell loop in the container that prints a timestamp, the
    MemTotal/MemAvailable lines of /proc/meminfo and /proc/loadavg every
    five seconds, and echoes that output locally until the stream closes or
    the user presses Ctrl-C.

    Parameters:
    - pod_name: Name of the pod to monitor
    - namespace: Namespace of the pod
    - container: Container name, or None to let the API server choose
    """

    # Use metrics from /proc or system commands
    exec_command = [
        'sh', '-c',
        'while true; do '
        'echo "=== $(date) ==="; '
        'cat /proc/meminfo | grep -E "MemTotal|MemAvailable"; '
        'cat /proc/loadavg; '
        'sleep 5; '
        'done'
    ]

    resp = stream(
        v1.connect_get_namespaced_pod_exec,
        name=pod_name,
        namespace=namespace,
        container=container,
        command=exec_command,
        stderr=True,
        stdin=False,
        stdout=True,
        tty=False,
        _preload_content=False
    )

    print(f"Monitoring resources for {pod_name}")

    try:
        while resp.is_open():
            # update() already waits up to one second for new data, so no
            # additional sleep is needed between iterations
            resp.update(timeout=1)
            if resp.peek_stdout():
                # The stream delivers decoded text (as in the interactive
                # shell example), so it is printed without .decode()
                print(resp.read_stdout(), end='')
            if resp.peek_stderr():
                print(resp.read_stderr(), end='')

    except KeyboardInterrupt:
        print("\nMonitoring stopped")
    finally:
        # Close on every exit path, not only on Ctrl-C
        resp.close()

# Monitor pod resources
monitor_pod_resources("my-pod", "default")

Debug Pod with Ephemeral Container

from kubernetes import client, config
from kubernetes.stream import stream

config.load_kube_config()
v1 = client.CoreV1Api()

def debug_pod_with_ephemeral(pod_name, namespace="default"):
    """Debug pod using ephemeral container (Kubernetes 1.23+).

    Adds a busybox ephemeral container named "debugger" that targets the
    pod's first container, waits for it to start, then connects to it and
    echoes its output until the stream closes or the user presses Ctrl-C.
    As written, the session only displays output; it does not forward local
    keystrokes.

    Parameters:
    - pod_name: Name of the pod to debug
    - namespace: Namespace of the pod
    """
    # Function-scope import: only the start-up wait needs it
    import time

    # Get existing pod
    pod = v1.read_namespaced_pod(name=pod_name, namespace=namespace)

    # Add ephemeral container for debugging
    ephemeral_container = {
        "name": "debugger",
        "image": "busybox:latest",
        "command": ["/bin/sh"],
        "stdin": True,
        "tty": True,
        "targetContainerName": pod.spec.containers[0].name
    }

    # Ephemeral containers can only be changed through the pod's
    # ephemeralcontainers subresource; a regular pod patch is rejected by
    # the API server. Only the new entry is sent, to be merged by name.
    patch_body = {"spec": {"ephemeralContainers": [ephemeral_container]}}
    v1.patch_namespaced_pod_ephemeralcontainers(
        name=pod_name,
        namespace=namespace,
        body=patch_body
    )

    print(f"Ephemeral container added to {pod_name}")

    # Wait for the debug container to start; exec fails while its image is
    # still being pulled.
    deadline = time.monotonic() + 60
    while True:
        pod = v1.read_namespaced_pod(name=pod_name, namespace=namespace)
        statuses = pod.status.ephemeral_container_statuses or []
        if any(
            status.name == "debugger" and status.state and status.state.running
            for status in statuses
        ):
            break
        if time.monotonic() >= deadline:
            print("Debug container did not start in time")
            return
        time.sleep(1)

    # Connect to the ephemeral container
    resp = stream(
        v1.connect_get_namespaced_pod_exec,
        name=pod_name,
        namespace=namespace,
        container="debugger",
        command=["/bin/sh"],
        stderr=True,
        stdin=True,
        stdout=True,
        tty=True,
        _preload_content=False
    )

    print("Connected to debug container. Type 'exit' to quit.")

    # Interactive session with debug container
    try:
        while resp.is_open():
            resp.update(timeout=1)

            # The stream delivers decoded text (as in the interactive shell
            # example), so it is printed without .decode()
            if resp.peek_stdout():
                print(resp.read_stdout(), end='', flush=True)
            if resp.peek_stderr():
                print(resp.read_stderr(), end='', flush=True)

    except KeyboardInterrupt:
        print("\nDebug session ended")
    finally:
        # Close on every exit path, not only on Ctrl-C
        resp.close()

# Debug pod with ephemeral container
debug_pod_with_ephemeral("problematic-pod", "default")

Install with Tessl CLI

npx tessl i tessl/pypi-kubernetes

docs

application-workloads.md

autoscaling.md

configuration.md

core-resources.md

custom-resources.md

dynamic-client.md

index.md

leader-election.md

networking.md

rbac-security.md

resource-watching.md

storage.md

streaming-operations.md

utilities.md

tile.json