Google Cloud Storage FileSystem plugin for Apache Flink providing gs:// URI support with recoverable writers
The Flink GS FileSystem plugin provides a recoverable writer that gives Flink streaming applications fault-tolerant writes to Google Cloud Storage with exactly-once guarantees. The writer handles interrupted writes, can resume from the last persisted state after a failure, and keeps data consistent through a multi-phase commit protocol: data is first written to temporary objects, which are composed into the final blob only on commit.
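To make the multi-phase commit protocol more concrete, here is a toy in-memory model of it. This is only a sketch under stated assumptions, not the plugin's code: the class `RecoverableWriteSketch`, its `State` type, the `tempName` helper and the rule "one temporary object per write call" are all hypothetical, and a `HashMap` stands in for the bucket.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Toy model of write -> persist -> recover -> commit. Hypothetical, not the plugin's implementation. */
final class RecoverableWriteSketch {

    /** Stand-in for object storage: object name -> contents. */
    final Map<String, byte[]> store = new HashMap<>();

    /** Checkpointable state: the final object name plus the ids of the temporary components so far. */
    static final class State {
        final String finalName;
        final List<UUID> componentIds;

        State(String finalName, List<UUID> componentIds) {
            this.finalName = finalName;
            this.componentIds = Collections.unmodifiableList(new ArrayList<>(componentIds));
        }
    }

    /** Assumed naming scheme for temporary objects (illustrative only). */
    static String tempName(String finalName, UUID id) {
        return ".inprogress/" + finalName + "/" + id;
    }

    /** Phase 1: store a chunk as a new temporary component and return the updated state. */
    State writeComponent(State state, byte[] chunk) {
        UUID id = UUID.randomUUID();
        store.put(tempName(state.finalName, id), chunk.clone());
        List<UUID> ids = new ArrayList<>(state.componentIds);
        ids.add(id);
        return new State(state.finalName, ids);
    }

    /** Phase 2: concatenate the components in order into the final object, then delete them. */
    void commit(State state) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (UUID id : state.componentIds) {
            byte[] part = store.get(tempName(state.finalName, id));
            if (part == null) {
                // Components are gone: an earlier commit attempt already finished, so retrying is a no-op.
                if (store.containsKey(state.finalName)) {
                    return;
                }
                throw new IllegalStateException("missing component " + id);
            }
            out.write(part, 0, part.length);
        }
        store.put(state.finalName, out.toByteArray());
        for (UUID id : state.componentIds) {
            store.remove(tempName(state.finalName, id));
        }
    }

    public static void main(String[] args) {
        RecoverableWriteSketch gcs = new RecoverableWriteSketch();
        State state = new State("my-bucket/output/part-1", new ArrayList<>());
        state = gcs.writeComponent(state, "Hello World".getBytes(StandardCharsets.UTF_8));
        // Simulated failure: only the checkpointed State survives, and writing continues from it.
        state = gcs.writeComponent(state, " More data".getBytes(StandardCharsets.UTF_8));
        gcs.commit(state);
        gcs.commit(state); // a retried commit after recovery leaves the result unchanged
        System.out.println(new String(gcs.store.get(state.finalName), StandardCharsets.UTF_8));
        // prints "Hello World More data"
    }
}
```

The point of the model is that nothing becomes visible under the final name until `commit` runs, and that a repeated commit is harmless, which is what lets a checkpoint-driven sink retry after a failure without duplicating data.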
Main recoverable writer implementation providing fault-tolerant streaming writes with exactly-once semantics.
/**
* The recoverable writer implementation for Google storage
* Provides fault-tolerant streaming writes with exactly-once guarantees
*/
public class GSRecoverableWriter implements RecoverableWriter {
/**
* Construct a GS recoverable writer
* @param storage The underlying blob storage instance
* @param options The GS file system options
*/
public GSRecoverableWriter(GSBlobStorage storage, GSFileSystemOptions options);
/**
* Whether this writer requires cleanup of recoverable state before commit
* @return false - no cleanup required before commit for safety
*/
public boolean requiresCleanupOfRecoverableState();
/**
* Whether this writer supports resuming interrupted writes
* @return true - supports resuming from ResumeRecoverable state
*/
public boolean supportsResume();
/**
* Open a new recoverable output stream
* @param path The target path for the final file
* @return GSRecoverableFsDataOutputStream for writing data
* @throws IOException If stream creation fails
*/
public RecoverableFsDataOutputStream open(Path path) throws IOException;
/**
* Recover an existing stream from resumable state
* @param resumable The resumable state from previous stream
* @return GSRecoverableFsDataOutputStream for continuing writes
*/
public RecoverableFsDataOutputStream recover(ResumeRecoverable resumable);
/**
* Clean up recoverable state (no-op for safety)
* @param resumable The resumable state to clean up
* @return true - always succeeds (no actual cleanup performed)
*/
public boolean cleanupRecoverableState(ResumeRecoverable resumable);
/**
* Recover a committer for completing writes
* @param resumable The commit recoverable state
* @return GSRecoverableWriterCommitter for completing the write
*/
public RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable resumable);
/**
* Get serializer for commit recoverable state
* @return GSCommitRecoverableSerializer instance
*/
public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer();
/**
* Get serializer for resume recoverable state
* @return GSResumeRecoverableSerializer instance
*/
public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer();
}

Usage Example:
import java.nio.charset.StandardCharsets;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
// Get recoverable writer from filesystem
FileSystem fs = new Path("gs://my-bucket/").getFileSystem();
RecoverableWriter writer = fs.createRecoverableWriter();
// Open stream for writing
Path outputPath = new Path("gs://my-bucket/output/part-1");
RecoverableFsDataOutputStream stream = writer.open(outputPath);
// Write data
stream.write("Hello World".getBytes(StandardCharsets.UTF_8));
stream.flush();
// Create checkpoint - get resumable state
RecoverableWriter.ResumeRecoverable resumable = stream.persist();
// Later (for example after a failure and restart): recover from the persisted state and continue writing
RecoverableFsDataOutputStream recoveredStream = writer.recover(resumable);
recoveredStream.write(" More data".getBytes(StandardCharsets.UTF_8));
// Close and commit
RecoverableFsDataOutputStream.Committer committer = recoveredStream.closeForCommit();
committer.commit();

Data output stream implementation for recoverable writes providing buffering and state management.
/**
* Main data output stream implementation for the GS recoverable writer
* Package-private - accessed through RecoverableWriter interface
*/
class GSRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
/**
* Write data to the stream
* @param b byte array containing data
* @param off starting offset in the array
* @param len number of bytes to write
* @throws IOException if write operation fails
*/
public void write(byte[] b, int off, int len) throws IOException;
/**
* Flush buffered data to storage
* @throws IOException if flush operation fails
*/
public void flush() throws IOException;
/**
* Force data persistence and sync to storage
* @throws IOException if sync operation fails
*/
public void sync() throws IOException;
/**
* Create resumable state for recovery
* @return GSResumeRecoverable containing current stream state
* @throws IOException if state creation fails
*/
public ResumeRecoverable persist() throws IOException;
/**
* Close stream and return committer for final commit
* @return GSRecoverableWriterCommitter for completing the write
* @throws IOException if close operation fails
*/
public Committer closeForCommit() throws IOException;
/**
* Close the stream without committing
* @throws IOException if close operation fails
*/
public void close() throws IOException;
}

Handles the commit phase of recoverable writer operations with atomic completion.
/**
* Handles the commit phase of recoverable writer operations
* Package-private - obtained through closeForCommit()
*/
class GSRecoverableWriterCommitter implements RecoverableFsDataOutputStream.Committer {
/**
* Commit the write operation atomically
* @throws IOException if commit fails
*/
public void commit() throws IOException;
/**
* Commit after recovery from failure
* @throws IOException if commit fails
*/
public void commitAfterRecovery() throws IOException;
/**
* Get the recoverable state for this committer
* @return GSCommitRecoverable containing commit information
*/
public CommitRecoverable getRecoverable();
}

Usage Example:
// After writing data to recoverable stream
RecoverableFsDataOutputStream.Committer committer = stream.closeForCommit();
// Store commit recoverable for recovery scenarios
RecoverableWriter.CommitRecoverable commitState = committer.getRecoverable();
// Commit the write
try {
    committer.commit();
} catch (IOException e) {
    // Recovery scenario: create new committer and retry
    // (fs is the FileSystem obtained as in the earlier usage example)
    RecoverableWriter writer = fs.createRecoverableWriter();
    RecoverableFsDataOutputStream.Committer recoveredCommitter =
        writer.recoverForCommit(commitState);
    recoveredCommitter.commitAfterRecovery();
}

Represents the state needed to commit a recoverable write operation.
/**
* Represents committable state for a recoverable output stream
* Package-private - managed internally by the writer system
*/
class GSCommitRecoverable implements RecoverableWriter.CommitRecoverable {
/** The target blob identifier for the final committed file */
public final GSBlobIdentifier finalBlobIdentifier;
/** List of temporary object UUIDs that need to be composed into final blob */
public final List<UUID> componentObjectIds;
/**
* Package-private constructor
* @param finalBlobIdentifier The final blob identifier
* @param componentObjectIds List of component object UUIDs
*/
GSCommitRecoverable(GSBlobIdentifier finalBlobIdentifier, List<UUID> componentObjectIds);
/**
* Get component blob identifiers for composition
* @param options File system options for temporary bucket resolution
* @return List of GSBlobIdentifier for temporary objects
*/
List<GSBlobIdentifier> getComponentBlobIds(GSFileSystemOptions options);
}

Represents the state needed to resume an interrupted write operation.
/**
* Represents resumable state for a recoverable output stream
* Extends GSCommitRecoverable with additional resume information
* Package-private - managed internally by the writer system
*/
class GSResumeRecoverable extends GSCommitRecoverable
implements RecoverableWriter.ResumeRecoverable {
/** Current write position in bytes */
public final long position;
/** Whether the stream is closed for writing */
public final boolean closed;
/**
* Package-private constructor
* @param finalBlobIdentifier The final blob identifier
* @param componentObjectIds List of component object UUIDs
* @param position The current write position
* @param closed Whether the stream is closed
*/
GSResumeRecoverable(GSBlobIdentifier finalBlobIdentifier, List<UUID> componentObjectIds, long position, boolean closed);
}

Serializer for GSCommitRecoverable objects enabling state persistence across failures.
/**
* Serializer for GSCommitRecoverable objects
* Package-private - used internally for state persistence
*/
class GSCommitRecoverableSerializer implements SimpleVersionedSerializer<GSCommitRecoverable> {
/** Singleton instance */
public static final GSCommitRecoverableSerializer INSTANCE;
/**
* Get serializer version
* @return Current serializer version
*/
public int getVersion();
/**
* Serialize commit recoverable to byte array
* @param obj The GSCommitRecoverable to serialize
* @return Serialized byte array
* @throws IOException if serialization fails
*/
public byte[] serialize(GSCommitRecoverable obj) throws IOException;
/**
* Deserialize commit recoverable from byte array
* @param version Serializer version used for serialization
* @param serialized The serialized byte array
* @return Deserialized GSCommitRecoverable
* @throws IOException if deserialization fails
*/
public GSCommitRecoverable deserialize(int version, byte[] serialized) throws IOException;
}

Serializer for GSResumeRecoverable objects enabling resume state persistence.
/**
* Serializer for GSResumeRecoverable objects
* Package-private - used internally for state persistence
*/
class GSResumeRecoverableSerializer implements SimpleVersionedSerializer<GSResumeRecoverable> {
/** Singleton instance */
public static final GSResumeRecoverableSerializer INSTANCE;
/**
* Get serializer version
* @return Current serializer version
*/
public int getVersion();
/**
* Serialize resume recoverable to byte array
* @param obj The GSResumeRecoverable to serialize
* @return Serialized byte array
* @throws IOException if serialization fails
*/
public byte[] serialize(GSResumeRecoverable obj) throws IOException;
/**
* Deserialize resume recoverable from byte array
* @param version Serializer version used for serialization
* @param serialized The serialized byte array
* @return Deserialized GSResumeRecoverable
* @throws IOException if deserialization fails
*/
public GSResumeRecoverable deserialize(int version, byte[] serialized) throws IOException;
}

writer.open(path) creates GSRecoverableFsDataOutputStream
write(bytes) buffers data in temporary objects
persist() creates GSResumeRecoverable state for checkpointing
closeForCommit() returns GSRecoverableWriterCommitter
commit() composes temporary objects into the final blob atomically

writer.recover(resumeRecoverable) recreates the stream
write() calls append to existing temporary objects
writer.recoverForCommit(commitRecoverable) handles commit failures

.inprogress/<bucket>/<object>/<uuid> pattern
GSResumeRecoverable state
GSCommitRecoverable state

gs.writer.chunk.size to optimize upload performance
gs.filesink.entropy.enabled for high-throughput scenarios
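The serializers listed earlier exist so that commit and resume state can be stored in a checkpoint and read back after a failure. The following is a minimal sketch of such a versioned round trip, assuming a simple wire format (bucket name, object name, then the component UUIDs). The class `CommitStateCodecSketch`, its `CommitState` type and the byte layout are hypothetical and are not the plugin's actual format; only the `serialize` / `deserialize(int version, byte[] serialized)` shape follows the listings above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

/** Hypothetical versioned codec for commit state; the byte layout is an assumption of this sketch. */
final class CommitStateCodecSketch {

    static final int VERSION = 1;

    /** Simplified stand-in for commit state: final blob location plus component object ids. */
    static final class CommitState {
        final String bucket;
        final String object;
        final List<UUID> componentIds;

        CommitState(String bucket, String object, List<UUID> componentIds) {
            this.bucket = bucket;
            this.object = object;
            this.componentIds = Collections.unmodifiableList(new ArrayList<>(componentIds));
        }
    }

    /** Writes bucket, object, the component count, then each UUID as two longs. */
    static byte[] serialize(CommitState state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(state.bucket);
            out.writeUTF(state.object);
            out.writeInt(state.componentIds.size());
            for (UUID id : state.componentIds) {
                out.writeLong(id.getMostSignificantBits());
                out.writeLong(id.getLeastSignificantBits());
            }
        }
        return bytes.toByteArray();
    }

    /** Reads the layout written by serialize; rejects versions this sketch does not know. */
    static CommitState deserialize(int version, byte[] serialized) throws IOException {
        if (version != VERSION) {
            throw new IOException("unsupported serializer version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            String bucket = in.readUTF();
            String object = in.readUTF();
            int count = in.readInt();
            List<UUID> ids = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                long mostSignificant = in.readLong();
                long leastSignificant = in.readLong();
                ids.add(new UUID(mostSignificant, leastSignificant));
            }
            return new CommitState(bucket, object, ids);
        }
    }

    public static void main(String[] args) throws IOException {
        CommitState state = new CommitState(
                "my-bucket", "output/part-1", Arrays.asList(UUID.randomUUID(), UUID.randomUUID()));
        CommitState restored = deserialize(VERSION, serialize(state));
        System.out.println(restored.componentIds.equals(state.componentIds)); // prints "true"
    }
}
```

Passing the version alongside the bytes, rather than embedding it in the payload, lets the reader choose a decoding path before touching the data, so an older checkpoint can still be read after the format changes.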