or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application.md concurrency.md core-effects.md dependency-injection.md error-handling.md index.md metrics.md resource-management.md services.md stm.md streams.md testing.md

stm.mddocs/

0

# Software Transactional Memory

1

2

ZIO's Software Transactional Memory (STM) lets you describe changes to shared state as composable, atomic transactions. Transactions run without locks and are retried automatically when they conflict or when a condition they wait on is not yet met, so concurrent data structures can be built without traditional locking.

3

4

## Capabilities

5

6

### ZSTM - Transactional Effects

7

8

`ZSTM` is the core STM effect type. A `ZSTM` value describes an atomic transaction that can be combined with other transactions and safely retried.

9

10

```scala { .api }

11

/**

12

* A transactional effect that can be composed and executed atomically

13

* - R: Environment required for the transaction

14

* - E: Error type the transaction can fail with

15

* - A: Success value type

16

*/

17

sealed trait ZSTM[-R, +E, +A] {

18

/** Execute the transaction atomically, committing all changes */

19

def commit(implicit trace: Trace): ZIO[R, E, A]

20

21

/** Transform the success value */

22

def map[B](f: A => B): ZSTM[R, E, B]

23

24

/** Chain transactions together */

25

def flatMap[R1 <: R, E1 >: E, B](f: A => ZSTM[R1, E1, B]): ZSTM[R1, E1, B]

26

27

/** Transform the error type */

28

def mapError[E2](f: E => E2): ZSTM[R, E2, A]

29

30

/** Handle errors with recovery transactions */

31

def catchAll[R1 <: R, E2, A1 >: A](h: E => ZSTM[R1, E2, A1]): ZSTM[R1, E2, A1]

32

33

/** Provide a fallback transaction if this one fails */

34

def orElse[R1 <: R, E1, A1 >: A](that: => ZSTM[R1, E1, A1]): ZSTM[R1, E1, A1]

35

36

/** Abort current transaction and retry */

37

def retry: USTM[Nothing]

38

39

/** Handle both success and failure cases */

40

def fold[B](failure: E => B, success: A => B): URSTM[R, B]

41

42

/** Combine with another transaction, returning both results */

43

def zip[R1 <: R, E1 >: E, B](that: => ZSTM[R1, E1, B]): ZSTM[R1, E1, (A, B)]

44

}

45

46

// Type aliases for common STM patterns

47

type STM[+E, +A] = ZSTM[Any, E, A] // No environment requirements

48

type USTM[+A] = ZSTM[Any, Nothing, A] // Cannot fail

49

type URSTM[-R, +A] = ZSTM[R, Nothing, A] // Requires R, cannot fail

50

```

51

52

### STM Factory Methods

53

54

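Constructors for building transactional effects from values, failures, and computations. The usage examples below call them through the `STM` object (for example, `STM.succeed` and `STM.check`).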

55

56

```scala { .api }

57

/**

58

* Create a transaction that succeeds with the given value

59

*/

60

def succeed[A](a: => A): USTM[A]

61

62

/**

63

* Create a transaction that fails with the given error

64

*/

65

def fail[E](e: => E): STM[E, Nothing]

66

67

/**

68

* Create a transaction that terminates with a defect

69

*/

70

def die(t: => Throwable): USTM[Nothing]

71

72

/**

73

* Create a transaction that aborts and retries

74

*/

75

val retry: USTM[Nothing]

76

77

/**

78

* Execute a transaction atomically

79

*/

80

def atomically[R, E, A](stm: ZSTM[R, E, A]): ZIO[R, E, A]

81

82

/**

83

* Check a condition and retry if false

84

*/

85

def check(p: => Boolean): USTM[Unit]

86

87

/**

88

* Execute effect for each element in a collection within a transaction

89

*/

90

def foreach[R, E, A, B](as: Iterable[A])(f: A => ZSTM[R, E, B]): ZSTM[R, E, List[B]]

91

92

/**

93

* Execute all transactions and collect results

94

*/

95

def collectAll[R, E, A](stms: Iterable[ZSTM[R, E, A]]): ZSTM[R, E, List[A]]

96

97

/**

98

* Suspend a transaction computation

99

*/

100

def suspend[R, E, A](stm: => ZSTM[R, E, A]): ZSTM[R, E, A]

101

```

102

103

**Usage Examples:**

104

105

```scala

106

import zio._

107

import zio.stm._

108

109

// Basic transaction

110

val basicTransaction = for {

111

x <- STM.succeed(42)

112

y <- STM.succeed(58)

113

} yield x + y

114

115

// Execute transaction atomically

116

val result = basicTransaction.commit

117

118

// Conditional retry

119

val waitForCondition = for {

120

value <- someRef.get

121

_ <- STM.check(value > 100) // Retry until value > 100

122

} yield value

123

124

// Complex transaction with error handling
//
// Moves `amount` from one balance to the other inside a single transaction.
// The transaction fails with "Insufficient funds" when the source balance is
// lower than `amount`; otherwise both updates are committed together.
val transferMoney = (from: TRef[Int], to: TRef[Int], amount: Int) => {
  for {
    fromBalance <- from.get
    // Fail explicitly. `STM.check` would retry on a false condition, and
    // recovering from that retry with `orElse` hides the intent.
    _           <- if (fromBalance >= amount) STM.unit else STM.fail("Insufficient funds")
    _           <- from.update(_ - amount)
    _           <- to.update(_ + amount)
  } yield ()
}

133

```

134

135

### TRef - Transactional Reference

136

137

A `TRef` is a mutable reference whose reads and writes take part in STM transactions, so it can be modified safely from concurrent fibers.

138

139

```scala { .api }

140

/**

141

* A transactional reference that can be safely modified across concurrent transactions

142

*/

143

sealed abstract class TRef[A] {

144

/** Read the current value */

145

def get: USTM[A]

146

147

/** Set a new value */

148

def set(a: A): USTM[Unit]

149

150

/** Atomically modify the value and return a result */

151

def modify[B](f: A => (B, A)): USTM[B]

152

153

/** Atomically update the value */

154

def update(f: A => A): USTM[Unit]

155

156

/** Update and return the new value */

157

def updateAndGet(f: A => A): USTM[A]

158

159

/** Return the old value and update */

160

def getAndUpdate(f: A => A): USTM[A]

161

162

/** Set new value and return the old value */

163

def getAndSet(a: A): USTM[A]

164

165

/** Conditionally update if current value matches */

166

def updateSome(f: PartialFunction[A, A]): USTM[Unit]

167

168

/** Conditionally modify if current value matches */

169

def modifySome[B](default: B)(f: PartialFunction[A, (B, A)]): USTM[B]

170

171

/** Get current value and conditionally update */

172

def getAndUpdateSome(f: PartialFunction[A, A]): USTM[A]

173

174

/** Conditionally update and get new value */

175

def updateSomeAndGet(f: PartialFunction[A, A]): USTM[A]

176

}

177

178

object TRef {

179

/** Create a new transactional reference */

180

def make[A](a: => A): USTM[TRef[A]]

181

182

/** Create and commit a transactional reference */

183

def makeCommit[A](a: => A): UIO[TRef[A]]

184

}

185

```

186

187

**Usage Examples:**

188

189

```scala

190

// Shared counter using TRef
//
// Runs 1000 read-then-write transactions in parallel against one TRef and
// prints the resulting count. Each read/write pair is committed as a single
// transaction, so increments are not lost to interleaving.
val counterProgram = for {
  counter <- TRef.makeCommit(0)

  // Concurrent increments
  _ <- ZIO.foreachParDiscard(1 to 1000) { _ =>
         (for {
           current <- counter.get
           _       <- counter.set(current + 1)
         } yield ()).commit
       }

  // `final` is a reserved word in Scala and cannot be used as a binding name.
  finalCount <- counter.get.commit
  _          <- Console.printLine(s"Final count: $finalCount")
} yield ()

205

206

// Account balance transfer (atomic transaction)

207

val transfer = for {

208

from <- TRef.makeCommit(1000)

209

to <- TRef.makeCommit(500)

210

211

// Transfer $200 atomically

212

_ <- (for {

213

fromBalance <- from.get

214

_ <- STM.check(fromBalance >= 200)

215

_ <- from.update(_ - 200)

216

_ <- to.update(_ + 200)

217

} yield ()).commit

218

219

fromFinal <- from.get.commit

220

toFinal <- to.get.commit

221

_ <- Console.printLine(s"From: $fromFinal, To: $toFinal")

222

} yield ()

223

224

// Complex state management

225

case class GameState(score: Int, lives: Int, level: Int)

226

227

// Creates the shared game state, applies one scoring step to it in a single
// transaction, and prints the state afterwards.
val gameProgram = for {
  state <- TRef.makeCommit(GameState(0, 3, 1))

  // Atomic game state update
  _ <- (for {
         current <- state.get
         _       <- STM.check(current.lives > 0) // retries while no lives are left
         _       <- state.update { s =>
                      // Compute the new score once; the level-up test uses the same value.
                      val newScore = s.score + 100
                      s.copy(
                        score = newScore,
                        level = if (newScore > 1000) s.level + 1 else s.level
                      )
                    }
       } yield ()).commit

  // `final` is a reserved word in Scala and cannot be used as a binding name.
  finalState <- state.get.commit
  _          <- Console.printLine(s"Game state: $finalState")
} yield ()

243

```

244

245

### Other STM Data Structures

246

247

Additional transactional data structures for complex concurrent programming needs.

248

249

```scala { .api }

250

// Transactional Map

251

sealed abstract class TMap[K, V] {

252

def get(k: K): USTM[Option[V]]

253

def put(k: K, v: V): USTM[Unit]

254

def remove(k: K): USTM[Option[V]]

255

def contains(k: K): USTM[Boolean]

256

def size: USTM[Int]

257

def toList: USTM[List[(K, V)]]

258

}

259

260

// Transactional Set

261

sealed abstract class TSet[A] {

262

def contains(a: A): USTM[Boolean]

263

def put(a: A): USTM[Unit]

264

def remove(a: A): USTM[Unit]

265

def size: USTM[Int]

266

def toList: USTM[List[A]]

267

}

268

269

// Transactional Array

270

sealed abstract class TArray[A] {

271

def apply(index: Int): USTM[A]

272

def update(index: Int, a: A): USTM[Unit]

273

def length: Int

274

def toList: USTM[List[A]]

275

}

276

277

// Transactional Queue

278

sealed abstract class TQueue[A] {

279

def offer(a: A): USTM[Unit]

280

def take: USTM[A]

281

def size: USTM[Int]

282

def isEmpty: USTM[Boolean]

283

}

284

285

// Transactional Promise
sealed abstract class TPromise[E, A] {
  /** Complete the promise with a value; `false` if it was already completed */
  def succeed(a: A): USTM[Boolean]

  /** Complete the promise with an error; `false` if it was already completed */
  def fail(e: E): USTM[Boolean]

  /** Wait for completion; fails with `E` when the promise was failed */
  def await: STM[E, A]

  /** Inspect the promise without waiting; `None` while it is not yet completed */
  def poll: USTM[Option[Either[E, A]]]
}

292

293

// Transactional Semaphore

294

sealed abstract class TSemaphore {

295

def acquire: USTM[Unit]

296

def acquireN(n: Long): USTM[Unit]

297

def release: USTM[Unit]

298

def releaseN(n: Long): USTM[Unit]

299

def available: USTM[Long]

300

}

301

```

302

303

**Usage Examples:**

304

305

```scala

306

// Transactional cache implementation

307

val cacheProgram = for {

308

cache <- TMap.empty[String, String].commit

309

310

// Atomic cache operations

311

_ <- (for {

312

_ <- cache.put("key1", "value1")

313

_ <- cache.put("key2", "value2")

314

size <- cache.size

315

_ <- STM.check(size <= 1000) // Cache size limit

316

} yield ()).commit

317

318

value <- cache.get("key1").commit

319

_ <- Console.printLine(s"Cached value: $value")

320

} yield ()

321

322

// Producer-consumer with transactional queue

323

val producerConsumer = for {

324

queue <- TQueue.bounded[String](10).commit

325

326

// Producer

327

producer <- ZIO.foreach(1 to 100) { i =>

328

queue.offer(s"item-$i").commit

329

}.fork

330

331

// Consumer

332

consumer <- ZIO.foreach(1 to 100) { _ =>

333

queue.take.commit.flatMap(item => processItem(item))

334

}.fork

335

336

_ <- producer.join

337

_ <- consumer.join

338

} yield ()

339

340

// Coordinated resource management

341

val resourceManager = for {

342

available <- TSemaphore.make(5).commit // 5 available resources

343

inUse <- TRef.makeCommit(Set.empty[String])

344

345

// Acquire resource atomically

346

acquireResource = (resourceId: String) => (for {

347

_ <- available.acquire

348

current <- inUse.get

349

_ <- STM.check(!current.contains(resourceId))

350

_ <- inUse.update(_ + resourceId)

351

} yield ()).commit

352

353

// Release resource atomically

354

releaseResource = (resourceId: String) => (for {

355

current <- inUse.get

356

_ <- STM.check(current.contains(resourceId))

357

_ <- inUse.update(_ - resourceId)

358

_ <- available.release

359

} yield ()).commit

360

361

} yield (acquireResource, releaseResource)

362

```

363

364

### STM Patterns and Best Practices

365

366

Common patterns for effective use of Software Transactional Memory.

367

368

```scala { .api }

369

// Optimistic concurrency pattern

370

val optimisticUpdate = (ref: TRef[Counter]) => {

371

val increment = for {

372

counter <- ref.get

373

_ <- STM.check(counter.version == expectedVersion)

374

_ <- ref.set(counter.copy(

375

value = counter.value + 1,

376

version = counter.version + 1

377

))

378

} yield ()

379

380

increment.commit.retry(Schedule.exponential(10.millis) && Schedule.recurs(5))

381

}

382

383

// Coordinated state updates

384

val coordinatedUpdate = (refs: List[TRef[Int]]) => {

385

val transaction = for {

386

values <- STM.foreach(refs)(_.get)

387

total = values.sum

388

_ <- STM.check(total < 1000) // Business constraint

389

_ <- STM.foreach(refs)(_.update(_ + 1))

390

} yield ()

391

392

transaction.commit

393

}

394

395

// Conditional waiting pattern
//
// Reads `ref` and yields the wrapped string once it holds a `Some`;
// while it holds `None` the transaction retries.
val waitForValue = (ref: TRef[Option[String]]) => {
  ref.get.flatMap {
    case None        => STM.retry // Wait until value is available
    case Some(value) => STM.succeed(value)
  }.commit
}

407

408

// Resource pool implementation
//
// A pool backed by a transactional list of idle resources, holding at most
// `maxSize` of them.
class TransactionalPool[A](resources: TRef[List[A]], maxSize: Int) {

  /** Takes the first idle resource, retrying while the pool is empty. */
  def acquire: UIO[A] =
    resources.get.flatMap {
      case Nil           => STM.retry // Wait for resources to become available
      case first :: rest => resources.set(rest).map(_ => first)
    }.commit

  /** Puts a resource back, retrying while the pool already holds `maxSize` resources. */
  def release(resource: A): UIO[Unit] = {
    val putBack = resources.get.flatMap { idle =>
      STM.check(idle.length < maxSize).flatMap(_ => resources.update(resource :: _))
    }
    putBack.commit
  }

  /** Number of idle resources currently held by the pool. */
  def size: UIO[Int] = resources.get.commit.map(_.length)
}

429

```

430

431

**Usage Examples:**

432

433

```scala

434

// Banking system with STM

435

case class Account(id: String, balance: Int, frozen: Boolean)

436

437

/**
 * A bank whose accounts live in a transactional map keyed by account id.
 *
 * The map is passed in already constructed. `TMap.empty` is itself a
 * transaction (`USTM[TMap[...]]`), so it must be committed once (see
 * `BankingSystem.make`) rather than stored in a field uncommitted.
 */
class BankingSystem(accounts: TMap[String, TRef[Account]]) {

  /** Looks up an account reference, failing the transaction when the id is unknown. */
  private def lookup(accountId: String): STM[String, TRef[Account]] =
    accounts.get(accountId).flatMap {
      case Some(ref) => STM.succeed(ref)
      case None      => STM.fail(s"Account $accountId not found")
    }

  /**
   * Moves `amount` from one account to another in a single transaction.
   *
   * Fails with a message when either account id is unknown. Retries while
   * either account is frozen or the source balance is below `amount`.
   */
  def transfer(fromId: String, toId: String, amount: Int): IO[String, Unit] =
    (for {
      fromAccount <- lookup(fromId)
      toAccount   <- lookup(toId)

      from <- fromAccount.get
      to   <- toAccount.get

      _ <- STM.check(!from.frozen && !to.frozen)
      _ <- STM.check(from.balance >= amount)

      // Update from each reference's current value, not from the snapshots
      // read above: when fromId == toId both snapshots are the same account,
      // and writing them back would credit `amount` without debiting it.
      _ <- fromAccount.update(a => a.copy(balance = a.balance - amount))
      _ <- toAccount.update(a => a.copy(balance = a.balance + amount))
    } yield ()).commit

  /** Marks an account as frozen; fails with a message when the id is unknown. */
  def freezeAccount(accountId: String): IO[String, Unit] =
    lookup(accountId).flatMap(_.update(_.copy(frozen = true))).commit
}

object BankingSystem {

  /** Creates a bank with no accounts. */
  def make: UIO[BankingSystem] =
    TMap.empty[String, TRef[Account]].commit.map(new BankingSystem(_))
}

474

```