0
# Error Handling and Supervision
1
2
Comprehensive error handling with supervision strategies, recovery operations, and stream resilience patterns.
3
4
## Supervision Strategies
5
6
### Supervision Directives
7
8
```scala { .api }
9
object Supervision {
10
sealed trait Directive
11
case object Stop extends Directive // Stop the processing stage
12
case object Resume extends Directive // Skip the failing element and continue
13
case object Restart extends Directive // Restart the processing stage
14
15
type Decider = Function[Throwable, Directive]
16
}
17
```
18
19
### Predefined Supervision Deciders
20
21
```scala { .api }
22
object Supervision {
23
val stoppingDecider: Decider = _ => Stop
24
val resumingDecider: Decider = _ => Resume
25
val restartingDecider: Decider = _ => Restart
26
}
27
```
28
29
**Usage Examples:**
30
```scala
31
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
32
33
// Custom supervision strategy
34
val customDecider: Supervision.Decider = {
35
case _: NumberFormatException => Supervision.Resume
36
case _: IllegalArgumentException => Supervision.Restart
37
case _ => Supervision.Stop
38
}
39
40
// Configure materializer with supervision
41
val settings = ActorMaterializerSettings(system)
42
.withSupervisionStrategy(customDecider)
43
44
implicit val materializer = ActorMaterializer(settings)
45
46
// Apply supervision to specific stream
47
val source = Source(List("1", "2", "bad", "4", "5"))
48
.map(_.toInt) // Will fail on "bad"
49
.withAttributes(ActorAttributes.supervisionStrategy(customDecider))
50
.runWith(Sink.foreach(println))
51
// Output: 1, 2, 4, 5 (skips "bad")
52
```
53
54
## Recovery Operations
55
56
### Stream-Level Recovery
57
58
```scala { .api }
59
trait FlowOps[+Out, +Mat] {
60
def recover[T >: Out](pf: PartialFunction[Throwable, T]): Repr[T]
61
def recoverWithRetries[T >: Out](
62
attempts: Int,
63
pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]
64
): Repr[T]
65
def mapError(f: Throwable => Throwable): Repr[Out]
66
}
67
```
68
69
**Usage Examples:**
70
```scala
71
// Basic recovery with fallback value
72
val recoveredStream = Source(List("1", "2", "abc", "4"))
73
.map(_.toInt)
74
.recover {
75
case _: NumberFormatException => -1
76
}
77
.runWith(Sink.seq)
78
// Result: List(1, 2, -1, 4)
79
80
// Recovery with retries using alternative source
81
val withRetries = Source(List("1", "2", "abc", "4"))
82
.map(_.toInt)
83
.recoverWithRetries(3, {
84
case _: NumberFormatException => Source.single(0)
85
})
86
87
// Transform errors
88
val transformedErrors = source
89
.mapError {
90
case nfe: NumberFormatException =>
91
new IllegalArgumentException(s"Invalid number: ${nfe.getMessage}")
92
}
93
```
94
95
## Restart Patterns
96
97
### RestartSource
98
99
```scala { .api }
100
object RestartSource {
101
def withBackoff[T](
102
minBackoff: FiniteDuration,
103
maxBackoff: FiniteDuration,
104
randomFactor: Double
105
)(sourceFactory: () => Source[T, _]): Source[T, NotUsed]
106
107
def onFailuresWithBackoff[T](
108
minBackoff: FiniteDuration,
109
maxBackoff: FiniteDuration,
110
randomFactor: Double
111
)(sourceFactory: () => Source[T, _]): Source[T, NotUsed]
112
}
113
```
114
115
**Usage Example:**
116
```scala
117
import akka.stream.scaladsl.RestartSource
118
import scala.concurrent.duration._
119
120
// Restart source with exponential backoff
121
val resilientSource = RestartSource.withBackoff(
122
minBackoff = 1.second,
123
maxBackoff = 30.seconds,
124
randomFactor = 0.2
125
) { () =>
126
Source.tick(1.second, 1.second, "tick")
127
.map { _ =>
128
if (scala.util.Random.nextDouble() < 0.1) throw new RuntimeException("Random failure")
129
else "success"
130
}
131
}
132
133
resilientSource.runWith(Sink.foreach(println))
134
```
135
136
### RestartFlow and RestartSink
137
138
```scala { .api }
139
object RestartFlow {
140
def withBackoff[In, Out](
141
minBackoff: FiniteDuration,
142
maxBackoff: FiniteDuration,
143
randomFactor: Double
144
)(flowFactory: () => Flow[In, Out, _]): Flow[In, Out, NotUsed]
145
}
146
147
object RestartSink {
148
def withBackoff[T](
149
minBackoff: FiniteDuration,
150
maxBackoff: FiniteDuration,
151
randomFactor: Double
152
)(sinkFactory: () => Sink[T, _]): Sink[T, NotUsed]
153
}
154
```
155
156
## Overflow Strategies
157
158
### OverflowStrategy Types
159
160
```scala { .api }
161
sealed abstract class OverflowStrategy extends DelayOverflowStrategy
162
163
object OverflowStrategy {
164
def dropHead: OverflowStrategy // Drop oldest element
165
def dropTail: OverflowStrategy // Drop newest element
166
def dropBuffer: OverflowStrategy // Drop entire buffer
167
def dropNew: OverflowStrategy // Drop incoming element
168
def backpressure: OverflowStrategy // Apply backpressure (block)
169
def fail: OverflowStrategy // Fail the stream
170
}
171
```
172
173
**Usage Examples:**
174
```scala
175
import akka.stream.OverflowStrategy
176
177
// Buffer with overflow handling
178
val bufferedStream = Source(1 to 1000)
179
.buffer(10, OverflowStrategy.dropHead)
180
.throttle(1, 100.millis) // Slow consumer
181
.runWith(Sink.seq)
182
183
// Queue source with overflow strategy
184
val (queueSource, queue) = Source.queue[Int](5, OverflowStrategy.dropTail)
185
.toMat(Sink.seq)(Keep.both)
186
.run()
187
188
// Offer elements that may be dropped
189
queue.offer(1)
190
queue.offer(2)
191
// ... more elements than buffer size
192
```
193
194
## Exception Handling Patterns
195
196
### Try-based Processing
197
198
```scala
199
import scala.util.{Try, Success, Failure}
200
201
val safeParsing = Source(List("1", "2", "abc", "4"))
202
.map(s => Try(s.toInt))
203
.collect {
204
case Success(value) => value
205
}
206
.runWith(Sink.seq)
207
// Result: List(1, 2, 4)
208
209
// Process both successes and failures
210
val withFailureHandling = Source(List("1", "2", "abc", "4"))
211
.map(s => Try(s.toInt))
212
.map {
213
case Success(value) => s"Parsed: $value"
214
case Failure(ex) => s"Failed: ${ex.getMessage}"
215
}
216
.runWith(Sink.foreach(println))
217
```
218
219
### Future-based Error Handling
220
221
```scala
222
import scala.concurrent.Future
223
224
val asyncProcessing = Source(1 to 10)
225
.mapAsync(4) { n =>
226
Future {
227
if (n == 5) throw new RuntimeException("Five is bad!")
228
n * 2
229
}.recover {
230
case _: RuntimeException => -1
231
}
232
}
233
.runWith(Sink.seq)
234
// Result includes -1 for the failed element
235
```
236
237
## Stream Failure Monitoring
238
239
### watchTermination
240
241
```scala { .api }
242
trait FlowOps[+Out, +Mat] {
243
def watchTermination[Mat2]()(matF: (Mat, Future[Done]) => Mat2): ReprMat[Out, Mat2]
244
}
245
```
246
247
**Usage Example:**
248
```scala
249
val monitoredStream = Source(1 to 10)
250
.map { n =>
251
if (n == 5) throw new RuntimeException("Boom!")
252
n
253
}
254
.watchTermination() { (_, done) =>
255
done.onComplete {
256
case Success(_) => println("Stream completed successfully")
257
case Failure(ex) => println(s"Stream failed with: ${ex.getMessage}")
258
}
259
}
260
.runWith(Sink.ignore)
261
```
262
263
### Stream Monitoring with Custom Logic
264
265
```scala
266
val resilientProcessing = Source(1 to 100)
267
.mapAsync(4) { n =>
268
Future {
269
if (scala.util.Random.nextDouble() < 0.1) {
270
throw new RuntimeException(s"Random failure at $n")
271
}
272
n * 2
273
}
274
}
275
.recover {
276
case ex: RuntimeException =>
277
println(s"Recovered from: ${ex.getMessage}")
278
-1
279
}
280
.filter(_ > 0) // Remove recovered elements
281
.runWith(Sink.seq)
282
```
283
284
## Circuit Breaker Pattern
285
286
```scala
287
import akka.pattern.CircuitBreaker
288
import scala.concurrent.duration._
289
290
val circuitBreaker = new CircuitBreaker(
291
scheduler = system.scheduler,
292
maxFailures = 5,
293
callTimeout = 10.seconds,
294
resetTimeout = 1.minute
295
)
296
297
val protectedProcessing = Source(1 to 100)
298
.mapAsync(4) { n =>
299
circuitBreaker.withCircuitBreaker {
300
Future {
301
// Potentially failing operation
302
if (scala.util.Random.nextDouble() < 0.2) {
303
throw new RuntimeException("Service unavailable")
304
}
305
s"Processed: $n"
306
}
307
}.recover {
308
case _: akka.pattern.CircuitBreakerOpenException => "Circuit breaker open"
309
case ex => s"Error: ${ex.getMessage}"
310
}
311
}
312
.runWith(Sink.foreach(println))
313
```
314
315
## Error Propagation Control
316
317
### Isolating Errors in Substreams
318
319
```scala
320
val safeBatchProcessing = Source(1 to 20)
321
.grouped(5)
322
.map { batch =>
323
Source(batch)
324
.map { n =>
325
if (n % 7 == 0) throw new RuntimeException(s"Seven divisor: $n")
326
n * 2
327
}
328
.recover {
329
case _: RuntimeException => -1
330
}
331
.runWith(Sink.seq)
332
}
333
.mapAsync(1)(identity) // Materialize each batch
334
.runWith(Sink.seq)
335
```
336
337
This error handling approach ensures stream resilience while providing fine-grained control over failure scenarios and recovery strategies.