Common utilities and interfaces for file sink functionality in Apache Flink stream processing applications
—
File writers handle the actual writing of data to files, supporting both row-wise and bulk writing patterns. They provide abstractions for file recovery, commit operations, and different serialization strategies.
Factory interface for creating different types of file writers.
/**
* Factory interface for creating, resuming and recovering the part-file writers
* (InProgressFileWriter, CompactingFileWriter, PendingFile) declared below.
* @param <IN> The type of input elements
* @param <BucketID> The type of bucket identifier
*/
public interface BucketWriter<IN, BucketID> {
/**
* Creates a new InProgressFileWriter.
* @param bucketID the id of the bucket this writer is writing to
* @param path the path this writer will write to
* @param creationTime the creation time of the file
* @return the new InProgressFileWriter
* @throws IOException if creating a writer fails
*/
InProgressFileWriter<IN, BucketID> openNewInProgressFile(
BucketID bucketID, Path path, long creationTime) throws IOException;
/**
* Creates a new CompactingFileWriter of the requested type.
* NOTE(review): declared {@code default}, but this listing reproduces the signature only;
* the default body is not shown here.
* @param type the type of writer (RECORD_WISE or OUTPUT_STREAM)
* @param bucketID the id of the bucket this writer is writing to
* @param path the path this writer will write to
* @param creationTime the creation time of the file
* @return the new CompactingFileWriter
* @throws IOException if creating a writer fails
* @throws UnsupportedOperationException if the type is not supported
*/
default CompactingFileWriter openNewCompactingFile(
CompactingFileWriter.Type type,
BucketID bucketID,
Path path,
long creationTime) throws IOException;
/**
* Resumes an InProgressFileWriter from a recoverable state.
* @param bucketID the id of the bucket this writer is writing to
* @param inProgressFileSnapshot the state of the part file
* @param creationTime the creation time of the file
* @return the resumed InProgressFileWriter
* @throws IOException if resuming a writer fails
*/
InProgressFileWriter<IN, BucketID> resumeInProgressFileFrom(
BucketID bucketID,
InProgressFileWriter.InProgressFileRecoverable inProgressFileSnapshot,
long creationTime) throws IOException;
/**
* @return the properties of this BucketWriter
*/
WriterProperties getProperties();
/**
* Recovers a pending file for finalizing and committing.
* @param pendingFileRecoverable the handle with the recovery information
* @return a pending file
* @throws IOException if recovering a pending file fails
*/
PendingFile recoverPendingFile(
InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable) throws IOException;
/**
* Cleans up the resources held for a recoverable state.
* @param inProgressFileRecoverable the recoverable state to clean up
* @return true if the resources were successfully freed, false otherwise
* @throws IOException if an I/O error occurs
*/
boolean cleanupInProgressFileRecoverable(
InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable) throws IOException;
}

Represents a file that is ready to be committed.
/**
* Represents a file that can no longer be written to but can still be committed.
*/
public interface PendingFile {
/**
* Commits the pending file, making it visible.
* The file will contain exactly the data it held when the pending file was created.
* @throws IOException if committing fails
*/
void commit() throws IOException;
/**
* Commits the pending file, making it visible.
* This method tolerates situations where the file was already committed,
* which is important for idempotent commit retries after recovery.
* @throws IOException if committing fails
*/
void commitAfterRecovery() throws IOException;
}

Interface for writing elements to part files with recovery support.
/**
* The Bucket uses the InProgressFileWriter to write elements to a part file.
* It also extends RecordWiseCompactingFileWriter, so it inherits that interface's
* single-argument write and the closeForCommit contract of CompactingFileWriter.
* @param <IN> The type of input elements
* @param <BucketID> The type of bucket identifier
*/
public interface InProgressFileWriter<IN, BucketID>
extends PartFileInfo<BucketID>, RecordWiseCompactingFileWriter<IN> {
/**
* Writes an element to the part file.
* @param element the element to be written
* @param currentTime the writing time
* @throws IOException if writing the element fails
*/
void write(IN element, long currentTime) throws IOException;
/**
* @return The state of the current part file for recovery
* @throws IOException if persisting the part file fails
*/
InProgressFileRecoverable persist() throws IOException;
/**
* @return The state of the pending part file for commit
* @throws IOException if an I/O error occurs
*/
PendingFileRecoverable closeForCommit() throws IOException;
/** Disposes the part file and cleans up resources. */
void dispose();
/**
* Default write method using the current system time.
* Has the same signature as RecordWiseCompactingFileWriter#write(IN), which this
* interface extends.
* NOTE(review): declared {@code default}, but only the signature is reproduced in this
* listing; the default body is not shown here.
* @param element the element to write
* @throws IOException if writing the element fails
*/
default void write(IN element) throws IOException;
}

Support for fault-tolerant recovery of in-progress files.
/**
* Handle that can be used to recover in-progress files.
* Declares no members of its own; everything it exposes comes from PendingFileRecoverable.
* NOTE(review): other signatures in this document refer to this type as
* InProgressFileWriter.InProgressFileRecoverable, so it appears to be nested in
* InProgressFileWriter — confirm.
*/
public interface InProgressFileRecoverable extends PendingFileRecoverable {}
/**
* Handle that can be used to recover pending files.
* NOTE(review): other signatures in this document refer to this type as
* InProgressFileWriter.PendingFileRecoverable, so it appears to be nested in
* InProgressFileWriter — confirm.
*/
public interface PendingFileRecoverable {
/**
* @return the target path of the pending file, or {@code null} if unavailable
*/
Path getPath();
/**
* @return the size of the pending file, or {@code -1} if unavailable
*/
long getSize();
}

Base interface for compacting file writers.
/**
* File sink compactors use a CompactingFileWriter to write a compacting file.
* Classes should implement RecordWiseCompactingFileWriter or OutputStreamBasedCompactingFileWriter.
*/
public interface CompactingFileWriter {
/**
* Closes the writer and gets the PendingFileRecoverable of the written compacting file.
* @return The state of the pending part file for commit
* @throws IOException if an I/O error occurs
*/
PendingFileRecoverable closeForCommit() throws IOException;
/**
* Enum defining the types of CompactingFileWriter.
* This is the type accepted by BucketWriter#openNewCompactingFile.
*/
enum Type {
// presumably selects a RecordWiseCompactingFileWriter — confirm
RECORD_WISE,
// presumably selects an OutputStreamBasedCompactingFileWriter — confirm
OUTPUT_STREAM
}
}

Interface for record-wise compacting file writers.
/**
* Compactors use a RecordWiseCompactingFileWriter to write a compacting file
* one element at a time.
* @param <IN> The type of input elements
*/
public interface RecordWiseCompactingFileWriter<IN> extends CompactingFileWriter {
/**
* Writes an element to the compacting file.
* @param element the element to be written
* @throws IOException if writing the element fails
*/
void write(IN element) throws IOException;
}

Interface for output-stream-based compacting file writers.
/**
* Compactors use an OutputStreamBasedCompactingFileWriter to write a compacting file
* directly as an OutputStream.
*/
public interface OutputStreamBasedCompactingFileWriter extends CompactingFileWriter {
/**
* Gets the output stream underlying the writer.
* Callers must never call {@code close()} on the returned stream.
* @return The output stream to write the compacting file
* @throws IOException if acquiring the stream fails
*/
OutputStream asOutputStream() throws IOException;
}

Factory for creating row-wise part writers using encoders.
/**
* Factory that creates RowWisePartWriter instances.
* Only signatures are reproduced in this listing; method bodies are not shown.
* @param <IN> The type of input elements
* @param <BucketID> The type of bucket identifier
*/
public class RowWiseBucketWriter<IN, BucketID>
extends OutputStreamBasedPartFileWriter.OutputStreamBasedBucketWriter<IN, BucketID> {
/**
* Creates a RowWiseBucketWriter.
* @param recoverableWriter the recoverable writer for the file system
* @param encoder the encoder for serializing elements
*/
public RowWiseBucketWriter(RecoverableWriter recoverableWriter, Encoder<IN> encoder);
/**
* Overrides the resumeFrom hook of OutputStreamBasedBucketWriter.
* Unlike the BulkBucketWriter counterpart, this signature declares no checked exception.
* NOTE(review): parameter meanings below are inferred from their names and types — confirm.
* @param bucketId the id of the bucket the writer belongs to
* @param stream the recoverable output stream to write through
* @param path the path of the part file
* @param resumable the handle the stream was resumed from
* @param creationTime the creation time of the file
* @return the writer for the resumed part file
*/
@Override
public InProgressFileWriter<IN, BucketID> resumeFrom(
BucketID bucketId,
RecoverableFsDataOutputStream stream,
Path path,
RecoverableWriter.ResumeRecoverable resumable,
long creationTime);
/**
* Overrides the openNew hook of OutputStreamBasedBucketWriter.
* Unlike the BulkBucketWriter counterpart, this signature declares no checked exception.
* NOTE(review): parameter meanings below are inferred from their names and types — confirm.
* @param bucketId the id of the bucket the writer belongs to
* @param stream the recoverable output stream to write through
* @param path the path of the part file
* @param creationTime the creation time of the file
* @return the writer for the new part file
*/
@Override
public InProgressFileWriter<IN, BucketID> openNew(
BucketID bucketId,
RecoverableFsDataOutputStream stream,
Path path,
long creationTime);
}

Factory for creating bulk part writers using bulk writers.
/**
* Factory that creates BulkPartWriter instances.
* Only signatures are reproduced in this listing; method bodies are not shown.
* @param <IN> The type of input elements
* @param <BucketID> The type of bucket identifier
*/
public class BulkBucketWriter<IN, BucketID>
extends OutputStreamBasedPartFileWriter.OutputStreamBasedBucketWriter<IN, BucketID> {
/**
* Creates a BulkBucketWriter.
* @param recoverableWriter the recoverable writer for the file system
* @param writerFactory the factory for creating bulk writers
* @throws IOException declared by the constructor; the failing condition is not shown
* in this listing
*/
public BulkBucketWriter(RecoverableWriter recoverableWriter, BulkWriter.Factory<IN> writerFactory) throws IOException;
/**
* Overrides the resumeFrom hook of OutputStreamBasedBucketWriter.
* NOTE(review): parameter meanings below are inferred from their names and types — confirm.
* @param bucketId the id of the bucket the writer belongs to
* @param stream the recoverable output stream to write through
* @param path the path of the part file
* @param resumable the handle the stream was resumed from
* @param creationTime the creation time of the file
* @return the writer for the resumed part file
* @throws IOException declared by the signature; the failing condition is not shown here
*/
@Override
public InProgressFileWriter<IN, BucketID> resumeFrom(
BucketID bucketId,
RecoverableFsDataOutputStream stream,
Path path,
RecoverableWriter.ResumeRecoverable resumable,
long creationTime) throws IOException;
/**
* Overrides the openNew hook of OutputStreamBasedBucketWriter.
* NOTE(review): parameter meanings below are inferred from their names and types — confirm.
* @param bucketId the id of the bucket the writer belongs to
* @param stream the recoverable output stream to write through
* @param path the path of the part file
* @param creationTime the creation time of the file
* @return the writer for the new part file
* @throws IOException declared by the signature; the failing condition is not shown here
*/
@Override
public InProgressFileWriter<IN, BucketID> openNew(
BucketID bucketId,
RecoverableFsDataOutputStream stream,
Path path,
long creationTime) throws IOException;
}

import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.RecoverableWriter;
// Create encoder for string data
Encoder<String> encoder = new SimpleStringEncoder<>("UTF-8");
// Create row-wise bucket writer.
// NOTE: recoverableWriter is not defined in this snippet; it is assumed to be an
// already-available RecoverableWriter instance — TODO confirm how it is obtained.
BucketWriter<String, String> bucketWriter =
new RowWiseBucketWriter<>(recoverableWriter, encoder);
// Open new file writer; the arguments are bucket id, target path and creation time
InProgressFileWriter<String, String> writer = bucketWriter.openNewInProgressFile(
"bucket-1",
new Path("/output/bucket-1/part-0"),
System.currentTimeMillis()
);
// Write elements using the write(element, currentTime) overload
writer.write("Hello", System.currentTimeMillis());
writer.write("World", System.currentTimeMillis());
// Close for commit
PendingFileRecoverable pendingFile = writer.closeForCommit();

import org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter;
import org.apache.flink.api.common.serialization.BulkWriter;
// Create bulk writer factory (example for Parquet).
// The right-hand side below is a placeholder: supply a concrete
// BulkWriter.Factory<MyRecord> for your format before using this snippet.
BulkWriter.Factory<MyRecord> bulkWriterFactory = // ... implementation specific
// Create bulk bucket writer.
// NOTE: recoverableWriter is not defined in this snippet; it is assumed to be an
// already-available RecoverableWriter instance — TODO confirm how it is obtained.
// The BulkBucketWriter constructor is declared with `throws IOException`, so the
// enclosing code must handle or propagate it.
BucketWriter<MyRecord, String> bucketWriter =
new BulkBucketWriter<>(recoverableWriter, bulkWriterFactory);
// Usage similar to row-wise, but optimized for bulk operations
InProgressFileWriter<MyRecord, String> writer = bucketWriter.openNewInProgressFile(
"bucket-1",
new Path("/output/bucket-1/part-0"),
System.currentTimeMillis()
);

IOException during write operations

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-file-sink-common