Apache Arrow JavaScript provides comprehensive streaming capabilities for processing large datasets that don't fit in memory, with support for both DOM ReadableStream (browser) and Node.js stream APIs. This enables efficient data processing pipelines and real-time data integration.
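Before the platform-specific adapters below, it helps to see the core IPC streaming round trip in isolation. The following is a minimal, in-memory sketch (the column names and values are arbitrary): serialize a table to the Arrow IPC stream format, then read it back and walk its record batches the way a streaming consumer would.
import { tableFromArrays, tableToIPC, tableFromIPC } from "apache-arrow";
// Serialize to the IPC stream format, then iterate the batches on the way back in.
const table = tableFromArrays({
  id: Int32Array.from([1, 2, 3]),
  value: Float64Array.from([0.1, 0.2, 0.3])
});
const ipcBytes = tableToIPC(table, 'stream'); // Uint8Array containing the IPC stream
for (const batch of tableFromIPC(ipcBytes).batches) {
  console.log(`Batch with ${batch.numRows} rows`);
}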
Functions for converting between Arrow data and platform-specific stream formats.
/**
* Convert iterable to DOM ReadableStream (browser/DOM environments)
*/
function toDOMStream<T>(
source: Iterable<T> | AsyncIterable<T>,
options?: ReadableDOMStreamOptions
): ReadableStream<T>;
/**
* Convert iterable to Node.js Readable stream (Node.js environments)
*/
function toNodeStream<T>(
source: Iterable<T> | AsyncIterable<T>,
options?: NodeStreamOptions
): Readable;
/**
* Convert DOM ReadableStream to async iterable
*/
function fromDOMStream<T>(
stream: ReadableStream<T>
): AsyncIterable<T>;
/**
* Convert Node.js Readable stream to async iterable
*/
function fromNodeStream<T>(
stream: Readable
): AsyncIterable<T>;
// Options interfaces
interface ReadableDOMStreamOptions {
highWaterMark?: number;
size?: (chunk: any) => number;
type?: 'bytes' | undefined;
}
interface NodeStreamOptions {
highWaterMark?: number;
objectMode?: boolean;
encoding?: string;
}
Usage Examples:
import {
toDOMStream,
toNodeStream,
fromDOMStream,
fromNodeStream,
tableFromArrays
} from "apache-arrow";
// Create sample data
async function* generateData() {
for (let i = 0; i < 1000; i++) {
yield { id: i, value: Math.random(), timestamp: Date.now() };
}
}
// DOM environment (browser)
const domStream = toDOMStream(generateData(), {
highWaterMark: 100
});
// Consume DOM stream
const reader = domStream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
console.log('Received chunk:', value);
}
} finally {
reader.releaseLock();
}
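The fromDOMStream direction can be exercised against the same source by converting a DOM stream back into an async iterable and consuming it with for-await instead of a manual reader loop (a sketch that relies on the fromDOMStream export declared above):
// Sketch: round-trip the generator through a DOM stream and back to async iteration.
for await (const chunk of fromDOMStream(toDOMStream(generateData()))) {
  console.log('Chunk via async iteration:', chunk);
}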
// Node.js environment
const nodeStream = toNodeStream(generateData(), {
objectMode: true,
highWaterMark: 100
});
nodeStream.on('data', (chunk) => {
console.log('Received chunk:', chunk);
});
nodeStream.on('end', () => {
console.log('Stream ended');
});
Functions for streaming Arrow record batches through various stream APIs.
/**
* Transform DOM stream of record batches (DOM environments)
*/
function recordBatchReaderThroughDOMStream<T extends TypeMap>(
options?: ReadableDOMStreamOptions
): TransformStream<Uint8Array, RecordBatch<T>>;
/**
* Transform Node.js stream of record batches (Node.js environments)
*/
function recordBatchReaderThroughNodeStream<T extends TypeMap>(
options?: NodeStreamOptions
): Transform;
/**
* Write record batches through DOM stream
*/
function recordBatchWriterThroughDOMStream<T extends TypeMap>(
sink: WritableStreamDefaultWriter,
schema: Schema<T>,
options?: WritableDOMStreamOptions
): WritableStream<RecordBatch<T>>;
/**
* Write record batches through Node.js stream
*/
function recordBatchWriterThroughNodeStream<T extends TypeMap>(
sink: Writable,
schema: Schema<T>,
options?: NodeWritableOptions
): Writable;
// Additional options interfaces
interface WritableDOMStreamOptions {
highWaterMark?: number;
size?: (chunk: any) => number;
}
interface NodeWritableOptions {
highWaterMark?: number;
objectMode?: boolean;
decodeStrings?: boolean;
}
Usage Examples:
import {
RecordBatch,
RecordBatchReader,
recordBatchReaderThroughDOMStream,
recordBatchWriterThroughNodeStream,
tableFromArrays
} from "apache-arrow";
// DOM stream processing
async function processBatchesDOM(source: ReadableStream<Uint8Array>) {
const transformStream = recordBatchReaderThroughDOMStream();
const reader = source
.pipeThrough(transformStream)
.getReader();
try {
while (true) {
const { done, value: batch } = await reader.read();
if (done) break;
// Process each batch
console.log(`Processing batch with ${batch.numRows} rows`);
await processBatch(batch);
}
} finally {
reader.releaseLock();
}
}
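A typical byte source for processBatchesDOM is a fetch response body, which is already a ReadableStream<Uint8Array> (the URL below is a placeholder):
// Sketch: stream Arrow IPC bytes from the network into the batch reader above.
const response = await fetch('https://example.com/data.arrows'); // placeholder URL
if (response.body) {
  await processBatchesDOM(response.body);
}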
// Node.js stream processing
import { pipeline, Transform } from 'stream';
import { createReadStream, createWriteStream } from 'fs';
async function processBatchesNode() {
const inputStream = createReadStream('input.arrow');
const outputStream = createWriteStream('output.arrow');
const reader = await RecordBatchReader.from(inputStream);
await reader.open(); // read the IPC header so that reader.schema is populated
const schema = reader.schema;
const writer = recordBatchWriterThroughNodeStream(
outputStream,
schema,
{ objectMode: true }
);
// Transform each batch
const transform = new Transform({
objectMode: true,
transform(batch: RecordBatch, encoding, callback) {
// Apply transformations to batch
const transformedBatch = transformBatch(batch);
callback(null, transformedBatch);
}
});
return new Promise((resolve, reject) => {
pipeline(reader, transform, writer, (error) => {
if (error) reject(error);
else resolve(undefined);
});
});
}
Stream processing with Arrow builders for constructing data on-the-fly.
/**
* Create builder that processes iterable input
*/
function builderThroughIterable<T extends DataType>(
options: IterableBuilderOptions<T>
): (source: Iterable<T['TValue']>) => AsyncIterable<Vector<T>>;
/**
* Create builder that processes async iterable input
*/
function builderThroughAsyncIterable<T extends DataType>(
options: IterableBuilderOptions<T>
): (source: AsyncIterable<T['TValue']>) => AsyncIterable<Vector<T>>;
/**
* Create builder that processes DOM ReadableStream (DOM environments)
*/
function builderThroughDOMStream<T extends DataType>(
options: BuilderDuplexOptions<T>
): BuilderTransform<T>;
/**
* Create builder that processes Node.js stream (Node.js environments)
*/
function builderThroughNodeStream<T extends DataType>(
options: BuilderDuplexOptions<T>
): NodeJS.ReadWriteStream;
// Options interfaces
interface IterableBuilderOptions<T extends DataType> {
type: T;
nullValues?: any[];
highWaterMark?: number;
queueingStrategy?: 'bytes' | 'count';
}
interface BuilderDuplexOptions<T extends DataType> extends IterableBuilderOptions<T> {
writableStrategy?: QueuingStrategy;
readableStrategy?: QueuingStrategy;
}
type BuilderTransform<T extends DataType> = TransformStream<T['TValue'], Vector<T>>;
Usage Examples:
import {
builderThroughIterable,
builderThroughAsyncIterable,
builderThroughDOMStream,
builderThroughNodeStream,
Int32,
Utf8,
Vector
} from "apache-arrow";
// Async iterable processing
async function processAsyncData() {
const intBuilder = builderThroughAsyncIterable({
type: new Int32(),
highWaterMark: 1000
});
async function* generateNumbers() {
for (let i = 0; i < 10000; i++) {
yield i * i; // Square numbers
}
}
// Process data in chunks
for await (const vector of intBuilder(generateNumbers())) {
console.log(`Built vector with ${vector.length} elements`);
// Each vector contains up to 1000 elements
}
}
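The synchronous builderThroughIterable variant declared earlier follows the same chunking pattern over a plain Iterable; a small sketch (the chunk size of two is arbitrary):
// Sketch: build Utf8 vectors in chunks of two values from a plain array.
async function processSyncData() {
  const utf8Chunks = builderThroughIterable({
    type: new Utf8(),
    highWaterMark: 2,
    queueingStrategy: 'count'
  });
  for await (const vector of utf8Chunks(['a', 'b', 'c', 'd', 'e'])) {
    console.log('Built chunk:', vector.toArray());
  }
}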
// DOM stream building
async function buildFromDOMStream() {
const stringBuilder = builderThroughDOMStream({
type: new Utf8(),
highWaterMark: 100
});
// Create input stream
const inputStream = new ReadableStream({
start(controller) {
['hello', 'world', 'arrow', 'streaming'].forEach(str => {
controller.enqueue(str);
});
controller.close();
}
});
// Process through builder transform
const outputStream = inputStream.pipeThrough(stringBuilder);
const reader = outputStream.getReader();
try {
while (true) {
const { done, value: vector } = await reader.read();
if (done) break;
console.log('Built string vector:', vector.toArray());
}
} finally {
reader.releaseLock();
}
}
// Node.js stream building
import { Readable } from 'stream';
function buildFromNodeStream() {
const intBuilder = builderThroughNodeStream({
type: new Int32(),
highWaterMark: 500
});
// Create input stream
const inputStream = new Readable({
objectMode: true,
read() {
for (let i = 0; i < 10; i++) {
this.push(Math.floor(Math.random() * 1000));
}
this.push(null); // End stream
}
});
// Process through builder
inputStream.pipe(intBuilder);
intBuilder.on('data', (vector: Vector<Int32>) => {
console.log('Built vector:', vector.toArray());
});
intBuilder.on('end', () => {
console.log('Stream processing completed');
});
}
Processing streaming data with aggregations and windowing.
/**
* Windowed aggregation over streaming data
*/
class StreamAggregator<T extends TypeMap> {
constructor(
private schema: Schema<T>,
private windowSize: number,
private aggregationFunctions: { [K in keyof T]: AggregationFunction }
) {}
/** Process batch and update aggregations */
processBatch(batch: RecordBatch<T>): AggregationResult<T>;
/** Get current aggregation state */
getCurrentState(): AggregationResult<T>;
/** Reset aggregation state */
reset(): void;
}
type AggregationFunction = 'sum' | 'avg' | 'min' | 'max' | 'count';
type AggregationResult<T> = { [K in keyof T]: number };
Usage Examples:
// Custom stream aggregator
class MovingAverageProcessor {
private window: number[] = [];
private windowSize: number;
constructor(windowSize: number) {
this.windowSize = windowSize;
}
process(value: number): number {
this.window.push(value);
if (this.window.length > this.windowSize) {
this.window.shift(); // Remove oldest value
}
return this.window.reduce((sum, val) => sum + val, 0) / this.window.length;
}
}
// Stream processing with aggregation
async function processStreamWithAggregation(
source: AsyncIterable<RecordBatch>
): Promise<void> {
const processor = new MovingAverageProcessor(10);
for await (const batch of source) {
const valueColumn = batch.getChild('value');
if (!valueColumn) continue; // column not present in this batch
// Process each value in the batch
for (let i = 0; i < valueColumn.length; i++) {
const value = valueColumn.get(i);
if (value !== null) {
const movingAvg = processor.process(value);
console.log(`Value: ${value}, Moving Avg: ${movingAvg.toFixed(2)}`);
}
}
}
}
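Per-batch aggregates can also be computed directly from the Arrow column without a helper class; a minimal sketch (the column name 'value' matches the example above):
// Sketch: mean of a numeric column computed straight from the Arrow vector.
// Vectors are iterable, and getChild returns null for unknown column names.
function batchMean(batch: RecordBatch, column: string): number | null {
  const vector = batch.getChild(column);
  if (!vector || vector.length === 0) return null;
  let sum = 0;
  let count = 0;
  for (const value of vector) {
    if (value !== null) {
      sum += Number(value);
      count++;
    }
  }
  return count > 0 ? sum / count : null;
}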
// Real-time analytics pipeline
async function realTimeAnalyticsPipeline() {
// Simulate real-time data source
async function* realtimeDataSource() {
while (true) {
const batch = createMockBatch(); // Generate mock data
yield batch;
await new Promise(resolve => setTimeout(resolve, 100)); // 100ms intervals
}
}
const aggregator = new StreamAggregator(
schema, // schema of the mock batches, assumed to be defined elsewhere
1000, // 1 second window
{ value: 'avg', count: 'sum' }
);
for await (const batch of realtimeDataSource()) {
const result = aggregator.processBatch(batch);
console.log('Real-time metrics:', result);
}
}
Processing event streams with Arrow for analytics.
/**
* Event stream processor with Arrow backend
*/
class EventStreamProcessor<T extends TypeMap> {
private builder: StructBuilder<T>;
private batchSize: number;
constructor(schema: Schema<T>, batchSize: number = 1000) {
this.builder = new StructBuilder({ type: new Struct(schema.fields) });
this.batchSize = batchSize;
}
/** Process single event */
processEvent(event: T[keyof T]): void;
/** Whether the current batch has reached batchSize and should be flushed */
shouldFlush(): boolean;
/** Flush current batch */
flushBatch(): RecordBatch<T>;
/** Get processing statistics */
getStats(): ProcessingStats;
}
interface ProcessingStats {
eventsProcessed: number;
batchesCreated: number;
processingRate: number; // events per second
}
Usage Examples:
// Event processing system
class RealTimeEventProcessor {
private processor: EventStreamProcessor<EventSchema>;
private outputSink: WritableStream<RecordBatch>;
private sinkWriter: WritableStreamDefaultWriter<RecordBatch>;
constructor() {
const eventSchema = new Schema([
new Field('timestamp', new TimestampMillisecond()),
new Field('userId', new Utf8()),
new Field('eventType', new Utf8()),
new Field('value', new Float64())
]);
this.processor = new EventStreamProcessor(eventSchema, 1000);
this.outputSink = createEventSink();
// Acquire the writer once; calling getWriter() on an already locked stream throws
this.sinkWriter = this.outputSink.getWriter();
}
async processEvent(event: UserEvent): Promise<void> {
this.processor.processEvent(event);
// Auto-flush when batch is full
if (this.processor.shouldFlush()) {
const batch = this.processor.flushBatch();
await this.sinkWriter.write(batch);
}
}
async flush(): Promise<void> {
const batch = this.processor.flushBatch();
if (batch.numRows > 0) {
await this.sinkWriter.write(batch);
}
}
}
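One concrete way to realize the flush step with the public Arrow API is to buffer plain event objects and pivot them into columns with tableFromArrays; a sketch (field names mirror the eventSchema above, timestamps are kept as epoch milliseconds, and a non-empty buffer is assumed):
import { tableFromArrays, RecordBatch } from "apache-arrow";
// Sketch: flush a buffer of plain events as a single RecordBatch.
// tableFromArrays infers the column types; the data is already in memory, so one batch results.
interface BufferedEvent { timestamp: number; userId: string; eventType: string; value: number; }
function flushEvents(buffer: BufferedEvent[]): RecordBatch {
  const table = tableFromArrays({
    timestamp: buffer.map(e => e.timestamp),
    userId: buffer.map(e => e.userId),
    eventType: buffer.map(e => e.eventType),
    value: Float64Array.from(buffer.map(e => e.value))
  });
  return table.batches[0];
}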
// WebSocket event stream
async function processWebSocketEvents() {
const processor = new RealTimeEventProcessor();
const ws = new WebSocket('ws://events.example.com');
ws.onmessage = async (event) => {
try {
const eventData = JSON.parse(event.data);
await processor.processEvent(eventData);
} catch (error) {
console.error('Error processing event:', error);
}
};
// Periodic flush
setInterval(async () => {
await processor.flush();
}, 5000); // Flush every 5 seconds
}
Efficient memory usage patterns for stream processing.
/**
* Memory-efficient stream processor
*/
class MemoryEfficientProcessor<T extends TypeMap> {
private maxMemoryUsage: number;
private currentMemoryUsage: number = 0;
constructor(maxMemoryMB: number) {
this.maxMemoryUsage = maxMemoryMB * 1024 * 1024; // Convert to bytes
}
/** Check if processing should be paused due to memory pressure */
shouldPause(): boolean;
/** Process batch with memory monitoring */
processBatch(batch: RecordBatch<T>): Promise<void>;
/** Force garbage collection hint */
forceCleanup(): void;
}
Usage Examples:
// Memory-conscious stream processing
async function processLargeStreamSafely(
source: AsyncIterable<RecordBatch>
): Promise<void> {
const processor = new MemoryEfficientProcessor(100); // 100MB limit
for await (const batch of source) {
// Check memory pressure
if (processor.shouldPause()) {
console.log('Pausing due to memory pressure');
await new Promise(resolve => setTimeout(resolve, 1000));
processor.forceCleanup();
}
await processor.processBatch(batch);
}
}
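In Node, the shouldPause check can be grounded in process.memoryUsage(); a sketch of one possible heuristic (heapUsed is only an approximation of real memory pressure, and the threshold is arbitrary):
// Sketch: a Node-only memory-pressure check based on current heap usage.
function makeMemoryCheck(maxBytes: number): () => boolean {
  return () => process.memoryUsage().heapUsed > maxBytes;
}
const shouldPause = makeMemoryCheck(100 * 1024 * 1024); // 100 MB
if (shouldPause()) {
  console.log('Heap usage above threshold; pause reading and let in-flight work drain');
}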
// Backpressure handling
async function handleBackpressure(
source: ReadableStream<RecordBatch>,
processor: (batch: RecordBatch) => Promise<void>
): Promise<void> {
const reader = source.getReader();
let processingQueue: Promise<void>[] = [];
const maxConcurrent = 5;
try {
while (true) {
const { done, value: batch } = await reader.read();
if (done) break;
// Limit concurrent processing
if (processingQueue.length >= maxConcurrent) {
// Wait for at least one in-flight task to settle before reading more
await Promise.race(processingQueue);
}
// Add new processing task and remove it from the queue once it settles
const task = processor(batch)
.catch(console.error)
.finally(() => {
processingQueue = processingQueue.filter(p => p !== task);
});
processingQueue.push(task);
}
// Wait for remaining tasks
await Promise.all(processingQueue);
} finally {
reader.releaseLock();
}
}
Processing multiple streams in parallel for improved throughput.
/**
* Parallel stream processor
*/
class ParallelStreamProcessor<T extends TypeMap> {
private workers: Worker[];
private roundRobinIndex: number = 0;
constructor(workerCount: number, workerScript: string) {
this.workers = Array.from({ length: workerCount }, () =>
new Worker(workerScript)
);
}
/** Distribute batch to next available worker */
processBatch(batch: RecordBatch<T>): Promise<RecordBatch<T>>;
/** Terminate all workers */
terminate(): void;
}
Usage Examples:
// Parallel processing setup
async function setupParallelProcessing() {
const processor = new ParallelStreamProcessor(4, 'worker.js');
const source = createLargeDataStream();
const results: RecordBatch[] = [];
// Process batches in parallel
const processingPromises: Promise<void>[] = [];
for await (const batch of source) {
const promise = processor.processBatch(batch)
.then(result => { results.push(result); })
.finally(() => {
// Drop settled tasks so the concurrency limit below stays meaningful
const index = processingPromises.indexOf(promise);
if (index !== -1) processingPromises.splice(index, 1);
});
processingPromises.push(promise);
// Limit concurrent operations
if (processingPromises.length >= 10) {
await Promise.race(processingPromises);
}
}
// Wait for all processing to complete
await Promise.all(processingPromises);
// Cleanup
processor.terminate();
return new Table(results);
}
// Worker thread processing (worker.js)
// This would be in a separate file
/*
import { RecordBatch, Vector } from "apache-arrow";
self.onmessage = function(event) {
const { batch, operation } = event.data;
try {
const result = processBatch(batch, operation);
self.postMessage({ success: true, result });
} catch (error) {
self.postMessage({ success: false, error: error.message });
}
};
function processBatch(batch: RecordBatch, operation: string): RecordBatch {
// Apply operation to batch
switch (operation) {
case 'filter':
return filterBatch(batch);
case 'transform':
return transformBatch(batch);
default:
return batch;
}
}
*/
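Record batches cross the worker boundary most cheaply as Arrow IPC bytes with a transferred ArrayBuffer, rather than as structured-cloned batch objects; a sketch of the main-thread side (the message shape is a made-up convention, not part of Arrow):
import { Table, tableToIPC, RecordBatch } from "apache-arrow";
// Sketch: serialize one batch to IPC stream bytes and transfer the buffer to a worker.
function sendBatchToWorker(worker: Worker, batch: RecordBatch, operation: string): void {
  const bytes = tableToIPC(new Table([batch]), 'stream'); // Uint8Array
  worker.postMessage({ operation, bytes }, [bytes.buffer]);
}
// Inside the worker, the batch can be rebuilt with tableFromIPC(bytes).batches[0].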
// Multi-stream processing
async function processMultipleStreams(
streams: AsyncIterable<RecordBatch>[]
): Promise<Table[]> {
const results = await Promise.all(
streams.map(async (stream, index) => {
const batches: RecordBatch[] = [];
for await (const batch of stream) {
// Process each stream independently
const processed = await processStreamBatch(batch, index);
batches.push(processed);
}
return new Table(batches);
})
);
return results;
}
Streaming data from databases with Arrow format.
// PostgreSQL streaming example
import { Client } from 'pg';
import Cursor from 'pg-cursor';
async function* streamFromPostgres(query: string): AsyncGenerator<RecordBatch> {
const client = new Client();
await client.connect();
const cursor = client.query(new Cursor(query));
try {
while (true) {
const rows = await cursor.read(1000); // Read 1000 rows at a time
if (rows.length === 0) break;
// Convert rows to a RecordBatch (a sketch of this helper follows below)
const batch = rowsToRecordBatch(rows);
yield batch;
}
} finally {
await cursor.close();
await client.end();
}
}
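rowsToRecordBatch is left abstract above; one way to implement it is to pivot the row objects into columns and let tableFromArrays infer the Arrow types (a sketch that assumes every row has the same keys and that the caller only passes non-empty row sets, as the cursor loop above does):
import { tableFromArrays, RecordBatch } from "apache-arrow";
// Sketch: pivot pg row objects into columns and build a single RecordBatch.
function rowsToRecordBatch(rows: Record<string, any>[]): RecordBatch {
  const columns: Record<string, any[]> = {};
  for (const key of Object.keys(rows[0])) {
    columns[key] = rows.map(row => row[key]);
  }
  return tableFromArrays(columns).batches[0]; // rows are already in memory, so one batch
}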
// Usage
for await (const batch of streamFromPostgres('SELECT * FROM large_table')) {
await processBatch(batch);
}
Streaming file processing with Arrow.
// Process multiple Arrow files in sequence
import { createReadStream } from 'fs';
async function* streamMultipleFiles(filePaths: string[]): AsyncGenerator<RecordBatch> {
for (const filePath of filePaths) {
// RecordBatchReader.from accepts buffers and streams rather than file paths
const reader = await RecordBatchReader.from(createReadStream(filePath));
for await (const batch of reader) {
yield batch;
}
}
}
// Process directory of Arrow files
import { readdir } from 'fs/promises';
import path from 'path';
async function processArrowDirectory(dirPath: string): Promise<void> {
const files = await readdir(dirPath);
const arrowFiles = files
.filter(file => file.endsWith('.arrow'))
.map(file => path.join(dirPath, file));
let totalRows = 0;
for await (const batch of streamMultipleFiles(arrowFiles)) {
totalRows += batch.numRows;
await processBatch(batch);
}
console.log(`Processed ${totalRows} total rows from ${arrowFiles.length} files`);
}
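Processed batches can also be written back out as a single Arrow IPC stream file; a sketch that buffers the batches in memory first, which suits moderate result sizes rather than unbounded streams (the output path is a placeholder):
import { writeFile } from 'fs/promises';
import { Table, tableToIPC, RecordBatch } from "apache-arrow";
// Sketch: collect batches and persist them as one IPC stream file.
async function writeBatchesToFile(batches: AsyncIterable<RecordBatch>, outPath: string): Promise<void> {
  const collected: RecordBatch[] = [];
  for await (const batch of batches) {
    collected.push(batch);
  }
  await writeFile(outPath, tableToIPC(new Table(collected), 'stream'));
}
// Usage (paths are placeholders):
// await writeBatchesToFile(streamMultipleFiles(arrowFiles), 'combined.arrows');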