# Stream Sources

Factory methods and utilities for creating stream sources from collections, futures, actors, and external systems. Sources are the starting points of stream processing pipelines.

## Capabilities

### Collection Sources

Create sources from various collection types and iterators.

```scala { .api }
object Source {
  /**
   * Create a source from an iterable collection
   * @param iterable The collection to stream
   * @return Source that emits all elements from the collection
   */
  def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed]

  /**
   * Create a source from an iterator factory function
   * @param f Function that creates a new iterator each time the source is materialized
   * @return Source that emits elements from the iterator
   */
  def fromIterator[T](f: () => Iterator[T]): Source[T, NotUsed]

  /**
   * Create a source with a single element
   * @param element The single element to emit
   * @return Source that emits the single element then completes
   */
  def single[T](element: T): Source[T, NotUsed]

  /**
   * Create an empty source that immediately completes
   * @return Source that emits no elements
   */
  def empty[T]: Source[T, NotUsed]

  /**
   * Create a source that infinitely repeats a single element
   * @param element The element to repeat
   * @return Source that continuously emits the same element
   */
  def repeat[T](element: T): Source[T, NotUsed]

  /**
   * Create a source that never emits any elements or completes
   * @return Source that stays active but never produces elements
   */
  def never[T]: Source[T, NotUsed]

  /**
   * Create a source that materializes a Promise which can be completed externally
   * @return Source materialized as a Promise; completing it with Some(element) emits
   *         that element and then completes, completing it with None completes
   *         without emitting anything
   */
  def maybe[T]: Source[T, Promise[Option[T]]]
}
```

**Usage Examples:**

```scala
import akka.stream.scaladsl.Source

// From collections
val listSource = Source(List(1, 2, 3, 4, 5))
val rangeSource = Source(1 to 100)

// From iterator
val randomSource = Source.fromIterator(() => Iterator.continually(scala.util.Random.nextInt(100)))

// Single element
val helloSource = Source.single("Hello, World!")

// Infinite repetition
val tickSource = Source.repeat("tick")
```
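
The sources above only describe a stream; nothing is emitted until they are run. The following is a minimal sketch of running a few of them, assuming Akka 2.6 with `akka-stream` on the classpath. The object name `CollectionSourcesDemo`, the system name, and the 3-second timeouts are illustrative assumptions, not part of the API.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; names and timeouts are illustrative only.
object CollectionSourcesDemo {
  // Runs three small streams and returns what each one emitted.
  def run(): (Seq[Int], Seq[String], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("collection-sources-demo")
    try {
      // A finite collection is emitted in order, then the stream completes.
      val fromList = Source(List(1, 2, 3)).runWith(Sink.seq)

      // repeat is infinite, so bound it with take before collecting.
      val repeated = Source.repeat("tick").take(3).runWith(Sink.seq)

      // maybe materializes a Promise; completing it with Some(x) emits x, then completes.
      val (promise, fromMaybe) = Source.maybe[Int].toMat(Sink.seq)(Keep.both).run()
      promise.success(Some(42))

      (
        Await.result(fromList, 3.seconds),
        Await.result(repeated, 3.seconds),
        Await.result(fromMaybe, 3.seconds)
      )
    } finally system.terminate()
  }
}
```

Infinite sources such as `Source.repeat` or an iterator built with `Iterator.continually` need a bounding operator like `take` before they are collected into a sequence.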

### Async and Future Sources

Create sources from asynchronous computations and future values.

```scala { .api }
/**
 * Create a source from a Future value
 * @param futureElement The future that will provide the element
 * @return Source that emits the future's value when it completes
 */
def future[T](futureElement: Future[T]): Source[T, NotUsed]

/**
 * Create a source from a Future that produces another Source
 * @param futureSource Future containing a Source
 * @return Source that emits elements from the future's source
 */
def futureSource[T, M](futureSource: Future[Source[T, M]]): Source[T, Future[M]]

/**
 * Create a source that calls an async function to produce elements
 * @param seed Initial state
 * @param f Function that returns a Future of the next state and element, or None to complete
 * @return Source that emits elements from the async function
 */
def unfoldAsync[S, T](seed: S)(f: S => Future[Option[(S, T)]]): Source[T, NotUsed]

/**
 * Create a source by unfolding a function synchronously
 * @param seed Initial state
 * @param f Function that produces the next state and element, or None to complete
 * @return Source that emits elements by repeatedly applying the function
 */
def unfold[S, T](seed: S)(f: S => Option[(S, T)]): Source[T, NotUsed]

/**
 * Create a source that defers computing its single element until there is downstream demand
 * @param create Function to create the single element
 * @return Source that calls the create function on first demand
 */
def lazySingle[T](create: () => T): Source[T, NotUsed]

/**
 * Create a source that defers creating its Future until there is downstream demand
 * @param create Function to create the Future element
 * @return Source that calls the create function on first demand
 */
def lazyFuture[T](create: () => Future[T]): Source[T, NotUsed]

/**
 * Create a source that defers creating the underlying Source until there is downstream demand
 * @param create Function to create another Source
 * @return Source that calls the create function on first demand
 */
def lazySource[T, M](create: () => Source[T, M]): Source[T, Future[M]]

/**
 * Create a source that defers creating a Future Source until there is downstream demand
 * @param create Function to create a Future Source
 * @return Source that calls the create function on first demand
 */
def lazyFutureSource[T, M](create: () => Future[Source[T, M]]): Source[T, Future[M]]
```

**Usage Examples:**

```scala
import akka.stream.scaladsl.Source
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// expensiveComputation, fetchDataFromRemoteAPI, isDataAvailable and getData
// are placeholders for application code.

// From Future
val futureValue: Future[String] = Future.successful("Hello")
val futureSource = Source.future(futureValue)

// Lazy computation (deferred until there is downstream demand)
val lazyComputation = Source.lazySingle(() => {
  println("Computing expensive value...")
  expensiveComputation()
})

val lazyFutureSource = Source.lazyFuture(() => {
  Future(fetchDataFromRemoteAPI())
})

// Lazy source creation
val lazyStreamSource = Source.lazySource(() => {
  if (isDataAvailable()) Source(getData()) else Source.empty
})

// Unfold pattern: state is (current, next), each step emits current
val fibonacciSource = Source.unfold((0, 1)) {
  case (a, b) => Some(((b, a + b), a))
}

// Async unfold: emits 0 to 9, then completes
val asyncCounterSource = Source.unfoldAsync(0) { n =>
  Future {
    if (n < 10) Some((n + 1, n)) else None
  }
}
```
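
To make the unfold and lazy behaviour concrete, here is a small sketch that runs the patterns above and collects their output, assuming Akka 2.6. The object name `UnfoldAndLazyDemo`, the call counter, and the timeouts are assumptions introduced for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical demo object; names and timeouts are illustrative only.
object UnfoldAndLazyDemo {
  def run(): (Seq[Int], Seq[Int], Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("unfold-lazy-demo")
    implicit val ec: ExecutionContext = system.dispatcher
    try {
      // State is (current, next); each step emits current. take(8) bounds the infinite stream.
      val fib = Source
        .unfold((0, 1)) { case (a, b) => Some(((b, a + b), a)) }
        .take(8)
        .runWith(Sink.seq)

      // Returning None from the function completes the stream.
      val counted = Source
        .unfoldAsync(0) { n => Future(if (n < 5) Some((n + 1, n)) else None) }
        .runWith(Sink.seq)

      // Building a lazy source does not invoke the create function.
      val calls = new AtomicInteger(0)
      val lazySource = Source.lazySingle(() => calls.incrementAndGet())
      val callsBeforeRun = calls.get()
      val lazyValue = Await.result(lazySource.runWith(Sink.head), 3.seconds)

      (Await.result(fib, 3.seconds), Await.result(counted, 3.seconds), callsBeforeRun, lazyValue)
    } finally system.terminate()
  }
}
```

The counter shows that the create function has not run by the time the source is constructed; it is only invoked once the stream is run and the sink signals demand.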

### Resource-based Sources

Create sources from external resources that need to be opened, read, and closed.

```scala { .api }
/**
 * Create a source from a resource that needs lifecycle management
 * @param create Function to create/open the resource
 * @param read Function to read from resource, returns None when exhausted
 * @param close Function to close/cleanup the resource
 * @return Source that manages resource lifecycle automatically
 */
def unfoldResource[T, S](
  create: () => S,
  read: S => Option[T],
  close: S => Unit
): Source[T, NotUsed]

/**
 * Create a source from an async resource that needs lifecycle management
 * @param create Function to create/open the resource asynchronously
 * @param read Function to read from resource asynchronously, returns None when exhausted
 * @param close Function to close/cleanup the resource asynchronously
 * @return Source that manages async resource lifecycle automatically
 */
def unfoldResourceAsync[T, S](
  create: () => Future[S],
  read: S => Future[Option[T]],
  close: S => Future[Done]
): Source[T, NotUsed]
```

**Usage Examples:**

```scala
import akka.Done
import akka.stream.scaladsl.Source
import java.io.{FileInputStream, BufferedReader, InputStreamReader}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// File reading with resource management
val fileSource = Source.unfoldResource(
  create = () => new BufferedReader(new InputStreamReader(new FileInputStream("data.txt"))),
  read = reader => Option(reader.readLine()), // readLine returns null at end of file, which becomes None
  close = reader => reader.close()
)

// Async database reading
// openDatabaseConnection and readNextRecord are placeholders for application code;
// readNextRecord is assumed to return null when no records remain.
val dbSource = Source.unfoldResourceAsync(
  create = () => Future(openDatabaseConnection()),
  read = conn => Future(conn.readNextRecord()).map(Option(_)),
  close = conn => Future { conn.close(); Done }
)
```
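
The open, read, close lifecycle described above can be observed with a self-contained sketch that reads a temporary file and records whether `close` was called. This assumes Akka 2.6; the object name `UnfoldResourceDemo`, the temp file contents, and the `closed` flag are assumptions made for the example.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import java.io.BufferedReader
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; names and file contents are illustrative only.
object UnfoldResourceDemo {
  def run(): (Seq[String], Boolean) = {
    implicit val system: ActorSystem = ActorSystem("unfold-resource-demo")
    // Write three lines to a temporary file so the example needs no external data.
    val path = Files.createTempFile("unfold-resource-demo", ".txt")
    Files.write(path, "alpha\nbeta\ngamma\n".getBytes(StandardCharsets.UTF_8))
    val closed = new AtomicBoolean(false)
    try {
      val lines = Source
        .unfoldResource[String, BufferedReader](
          () => Files.newBufferedReader(path),       // open
          reader => Option(reader.readLine()),       // read; null at end of file becomes None
          reader => { reader.close(); closed.set(true) } // close
        )
        .runWith(Sink.seq)
      (Await.result(lines, 3.seconds), closed.get())
    } finally {
      Files.deleteIfExists(path)
      system.terminate()
    }
  }
}
```

Because the `read` function here blocks on file I/O, it is worth keeping in mind that blocking calls occupy a thread of whichever dispatcher runs the stage.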

### Timed Sources

Create sources that emit elements based on time intervals.

```scala { .api }
/**
 * Create a source that emits a tick at regular intervals
 * @param initialDelay Delay before first element
 * @param interval Interval between subsequent elements
 * @param tick Element to emit at each interval
 * @return Source that emits elements at timed intervals
 */
def tick[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable]
```

**Usage Examples:**

```scala
import akka.stream.scaladsl.Source
import scala.concurrent.duration._

// Periodic ticks: first element after 1 second, then one every 500 ms
val tickerSource = Source.tick(1.second, 500.millis, "tick")

// Number range (not timed; emits as fast as downstream demands)
val numberSource = Source(1 to 100 by 2) // Odd numbers 1 to 99
```
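
`Source.tick` materializes a `Cancellable`, which gives two ways to stop an otherwise endless ticker: bound it with an operator such as `take`, or cancel it from outside. The sketch below shows both, assuming Akka 2.6; the object name `TickSourceDemo` and the chosen durations are illustrative assumptions.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; names and durations are illustrative only.
object TickSourceDemo {
  def run(): (Seq[String], Seq[String], Boolean) = {
    implicit val system: ActorSystem = ActorSystem("tick-source-demo")
    try {
      // Bound the ticker with take: the stream completes after three ticks.
      val bounded = Source.tick(0.millis, 50.millis, "tick").take(3).runWith(Sink.seq)

      // Keep the materialized Cancellable and stop the ticker before it ever fires.
      val (cancellable, cancelledRun) =
        Source.tick(1.hour, 1.hour, "never-emitted").toMat(Sink.seq)(Keep.both).run()
      cancellable.cancel()

      (
        Await.result(bounded, 3.seconds),
        Await.result(cancelledRun, 3.seconds),
        cancellable.isCancelled
      )
    } finally system.terminate()
  }
}
```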

### Actor Integration Sources

Create sources that integrate with Akka actors for dynamic element production.

```scala { .api }
/**
 * Create a source materialized as an ActorRef; messages sent to it are emitted as elements
 * @param completionMatcher Partial function to detect completion messages
 * @param failureMatcher Partial function to detect failure messages
 * @param bufferSize Buffer size for the actor
 * @param overflowStrategy Strategy when buffer overflows (OverflowStrategy.backpressure is not supported)
 * @return Source materialized as ActorRef for sending elements
 */
def actorRef[T](
  completionMatcher: PartialFunction[Any, CompletionStrategy],
  failureMatcher: PartialFunction[Any, Throwable],
  bufferSize: Int,
  overflowStrategy: OverflowStrategy
): Source[T, ActorRef]

/**
 * Create a source with backpressure-aware actor integration
 * @param ackMessage Message sent back to the sender to acknowledge an element, after which the next one may be sent
 * @param completionMatcher Partial function to detect completion messages
 * @param failureMatcher Partial function to detect failure messages
 * @return Source materialized as ActorRef with backpressure support
 */
def actorRefWithBackpressure[T](
  ackMessage: Any,
  completionMatcher: PartialFunction[Any, CompletionStrategy],
  failureMatcher: PartialFunction[Any, Throwable]
): Source[T, ActorRef]
```

**Usage Examples:**

```scala
import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.{CompletionStrategy, OverflowStrategy}
import akka.stream.scaladsl.Source

// preMaterialize needs a materializer, provided here by an implicit ActorSystem
implicit val system: ActorSystem = ActorSystem("example")

// Actor-backed source
val (actorRef: ActorRef, source: Source[String, NotUsed]) =
  Source.actorRef[String](
    completionMatcher = { case "complete" => CompletionStrategy.immediately },
    failureMatcher = { case akka.actor.Status.Failure(ex) => ex },
    bufferSize = 100,
    overflowStrategy = OverflowStrategy.dropHead
  ).preMaterialize()

// Send elements via actor
actorRef ! "Hello"
actorRef ! "World"
actorRef ! "complete" // Complete the stream
```
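
The choice of completion strategy decides what happens to elements still in the buffer when the completion message arrives. The following sketch uses `CompletionStrategy.draining` so that buffered elements are emitted before the stream completes, assuming Akka 2.6. The object name `ActorRefSourceDemo` and the use of `akka.Done` as the completion message are assumptions made for the example.

```scala
import akka.Done
import akka.actor.{ActorSystem, Status}
import akka.stream.{CompletionStrategy, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; names and the completion message are illustrative only.
object ActorRefSourceDemo {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("actor-ref-source-demo")
    try {
      val (ref, collected) = Source
        .actorRef[String](
          // Done completes the stream after already buffered elements are emitted.
          completionMatcher = { case Done => CompletionStrategy.draining },
          failureMatcher = { case Status.Failure(ex) => ex },
          bufferSize = 16,
          overflowStrategy = OverflowStrategy.dropHead
        )
        .toMat(Sink.seq)(Keep.both)
        .run()

      // Any message not matched above is treated as a stream element.
      ref ! "Hello"
      ref ! "World"
      ref ! Done

      Await.result(collected, 3.seconds)
    } finally system.terminate()
  }
}
```

Using `toMat(...)(Keep.both)` instead of `preMaterialize()` returns the `ActorRef` and the sink's result in one step, which is convenient when the whole graph is known up front.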

### Queue Sources

Create sources with dynamic element offering capabilities.

```scala { .api }
/**
 * Create a source backed by a bounded queue for immediate feedback
 * @param bufferSize Maximum number of elements to buffer
 * @return Source materialized as BoundedSourceQueue for offering elements
 */
def queue[T](bufferSize: Int): Source[T, BoundedSourceQueue[T]]

/**
 * Create a source backed by a queue with overflow strategy
 * @param bufferSize Maximum number of elements to buffer
 * @param overflowStrategy Strategy when buffer is full
 * @return Source materialized as SourceQueueWithComplete for offering elements
 */
def queue[T](
  bufferSize: Int,
  overflowStrategy: OverflowStrategy
): Source[T, SourceQueueWithComplete[T]]

/**
 * Create a source backed by a queue with overflow strategy and concurrent offers
 * @param bufferSize Maximum number of elements to buffer
 * @param overflowStrategy Strategy when buffer is full
 * @param maxConcurrentOffers Maximum number of concurrent offers
 * @return Source materialized as SourceQueueWithComplete for offering elements
 */
def queue[T](
  bufferSize: Int,
  overflowStrategy: OverflowStrategy,
  maxConcurrentOffers: Int
): Source[T, SourceQueueWithComplete[T]]

/**
 * Bounded queue interface for immediate offer feedback
 */
trait BoundedSourceQueue[T] {
  /**
   * Offer an element to the queue with immediate result
   * @param elem Element to offer
   * @return Immediate result of the offer
   */
  def offer(elem: T): QueueOfferResult

  /**
   * Complete the source
   */
  def complete(): Unit

  /**
   * Fail the source with an exception
   * @param ex Exception to fail with
   */
  def fail(ex: Throwable): Unit

  /**
   * Returns approximate number of elements in queue
   */
  def size(): Int
}

/**
 * Queue interface with completion support and async offers
 */
trait SourceQueueWithComplete[T] extends SourceQueue[T] {
  /**
   * Complete the source
   */
  def complete(): Unit

  /**
   * Fail the source with an exception
   * @param ex Exception to fail with
   */
  def fail(ex: Throwable): Unit

  /**
   * Watch for completion of the stream
   */
  def watchCompletion(): Future[Done]
}

trait SourceQueue[T] {
  /**
   * Offer an element to the queue asynchronously
   * @param elem Element to offer
   * @return Future with the result of the offer
   */
  def offer(elem: T): Future[QueueOfferResult]

  /**
   * Watch for completion of the stream
   */
  def watchCompletion(): Future[Done]
}

sealed abstract class QueueOfferResult
object QueueOfferResult {
  case object Enqueued extends QueueOfferResult
  case object Dropped extends QueueOfferResult
  case object QueueClosed extends QueueOfferResult
  case class Failure(cause: Throwable) extends QueueOfferResult
}
```

**Usage Examples:**

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Source, Sink}
import akka.stream.{BoundedSourceQueue, QueueOfferResult, OverflowStrategy}
import akka.stream.scaladsl.SourceQueueWithComplete

// preMaterialize and runWith need a materializer, provided here by an implicit ActorSystem;
// mapping over the offer Future needs an ExecutionContext
implicit val system: ActorSystem = ActorSystem("example")
import system.dispatcher

// Bounded queue source with immediate feedback
val (boundedQueue: BoundedSourceQueue[Int], boundedSource: Source[Int, NotUsed]) =
  Source.queue[Int](100)
    .preMaterialize()

// Offer elements with immediate result
boundedQueue.offer(1) match {
  case QueueOfferResult.Enqueued => println("Element enqueued")
  case QueueOfferResult.Dropped => println("Element dropped")
  case QueueOfferResult.QueueClosed => println("Queue closed")
  case QueueOfferResult.Failure(ex) => println(s"Failed: $ex")
}

// Queue source with overflow strategy and async offers
val (asyncQueue: SourceQueueWithComplete[Int], asyncSource: Source[Int, NotUsed]) =
  Source.queue[Int](100, OverflowStrategy.backpressure)
    .preMaterialize()

// Offer elements asynchronously
asyncQueue.offer(1).map {
  case QueueOfferResult.Enqueued => println("Element enqueued")
  case QueueOfferResult.Dropped => println("Element dropped")
  case QueueOfferResult.QueueClosed => println("Queue closed")
  case QueueOfferResult.Failure(ex) => println(s"Failed: $ex")
}

// Process elements
boundedSource.runWith(Sink.foreach(println))
asyncSource.runWith(Sink.foreach(println))
```
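
As a compact end-to-end illustration of the bounded queue, the sketch below offers three elements and collects them, assuming Akka 2.6.11 or later (where `BoundedSourceQueue` is available). The object name `BoundedQueueDemo`, the buffer size of 16, and the use of `take(3)` to end the stream are assumptions chosen to keep the example deterministic.

```scala
import akka.actor.ActorSystem
import akka.stream.QueueOfferResult
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; names and sizes are illustrative only.
object BoundedQueueDemo {
  def run(): (Seq[QueueOfferResult], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("bounded-queue-demo")
    try {
      // take(3) completes the stream once three elements have passed through.
      val (queue, collected) =
        Source.queue[Int](16).take(3).toMat(Sink.seq)(Keep.both).run()

      // The buffer holds 16 elements, so three synchronous offers all fit.
      val offers = (1 to 3).map(n => queue.offer(n))

      (offers, Await.result(collected, 3.seconds))
    } finally system.terminate()
  }
}
```

Because `offer` on a `BoundedSourceQueue` returns its result synchronously, the caller can react to `Dropped` immediately, for example by retrying later or shedding load.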

### Reactive Streams Integration

Create sources from Reactive Streams Publisher implementations, and expose sources as Publishers.

```scala { .api }
/**
 * Create a source from a Reactive Streams Publisher
 * @param publisher The publisher to wrap
 * @return Source that subscribes to the publisher
 */
def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed]

/**
 * Defined on object Sink: a sink that materializes into a Reactive Streams Publisher.
 * Running a source with this sink exposes the source as a Publisher.
 * @param fanout If true the Publisher supports multiple subscribers, otherwise only one
 * @return Sink materialized as a Publisher that can be subscribed to
 */
def asPublisher[T](fanout: Boolean): Sink[T, Publisher[T]]
```

**Usage Examples:**

```scala
import akka.stream.scaladsl.{Sink, Source}
import org.reactivestreams.Publisher

// From publisher (somePublisher is a placeholder for an existing Publisher)
val publisherSource = Source.fromPublisher(somePublisher)

// To publisher: run the source with Sink.asPublisher
// (requires an implicit ActorSystem or Materializer in scope)
val publisher: Publisher[Int] =
  Source(1 to 10)
    .runWith(Sink.asPublisher[Int](fanout = false))
```
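
A round trip through the Reactive Streams interfaces can be sketched as follows: a source is exposed as a `Publisher` and then consumed again through `Source.fromPublisher`. This assumes Akka 2.6; the object name `PublisherRoundTripDemo` and the timeout are illustrative assumptions.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import org.reactivestreams.Publisher
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; names and timeouts are illustrative only.
object PublisherRoundTripDemo {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("publisher-round-trip-demo")
    try {
      // Expose a source as a Publisher; fanout = false allows a single subscriber.
      val publisher: Publisher[Int] =
        Source(1 to 3).runWith(Sink.asPublisher[Int](fanout = false))

      // Wrap the Publisher back into a Source and collect its elements.
      val collected = Source.fromPublisher(publisher).runWith(Sink.seq)

      Await.result(collected, 3.seconds)
    } finally system.terminate()
  }
}
```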

## Types

```scala { .api }
// Queue offer results
sealed abstract class QueueOfferResult
object QueueOfferResult {
  case object Enqueued extends QueueOfferResult
  case object Dropped extends QueueOfferResult
  case object QueueClosed extends QueueOfferResult
  case class Failure(cause: Throwable) extends QueueOfferResult
}

// Completion strategies for actor sources
sealed trait CompletionStrategy
object CompletionStrategy {
  def immediately: CompletionStrategy // Complete at once, discarding buffered elements
  def draining: CompletionStrategy    // Emit buffered elements first, then complete
}

// Source queue interfaces
trait BoundedSourceQueue[T] {
  def offer(elem: T): QueueOfferResult // Immediate result
  def complete(): Unit
  def fail(ex: Throwable): Unit
  def size(): Int
}

trait SourceQueue[T] {
  def offer(elem: T): Future[QueueOfferResult] // Async result
  def watchCompletion(): Future[Done]
}

trait SourceQueueWithComplete[T] extends SourceQueue[T] {
  def complete(): Unit
  def fail(ex: Throwable): Unit
}
```