Java API for Apache Flink's Table ecosystem, enabling type-safe table operations and SQL query execution.
—
Window operations enable time-based and count-based processing for streaming data. Flink supports multiple window types including tumbling, sliding, session windows, and over windows for different analytical scenarios.
Fixed-size, non-overlapping windows that partition data into distinct time segments.
public final class Tumble {
/**
* Creates a tumbling window with the specified size
* @param size Window size expression (duration)
* @return TumbleWithSize for further configuration
*/
public static TumbleWithSize over(Expression size);
}
public final class TumbleWithSize {
/**
* Specifies the time attribute for the tumbling window
* @param timeField Time attribute column
* @return TumbleWithSizeOnTime for alias configuration
*/
public TumbleWithSizeOnTime on(Expression timeField);
}
public final class TumbleWithSizeOnTime {
/**
* Assigns an alias to the window for reference in selection
* @param alias Window alias name
* @return GroupWindow that can be used with groupBy
*/
public GroupWindow as(String alias);
}Usage Examples:
import static org.apache.flink.table.api.Expressions.*;
// 1-hour tumbling window
GroupWindow hourlyWindow = Tumble
.over(lit(1).hour())
.on($("event_time"))
.as("hourly_window");
Table hourlyStats = sourceTable
.window(hourlyWindow)
.groupBy($("hourly_window"), $("category"))
.select(
$("category"),
$("hourly_window").start().as("window_start"),
$("hourly_window").end().as("window_end"),
count($("*")).as("event_count"),
sum($("amount")).as("total_amount")
);
// 15-minute tumbling window
GroupWindow quarterHourWindow = Tumble
.over(lit(15).minute())
.on($("process_time"))
.as("quarter_hour");
Table frequentStats = sourceTable
.window(quarterHourWindow)
.groupBy($("quarter_hour"))
.select(
$("quarter_hour").start().as("period_start"),
count($("*")).as("transaction_count"),
avg($("value")).as("avg_transaction_value")
);Fixed-size, overlapping windows that slide by a specified interval, useful for moving averages and trend analysis.
public final class Slide {
/**
* Creates a sliding window with the specified size
* @param size Window size expression (duration)
* @return SlideWithSize for slide interval configuration
*/
public static SlideWithSize over(Expression size);
}
public final class SlideWithSize {
/**
* Specifies the slide interval for the window
* @param slide Slide interval expression (duration)
* @return SlideWithSizeAndSlide for time field configuration
*/
public SlideWithSizeAndSlide every(Expression slide);
}
public final class SlideWithSizeAndSlide {
/**
* Specifies the time attribute for the sliding window
* @param timeField Time attribute column
* @return SlideWithSizeAndSlideOnTime for alias configuration
*/
public SlideWithSizeAndSlideOnTime on(Expression timeField);
}
public final class SlideWithSizeAndSlideOnTime {
/**
* Assigns an alias to the sliding window
* @param alias Window alias name
* @return GroupWindow for use with groupBy
*/
public GroupWindow as(String alias);
}Usage Examples:
// 1-hour window sliding every 15 minutes
GroupWindow slidingWindow = Slide
.over(lit(1).hour())
.every(lit(15).minute())
.on($("event_time"))
.as("sliding_window");
Table movingAverages = sourceTable
.window(slidingWindow)
.groupBy($("sliding_window"), $("sensor_id"))
.select(
$("sensor_id"),
$("sliding_window").start().as("window_start"),
$("sliding_window").end().as("window_end"),
avg($("temperature")).as("avg_temperature"),
count($("*")).as("reading_count")
);
// 30-minute window sliding every 5 minutes for real-time monitoring
GroupWindow realtimeWindow = Slide
.over(lit(30).minute())
.every(lit(5).minute())
.on($("processing_time"))
.as("realtime_window");
Table realtimeMetrics = sourceTable
.window(realtimeWindow)
.groupBy($("realtime_window"))
.select(
$("realtime_window").start().as("period_start"),
count($("*")).as("event_rate"),
sum($("bytes")).as("total_bytes"),
max($("latency")).as("max_latency")
);Dynamic windows that group events based on activity sessions with configurable gap timeouts.
public final class Session {
/**
* Creates a session window with the specified gap
* @param gap Session gap expression (duration of inactivity)
* @return SessionWithGap for time field configuration
*/
public static SessionWithGap withGap(Expression gap);
}
public final class SessionWithGap {
/**
* Specifies the time attribute for the session window
* @param timeField Time attribute column
* @return SessionWithGapOnTime for alias configuration
*/
public SessionWithGapOnTime on(Expression timeField);
}
public final class SessionWithGapOnTime {
/**
* Assigns an alias to the session window
* @param alias Window alias name
* @return GroupWindow for use with groupBy
*/
public GroupWindow as(String alias);
}Usage Examples:
// Session window with 30-minute inactivity gap
GroupWindow userSession = Session
.withGap(lit(30).minute())
.on($("event_time"))
.as("user_session");
Table sessionAnalysis = sourceTable
.window(userSession)
.groupBy($("user_session"), $("user_id"))
.select(
$("user_id"),
$("user_session").start().as("session_start"),
$("user_session").end().as("session_end"),
count($("*")).as("actions_in_session"),
sum($("duration")).as("total_session_time"),
max($("page_views")).as("max_page_views")
);
// Short session window for detecting bursts of activity
GroupWindow activityBurst = Session
.withGap(lit(2).minute())
.on($("event_time"))
.as("activity_burst");
Table burstDetection = sourceTable
.window(activityBurst)
.groupBy($("activity_burst"), $("device_id"))
.select(
$("device_id"),
$("activity_burst").start().as("burst_start"),
$("activity_burst").end().as("burst_end"),
count($("*")).as("burst_event_count")
)
.filter($("burst_event_count").isGreater(10)); // Only high-activity burstsUnbounded or bounded windows for analytical functions like ranking, cumulative sums, and moving averages without explicit grouping.
public final class Over {
/**
* Creates an Over window partitioned by specified fields
* @param partitionBy Fields to partition the window by
* @return OverWindowPartitioned for ordering configuration
*/
public static OverWindowPartitioned partitionBy(Expression... partitionBy);
/**
* Creates an Over window with ordering but no partitioning
* @param orderBy Fields to order the window by
* @return OverWindowPartitionedOrdered for range/rows configuration
*/
public static OverWindowPartitionedOrdered orderBy(Expression... orderBy);
}
public interface OverWindowPartitioned {
/**
* Specifies ordering for the partitioned over window
* @param orderBy Ordering expressions
* @return OverWindowPartitionedOrdered for range/rows configuration
*/
OverWindowPartitionedOrdered orderBy(Expression... orderBy);
}
public interface OverWindowPartitionedOrdered {
/**
* Creates a preceding rows window
* @param preceding Number of preceding rows
* @return OverWindowPartitionedOrderedPreceding for alias configuration
*/
OverWindowPartitionedOrderedPreceding preceding(Expression preceding);
/**
* Creates an unbounded preceding window
* @return OverWindow for alias configuration
*/
OverWindow unboundedPreceding();
/**
* Creates a current row window
* @return OverWindow for alias configuration
*/
OverWindow currentRow();
}
public interface OverWindow {
/**
* Assigns an alias to the over window
* @param alias Window alias name
* @return OverWindow with alias
*/
OverWindow as(String alias);
}Usage Examples:
// Running totals and cumulative calculations
OverWindow cumulativeWindow = Over
.partitionBy($("customer_id"))
.orderBy($("order_date"))
.unboundedPreceding()
.as("cumulative");
Table runningTotals = sourceTable
.window(cumulativeWindow)
.select(
$("customer_id"),
$("order_date"),
$("amount"),
sum($("amount")).over($("cumulative")).as("running_total"),
count($("*")).over($("cumulative")).as("order_sequence"),
row_number().over($("cumulative")).as("order_rank")
);
// Moving averages with bounded windows
OverWindow movingAvgWindow = Over
.partitionBy($("product_id"))
.orderBy($("sale_date"))
.preceding(lit(6)) // 7-day moving window (6 preceding + current)
.as("weekly_window");
Table movingAverages = sourceTable
.window(movingAvgWindow)
.select(
$("product_id"),
$("sale_date"),
$("daily_sales"),
avg($("daily_sales")).over($("weekly_window")).as("weekly_avg_sales"),
sum($("daily_sales")).over($("weekly_window")).as("weekly_total_sales")
);
// Ranking and analytical functions
OverWindow rankingWindow = Over
.partitionBy($("department"))
.orderBy($("salary").desc())
.currentRow()
.as("dept_ranking");
Table employeeRanking = sourceTable
.window(rankingWindow)
.select(
$("employee_id"),
$("name"),
$("department"),
$("salary"),
row_number().over($("dept_ranking")).as("salary_rank"),
rank().over($("dept_ranking")).as("salary_dense_rank"),
lag($("salary"), 1).over($("dept_ranking")).as("next_highest_salary")
);Special functions available for window operations and time manipulation.
// Time interval expressions
public static Expression lit(long value).year();
public static Expression lit(long value).month();
public static Expression lit(long value).day();
public static Expression lit(long value).hour();
public static Expression lit(long value).minute();
public static Expression lit(long value).second();
public static Expression lit(long value).milli();
// Window functions
public static Expression rowNumber();
public static Expression rank();
public static Expression denseRank();
public static Expression lag(Expression field, int offset);
public static Expression lead(Expression field, int offset);
public static Expression firstValue(Expression field);
public static Expression lastValue(Expression field);
// Window start/end functions (for group windows)
// Available as methods on window alias expressions
// $("window_alias").start()
// $("window_alias").end()Usage Examples:
// Time interval examples
GroupWindow dailyWindow = Tumble
.over(lit(1).day())
.on($("event_time"))
.as("daily");
GroupWindow weeklySliding = Slide
.over(lit(7).day())
.every(lit(1).day())
.on($("event_time"))
.as("weekly_sliding");
// Analytical window functions
OverWindow analyticalWindow = Over
.partitionBy($("category"))
.orderBy($("created_date"))
.unboundedPreceding()
.as("analytical");
Table analyticalResults = sourceTable
.window(analyticalWindow)
.select(
$("id"),
$("category"),
$("value"),
$("created_date"),
rowNumber().over($("analytical")).as("row_num"),
rank().over($("analytical")).as("rank"),
lag($("value"), 1).over($("analytical")).as("prev_value"),
lead($("value"), 1).over($("analytical")).as("next_value"),
firstValue($("value")).over($("analytical")).as("first_in_category"),
lastValue($("value")).over($("analytical")).as("current_last")
);Tables with window grouping applied support aggregation operations.
public interface WindowGroupedTable {
/**
* Performs selection with aggregation on windowed data
* @param fields Selection expressions including aggregates and window functions
* @return Table with windowed aggregation results
*/
Table select(Expression... fields);
}Advanced windowing patterns for complex analytical use cases.
Usage Examples:
// Multi-level windowing - hourly stats with daily rollups
Table hourlyStats = sourceTable
.window(Tumble.over(lit(1).hour()).on($("event_time")).as("hourly"))
.groupBy($("hourly"), $("region"))
.select(
$("region"),
$("hourly").start().as("hour_start"),
count($("*")).as("hourly_count"),
sum($("revenue")).as("hourly_revenue")
);
Table dailyRollup = hourlyStats
.window(Tumble.over(lit(1).day()).on($("hour_start")).as("daily"))
.groupBy($("daily"), $("region"))
.select(
$("region"),
$("daily").start().as("day_start"),
sum($("hourly_count")).as("daily_count"),
sum($("hourly_revenue")).as("daily_revenue"),
avg($("hourly_revenue")).as("avg_hourly_revenue")
);
// Session analysis with user behavior patterns
GroupWindow userActivitySession = Session
.withGap(lit(20).minute())
.on($("event_time"))
.as("session");
Table sessionBehavior = sourceTable
.window(userActivitySession)
.groupBy($("session"), $("user_id"))
.select(
$("user_id"),
$("session").start().as("session_start"),
$("session").end().as("session_end"),
count($("*")).as("total_actions"),
countDistinct($("page_id")).as("unique_pages"),
sum($("time_spent")).as("total_time"),
first($("entry_page")).as("landing_page"),
last($("page_id")).as("exit_page"),
// Calculate session duration manually
$("session").end().minus($("session").start()).as("session_duration")
);// Processing time attribute (system time when record is processed)
// Usually defined in DDL or table descriptor
// PROCTIME() AS processing_time
// Event time attribute (timestamp from the data)
// Defined with watermark strategy in DDL
// event_time TIMESTAMP(3),
// WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
// Window functions for accessing time attributes
Expression start(); // Window start time
Expression end(); // Window end time
Expression proctime(); // Processing time
Expression rowtime(); // Event time (watermark)Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-api-java