CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-hadoop-fs

Hadoop FileSystem integration for Apache Flink enabling seamless access to HDFS and other Hadoop-compatible file systems

Pending
Overview
Eval results
Files

recoverable-writers.mddocs/

Fault-Tolerant Writing

The Hadoop FileSystem package provides recoverable writers that enable exactly-once processing guarantees through persistent state management and checkpoint/recovery mechanisms. These writers are essential for fault-tolerant streaming applications that require durability and consistency guarantees.

Capabilities

HadoopRecoverableWriter

Main recoverable writer implementation that provides fault-tolerant writing capabilities for Hadoop file systems.

/**
 * An implementation of the RecoverableWriter for Hadoop's file system abstraction.
 * Supports fault-tolerant writing with exactly-once processing guarantees.
 */
@Internal
public class HadoopRecoverableWriter implements RecoverableWriter {
    /**
     * Creates a recoverable writer using the specified Hadoop FileSystem.
     * @param fs Hadoop file system to write to
     * @throws IOException if writer creation fails due to unsupported file system
     */
    public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) throws IOException;
    
    /**
     * Creates a recoverable writer with no-local-write option.
     * @param fs Hadoop file system to write to
     * @param noLocalWrite if true, disables local write optimizations
     * @throws IOException if writer creation fails
     */
    public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs, boolean noLocalWrite) throws IOException;
}

Opening and Creating Streams

Methods for creating new recoverable output streams.

/**
 * Opens a new recoverable output stream for the given file path.
 * @param filePath target file path for writing
 * @return RecoverableFsDataOutputStream for writing data
 * @throws IOException if stream creation fails
 */
public RecoverableFsDataOutputStream open(Path filePath) throws IOException;

/**
 * Indicates whether this writer supports resuming from a recoverable state.
 * @return true (Hadoop recoverable writer supports resume)
 */
public boolean supportsResume();

Usage Examples:

import org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;

// Create recoverable writer from Hadoop FileSystem
org.apache.hadoop.fs.FileSystem hadoopFs = // ... obtain Hadoop FS
HadoopRecoverableWriter writer = new HadoopRecoverableWriter(hadoopFs);

// Open recoverable stream for writing
Path outputPath = new Path("hdfs://namenode:9000/output/part-1.txt");
RecoverableFsDataOutputStream stream = writer.open(outputPath);

// Write data
stream.write("First batch of data\n".getBytes());
stream.write("Second batch of data\n".getBytes());

// Persist current state for recovery
HadoopFsRecoverable recoverable = (HadoopFsRecoverable) stream.persist();

// Continue writing
stream.write("Third batch of data\n".getBytes());

// Close and commit
stream.closeForCommit().commit();

Recovery Operations

Methods for recovering from persistent state after failures.

/**
 * Recovers a stream from a resumable recoverable state.
 * @param recoverable the resumable state to recover from
 * @return RecoverableFsDataOutputStream positioned at the recovered state
 * @throws IOException if recovery fails
 */
public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException;

/**
 * Recovers a committer from a commit recoverable state.
 * @param recoverable the commit state to recover from
 * @return Committer that can complete the commit operation
 * @throws IOException if recovery fails
 */
public Committer recoverForCommit(CommitRecoverable recoverable) throws IOException;

/**
 * Indicates whether this writer requires cleanup of recoverable state.
 * @return false (Hadoop writer doesn't require cleanup)
 */
public boolean requiresCleanupOfRecoverableState();

/**
 * Cleans up recoverable state (no-op for Hadoop writer).
 * @param resumable the resumable state to clean up
 * @return false (no cleanup performed)
 * @throws IOException if cleanup fails
 */
public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException;

Usage Examples:

// Recover from a previous session
HadoopFsRecoverable savedState = // ... load from checkpoint
RecoverableFsDataOutputStream recoveredStream = writer.recover(savedState);

// Continue writing from where we left off
recoveredStream.write("Continuing after recovery\n".getBytes());

// Or recover for commit only
Committer committer = writer.recoverForCommit(commitRecoverable);
committer.commit(); // Complete the commit operation

Serialization Support

Methods for serializing and deserializing recoverable state.

/**
 * Gets the serializer for commit recoverable state.
 * @return SimpleVersionedSerializer for CommitRecoverable objects
 */
public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer();

/**
 * Gets the serializer for resume recoverable state.
 * @return SimpleVersionedSerializer for ResumeRecoverable objects
 */
public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer();

HadoopFsRecoverable

State object that contains all information needed to recover or commit a write operation.

/**
 * Implementation of resume and commit descriptor objects for Hadoop's file system abstraction.
 * Contains the state needed to recover a write operation.
 */
@Internal
public class HadoopFsRecoverable implements CommitRecoverable, ResumeRecoverable {
    /**
     * Creates a recoverable state descriptor.
     * @param targetFile final target file path
     * @param tempFile temporary file being written to
     * @param offset current write position
     */
    public HadoopFsRecoverable(Path targetFile, Path tempFile, long offset);
    
    /**
     * Gets the target file path.
     * @return target file path where data will be committed
     */
    public Path targetFile();
    
    /**
     * Gets the temporary file path.
     * @return temporary file path where data is being written
     */
    public Path tempFile();
    
    /**
     * Gets the current write offset.
     * @return byte offset in the file
     */
    public long offset();
    
    /**
     * String representation of the recoverable state.
     * @return string describing the recoverable state
     */
    public String toString();
}

Usage Examples:

// Access recoverable state information
HadoopFsRecoverable recoverable = (HadoopFsRecoverable) stream.persist();

System.out.println("Target file: " + recoverable.targetFile());
System.out.println("Temp file: " + recoverable.tempFile());
System.out.println("Current offset: " + recoverable.offset());

// Serialize for checkpointing
SimpleVersionedSerializer<ResumeRecoverable> serializer = writer.getResumeRecoverableSerializer();
byte[] serializedState = serializer.serialize(recoverable);

// Later: deserialize and recover
HadoopFsRecoverable restored = (HadoopFsRecoverable) serializer.deserialize(
    serializer.getVersion(), serializedState);
RecoverableFsDataOutputStream recoveredStream = writer.recover(restored);

Recoverable Output Stream Operations

The recoverable output stream provides standard writing operations plus persistence capabilities.

/**
 * Base class for HDFS and ABFS recoverable streams.
 */
@Internal
public abstract class BaseHadoopFsRecoverableFsDataOutputStream 
    extends CommitterFromPersistRecoverableFsDataOutputStream<HadoopFsRecoverable> {
    
    /**
     * Gets the current position in the stream.
     * @return current byte position
     * @throws IOException if operation fails
     */
    public long getPos() throws IOException;
    
    /**
     * Writes a single byte.
     * @param b byte to write
     * @throws IOException if write fails
     */
    public void write(int b) throws IOException;
    
    /**
     * Writes data from byte array.
     * @param b byte array containing data
     * @param off starting offset in array
     * @param len number of bytes to write
     * @throws IOException if write fails
     */
    public void write(byte[] b, int off, int len) throws IOException;
    
    /**
     * Flushes buffered data.
     * @throws IOException if flush fails
     */
    public void flush() throws IOException;
    
    /**
     * Synchronizes data to storage.
     * @throws IOException if sync fails
     */
    public void sync() throws IOException;
    
    /**
     * Persists the current state for recovery.
     * @return HadoopFsRecoverable representing current state
     * @throws IOException if persist operation fails
     */
    public HadoopFsRecoverable persist() throws IOException;
    
    /**
     * Closes the stream and returns a committer.
     * @return Committer that can complete the write operation
     * @throws IOException if close fails
     */
    public Committer closeForCommit() throws IOException;
    
    /**
     * Closes the stream without committing.
     * @throws IOException if close fails
     */
    public void close() throws IOException;
}

Complete Fault-Tolerant Writing Example

import org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter;
import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.io.SimpleVersionedSerializer;

public class FaultTolerantWriter {
    private HadoopRecoverableWriter writer;
    private SimpleVersionedSerializer<ResumeRecoverable> serializer;
    
    public void initializeWriter(org.apache.hadoop.fs.FileSystem hadoopFs) throws IOException {
        writer = new HadoopRecoverableWriter(hadoopFs);
        serializer = writer.getResumeRecoverableSerializer();
    }
    
    public byte[] writeWithCheckpoint(Path outputPath, List<String> data) throws IOException {
        RecoverableFsDataOutputStream stream = writer.open(outputPath);
        HadoopFsRecoverable checkpoint = null;
        
        try {
            // Write data in batches with periodic checkpoints
            for (int i = 0; i < data.size(); i++) {
                stream.write(data.get(i).getBytes());
                stream.write("\n".getBytes());
                
                // Checkpoint every 100 records
                if (i % 100 == 0) {
                    checkpoint = (HadoopFsRecoverable) stream.persist();
                    System.out.println("Checkpoint at record " + i + 
                                     ", offset: " + checkpoint.offset());
                }
            }
            
            // Commit the file
            stream.closeForCommit().commit();
            
            return checkpoint != null ? serializer.serialize(checkpoint) : null;
            
        } catch (IOException e) {
            // On failure, return checkpoint data for recovery
            if (checkpoint != null) {
                return serializer.serialize(checkpoint);
            }
            throw e;
        }
    }
    
    public void recoverAndContinue(byte[] checkpointData, List<String> remainingData) throws IOException {
        // Deserialize checkpoint
        HadoopFsRecoverable recoverable = (HadoopFsRecoverable) serializer.deserialize(
            serializer.getVersion(), checkpointData);
        
        // Recover stream
        RecoverableFsDataOutputStream stream = writer.recover(recoverable);
        
        System.out.println("Recovered at offset: " + recoverable.offset());
        System.out.println("Temp file: " + recoverable.tempFile());
        
        // Continue writing
        for (String line : remainingData) {
            stream.write(line.getBytes());
            stream.write("\n".getBytes());
        }
        
        // Commit
        stream.closeForCommit().commit();
        
        System.out.println("Recovery completed, data committed to: " + recoverable.targetFile());
    }
}

HadoopRecoverableSerializer

Serializer for persisting and restoring recoverable state.

/**
 * Simple serializer for HadoopFsRecoverable objects.
 */
@Internal
public class HadoopRecoverableSerializer implements SimpleVersionedSerializer<HadoopFsRecoverable> {
    /**
     * Singleton instance of the serializer.
     */
    public static final HadoopRecoverableSerializer INSTANCE;
    
    /**
     * Gets the version of the serialization format.
     * @return version number (1)
     */
    public int getVersion();
    
    /**
     * Serializes a HadoopFsRecoverable object.
     * @param obj the recoverable object to serialize
     * @return byte array containing serialized data
     * @throws IOException if serialization fails
     */
    public byte[] serialize(HadoopFsRecoverable obj) throws IOException;
    
    /**
     * Deserializes a HadoopFsRecoverable object.
     * @param version version of the serialized data
     * @param serialized byte array containing serialized data
     * @return deserialized HadoopFsRecoverable object
     * @throws IOException if deserialization fails
     */
    public HadoopFsRecoverable deserialize(int version, byte[] serialized) throws IOException;
}

Supported File Systems

Recoverable writers support a subset of Hadoop file systems that provide the necessary features for fault-tolerance:

  • HDFS: Full support with atomic rename and append operations
  • S3: Limited support depending on S3 implementation (S3A with certain configurations)
  • Azure: Support for Azure Data Lake Storage (ABFS)
  • Local FS: Full support for testing and development

Error Handling

try {
    RecoverableFsDataOutputStream stream = writer.open(outputPath);
    // ... write operations
} catch (UnsupportedOperationException e) {
    System.err.println("File system doesn't support recoverable writes: " + e.getMessage());
} catch (IOException e) {
    System.err.println("I/O error during recoverable write: " + e.getMessage());
}

Types

// Core recoverable writer interfaces
public interface RecoverableWriter {
    RecoverableFsDataOutputStream open(Path filePath) throws IOException;
    RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException;
    Committer recoverForCommit(CommitRecoverable recoverable) throws IOException;
    boolean supportsResume();
}

// Recoverable state interfaces
public interface ResumeRecoverable extends Serializable {}
public interface CommitRecoverable extends Serializable {}

// Committer interface
public interface Committer {
    void commit() throws IOException;
}

// Serialization interface
public interface SimpleVersionedSerializer<T> {
    int getVersion();
    byte[] serialize(T obj) throws IOException;
    T deserialize(int version, byte[] serialized) throws IOException;
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-hadoop-fs

docs

filesystem-factory.md

filesystem-operations.md

hadoop-utilities.md

index.md

io-streams.md

recoverable-writers.md

tile.json