or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

application.md concurrency.md core-effects.md dependency-injection.md error-handling.md index.md metrics.md resource-management.md services.md stm.md streams.md testing.md
tile.json

stm.mddocs/

Software Transactional Memory

ZIO's Software Transactional Memory (STM) provides lock-free concurrent programming with composable transactional operations and automatic retry mechanisms for building concurrent data structures without traditional locking.

Capabilities

ZSTM - Transactional Effects

The core STM effect type for composable, atomic transactions that can be safely retried and combined.

/**
 * A transactional effect that can be composed and executed atomically
 * - R: Environment required for the transaction
 * - E: Error type the transaction can fail with
 * - A: Success value type
 *
 * The variance annotations make the type contravariant in R and covariant
 * in E and A. `commit` turns a transaction into a `ZIO` with the same
 * R, E and A.
 */
sealed trait ZSTM[-R, +E, +A] {
  /** Execute the transaction atomically, committing all changes */
  def commit(implicit trace: Trace): ZIO[R, E, A]
  
  /** Transform the success value */
  def map[B](f: A => B): ZSTM[R, E, B]
  
  /** Chain transactions together; the continuation may narrow R and widen E */
  def flatMap[R1 <: R, E1 >: E, B](f: A => ZSTM[R1, E1, B]): ZSTM[R1, E1, B]
  
  /** Transform the error type */
  def mapError[E2](f: E => E2): ZSTM[R, E2, A]
  
  /** Handle errors with recovery transactions; the result carries the handler's error type E2 */
  def catchAll[R1 <: R, E2, A1 >: A](h: E => ZSTM[R1, E2, A1]): ZSTM[R1, E2, A1]
  
  /** Provide a fallback transaction if this one fails (`that` is passed by name) */
  def orElse[R1 <: R, E1, A1 >: A](that: => ZSTM[R1, E1, A1]): ZSTM[R1, E1, A1]
  
  /** Abort current transaction and retry */
  // NOTE(review): a `retry` value of the same type is also listed among the
  // factory methods — confirm whether the library exposes it on instances too.
  def retry: USTM[Nothing]
  
  /** Handle both success and failure cases; the result can no longer fail */
  def fold[B](failure: E => B, success: A => B): URSTM[R, B]
  
  /** Combine with another transaction, returning both results */
  def zip[R1 <: R, E1 >: E, B](that: => ZSTM[R1, E1, B]): ZSTM[R1, E1, (A, B)]
}

// Type aliases for common STM patterns
type STM[+E, +A] = ZSTM[Any, E, A]           // No environment requirements
type USTM[+A] = ZSTM[Any, Nothing, A]        // Cannot fail
type URSTM[-R, +A] = ZSTM[R, Nothing, A]     // Requires R, cannot fail

STM Factory Methods

Create transactional effects from values, failures, and computations.

/**
 * Create a transaction that succeeds with the given value
 *
 * @param a the success value, passed by name
 */
def succeed[A](a: => A): USTM[A]

/**
 * Create a transaction that fails with the given error
 *
 * @param e the error value, passed by name
 */
def fail[E](e: => E): STM[E, Nothing]

/**
 * Create a transaction that terminates with a defect
 *
 * @param t the throwable describing the defect, passed by name
 */
def die(t: => Throwable): USTM[Nothing]

/**
 * Create a transaction that aborts and retries
 */
val retry: USTM[Nothing]

/**
 * Execute a transaction atomically
 *
 * Has the same result type as calling `commit` on the transaction.
 *
 * @param stm the transaction to run
 */
def atomically[R, E, A](stm: ZSTM[R, E, A]): ZIO[R, E, A]

/**
 * Check a condition and retry if false
 *
 * @param p the condition, passed by name
 */
def check(p: => Boolean): USTM[Unit]

/**
 * Execute effect for each element in a collection within a transaction
 *
 * @param as the elements to process
 * @param f  the transaction to build for each element
 * @return a transaction producing the per-element results as a List
 */
def foreach[R, E, A, B](as: Iterable[A])(f: A => ZSTM[R, E, B]): ZSTM[R, E, List[B]]

/**
 * Execute all transactions and collect results
 *
 * @param stms the transactions to combine
 */
def collectAll[R, E, A](stms: Iterable[ZSTM[R, E, A]]): ZSTM[R, E, List[A]]

/**
 * Suspend a transaction computation
 *
 * @param stm the transaction, passed by name
 */
def suspend[R, E, A](stm: => ZSTM[R, E, A]): ZSTM[R, E, A]

Usage Examples:

import zio._
import zio.stm._

// Basic transaction: sequence two successful transactions and add their results
val basicTransaction =
  STM.succeed(42).flatMap { x =>
    STM.succeed(58).map(y => x + y)
  }

// Run the composed transaction atomically, producing a ZIO effect
val result = basicTransaction.commit

// Conditional retry: read the reference, then let `check` retry the
// transaction until the value exceeds 100
val waitForCondition =
  someRef.get.flatMap { value =>
    STM.check(value > 100).map(_ => value)
  }

// Complex transaction with error handling: the balance guard falls back to a
// failure, then both balances are adjusted in the same transaction
val transferMoney = (from: TRef[Int], to: TRef[Int], amount: Int) =>
  from.get.flatMap { fromBalance =>
    STM.check(fromBalance >= amount).orElse(STM.fail("Insufficient funds")) *>
      from.update(_ - amount) *>
      to.update(_ + amount)
  }

TRef - Transactional Reference

Mutable reference that can be safely modified within STM transactions.

/**
 * A transactional reference that can be safely modified across concurrent transactions
 *
 * Every operation returns a `USTM`, i.e. a transaction that cannot fail.
 */
sealed abstract class TRef[A] {
  /** Read the current value */
  def get: USTM[A]
  
  /** Set a new value */
  def set(a: A): USTM[Unit]
  
  /** Atomically modify the value and return a result; `f` yields a (B, A) pair and the B is returned */
  def modify[B](f: A => (B, A)): USTM[B]
  
  /** Atomically update the value */
  def update(f: A => A): USTM[Unit]
  
  /** Update and return the new value */
  def updateAndGet(f: A => A): USTM[A]
  
  /** Return the old value and update */
  def getAndUpdate(f: A => A): USTM[A]
  
  /** Set new value and return the old value */
  def getAndSet(a: A): USTM[A]
  
  /** Conditionally update if current value matches */
  def updateSome(f: PartialFunction[A, A]): USTM[Unit]
  
  /** Conditionally modify if current value matches */
  // presumably `default` is the result when `f` is not defined at the current value — confirm
  def modifySome[B](default: B)(f: PartialFunction[A, (B, A)]): USTM[B]
  
  /** Get current value and conditionally update */
  def getAndUpdateSome(f: PartialFunction[A, A]): USTM[A]
  
  /** Conditionally update and get new value */
  def updateSomeAndGet(f: PartialFunction[A, A]): USTM[A]
}

object TRef {
  /** Create a new transactional reference (as a transaction; the initial value is passed by name) */
  def make[A](a: => A): USTM[TRef[A]]
  
  /** Create and commit a transactional reference (as a UIO rather than a transaction) */  
  def makeCommit[A](a: => A): UIO[TRef[A]]
}

Usage Examples:

// Shared counter using TRef
val counterProgram = for {
  counter <- TRef.makeCommit(0)

  // Concurrent increments: each read-then-write pair is committed as a single
  // transaction
  _ <- ZIO.foreachParDiscard(1 to 1000) { _ =>
         (for {
           current <- counter.get
           _       <- counter.set(current + 1)
         } yield ()).commit
       }

  // `final` is a reserved word in Scala and cannot be used as a name
  finalCount <- counter.get.commit
  _          <- Console.printLine(s"Final count: $finalCount")
} yield ()

// Account balance transfer (atomic transaction)
val transfer = for {
  from <- TRef.makeCommit(1000)
  to   <- TRef.makeCommit(500)

  // Transfer $200 atomically: the balance guard, the debit and the credit are
  // composed into one transaction and committed together
  _ <- (from.get.flatMap(fromBalance => STM.check(fromBalance >= 200)) *>
         from.update(_ - 200) *>
         to.update(_ + 200)).commit

  // Each balance is read back in its own transaction
  fromFinal <- from.get.commit
  toFinal   <- to.get.commit
  _         <- Console.printLine(s"From: $fromFinal, To: $toFinal")
} yield ()

// Complex state management
case class GameState(score: Int, lives: Int, level: Int)

val gameProgram = for {
  state <- TRef.makeCommit(GameState(0, 3, 1))

  // Atomic game state update: the transaction retries while no lives are left,
  // then awards 100 points and bumps the level once the new score passes 1000
  _ <- (for {
         current <- state.get
         _       <- STM.check(current.lives > 0)
         _       <- state.update(s => s.copy(
                      score = s.score + 100,
                      level = if (s.score + 100 > 1000) s.level + 1 else s.level
                    ))
       } yield ()).commit

  // `final` is a reserved word in Scala and cannot be used as a name
  finalState <- state.get.commit
  _          <- Console.printLine(s"Game state: $finalState")
} yield ()

Other STM Data Structures

Additional transactional data structures for complex concurrent programming needs.

// Transactional Map
// NOTE(review): verify member names and result types against the ZIO `TMap`
// API — in particular whether key removal is exposed as `remove` returning
// the previous value.
sealed abstract class TMap[K, V] {
  // Optional value for key `k`
  def get(k: K): USTM[Option[V]]
  def put(k: K, v: V): USTM[Unit]
  def remove(k: K): USTM[Option[V]]
  def contains(k: K): USTM[Boolean]
  def size: USTM[Int]
  // Contents as a list of key/value pairs
  def toList: USTM[List[(K, V)]]
}

// Transactional Set  
// NOTE(review): verify member names against the ZIO `TSet` API — in
// particular whether element removal is exposed as `remove`.
sealed abstract class TSet[A] {
  def contains(a: A): USTM[Boolean]
  def put(a: A): USTM[Unit]
  def remove(a: A): USTM[Unit]
  def size: USTM[Int]
  // Contents as a list
  def toList: USTM[List[A]]
}

// Transactional Array
// NOTE(review): verify against the ZIO `TArray` API — confirm the name of the
// length accessor and whether `update` takes a value or a function.
sealed abstract class TArray[A] {
  // Element read, returned as a transaction
  def apply(index: Int): USTM[A]
  def update(index: Int, a: A): USTM[Unit]
  // Declared as a plain Int, not wrapped in a transaction
  def length: Int
  def toList: USTM[List[A]]
}

// Transactional Queue
// NOTE(review): verify against the ZIO `TQueue` API — confirm the result type
// of `offer`, and whether `offer`/`take` wait (via retry) on a full/empty queue;
// neither is shown by these signatures.
sealed abstract class TQueue[A] {
  def offer(a: A): USTM[Unit] 
  def take: USTM[A]
  def size: USTM[Int]
  def isEmpty: USTM[Boolean]
}

// Transactional Promise
// A promise is completed once, either with a value (`succeed`) or with an
// error (`fail`); readers observe that outcome through `await` and `poll`.
sealed abstract class TPromise[E, A] {
  /** Complete the promise with a value; the Boolean is the outcome of the attempt */
  def succeed(a: A): USTM[Boolean]
  /** Complete the promise with an error; the Boolean is the outcome of the attempt */
  def fail(e: E): USTM[Boolean]
  /** Obtain the promise's result; must be able to fail with E because the promise can be failed */
  def await: STM[E, A]
  /** Inspect the promise without waiting: None while pending, otherwise the recorded outcome */
  def poll: USTM[Option[Either[E, A]]]
}

// Transactional Semaphore
// Permit counts are expressed as Long; every operation is a transaction that
// cannot fail.
sealed abstract class TSemaphore {
  // Single-permit variants
  def acquire: USTM[Unit]
  def acquireN(n: Long): USTM[Unit]
  def release: USTM[Unit]
  def releaseN(n: Long): USTM[Unit]
  // presumably the number of permits currently free — confirm
  def available: USTM[Long]
}

Usage Examples:

// Transactional cache implementation
val cacheProgram = for {
  cache <- TMap.empty[String, String].commit

  // Atomic cache operations: both inserts and the size guard are composed into
  // one transaction and committed together
  _ <- (cache.put("key1", "value1") *>
         cache.put("key2", "value2") *>
         cache.size.flatMap(size => STM.check(size <= 1000))).commit // Cache size limit

  value <- cache.get("key1").commit
  _     <- Console.printLine(s"Cached value: $value")
} yield ()

// Producer-consumer with transactional queue
val producerConsumer = for {
  queue <- TQueue.bounded[String](10).commit

  // Describe each side first; nothing runs until the descriptions are forked
  produceAll = ZIO.foreach(1 to 100)(i => queue.offer(s"item-$i").commit)
  consumeAll = ZIO.foreach(1 to 100)(_ => queue.take.commit.flatMap(item => processItem(item)))

  // Run producer and consumer on separate fibers
  producer <- produceAll.fork
  consumer <- consumeAll.fork

  // Wait for both sides to finish
  _ <- producer.join
  _ <- consumer.join
} yield ()

// Coordinated resource management
val resourceManager = for {
  available <- TSemaphore.make(5).commit  // 5 available resources
  inUse     <- TRef.makeCommit(Set.empty[String])

  // Acquire resource atomically: take a permit, require that the id is not
  // already held, then record it — all in one transaction
  acquireResource = (resourceId: String) =>
    (available.acquire *>
      inUse.get.flatMap(current => STM.check(!current.contains(resourceId))) *>
      inUse.update(_ + resourceId)).commit

  // Release resource atomically: require that the id is held, drop it, then
  // hand the permit back — all in one transaction
  releaseResource = (resourceId: String) =>
    (inUse.get.flatMap(current => STM.check(current.contains(resourceId))) *>
      inUse.update(_ - resourceId) *>
      available.release).commit

} yield (acquireResource, releaseResource)

STM Patterns and Best Practices

Common patterns for effective use of Software Transactional Memory.

// Optimistic concurrency pattern
// Assumes a `case class Counter(value: Int, version: Int)` defined elsewhere.
//
// Each attempt snapshots the counter, then commits the increment only if the
// version is still the one that was observed. A version mismatch is reported
// as a failure (not an STM retry), which is what allows the schedule below to
// drive a bounded number of re-attempts with exponential backoff.
val optimisticUpdate = (ref: TRef[Counter]) => {
  val attempt = for {
    // The expected version is whatever this attempt observed up front
    observed <- ref.get.commit
    _ <- (for {
           counter <- ref.get
           _ <- if (counter.version == observed.version)
                  ref.set(counter.copy(
                    value = counter.value + 1,
                    version = counter.version + 1
                  ))
                else STM.fail("Version conflict")
         } yield ()).commit
  } yield ()

  // Re-run the whole attempt (fresh snapshot included) at most 5 more times
  attempt.retry(Schedule.exponential(10.millis) && Schedule.recurs(5))
}

// Coordinated state updates: read every reference, enforce the constraint on
// their sum, then bump each one — all inside a single committed transaction
val coordinatedUpdate = (refs: List[TRef[Int]]) =>
  STM
    .foreach(refs)(_.get)
    .flatMap { values =>
      // Business constraint
      STM.check(values.sum < 1000) *> STM.foreach(refs)(_.update(_ + 1))
    }
    .map(_ => ())
    .commit

// Conditional waiting pattern: succeed with the contained string once the
// reference holds one, otherwise retry the transaction
val waitForValue = (ref: TRef[Option[String]]) =>
  ref.get.flatMap {
    case Some(v) => STM.succeed(v)
    case None    => STM.retry  // Wait until value is available
  }.commit

// Resource pool implementation
class TransactionalPool[A](resources: TRef[List[A]], maxSize: Int) {

  /** Take the first pooled resource; retries while the pool is empty */
  def acquire: UIO[A] =
    resources.get.flatMap {
      case head :: tail => resources.set(tail).map(_ => head)
      case Nil          => STM.retry  // Wait for resources to become available
    }.commit

  /** Put a resource at the front of the pool; retries while the pool already holds maxSize entries */
  def release(resource: A): UIO[Unit] =
    (resources.get.flatMap(current => STM.check(current.length < maxSize)) *>
      resources.update(resource :: _)).commit

  /** Number of resources currently held in the pool */
  def size: UIO[Int] =
    resources.get.map(pooled => pooled.length).commit
}

Usage Examples:

// Banking system with STM
case class Account(id: String, balance: Int, frozen: Boolean)

/**
 * Account operations over a transactional map of account references.
 *
 * The map is passed in already constructed: `TMap.empty` only describes a
 * transaction that creates a map, so it has to be committed once (see
 * `BankingSystem.make`) and the resulting map shared by every operation.
 */
class BankingSystem(accounts: TMap[String, TRef[Account]]) {

  /** Resolve an account reference, failing with a Throwable so callers can expose `Task` */
  private def lookup(accountId: String): STM[NoSuchElementException, TRef[Account]] =
    accounts.get(accountId).flatMap {
      case Some(ref) => STM.succeed(ref)
      case None      => STM.fail(new NoSuchElementException(s"Account $accountId not found"))
    }

  /**
   * Move `amount` from one account to the other in a single transaction.
   *
   * Fails if either account is unknown; retries while either account is
   * frozen or the source balance is below `amount`.
   */
  def transfer(fromId: String, toId: String, amount: Int): Task[Unit] =
    (for {
      fromAccount <- lookup(fromId)
      toAccount   <- lookup(toId)

      from <- fromAccount.get
      to   <- toAccount.get

      _ <- STM.check(!from.frozen && !to.frozen)
      _ <- STM.check(from.balance >= amount)

      // Apply each change to the reference's current value rather than writing
      // back the snapshots read above: when both ids name the same account,
      // writing snapshots would let the credit overwrite the debit.
      _ <- fromAccount.update(a => a.copy(balance = a.balance - amount))
      _ <- toAccount.update(a => a.copy(balance = a.balance + amount))
    } yield ()).commit

  /** Mark an account as frozen; fails if the account is unknown */
  def freezeAccount(accountId: String): Task[Unit] =
    (for {
      accountRef <- lookup(accountId)
      _          <- accountRef.update(_.copy(frozen = true))
    } yield ()).commit
}

object BankingSystem {
  /** Build a banking system backed by a freshly created, empty account map */
  val make: UIO[BankingSystem] =
    TMap.empty[String, TRef[Account]].commit.map(new BankingSystem(_))
}