0
# Stream Management
1
2
Classes for handling I/O streams, multiplexing/demultiplexing, and data pumping between file descriptors. This module provides the foundation for dockerpty's non-blocking I/O operations and stream management.
3
4
## Capabilities
5
6
### Stream Class
7
8
Generic file-like abstraction on top of `os.read()` and `os.write()` that adds consistency to reading of sockets and files.
9
10
```python { .api }
11
class Stream:
12
ERRNO_RECOVERABLE = [errno.EINTR, errno.EDEADLK, errno.EWOULDBLOCK]
13
14
def __init__(self, fd):
15
"""
16
Initialize the Stream for the file descriptor fd.
17
18
The fd object must have a fileno() method.
19
20
Parameters:
21
- fd: file-like object with fileno() method
22
"""
23
24
def fileno(self):
25
"""
26
Return the fileno() of the file descriptor.
27
28
Returns:
29
int - file descriptor number
30
"""
31
32
def set_blocking(self, value):
33
"""
34
Set the stream to blocking or non-blocking mode.
35
36
Parameters:
37
- value: bool, True for blocking, False for non-blocking
38
39
Returns:
40
bool - previous blocking state
41
"""
42
43
def read(self, n=4096):
44
"""
45
Return n bytes of data from the Stream, or None at end of stream.
46
47
Parameters:
48
- n: int, number of bytes to read (default: 4096)
49
50
Returns:
51
bytes - data read from stream, or None at EOF
52
"""
53
54
def write(self, data):
55
"""
56
Write data to the Stream. Not all data may be written right away.
57
Use select to find when the stream is writeable, and call do_write()
58
to flush the internal buffer.
59
60
Parameters:
61
- data: bytes, data to write
62
63
Returns:
64
int - length of data or None if no data provided
65
"""
66
67
def do_write(self):
68
"""
69
Flushes as much pending data from the internal write buffer as possible.
70
71
Returns:
72
int - number of bytes written
73
"""
74
75
def needs_write(self):
76
"""
77
Returns True if the stream has data waiting to be written.
78
79
Returns:
80
bool - True if write buffer has pending data
81
"""
82
83
def close(self):
84
"""
85
Close the stream.
86
87
The fd is not closed immediately if there's pending write data.
88
89
Returns:
90
None
91
"""
92
```
93
94
Usage example:
95
96
```python
97
import socket
98
from dockerpty.io import Stream
99
100
# Wrap a socket in a Stream
101
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
102
stream = Stream(sock)
103
104
# Read data
105
data = stream.read(1024)
106
107
# Write data (may buffer)
108
stream.write(b'hello world')
109
stream.do_write() # Flush buffer
110
111
# Clean up
112
stream.close()
113
```
114
115
### Demuxer Class
116
117
Wraps a multiplexed Stream to read demultiplexed data. Docker multiplexes streams when there is no PTY attached by sending an 8-byte header followed by data chunks.
118
119
```python { .api }
120
class Demuxer:
121
def __init__(self, stream):
122
"""
123
Initialize a new Demuxer reading from stream.
124
125
Parameters:
126
- stream: Stream instance to demultiplex
127
"""
128
129
def fileno(self):
130
"""
131
Returns the fileno() of the underlying Stream.
132
133
This is useful for select() to work.
134
135
Returns:
136
int - file descriptor number
137
"""
138
139
def set_blocking(self, value):
140
"""
141
Set blocking mode on underlying stream.
142
143
Parameters:
144
- value: bool, True for blocking, False for non-blocking
145
146
Returns:
147
bool - previous blocking state
148
"""
149
150
def read(self, n=4096):
151
"""
152
Read up to n bytes of data from the Stream, after demuxing.
153
154
Less than n bytes may be returned depending on available payload,
155
but never exceeds n. Because demuxing involves scanning 8-byte headers,
156
the actual data read from underlying stream may be greater than n.
157
158
Parameters:
159
- n: int, maximum bytes to return (default: 4096)
160
161
Returns:
162
bytes - demultiplexed data, or None at EOF
163
"""
164
165
def write(self, data):
166
"""
167
Delegates to the underlying Stream.
168
169
Parameters:
170
- data: bytes, data to write
171
172
Returns:
173
Result from underlying stream write
174
"""
175
176
def needs_write(self):
177
"""
178
Delegates to underlying Stream.
179
180
Returns:
181
bool - True if underlying stream needs write
182
"""
183
184
def do_write(self):
185
"""
186
Delegates to underlying Stream.
187
188
Returns:
189
Result from underlying stream do_write
190
"""
191
192
def close(self):
193
"""
194
Delegates to underlying Stream.
195
196
Returns:
197
Result from underlying stream close
198
"""
199
```
200
201
Usage example:
202
203
```python
204
from dockerpty.io import Stream, Demuxer
205
206
# Assume you have a multiplexed stream from Docker
207
docker_stream = Stream(socket_from_docker)
208
demuxer = Demuxer(docker_stream)
209
210
# Read demultiplexed data
211
data = demuxer.read(1024) # Gets clean data without headers
212
```
213
214
### Pump Class
215
216
Stream pump that reads from one stream and writes to another, like a manually managed pipe. Used to facilitate piping data between TTY file descriptors and container PTY descriptors.
217
218
```python { .api }
219
class Pump:
220
def __init__(self, from_stream, to_stream, wait_for_output=True, propagate_close=True):
221
"""
222
Initialize a Pump with a Stream to read from and another to write to.
223
224
Parameters:
225
- from_stream: Stream, source stream to read from
226
- to_stream: Stream, destination stream to write to
227
- wait_for_output: bool, wait for EOF on from_stream to consider pump done (default: True)
228
- propagate_close: bool, close to_stream when from_stream reaches EOF (default: True)
229
"""
230
231
def fileno(self):
232
"""
233
Returns the fileno() of the reader end of the Pump.
234
235
This is useful to allow Pumps to function with select().
236
237
Returns:
238
int - file descriptor number of from_stream
239
"""
240
241
def set_blocking(self, value):
242
"""
243
Set blocking mode on the from_stream.
244
245
Parameters:
246
- value: bool, True for blocking, False for non-blocking
247
248
Returns:
249
bool - previous blocking state
250
"""
251
252
def flush(self, n=4096):
253
"""
254
Flush n bytes of data from the reader Stream to the writer Stream.
255
256
Parameters:
257
- n: int, maximum bytes to flush (default: 4096)
258
259
Returns:
260
int - number of bytes actually flushed, or None if EOF reached
261
"""
262
263
def is_done(self):
264
"""
265
Returns True if the read stream is done (EOF or wait_for_output=False)
266
and the write side has no pending bytes to send.
267
268
Returns:
269
bool - True if pump is completely finished
270
"""
271
```
272
273
Usage example:
274
275
```python
276
import sys
277
from dockerpty.io import Stream, Pump
278
279
# Create streams
280
stdin_stream = Stream(sys.stdin)
281
container_stream = Stream(container_socket)
282
283
# Create pump to send stdin to container
284
pump = Pump(stdin_stream, container_stream, wait_for_output=False)
285
286
# Use with select loop
287
import select
288
while not pump.is_done():
289
ready, _, _ = select.select([pump], [], [], 1.0)
290
if ready:
291
pump.flush()
292
```
293
294
### Utility Functions
295
296
Helper functions for stream and I/O management.
297
298
```python { .api }
299
def set_blocking(fd, blocking=True):
300
"""
301
Set the given file-descriptor blocking or non-blocking.
302
303
Parameters:
304
- fd: file descriptor or file-like object
305
- blocking: bool, True for blocking, False for non-blocking (default: True)
306
307
Returns:
308
bool - original blocking status
309
"""
310
311
def select(read_streams, write_streams, timeout=0):
312
"""
313
Select the streams ready for reading, and streams ready for writing.
314
315
Uses select.select() internally but only returns two lists of ready streams.
316
Handles EINTR interrupts gracefully.
317
318
Parameters:
319
- read_streams: list of streams to check for read readiness
320
- write_streams: list of streams to check for write readiness
321
- timeout: float, timeout in seconds (default: 0)
322
323
Returns:
324
tuple - (ready_read_streams, ready_write_streams)
325
"""
326
```
327
328
## Docker Stream Multiplexing
329
330
Docker multiplexes stdout and stderr streams when no PTY is allocated using this format:
331
332
1. **8-byte header**: First 4 bytes indicate stream (0x01=stdout, 0x02=stderr), next 4 bytes indicate data length
333
2. **Data payload**: Exactly the number of bytes specified in the header
334
335
The Demuxer class handles this protocol transparently, allowing you to read clean stream data without dealing with the multiplexing headers.
336
337
## Error Handling
338
339
### Recoverable Errors
340
341
The Stream class defines `ERRNO_RECOVERABLE` constants for errors that should be retried:
342
- `errno.EINTR`: Interrupted system call
343
- `errno.EDEADLK`: Resource deadlock avoided
344
- `errno.EWOULDBLOCK`: Operation would block
345
346
### Non-blocking I/O
347
348
All stream operations are designed to work with non-blocking I/O:
349
- `read()` returns immediately with available data or None
350
- `write()` buffers data and returns immediately
351
- `do_write()` flushes as much buffered data as possible
352
- Use `select()` to determine when streams are ready for I/O