Apache Flink Streaming Java API - Core library for building streaming data processing applications in Java, providing DataStream API, windowing operations, state management, event time processing, and fault-tolerant stream processing capabilities
—
KeyedStream represents a DataStream partitioned by key, enabling stateful operations and guarantees that all elements with the same key are processed by the same parallel instance. This enables Flink to maintain state per key and provide exactly-once processing guarantees.
Perform reduction operations that maintain state across elements with the same key.
/**
* Reduce elements by key using a ReduceFunction
* @param reducer - function to combine two elements of the same type
* @return reduced DataStream
*/
DataStream<T> reduce(ReduceFunction<T> reducer);

Usage Examples:
DataStream<Tuple2<String, Integer>> input = env.fromElements(
Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3), Tuple2.of("b", 4)
);
KeyedStream<Tuple2<String, Integer>, String> keyed = input.keyBy(value -> value.f0);
// Reduce - sum the integer values for each key
DataStream<Tuple2<String, Integer>> reduced = keyed.reduce(
new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(
Tuple2<String, Integer> value1,
Tuple2<String, Integer> value2
) {
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
}
);
// Using lambda
DataStream<Tuple2<String, Integer>> reduced = keyed.reduce(
(value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1)
);

Pre-defined aggregation operations for common use cases.
/**
* Sum by field position (for Tuple types)
* @param positionToSum - position of the field to sum
* @return summed DataStream
*/
DataStream<T> sum(int positionToSum);
/**
* Sum by field name (for POJO types)
* @param field - name of the field to sum
* @return summed DataStream
*/
DataStream<T> sum(String field);
/**
* Get minimum by field position
* @param positionToMin - position of the field to find minimum
* @return DataStream with minimum values
*/
DataStream<T> min(int positionToMin);
/**
* Get minimum by field name
* @param field - name of the field to find minimum
* @return DataStream with minimum values
*/
DataStream<T> min(String field);
/**
* Get maximum by field position
* @param positionToMax - position of the field to find maximum
* @return DataStream with maximum values
*/
DataStream<T> max(int positionToMax);
/**
* Get maximum by field name
* @param field - name of the field to find maximum
* @return DataStream with maximum values
*/
DataStream<T> max(String field);
/**
* Get element with minimum value by field position
* @param positionToMinBy - position of the field to compare
* @return DataStream with elements having minimum field values
*/
DataStream<T> minBy(int positionToMinBy);
/**
* Get element with minimum value by field position with tie-breaking
* @param positionToMinBy - position of the field to compare
* @param first - if true, return first element in case of tie; if false, return last
* @return DataStream with elements having minimum field values
*/
DataStream<T> minBy(int positionToMinBy, boolean first);
/**
* Get element with minimum value by field name
* @param field - name of the field to compare
* @return DataStream with elements having minimum field values
*/
DataStream<T> minBy(String field);
/**
* Get element with minimum value by field name with tie-breaking
* @param field - name of the field to compare
* @param first - if true, return first element in case of tie; if false, return last
* @return DataStream with elements having minimum field values
*/
DataStream<T> minBy(String field, boolean first);
/**
* Get element with maximum value by field position
* @param positionToMaxBy - position of the field to compare
* @return DataStream with elements having maximum field values
*/
DataStream<T> maxBy(int positionToMaxBy);
/**
* Get element with maximum value by field position with tie-breaking
* @param positionToMaxBy - position of the field to compare
* @param first - if true, return first element in case of tie; if false, return last
* @return DataStream with elements having maximum field values
*/
DataStream<T> maxBy(int positionToMaxBy, boolean first);
/**
* Get element with maximum value by field name
* @param field - name of the field to compare
* @return DataStream with elements having maximum field values
*/
DataStream<T> maxBy(String field);
/**
* Get element with maximum value by field name with tie-breaking
* @param field - name of the field to compare
* @param first - if true, return first element in case of tie; if false, return last
* @return DataStream with elements having maximum field values
*/
DataStream<T> maxBy(String field, boolean first);

Usage Examples:
DataStream<Tuple3<String, Integer, Double>> sales = env.fromElements(
Tuple3.of("product1", 10, 25.50),
Tuple3.of("product1", 15, 30.00),
Tuple3.of("product2", 8, 15.75)
);
KeyedStream<Tuple3<String, Integer, Double>, String> keyedSales =
sales.keyBy(value -> value.f0);
// Sum quantities (field position 1)
DataStream<Tuple3<String, Integer, Double>> totalQuantity = keyedSales.sum(1);
// Sum prices (field position 2)
DataStream<Tuple3<String, Integer, Double>> totalPrice = keyedSales.sum(2);
// Min quantity
DataStream<Tuple3<String, Integer, Double>> minQuantity = keyedSales.min(1);
// Max price
DataStream<Tuple3<String, Integer, Double>> maxPrice = keyedSales.max(2);
// Element with minimum quantity
DataStream<Tuple3<String, Integer, Double>> minQuantityElement = keyedSales.minBy(1);
// Element with maximum price
DataStream<Tuple3<String, Integer, Double>> maxPriceElement = keyedSales.maxBy(2);

Use KeyedProcessFunction for complex stateful processing with access to timers and state.
/**
* Apply a KeyedProcessFunction for complex keyed stream processing
* @param keyedProcessFunction - the keyed process function
* @return processed DataStream
*/
<R> DataStream<R> process(KeyedProcessFunction<KEY, T, R> keyedProcessFunction);
/**
* Apply a KeyedProcessFunction with type information
* @param keyedProcessFunction - the keyed process function
* @param outputType - type information for output
* @return processed DataStream
*/
<R> DataStream<R> process(
KeyedProcessFunction<KEY, T, R> keyedProcessFunction,
TypeInformation<R> outputType
);

Usage Examples:
DataStream<Tuple2<String, Long>> events = env.fromElements(
Tuple2.of("user1", 1000L),
Tuple2.of("user1", 2000L),
Tuple2.of("user2", 1500L)
);
KeyedStream<Tuple2<String, Long>, String> keyedEvents =
events.keyBy(value -> value.f0);
// Complex stateful processing
DataStream<String> alerts = keyedEvents.process(
new KeyedProcessFunction<String, Tuple2<String, Long>, String>() {
private ValueState<Long> lastEventTime;
private ValueState<Integer> eventCount;
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<Long> timeDescriptor =
new ValueStateDescriptor<>("lastEventTime", Long.class);
lastEventTime = getRuntimeContext().getState(timeDescriptor);
ValueStateDescriptor<Integer> countDescriptor =
new ValueStateDescriptor<>("eventCount", Integer.class);
eventCount = getRuntimeContext().getState(countDescriptor);
}
@Override
public void processElement(
Tuple2<String, Long> value,
Context ctx,
Collector<String> out
) throws Exception {
Long lastTime = lastEventTime.value();
Integer count = eventCount.value();
if (count == null) count = 0;
count++;
eventCount.update(count);
if (lastTime != null && value.f1 - lastTime < 1000) {
out.collect("Rapid events detected for user: " + value.f0);
}
lastEventTime.update(value.f1);
// Register timer to clear state after 1 hour of inactivity
ctx.timerService().registerEventTimeTimer(value.f1 + 3600000);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
// Clear state on timer
lastEventTime.clear();
eventCount.clear();
}
}
);

Apply windowing operations to keyed streams for time-based or count-based grouping.
/**
* Create count-based tumbling window
* @param size - number of elements in each window
* @return windowed stream
*/
WindowedStream<T, KEY, GlobalWindow> countWindow(long size);
/**
* Create count-based sliding window
* @param size - number of elements in each window
* @param slide - number of elements between window starts
* @return windowed stream
*/
WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide);
/**
* Create time-based tumbling window
* @param size - size of the time window
* @return windowed stream
*/
WindowedStream<T, KEY, TimeWindow> timeWindow(Time size);
/**
* Create time-based sliding window
* @param size - size of the time window
* @param slide - slide interval for the window
* @return windowed stream
*/
WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide);
/**
* Create custom window
* @param assigner - window assigner to use
* @return windowed stream
*/
<W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner);

Usage Examples:
KeyedStream<Tuple2<String, Integer>, String> keyedStream =
input.keyBy(value -> value.f0);
// Count window - every 10 elements
WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> countWindow =
keyedStream.countWindow(10);
// Sliding count window - window of 10, sliding by 5
WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> slidingCountWindow =
keyedStream.countWindow(10, 5);
// Time window - every 5 minutes
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> timeWindow =
keyedStream.timeWindow(Time.minutes(5));
// Sliding time window - 10 minutes window, sliding every 2 minutes
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> slidingTimeWindow =
keyedStream.timeWindow(Time.minutes(10), Time.minutes(2));
// Session window
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> sessionWindow =
keyedStream.window(EventTimeSessionWindows.withGap(Time.minutes(30)));

Access and manage keyed state for complex stateful computations.
// State is accessed within RichFunction implementations
// ValueState - single value per key
ValueState<T> getState(ValueStateDescriptor<T> stateDescriptor);
// ListState - list of values per key
ListState<T> getListState(ListStateDescriptor<T> stateDescriptor);
// MapState - map of values per key
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateDescriptor);
// ReducingState - single value per key with ReduceFunction
ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateDescriptor);
// AggregatingState - single value per key with AggregateFunction
AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateDescriptor);

Usage Examples:
// Example: a KeyedProcessFunction combining the three common kinds of keyed state.
// Every state handle below is automatically scoped to the current key — each key
// sees its own independent count, timestamp list, and per-type map.
public class StatefulProcessor extends KeyedProcessFunction<String, Event, Alert> {
// Number of events seen for the current key (null until first update).
private ValueState<Integer> countState;
// Timestamps of all events seen for the current key.
private ListState<Long> timestampState;
// Per-event-type counters for the current key.
private MapState<String, Integer> mapState;
@Override
public void open(Configuration parameters) {
// Value state — descriptors name the state for checkpointing/recovery.
ValueStateDescriptor<Integer> countDescriptor =
new ValueStateDescriptor<>("count", Integer.class);
countState = getRuntimeContext().getState(countDescriptor);
// List state
ListStateDescriptor<Long> timestampDescriptor =
new ListStateDescriptor<>("timestamps", Long.class);
timestampState = getRuntimeContext().getListState(timestampDescriptor);
// Map state
MapStateDescriptor<String, Integer> mapDescriptor =
new MapStateDescriptor<>("eventMap", String.class, Integer.class);
mapState = getRuntimeContext().getMapState(mapDescriptor);
}
@Override
public void processElement(Event event, Context ctx, Collector<Alert> out) throws Exception {
// Access value state — value() returns null the first time a key is seen.
Integer count = countState.value();
if (count == null) count = 0;
countState.update(count + 1);
// Access list state
timestampState.add(event.getTimestamp());
// Access map state — get() returns null for an absent key, hence the default.
Integer eventTypeCount = mapState.get(event.getType());
if (eventTypeCount == null) eventTypeCount = 0;
mapState.put(event.getType(), eventTypeCount + 1);
// Clear state if needed. Note: `count` holds the PRE-increment value here,
// so state is cleared once the previously stored count exceeds 100.
if (count > 100) {
countState.clear();
timestampState.clear();
mapState.clear();
}
}
}

Make keyed state queryable from external applications.
/**
* Make the keyed state queryable with a given name
* @param queryableStateName - name for the queryable state
* @return queryable DataStream (NOTE(review): Flink's actual KeyedStream.asQueryableState returns QueryableStateStream, not DataStream — verify against the target Flink version)
*/
DataStream<T> asQueryableState(String queryableStateName);
/**
* Make the keyed state queryable with custom ValueStateDescriptor
* @param queryableStateName - name for the queryable state
* @param stateDescriptor - descriptor for the state
* @return queryable DataStream
*/
DataStream<T> asQueryableState(String queryableStateName, ValueStateDescriptor<T> stateDescriptor);

Usage Examples:
KeyedStream<Tuple2<String, Integer>, String> keyedStream =
input.keyBy(value -> value.f0);
// Make current stream queryable
DataStream<Tuple2<String, Integer>> queryableStream =
keyedStream.asQueryableState("my-query-name");
// Make state queryable with custom descriptor
ValueStateDescriptor<Integer> stateDescriptor =
new ValueStateDescriptor<>("queryable-state", Integer.class);
DataStream<Tuple2<String, Integer>> customQueryableStream =
keyedStream.asQueryableState("custom-query", stateDescriptor);

// Reduction function
interface ReduceFunction<T> extends Function {
// Combines two values of the same type into one. Should be associative,
// since Flink applies it incrementally as elements arrive for a key.
T reduce(T value1, T value2) throws Exception;
}
// Aggregation function: IN = input element type, ACC = mutable accumulator
// type (checkpointed as state), OUT = final result type.
interface AggregateFunction<IN, ACC, OUT> extends Function {
// Creates a fresh, empty accumulator (e.g. for a new key/window).
ACC createAccumulator();
// Folds one input element into the accumulator and returns it.
ACC add(IN value, ACC accumulator);
// Extracts the final result from the accumulator.
OUT getResult(ACC accumulator);
// Merges two accumulators (used e.g. when session windows merge).
ACC merge(ACC a, ACC b);
}
// Keyed process function: low-level keyed processing with access to state,
// the current key, timers, and side outputs. K = key, I = input, O = output.
abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
// Called once per input element; emit zero or more results via `out`.
abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
// Called when a previously registered event-time or processing-time timer fires.
void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception;
// Context interfaces
abstract class Context {
// Timestamp of the current element (may be null if none is assigned).
abstract Long timestamp();
// Register/delete timers and query current time/watermark.
abstract TimerService timerService();
// Emit a record to the side output identified by `outputTag`.
abstract <X> void output(OutputTag<X> outputTag, X value);
// The key of the element (or firing timer) being processed.
abstract K getCurrentKey();
}
abstract class OnTimerContext extends Context {
// Whether the firing timer was event-time or processing-time.
abstract TimeDomain timeDomain();
}
}
// Value state - single value per key
// Holds a single value per key.
interface ValueState<T> extends State {
// Current value for the active key; null if never set (unless a default was configured).
T value() throws Exception;
// Replaces the value for the active key.
void update(T value) throws Exception;
}
// List state - list of values per key
interface ListState<T> extends MergingState<T, Iterable<T>> {
// Appends one element to the active key's list.
void add(T value) throws Exception;
// Appends all given elements.
void addAll(List<T> values) throws Exception;
// Returns the active key's elements for iteration.
Iterable<T> get() throws Exception;
// Replaces the entire list with the given values.
void update(List<T> values) throws Exception;
}
// Map state - map of values per key
// UK/UV are the user map's key/value types, nested inside the stream key's scope.
interface MapState<UK, UV> extends State {
// Value for `key`, or null if absent (see usage example's null check).
UV get(UK key) throws Exception;
void put(UK key, UV value) throws Exception;
void putAll(Map<UK, UV> map) throws Exception;
void remove(UK key) throws Exception;
boolean contains(UK key) throws Exception;
// Views over the active key's map for iteration.
Iterable<Map.Entry<UK, UV>> entries() throws Exception;
Iterable<UK> keys() throws Exception;
Iterable<UV> values() throws Exception;
}
// Reducing state - single value with reduce function
// Each add() is folded into the stored value via the descriptor's ReduceFunction.
interface ReducingState<T> extends MergingState<T, T> {
// Current reduced value for the active key.
T get() throws Exception;
// Folds `value` into the stored value using the configured ReduceFunction.
void add(T value) throws Exception;
}
// Aggregating state - single value with aggregate function
// Input and output types may differ; the accumulator type is internal to the descriptor.
interface AggregatingState<IN, OUT> extends MergingState<IN, OUT> {
// Result of the aggregation over all values added for the active key.
OUT get() throws Exception;
// Folds `value` into the accumulator via the configured AggregateFunction.
void add(IN value) throws Exception;
}
// State descriptors for creating state
// Describes a ValueState: name (for checkpoints/recovery) plus value type info.
class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {
public ValueStateDescriptor(String name, Class<T> typeClass);
// Use TypeInformation for generic types that a bare Class cannot capture.
public ValueStateDescriptor(String name, TypeInformation<T> typeInfo);
// `defaultValue` is returned by value() when no value has been set for a key.
public ValueStateDescriptor(String name, TypeInformation<T> typeInfo, T defaultValue);
}
// Describes a ListState; the type parameters refer to the ELEMENT type.
class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T>> {
public ListStateDescriptor(String name, Class<T> elementTypeClass);
public ListStateDescriptor(String name, TypeInformation<T> elementTypeInfo);
}
// Describes a MapState; requires type info for both user keys and user values.
class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>, Map<UK, UV>> {
public MapStateDescriptor(String name, Class<UK> userKeyTypeClass, Class<UV> userValueTypeClass);
public MapStateDescriptor(String name, TypeInformation<UK> userKeyTypeInfo, TypeInformation<UV> userValueTypeInfo);
}
// Describes a ReducingState; the ReduceFunction is part of the state definition.
class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>, T> {
public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, Class<T> typeClass);
public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, TypeInformation<T> typeInfo);
}
// Describes an AggregatingState; note the type info supplied is for the
// ACCUMULATOR (ACC), which is what gets checkpointed — not IN or OUT.
class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<AggregatingState<IN, OUT>, OUT> {
public AggregatingStateDescriptor(String name, AggregateFunction<IN, ACC, OUT> aggFunction, Class<ACC> stateType);
public AggregatingStateDescriptor(String name, AggregateFunction<IN, ACC, OUT> aggFunction, TypeInformation<ACC> stateType);
}
// Timer service for time-based processing
// Timer registration and time queries, available from a process function's Context.
interface TimerService {
// Current wall-clock time of the operator (milliseconds).
long currentProcessingTime();
// Current event-time watermark (milliseconds).
long currentWatermark();
// Timers fire via onTimer(); registering the same timestamp twice is a no-op.
void registerProcessingTimeTimer(long time);
void registerEventTimeTimer(long time);
// Deleting a timer that was never registered has no effect.
void deleteProcessingTimeTimer(long time);
void deleteEventTimeTimer(long time);
}
// Identifies which clock a firing timer belongs to (see OnTimerContext.timeDomain()).
enum TimeDomain {
EVENT_TIME, // Event time timers
PROCESSING_TIME // Processing time timers
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-streaming-java-2-11