or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-stream-types.md custom-stages.md error-handling.md index.md integration.md materialization.md stream-combining.md stream-control.md stream-sinks.md stream-sources.md stream-transformations.md

docs/stream-control.md

0

# Stream Control and Lifecycle

1

2

Mechanisms for controlling stream lifecycle, implementing backpressure, rate limiting, and external stream termination. This includes flow control, buffering strategies, and dynamic stream management capabilities.

3

4

## Capabilities

5

6

### Buffering and Flow Control

7

8

Operations for controlling element flow and managing backpressure.

9

10

```scala { .api }

11

/**

12

* Add a buffer with overflow strategy

13

* @param size Buffer size

14

* @param overflowStrategy Strategy when buffer is full

15

* @return Stream with buffering applied

16

*/

17

def buffer(size: Int, overflowStrategy: OverflowStrategy): Source[Out, Mat]

18

19

/**

20

* Async buffer that decouples upstream and downstream processing

21

* @param bufferSize Buffer size

22

* @param overflowStrategy Strategy when buffer is full

23

* @return Stream with async buffering

24

*/

25

def async(bufferSize: Int = 16, overflowStrategy: OverflowStrategy = OverflowStrategy.backpressure): Source[Out, Mat]

26

27

/**

28

* Conflate elements when downstream is slower than upstream

29

* @param seed Function to create initial aggregate from first element

30

* @param aggregate Function to combine aggregate with new element

31

* @return Stream that conflates elements under backpressure

32

*/

33

def conflateWithSeed[S](seed: Out => S)(aggregate: (S, Out) => S): Source[S, Mat]

34

35

/**

36

* Expand elements when upstream is slower than downstream

37

* @param extrapolate Function to generate additional elements

38

* @return Stream that expands elements when needed

39

*/

40

def expand[U >: Out](extrapolate: Out => Iterator[U]): Source[U, Mat]

41

42

sealed abstract class OverflowStrategy

43

object OverflowStrategy {

44

def dropHead: OverflowStrategy // Drop the oldest element in the buffer

45

def dropTail: OverflowStrategy // Drop the youngest element in the buffer

46

def dropBuffer: OverflowStrategy // Drop the entire buffer

47

def dropNew: OverflowStrategy // Drop the incoming element

48

def backpressure: OverflowStrategy // Apply backpressure to upstream

49

def fail: OverflowStrategy // Fail the stream

50

}

51

```

52

53

**Usage Examples:**

54

55

```scala

56

import akka.stream.OverflowStrategy

57

58

// Buffer with backpressure

59

Source(1 to 100)

60

.buffer(10, OverflowStrategy.backpressure)

61

.map(expensiveOperation)

62

.runWith(Sink.seq)

63

64

// Drop elements when buffer full

65

Source.tick(10.millis, 10.millis, "tick")

66

.buffer(5, OverflowStrategy.dropHead)

67

.runWith(Sink.foreach(println))

68

69

// Conflate under backpressure

70

Source(1 to 1000)

71

.conflateWithSeed(identity)(_ + _) // Sum elements when backpressured

72

.runWith(Sink.foreach(println))

73

74

// Expand when upstream slow

75

Source(List(1, 2, 3))

76

.expand(n => Iterator.continually(n)) // Repeat each element

77

.take(10)

78

.runWith(Sink.seq)

79

```

80

81

### Rate Limiting and Throttling

82

83

Operations for controlling the rate of element emission.

84

85

```scala { .api }

86

/**

87

* Throttle the stream to a specific rate

88

* @param elements Number of elements per time period

89

* @param per Time period duration

90

* @param maximumBurst Maximum burst size

91

* @param mode Throttling mode (shaping or enforcing)

92

* @return Stream with rate limiting applied

93

*/

94

def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): Source[Out, Mat]

95

96

/**

97

* Delay each element by a fixed duration

98

* @param delay Duration to delay each element

99

* @param strategy Strategy for handling delayed elements

100

* @return Stream with delayed elements

101

*/

102

def delay(delay: FiniteDuration, strategy: DelayOverflowStrategy = DelayOverflowStrategy.FixedDelay): Source[Out, Mat]

103

104

/**

105

* Delay elements using a custom strategy

106

* @param delayStrategySupplier Function that provides delay strategy

107

* @return Stream with custom delay applied

108

*/

109

def delayWith(delayStrategySupplier: () => DelayStrategy[Out], overFlowStrategy: DelayOverflowStrategy = DelayOverflowStrategy.FixedDelay): Source[Out, Mat]

110

111

sealed abstract class ThrottleMode

112

object ThrottleMode {

113

case object Shaping extends ThrottleMode // Smooth out bursts

114

case object Enforcing extends ThrottleMode // Fail on rate violations

115

}

116

117

sealed abstract class DelayOverflowStrategy

118

object DelayOverflowStrategy {

119

case object EmitEarly extends DelayOverflowStrategy // Emit early under backpressure

120

case object DropHead extends DelayOverflowStrategy // Drop oldest delayed elements

121

case object DropTail extends DelayOverflowStrategy // Drop newest delayed elements

122

case object DropBuffer extends DelayOverflowStrategy // Drop all delayed elements

123

case object DropNew extends DelayOverflowStrategy // Drop new elements

124

case object Backpressure extends DelayOverflowStrategy // Apply backpressure

125

case object Fail extends DelayOverflowStrategy // Fail the stream

126

case object FixedDelay extends DelayOverflowStrategy // Fixed delay regardless

127

}

128

```

129

130

**Usage Examples:**

131

132

```scala

133

import scala.concurrent.duration._

134

135

// Rate limiting

136

Source(1 to 100)

137

.throttle(10, 1.second, 5, ThrottleMode.Shaping) // 10 elements/second, burst of 5

138

.runWith(Sink.foreach(println))

139

140

// Fixed delay

141

Source(List("a", "b", "c"))

142

.delay(500.millis) // Delay each element by 500ms

143

.runWith(Sink.foreach(println))

144

145

// Custom delay strategy

146

val increasingDelay = DelayStrategy.linearIncreasingDelay(100.millis, 50.millis)

147

Source(1 to 10)

148

.delayWith(() => increasingDelay)

149

.runWith(Sink.foreach(println))

150

```

151

152

### Kill Switches

153

154

External controls for terminating streams.

155

156

```scala { .api }

157

/**

158

* Control interface for terminating streams

159

*/

160

trait KillSwitch {

161

/**

162

* Gracefully shutdown the stream

163

*/

164

def shutdown(): Unit

165

166

/**

167

* Abort the stream with an error

168

* @param ex Exception to fail the stream with

169

*/

170

def abort(ex: Throwable): Unit

171

}

172

173

/**

174

* Shared control for terminating multiple streams

175

*/

176

abstract class SharedKillSwitch extends KillSwitch {

177

/**

178

* Create a flow that can be killed by this switch

179

* @return Flow that respects this kill switch

180

*/

181

def flow[T]: Flow[T, T, NotUsed]

182

}

183

184

/**

185

* Factory for creating kill switches

186

*/

187

object KillSwitches {

188

/**

189

* Create a single-use kill switch

190

* @return Flow that materializes to a KillSwitch

191

*/

192

def single[T]: Flow[T, T, UniqueKillSwitch]

193

194

/**

195

* Create a shared kill switch for multiple streams

196

* @param name Name for the kill switch (for debugging)

197

* @return Shared kill switch that can control multiple streams

198

*/

199

def shared(name: String): SharedKillSwitch

200

201

/**

202

* Create a bidirectional kill switch

203

* @return BidiFlow that materializes to a KillSwitch

204

*/

205

def singleBidi[T1, T2]: BidiFlow[T1, T1, T2, T2, UniqueKillSwitch]

206

}

207

208

/**

209

* Kill switch for a single stream

210

*/

211

trait UniqueKillSwitch extends KillSwitch

212

```

213

214

**Usage Examples:**

215

216

```scala

217

// Single kill switch

218

val (killSwitch, done) = Source(1 to 1000)

219

.viaMat(KillSwitches.single)(Keep.right)

220

.toMat(Sink.foreach(println))(Keep.both)

221

.run()

222

223

// Kill after 5 seconds

224

system.scheduler.scheduleOnce(5.seconds) {

225

killSwitch.shutdown()

226

}

227

228

// Shared kill switch for multiple streams

229

val sharedKillSwitch = KillSwitches.shared("my-streams")

230

231

val stream1 = Source.repeat("stream1")

232

.via(sharedKillSwitch.flow)

233

.runWith(Sink.foreach(println))

234

235

val stream2 = Source.repeat("stream2")

236

.via(sharedKillSwitch.flow)

237

.runWith(Sink.foreach(println))

238

239

// Kill both streams

240

sharedKillSwitch.shutdown()

241

242

// Alternatively, abort with an error (ignored once shutdown() has already been called)

243

sharedKillSwitch.abort(new RuntimeException("Emergency stop"))

244

```

245

246

### Stream Lifecycle Management

247

248

Operations for managing stream startup, completion, and cleanup.

249

250

```scala { .api }

251

/**

252

* Transform the materialized value of this stream

253

* @param f Function applied to the materialized value

254

* @return Stream whose materialized value is the result of f

255

*/

256

def mapMaterializedValue[Mat2](f: Mat => Mat2): Source[Out, Mat2]

257

258

/**

259

* Observe stream termination through a Future exposed at materialization

260

* @param f Function combining the upstream materialized value with a Future[Done]

261

*          that completes on normal completion and fails if the stream fails

262

* @return Stream materializing the value returned by f

263

*/

264

def watchTermination[Mat2]()(f: (Mat, Future[Done]) => Mat2): Source[Out, Mat2]

265

266

/**

267

* Add a finalizer that runs when stream terminates

268

* @param finalizer Function to execute on termination (success or failure)

269

* @return Stream with finalizer attached

270

*/

271

def finalizeWith[U >: Out](finalizer: () => Future[Done]): Source[U, Mat]

272

273

/**

274

* Inject an element when upstream has been idle for longer than maxIdle

275

* @return Stream that emits injectedElem() whenever no element arrives within maxIdle

276

*/

277

def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: () => U): Source[U, Mat]

278

```

279

280

**Usage Examples:**

281

282

```scala

283

// Lifecycle management

284

Source(1 to 10)

285

.watchTermination() { (mat, done) =>

286

println("Stream started")

287

done.onComplete { result =>

288

println(s"Stream finished: $result")

289

// Cleanup resources

290

}

291

mat

292

}

293

.runWith(Sink.seq)

294

295

// Stream with finalizer

296

val resourceStream = Source.fromIterator(() => expensiveResourceIterator())

297

.finalizeWith(() => Future {

298

cleanupExpensiveResource()

299

Done

300

})

301

.runWith(Sink.seq)

302

303

// Keep alive stream

304

Source.maybe[String] // Never emits unless completed externally

305

.keepAlive(30.seconds, () => "heartbeat")

306

.runWith(Sink.foreach(println))

307

```

308

309

### Detaching and Isolation

310

311

Operations for decoupling stream segments and managing boundaries.

312

313

```scala { .api }

314

/**

315

* Create an async boundary that decouples upstream and downstream processing

316

* @param bufferSize Size of the async buffer

317

* @return Stream with async boundary

318

*/

319

def async(bufferSize: Int = 16): Source[Out, Mat]

320

321

/**

322

* Detach upstream demand from downstream demand; behaves like a buffer of size 1

323

* @return Stream with upstream and downstream demand decoupled

324

*/

325

def detach: Source[Out, Mat]

326

327

/**

328

* Add an explicit async boundary with custom attributes

329

* @param attrs Attributes to apply to the async boundary

330

* @return Stream with custom async boundary

331

*/

332

def asyncBoundary(attrs: Attributes = Attributes.none): Source[Out, Mat]

333

```

334

335

**Usage Examples:**

336

337

```scala

338

// Async boundaries for parallelism

339

Source(1 to 100)

340

.map(slowComputation1)

341

.async() // Process this stage independently

342

.map(slowComputation2)

343

.async() // And this stage independently

344

.runWith(Sink.seq)

345

346

// Detach for independent processing

347

val detachedStream = Source.repeat("data")

348

.take(100)

349

.detach // Decouple upstream demand from downstream demand (one-element buffer)

350

.map(processData)

351

.runWith(Sink.ignore)

352

```

353

354

### Dynamic Stream Control

355

356

Advanced operations for dynamic stream behavior modification.

357

358

```scala { .api }

359

/**

360

* Take up to n elements as a prefix and use them to build the Flow that processes the rest

361

* @param f Function that receives the prefix elements and returns the Flow for the remaining elements

362

* @return Stream transformed by the Flow that f produced

363

*/

364

def flatMapPrefix[Out2, Mat2](n: Int)(f: immutable.Seq[Out] => Flow[Out, Out2, Mat2]): Source[Out2, Mat]

365

366

/**

367

* Route elements that satisfy a predicate to a separate sink

368

* @param to Sink that receives the diverted elements

369

* @param when Predicate; elements for which it returns true go to `to`,

370

*             all other elements continue downstream

371

* @return Stream carrying only the elements that were not diverted

372

*/

373

def divertTo[Out2](to: Graph[SinkShape[Out], _], when: Out => Boolean): Source[Out, Mat]

374

```

375

376

**Usage Examples:**

377

378

```scala

379

// Dynamic processing based on initial elements

380

Source(1 to 100)

381

.flatMapPrefix(5) { initialElements =>

382

if (initialElements.forall(_ > 0)) {

383

Flow[Int].map(_ * 2) // Positive processing

384

} else {

385

Flow[Int].map(_.abs) // Negative processing

386

}

387

}

388

.runWith(Sink.seq)

389

390

// Divert elements based on condition

391

Source(1 to 10)

392

.divertTo(Sink.foreach(n => println(s"Even: $n")), _ % 2 == 0)

393

.runWith(Sink.foreach(n => println(s"Odd: $n")))

394

```

395

396

## Types

397

398

```scala { .api }

399

// Flow control strategies

400

sealed abstract class OverflowStrategy

401

sealed abstract class ThrottleMode

402

sealed abstract class DelayOverflowStrategy

403

404

// Kill switch interfaces

405

trait KillSwitch {

406

def shutdown(): Unit

407

def abort(ex: Throwable): Unit

408

}

409

410

trait UniqueKillSwitch extends KillSwitch

411

abstract class SharedKillSwitch extends KillSwitch {

412

def flow[T]: Flow[T, T, NotUsed]

413

}

414

415

// Delay strategies

416

trait DelayStrategy[T] {

417

def nextDelay(elem: T): FiniteDuration

418

}

419

420

object DelayStrategy {

421

def fixedDelay[T](delay: FiniteDuration): DelayStrategy[T]

422

def linearIncreasingDelay[T](initialDelay: FiniteDuration, increment: FiniteDuration): DelayStrategy[T]

423

}

424

```