CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-table-runtime-blink-2-11

Runtime classes required by a task manager for execution of table programs in Apache Flink's Blink planner

Overview
Eval results
Files

code-generation.mddocs/

Code Generation Framework

Framework for generating optimized runtime code including aggregation functions, join conditions, projections, and other operations for maximum performance through compile-time optimization.

Capabilities

Generated Class Framework

Base infrastructure for all generated classes, providing the foundation for compile-time code generation and runtime class loading.

/**
 * Base class for all generated classes
 * Provides foundation for compile-time code generation and runtime class loading
 */
abstract class GeneratedClass<T> {
    /** Get the class name of the generated class */
    String getClassName();
    
    /** Get the generated Java source code */
    String getCode();
    
    /** Get references to external objects used in generated code */
    Object[] getReferences();
    
    /** Compile and instantiate the generated class */
    T newInstance(ClassLoader classLoader) throws Exception;
}

Generated Function Interfaces

Core interfaces that generated functions must implement, defining the contract for various types of generated operations.

/**
 * Aggregation function interface for generated code
 * Handles accumulation and retrieval of aggregated values
 */
interface AggsHandleFunction {
    /** Set aggregation accumulator */
    void setAccumulators(RowData acc) throws Exception;
    
    /** Get current accumulator values */
    RowData getAccumulators() throws Exception;
    
    /** Accumulate input row */
    void accumulate(RowData input) throws Exception;
    
    /** Retract input row (for retracting streams) */
    void retract(RowData input) throws Exception;
    
    /** Merge accumulators from different partitions */
    void merge(RowData otherAcc) throws Exception;
    
    /** Reset accumulator to initial state */
    void resetAccumulator() throws Exception;
    
    /** Get final aggregation result */
    RowData getValue() throws Exception;
    
    /** Clean up expired state */
    void cleanup() throws Exception;
    
    /** Close and release resources */
    void close() throws Exception;
}

/**
 * Table aggregation function interface for generated code
 * Handles table-valued aggregation functions (UDTAF)
 */
interface TableAggsHandleFunction {
    /** Set table aggregation accumulator */
    void setAccumulators(RowData acc) throws Exception;
    
    /** Get current accumulator values */
    RowData getAccumulators() throws Exception;
    
    /** Accumulate input row */
    void accumulate(RowData input) throws Exception;
    
    /** Retract input row */
    void retract(RowData input) throws Exception;
    
    /** Emit aggregation results */
    void emitValue(RowData acc, Collector<RowData> out) throws Exception;
    
    /** Clean up expired state */
    void cleanup() throws Exception;
    
    /** Close and release resources */
    void close() throws Exception;
}

/**
 * Join condition interface for generated code
 * Evaluates join predicates between left and right input rows
 */
interface JoinCondition {
    /** Evaluate join condition */
    boolean apply(RowData left, RowData right) throws Exception;
    
    /** Close and release resources */
    void close() throws Exception;
}

/**
 * Projection interface for generated code
 * Projects input rows to output rows with transformed fields
 */
interface Projection<IN, OUT> {
    /** Apply projection to input row */
    OUT apply(IN input) throws Exception;
    
    /** Close and release resources */
    void close() throws Exception;
}

/**
 * Record comparator interface for generated code
 * Compares two records for sorting and ranking operations
 */
interface RecordComparator {
    /** Compare two records */
    int compare(RowData o1, RowData o2);
    
    /** Close and release resources */
    void close() throws Exception;
}

/**
 * Record equalizer interface for generated code
 * Checks equality between two records
 */
interface RecordEqualiser {
    /** Check if two records are equal */
    boolean equals(RowData left, RowData right);
    
    /** Close and release resources */
    void close() throws Exception;
}

Generated Class Wrappers

Wrapper classes that encapsulate generated code for different types of operations, providing type safety and lifecycle management.

/** Wrapper for generated aggregation functions */
class GeneratedAggsHandleFunction extends GeneratedClass<AggsHandleFunction> {
    GeneratedAggsHandleFunction(String className, String code, Object[] references);
    
    /** Create new aggregation function instance */
    AggsHandleFunction newInstance(ClassLoader classLoader) throws Exception;
}

/** Wrapper for generated table aggregation functions */
class GeneratedTableAggsHandleFunction extends GeneratedClass<TableAggsHandleFunction> {
    GeneratedTableAggsHandleFunction(String className, String code, Object[] references);
    
    /** Create new table aggregation function instance */
    TableAggsHandleFunction newInstance(ClassLoader classLoader) throws Exception;
}

/** Wrapper for generated join conditions */
class GeneratedJoinCondition extends GeneratedClass<JoinCondition> {
    GeneratedJoinCondition(String className, String code, Object[] references);
    
    /** Create new join condition instance */
    JoinCondition newInstance(ClassLoader classLoader) throws Exception;
}

/** Wrapper for generated projections */
class GeneratedProjection<IN, OUT> extends GeneratedClass<Projection<IN, OUT>> {
    GeneratedProjection(String className, String code, Object[] references);
    
    /** Create new projection instance */
    Projection<IN, OUT> newInstance(ClassLoader classLoader) throws Exception;
}

/** Wrapper for generated record comparators */
class GeneratedRecordComparator extends GeneratedClass<RecordComparator> {
    GeneratedRecordComparator(String className, String code, Object[] references);
    
    /** Create new record comparator instance */
    RecordComparator newInstance(ClassLoader classLoader) throws Exception;
}

/** Wrapper for generated record equalizers */
class GeneratedRecordEqualiser extends GeneratedClass<RecordEqualiser> {
    GeneratedRecordEqualiser(String className, String code, Object[] references);
    
    /** Create new record equalizer instance */
    RecordEqualiser newInstance(ClassLoader classLoader) throws Exception;
}

Advanced Generated Functions

Specialized generated function interfaces for complex operations like window functions and hash functions.

/**
 * Window function interface for generated code
 * Processes elements within window boundaries
 */
interface WindowFunction<IN, OUT, KEY, W extends Window> {
    /** Apply window function to all elements in window */
    void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
    
    /** Close and release resources */
    void close() throws Exception;
}

/**
 * Hash function interface for generated code
 * Computes hash codes for partitioning and bucketing
 */
interface HashFunction {
    /** Compute hash code for input row */
    int hashCode(RowData input);
    
    /** Close and release resources */
    void close() throws Exception;
}

/**
 * Filter function interface for generated code
 * Filters rows based on generated predicates
 */
interface FilterFunction {
    /** Evaluate filter condition */
    boolean filter(RowData input) throws Exception;
    
    /** Close and release resources */
    void close() throws Exception;
}

Generated Wrapper Classes for Advanced Functions

Wrapper classes for advanced generated functions providing type safety and lifecycle management.

/** Wrapper for generated window functions */
class GeneratedWindowFunction<IN, OUT, KEY, W extends Window> 
    extends GeneratedClass<WindowFunction<IN, OUT, KEY, W>> {
    
    GeneratedWindowFunction(String className, String code, Object[] references);
}

/** Wrapper for generated hash functions */
class GeneratedHashFunction extends GeneratedClass<HashFunction> {
    GeneratedHashFunction(String className, String code, Object[] references);
}

/** Wrapper for generated filter functions */
class GeneratedFilterFunction extends GeneratedClass<FilterFunction> {
    GeneratedFilterFunction(String className, String code, Object[] references);
}

/** Wrapper for generated result future (async operations) */
class GeneratedResultFuture<T> extends GeneratedClass<ResultFuture<T>> {
    GeneratedResultFuture(String className, String code, Object[] references);
}

Code Generation Utilities

Utility classes and interfaces for supporting the code generation process, including namespace management and function creation.

/**
 * Namespace for generated functions
 * Manages references and context for generated code execution
 */
interface FunctionContext {
    /** Get reference by index */
    <T> T getReference(int index);
    
    /** Get metric group for monitoring */
    MetricGroup getMetricGroup();
    
    /** Get job parameters */
    Configuration getJobParameters();
    
    /** Get user class loader */
    ClassLoader getUserCodeClassLoader();
}

/**
 * Generated function with context
 * Base interface for generated functions that need runtime context
 */
interface GeneratedFunction<T> {
    /** Open function with context */
    void open(FunctionContext context) throws Exception;
    
    /** Get the actual function instance */
    T getInstance();
    
    /** Close and cleanup */
    void close() throws Exception;
}

Specialized Generated Operations

Generated classes for specialized operations like key selectors, partition computers, and serializers.

/** Wrapper for generated key selectors */
class GeneratedKeySelector<IN, KEY> extends GeneratedClass<KeySelector<IN, KEY>> {
    GeneratedKeySelector(String className, String code, Object[] references);
}

/** Wrapper for generated partition computers */
class GeneratedPartitionComputer<T> extends GeneratedClass<PartitionComputer<T>> {
    GeneratedPartitionComputer(String className, String code, Object[] references);
}

/** Wrapper for generated serializers */
class GeneratedSerializer<T> extends GeneratedClass<TypeSerializer<T>> {
    GeneratedSerializer(String className, String code, Object[] references);
}

/** Wrapper for generated watermark generators */
class GeneratedWatermarkGenerator extends GeneratedClass<WatermarkGenerator<RowData>> {
    GeneratedWatermarkGenerator(String className, String code, Object[] references);
}

Code Generation Context

Context and configuration classes for managing the code generation process, including compilation settings and optimization parameters.

/**
 * Configuration for code generation
 * Controls compilation settings and optimization parameters
 */
class CodeGeneratorContext {
    /** Add reference object */
    String addReusableReference(Object obj, String className);
    
    /** Add reusable local variable */
    String addReusableLocalVariable(String variableName, String initialValue);
    
    /** Add reusable function */
    String addReusableFunction(String function, String functionName);
    
    /** Get generated references */
    Object[] getReferences();
    
    /** Set null check enabled */
    void setNullCheckEnabled(boolean enabled);
    
    /** Set operator context */
    void setOperatorContext(StreamingRuntimeContext context);
}

/**
 * Code generation utilities
 * Provides helper methods for generating optimized code
 */
class CodeGenUtils {
    /** Generate null-safe field access code */
    static String genNullSafeFieldAccess(String input, int index, LogicalType type);
    
    /** Generate type conversion code */
    static String genTypeConversion(String input, LogicalType fromType, LogicalType toType);
    
    /** Generate comparison code */
    static String genComparison(String left, String right, LogicalType type);
    
    /** Generate hash code computation */
    static String genHashCode(String input, LogicalType type);
}

Usage Examples

// Create generated aggregation function
GeneratedAggsHandleFunction genAggFunction = new GeneratedAggsHandleFunction(
    "MyAggFunction",
    generatedCode,
    references
);

// Instantiate the generated function
AggsHandleFunction aggFunction = genAggFunction.newInstance(classLoader);

// Use in aggregation operator
GroupAggFunction groupAggOperator = new GroupAggFunction(
    genAggFunction,
    accTypes,
    indexOfCountStar,
    generateUpdateBefore,
    needRetract
);

// Create generated join condition
GeneratedJoinCondition genJoinCondition = new GeneratedJoinCondition(
    "MyJoinCondition", 
    joinConditionCode,
    joinReferences
);

// Use in join operator
SortMergeJoinOperator joinOperator = new SortMergeJoinOperator(
    FlinkJoinType.INNER,
    genJoinCondition,
    leftComparator,
    rightComparator
);

// Create generated projection
GeneratedProjection<RowData, RowData> genProjection = new GeneratedProjection<>(
    "MyProjection",
    projectionCode, 
    projectionReferences
);

// Use projection
Projection<RowData, RowData> projection = genProjection.newInstance(classLoader);
RowData output = projection.apply(input);

// Create code generation context
CodeGeneratorContext ctx = new CodeGeneratorContext();
ctx.setNullCheckEnabled(true);
ctx.addReusableReference(myObject, "MyClass");

// Generate optimized code with context
String optimizedCode = generateOptimizedFunction(ctx, expression);

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-runtime-blink-2-11

docs

code-generation.md

data-structures.md

filesystem.md

index.md

runtime-operators.md

type-system.md

utilities.md

tile.json