Apache Flink streaming core library providing fundamental building blocks for scalable stream data processing.
DataStream is the core abstraction in Flink, representing a stream of data elements. It provides a rich set of transformation operations for processing and manipulating streaming data.
public class DataStream<T> {
// Basic transformations
public <R> SingleOutputStreamOperator<R, ?> map(MapFunction<T, R> mapper);
public <R> SingleOutputStreamOperator<R, ?> flatMap(FlatMapFunction<T, R> flatMapper);
public SingleOutputStreamOperator<T, ?> filter(FilterFunction<T> filter);
public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> reducer);
public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> folder);
public <R extends Tuple> SingleOutputStreamOperator<R, ?> project(int... fieldIndexes);
// Aggregation operations (on a non-keyed stream these run non-parallel, with parallelism 1)
public SingleOutputStreamOperator<T, ?> sum(int positionToSum);
public SingleOutputStreamOperator<T, ?> sum(String field);
public SingleOutputStreamOperator<T, ?> min(int positionToMin);
public SingleOutputStreamOperator<T, ?> min(String field);
public SingleOutputStreamOperator<T, ?> max(int positionToMax);
public SingleOutputStreamOperator<T, ?> max(String field);
public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy);
public SingleOutputStreamOperator<T, ?> minBy(String field);
public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, boolean first);
public SingleOutputStreamOperator<T, ?> minBy(String field, boolean first);
public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy);
public SingleOutputStreamOperator<T, ?> maxBy(String field);
public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, boolean first);
public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean first);
public SingleOutputStreamOperator<Long, ?> count();
// Keyed operations
public GroupedDataStream<T> groupBy(KeySelector<T, ?> key);
public GroupedDataStream<T> groupBy(int... fields);
public GroupedDataStream<T> groupBy(String... fields);
// Stream composition and connectivity
public DataStream<T> union(DataStream<T>... streams);
public <R> ConnectedDataStream<T, R> connect(DataStream<R> dataStream);
public SplitDataStream<T> split(OutputSelector<T> outputSelector);
// Partitioning strategies
public DataStream<T> shuffle();
public DataStream<T> forward();
public DataStream<T> rebalance();
public DataStream<T> global();
public DataStream<T> broadcast();
public DataStream<T> partitionByHash(int... fields);
public DataStream<T> partitionByHash(String... fields);
public DataStream<T> partitionByHash(KeySelector<T, ?> keySelector);
// Temporal operations (cross and join)
public <IN2> StreamCrossOperator<T, IN2> cross(DataStream<IN2> dataStreamToCross);
public <IN2> StreamJoinOperator<T, IN2> join(DataStream<IN2> dataStreamToJoin);
// Iteration support
public IterativeDataStream<T> iterate();
public IterativeDataStream<T> iterate(long maxWaitTimeMillis);
// Windowing operations
public WindowedDataStream<T> window(WindowingHelper<T> policyHelper);
public WindowedDataStream<T> window(TriggerPolicy<T> trigger, EvictionPolicy<T> eviction);
public WindowedDataStream<T> every(WindowingHelper<T> policyHelper);
// Output operations - Console
public DataStreamSink<T> print();
public DataStreamSink<T> printToErr();
// Output operations - File (Text)
public DataStreamSink<T> writeAsText(String path);
public DataStreamSink<T> writeAsText(String path, long millis);
public DataStreamSink<T> writeAsText(String path, WriteMode writeMode);
public DataStreamSink<T> writeAsText(String path, WriteMode writeMode, long millis);
// Output operations - File (CSV)
public <X extends Tuple> DataStreamSink<T> writeAsCsv(String path);
public <X extends Tuple> DataStreamSink<T> writeAsCsv(String path, long millis);
public <X extends Tuple> DataStreamSink<T> writeAsCsv(String path, WriteMode writeMode);
public <X extends Tuple> DataStreamSink<T> writeAsCsv(String path, WriteMode writeMode, long millis);
// Output operations - Network and Generic
public DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T, byte[]> schema);
public DataStreamSink<T> write(OutputFormat<T> format, long millis);
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction);
// Advanced operations
public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator);
// Basic properties and configuration
public Integer getId();
public int getParallelism();
public TypeInformation<T> getType();
public StreamExecutionEnvironment getExecutionEnvironment();
public DataStream<T> copy();
}

A DataStream created from a source function.
public class DataStreamSource<T> extends DataStream<T> {
public DataStreamSource<T> setParallelism(int parallelism);
public DataStreamSource<T> name(String name);
}

A DataStream that has been partitioned by key for keyed operations.
public class GroupedDataStream<T> {
// Aggregations
public DataStream<T> reduce(ReduceFunction<T> reducer);
public <R> DataStream<R> fold(R initialValue, FoldFunction<T, R> folder);
// Field-based aggregations
public DataStream<T> sum(int positionToSum);
public DataStream<T> sum(String field);
public DataStream<T> min(int positionToMin);
public DataStream<T> min(String field);
public DataStream<T> max(int positionToMax);
public DataStream<T> max(String field);
public DataStream<T> minBy(int positionToMinBy);
public DataStream<T> minBy(String field);
public DataStream<T> maxBy(int positionToMaxBy);
public DataStream<T> maxBy(String field);
// Windowing
public WindowedDataStream<T> window(WindowingHelper<T> helper);
public WindowedDataStream<T> every(WindowingHelper<T> helper);
// Configuration
public GroupedDataStream<T> setParallelism(int parallelism);
public GroupedDataStream<T> name(String name);
}

Two connected streams that can be processed jointly.
public class ConnectedDataStream<T1, T2> {
// Joint transformations
public <R> DataStream<R> map(CoMapFunction<T1, T2, R> coMapper);
public <R> DataStream<R> flatMap(CoFlatMapFunction<T1, T2, R> coFlatMapper);
public <R> DataStream<R> reduce(CoReduceFunction<T1, T2, R> coReducer);
// Key both streams for keyed operations
public ConnectedDataStream<T1, T2> groupBy(KeySelector<T1, ?> keySelector1, KeySelector<T2, ?> keySelector2);
public ConnectedDataStream<T1, T2> groupBy(int key1, int key2);
public ConnectedDataStream<T1, T2> groupBy(String key1, String key2);
// Configuration
public ConnectedDataStream<T1, T2> setParallelism(int parallelism);
public ConnectedDataStream<T1, T2> name(String name);
}

Terminal operation that consumes stream data.
public class DataStreamSink<T> {
// Configuration
public DataStreamSink<T> setParallelism(int parallelism);
public int getParallelism();
public DataStreamSink<T> name(String name);
public DataStreamSink<T> disableChaining();
public DataStreamSink<T> setBufferTimeout(long timeout);
public DataStreamSink<T> slotSharingGroup(String slotSharingGroup);
}

Basic transformations:
DataStream<String> text = env.fromElements("hello world", "how are you");
// Map transformation
DataStream<String> upper = text.map(String::toUpperCase);
// FlatMap transformation
DataStream<String> words = text.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String sentence, Collector<String> out) {
for (String word : sentence.split(" ")) {
out.collect(word);
}
}
});
// Filter transformation
DataStream<String> filtered = words.filter(word -> word.length() > 3);

Keyed aggregation with groupBy:
DataStream<Tuple2<String, Integer>> counts = words
.map(word -> new Tuple2<>(word, 1))
.groupBy(0) // Group by first field (word)
.sum(1); // Sum second field (count)
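fold is the rolling counterpart of reduce with a separate accumulator type: it emits the updated accumulator for every arriving element. A minimal sketch based on the fold signature listed above (the empty string is the initial accumulator):
DataStream<String> runningConcat = words
    .groupBy(word -> word)                 // key by the word itself
    .fold("", (acc, word) -> acc + word);  // emit the growing accumulator per element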
// Using KeySelector
DataStream<Tuple2<String, Integer>> counts2 = words
.map(word -> new Tuple2<>(word, 1))
.groupBy(tuple -> tuple.f0) // Group by word
.reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1));

Combining streams with union:
DataStream<String> stream1 = env.fromElements("a", "b", "c");
DataStream<String> stream2 = env.fromElements("d", "e", "f");
DataStream<String> stream3 = env.fromElements("g", "h", "i");
// Union multiple streams
DataStream<String> unionStream = stream1.union(stream2, stream3);

Connecting two streams of different types:
DataStream<String> stream1 = env.fromElements("hello", "world");
DataStream<Integer> stream2 = env.fromElements(1, 2, 3);
ConnectedDataStream<String, Integer> connected = stream1.connect(stream2);
DataStream<String> result = connected.map(new CoMapFunction<String, Integer, String>() {
@Override
public String map1(String value) {
return "String: " + value;
}
@Override
public String map2(Integer value) {
return "Integer: " + value;
}
});

Partitioning strategies:
DataStream<Tuple2<String, Integer>> data = env.fromElements(
new Tuple2<>("key1", 1), new Tuple2<>("key2", 2));
// Different partitioning strategies
DataStream<Tuple2<String, Integer>> shuffled = data.shuffle();
DataStream<Tuple2<String, Integer>> rebalanced = data.rebalance();
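// Hash-partition by the first tuple field (partitionByHash, listed above)
DataStream<Tuple2<String, Integer>> hashed = data.partitionByHash(0);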
DataStream<Tuple2<String, Integer>> broadcasted = data.broadcast();

Writing results to sinks:
DataStream<String> processed = words.map(String::toUpperCase);
// Print to standard output
processed.print();
// Write to file
processed.writeAsText("/path/to/output.txt");
// Write as CSV
DataStream<Tuple2<String, Integer>> tuples = processed
.map(word -> new Tuple2<>(word, word.length()));
tuples.writeAsCsv("/path/to/output.csv");
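// Overwrite an existing file; WriteMode is the enum defined at the end of this reference
processed.writeAsText("/path/to/output.txt", WriteMode.OVERWRITE);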
// Custom sink
processed.addSink(new SinkFunction<String>() {
@Override
public void invoke(String value) {
System.out.println("Custom sink: " + value);
}
});

Configuring operators:
DataStream<String> configured = text
.map(String::toUpperCase)
.name("UpperCase Transformation")
.setParallelism(4)
.setBufferTimeout(100)
.disableChaining();

// Function interfaces for transformations
public interface MapFunction<T, O> extends Function {
O map(T value) throws Exception;
}
public interface FlatMapFunction<T, O> extends Function {
void flatMap(T value, Collector<O> out) throws Exception;
}
public interface FilterFunction<T> extends Function {
boolean filter(T value) throws Exception;
}
public interface ReduceFunction<T> extends Function {
T reduce(T value1, T value2) throws Exception;
}
public interface FoldFunction<T, O> extends Function {
O fold(O accumulator, T value) throws Exception;
}
public interface KeySelector<IN, KEY> extends Function {
KEY getKey(IN value) throws Exception;
}
// CoFunction interfaces for connected streams
public interface CoMapFunction<IN1, IN2, OUT> extends Function {
OUT map1(IN1 value) throws Exception;
OUT map2(IN2 value) throws Exception;
}
public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function {
void flatMap1(IN1 value, Collector<OUT> out) throws Exception;
void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
}
public interface CoReduceFunction<IN1, IN2, OUT> extends Function {
OUT reduce1(IN1 value, OUT accumulator) throws Exception;
OUT reduce2(IN2 value, OUT accumulator) throws Exception;
}
// Partitioner interface
public abstract class Partitioner<T> implements Serializable {
public abstract int partition(T key, int numPartitions);
}
// Collector interface
public interface Collector<T> {
void collect(T record);
void close();
}
// Additional types for advanced operations
public class SingleOutputStreamOperator<T, O extends StreamOperator<T>> extends DataStream<T> {
// Stream operator with configuration methods
public SingleOutputStreamOperator<T, O> name(String name);
public SingleOutputStreamOperator<T, O> setParallelism(int parallelism);
public SingleOutputStreamOperator<T, O> setBufferTimeout(long timeoutMillis);
public SingleOutputStreamOperator<T, O> disableChaining();
public SingleOutputStreamOperator<T, O> startNewChain();
public SingleOutputStreamOperator<T, O> slotSharingGroup(String slotSharingGroup);
}
public class SplitDataStream<T> {
// Split stream that can be selected by name
public DataStream<T> select(String... outputNames);
}
public interface OutputSelector<T> extends Serializable {
Iterable<String> select(T value);
}
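A sketch of routing elements to named outputs with split and selecting them back out; the numbers stream and the tag names are illustrative:
SplitDataStream<Integer> split = env.fromElements(1, 2, 3, 4, 5)
    .split(new OutputSelector<Integer>() {
        @Override
        public Iterable<String> select(Integer value) {
            // Tag each element as "even" or "odd"
            return Collections.singletonList(value % 2 == 0 ? "even" : "odd");
        }
    });
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even", "odd");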
public class IterativeDataStream<T> extends DataStream<T> {
// Iterative data stream for feedback loops
public DataStream<T> closeWith(DataStream<T> feedbackStream);
}
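A sketch of a feedback loop built with iterate and closeWith; elements are decremented and fed back until they reach zero, and the 5000 ms argument bounds how long the iteration head waits for feedback data:
IterativeDataStream<Long> iteration = env.fromElements(4L, 7L, 12L).iterate(5000);
DataStream<Long> minusOne = iteration.map(value -> value - 1);
iteration.closeWith(minusOne.filter(value -> value > 0));          // feed positives back into the loop
DataStream<Long> finished = minusOne.filter(value -> value <= 0);  // exits the loop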
public class StreamCrossOperator<I1, I2> {
// Cross operation between two streams
public <OUT> DataStream<OUT> with(CrossFunction<I1, I2, OUT> crossFunction);
public StreamCrossOperator<I1, I2> where(KeySelector<I1, ?> keySelector);
public StreamCrossOperator<I1, I2> equalTo(KeySelector<I2, ?> keySelector);
}
public class StreamJoinOperator<I1, I2> {
// Join operation between two streams
public <OUT> DataStream<OUT> with(JoinFunction<I1, I2, OUT> joinFunction);
public StreamJoinOperator<I1, I2> where(KeySelector<I1, ?> keySelector);
public StreamJoinOperator<I1, I2> equalTo(KeySelector<I2, ?> keySelector);
}
// Cross and Join function interfaces
public interface CrossFunction<IN1, IN2, OUT> extends Function {
OUT cross(IN1 first, IN2 second) throws Exception;
}
public interface JoinFunction<IN1, IN2, OUT> extends Function {
OUT join(IN1 first, IN2 second) throws Exception;
}
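A sketch of joining two streams on key fields through the where/equalTo/with builder shown above; the input streams are illustrative, and depending on the library version a window may also need to be configured on the join:
DataStream<Tuple2<String, Integer>> orders = env.fromElements(new Tuple2<>("a", 1));
DataStream<Tuple2<String, Double>> prices = env.fromElements(new Tuple2<>("a", 9.99));
DataStream<String> joined = orders.join(prices)
    .where(order -> order.f0)    // key of the first input
    .equalTo(price -> price.f0)  // key of the second input
    .with((order, price) -> order.f0 + ": " + order.f1 * price.f1);
cross works the same way, pairing element combinations through a CrossFunction instead of a key match.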
// Windowing types referenced in API
public abstract class WindowingHelper<T> implements Serializable {
// Base class for windowing helpers
}
public interface TriggerPolicy<T> extends Serializable {
// Trigger policy for windowing
}
public interface EvictionPolicy<T> extends Serializable {
// Eviction policy for windowing
}
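A sketch of creating windowed streams from the counts stream above; Count and Time are assumed to be WindowingHelper implementations shipped with the library (their exact names may differ by version):
// Evaluate over every 100 elements (count-based window)
WindowedDataStream<Tuple2<String, Integer>> byCount = counts.window(Count.of(100));
// Evaluate over 5-second windows (time-based window)
WindowedDataStream<Tuple2<String, Integer>> byTime = counts.window(Time.of(5, TimeUnit.SECONDS));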
// Serialization schema for network output
public interface SerializationSchema<T, S> extends Serializable {
S serialize(T element);
}
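A sketch of streaming records to a TCP socket; the host and port are illustrative, and the inline schema encodes each record as UTF-8 bytes:
processed.writeToSocket("localhost", 9999, new SerializationSchema<String, byte[]>() {
    @Override
    public byte[] serialize(String element) {
        // Encode the record for the wire
        return element.getBytes(StandardCharsets.UTF_8);
    }
});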
// Write mode enum for file outputs
public enum WriteMode {
NO_OVERWRITE,
OVERWRITE
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-streaming-core