Java bridge for seamless integration between Flink's Table/SQL API and DataStream API, enabling conversion between streams and tables with unified batch and stream processing.
—
Time-based event processing with configurable watermark assignment strategies for handling out-of-order events in streaming applications. These strategies ensure proper event-time semantics and enable accurate windowing operations.
Base class for watermark strategies that emit watermarks at regular intervals.
/**
 * Base class for periodic watermark assignment strategies.
 *
 * <p>Implementations observe every event timestamp via {@link #nextTimestamp(long)}
 * and are polled at a fixed interval for the current watermark via {@link #getWatermark()}.
 */
public abstract class PeriodicWatermarkAssigner {

    /**
     * Process the next timestamp from incoming events.
     *
     * @param timestamp Event timestamp to process (typically epoch millis — confirm with caller)
     */
    public abstract void nextTimestamp(long timestamp);

    /**
     * Get the current watermark based on the timestamps processed so far.
     *
     * @return Current watermark
     */
    public abstract Watermark getWatermark();

    /**
     * Convert watermark strategy to properties for descriptor usage.
     *
     * @return Properties map for table descriptor configuration
     */
    public abstract Map<String, String> toProperties();
}Watermark strategy for handling events that arrive out-of-order within a bounded time interval.
/**
 * Watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
 *
 * <p>Emits watermarks which are the maximum observed timestamp minus the specified delay.
 */
public final class BoundedOutOfOrderTimestamps extends PeriodicWatermarkAssigner {

    /**
     * Create bounded out-of-order watermark strategy.
     *
     * @param delay The delay by which watermarks are behind the maximum observed timestamp
     */
    public BoundedOutOfOrderTimestamps(long delay);

    /** Tracks the running maximum of all observed event timestamps. */
    @Override
    public void nextTimestamp(long timestamp);

    /** @return watermark computed as (maximum observed timestamp - delay) */
    @Override
    public Watermark getWatermark();

    /** @return descriptor properties identifying this strategy and its configured delay */
    @Override
    public Map<String, String> toProperties();
}Usage Examples:
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;
import org.apache.flink.streaming.api.watermark.Watermark;

// Create watermark strategy with 5-second delay for out-of-order events
BoundedOutOfOrderTimestamps watermarkStrategy = new BoundedOutOfOrderTimestamps(5000L);

// Process timestamps (simulating event processing)
watermarkStrategy.nextTimestamp(1000L);
watermarkStrategy.nextTimestamp(2000L);
watermarkStrategy.nextTimestamp(1500L); // Out-of-order event

// Get current watermark: max observed timestamp - delay = 2000 - 5000 = -3000.
// NOTE(review): the strategy emits (max - delay) directly; a negative watermark is
// expected here — verify no clamping is applied in the target Flink version.
Watermark currentWatermark = watermarkStrategy.getWatermark();
System.out.println("Current watermark: " + currentWatermark.getTimestamp());

// Use in table descriptor
Map<String, String> properties = watermarkStrategy.toProperties();
// Properties will contain watermark type and delay configurationWatermark strategy for strictly ascending timestamps where events arrive in order.
/**
 * Watermark strategy for strictly ascending timestamps.
 *
 * <p>Suitable when events are guaranteed to arrive in timestamp order; the watermark
 * follows the highest timestamp seen so far.
 */
public final class AscendingTimestamps extends PeriodicWatermarkAssigner {

    /**
     * Create ascending timestamp watermark strategy.
     */
    public AscendingTimestamps();

    /** Tracks the highest timestamp observed so far. */
    @Override
    public void nextTimestamp(long timestamp);

    /** @return watermark derived from the highest observed timestamp */
    @Override
    public Watermark getWatermark();

    /** @return descriptor properties identifying this strategy */
    @Override
    public Map<String, String> toProperties();
}Usage Examples:
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;

// Create watermark strategy for ascending timestamps
AscendingTimestamps watermarkStrategy = new AscendingTimestamps();

// Process strictly ascending timestamps
watermarkStrategy.nextTimestamp(1000L);
watermarkStrategy.nextTimestamp(2000L);
watermarkStrategy.nextTimestamp(3000L);

// Watermark follows the maximum seen timestamp.
// NOTE(review): Flink's ascending strategy emits (max - 1), so this likely prints
// 2999 rather than 3000 — verify against the implementation.
Watermark watermark = watermarkStrategy.getWatermark();
System.out.println("Watermark: " + watermark.getTimestamp()); // 3000Base class for watermark strategies that emit watermarks based on special marker events.
/**
 * Base class for punctuated watermark assignment strategies.
 *
 * <p>Unlike periodic strategies, watermarks are emitted only when special marker
 * events are encountered in the stream.
 */
public abstract class PunctuatedWatermarkAssigner {

    /**
     * Extract watermark from the current event if it contains watermark information.
     *
     * <p>NOTE(review): Flink's actual {@code PunctuatedWatermarkAssigner} also receives
     * the row — {@code getWatermark(Row row, long timestamp)} — confirm this signature
     * against the target Flink version.
     *
     * @param timestamp Current event timestamp
     * @return Watermark if event triggers watermark emission, null otherwise
     */
    public abstract Watermark getWatermark(long timestamp);

    /**
     * Convert watermark strategy to properties for descriptor usage.
     *
     * @return Properties map for table descriptor configuration
     */
    public abstract Map<String, String> toProperties();
}Configure watermarks using the modern Schema API.
import org.apache.flink.table.api.Schema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// Schema with a bounded-out-of-orderness watermark declared as a SQL expression
Schema schemaWithWatermarksSchema = Schema.newBuilder()
    .column("user_id", "STRING")
    .column("event_data", "STRING")
    .column("event_time", "TIMESTAMP(3)")
    .watermark("event_time", "event_time - INTERVAL '5' SECOND") // 5-second delay
    .build();

// Apply to DataStream conversion
DataStream<Row> eventStream = env.fromElements(/* data */);
Table table = tableEnv.fromDataStream(eventStream, schemaWithWatermarksSchema);

// Watermark propagation from DataStream: SOURCE_WATERMARK() reuses the watermarks
// already generated by the underlying DataStream source instead of recomputing them
Schema sourceWatermarkSchema = Schema.newBuilder()
    .column("user_id", "STRING")
    .column("event_data", "STRING")
    .columnByMetadata("event_time", "TIMESTAMP_LTZ(3)")
    .watermark("event_time", "SOURCE_WATERMARK()") // Propagate from DataStream
    .build();Configure watermarks in SQL table definitions.
-- Table with computed watermark: events may arrive up to 10 seconds out of order
CREATE TABLE events (
    user_id STRING,
    event_data STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
    'connector' = 'datagen',
    'fields.event_time.kind' = 'random',
    'fields.event_time.max-past' = '1h'
);

-- Table with source watermark propagation: the connector's own watermarks are reused
CREATE TABLE kafka_events (
    user_id STRING,
    event_data STRING,
    -- reads the event timestamp from connector metadata (e.g. the Kafka record timestamp)
    event_time TIMESTAMP_LTZ(3) METADATA,
    WATERMARK FOR event_time AS SOURCE_WATERMARK()
) WITH (
    'connector' = 'kafka',
    'topic' = 'events'
);Implement custom watermark strategies for specific business requirements.
/**
 * Example custom periodic strategy: the watermark lag depends on the wall-clock
 * hour — a short delay during business hours, a longer one off-hours.
 *
 * <p>NOTE: the hour is derived from epoch millis, i.e. UTC — confirm this matches
 * the intended "business hours" time zone.
 */
public class BusinessHoursWatermarkStrategy extends PeriodicWatermarkAssigner {
    // Highest event timestamp observed so far; Long.MIN_VALUE until the first event.
    private long highestSeen = Long.MIN_VALUE;
    private final long businessHourDelay = 30000L; // 30 seconds during business hours
    private final long offHourDelay = 300000L; // 5 minutes during off hours

    /** Track the running maximum of all observed event timestamps. */
    @Override
    public void nextTimestamp(long timestamp) {
        highestSeen = Math.max(highestSeen, timestamp);
    }

    /**
     * Emit the maximum observed timestamp minus a delay chosen by the current
     * wall-clock hour (9:00–17:59 counts as business hours).
     */
    @Override
    public Watermark getWatermark() {
        if (highestSeen == Long.MIN_VALUE) {
            // No events seen yet: emit the smallest possible watermark.
            return new Watermark(Long.MIN_VALUE);
        }
        // Hour-of-day from epoch millis (simplified; UTC-based).
        long hourOfDay = (System.currentTimeMillis() / (1000 * 60 * 60)) % 24;
        boolean withinBusinessHours = hourOfDay >= 9 && hourOfDay <= 17;
        long delay = withinBusinessHours ? businessHourDelay : offHourDelay;
        return new Watermark(highestSeen - delay);
    }

    /** Describe this strategy as descriptor properties. */
    @Override
    public Map<String, String> toProperties() {
        Map<String, String> props = new HashMap<>();
        props.put("watermark.strategy", "business-hours");
        return props;
    }
}
Use watermark strategies with windowing operations.
// Create table with a 30-second bounded-out-of-orderness watermark
tableEnv.executeSql(
    "CREATE TABLE sensor_readings (" +
    " sensor_id STRING," +
    " temperature DOUBLE," +
    " reading_time TIMESTAMP(3)," +
    " WATERMARK FOR reading_time AS reading_time - INTERVAL '30' SECOND" +
    ") WITH (" +
    " 'connector' = 'datagen'," +
    " 'fields.temperature.min' = '15.0'," +
    " 'fields.temperature.max' = '35.0'" +
    ")"
);

// Windowed aggregation with watermarks: the watermark determines when each
// 1-minute tumbling window is considered complete and its result is emitted
Table windowedAggregates = tableEnv.sqlQuery(
    "SELECT " +
    " sensor_id, " +
    " window_start, " +
    " window_end, " +
    " AVG(temperature) as avg_temp, " +
    " MAX(temperature) as max_temp " +
    "FROM TABLE(" +
    " TUMBLE(TABLE sensor_readings, DESCRIPTOR(reading_time), INTERVAL '1' MINUTE)" +
    ") " +
    "GROUP BY sensor_id, window_start, window_end"
);Configure late data handling with allowed lateness.
-- Table with allowed lateness configuration
CREATE TABLE late_events (
    event_id STRING,
    event_time TIMESTAMP(3),
    event_data STRING,
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'events',
    -- Connector-specific late data handling
    -- NOTE(review): confirm 'scan.watermark.allowed-lateness' exists in the target
    -- connector version; Flink documents 'scan.watermark.idle-timeout' and
    -- 'scan.watermark.alignment.*' as the source watermark options
    'scan.watermark.allowed-lateness' = '1min'
);Monitor watermark progress in streaming applications.
// Enable watermark debugging
Configuration config = new Configuration();
config.setString("metrics.reporters", "jmx");
config.setBoolean("metrics.latency.tracking", true);
StreamExecutionEnvironment env = StreamExecutionEnvironment
    .getExecutionEnvironment(config);

// Monitor watermarks in processing.
// NOTE: RuntimeContext does NOT expose the current watermark, so a plain
// RichMapFunction cannot read it. Use a ProcessFunction instead: its Context
// provides the timer service, which tracks the operator's current watermark.
DataStream<Row> monitoredStream = tableEnv.toChangelogStream(table)
    .process(new ProcessFunction<Row, Row>() {
        // Last watermark observed; exported via the gauge below.
        private long currentWatermark = Long.MIN_VALUE;
        // Wall-clock time of the last console log, for rate-limiting.
        private transient long lastLoggedMillis;

        @Override
        public void open(Configuration parameters) {
            // Expose the watermark as a metric so external reporters (JMX) can read it.
            getRuntimeContext().getMetricGroup()
                .gauge("currentWatermark", (Gauge<Long>) () -> currentWatermark);
        }

        @Override
        public void processElement(Row row, Context ctx, Collector<Row> out) {
            currentWatermark = ctx.timerService().currentWatermark();
            // Log watermark progress at most once every 10 seconds.
            // (The original `millis % 10000 == 0` check would almost never fire.)
            long now = System.currentTimeMillis();
            if (now - lastLoggedMillis >= 10000) {
                lastLoggedMillis = now;
                System.out.println("Current watermark: " + currentWatermark);
            }
            out.collect(row);
        }
    });
import org.apache.flink.streaming.api.watermark.Watermark;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.legacy.descriptors.Rowtime;
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;
import org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner;
import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner;
import org.apache.flink.util.Collector;
Migrate from legacy descriptor-based watermarks to modern Schema API.
// Legacy approach (deprecated): descriptor-based rowtime/watermark configuration
Rowtime rowtimeDescriptor = new Rowtime()
    .timestampsFromField("event_time")
    .watermarksPeriodicBounded(5000L);

// Modern approach: declare the watermark as a SQL expression on the Schema
Schema modernSchema = Schema.newBuilder()
    .column("event_id", "STRING")
    .column("event_time", "TIMESTAMP(3)")
    .column("event_data", "STRING")
    .watermark("event_time", "event_time - INTERVAL '5' SECOND")
    .build();

// Apply to DataStream
Table modernTable = tableEnv.fromDataStream(dataStream, modernSchema);Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-bridge