Runtime classes required by a task manager for execution of table programs in Apache Flink's Blink planner
Framework for generating optimized runtime code including aggregation functions, join conditions, projections, and other operations for maximum performance through compile-time optimization.
Base infrastructure for all generated classes, providing the foundation for compile-time code generation and runtime class loading.
/**
* Base class for all generated classes
* Provides foundation for compile-time code generation and runtime class loading
*/
abstract class GeneratedClass<T> {
/** Get the class name of the generated class */
String getClassName();
/** Get the generated Java source code */
String getCode();
/** Get references to external objects used in generated code */
Object[] getReferences();
/** Compile and instantiate the generated class */
T newInstance(ClassLoader classLoader) throws Exception;
}

Core interfaces that generated functions must implement, defining the contract for various types of generated operations.
/**
* Aggregation function interface for generated code
* Handles accumulation and retrieval of aggregated values
*
* Every method in this listing is declared with {@code throws Exception};
* the concrete failure conditions are not specified here.
*/
interface AggsHandleFunction {
/**
* Set aggregation accumulator
*
* @param acc accumulator row to install
*/
void setAccumulators(RowData acc) throws Exception;
/**
* Get current accumulator values
*
* @return the current accumulators as a {@code RowData}
*/
RowData getAccumulators() throws Exception;
/**
* Accumulate input row
*
* @param input row to add to the aggregation
*/
void accumulate(RowData input) throws Exception;
/**
* Retract input row (for retracting streams)
*
* @param input row to remove from the aggregation
*/
void retract(RowData input) throws Exception;
/**
* Merge accumulators from different partitions
*
* @param otherAcc accumulator row coming from another partition
*/
void merge(RowData otherAcc) throws Exception;
/** Reset accumulator to initial state */
void resetAccumulator() throws Exception;
/**
* Get final aggregation result
*
* @return the aggregation result as a {@code RowData}
*/
RowData getValue() throws Exception;
/** Clean up expired state */
void cleanup() throws Exception;
/** Close and release resources */
void close() throws Exception;
}
/**
* Table aggregation function interface for generated code
* Handles table-valued aggregation functions (UDTAF)
*
* Compared with {@code AggsHandleFunction} as listed in this document, this
* interface has no {@code merge}, {@code resetAccumulator} or {@code getValue};
* results are pushed to a {@code Collector} through {@code emitValue} instead.
*/
interface TableAggsHandleFunction {
/**
* Set table aggregation accumulator
*
* @param acc accumulator row to install
*/
void setAccumulators(RowData acc) throws Exception;
/**
* Get current accumulator values
*
* @return the current accumulators as a {@code RowData}
*/
RowData getAccumulators() throws Exception;
/**
* Accumulate input row
*
* @param input row to add to the aggregation
*/
void accumulate(RowData input) throws Exception;
/**
* Retract input row
*
* @param input row to remove from the aggregation
*/
void retract(RowData input) throws Exception;
/**
* Emit aggregation results
*
* @param acc accumulator row (presumably the state the results are derived from; confirm)
* @param out collector that receives the emitted rows
*/
void emitValue(RowData acc, Collector<RowData> out) throws Exception;
/** Clean up expired state */
void cleanup() throws Exception;
/** Close and release resources */
void close() throws Exception;
}
/**
* Join condition interface for generated code
* Evaluates join predicates between left and right input rows
*/
interface JoinCondition {
/**
* Evaluate join condition
*
* @param left row from the left input
* @param right row from the right input
* @return the boolean result of the join predicate for this pair of rows
*/
boolean apply(RowData left, RowData right) throws Exception;
/** Close and release resources */
void close() throws Exception;
}
/**
* Projection interface for generated code
* Projects input rows to output rows with transformed fields
*
* @param <IN> type of the input row
* @param <OUT> type of the projected output row
*/
interface Projection<IN, OUT> {
/**
* Apply projection to input row
*
* @param input row to project
* @return the projected row
*/
OUT apply(IN input) throws Exception;
/** Close and release resources */
void close() throws Exception;
}
/**
* Record comparator interface for generated code
* Compares two records for sorting and ranking operations
*/
interface RecordComparator {
/**
* Compare two records
*
* Unlike {@code close()}, this method declares no checked exception.
*
* @param o1 first record
* @param o2 second record
* @return an int ordering result; the sign convention is not stated here
*         (presumably the same as {@code java.util.Comparator#compare}; confirm)
*/
int compare(RowData o1, RowData o2);
/** Close and release resources */
void close() throws Exception;
}
/**
* Record equalizer interface for generated code
* Checks equality between two records
*/
interface RecordEqualiser {
/** Check if two records are equal */
boolean equals(RowData left, RowData right);
/** Close and release resources */
void close() throws Exception;
}

Wrapper classes that encapsulate generated code for different types of operations, providing type safety and lifecycle management.
/**
* Wrapper for generated aggregation functions
*
* Specializes {@code GeneratedClass} to {@code AggsHandleFunction}.
*/
class GeneratedAggsHandleFunction extends GeneratedClass<AggsHandleFunction> {
/**
* @param className name of the generated class
* @param code generated Java source code
* @param references external objects used in the generated code
*/
GeneratedAggsHandleFunction(String className, String code, Object[] references);
/**
* Create new aggregation function instance
*
* @param classLoader class loader supplied for instantiating the generated class
* @return a new {@code AggsHandleFunction}
*/
AggsHandleFunction newInstance(ClassLoader classLoader) throws Exception;
}
/**
* Wrapper for generated table aggregation functions
*
* Specializes {@code GeneratedClass} to {@code TableAggsHandleFunction}.
*/
class GeneratedTableAggsHandleFunction extends GeneratedClass<TableAggsHandleFunction> {
/**
* @param className name of the generated class
* @param code generated Java source code
* @param references external objects used in the generated code
*/
GeneratedTableAggsHandleFunction(String className, String code, Object[] references);
/**
* Create new table aggregation function instance
*
* @param classLoader class loader supplied for instantiating the generated class
* @return a new {@code TableAggsHandleFunction}
*/
TableAggsHandleFunction newInstance(ClassLoader classLoader) throws Exception;
}
/**
* Wrapper for generated join conditions
*
* Specializes {@code GeneratedClass} to {@code JoinCondition}.
*/
class GeneratedJoinCondition extends GeneratedClass<JoinCondition> {
/**
* @param className name of the generated class
* @param code generated Java source code
* @param references external objects used in the generated code
*/
GeneratedJoinCondition(String className, String code, Object[] references);
/**
* Create new join condition instance
*
* @param classLoader class loader supplied for instantiating the generated class
* @return a new {@code JoinCondition}
*/
JoinCondition newInstance(ClassLoader classLoader) throws Exception;
}
/**
* Wrapper for generated projections
*
* Specializes {@code GeneratedClass} to {@code Projection<IN, OUT>}.
*
* @param <IN> input type of the wrapped projection
* @param <OUT> output type of the wrapped projection
*/
class GeneratedProjection<IN, OUT> extends GeneratedClass<Projection<IN, OUT>> {
/**
* @param className name of the generated class
* @param code generated Java source code
* @param references external objects used in the generated code
*/
GeneratedProjection(String className, String code, Object[] references);
/**
* Create new projection instance
*
* @param classLoader class loader supplied for instantiating the generated class
* @return a new {@code Projection<IN, OUT>}
*/
Projection<IN, OUT> newInstance(ClassLoader classLoader) throws Exception;
}
/**
* Wrapper for generated record comparators
*
* Specializes {@code GeneratedClass} to {@code RecordComparator}.
*/
class GeneratedRecordComparator extends GeneratedClass<RecordComparator> {
/**
* @param className name of the generated class
* @param code generated Java source code
* @param references external objects used in the generated code
*/
GeneratedRecordComparator(String className, String code, Object[] references);
/**
* Create new record comparator instance
*
* @param classLoader class loader supplied for instantiating the generated class
* @return a new {@code RecordComparator}
*/
RecordComparator newInstance(ClassLoader classLoader) throws Exception;
}
/** Wrapper for generated record equalizers */
class GeneratedRecordEqualiser extends GeneratedClass<RecordEqualiser> {
GeneratedRecordEqualiser(String className, String code, Object[] references);
/** Create new record equalizer instance */
RecordEqualiser newInstance(ClassLoader classLoader) throws Exception;
}

Specialized generated function interfaces for complex operations like window functions and hash functions.
/**
* Window function interface for generated code
* Processes elements within window boundaries
*
* @param <IN> type of the input elements
* @param <OUT> type of the elements sent to the collector
* @param <KEY> type of the key
* @param <W> window type, bounded by {@code Window}
*/
interface WindowFunction<IN, OUT, KEY, W extends Window> {
/**
* Apply window function to all elements in window
*
* @param key key the window is evaluated for
* @param window the window being evaluated
* @param input elements in the window
* @param out collector that receives the results
*/
void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
/** Close and release resources */
void close() throws Exception;
}
/**
* Hash function interface for generated code
* Computes hash codes for partitioning and bucketing
*/
interface HashFunction {
/**
* Compute hash code for input row
*
* Unlike {@code close()}, this method declares no checked exception.
*
* @param input row to hash
* @return the hash code as an int
*/
int hashCode(RowData input);
/** Close and release resources */
void close() throws Exception;
}
/**
* Filter function interface for generated code
* Filters rows based on generated predicates
*/
interface FilterFunction {
/** Evaluate filter condition */
boolean filter(RowData input) throws Exception;
/** Close and release resources */
void close() throws Exception;
}

Wrapper classes for advanced generated functions providing type safety and lifecycle management.
/**
* Wrapper for generated window functions
*
* Specializes {@code GeneratedClass} to {@code WindowFunction<IN, OUT, KEY, W>}.
* Only the constructor is listed here; no {@code newInstance} override is shown.
*
* @param <IN> input element type of the wrapped window function
* @param <OUT> output element type of the wrapped window function
* @param <KEY> key type of the wrapped window function
* @param <W> window type, bounded by {@code Window}
*/
class GeneratedWindowFunction<IN, OUT, KEY, W extends Window>
extends GeneratedClass<WindowFunction<IN, OUT, KEY, W>> {
/**
* @param className name of the generated class
* @param code generated Java source code
* @param references external objects used in the generated code
*/
GeneratedWindowFunction(String className, String code, Object[] references);
}
/**
* Wrapper for generated hash functions
*
* Specializes {@code GeneratedClass} to {@code HashFunction}.
* Only the constructor is listed here; no {@code newInstance} override is shown.
*/
class GeneratedHashFunction extends GeneratedClass<HashFunction> {
/**
* @param className name of the generated class
* @param code generated Java source code
* @param references external objects used in the generated code
*/
GeneratedHashFunction(String className, String code, Object[] references);
}
/**
* Wrapper for generated filter functions
*
* Specializes {@code GeneratedClass} to {@code FilterFunction}.
* Only the constructor is listed here; no {@code newInstance} override is shown.
*/
class GeneratedFilterFunction extends GeneratedClass<FilterFunction> {
/**
* @param className name of the generated class
* @param code generated Java source code
* @param references external objects used in the generated code
*/
GeneratedFilterFunction(String className, String code, Object[] references);
}
/** Wrapper for generated result future (async operations) */
class GeneratedResultFuture<T> extends GeneratedClass<ResultFuture<T>> {
GeneratedResultFuture(String className, String code, Object[] references);
}

Utility classes and interfaces for supporting the code generation process, including namespace management and function creation.
/**
* Namespace for generated functions
* Manages references and context for generated code execution
*/
interface FunctionContext {
/**
* Get reference by index
*
* @param <T> type the caller expects the reference to have
* @param index position of the reference (presumably an index into the
*        references array passed to the generated class; confirm)
* @return the reference at {@code index}
*/
<T> T getReference(int index);
/**
* Get metric group for monitoring
*
* @return the {@code MetricGroup}
*/
MetricGroup getMetricGroup();
/**
* Get job parameters
*
* @return the job parameters as a {@code Configuration}
*/
Configuration getJobParameters();
/**
* Get user class loader
*
* @return the user-code {@code ClassLoader}
*/
ClassLoader getUserCodeClassLoader();
}
/**
* Generated function with context
* Base interface for generated functions that need runtime context
*/
interface GeneratedFunction<T> {
/** Open function with context */
void open(FunctionContext context) throws Exception;
/** Get the actual function instance */
T getInstance();
/** Close and cleanup */
void close() throws Exception;
}

Generated classes for specialized operations like key selectors, partition computers, and serializers.
/**
* Wrapper for generated key selectors
*
* Specializes {@code GeneratedClass} to {@code KeySelector<IN, KEY>}.
*
* @param <IN> input type of the wrapped key selector
* @param <KEY> key type of the wrapped key selector
*/
class GeneratedKeySelector<IN, KEY> extends GeneratedClass<KeySelector<IN, KEY>> {
/**
* @param className name of the generated class
* @param code generated Java source code
* @param references external objects used in the generated code
*/
GeneratedKeySelector(String className, String code, Object[] references);
}
/**
* Wrapper for generated partition computers
*
* Specializes {@code GeneratedClass} to {@code PartitionComputer<T>}.
*
* @param <T> type parameter of the wrapped {@code PartitionComputer}
*/
class GeneratedPartitionComputer<T> extends GeneratedClass<PartitionComputer<T>> {
/**
* @param className name of the generated class
* @param code generated Java source code
* @param references external objects used in the generated code
*/
GeneratedPartitionComputer(String className, String code, Object[] references);
}
/**
* Wrapper for generated serializers
*
* Specializes {@code GeneratedClass} to {@code TypeSerializer<T>}.
*
* @param <T> type parameter of the wrapped {@code TypeSerializer}
*/
class GeneratedSerializer<T> extends GeneratedClass<TypeSerializer<T>> {
/**
* @param className name of the generated class
* @param code generated Java source code
* @param references external objects used in the generated code
*/
GeneratedSerializer(String className, String code, Object[] references);
}
/** Wrapper for generated watermark generators */
class GeneratedWatermarkGenerator extends GeneratedClass<WatermarkGenerator<RowData>> {
GeneratedWatermarkGenerator(String className, String code, Object[] references);
}

Context and configuration classes for managing the code generation process, including compilation settings and optimization parameters.
/**
* Configuration for code generation
* Controls compilation settings and optimization parameters
*
* The three {@code addReusable*} methods each return a {@code String}; this
* listing does not say what the string is (presumably the name or term under
* which the registered item can be referenced in generated code; confirm).
*/
class CodeGeneratorContext {
/**
* Add reference object
*
* @param obj object to register as a reference
* @param className class name associated with {@code obj}
* @return a String (meaning not stated in this listing)
*/
String addReusableReference(Object obj, String className);
/**
* Add reusable local variable
*
* @param variableName name of the local variable
* @param initialValue initial value, given as a String
* @return a String (meaning not stated in this listing)
*/
String addReusableLocalVariable(String variableName, String initialValue);
/**
* Add reusable function
*
* @param function the function, given as a String
* @param functionName name of the function
* @return a String (meaning not stated in this listing)
*/
String addReusableFunction(String function, String functionName);
/**
* Get generated references
*
* @return the references as an {@code Object[]}
*/
Object[] getReferences();
/**
* Set null check enabled
*
* @param enabled whether null checking is enabled
*/
void setNullCheckEnabled(boolean enabled);
/**
* Set operator context
*
* @param context the {@code StreamingRuntimeContext} to use
*/
void setOperatorContext(StreamingRuntimeContext context);
}
/**
* Code generation utilities
* Provides helper methods for generating optimized code
*/
class CodeGenUtils {
/** Generate null-safe field access code */
static String genNullSafeFieldAccess(String input, int index, LogicalType type);
/** Generate type conversion code */
static String genTypeConversion(String input, LogicalType fromType, LogicalType toType);
/** Generate comparison code */
static String genComparison(String left, String right, LogicalType type);
/** Generate hash code computation */
static String genHashCode(String input, LogicalType type);
}

// Create generated aggregation function
// Arguments are: class name, generated source, external references.
// generatedCode and references are not defined in this snippet; they are
// expected to come from surrounding code.
GeneratedAggsHandleFunction genAggFunction = new GeneratedAggsHandleFunction(
"MyAggFunction",
generatedCode,
references
);
// Instantiate the generated function
// NOTE: aggFunction is not used again in this snippet; the operator below is
// given the wrapper (genAggFunction), not this instance.
AggsHandleFunction aggFunction = genAggFunction.newInstance(classLoader);
// Use in aggregation operator
// accTypes, indexOfCountStar, generateUpdateBefore and needRetract are not
// defined in this snippet.
GroupAggFunction groupAggOperator = new GroupAggFunction(
genAggFunction,
accTypes,
indexOfCountStar,
generateUpdateBefore,
needRetract
);
// Create generated join condition
// Arguments are: class name, generated source, external references.
// joinConditionCode and joinReferences are not defined in this snippet.
GeneratedJoinCondition genJoinCondition = new GeneratedJoinCondition(
"MyJoinCondition",
joinConditionCode,
joinReferences
);
// Use in join operator
// The operator is given the wrapper itself; this snippet never calls
// newInstance on it. leftComparator and rightComparator are not defined here.
SortMergeJoinOperator joinOperator = new SortMergeJoinOperator(
FlinkJoinType.INNER,
genJoinCondition,
leftComparator,
rightComparator
);
// Create generated projection
// Arguments are: class name, generated source, external references.
// projectionCode and projectionReferences are not defined in this snippet.
GeneratedProjection<RowData, RowData> genProjection = new GeneratedProjection<>(
"MyProjection",
projectionCode,
projectionReferences
);
// Use projection
// newInstance and apply are both declared with `throws Exception` in the
// listings above, so the enclosing code must handle or propagate it.
// classLoader and input are not defined in this snippet.
Projection<RowData, RowData> projection = genProjection.newInstance(classLoader);
RowData output = projection.apply(input);
// Create code generation context
CodeGeneratorContext ctx = new CodeGeneratorContext();
// Turn null checking on.
ctx.setNullCheckEnabled(true);
// Register an external object together with its class name. The String
// returned by addReusableReference is discarded here. myObject is not
// defined in this snippet.
ctx.addReusableReference(myObject, "MyClass");
// Generate optimized code with context
String optimizedCode = generateOptimizedFunction(ctx, expression);

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-runtime-blink-2-11