
tessl/pypi-dockerpty

Python library to use the pseudo-tty of a docker container


Stream Management

Classes for handling I/O streams, multiplexing/demultiplexing, and data pumping between file descriptors. This module provides the foundation for dockerpty's non-blocking I/O operations and stream management.

Capabilities

Stream Class

A generic file-like abstraction on top of os.read() and os.write() that makes reading from sockets and files behave consistently.

class Stream:
    ERRNO_RECOVERABLE = [errno.EINTR, errno.EDEADLK, errno.EWOULDBLOCK]
    
    def __init__(self, fd):
        """
        Initialize the Stream for the file descriptor fd.
        
        The fd object must have a fileno() method.
        
        Parameters:
        - fd: file-like object with fileno() method
        """
    
    def fileno(self):
        """
        Return the fileno() of the file descriptor.
        
        Returns:
        int - file descriptor number
        """
    
    def set_blocking(self, value):
        """
        Set the stream to blocking or non-blocking mode.
        
        Parameters:
        - value: bool, True for blocking, False for non-blocking
        
        Returns:
        bool - previous blocking state
        """
    
    def read(self, n=4096):
        """
        Return up to n bytes of data from the Stream, or None at end of stream.
        
        Parameters:
        - n: int, maximum number of bytes to read (default: 4096)
        
        Returns:
        bytes - data read from stream, or None at EOF
        """
    
    def write(self, data):
        """
        Write data to the Stream. Not all data may be written right away.
        Use select to find when the stream is writeable, and call do_write()
        to flush the internal buffer.
        
        Parameters:
        - data: bytes, data to write
        
        Returns:
        int - length of data, or None if data is empty
        """
    
    def do_write(self):
        """
        Flushes as much pending data from the internal write buffer as possible.
        
        Returns:
        int - number of bytes written
        """
    
    def needs_write(self):
        """
        Returns True if the stream has data waiting to be written.
        
        Returns:
        bool - True if write buffer has pending data
        """
    
    def close(self):
        """
        Close the stream.
        
        The fd is not closed immediately if there's pending write data.
        
        Returns:
        None
        """

Usage example:

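import socket
from dockerpty.io import Stream

# Wrap one end of a connected socket pair in a Stream
# (an unconnected socket would raise on the first read)
local, remote = socket.socketpair()
stream = Stream(local)

# Write data (may buffer)
stream.write(b'hello world')
if stream.needs_write():
    stream.do_write()  # Flush whatever is still buffered

# Read data sent by the peer
remote.sendall(b'reply')
data = stream.read(1024)

# Clean up
stream.close()
remote.close()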

Demuxer Class

Wraps a multiplexed Stream to read demultiplexed data. When no PTY is attached, Docker multiplexes stdout and stderr onto a single connection by prefixing each chunk of data with an 8-byte header.

class Demuxer:
    def __init__(self, stream):
        """
        Initialize a new Demuxer reading from stream.
        
        Parameters:
        - stream: Stream instance to demultiplex
        """
    
    def fileno(self):
        """
        Returns the fileno() of the underlying Stream.
        
        This is useful for select() to work.
        
        Returns:
        int - file descriptor number
        """
    
    def set_blocking(self, value):
        """
        Set blocking mode on underlying stream.
        
        Parameters:
        - value: bool, True for blocking, False for non-blocking
        
        Returns:
        bool - previous blocking state
        """
    
    def read(self, n=4096):
        """
        Read up to n bytes of data from the Stream, after demuxing.
        
        Less than n bytes may be returned depending on available payload,
        but never exceeds n. Because demuxing involves scanning 8-byte headers,
        the actual data read from underlying stream may be greater than n.
        
        Parameters:
        - n: int, maximum bytes to return (default: 4096)
        
        Returns:
        bytes - demultiplexed data, or None at EOF
        """
    
    def write(self, data):
        """
        Delegates to the underlying Stream.
        
        Parameters:
        - data: bytes, data to write
        
        Returns:
        Result from underlying stream write
        """
    
    def needs_write(self):
        """
        Delegates to underlying Stream.
        
        Returns:
        bool - True if underlying stream needs write
        """
    
    def do_write(self):
        """
        Delegates to underlying Stream.
        
        Returns:
        Result from underlying stream do_write
        """
    
    def close(self):
        """
        Delegates to underlying Stream.
        
        Returns:
        Result from underlying stream close
        """

Usage example:

from dockerpty.io import Stream, Demuxer

# socket_from_docker is a placeholder for the attach socket of a container
# started without a TTY, so its output arrives multiplexed
docker_stream = Stream(socket_from_docker)
demuxer = Demuxer(docker_stream)

# Read demultiplexed data
data = demuxer.read(1024)  # Payload only, headers stripped; None at EOF
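
The "never exceeds n" behaviour described for read() can be illustrated with a small standard-library sketch. This is not dockerpty's Demuxer: the DemuxReader class, the frame() helper, and the in-memory io.BytesIO source are hypothetical, and the header layout (stream id in the first byte, big-endian 32-bit length in the last four) is an assumption taken from Docker's attach protocol.

```python
import io
import struct

HEADER_LEN = 8


class DemuxReader:
    """Hypothetical sketch of a bounded demultiplexing reader."""

    def __init__(self, raw):
        self.raw = raw      # any object whose read(n) returns bytes
        self.remain = 0     # payload bytes left in the current frame

    def read(self, n=4096):
        # Consume headers until a frame with a non-empty payload starts.
        while self.remain == 0:
            header = self.raw.read(HEADER_LEN)
            if len(header) < HEADER_LEN:
                return None  # EOF (or a truncated header)
            _stream_id, self.remain = struct.unpack('>BxxxL', header)
        # Never hand back more than n bytes, nor more than the frame holds.
        data = self.raw.read(min(n, self.remain))
        self.remain -= len(data)
        return data


def frame(stream_id, payload):
    """Build one multiplexed frame for the demonstration."""
    return struct.pack('>BxxxL', stream_id, len(payload)) + payload


source = io.BytesIO(frame(1, b'hello world') + frame(2, b'!'))
reader = DemuxReader(source)
chunks = [reader.read(5), reader.read(100), reader.read(100), reader.read(100)]
```

The first call returns only 5 bytes even though the frame holds 11, and a single call never crosses a frame boundary.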

Pump Class

A stream pump that reads from one stream and writes to another, like a manually managed pipe. It is used to pipe data between TTY file descriptors and container PTY descriptors.

class Pump:
    def __init__(self, from_stream, to_stream, wait_for_output=True, propagate_close=True):
        """
        Initialize a Pump with a Stream to read from and another to write to.
        
        Parameters:
        - from_stream: Stream, source stream to read from
        - to_stream: Stream, destination stream to write to
        - wait_for_output: bool, wait for EOF on from_stream to consider pump done (default: True)
        - propagate_close: bool, close to_stream when from_stream reaches EOF (default: True)
        """
    
    def fileno(self):
        """
        Returns the fileno() of the reader end of the Pump.
        
        This is useful to allow Pumps to function with select().
        
        Returns:
        int - file descriptor number of from_stream
        """
    
    def set_blocking(self, value):
        """
        Set blocking mode on the from_stream.
        
        Parameters:
        - value: bool, True for blocking, False for non-blocking
        
        Returns:
        bool - previous blocking state
        """
    
    def flush(self, n=4096):
        """
        Flush up to n bytes of data from the reader Stream to the writer Stream.
        
        Parameters:
        - n: int, maximum bytes to flush (default: 4096)
        
        Returns:
        int - number of bytes actually flushed, or None if EOF reached
        """
    
    def is_done(self):
        """
        Returns True if the read stream is done (EOF or wait_for_output=False)
        and the write side has no pending bytes to send.
        
        Returns:
        bool - True if pump is completely finished
        """

Usage example:

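import select
import sys
from dockerpty.io import Stream, Pump

# Create streams (container_socket is a placeholder for the container's attach socket)
stdin_stream = Stream(sys.stdin)
container_stream = Stream(container_socket)

# Create pump to send stdin to container. The default wait_for_output=True keeps
# is_done() False until stdin reaches EOF. With wait_for_output=False, is_done()
# is True whenever no bytes are pending, so this loop would exit immediately.
pump = Pump(stdin_stream, container_stream)

# Use with select loop
while not pump.is_done():
    ready, _, _ = select.select([pump], [], [], 1.0)
    if ready:
        pump.flush()
    if container_stream.needs_write():
        container_stream.do_write()  # Retry any bytes still buffered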
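
To make the flush(), is_done(), and propagate_close semantics concrete, here is a rough standard-library sketch that pumps between two OS pipes. PipePump is a hypothetical stand-in working on raw, blocking file descriptors; it is not dockerpty's Pump and it omits write buffering and wait_for_output.

```python
import os


class PipePump:
    """Hypothetical stand-in for a pump between two raw file descriptors."""

    def __init__(self, from_fd, to_fd, propagate_close=True):
        self.from_fd = from_fd
        self.to_fd = to_fd
        self.propagate_close = propagate_close
        self.eof = False

    def flush(self, n=4096):
        data = os.read(self.from_fd, n)
        if not data:
            # EOF on the reader: optionally close the writer as well.
            self.eof = True
            if self.propagate_close:
                os.close(self.to_fd)
            return None
        return os.write(self.to_fd, data)

    def is_done(self):
        return self.eof


src_r, src_w = os.pipe()
dst_r, dst_w = os.pipe()

os.write(src_w, b'ping')
os.close(src_w)  # the source now reports EOF after 'ping'

pump = PipePump(src_r, dst_w)
flushed = []
while not pump.is_done():
    flushed.append(pump.flush())

delivered = os.read(dst_r, 100)
after_close = os.read(dst_r, 100)  # empty: the close was propagated

os.close(src_r)
os.close(dst_r)
```

The second flush() returns None at EOF, which is the signal a select loop would use to stop watching the pump.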

Utility Functions

Helper functions for stream and I/O management.

def set_blocking(fd, blocking=True):
    """
    Set the given file-descriptor blocking or non-blocking.
    
    Parameters:
    - fd: file descriptor or file-like object
    - blocking: bool, True for blocking, False for non-blocking (default: True)
    
    Returns:
    bool - original blocking status
    """

def select(read_streams, write_streams, timeout=0):
    """
    Select the streams ready for reading, and streams ready for writing.
    
    Uses select.select() internally but only returns two lists of ready streams.
    If the call is interrupted by a signal (EINTR), two empty lists are returned
    instead of raising.
    
    Parameters:
    - read_streams: list of streams to check for read readiness
    - write_streams: list of streams to check for write readiness  
    - timeout: float, timeout in seconds (default: 0)
    
    Returns:
    tuple - (ready_read_streams, ready_write_streams)
    """

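The two helpers above can be approximated with the standard library alone. The sketch below is an illustration under assumptions, not dockerpty's source: the names set_blocking_sketch and select_sketch are hypothetical, it works on raw integer descriptors, and it is Unix-only because it relies on fcntl. On Python 3.5 and later, select.select() normally retries after EINTR by itself, so the except branch is mostly relevant to older interpreters.

```python
import errno
import fcntl
import os
import select


def set_blocking_sketch(fd, blocking=True):
    """Toggle O_NONBLOCK on fd and return whether it was blocking before."""
    old_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    if blocking:
        new_flags = old_flags & ~os.O_NONBLOCK
    else:
        new_flags = old_flags | os.O_NONBLOCK
    fcntl.fcntl(fd, fcntl.F_SETFL, new_flags)
    return not (old_flags & os.O_NONBLOCK)


def select_sketch(read_streams, write_streams, timeout=0):
    """Return (readable, writable); treat an interrupted call as 'nothing ready'."""
    try:
        readable, writable, _ = select.select(read_streams, write_streams, [], timeout)
        return readable, writable
    except OSError as e:
        if e.errno == errno.EINTR:
            return [], []
        raise


r, w = os.pipe()
was_blocking = set_blocking_sketch(r, blocking=False)
before, _ = select_sketch([r], [], timeout=0)   # nothing written yet
os.write(w, b'x')
after, _ = select_sketch([r], [], timeout=1.0)  # one byte is waiting
os.close(r)
os.close(w)
```
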
Docker Stream Multiplexing

Docker multiplexes stdout and stderr streams when no PTY is allocated using this format:

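  1. 8-byte header: the first byte identifies the stream (0x00=stdin, 0x01=stdout, 0x02=stderr) and the next 3 bytes are zero padding; the last 4 bytes give the payload length as a big-endian unsigned 32-bit integer
  2. Data payload: exactly the number of bytes specified in the header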

The Demuxer class handles this protocol transparently, allowing you to read clean stream data without dealing with the multiplexing headers.
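
As a rough illustration of the framing, the following sketch splits a fully buffered byte string into (stream id, payload) pairs. It assumes the stream id occupies the first byte of the header and the length is a big-endian unsigned 32-bit integer, as in Docker's attach protocol. The helper names mux_frame and demux_frames are hypothetical and are not part of dockerpty.

```python
import struct

HEADER_LEN = 8  # 1 byte stream id, 3 bytes padding, 4 bytes big-endian length
STDOUT, STDERR = 1, 2


def mux_frame(stream_id, payload):
    """Build one frame the way a multiplexing sender would."""
    return struct.pack('>BxxxL', stream_id, len(payload)) + payload


def demux_frames(raw):
    """Split buffered multiplexed bytes into (stream_id, payload) pairs."""
    frames = []
    offset = 0
    while offset + HEADER_LEN <= len(raw):
        stream_id, size = struct.unpack_from('>BxxxL', raw, offset)
        start = offset + HEADER_LEN
        payload = raw[start:start + size]
        if len(payload) < size:
            break  # incomplete trailing frame; a real reader would wait for more
        frames.append((stream_id, payload))
        offset = start + size
    return frames


raw = mux_frame(STDOUT, b'out\n') + mux_frame(STDERR, b'err\n')
frames = demux_frames(raw)
partial = demux_frames(raw[:10])  # header plus only 2 of 4 payload bytes
```

Unlike this sketch, the Demuxer documented above returns only the payload bytes, so the stream id is not exposed to the caller.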

Error Handling

Recoverable Errors

The Stream class defines ERRNO_RECOVERABLE, a list of errno values after which an operation should be retried rather than treated as fatal:

  • errno.EINTR: Interrupted system call
  • errno.EDEADLK: Resource deadlock avoided
  • errno.EWOULDBLOCK: Operation would block
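
A minimal sketch of the retry idea, assuming a caller-supplied operation: errors whose errno is in the recoverable set are retried, anything else is re-raised. The retry_recoverable helper and its max_attempts cap are hypothetical additions for illustration and are not taken from dockerpty.

```python
import errno

RECOVERABLE = (errno.EINTR, errno.EDEADLK, errno.EWOULDBLOCK)


def retry_recoverable(operation, max_attempts=5):
    """Call operation(), retrying while it fails with a recoverable errno."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except OSError as e:
            # Give up on unrecoverable errors, or once attempts are exhausted.
            if e.errno not in RECOVERABLE or attempt == max_attempts - 1:
                raise


# Demonstration with a fake read that is interrupted twice before succeeding.
calls = {'count': 0}


def flaky_read():
    calls['count'] += 1
    if calls['count'] < 3:
        raise OSError(errno.EINTR, 'Interrupted system call')
    return b'data'


result = retry_recoverable(flaky_read)
```

Retrying EWOULDBLOCK in a tight loop can spin on a non-blocking descriptor, so in practice the retry is usually paired with select() to wait until the descriptor is ready.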

Non-blocking I/O

All stream operations are designed to work with non-blocking I/O:

  • read() returns immediately with available data or None
  • write() buffers data and returns immediately
  • do_write() flushes as much buffered data as possible
  • Use select() to determine when streams are ready for I/O
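
The buffered-write pattern in the list above can be sketched with a non-blocking OS pipe. BufferedWriter is a hypothetical stand-in that mirrors the write(), needs_write(), and do_write() method names. It is not dockerpty's Stream, and the payload size is chosen only to exceed a typical pipe buffer.

```python
import os
import select


class BufferedWriter:
    """Hypothetical stand-in for the write side of a non-blocking stream."""

    def __init__(self, fd):
        self.fd = fd
        self.buffer = b''

    def write(self, data):
        # Only queue the data; the select loop decides when to flush.
        self.buffer += data
        return len(data)

    def needs_write(self):
        return len(self.buffer) > 0

    def do_write(self):
        try:
            written = os.write(self.fd, self.buffer)
        except BlockingIOError:
            return 0  # pipe is full; retry once select() reports it writable
        self.buffer = self.buffer[written:]
        return written


read_fd, write_fd = os.pipe()
os.set_blocking(read_fd, False)
os.set_blocking(write_fd, False)

writer = BufferedWriter(write_fd)
payload = b'x' * 200000
writer.write(payload)

received = bytearray()
while writer.needs_write() or len(received) < len(payload):
    want_write = [write_fd] if writer.needs_write() else []
    readable, writable, _ = select.select([read_fd], want_write, [], 1.0)
    if writable:
        writer.do_write()
    if readable:
        received += os.read(read_fd, 65536)

os.close(read_fd)
os.close(write_fd)
```

The writer is only registered with select() while it has pending data, which avoids waking up on a descriptor that is always writable.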

Install with Tessl CLI

npx tessl i tessl/pypi-dockerpty
