Apache Flink is an open-source stream processing framework with powerful stream- and batch-processing capabilities.
—
Next-generation DataStream API with improved type safety, better performance, and enhanced functionality. This experimental API provides a streamlined programming model for stream processing applications with cleaner abstractions and more intuitive operation semantics.
Main entry point for creating DataStream programs with the new v2 API.
/**
 * Main entry point for DataStream programs written against the v2 API.
 *
 * <p>An environment creates source streams ({@link #fromSource}, {@link #fromCollection},
 * {@link #fromElements}) and submits the assembled program for execution ({@link #execute()}).
 */
interface ExecutionEnvironment {

    /**
     * Creates a data stream that reads its elements from the given source.
     *
     * @param source the data source to read from
     * @param <T> the element type
     * @return a stream of the elements produced by {@code source}
     */
    <T> DataStream<T> fromSource(Source<T> source);

    /**
     * Creates a data stream from the elements of a collection.
     *
     * @param collection the collection whose elements form the stream
     * @param <T> the element type
     * @return a stream of the elements in {@code collection}
     */
    <T> DataStream<T> fromCollection(Collection<T> collection);

    /**
     * Creates a data stream from the given elements.
     *
     * @param elements the elements that form the stream
     * @param <T> the element type
     * @return a stream of the provided elements
     */
    <T> DataStream<T> fromElements(T... elements);

    /**
     * Submits the streaming program for execution.
     *
     * @return a future that completes with the job execution result (not the result itself)
     */
    CompletableFuture<JobExecutionResult> execute();

    /**
     * Submits the streaming program for execution under the given job name.
     *
     * @param jobName the name of the job
     * @return a future that completes with the job execution result (not the result itself)
     */
    CompletableFuture<JobExecutionResult> execute(String jobName);
}

Core stream abstractions in the v2 API providing different partitioning and processing semantics.
/**
 * A stream of elements of type {@code T}. It is the root type from which keyed, broadcast, and
 * global views are derived.
 *
 * @param <T> the type of the elements in this stream
 */
interface DataStream<T> {

    // ---- Repartitioning ---------------------------------------------------------

    /**
     * Partitions this stream by the key that {@code keySelector} extracts from each element.
     *
     * @param keySelector function that extracts the key from an element
     * @param <K> the key type
     * @return a keyed, partitioned view of this stream
     */
    <K> KeyedPartitionStream<K, T> keyBy(KeySelector<T, K> keySelector);

    /**
     * Broadcasts this stream so that every parallel instance receives its elements.
     *
     * @return the broadcast view of this stream
     */
    BroadcastStream<T> broadcast();

    /**
     * Routes all elements of this stream to a single parallel instance.
     *
     * @return the global view of this stream
     */
    GlobalStream<T> global();

    // ---- Transformation ---------------------------------------------------------

    /**
     * Transforms this stream by running {@code processFunction} over its elements.
     *
     * @param processFunction the function applied to each element
     * @param <OUT> the element type of the resulting stream
     * @return the transformed stream
     */
    <OUT> DataStream<OUT> process(OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Connects this stream with {@code other} for dual-input processing.
     *
     * @param other the stream to connect with
     * @param <T2> the element type of {@code other}
     * @return the connected pair of streams
     */
    <T2> TwoInputConnectedStreams<T, T2> connectWith(DataStream<T2> other);

    // ---- Output -----------------------------------------------------------------

    /**
     * Hands the elements of this stream to {@code sink}.
     *
     * @param sink the sink that consumes the elements
     * @return the resulting sink transformation
     */
    DataStreamSink<T> sinkTo(Sink<T> sink);
}
/**
 * A stream partitioned by key. Keyed, stateful operations can be applied to it.
 *
 * @param <K> the key type
 * @param <T> the element type
 */
interface KeyedPartitionStream<K, T> {

    /**
     * Combines the elements that share a key using {@code reduceFunction}.
     *
     * @param reduceFunction the reduce function applied per key
     * @return a stream of the reduced elements
     */
    DataStream<T> reduce(ReduceFunction<T> reduceFunction);

    /**
     * Aggregates the elements that share a key using {@code aggregateFunction}.
     *
     * @param aggregateFunction the aggregate function applied per key
     * @param <ACC> the accumulator type
     * @param <OUT> the element type of the resulting stream
     * @return a stream of the aggregated results
     */
    <ACC, OUT> DataStream<OUT> aggregate(AggregateFunction<T, ACC, OUT> aggregateFunction);

    /**
     * Transforms this stream with a key-aware process function.
     *
     * @param processFunction the keyed function applied to each element
     * @param <OUT> the element type of the resulting stream
     * @return the transformed stream
     */
    <OUT> DataStream<OUT> process(KeyedProcessFunction<K, T, OUT> processFunction);
}
/**
 * A partitioned stream that carries no key.
 *
 * @param <T> the element type
 */
interface NonKeyedPartitionStream<T> {

    /**
     * Transforms this stream by running {@code processFunction} over its elements.
     *
     * @param processFunction the function applied to each element
     * @param <OUT> the element type of the resulting stream
     * @return the transformed stream
     */
    <OUT> DataStream<OUT> process(OneInputStreamProcessFunction<T, OUT> processFunction);
}
/**
 * A stream whose elements are sent to every parallel instance. It can be connected with either
 * a non-keyed or a keyed partitioned stream.
 *
 * @param <T> the broadcast element type
 */
interface BroadcastStream<T> {

    /**
     * Connects this broadcast stream with a non-keyed partitioned stream.
     *
     * @param stream the non-keyed stream to connect with
     * @param <S> the element type of {@code stream}
     * @return the broadcast-connected stream
     */
    <S> BroadcastConnectedStream<S, T> connectWith(NonKeyedPartitionStream<S> stream);

    /**
     * Connects this broadcast stream with a keyed partitioned stream.
     *
     * @param keyedStream the keyed stream to connect with
     * @param <K> the key type of {@code keyedStream}
     * @param <KS> the element type of {@code keyedStream}
     * @return the broadcast-connected stream
     */
    <K, KS> BroadcastConnectedStream<KS, T> connectWith(KeyedPartitionStream<K, KS> keyedStream);
}
/**
 * A stream whose elements are all processed by a single parallel instance.
 *
 * @param <T> the element type
 */
interface GlobalStream<T> {

    /**
     * Transforms this global stream by running {@code processFunction} over its elements.
     *
     * @param processFunction the function applied to each element
     * @param <OUT> the element type of the resulting stream
     * @return the transformed stream
     */
    <OUT> DataStream<OUT> process(OneInputStreamProcessFunction<T, OUT> processFunction);
}

Process function interfaces for the v2 API providing flexible stream processing capabilities.
/**
 * Common super-type of all process function interfaces in the v2 API.
 *
 * <p>It declares no methods of its own.
 */
interface ProcessFunction {
}
/**
 * Processes the elements of a single input stream.
 *
 * <p>Only {@link #processElement} is abstract, so implementations can be written as lambdas.
 * {@link #onTimer} defaults to a no-op.
 *
 * @param <IN> the input element type
 * @param <OUT> the output element type
 */
@FunctionalInterface
interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction {

    /**
     * Processes one input element.
     *
     * @param element the input element
     * @param output the collector that receives emitted results
     * @param ctx the runtime context
     * @throws Exception if the implementation fails to process the element
     */
    void processElement(IN element, Collector<OUT> output, RuntimeContext ctx) throws Exception;

    /**
     * Handles a timer event for the given timestamp. The default implementation does nothing.
     *
     * @param timestamp the timer's timestamp
     * @param output the collector that receives emitted results
     * @param ctx the runtime context
     * @throws Exception if the implementation fails to handle the timer
     */
    default void onTimer(long timestamp, Collector<OUT> output, RuntimeContext ctx) throws Exception {}
}
/**
 * Key-aware process function for keyed streams. The context passed in exposes the current key.
 *
 * <p>Only {@link #processElement} is abstract, so implementations can be written as lambdas.
 * {@link #onTimer} defaults to a no-op.
 *
 * @param <K> the key type
 * @param <IN> the input element type
 * @param <OUT> the output element type
 */
@FunctionalInterface
interface KeyedProcessFunction<K, IN, OUT> extends ProcessFunction {

    /**
     * Processes one input element within its key's context.
     *
     * @param element the input element
     * @param output the collector that receives emitted results
     * @param ctx the partitioned context, giving access to the current key
     * @throws Exception if the implementation fails to process the element
     */
    void processElement(IN element, Collector<OUT> output, PartitionedContext<K> ctx) throws Exception;

    /**
     * Handles a timer event within its key's context. The default implementation does nothing.
     *
     * @param timestamp the timer's timestamp
     * @param output the collector that receives emitted results
     * @param ctx the partitioned context, giving access to the current key
     * @throws Exception if the implementation fails to handle the timer
     */
    default void onTimer(long timestamp, Collector<OUT> output, PartitionedContext<K> ctx) throws Exception {}
}
/**
 * Processes two (non-broadcast) input streams, which may have different element types, and
 * emits a single output.
 *
 * @param <IN1> the element type of the first input
 * @param <IN2> the element type of the second input
 * @param <OUT> the output element type
 */
interface TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {

    /**
     * Called for each element arriving on the first input.
     *
     * @param element the element from the first input
     * @param output the collector that receives emitted results
     * @param ctx the runtime context
     * @throws Exception if the implementation fails to process the element
     */
    void processElement1(IN1 element, Collector<OUT> output, RuntimeContext ctx) throws Exception;

    /**
     * Called for each element arriving on the second input.
     *
     * @param element the element from the second input
     * @param output the collector that receives emitted results
     * @param ctx the runtime context
     * @throws Exception if the implementation fails to process the element
     */
    void processElement2(IN2 element, Collector<OUT> output, RuntimeContext ctx) throws Exception;
}
/**
 * Processes a single input stream and emits to two separate outputs, which may have different
 * element types.
 *
 * <p>Only {@link #processElement} is abstract, so implementations can be written as lambdas.
 *
 * @param <IN> the input element type
 * @param <OUT1> the element type of the first output
 * @param <OUT2> the element type of the second output
 */
@FunctionalInterface
interface TwoOutputStreamProcessFunction<IN, OUT1, OUT2> extends ProcessFunction {

    /**
     * Processes one input element. The implementation may emit through either collector.
     *
     * @param element the input element
     * @param output1 the collector for the first output
     * @param output2 the collector for the second output
     * @param ctx the runtime context
     * @throws Exception if the implementation fails to process the element
     */
    void processElement(IN element, Collector<OUT1> output1, Collector<OUT2> output2, RuntimeContext ctx) throws Exception;
}

Context interfaces providing access to runtime information and services in the v2 API.
/**
 * Gives a process function information about the running task and access to the state and
 * processing-time services.
 */
interface RuntimeContext {

    /**
     * Returns the name of the task.
     *
     * @return the task name
     */
    String getTaskName();

    /**
     * Returns the index of the parallel subtask that is currently running.
     *
     * @return the subtask index
     */
    int getIndexOfThisSubtask();

    /**
     * Returns the parallelism of the current operator.
     *
     * @return the operator's parallelism
     */
    int getParallelism();

    /**
     * Returns the manager used to obtain state handles.
     *
     * @return the state manager
     */
    StateManager getStateManager();

    /**
     * Returns the manager for reading processing time and registering processing-time timers.
     *
     * @return the processing time manager
     */
    ProcessingTimeManager getProcessingTimeManager();
}
/**
 * Runtime context used while processing a keyed (partitioned) stream. It adds access to the
 * current key and to keyed state.
 *
 * @param <K> the key type
 */
interface PartitionedContext<K> extends RuntimeContext {

    /**
     * Returns the key that is currently being processed.
     *
     * @return the current key
     */
    K getCurrentKey();

    /**
     * Returns the manager used to obtain keyed state handles.
     *
     * @return the keyed state manager
     */
    KeyedStateManager getKeyedStateManager();
}
/**
 * Hands out state handles (value, list, and map state) that are described by state descriptors.
 */
interface StateManager {

    /**
     * Looks up the value state described by {@code descriptor}.
     *
     * @param descriptor describes the requested value state
     * @param <T> the type of the stored value
     * @return the value state handle
     */
    <T> ValueState<T> getState(ValueStateDescriptor<T> descriptor);

    /**
     * Looks up the list state described by {@code descriptor}.
     *
     * @param descriptor describes the requested list state
     * @param <T> the type of the list elements
     * @return the list state handle
     */
    <T> ListState<T> getListState(ListStateDescriptor<T> descriptor);

    /**
     * Looks up the map state described by {@code descriptor}.
     *
     * @param descriptor describes the requested map state
     * @param <UK> the type of the map's user keys
     * @param <UV> the type of the map's user values
     * @return the map state handle
     */
    <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> descriptor);
}
/**
 * State manager for keyed contexts. In addition to the base state kinds, it provides reducing
 * and aggregating state.
 */
interface KeyedStateManager extends StateManager {

    /**
     * Looks up the reducing state described by {@code descriptor}.
     *
     * @param descriptor describes the requested reducing state
     * @param <T> the element type
     * @return the reducing state handle
     */
    <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> descriptor);

    /**
     * Looks up the aggregating state described by {@code descriptor}.
     *
     * @param descriptor describes the requested aggregating state
     * @param <IN> the type of values added to the state
     * @param <ACC> the accumulator type
     * @param <OUT> the type of the aggregated result
     * @return the aggregating state handle
     */
    <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> descriptor);
}
/**
 * Reads the current processing time and registers or deletes processing-time timers.
 */
interface ProcessingTimeManager {

    /**
     * Returns the current processing time.
     *
     * @return the current processing-time timestamp
     */
    long getCurrentProcessingTime();

    /**
     * Registers a processing-time timer for {@code timestamp}.
     *
     * @param timestamp the timestamp of the timer
     */
    void registerTimer(long timestamp);

    /**
     * Deletes the processing-time timer for {@code timestamp}.
     *
     * @param timestamp the timestamp of the timer
     */
    void deleteTimer(long timestamp);
}

Windowing support for the v2 API through extensions.
/**
 * Decides which windows an element belongs to, and how those windows are serialized.
 *
 * @param <W> the window type produced by this strategy
 */
interface WindowStrategy<W> {

    /**
     * Returns the windows that {@code element}, carrying {@code timestamp}, is assigned to.
     *
     * <p>NOTE(review): {@code element} is typed as {@code Object} rather than as the stream's
     * element type, so implementations receive it unchecked.
     *
     * @param element the element being assigned
     * @param timestamp the element's timestamp
     * @return the windows the element belongs to
     */
    Collection<W> assignWindows(Object element, long timestamp);

    /**
     * Returns the serializer used for windows of type {@code W}.
     *
     * @return the window serializer
     */
    TypeSerializer<W> getWindowSerializer();
}
/**
 * Tumbling time window strategy that assigns elements to {@link TimeWindow}s.
 *
 * <p>Instances are obtained through the static {@code of} factories.
 * NOTE(review): this listing shows signatures only; the factory bodies and the
 * {@link WindowStrategy} method implementations are not part of this snippet.
 */
class TumblingTimeWindowStrategy implements WindowStrategy<TimeWindow> {
    /**
     * Creates a tumbling time window strategy with the given window size.
     *
     * @param size the length of each window
     * @return the tumbling window strategy
     */
    public static TumblingTimeWindowStrategy of(Duration size);
    /**
     * Creates a tumbling time window strategy with the given window size and offset.
     *
     * @param size the length of each window
     * @param offset the offset applied to the window boundaries (NOTE(review): confirm semantics)
     * @return the tumbling window strategy
     */
    public static TumblingTimeWindowStrategy of(Duration size, Duration offset);
}
/**
 * Sliding time window strategy that assigns elements to {@link TimeWindow}s.
 *
 * <p>Instances are obtained through the static {@code of} factories.
 * NOTE(review): this listing shows signatures only; the factory bodies and the
 * {@link WindowStrategy} method implementations are not part of this snippet.
 */
class SlidingTimeWindowStrategy implements WindowStrategy<TimeWindow> {
    /**
     * Creates a sliding time window strategy with the given window size and slide interval.
     *
     * @param size the length of each window
     * @param slide the slide interval (presumably the distance between consecutive window starts)
     * @return the sliding window strategy
     */
    public static SlidingTimeWindowStrategy of(Duration size, Duration slide);
    /**
     * Creates a sliding time window strategy with the given window size, slide interval, and offset.
     *
     * @param size the length of each window
     * @param slide the slide interval (presumably the distance between consecutive window starts)
     * @param offset the offset applied to the window boundaries (NOTE(review): confirm semantics)
     * @return the sliding window strategy
     */
    public static SlidingTimeWindowStrategy of(Duration size, Duration slide, Duration offset);
}
/**
 * Session window strategy that assigns elements to {@link TimeWindow}s.
 *
 * <p>Instances are obtained through {@link #withGap(Duration)}.
 * NOTE(review): this listing shows a signature only; the factory body and the
 * {@link WindowStrategy} method implementations are not part of this snippet.
 */
class SessionWindowStrategy implements WindowStrategy<TimeWindow> {
    /**
     * Creates a session window strategy with the given inactivity gap.
     *
     * @param gap the inactivity gap; presumably a session closes once no element arrives within
     *     this duration (confirm against the implementation)
     * @return the session window strategy
     */
    public static SessionWindowStrategy withGap(Duration gap);
}
/**
 * A window over a range of time, described by a start and an end timestamp.
 *
 * <p>NOTE(review): this listing shows signatures only. It also does not show whether
 * {@code getEnd()} is inclusive or exclusive. In Flink's own windowing {@code TimeWindow}, the end
 * is exclusive and {@code maxTimestamp()} is {@code end - 1}; confirm for this API.
 */
class TimeWindow {
    /**
     * Returns the start timestamp of the window.
     *
     * @return the start timestamp
     */
    public long getStart();
    /**
     * Returns the end timestamp of the window.
     *
     * @return the end timestamp
     */
    public long getEnd();
    /**
     * Returns the largest timestamp that still belongs to the window.
     *
     * @return the maximum timestamp
     */
    public long maxTimestamp();
    /**
     * Checks whether {@code timestamp} falls within this window.
     *
     * @param timestamp the timestamp to check
     * @return {@code true} if the timestamp is in the window
     */
    public boolean contains(long timestamp);
}

Event time processing support through extensions.
/**
 * Static helpers that add event-time support to a {@link DataStream} in the v2 API.
 *
 * <p>NOTE(review): this listing shows signatures only; the method bodies are not part of this
 * snippet.
 */
class EventTimeExtension {
    /**
     * Enables event-time processing for {@code stream}.
     *
     * @param stream the input stream
     * @param <T> the element type
     * @return the stream with event-time support enabled
     */
    public static <T> DataStream<T> withEventTime(DataStream<T> stream);
    /**
     * Assigns timestamps to the elements of {@code stream} and generates watermarks.
     *
     * <p>NOTE(review): only a {@link TimestampAssigner} is passed in. How the watermarks are
     * derived (for example, their out-of-orderness bound) is not visible in this signature.
     *
     * @param stream the input stream
     * @param timestampAssigner extracts the event timestamp from each element
     * @param <T> the element type
     * @return the stream with timestamps and watermarks
     */
    public static <T> DataStream<T> assignTimestampsAndWatermarks(
            DataStream<T> stream,
            TimestampAssigner<T> timestampAssigner
    );
}
/**
 * Reads the current event time and registers or deletes event-time timers.
 */
interface EventTimeManager {

    /**
     * Returns the current event time.
     *
     * @return the current event-time timestamp
     */
    long getCurrentEventTime();

    /**
     * Registers an event-time timer for {@code timestamp}.
     *
     * @param timestamp the timestamp of the timer
     */
    void registerTimer(long timestamp);

    /**
     * Deletes the event-time timer for {@code timestamp}.
     *
     * @param timestamp the timestamp of the timer
     */
    void deleteTimer(long timestamp);
}
/**
 * Extracts an event timestamp from each element.
 *
 * <p>This interface has a single abstract method, so implementations can be written as lambdas,
 * e.g. {@code (event, recordTs) -> event.getTime()}.
 *
 * @param <T> the element type
 */
@FunctionalInterface
interface TimestampAssigner<T> {

    /**
     * Returns the timestamp of {@code element}.
     *
     * @param element the element to extract the timestamp from
     * @param recordTimestamp the timestamp currently attached to the record
     *     (NOTE(review): the value passed when no timestamp is attached is not shown here)
     * @return the element's timestamp
     */
    long extractTimestamp(T element, long recordTimestamp);
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-parent