0
# Software Transactional Memory
1
2
ZIO's Software Transactional Memory (STM) supports lock-free concurrent programming: transactional operations compose, are retried automatically, and let you build concurrent data structures without traditional locking.
3
4
## Capabilities
5
6
### ZSTM - Transactional Effects
7
8
The core STM effect type for composable, atomic transactions that can be safely retried and combined.
9
10
```scala { .api }
11
/**
 * Composable transactional effect that is executed atomically.
 *
 * @tparam R environment the transaction requires
 * @tparam E error type the transaction may fail with
 * @tparam A type of the value produced on success
 */
sealed trait ZSTM[-R, +E, +A] {

  /** Runs the transaction atomically and commits every change it made. */
  def commit(implicit trace: Trace): ZIO[R, E, A]

  /** Applies `f` to the success value. */
  def map[B](f: A => B): ZSTM[R, E, B]

  /** Sequences this transaction with the one produced by `f`. */
  def flatMap[R1 <: R, E1 >: E, B](f: A => ZSTM[R1, E1, B]): ZSTM[R1, E1, B]

  /** Applies `f` to the error value. */
  def mapError[E2](f: E => E2): ZSTM[R, E2, A]

  /** Recovers from an error by running the transaction returned by `h`. */
  def catchAll[R1 <: R, E2, A1 >: A](h: E => ZSTM[R1, E2, A1]): ZSTM[R1, E2, A1]

  /** Falls back to `that` when this transaction fails. */
  def orElse[R1 <: R, E1, A1 >: A](that: => ZSTM[R1, E1, A1]): ZSTM[R1, E1, A1]

  /**
   * Aborts the current transaction and retries it.
   *
   * NOTE(review): `retry` is also listed among the factory methods; confirm
   * that an instance-level `retry` exists in the ZIO version being documented.
   */
  def retry: USTM[Nothing]

  /** Collapses the failure and success cases into a single value of type `B`. */
  def fold[B](failure: E => B, success: A => B): URSTM[R, B]

  /** Runs this transaction and `that`, returning both results as a pair. */
  def zip[R1 <: R, E1 >: E, B](that: => ZSTM[R1, E1, B]): ZSTM[R1, E1, (A, B)]
}
45
46
// Shorthand aliases for the most common ZSTM shapes
type STM[+E, +A]   = ZSTM[Any, E, A]       // needs no environment
type USTM[+A]      = ZSTM[Any, Nothing, A] // needs no environment, cannot fail
type URSTM[-R, +A] = ZSTM[R, Nothing, A]   // needs R, cannot fail
50
```
51
52
### STM Factory Methods
53
54
Create transactional effects from values, failures, and computations.
55
56
```scala { .api }
57
/** Transaction that succeeds with the value `a`. */
def succeed[A](a: => A): USTM[A]

/** Transaction that fails with the error `e`. */
def fail[E](e: => E): STM[E, Nothing]

/** Transaction that terminates with the defect `t`. */
def die(t: => Throwable): USTM[Nothing]

/** Transaction that aborts and retries. */
val retry: USTM[Nothing]

/** Executes `stm` atomically as a ZIO effect. */
def atomically[R, E, A](stm: ZSTM[R, E, A]): ZIO[R, E, A]

/** Retries the transaction when `p` is false; succeeds with unit otherwise. */
def check(p: => Boolean): USTM[Unit]

/** Applies `f` to each element of `as` within one transaction and collects the results. */
def foreach[R, E, A, B](as: Iterable[A])(f: A => ZSTM[R, E, B]): ZSTM[R, E, List[B]]

/** Executes every transaction in `stms` and collects their results. */
def collectAll[R, E, A](stms: Iterable[ZSTM[R, E, A]]): ZSTM[R, E, List[A]]

/** Suspends the computation that produces `stm` (the argument is passed by name). */
def suspend[R, E, A](stm: => ZSTM[R, E, A]): ZSTM[R, E, A]
101
```
102
103
**Usage Examples:**
104
105
```scala
106
import zio._
107
import zio.stm._
108
109
// Two pure transactions combined into one that yields their sum
val basicTransaction =
  STM.succeed(42).flatMap(x => STM.succeed(58).map(y => x + y))

// Turn the transaction into a ZIO effect that runs it atomically
val result = basicTransaction.commit
117
118
// Conditional retry: the transaction is retried until the value exceeds 100
val waitForCondition =
  someRef.get.flatMap { value =>
    STM.check(value > 100).map(_ => value)
  }
123
124
// Transfer that fails with an error message when the source balance is too low
val transferMoney = (from: TRef[Int], to: TRef[Int], amount: Int) =>
  from.get.flatMap { fromBalance =>
    // The failed check is replaced by an explicit failure via orElse
    val ensureFunds =
      STM.check(fromBalance >= amount).orElse(STM.fail("Insufficient funds"))

    ensureFunds *> from.update(_ - amount) *> to.update(_ + amount)
  }
133
```
134
135
### TRef - Transactional Reference
136
137
Mutable reference that can be safely modified within STM transactions.
138
139
```scala { .api }
140
/**
 * Transactional reference whose value can be modified safely by concurrent
 * transactions.
 */
sealed abstract class TRef[A] {

  /** Returns the value currently held. */
  def get: USTM[A]

  /** Replaces the held value with `a`. */
  def set(a: A): USTM[Unit]

  /** Applies `f` to the held value, storing the second component and returning the first. */
  def modify[B](f: A => (B, A)): USTM[B]

  /** Replaces the held value with `f` applied to it. */
  def update(f: A => A): USTM[Unit]

  /** Applies `f` to the held value and returns the new value. */
  def updateAndGet(f: A => A): USTM[A]

  /** Applies `f` to the held value and returns the old value. */
  def getAndUpdate(f: A => A): USTM[A]

  /** Stores `a` and returns the old value. */
  def getAndSet(a: A): USTM[A]

  /** Updates the held value only when `f` is defined for it. */
  def updateSome(f: PartialFunction[A, A]): USTM[Unit]

  /** Modifies the held value only when `f` is defined for it; `default` is the fallback result. */
  def modifySome[B](default: B)(f: PartialFunction[A, (B, A)]): USTM[B]

  /** Returns the current value and updates it when `f` is defined for it. */
  def getAndUpdateSome(f: PartialFunction[A, A]): USTM[A]

  /** Updates the held value when `f` is defined for it and returns the resulting value. */
  def updateSomeAndGet(f: PartialFunction[A, A]): USTM[A]
}
177
178
object TRef {

  /** Transaction that creates a new reference holding `a`. */
  def make[A](a: => A): USTM[TRef[A]]

  /** Creates a new reference holding `a` and commits, yielding a ZIO effect. */
  def makeCommit[A](a: => A): UIO[TRef[A]]
}
185
```
186
187
**Usage Examples:**
188
189
```scala
190
// Shared counter using TRef
val counterProgram = for {
  counter <- TRef.makeCommit(0)

  // Concurrent increments: each increment is its own atomic transaction
  _ <- ZIO.foreachParDiscard(1 to 1000)(_ => counter.update(_ + 1).commit)

  // `final` is a reserved word in Scala and cannot be used as a value name
  finalCount <- counter.get.commit
  _          <- Console.printLine(s"Final count: $finalCount")
} yield ()
205
206
// Account balance transfer (atomic transaction)
val transfer = for {
  from <- TRef.makeCommit(1000)
  to   <- TRef.makeCommit(500)

  // Transfer $200 atomically: the balance check, debit and credit commit together
  moveFunds = from.get.flatMap(fromBalance => STM.check(fromBalance >= 200)) *>
                from.update(_ - 200) *>
                to.update(_ + 200)
  _ <- moveFunds.commit

  fromFinal <- from.get.commit
  toFinal   <- to.get.commit
  _         <- Console.printLine(s"From: $fromFinal, To: $toFinal")
} yield ()
223
224
// Complex state management
225
/** Game state snapshot with integer `score`, `lives` and `level` fields. */
case class GameState(score: Int, lives: Int, level: Int)
226
227
val gameProgram = for {
  state <- TRef.makeCommit(GameState(0, 3, 1))

  // Atomic game state update; STM.check retries the transaction while no lives remain
  _ <- (for {
         current <- state.get
         _       <- STM.check(current.lives > 0)
         _ <- state.update { s =>
                val newScore = s.score + 100
                s.copy(
                  score = newScore,
                  level = if (newScore > 1000) s.level + 1 else s.level
                )
              }
       } yield ()).commit

  // `final` is a reserved word in Scala and cannot be used as a value name
  finalState <- state.get.commit
  _          <- Console.printLine(s"Game state: $finalState")
} yield ()
243
```
244
245
### Other STM Data Structures
246
247
Additional transactional data structures for complex concurrent programming needs.
248
249
```scala { .api }
250
// Transactional Map: every operation is a transaction that cannot fail
sealed abstract class TMap[K, V] {
  /** Optional value for key `k`. */
  def get(k: K): USTM[Option[V]]

  def put(k: K, v: V): USTM[Unit]

  // NOTE(review): this operation may be named `delete` in ZIO's TMap;
  // confirm the name and return type against the ZIO version in use.
  def remove(k: K): USTM[Option[V]]

  def contains(k: K): USTM[Boolean]

  def size: USTM[Int]

  /** Contents as a list of key/value pairs. */
  def toList: USTM[List[(K, V)]]
}
259
260
// Transactional Set: every operation is a transaction that cannot fail
sealed abstract class TSet[A] {
  def contains(a: A): USTM[Boolean]

  def put(a: A): USTM[Unit]

  // NOTE(review): this operation may be named `delete` in ZIO's TSet;
  // confirm against the ZIO version in use.
  def remove(a: A): USTM[Unit]

  def size: USTM[Int]

  /** Contents as a list. */
  def toList: USTM[List[A]]
}
268
269
// Transactional Array: element access happens inside transactions
sealed abstract class TArray[A] {
  /** Transaction that reads the element at `index`. */
  def apply(index: Int): USTM[A]

  /** Transaction that writes `a` at `index`. */
  def update(index: Int, a: A): USTM[Unit]

  /** Length as a plain `Int`, available without running a transaction. */
  def length: Int

  /** Contents as a list. */
  def toList: USTM[List[A]]
}
276
277
// Transactional Queue: every operation is a transaction that cannot fail
sealed abstract class TQueue[A] {
  /** Transaction that adds `a` to the queue. */
  def offer(a: A): USTM[Unit]

  /** Transaction that removes and returns an element. */
  def take: USTM[A]

  def size: USTM[Int]

  def isEmpty: USTM[Boolean]
}
284
285
// Transactional Promise: a value of type A, or an error of type E, set at most once
sealed abstract class TPromise[E, A] {
  /** Attempts to complete the promise with the value `a`. */
  def succeed(a: A): USTM[Boolean]

  /** Attempts to complete the promise with the error `e`. */
  def fail(e: E): USTM[Boolean]

  /**
   * Waits for completion. A promise completed through `fail` surfaces its
   * error in the transaction's error channel, so the result is `STM[E, A]`
   * rather than an infallible `USTM[A]`.
   */
  def await: STM[E, A]

  /** Current state without waiting: `None` until completed, then the outcome as an `Either`. */
  def poll: USTM[Option[Either[E, A]]]
}
292
293
// Transactional Semaphore: permits are acquired and released inside transactions
sealed abstract class TSemaphore {
  def acquire: USTM[Unit]

  /** Like `acquire`, for `n` permits. */
  def acquireN(n: Long): USTM[Unit]

  def release: USTM[Unit]

  /** Like `release`, for `n` permits. */
  def releaseN(n: Long): USTM[Unit]

  /** Permit count as a `Long`. */
  def available: USTM[Long]
}
301
```
302
303
**Usage Examples:**
304
305
```scala
306
// Transactional cache implementation
val cacheProgram = for {
  cache <- TMap.empty[String, String].commit

  // Atomic cache operations: both inserts and the size check commit together
  populate = cache.put("key1", "value1") *>
               cache.put("key2", "value2") *>
               cache.size.flatMap(size => STM.check(size <= 1000)) // Cache size limit
  _ <- populate.commit

  value <- cache.get("key1").commit
  _     <- Console.printLine(s"Cached value: $value")
} yield ()
321
322
// Producer-consumer with transactional queue
val producerConsumer = for {
  queue <- TQueue.bounded[String](10).commit

  // Producer fiber: offers item-1 .. item-100, one committed transaction per item
  producer <- ZIO.foreach(1 to 100)(i => queue.offer(s"item-$i").commit).fork

  // Consumer fiber: takes 100 items and passes each one to processItem
  consumer <- ZIO.foreach(1 to 100) { _ =>
                for {
                  item      <- queue.take.commit
                  processed <- processItem(item)
                } yield processed
              }.fork

  _ <- producer.join
  _ <- consumer.join
} yield ()
339
340
// Coordinated resource management
val resourceManager = for {
  available <- TSemaphore.make(5).commit // 5 available resources
  inUse     <- TRef.makeCommit(Set.empty[String])

  // Acquire: take a permit, require that the id is not yet held, then record it
  acquireResource = (resourceId: String) => {
    val acquireTx =
      available.acquire *>
        inUse.get.flatMap(held => STM.check(!held.contains(resourceId))) *>
        inUse.update(_ + resourceId)
    acquireTx.commit
  }

  // Release: require that the id is held, forget it, then return the permit
  releaseResource = (resourceId: String) => {
    val releaseTx =
      inUse.get.flatMap(held => STM.check(held.contains(resourceId))) *>
        inUse.update(_ - resourceId) *>
        available.release
    releaseTx.commit
  }

} yield (acquireResource, releaseResource)
362
```
363
364
### STM Patterns and Best Practices
365
366
Common patterns for effective use of Software Transactional Memory.
367
368
```scala { .api }
369
// Optimistic concurrency pattern
val optimisticUpdate = (ref: TRef[Counter]) => {
  // A version mismatch fails the transaction instead of using STM.check.
  // STM.check would make the transaction wait, and committing an infallible
  // transaction yields an effect that ZIO's `retry` refuses to compile against.
  val increment: STM[String, Unit] =
    ref.get.flatMap { counter =>
      if (counter.version == expectedVersion)
        ref.set(
          counter.copy(
            value = counter.value + 1,
            version = counter.version + 1
          )
        )
      else
        STM.fail(s"Version conflict: expected $expectedVersion, found ${counter.version}")
    }

  // The failure is retried with exponential backoff, at most 5 times
  increment.commit.retry(Schedule.exponential(10.millis) && Schedule.recurs(5))
}
382
383
// Coordinated state updates
val coordinatedUpdate = (refs: List[TRef[Int]]) => {
  // Business constraint: the sum of all references must stay below 1000
  val withinLimit = STM.foreach(refs)(_.get).flatMap(values => STM.check(values.sum < 1000))

  // Reads, check and increments all belong to the same transaction
  val incrementAll = STM.foreach(refs)(_.update(_ + 1)).map(_ => ())

  (withinLimit *> incrementAll).commit
}
394
395
// Conditional waiting pattern
val waitForValue = (ref: TRef[Option[String]]) =>
  ref.get.flatMap {
    case Some(v) => STM.succeed(v)
    case None    => STM.retry // Wait until value is available
  }.commit
407
408
// Resource pool implementation
class TransactionalPool[A](resources: TRef[List[A]], maxSize: Int) {

  /** Removes and returns the head of the pooled list, retrying while the list is empty. */
  def acquire: UIO[A] =
    resources.get.flatMap {
      case head :: tail => resources.set(tail).map(_ => head)
      case Nil          => STM.retry // Wait for resources to become available
    }.commit

  /** Prepends `resource` to the pooled list, retrying while it already holds `maxSize` entries. */
  def release(resource: A): UIO[Unit] =
    resources.get
      .flatMap(current => STM.check(current.length < maxSize))
      .flatMap(_ => resources.update(resource :: _))
      .commit

  /** Number of resources currently in the pooled list. */
  def size: UIO[Int] = resources.get.commit.map(_.length)
}
429
```
430
431
**Usage Examples:**
432
433
```scala
434
// Banking system with STM
435
/** Bank account record: string `id`, integer `balance`, and a `frozen` flag. */
case class Account(id: String, balance: Int, frozen: Boolean)
436
437
/**
 * Banking operations over a transactional map from account id to account reference.
 *
 * The map is passed in already constructed: `TMap.empty` is itself a
 * transaction, so it cannot be stored in a field and used as a map directly.
 * Use `BankingSystem.make` to obtain an instance backed by an empty map.
 */
class BankingSystem(accounts: TMap[String, TRef[Account]]) {

  /** Resolves `id` to its account reference, failing the transaction when the id is unknown. */
  private def lookup(id: String): STM[Throwable, TRef[Account]] =
    accounts.get(id).flatMap {
      case Some(ref) => STM.succeed(ref)
      // A Throwable error keeps the public methods compatible with Task[Unit]
      case None => STM.fail(new NoSuchElementException(s"Account $id not found"))
    }

  /**
   * Moves `amount` from account `fromId` to account `toId` in one transaction.
   *
   * Fails when either id is unknown. The transaction is retried while either
   * account is frozen or the source balance is below `amount`.
   */
  def transfer(fromId: String, toId: String, amount: Int): Task[Unit] =
    (for {
      fromAccount <- lookup(fromId)
      toAccount   <- lookup(toId)

      from <- fromAccount.get
      to   <- toAccount.get

      _ <- STM.check(!from.frozen && !to.frozen)
      _ <- STM.check(from.balance >= amount)

      // `update` re-reads each reference, so a transfer where fromId == toId
      // nets to zero instead of overwriting the debit with a stale snapshot
      _ <- fromAccount.update(a => a.copy(balance = a.balance - amount))
      _ <- toAccount.update(a => a.copy(balance = a.balance + amount))
    } yield ()).commit

  /** Marks account `accountId` as frozen; fails when the id is unknown. */
  def freezeAccount(accountId: String): Task[Unit] =
    lookup(accountId)
      .flatMap(ref => ref.update(account => account.copy(frozen = true)))
      .commit
}

object BankingSystem {

  /** Creates a banking system backed by a new, empty account map. */
  def make: UIO[BankingSystem] =
    TMap.empty[String, TRef[Account]].commit.map(new BankingSystem(_))
}
474
```