CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-jupyter-client

Jupyter protocol implementation and client libraries for kernel communication and management

Pending
Overview
Eval results
Files

session-messaging.mddocs/

Session and Messaging

Jupyter protocol message handling, serialization, authentication, and session management. Provides complete protocol implementation with signing, security features, and message lifecycle management for kernel communication.

Capabilities

Session Management

The Session class handles Jupyter protocol message creation, serialization, authentication, and communication over ZMQ sockets.

class Session:
    """Handles Jupyter protocol message creation and communication."""
    
    def msg(self, msg_type, content=None, parent=None, header=None, metadata=None):
        """
        Create a Jupyter protocol message.
        
        Parameters:
        - msg_type (str): Type of message (e.g., 'execute_request')
        - content (dict): Message content payload
        - parent (dict): Parent message for threading
        - header (dict): Message header (auto-generated if None)
        - metadata (dict): Message metadata
        
        Returns:
        dict: Complete Jupyter protocol message
        """
    
    def sign(self, msg_list):
        """
        Sign a message list for authentication.
        
        Parameters:
        - msg_list (list): List of message parts to sign
        
        Returns:
        bytes: Message signature
        """
    
    def serialize(self, msg, ident=None):
        """
        Serialize a message for transmission over ZMQ.
        
        Parameters:
        - msg (dict): Message to serialize
        - ident (bytes): ZMQ identity for routing
        
        Returns:
        list: List of bytes for ZMQ multipart message
        """
    
    def deserialize(self, msg_list, content=True, copy=True):
        """
        Deserialize a message received from ZMQ.
        
        Parameters:
        - msg_list (list): List of bytes from ZMQ multipart message
        - content (bool): Parse message content if True
        - copy (bool): Copy message data if True
        
        Returns:
        dict: Deserialized message dictionary
        """
    
    def send(self, socket, msg_or_type, content=None, parent=None, 
             ident=None, buffers=None, track=False, header=None, metadata=None):
        """
        Send a message over a ZMQ socket.
        
        Parameters:
        - socket (zmq.Socket): ZMQ socket to send on
        - msg_or_type (dict | str): Complete message or message type
        - content (dict): Message content (if msg_or_type is string)
        - parent (dict): Parent message for threading
        - ident (bytes): ZMQ identity for routing
        - buffers (list): Binary buffers to send
        - track (bool): Track message delivery if True
        - header (dict): Message header
        - metadata (dict): Message metadata
        
        Returns:
        MessageTracker | None: Tracker for message delivery
        """
    
    def recv(self, socket, mode=0, content=True, copy=True):
        """
        Receive a message from a ZMQ socket.
        
        Parameters:
        - socket (zmq.Socket): ZMQ socket to receive from
        - mode (int): ZMQ receive mode (0=blocking, zmq.NOBLOCK=non-blocking)
        - content (bool): Parse message content if True
        - copy (bool): Copy message data if True
        
        Returns:
        dict: Received message dictionary
        """
    
    def clone(self):
        """
        Create a copy of this session.
        
        Returns:
        Session: New session instance with same configuration
        """

    # Configuration properties
    username = 'username'           # Username for message headers
    session = 'session-id'          # Session identifier
    signature_scheme = 'hmac-sha256'  # Message signing scheme
    key = b''                       # Signing key
    packer = None                   # Message serialization function
    unpacker = None                 # Message deserialization function

Session Factory

Base class for objects that need session management capabilities.

class SessionFactory:
    """Base class for configurables that have a Session."""
    
    session = None  # Session instance for message handling

Message Structure

The Message class represents Jupyter protocol messages with proper structure and validation.

class Message:
    """Jupyter protocol message representation."""
    
    def __init__(self, msg_dict):
        """
        Create message from dictionary.
        
        Parameters:
        - msg_dict (dict): Message dictionary
        """
    
    @property
    def header(self):
        """Message header dictionary."""
    
    @property
    def parent_header(self):
        """Parent message header for threading."""
    
    @property
    def metadata(self):
        """Message metadata dictionary."""
    
    @property
    def content(self):
        """Message content payload."""
    
    @property
    def buffers(self):
        """Binary message buffers."""

Message Utilities

Utility functions for working with Jupyter protocol messages.

def msg_header(msg_id, msg_type, username, session, date=None, version=None):
    """
    Create a message header.
    
    Parameters:
    - msg_id (str): Unique message identifier
    - msg_type (str): Type of message
    - username (str): Username creating the message
    - session (str): Session identifier
    - date (datetime): Message timestamp (current time if None)
    - version (str): Protocol version
    
    Returns:
    dict: Message header dictionary
    """

def extract_header(msg_or_header):
    """
    Extract header from message or return header directly.
    
    Parameters:
    - msg_or_header (dict): Message dict or header dict
    
    Returns:
    dict: Message header
    """

def utcnow():
    """
    Get current UTC timestamp for messages.
    
    Returns:
    datetime: Current UTC datetime
    """

def new_id():
    """
    Generate a new unique message ID.
    
    Returns:
    str: Unique message identifier
    """

JSON Utilities

Specialized JSON handling for Jupyter messages with support for dates and numpy arrays.

# JSON serialization functions optimized for Jupyter messages
def json_default(obj):
    """Default JSON serializer for special types."""

def json_packer(obj):
    """Pack objects to JSON with special type handling."""

def json_unpacker(s):
    """Unpack JSON with special type handling."""

Usage Examples

Basic Session Usage

from jupyter_client import Session
import zmq

# Create ZMQ context and sockets
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect('tcp://127.0.0.1:50001')

# Create session
session = Session()

# Create and send a message
msg = session.msg('kernel_info_request')
session.send(socket, msg)

# Receive reply
reply = session.recv(socket)
print(f"Kernel info: {reply['content']}")

# Clean up
socket.close()
context.term()

Message Creation and Structure

from jupyter_client import Session

session = Session(username='user', session='test-session')

# Create execute request message
execute_msg = session.msg(
    'execute_request',
    content={
        'code': 'print("Hello, World!")',
        'silent': False,
        'store_history': True,
        'user_expressions': {},
        'allow_stdin': False,
        'stop_on_error': True
    }
)

print(f"Message ID: {execute_msg['header']['msg_id']}")
print(f"Message type: {execute_msg['header']['msg_type']}")
print(f"Content: {execute_msg['content']}")

# Create reply message
reply = session.msg(
    'execute_reply',
    content={
        'status': 'ok',
        'execution_count': 1,
        'user_expressions': {}
    },
    parent=execute_msg
)

print(f"Reply parent: {reply['parent_header']['msg_id']}")

Message Serialization and Signing

from jupyter_client import Session
import zmq

# Create session with authentication
session = Session(
    key=b'secret-key-for-signing',
    signature_scheme='hmac-sha256'
)

# Create message
msg = session.msg('status', content={'execution_state': 'busy'})

# Serialize message
serialized = session.serialize(msg)
print(f"Serialized parts: {len(serialized)}")

# Sign message manually
signature = session.sign(serialized[1:])  # Skip signature part
print(f"Signature: {signature.hex()}")

# Deserialize message
deserialized = session.deserialize(serialized)
print(f"Deserialized type: {deserialized['header']['msg_type']}")

Session Communication Pattern

from jupyter_client import Session, KernelManager
import zmq

# Start kernel and get connection info
km = KernelManager()
km.start_kernel()
connection_info = km.get_connection_info()

# Create session matching kernel's session
session = Session(
    key=connection_info['key'].encode(),
    signature_scheme=connection_info['signature_scheme']
)

# Connect to shell channel
context = zmq.Context()
shell_socket = context.socket(zmq.DEALER)
shell_socket.connect(f"tcp://127.0.0.1:{connection_info['shell_port']}")

# Send kernel info request
kernel_info_msg = session.msg('kernel_info_request')
session.send(shell_socket, kernel_info_msg)

# Receive reply
reply = session.recv(shell_socket)
print(f"Kernel language: {reply['content']['language_info']['name']}")

# Clean up
shell_socket.close()
context.term()
km.shutdown_kernel()

Custom Message Types

from jupyter_client import Session

session = Session()

# Create custom message type
custom_msg = session.msg(
    'custom_request',
    content={
        'operation': 'process_data',
        'parameters': {
            'input_file': '/path/to/data.csv',
            'output_format': 'json'
        }
    },
    metadata={
        'priority': 'high',
        'timeout': 30
    }
)

print(f"Custom message: {custom_msg}")

# Handle custom reply
def handle_custom_reply(msg):
    if msg['header']['msg_type'] == 'custom_reply':
        content = msg['content']
        if content['status'] == 'ok':
            print(f"Operation completed: {content['result']}")
        else:
            print(f"Operation failed: {content['error']}")

Thread-Safe Session Usage

import threading
from jupyter_client import Session
import zmq

class SessionWorker(threading.Thread):
    def __init__(self, session_config, socket_url):
        super().__init__()
        self.session = Session(**session_config)
        self.socket_url = socket_url
        self.running = True
    
    def run(self):
        context = zmq.Context()
        socket = context.socket(zmq.DEALER)
        socket.connect(self.socket_url)
        
        while self.running:
            try:
                # Non-blocking receive
                msg = self.session.recv(socket, mode=zmq.NOBLOCK)
                self.handle_message(msg)
            except zmq.Again:
                # No message available
                continue
            except Exception as e:
                print(f"Error receiving message: {e}")
        
        socket.close()
        context.term()
    
    def handle_message(self, msg):
        print(f"Received: {msg['header']['msg_type']}")
    
    def send_message(self, msg_type, content=None):
        # This would need socket access - shown for concept
        msg = self.session.msg(msg_type, content=content)
        # session.send(socket, msg)

# Create worker with session config
session_config = {
    'username': 'worker',
    'session': 'worker-session'
}

worker = SessionWorker(session_config, 'tcp://127.0.0.1:50001')
worker.start()

# Use worker...

worker.running = False
worker.join()

Message Buffer Handling

from jupyter_client import Session
import numpy as np

session = Session()

# Create message with binary buffers
data = np.array([1, 2, 3, 4, 5])
msg = session.msg(
    'data_message',
    content={
        'shape': data.shape,
        'dtype': str(data.dtype)
    },
    buffers=[data.tobytes()]
)

print(f"Message has {len(msg.get('buffers', []))} buffers")

# Serialize with buffers
serialized = session.serialize(msg)
print(f"Serialized message parts: {len(serialized)}")

# Deserialize and reconstruct data
deserialized = session.deserialize(serialized)
if 'buffers' in deserialized:
    buffer_data = deserialized['buffers'][0]
    reconstructed = np.frombuffer(buffer_data, dtype=data.dtype)
    print(f"Reconstructed data: {reconstructed}")

Install with Tessl CLI

npx tessl i tessl/pypi-jupyter-client

docs

client-communication.md

connection-management.md

index.md

kernel-management.md

kernel-provisioning.md

kernel-specifications.md

session-messaging.md

tile.json