CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-kubernetes-asyncio

Asynchronous Python client library for the Kubernetes API providing async/await support for all Kubernetes operations

Pending
Overview
Eval results
Files

streaming.mddocs/

Streaming

WebSocket-based streaming for exec sessions, port forwarding, and attach operations. Provides real-time bidirectional communication with containers running in Kubernetes pods, enabling interactive terminal sessions and data streaming.

Capabilities

WebSocket API Client

Enhanced API client with WebSocket support for streaming operations.

class WsApiClient(ApiClient):
    def __init__(self, configuration=None, header_name=None, header_value=None, 
                 cookie=None, pool_threads=1, heartbeat=None):
        """
        WebSocket-enabled API client for streaming operations.
        
        Parameters:
        - configuration: Configuration, client configuration
        - header_name: str, custom header name for authentication
        - header_value: str, custom header value for authentication
        - cookie: str, cookie string for authentication
        - pool_threads: int, number of threads for connection pooling
        - heartbeat: int, WebSocket heartbeat interval in seconds
        """
    
    async def request(self, method, url, query_params=None, headers=None, body=None, 
                     post_params=None, _preload_content=True, _request_timeout=None):
        """
        Make WebSocket request for streaming operations.
        
        Parameters:
        - method: str, HTTP method (GET for WebSocket upgrade)
        - url: str, WebSocket URL
        - query_params: dict, query parameters for WebSocket connection
        - headers: dict, additional headers
        - body: bytes, initial data to send
        - post_params: dict, POST parameters (unused for WebSocket)
        - _preload_content: bool, whether to preload response content
        - _request_timeout: int, request timeout in seconds
        
        Returns:
        - WsResponse: WebSocket response wrapper
        """
    
    def parse_error_data(self, data):
        """
        Parse error channel data from WebSocket stream.
        
        Parameters:
        - data: bytes, raw error data from ERROR_CHANNEL
        
        Returns:
        - dict: Parsed error information
        """

WebSocket Response Handling

Response wrapper for WebSocket streaming operations.

class WsResponse:
    def __init__(self, websocket):
        """
        WebSocket response wrapper.
        
        Parameters:
        - websocket: WebSocket connection object
        """
    
    async def read_channel(self, timeout=None):
        """
        Read data from WebSocket with channel information.
        
        Parameters:
        - timeout: int, read timeout in seconds
        
        Returns:
        - tuple: (channel_number, data) where channel indicates data type
        """
    
    async def write_channel(self, channel, data):
        """
        Write data to specific WebSocket channel.
        
        Parameters:
        - channel: int, target channel number
        - data: bytes, data to write
        """
    
    async def close(self):
        """Close WebSocket connection."""

Channel Constants

WebSocket channel identifiers for different data streams in exec/attach operations.

# Standard I/O channels
STDIN_CHANNEL: int = 0   # Standard input to container
STDOUT_CHANNEL: int = 1  # Standard output from container  
STDERR_CHANNEL: int = 2  # Standard error from container
ERROR_CHANNEL: int = 3   # Error information and status
RESIZE_CHANNEL: int = 4  # Terminal resize events

Utility Functions

Helper functions for WebSocket URL handling and connection setup.

def get_websocket_url(url):
    """
    Convert HTTP/HTTPS URL to WebSocket URL.
    
    Parameters:
    - url: str, HTTP or HTTPS URL
    
    Returns:
    - str: Corresponding WebSocket URL (ws:// or wss://)
    """

Usage Examples

Container Exec Session

import asyncio
from kubernetes_asyncio import client, config, stream

async def exec_in_pod():
    await config.load_config()
    
    # Create WebSocket-enabled client
    ws_client = stream.WsApiClient()
    v1 = client.CoreV1Api(ws_client)
    
    try:
        # Execute command in pod
        exec_command = ['/bin/sh', '-c', 'echo "Hello from container"; ls -la']
        
        response = await v1.connect_get_namespaced_pod_exec(
            name="my-pod",
            namespace="default",
            command=exec_command,
            stderr=True,
            stdin=False,
            stdout=True,
            tty=False
        )
        
        # Read output from exec session
        while True:
            try:
                channel, data = await response.read_channel(timeout=10)
                
                if channel == stream.STDOUT_CHANNEL:
                    print(f"STDOUT: {data.decode('utf-8')}", end="")
                elif channel == stream.STDERR_CHANNEL:
                    print(f"STDERR: {data.decode('utf-8')}", end="")
                elif channel == stream.ERROR_CHANNEL:
                    error_info = ws_client.parse_error_data(data)
                    if error_info.get('status') == 'Success':
                        print("Command completed successfully")
                        break
                    else:
                        print(f"Error: {error_info}")
                        break
                        
            except asyncio.TimeoutError:
                print("Exec session timed out")
                break
                
    finally:
        await response.close()
        await ws_client.close()

asyncio.run(exec_in_pod())

Interactive Terminal Session

import sys
import select
import termios
import tty
from kubernetes_asyncio import client, config, stream

async def interactive_shell():
    await config.load_config()
    
    ws_client = stream.WsApiClient()
    v1 = client.CoreV1Api(ws_client)
    
    try:
        # Start interactive shell
        response = await v1.connect_get_namespaced_pod_exec(
            name="my-pod",
            namespace="default", 
            command=['/bin/bash'],
            stderr=True,
            stdin=True,
            stdout=True,
            tty=True
        )
        
        # Set terminal to raw mode for interactive session
        old_settings = termios.tcgetattr(sys.stdin)
        tty.setraw(sys.stdin.fileno())
        
        try:
            # Handle bidirectional communication
            async def read_output():
                while True:
                    try:
                        channel, data = await response.read_channel(timeout=0.1)
                        
                        if channel == stream.STDOUT_CHANNEL:
                            sys.stdout.write(data.decode('utf-8'))
                            sys.stdout.flush()
                        elif channel == stream.STDERR_CHANNEL:
                            sys.stderr.write(data.decode('utf-8'))
                            sys.stderr.flush()
                        elif channel == stream.ERROR_CHANNEL:
                            error_info = ws_client.parse_error_data(data)
                            if error_info.get('status') != 'Success':
                                print(f"\nError: {error_info}")
                            break
                            
                    except asyncio.TimeoutError:
                        continue
            
            async def send_input():
                while True:
                    # Check for input without blocking
                    if select.select([sys.stdin], [], [], 0.1)[0]:
                        char = sys.stdin.read(1)
                        if char:
                            await response.write_channel(stream.STDIN_CHANNEL, char.encode('utf-8'))
                    await asyncio.sleep(0.01)
            
            # Run both input and output handlers concurrently
            await asyncio.gather(read_output(), send_input())
            
        finally:
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
            
    finally:
        await response.close()
        await ws_client.close()

# asyncio.run(interactive_shell())  # Uncomment to run

Container Attach

async def attach_to_pod():
    await config.load_config()
    
    ws_client = stream.WsApiClient()
    v1 = client.CoreV1Api(ws_client)
    
    try:
        # Attach to running container
        response = await v1.connect_get_namespaced_pod_attach(
            name="my-pod",
            namespace="default",
            container="my-container",  # Optional: specify container
            stderr=True,
            stdin=True,
            stdout=True,
            tty=True
        )
        
        # Send initial command
        command = "/bin/bash\n"
        await response.write_channel(stream.STDIN_CHANNEL, command.encode('utf-8'))
        
        # Read responses
        timeout_count = 0
        while timeout_count < 5:  # Exit after 5 timeouts
            try:
                channel, data = await response.read_channel(timeout=2)
                
                if channel == stream.STDOUT_CHANNEL:
                    print(f"Output: {data.decode('utf-8')}", end="")
                    timeout_count = 0  # Reset timeout counter
                elif channel == stream.STDERR_CHANNEL:
                    print(f"Error: {data.decode('utf-8')}", end="")
                    timeout_count = 0
                elif channel == stream.ERROR_CHANNEL:
                    error_info = ws_client.parse_error_data(data)
                    print(f"Status: {error_info}")
                    break
                    
            except asyncio.TimeoutError:
                timeout_count += 1
                print("No output received...")
                
    finally:
        await response.close()
        await ws_client.close()

asyncio.run(attach_to_pod())

Port Forwarding

async def port_forward():
    await config.load_config()
    
    ws_client = stream.WsApiClient()
    v1 = client.CoreV1Api(ws_client)
    
    try:
        # Set up port forwarding
        response = await v1.connect_get_namespaced_pod_portforward(
            name="my-pod",
            namespace="default",
            ports="8080"  # Forward port 8080
        )
        
        print("Port forwarding established on port 8080")
        
        # Handle port forwarding data
        while True:
            try:
                channel, data = await response.read_channel(timeout=30)
                
                if channel == stream.STDOUT_CHANNEL:
                    # Handle forwarded data from port 8080
                    print(f"Received {len(data)} bytes from port 8080")
                    # Process or forward data as needed
                elif channel == stream.ERROR_CHANNEL:
                    error_info = ws_client.parse_error_data(data)
                    if error_info.get('status') != 'Success':
                        print(f"Port forward error: {error_info}")
                        break
                        
            except asyncio.TimeoutError:
                print("Port forwarding timeout - connection may be idle")
                continue
                
    finally:
        await response.close()
        await ws_client.close()

# asyncio.run(port_forward())  # Uncomment to run

Terminal Resize Handling

import signal
import struct
import fcntl
import termios

async def exec_with_resize():
    await config.load_config()
    
    ws_client = stream.WsApiClient()
    v1 = client.CoreV1Api(ws_client)
    
    try:
        response = await v1.connect_get_namespaced_pod_exec(
            name="my-pod",
            namespace="default",
            command=['/bin/bash'],
            stderr=True,
            stdin=True,
            stdout=True,
            tty=True
        )
        
        # Get initial terminal size
        def get_terminal_size():
            s = struct.pack('HHHH', 0, 0, 0, 0)
            x = fcntl.ioctl(sys.stdout.fileno(), termios.TIOCGWINSZ, s)
            return struct.unpack('HHHH', x)[:2]  # rows, cols
        
        # Send initial terminal size
        rows, cols = get_terminal_size()
        resize_data = f'{{"Width":{cols},"Height":{rows}}}'
        await response.write_channel(stream.RESIZE_CHANNEL, resize_data.encode('utf-8'))
        
        # Handle terminal resize signals
        def handle_resize(signum, frame):
            rows, cols = get_terminal_size()
            resize_data = f'{{"Width":{cols},"Height":{rows}}}'
            
            # Note: In real implementation, you'd need to queue this
            # for the async event loop to process
            print(f"Terminal resized to {cols}x{rows}")
        
        signal.signal(signal.SIGWINCH, handle_resize)
        
        # Continue with normal exec session handling...
        # (Similar to interactive_shell example above)
        
    finally:
        await response.close()
        await ws_client.close()

# asyncio.run(exec_with_resize())  # Uncomment to run

File Upload via Exec

import base64

async def upload_file_to_pod():
    await config.load_config()
    
    ws_client = stream.WsApiClient()
    v1 = client.CoreV1Api(ws_client)
    
    try:
        # Read local file
        with open("/local/path/to/file.txt", "rb") as f:
            file_content = f.read()
        
        # Base64 encode for safe transmission
        encoded_content = base64.b64encode(file_content).decode('utf-8')
        
        # Create command to decode and write file in pod
        command = [
            '/bin/sh', '-c',
            f'echo "{encoded_content}" | base64 -d > /remote/path/to/file.txt && echo "File uploaded successfully"'
        ]
        
        response = await v1.connect_get_namespaced_pod_exec(
            name="my-pod",
            namespace="default",
            command=command,
            stderr=True,
            stdin=False,
            stdout=True,
            tty=False
        )
        
        # Monitor upload progress
        while True:
            try:
                channel, data = await response.read_channel(timeout=30)
                
                if channel == stream.STDOUT_CHANNEL:
                    output = data.decode('utf-8')
                    print(f"Output: {output}")
                elif channel == stream.STDERR_CHANNEL:
                    error = data.decode('utf-8')
                    print(f"Error: {error}")
                elif channel == stream.ERROR_CHANNEL:
                    error_info = ws_client.parse_error_data(data)
                    if error_info.get('status') == 'Success':
                        print("File upload completed")
                        break
                    else:
                        print(f"Upload failed: {error_info}")
                        break
                        
            except asyncio.TimeoutError:
                print("Upload timeout")
                break
                
    finally:
        await response.close()
        await ws_client.close()

asyncio.run(upload_file_to_pod())

Install with Tessl CLI

npx tessl i tessl/pypi-kubernetes-asyncio

docs

client-apis.md

configuration.md

dynamic-client.md

index.md

leader-election.md

streaming.md

utils.md

watch.md

tile.json