or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

code-generation.mddata-structures.mdfilesystem.mdindex.mdruntime-operators.mdtype-system.mdutilities.md

runtime-operators.mddocs/

0

# Runtime Operators

1

2

Comprehensive set of operators for joins, aggregations, window operations, sorting, ranking, and other table processing operations optimized for both streaming and batch execution modes.

3

4

## Capabilities

5

6

### Operator Factories

7

8

Main factories for creating runtime operators, including code-generated operators and specialized operator types.

9

10

```java { .api }

11

/**

12

* Main factory for code-generated operators

13

* Creates optimized operators from generated code for maximum performance

14

*/

15

class CodeGenOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OUT> {

16

/** Create factory with generated operator class */

17

CodeGenOperatorFactory(GeneratedClass<? extends StreamOperator<OUT>> operatorCodeGenerator);

18

19

/** Create the actual operator instance */

20

<T extends StreamOperator<OUT>> T createStreamOperator(StreamOperatorParameters<OUT> parameters);

21

22

/** Get operator class */

23

Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader);

24

}

25

26

/** Factory for watermark assignment operators */

27

class WatermarkAssignerOperatorFactory extends AbstractStreamOperatorFactory<RowData> {

28

WatermarkAssignerOperatorFactory(

29

int rowtimeFieldIndex,

30

WatermarkStrategy<RowData> watermarkStrategy

31

);

32

}

33

34

/** Factory for multi-input operators in batch mode */

35

class BatchMultipleInputStreamOperatorFactory<OUT>

36

extends AbstractStreamOperatorFactory<OUT> {

37

38

BatchMultipleInputStreamOperatorFactory(

39

GeneratedClass<? extends MultipleInputStreamOperator<OUT>> generatedClass

40

);

41

}

42

```

43

44

### Window Operations

45

46

Comprehensive window processing capabilities including builders for different window types and window assigners.

47

48

```java { .api }

49

/**

50

* Builder class for window operators

51

* Provides fluent API for constructing window processing operators

52

*/

53

class WindowOperatorBuilder {

54

/** Create new builder instance */

55

static WindowOperatorBuilder builder();

56

57

/** Configure tumbling window */

58

WindowOperatorBuilder tumble(Duration size);

59

60

/** Configure sliding window */

61

WindowOperatorBuilder sliding(Duration size, Duration slide);

62

63

/** Configure event time processing */

64

WindowOperatorBuilder withEventTime(int rowtimeIndex);

65

66

/** Configure processing time */

67

WindowOperatorBuilder withProcessingTime();

68

69

/** Set window assigner */

70

WindowOperatorBuilder withWindowAssigner(WindowAssigner<?, ? extends Window> assigner);

71

72

/** Set window function */

73

WindowOperatorBuilder withWindowFunction(WindowFunction<?, ?, ?, ?> function);

74

75

/** Set trigger */

76

WindowOperatorBuilder withTrigger(Trigger<?, ?> trigger);

77

78

/** Set evictor */

79

WindowOperatorBuilder withEvictor(Evictor<?, ?> evictor);

80

81

/** Set allowed lateness */

82

WindowOperatorBuilder withAllowedLateness(Time allowedLateness);

83

84

/** Build the window operator */

85

OneInputStreamOperator<RowData, RowData> build();

86

}

87

88

/**

89

* Builder for slicing window aggregation operators

90

* Optimized for non-overlapping window aggregations

91

*/

92

class SlicingWindowAggOperatorBuilder {

93

SlicingWindowAggOperatorBuilder(

94

GeneratedAggsHandleFunction aggsHandleFunction,

95

WindowAssigner<?, ?> windowAssigner,

96

Trigger<?, ?> trigger

97

);

98

99

/** Set state backend */

100

SlicingWindowAggOperatorBuilder withStateBackend(StateBackend stateBackend);

101

102

/** Build the operator */

103

OneInputStreamOperator<RowData, RowData> build();

104

}

105

106

/**

107

* Builder for window-based ranking operators

108

* Handles ranking within window boundaries

109

*/

110

class WindowRankOperatorBuilder {

111

WindowRankOperatorBuilder(

112

WindowAssigner<?, ?> windowAssigner,

113

GeneratedRecordComparator comparator,

114

RankType rankType,

115

long rankStart,

116

long rankEnd

117

);

118

119

/** Build the window rank operator */

120

OneInputStreamOperator<RowData, RowData> build();

121

}

122

```

123

124

### Window Assigners

125

126

Different types of window assigners for various windowing strategies including tumbling, sliding, and session windows.

127

128

```java { .api }

129

/** Tumbling window assigner */

130

class TumblingWindowAssigner extends WindowAssigner<Object, TimeWindow> {

131

TumblingWindowAssigner(long windowSize, long offset);

132

133

/** Assign windows to element */

134

Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context);

135

}

136

137

/** Sliding window assigner */

138

class SlidingWindowAssigner extends WindowAssigner<Object, TimeWindow> {

139

SlidingWindowAssigner(long windowSize, long slide, long offset);

140

141

/** Assign windows to element */

142

Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context);

143

}

144

145

/** Session window assigner */

146

class SessionWindowAssigner extends WindowAssigner<Object, TimeWindow> {

147

SessionWindowAssigner(long sessionTimeout);

148

149

/** Assign windows to element */

150

Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context);

151

}

152

```

153

154

### Window Types

155

156

Different window implementations for time-based and count-based windowing operations.

157

158

```java { .api }

159

/** Time-based window implementation */

160

class TimeWindow extends Window {

161

/** Window start timestamp */

162

public final long start;

163

164

/** Window end timestamp */

165

public final long end;

166

167

/** Create time window */

168

TimeWindow(long start, long end);

169

170

/** Get window size */

171

public long getSize();

172

173

/** Check if window covers timestamp */

174

public boolean covers(long timestamp);

175

}

176

177

/** Count-based window implementation */

178

class CountWindow extends Window {

179

/** Window ID */

180

public final long id;

181

182

/** Create count window */

183

CountWindow(long id);

184

}

185

```

186

187

### Join Operations

188

189

Comprehensive join processing capabilities including different join types, join strategies, and specialized join operators.

190

191

```java { .api }

192

/** Join type enumeration */

193

enum FlinkJoinType {

194

INNER, LEFT, RIGHT, FULL, SEMI, ANTI;

195

196

/** Check if join type is outer */

197

boolean isOuter();

198

199

/** Check if join type filters nulls */

200

boolean filtersNulls();

201

}

202

203

/** Hash join type enumeration */

204

enum HashJoinType {

205

INNER, BUILD_LEFT, BUILD_RIGHT, FULL_OUTER, SEMI, ANTI;

206

207

/** Get corresponding Flink join type */

208

FlinkJoinType toFlinkJoinType();

209

}

210

211

/** Sort-merge join operator implementation */

212

class SortMergeJoinOperator extends AbstractStreamOperator<RowData>

213

implements TwoInputStreamOperator<RowData, RowData, RowData> {

214

215

SortMergeJoinOperator(

216

FlinkJoinType joinType,

217

GeneratedJoinCondition joinCondition,

218

GeneratedRecordComparator leftComparator,

219

GeneratedRecordComparator rightComparator

220

);

221

222

/** Process left input */

223

void processElement1(StreamRecord<RowData> element) throws Exception;

224

225

/** Process right input */

226

void processElement2(StreamRecord<RowData> element) throws Exception;

227

}

228

229

/** Streaming join operator */

230

class StreamingJoinOperator extends AbstractStreamOperator<RowData>

231

implements TwoInputStreamOperator<RowData, RowData, RowData> {

232

233

StreamingJoinOperator(

234

FlinkJoinType joinType,

235

GeneratedJoinCondition joinCondition,

236

long leftLowerBound,

237

long leftUpperBound,

238

long rightLowerBound,

239

long rightUpperBound

240

);

241

}

242

243

/** Streaming semi/anti join operator */

244

class StreamingSemiAntiJoinOperator extends AbstractStreamOperator<RowData>

245

implements TwoInputStreamOperator<RowData, RowData, RowData> {

246

247

StreamingSemiAntiJoinOperator(

248

boolean isAntiJoin,

249

GeneratedJoinCondition joinCondition,

250

long minRetentionTime,

251

long maxRetentionTime

252

);

253

}

254

```

255

256

### Lookup Joins

257

258

Specialized join operators for lookup operations against external systems and dimension tables.

259

260

```java { .api }

261

/** Async lookup join runner */

262

class AsyncLookupJoinRunner {

263

AsyncLookupJoinRunner(

264

GeneratedFunction<AsyncTableFunction<Object>> generatedFetcher,

265

DataStructureConverter<RowData, Object> fetcherConverter,

266

DataStructureConverter<Object, RowData> lookupResultConverter,

267

GeneratedResultFuture<Object> generatedResultFuture,

268

boolean isLeftOuterJoin

269

);

270

271

/** Process input row with async lookup */

272

void processElement(RowData input, Collector<RowData> out) throws Exception;

273

}

274

275

/** Sync lookup join runner */

276

class LookupJoinRunner {

277

LookupJoinRunner(

278

GeneratedFunction<TableFunction<Object>> generatedFetcher,

279

DataStructureConverter<RowData, Object> fetcherConverter,

280

DataStructureConverter<Object, RowData> lookupResultConverter,

281

boolean isLeftOuterJoin

282

);

283

284

/** Process input row with sync lookup */

285

void processElement(RowData input, Collector<RowData> out) throws Exception;

286

}

287

288

/** Lookup join with calc runner */

289

class LookupJoinWithCalcRunner {

290

LookupJoinWithCalcRunner(

291

GeneratedFunction<TableFunction<Object>> generatedFetcher,

292

DataStructureConverter<RowData, Object> fetcherConverter,

293

GeneratedProjection projection,

294

boolean isLeftOuterJoin

295

);

296

}

297

```

298

299

### Aggregation Operations

300

301

Aggregation operators for group-by operations, including support for mini-batch processing and table aggregations.

302

303

```java { .api }

304

/** Group aggregation function */

305

class GroupAggFunction extends KeyedProcessFunction<RowData, RowData, RowData> {

306

GroupAggFunction(

307

GeneratedAggsHandleFunction genAggsHandler,

308

LogicalType[] accTypes,

309

int indexOfCountStar,

310

boolean generateUpdateBefore,

311

boolean needRetract

312

);

313

314

/** Process element in group aggregation */

315

void processElement(RowData value, Context ctx, Collector<RowData> out) throws Exception;

316

}

317

318

/** Group table aggregation function */

319

class GroupTableAggFunction extends KeyedProcessFunction<RowData, RowData, RowData> {

320

GroupTableAggFunction(

321

GeneratedTableAggsHandleFunction genAggsHandler,

322

LogicalType[] accTypes,

323

int indexOfCountStar,

324

boolean generateUpdateBefore

325

);

326

}

327

328

/** Mini-batch global group aggregation function */

329

class MiniBatchGlobalGroupAggFunction extends KeyedProcessFunction<RowData, RowData, RowData> {

330

MiniBatchGlobalGroupAggFunction(

331

GeneratedAggsHandleFunction genAggsHandler,

332

LogicalType[] accTypes,

333

int indexOfCountStar,

334

boolean generateUpdateBefore,

335

long miniBatchSize

336

);

337

}

338

339

/** Mini-batch group aggregation function */

340

class MiniBatchGroupAggFunction extends KeyedProcessFunction<RowData, RowData, RowData> {

341

MiniBatchGroupAggFunction(

342

GeneratedAggsHandleFunction genAggsHandler,

343

LogicalType[] accTypes,

344

int indexOfCountStar,

345

boolean generateUpdateBefore,

346

long miniBatchSize

347

);

348

}

349

```

350

351

### Sorting Operations

352

353

Operators for sorting data including regular sort, sort with limit, and limit-only operations.

354

355

```java { .api }

356

/** Sort operator implementation */

357

class SortOperator extends AbstractStreamOperator<RowData>

358

implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput {

359

360

SortOperator(GeneratedRecordComparator gComparator);

361

362

/** Process input element */

363

void processElement(StreamRecord<RowData> element) throws Exception;

364

365

/** End input processing and emit sorted results */

366

void endInput() throws Exception;

367

}

368

369

/** Sort with limit operator */

370

class SortLimitOperator extends AbstractStreamOperator<RowData>

371

implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput {

372

373

SortLimitOperator(

374

boolean isGlobal,

375

long limitStart,

376

long limitEnd,

377

GeneratedRecordComparator gComparator

378

);

379

}

380

381

/** Limit operator implementation */

382

class LimitOperator extends AbstractStreamOperator<RowData>

383

implements OneInputStreamOperator<RowData, RowData> {

384

385

LimitOperator(boolean isGlobal, long limitStart, long limitEnd);

386

387

/** Process input element with limit */

388

void processElement(StreamRecord<RowData> element) throws Exception;

389

}

390

```

391

392

### Ranking Operations

393

394

Operators for ranking and Top-N operations including different ranking strategies and buffer management.

395

396

```java { .api }

397

/** Ranking operator implementation */

398

class RankOperator extends KeyedProcessFunction<RowData, RowData, RowData> {

399

RankOperator(

400

GeneratedRecordComparator sortKeyComparator,

401

RankType rankType,

402

long rankStart,

403

long rankEnd,

404

boolean generateUpdateBefore,

405

boolean outputRankNumber

406

);

407

408

/** Process element for ranking */

409

void processElement(RowData value, Context ctx, Collector<RowData> out) throws Exception;

410

}

411

412

/** Buffer for top-N operations */

413

class TopNBuffer {

414

TopNBuffer(

415

GeneratedRecordComparator sortKeyComparator,

416

ArrayList<RowData> buffer,

417

RankType rankType,

418

long rankStart,

419

long rankEnd

420

);

421

422

/** Put record into buffer */

423

List<RowData> put(RowData record);

424

425

/** Remove record from buffer */

426

List<RowData> remove(RowData record);

427

428

/** Get current rankings */

429

Iterator<Map.Entry<RowData, Long>> getIterator();

430

}

431

432

/** Append-only top-N function */

433

class AppendOnlyTopNFunction extends KeyedProcessFunction<RowData, RowData, RowData> {

434

AppendOnlyTopNFunction(

435

GeneratedRecordComparator sortKeyComparator,

436

RankType rankType,

437

long rankStart,

438

long rankEnd,

439

boolean generateUpdateBefore,

440

boolean outputRankNumber

441

);

442

}

443

444

/** Retractable top-N function */

445

class RetractableTopNFunction extends KeyedProcessFunction<RowData, RowData, RowData> {

446

RetractableTopNFunction(

447

GeneratedRecordComparator sortKeyComparator,

448

RankType rankType,

449

long rankStart,

450

long rankEnd,

451

boolean generateUpdateBefore,

452

boolean outputRankNumber

453

);

454

}

455

```

456

457

### Hash Table Operations

458

459

Hash table implementations for join and aggregation operations, optimized for different data types and use cases.

460

461

```java { .api }

462

/** Hash table for long keys */

463

class LongHashTable {

464

LongHashTable(int capacity, double loadFactor);

465

466

/** Put key-value pair */

467

boolean put(long key, RowData value);

468

469

/** Get value by key */

470

RowData get(long key);

471

472

/** Remove key-value pair */

473

RowData remove(long key);

474

475

/** Check if contains key */

476

boolean contains(long key);

477

478

/** Get current size */

479

int size();

480

}

481

482

/** Binary hash table implementation */

483

class BinaryHashTable {

484

BinaryHashTable(

485

MemoryManager memManager,

486

long memorySize,

487

LogicalType[] keyTypes,

488

LogicalType[] valueTypes

489

);

490

491

/** Open the hash table */

492

void open() throws IOException;

493

494

/** Put binary row */

495

boolean put(BinaryRowData key, BinaryRowData value) throws IOException;

496

497

/** Get binary row by key */

498

BinaryRowData get(BinaryRowData key) throws IOException;

499

500

/** Close and clean up */

501

void close();

502

}

503

```

504

505

### Deduplication Operations

506

507

Operators for removing duplicate records in both processing time and event time scenarios.

508

509

```java { .api }

510

/** Base class for processing time deduplication functions */

511

abstract class ProcTimeDeduplicateFunctionBase extends KeyedProcessFunction<RowData, RowData, RowData> {

512

ProcTimeDeduplicateFunctionBase(

513

long minRetentionTime,

514

long maxRetentionTime,

515

boolean generateUpdateBefore,

516

boolean keepLastRow

517

);

518

519

/** Process element for deduplication */

520

void processElement(RowData value, Context ctx, Collector<RowData> out) throws Exception;

521

}

522

523

/** Processing time mini-batch deduplicate function for keeping first row */

524

class ProcTimeMiniBatchDeduplicateKeepFirstRowFunction

525

extends ProcTimeDeduplicateFunctionBase {

526

527

ProcTimeMiniBatchDeduplicateKeepFirstRowFunction(

528

long minRetentionTime,

529

long maxRetentionTime,

530

boolean generateUpdateBefore,

531

long miniBatchSize

532

);

533

}

534

```

535

536

## Usage Examples

537

538

```java

539

// Create window operator using builder

540

WindowOperatorBuilder builder = new WindowOperatorBuilder()

541

.withWindowAssigner(new TumblingWindowAssigner(Duration.ofMinutes(5).toMillis(), 0))

542

.withWindowFunction(new MyWindowFunction())

543

.withTrigger(EventTimeTrigger.create())

544

.withAllowedLateness(Time.seconds(10));

545

546

OneInputStreamOperator<RowData, RowData> windowOperator = builder.build();

547

548

// Create join operator

549

SortMergeJoinOperator joinOperator = new SortMergeJoinOperator(

550

FlinkJoinType.INNER,

551

generatedJoinCondition,

552

leftComparator,

553

rightComparator

554

);

555

556

// Create aggregation function

557

GroupAggFunction aggFunction = new GroupAggFunction(

558

generatedAggsHandler,

559

accTypes,

560

indexOfCountStar,

561

true, // generateUpdateBefore

562

false // needRetract

563

);

564

565

// Create ranking operator

566

RankOperator rankOperator = new RankOperator(

567

sortKeyComparator,

568

RankType.ROW_NUMBER,

569

1L, // rankStart

570

10L, // rankEnd

571

true, // generateUpdateBefore

572

true // outputRankNumber

573

);

574

```