CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-hadoop-fs

Hadoop FileSystem integration for Apache Flink enabling seamless access to HDFS and other Hadoop-compatible file systems

Pending
Overview
Eval results
Files

docs/filesystem-operations.md

Core FileSystem Operations

The HadoopFileSystem class provides comprehensive file system operations by wrapping Hadoop's FileSystem implementations with Flink's FileSystem interface. It supports all standard file operations including reading, writing, directory management, and metadata access across all Hadoop-compatible file systems.

Capabilities

HadoopFileSystem Class

Main file system implementation that wraps Hadoop FileSystem with Flink's interface.

/**
 * A FileSystem that wraps a Hadoop File System.
 * Provides Flink's file system interface over Hadoop's file system abstraction.
 */
public class HadoopFileSystem extends FileSystem {
    /**
     * Wraps the given Hadoop File System object as a Flink File System.
     * The Hadoop file system object is expected to be initialized already.
     * @param hadoopFileSystem The Hadoop FileSystem to wrap
     */
    public HadoopFileSystem(org.apache.hadoop.fs.FileSystem hadoopFileSystem);
    
    /**
     * Gets the underlying Hadoop FileSystem.
     * @return The underlying Hadoop FileSystem
     */
    public org.apache.hadoop.fs.FileSystem getHadoopFileSystem();
    
    /**
     * Gets the working directory.
     * @return Path to the working directory
     */
    public Path getWorkingDirectory();
    
    /**
     * Gets the home directory.
     * @return Path to the home directory
     */
    public Path getHomeDirectory();
    
    /**
     * Gets the URI of this file system.
     * @return URI of the file system
     */
    public URI getUri();
    
    /**
     * Returns true since Hadoop file systems are distributed.
     * @return always true
     */
    public boolean isDistributedFS();
    
    /**
     * Gets the default block size for this file system.
     * @return default block size in bytes
     */
    public long getDefaultBlockSize();
}

File Status Operations

Methods for retrieving file and directory metadata information.

/**
 * Gets the file status for the specified path.
 * @param f path to get status for
 * @return FileStatus containing metadata
 * @throws IOException if the path doesn't exist or operation fails
 */
public FileStatus getFileStatus(Path f) throws IOException;

/**
 * Checks if a path exists.
 * @param f path to check
 * @return true if path exists, false otherwise
 * @throws IOException if operation fails
 */
public boolean exists(Path f) throws IOException;

/**
 * Lists the status of files/directories in a directory.
 * @param f directory path to list
 * @return array of FileStatus objects for directory contents
 * @throws IOException if path is not a directory or operation fails
 */
public FileStatus[] listStatus(Path f) throws IOException;

/**
 * Gets file block locations for the specified file and range.
 * @param file file status object
 * @param start starting byte position
 * @param len number of bytes
 * @return array of BlockLocation objects
 * @throws IOException if operation fails
 */
public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException;

Usage Examples:

import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.BlockLocation;

// Get file status
Path filePath = new Path("hdfs://namenode:9000/data/input.txt");
FileStatus status = fs.getFileStatus(filePath);

System.out.println("File length: " + status.getLen());
System.out.println("Block size: " + status.getBlockSize());
System.out.println("Modification time: " + status.getModificationTime());
System.out.println("Is directory: " + status.isDir());

// Check if file exists
boolean exists = fs.exists(filePath);
if (exists) {
    System.out.println("File exists");
}

// List directory contents
Path dirPath = new Path("hdfs://namenode:9000/data/");
FileStatus[] files = fs.listStatus(dirPath);
for (FileStatus file : files) {
    System.out.println(file.getPath() + " - " + file.getLen() + " bytes");
}

// Get block locations for data locality
BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
for (BlockLocation block : blocks) {
    System.out.println("Block at offset " + block.getOffset() + 
                      " on hosts: " + String.join(",", block.getHosts()));
}

File Reading Operations

Methods for opening and reading files with various options.

/**
 * Opens a file for reading.
 * @param f path to the file
 * @return HadoopDataInputStream for reading
 * @throws IOException if file cannot be opened
 */
public HadoopDataInputStream open(Path f) throws IOException;

/**
 * Opens a file for reading with specified buffer size.
 * @param f path to the file
 * @param bufferSize buffer size for reading
 * @return HadoopDataInputStream for reading
 * @throws IOException if file cannot be opened
 */
public HadoopDataInputStream open(Path f, int bufferSize) throws IOException;

Usage Examples:

import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;

// Open file for reading
Path inputPath = new Path("hdfs://namenode:9000/data/input.txt");
HadoopDataInputStream inputStream = fs.open(inputPath);

// Read data
byte[] buffer = new byte[1024];
int bytesRead = inputStream.read(buffer);
while (bytesRead != -1) {
    // Process buffer
    System.out.write(buffer, 0, bytesRead);
    bytesRead = inputStream.read(buffer);
}
inputStream.close();

// Open with custom buffer size
HadoopDataInputStream bufferedStream = fs.open(inputPath, 64 * 1024); // 64KB buffer
// ... read operations
bufferedStream.close();

// Random access reading
inputStream = fs.open(inputPath);
inputStream.seek(1000); // Seek to position 1000
int byteAtPosition = inputStream.read();
inputStream.close();

File Writing Operations

Methods for creating and writing files with various configuration options.

/**
 * Creates a file with write mode specification.
 * @param f path to create
 * @param overwrite write mode (OVERWRITE or NO_OVERWRITE)
 * @return HadoopDataOutputStream for writing
 * @throws IOException if file cannot be created
 */
public HadoopDataOutputStream create(Path f, WriteMode overwrite) throws IOException;

/**
 * Creates a file with detailed HDFS parameters.
 * @param f path to create
 * @param overwrite whether to overwrite existing file
 * @param bufferSize buffer size for writing
 * @param replication replication factor
 * @param blockSize block size in bytes
 * @return HadoopDataOutputStream for writing
 * @throws IOException if file cannot be created
 */
public HadoopDataOutputStream create(Path f, boolean overwrite, int bufferSize, 
                                   short replication, long blockSize) throws IOException;

Usage Examples:

import org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream;
import org.apache.flink.core.fs.FileSystem.WriteMode;

// Create file with simple write mode
Path outputPath = new Path("hdfs://namenode:9000/data/output.txt");
HadoopDataOutputStream outputStream = fs.create(outputPath, WriteMode.OVERWRITE);

// Write data
String data = "Hello, Hadoop FileSystem!";
outputStream.write(data.getBytes());
outputStream.flush();
outputStream.close();

// Create file with detailed HDFS parameters
HadoopDataOutputStream hdfsStream = fs.create(
    outputPath,
    true,        // overwrite
    32 * 1024,   // 32KB buffer
    (short) 3,   // replication factor
    128 * 1024 * 1024  // 128MB block size
);

// Write with positioning
hdfsStream.write("First part".getBytes());
long position = hdfsStream.getPos();
hdfsStream.write("Second part".getBytes());
hdfsStream.sync(); // Force sync to storage
hdfsStream.close();

Directory Operations

Methods for directory management and manipulation.

/**
 * Creates directories for the specified path.
 * @param f path to create directories for
 * @return true if directories were created or already exist
 * @throws IOException if operation fails
 */
public boolean mkdirs(Path f) throws IOException;

/**
 * Deletes a file or directory.
 * @param f path to delete
 * @param recursive if true, delete directory recursively
 * @return true if deletion was successful
 * @throws IOException if operation fails
 */
public boolean delete(Path f, boolean recursive) throws IOException;

/**
 * Renames a file or directory.
 * @param src source path
 * @param dst destination path
 * @return true if rename was successful
 * @throws IOException if operation fails
 */
public boolean rename(Path src, Path dst) throws IOException;

Usage Examples:

// Create directories
Path dirPath = new Path("hdfs://namenode:9000/data/processed/");
boolean created = fs.mkdirs(dirPath);
if (created) {
    System.out.println("Directories created successfully");
}

// Delete file
Path fileToDelete = new Path("hdfs://namenode:9000/data/temp.txt");
boolean deleted = fs.delete(fileToDelete, false);

// Delete directory recursively
Path dirToDelete = new Path("hdfs://namenode:9000/data/temp/");
boolean deletedRecursive = fs.delete(dirToDelete, true);

// Rename file
Path oldPath = new Path("hdfs://namenode:9000/data/old_name.txt");
Path newPath = new Path("hdfs://namenode:9000/data/new_name.txt");
boolean renamed = fs.rename(oldPath, newPath);

Recoverable Writer Creation

Methods for creating fault-tolerant writers that support exactly-once processing guarantees.

/**
 * Creates a recoverable writer for fault-tolerant writing.
 * @return RecoverableWriter instance
 * @throws IOException if writer creation fails due to unsupported file system
 */
public RecoverableWriter createRecoverableWriter() throws IOException;

/**
 * Creates a recoverable writer with configuration options.
 * @param conf configuration map with writer options
 * @return RecoverableWriter instance
 * @throws IOException if writer creation fails
 */
public RecoverableWriter createRecoverableWriter(Map<String, String> conf) throws IOException;

Usage Examples:

import org.apache.flink.core.fs.RecoverableWriter;
import java.util.HashMap;
import java.util.Map;

// Create recoverable writer with default settings
RecoverableWriter writer = fs.createRecoverableWriter();

// Create recoverable writer with configuration
Map<String, String> config = new HashMap<>();
config.put("fs.hdfs.no-local-write", "true");
RecoverableWriter configuredWriter = fs.createRecoverableWriter(config);

// Use the writer for fault-tolerant streaming
Path outputPath = new Path("hdfs://namenode:9000/output/data.txt");
RecoverableFsDataOutputStream stream = writer.open(outputPath);
// ... write data with recovery capabilities

Utility Methods

Additional utility methods for path conversion and file system identification.

/**
 * Converts Flink Path to Hadoop Path.
 * @param path Flink Path object
 * @return Hadoop Path object
 */
public static org.apache.hadoop.fs.Path toHadoopPath(Path path);

/**
 * Gets the file system kind based on scheme.
 * @param scheme URI scheme
 * @return FileSystemKind indicating the type of file system
 */
static FileSystemKind getKindForScheme(String scheme);

File Status Types

/**
 * FileStatus implementation for Hadoop file systems.
 */
public class HadoopFileStatus implements FileStatus {
    public HadoopFileStatus(org.apache.hadoop.fs.FileStatus fileStatus);
    
    public long getLen();
    public long getBlockSize();
    public long getAccessTime();
    public long getModificationTime();
    public short getReplication();
    public Path getPath();
    public boolean isDir();
    public org.apache.hadoop.fs.FileStatus getInternalFileStatus();
    
    public static HadoopFileStatus fromHadoopStatus(org.apache.hadoop.fs.FileStatus fileStatus);
}

/**
 * FileStatus with block location information.
 */
public class LocatedHadoopFileStatus extends HadoopFileStatus implements LocatedFileStatus {
    public LocatedHadoopFileStatus(org.apache.hadoop.fs.LocatedFileStatus fileStatus);
    public BlockLocation[] getBlockLocations();
}

/**
 * Block location implementation for Hadoop file systems.
 */
public class HadoopBlockLocation implements BlockLocation {
    public HadoopBlockLocation(org.apache.hadoop.fs.BlockLocation blockLocation);
    
    public String[] getHosts() throws IOException;
    public long getLength();
    public long getOffset();
    public int compareTo(BlockLocation o);
}

Error Handling

Common exceptions and error scenarios:

try {
    FileStatus status = fs.getFileStatus(nonExistentPath);
} catch (FileNotFoundException e) {
    System.err.println("File not found: " + e.getMessage());
} catch (IOException e) {
    System.err.println("I/O error: " + e.getMessage());
}

try {
    fs.delete(readOnlyFile, false);
} catch (AccessControlException e) {
    System.err.println("Permission denied: " + e.getMessage());
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-hadoop-fs

docs

filesystem-factory.md

filesystem-operations.md

hadoop-utilities.md

index.md

io-streams.md

recoverable-writers.md

tile.json