Factory methods and utilities for creating stream sources from various data sources including collections, futures, actors, and external systems. Sources are the starting points of stream processing pipelines.
Create sources from various collection types and iterators.
object Source {
/**
* Create a source from an iterable collection
* @param iterable The collection to stream
* @return Source that emits all elements from the collection
*/
def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed]
/**
* Create a source from an iterator factory function
* @param f Function that creates a new iterator each time the source is materialized
* @return Source that emits elements from the iterator
*/
def fromIterator[T](f: () => Iterator[T]): Source[T, NotUsed]
/**
* Create a source with a single element
* @param element The single element to emit
* @return Source that emits the single element then completes
*/
def single[T](element: T): Source[T, NotUsed]
/**
* Create an empty source that immediately completes
* @return Source that emits no elements
*/
def empty[T]: Source[T, NotUsed]
/**
* Create a source that infinitely repeats a single element
* @param element The element to repeat
* @return Source that continuously emits the same element
*/
def repeat[T](element: T): Source[T, NotUsed]
/**
* Create a source that never emits any elements or completes
* @return Source that stays active but never produces elements
*/
def never[T]: Source[T, NotUsed]
/**
* Create a source from a Promise that can be completed externally
* @return Source materialized as a Promise for external completion
*/
def maybe[T]: Source[T, Promise[Option[T]]]
}Usage Examples:
import akka.stream.scaladsl.Source
// From collections
val listSource = Source(List(1, 2, 3, 4, 5))
val rangeSource = Source(1 to 100)
// From iterator
val randomSource = Source.fromIterator(() => Iterator.continually(scala.util.Random.nextInt(100)))
// Single element
val helloSource = Source.single("Hello, World!")
// Infinite repetition
val tickSource = Source.repeat("tick")Create sources from asynchronous computations and future values.
/**
* Create a source from a Future value
* @param futureElement The future that will provide the element
* @return Source that emits the future's value when it completes
*/
def future[T](futureElement: Future[T]): Source[T, NotUsed]
/**
* Create a source from a Future that produces another Source
* @param futureSource Future containing a Source
* @return Source that emits elements from the future's source
*/
def futureSource[T, M](futureSource: Future[Source[T, M]]): Source[T, Future[M]]
/**
* Create a source that calls an async function to produce elements
* @param f Function that returns a Future for each requested element
* @return Source that emits elements from the async function
*/
def unfoldAsync[T, S](seed: S)(f: S => Future[Option[(S, T)]]): Source[T, NotUsed]
/**
* Create a source by unfolding a function synchronously
* @param seed Initial state
* @param f Function that produces next state and element
* @return Source that emits elements by repeatedly applying the function
*/
def unfold[T, S](seed: S)(f: S => Option[(S, T)]): Source[T, NotUsed]
/**
* Create a source that defers computation until materialization
* @param create Function to create single element
* @return Source that calls create function when materialized
*/
def lazySingle[T](create: () => T): Source[T, NotUsed]
/**
* Create a source that defers Future computation until materialization
* @param create Function to create Future element
* @return Source that calls create function when materialized
*/
def lazyFuture[T](create: () => Future[T]): Source[T, NotUsed]
/**
* Create a source that defers Source creation until materialization
* @param create Function to create another Source
* @return Source that calls create function when materialized
*/
def lazySource[T, M](create: () => Source[T, M]): Source[T, Future[M]]
/**
* Create a source that defers Future Source creation until materialization
* @param create Function to create Future Source
* @return Source that calls create function when materialized
*/
def lazyFutureSource[T, M](create: () => Future[Source[T, M]]): Source[T, Future[M]]Usage Examples:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
// From Future
val futureValue: Future[String] = Future.successful("Hello")
val futureSource = Source.future(futureValue)
// Lazy computation (deferred until materialization)
val lazyComputation = Source.lazySingle(() => {
println("Computing expensive value...")
expensiveComputation()
})
val lazyFutureSource = Source.lazyFuture(() => {
Future(fetchDataFromRemoteAPI())
})
// Lazy source creation
val lazyStreamSource = Source.lazySource(() => {
if (isDataAvailable()) Source(getData()) else Source.empty
})
// Unfold pattern
val fibonacciSource = Source.unfold((0, 1)) {
case (a, b) => Some(((b, a + b), a))
}
// Async unfold
val asyncCounterSource = Source.unfoldAsync(0) { n =>
Future {
if (n < 10) Some((n + 1, n)) else None
}
}Create sources from external resources that need to be opened, read, and closed.
/**
* Create a source from a resource that needs lifecycle management
* @param create Function to create/open the resource
* @param read Function to read from resource, returns None when exhausted
* @param close Function to close/cleanup the resource
* @return Source that manages resource lifecycle automatically
*/
def unfoldResource[T, S](
create: () => S,
read: S => Option[T],
close: S => Unit
): Source[T, NotUsed]
/**
* Create a source from an async resource that needs lifecycle management
* @param create Function to create/open the resource asynchronously
* @param read Function to read from resource asynchronously, returns None when exhausted
* @param close Function to close/cleanup the resource asynchronously
* @return Source that manages async resource lifecycle automatically
*/
def unfoldResourceAsync[T, S](
create: () => Future[S],
read: S => Future[Option[T]],
close: S => Future[Done]
): Source[T, NotUsed]Usage Examples:
import java.io.{FileInputStream, BufferedReader, InputStreamReader}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
// File reading with resource management
val fileSource = Source.unfoldResource(
create = () => new BufferedReader(new InputStreamReader(new FileInputStream("data.txt"))),
read = reader => Option(reader.readLine()),
close = reader => reader.close()
)
// Async database reading
val dbSource = Source.unfoldResourceAsync(
create = () => Future(openDatabaseConnection()),
read = conn => Future(conn.readNextRecord()).map(Option(_)),
close = conn => Future { conn.close(); Done }
)Create sources that emit elements based on time intervals.
/**
* Create a source that emits a tick at regular intervals
* @param initialDelay Delay before first element
* @param interval Interval between subsequent elements
* @param tick Element to emit at each interval
* @return Source that emits elements at timed intervals
*/
def tick[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable]Usage Examples:
import scala.concurrent.duration._
// Periodic ticks
val tickerSource = Source.tick(1.second, 500.millis, "tick")
// Number range
val numberSource = Source(1 to 100 by 2) // Odd numbers 1 to 99Create sources that integrate with Akka actors for dynamic element production.
/**
* Create a source backed by an actor
* @param completionMatcher Partial function to detect completion messages
* @param failureMatcher Partial function to detect failure messages
* @param bufferSize Buffer size for the actor
* @param overflowStrategy Strategy when buffer overflows
* @return Source materialized as ActorRef for sending elements
*/
def actorRef[T](
completionMatcher: PartialFunction[Any, CompletionStrategy],
failureMatcher: PartialFunction[Any, Throwable],
bufferSize: Int,
overflowStrategy: OverflowStrategy
): Source[T, ActorRef]
/**
* Create a source with backpressure-aware actor integration
* @param ackMessage Message sent to confirm element processing
* @param completionMatcher Partial function to detect completion messages
* @param failureMatcher Partial function to detect failure messages
* @return Source materialized as ActorRef with backpressure support
*/
def actorRefWithBackpressure[T](
ackMessage: Any,
completionMatcher: PartialFunction[Any, CompletionStrategy],
failureMatcher: PartialFunction[Any, Throwable]
): Source[T, ActorRef]Usage Examples:
import akka.actor.ActorRef
import akka.stream.OverflowStrategy
// Actor-backed source
val (actorRef: ActorRef, source: Source[String, NotUsed]) =
Source.actorRef[String](
completionMatcher = { case "complete" => CompletionStrategy.immediately },
failureMatcher = { case akka.actor.Status.Failure(ex) => ex },
bufferSize = 100,
overflowStrategy = OverflowStrategy.dropHead
).preMaterialize()
// Send elements via actor
actorRef ! "Hello"
actorRef ! "World"
actorRef ! "complete" // Complete the streamCreate sources with dynamic element offering capabilities.
/**
* Create a source backed by a bounded queue for immediate feedback
* @param bufferSize Maximum number of elements to buffer
* @return Source materialized as BoundedSourceQueue for offering elements
*/
def queue[T](bufferSize: Int): Source[T, BoundedSourceQueue[T]]
/**
* Create a source backed by a queue with overflow strategy
* @param bufferSize Maximum number of elements to buffer
* @param overflowStrategy Strategy when buffer is full
* @return Source materialized as SourceQueueWithComplete for offering elements
*/
def queue[T](
bufferSize: Int,
overflowStrategy: OverflowStrategy
): Source[T, SourceQueueWithComplete[T]]
/**
* Create a source backed by a queue with overflow strategy and concurrent offers
* @param bufferSize Maximum number of elements to buffer
* @param overflowStrategy Strategy when buffer is full
* @param maxConcurrentOffers Maximum number of concurrent offers
* @return Source materialized as SourceQueueWithComplete for offering elements
*/
def queue[T](
bufferSize: Int,
overflowStrategy: OverflowStrategy,
maxConcurrentOffers: Int
): Source[T, SourceQueueWithComplete[T]]
/**
* Bounded queue interface for immediate offer feedback
*/
trait BoundedSourceQueue[T] {
/**
* Offer an element to the queue with immediate result
* @param elem Element to offer
* @return Immediate result of the offer
*/
def offer(elem: T): QueueOfferResult
/**
* Complete the source
*/
def complete(): Unit
/**
* Fail the source with an exception
* @param ex Exception to fail with
*/
def fail(ex: Throwable): Unit
/**
* Returns approximate number of elements in queue
*/
def size(): Int
}
/**
* Queue interface with completion support and async offers
*/
trait SourceQueueWithComplete[T] extends SourceQueue[T] {
/**
* Complete the source
*/
def complete(): Unit
/**
* Fail the source with an exception
* @param ex Exception to fail with
*/
def fail(ex: Throwable): Unit
/**
* Watch for completion of the stream
*/
def watchCompletion(): Future[Done]
}
trait SourceQueue[T] {
/**
* Offer an element to the queue asynchronously
* @param elem Element to offer
* @return Future with the result of the offer
*/
def offer(elem: T): Future[QueueOfferResult]
/**
* Watch for completion of the stream
*/
def watchCompletion(): Future[Done]
}
sealed abstract class QueueOfferResult
case object Enqueued extends QueueOfferResult
case object Dropped extends QueueOfferResult
case object QueueClosed extends QueueOfferResult
case class Failure(cause: Throwable) extends QueueOfferResultUsage Examples:
import akka.stream.scaladsl.{Source, Sink}
import akka.stream.{BoundedSourceQueue, SourceQueueWithComplete, QueueOfferResult, OverflowStrategy}
// Bounded queue source with immediate feedback
val (boundedQueue: BoundedSourceQueue[Int], boundedSource: Source[Int, NotUsed]) =
Source.queue[Int](100)
.preMaterialize()
// Offer elements with immediate result
boundedQueue.offer(1) match {
case QueueOfferResult.Enqueued => println("Element enqueued")
case QueueOfferResult.Dropped => println("Element dropped")
case QueueOfferResult.QueueClosed => println("Queue closed")
case QueueOfferResult.Failure(ex) => println(s"Failed: $ex")
}
// Queue source with overflow strategy and async offers
val (asyncQueue: SourceQueueWithComplete[Int], asyncSource: Source[Int, NotUsed]) =
Source.queue[Int](100, OverflowStrategy.backpressure)
.preMaterialize()
// Offer elements asynchronously
asyncQueue.offer(1).map {
case QueueOfferResult.Enqueued => println("Element enqueued")
case QueueOfferResult.Dropped => println("Element dropped")
case QueueOfferResult.QueueClosed => println("Queue closed")
case QueueOfferResult.Failure(ex) => println(s"Failed: $ex")
}
// Process elements
boundedSource.runWith(Sink.foreach(println))
asyncSource.runWith(Sink.foreach(println))Create sources from Reactive Streams Publisher implementations.
/**
* Create a source from a Reactive Streams Publisher
* @param publisher The publisher to wrap
* @return Source that subscribes to the publisher
*/
def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed]
/**
* Convert this source to a Reactive Streams Publisher
* @return Publisher that can be subscribed to
*/
def toPublisher(fanout: Boolean = false): Source[T, Publisher[T]]Usage Examples:
import org.reactivestreams.Publisher
// From publisher
val publisherSource = Source.fromPublisher(somePublisher)
// To publisher
val (publisher: Publisher[Int], source: Source[Int, NotUsed]) =
Source(1 to 10)
.toPublisher(fanout = false)
.preMaterialize()// Queue offer results
sealed abstract class QueueOfferResult
case object Enqueued extends QueueOfferResult
case object Dropped extends QueueOfferResult
case object QueueClosed extends QueueOfferResult
case class Failure(cause: Throwable) extends QueueOfferResult
// Completion strategies for actor sources
sealed abstract class CompletionStrategy
case object ImmediateCompletionStrategy extends CompletionStrategy
case object DrainAndCompletionStrategy extends CompletionStrategy
// Source queue interfaces
trait BoundedSourceQueue[T] {
def offer(elem: T): QueueOfferResult // Immediate result
def complete(): Unit
def fail(ex: Throwable): Unit
def size(): Int
}
trait SourceQueue[T] {
def offer(elem: T): Future[QueueOfferResult] // Async result
def watchCompletion(): Future[Done]
}
trait SourceQueueWithComplete[T] extends SourceQueue[T] {
def complete(): Unit
def fail(ex: Throwable): Unit
}