Apache Flink Streaming Java API - Core library for building streaming data processing applications in Java, providing DataStream API, windowing operations, state management, event time processing, and fault-tolerant stream processing capabilities
—
Apache Flink provides sophisticated time semantics for handling event time, processing time, and ingestion time in streaming applications. Watermarks enable handling of out-of-order events and late data in event-time processing.
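Configure the time semantics for your streaming application. Note: as of Flink 1.12, `setStreamTimeCharacteristic` and `TimeCharacteristic` are deprecated and event time is the default; newer code configures event time through a `WatermarkStrategy` (described below) instead.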
/**
 * Set the time characteristic for the streaming application.
 * NOTE(review): deprecated since Flink 1.12, where event time became the default. Flink's
 * StreamExecutionEnvironment also appears to declare this method as returning void rather than
 * the environment — verify against the Flink version in use before chaining calls.
 * @param characteristic - time characteristic to use (one of the TimeCharacteristic constants)
 */
StreamExecutionEnvironment setStreamTimeCharacteristic(TimeCharacteristic characteristic);
/**
 * Get the time characteristic currently configured on this environment.
 */
TimeCharacteristic getStreamTimeCharacteristic();
/**
 * Time characteristic enumeration: selects which notion of time the application's
 * time-based operations use (passed to setStreamTimeCharacteristic).
 * NOTE(review): deprecated together with setStreamTimeCharacteristic in Flink 1.12 — confirm.
 */
enum TimeCharacteristic {
    ProcessingTime, // Time when elements are processed by operators
    IngestionTime, // Time when elements enter Flink
    EventTime // Time embedded in the elements themselves
}
Usage Examples:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// The three calls below are alternatives, not a sequence: each call replaces the
// characteristic set by the previous one, so keep only the one you need.
// Set event time processing (most common for accurate results)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Set processing time (lowest latency, no late data handling)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// Set ingestion time (compromise between accuracy and performance)
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
Define watermark generation strategies for handling event time and out-of-order events.
/**
 * Watermark strategy interface: bundles a timestamp assigner supplier and a watermark
 * generator supplier into the single object passed to assignTimestampsAndWatermarks.
 */
interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {
    /**
     * Create strategy for monotonous timestamps (no out-of-order events)
     */
    static <T> WatermarkStrategy<T> forMonotonousTimestamps();
    /**
     * Create strategy for bounded out-of-orderness
     * @param maxOutOfOrderness - maximum time by which events can be late
     */
    static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness);
    /**
     * Create strategy with no watermarks
     */
    static <T> WatermarkStrategy<T> noWatermarks();
    /**
     * Add timestamp assigner to strategy, given as a supplier that builds the assigner
     * from a context, e.g. {@code context -> new MyAssigner()}.
     * @param timestampAssigner - timestamp assigner supplier
     */
    WatermarkStrategy<T> withTimestampAssigner(TimestampAssignerSupplier<T> timestampAssigner);
    /**
     * Add timestamp assigner to strategy, given directly, e.g. as the lambda
     * {@code (event, recordTimestamp) -> event.getTimestamp()}.
     * Two-argument lambdas resolve to this overload; the supplier overload above only
     * accepts a single-argument (context) lambda.
     * @param timestampAssigner - serializable assigner that extracts each element's timestamp
     */
    WatermarkStrategy<T> withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner);
    /**
     * Add idleness detection
     * @param idleTimeout - timeout after which source is considered idle
     */
    WatermarkStrategy<T> withIdleness(Duration idleTimeout);
}
/**
 * Assign watermarks and timestamps to stream.
 * The examples below use the returned operator as the event-time stream for downstream work.
 * @param watermarkStrategy - watermark strategy to use (timestamp assigner + watermark generator)
 * @return the resulting stream operator
 */
SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy);
Usage Examples:
DataStream<Event> events = env.addSource(new EventSource());
// NOTE: the two-argument lambdas below do not match withTimestampAssigner(TimestampAssignerSupplier),
// whose single method takes only a Context; they rely on Flink's
// withTimestampAssigner(SerializableTimestampAssigner) overload.
// Monotonous timestamps - events arrive in timestamp order
DataStream<Event> monotonousStream = events.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Event>forMonotonousTimestamps()
                .withTimestampAssigner((event, timestamp) -> event.getTimestamp())
);
// Bounded out-of-orderness - events can be up to 5 seconds late
DataStream<Event> boundedStream = events.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, timestamp) -> event.getTimestamp())
);
// With idleness detection - handle slow sources
DataStream<Event> idleStream = events.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, timestamp) -> event.getTimestamp())
                .withIdleness(Duration.ofMinutes(1))
);
// Custom watermark generator: instead of a factory method, implement both supplier
// methods directly; CustomWatermarkGenerator is a user-defined WatermarkGenerator<Event>.
DataStream<Event> customStream = events.assignTimestampsAndWatermarks(
        new WatermarkStrategy<Event>() {
            @Override
            public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new CustomWatermarkGenerator();
            }
            @Override
            public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                return (event, timestamp) -> event.getTimestamp();
            }
        }
);
Extract timestamps from elements for event time processing.
/**
 * Interface for assigning timestamps to elements: maps each element to the long
 * timestamp used for event-time processing.
 */
interface TimestampAssigner<T> {
    /**
     * Extract timestamp from element.
     * Implementations that derive the timestamp from the element alone may ignore
     * recordTimestamp, as the usage examples do.
     * @param element - input element
     * @param recordTimestamp - previously assigned timestamp (or Long.MIN_VALUE if none)
     * @return timestamp for the element
     */
    long extractTimestamp(T element, long recordTimestamp);
}
/**
 * Supplier for timestamp assigners: builds a TimestampAssigner from a Context, so it can be
 * written as a single-argument lambda {@code context -> assigner}.
 * NOTE(review): in Flink this interface extends java.io.Serializable because the strategy is
 * shipped with the job; anything a supplier lambda captures must then be serializable too —
 * confirm against the Flink version in use.
 */
interface TimestampAssignerSupplier<T> {
    TimestampAssigner<T> createTimestampAssigner(Context context);
    /** Context handed to createTimestampAssigner; exposes the metric group. */
    interface Context {
        MetricGroup getMetricGroup();
    }
}
Usage Examples:
// Lambda timestamp assigner: a two-argument lambda resolves to the
// withTimestampAssigner(SerializableTimestampAssigner) overload.
WatermarkStrategy<Event> strategy = WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
// Custom timestamp assigner
// The assigner travels with the strategy to the cluster, so it must be serializable.
// Declare it as a SerializableTimestampAssigner and pass it directly. Do not capture a
// plain TimestampAssigner inside a supplier lambda: the lambda is serialized together
// with what it captures. Create it in a static context (e.g. main) or as a static nested
// class so it does not drag along a non-serializable enclosing instance.
SerializableTimestampAssigner<LogEntry> logTimestampAssigner = new SerializableTimestampAssigner<LogEntry>() {
    @Override
    public long extractTimestamp(LogEntry log, long recordTimestamp) {
        return log.getTimestamp();
    }
};
WatermarkStrategy<LogEntry> logStrategy = WatermarkStrategy
        .<LogEntry>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withTimestampAssigner(logTimestampAssigner);
Generate watermarks to signal event time progress.
/**
 * Interface for generating watermarks. A generator can emit from onEvent (punctuated,
 * per-element) or from onPeriodicEmit (periodic); it may also use only one of the two hooks.
 */
interface WatermarkGenerator<T> {
    /**
     * Called for every event
     * @param event - input event
     * @param eventTimestamp - timestamp of the event
     * @param output - output for emitting watermarks
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    /**
     * Called periodically (based on auto watermark interval)
     * @param output - output for emitting watermarks
     */
    void onPeriodicEmit(WatermarkOutput output);
}
/**
 * Watermark output interface: the target a WatermarkGenerator emits watermarks into.
 */
interface WatermarkOutput {
    /** Emit the given watermark. */
    void emitWatermark(Watermark watermark);
    // NOTE(review): presumably flag this output as idle / active again (the mechanism behind
    // WatermarkStrategy.withIdleness) — confirm semantics in the Flink documentation.
    void markIdle();
    void markActive();
}
/**
 * Supplier for watermark generators: builds a WatermarkGenerator from a Context that exposes
 * the metric group.
 * NOTE(review): in Flink this interface extends java.io.Serializable — confirm.
 */
interface WatermarkGeneratorSupplier<T> {
    WatermarkGenerator<T> createWatermarkGenerator(Context context);
    /** Context handed to createWatermarkGenerator. */
    interface Context {
        MetricGroup getMetricGroup();
    }
}
Usage Examples:
// Custom watermark generator (periodic, bounded out-of-orderness)
class CustomWatermarkGenerator implements WatermarkGenerator<Event> {
    // Highest event timestamp seen so far; Long.MIN_VALUE means "no event yet".
    private long maxTimestamp = Long.MIN_VALUE;
    // Maximum lateness tolerated for an event, in the same unit as the event timestamps.
    private final long outOfOrdernessMillis = 5000;

    /** Only tracks the highest timestamp; watermarks are emitted periodically. */
    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /**
     * Emits maxTimestamp - outOfOrdernessMillis - 1. A watermark W declares that no more
     * elements with timestamp <= W are expected. An event exactly outOfOrdernessMillis older
     * than the maximum is still within the bound, so the watermark must stay one below it.
     */
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Skip until the first event arrives: subtracting from Long.MIN_VALUE would underflow.
        if (maxTimestamp != Long.MIN_VALUE) {
            output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
        }
    }
}
// Punctuated watermark generator
class PunctuatedWatermarkGenerator implements WatermarkGenerator<Event> {
    /**
     * Emits a watermark at the event's own timestamp whenever the event is a watermark
     * marker; ordinary events produce no output.
     */
    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        if (!event.isWatermarkEvent()) {
            return;
        }
        output.emitWatermark(new Watermark(eventTimestamp));
    }

    /** Intentionally empty: this generator is driven purely by marker events. */
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
    }
}
Utility classes for working with time in Flink applications.
/**
 * Time utility class for creating time values: a size paired with a TimeUnit.
 * NOTE(review): presumably org.apache.flink.streaming.api.windowing.time.Time; newer Flink
 * versions favor java.time.Duration in its place — confirm for the version in use.
 */
class Time {
    /** Time value of the given size in the given unit. */
    static Time of(long size, TimeUnit unit);
    // Convenience factories for the common units.
    static Time milliseconds(long milliseconds);
    static Time seconds(long seconds);
    static Time minutes(long minutes);
    static Time hours(long hours);
    static Time days(long days);
    /** The value converted to milliseconds (e.g. Time.minutes(5) -> 300000). */
    long toMilliseconds();
    TimeUnit getUnit();
    long getSize();
}
/**
 * Watermark class: a marker carrying an event-time timestamp.
 * NOTE(review): Flink defines more than one Watermark class (the eventtime one used with
 * WatermarkOutput and a streaming-runtime one); check which one your import resolves to.
 */
class Watermark {
    /** @param timestamp - event time this watermark represents */
    public Watermark(long timestamp);
    long getTimestamp();
    // Special watermarks
    // NOTE(review): presumably the watermark at Long.MAX_VALUE signalling end of event time — confirm.
    static final Watermark MAX_WATERMARK;
}
Usage Examples:
// Creating time values
Time fiveMinutes = Time.minutes(5);
Time thirtySeconds = Time.seconds(30);
Time oneHour = Time.of(1, TimeUnit.HOURS);
// Convert to milliseconds
long millis = fiveMinutes.toMilliseconds(); // 300000
// Creating watermarks (here from the wall clock, purely to illustrate the constructor)
Watermark watermark = new Watermark(System.currentTimeMillis());
long timestamp = watermark.getTimestamp();
Note: These are deprecated in favor of WatermarkStrategy but may still be found in older code.
// Deprecated timestamp extractors (legacy API, superseded by WatermarkStrategy)
// NOTE(review): in Flink the legacy extractors appear to implement AssignerWithPeriodicWatermarks
// rather than a shared TimestampExtractor base, and AscendingTimestampExtractor's abstract method is
// named extractAscendingTimestamp — verify these signatures before relying on them.
@Deprecated
abstract class TimestampExtractor<T> implements TimestampAssigner<T> {
    // Single-argument form: the timestamp is derived from the element alone.
    abstract long extractTimestamp(T element);
}
@Deprecated
abstract class AscendingTimestampExtractor<T> extends TimestampExtractor<T> {
    // For monotonously increasing timestamps
}
@Deprecated
abstract class BoundedOutOfOrdernessTimestampExtractor<T> extends TimestampExtractor<T> {
    // maxOutOfOrderness: maximum lateness tolerated (presumably the legacy counterpart of
    // WatermarkStrategy.forBoundedOutOfOrderness)
    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness);
}
@Deprecated
abstract class IngestionTimeExtractor<T> extends TimestampExtractor<T> {
    // Uses ingestion time as timestamp
}
// Watermark strategy
interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {
    static <T> WatermarkStrategy<T> forMonotonousTimestamps();
    static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness);
    static <T> WatermarkStrategy<T> noWatermarks();
    // Supplier form: single-argument lambda, context -> assigner
    WatermarkStrategy<T> withTimestampAssigner(TimestampAssignerSupplier<T> timestampAssigner);
    // Direct form: (element, recordTimestamp) -> timestamp lambdas resolve to this overload
    WatermarkStrategy<T> withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner);
    WatermarkStrategy<T> withIdleness(Duration idleTimeout);
}
// Timestamp assigner
interface TimestampAssigner<T> {
    // recordTimestamp: timestamp already attached to the record (Long.MIN_VALUE if none)
    long extractTimestamp(T element, long recordTimestamp);
}
// Watermark generator
interface WatermarkGenerator<T> {
    // Per-event hook: may emit watermarks (punctuated) or only update internal state
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    // Periodic hook, called on the auto watermark interval
    void onPeriodicEmit(WatermarkOutput output);
}
// Watermark output
interface WatermarkOutput {
    void emitWatermark(Watermark watermark);
    // NOTE(review): presumably toggle idleness of this output (cf. withIdleness) — confirm
    void markIdle();
    void markActive();
}
// Time characteristic enum
// NOTE(review): deprecated since Flink 1.12 (event time is the default there) — confirm.
enum TimeCharacteristic {
    ProcessingTime, // Processing time semantics: time when elements are processed by operators
    IngestionTime, // Ingestion time semantics: time when elements enter Flink
    EventTime // Event time semantics: time embedded in the elements themselves
}
// Time utility class: a size plus a TimeUnit, with one factory per common unit
class Time implements Comparable<Time>, Serializable {
    static Time of(long size, TimeUnit unit);
    static Time milliseconds(long milliseconds);
    static Time seconds(long seconds);
    static Time minutes(long minutes);
    static Time hours(long hours);
    static Time days(long days);
    // Value converted to milliseconds
    long toMilliseconds();
    TimeUnit getUnit();
    long getSize();
}
// Watermark class
// NOTE(review): Flink defines more than one Watermark class (the eventtime one used by
// WatermarkOutput and a streaming-runtime one); check which one your import resolves to.
class Watermark implements Serializable {
    public static final Watermark MAX_WATERMARK;
    public Watermark(long timestamp);
    long getTimestamp();
    // Presumably value equality on the timestamp — confirm
    boolean equals(Object obj);
    int hashCode();
}
// Built-in watermark generators
// NOTE(review): in Flink's org.apache.flink.api.common.eventtime package the bounded and monotonous
// generators appear as BoundedOutOfOrdernessWatermarks and AscendingTimestampsWatermarks — verify
// these class names against the Flink version in use.
class BoundedOutOfOrdernessWatermarksGenerator<T> implements WatermarkGenerator<T> {
    // Handles bounded out-of-orderness (presumably what forBoundedOutOfOrderness builds)
}
class MonotonousWatermarksGenerator<T> implements WatermarkGenerator<T> {
    // For monotonously increasing timestamps (presumably what forMonotonousTimestamps builds)
}
class NoWatermarksGenerator<T> implements WatermarkGenerator<T> {
    // No watermark generation (presumably what noWatermarks builds)
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-streaming-java-2-11