I/O Operations

Apache Arrow JavaScript provides comprehensive I/O capabilities for reading and writing Arrow data: the Arrow IPC (Inter-Process Communication) format, JSON serialization, and integration with a variety of data sources and sinks.

Capabilities

Serialization Functions

High-level functions for converting between tables and binary Arrow format.

/**
 * Serialize table to Arrow IPC format
 */
function tableToIPC(
  table: Table, 
  type?: 'stream' | 'file'
): Uint8Array;

/**
 * Deserialize table from Arrow IPC format
 */
function tableFromIPC(buffer: ArrayBufferViewInput): Table;

/**
 * Deserialize table from Arrow IPC format with explicit dictionary vectors
 */
function tableFromIPC(
  buffer: ArrayBufferViewInput,
  dictionaries?: Map<number, Vector>
): Table;

// Type definitions for input sources
type ArrayBufferViewInput = 
  | ArrayBuffer 
  | ArrayBufferView 
  | Uint8Array 
  | Buffer 
  | string;

Usage Examples:

import { tableToIPC, tableFromIPC, tableFromArrays } from "apache-arrow";

// Create a table
const originalTable = tableFromArrays({
  id: [1, 2, 3],
  name: ['Alice', 'Bob', 'Charlie'],
  score: [95.5, 87.2, 92.1]
});

// Serialize to Arrow IPC format
const fileBuffer = tableToIPC(originalTable, 'file');     // Arrow file format
const streamBuffer = tableToIPC(originalTable, 'stream'); // Arrow stream format

// Deserialize from Arrow IPC format
const deserializedTable = tableFromIPC(fileBuffer);

// Verify data integrity
console.log(deserializedTable.numRows);                   // 3
console.log(deserializedTable.getChild('name')?.get(0));  // 'Alice'

// Save to file (Node.js)
import { writeFileSync } from 'fs';
writeFileSync('data.arrow', fileBuffer);

// Read from file (Node.js)
import { readFileSync } from 'fs';
const loadedBuffer = readFileSync('data.arrow');
const loadedTable = tableFromIPC(loadedBuffer);

RecordBatch Readers

Classes for reading Arrow IPC data from various sources.

/**
 * Base class for reading Arrow record batches
 */
abstract class RecordBatchReader<T extends TypeMap = any> {
  /** Schema of the record batches */
  readonly schema: Schema<T>;
  
  /** Dictionary vectors */
  readonly dictionaries: Map<number, Vector>;
  
  /** Create reader from various sources */
  static from<T extends TypeMap>(source: ReadableSource): RecordBatchReader<T>;
  
  /** Read all record batches from a source into a single table */
  static readAll<T extends TypeMap>(source: ReadableSource): Table<T>;
  
  /** Read all batches into a table */
  readAll(): Table<T>;
  
  /** Iterator over record batches */
  [Symbol.iterator](): Iterator<RecordBatch<T>>;
  
  /** Async iterator over record batches */
  [Symbol.asyncIterator](): AsyncIterator<RecordBatch<T>>;
  
  /** Cancel reading (async readers) */
  cancel(): void;
  
  /** Close reader and free resources */
  return(): IteratorResult<RecordBatch<T>>;
}

/**
 * Reader for Arrow file format
 */
class RecordBatchFileReader<T extends TypeMap = any> extends RecordBatchReader<T> {
  /** Number of record batches in file */
  readonly numRecordBatches: number;
  
  /** Footer metadata */
  readonly footer: Footer;
  
  /** Get specific batch by index */
  getRecordBatch(index: number): RecordBatch<T>;
  
  /** Read the record batch at the given index */
  readAt(index: number): RecordBatch<T>;
}

/**
 * Reader for Arrow stream format  
 */
class RecordBatchStreamReader<T extends TypeMap = any> extends RecordBatchReader<T> {
  // Inherits base functionality for streaming reads
}

/**
 * Async reader for Arrow files
 */
class AsyncRecordBatchFileReader<T extends TypeMap = any> extends RecordBatchReader<T> {
  /** Async read all batches */
  readAll(): Promise<Table<T>>;
  
  /** Async get batch by index */
  getRecordBatch(index: number): Promise<RecordBatch<T>>;
}

/**
 * Async reader for Arrow streams
 */
class AsyncRecordBatchStreamReader<T extends TypeMap = any> extends RecordBatchReader<T> {
  /** Async read all batches */
  readAll(): Promise<Table<T>>;
}

Usage Examples:

import { 
  RecordBatchReader, 
  RecordBatchFileReader, 
  RecordBatchStreamReader 
} from "apache-arrow";
import { createReadStream } from 'fs';

// Reading from different sources: in-memory IPC bytes, Node streams, fetch bodies
const bufferReader = RecordBatchReader.from(streamBuffer);
const fileReader = await RecordBatchReader.from(createReadStream('/path/to/data.arrow'));
const urlResponse = await fetch('https://example.com/data.arrow');
const urlReader = await RecordBatchReader.from(urlResponse.body!);

// Read all data into a table
const table = bufferReader.readAll();

// Iterate through batches for large files
for await (const batch of fileReader) {
  console.log(`Batch with ${batch.numRows} rows`);
  // Process each batch without loading the entire table into memory
}

// Async reading
async function readLargeFile() {
  const asyncReader = await RecordBatchReader.from(createReadStream('/large-file.arrow'));
  
  for await (const batch of asyncReader) {
    // Process each batch asynchronously
    await processBatch(batch);
  }
  
  await asyncReader.return(); // Clean up resources
}

// Random access in files
const fileReader2 = RecordBatchFileReader.from(fileBuffer);
const firstBatch = fileReader2.getRecordBatch(0);
const lastBatch = fileReader2.getRecordBatch(fileReader2.numRecordBatches - 1);

RecordBatch Writers

Classes for writing Arrow IPC data to various destinations.

/**
 * Base class for writing Arrow record batches
 */
abstract class RecordBatchWriter<T extends TypeMap = any> {
  /** Schema being written */
  readonly schema: Schema<T>;
  
  /** Write an entire table to a sink and return the writer */
  static writeAll<T extends TypeMap>(
    sink: WritableSink, 
    table: Table<T>
  ): RecordBatchWriter<T>;
  
  /** Write entire table */
  writeAll(table: Table<T>): void;
  
  /** Write single record batch */
  writeRecordBatch(batch: RecordBatch<T>): void;
  
  /** Close writer and finalize output */
  close(): void;
  
  /** Get current position/statistics */
  readonly bytesWritten: number;
}

/**
 * Writer for Arrow file format
 */
class RecordBatchFileWriter<T extends TypeMap = any> extends RecordBatchWriter<T> {
  /** Create file writer */
  static writeAll<T extends TypeMap>(
    sink: WritableSink, 
    table: Table<T>
  ): RecordBatchFileWriter<T>;
}

/**
 * Writer for Arrow stream format
 */
class RecordBatchStreamWriter<T extends TypeMap = any> extends RecordBatchWriter<T> {
  /** Create stream writer */
  static writeAll<T extends TypeMap>(
    sink: WritableSink, 
    table: Table<T>
  ): RecordBatchStreamWriter<T>;
}

/**
 * Writer for JSON format
 */
class RecordBatchJSONWriter<T extends TypeMap = any> extends RecordBatchWriter<T> {
  /** Create JSON writer */
  static writeAll<T extends TypeMap>(
    sink: WritableSink, 
    table: Table<T>
  ): RecordBatchJSONWriter<T>;
}

// Type definitions for output destinations
type WritableSink = 
  | NodeJS.WriteStream 
  | WritableStream 
  | string 
  | Uint8Array[];

Usage Examples:

import { 
  RecordBatchFileWriter, 
  RecordBatchStreamWriter, 
  RecordBatchJSONWriter,
  tableFromArrays 
} from "apache-arrow";

const table = tableFromArrays({
  id: [1, 2, 3, 4, 5],
  name: ['A', 'B', 'C', 'D', 'E'],
  value: [10.1, 20.2, 30.3, 40.4, 50.5]
});

// Helper: concatenate Uint8Array chunks (Array#flat does not flatten typed arrays)
function concatChunks(chunks: Uint8Array[]): Uint8Array {
  const result = new Uint8Array(chunks.reduce((n, c) => n + c.byteLength, 0));
  let offset = 0;
  for (const chunk of chunks) {
    result.set(chunk, offset);
    offset += chunk.byteLength;
  }
  return result;
}

// Write to Arrow file format
const fileOutput: Uint8Array[] = [];
RecordBatchFileWriter.writeAll(fileOutput, table);
const arrowFileBytes = concatChunks(fileOutput);

// Write to Arrow stream format
const streamOutput: Uint8Array[] = [];
RecordBatchStreamWriter.writeAll(streamOutput, table);
const arrowStreamBytes = concatChunks(streamOutput);

// Write to JSON format
const jsonOutput: Uint8Array[] = [];
RecordBatchJSONWriter.writeAll(jsonOutput, table);
const jsonString = new TextDecoder().decode(concatChunks(jsonOutput));

// Manual writing for more control
const writer = new RecordBatchFileWriter(table.schema);

// Write record batches individually (a table is backed by one or more record batches)
for (const batch of table.batches) {
  writer.writeRecordBatch(batch);
}
writer.close();

// Node.js file writing
import { createWriteStream } from 'fs';
const fileStream = createWriteStream('output.arrow');
RecordBatchFileWriter.writeAll(fileStream, table);

Message Handling

Low-level message processing for Arrow IPC format.

/**
 * Reader for Arrow IPC messages
 */
class MessageReader {
  /** Read messages from byte source */
  static from(source: ByteStream): MessageReader;
  
  /** Iterator over messages */
  [Symbol.iterator](): Iterator<Message>;
  
  /** Read next message */
  readMessage(): Message | null;
}

/**
 * Async reader for Arrow IPC messages
 */
class AsyncMessageReader {
  /** Read messages from async byte source */
  static from(source: AsyncByteStream): AsyncMessageReader;
  
  /** Async iterator over messages */
  [Symbol.asyncIterator](): AsyncIterator<Message>;
  
  /** Read next message asynchronously */
  readMessage(): Promise<Message | null>;
  
  /** Cancel reading */
  cancel(): void;
}

/**
 * Reader for JSON messages
 */
class JSONMessageReader {
  /** Read JSON messages */
  static from(source: ArrowJSONLike): JSONMessageReader;
  
  /** Iterator over messages */
  [Symbol.iterator](): Iterator<Message>;
}

/**
 * Arrow IPC message
 */
class Message {
  /** Message header type */
  readonly headerType: MessageHeader;
  
  /** Message body length */
  readonly bodyLength: number;
  
  /** Create record batch from message */
  createRecordBatch(header: RecordBatchHeader): RecordBatch;
  
  /** Create dictionary batch from message */
  createDictionaryBatch(header: DictionaryBatchHeader): RecordBatch;
}
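
A minimal inspection sketch, assuming the MessageReader exported by apache-arrow accepts raw IPC bytes as its source (verify the constructor and from overloads against your installed version):

import { tableFromArrays, tableToIPC, MessageReader } from "apache-arrow";

// Build a small stream-format payload to inspect
const demoTable = tableFromArrays({ x: [1, 2, 3] });
const streamBytes = tableToIPC(demoTable, 'stream');

// Walk the schema, dictionary, and record batch messages in order
const messageReader = new MessageReader(streamBytes);
for (const message of messageReader) {
  // headerType distinguishes Schema, DictionaryBatch, and RecordBatch messages
  console.log('headerType:', message.headerType, 'bodyLength:', message.bodyLength);
}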

Stream Classes

Low-level byte stream abstractions for I/O operations.

/**
 * Synchronous byte stream
 */
class ByteStream {
  /** Current position in stream */
  position: number;
  
  /** Read bytes from current position */
  read(length?: number): Uint8Array | null;
  
  /** Peek at bytes without advancing position */
  peek(length?: number): Uint8Array | null;
  
  /** Seek to specific position */
  seek(position: number): boolean;
  
  /** Reset to beginning */
  reset(): void;
}

/**
 * Asynchronous byte stream
 */
class AsyncByteStream {
  /** Current position in stream */
  position: number;
  
  /** Read bytes asynchronously */
  read(length?: number): Promise<Uint8Array | null>;
  
  /** Peek at bytes asynchronously */
  peek(length?: number): Promise<Uint8Array | null>;
  
  /** Cancel reading operations */
  cancel(): void;
}

/**
 * Queue for asynchronous byte processing
 */
class AsyncByteQueue {
  /** Write bytes to queue */
  write(chunk: Uint8Array): void;
  
  /** Close the queue */
  close(): void;
  
  /** Handle errors */
  error(error: Error): void;
  
  /** Create readable stream from queue */
  toAsyncIterable(): AsyncIterable<Uint8Array>;
}
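
AsyncByteQueue can bridge a push-based producer (a socket, an event callback) and the pull-based async readers. A minimal sketch following the surface listed above; the chunking is artificial and the toAsyncIterable method name is taken from this reference, so check it against your installed version:

import { AsyncByteQueue, RecordBatchReader, tableFromArrays, tableToIPC } from "apache-arrow";

async function queueExample() {
  const queue = new AsyncByteQueue();

  // Producer side: push IPC stream bytes into the queue as they arrive, then close it
  const bytes = tableToIPC(tableFromArrays({ x: [1, 2, 3] }), 'stream');
  queue.write(bytes.subarray(0, 64));  // first chunk
  queue.write(bytes.subarray(64));     // remaining bytes
  queue.close();

  // Consumer side: feed the queued chunks to an async record batch reader
  const reader = await RecordBatchReader.from(queue.toAsyncIterable());
  for await (const batch of reader) {
    console.log(`Received batch with ${batch.numRows} rows`);
  }
}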

Data Source Integration

File System Integration

Working with file system sources (Node.js).

/**
 * File handle interface for random access
 */
interface FileHandle {
  /** Read from file at position */
  read(position: number, length: number): Promise<Uint8Array>;
  
  /** Get file size */
  size(): Promise<number>;
  
  /** Close file handle */
  close(): Promise<void>;
}

/**
 * Read an Arrow file from a path into a table
 */
function readFileArrow(path: string): Promise<Table>;

/**
 * Write table to file path
 */
function writeFileArrow(path: string, table: Table): Promise<void>;

Usage Examples:

// Node.js file operations
import { readFileSync, writeFileSync } from 'fs';
import { RecordBatchReader, RecordBatchFileWriter, Table } from "apache-arrow";

// Read Arrow file
async function readArrowFile(path: string) {
  const buffer = readFileSync(path);
  const reader = RecordBatchReader.from(buffer);
  return reader.readAll();
}

// Write Arrow file
async function writeArrowFile(path: string, table: Table) {
  const chunks: Uint8Array[] = [];
  RecordBatchFileWriter.writeAll(chunks, table);
  // Concatenate the chunks before writing (Array#flat does not flatten typed arrays)
  const buffer = new Uint8Array(chunks.reduce((n, c) => n + c.byteLength, 0));
  let offset = 0;
  for (const chunk of chunks) { buffer.set(chunk, offset); offset += chunk.byteLength; }
  writeFileSync(path, buffer);
}

// Streaming file read for large files
import { createReadStream } from 'fs';

async function streamArrowFile(path: string) {
  const stream = createReadStream(path);
  const reader = await RecordBatchReader.from(stream);
  
  for await (const batch of reader) {
    // Process each batch without loading entire file
    console.log(`Processing batch with ${batch.numRows} rows`);
  }
}
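
The FileHandle interface shown earlier can be backed by Node's fs/promises API. A sketch of such an adapter (an illustrative helper, not something exported by apache-arrow):

import { open } from 'fs/promises';

// Adapter exposing the FileHandle shape described above over a Node file handle
async function openArrowFileHandle(path: string) {
  const handle = await open(path, 'r');
  return {
    async read(position: number, length: number): Promise<Uint8Array> {
      const buffer = new Uint8Array(length);
      const { bytesRead } = await handle.read(buffer, 0, length, position);
      return buffer.subarray(0, bytesRead);
    },
    async size(): Promise<number> {
      return (await handle.stat()).size;
    },
    async close(): Promise<void> {
      await handle.close();
    },
  };
}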

Network Integration

Reading Arrow data from network sources.

/**
 * Fetch Arrow data from URL
 */
async function fetchArrowTable(url: string): Promise<Table> {
  const response = await fetch(url);
  const buffer = await response.arrayBuffer();
  return tableFromIPC(buffer);
}

/**
 * Stream Arrow data from URL
 */
async function* streamArrowFromURL(url: string): AsyncGenerator<RecordBatch> {
  const response = await fetch(url);
  const reader = await RecordBatchReader.from(response.body!);
  
  for await (const batch of reader) {
    yield batch;
  }
}

Usage Examples:

// Fetch Arrow data from HTTP endpoint
const table = await fetchArrowTable('https://api.example.com/data.arrow');

// Stream large datasets
for await (const batch of streamArrowFromURL('https://api.example.com/large-dataset.arrow')) {
  // Process batch-by-batch to manage memory
  await processDataBatch(batch);
}

// POST Arrow data to API
async function uploadArrowData(url: string, table: Table) {
  const arrowBuffer = tableToIPC(table, 'file');
  
  await fetch(url, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/octet-stream',
    },
    body: arrowBuffer
  });
}

Database Integration

Integration patterns with databases and data warehouses.

/**
 * Convert SQL result to Arrow table
 */
function sqlToArrow(rows: any[], schema?: Schema): Table {
  // Convert SQL rows to Arrow table
  return tableFromJSON(rows);
}

/**
 * Convert Arrow table to SQL INSERT statements
 * (naive quoting for illustration only; prefer parameterized queries in practice)
 */
function arrowToSQL(table: Table, tableName: string): string[] {
  const rows = table.toArray();
  return rows.map(row => {
    const values = Object.values(row).map(v =>
      typeof v === 'string' ? `'${v.replace(/'/g, "''")}'` : v
    );
    return `INSERT INTO ${tableName} VALUES (${values.join(', ')})`;
  });
}
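
Usage sketch: the db client and its query/execute methods below are hypothetical placeholders; any driver that returns plain row objects fits the same pattern:

import { tableFromJSON, tableToIPC } from "apache-arrow";

// Hypothetical driver call returning an array of plain row objects
const rows = await db.query('SELECT id, name, score FROM users');

// Rows -> Arrow table (column types inferred from the JSON values)
const usersTable = tableFromJSON(rows);

// Arrow table -> INSERT statements for copying the data (see arrowToSQL above)
for (const statement of arrowToSQL(usersTable, 'users_copy')) {
  await db.execute(statement);  // hypothetical execute method
}

// The same table can also be shipped elsewhere as IPC bytes
const ipcBytes = tableToIPC(usersTable, 'file');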

Format Conversions

JSON Serialization

Converting between Arrow and JSON formats.

/**
 * Convert table to JSON array
 */
function tableToJSON(table: Table): any[];

/**
 * Convert table from JSON array
 */
function tableFromJSON<T>(data: T[]): Table<InferredTypes<T>>;

/**
 * Convert vector to JSON array
 */
function vectorToJSON<T>(vector: Vector<T>): T['TValue'][];

/**
 * Convert vector from JSON array
 */
function vectorFromJSON<T>(data: T[], type?: DataType): Vector<DataType>;

/**
 * Serialize schema to JSON
 */
function schemaToJSON(schema: Schema): object;

/**
 * Deserialize schema from JSON
 */
function schemaFromJSON(json: object): Schema;

Usage Examples:

import { 
  tableToJSON, 
  tableFromJSON, 
  vectorToJSON, 
  vectorFromJSON,
  schemaToJSON,
  schemaFromJSON,
  tableFromArrays 
} from "apache-arrow";

const table = tableFromArrays({
  id: [1, 2, 3],
  name: ['Alice', 'Bob', 'Charlie'],
  active: [true, false, true]
});

// Convert to JSON
const jsonArray = tableToJSON(table);
console.log(jsonArray);
// [
//   { id: 1, name: 'Alice', active: true },
//   { id: 2, name: 'Bob', active: false },
//   { id: 3, name: 'Charlie', active: true }
// ]

// Convert back from JSON
const restoredTable = tableFromJSON(jsonArray);

// Vector JSON conversion
const nameVector = table.getChild('name')!;
const nameArray = vectorToJSON(nameVector); // ['Alice', 'Bob', 'Charlie']
const newNameVector = vectorFromJSON(nameArray);

// Schema JSON serialization
const schemaJson = schemaToJSON(table.schema);
const restoredSchema = schemaFromJSON(schemaJson);

CSV Integration

Working with CSV data (requires additional processing).

/**
 * Convert CSV to Arrow table (custom implementation)
 */
function csvToArrow(
  csvData: string, 
  options?: {
    delimiter?: string;
    header?: boolean;
    types?: { [column: string]: DataType };
  }
): Table;

/**
 * Convert Arrow table to CSV
 */
function arrowToCSV(
  table: Table, 
  options?: {
    delimiter?: string;
    header?: boolean;
  }
): string;

Usage Examples:

import { tableFromJSON, Table } from "apache-arrow";

// CSV to Arrow conversion (custom implementation)
function parseCSVToArrow(csvText: string): Table {
  const lines = csvText.trim().split('\n');
  const headers = lines[0].split(',');
  const rows = lines.slice(1).map(line => {
    const values = line.split(',');
    const row: any = {};
    headers.forEach((header, i) => {
      const value = values[i];
      row[header] = isNaN(Number(value)) ? value : Number(value);
    });
    return row;
  });
  
  return tableFromJSON(rows);
}

// Arrow to CSV conversion
function arrowToCSV(table: Table): string {
  const headers = table.schema.names;
  const rows = table.toArray();
  
  const csvLines = [
    headers.join(','),
    ...rows.map(row => 
      headers.map(header => row[header]).join(',')
    )
  ];
  
  return csvLines.join('\n');
}

// Usage
const csvData = `name,age,city
Alice,25,New York
Bob,30,San Francisco
Charlie,35,Chicago`;

const table = parseCSVToArrow(csvData);
const backToCsv = arrowToCSV(table);

Performance Optimization

Efficient I/O Patterns

Best practices for optimal I/O performance.

// Streaming for large files
import { createReadStream } from 'fs';
import { RecordBatchReader, tableToIPC, Table } from "apache-arrow";

async function processLargeArrowFile(path: string) {
  const reader = await RecordBatchReader.from(createReadStream(path));
  
  // Process in batches to control memory usage
  for await (const batch of reader) {
    // Process batch immediately
    await processBatch(batch);
    // Batch goes out of scope and can be garbage collected
  }
}

// Parallel processing of multiple files
async function processMultipleFiles(paths: string[]) {
  const readers = await Promise.all(
    paths.map(path => RecordBatchReader.from(createReadStream(path)))
  );
  
  // Process files in parallel
  await Promise.all(readers.map(async reader => {
    for await (const batch of reader) {
      await processBatch(batch);
    }
  }));
}

// Efficient serialization with compression
function serializeWithCompression(table: Table): Uint8Array {
  const arrowBuffer = tableToIPC(table, 'file');
  
  // Apply compression (e.g., using pako for gzip)
  // const compressed = pako.gzip(arrowBuffer);
  // return compressed;
  
  return arrowBuffer;
}
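
In Node.js, the commented-out compression step above can be filled in with the built-in zlib module; this sketch gzips the whole IPC buffer externally (distinct from Arrow's own IPC body-compression codecs):

import { gzipSync, gunzipSync } from 'zlib';
import { tableToIPC, tableFromIPC, Table } from "apache-arrow";

// Compress the Arrow file-format bytes with gzip
function serializeGzipped(table: Table): Uint8Array {
  return gzipSync(tableToIPC(table, 'file'));
}

// Decompress and rebuild the table
function deserializeGzipped(compressed: Uint8Array): Table {
  return tableFromIPC(gunzipSync(compressed));
}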

Memory Management

Managing memory efficiently during I/O operations.

import { makeBuilder, Int32 } from "apache-arrow";

// Batch processing to limit memory usage
const BATCH_SIZE = 10000;

async function processContinuousStream(source: AsyncIterable<any>) {
  const builder = makeBuilder({ type: new Int32() });
  let batchCount = 0;
  
  for await (const item of source) {
    builder.append(item.value);
    batchCount++;
    
    if (batchCount >= BATCH_SIZE) {
      // Process batch and clear memory
      const vector = builder.finish().toVector();
      await processVector(vector);
      
      builder.clear(); // Free memory
      batchCount = 0;
    }
  }
  
  // Process remaining items
  if (batchCount > 0) {
    const vector = builder.finish().toVector();
    await processVector(vector);
  }
}

// Memory-efficient table operations
function processLargeTable(table: Table) {
  // Instead of table.toArray() which loads everything
  // Process row by row
  for (let i = 0; i < table.numRows; i++) {
    const row = table.get(i);
    processRow(row);
  }
  
  // Or process column by column
  for (const columnName of table.schema.names) {
    const column = table.getChild(columnName);
    processColumn(column);
  }
}