or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

control-flow.md, core-components.md, error-handling.md, graph-building.md, index.md, io-integration.md, junction-operations.md, materialization.md, stream-operations.md

docs/control-flow.md

0

# Control Flow and Lifecycle

1

2

Stream lifecycle management with KillSwitch, StreamRefs for distribution, and queue integration.

3

4

## KillSwitch - Stream Termination Control

5

6

### Base KillSwitch Interface

7

8

```scala { .api }

9

trait KillSwitch {

10

def shutdown(): Unit // Graceful shutdown

11

def abort(ex: Throwable): Unit // Abort with error

12

}

13

```

14

15

### UniqueKillSwitch - Single Stream Control

16

17

```scala { .api }

18

final class UniqueKillSwitch private[stream] extends KillSwitch

19

```

20

21

### SharedKillSwitch - Multi-Stream Control

22

23

```scala { .api }

24

final class SharedKillSwitch private[stream] extends KillSwitch {

25

val name: String

26

def flow[T]: Graph[FlowShape[T, T], SharedKillSwitch]

27

}

28

```

29

30

### KillSwitches Factory

31

32

```scala { .api }

33

object KillSwitches {

34

def single[T]: Graph[FlowShape[T, T], UniqueKillSwitch]

35

def singleBidi[T1, T2]: Graph[BidiShape[T1, T1, T2, T2], UniqueKillSwitch]

36

def shared(name: String): SharedKillSwitch

37

}

38

```

39

40

### Usage Examples

41

42

**Unique KillSwitch:**

43

```scala

44

import akka.stream.scaladsl.{Source, Sink, Keep, KillSwitches}

45

import scala.concurrent.duration._

46

47

// Single stream with kill switch

48

val (killSwitch, done) = Source.tick(1.second, 1.second, "ping")

49

.viaMat(KillSwitches.single)(Keep.right)

50

.toMat(Sink.foreach(println))(Keep.both)

51

.run()

52

53

// Later, terminate the stream gracefully

54

scala.concurrent.Future {

55

Thread.sleep(5000)

56

killSwitch.shutdown()

57

}

58

59

// Or abort with error

60

killSwitch.abort(new RuntimeException("User requested abort"))

61

```

62

63

**Shared KillSwitch:**

64

```scala

65

// Create shared kill switch for multiple streams

66

val sharedKillSwitch = KillSwitches.shared("my-streams")

67

68

// Use in multiple streams

69

val stream1 = Source.tick(1.second, 1.second, "stream1")

70

.via(sharedKillSwitch.flow)

71

.runWith(Sink.foreach(println))

72

73

val stream2 = Source.tick(2.seconds, 2.seconds, "stream2")

74

.via(sharedKillSwitch.flow)

75

.runWith(Sink.foreach(println))

76

77

// Shutdown all streams using the shared kill switch

78

sharedKillSwitch.shutdown()

79

```

80

81

## StreamRefs - Distributed Streaming

82

83

### SourceRef - Remote Source Reference

84

85

```scala { .api }

86

trait SourceRef[T] {

87

def source: Source[T, NotUsed]

88

def getSource: javadsl.Source[T, NotUsed] // Java API

89

}

90

```

91

92

### SinkRef - Remote Sink Reference

93

94

```scala { .api }

95

trait SinkRef[In] {

96

def sink(): Sink[In, NotUsed]

97

def getSink(): javadsl.Sink[In, NotUsed] // Java API

98

}

99

```

100

101

### StreamRefs Factory

102

103

```scala { .api }

104

object StreamRefs {

105

def sourceRef[T](): Sink[T, Future[SourceRef[T]]]

106

def sinkRef[T](): Source[SinkRef[T], NotUsed]

107

}

108

```

109

110

### Usage Examples

111

112

**Creating and Using SourceRef:**

113

```scala

114

import akka.stream.scaladsl.StreamRefs

115

116

// Create a SourceRef for remote consumption

117

val sourceRefSink: Sink[String, Future[SourceRef[String]]] = StreamRefs.sourceRef()

118

119

val sourceRef: Future[SourceRef[String]] = Source(List("hello", "world"))

120

.runWith(sourceRefSink)

121

122

// Use the SourceRef elsewhere (possibly remote)

123

sourceRef.foreach { ref =>

124

ref.source

125

.runWith(Sink.foreach(println))

126

}

127

```

128

129

**Creating and Using SinkRef:**

130

```scala

131

// Create a SinkRef for remote production

132

val sinkRefSource: Source[SinkRef[String], NotUsed] = StreamRefs.sinkRef()

133

134

sinkRefSource.runWith(Sink.head).foreach { sinkRef =>

135

// Use the SinkRef to send data (possibly from remote)

136

Source(List("data1", "data2", "data3"))

137

.runWith(sinkRef.sink())

138

}

139

```

140

141

## Queue Integration

142

143

### SourceQueue - Dynamic Element Injection

144

145

```scala { .api }

146

trait SourceQueueWithComplete[T] {

147

def offer(elem: T): Future[QueueOfferResult]

148

def complete(): Unit

149

def fail(ex: Throwable): Unit

150

def watchCompletion(): Future[Done]

151

}

152

```

153

154

### SinkQueue - Dynamic Element Extraction

155

156

```scala { .api }

157

trait SinkQueueWithCancel[T] {

158

def pull(): Future[Option[T]]

159

def cancel(): Unit

160

}

161

```

162

163

### QueueOfferResult Types

164

165

```scala { .api }

166

sealed abstract class QueueOfferResult

167

168

object QueueOfferResult {

169

case object Enqueued extends QueueOfferResult

170

case object Dropped extends QueueOfferResult

171

case class Failure(cause: Throwable) extends QueueOfferResult

172

case object QueueClosed extends QueueOfferResult

173

}

174

```

175

176

### Usage Examples

177

178

**SourceQueue Usage:**

179

```scala

180

import akka.stream.OverflowStrategy

181

import akka.stream.scaladsl.{Source, Sink}

182

183

// Create source with queue

184

val (queue, done) = Source.queue[String](10, OverflowStrategy.backpressure)

185

.toMat(Sink.foreach(println))(Keep.both)

186

.run()

187

188

// Offer elements dynamically

189

queue.offer("hello").foreach { result =>

190

result match {

191

case QueueOfferResult.Enqueued => println("Enqueued successfully")

192

case QueueOfferResult.Dropped => println("Element was dropped")

193

case QueueOfferResult.QueueClosed => println("Queue is closed")

194

case QueueOfferResult.Failure(ex) => println(s"Failed: $ex")

195

}

196

}

197

198

queue.offer("world")

199

queue.complete() // Signal completion

200

```

201

202

**SinkQueue Usage:**

203

```scala

204

// Create sink with queue

205

val queue = Source(1 to 100)

206

.runWith(Sink.queue())

207

208

// Pull elements dynamically

209

def pullNext(): Unit = {

210

queue.pull().foreach {

211

case Some(element) =>

212

println(s"Pulled: $element")

213

pullNext() // Continue pulling

214

case None =>

215

println("Stream completed")

216

}

217

}

218

219

pullNext()

220

```

221

222

## Completion Strategies

223

224

### CompletionStrategy Types

225

226

```scala { .api }

227

sealed trait CompletionStrategy

228

229

object CompletionStrategy {

230

case object Immediately extends CompletionStrategy

231

case object Draining extends CompletionStrategy

232

}

233

```

234

235

**Usage Example:**

236

```scala

237

val (actorRef, done) = Source.actorRef[String](

238

completionMatcher = {

239

case "complete" => CompletionStrategy.Immediately

240

},

241

failureMatcher = {

242

case "fail" => new RuntimeException("Actor requested failure")

243

},

244

bufferSize = 10,

245

overflowStrategy = OverflowStrategy.dropHead

246

).toMat(Sink.foreach(println))(Keep.both).run()

247

248

// Control completion via actor messages

249

actorRef ! "hello"

250

actorRef ! "world"

251

actorRef ! "complete" // Triggers completion

252

```

253

254

## Stream Monitoring and Lifecycle

255

256

### Flow Monitoring

257

258

```scala { .api }

259

trait FlowMonitor[T] {

260

def state: Future[StreamState]

261

}

262

263

sealed trait StreamState

264

case object Initializing extends StreamState

265

case object Running extends StreamState

266

case object Completed extends StreamState

267

case class Failed(cause: Throwable) extends StreamState

268

```

269

270

### Lifecycle Hooks

271

272

```scala

273

val monitoredStream = Source(1 to 10)

274

.monitor() { (_, monitor) =>

275

monitor.state.foreach { state =>

276

println(s"Stream state changed to: $state")

277

}

278

}

279

.watchTermination() { (_, done) =>

280

done.onComplete {

281

case Success(_) => println("Stream completed successfully")

282

case Failure(ex) => println(s"Stream failed: $ex")

283

}

284

}

285

.runWith(Sink.ignore)

286

```

287

288

## Timeouts and Keep-Alive

289

290

### Timeout Operations

291

292

```scala { .api }

293

trait FlowOps[+Out, +Mat] {

294

def idleTimeout(timeout: FiniteDuration): Repr[Out]

295

def completionTimeout(timeout: FiniteDuration): Repr[Out]

296

def backpressureTimeout(timeout: FiniteDuration): Repr[Out]

297

def keepAlive(maxIdle: FiniteDuration, injectedElem: () => Out): Repr[Out]

298

}

299

```

300

301

**Usage Examples:**

302

```scala

303

import scala.concurrent.duration._

304

305

// Timeout if no elements received within 30 seconds

306

val withIdleTimeout = Source.tick(10.seconds, 10.seconds, "ping")

307

.idleTimeout(30.seconds)

308

.runWith(Sink.foreach(println))

309

310

// Keep alive by injecting heartbeat elements

311

val keepAliveStream = Source.tick(5.seconds, 5.seconds, "data")

312

.keepAlive(2.seconds, () => "heartbeat")

313

.runWith(Sink.foreach(println))

314

315

// Timeout on overall completion

316

val completionTimeoutStream = Source(1 to 1000)

317

.throttle(1, 1.second)

318

.completionTimeout(10.seconds) // Fail if not completed in 10 seconds

319

.runWith(Sink.seq)

320

```

321

322

## Resource Management

323

324

### Resource Cleanup Patterns

325

326

```scala

327

import scala.util.{Success, Failure}

328

329

// Proper resource cleanup with monitoring

330

def createManagedStream[T](resource: => AutoCloseable)(

331

streamFactory: AutoCloseable => Source[T, NotUsed]

332

): Source[T, NotUsed] = {

333

334

Source.fromGraph(GraphDSL.create() { implicit builder =>

335

val src = Source.lazySource { () =>

336

val res = resource

337

streamFactory(res)

338

.watchTermination() { (_, done) =>

339

done.onComplete {

340

case Success(_) => res.close()

341

case Failure(_) => res.close()

342

}

343

NotUsed

344

}

345

}

346

347

val shape = builder.add(src)

348

SourceShape(shape.out)

349

})

350

}

351

352

// Usage

353

val managedFileStream = createManagedStream {

354

new FileInputStream("data.txt")

355

} { inputStream =>

356

StreamConverters.fromInputStream(() => inputStream)

357

.via(Framing.delimiter(ByteString("\n"), 1024))

358

.map(_.utf8String)

359

}

360

```

361

362

This control flow system provides comprehensive lifecycle management while maintaining the reactive streams semantics and backpressure throughout the stream processing pipeline.