or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-loading.mddataflow.mdevents.mdexpressions.mdindex.mdparsing.mdscales.mdscenegraph.mdstatistics.mdtime.mdutilities.mdview.md

dataflow.mddocs/

# Dataflow System

Vega's reactive dataflow system provides incremental data processing with efficient change tracking, transform operations, and event-driven updates. The system enables complex data transformations while maintaining high performance through minimal recomputation.

## Capabilities

### Dataflow Engine

The core dataflow engine manages operators, data flow, and incremental updates.

```typescript { .api }

/**
 * Core dataflow engine for reactive data processing.
 */
class Dataflow {
  constructor();

  /**
   * Add an operator to the dataflow.
   * @param operator - Operator to add
   * @returns The operator that was added (not the dataflow), so the caller can
   *   keep a typed reference to it for later `connect`/`touch`/`pulse` calls
   */
  add<T extends Operator>(operator: T): T;

  /**
   * Connect two operators in the dataflow graph.
   * @param sourceOp - Source operator
   * @param targetOp - Target operator
   * @returns The dataflow instance
   */
  connect(sourceOp: Operator, targetOp: Operator): Dataflow;

  /**
   * Queue a changeset for an operator and mark that operator for evaluation
   * on the next run.
   * @param operator - Operator that receives the changes
   * @param changes - Changeset of tuples to insert, remove, or modify
   * @returns The dataflow instance
   */
  pulse(operator: Operator, changes: Changeset): Dataflow;

  /**
   * Run the dataflow synchronously.
   * @returns The dataflow instance
   */
  run(): Dataflow;

  /**
   * Run the dataflow asynchronously.
   * @returns Promise resolving to the dataflow instance
   */
  runAsync(): Promise<Dataflow>;

  /**
   * Update an operator with new parameters.
   * @param operator - Operator to update
   * @param parameters - New parameter values
   * @returns The dataflow instance
   */
  update(operator: Operator, parameters: any): Dataflow;

  /**
   * Touch an operator to mark it for re-evaluation.
   * @param operator - Operator to mark as dirty
   * @returns The dataflow instance
   */
  touch(operator: Operator): Dataflow;

  /**
   * Clean up the dataflow by removing all operators.
   * @returns The dataflow instance
   */
  cleanUp(): Dataflow;
}

```

### Pulse System

Pulses represent data changes flowing through the dataflow graph.

```typescript { .api }

/**
 * Represents a data change pulse in the dataflow.
 */
class Pulse {
  /**
   * Create a new pulse.
   * @param dataflow - The parent dataflow instance
   * @param stamp - Optional timestamp for the pulse
   */
  constructor(dataflow: Dataflow, stamp?: number);

  /** Newly added data tuples */
  add: any[];

  /** Removed data tuples */
  rem: any[];

  /** Modified data tuples */
  mod: any[];

  /** Source data array */
  source: any[];

  /**
   * Create a derived pulse with the same timestamp.
   * @returns New pulse instance
   */
  fork(): Pulse;

  /**
   * Create a derived pulse with a new timestamp.
   * @returns New pulse instance
   */
  clone(): Pulse;

  /**
   * Check whether the pulse contains any changes.
   * @returns True if the pulse has changes
   */
  changed(): boolean;

  /**
   * Get all data tuples (source + add - rem).
   * @returns Array of all current tuples
   */
  materialize(): any[];

  /**
   * Visit tuples, optionally restricted to particular change sets.
   * @param source - Visit source tuples
   * @param add - Visit added tuples
   * @param rem - Visit removed tuples
   * @param mod - Visit modified tuples
   * @returns Array of visited tuples
   */
  visit(source?: number, add?: number, rem?: number, mod?: number): any[];
}

/**
 * Multi-pulse container for multiple simultaneous changes.
 */
class MultiPulse {
  constructor(dataflow: Dataflow, stamp: number, pulses: Pulse[]);

  /** Array of contained pulses */
  pulses: Pulse[];

  /**
   * Visit tuples across all contained pulses.
   * @param source - Visit source tuples
   * @param add - Visit added tuples
   * @param rem - Visit removed tuples
   * @param mod - Visit modified tuples
   * @returns Array of visited tuples
   */
  visit(source?: number, add?: number, rem?: number, mod?: number): any[];
}

```

### Operators

Base operator classes for dataflow computation nodes.

```typescript { .api }

/**
 * Base operator class for dataflow nodes.
 */
class Operator {
  /**
   * Create a new operator.
   * @param init - Initial value
   * @param update - Update function
   * @param params - Parameter object
   * @param react - Reactive update flag
   */
  constructor(
    init?: any,
    update?: ((...args: any[]) => any) | null,
    params?: any,
    react?: boolean
  );

  /** Operator unique identifier */
  id: number;

  /** Current operator value */
  value: any;

  /** Operator parameters */
  params: any;

  /** Pulse timestamp of last update */
  stamp: number;

  /** Reactive update flag */
  react: boolean;

  /**
   * Set operator parameters.
   * @param params - Parameter object
   * @returns The operator instance
   */
  parameters(params: any): Operator;

  /**
   * Evaluate the operator with an input pulse.
   * @param pulse - Input pulse
   * @returns Output pulse or value
   */
  evaluate(pulse: Pulse): any;

  /**
   * Check whether the operator is modified.
   * @returns True if the operator has been modified
   */
  modified(): boolean;

  /**
   * Run the operator's update function.
   * @param pulse - Input pulse
   * @returns Output value
   */
  run(pulse: Pulse): any;
}

/**
 * Base transform operator class.
 */
class Transform extends Operator {
  constructor(init?: any, params?: any);

  /**
   * Transform method to be implemented by subclasses.
   * @param params - Transform parameters
   * @param pulse - Input pulse
   * @returns Output pulse
   */
  transform(params: any, pulse: Pulse): Pulse;
}

```

### Parameters

Parameter management for operators.

```typescript { .api }

/**
 * Parameter container for operators.
 */
class Parameters {
  constructor(operator: Operator, params?: any, initOnly?: boolean);

  /** Reference to the parent operator */
  operator: Operator;

  /**
   * Set parameter values.
   * @param params - Parameter object
   * @returns The parameters instance
   */
  set(params: any): Parameters;

  /**
   * Evaluate parameter expressions.
   * @param pulse - Current pulse for context
   * @returns Evaluated parameter values
   */
  evaluate(pulse: Pulse): any;
}

```

### Event Streams

Event stream management for reactive updates.

```typescript { .api }

/**
 * Event stream for reactive dataflow updates.
 */
class EventStream {
  /**
   * Create a new event stream.
   * @param filter - Optional event filter function
   */
  constructor(filter?: (...args: any[]) => any);

  /** Event filter function */
  filter: (...args: any[]) => any;

  /** Target operators to update */
  targets: Operator[];

  /**
   * Add a target operator.
   * @param operator - Operator to add as target
   * @returns The event stream instance
   */
  target(operator: Operator): EventStream;

  /**
   * Remove a target operator.
   * @param operator - Operator to remove
   * @returns The event stream instance
   */
  detarget(operator: Operator): EventStream;

  /**
   * Evaluate the stream with an event.
   * @param event - Input event
   * @returns Stream evaluation result
   */
  evaluate(event: any): any;
}

```

### Data Management Functions

Core functions for data manipulation and change tracking.

```typescript { .api }

/**
 * Create a new changeset for incremental data updates.
 * @returns New changeset instance
 */
function changeset(): Changeset;


/**
 * Ingest a data tuple into the dataflow system.
 * @param datum - Data tuple to ingest
 * @returns Ingested tuple with system metadata
 */
function ingest(datum: any): any;


/**
 * Check whether an object is a data tuple.
 * @param obj - Object to test
 * @returns True if the object is a tuple
 */
function isTuple(obj: any): boolean;


/**
 * Get or set a tuple identifier.
 * @param tuple - Data tuple
 * @param id - Optional ID to set
 * @returns Tuple ID
 */
function tupleid(tuple: any, id?: any): any;


interface Changeset {
  /**
   * Insert new tuples.
   * @param tuples - Array of tuples to insert
   * @returns The changeset instance
   */
  insert(tuples: any[]): Changeset;

  /**
   * Remove existing tuples.
   * @param tuples - Array of tuples to remove
   * @returns The changeset instance
   */
  remove(tuples: any[]): Changeset;

  /**
   * Modify existing tuples.
   * @param tuples - Array of tuples to modify
   * @param field - Field name to modify (optional)
   * @param value - New value (optional)
   * @returns The changeset instance
   */
  modify(tuples: any[], field?: string, value?: any): Changeset;

  /**
   * Reinsert tuples (remove then add).
   * @param tuples - Array of tuples to reinsert
   * @returns The changeset instance
   */
  reinsert(tuples: any[]): Changeset;

  /**
   * Clean the changeset by removing empty change arrays.
   * @returns The changeset instance
   */
  clean(): Changeset;
}

```

### Transform Registry

Transform definition and registration system.

```typescript { .api }

/**
 * Get a transform definition by name.
 * @param name - Transform name
 * @returns Transform definition object
 */
function definition(name: string): TransformDefinition;


/**
 * Create a new transform operator.
 * @param name - Transform name
 * @param params - Transform parameters
 * @returns Transform operator instance
 */
function transform(name: string, params?: any): Transform;


/**
 * Registry of all available transforms, keyed by transform name.
 * Entries are transform constructors (the same shape as
 * `TransformDefinition.transform`), not transform instances; create an
 * operator with `new transforms[name](params)`.
 */
const transforms: { [name: string]: new (params?: any) => Transform };


interface TransformDefinition {
  /** Transform name */
  type: string;

  /** Parameter metadata */
  metadata: any;

  /** Transform constructor */
  transform: new (params?: any) => Transform;
}

```

## Common Transform Types

The transforms registry includes these built-in transform types:

### Data Transforms

- **aggregate** - Group and summarize data
- **bin** - Create histogram bins
- **collect** - Collect and sort data
- **countpattern** - Count text pattern matches
- **cross** - Cross product of datasets
- **density** - Sample a probability density (PDF) or cumulative distribution (CDF) function
- **extent** - Calculate data extents
- **facet** - Create data facets
- **filter** - Filter data tuples
- **flatten** - Flatten array fields
- **fold** - Convert wide to long format
- **formula** - Add calculated fields
- **identifier** - Add unique identifiers
- **impute** - Fill missing values
- **joinaggregate** - Join with aggregated values
- **lookup** - Join datasets
- **pivot** - Convert long to wide format
- **project** - Select/rename fields
- **rank** - Rank data values
- **sample** - Random sampling
- **sequence** - Generate sequences
- **timeunit** - Extract time units
- **window** - Sliding window calculations

### Geo Transforms

- **geojson** - Parse GeoJSON
- **geopath** - Generate geo paths
- **geopoint** - Project geo coordinates
- **geoshape** - Create geo shapes
- **graticule** - Generate map graticule

### Layout Transforms

- **force** - Force-directed layout
- **linkpath** - Generate link paths
- **pack** - Circle packing layout
- **partition** - Partition layout
- **pie** - Pie/donut layout
- **stack** - Stack layout
- **tree** - Tree layout

### Visual Transforms

- **contour** - Contour generation
- **heatmap** - Heatmap binning
- **hexbin** - Hexagonal binning
- **kde** - Kernel density estimation
- **regression** - Regression analysis
- **wordcloud** - Word cloud layout

## Usage Examples

### Basic Dataflow Setup

```typescript

import { Dataflow, Operator, changeset, transforms } from "vega";

const df = new Dataflow();

// Create data source operator; `add` returns the operator it was given.
const data = df.add(new Operator([], null, {}, false));

// Create filter transform. Registry entries are transform constructors,
// so they must be invoked with `new`.
const filter = df.add(new transforms.filter({
  expr: 'datum.value > 10'
}));

// Connect data to filter
df.connect(data, filter);

// Queue the input tuples on the source operator as a changeset, then run
df.pulse(data, changeset().insert([
  {value: 5}, {value: 15}, {value: 8}, {value: 20}
]));
df.run();

console.log(filter.value); // [{value: 15}, {value: 20}]

```

### Using Changesets

```typescript

import { changeset, ingest } from "vega";

// `existingTuple` and `modifiedTuple` stand for tuples already present in the
// dataset, and `view` for a Vega View instance; none are defined in this snippet.

// Create changeset for incremental updates
const cs = changeset()
  .insert([
    ingest({name: 'Alice', age: 25}),
    ingest({name: 'Bob', age: 30})
  ])
  .remove([existingTuple])
  .modify([modifiedTuple], 'age', 31);

// Apply changeset through view
view.change('dataset', cs).run();

```

### Custom Transform

```typescript

import { Transform, transform, transforms } from "vega";

class CustomTransform extends Transform {
  constructor(params) {
    super(null, params);
  }

  /**
   * Add a `computed` field equal to `value * params.multiplier` to each
   * source tuple and emit the result on a forked pulse.
   */
  transform(params, pulse) {
    const data = pulse.source || [];
    const output = data.map(d => ({
      ...d,
      computed: d.value * params.multiplier
    }));

    // Fork first, then assign: `return pulse.fork().source = output` would
    // evaluate to `output` (the array), not to the forked Pulse.
    const out = pulse.fork();
    out.source = output;
    return out;
  }
}

// Register and use
transforms['custom'] = CustomTransform;
const customOp = transform('custom', {multiplier: 2});

```

### Event-Driven Updates

```typescript

import { EventStream } from "vega";

// `filterOperator`, `aggregateOperator` and `newData` are placeholders for
// operators and data defined elsewhere; they are not declared in this snippet.
const stream = new EventStream();

// Add operator targets (`target` returns the stream, so calls can chain)
stream
  .target(filterOperator)
  .target(aggregateOperator);

// Trigger updates
stream.evaluate({type: 'data-changed', data: newData});

```