CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-ipykernel

IPython Kernel for Jupyter - provides the core communication layer between Jupyter frontends and the Python interpreter

Pending
Overview
Eval results
Files

data-utilities.mddocs/

Data Utilities

Utilities for JSON serialization, data cleaning, image encoding, and kernel-specific data handling requirements. Provides essential data processing capabilities for kernel communication and display systems.

Capabilities

JSON Utilities

Functions for preparing data for JSON serialization and handling Jupyter-specific data formats.

def json_clean(obj):
    """
    Recursively sanitize *obj* so it can be JSON serialized.

    Walks nested containers and converts values that the standard
    JSON encoder rejects (numpy arrays, dates, and similar
    non-serializable types) into safe equivalents.

    Parameters:
    - obj: Object to clean for JSON serialization

    Returns:
    Cleaned object that can be JSON serialized
    """

def encode_images(format_dict):
    """
    Base64 encode any image payloads in a display format dictionary.

    Given a mapping of MIME types to display data, produces a version
    in which raw image data has been base64 encoded so it can be
    transmitted to Jupyter frontends.

    Parameters:
    - format_dict (dict): Display format dictionary potentially
                         containing image data

    Returns:
    dict: Format dictionary with base64 encoded images
    """

Image Format Detection

Constants for detecting and handling different image formats.

# Signatures used to recognize image/document payloads.
# NOTE(review): typed as str here, but these may actually be bytes magic
# numbers in ipykernel.jsonutil rather than base64 text prefixes — confirm
# before relying on str.startswith() checks against them.
PNG: str       # PNG format signature for base64 detection
JPEG: str      # JPEG format signature for base64 detection  
GIF: str       # GIF format signature for base64 detection
PDF: str       # PDF format signature for base64 detection

Data Publishing

Functions for publishing data to subscribers through kernel communication channels.

def publish_data(data, metadata=None):
    """
    Send *data* to all connected subscribers.

    Routes the payload through the kernel's data publishing channel so
    frontends and any other listeners can receive it.

    Parameters:
    - data (dict): Data to publish
    - metadata (dict, optional): Metadata for the published data
    """

Data Publisher Class

Class for managing data publication over ZMQ connections.

class ZMQDataPublisher:
    """
    Publishes data to subscribers over a ZeroMQ socket.

    Wraps a kernel session and a ZMQ PUB socket to distribute data
    messages to connected frontends in real time.
    """
    
    def __init__(self, session, pub_socket):
        """
        Create a publisher bound to a session and a ZMQ socket.
        
        Parameters:
        - session: Kernel session for message handling
        - pub_socket: ZMQ socket for publishing
        """
    
    def publish_data(self, data, metadata=None):
        """
        Broadcast *data* (with optional *metadata*) to every subscriber.
        
        Parameters:
        - data (dict): Data to publish
        - metadata (dict, optional): Metadata for the data
        """
    
    def set_parent(self, parent):
        """
        Record the parent message used as publication context.
        
        Parameters:
        - parent: Parent message for context
        """
    
    # Publisher attributes
    session: object      # kernel session used to build messages
    pub_socket: object   # ZMQ socket data is sent on
    parent_header: dict  # header of the current parent message

Usage Examples

JSON Data Cleaning

from ipykernel.jsonutil import json_clean
import numpy as np
import datetime
import json

# Build a payload mixing JSON-safe values with types json.dumps rejects
payload = {
    'numpy_array': np.array([1, 2, 3, 4, 5]),
    'numpy_float': np.float64(3.14159),
    'numpy_int': np.int32(42),
    'datetime': datetime.datetime.now(),
    'date': datetime.date.today(),
    'nested': {
        'more_arrays': np.array([[1, 2], [3, 4]]),
        'complex_num': complex(1, 2),
        'bytes_data': b'binary data',
    },
    'normal_data': {
        'string': 'hello',
        'int': 123,
        'float': 3.14,
        'bool': True,
        'list': [1, 2, 3],
        'none': None,
    },
}

print("Original data types:")
print(f"numpy_array: {type(payload['numpy_array'])}")
print(f"datetime: {type(payload['datetime'])}")

# json_clean converts the unsupported values into serializable ones
cleaned = json_clean(payload)

print("\nCleaned data types:")
print(f"numpy_array: {type(cleaned['numpy_array'])}")
print(f"datetime: {type(cleaned['datetime'])}")

# Prove the cleaned structure survives json.dumps
serialized = json.dumps(cleaned, indent=2)
print(f"\nJSON serialization successful: {len(serialized)} characters")

# ... and that it parses back without error
restored = json.loads(serialized)
print(f"Roundtrip successful: {type(restored)}")

Image Encoding

from ipykernel.jsonutil import encode_images, PNG, JPEG
import base64
import io
from PIL import Image
import numpy as np

# Create sample image data
def create_sample_image():
    """Build a 200x100 RGB gradient image (red by x, green by y)."""
    width, height = 200, 100
    image = Image.new('RGB', (width, height))
    pixels = image.load()

    # Fill pixel-by-pixel; blue channel is held constant.
    for y in range(height):
        green = int(255 * y / height)
        for x in range(width):
            pixels[x, y] = (int(255 * x / width), green, 128)

    return image

def image_to_base64(image, format='PNG'):
    """Serialize a PIL image into a base64-encoded ASCII string."""
    with io.BytesIO() as buffer:
        image.save(buffer, format=format)
        raw = buffer.getvalue()
    return base64.b64encode(raw).decode()

# Generate the demo image once
sample_image = create_sample_image()

# Assemble a display-format mapping (MIME type -> representation)
format_dict = {
    'text/plain': 'Sample Image',
    'image/png': image_to_base64(sample_image, 'PNG'),
    'image/jpeg': image_to_base64(sample_image, 'JPEG'),
    'text/html': '<p>Sample image display</p>',
}

print("Original format dict keys:", list(format_dict.keys()))

# Run the mapping through encode_images
encoded_dict = encode_images(format_dict)

print("Encoded format dict keys:", list(encoded_dict.keys()))
print("PNG data length:", len(encoded_dict.get('image/png', '')))
print("JPEG data length:", len(encoded_dict.get('image/jpeg', '')))

# NOTE(review): this assumes PNG/JPEG are str prefixes of the base64
# payload — confirm their actual type in ipykernel.jsonutil.
png_data = encoded_dict.get('image/png', '')
if png_data.startswith(PNG):
    print("PNG signature detected correctly")

jpeg_data = encoded_dict.get('image/jpeg', '')
if jpeg_data.startswith(JPEG):
    print("JPEG signature detected correctly")

Data Publishing

from ipykernel.datapub import publish_data, ZMQDataPublisher
import time
import threading

# Mock session and socket for demonstration
class MockSession:
    """Stand-in kernel session that just logs what would be sent."""

    def send(self, stream, msg_type, content, **kwargs):
        # Echo the call instead of performing real message routing.
        print(f"Published to {stream}: {msg_type}")
        print(f"Content: {content}")

class MockSocket:
    """Stand-in ZMQ socket that reports multipart sends."""

    def send_multipart(self, msg_parts):
        # Report how many frames a real socket would have transmitted.
        print(f"ZMQ send: {len(msg_parts)} parts")

# Wire a publisher up to the mock session/socket pair
session = MockSession()
socket = MockSocket()
publisher = ZMQDataPublisher(session, socket)

# A single sensor reading with its descriptive metadata
reading = {
    'timestamp': time.time(),
    'sensor_reading': 23.5,
    'status': 'active',
}
reading_meta = {
    'source': 'temperature_sensor',
    'units': 'celsius',
}

print("Publishing simple sensor data:")
publisher.publish_data(reading, reading_meta)

# A richer, nested experiment record
experiment = {
    'experiment_id': 'exp_001',
    'measurements': [
        {'time': 0.0, 'value': 1.0},
        {'time': 0.1, 'value': 1.5},
        {'time': 0.2, 'value': 2.0},
    ],
    'parameters': {
        'temperature': 298.15,
        'pressure': 101325,
        'humidity': 0.45,
    },
}
experiment_meta = {
    'researcher': 'Dr. Smith',
    'lab': 'Physics Lab A',
    'equipment': 'Spectrometer X1',
}

print("\nPublishing experiment data:")
publisher.publish_data(experiment, experiment_meta)

Real-time Data Streaming

from ipykernel.datapub import ZMQDataPublisher
import time
import threading
import random
import math

class DataStreamer:
    """Continuously publishes synthetic samples through a ZMQ publisher."""

    def __init__(self, session, socket):
        self.publisher = ZMQDataPublisher(session, socket)
        self.streaming = False
        self.stream_thread = None

    def start_streaming(self, interval=1.0):
        """Launch a daemon thread emitting one sample every *interval* seconds."""
        self.streaming = True

        def _worker():
            began = time.time()
            # Loop until stop_streaming() clears the flag.
            while self.streaming:
                now = time.time()
                elapsed = now - began

                sample = {
                    'timestamp': now,
                    'elapsed_time': elapsed,
                    'sine_wave': math.sin(elapsed),
                    'cosine_wave': math.cos(elapsed),
                    'random_value': random.uniform(-1, 1),
                    'counter': int(elapsed / interval),
                }
                meta = {
                    'stream_type': 'continuous',
                    'sample_rate': 1.0 / interval,
                    'data_source': 'synthetic_generator',
                }
                self.publisher.publish_data(sample, meta)
                time.sleep(interval)

        self.stream_thread = threading.Thread(target=_worker)
        self.stream_thread.daemon = True
        self.stream_thread.start()

    def stop_streaming(self):
        """Signal the worker to exit and wait for it to finish."""
        self.streaming = False
        if self.stream_thread:
            self.stream_thread.join()

    def publish_event(self, event_type, event_data):
        """Publish a single timestamped event; 'error' events get high priority."""
        payload = {
            'event_type': event_type,
            'timestamp': time.time(),
            'data': event_data,
        }
        meta = {
            'message_type': 'event',
            'priority': 'high' if event_type == 'error' else 'normal',
        }
        self.publisher.publish_data(payload, meta)

# Drive the streamer with the mock session/socket pair
session = MockSession()
socket = MockSocket()
streamer = DataStreamer(session, socket)

print("Starting data stream...")
streamer.start_streaming(interval=0.5)

# Let the background worker produce samples for a while
time.sleep(3)

# Inject a couple of one-off events mid-stream
streamer.publish_event('calibration', {'sensor_id': 'temp_001', 'value': 25.0})
streamer.publish_event('warning', {'message': 'High temperature detected', 'value': 85.0})

# Keep the stream running a little longer
time.sleep(2)

print("Stopping data stream...")
streamer.stop_streaming()

Data Validation and Cleaning Pipeline

from ipykernel.jsonutil import json_clean
import numpy as np
import pandas as pd
import datetime
import json

class DataProcessor:
    """Process and clean data for kernel communication.

    Converts numpy arrays and pandas DataFrames into plain,
    JSON-friendly structures (DataFrames additionally run through
    json_clean) and tracks simple processing statistics.
    """
    
    def __init__(self):
        # 'conversions_made' is never updated by the current methods;
        # kept for interface compatibility with existing report consumers.
        self.processing_stats = {
            'objects_processed': 0,
            'conversions_made': 0,
            'errors_encountered': 0
        }
    
    def process_dataframe(self, df):
        """Convert a pandas DataFrame for JSON serialization.

        Returns a cleaned dict with columns/index/data/dtypes/shape,
        or an error dict (never raises) if conversion fails.
        """
        try:
            data = {
                'columns': df.columns.tolist(),
                'index': df.index.tolist(),
                'data': df.values.tolist(),
                'dtypes': {col: str(dtype) for col, dtype in df.dtypes.items()},
                'shape': df.shape
            }
            
            cleaned_data = json_clean(data)
            self.processing_stats['objects_processed'] += 1
            
            return cleaned_data
            
        except Exception as e:
            # Deliberate best-effort: report the failure instead of raising.
            self.processing_stats['errors_encountered'] += 1
            return {'error': str(e), 'type': 'dataframe_processing_error'}
    
    def process_scientific_data(self, data):
        """Dispatch *data* to the appropriate processor by type.

        numpy arrays and DataFrames get dedicated handling; everything
        else goes straight to json_clean.
        """
        if isinstance(data, np.ndarray):
            return self.process_numpy_array(data)
        if isinstance(data, pd.DataFrame):
            return self.process_dataframe(data)
        # Fix: the original had an `elif isinstance(data, dict)` branch whose
        # body was identical to the else branch — collapsed into one path.
        return json_clean(data)
    
    def process_numpy_array(self, arr):
        """Describe a numpy array as a plain dict with summary metadata.

        Returns the values as nested lists plus shape/dtype/size/ndim and
        min/max/mean (None for an empty array). Note: 'shape' stays a
        tuple here; callers needing strict JSON should clean the result.
        """
        return {
            'data': arr.tolist(),
            'shape': arr.shape,
            'dtype': str(arr.dtype),
            'size': arr.size,
            'ndim': arr.ndim,
            'metadata': {
                'min': float(np.min(arr)) if arr.size > 0 else None,
                'max': float(np.max(arr)) if arr.size > 0 else None,
                'mean': float(np.mean(arr)) if arr.size > 0 else None
            }
        }
    
    def create_report(self):
        """Return the current stats with a timestamp and processor version."""
        return {
            'processing_statistics': self.processing_stats,
            'timestamp': datetime.datetime.now().isoformat(),
            'processor_version': '1.0'
        }

# Exercise the processor on representative scientific data
processor = DataProcessor()

samples = np.random.normal(0, 1, (100, 3))
frame = pd.DataFrame({
    'experiment': range(50),
    'temperature': np.random.normal(298, 5, 50),
    'pressure': np.random.normal(101325, 1000, 50),
    'result': np.random.choice(['success', 'failure'], 50),
})

print("Processing numpy array...")
samples_out = processor.process_scientific_data(samples)
print(f"Numpy array processed: shape {samples_out['shape']}")

print("\nProcessing DataFrame...")
frame_out = processor.process_scientific_data(frame)
print(f"DataFrame processed: {frame_out['shape']} shape")

# Summarize what the processor did
report = processor.create_report()
serialized_report = json.dumps(json_clean(report), indent=2)
print(f"\nProcessing report generated: {len(serialized_report)} characters")
print(f"Objects processed: {report['processing_statistics']['objects_processed']}")

Install with Tessl CLI

npx tessl i tessl/pypi-ipykernel

docs

communication-framework.md

connection-management.md

core-kernel.md

data-utilities.md

gui-integration.md

in-process-kernels.md

index.md

io-streaming.md

kernel-application.md

kernel-embedding.md

matplotlib-integration.md

tile.json