# Dataflow System

Vega's reactive dataflow system provides incremental data processing with change tracking, transform operators, and event-driven updates. Rather than recomputing everything when data changes, it propagates only the tuples that were added, removed, or modified, and re-evaluates only the operators whose inputs or parameters changed.

## Capabilities

### Dataflow Engine

The core dataflow engine registers operators, orders them topologically by rank, and propagates updates through the operator graph when it runs.
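```typescript { .api }
/**
 * Core dataflow engine for reactive data processing
 */
class Dataflow {
  constructor();

  /**
   * Add an operator to the dataflow
   * @param init - Operator instance, operator constructor (e.g., a Transform
   *   subclass), update function, or initial value for a new operator
   * @param update - Update function; when `init` is a constructor or function,
   *   pass the parameter object in this position instead
   * @param params - Parameter object; operator-valued entries become dependencies
   * @param react - If false, changes to parameter operators do not trigger re-evaluation
   * @returns The added operator
   */
  add(init: any, update?: any, params?: any, react?: boolean): Operator;

  /**
   * Register a target operator as dependent on source operators, reranking the
   * target and its dependents if needed to preserve topological order
   * @param target - Dependent operator
   * @param sources - Operators the target depends on
   */
  connect(target: Operator, sources: Operator[]): void;

  /**
   * Queue a changeset to be delivered to an operator on the next run
   * @param operator - Operator that receives the changes
   * @param changeset - Tuples to insert, remove, or modify
   * @returns The dataflow instance
   */
  pulse(operator: Operator, changeset: Changeset): Dataflow;

  /**
   * Subscribe an operator to an event stream: each stream value touches the
   * target, or sets it to the value produced by `update` if one is given
   * @param stream - Source event stream
   * @param target - Operator to update
   * @param update - Optional update function or constant value
   * @returns The dataflow instance
   */
  on(stream: EventStream, target: Operator, update?: any): Dataflow;

  /**
   * Run the dataflow, evaluating touched operators and their dependents in
   * rank order. Use runAsync when operators may perform asynchronous work.
   * @returns The dataflow instance
   */
  run(): Dataflow;

  /**
   * Run the dataflow asynchronously
   * @returns Promise resolving to the dataflow instance
   */
  runAsync(): Promise<Dataflow>;

  /**
   * Set an operator's value and mark it for re-evaluation if the value changed
   * @param operator - Operator to update
   * @param value - New operator value
   * @param options - `force` re-evaluates even if the value is unchanged;
   *   `skip` skips the operator itself but still evaluates its dependents
   * @returns The dataflow instance
   */
  update(operator: Operator, value: any, options?: { force?: boolean; skip?: boolean }): Dataflow;

  /**
   * Touch an operator to mark it for re-evaluation
   * @param operator - Operator to mark as dirty
   * @returns The dataflow instance
   */
  touch(operator: Operator): Dataflow;

  /**
   * Clean up the dataflow by removing all operators
   * @returns The dataflow instance
   */
  cleanUp(): Dataflow;
}
```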
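To make the add/update/run cycle concrete, the following TypeScript sketch models rank-ordered, incremental propagation. It is a toy under stated assumptions, not Vega's engine: `MiniDataflow` and `MiniOperator` are hypothetical names, operator values are plain scalars rather than pulses of tuples, and ranks are assigned in insertion order, so dependencies must be added before the operators that use them.

```typescript
// Hypothetical toy model (not Vega's implementation): operators get ranks in
// insertion order, so an operator must be added after the operators it depends on.
type Params = Record<string, unknown>;
type UpdateFn = (params: Params) => unknown;

class MiniOperator {
  targets: MiniOperator[] = []; // downstream dependents
  evaluations = 0;              // number of times `update` has run

  constructor(
    public readonly rank: number,
    public value: unknown,
    public readonly update?: UpdateFn,
    public readonly params: Record<string, MiniOperator> = {},
  ) {}
}

class MiniDataflow {
  private nextRank = 0;
  private touched = new Set<MiniOperator>();

  // Register an operator; operator-valued params become upstream dependencies.
  add(init: unknown, update?: UpdateFn, params: Record<string, MiniOperator> = {}): MiniOperator {
    const op = new MiniOperator(this.nextRank++, init, update, params);
    for (const dep of Object.values(params)) dep.targets.push(op);
    this.touched.add(op);
    return op;
  }

  // Set an operator's value; mark it dirty only if the value actually changed.
  update(op: MiniOperator, value: unknown): this {
    if (op.value !== value) {
      op.value = value;
      this.touched.add(op);
    }
    return this;
  }

  // Evaluate dirty operators and their dependents, lowest rank first.
  run(): this {
    const queue = Array.from(this.touched);
    const queued = new Set(queue);
    this.touched.clear();
    while (queue.length > 0) {
      queue.sort((x, y) => x.rank - y.rank);
      const op = queue.shift()!;
      if (op.update) {
        const args: Params = {};
        for (const [name, dep] of Object.entries(op.params)) args[name] = dep.value;
        op.value = op.update(args);
        op.evaluations++;
      }
      for (const target of op.targets) {
        if (!queued.has(target)) {
          queued.add(target);
          queue.push(target);
        }
      }
    }
    return this;
  }
}

const df = new MiniDataflow();
const a = df.add(2);
const b = df.add(3);
const product = df.add(undefined, (p) => (p.a as number) * (p.b as number), { a, b });
df.run();
console.log(product.value); // 6
df.update(a, 5).run(); // only `a` and its dependent `product` are evaluated
console.log(product.value); // 15
```

Because `run` starts only from operators whose values changed, updating `a` re-evaluates `product` but never touches `b`, and setting `a` to the value it already holds schedules nothing at all.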
66
67
### Pulse System
68
69
Pulses represent data changes flowing through the dataflow graph.
70
71
```typescript { .api }
/**
 * Represents a data change pulse in the dataflow
 */
class Pulse {
  /**
   * Create a new pulse
   * @param dataflow - The parent dataflow instance
   * @param stamp - Optional timestamp for the pulse
   */
  constructor(dataflow: Dataflow, stamp?: number);

  /** Propagation timestamp of the pulse */
  stamp: number;

  /** Newly added data tuples */
  add: any[];

  /** Removed data tuples */
  rem: any[];

  /** Modified data tuples */
  mod: any[];

  /** Source data array (the full set of current tuples, if available) */
  source: any[];

  /** Bit flags (combinable with |) selecting tuple arrays for fork, changed, materialize, and visit */
  readonly ADD: number;
  readonly REM: number;
  readonly MOD: number;
  readonly ADD_REM: number;
  readonly ADD_MOD: number;
  readonly ALL: number;
  readonly REFLOW: number;
  readonly SOURCE: number;

  /**
   * Create a derived pulse with the same timestamp. By default the add, rem,
   * and mod arrays are empty; flags select arrays to carry over from this pulse.
   * @param flags - Optional flags indicating which arrays to carry over
   * @returns New pulse instance
   */
  fork(flags?: number): Pulse;

  /**
   * Create a copy of this pulse with the same timestamp and copies of its tuple arrays
   * @returns New pulse instance
   */
  clone(): Pulse;

  /**
   * Check if the pulse contains added, removed, or modified tuples
   * @param flags - Optional flags restricting which arrays to check
   * @returns True if the pulse has changes
   */
  changed(flags?: number): boolean;

  /**
   * Materialize any lazily filtered tuple arrays selected by the flags
   * @param flags - Optional flags (defaults to all arrays)
   * @returns The pulse instance
   */
  materialize(flags?: number): Pulse;

  /**
   * Invoke a visitor function on each tuple in the arrays selected by the flags
   * @param flags - Flags such as ADD, REM, MOD, or SOURCE
   * @param visitor - Function called with each tuple
   * @returns The pulse instance
   */
  visit(flags: number, visitor: (tuple: any) => void): Pulse;
}

/**
 * Container for pulses from multiple upstream operators
 */
class MultiPulse {
  constructor(dataflow: Dataflow, stamp: number, pulses: Pulse[]);

  /** Array of contained pulses */
  pulses: Pulse[];

  /**
   * Visit tuples across all contained pulses
   * @param flags - Flags such as ADD, REM, MOD, or SOURCE
   * @param visitor - Function called with each tuple
   * @returns The multi-pulse instance
   */
  visit(flags: number, visitor: (tuple: any) => void): MultiPulse;
}
```
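Splitting a pulse into `add`, `rem`, and `mod` arrays lets downstream operators update their state from the delta alone. The sketch below illustrates this with a running sum; `ToyPulse`, `RunningSum`, and the flag values are hypothetical stand-ins, not Vega's classes or constants.

```typescript
// Hypothetical flags and pulse shape, loosely modeled on the API above; the
// flag values and the RunningSum consumer are illustrative assumptions.
const ADD = 1 << 0;
const REM = 1 << 1;
const MOD = 1 << 2;

interface Tuple {
  id: number;
  value: number;
}

class ToyPulse {
  add: Tuple[] = [];
  rem: Tuple[] = [];
  mod: Tuple[] = [];

  constructor(public readonly stamp: number) {}

  // True if any tuple was added, removed, or modified.
  changed(): boolean {
    return this.add.length > 0 || this.rem.length > 0 || this.mod.length > 0;
  }

  // Call `visitor` on each tuple in the arrays selected by `flags`.
  visit(flags: number, visitor: (t: Tuple) => void): this {
    if (flags & ADD) this.add.forEach(visitor);
    if (flags & REM) this.rem.forEach(visitor);
    if (flags & MOD) this.mod.forEach(visitor);
    return this;
  }
}

// Maintains a sum from each pulse's changes instead of rescanning all data.
class RunningSum {
  total = 0;
  private counted = new Map<number, number>(); // tuple id -> value included in total

  apply(pulse: ToyPulse): number {
    pulse.visit(REM, (t) => {
      this.total -= this.counted.get(t.id) ?? 0;
      this.counted.delete(t.id);
    });
    // Added tuples contribute their full value; modified ones contribute the difference.
    pulse.visit(ADD | MOD, (t) => {
      this.total += t.value - (this.counted.get(t.id) ?? 0);
      this.counted.set(t.id, t.value);
    });
    return this.total;
  }
}

const sum = new RunningSum();

const p1 = new ToyPulse(1);
p1.add.push({ id: 1, value: 5 }, { id: 2, value: 15 });
console.log(sum.apply(p1)); // 20

const p2 = new ToyPulse(2);
p2.rem.push({ id: 1, value: 5 });
p2.mod.push({ id: 2, value: 30 });
console.log(sum.apply(p2)); // 30
```

The consumer keeps its own record of counted values because a modified tuple only carries its new value; the cost of each update is proportional to the size of the pulse, not the dataset.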
### Operators

`Operator` is the base class for nodes in the dataflow graph; `Transform` extends it for operators that process pulses of data tuples.
```typescript { .api }
/**
 * Base operator class for dataflow nodes
 */
class Operator {
  /**
   * Create a new operator
   * @param init - Initial value
   * @param update - Update function
   * @param params - Parameter object
   * @param react - Whether changes to parameter operators trigger re-evaluation
   */
  constructor(init?: any, update?: Function, params?: any, react?: boolean);

  /** Operator unique identifier */
  id: number;

  /** Current operator value */
  value: any;

  /** Pulse timestamp of last update */
  stamp: number;

  /** Topological rank used to order evaluation */
  rank: number;

  /** Most recent output pulse */
  pulse: Pulse;

  /**
   * Set operator parameters
   * @param params - Parameter object; operator-valued entries become dependencies
   * @param react - Whether changes to parameter operators trigger re-evaluation
   * @returns Array of operators this operator depends on
   */
  parameters(params: any, react?: boolean): Operator[];

  /**
   * Evaluate the operator with input pulse
   * @param pulse - Input pulse
   * @returns Output pulse or value
   */
  evaluate(pulse: Pulse): any;

  /**
   * Check if operator is modified
   * @returns True if operator has been modified
   */
  modified(): boolean;

  /**
   * Run the operator: evaluate it for the given pulse, skipping stale pulses
   * @param pulse - Input pulse
   * @returns Output pulse
   */
  run(pulse: Pulse): any;
}

/**
 * Base transform operator class
 */
class Transform extends Operator {
  constructor(init?: any, params?: any);

  /**
   * Transform method to be implemented by subclasses
   * @param params - Evaluated parameter values (a Parameters instance)
   * @param pulse - Input pulse
   * @returns Output pulse (returning nothing forwards the input pulse)
   */
  transform(params: Parameters, pulse: Pulse): Pulse | undefined;
}
```
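`Transform` follows a template-method design: the base operator handles evaluation and hands the parameters and the incoming changes to a subclass's `transform` method. The sketch below shows that shape with a hypothetical incremental filter; `ToyTransform`, `ToyFilter`, and `Changes` are illustrative names, and the engine that would call `evaluate` is assumed. It also shows why a filter needs state: a modified tuple can start or stop passing the predicate, which downstream operators must see as an addition or a removal.

```typescript
// Hypothetical sketch of the template-method pattern behind Transform: the base
// class drives evaluation and subclasses implement `transform`. Not Vega's classes.
interface Changes<T> {
  add: T[];
  rem: T[];
  mod: T[];
}

abstract class ToyTransform<P, T> {
  constructor(protected readonly params: P) {}

  // Called by the (assumed) engine for each incoming set of changes.
  evaluate(input: Changes<T>): Changes<T> {
    return this.transform(this.params, input);
  }

  protected abstract transform(params: P, input: Changes<T>): Changes<T>;
}

interface Item {
  id: number;
  value: number;
}

// Incremental filter: remembers which ids it passed downstream so it can turn
// modifications into additions or removals when the predicate result flips.
class ToyFilter extends ToyTransform<{ test: (t: Item) => boolean }, Item> {
  private passed = new Set<number>();

  protected transform(params: { test: (t: Item) => boolean }, input: Changes<Item>): Changes<Item> {
    const out: Changes<Item> = { add: [], rem: [], mod: [] };
    for (const t of input.rem) {
      if (this.passed.delete(t.id)) out.rem.push(t); // forward only removals downstream saw
    }
    for (const t of input.add) {
      if (params.test(t)) {
        this.passed.add(t.id);
        out.add.push(t);
      }
    }
    for (const t of input.mod) {
      const was = this.passed.has(t.id);
      const now = params.test(t);
      if (was && now) {
        out.mod.push(t);
      } else if (was) {
        this.passed.delete(t.id);
        out.rem.push(t);
      } else if (now) {
        this.passed.add(t.id);
        out.add.push(t);
      }
    }
    return out;
  }
}

const bigValues = new ToyFilter({ test: (t) => t.value > 10 });
const first = bigValues.evaluate({ add: [{ id: 1, value: 5 }, { id: 2, value: 15 }], rem: [], mod: [] });
console.log(first.add.map((t) => t.id).join(",")); // 2
const second = bigValues.evaluate({ add: [], rem: [], mod: [{ id: 1, value: 12 }, { id: 2, value: 3 }] });
console.log(second.add.map((t) => t.id).join(","), second.rem.map((t) => t.id).join(",")); // 1 2
```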
### Parameters

A `Parameters` container holds an operator's parameter values and tracks which of them changed, so operators can avoid unnecessary recomputation.
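```typescript { .api }
/**
 * Parameter container for operators. Parameter values are available as
 * properties (e.g., `params.field`), and changes are recorded so that
 * operators can test which parameters were modified.
 */
class Parameters {
  constructor();

  /**
   * Set a parameter value, marking it as modified if the value changed
   * @param name - Parameter name
   * @param index - Array index for array-valued parameters, or -1
   * @param value - Parameter value
   * @param force - If true, mark as modified even if the value is unchanged
   * @returns The parameters instance
   */
  set(name: string, index: number, value: any, force?: boolean): Parameters;

  /**
   * Check whether parameters were modified
   * @param name - Parameter name or array of names; if omitted, checks all parameters
   * @param index - Optional array index for array-valued parameters
   * @returns True if any of the specified parameters was modified
   */
  modified(name?: string | string[], index?: number): boolean;

  /**
   * Clear all modification flags
   * @returns The parameters instance
   */
  clear(): Parameters;
}
```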
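Modification tracking is what lets an operator skip work when its parameters are unchanged. A minimal sketch, assuming a hypothetical `ToyParameters` class (not Vega's `Parameters`), records a parameter as modified only when its value actually changes:

```typescript
// Hypothetical parameter container that records which values changed since
// the last clear(); names and behavior are illustrative, not Vega's.
class ToyParameters {
  private values = new Map<string, unknown>();
  private changed = new Set<string>();

  // Store a value; record a modification only if it differs (or `force` is set).
  set(name: string, value: unknown, force = false): this {
    if (force || this.values.get(name) !== value) {
      this.values.set(name, value);
      this.changed.add(name);
    }
    return this;
  }

  get(name: string): unknown {
    return this.values.get(name);
  }

  // No argument: was anything modified? Otherwise: was any named parameter modified?
  modified(names?: string | string[]): boolean {
    if (names === undefined) return this.changed.size > 0;
    const list = Array.isArray(names) ? names : [names];
    return list.some((n) => this.changed.has(n));
  }

  // Forget modification records, typically after an operator has evaluated.
  clear(): this {
    this.changed.clear();
    return this;
  }
}

const params = new ToyParameters();
params.set("field", "price").set("multiplier", 2);
console.log(params.modified("multiplier")); // true
params.clear();
params.set("multiplier", 2); // same value: not recorded as a modification
console.log(params.modified()); // false
params.set("multiplier", 3);
console.log(params.modified(["field", "multiplier"])); // true
```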
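### Event Streams

Event streams receive events, optionally filter or transform them, and forward the results onward. Dataflow operators can subscribe to a stream so that incoming events update operator values.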
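```typescript { .api }
/**
 * Event stream for reactive dataflow updates
 */
class EventStream {
  /**
   * Create a new event stream
   * @param filter - Optional predicate; events pass through when it returns a truthy value
   * @param apply - Optional function that maps an input event to an output value
   * @param receive - Optional callback that overrides standard event processing
   */
  constructor(filter?: Function, apply?: Function, receive?: Function);

  /** Unique stream identifier */
  id: number;

  /** Most recent output value */
  value: any;

  /**
   * Process an event: test it with the filter, map it with the apply
   * function, and forward the result to all derived streams
   * @param event - Input event
   */
  receive(event: any): void;

  /**
   * Create a derived stream that passes only events matching a predicate
   * @param filter - Predicate function
   * @returns The derived event stream
   */
  filter(filter: Function): EventStream;

  /**
   * Create a derived stream that maps each event through a function
   * @param apply - Mapping function
   * @returns The derived event stream
   */
  apply(apply: Function): EventStream;

  /**
   * Create a stream that merges this stream with the given streams
   * @param streams - Additional streams to merge
   * @returns The merged event stream
   */
  merge(...streams: EventStream[]): EventStream;

  /** Create a derived stream that emits at most one event per `pause` milliseconds */
  throttle(pause: number): EventStream;

  /** Create a derived stream that emits only after `delay` milliseconds without new events */
  debounce(delay: number): EventStream;

  /** Create a derived stream that passes events only between events from streams `a` and `b` */
  between(a: EventStream, b: EventStream): EventStream;

  /** Remove all derived (target) streams */
  detach(): void;
}
```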
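Event streams are push-based: each stream tests an incoming event, maps it, and forwards the result to streams derived from it. The sketch below models that chain with a hypothetical `ToyStream` class; it is not Vega's `EventStream`, and `subscribe` is an illustrative stand-in for connecting a stream to dataflow operators.

```typescript
// Hypothetical push-based stream: filter, then map, then forward to derived
// streams and listeners. Not Vega's EventStream implementation.
type Listener = (value: unknown) => void;

class ToyStream {
  value: unknown = undefined;
  private targets: ToyStream[] = [];
  private listeners: Listener[] = [];

  constructor(
    private readonly predicate: (e: unknown) => boolean = () => true,
    private readonly mapper: (e: unknown) => unknown = (e) => e,
  ) {}

  // Test, map, and forward an incoming event.
  receive(event: unknown): void {
    if (!this.predicate(event)) return;
    this.value = this.mapper(event);
    for (const target of this.targets) target.receive(this.value);
    for (const listener of this.listeners) listener(this.value);
  }

  // Derived stream that passes only events matching `predicate`.
  filter(predicate: (e: unknown) => boolean): ToyStream {
    const derived = new ToyStream(predicate);
    this.targets.push(derived);
    return derived;
  }

  // Derived stream that maps each event through `mapper`.
  apply(mapper: (e: unknown) => unknown): ToyStream {
    const derived = new ToyStream(undefined, mapper);
    this.targets.push(derived);
    return derived;
  }

  // Hypothetical sink hook standing in for a dataflow subscription.
  subscribe(listener: Listener): void {
    this.listeners.push(listener);
  }
}

type ThresholdEvent = { type: string; value: number };

const root = new ToyStream();
const received: unknown[] = [];
root
  .filter((e) => (e as ThresholdEvent).type === "threshold")
  .apply((e) => (e as ThresholdEvent).value)
  .subscribe((v) => received.push(v));

root.receive({ type: "threshold", value: 25 });
root.receive({ type: "click", value: 1 }); // dropped by the filter
console.log(received.join(",")); // 25
```

Each call to `filter` or `apply` returns a new stream rather than mutating the source, so several independent chains can hang off the same root.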
### Data Management Functions

Core functions for ingesting data tuples and describing incremental changes (insertions, removals, and modifications) as changesets.
```typescript { .api }
/**
 * Create a new changeset for incremental data updates
 * @returns New changeset instance
 */
function changeset(): Changeset;

/**
 * Ingest a data object as a tuple by assigning it a unique tuple id.
 * Primitive values are first wrapped in an object of the form {data: value}.
 * @param datum - Data object or value to ingest
 * @returns The ingested tuple
 */
function ingest(datum: any): any;

/**
 * Check if an object is a data tuple (has a tuple id)
 * @param obj - Object to test
 * @returns True if object is a tuple
 */
function isTuple(obj: any): boolean;

/**
 * Get the unique identifier of a data tuple
 * @param tuple - Data tuple
 * @returns Tuple ID
 */
function tupleid(tuple: any): number;

interface Changeset {
  /**
   * Insert new tuples
   * @param tuples - Array of tuples to insert
   * @returns The changeset instance
   */
  insert(tuples: any[]): Changeset;

  /**
   * Remove existing tuples
   * @param tuples - Array of tuples to remove, or a predicate selecting tuples to remove
   * @returns The changeset instance
   */
  remove(tuples: any[] | ((tuple: any) => boolean)): Changeset;

  /**
   * Modify a field of existing tuples
   * @param tuple - Tuple to modify, or a predicate selecting tuples to modify
   * @param field - Field name to modify
   * @param value - New value, or a function computing it from the tuple
   * @returns The changeset instance
   */
  modify(tuple: any | ((tuple: any) => boolean), field: string, value: any): Changeset;

  /**
   * Reinsert tuples (remove then add)
   * @param tuples - Array of tuples to reinsert
   * @returns The changeset instance
   */
  reinsert(tuples: any[]): Changeset;

  /**
   * Clean the changeset by removing empty change arrays
   * @returns The changeset instance
   */
  clean(): Changeset;
}
```
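A changeset describes changes without applying them; applying it against the current tuples yields the `add`, `rem`, and `mod` arrays that a pulse carries. The sketch below assumes tuples keyed by a numeric `id` field and a hypothetical `ToyChangeset` class; it is not Vega's `changeset`, which identifies tuples by their internal tuple ids.

```typescript
// Hypothetical changeset sketch: tuples are identified by an `id` field here,
// an assumption made for illustration.
interface Row {
  id: number;
  [field: string]: unknown;
}

interface Changes {
  add: Row[];
  rem: Row[];
  mod: Row[];
}

class ToyChangeset {
  private inserts: Row[] = [];
  private removes = new Set<number>();
  private mods: { id: number; field: string; value: unknown }[] = [];

  insert(rows: Row[]): this {
    this.inserts.push(...rows);
    return this;
  }

  remove(rows: Row[]): this {
    for (const row of rows) this.removes.add(row.id);
    return this;
  }

  modify(row: Row, field: string, value: unknown): this {
    this.mods.push({ id: row.id, field, value });
    return this;
  }

  // Apply pending changes to `store` and report the resulting deltas.
  applyTo(store: Map<number, Row>): Changes {
    const out: Changes = { add: [], rem: [], mod: [] };
    this.removes.forEach((id) => {
      const row = store.get(id);
      if (row) {
        store.delete(id);
        out.rem.push(row);
      }
    });
    for (const row of this.inserts) {
      store.set(row.id, row);
      out.add.push(row);
    }
    for (const m of this.mods) {
      const row = store.get(m.id);
      if (!row) continue; // removed or unknown rows cannot be modified
      row[m.field] = m.value;
      // New rows are reported once, as additions, not also as modifications.
      if (!out.add.includes(row) && !out.mod.includes(row)) out.mod.push(row);
    }
    return out;
  }
}

const store = new Map<number, Row>([
  [1, { id: 1, name: "Alice", age: 25 }],
  [2, { id: 2, name: "Bob", age: 30 }],
]);

const changes = new ToyChangeset()
  .insert([{ id: 3, name: "Carol", age: 41 }])
  .remove([store.get(1)!])
  .modify(store.get(2)!, "age", 31)
  .applyTo(store);

console.log(changes.add.length, changes.rem.length, changes.mod.length); // 1 1 1
```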
### Transform Registry

Transforms are registered by name in a shared registry; the functions below look up a registered transform and its parameter definition.
```typescript { .api }
/**
 * Look up the definition (metadata) of a registered transform
 * @param name - Transform name (case-insensitive)
 * @returns Transform definition object, or null if not registered
 */
function definition(name: string): TransformDefinition | null;

/**
 * Look up a registered transform constructor by name
 * @param name - Transform name (case-insensitive)
 * @returns Transform constructor, or null if not registered
 */
function transform(name: string): (new (params?: any) => Transform) | null;

/** Registry of available transform constructors, keyed by lowercase name */
const transforms: { [name: string]: new (params?: any) => Transform };

interface TransformDefinition {
  /** Transform type name */
  type: string;

  /** Transform metadata */
  metadata: any;

  /** Parameter specifications */
  params: any[];
}
```
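The registry pattern itself is small. In the sketch below, constructors are stored under lowercase names and each carries a static `Definition` describing its parameters; the `register`, `lookup`, and `definitionOf` helpers and the sample `Multiply` class are hypothetical, and the code illustrates the lookup logic rather than reproducing Vega's registry.

```typescript
// Hypothetical registry sketch: `register`, `lookup`, `definitionOf`, and the
// sample `Multiply` class are illustrative names, not Vega exports.
interface Definition {
  type: string;
  metadata: Record<string, unknown>;
  params: { name: string; required?: boolean }[];
}

interface TransformCtor {
  new (params?: Record<string, unknown>): object;
  Definition: Definition;
}

const registry: Record<string, TransformCtor> = {};

// Store constructors under the lowercase form of their definition type.
function register(ctor: TransformCtor): void {
  registry[ctor.Definition.type.toLowerCase()] = ctor;
}

// Case-insensitive lookup; the own-property check ignores Object.prototype keys.
function lookup(name: string): TransformCtor | null {
  const key = name.toLowerCase();
  return Object.prototype.hasOwnProperty.call(registry, key) ? registry[key] : null;
}

function definitionOf(name: string): Definition | null {
  const ctor = lookup(name);
  return ctor ? ctor.Definition : null;
}

class Multiply {
  static Definition: Definition = {
    type: "Multiply",
    metadata: { modifies: true },
    params: [{ name: "multiplier", required: true }],
  };

  constructor(public params?: Record<string, unknown>) {}
}

register(Multiply);

const Ctor = lookup("MULTIPLY");
const op = Ctor ? new Ctor({ multiplier: 2 }) : null;
console.log(op instanceof Multiply); // true
console.log(definitionOf("multiply")?.params[0].name); // multiplier
console.log(lookup("constructor")); // null
```

Without the `hasOwnProperty` check, a name such as `constructor` would resolve to a property inherited from `Object.prototype`.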
## Common Transform Types

The `transforms` registry exported by `vega` includes these built-in transform types:

### Data Transforms

- **aggregate** - Group and summarize data
- **bin** - Create histogram bins
- **collect** - Collect and sort data
- **countpattern** - Count text pattern matches
- **cross** - Cross product of a data stream with itself
- **density** - Generate samples from a probability distribution, such as a kernel density estimate
- **extent** - Calculate data extents
- **facet** - Create data facets
- **filter** - Filter data tuples
- **flatten** - Flatten array fields
- **fold** - Convert wide to long format
- **formula** - Add calculated fields
- **identifier** - Add unique identifiers
- **impute** - Fill missing values
- **joinaggregate** - Join with aggregated values
- **lookup** - Join datasets
- **pivot** - Convert long to wide format
- **project** - Select/rename fields
- **sample** - Random sampling
- **sequence** - Generate sequences
- **timeunit** - Extract time units
- **window** - Sliding window calculations, including ranking operations such as `rank`
### Geo Transforms

- **geojson** - Consolidate geographic data into a GeoJSON feature collection
- **geopath** - Generate geo paths
- **geopoint** - Project geo coordinates
- **geoshape** - Create geo shapes
- **graticule** - Generate map graticule

### Layout Transforms

- **force** - Force-directed layout
- **linkpath** - Generate link paths
- **pack** - Circle packing layout
- **partition** - Partition layout
- **pie** - Pie/donut layout
- **stack** - Stack layout
- **tree** - Tree layout

### Visual Transforms

- **contour** - Contour generation
- **heatmap** - Render a grid of values as a heatmap image
- **kde** - Kernel density estimation
- **regression** - Regression analysis
- **wordcloud** - Word cloud layout
## Usage Examples

### Basic Dataflow Setup
```typescript
import { Dataflow, changeset, transforms } from "vega";

const df = new Dataflow();

// Source operator: a Collect transform that stores the input tuples
const source = df.add(transforms.collect);

// Filter transform: `expr` is a predicate function (when Vega parses a spec,
// expression strings are compiled into such functions); `pulse` names the input
const filter = df.add(transforms.filter, {
  expr: (datum: any) => datum.value > 10,
  pulse: source
});

// Collect the filtered tuples so the result is available as an array
const output = df.add(transforms.collect, {pulse: filter});

// Send the data to the source operator, then run the dataflow
df.pulse(source, changeset().insert([
  {value: 5}, {value: 15}, {value: 8}, {value: 20}
])).run();

console.log(output.value.map((d: any) => d.value)); // [15, 20]
```
### Using Changesets
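```typescript
import { changeset, ingest, View } from "vega";

// Assumed: `view` is an initialized View whose spec defines a dataset named "dataset"
declare const view: View;

// Tuples to remove or modify must be objects currently stored in the dataset
const [existingTuple, modifiedTuple] = view.data('dataset');

// Create changeset for incremental updates
const cs = changeset()
  .insert([
    ingest({name: 'Alice', age: 25}),
    ingest({name: 'Bob', age: 30})
  ])
  .remove([existingTuple])
  // modify takes a single tuple (or a predicate), a field name, and a value
  .modify(modifiedTuple, 'age', 31);

// Apply the changeset through the view and re-evaluate the dataflow
view.change('dataset', cs).run();
```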
### Custom Transform
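```typescript
import { Dataflow, Transform, changeset, transform, transforms } from "vega";

// Adds a `computed` field (value * multiplier) to each tuple
class Multiply extends Transform {
  constructor(params?: any) {
    super(null, params);
  }

  transform(_: any, pulse: any) {
    // If parameters changed, recompute all tuples; otherwise only added/modified ones
    const reset = _.modified();
    if (reset) pulse = pulse.materialize().reflow(true);

    // Declare the written field so downstream operators see it as modified
    pulse.modifies('computed');

    // Update tuples in place (keeping their tuple ids) and return the pulse
    return pulse.visit(reset ? pulse.SOURCE : pulse.ADD_MOD, (t: any) => {
      t.computed = t.value * _.multiplier;
    });
  }
}

// Register under a lowercase name; transform(name) returns the constructor
transforms['multiply'] = Multiply;

const df = new Dataflow();
const source = df.add(transforms.collect);
df.add(transform('multiply'), {multiplier: 2, pulse: source});

// Each inserted tuple receives computed = value * 2
df.pulse(source, changeset().insert([{value: 1}, {value: 4}])).run();
```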
### Event-Driven Updates
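```typescript
import { Dataflow, EventStream } from "vega";

const df = new Dataflow();

// A value operator and a dependent operator computed from it
const threshold = df.add(10);
const doubled = df.add((_: any) => _.threshold * 2, {threshold});
df.run();

// Keep only "threshold" events and map each one to its value
const events = new EventStream();
const values = events
  .filter((e: any) => e.type === 'threshold')
  .apply((e: any) => e.value);

// Subscribe: each value emitted by `values` becomes the operator's new value
df.on(values, threshold, (_: any, value: any) => value);

// Push an event into the stream, then run the dataflow to propagate it
events.receive({type: 'threshold', value: 25});
df.run();

console.log(doubled.value); // recomputed from the updated threshold
```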