# Managers

Managers create shared objects in a separate server process and expose them to other processes through proxies, with automatic cleanup when the manager shuts down.

## Capabilities

### Manager Creation

Create manager instances for sharing objects between processes.

```python { .api }
def Manager() -> SyncManager:
    """
    Create a SyncManager instance.

    Returns:
        SyncManager object for creating shared objects
    """
```

Usage example:

```python
from billiard import Process, Manager

def worker_with_manager(shared_dict, shared_list, worker_id):
    """Worker that uses managed objects"""
    # Update shared dictionary
    shared_dict[f'worker_{worker_id}'] = f'Hello from worker {worker_id}'

    # Add to shared list
    shared_list.append(f'Item from worker {worker_id}')

    print(f"Worker {worker_id}: dict={dict(shared_dict)}")
    print(f"Worker {worker_id}: list={list(shared_list)}")

if __name__ == '__main__':
    # Create manager
    with Manager() as manager:
        # Create shared objects through manager
        shared_dict = manager.dict()
        shared_list = manager.list()

        # Initialize shared objects
        shared_dict['initial'] = 'value'
        shared_list.extend([1, 2, 3])

        # Start worker processes
        processes = []
        for i in range(3):
            p = Process(target=worker_with_manager,
                        args=(shared_dict, shared_list, i))
            processes.append(p)
            p.start()

        # Wait for completion
        for p in processes:
            p.join()

        print(f"Final dict: {dict(shared_dict)}")
        print(f"Final list: {list(shared_list)}")
```

### SyncManager

Manager for synchronization primitives and shared objects.

```python { .api }
class SyncManager:
    """
    Manager for shared objects and synchronization primitives.
    """

    def start(self):
        """
        Start the manager process.
        """

    def shutdown(self):
        """
        Shut down the manager process.
        """

    def dict(self, *args, **kwargs) -> dict:
        """
        Create a shared dictionary.

        Parameters:
        - *args, **kwargs: arguments for the dict() constructor

        Returns:
            Proxy to shared dictionary
        """

    def list(self, sequence=()) -> list:
        """
        Create a shared list.

        Parameters:
        - sequence: initial sequence for the list

        Returns:
            Proxy to shared list
        """

    def Namespace(self):
        """
        Create a shared namespace object.

        Returns:
            Proxy to shared namespace (an object with arbitrary attributes)
        """

    def Value(self, typecode, value, lock=True):
        """
        Create a shared Value.

        Parameters:
        - typecode: ctypes typecode
        - value: initial value
        - lock: whether to use locking

        Returns:
            Proxy to shared value
        """

    def Array(self, typecode, sequence, lock=True):
        """
        Create a shared Array.

        Parameters:
        - typecode: ctypes typecode
        - sequence: initial sequence or size
        - lock: whether to use locking

        Returns:
            Proxy to shared array
        """

    def Queue(self, maxsize=0):
        """
        Create a shared Queue.

        Parameters:
        - maxsize: maximum queue size (0 means unbounded)

        Returns:
            Proxy to shared queue
        """

    def JoinableQueue(self, maxsize=0):
        """
        Create a shared JoinableQueue.

        Parameters:
        - maxsize: maximum queue size (0 means unbounded)

        Returns:
            Proxy to shared joinable queue
        """

    def Lock(self):
        """
        Create a shared Lock.

        Returns:
            Proxy to shared lock
        """

    def RLock(self):
        """
        Create a shared RLock.

        Returns:
            Proxy to shared recursive lock
        """

    def Semaphore(self, value=1):
        """
        Create a shared Semaphore.

        Parameters:
        - value: initial semaphore count

        Returns:
            Proxy to shared semaphore
        """

    def BoundedSemaphore(self, value=1):
        """
        Create a shared BoundedSemaphore.

        Parameters:
        - value: initial semaphore count

        Returns:
            Proxy to shared bounded semaphore
        """

    def Condition(self, lock=None):
        """
        Create a shared Condition.

        Parameters:
        - lock: underlying lock (a new one is created if None)

        Returns:
            Proxy to shared condition variable
        """

    def Event(self):
        """
        Create a shared Event.

        Returns:
            Proxy to shared event
        """

    def Barrier(self, parties, action=None, timeout=None):
        """
        Create a shared Barrier.

        Parameters:
        - parties: number of processes that must wait on the barrier
        - action: callable to run when the barrier releases
        - timeout: default timeout

        Returns:
            Proxy to shared barrier
        """
```

Usage example:

```python
from billiard import Process, Manager
import time
import random

def producer_with_manager(queue, event, stats):
    """Producer using managed objects"""
    for i in range(5):
        item = f"item_{i}"
        queue.put(item)
        stats['produced'] = stats.get('produced', 0) + 1
        print(f"Produced: {item}")
        time.sleep(random.uniform(0.1, 0.5))

    # Signal completion
    event.set()

def consumer_with_manager(queue, event, stats, consumer_id):
    """Consumer using managed objects"""
    while True:
        try:
            if not queue.empty():
                item = queue.get_nowait()
                stats[f'consumer_{consumer_id}'] = stats.get(f'consumer_{consumer_id}', 0) + 1
                print(f"Consumer {consumer_id} consumed: {item}")
                time.sleep(0.2)
            elif event.is_set():
                break
            else:
                time.sleep(0.1)
        except Exception:
            # empty() and get_nowait() can race between consumers;
            # back off and retry
            time.sleep(0.1)

def manager_coordination_example():
    """Demonstrate manager-based coordination"""
    with Manager() as manager:
        # Create managed objects
        shared_queue = manager.Queue()
        completion_event = manager.Event()
        stats = manager.dict()

        # Start processes
        processes = []

        # Producer
        prod = Process(target=producer_with_manager,
                       args=(shared_queue, completion_event, stats))
        processes.append(prod)
        prod.start()

        # Consumers
        for i in range(2):
            cons = Process(target=consumer_with_manager,
                           args=(shared_queue, completion_event, stats, i))
            processes.append(cons)
            cons.start()

        # Wait for completion
        for p in processes:
            p.join()

        print(f"Final stats: {dict(stats)}")

if __name__ == '__main__':
    manager_coordination_example()

```
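
The less frequently demonstrated primitives work the same way through proxies. Below is a minimal sketch written for this guide (not taken from the billiard docs) that combines `Namespace`, `Value`, `Lock`, and `Barrier`:

```python
from billiard import Process, Manager

def barrier_worker(ns, counter, lock, barrier, worker_id):
    """Record progress, then wait for every worker at the barrier."""
    with lock:
        counter.value += 1           # read-modify-write guarded by the manager lock
        ns.last_started = worker_id  # namespace proxies hold arbitrary attributes
    barrier.wait()                   # all three parties proceed together
    print(f"Worker {worker_id} passed the barrier")

if __name__ == '__main__':
    with Manager() as manager:
        ns = manager.Namespace()
        counter = manager.Value('i', 0)
        lock = manager.Lock()
        barrier = manager.Barrier(3)

        procs = [Process(target=barrier_worker,
                         args=(ns, counter, lock, barrier, i))
                 for i in range(3)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()

        print(f"Started: {counter.value}, last to start: {ns.last_started}")
```

Note that `counter.value += 1` is a read followed by a write through the proxy, so it needs the explicit lock; the manager serializes individual calls, not compound operations.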

### Custom Managers

Create custom managers for specialized shared objects.

```python { .api }
class BaseManager:
    """
    Base class for creating custom managers.
    """

    def __init__(self, address=None, authkey=None, serializer='pickle'):
        """
        Create a BaseManager.

        Parameters:
        - address: address for the manager server
        - authkey: authentication key
        - serializer: serialization method
        """

    def start(self, initializer=None, initargs=()):
        """
        Start the manager process.

        Parameters:
        - initializer: callable to run on manager startup
        - initargs: arguments for the initializer
        """

    def shutdown(self):
        """
        Shut down the manager.
        """

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        """
        Register a type with the manager.

        Parameters:
        - typeid: string identifier for the type
        - callable: callable that returns the object
        - proxytype: proxy class for the object
        - exposed: list of exposed methods/attributes
        - method_to_typeid: mapping of method names to typeids
        - create_method: whether to create a creation method on the manager
        """
```

Usage example:

```python
from billiard import Process
from billiard.managers import BaseManager
import time
import threading

# Custom shared object
class Counter:
    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def increment(self):
        with self._lock:
            self._value += 1

    def decrement(self):
        with self._lock:
            self._value -= 1

    def get_value(self):
        with self._lock:
            return self._value

# Custom manager
class CustomManager(BaseManager):
    pass

# Register the Counter class
CustomManager.register('Counter', Counter)

def worker_with_custom_manager(counter, worker_id, operations):
    """Worker using a custom managed object"""
    for i in range(operations):
        if i % 2 == 0:
            counter.increment()
            print(f"Worker {worker_id}: incremented to {counter.get_value()}")
        else:
            counter.decrement()
            print(f"Worker {worker_id}: decremented to {counter.get_value()}")
        time.sleep(0.1)

def custom_manager_example():
    """Demonstrate custom manager usage"""
    with CustomManager() as manager:
        # Create custom managed object
        counter = manager.Counter()

        print(f"Initial counter value: {counter.get_value()}")

        # Start worker processes
        processes = []
        for i in range(3):
            p = Process(target=worker_with_custom_manager,
                        args=(counter, i, 5))
            processes.append(p)
            p.start()

        # Wait for completion
        for p in processes:
            p.join()

        print(f"Final counter value: {counter.get_value()}")

if __name__ == '__main__':
    custom_manager_example()

```
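
The `address` and `authkey` parameters let a manager serve its objects over a socket, so independently started processes (or other machines) can attach to it. A minimal sketch, reusing the `Counter` class above; the address and key are placeholders, and `get_server()`/`connect()` are the standard `BaseManager` server/client entry points inherited from the multiprocessing API:

```python
from billiard.managers import BaseManager

# --- server script ---
class ServerManager(BaseManager):
    pass

ServerManager.register('Counter', Counter)  # Counter as defined above

def serve():
    mgr = ServerManager(address=('127.0.0.1', 50000), authkey=b'secret')
    server = mgr.get_server()
    server.serve_forever()  # blocks; run this in its own process

# --- client script (typically a separate program) ---
class ClientManager(BaseManager):
    pass

ClientManager.register('Counter')  # no callable: the client only builds proxies

def use_remote_counter():
    mgr = ClientManager(address=('127.0.0.1', 50000), authkey=b'secret')
    mgr.connect()  # attach to the already-running server
    counter = mgr.Counter()
    counter.increment()
    print(f"Remote counter value: {counter.get_value()}")
```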

### Advanced Manager Patterns

#### Shared Cache with Manager

```python
from billiard import Process, Manager
import time
import random

def cache_worker(cache, lock, worker_id):
    """Worker that uses a shared cache"""
    for i in range(5):
        key = f"key_{random.randint(1, 10)}"

        # Try to get from cache
        with lock:
            if key in cache:
                value = cache[key]
                print(f"Worker {worker_id}: cache hit for {key} = {value}")
            else:
                # Simulate expensive computation
                value = random.randint(100, 999)
                cache[key] = value
                print(f"Worker {worker_id}: cache miss, computed {key} = {value}")

        time.sleep(0.2)

def shared_cache_example():
    """Demonstrate a shared cache using a manager"""
    with Manager() as manager:
        cache = manager.dict()
        cache_lock = manager.Lock()

        # Start workers
        processes = []
        for i in range(4):
            p = Process(target=cache_worker, args=(cache, cache_lock, i))
            processes.append(p)
            p.start()

        for p in processes:
            p.join()

        print(f"Final cache contents: {dict(cache)}")

if __name__ == '__main__':
    shared_cache_example()
```

#### Work Distribution with Manager

```python
from billiard import Process, Manager
import time
import random

def work_distributor(task_queue, result_dict, num_tasks):
    """Distribute tasks to workers"""
    for i in range(num_tasks):
        task = {
            'id': i,
            'data': random.randint(1, 100),
            'operation': random.choice(['square', 'cube', 'double'])
        }
        task_queue.put(task)

    # Add termination signals, one per worker
    for _ in range(3):
        task_queue.put(None)

def worker_processor(task_queue, result_dict, worker_id):
    """Process tasks from the queue"""
    while True:
        task = task_queue.get()
        if task is None:
            break

        # Process task
        data = task['data']
        if task['operation'] == 'square':
            result = data ** 2
        elif task['operation'] == 'cube':
            result = data ** 3
        else:  # double
            result = data * 2

        result_dict[task['id']] = {
            'input': data,
            'operation': task['operation'],
            'result': result,
            'worker': worker_id
        }

        print(f"Worker {worker_id}: processed task {task['id']}")
        time.sleep(0.1)

def work_distribution_example():
    """Demonstrate a work distribution pattern"""
    with Manager() as manager:
        task_queue = manager.Queue()
        results = manager.dict()

        # Start distributor
        distributor = Process(target=work_distributor,
                              args=(task_queue, results, 15))
        distributor.start()

        # Start workers
        workers = []
        for i in range(3):
            worker = Process(target=worker_processor,
                             args=(task_queue, results, i))
            workers.append(worker)
            worker.start()

        # Wait for completion
        distributor.join()
        for worker in workers:
            worker.join()

        # Display results
        print(f"Processed {len(results)} tasks:")
        for task_id, result in sorted(results.items()):
            print(f"Task {task_id}: {result['input']} {result['operation']} = "
                  f"{result['result']} (worker {result['worker']})")

if __name__ == '__main__':
    work_distribution_example()
```

## Manager Best Practices

1. **Use context managers** (`with Manager() as manager:`) for automatic cleanup
2. **Minimize proxy method calls** - cache frequently accessed values locally (see the sketch after this list)
3. **Use appropriate data structures** - every manager object method call carries IPC overhead
4. **Consider alternatives** for high-performance scenarios (e.g. shared memory arrays)
5. **Handle manager failures** - managers run in a separate process and can fail independently of their clients

6. **Serialize access** to shared objects when needed using manager locks
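
To illustrate practice 2: every proxy dereference is a round trip to the manager process, so copying a container's contents once is far cheaper than indexing the proxy in a loop. A minimal sketch (the function names are illustrative):

```python
from billiard import Manager
import time

def per_item_sum(shared_list):
    # One IPC round trip per __getitem__ call, plus one for len()
    return sum(shared_list[i] for i in range(len(shared_list)))

def local_copy_sum(shared_list):
    # Slicing a list proxy returns a plain local list in a single round trip
    return sum(shared_list[:])

if __name__ == '__main__':
    with Manager() as manager:
        shared = manager.list(range(2000))

        t0 = time.perf_counter()
        slow = per_item_sum(shared)
        t1 = time.perf_counter()
        fast = local_copy_sum(shared)
        t2 = time.perf_counter()

        assert slow == fast
        print(f"per-item: {t1 - t0:.3f}s, local copy: {t2 - t1:.3f}s")
```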

558

559

## Performance Considerations

560

561

- **Method call overhead**: Each proxy method call involves IPC

562

- **Serialization cost**: Objects are pickled/unpickled for transfer

563

- **Network latency**: Managers can run on remote machines

564

- **Memory usage**: Objects stored in manager process memory

565

- **Garbage collection**: Managed objects persist until manager shutdown
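
One consequence of the serialization behavior is worth spelling out: reading a value out of a managed container hands you a local unpickled copy, so in-place mutation of a nested object is silently lost unless the value is written back. A minimal sketch:

```python
from billiard import Manager

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        d['items'] = []        # the list is pickled into the manager process

        d['items'].append(1)   # mutates a local copy, not the stored value
        print(d['items'])      # [] - the append was lost

        # Write back the modified copy (or store a nested manager.list())
        items = d['items']
        items.append(1)
        d['items'] = items
        print(d['items'])      # [1]
```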