Streaming

Apache Arrow JavaScript provides streaming capabilities for processing datasets that don't fit in memory, supporting both the DOM ReadableStream API (browsers) and Node.js streams. This enables efficient data processing pipelines and real-time data integration.

Capabilities

Stream Adapter Functions

Functions for converting between Arrow data and platform-specific stream formats.

/**
 * Convert iterable to DOM ReadableStream (browser/DOM environments)
 */
function toDOMStream<T>(
  source: Iterable<T> | AsyncIterable<T>,
  options?: ReadableDOMStreamOptions
): ReadableStream<T>;

/**
 * Convert iterable to Node.js Readable stream (Node.js environments)
 */
function toNodeStream<T>(
  source: Iterable<T> | AsyncIterable<T>,
  options?: NodeStreamOptions
): Readable;

/**
 * Convert DOM ReadableStream to async iterable
 */
function fromDOMStream<T>(
  stream: ReadableStream<T>
): AsyncIterable<T>;

/**
 * Convert Node.js Readable stream to async iterable
 */
function fromNodeStream<T>(
  stream: Readable
): AsyncIterable<T>;

// Options interfaces
interface ReadableDOMStreamOptions {
  highWaterMark?: number;
  size?: (chunk: any) => number;
  type?: 'bytes' | undefined;
}

interface NodeStreamOptions {
  highWaterMark?: number;
  objectMode?: boolean;
  encoding?: string;
}

Usage Examples:

import { 
  toDOMStream, 
  toNodeStream, 
  fromDOMStream, 
  fromNodeStream,
  tableFromArrays 
} from "apache-arrow";

// Create sample data
async function* generateData() {
  for (let i = 0; i < 1000; i++) {
    yield { id: i, value: Math.random(), timestamp: Date.now() };
  }
}

// DOM environment (browser)
const domStream = toDOMStream(generateData(), {
  highWaterMark: 100
});

// Consume DOM stream
const reader = domStream.getReader();
try {
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    console.log('Received chunk:', value);
  }
} finally {
  reader.releaseLock();
}

// Node.js environment
const nodeStream = toNodeStream(generateData(), {
  objectMode: true,
  highWaterMark: 100
});

nodeStream.on('data', (chunk) => {
  console.log('Received chunk:', chunk);
});

nodeStream.on('end', () => {
  console.log('Stream ended');
});
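
The reverse adapters turn platform streams back into async iterables, which can then be consumed with for-await. A minimal sketch, assuming the fromDOMStream and fromNodeStream adapters declared above and reusing the generateData generator from the previous example:

// Round-trip: iterable -> DOM ReadableStream -> async iterable
const domIterable = fromDOMStream(toDOMStream(generateData()));
for await (const row of domIterable) {
  console.log('Row from DOM stream:', row);
}

// Round-trip: iterable -> Node.js Readable -> async iterable
const nodeIterable = fromNodeStream(
  toNodeStream(generateData(), { objectMode: true })
);
for await (const row of nodeIterable) {
  console.log('Row from Node stream:', row);
}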

RecordBatch Stream Processing

Functions for streaming Arrow record batches through various stream APIs.

/**
 * Transform a DOM stream of Arrow IPC bytes into record batches (DOM environments)
 */
function recordBatchReaderThroughDOMStream<T extends TypeMap>(
  options?: ReadableDOMStreamOptions
): TransformStream<Uint8Array, RecordBatch<T>>;

/**
 * Transform a Node.js stream of Arrow IPC bytes into record batches (Node.js environments)
 */
function recordBatchReaderThroughNodeStream<T extends TypeMap>(
  options?: NodeStreamOptions
): Transform;

/**
 * Write record batches through DOM stream
 */
function recordBatchWriterThroughDOMStream<T extends TypeMap>(
  sink: WritableStreamDefaultWriter,
  schema: Schema<T>,
  options?: WritableDOMStreamOptions
): WritableStream<RecordBatch<T>>;

/**
 * Write record batches through Node.js stream
 */
function recordBatchWriterThroughNodeStream<T extends TypeMap>(
  sink: Writable,
  schema: Schema<T>,
  options?: NodeWritableOptions
): Writable;

// Additional options interfaces
interface WritableDOMStreamOptions {
  highWaterMark?: number;
  size?: (chunk: any) => number;
}

interface NodeWritableOptions {
  highWaterMark?: number;
  objectMode?: boolean;
  decodeStrings?: boolean;
}

Usage Examples:

import { 
  RecordBatch,
  RecordBatchReader,
  recordBatchReaderThroughDOMStream,
  recordBatchWriterThroughNodeStream 
} from "apache-arrow";

// DOM stream processing
async function processBatchesDOM(source: ReadableStream<Uint8Array>) {
  const transformStream = recordBatchReaderThroughDOMStream();
  
  const reader = source
    .pipeThrough(transformStream)
    .getReader();
    
  try {
    while (true) {
      const { done, value: batch } = await reader.read();
      if (done) break;
      
      // Process each batch
      console.log(`Processing batch with ${batch.length} rows`);
      await processBatch(batch);
    }
  } finally {
    reader.releaseLock();
  }
}

// Node.js stream processing  
import { pipeline, Transform } from 'stream';
import { createReadStream, createWriteStream } from 'fs';

async function processBatchesNode() {
  const inputStream = createReadStream('input.arrow');
  const outputStream = createWriteStream('output.arrow');
  
  // RecordBatchReader.from returns a promise for async sources;
  // open the reader so its schema is available before creating the writer
  const reader = await RecordBatchReader.from(inputStream);
  await reader.open();
  const schema = reader.schema;
  
  const writer = recordBatchWriterThroughNodeStream(
    outputStream, 
    schema, 
    { objectMode: true }
  );
  
  // Transform each batch
  const transform = new Transform({
    objectMode: true,
    transform(batch: RecordBatch, encoding, callback) {
      // Apply transformations to batch
      const transformedBatch = transformBatch(batch);
      callback(null, transformedBatch);
    }
  });
  
  return new Promise((resolve, reject) => {
    pipeline(reader, transform, writer, (error) => {
      if (error) reject(error);
      else resolve(undefined);
    });
  });
}
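
The Node.js reader transform can also sit directly in a pipe chain, decoding Arrow IPC bytes into record batches. A minimal sketch, assuming the recordBatchReaderThroughNodeStream adapter declared above and an input.arrow file in the IPC stream format:

import { recordBatchReaderThroughNodeStream } from "apache-arrow";
import { createReadStream } from 'fs';

// Decode an IPC byte stream into RecordBatch objects
const batchStream = createReadStream('input.arrow')
  .pipe(recordBatchReaderThroughNodeStream());

batchStream.on('data', (batch) => {
  console.log('Decoded batch with columns:', batch.schema.fields.map((f) => f.name));
});

batchStream.on('end', () => {
  console.log('All batches decoded');
});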

Builder Streaming

Stream processing with Arrow builders for constructing data on-the-fly.

/**
 * Create builder that processes iterable input
 */
function builderThroughIterable<T extends DataType>(
  options: IterableBuilderOptions<T>
): (source: Iterable<T['TValue']>) => AsyncIterable<Vector<T>>;

/**
 * Create builder that processes async iterable input
 */
function builderThroughAsyncIterable<T extends DataType>(
  options: IterableBuilderOptions<T>
): (source: AsyncIterable<T['TValue']>) => AsyncIterable<Vector<T>>;

/**
 * Create builder that processes DOM ReadableStream (DOM environments)
 */
function builderThroughDOMStream<T extends DataType>(
  options: BuilderDuplexOptions<T>
): BuilderTransform<T>;

/**
 * Create builder that processes Node.js stream (Node.js environments)
 */
function builderThroughNodeStream<T extends DataType>(
  options: BuilderDuplexOptions<T>
): NodeJS.ReadWriteStream;

// Options interfaces
interface IterableBuilderOptions<T extends DataType> {
  type: T;
  nullValues?: any[];
  highWaterMark?: number;
  queueingStrategy?: 'bytes' | 'count';
}

interface BuilderDuplexOptions<T extends DataType> extends IterableBuilderOptions<T> {
  writableStrategy?: QueuingStrategy;
  readableStrategy?: QueuingStrategy;
}

type BuilderTransform<T extends DataType> = TransformStream<T['TValue'], Vector<T>>;

Usage Examples:

import { 
  builderThroughAsyncIterable,
  builderThroughDOMStream,
  builderThroughNodeStream,
  Int32,
  Utf8,
  Vector 
} from "apache-arrow";

// Async iterable processing
async function processAsyncData() {
  const intBuilder = builderThroughAsyncIterable({
    type: new Int32(),
    highWaterMark: 1000
  });
  
  async function* generateNumbers() {
    for (let i = 0; i < 10000; i++) {
      yield i * i; // Square numbers
    }
  }
  
  // Process data in chunks
  for await (const vector of intBuilder(generateNumbers())) {
    console.log(`Built vector with ${vector.length} elements`);
    // Each vector contains up to 1000 elements
  }
}

// DOM stream building
async function buildFromDOMStream() {
  const stringBuilder = builderThroughDOMStream({
    type: new Utf8(),
    highWaterMark: 100
  });
  
  // Create input stream
  const inputStream = new ReadableStream({
    start(controller) {
      ['hello', 'world', 'arrow', 'streaming'].forEach(str => {
        controller.enqueue(str);
      });
      controller.close();
    }
  });
  
  // Process through builder transform
  const outputStream = inputStream.pipeThrough(stringBuilder);
  const reader = outputStream.getReader();
  
  try {
    while (true) {
      const { done, value: vector } = await reader.read();
      if (done) break;
      
      console.log('Built string vector:', vector.toArray());
    }
  } finally {
    reader.releaseLock();
  }
}

// Node.js stream building
import { Readable } from 'stream';

function buildFromNodeStream() {
  const intBuilder = builderThroughNodeStream({
    type: new Int32(),
    highWaterMark: 500
  });
  
  // Create input stream
  const inputStream = new Readable({
    objectMode: true,
    read() {
      for (let i = 0; i < 10; i++) {
        this.push(Math.floor(Math.random() * 1000));
      }
      this.push(null); // End stream
    }
  });
  
  // Process through builder
  inputStream.pipe(intBuilder);
  
  intBuilder.on('data', (vector: Vector<Int32>) => {
    console.log('Built vector:', vector.toArray());
  });
  
  intBuilder.on('end', () => {
    console.log('Stream processing completed');
  });
}
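
For fully synchronous sources, builderThroughIterable follows the same pattern. A minimal sketch, assuming count-based chunking as declared in IterableBuilderOptions above:

import { builderThroughIterable, Float64 } from "apache-arrow";

async function buildFromIterable() {
  const floatBuilder = builderThroughIterable({
    type: new Float64(),
    highWaterMark: 2,
    queueingStrategy: 'count'
  });
  
  // Chunks the source into vectors of at most 2 values each
  for await (const vector of floatBuilder([1.5, 2.5, 3.5, 4.5, 5.5])) {
    console.log('Built float vector:', vector.toArray());
  }
}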

Real-time Data Processing

Stream Aggregation

Processing streaming data with aggregations and windowing. The StreamAggregator shown below is an application-level pattern built on Arrow, not an apache-arrow export; the usage examples show how such a processor can be implemented.

/**
 * Windowed aggregation over streaming data
 */
class StreamAggregator<T extends TypeMap> {
  constructor(
    private schema: Schema<T>,
    private windowSize: number,
    private aggregationFunctions: { [K in keyof T]: AggregationFunction }
  ) {}
  
  /** Process batch and update aggregations */
  processBatch(batch: RecordBatch<T>): AggregationResult<T>;
  
  /** Get current aggregation state */
  getCurrentState(): AggregationResult<T>;
  
  /** Reset aggregation state */
  reset(): void;
}

type AggregationFunction = 'sum' | 'avg' | 'min' | 'max' | 'count';
type AggregationResult<T> = { [K in keyof T]: number };

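A minimal sketch of how processBatch could fold one batch into running aggregates, assuming numeric columns accessed via getChild (only 'sum' and 'count' are shown; the function name and state shape are illustrative):

import { RecordBatch } from "apache-arrow";

// Fold a single batch into running aggregates for the configured columns
function aggregateBatch(
  batch: RecordBatch,
  fns: Record<string, 'sum' | 'count'>,
  state: Record<string, number> = {}
): Record<string, number> {
  for (const [name, fn] of Object.entries(fns)) {
    const column = batch.getChild(name);
    if (!column) continue;
    
    let acc = state[name] ?? 0;
    for (const value of column) {
      if (value == null) continue;
      acc += fn === 'sum' ? Number(value) : 1;
    }
    state[name] = acc;
  }
  return state;
}
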
Usage Examples:

// Custom stream aggregator
class MovingAverageProcessor {
  private window: number[] = [];
  private windowSize: number;
  
  constructor(windowSize: number) {
    this.windowSize = windowSize;
  }
  
  process(value: number): number {
    this.window.push(value);
    
    if (this.window.length > this.windowSize) {
      this.window.shift(); // Remove oldest value
    }
    
    return this.window.reduce((sum, val) => sum + val, 0) / this.window.length;
  }
}

// Stream processing with aggregation
async function processStreamWithAggregation(
  source: AsyncIterable<RecordBatch>
): Promise<void> {
  const processor = new MovingAverageProcessor(10);
  
  for await (const batch of source) {
    const valueColumn = batch.getChild('value');
    
    // Process each value in the batch
    for (let i = 0; i < valueColumn.length; i++) {
      const value = valueColumn.get(i);
      if (value !== null) {
        const movingAvg = processor.process(value);
        console.log(`Value: ${value}, Moving Avg: ${movingAvg.toFixed(2)}`);
      }
    }
  }
}

// Real-time analytics pipeline
async function realTimeAnalyticsPipeline() {
  // Simulate real-time data source
  async function* realtimeDataSource() {
    while (true) {
      const batch = createMockBatch(); // Generate mock data
      yield batch;
      await new Promise(resolve => setTimeout(resolve, 100)); // 100ms intervals
    }
  }
  
  const aggregator = new StreamAggregator(
    schema, // Schema of the incoming batches (defined elsewhere in the application)
    1000, // 1 second window
    { value: 'avg', count: 'sum' }
  );
  
  for await (const batch of realtimeDataSource()) {
    const result = aggregator.processBatch(batch);
    console.log('Real-time metrics:', result);
  }
}

Event Stream Processing

Processing event streams with Arrow for analytics. Like StreamAggregator above, EventStreamProcessor is an application-level pattern rather than an apache-arrow export.

/**
 * Event stream processor with Arrow backend
 */
class EventStreamProcessor<T extends TypeMap> {
  private builder: StructBuilder<T>;
  private batchSize: number;
  
  constructor(schema: Schema<T>, batchSize: number = 1000) {
    this.builder = new StructBuilder({ type: new Struct(schema.fields) });
    this.batchSize = batchSize;
  }
  
  /** Process single event */
  processEvent(event: T[keyof T]): void;
  
  /** Flush current batch */
  flushBatch(): RecordBatch<T>;
  
  /** Whether the current batch has reached batchSize */
  shouldFlush(): boolean;
  
  /** Get processing statistics */
  getStats(): ProcessingStats;
}

interface ProcessingStats {
  eventsProcessed: number;
  batchesCreated: number;
  processingRate: number; // events per second
}
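
A minimal sketch of one way to implement this pattern without per-field builders: buffer plain event objects and materialize a batch on flush with apache-arrow's tableFromJSON (the class name and row-buffer approach are illustrative assumptions):

import { tableFromJSON, RecordBatch } from "apache-arrow";

// Buffers plain JS event objects and converts them into a RecordBatch on flush
class SimpleEventBuffer {
  private rows: Record<string, any>[] = [];
  
  constructor(private batchSize: number = 1000) {}
  
  processEvent(event: Record<string, any>): void {
    this.rows.push(event);
  }
  
  shouldFlush(): boolean {
    return this.rows.length >= this.batchSize;
  }
  
  flushBatch(): RecordBatch | null {
    if (this.rows.length === 0) return null;
    // tableFromJSON infers the schema from the buffered rows
    const table = tableFromJSON(this.rows);
    this.rows = [];
    return table.batches[0];
  }
}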

Usage Examples:

// Event processing system
class RealTimeEventProcessor {
  private processor: EventStreamProcessor<EventSchema>;
  private writer: WritableStreamDefaultWriter<RecordBatch>;
  
  constructor() {
    const eventSchema = new Schema([
      new Field('timestamp', new TimestampMillisecond()),
      new Field('userId', new Utf8()),
      new Field('eventType', new Utf8()),
      new Field('value', new Float64())
    ]);
    
    this.processor = new EventStreamProcessor(eventSchema, 1000);
    // Acquire the sink's writer once; calling getWriter() on every write
    // would throw because the first call locks the stream
    this.writer = createEventSink().getWriter();
  }
  
  async processEvent(event: UserEvent): Promise<void> {
    this.processor.processEvent(event);
    
    // Auto-flush when the batch is full
    if (this.processor.shouldFlush()) {
      await this.writer.write(this.processor.flushBatch());
    }
  }
  
  async flush(): Promise<void> {
    const batch = this.processor.flushBatch();
    if (batch.length > 0) {
      await this.writer.write(batch);
    }
  }
}

// WebSocket event stream
async function processWebSocketEvents() {
  const processor = new RealTimeEventProcessor();
  
  const ws = new WebSocket('ws://events.example.com');
  
  ws.onmessage = async (event) => {
    try {
      const eventData = JSON.parse(event.data);
      await processor.processEvent(eventData);
    } catch (error) {
      console.error('Error processing event:', error);
    }
  };
  
  // Periodic flush
  setInterval(async () => {
    await processor.flush();
  }, 5000); // Flush every 5 seconds
}
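
The createEventSink helper above is left undefined; one possible implementation wraps an Arrow IPC stream writer in a WritableStream (a sketch, assuming RecordBatchStreamWriter's write and close methods):

import { RecordBatch, RecordBatchStreamWriter } from "apache-arrow";

// A WritableStream<RecordBatch> that forwards batches to an Arrow IPC stream writer
function createEventSink(
  writer: RecordBatchStreamWriter = new RecordBatchStreamWriter()
): WritableStream<RecordBatch> {
  return new WritableStream<RecordBatch>({
    write(batch) {
      writer.write(batch);
    },
    close() {
      writer.close();
    }
  });
}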

Performance Optimization

Memory Management in Streams

Efficient memory usage patterns for stream processing. MemoryEfficientProcessor is an application-level pattern, not an apache-arrow export.

/**
 * Memory-efficient stream processor
 */
class MemoryEfficientProcessor<T extends TypeMap> {
  private maxMemoryUsage: number;
  private currentMemoryUsage: number = 0;
  
  constructor(maxMemoryMB: number) {
    this.maxMemoryUsage = maxMemoryMB * 1024 * 1024; // Convert to bytes
  }
  
  /** Check if processing should be paused due to memory pressure */
  shouldPause(): boolean;
  
  /** Process batch with memory monitoring */
  processBatch(batch: RecordBatch<T>): Promise<void>;
  
  /** Force garbage collection hint */
  forceCleanup(): void;
}
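
A minimal sketch of what shouldPause could look like in Node.js, using process.memoryUsage() as the pressure signal (browser environments would need a different heuristic):

// Node.js-only heuristic: compare current heap usage against the configured limit
function shouldPause(maxMemoryBytes: number): boolean {
  const { heapUsed } = process.memoryUsage();
  return heapUsed >= maxMemoryBytes;
}

// Example: report pressure above roughly 100 MB of heap usage
if (shouldPause(100 * 1024 * 1024)) {
  console.log('Memory pressure detected, pausing intake');
}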

Usage Examples:

// Memory-conscious stream processing
async function processLargeStreamSafely(
  source: AsyncIterable<RecordBatch>
): Promise<void> {
  const processor = new MemoryEfficientProcessor(100); // 100MB limit
  
  for await (const batch of source) {
    // Check memory pressure
    if (processor.shouldPause()) {
      console.log('Pausing due to memory pressure');
      await new Promise(resolve => setTimeout(resolve, 1000));
      processor.forceCleanup();
    }
    
    await processor.processBatch(batch);
  }
}

// Backpressure handling
async function handleBackpressure(
  source: ReadableStream<RecordBatch>,
  processor: (batch: RecordBatch) => Promise<void>
): Promise<void> {
  const reader = source.getReader();
  let processingQueue: Promise<void>[] = [];
  const maxConcurrent = 5;
  
  try {
    while (true) {
      const { done, value: batch } = await reader.read();
      if (done) break;
      
      // Limit concurrent processing
      if (processingQueue.length >= maxConcurrent) {
        await Promise.race(processingQueue);
      }
      
      // Add a new processing task and drop it from the queue once it settles
      const task = processor(batch).catch(console.error);
      processingQueue.push(task);
      task.finally(() => {
        processingQueue = processingQueue.filter(p => p !== task);
      });
    }
    
    // Wait for remaining tasks
    await Promise.all(processingQueue);
  } finally {
    reader.releaseLock();
  }
}

Parallel Stream Processing

Processing multiple streams in parallel for improved throughput. ParallelStreamProcessor is an application-level pattern that distributes work across Web Workers; it is not an apache-arrow export.

/**
 * Parallel stream processor
 */
class ParallelStreamProcessor<T extends TypeMap> {
  private workers: Worker[];
  private roundRobinIndex: number = 0;
  
  constructor(workerCount: number, workerScript: string) {
    this.workers = Array.from({ length: workerCount }, () => 
      new Worker(workerScript)
    );
  }
  
  /** Distribute batch to next available worker */
  processBatch(batch: RecordBatch<T>): Promise<RecordBatch<T>>;
  
  /** Terminate all workers */
  terminate(): void;
}
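
A minimal sketch of how processBatch could hand a batch to a worker: serialize it to Arrow IPC bytes with tableToIPC, transfer the bytes, and decode the reply with tableFromIPC. The message shape matches the worker.js sketch below and is an assumption:

import { RecordBatch, Table, tableToIPC, tableFromIPC } from "apache-arrow";

// Serialize a batch, round-robin it to a worker, and await the serialized reply
function processBatchViaWorker(
  workers: Worker[],
  index: number,
  batch: RecordBatch
): Promise<RecordBatch> {
  const worker = workers[index % workers.length];
  const bytes = tableToIPC(new Table([batch]));
  
  return new Promise((resolve, reject) => {
    worker.onmessage = (event) => {
      const { success, result, error } = event.data;
      if (!success) return reject(new Error(error));
      // Assumes the worker replies with IPC bytes for the processed batch
      resolve(tableFromIPC(result).batches[0]);
    };
    worker.postMessage({ batch: bytes, operation: 'transform' }, [bytes.buffer]);
  });
}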

Usage Examples:

// Parallel processing setup
async function setupParallelProcessing() {
  const processor = new ParallelStreamProcessor(4, 'worker.js');
  
  const source = createLargeDataStream();
  const results: RecordBatch[] = [];
  
  // Process batches in parallel
  const processingPromises: Promise<void>[] = [];
  
  for await (const batch of source) {
    const promise = processor.processBatch(batch)
      .then(result => { results.push(result); });
    
    processingPromises.push(promise);
    promise.finally(() => {
      // Drop settled promises so the concurrency check below stays accurate
      const index = processingPromises.indexOf(promise);
      if (index !== -1) processingPromises.splice(index, 1);
    });
    
    // Limit concurrent operations
    if (processingPromises.length >= 10) {
      await Promise.race(processingPromises);
    }
  }
  
  // Wait for all processing to complete
  await Promise.all(processingPromises);
  
  // Cleanup
  processor.terminate();
  
  return new Table(results);
}

// Worker thread processing (worker.js)
// This would be in a separate file
/*
import { RecordBatch, Vector } from "apache-arrow";

self.onmessage = function(event) {
  const { batch, operation } = event.data;
  
  try {
    const result = processBatch(batch, operation);
    self.postMessage({ success: true, result });
  } catch (error) {
    self.postMessage({ success: false, error: error.message });
  }
};

function processBatch(batch: RecordBatch, operation: string): RecordBatch {
  // Apply operation to batch
  switch (operation) {
    case 'filter':
      return filterBatch(batch);
    case 'transform':
      return transformBatch(batch);
    default:
      return batch;
  }
}
*/

// Multi-stream processing
async function processMultipleStreams(
  streams: AsyncIterable<RecordBatch>[]
): Promise<Table[]> {
  const results = await Promise.all(
    streams.map(async (stream, index) => {
      const batches: RecordBatch[] = [];
      
      for await (const batch of stream) {
        // Process each stream independently
        const processed = await processStreamBatch(batch, index);
        batches.push(processed);
      }
      
      return new Table(batches);
    })
  );
  
  return results;
}

Integration Examples

Database Streaming

Streaming data from databases with Arrow format.

// PostgreSQL streaming example
import { Client } from 'pg';
import Cursor from 'pg-cursor';

async function* streamFromPostgres(query: string): AsyncGenerator<RecordBatch> {
  const client = new Client();
  await client.connect();
  
  const cursor = client.query(new Cursor(query));
  
  try {
    while (true) {
      const rows = await cursor.read(1000); // Read 1000 rows at a time
      if (rows.length === 0) break;
      
      // Convert rows to a RecordBatch (one possible helper is sketched below)
      const batch = rowsToRecordBatch(rows);
      yield batch;
    }
  } finally {
    await cursor.close();
    await client.end();
  }
}

// Usage
for await (const batch of streamFromPostgres('SELECT * FROM large_table')) {
  await processBatch(batch);
}
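
One way to implement the rowsToRecordBatch helper used above is to let Arrow infer the schema from the row objects via tableFromJSON; a minimal sketch (the helper itself is not part of apache-arrow):

import { tableFromJSON, RecordBatch } from "apache-arrow";

// Convert an array of plain row objects (as returned by pg) into a RecordBatch
function rowsToRecordBatch(rows: Record<string, any>[]): RecordBatch {
  // tableFromJSON infers the Arrow schema from the row values
  return tableFromJSON(rows).batches[0];
}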

File System Streaming

Streaming file processing with Arrow.

// Process multiple Arrow files in sequence
import { createReadStream } from 'fs';

async function* streamMultipleFiles(filePaths: string[]): AsyncGenerator<RecordBatch> {
  for (const filePath of filePaths) {
    // RecordBatchReader.from accepts streams and buffers, not file paths
    const reader = await RecordBatchReader.from(createReadStream(filePath));
    
    for await (const batch of reader) {
      yield batch;
    }
  }
}

// Process directory of Arrow files
import { readdir } from 'fs/promises';
import * as path from 'path';

async function processArrowDirectory(dirPath: string): Promise<void> {
  const files = await readdir(dirPath);
  const arrowFiles = files
    .filter(file => file.endsWith('.arrow'))
    .map(file => path.join(dirPath, file));
  
  let totalRows = 0;
  
  for await (const batch of streamMultipleFiles(arrowFiles)) {
    totalRows += batch.length;
    await processBatch(batch);
  }
  
  console.log(`Processed ${totalRows} total rows from ${arrowFiles.length} files`);
}