This component bridges Flink's type system and Apache Calcite's type system, enabling type inference, expression handling, and runtime type safety. It handles the mapping between logical types, physical types, and runtime representations.
import org.apache.flink.table.planner.expressions.RexNodeExpression;
import org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.calcite.FlinkTypeSystem;
import org.apache.flink.table.planner.plan.schema.RawRelDataType;
import org.apache.flink.table.planner.plan.schema.StructuredRelDataType;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rel.type.RelDataType;
Bridge between Flink expressions and Calcite RexNode, enabling seamless integration between the two expression systems.
public final class RexNodeExpression implements ResolvedExpression {
/**
* Creates a RexNodeExpression from a RexNode and output data type.
*/
public RexNodeExpression(RexNode rexNode, LogicalType outputDataType);
/**
* Returns the underlying Calcite RexNode.
*/
public RexNode getRexNode();
/**
* Returns the logical output data type of this expression.
*/
@Override
public LogicalType getOutputDataType();
/**
* Returns the resolved children of this expression.
*/
@Override
public List<ResolvedExpression> getResolvedChildren();
/**
* Accepts a visitor for expression traversal.
*/
@Override
public <R> R accept(ExpressionVisitor<R> visitor);
/**
* Returns the string representation of this expression.
*/
@Override
public String asSummaryString();
/**
* Returns whether this expression is equivalent to another expression.
*/
public boolean semanticEquals(RexNodeExpression other);
}
The RexNodeExpression serves as the primary bridge between Flink's expression system and Calcite's RexNode system, enabling:
- direct reuse of a Calcite RexNode wherever Flink expects a ResolvedExpression
- type-safe access to the expression's logical output type
- traversal of bridged expressions through the standard ExpressionVisitor
- semantic equality checks between two bridged expressions
Usage Example:
import org.apache.flink.table.planner.expressions.RexNodeExpression;
import org.apache.flink.table.api.DataTypes;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeName;
import java.math.BigDecimal;
// Create RexNode using Calcite's RexBuilder
RexBuilder rexBuilder = // obtained from planner context
RexNode rexNode = rexBuilder.makeCall(
SqlStdOperatorTable.PLUS,
rexBuilder.makeInputRef(
rexBuilder.getTypeFactory().createSqlType(SqlTypeName.INTEGER), 0),
rexBuilder.makeExactLiteral(BigDecimal.TEN)
);
// Create RexNodeExpression bridge
LogicalType outputType = DataTypes.INT().getLogicalType();
RexNodeExpression expression = new RexNodeExpression(rexNode, outputType);
// Use in Flink expression contexts
LogicalType resultType = expression.getOutputDataType();
String summary = expression.asSummaryString(); // textual form of the call, e.g. "+($0, 10)"
// Access underlying RexNode for Calcite operations
RexNode underlyingRex = expression.getRexNode();
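The documented semanticEquals method can be exercised the same way; a minimal sketch that reuses rexNode and outputType from above:
// Compare two bridge expressions wrapping the same RexNode and output type
RexNodeExpression sameExpression = new RexNodeExpression(rexNode, outputType);
boolean equivalent = expression.semanticEquals(sameExpression); // expected to be true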
Type inference utilities specifically designed for the planner module, providing sophisticated type resolution for function calls and expressions.
public final class PlannerTypeInferenceUtilImpl implements PlannerTypeInferenceUtil {
/**
* Singleton instance of the type inference utility.
*/
public static final PlannerTypeInferenceUtilImpl INSTANCE;
/**
* Runs type inference for a call expression within a given context.
*
* @param callExpression The call expression to infer types for
* @param callContext The context providing additional type information
* @return TypeInference result containing inferred types and validation
*/
public TypeInference runTypeInference(
CallExpression callExpression,
CallContext callContext
);
/**
* Infers the result type of a function call.
*/
public Optional<LogicalType> inferOutputType(
CallExpression callExpression,
List<LogicalType> argumentTypes
);
/**
* Validates argument types against function signature.
*/
public ValidationResult validateArgumentTypes(
FunctionDefinition functionDefinition,
List<LogicalType> argumentTypes
);
/**
* Performs type coercion to make arguments compatible with function signature.
*/
public List<LogicalType> coerceArgumentTypes(
FunctionDefinition functionDefinition,
List<LogicalType> argumentTypes
);
}
Key Features:
- Runs full type inference for a call expression within its call context
- Infers the output type of a function call from its argument types
- Validates argument types against a function definition's signature
- Coerces argument types so they become compatible with the signature
Usage Example:
import org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl;
import org.apache.flink.table.expressions.CallExpression;
// Get type inference utility
PlannerTypeInferenceUtilImpl typeInference = PlannerTypeInferenceUtilImpl.INSTANCE;
// Run type inference for a function call
CallExpression callExpr = // expression like UPPER(name)
CallContext context = // call context with table schema
TypeInference result = typeInference.runTypeInference(callExpr, context);
if (result.isSuccess()) {
LogicalType outputType = result.getOutputType();
List<LogicalType> argumentTypes = result.getExpectedArgumentTypes();
// Use inferred types for further processing
processTypedExpression(callExpr, outputType, argumentTypes);
} else {
// Handle type inference failure
ValidationException error = result.getValidationException();
throw new TableException("Type inference failed: " + error.getMessage());
}
// Direct type inference for specific scenarios
Optional<LogicalType> outputType = typeInference.inferOutputType(
callExpr,
Arrays.asList(DataTypes.STRING().getLogicalType())
);
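The validation and coercion utilities documented above can be driven directly as well; a hedged sketch in which functionDefinition is assumed to be resolved elsewhere (for example from the function catalog):
FunctionDefinition functionDefinition = // resolved built-in or user-defined function
List<LogicalType> actualArgs = Arrays.asList(
DataTypes.STRING().getLogicalType(),
DataTypes.INT().getLogicalType()
);
ValidationResult validation = typeInference.validateArgumentTypes(functionDefinition, actualArgs);
List<LogicalType> coercedArgs = typeInference.coerceArgumentTypes(functionDefinition, actualArgs);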
Calcite type factory implementation for Flink, responsible for creating and managing Calcite RelDataType objects that correspond to Flink's logical types.
class FlinkTypeFactory extends JavaTypeFactoryImpl {
/**
* Creates a Calcite RelDataType from a Flink LogicalType.
*/
def createFieldTypeFromLogicalType(logicalType: LogicalType): RelDataType
/**
* Creates a structured RelDataType for complex types.
*/
def createStructType(
fieldTypes: util.List[RelDataType],
fieldNames: util.List[String]
): RelDataType
/**
* Creates an array RelDataType.
*/
def createArrayType(elementType: RelDataType, maxCardinality: Long): RelDataType
/**
* Creates a map RelDataType.
*/
def createMapType(keyType: RelDataType, valueType: RelDataType): RelDataType
/**
* Creates a multiset RelDataType.
*/
def createMultisetType(elementType: RelDataType, maxCardinality: Long): RelDataType
/**
* Converts a Calcite RelDataType back to a Flink LogicalType.
*/
def toLogicalType(relDataType: RelDataType): LogicalType
/**
* Creates a raw type for unstructured data.
*/
def createRawType[T](typeInfo: TypeInformation[T]): RelDataType
}
Usage Example:
// Create FlinkTypeFactory instance
val typeFactory = new FlinkTypeFactory(getTypeSystem)
// Convert Flink LogicalType to Calcite RelDataType
val flinkStringType = DataTypes.STRING().getLogicalType()
val calciteStringType = typeFactory.createFieldTypeFromLogicalType(flinkStringType)
// Create structured types
val fieldNames = util.Arrays.asList("id", "name", "age")
val fieldTypes = util.Arrays.asList(
typeFactory.createFieldTypeFromLogicalType(DataTypes.BIGINT().getLogicalType()),
typeFactory.createFieldTypeFromLogicalType(DataTypes.STRING().getLogicalType()),
typeFactory.createFieldTypeFromLogicalType(DataTypes.INT().getLogicalType())
)
val structType = typeFactory.createStructType(fieldTypes, fieldNames)
// Create collection types
val arrayType = typeFactory.createArrayType(calciteStringType, -1) // Unlimited size
val mapType = typeFactory.createMapType(calciteStringType, calciteStringType)
// Convert back to Flink types
val backToFlink = typeFactory.toLogicalType(structType)
Type system implementation that defines type compatibility, precision, and scale rules for Flink types within Calcite.
object FlinkTypeSystem extends RelDataTypeSystemImpl {
/**
* Returns the maximum precision for numeric types.
*/
override def getMaxPrecision(typeName: SqlTypeName): Int
/**
* Returns the default precision for numeric types.
*/
override def getDefaultPrecision(typeName: SqlTypeName): Int
/**
* Returns the maximum scale for decimal types.
*/
override def getMaxScale(typeName: SqlTypeName): Int
/**
* Determines whether union branches with CHAR types of differing lengths should be converted to a varying (VARCHAR) type.
*/
override def shouldConvertRaggedUnionTypesToVarying(): Boolean
/**
* Derives the RelDataType of a SQL literal.
*/
override def deriveLiteralType(literal: SqlLiteral): RelDataType
/**
* Determines type precedence for implicit type conversion.
*/
override def compareTypePrecedence(type1: RelDataType, type2: RelDataType): Int
}
Represents unstructured/raw data types in the Calcite type system.
public class RawRelDataType extends RelDataTypeImpl {
/**
* Creates a RawRelDataType from TypeInformation.
*/
public RawRelDataType(TypeInformation<?> typeInformation, FlinkTypeFactory typeFactory);
/**
* Returns the underlying TypeInformation.
*/
public TypeInformation<?> getTypeInformation();
/**
* Returns the originating type factory.
*/
public FlinkTypeFactory getFlinkTypeFactory();
/**
* Converts this raw type to a LogicalType.
*/
public LogicalType toLogicalType();
@Override
public SqlTypeName getSqlTypeName();
@Override
public boolean isStruct();
}
Usage Example:
// Create RawRelDataType for custom objects
TypeInformation<MyCustomClass> typeInfo = TypeInformation.of(MyCustomClass.class);
FlinkTypeFactory typeFactory = // obtained from planner
RawRelDataType rawType = new RawRelDataType(typeInfo, typeFactory);
// Use in Calcite operations
SqlTypeName sqlType = rawType.getSqlTypeName(); // Returns OTHER
LogicalType logicalType = rawType.toLogicalType(); // Converts to RawType
TypeInformation<?> originalTypeInfo = rawType.getTypeInformation();
Represents structured data types (ROW types) in the Calcite type system.
public class StructuredRelDataType extends RelDataTypeImpl {
/**
* Creates a StructuredRelDataType from field information.
*/
public StructuredRelDataType(
LogicalType logicalType,
List<RelDataTypeField> fields,
FlinkTypeFactory typeFactory
);
/**
* Returns the underlying LogicalType.
*/
public LogicalType getLogicalType();
/**
* Returns the originating type factory.
*/
public FlinkTypeFactory getFlinkTypeFactory();
@Override
public List<RelDataTypeField> getFieldList();
@Override
public RelDataTypeField getField(String fieldName, boolean caseSensitive, boolean elideRecord);
@Override
public boolean isStruct();
}
Usage Example:
// Create StructuredRelDataType for ROW types
LogicalType rowType = RowType.of(
new LogicalType[]{
DataTypes.BIGINT().getLogicalType(),
DataTypes.STRING().getLogicalType()
},
new String[]{"id", "name"}
);
List<RelDataTypeField> fields = createFieldsFromRowType(rowType);
StructuredRelDataType structType = new StructuredRelDataType(
rowType,
fields,
typeFactory
);
// Access field information
RelDataTypeField idField = structType.getField("id", false, false);
RelDataTypeField nameField = structType.getField("name", false, false);
List<RelDataTypeField> allFields = structType.getFieldList();
// Convert Flink types to Calcite types for optimization
public RelDataType convertFlinkTypeToCalcite(LogicalType flinkType, FlinkTypeFactory typeFactory) {
switch (flinkType.getTypeRoot()) {
case INTEGER:
return typeFactory.createSqlType(SqlTypeName.INTEGER);
case VARCHAR:
VarCharType varCharType = (VarCharType) flinkType;
return typeFactory.createSqlType(SqlTypeName.VARCHAR, varCharType.getLength());
case ROW:
RowType rowType = (RowType) flinkType;
return convertRowType(rowType, typeFactory);
case ARRAY:
ArrayType arrayType = (ArrayType) flinkType;
RelDataType elementType = convertFlinkTypeToCalcite(arrayType.getElementType(), typeFactory);
return typeFactory.createArrayType(elementType, -1);
default:
return typeFactory.createFieldTypeFromLogicalType(flinkType);
}
}
// Convert Calcite types back to Flink types after optimization
public LogicalType convertCalciteTypeToFlink(RelDataType calciteType, FlinkTypeFactory typeFactory) {
if (calciteType instanceof RawRelDataType) {
return ((RawRelDataType) calciteType).toLogicalType();
} else if (calciteType instanceof StructuredRelDataType) {
return ((StructuredRelDataType) calciteType).getLogicalType();
} else {
return typeFactory.toLogicalType(calciteType);
}
}
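The two converters are typically used as a pair around an optimization step; a minimal round-trip sketch, assuming typeFactory is obtained from the planner context:
// Round-trip a Flink type through Calcite and back
LogicalType original = DataTypes.ARRAY(DataTypes.STRING()).getLogicalType();
RelDataType calciteView = convertFlinkTypeToCalcite(original, typeFactory);
LogicalType restored = convertCalciteTypeToFlink(calciteView, typeFactory);
// restored is expected to be equivalent to original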
// Convert function call to RexNodeExpression
public RexNodeExpression createUpperExpression(
RexBuilder rexBuilder,
RexNode inputRef,
FlinkTypeFactory typeFactory
) {
// Create UPPER function call
RexNode upperCall = rexBuilder.makeCall(
SqlStdOperatorTable.UPPER,
inputRef
);
// Determine output type (same as input for UPPER)
LogicalType outputType = DataTypes.STRING().getLogicalType();
// Create bridge expression
return new RexNodeExpression(upperCall, outputType);
}
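A possible call site for the helper above, assuming rexBuilder and typeFactory come from the planner context and column 0 is a STRING field:
RexNode nameRef = rexBuilder.makeInputRef(
typeFactory.createFieldTypeFromLogicalType(DataTypes.STRING().getLogicalType()), 0);
RexNodeExpression upperName = createUpperExpression(rexBuilder, nameRef, typeFactory);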
// Custom function with sophisticated type inference
public class CustomConcatFunction extends ScalarFunction {
public String eval(String str1, String str2) {
return str1 + str2;
}
@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
return TypeInference.newBuilder()
.inputTypeStrategy(InputTypeStrategies.sequence(
InputTypeStrategies.logical(LogicalTypeRoot.VARCHAR),
InputTypeStrategies.logical(LogicalTypeRoot.VARCHAR)
))
.outputTypeStrategy(TypeStrategies.explicit(
DataTypes.STRING().notNull()
))
.build();
}
}
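Before the planner can infer types for it, the function has to be registered; a sketch using the standard TableEnvironment registration API (tableEnv plus the table and column names are assumptions):
// Register the function and use it in SQL; the TypeInference above is applied during planning
tableEnv.createTemporarySystemFunction("CUSTOM_CONCAT", CustomConcatFunction.class);
tableEnv.executeSql("SELECT CUSTOM_CONCAT(first_name, last_name) FROM users");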
// Use with PlannerTypeInferenceUtil
CallExpression concatCall = // call to CustomConcatFunction
TypeInference inference = PlannerTypeInferenceUtilImpl.INSTANCE
.runTypeInference(concatCall, callContext);
// Robust type handling with proper error management
public RexNodeExpression safeCreateExpression(
RexNode rexNode,
LogicalType expectedType,
FlinkTypeFactory typeFactory
) {
try {
// Validate type compatibility (TypeCompatibility stands in for a project-specific compatibility check utility)
RelDataType calciteType = typeFactory.createFieldTypeFromLogicalType(expectedType);
if (!TypeCompatibility.isCompatible(rexNode.getType(), calciteType)) {
throw new ValidationException(
String.format("Type mismatch: expected %s but got %s",
calciteType, rexNode.getType())
);
}
return new RexNodeExpression(rexNode, expectedType);
} catch (Exception e) {
throw new TableException(
"Failed to create RexNodeExpression: " + e.getMessage(), e
);
}
}
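A sketch of how the guard might be invoked, assuming rexBuilder and typeFactory are obtained from the planner context:
// Wrap a BIGINT literal while asserting the expected output type
RelDataType bigintType = typeFactory.createFieldTypeFromLogicalType(DataTypes.BIGINT().getLogicalType());
RexNode bigintLiteral = rexBuilder.makeExactLiteral(BigDecimal.valueOf(42), bigintType);
RexNodeExpression checked = safeCreateExpression(bigintLiteral, DataTypes.BIGINT().getLogicalType(), typeFactory);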