tessl/maven-org-apache-flink--flink-file-sink-common

Common utilities and interfaces for file sink functionality in Apache Flink stream processing applications

File Writers

File writers handle the actual writing of data to files, supporting both row-wise and bulk writing patterns. They provide abstractions for file recovery, commit operations, and different serialization strategies.

Capabilities

BucketWriter Interface

Factory interface for creating different types of file writers.

/**
 * Interface for factories that create different InProgressFileWriter writers
 * @param <IN> The type of input elements
 * @param <BucketID> The type of bucket identifier
 */
public interface BucketWriter<IN, BucketID> {
    /**
     * Creates a new InProgressFileWriter
     * @param bucketID the id of the bucket this writer is writing to
     * @param path the path this writer will write to
     * @param creationTime the creation time of the file
     * @return the new InProgressFileWriter
     * @throws IOException if creating a writer fails
     */
    InProgressFileWriter<IN, BucketID> openNewInProgressFile(
        BucketID bucketID, Path path, long creationTime) throws IOException;
    
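    /**
     * Creates a new CompactingFileWriter of the requested type.
     * By default only RECORD_WISE is supported, for which an InProgressFileWriter is created;
     * writers that also support OUTPUT_STREAM must override this method.
     * @param type the type of writer (RECORD_WISE or OUTPUT_STREAM)
     * @param bucketID the id of the bucket this writer is writing to
     * @param path the path this writer will write to
     * @param creationTime the creation time of the file
     * @return the new CompactingFileWriter
     * @throws IOException if creating a writer fails
     * @throws UnsupportedOperationException if the type is not supported
     */
    default CompactingFileWriter openNewCompactingFile(
        CompactingFileWriter.Type type,
        BucketID bucketID,
        Path path,
        long creationTime) throws IOException {
        if (type == CompactingFileWriter.Type.RECORD_WISE) {
            // An InProgressFileWriter is also a RecordWiseCompactingFileWriter
            return openNewInProgressFile(bucketID, path, creationTime);
        }
        throw new UnsupportedOperationException();
    }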
    
    /**
     * Resumes an InProgressFileWriter from a recoverable state
     * @param bucketID the id of the bucket this writer is writing to
     * @param inProgressFileSnapshot the state of the part file
     * @param creationTime the creation time of the file
     * @return the resumed InProgressFileWriter
     * @throws IOException if resuming a writer fails
     */
    InProgressFileWriter<IN, BucketID> resumeInProgressFileFrom(
        BucketID bucketID,
        InProgressFileWriter.InProgressFileRecoverable inProgressFileSnapshot,
        long creationTime) throws IOException;
    
    /**
     * @return the properties of this BucketWriter
     */
    WriterProperties getProperties();
    
    /**
     * Recovers a pending file for finalizing and committing
     * @param pendingFileRecoverable The handle with the recovery information
     * @return A pending file
     * @throws IOException if recovering a pending file fails
     */
    PendingFile recoverPendingFile(
        InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable) throws IOException;
    
    /**
     * Cleans up resources for a recoverable state
     * @param inProgressFileRecoverable the recoverable state to clean up
     * @return true if the resources were successfully freed, false otherwise
     * @throws IOException if an I/O error occurs
     */
    boolean cleanupInProgressFileRecoverable(
        InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable) throws IOException;
}
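To make the division of labour concrete, here is a minimal, Flink-free sketch of the call sequence a caller drives against a BucketWriter: open an in-progress file, write, close it for commit, recover the pending file from its handle, then commit. MiniBucketWriter, MiniInProgressWriter, PendingHandle and the hidden ".inprogress" temp-file naming are all hypothetical simplifications of the interfaces above, backed by java.nio.file on the local disk.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Hypothetical, heavily simplified stand-ins for BucketWriter / InProgressFileWriter / PendingFile.
// Data goes to a hidden temp file and only becomes visible at the target path on commit.
class MiniBucketWriter {

    /** Handle returned by closeForCommit(): enough information to finish the commit later. */
    record PendingHandle(Path tempFile, Path targetFile) {}

    /** Mirrors BucketWriter.PendingFile: can no longer be written to, only committed. */
    interface MiniPendingFile {
        void commit() throws IOException;
    }

    /** Mirrors the write/closeForCommit part of InProgressFileWriter, for String elements. */
    static final class MiniInProgressWriter {
        private final Path temp;
        private final Path target;

        private MiniInProgressWriter(Path temp, Path target) {
            this.temp = temp;
            this.target = target;
        }

        void write(String element) throws IOException {
            // Append one newline-terminated record to the temp file.
            Files.writeString(temp, element + "\n", StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        }

        PendingHandle closeForCommit() {
            // Nothing is buffered in this sketch, so closing only produces the handle.
            return new PendingHandle(temp, target);
        }
    }

    MiniInProgressWriter openNewInProgressFile(Path target) throws IOException {
        Path temp = target.resolveSibling("." + target.getFileName() + ".inprogress");
        Files.deleteIfExists(temp);
        Files.createFile(temp);
        return new MiniInProgressWriter(temp, target);
    }

    MiniPendingFile recoverPendingFile(PendingHandle handle) {
        // Publishing is a single atomic rename from the temp file to the target path.
        return () -> Files.move(handle.tempFile(), handle.targetFile(),
                StandardCopyOption.ATOMIC_MOVE);
    }
}
```

Until commit() runs, nothing exists at the target path, which is what lets a failed attempt be discarded without readers ever seeing partial data.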

BucketWriter.PendingFile Interface

Represents a file that is ready to be committed.

/**
 * Represents a file that can no longer be written to, but can be committed
 */
public interface PendingFile {
    /**
     * Commits the pending file, making it visible
     * The file will contain the exact data as when the pending file was created
     * @throws IOException if committing fails
     */
    void commit() throws IOException;
    
    /**
     * Commits the pending file, making it visible
     * This method tolerates situations where the file was already committed
     * Important for idempotent commit retries after recovery
     * @throws IOException if committing fails
     */
    void commitAfterRecovery() throws IOException;
}
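The difference between the two commit methods is easiest to see with a rename-based commit. In the hypothetical RenameCommit class below (not Flink code), commit() insists that the temporary file still exists, while commitAfterRecovery() treats "temp file gone, target present" as evidence of an earlier successful commit and returns quietly, so a commit replayed after a failure is a no-op.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical rename-based pending file illustrating commit() vs commitAfterRecovery().
final class RenameCommit {
    private final Path temp;
    private final Path target;

    RenameCommit(Path temp, Path target) {
        this.temp = temp;
        this.target = target;
    }

    /** Strict commit: the temporary file must still exist. */
    void commit() throws IOException {
        if (!Files.exists(temp)) {
            throw new FileNotFoundException("nothing to commit: " + temp);
        }
        Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
    }

    /** Tolerant commit: if an earlier attempt already published the file, do nothing. */
    void commitAfterRecovery() throws IOException {
        if (!Files.exists(temp) && Files.exists(target)) {
            return; // already committed before the failure; replaying is a no-op
        }
        commit();
    }
}
```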

InProgressFileWriter Interface

Interface for writing elements to part files with recovery support.

/**
 * The Bucket uses the InProgressFileWriter to write elements to a part file
 * @param <IN> The type of input elements
 * @param <BucketID> The type of bucket identifier
 */
public interface InProgressFileWriter<IN, BucketID> 
        extends PartFileInfo<BucketID>, RecordWiseCompactingFileWriter<IN> {
    
    /**
     * Write an element to the part file
     * @param element the element to be written
     * @param currentTime the writing time
     * @throws IOException if writing the element fails
     */
    void write(IN element, long currentTime) throws IOException;
    
    /**
     * @return The state of the current part file for recovery
     * @throws IOException if persisting the part file fails
     */
    InProgressFileRecoverable persist() throws IOException;
    
    /**
     * @return The state of the pending part file for commit
     * @throws IOException if an I/O error occurs
     */
    PendingFileRecoverable closeForCommit() throws IOException;
    
    /** Dispose the part file and clean up resources */
    void dispose();
    
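    /**
     * Writes an element using the current system time as the writing time.
     * Satisfies RecordWiseCompactingFileWriter.write(IN).
     * @param element the element to write
     * @throws IOException if writing the element fails
     */
    @Override
    default void write(IN element) throws IOException {
        write(element, System.currentTimeMillis());
    }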
}
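The untimestamped write(IN) overload is what lets an InProgressFileWriter double as a RecordWiseCompactingFileWriter, whose write method has no time parameter. A minimal sketch of that delegation, using hypothetical stand-in interfaces (PlainWriter, TimedWriter) and, as an assumption made only for testability, an injectable clock in place of System.currentTimeMillis():

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical stand-in for RecordWiseCompactingFileWriter: no timestamp parameter.
interface PlainWriter<IN> {
    void write(IN element);
}

// Hypothetical stand-in for InProgressFileWriter: the timestamped write is the primary method,
// and the inherited untimestamped write is satisfied by a default method that supplies "now".
interface TimedWriter<IN> extends PlainWriter<IN> {
    void write(IN element, long currentTime);

    LongSupplier clock(); // sketch-only hook; the documented default uses System.currentTimeMillis()

    @Override
    default void write(IN element) {
        write(element, clock().getAsLong());
    }
}

// Records every (element, time) pair so the delegation can be observed.
final class RecordingWriter implements TimedWriter<String> {
    final List<String> log = new ArrayList<>();
    private final LongSupplier clock;

    RecordingWriter(LongSupplier clock) {
        this.clock = clock;
    }

    @Override
    public void write(String element, long currentTime) {
        log.add(element + "@" + currentTime);
    }

    @Override
    public LongSupplier clock() {
        return clock;
    }
}
```

A compactor that only knows the PlainWriter view still ends up in write(element, time), so implementations only have to write the timestamped method once.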

InProgressFileWriter Recovery Interfaces

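Handles for fault-tolerant recovery of in-progress and pending files. Both are nested interfaces of InProgressFileWriter and are referenced elsewhere as InProgressFileWriter.InProgressFileRecoverable and InProgressFileWriter.PendingFileRecoverable.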

/**
 * Handle that can be used to recover in-progress files
 */
public interface InProgressFileRecoverable extends PendingFileRecoverable {}

/**
 * Handle that can be used to recover pending files
 */
public interface PendingFileRecoverable {
    /**
     * @return The target path of the pending file, null if unavailable
     */
    Path getPath();
    
    /**
     * @return The size of the pending file, -1 if unavailable
     */
    long getSize();
}
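Because InProgressFileRecoverable extends PendingFileRecoverable, an in-progress handle can be passed anywhere a pending handle is expected, and consumers must honour the "null / -1 if unavailable" contract of getPath() and getSize(). The sketch below models this with hypothetical local types (PendingRef, InProgressRef, LocalPending, LocalInProgress, HandleInfo) that are not part of Flink, so it compiles on its own.

```java
import java.nio.file.Path;

// Hypothetical local copies of the two handle interfaces (not the Flink types).
interface PendingRef {
    Path getPath(); // null if unavailable

    long getSize(); // -1 if unavailable
}

interface InProgressRef extends PendingRef {}

// A pending file whose target path and size are known.
record LocalPending(Path path, long size) implements PendingRef {
    @Override public Path getPath() { return path; }
    @Override public long getSize() { return size; }
}

// An in-progress file: its final size is not known yet, so it reports "unavailable".
record LocalInProgress(Path path) implements InProgressRef {
    @Override public Path getPath() { return path; }
    @Override public long getSize() { return -1L; }
}

final class HandleInfo {
    // Accepts both kinds of handle because an InProgressRef is-a PendingRef.
    static String describe(PendingRef handle) {
        String path = handle.getPath() == null ? "<unknown path>" : handle.getPath().toString();
        String size = handle.getSize() < 0 ? "<unknown size>" : handle.getSize() + " bytes";
        return path + ", " + size;
    }
}
```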

CompactingFileWriter Interface

Base interface for compacting file writers.

/**
 * File sink compactors use CompactingFileWriter to write a compacting file
 * Classes should implement RecordWiseCompactingFileWriter or OutputStreamBasedCompactingFileWriter
 */
public interface CompactingFileWriter {
    /**
     * Closes the writer and gets the PendingFileRecoverable of the written compacting file
     * @return The state of the pending part file for commit
     * @throws IOException if an I/O error occurs
     */
    PendingFileRecoverable closeForCommit() throws IOException;
    
    /** Enum defining the types of CompactingFileWriter */
    enum Type {
        RECORD_WISE,
        OUTPUT_STREAM
    }
}
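A factory that receives a CompactingFileWriter.Type typically switches on it and rejects kinds it cannot provide, as the @throws clause of openNewCompactingFile describes. The sketch below mimics that shape with hypothetical local types (WriterKind, CompactingFactory) and returns descriptive strings where a real factory would return writer instances.

```java
import java.util.EnumSet;
import java.util.Set;

// Local copy of the two compacting writer kinds (mirrors CompactingFileWriter.Type).
enum WriterKind { RECORD_WISE, OUTPUT_STREAM }

// Hypothetical factory: it only hands out the kinds listed in `supported`.
final class CompactingFactory {
    private final Set<WriterKind> supported;

    CompactingFactory(Set<WriterKind> supported) {
        this.supported = EnumSet.copyOf(supported);
    }

    String openNewCompactingFile(WriterKind kind) {
        if (!supported.contains(kind)) {
            throw new UnsupportedOperationException("writer type not supported: " + kind);
        }
        // A real factory would return a record-wise or output-stream based writer here.
        return switch (kind) {
            case RECORD_WISE -> "record-wise writer";
            case OUTPUT_STREAM -> "output-stream writer";
        };
    }
}
```

In this sketch a caller that prefers OUTPUT_STREAM can catch the UnsupportedOperationException and fall back to RECORD_WISE; whether a given compactor does so is up to its implementation.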

RecordWiseCompactingFileWriter Interface

Interface for record-wise compacting file writers.

/**
 * Compactors use RecordWiseCompactingFileWriter to write elements to a compacting file
 * @param <IN> The type of input elements
 */
public interface RecordWiseCompactingFileWriter<IN> extends CompactingFileWriter {
    /**
     * Write an element to the compacting file
     * @param element the element to be written
     * @throws IOException if writing the element fails
     */
    void write(IN element) throws IOException;
}

OutputStreamBasedCompactingFileWriter Interface

Interface for output stream based compacting file writers.

/**
 * Compactors use OutputStreamBasedCompactingFileWriter to directly write 
 * a compacting file as an OutputStream
 */
public interface OutputStreamBasedCompactingFileWriter extends CompactingFileWriter {
    /**
     * Gets the output stream underlying the writer
     * The close method of the returned stream should never be called
     * @return The output stream to write the compacting file
     * @throws IOException if acquiring the stream fails
     */
    OutputStream asOutputStream() throws IOException;
}
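The two writer kinds lead to two compaction strategies. An output-stream writer lets a compactor copy the raw bytes of the input files, which is only valid when the format allows plain concatenation (newline-delimited text, for example); a record-wise writer makes it decode and re-write every element. The sketch below uses plain java.io with a hypothetical ConcatCompactor class; note that it flushes but never closes the stream it is handed, matching the contract of asOutputStream().

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical compactor sketch contrasting the two CompactingFileWriter styles.
final class ConcatCompactor {

    /** OUTPUT_STREAM style: copy each input's bytes straight into the target stream. */
    static void compactBytes(List<InputStream> inputs, OutputStream target) throws IOException {
        for (InputStream in : inputs) {
            try (in) {
                in.transferTo(target); // no decoding, just byte copying
            }
        }
        target.flush(); // do NOT close: the writer owns the stream's lifecycle
    }

    /** RECORD_WISE style: decode each input into elements and write them one by one. */
    static void compactRecords(List<String> inputs, Consumer<String> recordWriter) {
        for (String content : inputs) {
            for (String line : content.split("\n")) {
                if (!line.isEmpty()) {
                    recordWriter.accept(line); // stands in for RecordWiseCompactingFileWriter.write
                }
            }
        }
    }
}
```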

Concrete Implementations

RowWiseBucketWriter

Factory for creating row-wise part writers using encoders.

/**
 * Factory that creates RowWisePartWriter instances
 * @param <IN> The type of input elements
 * @param <BucketID> The type of bucket identifier
 */
public class RowWiseBucketWriter<IN, BucketID> 
        extends OutputStreamBasedPartFileWriter.OutputStreamBasedBucketWriter<IN, BucketID> {
    
    /**
     * Creates a RowWiseBucketWriter
     * @param recoverableWriter the recoverable writer for the file system
     * @param encoder the encoder for serializing elements
     */
    public RowWiseBucketWriter(RecoverableWriter recoverableWriter, Encoder<IN> encoder);
    
    @Override
    public InProgressFileWriter<IN, BucketID> resumeFrom(
        BucketID bucketId,
        RecoverableFsDataOutputStream stream,
        Path path,
        RecoverableWriter.ResumeRecoverable resumable,
        long creationTime);
    
    @Override
    public InProgressFileWriter<IN, BucketID> openNew(
        BucketID bucketId,
        RecoverableFsDataOutputStream stream,
        Path path,
        long creationTime);
}
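Row-wise writing delegates serialization of each element to an Encoder, which appends the element's bytes to the part file's stream; Flink's SimpleStringEncoder, for instance, writes element.toString() followed by a newline. The self-contained sketch below reproduces that contract with a hypothetical LineEncoder interface and RowWriterSketch class instead of Flink's Encoder, so it runs without Flink on the classpath.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;

// Hypothetical local equivalent of Flink's Encoder<IN>: one element in, bytes appended to a stream.
@FunctionalInterface
interface LineEncoder<IN> {
    void encode(IN element, OutputStream stream) throws IOException;

    // Same behaviour as described for SimpleStringEncoder: toString() bytes plus '\n'.
    static <T> LineEncoder<T> simpleString(Charset charset) {
        return (element, stream) -> {
            stream.write(element.toString().getBytes(charset));
            stream.write('\n');
        };
    }
}

// Row-wise part writer sketch: each write() immediately encodes one element into the stream.
final class RowWriterSketch<IN> {
    private final LineEncoder<IN> encoder;
    private final OutputStream stream;

    RowWriterSketch(LineEncoder<IN> encoder, OutputStream stream) {
        this.encoder = encoder;
        this.stream = stream;
    }

    void write(IN element) throws IOException {
        encoder.encode(element, stream);
    }
}
```

Because every element is fully encoded as soon as it is written, a row-wise file is valid at any element boundary, which is what makes it cheap to persist and resume mid-file.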

BulkBucketWriter

Factory for creating bulk part writers, which delegate encoding to BulkWriter instances created by a BulkWriter.Factory.

/**
 * Factory that creates BulkPartWriter instances
 * @param <IN> The type of input elements
 * @param <BucketID> The type of bucket identifier
 */
public class BulkBucketWriter<IN, BucketID> 
        extends OutputStreamBasedPartFileWriter.OutputStreamBasedBucketWriter<IN, BucketID> {
    
    /**
     * Creates a BulkBucketWriter
     * @param recoverableWriter the recoverable writer for the file system
     * @param writerFactory the factory for creating bulk writers
     */
    public BulkBucketWriter(RecoverableWriter recoverableWriter, BulkWriter.Factory<IN> writerFactory) throws IOException;
    
    @Override
    public InProgressFileWriter<IN, BucketID> resumeFrom(
        BucketID bucketId,
        RecoverableFsDataOutputStream stream,
        Path path,
        RecoverableWriter.ResumeRecoverable resumable,
        long creationTime) throws IOException;
    
    @Override
    public InProgressFileWriter<IN, BucketID> openNew(
        BucketID bucketId,
        RecoverableFsDataOutputStream stream,
        Path path,
        long creationTime) throws IOException;
}
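Bulk writing instead hands elements to a BulkWriter, which may buffer them and only produces a complete file when finish() is called; Flink's BulkWriter exposes addElement, flush and finish, and finish() must not close the underlying stream. The sketch below is a hypothetical, Flink-free imitation (CountingBulkWriter) of a format that writes a footer at finish time, which illustrates why a half-written bulk file cannot simply be resumed.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical bulk format: buffered rows, then a footer recording how many rows the file holds.
// Method names mirror Flink's BulkWriter (addElement / flush / finish), but this is not Flink code.
final class CountingBulkWriter {
    private final OutputStream out;
    private final List<String> buffer = new ArrayList<>();
    private int rowsWritten;

    CountingBulkWriter(OutputStream out) {
        this.out = out;
    }

    void addElement(String element) {
        buffer.add(element); // nothing reaches the stream yet
    }

    void flush() throws IOException {
        for (String row : buffer) {
            out.write((row + "\n").getBytes(StandardCharsets.UTF_8));
        }
        rowsWritten += buffer.size();
        buffer.clear();
        out.flush();
    }

    /** Writes the footer; without it the file is not a valid instance of this format. */
    void finish() throws IOException {
        flush();
        out.write(("#rows=" + rowsWritten + "\n").getBytes(StandardCharsets.UTF_8));
        out.flush(); // the caller, not the bulk writer, closes the stream
    }
}
```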

Usage Examples

Row-wise Writing with Encoder

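import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;

// Obtain a RecoverableWriter from the target file system (may throw IOException)
Path partPath = new Path("/output/bucket-1/part-0");
RecoverableWriter recoverableWriter = partPath.getFileSystem().createRecoverableWriter();

// Create encoder for string data
Encoder<String> encoder = new SimpleStringEncoder<>("UTF-8");

// Create row-wise bucket writer
BucketWriter<String, String> bucketWriter =
    new RowWiseBucketWriter<>(recoverableWriter, encoder);

// Open new file writer
InProgressFileWriter<String, String> writer =
    bucketWriter.openNewInProgressFile("bucket-1", partPath, System.currentTimeMillis());

// Write elements
writer.write("Hello", System.currentTimeMillis());
writer.write("World", System.currentTimeMillis());

// Close for commit; the data is not yet visible at partPath
InProgressFileWriter.PendingFileRecoverable pendingFile = writer.closeForCommit();

// Commit to publish the file (in a sink this happens after the checkpoint completes)
bucketWriter.recoverPendingFile(pendingFile).commit();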
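Between the writes and closeForCommit(), persist() is what a checkpoint captures and resumeInProgressFileFrom() is what restores from it. One way to implement that for a local file, and the assumption behind this sketch, is to record the byte length at persist() time and, on resume, truncate the file back to that length so bytes written after the checkpoint are discarded. ResumableFile and its Snapshot record are hypothetical and use only java.nio.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical resumable in-progress file: persist() remembers a length, resumeFrom() truncates to it.
final class ResumableFile {

    /** What a checkpoint would store (compare InProgressFileRecoverable). */
    record Snapshot(Path path, long length) {}

    private final Path path;

    private ResumableFile(Path path) {
        this.path = path;
    }

    static ResumableFile openNew(Path path) throws IOException {
        Files.write(path, new byte[0]); // create, or reset an existing file to empty
        return new ResumableFile(path);
    }

    void write(String element) throws IOException {
        Files.writeString(path, element + "\n", StandardCharsets.UTF_8, StandardOpenOption.APPEND);
    }

    Snapshot persist() throws IOException {
        return new Snapshot(path, Files.size(path));
    }

    /** Restores the file to exactly the state captured by the snapshot. */
    static ResumableFile resumeFrom(Snapshot snapshot) throws IOException {
        try (FileChannel channel = FileChannel.open(snapshot.path(), StandardOpenOption.WRITE)) {
            channel.truncate(snapshot.length()); // drop bytes written after the checkpoint
        }
        return new ResumableFile(snapshot.path());
    }
}
```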

Bulk Writing

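import java.io.IOException;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;

// Hypothetical helper. The BulkWriter.Factory is format specific (e.g. a Parquet writer
// factory) and is supplied by the caller; MyRecord stands for your record type.
static InProgressFileWriter<MyRecord, String> openBulkPartFile(
        RecoverableWriter recoverableWriter,
        BulkWriter.Factory<MyRecord> bulkWriterFactory) throws IOException {
    // Create bulk bucket writer (the constructor may throw IOException)
    BucketWriter<MyRecord, String> bucketWriter =
        new BulkBucketWriter<>(recoverableWriter, bulkWriterFactory);

    // Opening a file works as in the row-wise case; elements are then passed to the BulkWriter
    // created by the factory. Bulk part files cannot be persisted and resumed mid-file, so they
    // are rolled on every checkpoint.
    return bucketWriter.openNewInProgressFile(
        "bucket-1",
        new Path("/output/bucket-1/part-0"),
        System.currentTimeMillis());
}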

Error Handling

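  • Writer operations (write, persist, closeForCommit) declare IOException, which callers must handle
  • A failed write typically fails the job, which then restarts from the last completed checkpoint
  • dispose() declares no checked exceptions; it should release resources without throwing
  • Recovery (resumeInProgressFileFrom, recoverPendingFile) may fail if the underlying file system state is missing or corrupted
  • Commits must be idempotent for exactly-once guarantees; after recovery, PendingFile.commitAfterRecovery() tolerates files that were already committed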
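The first three points combine into a simple pattern: let the IOException from a write propagate so the job fails and restarts, but dispose of the in-progress writer first, and make sure the cleanup itself cannot mask the original error. The sketch below uses a hypothetical DisposableWriter interface (dispose() declared without checked exceptions, as in InProgressFileWriter) and a hypothetical helper WriteGuard.writeAllOrDispose.

```java
import java.io.IOException;
import java.util.List;

// Hypothetical minimal writer; dispose() has no checked exceptions, like InProgressFileWriter.dispose().
interface DisposableWriter<IN> {
    void write(IN element) throws IOException;

    void dispose();
}

final class WriteGuard {
    /** Writes every element; on failure, disposes the writer and rethrows the original error. */
    static <IN> void writeAllOrDispose(DisposableWriter<IN> writer, List<IN> elements)
            throws IOException {
        try {
            for (IN element : elements) {
                writer.write(element);
            }
        } catch (IOException e) {
            try {
                writer.dispose();
            } catch (RuntimeException cleanupFailure) {
                e.addSuppressed(cleanupFailure); // keep the root cause, record the cleanup problem
            }
            throw e; // let the job fail; recovery restarts from the last checkpoint
        }
    }
}
```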

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-file-sink-common
