# Core Implementation

The low-level `Wurlitzer` class provides the capture mechanism behind C-level output redirection: it redirects process-level file descriptors with `dup2` and forwards the captured data from a background thread.

## Capabilities

### Wurlitzer Class

Core implementation class that handles the low-level details of C-level output capture.

```python { .api }
class Wurlitzer:
    """Class for capturing process-level FD output via dup2

    Typically used via the `wurlitzer.pipes` context manager function.

    Attributes:
        flush_interval (float): Time interval in seconds for flushing streams (0.2 seconds)
    """

    def __init__(self, stdout=None, stderr=None, encoding='utf-8', bufsize=None):
        """Initialize Wurlitzer with output destinations

        Args:
            stdout: Stream or None - The stream for forwarding stdout (None: not captured)
            stderr: Stream or None - The stream for forwarding stderr (None: not captured)
            encoding: str or None - Text encoding for streams (default: 'utf-8')
            bufsize: int or None - Pipe buffer size in bytes (default: auto-detected)
        """
```

### Context Manager Protocol

The Wurlitzer class implements the context manager protocol for safe resource management.

```python { .api }
def __enter__(self):
    """Setup capture and return handle tuple"""

def __exit__(self, exc_type, exc_value, traceback):
    """Cleanup and restore original state"""
```

Example usage:

```python
from io import StringIO

from wurlitzer import Wurlitzer

# Direct usage of Wurlitzer class
stdout_capture = StringIO()
stderr_capture = StringIO()

with Wurlitzer(stdout=stdout_capture, stderr=stderr_capture) as (out, err):
    # C-level calls here; c_function_with_output is a placeholder for any
    # extension or ctypes call that writes to file descriptors 1 and 2
    c_function_with_output()

# Read the captured text after the block has exited
captured_stdout = stdout_capture.getvalue()
captured_stderr = stderr_capture.getvalue()
```
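
The contract behind this protocol (redirect on `__enter__`, restore on `__exit__` even when the body raises) can be sketched with the standard library alone. `RedirectFd` below is a hypothetical, minimal stand-in that points one descriptor at a file instead of a pipe; it is not wurlitzer's implementation, only an illustration of why the restore step belongs in `__exit__`.

```python
import os
import tempfile


class RedirectFd:
    """Hypothetical minimal context manager: point `fd` at a file, always restore it."""

    def __init__(self, fd, target_file):
        self.fd = fd
        self.target_file = target_file
        self.saved_fd = None

    def __enter__(self):
        self.saved_fd = os.dup(self.fd)              # save the original descriptor
        os.dup2(self.target_file.fileno(), self.fd)  # redirect it to the file
        return (self.target_file, None)              # handle tuple, like (out, err)

    def __exit__(self, exc_type, exc_value, traceback):
        os.dup2(self.saved_fd, self.fd)              # restore, even after an exception
        os.close(self.saved_fd)
        return False                                 # let exceptions propagate


before = os.fstat(2)
with tempfile.TemporaryFile() as tmp:
    try:
        with RedirectFd(2, tmp) as (out, err):
            os.write(2, b"sent to the temp file\n")
            raise RuntimeError("failure inside the block")
    except RuntimeError:
        pass
    tmp.seek(0)
    captured = tmp.read()
after = os.fstat(2)

print(captured)  # b'sent to the temp file\n'
print((before.st_dev, before.st_ino) == (after.st_dev, after.st_ino))  # True
```

Comparing the device and inode of fd 2 before and after shows that the original stderr is back in place even though the block raised.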

### File Descriptor Management

The internal `_setup_pipe` method and the module-level `dup2` helper manage file descriptor redirection and pipe setup.

```python { .api }
def _setup_pipe(self, name):
    """Setup pipe for stdout or stderr capture

    Args:
        name: 'stdout' or 'stderr' - which stream to setup

    Returns:
        Read end of the pipe, or None if the destination has its own
        file descriptor and is captured directly
    """

def dup2(a, b, timeout=3):
    """Like os.dup2, but retry on EBUSY with timeout

    Args:
        a: Source file descriptor
        b: Target file descriptor
        timeout: Timeout in seconds (default: 3)

    Returns:
        Result of os.dup2
    """
```

### Stream Processing

Methods for handling captured data and forwarding to destinations.

```python { .api }
def _decode(self, data):
    """Decode data based on encoding setting

    Args:
        data: Raw bytes from pipe

    Returns:
        Decoded string if encoding is set, otherwise raw bytes
    """

def _handle_stdout(self, data):
    """Process and forward stdout data to configured destination"""

def _handle_stderr(self, data):
    """Process and forward stderr data to configured destination"""
```
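
Because the reader pulls fixed-size chunks off the pipe, a multi-byte character can be split between two reads, and decoding each chunk on its own then produces replacement characters. The sketch below shows one way a decode-and-forward step can cope with that, using an incremental decoder from `codecs`. `ForwardingSink` is a hypothetical class; it illustrates the design concern and is not a description of wurlitzer's `_decode`.

```python
import codecs
import io


class ForwardingSink:
    """Hypothetical decode-and-forward step with an incremental decoder."""

    def __init__(self, dest, encoding="utf-8"):
        self.dest = dest
        # The incremental decoder holds back an incomplete multi-byte
        # sequence until the next chunk completes it
        self._decoder = (
            codecs.getincrementaldecoder(encoding)(errors="replace") if encoding else None
        )

    def _decode(self, data):
        if self._decoder is None:
            return data  # no encoding: forward raw bytes
        return self._decoder.decode(data)

    def handle(self, data):
        if self.dest is not None:
            self.dest.write(self._decode(data))


chunks = [b"caf\xc3", b"\xa9\n"]  # "é" (0xC3 0xA9) split across two reads

out = io.StringIO()
sink = ForwardingSink(out)
for chunk in chunks:
    sink.handle(chunk)
print(repr(out.getvalue()))  # 'café\n'

# Decoding each chunk on its own yields two U+FFFD replacement characters instead
naive = "".join(chunk.decode("utf-8", "replace") for chunk in chunks)
```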

### Flushing and Synchronization

Low-level flushing of both Python and C-level streams for synchronization.

```python { .api }
def _flush(self):
    """Flush sys.stdout/stderr and low-level C stdio streams

    Ensures all buffered output is written before proceeding.
    Flushes both Python sys streams and C-level stdout/stderr pointers.
    """
```

Example of manual flushing:

```python
import sys

from wurlitzer import Wurlitzer

w = Wurlitzer(stdout=sys.stdout)
with w:
    c_function_call()  # placeholder for a C-level call
    w._flush()  # Force flush of Python and C stdio buffers (private method)
    more_c_calls()  # placeholder for further C-level calls
```
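
C stdio keeps its own buffer for `stdout` (typically fully buffered when it is not a terminal), so text from `printf` may not reach file descriptor 1 until that buffer is flushed. The sketch below assumes a POSIX system where `ctypes.CDLL(None)` exposes libc. It flushes Python's streams and then calls `fflush(NULL)`, which flushes every open C output stream. `flush_all` is a hypothetical helper, not wurlitzer's `_flush`, which targets the specific `stdout`/`stderr` pointers instead.

```python
import ctypes
import os
import sys

libc = ctypes.CDLL(None)  # assumption: POSIX system where this exposes libc


def flush_all():
    """Sketch of a two-level flush: Python buffers first, then C stdio buffers."""
    for stream in (sys.stdout, sys.stderr):
        if stream is not None:
            stream.flush()
    libc.fflush(None)  # fflush(NULL) flushes every open C output stream


saved = os.dup(1)
read_end, write_end = os.pipe()
flush_all()                        # keep earlier output out of the pipe
os.dup2(write_end, 1)
os.close(write_end)
try:
    libc.printf(b"from printf\n")  # may sit in C's stdio buffer for now
    flush_all()                    # push it through fd 1 into the pipe
finally:
    os.dup2(saved, 1)              # restore the original stdout descriptor
    os.close(saved)
captured = os.read(read_end, 1024)
os.close(read_end)
print(captured)  # b'from printf\n'
```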

## Advanced Usage Patterns

### Custom Stream Handling

Subclass Wurlitzer and override the handlers for custom output processing:

```python
import io
import logging

from wurlitzer import Wurlitzer


class LoggingWurlitzer(Wurlitzer):
    def __init__(self, logger, level=logging.INFO):
        self.logger = logger
        self.level = level
        # Streams passed as None are not captured at all, so pass placeholder
        # streams without a real fileno(): Wurlitzer then creates pipes and
        # the overridden handlers below consume the data.
        super().__init__(stdout=io.StringIO(), stderr=io.StringIO())

    def _handle_stdout(self, data):
        decoded = self._decode(data)
        # data arrives in arbitrary chunks, so a line may be split across calls
        for line in decoded.splitlines():
            if line.strip():
                self.logger.log(self.level, f"C-stdout: {line}")

    def _handle_stderr(self, data):
        decoded = self._decode(data)
        for line in decoded.splitlines():
            if line.strip():
                self.logger.log(logging.ERROR, f"C-stderr: {line}")


# Usage: the logger must not write to fd 1 or 2 while they are captured,
# otherwise its own output is fed back through the pipes
logger = logging.getLogger("c_output")
logger.addHandler(logging.FileHandler("c_output.log"))
logger.setLevel(logging.INFO)
logger.propagate = False

with LoggingWurlitzer(logger):
    c_function_with_debug_output()  # placeholder for a C-level call
```
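
The handlers above split each chunk independently, so a line that straddles two reads is logged as two fragments. A small buffer that holds the trailing partial line until its newline arrives avoids that. `LineBuffer` is a hypothetical helper: a subclass could call `feed()` from `_handle_stdout` and `flush()` once capture ends.

```python
class LineBuffer:
    """Hypothetical helper: reassemble complete lines from arbitrary text chunks."""

    def __init__(self):
        self._pending = ""

    def feed(self, text):
        """Add a chunk; return the lines it completed (without newlines)."""
        self._pending += text
        *complete, self._pending = self._pending.split("\n")
        return complete

    def flush(self):
        """Return any trailing partial line, e.g. once capture has finished."""
        rest, self._pending = self._pending, ""
        return [rest] if rest else []


buf = LineBuffer()
lines = []
for chunk in ("ab", "c\nde", "f\ng"):  # chunk boundaries fall mid-line
    lines += buf.feed(chunk)
lines += buf.flush()
print(lines)  # ['abc', 'def', 'g']
```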

### Buffer Size Optimization

Configure the pipe buffer size (applied with `F_SETPIPE_SZ`, Linux only) for different use cases:

```python
import io

from wurlitzer import Wurlitzer, _get_max_pipe_size

# Get system maximum pipe size (private helper; None where it cannot be determined)
max_size = _get_max_pipe_size()
print(f"System max pipe size: {max_size}")

# A destination without a real fileno() forces a pipe, so bufsize applies.
# The C-level functions called below are placeholders.
out = io.StringIO()

# High-throughput scenario
with Wurlitzer(stdout=out, bufsize=1024*1024):  # 1MB
    high_volume_c_processing()

# Memory-constrained scenario
with Wurlitzer(stdout=out, bufsize=4096):  # 4KB
    memory_sensitive_c_function()

# Disable buffer size optimization (keep the OS default pipe size)
with Wurlitzer(stdout=out, bufsize=0):
    c_function_call()
```
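
The buffer size matters because a full pipe makes the C writer block until the reader drains it. The sketch below reads the Linux limit from `/proc/sys/fs/pipe-max-size` (returning `None` elsewhere) and measures a fresh pipe's default capacity by filling it with no reader attached. Both helper names are hypothetical, and the measured value varies by operating system.

```python
import os


def read_pipe_max_size():
    """Hypothetical helper: Linux's system-wide pipe size limit, or None elsewhere."""
    try:
        with open("/proc/sys/fs/pipe-max-size") as f:
            return int(f.read())
    except (OSError, ValueError):
        return None


def measure_pipe_capacity(chunk=4096):
    """Fill a fresh pipe without reading it; the byte count approximates its buffer."""
    read_end, write_end = os.pipe()
    os.set_blocking(write_end, False)  # raise BlockingIOError instead of blocking
    total = 0
    try:
        while True:
            total += os.write(write_end, b"x" * chunk)
    except BlockingIOError:
        pass  # the pipe is full: a blocking writer would stall here
    finally:
        os.close(read_end)
        os.close(write_end)
    return total


print("pipe-max-size:", read_pipe_max_size())
print("default capacity (bytes):", measure_pipe_capacity())
```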

### Direct File Descriptor Capture

When a destination has a real `fileno()`, such as an open file, its descriptor is `dup2`-ed directly over stdout/stderr and no pipe is created:

```python
import tempfile

from wurlitzer import Wurlitzer

# Capture to temporary file
with tempfile.NamedTemporaryFile(mode='w+') as tmp:
    with Wurlitzer(stdout=tmp, stderr=tmp):
        c_function_generating_lots_of_output()  # placeholder for a C-level call

    # File is written directly, no intermediate pipes
    tmp.seek(0)
    all_output = tmp.read()
```
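
The choice between direct capture and a pipe hinges on whether the destination exposes a real descriptor. `io.StringIO` raises `io.UnsupportedOperation` from `fileno()`, while a file object returns an integer that can be `dup2`-ed over stdout. The sketch below illustrates both halves with the standard library; `real_fileno` is a hypothetical helper.

```python
import io
import os
import sys
import tempfile


def real_fileno(stream):
    """Return stream.fileno() if it is a usable OS descriptor, else None."""
    try:
        return stream.fileno()
    except (AttributeError, OSError, ValueError):
        # io.UnsupportedOperation subclasses both OSError and ValueError
        return None


print(real_fileno(io.StringIO()))  # None -> a pipe would be needed

with tempfile.TemporaryFile(mode="w+b") as tmp:
    sys.stdout.flush()            # keep earlier Python output out of the file
    saved = os.dup(1)
    os.dup2(real_fileno(tmp), 1)  # fd 1 now writes straight into the file
    try:
        os.write(1, b"straight to disk\n")
    finally:
        os.dup2(saved, 1)         # restore the original stdout descriptor
        os.close(saved)
    tmp.seek(0)
    data = tmp.read()

print(data)  # b'straight to disk\n'
```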

### Thread Management

Inspecting the background forwarding thread. `__enter__` returns the handle tuple rather than the `Wurlitzer` instance, so keep a reference to the instance to reach its `thread` attribute:

```python
import sys

from wurlitzer import Wurlitzer


def monitor_wurlitzer_thread():
    # `with Wurlitzer(...) as w` would bind the handle tuple, so hold the instance
    w = Wurlitzer(stdout=sys.stdout)
    with w:
        print(f"Background thread active: {w.thread is not None}")
        print(f"Thread daemon status: {w.thread.daemon if w.thread else 'N/A'}")

        c_function_call()  # placeholder for a C-level call

        if w.thread:
            print(f"Thread still alive: {w.thread.is_alive()}")
```
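
A daemon forwarding thread cannot keep the interpreter alive at exit if it never finishes, and joining it on exit guarantees that all forwarded data has been delivered. A stripped-down version of that pattern (a daemon thread that reads a pipe until EOF and is then joined) looks like this. `start_forwarder` is a hypothetical name, and it uses a blocking read rather than a selector loop.

```python
import os
import threading


def start_forwarder(read_fd, handler):
    """Sketch: forward everything written to a pipe, on a daemon thread."""

    def run():
        while True:
            data = os.read(read_fd, 1024)  # blocks until data arrives or EOF
            if not data:                   # EOF: every write end is closed
                break
            handler(data)
        os.close(read_fd)

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread


received = []
read_fd, write_fd = os.pipe()
thread = start_forwarder(read_fd, received.append)
os.write(write_fd, b"chunk 1\n")
os.write(write_fd, b"chunk 2\n")
os.close(write_fd)      # signal EOF to the reader
thread.join(timeout=5)  # the "exit" side: wait for forwarding to finish
print(thread.daemon, thread.is_alive())  # True False
print(b"".join(received))  # b'chunk 1\nchunk 2\n'
```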

## Internal Architecture

### Pipe Management

Understanding how pipes are created and managed:

```python
# Pipe creation process (internal), per captured stream:
# 1. Save the original file descriptor (os.dup)
# 2. Create an OS pipe with os.pipe(), optionally enlarged with F_SETPIPE_SZ (Linux)
# 3. Set the pipe's read end to non-blocking mode with fcntl
# 4. Use dup2 to redirect stdout/stderr to the pipe's write end
# 5. Setup background thread with selectors for reading
# 6. Forward data to configured destinations
# On exit, the saved descriptors are dup2-ed back into place.
# A destination with its own fileno() skips steps 2-3 and is dup2-ed directly.
```
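
The numbered steps can be followed with the standard library alone for a single descriptor. `capture_fd` is a hypothetical helper: it drains the pipe only after the callable returns, so output larger than the pipe buffer would block, which is why a real implementation reads from a background thread. `os.set_blocking` performs the same `O_NONBLOCK` change as step 3.

```python
import os
import sys


def capture_fd(fd, func):
    """Sketch of the steps above for one descriptor; returns the captured bytes."""
    sys.stdout.flush()                # don't push earlier Python output into the pipe
    sys.stderr.flush()
    saved_fd = os.dup(fd)             # 1. save the original descriptor
    read_end, write_end = os.pipe()   # 2. create the pipe
    os.set_blocking(read_end, False)  # 3. non-blocking read end
    os.dup2(write_end, fd)            # 4. redirect fd into the pipe's write end
    os.close(write_end)
    try:
        func()
    finally:
        os.dup2(saved_fd, fd)         # restore; no write end of the pipe stays open
        os.close(saved_fd)
    chunks = []                       # 5-6. read until EOF and "forward" the data
    while True:
        data = os.read(read_end, 1024)
        if not data:
            break
        chunks.append(data)
    os.close(read_end)
    return b"".join(chunks)


out = capture_fd(1, lambda: os.write(1, b"written to fd 1\n"))
print(out)  # b'written to fd 1\n'
```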
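
### Selector-based I/O

The implementation uses selectors for efficient, non-blocking pipe reading:

```python
# Internal selector usage (simplified, self-contained):
import os
import selectors

flush_interval = 0.2  # idle select timeout (Wurlitzer.flush_interval)
received = []
handler = received.append  # stand-in for forwarding to a destination

read_fd, write_fd = os.pipe()
os.write(write_fd, b"captured output\n")
os.close(write_fd)  # closing the write end lets the reader see EOF

poller = selectors.DefaultSelector()
poller.register(read_fd, selectors.EVENT_READ)
active_pipes = {read_fd}

while active_pipes:
    events = poller.select(timeout=flush_interval)
    for selector_key, flags in events:
        data = os.read(selector_key.fd, 1024)
        if data:
            handler(data)  # Forward to destination
        else:  # empty read means EOF: stop polling this pipe
            poller.unregister(selector_key.fd)
            os.close(selector_key.fd)
            active_pipes.discard(selector_key.fd)
```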

### Cross-Platform Compatibility

Platform-specific optimizations and fallbacks:

```python
import ctypes
import os
from fcntl import fcntl

# Linux-specific pipe buffer optimization
try:
    from fcntl import F_SETPIPE_SZ  # exposed by Python 3.10+ on Linux
except ImportError:
    F_SETPIPE_SZ = None  # other platforms: keep the default pipe size

pipe_out, pipe_in = os.pipe()
buffer_size = 64 * 1024
if F_SETPIPE_SZ is not None:
    try:
        fcntl(pipe_in, F_SETPIPE_SZ, buffer_size)
    except OSError:
        pass  # e.g. size above the allowed limit: keep the default

# C stream pointer lookup with fallbacks
libc = ctypes.CDLL(None)
try:
    c_stdout_p = ctypes.c_void_p.in_dll(libc, 'stdout')
except ValueError:
    # macOS-specific names
    try:
        c_stdout_p = ctypes.c_void_p.in_dll(libc, '__stdoutp')
    except ValueError:
        # CFFI fallback: the library resolves the pointer with cffi (omitted here)
        c_stdout_p = None
```

### Error Handling and Recovery

`dup2` already retries on `EBUSY` for up to `timeout` seconds before re-raising. A caller that needs more patience can wrap it in an outer retry loop:

```python
import errno
import time

from wurlitzer import dup2


# Retry logic for busy file descriptors; the worst case is roughly
# max_retries * timeout seconds, since each dup2 call retries internally
def robust_dup2(source_fd, target_fd, max_retries=30):
    for attempt in range(max_retries):
        try:
            return dup2(source_fd, target_fd, timeout=3)
        except OSError as e:
            if e.errno == errno.EBUSY and attempt < max_retries - 1:
                time.sleep(0.1)
                continue
            raise
```