Apache Arrow JavaScript provides comprehensive I/O capabilities for reading and writing Arrow data, including the Arrow IPC (Inter-Process Communication) format, JSON serialization, and integration with a variety of data sources and sinks.
High-level functions for converting between tables and binary Arrow format.
/**
* Serialize table to Arrow IPC format
*/
function tableToIPC(
table: Table,
type?: 'stream' | 'file' // defaults to 'stream'
): Uint8Array;
/**
* Deserialize table from Arrow IPC format
*/
function tableFromIPC(buffer: ArrayBufferViewInput): Table;
/**
* Create table from IPC stream
*/
function tableFromIPC(
buffer: ArrayBufferViewInput,
dictionaries?: Map<number, Vector>
): Table;
// Type definitions for input sources
type ArrayBufferViewInput =
| ArrayBuffer
| ArrayBufferView
| Uint8Array
| Buffer
| string;
Usage Examples:
import { tableToIPC, tableFromIPC, tableFromArrays } from "apache-arrow";
// Create a table
const originalTable = tableFromArrays({
id: [1, 2, 3],
name: ['Alice', 'Bob', 'Charlie'],
score: [95.5, 87.2, 92.1]
});
// Serialize to Arrow IPC format
const fileBuffer = tableToIPC(originalTable, 'file'); // Arrow file format
const streamBuffer = tableToIPC(originalTable, 'stream'); // Arrow stream format
// Deserialize from Arrow IPC format
const deserializedTable = tableFromIPC(fileBuffer);
// Verify data integrity
console.log(deserializedTable.numRows); // 3
console.log(deserializedTable.getChild('name')?.get(0)); // 'Alice'
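// A quick sketch: tableFromIPC detects the framing itself, so the stream-format
// bytes produced above deserialize the same way (no format flag needed).
const fromStream = tableFromIPC(streamBuffer);
console.log(fromStream.numRows); // 3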
// Save to file (Node.js)
import { writeFileSync } from 'fs';
writeFileSync('data.arrow', fileBuffer);
// Read from file (Node.js)
import { readFileSync } from 'fs';
const loadedBuffer = readFileSync('data.arrow');
const loadedTable = tableFromIPC(loadedBuffer);
Classes for reading Arrow IPC data from various sources.
/**
* Base class for reading Arrow record batches
*/
abstract class RecordBatchReader<T extends TypeMap = any> {
/** Schema of the record batches */
readonly schema: Schema<T>;
/** Dictionary vectors */
readonly dictionaries: Map<number, Vector>;
/** Create reader from various sources */
static from<T extends TypeMap>(source: ReadableSource): RecordBatchReader<T>;
/** Read all batches from a source into a table */
static readAll<T extends TypeMap>(source: ReadableSource): Table<T>;
/** Read all batches into a table */
readAll(): Table<T>;
/** Iterator over record batches */
[Symbol.iterator](): Iterator<RecordBatch<T>>;
/** Async iterator over record batches */
[Symbol.asyncIterator](): AsyncIterator<RecordBatch<T>>;
/** Cancel reading (async readers) */
cancel(): void;
/** Close reader and free resources */
return(): IteratorResult<RecordBatch<T>>;
}
/**
* Reader for Arrow file format
*/
class RecordBatchFileReader<T extends TypeMap = any> extends RecordBatchReader<T> {
/** Number of record batches in file */
readonly numRecordBatches: number;
/** Footer metadata */
readonly footer: Footer;
/** Get specific batch by index */
getRecordBatch(index: number): RecordBatch<T>;
/** Read the batch at the given index */
readAt(index: number): RecordBatch<T>;
}
/**
* Reader for Arrow stream format
*/
class RecordBatchStreamReader<T extends TypeMap = any> extends RecordBatchReader<T> {
// Inherits base functionality for streaming reads
}
/**
* Async reader for Arrow files
*/
class AsyncRecordBatchFileReader<T extends TypeMap = any> extends RecordBatchReader<T> {
/** Async read all batches */
readAll(): Promise<Table<T>>;
/** Async get batch by index */
getRecordBatch(index: number): Promise<RecordBatch<T>>;
}
/**
* Async reader for Arrow streams
*/
class AsyncRecordBatchStreamReader<T extends TypeMap = any> extends RecordBatchReader<T> {
/** Async read all batches */
readAll(): Promise<Table<T>>;
}
Usage Examples:
import {
RecordBatchReader,
RecordBatchFileReader,
RecordBatchStreamReader
} from "apache-arrow";
import { readFileSync } from 'fs';
// Reading from different sources (from() takes bytes or streams, not file paths or URLs)
const fileReader = RecordBatchReader.from(readFileSync('/path/to/data.arrow'));
const streamReader = RecordBatchReader.from(streamBuffer);
const urlReader = await RecordBatchReader.from((await fetch('https://example.com/data.arrow')).body);
// Read all data into a table
const table = fileReader.readAll();
// Iterate through batches for large inputs (use a fresh reader; a reader can only be consumed once)
for (const batch of RecordBatchReader.from(readFileSync('/path/to/data.arrow'))) {
  console.log(`Batch with ${batch.numRows} rows`);
  // Process each batch without loading the entire table into memory
}
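// Sketch: batches collected during iteration can be reassembled into a Table without
// re-serializing anything (assumes Table and RecordBatch are imported from "apache-arrow";
// recent versions accept an array of record batches in the Table constructor).
const firstBatches: RecordBatch[] = [];
for (const batch of RecordBatchReader.from(readFileSync('/path/to/data.arrow'))) {
  firstBatches.push(batch);
  if (firstBatches.length >= 2) break; // stop early; later batches are never read
}
const partialTable = new Table(firstBatches);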
// Async reading (a FileHandle source yields an AsyncRecordBatchFileReader)
import { open } from 'fs/promises';
async function readLargeFile() {
  const handle = await open('/large-file.arrow');
  const asyncReader = await RecordBatchReader.from(handle);
  for await (const batch of asyncReader) {
    // Process each batch asynchronously
    await processBatch(batch);
  }
  await asyncReader.return(); // Clean up resources
}
// Random access in files
const fileReader2 = RecordBatchFileReader.from(readFileSync('/path/to/data.arrow'));
const firstBatch = fileReader2.getRecordBatch(0);
const lastBatch = fileReader2.getRecordBatch(fileReader2.numRecordBatches - 1);
Classes for writing Arrow IPC data to various destinations.
/**
* Base class for writing Arrow record batches
*/
abstract class RecordBatchWriter<T extends TypeMap = any> {
/** Schema being written */
readonly schema: Schema<T>;
/** Write an entire table (or record batches) and return the writer */
static writeAll<T extends TypeMap>(
  input: Table<T> | Iterable<RecordBatch<T>>
): RecordBatchWriter<T>;
/** Write an entire table */
writeAll(table: Table<T>): this;
/** Write a single record batch (a Table is also accepted and written batch by batch) */
write(payload: RecordBatch<T> | Table<T>): void;
/** Finalize the output (end-of-stream marker / file footer) */
finish(): this;
/** Close writer and free resources */
close(): void;
/** Collect the written output as bytes (pass true for the synchronous variant) */
toUint8Array(sync?: boolean): Uint8Array | Promise<Uint8Array>;
/** Get current position/statistics */
readonly bytesWritten: number;
}
/**
* Writer for Arrow file format
*/
class RecordBatchFileWriter<T extends TypeMap = any> extends RecordBatchWriter<T> {
/** Write a table in the Arrow file format and return the writer */
static writeAll<T extends TypeMap>(
  input: Table<T> | Iterable<RecordBatch<T>>
): RecordBatchFileWriter<T>;
}
/**
* Writer for Arrow stream format
*/
class RecordBatchStreamWriter<T extends TypeMap = any> extends RecordBatchWriter<T> {
/** Write a table in the Arrow stream format and return the writer */
static writeAll<T extends TypeMap>(
  input: Table<T> | Iterable<RecordBatch<T>>
): RecordBatchStreamWriter<T>;
}
/**
* Writer for JSON format
*/
class RecordBatchJSONWriter<T extends TypeMap = any> extends RecordBatchWriter<T> {
/** Write a table in the Arrow JSON format and return the writer */
static writeAll<T extends TypeMap>(
  input: Table<T> | Iterable<RecordBatch<T>>
): RecordBatchJSONWriter<T>;
}
// Type definitions for output destinations
type WritableSink =
| NodeJS.WriteStream
| WritableStream
| string
| Uint8Array[];
Usage Examples:
import {
RecordBatchFileWriter,
RecordBatchStreamWriter,
RecordBatchJSONWriter,
tableFromArrays
} from "apache-arrow";
const table = tableFromArrays({
id: [1, 2, 3, 4, 5],
name: ['A', 'B', 'C', 'D', 'E'],
value: [10.1, 20.2, 30.3, 40.4, 50.5]
});
// Write to Arrow file format
const arrowFileBytes = await RecordBatchFileWriter.writeAll(table).toUint8Array();
// Write to Arrow stream format
const arrowStreamBytes = await RecordBatchStreamWriter.writeAll(table).toUint8Array();
// Write to JSON format
const jsonBytes = await RecordBatchJSONWriter.writeAll(table).toUint8Array();
const jsonString = new TextDecoder().decode(jsonBytes);
// Manual writing for more control
const writer = new RecordBatchFileWriter();
// Write the table's record batches individually
for (const batch of table.batches) {
  writer.write(batch);
}
writer.finish(); // finalizes the file footer (and, by default, closes the writer)
const manualBytes = await writer.toUint8Array();
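// Sketch: in the browser, the same writer output can be consumed as a WHATWG
// ReadableStream via toDOMStream(); where those bytes go (upload, file sink, etc.)
// is up to the application.
const byteStream = RecordBatchFileWriter.writeAll(table).toDOMStream();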
// Node.js file writing
import { createWriteStream } from 'fs';
const fileStream = createWriteStream('output.arrow');
RecordBatchFileWriter.writeAll(table).pipe(fileStream);
Low-level message processing for Arrow IPC format.
/**
* Reader for Arrow IPC messages
*/
class MessageReader {
/** Read messages from byte source */
static from(source: ByteStream): MessageReader;
/** Iterator over messages */
[Symbol.iterator](): Iterator<Message>;
/** Read next message */
readMessage(): Message | null;
}
/**
* Async reader for Arrow IPC messages
*/
class AsyncMessageReader {
/** Read messages from async byte source */
static from(source: AsyncByteStream): AsyncMessageReader;
/** Async iterator over messages */
[Symbol.asyncIterator](): AsyncIterator<Message>;
/** Read next message asynchronously */
readMessage(): Promise<Message | null>;
/** Cancel reading */
cancel(): void;
}
/**
* Reader for JSON messages
*/
class JSONMessageReader {
/** Read JSON messages */
static from(source: ArrowJSONLike): JSONMessageReader;
/** Iterator over messages */
[Symbol.iterator](): Iterator<Message>;
}
/**
* Arrow IPC message
*/
class Message {
/** Message header type */
readonly headerType: MessageHeader;
/** Message body length */
readonly bodyLength: number;
/** Create record batch from message */
createRecordBatch(header: RecordBatchHeader): RecordBatch;
/** Create dictionary batch from message */
createDictionaryBatch(header: DictionaryBatchHeader): RecordBatch;
}
Low-level byte stream abstractions for I/O operations.
/**
* Synchronous byte stream
*/
class ByteStream {
/** Current position in stream */
position: number;
/** Read bytes from current position */
read(length?: number): Uint8Array | null;
/** Peek at bytes without advancing position */
peek(length?: number): Uint8Array | null;
/** Seek to specific position */
seek(position: number): boolean;
/** Reset to beginning */
reset(): void;
}
/**
* Asynchronous byte stream
*/
class AsyncByteStream {
/** Current position in stream */
position: number;
/** Read bytes asynchronously */
read(length?: number): Promise<Uint8Array | null>;
/** Peek at bytes asynchronously */
peek(length?: number): Promise<Uint8Array | null>;
/** Cancel reading operations */
cancel(): void;
}
/**
* Queue for asynchronous byte processing
*/
class AsyncByteQueue {
/** Write bytes to queue */
write(chunk: Uint8Array): void;
/** Close the queue */
close(): void;
/** Handle errors */
error(error: Error): void;
/** Create readable stream from queue */
toAsyncIterable(): AsyncIterable<Uint8Array>;
}
Working with file system sources (Node.js).
/**
* File handle interface for random access
*/
interface FileHandle {
/** Read from file at position */
read(position: number, length: number): Promise<Uint8Array>;
/** Get file size */
size(): Promise<number>;
/** Close file handle */
close(): Promise<void>;
}
/**
* Create reader from file path
*/
function readFileArrow(path: string): Promise<Table>;
/**
* Write table to file path
*/
function writeFileArrow(path: string, table: Table): Promise<void>;
Usage Examples:
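A minimal random-access sketch, assuming RecordBatchReader.from accepts a Node fs.promises file handle (it satisfies the FileHandle interface above); the helper name and path are illustrative.
import { open } from 'fs/promises';
import { RecordBatchReader } from "apache-arrow";
async function peekFirstBatch(path: string) {
  const handle = await open(path);
  try {
    const reader = await RecordBatchReader.from(handle); // async file reader
    for await (const batch of reader) {
      return batch; // only the first batch is materialized
    }
  } finally {
    await handle.close();
  }
}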
// Node.js file operations
import { readFileSync, writeFileSync } from 'fs';
import { RecordBatchReader, RecordBatchFileWriter, Table } from "apache-arrow";
// Read Arrow file
async function readArrowFile(path: string) {
const buffer = readFileSync(path);
const reader = RecordBatchReader.from(buffer);
return reader.readAll();
}
// Write Arrow file
async function writeArrowFile(path: string, table: Table) {
const buffer = await RecordBatchFileWriter.writeAll(table).toUint8Array();
writeFileSync(path, buffer);
}
// Streaming file read for large files
import { createReadStream } from 'fs';
async function streamArrowFile(path: string) {
const stream = createReadStream(path);
const reader = await RecordBatchReader.from(stream);
for await (const batch of reader) {
// Process each batch without loading entire file
console.log(`Processing batch with ${batch.numRows} rows`);
}
}
Reading Arrow data from network sources.
/**
* Fetch Arrow data from URL
*/
async function fetchArrowTable(url: string): Promise<Table> {
const response = await fetch(url);
const buffer = await response.arrayBuffer();
return tableFromIPC(buffer);
}
/**
* Stream Arrow data from URL
*/
async function* streamArrowFromURL(url: string): AsyncGenerator<RecordBatch> {
const response = await fetch(url);
const reader = await RecordBatchReader.from(response.body);
for await (const batch of reader) {
yield batch;
}
}
Usage Examples:
// Fetch Arrow data from HTTP endpoint
const table = await fetchArrowTable('https://api.example.com/data.arrow');
// Stream large datasets
for await (const batch of streamArrowFromURL('https://api.example.com/large-dataset.arrow')) {
// Process batch-by-batch to manage memory
await processDataBatch(batch);
}
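// Sketch: a long-running streaming download can be cancelled with the standard
// AbortController fetch option (URL and names below are illustrative).
const controller = new AbortController();
const cancellable = await fetch('https://api.example.com/large-dataset.arrow', {
  signal: controller.signal,
});
// ...iterate batches from cancellable.body as above; call controller.abort() to stop the transfer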
// POST Arrow data to API
async function uploadArrowData(url: string, table: Table) {
const arrowBuffer = tableToIPC(table, 'file');
await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/octet-stream',
},
body: arrowBuffer
});
}
Integration patterns with databases and data warehouses.
/**
* Convert SQL result to Arrow table
*/
function sqlToArrow(rows: any[], schema?: Schema): Table {
// Convert SQL rows to Arrow table
return tableFromJSON(rows);
}
/**
* Convert Arrow table to SQL inserts
*/
function arrowToSQL(table: Table, tableName: string): string[] {
const rows = table.toArray();
return rows.map(row => {
// Escape embedded quotes and emit NULL for missing values
const values = Object.values(row).map(v =>
v == null ? 'NULL' :
typeof v === 'string' ? `'${v.replace(/'/g, "''")}'` : String(v)
);
return `INSERT INTO ${tableName} VALUES (${values.join(', ')})`;
});
}
Converting between Arrow and JSON formats.
/**
* Convert table to JSON array
*/
function tableToJSON(table: Table): any[];
/**
* Convert table from JSON array
*/
function tableFromJSON<T>(data: T[]): Table<InferredTypes<T>>;
/**
* Convert vector to JSON array
*/
function vectorToJSON<T>(vector: Vector<T>): T['TValue'][];
/**
* Convert vector from JSON array
*/
function vectorFromJSON<T>(data: T[], type?: DataType): Vector<DataType>;
/**
* Serialize schema to JSON
*/
function schemaToJSON(schema: Schema): object;
/**
* Deserialize schema from JSON
*/
function schemaFromJSON(json: object): Schema;
Usage Examples:
import {
tableToJSON,
tableFromJSON,
vectorToJSON,
vectorFromJSON,
tableFromArrays
} from "apache-arrow";
const table = tableFromArrays({
id: [1, 2, 3],
name: ['Alice', 'Bob', 'Charlie'],
active: [true, false, true]
});
// Convert to JSON
const jsonArray = tableToJSON(table);
console.log(jsonArray);
// [
// { id: 1, name: 'Alice', active: true },
// { id: 2, name: 'Bob', active: false },
// { id: 3, name: 'Charlie', active: true }
// ]
// Convert back from JSON
const restoredTable = tableFromJSON(jsonArray);
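// Note (a caution, not exact behavior for every version): tableFromJSON infers column
// types from the JavaScript values, so plain numbers typically come back as
// floating-point columns rather than the original integer types — inspect the
// inferred schema if exact types matter.
console.log(restoredTable.schema.fields.map((f) => `${f.name}: ${f.type}`));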
// Vector JSON conversion
const nameVector = table.getChild('name');
const nameArray = vectorToJSON(nameVector); // ['Alice', 'Bob', 'Charlie']
const newNameVector = vectorFromJSON(nameArray);
// Schema JSON serialization
const schemaJson = table.schema.toJSON();
const restoredSchema = Schema.from(schemaJson);
Working with CSV data (requires additional processing).
/**
* Convert CSV to Arrow table (custom implementation)
*/
function csvToArrow(
csvData: string,
options?: {
delimiter?: string;
header?: boolean;
types?: { [column: string]: DataType };
}
): Table;
/**
* Convert Arrow table to CSV
*/
function arrowToCSV(
table: Table,
options?: {
delimiter?: string;
header?: boolean;
}
): string;
Usage Examples:
// CSV to Arrow conversion (custom implementation)
function parseCSVToArrow(csvText: string): Table {
const lines = csvText.trim().split('\n');
const headers = lines[0].split(',');
const rows = lines.slice(1).map(line => {
const values = line.split(',');
const row: any = {};
headers.forEach((header, i) => {
const value = values[i];
row[header] = isNaN(Number(value)) ? value : Number(value);
});
return row;
});
return tableFromJSON(rows);
}
// Arrow to CSV conversion
function arrowToCSV(table: Table): string {
const headers = table.schema.names;
const rows = table.toArray();
const csvLines = [
headers.join(','),
...rows.map(row =>
headers.map(header => row[header]).join(',')
)
];
return csvLines.join('\n');
}
// Usage
const csvData = `name,age,city
Alice,25,New York
Bob,30,San Francisco
Charlie,35,Chicago`;
const table = parseCSVToArrow(csvData);
const backToCsv = arrowToCSV(table);
Best practices for optimal I/O performance.
// Streaming for large files
import { createReadStream } from 'fs';
import { RecordBatchReader } from "apache-arrow";
async function processLargeArrowFile(path: string) {
const reader = await RecordBatchReader.from(createReadStream(path));
// Process in batches to control memory usage
for await (const batch of reader) {
// Process batch immediately
await processBatch(batch);
// Batch goes out of scope and can be garbage collected
}
}
// Parallel processing of multiple files
async function processMultipleFiles(paths: string[]) {
const readers = await Promise.all(
paths.map(path => RecordBatchReader.from(createReadStream(path)))
);
// Process files in parallel
await Promise.all(readers.map(async reader => {
for await (const batch of reader) {
await processBatch(batch);
}
}));
}
// Efficient serialization with compression
// (sketch: Node's built-in zlib shown here; any byte-level compressor, e.g. pako, works the same way)
import { gzipSync } from 'zlib';
function serializeWithCompression(table: Table): Uint8Array {
const arrowBuffer = tableToIPC(table, 'file');
return gzipSync(arrowBuffer);
}
Managing memory efficiently during I/O operations.
// Batch processing to limit memory usage
import { makeBuilder, Int32 } from "apache-arrow";
const BATCH_SIZE = 10000;
async function processContinuousStream(source: AsyncIterable<any>) {
const builder = makeBuilder({ type: new Int32() });
let batchCount = 0;
for await (const item of source) {
builder.append(item.value);
batchCount++;
if (batchCount >= BATCH_SIZE) {
// Process batch and clear memory
const vector = builder.finish().toVector();
await processVector(vector);
builder.clear(); // Free memory
batchCount = 0;
}
}
// Process remaining items
if (batchCount > 0) {
const vector = builder.finish().toVector();
await processVector(vector);
}
}
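// Alternative sketch: Builder.flush() emits the values appended so far as a Data chunk
// and resets the builder for further appends, which can replace the finish()/clear()
// steps above; makeVector (an apache-arrow export) wraps the flushed chunk in a Vector.
import { makeVector } from "apache-arrow";
async function processContinuousStreamWithFlush(source: AsyncIterable<any>) {
  const builder = makeBuilder({ type: new Int32() });
  let pending = 0;
  for await (const item of source) {
    builder.append(item.value);
    if (++pending >= BATCH_SIZE) {
      await processVector(makeVector(builder.flush()));
      pending = 0;
    }
  }
  if (pending > 0) {
    await processVector(makeVector(builder.flush()));
  }
}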
// Memory-efficient table operations
function processLargeTable(table: Table) {
// Instead of table.toArray() which loads everything
// Process row by row
for (let i = 0; i < table.length; i++) {
const row = table.get(i);
processRow(row);
}
// Or process column by column
for (const columnName of table.schema.names) {
const column = table.getChild(columnName);
processColumn(column);
}
}
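// Sketch: a Table's chunks are also directly accessible via table.batches, so
// column-wise work can stay per-record-batch (getChild returns that batch's column
// vector); processColumn is the same placeholder used above.
function processTableByBatch(table: Table) {
  for (const batch of table.batches) {
    for (const field of batch.schema.fields) {
      const column = batch.getChild(field.name);
      if (column) processColumn(column);
    }
  }
}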