IPython Kernel for Jupyter — provides the core communication layer between Jupyter frontends and the Python interpreter.

Stream handling for capturing and redirecting stdout/stderr, managing kernel output publishing, and handling interactive input/output. This module provides the infrastructure for kernel communication with frontends through various I/O channels.

Classes for handling kernel output streams and publishing to frontends:
class OutStream:
    """
    Text stream for kernel output.

    Captures and redirects output from stdout/stderr to the kernel's
    IOPub channel for display in Jupyter frontends.
    """

    # Stream attributes
    name: str           # Stream name ('stdout' or 'stderr')
    session: object     # Kernel session for message sending
    pub_thread: object  # Publishing thread reference

    def __init__(self, name='stdout', session=None, pub_thread=None):
        """
        Initialize the output stream.

        Parameters:
        - name (str): Stream name ('stdout' or 'stderr')
        - session: Kernel session used for message sending
        - pub_thread: IOPubThread instance used for publishing

        NOTE(review): the argument order (name, session, pub_thread) follows
        the usage examples in this document; confirm against the real
        ipykernel.iostream.OutStream signature, which may differ.
        """
        self.name = name
        self.session = session
        self.pub_thread = pub_thread

    def write(self, string):
        """
        Write string to the output stream.

        Parameters:
        - string (str): Text to write to output
        """

    def writelines(self, sequence):
        """
        Write a sequence of strings to the output stream.

        Parameters:
        - sequence (iterable): Sequence of strings to write
        """

    def flush(self):
        """
        Flush the output stream.

        Forces any buffered output to be sent to the frontend.
        """

    def close(self):
        """Close the output stream."""


# Thread-based system for managing IOPub socket communication.
class IOPubThread:
    """
    Thread for IOPub socket handling.

    Manages the background thread responsible for publishing kernel
    output, execution results, and other messages to frontends.
    """

    # Thread attributes
    thread: object      # Background thread object
    pub_socket: object  # ZMQ publishing socket
    pipe_in: object     # Input pipe for thread communication
    pipe_out: object    # Output pipe for thread communication

    def __init__(self, socket=None):
        """
        Initialize the IOPub thread around a publishing socket.

        Parameters:
        - socket: ZMQ PUB socket used for publishing messages

        NOTE(review): constructor inferred from the usage examples in this
        document (``IOPubThread(pub_socket)``); confirm against the real
        ipykernel.iostream.IOPubThread signature.
        """
        self.pub_socket = socket
        self.thread = None
        self.pipe_in = None
        self.pipe_out = None

    def start(self):
        """Start the IOPub publishing thread."""

    def stop(self):
        """Stop the IOPub publishing thread."""

    def schedule(self, f, *args, **kwargs):
        """
        Schedule function execution on IOPub thread.

        Parameters:
        - f (callable): Function to execute
        - *args: Positional arguments for function
        - **kwargs: Keyword arguments for function
        """

    def flush(self, timeout=1.0):
        """
        Flush all pending messages.

        Parameters:
        - timeout (float): Maximum time to wait for flush
        """


# Socket wrapper for background operations and message handling.
class BackgroundSocket:
    """
    Socket wrapper for background operations.

    Provides a socket interface that can handle operations in the
    background without blocking the main kernel thread.
    """

    # Socket attributes
    socket: object     # Underlying ZMQ socket
    io_thread: object  # IOPub thread reference

    def __init__(self, io_thread=None):
        """
        Initialize the wrapper around an IOPub thread.

        Parameters:
        - io_thread: IOPubThread whose socket is wrapped

        NOTE(review): constructor inferred from the documented attributes;
        the underlying socket presumably comes from the IOPub thread —
        confirm against the real ipykernel.iostream.BackgroundSocket.
        """
        self.io_thread = io_thread
        self.socket = getattr(io_thread, 'pub_socket', None)

    def send(self, msg, **kwargs):
        """
        Send message through background socket.

        Parameters:
        - msg: Message to send
        - **kwargs: Additional send options
        """

    def recv(self, **kwargs):
        """
        Receive message from background socket.

        Parameters:
        - **kwargs: Additional receive options

        Returns:
        Message received from socket
        """

    def close(self):
        """Close the background socket."""


# Example: redirecting stdout through OutStream
# (example header import, commented to keep this stub file importable):
# from ipykernel.iostream import OutStream
import sys


# Create output stream (typically done by kernel).
# This example shows the concept - actual usage is handled by kernel.
class MockSession:
    """Stand-in session that prints messages instead of publishing them."""

    def send(self, stream, msg_type, content, **kwargs):
        print(f"[{stream}] {msg_type}: {content}")


mock_session = MockSession()
pub_thread = None  # Normally an IOPubThread instance

# Create the output stream that will stand in for sys.stdout.
stdout_stream = OutStream('stdout', mock_session, pub_thread)

# Swap sys.stdout for the kernel stream, always restoring it afterwards.
saved_stdout = sys.stdout
sys.stdout = stdout_stream
try:
    # Anything printed here is captured and sent to the frontend.
    print("This output goes to the Jupyter frontend")
    print("Multiple lines are supported")
    # Explicitly flush if needed.
    sys.stdout.flush()
finally:
    # Restore the original stdout no matter what happened above.
    sys.stdout = saved_stdout

from ipykernel.iostream import IOPubThread
import zmq
import time

# Create IOPub plumbing (typically done by the kernel application).
zmq_context = zmq.Context()
publisher = zmq_context.socket(zmq.PUB)
publisher.bind("tcp://127.0.0.1:*")

# Create and start the IOPub thread around the publishing socket.
iopub = IOPubThread(publisher)
iopub.start()

try:
    def publish_message():
        """Runs on the IOPub thread once scheduled."""
        print("Publishing message from IOPub thread")
        return "Message published"

    # Hand the function to the IOPub thread for execution.
    iopub.schedule(publish_message)
    # Give the background thread a moment to run it.
    time.sleep(0.1)
    # Push out anything still pending.
    iopub.flush()
finally:
    # Always tear down the thread and ZMQ resources.
    iopub.stop()
    publisher.close()
    zmq_context.term()

from ipykernel.iostream import OutStream
import sys
import io    # kept from the original example (unused here)
import time  # hoisted: send() below calls time.time(); importing after the
             # class definition (as the original did) only worked by accident
             # of execution order.


class CustomOutputCapture:
    """Custom output capture for kernel-like behavior.

    The instance doubles as the mock session: it exposes the ``send()``
    method an OutStream's session is expected to provide, and records
    every message it receives.
    """

    def __init__(self):
        self.captured_output = []
        # The capture object acts as its own session.
        self.mock_session = self

    def send(self, stream, msg_type, content, **kwargs):
        """Mock session send method: record the message with a timestamp."""
        self.captured_output.append({
            'stream': stream,
            'msg_type': msg_type,
            'content': content,
            'timestamp': time.time()
        })

    def capture_output(self, func, *args, **kwargs):
        """Run ``func`` with stdout/stderr redirected into capture streams.

        Returns whatever ``func`` returns; the captured messages are
        available via get_captured_output().
        """
        # Create output streams wired back to this object.
        stdout_stream = OutStream('stdout', self.mock_session, None)
        stderr_stream = OutStream('stderr', self.mock_session, None)
        # Save original streams.
        old_stdout = sys.stdout
        old_stderr = sys.stderr
        try:
            # Redirect to capture streams.
            sys.stdout = stdout_stream
            sys.stderr = stderr_stream
            # Execute function.
            result = func(*args, **kwargs)
            # Flush streams so buffered output reaches send().
            sys.stdout.flush()
            sys.stderr.flush()
            return result
        finally:
            # Restore original streams even if func raised.
            sys.stdout = old_stdout
            sys.stderr = old_stderr

    def get_captured_output(self):
        """Get a copy of all captured output records."""
        return self.captured_output.copy()

    def clear_output(self):
        """Clear captured output."""
        self.captured_output.clear()


# Usage example
capture = CustomOutputCapture()


def test_function():
    print("This is stdout output")
    print("Multiple lines of output", file=sys.stdout)
    print("This goes to stderr", file=sys.stderr)
    return "Function completed"


# Capture output from function.
result = capture.capture_output(test_function)

# Review captured output.
print("Function result:", result)
print("\nCaptured output:")
for output in capture.get_captured_output():
    # NOTE(review): assumes content is a dict with a 'text' key — confirm
    # against the payload the real OutStream passes to session.send().
    print(f" {output['stream']}: {output['content']['text']}")

from ipykernel.iostream import OutStream
import sys
import contextlib


class DebugOutputManager:
    """Manage output streams for debugging purposes.

    Acts as its own session: OutStream instances created by
    capture_streams() report back through send(), which appends every
    message to the debug log.
    """

    def __init__(self):
        self.debug_log = []
        # The manager itself plays the session role.
        self.session = self

    def send(self, stream, msg_type, content, **kwargs):
        """Record every message for later inspection."""
        self.debug_log.append({
            'stream': stream,
            'type': msg_type,
            'content': content,
            'kwargs': kwargs
        })

    @contextlib.contextmanager
    def capture_streams(self):
        """Context manager that swaps in debug streams for stdout/stderr."""
        captured_stdout = OutStream('stdout', self.session, None)
        captured_stderr = OutStream('stderr', self.session, None)
        prev_stdout, prev_stderr = sys.stdout, sys.stderr
        try:
            sys.stdout, sys.stderr = captured_stdout, captured_stderr
            yield self
        finally:
            # Put the real streams back regardless of what the body did.
            sys.stdout, sys.stderr = prev_stdout, prev_stderr

    def print_debug_log(self):
        """Dump everything collected in the debug log."""
        print("=== Debug Output Log ===")
        for i, entry in enumerate(self.debug_log):
            print(f"{i+1}. Stream: {entry['stream']}")
            print(f" Type: {entry['type']}")
            print(f" Content: {entry['content']}")
            print()


# Usage
debug_manager = DebugOutputManager()

with debug_manager.capture_streams():
    print("This output will be captured")
    print("Error message", file=sys.stderr)
    # Simulate some processing.
    for i in range(3):
        print(f"Processing item {i+1}")

# Review debug information.
debug_manager.print_debug_log()

from ipykernel.iostream import IOPubThread
import zmq
import threading
import time
import queue


class BackgroundProcessor:
    """Process tasks in background with IOPub communication."""

    def __init__(self):
        # Setup ZMQ for IOPub.
        self.context = zmq.Context()
        self.pub_socket = self.context.socket(zmq.PUB)
        self.pub_socket.bind("tcp://127.0.0.1:*")
        # Create IOPub thread.
        self.iopub_thread = IOPubThread(self.pub_socket)
        self.iopub_thread.start()
        # Task queue.
        self.task_queue = queue.Queue()
        self.processing = False

    def add_task(self, task_func, *args, **kwargs):
        """Add task to processing queue."""
        self.task_queue.put((task_func, args, kwargs))

    def process_tasks(self):
        """Process all queued tasks in background.

        Returns:
        The worker thread, so callers can join() it.
        """
        self.processing = True

        def worker():
            while self.processing and not self.task_queue.empty():
                try:
                    task_func, args, kwargs = self.task_queue.get(timeout=1.0)

                    # BUGFIX: bind the current task as default arguments.
                    # schedule() runs execute_task later on the IOPub thread,
                    # and a plain closure would see whatever values
                    # task_func/args/kwargs were rebound to by subsequent
                    # loop iterations (late-binding closure pitfall).
                    def execute_task(task_func=task_func, args=args, kwargs=kwargs):
                        try:
                            result = task_func(*args, **kwargs)
                            print(f"Task completed: {result}")
                        except Exception as e:
                            print(f"Task failed: {e}")

                    # Schedule task execution on IOPub thread.
                    self.iopub_thread.schedule(execute_task)
                except queue.Empty:
                    break

        # Start worker thread.
        worker_thread = threading.Thread(target=worker)
        worker_thread.start()
        return worker_thread

    def stop_processing(self):
        """Stop background processing."""
        self.processing = False
        self.iopub_thread.flush()

    def cleanup(self):
        """Cleanup resources."""
        self.stop_processing()
        self.iopub_thread.stop()
        self.pub_socket.close()
        self.context.term()


# Usage example
def sample_task(name, duration):
    """Sample task that takes some time."""
    print(f"Starting task: {name}")
    time.sleep(duration)
    return f"Task {name} completed after {duration}s"


# Create processor.
processor = BackgroundProcessor()

# Add tasks.
processor.add_task(sample_task, "Task1", 0.5)
processor.add_task(sample_task, "Task2", 0.3)
processor.add_task(sample_task, "Task3", 0.7)

# Process tasks.
worker_thread = processor.process_tasks()

# Wait for completion.
worker_thread.join()

# Cleanup.
processor.cleanup()

# Install with Tessl CLI:
npx tessl i tessl/pypi-ipykernel