CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-jupyter-client

Jupyter protocol implementation and client libraries for kernel communication and management

Pending
Overview
Eval results
Files

client-communication.mddocs/

Client Communication

Kernel communication clients for executing code, sending messages, and receiving results from Jupyter kernels. Provides both blocking and asynchronous interfaces for different usage patterns and integration scenarios.

Capabilities

Base Kernel Client

The KernelClient class provides the foundational API for communicating with kernels via ZMQ channels, including code execution, completion, inspection, and other kernel services.

class KernelClient:
    """Base class for communicating with a kernel via ZMQ channels."""
    
    def execute(self, code, silent=False, store_history=True, 
                user_expressions=None, allow_stdin=None, stop_on_error=True):
        """
        Execute code in the kernel.
        
        Parameters:
        - code (str): Code to execute
        - silent (bool): Don't display output if True
        - store_history (bool): Store in execution history if True
        - user_expressions (dict): Expressions to evaluate
        - allow_stdin (bool): Allow stdin requests from kernel
        - stop_on_error (bool): Stop execution on first error
        
        Returns:
        str: Message ID for tracking the request
        """
    
    def complete(self, code, cursor_pos=None):
        """
        Request tab completion from kernel.
        
        Parameters:
        - code (str): Code to complete
        - cursor_pos (int): Cursor position in code
        
        Returns:
        str: Message ID for tracking the request
        """
    
    def inspect(self, code, cursor_pos=None, detail_level=0):
        """
        Request object inspection from kernel.
        
        Parameters:
        - code (str): Code containing object to inspect
        - cursor_pos (int): Cursor position in code
        - detail_level (int): Level of detail (0 or 1)
        
        Returns:
        str: Message ID for tracking the request
        """
    
    def history(self, raw=True, output=False, hist_access_type='range', 
                session=None, start=None, stop=None, n=None, 
                pattern=None, unique=False):
        """
        Request execution history from kernel.
        
        Parameters:
        - raw (bool): Return raw input if True
        - output (bool): Include output if True
        - hist_access_type (str): 'range', 'tail', or 'search'
        - session (int): Session number
        - start (int): Start of range
        - stop (int): End of range
        - n (int): Number of entries for 'tail'
        - pattern (str): Search pattern for 'search'
        - unique (bool): Return unique entries only
        
        Returns:
        str: Message ID for tracking the request
        """
    
    def kernel_info(self):
        """
        Request kernel information.
        
        Returns:
        str: Message ID for tracking the request
        """
    
    def comm_info(self, target_name=None):
        """
        Request information about active comms.
        
        Parameters:
        - target_name (str): Specific comm target to query
        
        Returns:
        str: Message ID for tracking the request
        """
    
    def is_complete(self, code):
        """
        Check if code is complete and ready to execute.
        
        Parameters:
        - code (str): Code to check
        
        Returns:
        str: Message ID for tracking the request
        """
    
    def shutdown(self, restart=False):
        """
        Request kernel shutdown.
        
        Parameters:
        - restart (bool): Restart after shutdown if True
        
        Returns:
        str: Message ID for tracking the request
        """
    
    def input(self, string):
        """
        Send input to kernel in response to input_request.
        
        Parameters:
        - string (str): Input string to send
        
        Returns:
        None
        """
    
    def start_channels(self, shell=True, iopub=True, stdin=True, 
                      hb=True, control=True):
        """
        Start the kernel communication channels.
        
        Parameters:
        - shell (bool): Start shell channel
        - iopub (bool): Start iopub channel
        - stdin (bool): Start stdin channel
        - hb (bool): Start heartbeat channel
        - control (bool): Start control channel
        
        Returns:
        None
        """
    
    def stop_channels(self):
        """
        Stop all communication channels.
        
        Returns:
        None
        """

    @property
    def channels_running(self):
        """Boolean indicating if channels are active."""
    
    @property
    def shell_channel(self):
        """Shell channel for execute requests."""
    
    @property
    def iopub_channel(self):
        """IOPub channel for kernel output."""
    
    @property
    def stdin_channel(self):
        """Stdin channel for input requests."""
    
    @property
    def hb_channel(self):
        """Heartbeat channel for kernel monitoring."""
    
    @property
    def control_channel(self):
        """Control channel for kernel management."""

Blocking Kernel Client

The BlockingKernelClient provides synchronous message handling with blocking methods for receiving kernel responses.

class BlockingKernelClient(KernelClient):
    """Synchronous kernel client with blocking message methods."""
    
    def get_shell_msg(self, block=True, timeout=None):
        """
        Get a message from the shell channel (blocking).
        
        Parameters:
        - block (bool): Block until message received
        - timeout (float): Timeout in seconds
        
        Returns:
        dict: Jupyter protocol message
        """
    
    def get_iopub_msg(self, block=True, timeout=None):
        """
        Get a message from the iopub channel (blocking).
        
        Parameters:
        - block (bool): Block until message received
        - timeout (float): Timeout in seconds
        
        Returns:
        dict: Jupyter protocol message
        """
    
    def get_stdin_msg(self, block=True, timeout=None):
        """
        Get a message from the stdin channel (blocking).
        
        Parameters:
        - block (bool): Block until message received
        - timeout (float): Timeout in seconds
        
        Returns:
        dict: Jupyter protocol message
        """
    
    def get_control_msg(self, block=True, timeout=None):
        """
        Get a message from the control channel (blocking).
        
        Parameters:
        - block (bool): Block until message received
        - timeout (float): Timeout in seconds
        
        Returns:
        dict: Jupyter protocol message
        """
    
    def wait_for_ready(self, timeout=None):
        """
        Wait for kernel to be ready for communication (blocking).
        
        Parameters:
        - timeout (float): Timeout in seconds
        
        Returns:
        None
        """
    
    def is_alive(self):
        """
        Check if kernel is alive (blocking).
        
        Returns:
        bool: True if kernel is responsive
        """
    
    def execute_interactive(self, code, silent=False, store_history=True,
                           user_expressions=None, allow_stdin=None, 
                           stop_on_error=True, timeout=None, 
                           output_hook=None, stdin_hook=None):
        """
        Execute code with interactive output handling (blocking).
        
        Parameters:
        - code (str): Code to execute
        - silent (bool): Don't display output if True
        - store_history (bool): Store in history if True
        - user_expressions (dict): Expressions to evaluate
        - allow_stdin (bool): Allow stdin requests
        - stop_on_error (bool): Stop on first error
        - timeout (float): Execution timeout
        - output_hook (callable): Function to handle output
        - stdin_hook (callable): Function to handle stdin requests
        
        Returns:
        dict: Execution reply message
        """

Async Kernel Client

The AsyncKernelClient provides asynchronous message handling with async/await methods for non-blocking kernel communication.

class AsyncKernelClient(KernelClient):
    """Asynchronous kernel client with async/await methods."""
    
    async def get_shell_msg(self, timeout=None):
        """
        Get a message from the shell channel (async).
        
        Parameters:
        - timeout (float): Timeout in seconds
        
        Returns:
        dict: Jupyter protocol message
        """
    
    async def get_iopub_msg(self, timeout=None):
        """
        Get a message from the iopub channel (async).
        
        Parameters:
        - timeout (float): Timeout in seconds
        
        Returns:
        dict: Jupyter protocol message
        """
    
    async def get_stdin_msg(self, timeout=None):
        """
        Get a message from the stdin channel (async).
        
        Parameters:
        - timeout (float): Timeout in seconds
        
        Returns:
        dict: Jupyter protocol message
        """
    
    async def get_control_msg(self, timeout=None):
        """
        Get a message from the control channel (async).
        
        Parameters:
        - timeout (float): Timeout in seconds
        
        Returns:
        dict: Jupyter protocol message
        """
    
    async def wait_for_ready(self, timeout=None):
        """
        Wait for kernel to be ready (async).
        
        Parameters:
        - timeout (float): Timeout in seconds
        
        Returns:
        None
        """
    
    async def is_alive(self):
        """
        Check if kernel is alive (async).
        
        Returns:
        bool: True if kernel is responsive
        """
    
    async def execute_interactive(self, code, silent=False, store_history=True,
                                 user_expressions=None, allow_stdin=None,
                                 stop_on_error=True, timeout=None,
                                 output_hook=None, stdin_hook=None):
        """
        Execute code with interactive output handling (async).
        
        Parameters:
        - code (str): Code to execute
        - silent (bool): Don't display output if True
        - store_history (bool): Store in history if True
        - user_expressions (dict): Expressions to evaluate
        - allow_stdin (bool): Allow stdin requests
        - stop_on_error (bool): Stop on first error
        - timeout (float): Execution timeout
        - output_hook (callable): Async function to handle output
        - stdin_hook (callable): Async function to handle stdin
        
        Returns:
        dict: Execution reply message
        """

Threaded Kernel Client

class ThreadedKernelClient(KernelClient):
    """Kernel client using threaded ZMQ channels with IOLoop."""

Usage Examples

Basic Code Execution (Blocking)

from jupyter_client import KernelManager

# Create and start kernel
km = KernelManager()
km.start_kernel()

# Create blocking client
kc = km.client()
kc.start_channels()

# Wait for kernel to be ready
kc.wait_for_ready()

# Execute code
msg_id = kc.execute("x = 1 + 1\nprint(x)")

# Get execution result
while True:
    msg = kc.get_iopub_msg()
    if msg['msg_type'] == 'execute_result':
        print("Result:", msg['content']['data'])
        break
    elif msg['msg_type'] == 'stream':
        print("Output:", msg['content']['text'])
    elif msg['msg_type'] == 'status' and msg['content']['execution_state'] == 'idle':
        break

# Clean up
kc.stop_channels()
km.shutdown_kernel()

Interactive Execution (Blocking)

from jupyter_client import KernelManager

km = KernelManager()
km.start_kernel()
kc = km.client()
kc.start_channels()

# Execute with output handling
def handle_output(msg):
    if msg['msg_type'] == 'stream':
        print(f"Stream: {msg['content']['text']}")
    elif msg['msg_type'] == 'execute_result':
        print(f"Result: {msg['content']['data']}")

reply = kc.execute_interactive("print('Hello, World!')", output_hook=handle_output)
print(f"Execution status: {reply['content']['status']}")

kc.stop_channels()
km.shutdown_kernel()

Async Code Execution

import asyncio
from jupyter_client import AsyncKernelManager

async def execute_async():
    km = AsyncKernelManager()
    await km.start_kernel()
    
    kc = km.client()
    await kc.start_channels()
    await kc.wait_for_ready()
    
    # Execute code
    msg_id = kc.execute("import numpy as np\nresult = np.array([1, 2, 3]).sum()")
    
    # Get results asynchronously
    async def handle_output(msg):
        if msg['msg_type'] == 'execute_result':
            print(f"Result: {msg['content']['data']}")
    
    reply = await kc.execute_interactive(
        "print('Async execution')", 
        output_hook=handle_output
    )
    
    await kc.stop_channels()
    await km.shutdown_kernel()

# Run async function
asyncio.run(execute_async())

Code Completion and Inspection

from jupyter_client import KernelManager

km = KernelManager()
km.start_kernel()
kc = km.client()
kc.start_channels()
kc.wait_for_ready()

# Request completion
msg_id = kc.complete("import num", cursor_pos=10)
reply = kc.get_shell_msg()
completions = reply['content']['matches']
print(f"Completions: {completions}")

# Request object inspection
msg_id = kc.inspect("len", cursor_pos=3)
reply = kc.get_shell_msg()
if reply['content']['found']:
    print(f"Documentation: {reply['content']['data']['text/plain']}")

kc.stop_channels()
km.shutdown_kernel()

Install with Tessl CLI

npx tessl i tessl/pypi-jupyter-client

docs

client-communication.md

connection-management.md

index.md

kernel-management.md

kernel-provisioning.md

kernel-specifications.md

session-messaging.md

tile.json