Capture C-level stdout/stderr pipes in Python via os.dup2
Low-level Wurlitzer class providing the underlying capture mechanism for C-level output redirection through file descriptor manipulation and background thread processing.
Core implementation class that handles the low-level details of C-level output capture.
class Wurlitzer:
    """Class for Capturing Process-level FD output via dup2

    Typically used via `wurlitzer.pipes` context manager function.

    Attributes:
        flush_interval (float): Time interval for flushing streams (0.2 seconds)
    """

    def __init__(self, stdout=None, stderr=None, encoding='utf-8', bufsize=None):
        """Initialize Wurlitzer with output destinations

        Args:
            stdout: Stream or None - The stream for forwarding stdout
            stderr: Stream or None - The stream for forwarding stderr
            encoding: str or None - Text encoding for streams (default: 'utf-8').
                When None, captured data is forwarded as raw bytes instead of
                decoded text.
            bufsize: int or None - Pipe buffer size in bytes (default: auto-detected).
                Passing 0 disables the buffer-size optimization. It only has an
                effect when output travels through a pipe; a destination backed
                by a real file descriptor (e.g. an open file) is written to
                directly, without an intermediate pipe.
        """

The Wurlitzer class implements the context manager protocol for safe resource management.
def __enter__(self):
    """Setup capture and return handle tuple

    The returned tuple is what ``with Wurlitzer(...) as (out, err):`` binds.
    It is not the Wurlitzer instance itself, so keep a separate reference to
    the instance if you need its attributes inside the block.
    """

def __exit__(self, exc_type, exc_value, traceback):
    """Cleanup and restore original state

    Undoes the redirection set up by ``__enter__`` so that stdout/stderr go
    back to their original destinations.
    """

Example usage:
from wurlitzer import Wurlitzer
from io import StringIO
# Direct usage of Wurlitzer class
stdout_capture = StringIO()
stderr_capture = StringIO()
with Wurlitzer(stdout=stdout_capture, stderr=stderr_capture) as (out, err):
# C-level calls here
c_function_with_output()
captured_stdout = stdout_capture.getvalue()
captured_stderr = stderr_capture.getvalue()Internal methods for managing file descriptor redirection and pipe setup.
def _setup_pipe(self, name):
    """Setup pipe for stdout or stderr capture

    Args:
        name: 'stdout' or 'stderr' - which stream to setup

    Returns:
        File descriptor for pipe output, or None if direct FD capture
        (the destination is written to directly and no pipe is created)
    """

def dup2(a, b, timeout=3):
    """Like os.dup2, but retry on EBUSY with timeout

    Module-level helper, importable as ``from wurlitzer import dup2``.
    While os.dup2 fails with EBUSY it is retried until ``timeout`` seconds
    have elapsed, so callers do not need their own EBUSY retry loop.

    Args:
        a: Source file descriptor
        b: Target file descriptor
        timeout: Timeout in seconds (default: 3)

    Returns:
        Result of os.dup2
    """
    # NOTE(review): presumably re-raises the last OSError once the timeout
    # expires (callers catch OSError) — confirm against the implementation.

Methods for handling captured data and forwarding to destinations.
def _decode(self, data):
    """Decode data based on encoding setting

    Args:
        data: Raw bytes from pipe

    Returns:
        Decoded string if encoding is set, otherwise raw bytes
    """

def _handle_stdout(self, data):
    """Process and forward stdout data to configured destination

    Called with each chunk of raw bytes read from the stdout pipe. Override
    in a subclass to customise where captured stdout goes.
    """

def _handle_stderr(self, data):
    """Process and forward stderr data to configured destination

    Called with each chunk of raw bytes read from the stderr pipe. Override
    in a subclass to customise where captured stderr goes.
    """

Low-level flushing of both Python and C-level streams for synchronization.
def _flush(self):
    """Flush sys.stdout/stderr and low-level C file descriptors

    Ensures all buffered output is written before proceeding.
    Flushes both Python sys streams and C-level stdout/stderr pointers,
    so output buffered by Python's print() and by C's stdio reaches the
    redirected file descriptors.
    """

Example of manual flushing:
from wurlitzer import Wurlitzer
w = Wurlitzer(stdout=sys.stdout)
with w:
c_function_call()
w._flush() # Force flush of all streams
more_c_calls()Subclass Wurlitzer for custom output processing:
from wurlitzer import Wurlitzer
import logging
class LoggingWurlitzer(Wurlitzer):
def __init__(self, logger, level=logging.INFO):
self.logger = logger
self.level = level
super().__init__(stdout=None, stderr=None)
def _handle_stdout(self, data):
decoded = self._decode(data)
for line in decoded.splitlines():
if line.strip():
self.logger.log(self.level, f"C-stdout: {line}")
def _handle_stderr(self, data):
decoded = self._decode(data)
for line in decoded.splitlines():
if line.strip():
self.logger.log(logging.ERROR, f"C-stderr: {line}")
# Usage
logger = logging.getLogger("c_output")
with LoggingWurlitzer(logger):
c_function_with_debug_output()Configure buffer sizes for different use cases:
from wurlitzer import Wurlitzer, _get_max_pipe_size
# Get system maximum pipe size
max_size = _get_max_pipe_size()
print(f"System max pipe size: {max_size}")
# High-throughput scenario
with Wurlitzer(stdout=sys.stdout, bufsize=1024*1024): # 1MB
high_volume_c_processing()
# Memory-constrained scenario
with Wurlitzer(stdout=sys.stdout, bufsize=4096): # 4KB
memory_sensitive_c_function()
# Disable buffer size optimization
with Wurlitzer(stdout=sys.stdout, bufsize=0):
c_function_call()Capture to file descriptors directly without pipes:
from wurlitzer import Wurlitzer
import tempfile
# Capture to temporary file
with tempfile.NamedTemporaryFile(mode='w+', delete=False) as tmp:
with Wurlitzer(stdout=tmp, stderr=tmp):
c_function_generating_lots_of_output()
# File is written directly, no intermediate pipes
tmp.seek(0)
all_output = tmp.read()Understanding the background thread behavior:
from wurlitzer import Wurlitzer
import threading
def monitor_wurlitzer_thread():
with Wurlitzer(stdout=sys.stdout) as w:
print(f"Background thread active: {w.thread is not None}")
print(f"Thread daemon status: {w.thread.daemon if w.thread else 'N/A'}")
c_function_call()
if w.thread:
print(f"Thread still alive: {w.thread.is_alive()}")Understanding how pipes are created and managed:
# Pipe creation process (internal):
# 1. Save original file descriptors
# 2. Create OS pipes with os.pipe()
# 3. Set non-blocking mode with fcntl
# 4. Use os.dup2 to redirect stdout/stderr
# 5. Setup background thread with selectors for reading
# 6. Forward data to configured destinations
The implementation uses selectors for efficient, non-blocking pipe reading:
# Internal selector usage (simplified):
import selectors
poller = selectors.DefaultSelector()
for pipe_fd in pipes:
poller.register(pipe_fd, selectors.EVENT_READ)
while active_pipes:
events = poller.select(timeout=flush_interval)
for selector_key, flags in events:
data = os.read(selector_key.fd, 1024)
if data:
handler(data) # Forward to destinationPlatform-specific optimizations and fallbacks:
# Linux-specific pipe buffer optimization
try:
from fcntl import F_SETPIPE_SZ
fcntl(pipe_fd, F_SETPIPE_SZ, buffer_size)
except (ImportError, OSError):
# Fallback for other platforms
pass
# C stream pointer lookup with fallbacks
try:
c_stdout_p = ctypes.c_void_p.in_dll(libc, 'stdout')
except ValueError:
# macOS-specific names
try:
c_stdout_p = ctypes.c_void_p.in_dll(libc, '__stdoutp')
except ValueError:
# CFFI fallback
c_stdout_p = get_streams_cffi()Robust error handling in the core implementation:
from wurlitzer import Wurlitzer, dup2
import errno
import time
# Retry logic for busy file descriptors
def robust_dup2(source_fd, target_fd, max_retries=30):
for attempt in range(max_retries):
try:
return dup2(source_fd, target_fd, timeout=3)
except OSError as e:
if e.errno == errno.EBUSY and attempt < max_retries - 1:
time.sleep(0.1)
continue
raiseInstall with Tessl CLI
npx tessl i tessl/pypi-wurlitzer