Jupyter protocol implementation and client libraries for kernel communication and management
—
Connection file handling, port management, and network configuration for kernel connections. Includes utilities for creating connection files, discovering existing connections, SSH tunneling, and managing network ports.
Functions for creating, reading, and finding kernel connection files that contain network and authentication information for connecting to kernels.
def write_connection_file(fname=None, shell_port=0, iopub_port=0,
                          stdin_port=0, hb_port=0, control_port=0,
                          ip='', key=b'', transport='tcp',
                          signature_scheme='hmac-sha256', kernel_name='',
                          **kwargs):
    """
    Write kernel connection information to a JSON file.

    Every channel port defaults to 0, which requests an automatically
    assigned port; ``fname=None`` requests an automatically generated
    file path.

    Parameters:
    - fname (str): Connection file path (auto-generated if None)
    - shell_port (int): Port for shell channel (0 for auto-assign)
    - iopub_port (int): Port for iopub channel (0 for auto-assign)
    - stdin_port (int): Port for stdin channel (0 for auto-assign)
    - hb_port (int): Port for heartbeat channel (0 for auto-assign)
    - control_port (int): Port for control channel (0 for auto-assign)
    - ip (str): IP address for connections (empty for localhost)
    - key (bytes): Authentication key for message signing
    - transport (str): Transport protocol ('tcp' or 'ipc')
    - signature_scheme (str): Message signing scheme
      (default 'hmac-sha256')
    - kernel_name (str): Name of the kernel
    - **kwargs: Additional connection parameters

    Returns:
    tuple: (connection_file_path, connection_info_dict)
    """
def find_connection_file(filename='kernel-*.json', path=None, profile=None):
    """
    Find a kernel connection file.

    Parameters:
    - filename (str): Connection file pattern to search for
      (default 'kernel-*.json')
    - path (str): Directory to search in (runtime dir if None)
    - profile (str): Jupyter profile name

    Returns:
    str: Path to connection file

    Raises:
    IOError: If connection file not found
    """
def tunnel_to_kernel(connection_info, sshserver, sshkey=None):
    """
    Create SSH tunnels for connecting to a remote kernel.

    Parameters:
    - connection_info (dict): Kernel connection information
    - sshserver (str): SSH server address
    - sshkey (str): Path to SSH private key file (optional, default None)

    Returns:
    tuple: (shell_port, iopub_port, stdin_port, hb_port, control_port)
    """

The ConnectionFileMixin class provides methods for handling connection files and network setup in kernel managers and clients.
class ConnectionFileMixin:
    """Mixin for handling kernel connection files and network setup."""

    def write_connection_file(self, **kwargs):
        """
        Write connection info to file.

        Parameters:
        - **kwargs: Connection parameters (see the module-level
          write_connection_file function)

        Returns:
        None
        """

    def load_connection_file(self, connection_file=None):
        """
        Load connection information from file.

        Parameters:
        - connection_file (str): Path to connection file
          (optional, default None)

        Returns:
        None
        """

    def load_connection_info(self, info):
        """
        Load connection information from dictionary.

        Parameters:
        - info (dict): Connection information dictionary

        Returns:
        None
        """

    def get_connection_info(self, session=False):
        """
        Get current connection information.

        Parameters:
        - session (bool): Include session information if True
          (default False)

        Returns:
        dict: Connection information dictionary
        """

    def cleanup_connection_file(self):
        """
        Remove the connection file.

        Returns:
        None
        """

    def blocking_client(self):
        """
        Create a blocking client for this connection.

        Returns:
        BlockingKernelClient: Client instance
        """

    def connect_iopub(self, identity=None):
        """
        Connect to the iopub channel.

        Parameters:
        - identity (bytes): ZMQ identity for the socket
          (optional, default None)

        Returns:
        zmq.Socket: Connected ZMQ socket
        """

    def connect_shell(self, identity=None):
        """
        Connect to the shell channel.

        Parameters:
        - identity (bytes): ZMQ identity for the socket
          (optional, default None)

        Returns:
        zmq.Socket: Connected ZMQ socket
        """

    def connect_stdin(self, identity=None):
        """
        Connect to the stdin channel.

        Parameters:
        - identity (bytes): ZMQ identity for the socket
          (optional, default None)

        Returns:
        zmq.Socket: Connected ZMQ socket
        """

    def connect_hb(self, identity=None):
        """
        Connect to the heartbeat channel.

        Parameters:
        - identity (bytes): ZMQ identity for the socket
          (optional, default None)

        Returns:
        zmq.Socket: Connected ZMQ socket
        """

    def connect_control(self, identity=None):
        """
        Connect to the control channel.

        Parameters:
        - identity (bytes): ZMQ identity for the socket
          (optional, default None)

        Returns:
        zmq.Socket: Connected ZMQ socket
        """

    # Configuration traits (shown here with their default values)
    connection_file = None  # Path to connection file
    transport = 'tcp'  # Transport protocol
    ip = ''  # IP address
    shell_port = 0  # Shell channel port
    iopub_port = 0  # IOPub channel port
    stdin_port = 0  # Stdin channel port
    hb_port = 0  # Heartbeat channel port
    control_port = 0  # Control channel port

The LocalPortCache singleton manages port allocation to prevent race conditions when starting multiple kernels.
class LocalPortCache:
    """Singleton class for managing port allocation."""

    def find_available_port(self, ip):
        """
        Find an available port on the specified IP.

        Parameters:
        - ip (str): IP address to check ports on

        Returns:
        int: Available port number
        """

    def return_port(self, port):
        """
        Return a port to the available pool.

        Parameters:
        - port (int): Port number to return

        Returns:
        None
        """

Low-level utilities for launching kernel processes with proper configuration and cross-platform support.
def launch_kernel(cmd, stdin=None, stdout=None, stderr=None,
                  env=None, independent=False, cwd=None, **kwargs):
    """
    Launch a kernel process with appropriate configuration.

    Parameters:
    - cmd (List[str]): Command list for launching the kernel
    - stdin (int, optional): Standard input file descriptor
    - stdout (int, optional): Standard output file descriptor
    - stderr (int, optional): Standard error file descriptor
    - env (Dict[str, str], optional): Environment variables for kernel
    - independent (bool): Whether kernel survives parent process death
      (default False)
    - cwd (str, optional): Working directory for kernel process
    - **kwargs: Additional arguments passed to subprocess.Popen

    Returns:
    subprocess.Popen: Kernel process instance
    """

Functions for creating SSH tunnels to connect to remote kernels securely.
def ssh_tunnel(lport, rport, server, remoteip='127.0.0.1',
               keyfile=None, password=None, timeout=0.4):
    """
    Create an SSH tunnel.

    Parameters:
    - lport (int): Local port number
    - rport (int): Remote port number
    - server (str): SSH server address
    - remoteip (str): Remote IP address to tunnel to
      (default '127.0.0.1')
    - keyfile (str): Path to SSH private key (optional, default None)
    - password (str): SSH password (if not using key)
    - timeout (float): Connection timeout (default 0.4)
      NOTE(review): the unit is not stated here -- confirm before relying on it.

    Returns:
    subprocess.Popen: SSH process
    """
def forward_tunnel(local_port, remote_port, remote_host, ssh_server,
                   ssh_keyfile=None, password=None):
    """
    Create a port forwarding tunnel.

    Parameters:
    - local_port (int): Local port to bind to
    - remote_port (int): Remote port to forward to
    - remote_host (str): Remote host address
    - ssh_server (str): SSH server for tunneling
    - ssh_keyfile (str): Path to SSH private key (optional, default None)
    - password (str): SSH password (optional, default None)

    Returns:
    ForwardServer: Server instance managing the tunnel
    """

# Type alias for connection information
# Keys are connection field names; values are ints (the channel ports),
# strs (e.g. ip, transport) or bytes (the signing key), matching the
# parameter types documented for write_connection_file.
KernelConnectionInfo = Dict[str, Union[int, str, bytes]]

from jupyter_client import write_connection_file
# Let write_connection_file choose the file name and every port
fname, connection_info = write_connection_file()
print(f"Connection file: {fname}")
print(f"Connection info: {connection_info}")

# Pin the address, the five channel ports and the kernel name explicitly
explicit_settings = {
    'ip': '192.168.1.100',
    'shell_port': 50001,
    'iopub_port': 50002,
    'stdin_port': 50003,
    'hb_port': 50004,
    'control_port': 50005,
    'kernel_name': 'python3',
}
fname, connection_info = write_connection_file(**explicit_settings)

from jupyter_client import find_connection_file, KernelManager
# Find existing connection file
try:
    conn_file = find_connection_file('kernel-12345.json')
except IOError:
    # find_connection_file raises IOError when nothing matches. In that
    # case conn_file is never bound, so the loading steps must be skipped
    # rather than falling through into a NameError.
    print("Connection file not found")
else:
    print(f"Found connection file: {conn_file}")

    # Load connection in kernel manager
    km = KernelManager()
    km.load_connection_file(conn_file)

    # Create client from loaded connection
    kc = km.client()

from jupyter_client import KernelManager
km = KernelManager()
km.start_kernel()
try:
    # Get connection information
    conn_info = km.get_connection_info()
    print(f"Shell port: {conn_info['shell_port']}")
    print(f"IOPub port: {conn_info['iopub_port']}")
    print(f"Transport: {conn_info['transport']}")
    print(f"IP: {conn_info['ip']}")

    # Load connection info into another manager
    km2 = KernelManager()
    km2.load_connection_info(conn_info)
finally:
    # Clean up even if one of the steps above raised: stop the kernel
    # process that start_kernel() launched, so it is not left running
    # after its connection file is gone, then remove the connection file.
    km.shutdown_kernel()
    km.cleanup_connection_file()

from jupyter_client import tunnel_to_kernel, find_connection_file
import json

from jupyter_client import KernelManager

# Locate the connection file describing the remote kernel
remote_conn_file = find_connection_file('kernel-remote.json')

# Parse the connection file into a dictionary
with open(remote_conn_file, 'r') as f:
    remote_conn_info = json.load(f)

# Open the SSH tunnels; the result holds the local end of each tunnel in
# the order (shell, iopub, stdin, hb, control)
local_ports = tunnel_to_kernel(
    remote_conn_info,
    sshserver='user@remote-server.com',
    sshkey='/path/to/private_key'
)
print(f"Local tunnel ports: {local_ports}")

# Build a copy of the remote connection info that points at the local
# tunnel endpoints instead of the remote host
tunnel_conn_info = {
    **remote_conn_info,
    'shell_port': local_ports[0],
    'iopub_port': local_ports[1],
    'stdin_port': local_ports[2],
    'hb_port': local_ports[3],
    'control_port': local_ports[4],
    'ip': '127.0.0.1',  # Connect to local tunnel
}

# Now connect using local ports
km = KernelManager()
km.load_connection_info(tunnel_conn_info)
kc = km.client()

from jupyter_client.connect import LocalPortCache
# Get port cache instance
port_cache = LocalPortCache.instance()

# Find available port
available_port = port_cache.find_available_port('127.0.0.1')
try:
    print(f"Available port: {available_port}")
    # Use the port...
finally:
    # Return port when done. Doing this in `finally` hands the port back
    # to the cache even if the code using it raises.
    port_cache.return_port(available_port)

from jupyter_client import KernelManager
import zmq

km = KernelManager()
km.start_kernel()

# Connect to individual channels. The manager creates the sockets itself,
# so no separate zmq.Context has to be created (or terminated) here.
# iopub is connected first so that output published in response to the
# request below is not missed.
iopub_socket = km.connect_iopub()
shell_socket = km.connect_shell()
try:
    # Send a properly framed and signed execute_request through the
    # manager's session; hand-written frames are not a valid Jupyter
    # message and would not be accepted by the kernel.
    km.session.send(
        shell_socket,
        'execute_request',
        content={
            'code': 'print("hello")',
            'silent': False,
            'store_history': True,
            'user_expressions': {},
            'allow_stdin': False,
            'stop_on_error': True,
        },
    )

    # Wait up to 5 seconds for an iopub message instead of blocking forever
    if iopub_socket.poll(timeout=5000, flags=zmq.POLLIN):
        message = iopub_socket.recv_multipart()
    else:
        message = None
finally:
    # Clean up, even if sending or receiving failed
    shell_socket.close()
    iopub_socket.close()
    km.shutdown_kernel()

from jupyter_client import launch_kernel
import subprocess
import sys
# Launch a Python kernel directly
kernel_cmd = [
sys.executable, '-m', 'ipykernel_launcher',
'-f', 'connection-file.json'
]
# Launch with custom environment
custom_env = {'PYTHONPATH': '/custom/path'}
kernel_proc = launch_kernel(
kernel_cmd,
env=custom_env,
cwd='/custom/working/dir',
independent=True # Kernel survives if parent dies
)
print(f"Kernel PID: {kernel_proc.pid}")
# Monitor kernel status
if kernel_proc.poll() is None:
print("Kernel is running")
else:
print(f"Kernel exited with code: {kernel_proc.returncode}")
# Terminate kernel when done
kernel_proc.terminate()
kernel_proc.wait()Install with Tessl CLI
npx tessl i tessl/pypi-jupyter-client