or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

concurrency.mdcore-io.mdindex.mdresources.mdruntime.mdstd.mdtime.md
tile.json

concurrency.mddocs/

Concurrency and Fibers

Fiber-based concurrency with structured concurrency patterns. Cats Effect provides lightweight green threads called fibers that enable safe, composable concurrent programming with automatic resource management and cancellation.

Capabilities

Fiber Operations

Core fiber spawning, joining, and lifecycle management.

/**
 * Start IO as a fiber running concurrently
 * @returns IO[Fiber[IO, Throwable, A]] - Handle to the running fiber
 */
def start: IO[Fiber[IO, Throwable, A]]

/**
 * Start IO in background with automatic cleanup
 * @returns Resource[IO, IO[Outcome[IO, Throwable, A]]] - Resource managing fiber lifecycle
 */
def background: Resource[IO, IO[Outcome[IO, Throwable, A]]]

/**
 * Cancel the fiber
 * @returns IO[Unit] that completes when cancellation is processed
 */
def cancel: IO[Unit] // On Fiber

/**
 * Wait for fiber completion
 * @returns IO[Outcome[IO, Throwable, A]] with fiber result
 */
def join: IO[Outcome[IO, Throwable, A]] // On Fiber

/**
 * Join fiber, treating cancellation as never completing
 * @returns IO[A] that waits for successful completion
 */
def joinWithNever: IO[A] // On Fiber

/**
 * Join fiber with custom cancellation handling
 * @param onCancel - IO to run if fiber was canceled
 * @returns IO[A] with custom cancellation behavior
 */
def joinWith[AA >: A](onCancel: IO[AA]): IO[AA] // On Fiber

Racing and Parallel Execution

Utilities for racing computations and parallel execution patterns.

/**
 * Race two IOs, returning the winner
 * @param other - IO to race against
 * @returns IO[Either[A, B]] with winner indicated by Left/Right
 */
def race[B](other: IO[B]): IO[Either[A, B]]

/**
 * Race two IOs, getting full outcome information
 * @param other - IO to race against  
 * @returns IO with winner outcome and loser fiber
 */
def racePair[B](other: IO[B]): IO[Either[
  (Outcome[IO, Throwable, A], Fiber[IO, Throwable, B]),
  (Fiber[IO, Throwable, A], Outcome[IO, Throwable, B])
]]

/**
 * Run two IOs in parallel, collecting both results
 * @param other - IO to run in parallel
 * @returns IO[(A, B)] with both results
 */
def both[B](other: IO[B]): IO[(A, B)]

/**
 * Run IOs in parallel using Parallel instance
 * @param ios - IOs to run in parallel
 * @returns IO with sequence of results
 */
def IO.parSequenceN[A](n: Int)(ios: List[IO[A]]): IO[List[A]]

/**
 * Traverse in parallel with limited concurrency
 * @param n - Maximum concurrent operations
 * @param f - Function to apply to each element
 * @returns IO with sequence of results
 */
def IO.parTraverseN[A, B](n: Int)(as: List[A])(f: A => IO[B]): IO[List[B]]

/**
 * Replicate IO in parallel with limited concurrency
 * @param n - Maximum concurrent operations
 * @param replicas - Number of replicas to create
 * @param ioa - IO to replicate
 * @returns IO with sequence of replica results
 */
def IO.parReplicateAN[A](n: Int)(replicas: Int, ioa: IO[A]): IO[List[A]]

Concurrent References

Thread-safe mutable references for coordination between fibers.

/**
 * Create a concurrent mutable reference
 * @param initial - Initial value
 * @returns IO[Ref[IO, A]] containing the reference
 */
def Ref.of[A](initial: A): IO[Ref[IO, A]]

/**
 * Read current value atomically
 * @returns IO[A] with current value
 */
def get: IO[A] // On Ref

/**
 * Update value atomically
 * @param a - New value
 * @returns IO[Unit] completing when update is done
 */
def set(a: A): IO[Unit] // On Ref

/**
 * Atomic compare-and-set operation
 * @param expected - Expected current value
 * @param updated - New value to set
 * @returns IO[Boolean] indicating if update succeeded
 */
def compareAndSet(expected: A, updated: A): IO[Boolean] // On Ref

/**
 * Atomic modify with return value
 * @param f - Function to compute new value and result
 * @returns IO[B] with the computed result
 */
def modify[B](f: A => (A, B)): IO[B] // On Ref

/**
 * Atomic update
 * @param f - Update function
 * @returns IO[Unit] completing when update is done
 */
def update(f: A => A): IO[Unit] // On Ref

/**
 * Non-blocking update attempt
 * @param f - Update function
 * @returns IO[Boolean] indicating if update succeeded
 */
def tryUpdate(f: A => A): IO[Boolean] // On Ref

/**
 * Atomic access with separate getter/setter
 * @returns IO[(A, A => IO[Boolean])] with value and setter
 */
def access: IO[(A, A => IO[Boolean])] // On Ref

Deferred - One-Shot Communication

Single-use communication primitive for fiber coordination.

/**
 * Create an empty Deferred
 * @returns IO[Deferred[IO, A]] - Empty deferred value
 */
def Deferred[A]: IO[Deferred[IO, A]]

/**
 * Wait for completion (blocks until value available)
 * @returns IO[A] with the completed value
 */
def get: IO[A] // On Deferred

/**
 * Check for completion without blocking
 * @returns IO[Option[A]] - Some(value) if complete, None if still waiting
 */
def tryGet: IO[Option[A]] // On Deferred

/**
 * Complete the deferred with a value
 * @param a - Value to complete with
 * @returns IO[Unit] completing when value is set
 */
def complete(a: A): IO[Unit] // On Deferred

/**
 * Try to complete the deferred (non-blocking)
 * @param a - Value to complete with
 * @returns IO[Boolean] indicating if completion succeeded
 */
def tryComplete(a: A): IO[Boolean] // On Deferred

Memoization

Cache computation results for repeated access.

/**
 * Memoize an IO computation
 * @returns IO[IO[A]] - Memoized version that caches result
 */
def memoize: IO[IO[A]]

Execution Context Management

Control which execution context runs specific operations.

/**
 * Run IO on specific execution context
 * @param ec - ExecutionContext to use
 * @returns IO that runs on the specified context
 */
def evalOn(ec: ExecutionContext): IO[A]

/**
 * Get current execution context
 * @returns IO[ExecutionContext] with current context
 */
def IO.executionContext: IO[ExecutionContext]

/**
 * Interruptible blocking operation
 * @param thunk - Blocking computation
 * @returns IO that can be interrupted
 */
def IO.interruptible[A](thunk: => A): IO[A]

/**
 * Interruptible with many interruption points
 * @param thunk - Blocking computation with interruption checks
 * @returns IO that checks for interruption frequently
 */
def IO.interruptibleMany[A](thunk: => A): IO[A]

Usage Examples:

import cats.effect._
import cats.syntax.parallel._
import scala.concurrent.duration._

// Basic fiber usage
val fiberProgram = for {
  fiber <- IO.delay {
    Thread.sleep(1000)
    "Fiber result"
  }.start
  
  // Do other work while fiber runs
  _ <- IO.println("Doing other work...")
  
  // Wait for fiber completion
  result <- fiber.joinWithNever
  _ <- IO.println(s"Fiber completed with: $result")
} yield result

// Racing operations
val raceProgram = {
  val fast = IO.sleep(100.millis).as("Fast")
  val slow = IO.sleep(1.second).as("Slow")
  
  fast.race(slow).map {
    case Left(result) => s"Fast won: $result"
    case Right(result) => s"Slow won: $result"
  }
}

// Concurrent coordination with Ref
val coordinated = for {
  counter <- Ref.of[IO, Int](0)
  
  // Start multiple fibers updating counter
  fibers <- List.fill(10)(
    counter.update(_ + 1)
  ).parSequence
  
  finalCount <- counter.get
  _ <- IO.println(s"Final count: $finalCount")
} yield finalCount

// Producer-consumer with Deferred
val producerConsumer = for {
  deferred <- Deferred[IO, String]
  
  // Producer fiber
  producer <- (IO.sleep(1.second) >> 
               deferred.complete("Produced value")).start
  
  // Consumer fiber  
  consumer <- deferred.get.map(value => 
                s"Consumed: $value").start
                
  result <- consumer.joinWithNever
  _ <- producer.join
} yield result