or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

communication-framework.md
connection-management.md
core-kernel.md
data-utilities.md
gui-integration.md
in-process-kernels.md
index.md
io-streaming.md
kernel-application.md
kernel-embedding.md
matplotlib-integration.md

docs/io-streaming.md

0

# I/O and Streaming

1

2

This module covers stream handling: capturing and redirecting stdout/stderr, managing kernel output publishing, and handling interactive input/output. It provides the infrastructure for kernel communication with frontends through various I/O channels.

3

4

## Capabilities

5

6

### Output Stream Classes

7

8

Classes for handling kernel output streams and publishing to frontends.

9

10

```python { .api }

11

class OutStream:

12

"""

13

Text stream for kernel output.

14

15

Captures and redirects output from stdout/stderr to the kernel's

16

IOPub channel for display in Jupyter frontends.

17

"""

18

19

def write(self, string):

20

"""

21

Write string to the output stream.

22

23

Parameters:

24

- string (str): Text to write to output

25

"""

26

27

def writelines(self, sequence):

28

"""

29

Write a sequence of strings to the output stream.

30

31

Parameters:

32

- sequence (iterable): Sequence of strings to write

33

"""

34

35

def flush(self):

36

"""

37

Flush the output stream.

38

39

Forces any buffered output to be sent to the frontend.

40

"""

41

42

def close(self):

43

"""Close the output stream."""

44

45

# Stream attributes

46

name: str # Stream name ('stdout' or 'stderr')

47

session: object # Kernel session for message sending

48

pub_thread: object # Publishing thread reference

49

```

50

51

### IOPub Thread Management

52

53

Thread-based system for managing IOPub socket communication.

54

55

```python { .api }

56

class IOPubThread:

57

"""

58

Thread for IOPub socket handling.

59

60

Manages the background thread responsible for publishing kernel

61

output, execution results, and other messages to frontends.

62

"""

63

64

def start(self):

65

"""Start the IOPub publishing thread."""

66

67

def stop(self):

68

"""Stop the IOPub publishing thread."""

69

70

def schedule(self, f, *args, **kwargs):

71

"""

72

Schedule function execution on IOPub thread.

73

74

Parameters:

75

- f (callable): Function to execute

76

- *args: Positional arguments for function

77

- **kwargs: Keyword arguments for function

78

"""

79

80

def flush(self, timeout=1.0):

81

"""

82

Flush all pending messages.

83

84

Parameters:

85

- timeout (float): Maximum time to wait for flush

86

"""

87

88

# Thread attributes

89

thread: object # Background thread object

90

pub_socket: object # ZMQ publishing socket

91

pipe_in: object # Input pipe for thread communication

92

pipe_out: object # Output pipe for thread communication

93

```

94

95

### Background Socket Wrapper

96

97

Socket wrapper for background operations and message handling.

98

99

```python { .api }

100

class BackgroundSocket:

101

"""

102

Socket wrapper for background operations.

103

104

Provides a socket interface that can handle operations in the

105

background without blocking the main kernel thread.

106

"""

107

108

def send(self, msg, **kwargs):

109

"""

110

Send message through background socket.

111

112

Parameters:

113

- msg: Message to send

114

- **kwargs: Additional send options

115

"""

116

117

def recv(self, **kwargs):

118

"""

119

Receive message from background socket.

120

121

Parameters:

122

- **kwargs: Additional receive options

123

124

Returns:

125

Message received from socket

126

"""

127

128

def close(self):

129

"""Close the background socket."""

130

131

# Socket attributes

132

socket: object # Underlying ZMQ socket

133

io_thread: object # IOPub thread reference

134

```

135

136

## Usage Examples

137

138

### Basic Output Stream Usage

139

140

```python

141

from ipykernel.iostream import OutStream

142

import sys

143

144

# Create output stream (typically done by kernel)

145

# This example shows the concept - actual usage is handled by kernel

146

class MockSession:

147

def send(self, stream, msg_type, content, **kwargs):

148

print(f"[{stream}] {msg_type}: {content}")

149

150

session = MockSession()

151

pub_thread = None # Normally an IOPubThread instance

152

153

# Create output stream

154

stdout_stream = OutStream(session, pub_thread, 'stdout')

155

156

# Redirect stdout to kernel stream

157

old_stdout = sys.stdout

158

sys.stdout = stdout_stream

159

160

try:

161

# Output will be captured and sent to frontend

162

print("This output goes to the Jupyter frontend")

163

print("Multiple lines are supported")

164

165

# Explicitly flush if needed

166

sys.stdout.flush()

167

168

finally:

169

# Restore original stdout

170

sys.stdout = old_stdout

171

```

172

173

### IOPub Thread Management

174

175

```python

176

from ipykernel.iostream import IOPubThread

177

import zmq

178

import time

179

180

# Create IOPub thread (typically done by kernel application)

181

context = zmq.Context()

182

pub_socket = context.socket(zmq.PUB)

183

pub_socket.bind("tcp://127.0.0.1:*")

184

185

# Create and start IOPub thread

186

iopub_thread = IOPubThread(pub_socket)

187

iopub_thread.start()

188

189

try:

190

# Schedule function to run on IOPub thread

191

def publish_message():

192

print("Publishing message from IOPub thread")

193

return "Message published"

194

195

# Schedule the function

196

iopub_thread.schedule(publish_message)

197

198

# Wait a moment for execution

199

time.sleep(0.1)

200

201

# Flush any pending messages

202

iopub_thread.flush()

203

204

finally:

205

# Stop the thread

206

iopub_thread.stop()

207

pub_socket.close()

208

context.term()

209

```

210

211

### Custom Output Capture

212

213

```python

214

from ipykernel.iostream import OutStream

215

import sys

216

import io

217

218

class CustomOutputCapture:

219

"""Custom output capture for kernel-like behavior."""

220

221

def __init__(self):

222

self.captured_output = []

223

self.mock_session = self

224

225

def send(self, stream, msg_type, content, **kwargs):

226

"""Mock session send method."""

227

self.captured_output.append({

228

'stream': stream,

229

'msg_type': msg_type,

230

'content': content,

231

'timestamp': time.time()

232

})

233

234

def capture_output(self, func, *args, **kwargs):

235

"""Capture output from function execution."""

236

# Create output streams

237

stdout_stream = OutStream(self.mock_session, None, 'stdout')

238

stderr_stream = OutStream(self.mock_session, None, 'stderr')

239

240

# Save original streams

241

old_stdout = sys.stdout

242

old_stderr = sys.stderr

243

244

try:

245

# Redirect to capture streams

246

sys.stdout = stdout_stream

247

sys.stderr = stderr_stream

248

249

# Execute function

250

result = func(*args, **kwargs)

251

252

# Flush streams

253

sys.stdout.flush()

254

sys.stderr.flush()

255

256

return result

257

258

finally:

259

# Restore original streams

260

sys.stdout = old_stdout

261

sys.stderr = old_stderr

262

263

def get_captured_output(self):

264

"""Get all captured output."""

265

return self.captured_output.copy()

266

267

def clear_output(self):

268

"""Clear captured output."""

269

self.captured_output.clear()

270

271

# Usage example

272

import time

273

274

capture = CustomOutputCapture()

275

276

def test_function():

277

print("This is stdout output")

278

print("Multiple lines of output", file=sys.stdout)

279

print("This goes to stderr", file=sys.stderr)

280

return "Function completed"

281

282

# Capture output from function

283

result = capture.capture_output(test_function)

284

285

# Review captured output

286

print("Function result:", result)

287

print("\nCaptured output:")

288

for output in capture.get_captured_output():

289

print(f" {output['stream']}: {output['content']['text']}")

290

```

291

292

### Stream Redirection for Debugging

293

294

```python

295

from ipykernel.iostream import OutStream

296

import sys

297

import contextlib

298

299

class DebugOutputManager:

300

"""Manage output streams for debugging purposes."""

301

302

def __init__(self):

303

self.debug_log = []

304

self.session = self

305

306

def send(self, stream, msg_type, content, **kwargs):

307

"""Log all output for debugging."""

308

self.debug_log.append({

309

'stream': stream,

310

'type': msg_type,

311

'content': content,

312

'kwargs': kwargs

313

})

314

315

@contextlib.contextmanager

316

def capture_streams(self):

317

"""Context manager for stream capture."""

318

# Create debug streams

319

stdout_stream = OutStream(self.session, None, 'stdout')

320

stderr_stream = OutStream(self.session, None, 'stderr')

321

322

# Save original streams

323

original_stdout = sys.stdout

324

original_stderr = sys.stderr

325

326

try:

327

# Redirect streams

328

sys.stdout = stdout_stream

329

sys.stderr = stderr_stream

330

331

yield self

332

333

finally:

334

# Restore streams

335

sys.stdout = original_stdout

336

sys.stderr = original_stderr

337

338

def print_debug_log(self):

339

"""Print captured debug information."""

340

print("=== Debug Output Log ===")

341

for i, entry in enumerate(self.debug_log):

342

print(f"{i+1}. Stream: {entry['stream']}")

343

print(f" Type: {entry['type']}")

344

print(f" Content: {entry['content']}")

345

print()

346

347

# Usage

348

debug_manager = DebugOutputManager()

349

350

with debug_manager.capture_streams():

351

print("This output will be captured")

352

print("Error message", file=sys.stderr)

353

354

# Simulate some processing

355

for i in range(3):

356

print(f"Processing item {i+1}")

357

358

# Review debug information

359

debug_manager.print_debug_log()

360

```

361

362

### Background Processing with IOPub

363

364

```python

365

from ipykernel.iostream import IOPubThread

366

import zmq

367

import threading

368

import time

369

import queue

370

371

class BackgroundProcessor:

372

"""Process tasks in background with IOPub communication."""

373

374

def __init__(self):

375

# Setup ZMQ for IOPub

376

self.context = zmq.Context()

377

self.pub_socket = self.context.socket(zmq.PUB)

378

self.pub_socket.bind("tcp://127.0.0.1:*")

379

380

# Create IOPub thread

381

self.iopub_thread = IOPubThread(self.pub_socket)

382

self.iopub_thread.start()

383

384

# Task queue

385

self.task_queue = queue.Queue()

386

self.processing = False

387

388

def add_task(self, task_func, *args, **kwargs):

389

"""Add task to processing queue."""

390

self.task_queue.put((task_func, args, kwargs))

391

392

def process_tasks(self):

393

"""Process all queued tasks in background."""

394

self.processing = True

395

396

def worker():

397

while self.processing and not self.task_queue.empty():

398

try:

399

task_func, args, kwargs = self.task_queue.get(timeout=1.0)

400

401

# Schedule task execution on IOPub thread

402

def execute_task():

403

try:

404

result = task_func(*args, **kwargs)

405

print(f"Task completed: {result}")

406

except Exception as e:

407

print(f"Task failed: {e}")

408

409

self.iopub_thread.schedule(execute_task)

410

411

except queue.Empty:

412

break

413

414

# Start worker thread

415

worker_thread = threading.Thread(target=worker)

416

worker_thread.start()

417

418

return worker_thread

419

420

def stop_processing(self):

421

"""Stop background processing."""

422

self.processing = False

423

self.iopub_thread.flush()

424

425

def cleanup(self):

426

"""Cleanup resources."""

427

self.stop_processing()

428

self.iopub_thread.stop()

429

self.pub_socket.close()

430

self.context.term()

431

432

# Usage example

433

def sample_task(name, duration):

434

"""Sample task that takes some time."""

435

print(f"Starting task: {name}")

436

time.sleep(duration)

437

return f"Task {name} completed after {duration}s"

438

439

# Create processor

440

processor = BackgroundProcessor()

441

442

# Add tasks

443

processor.add_task(sample_task, "Task1", 0.5)

444

processor.add_task(sample_task, "Task2", 0.3)

445

processor.add_task(sample_task, "Task3", 0.7)

446

447

# Process tasks

448

worker_thread = processor.process_tasks()

449

450

# Wait for completion

451

worker_thread.join()

452

453

# Cleanup

454

processor.cleanup()

455

```