tessl/npm-vega

A declarative visualization grammar for creating interactive data visualizations through JSON specifications.

Dataflow System

Vega's reactive dataflow system provides incremental data processing with efficient change tracking, transform operations, and event-driven updates. The system enables complex data transformations while maintaining high performance through minimal recomputation.

Capabilities

Dataflow Engine

The core dataflow engine manages operators, data flow, and incremental updates.

/**
 * Core dataflow engine for reactive data processing
 */
class Dataflow {
  constructor();
  
  /**
   * Add an operator to the dataflow
   * @param operator - Operator to add
   * @returns The dataflow instance
   */
  add(operator: Operator): Dataflow;
  
  /**
   * Connect two operators in the dataflow graph
   * @param sourceOp - Source operator
   * @param targetOp - Target operator  
   * @returns The dataflow instance
   */
  connect(sourceOp: Operator, targetOp: Operator): Dataflow;
  
  /**
   * Run the dataflow synchronously
   * @returns The dataflow instance
   */
  run(): Dataflow;
  
  /**
   * Run the dataflow asynchronously
   * @returns Promise resolving to the dataflow instance
   */
  runAsync(): Promise<Dataflow>;
  
  /**
   * Update an operator with new parameters
   * @param operator - Operator to update
   * @param parameters - New parameter values
   * @returns The dataflow instance
   */
  update(operator: Operator, parameters: any): Dataflow;
  
  /**
   * Touch an operator to mark it for re-evaluation
   * @param operator - Operator to mark as dirty
   * @returns The dataflow instance
   */
  touch(operator: Operator): Dataflow;
  
  /**
   * Clean up the dataflow by removing all operators
   * @returns The dataflow instance
   */
  cleanUp(): Dataflow;
}
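To make the incremental evaluation model behind update(), touch(), and run() concrete, here is a minimal plain-JavaScript sketch. MiniOp and MiniFlow are hypothetical stand-ins, not vega's implementation: the point is only that marking an operator dirty propagates downstream, and run() re-evaluates dirty operators alone.

```javascript
// Minimal sketch (NOT vega's implementation) of dirty-marking:
// touch() marks an operator and everything downstream of it,
// and run() re-evaluates only the marked operators.
class MiniOp {
  constructor(fn) {
    this.fn = fn;          // update function producing the value
    this.value = undefined;
    this.dirty = true;     // needs evaluation on next run
    this.targets = [];     // downstream operators
  }
}

class MiniFlow {
  constructor() { this.ops = []; this.evals = 0; }
  add(op) { this.ops.push(op); return op; }
  connect(src, tgt) { src.targets.push(tgt); return this; }
  touch(op) {
    op.dirty = true;
    op.targets.forEach(t => this.touch(t));
    return this;
  }
  run() {
    for (const op of this.ops) {  // ops assumed in topological order
      if (op.dirty) { op.value = op.fn(); this.evals++; op.dirty = false; }
    }
    return this;
  }
}

const flow = new MiniFlow();
let base = 1;
const source = flow.add(new MiniOp(() => base));
const doubled = flow.add(new MiniOp(() => source.value * 2));
flow.connect(source, doubled).run();
console.log(doubled.value); // 2

base = 5;
flow.touch(source).run();   // only source and its downstream re-run
console.log(doubled.value); // 10
```

A clean operator is skipped entirely on run(), which is the "minimal recomputation" property the real dataflow engine provides at scale.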

Pulse System

Pulses represent data changes flowing through the dataflow graph.

/**
 * Represents a data change pulse in the dataflow
 */
class Pulse {
  /**
   * Create a new pulse
   * @param dataflow - The parent dataflow instance
   * @param stamp - Optional timestamp for the pulse
   */
  constructor(dataflow: Dataflow, stamp?: number);
  
  /** Newly added data tuples */
  add: any[];
  
  /** Removed data tuples */
  rem: any[];
  
  /** Modified data tuples */
  mod: any[];
  
  /** Source data array */
  source: any[];
  
  /**
   * Create a derived pulse with the same timestamp
   * @returns New pulse instance
   */
  fork(): Pulse;
  
  /**
   * Create a derived pulse with new timestamp
   * @returns New pulse instance  
   */
  clone(): Pulse;
  
  /**
   * Check if pulse contains any changes
   * @returns True if pulse has changes
   */
  changed(): boolean;
  
  /**
   * Get all data tuples (source + add - rem)
   * @returns Array of all current tuples
   */
  materialize(): any[];
  
  /**
   * Visit all tuples with optional filtering
   * @param source - Visit source tuples
   * @param add - Visit added tuples
   * @param rem - Visit removed tuples  
   * @param mod - Visit modified tuples
   * @returns Array of visited tuples
   */
  visit(source?: number, add?: number, rem?: number, mod?: number): any[];
}
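The add/rem/mod bookkeeping and the materialize() semantics above can be illustrated with a tiny plain-object sketch. The materialize helper here is hypothetical; vega's Pulse class implements the same "source + add - rem" resolution internally.

```javascript
// Illustrative sketch of pulse semantics (not vega's Pulse class):
// the current tuple set is the source, minus removals, plus additions.
function materialize(pulse) {
  const removed = new Set(pulse.rem);
  return pulse.source.filter(t => !removed.has(t)).concat(pulse.add);
}

const a = {id: 1}, b = {id: 2}, c = {id: 3};
const pulse = { source: [a, b], add: [c], rem: [a], mod: [] };

// changed() is true when any of the change arrays is non-empty
const changed = pulse.add.length > 0 || pulse.rem.length > 0 || pulse.mod.length > 0;

console.log(changed);            // true
console.log(materialize(pulse)); // [{id: 2}, {id: 3}]
```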

/**
 * Multi-pulse container for multiple simultaneous changes
 */
class MultiPulse {
  constructor(dataflow: Dataflow, stamp: number, pulses: Pulse[]);
  
  /** Array of contained pulses */
  pulses: Pulse[];
  
  /**
   * Visit all tuples across all contained pulses
   * @param source - Visit source tuples
   * @param add - Visit added tuples
   * @param rem - Visit removed tuples
   * @param mod - Visit modified tuples
   * @returns Array of visited tuples
   */
  visit(source?: number, add?: number, rem?: number, mod?: number): any[];
}

Operators

Base operator classes for dataflow computation nodes.

/**
 * Base operator class for dataflow nodes
 */
class Operator {
  /**
   * Create a new operator
   * @param init - Initial value
   * @param update - Update function
   * @param params - Parameter object
   * @param react - Reactive update flag
   */
  constructor(init?: any, update?: Function, params?: any, react?: boolean);
  
  /** Operator unique identifier */
  id: number;
  
  /** Current operator value */
  value: any;
  
  /** Operator parameters */
  params: any;
  
  /** Pulse timestamp of last update */
  stamp: number;
  
  /** Reactive update flag */
  react: boolean;
  
  /**
   * Set operator parameters
   * @param params - Parameter object
   * @returns The operator instance
   */
  parameters(params: any): Operator;
  
  /**
   * Evaluate the operator with input pulse
   * @param pulse - Input pulse
   * @returns Output pulse or value
   */
  evaluate(pulse: Pulse): any;
  
  /**
   * Check if operator is modified
   * @returns True if operator has been modified
   */
  modified(): boolean;
  
  /**
   * Run the operator's update function
   * @param pulse - Input pulse
   * @returns Output value
   */
  run(pulse: Pulse): any;
}

/**
 * Base transform operator class
 */
class Transform extends Operator {
  constructor(init?: any, params?: any);
  
  /**
   * Transform method to be implemented by subclasses
   * @param params - Transform parameters
   * @param pulse - Input pulse
   * @returns Output pulse
   */
  transform(params: any, pulse: Pulse): Pulse;
}

Parameters

Parameter management for operators.

/**
 * Parameter container for operators
 */
class Parameters {
  constructor(operator: Operator, params?: any, initOnly?: boolean);
  
  /** Reference to the parent operator */
  operator: Operator;
  
  /**
   * Set parameter values
   * @param params - Parameter object
   * @returns The parameters instance
   */
  set(params: any): Parameters;
  
  /**
   * Evaluate parameter expressions
   * @param pulse - Current pulse for context
   * @returns Evaluated parameter values
   */
  evaluate(pulse: Pulse): any;
}

Event Streams

Event stream management for reactive updates.

/**
 * Event stream for reactive dataflow updates
 */
class EventStream {
  /**
   * Create a new event stream
   * @param filter - Optional event filter function
   */
  constructor(filter?: Function);
  
  /** Event filter function */
  filter: Function;
  
  /** Target operators to update */
  targets: Operator[];
  
  /**
   * Add target operator
   * @param operator - Operator to add as target
   * @returns The event stream instance
   */
  target(operator: Operator): EventStream;
  
  /**
   * Remove target operator
   * @param operator - Operator to remove
   * @returns The event stream instance
   */
  detarget(operator: Operator): EventStream;
  
  /**
   * Evaluate the stream with an event
   * @param event - Input event
   * @returns Stream evaluation result
   */
  evaluate(event: any): any;
}
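The filter behavior can be sketched in plain JavaScript (makeStream is a hypothetical stand-in, not the vega EventStream class): a stream forwards an event to its targets only when the filter function accepts it.

```javascript
// Sketch of event filtering (plain JS, not vega's EventStream):
// evaluate() drops events rejected by the filter before
// notifying any targets.
function makeStream(filter) {
  const targets = [];
  return {
    target(t) { targets.push(t); return this; },
    evaluate(event) {
      if (filter && !filter(event)) return;  // event filtered out
      targets.forEach(t => t(event));
    }
  };
}

const received = [];
const clicks = makeStream(e => e.type === 'click');
clicks.target(e => received.push(e));

clicks.evaluate({type: 'click', x: 10});
clicks.evaluate({type: 'mousemove', x: 20}); // rejected by the filter
console.log(received.length); // 1
```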

Data Management Functions

Core functions for data manipulation and change tracking.

/**
 * Create a new changeset for incremental data updates
 * @returns New changeset instance
 */
function changeset(): Changeset;

/**
 * Ingest a data tuple into the dataflow system
 * @param datum - Data tuple to ingest
 * @returns Ingested tuple with system metadata
 */
function ingest(datum: any): any;

/**
 * Check if an object is a data tuple
 * @param obj - Object to test
 * @returns True if object is a tuple
 */
function isTuple(obj: any): boolean;

/**
 * Get or set tuple identifier
 * @param tuple - Data tuple
 * @param id - Optional ID to set
 * @returns Tuple ID
 */
function tupleid(tuple: any, id?: any): any;
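The idea behind ingest(), isTuple(), and tupleid() can be sketched as tagging each datum with a unique identifier. The `_id` field and `mini*` helpers below are hypothetical; vega tracks tuple ids on an internal field, but the mechanics are the same.

```javascript
// Sketch of tuple ingestion (hypothetical _id field; vega uses
// its own internal tuple-id field): ingesting a datum stamps it
// with a monotonically increasing identifier.
let nextId = 1;
function miniIngest(datum) { return { ...datum, _id: nextId++ }; }
function miniIsTuple(obj) { return obj != null && typeof obj._id === 'number'; }
function miniTupleid(t) { return t._id; }

const t1 = miniIngest({name: 'Alice'});
const t2 = miniIngest({name: 'Bob'});

console.log(miniIsTuple(t1));          // true
console.log(miniTupleid(t2));          // 2
console.log(miniIsTuple({name: 'x'})); // false (never ingested)
```

These ids are what let changesets match remove and modify requests against tuples already flowing through the graph.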

interface Changeset {
  /**
   * Insert new tuples
   * @param tuples - Array of tuples to insert
   * @returns The changeset instance
   */
  insert(tuples: any[]): Changeset;
  
  /**
   * Remove existing tuples
   * @param tuples - Array of tuples to remove
   * @returns The changeset instance
   */
  remove(tuples: any[]): Changeset;
  
  /**
   * Modify existing tuples
   * @param tuples - Array of tuples to modify
   * @param field - Field name to modify (optional)
   * @param value - New value (optional)
   * @returns The changeset instance
   */
  modify(tuples: any[], field?: string, value?: any): Changeset;
  
  /**
   * Reinsert tuples (remove then add)
   * @param tuples - Array of tuples to reinsert
   * @returns The changeset instance
   */
  reinsert(tuples: any[]): Changeset;
  
  /**
   * Clean the changeset by removing empty change arrays
   * @returns The changeset instance
   */
  clean(): Changeset;
}

Transform Registry

Transform definition and registration system.

/**
 * Get a transform definition by name
 * @param name - Transform name
 * @returns Transform definition object
 */
function definition(name: string): TransformDefinition;

/**
 * Create a new transform operator
 * @param name - Transform name
 * @param params - Transform parameters
 * @returns Transform operator instance
 */
function transform(name: string, params?: any): Transform;

/** Registry of all available transforms (name → constructor) */
const transforms: { [name: string]: new (params?: any) => Transform };

interface TransformDefinition {
  /** Transform name */
  type: string;
  
  /** Parameter metadata */
  metadata: any;
  
  /** Transform constructor */
  transform: new (params?: any) => Transform;
}
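The registry pattern above can be sketched in a few lines. The Uppercase transform and makeTransform helper are hypothetical; vega's transform() resolves a name against its transforms registry and invokes the registered constructor in the same way.

```javascript
// Sketch of a name-keyed transform registry (hypothetical
// Uppercase transform; illustrates lookup + construction only).
const registry = {};

class Uppercase {
  constructor(params) { this.params = params; }
  transform(data) {
    return data.map(d => ({ ...d, [this.params.as]: d.text.toUpperCase() }));
  }
}
registry['uppercase'] = Uppercase;

function makeTransform(name, params) {
  const Ctor = registry[name];
  if (!Ctor) throw new Error('unrecognized transform: ' + name);
  return new Ctor(params);
}

const op = makeTransform('uppercase', {as: 'upper'});
console.log(op.transform([{text: 'hi'}])); // [{text: 'hi', upper: 'HI'}]
```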

Common Transform Types

The transforms registry includes these built-in transform types:

Data Transforms

  • aggregate - Group and summarize data
  • bin - Create histogram bins
  • collect - Collect and sort data
  • countpattern - Count text pattern matches
  • cross - Cross product of datasets
  • density - Kernel density estimation
  • extent - Calculate data extents
  • facet - Create data facets
  • filter - Filter data tuples
  • flatten - Flatten array fields
  • fold - Convert wide to long format
  • formula - Add calculated fields
  • identifier - Add unique identifiers
  • impute - Fill missing values
  • joinaggregate - Join with aggregated values
  • lookup - Join datasets
  • pivot - Convert long to wide format
  • project - Select/rename fields
  • rank - Rank data values
  • sample - Random sampling
  • sequence - Generate sequences
  • timeunit - Extract time units
  • window - Sliding window calculations

Geo Transforms

  • geojson - Parse GeoJSON
  • geopath - Generate geo paths
  • geopoint - Project geo coordinates
  • geoshape - Create geo shapes
  • graticule - Generate map graticule

Layout Transforms

  • force - Force-directed layout
  • linkpath - Generate link paths
  • pack - Circle packing layout
  • partition - Partition layout
  • pie - Pie/donut layout
  • stack - Stack layout
  • tree - Tree layout

Visual Transforms

  • contour - Contour generation
  • heatmap - Heatmap binning
  • hexbin - Hexagonal binning
  • kde - Kernel density estimation
  • regression - Regression analysis
  • wordcloud - Word cloud layout

Usage Examples

Basic Dataflow Setup

import { Dataflow, Operator, changeset, transforms } from "vega";

const df = new Dataflow();

// Create data source operator
const data = df.add(new Operator([]));

// Create filter transform (dataflow-level expressions are
// functions rather than expression strings)
const filter = df.add(new transforms.filter({
  expr: datum => datum.value > 10
}));

// Connect data to filter
df.connect(data, filter);

// Push data through a changeset, then run
df.pulse(data, changeset().insert([
  {value: 5}, {value: 15}, {value: 8}, {value: 20}
]));
df.run();

console.log(filter.value); // [{value: 15}, {value: 20}]

Using Changesets

import { changeset, ingest } from "vega";

// Create changeset for incremental updates
const cs = changeset()
  .insert([
    ingest({name: 'Alice', age: 25}),
    ingest({name: 'Bob', age: 30})
  ])
  .remove([existingTuple])
  .modify([modifiedTuple], 'age', 31);

// Apply changeset through view
view.change('dataset', cs).run();

Custom Transform

import { Transform, transforms } from "vega";

class CustomTransform extends Transform {
  constructor(params) {
    super(null, params);
  }
  
  transform(params, pulse) {
    const data = pulse.source || [];
    const output = data.map(d => ({
      ...d,
      computed: d.value * params.multiplier
    }));
    
    // fork() derives a pulse with the same timestamp;
    // assign the new source, then return the pulse itself
    const out = pulse.fork();
    out.source = output;
    return out;
  }
}

// Register the constructor, then instantiate it directly
transforms['custom'] = CustomTransform;
const customOp = new CustomTransform({multiplier: 2});

Event-Driven Updates

import { EventStream } from "vega";

const stream = new EventStream();

// Add operator targets
stream.target(filterOperator);
stream.target(aggregateOperator);

// Trigger updates
stream.evaluate({type: 'data-changed', data: newData});

Install with Tessl CLI

npx tessl i tessl/npm-vega
