Bridge component for Apache Flink's Table/SQL API that enables Java developers to write table programs that seamlessly interact with Flink's streaming and batch processing APIs.
—
Watermark strategies in the Flink Table API Java Bridge enable event-time processing by defining how to handle out-of-order events and when to trigger time-based operations. These strategies are essential for windowed operations and temporal joins in streaming applications.
Watermarks are timestamps that flow as part of the data stream and indicate the progress of event time. They help Flink determine when all events for a particular time window have arrived, enabling the system to produce complete and correct results for time-based operations.
The bridge provides several watermark assignment strategies that can be used with table sources.
Abstract base class for watermark strategies that generate watermarks periodically:
@PublicEvolving
public abstract class PeriodicWatermarkAssigner extends WatermarkStrategy {
/**
 * Feeds the next observed timestamp to the assigner. The sample implementations on this
 * page use it to track the highest timestamp seen so far.
 */
public abstract void nextTimestamp(long timestamp);
/**
 * Returns the current watermark.
 * NOTE(review): presumably polled on the configured auto-watermark interval — confirm.
 */
public abstract Watermark getWatermark();
// Inherited from WatermarkStrategy
/** Describes this strategy as string key/value properties (exact keys are not shown on this page). */
public abstract Map<String, String> toProperties();
/** Subclasses must supply their own equality. */
public abstract boolean equals(Object obj);
/** Subclasses must supply a hash code; keep it consistent with {@code equals}. */
public abstract int hashCode();
}

Usage Pattern:
public class MyPeriodicWatermarkAssigner extends PeriodicWatermarkAssigner {
private long maxTimestamp = Long.MIN_VALUE;
private final long maxOutOfOrderness;
public MyPeriodicWatermarkAssigner(long maxOutOfOrderness) {
this.maxOutOfOrderness = maxOutOfOrderness;
}
@Override
public void nextTimestamp(long timestamp) {
    // Remember only the highest event timestamp observed so far.
    if (timestamp > maxTimestamp) {
        maxTimestamp = timestamp;
    }
}
/**
 * Returns the current watermark: the highest timestamp seen so far minus the
 * allowed out-of-orderness.
 *
 * <p>Until the first timestamp arrives {@code maxTimestamp} is still
 * {@code Long.MIN_VALUE}. Subtracting a positive bound from it would wrap around to a
 * very large positive value, i.e. a watermark far in the future that makes every
 * subsequent event late. The result is therefore clamped to {@code Long.MIN_VALUE}.
 */
@Override
public Watermark getWatermark() {
    long watermark = maxTimestamp - maxOutOfOrderness;
    // Subtracting a positive value can only yield a larger number if the long wrapped around.
    if (maxOutOfOrderness > 0 && watermark > maxTimestamp) {
        watermark = Long.MIN_VALUE;
    }
    return new Watermark(watermark);
}
@Override
public long extractTimestamp(Row element, long recordTimestamp) {
    // The event timestamp is assumed to be stored as a Long at field index 2.
    final Object rawTimestamp = element.getField(2);
    return (Long) rawTimestamp;
}
}

Abstract base class for watermark strategies that generate watermarks based on specific events:
@PublicEvolving
public abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {
/**
 * Decides, per row, whether a watermark should be produced. The sample implementation on
 * this page returns {@code null} when no watermark should be generated for the row.
 */
public abstract Watermark getWatermark(Row row, long timestamp);
// Inherited from WatermarkStrategy
/** Describes this strategy as string key/value properties (exact keys are not shown on this page). */
public abstract Map<String, String> toProperties();
/** Subclasses must supply their own equality. */
public abstract boolean equals(Object obj);
/** Subclasses must supply a hash code; keep it consistent with {@code equals}. */
public abstract int hashCode();
}

Usage Pattern:
public class MyPunctuatedWatermarkAssigner extends PunctuatedWatermarkAssigner {
@Override
public Watermark getWatermark(Row row, long extractedTimestamp) {
    // Only rows whose field 1 carries the special marker value produce a watermark.
    final String eventType = (String) row.getField(1);
    final boolean isMarkerEvent = "WATERMARK_EVENT".equals(eventType);
    // null means: no watermark for regular events.
    return isMarkerEvent ? new Watermark(extractedTimestamp) : null;
}
@Override
public long extractTimestamp(Row element, long recordTimestamp) {
return (Long) element.getField(0); // Extract timestamp from row
}
}

Watermark strategy for streams with strictly ascending timestamps:
@PublicEvolving
public class AscendingTimestamps extends PeriodicWatermarkAssigner {
// Signature summary only: method bodies are not shown on this page.
/** Feeds the next observed timestamp to the strategy. */
public void nextTimestamp(long timestamp);
/** Returns the current watermark. */
public Watermark getWatermark();
}

Usage Example:
// For streams where timestamps are guaranteed to be ascending
AscendingTimestamps watermarkStrategy = new AscendingTimestamps() {
@Override
public long extractTimestamp(Row element, long recordTimestamp) {
return (Long) element.getField(3); // timestamp field index
}
};
// Use with legacy table source
public class MyTableSource implements StreamTableSource<Row> {
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    // Create the raw source stream first, then attach timestamps and watermarks to it.
    var rows = execEnv.addSource(new MySourceFunction());
    return rows.assignTimestampsAndWatermarks(watermarkStrategy);
}
}

Watermark strategy for streams with bounded out-of-order events:
@PublicEvolving
public class BoundedOutOfOrderTimestamps extends PeriodicWatermarkAssigner {
// Signature summary only: method bodies are not shown on this page.
/**
 * @param maxOutOfOrderness how far events may arrive out of order; the usage example on
 *                          this page passes milliseconds (5000L for 5 seconds)
 */
public BoundedOutOfOrderTimestamps(long maxOutOfOrderness);
/** Feeds the next observed timestamp to the strategy. */
public void nextTimestamp(long timestamp);
/** Returns the current watermark. */
public Watermark getWatermark();
}

Usage Example:
// For streams where events can arrive up to 5 seconds out of order
BoundedOutOfOrderTimestamps watermarkStrategy =
new BoundedOutOfOrderTimestamps(5000L) { // 5 seconds max out-of-order
@Override
public long extractTimestamp(Row element, long recordTimestamp) {
return (Long) element.getField(2); // event timestamp field
}
};
// Integration with table source
public class EventTableSource implements StreamTableSource<Row> {
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    // Create the raw event stream first, then attach timestamps and watermarks to it.
    var events = execEnv.addSource(new EventSourceFunction());
    return events.assignTimestampsAndWatermarks(watermarkStrategy);
}
}

With modern table sources, watermarks are typically defined in the table schema:
// Define watermark in table schema
Schema schema = Schema.newBuilder()
.column("user_id", DataTypes.STRING())
.column("event_type", DataTypes.STRING())
.column("event_time", DataTypes.TIMESTAMP_LTZ(3))
.column("processing_time", DataTypes.TIMESTAMP_LTZ(3))
.watermark("event_time", "event_time - INTERVAL '5' SECONDS")
.build();
// Create table with watermark
tableEnv.createTable("events_with_watermark",
TableDescriptor.forConnector("my-connector")
.schema(schema)
.build());public class WatermarkedStreamTableSource implements StreamTableSource<Row> {
private final long maxOutOfOrderness;
public WatermarkedStreamTableSource(long maxOutOfOrderness) {
this.maxOutOfOrderness = maxOutOfOrderness;
}
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    // The anonymous subclass supplies the event-time extraction for this source's rows.
    final BoundedOutOfOrderTimestamps boundedStrategy =
        new BoundedOutOfOrderTimestamps(maxOutOfOrderness) {
            @Override
            public long extractTimestamp(Row element, long recordTimestamp) {
                // Field 2 is read as a Timestamp and converted via getTime().
                return ((Timestamp) element.getField(2)).getTime();
            }
        };

    var sourceStream = execEnv.addSource(new MySourceFunction());
    return sourceStream.assignTimestampsAndWatermarks(boundedStrategy);
}
@Override
public TableSchema getTableSchema() {
return TableSchema.builder()
.field("id", DataTypes.BIGINT())
.field("data", DataTypes.STRING())
.field("event_time", DataTypes.TIMESTAMP(3))
.build();
}
}// Convert DataStream with watermarks to Table
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Create DataStream with watermarks
DataStream<Event> eventStream = env
    .addSource(new EventSource())
    .assignTimestampsAndWatermarks(
        // The bound is a long in milliseconds: the only constructor documented on this page
        // is BoundedOutOfOrderTimestamps(long maxOutOfOrderness), not a Duration overload.
        new BoundedOutOfOrderTimestamps(5000L) { // 5 seconds max out-of-order
            @Override
            public long extractTimestamp(Event element, long recordTimestamp) {
                return element.getEventTime();
            }
        }
    );
// Convert to Table while preserving watermarks
Schema schema = Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.column("data", DataTypes.STRING())
.column("event_time", DataTypes.TIMESTAMP_LTZ(3))
.watermark("event_time", "SOURCE_WATERMARK()") // Preserve existing watermarks
.build();
Table eventTable = tableEnv.fromDataStream(eventStream, schema);// Use watermarks with tumbling windows
Table windowedResult = tableEnv.sqlQuery("""
SELECT
user_id,
COUNT(*) as event_count,
SUM(amount) as total_amount,
TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
TUMBLE_END(event_time, INTERVAL '1' MINUTE) as window_end
FROM events_with_watermark
GROUP BY
user_id,
TUMBLE(event_time, INTERVAL '1' MINUTE)
""");// Sliding windows with watermarks
// Averages `value` per user over hopping windows on event_time.
// `value` is a reserved SQL keyword, so the column reference must be quoted with backticks;
// unquoted, the statement fails to parse.
Table slidingResult = tableEnv.sqlQuery("""
SELECT
user_id,
AVG(`value`) as avg_value,
HOP_START(event_time, INTERVAL '30' SECONDS, INTERVAL '2' MINUTES) as window_start,
HOP_END(event_time, INTERVAL '30' SECONDS, INTERVAL '2' MINUTES) as window_end
FROM events_with_watermark
GROUP BY
user_id,
HOP(event_time, INTERVAL '30' SECONDS, INTERVAL '2' MINUTES)
""");
// Session windows with watermarks
Table sessionResult = tableEnv.sqlQuery("""
SELECT
user_id,
COUNT(*) as session_events,
MIN(event_time) as session_start,
MAX(event_time) as session_end,
SESSION_START(event_time, INTERVAL '10' MINUTES) as session_window_start,
SESSION_END(event_time, INTERVAL '10' MINUTES) as session_window_end
FROM user_events
GROUP BY
user_id,
SESSION(event_time, INTERVAL '10' MINUTES)
""");public class BusinessLogicWatermarkAssigner extends PeriodicWatermarkAssigner {
private long maxTimestamp = Long.MIN_VALUE;
private final long gracePeriod;
public BusinessLogicWatermarkAssigner(long gracePeriodMs) {
this.gracePeriod = gracePeriodMs;
}
@Override
public void nextTimestamp(long timestamp) {
maxTimestamp = Math.max(maxTimestamp, timestamp);
}
/**
 * Returns the highest timestamp seen so far minus the grace period.
 *
 * <p>On Saturdays and Sundays (evaluated in the JVM's default time zone at call time)
 * the grace period is extended by 30%, i.e. multiplied by 1.3.
 *
 * <p>Until the first timestamp arrives {@code maxTimestamp} is still
 * {@code Long.MIN_VALUE}; subtracting a positive grace period would wrap around to a
 * very large positive watermark, so the result is clamped to {@code Long.MIN_VALUE}.
 */
@Override
public Watermark getWatermark() {
    // Business rule: late events get 30% more grace on weekends.
    // Calendar.getInstance() is already initialised to the current time, so no explicit
    // System.currentTimeMillis() round trip is needed.
    int dayOfWeek = Calendar.getInstance().get(Calendar.DAY_OF_WEEK);
    boolean weekend = dayOfWeek == Calendar.SATURDAY || dayOfWeek == Calendar.SUNDAY;
    long adjustedGracePeriod = weekend ? (long) (gracePeriod * 1.3) : gracePeriod;

    long watermark = maxTimestamp - adjustedGracePeriod;
    // Subtracting a positive value can only yield a larger number if the long wrapped around.
    if (adjustedGracePeriod > 0 && watermark > maxTimestamp) {
        watermark = Long.MIN_VALUE;
    }
    return new Watermark(watermark);
}
@Override
public long extractTimestamp(Row element, long recordTimestamp) {
return (Long) element.getField(1);
}
}public class CoordinatedWatermarkAssigner extends PeriodicWatermarkAssigner {
private final Map<String, Long> sourceWatermarksNeed = new HashMap<>();
private final long defaultLag;
public CoordinatedWatermarkAssigner(long defaultLagMs) {
this.defaultLag = defaultLagMs;
}
@Override
public void nextTimestamp(long timestamp) {
// Update watermark per source
// Implementation would track per-source timestamps
}
@Override
public Watermark getWatermark() {
    // Nothing tracked yet: there is no watermark to report.
    if (sourceWatermarksNeed.isEmpty()) {
        return null;
    }

    // The slowest source determines the overall watermark.
    long lowest = Long.MAX_VALUE;
    for (long tracked : sourceWatermarksNeed.values()) {
        lowest = Math.min(lowest, tracked);
    }

    // Long.MIN_VALUE is treated as "no usable timestamp", same as the empty case.
    if (lowest == Long.MIN_VALUE) {
        return null;
    }
    return new Watermark(lowest - defaultLag);
}
@Override
public long extractTimestamp(Row element, long recordTimestamp) {
    final String sourceId = (String) element.getField(0);
    final long timestamp = (Long) element.getField(2);

    // Keep the highest timestamp seen per source; the first value for a source is stored as-is.
    sourceWatermarksNeed.merge(sourceId, timestamp, Long::max);
    return timestamp;
}
}// Monitor late events
// Switch from the Table API to the DataStream API for the windowing below.
DataStream<Row> eventStream = tableEnv.toDataStream(eventsTable);
// Tag identifying the side output that receives late rows.
// NOTE(review): the trailing {} creates an anonymous subclass, presumably so the element
// type is retained at runtime — confirm.
OutputTag<Row> lateEventsTag = new OutputTag<Row>("late-events"){};
// Key by field 0, then aggregate in 5-minute tumbling event-time windows.
SingleOutputStreamOperator<Row> processedStream = eventStream
.keyBy(row -> row.getField(0))
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.allowedLateness(Time.minutes(1)) // Allow 1 minute lateness
.sideOutputLateData(lateEventsTag)
.apply(new MyWindowFunction());
// Handle late events separately: rows routed to the late-events tag are read back here.
DataStream<Row> lateEvents = processedStream.getSideOutput(lateEventsTag);
lateEvents.addSink(new LateEventsSink());// Configure watermark interval
env.getConfig().setAutoWatermarkInterval(1000L); // 1 second
// Efficient timestamp extraction
public class OptimizedWatermarkAssigner extends BoundedOutOfOrderTimestamps {
public OptimizedWatermarkAssigner(long maxOutOfOrderness) {
super(maxOutOfOrderness);
}
@Override
public long extractTimestamp(Row element, long recordTimestamp) {
    // Long.MIN_VALUE is treated as "no record timestamp attached"; only in that case is the
    // row field read, which avoids the field access on the common path.
    return recordTimestamp == Long.MIN_VALUE
        ? (Long) element.getField(2)
        : recordTimestamp;
}
}public class RobustWatermarkAssigner extends PeriodicWatermarkAssigner {
/**
 * Extracts the event timestamp from field 2 of the row, accepting either a {@code Long}
 * (used as-is) or a {@code Timestamp} (converted via {@code getTime()}).
 *
 * <p>Best-effort by design: if the field is missing, {@code null}, or of another type,
 * the problem is logged and the current wall-clock time is returned instead of failing.
 */
@Override
public long extractTimestamp(Row element, long recordTimestamp) {
    try {
        Object timestampField = element.getField(2);
        if (timestampField instanceof Long) {
            return (Long) timestampField;
        }
        if (timestampField instanceof Timestamp) {
            return ((Timestamp) timestampField).getTime();
        }
        // Null or unsupported type: log a warning and fall back to processing time.
        // The explicit null check stops a null field from throwing an NPE on getClass() and
        // being misreported as an extraction error.
        LOG.warn("Invalid timestamp field type: {}",
            timestampField == null ? null : timestampField.getClass());
        return System.currentTimeMillis();
    } catch (RuntimeException e) {
        // Nothing in the try block throws a checked exception, so RuntimeException suffices.
        LOG.error("Error extracting timestamp", e);
        return System.currentTimeMillis();
    }
}
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-bridge-2-11