or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

control-flow.md core-components.md error-handling.md graph-building.md index.md io-integration.md junction-operations.md materialization.md stream-operations.md

error-handling.mddocs/

0

# Error Handling and Supervision

1

2

Comprehensive error handling with supervision strategies, recovery operations, and stream resilience patterns.

3

4

## Supervision Strategies

5

6

### Supervision Directives

7

8

```scala { .api }

9

object Supervision {

10

sealed trait Directive

11

case object Stop extends Directive // Complete the stream with failure

12

case object Resume extends Directive // Skip the failing element and continue

13

case object Restart extends Directive // Drop the failing element, clear the stage's accumulated state, and continue

14

15

type Decider = Function[Throwable, Directive]

16

}

17

```

18

19

### Predefined Supervision Deciders

20

21

```scala { .api }

22

object Supervision {

23

val stoppingDecider: Decider = _ => Stop

24

val resumingDecider: Decider = _ => Resume

25

val restartingDecider: Decider = _ => Restart

26

}

27

```

28

29

**Usage Examples:**

30

```scala

31

import akka.stream.{ActorAttributes, ActorMaterializer, ActorMaterializerSettings, Supervision}

32

33

// Custom supervision strategy

34

val customDecider: Supervision.Decider = {

35

case _: NumberFormatException => Supervision.Resume

36

case _: IllegalArgumentException => Supervision.Restart

37

case _ => Supervision.Stop

38

}

39

40

// Configure materializer with supervision

41

val settings = ActorMaterializerSettings(system)

42

.withSupervisionStrategy(customDecider)

43

44

implicit val materializer = ActorMaterializer(settings)

45

46

// Apply supervision to specific stream

47

val source = Source(List("1", "2", "bad", "4", "5"))

48

.map(_.toInt) // Will fail on "bad"

49

.withAttributes(ActorAttributes.supervisionStrategy(customDecider))

50

.runWith(Sink.foreach(println))

51

// Output: 1, 2, 4, 5 (skips "bad")

52

```

53

54

## Recovery Operations

55

56

### Stream-Level Recovery

57

58

```scala { .api }

59

trait FlowOps[+Out, +Mat] {

60

def recover[T >: Out](pf: PartialFunction[Throwable, T]): Repr[T]

61

def recoverWithRetries[T >: Out](

62

attempts: Int,

63

pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]

64

): Repr[T]

65

def mapError(pf: PartialFunction[Throwable, Throwable]): Repr[Out]

66

}

67

```

68

69

**Usage Examples:**

70

```scala

71

// Basic recovery with fallback value

72

val recoveredStream = Source(List("1", "2", "abc", "4"))

73

.map(_.toInt)

74

.recover {

75

case _: NumberFormatException => -1

76

}

77

.runWith(Sink.seq)

78

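// Result: Seq(1, 2, -1) -- recover emits the fallback as the final element and then completes the stream, so "4" is never processed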

79

80

// Recovery with retries using alternative source

81

val withRetries = Source(List("1", "2", "abc", "4"))

82

.map(_.toInt)

83

.recoverWithRetries(3, {

84

case _: NumberFormatException => Source.single(0)

85

})

86

87

// Transform errors

88

val transformedErrors = Source(List("1", "2", "abc", "4")).map(_.toInt)

89

.mapError {

90

case nfe: NumberFormatException =>

91

new IllegalArgumentException(s"Invalid number: ${nfe.getMessage}")

92

}

93

```

94

95

## Restart Patterns

96

97

### RestartSource

98

99

```scala { .api }

100

object RestartSource {

101

def withBackoff[T](

102

minBackoff: FiniteDuration,

103

maxBackoff: FiniteDuration,

104

randomFactor: Double

105

)(sourceFactory: () => Source[T, _]): Source[T, NotUsed]

106

107

def onFailuresWithBackoff[T](

108

minBackoff: FiniteDuration,

109

maxBackoff: FiniteDuration,

110

randomFactor: Double

111

)(sourceFactory: () => Source[T, _]): Source[T, NotUsed]

112

}

113

```

114

115

**Usage Example:**

116

```scala

117

import akka.stream.scaladsl.RestartSource

118

import scala.concurrent.duration._

119

120

// Restart source with exponential backoff

121

val resilientSource = RestartSource.withBackoff(

122

minBackoff = 1.second,

123

maxBackoff = 30.seconds,

124

randomFactor = 0.2

125

) { () =>

126

Source.tick(1.second, 1.second, "tick")

127

.map { _ =>

128

if (scala.util.Random.nextDouble() < 0.1) throw new RuntimeException("Random failure")

129

else "success"

130

}

131

}

132

133

resilientSource.runWith(Sink.foreach(println))

134

```

135

136

### RestartFlow and RestartSink

137

138

```scala { .api }

139

object RestartFlow {

140

def withBackoff[In, Out](

141

minBackoff: FiniteDuration,

142

maxBackoff: FiniteDuration,

143

randomFactor: Double

144

)(flowFactory: () => Flow[In, Out, _]): Flow[In, Out, NotUsed]

145

}

146

147

object RestartSink {

148

def withBackoff[T](

149

minBackoff: FiniteDuration,

150

maxBackoff: FiniteDuration,

151

randomFactor: Double

152

)(sinkFactory: () => Sink[T, _]): Sink[T, NotUsed]

153

}

154

```

155

156

## Overflow Strategies

157

158

### OverflowStrategy Types

159

160

```scala { .api }

161

sealed abstract class OverflowStrategy extends DelayOverflowStrategy

162

163

object OverflowStrategy {

164

def dropHead: OverflowStrategy // Drop oldest element

165

def dropTail: OverflowStrategy // Drop the youngest buffered element to make room for the new one

166

def dropBuffer: OverflowStrategy // Drop entire buffer

167

def dropNew: OverflowStrategy // Drop incoming element

168

def backpressure: OverflowStrategy // Backpressure upstream until buffer space is available (no thread is blocked)

169

def fail: OverflowStrategy // Fail the stream

170

}

171

```

172

173

**Usage Examples:**

174

```scala

175

import akka.stream.OverflowStrategy

176

177

// Buffer with overflow handling

178

val bufferedStream = Source(1 to 1000)

179

.buffer(10, OverflowStrategy.dropHead)

180

.throttle(1, 100.millis) // Slow consumer

181

.runWith(Sink.seq)

182

183

// Queue source with overflow strategy

184

val (queue, result) = Source.queue[Int](5, OverflowStrategy.dropTail)

185

.toMat(Sink.seq)(Keep.both)

186

.run()

187

188

// Offer elements that may be dropped

189

queue.offer(1)

190

queue.offer(2)

191

// ... more elements than buffer size

192

```

193

194

## Exception Handling Patterns

195

196

### Try-based Processing

197

198

```scala

199

import scala.util.{Try, Success, Failure}

200

201

val safeParsing = Source(List("1", "2", "abc", "4"))

202

.map(s => Try(s.toInt))

203

.collect {

204

case Success(value) => value

205

}

206

.runWith(Sink.seq)

207

// Result: List(1, 2, 4)

208

209

// Process both successes and failures

210

val withFailureHandling = Source(List("1", "2", "abc", "4"))

211

.map(s => Try(s.toInt))

212

.map {

213

case Success(value) => s"Parsed: $value"

214

case Failure(ex) => s"Failed: ${ex.getMessage}"

215

}

216

.runWith(Sink.foreach(println))

217

```

218

219

### Future-based Error Handling

220

221

```scala

222

import scala.concurrent.Future

223

224

val asyncProcessing = Source(1 to 10)

225

.mapAsync(4) { n =>

226

Future {

227

if (n == 5) throw new RuntimeException("Five is bad!")

228

n * 2

229

}.recover {

230

case _: RuntimeException => -1

231

}

232

}

233

.runWith(Sink.seq)

234

// Result includes -1 for the failed element

235

```

236

237

## Stream Failure Monitoring

238

239

### watchTermination

240

241

```scala { .api }

242

trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {

243

def watchTermination[Mat2]()(matF: (Mat, Future[Done]) => Mat2): ReprMat[Out, Mat2]

244

}

245

```

246

247

**Usage Example:**

248

```scala

249

val monitoredStream = Source(1 to 10)

250

.map { n =>

251

if (n == 5) throw new RuntimeException("Boom!")

252

n

253

}

254

.watchTermination() { (_, done) =>

255

done.onComplete {

256

case Success(_) => println("Stream completed successfully")

257

case Failure(ex) => println(s"Stream failed with: ${ex.getMessage}")

258

}

259

}

260

.runWith(Sink.ignore)

261

```

262

263

### Stream Monitoring with Custom Logic

264

265

```scala

266

val resilientProcessing = Source(1 to 100)

267

.mapAsync(4) { n =>

268

Future {

269

if (scala.util.Random.nextDouble() < 0.1) {

270

throw new RuntimeException(s"Random failure at $n")

271

}

272

n * 2

273

}

274

}

275

.recover {

276

case ex: RuntimeException =>

277

println(s"Recovered from: ${ex.getMessage}")

278

-1

279

}

280

.filter(_ > 0) // Drop the -1 fallback; note that recover completes the stream after the first failure

281

.runWith(Sink.seq)

282

```

283

284

## Circuit Breaker Pattern

285

286

```scala

287

import akka.pattern.CircuitBreaker

288

import scala.concurrent.duration._

289

290

val circuitBreaker = new CircuitBreaker(

291

scheduler = system.scheduler,

292

maxFailures = 5,

293

callTimeout = 10.seconds,

294

resetTimeout = 1.minute

295

)

296

297

val protectedProcessing = Source(1 to 100)

298

.mapAsync(4) { n =>

299

circuitBreaker.withCircuitBreaker {

300

Future {

301

// Potentially failing operation

302

if (scala.util.Random.nextDouble() < 0.2) {

303

throw new RuntimeException("Service unavailable")

304

}

305

s"Processed: $n"

306

}

307

}.recover {

308

case _: akka.pattern.CircuitBreakerOpenException => "Circuit breaker open"

309

case ex => s"Error: ${ex.getMessage}"

310

}

311

}

312

.runWith(Sink.foreach(println))

313

```

314

315

## Error Propagation Control

316

317

### Isolating Errors in Substreams

318

319

```scala

320

val safeBatchProcessing = Source(1 to 20)

321

.grouped(5)

322

.map { batch =>

323

Source(batch)

324

.map { n =>

325

if (n % 7 == 0) throw new RuntimeException(s"Seven divisor: $n")

326

n * 2

327

}

328

.recover {

329

case _: RuntimeException => -1

330

}

331

.runWith(Sink.seq)

332

}

333

.mapAsync(1)(identity) // Await each batch's already-running Future, one at a time, in order

334

.runWith(Sink.seq)

335

```

336

337

This error handling approach ensures stream resilience while providing fine-grained control over failure scenarios and recovery strategies.