Jupyter protocol implementation and client libraries for kernel communication and management
—
Kernel communication clients for executing code, sending messages, and receiving results from Jupyter kernels. Provides both blocking and asynchronous interfaces for different usage patterns and integration scenarios.
The KernelClient class provides the foundational API for communicating with kernels via ZMQ channels, including code execution, completion, inspection, and other kernel services.
class KernelClient:
"""Base class for communicating with a kernel via ZMQ channels."""
def execute(self, code, silent=False, store_history=True,
user_expressions=None, allow_stdin=None, stop_on_error=True):
"""
Execute code in the kernel.
Parameters:
- code (str): Code to execute
- silent (bool): Don't display output if True
- store_history (bool): Store in execution history if True
- user_expressions (dict): Expressions to evaluate
- allow_stdin (bool): Allow stdin requests from kernel
- stop_on_error (bool): Stop execution on first error
Returns:
str: Message ID for tracking the request
"""
def complete(self, code, cursor_pos=None):
"""
Request tab completion from kernel.
Parameters:
- code (str): Code to complete
- cursor_pos (int): Cursor position in code
Returns:
str: Message ID for tracking the request
"""
def inspect(self, code, cursor_pos=None, detail_level=0):
"""
Request object inspection from kernel.
Parameters:
- code (str): Code containing object to inspect
- cursor_pos (int): Cursor position in code
- detail_level (int): Level of detail (0 or 1)
Returns:
str: Message ID for tracking the request
"""
def history(self, raw=True, output=False, hist_access_type='range',
session=None, start=None, stop=None, n=None,
pattern=None, unique=False):
"""
Request execution history from kernel.
Parameters:
- raw (bool): Return raw input if True
- output (bool): Include output if True
- hist_access_type (str): 'range', 'tail', or 'search'
- session (int): Session number
- start (int): Start of range
- stop (int): End of range
- n (int): Number of entries for 'tail'
- pattern (str): Search pattern for 'search'
- unique (bool): Return unique entries only
Returns:
str: Message ID for tracking the request
"""
def kernel_info(self):
"""
Request kernel information.
Returns:
str: Message ID for tracking the request
"""
def comm_info(self, target_name=None):
"""
Request information about active comms.
Parameters:
- target_name (str): Specific comm target to query
Returns:
str: Message ID for tracking the request
"""
def is_complete(self, code):
"""
Check if code is complete and ready to execute.
Parameters:
- code (str): Code to check
Returns:
str: Message ID for tracking the request
"""
def shutdown(self, restart=False):
"""
Request kernel shutdown.
Parameters:
- restart (bool): Restart after shutdown if True
Returns:
str: Message ID for tracking the request
"""
def input(self, string):
"""
Send input to kernel in response to input_request.
Parameters:
- string (str): Input string to send
Returns:
None
"""
def start_channels(self, shell=True, iopub=True, stdin=True,
hb=True, control=True):
"""
Start the kernel communication channels.
Parameters:
- shell (bool): Start shell channel
- iopub (bool): Start iopub channel
- stdin (bool): Start stdin channel
- hb (bool): Start heartbeat channel
- control (bool): Start control channel
Returns:
None
"""
def stop_channels(self):
"""
Stop all communication channels.
Returns:
None
"""
@property
def channels_running(self):
"""Boolean indicating if channels are active."""
@property
def shell_channel(self):
"""Shell channel for execute requests."""
@property
def iopub_channel(self):
"""IOPub channel for kernel output."""
@property
def stdin_channel(self):
"""Stdin channel for input requests."""
@property
def hb_channel(self):
"""Heartbeat channel for kernel monitoring."""
@property
def control_channel(self):
"""Control channel for kernel management."""The BlockingKernelClient provides synchronous message handling with blocking methods for receiving kernel responses.
class BlockingKernelClient(KernelClient):
"""Synchronous kernel client with blocking message methods."""
def get_shell_msg(self, block=True, timeout=None):
"""
Get a message from the shell channel (blocking).
Parameters:
- block (bool): Block until message received
- timeout (float): Timeout in seconds
Returns:
dict: Jupyter protocol message
"""
def get_iopub_msg(self, block=True, timeout=None):
"""
Get a message from the iopub channel (blocking).
Parameters:
- block (bool): Block until message received
- timeout (float): Timeout in seconds
Returns:
dict: Jupyter protocol message
"""
def get_stdin_msg(self, block=True, timeout=None):
"""
Get a message from the stdin channel (blocking).
Parameters:
- block (bool): Block until message received
- timeout (float): Timeout in seconds
Returns:
dict: Jupyter protocol message
"""
def get_control_msg(self, block=True, timeout=None):
"""
Get a message from the control channel (blocking).
Parameters:
- block (bool): Block until message received
- timeout (float): Timeout in seconds
Returns:
dict: Jupyter protocol message
"""
def wait_for_ready(self, timeout=None):
"""
Wait for kernel to be ready for communication (blocking).
Parameters:
- timeout (float): Timeout in seconds
Returns:
None
"""
def is_alive(self):
"""
Check if kernel is alive (blocking).
Returns:
bool: True if kernel is responsive
"""
def execute_interactive(self, code, silent=False, store_history=True,
user_expressions=None, allow_stdin=None,
stop_on_error=True, timeout=None,
output_hook=None, stdin_hook=None):
"""
Execute code with interactive output handling (blocking).
Parameters:
- code (str): Code to execute
- silent (bool): Don't display output if True
- store_history (bool): Store in history if True
- user_expressions (dict): Expressions to evaluate
- allow_stdin (bool): Allow stdin requests
- stop_on_error (bool): Stop on first error
- timeout (float): Execution timeout
- output_hook (callable): Function to handle output
- stdin_hook (callable): Function to handle stdin requests
Returns:
dict: Execution reply message
"""The AsyncKernelClient provides asynchronous message handling with async/await methods for non-blocking kernel communication.
class AsyncKernelClient(KernelClient):
    """Asynchronous kernel client with async/await methods.

    Only the methods declared here are coroutines. Everything inherited
    from KernelClient (including ``start_channels`` and ``stop_channels``)
    is declared as a regular method and must not be awaited.
    """

    async def get_shell_msg(self, timeout=None):
        """
        Get a message from the shell channel (async).

        Parameters:
        - timeout (float): Timeout in seconds

        Returns:
        dict: Jupyter protocol message
        """

    async def get_iopub_msg(self, timeout=None):
        """
        Get a message from the iopub channel (async).

        Parameters:
        - timeout (float): Timeout in seconds

        Returns:
        dict: Jupyter protocol message
        """

    async def get_stdin_msg(self, timeout=None):
        """
        Get a message from the stdin channel (async).

        Parameters:
        - timeout (float): Timeout in seconds

        Returns:
        dict: Jupyter protocol message
        """

    async def get_control_msg(self, timeout=None):
        """
        Get a message from the control channel (async).

        Parameters:
        - timeout (float): Timeout in seconds

        Returns:
        dict: Jupyter protocol message
        """

    async def wait_for_ready(self, timeout=None):
        """
        Wait for kernel to be ready (async).

        Parameters:
        - timeout (float): Timeout in seconds

        Returns:
        None
        """

    async def is_alive(self):
        """
        Check if kernel is alive (async).

        Returns:
        bool: True if kernel is responsive
        """

    async def execute_interactive(self, code, silent=False, store_history=True,
                                  user_expressions=None, allow_stdin=None,
                                  stop_on_error=True, timeout=None,
                                  output_hook=None, stdin_hook=None):
        """
        Execute code with interactive output handling (async).

        Parameters:
        - code (str): Code to execute
        - silent (bool): Don't display output if True
        - store_history (bool): Store in history if True
        - user_expressions (dict): Expressions to evaluate
        - allow_stdin (bool): Allow stdin requests
        - stop_on_error (bool): Stop on first error
        - timeout (float): Execution timeout
        - output_hook (callable): Function to handle output.
          NOTE(review): a plain (non-async) function is the safe choice;
          confirm that coroutine functions are actually awaited before
          passing one.
        - stdin_hook (callable): Function (possibly async) to handle stdin

        Returns:
        dict: Execution reply message
        """


class ThreadedKernelClient(KernelClient):
    """Kernel client using threaded ZMQ channels with IOLoop."""

from jupyter_client import KernelManager
# Create and start kernel
km = KernelManager()
km.start_kernel()
# Create blocking client
kc = km.client()
kc.start_channels()
try:
    # Wait for kernel to be ready
    kc.wait_for_ready(timeout=60)
    # Execute code
    msg_id = kc.execute("x = 1 + 1\nprint(x)")
    # Get execution result
    while True:
        # A timeout keeps the example from hanging forever if the kernel
        # dies; an exception is raised when nothing arrives in time.
        msg = kc.get_iopub_msg(timeout=30)
        # IOPub is a broadcast channel: ignore messages that were not
        # produced by our execute request (for example the status updates
        # caused by the readiness handshake above).
        if msg['parent_header'].get('msg_id') != msg_id:
            continue
        if msg['msg_type'] == 'execute_result':
            print("Result:", msg['content']['data'])
        elif msg['msg_type'] == 'stream':
            print("Output:", msg['content']['text'])
        elif msg['msg_type'] == 'status' and msg['content']['execution_state'] == 'idle':
            # The kernel reports "idle" once our request is fully processed,
            # so this is the only reliable point to stop reading.
            break
finally:
    # Clean up, even if execution or message handling failed
    kc.stop_channels()
    km.shutdown_kernel()

from jupyter_client import KernelManager
km = KernelManager()
km.start_kernel()
kc = km.client()
kc.start_channels()


# Execute with output handling
def handle_output(msg):
    """Print stream text and execution results from an IOPub message."""
    if msg['msg_type'] == 'stream':
        print(f"Stream: {msg['content']['text']}")
    elif msg['msg_type'] == 'execute_result':
        print(f"Result: {msg['content']['data']}")


try:
    # Make sure the kernel is responsive before sending the request
    kc.wait_for_ready()
    # execute_interactive blocks until the execution finishes and returns
    # the reply message; output is routed through handle_output meanwhile.
    reply = kc.execute_interactive("print('Hello, World!')", output_hook=handle_output)
    print(f"Execution status: {reply['content']['status']}")
finally:
    # Release channels and kernel even if the execution raised
    kc.stop_channels()
    km.shutdown_kernel()

import asyncio
from jupyter_client import AsyncKernelManager


async def execute_async():
    """Start a kernel, run code through the async client, then clean up."""
    km = AsyncKernelManager()
    await km.start_kernel()
    kc = km.client()
    # start_channels()/stop_channels() are regular methods inherited from
    # KernelClient; they return None, so awaiting them raises TypeError.
    kc.start_channels()
    try:
        await kc.wait_for_ready()
        # Execute code
        msg_id = kc.execute("import numpy as np\nresult = np.array([1, 2, 3]).sum()")

        # Get results asynchronously
        # A plain function is used as the hook: it works whether or not the
        # client awaits the hook's return value.
        def handle_output(msg):
            if msg['msg_type'] == 'stream':
                print(f"Stream: {msg['content']['text']}")
            elif msg['msg_type'] == 'execute_result':
                print(f"Result: {msg['content']['data']}")

        reply = await kc.execute_interactive(
            "print('Async execution')",
            output_hook=handle_output
        )
    finally:
        # Release channels and kernel even if the execution raised
        kc.stop_channels()
        await km.shutdown_kernel()


# Run async function
asyncio.run(execute_async())

from jupyter_client import KernelManager
km = KernelManager()
km.start_kernel()
kc = km.client()
kc.start_channels()
kc.wait_for_ready()
# Request completion
msg_id = kc.complete("import num", cursor_pos=10)
reply = kc.get_shell_msg()
completions = reply['content']['matches']
print(f"Completions: {completions}")
# Request object inspection
msg_id = kc.inspect("len", cursor_pos=3)
reply = kc.get_shell_msg()
if reply['content']['found']:
print(f"Documentation: {reply['content']['data']['text/plain']}")
kc.stop_channels()
km.shutdown_kernel()Install with Tessl CLI
npx tessl i tessl/pypi-jupyter-client