Apache Flink file connector library for unified file processing in both batch and streaming modes with support for various formats, compression, and distributed processing capabilities.
—
File sinks provide unified writing capabilities with exactly-once semantics, bucketing, rolling policies, and optional file compaction for improved performance.
Main entry point for creating file sinks that write to distributed file systems with exactly-once guarantees.
/**
 * A unified sink that emits its input elements to FileSystem files within buckets. This
 * sink achieves exactly-once semantics for both BATCH and STREAMING.
 *
 * When creating the sink a basePath must be specified. The base directory contains one
 * directory for every bucket. The bucket directories themselves contain several part files, with at
 * least one for each parallel subtask of the sink which is writing data to that bucket. These part
 * files contain the actual output data.
 *
 * The sink uses a BucketAssigner to determine in which bucket directory each element
 * should be written to inside the base directory. The BucketAssigner can, for example, roll
 * on every checkpoint or use time or a property of the element to determine the bucket directory.
 * The default BucketAssigner is a DateTimeBucketAssigner which will create one new
 * bucket every hour.
 *
 * @param <IN> type of the records written by this sink
 */
@Experimental
public class FileSink<IN>
implements Sink<IN>,
SupportsWriterState<IN, FileWriterBucketState>,
SupportsCommitter<FileSinkCommittable>,
SupportsWriterState.WithCompatibleState,
SupportsPreCommitTopology<FileSinkCommittable, FileSinkCommittable>,
SupportsConcurrentExecutionAttempts {
/**
 * Creates a FileSink for row-wise writing using encoders.
 * The created sink will write each record in a separate line separated by line delimiters.
 *
 * @param basePath base directory under which bucket directories are created
 * @param encoder encoder used to serialize each record into a part file
 * @return a builder for further configuring the row-format sink
 */
public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(
final Path basePath, final Encoder<IN> encoder);
/**
 * Creates a FileSink for bulk writing using BulkWriter factories.
 * This is suitable for formats such as Parquet or ORC.
 *
 * @param basePath base directory under which bucket directories are created
 * @param bulkWriterFactory factory that creates the BulkWriter used for each part file
 * @return a builder for further configuring the bulk-format sink
 */
public static <IN> DefaultBulkFormatBuilder<IN> forBulkFormat(
final Path basePath, final BulkWriter.Factory<IN> bulkWriterFactory);
}

Builder for configuring row-format file sinks with encoders.
/**
 * Builder for row-format file sinks. All configuration methods return this
 * builder so calls can be chained, ending with build().
 *
 * @param <IN> type of the records written by the built sink
 */
public static class DefaultRowFormatBuilder<IN> {
/**
 * Sets a custom bucketing strategy for organizing output files.
 * @param bucketAssigner Strategy for assigning records to buckets
 * @return Builder instance for chaining
 */
public DefaultRowFormatBuilder<IN> withBucketAssigner(
BucketAssigner<IN, String> bucketAssigner);
/**
 * Sets the rolling policy that decides when to close the current part file
 * and start a new one.
 * @param rollingPolicy Policy controlling file rolling behavior
 * @return Builder instance for chaining
 */
public DefaultRowFormatBuilder<IN> withRollingPolicy(
RollingPolicy<IN, String> rollingPolicy);
/**
 * Sets the output file configuration (e.g. part-file prefix/suffix).
 * @param outputFileConfig Configuration for output file properties
 * @return Builder instance for chaining
 */
public DefaultRowFormatBuilder<IN> withOutputFileConfig(
OutputFileConfig outputFileConfig);
/**
 * Enables file compaction to merge small part files into larger ones.
 * @param compactStrategy Strategy for triggering compaction
 * @param compactor Implementation for compacting files
 * @return Builder instance for chaining
 */
public DefaultRowFormatBuilder<IN> enableCompact(
FileCompactStrategy compactStrategy, FileCompactor compactor);
/**
 * Builds the final FileSink instance from the accumulated configuration.
 * @return Configured FileSink
 */
public FileSink<IN> build();
}

Builder for configuring bulk-format file sinks.
/**
 * Builder for bulk-format file sinks. All configuration methods return this
 * builder so calls can be chained, ending with build().
 *
 * @param <IN> type of the records written by the built sink
 */
public static class DefaultBulkFormatBuilder<IN> {
/**
 * Sets a custom bucketing strategy for organizing output files.
 * @param bucketAssigner Strategy for assigning records to buckets
 * @return Builder instance for chaining
 */
public DefaultBulkFormatBuilder<IN> withBucketAssigner(
BucketAssigner<IN, String> bucketAssigner);
/**
 * Sets the rolling policy that decides when to close the current part file
 * and start a new one.
 * @param rollingPolicy Policy controlling file rolling behavior
 * @return Builder instance for chaining
 */
public DefaultBulkFormatBuilder<IN> withRollingPolicy(
RollingPolicy<IN, String> rollingPolicy);
/**
 * Sets the output file configuration (e.g. part-file prefix/suffix).
 * @param outputFileConfig Configuration for output file properties
 * @return Builder instance for chaining
 */
public DefaultBulkFormatBuilder<IN> withOutputFileConfig(
OutputFileConfig outputFileConfig);
/**
 * Enables file compaction to merge small part files into larger ones.
 * @param compactStrategy Strategy for triggering compaction
 * @param compactor Implementation for compacting files
 * @return Builder instance for chaining
 */
public DefaultBulkFormatBuilder<IN> enableCompact(
FileCompactStrategy compactStrategy, FileCompactor compactor);
/**
 * Disables writing to the local file system, for HDFS compatibility.
 * @return Builder instance for chaining
 */
public DefaultBulkFormatBuilder<IN> disableLocalWriting();
/**
 * Builds the final FileSink instance from the accumulated configuration.
 * @return Configured FileSink
 */
public FileSink<IN> build();
}

Usage Examples:
// NOTE(review): the compaction example below also references FileCompactStrategy
// and ConcatFileCompactor, which are not in this import list — presumably from
// org.apache.flink.connector.file.sink.compactor; verify before copying.
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.streaming.api.functions.sink.filesystem.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.core.fs.Path;
import org.apache.flink.configuration.MemorySize;
import java.time.Duration;
// Basic file sink for text output
FileSink<String> basicSink = FileSink
.forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
.build();
// Sink with rolling policy and bucketing
FileSink<String> advancedSink = FileSink
.forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(DefaultRollingPolicy.builder()
.withMaxPartSize(MemorySize.ofMebiBytes(128))
.withRolloverInterval(Duration.ofMinutes(15))
.withInactivityInterval(Duration.ofMinutes(5))
.build())
// One bucket directory per hour, e.g. /output/2024-01-01--00/
.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd/HH"))
.build();
// Sink with file compaction: merges small part files once 64 MiB of
// compactable data has accumulated, re-evaluated every 3 checkpoints
FileSink<String> compactingSink = FileSink
.forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
.enableCompact(
FileCompactStrategy.builder()
.setSizeThreshold(MemorySize.ofMebiBytes(64).getBytes())
.enableCompactionOnCheckpoint(3)
.build(),
new ConcatFileCompactor())
.build();
// Use with DataStream API
stream.sinkTo(advancedSink);

Internal components for managing file writing operations.
/**
 * Writer that manages file buckets and handles the writing process.
 *
 * @param <IN> type of the records accepted by this writer
 */
public class FileWriter<IN> implements SinkWriter<IN> {
// Internal implementation - not directly used by applications
}
/**
 * Factory for creating FileWriterBucket instances.
 *
 * @param <IN> type of the records written to the created buckets
 */
public interface FileWriterBucketFactory<IN> {
// Creates the writer bucket for the given bucket id; may perform file-system
// access, hence the checked IOException.
FileWriterBucket<IN> getWriterBucket(String bucketId) throws IOException;
}
/**
 * Default implementation of FileWriterBucketFactory.
 *
 * @param <IN> type of the records written to the created buckets
 */
public class DefaultFileWriterBucketFactory<IN> implements FileWriterBucketFactory<IN> {
// basePath: base directory under which bucket directories live
// encoder: serializes each record into the part file
// rollingPolicy: decides when a part file is rolled
// outputFileConfig: part-file naming configuration (prefix/suffix)
public DefaultFileWriterBucketFactory(
Path basePath,
Encoder<IN> encoder,
RollingPolicy<IN, String> rollingPolicy,
OutputFileConfig outputFileConfig);
}

Types representing committable file operations for exactly-once semantics.
/**
 * Represents a committable file operation, identified by its bucket, the
 * path of the pending file, and the file's creation time.
 */
public class FileSinkCommittable {
public FileSinkCommittable(String bucketId, Path path, long creationTime);
// Id of the bucket this committable belongs to.
public String getBucketId();
// Path of the file to be committed.
public Path getPath();
// Creation time of the file; units not specified here — presumably epoch
// milliseconds, verify against the serializer/committer.
public long getCreationTime();
}
/**
 * Serializer for FileSinkCommittable instances, used to persist committables
 * across checkpoints in a versioned binary form.
 */
public class FileSinkCommittableSerializer implements SimpleVersionedSerializer<FileSinkCommittable> {
// Version written alongside the serialized bytes; deserialize() must accept it.
public int getVersion();
public byte[] serialize(FileSinkCommittable committable) throws IOException;
public FileSinkCommittable deserialize(int version, byte[] serialized) throws IOException;
}

Advanced Configuration Examples:
// Custom output file configuration: part files named data-<uid>-<n>.txt
OutputFileConfig fileConfig = OutputFileConfig.builder()
.withPartPrefix("data-")
.withPartSuffix(".txt")
.build();
// Custom bucket assigner for organizing by key.
// Math.floorMod keeps the bucket index in [0, 9] even when hashCode() is
// negative; a plain % would produce ids like "bucket--3".
BucketAssigner<MyRecord, String> keyBucketAssigner = new BucketAssigner<MyRecord, String>() {
@Override
public String getBucketId(MyRecord record, Context context) {
return "bucket-" + Math.floorMod(record.getKey().hashCode(), 10);
}
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
};
FileSink<MyRecord> customSink = FileSink
.forRowFormat(new Path("/output"), new MyRecordEncoder())
.withBucketAssigner(keyBucketAssigner)
.withOutputFileConfig(fileConfig)
.build();

File sinks handle various error conditions during writing:
try {
FileSink<String> sink = FileSink
.forRowFormat(new Path("/invalid/output"), new SimpleStringEncoder<>("UTF-8"))
.build();
stream.sinkTo(sink);
} catch (Exception e) {
// Handle sink configuration or runtime errors
// NOTE(review): sinkTo only attaches the sink to the stream graph; most
// write/commit failures surface when the job actually runs, not in this block.
}

Use disableLocalWriting() for HDFS deployments to avoid local filesystem usage.

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-connector-files