or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-stream-types.md custom-stages.md error-handling.md index.md integration.md materialization.md stream-combining.md stream-control.md stream-sinks.md stream-sources.md stream-transformations.md

docs/stream-sources.md

0

# Stream Sources

1

2

Factory methods and utilities for creating stream sources from various data sources including collections, futures, actors, and external systems. Sources are the starting points of stream processing pipelines.

3

4

## Capabilities

5

6

### Collection Sources

7

8

Create sources from various collection types and iterators.

9

10

```scala { .api }

11

object Source {

12

/**

13

* Create a source from an iterable collection

14

* @param iterable The collection to stream

15

* @return Source that emits all elements from the collection

16

*/

17

def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed]

18

19

/**

20

* Create a source from an iterator factory function

21

* @param f Function that creates a new iterator each time the source is materialized

22

* @return Source that emits elements from the iterator

23

*/

24

def fromIterator[T](f: () => Iterator[T]): Source[T, NotUsed]

25

26

/**

27

* Create a source with a single element

28

* @param element The single element to emit

29

* @return Source that emits the single element then completes

30

*/

31

def single[T](element: T): Source[T, NotUsed]

32

33

/**

34

* Create an empty source that immediately completes

35

* @return Source that emits no elements

36

*/

37

def empty[T]: Source[T, NotUsed]

38

39

/**

40

* Create a source that infinitely repeats a single element

41

* @param element The element to repeat

42

* @return Source that continuously emits the same element

43

*/

44

def repeat[T](element: T): Source[T, NotUsed]

45

46

/**

47

* Create a source that never emits any elements or completes

48

* @return Source that stays active but never produces elements

49

*/

50

def never[T]: Source[T, NotUsed]

51

52

/**

53

* Create a source from a Promise that can be completed externally

54

* @return Source materialized as a Promise for external completion

55

*/

56

def maybe[T]: Source[T, Promise[Option[T]]]

57

}

58

```

59

60

**Usage Examples:**

61

62

```scala

63

import akka.stream.scaladsl.Source

64

65

// From collections

66

val listSource = Source(List(1, 2, 3, 4, 5))

67

val rangeSource = Source(1 to 100)

68

69

// From iterator

70

val randomSource = Source.fromIterator(() => Iterator.continually(scala.util.Random.nextInt(100)))

71

72

// Single element

73

val helloSource = Source.single("Hello, World!")

74

75

// Infinite repetition

76

val tickSource = Source.repeat("tick")

77

```

78

79

### Async and Future Sources

80

81

Create sources from asynchronous computations and future values.

82

83

```scala { .api }

84

/**

85

* Create a source from a Future value

86

* @param futureElement The future that will provide the element

87

* @return Source that emits the future's value when it completes

88

*/

89

def future[T](futureElement: Future[T]): Source[T, NotUsed]

90

91

/**

92

* Create a source from a Future that produces another Source

93

* @param futureSource Future containing a Source

94

* @return Source that emits elements from the future's source

95

*/

96

def futureSource[T, M](futureSource: Future[Source[T, M]]): Source[T, Future[M]]

97

98

/**

99

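* Create a source by unfolding a function asynchronously, starting from a seed state

100

* @param seed Initial state
* @param f Function that returns a Future of the next state and element, or a Future of None to complete the stream

101

* @return Source that emits the elements produced by repeatedly applying the async function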

102

*/

103

def unfoldAsync[T, S](seed: S)(f: S => Future[Option[(S, T)]]): Source[T, NotUsed]

104

105

/**

106

* Create a source by unfolding a function synchronously

107

* @param seed Initial state

108

* @param f Function that produces next state and element

109

* @return Source that emits elements by repeatedly applying the function

110

*/

111

def unfold[T, S](seed: S)(f: S => Option[(S, T)]): Source[T, NotUsed]

112

113

/**

114

* Create a source that defers computing its single element until there is downstream demand

115

* @param create Function to create the single element

116

* @return Source that calls the create function once there is downstream demand

117

*/

118

def lazySingle[T](create: () => T): Source[T, NotUsed]

119

120

/**

121

* Create a source that defers creating its Future element until there is downstream demand

122

* @param create Function to create the Future element

123

* @return Source that calls the create function once there is downstream demand and emits the Future's value when it completes

124

*/

125

def lazyFuture[T](create: () => Future[T]): Source[T, NotUsed]

126

127

/**

128

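* Create a source that defers creating the underlying Source until there is downstream demand

129

* @param create Function to create the underlying Source

130

* @return Source that calls the create function once there is downstream demand; its materialized Future completes with the created Source's materialized value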

131

*/

132

def lazySource[T, M](create: () => Source[T, M]): Source[T, Future[M]]

133

134

/**

135

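* Create a source that defers creating the Future of a Source until there is downstream demand

136

* @param create Function to create the Future of a Source

137

* @return Source that calls the create function once there is downstream demand; its materialized Future completes with the created Source's materialized value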

138

*/

139

def lazyFutureSource[T, M](create: () => Future[Source[T, M]]): Source[T, Future[M]]

140

```

141

142

**Usage Examples:**

143

144

```scala

145

import scala.concurrent.Future

146

import scala.concurrent.ExecutionContext.Implicits.global

147

148

// From Future

149

val futureValue: Future[String] = Future.successful("Hello")

150

val futureSource = Source.future(futureValue)

151

152

// Lazy computation (deferred until there is downstream demand)

153

val lazyComputation = Source.lazySingle(() => {

154

println("Computing expensive value...")

155

expensiveComputation()

156

})

157

158

val lazyFutureSource = Source.lazyFuture(() => {

159

Future(fetchDataFromRemoteAPI())

160

})

161

162

// Lazy source creation

163

val lazyStreamSource = Source.lazySource(() => {

164

if (isDataAvailable()) Source(getData()) else Source.empty

165

})

166

167

// Unfold pattern

168

val fibonacciSource = Source.unfold((0, 1)) {

169

case (a, b) => Some(((b, a + b), a))

170

}

171

172

// Async unfold

173

val asyncCounterSource = Source.unfoldAsync(0) { n =>

174

Future {

175

if (n < 10) Some((n + 1, n)) else None

176

}

177

}

178

```

179

180

### Resource-based Sources

181

182

Create sources from external resources that need to be opened, read, and closed.

183

184

```scala { .api }

185

/**

186

* Create a source from a resource that needs lifecycle management

187

* @param create Function to create/open the resource

188

* @param read Function to read from resource, returns None when exhausted

189

* @param close Function to close/cleanup the resource

190

* @return Source that manages resource lifecycle automatically

191

*/

192

def unfoldResource[T, S](

193

create: () => S,

194

read: S => Option[T],

195

close: S => Unit

196

): Source[T, NotUsed]

197

198

/**

199

* Create a source from an async resource that needs lifecycle management

200

* @param create Function to create/open the resource asynchronously

201

* @param read Function to read from resource asynchronously, returns None when exhausted

202

* @param close Function to close/cleanup the resource asynchronously

203

* @return Source that manages async resource lifecycle automatically

204

*/

205

def unfoldResourceAsync[T, S](

206

create: () => Future[S],

207

read: S => Future[Option[T]],

208

close: S => Future[Done]

209

): Source[T, NotUsed]

210

```

211

212

**Usage Examples:**

213

214

```scala

215

import java.io.{FileInputStream, BufferedReader, InputStreamReader}

216

import scala.concurrent.Future

217

import scala.concurrent.ExecutionContext.Implicits.global

218

219

// File reading with resource management

220

val fileSource = Source.unfoldResource(

221

create = () => new BufferedReader(new InputStreamReader(new FileInputStream("data.txt"))),

222

read = reader => Option(reader.readLine()),

223

close = reader => reader.close()

224

)

225

226

// Async database reading

227

val dbSource = Source.unfoldResourceAsync(

228

create = () => Future(openDatabaseConnection()),

229

read = conn => Future(conn.readNextRecord()).map(Option(_)),

230

close = conn => Future { conn.close(); Done }

231

)

232

```

233

234

### Timed Sources

235

236

Create sources that emit elements based on time intervals.

237

238

```scala { .api }

239

/**

240

* Create a source that emits a tick at regular intervals

241

* @param initialDelay Delay before first element

242

* @param interval Interval between subsequent elements

243

* @param tick Element to emit at each interval

244

* @return Source that emits elements at timed intervals

245

*/

246

def tick[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable]

247

248

```

249

250

**Usage Examples:**

251

252

```scala

253

import scala.concurrent.duration._

254

255

// Periodic ticks

256

val tickerSource = Source.tick(1.second, 500.millis, "tick")

257

258

// Number range

259

val numberSource = Source(1 to 100 by 2) // Odd numbers 1 to 99

260

```

261

262

### Actor Integration Sources

263

264

Create sources that integrate with Akka actors for dynamic element production.

265

266

```scala { .api }

267

/**

268

* Create a source backed by an actor

269

* @param completionMatcher Partial function to detect completion messages

270

* @param failureMatcher Partial function to detect failure messages

271

* @param bufferSize Buffer size for the actor

272

* @param overflowStrategy Strategy when buffer overflows

273

* @return Source materialized as ActorRef for sending elements

274

*/

275

def actorRef[T](

276

completionMatcher: PartialFunction[Any, CompletionStrategy],

277

failureMatcher: PartialFunction[Any, Throwable],

278

bufferSize: Int,

279

overflowStrategy: OverflowStrategy

280

): Source[T, ActorRef]

281

282

/**

283

* Create a source with backpressure-aware actor integration

284

* @param ackMessage Message sent to confirm element processing

285

* @param completionMatcher Partial function to detect completion messages

286

* @param failureMatcher Partial function to detect failure messages

287

* @return Source materialized as ActorRef with backpressure support

288

*/

289

def actorRefWithBackpressure[T](

290

ackMessage: Any,

291

completionMatcher: PartialFunction[Any, CompletionStrategy],

292

failureMatcher: PartialFunction[Any, Throwable]

293

): Source[T, ActorRef]

294

```

295

296

**Usage Examples:**

297

298

```scala

299

import akka.actor.ActorRef

300

import akka.stream.OverflowStrategy

301

302

// Actor-backed source

303

val (actorRef: ActorRef, source: Source[String, NotUsed]) =

304

Source.actorRef[String](

305

completionMatcher = { case "complete" => CompletionStrategy.immediately },

306

failureMatcher = { case akka.actor.Status.Failure(ex) => ex },

307

bufferSize = 100,

308

overflowStrategy = OverflowStrategy.dropHead

309

).preMaterialize()

310

311

// Send elements via actor

312

actorRef ! "Hello"

313

actorRef ! "World"

314

actorRef ! "complete" // Complete the stream

315

```

316

317

### Queue Sources

318

319

Create sources with dynamic element offering capabilities.

320

321

```scala { .api }

322

/**

323

* Create a source backed by a bounded queue for immediate feedback

324

* @param bufferSize Maximum number of elements to buffer

325

* @return Source materialized as BoundedSourceQueue for offering elements

326

*/

327

def queue[T](bufferSize: Int): Source[T, BoundedSourceQueue[T]]

328

329

/**

330

* Create a source backed by a queue with overflow strategy

331

* @param bufferSize Maximum number of elements to buffer

332

* @param overflowStrategy Strategy when buffer is full

333

* @return Source materialized as SourceQueueWithComplete for offering elements

334

*/

335

def queue[T](

336

bufferSize: Int,

337

overflowStrategy: OverflowStrategy

338

): Source[T, SourceQueueWithComplete[T]]

339

340

/**

341

* Create a source backed by a queue with overflow strategy and concurrent offers

342

* @param bufferSize Maximum number of elements to buffer

343

* @param overflowStrategy Strategy when buffer is full

344

* @param maxConcurrentOffers Maximum number of concurrent offers

345

* @return Source materialized as SourceQueueWithComplete for offering elements

346

*/

347

def queue[T](

348

bufferSize: Int,

349

overflowStrategy: OverflowStrategy,

350

maxConcurrentOffers: Int

351

): Source[T, SourceQueueWithComplete[T]]

352

353

/**

354

* Bounded queue interface for immediate offer feedback

355

*/

356

trait BoundedSourceQueue[T] {

357

/**

358

* Offer an element to the queue with immediate result

359

* @param elem Element to offer

360

* @return Immediate result of the offer

361

*/

362

def offer(elem: T): QueueOfferResult

363

364

/**

365

* Complete the source

366

*/

367

def complete(): Unit

368

369

/**

370

* Fail the source with an exception

371

* @param ex Exception to fail with

372

*/

373

def fail(ex: Throwable): Unit

374

375

/**

376

* Returns approximate number of elements in queue

377

*/

378

def size(): Int

379

}

380

381

/**

382

* Queue interface with completion support and async offers

383

*/

384

trait SourceQueueWithComplete[T] extends SourceQueue[T] {

385

/**

386

* Complete the source

387

*/

388

def complete(): Unit

389

390

/**

391

* Fail the source with an exception

392

* @param ex Exception to fail with

393

*/

394

def fail(ex: Throwable): Unit

395

396

/**

397

* Watch for completion of the stream

398

*/

399

def watchCompletion(): Future[Done]

400

}

401

402

trait SourceQueue[T] {

403

/**

404

* Offer an element to the queue asynchronously

405

* @param elem Element to offer

406

* @return Future with the result of the offer

407

*/

408

def offer(elem: T): Future[QueueOfferResult]

409

410

/**

411

* Watch for completion of the stream

412

*/

413

def watchCompletion(): Future[Done]

414

}

415

416

sealed abstract class QueueOfferResult

417

case object Enqueued extends QueueOfferResult

418

case object Dropped extends QueueOfferResult

419

case object QueueClosed extends QueueOfferResult

420

case class Failure(cause: Throwable) extends QueueOfferResult

421

```

422

423

**Usage Examples:**

424

425

```scala

426

import akka.stream.scaladsl.{Source, Sink}

427

import akka.stream.{BoundedSourceQueue, SourceQueueWithComplete, QueueOfferResult, OverflowStrategy}

428

429

// Bounded queue source with immediate feedback

430

val (boundedQueue: BoundedSourceQueue[Int], boundedSource: Source[Int, NotUsed]) =

431

Source.queue[Int](100)

432

.preMaterialize()

433

434

// Offer elements with immediate result

435

boundedQueue.offer(1) match {

436

case QueueOfferResult.Enqueued => println("Element enqueued")

437

case QueueOfferResult.Dropped => println("Element dropped")

438

case QueueOfferResult.QueueClosed => println("Queue closed")

439

case QueueOfferResult.Failure(ex) => println(s"Failed: $ex")

440

}

441

442

// Queue source with overflow strategy and async offers

443

val (asyncQueue: SourceQueueWithComplete[Int], asyncSource: Source[Int, NotUsed]) =

444

Source.queue[Int](100, OverflowStrategy.backpressure)

445

.preMaterialize()

446

447

// Offer elements asynchronously

448

asyncQueue.offer(1).map {

449

case QueueOfferResult.Enqueued => println("Element enqueued")

450

case QueueOfferResult.Dropped => println("Element dropped")

451

case QueueOfferResult.QueueClosed => println("Queue closed")

452

case QueueOfferResult.Failure(ex) => println(s"Failed: $ex")

453

}

454

455

// Process elements

456

boundedSource.runWith(Sink.foreach(println))

457

asyncSource.runWith(Sink.foreach(println))

458

```

459

460

### Reactive Streams Integration

461

462

Create sources from Reactive Streams Publisher implementations.

463

464

```scala { .api }

465

/**

466

* Create a source from a Reactive Streams Publisher

467

* @param publisher The publisher to wrap

468

* @return Source that subscribes to the publisher

469

*/

470

def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed]

471

472

/**

473

* Expose a stream as a Reactive Streams Publisher by running it into this sink (declared on `object Sink`; `Source` has no `toPublisher` method)

474

* @param fanout Whether the publisher supports multiple subscribers (true) or only a single one (false)
* @return Sink materialized as a Publisher that can be subscribed to

475

*/

476

def asPublisher[T](fanout: Boolean): Sink[T, Publisher[T]]

477

```

478

479

**Usage Examples:**

480

481

```scala

482

import org.reactivestreams.Publisher
import akka.stream.scaladsl.{Sink, Source}

483

484

// From publisher

485

val publisherSource = Source.fromPublisher(somePublisher)

486

487

// To publisher

488

val publisher: Publisher[Int] =

489

Source(1 to 10)

490

.runWith(Sink.asPublisher(fanout = false))

492

```

493

494

## Types

495

496

```scala { .api }

497

// Queue offer results

498

sealed abstract class QueueOfferResult

499

case object Enqueued extends QueueOfferResult

500

case object Dropped extends QueueOfferResult

501

case object QueueClosed extends QueueOfferResult

502

case class Failure(cause: Throwable) extends QueueOfferResult

503

504

// Completion strategies for actor sources

505

sealed trait CompletionStrategy

506

object CompletionStrategy {
def immediately: CompletionStrategy // complete at once, even if elements are still buffered

507

def draining: CompletionStrategy // emit already-buffered elements before completing
}

508

509

// Source queue interfaces

510

trait BoundedSourceQueue[T] {

511

def offer(elem: T): QueueOfferResult // Immediate result

512

def complete(): Unit

513

def fail(ex: Throwable): Unit

514

def size(): Int

515

}

516

517

trait SourceQueue[T] {

518

def offer(elem: T): Future[QueueOfferResult] // Async result

519

def watchCompletion(): Future[Done]

520

}

521

522

trait SourceQueueWithComplete[T] extends SourceQueue[T] {

523

def complete(): Unit

524

def fail(ex: Throwable): Unit

525

}

526

```