Standard implementations of common concurrent data structures and system integration components. The cats-effect-std module provides production-ready implementations of synchronization primitives, queues, and system interfaces built on the Cats Effect typeclasses.
Thread-safe FIFO queues with various capacity and overflow strategies.
/**
 * Thread-safe FIFO queue. Every operation is returned as an effect in `F`.
 */
abstract class Queue[F[_], A] {
/** Adds `a` to the queue, blocking while the queue is full. */
def offer(a: A): F[Unit]
/** Attempts to add `a` without blocking. */
def tryOffer(a: A): F[Boolean]
/** Removes the next element, blocking while the queue is empty. */
def take: F[A]
/** Attempts to remove the next element without blocking. */
def tryTake: F[Option[A]]
/** Current number of elements in the queue. */
def size: F[Int]
}
/**
 * Creates a bounded queue with the specified capacity.
 *
 * Signature listing: `Queue.bounded` denotes a factory method named `bounded`
 * on `Queue`, not literal Scala syntax. The same applies to the factories below.
 * @param capacity maximum queue size
 * @return a queue with bounded capacity
 */
def Queue.bounded[F[_]: Concurrent, A](capacity: Int): F[Queue[F, A]]
/**
 * Creates an unbounded queue.
 * @return a queue with unlimited capacity
 */
def Queue.unbounded[F[_]: Concurrent, A]: F[Queue[F, A]]
/**
 * Creates a circular-buffer queue (overwrites the oldest element when full).
 * @param capacity buffer size
 * @return a queue that overwrites on overflow
 */
def Queue.circularBuffer[F[_]: Concurrent, A](capacity: Int): F[Queue[F, A]]
/**
 * Creates a dropping queue (drops new elements when full).
 * @param capacity maximum size before dropping
 * @return a queue that drops on overflow
 */
def Queue.dropping[F[_]: Concurrent, A](capacity: Int): F[Queue[F, A]]
/**
 * Creates a sliding queue (drops the oldest element when full).
 * @param capacity maximum size before sliding
 * @return a queue that slides its window on overflow
 */
def Queue.sliding[F[_]: Concurrent, A](capacity: Int): F[Queue[F, A]]
/**
 * Creates a synchronous queue (size 0, requires coordination between
 * the offering and the taking side).
 * @return a queue for hand-off coordination
 */
def Queue.synchronous[F[_]: Concurrent, A]: F[Queue[F, A]]

Queues that order elements by priority rather than insertion order.
/**
 * Priority queue: elements are removed by priority rather than insertion order.
 */
abstract class PQueue[F[_], A] {
/** Adds `a`, placing it according to the priority ordering. */
def offer(a: A): F[Unit]
/** Attempts to add `a` without blocking. */
def tryOffer(a: A): F[Boolean]
/** Removes the highest-priority element. */
def take: F[A]
/** Attempts to remove the highest-priority element without blocking. */
def tryTake: F[Option[A]]
/** Current number of elements in the queue. */
def size: F[Int]
}
/**
 * Creates a bounded priority queue.
 * @param capacity maximum queue size
 * @param ord ordering used for priority (higher priority = lower `Order` value)
 * @return a bounded queue with priority ordering
 */
def PQueue.bounded[F[_]: Concurrent, A](capacity: Int)(implicit ord: Order[A]): F[PQueue[F, A]]
/**
 * Creates an unbounded priority queue.
 * @param ord ordering used for priority
 * @return a priority queue with unlimited capacity
 */
def PQueue.unbounded[F[_]: Concurrent, A](implicit ord: Order[A]): F[PQueue[F, A]]

Queues supporting insertion and removal from both ends.
/**
 * Double-ended queue (deque) supporting operations at both ends.
 * It extends `Queue`, so the plain `offer`/`take` family is available as well.
 */
trait Dequeue[F[_], A] extends Queue[F, A] {
/** Adds `a` at the back of the queue. */
def offerBack(a: A): F[Unit]
/** Adds `a` at the front of the queue. */
def offerFront(a: A): F[Unit]
/** Attempts to add `a` at the back without blocking. */
def tryOfferBack(a: A): F[Boolean]
/** Attempts to add `a` at the front without blocking. */
def tryOfferFront(a: A): F[Boolean]
/** Removes the element at the back. */
def takeBack: F[A]
/** Removes the element at the front. */
def takeFront: F[A]
/** Attempts to remove the element at the back without blocking. */
def tryTakeBack: F[Option[A]]
/** Attempts to remove the element at the front without blocking. */
def tryTakeFront: F[Option[A]]
}
/**
 * Creates a bounded dequeue.
 * @param capacity maximum dequeue size
 * @return a dequeue with bounded capacity
 */
def Dequeue.bounded[F[_]: Concurrent, A](capacity: Int): F[Dequeue[F, A]]
/**
 * Creates an unbounded dequeue.
 * @return a dequeue with unlimited capacity
 */
def Dequeue.unbounded[F[_]: Concurrent, A]: F[Dequeue[F, A]]

Thread synchronization and coordination primitives.
/**
 * Counting semaphore for controlling access to resources.
 *
 * Signatures follow the Cats Effect 3 `cats.effect.std.Semaphore` API. Scoped
 * acquisition is exposed as the `permit` resource; the `withPermit` /
 * `withPermits` methods belong to Cats Effect 2 and are not available here.
 */
abstract class Semaphore[F[_]] {
  /** Number of permits currently available. */
  def available: F[Long]
  /** Acquires a single permit, waiting until one is available. */
  def acquire: F[Unit]
  /** Acquires `n` permits, waiting until all of them are available. */
  def acquireN(n: Long): F[Unit]
  /** Attempts to acquire a single permit without blocking. */
  def tryAcquire: F[Boolean]
  /** Attempts to acquire `n` permits without blocking. */
  def tryAcquireN(n: Long): F[Boolean]
  /** Releases a single permit. */
  def release: F[Unit]
  /** Releases `n` permits. */
  def releaseN(n: Long): F[Unit]
  /**
   * Resource that acquires one permit when used and releases it when the
   * resource is finalized. Run an effect under a permit with
   * `permit.use(_ => fa)`.
   *
   * To hold `n` permits around an effect, build the resource explicitly:
   * `Resource.make(acquireN(n))(_ => releaseN(n)).use(_ => fa)`.
   */
  def permit: Resource[F, Unit]
}
/**
 * Creates a semaphore with an initial number of permits.
 * @param n initial number of permits
 * @return a semaphore holding `n` permits
 */
def Semaphore[F[_]: Concurrent](n: Long): F[Semaphore[F]]
/**
 * Mutual exclusion lock.
 *
 * Signatures follow the Cats Effect 3 `cats.effect.std.Mutex` API: the lock is
 * exposed only as a `Resource`, so it cannot be acquired without a matching
 * release. There are no separate `tryLock` / `unlock` operations.
 */
abstract class Mutex[F[_]] {
  /**
   * Resource that acquires the exclusive lock when used and releases it when
   * the resource is finalized. Protect a critical section with
   * `mutex.lock.surround(fa)` or `mutex.lock.use(_ => fa)`.
   */
  def lock: Resource[F, Unit]
}
/**
 * Creates a mutex (initially unlocked).
 * @return a mutex ready for use
 */
def Mutex[F[_]: Concurrent]: F[Mutex[F]]
/**
 * Countdown latch for one-time coordination.
 *
 * NOTE(review): the upstream Cats Effect 3 `CountDownLatch` appears to expose
 * only `release` and `await`, and its constructor takes an `Int`. Confirm
 * `tryRelease`, `count` and the `Long` parameter against the targeted version.
 */
abstract class CountDownLatch[F[_]] {
/** Waits until the count reaches zero. */
def await: F[Unit]
/** Decrements the count by one. */
def release: F[Unit]
/** Attempts to decrement the count without blocking. */
def tryRelease: F[Boolean]
/** Current count value. */
def count: F[Long]
}
/**
 * Creates a countdown latch.
 * @param n initial count value
 * @return a latch starting at count `n`
 */
def CountDownLatch[F[_]: Concurrent](n: Long): F[CountDownLatch[F]]
/**
 * Cyclic barrier for coordinating multiple fibers.
 */
abstract class CyclicBarrier[F[_]] {
/** Waits at the barrier until all parties have arrived. */
def await: F[Unit]
}
/**
 * Creates a cyclic barrier.
 * @param n number of parties that must call `await`
 * @return a barrier for `n` parties
 */
def CyclicBarrier[F[_]: Concurrent](n: Int): F[CyclicBarrier[F]]

Thread-safe atomic operations and data structures.
/**
 * Thread-safe atomic cell holding a single value of type `A`.
 *
 * NOTE(review): `tryModify` / `tryUpdate` mirror the `Ref` API. Confirm they
 * exist on `AtomicCell` in the targeted Cats Effect version; upstream appears
 * to list effectful variants such as `evalModify` / `evalUpdate` instead.
 */
trait AtomicCell[F[_], A] {
/** Reads the current value. */
def get: F[A]
/** Replaces the current value with `a`. */
def set(a: A): F[Unit]
/** Atomically replaces the value with `a`; the result type is `F[A]`. */
def getAndSet(a: A): F[A]
/** Atomically applies `f`, storing the first component and returning the second. */
def modify[B](f: A => (A, B)): F[B]
/** Attempts `modify` without blocking. */
def tryModify[B](f: A => (A, B)): F[Option[B]]
/** Atomically replaces the value with `f` applied to it. */
def update(f: A => A): F[Unit]
/** Attempts `update` without blocking. */
def tryUpdate(f: A => A): F[Boolean]
}
/**
 * Creates an atomic cell.
 *
 * Signature listing: denotes the call form `AtomicCell[F].of(initial)`.
 * @param initial initial value
 * @return an atomic cell holding `initial`
 */
def AtomicCell[F[_]: Concurrent].of[A](initial: A): F[AtomicCell[F, A]]
/**
 * Thread-safe concurrent map that exposes each key's value as a `Ref`.
 *
 * NOTE(review): confirm that `keys` is part of `MapRef` in the targeted
 * Cats Effect version; upstream appears to define only `apply`.
 */
trait MapRef[F[_], K, V] {
/** Returns the `Ref` for key `k`; reads and updates for that key go through it. */
def apply(k: K): Ref[F, V]
/** Lists all keys. */
def keys: F[List[K]]
}
/**
 * Creates a concurrent map. Values have type `Option[V]` in the returned
 * `MapRef`. The factory takes no initial key-value pairs.
 * @param shardCount number of shards — presumably the number of underlying
 *                   maps the keys are distributed over; confirm upstream
 * @return a `MapRef` whose values are `Option[V]`
 */
def MapRef.ofShardedImmutableMap[F[_]: Concurrent, K, V](
shardCount: Int
): F[MapRef[F, K, Option[V]]]
/**
 * Backpressure utility for flow control.
 *
 * Signatures follow the Cats Effect 3 `cats.effect.std.Backpressure` API.
 */
trait Backpressure[F[_]] {
  /**
   * Runs `f` subject to the configured backpressure strategy.
   * @param f the effect to run under backpressure
   * @return `Some(result)` if `f` was run, `None` if it was dropped
   *         (only possible with the `Lossy` strategy)
   */
  def metered[A](f: F[A]): F[Option[A]]
}
/**
 * Creates a backpressure instance.
 * @param strategy `Backpressure.Strategy.Lossy` (drop effects once the bound is
 *                 reached) or `Backpressure.Strategy.Lossless` (wait for capacity)
 * @param bound maximum number of effects allowed to run concurrently
 * @return a `Backpressure` enforcing the given strategy and bound
 */
def Backpressure[F[_]: Temporal](strategy: Backpressure.Strategy, bound: Int): F[Backpressure[F]]

Advanced resource management and supervision patterns.
/**
 * Fiber supervisor for managing fiber lifecycles.
 */
trait Supervisor[F[_]] {
/** Starts `fa` as a supervised fiber that is cleaned up automatically. */
def supervise[A](fa: F[A]): F[Fiber[F, Throwable, A]]
}
/**
 * Creates a supervisor as a resource.
 * @return a resource whose supervisor manages all fibers started through it
 */
def Supervisor[F[_]: Concurrent]: Resource[F, Supervisor[F]]
/**
 * Hot-swappable resource manager.
 *
 * Signatures follow the Cats Effect 3 `cats.effect.std.Hotswap` API.
 */
trait Hotswap[F[_], R] {
  /** Allocates `next`, makes it the current resource and finalizes the previous one. */
  def swap(next: Resource[F, R]): F[R]
  /** Finalizes the current resource, if any, leaving the hotswap empty. */
  def clear: F[Unit]
  /**
   * Accesses the current resource, if any. The result is itself a `Resource`,
   * so the value is only valid inside `get.use { ... }`.
   */
  def get: Resource[F, Option[R]]
}
/**
 * Creates a hotswap pre-populated with an initial resource.
 * @param initial resource to allocate first
 * @return the hotswap paired with the initially allocated value; note the
 *         tuple order is `(Hotswap[F, R], R)`
 */
def Hotswap[F[_]: Concurrent, R](initial: Resource[F, R]): Resource[F, (Hotswap[F, R], R)]
/**
 * Creates an empty hotswap with no initial resource.
 * @return a hotswap that holds nothing until the first `swap`
 */
def Hotswap.create[F[_]: Concurrent, R]: Resource[F, Hotswap[F, R]]
/**
 * Effect-to-callback dispatcher for integration with callback-based APIs.
 * Every method here leaves `F` and is therefore marked unsafe.
 */
trait Dispatcher[F[_]] {
/** Runs `fa` and exposes its result as a `Future` (unsafe). */
def unsafeToFuture[A](fa: F[A]): Future[A]
/** Runs `fa` and exposes its result as a `CompletableFuture` (unsafe). */
def unsafeToCompletableFuture[A](fa: F[A]): CompletableFuture[A]
/** Runs `fa` and discards the result (fire-and-forget). */
def unsafeRunAndForget[A](fa: F[A]): Unit
/** Runs `fa` synchronously (unsafe, blocking). */
def unsafeRunSync[A](fa: F[A]): A
}
/**
 * Creates a sequential dispatcher (preserves order).
 * @return a dispatcher resource for ordered execution
 */
def Dispatcher.sequential[F[_]: Async]: Resource[F, Dispatcher[F]]
/**
 * Creates a parallel dispatcher (maximum throughput).
 * @return a dispatcher resource for concurrent execution
 */
def Dispatcher.parallel[F[_]: Async]: Resource[F, Dispatcher[F]]

Integration with system resources and environment.
/**
 * Console I/O operations.
 *
 * NOTE(review): the output methods are listed with an `Any` parameter.
 * Upstream appears to declare them generically with a `Show` instance;
 * confirm against the targeted version.
 */
trait Console[F[_]] {
/** Prints to stdout without a trailing newline. */
def print(line: Any): F[Unit]
/** Prints to stdout followed by a newline. */
def println(line: Any): F[Unit]
/** Prints to stderr without a trailing newline. */
def error(line: Any): F[Unit]
/** Prints to stderr followed by a newline. */
def errorln(line: Any): F[Unit]
/** Reads a line from stdin. */
def readLine: F[String]
}
/**
 * Access to system properties.
 *
 * NOTE(review): confirm `getOrElse` and `entries` against the targeted
 * Cats Effect version; upstream appears to offer `get`, `set` and `clear`.
 */
trait SystemProperties[F[_]] {
/** Looks up the system property `name`; the result is optional. */
def get(name: String): F[Option[String]]
/** Looks up the system property `name`, with `default` as the fallback. */
def getOrElse(name: String, default: String): F[String]
/** All system properties as a map. */
def entries: F[Map[String, String]]
}
/**
 * Access to environment variables.
 *
 * NOTE(review): confirm `getOrElse` and the `Map` return type of `entries`
 * against the targeted Cats Effect version; upstream appears to offer `get`
 * and an `entries` returning an iterable of pairs.
 */
trait Env[F[_]] {
/** Looks up the environment variable `name`; the result is optional. */
def get(name: String): F[Option[String]]
/** Looks up the environment variable `name`, with `default` as the fallback. */
def getOrElse(name: String, default: String): F[String]
/** All environment variables as a map. */
def entries: F[Map[String, String]]
}

Secure and standard random number generation.
/**
 * Random number generation. Every generator is an effect in `F`, so each
 * evaluation can produce a fresh value.
 */
trait Random[F[_]] {
/** Random integer. */
def nextInt: F[Int]
/** Random integer bounded by `n`. */
def nextIntBounded(n: Int): F[Int]
/** Random long. */
def nextLong: F[Long]
/** Random float in [0.0, 1.0). */
def nextFloat: F[Float]
/** Random double in [0.0, 1.0). */
def nextDouble: F[Double]
/** Random boolean. */
def nextBoolean: F[Boolean]
/** Random byte array; `n` is presumably the array length. */
def nextBytes(n: Int): F[Array[Byte]]
/** Random string of the given `length`. */
def nextString(length: Int): F[String]
/** Random printable character. */
def nextPrintableChar: F[Char]
/** Shuffles the vector `v`. */
def shuffleVector[A](v: Vector[A]): F[Vector[A]]
/** Shuffles the list `l`. */
def shuffleList[A](l: List[A]): F[List[A]]
}
/**
 * Cryptographically secure random generator. It adds no members of its own;
 * the whole interface is inherited from `Random`.
 */
trait SecureRandom[F[_]] extends Random[F]
/**
 * UUID generation.
 */
trait UUIDGen[F[_]] {
/** Generates a random UUID. */
def randomUUID: F[UUID]
}

Usage Examples:
import cats.effect._
import cats.effect.std._
import cats.syntax.all._
import scala.concurrent.duration._
// Producer-consumer with Queue: two fibers communicate through a bounded queue.
val producerConsumer = Queue.bounded[IO, String](10).flatMap { queue =>
  val itemIds = (1 to 100).toList

  // Producer: offers one item, then pauses 10 ms, for each id.
  val produceAll = itemIds.traverse { i =>
    queue.offer(s"Item $i").flatMap(_ => IO.sleep(10.millis))
  }

  // Consumer: takes exactly as many items as the producer offers and prints each.
  val consumeAll = itemIds.traverse { _ =>
    queue.take.flatMap(item => IO.println(s"Consumed: $item"))
  }

  for {
    producer <- produceAll.start
    consumer <- consumeAll.start
    // Wait for both fibers before finishing.
    _ <- producer.join
    _ <- consumer.join
  } yield ()
}
// Limiting concurrency with Semaphore
val rateLimited = for {
  semaphore <- Semaphore[IO](3) // Max 3 concurrent operations
  operations = (1 to 10).toList.map { i =>
    // Cats Effect 3 has no `withPermit`; `permit` is a Resource that acquires
    // one permit before the body runs and releases it once the body finishes.
    semaphore.permit.use { _ =>
      IO.println(s"Starting operation $i") >>
        IO.sleep(1.second) >>
        IO.println(s"Completed operation $i")
    }
  }
  // All ten operations are started in parallel; the semaphore lets three proceed at a time.
  _ <- operations.parSequence
} yield ()
// Supervised fibers: anything started through the supervisor is tied to its scope.
val supervised = Supervisor[IO].use { supervisor =>
  val longTask = IO.sleep(10.seconds).as("Long task")
  val shortTask = IO.sleep(1.second).as("Short task")

  // Start the long task first; its fiber handle is not needed afterwards.
  supervisor.supervise(longTask) >>
    supervisor.supervise(shortTask).flatMap { fiber2 =>
      // Wait for the short task only, print its result and return it.
      fiber2.joinWithNever.flatTap(result => IO.println(s"Got result: $result"))
    }
} // leaving `use` cancels the long task, which is still running at this point
// Hot-swappable resource
val hotswappable = {
  val resource1 = Resource.eval(IO.println("Resource 1 acquired")).as("R1")
  val resource2 = Resource.eval(IO.println("Resource 2 acquired")).as("R2")
  // `Hotswap.apply` (not `Hotswap.create`, which takes no arguments) allocates
  // the initial resource. The tuple order is (hotswap, initial value).
  Hotswap(resource1).use { case (hotswap, initial) =>
    for {
      _ <- IO.println(s"Using initial: $initial")
      // `swap` allocates the new resource and finalizes the previous one.
      r2 <- hotswap.swap(resource2)
      _ <- IO.println(s"Swapped to: $r2")
    } yield r2
  }
}
// System integration: environment lookup, console interaction and randomness.
val systemIntegration = {
  val console = Console[IO]

  for {
    // Environment: HOME may be absent, so fall back to a placeholder label.
    home <- Env[IO].get("HOME")
    homeLabel = home.getOrElse("Not set")
    _ <- IO.println(s"Home directory: $homeLabel")
    // Console: prompt, read one line, greet.
    _ <- console.println("Enter your name:")
    name <- console.readLine
    _ <- console.println(s"Hello, $name!")
    // Randomness: a bounded integer and a UUID.
    random <- Random.scalaUtilRandom[IO]
    randomNum <- random.nextIntBounded(100)
    uuid <- UUIDGen.randomUUID[IO]
    _ <- IO.println(s"Random number: $randomNum, UUID: $uuid")
  } yield ()
}