tessl install tessl/maven-org-apache-flink--flink-table-api-scala_2-12@1.20.0Scala API for Apache Flink's Table/SQL ecosystem providing idiomatic Scala interfaces for table operations
Built-in support for JSON construction and manipulation operations with configurable null handling behavior. These functions enable creation and aggregation of JSON objects and arrays directly within table operations.
Function for serializing arbitrary values to JSON strings.
/**
* Serializes a value into JSON string format
* Returns JSON string containing serialized value, or null if input is null
* @param value Value expression to serialize
* @return JSON string expression
*/
def jsonString(value: Expression): ExpressionUsage:
// Basic value serialization
table.select(
jsonString(1) as "numberJson", // "1"
jsonString("Hello") as "stringJson", // "\"Hello\""
jsonString(true) as "booleanJson", // "true"
jsonString(nullOf(DataTypes.INT())) as "nullJson" // null
)
// Complex value serialization
table.select(
jsonString(array(1, 2, 3)) as "arrayJson", // "[1,2,3]"
jsonString($"userData") as "objectJson"
)Functions for building JSON objects from key-value pairs.
/**
* Builds JSON object string from list of key-value pairs
* Keys must be string literals, values may be arbitrary expressions
* @param onNull Behavior for NULL values (JsonOnNull.NULL or JsonOnNull.ABSENT)
* @param keyValues Even-numbered list of alternating key/value pairs
* @return JSON object string
*/
def jsonObject(onNull: JsonOnNull, keyValues: Expression*): Expression
/**
* Builds JSON object by aggregating key-value expressions
* Keys must be non-nullable character strings, values can be arbitrary
* Not supported in OVER windows
* @param onNull Behavior for NULL values
* @param keyExpr Expression for object keys
* @param valueExpr Expression for object values
* @return Aggregated JSON object string
*/
def jsonObjectAgg(onNull: JsonOnNull, keyExpr: Expression, valueExpr: Expression): ExpressionJSON Null Handling:
// JsonOnNull enumeration values:
// JsonOnNull.NULL - Include null values as JSON null
// JsonOnNull.ABSENT - Exclude null values from objectUsage:
// Static JSON object construction
table.select(
jsonObject(JsonOnNull.NULL) as "emptyObject", // {}
jsonObject(
JsonOnNull.NULL,
"name", $"userName",
"age", $"userAge",
"active", $"isActive"
) as "userObject"
)
// Null handling examples
table.select(
jsonObject(
JsonOnNull.NULL,
"value", nullOf(DataTypes.STRING())
) as "withNull", // {"value":null}
jsonObject(
JsonOnNull.ABSENT,
"value", nullOf(DataTypes.STRING())
) as "withoutNull" // {}
)
// Nested JSON objects
table.select(
jsonObject(
JsonOnNull.NULL,
"user", jsonObject(JsonOnNull.NULL, "name", $"name", "id", $"id"),
"metadata", jsonObject(JsonOnNull.ABSENT, "created", $"createdAt")
) as "nestedObject"
)
// Aggregated JSON objects
orders
.groupBy($"customerId")
.select(
$"customerId",
jsonObjectAgg(JsonOnNull.NULL, $"product", $"quantity") as "orderSummary"
)
// Result: {"Apple":2,"Banana":17,"Orange":0}Functions for building JSON arrays from value lists.
/**
* Builds JSON array string from list of values
* Values can be arbitrary expressions
* @param onNull Behavior for NULL values (JsonOnNull.NULL or JsonOnNull.ABSENT)
* @param values List of value expressions for array elements
* @return JSON array string
*/
def jsonArray(onNull: JsonOnNull, values: Expression*): Expression
/**
* Builds JSON array by aggregating items into an array
* Not supported in OVER windows, unbounded session windows, or hop windows
* @param onNull Behavior for NULL values
* @param itemExpr Expression for array items
* @return Aggregated JSON array string
*/
def jsonArrayAgg(onNull: JsonOnNull, itemExpr: Expression): ExpressionUsage:
// Static JSON array construction
table.select(
jsonArray(JsonOnNull.NULL) as "emptyArray", // []
jsonArray(JsonOnNull.NULL, 1, "hello", true) as "mixedArray" // [1,"hello",true]
)
// Arrays with expressions
table.select(
jsonArray(JsonOnNull.NULL, $"id", $"name", $"score") as "recordArray"
)
// Null handling in arrays
table.select(
jsonArray(
JsonOnNull.NULL,
$"value1",
nullOf(DataTypes.STRING()),
$"value2"
) as "withNulls", // ["val1",null,"val2"]
jsonArray(
JsonOnNull.ABSENT,
$"value1",
nullOf(DataTypes.STRING()),
$"value2"
) as "withoutNulls" // ["val1","val2"]
)
// Nested JSON arrays
table.select(
jsonArray(
JsonOnNull.NULL,
jsonArray(JsonOnNull.NULL, 1, 2),
jsonArray(JsonOnNull.NULL, 3, 4)
) as "nestedArrays" // [[1,2],[3,4]]
)
// Aggregated JSON arrays
products
.groupBy($"category")
.select(
$"category",
jsonArrayAgg(JsonOnNull.NULL, $"productName") as "productList"
)
// Result: ["Apple","Banana","Orange"]Combining object and array construction for complex JSON structures.
Usage:
// Complex nested structures
table.select(
jsonObject(
JsonOnNull.NULL,
"user", jsonObject(
JsonOnNull.NULL,
"id", $"userId",
"name", $"userName",
"preferences", jsonArray(JsonOnNull.ABSENT, $"pref1", $"pref2", $"pref3")
),
"orders", jsonArray(
JsonOnNull.NULL,
jsonObject(JsonOnNull.NULL, "id", $"orderId1", "total", $"total1"),
jsonObject(JsonOnNull.NULL, "id", $"orderId2", "total", $"total2")
),
"metadata", jsonObject(
JsonOnNull.ABSENT,
"lastLogin", $"lastLogin",
"ipAddress", $"ipAddress"
)
) as "userProfile"
)
// Dynamic JSON based on conditions
table.select(
jsonObject(
JsonOnNull.ABSENT,
"status", $"status",
"message", ($"hasError") ? $"errorMessage" : nullOf(DataTypes.STRING()),
"data", ($"hasData") ? jsonObject(
JsonOnNull.NULL,
"items", jsonArrayAgg(JsonOnNull.NULL, $"itemName")
) : nullOf(DataTypes.STRING())
) as "response"
)Common patterns for aggregating data into JSON structures.
Usage:
// Group multiple columns into JSON objects
salesData
.groupBy($"region")
.select(
$"region",
jsonObjectAgg(JsonOnNull.NULL, $"month", $"revenue") as "monthlyRevenue",
jsonArrayAgg(JsonOnNull.ABSENT, $"topProduct") as "topProducts"
)
// Create JSON arrays of objects
orders
.groupBy($"customerId")
.select(
$"customerId",
jsonArrayAgg(
JsonOnNull.NULL,
jsonObject(
JsonOnNull.NULL,
"orderId", $"orderId",
"amount", $"amount",
"date", $"orderDate"
)
) as "orderHistory"
)
// Conditional JSON construction
events
.groupBy($"sessionId")
.select(
$"sessionId",
jsonObject(
JsonOnNull.ABSENT,
"events", jsonArrayAgg(JsonOnNull.NULL, $"eventType"),
"errors", ($"hasErrors") ? jsonArrayAgg(JsonOnNull.NULL, $"errorCode") : nullOf(DataTypes.STRING()),
"duration", $"sessionDuration"
) as "sessionSummary"
)Important limitations and considerations for JSON functions.
Limitations:
jsonObjectAgg: Not supported in OVER windowsjsonArrayAgg: Not supported in OVER windows, unbounded session windows, or hop windowsjsonObject must be string literals (not expressions)jsonObjectAgg must be non-nullable character stringsjsonObjectAgg will cause errorsUsage Considerations:
// Valid key usage
table.select(
jsonObject(JsonOnNull.NULL, "staticKey", $"value") // ✓ Literal key
)
// Invalid key usage - will not compile
// table.select(
// jsonObject(JsonOnNull.NULL, $"dynamicKey", $"value") // ✗ Expression key
// )
// Use jsonObjectAgg for dynamic keys
table
.groupBy($"groupId")
.select(
jsonObjectAgg(JsonOnNull.NULL, $"dynamicKey", $"value") // ✓ Dynamic keys via aggregation
)
// Error handling for duplicate keys
uniqueKeys
.groupBy($"category")
.select(
jsonObjectAgg(JsonOnNull.NULL, $"uniqueKey", $"value") // Ensure key uniqueness
)