The expression and conversion system provides seamless integration between Scala values and Flink expressions through implicit conversions and rich expression creation methods.
Main trait providing implicit conversions from Scala literals to Expression objects and expression creation utilities.
trait ImplicitExpressionConversions {
// Window operation constants
implicit val UNBOUNDED_ROW: Expression
implicit val UNBOUNDED_RANGE: Expression
implicit val CURRENT_ROW: Expression
implicit val CURRENT_RANGE: Expression
// Field reference creation
def $(name: String): Expression
def col(name: String): Expression
// Literal creation
def lit(v: Any): Expression
def lit(v: Any, dataType: DataType): Expression
// Function calls
def call(path: String, params: Expression*): Expression
def call(function: UserDefinedFunction, params: Expression*): Expression
def call(function: Class[_ <: UserDefinedFunction], params: Expression*): Expression
def callSql(sqlExpression: String): Expression
}

Adds expression operations to existing Expression objects.
implicit class WithOperations(e: Expression) extends ImplicitExpressionOperations {
def expr: Expression = e
}

Creates field expressions from Scala symbols.
implicit class UnresolvedFieldExpression(s: Symbol) extends ImplicitExpressionOperations {
def expr: Expression = unresolvedRef(s.name)
}

Usage example:
val fieldExpr = 'myField // Creates a field reference from a symbol

Generic wrapper for any type with an implicit conversion to Expression.
implicit class AnyWithOperations[T](e: T)(implicit toExpr: T => Expression)
extends ImplicitExpressionOperations {
def expr: Expression = toExpr(e)
}

Specialized implicit classes for primitive types that provide expression operations:
implicit class LiteralLongExpression(l: Long) extends ImplicitExpressionOperations
implicit class LiteralByteExpression(b: Byte) extends ImplicitExpressionOperations
implicit class LiteralShortExpression(s: Short) extends ImplicitExpressionOperations
implicit class LiteralIntExpression(i: Int) extends ImplicitExpressionOperations
implicit class LiteralFloatExpression(f: Float) extends ImplicitExpressionOperations
implicit class LiteralDoubleExpression(d: Double) extends ImplicitExpressionOperations
implicit class LiteralStringExpression(str: String) extends ImplicitExpressionOperations
implicit class LiteralBooleanExpression(bool: Boolean) extends ImplicitExpressionOperations

Usage example:
val expr = 42.asInstanceOf[Int] + 10 // Arithmetic operations on literals

Enables calling scalar functions with expression parameters.
implicit class ScalarFunctionCall(scalarFunction: ScalarFunction) {
def apply(params: Expression*): Expression
}

Enables calling table functions.
implicit class TableFunctionCall(tableFunction: TableFunction[_]) {
def apply(params: Expression*): Expression
}

Enables calling aggregate functions.
implicit class ImperativeAggregateFunctionCall(aggregateFunction: ImperativeAggregateFunction[_, _]) {
def apply(params: Expression*): Expression
}

Enables string interpolation for field references.
implicit class FieldExpression(sc: StringContext) {
def $(args: Any*): Expression // String interpolation: $"fieldName"
}

Usage examples:
$"userId" // Simple field reference
$"user.address" // Nested field reference
$"items[0].price" // Array element access

implicit def byte2Literal(b: Byte): Expression
implicit def short2Literal(s: Short): Expression
implicit def int2Literal(i: Int): Expression
implicit def long2Literal(l: Long): Expression
implicit def double2Literal(d: Double): Expression
implicit def float2Literal(f: Float): Expression
implicit def string2Literal(str: String): Expression
implicit def boolean2Literal(bool: Boolean): Expression
implicit def javaByte2Literal(b: JByte): Expression
implicit def javaShort2Literal(s: JShort): Expression
implicit def javaInt2Literal(i: JInteger): Expression
implicit def javaLong2Literal(l: JLong): Expression
implicit def javaDouble2Literal(d: JDouble): Expression
implicit def javaFloat2Literal(f: JFloat): Expression
implicit def javaBoolean2Literal(bool: JBoolean): Expression

implicit def sqlDate2Literal(d: Date): Expression
implicit def sqlTime2Literal(t: Time): Expression
implicit def sqlTimestamp2Literal(t: Timestamp): Expression
implicit def localDate2Literal(d: LocalDate): Expression
implicit def localTime2Literal(t: LocalTime): Expression
implicit def localDateTime2Literal(dt: LocalDateTime): Expression

implicit def seq2ArrayConstructor(seq: Seq[Expression]): Expression
implicit def map2MapConstructor(map: Map[Expression, Expression]): Expression

implicit def bigDecimal2Literal(bd: JBigDecimal): Expression
implicit def row2Literal(row: Row): Expression
implicit def list2ArrayConstructor(list: JList[Expression]): Expression
implicit def map2MapConstructor(map: JMap[Expression, Expression]): Expression

def $(name: String): Expression // Create field reference
def col(name: String): Expression // Alias for $()

def lit(v: Any): Expression // Create literal from any value
def lit(v: Any, dataType: DataType): Expression // Create typed literal

def call(path: String, params: Expression*): Expression // Call catalog function
def call(function: UserDefinedFunction, params: Expression*): Expression // Call inline function
def call(function: Class[_ <: UserDefinedFunction], params: Expression*): Expression // Call UDF class
def callSql(sqlExpression: String): Expression // Execute SQL expression

import org.apache.flink.table.api._
// Field references
val userId = $"userId"
val userField = col("user")
// Literals
val constValue = lit(42)
val constString = lit("hello")
// Mixed operations
val result = $"price" * 1.1 + lit(5.0)

// Built-in function call
val upperName = call("UPPER", $"name")
// User-defined function
class MyUDF extends ScalarFunction {
def eval(value: String): String = value.toUpperCase
}
val myUdf = new MyUDF()
val result = call(myUdf, $"name")
// SQL expression
val sqlResult = callSql("CURRENT_TIMESTAMP")

// Using symbols for field names (legacy approach)
val field1 = 'userId // Equivalent to $"userId"
val field2 = 'user_name // Equivalent to $"user_name"

// Using window constants
import org.apache.flink.table.api._
val windowSpec = Over
.partitionBy($"userId")
.orderBy($"timestamp")
.preceding(UNBOUNDED_RANGE)
.following(CURRENT_ROW)

// Array construction from Scala sequence
val items = Seq(lit(1), lit(2), lit(3))
val arrayExpr = items // Implicitly converted to array constructor
// Map construction
val mapData = Map(lit("key1") -> lit("value1"), lit("key2") -> lit("value2"))
val mapExpr = mapData // Implicitly converted to map constructor

The implicit conversion system maintains type safety through:
Option[T]