Runtime configuration, execution contexts, application patterns, and unsafe operations for running IO computations. This covers the runtime system that powers Cats Effect and patterns for building applications.
The runtime system that executes IO computations with configurable thread pools and execution policies.
/**
* Runtime for executing IO computations
*/
final class IORuntime {
/** Execute IO synchronously, blocking until completion */
def unsafeRunSync[A](io: IO[A]): A
/** Execute IO asynchronously with callback */
def unsafeRunAsync[A](io: IO[A])(cb: Either[Throwable, A] => Unit): Unit
/** Execute IO with timeout */
def unsafeRunTimed[A](io: IO[A], timeout: FiniteDuration): Option[A]
/** Shutdown the runtime and all thread pools */
def shutdown(): Unit
/** Runtime configuration */
val config: IORuntimeConfig
/** Compute thread pool for CPU-bound work */
val compute: ExecutionContext
/** Blocking thread pool for blocking operations */
val blocking: ExecutionContext
/** Scheduler for time-based operations */
val scheduler: Scheduler
/** Shutdown hook */
val shutdown: () => Unit
/** Fiber monitoring system */
val fiberMonitor: FiberMonitor
/** Runtime performance metrics */
def metrics: Option[IORuntimeMetrics]
}
/**
* Global runtime instance (automatically initialized)
*/
def IORuntime.global: IORuntime
/**
* Create custom runtime builder
* @returns IORuntimeBuilder for custom configuration
*/
def IORuntime.builder(): IORuntimeBuilderConfiguration options for the IO runtime system.
/**
* Configuration for IORuntime behavior
*/
final case class IORuntimeConfig(
/** How often to check for cancellation */
cancelationCheckThreshold: Int,
/** Threshold for automatic yielding */
autoYieldThreshold: Int,
/** Enable enhanced exception stack traces */
enhancedExceptions: Boolean,
/** Size of tracing buffer */
traceBufferSize: Int,
/** Timeout for shutdown hooks */
shutdownHookTimeout: FiniteDuration,
/** Whether to report unhandled fiber errors */
reportUnhandledFiberErrors: Boolean,
/** CPU starvation detection settings */
cpuStarvationCheckInitialDelay: FiniteDuration,
cpuStarvationCheckInterval: FiniteDuration,
cpuStarvationCheckThreshold: Double
)
/**
* Default runtime configuration
*/
def IORuntimeConfig.default: IORuntimeConfig
/**
* Builder for custom runtime configuration
*/
sealed abstract class IORuntimeBuilder {
/** Set compute thread pool */
def setCompute(compute: ExecutionContext): IORuntimeBuilder
/** Set blocking thread pool */
def setBlocking(blocking: ExecutionContext): IORuntimeBuilder
/** Set scheduler */
def setScheduler(scheduler: Scheduler): IORuntimeBuilder
/** Set shutdown hook */
def setShutdownHook(shutdown: () => Unit): IORuntimeBuilder
/** Set runtime configuration */
def setConfig(config: IORuntimeConfig): IORuntimeBuilder
/** Set fiber monitor */
def setFiberMonitor(fiberMonitor: FiberMonitor): IORuntimeBuilder
/** Build the runtime */
def build(): (IORuntime, () => Unit)
}Standard patterns for building applications with Cats Effect.
/**
* Main application trait with managed runtime
*/
trait IOApp {
/** Application entry point */
def run(args: List[String]): IO[ExitCode]
/** Override to customize runtime */
def runtimeConfig: IORuntimeConfig = IORuntimeConfig.default
/** Standard main method (calls run) */
final def main(args: Array[String]): Unit
}
/**
* Simplified IOApp without command line arguments
*/
object IOApp {
trait Simple extends IOApp {
/** Simplified run method */
def run: IO[Unit]
/** Delegates to parameterless run */
final def run(args: List[String]): IO[ExitCode] =
run.as(ExitCode.Success)
}
}
/**
* Application with resource management
*/
trait ResourceApp {
/** Application logic with managed resources */
def run(args: List[String]): Resource[IO, ExitCode]
/** Override to customize runtime */
def runtimeConfig: IORuntimeConfig = IORuntimeConfig.default
}
/**
* Simplified ResourceApp
*/
object ResourceApp {
trait Simple extends ResourceApp {
/** Simplified run with resources */
def run: Resource[IO, Unit]
/** Delegates to parameterless run */
final def run(args: List[String]): Resource[IO, ExitCode] =
run.as(ExitCode.Success)
}
}Process exit codes for application termination.
/**
* Process exit codes
*/
abstract class ExitCode {
/** Integer exit code value */
def code: Int
}
object ExitCode {
/** Successful exit (code 0) */
case object Success extends ExitCode { val code = 0 }
/** Error exit (code 1) */
case object Error extends ExitCode { val code = 1 }
/** Create exit code from integer */
def apply(code: Int): ExitCode
}Direct execution methods that bypass the safe IO abstraction.
/**
* Execute IO synchronously (unsafe - can block)
* @param io - IO to execute
* @param runtime - Runtime for execution
* @returns Result value
* @throws Any exception from the IO computation
*/
def unsafeRunSync[A](io: IO[A])(implicit runtime: IORuntime): A
/**
* Convert IO to Future (unsafe - loses cancellation)
* @param io - IO to convert
* @param runtime - Runtime for execution
* @returns Future with result
*/
def unsafeToFuture[A](io: IO[A])(implicit runtime: IORuntime): Future[A]
/**
* Execute IO asynchronously with callback (unsafe)
* @param io - IO to execute
* @param cb - Callback for completion
* @param runtime - Runtime for execution
*/
def unsafeRunAsync[A](io: IO[A])(cb: Either[Throwable, A] => Unit)(implicit runtime: IORuntime): Unit
/**
* Execute with timeout (unsafe - can block)
* @param io - IO to execute
* @param timeout - Maximum execution time
* @param runtime - Runtime for execution
* @returns Some(result) if completed, None if timed out
*/
def unsafeRunTimed[A](io: IO[A], timeout: FiniteDuration)(implicit runtime: IORuntime): Option[A]
/**
* Fire-and-forget execution (unsafe - no error handling)
* @param io - IO to execute
* @param runtime - Runtime for execution
*/
def unsafeRunAndForget[A](io: IO[A])(implicit runtime: IORuntime): UnitLow-level scheduler interface for time-based operations.
/**
* Scheduler for time-based operations
*/
trait Scheduler {
/** Schedule delayed execution */
def sleep(delay: FiniteDuration, task: Runnable): Runnable
/** Current time in milliseconds */
def nowMillis(): Long
/** Monotonic time in nanoseconds */
def monotonicNanos(): Long
}
/**
* Default scheduler implementations
*/
object Scheduler {
/** Create single-threaded scheduler */
def createDefaultScheduler(threadName: String = "cats-effect-scheduler"): (Scheduler, () => Unit)
}Performance monitoring and metrics collection.
/**
* Runtime performance metrics
*/
trait IORuntimeMetrics {
/** Worker thread pool metrics */
def workerThreadCount(): Int
def workerThreadActiveCount(): Int
def workerThreadIdleCount(): Int
/** Task queue metrics */
def globalQueueTotalCount(): Long
def localQueueTotalCount(): Long
/** CPU starvation detection */
def cpuStarvationCount(): Long
def cpuStarvationWarnThreshold(): FiniteDuration
}
/**
* CPU starvation monitoring
*/
trait CpuStarvationWarningMetrics {
/** Check if CPU starvation detection is enabled */
def isEnabled(): Boolean
/** Current starvation check interval */
def checkInterval(): FiniteDuration
/** Threshold for starvation warnings */
def starvationThreshold(): FiniteDuration
}Fiber-local variables for contextual information.
/**
* Fiber-local storage for contextual data
*/
sealed trait IOLocal[A] {
/** Get current value */
def get: IO[A]
/** Set value in current scope */
def set(value: A): IO[Unit]
/** Reset to default value in current scope */
def reset: IO[Unit]
/** Update value in current scope */
def update(f: A => A): IO[Unit]
/** Modify value in current scope */
def modify[B](f: A => (A, B)): IO[B]
/** Get value with default fallback */
def getOrElse(default: A): IO[A]
/** Execute IO with scoped value */
def scope(value: A): IO ~> IO
}
/**
* Create fiber-local variable
* @param default - Default value for new fibers
* @returns IO[IOLocal[A]] with default value
*/
def IOLocal[A](default: A): IO[IOLocal[A]]Usage Examples:
import cats.effect._
import scala.concurrent.duration._
// Basic IOApp
object MyApp extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
for {
_ <- IO.println(s"Hello! Args: ${args.mkString(" ")}")
_ <- IO.println("Press enter to continue...")
_ <- IO.readLine
_ <- IO.println("Goodbye!")
} yield ExitCode.Success
}
}
// ResourceApp with database connection
object DatabaseApp extends ResourceApp {
case class Database(url: String) {
def close(): IO[Unit] = IO.println(s"Closing database: $url")
}
val dbResource = Resource.make(
IO.println("Opening database") >> IO.pure(Database("jdbc:h2:mem:test"))
)(db => db.close())
def run(args: List[String]): Resource[IO, ExitCode] = {
dbResource.evalMap { db =>
for {
_ <- IO.println(s"Using database: ${db.url}")
_ <- IO.println("Running database operations...")
_ <- IO.sleep(1.second)
_ <- IO.println("Database operations complete")
} yield ExitCode.Success
}
}
}
// Custom runtime configuration
object CustomRuntimeApp extends IOApp {
override def runtimeConfig: IORuntimeConfig =
IORuntimeConfig.default.copy(
enhancedExceptions = true,
cpuStarvationCheckInterval = 1.second
)
def run(args: List[String]): IO[ExitCode] = {
IO.println("Running with custom runtime config").as(ExitCode.Success)
}
}
// Using IOLocal for request tracing
val tracingExample = for {
requestId <- IOLocal[String]("unknown")
// Set request ID for this request
_ <- requestId.set("req-12345")
// Simulate request processing
_ <- processRequest.flatMap { result =>
requestId.get.flatMap { id =>
IO.println(s"Request $id completed with result: $result")
}
}
} yield ()
def processRequest: IO[String] = {
// The request ID is available in any IO within this fiber
IO.pure("success")
}
// Manual runtime usage (advanced)
val manualRuntime = {
val (runtime, shutdown) = IORuntime.builder()
.setConfig(IORuntimeConfig.default.copy(enhancedExceptions = true))
.build()
try {
val result = runtime.unsafeRunSync(IO.println("Hello from custom runtime!"))
result
} finally {
shutdown()
}
}
// Integration with Future-based code
def integrateWithFutures(implicit runtime: IORuntime): Future[String] = {
val ioComputation = for {
_ <- IO.println("Starting IO computation")
_ <- IO.sleep(1.second)
result <- IO.pure("IO Result")
_ <- IO.println("IO computation complete")
} yield result
// Convert to Future for integration with existing code
ioComputation.unsafeToFuture()
}
// Dispatcher for callback integration
val callbackIntegration = Dispatcher.parallel[IO].use { dispatcher =>
IO.async[String] { callback =>
IO.delay {
// Register callback with external library
someCallbackBasedLibrary.doAsync { result =>
// Use dispatcher to run IO from callback
dispatcher.unsafeRunAndForget(
IO.println(s"Got callback result: $result") >>
IO.delay(callback(Right(result)))
)
}
// Return cancellation token
Some(IO.delay(someCallbackBasedLibrary.cancel()))
}
}
}