The Flink Scala Table API provides an extensive collection of built-in functions for date/time operations, mathematical calculations, string manipulation, JSON processing, and utility operations.
def currentDate(): Expression // Current date
def currentTime(): Expression // Current time
def currentTimestamp(): Expression // Current timestamp
def localTime(): Expression // Local time
def localTimestamp(): Expression // Local timestamp

Usage examples:
import org.apache.flink.table.api._
// Current date/time functions
val orders = table.select(
$"orderId",
$"amount",
currentDate() as "processedDate",
currentTimestamp() as "processedTime"
)
// Time-based filtering
val recentOrders = table.filter($"orderDate" >= currentDate() - interval(lit(7), DAY))

def interval(count: Expression, unit: TimePointUnit): Expression

Usage examples:
// Date intervals
val futureDate = $"startDate" + interval(lit(30), DAY)
val pastMonth = currentDate() - interval(lit(1), MONTH)
val nextYear = $"birthDate" + interval(lit(1), YEAR)
// Time intervals
val laterTime = currentTime() + interval(lit(2), HOUR)
val earlierTime = $"meetingTime" - interval(lit(15), MINUTE)

def pi(): Expression // π constant
def e(): Expression // Euler's number constant

def rand(): Expression // Random double [0.0, 1.0)
def randInteger(bound: Expression): Expression // Random integer [0, bound)

def atan2(y: Expression, x: Expression): Expression // Arctangent of y/x

def log(base: Expression, antilogarithm: Expression): Expression // Logarithm
def exp(base: Expression): Expression // Exponential
def power(base: Expression, exponent: Expression): Expression // Power

def mod(numeric1: Expression, numeric2: Expression): Expression // Modulo

Usage examples:
// Mathematical calculations
val circleArea = pi() * power($"radius", lit(2))
val randomScore = rand() * lit(100)
val randomId = randInteger(lit(1000000))
// Logarithmic operations
val logBase10 = log(lit(10), $"value")
val naturalLog = log(e(), $"value")
val exponential = exp($"growthRate")
// Trigonometry
val angle = atan2($"y", $"x")
// Modulo operations
val evenOdd = mod($"number", lit(2)) === lit(0)

def concat(string: Expression*): Expression // Concatenate strings
def concatWs(separator: Expression, string: Expression*): Expression // Concatenate with separator
def upper(string: Expression): Expression // Uppercase
def lower(string: Expression): Expression // Lowercase
def length(string: Expression): Expression // String length
def position(string: Expression, substring: Expression): Expression // Substring position

def uuid(): Expression // Generate UUID string

Usage examples:
// String concatenation
val fullName = concat($"firstName", lit(" "), $"lastName")
val address = concatWs(lit(", "), $"street", $"city", $"state", $"zip")
// Case conversion
val upperName = upper($"name")
val lowerEmail = lower($"email")
// String analysis
val nameLength = length($"productName")
val emailAtPosition = position($"email", lit("@"))
// Unique identifiers
val trackingId = uuid()
// Complex string operations
val emailDomain = substring(
$"email",
position($"email", lit("@")) + lit(1),
length($"email")
)

def jsonString(string: Expression): Expression // Create JSON string
def jsonObject(keyValue: Expression*): Expression // Create JSON object
def jsonArray(element: Expression*): Expression // Create JSON array

def jsonValue(jsonString: Expression, path: Expression): Expression // Extract JSON value
def jsonQuery(jsonString: Expression, path: Expression): Expression // Extract JSON fragment

Usage examples:
// JSON construction
val userJson = jsonObject(
lit("id"), $"userId",
lit("name"), $"userName",
lit("email"), $"userEmail"
)
val tagsArray = jsonArray($"tag1", $"tag2", $"tag3")
// JSON querying
val jsonData = lit("""{"user": {"name": "John", "age": 30}, "active": true}""")
val userName = jsonValue(jsonData, lit("$.user.name"))
val userInfo = jsonQuery(jsonData, lit("$.user"))
// Working with JSON columns
val extractedAge = jsonValue($"userProfile", lit("$.age"))
val preferences = jsonQuery($"settings", lit("$.preferences"))

def nullOf(dataType: DataType): Expression // Create typed null
def coalesce(expr: Expression*): Expression // First non-null value
def isnull(expr: Expression): Expression // Check if null
def isNotNull(expr: Expression): Expression // Check if not null

def ifThenElse(
condition: Expression,
ifTrue: Expression,
ifFalse: Expression
): Expression // Conditional expression

Usage examples:
// Null handling
val defaultName = coalesce($"nickname", $"firstName", lit("Anonymous"))
val hasEmail = isNotNull($"email")
// Conditional logic
val status = ifThenElse(
$"age" >= lit(18),
lit("Adult"),
lit("Minor")
)
val discountedPrice = ifThenElse(
$"memberType" === lit("PREMIUM"),
$"price" * lit(0.9),
$"price"
)
// Typed nulls
val nullString = nullOf(DataTypes.STRING())
val nullInt = nullOf(DataTypes.INT())

def withColumns(columns: Expression*): Expression // Add/modify columns
def withoutColumns(columns: Expression*): Expression // Remove columns
def range(from: Expression, to: Expression): Expression // Column range

Usage examples:
// Add computed columns
val enrichedData = withColumns(
$"totalPrice" := $"price" * $"quantity",
$"discountedPrice" := $"price" * (lit(1) - $"discountRate"),
$"category" := upper($"productType")
)
// Remove columns
val publicData = withoutColumns($"ssn", $"creditCard", $"internalId")
// Column ranges (for wide tables)
val subset = range($"col1", $"col10") // Select col1 through col10

def and(left: Expression, right: Expression): Expression // Logical AND
def or(left: Expression, right: Expression): Expression // Logical OR
def not(expression: Expression): Expression // Logical NOT

Usage examples:
// Explicit logical operations (alternative to operators)
val condition1 = and($"age" >= lit(18), $"hasLicense" === lit(true))
val condition2 = or($"priority" === lit("HIGH"), $"urgent" === lit(true))
val condition3 = not($"deleted" === lit(true))
// Complex boolean logic
val eligibleForDiscount = and(
or($"memberType" === lit("GOLD"), $"memberType" === lit("PLATINUM")),
and($"totalPurchases" > lit(1000), not($"suspended" === lit(true)))
)

// Combining multiple functions
val processedEmail = lower(trim($"email"))
val formattedName = concat(
upper(substring($"firstName", lit(1), lit(1))),
lower(substring($"firstName", lit(2), length($"firstName")))
)
// Complex date calculations
// Rough business-day estimate: scales the calendar-day difference by 5/7.
// The earlier ifThenElse on dayOfWeek(currentDate()) returned the same expression
// from both branches, so the weekday check never influenced the result; it is
// dropped here and the value is unchanged.
// NOTE(review): assumes `$"deadline" - currentDate()` yields a day count — confirm.
val businessDaysUntilDeadline = ($"deadline" - currentDate()) * lit(5) / lit(7)

// Explicit type casting when needed
val stringToInt = cast($"numberString", DataTypes.INT())
val doubleToDecimal = cast($"floatValue", DataTypes.DECIMAL(10, 2))
// Type-aware operations
val numericComparison = cast($"stringNumber", DataTypes.DOUBLE()) > lit(100.0)

// Calling custom scalar functions
/**
 * Example user-defined scalar function that derives a tax figure from an
 * amount and a rate.
 */
class CalculateTax extends ScalarFunction {

  /**
   * @param amount the base amount
   * @param rate   the multiplier applied to `amount`
   * @return the product `amount * rate`
   */
  def eval(amount: Double, rate: Double): Double = {
    amount * rate
  }
}
val taxFunction = new CalculateTax()
// CalculateTax.eval returns amount * rate, i.e. only the tax portion,
// so the tax is added back onto the price to obtain the total.
val taxAmount = call(taxFunction, $"price", $"taxRate")
val totalWithTax = $"price" + taxAmount
// Calling catalog functions
val customResult = call("MY_CUSTOM_FUNCTION", $"input1", $"input2")

// Always consider null handling
val safeLength = ifThenElse(
isNotNull($"text"),
length($"text"),
lit(0)
)
// Use coalesce for default values
val displayName = coalesce($"preferredName", $"firstName", lit("Unknown"))

// Ensure compatible types in operations
val numericResult = cast($"stringValue", DataTypes.DOUBLE()) + lit(10.0)
// Consistent null handling across types
val nullSafeResult = coalesce($"optionalValue", nullOf(DataTypes.STRING()))

// Prefer built-in functions over complex expressions
val optimizedCase = ifThenElse($"status" === lit("ACTIVE"), lit(1), lit(0))
// Rather than complex string manipulation, use jsonValue
val efficientExtraction = jsonValue($"data", lit("$.user.id"))
// (instead of chaining multiple string operations)

// Break complex function chains into readable parts
val cleanEmail = lower(trim($"rawEmail"))
val isValidEmail = position(cleanEmail, lit("@")) > lit(0)
val processedEmail = ifThenElse(isValidEmail, cleanEmail, lit("invalid"))