# Utilities and Infrastructure

Core utilities including object hashing, logging with timing, backend infrastructure, and compression management for extending joblib's functionality and integrating with scientific computing workflows.

## Capabilities

### Object Hashing

Fast hash calculation for Python objects to create unique identifiers, used internally by Memory caching and available for custom caching implementations.

```python { .api }
def hash(obj, hash_name="md5", coerce_mmap=False):
    """
    Quick calculation of hash to identify Python objects uniquely.

    Parameters:
    - obj: any Python object to hash
    - hash_name: str, hashing algorithm ("md5" or "sha1")
    - coerce_mmap: bool, treat memory-mapped arrays as regular arrays

    Returns:
    str: hexadecimal hash string

    Raises:
    ValueError: if hash_name is not supported
    """
```

**Usage Examples:**

```python
from joblib import hash
import numpy as np

# Hash simple objects
hash_int = hash(42)
hash_str = hash("hello world")
hash_list = hash([1, 2, 3, 4])

print(f"Integer hash: {hash_int}")
print(f"String hash: {hash_str}")

# Hash NumPy arrays
array = np.random.random(1000)
array_hash = hash(array)
print(f"Array hash: {array_hash}")

# Different arrays with same content have same hash
array2 = array.copy()
assert hash(array) == hash(array2)

# Different hash algorithms
md5_hash = hash(array, hash_name="md5")    # Default
sha1_hash = hash(array, hash_name="sha1")  # Alternative algorithm

# Memory-mapped arrays
mmap_array = np.memmap('temp.dat', dtype='float32', mode='w+', shape=(1000,))
mmap_array[:] = array[:]

# Hash memory-mapped array as regular array
regular_hash = hash(mmap_array, coerce_mmap=True)
mmap_hash = hash(mmap_array, coerce_mmap=False)

# Complex objects
complex_obj = {
    'data': np.random.random((100, 50)),
    'params': {'learning_rate': 0.01, 'epochs': 100},
    'metadata': ['training', 'validation']
}
complex_hash = hash(complex_obj)
```

### Non-Picklable Object Wrapping

Utilities for handling non-serializable objects in parallel processing contexts.

```python { .api }
def wrap_non_picklable_objects(obj, keep_wrapper=True):
    """
    Wrap non-picklable objects to enable parallel processing.

    Parameters:
    - obj: object to wrap (may contain non-picklable elements)
    - keep_wrapper: bool, whether to keep wrapper for round-trip compatibility

    Returns:
    Wrapped object that can be pickled and sent to parallel workers
    """
```

**Usage Examples:**

```python
from joblib import wrap_non_picklable_objects, Parallel, delayed
import sqlite3

# Example with non-picklable database connection
def create_db_connection():
    return sqlite3.connect(':memory:')

def process_with_connection(data, connection):
    # Use database connection in processing
    cursor = connection.cursor()
    cursor.execute("CREATE TABLE IF NOT EXISTS temp (value INTEGER)")
    cursor.execute("INSERT INTO temp VALUES (?)", (data,))
    return cursor.fetchall()

# Wrap non-picklable connection
connection = create_db_connection()
wrapped_connection = wrap_non_picklable_objects(connection)

# Use in parallel processing (the wrapper serializes the object with
# cloudpickle so it can be sent to each worker)
data_items = [1, 2, 3, 4, 5]
results = Parallel(n_jobs=2)(
    delayed(process_with_connection)(item, wrapped_connection)
    for item in data_items
)

# Custom objects with lambda functions or other non-picklable elements
class ProcessorWithLambda:
    def __init__(self):
        self.transform = lambda x: x ** 2  # Non-picklable lambda

processor = ProcessorWithLambda()
wrapped_processor = wrap_non_picklable_objects(processor)
```

### Logging and Timing

Logging utilities with built-in timing capabilities for monitoring performance and debugging computational workflows.

```python { .api }
class Logger:
    def __init__(self, depth=3, name=None):
        """
        Base logging class with formatting and timing capabilities.

        Parameters:
        - depth: int, call stack depth for logging context
        - name: str, logger name (None for auto-generation)
        """

    def warn(self, msg):
        """
        Log a warning message.

        Parameters:
        - msg: str, warning message to log
        """

    def info(self, msg):
        """
        Log an informational message.

        Parameters:
        - msg: str, info message to log
        """

    def debug(self, msg):
        """
        Log a debug message.

        Parameters:
        - msg: str, debug message to log
        """

    def format(self, obj, indent=0):
        """
        Return formatted representation of object.

        Parameters:
        - obj: object to format
        - indent: int, indentation level

        Returns:
        str: formatted object representation
        """

class PrintTime:
    def __init__(self, logfile=None, logdir=None):
        """
        Print and log messages with execution time tracking.

        Parameters:
        - logfile: str, path to log file (None for stdout)
        - logdir: str, directory for log files (None for current dir)
        """
```

**Usage Examples:**

```python
from joblib import Logger, PrintTime
import time
import numpy as np

# Basic logging
logger = Logger(name="DataProcessor")

def process_data(data):
    logger.info(f"Processing {len(data)} items")

    if len(data) == 0:
        logger.warn("Empty data provided")
        return []

    logger.debug(f"Data type: {type(data)}")

    # Simulate processing
    result = [x * 2 for x in data]

    logger.info("Processing complete")
    return result

# Process with logging
data = [1, 2, 3, 4, 5]
result = process_data(data)

# Time tracking with PrintTime
timer = PrintTime()

print("Starting computation...")
start_time = time.time()

# Simulate expensive computation
large_array = np.random.random((10000, 1000))
result = np.mean(large_array, axis=1)

elapsed = time.time() - start_time
print(f"Computation completed in {elapsed:.2f} seconds")

# Custom formatting
logger = Logger()
complex_data = {
    'arrays': [np.random.random(100) for _ in range(3)],
    'config': {'param1': 0.1, 'param2': 'test'},
    'metadata': {'version': 1, 'created': time.time()}
}

formatted_output = logger.format(complex_data, indent=2)
print("Complex data structure:")
print(formatted_output)
```

### Backend Infrastructure

Abstract base classes for implementing custom parallel execution and storage backends.

```python { .api }
class ParallelBackendBase:
    """
    Abstract base class for parallel execution backends.

    Subclass this to implement custom parallel processing backends
    for specialized computing environments or frameworks.
    """

    # Backend capabilities
    default_n_jobs = 1
    supports_inner_max_num_threads = False
    supports_retrieve_callback = False
    supports_return_generator = False
    supports_timeout = False

    def effective_n_jobs(self, n_jobs):
        """
        Determine actual number of parallel jobs.

        Parameters:
        - n_jobs: int, requested number of jobs

        Returns:
        int: actual number of jobs to use
        """

    def submit(self, func, callback=None):
        """
        Schedule function execution.

        Parameters:
        - func: callable, function to execute
        - callback: callable, optional callback for result handling

        Returns:
        Future-like object representing the computation
        """

    def retrieve_result(self, futures, timeout=None):
        """
        Retrieve results from submitted computations.

        Parameters:
        - futures: list of future objects
        - timeout: float, timeout in seconds

        Returns:
        Generator yielding (future, result) pairs
        """

class StoreBackendBase:
    """
    Abstract base class for storage backends.

    Subclass this to implement custom storage solutions
    for Memory caching (e.g., cloud storage, databases).
    """

    def _open_item(self, f, mode):
        """
        Open item in storage backend.

        Parameters:
        - f: file identifier
        - mode: str, file opening mode

        Returns:
        File-like object
        """

    def _item_exists(self, location):
        """
        Check if item exists in storage.

        Parameters:
        - location: str, item location identifier

        Returns:
        bool: True if item exists
        """

    def _move_item(self, src, dst):
        """
        Move item within storage backend.

        Parameters:
        - src: str, source location
        - dst: str, destination location
        """

    def clear_item(self, call_id):
        """
        Clear single cached item.

        Parameters:
        - call_id: str, unique identifier for cached call
        """

    def clear_path(self, path):
        """
        Clear all items at specified path.

        Parameters:
        - path: str, path to clear
        """

    def clear(self):
        """Clear all items in storage backend."""
```

**Usage Examples:**

```python
from joblib import ParallelBackendBase, StoreBackendBase, register_parallel_backend, register_store_backend

# Custom parallel backend example
class GPUBackend(ParallelBackendBase):
    """Example GPU computing backend."""

    supports_timeout = True
    default_n_jobs = 4  # Number of GPU streams

    def __init__(self, device_id=0):
        self.device_id = device_id

    def effective_n_jobs(self, n_jobs):
        # Limit to available GPU streams
        return min(n_jobs, 8)

    def submit(self, func, callback=None):
        # Submit computation to GPU
        # Return GPU future object
        pass

    def retrieve_result(self, futures, timeout=None):
        # Retrieve results from GPU
        for future in futures:
            yield future, future.result(timeout=timeout)

# Register custom backend
register_parallel_backend('gpu', GPUBackend)

# Custom storage backend example
class RedisStoreBackend(StoreBackendBase):
    """Example Redis storage backend for caching."""

    def __init__(self, host='localhost', port=6379, db=0):
        import redis
        self.redis_client = redis.Redis(host=host, port=port, db=db)

    def _item_exists(self, location):
        return self.redis_client.exists(location)

    def _open_item(self, f, mode):
        # Implement Redis-based file-like object
        pass

    def clear_item(self, call_id):
        self.redis_client.delete(call_id)

    def clear(self):
        self.redis_client.flushdb()

# Register custom storage backend
register_store_backend('redis', RedisStoreBackend)

# Use custom backends
from joblib import Memory, Parallel, delayed, parallel_backend

# Use custom storage
mem = Memory(backend='redis', backend_options={'host': 'cache-server'})

# Use custom parallel backend
with parallel_backend('gpu', device_id=1):
    results = Parallel(n_jobs=4)(delayed(gpu_function)(i) for i in range(100))
```

### Compression Management

Registration and management of compression algorithms for persistence operations.

```python { .api }
def register_compressor(compressor_name, compressor, force=False):
    """
    Register a new compressor for use with dump/load operations.

    Parameters:
    - compressor_name: str, name to identify the compressor
    - compressor: compressor object implementing required interface
    - force: bool, whether to overwrite existing compressor with same name

    Raises:
    ValueError: if compressor_name already exists and force=False
    """
```

**Usage Examples:**

```python
from joblib import register_compressor, dump, load
import numpy as np

# Example custom compressor (simplified)
class CustomCompressor:
    """Example custom compression algorithm."""

    def compress(self, data):
        # Implement compression logic
        return compressed_data

    def decompress(self, compressed_data):
        # Implement decompression logic
        return original_data

# Register custom compressor
custom_comp = CustomCompressor()
register_compressor('custom', custom_comp)

# Use custom compressor
data = {'large_array': np.random.random(100000)}
dump(data, 'data_custom.pkl', compress='custom')
loaded_data = load('data_custom.pkl')

# Register external compressor library
try:
    import snappy

    class SnappyCompressor:
        def compress(self, data):
            return snappy.compress(data)

        def decompress(self, compressed_data):
            return snappy.decompress(compressed_data)

    register_compressor('snappy', SnappyCompressor())

    # Use snappy compression for fast compression/decompression
    dump(data, 'data_snappy.pkl', compress='snappy')

except ImportError:
    print("Snappy not available")
```

## Advanced Infrastructure Patterns

### Custom Caching Strategy

```python
from joblib import Memory, hash, register_store_backend
from joblib._store_backends import StoreBackendBase
import time

class TimeBasedCacheBackend(StoreBackendBase):
    """Cache backend with automatic expiration."""

    def __init__(self, base_backend, ttl_seconds=3600):
        self.base_backend = base_backend
        self.ttl_seconds = ttl_seconds
        self.timestamps = {}

    def _item_exists(self, location):
        if not self.base_backend._item_exists(location):
            return False

        # Check if item has expired
        timestamp = self.timestamps.get(location, 0)
        if time.time() - timestamp > self.ttl_seconds:
            self.clear_item(location)
            return False

        return True

    def _open_item(self, f, mode):
        if 'w' in mode:
            # Record timestamp when writing
            self.timestamps[f] = time.time()
        return self.base_backend._open_item(f, mode)

# Use time-based caching
register_store_backend('ttl', TimeBasedCacheBackend)
mem = Memory('./cache', backend='ttl', backend_options={'ttl_seconds': 1800})
```

### Performance Monitoring

```python
from joblib import Logger, Parallel, delayed
import time
import psutil

class PerformanceLogger(Logger):
    """Logger with system performance monitoring."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.start_time = None
        self.start_memory = None

    def start_monitoring(self):
        self.start_time = time.time()
        self.start_memory = psutil.virtual_memory().used
        self.info("Performance monitoring started")

    def log_performance(self, operation_name):
        if self.start_time:
            elapsed = time.time() - self.start_time
            current_memory = psutil.virtual_memory().used
            memory_delta = current_memory - self.start_memory

            self.info(f"{operation_name} completed:")
            self.info(f"  Time: {elapsed:.2f} seconds")
            self.info(f"  Memory change: {memory_delta / 1024**2:.1f} MB")
            self.info(f"  CPU usage: {psutil.cpu_percent()}%")

# Use performance monitoring
perf_logger = PerformanceLogger(name="Computation")

perf_logger.start_monitoring()

# Perform computation
results = Parallel(n_jobs=4)(delayed(expensive_function)(i) for i in range(100))

perf_logger.log_performance("Parallel computation")
```