Hadoop FileSystem integration for Apache Flink enabling seamless access to HDFS and other Hadoop-compatible file systems
—
The Hadoop FileSystem package provides recoverable writers that enable exactly-once processing guarantees through persistent state management and checkpoint/recovery mechanisms. These writers are essential for fault-tolerant streaming applications that require durability and consistency guarantees.
Main recoverable writer implementation that provides fault-tolerant writing capabilities for Hadoop file systems.
/**
* An implementation of the RecoverableWriter for Hadoop's file system abstraction.
* Supports fault-tolerant writing with exactly-once processing guarantees.
*/
@Internal
public class HadoopRecoverableWriter implements RecoverableWriter {
/**
* Creates a recoverable writer using the specified Hadoop FileSystem.
* @param fs Hadoop file system to write to
* @throws IOException if writer creation fails due to unsupported file system
*/
public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) throws IOException;
/**
* Creates a recoverable writer with no-local-write option.
* @param fs Hadoop file system to write to
* @param noLocalWrite if true, disables local write optimizations
* @throws IOException if writer creation fails
*/
public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs, boolean noLocalWrite) throws IOException;
}
Methods for creating new recoverable output streams.
/**
* Opens a new recoverable output stream for the given file path.
* @param filePath target file path for writing
* @return RecoverableFsDataOutputStream for writing data
* @throws IOException if stream creation fails
*/
public RecoverableFsDataOutputStream open(Path filePath) throws IOException;
/**
* Indicates whether this writer supports resuming from a recoverable state.
* @return true (Hadoop recoverable writer supports resume)
*/
public boolean supportsResume();
Usage Examples:
import org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
// Create recoverable writer from Hadoop FileSystem
org.apache.hadoop.fs.FileSystem hadoopFs = // ... obtain Hadoop FS
HadoopRecoverableWriter writer = new HadoopRecoverableWriter(hadoopFs);
// Open recoverable stream for writing
Path outputPath = new Path("hdfs://namenode:9000/output/part-1.txt");
RecoverableFsDataOutputStream stream = writer.open(outputPath);
// Write data
stream.write("First batch of data\n".getBytes());
stream.write("Second batch of data\n".getBytes());
// Persist current state for recovery
HadoopFsRecoverable recoverable = (HadoopFsRecoverable) stream.persist();
// Continue writing
stream.write("Third batch of data\n".getBytes());
// Close and commit
stream.closeForCommit().commit();
Methods for recovering from persistent state after failures.
/**
* Recovers a stream from a resumable recoverable state.
* @param recoverable the resumable state to recover from
* @return RecoverableFsDataOutputStream positioned at the recovered state
* @throws IOException if recovery fails
*/
public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException;
/**
* Recovers a committer from a commit recoverable state.
* @param recoverable the commit state to recover from
* @return Committer that can complete the commit operation
* @throws IOException if recovery fails
*/
public Committer recoverForCommit(CommitRecoverable recoverable) throws IOException;
/**
* Indicates whether this writer requires cleanup of recoverable state.
* @return false (Hadoop writer doesn't require cleanup)
*/
public boolean requiresCleanupOfRecoverableState();
/**
* Cleans up recoverable state (no-op for Hadoop writer).
* @param resumable the resumable state to clean up
* @return false (no cleanup performed)
* @throws IOException if cleanup fails
*/
public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException;
Usage Examples:
// Recover from a previous session
HadoopFsRecoverable savedState = // ... load from checkpoint
RecoverableFsDataOutputStream recoveredStream = writer.recover(savedState);
// Continue writing from where we left off
recoveredStream.write("Continuing after recovery\n".getBytes());
// Or recover for commit only
Committer committer = writer.recoverForCommit(commitRecoverable);
committer.commit(); // Complete the commit operation
Methods for serializing and deserializing recoverable state.
/**
* Gets the serializer for commit recoverable state.
* @return SimpleVersionedSerializer for CommitRecoverable objects
*/
public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer();
/**
* Gets the serializer for resume recoverable state.
* @return SimpleVersionedSerializer for ResumeRecoverable objects
*/
public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer();
State object that contains all information needed to recover or commit a write operation.
/**
* Implementation of resume and commit descriptor objects for Hadoop's file system abstraction.
* Contains the state needed to recover a write operation.
*/
@Internal
public class HadoopFsRecoverable implements CommitRecoverable, ResumeRecoverable {
/**
* Creates a recoverable state descriptor.
* @param targetFile final target file path
* @param tempFile temporary file being written to
* @param offset current write position
*/
public HadoopFsRecoverable(Path targetFile, Path tempFile, long offset);
/**
* Gets the target file path.
* @return target file path where data will be committed
*/
public Path targetFile();
/**
* Gets the temporary file path.
* @return temporary file path where data is being written
*/
public Path tempFile();
/**
* Gets the current write offset.
* @return byte offset in the file
*/
public long offset();
/**
* String representation of the recoverable state.
* @return string describing the recoverable state
*/
public String toString();
}
Usage Examples:
// Access recoverable state information
HadoopFsRecoverable recoverable = (HadoopFsRecoverable) stream.persist();
System.out.println("Target file: " + recoverable.targetFile());
System.out.println("Temp file: " + recoverable.tempFile());
System.out.println("Current offset: " + recoverable.offset());
// Serialize for checkpointing
SimpleVersionedSerializer<ResumeRecoverable> serializer = writer.getResumeRecoverableSerializer();
byte[] serializedState = serializer.serialize(recoverable);
// Later: deserialize and recover
HadoopFsRecoverable restored = (HadoopFsRecoverable) serializer.deserialize(
serializer.getVersion(), serializedState);
RecoverableFsDataOutputStream recoveredStream = writer.recover(restored);
The recoverable output stream provides standard writing operations plus persistence capabilities.
/**
* Base class for HDFS and ABFS recoverable streams.
*/
@Internal
public abstract class BaseHadoopFsRecoverableFsDataOutputStream
extends CommitterFromPersistRecoverableFsDataOutputStream<HadoopFsRecoverable> {
/**
* Gets the current position in the stream.
* @return current byte position
* @throws IOException if operation fails
*/
public long getPos() throws IOException;
/**
* Writes a single byte.
* @param b byte to write
* @throws IOException if write fails
*/
public void write(int b) throws IOException;
/**
* Writes data from byte array.
* @param b byte array containing data
* @param off starting offset in array
* @param len number of bytes to write
* @throws IOException if write fails
*/
public void write(byte[] b, int off, int len) throws IOException;
/**
* Flushes buffered data.
* @throws IOException if flush fails
*/
public void flush() throws IOException;
/**
* Synchronizes data to storage.
* @throws IOException if sync fails
*/
public void sync() throws IOException;
/**
* Persists the current state for recovery.
* @return HadoopFsRecoverable representing current state
* @throws IOException if persist operation fails
*/
public HadoopFsRecoverable persist() throws IOException;
/**
* Closes the stream and returns a committer.
* @return Committer that can complete the write operation
* @throws IOException if close fails
*/
public Committer closeForCommit() throws IOException;
/**
* Closes the stream without committing.
* @throws IOException if close fails
*/
public void close() throws IOException;
}
import org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter;
import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.io.SimpleVersionedSerializer;
/**
 * Example of a fault-tolerant writer built on Flink's recoverable-writer API.
 * It persists a checkpoint periodically so a failed write can be resumed from
 * the last persisted offset instead of restarting from scratch.
 */
public class FaultTolerantWriter {

    private HadoopRecoverableWriter writer;
    private SimpleVersionedSerializer<ResumeRecoverable> serializer;

    /**
     * Initializes the recoverable writer and its checkpoint-state serializer.
     *
     * @param hadoopFs Hadoop file system to write to
     * @throws IOException if the writer cannot be created for this file system
     */
    public void initializeWriter(org.apache.hadoop.fs.FileSystem hadoopFs) throws IOException {
        writer = new HadoopRecoverableWriter(hadoopFs);
        serializer = writer.getResumeRecoverableSerializer();
    }

    /**
     * Writes all lines to {@code outputPath}, persisting a checkpoint every
     * 100 records. On success the file is committed and the last checkpoint
     * (if any) is returned in serialized form; on failure the stream is
     * closed and the last checkpoint is returned so the caller can resume.
     *
     * @param outputPath target file path
     * @param data lines to write (a newline is appended to each)
     * @return serialized checkpoint state, or null if none was taken
     * @throws IOException if writing fails before any checkpoint was persisted
     */
    public byte[] writeWithCheckpoint(Path outputPath, List<String> data) throws IOException {
        RecoverableFsDataOutputStream stream = writer.open(outputPath);
        HadoopFsRecoverable checkpoint = null;
        try {
            // Write data in batches with periodic checkpoints.
            // NOTE(review): getBytes() uses the platform default charset;
            // production code should pass StandardCharsets.UTF_8 explicitly.
            for (int i = 0; i < data.size(); i++) {
                stream.write(data.get(i).getBytes());
                stream.write("\n".getBytes());
                // Checkpoint every 100 records (including record 0).
                if (i % 100 == 0) {
                    checkpoint = (HadoopFsRecoverable) stream.persist();
                    System.out.println("Checkpoint at record " + i +
                            ", offset: " + checkpoint.offset());
                }
            }
            // Commit the file.
            stream.closeForCommit().commit();
            return checkpoint != null ? serializer.serialize(checkpoint) : null;
        } catch (IOException e) {
            // Fix: close the stream so the handle is not leaked on failure.
            // State persisted via persist() remains usable for recovery.
            try {
                stream.close();
            } catch (IOException ignored) {
                // Best effort: the original failure is the one worth reporting.
            }
            // On failure, return checkpoint data for recovery if we have any.
            if (checkpoint != null) {
                return serializer.serialize(checkpoint);
            }
            throw e;
        }
    }

    /**
     * Restores a stream from serialized checkpoint state, writes the
     * remaining lines, and commits the file.
     *
     * @param checkpointData serialized checkpoint state
     * @param remainingData lines still to be written
     * @throws IOException if recovery, writing, or the commit fails
     */
    public void recoverAndContinue(byte[] checkpointData, List<String> remainingData) throws IOException {
        // Deserialize checkpoint.
        // NOTE(review): this passes the CURRENT serializer version; real code
        // should persist the version alongside the bytes and pass that instead.
        HadoopFsRecoverable recoverable = (HadoopFsRecoverable) serializer.deserialize(
                serializer.getVersion(), checkpointData);
        // Recover stream positioned at the persisted offset.
        RecoverableFsDataOutputStream stream = writer.recover(recoverable);
        System.out.println("Recovered at offset: " + recoverable.offset());
        System.out.println("Temp file: " + recoverable.tempFile());
        try {
            // Continue writing.
            for (String line : remainingData) {
                stream.write(line.getBytes());
                stream.write("\n".getBytes());
            }
            // Commit.
            stream.closeForCommit().commit();
        } catch (IOException e) {
            // Fix: close the stream so the handle is not leaked; the
            // checkpoint still allows another recovery attempt.
            try {
                stream.close();
            } catch (IOException ignored) {
                // Suppress: propagate the original failure.
            }
            throw e;
        }
        System.out.println("Recovery completed, data committed to: " + recoverable.targetFile());
    }
}
Serializer for persisting and restoring recoverable state.
/**
* Simple serializer for HadoopFsRecoverable objects.
*/
@Internal
public class HadoopRecoverableSerializer implements SimpleVersionedSerializer<HadoopFsRecoverable> {
/**
* Singleton instance of the serializer.
*/
public static final HadoopRecoverableSerializer INSTANCE;
/**
* Gets the version of the serialization format.
* @return version number (1)
*/
public int getVersion();
/**
* Serializes a HadoopFsRecoverable object.
* @param obj the recoverable object to serialize
* @return byte array containing serialized data
* @throws IOException if serialization fails
*/
public byte[] serialize(HadoopFsRecoverable obj) throws IOException;
/**
* Deserializes a HadoopFsRecoverable object.
* @param version version of the serialized data
* @param serialized byte array containing serialized data
* @return deserialized HadoopFsRecoverable object
* @throws IOException if deserialization fails
*/
public HadoopFsRecoverable deserialize(int version, byte[] serialized) throws IOException;
}
Recoverable writers support a subset of Hadoop file systems that provide the necessary features for fault-tolerance:
try {
RecoverableFsDataOutputStream stream = writer.open(outputPath);
// ... write operations
} catch (UnsupportedOperationException e) {
System.err.println("File system doesn't support recoverable writes: " + e.getMessage());
} catch (IOException e) {
System.err.println("I/O error during recoverable write: " + e.getMessage());
}
// Core recoverable writer interfaces
/**
 * Simplified sketch of the recoverable writer contract as documented above.
 * NOTE(review): the full Flink interface also declares the cleanup and
 * serializer-accessor methods described earlier in this document — confirm
 * against org.apache.flink.core.fs.RecoverableWriter.
 */
public interface RecoverableWriter {
/** Opens a new recoverable stream for writing to the given path. */
RecoverableFsDataOutputStream open(Path filePath) throws IOException;
/** Resumes a previously persisted stream from its saved state. */
RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException;
/** Recovers only the ability to commit an already-finished write. */
Committer recoverForCommit(CommitRecoverable recoverable) throws IOException;
/** Whether recover(ResumeRecoverable) is supported by this writer. */
boolean supportsResume();
}
// Recoverable state interfaces
/** Marker for state from which an in-progress write can be resumed. */
public interface ResumeRecoverable extends Serializable {}
/** Marker for state from which a finished write can be committed. */
public interface CommitRecoverable extends Serializable {}
// Committer interface
public interface Committer {
/** Completes the commit, publishing the written data to its target file. */
void commit() throws IOException;
}
// Serialization interface
public interface SimpleVersionedSerializer<T> {
int getVersion();
byte[] serialize(T obj) throws IOException;
T deserialize(int version, byte[] serialized) throws IOException;
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-hadoop-fs