Python library to use the pseudo-tty of a docker container
Classes for handling I/O streams, multiplexing/demultiplexing, and data pumping between file descriptors. This module provides the foundation for dockerpty's non-blocking I/O operations and stream management.
Generic file-like abstraction on top of os.read() and os.write() that adds consistency to reading of sockets and files.
class Stream:
    ERRNO_RECOVERABLE = [errno.EINTR, errno.EDEADLK, errno.EWOULDBLOCK]

    def __init__(self, fd):
        """
        Initialize the Stream for the file descriptor fd.
        The fd object must have a fileno() method.
        Parameters:
        - fd: file-like object with fileno() method
        """

    def fileno(self):
        """
        Return the fileno() of the file descriptor.
        Returns:
        int - file descriptor number
        """

    def set_blocking(self, value):
        """
        Set the stream to blocking or non-blocking mode.
        Parameters:
        - value: bool, True for blocking, False for non-blocking
        Returns:
        bool - previous blocking state
        """

    def read(self, n=4096):
        """
        Return n bytes of data from the Stream, or None at end of stream.
        Parameters:
        - n: int, number of bytes to read (default: 4096)
        Returns:
        bytes - data read from stream, or None at EOF
        """

    def write(self, data):
        """
        Write data to the Stream. Not all data may be written right away.
        Use select to find when the stream is writeable, and call do_write()
        to flush the internal buffer.
        Parameters:
        - data: bytes, data to write
        Returns:
        int - length of data or None if no data provided
        """

    def do_write(self):
        """
        Flushes as much pending data from the internal write buffer as possible.
        Returns:
        int - number of bytes written
        """

    def needs_write(self):
        """
        Returns True if the stream has data waiting to be written.
        Returns:
        bool - True if write buffer has pending data
        """

    def close(self):
        """
        Close the stream.
        The fd is not closed immediately if there's pending write data.
        Returns:
        None
        """

Usage example:
import socket
from dockerpty.io import Stream
# Wrap a connected socket in a Stream; socketpair() stands in for a real connection
local, peer = socket.socketpair()
stream = Stream(local)
# Write data (may buffer)
stream.write(b'hello world')
stream.do_write()  # Flush buffer
# Read the data back from the other end of the pair
data = Stream(peer).read(1024)
# Clean up
stream.close()
peer.close()

Wraps a multiplexed Stream to read demultiplexed data. Docker multiplexes streams when there is no PTY attached by sending an 8-byte header followed by data chunks.
class Demuxer:
    def __init__(self, stream):
        """
        Initialize a new Demuxer reading from stream.
        Parameters:
        - stream: Stream instance to demultiplex
        """

    def fileno(self):
        """
        Returns the fileno() of the underlying Stream.
        This is useful for select() to work.
        Returns:
        int - file descriptor number
        """

    def set_blocking(self, value):
        """
        Set blocking mode on underlying stream.
        Parameters:
        - value: bool, True for blocking, False for non-blocking
        Returns:
        bool - previous blocking state
        """

    def read(self, n=4096):
        """
        Read up to n bytes of data from the Stream, after demuxing.
        Less than n bytes may be returned depending on available payload,
        but never exceeds n. Because demuxing involves scanning 8-byte headers,
        the actual data read from underlying stream may be greater than n.
        Parameters:
        - n: int, maximum bytes to return (default: 4096)
        Returns:
        bytes - demultiplexed data, or None at EOF
        """

    def write(self, data):
        """
        Delegates to the underlying Stream.
        Parameters:
        - data: bytes, data to write
        Returns:
        Result from underlying stream write
        """

    def needs_write(self):
        """
        Delegates to underlying Stream.
        Returns:
        bool - True if underlying stream needs write
        """

    def do_write(self):
        """
        Delegates to underlying Stream.
        Returns:
        Result from underlying stream do_write
        """

    def close(self):
        """
        Delegates to underlying Stream.
        Returns:
        Result from underlying stream close
        """

Usage example:
from dockerpty.io import Stream, Demuxer
# Assume you have a multiplexed stream from Docker
docker_stream = Stream(socket_from_docker)
demuxer = Demuxer(docker_stream)
# Read demultiplexed data
data = demuxer.read(1024)  # Gets clean data without headers

Stream pump that reads from one stream and writes to another, like a manually managed pipe. Used to facilitate piping data between TTY file descriptors and container PTY descriptors.
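The pump idea can be sketched with plain file descriptors and the standard library only. This is a minimal illustration, not dockerpty's implementation: `pump_once` is a hypothetical helper, the two pipes merely stand in for a TTY and a container connection, and blocking descriptors are assumed for simplicity.

```python
import os


def pump_once(read_fd, write_fd, n=4096):
    """Copy up to n bytes from read_fd to write_fd.

    Returns the number of bytes copied, or None at end of input.
    Hypothetical helper for illustration only.
    """
    chunk = os.read(read_fd, n)
    if not chunk:
        return None  # EOF on the reading side
    written = 0
    while written < len(chunk):
        # os.write may accept only part of the chunk, so loop until done
        written += os.write(write_fd, chunk[written:])
    return written


# Two pipes stand in for the TTY side and the container side.
src_r, src_w = os.pipe()
dst_r, dst_w = os.pipe()

os.write(src_w, b"ls -l\n")
os.close(src_w)  # once the data is consumed, the next read reports EOF

copied = pump_once(src_r, dst_w)   # moves the pending bytes
at_eof = pump_once(src_r, dst_w)   # nothing left: EOF
received = os.read(dst_r, 4096)

for fd in (src_r, dst_r, dst_w):
    os.close(fd)
```

The `None` return at end of input mirrors the documented return value of `Pump.flush()`, which is what lets a caller decide when the pump is finished.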
class Pump:
    def __init__(self, from_stream, to_stream, wait_for_output=True, propagate_close=True):
        """
        Initialize a Pump with a Stream to read from and another to write to.
        Parameters:
        - from_stream: Stream, source stream to read from
        - to_stream: Stream, destination stream to write to
        - wait_for_output: bool, wait for EOF on from_stream to consider pump done (default: True)
        - propagate_close: bool, close to_stream when from_stream reaches EOF (default: True)
        """

    def fileno(self):
        """
        Returns the fileno() of the reader end of the Pump.
        This is useful to allow Pumps to function with select().
        Returns:
        int - file descriptor number of from_stream
        """

    def set_blocking(self, value):
        """
        Set blocking mode on the from_stream.
        Parameters:
        - value: bool, True for blocking, False for non-blocking
        Returns:
        bool - previous blocking state
        """

    def flush(self, n=4096):
        """
        Flush n bytes of data from the reader Stream to the writer Stream.
        Parameters:
        - n: int, maximum bytes to flush (default: 4096)
        Returns:
        int - number of bytes actually flushed, or None if EOF reached
        """

    def is_done(self):
        """
        Returns True if the read stream is done (EOF or wait_for_output=False)
        and the write side has no pending bytes to send.
        Returns:
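        bool - True if pump is completely finished
        """

Usage example:
import select
import sys
from dockerpty.io import Stream, Pump
# Create streams (container_socket is assumed to be an already connected socket)
stdin_stream = Stream(sys.stdin)
container_stream = Stream(container_socket)
# Create pump to send stdin to container. The default wait_for_output=True is kept:
# with wait_for_output=False and an empty write buffer, is_done() is True at once
# and the loop below would never run.
pump = Pump(stdin_stream, container_stream)
# Use with select loop until stdin reaches EOF and all data is written
while not pump.is_done():
    ready, _, _ = select.select([pump], [], [], 1.0)
    if ready:
        pump.flush()
    if container_stream.needs_write():
        container_stream.do_write()  # Flush any data still buffered

Helper functions for stream and I/O management.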
def set_blocking(fd, blocking=True):
    """
    Set the given file-descriptor blocking or non-blocking.
    Parameters:
    - fd: file descriptor or file-like object
    - blocking: bool, True for blocking, False for non-blocking (default: True)
    Returns:
    bool - original blocking status
    """

def select(read_streams, write_streams, timeout=0):
    """
    Select the streams ready for reading, and streams ready for writing.
    Uses select.select() internally but only returns two lists of ready streams.
    Handles EINTR interrupts gracefully.
    Parameters:
    - read_streams: list of streams to check for read readiness
    - write_streams: list of streams to check for write readiness
    - timeout: float, timeout in seconds (default: 0)
    Returns:
    tuple - (ready_read_streams, ready_write_streams)
    """

Docker multiplexes stdout and stderr streams when no PTY is allocated: each chunk of output is sent as an 8-byte header followed by its data.
The Demuxer class handles this protocol transparently, allowing you to read clean stream data without dealing with the multiplexing headers.
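To make the framing concrete, here is a minimal sketch of splitting a multiplexed byte string into its payloads. It assumes the header layout given in the Docker Engine API attach documentation: one byte for the stream type (1 for stdout, 2 for stderr), three padding bytes, and a four-byte big-endian payload length. `demux_frames` is a hypothetical helper written for this example and is not part of dockerpty; it works on a complete buffer, whereas the Demuxer reads from a Stream.

```python
import struct

HEADER_SIZE = 8  # 1 byte stream type, 3 padding bytes, 4-byte big-endian length
HEADER_FORMAT = ">BxxxL"


def demux_frames(raw):
    """Split a fully buffered multiplexed byte string into (stream_type, payload) pairs.

    Hypothetical helper for illustration only.
    """
    frames = []
    offset = 0
    while offset + HEADER_SIZE <= len(raw):
        # Pad bytes ("x") produce no values, so two fields come back
        stream_type, length = struct.unpack_from(HEADER_FORMAT, raw, offset)
        offset += HEADER_SIZE
        frames.append((stream_type, raw[offset:offset + length]))
        offset += length
    return frames


# Build two frames by hand: stdout (type 1) and stderr (type 2).
raw = (
    struct.pack(HEADER_FORMAT, 1, 5) + b"hello"
    + struct.pack(HEADER_FORMAT, 2, 4) + b"oops"
)
frames = demux_frames(raw)
```

A real reader also has to cope with headers and payloads that arrive split across several reads, which this sketch deliberately leaves out.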
The Stream class defines ERRNO_RECOVERABLE constants for errors that should be retried:
- errno.EINTR: Interrupted system call
- errno.EDEADLK: Resource deadlock avoided
- errno.EWOULDBLOCK: Operation would block

All stream operations are designed to work with non-blocking I/O:
- read() returns immediately with available data or None
- write() buffers data and returns immediately
- do_write() flushes as much buffered data as possible
- Use select() to determine when streams are ready for I/O
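The write-buffering pattern can be illustrated without dockerpty. The sketch below is an assumption-laden stand-in, not the library's code: `BufferedWriter` is a hypothetical class, a pipe replaces the container connection, the standard `select.select()` is used rather than `dockerpty.io.select`, and a POSIX system is assumed.

```python
import errno
import os
import select

# Same errno values the Stream class treats as recoverable
RECOVERABLE = (errno.EINTR, errno.EDEADLK, errno.EWOULDBLOCK)


class BufferedWriter:
    """Hypothetical stand-in for the buffering behaviour described above."""

    def __init__(self, fd):
        self.fd = fd
        self.buffer = b""

    def write(self, data):
        # Queue the data; it is sent later by do_write()
        self.buffer += data
        return len(data)

    def needs_write(self):
        return len(self.buffer) > 0

    def do_write(self):
        # Send as much as the descriptor accepts right now
        try:
            written = os.write(self.fd, self.buffer)
        except OSError as e:
            if e.errno in RECOVERABLE:
                return 0  # nothing sent this time; retry when writable
            raise
        self.buffer = self.buffer[written:]
        return written


read_fd, write_fd = os.pipe()
os.set_blocking(write_fd, False)

writer = BufferedWriter(write_fd)
writer.write(b"x" * 200000)  # larger than a typical pipe buffer

received = b""
while writer.needs_write():
    readable, writable, _ = select.select([read_fd], [write_fd], [], 1.0)
    if writable:
        writer.do_write()
    if readable:
        received += os.read(read_fd, 65536)

os.close(write_fd)
# Drain what is still in the pipe; os.read returns b"" at EOF
while True:
    chunk = os.read(read_fd, 65536)
    if not chunk:
        break
    received += chunk
os.close(read_fd)
```

Because the payload exceeds the pipe's capacity, a single write cannot complete, so the loop has to alternate between flushing and reading. That is the situation `needs_write()` and `do_write()` exist for.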