or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application.md · concurrency.md · index.md · io.md · resources.md · type-classes.md

concurrency.mddocs/

0

# Concurrent Primitives

1

2

Cats Effect provides thread-safe, purely functional data structures for coordination and state management in concurrent programs. These primitives enable safe shared state manipulation and synchronization between fibers.

3

4

## Capabilities

5

6

### Ref - Atomic Reference

7

8

Purely functional atomic reference for thread-safe mutable state.

9

10

```scala { .api }

11

/**

12

* Atomic reference providing thread-safe mutable state

13

*/

14

abstract class Ref[F[_], A] {

15

/** Get the current value */

16

def get: F[A]

17

18

/** Set a new value */

19

def set(a: A): F[Unit]

20

21

/** Atomically update the value with a function */

22

def update(f: A => A): F[Unit]

23

24

/** Atomically modify the value and return a result */

25

def modify[B](f: A => (A, B)): F[B]

26

27

/** Get the current value and set a new one atomically */

28

def getAndSet(a: A): F[A]

29

30

/** Get the current value and update it atomically */

31

def getAndUpdate(f: A => A): F[A]

32

33

/** Update the value and return the new value atomically */

34

def updateAndGet(f: A => A): F[A]

35

36

/** Try to update the value (may fail if concurrent modifications) */

37

def tryUpdate(f: A => A): F[Boolean]

38

39

/** Try to modify the value (may fail if concurrent modifications) */

40

def tryModify[B](f: A => (A, B)): F[Option[B]]

41

42

/** Modify using a State monad */

43

def modifyState[B](state: State[A, B]): F[B]

44

45

/** Try to modify using a State monad (may fail if concurrent modifications) */

46

def tryModifyState[B](state: State[A, B]): F[Option[B]]

47

48

/** Get a snapshot and setter for atomic updates */

49

def access: F[(A, A => F[Boolean])]

50

51

/** Transform the context */

52

def mapK[G[_]](f: F ~> G)(implicit F: Functor[F]): Ref[G, A]

53

}

54

55

object Ref {

56

/** Create a new Ref with an initial value */

57

def of[F[_], A](a: A)(implicit F: Sync[F]): F[Ref[F, A]]

58

59

/** Cross-effect constructor */

60

def in[F[_], G[_], A](a: A)(implicit F: Sync[F], G: Sync[G]): F[Ref[G, A]]

61

62

/** Unsafe direct allocation */

63

def unsafe[F[_], A](a: A)(implicit F: Sync[F]): Ref[F, A]

64

65

/** Create a focused lens Ref */

66

def lens[F[_], A, B <: AnyRef](ref: Ref[F, A])(get: A => B, set: A => B => A)(implicit F: Sync[F]): Ref[F, B]

67

68

/** Alias for of */

69

def apply[F[_]]: PartiallyApplied[F] = new PartiallyApplied[F]

70

71

class PartiallyApplied[F[_]] {

72

def of[A](a: A)(implicit F: Sync[F]): F[Ref[F, A]]

73

}

74

}

75

```

76

77

**Usage Examples:**

78

79

```scala

80

import cats.Parallel
import cats.data.State
import cats.effect._
import cats.effect.concurrent._
import cats.effect.implicits._
import cats.implicits._
import scala.concurrent.duration._

83

84

// Counter example

85

/**
 * Increments a shared counter from 100 parallel tasks and returns the final count.
 *
 * Each `update` is atomic, so no increment is lost and the result is 100.
 * `parTraverse` needs a `Parallel[F]` instance, hence the extra context bound.
 */
def counterProgram[F[_]: Concurrent: Parallel]: F[Int] =
  for {
    // Ref[F].of infers the value type; Ref.of needs both type arguments (Ref.of[F, Int]).
    counter <- Ref[F].of(0)
    _       <- List.range(0, 100).parTraverse(_ => counter.update(_ + 1))
    // `final` is a reserved word in Scala and cannot be used as a binder name.
    total   <- counter.get
  } yield total

90

91

// Shared state between fibers

92

/**
 * Two fibers each insert one key into a shared map held in a `Ref`.
 *
 * Both fibers are joined before the map is read, so the rendered result
 * contains both `key1` and `key2`.
 */
def sharedStateExample[F[_]: Concurrent]: F[String] =
  for {
    // Ref[F].of infers the value type; Ref.of[F](...) would be missing a type argument.
    state  <- Ref[F].of(Map.empty[String, Int])
    fiber1 <- state.update(_.updated("key1", 10)).start
    fiber2 <- state.update(_.updated("key2", 20)).start
    _      <- fiber1.join
    _      <- fiber2.join
    result <- state.get
  } yield result.toString

100

```

101

102

### Deferred - Promise-like Synchronization

103

104

Purely functional promise for one-time completion and coordination between fibers.

105

106

```scala { .api }

107

/**

108

* Promise-like primitive for one-time completion

109

*/

110

abstract class Deferred[F[_], A] {

111

/** Block until a value is available */

112

def get: F[A]

113

114

/** Complete the Deferred with a value; completing it a second time fails with IllegalStateException (not idempotent) */

115

def complete(a: A): F[Unit]

116

117

/** Transform the context */

118

def mapK[G[_]](f: F ~> G): Deferred[G, A]

119

}

120

121

/**

122

* Deferred with non-blocking check capability

123

*/

124

abstract class TryableDeferred[F[_], A] extends Deferred[F, A] {

125

/** Non-blocking check if value is available */

126

def tryGet: F[Option[A]]

127

}

128

129

object Deferred {

130

/** Create a new empty Deferred */

131

def apply[F[_], A](implicit F: Concurrent[F]): F[Deferred[F, A]]

132

133

/** Cross-effect constructor */

134

def in[F[_], G[_], A](implicit F: Sync[F], G: Concurrent[G]): F[Deferred[G, A]]

135

136

/** Unsafe direct allocation */

137

def unsafe[F[_]: Concurrent, A]: Deferred[F, A]

138

139

/** Create an uncancelable Deferred */

140

def uncancelable[F[_], A](implicit F: Async[F]): F[Deferred[F, A]]

141

142

/** Cross-effect uncancelable constructor */

143

def uncancelableIn[F[_], G[_], A](implicit F: Sync[F], G: Async[G]): F[Deferred[G, A]]

144

145

/** Create a tryable Deferred */

146

def tryable[F[_], A](implicit F: Concurrent[F]): F[TryableDeferred[F, A]]

147

148

/** Create an uncancelable tryable Deferred */

149

def tryableUncancelable[F[_], A](implicit F: Async[F]): F[TryableDeferred[F, A]]

150

151

/** Unsafe uncancelable allocation */

152

def unsafeUncancelable[F[_]: Async, A]: Deferred[F, A]

153

}

154

```

155

156

**Usage Examples:**

157

158

```scala

159

// Producer-consumer coordination

160

/**
 * A producer fiber completes a `Deferred` after two seconds while the calling
 * fiber waits on it, then returns the produced message.
 *
 * `Timer[F]` is summoned for the sleep, so a `Timer` bound is required
 * alongside `Concurrent`.
 */
def producerConsumer[F[_]: Concurrent: Timer]: F[String] =
  for {
    deferred <- Deferred[F, String]

    // Producer fiber
    producer <- (for {
                  _ <- Timer[F].sleep(2.seconds)
                  _ <- deferred.complete("Hello from producer!")
                } yield ()).start

    // Consumer: the calling fiber waits until the producer completes the Deferred
    result <- deferred.get
    _      <- producer.join
  } yield result

173

174

// Barrier synchronization

175

/**
 * Uses a `Deferred[F, Unit]` as a one-shot gate: four workers finish their
 * individual sleeps, wait on the gate, and record their id once it opens.
 *
 * Returns the recorded ids. Workers prepend concurrently after release, so
 * the order of the list is not deterministic.
 */
def barrierExample[F[_]: Concurrent: Timer]: F[List[Int]] =
  for {
    barrier <- Deferred[F, Unit]
    // Ref[F].of infers the value type; Ref.of[F](...) would be missing a type argument.
    results <- Ref[F].of(List.empty[Int])

    // Multiple workers wait at barrier
    workers <- List.range(1, 5).traverse { i =>
                 (for {
                   _ <- Timer[F].sleep(i.seconds) // Different work times
                   _ <- barrier.get               // Wait at barrier
                   _ <- results.update(i :: _)
                 } yield ()).start
               }

    _ <- Timer[F].sleep(5.seconds) // Longer than the slowest worker (4s), so all are waiting
    _ <- barrier.complete(())      // Release all workers
    _ <- workers.traverse(_.join)
    // `final` is a reserved word in Scala and cannot be used as a binder name.
    collected <- results.get
  } yield collected

193

```

194

195

### Semaphore - Resource Control

196

197

Semaphore for controlling access to limited resources.

198

199

```scala { .api }

200

/**

201

* Semaphore for controlling concurrent access to resources

202

*/

203

abstract class Semaphore[F[_]] {

204

/** Current number of available permits */

205

def available: F[Long]

206

207

/** Current number of permits (may be negative if waiters) */

208

def count: F[Long]

209

210

/** Acquire a permit (blocks if none available) */

211

def acquire: F[Unit]

212

213

/** Acquire multiple permits */

214

def acquireN(n: Long): F[Unit]

215

216

/** Try to acquire a permit without blocking */

217

def tryAcquire: F[Boolean]

218

219

/** Try to acquire multiple permits without blocking */

220

def tryAcquireN(n: Long): F[Boolean]

221

222

/** Release a permit */

223

def release: F[Unit]

224

225

/** Release multiple permits */

226

def releaseN(n: Long): F[Unit]

227

228

/** Execute an effect with a permit held */

229

def withPermit[A](t: F[A]): F[A]

230

231

/** Execute an effect with multiple permits held */

232

def withPermitN[A](n: Long)(t: F[A]): F[A]

233

234

/** Transform the context with isomorphism */

235

def imapK[G[_]](f: F ~> G, g: G ~> F): Semaphore[G]

236

}

237

238

object Semaphore {

239

/** Create a semaphore with n permits */

240

def apply[F[_]](n: Long)(implicit F: Concurrent[F]): F[Semaphore[F]]

241

242

/** Create an uncancelable semaphore */

243

def uncancelable[F[_]](n: Long)(implicit F: Async[F]): F[Semaphore[F]]

244

245

/** Cross-effect constructor */

246

def in[F[_], G[_]](n: Long)(implicit F: Sync[F], G: Concurrent[G]): F[Semaphore[G]]

247

248

/** Cross-effect uncancelable constructor */

249

def uncancelableIn[F[_], G[_]](n: Long)(implicit F: Sync[F], G: Async[G]): F[Semaphore[G]]

250

}

251

```

252

253

**Usage Examples:**

254

255

```scala

256

// Database connection pool

257

/**
 * Runs 19 simulated queries in parallel while a semaphore with 5 permits
 * bounds how many execute at the same time.
 *
 * `Timer` is required for the simulated latency and `Parallel` for
 * `parTraverse`; `Concurrent` alone provides neither.
 */
def databaseExample[F[_]: Concurrent: Timer: Parallel]: F[List[String]] =
  for {
    connectionPool <- Semaphore[F](5) // Max 5 concurrent connections

    queries = List.range(1, 20).map(i => s"SELECT * FROM table$i")

    results <- queries.parTraverse { query =>
                 // The permit is held for the duration of the wrapped effect.
                 connectionPool.withPermit {
                   // Simulate database query
                   Timer[F].sleep(1.second) *>
                     Sync[F].delay(s"Result for $query")
                 }
               }
  } yield results

270

271

// Rate limiting

272

/**
 * Issues 99 requests in parallel while a semaphore with 10 permits caps the
 * number in flight at any moment.
 *
 * This bounds concurrency rather than requests per unit of time: each permit
 * is held for the request plus a 100 ms pause before the next request can start.
 * `parTraverse` needs a `Parallel[F]` instance, hence the extra context bound.
 *
 * NOTE(review): `makeHttpRequest` is not defined on this page; it is assumed
 * to be a `String => F[_]` helper supplied by the reader.
 */
def rateLimitedRequests[F[_]: Concurrent: Timer: Parallel]: F[Unit] =
  for {
    rateLimiter <- Semaphore[F](10) // At most 10 requests in flight

    _ <- List.range(1, 100).parTraverse { i =>
           rateLimiter.withPermit {
             makeHttpRequest(s"request-$i") <*
               Timer[F].sleep(100.millis) // Simulate request time
           }
         }
  } yield ()

282

```

283

284

### MVar - Mutable Variable

285

286

Thread-safe mutable variable that can be empty or contain a single value.

287

288

```scala { .api }

289

/**

290

* Thread-safe mutable variable that can be empty or full

291

*/

292

abstract class MVar[F[_], A] {

293

/** Put a value (blocks if full) */

294

def put(a: A): F[Unit]

295

296

/** Take the value (blocks if empty) */

297

def take: F[A]

298

299

/** Read the value without taking it (blocks if empty) */

300

def read: F[A]

301

302

/** Try to put a value without blocking */

303

def tryPut(a: A): F[Boolean]

304

305

/** Try to take a value without blocking */

306

def tryTake: F[Option[A]]

307

308

/** Try to read a value without blocking */

309

def tryRead: F[Option[A]]

310

311

/** Check if MVar is empty */

312

def isEmpty: F[Boolean]

313

314

/** Atomically modify the contents */

315

def modify[B](f: A => (A, B)): F[B]

316

317

/** Atomically swap the contents */

318

def swap(newValue: A): F[A]

319

320

/** Apply effectful function to contents */

321

def use[B](f: A => F[B]): F[B]

322

323

/** Replace the contents with the result of an effectful function, returning no value */

324

def modify_[B](f: A => F[A]): F[Unit]

325

326

/** Transform context (deprecated, use imapK) */

327

def mapK[G[_]](f: F ~> G): MVar[G, A]

328

329

/** Transform context with isomorphism */

330

def imapK[G[_]](f: F ~> G, g: G ~> F): MVar[G, A]

331

}

332

333

object MVar {

334

/** Create an empty cancelable MVar */

335

def empty[F[_], A](implicit F: Concurrent[F]): F[MVar[F, A]]

336

337

/** Create an empty uncancelable MVar */

338

def uncancelableEmpty[F[_], A](implicit F: Async[F]): F[MVar[F, A]]

339

340

/** Create an MVar with an initial value */

341

def of[F[_], A](a: A)(implicit F: Concurrent[F]): F[MVar[F, A]]

342

343

/** Create an initialized uncancelable MVar */

344

def uncancelableOf[F[_], A](a: A)(implicit F: Async[F]): F[MVar[F, A]]

345

346

/** Cross-effect constructor (initialized) */

347

def in[F[_], G[_], A](a: A)(implicit F: Sync[F], G: Concurrent[G]): F[MVar[G, A]]

348

349

/** Cross-effect constructor (empty) */

350

def emptyIn[F[_], G[_], A](implicit F: Sync[F], G: Concurrent[G]): F[MVar[G, A]]

351

352

/** Cross-effect uncancelable (initialized) */

353

def uncancelableIn[F[_], G[_], A](a: A)(implicit F: Sync[F], G: Async[G]): F[MVar[G, A]]

354

355

/** Cross-effect uncancelable (empty) */

356

def uncancelableEmptyIn[F[_], G[_], A](implicit F: Sync[F], G: Async[G]): F[MVar[G, A]]

357

358

/** Create an empty MVar (alias) */

359

def apply[F[_]]: PartiallyApplied[F] = new PartiallyApplied[F]

360

361

class PartiallyApplied[F[_]] {

362

def empty[A](implicit F: Concurrent[F]): F[MVar[F, A]]

363

def of[A](a: A)(implicit F: Concurrent[F]): F[MVar[F, A]]

364

}

365

}

366

```

367

368

**Usage Examples:**

369

370

```scala

371

// Producer-consumer with bounded buffer

372

/**
 * Hands a single item from a producer fiber to a consumer fiber through an
 * initially empty `MVar`, and returns the consumed item.
 *
 * The consumer's `take` blocks until the producer's `put` fills the MVar.
 * `Timer[F]` is summoned for the producer's delay, so a `Timer` bound is
 * required alongside `Concurrent`.
 */
def boundedBuffer[F[_]: Concurrent: Timer]: F[String] =
  for {
    buffer <- MVar.empty[F, String]

    // Producer
    producer <- (for {
                  _ <- Timer[F].sleep(1.second)
                  _ <- buffer.put("produced item")
                } yield ()).start

    // Consumer
    consumer <- (for {
                  item <- buffer.take
                  _    <- Sync[F].delay(println(s"Consumed: $item"))
                } yield item).start

    result <- consumer.join
    _      <- producer.join
  } yield result

390

391

// Synchronous handoff

392

def handoffExample[F[_]: Concurrent]: F[Unit] = for {

393

handoff <- MVar.empty[F, String]

394

395

// Sender: put on an empty MVar completes immediately (put only blocks while the MVar is full)

396

sender <- handoff.put("message").start

397

398

// Receiver processes message

399

_ <- handoff.take.flatMap(msg => Sync[F].delay(println(s"Received: $msg")))

400

401

_ <- sender.join

402

} yield ()

403

```

404

405

## Types

406

407

```scala { .api }

408

/**

409

* Atomic reference for thread-safe mutable state

410

*/

411

abstract class Ref[F[_], A]

412

413

/**

414

* Promise-like primitive for one-time completion

415

*/

416

abstract class Deferred[F[_], A]

417

418

/**

419

* Deferred with non-blocking tryGet capability

420

*/

421

abstract class TryableDeferred[F[_], A] extends Deferred[F, A]

422

423

/**

424

* Semaphore for controlling resource access

425

*/

426

abstract class Semaphore[F[_]]

427

428

/**

429

* Mutable variable that can be empty or contain a value

430

*/

431

abstract class MVar[F[_], A]

432

```