Jupyter protocol implementation and client libraries for kernel communication and management
—
Pluggable kernel provisioning system for starting kernels in different environments. Supports local subprocess provisioning and extensible remote provisioning through a factory pattern and abstract base class system.
The KernelProvisionerBase abstract class defines the interface for kernel provisioners, enabling pluggable kernel launch strategies for different environments.
class KernelProvisionerBase:
    """Abstract base class for kernel provisioners.

    This is a signature-only reference: each method body consists solely of
    the docstring describing its contract.
    """

    @property
    def has_process(self):
        """Whether this provisioner manages a process.

        Returns:
            bool: True if provisioner manages a local process
        """

    async def poll(self):
        """Check if the kernel process is running.

        Returns:
            int | None: Process exit code or None if still running
        """

    async def wait(self):
        """Wait for the kernel process to terminate.

        Returns:
            int | None: Process exit code
        """

    async def send_signal(self, signum):
        """Send a signal to the kernel process.

        Parameters:
            signum (int): Signal number to send

        Returns:
            None
        """

    async def kill(self, restart=False):
        """Kill the kernel process.

        Parameters:
            restart (bool): Whether this is part of a restart

        Returns:
            None
        """

    async def terminate(self, restart=False):
        """Terminate the kernel process gracefully.

        Parameters:
            restart (bool): Whether this is part of a restart

        Returns:
            None
        """

    async def launch_kernel(self, cmd, **kwargs):
        """Launch the kernel process.

        Parameters:
            cmd (list): Command line arguments to execute
            **kwargs: Additional launch parameters

        Returns:
            dict: Connection information for the launched kernel
        """

    # Optional override methods

    async def cleanup(self, restart=False):
        """Cleanup resources after kernel shutdown.

        Parameters:
            restart (bool): Whether this is part of a restart

        Returns:
            None
        """

    async def shutdown_requested(self, restart=False):
        """Handle shutdown request from kernel manager.

        Parameters:
            restart (bool): Whether this is part of a restart

        Returns:
            None
        """

    async def pre_launch(self, **kwargs):
        """Perform setup before kernel launch.

        Parameters:
            **kwargs: Launch parameters

        Returns:
            dict: Updated launch parameters
        """

    async def post_launch(self, **kwargs):
        """Perform setup after kernel launch.

        Parameters:
            **kwargs: Launch results

        Returns:
            None
        """

    async def get_provisioner_info(self):
        """Get current provisioner state information.

        Returns:
            dict: Provisioner state data
        """

    async def load_provisioner_info(self, provisioner_info):
        """Load provisioner state information.

        Parameters:
            provisioner_info (dict): Previously saved state data

        Returns:
            None
        """

    # Utility methods

    def get_shutdown_wait_time(self, recommended=5.0):
        """Get timeout for shutdown operations.

        Parameters:
            recommended (float): Recommended timeout in seconds

        Returns:
            float: Timeout value to use
        """

    def get_stable_start_time(self, recommended=10.0):
        """Get timeout for kernel startup stability.

        Parameters:
            recommended (float): Recommended timeout in seconds

        Returns:
            float: Timeout value to use
        """


# The LocalProvisioner provides a concrete implementation for running kernels
# as local subprocesses.
class LocalProvisioner(KernelProvisionerBase):
    """Provisions kernels locally using subprocess.

    This is a signature-only reference: each method body consists solely of
    the docstring describing its contract.
    """

    @property
    def has_process(self):
        """Always returns True for local provisioner.

        Returns:
            bool: True
        """

    async def poll(self):
        """Poll the local subprocess status.

        Returns:
            int | None: Process exit code or None if running
        """

    async def wait(self):
        """Wait for local subprocess to terminate.

        Returns:
            int | None: Process exit code
        """

    async def send_signal(self, signum):
        """Send signal to local subprocess.

        Parameters:
            signum (int): Signal number

        Returns:
            None
        """

    async def kill(self, restart=False):
        """Kill local subprocess.

        Parameters:
            restart (bool): Whether part of restart

        Returns:
            None
        """

    async def terminate(self, restart=False):
        """Terminate local subprocess gracefully.

        Parameters:
            restart (bool): Whether part of restart

        Returns:
            None
        """

    async def launch_kernel(self, cmd, **kwargs):
        """Launch kernel as local subprocess.

        Parameters:
            cmd (list): Command to execute
            **kwargs: Additional subprocess parameters

        Returns:
            dict: Connection information
        """

    async def pre_launch(self, **kwargs):
        """Setup local environment before launch.

        Parameters:
            **kwargs: Launch parameters

        Returns:
            dict: Updated launch parameters
        """

    async def cleanup(self, restart=False):
        """Cleanup local resources.

        Parameters:
            restart (bool): Whether part of restart

        Returns:
            None
        """


# The KernelProvisionerFactory manages the creation of provisioner instances
# and discovery of available provisioners.
class KernelProvisionerFactory:
    """Factory for creating kernel provisioner instances.

    This is a signature-only reference: each method body consists solely of
    the docstring describing its contract.
    """

    def is_provisioner_available(self, kernel_spec):
        """Check if a provisioner is available for the kernel spec.

        Parameters:
            kernel_spec (KernelSpec): Kernel specification

        Returns:
            bool: True if provisioner is available
        """

    def create_provisioner_instance(self, kernel_id, kernel_spec, parent):
        """Create a provisioner instance for the kernel.

        Parameters:
            kernel_id (str): Unique kernel identifier
            kernel_spec (KernelSpec): Kernel specification
            parent: Parent configurable object

        Returns:
            KernelProvisionerBase: Provisioner instance
        """

    def get_provisioner_entries(self):
        """Get all available provisioner entry points.

        Returns:
            dict: Mapping of provisioner names to entry points
        """


# Utility function for launching kernel processes directly.
def launch_kernel(cmd, stdin=None, stdout=None, stderr=None,
                  env=None, independent=False, cwd=None, **kw):
    """Launch a kernel process.

    This is a signature-only reference: the body consists solely of this
    docstring.

    Parameters:
        cmd (list): Command line arguments to execute
        stdin (int): Standard input file descriptor
        stdout (int): Standard output file descriptor
        stderr (int): Standard error file descriptor
        env (dict): Environment variables
        independent (bool): Whether kernel survives parent death
        cwd (str): Working directory for kernel process
        **kw: Additional Popen arguments

    Returns:
        subprocess.Popen: Process object for the kernel
    """


from jupyter_client import KernelManager
import asyncio

# KernelManager uses LocalProvisioner by default
km = KernelManager(kernel_name='python3')
km.start_kernel()

# The provisioner is accessible
provisioner = km.provisioner
print(f"Provisioner type: {type(provisioner)}")
print(f"Has process: {provisioner.has_process}")

# Check if kernel is running.
# poll() is a coroutine and `await` is only legal inside an `async def`, so a
# plain script has to drive it to completion with asyncio.run().  (Inside an
# already-running event loop, e.g. a notebook cell, use `await` instead.)
if asyncio.run(provisioner.poll()) is None:
    print("Kernel is running")

km.shutdown_kernel()


from jupyter_client.provisioning import KernelProvisionerBase
import asyncio
import os
import subprocess


class CustomProvisioner(KernelProvisionerBase):
    """Custom provisioner example.

    Runs the kernel as a plain ``subprocess.Popen`` child and forwards the
    process-control calls (poll/wait/kill/terminate/send_signal) to it.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Popen handle of the launched kernel; None until launch_kernel() runs.
        self.process = None

    @property
    def has_process(self):
        """Return True: this provisioner always manages a local process."""
        return True

    async def launch_kernel(self, cmd, **kwargs):
        """Launch kernel with custom logic.

        Parameters:
            cmd (list): Command line arguments to execute
            **kwargs: Launch parameters; ``env`` and ``cwd`` are honoured

        Returns:
            dict: Connection information for the launched kernel
        """
        # Custom environment setup.  Fall back to the parent environment when
        # no ``env`` is supplied (or it is None): starting from an empty
        # mapping would strip PATH and friends from the kernel process.
        base_env = kwargs.get('env')
        if base_env is None:
            base_env = os.environ
        custom_env = dict(base_env)
        custom_env['CUSTOM_VAR'] = 'custom_value'

        # Launch process.  stdout/stderr are inherited rather than piped:
        # nothing here drains the pipes, so a chatty kernel would block once
        # the OS pipe buffer filled up.
        self.process = subprocess.Popen(
            cmd,
            env=custom_env,
            cwd=kwargs.get('cwd'),
        )

        # Return connection info (would be real in practice)
        return {
            'shell_port': 50001,
            'iopub_port': 50002,
            'stdin_port': 50003,
            'hb_port': 50004,
            'control_port': 50005,
            'ip': '127.0.0.1',
            'transport': 'tcp',
        }

    async def poll(self):
        """Return the exit code, or None if running or never launched."""
        if self.process:
            return self.process.poll()
        return None

    async def kill(self, restart=False):
        """Forcefully kill the process, if one was launched."""
        if self.process:
            self.process.kill()

    async def terminate(self, restart=False):
        """Ask the process to terminate, if one was launched."""
        if self.process:
            self.process.terminate()

    async def send_signal(self, signum):
        """Deliver ``signum`` to the process, if one was launched."""
        if self.process:
            self.process.send_signal(signum)

    async def wait(self):
        """Wait for the process to exit and return its exit code.

        Returns None when no process was launched.
        """
        if self.process:
            # Popen.wait() blocks; run it in the default executor so the
            # event loop keeps servicing other coroutines meanwhile.
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(None, self.process.wait)
        return None


# Use custom provisioner
from jupyter_client import KernelManager

km = KernelManager()
# NOTE(review): confirm that start_kernel() does not replace this instance
# with one created by the provisioner factory.
km.provisioner = CustomProvisioner(parent=km)


from jupyter_client.provisioning import KernelProvisionerFactory
# get_kernel_spec is imported from the kernelspec submodule, where it is
# defined, rather than from the package root.
from jupyter_client.kernelspec import get_kernel_spec

# Get factory instance
factory = KernelProvisionerFactory.instance()

# Get available provisioners
provisioners = factory.get_provisioner_entries()
print(f"Available provisioners: {list(provisioners.keys())}")

# Check if provisioner is available for a kernel spec
kernel_spec = get_kernel_spec('python3')
if factory.is_provisioner_available(kernel_spec):
    print("Provisioner available for python3")

    # Create provisioner instance
    provisioner = factory.create_provisioner_instance(
        kernel_id='test-kernel',
        kernel_spec=kernel_spec,
        parent=None,
    )
    print(f"Created provisioner: {type(provisioner)}")


import asyncio
from jupyter_client import AsyncKernelManager


async def manage_kernel_with_provisioner():
    """Start a kernel, inspect its provisioner, then shut everything down."""
    km = AsyncKernelManager()
    await km.start_kernel()

    # Once the kernel is started, always hand control back to the manager for
    # shutdown, even if one of the provisioner calls below raises.
    try:
        provisioner = km.provisioner

        # Check provisioner state
        if provisioner.has_process:
            status = await provisioner.poll()
            if status is None:
                print("Kernel process is running")

        # Get provisioner info
        info = await provisioner.get_provisioner_info()
        print(f"Provisioner info: {info}")

        # Graceful shutdown
        await provisioner.terminate()
        await provisioner.wait()

        # Cleanup
        await provisioner.cleanup()
    finally:
        await km.shutdown_kernel()


# Run async example
asyncio.run(manage_kernel_with_provisioner())


from jupyter_client.launcher import launch_kernel
import os
import sys
import tempfile

from jupyter_client import write_connection_file

# Create temporary directory for kernel
with tempfile.TemporaryDirectory() as temp_dir:
    conn_file = os.path.join(temp_dir, 'connection.json')

    # Write connection file
    write_connection_file(conn_file)

    # Prepare kernel command.  sys.executable guarantees the kernel runs under
    # the same interpreter as this script ('python' may be missing from PATH
    # or point elsewhere), and the absolute connection-file path removes the
    # dependency on the child's working directory.
    cmd = [sys.executable, '-m', 'ipykernel_launcher', '-f', conn_file]

    # Launch kernel directly
    process = launch_kernel(
        cmd,
        cwd=temp_dir,
        independent=True,
    )
    try:
        print(f"Launched kernel with PID: {process.pid}")

        # Use kernel...
    finally:
        # Clean up: stop the kernel before the temporary directory is removed,
        # even if using it raised.
        process.terminate()
        process.wait()


from jupyter_client.provisioning import LocalProvisioner
class ContainerProvisioner(LocalProvisioner):
    """Example provisioner that runs kernels in containers."""

    async def pre_launch(self, **kwargs):
        """Modify command to run in container.

        Parameters:
            **kwargs: Launch parameters; ``cmd`` is the kernel command

        Returns:
            dict: Launch parameters with ``cmd`` wrapped in ``docker run``
        """
        # NOTE(review): the parent class's pre_launch() is not invoked here;
        # confirm whether LocalProvisioner.pre_launch must run first.
        cmd = kwargs.get('cmd', [])

        # Wrap command with container runtime.  Unpacking accepts any
        # iterable ``cmd`` (list or tuple), unlike list concatenation.
        container_cmd = [
            'docker', 'run', '--rm', '-i',
            '--name', f'kernel-{self.kernel_id}',
            'jupyter/scipy-notebook',
            *cmd,
        ]
        kwargs['cmd'] = container_cmd
        return kwargs

    async def kill(self, restart=False):
        """Kill container instead of process.

        Parameters:
            restart (bool): Whether part of restart

        Returns:
            None
        """
        import asyncio
        import subprocess

        kill_cmd = ['docker', 'kill', f'kernel-{self.kernel_id}']

        # subprocess.run() blocks until docker exits, so run it in the default
        # executor to keep the event loop responsive.  A non-zero exit status
        # is ignored on purpose (check=False): the container might already be
        # stopped.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            None, lambda: subprocess.run(kill_cmd, check=False)
        )


# Use container provisioner
from jupyter_client import KernelManager

km = KernelManager()
# NOTE(review): confirm that KernelManager honours a `provisioner_class`
# attribute; provisioners may need to be selected another way.
km.provisioner_class = ContainerProvisioner
km.start_kernel()


# Install with Tessl CLI:
#   npx tessl i tessl/pypi-jupyter-client