or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

concurrency-fibers.md
context-services.md
data-structures.md
effect-core.md
error-observability.md
function-utilities.md
index.md
layer-system.md
schema-validation.md
streaming.md
tile.json

docs/streaming.md

Streaming Data Processing

A reactive streaming system built around `Stream`, used to process potentially infinite sequences of data with back-pressure support, resource safety, and compositional transformations.

Capabilities

Stream Creation and Operations

/**
 * A stream description, covariant in all three type parameters.
 *
 * - `A`: the type of the elements the stream emits.
 * - `E`: the type of error the stream can fail with (defaults to `never`).
 * - `R`: defaults to `never`; presumably the environment/requirements the
 *   stream needs in order to run (confirm against the Effect core docs).
 *
 * Extends `Pipeable`, which is declared elsewhere.
 */
interface Stream<
  out A,
  out E = never,
  out R = never
> extends Pipeable {}

/**
 * Constructors, transformations and runners for `Stream`.
 *
 * Transformations are curried: they take their configuration first and
 * return a function from the source stream to the resulting stream.
 */
declare namespace Stream {
  /**
   * Creates a stream that emits the given values.
   *
   * @param as - The values to emit.
   */
  function make<A>(...as: A[]): Stream<A>;

  /**
   * Creates a stream from an iterable.
   *
   * @param iterable - The source of the stream's elements.
   */
  function fromIterable<A>(iterable: Iterable<A>): Stream<A>;

  /**
   * Creates a stream from an async iterable.
   *
   * @param iterable - The asynchronous source of the stream's elements.
   * @param onError - Converts an unknown failure value into the stream's
   *   typed error `E`.
   */
  function fromAsyncIterable<A, E>(iterable: AsyncIterable<A>, onError: (e: unknown) => E): Stream<A, E>;

  /**
   * Maps stream elements with a pure function.
   *
   * @param f - Transformation applied to each element.
   */
  function map<A, B>(f: (a: A) => B): <E, R>(self: Stream<A, E, R>) => Stream<B, E, R>;

  /**
   * Maps stream elements with an effectful function.
   *
   * The error and requirement types of the effects returned by `f` are
   * unioned into the resulting stream's `E` and `R`.
   *
   * @param f - Effectful transformation applied to each element.
   */
  function mapEffect<A, A2, E2, R2>(f: (a: A) => Effect<A2, E2, R2>): <E, R>(self: Stream<A, E, R>) => Stream<A2, E | E2, R | R2>;

  /**
   * Filters stream elements, keeping those accepted by the predicate.
   *
   * @param predicate - Test applied to each element.
   */
  function filter<A>(predicate: (a: A) => boolean): <E, R>(self: Stream<A, E, R>) => Stream<A, E, R>;

  /**
   * Takes the first `n` elements.
   *
   * @param n - Number of elements to keep.
   */
  function take(n: number): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>;

  /**
   * Drops the first `n` elements.
   *
   * @param n - Number of elements to skip.
   */
  function drop(n: number): <A, E, R>(self: Stream<A, E, R>) => Stream<A, E, R>;

  /**
   * Merges this stream with another one. The element, error and
   * requirement types of the result are the unions of both inputs.
   *
   * @param that - The stream to merge with.
   */
  function merge<A2, E2, R2>(that: Stream<A2, E2, R2>): <A, E, R>(self: Stream<A, E, R>) => Stream<A | A2, E | E2, R | R2>;

  /**
   * Runs the stream and collects its elements into a `Chunk`.
   *
   * @param self - The stream to run.
   */
  function runCollect<A, E, R>(self: Stream<A, E, R>): Effect<Chunk<A>, E, R>;

  /**
   * Runs the stream for its side effects only, discarding the elements.
   *
   * @param self - The stream to run.
   */
  function runDrain<A, E, R>(self: Stream<A, E, R>): Effect<void, E, R>;
}

Usage Examples:

import { Stream, Effect, pipe } from "effect";

// Basic streaming: double every number, keep the values above 5,
// then stop after the first two survivors.
const numbers = Stream.make(1, 2, 3, 4, 5);

const processed = pipe(
  numbers,
  Stream.map(n => n * 2),
  Stream.filter(n => n > 5),
  Stream.take(2)
);

const result = await Effect.runPromise(Stream.runCollect(processed));
// Chunk([6, 8])

// Async streaming: fetch each URL and decode the response body.
const fetchData = (url: string) => Effect.promise(() => fetch(url));

const dataStream = pipe(
  Stream.make("/api/user/1", "/api/user/2", "/api/user/3"),
  Stream.mapEffect(url => fetchData(url)),
  // `response.json()` returns a Promise, so it has to be lifted into an
  // Effect and sequenced with `mapEffect`; a plain `Stream.map` would emit
  // unresolved Promises instead of the parsed bodies.
  Stream.mapEffect(response => Effect.promise(() => response.json()))
);

Types

/**
 * Collection type produced by `Stream.runCollect`.
 *
 * It is iterable, so its `A` values can be consumed with `for...of` or
 * spread syntax.
 */
interface Chunk<A> extends Iterable<A> {
  /** Read-only size of the chunk (presumably the number of elements it holds). */
  readonly length: number;
}