ZIO is a zero-dependency Scala library for asynchronous and concurrent programming with a comprehensive effect system.
ZIO provides a comprehensive error model with rich failure causes and composable retry/repeat scheduling for building resilient applications that gracefully handle failures and recover automatically.
ZIO's Cause type provides a rich representation of failure modes including typed errors, defects, and interruptions.
/**
* Represents the cause of a ZIO effect failure with rich error information
*/
sealed abstract class Cause[+E] {
/** Extract all typed failures from this cause */
def failures: List[E]
/** Extract all defects (untyped failures) from this cause */
def defects: List[Throwable]
/** Extract all fiber IDs that caused interruption */
def interruptors: Set[FiberId]
/** Get the first failure if any */
def failureOption: Option[E]
/** Get the first defect if any */
def dieOption: Option[Throwable]
/** Get the first interruption cause if any */
def interruptOption: Option[FiberId]
/** Extract either the failure or the remaining cause */
def failureOrCause: Either[E, Cause[Nothing]]
}

Create and combine causes to represent complex failure scenarios.
/**
* Combine two causes in parallel (both happened simultaneously)
*/
def &&[E1 >: E](that: Cause[E1]): Cause[E1]
/**
* Combine two causes sequentially (second happened after first)
*/
def ++[E1 >: E](that: Cause[E1]): Cause[E1]
/**
* Transform the typed error part of the cause
*/
def map[E1](f: E => E1): Cause[E1]
/**
* FlatMap over the cause structure
*/
def flatMap[E2](f: E => Cause[E2]): Cause[E2]
/**
* Replace all failures with a constant error
*/
def as[E1](e: => E1): Cause[E1]
// Factory methods
object Cause {
/** Empty cause (no failure) */
val empty: Cause[Nothing]
/** Create a cause from a typed failure */
def fail[E](error: E, trace: StackTrace = StackTrace.none): Cause[E]
/** Create a cause from a defect (untyped failure) */
def die(defect: Throwable, trace: StackTrace = StackTrace.none): Cause[Nothing]
/** Create a cause from fiber interruption */
def interrupt(fiberId: FiberId, trace: StackTrace = StackTrace.none): Cause[Nothing]
}

Usage Examples:
import zio._
// Creating different types of causes
val typedFailure = Cause.fail("Invalid input")
val defectCause = Cause.die(new RuntimeException("Unexpected error"))
// FiberId.make allocates a fresh runtime id from a Trace and does not accept a numeric id;
// FiberId.apply(id, startTimeSeconds, location) builds an id from explicit parts instead.
val interruptCause = Cause.interrupt(FiberId(123, 0, Trace.empty))
// Combining causes
val combinedCause = Cause.fail("Error 1") && Cause.fail("Error 2")
val sequentialCause = Cause.fail("First error") ++ Cause.die(new Exception("Then this"))
// Analyzing cause structure
val analyzeFailure = (cause: Cause[String]) => {
val failures = cause.failures
val defects = cause.defects
val interruptions = cause.interruptors
s"Failures: ${failures.mkString(", ")}, Defects: ${defects.size}, Interruptions: ${interruptions.size}"
}

Query and filter causes to understand and handle specific failure types.
/**
* Check if the cause is empty (no failures)
*/
def isEmpty: Boolean
/**
* Check if cause contains typed failures
*/
def isFailure: Boolean
/**
* Check if cause contains defects
*/
def isDie: Boolean
/**
* Check if cause contains interruptions
*/
def isInterrupted: Boolean
/**
* Check if cause contains only interruptions
*/
def isInterruptedOnly: Boolean
/**
* Keep only defects, removing failures and interruptions
*/
def keepDefects: Option[Cause[Nothing]]
/**
* Remove all typed failures
*/
def stripFailures: Cause[Nothing]
/**
* Remove specific defects matching a partial function
*/
def stripSomeDefects(pf: PartialFunction[Throwable, Any]): Option[Cause[E]]
/**
* Filter the cause structure
*/
def filter(p: Cause[E] => Boolean): Cause[E]

Usage Examples:
// Cause analysis and filtering
// Picks a console message based on what the cause contains; the checks run in
// order: interruption-only, typed failure, defect, and finally the fallback.
val handleCause = (cause: Cause[AppError]) =>
  cause match {
    case c if c.isInterruptedOnly =>
      Console.printLine("Operation was cancelled")
    case c if c.isFailure =>
      Console.printLine(s"Business logic error: ${c.failures.head}")
    case c if c.isDie =>
      Console.printLineError(s"System error: ${c.defects.head.getMessage}")
    case _ =>
      Console.printLine("No errors occurred")
  }
// Selective error handling
val recoverableErrors = cause.stripSomeDefects {
case _: TimeoutException => () // Remove timeout defects
case _: IOException => () // Remove IO defects
}

Tools for debugging and presenting cause information to users and developers.
/**
* Get the stack trace associated with this cause
*/
def trace: StackTrace
/**
* Get all stack traces in the cause
*/
def traces: List[StackTrace]
/**
* Add a stack trace to the cause
*/
def traced(trace: StackTrace): Cause[E]
/**
* Remove stack traces from the cause
*/
def untraced: Cause[E]
/**
* Convert cause to a human-readable string
*/
def prettyPrint: String
/**
* Convert cause to a single Throwable (for interop)
*/
def squash(implicit ev: E IsSubtypeOfError Throwable): Throwable
/**
* Convert cause to Throwable using custom function
*/
def squashWith(f: E => Throwable): Throwable
/**
* Get annotations attached to the cause
*/
def annotations: Map[String, String]
/**
* Get execution spans in the cause
*/
def spans: List[LogSpan]
/**
* Add annotations to the cause
*/
def annotated(anns: Map[String, String]): Cause[E]
/**
* Add execution spans to the cause
*/
def spanned(spans: List[LogSpan]): Cause[E]

Usage Examples:
// Debug information extraction
// Writes a four-line failure report to the error console, one effect after another.
val debugFailure = (cause: Cause[String]) =>
  Console.printLineError("=== Failure Analysis ===") *>
    Console.printLineError(cause.prettyPrint) *>
    Console.printLineError(s"Stack traces: ${cause.traces.size}") *>
    Console.printLineError(s"Annotations: ${cause.annotations}")
// Convert for external systems
val convertToThrowable = (cause: Cause[AppError]) => {
cause.squashWith {
case ValidationError(msg) => new IllegalArgumentException(msg)
case DatabaseError(msg) => new SQLException(msg)
case NetworkError(msg) => new IOException(msg)
}
}

Composable scheduling policies for retry and repeat operations with various timing strategies.
/**
* A composable schedule for retrying and repeating operations
* - Env: Environment required for scheduling decisions
* - In: Input type (usually the error type for retries)
* - Out: Output type (usually the retry count or delay)
*
* Env and In are contravariant; Out is covariant.
*/
trait Schedule[-Env, -In, +Out] {
/** Type of the state that `step` receives and returns */
type State
/** Initial state of the schedule */
def initial: State
/**
* Determine the next step given current time, input, and state.
* The returned effect has no typed error and yields the next state, the
* schedule's output, and the decision whether to continue.
*/
def step(now: OffsetDateTime, in: In, state: State): ZIO[Env, Nothing, (State, Out, Decision)]
}
/**
* Decision whether to continue or stop scheduling
*/
sealed trait Decision
object Decision {
case class Continue(interval: Intervals) extends Decision
case object Done extends Decision
}

Combine schedules using various operators to create complex retry policies.
/**
* Intersection - run both schedules and continue only if both want to continue
*/
def &&[Env1 <: Env, In1 <: In, Out2](that: Schedule[Env1, In1, Out2]): Schedule[Env1, In1, (Out, Out2)]
/**
* Union - run both schedules and continue if either wants to continue
*/
def ||[Env1 <: Env, In1 <: In, Out2](that: Schedule[Env1, In1, Out2]): Schedule[Env1, In1, (Out, Out2)]
/**
* Sequential composition - run this schedule, then that schedule
*/
def ++[Env1 <: Env, In1 <: In, Out2 >: Out](that: Schedule[Env1, In1, Out2]): Schedule[Env1, In1, Out2]
/**
* Compose outputs - pipe output of this schedule as input to next schedule
*/
def >>>[Env1 <: Env, Out2](that: Schedule[Env1, Out, Out2]): Schedule[Env1, In, Out2]
/**
* Choice - feed `Left` inputs to this schedule and `Right` inputs to that schedule;
* both schedules must share a common output type
*/
def |||[Env1 <: Env, Out1 >: Out, In2](that: Schedule[Env1, In2, Out1]): Schedule[Env1, Either[In, In2], Out1]

Usage Examples:
// Complex retry policy
// Operators are kept at the END of each line: in Scala 2 a line that starts with
// `&&` is parsed as a new statement, not as a continuation of the expression above.
val resilientRetry =
  Schedule.exponential(100.millis).jittered && // Exponential backoff with randomness
    Schedule.recurs(5) &&                      // At most 5 retries after the first attempt
    Schedule.recurWhile[Throwable] {           // Only while the error is temporary
      case _: TemporaryException => true
      case _                     => false
    }
// Fallback strategy
// `++` binds tighter than `&&`, so each strategy is parenthesised; without the
// parentheses this would parse as `a && (b ++ c) && d`. The `++` is kept at the
// end of the line because Scala 2 does not continue an expression onto a line
// that starts with an infix operator.
val retryWithFallback =
  (Schedule.exponential(1.second) && Schedule.recurs(3)) ++ // Primary strategy
    (Schedule.spaced(30.seconds) && Schedule.recurs(2))     // Fallback strategy
// Complex condition-based retry
val smartRetry =
Schedule.recurWhile[DatabaseError] {
case ConnectionTimeout => true
case DeadlockDetected => true
case ConstraintViolation => false
} && Schedule.exponential(500.millis).jittered

Pre-built schedules for common retry and repeat scenarios.
// Basic schedules
/** Run forever */
val forever: Schedule[Any, Any, Long]
/** Run exactly once */
def once: Schedule[Any, Any, Unit]
/** Run n times */
def recurs(n: Long): Schedule[Any, Any, Long]
/** Always succeed with constant value */
def succeed[A](a: => A): Schedule[Any, Any, A]
/** Identity schedule (pass input as output) */
def identity[A]: Schedule[Any, A, A]
// Time-based schedules
/** Fixed duration delay */
def duration(duration: Duration): Schedule[Any, Any, Duration]
/** Fixed interval spacing */
def spaced(duration: Duration): Schedule[Any, Any, Long]
/** Fixed rate (accounting for execution time) */
def fixed(interval: Duration): Schedule[Any, Any, Long]
/** Exponential backoff */
def exponential(base: Duration, factor: Double = 2.0): Schedule[Any, Any, Duration]
/** Fibonacci sequence delays */
def fibonacci(one: Duration): Schedule[Any, Any, Duration]
/** Linear increasing delays */
def linear(base: Duration): Schedule[Any, Any, Duration]
// Conditional schedules
/** Continue while predicate is true */
def recurWhile[A](f: A => Boolean): Schedule[Any, A, A]
/** Continue until predicate is true */
def recurUntil[A](f: A => Boolean): Schedule[Any, A, A]
/** Collect inputs while predicate is true */
def collectWhile[A](f: A => Boolean): Schedule[Any, A, Chunk[A]]
/** Collect inputs until predicate is true */
def collectUntil[A](f: A => Boolean): Schedule[Any, A, Chunk[A]]

Usage Examples:
// Common retry patterns
val httpRetry = httpRequest.retry(
Schedule.exponential(100.millis) && Schedule.recurs(3)
)
val databaseRetry = databaseQuery.retry(
Schedule.fibonacci(1.second) && Schedule.recurs(5)
)
val periodicTask = taskExecution.repeat(
Schedule.fixed(30.minutes)
)
// Smart conditional retry
val apiRetry = apiCall.retry(
Schedule.recurWhile[HttpError] {
case HttpError(code) if code >= 500 => true // Server errors
case HttpError(429) => true // Rate limited
case _ => false // Client errors
} && Schedule.exponential(1.second).jittered && Schedule.recurs(3)
)

Transform and customize schedules for specific use cases and requirements.
/**
* Transform the output of the schedule
*/
def map[Out2](f: Out => Out2): Schedule[Env, In, Out2]
/**
* Transform output using a ZIO effect
*/
def mapZIO[Env1 <: Env, Out2](f: Out => URIO[Env1, Out2]): Schedule[Env1, In, Out2]
/**
* Transform the input to the schedule
*/
def contramap[In2](f: In2 => In): Schedule[Env, In2, Out]
/**
* Add delay to each schedule decision
*/
def addDelay(f: Out => Duration): Schedule[Env, In, Out]
/**
* Modify delays in the schedule
*/
def delayed(f: Duration => Duration): Schedule[Env, In, Out]
/**
* Add random jitter to delays
*/
def jittered: Schedule[Env, In, Out]
/**
* Add custom jitter
*/
def jittered(min: Double, max: Double): Schedule[Env, In, Out]
/**
* Continue while input satisfies predicate
*/
def whileInput[In1 <: In](f: In1 => Boolean): Schedule[Env, In1, Out]
/**
* Continue while output satisfies predicate
*/
def whileOutput(f: Out => Boolean): Schedule[Env, In, Out]
/**
* Continue until input satisfies predicate
*/
def untilInput[In1 <: In](f: In1 => Boolean): Schedule[Env, In1, Out]
/**
* Continue until output satisfies predicate
*/
def untilOutput(f: Out => Boolean): Schedule[Env, In, Out]
/**
* Collect all outputs
*/
def collectAll: Schedule[Env, In, Chunk[Out]]
/**
* Fold over outputs with an accumulator
*/
def fold[Z](z: Z)(f: (Z, Out) => Z): Schedule[Env, In, Z]
/**
* Count the number of repetitions
*/
def repetitions: Schedule[Env, In, Long]

Usage Examples:
// Custom schedule transformations
// NOTE: `jittered(min, max)` takes multiplicative factors, so each delay ends up
// between `min * delay` and `max * delay`. The previous `(0.1, 0.2)` shrank every
// delay to 10-20% of the backoff value instead of adding jitter around it.
val adaptiveRetry = Schedule.exponential(100.millis)
  .whileOutput(_ < 30.seconds) // Stop (not cap) once the computed delay reaches 30 seconds
  .jittered(0.8, 1.2)          // Random factor in [0.8, 1.2], i.e. +/-20% jitter
  .repetitions                 // Output the number of repetitions so far
// Conditional scheduling with state
val circuitBreakerSchedule = Schedule.recurs(3)
.whileInput[ServiceError] {
case ServiceUnavailable => true
case CircuitOpen => false
case _ => true
}
// Accumulating retry information
val retryWithLogging = Schedule.exponential(1.second)
.fold(List.empty[Attempt]) { (attempts, delay) =>
Attempt(System.currentTimeMillis(), delay) :: attempts
}
// Rate limiting schedule
val rateLimitedSchedule = Schedule.fixed(100.millis)
.whileOutput(_ => !rateLimitExceeded())
.mapZIO(_ => checkRateLimit())

Install with Tessl CLI
npx tessl i tessl/maven-dev-zio--zio-2-12