# Managers

Shared object managers for creating and managing shared objects across multiple processes with proxy-based access and automatic cleanup.

## Capabilities

### Manager Creation

Create manager instances for sharing objects between processes.

```python { .api }
11
def Manager() -> SyncManager:
    """
    Create and start a SyncManager.

    The manager's server process is already running when this returns,
    so shared objects can be created from it right away. Use it as a
    context manager (``with Manager() as manager:``) to shut the server
    process down automatically.

    Returns:
        Started SyncManager object for creating shared objects
    """
```

Usage example:

```python
23
from billiard import Process, Manager
24
import time
25
26
def worker_with_manager(shared_dict, shared_list, worker_id):
    """Worker that uses managed objects.

    Parameters:
    - shared_dict: shared dictionary; receives one 'worker_<id>' entry
    - shared_list: shared list; receives one item
    - worker_id: identifier used in the stored values and the output
    """
    # Update shared dictionary
    shared_dict[f'worker_{worker_id}'] = f'Hello from worker {worker_id}'

    # Add to shared list
    shared_list.append(f'Item from worker {worker_id}')

    # dict()/list() copy the shared contents into plain local objects
    print(f"Worker {worker_id}: dict={dict(shared_dict)}")
    print(f"Worker {worker_id}: list={list(shared_list)}")

if __name__ == '__main__':
    # Create manager; leaving the `with` block shuts it down
    with Manager() as manager:
        # Create shared objects through manager
        shared_dict = manager.dict()
        shared_list = manager.list()

        # Initialize shared objects
        shared_dict['initial'] = 'value'
        shared_list.extend([1, 2, 3])

        # Start worker processes
        processes = []
        for i in range(3):
            p = Process(target=worker_with_manager,
                        args=(shared_dict, shared_list, i))
            processes.append(p)
            p.start()

        # Wait for completion
        for p in processes:
            p.join()

        # Read the results inside the `with` block, while the manager
        # that owns the shared objects is still running
        print(f"Final dict: {dict(shared_dict)}")
        print(f"Final list: {list(shared_list)}")
```

### SyncManager

Manager for synchronization primitives and shared objects.

```python { .api }
69
class SyncManager:
    """
    Manager for shared objects and synchronization primitives.

    Each factory method creates the object inside the manager process
    and returns a proxy to it.
    """
    def start(self, initializer=None, initargs=()):
        """
        Start the manager process.

        Parameters:
        - initializer: callable to run on manager startup
        - initargs: arguments for initializer
        """

    def shutdown(self):
        """
        Shutdown the manager process.
        """

    def dict(self, *args, **kwargs) -> dict:
        """
        Create a shared dictionary.

        Parameters:
        - *args, **kwargs: arguments for dict() constructor

        Returns:
            Proxy to shared dictionary
        """

    def list(self, sequence=()) -> list:
        """
        Create a shared list.

        Parameters:
        - sequence: initial sequence for list

        Returns:
            Proxy to shared list
        """

    def Namespace(self):
        """
        Create a shared namespace object.

        Returns:
            Proxy to shared namespace (object with arbitrary attributes)
        """

    def Value(self, typecode, value, lock=True):
        """
        Create a shared Value.

        Parameters:
        - typecode: ctypes typecode
        - value: initial value
        - lock: whether to use locking

        Returns:
            Proxy to shared value
        """

    def Array(self, typecode, sequence, lock=True):
        """
        Create a shared Array.

        Parameters:
        - typecode: ctypes typecode
        - sequence: initial sequence of values
        - lock: whether to use locking

        Returns:
            Proxy to shared array
        """

    def Queue(self, maxsize=0):
        """
        Create a shared Queue.

        Parameters:
        - maxsize: maximum queue size

        Returns:
            Proxy to shared queue
        """

    def JoinableQueue(self, maxsize=0):
        """
        Create a shared JoinableQueue.

        Parameters:
        - maxsize: maximum queue size

        Returns:
            Proxy to shared joinable queue
        """

    def Lock(self):
        """
        Create a shared Lock.

        Returns:
            Proxy to shared lock
        """

    def RLock(self):
        """
        Create a shared RLock.

        Returns:
            Proxy to shared recursive lock
        """

    def Semaphore(self, value=1):
        """
        Create a shared Semaphore.

        Parameters:
        - value: initial semaphore count

        Returns:
            Proxy to shared semaphore
        """

    def BoundedSemaphore(self, value=1):
        """
        Create a shared BoundedSemaphore.

        Parameters:
        - value: initial semaphore count

        Returns:
            Proxy to shared bounded semaphore
        """

    def Condition(self, lock=None):
        """
        Create a shared Condition.

        Parameters:
        - lock: underlying lock (creates new if None)

        Returns:
            Proxy to shared condition variable
        """

    def Event(self):
        """
        Create a shared Event.

        Returns:
            Proxy to shared event
        """

    def Barrier(self, parties, action=None, timeout=None):
        """
        Create a shared Barrier.

        Parameters:
        - parties: number of processes needed
        - action: callable to run when barrier releases
        - timeout: default timeout

        Returns:
            Proxy to shared barrier
        """
```

Usage example:

```python
235
from billiard import Process, Manager
236
import time
237
import random
238
239
def producer_with_manager(queue, event, stats):
    """Producer using managed objects.

    Puts five items on the queue, counting them in stats['produced'],
    then sets the event to tell consumers that nothing more is coming.

    Parameters:
    - queue: shared queue the items are put on
    - event: shared event set when production has finished
    - stats: shared dictionary holding the 'produced' counter
    """
    try:
        for i in range(5):
            item = f"item_{i}"
            queue.put(item)
            stats['produced'] = stats.get('produced', 0) + 1
            print(f"Produced: {item}")
            time.sleep(random.uniform(0.1, 0.5))
    finally:
        # Signal completion even when producing failed; otherwise the
        # consumers would keep polling the queue forever.
        event.set()

def consumer_with_manager(queue, event, stats, consumer_id):
    """Consumer using managed objects.

    Takes items off the queue until the producer has signalled completion
    and the queue is drained, counting consumed items in
    stats['consumer_<id>'].

    Parameters:
    - queue: shared queue to consume from
    - event: shared event the producer sets after its last put
    - stats: shared dictionary holding the per-consumer counters
    - consumer_id: identifier used in the stats key and the output
    """
    # Local import: the `queue` parameter shadows the module name here.
    from queue import Empty

    stats_key = f'consumer_{consumer_id}'
    while True:
        try:
            # Fetch directly instead of checking empty() first: another
            # consumer may take the item between the check and the get.
            item = queue.get_nowait()
        except Empty:
            # The event is set only after the last put, so "event set and
            # queue empty" (checked in that order) means no item is left.
            if event.is_set() and queue.empty():
                break
            time.sleep(0.1)
            continue

        stats[stats_key] = stats.get(stats_key, 0) + 1
        print(f"Consumer {consumer_id} consumed: {item}")
        time.sleep(0.2)

def manager_coordination_example():
    """Demonstrate manager-based coordination.

    Runs one producer and two consumers that share a managed queue, a
    completion event and a stats dictionary, then prints the stats.
    """
    with Manager() as manager:
        # Create managed objects
        shared_queue = manager.Queue()
        completion_event = manager.Event()
        stats = manager.dict()

        # Start processes
        processes = []

        # Producer
        prod = Process(target=producer_with_manager,
                       args=(shared_queue, completion_event, stats))
        processes.append(prod)
        prod.start()

        # Consumers
        for i in range(2):
            cons = Process(target=consumer_with_manager,
                           args=(shared_queue, completion_event, stats, i))
            processes.append(cons)
            cons.start()

        # Wait for completion
        for p in processes:
            p.join()

        print(f"Final stats: {dict(stats)}")

# Run the example only when executed as a script
if __name__ == '__main__':
    manager_coordination_example()
```

### Custom Managers

Create custom managers for specialized shared objects.

```python { .api }
306
class BaseManager:
    """
    Base class for creating custom managers.

    Subclass it and call register() on the subclass to make additional
    types available through the manager.
    """
    def __init__(self, address=None, authkey=None, serializer='pickle'):
        """
        Create a BaseManager.

        Parameters:
        - address: address for manager server
        - authkey: authentication key
        - serializer: serialization method
        """

    def start(self, initializer=None, initargs=()):
        """
        Start the manager process.

        Parameters:
        - initializer: callable to run on manager startup
        - initargs: arguments for initializer
        """

    def shutdown(self):
        """
        Shutdown the manager.
        """

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        """
        Register a type with the manager.

        Parameters:
        - typeid: string identifier for the type
        - callable: callable that returns the object
        - proxytype: proxy class for the object
        - exposed: list of exposed methods/attributes
        - method_to_typeid: mapping of method names to typeids
        - create_method: whether to create a method named after typeid
          on the manager
        """
```

Usage example:

```python
353
from billiard import Process
354
from billiard.managers import BaseManager
355
import time
356
import threading
357
358
# Custom shared object
class Counter:
    """Integer counter whose every operation runs under a lock."""

    def __init__(self):
        self._value = 0
        # Guards _value so concurrent callers cannot interleave updates
        self._lock = threading.Lock()

    def increment(self):
        """Add one to the counter."""
        with self._lock:
            self._value += 1

    def decrement(self):
        """Subtract one from the counter."""
        with self._lock:
            self._value -= 1

    def get_value(self):
        """Return the current counter value."""
        with self._lock:
            return self._value

# Custom manager: types are registered on this subclass
class CustomManager(BaseManager):
    pass

# Register the Counter class; instances are then created through the
# manager with manager.Counter()
CustomManager.register('Counter', Counter)

def worker_with_custom_manager(counter, worker_id, operations):
    """Worker using custom managed object.

    Alternates between incrementing (even steps) and decrementing (odd
    steps) the counter, printing the value read back after each change.

    Parameters:
    - counter: object providing increment(), decrement() and get_value()
    - worker_id: identifier used in the output
    - operations: number of increment/decrement steps to perform
    """
    for i in range(operations):
        # get_value() is a separate call from the update, so the printed
        # value can already include changes made by other workers.
        if i % 2 == 0:
            counter.increment()
            print(f"Worker {worker_id}: incremented to {counter.get_value()}")
        else:
            counter.decrement()
            print(f"Worker {worker_id}: decremented to {counter.get_value()}")
        time.sleep(0.1)

def custom_manager_example():
    """Demonstrate custom manager usage.

    Creates one managed Counter, lets three worker processes perform five
    operations each on it, and prints the value before and after.
    """
    with CustomManager() as manager:
        # Create custom managed object
        counter = manager.Counter()

        print(f"Initial counter value: {counter.get_value()}")

        # Start worker processes
        processes = []
        for i in range(3):
            p = Process(target=worker_with_custom_manager,
                        args=(counter, i, 5))
            processes.append(p)
            p.start()

        # Wait for completion
        for p in processes:
            p.join()

        print(f"Final counter value: {counter.get_value()}")

# Run the example only when executed as a script
if __name__ == '__main__':
    custom_manager_example()
```

### Advanced Manager Patterns

#### Shared Cache with Manager

```python
425
from billiard import Process, Manager
426
import time
427
import random
428
429
def cache_worker(cache, lock, worker_id, iterations=5):
    """Worker that uses shared cache.

    Parameters:
    - cache: shared dictionary used as the cache
    - lock: lock held around each lookup-or-compute step
    - worker_id: identifier used in the output
    - iterations: number of cache lookups to perform
    """
    for _ in range(iterations):
        key = f"key_{random.randint(1, 10)}"

        # Try to get from cache. The membership test and the store happen
        # under one lock so two workers cannot both compute the same key.
        with lock:
            if key in cache:
                value = cache[key]
                print(f"Worker {worker_id}: cache hit for {key} = {value}")
            else:
                # Simulate expensive computation
                value = random.randint(100, 999)
                cache[key] = value
                print(f"Worker {worker_id}: cache miss, computed {key} = {value}")

        time.sleep(0.2)

def shared_cache_example():
    """Demonstrate shared cache using manager.

    Four worker processes share one managed dictionary, guarded by one
    managed lock; the final cache contents are printed at the end.
    """
    with Manager() as manager:
        cache = manager.dict()
        cache_lock = manager.Lock()

        # Start workers
        processes = []
        for i in range(4):
            p = Process(target=cache_worker, args=(cache, cache_lock, i))
            processes.append(p)
            p.start()

        for p in processes:
            p.join()

        print(f"Final cache contents: {dict(cache)}")

# Run the example only when executed as a script
if __name__ == '__main__':
    shared_cache_example()
```

#### Work Distribution with Manager

```python
472
from billiard import Process, Manager
473
import time
474
import random
475
476
def work_distributor(task_queue, result_dict, num_tasks, num_workers=3):
    """Distribute tasks to workers.

    Parameters:
    - task_queue: queue the tasks and termination signals are put on
    - result_dict: not used by the distributor itself
    - num_tasks: number of tasks to generate
    - num_workers: number of termination signals to send; must equal
      the number of workers reading from task_queue
    """
    for i in range(num_tasks):
        task = {
            'id': i,
            'data': random.randint(1, 100),
            'operation': random.choice(['square', 'cube', 'double'])
        }
        task_queue.put(task)

    # Add termination signals: one None per worker, queued after all
    # tasks, so each worker stops only once the real work is done
    for _ in range(num_workers):
        task_queue.put(None)

def worker_processor(task_queue, result_dict, worker_id):
    """Process tasks from queue.

    Reads tasks until a None termination signal arrives and stores one
    result entry per task in result_dict, keyed by the task id.

    Parameters:
    - task_queue: queue yielding task dicts ('id', 'data', 'operation')
      or None to stop
    - result_dict: dictionary receiving the results
    - worker_id: identifier recorded in each result and in the output
    """
    while True:
        task = task_queue.get()
        if task is None:
            break

        # Process task
        data = task['data']
        if task['operation'] == 'square':
            result = data ** 2
        elif task['operation'] == 'cube':
            result = data ** 3
        else:  # double
            result = data * 2

        result_dict[task['id']] = {
            'input': data,
            'operation': task['operation'],
            'result': result,
            'worker': worker_id
        }

        print(f"Worker {worker_id}: processed task {task['id']}")
        time.sleep(0.1)

def work_distribution_example():
    """Demonstrate work distribution pattern.

    One distributor process queues 15 tasks, three worker processes
    consume them, and the collected results are printed in task order.
    """
    with Manager() as manager:
        task_queue = manager.Queue()
        results = manager.dict()

        # Start distributor
        distributor = Process(target=work_distributor,
                              args=(task_queue, results, 15))
        distributor.start()

        # Start workers; the count must match the number of termination
        # signals the distributor sends (three)
        workers = []
        for i in range(3):
            worker = Process(target=worker_processor,
                             args=(task_queue, results, i))
            workers.append(worker)
            worker.start()

        # Wait for completion
        distributor.join()
        for worker in workers:
            worker.join()

        # Display results
        print(f"Processed {len(results)} tasks:")
        for task_id, result in sorted(results.items()):
            print(f"Task {task_id}: {result['input']} {result['operation']} = "
                  f"{result['result']} (worker {result['worker']})")

# Run the example only when executed as a script
if __name__ == '__main__':
    work_distribution_example()
```

## Manager Best Practices

1. **Use context managers** (`with Manager() as manager:`) for automatic cleanup
2. **Minimize proxy method calls** - cache frequently accessed values locally
3. **Use appropriate data structures** - manager objects have method call overhead
4. **Consider alternatives** for high-performance scenarios (shared memory arrays)
5. **Handle manager failures** - managers run in separate processes and can fail
6. **Serialize access** to shared objects when needed using manager locks

## Performance Considerations

- **Method call overhead**: Each proxy method call involves IPC
- **Serialization cost**: Arguments and return values are pickled/unpickled for transfer
- **Network latency**: Managers can run on remote machines, which adds a network round trip to every call
- **Memory usage**: Objects are stored in the manager process's memory
- **Garbage collection**: A managed object stays in the manager process until no proxy refers to it any longer or the manager shuts down