or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

code-generation.md, data-structures.md, filesystem.md, index.md, runtime-operators.md, type-system.md, utilities.md
tile.json

docs/runtime-operators.md

Runtime Operators

Comprehensive set of operators for joins, aggregations, window operations, sorting, ranking, and other table processing operations optimized for both streaming and batch execution modes.

Capabilities

Operator Factories

Main factories for creating runtime operators, including code-generated operators and specialized operator types.

/**
 * Main factory for code-generated operators.
 *
 * <p>Wraps a {@code GeneratedClass} that describes a {@code StreamOperator<OUT>}. The runtime
 * obtains the concrete operator instance through {@link #createStreamOperator}.
 *
 * @param <OUT> output record type of the produced operator
 */
class CodeGenOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OUT> {
    /**
     * Creates the factory from a generated operator class.
     *
     * @param operatorCodeGenerator generated class describing the operator to instantiate
     */
    CodeGenOperatorFactory(GeneratedClass<? extends StreamOperator<OUT>> operatorCodeGenerator);
    
    /**
     * Creates the actual operator instance.
     *
     * @param parameters operator parameters supplied by the runtime
     * @param <T> concrete operator type expected by the caller
     * @return the newly created operator
     */
    <T extends StreamOperator<OUT>> T createStreamOperator(StreamOperatorParameters<OUT> parameters);
    
    /**
     * Returns the class of the operator this factory creates.
     *
     * @param classLoader loader used to resolve the generated class (presumably the user-code
     *     class loader — confirm)
     * @return the operator class (declared with a raw {@code StreamOperator} bound)
     */
    Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader);
}

/**
 * Factory for watermark assignment operators on {@code RowData} streams.
 *
 * <p>Constructor parameters:
 * rowtimeFieldIndex — index of the rowtime field in the input row (presumably the event-time
 * attribute the watermark is derived from — confirm);
 * watermarkStrategy — strategy used to generate watermarks.
 */
class WatermarkAssignerOperatorFactory extends AbstractStreamOperatorFactory<RowData> {
    WatermarkAssignerOperatorFactory(
        int rowtimeFieldIndex,
        WatermarkStrategy<RowData> watermarkStrategy
    );
}

/**
 * Factory for multi-input operators in batch mode.
 *
 * <p>Created from a generated {@code MultipleInputStreamOperator<OUT>} class.
 *
 * @param <OUT> output record type of the produced operator
 */
class BatchMultipleInputStreamOperatorFactory<OUT> 
    extends AbstractStreamOperatorFactory<OUT> {
    
    /** Creates the factory from the generated multi-input operator class. */
    BatchMultipleInputStreamOperatorFactory(
        GeneratedClass<? extends MultipleInputStreamOperator<OUT>> generatedClass
    );
}

Window Operations

Comprehensive window processing capabilities including builders for different window types and window assigners.

/**
 * Builder for window operators.
 *
 * <p>Provides a fluent API: obtain an instance with {@link #builder()}, chain the {@code with*}
 * and window-shape methods, then call {@link #build()} to produce a
 * {@code OneInputStreamOperator<RowData, RowData>}.
 */
class WindowOperatorBuilder {
    /** Creates a new builder instance. This is the entry point for the fluent API. */
    static WindowOperatorBuilder builder();
    
    /** Configures a tumbling window of the given size. */
    WindowOperatorBuilder tumble(Duration size);
    
    /** Configures a sliding window with the given size and slide. */
    WindowOperatorBuilder sliding(Duration size, Duration slide);
    
    /**
     * Configures event-time processing.
     *
     * @param rowtimeIndex index of the rowtime field in the input row
     */
    WindowOperatorBuilder withEventTime(int rowtimeIndex);
    
    /** Configures processing-time processing. */
    WindowOperatorBuilder withProcessingTime();
    
    /** Sets the window assigner explicitly, as an alternative to {@code tumble}/{@code sliding}. */
    WindowOperatorBuilder withWindowAssigner(WindowAssigner<?, ? extends Window> assigner);
    
    /** Sets the window function. */
    WindowOperatorBuilder withWindowFunction(WindowFunction<?, ?, ?, ?> function);
    
    /** Sets the trigger. */
    WindowOperatorBuilder withTrigger(Trigger<?, ?> trigger);
    
    /** Sets the evictor. */
    WindowOperatorBuilder withEvictor(Evictor<?, ?> evictor);
    
    /**
     * Sets the allowed lateness.
     *
     * <p>Note that this takes {@code Time}, while {@code tumble}/{@code sliding} take
     * {@code java.time.Duration}.
     */
    WindowOperatorBuilder withAllowedLateness(Time allowedLateness);
    
    /** Builds the configured window operator. */
    OneInputStreamOperator<RowData, RowData> build();
}

/**
 * Builder for slicing window aggregation operators.
 *
 * <p>Takes the generated aggregation handler, a window assigner, and a trigger, plus an optional
 * state backend.
 *
 * <p>NOTE(review): an earlier summary called this "optimized for non-overlapping window
 * aggregations". Slicing-based aggregation in Flink also serves overlapping (hopping and
 * cumulating) windows by splitting them into non-overlapping slices. Confirm against the
 * library version.
 */
class SlicingWindowAggOperatorBuilder {
    SlicingWindowAggOperatorBuilder(
        GeneratedAggsHandleFunction aggsHandleFunction,
        WindowAssigner<?, ?> windowAssigner,
        Trigger<?, ?> trigger
    );
    
    /** Sets the state backend used by the built operator. */
    SlicingWindowAggOperatorBuilder withStateBackend(StateBackend stateBackend);
    
    /** Builds the slicing window aggregation operator. */
    OneInputStreamOperator<RowData, RowData> build();
}

/**
 * Builder for window-based ranking (Top-N) operators.
 *
 * <p>Rows are ranked within each window using the supplied comparator. Only ranks between
 * {@code rankStart} and {@code rankEnd} are kept; whether the bounds are inclusive is not
 * stated here, so confirm before relying on it.
 */
class WindowRankOperatorBuilder {
    WindowRankOperatorBuilder(
        WindowAssigner<?, ?> windowAssigner,
        GeneratedRecordComparator comparator,
        RankType rankType,
        long rankStart,
        long rankEnd
    );
    
    /** Builds the window rank operator. */
    OneInputStreamOperator<RowData, RowData> build();
}

Window Assigners

Different types of window assigners for various windowing strategies including tumbling, sliding, and session windows.

/**
 * Tumbling window assigner producing {@code TimeWindow}s.
 *
 * <p>The constructor takes the window size and offset as {@code long}s. The usage example on this
 * page passes milliseconds ({@code Duration.toMillis()}).
 */
class TumblingWindowAssigner extends WindowAssigner<Object, TimeWindow> {
    TumblingWindowAssigner(long windowSize, long offset);
    
    /** Assigns the element with the given timestamp to its window(s). */
    Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context);
}

/**
 * Sliding window assigner producing {@code TimeWindow}s.
 *
 * <p>The constructor takes window size, slide, and offset as {@code long}s (presumably
 * milliseconds — confirm). An element may belong to several overlapping windows, which is why
 * {@code assignWindows} returns a collection.
 */
class SlidingWindowAssigner extends WindowAssigner<Object, TimeWindow> {
    SlidingWindowAssigner(long windowSize, long slide, long offset);
    
    /** Assigns the element with the given timestamp to every window that contains it. */
    Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context);
}

/**
 * Session window assigner producing {@code TimeWindow}s.
 *
 * <p>Configured only by a session timeout (gap). Its unit is presumably milliseconds, like the
 * other assigners — confirm.
 */
class SessionWindowAssigner extends WindowAssigner<Object, TimeWindow> {
    SessionWindowAssigner(long sessionTimeout);
    
    /** Assigns the element with the given timestamp to its session window. */
    Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context);
}

Window Types

Different window implementations for time-based and count-based windowing operations.

/** Time-based window defined by a start and an end timestamp. */
class TimeWindow extends Window {
    /** Window start timestamp. */
    public final long start;
    
    /** Window end timestamp (inclusive or exclusive is not stated here — confirm). */
    public final long end;
    
    /** Creates a time window spanning {@code start} to {@code end}. */
    TimeWindow(long start, long end);
    
    /** Returns the window size, presumably {@code end - start} — confirm. */
    public long getSize();
    
    /** Returns whether the given timestamp falls inside this window. */
    public boolean covers(long timestamp);
}

/** Count-based window, identified by a numeric id rather than a time range. */
class CountWindow extends Window {
    /** Identifier of this count window. */
    public final long id;
    
    /** Creates a count window with the given id. */
    CountWindow(long id);
}

Join Operations

Comprehensive join processing capabilities including different join types, join strategies, and specialized join operators.

/** Logical join types supported by the runtime join operators. */
enum FlinkJoinType {
    INNER, LEFT, RIGHT, FULL, SEMI, ANTI;
    
    /** Returns whether this is an outer join type (presumably LEFT, RIGHT, and FULL — confirm). */
    boolean isOuter();
    
    /** Returns whether this join type filters nulls. The exact rule is not shown here. */
    boolean filtersNulls();
}

/**
 * Join types as seen by the hash join implementation.
 *
 * <p>BUILD_LEFT and BUILD_RIGHT appear to name which input is used as the hash-table build side
 * (inferred from the names — confirm).
 */
enum HashJoinType {
    INNER, BUILD_LEFT, BUILD_RIGHT, FULL_OUTER, SEMI, ANTI;
    
    /** Maps this hash join type to the corresponding logical {@code FlinkJoinType}. */
    FlinkJoinType toFlinkJoinType();
}

/**
 * Sort-merge join operator over two {@code RowData} inputs.
 *
 * <p>Configured with a join type, a generated join condition, and one generated comparator per
 * input. Input 1 is the left side and input 2 is the right side.
 */
class SortMergeJoinOperator extends AbstractStreamOperator<RowData>
    implements TwoInputStreamOperator<RowData, RowData, RowData> {
    
    SortMergeJoinOperator(
        FlinkJoinType joinType,
        GeneratedJoinCondition joinCondition,
        GeneratedRecordComparator leftComparator,
        GeneratedRecordComparator rightComparator
    );
    
    /** Processes a record from the left input. */
    void processElement1(StreamRecord<RowData> element) throws Exception;
    
    /** Processes a record from the right input. */
    void processElement2(StreamRecord<RowData> element) throws Exception;
}

/**
 * Streaming two-input join operator.
 *
 * <p>Takes a join type, a generated join condition, and lower/upper bounds for each side.
 * NOTE(review): the bounds look like time-interval bounds relating left and right rows; the unit
 * and inclusivity are not stated here — confirm.
 */
class StreamingJoinOperator extends AbstractStreamOperator<RowData>
    implements TwoInputStreamOperator<RowData, RowData, RowData> {
    
    StreamingJoinOperator(
        FlinkJoinType joinType,
        GeneratedJoinCondition joinCondition,
        long leftLowerBound,
        long leftUpperBound,
        long rightLowerBound,
        long rightUpperBound
    );
}

/**
 * Streaming semi join or anti join operator.
 *
 * <p>{@code isAntiJoin} selects anti-join semantics ({@code true}) or semi-join semantics
 * ({@code false}). The min/max retention times presumably bound how long join state is kept
 * (unit not stated — confirm).
 */
class StreamingSemiAntiJoinOperator extends AbstractStreamOperator<RowData>
    implements TwoInputStreamOperator<RowData, RowData, RowData> {
    
    StreamingSemiAntiJoinOperator(
        boolean isAntiJoin,
        GeneratedJoinCondition joinCondition,
        long minRetentionTime,
        long maxRetentionTime
    );
}

Lookup Joins

Specialized join operators for lookup operations against external systems and dimension tables.

/**
 * Runs a lookup join using an asynchronous table function as the fetcher.
 *
 * <p>Constructor parameters: the generated async fetcher, converters between internal
 * {@code RowData} and the fetcher's external types, a generated result future, and whether the
 * join is a left outer join (presumably emitting unmatched input rows — confirm).
 */
class AsyncLookupJoinRunner {
    AsyncLookupJoinRunner(
        GeneratedFunction<AsyncTableFunction<Object>> generatedFetcher,
        DataStructureConverter<RowData, Object> fetcherConverter,
        DataStructureConverter<Object, RowData> lookupResultConverter,
        GeneratedResultFuture<Object> generatedResultFuture,
        boolean isLeftOuterJoin
    );
    
    /**
     * Processes one input row with an asynchronous lookup.
     *
     * <p>NOTE(review): declared here with a synchronous {@code Collector}; the actual async
     * entry point may use a result future instead — confirm against the library.
     */
    void processElement(RowData input, Collector<RowData> out) throws Exception;
}

/**
 * Runs a lookup join using a synchronous table function as the fetcher.
 *
 * <p>Constructor parameters: the generated fetcher, converters between internal {@code RowData}
 * and the fetcher's external types, and whether the join is a left outer join.
 */
class LookupJoinRunner {
    LookupJoinRunner(
        GeneratedFunction<TableFunction<Object>> generatedFetcher,
        DataStructureConverter<RowData, Object> fetcherConverter,
        DataStructureConverter<Object, RowData> lookupResultConverter,
        boolean isLeftOuterJoin
    );
    
    /** Processes one input row with a synchronous lookup and emits results to {@code out}. */
    void processElement(RowData input, Collector<RowData> out) throws Exception;
}

/**
 * Synchronous lookup join runner that also applies a calc step (a generated projection).
 *
 * <p>NOTE(review): the projection is presumably applied to the looked-up rows before they are
 * joined with the input. Unlike {@code LookupJoinRunner}, no lookup-result converter is taken —
 * confirm.
 */
class LookupJoinWithCalcRunner {
    LookupJoinWithCalcRunner(
        GeneratedFunction<TableFunction<Object>> generatedFetcher,
        DataStructureConverter<RowData, Object> fetcherConverter,
        GeneratedProjection projection,
        boolean isLeftOuterJoin
    );
}

Aggregation Operations

Aggregation operators for group-by operations, including support for mini-batch processing and table aggregations.

/**
 * Keyed function that performs group aggregation.
 *
 * <p>Constructor parameters: the generated aggregation handler, the accumulator types, the index
 * of the COUNT(*) aggregate (presumably negative when absent — confirm), whether UPDATE_BEFORE
 * messages are generated, and whether retraction input must be handled.
 */
class GroupAggFunction extends KeyedProcessFunction<RowData, RowData, RowData> {
    GroupAggFunction(
        GeneratedAggsHandleFunction genAggsHandler,
        LogicalType[] accTypes,
        int indexOfCountStar,
        boolean generateUpdateBefore,
        boolean needRetract
    );
    
    /** Processes one input row, updating its group's accumulators and emitting results. */
    void processElement(RowData value, Context ctx, Collector<RowData> out) throws Exception;
}

/**
 * Keyed function that performs group table aggregation (a table aggregate per group).
 *
 * <p>Uses a generated table-aggregation handler. Unlike {@code GroupAggFunction}, it takes no
 * {@code needRetract} flag.
 */
class GroupTableAggFunction extends KeyedProcessFunction<RowData, RowData, RowData> {
    GroupTableAggFunction(
        GeneratedTableAggsHandleFunction genAggsHandler,
        LogicalType[] accTypes,
        int indexOfCountStar,
        boolean generateUpdateBefore
    );
}

/**
 * Mini-batch group aggregation function for the global phase.
 *
 * <p>NOTE(review): "global" presumably refers to the global stage of a two-phase (local/global)
 * aggregation — confirm. {@code miniBatchSize} sets the batch size.
 */
class MiniBatchGlobalGroupAggFunction extends KeyedProcessFunction<RowData, RowData, RowData> {
    MiniBatchGlobalGroupAggFunction(
        GeneratedAggsHandleFunction genAggsHandler,
        LogicalType[] accTypes,
        int indexOfCountStar,
        boolean generateUpdateBefore,
        long miniBatchSize
    );
}

/**
 * Mini-batch group aggregation function.
 *
 * <p>Same parameters as a group aggregation without the retraction flag, plus
 * {@code miniBatchSize}, which sets the number of buffered records per batch (presumably —
 * confirm).
 */
class MiniBatchGroupAggFunction extends KeyedProcessFunction<RowData, RowData, RowData> {
    MiniBatchGroupAggFunction(
        GeneratedAggsHandleFunction genAggsHandler,
        LogicalType[] accTypes,
        int indexOfCountStar,
        boolean generateUpdateBefore,
        long miniBatchSize
    );
}

Sorting Operations

Operators for sorting data including regular sort, sort with limit, and limit-only operations.

/**
 * Sort operator for bounded input.
 *
 * <p>Records are collected in {@code processElement}. Because the operator implements
 * {@code BoundedOneInput}, the sorted result is emitted from {@code endInput} once the input is
 * exhausted.
 */
class SortOperator extends AbstractStreamOperator<RowData>
    implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput {
    
    /** Creates the operator with the generated record comparator that defines the sort order. */
    SortOperator(GeneratedRecordComparator gComparator);
    
    /** Buffers one input record. */
    void processElement(StreamRecord<RowData> element) throws Exception;
    
    /** Called at end of input; sorts the buffered records and emits them. */
    void endInput() throws Exception;
}

/**
 * Sort operator with limit, for bounded input ({@code BoundedOneInput}).
 *
 * <p>{@code isGlobal} presumably distinguishes the global instance from local pre-limit
 * instances — confirm. {@code limitStart}/{@code limitEnd} select the emitted range of the sorted
 * output; their inclusivity is not stated here.
 */
class SortLimitOperator extends AbstractStreamOperator<RowData>
    implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput {
    
    SortLimitOperator(
        boolean isGlobal,
        long limitStart,
        long limitEnd,
        GeneratedRecordComparator gComparator
    );
}

/**
 * Limit-only operator (no sorting).
 *
 * <p>Unlike the sort operators, it does not implement {@code BoundedOneInput}, so it decides
 * per element in {@code processElement}.
 */
class LimitOperator extends AbstractStreamOperator<RowData>
    implements OneInputStreamOperator<RowData, RowData> {
    
    LimitOperator(boolean isGlobal, long limitStart, long limitEnd);
    
    /** Processes one input record, forwarding it only while it falls inside the limit range. */
    void processElement(StreamRecord<RowData> element) throws Exception;
}

Ranking Operations

Operators for ranking and Top-N operations including different ranking strategies and buffer management.

/**
 * Ranking (Top-N) implementation.
 *
 * <p>Despite its name, it is declared as a {@code KeyedProcessFunction}, not a
 * {@code StreamOperator}. Ranks rows per key using the sort-key comparator and keeps ranks between
 * {@code rankStart} and {@code rankEnd}. {@code outputRankNumber} controls whether the rank number
 * is part of the output.
 */
class RankOperator extends KeyedProcessFunction<RowData, RowData, RowData> {
    RankOperator(
        GeneratedRecordComparator sortKeyComparator,
        RankType rankType,
        long rankStart,
        long rankEnd,
        boolean generateUpdateBefore,
        boolean outputRankNumber
    );
    
    /** Processes one input row for ranking. */
    void processElement(RowData value, Context ctx, Collector<RowData> out) throws Exception;
}

/**
 * Buffer holding the candidate rows for Top-N operations.
 *
 * <p>The constructor accepts a concrete {@code ArrayList<RowData>} as the backing buffer.
 * NOTE(review): the lists returned by {@code put}/{@code remove} are not described here —
 * presumably the rows affected by the change; confirm.
 */
class TopNBuffer {
    TopNBuffer(
        GeneratedRecordComparator sortKeyComparator,
        ArrayList<RowData> buffer,
        RankType rankType,
        long rankStart,
        long rankEnd
    );
    
    /** Puts a record into the buffer. */
    List<RowData> put(RowData record);
    
    /** Removes a record from the buffer. */
    List<RowData> remove(RowData record);
    
    /** Returns an iterator over the current rankings as (row, long) entries. */
    Iterator<Map.Entry<RowData, Long>> getIterator();
}

/**
 * Top-N function for append-only input.
 *
 * <p>Takes the same parameters as {@code RankOperator}.
 */
class AppendOnlyTopNFunction extends KeyedProcessFunction<RowData, RowData, RowData> {
    AppendOnlyTopNFunction(
        GeneratedRecordComparator sortKeyComparator,
        RankType rankType,
        long rankStart,
        long rankEnd,
        boolean generateUpdateBefore,
        boolean outputRankNumber
    );
}

/**
 * Top-N function whose input may contain retractions.
 *
 * <p>Takes the same parameters as {@code AppendOnlyTopNFunction}.
 */
class RetractableTopNFunction extends KeyedProcessFunction<RowData, RowData, RowData> {
    RetractableTopNFunction(
        GeneratedRecordComparator sortKeyComparator,
        RankType rankType,
        long rankStart,
        long rankEnd,
        boolean generateUpdateBefore,
        boolean outputRankNumber
    );
}

Hash Table Operations

Hash table implementations for join and aggregation operations, optimized for different data types and use cases.

/**
 * Hash table keyed by primitive {@code long} with {@code RowData} values.
 *
 * <p>NOTE(review): the meaning of {@code put}'s boolean result and what {@code get}/{@code remove}
 * return for a missing key (presumably {@code null}) are not stated here — confirm.
 */
class LongHashTable {
    /** Creates a table with the given initial capacity and load factor. */
    LongHashTable(int capacity, double loadFactor);
    
    /** Puts a key-value pair. */
    boolean put(long key, RowData value);
    
    /** Returns the value mapped to {@code key}. */
    RowData get(long key);
    
    /** Removes the mapping for {@code key} and returns the removed value. */
    RowData remove(long key);
    
    /** Returns whether the table contains {@code key}. */
    boolean contains(long key);
    
    /** Returns the current number of entries. */
    int size();
}

/**
 * Hash table over binary rows, backed by managed memory.
 *
 * <p>Lifecycle: construct, {@link #open()}, then {@code put}/{@code get}, and finally
 * {@link #close()}. {@code open}, {@code put}, and {@code get} may throw {@code IOException}.
 */
class BinaryHashTable {
    /**
     * Creates the table.
     *
     * <p>Parameters: the memory manager and memory budget in {@code memorySize} (unit not stated —
     * presumably bytes; confirm), plus the key and value field types.
     */
    BinaryHashTable(
        MemoryManager memManager,
        long memorySize,
        LogicalType[] keyTypes,
        LogicalType[] valueTypes
    );
    
    /** Opens the hash table; must be called before use. */
    void open() throws IOException;
    
    /** Puts a binary key/value row pair. */
    boolean put(BinaryRowData key, BinaryRowData value) throws IOException;
    
    /** Returns the binary row mapped to {@code key}. */
    BinaryRowData get(BinaryRowData key) throws IOException;
    
    /** Closes the table and releases its resources. */
    void close();
}

Deduplication Operations

Operators for removing duplicate records in both processing time and event time scenarios.

/**
 * Base class for processing-time deduplication functions.
 *
 * <p>{@code keepLastRow} selects whether the last row ({@code true}) or the first row
 * ({@code false}) per key is kept. The min/max retention times presumably bound state retention
 * (unit not stated — confirm).
 */
abstract class ProcTimeDeduplicateFunctionBase extends KeyedProcessFunction<RowData, RowData, RowData> {
    ProcTimeDeduplicateFunctionBase(
        long minRetentionTime,
        long maxRetentionTime,
        boolean generateUpdateBefore,
        boolean keepLastRow
    );
    
    /** Processes one input row for deduplication. */
    void processElement(RowData value, Context ctx, Collector<RowData> out) throws Exception;
}

/**
 * Processing-time mini-batch deduplication that keeps the first row per key.
 *
 * <p>Its constructor has no {@code keepLastRow} flag. It presumably fixes that flag to
 * {@code false} in the base class — confirm. {@code miniBatchSize} sets the batch size.
 */
class ProcTimeMiniBatchDeduplicateKeepFirstRowFunction 
    extends ProcTimeDeduplicateFunctionBase {
    
    ProcTimeMiniBatchDeduplicateKeepFirstRowFunction(
        long minRetentionTime,
        long maxRetentionTime,
        boolean generateUpdateBefore,
        long miniBatchSize
    );
}

Usage Examples

// Usage examples.
// The snippets assume that generatedJoinCondition, leftComparator, rightComparator,
// generatedAggsHandler, accTypes, indexOfCountStar, sortKeyComparator and MyWindowFunction
// are already defined by the caller. They are placeholders, not APIs documented here.

// Create a window operator. Obtain the builder through its documented static factory
// WindowOperatorBuilder.builder(); do not call a constructor.
WindowOperatorBuilder builder = WindowOperatorBuilder.builder()
    // 5-minute tumbling windows, size given in milliseconds, zero offset
    .withWindowAssigner(new TumblingWindowAssigner(Duration.ofMinutes(5).toMillis(), 0))
    .withWindowFunction(new MyWindowFunction())
    .withTrigger(EventTimeTrigger.create())
    .withAllowedLateness(Time.seconds(10));

OneInputStreamOperator<RowData, RowData> windowOperator = builder.build();

// Create a sort-merge inner join operator
SortMergeJoinOperator joinOperator = new SortMergeJoinOperator(
    FlinkJoinType.INNER,
    generatedJoinCondition,
    leftComparator,
    rightComparator
);

// Create a group aggregation function
GroupAggFunction aggFunction = new GroupAggFunction(
    generatedAggsHandler,
    accTypes,
    indexOfCountStar,
    true, // generateUpdateBefore
    false // needRetract
);

// Create a Top-10 ranking operator using ROW_NUMBER
RankOperator rankOperator = new RankOperator(
    sortKeyComparator,
    RankType.ROW_NUMBER,
    1L, // rankStart
    10L, // rankEnd
    true, // generateUpdateBefore
    true // outputRankNumber
);