Apache Flink is an open-source stream processing framework with powerful stream- and batch-processing capabilities.
—
Complete windowing system for time-based and count-based data aggregation, supporting event time and processing time semantics with customizable triggers and evictors.
Window assigners determine which window(s) each element belongs to.
/**
* Base class for window assigners, which decide the window(s) each incoming element belongs to.
* Declared as an abstract class (not an interface); concrete assigners implement all three
* methods below.
* NOTE(review): Flink's WindowAssigner also declares {@code isEventTime()}; confirm whether it
* belongs in this listing.
* @param <T> Element type
* @param <W> Window type
*/
abstract class WindowAssigner<T, W extends Window> {
/**
* Assign an element to the window(s) it belongs to.
* @param element Element to assign
* @param timestamp Element timestamp
* @param context Window assigner context; exposes the current processing time
* @return Collection of windows the element is assigned to
*/
public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
/**
* Get the default trigger for windows produced by this assigner.
* @param env Stream execution environment
* @return Default trigger
*/
public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
/**
* Get the serializer used for window objects of type {@code W}.
* @param executionConfig Execution configuration
* @return Window serializer
*/
public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
}
/**
* Event-time window assigner producing fixed-size tumbling {@link TimeWindow}s.
* Instances are obtained through the static {@code of} factories below.
*/
class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
/**
* Create a tumbling window assigner with the given window size.
* @param size Window size
* @return Window assigner
*/
public static TumblingEventTimeWindows of(Time size);
/**
* Create a tumbling window assigner with the given window size and offset.
* @param size Window size
* @param offset Window offset; presumably shifts where window boundaries are aligned
*               (e.g. to account for a time zone) — confirm against the Flink docs
* @return Window assigner
*/
public static TumblingEventTimeWindows of(Time size, Time offset);
}
/**
* Event-time window assigner producing fixed-size {@link TimeWindow}s that advance by a slide
* interval. Instances are obtained through the static {@code of} factories below.
*/
class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
/**
* Create a sliding window assigner with the given size and slide interval.
* @param size Window size
* @param slide Slide interval between the starts of consecutive windows
* @return Window assigner
*/
public static SlidingEventTimeWindows of(Time size, Time slide);
/**
* Create a sliding window assigner with the given size, slide interval and offset.
* @param size Window size
* @param slide Slide interval between the starts of consecutive windows
* @param offset Window offset; presumably shifts where window boundaries are aligned — confirm
*               against the Flink docs
* @return Window assigner
*/
public static SlidingEventTimeWindows of(Time size, Time slide, Time offset);
}
/**
* Event-time window assigner that groups elements into sessions separated by an inactivity gap.
* The gap is either fixed ({@link #withGap}) or computed per element ({@link #withDynamicGap}).
*/
class EventTimeSessionWindows extends WindowAssigner<Object, TimeWindow> {
/**
* Create session windows with a fixed gap.
* @param sessionTimeout Session timeout (the inactivity gap that closes a session)
* @return Window assigner
*/
public static EventTimeSessionWindows withGap(Time sessionTimeout);
/**
* Create session windows whose gap is extracted from each element.
* NOTE(review): in Flink this factory returns {@code DynamicEventTimeSessionWindows<T>},
* not {@code EventTimeSessionWindows}; confirm the declared return type.
* @param sessionWindowTimeGapExtractor Function returning the session gap for an element
* @param <T> Element type
* @return Window assigner
*/
public static <T> EventTimeSessionWindows withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor);
}Triggers determine when window computation should be performed.
/**
* Base class for triggers, which decide when a window is evaluated and/or purged.
* Every callback returns a {@link TriggerResult} describing the action to take.
* @param <T> Element type
* @param <W> Window type
*/
abstract class Trigger<T, W extends Window> {
/**
* Called for every element added to a window.
* @param element Element that was added
* @param timestamp Element timestamp
* @param window Window the element was added to
* @param ctx Trigger context; can register/delete timers and access partitioned state
* @return Trigger result
* @throws Exception declared so implementations may propagate checked exceptions
*/
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
/**
* Called when a processing-time timer registered via the trigger context fires.
* @param time Timer timestamp
* @param window Window the timer belongs to
* @param ctx Trigger context
* @return Trigger result
* @throws Exception declared so implementations may propagate checked exceptions
*/
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
/**
* Called when an event-time timer registered via the trigger context fires.
* @param time Timer timestamp
* @param window Window the timer belongs to
* @param ctx Trigger context
* @return Trigger result
* @throws Exception declared so implementations may propagate checked exceptions
*/
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
/**
* Clear any state (and timers) the trigger keeps for the given window.
* @param window Window whose trigger state is cleared
* @param ctx Trigger context
* @throws Exception declared so implementations may propagate checked exceptions
*/
public abstract void clear(W window, TriggerContext ctx) throws Exception;
}
/**
* Outcome of a {@link Trigger} callback: whether the window function should be evaluated
* ("fire") and/or the window's buffered contents discarded ("purge").
*
* <p>Each constant carries both flags, exposed via {@link #isFire()} and {@link #isPurge()}, so
* callers can test each action independently instead of enumerating every constant.
*/
enum TriggerResult {
    /** Continue without action: neither fire nor purge. */
    CONTINUE(false, false),
    /** Fire window computation and keep the window contents. */
    FIRE(true, false),
    /** Purge window contents without firing. */
    PURGE(false, true),
    /** Fire window computation, then purge the window contents. */
    FIRE_AND_PURGE(true, true);

    private final boolean fire;
    private final boolean purge;

    TriggerResult(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }

    /**
     * Whether this result asks for the window function to be evaluated.
     * @return true for {@link #FIRE} and {@link #FIRE_AND_PURGE}
     */
    public boolean isFire() {
        return fire;
    }

    /**
     * Whether this result asks for the window contents to be discarded.
     * @return true for {@link #PURGE} and {@link #FIRE_AND_PURGE}
     */
    public boolean isPurge() {
        return purge;
    }
}
/**
* Processing-time trigger that fires periodically at a fixed interval.
* @param <W> Window type
*/
class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<Object, W> {
/**
* Create a trigger that fires at the given processing-time interval.
* @param interval Trigger interval
* @param <W> Window type
* @return Trigger instance
*/
public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Time interval);
}
/**
* Trigger that fires when an element differs from the last one by more than a threshold,
* as measured by a {@link DeltaFunction}.
* @param <T> Element type
* @param <W> Window type
*/
class DeltaTrigger<T, W extends Window> extends Trigger<T, W> {
/**
* Create delta trigger
* NOTE(review): Flink's {@code DeltaTrigger.of} takes a {@code TypeSerializer<T> stateSerializer}
* as its third argument rather than {@code TypeInformation<T>}; confirm against the targeted
* Flink version.
* @param threshold Delta threshold
* @param deltaFunction Delta calculation function
* @param typeInfo Type information of the elements
* @param <T> Element type
* @param <W> Window type
* @return Trigger instance
*/
public static <T, W extends Window> DeltaTrigger<T, W> of(double threshold, DeltaFunction<T> deltaFunction, TypeInformation<T> typeInfo);
}Evictors remove elements from windows before or after window computation.
/**
* Evictor contract: removes elements from a window's contents before and/or after the
* window function is applied.
* @param <T> Element type
* @param <W> Window type
*/
interface Evictor<T, W extends Window> {
/**
* Evict elements before the window function is applied.
* @param elements Window elements, each paired with its timestamp
* @param size Number of elements currently in the window
* @param window Window being evaluated
* @param evictorContext Evictor context (current processing time and watermark)
*/
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
/**
* Evict elements after the window function has been applied.
* @param elements Window elements, each paired with its timestamp
* @param size Number of elements currently in the window
* @param window Window being evaluated
* @param evictorContext Evictor context (current processing time and watermark)
*/
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
}
/**
* Time-based evictor: evicts elements by timestamp relative to a configured window size.
* @param <W> Window type
*/
class TimeEvictor<W extends Window> implements Evictor<Object, W> {
/**
* Create time evictor
* @param windowSize Time span of elements to keep
* @param <W> Window type
* @return Time evictor
*/
public static <W extends Window> TimeEvictor<W> of(Time windowSize);
/**
* Create a time evictor and choose whether eviction runs after the window function
* instead of before it.
* @param windowSize Time span of elements to keep
* @param doEvictAfter Whether to evict after window function
* @param <W> Window type
* @return Time evictor
*/
public static <W extends Window> TimeEvictor<W> of(Time windowSize, boolean doEvictAfter);
}
/**
* Count-based evictor: keeps at most a fixed number of elements in a window.
* @param <W> Window type
*/
class CountEvictor<W extends Window> implements Evictor<Object, W> {
/**
* Create count evictor
* @param maxCount Maximum elements to keep
* @param <W> Window type
* @return Count evictor
*/
public static <W extends Window> CountEvictor<W> of(long maxCount);
/**
* Create a count evictor and choose whether eviction runs after the window function
* instead of before it.
* @param maxCount Maximum elements to keep
* @param doEvictAfter Whether to evict after window function
* @param <W> Window type
* @return Count evictor
*/
public static <W extends Window> CountEvictor<W> of(long maxCount, boolean doEvictAfter);
}
/**
* Delta-based evictor: evicts elements using a {@link DeltaFunction} compared against a
* threshold.
* @param <T> Element type
* @param <W> Window type
*/
class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {
/**
* Create delta evictor
* @param threshold Delta threshold
* @param deltaFunction Delta function
* @param <T> Element type
* @param <W> Window type
* @return Delta evictor
*/
public static <T, W extends Window> DeltaEvictor<T, W> of(double threshold, DeltaFunction<T> deltaFunction);
}Functions for processing windowed data.
/**
* Function applied to all elements of a window at once, emitting results through a collector.
* @param <IN> Input type
* @param <OUT> Output type
* @param <KEY> Key type
* @param <W> Window type
*/
interface WindowFunction<IN, OUT, KEY, W extends Window> {
/**
* Process window contents
* @param key Key of the window being evaluated
* @param window Window being evaluated
* @param input Window elements
* @param out Collector receiving the emitted results
* @throws Exception declared so implementations may propagate checked exceptions
*/
void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
/**
* Window function that receives a {@link Context} with access to the window, time
* information and state in addition to the window's elements.
* @param <IN> Input type
* @param <OUT> Output type
* @param <KEY> Key type
* @param <W> Window type
*/
abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> {
/**
* Process window with context
* @param key Key of the window being evaluated
* @param context Context for the current window evaluation
* @param elements Window elements
* @param out Collector receiving the emitted results
* @throws Exception declared so implementations may propagate checked exceptions
*/
public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
/**
* Context available during {@link #process}. It is a non-static inner class, so each
* instance is tied to an enclosing ProcessWindowFunction.
* NOTE(review): in Flink, {@code globalState()} and {@code windowState()} take no arguments and
* return a {@code KeyedStateStore}; confirm the descriptor-taking signatures listed here.
*/
public abstract class Context {
/**
* Get current window
* @return Window currently being evaluated
*/
public abstract W window();
/**
* Get current processing time
* @return Processing time
*/
public abstract long currentProcessingTime();
/**
* Get the current event-time watermark.
* @return Current watermark
*/
public abstract long currentWatermark();
/**
* Get global (not window-scoped) state
* @param stateDescriptor State descriptor
* @param <S> State type
* @return Global state
*/
public abstract <S extends State> S globalState(StateDescriptor<S, ?> stateDescriptor);
/**
* Get state scoped to the current window
* @param stateDescriptor State descriptor
* @param <S> State type
* @return Window state
*/
public abstract <S extends State> S windowState(StateDescriptor<S, ?> stateDescriptor);
}
}Time-based window implementation.
/**
* Window covering a time interval, defined by a start and an end timestamp.
*/
class TimeWindow extends Window {
/**
* Create time window
* @param start Start timestamp
* @param end End timestamp
*/
public TimeWindow(long start, long end);
/**
* Get window start time
* @return Start timestamp
*/
public long getStart();
/**
* Get window end time
* @return End timestamp
*/
public long getEnd();
/**
* Get maximum timestamp in window
* NOTE(review): in Flink this is {@code getEnd() - 1}, i.e. the end timestamp is exclusive;
* confirm for the targeted version.
* @return Maximum timestamp
*/
public long maxTimestamp();
/**
* Check whether this window intersects another window.
* @param other Window to compare against
* @return true if the two windows intersect
*/
public boolean intersects(TimeWindow other);
/**
* Get window center timestamp
* NOTE(review): Flink's TimeWindow does not appear to declare {@code getCenter()}; verify that
* this method exists in the targeted version before relying on it.
* @return Center timestamp
*/
public long getCenter();
}Context and utility types used by the windowing system.
/**
* Context passed to {@link WindowAssigner#assignWindows}; exposes the current processing time.
*/
interface WindowAssignerContext {
/**
* Get current processing time
* @return Current processing time
*/
long getCurrentProcessingTime();
}
/**
* Context passed to {@link Trigger} callbacks. It exposes current time information, lets the
* trigger register and delete timers, and gives access to partitioned state.
*/
interface TriggerContext {
/**
* Get current processing time
* @return Current processing time
*/
long getCurrentProcessingTime();
/**
* Get the current event-time watermark.
* @return Current watermark
*/
long getCurrentWatermark();
/**
* Register a processing-time timer; when it fires, the trigger's {@code onProcessingTime}
* callback handles it.
* @param time Timer timestamp
*/
void registerProcessingTimeTimer(long time);
/**
* Register an event-time timer; when it fires, the trigger's {@code onEventTime} callback
* handles it.
* @param time Timer timestamp
*/
void registerEventTimeTimer(long time);
/**
* Delete a previously registered processing-time timer.
* @param time Timer timestamp
*/
void deleteProcessingTimeTimer(long time);
/**
* Delete a previously registered event-time timer.
* @param time Timer timestamp
*/
void deleteEventTimeTimer(long time);
/**
* Get partitioned state
* @param stateDescriptor State descriptor
* @param <S> State type
* @return Partitioned state
*/
<S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);
}
/**
* Context passed to {@link Evictor} callbacks; exposes current processing time and watermark.
*/
interface EvictorContext {
/**
* Get current processing time
* @return Current processing time
*/
long getCurrentProcessingTime();
/**
* Get the current event-time watermark.
* @return Current watermark
*/
long getCurrentWatermark();
}
/**
* Wrapper pairing a value with a timestamp; evictors receive window elements in this form.
* @param <T> Value type
*/
class TimestampedValue<T> {
/**
* Create timestamped value
* @param value Value
* @param timestamp Timestamp
*/
public TimestampedValue(T value, long timestamp);
/**
* Get value
* @return Value
*/
public T getValue();
/**
* Get timestamp
* @return Timestamp
*/
public long getTimestamp();
/**
* Check if has timestamp
* NOTE(review): the only constructor listed here takes a timestamp. Flink also provides a
* value-only constructor, for which this returns false; confirm whether it should be listed.
* @return true if has timestamp
*/
public boolean hasTimestamp();
}
/**
* Delta function interface
* @param <DATA> Data type
*/
interface DeltaFunction<DATA> {
/**
* Calculate delta between two data points
* @param oldDataPoint Old data point
* @param newDataPoint New data point
* @return Delta value
*/
double getDelta(DATA oldDataPoint, DATA newDataPoint);
}
/**
* Extracts the session gap for each element; used by {@code EventTimeSessionWindows.withDynamicGap}.
* NOTE(review): Flink's SessionWindowTimeGapExtractor extends {@code java.io.Serializable} so
* instances can ship with the window assigner; consider declaring that here as well.
* @param <T> Element type
*/
interface SessionWindowTimeGapExtractor<T> {
/**
* Extract session timeout for element
* @param element Element
* @return Session timeout in milliseconds
*/
long extract(T element);
}Base window class for all window types.
/**
* Base class for all window types (e.g. {@link TimeWindow}).
*/
abstract class Window {
/**
* Get the maximum timestamp that still belongs to this window.
* @return Maximum timestamp
*/
public abstract long maxTimestamp();
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-parent