ZIO provides fiber-based lightweight concurrency with atomic data structures for building high-performance concurrent applications without the complexity of traditional thread-based concurrency.
Fibers are ZIO's abstraction for lightweight, composable concurrency that can be forked, joined, and interrupted safely.
/**
* A lightweight thread of execution that can be composed and managed safely
*/
sealed abstract class Fiber[+E, +A] {
/** Wait for the fiber to complete and return its Exit value */
def await(implicit trace: Trace): UIO[Exit[E, A]]
/** Join the fiber, returning success value or failing with error */
def join(implicit trace: Trace): IO[E, A]
/** Interrupt the fiber and wait for it to complete */
def interrupt(implicit trace: Trace): UIO[Exit[E, A]]
/** Interrupt the fiber with a specific fiber ID */
def interruptAs(fiberId: FiberId)(implicit trace: Trace): UIO[Exit[E, A]]
/** Interrupt the fiber in the background without waiting */
def interruptFork(implicit trace: Trace): UIO[Unit]
/** Check if the fiber has completed without blocking */
def poll(implicit trace: Trace): UIO[Option[Exit[E, A]]]
}
/**
* Runtime fiber with additional capabilities for introspection
*/
abstract class Fiber.Runtime[+E, +A] extends Fiber[E, A] {
/** Get the fiber's unique identifier */
def id: FiberId
/** Get the current status of the fiber */
def status(implicit trace: Trace): UIO[Fiber.Status]
/** Get the fiber's execution trace */
def trace(implicit trace: Trace): UIO[StackTrace]
/** Get all child fibers spawned by this fiber */
def children(implicit trace: Trace): UIO[Chunk[Fiber.Runtime[_, _]]]
}

Usage Examples:
import zio._
// Fork a computation into a fiber
val fiberProgram = for {
fiber <- heavyComputation.fork
_ <- quickTask
result <- fiber.join // Wait for completion
} yield result
// Race two fibers
// `race` is used on the joined effects so that `winner` is the value produced by
// whichever side succeeds first. Both fibers are then interrupted explicitly so
// the slower one does not keep running in the background.
val raceProgram = for {
  fiber1 <- task1.fork
  fiber2 <- task2.fork
  winner <- fiber1.join.race(fiber2.join)
  _      <- fiber1.interrupt
  _      <- fiber2.interrupt
} yield winner
// Interrupt a long-running fiber
val interruptProgram = for {
fiber <- longRunningTask.fork
_ <- ZIO.sleep(5.seconds)
_ <- fiber.interrupt // Cancel after 5 seconds
} yield ()

Combine and compose fibers using various operators for complex concurrent workflows.
/**
* Transform the success value of a fiber
*/
def map[B](f: A => B): Fiber.Synthetic[E, B]
/**
* Transform the success value using a ZIO effect
*/
def mapZIO[E1 >: E, B](f: A => IO[E1, B]): Fiber.Synthetic[E1, B]
/**
* Replace success value with a constant
*/
def as[B](b: => B): Fiber.Synthetic[E, B]
/**
* Discard the success value
*/
def unit: Fiber.Synthetic[E, Unit]
/**
* Combine two fibers, returning both results
*/
def zip[E1 >: E, B](that: => Fiber[E1, B]): Fiber.Synthetic[E1, (A, B)]
/**
* Combine two fibers with a custom function
*/
def zipWith[E1 >: E, B, C](that: => Fiber[E1, B])(f: (A, B) => C): Fiber.Synthetic[E1, C]
/**
* Use this fiber if it succeeds, otherwise use the fallback
*/
def orElse[E1, A1 >: A](that: => Fiber[E1, A1]): Fiber.Synthetic[E1, A1]
/**
* Race this fiber against another, returning the first to complete
*/
def race[E1 >: E, A1 >: A](that: => Fiber[E1, A1]): Fiber.Synthetic[E1, A1]

Usage Examples:
// Combine fiber results
val combined = for {
  fiber1        <- fetchUser(id).fork
  fiber2        <- fetchPreferences(id).fork
  // `zip` produces a composite fiber, not an effect; join it to obtain the pair.
  result        <- fiber1.zip(fiber2).join
  (user, prefs)  = result
} yield UserWithPrefs(user, prefs)
// Transform fiber result
val processedFiber = dataFiber.map(_.processData)
// Race with timeout
// A fiber cannot be raced against an effect directly, so the fiber is joined
// first; `timeoutFail` then fails with "Timeout" if no result arrives within
// 30 seconds.
val timedFiber = dataFiber.join.timeoutFail("Timeout")(30.seconds)

Create fibers from values, effects, and external sources.
/**
* Create a fiber that has already completed with the given Exit
*/
def done[E, A](exit: => Exit[E, A]): Fiber.Synthetic[E, A]
/**
* Create a fiber that succeeds with the given value
*/
def succeed[A](a: A): Fiber.Synthetic[Nothing, A]
/**
* Create a fiber that fails with the given error
*/
def fail[E](e: E): Fiber.Synthetic[E, Nothing]
/**
* Create a fiber that fails with the given cause
*/
def failCause[E](cause: Cause[E]): Fiber.Synthetic[E, Nothing]
/**
* Create a fiber from a Scala Future
*/
def fromFuture[A](thunk: => Future[A]): Fiber.Synthetic[Throwable, A]
/**
* Create a fiber from a ZIO effect (effect runs immediately)
*/
def fromZIO[E, A](io: IO[E, A]): UIO[Fiber.Synthetic[E, A]]
/**
* Collect results from multiple fibers
*/
def collectAll[E, A](fibers: Iterable[Fiber[E, A]]): Fiber.Synthetic[E, Chunk[A]]
/**
* Wait for all fibers to complete, ignoring results
*/
def awaitAll(fs: Iterable[Fiber[Any, Any]]): UIO[Unit]
/**
* Join all fibers, failing if any fail
*/
def joinAll[E](fs: Iterable[Fiber[E, Any]]): IO[E, Unit]
/**
* Interrupt all fibers
*/
def interruptAll(fs: Iterable[Fiber[Any, Any]]): UIO[Unit]

Usage Examples:
// Create pre-completed fibers
val successFiber = Fiber.succeed(42)
val failureFiber = Fiber.fail("Error occurred")
// Work with multiple fibers
val batchProcessing = for {
fibers <- ZIO.foreach(dataChunks)(chunk => processChunk(chunk).fork)
results <- Fiber.collectAll(fibers).join
} yield results
// Cleanup multiple fibers
val cleanup = for {
_ <- Fiber.interruptAll(runningFibers)
_ <- Console.printLine("All background tasks cancelled")
} yield ()

Thread-safe mutable reference that can be updated atomically across concurrent fibers.
/**
* A thread-safe mutable reference that can be updated atomically
*/
sealed abstract class Ref[A] {
/** Read the current value */
def get: UIO[A]
/** Set a new value */
def set(a: A): UIO[Unit]
/** Set a new value asynchronously */
def setAsync(a: A): UIO[Unit]
/** Atomically modify the value and return a result */
def modify[B](f: A => (B, A)): UIO[B]
/** Atomically update the value */
def update(f: A => A): UIO[Unit]
/** Update and return the new value */
def updateAndGet(f: A => A): UIO[A]
/** Return the old value and update */
def getAndUpdate(f: A => A): UIO[A]
/** Set new value and return the old value */
def getAndSet(a: A): UIO[A]
}
/**
* Synchronized Ref that allows ZIO effects in update functions
*/
sealed abstract class Ref.Synchronized[A] extends Ref[A] {
/** Modify using a ZIO effect */
def modifyZIO[R, E, B](f: A => ZIO[R, E, (B, A)]): ZIO[R, E, B]
/** Update using a ZIO effect */
def updateZIO[R, E](f: A => ZIO[R, E, A]): ZIO[R, E, Unit]
/** Update with ZIO and return new value */
def updateAndGetZIO[R, E](f: A => ZIO[R, E, A]): ZIO[R, E, A]
/** Get old value and update with ZIO */
def getAndUpdateZIO[R, E](f: A => ZIO[R, E, A]): ZIO[R, E, A]
}

Usage Examples:
// Counter with atomic operations
val counterProgram = for {
  counter    <- Ref.make(0)
  _          <- ZIO.foreachParDiscard(1 to 1000)(_ => counter.update(_ + 1))
  // `final` is a reserved word in Scala, so the result is bound as `finalCount`.
  finalCount <- counter.get
  _          <- Console.printLine(s"Final count: $finalCount")
} yield ()
// Accumulator pattern
val accumulatorProgram = for {
acc <- Ref.make(List.empty[String])
_ <- ZIO.foreachParDiscard(tasks) { task =>
for {
result <- processTask(task)
_ <- acc.update(result :: _)
} yield ()
}
results <- acc.get
} yield results.reverse
// Complex state update with Synchronized Ref
val complexUpdate = for {
state <- Ref.Synchronized.make(AppState.empty)
_ <- state.updateZIO { currentState =>
for {
validated <- validateState(currentState)
updated <- applyChanges(validated)
_ <- logStateChange(currentState, updated)
} yield updated
}
} yield ()

Thread-safe queue for communication between concurrent fibers with backpressure support.
/**
* A thread-safe queue for concurrent communication between fibers
*/
sealed abstract class Queue[A] extends Dequeue[A] with Enqueue[A] {
/** Offer an element, returning false if queue is full */
def offer(a: A): UIO[Boolean]
/** Offer multiple elements, returning elements that couldn't be enqueued */
def offerAll[A1 <: A](as: Iterable[A1]): UIO[Chunk[A1]]
/** Take an element, blocking if queue is empty */
def take: UIO[A]
/** Take all available elements */
def takeAll: UIO[Chunk[A]]
/** Take up to max elements */
def takeUpTo(max: Int): UIO[Chunk[A]]
/** Try to take an element without blocking */
def poll: UIO[Option[A]]
/** Get the queue capacity */
def capacity: Int
/** Get current size */
def size: UIO[Int]
/** Check if empty */
def isEmpty: UIO[Boolean]
/** Check if full */
def isFull: UIO[Boolean]
/** Shutdown the queue */
def shutdown: UIO[Unit]
/** Wait for queue shutdown */
def awaitShutdown: UIO[Unit]
/** Check if queue is shutdown */
def isShutdown: UIO[Boolean]
}

Usage Examples:
// Producer-consumer pattern
val producerConsumer = for {
  queue    <- Queue.bounded[String](100)
  // Offers suspend while the bounded queue is full, pacing the producer.
  producer <- ZIO.foreachDiscard(1 to 1000)(i => queue.offer(s"item-$i")).fork
  // `repeatN(999)` runs the effect once plus 999 repetitions: 1000 takes in
  // total, matching the 1000 items offered above.
  consumer <- queue.take.flatMap(item => processItem(item)).repeatN(999).fork
  _        <- producer.join
  _        <- consumer.join
} yield ()
// Work distribution
val workDistribution = for {
  workQueue <- Queue.bounded[Task](50)
  // Ten workers, each repeatedly taking a task from the shared queue.
  workers   <- ZIO.foreach(1 to 10) { workerId =>
                 workQueue.take.flatMap(task => task.perform()).forever.fork
               }
  _         <- ZIO.foreach(tasks)(workQueue.offer)
  // NOTE(review): workers are interrupted as soon as the last task has been
  // enqueued, so tasks still queued or in flight may never complete — confirm
  // whether the example should wait for the queue to drain first.
  _         <- ZIO.foreachDiscard(workers)(_.interrupt)
} yield ()
// Buffered processing
val bufferedProcessing = for {
  buffer <- Queue.sliding[Data](1000) // Drops old items when full
  _      <- dataStream.foreach(buffer.offer(_)).fork
  // Every second, drain whatever has accumulated and process it as one batch;
  // empty batches are skipped.
  _      <- buffer.takeAll.flatMap { batch =>
              ZIO.when(batch.nonEmpty)(processBatch(batch))
            }.delay(1.second).forever
} yield ()

Different queue implementations for various use cases and performance characteristics.
/**
* Create a bounded queue that blocks when full
*/
def bounded[A](requestedCapacity: => Int): UIO[Queue[A]]
/**
* Create an unbounded queue (limited only by memory)
*/
def unbounded[A]: UIO[Queue[A]]
/**
* Create a dropping queue that drops new items when full
*/
def dropping[A](requestedCapacity: => Int): UIO[Queue[A]]
/**
* Create a sliding queue that drops old items when full
*/
def sliding[A](requestedCapacity: => Int): UIO[Queue[A]]
/**
* Create a back-pressured queue with async boundaries
*/
def backpressured[A](requestedCapacity: => Int): UIO[Queue[A]]
/**
* Create a single-element queue (like a synchronous channel)
*/
def single[A]: UIO[Queue[A]]

Usage Examples:
// Different queue types for different needs
val queueTypes = for {
// High-throughput with memory bounds
bounded <- Queue.bounded[Event](10000)
// No memory limits, but may cause OOM
unbounded <- Queue.unbounded[LogEntry]
// Latest data only, drops old items
sliding <- Queue.sliding[SensorReading](100)
// Drops new items when full
dropping <- Queue.dropping[NonCriticalUpdate](50)
// Synchronous handoff between fibers
sync <- Queue.single[CriticalMessage]
} yield (bounded, unbounded, sliding, dropping, sync)
// Fan-out pattern with multiple queues
val fanOut = for {
  input       <- Queue.unbounded[Data]
  outputs     <- ZIO.foreach(1 to 5)(_ => Queue.bounded[Data](100))
  // Distribute input to all outputs
  distributor <- input.take.flatMap { data =>
                   ZIO.foreachDiscard(outputs)(_.offer(data))
                 }.forever.fork
  // Process from each output queue
  processors  <- ZIO.foreach(outputs.zipWithIndex) { case (queue, id) =>
                   queue.take.flatMap(data => processWithId(data, id)).forever.fork
                 }
} yield (distributor, processors)

Concurrent hub for broadcasting messages to multiple subscribers with various consumption patterns.
/**
* A concurrent hub for broadcasting messages to multiple subscribers
*/
sealed abstract class Hub[A] {
/** Publish a message to all subscribers */
def publish(a: A): UIO[Boolean]
/** Publish multiple messages */
def publishAll(as: Iterable[A]): UIO[Boolean]
/** Subscribe to the hub, getting a queue for messages */
def subscribe: UIO[Dequeue[A]]
/** Get the hub capacity */
def capacity: Int
/** Get current size */
def size: UIO[Int]
/** Check if hub is shutdown */
def isShutdown: UIO[Boolean]
/** Shutdown the hub */
def shutdown: UIO[Unit]
}
object Hub {
/** Create a bounded hub */
def bounded[A](requestedCapacity: Int): UIO[Hub[A]]
/** Create an unbounded hub */
def unbounded[A]: UIO[Hub[A]]
/** Create a dropping hub */
def dropping[A](requestedCapacity: Int): UIO[Hub[A]]
/** Create a sliding hub */
def sliding[A](requestedCapacity: Int): UIO[Hub[A]]
}

Usage Examples:
// Event broadcasting system
val eventSystem = for {
  eventHub       <- Hub.bounded[Event](1000)
  // Multiple subscribers
  uiQueue        <- eventHub.subscribe
  loggingQueue   <- eventHub.subscribe
  analyticsQueue <- eventHub.subscribe
  // Publishers
  _              <- eventStream.foreach(eventHub.publish).fork
  // Subscribers: each takes one event, handles it with `flatMap`, and repeats forever
  _              <- uiQueue.take.flatMap(updateUI).forever.fork
  _              <- loggingQueue.take.flatMap(logEvent).forever.fork
  _              <- analyticsQueue.take.flatMap(trackEvent).forever.fork
} yield ()
// Real-time data distribution
val dataDistribution = for {
  dataHub        <- Hub.sliding[SensorData](500) // Keep latest data
  // Data producer
  _              <- sensorStream.foreach(dataHub.publish).fork
  // Multiple consumers with different processing
  dashboardQueue <- dataHub.subscribe
  alertQueue     <- dataHub.subscribe
  storageQueue   <- dataHub.subscribe
  _              <- dashboardQueue.take.flatMap(updateDashboard).forever.fork
  _              <- alertQueue.take.flatMap(checkAlerts).forever.fork
  // Storage drains everything accumulated so far on a fixed 10-second schedule.
  _              <- storageQueue.takeAll.flatMap(batchStore).repeat(Schedule.fixed(10.seconds)).fork
} yield ()

Promises represent single-assignment variables that can be completed exactly once, useful for coordination between fibers.
/**
 * A Promise is a concurrent primitive that represents a value that may not yet be available.
 *
 * Both type parameters are invariant: `E` and `A` are accepted as arguments
 * (`fail`, `succeed`) and also produced as results (`await`), so neither can be
 * declared covariant.
 */
sealed trait Promise[E, A] {
  /** Complete the promise with a success value */
  def succeed(a: A): UIO[Boolean]
  /** Complete the promise with a failure */
  def fail(e: E): UIO[Boolean]
  /** Complete the promise with a ZIO effect */
  def complete(zio: IO[E, A]): UIO[Boolean]
  /** Complete the promise with an Exit value */
  def done(exit: Exit[E, A]): UIO[Boolean]
  /** Wait for the promise to be completed */
  def await: IO[E, A]
  /** Check if the promise is completed without blocking */
  def poll: UIO[Option[IO[E, A]]]
  /** Interrupt any fibers waiting on this promise */
  def interrupt: UIO[Boolean]
  /** Check if the promise has been completed */
  def isDone: UIO[Boolean]
}
object Promise {
/** Create a new promise */
def make[E, A]: UIO[Promise[E, A]]
/** Create a promise and complete it with an effect */
def fromZIO[R, E, A](zio: ZIO[R, E, A]): ZIO[R, Nothing, Promise[E, A]]
}

Usage Examples:
// Coordination between fibers
val coordination = for {
  promise  <- Promise.make[String, Int]
  // Producer fiber: the parentheses put the delay inside the forked fiber.
  // Without them `.fork` applies only to `promise.succeed(42)` and the sleep
  // would block this fiber before anything is forked.
  producer <- (ZIO.sleep(2.seconds) *> promise.succeed(42)).fork
  // Consumer fiber waits for result
  consumer <- promise.await.fork
  result   <- consumer.join
  _        <- producer.join
} yield result
// Error propagation
val errorHandling = for {
promise <- Promise.make[String, Int]
worker <- (ZIO.sleep(1.second) *> ZIO.fail("computation failed"))
.tapError(promise.fail)
.fork
result <- promise.await.either // Will receive Left("computation failed")
_ <- worker.interrupt
} yield result

Semaphores provide controlled access to a limited number of resources with automatic blocking and releasing.
/**
* A Semaphore is a concurrency primitive that maintains a set of permits
*/
sealed trait Semaphore {
/** Acquire a permit, blocking if none available */
def acquire: UIO[Unit]
/** Acquire n permits */
def acquireN(n: Long): UIO[Unit]
/** Release a permit */
def release: UIO[Unit]
/** Release n permits */
def releaseN(n: Long): UIO[Unit]
/** Try to acquire a permit without blocking */
def tryAcquire: UIO[Boolean]
/** Try to acquire n permits without blocking */
def tryAcquireN(n: Long): UIO[Boolean]
/** Get available permits */
def available: UIO[Long]
/** Use a permit for the duration of an effect */
def withPermit[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A]
/** Use n permits for an effect */
def withPermits[R, E, A](n: Long)(zio: ZIO[R, E, A]): ZIO[R, E, A]
}
object Semaphore {
/** Create a semaphore with n permits */
def make(permits: Long): UIO[Semaphore]
}

Usage Examples:
// Connection pool management
val connectionPool = for {
  semaphore <- Semaphore.make(10) // Max 10 concurrent connections
  // Use connection with automatic permit management.
  // `acquireReleaseWith` closes the connection even when the query fails or the
  // fiber is interrupted; a plain for-comprehension would skip `close()` then.
  // NOTE(review): errors from `close()` are ignored here because a release
  // action cannot fail — confirm that is acceptable for this example.
  result    <- semaphore.withPermit {
                 ZIO.acquireReleaseWith(openConnection())(conn => conn.close().ignore) { conn =>
                   conn.query("SELECT * FROM users")
                 }
               }
} yield result
// Rate limiting
val rateLimiter = for {
permits <- Semaphore.make(100) // 100 requests per batch
// Process requests with rate limiting
_ <- ZIO.foreach(requests) { request =>
permits.withPermit(processRequest(request))
}
// Replenish permits periodically
_ <- permits.releaseN(100).repeat(Schedule.fixed(1.second))
} yield ()

MVars are mutable variables that can be empty or contain exactly one value, useful for communication patterns.
/**
* An MVar is a mutable variable that is either empty or contains exactly one value
*/
sealed trait MVar[A] {
/** Put a value, blocking if already full */
def put(a: A): UIO[Unit]
/** Take the value, blocking if empty */
def take: UIO[A]
/** Try to put without blocking */
def tryPut(a: A): UIO[Boolean]
/** Try to take without blocking */
def tryTake: UIO[Option[A]]
/** Read the value without removing it, blocking if empty */
def read: UIO[A]
/** Try to read without blocking */
def tryRead: UIO[Option[A]]
/** Check if the MVar is empty */
def isEmpty: UIO[Boolean]
/** Swap the current value with a new one */
def swap(a: A): UIO[A]
/** Modify the value using a function */
def modify[B](f: A => (B, A)): UIO[B]
}
object MVar {
/** Create an empty MVar */
def empty[A]: UIO[MVar[A]]
/** Create an MVar with initial value */
def make[A](a: A): UIO[MVar[A]]
}

Usage Examples:
// Producer-consumer with backpressure
val producerConsumer = for {
  mvar     <- MVar.empty[String]
  producer <- ZIO.foreach(1 to 100) { i =>
                mvar.put(s"item-$i") // Blocks if consumer is slow
              }.fork
  // Consumer loops indefinitely; it is stopped by the interrupt below.
  consumer <- mvar.take.flatMap(processItem).forever.fork
  _        <- ZIO.sleep(10.seconds)
  _        <- producer.interrupt
  _        <- consumer.interrupt
} yield ()
// Shared state with synchronization
val sharedCounter = for {
  counter    <- MVar.make(0)
  // Multiple workers incrementing counter
  workers    <- ZIO.foreach(1 to 10) { _ =>
                  (counter.modify(n => ((), n + 1)) *> ZIO.sleep(100.millis)).forever.fork
                }
  _          <- ZIO.sleep(5.seconds)
  // `final` is a reserved word in Scala, so the result is bound as `finalCount`.
  finalCount <- counter.read
  _          <- ZIO.foreachDiscard(workers)(_.interrupt)
} yield finalCount

Extended concurrent data structures from the concurrent module for specialized use cases.
/**
* Thread-safe concurrent map
*/
trait ConcurrentMap[K, V] {
def get(key: K): UIO[Option[V]]
def put(key: K, value: V): UIO[Option[V]]
def putIfAbsent(key: K, value: V): UIO[Option[V]]
def remove(key: K): UIO[Option[V]]
def replace(key: K, value: V): UIO[Option[V]]
def size: UIO[Int]
def isEmpty: UIO[Boolean]
}
/**
* Thread-safe concurrent set
*/
trait ConcurrentSet[A] {
def add(a: A): UIO[Boolean]
def remove(a: A): UIO[Boolean]
def contains(a: A): UIO[Boolean]
def size: UIO[Int]
def toSet: UIO[Set[A]]
}
/**
* Reentrant lock for mutual exclusion
*/
trait ReentrantLock {
def lock: UIO[Unit]
def unlock: UIO[Unit]
def tryLock: UIO[Boolean]
def withLock[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A]
def isLocked: UIO[Boolean]
}
/**
* Countdown latch for coordination
*/
trait CountdownLatch {
def countDown: UIO[Unit]
def await: UIO[Unit]
def getCount: UIO[Long]
}
/**
* Cyclic barrier for multi-phase coordination
*/
trait CyclicBarrier {
def await: UIO[Int]
def getParties: UIO[Int]
def getNumberWaiting: UIO[Int]
def isBroken: UIO[Boolean]
def reset: UIO[Unit]
}

Usage Examples:
// Concurrent map for caching
val cacheExample = for {
cache <- ConcurrentMap.empty[String, User]
user <- cache.get("user-123").flatMap {
case Some(user) => ZIO.succeed(user)
case None => for {
user <- fetchUserFromDb("user-123")
_ <- cache.put("user-123", user)
} yield user
}
} yield user
// Coordination with countdown latch
val coordinatedStart = for {
latch <- CountdownLatch.make(3) // Wait for 3 workers
workers <- ZIO.foreach(1 to 3) { workerId =>
(initializeWorker(workerId) *> latch.countDown).fork
}
_ <- latch.await // Wait for all workers to initialize
_ <- Console.printLine("All workers ready, starting main task")
result <- mainTask
_ <- ZIO.foreachDiscard(workers)(_.interrupt)
} yield result