or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md memory-caching.md parallel-processing.md persistence-serialization.md utilities-infrastructure.md

docs/utilities-infrastructure.md

0

# Utilities and Infrastructure

1

2

Core utilities including object hashing, logging with timing, backend infrastructure, and compression management for extending joblib's functionality and integrating with scientific computing workflows.

3

4

## Capabilities

5

6

### Object Hashing

7

8

Fast hash calculation for Python objects to create unique identifiers, used internally by Memory caching and available for custom caching implementations.

9

10

```python { .api }

11

def hash(obj, hash_name="md5", coerce_mmap=False):

12

"""

13

Quick calculation of hash to identify Python objects uniquely.

14

15

Parameters:

16

- obj: any Python object to hash

17

- hash_name: str, hashing algorithm ("md5" or "sha1")

18

- coerce_mmap: bool, treat memory-mapped arrays as regular arrays

19

20

Returns:

21

str: hexadecimal hash string

22

23

Raises:

24

ValueError: if hash_name is not supported

25

"""

26

```

27

28

**Usage Examples:**

29

30

```python

31

from joblib import hash

32

import numpy as np

33

34

# Hash simple objects

35

hash_int = hash(42)

36

hash_str = hash("hello world")

37

hash_list = hash([1, 2, 3, 4])

38

39

print(f"Integer hash: {hash_int}")

40

print(f"String hash: {hash_str}")

41

42

# Hash NumPy arrays

43

array = np.random.random(1000)

44

array_hash = hash(array)

45

print(f"Array hash: {array_hash}")

46

47

# Different arrays with same content have same hash

48

array2 = array.copy()

49

assert hash(array) == hash(array2)

50

51

# Different hash algorithms
md5_hash = hash(array, hash_name="md5")    # Default
sha1_hash = hash(array, hash_name="sha1")  # Alternative digest (both are for identification, not security)

54

55

# Memory-mapped arrays

56

mmap_array = np.memmap('temp.dat', dtype='float32', mode='w+', shape=(1000,))

57

mmap_array[:] = array[:]

58

59

# Hash memory-mapped array as regular array

60

regular_hash = hash(mmap_array, coerce_mmap=True)

61

mmap_hash = hash(mmap_array, coerce_mmap=False)

62

63

# Complex objects

64

complex_obj = {

65

'data': np.random.random((100, 50)),

66

'params': {'learning_rate': 0.01, 'epochs': 100},

67

'metadata': ['training', 'validation']

68

}

69

complex_hash = hash(complex_obj)

70

```

71

72

### Non-Picklable Object Wrapping

73

74

Utilities for handling non-serializable objects in parallel processing contexts.

75

76

```python { .api }

77

def wrap_non_picklable_objects(obj, keep_wrapper=True):

78

"""

79

Wrap non-picklable objects to enable parallel processing.

80

81

Parameters:

82

- obj: object to wrap (may contain non-picklable elements)

83

- keep_wrapper: bool, whether to keep wrapper for round-trip compatibility

84

85

Returns:

86

Wrapped object that can be pickled and sent to parallel workers

87

"""

88

```

89

90

**Usage Examples:**

91

92

```python

93

from joblib import wrap_non_picklable_objects, Parallel, delayed

94

import sqlite3

95

96

# Example with non-picklable database connection

97

def create_db_connection():

98

return sqlite3.connect(':memory:')

99

100

def process_with_connection(data, connection):

101

# Use database connection in processing

102

cursor = connection.cursor()

103

cursor.execute("CREATE TABLE IF NOT EXISTS temp (value INTEGER)")

104

cursor.execute("INSERT INTO temp VALUES (?)", (data,))

105

return cursor.fetchall()

106

107

# Wrap non-picklable connection

108

connection = create_db_connection()

109

wrapped_connection = wrap_non_picklable_objects(connection)

110

111

# Use in parallel processing (the wrapper serializes via cloudpickle;
# OS-level handles such as live sqlite connections may still fail to transfer)

112

data_items = [1, 2, 3, 4, 5]

113

results = Parallel(n_jobs=2)(

114

delayed(process_with_connection)(item, wrapped_connection)

115

for item in data_items

116

)

117

118

# Custom objects with lambda functions or other non-picklable elements

119

class ProcessorWithLambda:

120

def __init__(self):

121

self.transform = lambda x: x ** 2 # Non-picklable lambda

122

123

processor = ProcessorWithLambda()

124

wrapped_processor = wrap_non_picklable_objects(processor)

125

```

126

127

### Logging and Timing

128

129

Logging utilities with built-in timing capabilities for monitoring performance and debugging computational workflows.

130

131

```python { .api }

132

class Logger:

133

def __init__(self, depth=3, name=None):

134

"""

135

Base logging class with formatting and timing capabilities.

136

137

Parameters:

138

- depth: int, call stack depth for logging context

139

- name: str, logger name (None for auto-generation)

140

"""

141

142

def warn(self, msg):

143

"""

144

Log a warning message.

145

146

Parameters:

147

- msg: str, warning message to log

148

"""

149

150

def info(self, msg):

151

"""

152

Log an informational message.

153

154

Parameters:

155

- msg: str, info message to log

156

"""

157

158

def debug(self, msg):

159

"""

160

Log a debug message.

161

162

Parameters:

163

- msg: str, debug message to log

164

"""

165

166

def format(self, obj, indent=0):

167

"""

168

Return formatted representation of object.

169

170

Parameters:

171

- obj: object to format

172

- indent: int, indentation level

173

174

Returns:

175

str: formatted object representation

176

"""

177

178

class PrintTime:

179

def __init__(self, logfile=None, logdir=None):

180

"""

181

Print and log messages with execution time tracking.

182

183

Parameters:

184

- logfile: str, path to log file (None for stdout)

185

- logdir: str, directory for log files (None for current dir)

186

"""

187

```

188

189

**Usage Examples:**

190

191

```python

192

from joblib import Logger, PrintTime

193

import time

194

import numpy as np

195

196

# Basic logging

197

logger = Logger(name="DataProcessor")

198

199

def process_data(data):

200

logger.info(f"Processing {len(data)} items")

201

202

if len(data) == 0:

203

logger.warn("Empty data provided")

204

return []

205

206

logger.debug(f"Data type: {type(data)}")

207

208

# Simulate processing

209

result = [x * 2 for x in data]

210

211

logger.info("Processing complete")

212

return result

213

214

# Process with logging

215

data = [1, 2, 3, 4, 5]

216

result = process_data(data)

217

218

# Time tracking with PrintTime

219

timer = PrintTime()

220

221

print("Starting computation...")

222

start_time = time.time()

223

224

# Simulate expensive computation

225

large_array = np.random.random((10000, 1000))

226

result = np.mean(large_array, axis=1)

227

228

elapsed = time.time() - start_time

229

print(f"Computation completed in {elapsed:.2f} seconds")

230

231

# Custom formatting

232

logger = Logger()

233

complex_data = {

234

'arrays': [np.random.random(100) for _ in range(3)],

235

'config': {'param1': 0.1, 'param2': 'test'},

236

'metadata': {'version': 1, 'created': time.time()}

237

}

238

239

formatted_output = logger.format(complex_data, indent=2)

240

print("Complex data structure:")

241

print(formatted_output)

242

```

243

244

### Backend Infrastructure

245

246

Abstract base classes for implementing custom parallel execution and storage backends.

247

248

```python { .api }

249

class ParallelBackendBase:

250

"""

251

Abstract base class for parallel execution backends.

252

253

Subclass this to implement custom parallel processing backends

254

for specialized computing environments or frameworks.

255

"""

256

257

# Backend capabilities

258

default_n_jobs = 1

259

supports_inner_max_num_threads = False

260

supports_retrieve_callback = False

261

supports_return_generator = False

262

supports_timeout = False

263

264

def effective_n_jobs(self, n_jobs):

265

"""

266

Determine actual number of parallel jobs.

267

268

Parameters:

269

- n_jobs: int, requested number of jobs

270

271

Returns:

272

int: actual number of jobs to use

273

"""

274

275

def submit(self, func, callback=None):

276

"""

277

Schedule function execution.

278

279

Parameters:

280

- func: callable, function to execute

281

- callback: callable, optional callback for result handling

282

283

Returns:

284

Future-like object representing the computation

285

"""

286

287

def retrieve_result(self, futures, timeout=None):

288

"""

289

Retrieve results from submitted computations.

290

291

Parameters:

292

- futures: list of future objects

293

- timeout: float, timeout in seconds

294

295

Returns:

296

Generator yielding (future, result) pairs

297

"""

298

299

class StoreBackendBase:

300

"""

301

Abstract base class for storage backends.

302

303

Subclass this to implement custom storage solutions

304

for Memory caching (e.g., cloud storage, databases).

305

"""

306

307

def _open_item(self, f, mode):

308

"""

309

Open item in storage backend.

310

311

Parameters:

312

- f: file identifier

313

- mode: str, file opening mode

314

315

Returns:

316

File-like object

317

"""

318

319

def _item_exists(self, location):

320

"""

321

Check if item exists in storage.

322

323

Parameters:

324

- location: str, item location identifier

325

326

Returns:

327

bool: True if item exists

328

"""

329

330

def _move_item(self, src, dst):

331

"""

332

Move item within storage backend.

333

334

Parameters:

335

- src: str, source location

336

- dst: str, destination location

337

"""

338

339

def clear_item(self, call_id):

340

"""

341

Clear single cached item.

342

343

Parameters:

344

- call_id: str, unique identifier for cached call

345

"""

346

347

def clear_path(self, path):

348

"""

349

Clear all items at specified path.

350

351

Parameters:

352

- path: str, path to clear

353

"""

354

355

def clear(self):

356

"""Clear all items in storage backend."""

357

```

358

359

**Usage Examples:**

360

361

```python

362

from joblib import ParallelBackendBase, StoreBackendBase, register_parallel_backend, register_store_backend

363

364

# Custom parallel backend example

365

class GPUBackend(ParallelBackendBase):

366

"""Example GPU computing backend."""

367

368

supports_timeout = True

369

default_n_jobs = 4 # Number of GPU streams

370

371

def __init__(self, device_id=0):

372

self.device_id = device_id

373

374

def effective_n_jobs(self, n_jobs):

375

# Limit to available GPU streams

376

return min(n_jobs, 8)

377

378

def submit(self, func, callback=None):

379

# Submit computation to GPU

380

# Return GPU future object

381

pass

382

383

def retrieve_result(self, futures, timeout=None):

384

# Retrieve results from GPU

385

for future in futures:

386

yield future, future.result(timeout=timeout)

387

388

# Register custom backend

389

register_parallel_backend('gpu', GPUBackend)

390

391

# Custom storage backend example

392

class RedisStoreBackend(StoreBackendBase):

393

"""Example Redis storage backend for caching."""

394

395

def __init__(self, host='localhost', port=6379, db=0):

396

import redis

397

self.redis_client = redis.Redis(host=host, port=port, db=db)

398

399

def _item_exists(self, location):

400

return bool(self.redis_client.exists(location))  # redis-py returns an int count

401

402

def _open_item(self, f, mode):

403

# Implement Redis-based file-like object

404

pass

405

406

def clear_item(self, call_id):

407

self.redis_client.delete(call_id)

408

409

def clear(self):

410

self.redis_client.flushdb()

411

412

# Register custom storage backend

413

register_store_backend('redis', RedisStoreBackend)

414

415

# Use custom backends

416

from joblib import Memory, Parallel, delayed, parallel_backend

417

418

# Use custom storage

419

mem = Memory('joblib_cache', backend='redis', backend_options={'host': 'cache-server'})

420

421

# Use custom parallel backend

422

with parallel_backend('gpu', device_id=1):

423

results = Parallel(n_jobs=4)(delayed(gpu_function)(i) for i in range(100))

424

```

425

426

### Compression Management

427

428

Registration and management of compression algorithms for persistence operations.

429

430

```python { .api }

431

def register_compressor(compressor_name, compressor, force=False):

432

"""

433

Register a new compressor for use with dump/load operations.

434

435

Parameters:

436

- compressor_name: str, name to identify the compressor

437

- compressor: compressor object implementing required interface

438

- force: bool, whether to overwrite existing compressor with same name

439

440

Raises:

441

ValueError: if compressor_name already exists and force=False

442

"""

443

```

444

445

**Usage Examples:**

446

447

```python

448

from joblib import register_compressor, dump, load

449

450

# Example custom compressor (simplified for illustration — joblib's actual
# register_compressor expects a joblib.compressor.CompressorWrapper subclass
# exposing file-object factories, not bare compress/decompress methods)

451

class CustomCompressor:

452

"""Example custom compression algorithm."""

453

454

def compress(self, data):

455

# Implement compression logic

456

return compressed_data

457

458

def decompress(self, compressed_data):

459

# Implement decompression logic

460

return original_data

461

462

# Register custom compressor

463

custom_comp = CustomCompressor()

464

register_compressor('custom', custom_comp)

465

466

# Use custom compressor

467

data = {'large_array': np.random.random(100000)}

468

dump(data, 'data_custom.pkl', compress='custom')

469

loaded_data = load('data_custom.pkl')

470

471

# Register external compressor library

472

try:

473

import snappy

474

475

class SnappyCompressor:

476

def compress(self, data):

477

return snappy.compress(data)

478

479

def decompress(self, compressed_data):

480

return snappy.decompress(compressed_data)

481

482

register_compressor('snappy', SnappyCompressor())

483

484

# Use snappy compression for fast compression/decompression

485

dump(data, 'data_snappy.pkl', compress='snappy')

486

487

except ImportError:

488

print("Snappy not available")

489

```

490

491

## Advanced Infrastructure Patterns

492

493

### Custom Caching Strategy

494

495

```python

496

from joblib import Memory, hash, register_store_backend

497

from joblib._store_backends import StoreBackendBase

498

import time

499

500

class TimeBasedCacheBackend(StoreBackendBase):

501

"""Cache backend with automatic expiration."""

502

503

def __init__(self, base_backend, ttl_seconds=3600):

504

self.base_backend = base_backend

505

self.ttl_seconds = ttl_seconds

506

self.timestamps = {}

507

508

def _item_exists(self, location):

509

if not self.base_backend._item_exists(location):

510

return False

511

512

# Check if item has expired

513

timestamp = self.timestamps.get(location, 0)

514

if time.time() - timestamp > self.ttl_seconds:

515

self.clear_item(location)

516

return False

517

518

return True

519

520

def _open_item(self, f, mode):

521

if 'w' in mode:

522

# Record timestamp when writing

523

self.timestamps[f] = time.time()

524

return self.base_backend._open_item(f, mode)

525

526

# Use time-based caching

527

register_store_backend('ttl', TimeBasedCacheBackend)

528

mem = Memory('./cache', backend='ttl', backend_options={'ttl_seconds': 1800})

529

```

530

531

### Performance Monitoring

532

533

```python

534

from joblib import Logger, Parallel, delayed

535

import time

536

import psutil

537

538

class PerformanceLogger(Logger):

539

"""Logger with system performance monitoring."""

540

541

def __init__(self, *args, **kwargs):

542

super().__init__(*args, **kwargs)

543

self.start_time = None

544

self.start_memory = None

545

546

def start_monitoring(self):

547

self.start_time = time.time()

548

self.start_memory = psutil.virtual_memory().used

549

self.info("Performance monitoring started")

550

551

def log_performance(self, operation_name):

552

if self.start_time:

553

elapsed = time.time() - self.start_time

554

current_memory = psutil.virtual_memory().used

555

memory_delta = current_memory - self.start_memory

556

557

self.info(f"{operation_name} completed:")

558

self.info(f" Time: {elapsed:.2f} seconds")

559

self.info(f" Memory change: {memory_delta / 1024**2:.1f} MB")

560

self.info(f" CPU usage: {psutil.cpu_percent()}%")

561

562

# Use performance monitoring

563

perf_logger = PerformanceLogger(name="Computation")

564

565

perf_logger.start_monitoring()

566

567

# Perform computation

568

results = Parallel(n_jobs=4)(delayed(expensive_function)(i) for i in range(100))

569

570

perf_logger.log_performance("Parallel computation")

571

```