ZIO's Software Transactional Memory (STM) provides lock-free concurrent programming with composable transactional operations and automatic retry mechanisms for building concurrent data structures without traditional locking.
`ZSTM` is the core STM effect type: a composable, atomic transaction that can be safely retried and combined with other transactions.
/**
 * A transactional effect that can be composed with other transactions and
 * executed atomically.
 *
 * @tparam R environment required for the transaction
 * @tparam E error type the transaction can fail with
 * @tparam A type of the value produced on success
 */
sealed trait ZSTM[-R, +E, +A] {

  // ---- Running ----

  /** Executes the transaction atomically, committing all changes, as a `ZIO` effect. */
  def commit(implicit trace: Trace): ZIO[R, E, A]

  // ---- Transforming the result ----

  /** Applies `f` to the success value. */
  def map[B](f: A => B): ZSTM[R, E, B]

  /** Applies `f` to the error, changing the error type to `E2`. */
  def mapError[E2](f: E => E2): ZSTM[R, E2, A]

  /** Handles both outcomes, turning either into a `B`; the result has no error type left. */
  def fold[B](failure: E => B, success: A => B): URSTM[R, B]

  // ---- Sequencing ----

  /** Chains a second transaction that is built from this one's success value. */
  def flatMap[R1 <: R, E1 >: E, B](f: A => ZSTM[R1, E1, B]): ZSTM[R1, E1, B]

  /** Combines this transaction with `that` (passed by name), returning both results as a pair. */
  def zip[R1 <: R, E1 >: E, B](that: => ZSTM[R1, E1, B]): ZSTM[R1, E1, (A, B)]

  // ---- Recovery ----

  /** Handles an error by switching to the recovery transaction produced by `h`. */
  def catchAll[R1 <: R, E2, A1 >: A](h: E => ZSTM[R1, E2, A1]): ZSTM[R1, E2, A1]

  /** Falls back to `that` (passed by name) if this transaction fails. */
  def orElse[R1 <: R, E1, A1 >: A](that: => ZSTM[R1, E1, A1]): ZSTM[R1, E1, A1]

  /** Aborts the current transaction and retries it. */
  def retry: USTM[Nothing]
}
// Type aliases for common STM patterns

/** A transaction with no environment requirements. */
type STM[+E, +A] = ZSTM[Any, E, A]

/** A transaction with no environment requirements that cannot fail. */
type USTM[+A] = ZSTM[Any, Nothing, A]

/** A transaction that requires `R` and cannot fail. */
type URSTM[-R, +A] = ZSTM[R, Nothing, A]
Create transactional effects from values, failures, and computations.
/** Builds a transaction that succeeds with `a` (passed by name). */
def succeed[A](a: => A): USTM[A]

/** Builds a transaction that fails with the error `e` (passed by name). */
def fail[E](e: => E): STM[E, Nothing]

/** Builds a transaction that terminates with the defect `t`. */
def die(t: => Throwable): USTM[Nothing]

/** A transaction that aborts and retries. */
val retry: USTM[Nothing]

/** Executes `stm` atomically, keeping its environment, error and value types. */
def atomically[R, E, A](stm: ZSTM[R, E, A]): ZIO[R, E, A]

/** Checks the condition `p` and retries the transaction if it is false. */
def check(p: => Boolean): USTM[Unit]

/** Runs `f` for each element of `as` within one transaction, collecting the results in a list. */
def foreach[R, E, A, B](as: Iterable[A])(f: A => ZSTM[R, E, B]): ZSTM[R, E, List[B]]

/** Executes all the given transactions and collects their results in a list. */
def collectAll[R, E, A](stms: Iterable[ZSTM[R, E, A]]): ZSTM[R, E, List[A]]

/** Suspends the construction of a transaction: `stm` is taken by name. */
def suspend[R, E, A](stm: => ZSTM[R, E, A]): ZSTM[R, E, A]
Usage Examples:
import zio._
import zio.stm._
// Basic transaction: two successful steps whose values are added together
val basicTransaction =
  STM.succeed(42).flatMap(x => STM.succeed(58).map(y => x + y))

// Execute transaction atomically (commit turns it into a ZIO effect)
val result = basicTransaction.commit
// Conditional retry
// `someRef` is not defined in this snippet; it is read with `get`, like a TRef[Int].
val waitForCondition =
  someRef.get.flatMap { value =>
    // Retry until value > 100, then yield the value that was read
    STM.check(value > 100).map(_ => value)
  }
// Complex transaction with error handling
val transferMoney = (from: TRef[Int], to: TRef[Int], amount: Int) => {
  // Funds check with a fallback to the "Insufficient funds" failure.
  // NOTE(review): this relies on `orElse` also taking over when `check`
  // retries, not only when it fails - confirm for the ZIO version in use.
  val ensureFunds = from.get.flatMap { fromBalance =>
    STM.check(fromBalance >= amount).orElse(STM.fail("Insufficient funds"))
  }
  val debit  = from.update(_ - amount)
  val credit = to.update(_ + amount)

  // Same order as before: check, debit, credit, then yield unit.
  ensureFunds.flatMap(_ => debit).flatMap(_ => credit).map(_ => ())
}
Mutable reference that can be safely modified within STM transactions.
/**
 * A transactional reference holding a value of type `A` that can be safely
 * modified across concurrent transactions. Every operation is a `USTM`, so
 * none of them can fail.
 */
sealed abstract class TRef[A] {

  // ---- Plain reads and writes ----

  /** Reads the current value. */
  def get: USTM[A]

  /** Replaces the current value with `a`. */
  def set(a: A): USTM[Unit]

  /** Replaces the current value with `a` and returns the old value. */
  def getAndSet(a: A): USTM[A]

  // ---- Function-based updates ----

  /** Atomically updates the value with `f`. */
  def update(f: A => A): USTM[Unit]

  /** Updates the value with `f` and returns the new value. */
  def updateAndGet(f: A => A): USTM[A]

  /** Returns the old value and updates it with `f`. */
  def getAndUpdate(f: A => A): USTM[A]

  /** Atomically modifies the value and returns a result: `f` yields the pair (result, new value). */
  def modify[B](f: A => (B, A)): USTM[B]

  // ---- Conditional updates (applied only where the partial function matches) ----

  /** Updates the value if the current value matches `f`. */
  def updateSome(f: PartialFunction[A, A]): USTM[Unit]

  /** Conditionally updates the value and returns the new value. */
  def updateSomeAndGet(f: PartialFunction[A, A]): USTM[A]

  /** Returns the current value and conditionally updates it. */
  def getAndUpdateSome(f: PartialFunction[A, A]): USTM[A]

  /**
   * Conditionally modifies the value if the current value matches `f`.
   * Presumably `default` is the result when `f` does not match - confirm.
   */
  def modifySome[B](default: B)(f: PartialFunction[A, (B, A)]): USTM[B]
}
/** Constructors for [[TRef]]. */
object TRef {

  /** Creates a new transactional reference inside a transaction; `a` is passed by name. */
  def make[A](a: => A): USTM[TRef[A]]

  /** Creates a transactional reference and commits, giving a `UIO` instead of a transaction. */
  def makeCommit[A](a: => A): UIO[TRef[A]]
}
Usage Examples:
// Shared counter using TRef
val counterProgram = for {
  counter <- TRef.makeCommit(0)
  // Concurrent increments: every increment is committed as its own transaction
  _ <- ZIO.foreachParDiscard(1 to 1000)(_ => counter.update(_ + 1).commit)
  // `final` is a reserved word in Scala and cannot be used as a binder name
  finalCount <- counter.get.commit
  _ <- Console.printLine(s"Final count: $finalCount")
} yield ()
// Account balance transfer (atomic transaction)
val transfer = for {
  from <- TRef.makeCommit(1000)
  to <- TRef.makeCommit(500)
  // Transfer $200 atomically: the funds check, the debit and the credit
  // are sequenced into a single transaction and committed together
  moveFunds = from.get.flatMap(fromBalance => STM.check(fromBalance >= 200)) *>
    from.update(_ - 200) *>
    to.update(_ + 200)
  _ <- moveFunds.commit
  fromFinal <- from.get.commit
  toFinal <- to.get.commit
  _ <- Console.printLine(s"From: $fromFinal, To: $toFinal")
} yield ()
// Complex state management
case class GameState(score: Int, lives: Int, level: Int)
val gameProgram = for {
  state <- TRef.makeCommit(GameState(0, 3, 1))
  // Atomic game state update
  _ <- (for {
    current <- state.get
    // Retries the transaction while no lives are left
    _ <- STM.check(current.lives > 0)
    _ <- state.update { s =>
      // Compute the new score once so the level check uses the same number
      val newScore = s.score + 100
      s.copy(
        score = newScore,
        level = if (newScore > 1000) s.level + 1 else s.level
      )
    }
  } yield ()).commit
  // `final` is a reserved word in Scala and cannot be used as a binder name
  finalState <- state.get.commit
  _ <- Console.printLine(s"Game state: $finalState")
} yield ()
Additional transactional data structures for complex concurrent programming needs.
// Transactional Map
/** A key/value map whose operations are all `USTM` transactions. */
sealed abstract class TMap[K, V] {

  // ---- Queries ----

  /** Looks up the key `k`; the result is optional. */
  def get(k: K): USTM[Option[V]]

  /** Tells whether the map contains the key `k`. */
  def contains(k: K): USTM[Boolean]

  /** Size of the map. */
  def size: USTM[Int]

  /** The map's contents as a list of key/value pairs. */
  def toList: USTM[List[(K, V)]]

  // ---- Mutations ----

  /** Puts the value `v` under the key `k`. */
  def put(k: K, v: V): USTM[Unit]

  /** Removes the key `k`. The optional result is presumably the removed value - confirm. */
  def remove(k: K): USTM[Option[V]]
}
// Transactional Set
/** A set whose operations are all `USTM` transactions. */
sealed abstract class TSet[A] {

  // ---- Queries ----

  /** Tells whether the set contains `a`. */
  def contains(a: A): USTM[Boolean]

  /** Size of the set. */
  def size: USTM[Int]

  /** The set's elements as a list. */
  def toList: USTM[List[A]]

  // ---- Mutations ----

  /** Puts `a` into the set. */
  def put(a: A): USTM[Unit]

  /** Removes `a` from the set. */
  def remove(a: A): USTM[Unit]
}
// Transactional Array
/** An array whose element reads and writes are `USTM` transactions. */
sealed abstract class TArray[A] {

  /** Length of the array; a plain `Int`, not a transaction. */
  def length: Int

  /** Reads the element at `index`. */
  def apply(index: Int): USTM[A]

  /** Writes `a` at `index`. */
  def update(index: Int, a: A): USTM[Unit]

  /** The array's elements as a list. */
  def toList: USTM[List[A]]
}
// Transactional Queue
/** A queue whose operations are all `USTM` transactions. */
sealed abstract class TQueue[A] {

  /** Offers `a` to the queue. */
  def offer(a: A): USTM[Unit]

  /** Takes an element from the queue. Presumably retries while the queue is empty - confirm. */
  def take: USTM[A]

  /** Size of the queue. */
  def size: USTM[Int]

  /** Tells whether the queue is empty. */
  def isEmpty: USTM[Boolean]
}
// Transactional Promise
/**
 * A promise that is completed and observed through `USTM` transactions,
 * either with a value of type `A` or with an error of type `E`.
 */
sealed abstract class TPromise[E, A] {

  // ---- Completing ----

  /** Completes the promise with the value `a`. The meaning of the `Boolean` is not shown here - presumably whether this call completed it. */
  def succeed(a: A): USTM[Boolean]

  /** Completes the promise with the error `e`; same `Boolean` result as `succeed`. */
  def fail(e: E): USTM[Boolean]

  // ---- Observing ----

  /**
   * Awaits the promise's value.
   * NOTE(review): typed as infallible (`USTM[A]`) although the promise can be
   * completed with an `E` via `fail` - verify this signature against the library.
   */
  def await: USTM[A]

  /** Polls the promise; the `Option` is presumably empty while it is not yet completed - confirm. */
  def poll: USTM[Option[Exit[E, A]]]
}
// Transactional Semaphore
/** A semaphore whose permits are acquired and released through `USTM` transactions. */
sealed abstract class TSemaphore {

  // ---- Acquiring ----

  /** Acquires one permit. */
  def acquire: USTM[Unit]

  /** Acquires `n` permits. */
  def acquireN(n: Long): USTM[Unit]

  // ---- Releasing ----

  /** Releases one permit. */
  def release: USTM[Unit]

  /** Releases `n` permits. */
  def releaseN(n: Long): USTM[Unit]

  // ---- Inspecting ----

  /** Number of permits currently available. */
  def available: USTM[Long]
}
Usage Examples:
// Transactional cache implementation
val cacheProgram = for {
  cache <- TMap.empty[String, String].commit
  // Atomic cache operations: both puts and the size check commit together
  fillCache = cache.put("key1", "value1") *>
    cache.put("key2", "value2") *>
    cache.size.flatMap(size => STM.check(size <= 1000)) // Cache size limit
  _ <- fillCache.commit
  value <- cache.get("key1").commit
  _ <- Console.printLine(s"Cached value: $value")
} yield ()
// Producer-consumer with transactional queue
val producerConsumer = for {
  queue <- TQueue.bounded[String](10).commit
  // Producer: offers 100 items, each offer committed separately
  produceAll = ZIO.foreach(1 to 100)(i => queue.offer(s"item-$i").commit)
  // Consumer: takes 100 items and hands each to `processItem` (defined outside this snippet)
  consumeAll = ZIO.foreach(1 to 100)(_ => queue.take.commit.flatMap(item => processItem(item)))
  // Fork the producer first, then the consumer, and wait for both in that order
  producer <- produceAll.fork
  consumer <- consumeAll.fork
  _ <- producer.join
  _ <- consumer.join
} yield ()
// Coordinated resource management
val resourceManager = for {
  available <- TSemaphore.make(5).commit // 5 available resources
  inUse <- TRef.makeCommit(Set.empty[String])
  // Acquire resource atomically: take a permit, require the id to be free, then record it
  acquireResource = (resourceId: String) =>
    (available.acquire *>
      inUse.get.flatMap(held => STM.check(!held.contains(resourceId))) *>
      inUse.update(held => held + resourceId)).commit
  // Release resource atomically: require the id to be held, forget it, then return the permit
  releaseResource = (resourceId: String) =>
    (inUse.get.flatMap(held => STM.check(held.contains(resourceId))) *>
      inUse.update(held => held - resourceId) *>
      available.release).commit
} yield (acquireResource, releaseResource)
Common patterns for effective use of Software Transactional Memory.
// Optimistic concurrency pattern
// `Counter` and `expectedVersion` are defined outside this snippet.
val optimisticUpdate = (ref: TRef[Counter]) => {
  val increment = for {
    counter <- ref.get
    // A version mismatch must FAIL the transaction. `STM.check` would only
    // suspend and retry it, leaving the committed effect with error type
    // `Nothing`; the schedule-based `retry` below needs an effect that can fail.
    _ <- (if (counter.version == expectedVersion) STM.unit
          else STM.fail("Version conflict"))
    _ <- ref.set(counter.copy(
      value = counter.value + 1,
      version = counter.version + 1
    ))
  } yield ()
  // Retry with exponential backoff, at most 5 more times, then surface the failure
  increment.commit.retry(Schedule.exponential(10.millis) && Schedule.recurs(5))
}
// Coordinated state updates
val coordinatedUpdate = (refs: List[TRef[Int]]) => {
  // Read every reference and add the values up
  val readTotal = STM.foreach(refs)(ref => ref.get).map(values => values.sum)
  // Add one to every reference
  val bumpAll = STM.foreach(refs)(ref => ref.update(n => n + 1))

  readTotal
    .flatMap(total => STM.check(total < 1000)) // Business constraint
    .flatMap(_ => bumpAll)
    .map(_ => ())
    .commit
}
// Conditional waiting pattern
val waitForValue = (ref: TRef[Option[String]]) =>
  ref.get.flatMap {
    case None    => STM.retry // Wait until value is available
    case Some(v) => STM.succeed(v)
  }.commit
// Resource pool implementation
/**
 * A pool of resources kept in a transactional list.
 *
 * @param resources reference holding the resources currently in the pool
 * @param maxSize   bound checked by `release` before a resource is put back
 */
class TransactionalPool[A](resources: TRef[List[A]], maxSize: Int) {

  /** Takes the first pooled resource; retries the transaction while the pool is empty. */
  def acquire: UIO[A] =
    resources.get.flatMap {
      case Nil =>
        STM.retry // Wait for resources to become available
      case next :: rest =>
        resources.set(rest).map(_ => next)
    }.commit

  /** Puts `resource` at the front of the pool; retries while the pool already holds `maxSize` items. */
  def release(resource: A): UIO[Unit] = {
    val hasRoom = resources.get.flatMap(pooled => STM.check(pooled.length < maxSize))
    val putBack = resources.update(pooled => resource :: pooled)
    (hasRoom *> putBack).commit
  }

  /** Number of resources currently in the pool. */
  def size: UIO[Int] = resources.get.map(pooled => pooled.length).commit
}
Usage Examples:
// Banking system with STM
/** One bank account: its identifier, its current balance and whether it is frozen. */
case class Account(id: String, balance: Int, frozen: Boolean)
/**
 * Account registry whose operations each run as a single STM transaction.
 *
 * The registry is injected as an already-created `TMap`. `TMap.empty` only
 * describes the creation of a map (it is a `USTM[TMap[...]]`), so it cannot
 * be stored in a field and queried directly. Use `BankingSystem.make` to get
 * an instance backed by a fresh, empty registry.
 *
 * @param accounts account references, keyed by account id
 */
class BankingSystem(accounts: TMap[String, TRef[Account]]) {

  /**
   * Looks up the reference registered under `id`. Fails the transaction with a
   * `NoSuchElementException` when there is none; a `Throwable` error keeps the
   * public methods compatible with their declared `Task` return type.
   */
  private def lookup(id: String): STM[Throwable, TRef[Account]] =
    accounts.get(id).flatMap {
      case Some(ref) => STM.succeed(ref)
      case None      => STM.fail(new NoSuchElementException(s"Account $id not found"))
    }

  /**
   * Moves `amount` from the account `fromId` to the account `toId` in one transaction.
   *
   * Fails if either account is not registered. Retries (waits) while either
   * account is frozen or the source balance is below `amount`.
   */
  def transfer(fromId: String, toId: String, amount: Int): Task[Unit] =
    (for {
      fromAccount <- lookup(fromId)
      toAccount <- lookup(toId)
      from <- fromAccount.get
      to <- toAccount.get
      _ <- STM.check(!from.frozen && !to.frozen)
      _ <- STM.check(from.balance >= amount)
      // Update from the value current at write time, not from the snapshots
      // above. With fromId == toId both references are the same, and writing
      // two stale snapshots would leave the balance increased by `amount`.
      _ <- fromAccount.update(acc => acc.copy(balance = acc.balance - amount))
      _ <- toAccount.update(acc => acc.copy(balance = acc.balance + amount))
    } yield ()).commit

  /** Marks the account `accountId` as frozen; fails if it is not registered. */
  def freezeAccount(accountId: String): Task[Unit] =
    lookup(accountId)
      .flatMap(ref => ref.update(acc => acc.copy(frozen = true)))
      .commit
}

object BankingSystem {

  /** Creates a banking system backed by a new, empty account registry. */
  def make: UIO[BankingSystem] =
    TMap.empty[String, TRef[Account]].commit.map(registry => new BankingSystem(registry))
}