Apache Flink Core runtime components, type system, and foundational APIs for stream processing applications
—
Apache Flink provides sophisticated support for event-time processing. Watermarks track the progress of event time, which lets applications cope with out-of-order events. Allowed lateness and side outputs determine what happens to events that arrive after the watermark has passed. This capability is essential for accurate time-based computations in streaming applications.
Assign timestamps to events for event-time processing.
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
// Simple timestamp assigner
/**
 * Timestamp assigner that takes the event time directly from {@code Event#getTimestamp()}.
 *
 * <p>The timestamp Flink may already have attached to the record ({@code recordTimestamp}) is
 * ignored.
 */
public class EventTimestampAssigner implements SerializableTimestampAssigner<Event> {
    @Override
    public long extractTimestamp(Event element, long recordTimestamp) {
        final long eventTime = element.getTimestamp();
        return eventTime;
    }
}
// Custom timestamp extraction logic
/**
 * Timestamp assigner for {@code LogEntry} records whose timestamp is an ISO-8601 instant string,
 * e.g. {@code 2024-01-15T10:15:30Z}.
 *
 * <p>A missing or unparseable timestamp falls back to {@code recordTimestamp} instead of throwing.
 * An exception thrown from a timestamp assigner fails the task, so otherwise a single malformed
 * log line could take the job down.
 */
public class CustomTimestampAssigner implements SerializableTimestampAssigner<LogEntry> {
    @Override
    public long extractTimestamp(LogEntry element, long recordTimestamp) {
        String timestampStr = element.getTimestampString();
        if (timestampStr == null) {
            return recordTimestamp;
        }
        try {
            return parseTimestamp(timestampStr);
        } catch (java.time.DateTimeException | ArithmeticException e) {
            // Unparseable text or an instant outside the epoch-millisecond range.
            return recordTimestamp;
        }
    }

    /**
     * Parses an ISO-8601 instant into epoch milliseconds.
     *
     * @param timestampStr instant text such as {@code 2024-01-15T10:15:30Z}
     * @return milliseconds since the epoch
     * @throws java.time.format.DateTimeParseException if the text is not a valid instant
     * @throws ArithmeticException if the instant does not fit into epoch milliseconds
     */
    private long parseTimestamp(String timestampStr) {
        // Fully qualified: no java.time import accompanies this snippet.
        return java.time.Instant.parse(timestampStr).toEpochMilli();
    }
}
// Using ingestion time
/**
 * Assigns the current wall-clock time of the processing task as the event timestamp.
 *
 * <p>This emulates ingestion time through the event-time API. Neither the element nor its
 * previous timestamp is consulted, so results depend on when records are processed.
 */
public class IngestionTimeAssigner implements SerializableTimestampAssigner<Event> {
    @Override
    public long extractTimestamp(Event element, long recordTimestamp) {
        final long now = System.currentTimeMillis();
        return now;
    }
}
Define how watermarks are generated for handling late events.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import java.time.Duration;
// Monotonic timestamps: assumes events arrive in timestamp order
WatermarkStrategy<Event> ascendingStrategy = WatermarkStrategy
        .<Event>forMonotonousTimestamps()
        .withTimestampAssigner((evt, previousTs) -> evt.getTimestamp());
// Bounded out-of-orderness: events may lag the newest event seen by up to 5 seconds
WatermarkStrategy<Event> boundedOutOfOrderStrategy = WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((evt, previousTs) -> evt.getTimestamp());
// 10 second out-of-orderness bound plus idleness detection:
// the input is marked idle after 1 minute without records
WatermarkStrategy<Event> customStrategy = WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withTimestampAssigner((evt, previousTs) -> evt.getTimestamp())
        .withIdleness(Duration.ofMinutes(1));
// Generator-based strategy: the supplier ignores its context and builds a CustomWatermarkGenerator
WatermarkStrategy<Event> generatorStrategy = WatermarkStrategy
        .<Event>forGenerator(ctx -> new CustomWatermarkGenerator())
        .withTimestampAssigner((evt, previousTs) -> evt.getTimestamp());
import org.apache.flink.api.common.eventtime.AscendingTimestampsWatermarks;
import org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks;
// Ascending timestamps generator
/** Builds a strategy that plugs Flink's {@code AscendingTimestampsWatermarks} in explicitly. */
public class AscendingWatermarkExample {
    /**
     * @return a strategy whose generator is {@code AscendingTimestampsWatermarks} and whose
     *     timestamps come from {@code Event#getTimestamp()}
     */
    public static WatermarkStrategy<Event> createStrategy() {
        WatermarkStrategy<Event> strategy =
                WatermarkStrategy.<Event>forGenerator(ctx -> new AscendingTimestampsWatermarks<>());
        return strategy.withTimestampAssigner((evt, previousTs) -> evt.getTimestamp());
    }
}
// Bounded out-of-orderness generator
/** Builds a strategy that plugs Flink's {@code BoundedOutOfOrdernessWatermarks} in explicitly. */
public class BoundedOutOfOrderExample {
    /**
     * @return a strategy tolerating 5 seconds of out-of-orderness, with timestamps taken from
     *     {@code Event#getTimestamp()}
     */
    public static WatermarkStrategy<Event> createStrategy() {
        final Duration maxOutOfOrderness = Duration.ofSeconds(5);
        WatermarkStrategy<Event> strategy = WatermarkStrategy.<Event>forGenerator(
                ctx -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness));
        return strategy.withTimestampAssigner((evt, previousTs) -> evt.getTimestamp());
    }
}
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.Watermark;
// Periodic watermark generator
/**
 * Periodic generator for streams whose events arrive at most a fixed bound out of order.
 *
 * <p>{@link #onEvent} only tracks the largest timestamp seen. {@link #onPeriodicEmit} (called by
 * Flink at the configured auto-watermark interval) emits
 * {@code maxTimestamp - maxOutOfOrderness - 1}. With this value, an event exactly
 * {@code maxOutOfOrderness} behind the newest one is still on time. Nothing is emitted before the
 * first event.
 */
public class PeriodicWatermarkGenerator implements WatermarkGenerator<Event> {
    /** Bound used by the no-argument constructor: 5 seconds, in milliseconds. */
    private static final long DEFAULT_MAX_OUT_OF_ORDERNESS_MS = 5000;

    private long maxTimestamp = Long.MIN_VALUE;
    /** Out-of-orderness bound in milliseconds. */
    private final long maxOutOfOrderness;

    /** Creates a generator that tolerates 5 seconds of out-of-orderness. */
    public PeriodicWatermarkGenerator() {
        this.maxOutOfOrderness = DEFAULT_MAX_OUT_OF_ORDERNESS_MS;
    }

    /**
     * Creates a generator with a custom out-of-orderness bound.
     *
     * @param maxOutOfOrderness maximum lag of an event behind the newest event seen
     * @throws IllegalArgumentException if {@code maxOutOfOrderness} is negative
     */
    public PeriodicWatermarkGenerator(java.time.Duration maxOutOfOrderness) {
        if (maxOutOfOrderness.isNegative()) {
            throw new IllegalArgumentException(
                    "maxOutOfOrderness must not be negative: " + maxOutOfOrderness);
        }
        this.maxOutOfOrderness = maxOutOfOrderness.toMillis();
    }

    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        // Update max timestamp seen so far
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Long.MIN_VALUE means no event yet; emitting would underflow the subtraction below.
        if (maxTimestamp != Long.MIN_VALUE) {
            output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness - 1));
        }
    }
}
// Punctuated watermark generator
/**
 * Punctuated generator: emits a watermark equal to the event's timestamp whenever an event
 * reports {@code hasWatermarkMarker()}. Nothing is emitted periodically.
 */
public class PunctuatedWatermarkGenerator implements WatermarkGenerator<Event> {
    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        if (!event.hasWatermarkMarker()) {
            return; // ordinary event: no watermark
        }
        output.emitWatermark(new Watermark(eventTimestamp));
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Intentionally empty: all watermarks are driven by marker events.
    }
}
// Adaptive watermark generator
public class AdaptiveWatermarkGenerator implements WatermarkGenerator<SensorReading> {
private long maxTimestamp = Long.MIN_VALUE;
private long baseDelayMs = 1000; // 1 second base delay
private long adaptiveDelayMs = 1000;
@Override
public void onEvent(SensorReading reading, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
// Adapt delay based on out-of-orderness observed
long currentTime = System.currentTimeMillis();
long eventDelay = currentTime - eventTimestamp;
if (eventDelay > adaptiveDelayMs) {
// Increase delay if we see more out-of-order events
adaptiveDelayMs = Math.min(eventDelay * 2, 30000); // Max 30 seconds
} else {
// Gradually decrease delay when events are more in order
adaptiveDelayMs = Math.max(baseDelayMs, adaptiveDelayMs * 0.95);
}
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
if (maxTimestamp != Long.MIN_VALUE) {
output.emitWatermark(new Watermark(maxTimestamp - adaptiveDelayMs));
}
}
}import org.apache.flink.api.common.eventtime.IdlenessTimer;
// Wrapper generator with idleness detection
public class IdleAwareWatermarkGenerator implements WatermarkGenerator<Event> {
private final WatermarkGenerator<Event> delegate;
private final IdlenessTimer idlenessTimer;
public IdleAwareWatermarkGenerator(WatermarkGenerator<Event> delegate,
Duration idleTimeout) {
this.delegate = delegate;
this.idlenessTimer = new IdlenessTimer(idleTimeout);
}
@Override
public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
// Mark as active and delegate
idlenessTimer.activity();
delegate.onEvent(event, eventTimestamp, output);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// Check for idleness and emit watermarks
if (idlenessTimer.checkIfIdle(output)) {
// Source is idle, watermark advancement is paused
return;
}
delegate.onPeriodicEmit(output);
}
}Handle watermarks from multiple input streams.
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
/**
 * Forwards events from two inputs as {@code CombinedEvent}s and drops events that are already
 * late.
 *
 * <p>In a two-input operator the current watermark is the minimum of the watermarks of both
 * inputs. Both {@code processElement} methods therefore compare against the same combined
 * watermark.
 */
public class MultiStreamWatermarkFunction extends CoProcessFunction<Event1, Event2, CombinedEvent> {
    @Override
    public void processElement1(Event1 event1, Context ctx, Collector<CombinedEvent> out)
            throws Exception {
        // Combined watermark of both inputs, not just of input 1.
        long watermark = ctx.timerService().currentWatermark();
        if (isOnTime(event1.getTimestamp(), watermark)) {
            out.collect(new CombinedEvent(event1, null));
        }
    }

    @Override
    public void processElement2(Event2 event2, Context ctx, Collector<CombinedEvent> out)
            throws Exception {
        long watermark = ctx.timerService().currentWatermark();
        if (isOnTime(event2.getTimestamp(), watermark)) {
            out.collect(new CombinedEvent(null, event2));
        }
    }

    /**
     * A watermark {@code w} asserts that no more elements with timestamp {@code <= w} are
     * expected. Elements at or below it are late, so only strictly newer ones are on time. (The
     * previous {@code <=} check forwarded only late events and dropped every on-time one.)
     */
    private static boolean isOnTime(long timestamp, long watermark) {
        return timestamp > watermark;
    }
}
import org.apache.flink.streaming.api.watermark.Watermark;
/**
 * Pass-through function that runs a cleanup hook whenever the watermark has advanced by at least
 * {@code alignmentThreshold} milliseconds since the last cleanup.
 *
 * <p>{@code lastWatermark} is a plain field: it is per parallel instance and is not part of
 * checkpointed state.
 */
public class WatermarkAlignmentFunction extends ProcessFunction<Event, Event> {
    /** Threshold used by the no-argument constructor: 1 second. */
    private static final long DEFAULT_ALIGNMENT_THRESHOLD_MS = 1000;

    private long lastWatermark = Long.MIN_VALUE;
    private final long alignmentThreshold;

    /** Creates a function that cleans up after every 1 second of watermark progress. */
    public WatermarkAlignmentFunction() {
        this(DEFAULT_ALIGNMENT_THRESHOLD_MS);
    }

    /**
     * @param alignmentThresholdMillis watermark progress (ms) that triggers a cleanup
     * @throws IllegalArgumentException if the threshold is negative
     */
    public WatermarkAlignmentFunction(long alignmentThresholdMillis) {
        if (alignmentThresholdMillis < 0) {
            throw new IllegalArgumentException(
                    "alignmentThreshold must not be negative: " + alignmentThresholdMillis);
        }
        this.alignmentThreshold = alignmentThresholdMillis;
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out)
            throws Exception {
        long currentWatermark = ctx.timerService().currentWatermark();
        if (hasAdvancedEnough(currentWatermark)) {
            // Watermark has advanced significantly
            performPeriodicCleanup();
            lastWatermark = currentWatermark;
        }
        out.collect(event);
    }

    /**
     * Returns true when {@code currentWatermark >= lastWatermark + alignmentThreshold}.
     *
     * <p>Subtracting {@code currentWatermark - lastWatermark} overflows while
     * {@code lastWatermark} is still {@code Long.MIN_VALUE}, so the cleanup would never fire for
     * positive watermarks. The addition is saturated instead.
     */
    private boolean hasAdvancedEnough(long currentWatermark) {
        if (lastWatermark > Long.MAX_VALUE - alignmentThreshold) {
            return false; // lastWatermark + threshold is beyond any representable watermark
        }
        return currentWatermark >= lastWatermark + alignmentThreshold;
    }

    private void performPeriodicCleanup() {
        // Cleanup old state, flush buffers, etc.
    }
}
Configure how to handle late events in windowed operations.
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.OutputTag;
public class LateEventHandling {
// Output tag for late events
private static final OutputTag<Event> LATE_EVENTS_TAG =
new OutputTag<Event>("late-events") {};
public static void handleLateEvents(DataStream<Event> input) {
SingleOutputStreamOperator<WindowedResult> mainOutput = input
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())
)
.keyBy(Event::getKey)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.allowedLateness(Time.minutes(1)) // Allow 1 minute lateness
.sideOutputLateData(LATE_EVENTS_TAG) // Collect late events
.process(new WindowProcessFunction<Event, WindowedResult, String, TimeWindow>() {
@Override
public void process(String key, Context context,
Iterable<Event> elements,
Collector<WindowedResult> out) throws Exception {
// Process events in window
long count = StreamSupport.stream(elements.spliterator(), false).count();
out.collect(new WindowedResult(key, count, context.window()));
}
});
// Handle late events separately
DataStream<Event> lateEvents = mainOutput.getSideOutput(LATE_EVENTS_TAG);
lateEvents.process(new ProcessFunction<Event, Void>() {
@Override
public void processElement(Event event, Context ctx, Collector<Void> out)
throws Exception {
// Log, store, or reprocess late events
System.out.println("Late event: " + event +
" arrived " + (ctx.timestamp() - event.getTimestamp()) + "ms late");
}
});
}
}public class LateEventReprocessor extends ProcessFunction<Event, ProcessedEvent> {
private final long maxLateness = 60000; // 1 minute
@Override
public void processElement(Event event, Context ctx, Collector<ProcessedEvent> out)
throws Exception {
long currentWatermark = ctx.timerService().currentWatermark();
long eventTime = event.getTimestamp();
if (eventTime <= currentWatermark) {
// Event is late
long lateness = currentWatermark - eventTime;
if (lateness <= maxLateness) {
// Acceptable lateness - reprocess
out.collect(new ProcessedEvent(event, true, lateness));
} else {
// Too late - handle specially
handleTooLateEvent(event, lateness, ctx);
}
} else {
// Event is on time
out.collect(new ProcessedEvent(event, false, 0));
}
}
private void handleTooLateEvent(Event event, long lateness, Context ctx) {
// Store in external system, alert, or discard
System.out.println("Event too late by " + lateness + "ms: " + event);
}
}public class WatermarkMetricsGenerator implements WatermarkGenerator<Event> {
private long maxTimestamp = Long.MIN_VALUE;
private final long maxOutOfOrderness = 5000;
// Metrics
private Counter watermarkEmissions;
private Gauge<Long> currentWatermark;
private Histogram eventLateness;
public void setMetrics(MetricGroup metricGroup) {
this.watermarkEmissions = metricGroup.counter("watermark-emissions");
this.currentWatermark = metricGroup.gauge("current-watermark", () ->
maxTimestamp - maxOutOfOrderness);
this.eventLateness = metricGroup.histogram("event-lateness");
}
@Override
public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
// Track lateness metrics
long currentTime = System.currentTimeMillis();
long lateness = currentTime - eventTimestamp;
eventLateness.update(lateness);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
if (maxTimestamp != Long.MIN_VALUE) {
watermarkEmissions.inc();
output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness - 1));
}
}
}public class WatermarkDebugger implements WatermarkGenerator<Event> {
private final WatermarkGenerator<Event> delegate;
private final String sourceName;
public WatermarkDebugger(WatermarkGenerator<Event> delegate, String sourceName) {
this.delegate = delegate;
this.sourceName = sourceName;
}
@Override
public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
System.out.println(String.format(
"[%s] Event received - timestamp: %d, extracted: %d, delay: %dms",
sourceName, event.getTimestamp(), eventTimestamp,
System.currentTimeMillis() - eventTimestamp
));
delegate.onEvent(event, eventTimestamp, new WatermarkOutputWrapper(output));
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
delegate.onPeriodicEmit(new WatermarkOutputWrapper(output));
}
private class WatermarkOutputWrapper implements WatermarkOutput {
private final WatermarkOutput delegate;
public WatermarkOutputWrapper(WatermarkOutput delegate) {
this.delegate = delegate;
}
@Override
public void emitWatermark(Watermark watermark) {
System.out.println(String.format(
"[%s] Watermark emitted: %d (%s)",
sourceName, watermark.getTimestamp(),
new Date(watermark.getTimestamp())
));
delegate.emitWatermark(watermark);
}
@Override
public void markIdle() {
System.out.println(String.format("[%s] Source marked as idle", sourceName));
delegate.markIdle();
}
@Override
public void markActive() {
System.out.println(String.format("[%s] Source marked as active", sourceName));
delegate.markActive();
}
}
}public class TimeExtractionUtils {
// Extract from JSON timestamp field
public static SerializableTimestampAssigner<JsonNode> jsonTimestampExtractor(String timestampField) {
return (element, recordTimestamp) -> {
JsonNode timestampNode = element.get(timestampField);
return timestampNode != null ? timestampNode.asLong() : recordTimestamp;
};
}
// Extract from formatted string
public static SerializableTimestampAssigner<String> formatTimestampExtractor(String pattern) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
return (element, recordTimestamp) -> {
try {
LocalDateTime dateTime = LocalDateTime.parse(element, formatter);
return dateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
} catch (Exception e) {
return recordTimestamp; // Fallback to record timestamp
}
};
}
// Extract from CSV with timestamp column
public static SerializableTimestampAssigner<String> csvTimestampExtractor(int timestampColumn,
String delimiter) {
return (element, recordTimestamp) -> {
String[] fields = element.split(delimiter);
if (fields.length > timestampColumn) {
try {
return Long.parseLong(fields[timestampColumn]);
} catch (NumberFormatException e) {
return recordTimestamp;
}
}
return recordTimestamp;
};
}
}public class WatermarkStrategyBuilder {
public static <T> WatermarkStrategy<T> createRobustStrategy(
SerializableTimestampAssigner<T> timestampAssigner,
Duration maxOutOfOrderness,
Duration idlenessTimeout) {
return WatermarkStrategy.<T>forBoundedOutOfOrderness(maxOutOfOrderness)
.withTimestampAssigner(timestampAssigner)
.withIdleness(idlenessTimeout);
}
public static <T> WatermarkStrategy<T> createAdaptiveStrategy(
SerializableTimestampAssigner<T> timestampAssigner) {
return WatermarkStrategy.<T>forGenerator(context ->
new AdaptiveWatermarkGenerator<>())
.withTimestampAssigner(timestampAssigner);
}
public static <T> WatermarkStrategy<T> createDebugStrategy(
SerializableTimestampAssigner<T> timestampAssigner,
Duration maxOutOfOrderness,
String sourceName) {
return WatermarkStrategy.<T>forGenerator(context ->
new WatermarkDebugger<>(
new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness),
sourceName
))
.withTimestampAssigner(timestampAssigner);
}
}public class WatermarkBestPractices {
// For mostly ordered streams with occasional late events
public static <T> WatermarkStrategy<T> forMostlyOrdered(
SerializableTimestampAssigner<T> timestampAssigner) {
return WatermarkStrategy.<T>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(timestampAssigner)
.withIdleness(Duration.ofMinutes(1));
}
// For highly out-of-order streams
public static <T> WatermarkStrategy<T> forHighlyDisordered(
SerializableTimestampAssigner<T> timestampAssigner) {
return WatermarkStrategy.<T>forBoundedOutOfOrderness(Duration.ofMinutes(5))
.withTimestampAssigner(timestampAssigner)
.withIdleness(Duration.ofMinutes(2));
}
// For streams with unpredictable patterns
public static <T> WatermarkStrategy<T> forUnpredictableStreams(
SerializableTimestampAssigner<T> timestampAssigner) {
return WatermarkStrategy.<T>forGenerator(context ->
new AdaptiveWatermarkGenerator<>())
.withTimestampAssigner(timestampAssigner)
.withIdleness(Duration.ofMinutes(5));
}
}
// Performance considerations
/**
 * Bounded-out-of-orderness generator that can also emit directly from {@link #onEvent}.
 *
 * <p>When the max timestamp advances and more than {@code watermarkInterval} ms have passed since
 * the last emission, a watermark is emitted immediately. {@link #onPeriodicEmit} still emits on
 * every periodic call, so the interval only throttles the event-driven path; it does not cap the
 * total emission rate. The emitted value is {@code maxTimestamp - maxOutOfOrderness - 1}, so an
 * event exactly {@code maxOutOfOrderness} behind the newest one is still on time.
 */
public class PerformanceOptimizedWatermarkGenerator implements WatermarkGenerator<Event> {
    /** Throttle interval used by the one-argument constructor: 1 second. */
    private static final long DEFAULT_WATERMARK_INTERVAL_MS = 1000;

    private long maxTimestamp = Long.MIN_VALUE;
    private final long maxOutOfOrderness;
    private long lastWatermarkTime = 0;
    private final long watermarkInterval;

    /** Creates a generator whose event-driven emission is throttled to once per second. */
    public PerformanceOptimizedWatermarkGenerator(Duration maxOutOfOrderness) {
        this(maxOutOfOrderness, Duration.ofMillis(DEFAULT_WATERMARK_INTERVAL_MS));
    }

    /**
     * @param maxOutOfOrderness maximum lag of an event behind the newest event seen
     * @param watermarkInterval minimum wall-clock gap between event-driven emissions
     * @throws IllegalArgumentException if either duration is negative
     */
    public PerformanceOptimizedWatermarkGenerator(Duration maxOutOfOrderness,
                                                  Duration watermarkInterval) {
        if (maxOutOfOrderness.isNegative() || watermarkInterval.isNegative()) {
            throw new IllegalArgumentException("durations must not be negative: "
                    + maxOutOfOrderness + ", " + watermarkInterval);
        }
        this.maxOutOfOrderness = maxOutOfOrderness.toMillis();
        this.watermarkInterval = watermarkInterval.toMillis();
    }

    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        // Only a new max timestamp can advance the watermark.
        if (eventTimestamp > maxTimestamp) {
            maxTimestamp = eventTimestamp;
            // Emit immediately unless an emission happened within the throttle interval.
            long currentTime = System.currentTimeMillis();
            if (currentTime - lastWatermarkTime > watermarkInterval) {
                output.emitWatermark(new Watermark(currentWatermark()));
                lastWatermarkTime = currentTime;
            }
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Periodic emission as backup
        if (maxTimestamp != Long.MIN_VALUE) {
            output.emitWatermark(new Watermark(currentWatermark()));
            lastWatermarkTime = System.currentTimeMillis();
        }
    }

    /** Watermark for the current max timestamp; callers ensure an event has been seen. */
    private long currentWatermark() {
        return maxTimestamp - maxOutOfOrderness - 1;
    }
}
Apache Flink's event-time processing and watermark system provides robust support for handling temporal aspects of streaming data. By understanding these concepts and applying appropriate strategies, you can build applications that accurately process time-based computations even with out-of-order and late-arriving events.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-core