0
# I/O and Streaming
1
2
Stream handling for capturing and redirecting stdout/stderr, managing kernel output publishing, and handling interactive input/output. Provides the infrastructure for kernel communication with frontends through various I/O channels.
3
4
## Capabilities
5
6
### Output Stream Classes
7
8
Classes for handling kernel output streams and publishing to frontends.
9
10
```python { .api }
11
class OutStream:
12
"""
13
Text stream for kernel output.
14
15
Captures and redirects output from stdout/stderr to the kernel's
16
IOPub channel for display in Jupyter frontends.
17
"""
18
19
def write(self, string):
20
"""
21
Write string to the output stream.
22
23
Parameters:
24
- string (str): Text to write to output
25
"""
26
27
def writelines(self, sequence):
28
"""
29
Write a sequence of strings to the output stream.
30
31
Parameters:
32
- sequence (iterable): Sequence of strings to write
33
"""
34
35
def flush(self):
36
"""
37
Flush the output stream.
38
39
Forces any buffered output to be sent to the frontend.
40
"""
41
42
def close(self):
43
"""Close the output stream."""
44
45
# Stream attributes
46
name: str # Stream name ('stdout' or 'stderr')
47
session: object # Kernel session for message sending
48
pub_thread: object # Publishing thread reference
49
```
50
51
### IOPub Thread Management
52
53
Thread-based system for managing IOPub socket communication.
54
55
```python { .api }
56
class IOPubThread:
57
"""
58
Thread for IOPub socket handling.
59
60
Manages the background thread responsible for publishing kernel
61
output, execution results, and other messages to frontends.
62
"""
63
64
def start(self):
65
"""Start the IOPub publishing thread."""
66
67
def stop(self):
68
"""Stop the IOPub publishing thread."""
69
70
def schedule(self, f, *args, **kwargs):
71
"""
72
Schedule function execution on IOPub thread.
73
74
Parameters:
75
- f (callable): Function to execute
76
- *args: Positional arguments for function
77
- **kwargs: Keyword arguments for function
78
"""
79
80
def flush(self, timeout=1.0):
81
"""
82
Flush all pending messages.
83
84
Parameters:
85
- timeout (float): Maximum time to wait for flush
86
"""
87
88
# Thread attributes
89
thread: object # Background thread object
90
pub_socket: object # ZMQ publishing socket
91
pipe_in: object # Input pipe for thread communication
92
pipe_out: object # Output pipe for thread communication
93
```
94
95
### Background Socket Wrapper
96
97
Socket wrapper for background operations and message handling.
98
99
```python { .api }
100
class BackgroundSocket:
101
"""
102
Socket wrapper for background operations.
103
104
Provides a socket interface that can handle operations in the
105
background without blocking the main kernel thread.
106
"""
107
108
def send(self, msg, **kwargs):
109
"""
110
Send message through background socket.
111
112
Parameters:
113
- msg: Message to send
114
- **kwargs: Additional send options
115
"""
116
117
def recv(self, **kwargs):
118
"""
119
Receive message from background socket.
120
121
Parameters:
122
- **kwargs: Additional receive options
123
124
Returns:
125
Message received from socket
126
"""
127
128
def close(self):
129
"""Close the background socket."""
130
131
# Socket attributes
132
socket: object # Underlying ZMQ socket
133
io_thread: object # IOPub thread reference
134
```
135
136
## Usage Examples
137
138
### Basic Output Stream Usage
139
140
```python
141
from ipykernel.iostream import OutStream
142
import sys
143
144
# Create output stream (typically done by kernel)
145
# This example shows the concept - actual usage is handled by kernel
146
class MockSession:
147
def send(self, stream, msg_type, content, **kwargs):
148
print(f"[{stream}] {msg_type}: {content}")
149
150
session = MockSession()
151
pub_thread = None # Normally an IOPubThread instance
152
153
# Create output stream
154
stdout_stream = OutStream('stdout', session, pub_thread)
155
156
# Redirect stdout to kernel stream
157
old_stdout = sys.stdout
158
sys.stdout = stdout_stream
159
160
try:
161
# Output will be captured and sent to frontend
162
print("This output goes to the Jupyter frontend")
163
print("Multiple lines are supported")
164
165
# Explicitly flush if needed
166
sys.stdout.flush()
167
168
finally:
169
# Restore original stdout
170
sys.stdout = old_stdout
171
```
172
173
### IOPub Thread Management
174
175
```python
176
from ipykernel.iostream import IOPubThread
177
import zmq
178
import time
179
180
# Create IOPub thread (typically done by kernel application)
181
context = zmq.Context()
182
pub_socket = context.socket(zmq.PUB)
183
pub_socket.bind("tcp://127.0.0.1:*")
184
185
# Create and start IOPub thread
186
iopub_thread = IOPubThread(pub_socket)
187
iopub_thread.start()
188
189
try:
190
# Schedule function to run on IOPub thread
191
def publish_message():
192
print("Publishing message from IOPub thread")
193
return "Message published"
194
195
# Schedule the function
196
iopub_thread.schedule(publish_message)
197
198
# Wait a moment for execution
199
time.sleep(0.1)
200
201
# Flush any pending messages
202
iopub_thread.flush()
203
204
finally:
205
# Stop the thread
206
iopub_thread.stop()
207
pub_socket.close()
208
context.term()
209
```
210
211
### Custom Output Capture
212
213
```python
214
from ipykernel.iostream import OutStream
215
import sys
216
import io
217
218
class CustomOutputCapture:
219
"""Custom output capture for kernel-like behavior."""
220
221
def __init__(self):
222
self.captured_output = []
223
self.mock_session = self
224
225
def send(self, stream, msg_type, content, **kwargs):
226
"""Mock session send method."""
227
self.captured_output.append({
228
'stream': stream,
229
'msg_type': msg_type,
230
'content': content,
231
'timestamp': time.time()
232
})
233
234
def capture_output(self, func, *args, **kwargs):
235
"""Capture output from function execution."""
236
# Create output streams
237
stdout_stream = OutStream('stdout', self.mock_session, None)
238
stderr_stream = OutStream('stderr', self.mock_session, None)
239
240
# Save original streams
241
old_stdout = sys.stdout
242
old_stderr = sys.stderr
243
244
try:
245
# Redirect to capture streams
246
sys.stdout = stdout_stream
247
sys.stderr = stderr_stream
248
249
# Execute function
250
result = func(*args, **kwargs)
251
252
# Flush streams
253
sys.stdout.flush()
254
sys.stderr.flush()
255
256
return result
257
258
finally:
259
# Restore original streams
260
sys.stdout = old_stdout
261
sys.stderr = old_stderr
262
263
def get_captured_output(self):
264
"""Get all captured output."""
265
return self.captured_output.copy()
266
267
def clear_output(self):
268
"""Clear captured output."""
269
self.captured_output.clear()
270
271
# Usage example
272
import time
273
274
capture = CustomOutputCapture()
275
276
def test_function():
277
print("This is stdout output")
278
print("Multiple lines of output", file=sys.stdout)
279
print("This goes to stderr", file=sys.stderr)
280
return "Function completed"
281
282
# Capture output from function
283
result = capture.capture_output(test_function)
284
285
# Review captured output
286
print("Function result:", result)
287
print("\nCaptured output:")
288
for output in capture.get_captured_output():
289
print(f" {output['stream']}: {output['content']['text']}")
290
```
291
292
### Stream Redirection for Debugging
293
294
```python
295
from ipykernel.iostream import OutStream
296
import sys
297
import contextlib
298
299
class DebugOutputManager:
300
"""Manage output streams for debugging purposes."""
301
302
def __init__(self):
303
self.debug_log = []
304
self.session = self
305
306
def send(self, stream, msg_type, content, **kwargs):
307
"""Log all output for debugging."""
308
self.debug_log.append({
309
'stream': stream,
310
'type': msg_type,
311
'content': content,
312
'kwargs': kwargs
313
})
314
315
@contextlib.contextmanager
316
def capture_streams(self):
317
"""Context manager for stream capture."""
318
# Create debug streams
319
stdout_stream = OutStream('stdout', self.session, None)
320
stderr_stream = OutStream('stderr', self.session, None)
321
322
# Save original streams
323
original_stdout = sys.stdout
324
original_stderr = sys.stderr
325
326
try:
327
# Redirect streams
328
sys.stdout = stdout_stream
329
sys.stderr = stderr_stream
330
331
yield self
332
333
finally:
334
# Restore streams
335
sys.stdout = original_stdout
336
sys.stderr = original_stderr
337
338
def print_debug_log(self):
339
"""Print captured debug information."""
340
print("=== Debug Output Log ===")
341
for i, entry in enumerate(self.debug_log):
342
print(f"{i+1}. Stream: {entry['stream']}")
343
print(f" Type: {entry['type']}")
344
print(f" Content: {entry['content']}")
345
print()
346
347
# Usage
348
debug_manager = DebugOutputManager()
349
350
with debug_manager.capture_streams():
351
print("This output will be captured")
352
print("Error message", file=sys.stderr)
353
354
# Simulate some processing
355
for i in range(3):
356
print(f"Processing item {i+1}")
357
358
# Review debug information
359
debug_manager.print_debug_log()
360
```
361
362
### Background Processing with IOPub
363
364
```python
365
from ipykernel.iostream import IOPubThread
366
import zmq
367
import threading
368
import time
369
import queue
370
371
class BackgroundProcessor:
372
"""Process tasks in background with IOPub communication."""
373
374
def __init__(self):
375
# Setup ZMQ for IOPub
376
self.context = zmq.Context()
377
self.pub_socket = self.context.socket(zmq.PUB)
378
self.pub_socket.bind("tcp://127.0.0.1:*")
379
380
# Create IOPub thread
381
self.iopub_thread = IOPubThread(self.pub_socket)
382
self.iopub_thread.start()
383
384
# Task queue
385
self.task_queue = queue.Queue()
386
self.processing = False
387
388
def add_task(self, task_func, *args, **kwargs):
389
"""Add task to processing queue."""
390
self.task_queue.put((task_func, args, kwargs))
391
392
def process_tasks(self):
393
"""Process all queued tasks in background."""
394
self.processing = True
395
396
def worker():
397
while self.processing and not self.task_queue.empty():
398
try:
399
task_func, args, kwargs = self.task_queue.get(timeout=1.0)
400
401
# Schedule task execution on IOPub thread
402
def execute_task():
403
try:
404
result = task_func(*args, **kwargs)
405
print(f"Task completed: {result}")
406
except Exception as e:
407
print(f"Task failed: {e}")
408
409
self.iopub_thread.schedule(execute_task)
410
411
except queue.Empty:
412
break
413
414
# Start worker thread
415
worker_thread = threading.Thread(target=worker)
416
worker_thread.start()
417
418
return worker_thread
419
420
def stop_processing(self):
421
"""Stop background processing."""
422
self.processing = False
423
self.iopub_thread.flush()
424
425
def cleanup(self):
426
"""Cleanup resources."""
427
self.stop_processing()
428
self.iopub_thread.stop()
429
self.pub_socket.close()
430
self.context.term()
431
432
# Usage example
433
def sample_task(name, duration):
434
"""Sample task that takes some time."""
435
print(f"Starting task: {name}")
436
time.sleep(duration)
437
return f"Task {name} completed after {duration}s"
438
439
# Create processor
440
processor = BackgroundProcessor()
441
442
# Add tasks
443
processor.add_task(sample_task, "Task1", 0.5)
444
processor.add_task(sample_task, "Task2", 0.3)
445
processor.add_task(sample_task, "Task3", 0.7)
446
447
# Process tasks
448
worker_thread = processor.process_tasks()
449
450
# Wait for completion
451
worker_thread.join()
452
453
# Cleanup
454
processor.cleanup()
455
```