# Stream Control and Lifecycle

Mechanisms for controlling stream lifecycle, implementing backpressure, rate limiting, and external stream termination. This includes flow control, buffering strategies, and dynamic stream management capabilities.

## Capabilities

### Buffering and Flow Control

Operations for controlling element flow and managing backpressure.

```scala { .api }
/**
 * Add a buffer with an overflow strategy
 * @param size Buffer size
 * @param overflowStrategy Strategy applied when the buffer is full
 * @return Stream with buffering applied
 */
def buffer(size: Int, overflowStrategy: OverflowStrategy): Source[Out, Mat]

/**
 * Insert an async boundary that decouples upstream and downstream processing
 * @param dispatcher Name of the dispatcher that runs the upstream side
 * @param inputBufferSize Size of the input buffer at the boundary
 * @return Stream with an async boundary
 */
def async(dispatcher: String, inputBufferSize: Int): Source[Out, Mat]

/**
 * Conflate elements when downstream is slower than upstream
 * @param seed Function to create the initial aggregate from the first element
 * @param aggregate Function to combine the aggregate with a new element
 * @return Stream that conflates elements under backpressure
 */
def conflateWithSeed[S](seed: Out => S)(aggregate: (S, Out) => S): Source[S, Mat]

/**
 * Expand elements when upstream is slower than downstream
 * @param expander Function that turns each upstream element into an iterator of elements to emit
 * @return Stream that expands elements when needed
 */
def expand[U](expander: Out => Iterator[U]): Source[U, Mat]

sealed abstract class OverflowStrategy
object OverflowStrategy {
  def dropHead: OverflowStrategy     // Drop the oldest buffered element
  def dropTail: OverflowStrategy     // Drop the newest buffered element
  def dropBuffer: OverflowStrategy   // Drop the entire buffer
  def dropNew: OverflowStrategy      // Drop the incoming element
  def backpressure: OverflowStrategy // Apply backpressure
  def fail: OverflowStrategy         // Fail the stream
}
```

**Usage Examples:**

```scala
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._

// Buffer with backpressure: a full buffer slows the upstream down, nothing is dropped
Source(1 to 100)
  .buffer(10, OverflowStrategy.backpressure)
  .map(expensiveOperation)
  .runWith(Sink.seq)

// Drop the oldest buffered element when the buffer is full
Source.tick(10.millis, 10.millis, "tick")
  .buffer(5, OverflowStrategy.dropHead)
  .runWith(Sink.foreach(println))

// Conflate under backpressure
Source(1 to 1000)
  .conflateWithSeed(identity)(_ + _) // Sum pending elements while downstream is busy
  .runWith(Sink.foreach(println))

// Expand when upstream is slow
Source(List(1, 2, 3))
  .expand(n => Iterator.continually(n)) // Repeat the latest element until a new one arrives
  .take(10)
  .runWith(Sink.seq)
```
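
The difference between a backpressuring buffer and conflation can be checked with a small self-contained run. The sketch below assumes Akka 2.6 or later (where an implicit `ActorSystem` supplies the materializer); the object name `BackpressureDemo` is made up for illustration. It shows that `OverflowStrategy.backpressure` never drops elements, and that a summing `conflateWithSeed` preserves the total no matter how many elements were merged along the way.

```scala
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of the library
object BackpressureDemo {
  def run(): (Seq[Int], Long) = {
    implicit val system: ActorSystem = ActorSystem("backpressure-demo")
    try {
      // A backpressuring buffer keeps every element, in order
      val buffered = Source(1 to 100)
        .buffer(10, OverflowStrategy.backpressure)
        .runWith(Sink.seq)

      // Conflation may merge elements, but summing keeps the overall total intact
      val total = Source(1 to 1000)
        .conflateWithSeed(_.toLong)(_ + _)
        .runWith(Sink.fold[Long, Long](0L)(_ + _))

      (Await.result(buffered, 5.seconds), Await.result(total, 5.seconds))
    } finally system.terminate()
  }
}
```

How many intermediate sums the conflating stream emits depends on timing, so only the final total is a stable property to rely on.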

### Rate Limiting and Throttling

Operations for controlling the rate of element emission.

```scala { .api }
/**
 * Throttle the stream to a specific rate
 * @param elements Number of elements per time period
 * @param per Time period duration
 * @param maximumBurst Maximum burst size
 * @param mode Throttling mode (shaping or enforcing)
 * @return Stream with rate limiting applied
 */
def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): Source[Out, Mat]

/**
 * Delay each element by a fixed duration
 * @param of Duration to delay each element
 * @param strategy Strategy applied when the internal delay buffer is full
 * @return Stream with delayed elements
 */
def delay(of: FiniteDuration, strategy: DelayOverflowStrategy = DelayOverflowStrategy.dropTail): Source[Out, Mat]

/**
 * Delay elements using a custom strategy
 * @param delayStrategySupplier Function that creates a delay strategy for each materialization
 * @param overFlowStrategy Strategy applied when the internal delay buffer is full
 * @return Stream with custom delay applied
 */
def delayWith(delayStrategySupplier: () => DelayStrategy[Out], overFlowStrategy: DelayOverflowStrategy): Source[Out, Mat]

sealed abstract class ThrottleMode
object ThrottleMode {
  case object Shaping extends ThrottleMode   // Pause before emitting so the rate is met
  case object Enforcing extends ThrottleMode // Fail on rate violations
}

sealed abstract class DelayOverflowStrategy
object DelayOverflowStrategy {
  def emitEarly: DelayOverflowStrategy    // Emit early under backpressure
  def dropHead: DelayOverflowStrategy     // Drop the oldest delayed element
  def dropTail: DelayOverflowStrategy     // Drop the newest delayed element
  def dropBuffer: DelayOverflowStrategy   // Drop all delayed elements
  def dropNew: DelayOverflowStrategy      // Drop the incoming element
  def backpressure: DelayOverflowStrategy // Apply backpressure
  def fail: DelayOverflowStrategy         // Fail the stream
}
```

**Usage Examples:**

```scala
import akka.stream.{DelayOverflowStrategy, ThrottleMode}
import akka.stream.scaladsl.{DelayStrategy, Sink, Source}
import scala.concurrent.duration._

// Rate limiting
Source(1 to 100)
  .throttle(10, 1.second, 5, ThrottleMode.Shaping) // 10 elements/second, burst of 5
  .runWith(Sink.foreach(println))

// Fixed delay
Source(List("a", "b", "c"))
  .delay(500.millis) // Delay each element by 500ms
  .runWith(Sink.foreach(println))

// Custom delay strategy: start at 100ms and add 50ms for every element.
// The strategy is stateful, so the supplier creates a fresh one per materialization.
Source(1 to 10)
  .delayWith(
    () => DelayStrategy.linearIncreasingDelay[Int](50.millis, _ => true, 100.millis),
    DelayOverflowStrategy.backpressure)
  .runWith(Sink.foreach(println))
```
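
To see shaping in action without relying on exact timings, the following sketch throttles five elements to one per 100 milliseconds and returns both the collected elements and the elapsed time. It assumes Akka 2.6 or later; `ThrottleDemo` is a made-up name. In `Shaping` mode the stream is slowed down rather than failed, so all elements should arrive in order.

```scala
import akka.actor.ActorSystem
import akka.stream.ThrottleMode
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of the library
object ThrottleDemo {
  def run(): (Seq[Int], Long) = {
    implicit val system: ActorSystem = ActorSystem("throttle-demo")
    try {
      val started = System.nanoTime()
      // One element per 100ms, no burst allowance beyond a single element
      val result = Await.result(
        Source(1 to 5)
          .throttle(1, 100.millis, 1, ThrottleMode.Shaping)
          .runWith(Sink.seq),
        5.seconds)
      val elapsedMillis = (System.nanoTime() - started) / 1000000
      (result, elapsedMillis)
    } finally system.terminate()
  }
}
```

The elapsed time is returned for inspection only; it varies with scheduling, so it is better logged than asserted on.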

### Kill Switches

External controls for terminating streams.

```scala { .api }
/**
 * Control interface for terminating streams
 */
trait KillSwitch {
  /**
   * Gracefully shut down the stream
   */
  def shutdown(): Unit

  /**
   * Abort the stream with an error
   * @param ex Exception to fail the stream with
   */
  def abort(ex: Throwable): Unit
}

/**
 * Shared control for terminating multiple streams
 */
final class SharedKillSwitch extends KillSwitch {
  /**
   * Create a flow that can be killed by this switch
   * @return Flow graph that respects this kill switch
   */
  def flow[T]: Graph[FlowShape[T, T], SharedKillSwitch]
}

/**
 * Factory for creating kill switches
 */
object KillSwitches {
  /**
   * Create a kill switch that controls a single materialized stream
   * @return Flow graph that materializes to a UniqueKillSwitch
   */
  def single[T]: Graph[FlowShape[T, T], UniqueKillSwitch]

  /**
   * Create a shared kill switch for multiple streams
   * @param name Name for the kill switch (for debugging)
   * @return Shared kill switch that can control multiple streams
   */
  def shared(name: String): SharedKillSwitch

  /**
   * Create a bidirectional kill switch
   * @return BidiFlow graph that materializes to a UniqueKillSwitch
   */
  def singleBidi[T1, T2]: Graph[BidiShape[T1, T1, T2, T2], UniqueKillSwitch]
}

/**
 * Kill switch for a single stream
 */
final class UniqueKillSwitch extends KillSwitch
```

**Usage Examples:**

```scala
import akka.stream.KillSwitches
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.duration._
import system.dispatcher // ExecutionContext required by the scheduler

// Single kill switch
val (killSwitch, done) = Source(1 to 1000)
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(Sink.foreach(println))(Keep.both)
  .run()

// Kill after 5 seconds
system.scheduler.scheduleOnce(5.seconds) {
  killSwitch.shutdown()
}

// Shared kill switch for multiple streams
val sharedKillSwitch = KillSwitches.shared("my-streams")

val stream1 = Source.repeat("stream1")
  .via(sharedKillSwitch.flow)
  .runWith(Sink.foreach(println))

val stream2 = Source.repeat("stream2")
  .via(sharedKillSwitch.flow)
  .runWith(Sink.foreach(println))

// Complete both streams
sharedKillSwitch.shutdown()

// Or fail them instead; only the first shutdown() or abort() call takes effect
sharedKillSwitch.abort(new RuntimeException("Emergency stop"))
```
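
The practical difference between `shutdown()` and `abort()` shows up in how the stream's completion future ends. The sketch below, assuming Akka 2.6 or later and using the made-up name `KillSwitchDemo`, runs two infinite streams and stops one with each method: the first future should complete successfully, the second should fail with the supplied exception.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.KillSwitches
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical demo object, not part of the library
object KillSwitchDemo {
  def run(): (Try[Done], Try[Done]) = {
    implicit val system: ActorSystem = ActorSystem("kill-switch-demo")
    try {
      // An infinite stream that only terminates through its kill switch
      def start() = Source.repeat(1)
        .viaMat(KillSwitches.single)(Keep.right)
        .toMat(Sink.ignore)(Keep.both)
        .run()

      val (stopSwitch, stopped) = start()
      stopSwitch.shutdown() // completes the stream normally

      val (abortSwitch, aborted) = start()
      abortSwitch.abort(new RuntimeException("Emergency stop")) // fails the stream

      (Await.ready(stopped, 5.seconds).value.get,
       Await.ready(aborted, 5.seconds).value.get)
    } finally system.terminate()
  }
}
```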

### Stream Lifecycle Management

Operations for managing stream startup, completion, and cleanup.

```scala { .api }
/**
 * Transform the materialized value of this stream
 * @param f Function applied to the materialized value when the stream is materialized
 * @return Stream with the transformed materialized value
 */
def mapMaterializedValue[Mat2](f: Mat => Mat2): Source[Out, Mat2]

/**
 * Observe stream termination, for example to run cleanup logic
 * @param matF Function combining the materialized value with a Future[Done] that
 *             completes when the stream terminates and fails if the stream fails
 * @return Stream whose materialized value is the result of matF
 */
def watchTermination[Mat2]()(matF: (Mat, Future[Done]) => Mat2): Source[Out, Mat2]

/**
 * Add a finalizer that runs when stream terminates
 * @param finalizer Function to execute on termination (success or failure)
 * @return Stream with finalizer attached
 */
def finalizeWith[U >: Out](finalizer: () => Future[Done]): Source[U, Mat]

/**
 * Inject elements when upstream has been idle for too long
 * @param maxIdle Maximum time without an upstream element before one is injected
 * @param injectedElem Function that creates the element to inject
 * @return Stream that emits injected elements during idle periods
 */
def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: () => U): Source[U, Mat]
```

**Usage Examples:**

```scala
import akka.Done
import scala.concurrent.Future
import scala.concurrent.duration._
import system.dispatcher // ExecutionContext for the Future callbacks

// Lifecycle management
Source(1 to 10)
  .watchTermination() { (mat, done) =>
    println("Stream started") // Runs when the stream is materialized
    done.onComplete { result =>
      println(s"Stream finished: $result")
      // Cleanup resources
    }
    mat
  }
  .runWith(Sink.seq)

// Stream with finalizer
val resourceStream = Source.fromIterator(() => expensiveResourceIterator())
  .finalizeWith(() => Future {
    cleanupExpensiveResource()
    Done
  })
  .runWith(Sink.seq)

// Keep alive stream
Source.maybe[String] // Emits nothing until its materialized Promise is completed
  .keepAlive(30.seconds, () => "heartbeat") // Inject "heartbeat" after 30 seconds of silence
  .runWith(Sink.foreach(println))
```
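
A compact way to check the termination and keep-alive behavior is to run both against tiny streams. The sketch below assumes Akka 2.6 or later; `LifecycleDemo` and the `released` flag are made up for illustration. It uses `watchTermination` to flip a flag once the stream ends, and `keepAlive` on a silent source to show that heartbeats are injected when upstream stays idle.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of the library
object LifecycleDemo {
  def run(): (Boolean, Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("lifecycle-demo")
    import system.dispatcher
    try {
      // Stand-in for a real resource that must be released on termination
      val released = new AtomicBoolean(false)
      val (terminated, elements) = Source(1 to 3)
        .watchTermination()(Keep.right)
        .toMat(Sink.seq)(Keep.both)
        .run()
      // andThen runs on success and on failure, so it works as a cleanup hook
      val cleanedUp = terminated.andThen { case _ => released.set(true) }
      Await.ready(cleanedUp, 5.seconds)
      Await.result(elements, 5.seconds)

      // Source.maybe stays silent, so every element seen here was injected by keepAlive
      val heartbeats = Source.maybe[String]
        .keepAlive(50.millis, () => "heartbeat")
        .take(3)
        .runWith(Sink.seq)

      (released.get, Await.result(heartbeats, 5.seconds))
    } finally system.terminate()
  }
}
```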

### Detaching and Isolation

Operations for decoupling stream segments and managing boundaries.

```scala { .api }
/**
 * Create an async boundary that decouples upstream and downstream processing
 * @return Stream with async boundary
 */
def async: Source[Out, Mat]

/**
 * Detach upstream demand from downstream demand without detaching the stream rates;
 * behaves like a buffer of size 1
 * @return Stream with upstream and downstream demand decoupled
 */
def detach: Source[Out, Mat]

/**
 * Add an explicit async boundary with custom attributes
 * @param attrs Attributes to apply to the async boundary
 * @return Stream with custom async boundary
 */
def asyncBoundary(attrs: Attributes = Attributes.none): Source[Out, Mat]
```

**Usage Examples:**

```scala
// Async boundaries for parallelism
Source(1 to 100)
  .map(slowComputation1)
  .async // Everything above this line runs in its own actor
  .map(slowComputation2)
  .async // And this stage runs independently of the sink
  .runWith(Sink.seq)

// Detach demand between upstream and downstream
val detachedStream = Source.repeat("data")
  .take(100)
  .detach // Acts like a one-element buffer between upstream and downstream demand
  .map(processData)
  .runWith(Sink.ignore)
```
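
Async boundaries and `detach` change how stages are scheduled, not what they emit. The sketch below, assuming Akka 2.6 or later and the made-up name `AsyncBoundaryDemo`, combines an async boundary with a one-element input buffer (set through `Attributes.inputBuffer`) and a `detach` stage, then checks that element order and values are unchanged.

```scala
import akka.actor.ActorSystem
import akka.stream.Attributes
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of the library
object AsyncBoundaryDemo {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("async-demo")
    try {
      // Async island with its input buffer limited to a single element
      val doubling = Flow[Int]
        .map(_ * 2)
        .async
        .addAttributes(Attributes.inputBuffer(initial = 1, max = 1))

      val result = Source(1 to 5)
        .map(_ + 1)
        .via(doubling)
        .detach // one-element buffer between upstream and downstream demand
        .runWith(Sink.seq)

      Await.result(result, 5.seconds)
    } finally system.terminate()
  }
}
```

A small input buffer trades throughput for tighter backpressure, which can matter when upstream elements are expensive to produce.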

### Dynamic Stream Control

Advanced operations for dynamic stream behavior modification.

```scala { .api }
/**
 * Choose the processing flow dynamically from the first elements of the stream
 * @param n Number of elements to take as the prefix
 * @param f Function that takes the prefix and returns the flow that handles the remaining elements
 * @return Stream processed by the flow selected from the prefix
 */
def flatMapPrefix[Out2, Mat2](n: Int)(f: immutable.Seq[Out] => Flow[Out, Out2, Mat2]): Source[Out2, Mat]

/**
 * Send elements that match a predicate to a separate sink
 * @param that Sink that receives the diverted elements
 * @param when Predicate that selects the elements to divert
 * @return Stream that emits only the elements that were not diverted
 */
def divertTo(that: Graph[SinkShape[Out], _], when: Out => Boolean): Source[Out, Mat]
```

**Usage Examples:**

```scala
// Dynamic processing based on initial elements.
// The first 5 elements are consumed as the prefix and are not emitted downstream.
Source(1 to 100)
  .flatMapPrefix(5) { initialElements =>
    if (initialElements.forall(_ > 0)) {
      Flow[Int].map(_ * 2) // Prefix was all positive
    } else {
      Flow[Int].map(_.abs) // Prefix contained zero or negative values
    }
  }
  .runWith(Sink.seq)

// Divert elements based on condition
Source(1 to 10)
  .divertTo(Sink.foreach(n => println(s"Even: $n")), _ % 2 == 0)
  .runWith(Sink.foreach(n => println(s"Odd: $n")))
```
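
Both operators are easier to reason about with collected results instead of printed ones. The sketch below assumes an Akka 2.6 release that includes `flatMapPrefix`; `DynamicControlDemo` is a made-up name. It uses `divertToMat`, the variant of `divertTo` that exposes the diverted sink's materialized value, so that both branches can be inspected.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of the library
object DynamicControlDemo {
  def run(): (Seq[Int], Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("dynamic-demo")
    try {
      // The prefix (1, 2) is consumed; the remaining elements are scaled by its sum (3)
      val scaled = Source(1 to 10)
        .flatMapPrefix(2) { prefix => Flow[Int].map(_ * prefix.sum) }
        .runWith(Sink.seq)

      // Even numbers go to the diverted sink, odd numbers continue downstream
      val (evens, odds) = Source(1 to 10)
        .divertToMat(Sink.seq[Int], (n: Int) => n % 2 == 0)(Keep.right)
        .toMat(Sink.seq)(Keep.both)
        .run()

      (Await.result(scaled, 5.seconds),
       Await.result(evens, 5.seconds),
       Await.result(odds, 5.seconds))
    } finally system.terminate()
  }
}
```

If the prefix elements should also reach downstream, the selected flow can re-emit them, for example with `Flow[Int].prepend(Source(prefix))`.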

## Types

```scala { .api }
// Flow control strategies
sealed abstract class OverflowStrategy
sealed abstract class ThrottleMode
sealed abstract class DelayOverflowStrategy

// Kill switch interfaces
trait KillSwitch {
  def shutdown(): Unit
  def abort(ex: Throwable): Unit
}

final class UniqueKillSwitch extends KillSwitch
final class SharedKillSwitch extends KillSwitch {
  def flow[T]: Graph[FlowShape[T, T], SharedKillSwitch]
}

// Delay strategies
trait DelayStrategy[-T] {
  def nextDelay(elem: T): FiniteDuration
}

object DelayStrategy {
  def fixedDelay(delay: FiniteDuration): DelayStrategy[Any]
  def linearIncreasingDelay[T](
      increaseStep: FiniteDuration,
      needsIncrease: T => Boolean,
      initialDelay: FiniteDuration = Duration.Zero,
      maxDelay: Duration = Duration.Inf): DelayStrategy[T]
}
```
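
Because `DelayStrategy` is a single-method trait, a custom strategy can be written directly. The sketch below assumes Akka 2.6 or later, where `DelayStrategy` lives in `akka.stream.scaladsl`; `CustomDelayDemo` and `ProportionalDelay` are made-up names. It delays each element in proportion to its value and checks that ordering is preserved.

```scala
import akka.actor.ActorSystem
import akka.stream.DelayOverflowStrategy
import akka.stream.scaladsl.{DelayStrategy, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of the library
object CustomDelayDemo {
  // Hypothetical strategy: larger values wait longer (20ms per unit)
  final class ProportionalDelay extends DelayStrategy[Int] {
    override def nextDelay(elem: Int): FiniteDuration = (elem * 20).millis
  }

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("delay-demo")
    try {
      val result = Source(1 to 3)
        // The supplier creates a fresh strategy for every materialization
        .delayWith(() => new ProportionalDelay, DelayOverflowStrategy.backpressure)
        .runWith(Sink.seq)
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }
}
```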