A declarative visualization grammar for creating interactive data visualizations through JSON specifications.
—
Vega's reactive dataflow system provides incremental data processing with efficient change tracking, transform operations, and event-driven updates. The system enables complex data transformations while maintaining high performance through minimal recomputation.
The core dataflow engine manages operators, data flow, and incremental updates.
/**
 * Reactive dataflow engine: manages operators, the flow of data between
 * them, and incremental updates.
 */
class Dataflow {
  constructor();

  /**
   * Register an operator with this dataflow.
   * NOTE(review): upstream vega-dataflow appears to return the added operator
   * rather than the dataflow — confirm the declared return type.
   * @param operator - Operator to register
   * @returns The dataflow instance
   */
  add(operator: Operator): Dataflow;

  /**
   * Link two operators within the dataflow graph.
   * @param sourceOp - Operator on the source side of the link
   * @param targetOp - Operator on the target side of the link
   * @returns The dataflow instance
   */
  connect(sourceOp: Operator, targetOp: Operator): Dataflow;

  /**
   * Execute the dataflow synchronously.
   * @returns The dataflow instance
   */
  run(): Dataflow;

  /**
   * Execute the dataflow asynchronously.
   * @returns A promise that resolves to the dataflow instance
   */
  runAsync(): Promise<Dataflow>;

  /**
   * Give an operator new parameter values.
   * @param operator - Operator to update
   * @param parameters - The new parameter values
   * @returns The dataflow instance
   */
  update(operator: Operator, parameters: any): Dataflow;

  /**
   * Mark an operator as dirty so that it is re-evaluated.
   * @param operator - Operator to mark
   * @returns The dataflow instance
   */
  touch(operator: Operator): Dataflow;

  /**
   * Clean up the dataflow by removing all of its operators.
   * @returns The dataflow instance
   */
  cleanUp(): Dataflow;
}

Pulses represent data changes flowing through the dataflow graph.
/**
 * A single data-change pulse travelling through the dataflow.
 */
class Pulse {
  /**
   * Construct a pulse.
   * @param dataflow - Dataflow instance that owns the pulse
   * @param stamp - Timestamp for the pulse (optional)
   */
  constructor(dataflow: Dataflow, stamp?: number);

  /** Tuples newly added by this pulse */
  add: any[];

  /** Tuples removed by this pulse */
  rem: any[];

  /** Tuples modified by this pulse */
  mod: any[];

  /** The source data array */
  source: any[];

  /**
   * Derive a new pulse that keeps this pulse's timestamp.
   * @returns The new pulse
   */
  fork(): Pulse;

  /**
   * Derive a new pulse that carries a new timestamp.
   * NOTE(review): upstream vega-dataflow describes clone() as a copy with
   * fresh add/rem/mod arrays — confirm the timestamp behaviour stated here.
   * @returns The new pulse
   */
  clone(): Pulse;

  /**
   * Report whether this pulse carries any changes.
   * @returns True when the pulse has changes
   */
  changed(): boolean;

  /**
   * Produce the full current tuple set (source + add - rem).
   * @returns Array holding every current tuple
   */
  materialize(): any[];

  /**
   * Visit tuples, optionally restricted to particular tuple sets.
   * NOTE(review): upstream vega-dataflow appears to declare this as
   * visit(flags, visitor) with a bit-flag mask and a callback — confirm
   * against the library before relying on this four-argument form.
   * @param source - Visit source tuples
   * @param add - Visit added tuples
   * @param rem - Visit removed tuples
   * @param mod - Visit modified tuples
   * @returns Array of the tuples visited
   */
  visit(source?: number, add?: number, rem?: number, mod?: number): any[];
}
/**
 * Container that groups several pulses representing simultaneous changes.
 */
class MultiPulse {
  constructor(dataflow: Dataflow, stamp: number, pulses: Pulse[]);

  /** The pulses held by this container */
  pulses: Pulse[];

  /**
   * Visit tuples across every contained pulse.
   * NOTE(review): the numeric parameters mirror Pulse.visit as documented
   * here; confirm the real signature against vega-dataflow.
   * @param source - Visit source tuples
   * @param add - Visit added tuples
   * @param rem - Visit removed tuples
   * @param mod - Visit modified tuples
   * @returns Array of the tuples visited
   */
  visit(source?: number, add?: number, rem?: number, mod?: number): any[];
}

Base operator classes for dataflow computation nodes.
/**
 * Base class for the nodes of a dataflow.
 */
class Operator {
  /**
   * Construct an operator.
   * @param init - Initial value
   * @param update - Update function
   * @param params - Parameter object
   * @param react - Reactive update flag
   */
  constructor(init?: any, update?: Function, params?: any, react?: boolean);

  /** Unique identifier of this operator */
  id: number;

  /** The operator's current value */
  value: any;

  /** The operator's parameters */
  params: any;

  /** Timestamp of the pulse that last updated this operator */
  stamp: number;

  /** Reactive update flag */
  react: boolean;

  /**
   * Assign parameters to this operator.
   * @param params - Parameter object
   * @returns The operator instance
   */
  parameters(params: any): Operator;

  /**
   * Evaluate this operator against an input pulse.
   * @param pulse - The input pulse
   * @returns The output pulse or value
   */
  evaluate(pulse: Pulse): any;

  /**
   * Report whether this operator has been modified.
   * @returns True when the operator has been modified
   */
  modified(): boolean;

  /**
   * Invoke this operator's update function.
   * @param pulse - The input pulse
   * @returns The output value
   */
  run(pulse: Pulse): any;
}
/**
 * Base class for transform operators.
 */
class Transform extends Operator {
  constructor(init?: any, params?: any);

  /**
   * Transformation step; subclasses are expected to implement it.
   * @param params - Parameters of the transform
   * @param pulse - The input pulse
   * @returns The output pulse
   */
  transform(params: any, pulse: Pulse): Pulse;
}

Parameter management for operators.
/**
 * Holds the parameters that belong to an operator.
 */
class Parameters {
  constructor(operator: Operator, params?: any, initOnly?: boolean);

  /** The operator these parameters belong to */
  operator: Operator;

  /**
   * Assign parameter values.
   * @param params - Parameter object
   * @returns The parameters instance
   */
  set(params: any): Parameters;

  /**
   * Evaluate the parameter expressions.
   * @param pulse - The current pulse, used as evaluation context
   * @returns The evaluated parameter values
   */
  evaluate(pulse: Pulse): any;
}

Event stream management for reactive updates.
/**
 * Stream of events that drives reactive dataflow updates.
 */
class EventStream {
  /**
   * Construct an event stream.
   * @param filter - Event filter function (optional)
   */
  constructor(filter?: Function);

  /** The event filter function */
  filter: Function;

  /** Operators that this stream updates */
  targets: Operator[];

  /**
   * Register an operator as a target of this stream.
   * @param operator - Operator to add as a target
   * @returns The event stream instance
   */
  target(operator: Operator): EventStream;

  /**
   * Unregister a target operator.
   * @param operator - Operator to remove
   * @returns The event stream instance
   */
  detarget(operator: Operator): EventStream;

  /**
   * Evaluate this stream for a given event.
   * @param event - The input event
   * @returns The result of evaluating the stream
   */
  evaluate(event: any): any;
}

Core functions for data manipulation and change tracking.
/**
 * Start a new changeset describing incremental data updates.
 * @returns A fresh changeset instance
 */
function changeset(): Changeset;
/**
 * Bring a data tuple into the dataflow system.
 * @param datum - The data tuple to ingest
 * @returns The ingested tuple, carrying system metadata
 */
function ingest(datum: any): any;
/**
 * Test whether an object is a data tuple.
 * @param obj - The object to check
 * @returns True when the object is a tuple
 */
function isTuple(obj: any): boolean;
/**
 * Read a tuple's identifier, or assign one when `id` is supplied.
 * @param tuple - The data tuple
 * @param id - ID to set (optional)
 * @returns The tuple ID
 */
function tupleid(tuple: any, id?: any): any;
/**
 * Builder for a set of incremental data changes.
 */
interface Changeset {
  /**
   * Queue new tuples for insertion.
   * @param tuples - Tuples to insert
   * @returns The changeset instance
   */
  insert(tuples: any[]): Changeset;

  /**
   * Queue existing tuples for removal.
   * @param tuples - Tuples to remove
   * @returns The changeset instance
   */
  remove(tuples: any[]): Changeset;

  /**
   * Queue a modification of existing tuples.
   * @param tuples - Tuples to modify
   * @param field - Name of the field to modify (optional)
   * @param value - The new value (optional)
   * @returns The changeset instance
   */
  modify(tuples: any[], field?: string, value?: any): Changeset;

  /**
   * Queue tuples for reinsertion (removal followed by addition).
   * @param tuples - Tuples to reinsert
   * @returns The changeset instance
   */
  reinsert(tuples: any[]): Changeset;

  /**
   * Tidy the changeset by dropping empty change arrays.
   * @returns The changeset instance
   */
  clean(): Changeset;
}

Transform definition and registration system.
/**
 * Look up the definition of a transform by its name.
 * @param name - Name of the transform
 * @returns The transform's definition object
 */
function definition(name: string): TransformDefinition;
/**
 * Instantiate a transform operator by name.
 * @param name - Name of the transform
 * @param params - Parameters for the transform (optional)
 * @returns The transform operator instance
 */
function transform(name: string, params?: any): Transform;
/**
 * Registry holding every available transform, keyed by name.
 * NOTE(review): values are typed as Transform instances here, while
 * TransformDefinition.transform in this document is a constructor and the
 * custom-transform example stores a class in this registry — confirm the
 * intended value type.
 */
const transforms: { [name: string]: Transform };
/**
 * Describes a registered transform.
 */
interface TransformDefinition {
  /** Name of the transform */
  type: string;

  /** Metadata describing the transform's parameters */
  metadata: any;

  /** Constructor for the transform */
  transform: new (params?: any) => Transform;
}

The transforms registry includes these built-in transform types:
import { Dataflow, Operator, transforms } from "vega";
const df = new Dataflow();

// Create the data source operator, then register it. Holding our own
// reference guarantees `data` is the operator whatever add() returns.
const data = new Operator([], null, {}, false);
df.add(data);

// Create the filter transform and register it the same way.
// NOTE(review): `transforms` is documented as a registry object; confirm that
// its entries can be invoked like this.
const filter = transforms.filter({
  expr: 'datum.value > 10'
});
df.add(filter);

// Connect data to filter
df.connect(data, filter);

// Update data and run
// NOTE(review): `pulse` is not among the Dataflow or Operator members listed
// in this document — confirm the intended call.
data.pulse = df.pulse([
  {value: 5}, {value: 15}, {value: 8}, {value: 20}
]);
df.run();

console.log(filter.value); // [{value: 15}, {value: 20}]

import { changeset, ingest } from "vega";
// Build a single changeset that batches two inserts, one removal,
// and one field modification.
const cs = changeset()
  .insert([
    ingest({name: 'Alice', age: 25}),
    ingest({name: 'Bob', age: 30})
  ])
  .remove([existingTuple])
  .modify([modifiedTuple], 'age', 31);

// Apply the changeset to 'dataset' through the view, then run.
view.change('dataset', cs).run();

import { Transform } from "vega";
/**
 * Example transform that adds a `computed` field to every source tuple.
 */
class CustomTransform extends Transform {
  /**
   * @param params - Transform parameters; `multiplier` is read in transform()
   */
  constructor(params) {
    super(null, params);
  }

  /**
   * Copy each source tuple and attach `computed = value * multiplier`.
   * @param params - Transform parameters (uses `params.multiplier`)
   * @param pulse - Input pulse; a missing `source` is treated as empty
   * @returns A forked pulse whose `source` holds the derived tuples
   */
  transform(params, pulse) {
    const data = pulse.source || [];
    const output = data.map(d => ({
      ...d,
      computed: d.value * params.multiplier
    }));
    // Assign on a separate statement: `return pulse.fork().source = output`
    // would return the assigned array instead of the forked pulse.
    const out = pulse.fork();
    out.source = output;
    return out;
  }
}
// Store the class in the registry under 'custom', then create an
// operator from it by name.
transforms['custom'] = CustomTransform;
const customOp = transform('custom', {multiplier: 2});

import { EventStream } from "vega";
const stream = new EventStream();

// Register the operators this stream should update
stream.target(filterOperator);
stream.target(aggregateOperator);

// Push an event through the stream to trigger updates
stream.evaluate({type: 'data-changed', data: newData});

Install with Tessl CLI
npx tessl i tessl/npm-vega