Apache Flink is an open-source stream processing framework with powerful stream- and batch-processing capabilities.
—
Traditional DataStream API providing comprehensive stream processing capabilities with windowing, state management, and complex event processing features. This is the stable, production-ready API used in most Flink applications.
Main entry point for traditional DataStream programs.
/**
* Main entry point for streaming programs
*/
class StreamExecutionEnvironment {
/**
* Get execution environment with default configuration
* @return Execution environment
*/
public static StreamExecutionEnvironment getExecutionEnvironment();
/**
* Create local execution environment
* @return Local execution environment
*/
public static StreamExecutionEnvironment createLocalEnvironment();
/**
* Add source function to create data stream
* @param function Source function
* @param <T> Element type
* @return DataStream with elements from source
*/
public <T> DataStream<T> addSource(SourceFunction<T> function);
/**
* Create data stream from collection
* @param data Collection of elements
* @param <T> Element type
* @return DataStream with elements
*/
public <T> DataStream<T> fromCollection(Collection<T> data);
/**
* Create data stream from elements
* @param data Varargs elements
* @param <T> Element type
* @return DataStream with elements
*/
public <T> DataStream<T> fromElements(@SuppressWarnings("unchecked") T... data);
/**
* Execute the streaming program
* @return Job execution result
* @throws Exception
*/
public JobExecutionResult execute() throws Exception;
/**
* Execute with job name
* @param jobName Job name
* @return Job execution result
* @throws Exception
*/
public JobExecutionResult execute(String jobName) throws Exception;
/**
* Set parallelism for operations
* @param parallelism Parallelism level
*/
public void setParallelism(int parallelism);
/**
* Enable checkpointing
* @param interval Checkpoint interval in milliseconds
*/
public void enableCheckpointing(long interval);
}
Core stream transformation operations.
/**
* Core stream abstraction for traditional API
* @param <T> Element type
*/
class DataStream<T> {
/**
* Apply map transformation
* @param mapper Map function
* @param <R> Result type
* @return Transformed stream
*/
public <R> DataStream<R> map(MapFunction<T, R> mapper);
/**
* Apply flatMap transformation
* @param flatMapper FlatMap function
* @param <R> Result type
* @return Transformed stream
*/
public <R> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper);
/**
* Filter elements
* @param filter Filter function
* @return Filtered stream
*/
public DataStream<T> filter(FilterFunction<T> filter);
/**
* Partition stream by key
* @param keySelector Key selector function
* @param <K> Key type
* @return Keyed stream
*/
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> keySelector);
/**
* Connect with another stream
* @param dataStream Other stream
* @param <T2> Other stream type
* @return Connected streams
*/
public <T2> ConnectedStreams<T, T2> connect(DataStream<T2> dataStream);
/**
* Union with other streams
* @param streams Other streams
* @return Union of streams
*/
public DataStream<T> union(DataStream<T>... streams);
/**
* Apply process function
* @param processFunction Process function
* @param <R> Result type
* @return Processed stream
*/
public <R> DataStream<R> process(ProcessFunction<T, R> processFunction);
/**
* Add sink
* @param sinkFunction Sink function
* @return Data stream sink
*/
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction);
/**
* Print elements to output
* @return Data stream sink
*/
public DataStreamSink<T> print();
}
Operations available on keyed streams for stateful processing.
/**
* Keyed stream for stateful operations
* @param <T> Element type
* @param <K> Key type
*/
class KeyedStream<T, K> {
/**
* Reduce elements by key
* @param reducer Reduce function
* @return Stream with reduced elements
*/
public DataStream<T> reduce(ReduceFunction<T> reducer);
/**
* Apply process function with key context
* @param keyedProcessFunction Keyed process function
* @param <R> Result type
* @return Processed stream
*/
public <R> DataStream<R> process(KeyedProcessFunction<K, T, R> keyedProcessFunction);
/**
* Create windowed stream
* @param assigner Window assigner
* @param <W> Window type
* @return Windowed stream
*/
public <W extends Window> WindowedStream<T, K, W> window(WindowAssigner<? super T, W> assigner);
/**
* Create time windowed stream of the given size (this overload takes no slide
* interval; see the two-argument overload for sliding windows)
* NOTE(review): timeWindow appears to be deprecated in newer Flink releases in
* favor of window(WindowAssigner) — confirm against the Flink version documented here.
* @param size Window size
* @return Time windowed stream
*/
public WindowedStream<T, K, TimeWindow> timeWindow(Time size);
/**
* Create sliding time windowed stream
* @param size Window size
* @param slide Slide interval
* @return Time windowed stream
*/
public WindowedStream<T, K, TimeWindow> timeWindow(Time size, Time slide);
}
Operations on windowed streams.
/**
* Windowed stream operations
* @param <T> Element type
* @param <K> Key type
* @param <W> Window type
*/
class WindowedStream<T, K, W extends Window> {
/**
* Reduce elements within windows
* @param function Reduce function
* @return Stream with window results
*/
public DataStream<T> reduce(ReduceFunction<T> function);
/**
* Aggregate elements within windows
* @param function Aggregate function
* @param <ACC> Accumulator type
* @param <R> Result type
* @return Stream with aggregated results
*/
public <ACC, R> DataStream<R> aggregate(AggregateFunction<T, ACC, R> function);
/**
* Apply window function
* @param function Window function
* @param <R> Result type
* @return Stream with window results
*/
public <R> DataStream<R> apply(WindowFunction<T, R, K, W> function);
/**
* Apply process window function
* @param function Process window function
* @param <R> Result type
* @return Stream with processed results
*/
public <R> DataStream<R> process(ProcessWindowFunction<T, R, K, W> function);
/**
* Set window trigger
* @param trigger Window trigger
* @return Windowed stream with trigger
*/
public WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger);
/**
* Set window evictor
* @param evictor Window evictor
* @return Windowed stream with evictor
*/
public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor);
}
Process function interfaces for custom processing logic.
/**
* Process function for single input streams
* @param <I> Input type
* @param <O> Output type
*/
abstract class ProcessFunction<I, O> {
/**
* Process element
* @param value Input element
* @param ctx Process context
* @param out Output collector
* @throws Exception
*/
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
/**
* Process timer
* @param timestamp Timer timestamp
* @param ctx OnTimer context
* @param out Output collector
* @throws Exception
*/
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
/**
* Process context
*/
public abstract class Context {
/**
* Get current element timestamp
* @return Element timestamp
*/
public abstract Long timestamp();
/**
* Register processing time timer
* @param timestamp Timer timestamp
*/
public abstract void timerService().registerProcessingTimeTimer(long timestamp);
/**
* Output to side output
* @param outputTag Output tag
* @param value Value to output
* @param <X> Value type
*/
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
}
/**
* Keyed process function
* @param <K> Key type
* @param <I> Input type
* @param <O> Output type
*/
abstract class KeyedProcessFunction<K, I, O> {
/**
* Process element with key context
* @param value Input element
* @param ctx Keyed context
* @param out Output collector
* @throws Exception
*/
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
/**
* Process timer with key context
* @param timestamp Timer timestamp
* @param ctx OnTimer context
* @param out Output collector
* @throws Exception
*/
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
/**
* Keyed process context
*/
public abstract class Context {
/**
* Get current key
* @return Current key
*/
public abstract K getCurrentKey();
/**
* Get timer service
* @return Timer service
*/
public abstract TimerService timerService();
}
}
Supporting types and interfaces for the traditional DataStream API.
/**
* Source function interface for creating data streams
* @param <T> Element type
*/
interface SourceFunction<T> {
/**
* Run source function
* @param ctx Source context
* @throws Exception
*/
void run(SourceContext<T> ctx) throws Exception;
/**
* Cancel source function
*/
void cancel();
/**
* Source context for emitting elements
* @param <T> Element type
*/
interface SourceContext<T> {
/**
* Collect element
* @param element Element to collect
*/
void collect(T element);
/**
* Collect element with timestamp
* @param element Element to collect
* @param timestamp Element timestamp
*/
void collectWithTimestamp(T element, long timestamp);
/**
* Emit watermark
* @param mark Watermark
*/
void emitWatermark(Watermark mark);
/**
* Mark source as temporarily idle
*/
void markAsTemporarilyIdle();
/**
* Get checkpoint lock
* @return Checkpoint lock
*/
Object getCheckpointLock();
/**
* Close the source context
*/
void close();
}
}
/**
* Sink function interface for consuming data streams
* @param <IN> Input element type
*/
interface SinkFunction<IN> {
/**
* Invoke sink function with element
* @param value Input element
* @param context Sink context
* @throws Exception
*/
void invoke(IN value, Context context) throws Exception;
/**
* Sink context
*/
interface Context {
/**
* Get current processing time
* @return Current processing time
*/
long currentProcessingTime();
/**
* Get current watermark
* @return Current watermark
*/
long currentWatermark();
/**
* Get element timestamp
* @return Element timestamp
*/
Long timestamp();
}
}
/**
* Data stream sink
* @param <T> Element type
*/
class DataStreamSink<T> {
/**
* Set sink parallelism
* @param parallelism Parallelism level
* @return Data stream sink
*/
public DataStreamSink<T> setParallelism(int parallelism);
/**
* Disable chaining for this sink
* @return Data stream sink
*/
public DataStreamSink<T> disableChaining();
/**
* Set slot sharing group
* @param slotSharingGroup Slot sharing group
* @return Data stream sink
*/
public DataStreamSink<T> slotSharingGroup(String slotSharingGroup);
/**
* Set sink name
* @param name Sink name
* @return Data stream sink
*/
public DataStreamSink<T> name(String name);
}
/**
* Job execution result
*/
class JobExecutionResult {
/**
* Get job execution time
* @return Execution time in milliseconds
*/
public long getNetRuntime();
/**
* Get accumulator result
* @param accumulatorName Accumulator name
* @param <T> Result type
* @return Accumulator result
*/
public <T> T getAccumulatorResult(String accumulatorName);
/**
* Get all accumulator results
* @return Map of accumulator results
*/
public Map<String, Object> getAllAccumulatorResults();
}
/**
* Timer service for managing timers
*/
interface TimerService {
/**
* Get current processing time
* @return Current processing time
*/
long currentProcessingTime();
/**
* Get current watermark
* @return Current watermark
*/
long currentWatermark();
/**
* Register processing time timer
* @param timestamp Timer timestamp
*/
void registerProcessingTimeTimer(long timestamp);
/**
* Register event time timer
* @param timestamp Timer timestamp
*/
void registerEventTimeTimer(long timestamp);
/**
* Delete processing time timer
* @param timestamp Timer timestamp
*/
void deleteProcessingTimeTimer(long timestamp);
/**
* Delete event time timer
* @param timestamp Timer timestamp
*/
void deleteEventTimeTimer(long timestamp);
}
/**
* Connected streams for processing two input streams
* @param <IN1> First input type
* @param <IN2> Second input type
*/
class ConnectedStreams<IN1, IN2> {
/**
* Apply co-map function
* @param coMapper Co-map function
* @param <R> Result type
* @return Data stream
*/
public <R> DataStream<R> map(CoMapFunction<IN1, IN2, R> coMapper);
/**
* Apply co-flat-map function
* @param coFlatMapper Co-flat-map function
* @param <R> Result type
* @return Data stream
*/
public <R> DataStream<R> flatMap(CoFlatMapFunction<IN1, IN2, R> coFlatMapper);
/**
* Apply co-process function
* @param coProcessFunction Co-process function
* @param <R> Result type
* @return Data stream
*/
public <R> DataStream<R> process(CoProcessFunction<IN1, IN2, R> coProcessFunction);
}
Additional function interfaces for stream processing.
/**
* Co-map function for connected streams
* @param <IN1> First input type
* @param <IN2> Second input type
* @param <OUT> Output type
*/
interface CoMapFunction<IN1, IN2, OUT> extends Function {
/**
* Map function for first input
* @param value Value from first input
* @return Mapped value
* @throws Exception
*/
OUT map1(IN1 value) throws Exception;
/**
* Map function for second input
* @param value Value from second input
* @return Mapped value
* @throws Exception
*/
OUT map2(IN2 value) throws Exception;
}
/**
* Co-flat-map function for connected streams
* @param <IN1> First input type
* @param <IN2> Second input type
* @param <OUT> Output type
*/
interface CoFlatMapFunction<IN1, IN2, OUT> extends Function {
/**
* Flat-map function for first input
* @param value Value from first input
* @param out Output collector
* @throws Exception
*/
void flatMap1(IN1 value, Collector<OUT> out) throws Exception;
/**
* Flat-map function for second input
* @param value Value from second input
* @param out Output collector
* @throws Exception
*/
void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
}
/**
* Co-process function for connected streams
* @param <IN1> First input type
* @param <IN2> Second input type
* @param <OUT> Output type
*/
abstract class CoProcessFunction<IN1, IN2, OUT> {
/**
* Process element from first input
* @param value Element from first input
* @param ctx Process context
* @param out Output collector
* @throws Exception
*/
public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
/**
* Process element from second input
* @param value Element from second input
* @param ctx Process context
* @param out Output collector
* @throws Exception
*/
public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
/**
* Process timer
* @param timestamp Timer timestamp
* @param ctx OnTimer context
* @param out Output collector
* @throws Exception
*/
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}
/**
* Process context
*/
public abstract class Context {
/**
* Get timer service
* @return Timer service
*/
public abstract TimerService timerService();
/**
* Get current timestamp
* @return Current timestamp
*/
public abstract Long timestamp();
/**
* Output to side output
* @param outputTag Output tag
* @param value Value to output
* @param <X> Value type
*/
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
/**
* OnTimer context
*/
public abstract class OnTimerContext extends Context {
/**
* Get timer timestamp
* @return Timer timestamp
*/
public abstract Long timestamp();
/**
* Get timer domain
* @return Timer domain
*/
public abstract TimeDomain timeDomain();
}
}
/**
* Time domain enumeration
*/
enum TimeDomain {
/** Event time domain */
EVENT_TIME,
/** Processing time domain */
PROCESSING_TIME
}
Support for asynchronous I/O operations in stream processing.
/**
* Contract for functions that run an asynchronous operation per stream element
* @param <IN> Input type
* @param <OUT> Output type
*/
interface AsyncFunction<IN, OUT> extends Function {
/**
* Start the async operation for one input element
* @param input Input element
* @param resultFuture Future to complete with results
* @throws Exception
*/
void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
/**
* Handle a timed-out async operation (overriding is optional).
* The default completes the future exceptionally with a TimeoutException.
* @param input Input element that timed out
* @param resultFuture Future to complete with results
* @throws Exception
*/
default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
TimeoutException timedOut = new TimeoutException("Async operation timed out");
resultFuture.completeExceptionally(timedOut);
}
}
/**
* Rich async function with runtime context access
* @param <IN> Input type
* @param <OUT> Output type
*/
abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction implements AsyncFunction<IN, OUT> {}
/**
* Utility class for applying async operations to data streams
*/
class AsyncDataStream {
/**
* Apply async function to stream with unordered output
* @param input Input stream
* @param function Async function
* @param timeout Operation timeout
* @param timeUnit Timeout time unit
* @param capacity Maximum number of concurrent async operations
* @param <IN> Input type
* @param <OUT> Output type
* @return Stream with async results
*/
public static <IN, OUT> DataStream<OUT> unorderedWait(
DataStream<IN> input,
AsyncFunction<IN, OUT> function,
long timeout,
TimeUnit timeUnit,
int capacity);
/**
* Apply async function to stream with unordered output (default capacity)
* @param input Input stream
* @param function Async function
* @param timeout Operation timeout
* @param timeUnit Timeout time unit
* @param <IN> Input type
* @param <OUT> Output type
* @return Stream with async results
*/
public static <IN, OUT> DataStream<OUT> unorderedWait(
DataStream<IN> input,
AsyncFunction<IN, OUT> function,
long timeout,
TimeUnit timeUnit);
/**
* Apply async function to stream with ordered output
* @param input Input stream
* @param function Async function
* @param timeout Operation timeout
* @param timeUnit Timeout time unit
* @param capacity Maximum number of concurrent async operations
* @param <IN> Input type
* @param <OUT> Output type
* @return Stream with async results (order preserved)
*/
public static <IN, OUT> DataStream<OUT> orderedWait(
DataStream<IN> input,
AsyncFunction<IN, OUT> function,
long timeout,
TimeUnit timeUnit,
int capacity);
/**
* Apply async function to stream with ordered output (default capacity)
* @param input Input stream
* @param function Async function
* @param timeout Operation timeout
* @param timeUnit Timeout time unit
* @param <IN> Input type
* @param <OUT> Output type
* @return Stream with async results (order preserved)
*/
public static <IN, OUT> DataStream<OUT> orderedWait(
DataStream<IN> input,
AsyncFunction<IN, OUT> function,
long timeout,
TimeUnit timeUnit);
}
/**
* Future for collecting async operation results
* @param <OUT> Result type
*/
interface ResultFuture<OUT> {
/**
* Complete future with a collection of results
* @param result Collection of result values
*/
void complete(Collection<OUT> result);
/**
* Complete future with single result
* @param result Result value
*/
default void complete(OUT result) {
complete(Collections.singletonList(result));
}
/**
* Complete future exceptionally
* @param error Exception
*/
void completeExceptionally(Throwable error);
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-parent