or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

communication-framework.md · connection-management.md · core-kernel.md · data-utilities.md · gui-integration.md · in-process-kernels.md · index.md · io-streaming.md · kernel-application.md · kernel-embedding.md · matplotlib-integration.md

docs/data-utilities.md

# Data Utilities

Utilities for JSON serialization, data cleaning, image encoding, and kernel-specific data handling requirements. Provides essential data processing capabilities for kernel communication and display systems.

## Capabilities

### JSON Utilities

Functions for preparing data for JSON serialization and handling Jupyter-specific data formats.

```python { .api }
def json_clean(obj):
    """
    Clean objects for JSON serialization.

    Recursively processes objects to ensure they can be safely
    serialized to JSON, handling numpy arrays, dates, and other
    non-JSON-serializable types.

    Parameters:
    - obj: Object to clean for JSON serialization

    Returns:
    Cleaned object that can be JSON serialized
    """

def encode_images(format_dict):
    """
    Base64 encode images in display format dictionary.

    Takes a display format dictionary and base64 encodes any
    image data for transmission to Jupyter frontends.

    Parameters:
    - format_dict (dict): Display format dictionary potentially
      containing image data

    Returns:
    dict: Format dictionary with base64 encoded images
    """
```

### Image Format Detection

Constants for detecting and handling different image formats.

```python { .api }
# Base64 format detection signatures
PNG: str   # PNG format signature for base64 detection
JPEG: str  # JPEG format signature for base64 detection
GIF: str   # GIF format signature for base64 detection
PDF: str   # PDF format signature for base64 detection
```

### Data Publishing

Functions for publishing data to subscribers through kernel communication channels.

```python { .api }
def publish_data(data, metadata=None):
    """
    Publish data to subscribers.

    Publishes data through the kernel's data publishing mechanism,
    making it available to connected frontends and other subscribers.

    Parameters:
    - data (dict): Data to publish
    - metadata (dict, optional): Metadata for the published data
    """
```

### Data Publisher Class

Class for managing data publication over ZMQ connections.

```python { .api }
class ZMQDataPublisher:
    """
    Publisher for data over ZMQ.

    Manages publication of data to subscribers using ZeroMQ
    messaging for real-time data distribution.
    """

    def __init__(self, session, pub_socket):
        """
        Initialize data publisher.

        Parameters:
        - session: Kernel session for message handling
        - pub_socket: ZMQ socket for publishing
        """

    def publish_data(self, data, metadata=None):
        """
        Publish data to all subscribers.

        Parameters:
        - data (dict): Data to publish
        - metadata (dict, optional): Metadata for the data
        """

    def set_parent(self, parent):
        """
        Set parent message for publication context.

        Parameters:
        - parent: Parent message for context
        """

    # Publisher attributes
    session: object      # Kernel session
    pub_socket: object   # ZMQ publishing socket
    parent_header: dict  # Parent message header
```

## Usage Examples

### JSON Data Cleaning

```python
from ipykernel.jsonutil import json_clean
import numpy as np
import datetime
import json

# Create complex data structure with non-JSON types
complex_data = {
    'numpy_array': np.array([1, 2, 3, 4, 5]),
    'numpy_float': np.float64(3.14159),
    'numpy_int': np.int32(42),
    'datetime': datetime.datetime.now(),
    'date': datetime.date.today(),
    'nested': {
        'more_arrays': np.array([[1, 2], [3, 4]]),
        'complex_num': complex(1, 2),
        'bytes_data': b'binary data'
    },
    'normal_data': {
        'string': 'hello',
        'int': 123,
        'float': 3.14,
        'bool': True,
        'list': [1, 2, 3],
        'none': None
    }
}

print("Original data types:")
print(f"numpy_array: {type(complex_data['numpy_array'])}")
print(f"datetime: {type(complex_data['datetime'])}")

# Clean data for JSON serialization
cleaned_data = json_clean(complex_data)

print("\nCleaned data types:")
print(f"numpy_array: {type(cleaned_data['numpy_array'])}")
print(f"datetime: {type(cleaned_data['datetime'])}")

# Verify it can be JSON serialized
json_string = json.dumps(cleaned_data, indent=2)
print(f"\nJSON serialization successful: {len(json_string)} characters")

# Verify roundtrip
roundtrip_data = json.loads(json_string)
print(f"Roundtrip successful: {type(roundtrip_data)}")
```

### Image Encoding

```python
from ipykernel.jsonutil import encode_images, PNG, JPEG
import base64
import io
from PIL import Image
import numpy as np

# Create sample image data
def create_sample_image():
    """Create a sample PIL image."""
    # Create a simple gradient image
    width, height = 200, 100
    image = Image.new('RGB', (width, height))
    pixels = image.load()

    for x in range(width):
        for y in range(height):
            r = int(255 * x / width)
            g = int(255 * y / height)
            b = 128
            pixels[x, y] = (r, g, b)

    return image

def image_to_base64(image, format='PNG'):
    """Convert PIL image to base64 string."""
    buffer = io.BytesIO()
    image.save(buffer, format=format)
    buffer.seek(0)
    return base64.b64encode(buffer.getvalue()).decode()

# Create sample image
sample_image = create_sample_image()

# Create display format dictionary
format_dict = {
    'text/plain': 'Sample Image',
    'image/png': image_to_base64(sample_image, 'PNG'),
    'image/jpeg': image_to_base64(sample_image, 'JPEG'),
    'text/html': '<p>Sample image display</p>'
}

print("Original format dict keys:", list(format_dict.keys()))

# Encode images in the format dict
encoded_dict = encode_images(format_dict)

print("Encoded format dict keys:", list(encoded_dict.keys()))
print("PNG data length:", len(encoded_dict.get('image/png', '')))
print("JPEG data length:", len(encoded_dict.get('image/jpeg', '')))

# Check format signatures
png_data = encoded_dict.get('image/png', '')
if png_data.startswith(PNG):
    print("PNG signature detected correctly")

jpeg_data = encoded_dict.get('image/jpeg', '')
if jpeg_data.startswith(JPEG):
    print("JPEG signature detected correctly")
```

### Data Publishing

```python
from ipykernel.datapub import publish_data, ZMQDataPublisher
import time
import threading

# Mock session and socket for demonstration
class MockSession:
    def send(self, stream, msg_type, content, **kwargs):
        print(f"Published to {stream}: {msg_type}")
        print(f"Content: {content}")

class MockSocket:
    def send_multipart(self, msg_parts):
        print(f"ZMQ send: {len(msg_parts)} parts")

# Create mock publisher
session = MockSession()
socket = MockSocket()
publisher = ZMQDataPublisher(session, socket)

# Publish simple data
simple_data = {
    'timestamp': time.time(),
    'sensor_reading': 23.5,
    'status': 'active'
}

metadata = {
    'source': 'temperature_sensor',
    'units': 'celsius'
}

print("Publishing simple sensor data:")
publisher.publish_data(simple_data, metadata)

# Publish complex data
complex_data = {
    'experiment_id': 'exp_001',
    'measurements': [
        {'time': 0.0, 'value': 1.0},
        {'time': 0.1, 'value': 1.5},
        {'time': 0.2, 'value': 2.0}
    ],
    'parameters': {
        'temperature': 298.15,
        'pressure': 101325,
        'humidity': 0.45
    }
}

experiment_metadata = {
    'researcher': 'Dr. Smith',
    'lab': 'Physics Lab A',
    'equipment': 'Spectrometer X1'
}

print("\nPublishing experiment data:")
publisher.publish_data(complex_data, experiment_metadata)
```

### Real-time Data Streaming

```python
from ipykernel.datapub import ZMQDataPublisher
import time
import threading
import random
import math

class DataStreamer:
    """Real-time data streaming using ZMQ publisher."""

    def __init__(self, session, socket):
        self.publisher = ZMQDataPublisher(session, socket)
        self.streaming = False
        self.stream_thread = None

    def start_streaming(self, interval=1.0):
        """Start streaming data at specified interval."""
        self.streaming = True

        def stream_worker():
            start_time = time.time()

            while self.streaming:
                current_time = time.time()
                elapsed = current_time - start_time

                # Generate sample data
                data = {
                    'timestamp': current_time,
                    'elapsed_time': elapsed,
                    'sine_wave': math.sin(elapsed),
                    'cosine_wave': math.cos(elapsed),
                    'random_value': random.uniform(-1, 1),
                    'counter': int(elapsed / interval)
                }

                metadata = {
                    'stream_type': 'continuous',
                    'sample_rate': 1.0 / interval,
                    'data_source': 'synthetic_generator'
                }

                # Publish data
                self.publisher.publish_data(data, metadata)

                # Wait for next interval
                time.sleep(interval)

        self.stream_thread = threading.Thread(target=stream_worker)
        self.stream_thread.daemon = True
        self.stream_thread.start()

    def stop_streaming(self):
        """Stop data streaming."""
        self.streaming = False
        if self.stream_thread:
            self.stream_thread.join()

    def publish_event(self, event_type, event_data):
        """Publish one-time event data."""
        data = {
            'event_type': event_type,
            'timestamp': time.time(),
            'data': event_data
        }

        metadata = {
            'message_type': 'event',
            'priority': 'high' if event_type == 'error' else 'normal'
        }

        self.publisher.publish_data(data, metadata)

# Usage example with mock objects
session = MockSession()
socket = MockSocket()

streamer = DataStreamer(session, socket)

print("Starting data stream...")
streamer.start_streaming(interval=0.5)

# Let it run for a few seconds
time.sleep(3)

# Publish some events
streamer.publish_event('calibration', {'sensor_id': 'temp_001', 'value': 25.0})
streamer.publish_event('warning', {'message': 'High temperature detected', 'value': 85.0})

# Continue streaming briefly
time.sleep(2)

# Stop streaming
print("Stopping data stream...")
streamer.stop_streaming()
```

### Data Validation and Cleaning Pipeline

```python
from ipykernel.jsonutil import json_clean
import numpy as np
import pandas as pd
import datetime
import json

class DataProcessor:
    """Process and clean data for kernel communication."""

    def __init__(self):
        self.processing_stats = {
            'objects_processed': 0,
            'conversions_made': 0,
            'errors_encountered': 0
        }

    def process_dataframe(self, df):
        """Process pandas DataFrame for JSON serialization."""
        try:
            # Convert DataFrame to dict
            data = {
                'columns': df.columns.tolist(),
                'index': df.index.tolist(),
                'data': df.values.tolist(),
                'dtypes': {col: str(dtype) for col, dtype in df.dtypes.items()},
                'shape': df.shape
            }

            # Clean the data
            cleaned_data = json_clean(data)
            self.processing_stats['objects_processed'] += 1

            return cleaned_data

        except Exception as e:
            self.processing_stats['errors_encountered'] += 1
            return {'error': str(e), 'type': 'dataframe_processing_error'}

    def process_scientific_data(self, data):
        """Process scientific data with various formats."""
        if isinstance(data, np.ndarray):
            return self.process_numpy_array(data)
        elif isinstance(data, pd.DataFrame):
            return self.process_dataframe(data)
        elif isinstance(data, dict):
            return json_clean(data)
        else:
            return json_clean(data)

    def process_numpy_array(self, arr):
        """Process numpy array with metadata."""
        return {
            'data': arr.tolist(),
            'shape': arr.shape,
            'dtype': str(arr.dtype),
            'size': arr.size,
            'ndim': arr.ndim,
            'metadata': {
                'min': float(np.min(arr)) if arr.size > 0 else None,
                'max': float(np.max(arr)) if arr.size > 0 else None,
                'mean': float(np.mean(arr)) if arr.size > 0 else None
            }
        }

    def create_report(self):
        """Create processing report."""
        return {
            'processing_statistics': self.processing_stats,
            'timestamp': datetime.datetime.now().isoformat(),
            'processor_version': '1.0'
        }

# Example usage
processor = DataProcessor()

# Create sample scientific data
numpy_data = np.random.normal(0, 1, (100, 3))
df_data = pd.DataFrame({
    'experiment': range(50),
    'temperature': np.random.normal(298, 5, 50),
    'pressure': np.random.normal(101325, 1000, 50),
    'result': np.random.choice(['success', 'failure'], 50)
})

# Process different data types
print("Processing numpy array...")
numpy_result = processor.process_scientific_data(numpy_data)
print(f"Numpy array processed: shape {numpy_result['shape']}")

print("\nProcessing DataFrame...")
df_result = processor.process_scientific_data(df_data)
print(f"DataFrame processed: {df_result['shape']} shape")

# Create comprehensive report
report = processor.create_report()
report_json = json.dumps(json_clean(report), indent=2)
print(f"\nProcessing report generated: {len(report_json)} characters")
print(f"Objects processed: {report['processing_statistics']['objects_processed']}")
```