
tessl/maven-org-apache-flink--flink-streaming-java-2-11

Apache Flink Streaming Java API - Core library for building streaming data processing applications in Java, providing DataStream API, windowing operations, state management, event time processing, and fault-tolerant stream processing capabilities


DataStream Transformations

DataStream transformations are the core operations for processing unbounded streams of data in Apache Flink. These operations transform one or more DataStreams into new DataStreams, enabling complex data processing pipelines.

Capabilities

Basic Transformations

Transform individual elements in the stream using map, filter, and flatMap operations.

/**
 * Apply a MapFunction to transform each element
 * @param mapper - the map function to apply
 * @return transformed DataStream
 */
<R> DataStream<R> map(MapFunction<T, R> mapper);

/**
 * Filter elements based on a predicate
 * @param filter - the filter function
 * @return filtered DataStream
 */
DataStream<T> filter(FilterFunction<T> filter);

/**
 * Apply a FlatMapFunction that can produce zero, one, or more elements for each input
 * @param flatMapper - the flatmap function
 * @return transformed DataStream
 */
<R> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper);

Usage Examples:

DataStream<String> text = env.fromElements("hello world", "flink streaming");

// Map transformation - convert to uppercase
DataStream<String> upperCase = text.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) {
        return value.toUpperCase();
    }
});

// Equivalent, using a method reference
DataStream<String> upperCaseLambda = text.map(String::toUpperCase);

// Filter transformation - keep only strings with more than 5 characters
DataStream<String> filtered = text.filter(s -> s.length() > 5);

// FlatMap transformation - split sentences into words
DataStream<String> words = text.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String sentence, Collector<String> out) {
        for (String word : sentence.split(" ")) {
            out.collect(word);
        }
    }
});

// Equivalent, using a lambda. The explicit returns() hint is needed because
// type erasure prevents Flink from inferring the lambda's output type.
DataStream<String> wordsLambda = text
    .flatMap((String sentence, Collector<String> out) ->
        Arrays.stream(sentence.split(" ")).forEach(out::collect))
    .returns(Types.STRING);

Stream Partitioning

Control how data is distributed across parallel instances of operators.

/**
 * Partition the stream by key
 * @param key - the key selector function
 * @return KeyedStream partitioned by the key
 */
<K> KeyedStream<T, K> keyBy(KeySelector<T, K> key);

/**
 * Partition by field positions (for Tuple types)
 * @param fields - field positions to partition by
 * @return KeyedStream partitioned by the fields
 */
KeyedStream<T, Tuple> keyBy(int... fields);

/**
 * Partition by field names (for POJO types)
 * @param fields - field names to partition by
 * @return KeyedStream partitioned by the fields
 */
KeyedStream<T, Tuple> keyBy(String... fields);

/**
 * Random partitioning - elements are randomly distributed
 * @return randomly partitioned DataStream
 */
DataStream<T> shuffle();

/**
 * Round-robin partitioning - elements are distributed in round-robin fashion
 * @return rebalanced DataStream
 */
DataStream<T> rebalance();

/**
 * Rescale partitioning - locally rebalance between upstream and downstream operators
 * @return rescaled DataStream
 */
DataStream<T> rescale();

/**
 * Broadcast - send elements to all downstream operators
 * @return broadcasted DataStream
 */
DataStream<T> broadcast();

/**
 * Forward partitioning - send elements to the next operator in the same subtask
 * @return forwarded DataStream
 */
DataStream<T> forward();

/**
 * Global partitioning - send all elements to the first instance of the next operator
 * @return globally partitioned DataStream
 */
DataStream<T> global();

Usage Examples:

DataStream<Tuple2<String, Integer>> tuples = env.fromElements(
    Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3)
);

// Key by field position
KeyedStream<Tuple2<String, Integer>, Tuple> keyedByPosition = tuples.keyBy(0);

// Key by lambda function
KeyedStream<Tuple2<String, Integer>, String> keyedByFunction = 
    tuples.keyBy(value -> value.f0);

// For POJO types
DataStream<Person> people = env.fromElements(new Person("John", 25), new Person("Jane", 30));
KeyedStream<Person, String> keyedByName = people.keyBy(person -> person.getName());

// Partitioning strategies
DataStream<String> shuffled = text.shuffle();
DataStream<String> rebalanced = text.rebalance();
DataStream<String> broadcasted = text.broadcast();
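The remaining strategies are applied the same way. A short sketch, reusing the `text` stream from above, with comments summarizing each strategy's routing behavior:

```java
// Rescale: round-robin, but only within a local subset of downstream subtasks
DataStream<String> rescaled = text.rescale();

// Forward: one-to-one to the corresponding downstream subtask; upstream and
// downstream parallelism must match
DataStream<String> forwarded = text.forward();

// Global: every element goes to the first subtask of the next operator
DataStream<String> allToOne = text.global();
```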

Stream Composition

Combine multiple streams into a single stream or create connected streams for joint processing.

/**
 * Union with other DataStreams of the same type
 * @param streams - streams to union with
 * @return unified DataStream
 */
DataStream<T> union(DataStream<T>... streams);

/**
 * Connect with another DataStream for joint processing
 * @param dataStream - stream to connect with
 * @return ConnectedStreams for joint processing
 */
<R> ConnectedStreams<T, R> connect(DataStream<R> dataStream);

Usage Examples:

DataStream<String> stream1 = env.fromElements("a", "b");
DataStream<String> stream2 = env.fromElements("c", "d");
DataStream<String> stream3 = env.fromElements("e", "f");

// Union streams of the same type
DataStream<String> unionedStream = stream1.union(stream2, stream3);

// Connect streams of different types
DataStream<Integer> numbers = env.fromElements(1, 2, 3);
ConnectedStreams<String, Integer> connected = stream1.connect(numbers);

// Process connected streams
DataStream<String> result = connected.map(new CoMapFunction<String, Integer, String>() {
    @Override
    public String map1(String value) {
        return "String: " + value;
    }

    @Override
    public String map2(Integer value) {
        return "Number: " + value;
    }
});

Rich Transformations

Use rich functions that provide access to runtime context and lifecycle methods.

/**
 * Apply a RichMapFunction with access to runtime context
 * @param mapper - the rich map function
 * @return transformed DataStream
 */
<R> DataStream<R> map(RichMapFunction<T, R> mapper);

/**
 * Apply a RichFilterFunction with access to runtime context
 * @param filter - the rich filter function
 * @return filtered DataStream
 */
DataStream<T> filter(RichFilterFunction<T> filter);

/**
 * Apply a RichFlatMapFunction with access to runtime context
 * @param flatMapper - the rich flatmap function
 * @return transformed DataStream
 */
<R> DataStream<R> flatMap(RichFlatMapFunction<T, R> flatMapper);

Usage Examples:

// Rich function with initialization
DataStream<String> enriched = text.map(new RichMapFunction<String, String>() {
    private String prefix;
    
    @Override
    public void open(Configuration parameters) {
        // Global job parameters are typically registered as a ParameterTool via
        // env.getConfig().setGlobalJobParameters(params)
        ParameterTool params = (ParameterTool) getRuntimeContext()
            .getExecutionConfig().getGlobalJobParameters();
        prefix = params.get("prefix", "default");
    }
    
    @Override
    public String map(String value) {
        return prefix + ": " + value;
    }
});

Process Functions

Use process functions for complex processing logic with access to timers and state. Note that timers and keyed state are only available when the function runs on a keyed stream.

/**
 * Apply a ProcessFunction for complex stream processing
 * @param processFunction - the process function
 * @return processed DataStream
 */
<R> DataStream<R> process(ProcessFunction<T, R> processFunction);

Usage Examples:

// Timers require a keyed stream, so key the input first
DataStream<String> processed = text.keyBy(value -> value).process(new ProcessFunction<String, String>() {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        // Custom processing logic
        if (value.length() > 0) {
            out.collect("Processed: " + value);
            
            // Set timer for 60 seconds from now
            ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 60000);
        }
    }
    
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        out.collect("Timer fired at: " + timestamp);
    }
});

Stream Splitting (Deprecated)

Note: Stream splitting using split() and select() is deprecated in newer versions. Use side outputs instead.

/**
 * Split the stream based on an OutputSelector (DEPRECATED)
 * @param outputSelector - selector to determine output streams
 * @return SplitStream for selecting split streams
 */
@Deprecated
SplitStream<T> split(OutputSelector<T> outputSelector);

Side Outputs

Use side outputs to emit data to multiple output streams from a single operator.

// Side outputs are used within ProcessFunction
public void processElement(T element, Context ctx, Collector<R> out) {
    // Emit to main output
    out.collect(mainResult);
    
    // Emit to side output
    ctx.output(sideOutputTag, sideResult);
}

// Retrieve side output from SingleOutputStreamOperator
<X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag);

Usage Examples:

// Define side output tag
final OutputTag<String> lateDataTag = new OutputTag<String>("late-data"){};

// Process function with side output
SingleOutputStreamOperator<String> mainStream = input.process(
    new ProcessFunction<String, String>() {
        @Override
        public void processElement(String value, Context ctx, Collector<String> out) {
            if (isLate(value)) {
                // Emit to side output
                ctx.output(lateDataTag, value);
            } else {
                // Emit to main output
                out.collect(value);
            }
        }
    }
);

// Get side output stream
DataStream<String> lateData = mainStream.getSideOutput(lateDataTag);

Iteration

Create iterative streaming programs for machine learning and graph processing.

/**
 * Create an iterative stream
 * @return IterativeStream for iteration processing
 */
IterativeStream<T> iterate();

/**
 * Create an iterative stream with timeout
 * @param maxWaitTimeMillis - maximum wait time for iteration
 * @return IterativeStream for iteration processing
 */
IterativeStream<T> iterate(long maxWaitTimeMillis);

Usage Examples:

DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
    @Override
    public Long map(Long value) throws Exception {
        return value - 1;
    }
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});

iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});

Types

Transformation Function Interfaces

// Basic transformation functions
interface MapFunction<T, O> extends Function {
    O map(T value) throws Exception;
}

interface FilterFunction<T> extends Function {
    boolean filter(T value) throws Exception;
}

interface FlatMapFunction<T, O> extends Function {
    void flatMap(T value, Collector<O> out) throws Exception;
}

// Rich transformation functions
abstract class RichMapFunction<T, O> extends AbstractRichFunction implements MapFunction<T, O> {
    // Provides access to RuntimeContext and lifecycle methods
}

abstract class RichFilterFunction<T> extends AbstractRichFunction implements FilterFunction<T> {
    // Provides access to RuntimeContext and lifecycle methods
}

abstract class RichFlatMapFunction<T, O> extends AbstractRichFunction implements FlatMapFunction<T, O> {
    // Provides access to RuntimeContext and lifecycle methods
}

// Key selector for partitioning
interface KeySelector<IN, KEY> extends Function {
    KEY getKey(IN value) throws Exception;
}

// Output selector for splitting (deprecated)
@Deprecated
interface OutputSelector<OUT> extends Function {
    Iterable<String> select(OUT value);
}

Stream Types

// Main stream type
class DataStream<T> {
    // All transformation methods as documented above
}

// Result of transformations
class SingleOutputStreamOperator<T> extends DataStream<T> {
    // Additional operator configuration methods
    SingleOutputStreamOperator<T> name(String name);
    SingleOutputStreamOperator<T> uid(String uid);
    SingleOutputStreamOperator<T> setParallelism(int parallelism);
    <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag);
}
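These configuration methods chain fluently on the result of any transformation, since each returns the operator itself. A sketch with illustrative names and parallelism, reusing the `text` stream from the examples above:

```java
DataStream<String> configured = text
    .map(String::toUpperCase)
    .name("uppercase-map")      // display name in the web UI and logs
    .uid("uppercase-map-uid")   // stable ID so savepoints survive job graph changes
    .setParallelism(4);         // overrides the environment's default parallelism
```

Setting an explicit `uid` is particularly important for stateful operators: without it, Flink generates operator IDs from the job topology, and restoring a savepoint can fail after the pipeline is restructured.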

// Keyed stream for stateful operations
class KeyedStream<T, KEY> {
    // Stateful operations (documented in keyed-streams-state.md)
}

// Connected streams for joint processing
class ConnectedStreams<T1, T2> {
    // Joint processing operations (documented in connected-streams.md)
}

// Split stream (deprecated)
@Deprecated
class SplitStream<T> extends DataStream<T> {
    DataStream<T> select(String... outputNames);
}

// Iterative stream
class IterativeStream<T> extends DataStream<T> {
    DataStream<T> closeWith(DataStream<T> feedbackStream);
}

Utility Types

// Collector for emitting results
interface Collector<T> {
    void collect(T record);
    void close();
}

// Output tag for side outputs
class OutputTag<T> {
    public OutputTag(String id) {}
    public OutputTag(String id, TypeInformation<T> typeInfo) {}
}

// Runtime context for rich functions
interface RuntimeContext {
    String getTaskName();
    int getNumberOfParallelSubtasks();
    int getIndexOfThisSubtask();
    ExecutionConfig getExecutionConfig();
    // State access methods
}
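A rich function can read this context at runtime. A minimal sketch that tags each record with the index of the parallel subtask that processed it:

```java
DataStream<String> tagged = text.map(new RichMapFunction<String, String>() {
    @Override
    public String map(String value) {
        // The runtime context is available once the function has been opened
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        return "subtask-" + subtask + ": " + value;
    }
});
```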

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-streaming-java-2-11
