CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-parent

Apache Flink is an open source stream processing framework with powerful stream- and batch-processing capabilities

Pending
Overview
Eval results
Files

windowing.mddocs/

Windowing System

Complete windowing system for time-based and count-based data aggregation, supporting event time and processing time semantics with customizable triggers and evictors.

Capabilities

Window Assigners

Window assigners determine which windows elements belong to.

/**
 * Base window assigner interface
 * @param <T> Element type
 * @param <W> Window type
 */
abstract class WindowAssigner<T, W extends Window> {
    /**
     * Assign windows to element
     * @param element Element
     * @param timestamp Element timestamp
     * @param context Window assigner context
     * @return Collection of windows
     */
    public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
    
    /**
     * Get default trigger
     * @param env Stream execution environment
     * @return Default trigger
     */
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
    
    /**
     * Get window serializer
     * @param executionConfig Execution configuration
     * @return Window serializer
     */
    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
}

/**
 * Tumbling event time windows
 */
class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    /**
     * Create tumbling windows with size
     * @param size Window size
     * @return Window assigner
     */
    public static TumblingEventTimeWindows of(Time size);
    
    /**
     * Create tumbling windows with size and offset
     * @param size Window size
     * @param offset Window offset
     * @return Window assigner
     */
    public static TumblingEventTimeWindows of(Time size, Time offset);
}

/**
 * Sliding event time windows
 */
class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    /**
     * Create sliding windows
     * @param size Window size
     * @param slide Slide interval
     * @return Window assigner
     */
    public static SlidingEventTimeWindows of(Time size, Time slide);
    
    /**
     * Create sliding windows with offset
     * @param size Window size
     * @param slide Slide interval
     * @param offset Window offset
     * @return Window assigner
     */
    public static SlidingEventTimeWindows of(Time size, Time slide, Time offset);
}

/**
 * Event time session windows
 */
class EventTimeSessionWindows extends WindowAssigner<Object, TimeWindow> {
    /**
     * Create session windows with gap
     * @param sessionTimeout Session timeout
     * @return Window assigner
     */
    public static EventTimeSessionWindows withGap(Time sessionTimeout);
    
    /**
     * Create dynamic session windows
     * @param sessionWindowTimeGapExtractor Gap extractor function
     * @param <T> Element type
     * @return Window assigner
     */
    public static <T> EventTimeSessionWindows withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor);
}

Window Triggers

Triggers determine when window computation should be performed.

/**
 * Base trigger class
 * @param <T> Element type
 * @param <W> Window type
 */
abstract class Trigger<T, W extends Window> {
    /**
     * Called when element is added to window
     * @param element Element
     * @param timestamp Element timestamp
     * @param window Window
     * @param ctx Trigger context
     * @return Trigger result
     * @throws Exception
     */
    public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
    
    /**
     * Called when processing time timer fires
     * @param time Timer timestamp
     * @param window Window
     * @param ctx Trigger context
     * @return Trigger result
     * @throws Exception
     */
    public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
    
    /**
     * Called when event time timer fires
     * @param time Timer timestamp
     * @param window Window
     * @param ctx Trigger context
     * @return Trigger result
     * @throws Exception
     */
    public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
    
    /**
     * Clear trigger state
     * @param window Window
     * @param ctx Trigger context
     * @throws Exception
     */
    public abstract void clear(W window, TriggerContext ctx) throws Exception;
}

/**
 * Trigger result enumeration
 */
enum TriggerResult {
    /** Continue without action */
    CONTINUE,
    /** Fire window computation */
    FIRE,
    /** Purge window contents */
    PURGE,
    /** Fire and purge */
    FIRE_AND_PURGE
}

/**
 * Continuous processing time trigger
 */
class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<Object, W> {
    /**
     * Create trigger with interval
     * @param interval Trigger interval
     * @param <W> Window type
     * @return Trigger instance
     */
    public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Time interval);
}

/**
 * Delta trigger fires when element differs from last by threshold
 * @param <T> Element type
 * @param <W> Window type
 */
class DeltaTrigger<T, W extends Window> extends Trigger<T, W> {
    /**
     * Create delta trigger
     * @param threshold Delta threshold
     * @param deltaFunction Delta calculation function
     * @param typeInfo Type information
     * @param <T> Element type
     * @param <W> Window type
     * @return Trigger instance
     */
    public static <T, W extends Window> DeltaTrigger<T, W> of(double threshold, DeltaFunction<T> deltaFunction, TypeInformation<T> typeInfo);
}

Window Evictors

Evictors remove elements from windows before or after window computation.

/**
 * Base evictor interface
 * @param <T> Element type
 * @param <W> Window type
 */
interface Evictor<T, W extends Window> {
    /**
     * Evict elements before window function
     * @param elements Window elements
     * @param size Number of elements
     * @param window Window
     * @param evictorContext Evictor context
     */
    void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
    
    /**
     * Evict elements after window function
     * @param elements Window elements
     * @param size Number of elements
     * @param window Window
     * @param evictorContext Evictor context
     */
    void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
}

/**
 * Time-based evictor
 * @param <W> Window type
 */
class TimeEvictor<W extends Window> implements Evictor<Object, W> {
    /**
     * Create time evictor
     * @param windowSize Window size to keep
     * @return Time evictor
     */
    public static <W extends Window> TimeEvictor<W> of(Time windowSize);
    
    /**
     * Create time evictor with processing time
     * @param windowSize Window size
     * @param doEvictAfter Whether to evict after window function
     * @return Time evictor
     */
    public static <W extends Window> TimeEvictor<W> of(Time windowSize, boolean doEvictAfter);
}

/**
 * Count-based evictor
 * @param <W> Window type
 */
class CountEvictor<W extends Window> implements Evictor<Object, W> {
    /**
     * Create count evictor
     * @param maxCount Maximum elements to keep
     * @return Count evictor
     */
    public static <W extends Window> CountEvictor<W> of(long maxCount);
    
    /**
     * Create count evictor
     * @param maxCount Maximum elements
     * @param doEvictAfter Whether to evict after window function
     * @return Count evictor
     */
    public static <W extends Window> CountEvictor<W> of(long maxCount, boolean doEvictAfter);
}

/**
 * Delta-based evictor
 * @param <T> Element type
 * @param <W> Window type
 */
class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {
    /**
     * Create delta evictor
     * @param threshold Delta threshold
     * @param deltaFunction Delta function
     * @return Delta evictor
     */
    public static <T, W extends Window> DeltaEvictor<T, W> of(double threshold, DeltaFunction<T> deltaFunction);
}

Window Functions

Functions for processing windowed data.

/**
 * Window function interface
 * @param <IN> Input type
 * @param <OUT> Output type
 * @param <KEY> Key type
 * @param <W> Window type
 */
interface WindowFunction<IN, OUT, KEY, W extends Window> {
    /**
     * Process window contents
     * @param key Window key
     * @param window Window
     * @param input Window elements
     * @param out Result collector
     * @throws Exception
     */
    void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}

/**
 * Process window function with context
 * @param <IN> Input type
 * @param <OUT> Output type
 * @param <KEY> Key type
 * @param <W> Window type
 */
abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> {
    /**
     * Process window with context
     * @param key Window key
     * @param context Process context
     * @param elements Window elements
     * @param out Result collector
     * @throws Exception
     */
    public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
    
    /**
     * Process window context
     */
    public abstract class Context {
        /**
         * Get current window
         * @return Current window
         */
        public abstract W window();
        
        /**
         * Get current processing time
         * @return Processing time
         */
        public abstract long currentProcessingTime();
        
        /**
         * Get current event time
         * @return Event time
         */
        public abstract long currentWatermark();
        
        /**
         * Get global state
         * @param stateDescriptor State descriptor
         * @param <S> State type
         * @return Global state
         */
        public abstract <S extends State> S globalState(StateDescriptor<S, ?> stateDescriptor);
        
        /**
         * Get window state
         * @param stateDescriptor State descriptor
         * @param <S> State type
         * @return Window state
         */
        public abstract <S extends State> S windowState(StateDescriptor<S, ?> stateDescriptor);
    }
}

Time Window

Time-based window implementation.

/**
 * Time window implementation
 */
class TimeWindow extends Window {
    /**
     * Create time window
     * @param start Start timestamp
     * @param end End timestamp
     */
    public TimeWindow(long start, long end);
    
    /**
     * Get window start time
     * @return Start timestamp
     */
    public long getStart();
    
    /**
     * Get window end time
     * @return End timestamp
     */
    public long getEnd();
    
    /**
     * Get maximum timestamp in window
     * @return Maximum timestamp
     */
    public long maxTimestamp();
    
    /**
     * Check if timestamp intersects window
     * @param timestamp Timestamp to check
     * @return true if intersects
     */
    public boolean intersects(TimeWindow other);
    
    /**
     * Get window center timestamp
     * @return Center timestamp
     */
    public long getCenter();
}

Context Types

Context and utility types used by the windowing system.

/**
 * Window assigner context
 */
interface WindowAssignerContext {
    /**
     * Get current processing time
     * @return Current processing time
     */
    long getCurrentProcessingTime();
}

/**
 * Trigger context interface
 */
interface TriggerContext {
    /**
     * Get current processing time
     * @return Current processing time
     */
    long getCurrentProcessingTime();
    
    /**
     * Get current watermark
     * @return Current watermark
     */
    long getCurrentWatermark();
    
    /**
     * Register processing time timer
     * @param time Timer timestamp
     */
    void registerProcessingTimeTimer(long time);
    
    /**
     * Register event time timer
     * @param time Timer timestamp
     */
    void registerEventTimeTimer(long time);
    
    /**
     * Delete processing time timer
     * @param time Timer timestamp
     */
    void deleteProcessingTimeTimer(long time);
    
    /**
     * Delete event time timer
     * @param time Timer timestamp
     */
    void deleteEventTimeTimer(long time);
    
    /**
     * Get partitioned state
     * @param stateDescriptor State descriptor
     * @param <S> State type
     * @return Partitioned state
     */
    <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);
}

/**
 * Evictor context interface
 */
interface EvictorContext {
    /**
     * Get current processing time
     * @return Current processing time
     */
    long getCurrentProcessingTime();
    
    /**
     * Get current watermark
     * @return Current watermark
     */
    long getCurrentWatermark();
}

/**
 * Timestamped value wrapper
 * @param <T> Value type
 */
class TimestampedValue<T> {
    /**
     * Create timestamped value
     * @param value Value
     * @param timestamp Timestamp
     */
    public TimestampedValue(T value, long timestamp);
    
    /**
     * Get value
     * @return Value
     */
    public T getValue();
    
    /**
     * Get timestamp
     * @return Timestamp
     */
    public long getTimestamp();
    
    /**
     * Check if has timestamp
     * @return true if has timestamp
     */
    public boolean hasTimestamp();
}

/**
 * Delta function interface
 * @param <DATA> Data type
 */
interface DeltaFunction<DATA> {
    /**
     * Calculate delta between two data points
     * @param oldDataPoint Old data point
     * @param newDataPoint New data point
     * @return Delta value
     */
    double getDelta(DATA oldDataPoint, DATA newDataPoint);
}

/**
 * Session window time gap extractor
 * @param <T> Element type
 */
interface SessionWindowTimeGapExtractor<T> {
    /**
     * Extract session timeout for element
     * @param element Element
     * @return Session timeout in milliseconds
     */
    long extract(T element);
}

Base Window Class

Base window class for all window types.

/**
 * Base window class
 */
abstract class Window {
    /**
     * Get maximum timestamp that belongs to this window
     * @return Maximum timestamp
     */
    public abstract long maxTimestamp();
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-parent

docs

configuration.md

connectors.md

core-functions.md

datastream-traditional.md

datastream-v2.md

index.md

state-management.md

table-api.md

windowing.md

tile.json