# Data Utilities

Utilities for JSON serialization, data cleaning, image encoding, and kernel-specific data handling requirements. Provides essential data processing capabilities for kernel communication and display systems.

## Capabilities

### JSON Utilities

Functions for preparing data for JSON serialization and handling Jupyter-specific data formats.

```python { .api }
def json_clean(obj):
    """
    Clean objects for JSON serialization.

    Recursively processes objects to ensure they can be safely
    serialized to JSON, handling numpy arrays, dates, and other
    non-JSON-serializable types.

    Parameters:
    - obj: Object to clean for JSON serialization

    Returns:
    Cleaned object that can be JSON serialized
    """

def encode_images(format_dict):
    """
    Base64 encode images in display format dictionary.

    Takes a display format dictionary and base64 encodes any
    image data for transmission to Jupyter frontends.

    Parameters:
    - format_dict (dict): Display format dictionary potentially
      containing image data

    Returns:
    dict: Format dictionary with base64 encoded images
    """
```

### Image Format Detection

Constants for detecting and handling different image formats.

```python { .api }
# Base64 format detection signatures
PNG: str # PNG format signature for base64 detection
JPEG: str # JPEG format signature for base64 detection
GIF: str # GIF format signature for base64 detection
PDF: str # PDF format signature for base64 detection
```

### Data Publishing

Functions for publishing data to subscribers through kernel communication channels.

```python { .api }
def publish_data(data, metadata=None):
    """
    Publish data to subscribers.

    Publishes data through the kernel's data publishing mechanism,
    making it available to connected frontends and other subscribers.

    Parameters:
    - data (dict): Data to publish
    - metadata (dict, optional): Metadata for the published data
    """
```

### Data Publisher Class

Class for managing data publication over ZMQ connections.

```python { .api }
class ZMQDataPublisher:
    """
    Publisher for data over ZMQ.

    Manages publication of data to subscribers using ZeroMQ
    messaging for real-time data distribution.
    """

    def __init__(self, session, pub_socket):
        """
        Initialize data publisher.

        Parameters:
        - session: Kernel session for message handling
        - pub_socket: ZMQ socket for publishing
        """

    def publish_data(self, data, metadata=None):
        """
        Publish data to all subscribers.

        Parameters:
        - data (dict): Data to publish
        - metadata (dict, optional): Metadata for the data
        """

    def set_parent(self, parent):
        """
        Set parent message for publication context.

        Parameters:
        - parent: Parent message for context
        """

    # Publisher attributes
    session: object # Kernel session
    pub_socket: object # ZMQ publishing socket
    parent_header: dict # Parent message header
```

## Usage Examples

### JSON Data Cleaning

```python
from ipykernel.jsonutil import json_clean
import numpy as np
import datetime
import json

# Create complex data structure with non-JSON types
complex_data = {
    'numpy_array': np.array([1, 2, 3, 4, 5]),
    'numpy_float': np.float64(3.14159),
    'numpy_int': np.int32(42),
    'datetime': datetime.datetime.now(),
    'date': datetime.date.today(),
    'nested': {
        'more_arrays': np.array([[1, 2], [3, 4]]),
        'complex_num': complex(1, 2),
        'bytes_data': b'binary data'
    },
    'normal_data': {
        'string': 'hello',
        'int': 123,
        'float': 3.14,
        'bool': True,
        'list': [1, 2, 3],
        'none': None
    }
}

print("Original data types:")
print(f"numpy_array: {type(complex_data['numpy_array'])}")
print(f"datetime: {type(complex_data['datetime'])}")

# Clean data for JSON serialization
cleaned_data = json_clean(complex_data)

print("\nCleaned data types:")
print(f"numpy_array: {type(cleaned_data['numpy_array'])}")
print(f"datetime: {type(cleaned_data['datetime'])}")

# Verify it can be JSON serialized
json_string = json.dumps(cleaned_data, indent=2)
print(f"\nJSON serialization successful: {len(json_string)} characters")

# Verify roundtrip
roundtrip_data = json.loads(json_string)
print(f"Roundtrip successful: {type(roundtrip_data)}")
```

### Image Encoding

```python
from ipykernel.jsonutil import encode_images, PNG, JPEG
import base64
import io
from PIL import Image
import numpy as np

# Create sample image data
def create_sample_image():
    """Create a sample PIL image."""
    # Create a simple gradient image
    width, height = 200, 100
    image = Image.new('RGB', (width, height))
    pixels = image.load()

    for x in range(width):
        for y in range(height):
            r = int(255 * x / width)
            g = int(255 * y / height)
            b = 128
            pixels[x, y] = (r, g, b)

    return image

def image_to_base64(image, format='PNG'):
    """Convert PIL image to base64 string."""
    buffer = io.BytesIO()
    image.save(buffer, format=format)
    buffer.seek(0)
    return base64.b64encode(buffer.getvalue()).decode()

# Create sample image
sample_image = create_sample_image()

# Create display format dictionary
format_dict = {
    'text/plain': 'Sample Image',
    'image/png': image_to_base64(sample_image, 'PNG'),
    'image/jpeg': image_to_base64(sample_image, 'JPEG'),
    'text/html': '<p>Sample image display</p>'
}

print("Original format dict keys:", list(format_dict.keys()))

# Encode images in the format dict
encoded_dict = encode_images(format_dict)

print("Encoded format dict keys:", list(encoded_dict.keys()))
print("PNG data length:", len(encoded_dict.get('image/png', '')))
print("JPEG data length:", len(encoded_dict.get('image/jpeg', '')))

# Check format signatures
png_data = encoded_dict.get('image/png', '')
if png_data.startswith(PNG):
    print("PNG signature detected correctly")

jpeg_data = encoded_dict.get('image/jpeg', '')
if jpeg_data.startswith(JPEG):
    print("JPEG signature detected correctly")
```

### Data Publishing

```python
from ipykernel.datapub import publish_data, ZMQDataPublisher
import time
import threading

# Mock session and socket for demonstration
class MockSession:
    def send(self, stream, msg_type, content, **kwargs):
        print(f"Published to {stream}: {msg_type}")
        print(f"Content: {content}")

class MockSocket:
    def send_multipart(self, msg_parts):
        print(f"ZMQ send: {len(msg_parts)} parts")

# Create mock publisher
session = MockSession()
socket = MockSocket()
publisher = ZMQDataPublisher(session, socket)

# Publish simple data
simple_data = {
    'timestamp': time.time(),
    'sensor_reading': 23.5,
    'status': 'active'
}

metadata = {
    'source': 'temperature_sensor',
    'units': 'celsius'
}

print("Publishing simple sensor data:")
publisher.publish_data(simple_data, metadata)

# Publish complex data
complex_data = {
    'experiment_id': 'exp_001',
    'measurements': [
        {'time': 0.0, 'value': 1.0},
        {'time': 0.1, 'value': 1.5},
        {'time': 0.2, 'value': 2.0}
    ],
    'parameters': {
        'temperature': 298.15,
        'pressure': 101325,
        'humidity': 0.45
    }
}

experiment_metadata = {
    'researcher': 'Dr. Smith',
    'lab': 'Physics Lab A',
    'equipment': 'Spectrometer X1'
}

print("\nPublishing experiment data:")
publisher.publish_data(complex_data, experiment_metadata)
```

### Real-time Data Streaming

```python
from ipykernel.datapub import ZMQDataPublisher
import time
import threading
import random
import math

class DataStreamer:
    """Real-time data streaming using ZMQ publisher."""

    def __init__(self, session, socket):
        self.publisher = ZMQDataPublisher(session, socket)
        self.streaming = False
        self.stream_thread = None

    def start_streaming(self, interval=1.0):
        """Start streaming data at specified interval."""
        self.streaming = True

        def stream_worker():
            start_time = time.time()

            while self.streaming:
                current_time = time.time()
                elapsed = current_time - start_time

                # Generate sample data
                data = {
                    'timestamp': current_time,
                    'elapsed_time': elapsed,
                    'sine_wave': math.sin(elapsed),
                    'cosine_wave': math.cos(elapsed),
                    'random_value': random.uniform(-1, 1),
                    'counter': int(elapsed / interval)
                }

                metadata = {
                    'stream_type': 'continuous',
                    'sample_rate': 1.0 / interval,
                    'data_source': 'synthetic_generator'
                }

                # Publish data
                self.publisher.publish_data(data, metadata)

                # Wait for next interval
                time.sleep(interval)

        self.stream_thread = threading.Thread(target=stream_worker)
        self.stream_thread.daemon = True
        self.stream_thread.start()

    def stop_streaming(self):
        """Stop data streaming."""
        self.streaming = False
        if self.stream_thread:
            self.stream_thread.join()

    def publish_event(self, event_type, event_data):
        """Publish one-time event data."""
        data = {
            'event_type': event_type,
            'timestamp': time.time(),
            'data': event_data
        }

        metadata = {
            'message_type': 'event',
            'priority': 'high' if event_type == 'error' else 'normal'
        }

        self.publisher.publish_data(data, metadata)

# Usage example with mock objects
session = MockSession()
socket = MockSocket()

streamer = DataStreamer(session, socket)

print("Starting data stream...")
streamer.start_streaming(interval=0.5)

# Let it run for a few seconds
time.sleep(3)

# Publish some events
streamer.publish_event('calibration', {'sensor_id': 'temp_001', 'value': 25.0})
streamer.publish_event('warning', {'message': 'High temperature detected', 'value': 85.0})

# Continue streaming briefly
time.sleep(2)

# Stop streaming
print("Stopping data stream...")
streamer.stop_streaming()
```

### Data Validation and Cleaning Pipeline

```python
from ipykernel.jsonutil import json_clean
import numpy as np
import pandas as pd
import datetime
import json

class DataProcessor:
    """Process and clean data for kernel communication."""

    def __init__(self):
        self.processing_stats = {
            'objects_processed': 0,
            'conversions_made': 0,
            'errors_encountered': 0
        }

    def process_dataframe(self, df):
        """Process pandas DataFrame for JSON serialization."""
        try:
            # Convert DataFrame to dict
            data = {
                'columns': df.columns.tolist(),
                'index': df.index.tolist(),
                'data': df.values.tolist(),
                'dtypes': {col: str(dtype) for col, dtype in df.dtypes.items()},
                'shape': df.shape
            }

            # Clean the data
            cleaned_data = json_clean(data)
            self.processing_stats['objects_processed'] += 1

            return cleaned_data

        except Exception as e:
            self.processing_stats['errors_encountered'] += 1
            return {'error': str(e), 'type': 'dataframe_processing_error'}

    def process_scientific_data(self, data):
        """Process scientific data with various formats."""
        if isinstance(data, np.ndarray):
            return self.process_numpy_array(data)
        elif isinstance(data, pd.DataFrame):
            return self.process_dataframe(data)
        elif isinstance(data, dict):
            return json_clean(data)
        else:
            return json_clean(data)

    def process_numpy_array(self, arr):
        """Process numpy array with metadata."""
        return {
            'data': arr.tolist(),
            'shape': arr.shape,
            'dtype': str(arr.dtype),
            'size': arr.size,
            'ndim': arr.ndim,
            'metadata': {
                'min': float(np.min(arr)) if arr.size > 0 else None,
                'max': float(np.max(arr)) if arr.size > 0 else None,
                'mean': float(np.mean(arr)) if arr.size > 0 else None
            }
        }

    def create_report(self):
        """Create processing report."""
        return {
            'processing_statistics': self.processing_stats,
            'timestamp': datetime.datetime.now().isoformat(),
            'processor_version': '1.0'
        }

# Example usage
processor = DataProcessor()

# Create sample scientific data
numpy_data = np.random.normal(0, 1, (100, 3))
df_data = pd.DataFrame({
    'experiment': range(50),
    'temperature': np.random.normal(298, 5, 50),
    'pressure': np.random.normal(101325, 1000, 50),
    'result': np.random.choice(['success', 'failure'], 50)
})

# Process different data types
print("Processing numpy array...")
numpy_result = processor.process_scientific_data(numpy_data)
print(f"Numpy array processed: shape {numpy_result['shape']}")

print("\nProcessing DataFrame...")
df_result = processor.process_scientific_data(df_data)
print(f"DataFrame processed: {df_result['shape']} shape")

# Create comprehensive report
report = processor.create_report()
report_json = json.dumps(json_clean(report), indent=2)
print(f"\nProcessing report generated: {len(report_json)} characters")
print(f"Objects processed: {report['processing_statistics']['objects_processed']}")
```