This document covers time-based and count-based windowing operations for streaming data analysis with the Apache Flink Table API and SQL, as shipped in the Blink planner uber JAR (flink-table-uber-blink).
// Processing time, Table API: the proctime attribute is appended while the
// DataStream is converted into a Table.
Table table = tableEnv.fromDataStream(
    dataStream,
    $("user_id"),
    $("data"),
    $("proc_time").proctime());

// Processing time, SQL DDL: proc_time is a computed column backed by PROCTIME().
String proctimeDdl =
    "CREATE TABLE events (" +
    " user_id BIGINT," +
    " data STRING," +
    " proc_time AS PROCTIME()" +
    ") WITH (...)";
tEnv.executeSql(proctimeDdl);

// Event time, SQL DDL: event_time becomes the rowtime attribute and the
// watermark trails it by 5 seconds.
// Both DDL statements define a table named "events"; they are presumably
// alternatives rather than steps to run one after the other.
String watermarkDdl =
    "CREATE TABLE events (" +
    " user_id BIGINT," +
    " event_time TIMESTAMP(3)," +
    " data STRING," +
    " WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
    ") WITH (...)";
tEnv.executeSql(watermarkDdl);
// Event time, Table API: event_time is marked as the rowtime attribute while the
// (already watermarked) DataStream is converted into a Table.
Table table = tableEnv.fromDataStream(
    watermarkedStream,
    $("user_id"),
    $("event_time").rowtime(),
    $("data"));

// API signature listing: declarations only, method bodies are omitted.
// Tumble is the entry point for defining a tumbling group window.
class Tumble {
    // Starts the window definition from its size (the usage example passes lit(5).minutes()).
    static TumbleWithSize over(Expression size);
}
// Builder step returned by Tumble.over(size).
interface TumbleWithSize {
// Selects the time field the tumbling window is defined on and moves to the aliasing step.
TumbleWithSizeOnTime on(Expression timeField);
}
interface TumbleWithSizeOnTime {
GroupWindow as(String alias);
}Usage:
Table result = table
.window(Tumble.over(lit(5).minutes()).on($("event_time")).as("w"))
.groupBy($("user_id"), $("w"))
.select($("user_id"), $("w").start(), $("w").end(), $("data").count());class Slide {
static SlideWithSize over(Expression size);
}
// Builder step returned by Slide.over(size).
interface SlideWithSize {
// Sets the slide interval of the window (the usage example passes lit(5).minutes()).
SlideWithSizeAndSlide every(Expression slide);
}
interface SlideWithSizeAndSlide {
SlideWithSizeAndSlideOnTime on(Expression timeField);
}Usage:
Table result = table
.window(Slide.over(lit(10).minutes()).every(lit(5).minutes()).on($("event_time")).as("w"))
.groupBy($("user_id"), $("w"))
.select($("user_id"), $("w").start(), $("w").end(), $("data").count());class Session {
static SessionWithGap withGap(Expression gap);
}
interface SessionWithGap {
SessionWithGapOnTime on(Expression timeField);
}Usage:
Table result = table
.window(Session.withGap(lit(30).minutes()).on($("event_time")).as("w"))
.groupBy($("user_id"), $("w"))
.select($("user_id"), $("w").start(), $("w").end(), $("data").count());-- TUMBLE function
SELECT
user_id,
TUMBLE_START(event_time, INTERVAL '5' MINUTE) as window_start,
TUMBLE_END(event_time, INTERVAL '5' MINUTE) as window_end,
COUNT(*) as event_count
FROM events
GROUP BY user_id, TUMBLE(event_time, INTERVAL '5' MINUTE);
-- TUMBLE_ROWTIME and TUMBLE_PROCTIME
-- NOTE(review): TUMBLE_PROCTIME is applied to the event-time column here, next to
-- TUMBLE_ROWTIME; confirm the planner accepts both on the same event-time window.
SELECT
    user_id,
    TUMBLE_ROWTIME(event_time, INTERVAL '5' MINUTE) AS window_rowtime,
    TUMBLE_PROCTIME(event_time, INTERVAL '5' MINUTE) AS window_proctime,
    COUNT(*) AS event_count
FROM events
GROUP BY user_id, TUMBLE(event_time, INTERVAL '5' MINUTE);

-- HOP function (sliding window)
-- Argument order is (time attribute, slide, size): 5-minute windows starting every 2 minutes.
SELECT
    user_id,
    HOP_START(event_time, INTERVAL '2' MINUTE, INTERVAL '5' MINUTE) AS window_start,
    HOP_END(event_time, INTERVAL '2' MINUTE, INTERVAL '5' MINUTE) AS window_end,
    COUNT(*) AS event_count
FROM events
GROUP BY user_id, HOP(event_time, INTERVAL '2' MINUTE, INTERVAL '5' MINUTE);

-- SESSION function
-- The second argument is the session gap (30 minutes).
SELECT
    user_id,
    SESSION_START(event_time, INTERVAL '30' MINUTE) AS session_start,
    SESSION_END(event_time, INTERVAL '30' MINUTE) AS session_end,
    COUNT(*) AS event_count
FROM events
GROUP BY user_id, SESSION(event_time, INTERVAL '30' MINUTE);

-- OVER clause with unbounded preceding
-- In a streaming query every OVER aggregate of one SELECT must use the same window
-- definition, so the ROWS frame and the RANGE frame are shown as separate queries.

-- Running count over a row-based frame.
SELECT
    user_id,
    event_time,
    data,
    COUNT(*) OVER (
        PARTITION BY user_id
        ORDER BY event_time
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS running_count
FROM events;

-- Running sum over a time-range frame.
SELECT
    user_id,
    event_time,
    data,
    SUM(amount) OVER (
        PARTITION BY user_id
        ORDER BY event_time
        RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS running_sum
FROM events;

-- Sliding window with OVER
-- Split into one query per frame for the same reason as above.

-- Row-based frame: the 10 preceding rows plus the current row (11 rows in total).
SELECT
    user_id,
    event_time,
    data,
    COUNT(*) OVER (
        PARTITION BY user_id
        ORDER BY event_time
        ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
    ) AS count_last_10
FROM events;

-- Time-range frame: rows whose event_time lies within the last hour of the current row.
SELECT
    user_id,
    event_time,
    data,
    AVG(amount) OVER (
        PARTITION BY user_id
        ORDER BY event_time
        RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
    ) AS avg_last_hour
FROM events;

// API signature listing: declarations only, method bodies are omitted.
interface Table {
    // Attaches an over window to the table; aggregates are then bound to it with over(alias).
    OverWindowedTable window(OverWindow overWindow);
}
// API signature listing: declarations only, method bodies are omitted.
// Over is the entry point for defining an over window.
class Over {
    // Partitioning comes first; orderBy(...) and preceding(...) are chained on the result,
    // as the usage example below does.
    static OverWindowPartitioned partitionBy(Expression... fields);
    // Starts an over window without partitioning.
    static OverWindowPartitionedOrdered orderBy(Expression field);
}

// Usage: running count of rows per user over all preceding rows.
Table result = table
    .window(Over.partitionBy($("user_id")).orderBy($("event_time")).preceding(UNBOUNDED_ROW).as("w"))
    .select($("user_id"), $("event_time"), $("data"), $("data").count().over($("w")));

-- TUMBLE TVF (Flink 1.13+)
-- The table-valued function adds window_start / window_end columns to every row;
-- the aggregation groups on them.
SELECT
    window_start,
    window_end,
    user_id,
    COUNT(*) AS event_count
FROM TABLE(TUMBLE(TABLE events, DESCRIPTOR(event_time), INTERVAL '5' MINUTE))
GROUP BY window_start, window_end, user_id;

-- HOP TVF
-- Argument order is (table, time column, slide, size): 5-minute windows starting every 2 minutes.
SELECT
    window_start,
    window_end,
    user_id,
    COUNT(*) AS event_count
FROM TABLE(HOP(TABLE events, DESCRIPTOR(event_time), INTERVAL '2' MINUTE, INTERVAL '5' MINUTE))
GROUP BY window_start, window_end, user_id;

-- SESSION TVF
-- The session key is given with PARTITION BY on the table argument, not as a second DESCRIPTOR.
-- NOTE(review): the SESSION TVF is only available in newer Flink releases than the
-- TUMBLE / HOP TVFs; confirm it is supported by the version in use.
SELECT
    window_start,
    window_end,
    user_id,
    COUNT(*) AS event_count
FROM TABLE(SESSION(TABLE events PARTITION BY user_id, DESCRIPTOR(event_time), INTERVAL '30' MINUTE))
GROUP BY window_start, window_end, user_id;

-- Time-based interval join
-- Matches each order with payments made within one hour before or after the order time.
SELECT
    o.order_id,
    o.user_id,
    o.order_time,
    p.payment_id,
    p.payment_time
FROM orders o
JOIN payments p ON o.order_id = p.order_id
    AND p.payment_time BETWEEN o.order_time - INTERVAL '1' HOUR
    AND o.order_time + INTERVAL '1' HOUR;

// Register temporal table function
// createTemporalTableFunction returns a function, not a Table, so it is registered as a
// function (update_time is the versioning time attribute, currency the key).
tEnv.createTemporarySystemFunction("rates_temporal",
    rates.createTemporalTableFunction($("update_time"), $("currency")));

// Temporal table function join in SQL
// A temporal table function is joined through LATERAL TABLE with the probe-side time
// attribute as its argument; the key equality goes into WHERE.
Table result = tEnv.sqlQuery(
    "SELECT " +
    " o.order_id, " +
    " o.amount, " +
    " o.currency, " +
    " r.rate, " +
    " o.amount * r.rate AS amount_usd " +
    "FROM orders AS o, " +
    "LATERAL TABLE (rates_temporal(o.order_time)) AS r " +
    "WHERE o.currency = r.currency"
);

// Bounded out-of-orderness
// Watermarks trail the highest timestamp seen by 5 seconds; the timestamp is read from the event.
WatermarkStrategy<Event> strategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
// Monotonous timestamps: strategy for streams whose timestamps never go backwards.
// The event timestamp is taken from the event itself; the timestamp already attached
// to the record (second lambda argument) is ignored.
WatermarkStrategy<Event> monotonous =
    WatermarkStrategy.<Event>forMonotonousTimestamps()
        .withTimestampAssigner((event, recordTimestamp) -> event.getTimestamp());
// Custom watermark generator
// The explicit <Event> type witness matches the other strategies and keeps the element
// type independent of inference through the chained call.
WatermarkStrategy<Event> custom = WatermarkStrategy
    .<Event>forGenerator(ctx -> new CustomWatermarkGenerator())
    .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

// Configure late data handling
// Enables late firing for window aggregations and sets the late-fire delay to 5 seconds.
// NOTE(review): these table.exec.emit.* keys do not appear among the documented options as
// far as I can tell; confirm they are honoured by the planner version in use.
Configuration config = tEnv.getConfig().getConfiguration();
config.setString("table.exec.emit.late-fire.enabled", "true");
config.setString("table.exec.emit.late-fire.delay", "5 s");
// Side output for late data
// The tag is declared outside the ProcessFunction so the same instance can be used both to
// emit late records and to read them back with getSideOutput(...).
final OutputTag<Event> lateOutputTag = new OutputTag<Event>("late-data"){};

// Main output: records that are not behind the current watermark.
// (SingleOutputStreamOperator is the DataStream subtype returned by process(...); it is the
// type that exposes getSideOutput.)
SingleOutputStreamOperator<Event> onTimeEvents = mainStream
    .assignTimestampsAndWatermarks(watermarkStrategy)
    .process(new ProcessFunction<Event, Event>() {
        @Override
        public void processElement(Event event, Context ctx, Collector<Event> out) {
            if (event.getTimestamp() < ctx.timerService().currentWatermark()) {
                // Timestamp is behind the watermark: route the record to the side output.
                ctx.output(lateOutputTag, event);
            } else {
                out.collect(event);
            }
        }
    });

// Side output: the records that were routed to lateOutputTag above.
DataStream<Event> lateEvents = onTimeEvents.getSideOutput(lateOutputTag);

-- Count, sum, average
-- Basic aggregates per 5-minute tumbling window over all rows (no per-user grouping).
SELECT
    TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
    COUNT(*) AS event_count,
    SUM(amount) AS total_amount,
    AVG(amount) AS avg_amount,
    MIN(amount) AS min_amount,
    MAX(amount) AS max_amount
FROM events
GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE);
-- Statistical functions
-- Per 5-minute tumbling window: population standard deviation, sample variance, the
-- collected user_id values and a comma-separated list of event types.
SELECT
    TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
    STDDEV_POP(amount) AS stddev,
    VAR_SAMP(amount) AS variance,
    COLLECT(user_id) AS user_list,
    LISTAGG(event_type, ',') AS event_types
FROM events
GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE);

// API signature listing: declarations only, method bodies are omitted.
// GroupWindow is the window description passed to Table.window(...) in the usage examples.
interface GroupWindow {
    // Time field the window is defined on.
    Expression getTimeField();
    // Size of the window.
    Expression getSize();
    // Alias under which groupBy(...) / select(...) refer to the window.
    String getAlias();
}
// Builder types that this listing declares as GroupWindow implementations.
// NOTE(review): earlier in this listing TumbleWithSize and SessionWithGap are declared as
// interfaces, and it is the as(alias) step that returns a GroupWindow; these intermediate
// steps have no alias yet. Confirm which types actually implement GroupWindow.
class TumbleWithSize implements GroupWindow;
class SlideWithSizeAndSlide implements GroupWindow;
class SessionWithGap implements GroupWindow;
// Description of an over window as built by the Over.partitionBy(...).orderBy(...)
// .preceding(...).as(...) chain in the usage example.
interface OverWindow {
// Partitioning expression of the window.
Expression getPartitioning();
// Ordering expression of the window.
Expression getOrder();
// Lower bound of the frame (the usage example passes UNBOUNDED_ROW).
Expression getPreceding();
// Upper bound of the frame.
Expression getFollowing();
// Alias under which aggregates refer to the window via over(alias).
String getAlias();
}
// Table grouped on a group window.
// NOTE(review): presumably the result of table.window(...).groupBy(...) in the usage examples; confirm.
interface WindowGroupedTable extends Table {
// Projects and aggregates the grouped rows (the usage examples select window start/end and a count).
Table select(Expression... fields);
// Applies an aggregate function expression to each group.
AggregatedTable aggregate(Expression aggregateFunction);
// Applies a table aggregate function expression to each group.
FlatAggregateTable flatAggregate(Expression tableAggregateFunction);
}
// Table with an over window attached, as returned by Table.window(OverWindow).
interface OverWindowedTable extends Table {
// Projects the rows; aggregates are bound to the window with over(alias), as in the usage example.
Table select(Expression... fields);
}
// Window bounds
// Bound markers for over windows; the usage example passes UNBOUNDED_ROW to preceding(...).
// NOTE(review): listed here as classes, but the usage example uses UNBOUNDED_ROW as a value;
// confirm how these bounds are actually exposed by the API.
class UNBOUNDED_ROW;
class UNBOUNDED_RANGE;
class CURRENT_ROW;
class CURRENT_RANGE;