or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

application.mdconcurrency.mdcore-effects.mddependency-injection.mderror-handling.mdindex.mdmetrics.mdresource-management.mdservices.mdstm.mdstreams.mdtesting.md
tile.json

concurrency.mddocs/

Concurrency

ZIO provides fiber-based lightweight concurrency with atomic data structures for building high-performance concurrent applications without the complexity of traditional thread-based concurrency.

Capabilities

Fiber - Lightweight Threads

Fibers are ZIO's abstraction for lightweight, composable concurrency that can be forked, joined, and interrupted safely.

/**
 * A lightweight thread of execution that can be composed and managed safely
 */
sealed abstract class Fiber[+E, +A] {
  /** Wait for the fiber to complete and return its Exit value */
  def await(implicit trace: Trace): UIO[Exit[E, A]]
  
  /** Join the fiber, returning success value or failing with error */
  def join(implicit trace: Trace): IO[E, A]
  
  /** Interrupt the fiber and wait for it to complete */
  def interrupt(implicit trace: Trace): UIO[Exit[E, A]]
  
  /** Interrupt the fiber with a specific fiber ID */
  def interruptAs(fiberId: FiberId)(implicit trace: Trace): UIO[Exit[E, A]]
  
  /** Interrupt the fiber in the background without waiting */
  def interruptFork(implicit trace: Trace): UIO[Unit]
  
  /** Check if the fiber has completed without blocking */
  def poll(implicit trace: Trace): UIO[Option[Exit[E, A]]]
}

/**
 * Runtime fiber with additional capabilities for introspection
 */
abstract class Fiber.Runtime[+E, +A] extends Fiber[E, A] {
  /** Get the fiber's unique identifier */
  def id: FiberId
  
  /** Get the current status of the fiber */
  def status(implicit trace: Trace): UIO[Fiber.Status]
  
  /** Get the fiber's execution trace */
  def trace(implicit trace: Trace): UIO[StackTrace]
  
  /** Get all child fibers spawned by this fiber */
  def children(implicit trace: Trace): UIO[Chunk[Fiber.Runtime[_, _]]]
}

Usage Examples:

import zio._

// Fork a computation into a fiber
val fiberProgram = for {
  fiber <- heavyComputation.fork
  _     <- quickTask
  result <- fiber.join  // Wait for completion
} yield result

// Race two fibers
val raceProgram = for {
  fiber1 <- task1.fork
  fiber2 <- task2.fork
  winner <- fiber1.race(fiber2)
  _      <- fiber1.interrupt
  _      <- fiber2.interrupt
} yield winner

// Interrupt a long-running fiber
val interruptProgram = for {
  fiber <- longRunningTask.fork
  _     <- ZIO.sleep(5.seconds)
  _     <- fiber.interrupt  // Cancel after 5 seconds
} yield ()

Fiber Composition

Combine and compose fibers using various operators for complex concurrent workflows.

/**
 * Transform the success value of a fiber
 */
def map[B](f: A => B): Fiber.Synthetic[E, B]

/**
 * Transform the success value using a ZIO effect
 */
def mapZIO[E1 >: E, B](f: A => IO[E1, B]): Fiber.Synthetic[E1, B]

/**
 * Replace success value with a constant
 */
def as[B](b: => B): Fiber.Synthetic[E, B]

/**
 * Discard the success value
 */
def unit: Fiber.Synthetic[E, Unit]

/**
 * Combine two fibers, returning both results
 */
def zip[E1 >: E, B](that: => Fiber[E1, B]): Fiber.Synthetic[E1, (A, B)]

/**
 * Combine two fibers with a custom function
 */
def zipWith[E1 >: E, B, C](that: => Fiber[E1, B])(f: (A, B) => C): Fiber.Synthetic[E1, C]

/**
 * Use this fiber if it succeeds, otherwise use the fallback
 */
def orElse[E1, A1 >: A](that: => Fiber[E1, A1]): Fiber.Synthetic[E1, A1]

/**
 * Race this fiber against another, returning the first to complete
 */
def race[E1 >: E, A1 >: A](that: => Fiber[E1, A1]): Fiber.Synthetic[E1, A1]

Usage Examples:

// Combine fiber results
val combined = for {
  fiber1 <- fetchUser(id).fork
  fiber2 <- fetchPreferences(id).fork  
  result <- fiber1.zip(fiber2)
  (user, prefs) = result
} yield UserWithPrefs(user, prefs)

// Transform fiber result
val processedFiber = dataFiber.map(_.processData)

// Race with timeout
val timedFiber = dataFiber.race(
  ZIO.sleep(30.seconds) *> ZIO.fail("Timeout")
)

Fiber Factory Methods

Create fibers from values, effects, and external sources.

/**
 * Create a fiber that has already completed with the given Exit
 */
def done[E, A](exit: => Exit[E, A]): Fiber.Synthetic[E, A]

/**
 * Create a fiber that succeeds with the given value
 */
def succeed[A](a: A): Fiber.Synthetic[Nothing, A]

/**
 * Create a fiber that fails with the given error
 */
def fail[E](e: E): Fiber.Synthetic[E, Nothing]

/**
 * Create a fiber that fails with the given cause
 */
def failCause[E](cause: Cause[E]): Fiber.Synthetic[E, Nothing]

/**
 * Create a fiber from a Scala Future
 */
def fromFuture[A](thunk: => Future[A]): Fiber.Synthetic[Throwable, A]

/**
 * Create a fiber from a ZIO effect (effect runs immediately)
 */
def fromZIO[E, A](io: IO[E, A]): UIO[Fiber.Synthetic[E, A]]

/**
 * Collect results from multiple fibers
 */
def collectAll[E, A](fibers: Iterable[Fiber[E, A]]): Fiber.Synthetic[E, Chunk[A]]

/**
 * Wait for all fibers to complete, ignoring results
 */
def awaitAll(fs: Iterable[Fiber[Any, Any]]): UIO[Unit]

/**
 * Join all fibers, failing if any fail
 */
def joinAll[E](fs: Iterable[Fiber[E, Any]]): IO[E, Unit]

/**
 * Interrupt all fibers
 */
def interruptAll(fs: Iterable[Fiber[Any, Any]]): UIO[Unit]

Usage Examples:

// Create pre-completed fibers
val successFiber = Fiber.succeed(42)
val failureFiber = Fiber.fail("Error occurred")

// Work with multiple fibers
val batchProcessing = for {
  fibers <- ZIO.foreach(dataChunks)(chunk => processChunk(chunk).fork)
  results <- Fiber.collectAll(fibers).join
} yield results

// Cleanup multiple fibers
val cleanup = for {
  _ <- Fiber.interruptAll(runningFibers)
  _ <- Console.printLine("All background tasks cancelled")
} yield ()

Ref - Atomic Reference

Thread-safe mutable reference that can be updated atomically across concurrent fibers.

/**
 * A thread-safe mutable reference that can be updated atomically
 */
sealed abstract class Ref[A] {
  /** Read the current value */
  def get: UIO[A]
  
  /** Set a new value */
  def set(a: A): UIO[Unit]
  
  /** Set a new value asynchronously */
  def setAsync(a: A): UIO[Unit]
  
  /** Atomically modify the value and return a result */
  def modify[B](f: A => (B, A)): UIO[B]
  
  /** Atomically update the value */
  def update(f: A => A): UIO[Unit]
  
  /** Update and return the new value */
  def updateAndGet(f: A => A): UIO[A]
  
  /** Return the old value and update */
  def getAndUpdate(f: A => A): UIO[A]
  
  /** Set new value and return the old value */
  def getAndSet(a: A): UIO[A]
}

/**
 * Synchronized Ref that allows ZIO effects in update functions
 */
sealed abstract class Ref.Synchronized[A] extends Ref[A] {
  /** Modify using a ZIO effect */
  def modifyZIO[R, E, B](f: A => ZIO[R, E, (B, A)]): ZIO[R, E, B]
  
  /** Update using a ZIO effect */
  def updateZIO[R, E](f: A => ZIO[R, E, A]): ZIO[R, E, Unit]
  
  /** Update with ZIO and return new value */
  def updateAndGetZIO[R, E](f: A => ZIO[R, E, A]): ZIO[R, E, A]
  
  /** Get old value and update with ZIO */
  def getAndUpdateZIO[R, E](f: A => ZIO[R, E, A]): ZIO[R, E, A]
}

Usage Examples:

// Counter with atomic operations
val counterProgram = for {
  counter <- Ref.make(0)
  _       <- ZIO.foreachParDiscard(1 to 1000) { _ =>
               counter.update(_ + 1)
             }
  final   <- counter.get
  _       <- Console.printLine(s"Final count: $final")
} yield ()

// Accumulator pattern
val accumulatorProgram = for {
  acc <- Ref.make(List.empty[String])
  _   <- ZIO.foreachParDiscard(tasks) { task =>
           for {
             result <- processTask(task)
             _      <- acc.update(result :: _)
           } yield ()
         }
  results <- acc.get
} yield results.reverse

// Complex state update with Synchronized Ref
val complexUpdate = for {
  state <- Ref.Synchronized.make(AppState.empty)
  _     <- state.updateZIO { currentState =>
             for {
               validated <- validateState(currentState)
               updated   <- applyChanges(validated)
               _         <- logStateChange(currentState, updated)
             } yield updated
           }
} yield ()

Queue - Concurrent Queue

Thread-safe queue for communication between concurrent fibers with backpressure support.

/**
 * A thread-safe queue for concurrent communication between fibers
 */
sealed abstract class Queue[A] extends Dequeue[A] with Enqueue[A] {
  /** Offer an element, returning false if queue is full */
  def offer(a: A): UIO[Boolean]
  
  /** Offer multiple elements, returning elements that couldn't be enqueued */
  def offerAll[A1 <: A](as: Iterable[A1]): UIO[Chunk[A1]]
  
  /** Take an element, blocking if queue is empty */
  def take: UIO[A]
  
  /** Take all available elements */
  def takeAll: UIO[Chunk[A]]
  
  /** Take up to max elements */
  def takeUpTo(max: Int): UIO[Chunk[A]]
  
  /** Try to take an element without blocking */
  def poll: UIO[Option[A]]
  
  /** Get the queue capacity */
  def capacity: Int
  
  /** Get current size */
  def size: UIO[Int]
  
  /** Check if empty */
  def isEmpty: UIO[Boolean]
  
  /** Check if full */
  def isFull: UIO[Boolean]
  
  /** Shutdown the queue */
  def shutdown: UIO[Unit]
  
  /** Wait for queue shutdown */
  def awaitShutdown: UIO[Unit]
  
  /** Check if queue is shutdown */
  def isShutdown: UIO[Boolean]
}

Usage Examples:

// Producer-consumer pattern
val producerConsumer = for {
  queue    <- Queue.bounded[String](100)
  producer <- ZIO.foreach(1 to 1000) { i =>
                queue.offer(s"item-$i")
              }.fork  
  consumer <- ZIO.repeatN(999) {
                queue.take.flatMap(item => processItem(item))
              }.fork
  _        <- producer.join
  _        <- consumer.join
} yield ()

// Work distribution
val workDistribution = for {
  workQueue <- Queue.bounded[Task](50)
  workers   <- ZIO.foreach(1 to 10) { workerId =>
                 ZIO.forever {
                   workQueue.take.flatMap(task => task.perform())
                 }.fork
               }
  _         <- ZIO.foreach(tasks)(workQueue.offer)
  _         <- ZIO.foreachDiscard(workers)(_.interrupt)
} yield ()

// Buffered processing
val bufferedProcessing = for {
  buffer <- Queue.sliding[Data](1000)  // Drops old items when full
  _      <- dataStream.foreach(buffer.offer(_)).fork
  _      <- ZIO.forever {
             buffer.takeAll.flatMap { batch =>
               ZIO.when(batch.nonEmpty)(processBatch(batch))
             }.delay(1.second)
           }
} yield ()

Queue Variants and Factory Methods

Different queue implementations for various use cases and performance characteristics.

/**
 * Create a bounded queue that blocks when full
 */
def bounded[A](requestedCapacity: => Int): UIO[Queue[A]]

/**
 * Create an unbounded queue (limited only by memory)
 */
def unbounded[A]: UIO[Queue[A]]

/**
 * Create a dropping queue that drops new items when full
 */
def dropping[A](requestedCapacity: => Int): UIO[Queue[A]]

/**
 * Create a sliding queue that drops old items when full
 */
def sliding[A](requestedCapacity: => Int): UIO[Queue[A]]

/**
 * Create a back-pressured queue with async boundaries
 */
def backpressured[A](requestedCapacity: => Int): UIO[Queue[A]]

/**
 * Create a single-element queue (like a synchronous channel)
 */  
def single[A]: UIO[Queue[A]]

Usage Examples:

// Different queue types for different needs
val queueTypes = for {
  // High-throughput with memory bounds
  bounded   <- Queue.bounded[Event](10000)
  
  // No memory limits, but may cause OOM
  unbounded <- Queue.unbounded[LogEntry]
  
  // Latest data only, drops old items
  sliding   <- Queue.sliding[SensorReading](100)
  
  // Drops new items when full
  dropping  <- Queue.dropping[NonCriticalUpdate](50)
  
  // Synchronous handoff between fibers
  sync      <- Queue.single[CriticalMessage]
} yield (bounded, unbounded, sliding, dropping, sync)

// Fan-out pattern with multiple queues
val fanOut = for {
  input     <- Queue.unbounded[Data]
  outputs   <- ZIO.foreach(1 to 5)(_ => Queue.bounded[Data](100))
  
  // Distribute input to all outputs
  distributor <- ZIO.forever {
                   input.take.flatMap { data =>
                     ZIO.foreachDiscard(outputs)(_.offer(data))
                   }
                 }.fork
                 
  // Process from each output queue
  processors <- ZIO.foreach(outputs.zipWithIndex) { case (queue, id) =>
                  ZIO.forever {
                    queue.take.flatMap(data => processWithId(data, id))
                  }.fork
                }
} yield (distributor, processors)

Hub - Broadcast Communication

Concurrent hub for broadcasting messages to multiple subscribers with various consumption patterns.

/**
 * A concurrent hub for broadcasting messages to multiple subscribers
 */
sealed abstract class Hub[A] {
  /** Publish a message to all subscribers */
  def publish(a: A): UIO[Boolean]
  
  /** Publish multiple messages */  
  def publishAll(as: Iterable[A]): UIO[Boolean]
  
  /** Subscribe to the hub, getting a queue for messages */
  def subscribe: UIO[Dequeue[A]]
  
  /** Get the hub capacity */
  def capacity: Int
  
  /** Get current size */
  def size: UIO[Int]
  
  /** Check if hub is shutdown */
  def isShutdown: UIO[Boolean]
  
  /** Shutdown the hub */
  def shutdown: UIO[Unit]
}

object Hub {
  /** Create a bounded hub */
  def bounded[A](requestedCapacity: Int): UIO[Hub[A]]
  
  /** Create an unbounded hub */
  def unbounded[A]: UIO[Hub[A]]
  
  /** Create a dropping hub */
  def dropping[A](requestedCapacity: Int): UIO[Hub[A]]
  
  /** Create a sliding hub */
  def sliding[A](requestedCapacity: Int): UIO[Hub[A]]
}

Usage Examples:

// Event broadcasting system
val eventSystem = for {
  eventHub <- Hub.bounded[Event](1000)
  
  // Multiple subscribers
  uiQueue      <- eventHub.subscribe
  loggingQueue <- eventHub.subscribe  
  analyticsQueue <- eventHub.subscribe
  
  // Publishers
  _ <- eventStream.foreach(eventHub.publish).fork
  
  // Subscribers
  _ <- uiQueue.take.foreach(updateUI).forever.fork
  _ <- loggingQueue.take.foreach(logEvent).forever.fork
  _ <- analyticsQueue.take.foreach(trackEvent).forever.fork
  
} yield ()

// Real-time data distribution
val dataDistribution = for {
  dataHub <- Hub.sliding[SensorData](500)  // Keep latest data
  
  // Data producer
  _ <- sensorStream.foreach(dataHub.publish).fork
  
  // Multiple consumers with different processing
  dashboardQueue <- dataHub.subscribe
  alertQueue     <- dataHub.subscribe
  storageQueue   <- dataHub.subscribe
  
  _ <- dashboardQueue.take.foreach(updateDashboard).forever.fork
  _ <- alertQueue.take.foreach(checkAlerts).forever.fork  
  _ <- storageQueue.takeAll.foreach(batchStore).repeat(Schedule.fixed(10.seconds)).fork
  
} yield ()

Promise - Single Assignment Variables

Promises represent single-assignment variables that can be completed exactly once, useful for coordination between fibers.

/**
 * A Promise is a concurrent primitive that represents a value that may not yet be available
 */
sealed trait Promise[+E, +A] {
  /** Complete the promise with a success value */
  def succeed(a: A): UIO[Boolean]
  
  /** Complete the promise with a failure */
  def fail(e: E): UIO[Boolean]
  
  /** Complete the promise with a ZIO effect */
  def complete[E1 >: E, A1 >: A](zio: IO[E1, A1]): UIO[Boolean]
  
  /** Complete the promise with an Exit value */  
  def done[E1 >: E, A1 >: A](exit: Exit[E1, A1]): UIO[Boolean]
  
  /** Wait for the promise to be completed */
  def await: IO[E, A]
  
  /** Check if the promise is completed without blocking */
  def poll: UIO[Option[IO[E, A]]]
  
  /** Interrupt any fibers waiting on this promise */
  def interrupt: UIO[Boolean]
  
  /** Check if the promise has been completed */
  def isDone: UIO[Boolean]
}

object Promise {
  /** Create a new promise */
  def make[E, A]: UIO[Promise[E, A]]
  
  /** Create a promise and complete it with an effect */
  def fromZIO[R, E, A](zio: ZIO[R, E, A]): ZIO[R, Nothing, Promise[E, A]]
}

Usage Examples:

// Coordination between fibers
val coordination = for {
  promise <- Promise.make[String, Int]
  
  // Producer fiber
  producer <- ZIO.sleep(2.seconds) *> promise.succeed(42).fork
  
  // Consumer fiber waits for result
  consumer <- promise.await.fork
  
  result <- consumer.join
  _      <- producer.join
} yield result

// Error propagation
val errorHandling = for {
  promise <- Promise.make[String, Int]
  worker  <- (ZIO.sleep(1.second) *> ZIO.fail("computation failed"))
               .tapError(promise.fail)
               .fork
  result  <- promise.await.either  // Will receive Left("computation failed")
  _       <- worker.interrupt
} yield result

Semaphore - Resource Limiting

Semaphores provide controlled access to a limited number of resources with automatic blocking and releasing.

/**
 * A Semaphore is a concurrency primitive that maintains a set of permits
 */
sealed trait Semaphore {
  /** Acquire a permit, blocking if none available */
  def acquire: UIO[Unit]
  
  /** Acquire n permits */
  def acquireN(n: Long): UIO[Unit]
  
  /** Release a permit */
  def release: UIO[Unit]
  
  /** Release n permits */
  def releaseN(n: Long): UIO[Unit]
  
  /** Try to acquire a permit without blocking */
  def tryAcquire: UIO[Boolean]
  
  /** Try to acquire n permits without blocking */
  def tryAcquireN(n: Long): UIO[Boolean]
  
  /** Get available permits */
  def available: UIO[Long]
  
  /** Use a permit for the duration of an effect */
  def withPermit[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A]
  
  /** Use n permits for an effect */
  def withPermits[R, E, A](n: Long)(zio: ZIO[R, E, A]): ZIO[R, E, A]
}

object Semaphore {
  /** Create a semaphore with n permits */
  def make(permits: Long): UIO[Semaphore]
}

Usage Examples:

// Connection pool management
val connectionPool = for {
  semaphore <- Semaphore.make(10)  // Max 10 concurrent connections
  
  // Use connection with automatic permit management
  result <- semaphore.withPermit {
    for {
      conn   <- openConnection()
      result <- conn.query("SELECT * FROM users")
      _      <- conn.close()
    } yield result
  }
} yield result

// Rate limiting
val rateLimiter = for {
  permits <- Semaphore.make(100)  // 100 requests per batch
  
  // Process requests with rate limiting
  _ <- ZIO.foreach(requests) { request =>
    permits.withPermit(processRequest(request))
  }
  
  // Replenish permits periodically
  _ <- permits.releaseN(100).repeat(Schedule.fixed(1.second))
} yield ()

MVar - Mutable Variable

MVars are mutable variables that can be empty or contain exactly one value, useful for communication patterns.

/**
 * An MVar is a mutable variable that is either empty or contains exactly one value
 */  
sealed trait MVar[A] {
  /** Put a value, blocking if already full */
  def put(a: A): UIO[Unit]
  
  /** Take the value, blocking if empty */
  def take: UIO[A]
  
  /** Try to put without blocking */
  def tryPut(a: A): UIO[Boolean]
  
  /** Try to take without blocking */
  def tryTake: UIO[Option[A]]
  
  /** Read the value without removing it, blocking if empty */
  def read: UIO[A]
  
  /** Try to read without blocking */
  def tryRead: UIO[Option[A]]
  
  /** Check if the MVar is empty */
  def isEmpty: UIO[Boolean]
  
  /** Swap the current value with a new one */
  def swap(a: A): UIO[A]
  
  /** Modify the value using a function */
  def modify[B](f: A => (B, A)): UIO[B]
}

object MVar {
  /** Create an empty MVar */
  def empty[A]: UIO[MVar[A]]
  
  /** Create an MVar with initial value */
  def make[A](a: A): UIO[MVar[A]]
}

Usage Examples:

// Producer-consumer with backpressure
val producerConsumer = for {
  mvar     <- MVar.empty[String]
  
  producer <- ZIO.foreach(1 to 100) { i =>
                mvar.put(s"item-$i")  // Blocks if consumer is slow
              }.fork
              
  consumer <- ZIO.forever {
                mvar.take.flatMap(processItem)
              }.fork
              
  _        <- ZIO.sleep(10.seconds)
  _        <- producer.interrupt
  _        <- consumer.interrupt
} yield ()

// Shared state with synchronization
val sharedCounter = for {
  counter <- MVar.make(0)
  
  // Multiple workers incrementing counter
  workers <- ZIO.foreach(1 to 10) { _ =>
               ZIO.forever {
                 counter.modify(n => ((), n + 1)) *>
                 ZIO.sleep(100.millis)
               }.fork
             }
             
  _       <- ZIO.sleep(5.seconds)
  final   <- counter.read
  _       <- ZIO.foreachDiscard(workers)(_.interrupt)
} yield final

Additional Concurrent Collections

Extended concurrent data structures from the concurrent module for specialized use cases.

/**
 * Thread-safe concurrent map
 */
trait ConcurrentMap[K, V] {
  def get(key: K): UIO[Option[V]]
  def put(key: K, value: V): UIO[Option[V]]
  def putIfAbsent(key: K, value: V): UIO[Option[V]]
  def remove(key: K): UIO[Option[V]]
  def replace(key: K, value: V): UIO[Option[V]]
  def size: UIO[Int]
  def isEmpty: UIO[Boolean]
}

/**
 * Thread-safe concurrent set
 */
trait ConcurrentSet[A] {
  def add(a: A): UIO[Boolean]
  def remove(a: A): UIO[Boolean]
  def contains(a: A): UIO[Boolean]
  def size: UIO[Int]
  def toSet: UIO[Set[A]]
}

/**
 * Reentrant lock for mutual exclusion
 */
trait ReentrantLock {
  def lock: UIO[Unit]
  def unlock: UIO[Unit]
  def tryLock: UIO[Boolean]
  def withLock[R, E, A](zio: ZIO[R, E, A]): ZIO[R, E, A]
  def isLocked: UIO[Boolean]
}

/**
 * Countdown latch for coordination
 */
trait CountdownLatch {
  def countDown: UIO[Unit]
  def await: UIO[Unit]
  def getCount: UIO[Long]
}

/**
 * Cyclic barrier for multi-phase coordination
 */
trait CyclicBarrier {
  def await: UIO[Int]
  def getParties: UIO[Int]
  def getNumberWaiting: UIO[Int]
  def isBroken: UIO[Boolean]
  def reset: UIO[Unit]
}

Usage Examples:

// Concurrent map for caching
val cacheExample = for {
  cache <- ConcurrentMap.empty[String, User]
  
  user <- cache.get("user-123").flatMap {
    case Some(user) => ZIO.succeed(user)
    case None       => for {
      user <- fetchUserFromDb("user-123")
      _    <- cache.put("user-123", user)
    } yield user
  }
} yield user

// Coordination with countdown latch
val coordinatedStart = for {
  latch   <- CountdownLatch.make(3)  // Wait for 3 workers
  
  workers <- ZIO.foreach(1 to 3) { workerId =>
               (initializeWorker(workerId) *> latch.countDown).fork
             }
             
  _       <- latch.await  // Wait for all workers to initialize
  _       <- Console.printLine("All workers ready, starting main task")
  
  result  <- mainTask
  _       <- ZIO.foreachDiscard(workers)(_.interrupt)
} yield result