CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-wurlitzer

Capture C-level stdout/stderr pipes in Python via os.dup2

Overview
Eval results
Files

core-implementation.mddocs/

Core Implementation

Low-level Wurlitzer class providing the underlying capture mechanism for C-level output redirection through file descriptor manipulation and background thread processing.

Capabilities

Wurlitzer Class

Core implementation class that handles the low-level details of C-level output capture.

class Wurlitzer:
    """Class for Capturing Process-level FD output via dup2
    
    Typically used via `wurlitzer.pipes` context manager function.
    
    Attributes:
        flush_interval (float): Time interval for flushing streams (0.2 seconds)
    """
    
    def __init__(self, stdout=None, stderr=None, encoding='utf-8', bufsize=None):
        """Initialize Wurlitzer with output destinations
        
        Args:
            stdout: Stream or None - The stream for forwarding stdout
            stderr: Stream or None - The stream for forwarding stderr  
            encoding: str or None - Text encoding for streams (default: 'utf-8')
            bufsize: int or None - Pipe buffer size in bytes (default: auto-detected)
        """

Context Manager Protocol

The Wurlitzer class implements the context manager protocol for safe resource management.

def __enter__(self):
    """Setup capture and return handle tuple"""

def __exit__(self, exc_type, exc_value, traceback):
    """Cleanup and restore original state"""

Example usage:

from wurlitzer import Wurlitzer
from io import StringIO

# Direct usage of Wurlitzer class
stdout_capture = StringIO()
stderr_capture = StringIO()

with Wurlitzer(stdout=stdout_capture, stderr=stderr_capture) as (out, err):
    # C-level calls here
    c_function_with_output()

captured_stdout = stdout_capture.getvalue()
captured_stderr = stderr_capture.getvalue()

File Descriptor Management

Internal methods for managing file descriptor redirection and pipe setup.

def _setup_pipe(self, name):
    """Setup pipe for stdout or stderr capture
    
    Args:
        name: 'stdout' or 'stderr' - which stream to setup
        
    Returns:
        File descriptor for pipe output, or None if direct FD capture
    """

def dup2(a, b, timeout=3):
    """Like os.dup2, but retry on EBUSY with timeout
    
    Args:
        a: Source file descriptor
        b: Target file descriptor
        timeout: Timeout in seconds (default: 3)
        
    Returns:
        Result of os.dup2
    """

Stream Processing

Methods for handling captured data and forwarding to destinations.

def _decode(self, data):
    """Decode data based on encoding setting
    
    Args:
        data: Raw bytes from pipe
        
    Returns:
        Decoded string if encoding is set, otherwise raw bytes
    """

def _handle_stdout(self, data):
    """Process and forward stdout data to configured destination"""

def _handle_stderr(self, data):  
    """Process and forward stderr data to configured destination"""

Flushing and Synchronization

Low-level flushing of both Python and C-level streams for synchronization.

def _flush(self):
    """Flush sys.stdout/stderr and low-level C file descriptors
    
    Ensures all buffered output is written before proceeding.
    Flushes both Python sys streams and C-level stdout/stderr pointers.
    """

Example of manual flushing:

from wurlitzer import Wurlitzer

w = Wurlitzer(stdout=sys.stdout)
with w:
    c_function_call()
    w._flush()  # Force flush of all streams
    more_c_calls()

Advanced Usage Patterns

Custom Stream Handling

Subclass Wurlitzer for custom output processing:

from wurlitzer import Wurlitzer
import logging

class LoggingWurlitzer(Wurlitzer):
    def __init__(self, logger, level=logging.INFO):
        self.logger = logger
        self.level = level
        super().__init__(stdout=None, stderr=None)
        
    def _handle_stdout(self, data):
        decoded = self._decode(data)
        for line in decoded.splitlines():
            if line.strip():
                self.logger.log(self.level, f"C-stdout: {line}")
                
    def _handle_stderr(self, data):
        decoded = self._decode(data)
        for line in decoded.splitlines():
            if line.strip():
                self.logger.log(logging.ERROR, f"C-stderr: {line}")

# Usage
logger = logging.getLogger("c_output")
with LoggingWurlitzer(logger):
    c_function_with_debug_output()

Buffer Size Optimization

Configure buffer sizes for different use cases:

from wurlitzer import Wurlitzer, _get_max_pipe_size

# Get system maximum pipe size
max_size = _get_max_pipe_size()
print(f"System max pipe size: {max_size}")

# High-throughput scenario
with Wurlitzer(stdout=sys.stdout, bufsize=1024*1024):  # 1MB
    high_volume_c_processing()

# Memory-constrained scenario  
with Wurlitzer(stdout=sys.stdout, bufsize=4096):  # 4KB
    memory_sensitive_c_function()

# Disable buffer size optimization
with Wurlitzer(stdout=sys.stdout, bufsize=0):
    c_function_call()

Direct File Descriptor Capture

Capture to file descriptors directly without pipes:

from wurlitzer import Wurlitzer
import tempfile

# Capture to temporary file
with tempfile.NamedTemporaryFile(mode='w+', delete=False) as tmp:
    with Wurlitzer(stdout=tmp, stderr=tmp):
        c_function_generating_lots_of_output()
        
    # File is written directly, no intermediate pipes
    tmp.seek(0)
    all_output = tmp.read()

Thread Management

Understanding the background thread behavior:

from wurlitzer import Wurlitzer
import threading

def monitor_wurlitzer_thread():
    with Wurlitzer(stdout=sys.stdout) as w:
        print(f"Background thread active: {w.thread is not None}")
        print(f"Thread daemon status: {w.thread.daemon if w.thread else 'N/A'}")
        
        c_function_call()
        
        if w.thread:
            print(f"Thread still alive: {w.thread.is_alive()}")

Internal Architecture

Pipe Management

Understanding how pipes are created and managed:

# Pipe creation process (internal):
# 1. Save original file descriptors
# 2. Create OS pipes with os.pipe()
# 3. Set non-blocking mode with fcntl
# 4. Use os.dup2 to redirect stdout/stderr
# 5. Setup background thread with selectors for reading
# 6. Forward data to configured destinations

Selector-based I/O

The implementation uses selectors for efficient, non-blocking pipe reading:

# Internal selector usage (simplified):
import selectors

poller = selectors.DefaultSelector()
for pipe_fd in pipes:
    poller.register(pipe_fd, selectors.EVENT_READ)

while active_pipes:
    events = poller.select(timeout=flush_interval)
    for selector_key, flags in events:
        data = os.read(selector_key.fd, 1024)
        if data:
            handler(data)  # Forward to destination

Cross-Platform Compatibility

Platform-specific optimizations and fallbacks:

# Linux-specific pipe buffer optimization
try:
    from fcntl import F_SETPIPE_SZ
    fcntl(pipe_fd, F_SETPIPE_SZ, buffer_size)
except (ImportError, OSError):
    # Fallback for other platforms
    pass

# C stream pointer lookup with fallbacks
try:
    c_stdout_p = ctypes.c_void_p.in_dll(libc, 'stdout')
except ValueError:
    # macOS-specific names
    try:
        c_stdout_p = ctypes.c_void_p.in_dll(libc, '__stdoutp')
    except ValueError:
        # CFFI fallback
        c_stdout_p = get_streams_cffi()

Error Handling and Recovery

Robust error handling in the core implementation:

from wurlitzer import Wurlitzer, dup2
import errno
import time

# Retry logic for busy file descriptors
def robust_dup2(source_fd, target_fd, max_retries=30):
    for attempt in range(max_retries):
        try:
            return dup2(source_fd, target_fd, timeout=3)
        except OSError as e:
            if e.errno == errno.EBUSY and attempt < max_retries - 1:
                time.sleep(0.1)
                continue
            raise

Install with Tessl CLI

npx tessl i tessl/pypi-wurlitzer@3.1.1

docs

core-implementation.md

index.md

ipython-integration.md

output-capture.md

permanent-redirection.md

system-integration.md

tile.json