
tessl/maven-org-apache-flink--flink-connector-files

Apache Flink file connector library for unified file processing in both batch and streaming modes with support for various formats, compression, and distributed processing capabilities.


File Sinks

File sinks provide unified writing capabilities with exactly-once semantics, bucketing, rolling policies, and optional file compaction for improved performance.

Capabilities

FileSink Class

Main entry point for creating file sinks that write to distributed file systems with exactly-once guarantees.

/**
 * A unified sink that emits its input elements to FileSystem files within buckets. This
 * sink achieves exactly-once semantics for both BATCH and STREAMING.
 * 
 * When creating the sink a basePath must be specified. The base directory contains one
 * directory for every bucket. The bucket directories themselves contain several part files, with at
 * least one for each parallel subtask of the sink which is writing data to that bucket. These part
 * files contain the actual output data.
 * 
 * The sink uses a BucketAssigner to determine in which bucket directory each element
 * should be written to inside the base directory. The BucketAssigner can, for example, roll
 * on every checkpoint or use time or a property of the element to determine the bucket directory.
 * The default BucketAssigner is a DateTimeBucketAssigner which will create one new
 * bucket every hour.
 */
@Experimental
public class FileSink<IN>
        implements Sink<IN>,
                SupportsWriterState<IN, FileWriterBucketState>,
                SupportsCommitter<FileSinkCommittable>,
                SupportsWriterState.WithCompatibleState,
                SupportsPreCommitTopology<FileSinkCommittable, FileSinkCommittable>,
                SupportsConcurrentExecutionAttempts {
    
    /**
     * Creates a FileSink for row-wise writing using encoders.
 * The created sink writes each record on its own line, using the configured line delimiter.
     */
    public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(
            final Path basePath, final Encoder<IN> encoder);
    
    /**
     * Creates a FileSink for bulk writing using BulkWriter factories.
     * This is suitable for formats such as Parquet or ORC.
     */
    public static <IN> DefaultBulkFormatBuilder<IN> forBulkFormat(
            final Path basePath, final BulkWriter.Factory<IN> bulkWriterFactory);
}
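
For intuition, the default hourly bucketing described above can be sketched in plain Java. This mimics the `DateTimeBucketAssigner` default pattern (`"yyyy-MM-dd--HH"`) without depending on Flink; it is an illustrative helper, not the actual Flink class:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Illustrative bucket-id computation mimicking the default hourly
// DateTimeBucketAssigner format; every record written in the same
// hour lands in the same bucket directory under the base path.
final class HourlyBucketId {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    static String bucketFor(long epochMillis) {
        return FORMAT.format(Instant.ofEpochMilli(epochMillis));
    }
}
```

Two records one hour apart thus produce distinct bucket directories such as `/output/1970-01-01--00` and `/output/1970-01-01--01`.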

FileSink.DefaultRowFormatBuilder

Builder for configuring row-format file sinks with encoders.

/**
 * Builder for row-format file sinks
 */
public static class DefaultRowFormatBuilder<IN> {
    /**
     * Sets custom bucketing strategy for organizing output files
     * @param bucketAssigner Strategy for assigning records to buckets
     * @return Builder instance for chaining
     */
    public DefaultRowFormatBuilder<IN> withBucketAssigner(
        BucketAssigner<IN, String> bucketAssigner);
    
    /**
     * Sets rolling policy for when to create new files
     * @param rollingPolicy Policy controlling file rolling behavior
     * @return Builder instance for chaining
     */
    public DefaultRowFormatBuilder<IN> withRollingPolicy(
        RollingPolicy<IN, String> rollingPolicy);
    
    /**
     * Sets output file configuration for naming and format
     * @param outputFileConfig Configuration for output file properties
     * @return Builder instance for chaining
     */
    public DefaultRowFormatBuilder<IN> withOutputFileConfig(
        OutputFileConfig outputFileConfig);
    
    /**
     * Enables file compaction to merge small files
     * @param compactStrategy Strategy for triggering compaction
     * @param compactor Implementation for compacting files
     * @return Builder instance for chaining
     */
    public DefaultRowFormatBuilder<IN> enableCompact(
        FileCompactStrategy compactStrategy, FileCompactor compactor);
    
    /**
     * Builds the final FileSink instance
     * @return Configured FileSink
     */
    public FileSink<IN> build();
}
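
The `withRollingPolicy` hook above decides when a part file is closed and a new one opened. The decision logic of a policy like `DefaultRollingPolicy` can be sketched in plain Java; the field and method names here are assumptions for illustration, not Flink internals:

```java
// Illustrative rolling decision mirroring the three triggers a
// DefaultRollingPolicy configures: max part size, rollover interval,
// and inactivity interval.
final class RollingDecision {
    final long maxPartSizeBytes;      // roll when the part file reaches this size
    final long rolloverIntervalMs;    // roll after the file has been open this long
    final long inactivityIntervalMs;  // roll after no writes for this long

    RollingDecision(long maxPartSizeBytes, long rolloverIntervalMs, long inactivityIntervalMs) {
        this.maxPartSizeBytes = maxPartSizeBytes;
        this.rolloverIntervalMs = rolloverIntervalMs;
        this.inactivityIntervalMs = inactivityIntervalMs;
    }

    boolean shouldRoll(long currentSizeBytes, long creationTimeMs, long lastWriteTimeMs, long nowMs) {
        return currentSizeBytes >= maxPartSizeBytes
                || nowMs - creationTimeMs >= rolloverIntervalMs
                || nowMs - lastWriteTimeMs >= inactivityIntervalMs;
    }
}
```

Any one trigger firing is enough to roll the file, which is why small rollover intervals can produce many small files even under a generous size limit.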

FileSink.DefaultBulkFormatBuilder

Builder for configuring bulk-format file sinks.

/**
 * Builder for bulk-format file sinks  
 */
public static class DefaultBulkFormatBuilder<IN> {
    /**
     * Sets custom bucketing strategy for organizing output files
     * @param bucketAssigner Strategy for assigning records to buckets
     * @return Builder instance for chaining
     */
    public DefaultBulkFormatBuilder<IN> withBucketAssigner(
        BucketAssigner<IN, String> bucketAssigner);
    
    /**
     * Sets rolling policy for when to create new files
     * @param rollingPolicy Policy controlling file rolling behavior
     * @return Builder instance for chaining
     */
    public DefaultBulkFormatBuilder<IN> withRollingPolicy(
        RollingPolicy<IN, String> rollingPolicy);
    
    /**
     * Sets output file configuration for naming and format
     * @param outputFileConfig Configuration for output file properties
     * @return Builder instance for chaining
     */
    public DefaultBulkFormatBuilder<IN> withOutputFileConfig(
        OutputFileConfig outputFileConfig);
    
    /**
     * Enables file compaction to merge small files
     * @param compactStrategy Strategy for triggering compaction
     * @param compactor Implementation for compacting files
     * @return Builder instance for chaining
     */
    public DefaultBulkFormatBuilder<IN> enableCompact(
        FileCompactStrategy compactStrategy, FileCompactor compactor);
    
    /**
     * Disables writing to local file system for HDFS compatibility
     * @return Builder instance for chaining
     */
    public DefaultBulkFormatBuilder<IN> disableLocalWriting();
    
    /**
     * Builds the final FileSink instance
     * @return Configured FileSink
     */
    public FileSink<IN> build();
}

Usage Examples:

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.ConcatFileCompactor;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.streaming.api.functions.sink.filesystem.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.core.fs.Path;
import org.apache.flink.configuration.MemorySize;
import java.time.Duration;

// Basic file sink for text output
FileSink<String> basicSink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    .build();

// Sink with rolling policy and bucketing
FileSink<String> advancedSink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(DefaultRollingPolicy.builder()
        .withMaxPartSize(MemorySize.ofMebiBytes(128))
        .withRolloverInterval(Duration.ofMinutes(15))
        .withInactivityInterval(Duration.ofMinutes(5))
        .build())
    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd/HH"))
    .build();

// Sink with file compaction
FileSink<String> compactingSink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    .enableCompact(
        FileCompactStrategy.builder()
            .setSizeThreshold(MemorySize.ofMebiBytes(64).getBytes())
            .enableCompactionOnCheckpoint(3)
            .build(),
        new ConcatFileCompactor())
    .build();

// Use with DataStream API
stream.sinkTo(advancedSink);

File Writers and Buckets

Internal components for managing file writing operations.

/**
 * Writer that manages file buckets and handles the writing process
 */
public class FileWriter<IN> implements SinkWriter<IN> {
    // Internal implementation - not directly used by applications
}

/**
 * Factory for creating FileWriterBucket instances
 */
public interface FileWriterBucketFactory<IN> {
    FileWriterBucket<IN> getWriterBucket(String bucketId) throws IOException;
}

/**
 * Default implementation of FileWriterBucketFactory
 */
public class DefaultFileWriterBucketFactory<IN> implements FileWriterBucketFactory<IN> {
    public DefaultFileWriterBucketFactory(
        Path basePath,
        Encoder<IN> encoder,
        RollingPolicy<IN, String> rollingPolicy,
        OutputFileConfig outputFileConfig);
}

Committable Types

Types representing committable file operations for exactly-once semantics.

/**
 * Represents a committable file operation
 */
public class FileSinkCommittable {
    public FileSinkCommittable(String bucketId, Path path, long creationTime);
    public String getBucketId();
    public Path getPath();
    public long getCreationTime();
}

/**
 * Serializer for FileSinkCommittable instances
 */
public class FileSinkCommittableSerializer implements SimpleVersionedSerializer<FileSinkCommittable> {
    public int getVersion();
    public byte[] serialize(FileSinkCommittable committable) throws IOException;
    public FileSinkCommittable deserialize(int version, byte[] serialized) throws IOException;
}
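
The versioned serialization pattern above can be sketched without Flink. This illustrative codec round-trips a (bucketId, path, creationTime) triple with a version tag, in the spirit of `SimpleVersionedSerializer`; it is not the actual `FileSinkCommittableSerializer` wire format:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Illustrative versioned codec for a committable-like triple. The
// version tag lets a newer reader reject (or migrate) bytes written
// by an incompatible older format.
final class CommittableCodec {
    static final int VERSION = 1;

    static byte[] serialize(String bucketId, String path, long creationTime) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(bucketId);
            out.writeUTF(path);
            out.writeLong(creationTime);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // cannot happen for in-memory streams
        }
        return bytes.toByteArray();
    }

    static String[] deserialize(int version, byte[] data) {
        if (version != VERSION) {
            throw new IllegalArgumentException("Unsupported version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return new String[] {in.readUTF(), in.readUTF(), Long.toString(in.readLong())};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```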

Advanced Configuration Examples:

// Custom output file configuration
OutputFileConfig fileConfig = OutputFileConfig.builder()
    .withPartPrefix("data-")
    .withPartSuffix(".txt")
    .build();

// Custom bucket assigner for organizing by key
BucketAssigner<MyRecord, String> keyBucketAssigner = new BucketAssigner<MyRecord, String>() {
    @Override
    public String getBucketId(MyRecord record, Context context) {
        // floorMod keeps the bucket id non-negative even when hashCode() is negative
        return "bucket-" + Math.floorMod(record.getKey().hashCode(), 10);
    }
    
    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
};

FileSink<MyRecord> customSink = FileSink
    .forRowFormat(new Path("/output"), new MyRecordEncoder())
    .withBucketAssigner(keyBucketAssigner)
    .withOutputFileConfig(fileConfig)
    .build();

Error Handling

File sinks can fail in several ways during configuration and writing:

  • IOException: file system write errors, disk-full conditions
  • IllegalArgumentException: invalid configuration or paths, typically raised while building the sink
  • RuntimeException: encoding errors, compaction failures

Note that a try/catch around sink construction only catches configuration-time errors. Write-time failures surface inside the running job and are recovered through Flink's checkpointing and restart strategies, not by application-level catch blocks:

try {
    FileSink<String> sink = FileSink
        .forRowFormat(new Path("/invalid/output"), new SimpleStringEncoder<>("UTF-8"))
        .build();
    stream.sinkTo(sink);
} catch (IllegalArgumentException e) {
    // Handle invalid sink configuration detected at build time;
    // runtime write errors are handled by Flink's fault tolerance
}

Performance Considerations

  • Use appropriate rolling policies to balance file size and number of files
  • Configure bucketing to distribute load and improve query performance
  • Enable compaction for workloads that produce many small files
  • Consider bulk formats for high-throughput scenarios
  • Monitor checkpoint intervals to balance consistency and performance
  • Use disableLocalWriting() for HDFS deployments to avoid local filesystem usage
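
The compaction advice above amounts to merging many small part files into fewer large ones. For simple concatenation-safe formats (e.g. line-delimited text), the core of what a `ConcatFileCompactor` does can be sketched in plain Java; the `demo` helper and file names are illustrative assumptions:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Illustrative small-file compaction: byte-concatenate several part
// files into one target file. Valid only for formats where raw
// concatenation preserves the format, such as line-delimited text.
final class ConcatCompaction {
    static void compact(List<Path> parts, Path target) throws IOException {
        try (OutputStream out = Files.newOutputStream(target)) {
            for (Path part : parts) {
                Files.copy(part, out);
            }
        }
    }

    // Hypothetical demo: write two small parts, compact, return the result.
    static String demo() {
        try {
            Path dir = Files.createTempDirectory("compact-demo");
            Path a = dir.resolve("part-0");
            Path b = dir.resolve("part-1");
            Files.write(a, "alpha\n".getBytes());
            Files.write(b, "beta\n".getBytes());
            Path merged = dir.resolve("compacted");
            compact(List.of(a, b), merged);
            return new String(Files.readAllBytes(merged));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Columnar formats like Parquet cannot be compacted this way; that is why the builder takes a pluggable `FileCompactor` rather than always concatenating.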

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-files
