Reactive streaming system with Stream for processing infinite sequences of data with back-pressure support, resource safety, and compositional transformations.
interface Stream<out A, out E = never, out R = never> extends Pipeable {}
declare namespace Stream {
/**
* Creates a stream from values
*/
function make<A>(...as: A[]): Stream<A>;
/**
* Creates a stream from an iterable
*/
function fromIterable<A>(iterable: Iterable<A>): Stream<A>;
/**
* Creates a stream from async iterable
*/
function fromAsyncIterable<A, E>(iterable: AsyncIterable<A>, onError: (e: unknown) => E): Stream<A, E>;
/**
* Maps stream elements
*/
function map<A, B>(f: (a: A) => B): <E, R>(self: Stream<A, E, R>) => Stream<B, E, R>;
/**
* Filters stream elements
*/
function filter<A>(predicate: (a: A) => boolean): <E, R>(self: Stream<A, E, R>) => Stream<A, E, R>;
/**
* Takes first N elements
*/
function take(n: number): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>;
/**
* Drops first N elements
*/
function drop(n: number): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>;
/**
* Merges streams
*/
function merge<A2, E2, R2>(that: Stream<A2, E2, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A | A2, E | E2, R | R2>;
/**
* Runs stream and collects results
*/
function runCollect<A, E, R>(self: Stream<A, E, R>): Effect<Chunk<A>, E, R>;
/**
* Runs stream for side effects only
*/
function runDrain<A, E, R>(self: Stream<A, E, R>): Effect<void, E, R>;
}Usage Examples:
import { Stream, Effect, pipe } from "effect";
// Basic streaming
const numbers = Stream.make(1, 2, 3, 4, 5);
const processed = pipe(
numbers,
Stream.map(n => n * 2),
Stream.filter(n => n > 5),
Stream.take(2)
);
const result = await Effect.runPromise(Stream.runCollect(processed));
// Chunk([6, 8])
// Async streaming
const fetchData = (url: string) => Effect.promise(() => fetch(url));
const dataStream = pipe(
Stream.make("/api/user/1", "/api/user/2", "/api/user/3"),
Stream.mapEffect(url => fetchData(url)),
Stream.map(response => response.json())
);interface Chunk<A> extends Iterable<A> {
readonly length: number;
}