or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

concurrency.md, core-io.md, index.md, resources.md, runtime.md, std.md, time.md
tile.json

docs/std.md

Standard Library

Standard implementations of common concurrent data structures and system integration components. The cats-effect-std module provides production-ready implementations of synchronization primitives, queues, and system interfaces built on the Cats Effect typeclasses.

Capabilities

Queues

Thread-safe FIFO queues with various capacity and overflow strategies.

/**
 * Thread-safe FIFO queue.
 *
 * `offer` and `take` wait when the queue is full or empty respectively; the
 * `try*` variants never wait and report the outcome in their result instead.
 *
 * @tparam F effect type in which every operation is suspended
 * @tparam A element type
 */
abstract class Queue[F[_], A] {
  /** Add element to queue, blocking if full */
  def offer(a: A): F[Unit]
  /** Try to add element without blocking (presumably `true` when the element was accepted — confirm) */
  def tryOffer(a: A): F[Boolean]  
  /** Remove element from queue, blocking if empty */
  def take: F[A]
  /** Try to remove element without blocking (presumably `None` when the queue is empty — confirm) */
  def tryTake: F[Option[A]]
  /** Current queue size */
  def size: F[Int]
}

/**
 * Create bounded queue with specified capacity.
 *
 * Listed in `Companion.method` form; call it as `Queue.bounded[F, A](capacity)`.
 *
 * @param capacity Maximum queue size
 * @return F[Queue[F, A]] with bounded capacity
 */
def Queue.bounded[F[_]: Concurrent, A](capacity: Int): F[Queue[F, A]]

/**
 * Create unbounded queue
 * @return F[Queue[F, A]] with unlimited capacity
 */
def Queue.unbounded[F[_]: Concurrent, A]: F[Queue[F, A]]

/**
 * Create circular buffer queue (overwrites oldest when full)
 * @param capacity Buffer size
 * @return F[Queue[F, A]] that overwrites on overflow
 */
def Queue.circularBuffer[F[_]: Concurrent, A](capacity: Int): F[Queue[F, A]]

/**
 * Create dropping queue (drops new elements when full)
 * @param capacity Maximum size before dropping
 * @return F[Queue[F, A]] that drops on overflow
 */
def Queue.dropping[F[_]: Concurrent, A](capacity: Int): F[Queue[F, A]]

/**
 * Create sliding queue (drops oldest when full)
 *
 * NOTE(review): as described this is the same overflow policy as
 * `Queue.circularBuffer`; state how the two differ, if they do — TODO confirm.
 *
 * @param capacity Maximum size before sliding
 * @return F[Queue[F, A]] that slides window on overflow
 */
def Queue.sliding[F[_]: Concurrent, A](capacity: Int): F[Queue[F, A]]

/**
 * Create synchronous queue (size 0, requires coordination)
 * @return F[Queue[F, A]] for hand-off coordination
 */
def Queue.synchronous[F[_]: Concurrent, A]: F[Queue[F, A]]

Priority Queues

Queues that order elements by priority rather than insertion order.

/**
 * Priority queue that orders elements by priority.
 *
 * Exposes the same operations as `Queue`; only the element returned by
 * `take`/`tryTake` differs (highest priority rather than oldest).
 *
 * @tparam F effect type in which every operation is suspended
 * @tparam A element type
 */
abstract class PQueue[F[_], A] {
  /** Add element with priority ordering */
  def offer(a: A): F[Unit]
  /** Try to add element without blocking */
  def tryOffer(a: A): F[Boolean]
  /** Remove highest priority element */
  def take: F[A]
  /** Try to remove highest priority without blocking */
  def tryTake: F[Option[A]]
  /** Current queue size */
  def size: F[Int]
}

/**
 * Create bounded priority queue
 * @param capacity Maximum queue size
 * @param ord Ordering for priority (higher priority = lower Ordering value)
 * @return F[PQueue[F, A]] with priority ordering
 */
def PQueue.bounded[F[_]: Concurrent, A](capacity: Int)(implicit ord: Order[A]): F[PQueue[F, A]]

/**
 * Create unbounded priority queue
 * @param ord Ordering for priority
 * @return F[PQueue[F, A]] with unlimited capacity
 */
def PQueue.unbounded[F[_]: Concurrent, A](implicit ord: Order[A]): F[PQueue[F, A]]

Double-Ended Queues

Queues supporting insertion and removal from both ends.

/**
 * Double-ended queue (deque) supporting operations at both ends.
 *
 * Extends `Queue`, so `offer`/`take` and their `try*` variants are available
 * in addition to the explicit front/back operations below.
 *
 * @tparam F effect type in which every operation is suspended
 * @tparam A element type
 */
trait Dequeue[F[_], A] extends Queue[F, A] {
  /** Add element to back of queue */
  def offerBack(a: A): F[Unit]
  /** Add element to front of queue */
  def offerFront(a: A): F[Unit]
  /** Try to add to back without blocking */
  def tryOfferBack(a: A): F[Boolean]
  /** Try to add to front without blocking */
  def tryOfferFront(a: A): F[Boolean]
  /** Remove element from back */
  def takeBack: F[A]
  /** Remove element from front */
  def takeFront: F[A]  
  /** Try to remove from back without blocking */
  def tryTakeBack: F[Option[A]]
  /** Try to remove from front without blocking */
  def tryTakeFront: F[Option[A]]
}

/**
 * Create bounded dequeue
 * @param capacity Maximum dequeue size
 * @return F[Dequeue[F, A]] with bounded capacity
 */
def Dequeue.bounded[F[_]: Concurrent, A](capacity: Int): F[Dequeue[F, A]]

/**
 * Create unbounded dequeue
 * @return F[Dequeue[F, A]] with unlimited capacity
 */
def Dequeue.unbounded[F[_]: Concurrent, A]: F[Dequeue[F, A]]

Synchronization Primitives

Thread synchronization and coordination primitives.

/**
 * Counting semaphore for controlling access to resources.
 *
 * NOTE(review): `withPermit` / `withPermits` look like the cats-effect 2 API.
 * In cats-effect 3 the bracketed form is believed to be
 * `permit: Resource[F, Unit]` — confirm against the documented library version.
 *
 * @tparam F effect type in which every operation is suspended
 */
abstract class Semaphore[F[_]] {
  /** Current available permits */
  def available: F[Long]
  /** Acquire single permit */
  def acquire: F[Unit]
  /** Acquire n permits */
  def acquireN(n: Long): F[Unit]
  /** Try to acquire permit without blocking */
  def tryAcquire: F[Boolean]
  /** Try to acquire n permits without blocking */
  def tryAcquireN(n: Long): F[Boolean]
  /** Release single permit */
  def release: F[Unit]
  /** Release n permits */
  def releaseN(n: Long): F[Unit]
  /** Use permit around operation */
  def withPermit[A](fa: F[A]): F[A]
  /** Use n permits around operation */
  def withPermits[A](n: Long)(fa: F[A]): F[A]
}

/**
 * Create semaphore with initial permits.
 *
 * Call it as `Semaphore[F](n)`.
 *
 * @param n Initial number of permits
 * @return F[Semaphore[F]] with n permits
 */
def Semaphore[F[_]: Concurrent](n: Long): F[Semaphore[F]]

/**
 * Mutual exclusion lock (binary semaphore).
 *
 * NOTE(review): in cats-effect 3.5+ `Mutex` is believed to expose only
 * `lock: Resource[F, Unit]`, with no separate `tryLock`/`unlock` — confirm
 * this listing against the library before relying on it.
 *
 * @tparam F effect type in which every operation is suspended
 */
abstract class Mutex[F[_]] {
  /** Acquire exclusive lock */
  def lock: F[Unit]
  /** Try to acquire lock without blocking */
  def tryLock: F[Boolean]
  /** Release lock */
  def unlock: F[Unit]
}

/**
 * Create mutex (initially unlocked)
 * @return F[Mutex[F]] ready for use
 */
def Mutex[F[_]: Concurrent]: F[Mutex[F]]

/**
 * Countdown latch for one-time coordination.
 *
 * NOTE(review): the cats-effect 3 `CountDownLatch` is believed to expose only
 * `release` and `await`; `tryRelease` and `count` should be confirmed
 * against the library.
 *
 * @tparam F effect type in which every operation is suspended
 */
abstract class CountDownLatch[F[_]] {
  /** Wait for countdown to reach zero */
  def await: F[Unit]
  /** Decrement counter by one */
  def release: F[Unit]
  /** Try to decrement without blocking */
  def tryRelease: F[Boolean]
  /** Current count value */
  def count: F[Long]
}

/**
 * Create countdown latch
 *
 * NOTE(review): the library factory is believed to take `n: Int` — confirm.
 *
 * @param n Initial count value
 * @return F[CountDownLatch[F]] with initial count
 */
def CountDownLatch[F[_]: Concurrent](n: Long): F[CountDownLatch[F]]

/**
 * Cyclic barrier for coordinating multiple fibers.
 *
 * @tparam F effect type in which `await` is suspended
 */
abstract class CyclicBarrier[F[_]] {
  /** Wait at barrier until all parties arrive */
  def await: F[Unit]
}

/**
 * Create cyclic barrier
 * @param n Number of parties that must await
 * @return F[CyclicBarrier[F]] for n parties
 */
def CyclicBarrier[F[_]: Concurrent](n: Int): F[CyclicBarrier[F]]

Atomic Operations

Thread-safe atomic operations and data structures.

/**
 * Thread-safe atomic cell.
 *
 * NOTE(review): `tryModify` / `tryUpdate` are `Ref` operations; the
 * cats-effect 3 `AtomicCell` is believed to offer `evalModify` /
 * `evalUpdate` instead — confirm against the library.
 *
 * @tparam F effect type in which every operation is suspended
 * @tparam A type of the stored value
 */
trait AtomicCell[F[_], A] {
  /** Read current value */
  def get: F[A]
  /** Update value */
  def set(a: A): F[Unit]
  /** Atomic get-and-set; the result type `F[A]` suggests it yields the previous value — confirm */
  def getAndSet(a: A): F[A]
  /** Atomic modify with return value: `f` maps the current value to (new value, result) */
  def modify[B](f: A => (A, B)): F[B]
  /** Try to modify without blocking */
  def tryModify[B](f: A => (A, B)): F[Option[B]]
  /** Atomic update */
  def update(f: A => A): F[Unit]
  /** Try to update without blocking */
  def tryUpdate(f: A => A): F[Boolean]
}

/**
 * Create atomic cell.
 *
 * The signature below is shorthand, not valid Scala; the call form is
 * presumably `AtomicCell[F].of(initial)` — confirm.
 *
 * @param initial Initial value
 * @return F[AtomicCell[F, A]] with initial value
 */
def AtomicCell[F[_]: Concurrent].of[A](initial: A): F[AtomicCell[F, A]]

/**
 * Thread-safe concurrent map with atomic operations.
 *
 * @tparam F effect type
 * @tparam K key type
 * @tparam V value type held by each per-key `Ref`
 */
trait MapRef[F[_], K, V] {
  /** Get the `Ref` for a key; reads and writes of that key go through the returned `Ref` */
  def apply(k: K): Ref[F, V]
  /** Get all keys */
  def keys: F[List[K]]
}

/**
 * Create concurrent map backed by sharded immutable maps.
 *
 * The value type of the result is `Option[V]`, so each key's `Ref` holds an
 * `Option` (presumably `None` for an absent key — confirm).
 *
 * @param shardCount Number of shards (presumably the number of underlying maps
 *                   the keys are spread across — confirm)
 * @return F[MapRef[F, K, Option[V]]]
 */
def MapRef.ofShardedImmutableMap[F[_]: Concurrent, K, V](
  shardCount: Int
): F[MapRef[F, K, Option[V]]]

/**
 * Backpressure utilities for flow control.
 *
 * NOTE(review): `Meter[F]` is not defined anywhere in this listing. The
 * cats-effect 3 signature is believed to be
 * `metered[A](f: F[A]): F[Option[A]]` — confirm against the library.
 *
 * @tparam F effect type
 */
trait Backpressure[F[_]] {
  /** Create backpressure strategy */
  def metered[A](meter: Meter[F]): F[A] => F[A]
}

/**
 * Create backpressure instance
 *
 * NOTE(review): the library factory is believed to take a strategy and a
 * bound, `Backpressure[F](strategy, bound)` — confirm.
 *
 * @return F[Backpressure[F]] for flow control
 */
def Backpressure[F[_]: Temporal]: F[Backpressure[F]]

Resource Management

Advanced resource management and supervision patterns.

/**
 * Fiber supervisor for managing fiber lifecycles.
 *
 * @tparam F effect type
 */
trait Supervisor[F[_]] {
  /** Start supervised fiber that will be cleaned up automatically; returns the handle of the started fiber */
  def supervise[A](fa: F[A]): F[Fiber[F, Throwable, A]]
}

/**
 * Create supervisor resource
 * @return Resource[F, Supervisor[F]] that manages all supervised fibers
 */
def Supervisor[F[_]: Concurrent]: Resource[F, Supervisor[F]]

/**
 * Hot-swappable resource manager.
 *
 * NOTE(review): in cats-effect 3 `get` is believed to return
 * `Resource[F, Option[R]]` rather than `F[Option[R]]` — confirm.
 *
 * @tparam F effect type
 * @tparam R type of the managed resource value
 */
trait Hotswap[F[_], R] {
  /** Replace current resource; yields the value of the newly installed resource */
  def swap(next: Resource[F, R]): F[R]
  /** Clear current resource */
  def clear: F[Unit]
  /** Get current resource */
  def get: F[Option[R]]
}

/**
 * Create hotswap resource manager
 *
 * NOTE(review): in cats-effect 3, `Hotswap.create[F, R]` is believed to take
 * no arguments and return `Resource[F, Hotswap[F, R]]`. The form that takes an
 * initial resource is believed to be `Hotswap.apply(initial)`, returning
 * `Resource[F, (Hotswap[F, R], R)]` (hotswap first) — confirm.
 *
 * @param initial Initial resource
 * @return Resource[F, (R, Hotswap[F, R])] with initial resource and swapper
 */
def Hotswap.create[F[_]: Concurrent, R](initial: Resource[F, R]): Resource[F, (R, Hotswap[F, R])]

/**
 * Effect-to-callback dispatcher for integration with callback-based APIs.
 *
 * Every method leaves `F`: the result type is a plain value, `Unit` or a
 * future, not an effect. Hence the `unsafe` prefix.
 *
 * `Future` and `CompletableFuture` are presumably `scala.concurrent.Future`
 * and `java.util.concurrent.CompletableFuture` — confirm.
 *
 * @tparam F effect type being run
 */
trait Dispatcher[F[_]] {
  /** Convert F to Future (unsafe) */
  def unsafeToFuture[A](fa: F[A]): Future[A]
  /** Convert F to CompletableFuture (unsafe) */
  def unsafeToCompletableFuture[A](fa: F[A]): CompletableFuture[A]
  /** Run F and forget result (fire-and-forget) */
  def unsafeRunAndForget[A](fa: F[A]): Unit
  /** Run F synchronously (unsafe, blocking) */
  def unsafeRunSync[A](fa: F[A]): A
}

/**
 * Create sequential dispatcher (preserves order)
 * @return Resource[F, Dispatcher[F]] for ordered execution
 */
def Dispatcher.sequential[F[_]: Async]: Resource[F, Dispatcher[F]]

/**
 * Create parallel dispatcher (maximum throughput)
 * @return Resource[F, Dispatcher[F]] for concurrent execution
 */
def Dispatcher.parallel[F[_]: Async]: Resource[F, Dispatcher[F]]

System Integration

Integration with system resources and environment.

/**
 * Console I/O operations.
 *
 * @tparam F effect type in which each console operation is suspended
 */
trait Console[F[_]] {
  /** Print without newline */
  def print(line: Any): F[Unit]
  /** Print with newline */
  def println(line: Any): F[Unit]
  /** Print to stderr without newline */
  def error(line: Any): F[Unit]
  /** Print to stderr with newline */
  def errorln(line: Any): F[Unit]  
  /** Read line from stdin */
  def readLine: F[String]
}

/**
 * Access to system properties.
 *
 * @tparam F effect type in which each lookup is suspended
 */
trait SystemProperties[F[_]] {
  /** Get system property (the `Option` presumably is `None` when the property is unset — confirm) */
  def get(name: String): F[Option[String]]
  /** Get system property with default */
  def getOrElse(name: String, default: String): F[String]
  /** Get all system properties */
  def entries: F[Map[String, String]]
}

/**
 * Access to environment variables.
 *
 * @tparam F effect type in which each lookup is suspended
 */
trait Env[F[_]] {
  /** Get environment variable (the `Option` presumably is `None` when the variable is unset — confirm) */
  def get(name: String): F[Option[String]]
  /** Get environment variable with default */
  def getOrElse(name: String, default: String): F[String]
  /** Get all environment variables */
  def entries: F[Map[String, String]]
}

Random Number Generation

Secure and standard random number generation.

/**
 * Random number generation.
 *
 * @tparam F effect type in which each draw is suspended
 */
trait Random[F[_]] {
  /** Random integer */
  def nextInt: F[Int]
  /** Bounded random integer (presumably in [0, n) — confirm) */
  def nextIntBounded(n: Int): F[Int]
  /** Random long */
  def nextLong: F[Long]
  /** Random float [0.0, 1.0) */
  def nextFloat: F[Float]
  /** Random double [0.0, 1.0) */
  def nextDouble: F[Double]
  /** Random boolean */
  def nextBoolean: F[Boolean]
  /** Random byte array (presumably of length n — confirm) */
  def nextBytes(n: Int): F[Array[Byte]]
  /** Random string */
  def nextString(length: Int): F[String]
  /** Random printable character */
  def nextPrintableChar: F[Char]
  /** Shuffle vector */
  def shuffleVector[A](v: Vector[A]): F[Vector[A]]
  /** Shuffle list */
  def shuffleList[A](l: List[A]): F[List[A]]
}

/**
 * Cryptographically secure random.
 *
 * Adds no members of its own; it offers the `Random` operations under a
 * distinct type.
 */
trait SecureRandom[F[_]] extends Random[F]

/**
 * UUID generation.
 *
 * `UUID` is presumably `java.util.UUID` — confirm.
 *
 * @tparam F effect type in which generation is suspended
 */
trait UUIDGen[F[_]] {
  /** Generate random UUID */
  def randomUUID: F[UUID]
}

Usage Examples:

import cats.effect._
import cats.effect.std._
import cats.syntax.all._
import scala.concurrent.duration._

// Producer-consumer with Queue
val producerConsumer = {
  // Shared by both sides so the consumer always takes exactly what is produced.
  val itemCount = 100

  for {
    queue <- Queue.bounded[IO, String](10)

    // Producer fiber: offers one item every 10 ms. `traverse_` is used because
    // the per-item results are not needed.
    producer <- (1 to itemCount).toList.traverse_ { i =>
      queue.offer(s"Item $i") >> IO.sleep(10.millis)
    }.start

    // Consumer fiber: takes and prints one item per produced item.
    consumer <- (1 to itemCount).toList.traverse_ { _ =>
      queue.take.flatMap(item => IO.println(s"Consumed: $item"))
    }.start

    // Wait for both fibers to finish before completing.
    _ <- producer.join
    _ <- consumer.join
  } yield ()
}

// Limiting concurrency with Semaphore (bounds in-flight operations, not their rate)
val rateLimited = for {
  semaphore <- Semaphore[IO](3) // Max 3 concurrent operations

  // cats-effect 3 exposes the bracketed form as `permit`, a Resource that holds
  // one permit for as long as it is in use (cats-effect 2's `withPermit` is gone).
  operations = (1 to 10).toList.map { i =>
    semaphore.permit.surround {
      IO.println(s"Starting operation $i") >>
      IO.sleep(1.second) >>
      IO.println(s"Completed operation $i")
    }
  }

  // Run all ten in parallel; the results are Unit, so discard them.
  _ <- operations.parSequence_
} yield ()

// Supervised fibers
val supervised = Supervisor[IO].use { supervisor =>
  for {
    // Both fibers are owned by the supervisor and cleaned up with it.
    // The long task's handle is not needed, so it is not bound.
    _ <- supervisor.supervise(IO.sleep(10.seconds).as("Long task"))
    shortTask <- supervisor.supervise(IO.sleep(1.second).as("Short task"))

    // Wait only for the short task, printing its value as soon as it arrives.
    result <- shortTask.joinWithNever.flatTap(r => IO.println(s"Got result: $r"))
  } yield result
} // leaving `use` cancels the long task, which is still sleeping

// Hot-swappable resource
val hotswappable = {
  val resource1 = Resource.eval(IO.println("Resource 1 acquired")).as("R1")
  val resource2 = Resource.eval(IO.println("Resource 2 acquired")).as("R2")

  // In cats-effect 3 the constructor that takes an initial resource is
  // `Hotswap.apply`; it yields (hotswap, initialValue), hotswap first.
  // `Hotswap.create` takes no initial resource.
  Hotswap[IO, String](resource1).use { case (hotswap, initial) =>
    for {
      _ <- IO.println(s"Using initial: $initial")
      // Replace the current resource; the result is the newly acquired value.
      r2 <- hotswap.swap(resource2)
      _ <- IO.println(s"Swapped to: $r2")
    } yield r2
  }
}

// System integration
val systemIntegration = {
  // Summon the Console instance once instead of at every call site.
  val console = Console[IO]

  for {
    // Environment lookup: the result is an Option, so a fallback is supplied
    home <- Env[IO].get("HOME")
    _ <- IO.println(s"Home directory: ${home.getOrElse("Not set")}")

    // Prompt, read one line from stdin, then greet
    _ <- console.println("Enter your name:")
    name <- console.readLine
    _ <- console.println(s"Hello, $name!")

    // Random values: a bounded Int from a freshly created generator, plus a UUID
    rng <- Random.scalaUtilRandom[IO]
    randomNum <- rng.nextIntBounded(100)
    uuid <- UUIDGen.randomUUID[IO]
    _ <- IO.println(s"Random number: $randomNum, UUID: $uuid")
  } yield ()
}