# Control Flow and Lifecycle

Stream lifecycle management with KillSwitch, StreamRefs for distribution, and queue integration.

## KillSwitch - Stream Termination Control

### Base KillSwitch Interface

```scala { .api }
trait KillSwitch {
  def shutdown(): Unit // Graceful shutdown
  def abort(ex: Throwable): Unit // Abort with error
}
```

### UniqueKillSwitch - Single Stream Control

```scala { .api }
final class UniqueKillSwitch private[stream] extends KillSwitch
```

### SharedKillSwitch - Multi-Stream Control

```scala { .api }
final class SharedKillSwitch private[stream] extends KillSwitch {
  val name: String
  def flow[T]: Graph[FlowShape[T, T], SharedKillSwitch]
}
```

### KillSwitches Factory

```scala { .api }
object KillSwitches {
  def single[T]: Graph[FlowShape[T, T], UniqueKillSwitch]
  def singleBidi[T1, T2]: Graph[BidiShape[T1, T1, T2, T2], UniqueKillSwitch]
  def shared(name: String): SharedKillSwitch
}
```

### Usage Examples

**Unique KillSwitch:**
```scala
import akka.actor.ActorSystem
import akka.stream.KillSwitches // lives in akka.stream, not akka.stream.scaladsl
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("example")
import system.dispatcher // ExecutionContext for the scheduler call below

// Single stream with kill switch
val (killSwitch, done) = Source.tick(1.second, 1.second, "ping")
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(Sink.foreach(println))(Keep.both)
  .run()

// Later, terminate the stream gracefully
system.scheduler.scheduleOnce(5.seconds)(killSwitch.shutdown())

// Or abort with an error instead; only the first shutdown/abort call has an effect
// killSwitch.abort(new RuntimeException("User requested abort"))
```

**Shared KillSwitch:**
```scala
// Assumes the imports and the implicit ActorSystem from the previous example

// Create shared kill switch for multiple streams
val sharedKillSwitch = KillSwitches.shared("my-streams")

// Use in multiple streams
val stream1 = Source.tick(1.second, 1.second, "stream1")
  .via(sharedKillSwitch.flow)
  .runWith(Sink.foreach(println))

val stream2 = Source.tick(2.seconds, 2.seconds, "stream2")
  .via(sharedKillSwitch.flow)
  .runWith(Sink.foreach(println))

// Shutdown all streams using the shared kill switch
sharedKillSwitch.shutdown()
```
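
The difference between `shutdown()` and `abort()` shows up in the stream's materialized `Future`. The following is a minimal sketch, assuming Akka 2.6 on the classpath; the helper `runTicking` and the system name are made up for illustration.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

implicit val system: ActorSystem = ActorSystem("kill-switch-outcomes")

// Hypothetical helper: an endless ticking stream with its own kill switch
def runTicking(): (UniqueKillSwitch, Future[Done]) =
  Source.tick(100.millis, 100.millis, "tick")
    .viaMat(KillSwitches.single)(Keep.right)
    .toMat(Sink.ignore)(Keep.both)
    .run()

// shutdown() completes the stream, so the Future succeeds
val (gracefulSwitch, gracefulDone) = runTicking()
gracefulSwitch.shutdown()
val graceful = Try(Await.result(gracefulDone, 3.seconds))

// abort() fails the stream, so the Future fails with the given exception
val (abortSwitch, abortDone) = runTicking()
abortSwitch.abort(new RuntimeException("User requested abort"))
val aborted = Try(Await.result(abortDone, 3.seconds))

println(s"shutdown: $graceful")
println(s"abort: $aborted")

system.terminate()
```

Blocking with `Await` is only for demonstration; in application code the `Future` would normally be composed with `onComplete` or `map`.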

## StreamRefs - Distributed Streaming

### SourceRef - Remote Source Reference

```scala { .api }
trait SourceRef[T] {
  def source: Source[T, NotUsed]
  def getSource: javadsl.Source[T, NotUsed] // Java API
}
```

### SinkRef - Remote Sink Reference

```scala { .api }
trait SinkRef[In] {
  def sink(): Sink[In, NotUsed]
  def getSink(): javadsl.Sink[In, NotUsed] // Java API
}
```

### StreamRefs Factory

```scala { .api }
object StreamRefs {
  // Akka 2.6+: the materialized refs are no longer wrapped in a Future
  def sourceRef[T](): Sink[T, SourceRef[T]]
  def sinkRef[T](): Source[T, SinkRef[T]]
}
```

### Usage Examples

**Creating and Using SourceRef:**
```scala
import akka.stream.SourceRef
import akka.stream.scaladsl.{Sink, Source, StreamRefs}

// Materialize a SourceRef that exposes this stream for remote consumption
val sourceRef: SourceRef[String] = Source(List("hello", "world"))
  .runWith(StreamRefs.sourceRef[String]())

// Use the SourceRef elsewhere (possibly on a remote node, after sending it in a message)
sourceRef.source
  .runWith(Sink.foreach(println))
```

**Creating and Using SinkRef:**
```scala
import akka.stream.SinkRef

// Materialize a SinkRef; elements sent to it end up in the local sink
val sinkRef: SinkRef[String] = StreamRefs.sinkRef[String]()
  .to(Sink.foreach(println))
  .run()

// Use the SinkRef to send data (possibly from a remote node)
Source(List("data1", "data2", "data3"))
  .runWith(sinkRef.sink())
```

## Queue Integration

### SourceQueue - Dynamic Element Injection

```scala { .api }
trait SourceQueueWithComplete[T] {
  def offer(elem: T): Future[QueueOfferResult]
  def complete(): Unit
  def fail(ex: Throwable): Unit
  def watchCompletion(): Future[Done]
}
```

### SinkQueue - Dynamic Element Extraction

```scala { .api }
trait SinkQueueWithCancel[T] {
  def pull(): Future[Option[T]]
  def cancel(): Unit
}
```

### QueueOfferResult Types

```scala { .api }
sealed abstract class QueueOfferResult

object QueueOfferResult {
  case object Enqueued extends QueueOfferResult
  case object Dropped extends QueueOfferResult
  case class Failure(cause: Throwable) extends QueueOfferResult
  case object QueueClosed extends QueueOfferResult
}
```

### Usage Examples

**SourceQueue Usage:**
```scala
import akka.stream.{OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source}

// Requires an implicit ActorSystem and ExecutionContext in scope

// Create source with queue
val (queue, done) = Source.queue[String](10, OverflowStrategy.backpressure)
  .toMat(Sink.foreach(println))(Keep.both)
  .run()

// Offer elements dynamically
queue.offer("hello").foreach {
  case QueueOfferResult.Enqueued    => println("Enqueued successfully")
  case QueueOfferResult.Dropped     => println("Element was dropped")
  case QueueOfferResult.QueueClosed => println("Queue is closed")
  case QueueOfferResult.Failure(ex) => println(s"Failed: $ex")
}

queue.offer("world")
queue.complete() // Signal completion
```
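
With `OverflowStrategy.backpressure`, the `Future` returned by `offer` resolves once the element has been accepted, so a producer can wait for it before offering the next element. The sketch below chains the offers with `flatMap` and completes the queue after the last one; it assumes Akka 2.6, and the system name and element values are illustrative.

```scala
import akka.actor.ActorSystem
import akka.stream.{OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("queue-sequencing")
import system.dispatcher

val (queue, collected) = Source.queue[String](2, OverflowStrategy.backpressure)
  .toMat(Sink.seq)(Keep.both)
  .run()

// Chain the offers so each one starts only after the previous Future has resolved
val allOffered: Future[QueueOfferResult] =
  List("a", "b", "c").foldLeft(Future.successful[QueueOfferResult](QueueOfferResult.Enqueued)) {
    (previous, elem) => previous.flatMap(_ => queue.offer(elem))
  }

// Complete the queue once the last offer has been accepted
allOffered.foreach(_ => queue.complete())

val result = Await.result(collected, 5.seconds)
println(result)

system.terminate()
```

This sketch ignores the individual `QueueOfferResult` values; production code would typically check for `Dropped`, `QueueClosed`, and `Failure` before continuing.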

**SinkQueue Usage:**
```scala
// Requires an implicit ActorSystem and ExecutionContext in scope

// Create sink with queue
val queue = Source(1 to 100)
  .runWith(Sink.queue[Int]())

// Pull elements dynamically, one at a time
def pullNext(): Unit = {
  queue.pull().foreach {
    case Some(element) =>
      println(s"Pulled: $element")
      pullNext() // Continue pulling
    case None =>
      println("Stream completed")
  }
}

pullNext()
```
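
A consumer that only needs part of the stream can stop early with `cancel()`. The following sketch pulls three elements and then cancels; it assumes Akka 2.6, and the names `numbers` and `firstThree` are illustrative.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("sink-queue-cancel")
import system.dispatcher

val numbers = Source(1 to 100).runWith(Sink.queue[Int]())

// The pulls are chained: each one starts after the previous Future has resolved
val firstThree: Future[List[Int]] = for {
  a <- numbers.pull()
  b <- numbers.pull()
  c <- numbers.pull()
} yield {
  numbers.cancel() // Not interested in the remaining elements
  List(a, b, c).flatten
}

val taken = Await.result(firstThree, 5.seconds)
println(taken)

system.terminate()
```

Chaining the pulls in a `for` comprehension keeps at most one `pull()` outstanding at a time, which is what the default `Sink.queue` expects.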

## Completion Strategies

### CompletionStrategy Types

```scala { .api }
sealed trait CompletionStrategy

object CompletionStrategy {
  // The underlying case objects are private to Akka; use these accessors
  def immediately: CompletionStrategy // Complete at once, dropping buffered elements
  def draining: CompletionStrategy    // Emit buffered elements first, then complete
}
```

**Usage Example:**
```scala
import akka.stream.{CompletionStrategy, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}

val (actorRef, done) = Source.actorRef[String](
  completionMatcher = {
    case "complete" => CompletionStrategy.immediately
  },
  failureMatcher = {
    case "fail" => new RuntimeException("Actor requested failure")
  },
  bufferSize = 10,
  overflowStrategy = OverflowStrategy.dropHead
).toMat(Sink.foreach(println))(Keep.both).run()

// Control completion via actor messages
actorRef ! "hello"
actorRef ! "world"
actorRef ! "complete" // Triggers completion; still-buffered elements may be dropped
```
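
When buffered elements must not be lost on completion, `CompletionStrategy.draining` is the alternative to `immediately`. The sketch below is a variation of the example above that collects the elements with `Sink.seq`; it assumes Akka 2.6, and the message values are illustrative.

```scala
import akka.actor.ActorSystem
import akka.stream.{CompletionStrategy, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("draining-example")

val (ref, collected) = Source.actorRef[String](
  // Draining: emit everything that is still buffered before completing
  completionMatcher = { case "complete" => CompletionStrategy.draining },
  failureMatcher = PartialFunction.empty,
  bufferSize = 10,
  overflowStrategy = OverflowStrategy.dropHead
).toMat(Sink.seq)(Keep.both).run()

ref ! "a"
ref ! "b"
ref ! "c"
ref ! "complete"

val received = Await.result(collected, 5.seconds)
println(received)

system.terminate()
```

The buffer is sized well above the number of messages here, so `dropHead` should not discard anything before the completion message arrives.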

## Stream Monitoring and Lifecycle

### Flow Monitoring

```scala { .api }
trait FlowMonitor[+T] {
  def state: FlowMonitorState.StreamState[T] // Snapshot of the current state
}

object FlowMonitorState {
  sealed trait StreamState[+U]
  case object Initialized extends StreamState[Nothing]
  final case class Received[+U](msg: U) extends StreamState[U]
  final case class Failed(cause: Throwable) extends StreamState[Nothing]
  case object Finished extends StreamState[Nothing]
}
```
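
### Lifecycle Hooks

```scala
import akka.stream.FlowMonitorState
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.util.{Failure, Success}

// Requires an implicit ActorSystem and ExecutionContext in scope
val (monitor, done) = Source(1 to 10)
  .monitorMat(Keep.right)        // Materialize the FlowMonitor
  .watchTermination()(Keep.both) // Pair it with the termination Future[Done]
  .to(Sink.ignore)
  .run()

// FlowMonitor.state is a snapshot, so poll it whenever the state is needed
monitor.state match {
  case FlowMonitorState.Initialized    => println("No element seen yet")
  case FlowMonitorState.Received(elem) => println(s"Last element: $elem")
  case FlowMonitorState.Failed(cause)  => println(s"Stream failed: $cause")
  case FlowMonitorState.Finished       => println("Stream finished")
}

done.onComplete {
  case Success(_)  => println("Stream completed successfully")
  case Failure(ex) => println(s"Stream failed: $ex")
}
```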

## Timeouts and Keep-Alive

### Timeout Operations

```scala { .api }
trait FlowOps[+Out, +Mat] {
  def idleTimeout(timeout: FiniteDuration): Repr[Out]
  def completionTimeout(timeout: FiniteDuration): Repr[Out]
  def backpressureTimeout(timeout: FiniteDuration): Repr[Out]
  def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: () => U): Repr[U]
}
```

**Usage Examples:**
```scala
import scala.concurrent.duration._

// Timeout if no elements received within 30 seconds
val withIdleTimeout = Source.tick(10.seconds, 10.seconds, "ping")
  .idleTimeout(30.seconds)
  .runWith(Sink.foreach(println))

// Keep alive by injecting heartbeat elements
val keepAliveStream = Source.tick(5.seconds, 5.seconds, "data")
  .keepAlive(2.seconds, () => "heartbeat")
  .runWith(Sink.foreach(println))

// Timeout on overall completion
val completionTimeoutStream = Source(1 to 1000)
  .throttle(1, 1.second)
  .completionTimeout(10.seconds) // Fail if not completed in 10 seconds
  .runWith(Sink.seq)
```
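
The timeout operators fail the stream with a `TimeoutException`, which can be handled like any other stream failure. The sketch below recovers from an idle timeout by emitting a final marker element; it assumes Akka 2.6, uses `Source.never` to force the timeout, and the marker string is illustrative.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.TimeoutException
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("timeout-recovery")

val recovered = Source.never[String]  // Never emits, so the idle timeout always fires
  .idleTimeout(200.millis)
  .recover { case _: TimeoutException => "idle-timeout" } // Emit a marker, then complete
  .runWith(Sink.seq)

val elements = Await.result(recovered, 5.seconds)
println(elements)

system.terminate()
```

`recover` ends the stream after emitting its element; to continue with another source instead, `recoverWithRetries` is the usual choice.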

## Resource Management

### Resource Cleanup Patterns

```scala
import java.io.FileInputStream
import akka.NotUsed
import akka.stream.scaladsl.{Framing, Source, StreamConverters}
import akka.util.ByteString
import scala.concurrent.ExecutionContext

// Opens the resource once per materialization and closes it when the stream
// completes, fails, or is cancelled. Source.lazySource requires Akka 2.6+.
def createManagedStream[R <: AutoCloseable, T, M](resource: => R)(
  streamFactory: R => Source[T, M]
)(implicit ec: ExecutionContext): Source[T, NotUsed] =
  Source.lazySource { () =>
    val res = resource
    streamFactory(res).watchTermination() { (_, done) =>
      done.onComplete(_ => res.close()) // Runs on success and on failure
      NotUsed
    }
  }.mapMaterializedValue(_ => NotUsed)

// Usage (assumes an implicit ExecutionContext in scope)
val managedFileStream: Source[String, NotUsed] =
  createManagedStream(new FileInputStream("data.txt")) { inputStream =>
    StreamConverters.fromInputStream(() => inputStream)
      .via(Framing.delimiter(ByteString("\n"), 1024, allowTruncation = true))
      .map(_.utf8String)
  }
```

Together, these tools let you stop, distribute, feed, drain, monitor, and time-limit running streams while preserving Reactive Streams semantics and backpressure across the whole pipeline.