CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-streaming-java-2-11

Apache Flink Streaming Java API - Core library for building streaming data processing applications in Java, providing DataStream API, windowing operations, state management, event time processing, and fault-tolerant stream processing capabilities

Pending
Overview
Eval results
Files

docs/time-watermarks.md

Time and Watermarks

Apache Flink provides sophisticated time semantics for handling event time, processing time, and ingestion time in streaming applications. Watermarks enable handling of out-of-order events and late data in event-time processing.

Capabilities

Time Characteristics

Configure the time semantics for your streaming application.

/**
 * Sets the time characteristic for all streams created from this environment.
 *
 * <p>Upstream Flink also resets the automatic watermark interval in this call:
 * 0 for {@code ProcessingTime}, 200 ms for the other characteristics. Adjust it
 * afterwards via {@code ExecutionConfig#setAutoWatermarkInterval(long)} if a
 * different interval is needed.
 *
 * <p>The method returns nothing, so it cannot be chained.
 *
 * @param characteristic - time characteristic to use (must not be null)
 */
void setStreamTimeCharacteristic(TimeCharacteristic characteristic);

/**
 * Get the time characteristic currently configured on this environment
 * @return the configured time characteristic
 */
TimeCharacteristic getStreamTimeCharacteristic();

/**
 * Time characteristic enumeration.
 * Selects which notion of time a streaming application works with.
 */
enum TimeCharacteristic {
    ProcessingTime,  // Time at which an operator processes the element
    IngestionTime,   // Time at which the element enters Flink
    EventTime        // Time carried inside the element itself
}

Usage Examples:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// The three calls below are alternatives shown side by side:
// use exactly one of them in a real job.

// Set event time processing (most common for accurate results)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Set processing time (lowest latency, no late data handling)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

// Set ingestion time (compromise between accuracy and performance)
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

Watermark Strategies

Define watermark generation strategies for handling event time and out-of-order events.

/**
 * Watermark strategy interface.
 *
 * Bundles the two pieces needed for event-time processing: how timestamps are
 * assigned to elements and how watermarks are generated from them.
 */
interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {
    /**
     * Create strategy for monotonous timestamps (no out-of-order events)
     */
    static <T> WatermarkStrategy<T> forMonotonousTimestamps();

    /**
     * Create strategy for bounded out-of-orderness
     * @param maxOutOfOrderness - maximum time by which events can be late
     */
    static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness);

    /**
     * Create strategy from a custom watermark generator supplier
     * @param generatorSupplier - supplier of the watermark generator to use
     */
    static <T> WatermarkStrategy<T> forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier);

    /**
     * Create strategy with no watermarks
     */
    static <T> WatermarkStrategy<T> noWatermarks();

    /**
     * Add timestamp assigner to strategy
     * @param timestampAssigner - timestamp assigner supplier
     */
    WatermarkStrategy<T> withTimestampAssigner(TimestampAssignerSupplier<T> timestampAssigner);

    /**
     * Add timestamp assigner to strategy.
     * This is the overload selected by two-argument lambdas such as
     * {@code (event, timestamp) -> event.getTimestamp()}.
     * @param timestampAssigner - serializable timestamp assigner
     */
    WatermarkStrategy<T> withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner);

    /**
     * Add idleness detection
     * @param idleTimeout - timeout after which source is considered idle
     */
    WatermarkStrategy<T> withIdleness(Duration idleTimeout);
}

/**
 * Assign watermarks and timestamps to stream
 * @param watermarkStrategy - watermark strategy to use
 * @return the resulting stream, as a SingleOutputStreamOperator of the same element type
 */
SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy);

Usage Examples:

// Input stream whose elements carry their own timestamps
DataStream<Event> events = env.addSource(new EventSource());

// In-order input: timestamps never go backwards
WatermarkStrategy<Event> ascendingStrategy = WatermarkStrategy
    .<Event>forMonotonousTimestamps()
    .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp());
DataStream<Event> monotonousStream = events.assignTimestampsAndWatermarks(ascendingStrategy);

// Out-of-order input: tolerate events arriving up to 5 seconds late
WatermarkStrategy<Event> boundedStrategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp());
DataStream<Event> boundedStream = events.assignTimestampsAndWatermarks(boundedStrategy);

// Same bound, plus idleness detection so that slow sources are handled
WatermarkStrategy<Event> idleAwareStrategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
    .withIdleness(Duration.ofMinutes(1));
DataStream<Event> idleStream = events.assignTimestampsAndWatermarks(idleAwareStrategy);

// Custom watermark generator
// Implements WatermarkStrategy directly, supplying both halves of the strategy:
// the watermark generator and the timestamp assigner.
DataStream<Event> customStream = events.assignTimestampsAndWatermarks(
    new WatermarkStrategy<Event>() {
        // Hands out the generator that decides when watermarks are emitted
        @Override
        public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new CustomWatermarkGenerator();
        }

        // Hands out the assigner that reads the timestamp from each event
        @Override
        public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (event, timestamp) -> event.getTimestamp();
        }
    }
);

Timestamp Assigners

Extract timestamps from elements for event time processing.

/**
 * Interface for assigning timestamps to elements.
 * It declares a single method, so it can be written as a two-argument lambda.
 */
interface TimestampAssigner<T> {
    /**
     * Extract timestamp from element
     * @param element - input element
     * @param recordTimestamp - previously assigned timestamp (or Long.MIN_VALUE if none)
     * @return timestamp for the element
     */
    long extractTimestamp(T element, long recordTimestamp);
}

/**
 * Supplier for timestamp assigners
 */
interface TimestampAssignerSupplier<T> {
    /**
     * Create the timestamp assigner
     * @param context - context made available while creating the assigner
     * @return the timestamp assigner to use
     */
    TimestampAssigner<T> createTimestampAssigner(Context context);
    
    /**
     * Context passed to createTimestampAssigner
     */
    interface Context {
        // Metric group available to the created assigner
        MetricGroup getMetricGroup();
    }
}

Usage Examples:

// Lambda timestamp assigner
WatermarkStrategy<Event> strategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((event, timestamp) -> event.getEventTime());

// Custom timestamp assigner
// Declared as SerializableTimestampAssigner: the watermark strategy is serialized
// when the job is shipped to the cluster, together with whatever assigner it holds.
// A plain TimestampAssigner captured by a supplier lambda is not Serializable and
// makes job submission fail.
SerializableTimestampAssigner<LogEntry> logTimestampAssigner = new SerializableTimestampAssigner<LogEntry>() {
    @Override
    public long extractTimestamp(LogEntry log, long recordTimestamp) {
        return log.getTimestamp();
    }
};

// Pass the serializable assigner directly; no supplier lambda is needed
WatermarkStrategy<LogEntry> logStrategy = WatermarkStrategy
    .<LogEntry>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withTimestampAssigner(logTimestampAssigner);

Watermark Generators

Generate watermarks to signal event time progress.

/**
 * Interface for generating watermarks.
 * Offers two hooks: one invoked per event and one invoked periodically.
 */
interface WatermarkGenerator<T> {
    /**
     * Called for every event
     * @param event - input event
     * @param eventTimestamp - timestamp of the event
     * @param output - output for emitting watermarks
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    
    /**
     * Called periodically (based on auto watermark interval)
     * @param output - output for emitting watermarks
     */
    void onPeriodicEmit(WatermarkOutput output);
}

/**
 * Watermark output interface.
 * Handed to a WatermarkGenerator so it can emit watermarks and report idleness.
 */
interface WatermarkOutput {
    // Emit the given watermark downstream
    void emitWatermark(Watermark watermark);
    // Mark this output as idle
    void markIdle();
    // Mark this output as active again
    void markActive();
}

/**
 * Supplier for watermark generators
 */
interface WatermarkGeneratorSupplier<T> {
    /**
     * Create the watermark generator
     * @param context - context made available while creating the generator
     * @return the watermark generator to use
     */
    WatermarkGenerator<T> createWatermarkGenerator(Context context);
    
    /**
     * Context passed to createWatermarkGenerator
     */
    interface Context {
        // Metric group available to the created generator
        MetricGroup getMetricGroup();
    }
}

Usage Examples:

// Custom watermark generator
/**
 * Periodic watermark generator for streams whose events may arrive out of
 * order by at most a fixed bound (5 seconds here).
 *
 * It tracks the highest timestamp seen so far and, on every periodic emit,
 * publishes a watermark that trails that maximum by the bound.
 */
class CustomWatermarkGenerator implements WatermarkGenerator<Event> {
    private long maxTimestamp = Long.MIN_VALUE;
    private final long outOfOrdernessMillis = 5000;

    /**
     * Records the highest event timestamp seen; emits nothing per event.
     */
    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /**
     * Emits the watermark derived from the highest timestamp seen so far.
     */
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // This single guard covers two cases: no event has been seen yet
        // (maxTimestamp is still Long.MIN_VALUE), and timestamps so small that
        // the subtraction below would underflow a long.
        if (maxTimestamp >= Long.MIN_VALUE + outOfOrdernessMillis + 1) {
            // A watermark t asserts that no element with timestamp <= t will follow.
            // An element exactly outOfOrdernessMillis late is still allowed, so the
            // watermark must stay one millisecond below (max - bound).
            output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
        }
    }
}

// Punctuated watermark generator
/**
 * Emits watermarks only in reaction to events that flag themselves as
 * watermark carriers; the periodic hook does nothing.
 */
class PunctuatedWatermarkGenerator implements WatermarkGenerator<Event> {
    /**
     * Emits a watermark at the event's timestamp when the event is a watermark event.
     */
    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        if (!event.isWatermarkEvent()) {
            return;
        }
        final Watermark punctuation = new Watermark(eventTimestamp);
        output.emitWatermark(punctuation);
    }

    /**
     * Intentionally empty: watermarks are driven by events alone.
     */
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // No periodic watermarks needed
    }
}

Time Utility Classes

Utility classes for working with time in Flink applications.

/**
 * Time utility class for creating time values.
 * A value is a size paired with a TimeUnit.
 */
class Time {
    // Generic factory: any size with an explicit unit
    static Time of(long size, TimeUnit unit);
    // Unit-specific factories
    static Time milliseconds(long milliseconds);
    static Time seconds(long seconds);
    static Time minutes(long minutes);
    static Time hours(long hours);
    static Time days(long days);
    
    // Value converted to milliseconds
    long toMilliseconds();
    // Unit and size this value was created with
    TimeUnit getUnit();
    long getSize();
}

/**
 * Watermark class.
 * Wraps a single timestamp.
 */
class Watermark {
    // Create a watermark for the given timestamp
    public Watermark(long timestamp);
    // Timestamp carried by this watermark
    long getTimestamp();
    
    // Special watermarks
    // NOTE(review): presumably the watermark with the largest possible timestamp - confirm
    static final Watermark MAX_WATERMARK;
}

Usage Examples:

// Creating time values: unit-specific factories, or the generic of(size, unit)
Time fiveMinutes = Time.minutes(5);
Time thirtySeconds = Time.seconds(30);
Time oneHour = Time.of(1, TimeUnit.HOURS);

// Convert to milliseconds (5 min * 60 s * 1000 ms)
long millis = fiveMinutes.toMilliseconds(); // 300000

// Creating a watermark from the current wall-clock time and reading its timestamp back
Watermark watermark = new Watermark(System.currentTimeMillis());
long timestamp = watermark.getTimestamp();

Legacy Timestamp Extractors (Deprecated)

Note: These classes are deprecated in favor of WatermarkStrategy. They are listed here only because they may still appear in older code; do not use them in new applications.

// Deprecated timestamp extractors
// Base type of the legacy extractors listed in this section; declares a
// one-argument extractTimestamp in addition to what TimestampAssigner requires.
// NOTE(review): this sketch may not match the upstream Flink declaration
// (which looks like it was an interface with more methods) - confirm.
@Deprecated
abstract class TimestampExtractor<T> implements TimestampAssigner<T> {
    abstract long extractTimestamp(T element);
}

// NOTE(review): upstream this class appears to implement AssignerWithPeriodicWatermarks
// and expose extractAscendingTimestamp(T) - confirm before relying on this sketch.
@Deprecated
abstract class AscendingTimestampExtractor<T> extends TimestampExtractor<T> {
    // For monotonously increasing timestamps
}

// Legacy extractor configured with a maximum out-of-orderness.
// NOTE(review): upstream this class appears to implement AssignerWithPeriodicWatermarks - confirm.
@Deprecated
abstract class BoundedOutOfOrdernessTimestampExtractor<T> extends TimestampExtractor<T> {
    // maxOutOfOrderness: the bound passed in at construction time
    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness);
}

// NOTE(review): upstream this appears to be a concrete (non-abstract) class - confirm.
@Deprecated
abstract class IngestionTimeExtractor<T> extends TimestampExtractor<T> {
    // Uses ingestion time as timestamp
}

Types

Time-related Interfaces

// Watermark strategy
// Combines timestamp assignment and watermark generation for a stream.
interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {
    // Factories
    static <T> WatermarkStrategy<T> forMonotonousTimestamps();
    static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness);
    static <T> WatermarkStrategy<T> forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier);
    static <T> WatermarkStrategy<T> noWatermarks();

    // Timestamp assignment: supplier form, or the serializable form that
    // two-argument lambdas such as (event, timestamp) -> ... resolve to
    WatermarkStrategy<T> withTimestampAssigner(TimestampAssignerSupplier<T> timestampAssigner);
    WatermarkStrategy<T> withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner);

    // Idleness detection
    WatermarkStrategy<T> withIdleness(Duration idleTimeout);
}

// Timestamp assigner
// Returns the timestamp for an element; recordTimestamp is the timestamp
// the record carried before this assigner ran.
interface TimestampAssigner<T> {
    long extractTimestamp(T element, long recordTimestamp);
}

// Watermark generator
// onEvent is the per-event hook; onPeriodicEmit is the periodic hook.
// Both receive the output used to emit watermarks.
interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

// Watermark output
// Sink through which a generator emits watermarks and toggles idle/active status.
interface WatermarkOutput {
    void emitWatermark(Watermark watermark);
    void markIdle();
    void markActive();
}

Time Classes

// Time characteristic enum
// Selects the notion of time used by a streaming application.
enum TimeCharacteristic {
    ProcessingTime,  // Processing time semantics
    IngestionTime,   // Ingestion time semantics
    EventTime        // Event time semantics
}

// Time utility class
// A size paired with a TimeUnit, created through static factories.
class Time implements Comparable<Time>, Serializable {
    // Factories
    static Time of(long size, TimeUnit unit);
    static Time milliseconds(long milliseconds);
    static Time seconds(long seconds);
    static Time minutes(long minutes);
    static Time hours(long hours);
    static Time days(long days);
    
    // Accessors
    long toMilliseconds();
    TimeUnit getUnit();
    long getSize();
}

// Watermark class
// Wraps a single timestamp; declares equals and hashCode.
class Watermark implements Serializable {
    // NOTE(review): presumably the watermark with the largest possible timestamp - confirm
    public static final Watermark MAX_WATERMARK;
    
    public Watermark(long timestamp);
    long getTimestamp();
    boolean equals(Object obj);
    int hashCode();
}

Built-in Watermark Generators

// Built-in watermark generators
class BoundedOutOfOrdernessWatermarksGenerator<T> implements WatermarkGenerator<T> {
    // Handles bounded out-of-orderness
}

// Generator behind WatermarkStrategy.forMonotonousTimestamps().
// The upstream class is named AscendingTimestampsWatermarks.
class AscendingTimestampsWatermarks<T> implements WatermarkGenerator<T> {
    // For monotonously increasing timestamps
}

// Generator that never emits a watermark
class NoWatermarksGenerator<T> implements WatermarkGenerator<T> {
    // No watermark generation
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-streaming-java-2-11

docs

async-io.md

checkpointing.md

datastream-transformations.md

execution-environment.md

index.md

keyed-streams-state.md

process-functions.md

sources-sinks.md

time-watermarks.md

windowing.md

tile.json