or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

asynchronous-decorators.mdconcurrent-decorators.mdfuture-types-exceptions.mdindex.mdprocess-pools.mdsynchronization-utilities.mdthread-pools.md

synchronization-utilities.mddocs/

0

# Synchronization Utilities

1

2

Utility functions and decorators for thread synchronization, signal handling, and waiting operations. These tools help coordinate concurrent execution, handle edge cases, and provide fine-grained control over synchronization in multi-threaded and multi-process environments.

3

4

## Capabilities

5

6

### Synchronized Decorator

7

8

A decorator that prevents concurrent execution of decorated functions using locks, preventing race conditions and ensuring thread-safe access to shared resources.

9

10

```python { .api }

11

def synchronized(lock=None):

12

"""

13

Decorator that synchronizes function execution using a lock.

14

15

Parameters:

16

- lock: Optional Lock, RLock, or Semaphore object for synchronization.

17

If None, uses a shared Lock for all @synchronized functions.

18

19

Returns:

20

Decorated function that executes atomically

21

"""

22

```

23

24

#### Basic Synchronization

25

26

```python

27

from pebble import synchronized

28

import threading

29

import time

30

31

# Shared resource

32

counter = 0

33

shared_data = []

34

35

# Using default shared lock

36

@synchronized

37

def increment_counter():

38

global counter

39

current = counter

40

time.sleep(0.001) # Simulate some work

41

counter = current + 1

42

43

@synchronized

44

def append_data(value):

45

shared_data.append(value)

46

time.sleep(0.001)

47

48

# Using custom lock

49

custom_lock = threading.RLock()

50

51

@synchronized(custom_lock)

52

def complex_operation(value):

53

shared_data.append(f"start-{value}")

54

time.sleep(0.01)

55

shared_data.append(f"end-{value}")

56

57

# Test synchronization

58

def test_synchronization():

59

threads = []

60

61

# Test counter increment

62

for i in range(100):

63

thread = threading.Thread(target=increment_counter)

64

threads.append(thread)

65

66

# Test data appending

67

for i in range(50):

68

thread = threading.Thread(target=append_data, args=(i,))

69

threads.append(thread)

70

71

# Test complex operation

72

for i in range(10):

73

thread = threading.Thread(target=complex_operation, args=(i,))

74

threads.append(thread)

75

76

# Start all threads

77

for thread in threads:

78

thread.start()

79

80

# Wait for completion

81

for thread in threads:

82

thread.join()

83

84

print(f"Final counter value: {counter}")

85

print(f"Shared data length: {len(shared_data)}")

86

print(f"Complex operations: {[item for item in shared_data if 'start' in item]}")

87

88

test_synchronization()

89

```

90

91

#### Advanced Synchronization Patterns

92

93

```python

94

from pebble import synchronized

95

import threading

96

import time

97

import queue

98

99

# Different types of locks for different use cases

100

read_write_lock = threading.RLock() # Allows recursive locking

101

resource_semaphore = threading.Semaphore(3) # Limit concurrent access

102

condition_lock = threading.Condition()

103

104

class ThreadSafeCounter:

105

def __init__(self):

106

self._value = 0

107

self._lock = threading.Lock()

108

109

@synchronized # Uses shared lock

110

def get_global_count(self):

111

# This method shares a lock with all other @synchronized methods

112

return self._value

113

114

@synchronized(lambda self: self._lock) # Uses instance lock

115

def increment(self):

116

self._value += 1

117

118

@synchronized(lambda self: self._lock)

119

def decrement(self):

120

self._value -= 1

121

122

@synchronized(lambda self: self._lock)

123

def get_value(self):

124

return self._value

125

126

# Resource pool with semaphore

127

@synchronized(resource_semaphore)

128

def use_limited_resource(resource_id, duration):

129

print(f"Using resource {resource_id}")

130

time.sleep(duration)

131

print(f"Released resource {resource_id}")

132

return f"Result from resource {resource_id}"

133

134

# Producer-consumer with condition

135

shared_queue = queue.Queue()

136

137

@synchronized(condition_lock)

138

def producer(items):

139

for item in items:

140

shared_queue.put(item)

141

print(f"Produced: {item}")

142

condition_lock.notify_all() # Wake up consumers

143

time.sleep(0.1)

144

145

@synchronized(condition_lock)

146

def consumer(consumer_id, timeout=5):

147

while True:

148

try:

149

item = shared_queue.get(timeout=timeout)

150

print(f"Consumer {consumer_id} consumed: {item}")

151

shared_queue.task_done()

152

except queue.Empty:

153

print(f"Consumer {consumer_id} timed out")

154

break

155

156

# Test advanced patterns

157

def test_advanced_patterns():

158

# Test thread-safe counter

159

counter = ThreadSafeCounter()

160

161

def worker():

162

for _ in range(100):

163

counter.increment()

164

if counter.get_value() > 50:

165

counter.decrement()

166

167

threads = [threading.Thread(target=worker) for _ in range(5)]

168

for t in threads:

169

t.start()

170

for t in threads:

171

t.join()

172

173

print(f"Final counter value: {counter.get_value()}")

174

175

# Test resource pool

176

resource_threads = []

177

for i in range(10):

178

thread = threading.Thread(

179

target=use_limited_resource,

180

args=(i, 1.0)

181

)

182

resource_threads.append(thread)

183

184

for t in resource_threads:

185

t.start()

186

for t in resource_threads:

187

t.join()

188

189

# Test producer-consumer

190

producer_thread = threading.Thread(

191

target=producer,

192

args=(list(range(20)),)

193

)

194

consumer_threads = [

195

threading.Thread(target=consumer, args=(i,))

196

for i in range(3)

197

]

198

199

producer_thread.start()

200

for t in consumer_threads:

201

t.start()

202

203

producer_thread.join()

204

for t in consumer_threads:

205

t.join()

206

207

test_advanced_patterns()

208

```

209

210

### Signal Handler Decorator

211

212

A decorator for setting up signal handlers to manage process lifecycle and handle system signals gracefully.

213

214

```python { .api }

215

def sighandler(signals):

216

"""

217

Decorator that sets the decorated function as a signal handler.

218

219

Parameters:

220

- signals: Single signal or list/tuple of signals to handle

221

222

Returns:

223

Decorated function that will be called when specified signals are received

224

"""

225

```

226

227

#### Signal Handling Examples

228

229

```python

230

from pebble import sighandler

231

import signal

232

import time

233

import sys

234

import os

235

236

# Global state for signal handling

237

shutdown_requested = False

238

received_signals = []

239

240

# Handle single signal

241

@sighandler(signal.SIGINT)

242

def handle_interrupt(signum, frame):

243

global shutdown_requested

244

print(f"\nReceived SIGINT (Ctrl+C). Initiating graceful shutdown...")

245

shutdown_requested = True

246

247

# Handle multiple signals

248

@sighandler([signal.SIGTERM, signal.SIGUSR1])

249

def handle_multiple_signals(signum, frame):

250

global received_signals

251

signal_names = {

252

signal.SIGTERM: "SIGTERM",

253

signal.SIGUSR1: "SIGUSR1"

254

}

255

256

signal_name = signal_names.get(signum, f"Signal {signum}")

257

print(f"Received {signal_name}")

258

received_signals.append((signum, time.time()))

259

260

if signum == signal.SIGTERM:

261

print("Termination requested")

262

sys.exit(0)

263

elif signum == signal.SIGUSR1:

264

print("User signal 1 - logging status")

265

print(f"Received signals so far: {len(received_signals)}")

266

267

# Complex signal handler with state management

268

class SignalAwareWorker:

269

def __init__(self):

270

self.running = True

271

self.tasks_completed = 0

272

self.setup_signal_handlers()

273

274

@sighandler(signal.SIGINT)

275

def handle_shutdown(self, signum, frame):

276

print(f"\nShutdown signal received. Completed {self.tasks_completed} tasks.")

277

self.running = False

278

279

@sighandler(signal.SIGUSR2)

280

def handle_status_request(self, signum, frame):

281

print(f"Status: Running={self.running}, Tasks completed={self.tasks_completed}")

282

283

def setup_signal_handlers(self):

284

# Signal handlers are already set up by decorators

285

pass

286

287

def work(self):

288

print("Worker started. Send SIGINT to stop, SIGUSR2 for status.")

289

290

while self.running:

291

# Simulate work

292

time.sleep(1)

293

self.tasks_completed += 1

294

295

if self.tasks_completed % 5 == 0:

296

print(f"Completed {self.tasks_completed} tasks...")

297

298

print("Worker shutting down gracefully.")

299

300

# Signal handling in multiprocessing context

301

def worker_process():

302

"""Worker process with signal handling"""

303

304

@sighandler(signal.SIGTERM)

305

def worker_shutdown(signum, frame):

306

print(f"Worker process {os.getpid()} received SIGTERM")

307

sys.exit(0)

308

309

@sighandler(signal.SIGUSR1)

310

def worker_status(signum, frame):

311

print(f"Worker process {os.getpid()} is alive")

312

313

print(f"Worker process {os.getpid()} started")

314

315

# Simulate work

316

for i in range(100):

317

time.sleep(0.5)

318

if i % 10 == 0:

319

print(f"Worker {os.getpid()} progress: {i}/100")

320

321

# Example usage

322

def signal_handling_demo():

323

print("Signal handling demo. Process PID:", os.getpid())

324

print("Try: kill -SIGUSR1", os.getpid())

325

print("Or: kill -SIGTERM", os.getpid())

326

327

worker = SignalAwareWorker()

328

329

try:

330

worker.work()

331

except KeyboardInterrupt:

332

print("Caught KeyboardInterrupt in main")

333

334

# Uncomment to run demo (be careful in production environments)

335

# signal_handling_demo()

336

```

337

338

### Thread Waiting Functions

339

340

Functions for waiting on multiple threads to complete, providing fine-grained control over thread synchronization.

341

342

```python { .api }

343

def waitforthreads(threads, timeout=None):

344

"""

345

Wait for one or more threads to complete.

346

347

Parameters:

348

- threads: List of threading.Thread objects to wait for

349

- timeout: Maximum time to wait in seconds (None for no timeout)

350

351

Returns:

352

Filter object containing threads that have completed

353

"""

354

```

355

356

#### Thread Waiting Examples

357

358

```python

359

from pebble import waitforthreads

360

import threading

361

import time

362

import random

363

364

def worker_task(task_id, duration):

365

print(f"Task {task_id} starting (duration: {duration}s)")

366

time.sleep(duration)

367

print(f"Task {task_id} completed")

368

return f"Result {task_id}"

369

370

def test_thread_waiting():

371

# Create multiple threads with different durations

372

threads = []

373

for i in range(5):

374

duration = random.uniform(1, 5)

375

thread = threading.Thread(

376

target=worker_task,

377

args=(i, duration)

378

)

379

threads.append(thread)

380

381

# Start all threads

382

for thread in threads:

383

thread.start()

384

385

print("All threads started. Waiting for completion...")

386

387

# Wait for threads with timeout

388

completed = waitforthreads(threads, timeout=3.0)

389

completed_list = list(completed)

390

391

print(f"After 3 seconds, {len(completed_list)} threads completed")

392

393

# Wait for remaining threads

394

remaining = [t for t in threads if t.is_alive()]

395

if remaining:

396

print(f"Waiting for {len(remaining)} remaining threads...")

397

final_completed = waitforthreads(remaining, timeout=10.0)

398

final_completed_list = list(final_completed)

399

print(f"Finally, {len(final_completed_list)} more threads completed")

400

401

# Clean up any remaining threads

402

for thread in threads:

403

if thread.is_alive():

404

print(f"Thread {thread.name} is still running")

405

thread.join(timeout=1.0)

406

407

# Advanced thread waiting patterns

408

def advanced_thread_waiting():

409

# Producer threads

410

producer_threads = []

411

shared_queue = []

412

413

def producer(producer_id, item_count):

414

for i in range(item_count):

415

item = f"item-{producer_id}-{i}"

416

shared_queue.append(item)

417

time.sleep(0.1)

418

print(f"Producer {producer_id} finished")

419

420

# Consumer threads

421

consumer_threads = []

422

423

def consumer(consumer_id):

424

while True:

425

if shared_queue:

426

item = shared_queue.pop(0)

427

print(f"Consumer {consumer_id} processed {item}")

428

time.sleep(0.05)

429

else:

430

time.sleep(0.01)

431

432

# Start producers

433

for i in range(3):

434

thread = threading.Thread(target=producer, args=(i, 10))

435

producer_threads.append(thread)

436

thread.start()

437

438

# Start consumers

439

for i in range(2):

440

thread = threading.Thread(target=consumer, args=(i))

441

thread.daemon = True # Will exit when main program exits

442

consumer_threads.append(thread)

443

thread.start()

444

445

# Wait for producers to finish

446

print("Waiting for producers to finish...")

447

completed_producers = waitforthreads(producer_threads, timeout=20.0)

448

completed_count = len(list(completed_producers))

449

450

print(f"{completed_count} producers completed")

451

452

# Give consumers time to process remaining items

453

time.sleep(2)

454

455

print(f"Final queue size: {len(shared_queue)}")

456

457

test_thread_waiting()

458

advanced_thread_waiting()

459

```

460

461

### Queue Waiting Functions

462

463

Functions for waiting on multiple queues to have data available, enabling efficient queue-based coordination.

464

465

```python { .api }

466

def waitforqueues(queues, timeout=None):

467

"""

468

Wait for one or more queues to have data available.

469

470

Parameters:

471

- queues: List of queue.Queue objects to monitor

472

- timeout: Maximum time to wait in seconds (None for no timeout)

473

474

Returns:

475

Filter object containing queues that have data available

476

"""

477

```

478

479

#### Queue Waiting Examples

480

481

```python

482

from pebble import waitforqueues

483

import queue

484

import threading

485

import time

486

import random

487

488

def test_queue_waiting():

489

# Create multiple queues

490

queues = [queue.Queue() for _ in range(4)]

491

queue_names = [f"Queue-{i}" for i in range(4)]

492

493

def producer(q, name, delay):

494

time.sleep(delay)

495

q.put(f"Data from {name}")

496

print(f"{name} produced data after {delay}s")

497

498

# Start producers with different delays

499

producers = []

500

for i, (q, name) in enumerate(zip(queues, queue_names)):

501

delay = random.uniform(1, 5)

502

producer_thread = threading.Thread(

503

target=producer,

504

args=(q, name, delay)

505

)

506

producers.append(producer_thread)

507

producer_thread.start()

508

509

print("Waiting for queues to have data...")

510

511

# Wait for queues to have data

512

ready_queues = waitforqueues(queues, timeout=3.0)

513

ready_list = list(ready_queues)

514

515

print(f"After 3 seconds, {len(ready_list)} queues have data")

516

517

# Process ready queues

518

for q in ready_list:

519

try:

520

data = q.get_nowait()

521

print(f"Got data: {data}")

522

except queue.Empty:

523

print("Queue became empty")

524

525

# Wait for remaining queues

526

remaining = [q for q in queues if q.empty()]

527

if remaining:

528

print(f"Waiting for {len(remaining)} more queues...")

529

final_ready = waitforqueues(remaining, timeout=10.0)

530

final_ready_list = list(final_ready)

531

532

for q in final_ready_list:

533

try:

534

data = q.get_nowait()

535

print(f"Got final data: {data}")

536

except queue.Empty:

537

print("Final queue became empty")

538

539

# Clean up

540

for thread in producers:

541

thread.join()

542

543

# Advanced queue coordination

544

def queue_coordination_example():

545

# Different types of queues

546

priority_queue = queue.PriorityQueue()

547

lifo_queue = queue.LifoQueue() # Stack

548

fifo_queue = queue.Queue() # Regular queue

549

550

queues = [priority_queue, lifo_queue, fifo_queue]

551

queue_types = ["Priority", "LIFO", "FIFO"]

552

553

def priority_producer():

554

for priority in [3, 1, 2]: # Lower numbers = higher priority

555

time.sleep(random.uniform(0.5, 1.5))

556

priority_queue.put((priority, f"Priority-{priority} item"))

557

print(f"Added priority {priority} item")

558

559

def lifo_producer():

560

for i in range(3):

561

time.sleep(random.uniform(0.5, 1.5))

562

lifo_queue.put(f"LIFO-item-{i}")

563

print(f"Added LIFO item {i}")

564

565

def fifo_producer():

566

for i in range(3):

567

time.sleep(random.uniform(0.5, 1.5))

568

fifo_queue.put(f"FIFO-item-{i}")

569

print(f"Added FIFO item {i}")

570

571

# Start producers

572

producers = [

573

threading.Thread(target=priority_producer),

574

threading.Thread(target=lifo_producer),

575

threading.Thread(target=fifo_producer)

576

]

577

578

for producer in producers:

579

producer.start()

580

581

# Monitor queues as they get data

582

processed_items = 0

583

target_items = 9 # 3 items per queue

584

585

while processed_items < target_items:

586

print("Checking for queue updates...")

587

588

ready_queues = waitforqueues(queues, timeout=2.0)

589

ready_list = list(ready_queues)

590

591

if ready_list:

592

print(f"Found {len(ready_list)} queues with data")

593

594

for i, q in enumerate(queues):

595

if q in ready_list:

596

queue_type = queue_types[i]

597

try:

598

if isinstance(q, queue.PriorityQueue):

599

priority, item = q.get_nowait()

600

print(f" {queue_type}: {item} (priority {priority})")

601

else:

602

item = q.get_nowait()

603

print(f" {queue_type}: {item}")

604

processed_items += 1

605

except queue.Empty:

606

pass

607

else:

608

print("No queues ready, continuing to wait...")

609

610

# Wait for producers to finish

611

for producer in producers:

612

producer.join()

613

614

print(f"Processed all {processed_items} items")

615

616

test_queue_waiting()

617

queue_coordination_example()

618

```

619

620

### Utility Combinations

621

622

Combining synchronization utilities for complex coordination patterns:

623

624

```python

625

from pebble import synchronized, waitforthreads, waitforqueues

626

import threading

627

import queue

628

import time

629

630

class CoordinatedWorkerPool:

631

def __init__(self, worker_count=3):

632

self.worker_count = worker_count

633

self.task_queue = queue.Queue()

634

self.result_queues = [queue.Queue() for _ in range(worker_count)]

635

self.workers = []

636

self.active = False

637

self._lock = threading.Lock()

638

639

@synchronized

640

def start(self):

641

if self.active:

642

return

643

644

self.active = True

645

646

for i in range(self.worker_count):

647

worker = threading.Thread(

648

target=self._worker_loop,

649

args=(i, self.result_queues[i])

650

)

651

worker.daemon = True

652

self.workers.append(worker)

653

worker.start()

654

655

@synchronized

656

def stop(self):

657

self.active = False

658

659

# Add stop signals to task queue

660

for _ in range(self.worker_count):

661

self.task_queue.put(None)

662

663

def submit_task(self, task_func, *args, **kwargs):

664

if not self.active:

665

raise RuntimeError("Pool not started")

666

667

task = (task_func, args, kwargs)

668

self.task_queue.put(task)

669

670

def _worker_loop(self, worker_id, result_queue):

671

print(f"Worker {worker_id} started")

672

673

while self.active:

674

try:

675

task = self.task_queue.get(timeout=1.0)

676

677

if task is None: # Stop signal

678

break

679

680

task_func, args, kwargs = task

681

682

try:

683

result = task_func(*args, **kwargs)

684

result_queue.put(('success', result))

685

except Exception as e:

686

result_queue.put(('error', str(e)))

687

688

self.task_queue.task_done()

689

690

except queue.Empty:

691

continue

692

693

print(f"Worker {worker_id} stopped")

694

695

def get_results(self, timeout=None):

696

# Wait for result queues to have data

697

ready_queues = waitforqueues(self.result_queues, timeout=timeout)

698

ready_list = list(ready_queues)

699

700

results = []

701

for q in ready_list:

702

try:

703

while True:

704

result_type, result_data = q.get_nowait()

705

results.append((result_type, result_data))

706

except queue.Empty:

707

pass

708

709

return results

710

711

def wait_for_completion(self, timeout=None):

712

# Wait for all workers to finish

713

completed_workers = waitforthreads(self.workers, timeout=timeout)

714

return list(completed_workers)

715

716

# Test coordinated worker pool

717

def test_coordinated_pool():

718

def sample_task(task_id, duration):

719

time.sleep(duration)

720

return f"Task {task_id} completed in {duration}s"

721

722

pool = CoordinatedWorkerPool(worker_count=3)

723

pool.start()

724

725

try:

726

# Submit tasks

727

for i in range(10):

728

pool.submit_task(sample_task, i, random.uniform(0.1, 1.0))

729

730

# Monitor results as they come in

731

total_results = 0

732

while total_results < 10:

733

results = pool.get_results(timeout=2.0)

734

735

for result_type, result_data in results:

736

if result_type == 'success':

737

print(f"Success: {result_data}")

738

else:

739

print(f"Error: {result_data}")

740

total_results += 1

741

742

if not results:

743

print("No results yet, waiting...")

744

745

print("All tasks completed")

746

747

finally:

748

pool.stop()

749

completed = pool.wait_for_completion(timeout=5.0)

750

print(f"Pool stopped, {len(completed)} workers completed")

751

752

test_coordinated_pool()

753

```