# Concurrency

ZIO provides fiber-based lightweight concurrency with atomic data structures for building high-performance concurrent applications without the complexity of traditional thread-based concurrency.

## Capabilities

### Fiber - Lightweight Threads

Fibers are ZIO's abstraction for lightweight, composable concurrency that can be forked, joined, and interrupted safely.

```scala { .api }
/**
 * A lightweight thread of execution that can be composed and managed safely
 */
sealed abstract class Fiber[+E, +A] {
  /** Wait for the fiber to complete and return its Exit value */
  def await(implicit trace: Trace): UIO[Exit[E, A]]

  /** Join the fiber, returning success value or failing with error */
  def join(implicit trace: Trace): IO[E, A]

  /** Interrupt the fiber and wait for it to complete */
  def interrupt(implicit trace: Trace): UIO[Exit[E, A]]

  /** Interrupt the fiber with a specific fiber ID */
  def interruptAs(fiberId: FiberId)(implicit trace: Trace): UIO[Exit[E, A]]

  /** Interrupt the fiber in the background without waiting */
  def interruptFork(implicit trace: Trace): UIO[Unit]

  /** Check if the fiber has completed without blocking */
  def poll(implicit trace: Trace): UIO[Option[Exit[E, A]]]
}

/**
 * Runtime fiber with additional capabilities for introspection
 */
abstract class Fiber.Runtime[+E, +A] extends Fiber[E, A] {
  /** Get the fiber's unique identifier */
  def id: FiberId

  /** Get the current status of the fiber */
  def status(implicit trace: Trace): UIO[Fiber.Status]

  /** Get the fiber's execution trace */
  def trace(implicit trace: Trace): UIO[StackTrace]

  /** Get all child fibers spawned by this fiber */
  def children(implicit trace: Trace): UIO[Chunk[Fiber.Runtime[_, _]]]
}
```

**Usage Examples:**

```scala
import zio._

// Fork a computation into a fiber
val fiberProgram = for {
  fiber  <- heavyComputation.fork
  _      <- quickTask
  result <- fiber.join // Wait for completion
} yield result

// Race two effects: ZIO#race forks both and interrupts the loser automatically
val raceProgram = task1.race(task2)

// Interrupt a long-running fiber
val interruptProgram = for {
  fiber <- longRunningTask.fork
  _     <- ZIO.sleep(5.seconds)
  _     <- fiber.interrupt // Cancel after 5 seconds
} yield ()
```
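The fork, join, and interrupt lifecycle described above can be sketched as a small self-contained program. This is a minimal illustration assuming ZIO 2.x; `FiberSketch` and `slowAnswer` are hypothetical names introduced only for this example.

```scala
import zio._

object FiberSketch extends ZIOAppDefault {
  // Hypothetical stand-in for a slow computation
  val slowAnswer: UIO[Int] = ZIO.succeed(42).delay(100.millis)

  val program: UIO[(Int, Boolean)] = for {
    worker     <- slowAnswer.fork      // runs in the background
    background <- ZIO.never.fork       // would never finish on its own
    answer     <- worker.join          // suspends until the worker succeeds
    exit       <- background.interrupt // returns the interrupted fiber's Exit
  } yield (answer, exit.isInterrupted)

  def run = program.flatMap(result => Console.printLine(result.toString))
}
```

Joining propagates the forked effect's result to the caller, while `interrupt` returns an `Exit` describing how the fiber ended.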

### Fiber Composition

Combine and compose fibers using various operators for complex concurrent workflows.

```scala { .api }
/**
 * Transform the success value of a fiber
 */
def map[B](f: A => B): Fiber.Synthetic[E, B]

/**
 * Transform the success value using a ZIO effect
 */
def mapZIO[E1 >: E, B](f: A => IO[E1, B]): Fiber.Synthetic[E1, B]

/**
 * Replace success value with a constant
 */
def as[B](b: => B): Fiber.Synthetic[E, B]

/**
 * Discard the success value
 */
def unit: Fiber.Synthetic[E, Unit]

/**
 * Combine two fibers, returning both results
 */
def zip[E1 >: E, B](that: => Fiber[E1, B]): Fiber.Synthetic[E1, (A, B)]

/**
 * Combine two fibers with a custom function
 */
def zipWith[E1 >: E, B, C](that: => Fiber[E1, B])(f: (A, B) => C): Fiber.Synthetic[E1, C]

/**
 * Use this fiber if it succeeds, otherwise use the fallback
 */
def orElse[E1, A1 >: A](that: => Fiber[E1, A1]): Fiber.Synthetic[E1, A1]

/**
 * Race this fiber against another, returning the first to complete
 */
def race[E1 >: E, A1 >: A](that: => Fiber[E1, A1]): Fiber.Synthetic[E1, A1]
```

**Usage Examples:**

```scala
// Combine fiber results
val combined = for {
  fiber1 <- fetchUser(id).fork
  fiber2 <- fetchPreferences(id).fork
  result <- fiber1.zip(fiber2).join // zip yields a Fiber, so join it to get the values
  (user, prefs) = result
} yield UserWithPrefs(user, prefs)

// Transform fiber result
val processedFiber = dataFiber.map(_.processData)

// Join with a timeout: fail with "Timeout" if no result arrives within 30 seconds
val timedResult = dataFiber.join.timeoutFail("Timeout")(30.seconds)
```

### Fiber Factory Methods

Create fibers from values, effects, and external sources.

```scala { .api }
/**
 * Create a fiber that has already completed with the given Exit
 */
def done[E, A](exit: => Exit[E, A]): Fiber.Synthetic[E, A]

/**
 * Create a fiber that succeeds with the given value
 */
def succeed[A](a: A): Fiber.Synthetic[Nothing, A]

/**
 * Create a fiber that fails with the given error
 */
def fail[E](e: E): Fiber.Synthetic[E, Nothing]

/**
 * Create a fiber that fails with the given cause
 */
def failCause[E](cause: Cause[E]): Fiber.Synthetic[E, Nothing]

/**
 * Create a fiber from a Scala Future
 */
def fromFuture[A](thunk: => Future[A]): Fiber.Synthetic[Throwable, A]

/**
 * Create a fiber from a ZIO effect (effect runs immediately)
 */
def fromZIO[E, A](io: IO[E, A]): UIO[Fiber.Synthetic[E, A]]

/**
 * Collect results from multiple fibers
 */
def collectAll[E, A](fibers: Iterable[Fiber[E, A]]): Fiber.Synthetic[E, Chunk[A]]

/**
 * Wait for all fibers to complete, ignoring results
 */
def awaitAll(fs: Iterable[Fiber[Any, Any]]): UIO[Unit]

/**
 * Join all fibers, failing if any fail
 */
def joinAll[E](fs: Iterable[Fiber[E, Any]]): IO[E, Unit]

/**
 * Interrupt all fibers
 */
def interruptAll(fs: Iterable[Fiber[Any, Any]]): UIO[Unit]
```

**Usage Examples:**

```scala
// Create pre-completed fibers
val successFiber = Fiber.succeed(42)
val failureFiber = Fiber.fail("Error occurred")

// Work with multiple fibers
val batchProcessing = for {
  fibers  <- ZIO.foreach(dataChunks)(chunk => processChunk(chunk).fork)
  results <- Fiber.collectAll(fibers).join
} yield results

// Cleanup multiple fibers
val cleanup = for {
  _ <- Fiber.interruptAll(runningFibers)
  _ <- Console.printLine("All background tasks cancelled")
} yield ()
```
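Composed fibers are still fibers, so their results only become available after a `join`. The sketch below illustrates this with `Fiber.collectAll` and `zip`, assuming ZIO 2.x; the object name and the values are hypothetical, and the collected result is converted with `toList` so the example does not depend on the exact collection type returned.

```scala
import zio._

object FiberCompositionSketch extends ZIOAppDefault {
  val program: UIO[(List[Int], (Int, String))] = for {
    // Fork one fiber per input, then gather every result through a single fiber
    squares <- ZIO.foreach(List(1, 2, 3))(n => ZIO.succeed(n * n).fork)
    all     <- Fiber.collectAll(squares).join.map(_.toList)

    // zip pairs two fibers; joining the zipped fiber yields a tuple
    left  <- ZIO.succeed(1).fork
    right <- ZIO.succeed("one").fork
    pair  <- left.zip(right).join
  } yield (all, pair)

  def run = program.flatMap(result => Console.printLine(result.toString))
}
```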

### Ref - Atomic Reference

Thread-safe mutable reference that can be updated atomically across concurrent fibers.

```scala { .api }
/**
 * A thread-safe mutable reference that can be updated atomically
 */
sealed abstract class Ref[A] {
  /** Read the current value */
  def get: UIO[A]

  /** Set a new value */
  def set(a: A): UIO[Unit]

  /** Set a new value asynchronously */
  def setAsync(a: A): UIO[Unit]

  /** Atomically modify the value and return a result */
  def modify[B](f: A => (B, A)): UIO[B]

  /** Atomically update the value */
  def update(f: A => A): UIO[Unit]

  /** Update and return the new value */
  def updateAndGet(f: A => A): UIO[A]

  /** Return the old value and update */
  def getAndUpdate(f: A => A): UIO[A]

  /** Set new value and return the old value */
  def getAndSet(a: A): UIO[A]
}

/**
 * Synchronized Ref that allows ZIO effects in update functions
 */
sealed abstract class Ref.Synchronized[A] extends Ref[A] {
  /** Modify using a ZIO effect */
  def modifyZIO[R, E, B](f: A => ZIO[R, E, (B, A)]): ZIO[R, E, B]

  /** Update using a ZIO effect */
  def updateZIO[R, E](f: A => ZIO[R, E, A]): ZIO[R, E, Unit]

  /** Update with ZIO and return new value */
  def updateAndGetZIO[R, E](f: A => ZIO[R, E, A]): ZIO[R, E, A]

  /** Get old value and update with ZIO */
  def getAndUpdateZIO[R, E](f: A => ZIO[R, E, A]): ZIO[R, E, A]
}
```

**Usage Examples:**

```scala
// Counter with atomic operations
val counterProgram = for {
  counter <- Ref.make(0)
  _ <- ZIO.foreachParDiscard(1 to 1000) { _ =>
         counter.update(_ + 1)
       }
  finalCount <- counter.get // `final` is a reserved word in Scala
  _ <- Console.printLine(s"Final count: $finalCount")
} yield ()

// Accumulator pattern
val accumulatorProgram = for {
  acc <- Ref.make(List.empty[String])
  _ <- ZIO.foreachParDiscard(tasks) { task =>
         for {
           result <- processTask(task)
           _      <- acc.update(result :: _)
         } yield ()
       }
  results <- acc.get
} yield results.reverse

// Complex state update with Synchronized Ref
val complexUpdate = for {
  state <- Ref.Synchronized.make(AppState.empty)
  _ <- state.updateZIO { currentState =>
         for {
           validated <- validateState(currentState)
           updated   <- applyChanges(validated)
           _         <- logStateChange(currentState, updated)
         } yield updated
       }
} yield ()
```
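`modify` is the operation to reach for when a read and a write must happen as one atomic step. The sketch below, assuming ZIO 2.x, uses it as an ID generator; `RefSketch` and `nextId` are hypothetical names for illustration only.

```scala
import zio._

object RefSketch extends ZIOAppDefault {
  // Returns the current value and stores its successor in one atomic step
  def nextId(ref: Ref[Int]): UIO[Int] = ref.modify(n => (n, n + 1))

  val program: UIO[(Int, Int)] = for {
    ref  <- Ref.make(0)
    ids  <- ZIO.foreachPar(1 to 100)(_ => nextId(ref)) // 100 concurrent callers
    last <- ref.get
  } yield (ids.toSet.size, last) // number of distinct IDs, final counter value

  def run = program.flatMap(result => Console.printLine(result.toString))
}
```

Because each call reads and increments atomically, no two fibers can observe the same ID, which a separate `get` followed by `set` would not guarantee.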

### Queue - Concurrent Queue

Thread-safe queue for communication between concurrent fibers with backpressure support.

```scala { .api }
/**
 * A thread-safe queue for concurrent communication between fibers
 */
sealed abstract class Queue[A] extends Dequeue[A] with Enqueue[A] {
  /** Offer an element; a bounded queue suspends until there is room, a dropping queue returns false when full */
  def offer(a: A): UIO[Boolean]

  /** Offer multiple elements, returning elements that couldn't be enqueued */
  def offerAll[A1 <: A](as: Iterable[A1]): UIO[Chunk[A1]]

  /** Take an element, suspending the calling fiber if the queue is empty */
  def take: UIO[A]

  /** Take all available elements */
  def takeAll: UIO[Chunk[A]]

  /** Take up to max elements */
  def takeUpTo(max: Int): UIO[Chunk[A]]

  /** Try to take an element without blocking */
  def poll: UIO[Option[A]]

  /** Get the queue capacity */
  def capacity: Int

  /** Get current size */
  def size: UIO[Int]

  /** Check if empty */
  def isEmpty: UIO[Boolean]

  /** Check if full */
  def isFull: UIO[Boolean]

  /** Shutdown the queue */
  def shutdown: UIO[Unit]

  /** Wait for queue shutdown */
  def awaitShutdown: UIO[Unit]

  /** Check if queue is shutdown */
  def isShutdown: UIO[Boolean]
}
```

**Usage Examples:**

```scala
// Producer-consumer pattern
val producerConsumer = for {
  queue <- Queue.bounded[String](100)
  producer <- ZIO.foreach(1 to 1000) { i =>
                queue.offer(s"item-$i")
              }.fork
  consumer <- queue.take
                .flatMap(item => processItem(item))
                .repeatN(999) // 1 initial run + 999 repeats = 1000 items
                .fork
  _ <- producer.join
  _ <- consumer.join
} yield ()

// Work distribution
// WorkItem is a placeholder type; the bare name Task would clash with zio.Task
val workDistribution = for {
  workQueue <- Queue.bounded[WorkItem](50)
  workers <- ZIO.foreach(1 to 10) { workerId =>
               workQueue.take.flatMap(item => item.perform()).forever.fork
             }
  _ <- ZIO.foreach(tasks)(workQueue.offer)
  // Interrupting right away cancels workers even if items are still queued
  _ <- ZIO.foreachDiscard(workers)(_.interrupt)
} yield ()

// Buffered processing
val bufferedProcessing = for {
  buffer <- Queue.sliding[Data](1000) // Drops old items when full
  _ <- dataStream.foreach(buffer.offer(_)).fork
  _ <- buffer.takeAll.flatMap { batch =>
         ZIO.when(batch.nonEmpty)(processBatch(batch))
       }.delay(1.second).forever
} yield ()
```
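Back-pressure on a bounded queue can be seen in a small runnable sketch. It assumes ZIO 2.x; `QueueSketch` and the capacity of 2 are illustrative choices, not part of the documented API.

```scala
import zio._

object QueueSketch extends ZIOAppDefault {
  val program: UIO[List[Int]] = for {
    queue    <- Queue.bounded[Int](2)                              // small capacity forces back-pressure
    producer <- ZIO.foreachDiscard(1 to 5)(i => queue.offer(i)).fork // suspends whenever the queue is full
    items    <- ZIO.foreach(1 to 5)(_ => queue.take)               // five sequential takes, FIFO order
    _        <- producer.join
    _        <- queue.shutdown
  } yield items.toList

  def run = program.flatMap(result => Console.printLine(result.toString))
}
```

The producer never fails or drops elements here: each `offer` simply waits until a `take` frees a slot.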

### Queue Variants and Factory Methods

Different queue implementations for various use cases and performance characteristics.

```scala { .api }
/**
 * Create a bounded queue that blocks when full
 */
def bounded[A](requestedCapacity: => Int): UIO[Queue[A]]

/**
 * Create an unbounded queue (limited only by memory)
 */
def unbounded[A]: UIO[Queue[A]]

/**
 * Create a dropping queue that drops new items when full
 */
def dropping[A](requestedCapacity: => Int): UIO[Queue[A]]

/**
 * Create a sliding queue that drops old items when full
 */
def sliding[A](requestedCapacity: => Int): UIO[Queue[A]]

/**
 * Create a back-pressured queue with async boundaries
 */
def backpressured[A](requestedCapacity: => Int): UIO[Queue[A]]

/**
 * Create a single-element queue (like a synchronous channel)
 */
def single[A]: UIO[Queue[A]]
```

**Usage Examples:**

```scala
// Different queue types for different needs
val queueTypes = for {
  // High-throughput with memory bounds
  bounded <- Queue.bounded[Event](10000)

  // No memory limits, but may cause OOM
  unbounded <- Queue.unbounded[LogEntry]

  // Latest data only, drops old items
  sliding <- Queue.sliding[SensorReading](100)

  // Drops new items when full
  dropping <- Queue.dropping[NonCriticalUpdate](50)

  // Synchronous handoff between fibers
  sync <- Queue.single[CriticalMessage]
} yield (bounded, unbounded, sliding, dropping, sync)

// Fan-out pattern with multiple queues
val fanOut = for {
  input   <- Queue.unbounded[Data]
  outputs <- ZIO.foreach(1 to 5)(_ => Queue.bounded[Data](100))

  // Distribute input to all outputs
  distributor <- input.take.flatMap { data =>
                   ZIO.foreachDiscard(outputs)(_.offer(data))
                 }.forever.fork

  // Process from each output queue
  processors <- ZIO.foreach(outputs.zipWithIndex) { case (queue, id) =>
                  queue.take.flatMap(data => processWithId(data, id)).forever.fork
                }
} yield (distributor, processors)
```

### Hub - Broadcast Communication

Concurrent hub for broadcasting messages to multiple subscribers with various consumption patterns.

```scala { .api }
/**
 * A concurrent hub for broadcasting messages to multiple subscribers
 */
sealed abstract class Hub[A] {
  /** Publish a message to all subscribers */
  def publish(a: A): UIO[Boolean]

  /** Publish multiple messages */
  def publishAll(as: Iterable[A]): UIO[Boolean]

  /** Subscribe to the hub, getting a queue for messages; the subscription is removed when the Scope closes */
  def subscribe: ZIO[Scope, Nothing, Dequeue[A]]

  /** Get the hub capacity */
  def capacity: Int

  /** Get current size */
  def size: UIO[Int]

  /** Check if hub is shutdown */
  def isShutdown: UIO[Boolean]

  /** Shutdown the hub */
  def shutdown: UIO[Unit]
}

object Hub {
  /** Create a bounded hub */
  def bounded[A](requestedCapacity: Int): UIO[Hub[A]]

  /** Create an unbounded hub */
  def unbounded[A]: UIO[Hub[A]]

  /** Create a dropping hub */
  def dropping[A](requestedCapacity: Int): UIO[Hub[A]]

  /** Create a sliding hub */
  def sliding[A](requestedCapacity: Int): UIO[Hub[A]]
}
```

**Usage Examples:**

```scala
// Event broadcasting system
// Subscriptions are tied to a Scope, so run these programs inside ZIO.scoped
val eventSystem = for {
  eventHub <- Hub.bounded[Event](1000)

  // Multiple subscribers
  uiQueue        <- eventHub.subscribe
  loggingQueue   <- eventHub.subscribe
  analyticsQueue <- eventHub.subscribe

  // Publishers
  _ <- eventStream.foreach(eventHub.publish).fork

  // Subscribers
  _ <- uiQueue.take.flatMap(updateUI).forever.fork
  _ <- loggingQueue.take.flatMap(logEvent).forever.fork
  _ <- analyticsQueue.take.flatMap(trackEvent).forever.fork

} yield ()

// Real-time data distribution
val dataDistribution = for {
  dataHub <- Hub.sliding[SensorData](500) // Keep latest data

  // Data producer
  _ <- sensorStream.foreach(dataHub.publish).fork

  // Multiple consumers with different processing
  dashboardQueue <- dataHub.subscribe
  alertQueue     <- dataHub.subscribe
  storageQueue   <- dataHub.subscribe

  _ <- dashboardQueue.take.flatMap(updateDashboard).forever.fork
  _ <- alertQueue.take.flatMap(checkAlerts).forever.fork
  _ <- storageQueue.takeAll.flatMap(batchStore).repeat(Schedule.fixed(10.seconds)).fork

} yield ()
```
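A hub delivers each published message to every subscriber that exists at publish time. The following sketch, assuming ZIO 2.x where `subscribe` requires a `Scope`, shows two subscribers receiving the same value; `HubSketch` is a hypothetical name.

```scala
import zio._

object HubSketch extends ZIOAppDefault {
  val program: UIO[(Int, Int)] = ZIO.scoped {
    for {
      hub    <- Hub.bounded[Int](16)
      first  <- hub.subscribe  // both subscriptions are released when the scope closes
      second <- hub.subscribe
      _      <- hub.publish(7) // published only after both subscriptions exist
      x      <- first.take
      y      <- second.take
    } yield (x, y)
  }

  def run = program.flatMap(result => Console.printLine(result.toString))
}
```

Subscribing before publishing matters: a subscriber only sees messages published after its subscription was created.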

### Promise - Single Assignment Variables

Promises represent single-assignment variables that can be completed exactly once, useful for coordination between fibers.

```scala { .api }
/**
 * A Promise is a concurrent primitive that represents a value that may not yet be available
 */
final class Promise[E, A] {
  /** Complete the promise with a success value */
  def succeed(a: A): UIO[Boolean]

  /** Complete the promise with a failure */
  def fail(e: E): UIO[Boolean]

  /** Complete the promise with a ZIO effect */
  def complete(zio: IO[E, A]): UIO[Boolean]

  /** Complete the promise with an Exit value */
  def done(exit: Exit[E, A]): UIO[Boolean]

  /** Wait for the promise to be completed */
  def await: IO[E, A]

  /** Check if the promise is completed without blocking */
  def poll: UIO[Option[IO[E, A]]]

  /** Interrupt any fibers waiting on this promise */
  def interrupt: UIO[Boolean]

  /** Check if the promise has been completed */
  def isDone: UIO[Boolean]
}

object Promise {
  /** Create a new promise */
  def make[E, A]: UIO[Promise[E, A]]

  /** Create a promise and complete it with an effect */
  def fromZIO[R, E, A](zio: ZIO[R, E, A]): ZIO[R, Nothing, Promise[E, A]]
}
```

**Usage Examples:**

```scala
// Coordination between fibers
val coordination = for {
  promise <- Promise.make[String, Int]

  // Producer fiber (parentheses ensure the sleep is forked too)
  producer <- (ZIO.sleep(2.seconds) *> promise.succeed(42)).fork

  // Consumer fiber waits for result
  consumer <- promise.await.fork

  result <- consumer.join
  _      <- producer.join
} yield result

// Error propagation
val errorHandling = for {
  promise <- Promise.make[String, Int]
  worker <- (ZIO.sleep(1.second) *> ZIO.fail("computation failed"))
              .tapError(promise.fail)
              .fork
  result <- promise.await.either // Will receive Left("computation failed")
  _      <- worker.interrupt
} yield result
```
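The "completed exactly once" property can be checked directly: only the first completion wins, and later attempts report `false`. This is a minimal sketch assuming ZIO 2.x; `PromiseSketch` is a hypothetical name.

```scala
import zio._

object PromiseSketch extends ZIOAppDefault {
  val program: UIO[(Int, Boolean, Boolean)] = for {
    promise <- Promise.make[Nothing, Int]
    waiter  <- promise.await.fork  // suspends until the promise is completed
    first   <- promise.succeed(1)  // completes the promise
    second  <- promise.succeed(2)  // ignored: the promise is already completed
    value   <- waiter.join
  } yield (value, first, second)

  def run = program.flatMap(result => Console.printLine(result.toString))
}
```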

### Semaphore - Resource Limiting

Semaphores provide controlled access to a limited number of resources with automatic blocking and releasing.

```scala { .api }
/**
 * A Semaphore is a concurrency primitive that maintains a set of permits
 */
sealed trait Semaphore {
  /** Acquire a permit, blocking if none available */
  def acquire: UIO[Unit]

  /** Acquire n permits */
  def acquireN(n: Long): UIO[Unit]

  /** Release a permit */
  def release: UIO[Unit]

  /** Release n permits */
  def releaseN(n: Long): UIO[Unit]

  /** Try to acquire a permit without blocking */
  def tryAcquire: UIO[Boolean]

  /** Try to acquire n permits without blocking */
  def tryAcquireN(n: Long): UIO[Boolean]

  /** Get available permits */
  def available: UIO[Long]

  /** Use a permit for the duration of an effect */
  def withPermit[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A]

  /** Use n permits for an effect */
  def withPermits[R, E, A](n: Long)(zio: ZIO[R, E, A]): ZIO[R, E, A]
}

object Semaphore {
  /** Create a semaphore with n permits */
  def make(permits: Long): UIO[Semaphore]
}
```

**Usage Examples:**

```scala
// Connection pool management
val connectionPool = for {
  semaphore <- Semaphore.make(10) // Max 10 concurrent connections

  // Use connection with automatic permit management
  result <- semaphore.withPermit {
              for {
                conn   <- openConnection()
                result <- conn.query("SELECT * FROM users")
                _      <- conn.close()
              } yield result
            }
} yield result

// Concurrency limiting (a semaphore bounds in-flight work, not requests per second)
val boundedRequests = for {
  permits <- Semaphore.make(100) // At most 100 requests in flight

  // withPermit returns the permit when each request finishes,
  // so permits never need to be replenished by hand
  _ <- ZIO.foreachParDiscard(requests) { request =>
         permits.withPermit(processRequest(request))
       }
} yield ()
```
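To see that `withPermit` really caps concurrency, the sketch below records the highest number of jobs observed running at once. It assumes ZIO 2.x; `SemaphoreSketch`, `job`, and the chosen numbers are hypothetical and only serve the illustration.

```scala
import zio._

object SemaphoreSketch extends ZIOAppDefault {
  // A job that tracks how many copies of itself are running at the same time
  def job(inFlight: Ref[Int], peak: Ref[Int]): UIO[Unit] =
    for {
      now <- inFlight.updateAndGet(_ + 1)
      _   <- peak.update(_ max now)  // remember the highest concurrency seen
      _   <- ZIO.sleep(10.millis)    // simulate work while holding the permit
      _   <- inFlight.update(_ - 1)
    } yield ()

  val program: UIO[Int] = for {
    permits  <- Semaphore.make(3)
    inFlight <- Ref.make(0)
    peak     <- Ref.make(0)
    _        <- ZIO.foreachParDiscard(1 to 20)(_ => permits.withPermit(job(inFlight, peak)))
    max      <- peak.get
  } yield max // never exceeds the number of permits

  def run = program.flatMap(max => Console.printLine(s"peak concurrency: $max"))
}
```

Although 20 jobs are started in parallel, the recorded peak stays at or below 3 because each job must hold a permit while it runs.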

### MVar - Mutable Variable

MVars are mutable variables that can be empty or contain exactly one value, useful for communication patterns.

```scala { .api }
/**
 * An MVar is a mutable variable that is either empty or contains exactly one value
 */
sealed trait MVar[A] {
  /** Put a value, blocking if already full */
  def put(a: A): UIO[Unit]

  /** Take the value, blocking if empty */
  def take: UIO[A]

  /** Try to put without blocking */
  def tryPut(a: A): UIO[Boolean]

  /** Try to take without blocking */
  def tryTake: UIO[Option[A]]

  /** Read the value without removing it, blocking if empty */
  def read: UIO[A]

  /** Try to read without blocking */
  def tryRead: UIO[Option[A]]

  /** Check if the MVar is empty */
  def isEmpty: UIO[Boolean]

  /** Swap the current value with a new one */
  def swap(a: A): UIO[A]

  /** Modify the value using a function */
  def modify[B](f: A => (B, A)): UIO[B]
}

object MVar {
  /** Create an empty MVar */
  def empty[A]: UIO[MVar[A]]

  /** Create an MVar with initial value */
  def make[A](a: A): UIO[MVar[A]]
}
```

**Usage Examples:**

```scala
// Producer-consumer with backpressure
val producerConsumer = for {
  mvar <- MVar.empty[String]

  producer <- ZIO.foreach(1 to 100) { i =>
                mvar.put(s"item-$i") // Blocks if consumer is slow
              }.fork

  consumer <- mvar.take.flatMap(processItem).forever.fork

  _ <- ZIO.sleep(10.seconds)
  _ <- producer.interrupt
  _ <- consumer.interrupt
} yield ()

// Shared state with synchronization
val sharedCounter = for {
  counter <- MVar.make(0)

  // Multiple workers incrementing counter
  workers <- ZIO.foreach(1 to 10) { _ =>
               (counter.modify(n => ((), n + 1)) *>
                 ZIO.sleep(100.millis)).forever.fork
             }

  _ <- ZIO.sleep(5.seconds)
  finalCount <- counter.read // `final` is a reserved word in Scala
  _ <- ZIO.foreachDiscard(workers)(_.interrupt)
} yield finalCount
```

### Additional Concurrent Collections

Extended concurrent data structures from the concurrent module for specialized use cases.

```scala { .api }
/**
 * Thread-safe concurrent map
 */
trait ConcurrentMap[K, V] {
  def get(key: K): UIO[Option[V]]
  def put(key: K, value: V): UIO[Option[V]]
  def putIfAbsent(key: K, value: V): UIO[Option[V]]
  def remove(key: K): UIO[Option[V]]
  def replace(key: K, value: V): UIO[Option[V]]
  def size: UIO[Int]
  def isEmpty: UIO[Boolean]
}

/**
 * Thread-safe concurrent set
 */
trait ConcurrentSet[A] {
  def add(a: A): UIO[Boolean]
  def remove(a: A): UIO[Boolean]
  def contains(a: A): UIO[Boolean]
  def size: UIO[Int]
  def toSet: UIO[Set[A]]
}

/**
 * Reentrant lock for mutual exclusion
 */
trait ReentrantLock {
  def lock: UIO[Unit]
  def unlock: UIO[Unit]
  def tryLock: UIO[Boolean]
  def withLock[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A]
  def isLocked: UIO[Boolean]
}

/**
 * Countdown latch for coordination
 */
trait CountdownLatch {
  def countDown: UIO[Unit]
  def await: UIO[Unit]
  def getCount: UIO[Long]
}

/**
 * Cyclic barrier for multi-phase coordination
 */
trait CyclicBarrier {
  def await: UIO[Int]
  def getParties: UIO[Int]
  def getNumberWaiting: UIO[Int]
  def isBroken: UIO[Boolean]
  def reset: UIO[Unit]
}
```

**Usage Examples:**

```scala
// Concurrent map for caching
val cacheExample = for {
  cache <- ConcurrentMap.empty[String, User]

  user <- cache.get("user-123").flatMap {
            case Some(user) => ZIO.succeed(user)
            case None =>
              for {
                user <- fetchUserFromDb("user-123")
                _    <- cache.put("user-123", user)
              } yield user
          }
} yield user

// Coordination with countdown latch
val coordinatedStart = for {
  latch <- CountdownLatch.make(3) // Wait for 3 workers

  workers <- ZIO.foreach(1 to 3) { workerId =>
               (initializeWorker(workerId) *> latch.countDown).fork
             }

  _ <- latch.await // Wait for all workers to initialize
  _ <- Console.printLine("All workers ready, starting main task")

  result <- mainTask
  _      <- ZIO.foreachDiscard(workers)(_.interrupt)
} yield result
```