Common utilities and interfaces for the file sink functionality of Apache Flink stream-processing applications.
—
Rolling policies determine when a file sink closes its current in-progress part file and starts a new one. The package provides flexible policies based on file size, time intervals, inactivity, processing events, and checkpoints.
Core interface for implementing file rolling logic.
/**
* Policy for determining when a Bucket in the Filesystem Sink rolls its currently open part file.
*
* <p>The policy is consulted at three points: when a checkpoint is taken, when an element arrives,
* and on processing-time callbacks. Because the interface extends {@link Serializable},
* implementations are expected to be serializable (mark runtime-only state {@code transient}).
*
* @param <IN> The type of input elements
* @param <BucketID> The type of bucket identifier
*/
public interface RollingPolicy<IN, BucketID> extends Serializable {
/**
* Determines whether the in-progress part file should roll when a checkpoint is taken.
*
* @param partFileState the state of the currently open part file
* @return true if the part file should roll, false otherwise
* @throws IOException if the part file state cannot be inspected
* (e.g. {@link PartFileInfo#getSize()} declares IOException)
*/
boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) throws IOException;
/**
* Determines whether the in-progress part file should roll, given its current state and the
* element being processed.
*
* @param partFileState the state of the currently open part file
* @param element the element being processed
* @return true if the part file should roll, false otherwise
* @throws IOException if the part file state cannot be inspected
* (e.g. {@link PartFileInfo#getSize()} declares IOException)
*/
boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException;
/**
* Determines whether the in-progress part file should roll based on a time condition.
*
* @param partFileState the state of the currently open part file
* @param currentTime the current processing time (presumably in milliseconds, like the
* timestamps exposed by {@link PartFileInfo} — confirm against the caller)
* @return true if the part file should roll, false otherwise
* @throws IOException if the part file state cannot be inspected
*/
boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) throws IOException;
}Provides information about the current part file for rolling policy decisions.
/**
* Read-only view of the current (open) part file of a bucket.
* Passed to {@link RollingPolicy} methods so a policy can decide whether to roll the part file.
*
* @param <BucketID> The type of bucket identifier
*/
public interface PartFileInfo<BucketID> {
/**
* @return The identifier of the bucket the currently open part file belongs to
*/
BucketID getBucketId();
/**
* @return The creation time (in ms) of the currently open part file
*/
long getCreationTime();
/**
* Returns the current size of the open part file. The unit is presumably bytes, since
* size-based policies compare it against a byte limit; confirm against the implementation.
*
* @return The size of the currently open part file
* @throws IOException if the size cannot be determined
*/
long getSize() throws IOException;
/**
* @return The last time (in ms) the currently open part file was written to
*/
long getLastUpdateTime();
}Comprehensive rolling policy implementation with configurable size, time, and inactivity thresholds.
/**
* Default implementation of RollingPolicy.
* Rolls a part file if:
* 1. there is no open part file
* 2. current file has reached maximum size (default 128MB)
* 3. current file is older than rollover interval (default 60 sec)
* 4. current file has not been written to for more than inactivity time (default 60 sec)
*
* <p>Instances are configured through {@link #builder()}; the defaults below apply to any
* setting that is not overridden.
*/
public final class DefaultRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {
/** Default inactivity interval: 60 seconds */
private static final long DEFAULT_INACTIVITY_INTERVAL = 60L * 1000L;
/** Default rollover interval: 60 seconds */
private static final long DEFAULT_ROLLOVER_INTERVAL = 60L * 1000L;
/** Default maximum part size: 128MB */
private static final long DEFAULT_MAX_PART_SIZE = 1024L * 1024L * 128L;
/**
* Decides whether to roll the in-progress part file when a checkpoint is taken
* (presumably using the size criterion, as size is the only IOException-raising accessor).
*
* @throws IOException if the part file size cannot be determined
*/
@Override
public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) throws IOException;
/**
* Decides whether to roll the in-progress part file for an incoming element
* (presumably using the maximum part size criterion).
*
* @throws IOException if the part file size cannot be determined
*/
@Override
public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException;
/**
* Decides whether to roll the in-progress part file based on the rollover and inactivity
* intervals. Unlike the interface method, this override declares no IOException.
*/
@Override
public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime);
/**
* Returns the maximum part file size before rolling
* @return Max size in bytes
*/
public long getMaxPartSize();
/**
* Returns the maximum time duration a part file can stay open before rolling
* @return Time duration in milliseconds
*/
public long getRolloverInterval();
/**
* Returns time duration of allowed inactivity after which a part file will roll
* @return Time duration in milliseconds
*/
public long getInactivityInterval();
/**
* Creates a new PolicyBuilder for configuring DefaultRollingPolicy.
* Calling {@code build()} on it without further settings yields the defaults
* (128MB part size, 60s rollover, 60s inactivity).
*
* @return a new builder
*/
public static DefaultRollingPolicy.PolicyBuilder builder();
}Builder for configuring DefaultRollingPolicy instances.
/**
* Builder class for configuring DefaultRollingPolicy.
* Each setter returns a builder so calls can be chained; finish with {@link #build()}.
*/
public static final class PolicyBuilder {
/**
* Sets the part size above which a part file will have to roll
* @param size the allowed part size
* @return a builder reflecting this setting, for call chaining
*/
public DefaultRollingPolicy.PolicyBuilder withMaxPartSize(MemorySize size);
/**
* Sets the interval of allowed inactivity after which a part file will roll
* @param interval the allowed inactivity interval
* @return a builder reflecting this setting, for call chaining
*/
public DefaultRollingPolicy.PolicyBuilder withInactivityInterval(Duration interval);
/**
* Sets the max time a part file can stay open before having to roll
* @param interval the desired rollover interval
* @return a builder reflecting this setting, for call chaining
*/
public DefaultRollingPolicy.PolicyBuilder withRolloverInterval(Duration interval);
/**
* Creates the actual policy. This method is generic, so the element and bucket types are
* inferred from the assignment target (or given explicitly as {@code .<IN, BucketID>build()}).
*
* @return the configured policy
*/
public <IN, BucketID> DefaultRollingPolicy<IN, BucketID> build();
}Usage Examples:
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.time.Duration;
// Default policy (128MB max part size, 60s rollover, 60s inactivity).
// builder() itself is not generic; the generic build() infers <String, String>
// from the assignment target, so no explicit type arguments are needed.
RollingPolicy<String, String> defaultPolicy = DefaultRollingPolicy.builder().build();
// Custom size and timing
RollingPolicy<String, String> customPolicy = DefaultRollingPolicy.builder()
        .withMaxPartSize(MemorySize.ofMebiBytes(256))
        .withRolloverInterval(Duration.ofMinutes(15))
        .withInactivityInterval(Duration.ofMinutes(5))
        .build();
// Large files (1 GiB = 1024 MiB) with longer intervals
RollingPolicy<String, String> largeBatchPolicy = DefaultRollingPolicy.builder()
        .withMaxPartSize(MemorySize.ofMebiBytes(1024))
        .withRolloverInterval(Duration.ofHours(1))
        .withInactivityInterval(Duration.ofMinutes(30))
        .build();Abstract base class for policies that roll on every checkpoint.
/**
* Abstract RollingPolicy which rolls on every checkpoint.
* Subclasses decide only the event-based and processing-time-based behavior.
*
* @param <IN> The type of input elements
* @param <BucketID> The type of bucket identifier
*/
public abstract class CheckpointRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {
/**
* Always returns true - rolls on every checkpoint.
* Declares no IOException, since the part file state does not need to be inspected.
*/
public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState);
/** Subclasses define event-based rolling behavior */
public abstract boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException;
/** Subclasses define time-based rolling behavior */
public abstract boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) throws IOException;
}Simple policy that rolls files only on checkpoints.
/**
* RollingPolicy which rolls ONLY on every checkpoint.
* Does not roll based on events or processing time.
*
* @param <IN> The type of input elements
* @param <BucketID> The type of bucket identifier
*/
public final class OnCheckpointRollingPolicy<IN, BucketID> extends CheckpointRollingPolicy<IN, BucketID> {
/** Never rolls on an incoming element (per the class contract above). */
@Override
public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element);
/** Never rolls on processing time (per the class contract above). */
@Override
public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime);
/**
* Creates an instance of OnCheckpointRollingPolicy.
* Generic, so the type arguments are inferred from the call site or given explicitly.
*/
public static <IN, BucketID> OnCheckpointRollingPolicy<IN, BucketID> build();
}Usage Example:
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
// Roll only on checkpoints: part files are never rolled because of incoming events or
// processing time, so each part file spans exactly the data between two checkpoints.
// MyEvent stands for the application's own element type.
RollingPolicy<MyEvent, String> checkpointOnly = OnCheckpointRollingPolicy.<MyEvent, String>build();You can implement custom rolling logic by implementing the RollingPolicy interface:
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;

import java.util.HashMap;
import java.util.Map;
/**
 * Example {@link RollingPolicy} that rolls a part file once a fixed number of records has been
 * counted for it.
 *
 * <p>The policy may be handed part files of several buckets (see {@link PartFileInfo#getBucketId()}),
 * so records are counted per bucket. A bucket's count restarts from zero whenever it starts a new
 * part file (detected through {@link PartFileInfo#getCreationTime()}) or when this policy requests a
 * roll. Without that reset, a single counter would stay at or above the limit forever after the
 * first roll and every subsequent record would roll a new file.
 *
 * <p>NOTE(review): the count assumes {@code shouldRollOnEvent} is invoked once per incoming record
 * while a part file is open; confirm the exact invocation order against the sink's bucket
 * implementation if the precise number of records per file matters.
 *
 * <p>Counters of buckets that stop receiving data are kept until the policy instance is discarded.
 *
 * @param <IN> the type of input elements
 * @param <BucketID> the type of bucket identifier
 */
public class RecordCountRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {

    private static final long serialVersionUID = 1L;

    /** Number of records after which the current part file of a bucket rolls. */
    private final long maxRecords;

    /**
     * Record counters for the currently open part file of each bucket. Transient because it is
     * runtime state of the running policy; it is lazily re-created after deserialization.
     */
    private transient Map<BucketID, PartFileCounter> counters;

    /**
     * Creates a policy that rolls after {@code maxRecords} records.
     *
     * @param maxRecords number of records after which a part file rolls; must be positive
     * @throws IllegalArgumentException if {@code maxRecords} is not positive
     */
    public RecordCountRollingPolicy(long maxRecords) {
        if (maxRecords <= 0) {
            throw new IllegalArgumentException("maxRecords must be positive, but was " + maxRecords);
        }
        this.maxRecords = maxRecords;
    }

    @Override
    public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) {
        return rollIfLimitReached(partFileState.getBucketId(), counterFor(partFileState));
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) {
        PartFileCounter counter = counterFor(partFileState);
        counter.count++;
        return rollIfLimitReached(partFileState.getBucketId(), counter);
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
        return false; // Don't roll based on time
    }

    /** Returns the counter for the given part file, starting at zero if it is a new part file. */
    private PartFileCounter counterFor(PartFileInfo<BucketID> partFileState) {
        if (counters == null) {
            counters = new HashMap<>();
        }
        BucketID bucketId = partFileState.getBucketId();
        long creationTime = partFileState.getCreationTime();
        PartFileCounter counter = counters.get(bucketId);
        // A different creation time means the bucket rolled for a reason outside this policy's
        // event path (e.g. checkpoint or restore), so the old count no longer applies.
        if (counter == null || counter.creationTime != creationTime) {
            counter = new PartFileCounter(creationTime);
            counters.put(bucketId, counter);
        }
        return counter;
    }

    /**
     * Returns true once the limit is reached, dropping the bucket's counter so that the next
     * part file is counted from zero even if it shares the previous file's creation timestamp.
     */
    private boolean rollIfLimitReached(BucketID bucketId, PartFileCounter counter) {
        if (counter.count < maxRecords) {
            return false;
        }
        counters.remove(bucketId);
        return true;
    }

    /** Mutable record count for one open part file, identified by its creation time. */
    private static final class PartFileCounter {
        private final long creationTime;
        private long count;

        PartFileCounter(long creationTime) {
            this.creationTime = creationTime;
        }
    }
}IOException from PartFileInfo.getSize()Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-file-sink-common