
tessl/maven-org-apache-flink--flink-file-sink-common

Common utilities and interfaces for file sink functionality in Apache Flink stream processing applications


Rolling Policies

Rolling policies decide when a file sink closes the current part file and starts a new one. The package provides policies that roll on file size, on elapsed or idle time, and on checkpoints, plus an interface for custom logic that is evaluated for each incoming element.

Capabilities

RollingPolicy Interface

Core interface for implementing file rolling logic.

/**
 * Policy for determining when a Bucket in the Filesystem Sink rolls its currently open part file
 * @param <IN> The type of input elements
 * @param <BucketID> The type of bucket identifier
 */
public interface RollingPolicy<IN, BucketID> extends Serializable {
    /**
     * Determines if the in-progress part file should roll on every checkpoint
     * @param partFileState the state of the currently open part file
     * @return true if the part file should roll, false otherwise
     */
    boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) throws IOException;
    
    /**
     * Determines if the in-progress part file should roll based on its current state
     * and the incoming element
     * @param partFileState the state of the currently open part file
     * @param element the element being processed
     * @return true if the part file should roll, false otherwise
     */
    boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException;
    
    /**
     * Determines if the in-progress part file should roll based on a time condition
     * @param partFileState the state of the currently open part file
     * @param currentTime the current processing time
     * @return true if the part file should roll, false otherwise
     */
    boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) throws IOException;
}

PartFileInfo Interface

Provides information about the current part file for rolling policy decisions.

/**
 * Interface exposing information about the current (open) part file
 * Used by RollingPolicy to determine if it should roll the part file
 */
public interface PartFileInfo<BucketID> {
    /**
     * @return The bucket identifier of the current buffer
     */
    BucketID getBucketId();
    
    /**
     * @return The creation time (in ms) of the currently open part file
     */
    long getCreationTime();
    
    /**
     * @return The size (in bytes) of the currently open part file
     */
    long getSize() throws IOException;
    
    /**
     * @return The last time (in ms) the currently open part file was written to
     */
    long getLastUpdateTime();
}

DefaultRollingPolicy

Rolling policy with configurable maximum part size, rollover interval, and inactivity interval.

/**
 * Default implementation of RollingPolicy
 * Rolls a part file if:
 * 1. there is no open part file
 * 2. current file has reached maximum size (default 128MB)
 * 3. current file is older than rollover interval (default 60 sec)
 * 4. current file has not been written to for more than inactivity time (default 60 sec)
 */
public final class DefaultRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {
    /** Default inactivity interval: 60 seconds */
    private static final long DEFAULT_INACTIVITY_INTERVAL = 60L * 1000L;
    
    /** Default rollover interval: 60 seconds */
    private static final long DEFAULT_ROLLOVER_INTERVAL = 60L * 1000L;
    
    /** Default maximum part size: 128MB */
    private static final long DEFAULT_MAX_PART_SIZE = 1024L * 1024L * 128L;
    
    @Override
    public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) throws IOException;
    
    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException;
    
    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime);
    
    /**
     * Returns the maximum part file size before rolling
     * @return Max size in bytes
     */
    public long getMaxPartSize();
    
    /**
     * Returns the maximum time duration a part file can stay open before rolling
     * @return Time duration in milliseconds
     */
    public long getRolloverInterval();
    
    /**
     * Returns time duration of allowed inactivity after which a part file will roll
     * @return Time duration in milliseconds
     */
    public long getInactivityInterval();
    
    /**
     * Creates a new PolicyBuilder for configuring DefaultRollingPolicy
     */
    public static DefaultRollingPolicy.PolicyBuilder builder();
}
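The three thresholds listed above can be read as two independent checks: one on size and one on time. The following is a minimal, self-contained sketch of that reading, not the library's source. The class name `RollingThresholds`, its method names, and the choice of strict versus non-strict comparisons are assumptions made for illustration; only the default values come from the constants shown above.

```java
/** Illustrative only: restates the documented default thresholds as plain checks. */
final class RollingThresholds {
    // Defaults quoted from the DefaultRollingPolicy constants above
    static final long MAX_PART_SIZE_BYTES = 1024L * 1024L * 128L;
    static final long ROLLOVER_INTERVAL_MS = 60L * 1000L;
    static final long INACTIVITY_INTERVAL_MS = 60L * 1000L;

    /** Size condition; treating the limit as exclusive is an assumption of this sketch. */
    static boolean exceedsSize(long sizeBytes) {
        return sizeBytes > MAX_PART_SIZE_BYTES;
    }

    /** Time conditions: the file is too old, or it has been idle for too long. */
    static boolean exceedsTime(long creationTimeMs, long lastUpdateTimeMs, long nowMs) {
        boolean tooOld = nowMs - creationTimeMs >= ROLLOVER_INTERVAL_MS;
        boolean idle = nowMs - lastUpdateTimeMs >= INACTIVITY_INTERVAL_MS;
        return tooOld || idle;
    }
}

class RollingThresholdsDemo {
    public static void main(String[] args) {
        // 200 MiB is above the 128 MiB default
        System.out.println(RollingThresholds.exceedsSize(200L * 1024L * 1024L)); // true
        // Created at t=0, last written at t=55s, checked at t=59s: neither limit reached
        System.out.println(RollingThresholds.exceedsTime(0L, 55_000L, 59_000L)); // false
        // Same file checked at t=61s: older than the 60s rollover interval
        System.out.println(RollingThresholds.exceedsTime(0L, 55_000L, 61_000L)); // true
    }
}
```

In this reading, a file that is written to continuously still rolls once the rollover interval elapses, while a file that stops receiving data rolls once the inactivity interval elapses, whichever comes first.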

DefaultRollingPolicy.PolicyBuilder

Builder for configuring DefaultRollingPolicy instances.

/**
 * Builder class for configuring DefaultRollingPolicy
 */
public static final class PolicyBuilder {
    /**
     * Sets the part size above which a part file will have to roll
     * @param size the allowed part size
     */
    public DefaultRollingPolicy.PolicyBuilder withMaxPartSize(MemorySize size);
    
    /**
     * Sets the interval of allowed inactivity after which a part file will roll
     * @param interval the allowed inactivity interval
     */
    public DefaultRollingPolicy.PolicyBuilder withInactivityInterval(Duration interval);
    
    /**
     * Sets the max time a part file can stay open before having to roll
     * @param interval the desired rollover interval
     */
    public DefaultRollingPolicy.PolicyBuilder withRolloverInterval(Duration interval);
    
    /** Creates the actual policy */
    public <IN, BucketID> DefaultRollingPolicy<IN, BucketID> build();
}

Usage Examples:

import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.configuration.MemorySize;
import java.time.Duration;

// Default policy (128MB, 60s rollover, 60s inactivity)
RollingPolicy<String, String> defaultPolicy = DefaultRollingPolicy.<String, String>builder().build();

// Custom size and timing
RollingPolicy<String, String> customPolicy = DefaultRollingPolicy.<String, String>builder()
    .withMaxPartSize(MemorySize.ofMebiBytes(256))
    .withRolloverInterval(Duration.ofMinutes(15))
    .withInactivityInterval(Duration.ofMinutes(5))
    .build();

// Large files with longer intervals
RollingPolicy<String, String> largeBatchPolicy = DefaultRollingPolicy.<String, String>builder()
    .withMaxPartSize(MemorySize.ofMebiBytes(1024)) // 1 GiB expressed in MiB
    .withRolloverInterval(Duration.ofHours(1))
    .withInactivityInterval(Duration.ofMinutes(30))
    .build();

CheckpointRollingPolicy

Abstract base class for policies that roll on every checkpoint.

/**
 * Abstract RollingPolicy which rolls on every checkpoint
 */
public abstract class CheckpointRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {
    /** Always returns true - rolls on every checkpoint */
    public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState);
    
    /** Subclasses define event-based rolling behavior */
    public abstract boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException;
    
    /** Subclasses define time-based rolling behavior */
    public abstract boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) throws IOException;
}
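A subclass of CheckpointRollingPolicy only has to supply the event-based and time-based decisions. The sketch below shows one way such a subclass might look. To compile without Flink on the classpath, it declares local stand-ins that mirror the signatures documented in this section; `SizeCappedCheckpointPolicy`, `FixedPartFileInfo`, and `CheckpointPolicyDemo` are hypothetical names that are not part of the package.

```java
import java.io.IOException;
import java.io.Serializable;

// Stand-ins that mirror the signatures documented in this section,
// so the sketch compiles without Flink on the classpath.
interface PartFileInfo<BucketID> {
    BucketID getBucketId();
    long getCreationTime();
    long getSize() throws IOException;
    long getLastUpdateTime();
}

interface RollingPolicy<IN, BucketID> extends Serializable {
    boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) throws IOException;
    boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException;
    boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) throws IOException;
}

abstract class CheckpointRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {
    @Override
    public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) {
        return true; // always roll at a checkpoint
    }
}

/** Hypothetical policy: rolls on every checkpoint and also once a size cap is reached. */
final class SizeCappedCheckpointPolicy<IN, BucketID> extends CheckpointRollingPolicy<IN, BucketID> {
    private final long maxBytes;

    SizeCappedCheckpointPolicy(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException {
        return partFileState.getSize() >= maxBytes;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
        return false; // no time-based rolling in this sketch
    }
}

/** Fixed-value PartFileInfo used only to exercise the policy. */
final class FixedPartFileInfo implements PartFileInfo<String> {
    private final long size;

    FixedPartFileInfo(long size) {
        this.size = size;
    }

    @Override public String getBucketId() { return "bucket-0"; }
    @Override public long getCreationTime() { return 0L; }
    @Override public long getSize() { return size; }
    @Override public long getLastUpdateTime() { return 0L; }
}

class CheckpointPolicyDemo {
    public static void main(String[] args) throws IOException {
        RollingPolicy<String, String> policy = new SizeCappedCheckpointPolicy<>(1024L);
        System.out.println(policy.shouldRollOnCheckpoint(new FixedPartFileInfo(10L)));  // true
        System.out.println(policy.shouldRollOnEvent(new FixedPartFileInfo(10L), "a"));   // false
        System.out.println(policy.shouldRollOnEvent(new FixedPartFileInfo(2048L), "b")); // true
    }
}
```

Against the real library, the stand-in declarations would be dropped in favor of the package's own RollingPolicy, PartFileInfo, and CheckpointRollingPolicy types.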

OnCheckpointRollingPolicy

Simple policy that rolls files only on checkpoints.

/**
 * RollingPolicy which rolls ONLY on every checkpoint
 * Does not roll based on events or processing time
 */
public final class OnCheckpointRollingPolicy<IN, BucketID> extends CheckpointRollingPolicy<IN, BucketID> {
    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element);
    
    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime);
    
    /** Creates an instance of OnCheckpointRollingPolicy */
    public static <IN, BucketID> OnCheckpointRollingPolicy<IN, BucketID> build();
}

Usage Example:

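import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

// Roll only on checkpoints, so part files are closed at checkpoint boundaries only.
// MyEvent is a placeholder for the application's record type.
RollingPolicy<MyEvent, String> checkpointOnly = OnCheckpointRollingPolicy.<MyEvent, String>build();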

Custom Rolling Policies

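Implement the RollingPolicy interface to define custom rolling logic. The example below rolls after a fixed number of records per bucket. Its counters live in memory only: they restart from zero after a failure and are not reset when a file is closed for another reason, such as bucket inactivity.

import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;

import java.util.HashMap;
import java.util.Map;

public class RecordCountRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {
    private final long maxRecords;
    // Records counted toward the current part file of each bucket (in memory only)
    private final Map<BucketID, Long> recordCounts = new HashMap<>();

    public RecordCountRollingPolicy(long maxRecords) {
        this.maxRecords = maxRecords;
    }

    @Override
    public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) {
        // Roll at a checkpoint only if the current file is already full
        return rollIfFull(partFileState.getBucketId());
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) {
        BucketID bucketId = partFileState.getBucketId();
        // Assumes the sink asks before writing the element: roll first if the
        // current file is full, then count the element toward the file it lands in
        boolean roll = rollIfFull(bucketId);
        recordCounts.merge(bucketId, 1L, Long::sum);
        return roll;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
        return false; // Don't roll based on time
    }

    // Returns true and resets the bucket's counter once maxRecords is reached
    private boolean rollIfFull(BucketID bucketId) {
        if (recordCounts.getOrDefault(bucketId, 0L) >= maxRecords) {
            recordCounts.remove(bucketId);
            return true;
        }
        return false;
    }
}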

Error Handling

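  • PartFileInfo.getSize() can throw IOException; a policy method that calls it must either handle the exception or declare it, which all three RollingPolicy methods permit
  • An exception thrown from a policy method propagates to the sink and fails the job
  • shouldRollOnEvent is evaluated for every element, so keep policy methods cheap and avoid expensive operations
  • shouldRollOnProcessingTime is evaluated only when the sink's bucket check interval fires, so time-based rolls happen no more often than that interval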
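One possible way to treat a failing size lookup is to read the size once per decision and, if the read fails, rethrow with the bucket identifier attached so the resulting job failure is easier to diagnose. The sketch below illustrates that choice only; `SafeSizeCheck`, `SizedPart`, and `reachedLimit` are hypothetical names, and `SizedPart` is a stand-in for the two PartFileInfo methods the sketch needs.

```java
import java.io.IOException;

final class SafeSizeCheck {
    /** Hypothetical stand-in for the two PartFileInfo methods this sketch uses. */
    interface SizedPart {
        String getBucketId();
        long getSize() throws IOException;
    }

    /** Reads the size once and, if the read fails, rethrows with the bucket id attached. */
    static boolean reachedLimit(SizedPart part, long maxBytes) throws IOException {
        final long size;
        try {
            size = part.getSize();
        } catch (IOException e) {
            throw new IOException(
                    "Could not read part file size for bucket " + part.getBucketId(), e);
        }
        return size >= maxBytes;
    }

    public static void main(String[] args) throws IOException {
        SizedPart healthy = new SizedPart() {
            @Override public String getBucketId() { return "bucket-0"; }
            @Override public long getSize() { return 4096L; }
        };
        // 4096 bytes is at or above the 1024-byte limit
        System.out.println(reachedLimit(healthy, 1024L)); // true
    }
}
```

Rethrowing keeps the failure visible. Swallowing the exception and returning a fixed answer would hide an I/O problem from the job, so it is deliberately not shown here.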

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-file-sink-common
