Hadoop FileSystem integration for Apache Flink enabling seamless access to HDFS and other Hadoop-compatible file systems
```
npx @tessl/cli install tessl/maven-org-apache-flink--flink-hadoop-fs@2.1.0
```

Apache Flink Hadoop FileSystem (flink-hadoop-fs) provides seamless integration between Apache Flink's file system abstraction and Hadoop's file system implementations. It enables Flink applications to access HDFS, S3, Azure Blob Storage, Google Cloud Storage, and other Hadoop-compatible file systems with full support for fault-tolerant streaming, exactly-once processing guarantees, and high-performance I/O operations.
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-fs</artifactId>
    <version>2.1.0</version>
</dependency>
```

```java
import org.apache.flink.runtime.fs.hdfs.HadoopFsFactory;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
```

```java
import org.apache.flink.runtime.fs.hdfs.HadoopFsFactory;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
// Create and configure factory
HadoopFsFactory factory = new HadoopFsFactory();
Configuration config = new Configuration();
factory.configure(config);
// Create file system for HDFS
URI hdfsUri = URI.create("hdfs://namenode:9000/");
FileSystem fs = factory.create(hdfsUri);
// Basic file operations
Path filePath = new Path("hdfs://namenode:9000/data/input.txt");
boolean exists = fs.exists(filePath);
FileStatus status = fs.getFileStatus(filePath);
// Read from file
FSDataInputStream inputStream = fs.open(filePath);
// ... read data
inputStream.close();
// Write to file
FSDataOutputStream outputStream =
        fs.create(new Path("hdfs://namenode:9000/data/output.txt"), FileSystem.WriteMode.OVERWRITE);
outputStream.write("Hello, Hadoop!".getBytes(StandardCharsets.UTF_8));
outputStream.close();
```

Apache Flink Hadoop FileSystem is built around several key components:
- HadoopFsFactory creates appropriate file system instances for different schemes (HDFS, S3, etc.)
- HadoopFileSystem wraps Hadoop's FileSystem implementations with Flink's interface

Factory for creating Hadoop-based file systems that automatically detects and instantiates the appropriate implementation based on URI schemes.

```java
public class HadoopFsFactory implements FileSystemFactory {
public String getScheme();
public void configure(Configuration config);
public FileSystem create(URI fsUri) throws IOException;
}
```
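For illustration, a minimal sketch of direct factory use; the namenode address and the s3a bucket URI are placeholders, and the s3a line assumes the Hadoop S3A classes (e.g. hadoop-aws) are on the classpath:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.fs.hdfs.HadoopFsFactory;

import java.io.IOException;
import java.net.URI;

public class FactorySketch {
    public static void main(String[] args) throws IOException {
        HadoopFsFactory factory = new HadoopFsFactory();
        factory.configure(new Configuration());

        // The factory inspects the URI scheme and instantiates the matching
        // Hadoop FileSystem implementation behind Flink's FileSystem interface.
        FileSystem hdfs = factory.create(URI.create("hdfs://namenode:9000/"));

        // Other schemes work the same way, as long as the corresponding
        // Hadoop file system classes are on the classpath (placeholder URI).
        FileSystem s3 = factory.create(URI.create("s3a://my-bucket/"));
    }
}
```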
Comprehensive file system operations including reading, writing, directory management, and metadata access, with support for all Hadoop-compatible file systems.

```java
public class HadoopFileSystem extends FileSystem {
public HadoopFileSystem(org.apache.hadoop.fs.FileSystem hadoopFileSystem);
public FileStatus getFileStatus(Path f) throws IOException;
public HadoopDataInputStream open(Path f) throws IOException;
public HadoopDataOutputStream create(Path f, WriteMode overwrite) throws IOException;
public boolean delete(Path f, boolean recursive) throws IOException;
public FileStatus[] listStatus(Path f) throws IOException;
public RecoverableWriter createRecoverableWriter() throws IOException;
}
```
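Because the constructor accepts a plain org.apache.hadoop.fs.FileSystem, an already-configured Hadoop file system can be wrapped directly. A sketch, assuming a reachable namenode (the address and the /data path are placeholders):

```java
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;

import java.io.IOException;
import java.net.URI;

public class WrapSketch {
    public static void main(String[] args) throws IOException {
        // Obtain a plain Hadoop FileSystem first (placeholder address).
        org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
        org.apache.hadoop.fs.FileSystem hadoopFs =
                org.apache.hadoop.fs.FileSystem.get(URI.create("hdfs://namenode:9000/"), hadoopConf);

        // Wrap it so Flink code can use it through Flink's FileSystem interface.
        HadoopFileSystem fs = new HadoopFileSystem(hadoopFs);

        for (FileStatus status : fs.listStatus(new Path("/data"))) {
            System.out.println(status.getPath() + " (" + status.getLen() + " bytes)");
        }
    }
}
```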
Optimized input and output streams with ByteBuffer support, connection limiting, and advanced positioning capabilities for efficient data processing.

```java
public class HadoopDataInputStream extends FSDataInputStream implements ByteBufferReadable {
public void seek(long seekPos) throws IOException;
public int read(ByteBuffer byteBuffer) throws IOException;
public int read(long position, ByteBuffer byteBuffer) throws IOException;
}
public class HadoopDataOutputStream extends FSDataOutputStream {
public long getPos() throws IOException;
public void sync() throws IOException;
}
```
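A sketch of a ByteBuffer read; it assumes the path lives on a Hadoop-backed file system, so the stream returned by open() is a HadoopDataInputStream (the path is a placeholder):

```java
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;

import java.io.IOException;
import java.nio.ByteBuffer;

public class ByteBufferReadSketch {
    public static void main(String[] args) throws IOException {
        Path path = new Path("hdfs://namenode:9000/data/input.bin"); // placeholder path
        FileSystem fs = FileSystem.get(path.toUri());

        try (FSDataInputStream in = fs.open(path)) {
            // On Hadoop-backed file systems the stream is a HadoopDataInputStream,
            // which can read directly into a ByteBuffer.
            if (in instanceof HadoopDataInputStream) {
                HadoopDataInputStream hin = (HadoopDataInputStream) in;
                ByteBuffer buffer = ByteBuffer.allocate(4096);
                int bytesRead = hin.read(buffer);
                System.out.println("Read " + bytesRead + " bytes");
            }
        }
    }
}
```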
Recoverable writers that provide exactly-once processing guarantees through persistent state management and checkpoint/recovery mechanisms.

```java
public class HadoopRecoverableWriter implements RecoverableWriter {
public RecoverableFsDataOutputStream open(Path filePath) throws IOException;
public RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException;
public Committer recoverForCommit(CommitRecoverable recoverable) throws IOException;
}
public class HadoopFsRecoverable implements CommitRecoverable, ResumeRecoverable {
public Path targetFile();
public Path tempFile();
public long offset();
}
```
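A sketch of the write/persist/commit lifecycle through the generic RecoverableWriter interface; on a Hadoop-backed file system, createRecoverableWriter() (see HadoopFileSystem above) returns a HadoopRecoverableWriter. The target path is a placeholder:

```java
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class RecoverableWriteSketch {
    public static void main(String[] args) throws IOException {
        Path target = new Path("hdfs://namenode:9000/data/out/part-0"); // placeholder path
        FileSystem fs = FileSystem.get(target.toUri());

        RecoverableWriter writer = fs.createRecoverableWriter();

        RecoverableFsDataOutputStream out = writer.open(target);
        out.write("record-1\n".getBytes(StandardCharsets.UTF_8));

        // persist() durably flushes and returns a resume point that can be
        // stored in a checkpoint; recovery re-opens the stream from here.
        RecoverableWriter.ResumeRecoverable resumePoint = out.persist();

        // closeForCommit() finishes the in-progress (temp) file; commit()
        // atomically publishes it under the target path.
        out.closeForCommit().commit();
    }
}
```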
Utility functions for configuration management, security handling, and version compatibility checks when working with Hadoop ecosystems.

```java
public class HadoopUtils {
public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration);
public static boolean isKerberosSecurityEnabled(UserGroupInformation ugi);
public static boolean areKerberosCredentialsValid(UserGroupInformation ugi, boolean useTicketCache);
}
```
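For example, getHadoopConfiguration derives a Hadoop Configuration from a Flink Configuration; this minimal sketch assumes Hadoop settings are discoverable (e.g. via HADOOP_CONF_DIR), and the fs.defaultFS lookup is only illustrative:

```java
import org.apache.flink.runtime.util.HadoopUtils;

public class HadoopConfigSketch {
    public static void main(String[] args) {
        org.apache.flink.configuration.Configuration flinkConf =
                new org.apache.flink.configuration.Configuration();

        // Builds a Hadoop Configuration from the Flink configuration and
        // the usual Hadoop environment settings, if present.
        org.apache.hadoop.conf.Configuration hadoopConf =
                HadoopUtils.getHadoopConfiguration(flinkConf);

        System.out.println(hadoopConf.get("fs.defaultFS"));
    }
}
```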
This package supports all Hadoop-compatible file systems through automatic scheme detection, including HDFS, Amazon S3, Azure Blob Storage, and Google Cloud Storage, provided the corresponding Hadoop file system implementations are on the classpath.

The package throws standard Java IOExceptions for file system operations, with specific exceptions for:
- UnsupportedFileSystemSchemeException: when the URI scheme is not supported by Hadoop
- UnknownHostException: when the file system authority cannot be resolved
- IOException: general I/O errors during file operations
- FlinkRuntimeException: configuration and version compatibility issues

All exceptions include detailed error messages to aid in troubleshooting configuration and connectivity issues.
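A sketch of handling the scheme-related failure mode; since UnsupportedFileSystemSchemeException extends IOException, it must be caught first (the nosuchscheme URI is deliberately invalid):

```java
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException;

import java.io.IOException;
import java.net.URI;

public class ErrorHandlingSketch {
    public static void main(String[] args) {
        try {
            FileSystem fs = FileSystem.get(URI.create("nosuchscheme://host/path"));
        } catch (UnsupportedFileSystemSchemeException e) {
            // Raised when no factory (and no Hadoop implementation) handles the scheme.
            System.err.println("Unsupported scheme: " + e.getMessage());
        } catch (IOException e) {
            System.err.println("I/O error while creating the file system: " + e.getMessage());
        }
    }
}
```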
```java
// Core file system interface
public abstract class FileSystem {
public enum WriteMode { NO_OVERWRITE, OVERWRITE }
}
// File metadata
public interface FileStatus {
long getLen();
long getBlockSize();
long getAccessTime();
long getModificationTime();
short getReplication();
Path getPath();
boolean isDir();
}
// Block location information
public interface BlockLocation extends Comparable<BlockLocation> {
String[] getHosts() throws IOException;
long getLength();
long getOffset();
}
// Path representation
public class Path {
public Path(String path);
public Path(URI uri);
public URI toUri();
}
```
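A sketch tying these types together by listing metadata and block placement for a directory; the path is a placeholder, and getFileBlockLocations is the Flink FileSystem call that exposes BlockLocation:

```java
import org.apache.flink.core.fs.BlockLocation;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import java.io.IOException;

public class MetadataSketch {
    public static void main(String[] args) throws IOException {
        Path dir = new Path("hdfs://namenode:9000/data"); // placeholder path
        FileSystem fs = FileSystem.get(dir.toUri());

        for (FileStatus status : fs.listStatus(dir)) {
            System.out.printf("%s  %d bytes  replication=%d%n",
                    status.getPath(), status.getLen(), status.getReplication());

            // Block locations reveal which hosts hold each block, which Flink
            // uses for locality-aware input split assignment.
            for (BlockLocation block : fs.getFileBlockLocations(status, 0, status.getLen())) {
                System.out.printf("  block @%d len=%d hosts=%s%n",
                        block.getOffset(), block.getLength(), String.join(",", block.getHosts()));
            }
        }
    }
}
```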