or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application.mdconcurrency.mdcore-effects.mddependency-injection.mderror-handling.mdindex.mdmetrics.mdresource-management.mdservices.mdstm.mdstreams.mdtesting.md

concurrency.mddocs/

0

# Concurrency

1

2

ZIO provides fiber-based lightweight concurrency with atomic data structures for building high-performance concurrent applications without the complexity of traditional thread-based concurrency.

3

4

## Capabilities

5

6

### Fiber - Lightweight Threads

7

8

Fibers are ZIO's abstraction for lightweight, composable concurrency that can be forked, joined, and interrupted safely.

9

10

```scala { .api }

11

/**

12

* A lightweight thread of execution that can be composed and managed safely

13

*/

14

sealed abstract class Fiber[+E, +A] {

15

/** Wait for the fiber to complete and return its Exit value */

16

def await(implicit trace: Trace): UIO[Exit[E, A]]

17

18

/** Join the fiber, returning success value or failing with error */

19

def join(implicit trace: Trace): IO[E, A]

20

21

/** Interrupt the fiber and wait for it to complete */

22

def interrupt(implicit trace: Trace): UIO[Exit[E, A]]

23

24

/** Interrupt the fiber with a specific fiber ID */

25

def interruptAs(fiberId: FiberId)(implicit trace: Trace): UIO[Exit[E, A]]

26

27

/** Interrupt the fiber in the background without waiting */

28

def interruptFork(implicit trace: Trace): UIO[Unit]

29

30

/** Check if the fiber has completed without blocking */

31

def poll(implicit trace: Trace): UIO[Option[Exit[E, A]]]

32

}

33

34

/**

35

* Runtime fiber with additional capabilities for introspection

36

*/

37

abstract class Fiber.Runtime[+E, +A] extends Fiber[E, A] {

38

/** Get the fiber's unique identifier */

39

def id: FiberId

40

41

/** Get the current status of the fiber */

42

def status(implicit trace: Trace): UIO[Fiber.Status]

43

44

/** Get the fiber's execution trace */

45

def trace(implicit trace: Trace): UIO[StackTrace]

46

47

/** Get all child fibers spawned by this fiber */

48

def children(implicit trace: Trace): UIO[Chunk[Fiber.Runtime[_, _]]]

49

}

50

```

51

52

**Usage Examples:**

53

54

```scala

55

import zio._

56

57

// Fork a computation into a fiber

58

val fiberProgram = for {

59

fiber <- heavyComputation.fork

60

_ <- quickTask

61

result <- fiber.join // Wait for completion

62

} yield result

63

64

// Race two fibers

65

val raceProgram = for {

66

fiber1 <- task1.fork

67

fiber2 <- task2.fork

68

winner <- fiber1.race(fiber2)

69

_ <- fiber1.interrupt

70

_ <- fiber2.interrupt

71

} yield winner

72

73

// Interrupt a long-running fiber

74

val interruptProgram = for {

75

fiber <- longRunningTask.fork

76

_ <- ZIO.sleep(5.seconds)

77

_ <- fiber.interrupt // Cancel after 5 seconds

78

} yield ()

79

```

80

81

### Fiber Composition

82

83

Combine and compose fibers using various operators for complex concurrent workflows.

84

85

```scala { .api }

86

/**

87

* Transform the success value of a fiber

88

*/

89

def map[B](f: A => B): Fiber.Synthetic[E, B]

90

91

/**

92

* Transform the success value using a ZIO effect

93

*/

94

def mapZIO[E1 >: E, B](f: A => IO[E1, B]): Fiber.Synthetic[E1, B]

95

96

/**

97

* Replace success value with a constant

98

*/

99

def as[B](b: => B): Fiber.Synthetic[E, B]

100

101

/**

102

* Discard the success value

103

*/

104

def unit: Fiber.Synthetic[E, Unit]

105

106

/**

107

* Combine two fibers, returning both results

108

*/

109

def zip[E1 >: E, B](that: => Fiber[E1, B]): Fiber.Synthetic[E1, (A, B)]

110

111

/**

112

* Combine two fibers with a custom function

113

*/

114

def zipWith[E1 >: E, B, C](that: => Fiber[E1, B])(f: (A, B) => C): Fiber.Synthetic[E1, C]

115

116

/**

117

* Use this fiber if it succeeds, otherwise use the fallback

118

*/

119

def orElse[E1, A1 >: A](that: => Fiber[E1, A1]): Fiber.Synthetic[E1, A1]

120

121

/**

122

* Race this fiber against another, returning the first to complete

123

*/

124

def race[E1 >: E, A1 >: A](that: => Fiber[E1, A1]): Fiber.Synthetic[E1, A1]

125

```

126

127

**Usage Examples:**

128

129

```scala

130

// Combine fiber results

131

val combined = for {

132

fiber1 <- fetchUser(id).fork

133

fiber2 <- fetchPreferences(id).fork

134

result <- fiber1.zip(fiber2)

135

(user, prefs) = result

136

} yield UserWithPrefs(user, prefs)

137

138

// Transform fiber result

139

val processedFiber = dataFiber.map(_.processData)

140

141

// Race with timeout

142

val timedFiber = dataFiber.race(

143

ZIO.sleep(30.seconds) *> ZIO.fail("Timeout")

144

)

145

```

146

147

### Fiber Factory Methods

148

149

Create fibers from values, effects, and external sources.

150

151

```scala { .api }

152

/**

153

* Create a fiber that has already completed with the given Exit

154

*/

155

def done[E, A](exit: => Exit[E, A]): Fiber.Synthetic[E, A]

156

157

/**

158

* Create a fiber that succeeds with the given value

159

*/

160

def succeed[A](a: A): Fiber.Synthetic[Nothing, A]

161

162

/**

163

* Create a fiber that fails with the given error

164

*/

165

def fail[E](e: E): Fiber.Synthetic[E, Nothing]

166

167

/**

168

* Create a fiber that fails with the given cause

169

*/

170

def failCause[E](cause: Cause[E]): Fiber.Synthetic[E, Nothing]

171

172

/**

173

* Create a fiber from a Scala Future

174

*/

175

def fromFuture[A](thunk: => Future[A]): Fiber.Synthetic[Throwable, A]

176

177

/**

178

* Create a fiber from a ZIO effect (effect runs immediately)

179

*/

180

def fromZIO[E, A](io: IO[E, A]): UIO[Fiber.Synthetic[E, A]]

181

182

/**

183

* Collect results from multiple fibers

184

*/

185

def collectAll[E, A](fibers: Iterable[Fiber[E, A]]): Fiber.Synthetic[E, Chunk[A]]

186

187

/**

188

* Wait for all fibers to complete, ignoring results

189

*/

190

def awaitAll(fs: Iterable[Fiber[Any, Any]]): UIO[Unit]

191

192

/**

193

* Join all fibers, failing if any fail

194

*/

195

def joinAll[E](fs: Iterable[Fiber[E, Any]]): IO[E, Unit]

196

197

/**

198

* Interrupt all fibers

199

*/

200

def interruptAll(fs: Iterable[Fiber[Any, Any]]): UIO[Unit]

201

```

202

203

**Usage Examples:**

204

205

```scala

206

// Create pre-completed fibers

207

val successFiber = Fiber.succeed(42)

208

val failureFiber = Fiber.fail("Error occurred")

209

210

// Work with multiple fibers

211

val batchProcessing = for {

212

fibers <- ZIO.foreach(dataChunks)(chunk => processChunk(chunk).fork)

213

results <- Fiber.collectAll(fibers).join

214

} yield results

215

216

// Cleanup multiple fibers

217

val cleanup = for {

218

_ <- Fiber.interruptAll(runningFibers)

219

_ <- Console.printLine("All background tasks cancelled")

220

} yield ()

221

```

222

223

### Ref - Atomic Reference

224

225

Thread-safe mutable reference that can be updated atomically across concurrent fibers.

226

227

```scala { .api }

228

/**

229

* A thread-safe mutable reference that can be updated atomically

230

*/

231

sealed abstract class Ref[A] {

232

/** Read the current value */

233

def get: UIO[A]

234

235

/** Set a new value */

236

def set(a: A): UIO[Unit]

237

238

/** Set a new value asynchronously */

239

def setAsync(a: A): UIO[Unit]

240

241

/** Atomically modify the value and return a result */

242

def modify[B](f: A => (B, A)): UIO[B]

243

244

/** Atomically update the value */

245

def update(f: A => A): UIO[Unit]

246

247

/** Update and return the new value */

248

def updateAndGet(f: A => A): UIO[A]

249

250

/** Return the old value and update */

251

def getAndUpdate(f: A => A): UIO[A]

252

253

/** Set new value and return the old value */

254

def getAndSet(a: A): UIO[A]

255

}

256

257

/**

258

* Synchronized Ref that allows ZIO effects in update functions

259

*/

260

sealed abstract class Ref.Synchronized[A] extends Ref[A] {

261

/** Modify using a ZIO effect */

262

def modifyZIO[R, E, B](f: A => ZIO[R, E, (B, A)]): ZIO[R, E, B]

263

264

/** Update using a ZIO effect */

265

def updateZIO[R, E](f: A => ZIO[R, E, A]): ZIO[R, E, Unit]

266

267

/** Update with ZIO and return new value */

268

def updateAndGetZIO[R, E](f: A => ZIO[R, E, A]): ZIO[R, E, A]

269

270

/** Get old value and update with ZIO */

271

def getAndUpdateZIO[R, E](f: A => ZIO[R, E, A]): ZIO[R, E, A]

272

}

273

```

274

275

**Usage Examples:**

276

277

```scala

278

// Counter with atomic operations

279

val counterProgram = for {

280

counter <- Ref.make(0)

281

_ <- ZIO.foreachParDiscard(1 to 1000) { _ =>

282

counter.update(_ + 1)

283

}

284

final <- counter.get

285

_ <- Console.printLine(s"Final count: $final")

286

} yield ()

287

288

// Accumulator pattern

289

val accumulatorProgram = for {

290

acc <- Ref.make(List.empty[String])

291

_ <- ZIO.foreachParDiscard(tasks) { task =>

292

for {

293

result <- processTask(task)

294

_ <- acc.update(result :: _)

295

} yield ()

296

}

297

results <- acc.get

298

} yield results.reverse

299

300

// Complex state update with Synchronized Ref

301

val complexUpdate = for {

302

state <- Ref.Synchronized.make(AppState.empty)

303

_ <- state.updateZIO { currentState =>

304

for {

305

validated <- validateState(currentState)

306

updated <- applyChanges(validated)

307

_ <- logStateChange(currentState, updated)

308

} yield updated

309

}

310

} yield ()

311

```

312

313

### Queue - Concurrent Queue

314

315

Thread-safe queue for communication between concurrent fibers with backpressure support.

316

317

```scala { .api }

318

/**

319

* A thread-safe queue for concurrent communication between fibers

320

*/

321

sealed abstract class Queue[A] extends Dequeue[A] with Enqueue[A] {

322

/** Offer an element, returning false if queue is full */

323

def offer(a: A): UIO[Boolean]

324

325

/** Offer multiple elements, returning elements that couldn't be enqueued */

326

def offerAll[A1 <: A](as: Iterable[A1]): UIO[Chunk[A1]]

327

328

/** Take an element, blocking if queue is empty */

329

def take: UIO[A]

330

331

/** Take all available elements */

332

def takeAll: UIO[Chunk[A]]

333

334

/** Take up to max elements */

335

def takeUpTo(max: Int): UIO[Chunk[A]]

336

337

/** Try to take an element without blocking */

338

def poll: UIO[Option[A]]

339

340

/** Get the queue capacity */

341

def capacity: Int

342

343

/** Get current size */

344

def size: UIO[Int]

345

346

/** Check if empty */

347

def isEmpty: UIO[Boolean]

348

349

/** Check if full */

350

def isFull: UIO[Boolean]

351

352

/** Shutdown the queue */

353

def shutdown: UIO[Unit]

354

355

/** Wait for queue shutdown */

356

def awaitShutdown: UIO[Unit]

357

358

/** Check if queue is shutdown */

359

def isShutdown: UIO[Boolean]

360

}

361

```

362

363

**Usage Examples:**

364

365

```scala

366

// Producer-consumer pattern

367

val producerConsumer = for {

368

queue <- Queue.bounded[String](100)

369

producer <- ZIO.foreach(1 to 1000) { i =>

370

queue.offer(s"item-$i")

371

}.fork

372

consumer <- ZIO.repeatN(999) {

373

queue.take.flatMap(item => processItem(item))

374

}.fork

375

_ <- producer.join

376

_ <- consumer.join

377

} yield ()

378

379

// Work distribution

380

val workDistribution = for {

381

workQueue <- Queue.bounded[Task](50)

382

workers <- ZIO.foreach(1 to 10) { workerId =>

383

ZIO.forever {

384

workQueue.take.flatMap(task => task.perform())

385

}.fork

386

}

387

_ <- ZIO.foreach(tasks)(workQueue.offer)

388

_ <- ZIO.foreachDiscard(workers)(_.interrupt)

389

} yield ()

390

391

// Buffered processing

392

val bufferedProcessing = for {

393

buffer <- Queue.sliding[Data](1000) // Drops old items when full

394

_ <- dataStream.foreach(buffer.offer(_)).fork

395

_ <- ZIO.forever {

396

buffer.takeAll.flatMap { batch =>

397

ZIO.when(batch.nonEmpty)(processBatch(batch))

398

}.delay(1.second)

399

}

400

} yield ()

401

```

402

403

### Queue Variants and Factory Methods

404

405

Different queue implementations for various use cases and performance characteristics.

406

407

```scala { .api }

408

/**

409

* Create a bounded queue that blocks when full

410

*/

411

def bounded[A](requestedCapacity: => Int): UIO[Queue[A]]

412

413

/**

414

* Create an unbounded queue (limited only by memory)

415

*/

416

def unbounded[A]: UIO[Queue[A]]

417

418

/**

419

* Create a dropping queue that drops new items when full

420

*/

421

def dropping[A](requestedCapacity: => Int): UIO[Queue[A]]

422

423

/**

424

* Create a sliding queue that drops old items when full

425

*/

426

def sliding[A](requestedCapacity: => Int): UIO[Queue[A]]

427

428

/**

429

* Create a back-pressured queue with async boundaries

430

*/

431

def backpressured[A](requestedCapacity: => Int): UIO[Queue[A]]

432

433

/**

434

* Create a single-element queue (like a synchronous channel)

435

*/

436

def single[A]: UIO[Queue[A]]

437

```

438

439

**Usage Examples:**

440

441

```scala

442

// Different queue types for different needs

443

val queueTypes = for {

444

// High-throughput with memory bounds

445

bounded <- Queue.bounded[Event](10000)

446

447

// No memory limits, but may cause OOM

448

unbounded <- Queue.unbounded[LogEntry]

449

450

// Latest data only, drops old items

451

sliding <- Queue.sliding[SensorReading](100)

452

453

// Drops new items when full

454

dropping <- Queue.dropping[NonCriticalUpdate](50)

455

456

// Synchronous handoff between fibers

457

sync <- Queue.single[CriticalMessage]

458

} yield (bounded, unbounded, sliding, dropping, sync)

459

460

// Fan-out pattern with multiple queues

461

val fanOut = for {

462

input <- Queue.unbounded[Data]

463

outputs <- ZIO.foreach(1 to 5)(_ => Queue.bounded[Data](100))

464

465

// Distribute input to all outputs

466

distributor <- ZIO.forever {

467

input.take.flatMap { data =>

468

ZIO.foreachDiscard(outputs)(_.offer(data))

469

}

470

}.fork

471

472

// Process from each output queue

473

processors <- ZIO.foreach(outputs.zipWithIndex) { case (queue, id) =>

474

ZIO.forever {

475

queue.take.flatMap(data => processWithId(data, id))

476

}.fork

477

}

478

} yield (distributor, processors)

479

```

480

481

### Hub - Broadcast Communication

482

483

Concurrent hub for broadcasting messages to multiple subscribers with various consumption patterns.

484

485

```scala { .api }

486

/**

487

* A concurrent hub for broadcasting messages to multiple subscribers

488

*/

489

sealed abstract class Hub[A] {

490

/** Publish a message to all subscribers */

491

def publish(a: A): UIO[Boolean]

492

493

/** Publish multiple messages */

494

def publishAll(as: Iterable[A]): UIO[Boolean]

495

496

/** Subscribe to the hub, getting a queue for messages */

497

def subscribe: UIO[Dequeue[A]]

498

499

/** Get the hub capacity */

500

def capacity: Int

501

502

/** Get current size */

503

def size: UIO[Int]

504

505

/** Check if hub is shutdown */

506

def isShutdown: UIO[Boolean]

507

508

/** Shutdown the hub */

509

def shutdown: UIO[Unit]

510

}

511

512

object Hub {

513

/** Create a bounded hub */

514

def bounded[A](requestedCapacity: Int): UIO[Hub[A]]

515

516

/** Create an unbounded hub */

517

def unbounded[A]: UIO[Hub[A]]

518

519

/** Create a dropping hub */

520

def dropping[A](requestedCapacity: Int): UIO[Hub[A]]

521

522

/** Create a sliding hub */

523

def sliding[A](requestedCapacity: Int): UIO[Hub[A]]

524

}

525

```

526

527

**Usage Examples:**

528

529

```scala

530

// Event broadcasting system

531

val eventSystem = for {

532

eventHub <- Hub.bounded[Event](1000)

533

534

// Multiple subscribers

535

uiQueue <- eventHub.subscribe

536

loggingQueue <- eventHub.subscribe

537

analyticsQueue <- eventHub.subscribe

538

539

// Publishers

540

_ <- eventStream.foreach(eventHub.publish).fork

541

542

// Subscribers

543

_ <- uiQueue.take.foreach(updateUI).forever.fork

544

_ <- loggingQueue.take.foreach(logEvent).forever.fork

545

_ <- analyticsQueue.take.foreach(trackEvent).forever.fork

546

547

} yield ()

548

549

// Real-time data distribution

550

val dataDistribution = for {

551

dataHub <- Hub.sliding[SensorData](500) // Keep latest data

552

553

// Data producer

554

_ <- sensorStream.foreach(dataHub.publish).fork

555

556

// Multiple consumers with different processing

557

dashboardQueue <- dataHub.subscribe

558

alertQueue <- dataHub.subscribe

559

storageQueue <- dataHub.subscribe

560

561

_ <- dashboardQueue.take.foreach(updateDashboard).forever.fork

562

_ <- alertQueue.take.foreach(checkAlerts).forever.fork

563

_ <- storageQueue.takeAll.foreach(batchStore).repeat(Schedule.fixed(10.seconds)).fork

564

565

} yield ()

566

```

567

568

### Promise - Single Assignment Variables

569

570

Promises represent single-assignment variables that can be completed exactly once, useful for coordination between fibers.

571

572

```scala { .api }

573

/**

574

* A Promise is a concurrent primitive that represents a value that may not yet be available

575

*/

576

sealed trait Promise[+E, +A] {

577

/** Complete the promise with a success value */

578

def succeed(a: A): UIO[Boolean]

579

580

/** Complete the promise with a failure */

581

def fail(e: E): UIO[Boolean]

582

583

/** Complete the promise with a ZIO effect */

584

def complete[E1 >: E, A1 >: A](zio: IO[E1, A1]): UIO[Boolean]

585

586

/** Complete the promise with an Exit value */

587

def done[E1 >: E, A1 >: A](exit: Exit[E1, A1]): UIO[Boolean]

588

589

/** Wait for the promise to be completed */

590

def await: IO[E, A]

591

592

/** Check if the promise is completed without blocking */

593

def poll: UIO[Option[IO[E, A]]]

594

595

/** Interrupt any fibers waiting on this promise */

596

def interrupt: UIO[Boolean]

597

598

/** Check if the promise has been completed */

599

def isDone: UIO[Boolean]

600

}

601

602

object Promise {

603

/** Create a new promise */

604

def make[E, A]: UIO[Promise[E, A]]

605

606

/** Create a promise and complete it with an effect */

607

def fromZIO[R, E, A](zio: ZIO[R, E, A]): ZIO[R, Nothing, Promise[E, A]]

608

}

609

```

610

611

**Usage Examples:**

612

613

```scala

614

// Coordination between fibers

615

val coordination = for {

616

promise <- Promise.make[String, Int]

617

618

// Producer fiber

619

producer <- ZIO.sleep(2.seconds) *> promise.succeed(42).fork

620

621

// Consumer fiber waits for result

622

consumer <- promise.await.fork

623

624

result <- consumer.join

625

_ <- producer.join

626

} yield result

627

628

// Error propagation

629

val errorHandling = for {

630

promise <- Promise.make[String, Int]

631

worker <- (ZIO.sleep(1.second) *> ZIO.fail("computation failed"))

632

.tapError(promise.fail)

633

.fork

634

result <- promise.await.either // Will receive Left("computation failed")

635

_ <- worker.interrupt

636

} yield result

637

```

638

639

### Semaphore - Resource Limiting

640

641

Semaphores provide controlled access to a limited number of resources with automatic blocking and releasing.

642

643

```scala { .api }

644

/**

645

* A Semaphore is a concurrency primitive that maintains a set of permits

646

*/

647

sealed trait Semaphore {

648

/** Acquire a permit, blocking if none available */

649

def acquire: UIO[Unit]

650

651

/** Acquire n permits */

652

def acquireN(n: Long): UIO[Unit]

653

654

/** Release a permit */

655

def release: UIO[Unit]

656

657

/** Release n permits */

658

def releaseN(n: Long): UIO[Unit]

659

660

/** Try to acquire a permit without blocking */

661

def tryAcquire: UIO[Boolean]

662

663

/** Try to acquire n permits without blocking */

664

def tryAcquireN(n: Long): UIO[Boolean]

665

666

/** Get available permits */

667

def available: UIO[Long]

668

669

/** Use a permit for the duration of an effect */

670

def withPermit[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A]

671

672

/** Use n permits for an effect */

673

def withPermits[R, E, A](n: Long)(zio: ZIO[R, E, A]): ZIO[R, E, A]

674

}

675

676

object Semaphore {

677

/** Create a semaphore with n permits */

678

def make(permits: Long): UIO[Semaphore]

679

}

680

```

681

682

**Usage Examples:**

683

684

```scala

685

// Connection pool management

686

val connectionPool = for {

687

semaphore <- Semaphore.make(10) // Max 10 concurrent connections

688

689

// Use connection with automatic permit management

690

result <- semaphore.withPermit {

691

for {

692

conn <- openConnection()

693

result <- conn.query("SELECT * FROM users")

694

_ <- conn.close()

695

} yield result

696

}

697

} yield result

698

699

// Rate limiting

700

val rateLimiter = for {

701

permits <- Semaphore.make(100) // 100 requests per batch

702

703

// Process requests with rate limiting

704

_ <- ZIO.foreach(requests) { request =>

705

permits.withPermit(processRequest(request))

706

}

707

708

// Replenish permits periodically

709

_ <- permits.releaseN(100).repeat(Schedule.fixed(1.second))

710

} yield ()

711

```

712

713

### MVar - Mutable Variable

714

715

MVars are mutable variables that can be empty or contain exactly one value, useful for communication patterns.

716

717

```scala { .api }

718

/**

719

* An MVar is a mutable variable that is either empty or contains exactly one value

720

*/

721

sealed trait MVar[A] {

722

/** Put a value, blocking if already full */

723

def put(a: A): UIO[Unit]

724

725

/** Take the value, blocking if empty */

726

def take: UIO[A]

727

728

/** Try to put without blocking */

729

def tryPut(a: A): UIO[Boolean]

730

731

/** Try to take without blocking */

732

def tryTake: UIO[Option[A]]

733

734

/** Read the value without removing it, blocking if empty */

735

def read: UIO[A]

736

737

/** Try to read without blocking */

738

def tryRead: UIO[Option[A]]

739

740

/** Check if the MVar is empty */

741

def isEmpty: UIO[Boolean]

742

743

/** Swap the current value with a new one */

744

def swap(a: A): UIO[A]

745

746

/** Modify the value using a function */

747

def modify[B](f: A => (B, A)): UIO[B]

748

}

749

750

object MVar {

751

/** Create an empty MVar */

752

def empty[A]: UIO[MVar[A]]

753

754

/** Create an MVar with initial value */

755

def make[A](a: A): UIO[MVar[A]]

756

}

757

```

758

759

**Usage Examples:**

760

761

```scala

762

// Producer-consumer with backpressure

763

val producerConsumer = for {

764

mvar <- MVar.empty[String]

765

766

producer <- ZIO.foreach(1 to 100) { i =>

767

mvar.put(s"item-$i") // Blocks if consumer is slow

768

}.fork

769

770

consumer <- ZIO.forever {

771

mvar.take.flatMap(processItem)

772

}.fork

773

774

_ <- ZIO.sleep(10.seconds)

775

_ <- producer.interrupt

776

_ <- consumer.interrupt

777

} yield ()

778

779

// Shared state with synchronization

780

val sharedCounter = for {

781

counter <- MVar.make(0)

782

783

// Multiple workers incrementing counter

784

workers <- ZIO.foreach(1 to 10) { _ =>

785

ZIO.forever {

786

counter.modify(n => ((), n + 1)) *>

787

ZIO.sleep(100.millis)

788

}.fork

789

}

790

791

_ <- ZIO.sleep(5.seconds)

792

final <- counter.read

793

_ <- ZIO.foreachDiscard(workers)(_.interrupt)

794

} yield final

795

```

796

797

### Additional Concurrent Collections

798

799

Extended concurrent data structures from the concurrent module for specialized use cases.

800

801

```scala { .api }

802

/**

803

* Thread-safe concurrent map

804

*/

805

trait ConcurrentMap[K, V] {

806

def get(key: K): UIO[Option[V]]

807

def put(key: K, value: V): UIO[Option[V]]

808

def putIfAbsent(key: K, value: V): UIO[Option[V]]

809

def remove(key: K): UIO[Option[V]]

810

def replace(key: K, value: V): UIO[Option[V]]

811

def size: UIO[Int]

812

def isEmpty: UIO[Boolean]

813

}

814

815

/**

816

* Thread-safe concurrent set

817

*/

818

trait ConcurrentSet[A] {

819

def add(a: A): UIO[Boolean]

820

def remove(a: A): UIO[Boolean]

821

def contains(a: A): UIO[Boolean]

822

def size: UIO[Int]

823

def toSet: UIO[Set[A]]

824

}

825

826

/**

827

* Reentrant lock for mutual exclusion

828

*/

829

trait ReentrantLock {

830

def lock: UIO[Unit]

831

def unlock: UIO[Unit]

832

def tryLock: UIO[Boolean]

833

def withLock[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A]

834

def isLocked: UIO[Boolean]

835

}

836

837

/**

838

* Countdown latch for coordination

839

*/

840

trait CountdownLatch {

841

def countDown: UIO[Unit]

842

def await: UIO[Unit]

843

def getCount: UIO[Long]

844

}

845

846

/**

847

* Cyclic barrier for multi-phase coordination

848

*/

849

trait CyclicBarrier {

850

def await: UIO[Int]

851

def getParties: UIO[Int]

852

def getNumberWaiting: UIO[Int]

853

def isBroken: UIO[Boolean]

854

def reset: UIO[Unit]

855

}

856

```

857

858

**Usage Examples:**

859

860

```scala

861

// Concurrent map for caching

862

val cacheExample = for {

863

cache <- ConcurrentMap.empty[String, User]

864

865

user <- cache.get("user-123").flatMap {

866

case Some(user) => ZIO.succeed(user)

867

case None => for {

868

user <- fetchUserFromDb("user-123")

869

_ <- cache.put("user-123", user)

870

} yield user

871

}

872

} yield user

873

874

// Coordination with countdown latch

875

val coordinatedStart = for {

876

latch <- CountdownLatch.make(3) // Wait for 3 workers

877

878

workers <- ZIO.foreach(1 to 3) { workerId =>

879

(initializeWorker(workerId) *> latch.countDown).fork

880

}

881

882

_ <- latch.await // Wait for all workers to initialize

883

_ <- Console.printLine("All workers ready, starting main task")

884

885

result <- mainTask

886

_ <- ZIO.foreachDiscard(workers)(_.interrupt)

887

} yield result

888

```