0
# Shared Objects and Memory
1
2
Objects and memory that can be shared between processes. Multiprocess provides both high-level managed objects through Manager and low-level shared memory constructs for different sharing patterns and performance requirements.
3
4
## Capabilities
5
6
### Manager Objects
7
8
High-level interface for creating shared objects managed by a server process.
9
10
```python { .api }
11
def Manager():
12
"""
13
Create a SyncManager instance for sharing objects between processes.
14
15
Returns:
16
SyncManager: manager instance that creates shared objects
17
"""
18
```
19
20
#### SyncManager Class
21
22
```python { .api }
23
class SyncManager:
24
"""
25
Manager that provides shared objects via a server process.
26
"""
27
def start(self, initializer=None, initargs=()):
28
"""
29
Start the manager's server process.
30
31
Args:
32
initializer: callable to run when server starts
33
initargs: arguments for initializer
34
"""
35
36
def shutdown(self):
37
"""Shutdown the manager's server process."""
38
39
def dict(self, *args, **kwargs):
40
"""
41
Create a shared dictionary.
42
43
Returns:
44
DictProxy: proxy to a shared dict object
45
"""
46
47
def list(self, sequence=()):
48
"""
49
Create a shared list.
50
51
Args:
52
sequence: initial sequence to populate list
53
54
Returns:
55
ListProxy: proxy to a shared list object
56
"""
57
58
def Namespace(self):
59
"""
60
Create a shared namespace object.
61
62
Returns:
63
NamespaceProxy: proxy to a shared namespace
64
"""
65
66
def Lock(self):
67
"""Create a shared Lock."""
68
69
def RLock(self):
70
"""Create a shared RLock."""
71
72
def Semaphore(self, value=1):
73
"""Create a shared Semaphore."""
74
75
def BoundedSemaphore(self, value=1):
76
"""Create a shared BoundedSemaphore."""
77
78
def Condition(self, lock=None):
79
"""Create a shared Condition."""
80
81
def Event(self):
82
"""Create a shared Event."""
83
84
def Barrier(self, parties, action=None, timeout=None):
85
"""Create a shared Barrier."""
86
87
def Queue(self, maxsize=0):
88
"""Create a shared Queue."""
89
90
def JoinableQueue(self, maxsize=0):
91
"""Create a shared JoinableQueue."""
92
93
def Pool(self, processes=None, initializer=None, initargs=()):
94
"""Create a shared Pool."""
95
96
def __enter__(self):
97
"""Context manager entry - starts manager."""
98
99
def __exit__(self, exc_type, exc_val, exc_tb):
100
"""Context manager exit - shuts down manager."""
101
```
102
103
### Shared Memory Objects
104
105
Low-level shared memory constructs for direct memory sharing between processes.
106
107
```python
108
# Import from shared_memory submodule
109
from multiprocess.shared_memory import SharedMemory, ShareableList
110
```
111
112
```python { .api }
113
class SharedMemory:
114
"""
115
Direct shared memory block accessible across processes.
116
117
Args:
118
name: name of existing shared memory block (None to create new)
119
create: if True, create new shared memory block
120
size: size in bytes for new shared memory block
121
"""
122
def __init__(self, name=None, create=False, size=0): ...
123
124
def close(self):
125
"""Close access to the shared memory from this instance."""
126
127
def unlink(self):
128
"""
129
Request deletion of the shared memory block.
130
Should be called by only one process.
131
"""
132
133
# Properties
134
name: str # Name of the shared memory block
135
size: int # Size of the shared memory block in bytes
136
buf: memoryview # Memory buffer for direct access
137
```
138
139
```python { .api }
140
class ShareableList:
141
"""
142
List-like object stored in shared memory.
143
144
Args:
145
sequence: initial sequence to populate the list (None for existing)
146
name: name of existing shareable list (None to create new)
147
"""
148
def __init__(self, sequence=None, name=None): ...
149
150
def __getitem__(self, index):
151
"""Get item at index."""
152
153
def __setitem__(self, index, value):
154
"""Set item at index."""
155
156
def __len__(self):
157
"""Return length of list."""
158
159
def copy(self):
160
"""
161
Return a shallow copy as a regular list.
162
163
Returns:
164
list: copy of the shareable list
165
"""
166
167
def count(self, value):
168
"""
169
Return number of occurrences of value.
170
171
Args:
172
value: value to count
173
174
Returns:
175
int: number of occurrences
176
"""
177
178
def index(self, value):
179
"""
180
Return index of first occurrence of value.
181
182
Args:
183
value: value to find
184
185
Returns:
186
int: index of value
187
188
Raises:
189
ValueError: if value not found
190
"""
191
192
# Properties
193
format: str # Format string describing stored types
194
shm: SharedMemory # Underlying shared memory block
195
```
196
197
### Shared ctypes Objects
198
199
Shared memory objects based on ctypes for typed data sharing.
200
201
```python
202
# Import from sharedctypes for utility functions
203
from multiprocess.sharedctypes import copy, synchronized
204
```
205
206
```python { .api }
207
def Value(typecode_or_type, *args, lock=True):
208
"""
209
Create a shared ctypes object.
210
211
Args:
212
typecode_or_type: ctypes type or single character type code
213
args: initial value arguments
214
lock: if True, create with synchronization lock
215
216
Returns:
217
SynchronizedBase: synchronized shared value
218
"""
219
220
def Array(typecode_or_type, size_or_initializer, lock=True):
221
"""
222
Create a shared ctypes array.
223
224
Args:
225
typecode_or_type: ctypes type or single character type code
226
size_or_initializer: size of array or initial values
227
lock: if True, create with synchronization lock
228
229
Returns:
230
SynchronizedArray: synchronized shared array
231
"""
232
233
def RawValue(typecode_or_type, *args):
234
"""
235
Create an unsynchronized shared ctypes object.
236
237
Args:
238
typecode_or_type: ctypes type or single character type code
239
args: initial value arguments
240
241
Returns:
242
ctypes object: raw shared value without locking
243
"""
244
245
def RawArray(typecode_or_type, size_or_initializer):
246
"""
247
Create an unsynchronized shared ctypes array.
248
249
Args:
250
typecode_or_type: ctypes type or single character type code
251
size_or_initializer: size of array or initial values
252
253
Returns:
254
ctypes array: raw shared array without locking
255
"""
256
```
257
258
### Utility Functions
259
260
```python { .api }
261
def copy(obj):
262
"""
263
Create a copy of a shared object.
264
265
Args:
266
obj: shared object to copy
267
268
Returns:
269
object: copy of the shared object
270
"""
271
272
def synchronized(obj, lock=None):
273
"""
274
Make a shared object thread-safe.
275
276
Args:
277
obj: object to synchronize
278
lock: lock to use (creates new if None)
279
280
Returns:
281
SynchronizedBase: synchronized wrapper
282
"""
283
```
284
285
## Usage Examples
286
287
### Basic Manager Usage
288
289
```python
290
from multiprocess import Process, Manager
291
292
def worker(shared_dict, shared_list, worker_id):
293
# Modify shared dictionary
294
shared_dict[f'worker_{worker_id}'] = f'Hello from {worker_id}'
295
296
# Append to shared list
297
shared_list.append(f'Item from worker {worker_id}')
298
299
print(f"Worker {worker_id} completed")
300
301
if __name__ == '__main__':
302
with Manager() as manager:
303
# Create shared objects
304
shared_dict = manager.dict()
305
shared_list = manager.list()
306
307
# Create processes
308
processes = []
309
for i in range(3):
310
p = Process(target=worker, args=(shared_dict, shared_list, i))
311
p.start()
312
processes.append(p)
313
314
# Wait for completion
315
for p in processes:
316
p.join()
317
318
print(f"Final dict: {dict(shared_dict)}")
319
print(f"Final list: {list(shared_list)}")
320
```
321
322
### Shared Namespace
323
324
```python
325
from multiprocess import Process, Manager
326
327
def update_namespace(ns, worker_id):
328
# Access namespace attributes
329
ns.counter += 1
330
ns.messages.append(f"Message from worker {worker_id}")
331
332
# Create new attributes
333
setattr(ns, f'worker_{worker_id}_status', 'completed')
334
335
if __name__ == '__main__':
336
with Manager() as manager:
337
# Create shared namespace
338
ns = manager.Namespace()
339
ns.counter = 0
340
ns.messages = manager.list()
341
342
# Create processes
343
processes = []
344
for i in range(4):
345
p = Process(target=update_namespace, args=(ns, i))
346
p.start()
347
processes.append(p)
348
349
for p in processes:
350
p.join()
351
352
print(f"Counter: {ns.counter}")
353
print(f"Messages: {list(ns.messages)}")
354
355
# Print all attributes
356
for attr in dir(ns):
357
if not attr.startswith('_'):
358
print(f"{attr}: {getattr(ns, attr)}")
359
```
360
361
### SharedMemory Direct Access
362
363
```python
364
from multiprocess import Process
365
from multiprocess.shared_memory import SharedMemory
366
import numpy as np
367
368
def worker_process(shm_name, shape, dtype):
369
# Attach to existing shared memory
370
existing_shm = SharedMemory(name=shm_name)
371
372
# Create numpy array from shared memory
373
array = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
374
375
# Modify the array
376
array += 10
377
print(f"Worker modified array: {array}")
378
379
# Clean up
380
existing_shm.close()
381
382
if __name__ == '__main__':
383
# Create data
384
data = np.array([1, 2, 3, 4, 5], dtype=np.int64)
385
386
# Create shared memory
387
shm = SharedMemory(create=True, size=data.nbytes)
388
389
# Copy data to shared memory
390
shared_array = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
391
shared_array[:] = data[:]
392
393
print(f"Original array: {shared_array}")
394
395
# Create worker process
396
p = Process(target=worker_process, args=(shm.name, data.shape, data.dtype))
397
p.start()
398
p.join()
399
400
print(f"Array after worker: {shared_array}")
401
402
# Clean up
403
shm.close()
404
shm.unlink()
405
```
406
407
### ShareableList Usage
408
409
```python
410
from multiprocess import Process
411
from multiprocess.shared_memory import ShareableList
412
413
def list_worker(shared_list_name, worker_id):
414
# Attach to existing shareable list
415
shared_list = ShareableList(name=shared_list_name)
416
417
# Modify list elements
418
for i in range(len(shared_list)):
419
if isinstance(shared_list[i], (int, float)):
420
shared_list[i] = shared_list[i] + worker_id * 10
421
422
print(f"Worker {worker_id} processed list")
423
424
# List operations
425
print(f"List length: {len(shared_list)}")
426
print(f"Copy of list: {shared_list.copy()}")
427
428
if __name__ == '__main__':
429
# Create shareable list
430
initial_data = [1, 2, 3, 4, 5]
431
shared_list = ShareableList(initial_data)
432
433
print(f"Initial list: {shared_list.copy()}")
434
435
# Create worker processes
436
processes = []
437
for i in range(2):
438
p = Process(target=list_worker, args=(shared_list.shm.name, i + 1))
439
p.start()
440
processes.append(p)
441
442
for p in processes:
443
p.join()
444
445
print(f"Final list: {shared_list.copy()}")
446
447
# Clean up
448
shared_list.shm.close()
449
shared_list.shm.unlink()
450
```
451
452
### ctypes Shared Values
453
454
```python
455
from multiprocess import Process, Value, Array
456
import ctypes
457
458
def modify_shared_data(shared_value, shared_array):
459
# Modify shared value
460
with shared_value.get_lock():
461
shared_value.value += 10
462
463
# Modify shared array
464
with shared_array.get_lock():
465
for i in range(len(shared_array)):
466
shared_array[i] = shared_array[i] * 2
467
468
if __name__ == '__main__':
469
# Create shared value (integer)
470
shared_int = Value('i', 5) # 'i' = signed int
471
472
# Create shared array (floats)
473
shared_floats = Array('f', [1.0, 2.0, 3.0, 4.0]) # 'f' = float
474
475
print(f"Initial value: {shared_int.value}")
476
print(f"Initial array: {list(shared_floats[:])}")
477
478
# Create processes
479
processes = []
480
for i in range(2):
481
p = Process(target=modify_shared_data, args=(shared_int, shared_floats))
482
p.start()
483
processes.append(p)
484
485
for p in processes:
486
p.join()
487
488
print(f"Final value: {shared_int.value}")
489
print(f"Final array: {list(shared_floats[:])}")
490
```
491
492
### Raw Shared Objects (No Locking)
493
494
```python
495
from multiprocess import Process, RawValue, RawArray, Lock
496
import time
497
498
def worker_with_manual_locking(raw_value, raw_array, lock, worker_id):
499
for _ in range(5):
500
# Manual locking for raw shared objects
501
with lock:
502
# Modify raw value
503
old_val = raw_value.value
504
time.sleep(0.01) # Simulate some work
505
raw_value.value = old_val + 1
506
507
# Modify raw array
508
for i in range(len(raw_array)):
509
raw_array[i] += 1
510
511
print(f"Worker {worker_id} iteration completed")
512
time.sleep(0.1)
513
514
if __name__ == '__main__':
515
# Create raw shared objects (no automatic locking)
516
raw_value = RawValue('i', 0)
517
raw_array = RawArray('i', [0, 0, 0])
518
519
# Create manual lock
520
lock = Lock()
521
522
print(f"Initial value: {raw_value.value}")
523
print(f"Initial array: {list(raw_array[:])}")
524
525
# Create worker processes
526
processes = []
527
for i in range(3):
528
p = Process(target=worker_with_manual_locking,
529
args=(raw_value, raw_array, lock, i))
530
p.start()
531
processes.append(p)
532
533
for p in processes:
534
p.join()
535
536
print(f"Final value: {raw_value.value}")
537
print(f"Final array: {list(raw_array[:])}")
538
```
539
540
### Complex Shared Data Structures
541
542
```python
543
from multiprocess import Process, Manager
544
import time
545
546
def data_processor(shared_data, processor_id):
547
# Add processed items
548
for i in range(3):
549
item = {
550
'processor_id': processor_id,
551
'item_number': i,
552
'processed_at': time.time(),
553
'data': f"Processed by {processor_id}"
554
}
555
shared_data['items'].append(item)
556
557
# Update statistics
558
with shared_data['stats_lock']:
559
shared_data['stats']['total_processed'] += 3
560
shared_data['stats']['processors'][processor_id] = True
561
562
def monitor_progress(shared_data):
563
"""Monitor processing progress"""
564
start_time = time.time()
565
while True:
566
time.sleep(0.5)
567
with shared_data['stats_lock']:
568
total = shared_data['stats']['total_processed']
569
active_processors = sum(shared_data['stats']['processors'].values())
570
571
elapsed = time.time() - start_time
572
print(f"Time: {elapsed:.1f}s, Processed: {total}, Active: {active_processors}")
573
574
if total >= 9: # 3 processors * 3 items each
575
break
576
577
if __name__ == '__main__':
578
with Manager() as manager:
579
# Create complex shared data structure
580
shared_data = manager.dict({
581
'items': manager.list(),
582
'stats': manager.dict({
583
'total_processed': 0,
584
'processors': manager.dict({0: False, 1: False, 2: False})
585
}),
586
'stats_lock': manager.Lock()
587
})
588
589
# Create processor processes
590
processors = []
591
for i in range(3):
592
p = Process(target=data_processor, args=(shared_data, i))
593
p.start()
594
processors.append(p)
595
596
# Create monitor process
597
monitor = Process(target=monitor_progress, args=(shared_data,))
598
monitor.start()
599
600
# Wait for processors to complete
601
for p in processors:
602
p.join()
603
604
# Wait for monitor
605
monitor.join()
606
607
print(f"\nFinal results:")
608
print(f"Total items processed: {len(shared_data['items'])}")
609
print(f"Statistics: {dict(shared_data['stats'])}")
610
```
611
612
### Performance Comparison
613
614
```python
615
from multiprocess import Process, Manager, Value, RawValue, Lock
616
import time
617
618
def test_synchronized_value(shared_val, iterations):
619
"""Test with automatic synchronization"""
620
start_time = time.time()
621
for _ in range(iterations):
622
with shared_val.get_lock():
623
shared_val.value += 1
624
duration = time.time() - start_time
625
return duration
626
627
def test_raw_value(raw_val, lock, iterations):
628
"""Test with manual synchronization"""
629
start_time = time.time()
630
for _ in range(iterations):
631
with lock:
632
raw_val.value += 1
633
duration = time.time() - start_time
634
return duration
635
636
def benchmark_worker(test_type, shared_obj, extra_arg, iterations):
637
if test_type == 'synchronized':
638
duration = test_synchronized_value(shared_obj, iterations)
639
else:
640
duration = test_raw_value(shared_obj, extra_arg, iterations)
641
print(f"{test_type} test completed in {duration:.3f} seconds")
642
643
if __name__ == '__main__':
644
iterations = 10000
645
646
# Test 1: Synchronized Value
647
print("Testing synchronized Value...")
648
sync_val = Value('i', 0)
649
p1 = Process(target=benchmark_worker,
650
args=('synchronized', sync_val, None, iterations))
651
p1.start()
652
p1.join()
653
print(f"Synchronized final value: {sync_val.value}")
654
655
# Test 2: Raw Value with manual lock
656
print("\nTesting raw Value with manual lock...")
657
raw_val = RawValue('i', 0)
658
manual_lock = Lock()
659
p2 = Process(target=benchmark_worker,
660
args=('raw', raw_val, manual_lock, iterations))
661
p2.start()
662
p2.join()
663
print(f"Raw final value: {raw_val.value}")
664
```