CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-table-2-11

Apache Flink Table API for SQL-like operations on streaming and batch data

Pending
Overview
Eval results
Files

docs/window-operations.md

Window Operations

The Flink Table API supports windowed aggregation over both time intervals and row counts. A window groups the rows of a stream into finite buckets so that an aggregate can be computed for each bucket.

Capabilities

Time Windows

Time-based windows that group events by temporal boundaries.

/**
 * Tumbling window with fixed size and no overlap.
 * First stage of the tumbling-window builder: only the size is known.
 *
 * @param size Window size expression (time interval)
 */
case class TumbleWithSize(size: Expression) {
  /**
   * Specifies the time attribute for the window.
   *
   * @param timeField Time field expression (rowtime or proctime)
   * @return Window specification with time attribute
   */
  def on(timeField: Expression): TumbleWithSizeOnTime
}

/**
 * Tumbling window whose size and time attribute are set; only the alias is missing.
 *
 * @param size Window size expression (time interval)
 * @param timeField Time field expression (rowtime or proctime)
 */
case class TumbleWithSizeOnTime(size: Expression, timeField: Expression) {
  /**
   * Assigns an alias to the window.
   *
   * @param alias Window alias for referencing window properties
   * @return Complete tumbling window specification
   */
  def as(alias: Expression): TumbleWithSizeOnTimeWithAlias
}

/**
 * Sliding window with fixed size and slide interval.
 * First stage of the sliding-window builder: only the size is known.
 *
 * @param size Window size expression (time interval)
 */
case class SlideWithSize(size: Expression) {
  /**
   * Specifies the slide interval.
   *
   * @param slide Slide interval expression (time interval)
   * @return Window specification with slide
   */
  def every(slide: Expression): SlideWithSizeAndSlide
}

/**
 * Sliding window whose size and slide interval are set; the time attribute is still missing.
 *
 * @param size Window size expression (time interval)
 * @param slide Slide interval expression (time interval)
 */
case class SlideWithSizeAndSlide(size: Expression, slide: Expression) {
  /**
   * Specifies the time attribute for the window.
   *
   * @param timeField Time field expression (rowtime or proctime)
   * @return Window specification with time attribute
   */
  def on(timeField: Expression): SlideWithSizeAndSlideOnTime
}

/**
 * Sliding window whose size, slide interval and time attribute are set; only the alias is missing.
 *
 * @param size Window size expression (time interval)
 * @param slide Slide interval expression (time interval)
 * @param timeField Time field expression (rowtime or proctime)
 */
case class SlideWithSizeAndSlideOnTime(size: Expression, slide: Expression, timeField: Expression) {
  /**
   * Assigns an alias to the window.
   *
   * @param alias Window alias for referencing window properties
   * @return Complete sliding window specification
   */
  def as(alias: Expression): SlideWithSizeAndSlideOnTimeWithAlias  
}

/**
 * Session window with dynamic gaps based on data activity.
 * First stage of the session-window builder: only the gap is known.
 *
 * @param gap Session gap expression (time interval)
 */
case class SessionWithGap(gap: Expression) {
  /**
   * Specifies the time attribute for the window.
   *
   * @param timeField Time field expression (rowtime or proctime)
   * @return Window specification with time attribute
   */
  def on(timeField: Expression): SessionWithGapOnTime
}

/**
 * Session window whose gap and time attribute are set; only the alias is missing.
 *
 * @param gap Session gap expression (time interval)
 * @param timeField Time field expression (rowtime or proctime)
 */
case class SessionWithGapOnTime(gap: Expression, timeField: Expression) {
  /**
   * Assigns an alias to the window.
   *
   * @param alias Window alias for referencing window properties
   * @return Complete session window specification
   */
  def as(alias: Expression): SessionWithGapOnTimeWithAlias
}

Usage Examples:

import org.apache.flink.table.api.Tumble
import org.apache.flink.table.api.Slide
import org.apache.flink.table.api.Session

// Tumbling window - 10 minute non-overlapping windows.
// The alias 'w must appear in groupBy; 'w.start / 'w.end then expose the window bounds.
val tumblingResult = table
  .window(Tumble over 10.minutes on 'rowtime as 'w)
  .groupBy('w, 'userId)
  .select('userId, 'w.start, 'w.end, 'amount.sum)

// Sliding window - 10 minute windows sliding every 5 minutes.
// Size (10 min) is larger than the slide (5 min), so consecutive windows overlap.
val slidingResult = table
  .window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)
  .groupBy('w, 'userId)
  .select('userId, 'w.start, 'w.end, 'amount.avg)

// Session window - sessions with 15 minute inactivity gap
val sessionResult = table  
  .window(Session withGap 15.minutes on 'rowtime as 'w)
  .groupBy('w, 'userId)
  .select('userId, 'w.start, 'w.end, 'eventCount.count)

// Processing time windows (using proctime): same builder chain, only the
// time attribute passed to `on` differs from the event-time examples above.
val proctimeResult = table
  .window(Tumble over 1.hour on 'proctime as 'w)
  .groupBy('w, 'category)
  .select('category, 'w.start, 'amount.max)

Window Properties

Access window metadata and boundaries within windowed aggregations.

/**
 * Window properties available in windowed table operations.
 * In the usage examples they are reached through the window alias, e.g. `'w.start`.
 */
trait WindowProperty extends Expression {
  /**
   * Start timestamp of the window.
   */
  def start: Expression
  
  /**
   * End timestamp of the window.
   */
  def end: Expression
  
  /**
   * Rowtime timestamp of the window (for event time windows).
   */
  def rowtime: Expression
  
  /**
   * Processing time timestamp of the window (for processing time windows).
   */
  def proctime: Expression
}

Usage Examples:

// Access window properties in aggregations.
// Each property is renamed with `as` so the resulting table has ordinary
// columns (windowStart, windowEnd, windowTime) that later operations can use.
val windowedStats = table
  .window(Tumble over 1.hour on 'rowtime as 'w)
  .groupBy('w, 'department)
  .select(
    'department,
    'w.start as 'windowStart,
    'w.end as 'windowEnd, 
    'w.rowtime as 'windowTime,
    'salary.avg as 'avgSalary,
    'employee.count as 'employeeCount
  )

// Use window properties in filtering: keep only windows whose end lies
// within the two hours before the current timestamp.
val recentWindows = windowedStats
  .filter('windowEnd > (currentTimestamp() - 2.hours))

Over Windows

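Over windows compute an aggregate for every input row across a frame of neighboring rows. The frame is bounded either by a row count or by a range on the ordering attribute, which makes over windows suitable for analytical functions and running or moving calculations.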

/**
 * Over window specification for analytical functions.
 *
 * @param partitionBy Partitioning expressions
 * @param orderBy Ordering expression
 * @param preceding Frame start (rows or range before current)
 * @param following Frame end (rows or range after current)
 */
case class OverWindow(
  partitionBy: Seq[Expression],
  orderBy: Expression, 
  preceding: Expression,
  following: Expression
)

/**
 * Over window builder starting with OVER keyword.
 * Offers two entry points: start with a partitioning, or go straight to the ordering.
 */
object Over {
  /**
   * Partitions the over window by specified fields.
   *
   * @param fields Partitioning field expressions
   * @return Partial over window specification
   */
  def partitionBy(fields: Expression*): OverWindowWithPartitioning
  
  /**
   * Orders the over window by specified field (no partitioning).
   *
   * @param field Ordering field expression
   * @return Partial over window specification
   */
  def orderBy(field: Expression): OverWindowWithOrdering
}

/**
 * Over window builder stage with partitioning set; the ordering is still missing.
 *
 * @param partitionBy Partitioning expressions
 */
case class OverWindowWithPartitioning(partitionBy: Seq[Expression]) {
  /**
   * Orders the partitioned over window.
   *
   * @param field Ordering field expression
   * @return Over window with partitioning and ordering
   */
  def orderBy(field: Expression): OverWindowWithPartitioningAndOrdering
}

/**
 * Over window builder stage with ordering set and no partitioning.
 *
 * @param orderBy Ordering expression
 */
case class OverWindowWithOrdering(orderBy: Expression) {
  /**
   * Specifies the preceding frame boundary.
   *
   * @param preceding Frame start boundary
   * @return Over window with ordering and preceding
   */
  def preceding(preceding: Expression): OverWindowWithPreceding
}

/**
 * Over window builder stage with both partitioning and ordering set.
 *
 * @param partitionBy Partitioning expressions
 * @param orderBy Ordering expression
 */
case class OverWindowWithPartitioningAndOrdering(partitionBy: Seq[Expression], orderBy: Expression) {
  /**
   * Specifies the preceding frame boundary.
   *
   * @param preceding Frame start boundary
   * @return Over window with partitioning, ordering, and preceding
   */
  def preceding(preceding: Expression): OverWindowWithPreceding
}

/**
 * Over window builder stage with the preceding frame boundary set.
 * The constructor fields are elided in this listing.
 *
 * NOTE(review): usage of the form `preceding X following Y as 'w` needs the
 * result of `following` to expose `as`, but `OverWindow` as declared in this
 * document has no such method. Confirm the return type of `following` against
 * the Flink API before relying on this signature.
 */
case class OverWindowWithPreceding(/* fields */) {
  /**
   * Specifies the following frame boundary.
   *
   * @param following Frame end boundary
   * @return Complete over window specification
   */
  def following(following: Expression): OverWindow
  
  /**
   * Assigns an alias to the over window.
   *
   * @param alias Window alias
   * @return Complete over window specification with alias
   */
  def as(alias: Expression): OverWindow
}

// Frame boundary constants.
// The `???` right-hand sides are placeholders: this listing documents names and
// types only, not implementations.
object FrameBoundary {
  val UNBOUNDED_PRECEDING: Expression = ???
  val UNBOUNDED_FOLLOWING: Expression = ???  
  // Frame ends at the current row (row-count based frames).
  val CURRENT_ROW: Expression = ???
  // Frame ends at the current row's range on the ordering attribute.
  val CURRENT_RANGE: Expression = ???
  // Unbounded frame start for row-count based frames; counterpart of CURRENT_ROW.
  val UNBOUNDED_ROW: Expression = ???
  // Unbounded frame start for range based frames; counterpart of CURRENT_RANGE.
  // The over-window examples (`preceding UNBOUNDED_RANGE following CURRENT_RANGE`)
  // rely on this name being importable via `FrameBoundary._`.
  val UNBOUNDED_RANGE: Expression = ???
}

Usage Examples:

import org.apache.flink.table.api.Over
import org.apache.flink.table.api.FrameBoundary._

// Running sum over all previous rows in partition.
// Frame: from the unbounded start of the partition up to the current range.
// NOTE(review): these examples order over windows by non-time attributes
// ('salary.desc, 'date.asc, 'hireDate.asc) - confirm the targeted Flink
// version accepts anything other than an ascending time attribute here.
val runningSum = table
  .window(Over partitionBy 'department orderBy 'salary.desc preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w)
  .select('employee, 'department, 'salary, 'salary.sum over 'w as 'runningSalary)

// Moving average over last 3 rows: the 2 preceding rows plus the current row.
val movingAvg = table
  .window(Over partitionBy 'department orderBy 'date.asc preceding 2.rows following CURRENT_ROW as 'w)
  .select('employee, 'date, 'sales, 'sales.avg over 'w as 'movingAvgSales)

// Ranking within partition
val ranking = table
  .window(Over partitionBy 'department orderBy 'salary.desc preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w)
  .select('employee, 'department, 'salary, row_number() over 'w as 'rank)

// Multiple over windows: each window gets its own alias, and every
// aggregate in select names the alias it is evaluated over.
val analytics = table
  .window(
    Over partitionBy 'department orderBy 'salary.desc preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'salaryWindow,
    Over partitionBy 'department orderBy 'hireDate.asc preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'timeWindow
  )
  .select(
    'employee,
    'salary,
    'salary.sum over 'salaryWindow as 'totalDeptSalary,
    row_number() over 'salaryWindow as 'salaryRank,
    row_number() over 'timeWindow as 'seniorityRank
  )

Windowed Table Operations

Operations available on windowed tables for grouping and aggregation.

/**
 * Table with applied window specification.
 * The only operation declared on it here is `groupBy`.
 */
class WindowedTable {
  /**
   * Groups the windowed table by specified fields.
   *
   * @param fields Grouping field expressions (usually includes window alias)
   * @return Grouped windowed table for aggregation
   */
  def groupBy(fields: Expression*): WindowGroupedTable
}

/**
 * Windowed table after grouping, ready for aggregation.
 */
class WindowGroupedTable {
  /**
   * Selects aggregated results from windowed groups.
   *
   * @param fields Field expressions including aggregations and window properties
   * @return Aggregated table
   */
  def select(fields: Expression*): Table
}

/**
 * Table with applied over windows.
 * Unlike [[WindowedTable]], no grouping step is declared: `select` is called directly.
 */
class OverWindowedTable {
  /**
   * Selects fields with over window functions applied.
   *
   * @param fields Field expressions including over window functions
   * @return Table with over window calculations
   */
  def select(fields: Expression*): Table
}

Usage Examples:

// Complex windowed aggregation: one 15-minute tumbling window, grouped by
// window, user and category, with several aggregates computed per group.
val complexWindowed = table
  .window(Tumble over 15.minutes on 'eventTime as 'w)
  .groupBy('w, 'userId, 'category)
  .select(
    'userId,
    'category, 
    'w.start as 'windowStart,
    'w.end as 'windowEnd,
    'amount.sum as 'totalAmount,
    'amount.avg as 'avgAmount,
    'amount.min as 'minAmount,
    'amount.max as 'maxAmount,
    'transactionId.count as 'transactionCount,
    'transactionId.countDistinct as 'uniqueTransactions
  )

// Over window with multiple analytical functions, all evaluated over the
// same alias 'w.
// NOTE(review): confirm that the ranking functions used below
// (row_number, rank, dense_rank, percent_rank) are available in the Table API
// of the targeted Flink version.
val analyticalResult = table
  .window(Over partitionBy 'category orderBy 'amount.desc preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w)
  .select(
    'transactionId,
    'category,
    'amount,
    'amount.sum over 'w as 'categoryTotal,
    'amount.avg over 'w as 'categoryAvg,
    row_number() over 'w as 'amountRank,
    rank() over 'w as 'amountRankWithTies,
    dense_rank() over 'w as 'amountDenseRank,
    percent_rank() over 'w as 'amountPercentRank
  )

Time Attributes

Special timestamp fields for defining event time and processing time.

/**
 * Methods for defining time attributes in table schemas.
 */
object TimeAttributes {
  /**
   * Defines a rowtime attribute for event time processing.
   *
   * @param field Timestamp field expression
   * @return Rowtime attribute expression
   */
  def rowtime(field: Expression): Expression
  
  /**
   * Defines a processing time attribute.
   *
   * @return Processing time attribute expression
   */
  def proctime(): Expression
}

Usage Examples:

// Define table with time attributes.
// NOTE(review): this declares the attributes inside `select` using a postfix
// `.rowtime` and a bare `proctime()`, whereas TimeAttributes above declares
// `rowtime(field)` / `proctime()` as object methods - confirm which form the
// targeted Flink version actually supports.
val tableWithTime = table
  .select('userId, 'amount, 'eventTimestamp.rowtime as 'rowtime, proctime() as 'proctime)

// Register table source with time attributes.
// The schema pairs four column names with four types positionally:
// userId -> LONG, amount -> DOUBLE, eventTime and proctime -> SQL_TIMESTAMP.
val sourceWithTime = new StreamTableSource[Row] {
  override def getTableSchema: TableSchema = {
    new TableSchema(
      Array("userId", "amount", "eventTime", "proctime"),
      Array(Types.LONG, Types.DOUBLE, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP)
    )
  }
  
  // Source and watermark strategy are left as placeholders in this example.
  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    execEnv.addSource(/* source */)
      .assignTimestampsAndWatermarks(/* watermark strategy */)
  }
}

// Use time attributes in window operations: event-time windows on 'rowtime
val eventTimeWindows = tableWithTime
  .window(Tumble over 1.hour on 'rowtime as 'w)
  .groupBy('w, 'userId)
  .select('userId, 'w.start, 'amount.sum)

// ... and processing-time windows on 'proctime
val procTimeWindows = tableWithTime
  .window(Tumble over 1.hour on 'proctime as 'w)  
  .groupBy('w, 'userId)
  .select('userId, 'w.start, 'amount.count)

Window SQL Support

SQL syntax for window operations and time functions.

// Group window in SQL: TUMBLE(...) in GROUP BY defines a one-hour tumbling
// window on rowtime; TUMBLE_START / TUMBLE_END are called with the same
// arguments to select the bounds of that window.
val sqlWindowed = tEnv.sqlQuery("""
  SELECT 
    userId,
    TUMBLE_START(rowtime, INTERVAL '1' HOUR) as window_start,
    TUMBLE_END(rowtime, INTERVAL '1' HOUR) as window_end,
    SUM(amount) as total_amount,
    COUNT(*) as transaction_count
  FROM Transactions
  GROUP BY 
    userId,
    TUMBLE(rowtime, INTERVAL '1' HOUR)
""")

// Over windows in SQL: a per-user running total with an explicit ROWS frame,
// and a per-user ranking by descending amount.
val sqlOverWindow = tEnv.sqlQuery("""
  SELECT 
    userId,
    amount,
    SUM(amount) OVER (
      PARTITION BY userId 
      ORDER BY eventTime 
      ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) as running_total,
    ROW_NUMBER() OVER (
      PARTITION BY userId 
      ORDER BY amount DESC
    ) as amount_rank
  FROM Transactions
""")

Watermarks and Late Data Handling

Configuration for handling out-of-order events and late arrivals.

/**
 * Watermark strategies for event time processing.
 */
trait WatermarkStrategy {
  // Presumably returns the event timestamp to use for `element`, given the
  // timestamp already attached to the record - confirm against the Flink API.
  def extractTimestamp(element: Row, recordTimestamp: Long): Long
  // Presumably invoked periodically so the strategy can emit a watermark
  // through `watermarkOutput` - confirm against the Flink API.
  def onPeriodicEmit(watermarkOutput: WatermarkOutput): Unit
}

/**
 * Configuration for late data handling.
 *
 * NOTE(review): the name reads like a misspelling of "LatenessConfig"; it is
 * left unchanged here because renaming would alter the public name.
 *
 * @param allowedLateness Lateness that is tolerated (see the type `Time`)
 * @param sideOutputTag Output tag; presumably identifies the side output that
 *                      receives late rows - TODO confirm
 */
case class LatentConfig(
  allowedLateness: Time,
  sideOutputTag: OutputTag[Row]
)

Usage Examples:

// Configure watermarks and late data handling.
// NOTE(review): the select keeps only 'userId, 'amount and 'rowtime, yet the
// following `where` refers to 'eventTime - confirm whether the filter should
// use 'rowtime or be applied before the projection.
val tableWithWatermarks = table
  .select('userId, 'amount, 'eventTime.rowtime as 'rowtime)
  .where('eventTime > (currentTimestamp() - 1.day)) // Filter very old events

// Window with allowed lateness.
// NOTE(review): `allowedLateness` is not among the methods declared on
// WindowedTable in this document (only `groupBy` is) - confirm that the
// Table API of the targeted Flink version provides it.
val lateDataHandling = tableWithWatermarks
  .window(Tumble over 1.hour on 'rowtime as 'w)
  .allowedLateness(5.minutes) // Allow 5 minutes of lateness
  .groupBy('w, 'userId)
  .select('userId, 'w.start, 'amount.sum)

Types

// Common parent of all group-window builder stages listed below.
sealed trait Window
// Tumbling window stages: size -> size + time attribute -> + alias
case class TumbleWithSize(size: Expression) extends Window
case class TumbleWithSizeOnTime(size: Expression, timeField: Expression) extends Window
case class TumbleWithSizeOnTimeWithAlias(size: Expression, timeField: Expression, alias: Expression) extends Window

// Sliding window stages: size -> + slide -> + time attribute -> + alias
case class SlideWithSize(size: Expression) extends Window
case class SlideWithSizeAndSlide(size: Expression, slide: Expression) extends Window
case class SlideWithSizeAndSlideOnTime(size: Expression, slide: Expression, timeField: Expression) extends Window
case class SlideWithSizeAndSlideOnTimeWithAlias(size: Expression, slide: Expression, timeField: Expression, alias: Expression) extends Window

// Session window stages: gap -> + time attribute -> + alias
case class SessionWithGap(gap: Expression) extends Window
case class SessionWithGapOnTime(gap: Expression, timeField: Expression) extends Window
case class SessionWithGapOnTimeWithAlias(gap: Expression, timeField: Expression, alias: Expression) extends Window

// Over windows are not part of the Window hierarchy above.
case class OverWindow(partitionBy: Seq[Expression], orderBy: Expression, preceding: Expression, following: Expression)
case class OverWindowWithAlias(window: OverWindow, alias: Expression)

// Table types produced by applying a window specification
class WindowedTable
class WindowGroupedTable  
class OverWindowedTable

// Supporting types
trait WindowProperty extends Expression
trait WatermarkStrategy
case class LatentConfig(allowedLateness: Time, sideOutputTag: OutputTag[Row])

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-2-11

docs

index.md

sources-sinks.md

sql-integration.md

table-environment.md

table-operations.md

type-system.md

user-defined-functions.md

window-operations.md

tile.json