Common utilities and interfaces for file sink functionality in Apache Flink stream processing applications
npx @tessl/cli install tessl/maven-org-apache-flink--flink-file-sink-common@2.1.0

Apache Flink File Sink Common provides foundational utilities and interfaces for implementing file sink functionality in Apache Flink stream processing applications. It contains the core abstractions — bucket writers, part file writers, rolling policies, and bucket assigners — that enable efficient and reliable writing of streaming data to file systems.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-file-sink-common</artifactId>
<version>2.1.0</version>
</dependency>

import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.configuration.MemorySize;
import java.time.Duration;
// Create a date-time based bucket assigner: bucket IDs follow the "yyyy-MM-dd/HH" format
// string, i.e. one bucket per hour.
BucketAssigner<String, String> bucketAssigner = new DateTimeBucketAssigner<>("yyyy-MM-dd/HH");

// Create a rolling policy with custom settings. DefaultRollingPolicy.builder() is not generic,
// so no explicit type arguments belong on it; PolicyBuilder.build() infers <String, String>
// from the assignment target.
RollingPolicy<String, String> rollingPolicy =
        DefaultRollingPolicy.builder()
                .withMaxPartSize(MemorySize.ofMebiBytes(256)) // maximum part file size
                .withRolloverInterval(Duration.ofMinutes(15)) // maximum time a part file stays open
                .withInactivityInterval(Duration.ofMinutes(5)) // roll after this much inactivity
                .build();

// Configure output file naming: part files get the "data" prefix and the ".txt" suffix.
OutputFileConfig outputFileConfig =
        OutputFileConfig.builder()
                .withPartPrefix("data")
                .withPartSuffix(".txt")
                .build();
Apache Flink File Sink Common is built around several key architectural components:
Bucket assigners organize streaming data into logical buckets using pluggable assignment strategies, supporting time-based bucketing, custom bucketing logic, and serializable bucket identifiers.
/**
 * Assigns each incoming element to a bucket identified by a {@code BucketID}.
 *
 * <p>Bucket identifiers are (de)serialized with the serializer returned by
 * {@link #getSerializer()}.
 *
 * @param <IN> type of the elements being written
 * @param <BucketID> type of the bucket identifier
 */
public interface BucketAssigner<IN, BucketID> extends Serializable {

    /**
     * Returns the identifier of the bucket that {@code element} should be written to.
     *
     * @param element the incoming element
     * @param context time information available when the element is assigned
     * @return the bucket identifier for {@code element}
     */
    BucketID getBucketId(IN element, BucketAssigner.Context context);

    /** Returns the serializer for bucket identifiers produced by this assigner. */
    SimpleVersionedSerializer<BucketID> getSerializer();

    /**
     * Time information passed to {@link BucketAssigner#getBucketId(Object, BucketAssigner.Context)}.
     *
     * <p>Declared as a nested interface so that the qualified name {@code BucketAssigner.Context}
     * used by {@code getBucketId} resolves.
     */
    interface Context {

        /** Returns the current processing time. */
        long currentProcessingTime();

        /** Returns the current watermark. */
        long currentWatermark();

        /**
         * Returns the timestamp of the element being assigned.
         *
         * <p>NOTE(review): the return type is a boxed {@code Long}, presumably {@code null} when
         * the element carries no timestamp; confirm against the Flink Javadoc.
         */
        Long timestamp();
    }
}
Controls when to close current part files and start new ones based on configurable criteria including file size, time intervals, and processing events.
/**
 * Decides when the current part file should be closed ("rolled") and a new one started.
 *
 * <p>Each hook receives the {@link PartFileInfo} of the current part file and returns
 * {@code true} to request a roll.
 *
 * @param <IN> type of the elements being written
 * @param <BucketID> type of the bucket identifier
 */
public interface RollingPolicy<IN, BucketID> extends Serializable {
    /**
     * Returns whether the part file should roll on a checkpoint.
     *
     * @param partFileState information about the current part file
     * @throws IOException if the part file information cannot be read
     */
    boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) throws IOException;

    /**
     * Returns whether the part file should roll in response to {@code element}.
     *
     * @param partFileState information about the current part file
     * @param element the element being processed
     * @throws IOException if the part file information cannot be read
     */
    boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException;

    /**
     * Returns whether the part file should roll based on processing time.
     *
     * @param partFileState information about the current part file
     * @param currentTime the current processing time
     * @throws IOException if the part file information cannot be read
     */
    boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) throws IOException;
}
/**
 * Read-only information about a part file, as passed to the {@code RollingPolicy} hooks.
 *
 * @param <BucketID> type of the identifier of the bucket the part file belongs to
 */
public interface PartFileInfo<BucketID> {
    /** Returns the identifier of the bucket this part file belongs to. */
    BucketID getBucketId();

    /** Returns the creation time of the part file. */
    long getCreationTime();

    /**
     * Returns the current size of the part file.
     *
     * @throws IOException if the size cannot be determined
     */
    long getSize() throws IOException;

    /** Returns the time of the last update to the part file. */
    long getLastUpdateTime();
}
Provides abstractions for writing data to files with support for both row-wise encoding and bulk writing patterns. Handles file recovery and commit operations.
/**
 * Opens new in-progress part files, and resumes existing ones, for a bucket.
 *
 * @param <IN> type of the elements being written
 * @param <BucketID> type of the bucket identifier
 */
public interface BucketWriter<IN, BucketID> {
    /**
     * Opens a new in-progress part file at {@code path}.
     *
     * @param bucketID the bucket the new part file belongs to
     * @param path the location of the new part file
     * @param creationTime the creation time for the part file (presumably what
     *     {@code getCreationTime()} later reports; confirm)
     * @return a writer for the new in-progress file
     * @throws IOException if the file cannot be opened
     */
    InProgressFileWriter<IN, BucketID> openNewInProgressFile(
            BucketID bucketID, Path path, long creationTime) throws IOException;

    /**
     * Resumes an in-progress part file from a recoverable previously obtained via
     * {@code InProgressFileWriter.persist()}.
     *
     * @param bucketID the bucket the part file belongs to
     * @param inProgressFileSnapshot the recoverable handle of the in-progress file
     * @param creationTime the creation time for the resumed part file
     * @return a writer positioned to continue the in-progress file
     * @throws IOException if the file cannot be resumed
     */
    InProgressFileWriter<IN, BucketID> resumeInProgressFileFrom(
            BucketID bucketID,
            InProgressFileWriter.InProgressFileRecoverable inProgressFileSnapshot,
            long creationTime) throws IOException;

    /** Returns this writer's properties, including whether resuming is supported. */
    WriterProperties getProperties();
}
/**
 * Writer for a single in-progress part file. It also exposes the file's {@link PartFileInfo}.
 *
 * @param <IN> type of the elements being written
 * @param <BucketID> type of the bucket identifier
 */
public interface InProgressFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {
    /**
     * Writes {@code element} to the part file.
     *
     * @param element the element to write
     * @param currentTime the current time (presumably recorded as the last update time; confirm)
     * @throws IOException if writing fails
     */
    void write(IN element, long currentTime) throws IOException;

    /**
     * Persists the in-progress file and returns a handle from which writing can later be resumed
     * (see {@code BucketWriter.resumeInProgressFileFrom}).
     *
     * @throws IOException if persisting fails
     */
    InProgressFileRecoverable persist() throws IOException;

    /**
     * Closes the part file for writing and returns a recoverable handle to the resulting pending
     * file.
     *
     * @throws IOException if closing fails
     */
    PendingFileRecoverable closeForCommit() throws IOException;

    /**
     * Disposes of this writer.
     *
     * <p>NOTE(review): presumably releases resources without committing; confirm against the
     * Flink Javadoc.
     */
    void dispose();
}
Configurable options for file naming patterns, writer properties, and behavior customization.
/**
 * Naming configuration for part files: a name prefix and a name suffix.
 *
 * <p>Signature-only listing; method bodies are omitted.
 */
public class OutputFileConfig implements Serializable {
    /**
     * Creates a configuration with the given part-file prefix and suffix.
     *
     * @param partPrefix prefix for part file names
     * @param partSuffix suffix for part file names
     */
    public OutputFileConfig(String partPrefix, String partSuffix);

    /** Returns the part file name prefix. */
    public String getPartPrefix();

    /** Returns the part file name suffix. */
    public String getPartSuffix();

    /** Returns a builder for {@code OutputFileConfig} instances. */
    public static OutputFileConfigBuilder builder();
}
/**
 * Properties of a bucket writer. They comprise the serializers for in-progress and pending file
 * recoverables and whether resuming in-progress files is supported.
 *
 * <p>Signature-only listing; method bodies are omitted.
 */
public class WriterProperties {
    /**
     * @param inProgressFileRecoverableSerializer serializer for in-progress file recoverables
     * @param pendingFileRecoverableSerializer serializer for pending file recoverables
     * @param supportsResume whether in-progress files can be resumed
     */
    public WriterProperties(
            SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer,
            SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer,
            boolean supportsResume);

    /** Returns whether in-progress files can be resumed. */
    public boolean supportsResume();
}

/**
 * Excerpt showing only the recoverable handle types nested in {@code InProgressFileWriter};
 * its type parameters and writer methods are omitted here.
 */
public interface InProgressFileWriter {
    /**
     * Handle to an in-progress file, as returned by {@code persist()}. It is also a
     * {@link PendingFileRecoverable}.
     */
    interface InProgressFileRecoverable extends PendingFileRecoverable {}

    /** Handle to a file closed for commit, as returned by {@code closeForCommit()}. */
    interface PendingFileRecoverable {
        /**
         * Returns the path of the file.
         *
         * <p>NOTE(review): upstream Flink may return {@code null} when unavailable; confirm.
         */
        Path getPath();

        /**
         * Returns the size of the file.
         *
         * <p>NOTE(review): upstream Flink may return {@code -1} when unavailable; confirm.
         */
        long getSize();
    }
}
/**
 * Writer abstraction for compacting files. Closing it for commit yields a
 * {@code PendingFileRecoverable}.
 */
public interface CompactingFileWriter {
    /**
     * Closes the writer and returns a recoverable handle to the resulting pending file.
     *
     * @throws IOException if closing fails
     */
    PendingFileRecoverable closeForCommit() throws IOException;

    /**
     * Kind of compacting writer.
     *
     * <p>NOTE(review): presumably distinguishes record-by-record writing from writing to a raw
     * output stream; confirm against the Flink Javadoc.
     */
    enum Type {
        RECORD_WISE,
        OUTPUT_STREAM
    }
}
/**
 * Excerpt showing only the {@code PendingFile} type nested in {@code BucketWriter}; its type
 * parameters and other methods are omitted here.
 */
public interface BucketWriter {
    /** A file that has been closed for writing and awaits commit. */
    interface PendingFile {
        /**
         * Commits the pending file.
         *
         * @throws IOException if the commit fails
         */
        void commit() throws IOException;

        /**
         * Commits the pending file after recovery.
         *
         * <p>NOTE(review): presumably tolerant of a commit that already happened before the
         * failure; confirm against the Flink Javadoc.
         *
         * @throws IOException if the commit fails
         */
        void commitAfterRecovery() throws IOException;
    }
}