tessl/maven-org-apache-flink--flink-table-api-java-bridge-2-11

Bridge component for Apache Flink's Table/SQL API that enables Java developers to write table programs that seamlessly interact with Flink's streaming and batch processing APIs.


Watermark Strategies

Watermark strategies in the Flink Table API Java Bridge enable event-time processing by defining how to handle out-of-order events and when to trigger time-based operations. These strategies are essential for windowed operations and temporal joins in streaming applications.

Overview

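Watermarks are special records that flow through the data stream alongside the events and track the progress of event time. A watermark with timestamp t declares that no more events with a timestamp less than or equal to t are expected; events that still arrive with such a timestamp are late. Watermarks let Flink decide when all events for a time window have most likely arrived, so that time-based operations can emit complete results.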
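To make that contract concrete, here is a standalone Java sketch with no Flink dependency. It replays an interleaved sequence of events and watermarks and reports the events that arrive at or behind the latest watermark. WatermarkContract and Element are hypothetical illustration types, not Flink classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Standalone illustration of the watermark contract; not Flink API. */
final class WatermarkContract {

    /** One stream element: an event timestamp or a watermark (hypothetical type). */
    static final class Element {
        final boolean isWatermark;
        final long timestamp;

        private Element(boolean isWatermark, long timestamp) {
            this.isWatermark = isWatermark;
            this.timestamp = timestamp;
        }

        static Element event(long timestamp) {
            return new Element(false, timestamp);
        }

        static Element watermark(long timestamp) {
            return new Element(true, timestamp);
        }
    }

    /** Returns the timestamps of events that arrive at or behind the latest watermark. */
    static List<Long> lateEvents(List<Element> stream) {
        long currentWatermark = Long.MIN_VALUE;
        List<Long> late = new ArrayList<>();
        for (Element e : stream) {
            if (e.isWatermark) {
                // Watermarks only move forward
                currentWatermark = Math.max(currentWatermark, e.timestamp);
            } else if (e.timestamp <= currentWatermark) {
                // Watermark t promised that no event with timestamp <= t would follow
                late.add(e.timestamp);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        List<Element> stream = Arrays.asList(
            Element.event(1_000),
            Element.event(3_000),
            Element.watermark(2_000),
            Element.event(2_500),  // behind the newest event, but after the watermark: on time
            Element.event(1_500),  // 1500 <= 2000: late
            Element.watermark(4_000),
            Element.event(4_000)); // equal to the watermark: late
        System.out.println(lateEvents(stream)); // [1500, 4000]
    }
}
```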

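The bridge works with the legacy watermark strategies in the org.apache.flink.table.sources.wmstrategies package. A legacy table source attaches one of them to its rowtime attribute through a RowtimeAttributeDescriptor, together with a TimestampExtractor that computes each row's timestamp. These classes are not DataStream watermark strategies and cannot be passed to DataStream.assignTimestampsAndWatermarks().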

Base Classes

PeriodicWatermarkAssigner

Abstract base class for watermark strategies that generate watermarks periodically:

@PublicEvolving
public abstract class PeriodicWatermarkAssigner extends WatermarkStrategy {
    public abstract void nextTimestamp(long timestamp);
    public abstract Watermark getWatermark();
    
    // Inherited from WatermarkStrategy
    public abstract Map<String, String> toProperties();
    public abstract boolean equals(Object obj);
    public abstract int hashCode();
}

Usage Pattern:

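import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner;

public class MyPeriodicWatermarkAssigner extends PeriodicWatermarkAssigner {
    private final long maxOutOfOrderness;
    // Initialized so that "maxTimestamp - maxOutOfOrderness" cannot underflow
    private long maxTimestamp;

    public MyPeriodicWatermarkAssigner(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    // Called for every row with the timestamp already computed by the rowtime
    // attribute's TimestampExtractor; the assigner itself has no extractTimestamp()
    @Override
    public void nextTimestamp(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    // Polled periodically (auto-watermark interval)
    @Override
    public Watermark getWatermark() {
        return new Watermark(maxTimestamp - maxOutOfOrderness);
    }

    @Override
    public Map<String, String> toProperties() {
        return Collections.emptyMap(); // Descriptor properties; left empty in this sketch
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof MyPeriodicWatermarkAssigner
            && ((MyPeriodicWatermarkAssigner) o).maxOutOfOrderness == maxOutOfOrderness;
    }

    @Override
    public int hashCode() {
        return Objects.hash(maxOutOfOrderness);
    }
}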
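The call pattern of a periodic assigner (one nextTimestamp call per row, getWatermark polled on a timer) can be simulated without Flink. In this sketch the PeriodicAssigner interface, BoundedAssigner and run are hypothetical stand-ins: polling every pollEvery rows approximates the auto-watermark interval, and forwarding only watermarks that advance is an assumption modelled on how Flink's watermark operators behave.

```java
import java.util.ArrayList;
import java.util.List;

/** Pure-Java simulation of the periodic assigner call pattern; not Flink API. */
final class PeriodicDriverSketch {

    /** Hypothetical stand-in mirroring the two methods of PeriodicWatermarkAssigner. */
    interface PeriodicAssigner {
        void nextTimestamp(long timestamp);

        long getWatermark();
    }

    /** Bounded out-of-orderness assigner, guarded against underflow before the first row. */
    static final class BoundedAssigner implements PeriodicAssigner {
        private final long delay;
        private long maxTimestamp;

        BoundedAssigner(long delay) {
            this.delay = delay;
            this.maxTimestamp = Long.MIN_VALUE + delay; // maxTimestamp - delay stays >= Long.MIN_VALUE
        }

        @Override
        public void nextTimestamp(long timestamp) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }

        @Override
        public long getWatermark() {
            return maxTimestamp - delay;
        }
    }

    /** Feeds every timestamp to the assigner and polls it after every pollEvery rows. */
    static List<Long> run(PeriodicAssigner assigner, long[] timestamps, int pollEvery) {
        List<Long> emitted = new ArrayList<>();
        long current = Long.MIN_VALUE;
        for (int i = 0; i < timestamps.length; i++) {
            assigner.nextTimestamp(timestamps[i]);
            if ((i + 1) % pollEvery == 0) {
                long watermark = assigner.getWatermark();
                if (watermark > current) { // forward only watermarks that advance
                    current = watermark;
                    emitted.add(watermark);
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000, 6_000, 4_000, 9_000, 8_000, 8_500};
        System.out.println(run(new BoundedAssigner(2_000), timestamps, 2)); // [4000, 7000]
    }
}
```

The third poll produces no new watermark: the maximum is still 9,000, so the candidate stays at 7,000.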

PunctuatedWatermarkAssigner

Abstract base class for watermark strategies that generate watermarks based on specific events:

@PublicEvolving
public abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {
    public abstract Watermark getWatermark(Row row, long timestamp);
    
    // Inherited from WatermarkStrategy
    public abstract Map<String, String> toProperties();
    public abstract boolean equals(Object obj);  
    public abstract int hashCode();
}

Usage Pattern:

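import java.util.Collections;
import java.util.Map;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner;
import org.apache.flink.types.Row;

public class MyPunctuatedWatermarkAssigner extends PunctuatedWatermarkAssigner {

    @Override
    public Watermark getWatermark(Row row, long timestamp) {
        // "timestamp" was already computed by the rowtime attribute's TimestampExtractor.
        // Emit a watermark only for special marker events (event type assumed at index 1)
        if ("WATERMARK_EVENT".equals(row.getField(1))) {
            return new Watermark(timestamp);
        }
        return null; // No watermark for regular events
    }

    @Override
    public Map<String, String> toProperties() {
        return Collections.emptyMap(); // Descriptor properties; left empty in this sketch
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof MyPunctuatedWatermarkAssigner; // Stateless: all instances are equal
    }

    @Override
    public int hashCode() {
        return MyPunctuatedWatermarkAssigner.class.hashCode();
    }
}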
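A Flink-free sketch of the punctuated behaviour: the rule is evaluated for every row, and a watermark appears only when a marker row arrives. PunctuatedSketch and MarkerRow are hypothetical; MarkerRow stands in for a Flink Row whose event type and extracted timestamp are already known.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Pure-Java simulation of punctuated watermarking; not Flink API. */
final class PunctuatedSketch {

    /** Hypothetical row: an event type plus its already-extracted timestamp. */
    static final class MarkerRow {
        final String eventType;
        final long timestamp;

        MarkerRow(String eventType, long timestamp) {
            this.eventType = eventType;
            this.timestamp = timestamp;
        }
    }

    /** Watermark for marker rows, null (no watermark) for regular rows. */
    static Long watermarkFor(MarkerRow row) {
        return "WATERMARK_EVENT".equals(row.eventType) ? Long.valueOf(row.timestamp) : null;
    }

    /** Evaluates the rule for every row and keeps only watermarks that advance. */
    static List<Long> run(List<MarkerRow> rows) {
        List<Long> emitted = new ArrayList<>();
        long current = Long.MIN_VALUE;
        for (MarkerRow row : rows) {
            Long watermark = watermarkFor(row);
            if (watermark != null && watermark > current) {
                current = watermark;
                emitted.add(watermark);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<MarkerRow> rows = Arrays.asList(
            new MarkerRow("click", 1_000),
            new MarkerRow("WATERMARK_EVENT", 2_000),
            new MarkerRow("click", 1_800),
            new MarkerRow("WATERMARK_EVENT", 5_000));
        System.out.println(run(rows)); // [2000, 5000]
    }
}
```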

Built-in Strategies

AscendingTimestamps

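Watermark strategy for rowtime attributes with ascending (non-decreasing) timestamps. It emits the maximum timestamp observed so far minus 1, so rows that share the current maximum timestamp are not late: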

@PublicEvolving
public class AscendingTimestamps extends PeriodicWatermarkAssigner {
    public void nextTimestamp(long timestamp);
    public Watermark getWatermark();
}

Usage Example:

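import java.util.Collections;
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
import org.apache.flink.types.Row;

// Legacy table source whose "event_time" field is guaranteed to be ascending
public class MyTableSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {
    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        // No assignTimestampsAndWatermarks() call: the planner applies the
        // watermark strategy declared in getRowtimeAttributeDescriptors()
        return execEnv.addSource(new MySourceFunction());
    }

    @Override
    public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
        return Collections.singletonList(new RowtimeAttributeDescriptor(
            "event_time",                      // rowtime attribute name
            new ExistingField("event_time"),   // read the timestamp from this field
            new AscendingTimestamps()));       // watermark strategy
    }

    // Schema and produced-type methods omitted for brevity
}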
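The sketch below assumes the rule watermark = maxTimestamp - 1 and, for simplicity, refreshes the watermark after every row instead of periodically. It shows why the subtraction matters: a second row carrying the current maximum timestamp is not late, while a row behind the maximum is. AscendingSketch is a hypothetical illustration class.

```java
/** Arithmetic behind an ascending-timestamps watermark; standalone, not Flink API. */
final class AscendingSketch {
    private long maxTimestamp = Long.MIN_VALUE + 1; // keeps maxTimestamp - 1 from underflowing
    private int lateCount = 0;

    /** Processes one row: late if its timestamp is <= the watermark before it arrived. */
    void onRow(long timestamp) {
        long watermark = maxTimestamp - 1;
        if (timestamp <= watermark) {
            lateCount++;
        }
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    int lateCount() {
        return lateCount;
    }

    public static void main(String[] args) {
        AscendingSketch s = new AscendingSketch();
        for (long ts : new long[] {100, 200, 200, 300}) { // the duplicate 200 is not late
            s.onRow(ts);
        }
        System.out.println(s.lateCount()); // 0
        s.onRow(250); // 250 <= 300 - 1: late
        System.out.println(s.lateCount()); // 1
    }
}
```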

BoundedOutOfOrderTimestamps

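Watermark strategy for rowtime attributes whose events arrive out of order by at most a bounded interval. It emits watermarks equal to the maximum observed timestamp minus the configured delay: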

@PublicEvolving
public class BoundedOutOfOrderTimestamps extends PeriodicWatermarkAssigner {
    public BoundedOutOfOrderTimestamps(long maxOutOfOrderness);
    public void nextTimestamp(long timestamp);
    public Watermark getWatermark();
}

Usage Example:

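import java.util.Collections;
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;
import org.apache.flink.types.Row;

// Integration with a legacy table source whose events can arrive
// up to 5 seconds out of order
public class EventTableSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {
    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.addSource(new EventSourceFunction());
    }

    @Override
    public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
        return Collections.singletonList(new RowtimeAttributeDescriptor(
            "event_time",                               // rowtime attribute name
            new ExistingField("event_time"),            // event timestamp field
            new BoundedOutOfOrderTimestamps(5000L)));   // 5 seconds max out-of-order
    }

    // Schema and produced-type methods omitted for brevity
}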
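Choosing maxOutOfOrderness comes down to how far events trail the newest timestamp. Under a simplified model where the watermark maxSoFar - delay is refreshed after every row, the hypothetical helpers below measure the largest disorder in a sample and count late events for a candidate delay. Periodic emission only makes the watermark lag further, so a delay chosen this way should be sufficient, possibly conservative, for that sample.

```java
/** Estimates the out-of-orderness bound a sample of event timestamps needs; standalone sketch. */
final class DisorderSketch {

    /**
     * Largest amount by which an event trails the maximum timestamp seen before it.
     * With watermark = maxSoFar - delay refreshed after every row, any delay strictly
     * greater than this value leaves no event late.
     */
    static long maxObservedDisorder(long[] timestamps) {
        long maxSoFar = Long.MIN_VALUE;
        long worst = 0;
        for (long ts : timestamps) {
            if (maxSoFar != Long.MIN_VALUE) {
                worst = Math.max(worst, maxSoFar - ts);
            }
            maxSoFar = Math.max(maxSoFar, ts);
        }
        return worst;
    }

    /** Counts late events for a given delay under the same per-row model. */
    static int lateEvents(long[] timestamps, long delay) {
        long maxSoFar = Long.MIN_VALUE;
        int late = 0;
        for (long ts : timestamps) {
            if (maxSoFar != Long.MIN_VALUE && ts <= maxSoFar - delay) {
                late++;
            }
            maxSoFar = Math.max(maxSoFar, ts);
        }
        return late;
    }

    public static void main(String[] args) {
        long[] ts = {10_000, 12_000, 8_000, 15_000, 11_000};
        System.out.println(maxObservedDisorder(ts)); // 4000
        System.out.println(lateEvents(ts, 5_000));   // 0
        System.out.println(lateEvents(ts, 3_000));   // 2
    }
}
```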

Modern Watermark Integration

With modern table sources, watermarks are typically defined in the table schema:

// Define watermark in table schema
Schema schema = Schema.newBuilder()
    .column("user_id", DataTypes.STRING())
    .column("event_type", DataTypes.STRING())
    .column("event_time", DataTypes.TIMESTAMP_LTZ(3))
    .columnByExpression("processing_time", "PROCTIME()") // processing-time attribute
    .watermark("event_time", "event_time - INTERVAL '5' SECOND")
    .build();

// Create table with watermark
tableEnv.createTable("events_with_watermark", 
    TableDescriptor.forConnector("my-connector")
        .schema(schema)
        .build());
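A declared watermark can be pictured as a per-row expression whose largest value so far is emitted periodically. That description of the planner's behaviour is an assumption; the sketch models it in plain Java with java.time for event_time - INTERVAL '5' SECOND. DeclaredWatermarkSketch is hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

/** Standalone model of a declared watermark expression; not the Flink planner. */
final class DeclaredWatermarkSketch {
    private static final Duration DELAY = Duration.ofSeconds(5); // INTERVAL '5' SECOND
    private Instant currentWatermark = null;

    /** Evaluates "event_time - INTERVAL '5' SECOND" for one row and keeps the maximum. */
    void onRow(Instant eventTime) {
        Instant candidate = eventTime.minus(DELAY);
        if (currentWatermark == null || candidate.isAfter(currentWatermark)) {
            currentWatermark = candidate;
        }
    }

    /** What a periodic emission would send downstream (empty before the first row). */
    Optional<Instant> watermark() {
        return Optional.ofNullable(currentWatermark);
    }

    public static void main(String[] args) {
        DeclaredWatermarkSketch s = new DeclaredWatermarkSketch();
        s.onRow(Instant.parse("2024-01-01T10:00:10Z"));
        s.onRow(Instant.parse("2024-01-01T10:00:07Z")); // out of order: does not lower the watermark
        System.out.println(s.watermark().get()); // 2024-01-01T10:00:05Z
    }
}
```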

Legacy Integration Patterns

With StreamTableSource

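import java.util.Collections;
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

public class WatermarkedStreamTableSource
        implements StreamTableSource<Row>, DefinedRowtimeAttributes {
    private final long maxOutOfOrderness;

    public WatermarkedStreamTableSource(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        // Timestamps and watermarks are derived by the planner from the descriptor below
        return execEnv.addSource(new MySourceFunction());
    }

    @Override
    public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
        // Extract event time from the TIMESTAMP(3) field "event_time"
        return Collections.singletonList(new RowtimeAttributeDescriptor(
            "event_time",
            new ExistingField("event_time"),
            new BoundedOutOfOrderTimestamps(maxOutOfOrderness)));
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.builder()
            .field("id", DataTypes.BIGINT())
            .field("data", DataTypes.STRING())
            .field("event_time", DataTypes.TIMESTAMP(3))
            .build();
    }

    @Override
    public DataType getProducedDataType() {
        return getTableSchema().toRowDataType();
    }
}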

With DataStream Conversion

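import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Convert DataStream with watermarks to Table
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create DataStream with timestamps and watermarks. This uses the DataStream API's
// WatermarkStrategy; the table wmstrategies classes cannot be passed here
DataStream<Event> eventStream = env
    .addSource(new EventSource())
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, recordTimestamp) -> event.getEventTime())); // epoch millis

// Convert to Table while preserving watermarks: physical columns are derived
// from Event, and the record timestamp is exposed as the "rowtime" metadata column
Schema schema = Schema.newBuilder()
    .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
    .watermark("rowtime", "SOURCE_WATERMARK()") // Preserve existing watermarks
    .build();

Table eventTable = tableEnv.fromDataStream(eventStream, schema);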

Windowed Operations with Watermarks

Tumbling Windows

// Use watermarks with tumbling windows
Table windowedResult = tableEnv.sqlQuery("""
    SELECT 
        user_id,
        COUNT(*) as event_count,
        SUM(amount) as total_amount,
        TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
        TUMBLE_END(event_time, INTERVAL '1' MINUTE) as window_end
    FROM events_with_watermark
    GROUP BY 
        user_id,
        TUMBLE(event_time, INTERVAL '1' MINUTE)
""");

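The window arithmetic behind TUMBLE can be checked by hand: a row falls into the window whose start is its timestamp rounded down to a multiple of the size, TUMBLE_END is the exclusive end, and the result is emitted once the watermark reaches the window's last millisecond (end - 1). The sketch assumes epoch-millisecond timestamps and no window offset; TumbleSketch is hypothetical.

```java
/** Tumbling-window arithmetic sketch (1-minute windows, no offset); not Flink API. */
final class TumbleSketch {
    static final long SIZE_MS = 60_000; // INTERVAL '1' MINUTE

    /** Start of the tumbling window containing ts (floorMod also handles negative ts). */
    static long windowStart(long ts) {
        return ts - Math.floorMod(ts, SIZE_MS);
    }

    /** Exclusive window end, as reported by TUMBLE_END. */
    static long windowEnd(long ts) {
        return windowStart(ts) + SIZE_MS;
    }

    /** True once the watermark has reached the window's last millisecond (end - 1). */
    static boolean canFire(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long ts = 125_000; // 00:02:05 after the epoch
        System.out.println(windowStart(ts) + " " + windowEnd(ts)); // 120000 180000
        System.out.println(canFire(windowEnd(ts), 179_998)); // false
        System.out.println(canFire(windowEnd(ts), 179_999)); // true
    }
}
```
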
Sliding Windows

// Sliding windows with watermarks
Table slidingResult = tableEnv.sqlQuery("""
    SELECT 
        user_id,
        AVG(`value`) as avg_value,
        HOP_START(event_time, INTERVAL '30' SECOND, INTERVAL '2' MINUTE) as window_start,
        HOP_END(event_time, INTERVAL '30' SECOND, INTERVAL '2' MINUTE) as window_end
    FROM events_with_watermark
    GROUP BY 
        user_id,
        HOP(event_time, INTERVAL '30' SECOND, INTERVAL '2' MINUTE)
""");

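With HOP, each row belongs to size / slide overlapping windows, four in this query (2 minutes / 30 seconds). The hypothetical helper below enumerates those windows for one timestamp, mirroring, as an assumption, the enumeration done by Flink's sliding event-time window assigner; it uses epoch milliseconds and no offset.

```java
import java.util.ArrayList;
import java.util.List;

/** Enumerates the hopping windows containing a timestamp; standalone sketch. */
final class HopSketch {

    /** Starts of all windows [start, start + size) that contain ts, latest first. */
    static List<Long> windowStarts(long ts, long slideMs, long sizeMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slideMs); // latest window start <= ts
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // HOP(event_time, INTERVAL '30' SECOND, INTERVAL '2' MINUTE)
        System.out.println(windowStarts(125_000, 30_000, 120_000));
        // [120000, 90000, 60000, 30000]
    }
}
```
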
Session Windows

// Session windows with watermarks
Table sessionResult = tableEnv.sqlQuery("""
    SELECT 
        user_id,
        COUNT(*) as session_events,
        MIN(event_time) as session_start,
        MAX(event_time) as session_end,
        SESSION_START(event_time, INTERVAL '10' MINUTE) as session_window_start,
        SESSION_END(event_time, INTERVAL '10' MINUTE) as session_window_end
    FROM user_events
    GROUP BY 
        user_id,
        SESSION(event_time, INTERVAL '10' MINUTE)
""");

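Session windows have no fixed size: consecutive events of a key stay in one session while the pause between them does not exceed the gap, and the session ends one gap after its last event. The hypothetical SessionSketch groups timestamps that way; counting a pause of exactly the gap as still inside the session is a simplifying assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Groups event timestamps into gap-based sessions; standalone sketch. */
final class SessionSketch {

    /** Each session is returned as {firstTimestamp, lastTimestamp + gap}. */
    static List<long[]> sessions(long[] timestamps, long gapMs) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        if (sorted.length == 0) {
            return result;
        }
        long start = sorted[0];
        long last = sorted[0];
        for (int i = 1; i < sorted.length; i++) {
            if (sorted[i] - last > gapMs) { // pause longer than the gap: close the session
                result.add(new long[] {start, last + gapMs});
                start = sorted[i];
            }
            last = sorted[i];
        }
        result.add(new long[] {start, last + gapMs});
        return result;
    }

    public static void main(String[] args) {
        long gap = 10 * 60_000; // INTERVAL '10' MINUTE
        long[] ts = {0, 4 * 60_000, 30 * 60_000, 35 * 60_000};
        for (long[] s : sessions(ts, gap)) {
            System.out.println(s[0] + " -> " + s[1]);
        }
        // 0 -> 840000
        // 1800000 -> 2700000
    }
}
```
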
Custom Watermark Strategies

Business Logic Based Watermarks

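import java.time.DayOfWeek;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner;

public class BusinessLogicWatermarkAssigner extends PeriodicWatermarkAssigner {
    private long maxTimestamp = Long.MIN_VALUE;
    private final long gracePeriod;

    public BusinessLogicWatermarkAssigner(long gracePeriodMs) {
        this.gracePeriod = gracePeriodMs;
    }

    // Receives timestamps computed by the rowtime attribute's TimestampExtractor
    @Override
    public void nextTimestamp(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    @Override
    public Watermark getWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return new Watermark(Long.MIN_VALUE); // No rows yet; avoid underflow
        }
        // Business rule: extend the grace period by 30% on weekends.
        // Note: this reads the wall clock, so results depend on when the job runs
        DayOfWeek day = Instant.now().atZone(ZoneId.systemDefault()).getDayOfWeek();
        long adjustedGracePeriod = gracePeriod;
        if (day == DayOfWeek.SATURDAY || day == DayOfWeek.SUNDAY) {
            adjustedGracePeriod = (long) (gracePeriod * 1.3);
        }
        return new Watermark(maxTimestamp - adjustedGracePeriod);
    }

    @Override
    public Map<String, String> toProperties() {
        return Collections.emptyMap(); // Descriptor properties; left empty in this sketch
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof BusinessLogicWatermarkAssigner
            && ((BusinessLogicWatermarkAssigner) o).gracePeriod == gracePeriod;
    }

    @Override
    public int hashCode() {
        return Objects.hash(gracePeriod);
    }
}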
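Because the rule depends on the day, it is easier to test as a pure function that receives the instant and the time zone instead of reading the system clock. The hypothetical GracePeriodRule below isolates the same 30% weekend extension; passing the time zone explicitly is a choice of this sketch.

```java
import java.time.DayOfWeek;
import java.time.Instant;
import java.time.ZoneId;

/** The weekend grace rule as a pure function; standalone sketch. */
final class GracePeriodRule {

    /** Grace period extended by 30% on Saturdays and Sundays in the given zone. */
    static long adjustedGracePeriod(long gracePeriodMs, Instant now, ZoneId zone) {
        DayOfWeek day = now.atZone(zone).getDayOfWeek();
        boolean weekend = day == DayOfWeek.SATURDAY || day == DayOfWeek.SUNDAY;
        return weekend ? (long) (gracePeriodMs * 1.3) : gracePeriodMs;
    }

    public static void main(String[] args) {
        ZoneId utc = ZoneId.of("UTC");
        // 2024-01-06 was a Saturday, 2024-01-08 a Monday
        System.out.println(adjustedGracePeriod(10_000, Instant.parse("2024-01-06T12:00:00Z"), utc)); // 13000
        System.out.println(adjustedGracePeriod(10_000, Instant.parse("2024-01-08T12:00:00Z"), utc)); // 10000
    }
}
```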

Multi-Source Watermark Coordination

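import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner;
import org.apache.flink.types.Row;

// PeriodicWatermarkAssigner.nextTimestamp() only receives the timestamp, not the row,
// so per-source tracking is done in a PunctuatedWatermarkAssigner instead
public class CoordinatedWatermarkAssigner extends PunctuatedWatermarkAssigner {
    // Highest timestamp seen per source id (source id assumed at field index 0)
    private final Map<String, Long> maxTimestampPerSource = new HashMap<>();
    private final long defaultLag;
    private long lastEmitted = Long.MIN_VALUE;

    public CoordinatedWatermarkAssigner(long defaultLagMs) {
        this.defaultLag = defaultLagMs;
    }

    @Override
    public Watermark getWatermark(Row row, long timestamp) {
        String sourceId = (String) row.getField(0);
        maxTimestampPerSource.merge(sourceId, timestamp, Long::max);

        // The slowest source bounds the watermark; only sources seen so far count
        long candidate = Collections.min(maxTimestampPerSource.values()) - defaultLag;
        if (candidate > lastEmitted) {
            lastEmitted = candidate;
            return new Watermark(candidate);
        }
        return null; // No progress: no new watermark
    }

    @Override
    public Map<String, String> toProperties() {
        return Collections.emptyMap(); // Descriptor properties; left empty in this sketch
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof CoordinatedWatermarkAssigner
            && ((CoordinatedWatermarkAssigner) o).defaultLag == defaultLag;
    }

    @Override
    public int hashCode() {
        return Objects.hash(defaultLag);
    }
}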
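Taking the minimum over sources has a side effect worth checking: a source that stops sending events holds the combined watermark back until it resumes. The DataStream API addresses this with idleness detection (WatermarkStrategy.withIdleness). The standalone sketch reproduces the stall; MinWatermarkSketch and the source names are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Min-over-sources watermark, showing how one quiet source stalls it; standalone sketch. */
final class MinWatermarkSketch {
    private final Map<String, Long> maxPerSource = new HashMap<>();
    private final long lagMs;

    MinWatermarkSketch(long lagMs) {
        this.lagMs = lagMs;
    }

    /** Records an event and returns the combined watermark after it. */
    long onEvent(String sourceId, long timestamp) {
        maxPerSource.merge(sourceId, timestamp, Long::max);
        return Collections.min(maxPerSource.values()) - lagMs;
    }

    public static void main(String[] args) {
        MinWatermarkSketch wm = new MinWatermarkSketch(1_000);
        wm.onEvent("a", 10_000);
        wm.onEvent("b", 10_000);
        System.out.println(wm.onEvent("a", 60_000)); // 9000: source "b" is silent
        System.out.println(wm.onEvent("b", 61_000)); // 59000
    }
}
```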

Best Practices

Watermark Configuration

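  1. Choose an appropriate lag: a larger out-of-orderness bound turns fewer events into late events but delays every time-based result by roughly the same amount
  2. Monitor late events: track events whose timestamps are already behind the watermark when they arrive
  3. Consider business requirements: different domains may need different lateness handling

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;
import org.apache.flink.util.OutputTag;

// Monitor late events (the table's rowtime becomes the record timestamp in the DataStream)
DataStream<Row> eventStream = tableEnv.toDataStream(eventsTable);
OutputTag<Row> lateEventsTag = new OutputTag<Row>("late-events"){};

SingleOutputStreamOperator<Row> processedStream = eventStream
    .keyBy(row -> (String) row.getField(0), Types.STRING) // user_id assumed at index 0
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(1))  // Allow 1 minute lateness
    .sideOutputLateData(lateEventsTag)
    .apply(new MyWindowFunction());

// Handle late events separately
DataStream<Row> lateEvents = processedStream.getSideOutput(lateEventsTag);
lateEvents.addSink(new LateEventsSink());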
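The outcome for a single record depends on its window and the current watermark: it is on time while the window has not fired, it still updates the result within the allowed lateness, and it goes to the side output once the watermark passes the window's last millisecond plus the allowed lateness. The sketch encodes those rules for the 5-minute windows and 1 minute of lateness used above. LatenessSketch and its Outcome values are hypothetical, and the boundary conditions reflect our reading of Flink's window operator rather than a quote from it.

```java
/** Classifies an element against a tumbling window's watermark deadlines; standalone sketch. */
final class LatenessSketch {
    enum Outcome { ON_TIME, LATE_WITHIN_ALLOWED_LATENESS, SIDE_OUTPUT }

    static Outcome classify(long ts, long watermark, long sizeMs, long allowedLatenessMs) {
        long start = ts - Math.floorMod(ts, sizeMs);
        long maxTimestamp = start + sizeMs - 1; // last millisecond of the window
        if (maxTimestamp + allowedLatenessMs <= watermark) {
            return Outcome.SIDE_OUTPUT; // window state already cleaned up
        }
        return maxTimestamp <= watermark
            ? Outcome.LATE_WITHIN_ALLOWED_LATENESS // window fired; a late firing updates it
            : Outcome.ON_TIME;
    }

    public static void main(String[] args) {
        long size = 5 * 60_000, lateness = 60_000;
        long ts = 100_000; // falls into window [0, 300000)
        System.out.println(classify(ts, 250_000, size, lateness)); // ON_TIME
        System.out.println(classify(ts, 320_000, size, lateness)); // LATE_WITHIN_ALLOWED_LATENESS
        System.out.println(classify(ts, 360_000, size, lateness)); // SIDE_OUTPUT
    }
}
```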

Performance Optimization

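  1. Periodic interval: periodic assigners are polled for a new watermark once per auto-watermark interval; a shorter interval lowers result latency at the cost of more watermark traffic
  2. Timestamp extraction: keep timestamp extraction cheap, since it runs for every record
  3. Memory usage: keep state in custom watermark assigners small, since every parallel instance holds its own copy

import java.time.Duration;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.types.Row;

// Configure watermark interval
env.getConfig().setAutoWatermarkInterval(1000L); // 1 second

// Efficient timestamp extraction with the DataStream API's WatermarkStrategy
// (the table wmstrategies classes have no extractTimestamp() to override)
long maxOutOfOrderness = 5000L;
WatermarkStrategy<Row> strategy = WatermarkStrategy
    .<Row>forBoundedOutOfOrderness(Duration.ofMillis(maxOutOfOrderness))
    .withTimestampAssigner((row, recordTimestamp) ->
        // Reuse a timestamp already attached to the record to avoid field access;
        // only correct if that timestamp already is the event time
        recordTimestamp != TimestampAssigner.NO_TIMESTAMP
            ? recordTimestamp
            : (Long) row.getField(2));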

Error Handling

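import java.sql.Timestamp;
import java.time.Instant;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// A DataStream timestamp assigner, used as
// WatermarkStrategy.<Row>forBoundedOutOfOrderness(...).withTimestampAssigner(new RobustTimestampAssigner())
public class RobustTimestampAssigner implements SerializableTimestampAssigner<Row> {
    private static final Logger LOG = LoggerFactory.getLogger(RobustTimestampAssigner.class);

    @Override
    public long extractTimestamp(Row element, long recordTimestamp) {
        Object timestampField;
        try {
            timestampField = element.getField(2);
        } catch (RuntimeException e) {
            LOG.error("Error extracting timestamp", e);
            return System.currentTimeMillis();
        }
        if (timestampField instanceof Long) {
            return (Long) timestampField;
        } else if (timestampField instanceof Timestamp) {
            return ((Timestamp) timestampField).getTime();
        } else if (timestampField instanceof Instant) { // TIMESTAMP_LTZ values from toDataStream()
            return ((Instant) timestampField).toEpochMilli();
        }
        // Log warning and fall back to processing time (makes results non-deterministic)
        LOG.warn("Invalid timestamp field: {}",
            timestampField == null ? "null" : timestampField.getClass());
        return System.currentTimeMillis();
    }
}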

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-bridge-2-11
