IPython Kernel for Jupyter — provides the core communication layer between Jupyter frontends and the Python interpreter.

This package exposes utilities for JSON serialization, data cleaning, image encoding, and kernel-specific data handling. It provides the essential data-processing capabilities used by the kernel's communication and display systems, including functions for preparing data for JSON serialization and for handling Jupyter-specific data formats.
def json_clean(obj):
    """
    Clean objects for JSON serialization.

    Recursively processes objects to ensure they can be safely
    serialized to JSON, handling numpy arrays, dates, and other
    non-JSON-serializable types.

    Parameters
    ----------
    obj : object
        Object to clean for JSON serialization.

    Returns
    -------
    object
        Cleaned object that can be JSON serialized.
    """
def encode_images(format_dict):
    """
    Base64 encode images in display format dictionary.

    Takes a display format dictionary and base64 encodes any
    image data for transmission to Jupyter frontends.

    Parameters
    ----------
    format_dict : dict
        Display format dictionary potentially containing image data.

    Returns
    -------
    dict
        Format dictionary with base64 encoded images.
    """

# Constants for detecting and handling different image formats.
# Base64 format detection signatures.
# Each constant is the base64 prefix produced by encoding a format's magic
# bytes, so a base64-encoded payload can be identified with str.startswith()
# without decoding it first. The prefixes below depend only on the fixed
# leading magic bytes, so they are stable for any file of that format.
PNG: str = "iVBOR"   # base64 prefix of b"\x89PNG..." (PNG magic bytes)
JPEG: str = "/9j/"   # base64 prefix of b"\xff\xd8\xff" (JPEG/JFIF magic bytes)
GIF: str = "R0lGO"   # base64 prefix of b"GIF8..." (GIF87a/GIF89a headers)
PDF: str = "JVBER"   # base64 prefix of b"%PDF" (PDF header)

# Functions for publishing data to subscribers through kernel communication channels.
def publish_data(data, metadata=None):
    """
    Publish data to subscribers.

    Publishes data through the kernel's data publishing mechanism,
    making it available to connected frontends and other subscribers.

    Parameters
    ----------
    data : dict
        Data to publish.
    metadata : dict, optional
        Metadata for the published data.
    """

# Class for managing data publication over ZMQ connections.
class ZMQDataPublisher:
    """
    Publisher for data over ZMQ.

    Manages publication of data to subscribers using ZeroMQ
    messaging for real-time data distribution.
    """

    def __init__(self, session, pub_socket):
        """
        Initialize data publisher.

        Parameters
        ----------
        session
            Kernel session for message handling.
        pub_socket
            ZMQ socket for publishing.
        """

    def publish_data(self, data, metadata=None):
        """
        Publish data to all subscribers.

        Parameters
        ----------
        data : dict
            Data to publish.
        metadata : dict, optional
            Metadata for the data.
        """

    def set_parent(self, parent):
        """
        Set parent message for publication context.

        Parameters
        ----------
        parent
            Parent message for context.
        """

    # Publisher attributes
    session: object      # Kernel session
    pub_socket: object   # ZMQ publishing socket
    parent_header: dict  # Parent message header

from ipykernel.jsonutil import json_clean
import numpy as np
import datetime
import json

# Demo: assemble a structure full of non-JSON-native values (numpy scalars
# and arrays, datetimes, complex numbers, bytes), then run it through
# json_clean so json.dumps can handle it.
nested_part = {
    'more_arrays': np.array([[1, 2], [3, 4]]),
    'complex_num': complex(1, 2),
    'bytes_data': b'binary data',
}
plain_part = {
    'string': 'hello',
    'int': 123,
    'float': 3.14,
    'bool': True,
    'list': [1, 2, 3],
    'none': None,
}
complex_data = {
    'numpy_array': np.array([1, 2, 3, 4, 5]),
    'numpy_float': np.float64(3.14159),
    'numpy_int': np.int32(42),
    'datetime': datetime.datetime.now(),
    'date': datetime.date.today(),
    'nested': nested_part,
    'normal_data': plain_part,
}

print("Original data types:")
print(f"numpy_array: {type(complex_data['numpy_array'])}")
print(f"datetime: {type(complex_data['datetime'])}")

# Normalize everything down to JSON-safe builtins.
cleaned_data = json_clean(complex_data)
print("\nCleaned data types:")
print(f"numpy_array: {type(cleaned_data['numpy_array'])}")
print(f"datetime: {type(cleaned_data['datetime'])}")

# Prove the cleaned structure survives a dumps/loads round trip.
json_string = json.dumps(cleaned_data, indent=2)
print(f"\nJSON serialization successful: {len(json_string)} characters")
roundtrip_data = json.loads(json_string)
print(f"Roundtrip successful: {type(roundtrip_data)}")
from ipykernel.jsonutil import encode_images, PNG, JPEG
import base64
import io
from PIL import Image
import numpy as np
# Create sample image data
def create_sample_image():
    """Build a 200x100 RGB gradient test image (red left-to-right, green
    top-to-bottom, constant blue)."""
    width, height = 200, 100
    img = Image.new('RGB', (width, height))
    px = img.load()
    for col in range(width):
        red = int(255 * col / width)  # invariant per column
        for row in range(height):
            px[col, row] = (red, int(255 * row / height), 128)
    return img
def image_to_base64(image, format='PNG'):
    """Serialize a PIL image into an in-memory buffer and return the
    base64-encoded contents as a str."""
    raw = io.BytesIO()
    image.save(raw, format=format)
    raw.seek(0)
    encoded = base64.b64encode(raw.getvalue())
    return encoded.decode()
# Build the demo image and a Jupyter-style display format dictionary.
sample_image = create_sample_image()
format_dict = {
    'text/plain': 'Sample Image',
    'image/png': image_to_base64(sample_image, 'PNG'),
    'image/jpeg': image_to_base64(sample_image, 'JPEG'),
    'text/html': '<p>Sample image display</p>',
}
print("Original format dict keys:", list(format_dict.keys()))

# Run the dict through encode_images and inspect the encoded payloads.
encoded_dict = encode_images(format_dict)
print("Encoded format dict keys:", list(encoded_dict.keys()))
print("PNG data length:", len(encoded_dict.get('image/png', '')))
print("JPEG data length:", len(encoded_dict.get('image/jpeg', '')))

# The base64 payloads should begin with the known format signatures.
for mime, signature, label in (('image/png', PNG, 'PNG'),
                               ('image/jpeg', JPEG, 'JPEG')):
    payload = encoded_dict.get(mime, '')
    if payload.startswith(signature):
        print(f"{label} signature detected correctly")
from ipykernel.datapub import publish_data, ZMQDataPublisher
import time
import threading
# Mock session and socket for demonstration
class MockSession:
    """Stand-in for a kernel session: logs each send instead of messaging."""

    def send(self, stream, msg_type, content, **kwargs):
        # Echo the call so the demo is observable without a real kernel.
        print("Published to {}: {}".format(stream, msg_type))
        print("Content: {}".format(content))
class MockSocket:
    """Stand-in ZMQ socket that reports part counts instead of sending."""

    def send_multipart(self, msg_parts):
        part_count = len(msg_parts)
        print("ZMQ send: {} parts".format(part_count))
# Wire a publisher to the mock transport.
session = MockSession()
socket = MockSocket()
publisher = ZMQDataPublisher(session, socket)

# One-off publication of a small sensor payload.
simple_data = {
    'timestamp': time.time(),
    'sensor_reading': 23.5,
    'status': 'active',
}
metadata = {'source': 'temperature_sensor', 'units': 'celsius'}
print("Publishing simple sensor data:")
publisher.publish_data(simple_data, metadata)

# A richer, nested experiment record with its own metadata.
measurement_series = [
    {'time': 0.0, 'value': 1.0},
    {'time': 0.1, 'value': 1.5},
    {'time': 0.2, 'value': 2.0},
]
complex_data = {
    'experiment_id': 'exp_001',
    'measurements': measurement_series,
    'parameters': {
        'temperature': 298.15,
        'pressure': 101325,
        'humidity': 0.45,
    },
}
experiment_metadata = {
    'researcher': 'Dr. Smith',
    'lab': 'Physics Lab A',
    'equipment': 'Spectrometer X1',
}
print("\nPublishing experiment data:")
publisher.publish_data(complex_data, experiment_metadata)
from ipykernel.datapub import ZMQDataPublisher
import time
import threading
import random
import math
class DataStreamer:
    """Real-time data streaming using ZMQ publisher.

    Runs a daemon worker thread that periodically publishes synthetic
    samples through a ZMQDataPublisher, and also supports one-off
    event publication.
    """

    def __init__(self, session, socket):
        self.publisher = ZMQDataPublisher(session, socket)
        self.streaming = False      # flag polled by the worker loop
        self.stream_thread = None   # set by start_streaming()

    def start_streaming(self, interval=1.0):
        """Start streaming synthetic data every `interval` seconds."""
        self.streaming = True

        def _stream_worker():
            # Publish one sample per interval until stop_streaming() clears
            # the flag; elapsed time drives the synthetic waveforms.
            started = time.time()
            while self.streaming:
                now = time.time()
                elapsed = now - started
                sample = {
                    'timestamp': now,
                    'elapsed_time': elapsed,
                    'sine_wave': math.sin(elapsed),
                    'cosine_wave': math.cos(elapsed),
                    'random_value': random.uniform(-1, 1),
                    'counter': int(elapsed / interval),
                }
                sample_meta = {
                    'stream_type': 'continuous',
                    'sample_rate': 1.0 / interval,
                    'data_source': 'synthetic_generator',
                }
                self.publisher.publish_data(sample, sample_meta)
                time.sleep(interval)

        self.stream_thread = threading.Thread(target=_stream_worker, daemon=True)
        self.stream_thread.start()

    def stop_streaming(self):
        """Stop data streaming and wait for the worker thread to exit."""
        self.streaming = False
        if self.stream_thread:
            self.stream_thread.join()

    def publish_event(self, event_type, event_data):
        """Publish one-time event data with a priority tag."""
        payload = {
            'event_type': event_type,
            'timestamp': time.time(),
            'data': event_data,
        }
        # Errors are flagged high priority so subscribers can triage.
        priority = 'high' if event_type == 'error' else 'normal'
        meta = {'message_type': 'event', 'priority': priority}
        self.publisher.publish_data(payload, meta)
# Usage example with mock objects
session = MockSession()
socket = MockSocket()
streamer = DataStreamer(session, socket)

print("Starting data stream...")
streamer.start_streaming(interval=0.5)
time.sleep(3)  # let the periodic stream run for a few seconds

# Interleave some one-off events with the running stream.
streamer.publish_event('calibration', {'sensor_id': 'temp_001', 'value': 25.0})
streamer.publish_event('warning', {'message': 'High temperature detected', 'value': 85.0})

time.sleep(2)  # keep streaming briefly after the events
print("Stopping data stream...")
streamer.stop_streaming()
from ipykernel.jsonutil import json_clean
import numpy as np
import pandas as pd
import datetime
import json
class DataProcessor:
    """Process and clean scientific data for kernel communication.

    Dispatches numpy arrays, pandas DataFrames, and plain objects to the
    appropriate JSON-safe conversion and tracks simple processing stats.
    """

    def __init__(self):
        # Running counters reported by create_report().
        self.processing_stats = {
            'objects_processed': 0,
            'conversions_made': 0,
            'errors_encountered': 0,
        }

    def process_dataframe(self, df):
        """Convert a pandas DataFrame into a JSON-cleanable dict.

        Returns an error descriptor dict instead of raising on failure.
        """
        try:
            payload = {
                'columns': df.columns.tolist(),
                'index': df.index.tolist(),
                'data': df.values.tolist(),
                'dtypes': {name: str(kind) for name, kind in df.dtypes.items()},
                'shape': df.shape,
            }
            result = json_clean(payload)
            self.processing_stats['objects_processed'] += 1
            return result
        except Exception as exc:
            self.processing_stats['errors_encountered'] += 1
            return {'error': str(exc), 'type': 'dataframe_processing_error'}

    def process_scientific_data(self, data):
        """Route data to the converter appropriate for its type."""
        if isinstance(data, np.ndarray):
            return self.process_numpy_array(data)
        if isinstance(data, pd.DataFrame):
            return self.process_dataframe(data)
        # Dicts and any other type go straight through json_clean.
        return json_clean(data)

    def process_numpy_array(self, arr):
        """Describe a numpy array as a plain dict with summary metadata."""
        has_items = arr.size > 0  # guard: min/max/mean raise on empty arrays
        summary = {
            'min': float(np.min(arr)) if has_items else None,
            'max': float(np.max(arr)) if has_items else None,
            'mean': float(np.mean(arr)) if has_items else None,
        }
        return {
            'data': arr.tolist(),
            'shape': arr.shape,
            'dtype': str(arr.dtype),
            'size': arr.size,
            'ndim': arr.ndim,
            'metadata': summary,
        }

    def create_report(self):
        """Summarize processing activity with a timestamp."""
        return {
            'processing_statistics': self.processing_stats,
            'timestamp': datetime.datetime.now().isoformat(),
            'processor_version': '1.0',
        }
# Example usage
processor = DataProcessor()

# Sample scientific inputs: a raw array and a tabular experiment log.
numpy_data = np.random.normal(0, 1, (100, 3))
df_data = pd.DataFrame({
    'experiment': range(50),
    'temperature': np.random.normal(298, 5, 50),
    'pressure': np.random.normal(101325, 1000, 50),
    'result': np.random.choice(['success', 'failure'], 50),
})

print("Processing numpy array...")
numpy_result = processor.process_scientific_data(numpy_data)
print(f"Numpy array processed: shape {numpy_result['shape']}")

print("\nProcessing DataFrame...")
df_result = processor.process_scientific_data(df_data)
print(f"DataFrame processed: {df_result['shape']} shape")

# Summarize what was processed and confirm the report serializes.
report = processor.create_report()
report_json = json.dumps(json_clean(report), indent=2)
print(f"\nProcessing report generated: {len(report_json)} characters")
print(f"Objects processed: {report['processing_statistics']['objects_processed']}")
# Install with Tessl CLI
Install this package with the Tessl CLI: `npx tessl i tessl/pypi-ipykernel`