CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-ipykernel

IPython Kernel for Jupyter - provides the core communication layer between Jupyter frontends and the Python interpreter

Pending
Overview
Eval results
Files

docs/io-streaming.md

I/O and Streaming

Stream handling for capturing and redirecting stdout/stderr, managing kernel output publishing, and handling interactive input/output. Provides the infrastructure for kernel communication with frontends through various I/O channels.

Capabilities

Output Stream Classes

Classes for handling kernel output streams and publishing to frontends.

class OutStream:
    """
    Text stream that carries kernel output.

    Anything written to stdout/stderr is captured and redirected to the
    kernel's IOPub channel so that Jupyter frontends can display it.
    """

    # Stream attributes
    name: str            # Name of the stream ('stdout' or 'stderr')
    session: object      # Kernel session used for sending messages
    pub_thread: object   # Reference to the publishing thread

    def write(self, string: str):
        """
        Write a string to the output stream.

        Parameters:
        - string (str): Text to write to output
        """

    def writelines(self, sequence):
        """
        Write every string of a sequence to the output stream.

        Parameters:
        - sequence (iterable): Sequence of strings to write
        """

    def flush(self):
        """
        Flush the output stream.

        Any output that is still buffered is forced out to the frontend.
        """

    def close(self):
        """Close the output stream."""

IOPub Thread Management

Thread-based system for managing IOPub socket communication.

class IOPubThread:
    """
    Thread that handles the IOPub socket.

    Owns the background thread that publishes kernel output, execution
    results, and other messages to frontends.
    """

    # NOTE(review): this is a documentation sketch of the API; confirm the
    # exact signatures of schedule() and flush() against the installed
    # ipykernel.iostream before relying on them.

    # Thread attributes
    thread: object       # Background thread object
    pub_socket: object   # ZMQ publishing socket
    pipe_in: object      # Input pipe for thread communication
    pipe_out: object     # Output pipe for thread communication

    def start(self):
        """Start the IOPub publishing thread."""

    def stop(self):
        """Stop the IOPub publishing thread."""

    def schedule(self, f, *args, **kwargs):
        """
        Schedule a function for execution on the IOPub thread.

        Parameters:
        - f (callable): Function to execute
        - *args: Positional arguments for function
        - **kwargs: Keyword arguments for function
        """

    def flush(self, timeout: float = 1.0):
        """
        Flush all pending messages.

        Parameters:
        - timeout (float): Maximum time to wait for flush
        """

Background Socket Wrapper

Socket wrapper for background operations and message handling.

class BackgroundSocket:
    """
    Wrapper around a socket for background operations.

    Exposes a socket interface whose operations can be handled in the
    background, so the main kernel thread is not blocked.
    """

    # Socket attributes
    socket: object       # Underlying ZMQ socket
    io_thread: object    # IOPub thread reference

    def send(self, msg, **kwargs):
        """
        Send a message through the background socket.

        Parameters:
        - msg: Message to send
        - **kwargs: Additional send options
        """

    def recv(self, **kwargs):
        """
        Receive a message from the background socket.

        Parameters:
        - **kwargs: Additional receive options

        Returns:
        Message received from socket
        """

    def close(self):
        """Close the background socket."""

Usage Examples

Basic Output Stream Usage

from ipykernel.iostream import OutStream
import sys

# Create output stream (typically done by kernel)
# This example shows the concept - actual usage is handled by kernel
class MockSession:
    """Stand-in session that prints each message instead of sending it."""

    def send(self, stream, msg_type, content, **kwargs):
        """
        Print one line describing the message.

        Parameters:
        - stream: Value shown in square brackets at the start of the line
        - msg_type: Message type label
        - content: Message payload
        - **kwargs: Accepted and ignored
        """
        line = f"[{stream}] {msg_type}: {content}"
        print(line)

session = MockSession()
pub_thread = None  # Normally an IOPubThread instance

# Create output stream.
# OutStream takes the session first, then the publishing thread, then the
# stream name; passing the name first would bind it to the session slot.
# NOTE(review): a real OutStream most likely needs an actual IOPubThread (not
# None) and a full session object — confirm before running outside a kernel.
stdout_stream = OutStream(session, pub_thread, 'stdout')

# Redirect stdout to kernel stream
old_stdout = sys.stdout
sys.stdout = stdout_stream

try:
    # Output will be captured and sent to frontend
    print("This output goes to the Jupyter frontend")
    print("Multiple lines are supported")

    # Explicitly flush if needed
    sys.stdout.flush()

finally:
    # Restore original stdout even if printing or flushing raised
    sys.stdout = old_stdout

IOPub Thread Management

from ipykernel.iostream import IOPubThread
import zmq
import time

# Create IOPub thread (typically done by kernel application)
# The "*" in the bind address is ZeroMQ's wildcard port, so no fixed port
# number has to be chosen here.
context = zmq.Context()
pub_socket = context.socket(zmq.PUB)
pub_socket.bind("tcp://127.0.0.1:*")

# Create and start IOPub thread
iopub_thread = IOPubThread(pub_socket)
iopub_thread.start()

try:
    # Schedule function to run on IOPub thread
    def publish_message():
        """Print a marker line and return a status string."""
        print("Publishing message from IOPub thread")
        return "Message published"
    
    # Schedule the function
    # NOTE(review): the return value of publish_message is not used anywhere
    # in this example; presumably schedule() discards it — confirm.
    iopub_thread.schedule(publish_message)
    
    # Wait a moment for execution
    time.sleep(0.1)
    
    # Flush any pending messages
    # NOTE(review): confirm that the installed IOPubThread provides flush().
    iopub_thread.flush()
    
finally:
    # Tear down in reverse order of creation: thread, socket, then context
    iopub_thread.stop()
    pub_socket.close()
    context.term()

Custom Output Capture

from ipykernel.iostream import OutStream
import sys
import io

class CustomOutputCapture:
    """
    Custom output capture for kernel-like behavior.

    The instance acts as its own mock session: the OutStream objects it
    creates receive `self.mock_session` (which is `self`) as their session,
    and every `send` call is recorded in `captured_output`.
    """

    def __init__(self):
        self.captured_output = []
        self.mock_session = self

    def send(self, stream, msg_type, content, **kwargs):
        """
        Mock session send method; record the message with a timestamp.

        Parameters:
        - stream: Stored under the 'stream' key
        - msg_type: Stored under the 'msg_type' key
        - content: Stored under the 'content' key
        - **kwargs: Accepted and ignored
        """
        # Imported locally so the class does not depend on a module-level
        # `import time` that appears only after the class definition.
        import time

        self.captured_output.append({
            'stream': stream,
            'msg_type': msg_type,
            'content': content,
            'timestamp': time.time(),
        })

    def capture_output(self, func, *args, **kwargs):
        """
        Capture output from function execution.

        Runs func(*args, **kwargs) with sys.stdout and sys.stderr replaced by
        OutStream objects, flushes both streams, and restores the originals.

        Parameters:
        - func (callable): Function to execute
        - *args: Positional arguments for function
        - **kwargs: Keyword arguments for function

        Returns:
        Whatever func returns. An exception raised by func propagates after
        the streams have been flushed and restored.
        """
        # OutStream takes (session, pub_thread, name) in that order.
        # NOTE(review): pub_thread=None and a session that only implements
        # send() may not satisfy a real OutStream — confirm against ipykernel.
        stdout_stream = OutStream(self.mock_session, None, 'stdout')
        stderr_stream = OutStream(self.mock_session, None, 'stderr')

        # Save original streams
        old_stdout, old_stderr = sys.stdout, sys.stderr

        # Redirect to capture streams
        sys.stdout, sys.stderr = stdout_stream, stderr_stream
        try:
            return func(*args, **kwargs)
        finally:
            try:
                # Flush on the failure path too, so output written before an
                # exception is not left sitting in the stream buffers.
                stdout_stream.flush()
                stderr_stream.flush()
            finally:
                # Restore original streams no matter what happened above
                sys.stdout, sys.stderr = old_stdout, old_stderr

    def get_captured_output(self):
        """Get all captured output as a shallow copy of the internal list."""
        return list(self.captured_output)

    def clear_output(self):
        """Clear captured output."""
        self.captured_output.clear()

# Usage example
import time

capture = CustomOutputCapture()

def test_function():
    """Print two lines to stdout and one to stderr, then return a status string."""
    print("This is stdout output")
    print("Multiple lines of output", file=sys.stdout)
    print("This goes to stderr", file=sys.stderr)
    return "Function completed"

# Capture output from function
result = capture.capture_output(test_function)

# Review captured output
print("Function result:", result)
print("\nCaptured output:")
# NOTE(review): assumes every recorded 'content' is a mapping with a 'text'
# key — confirm against what the streams actually pass to send().
for output in capture.get_captured_output():
    print(f"  {output['stream']}: {output['content']['text']}")

Stream Redirection for Debugging

from ipykernel.iostream import OutStream
import sys
import contextlib

class DebugOutputManager:
    """
    Manage output streams for debugging purposes.

    The instance serves as its own session object: streams created by
    `capture_streams` are given `self.session` (which is `self`), and each
    `send` call is appended to `debug_log`.
    """

    def __init__(self):
        self.debug_log = []
        self.session = self

    def send(self, stream, msg_type, content, **kwargs):
        """
        Log all output for debugging.

        Parameters:
        - stream: Stored under the 'stream' key
        - msg_type: Stored under the 'type' key
        - content: Stored under the 'content' key
        - **kwargs: Stored as a dict under the 'kwargs' key
        """
        self.debug_log.append({
            'stream': stream,
            'type': msg_type,
            'content': content,
            'kwargs': kwargs,
        })

    @contextlib.contextmanager
    def capture_streams(self):
        """
        Context manager for stream capture.

        Replaces sys.stdout and sys.stderr with OutStream objects for the
        duration of the `with` block and yields this manager. On exit the
        streams are flushed and the original sys.stdout/sys.stderr restored,
        including when the block raises.
        """
        # OutStream takes (session, pub_thread, name) in that order.
        # NOTE(review): pub_thread=None and a session that only implements
        # send() may not satisfy a real OutStream — confirm against ipykernel.
        stdout_stream = OutStream(self.session, None, 'stdout')
        stderr_stream = OutStream(self.session, None, 'stderr')

        # Save original streams
        original_stdout, original_stderr = sys.stdout, sys.stderr

        # Redirect streams
        sys.stdout, sys.stderr = stdout_stream, stderr_stream
        try:
            yield self
        finally:
            try:
                # Flush before restoring so buffered text reaches send()
                # before the caller inspects debug_log.
                stdout_stream.flush()
                stderr_stream.flush()
            finally:
                # Restore streams
                sys.stdout, sys.stderr = original_stdout, original_stderr

    def print_debug_log(self):
        """Print captured debug information, one numbered entry per message."""
        print("=== Debug Output Log ===")
        for position, entry in enumerate(self.debug_log, start=1):
            print(f"{position}. Stream: {entry['stream']}")
            print(f"   Type: {entry['type']}")
            print(f"   Content: {entry['content']}")
            print()

# Usage
debug_manager = DebugOutputManager()

# While the context is active, sys.stdout and sys.stderr point at the
# manager's capture streams; they are restored when the block exits.
with debug_manager.capture_streams():
    print("This output will be captured")
    print("Error message", file=sys.stderr)
    
    # Simulate some processing
    for i in range(3):
        print(f"Processing item {i+1}")

# Review debug information (printed after the original streams are restored)
debug_manager.print_debug_log()

Background Processing with IOPub

from ipykernel.iostream import IOPubThread
import zmq
import threading
import time
import queue

class BackgroundProcessor:
    """
    Process tasks in background with IOPub communication.

    Tasks are queued with `add_task`; `process_tasks` starts a worker thread
    that drains the queue and schedules each task on the IOPub thread.
    """

    def __init__(self):
        # Setup ZMQ for IOPub
        self.context = zmq.Context()
        self.pub_socket = self.context.socket(zmq.PUB)
        self.pub_socket.bind("tcp://127.0.0.1:*")

        # Create IOPub thread
        self.iopub_thread = IOPubThread(self.pub_socket)
        self.iopub_thread.start()

        # Task queue
        self.task_queue = queue.Queue()
        self.processing = False

    def add_task(self, task_func, *args, **kwargs):
        """
        Add task to processing queue.

        Parameters:
        - task_func (callable): Function to execute later
        - *args: Positional arguments for function
        - **kwargs: Keyword arguments for function
        """
        self.task_queue.put((task_func, args, kwargs))

    def process_tasks(self):
        """
        Process all queued tasks in background.

        Returns:
        The started worker thread. Joining it waits until every queued task
        has been handed to the IOPub thread via schedule().
        """
        self.processing = True

        def worker():
            while self.processing:
                try:
                    # Non-blocking get replaces the empty()-then-get pair,
                    # which could block if the queue changed in between.
                    task_func, args, kwargs = self.task_queue.get_nowait()
                except queue.Empty:
                    break

                # Bind the current task as default arguments. A plain closure
                # would read task_func/args/kwargs when it finally runs, by
                # which time the loop may have rebound them to a later task.
                def execute_task(task_func=task_func, args=args, kwargs=kwargs):
                    try:
                        result = task_func(*args, **kwargs)
                        print(f"Task completed: {result}")
                    except Exception as e:
                        # Report the failure and keep the remaining tasks going
                        print(f"Task failed: {e}")

                # Schedule task execution on IOPub thread
                self.iopub_thread.schedule(execute_task)

        # Start worker thread
        worker_thread = threading.Thread(target=worker)
        worker_thread.start()

        return worker_thread

    def stop_processing(self):
        """Stop background processing and flush the IOPub thread."""
        self.processing = False
        # NOTE(review): confirm that the installed IOPubThread provides flush().
        self.iopub_thread.flush()

    def cleanup(self):
        """Cleanup resources: stop processing, then the thread, socket, and context."""
        self.stop_processing()
        self.iopub_thread.stop()
        self.pub_socket.close()
        self.context.term()

# Usage example
def sample_task(name, duration):
    """
    Sample task that announces itself, sleeps, and reports completion.

    Parameters:
    - name: Label used in the start message and in the returned text
    - duration: Number of seconds passed to time.sleep

    Returns:
    Completion message string
    """
    start_banner = f"Starting task: {name}"
    print(start_banner)
    time.sleep(duration)
    summary = f"Task {name} completed after {duration}s"
    return summary

# Create processor
processor = BackgroundProcessor()

# Add tasks: the callable followed by its arguments (task name, duration)
processor.add_task(sample_task, "Task1", 0.5)
processor.add_task(sample_task, "Task2", 0.3)
processor.add_task(sample_task, "Task3", 0.7)

# Process tasks
worker_thread = processor.process_tasks()

# Wait for completion of the worker thread, i.e. until the queued tasks have
# been scheduled on the IOPub thread.
# NOTE(review): joining the worker does not by itself show that the scheduled
# tasks have finished running — confirm before relying on it.
worker_thread.join()

# Cleanup
processor.cleanup()

Install with Tessl CLI

npx tessl i tessl/pypi-ipykernel

docs

communication-framework.md

connection-management.md

core-kernel.md

data-utilities.md

gui-integration.md

in-process-kernels.md

index.md

io-streaming.md

kernel-application.md

kernel-embedding.md

matplotlib-integration.md

tile.json