# Shared Memory

Synchronized and unsynchronized shared memory objects for efficient data sharing between processes, based on ctypes values and arrays.

## Capabilities

### Shared Values

Create shared values that can be accessed by multiple processes, with optional synchronization.

```python { .api }
def Value(typecode_or_type, *args, lock=True, ctx=None):
    """
    Create a synchronized shared ctypes value.

    Parameters:
    - typecode_or_type: ctypes type or single-character typecode
    - *args: initialization arguments for the value
    - lock: if True (default), operations are synchronized through a new
      RLock; if False, the raw unsynchronized object is returned; an
      existing Lock/RLock instance may also be passed in
    - ctx: multiprocessing context

    Returns:
    SynchronizedBase wrapper around the ctypes value
    """

def RawValue(typecode_or_type, *args):
    """
    Create an unsynchronized shared ctypes value.

    Parameters:
    - typecode_or_type: ctypes type or single-character typecode
    - *args: initialization arguments for the value

    Returns:
    Raw ctypes value (no synchronization)
    """
```
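As in the stdlib `multiprocessing` API that billiard mirrors, `lock=False` skips the wrapper entirely: `Value('i', 0, lock=False)` yields the same kind of raw ctypes object as `RawValue('i', 0)`, and only the synchronized wrapper exposes `get_lock()` and `get_obj()`. A quick sketch:

```python
from billiard import Value, RawValue

v = Value('i', 0)              # synchronized wrapper
with v.get_lock():             # explicit lock for compound operations
    v.value += 1
print(type(v.get_obj()))       # underlying ctypes object

u = Value('i', 0, lock=False)  # no wrapper at all
r = RawValue('i', 0)
print(type(u) is type(r))      # True: same raw ctypes type as RawValue
```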
Common typecodes:

- `'i'` - signed int
- `'f'` - float
- `'d'` - double
- `'c'` - char
- `'b'` - signed char
- `'B'` - unsigned char
- `'h'` - short
- `'l'` - long

Usage example:

```python
from billiard import Process, Value, RawValue
import time
import ctypes

def worker_with_shared_value(shared_counter, worker_id, iterations):
    """Worker that increments shared counter"""
    for i in range(iterations):
        with shared_counter.get_lock():
            old_value = shared_counter.value
            time.sleep(0.001)  # Simulate some work
            shared_counter.value = old_value + 1
            print(f"Worker {worker_id}: counter = {shared_counter.value}")

def raw_value_worker(raw_val, worker_id):
    """Worker using unsynchronized raw value (unsafe!)"""
    for i in range(5):
        raw_val.value += 1
        print(f"Worker {worker_id}: raw value = {raw_val.value}")
        time.sleep(0.1)

if __name__ == '__main__':
    # Synchronized shared value
    counter = Value('i', 0)  # Integer initialized to 0
    print(f"Initial counter value: {counter.value}")

    # Start workers that safely increment counter
    processes = []
    for i in range(3):
        p = Process(target=worker_with_shared_value, args=(counter, i, 5))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(f"Final counter value: {counter.value}")

    # Demonstrate ctypes usage
    float_value = Value(ctypes.c_double, 3.14159)
    print(f"Float value: {float_value.value}")

    # Raw value (no synchronization)
    raw_counter = RawValue('i', 0)

    # Start workers with raw value (potential race conditions)
    raw_processes = []
    for i in range(2):
        p = Process(target=raw_value_worker, args=(raw_counter, i))
        raw_processes.append(p)
        p.start()

    for p in raw_processes:
        p.join()

    print(f"Final raw counter: {raw_counter.value}")
```
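Because the raw workers above perform an unprotected read-modify-write, their increments can interleave: the final raw counter may come out lower than the expected 10. That is exactly the race the synchronized `Value` avoids.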
### Shared Arrays

Create shared arrays that can be accessed by multiple processes, with optional synchronization.

```python { .api }
def Array(typecode_or_type, size_or_initializer, lock=True, ctx=None):
    """
    Create a synchronized shared ctypes array.

    Parameters:
    - typecode_or_type: ctypes type or single-character typecode
    - size_or_initializer: array length (int, zero-initialized) or a
      sequence to initialize from
    - lock: if True (default), operations are synchronized; if False, the
      raw unsynchronized array is returned; an existing Lock/RLock
      instance may also be passed in
    - ctx: multiprocessing context

    Returns:
    SynchronizedArray wrapper around the ctypes array
    """

def RawArray(typecode_or_type, size_or_initializer):
    """
    Create an unsynchronized shared ctypes array.

    Parameters:
    - typecode_or_type: ctypes type or single-character typecode
    - size_or_initializer: array length (int, zero-initialized) or a
      sequence to initialize from

    Returns:
    Raw ctypes array (no synchronization)
    """
```
Usage example:

```python
from billiard import Process, Array, RawArray
import time

def array_worker(shared_array, worker_id, start_idx, count):
    """Worker that modifies part of shared array"""
    with shared_array.get_lock():
        for i in range(count):
            idx = start_idx + i
            if idx < len(shared_array):
                shared_array[idx] = worker_id * 100 + i
                print(f"Worker {worker_id}: set array[{idx}] = {shared_array[idx]}")
                time.sleep(0.1)

def array_reader(shared_array):
    """Process that reads from shared array"""
    time.sleep(1)  # Let writers work first

    with shared_array.get_lock():
        print("Array contents:", list(shared_array[:]))
        print("Array sum:", sum(shared_array))

def matrix_worker(matrix, row, cols):
    """Worker that fills one row of a flattened 2D array"""
    for col in range(cols):
        idx = row * cols + col
        matrix[idx] = row * cols + col + 1  # Fill with sequential values
        time.sleep(0.05)

if __name__ == '__main__':
    # Create synchronized shared array
    shared_arr = Array('i', 10)  # Integer array of size 10, zero-initialized
    print(f"Initial array: {list(shared_arr[:])}")

    # Start workers to modify different parts of the array
    processes = []
    for i in range(3):
        start = i * 3
        p = Process(target=array_worker, args=(shared_arr, i, start, 3))
        processes.append(p)
        p.start()

    # Start reader process
    reader_proc = Process(target=array_reader, args=(shared_arr,))
    reader_proc.start()
    processes.append(reader_proc)

    for p in processes:
        p.join()

    # Array from an initializer sequence
    init_data = [1, 2, 3, 4, 5]
    initialized_array = Array('i', init_data)
    print(f"Initialized array: {list(initialized_array[:])}")

    # 2D array simulation (flattened, row-major)
    rows, cols = 3, 4
    matrix = Array('i', rows * cols)

    # Process each row in parallel
    matrix_procs = []
    for row in range(rows):
        p = Process(target=matrix_worker, args=(matrix, row, cols))
        matrix_procs.append(p)
        p.start()

    for p in matrix_procs:
        p.join()

    # Print the matrix row by row
    print("Matrix:")
    for row in range(rows):
        row_data = []
        for col in range(cols):
            row_data.append(matrix[row * cols + col])
        print(row_data)
```
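For bulk numeric work, a common pattern is to view a `RawArray` through NumPy without copying: ctypes arrays expose the buffer protocol, so `numpy.frombuffer` shares the same memory. A minimal sketch (NumPy is an assumption here, not a billiard dependency):

```python
import numpy as np
from billiard import RawArray

raw = RawArray('d', 12)                      # 12 doubles in shared memory
view = np.frombuffer(raw, dtype=np.float64)  # zero-copy view of the buffer
view[:] = np.arange(12)                      # writes are visible through raw
print(raw[0], raw[11])                       # 0.0 11.0
grid = view.reshape(3, 4)                    # reshape without copying
print(grid[2, 3])                            # 11.0
```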
### Shared Memory Utilities

Additional functions for working with shared memory objects.

```python { .api }
def copy(obj):
    """
    Copy a shared ctypes object.

    Parameters:
    - obj: raw ctypes object to copy (e.g. created with RawValue/RawArray)

    Returns:
    New shared ctypes object with the same contents
    """

def synchronized(obj, lock=None, ctx=None):
    """
    Wrap a raw ctypes object with a synchronization wrapper.

    Parameters:
    - obj: raw ctypes object to wrap (must not already be synchronized)
    - lock: lock to use (creates a new RLock if None)
    - ctx: multiprocessing context

    Returns:
    Synchronized wrapper around the object
    """
```
Usage example:

```python
from billiard import Process, RawArray
from billiard.sharedctypes import synchronized, copy
import time

def synchronized_access_example():
    """Demonstrate adding synchronization to a raw shared object"""
    # Create raw array (no built-in synchronization)
    raw_arr = RawArray('i', [0] * 10)

    # Add a synchronization wrapper
    sync_arr = synchronized(raw_arr)

    def sync_worker(arr, worker_id):
        with arr.get_lock():
            for i in range(len(arr)):
                arr[i] += worker_id
                time.sleep(0.01)
        print(f"Worker {worker_id} completed")

    # Use the synchronized array
    processes = []
    for i in range(1, 4):
        p = Process(target=sync_worker, args=(sync_arr, i))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(f"Final array: {list(sync_arr[:])}")

    # Copy the underlying ctypes object. copy() expects a raw ctypes
    # object, so unwrap the synchronized array with get_obj() first.
    arr_copy = copy(sync_arr.get_obj())
    print(f"Copied array: {list(arr_copy[:])}")

if __name__ == '__main__':
    synchronized_access_example()
```
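Passing an explicit lock to `synchronized()` lets several raw objects share one lock, so related updates stay consistent with a single acquire. A sketch, assuming billiard exports `RLock` at the top level as `multiprocessing` does:

```python
from billiard import RawArray, RawValue, RLock
from billiard.sharedctypes import synchronized

shared_lock = RLock()
totals = synchronized(RawArray('d', 4), lock=shared_lock)
count = synchronized(RawValue('i', 0), lock=shared_lock)

# One acquire guards both objects; the RLock is reentrant, so the
# wrappers' own per-operation locking nests safely inside it.
with totals.get_lock():
    totals[0] += 1.5
    count.value += 1
```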
### Advanced Shared Memory Patterns

#### Circular Buffer

```python
from billiard import Process, Array, Value
import time

class CircularBuffer:
    def __init__(self, size):
        self.buffer = Array('i', size)
        self.size = size
        self.head = Value('i', 0)
        self.tail = Value('i', 0)
        self.count = Value('i', 0)

    def put(self, item):
        with self.buffer.get_lock():
            if self.count.value < self.size:
                self.buffer[self.tail.value] = item
                self.tail.value = (self.tail.value + 1) % self.size
                self.count.value += 1
                return True
            return False  # Buffer full

    def get(self):
        with self.buffer.get_lock():
            if self.count.value > 0:
                item = self.buffer[self.head.value]
                self.head.value = (self.head.value + 1) % self.size
                self.count.value -= 1
                return item
            return None  # Buffer empty

def producer(buffer, items):
    for item in items:
        while not buffer.put(item):
            time.sleep(0.01)  # Wait if buffer full
        print(f"Produced: {item}")
        time.sleep(0.1)

def consumer(buffer, consumer_id):
    empty_polls = 0
    while empty_polls < 20:  # Give up after ~1s of consecutive empty reads
        item = buffer.get()
        if item is not None:
            empty_polls = 0
            print(f"Consumer {consumer_id} got: {item}")
            time.sleep(0.15)
        else:
            empty_polls += 1
            time.sleep(0.05)

# Usage
if __name__ == '__main__':
    circ_buffer = CircularBuffer(5)

    prod = Process(target=producer, args=(circ_buffer, range(10)))
    cons1 = Process(target=consumer, args=(circ_buffer, 1))
    cons2 = Process(target=consumer, args=(circ_buffer, 2))

    prod.start()
    cons1.start()
    cons2.start()

    prod.join()
    cons1.join()  # Consumers exit once the buffer stays empty
    cons2.join()
```
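Polling with a timeout works, but shutdown is more deterministic with an explicit shared flag. A sketch reusing `CircularBuffer` and `producer` from the example above, with a hypothetical `done` flag added:

```python
from billiard import Process, Value
import time

def consumer(buffer, consumer_id, done):
    while True:
        item = buffer.get()
        if item is not None:
            print(f"Consumer {consumer_id} got: {item}")
        elif done.value:      # Producer finished and buffer is drained
            break
        else:
            time.sleep(0.05)  # Momentarily empty; keep polling

if __name__ == '__main__':
    circ_buffer = CircularBuffer(5)
    done = Value('b', 0)      # Shared shutdown flag

    prod = Process(target=producer, args=(circ_buffer, range(10)))
    cons = Process(target=consumer, args=(circ_buffer, 1, done))
    prod.start()
    cons.start()

    prod.join()
    done.value = 1            # Signal: no more items are coming
    cons.join()
```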
#### Shared Statistics

```python
from billiard import Process, Value
import time
import random

class SharedStats:
    def __init__(self):
        self.count = Value('i', 0)
        self.sum = Value('d', 0.0)
        self.min_val = Value('d', float('inf'))
        self.max_val = Value('d', float('-inf'))

    def update(self, value):
        # count's lock guards all four fields, keeping updates consistent
        with self.count.get_lock():
            self.count.value += 1
            self.sum.value += value
            if value < self.min_val.value:
                self.min_val.value = value
            if value > self.max_val.value:
                self.max_val.value = value

    def get_stats(self):
        with self.count.get_lock():
            if self.count.value > 0:
                return {
                    'count': self.count.value,
                    'sum': self.sum.value,
                    'avg': self.sum.value / self.count.value,
                    'min': self.min_val.value,
                    'max': self.max_val.value,
                }
            return {'count': 0}

def data_generator(stats, num_values):
    for _ in range(num_values):
        value = random.uniform(-100, 100)
        stats.update(value)
        time.sleep(0.01)

def stats_reporter(stats):
    for _ in range(10):
        time.sleep(0.5)
        current_stats = stats.get_stats()
        print(f"Stats: {current_stats}")

# Usage
if __name__ == '__main__':
    stats = SharedStats()

    # Start data generators
    generators = []
    for i in range(3):
        p = Process(target=data_generator, args=(stats, 20))
        generators.append(p)
        p.start()

    # Start reporter
    reporter = Process(target=stats_reporter, args=(stats,))
    reporter.start()

    for p in generators:
        p.join()

    reporter.join()

    final_stats = stats.get_stats()
    print(f"Final stats: {final_stats}")
```
## Memory Layout and Performance

- **Shared values and arrays** reside in shared memory accessible by all participating processes
- **Synchronization overhead** applies only when `lock=True` (the default); the sketch after this list illustrates the per-access cost
- **Raw values/arrays** have no synchronization overhead but require manual coordination
- **ctypes integration** provides direct memory access with C-compatible data types
- **Initialization** can use a size (zero-filled) or an existing sequence
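A rough single-process micro-benchmark sketch of that locking cost; absolute numbers vary by platform, but a synchronized `Value` pays lock acquisitions on both the read and the write of `+=`, while `RawValue` is a plain memory access:

```python
import time
from billiard import Value, RawValue

def bench(v, n=100_000):
    """Time n in-place increments of v.value."""
    t0 = time.perf_counter()
    for _ in range(n):
        v.value += 1
    return time.perf_counter() - t0

if __name__ == '__main__':
    sync_v = Value('i', 0)    # every .value access acquires the lock
    raw_v = RawValue('i', 0)  # unsynchronized ctypes access
    print(f"Value:    {bench(sync_v):.3f}s")
    print(f"RawValue: {bench(raw_v):.3f}s")
```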