CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-streaming-core

Apache Flink streaming core library providing fundamental building blocks for scalable stream data processing.

Pending
Overview
Eval results
Files

docs/datastream-operations.md

DataStream Operations

DataStream is the core abstraction in Flink representing a stream of data elements. It provides a rich set of transformation operations to process and manipulate streaming data.

DataStream<T>

The fundamental stream abstraction providing transformation operations.

public class DataStream<T> {
    // Basic transformations
    public <R> SingleOutputStreamOperator<R, ?> map(MapFunction<T, R> mapper);
    public <R> SingleOutputStreamOperator<R, ?> flatMap(FlatMapFunction<T, R> flatMapper);
    public SingleOutputStreamOperator<T, ?> filter(FilterFunction<T> filter);
    public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> reducer);
    public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> folder);
    public <R extends Tuple> SingleOutputStreamOperator<R, ?> project(int... fieldIndexes);
    
    // Aggregation operations (available on non-keyed streams for single parallelism)
    public SingleOutputStreamOperator<T, ?> sum(int positionToSum);
    public SingleOutputStreamOperator<T, ?> sum(String field);
    public SingleOutputStreamOperator<T, ?> min(int positionToMin);
    public SingleOutputStreamOperator<T, ?> min(String field);
    public SingleOutputStreamOperator<T, ?> max(int positionToMax);
    public SingleOutputStreamOperator<T, ?> max(String field);
    public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy);
    public SingleOutputStreamOperator<T, ?> minBy(String field);
    public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, boolean first);
    public SingleOutputStreamOperator<T, ?> minBy(String field, boolean first);
    public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy);
    public SingleOutputStreamOperator<T, ?> maxBy(String field);
    public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, boolean first);
    public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean first);
    public SingleOutputStreamOperator<Long, ?> count();
    
    // Keyed operations
    public GroupedDataStream<T> groupBy(KeySelector<T, ?> key);
    public GroupedDataStream<T> groupBy(int... fields);
    public GroupedDataStream<T> groupBy(String... fields);
    
    // Stream composition and connectivity
    public DataStream<T> union(DataStream<T>... streams);
    public <R> ConnectedDataStream<T, R> connect(DataStream<R> dataStream);
    public SplitDataStream<T> split(OutputSelector<T> outputSelector);
    
    // Partitioning strategies
    public DataStream<T> shuffle();
    public DataStream<T> forward();
    public DataStream<T> rebalance();
    public DataStream<T> global();
    public DataStream<T> broadcast();
    public DataStream<T> partitionByHash(int... fields);
    public DataStream<T> partitionByHash(String... fields);
    public DataStream<T> partitionByHash(KeySelector<T, ?> keySelector);
    
    // Temporal operations (cross and join)
    public <IN2> StreamCrossOperator<T, IN2> cross(DataStream<IN2> dataStreamToCross);
    public <IN2> StreamJoinOperator<T, IN2> join(DataStream<IN2> dataStreamToJoin);
    
    // Iteration support
    public IterativeDataStream<T> iterate();
    public IterativeDataStream<T> iterate(long maxWaitTimeMillis);
    
    // Windowing operations
    public WindowedDataStream<T> window(WindowingHelper policyHelper);
    public WindowedDataStream<T> window(TriggerPolicy<T> trigger, EvictionPolicy<T> eviction);
    public WindowedDataStream<T> every(WindowingHelper policyHelper);
    
    // Output operations - Console
    public DataStreamSink<T> print();
    public DataStreamSink<T> printToErr();
    
    // Output operations - File (Text)
    public DataStreamSink<T> writeAsText(String path);
    public DataStreamSink<T> writeAsText(String path, long millis);
    public DataStreamSink<T> writeAsText(String path, WriteMode writeMode);
    public DataStreamSink<T> writeAsText(String path, WriteMode writeMode, long millis);
    
    // Output operations - File (CSV)
    public <X extends Tuple> DataStreamSink<T> writeAsCsv(String path);
    public <X extends Tuple> DataStreamSink<T> writeAsCsv(String path, long millis);
    public <X extends Tuple> DataStreamSink<T> writeAsCsv(String path, WriteMode writeMode);
    public <X extends Tuple> DataStreamSink<T> writeAsCsv(String path, WriteMode writeMode, long millis);
    
    // Output operations - Network and Generic
    public DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T, byte[]> schema);
    public DataStreamSink<T> write(OutputFormat<T> format, long millis);
    public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction);
    
    // Advanced operations
    public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator);
    
    // Basic properties and configuration
    public Integer getId();
    public int getParallelism();
    public TypeInformation<T> getType();
    public StreamExecutionEnvironment getExecutionEnvironment();
    public DataStream<T> copy();
}

DataStreamSource<T>

A DataStream created from a source function.

public class DataStreamSource<T> extends DataStream<T> {
    public DataStreamSource<T> setParallelism(int parallelism);
    public DataStreamSource<T> name(String name);
}

GroupedDataStream<T>

A DataStream that has been partitioned by key for keyed operations.

public class GroupedDataStream<T> {
    // Aggregations
    public DataStream<T> reduce(ReduceFunction<T> reducer);
    public <R> DataStream<R> fold(R initialValue, FoldFunction<T, R> folder);
    
    // Field-based aggregations  
    public DataStream<T> sum(int positionToSum);
    public DataStream<T> sum(String field);
    public DataStream<T> min(int positionToMin);
    public DataStream<T> min(String field);
    public DataStream<T> max(int positionToMax);
    public DataStream<T> max(String field);
    public DataStream<T> minBy(int positionToMinBy);
    public DataStream<T> minBy(String field);
    public DataStream<T> maxBy(int positionToMaxBy);
    public DataStream<T> maxBy(String field);
    
    // Windowing
    public WindowedDataStream<T> window(WindowingHelper<T> helper);
    public WindowedDataStream<T> every(WindowingHelper<T> helper);
    
    // Configuration
    public GroupedDataStream<T> setParallelism(int parallelism);
    public GroupedDataStream<T> name(String name);
}

ConnectedDataStream<T1, T2>

Two connected streams that can be processed jointly.

public class ConnectedDataStream<T1, T2> {
    // Joint transformations
    public <R> DataStream<R> map(CoMapFunction<T1, T2, R> coMapper);
    public <R> DataStream<R> flatMap(CoFlatMapFunction<T1, T2, R> coFlatMapper);
    public <R> DataStream<R> reduce(CoReduceFunction<T1, T2, R> coReducer);
    
    // Key both streams for keyed operations
    public ConnectedDataStream<T1, T2> groupBy(KeySelector<T1, ?> keySelector1, KeySelector<T2, ?> keySelector2);
    public ConnectedDataStream<T1, T2> groupBy(int key1, int key2);
    public ConnectedDataStream<T1, T2> groupBy(String key1, String key2);
    
    // Configuration
    public ConnectedDataStream<T1, T2> setParallelism(int parallelism);
    public ConnectedDataStream<T1, T2> name(String name);
}

DataStreamSink<T>

Terminal operation that consumes stream data.

public class DataStreamSink<T> {
    // Configuration
    public DataStreamSink<T> setParallelism(int parallelism);
    public int getParallelism();
    public DataStreamSink<T> name(String name);
    public DataStreamSink<T> disableChaining();
    public DataStreamSink<T> setBufferTimeout(long timeout);
    public DataStreamSink<T> slotSharingGroup(String slotSharingGroup);
}

Usage Examples

Basic Transformations

DataStream<String> text = env.fromElements("hello world", "how are you");

// Map transformation
DataStream<String> upper = text.map(String::toUpperCase);

// FlatMap transformation
DataStream<String> words = text.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String sentence, Collector<String> out) {
        for (String word : sentence.split(" ")) {
            out.collect(word);
        }
    }
});

// Filter transformation
DataStream<String> filtered = words.filter(word -> word.length() > 3);

Keyed Operations

DataStream<Tuple2<String, Integer>> counts = words
    .map(word -> new Tuple2<>(word, 1))
    .groupBy(0)  // Group by first field (word)
    .sum(1);   // Sum second field (count)

// Using KeySelector
DataStream<Tuple2<String, Integer>> counts2 = words
    .map(word -> new Tuple2<>(word, 1))
    .groupBy(tuple -> tuple.f0)  // Group by word
    .reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1));

Stream Union

DataStream<String> stream1 = env.fromElements("a", "b", "c");
DataStream<String> stream2 = env.fromElements("d", "e", "f");
DataStream<String> stream3 = env.fromElements("g", "h", "i");

// Union multiple streams
DataStream<String> unionStream = stream1.union(stream2, stream3);

Connected Streams

DataStream<String> stream1 = env.fromElements("hello", "world");
DataStream<Integer> stream2 = env.fromElements(1, 2, 3);

ConnectedDataStream<String, Integer> connected = stream1.connect(stream2);

DataStream<String> result = connected.map(new CoMapFunction<String, Integer, String>() {
    @Override
    public String map1(String value) {
        return "String: " + value;
    }

    @Override
    public String map2(Integer value) {
        return "Integer: " + value;
    }
});

Partitioning

DataStream<Tuple2<String, Integer>> data = env.fromElements(
    new Tuple2<>("key1", 1), new Tuple2<>("key2", 2));

// Different partitioning strategies
DataStream<Tuple2<String, Integer>> shuffled = data.shuffle();
DataStream<Tuple2<String, Integer>> rebalanced = data.rebalance();
DataStream<Tuple2<String, Integer>> broadcasted = data.broadcast();

Output Operations

DataStream<String> processed = words.map(String::toUpperCase);

// Print to standard output
processed.print();

// Write to file
processed.writeAsText("/path/to/output.txt");

// Write as CSV
DataStream<Tuple2<String, Integer>> tuples = processed
    .map(word -> new Tuple2<>(word, word.length()));
tuples.writeAsCsv("/path/to/output.csv");

// Custom sink
processed.addSink(new SinkFunction<String>() {
    @Override
    public void invoke(String value) {
        System.out.println("Custom sink: " + value);
    }
});

Configuration

DataStream<String> configured = text
    .map(String::toUpperCase)
    .name("UpperCase Transformation")
    .setParallelism(4)
    .setBufferTimeout(100)
    .disableChaining();

Types

// Function interfaces for transformations
public interface MapFunction<T, O> extends Function {
    O map(T value) throws Exception;
}

public interface FlatMapFunction<T, O> extends Function {
    void flatMap(T value, Collector<O> out) throws Exception;
}

public interface FilterFunction<T> extends Function {
    boolean filter(T value) throws Exception;
}

public interface ReduceFunction<T> extends Function {
    T reduce(T value1, T value2) throws Exception;
}

public interface FoldFunction<T, O> extends Function {
    O fold(O accumulator, T value) throws Exception;
}

public interface KeySelector<IN, KEY> extends Function {
    KEY getKey(IN value) throws Exception;
}

// CoFunction interfaces for connected streams
public interface CoMapFunction<IN1, IN2, OUT> extends Function {
    OUT map1(IN1 value) throws Exception;
    OUT map2(IN2 value) throws Exception;
}

public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function {
    void flatMap1(IN1 value, Collector<OUT> out) throws Exception;
    void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
}

public interface CoReduceFunction<IN1, IN2, OUT> extends Function {
    OUT reduce1(IN1 value, OUT accumulator) throws Exception;
    OUT reduce2(IN2 value, OUT accumulator) throws Exception;
}

// Partitioner interface
public abstract class Partitioner<T> implements Serializable {
    public abstract int partition(T key, int numPartitions);
}

// Collector interface
public interface Collector<T> {
    void collect(T record);
    void close();
}

// Additional types for advanced operations
public class SingleOutputStreamOperator<T, O extends StreamOperator<T>> extends DataStream<T> {
    // Stream operator with configuration methods
    public SingleOutputStreamOperator<T, O> name(String name);
    public SingleOutputStreamOperator<T, O> setParallelism(int parallelism);
    public SingleOutputStreamOperator<T, O> setBufferTimeout(long timeoutMillis);
    public SingleOutputStreamOperator<T, O> disableChaining();
    public SingleOutputStreamOperator<T, O> startNewChain();
    public SingleOutputStreamOperator<T, O> slotSharingGroup(String slotSharingGroup);
}

public class SplitDataStream<T> {
    // Split stream that can be selected by name
    public DataStream<T> select(String... outputNames);
}

public interface OutputSelector<T> extends Serializable {
    Iterable<String> select(T value);
}

public class IterativeDataStream<T> extends DataStream<T> {
    // Iterative data stream for feedback loops
    public DataStream<T> closeWith(DataStream<T> feedbackStream);
}

public class StreamCrossOperator<I1, I2> {
    // Cross operation between two streams
    public <OUT> DataStream<OUT> with(CrossFunction<I1, I2, OUT> crossFunction);
    public StreamCrossOperator<I1, I2> where(KeySelector<I1, ?> keySelector);
    public StreamCrossOperator<I1, I2> equalTo(KeySelector<I2, ?> keySelector);
}

public class StreamJoinOperator<I1, I2> {
    // Join operation between two streams
    public <OUT> DataStream<OUT> with(JoinFunction<I1, I2, OUT> joinFunction);
    public StreamJoinOperator<I1, I2> where(KeySelector<I1, ?> keySelector);
    public StreamJoinOperator<I1, I2> equalTo(KeySelector<I2, ?> keySelector);
}

// Cross and Join function interfaces
public interface CrossFunction<IN1, IN2, OUT> extends Function {
    OUT cross(IN1 first, IN2 second) throws Exception;
}

public interface JoinFunction<IN1, IN2, OUT> extends Function {
    OUT join(IN1 first, IN2 second) throws Exception;
}

// Windowing types referenced in API
public abstract class WindowingHelper<T> implements Serializable {
    // Base class for windowing helpers
}

public interface TriggerPolicy<T> extends Serializable {
    // Trigger policy for windowing
}

public interface EvictionPolicy<T> extends Serializable {
    // Eviction policy for windowing
}

// Serialization schema for network output
public interface SerializationSchema<T, S> extends Serializable {
    S serialize(T element);
}

// Write mode enum for file outputs
public enum WriteMode {
    NO_OVERWRITE,
    OVERWRITE
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-streaming-core

docs

checkpointing-state.md

datastream-operations.md

execution-environment.md

index.md

sources-and-sinks.md

stream-operators.md

windowing.md

tile.json