Apache Flink SQL, Table API, and UDF development for both OSS Flink and Confluent Cloud
Stream processing with SQL semantics: tables that change over time. Think of them as movies, not photographs.
| Task | Approach |
|---|---|
| Simple transformations | Flink SQL SELECT/WHERE/GROUP BY |
| Windowed aggregation | Window TVFs (TUMBLE/HOP/SESSION/CUMULATE) |
| Pattern detection | MATCH_RECOGNIZE |
| Custom scalar logic | UDF (ScalarFunction) |
| One-to-many expansion | UDTF (TableFunction) |
| Stateful processing | Process Table Function (PTF) — see ptf-guide.md |
| Join streams | Interval joins, temporal joins, lookup joins |
| Deduplication | ROW_NUMBER() with OVER clause |
| Top-N queries | ROW_NUMBER() OVER (ORDER BY ...) |
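The table above lists the CUMULATE window TVF, which is not shown in the examples below, so here is a minimal sketch; the `pageviews` table and its `view_time` event-time column are hypothetical:

```sql
-- Cumulating window: emits a result every 10-minute step for a window
-- that grows until it reaches the 1-hour maximum size.
-- Assumes a hypothetical pageviews table with event-time column view_time.
SELECT window_start, window_end, COUNT(*) AS views
FROM TABLE(
  CUMULATE(TABLE pageviews, DESCRIPTOR(view_time), INTERVAL '10' MINUTE, INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end;
```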
Traditional DB: Table = snapshot (photograph)
Flink: Table = changelog (movie)

Every INSERT/UPDATE/DELETE is an event in the changelog, so SQL queries become continuous: results update as data arrives.

Changelog modes:

- Append-only: insert events (`+I`) only
- Retract: an update is encoded as a retraction of the old row (`-U`) followed by the new row (`+U`); deletes are `-D`
- Upsert: rows are keyed by primary key, so only the new row (`+U`) and deletes (`-D`) are emitted
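To make the changelog model concrete, here is a sketch of a continuous (non-windowed) aggregation together with the retract-mode changelog it would emit; the row values are hypothetical:

```sql
-- Continuous query: the count per customer updates as orders arrive.
SELECT customer_id, COUNT(*) AS order_cnt
FROM orders
GROUP BY customer_id;

-- Retract-mode changelog for two orders from customer 42:
-- +I (42, 1)   first order: result row inserted
-- -U (42, 1)   second order: old result retracted
-- +U (42, 2)   second order: new result emitted
```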
Java Table API:

```xml
<dependency>
  <groupId>io.confluent.flink</groupId>
  <artifactId>confluent-flink-table-api-java-plugin</artifactId>
  <version>2.1-8</version>
</dependency>
```

```java
import io.confluent.flink.plugin.ConfluentSettings;

TableEnvironment env = TableEnvironment.create(
    ConfluentSettings.fromGlobalVariables()
);
```

Python Table API:

```shell
pip install confluent-flink-table-api-python-plugin
```

```python
from pyflink.table.confluent import ConfluentSettings
from pyflink.table import TableEnvironment

settings = ConfluentSettings.from_global_variables()
env = TableEnvironment.create(settings)
```

Required environment variables:
```shell
export CLOUD_PROVIDER="aws"
export CLOUD_REGION="us-east-1"
export FLINK_API_KEY="<key>"
export FLINK_API_SECRET="<secret>"
export ORG_ID="<org-id>"
export ENV_ID="<env-id>"
export COMPUTE_POOL_ID="<pool-id>"
```

OSS Flink setup:

```java
EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    .build();
TableEnvironment env = TableEnvironment.create(settings);
```

Tumbling window (fixed, non-overlapping):
```sql
SELECT
  window_start, window_end,
  COUNT(*) AS cnt,
  SUM(amount) AS total
FROM TABLE(
  TUMBLE(TABLE orders, DESCRIPTOR(event_time), INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end;
```

Hopping window (overlapping):
```sql
SELECT window_start, window_end, AVG(price)
FROM TABLE(
  HOP(TABLE trades, DESCRIPTOR(ts), INTERVAL '5' MINUTE, INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end;
```

Session window (gap-based):
```sql
SELECT window_start, window_end, user_id, COUNT(*)
FROM TABLE(
  SESSION(TABLE clicks PARTITION BY user_id, DESCRIPTOR(click_time), INTERVAL '30' MINUTE)
)
GROUP BY window_start, window_end, user_id;
```

Interval join (time-bounded):
```sql
SELECT o.*, s.ship_time
FROM orders o, shipments s
WHERE o.order_id = s.order_id
  AND s.ship_time BETWEEN o.order_time AND o.order_time + INTERVAL '4' HOUR;
```

Temporal join (point-in-time lookup):
```sql
SELECT o.*, r.rate
FROM orders o
JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r
ON o.currency = r.currency;
```

Lookup join (external table):
```sql
SELECT o.*, c.name
FROM orders o
JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```

Deduplication (keep the latest row per key):

```sql
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY event_time DESC) AS rn
  FROM events
)
WHERE rn = 1;
```

Top-N (top 10 per category):

```sql
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS rn
  FROM products
)
WHERE rn <= 10;
```

Pattern detection with MATCH_RECOGNIZE:

```sql
SELECT *
FROM orders
MATCH_RECOGNIZE (
  PARTITION BY user_id
  ORDER BY event_time
  MEASURES
    FIRST(A.event_time) AS start_time,
    LAST(B.event_time) AS end_time,
    COUNT(A.amount) AS cnt
  ONE ROW PER MATCH
  PATTERN (A+ B)
  DEFINE
    A AS A.amount < 100,
    B AS B.amount >= 100
);
```

Table API (Java):

```java
Table orders = env.from("orders");
Table result = orders
    .filter($("status").isEqual("completed"))
    .select($("order_id"), $("amount"), $("customer_id"))
    .groupBy($("customer_id"))
    .select($("customer_id"), $("amount").sum().as("total"));
```

Mixing SQL and the Table API:

```java
// SQL → Table
Table fromSql = env.sqlQuery("SELECT * FROM orders WHERE amount > 100");

// Table → SQL
env.createTemporaryView("filtered_orders", fromSql);
Table fromTable = env.sqlQuery(
    "SELECT customer_id, SUM(amount) FROM filtered_orders GROUP BY customer_id");
```

ConfluentTools helpers:

```java
// Print results (limited)
ConfluentTools.printMaterializedLimit(table, 100);

// Collect results
List<Row> rows = ConfluentTools.collectMaterializedLimit(table, 100);

// Statement lifecycle
TableResult result = env.executeSql("SELECT * FROM orders");
String statementName = ConfluentTools.getStatementName(result);
ConfluentTools.stopStatement(result);
```

Creating a managed table:

```java
TableDescriptor descriptor = ConfluentTableDescriptor.forManaged()
    .schema(Schema.newBuilder()
        .column("id", DataTypes.INT())
        .column("data", DataTypes.STRING())
        .watermark("$rowtime", $("$rowtime").minus(lit(5).seconds()))
        .build())
    .build();
env.createTable("my_table", descriptor);
```

For UDF development patterns, templates, and deployment: See udf-guide.md
Scalar UDF example:

```java
public class MyUpperCase extends ScalarFunction {
    public String eval(String s) {
        return s == null ? null : s.toUpperCase();
    }
}
```

```java
// Register and use
env.createTemporaryFunction("my_upper", MyUpperCase.class);
env.sqlQuery("SELECT my_upper(name) FROM users");
```

Deploying a UDF to Confluent Cloud:

```shell
# Build JAR
mvn clean package

# Upload artifact
confluent flink artifact create my-udf \
  --cloud aws --region us-east-1 \
  --artifact-file target/my-udf-1.0.jar
```

```sql
-- Register function
CREATE FUNCTION my_upper
AS 'com.example.MyUpperCase'
USING JAR 'confluent-artifact://cfa-xxxxx';
```

Declare watermark in DDL:
```sql
CREATE TABLE events (
  event_id STRING,
  event_time TIMESTAMP(3),
  payload STRING,
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (...);
```

Watermark strategies:
- `event_time - INTERVAL 'n' SECOND`: bounded out-of-orderness
- `event_time`: strictly ascending (no late data)
- `SOURCE_WATERMARK()`: preserve source watermarks

Common errors:

| Error | Cause | Fix |
|---|---|---|
| Cannot resolve watermark | Missing watermark declaration | Add `WATERMARK FOR col AS ...` |
| Schema mismatch | Column types don't align | Check data types with `DESCRIBE table` |
| State too large | Unbounded aggregation | Add state TTL or use windows |
| Late data dropped | Watermark too aggressive | Increase watermark delay |
| UDF not found | Function not registered | Check catalog/database scope |
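For the "State too large" row, one common mitigation is a state TTL set via configuration; `table.exec.state.ttl` is a standard Flink option, and the 12-hour value below is only an example:

```sql
-- Expire per-key state that has not been updated for 12 hours.
-- Trades correctness for bounded state: results after expiry may be approximate.
SET 'table.exec.state.ttl' = '12 h';
```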
For detailed troubleshooting: See troubleshooting.md
Install with Tessl CLI:

```shell
npx tessl i gamussa/flink-sql@1.0.0
```