CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-table-api-java-bridge

Java bridge for seamless integration between Flink's Table/SQL API and DataStream API, enabling conversion between streams and tables with unified batch and stream processing.

Pending
Overview
Eval results
Files

watermark-strategies.mddocs/

Watermark Strategies

Time-based event processing with configurable watermark assignment strategies for handling out-of-order events in streaming applications. These strategies ensure proper event-time semantics and enable accurate windowing operations.

Capabilities

Periodic Watermark Assigner

Base class for watermark strategies that emit watermarks at regular intervals.

/**
 * Base class for periodic watermark assignment strategies
 * Watermarks are emitted periodically based on processed timestamps
 *
 * Concrete strategies documented below: BoundedOutOfOrderTimestamps and
 * AscendingTimestamps.
 */
public abstract class PeriodicWatermarkAssigner {
    
    /**
     * Process the next timestamp from incoming events
     *
     * Called for each event so the strategy can track timestamp progress.
     * @param timestamp Event timestamp to process
     */
    public abstract void nextTimestamp(long timestamp);
    
    /**
     * Get the current watermark based on processed timestamps
     * @return Current watermark ({@code org.apache.flink.streaming.api.watermark.Watermark})
     */
    public abstract Watermark getWatermark();
    
    /**
     * Convert watermark strategy to properties for descriptor usage
     * @return Properties map for table descriptor configuration
     */
    public abstract Map<String, String> toProperties();
}

Bounded Out-of-Order Timestamps

Watermark strategy for handling events that arrive out-of-order within a bounded time interval.

/**
 * Watermark strategy for rowtime attributes which are out-of-order by a bounded time interval
 * Emits watermarks which are the maximum observed timestamp minus the specified delay
 *
 * Because the watermark is simply (max observed timestamp - delay), early
 * watermarks can be negative when the delay exceeds the largest timestamp seen.
 */
public final class BoundedOutOfOrderTimestamps extends PeriodicWatermarkAssigner {
    
    /**
     * Create bounded out-of-order watermark strategy
     * @param delay The delay by which watermarks are behind the maximum observed timestamp
     */
    public BoundedOutOfOrderTimestamps(long delay);
    
    @Override
    public void nextTimestamp(long timestamp); // tracks the maximum observed timestamp
    
    @Override
    public Watermark getWatermark(); // returns (max observed timestamp - delay)
    
    @Override
    public Map<String, String> toProperties();
}

Usage Examples:

import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;
import org.apache.flink.streaming.api.watermark.Watermark;

// Create watermark strategy with 5-second delay for out-of-order events
BoundedOutOfOrderTimestamps watermarkStrategy = new BoundedOutOfOrderTimestamps(5000L);

// Process timestamps (simulating event processing)
watermarkStrategy.nextTimestamp(1000L);
watermarkStrategy.nextTimestamp(2000L);
watermarkStrategy.nextTimestamp(1500L); // Out-of-order event

// Get current watermark: max observed timestamp - delay = 2000 - 5000 = -3000.
// The documented contract is simply "max - delay", so early watermarks can be
// negative; no clamping is documented for this strategy.
Watermark currentWatermark = watermarkStrategy.getWatermark();
System.out.println("Current watermark: " + currentWatermark.getTimestamp());

// Use in table descriptor
Map<String, String> properties = watermarkStrategy.toProperties();
// Properties will contain watermark type and delay configuration

Ascending Timestamps

Watermark strategy for strictly ascending timestamps where events arrive in order.

/**
 * Watermark strategy for strictly ascending timestamps
 * Suitable when events are guaranteed to arrive in timestamp order
 *
 * NOTE(review): Flink's AscendingTimestamps implementation emits the maximum
 * observed timestamp minus 1 (so further events carrying that same timestamp
 * are not considered late) — confirm against the Flink source before relying
 * on the exact watermark value.
 */
public final class AscendingTimestamps extends PeriodicWatermarkAssigner {
    
    /**
     * Create ascending timestamp watermark strategy
     */
    public AscendingTimestamps();
    
    @Override
    public void nextTimestamp(long timestamp);
    
    @Override
    public Watermark getWatermark();
    
    @Override
    public Map<String, String> toProperties();
}

Usage Examples:

import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;

// Create watermark strategy for ascending timestamps
// Create watermark strategy for ascending timestamps
AscendingTimestamps watermarkStrategy = new AscendingTimestamps();

// Process strictly ascending timestamps
watermarkStrategy.nextTimestamp(1000L);
watermarkStrategy.nextTimestamp(2000L);
watermarkStrategy.nextTimestamp(3000L);

// The watermark follows the maximum seen timestamp (3000L here).
// NOTE(review): Flink's implementation emits max - 1, so the printed value
// is expected to be 2999 rather than 3000 — confirm against the Flink source.
Watermark watermark = watermarkStrategy.getWatermark();
System.out.println("Watermark: " + watermark.getTimestamp()); // 2999 (max - 1) — see note above

Punctuated Watermark Assigner

Base class for watermark strategies that emit watermarks based on special marker events.

/**
 * Base class for punctuated watermark assignment strategies
 * Watermarks are emitted when special marker events are encountered
 *
 * Unlike PeriodicWatermarkAssigner, the emission decision is made per event:
 * getWatermark is consulted for each timestamp and may return null.
 */
public abstract class PunctuatedWatermarkAssigner {
    
    /**
     * Extract watermark from the current event if it contains watermark information
     * @param timestamp Current event timestamp
     * @return Watermark if event triggers watermark emission, null otherwise
     */
    public abstract Watermark getWatermark(long timestamp);
    
    /**
     * Convert watermark strategy to properties for descriptor usage
     * @return Properties map for table descriptor configuration
     */
    public abstract Map<String, String> toProperties();
}

Integration with Table API

Schema-based Watermark Configuration

Configure watermarks using the modern Schema API.

import org.apache.flink.table.api.Schema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// Schema with watermark strategy: the watermark expression is declared
// directly on the event-time column.
Schema schemaWithWatermarksSchema = Schema.newBuilder()
    .column("user_id", "STRING")
    .column("event_data", "STRING")
    .column("event_time", "TIMESTAMP(3)")
    .watermark("event_time", "event_time - INTERVAL '5' SECOND") // 5-second delay
    .build();

// Apply to DataStream conversion
DataStream<Row> eventStream = env.fromElements(/* data */);
Table table = tableEnv.fromDataStream(eventStream, schemaWithWatermarksSchema);

// Watermark propagation from DataStream: SOURCE_WATERMARK() reuses the
// watermarks already generated by the underlying DataStream instead of
// computing new ones from the column expression.
Schema sourceWatermarkSchema = Schema.newBuilder()
    .column("user_id", "STRING")
    .column("event_data", "STRING")
    .columnByMetadata("event_time", "TIMESTAMP_LTZ(3)") // column backed by stream metadata
    .watermark("event_time", "SOURCE_WATERMARK()") // Propagate from DataStream
    .build();

SQL DDL Watermark Configuration

Configure watermarks in SQL table definitions.

-- Table with computed watermark
CREATE TABLE events (
    user_id STRING,
    event_data STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
    'connector' = 'datagen',
    'fields.event_time.kind' = 'random',
    'fields.event_time.max-past' = '1h'
);

-- Table with source watermark propagation
CREATE TABLE kafka_events (
    user_id STRING,
    event_data STRING,
    event_time TIMESTAMP_LTZ(3) METADATA,
    WATERMARK FOR event_time AS SOURCE_WATERMARK()
) WITH (
    'connector' = 'kafka',
    'topic' = 'events'
);

Advanced Watermark Patterns

Custom Watermark Strategy Implementation

Implement custom watermark strategies for specific business requirements.

/**
 * Example of a custom periodic watermark strategy whose delay varies with the
 * time of day: a tight delay while business hours are in effect and a looser
 * one otherwise. Hour-of-day is derived from the wall clock in UTC
 * (simplified — no timezone handling).
 */
public class BusinessHoursWatermarkStrategy extends PeriodicWatermarkAssigner {
    // Largest event timestamp seen so far; Long.MIN_VALUE means "no events yet".
    private long highestSeenTimestamp = Long.MIN_VALUE;
    private final long peakHoursDelayMillis = 30000L; // 30 seconds during business hours
    private final long offHoursDelayMillis = 300000L; // 5 minutes during off hours
    
    /** Track the running maximum of all observed event timestamps. */
    @Override
    public void nextTimestamp(long timestamp) {
        highestSeenTimestamp = Math.max(highestSeenTimestamp, timestamp);
    }
    
    /**
     * Compute the watermark as (max observed timestamp - delay), where the
     * delay depends on whether the current UTC hour falls within 9-17.
     */
    @Override
    public Watermark getWatermark() {
        // No events processed yet: emit the minimal watermark.
        if (highestSeenTimestamp == Long.MIN_VALUE) {
            return new Watermark(Long.MIN_VALUE);
        }
        
        // Hour-of-day (0-23) from epoch millis; UTC, simplified.
        long hourOfDay = (System.currentTimeMillis() / (1000 * 60 * 60)) % 24;
        boolean duringBusinessHours = hourOfDay >= 9 && hourOfDay <= 17;
        
        long delay = duringBusinessHours ? peakHoursDelayMillis : offHoursDelayMillis;
        return new Watermark(highestSeenTimestamp - delay);
    }
    
    /** Describe this strategy as descriptor properties. */
    @Override
    public Map<String, String> toProperties() {
        Map<String, String> props = new HashMap<>();
        props.put("watermark.strategy", "business-hours");
        return props;
    }
}

Windowing with Watermarks

Use watermark strategies with windowing operations.

// Create table with watermark strategy: a 30-second bounded out-of-orderness
// declared directly in the DDL.
tableEnv.executeSql(
    "CREATE TABLE sensor_readings (" +
    "  sensor_id STRING," +
    "  temperature DOUBLE," +
    "  reading_time TIMESTAMP(3)," +
    "  WATERMARK FOR reading_time AS reading_time - INTERVAL '30' SECOND" +
    ") WITH (" +
    "  'connector' = 'datagen'," +
    "  'fields.temperature.min' = '15.0'," +
    "  'fields.temperature.max' = '35.0'" +
    ")"
);

// Windowed aggregation with watermarks: the TUMBLE table-valued function
// assigns each row to a 1-minute window; watermarks determine when each
// window is complete and its aggregate can be emitted.
Table windowedAggregates = tableEnv.sqlQuery(
    "SELECT " +
    "  sensor_id, " +
    "  window_start, " +
    "  window_end, " +
    "  AVG(temperature) as avg_temp, " +
    "  MAX(temperature) as max_temp " +
    "FROM TABLE(" +
    "  TUMBLE(TABLE sensor_readings, DESCRIPTOR(reading_time), INTERVAL '1' MINUTE)" +
    ") " +
    "GROUP BY sensor_id, window_start, window_end"
);

Late Data Handling

Configure late data handling with allowed lateness.

-- Table with allowed lateness configuration
CREATE TABLE late_events (
    event_id STRING,
    event_time TIMESTAMP(3),
    event_data STRING,
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'events',
    -- Connector-specific late data handling
    'scan.watermark.allowed-lateness' = '1min'
);

Watermark Monitoring

Watermark Debugging

Monitor watermark progress in streaming applications.

// Enable watermark debugging
Configuration config = new Configuration();
config.setString("metrics.reporters", "jmx");
config.setBoolean("metrics.latency.tracking", true);

StreamExecutionEnvironment env = StreamExecutionEnvironment
    .getExecutionEnvironment(config);

// Monitor watermarks in processing
// NOTE(review): RuntimeContext does not expose a getCurrentWatermark()
// method in Flink's public API, so this example as written will not compile.
// Confirm the intended API — watermark access is normally obtained via a
// ProcessFunction's TimerService (currentWatermark()) or Flink's built-in
// currentInputWatermark metric.
DataStream<Row> monitoredStream = tableEnv.toChangelogStream(table)
    .map(new RichMapFunction<Row, Row>() {
        private transient MetricGroup metricGroup;
        private transient Gauge<Long> watermarkGauge;
        
        @Override
        public void open(Configuration parameters) {
            metricGroup = getRuntimeContext().getMetricGroup();
            // Register a gauge so the value is visible through the metrics
            // reporter configured above (JMX).
            watermarkGauge = metricGroup.gauge("currentWatermark", 
                () -> getCurrentWatermark());
        }
        
        @Override
        public Row map(Row row) {
            // Log watermark progress periodically
            // (millis % 10000 == 0 rarely matches exactly; best-effort sampling only)
            if (System.currentTimeMillis() % 10000 == 0) {
                System.out.println("Current watermark: " + getCurrentWatermark());
            }
            return row;
        }
        
        private long getCurrentWatermark() {
            // Get current watermark from context
            // NOTE(review): see the note above — not a real RuntimeContext method.
            return getRuntimeContext().getCurrentWatermark();
        }
    });

Types

Core Watermark Types

import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner;
import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner;
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;

Legacy Descriptor Types

import org.apache.flink.table.legacy.descriptors.Rowtime;
import java.util.Map;
import java.util.HashMap;

Schema Integration Types

import org.apache.flink.table.api.Schema;
import org.apache.flink.table.expressions.Expression;

Migration from Legacy APIs

Descriptor to Schema Migration

Migrate from legacy descriptor-based watermarks to modern Schema API.

// Legacy approach (deprecated): descriptor-based rowtime definition with a
// 5000 ms bounded out-of-orderness watermark.
Rowtime rowtimeDescriptor = new Rowtime()
    .timestampsFromField("event_time")
    .watermarksPeriodicBounded(5000L);

// Modern approach: the same 5-second bounded delay expressed as a watermark
// expression on the Schema builder.
Schema modernSchema = Schema.newBuilder()
    .column("event_id", "STRING")
    .column("event_time", "TIMESTAMP(3)")
    .column("event_data", "STRING")
    .watermark("event_time", "event_time - INTERVAL '5' SECOND")
    .build();

// Apply to DataStream
Table modernTable = tableEnv.fromDataStream(dataStream, modernSchema);

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-bridge

docs

builtin-connectors.md

changelog-processing.md

datastream-connectors.md

index.md

procedures.md

statement-sets.md

stream-table-environment.md

watermark-strategies.md

tile.json