or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

catalog-management.md · complex-event-processing.md · core-table-operations.md · datastream-integration.md · index.md · sql-processing.md · type-system.md · user-defined-functions.md · window-operations.md
tile.json

docs/window-operations.md

Window Operations and Time Processing

This document covers time-based and count-based windowing operations for streaming data analysis with the Apache Flink Table API and SQL, as packaged in the `flink-table-uber-blink` distribution (Blink planner).

Time Attributes

Processing Time

// In Table API: declare proc_time as a processing-time attribute of the table.
// (Uses tEnv, the same TableEnvironment variable as the other examples on this page.)
Table table = tEnv.fromDataStream(dataStream,
    $("user_id"),
    $("data"),
    $("proc_time").proctime()
);

// In SQL DDL: processing time is declared as a computed column via PROCTIME().
tEnv.executeSql(
    "CREATE TABLE events (" +
    "  user_id BIGINT," +
    "  data STRING," +
    "  proc_time AS PROCTIME()" +
    ") WITH (...)"
);

Event Time

// With watermark in DDL: the WATERMARK clause makes event_time the rowtime
// attribute and tolerates events arriving up to 5 seconds out of order.
tEnv.executeSql(
    "CREATE TABLE events (" +
    "  user_id BIGINT," +
    "  event_time TIMESTAMP(3)," +
    "  data STRING," +
    "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
    ") WITH (...)"
);

// In Table API: .rowtime() exposes the stream's record timestamps as event_time.
// NOTE: watermarkedStream is assumed to already carry timestamps and watermarks
// (e.g. via assignTimestampsAndWatermarks).
Table table = tEnv.fromDataStream(watermarkedStream,
    $("user_id"),
    $("event_time").rowtime(),
    $("data")
);

Group Windows (Table API)

Tumbling Windows

// Tumbling windows (fixed size, no overlap), built as:
// Tumble.over(size).on(timeField).as(alias)
class Tumble {
    // size: window length, e.g. lit(5).minutes()
    static TumbleWithSize over(Expression size);
}

interface TumbleWithSize {
    // timeField: the time attribute (rowtime or proctime) to window on
    TumbleWithSizeOnTime on(Expression timeField);
}

interface TumbleWithSizeOnTime {
    // alias: name used to reference the window in groupBy()/select(), e.g. $("w")
    GroupWindow as(String alias);
}

Usage:

// Count data values per user in 5-minute tumbling windows on event_time.
Table result = table
    .window(
        Tumble.over(lit(5).minutes())
            .on($("event_time"))
            .as("w"))
    .groupBy($("user_id"), $("w"))
    .select(
        $("user_id"),
        $("w").start(),
        $("w").end(),
        $("data").count());

Sliding Windows

// Sliding (hopping) windows, built as:
// Slide.over(size).every(slide).on(timeField).as(alias)
class Slide {
    // size: window length, e.g. lit(10).minutes()
    static SlideWithSize over(Expression size);
}

interface SlideWithSize {
    // slide: distance between consecutive window starts; windows overlap when slide < size
    SlideWithSizeAndSlide every(Expression slide);
}

interface SlideWithSizeAndSlide {
    // timeField: the time attribute (rowtime or proctime) to window on
    SlideWithSizeAndSlideOnTime on(Expression timeField);
}

interface SlideWithSizeAndSlideOnTime {
    // alias: name used to reference the window in groupBy()/select(), e.g. $("w")
    GroupWindow as(String alias);
}

Usage:

// 10-minute windows starting every 5 minutes, so each row falls into two windows.
Table result = table
    .window(
        Slide.over(lit(10).minutes())
            .every(lit(5).minutes())
            .on($("event_time"))
            .as("w"))
    .groupBy($("user_id"), $("w"))
    .select(
        $("user_id"),
        $("w").start(),
        $("w").end(),
        $("data").count());

Session Windows

// Session windows, built as:
// Session.withGap(gap).on(timeField).as(alias)
class Session {
    // gap: inactivity interval that closes a session, e.g. lit(30).minutes()
    static SessionWithGap withGap(Expression gap);
}

interface SessionWithGap {
    // timeField: the time attribute (rowtime or proctime) to window on
    SessionWithGapOnTime on(Expression timeField);
}

interface SessionWithGapOnTime {
    // alias: name used to reference the window in groupBy()/select(), e.g. $("w")
    GroupWindow as(String alias);
}

Usage:

// Per-user sessions that close after 30 minutes without events.
Table result = table
    .window(
        Session.withGap(lit(30).minutes())
            .on($("event_time"))
            .as("w"))
    .groupBy($("user_id"), $("w"))
    .select(
        $("user_id"),
        $("w").start(),
        $("w").end(),
        $("data").count());

Window SQL Functions

Tumbling Window Functions

-- TUMBLE group window: one row per user and 5-minute window.
-- TUMBLE_START / TUMBLE_END repeat the exact arguments of TUMBLE in GROUP BY.
SELECT
    user_id,
    TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
    TUMBLE_END(event_time, INTERVAL '5' MINUTE)   AS window_end,
    COUNT(*)                                      AS event_count
FROM events
GROUP BY
    user_id,
    TUMBLE(event_time, INTERVAL '5' MINUTE);

-- Window time attributes: TUMBLE_ROWTIME yields a rowtime attribute and
-- TUMBLE_PROCTIME a proctime attribute per window, so the result can feed
-- further time-based operations.
SELECT
    user_id,
    TUMBLE_ROWTIME(event_time, INTERVAL '5' MINUTE)  AS window_rowtime,
    TUMBLE_PROCTIME(event_time, INTERVAL '5' MINUTE) AS window_proctime,
    COUNT(*)                                         AS event_count
FROM events
GROUP BY
    user_id,
    TUMBLE(event_time, INTERVAL '5' MINUTE);

Sliding Window Functions

-- HOP (sliding) group window. Argument order is HOP(time_attr, slide, size):
-- here 5-minute windows that advance every 2 minutes.
SELECT
    user_id,
    HOP_START(event_time, INTERVAL '2' MINUTE, INTERVAL '5' MINUTE) AS window_start,
    HOP_END(event_time, INTERVAL '2' MINUTE, INTERVAL '5' MINUTE)   AS window_end,
    COUNT(*)                                                        AS event_count
FROM events
GROUP BY
    user_id,
    HOP(event_time, INTERVAL '2' MINUTE, INTERVAL '5' MINUTE);

Session Window Functions

-- SESSION group window: a user's session ends after a 30-minute gap without events.
SELECT
    user_id,
    SESSION_START(event_time, INTERVAL '30' MINUTE) AS session_start,
    SESSION_END(event_time, INTERVAL '30' MINUTE)   AS session_end,
    COUNT(*)                                        AS event_count
FROM events
GROUP BY
    user_id,
    SESSION(event_time, INTERVAL '30' MINUTE);

Over Windows (Over Aggregations)

Unbounded Over Windows

-- Unbounded OVER windows.
-- In streaming mode Flink requires every OVER aggregate in one SELECT to use
-- the same window, so the ROWS and RANGE variants are separate queries.
-- NOTE(review): assumes events has an amount column (the DDL on this page does not declare one).

-- ROWS frame: each row aggregates itself and all earlier rows of the same user.
SELECT
    user_id,
    event_time,
    data,
    COUNT(*) OVER w    AS running_count,
    SUM(amount) OVER w AS running_sum
FROM events
WINDOW w AS (
    PARTITION BY user_id
    ORDER BY event_time
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
);

-- RANGE frame: rows with the same event_time are peers and are included together.
SELECT
    user_id,
    event_time,
    data,
    SUM(amount) OVER (
        PARTITION BY user_id
        ORDER BY event_time
        RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS running_sum
FROM events;

Bounded Over Windows

-- Bounded OVER windows.
-- In streaming mode Flink requires every OVER aggregate in one SELECT to use
-- the same window, so the row-count and time-range variants are separate queries.
-- NOTE(review): assumes events has an amount column (the DDL on this page does not declare one).

-- Row-count bound: 9 PRECEDING plus CURRENT ROW is the last 10 rows of the user.
SELECT
    user_id,
    event_time,
    data,
    COUNT(*) OVER (
        PARTITION BY user_id
        ORDER BY event_time
        ROWS BETWEEN 9 PRECEDING AND CURRENT ROW
    ) AS count_last_10
FROM events;

-- Time-range bound: the user's rows whose event_time lies within one hour
-- before the current row's event_time.
SELECT
    user_id,
    event_time,
    data,
    AVG(amount) OVER (
        PARTITION BY user_id
        ORDER BY event_time
        RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
    ) AS avg_last_hour
FROM events;

Over Window in Table API

interface Table {
    // Accepts one or more over windows; aggregates reference them by alias in select().
    OverWindowedTable window(OverWindow... overWindows);
}

// Over windows, built as:
// Over.partitionBy(...).orderBy(timeField).preceding(bound).as(alias)
class Over {
    static OverWindowPartitioned partitionBy(Expression... partitionBy);
    static OverWindowPartitionedOrdered orderBy(Expression orderBy);
}

class OverWindowPartitioned {
    // orderBy: the time attribute that orders rows within each partition
    OverWindowPartitionedOrdered orderBy(Expression orderBy);
}

class OverWindowPartitionedOrdered {
    // preceding: e.g. UNBOUNDED_ROW, rowInterval(10L), lit(1).hours()
    OverWindowPartitionedOrderedPreceding preceding(Expression preceding);
    OverWindow as(String alias);
}

class OverWindowPartitionedOrderedPreceding {
    OverWindowPartitionedOrderedPreceding following(Expression following);
    OverWindow as(String alias);
}

Usage:

// Per user, count data values from the first row up to the current row (by event_time).
Table result = table
    .window(
        Over.partitionBy($("user_id"))
            .orderBy($("event_time"))
            .preceding(UNBOUNDED_ROW)
            .as("w"))
    .select(
        $("user_id"),
        $("event_time"),
        $("data"),
        $("data").count().over($("w")));

Window TVF (Table-Valued Functions)

Tumble TVF

-- TUMBLE window TVF (Flink 1.13+): assigns each row to a 5-minute window and
-- adds window_start / window_end columns that the query groups on.
SELECT
    window_start,
    window_end,
    user_id,
    COUNT(*) AS event_count
FROM TABLE(
    TUMBLE(TABLE events, DESCRIPTOR(event_time), INTERVAL '5' MINUTE)
)
GROUP BY window_start, window_end, user_id;

Hop TVF

-- HOP window TVF. Argument order is HOP(TABLE data, DESCRIPTOR(timecol), slide, size):
-- here 5-minute windows that advance every 2 minutes.
SELECT
    window_start,
    window_end,
    user_id,
    COUNT(*) AS event_count
FROM TABLE(
    HOP(TABLE events, DESCRIPTOR(event_time), INTERVAL '2' MINUTE, INTERVAL '5' MINUTE)
)
GROUP BY window_start, window_end, user_id;

Session TVF

-- SESSION window TVF. Flink partitions session windows with PARTITION BY on
-- the table argument (not with a DESCRIPTOR(key) argument).
-- NOTE: the SESSION TVF is not available in Flink 1.13; check that your Flink
-- version documents it before relying on this form.
SELECT
    window_start,
    window_end,
    user_id,
    COUNT(*) AS event_count
FROM TABLE(
    SESSION(TABLE events PARTITION BY user_id, DESCRIPTOR(event_time), INTERVAL '30' MINUTE)
)
GROUP BY window_start, window_end, user_id;

Time-based Joins

Interval Joins

-- Interval join: pair each order with payments whose payment_time lies within
-- one hour before or after order_time (BETWEEN is inclusive on both ends).
-- NOTE(review): order_time and payment_time are presumably time attributes;
-- otherwise Flink plans this as a regular join with unbounded state.
SELECT
    o.order_id,
    o.user_id,
    o.order_time,
    p.payment_id,
    p.payment_time
FROM orders AS o
INNER JOIN payments AS p
    ON o.order_id = p.order_id
    AND p.payment_time BETWEEN o.order_time - INTERVAL '1' HOUR
                           AND o.order_time + INTERVAL '1' HOUR;

Temporal Joins

// Create a temporal table function over the rates history:
// update_time is the versioning time attribute and currency is the primary key.
TemporalTableFunction ratesFn =
    rates.createTemporalTableFunction($("update_time"), $("currency"));

// A temporal table function is a function, not a view, so register it as one.
tEnv.createTemporarySystemFunction("rates_temporal", ratesFn);

// Temporal table function join in SQL: the function is invoked with the probe
// side's time attribute through LATERAL TABLE. (FOR SYSTEM_TIME AS OF applies
// only to versioned tables, i.e. tables with a primary key and a time attribute.)
// NOTE(review): assumes o.order_time is a time attribute of orders.
Table result = tEnv.sqlQuery(
    "SELECT " +
    "  o.order_id, " +
    "  o.amount, " +
    "  o.currency, " +
    "  r.rate, " +
    "  o.amount * r.rate AS amount_usd " +
    "FROM orders AS o, " +
    "  LATERAL TABLE (rates_temporal(o.order_time)) AS r " +
    "WHERE o.currency = r.currency"
);

Watermarks and Late Data

Watermark Strategies

// Bounded out-of-orderness: the watermark lags the highest timestamp seen so far
// by the given bound (5 s), so events up to 5 s out of order are not late.
WatermarkStrategy<Event> strategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

// Monotonous timestamps: for streams whose timestamps never decrease.
WatermarkStrategy<Event> monotonous = WatermarkStrategy
    .<Event>forMonotonousTimestamps()
    .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

// Custom watermark generator. CustomWatermarkGenerator is presumably user code
// implementing WatermarkGenerator<Event>. The explicit <Event> witness matches the
// strategies above and pins the element type for the chained call.
WatermarkStrategy<Event> custom = WatermarkStrategy
    .<Event>forGenerator(ctx -> new CustomWatermarkGenerator())
    .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

Late Data Handling

// Configure late firing for window aggregations in the Table API / SQL.
// NOTE(review): table.exec.emit.late-fire.* appear to be experimental options that
// are not in Flink's documented configuration; confirm they exist in your Flink
// version and which allowed-lateness / state-retention setting they depend on.
Configuration tableConfiguration = tEnv.getConfig().getConfiguration();
tableConfiguration.setString("table.exec.emit.late-fire.enabled", "true");
tableConfiguration.setString("table.exec.emit.late-fire.delay", "5 s");

// Side output for late data.
// The tag is declared outside the function so the caller can fetch the side
// output with the same tag; the anonymous subclass ({}) keeps the Event type info.
final OutputTag<Event> lateOutputTag = new OutputTag<Event>("late-data"){};

// `strategy` is the WatermarkStrategy<Event> declared in the watermark examples.
SingleOutputStreamOperator<Event> onTimeEvents = mainStream
    .assignTimestampsAndWatermarks(strategy)
    .process(new ProcessFunction<Event, Event>() {
        @Override
        public void processElement(Event event, Context ctx, Collector<Event> out) {
            // An element whose timestamp is behind the current watermark is late:
            // route it to the side output instead of the main output.
            if (event.getTimestamp() < ctx.timerService().currentWatermark()) {
                ctx.output(lateOutputTag, event);
            } else {
                out.collect(event);
            }
        }
    });

// The main output holds on-time events; late events come from the side output.
DataStream<Event> lateEvents = onTimeEvents.getSideOutput(lateOutputTag);

Window Aggregations

Built-in Aggregation Functions

-- Basic aggregates per 5-minute tumbling window. There is no per-key grouping,
-- so the query yields one row per window.
SELECT
    TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
    COUNT(*)    AS event_count,
    SUM(amount) AS total_amount,
    AVG(amount) AS avg_amount,
    MIN(amount) AS min_amount,
    MAX(amount) AS max_amount
FROM events
GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE);

-- Statistical and collection aggregates per 5-minute tumbling window.
-- NOTE(review): STDDEV_POP is the population estimator while VAR_SAMP is the
-- sample estimator, so `variance` is not `stddev` squared; pick matching
-- estimators if they are meant to be compared.
SELECT
    TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
    STDDEV_POP(amount)       AS stddev,
    VAR_SAMP(amount)         AS variance,
    COLLECT(user_id)         AS user_list,
    LISTAGG(event_type, ',') AS event_types
FROM events
GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE);

Types

interface GroupWindow {
    Expression getTimeField();
    Expression getSize();
    String getAlias();
}

class TumbleWithSize implements GroupWindow;
class SlideWithSizeAndSlide implements GroupWindow;
class SessionWithGap implements GroupWindow;

interface OverWindow {
    Expression getPartitioning();
    Expression getOrder();
    Expression getPreceding();
    Expression getFollowing();  
    String getAlias();
}

interface WindowGroupedTable extends Table {
    Table select(Expression... fields);
    AggregatedTable aggregate(Expression aggregateFunction);
    FlatAggregateTable flatAggregate(Expression tableAggregateFunction);
}

interface OverWindowedTable extends Table {
    Table select(Expression... fields);
}

// Window bounds
class UNBOUNDED_ROW;
class UNBOUNDED_RANGE;
class CURRENT_ROW;
class CURRENT_RANGE;