or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

communication.md context-config.md index.md pools.md process-management.md shared-objects.md synchronization.md

shared-objects.mddocs/

0

# Shared Objects and Memory

1

2

Objects and memory that can be shared between processes. Multiprocess provides both high-level managed objects through Manager and low-level shared memory constructs for different sharing patterns and performance requirements.

3

4

## Capabilities

5

6

### Manager Objects

7

8

High-level interface for creating shared objects managed by a server process.

9

10

```python { .api }

11

def Manager():

12

"""

13

Create a SyncManager instance for sharing objects between processes.

14

15

Returns:

16

SyncManager: manager instance that creates shared objects

17

"""

18

```

19

20

#### SyncManager Class

21

22

```python { .api }

23

class SyncManager:

24

"""

25

Manager that provides shared objects via a server process.

26

"""

27

def start(self, initializer=None, initargs=()):

28

"""

29

Start the manager's server process.

30

31

Args:

32

initializer: callable to run when server starts

33

initargs: arguments for initializer

34

"""

35

36

def shutdown(self):

37

"""Shutdown the manager's server process."""

38

39

def dict(self, *args, **kwargs):

40

"""

41

Create a shared dictionary.

42

43

Returns:

44

DictProxy: proxy to a shared dict object

45

"""

46

47

def list(self, sequence=()):

48

"""

49

Create a shared list.

50

51

Args:

52

sequence: initial sequence to populate list

53

54

Returns:

55

ListProxy: proxy to a shared list object

56

"""

57

58

def Namespace(self):

59

"""

60

Create a shared namespace object.

61

62

Returns:

63

NamespaceProxy: proxy to a shared namespace

64

"""

65

66

def Lock(self):

67

"""Create a shared Lock."""

68

69

def RLock(self):

70

"""Create a shared RLock."""

71

72

def Semaphore(self, value=1):

73

"""Create a shared Semaphore."""

74

75

def BoundedSemaphore(self, value=1):

76

"""Create a shared BoundedSemaphore."""

77

78

def Condition(self, lock=None):

79

"""Create a shared Condition."""

80

81

def Event(self):

82

"""Create a shared Event."""

83

84

def Barrier(self, parties, action=None, timeout=None):

85

"""Create a shared Barrier."""

86

87

def Queue(self, maxsize=0):

88

"""Create a shared Queue."""

89

90

def JoinableQueue(self, maxsize=0):

91

"""Create a shared JoinableQueue."""

92

93

def Pool(self, processes=None, initializer=None, initargs=()):

94

"""Create a shared Pool."""

95

96

def __enter__(self):

97

"""Context manager entry - starts manager."""

98

99

def __exit__(self, exc_type, exc_val, exc_tb):

100

"""Context manager exit - shuts down manager."""

101

```

102

103

### Shared Memory Objects

104

105

Low-level shared memory constructs for direct memory sharing between processes.

106

107

```python

108

# Import from shared_memory submodule

109

from multiprocess.shared_memory import SharedMemory, ShareableList

110

```

111

112

```python { .api }

113

class SharedMemory:

114

"""

115

Direct shared memory block accessible across processes.

116

117

Args:

118

name: name of existing shared memory block (None to create new)

119

create: if True, create new shared memory block

120

size: size in bytes for new shared memory block

121

"""

122

def __init__(self, name=None, create=False, size=0): ...

123

124

def close(self):

125

"""Close access to the shared memory from this instance."""

126

127

def unlink(self):

128

"""

129

Request deletion of the shared memory block.

130

Should be called by only one process.

131

"""

132

133

# Properties

134

name: str # Name of the shared memory block

135

size: int # Size of the shared memory block in bytes

136

buf: memoryview # Memory buffer for direct access

137

```

138

139

```python { .api }

140

class ShareableList:

141

"""

142

List-like object stored in shared memory.

143

144

Args:

145

sequence: initial sequence to populate the list (None for existing)

146

name: name of existing shareable list (None to create new)

147

"""

148

def __init__(self, sequence=None, name=None): ...

149

150

def __getitem__(self, index):

151

"""Get item at index."""

152

153

def __setitem__(self, index, value):

154

"""Set item at index."""

155

156

def __len__(self):

157

"""Return length of list."""

158

159

def copy(self):

160

"""

161

Return a shallow copy as a regular list.

162

163

Returns:

164

list: copy of the shareable list

165

"""

166

167

def count(self, value):

168

"""

169

Return number of occurrences of value.

170

171

Args:

172

value: value to count

173

174

Returns:

175

int: number of occurrences

176

"""

177

178

def index(self, value):

179

"""

180

Return index of first occurrence of value.

181

182

Args:

183

value: value to find

184

185

Returns:

186

int: index of value

187

188

Raises:

189

ValueError: if value not found

190

"""

191

192

# Properties

193

format: str # Format string describing stored types

194

shm: SharedMemory # Underlying shared memory block

195

```

196

197

### Shared ctypes Objects

198

199

Shared memory objects based on ctypes for typed data sharing.

200

201

```python

202

# Import from sharedctypes for utility functions

203

from multiprocess.sharedctypes import copy, synchronized

204

```

205

206

```python { .api }

207

def Value(typecode_or_type, *args, lock=True):

208

"""

209

Create a shared ctypes object.

210

211

Args:

212

typecode_or_type: ctypes type or single character type code

213

args: initial value arguments

214

lock: if True, create with synchronization lock

215

216

Returns:

217

SynchronizedBase: synchronized shared value

218

"""

219

220

def Array(typecode_or_type, size_or_initializer, *, lock=True):

221

"""

222

Create a shared ctypes array.

223

224

Args:

225

typecode_or_type: ctypes type or single character type code

226

size_or_initializer: size of array or initial values

227

lock: if True, create with synchronization lock

228

229

Returns:

230

SynchronizedArray: synchronized shared array

231

"""

232

233

def RawValue(typecode_or_type, *args):

234

"""

235

Create an unsynchronized shared ctypes object.

236

237

Args:

238

typecode_or_type: ctypes type or single character type code

239

args: initial value arguments

240

241

Returns:

242

ctypes object: raw shared value without locking

243

"""

244

245

def RawArray(typecode_or_type, size_or_initializer):

246

"""

247

Create an unsynchronized shared ctypes array.

248

249

Args:

250

typecode_or_type: ctypes type or single character type code

251

size_or_initializer: size of array or initial values

252

253

Returns:

254

ctypes array: raw shared array without locking

255

"""

256

```

257

258

### Utility Functions

259

260

```python { .api }

261

def copy(obj):

262

"""

263

Create a copy of a shared object.

264

265

Args:

266

obj: shared object to copy

267

268

Returns:

269

object: copy of the shared object

270

"""

271

272

def synchronized(obj, lock=None):

273

"""

274

Wrap a shared ctypes object so that access to it is process-safe (synchronized with a lock).

275

276

Args:

277

obj: object to synchronize

278

lock: lock to use (creates new if None)

279

280

Returns:

281

SynchronizedBase: synchronized wrapper

282

"""

283

```

284

285

## Usage Examples

286

287

### Basic Manager Usage

288

289

```python

290

from multiprocess import Process, Manager

291

292

def worker(shared_dict, shared_list, worker_id):
    """Publish one dict entry and one list item on behalf of a worker.

    Args:
        shared_dict: mapping that receives the ``worker_<id>`` greeting
        shared_list: list that receives this worker's item
        worker_id: identifier embedded in the key and in every message
    """
    greeting_key = f'worker_{worker_id}'
    shared_dict[greeting_key] = f'Hello from {worker_id}'

    item = f'Item from worker {worker_id}'
    shared_list.append(item)

    print(f"Worker {worker_id} completed")

300

301

if __name__ == '__main__':
    with Manager() as manager:
        # Proxies to objects that live in the manager's server process.
        shared_dict = manager.dict()
        shared_list = manager.list()

        # One child per worker id.
        children = [
            Process(target=worker, args=(shared_dict, shared_list, worker_id))
            for worker_id in range(3)
        ]
        for child in children:
            child.start()

        # Block until every child has finished.
        for child in children:
            child.join()

        # Snapshot the proxies into plain containers while the manager is
        # still running.
        final_dict = dict(shared_dict)
        final_list = list(shared_list)
        print(f"Final dict: {final_dict}")
        print(f"Final list: {final_list}")

320

```

321

322

### Shared Namespace

323

324

```python

325

from multiprocess import Process, Manager

326

327

def update_namespace(ns, worker_id, lock=None):
    """Bump the shared counter, log a message, and mark this worker done.

    Args:
        ns: namespace exposing ``counter`` (int) and ``messages`` (list-like)
        worker_id: identifier used in the message and the status attribute
        lock: optional lock guarding the counter update; when None the
            increment is performed unsynchronized
    """
    # ``ns.counter += 1`` is a read followed by a separate write. Without a
    # lock two workers can read the same value and one increment is lost.
    if lock is None:
        ns.counter += 1
    else:
        with lock:
            ns.counter += 1

    ns.messages.append(f"Message from worker {worker_id}")

    # Create a new attribute on the namespace.
    setattr(ns, f'worker_{worker_id}_status', 'completed')


if __name__ == '__main__':
    with Manager() as manager:
        # Create shared namespace
        ns = manager.Namespace()
        ns.counter = 0
        ns.messages = manager.list()

        # Shared lock so concurrent counter increments are not lost.
        counter_lock = manager.Lock()

        # Create processes
        worker_count = 4
        processes = []
        for i in range(worker_count):
            p = Process(target=update_namespace, args=(ns, i, counter_lock))
            p.start()
            processes.append(p)

        for p in processes:
            p.join()

        print(f"Counter: {ns.counter}")
        print(f"Messages: {list(ns.messages)}")

        # dir() on the namespace proxy reports the proxy's own
        # (underscore-prefixed) members rather than the attributes stored in
        # the manager, so look the per-worker attributes up by name.
        for i in range(worker_count):
            attr = f'worker_{i}_status'
            print(f"{attr}: {getattr(ns, attr)}")

359

```

360

361

### SharedMemory Direct Access

362

363

```python

364

from multiprocess import Process

365

from multiprocess.shared_memory import SharedMemory

366

import numpy as np

367

368

def worker_process(shm_name, shape, dtype):
    """Attach to an existing shared memory block and add 10 to its array.

    Args:
        shm_name: name of the shared memory block to attach to
        shape: shape of the array stored in the block
        dtype: NumPy dtype of the array stored in the block
    """
    # Attach to existing shared memory
    existing_shm = SharedMemory(name=shm_name)
    try:
        # View the block as a NumPy array (no copy is made).
        array = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)

        # Modify the array in place; the change is visible to every process
        # attached to the block.
        array += 10
        print(f"Worker modified array: {array}")

        # Drop the view before closing: close() releases the buffer, which
        # can fail with BufferError while a view still references it.
        del array
    finally:
        # Always detach, even if the update above raised.
        existing_shm.close()

381

382

if __name__ == '__main__':
    # Create data
    data = np.array([1, 2, 3, 4, 5], dtype=np.int64)

    # Create shared memory large enough to hold the array
    shm = SharedMemory(create=True, size=data.nbytes)
    try:
        # Copy data to shared memory through a NumPy view of the block
        shared_array = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
        shared_array[:] = data[:]

        print(f"Original array: {shared_array}")

        # The worker attaches by name and modifies the block in place
        p = Process(target=worker_process, args=(shm.name, data.shape, data.dtype))
        p.start()
        p.join()

        print(f"Array after worker: {shared_array}")

        # Drop the view before closing: close() releases the buffer, which
        # can fail with BufferError while a view still references it.
        del shared_array
    finally:
        # Clean up even if something above failed, so the block is not
        # left behind. unlink() is called once, by the creating process.
        shm.close()
        shm.unlink()

405

```

406

407

### ShareableList Usage

408

409

```python

410

from multiprocess import Process

411

from multiprocess.shared_memory import ShareableList

412

413

def list_worker(shared_list_name, worker_id):
    """Attach to a ShareableList and add ``worker_id * 10`` to its numbers.

    Non-numeric entries are left untouched. The updates are not
    synchronized; workers running concurrently on the same list may
    interleave their read-modify-write steps.

    Args:
        shared_list_name: name of the shared memory block backing the list
        worker_id: identifier of this worker; also scales the increment
    """
    # Attach to existing shareable list
    shared_list = ShareableList(name=shared_list_name)
    try:
        increment = worker_id * 10

        # Modify list elements
        for i, item in enumerate(shared_list):
            if isinstance(item, (int, float)):
                shared_list[i] = item + increment

        print(f"Worker {worker_id} processed list")

        # List operations. ShareableList offers no copy() method; list()
        # takes the snapshot through the sequence protocol instead.
        print(f"List length: {len(shared_list)}")
        print(f"Copy of list: {list(shared_list)}")
    finally:
        # Detach this process from the block; the creator unlinks it.
        shared_list.shm.close()

427

428

if __name__ == '__main__':
    # Create shareable list
    initial_data = [1, 2, 3, 4, 5]
    shared_list = ShareableList(initial_data)
    try:
        # ShareableList offers no copy() method; list() takes a snapshot.
        print(f"Initial list: {list(shared_list)}")

        # Create worker processes; each attaches by the block's name
        processes = []
        for i in range(2):
            p = Process(target=list_worker, args=(shared_list.shm.name, i + 1))
            p.start()
            processes.append(p)

        for p in processes:
            p.join()

        print(f"Final list: {list(shared_list)}")
    finally:
        # Clean up even if something above failed, so the block is not
        # left behind.
        shared_list.shm.close()
        shared_list.shm.unlink()

450

```

451

452

### ctypes Shared Values

453

454

```python

455

from multiprocess import Process, Value, Array

456

import ctypes

457

458

def modify_shared_data(shared_value, shared_array):
    """Add 10 to the shared value and double every element of the array.

    Args:
        shared_value: synchronized value exposing ``get_lock()`` and ``value``
        shared_array: synchronized array exposing ``get_lock()`` and indexing
    """
    value_lock = shared_value.get_lock()
    with value_lock:
        shared_value.value += 10

    array_lock = shared_array.get_lock()
    with array_lock:
        # The lock is held for the whole pass, so the snapshot taken by the
        # slice matches what element-by-element reads would see.
        for idx, current in enumerate(shared_array[:]):
            shared_array[idx] = current * 2

467

468

if __name__ == '__main__':
    # 'i' = signed int, 'f' = float
    shared_int = Value('i', 5)
    shared_floats = Array('f', [1.0, 2.0, 3.0, 4.0])

    print(f"Initial value: {shared_int.value}")
    print(f"Initial array: {list(shared_floats[:])}")

    # Both children receive the same shared objects.
    worker_args = (shared_int, shared_floats)
    children = [
        Process(target=modify_shared_data, args=worker_args)
        for _ in range(2)
    ]
    for child in children:
        child.start()
    for child in children:
        child.join()

    print(f"Final value: {shared_int.value}")
    print(f"Final array: {list(shared_floats[:])}")

490

```

491

492

### Raw Shared Objects (No Locking)

493

494

```python

495

from multiprocess import Process, RawValue, RawArray, Lock

496

import time

497

498

def worker_with_manual_locking(raw_value, raw_array, lock, worker_id,
                               iterations=5, work_delay=0.01, pause=0.1):
    """Increment a raw value and every raw-array element under an explicit lock.

    Args:
        raw_value: unsynchronized shared value exposing ``value``
        raw_array: unsynchronized shared array
        lock: lock held while the shared objects are modified
        worker_id: identifier used in the progress message
        iterations: number of locked update rounds to perform
        work_delay: seconds slept while holding the lock to simulate work
        pause: seconds slept between rounds, outside the lock
    """
    for _ in range(iterations):
        # Raw shared objects carry no lock of their own, so the whole
        # read-modify-write sequence is guarded manually.
        with lock:
            # Modify raw value
            old_val = raw_value.value
            time.sleep(work_delay)  # Simulate some work
            raw_value.value = old_val + 1

            # Modify raw array
            for i, current in enumerate(raw_array):
                raw_array[i] = current + 1

        print(f"Worker {worker_id} iteration completed")
        time.sleep(pause)

513

514

if __name__ == '__main__':
    # Raw shared objects come without any automatic locking ...
    raw_value = RawValue('i', 0)
    raw_array = RawArray('i', [0, 0, 0])

    # ... so a separate lock is created and handed to every worker.
    lock = Lock()

    print(f"Initial value: {raw_value.value}")
    print(f"Initial array: {list(raw_array[:])}")

    children = [
        Process(target=worker_with_manual_locking,
                args=(raw_value, raw_array, lock, worker_id))
        for worker_id in range(3)
    ]
    for child in children:
        child.start()
    for child in children:
        child.join()

    print(f"Final value: {raw_value.value}")
    print(f"Final array: {list(raw_array[:])}")

538

```

539

540

### Complex Shared Data Structures

541

542

```python

543

from multiprocess import Process, Manager

544

import time

545

546

def data_processor(shared_data, processor_id, item_count=3):
    """Append processed items to the shared structure and update its stats.

    Args:
        shared_data: mapping with ``'items'`` (list-like), ``'stats'``
            (mapping holding ``'total_processed'`` and ``'processors'``)
            and ``'stats_lock'`` (lock guarding the stats)
        processor_id: identifier recorded in each item and in the stats
        item_count: number of items to append; the same number is added to
            ``total_processed`` so the two can never drift apart
    """
    # Add processed items
    items = shared_data['items']
    for i in range(item_count):
        items.append({
            'processor_id': processor_id,
            'item_number': i,
            'processed_at': time.time(),
            'data': f"Processed by {processor_id}",
        })

    # Update statistics under the lock: the increment is a read followed by
    # a write and must not interleave with other processors.
    with shared_data['stats_lock']:
        stats = shared_data['stats']
        stats['total_processed'] += item_count
        stats['processors'][processor_id] = True

561

562

def monitor_progress(shared_data, expected_total=9, poll_interval=0.5, timeout=None):
    """Report processing progress until the expected total is reached.

    Args:
        shared_data: mapping with ``'stats'`` (holding ``'total_processed'``
            and ``'processors'``) and ``'stats_lock'``
        expected_total: number of processed items at which monitoring stops
            (default 9 = 3 processors * 3 items each)
        poll_interval: seconds to sleep between progress reports
        timeout: give up after this many seconds; None waits indefinitely.
            Prevents an endless loop if a processor dies before finishing.
    """
    # monotonic() is unaffected by system clock adjustments.
    start_time = time.monotonic()
    while True:
        time.sleep(poll_interval)
        with shared_data['stats_lock']:
            total = shared_data['stats']['total_processed']
            # A processor's flag is set to True once it has finished, so
            # this counts finished processors.
            finished_processors = sum(shared_data['stats']['processors'].values())

        elapsed = time.monotonic() - start_time
        print(f"Time: {elapsed:.1f}s, Processed: {total}, Finished: {finished_processors}")

        if total >= expected_total:
            break
        if timeout is not None and elapsed >= timeout:
            print(f"Monitor timed out after {elapsed:.1f}s")
            break

576

577

if __name__ == '__main__':
    with Manager() as manager:
        # Assemble the nested shared structure piece by piece.
        processor_flags = manager.dict({0: False, 1: False, 2: False})
        stats = manager.dict({
            'total_processed': 0,
            'processors': processor_flags
        })
        shared_data = manager.dict({
            'items': manager.list(),
            'stats': stats,
            'stats_lock': manager.Lock()
        })

        # Launch the three processors first ...
        processors = []
        for processor_id in range(3):
            proc = Process(target=data_processor, args=(shared_data, processor_id))
            proc.start()
            processors.append(proc)

        # ... then the monitor that watches their progress.
        monitor = Process(target=monitor_progress, args=(shared_data,))
        monitor.start()

        # Wait for the processors, then for the monitor.
        for proc in processors:
            proc.join()
        monitor.join()

        item_total = len(shared_data['items'])
        stats_snapshot = dict(shared_data['stats'])
        print(f"\nFinal results:")
        print(f"Total items processed: {item_total}")
        print(f"Statistics: {stats_snapshot}")

610

```

611

612

### Performance Comparison

613

614

```python

615

from multiprocess import Process, Manager, Value, RawValue, Lock

616

import time

617

618

def test_synchronized_value(shared_val, iterations):

619

"""Test with automatic synchronization"""

620

start_time = time.time()

621

for _ in range(iterations):

622

with shared_val.get_lock():

623

shared_val.value += 1

624

duration = time.time() - start_time

625

return duration

626

627

def test_raw_value(raw_val, lock, iterations):

628

"""Test with manual synchronization"""

629

start_time = time.time()

630

for _ in range(iterations):

631

with lock:

632

raw_val.value += 1

633

duration = time.time() - start_time

634

return duration

635

636

def benchmark_worker(test_type, shared_obj, extra_arg, iterations):
    """Run the benchmark selected by ``test_type`` and print its duration.

    Args:
        test_type: ``'synchronized'`` selects the built-in-lock benchmark;
            any other value selects the manual-lock benchmark
        shared_obj: shared value handed to the selected benchmark
        extra_arg: lock for the manual-lock benchmark (unused otherwise)
        iterations: number of increments to perform
    """
    uses_builtin_lock = test_type == 'synchronized'
    duration = (
        test_synchronized_value(shared_obj, iterations)
        if uses_builtin_lock
        else test_raw_value(shared_obj, extra_arg, iterations)
    )
    print(f"{test_type} test completed in {duration:.3f} seconds")

642

643

if __name__ == '__main__':
    iterations = 10000

    # Test 1: Value with its built-in lock
    print("Testing synchronized Value...")
    sync_val = Value('i', 0)
    sync_args = ('synchronized', sync_val, None, iterations)
    sync_proc = Process(target=benchmark_worker, args=sync_args)
    sync_proc.start()
    sync_proc.join()
    print(f"Synchronized final value: {sync_val.value}")

    # Test 2: RawValue guarded by a separately created lock
    print("\nTesting raw Value with manual lock...")
    raw_val = RawValue('i', 0)
    manual_lock = Lock()
    raw_args = ('raw', raw_val, manual_lock, iterations)
    raw_proc = Process(target=benchmark_worker, args=raw_args)
    raw_proc.start()
    raw_proc.join()
    print(f"Raw final value: {raw_val.value}")

664

```