CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-table-api-java

Java API for Apache Flink's Table ecosystem, enabling type-safe table operations and SQL query execution.

Pending
Overview
Eval results
Files

docs/window-operations.md

Window Operations

Window operations enable time-based and count-based processing for streaming data. Flink supports multiple window types including tumbling, sliding, session windows, and over windows for different analytical scenarios.

Capabilities

Tumbling Windows

Fixed-size, non-overlapping windows that partition data into distinct time segments.

public final class Tumble {
    /**
     * Creates a tumbling window with the specified size
     * @param size Window size expression (duration)
     * @return TumbleWithSize for further configuration
     */
    public static TumbleWithSize over(Expression size);
}

public final class TumbleWithSize {
    /**
     * Specifies the time attribute for the tumbling window
     * @param timeField Time attribute column
     * @return TumbleWithSizeOnTime for alias configuration
     */
    public TumbleWithSizeOnTime on(Expression timeField);
}

public final class TumbleWithSizeOnTime {
    /**
     * Assigns an alias to the window for reference in selection
     * @param alias Window alias name
     * @return GroupWindow that can be used with groupBy
     */
    public GroupWindow as(String alias);
}

Usage Examples:

import static org.apache.flink.table.api.Expressions.*;

// 1-hour tumbling window
GroupWindow hourlyWindow = Tumble
    .over(lit(1).hour())
    .on($("event_time"))
    .as("hourly_window");

Table hourlyStats = sourceTable
    .window(hourlyWindow)
    .groupBy($("hourly_window"), $("category"))
    .select(
        $("category"),
        $("hourly_window").start().as("window_start"),
        $("hourly_window").end().as("window_end"),
        count($("*")).as("event_count"),
        sum($("amount")).as("total_amount")
    );

// 15-minute tumbling window
GroupWindow quarterHourWindow = Tumble
    .over(lit(15).minute())
    .on($("processing_time"))
    .as("quarter_hour");

Table frequentStats = sourceTable
    .window(quarterHourWindow)
    .groupBy($("quarter_hour"))
    .select(
        $("quarter_hour").start().as("period_start"),
        count($("*")).as("transaction_count"),
        avg($("value")).as("avg_transaction_value")
    );

Sliding Windows

Fixed-size, overlapping windows that slide by a specified interval, useful for moving averages and trend analysis.

public final class Slide {
    /**
     * Creates a sliding window with the specified size
     * @param size Window size expression (duration)  
     * @return SlideWithSize for slide interval configuration
     */
    public static SlideWithSize over(Expression size);
}

public final class SlideWithSize {
    /**
     * Specifies the slide interval for the window
     * @param slide Slide interval expression (duration)
     * @return SlideWithSizeAndSlide for time field configuration
     */
    public SlideWithSizeAndSlide every(Expression slide);
}

public final class SlideWithSizeAndSlide {
    /**
     * Specifies the time attribute for the sliding window
     * @param timeField Time attribute column
     * @return SlideWithSizeAndSlideOnTime for alias configuration
     */
    public SlideWithSizeAndSlideOnTime on(Expression timeField);
}

public final class SlideWithSizeAndSlideOnTime {
    /**
     * Assigns an alias to the sliding window
     * @param alias Window alias name
     * @return GroupWindow for use with groupBy
     */
    public GroupWindow as(String alias);
}

Usage Examples:

// 1-hour window sliding every 15 minutes
GroupWindow slidingWindow = Slide
    .over(lit(1).hour())
    .every(lit(15).minute())
    .on($("event_time"))
    .as("sliding_window");

Table movingAverages = sourceTable
    .window(slidingWindow)
    .groupBy($("sliding_window"), $("sensor_id"))
    .select(
        $("sensor_id"),
        $("sliding_window").start().as("window_start"),
        $("sliding_window").end().as("window_end"),
        avg($("temperature")).as("avg_temperature"),
        count($("*")).as("reading_count")
    );

// 30-minute window sliding every 5 minutes for real-time monitoring
GroupWindow realtimeWindow = Slide
    .over(lit(30).minute())
    .every(lit(5).minute())
    .on($("processing_time"))
    .as("realtime_window");

Table realtimeMetrics = sourceTable
    .window(realtimeWindow)
    .groupBy($("realtime_window"))
    .select(
        $("realtime_window").start().as("period_start"),
        count($("*")).as("event_rate"),
        sum($("bytes")).as("total_bytes"),
        max($("latency")).as("max_latency")
    );

Session Windows

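Dynamically sized windows that group events into sessions of activity. A session has no fixed length: it stays open while events keep arriving and closes once no event has arrived within the configured inactivity gap.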

public final class Session {
    /**
     * Creates a session window with the specified gap
     * @param gap Session gap expression (duration of inactivity)
     * @return SessionWithGap for time field configuration
     */
    public static SessionWithGap withGap(Expression gap);
}

public final class SessionWithGap {
    /**
     * Specifies the time attribute for the session window
     * @param timeField Time attribute column
     * @return SessionWithGapOnTime for alias configuration
     */
    public SessionWithGapOnTime on(Expression timeField);
}

public final class SessionWithGapOnTime {
    /**
     * Assigns an alias to the session window
     * @param alias Window alias name
     * @return GroupWindow for use with groupBy
     */
    public GroupWindow as(String alias);
}

Usage Examples:

// Session window with 30-minute inactivity gap
GroupWindow userSession = Session
    .withGap(lit(30).minute())
    .on($("event_time"))
    .as("user_session");

Table sessionAnalysis = sourceTable
    .window(userSession)
    .groupBy($("user_session"), $("user_id"))
    .select(
        $("user_id"),
        $("user_session").start().as("session_start"),
        $("user_session").end().as("session_end"),
        count($("*")).as("actions_in_session"),
        sum($("duration")).as("total_session_time"),
        max($("page_views")).as("max_page_views")
    );

// Short session window for detecting bursts of activity
GroupWindow activityBurst = Session
    .withGap(lit(2).minute())
    .on($("event_time"))
    .as("activity_burst");

Table burstDetection = sourceTable
    .window(activityBurst)
    .groupBy($("activity_burst"), $("device_id"))
    .select(
        $("device_id"),
        $("activity_burst").start().as("burst_start"),
        $("activity_burst").end().as("burst_end"),
        count($("*")).as("burst_event_count")
    )
    .filter($("burst_event_count").isGreater(10)); // Only high-activity bursts

Over Windows

Unbounded or bounded windows for analytical functions like ranking, cumulative sums, and moving averages without explicit grouping.

public final class Over {
    /**
     * Creates an Over window partitioned by specified fields
     * @param partitionBy Fields to partition the window by
     * @return OverWindowPartitioned for ordering configuration
     */
    public static OverWindowPartitioned partitionBy(Expression... partitionBy);
    
    /**
     * Creates an Over window with ordering but no partitioning
     * @param orderBy Fields to order the window by
     * @return OverWindowPartitionedOrdered for range/rows configuration
     */
    public static OverWindowPartitionedOrdered orderBy(Expression... orderBy);
}

public interface OverWindowPartitioned {
    /**
     * Specifies ordering for the partitioned over window
     * @param orderBy Ordering expressions
     * @return OverWindowPartitionedOrdered for range/rows configuration
     */
    OverWindowPartitionedOrdered orderBy(Expression... orderBy);
}

public interface OverWindowPartitionedOrdered {
    /**
     * Creates a preceding rows window
     * @param preceding Number of preceding rows
     * @return OverWindowPartitionedOrderedPreceding for alias configuration
     */
    OverWindowPartitionedOrderedPreceding preceding(Expression preceding);
    
    /**
     * Creates an unbounded preceding window
     * @return OverWindow for alias configuration
     */
    OverWindow unboundedPreceding();
    
    /**
     * Creates a current row window
     * @return OverWindow for alias configuration  
     */
    OverWindow currentRow();
}

public interface OverWindow {
    /**
     * Assigns an alias to the over window
     * @param alias Window alias name
     * @return OverWindow with alias
     */
    OverWindow as(String alias);
}

Usage Examples:

// Running totals and cumulative calculations
OverWindow cumulativeWindow = Over
    .partitionBy($("customer_id"))
    .orderBy($("order_date"))
    .unboundedPreceding()
    .as("cumulative");

Table runningTotals = sourceTable
    .window(cumulativeWindow)
    .select(
        $("customer_id"),
        $("order_date"),
        $("amount"),
        sum($("amount")).over($("cumulative")).as("running_total"),
        count($("*")).over($("cumulative")).as("order_sequence"),
        rowNumber().over($("cumulative")).as("order_rank")
    );

// Moving averages with bounded windows
OverWindow movingAvgWindow = Over
    .partitionBy($("product_id"))
    .orderBy($("sale_date"))
    .preceding(lit(6))  // 7-row moving window (6 preceding rows + current row); covers 7 days only if there is exactly one row per product per day
    .as("weekly_window");

Table movingAverages = sourceTable
    .window(movingAvgWindow)
    .select(
        $("product_id"),
        $("sale_date"),
        $("daily_sales"),
        avg($("daily_sales")).over($("weekly_window")).as("weekly_avg_sales"),
        sum($("daily_sales")).over($("weekly_window")).as("weekly_total_sales")
    );

// Ranking and analytical functions
OverWindow rankingWindow = Over
    .partitionBy($("department"))
    .orderBy($("salary").desc())
    .currentRow()
    .as("dept_ranking");

Table employeeRanking = sourceTable
    .window(rankingWindow)
    .select(
        $("employee_id"),
        $("name"),
        $("department"),
        $("salary"),
        rowNumber().over($("dept_ranking")).as("salary_rank"),
        denseRank().over($("dept_ranking")).as("salary_dense_rank"),
        lag($("salary"), 1).over($("dept_ranking")).as("next_highest_salary")
    );

Window Functions and Expressions

Special functions available for window operations and time manipulation.

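// Time interval expressions
// (shorthand notation: each interval method is called on the Expression returned by lit(value), e.g. lit(1).hour())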
public static Expression lit(long value).year();
public static Expression lit(long value).month(); 
public static Expression lit(long value).day();
public static Expression lit(long value).hour();
public static Expression lit(long value).minute();
public static Expression lit(long value).second();
public static Expression lit(long value).milli();

// Window functions
public static Expression rowNumber();
public static Expression rank();  
public static Expression denseRank();
public static Expression lag(Expression field, int offset);
public static Expression lead(Expression field, int offset);
public static Expression firstValue(Expression field);
public static Expression lastValue(Expression field);

// Window start/end functions (for group windows)
// Available as methods on window alias expressions
// $("window_alias").start()
// $("window_alias").end()

Usage Examples:

// Time interval examples
GroupWindow dailyWindow = Tumble
    .over(lit(1).day())
    .on($("event_time"))
    .as("daily");

GroupWindow weeklySliding = Slide
    .over(lit(7).day())
    .every(lit(1).day())
    .on($("event_time"))
    .as("weekly_sliding");

// Analytical window functions
OverWindow analyticalWindow = Over
    .partitionBy($("category"))
    .orderBy($("created_date"))
    .unboundedPreceding()
    .as("analytical");

Table analyticalResults = sourceTable
    .window(analyticalWindow)
    .select(
        $("id"),
        $("category"),
        $("value"),
        $("created_date"),
        rowNumber().over($("analytical")).as("row_num"),
        rank().over($("analytical")).as("rank"),
        lag($("value"), 1).over($("analytical")).as("prev_value"),
        lead($("value"), 1).over($("analytical")).as("next_value"),
        firstValue($("value")).over($("analytical")).as("first_in_category"),
        lastValue($("value")).over($("analytical")).as("current_last")
    );

WindowGroupedTable Operations

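A WindowGroupedTable is the result of applying a group window and then grouping by the window alias (optionally with further keys), i.e. table.window(...).groupBy(...). It supports select(...) to compute aggregates per window and grouping key.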

public interface WindowGroupedTable {
    /**
     * Performs selection with aggregation on windowed data
     * @param fields Selection expressions including aggregates and window functions
     * @return Table with windowed aggregation results
     */
    Table select(Expression... fields);
}

Complex Window Scenarios

Advanced windowing patterns for complex analytical use cases.

Usage Examples:

// Multi-level windowing - hourly stats with daily rollups
Table hourlyStats = sourceTable
    .window(Tumble.over(lit(1).hour()).on($("event_time")).as("hourly"))
    .groupBy($("hourly"), $("region"))
    .select(
        $("region"),
        $("hourly").start().as("hour_start"),
        // start()/end() yield plain timestamps; rowtime() yields a time attribute,
        // which is what the daily window below needs in order to window this result again
        $("hourly").rowtime().as("hour_rowtime"),
        count($("*")).as("hourly_count"),
        sum($("revenue")).as("hourly_revenue")
    );

Table dailyRollup = hourlyStats
    .window(Tumble.over(lit(1).day()).on($("hour_rowtime")).as("daily"))
    .groupBy($("daily"), $("region"))
    .select(
        $("region"),
        $("daily").start().as("day_start"),
        sum($("hourly_count")).as("daily_count"),
        sum($("hourly_revenue")).as("daily_revenue"),
        avg($("hourly_revenue")).as("avg_hourly_revenue")
    );

// Session analysis with user behavior patterns
GroupWindow userActivitySession = Session
    .withGap(lit(20).minute())
    .on($("event_time"))
    .as("session");

Table sessionBehavior = sourceTable
    .window(userActivitySession)
    .groupBy($("session"), $("user_id"))
    .select(
        $("user_id"),
        $("session").start().as("session_start"),
        $("session").end().as("session_end"),
        count($("*")).as("total_actions"),
        countDistinct($("page_id")).as("unique_pages"),
        sum($("time_spent")).as("total_time"),
        firstValue($("entry_page")).as("landing_page"),
        lastValue($("page_id")).as("exit_page"),
        // Calculate session duration manually
        $("session").end().minus($("session").start()).as("session_duration")
    );

Window Time Attributes

// Processing time attribute (system time when record is processed)
// Usually defined in DDL or table descriptor
// PROCTIME() AS processing_time

// Event time attribute (timestamp from the data)
// Defined with watermark strategy in DDL
// event_time TIMESTAMP(3),
// WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

// Window functions for accessing time attributes
Expression start();    // Window start time
Expression end();      // Window end time  
Expression proctime(); // Processing time
Expression rowtime();  // Window rowtime: an event-time attribute that a subsequent window or other time-based operation can use

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-api-java

docs

aggregation-grouping.md

catalog-management.md

expressions.md

index.md

sql-integration.md

table-environment.md

table-operations.md

user-defined-functions.md

window-operations.md

tile.json