0
# Concurrent Primitives
1
2
Cats Effect provides thread-safe, purely functional data structures for coordination and state management in concurrent programs. These primitives enable safe shared state manipulation and synchronization between fibers.
3
4
## Capabilities
5
6
### Ref - Atomic Reference
7
8
Purely functional atomic reference for thread-safe mutable state.
9
10
```scala { .api }
11
/**
12
* Atomic reference providing thread-safe mutable state
13
*/
14
abstract class Ref[F[_], A] {
15
/** Get the current value */
16
def get: F[A]
17
18
/** Set a new value */
19
def set(a: A): F[Unit]
20
21
/** Atomically update the value with a function */
22
def update(f: A => A): F[Unit]
23
24
/** Atomically modify the value and return a result */
25
def modify[B](f: A => (A, B)): F[B]
26
27
/** Get the current value and set a new one atomically */
28
def getAndSet(a: A): F[A]
29
30
/** Get the current value and update it atomically */
31
def getAndUpdate(f: A => A): F[A]
32
33
/** Update the value and return the new value atomically */
34
def updateAndGet(f: A => A): F[A]
35
36
/** Try to update the value (may fail if concurrent modifications) */
37
def tryUpdate(f: A => A): F[Boolean]
38
39
/** Try to modify the value (may fail if concurrent modifications) */
40
def tryModify[B](f: A => (A, B)): F[Option[B]]
41
42
/** Modify using a State monad */
43
def modifyState[B](state: State[A, B]): F[B]
44
45
/** Try to modify using a State monad (may fail if concurrent modifications) */
46
def tryModifyState[B](state: State[A, B]): F[Option[B]]
47
48
/** Get a snapshot and setter for atomic updates */
49
def access: F[(A, A => F[Boolean])]
50
51
/** Transform the context */
52
def mapK[G[_]](f: F ~> G)(implicit F: Functor[F]): Ref[G, A]
53
}
54
55
object Ref {
56
/** Create a new Ref with an initial value */
57
def of[F[_], A](a: A)(implicit F: Sync[F]): F[Ref[F, A]]
58
59
/** Cross-effect constructor */
60
def in[F[_], G[_], A](a: A)(implicit F: Sync[F], G: Sync[G]): F[Ref[G, A]]
61
62
/** Unsafe direct allocation */
63
def unsafe[F[_], A](a: A)(implicit F: Sync[F]): Ref[F, A]
64
65
/** Create a focused lens Ref */
66
def lens[F[_], A, B <: AnyRef](ref: Ref[F, A])(get: A => B, set: A => B => A)(implicit F: Sync[F]): Ref[F, B]
67
68
/** Alias for of */
69
def apply[F[_]]: PartiallyApplied[F] = new PartiallyApplied[F]
70
71
class PartiallyApplied[F[_]] {
72
def of[A](a: A)(implicit F: Sync[F]): F[Ref[F, A]]
73
}
74
}
75
```
76
77
**Usage Examples:**
78
79
```scala
80
import cats.effect._
81
import cats.effect.concurrent._
82
import cats.data.State
83
84
// Counter example
85
def counterProgram[F[_]: Concurrent]: F[Int] = for {
86
counter <- Ref.of[F](0)
87
_ <- List.range(0, 100).parTraverse(_ => counter.update(_ + 1))
88
final <- counter.get
89
} yield final
90
91
// Shared state between fibers
92
def sharedStateExample[F[_]: Concurrent]: F[String] = for {
93
state <- Ref.of[F](Map.empty[String, Int])
94
fiber1 <- state.update(_.updated("key1", 10)).start
95
fiber2 <- state.update(_.updated("key2", 20)).start
96
_ <- fiber1.join
97
_ <- fiber2.join
98
result <- state.get
99
} yield result.toString
100
```
101
102
### Deferred - Promise-like Synchronization
103
104
Purely functional promise for one-time completion and coordination between fibers.
105
106
```scala { .api }
107
/**
108
* Promise-like primitive for one-time completion
109
*/
110
abstract class Deferred[F[_], A] {
111
/** Block until a value is available */
112
def get: F[A]
113
114
/** Complete the Deferred with a value (idempotent) */
115
def complete(a: A): F[Unit]
116
117
/** Transform the context */
118
def mapK[G[_]](f: F ~> G): Deferred[G, A]
119
}
120
121
/**
122
* Deferred with non-blocking check capability
123
*/
124
abstract class TryableDeferred[F[_], A] extends Deferred[F, A] {
125
/** Non-blocking check if value is available */
126
def tryGet: F[Option[A]]
127
}
128
129
object Deferred {
130
/** Create a new empty Deferred */
131
def apply[F[_], A](implicit F: Concurrent[F]): F[Deferred[F, A]]
132
133
/** Cross-effect constructor */
134
def in[F[_], G[_], A](implicit F: Sync[F], G: Concurrent[G]): F[Deferred[G, A]]
135
136
/** Unsafe direct allocation */
137
def unsafe[F[_]: Concurrent, A]: Deferred[F, A]
138
139
/** Create an uncancelable Deferred */
140
def uncancelable[F[_], A](implicit F: Async[F]): F[Deferred[F, A]]
141
142
/** Cross-effect uncancelable constructor */
143
def uncancelableIn[F[_], G[_], A](implicit F: Sync[F], G: Async[G]): F[Deferred[G, A]]
144
145
/** Create a tryable Deferred */
146
def tryable[F[_], A](implicit F: Concurrent[F]): F[TryableDeferred[F, A]]
147
148
/** Create an uncancelable tryable Deferred */
149
def tryableUncancelable[F[_], A](implicit F: Async[F]): F[TryableDeferred[F, A]]
150
151
/** Unsafe uncancelable allocation */
152
def unsafeUncancelable[F[_]: Async, A]: Deferred[F, A]
153
}
154
```
155
156
**Usage Examples:**
157
158
```scala
159
// Producer-consumer coordination
160
def producerConsumer[F[_]: Concurrent]: F[String] = for {
161
deferred <- Deferred[F, String]
162
163
// Producer fiber
164
producer <- (for {
165
_ <- Timer[F].sleep(2.seconds)
166
_ <- deferred.complete("Hello from producer!")
167
} yield ()).start
168
169
// Consumer fiber waits for producer
170
result <- deferred.get
171
_ <- producer.join
172
} yield result
173
174
// Barrier synchronization
175
def barrierExample[F[_]: Concurrent]: F[List[Int]] = for {
176
barrier <- Deferred[F, Unit]
177
results <- Ref.of[F](List.empty[Int])
178
179
// Multiple workers wait at barrier
180
workers <- List.range(1, 5).traverse { i =>
181
(for {
182
_ <- Timer[F].sleep(i.seconds) // Different work times
183
_ <- barrier.get // Wait at barrier
184
_ <- results.update(i :: _)
185
} yield ()).start
186
}
187
188
_ <- Timer[F].sleep(5.seconds) // Let all workers reach barrier
189
_ <- barrier.complete(()) // Release all workers
190
_ <- workers.traverse(_.join)
191
final <- results.get
192
} yield final
193
```
194
195
### Semaphore - Resource Control
196
197
Semaphore for controlling access to limited resources.
198
199
```scala { .api }
200
/**
201
* Semaphore for controlling concurrent access to resources
202
*/
203
abstract class Semaphore[F[_]] {
204
/** Current number of available permits */
205
def available: F[Long]
206
207
/** Current number of permits (may be negative if waiters) */
208
def count: F[Long]
209
210
/** Acquire a permit (blocks if none available) */
211
def acquire: F[Unit]
212
213
/** Acquire multiple permits */
214
def acquireN(n: Long): F[Unit]
215
216
/** Try to acquire a permit without blocking */
217
def tryAcquire: F[Boolean]
218
219
/** Try to acquire multiple permits without blocking */
220
def tryAcquireN(n: Long): F[Boolean]
221
222
/** Release a permit */
223
def release: F[Unit]
224
225
/** Release multiple permits */
226
def releaseN(n: Long): F[Unit]
227
228
/** Execute an effect with a permit held */
229
def withPermit[A](t: F[A]): F[A]
230
231
/** Execute an effect with multiple permits held */
232
def withPermitN[A](n: Long)(t: F[A]): F[A]
233
234
/** Transform the context with isomorphism */
235
def imapK[G[_]](f: F ~> G, g: G ~> F): Semaphore[G]
236
}
237
238
object Semaphore {
239
/** Create a semaphore with n permits */
240
def apply[F[_]](n: Long)(implicit F: Concurrent[F]): F[Semaphore[F]]
241
242
/** Create an uncancelable semaphore */
243
def uncancelable[F[_]](n: Long)(implicit F: Async[F]): F[Semaphore[F]]
244
245
/** Cross-effect constructor */
246
def in[F[_], G[_]](n: Long)(implicit F: Sync[F], G: Concurrent[G]): F[Semaphore[G]]
247
248
/** Cross-effect uncancelable constructor */
249
def uncancelableIn[F[_], G[_]](n: Long)(implicit F: Sync[F], G: Async[G]): F[Semaphore[G]]
250
}
251
```
252
253
**Usage Examples:**
254
255
```scala
256
// Database connection pool
257
def databaseExample[F[_]: Concurrent]: F[List[String]] = for {
258
connectionPool <- Semaphore[F](5) // Max 5 concurrent connections
259
260
queries = List.range(1, 20).map(i => s"SELECT * FROM table$i")
261
262
results <- queries.parTraverse { query =>
263
connectionPool.withPermit {
264
// Simulate database query
265
Timer[F].sleep(1.second) *>
266
Sync[F].delay(s"Result for $query")
267
}
268
}
269
} yield results
270
271
// Rate limiting
272
def rateLimitedRequests[F[_]: Concurrent: Timer]: F[Unit] = for {
273
rateLimiter <- Semaphore[F](10) // 10 requests per batch
274
275
_ <- List.range(1, 100).parTraverse { i =>
276
rateLimiter.withPermit {
277
makeHttpRequest(s"request-$i") <*
278
Timer[F].sleep(100.millis) // Simulate request time
279
}
280
}
281
} yield ()
282
```
283
284
### MVar - Mutable Variable
285
286
Thread-safe mutable variable that can be empty or contain a single value.
287
288
```scala { .api }
289
/**
290
* Thread-safe mutable variable that can be empty or full
291
*/
292
abstract class MVar[F[_], A] {
293
/** Put a value (blocks if full) */
294
def put(a: A): F[Unit]
295
296
/** Take the value (blocks if empty) */
297
def take: F[A]
298
299
/** Read the value without taking it (blocks if empty) */
300
def read: F[A]
301
302
/** Try to put a value without blocking */
303
def tryPut(a: A): F[Boolean]
304
305
/** Try to take a value without blocking */
306
def tryTake: F[Option[A]]
307
308
/** Try to read a value without blocking */
309
def tryRead: F[Option[A]]
310
311
/** Check if MVar is empty */
312
def isEmpty: F[Boolean]
313
314
/** Atomically modify the contents */
315
def modify[B](f: A => (A, B)): F[B]
316
317
/** Atomically swap the contents */
318
def swap(newValue: A): F[A]
319
320
/** Apply effectful function to contents */
321
def use[B](f: A => F[B]): F[B]
322
323
/** Modify contents */
324
def modify_[B](f: A => F[A]): F[Unit]
325
326
/** Transform context (deprecated, use imapK) */
327
def mapK[G[_]](f: F ~> G): MVar[G, A]
328
329
/** Transform context with isomorphism */
330
def imapK[G[_]](f: F ~> G, g: G ~> F): MVar[G, A]
331
}
332
333
object MVar {
334
/** Create an empty cancelable MVar */
335
def empty[F[_], A](implicit F: Concurrent[F]): F[MVar[F, A]]
336
337
/** Create an empty uncancelable MVar */
338
def uncancelableEmpty[F[_], A](implicit F: Async[F]): F[MVar[F, A]]
339
340
/** Create an MVar with an initial value */
341
def of[F[_], A](a: A)(implicit F: Concurrent[F]): F[MVar[F, A]]
342
343
/** Create an initialized uncancelable MVar */
344
def uncancelableOf[F[_], A](a: A)(implicit F: Async[F]): F[MVar[F, A]]
345
346
/** Cross-effect constructor (initialized) */
347
def in[F[_], G[_], A](a: A)(implicit F: Sync[F], G: Concurrent[G]): F[MVar[G, A]]
348
349
/** Cross-effect constructor (empty) */
350
def emptyIn[F[_], G[_], A](implicit F: Sync[F], G: Concurrent[G]): F[MVar[G, A]]
351
352
/** Cross-effect uncancelable (initialized) */
353
def uncancelableIn[F[_], G[_], A](a: A)(implicit F: Sync[F], G: Async[G]): F[MVar[G, A]]
354
355
/** Cross-effect uncancelable (empty) */
356
def uncancelableEmptyIn[F[_], G[_], A](implicit F: Sync[F], G: Async[G]): F[MVar[G, A]]
357
358
/** Create an empty MVar (alias) */
359
def apply[F[_]]: PartiallyApplied[F] = new PartiallyApplied[F]
360
361
class PartiallyApplied[F[_]] {
362
def empty[A](implicit F: Concurrent[F]): F[MVar[F, A]]
363
def of[A](a: A)(implicit F: Concurrent[F]): F[MVar[F, A]]
364
}
365
}
366
```
367
368
**Usage Examples:**
369
370
```scala
371
// Producer-consumer with bounded buffer
372
def boundedBuffer[F[_]: Concurrent]: F[String] = for {
373
buffer <- MVar.empty[F, String]
374
375
// Producer
376
producer <- (for {
377
_ <- Timer[F].sleep(1.second)
378
_ <- buffer.put("produced item")
379
} yield ()).start
380
381
// Consumer
382
consumer <- (for {
383
item <- buffer.take
384
_ <- Sync[F].delay(println(s"Consumed: $item"))
385
} yield item).start
386
387
result <- consumer.join
388
_ <- producer.join
389
} yield result
390
391
// Synchronous handoff
392
def handoffExample[F[_]: Concurrent]: F[Unit] = for {
393
handoff <- MVar.empty[F, String]
394
395
// Sender waits for receiver to be ready
396
sender <- handoff.put("message").start
397
398
// Receiver processes message
399
_ <- handoff.take.flatMap(msg => Sync[F].delay(println(s"Received: $msg")))
400
401
_ <- sender.join
402
} yield ()
403
```
404
405
## Types
406
407
```scala { .api }
408
/**
409
* Atomic reference for thread-safe mutable state
410
*/
411
abstract class Ref[F[_], A]
412
413
/**
414
* Promise-like primitive for one-time completion
415
*/
416
abstract class Deferred[F[_], A]
417
418
/**
419
* Deferred with non-blocking tryGet capability
420
*/
421
abstract class TryableDeferred[F[_], A] extends Deferred[F, A]
422
423
/**
424
* Semaphore for controlling resource access
425
*/
426
abstract class Semaphore[F[_]]
427
428
/**
429
* Mutable variable that can be empty or contain a value
430
*/
431
abstract class MVar[F[_], A]
432
```