or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

communication.md context-management.md index.md managers.md process-management.md process-pools.md queues.md shared-memory.md synchronization.md

shared-memory.mddocs/

0

# Shared Memory

1

2

Synchronized and unsynchronized shared memory objects for efficient data sharing between processes using ctypes-based values and arrays.

3

4

## Capabilities

5

6

### Shared Values

7

8

Create shared values that can be accessed by multiple processes with optional synchronization.

9

10

```python { .api }

11

def Value(typecode_or_type, *args, lock=True, ctx=None):

12

"""

13

Create a synchronized shared ctypes value.

14

15

Parameters:

16

- typecode_or_type: ctypes type or single character typecode

17

- *args: initialization arguments for the value

18

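- lock: if True (default), a new recursive lock is created to guard access; a Lock or RLock instance may be passed to use that lock instead; if False, access is unsynchronized. Compound operations such as `+=` are not atomic and must still be wrapped in `with value.get_lock():`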

19

- ctx: multiprocessing context

20

21

Returns:

22

Synchronized wrapper around the ctypes value, or the raw ctypes value itself when lock=False

23

"""

24

25

def RawValue(typecode_or_type, *args):

26

"""

27

Create an unsynchronized shared ctypes value.

28

29

Parameters:

30

- typecode_or_type: ctypes type or single character typecode

31

- *args: initialization arguments for the value

32

33

Returns:

34

Raw ctypes value (no synchronization)

35

"""

36

```

37

38

Common typecodes:

39

- `'i'` - signed int

40

- `'f'` - float

41

- `'d'` - double

42

- `'c'` - char

43

- `'b'` - signed char

44

- `'B'` - unsigned char

45

- `'h'` - short

46

- `'l'` - long

47

48

Usage example:

49

50

```python

51

from billiard import Process, Value, RawValue

52

import time

53

import ctypes

54

55

def worker_with_shared_value(shared_counter, worker_id, iterations):

56

"""Worker that increments shared counter"""

57

for i in range(iterations):

58

with shared_counter.get_lock():

59

old_value = shared_counter.value

60

time.sleep(0.001) # Simulate some work

61

shared_counter.value = old_value + 1

62

print(f"Worker {worker_id}: counter = {shared_counter.value}")

63

64

def raw_value_worker(raw_val, worker_id):

65

"""Worker using unsynchronized raw value (unsafe!)"""

66

for i in range(5):

67

raw_val.value += 1

68

print(f"Worker {worker_id}: raw value = {raw_val.value}")

69

time.sleep(0.1)

70

71

if __name__ == '__main__':

72

# Synchronized shared value

73

counter = Value('i', 0) # Integer initialized to 0

74

print(f"Initial counter value: {counter.value}")

75

76

# Start workers that safely increment counter

77

processes = []

78

for i in range(3):

79

p = Process(target=worker_with_shared_value, args=(counter, i, 5))

80

processes.append(p)

81

p.start()

82

83

for p in processes:

84

p.join()

85

86

print(f"Final counter value: {counter.value}")

87

88

# Demonstrate ctypes usage

89

float_value = Value(ctypes.c_double, 3.14159)

90

print(f"Float value: {float_value.value}")

91

92

# Raw value (no synchronization)

93

raw_counter = RawValue('i', 0)

94

95

# Start workers with raw value (potential race conditions)

96

raw_processes = []

97

for i in range(2):

98

p = Process(target=raw_value_worker, args=(raw_counter, i))

99

raw_processes.append(p)

100

p.start()

101

102

for p in raw_processes:

103

p.join()

104

105

print(f"Final raw counter: {raw_counter.value}")

106

```

107

108

### Shared Arrays

109

110

Create shared arrays that can be accessed by multiple processes with optional synchronization.

111

112

```python { .api }

113

def Array(typecode_or_type, size_or_initializer, lock=True, ctx=None):

114

"""

115

Create a synchronized shared ctypes array.

116

117

Parameters:

118

- typecode_or_type: ctypes type or single character typecode

119

- size_or_initializer: array size (int) or sequence to initialize from

120

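- lock: if True (default), a new recursive lock is created to guard access; a Lock or RLock instance may be passed to use that lock instead; if False, access is unsynchronized. Multi-step updates are not atomic and must still be wrapped in `with array.get_lock():`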

121

- ctx: multiprocessing context

122

123

Returns:

124

SynchronizedArray wrapper around the ctypes array, or the raw ctypes array itself when lock=False

125

"""

126

127

def RawArray(typecode_or_type, size_or_initializer):

128

"""

129

Create an unsynchronized shared ctypes array.

130

131

Parameters:

132

- typecode_or_type: ctypes type or single character typecode

133

- size_or_initializer: array size (int) or sequence to initialize from

134

135

Returns:

136

Raw ctypes array (no synchronization)

137

"""

138

```

139

140

Usage example:

141

142

```python

143

from billiard import Process, Array, RawArray

144

import time

145

146

def array_worker(shared_array, worker_id, start_idx, count):

147

"""Worker that modifies part of shared array"""

148

with shared_array.get_lock():

149

for i in range(count):

150

idx = start_idx + i

151

if idx < len(shared_array):

152

shared_array[idx] = worker_id * 100 + i

153

print(f"Worker {worker_id}: set array[{idx}] = {shared_array[idx]}")

154

time.sleep(0.1)

155

156

def array_reader(shared_array):

157

"""Process that reads from shared array"""

158

time.sleep(1) # Let writers work first

159

160

with shared_array.get_lock():

161

print("Array contents:", list(shared_array[:]))

162

print("Array sum:", sum(shared_array))

163

164

def matrix_worker(matrix, row, cols):

165

"""Worker that processes a row of 2D array"""

166

for col in range(cols):

167

idx = row * cols + col

168

matrix[idx] = row * cols + col + 1 # Fill with sequential values

169

time.sleep(0.05)

170

171

if __name__ == '__main__':

172

# Create synchronized shared array

173

shared_arr = Array('i', 10) # Integer array of size 10

174

print(f"Initial array: {list(shared_arr[:])}")

175

176

# Start workers to modify different parts of array

177

processes = []

178

for i in range(3):

179

start = i * 3

180

p = Process(target=array_worker, args=(shared_arr, i, start, 3))

181

processes.append(p)

182

p.start()

183

184

# Start reader process

185

reader_proc = Process(target=array_reader, args=(shared_arr,))

186

reader_proc.start()

187

processes.append(reader_proc)

188

189

for p in processes:

190

p.join()

191

192

# Array from initializer

193

init_data = [1, 2, 3, 4, 5]

194

initialized_array = Array('i', init_data)

195

print(f"Initialized array: {list(initialized_array[:])}")

196

197

# 2D array simulation (flattened)

198

rows, cols = 3, 4

199

matrix = Array('i', rows * cols)

200

201

# Process each row in parallel

202

matrix_procs = []

203

for row in range(rows):

204

p = Process(target=matrix_worker, args=(matrix, row, cols))

205

matrix_procs.append(p)

206

p.start()

207

208

for p in matrix_procs:

209

p.join()

210

211

# Print matrix

212

print("Matrix:")

213

for row in range(rows):

214

row_data = []

215

for col in range(cols):

216

row_data.append(matrix[row * cols + col])

217

print(row_data)

218

```

219

220

### Shared Memory Utilities

221

222

Additional functions for working with shared memory objects.

223

224

```python { .api }

225

def copy(obj):

226

"""

227

Create a copy of a shared object.

228

229

Parameters:

230

- obj: ctypes object to copy (a raw value or array; for a synchronized wrapper, pass the underlying object from `get_obj()`)

231

232

Returns:

233

Copy of the shared object

234

"""

235

236

def synchronized(obj, lock=None, ctx=None):

237

"""

238

Add synchronization wrapper to an object.

239

240

Parameters:

241

- obj: object to wrap

242

- lock: lock to use (a new RLock is created if None)

243

- ctx: multiprocessing context

244

245

Returns:

246

Synchronized wrapper around object

247

"""

248

```

249

250

Usage example:

251

252

```python

253

from billiard import Process, RawArray, Lock

254

from billiard.sharedctypes import synchronized, copy

255

import time

256

257

def synchronized_access_example():

258

"""Demonstrate adding synchronization to raw shared object"""

259

# Create raw array (no built-in synchronization)

260

raw_arr = RawArray('i', [0] * 10)

261

262

# Add synchronization wrapper

263

sync_arr = synchronized(raw_arr)

264

265

def sync_worker(arr, worker_id):

266

with arr.get_lock():

267

for i in range(len(arr)):

268

arr[i] += worker_id

269

time.sleep(0.01)

270

print(f"Worker {worker_id} completed")

271

272

# Use synchronized array

273

processes = []

274

for i in range(1, 4):

275

p = Process(target=sync_worker, args=(sync_arr, i))

276

processes.append(p)

277

p.start()

278

279

for p in processes:

280

p.join()

281

282

print(f"Final array: {list(sync_arr[:])}")

283

284

# Copy shared object

285

arr_copy = copy(sync_arr)

286

print(f"Copied array: {list(arr_copy[:])}")

287

288

if __name__ == '__main__':

289

synchronized_access_example()

290

```

291

292

### Advanced Shared Memory Patterns

293

294

#### Circular Buffer

295

296

```python

297

from billiard import Process, Array, Value

298

import time

299

300

class CircularBuffer:

301

def __init__(self, size):

302

self.buffer = Array('i', size)

303

self.size = size

304

self.head = Value('i', 0)

305

self.tail = Value('i', 0)

306

self.count = Value('i', 0)

307

308

def put(self, item):

309

with self.buffer.get_lock():

310

if self.count.value < self.size:

311

self.buffer[self.tail.value] = item

312

self.tail.value = (self.tail.value + 1) % self.size

313

self.count.value += 1

314

return True

315

return False # Buffer full

316

317

def get(self):

318

with self.buffer.get_lock():

319

if self.count.value > 0:

320

item = self.buffer[self.head.value]

321

self.head.value = (self.head.value + 1) % self.size

322

self.count.value -= 1

323

return item

324

return None # Buffer empty

325

326

def producer(buffer, items):

327

for item in items:

328

while not buffer.put(item):

329

time.sleep(0.01) # Wait if buffer full

330

print(f"Produced: {item}")

331

time.sleep(0.1)

332

333

def consumer(buffer, consumer_id):

334

while True:

335

item = buffer.get()

336

if item is not None:

337

print(f"Consumer {consumer_id} got: {item}")

338

time.sleep(0.15)

339

else:

340

time.sleep(0.05)

341

# Check for termination condition

342

break

343

344

# Usage

345

if __name__ == '__main__':

346

circ_buffer = CircularBuffer(5)

347

348

prod = Process(target=producer, args=(circ_buffer, range(10)))

349

cons1 = Process(target=consumer, args=(circ_buffer, 1))

350

cons2 = Process(target=consumer, args=(circ_buffer, 2))

351

352

prod.start()

353

cons1.start()

354

cons2.start()

355

356

prod.join()

357

time.sleep(2) # Let consumers finish

358

cons1.terminate()

359

cons2.terminate()

360

```

361

362

#### Shared Statistics

363

364

```python

365

from billiard import Process, Array, Value

366

import time

367

import random

368

369

class SharedStats:

370

def __init__(self):

371

self.count = Value('i', 0)

372

self.sum = Value('d', 0.0)

373

self.min_val = Value('d', float('inf'))

374

self.max_val = Value('d', float('-inf'))

375

376

def update(self, value):

377

with self.count.get_lock():

378

self.count.value += 1

379

self.sum.value += value

380

if value < self.min_val.value:

381

self.min_val.value = value

382

if value > self.max_val.value:

383

self.max_val.value = value

384

385

def get_stats(self):

386

with self.count.get_lock():

387

if self.count.value > 0:

388

return {

389

'count': self.count.value,

390

'sum': self.sum.value,

391

'avg': self.sum.value / self.count.value,

392

'min': self.min_val.value,

393

'max': self.max_val.value

394

}

395

return {'count': 0}

396

397

def data_generator(stats, num_values):

398

for _ in range(num_values):

399

value = random.uniform(-100, 100)

400

stats.update(value)

401

time.sleep(0.01)

402

403

def stats_reporter(stats):

404

for _ in range(10):

405

time.sleep(0.5)

406

current_stats = stats.get_stats()

407

print(f"Stats: {current_stats}")

408

409

# Usage

410

if __name__ == '__main__':

411

stats = SharedStats()

412

413

# Start data generators

414

generators = []

415

for i in range(3):

416

p = Process(target=data_generator, args=(stats, 20))

417

generators.append(p)

418

p.start()

419

420

# Start reporter

421

reporter = Process(target=stats_reporter, args=(stats,))

422

reporter.start()

423

424

for p in generators:

425

p.join()

426

427

reporter.join()

428

429

final_stats = stats.get_stats()

430

print(f"Final stats: {final_stats}")

431

```

432

433

## Memory Layout and Performance

434

435

- **Shared values and arrays** reside in shared memory accessible by all processes

436

- **Synchronization overhead** is incurred only when a lock is in use (`lock=True`, the default, or an explicit lock object); `lock=False` avoids it

437

- **Raw values/arrays** have no synchronization overhead but require manual coordination

438

- **ctypes integration** provides direct memory access with C-compatible data types

439

- **Array initialization** takes either a size (the elements are zero-filled) or an existing sequence to copy the initial contents from