
tessl/maven-org-apache-flink--flink-examples-streaming-2-10

Apache Flink streaming examples demonstrating various stream processing patterns and use cases



Windowing Examples

Advanced windowing patterns including time windows, session windows, and custom triggers with event-time processing. Demonstrates various window types, eviction policies, and complex triggering mechanisms.

Capabilities

TopSpeedWindowing

Car speed monitoring with custom triggers and evictors, demonstrating global windows with delta triggers.

/**
 * Grouped stream windowing with custom eviction and trigger policies
 * Monitors car speeds and triggers top speed calculation every x meters
 * @param args Command line arguments (--input path, --output path)
 */
public class TopSpeedWindowing {
    public static void main(String[] args) throws Exception;
}

Usage Example:

# Run with sample car data generator
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.windowing.TopSpeedWindowing

# Run with file input
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.windowing.TopSpeedWindowing \
  --input /path/to/car-data.txt --output /path/to/results.txt

SessionWindowing

Session-based windowing for user activity analysis with configurable session gaps.

/**
 * Session windowing example for analyzing user activity sessions
 * Groups events into sessions based on activity gaps
 * @param args Command line arguments (--input path, --output path)
 */
public class SessionWindowing {
    public static void main(String[] args) throws Exception;
}

WindowWordCount

Basic windowed word count with tumbling time windows.

/**
 * Word count with tumbling time windows
 * Demonstrates basic time-based windowing concepts
 * @param args Command line arguments (--input path, --output path)
 */
public class WindowWordCount {
    public static void main(String[] args) throws Exception;
}

GroupedProcessingTimeWindowExample

High-throughput processing time windows with sliding window patterns and parallel data generation.

/**
 * Processing time windows with grouped keys and sliding windows
 * Performance benchmark with 20M elements across 10K keys
 * @param args Command line arguments
 */
public class GroupedProcessingTimeWindowExample {
    public static void main(String[] args) throws Exception;
}

Usage Example:

# Run high-throughput windowing benchmark
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.windowing.GroupedProcessingTimeWindowExample

High-Throughput Data Source

Custom parallel source function for performance testing.

/**
 * High-throughput parallel source generating tuple data
 * Generates 20,000,000 elements across multiple parallel instances
 */
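// Shown as an anonymous subclass (e.g. passed to env.addSource(...)); a class
// cannot be declared with a concrete type such as Tuple2<Long, Long> as its type parameter
new RichParallelSourceFunction<Tuple2<Long, Long>>() {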
    private volatile boolean running = true;
    
    /**
     * Main data generation loop
     * @param ctx Source context for element emission
     */
    public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception;
    
    /**
     * Cancel source execution
     */
    public void cancel();
}
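
The "20M elements across 10K keys" pattern can be pictured as a loop that cycles through key values. The following Flink-free sketch assumes that keys wrap around from 1 to `numKeys` and that every element carries the value 1; the class and method names are hypothetical, and the numbers are scaled down so it runs instantly.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, dependency-free sketch of a key-cycling generator loop.
final class KeyCyclingSketch {
    /** Emits numElements (key, 1) pairs, cycling keys 1..numKeys, and returns the count per key. */
    public static Map<Long, Long> countsPerKey(long numElements, long numKeys) {
        Map<Long, Long> counts = new TreeMap<>();
        long key = 1L;
        for (long emitted = 0; emitted < numElements; emitted++) {
            // Stands in for ctx.collect(new Tuple2<>(key, 1L)) in a real source
            counts.merge(key, 1L, Long::sum);
            key++;
            if (key > numKeys) {
                key = 1L; // wrap around (assumed behaviour)
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countsPerKey(10, 3)); // prints {1=4, 2=3, 3=3}
    }
}
```

With 20,000,000 elements and 10,000 keys, the same loop would give each key 2,000 elements in total.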

Key Selector Function

Extracts keys from tuple types for grouping operations.

/**
 * Generic key extractor for tuple types
 * @param <Type> Tuple type extending Tuple
 * @param <Key> Key type for grouping
 */
public static class FirstFieldKeyExtractor<Type extends Tuple, Key> 
    implements KeySelector<Type, Key> {
    
    /**
     * Extract key from tuple first field
     * @param value Input tuple
     * @return Key for grouping (first field of tuple)
     */
    public Key getKey(Type value);
}

Window Functions

SummingReducer

Pre-aggregating reduce function for efficient windowing.

/**
 * Efficient reduce function for summing tuple values
 * Pre-aggregates values within window before output
 */
public static class SummingReducer implements ReduceFunction<Tuple2<Long, Long>> {
    /**
     * Combine two tuples by summing their second field
     * @param value1 First tuple
     * @param value2 Second tuple
     * @return Combined tuple with summed second field
     */
    public Tuple2<Long, Long> reduce(Tuple2<Long, Long> value1, Tuple2<Long, Long> value2);
}

SummingWindowFunction

Non-pre-aggregating window function for custom aggregation logic.

/**
 * Window function that processes all elements at window trigger time
 * Demonstrates non-pre-aggregating pattern vs reduce function
 */
public static class SummingWindowFunction 
    implements WindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, Window> {
    
    /**
     * Process all window elements when window triggers
     * @param key Window key
     * @param window Window metadata
     * @param values All elements in window
     * @param out Output collector
     */
    public void apply(Long key, Window window, 
                     Iterable<Tuple2<Long, Long>> values, 
                     Collector<Tuple2<Long, Long>> out);
}
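
The difference between the two window functions above is mostly about what has to be kept while the window is open. The sketch below is plain Java without Flink; `PreAggregationSketch` and its methods are hypothetical names, and `long[]{key, value}` stands in for `Tuple2<Long, Long>`. It shows that both styles produce the same sum, while the pre-aggregating style only ever holds one running value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, dependency-free comparison of the two aggregation styles.
final class PreAggregationSketch {
    /** Same combining rule as a summing reducer: keep the key, add the second field. */
    public static long[] reduce(long[] a, long[] b) {
        return new long[] {a[0], a[1] + b[1]};
    }

    /** Pre-aggregating style: fold each arriving element into a single running value. */
    public static long[] incremental(List<long[]> arrivals) {
        long[] state = null; // the only thing kept per window
        for (long[] element : arrivals) {
            state = (state == null) ? element : reduce(state, element);
        }
        return state;
    }

    /** Non-pre-aggregating style: buffer every element, then sum when the window fires. */
    public static long[] buffered(long key, List<long[]> arrivals) {
        List<long[]> buffer = new ArrayList<>(arrivals); // grows with the window contents
        long sum = 0L;
        for (long[] element : buffer) {
            sum += element[1];
        }
        return new long[] {key, sum};
    }

    public static void main(String[] args) {
        List<long[]> window = Arrays.asList(
            new long[] {7, 1}, new long[] {7, 1}, new long[] {7, 1});
        System.out.println(Arrays.toString(incremental(window)));  // prints [7, 3]
        System.out.println(Arrays.toString(buffered(7L, window))); // prints [7, 3]
    }
}
```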

TopSpeedWindowing (Scala)

Scala implementation of car speed windowing using functional API and case classes.

/**
 * Scala version of car top speed windowing
 * @param args Command line arguments
 */
object TopSpeedWindowing {
    def main(args: Array[String]): Unit;
}

Key Window Patterns

Global Windows with Custom Triggers

// Global windows with time evictor and delta trigger
DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData
    .assignTimestampsAndWatermarks(new CarTimestamp())
    .keyBy(0)  // Group by car ID
    .window(GlobalWindows.create())
    .evictor(TimeEvictor.of(Time.of(evictionSec, TimeUnit.SECONDS)))
    .trigger(DeltaTrigger.of(triggerMeters, 
        new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
            @Override
            public double getDelta(
                Tuple4<Integer, Integer, Double, Long> oldDataPoint,
                Tuple4<Integer, Integer, Double, Long> newDataPoint) {
                return newDataPoint.f2 - oldDataPoint.f2; // Distance delta
            }
        }, carData.getType().createSerializer(env.getConfig())))
    .maxBy(1); // Get max speed
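
To make the delta trigger in the pipeline above more concrete, here is a Flink-free sketch of the firing rule as I understand it: the trigger remembers the last data point at which it fired (or the first one seen) and fires again once the delta to the current point exceeds the threshold. `DeltaTriggerSketch` and `fireIndices` are hypothetical names, and only the trigger is modelled, not the evictor or `maxBy`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free model of a delta-based firing rule.
final class DeltaTriggerSketch {
    /** Returns the indices of the readings at which the trigger would fire. */
    public static List<Integer> fireIndices(double[] distances, double thresholdMeters) {
        List<Integer> fired = new ArrayList<>();
        if (distances.length == 0) {
            return fired;
        }
        double reference = distances[0]; // first reading becomes the reference point
        for (int i = 1; i < distances.length; i++) {
            // Same delta as in the DeltaFunction above: new distance minus old distance
            if (distances[i] - reference > thresholdMeters) {
                reference = distances[i]; // firing resets the reference point
                fired.add(i);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        double[] distances = {0, 20, 45, 60, 110, 130};
        System.out.println(fireIndices(distances, 50)); // prints [3, 5]
    }
}
```

Note that the reading at 110 m does not fire in this sketch, because it is exactly 50 m past the previous firing point and the comparison is strict.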

Session Windows

// Session windows with configurable gap
dataStream
    .keyBy(keySelector)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .apply(sessionWindowFunction);
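
How a gap turns a stream of events into sessions can be sketched without Flink. The example below assumes timestamps for a single key arrive in ascending order and starts a new session whenever the distance to the previous event exceeds the gap; the exact boundary handling is an assumption, and `SessionGapSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free model of gap-based session grouping for one key.
final class SessionGapSketch {
    /** Splits ascending timestamps into sessions separated by more than gapMillis. */
    public static List<List<Long>> sessions(long[] sortedTimestamps, long gapMillis) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            boolean gapExceeded = !current.isEmpty()
                && ts - current.get(current.size() - 1) > gapMillis;
            if (gapExceeded) {
                result.add(current);          // close the previous session
                current = new ArrayList<>();  // start a new one
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] events = {1000, 2000, 2500, 9000, 9500};
        System.out.println(sessions(events, 3000)); // prints [[1000, 2000, 2500], [9000, 9500]]
    }
}
```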

Sliding Time Windows

// Sliding windows with size and slide interval (processing time)
dataStream
    .keyBy(keySelector)
    .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))  // 2.5s window, 0.5s slide
    .reduce(new SummingReducer());

// Alternative with apply function (non-pre-aggregating)
dataStream
    .keyBy(keySelector)  
    .timeWindow(Time.milliseconds(2500), Time.milliseconds(500))
    .apply(new SummingWindowFunction());
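
With a 2.5 s window and a 0.5 s slide, every element belongs to five overlapping windows. The sketch below computes those windows in plain Java, assuming windows are aligned to multiples of the slide with no offset; `SlidingWindowSketch` and `windowsFor` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free model of sliding-window assignment.
final class SlidingWindowSketch {
    /** Returns every [start, end) window of the given size and slide that contains the timestamp. */
    public static List<long[]> windowsFor(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start at or before the timestamp, aligned to the slide
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        for (long[] w : windowsFor(1700, 2500, 500)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        // prints [1500, 4000), [1000, 3500), [500, 3000), [0, 2500), [-500, 2000), one per line
    }
}
```

This overlap is why pre-aggregation matters for sliding windows: each element contributes to size / slide windows.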

Tumbling Time Windows

// Fixed-size tumbling windows
dataStream
    .keyBy(keySelector)
    .timeWindow(Time.seconds(30))
    .reduce(aggregationFunction);
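
Tumbling windows assign each element to exactly one window. As a rough illustration in plain Java (no Flink; `TumblingWindowSketch` is a hypothetical name and windows are assumed to be aligned to multiples of the window size), the following counts elements per 30-second window:

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, dependency-free model of tumbling-window assignment.
final class TumblingWindowSketch {
    /** Start of the single window of the given size that contains the timestamp. */
    public static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    /** Counts elements per window, keyed by window start. */
    public static Map<Long, Integer> countPerWindow(long[] timestamps, long size) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestamps) {
            counts.merge(windowStart(ts, size), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] events = {1_000, 29_999, 30_000, 61_000, 75_000};
        System.out.println(countPerWindow(events, 30_000)); // prints {0=2, 30000=1, 60000=2}
    }
}
```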

Event Time Processing

// Enable event time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Assign timestamps and watermarks
dataStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<T>() {
    @Override
    public long extractAscendingTimestamp(T element) {
        return element.getTimestamp();
    }
});
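
Why ascending timestamps are enough to drive event-time windows can be sketched as follows. The model assumes the watermark trails the highest timestamp seen by one millisecond and that a window `[start, end)` may fire once the watermark reaches `end - 1`; this reflects my understanding of the behaviour rather than a copy of Flink's code, and `AscendingWatermarkSketch` is a hypothetical name.

```java
// Hypothetical, dependency-free model of watermark progress with ascending timestamps.
final class AscendingWatermarkSketch {
    /** Watermark after the given timestamps: one millisecond behind the highest one seen. */
    public static long watermarkAfter(long[] timestamps) {
        long latest = Long.MIN_VALUE;
        for (long ts : timestamps) {
            latest = Math.max(latest, ts);
        }
        return latest == Long.MIN_VALUE ? Long.MIN_VALUE : latest - 1;
    }

    /** A window [start, end) can fire once the watermark has reached its last timestamp. */
    public static boolean canFire(long watermark, long windowEnd) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long before = watermarkAfter(new long[] {1000, 2000, 4999});
        long after = watermarkAfter(new long[] {1000, 2000, 4999, 5000});
        System.out.println(canFire(before, 5000)); // prints false
        System.out.println(canFire(after, 5000));  // prints true
    }
}
```

In this model the window `[0, 5000)` stays open until an element with timestamp 5000 or later arrives, which is what pushes the watermark past the window's last millisecond.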

Window Configuration Options

Time Characteristics

  • Processing Time: System time when elements are processed
  • Event Time: Time when events actually occurred (embedded in data)
  • Ingestion Time: Time when events enter the Flink system

Window Types

  • Tumbling Windows: Fixed-size, non-overlapping windows
  • Sliding Windows: Fixed-size, overlapping windows
  • Session Windows: Variable-size windows based on activity gaps
  • Global Windows: All elements with the same key go into a single window; a custom trigger is required because the window never fires on its own

Triggers

  • Time Triggers: Fire based on processing/event time
  • Count Triggers: Fire after specific number of elements
  • Delta Triggers: Fire based on value changes (like distance traveled)
  • Custom Triggers: User-defined triggering logic

Evictors

  • Time Evictor: Remove elements older than specified time
  • Count Evictor: Keep only most recent N elements
  • Delta Evictor: Remove elements based on value differences
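
The time and count evictors listed above can be illustrated with a small Flink-free sketch. It assumes eviction happens relative to the newest element in the window and treats the boundary as exclusive; `EvictorSketch` and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, dependency-free model of time-based and count-based eviction.
final class EvictorSketch {
    /** Time-based: keep only timestamps newer than (newest - keepMillis). */
    public static List<Long> keepWithin(List<Long> timestamps, long keepMillis) {
        List<Long> kept = new ArrayList<>();
        if (timestamps.isEmpty()) {
            return kept;
        }
        long cutoff = Collections.max(timestamps) - keepMillis;
        for (long ts : timestamps) {
            if (ts > cutoff) {
                kept.add(ts);
            }
        }
        return kept;
    }

    /** Count-based: keep only the most recent maxCount elements. */
    public static <T> List<T> keepLast(List<T> elements, int maxCount) {
        int from = Math.max(0, elements.size() - maxCount);
        return new ArrayList<>(elements.subList(from, elements.size()));
    }

    public static void main(String[] args) {
        System.out.println(keepWithin(Arrays.asList(1000L, 4000L, 9000L, 10000L), 5000)); // prints [9000, 10000]
        System.out.println(keepLast(Arrays.asList("a", "b", "c", "d"), 2));              // prints [c, d]
    }
}
```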

Car Data Processing Example

Data Format

// Car telemetry tuple: (carId, speed, distance, timestamp)
Tuple4<Integer, Integer, Double, Long> carData;
// f0: Car ID
// f1: Current speed (km/h)  
// f2: Total distance traveled (meters)
// f3: Timestamp (milliseconds)

Sample Data Generation

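// Requires: import java.util.Arrays; import java.util.Random;
private static class CarSource implements SourceFunction<Tuple4<Integer, Integer, Double, Long>> {
    private final Integer[] speeds;     // Current speed per car (km/h)
    private final Double[] distances;   // Total distance per car (meters)
    private final Random rand = new Random();
    private volatile boolean isRunning = true;

    private CarSource(int numOfCars) {
        speeds = new Integer[numOfCars];
        distances = new Double[numOfCars];
        Arrays.fill(speeds, 50);        // Assumed starting speed
        Arrays.fill(distances, 0d);
    }

    @Override
    public void run(SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx) throws Exception {
        while (isRunning) {
            Thread.sleep(100); // 100ms intervals
            for (int carId = 0; carId < speeds.length; carId++) {
                // Randomly adjust speed, clamped to the range 0..100 km/h
                if (rand.nextBoolean()) {
                    speeds[carId] = Math.min(100, speeds[carId] + 5);
                } else {
                    speeds[carId] = Math.max(0, speeds[carId] - 5);
                }
                // Update distance traveled
                distances[carId] += speeds[carId] / 3.6d; // Convert km/h to m/s

                Tuple4<Integer, Integer, Double, Long> record = new Tuple4<>(
                    carId, speeds[carId], distances[carId], System.currentTimeMillis());
                ctx.collect(record);
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false; // Ends the loop in run()
    }
}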

Session Window Configuration

Session Gap Configuration

// Configure session gap based on user activity
ProcessingTimeSessionWindows.withGap(Time.minutes(30))  // 30-minute inactivity gap
EventTimeSessionWindows.withGap(Time.seconds(60))       // 1-minute gap for event time

Dynamic Session Gaps

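// Variable session gaps based on data
// Note: withDynamicGap and SessionWindowTimeGapExtractor are not part of Flink 1.3.3;
// dynamic session gaps were added in a later release (Flink 1.5)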
EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<T>() {
    @Override
    public long extract(T element) {
        // Return gap in milliseconds based on element properties
        return element.getUserType().equals("PREMIUM") ? 600000L : 300000L;
    }
});

Dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.3.3</version>
</dependency>

Required Imports

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
import java.util.concurrent.TimeUnit;

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-examples-streaming-2-10
