CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-core

Apache Flink Core runtime components, type system, and foundational APIs for stream processing applications

Pending
Overview
Eval results
Files

docs/event-time-watermarks.md

Event Time and Watermarks

Apache Flink provides sophisticated support for event-time processing, enabling applications to handle out-of-order events and late arrivals through watermarks. This capability is essential for accurate time-based computations in streaming applications.

Event Time Concepts

Timestamp Assignment

Assign timestamps to events for event-time processing.

import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;

// Simple timestamp assigner
public class EventTimestampAssigner implements SerializableTimestampAssigner<Event> {
    /**
     * Uses the timestamp stored on the event as its event time.
     *
     * @param element         the event being assigned a timestamp
     * @param recordTimestamp timestamp previously attached to the record; ignored here
     * @return the value of {@code element.getTimestamp()}
     */
    @Override
    public long extractTimestamp(Event element, long recordTimestamp) {
        final long eventTime = element.getTimestamp();
        return eventTime;
    }
}

// Custom timestamp extraction logic
public class CustomTimestampAssigner implements SerializableTimestampAssigner<LogEntry> {
    /**
     * Derives the event time from the textual timestamp of a log entry.
     *
     * @param element         log entry whose timestamp string is parsed
     * @param recordTimestamp timestamp previously attached to the record; ignored here
     * @return epoch milliseconds of the parsed instant
     */
    @Override
    public long extractTimestamp(LogEntry element, long recordTimestamp) {
        // The string is parsed with Instant.parse, so it must be in the
        // format that method accepts; a malformed value makes it throw.
        final String rawTimestamp = element.getTimestampString();
        final Instant parsed = Instant.parse(rawTimestamp);
        return parsed.toEpochMilli();
    }
}

// Using ingestion time
public class IngestionTimeAssigner implements SerializableTimestampAssigner<Event> {
    /**
     * Stamps each event with the wall-clock time at which this method runs.
     *
     * @param element         the event being assigned a timestamp; not inspected
     * @param recordTimestamp timestamp previously attached to the record; ignored here
     * @return the current system time in epoch milliseconds
     */
    @Override
    public long extractTimestamp(Event element, long recordTimestamp) {
        final long now = System.currentTimeMillis();
        return now;
    }
}

Watermark Strategies

Define how watermarks are generated for handling late events.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import java.time.Duration;

// Monotonic timestamps (events arrive in order)
// In-order input: timestamps are expected to only ever increase
WatermarkStrategy<Event> ascendingStrategy = WatermarkStrategy
    .<Event>forMonotonousTimestamps()
    .withTimestampAssigner((element, previousTimestamp) -> element.getTimestamp());

// Out-of-order input with a fixed 5 second bound
WatermarkStrategy<Event> boundedOutOfOrderStrategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((element, previousTimestamp) -> element.getTimestamp());

// 10 second bound, plus idleness detection after 1 minute
WatermarkStrategy<Event> customStrategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withTimestampAssigner((element, previousTimestamp) -> element.getTimestamp())
    .withIdleness(Duration.ofMinutes(1));

// Strategy backed by a hand-written generator
WatermarkStrategy<Event> generatorStrategy = WatermarkStrategy
    .<Event>forGenerator(generatorContext -> new CustomWatermarkGenerator())
    .withTimestampAssigner((element, previousTimestamp) -> element.getTimestamp());

Watermark Generators

Built-in Generators

import org.apache.flink.api.common.eventtime.AscendingTimestampsWatermarks;
import org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks;

// Ascending timestamps generator
public class AscendingWatermarkExample {
    /**
     * Builds a strategy that plugs {@code AscendingTimestampsWatermarks} in
     * through {@code forGenerator} and reads the timestamp from the event.
     *
     * @return the assembled watermark strategy for {@code Event} streams
     */
    public static WatermarkStrategy<Event> createStrategy() {
        WatermarkStrategy<Event> ascending =
            WatermarkStrategy.<Event>forGenerator(ctx -> new AscendingTimestampsWatermarks<>());
        return ascending.withTimestampAssigner((element, ts) -> element.getTimestamp());
    }
}

// Bounded out-of-orderness generator
public class BoundedOutOfOrderExample {
    /**
     * Builds a strategy that plugs {@code BoundedOutOfOrdernessWatermarks}
     * (5 second bound) in through {@code forGenerator} and reads the
     * timestamp from the event.
     *
     * @return the assembled watermark strategy for {@code Event} streams
     */
    public static WatermarkStrategy<Event> createStrategy() {
        final Duration bound = Duration.ofSeconds(5);
        WatermarkStrategy<Event> bounded =
            WatermarkStrategy.<Event>forGenerator(ctx -> new BoundedOutOfOrdernessWatermarks<>(bound));
        return bounded.withTimestampAssigner((element, ts) -> element.getTimestamp());
    }
}

Custom Watermark Generators

import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.Watermark;

// Periodic watermark generator
public class PeriodicWatermarkGenerator implements WatermarkGenerator<Event> {
    /** Fixed lag, in milliseconds, kept behind the highest timestamp seen (5 seconds). */
    private static final long MAX_OUT_OF_ORDERNESS_MS = 5000;

    /** Highest event timestamp observed so far; {@code Long.MIN_VALUE} until the first event. */
    private long highestTimestamp = Long.MIN_VALUE;

    /** Records the event's timestamp if it is the highest seen; emits nothing. */
    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        if (eventTimestamp > highestTimestamp) {
            highestTimestamp = eventTimestamp;
        }
    }

    /**
     * Emits {@code highestTimestamp - 5000 - 1} on each periodic callback,
     * or nothing when no event has been seen yet.
     */
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (highestTimestamp == Long.MIN_VALUE) {
            return;
        }
        final long watermarkTimestamp = highestTimestamp - MAX_OUT_OF_ORDERNESS_MS - 1;
        output.emitWatermark(new Watermark(watermarkTimestamp));
    }
}

// Punctuated watermark generator
public class PunctuatedWatermarkGenerator implements WatermarkGenerator<Event> {

    /**
     * Emits a watermark at the event's own timestamp, but only for events
     * that carry a watermark marker; all other events emit nothing.
     */
    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        if (!event.hasWatermarkMarker()) {
            return;
        }
        final Watermark punctuation = new Watermark(eventTimestamp);
        output.emitWatermark(punctuation);
    }

    /** Intentionally empty: this generator is driven purely by marker events. */
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // nothing to emit periodically
    }
}

// Adaptive watermark generator
/**
 * Periodic watermark generator whose lag behind the highest seen timestamp
 * grows when events arrive with a large delay and shrinks again when they
 * do not.
 *
 * <p>The delay of an event is measured as wall-clock time minus the event
 * timestamp. The generator never inspects the element itself, so it is
 * generic in the element type and can be created as
 * {@code new AdaptiveWatermarkGenerator<>()} for any stream.
 *
 * @param <T> type of the stream elements
 */
public class AdaptiveWatermarkGenerator<T> implements WatermarkGenerator<T> {
    /** Upper bound for the adaptive delay (30 seconds). */
    private static final long MAX_DELAY_MS = 30000;
    /** Factor by which the delay shrinks per event while events arrive promptly. */
    private static final double DECAY_FACTOR = 0.95;

    private long maxTimestamp = Long.MIN_VALUE;
    private long baseDelayMs = 1000; // 1 second base delay
    private long adaptiveDelayMs = 1000;

    /**
     * Tracks the highest timestamp and adjusts the adaptive delay.
     *
     * @param reading        the element; not inspected
     * @param eventTimestamp timestamp assigned to the element
     * @param output         unused; watermarks are only emitted periodically
     */
    @Override
    public void onEvent(T reading, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);

        long eventDelay = System.currentTimeMillis() - eventTimestamp;

        if (eventDelay > adaptiveDelayMs) {
            // Double the observed delay, capped at MAX_DELAY_MS. Comparing against
            // half the cap first avoids overflowing eventDelay * 2 for very old events.
            adaptiveDelayMs = eventDelay >= MAX_DELAY_MS / 2 ? MAX_DELAY_MS : eventDelay * 2;
        } else {
            // Decay towards the base delay. The product is a double and must be
            // narrowed explicitly before it can be compared with / stored as a long.
            adaptiveDelayMs = Math.max(baseDelayMs, (long) (adaptiveDelayMs * DECAY_FACTOR));
        }
    }

    /**
     * Emits {@code maxTimestamp - adaptiveDelayMs - 1}, or nothing before the
     * first event. The extra {@code - 1} keeps an element that is exactly
     * {@code adaptiveDelayMs} behind the maximum from being treated as late,
     * matching the other bounded generators in this document.
     */
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxTimestamp != Long.MIN_VALUE) {
            output.emitWatermark(new Watermark(maxTimestamp - adaptiveDelayMs - 1));
        }
    }
}

Generator with Idleness Detection

import org.apache.flink.api.common.eventtime.IdlenessTimer;

// Wrapper generator with idleness detection
/**
 * Wraps another generator and marks the output idle when no event has been
 * seen for a configured timeout.
 *
 * <p>Idleness is tracked here with {@link System#nanoTime()} rather than
 * through Flink's internal idleness timer, which is a non-public helper of
 * {@code WatermarksWithIdleness} and not meant to be constructed by user code.
 * For the common case, prefer {@code WatermarkStrategy#withIdleness(Duration)}.
 */
public class IdleAwareWatermarkGenerator implements WatermarkGenerator<Event> {
    private final WatermarkGenerator<Event> delegate;
    /** Length of the inactivity period after which the output is marked idle. */
    private final long idleTimeoutNanos;
    /** {@code System.nanoTime()} reading taken at construction or at the last event. */
    private long lastActivityNanos;
    /** Whether the output is currently marked idle by this wrapper. */
    private boolean idle;

    /**
     * @param delegate    generator that produces the actual watermarks
     * @param idleTimeout inactivity period after which the output is marked idle
     */
    public IdleAwareWatermarkGenerator(WatermarkGenerator<Event> delegate,
                                      Duration idleTimeout) {
        this.delegate = delegate;
        this.idleTimeoutNanos = idleTimeout.toNanos();
        this.lastActivityNanos = System.nanoTime();
    }

    /** Records activity, leaves the idle state if necessary, then delegates. */
    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        lastActivityNanos = System.nanoTime();
        if (idle) {
            idle = false;
            output.markActive();
        }
        delegate.onEvent(event, eventTimestamp, output);
    }

    /**
     * Marks the output idle once the timeout has elapsed without events;
     * otherwise lets the delegate emit its periodic watermark.
     */
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Subtracting nanoTime readings is the overflow-safe way to measure elapsed time.
        if (System.nanoTime() - lastActivityNanos >= idleTimeoutNanos) {
            if (!idle) {
                idle = true;
                output.markIdle();
            }
            // While idle, watermark advancement from this generator is paused.
            return;
        }

        delegate.onPeriodicEmit(output);
    }
}

Advanced Watermark Patterns

Multi-Stream Watermarks

Handle watermarks from multiple input streams.

import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Two-input function that forwards elements from either input as a
 * {@code CombinedEvent}, dropping elements that are already late.
 *
 * <p>An element is late when its timestamp is less than or equal to the
 * operator's current watermark. For a two-input operator that watermark is
 * the combined watermark of both inputs, not a per-input value.
 */
public class MultiStreamWatermarkFunction extends CoProcessFunction<Event1, Event2, CombinedEvent> {

    /** Forwards an on-time element of the first input; late elements are dropped. */
    @Override
    public void processElement1(Event1 event1, Context ctx, Collector<CombinedEvent> out)
            throws Exception {

        long currentWatermark = ctx.timerService().currentWatermark();

        if (!isLate(event1.getTimestamp(), currentWatermark)) {
            out.collect(new CombinedEvent(event1, null));
        }
    }

    /** Forwards an on-time element of the second input; late elements are dropped. */
    @Override
    public void processElement2(Event2 event2, Context ctx, Collector<CombinedEvent> out)
            throws Exception {

        long currentWatermark = ctx.timerService().currentWatermark();

        if (!isLate(event2.getTimestamp(), currentWatermark)) {
            out.collect(new CombinedEvent(null, event2));
        }
    }

    /** An element is late once the watermark has reached or passed its timestamp. */
    private static boolean isLate(long elementTimestamp, long currentWatermark) {
        return elementTimestamp <= currentWatermark;
    }
}

Custom Watermark Alignment

import org.apache.flink.streaming.api.watermark.Watermark;

/**
 * Pass-through function that runs a cleanup hook whenever the watermark has
 * advanced by at least {@code alignmentThreshold} milliseconds since the
 * last cleanup. Every element is forwarded unchanged.
 */
public class WatermarkAlignmentFunction extends ProcessFunction<Event, Event> {
    /** Watermark at the last cleanup; {@code Long.MIN_VALUE} until the first one. */
    private long lastWatermark = Long.MIN_VALUE;
    private final long alignmentThreshold = 1000; // 1 second

    /**
     * Forwards the element and triggers cleanup when the watermark has
     * advanced far enough.
     */
    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out)
            throws Exception {

        long currentWatermark = ctx.timerService().currentWatermark();

        if (hasAdvancedByThreshold(currentWatermark)) {
            performPeriodicCleanup();
            lastWatermark = currentWatermark;
        }

        out.collect(event);
    }

    /**
     * Overflow-safe check for {@code currentWatermark - lastWatermark >= alignmentThreshold}.
     *
     * <p>The plain subtraction wraps around to a negative number while
     * {@code lastWatermark} is still {@code Long.MIN_VALUE}, which would keep
     * the cleanup from ever running for non-negative watermarks.
     */
    private boolean hasAdvancedByThreshold(long currentWatermark) {
        if (currentWatermark <= lastWatermark) {
            return false;
        }
        long advance = currentWatermark - lastWatermark;
        // current > last, so a negative result can only mean the true
        // difference exceeded Long.MAX_VALUE, i.e. a very large advance.
        return advance < 0 || advance >= alignmentThreshold;
    }

    private void performPeriodicCleanup() {
        // Cleanup old state, flush buffers, etc.
    }
}

Working with Late Events

Allowed Lateness

Configure how to handle late events in windowed operations.

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.OutputTag;

/**
 * Counts events per key in 5 minute tumbling event-time windows, accepts
 * elements up to 1 minute late, and routes anything later to a side output
 * where it is logged.
 */
public class LateEventHandling {

    // Output tag for late events; the anonymous subclass ({}) is part of the original idiom
    private static final OutputTag<Event> LATE_EVENTS_TAG =
        new OutputTag<Event>("late-events") {};

    /**
     * Wires the windowed count and the late-event handler onto {@code input}.
     *
     * @param input stream of events; timestamps are read from {@code Event#getTimestamp()}
     */
    public static void handleLateEvents(DataStream<Event> input) {
        SingleOutputStreamOperator<WindowedResult> mainOutput = input
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((event, timestamp) -> event.getTimestamp())
            )
            .keyBy(Event::getKey)
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .allowedLateness(Time.minutes(1)) // Allow 1 minute lateness
            .sideOutputLateData(LATE_EVENTS_TAG) // Collect late events
            // Flink's window-level process function is ProcessWindowFunction<IN, OUT, KEY, W>.
            .process(new ProcessWindowFunction<Event, WindowedResult, String, TimeWindow>() {
                @Override
                public void process(String key, Context context,
                                  Iterable<Event> elements,
                                  Collector<WindowedResult> out) throws Exception {

                    // Process events in window
                    long count = StreamSupport.stream(elements.spliterator(), false).count();
                    out.collect(new WindowedResult(key, count, context.window()));
                }
            });

        // Handle late events separately
        DataStream<Event> lateEvents = mainOutput.getSideOutput(LATE_EVENTS_TAG);
        lateEvents.process(new ProcessFunction<Event, Void>() {
            @Override
            public void processElement(Event event, Context ctx, Collector<Void> out)
                    throws Exception {
                // Lateness is how far the watermark has moved past the event.
                // ctx.timestamp() is the element's own timestamp, so subtracting
                // event.getTimestamp() from it would always report 0.
                long lateness = ctx.timerService().currentWatermark() - event.getTimestamp();

                // Log, store, or reprocess late events
                System.out.println("Late event: " + event +
                    " arrived " + lateness + "ms late");
            }
        });
    }
}

Late Event Reprocessing

public class LateEventReprocessor extends ProcessFunction<Event, ProcessedEvent> {
    /** Largest lateness, in milliseconds, that is still reprocessed (1 minute). */
    private final long maxLateness = 60000;

    /**
     * Classifies each event against the current watermark.
     *
     * <ul>
     *   <li>timestamp above the watermark: emitted as on time, lateness 0;</li>
     *   <li>late by at most {@code maxLateness}: emitted flagged as late;</li>
     *   <li>later than that: passed to {@code handleTooLateEvent}, nothing emitted.</li>
     * </ul>
     */
    @Override
    public void processElement(Event event, Context ctx, Collector<ProcessedEvent> out)
            throws Exception {

        final long watermark = ctx.timerService().currentWatermark();
        final long timestamp = event.getTimestamp();

        // On-time path first, so the late handling below needs no nesting.
        if (timestamp > watermark) {
            out.collect(new ProcessedEvent(event, false, 0));
            return;
        }

        final long behindBy = watermark - timestamp;
        if (behindBy > maxLateness) {
            handleTooLateEvent(event, behindBy, ctx);
            return;
        }

        out.collect(new ProcessedEvent(event, true, behindBy));
    }

    /** Reports an event that exceeded the lateness budget; {@code ctx} is currently unused. */
    private void handleTooLateEvent(Event event, long lateness, Context ctx) {
        // Store in external system, alert, or discard
        System.out.println("Event too late by " + lateness + "ms: " + event);
    }
}

Watermark Monitoring and Debugging

Watermark Metrics

/**
 * Bounded-out-of-orderness generator (5 second bound) that also reports a
 * counter of emissions, a gauge of the current watermark and a histogram of
 * event delay (wall-clock time minus event timestamp).
 *
 * <p>Metrics are optional: if {@link #setMetrics} was never called, the
 * generator still emits watermarks and simply skips the metric updates.
 */
public class WatermarkMetricsGenerator implements WatermarkGenerator<Event> {
    private long maxTimestamp = Long.MIN_VALUE;
    private final long maxOutOfOrderness = 5000;

    // Metrics; null until setMetrics has been called
    private Counter watermarkEmissions;
    private Gauge<Long> currentWatermark;
    private Histogram eventLateness;

    /**
     * Registers the metrics on the given group.
     *
     * @param metricGroup group the three metrics are registered under
     */
    public void setMetrics(MetricGroup metricGroup) {
        this.watermarkEmissions = metricGroup.counter("watermark-emissions");
        this.currentWatermark = metricGroup.gauge("current-watermark", () ->
            currentWatermarkValue());
        // NOTE(review): MetricGroup#histogram is normally called with a Histogram
        // implementation as second argument - confirm against the Flink version in use.
        this.eventLateness = metricGroup.histogram("event-lateness");
    }

    /** Tracks the highest timestamp and records the event's delay if metrics are set. */
    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);

        if (eventLateness != null) {
            long lateness = System.currentTimeMillis() - eventTimestamp;
            eventLateness.update(lateness);
        }
    }

    /** Emits the current watermark, or nothing before the first event. */
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxTimestamp != Long.MIN_VALUE) {
            if (watermarkEmissions != null) {
                watermarkEmissions.inc();
            }
            output.emitWatermark(new Watermark(currentWatermarkValue()));
        }
    }

    /**
     * Single source of truth for the watermark value, shared by the gauge and
     * the emission so the two cannot drift apart. Before the first event it
     * returns {@code Long.MIN_VALUE} instead of subtracting from it, which
     * would wrap around to a large positive number.
     */
    private long currentWatermarkValue() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return maxTimestamp - maxOutOfOrderness - 1;
    }
}

Watermark Debugging

/**
 * Decorator that prints every event, watermark and idle/active transition
 * of a wrapped generator to standard output before passing it on.
 *
 * <p>The element is only printed via {@code %s}, so the debugger works for
 * any element type and can be created as {@code new WatermarkDebugger<>(...)}.
 *
 * @param <T> type of the stream elements
 */
public class WatermarkDebugger<T> implements WatermarkGenerator<T> {
    private final WatermarkGenerator<T> delegate;
    private final String sourceName;

    /**
     * @param delegate   generator whose activity is logged
     * @param sourceName label prefixed to every log line
     */
    public WatermarkDebugger(WatermarkGenerator<T> delegate, String sourceName) {
        this.delegate = delegate;
        this.sourceName = sourceName;
    }

    /** Logs the event with its timestamp and wall-clock delay, then delegates. */
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        System.out.println(String.format(
            "[%s] Event received - event: %s, extracted: %d, delay: %dms",
            sourceName, event, eventTimestamp,
            System.currentTimeMillis() - eventTimestamp
        ));

        delegate.onEvent(event, eventTimestamp, new WatermarkOutputWrapper(output));
    }

    /** Delegates the periodic callback through the logging output wrapper. */
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        delegate.onPeriodicEmit(new WatermarkOutputWrapper(output));
    }

    /** Output wrapper that logs each call before forwarding it to the real output. */
    private class WatermarkOutputWrapper implements WatermarkOutput {
        private final WatermarkOutput delegate;

        public WatermarkOutputWrapper(WatermarkOutput delegate) {
            this.delegate = delegate;
        }

        @Override
        public void emitWatermark(Watermark watermark) {
            // Instant renders the millisecond timestamp in UTC, independent of
            // the JVM's default time zone.
            System.out.println(String.format(
                "[%s] Watermark emitted: %d (%s)",
                sourceName, watermark.getTimestamp(),
                java.time.Instant.ofEpochMilli(watermark.getTimestamp())
            ));
            delegate.emitWatermark(watermark);
        }

        @Override
        public void markIdle() {
            System.out.println(String.format("[%s] Source marked as idle", sourceName));
            delegate.markIdle();
        }

        @Override
        public void markActive() {
            System.out.println(String.format("[%s] Source marked as active", sourceName));
            delegate.markActive();
        }
    }
}

Timestamp and Watermark Utilities

Time Extraction Utilities

public class TimeExtractionUtils {
    
    // Extract from JSON timestamp field
    public static SerializableTimestampAssigner<JsonNode> jsonTimestampExtractor(String timestampField) {
        return (element, recordTimestamp) -> {
            JsonNode timestampNode = element.get(timestampField);
            return timestampNode != null ? timestampNode.asLong() : recordTimestamp;
        };
    }
    
    // Extract from formatted string
    public static SerializableTimestampAssigner<String> formatTimestampExtractor(String pattern) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
        return (element, recordTimestamp) -> {
            try {
                LocalDateTime dateTime = LocalDateTime.parse(element, formatter);
                return dateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
            } catch (Exception e) {
                return recordTimestamp; // Fallback to record timestamp
            }
        };
    }
    
    // Extract from CSV with timestamp column
    public static SerializableTimestampAssigner<String> csvTimestampExtractor(int timestampColumn, 
                                                                             String delimiter) {
        return (element, recordTimestamp) -> {
            String[] fields = element.split(delimiter);
            if (fields.length > timestampColumn) {
                try {
                    return Long.parseLong(fields[timestampColumn]);
                } catch (NumberFormatException e) {
                    return recordTimestamp;
                }
            }
            return recordTimestamp;
        };
    }
}

Watermark Strategy Builders

public class WatermarkStrategyBuilder {

    /**
     * Bounded-out-of-orderness strategy with idleness detection.
     *
     * @param timestampAssigner extracts the event time from each element
     * @param maxOutOfOrderness bound passed to {@code forBoundedOutOfOrderness}
     * @param idlenessTimeout   timeout passed to {@code withIdleness}
     * @return the assembled strategy
     */
    public static <T> WatermarkStrategy<T> createRobustStrategy(
            SerializableTimestampAssigner<T> timestampAssigner,
            Duration maxOutOfOrderness,
            Duration idlenessTimeout) {

        WatermarkStrategy<T> bounded =
            WatermarkStrategy.<T>forBoundedOutOfOrderness(maxOutOfOrderness);
        WatermarkStrategy<T> withTimestamps = bounded.withTimestampAssigner(timestampAssigner);
        return withTimestamps.withIdleness(idlenessTimeout);
    }

    /**
     * Strategy backed by {@code AdaptiveWatermarkGenerator}.
     *
     * @param timestampAssigner extracts the event time from each element
     * @return the assembled strategy
     */
    public static <T> WatermarkStrategy<T> createAdaptiveStrategy(
            SerializableTimestampAssigner<T> timestampAssigner) {

        WatermarkStrategy<T> adaptive =
            WatermarkStrategy.<T>forGenerator(ctx -> new AdaptiveWatermarkGenerator<>());
        return adaptive.withTimestampAssigner(timestampAssigner);
    }

    /**
     * Bounded-out-of-orderness strategy wrapped in a {@code WatermarkDebugger}.
     *
     * @param timestampAssigner extracts the event time from each element
     * @param maxOutOfOrderness bound passed to {@code BoundedOutOfOrdernessWatermarks}
     * @param sourceName        label handed to the debugger
     * @return the assembled strategy
     */
    public static <T> WatermarkStrategy<T> createDebugStrategy(
            SerializableTimestampAssigner<T> timestampAssigner,
            Duration maxOutOfOrderness,
            String sourceName) {

        WatermarkStrategy<T> debugging = WatermarkStrategy.<T>forGenerator(ctx ->
            new WatermarkDebugger<>(
                new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness),
                sourceName));
        return debugging.withTimestampAssigner(timestampAssigner);
    }
}

Best Practices

Choosing Watermark Strategies

public class WatermarkBestPractices {

    /**
     * Preset for mostly ordered streams: 5 second out-of-orderness bound,
     * idle after 1 minute.
     *
     * @param timestampAssigner extracts the event time from each element
     * @return the assembled strategy
     */
    public static <T> WatermarkStrategy<T> forMostlyOrdered(
            SerializableTimestampAssigner<T> timestampAssigner) {

        final Duration bound = Duration.ofSeconds(5);
        final Duration idleAfter = Duration.ofMinutes(1);
        WatermarkStrategy<T> base = WatermarkStrategy.<T>forBoundedOutOfOrderness(bound);
        return base.withTimestampAssigner(timestampAssigner).withIdleness(idleAfter);
    }

    /**
     * Preset for highly out-of-order streams: 5 minute out-of-orderness
     * bound, idle after 2 minutes.
     *
     * @param timestampAssigner extracts the event time from each element
     * @return the assembled strategy
     */
    public static <T> WatermarkStrategy<T> forHighlyDisordered(
            SerializableTimestampAssigner<T> timestampAssigner) {

        final Duration bound = Duration.ofMinutes(5);
        final Duration idleAfter = Duration.ofMinutes(2);
        WatermarkStrategy<T> base = WatermarkStrategy.<T>forBoundedOutOfOrderness(bound);
        return base.withTimestampAssigner(timestampAssigner).withIdleness(idleAfter);
    }

    /**
     * Preset for unpredictable streams: {@code AdaptiveWatermarkGenerator},
     * idle after 5 minutes.
     *
     * @param timestampAssigner extracts the event time from each element
     * @return the assembled strategy
     */
    public static <T> WatermarkStrategy<T> forUnpredictableStreams(
            SerializableTimestampAssigner<T> timestampAssigner) {

        final Duration idleAfter = Duration.ofMinutes(5);
        WatermarkStrategy<T> base =
            WatermarkStrategy.<T>forGenerator(ctx -> new AdaptiveWatermarkGenerator<>());
        return base.withTimestampAssigner(timestampAssigner).withIdleness(idleAfter);
    }
}

// Performance considerations
/**
 * Bounded-out-of-orderness generator that, in addition to the periodic
 * emission, emits from {@code onEvent} when the maximum timestamp advances,
 * but at most once per {@code watermarkInterval} of wall-clock time.
 */
public class PerformanceOptimizedWatermarkGenerator implements WatermarkGenerator<Event> {
    private long maxTimestamp = Long.MIN_VALUE;
    private final long maxOutOfOrderness;
    /** Wall-clock time (epoch ms) of the last emission; 0 before the first one. */
    private long lastWatermarkTime = 0;
    private final long watermarkInterval = 1000; // Emit at most once per second

    /**
     * @param maxOutOfOrderness how far elements may lag behind the highest
     *                          timestamp seen without being considered late
     */
    public PerformanceOptimizedWatermarkGenerator(Duration maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness.toMillis();
    }

    /** Updates the maximum timestamp and emits eagerly if the throttle allows it. */
    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        // Only update if timestamp is actually newer
        if (eventTimestamp > maxTimestamp) {
            maxTimestamp = eventTimestamp;

            long currentTime = System.currentTimeMillis();
            if (currentTime - lastWatermarkTime > watermarkInterval) {
                emit(output, currentTime);
            }
        }
    }

    /** Periodic emission as backup; does nothing before the first event. */
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxTimestamp != Long.MIN_VALUE) {
            emit(output, System.currentTimeMillis());
        }
    }

    /**
     * Emits {@code maxTimestamp - maxOutOfOrderness - 1} and records the emission time.
     *
     * <p>A watermark {@code t} declares that no element with timestamp
     * {@code <= t} is still expected. Without the {@code - 1}, an element lagging
     * by exactly {@code maxOutOfOrderness} would already count as late.
     */
    private void emit(WatermarkOutput output, long emissionTime) {
        output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness - 1));
        lastWatermarkTime = emissionTime;
    }
}

Apache Flink's event-time processing and watermark system provides robust support for handling temporal aspects of streaming data. By understanding these concepts and applying appropriate strategies, you can build applications that accurately process time-based computations even with out-of-order and late-arriving events.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-core

docs

configuration.md

connectors.md

event-time-watermarks.md

execution-jobs.md

functions-and-operators.md

index.md

state-management.md

type-system-serialization.md

utilities.md

tile.json