or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

container-operations.mdcore-pty-management.mdindex.mdmain-entry-points.mdstream-management.mdterminal-control.md

stream-management.mddocs/

0

# Stream Management

1

2

Classes for handling I/O streams, multiplexing/demultiplexing, and data pumping between file descriptors. This module provides the foundation for dockerpty's non-blocking I/O operations and stream management.

3

4

## Capabilities

5

6

### Stream Class

7

8

Generic file-like abstraction on top of `os.read()` and `os.write()` that adds consistency to reading of sockets and files.

9

10

```python { .api }

11

class Stream:

12

ERRNO_RECOVERABLE = [errno.EINTR, errno.EDEADLK, errno.EWOULDBLOCK]

13

14

def __init__(self, fd):

15

"""

16

Initialize the Stream for the file descriptor fd.

17

18

The fd object must have a fileno() method.

19

20

Parameters:

21

- fd: file-like object with fileno() method

22

"""

23

24

def fileno(self):

25

"""

26

Return the fileno() of the file descriptor.

27

28

Returns:

29

int - file descriptor number

30

"""

31

32

def set_blocking(self, value):

33

"""

34

Set the stream to blocking or non-blocking mode.

35

36

Parameters:

37

- value: bool, True for blocking, False for non-blocking

38

39

Returns:

40

bool - previous blocking state

41

"""

42

43

def read(self, n=4096):

44

"""

45

Return n bytes of data from the Stream, or None at end of stream.

46

47

Parameters:

48

- n: int, number of bytes to read (default: 4096)

49

50

Returns:

51

bytes - data read from stream, or None at EOF

52

"""

53

54

def write(self, data):

55

"""

56

Write data to the Stream. Not all data may be written right away.

57

Use select to find when the stream is writeable, and call do_write()

58

to flush the internal buffer.

59

60

Parameters:

61

- data: bytes, data to write

62

63

Returns:

64

int - length of data or None if no data provided

65

"""

66

67

def do_write(self):

68

"""

69

Flushes as much pending data from the internal write buffer as possible.

70

71

Returns:

72

int - number of bytes written

73

"""

74

75

def needs_write(self):

76

"""

77

Returns True if the stream has data waiting to be written.

78

79

Returns:

80

bool - True if write buffer has pending data

81

"""

82

83

def close(self):

84

"""

85

Close the stream.

86

87

The fd is not closed immediately if there's pending write data.

88

89

Returns:

90

None

91

"""

92

```

93

94

Usage example:

95

96

```python

97

import socket

98

from dockerpty.io import Stream

99

100

# Wrap a socket in a Stream

101

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

102

stream = Stream(sock)

103

104

# Read data

105

data = stream.read(1024)

106

107

# Write data (may buffer)

108

stream.write(b'hello world')

109

stream.do_write() # Flush buffer

110

111

# Clean up

112

stream.close()

113

```

114

115

### Demuxer Class

116

117

Wraps a multiplexed Stream to read demultiplexed data. Docker multiplexes streams when there is no PTY attached by sending an 8-byte header followed by data chunks.

118

119

```python { .api }

120

class Demuxer:

121

def __init__(self, stream):

122

"""

123

Initialize a new Demuxer reading from stream.

124

125

Parameters:

126

- stream: Stream instance to demultiplex

127

"""

128

129

def fileno(self):

130

"""

131

Returns the fileno() of the underlying Stream.

132

133

This is useful for select() to work.

134

135

Returns:

136

int - file descriptor number

137

"""

138

139

def set_blocking(self, value):

140

"""

141

Set blocking mode on underlying stream.

142

143

Parameters:

144

- value: bool, True for blocking, False for non-blocking

145

146

Returns:

147

bool - previous blocking state

148

"""

149

150

def read(self, n=4096):

151

"""

152

Read up to n bytes of data from the Stream, after demuxing.

153

154

Less than n bytes may be returned depending on available payload,

155

but never exceeds n. Because demuxing involves scanning 8-byte headers,

156

the actual data read from underlying stream may be greater than n.

157

158

Parameters:

159

- n: int, maximum bytes to return (default: 4096)

160

161

Returns:

162

bytes - demultiplexed data, or None at EOF

163

"""

164

165

def write(self, data):

166

"""

167

Delegates to the underlying Stream.

168

169

Parameters:

170

- data: bytes, data to write

171

172

Returns:

173

Result from underlying stream write

174

"""

175

176

def needs_write(self):

177

"""

178

Delegates to underlying Stream.

179

180

Returns:

181

bool - True if underlying stream needs write

182

"""

183

184

def do_write(self):

185

"""

186

Delegates to underlying Stream.

187

188

Returns:

189

Result from underlying stream do_write

190

"""

191

192

def close(self):

193

"""

194

Delegates to underlying Stream.

195

196

Returns:

197

Result from underlying stream close

198

"""

199

```

200

201

Usage example:

202

203

```python

204

from dockerpty.io import Stream, Demuxer

205

206

# Assume you have a multiplexed stream from Docker

207

docker_stream = Stream(socket_from_docker)

208

demuxer = Demuxer(docker_stream)

209

210

# Read demultiplexed data

211

data = demuxer.read(1024) # Gets clean data without headers

212

```

213

214

### Pump Class

215

216

Stream pump that reads from one stream and writes to another, like a manually managed pipe. Used to facilitate piping data between TTY file descriptors and container PTY descriptors.

217

218

```python { .api }

219

class Pump:

220

def __init__(self, from_stream, to_stream, wait_for_output=True, propagate_close=True):

221

"""

222

Initialize a Pump with a Stream to read from and another to write to.

223

224

Parameters:

225

- from_stream: Stream, source stream to read from

226

- to_stream: Stream, destination stream to write to

227

- wait_for_output: bool, wait for EOF on from_stream to consider pump done (default: True)

228

- propagate_close: bool, close to_stream when from_stream reaches EOF (default: True)

229

"""

230

231

def fileno(self):

232

"""

233

Returns the fileno() of the reader end of the Pump.

234

235

This is useful to allow Pumps to function with select().

236

237

Returns:

238

int - file descriptor number of from_stream

239

"""

240

241

def set_blocking(self, value):

242

"""

243

Set blocking mode on the from_stream.

244

245

Parameters:

246

- value: bool, True for blocking, False for non-blocking

247

248

Returns:

249

bool - previous blocking state

250

"""

251

252

def flush(self, n=4096):

253

"""

254

Flush n bytes of data from the reader Stream to the writer Stream.

255

256

Parameters:

257

- n: int, maximum bytes to flush (default: 4096)

258

259

Returns:

260

int - number of bytes actually flushed, or None if EOF reached

261

"""

262

263

def is_done(self):

264

"""

265

Returns True if the read stream is done (EOF or wait_for_output=False)

266

and the write side has no pending bytes to send.

267

268

Returns:

269

bool - True if pump is completely finished

270

"""

271

```

272

273

Usage example:

274

275

```python

276

import sys

277

from dockerpty.io import Stream, Pump

278

279

# Create streams

280

stdin_stream = Stream(sys.stdin)

281

container_stream = Stream(container_socket)

282

283

# Create pump to send stdin to container

284

pump = Pump(stdin_stream, container_stream, wait_for_output=False)

285

286

# Use with select loop

287

import select

288

while not pump.is_done():

289

ready, _, _ = select.select([pump], [], [], 1.0)

290

if ready:

291

pump.flush()

292

```

293

294

### Utility Functions

295

296

Helper functions for stream and I/O management.

297

298

```python { .api }

299

def set_blocking(fd, blocking=True):

300

"""

301

Set the given file-descriptor blocking or non-blocking.

302

303

Parameters:

304

- fd: file descriptor or file-like object

305

- blocking: bool, True for blocking, False for non-blocking (default: True)

306

307

Returns:

308

bool - original blocking status

309

"""

310

311

def select(read_streams, write_streams, timeout=0):

312

"""

313

Select the streams ready for reading, and streams ready for writing.

314

315

Uses select.select() internally but only returns two lists of ready streams.

316

Handles EINTR interrupts gracefully.

317

318

Parameters:

319

- read_streams: list of streams to check for read readiness

320

- write_streams: list of streams to check for write readiness

321

- timeout: float, timeout in seconds (default: 0)

322

323

Returns:

324

tuple - (ready_read_streams, ready_write_streams)

325

"""

326

```

327

328

## Docker Stream Multiplexing

329

330

Docker multiplexes stdout and stderr streams when no PTY is allocated using this format:

331

332

1. **8-byte header**: First 4 bytes indicate stream (0x01=stdout, 0x02=stderr), next 4 bytes indicate data length

333

2. **Data payload**: Exactly the number of bytes specified in the header

334

335

The Demuxer class handles this protocol transparently, allowing you to read clean stream data without dealing with the multiplexing headers.

336

337

## Error Handling

338

339

### Recoverable Errors

340

341

The Stream class defines `ERRNO_RECOVERABLE` constants for errors that should be retried:

342

- `errno.EINTR`: Interrupted system call

343

- `errno.EDEADLK`: Resource deadlock avoided

344

- `errno.EWOULDBLOCK`: Operation would block

345

346

### Non-blocking I/O

347

348

All stream operations are designed to work with non-blocking I/O:

349

- `read()` returns immediately with available data or None

350

- `write()` buffers data and returns immediately

351

- `do_write()` flushes as much buffered data as possible

352

- Use `select()` to determine when streams are ready for I/O