
tessl/maven-org-apache-flink--flink-gs-fs-hadoop

Google Cloud Storage FileSystem plugin for Apache Flink providing gs:// URI support with recoverable writers


Recoverable Writer System

The Flink GS FileSystem plugin provides a recoverable writer that lets Flink streaming sinks write to Google Cloud Storage with exactly-once guarantees. Data is first written to temporary objects. At each checkpoint the writer persists its state, so an interrupted write can be resumed after a failure. On commit, the temporary objects are composed into the final blob. Together these steps form a write-then-commit protocol driven by Flink checkpoints.

Capabilities

GSRecoverableWriter

Main recoverable writer implementation providing fault-tolerant streaming writes with exactly-once semantics.

/**
 * The recoverable writer implementation for Google storage
 * Provides fault-tolerant streaming writes with exactly-once guarantees
 */
public class GSRecoverableWriter implements RecoverableWriter {
    
    /**
     * Construct a GS recoverable writer
     * @param storage The underlying blob storage instance
     * @param options The GS file system options
     */
    public GSRecoverableWriter(GSBlobStorage storage, GSFileSystemOptions options);
    
    /**
     * Whether callers must explicitly clean up resources held by a ResumeRecoverable
     * @return false - recoverable state is not cleaned up before commit; temporary objects are removed when the commit succeeds
     */
    public boolean requiresCleanupOfRecoverableState();
    
    /**
     * Whether this writer supports resuming interrupted writes
     * @return true - supports resuming from ResumeRecoverable state
     */
    public boolean supportsResume();
    
    /**
     * Open a new recoverable output stream
     * @param path The target path for the final file
     * @return GSRecoverableFsDataOutputStream for writing data
     * @throws IOException If stream creation fails
     */
    public RecoverableFsDataOutputStream open(Path path) throws IOException;
    
    /**
     * Recover an existing stream from resumable state
     * @param resumable The resumable state from previous stream
     * @return GSRecoverableFsDataOutputStream for continuing writes
     */
    public RecoverableFsDataOutputStream recover(ResumeRecoverable resumable);
    
    /**
     * Clean up recoverable state. This is a no-op: deleting temporary objects before commit is not considered safe, so nothing is removed here
     * @param resumable The resumable state to clean up
     * @return true - always succeeds (no actual cleanup performed)
     */
    public boolean cleanupRecoverableState(ResumeRecoverable resumable);
    
    /**
     * Recover a committer for completing writes
     * @param resumable The commit recoverable state
     * @return GSRecoverableWriterCommitter for completing the write
     */
    public RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable resumable);
    
    /**
     * Get serializer for commit recoverable state
     * @return GSCommitRecoverableSerializer instance
     */
    public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer();
    
    /**
     * Get serializer for resume recoverable state
     * @return GSResumeRecoverableSerializer instance
     */
    public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer();
}

Usage Example:

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;

import java.nio.charset.StandardCharsets;

// The statements below belong in a method that declares "throws IOException"

// Get a recoverable writer from the GS filesystem
Path outputPath = new Path("gs://my-bucket/output/part-1");
FileSystem fs = outputPath.getFileSystem();
RecoverableWriter writer = fs.createRecoverableWriter();

// Open a stream whose data will eventually become gs://my-bucket/output/part-1
RecoverableFsDataOutputStream stream = writer.open(outputPath);

// Write data
stream.write("Hello World".getBytes(StandardCharsets.UTF_8));
stream.flush();

// At a checkpoint: persist what has been written so far and keep the resumable state
// (a Flink sink stores it using writer.getResumeRecoverableSerializer())
RecoverableWriter.ResumeRecoverable resumable = stream.persist();

// After a failure: recover from the resumable state and continue writing
RecoverableFsDataOutputStream recoveredStream = writer.recover(resumable);
recoveredStream.write(" More data".getBytes(StandardCharsets.UTF_8));

// Close and commit: the final object becomes visible only now
RecoverableFsDataOutputStream.Committer committer = recoveredStream.closeForCommit();
committer.commit();

GSRecoverableFsDataOutputStream

Data output stream implementation for recoverable writes providing buffering and state management.

/**
 * Main data output stream implementation for the GS recoverable writer
 * Package-private - accessed through RecoverableWriter interface
 */
class GSRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
    
    /**
     * Write data to the stream
     * @param b byte array containing data
     * @param off starting offset in the array
     * @param len number of bytes to write
     * @throws IOException if write operation fails
     */
    public void write(byte[] b, int off, int len) throws IOException;
    
    /**
     * Flush buffered data to storage
     * @throws IOException if flush operation fails
     */
    public void flush() throws IOException;
    
    /**
     * Force data persistence and sync to storage
     * @throws IOException if sync operation fails
     */
    public void sync() throws IOException;
    
    /**
     * Create resumable state for recovery
     * @return GSResumeRecoverable containing current stream state
     * @throws IOException if state creation fails
     */
    public ResumeRecoverable persist() throws IOException;
    
    /**
     * Close stream and return committer for final commit
     * @return GSRecoverableWriterCommitter for completing the write
     * @throws IOException if close operation fails
     */
    public Committer closeForCommit() throws IOException;
    
    /**
     * Close the stream without committing
     * @throws IOException if close operation fails
     */
    public void close() throws IOException;
}

GSRecoverableWriterCommitter

Handles the commit phase of recoverable writer operations with atomic completion.

/**
 * Handles the commit phase of recoverable writer operations
 * Package-private - obtained through closeForCommit()
 */
class GSRecoverableWriterCommitter implements RecoverableFsDataOutputStream.Committer {
    
    /**
     * Commit the write operation atomically
     * @throws IOException if commit fails
     */
    public void commit() throws IOException;
    
    /**
     * Commit after recovery from failure; per the Committer contract, this tolerates a file that was already committed
     * @throws IOException if commit fails
     */
    public void commitAfterRecovery() throws IOException;
    
    /**
     * Get the recoverable state for this committer
     * @return GSCommitRecoverable containing commit information
     */
    public CommitRecoverable getRecoverable();
}

Usage Example:

// After writing data to the recoverable stream (stream and writer as in the previous example)
RecoverableFsDataOutputStream.Committer committer = stream.closeForCommit();

// Keep the commit recoverable for recovery scenarios
// (a Flink sink checkpoints it using writer.getCommitRecoverableSerializer())
RecoverableWriter.CommitRecoverable commitState = committer.getRecoverable();

// Commit the write
try {
    committer.commit();
} catch (IOException e) {
    // Recovery scenario: obtain a new committer from the saved state and retry;
    // commitAfterRecovery() tolerates a file that was already committed
    RecoverableFsDataOutputStream.Committer recoveredCommitter =
        writer.recoverForCommit(commitState);
    recoveredCommitter.commitAfterRecovery();
}

State Management

GSCommitRecoverable

Represents the state needed to commit a recoverable write operation.

/**
 * Represents committable state for a recoverable output stream
 * Package-private - managed internally by the writer system
 */
class GSCommitRecoverable implements RecoverableWriter.CommitRecoverable {
    /** The target blob identifier for the final committed file */
    public final GSBlobIdentifier finalBlobIdentifier;
    
    /** List of temporary object UUIDs that need to be composed into final blob */
    public final List<UUID> componentObjectIds;
    
    /**
     * Package-private constructor
     * @param finalBlobIdentifier The final blob identifier
     * @param componentObjectIds List of component object UUIDs
     */
    GSCommitRecoverable(GSBlobIdentifier finalBlobIdentifier, List<UUID> componentObjectIds);
    
    /**
     * Get component blob identifiers for composition
     * @param options File system options for temporary bucket resolution
     * @return List of GSBlobIdentifier for temporary objects
     */
    List<GSBlobIdentifier> getComponentBlobIds(GSFileSystemOptions options);
}
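The mapping from component UUIDs to temporary blob identifiers can be sketched as below. This is a simplified model, not the plugin's code. It assumes the `.inprogress/<bucket>/<object>/<uuid>` naming described under Temporary Object Management, and that temporary objects go to the configured temporary bucket (`gs.writer.temporary.bucket.name`) when one is set and to the final bucket otherwise. `BlobId`, `temporaryBlobId`, and `componentBlobIds` are hypothetical stand-ins for `GSBlobIdentifier` and `getComponentBlobIds`, and entropy injection is ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

public class TemporaryBlobNaming {

    /** Hypothetical stand-in for GSBlobIdentifier: a bucket name plus an object name. */
    public record BlobId(String bucketName, String objectName) {}

    /** Prefix under which in-progress temporary objects are written. */
    public static final String TEMPORARY_OBJECT_PREFIX = ".inprogress";

    /**
     * Temporary blob for one component: stored in the temporary bucket if one is configured,
     * otherwise in the final bucket, under .inprogress/<final bucket>/<final object>/<uuid>.
     */
    public static BlobId temporaryBlobId(BlobId finalBlob, Optional<String> temporaryBucket, UUID componentId) {
        String bucket = temporaryBucket.orElse(finalBlob.bucketName());
        String objectName = String.join("/",
                TEMPORARY_OBJECT_PREFIX,
                finalBlob.bucketName(),
                finalBlob.objectName(),
                componentId.toString());
        return new BlobId(bucket, objectName);
    }

    /** Resolves every component UUID, preserving order, because compose order is write order. */
    public static List<BlobId> componentBlobIds(BlobId finalBlob, Optional<String> temporaryBucket, List<UUID> componentIds) {
        List<BlobId> result = new ArrayList<>(componentIds.size());
        for (UUID id : componentIds) {
            result.add(temporaryBlobId(finalBlob, temporaryBucket, id));
        }
        return result;
    }
}
```

Including the final bucket and object name in the temporary path keeps temporary objects for different target files apart, even when they share one temporary bucket.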

GSResumeRecoverable

Represents the state needed to resume an interrupted write operation.

/**
 * Represents resumable state for a recoverable output stream
 * Extends GSCommitRecoverable with additional resume information
 * Package-private - managed internally by the writer system
 */
class GSResumeRecoverable extends GSCommitRecoverable 
    implements RecoverableWriter.ResumeRecoverable {
    
    /** Current write position in bytes */
    public final long position;
    
    /** Whether the stream is closed for writing */
    public final boolean closed;
    
    /**
     * Package-private constructor
     * @param finalBlobIdentifier The final blob identifier
     * @param componentObjectIds List of component object UUIDs
     * @param position The current write position
     * @param closed Whether the stream is closed
     */
    GSResumeRecoverable(GSBlobIdentifier finalBlobIdentifier, List<UUID> componentObjectIds, long position, boolean closed);
}

Serializers

GSCommitRecoverableSerializer

Serializer for GSCommitRecoverable objects enabling state persistence across failures.

/**
 * Serializer for GSCommitRecoverable objects
 * Package-private - used internally for state persistence
 */
class GSCommitRecoverableSerializer implements SimpleVersionedSerializer<GSCommitRecoverable> {
    /** Singleton instance */
    public static final GSCommitRecoverableSerializer INSTANCE;
    
    /**
     * Get serializer version
     * @return Current serializer version
     */
    public int getVersion();
    
    /**
     * Serialize commit recoverable to byte array
     * @param obj The GSCommitRecoverable to serialize
     * @return Serialized byte array
     * @throws IOException if serialization fails
     */
    public byte[] serialize(GSCommitRecoverable obj) throws IOException;
    
    /**
     * Deserialize commit recoverable from byte array
     * @param version Serializer version used for serialization
     * @param serialized The serialized byte array
     * @return Deserialized GSCommitRecoverable
     * @throws IOException if deserialization fails
     */
    public GSCommitRecoverable deserialize(int version, byte[] serialized) throws IOException;
}

GSResumeRecoverableSerializer

Serializer for GSResumeRecoverable objects enabling resume state persistence.

/**
 * Serializer for GSResumeRecoverable objects
 * Package-private - used internally for state persistence
 */
class GSResumeRecoverableSerializer implements SimpleVersionedSerializer<GSResumeRecoverable> {
    /** Singleton instance */
    public static final GSResumeRecoverableSerializer INSTANCE;
    
    /**
     * Get serializer version
     * @return Current serializer version
     */
    public int getVersion();
    
    /**
     * Serialize resume recoverable to byte array
     * @param obj The GSResumeRecoverable to serialize
     * @return Serialized byte array
     * @throws IOException if serialization fails
     */
    public byte[] serialize(GSResumeRecoverable obj) throws IOException;
    
    /**
     * Deserialize resume recoverable from byte array
     * @param version Serializer version used for serialization
     * @param serialized The serialized byte array
     * @return Deserialized GSResumeRecoverable
     * @throws IOException if deserialization fails
     */
    public GSResumeRecoverable deserialize(int version, byte[] serialized) throws IOException;
}
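To make the versioned-serializer contract concrete, here is a minimal round-trip sketch for commit state (final bucket, final object name, ordered component UUIDs). The wire layout is an assumption chosen for illustration and is not claimed to match the byte format of `GSCommitRecoverableSerializer`. `CommitState` and `CommitStateSerializer` are hypothetical. To stay free of a Flink dependency, the class does not implement `SimpleVersionedSerializer`, but it mirrors its `getVersion`/`serialize`/`deserialize` shape. Resume state would add the `position` and `closed` fields in the same way.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class CommitStateSerializer {

    /** Hypothetical commit state: the final blob plus the ordered component UUIDs. */
    public record CommitState(String bucketName, String objectName, List<UUID> componentObjectIds) {}

    /** Stored alongside the bytes so that state written by older versions can still be read. */
    public static final int VERSION = 1;

    public int getVersion() {
        return VERSION;
    }

    public byte[] serialize(CommitState state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(state.bucketName());
            out.writeUTF(state.objectName());
            out.writeInt(state.componentObjectIds().size());
            for (UUID id : state.componentObjectIds()) {
                // A UUID is two longs; writing them avoids string parsing on restore
                out.writeLong(id.getMostSignificantBits());
                out.writeLong(id.getLeastSignificantBits());
            }
        }
        return bytes.toByteArray();
    }

    public CommitState deserialize(int version, byte[] serialized) throws IOException {
        if (version != VERSION) {
            throw new IOException("Unsupported serializer version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            String bucket = in.readUTF();
            String object = in.readUTF();
            int count = in.readInt();
            List<UUID> ids = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                // Arguments are evaluated left to right: most significant bits first
                ids.add(new UUID(in.readLong(), in.readLong()));
            }
            return new CommitState(bucket, object, ids);
        }
    }
}
```

Checking the version before reading any bytes is what lets a future format change add a new branch in `deserialize` instead of breaking restores from old checkpoints.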

Write Process Flow

Normal Write Flow

  1. Open Stream: writer.open(path) creates a GSRecoverableFsDataOutputStream for the target path
  2. Write Data: write() calls stream data into the current temporary object
  3. Periodic Persistence: persist() finalizes the current temporary object and returns GSResumeRecoverable state for checkpointing; later writes go to a new temporary object
  4. Close for Commit: closeForCommit() returns a GSRecoverableWriterCommitter
  5. Commit: commit() composes the temporary objects, in order, into the final blob atomically

Recovery Flow

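  1. Resume from Checkpoint: writer.recover(resumeRecoverable) recreates the stream at the persisted position
  2. Continue Writing: Additional write() calls go to new temporary objects, which are added to the list of components (existing GCS objects cannot be appended to)
  3. Close and Commit: Same as normal flow
  4. Commit Recovery: If a commit is interrupted, writer.recoverForCommit(commitRecoverable) returns a committer whose commitAfterRecovery() completes it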
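Both flows can be modelled in memory. The sketch below is a simplified, hypothetical simulation, not the plugin's implementation. A `Map` stands in for the bucket. Each `persist()` finalizes the current temporary object, and later writes start a new one. `recover` continues from the saved position with the persisted components. `commit` concatenates the components in order, which is the effect of a GCS compose, and then deletes them. All class and method names are assumptions, and temporary names are simplified to `.inprogress/<final name>/<uuid>`.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public class RecoverableWriteModel {

    /** Hypothetical resume state: ordered component ids plus the logical write position. */
    public record ResumeState(List<UUID> componentIds, long position) {}

    /** In-memory stand-in for a bucket holding temporary and final objects. */
    public final Map<String, byte[]> bucket = new HashMap<>();

    private static String temporaryName(String finalName, UUID id) {
        return ".inprogress/" + finalName + "/" + id;
    }

    public class Stream {
        private final String finalName;
        private final List<UUID> componentIds;
        private long position;
        private ByteArrayOutputStream current; // bytes of the temporary object not yet finalized

        Stream(String finalName, List<UUID> componentIds, long position) {
            this.finalName = finalName;
            this.componentIds = new ArrayList<>(componentIds);
            this.position = position;
        }

        public long getPos() {
            return position;
        }

        public void write(byte[] data) {
            if (current == null) {
                current = new ByteArrayOutputStream(); // start a new temporary object
            }
            current.write(data, 0, data.length);
            position += data.length;
        }

        /** Finalizes the current temporary object, if any, and records its id as a component. */
        private void finishCurrentObject() {
            if (current != null) {
                UUID id = UUID.randomUUID();
                bucket.put(temporaryName(finalName, id), current.toByteArray());
                componentIds.add(id);
                current = null;
            }
        }

        public ResumeState persist() {
            finishCurrentObject();
            return new ResumeState(List.copyOf(componentIds), position);
        }

        /** Finalizes pending data and returns an action that performs the commit. */
        public Runnable closeForCommit() {
            finishCurrentObject();
            List<UUID> ids = List.copyOf(componentIds);
            return () -> commit(finalName, ids);
        }
    }

    public Stream open(String finalName) {
        return new Stream(finalName, List.of(), 0);
    }

    public Stream recover(String finalName, ResumeState state) {
        // Persisted components are reused as-is; new writes start a fresh temporary object
        return new Stream(finalName, state.componentIds(), state.position());
    }

    /** Concatenates the components in order into the final object (as compose does), then deletes them. */
    private void commit(String finalName, List<UUID> ids) {
        ByteArrayOutputStream composed = new ByteArrayOutputStream();
        for (UUID id : ids) {
            byte[] part = bucket.get(temporaryName(finalName, id));
            composed.write(part, 0, part.length);
        }
        bucket.put(finalName, composed.toByteArray());
        for (UUID id : ids) {
            bucket.remove(temporaryName(finalName, id));
        }
    }
}
```

In this model, data written after the last `persist()` is never finalized, so it is discarded on recovery. In a Flink job, those records are replayed from the last checkpoint, which is what keeps the output exactly-once.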

Temporary Object Management

  • Naming: Temporary objects use .inprogress/<bucket>/<object>/<uuid> pattern
  • Composition: The final commit uses the GCS compose operation; each compose request accepts at most 32 source objects, so larger sets of components are composed in several steps
  • Cleanup: Temporary objects are cleaned up after successful commit
  • Entropy Injection: Optional entropy prefix reduces hotspotting in high-throughput scenarios
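Because a single GCS compose request accepts at most 32 source objects, committing more components takes several compose calls. One possible strategy is sketched below as an assumption, not the plugin's exact algorithm. It composes the first 32 components into an intermediate object, then repeatedly composes that intermediate with up to 31 more components, preserving order. The sketch only plans the calls. `ComposePlan` and the `.compose-N` intermediate naming are hypothetical, and a real committer would also delete the intermediates afterwards.

```java
import java.util.ArrayList;
import java.util.List;

public class ComposePlan {

    /** Maximum number of source objects GCS accepts in one compose request. */
    public static final int MAX_COMPOSE_SOURCES = 32;

    /** One planned compose request: ordered sources and the object they produce. */
    public record ComposeCall(List<String> sources, String target) {}

    /**
     * Plans compose calls that merge the components, in order, into finalName.
     * Each call has at most MAX_COMPOSE_SOURCES sources; when more than one call is needed,
     * the output of each call becomes the first source of the next.
     */
    public static List<ComposeCall> plan(List<String> components, String finalName) {
        if (components.isEmpty()) {
            throw new IllegalArgumentException("nothing to compose");
        }
        List<ComposeCall> calls = new ArrayList<>();
        List<String> pending = new ArrayList<>(components); // caller's list is not modified
        String carried = null; // intermediate produced by the previous call, if any
        int step = 0;
        while (!pending.isEmpty()) {
            List<String> sources = new ArrayList<>();
            if (carried != null) {
                sources.add(carried);
            }
            int take = Math.min(MAX_COMPOSE_SOURCES - sources.size(), pending.size());
            List<String> batch = pending.subList(0, take);
            sources.addAll(batch);
            batch.clear(); // removes the taken components from pending
            String target = pending.isEmpty() ? finalName : finalName + ".compose-" + step++;
            calls.add(new ComposeCall(List.copyOf(sources), target));
            carried = target;
        }
        return calls;
    }
}
```

With this strategy, 33 components need two calls (32, then the intermediate plus one), and 64 components need three.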

Error Handling and Recovery

Failure Scenarios

  • Writer Failure: Resume from GSResumeRecoverable state
  • Commit Failure: Retry commit using GSCommitRecoverable state
  • Network Issues: Configurable retry policies handle transient failures
  • Storage Errors: Proper exception propagation with context information

Safety Guarantees

  • Exactly-Once: Each successful commit produces exactly one final file
  • No Data Loss: Data covered by the most recent persist() remains recoverable until the commit completes
  • Atomic Commits: Final file appears atomically or not at all
  • Idempotent Recovery: Multiple recovery attempts produce same result

Performance Considerations

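  • Chunk Size: Configure gs.writer.chunk.size to tune upload performance; if set, it must be a multiple of 256 KiB
  • Temporary Bucket: Set gs.writer.temporary.bucket.name to write temporary objects to a separate bucket, for example one with a lifecycle (TTL) rule that removes orphaned temporary objects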
  • Entropy Injection: Enable gs.filesink.entropy.enabled for high-throughput scenarios
  • Composition Limits: Automatic handling of GCS 32-object composition limit
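The options above are ordinary Flink configuration keys. The helper below is a hypothetical sketch that collects them into a map and checks the chunk size before a job starts. The 256 KiB multiple reflects the GCS chunked-upload rule and should be checked against the plugin documentation for your Flink version. `GsWriterConfig` and `writerOptions` are illustrative names, and the values are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class GsWriterConfig {

    /** Chunked GCS uploads require chunk sizes that are multiples of 256 KiB. */
    static final long CHUNK_SIZE_MULTIPLE = 256L * 1024;

    /**
     * Hypothetical helper: builds writer-related options as key/value strings, rejecting a
     * chunk size that is not a positive multiple of 256 KiB. A null temporaryBucket means
     * temporary objects stay in the final bucket, so the key is omitted.
     */
    public static Map<String, String> writerOptions(long chunkSizeBytes, String temporaryBucket, boolean entropy) {
        if (chunkSizeBytes <= 0 || chunkSizeBytes % CHUNK_SIZE_MULTIPLE != 0) {
            throw new IllegalArgumentException(
                    "gs.writer.chunk.size must be a positive multiple of 256 KiB, got " + chunkSizeBytes);
        }
        Map<String, String> options = new LinkedHashMap<>();
        options.put("gs.writer.chunk.size", Long.toString(chunkSizeBytes)); // value in bytes
        if (temporaryBucket != null) {
            options.put("gs.writer.temporary.bucket.name", temporaryBucket);
        }
        options.put("gs.filesink.entropy.enabled", Boolean.toString(entropy));
        return options;
    }
}
```

In a deployment these entries would normally go in the Flink configuration file. The map form only keeps the sketch self-contained.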

