
tessl/maven-org-apache-flink--flink-examples-streaming-2-10

Apache Flink streaming examples demonstrating various stream processing patterns and use cases


Stream Joins

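Time-based stream joins using event-time timestamps, watermarks, and windowed join operations. The examples join two data streams by key within event-time windows. Both inputs feed a single window operator, so a window produces its join results only after the watermarks of both streams have reached the end of that window.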

Capabilities

WindowJoin (Java)

Window-based join of two data streams: elements with equal keys whose event timestamps fall into the same time window are paired.

/**
 * Example of windowed stream joins
 * Joins two streams within time windows using event-time processing
 * @param args Command line arguments (--input path, --output path)
 */
public class WindowJoin {
    public static void main(String[] args) throws Exception;
}

Usage Example:

# Run with default sample data (submitted through the Flink CLI, which supplies the
# Flink runtime classes that the examples JAR does not bundle)
./bin/flink run -c org.apache.flink.streaming.examples.join.WindowJoin \
  flink-examples-streaming_2.10-1.3.3.jar

# Run with file input
./bin/flink run -c org.apache.flink.streaming.examples.join.WindowJoin \
  flink-examples-streaming_2.10-1.3.3.jar \
  --input /path/to/input.txt --output /path/to/results.txt

WindowJoin (Scala)

Scala implementation of windowed stream joins using functional API and case classes.

/**
 * Scala version of windowed stream joins
 * @param args Command line arguments
 */
object WindowJoin {
    def main(args: Array[String]): Unit;
}

Join Patterns

Time Window Join Setup

// Enable event time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

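// Create two data streams with timestamps.
// ThrottledIterator is a java.util.Iterator, not a SourceFunction, so it goes through
// fromCollection() with explicit type information. orangeData and greenData are
// placeholders for Iterator<Tuple3<String, String, Long>> instances that also
// implement java.io.Serializable, because the iterator is shipped with the job.
DataStream<Tuple3<String, String, Long>> orangeStream = env
    .fromCollection(
        new ThrottledIterator<>(orangeData, elementsPerSecond),
        TypeInformation.of(new TypeHint<Tuple3<String, String, Long>>() {}))
    .assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());

DataStream<Tuple3<String, String, Long>> greenStream = env
    .fromCollection(
        new ThrottledIterator<>(greenData, elementsPerSecond),
        TypeInformation.of(new TypeHint<Tuple3<String, String, Long>>() {}))
    .assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());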

// Perform windowed join
DataStream<Tuple6<String, String, String, String, Long, Long>> joinedStream = orangeStream
    .join(greenStream)
    .where(new KeySelector<Tuple3<String, String, Long>, String>() {
        @Override
        public String getKey(Tuple3<String, String, Long> value) throws Exception {
            return value.f0; // Join on first field
        }
    })
    .equalTo(new KeySelector<Tuple3<String, String, Long>, String>() {
        @Override
        public String getKey(Tuple3<String, String, Long> value) throws Exception {
            return value.f0; // Join on first field
        }
    })
    .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
    .apply(new JoinFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, 
                           Tuple6<String, String, String, String, Long, Long>>() {
        @Override
        public Tuple6<String, String, String, String, Long, Long> join(
            Tuple3<String, String, Long> orange, 
            Tuple3<String, String, Long> green) throws Exception {
            return new Tuple6<>(orange.f0, orange.f1, green.f1, 
                               "JOINED", orange.f2, green.f2);
        }
    });
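The join above emits a pair only when the keys match and both timestamps fall into the same tumbling window. The sketch below is a plain-Java model of that rule for a finite batch of elements. It is not how Flink evaluates the join: Flink buffers elements per key and window and fires on watermarks. For in-order data, though, the set of emitted pairs should be the same. The `Event` class stands in for `Tuple3<String, String, Long>`; it and the method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a tumbling event-time window join (no Flink dependency).
// Two elements are paired when they share the same key AND their timestamps fall
// into the same window [start, start + windowSize).
class TumblingJoinModel {

    // Hypothetical stand-in for Tuple3<String, String, Long>: (key, value, timestamp).
    static final class Event {
        final String key;
        final String value;
        final long timestamp;

        Event(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Start of the epoch-aligned tumbling window containing a non-negative timestamp.
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    // Inner join: one output row per (left, right) pair with equal key and equal window,
    // formatted like the Tuple6 result: key,leftValue,rightValue,JOINED,leftTs,rightTs.
    static List<String> join(List<Event> left, List<Event> right, long windowSize) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean sameWindow =
                        windowStart(l.timestamp, windowSize) == windowStart(r.timestamp, windowSize);
                if (sameKey && sameWindow) {
                    out.add(l.key + "," + l.value + "," + r.value + ",JOINED,"
                            + l.timestamp + "," + r.timestamp);
                }
            }
        }
        return out;
    }
}
```

With a 1000 ms window, an orange element at 100 ms and a green element at 900 ms with the same key are joined. Elements at 1500 ms and 2100 ms are not joined, even though they are only 600 ms apart, because they fall into different windows.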

Timestamp and Watermark Assignment

private static class Tuple3TimestampExtractor 
    extends AscendingTimestampExtractor<Tuple3<String, String, Long>> {
    
    @Override
    public long extractAscendingTimestamp(Tuple3<String, String, Long> element) {
        return element.f2; // Use third field as timestamp
    }
}
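AscendingTimestampExtractor is meant for streams whose timestamps never decrease. That assumption lets it derive the watermark directly from the latest timestamp. The sketch below models the idea in plain Java. The exact rule used here is an assumption about the extractor's behaviour: the watermark equals the highest timestamp seen minus 1 ms, and decreasing timestamps are counted as violations. The class name is hypothetical.

```java
// Plain-Java model of ascending-timestamp watermarking (no Flink dependency).
// Assumption: the watermark trails the highest timestamp seen by 1 ms, and a
// decreasing timestamp is counted as a violation without moving the watermark back.
class AscendingWatermarkModel {
    private long currentTimestamp = Long.MIN_VALUE;
    private int violations = 0;

    // Records an element's timestamp and returns it unchanged.
    long onElement(long timestamp) {
        if (timestamp >= currentTimestamp) {
            currentTimestamp = timestamp;
        } else {
            violations++; // out-of-order element: the ascending assumption is broken
        }
        return timestamp;
    }

    // Long.MIN_VALUE until the first element arrives, then highest timestamp - 1.
    long currentWatermark() {
        return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
    }

    int violations() {
        return violations;
    }
}
```

With the sample timestamps 1000, 2000, 3000, this model advances the watermark to 999, 1999, 2999. A stream that can deliver events out of order needs a bounded out-of-orderness strategy instead.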

Key-Based Join Logic

// Define join keys
KeySelector<Tuple3<String, String, Long>, String> keySelector = 
    new KeySelector<Tuple3<String, String, Long>, String>() {
        @Override
        public String getKey(Tuple3<String, String, Long> value) throws Exception {
            return value.f0; // Join on first field (ID)
        }
    };

// Apply join with key selectors
orangeStream.join(greenStream)
    .where(keySelector)
    .equalTo(keySelector)
    .window(TumblingEventTimeWindows.of(Time.seconds(windowSizeSeconds)))
    .apply(joinFunction);

Data Structures

Input Data Format

// Input tuples: (id, value, timestamp)
Tuple3<String, String, Long> inputElement;
// f0: Join key (ID)
// f1: Data value
// f2: Event timestamp (milliseconds)

Join Result Format

// Join result: (id, leftValue, rightValue, joinType, leftTimestamp, rightTimestamp)
Tuple6<String, String, String, String, Long, Long> joinResult;
// f0: Join key
// f1: Left stream value
// f2: Right stream value  
// f3: Join label set by the JoinFunction (e.g. "JOINED" or "INNER_JOIN")
// f4: Left stream timestamp
// f5: Right stream timestamp

Sample Data Sources

Orange Stream Data

public class WindowJoinSampleData {
    public static final String[] ORANGE_DATA = {
        "orange-1,orange-data-1,1000",
        "orange-2,orange-data-2,2000", 
        "orange-3,orange-data-3,3000",
        // More sample data...
    };
}
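The sample entries are plain "id,value,timestamp" strings, while the streams above carry `Tuple3<String, String, Long>` values. Each line therefore has to be parsed first. In a Flink job this would typically happen inside a `MapFunction<String, Tuple3<String, String, Long>>` applied before timestamps are assigned. Below is a plain-Java sketch of the parsing step. `ParsedRecord` is a hypothetical stand-in for `Tuple3`.

```java
// Parses sample lines of the form "id,value,timestampMillis" into their three fields.
// ParsedRecord is a hypothetical stand-in for Tuple3<String, String, Long>.
class SampleLineParser {

    static final class ParsedRecord {
        final String id;        // f0: join key
        final String value;     // f1: data value
        final long timestamp;   // f2: event timestamp in milliseconds

        ParsedRecord(String id, String value, long timestamp) {
            this.id = id;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    static ParsedRecord parse(String line) {
        // limit -1 keeps trailing empty fields, so a line like "a,b,1000," is rejected
        String[] parts = line.split(",", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected id,value,timestamp but got: " + line);
        }
        // Long.parseLong throws NumberFormatException (an IllegalArgumentException) on bad input
        return new ParsedRecord(parts[0].trim(), parts[1].trim(), Long.parseLong(parts[2].trim()));
    }
}
```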

Throttled Data Generation

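// Use ThrottledIterator for controlled data emission. It wraps an existing iterator
// (orangeData: a placeholder serializable Iterator<Tuple3<String, String, Long>>) and is
// passed to fromCollection(), because it is an Iterator rather than a SourceFunction.
DataStream<Tuple3<String, String, Long>> orangeStream = env
    .fromCollection(
        new ThrottledIterator<>(
            orangeData,
            elementsPerSecond), // Target emission rate (elements per second)
        TypeInformation.of(new TypeHint<Tuple3<String, String, Long>>() {}))
    .assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());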
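ThrottledIterator slows a fast iterator down to a target rate so the example behaves like a live stream. The following is a hypothetical, simplified stand-in, not a Flink class; Flink's actual pacing strategy may differ. `SimpleThrottledIterator` spaces elements at least 1/elementsPerSecond seconds apart by sleeping in `next()`.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical, simplified stand-in for a throttling iterator (not Flink's ThrottledIterator).
// It spaces elements at least 1 / elementsPerSecond seconds apart by sleeping in next().
class SimpleThrottledIterator<T> implements Iterator<T> {
    private final Iterator<T> source;
    private final long intervalNanos;
    private long nextEmitNanos;

    SimpleThrottledIterator(Iterator<T> source, long elementsPerSecond) {
        if (elementsPerSecond <= 0) {
            throw new IllegalArgumentException("elementsPerSecond must be positive");
        }
        this.source = source;
        this.intervalNanos = 1_000_000_000L / elementsPerSecond;
        this.nextEmitNanos = System.nanoTime(); // the first element may be emitted immediately
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    @Override
    public T next() {
        if (!source.hasNext()) {
            throw new NoSuchElementException();
        }
        long waitNanos = nextEmitNanos - System.nanoTime();
        if (waitNanos > 0) {
            try {
                Thread.sleep(waitNanos / 1_000_000L, (int) (waitNanos % 1_000_000L));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag and stop waiting
            }
        }
        // Schedule the next slot; if the consumer fell behind, restart pacing from "now"
        // instead of emitting a burst to catch up.
        nextEmitNanos = Math.max(nextEmitNanos, System.nanoTime()) + intervalNanos;
        return source.next();
    }
}
```

Sleeping once per element is the simplest pacing scheme. At high rates (thousands of elements per second), per-element sleeps become inaccurate, so a production implementation might pace elements in small batches instead.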

Window Configuration

Tumbling Event Time Windows

// Fixed-size non-overlapping windows based on event time
.window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
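Each element belongs to exactly one tumbling window. Below is a plain-Java sketch of that assignment. It assumes windows aligned to the epoch (offset 0) and non-negative millisecond timestamps. The class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Assigns timestamps to epoch-aligned tumbling windows [start, start + windowSize).
class TumblingWindowMath {

    // Start of the window containing a non-negative timestamp.
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    // Groups timestamps by window start, ordered by start.
    static Map<Long, List<Long>> assign(List<Long> timestamps, long windowSize) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestamps) {
            windows.computeIfAbsent(windowStart(ts, windowSize), k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }
}
```

With a 2000 ms window, 1000 and 1999 land in [0, 2000), while 2000 and 3500 land in [2000, 4000). An element stamped exactly at a window's end opens the next window, because window ends are exclusive.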

Sliding Event Time Windows

// Overlapping windows with slide interval
.window(SlidingEventTimeWindows.of(
    Time.milliseconds(windowSize),    // Window size
    Time.milliseconds(slideInterval)  // Slide interval
))
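With sliding windows, one element can belong to several windows at once. Below is a plain-Java sketch that lists them. It assumes epoch-aligned windows and non-negative timestamps. The class name is illustrative, and windows are represented as `{start, end}` pairs.

```java
import java.util.ArrayList;
import java.util.List;

// Lists every sliding window [start, start + size) that contains a timestamp,
// assuming epoch-aligned windows and non-negative timestamps. When size is a
// multiple of slide, each element falls into size / slide windows.
class SlidingWindowMath {
    static List<long[]> windowsFor(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start at or before the timestamp.
        long lastStart = timestamp - (timestamp % slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

For a timestamp of 1500 ms with size 2000 ms and slide 500 ms, this yields four windows, starting at 1500, 1000, 500 and 0. A matching pair that shares several windows is therefore emitted once per shared window in a sliding-window join, so downstream logic may need to deduplicate.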

Session Windows for Joins

// Variable-size windows based on activity gaps
.window(EventTimeSessionWindows.withGap(Time.minutes(sessionGapMinutes)))
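Session windows have no fixed size. Each element opens a window of length `gap`, and overlapping windows merge. A join windows the union of both inputs per key, so a session is built from the elements of both streams together. Below is a plain-Java sketch of the merging step. Treating elements exactly `gap` apart as one session is an assumption of this sketch, and the class name is illustrative.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Groups event timestamps into sessions. Each element opens a window [ts, ts + gap);
// windows that overlap are merged. Treating elements exactly `gap` apart as one session
// is an assumption of this sketch.
class SessionWindowMath {
    static List<long[]> sessions(List<Long> timestamps, long gap) {
        List<Long> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted);
        List<long[]> result = new ArrayList<>();
        long start = 0;
        long end = 0;
        boolean open = false;
        for (long ts : sorted) {
            if (open && ts <= end) {
                end = ts + gap; // within the gap: extend the current session
            } else {
                if (open) {
                    result.add(new long[] {start, end}); // gap exceeded: close the session
                }
                start = ts;
                end = ts + gap;
                open = true;
            }
        }
        if (open) {
            result.add(new long[] {start, end});
        }
        return result;
    }
}
```

With a 5000 ms gap, timestamps 0, 3000, 5000 and 20000 form two sessions: [0, 10000) and [20000, 25000).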

Join Function Implementations

Basic Join Function

private static class BasicJoinFunction 
    implements JoinFunction<Tuple3<String, String, Long>, 
                           Tuple3<String, String, Long>, 
                           Tuple6<String, String, String, String, Long, Long>> {
    
    @Override
    public Tuple6<String, String, String, String, Long, Long> join(
        Tuple3<String, String, Long> left, 
        Tuple3<String, String, Long> right) throws Exception {
        
        return new Tuple6<>(
            left.f0,           // Join key
            left.f1,           // Left value
            right.f1,          // Right value
            "INNER_JOIN",      // Join type
            left.f2,           // Left timestamp
            right.f2           // Right timestamp
        );
    }
}

Rich Join Function with State

private static class StatefulJoinFunction 
    extends RichJoinFunction<Tuple3<String, String, Long>, 
                            Tuple3<String, String, Long>, 
                            Tuple6<String, String, String, String, Long, Long>> {
    
    // Keyed state: the runtime keeps one counter per join key, across all windows
    private ValueState<Long> joinCountState;
    
    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>(
            "joinCount", Long.class, 0L);
        joinCountState = getRuntimeContext().getState(descriptor);
    }
    
    @Override
    public Tuple6<String, String, String, String, Long, Long> join(
        Tuple3<String, String, Long> left, 
        Tuple3<String, String, Long> right) throws Exception {
        
        Long currentCount = joinCountState.value();
        joinCountState.update(currentCount + 1);
        
        return new Tuple6<>(left.f0, left.f1, right.f1, 
                           "JOIN_" + currentCount, left.f2, right.f2);
    }
}

Advanced Join Patterns

CoGroup for Custom Join Logic

// CoGroup allows custom join logic including outer joins
orangeStream.coGroup(greenStream)
    .where(keySelector)
    .equalTo(keySelector)
    .window(TumblingEventTimeWindows.of(Time.seconds(windowSizeSeconds)))
    .apply(new CoGroupFunction<Tuple3<String, String, Long>, 
                              Tuple3<String, String, Long>, 
                              String>() {
        @Override
        public void coGroup(
            Iterable<Tuple3<String, String, Long>> left,
            Iterable<Tuple3<String, String, Long>> right,
            Collector<String> out) throws Exception {
            
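            // Left outer join: pair each left element with every right element, or emit it
            // with null when the right side has no elements for this key and window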
            for (Tuple3<String, String, Long> leftElement : left) {
                boolean hasMatch = false;
                for (Tuple3<String, String, Long> rightElement : right) {
                    out.collect("INNER: " + leftElement + " + " + rightElement);
                    hasMatch = true;
                }
                if (!hasMatch) {
                    out.collect("LEFT_OUTER: " + leftElement + " + null");
                }
            }
        }
    });
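Within one `coGroup()` call, every element shares the same key and window, so each left element matches every right element. An outer row can therefore only occur when one side is empty. The sketch below applies that observation in plain Java to produce a full outer join; the CoGroupFunction above only emits the left-outer case. Strings stand in for the Tuple3 elements, and the `RIGHT_OUTER` label is an illustrative addition.

```java
import java.util.ArrayList;
import java.util.List;

// Per key-and-window full outer join, mirroring the inputs a CoGroupFunction receives.
// Because all elements in one call share key and window, outer rows appear only
// when one side is empty.
class OuterJoinModel {
    static List<String> fullOuter(List<String> left, List<String> right) {
        List<String> out = new ArrayList<>();
        for (String l : left) {
            if (right.isEmpty()) {
                out.add("LEFT_OUTER: " + l + " + null");
            } else {
                for (String r : right) {
                    out.add("INNER: " + l + " + " + r);
                }
            }
        }
        if (left.isEmpty()) {
            for (String r : right) {
                out.add("RIGHT_OUTER: null + " + r);
            }
        }
        return out;
    }
}
```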

Interval Join

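// Join elements within time intervals.
// Requires Flink 1.6 or later: intervalJoin() and ProcessJoinFunction do not exist in 1.3.3.
// Both inputs must be keyed and use event time.
orangeStream.keyBy(keySelector)
    .intervalJoin(greenStream.keyBy(keySelector))
    .between(Time.milliseconds(-100), Time.milliseconds(100)) // match if green.ts is within [orange.ts - 100 ms, orange.ts + 100 ms]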
    .process(new ProcessJoinFunction<Tuple3<String, String, Long>, 
                                    Tuple3<String, String, Long>, 
                                    String>() {
        @Override
        public void processElement(
            Tuple3<String, String, Long> left,
            Tuple3<String, String, Long> right,
            Context ctx,
            Collector<String> out) throws Exception {
            out.collect("INTERVAL_JOIN: " + left + " + " + right);
        }
    });
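An interval join has no fixed window boundaries; instead, each left element defines its own time range relative to its timestamp. Below is a plain-Java sketch of the matching rule. It assumes inclusive bounds, which is Flink's default for `between()`; `lowerBoundExclusive()` and `upperBoundExclusive()` change that. The `Event` class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of an interval join: left and right match when keys are equal and
// left.ts + lowerBound <= right.ts <= left.ts + upperBound (both bounds inclusive).
class IntervalJoinModel {

    // Hypothetical (key, timestamp) element.
    static final class Event {
        final String key;
        final long timestamp;

        Event(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    static List<String> join(List<Event> left, List<Event> right, long lowerBound, long upperBound) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean inInterval = r.timestamp >= l.timestamp + lowerBound
                        && r.timestamp <= l.timestamp + upperBound;
                if (l.key.equals(r.key) && inInterval) {
                    out.add(l.key + ":" + l.timestamp + "-" + r.timestamp);
                }
            }
        }
        return out;
    }
}
```

Two elements 1 ms apart that straddle a tumbling-window boundary would not be joined by a window join. They are joined here as long as they fall within the bounds.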

Event Time and Watermarks

Watermark Strategy

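// Assign watermarks with bounded out-of-orderness. WatermarkStrategy requires Flink 1.11+;
// on 1.3.x the equivalent is a BoundedOutOfOrdernessTimestampExtractor subclass.
// assignTimestampsAndWatermarks() returns a new stream, so keep the result.
DataStream<Tuple3<String, String, Long>> orangeWithWatermarks = orangeStream.assignTimestampsAndWatermarks(
    WatermarkStrategy.<Tuple3<String, String, Long>>forBoundedOutOfOrderness(
        Duration.ofMillis(100)) // tolerate events up to 100 ms out of order
    .withTimestampAssigner((element, timestamp) -> element.f2)
);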
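Bounded out-of-orderness trades latency for completeness. The watermark trails the highest timestamp seen by the configured bound, so events that arrive up to that much late are still on time. Below is a plain-Java sketch. It assumes the convention watermark = highest timestamp - bound - 1 ms, and the class name is illustrative.

```java
// Plain-Java model of bounded out-of-orderness watermarking (no Flink dependency).
// Assumption: watermark = highest timestamp seen - maxOutOfOrderness - 1.
class BoundedOutOfOrdernessModel {
    private final long maxOutOfOrderness;
    private long maxTimestamp;

    BoundedOutOfOrdernessModel(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Chosen so the initial watermark is exactly Long.MIN_VALUE (no underflow).
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;
    }

    // Out-of-order events never move the watermark backwards.
    void onEvent(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    long currentWatermark() {
        return maxTimestamp - maxOutOfOrderness - 1;
    }
}
```

With a 100 ms bound, an event at 1000 moves the watermark to 899. A later event stamped 950 is out of order, but it is not late, because 950 is still ahead of the watermark.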

Late Data Handling

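// Configure late data handling on a keyed WindowedStream, i.e. stream.keyBy(...).window(...).
// In Flink 1.3.x the join()/coGroup() window builders do not expose these two methods.
// lateDataTag is an OutputTag; late elements are read back via getSideOutput(lateDataTag)
// on the stream returned by the subsequent window function.
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.allowedLateness(Time.seconds(2)) // Keep window state 2 s past the window end for late elements
.sideOutputLateData(lateDataTag)  // Route elements arriving after that to a side output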
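The interplay of watermark, window end and allowed lateness reduces to a small decision function. The sketch below is a simplified plain-Java model. It assumes a window [start, end) fires when the watermark reaches end - 1 and keeps its state until the watermark reaches end - 1 + allowedLateness. The enum names are illustrative.

```java
// Simplified model of how an element for window [start, windowEnd) is treated,
// given the current watermark and the allowed lateness (all in milliseconds).
class LatenessModel {

    enum Outcome { ON_TIME, LATE_FIRING, TOO_LATE }

    static Outcome classify(long windowEnd, long allowedLateness, long watermark) {
        long maxTimestamp = windowEnd - 1; // last timestamp that belongs to the window
        if (maxTimestamp + allowedLateness <= watermark) {
            return Outcome.TOO_LATE;       // state is gone: dropped, or sent to the side output
        }
        if (maxTimestamp <= watermark) {
            return Outcome.LATE_FIRING;    // window already fired; it fires again with this element
        }
        return Outcome.ON_TIME;            // window has not fired yet
    }
}
```

For the 10 s window [0, 10000) with 2 s of allowed lateness, an element arriving at watermark 5000 is on time. At watermark 10500 it triggers a late firing. From watermark 11999 onward it is too late.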

Dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.3.3</version>
</dependency>

Required Imports

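import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.examples.utils.ThrottledIterator;
import org.apache.flink.util.Collector;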

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-examples-streaming-2-10
