
tessl/maven-org-apache-flink--flink-hadoop-fs

Hadoop FileSystem integration for Apache Flink enabling seamless access to HDFS and other Hadoop-compatible file systems


FileSystem Factory

The HadoopFsFactory provides a factory-pattern implementation for creating Hadoop-based file systems in Flink applications. It delegates to Hadoop's own mechanism to find a FileSystem implementation for a given URI scheme, then wraps the result as a Flink file system and handles its initialization and configuration.

Capabilities

HadoopFsFactory Class

Factory class that implements Flink's FileSystemFactory interface to create Hadoop-compatible file systems.

/**
 * A file system factory for Hadoop-based file systems.
 * Calls Hadoop's mechanism to find a file system implementation for a given file
 * system scheme and wraps it as a Flink file system.
 */
public class HadoopFsFactory implements FileSystemFactory {
    public HadoopFsFactory();
    
    /**
     * Returns the scheme handled by this factory.
     * @return "*", indicating this factory acts as a fallback for any scheme
     */
    public String getScheme();
    
    /**
     * Configures the factory with Flink configuration.
     * @param config Flink's configuration object
     */
    public void configure(Configuration config);
    
    /**
     * Creates a file system instance for the given URI.
     * @param fsUri URI of the file system to create
     * @return FileSystem instance wrapped as Flink FileSystem
     * @throws IOException if file system creation fails
     * @throws UnsupportedFileSystemSchemeException if scheme is not supported
     */
    public FileSystem create(URI fsUri) throws IOException;
}

Usage Examples:

import org.apache.flink.runtime.fs.hdfs.HadoopFsFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import java.net.URI;

// Create and configure factory
HadoopFsFactory factory = new HadoopFsFactory();
Configuration config = new Configuration();
// Add Hadoop configuration properties to Flink config if needed
config.setString("fs.defaultFS", "hdfs://namenode:9000");
factory.configure(config);

// Create HDFS file system
URI hdfsUri = URI.create("hdfs://namenode:9000/");
FileSystem hdfsFs = factory.create(hdfsUri);

// Create S3 file system
URI s3Uri = URI.create("s3a://my-bucket/");
FileSystem s3Fs = factory.create(s3Uri);

// Create local file system
URI localUri = URI.create("file:///tmp/");
FileSystem localFs = factory.create(localUri);

Factory Configuration

The factory forwards relevant Flink configuration to the underlying Hadoop configuration, allowing you to set Hadoop properties through Flink's configuration system.

import org.apache.flink.configuration.Configuration;

Configuration config = new Configuration();

// Set Hadoop configuration through Flink config
config.setString("fs.defaultFS", "hdfs://namenode:9000");
config.setString("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
config.setString("fs.s3a.access.key", "your-access-key");
config.setString("fs.s3a.secret.key", "your-secret-key");

// Configure connection limits
config.setInteger("fs.hdfs.limit.total", 100);
config.setInteger("fs.hdfs.limit.input", 50);
config.setInteger("fs.hdfs.limit.output", 50);
config.setLong("fs.hdfs.limit.timeout", 30000L);

factory.configure(config);

Scheme Detection and Support

The factory delegates scheme resolution to Hadoop, so it can create file systems for any scheme that has a Hadoop FileSystem implementation on the classpath:

// HDFS schemes
FileSystem hdfs1 = factory.create(URI.create("hdfs://namenode:9000/"));
FileSystem hdfs2 = factory.create(URI.create("hdfs://ha-cluster/"));

// S3 schemes
FileSystem s3a = factory.create(URI.create("s3a://bucket/"));
FileSystem s3n = factory.create(URI.create("s3n://bucket/"));
FileSystem s3 = factory.create(URI.create("s3://bucket/"));

// Azure schemes
FileSystem wasb = factory.create(URI.create("wasb://container@account.blob.core.windows.net/"));
FileSystem abfs = factory.create(URI.create("abfs://container@account.dfs.core.windows.net/"));

// Google Cloud Storage
FileSystem gcs = factory.create(URI.create("gs://bucket/"));

// Local file system
FileSystem local = factory.create(URI.create("file:///path/"));
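The dispatch above is driven entirely by the scheme and authority components of the URI. A minimal sketch using only `java.net.URI` (no Flink or Hadoop on the classpath; `SchemeInspector` and its methods are illustrative names, not Flink API) shows the pieces the factory looks at:

```java
import java.net.URI;

// Illustrative helper: the URI components the factory dispatches on.
public class SchemeInspector {
    public static String schemeOf(String uri) {
        return URI.create(uri).getScheme();
    }

    public static String authorityOf(String uri) {
        return URI.create(uri).getAuthority();
    }

    public static void main(String[] args) {
        System.out.println(schemeOf("hdfs://namenode:9000/"));   // hdfs
        System.out.println(schemeOf("s3a://bucket/"));           // s3a
        System.out.println(authorityOf("hdfs://namenode:9000/")); // namenode:9000
        System.out.println(authorityOf("hdfs:///path/only"));    // null: no authority
    }
}
```

Note that a URI such as `hdfs:///path/only` has a null authority; the Authority Resolution section below describes how the factory fills that gap from configuration.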

Connection Limiting

The factory supports connection limiting to prevent resource exhaustion when accessing remote file systems:

Configuration config = new Configuration();

// Set total connection limit for HDFS
config.setInteger("fs.hdfs.limit.total", 100);

// Set input stream connection limit
config.setInteger("fs.hdfs.limit.input", 50);

// Set output stream connection limit  
config.setInteger("fs.hdfs.limit.output", 30);

// Set connection timeout (milliseconds)
config.setLong("fs.hdfs.limit.timeout", 30000L);

// Set stream inactivity timeout (milliseconds)
config.setLong("fs.hdfs.limit.stream-timeout", 300000L);

factory.configure(config);

// Created file system will automatically use connection limiting
FileSystem limitedFs = factory.create(URI.create("hdfs://namenode:9000/"));
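Conceptually, the limiting behaves like a semaphore with an acquire timeout: opening a stream takes a permit, closing it returns the permit, and waiting longer than `fs.hdfs.limit.timeout` fails the open. The sketch below illustrates that idea with plain JDK classes; `ConnectionLimiter` and its methods are illustrative names, not the Flink implementation:

```java
import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Conceptual sketch of a total-connection limit with an acquire timeout.
public class ConnectionLimiter {
    private final Semaphore permits;
    private final long timeoutMillis;

    public ConnectionLimiter(int maxConnections, long timeoutMillis) {
        this.permits = new Semaphore(maxConnections);
        this.timeoutMillis = timeoutMillis;
    }

    /** Blocks until a connection slot is free, or fails after the timeout. */
    public void acquire() throws IOException {
        try {
            if (!permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IOException("Timed out waiting for a free connection");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while waiting for a connection", e);
        }
    }

    /** Called when a stream is closed, freeing the slot for other streams. */
    public void release() {
        permits.release();
    }
}
```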

Error Handling

The factory provides detailed error handling for common configuration and connectivity issues:

try {
    FileSystem fs = factory.create(URI.create("hdfs://invalid-host:9000/"));
} catch (UnsupportedFileSystemSchemeException e) {
    // Scheme not supported or Hadoop classes missing
    System.err.println("Unsupported scheme: " + e.getMessage());
} catch (UnknownHostException e) {
    // Authority cannot be resolved
    System.err.println("Cannot resolve host: " + e.getMessage());
} catch (IOException e) {
    // General I/O error during file system creation
    System.err.println("File system creation failed: " + e.getMessage());
}
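Of these failures, only general I/O errors are plausibly transient; an unsupported scheme is a classpath or configuration problem that no retry will fix. A hedged sketch of a bounded-retry helper for the transient case (`IoRetry` and `IoSupplier` are illustrative names, not Flink API):

```java
import java.io.IOException;

// Illustrative bounded-retry helper for transient I/O failures.
// A real caller should rethrow UnsupportedFileSystemSchemeException
// immediately rather than retry it.
public class IoRetry {
    @FunctionalInterface
    public interface IoSupplier<T> {
        T get() throws IOException;
    }

    public static <T> T withRetries(IoSupplier<T> action, int attempts) throws IOException {
        IOException last = null;
        for (int i = 0; i < attempts; i++) {
            try {
                return action.get();
            } catch (IOException e) {
                last = e; // possibly transient; try again
            }
        }
        throw last; // all attempts failed
    }
}
```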

Authority Resolution

When a URI doesn't specify an authority, the factory attempts to resolve it from Hadoop configuration:

Configuration config = new Configuration();
config.setString("fs.defaultFS", "hdfs://namenode:9000");
factory.configure(config);

// This URI has no authority
URI uriNoAuthority = URI.create("hdfs:///path/to/file");

// Factory will use fs.defaultFS to resolve authority
FileSystem fs = factory.create(uriNoAuthority);
// Results in: hdfs://namenode:9000/path/to/file
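The fallback logic can be sketched with plain `java.net.URI`: if the incoming URI already carries an authority it is used as-is, otherwise the scheme and authority of the configured default file system are borrowed. `AuthorityResolver` and `resolveAuthority` are illustrative names, not the factory's actual internals:

```java
import java.net.URI;

// Sketch of authority fallback: borrow scheme + authority from the
// configured default file system when the URI has none.
public class AuthorityResolver {
    public static URI resolveAuthority(URI fsUri, URI defaultFs) {
        if (fsUri.getAuthority() != null) {
            return fsUri; // already fully qualified
        }
        return URI.create(defaultFs.getScheme() + "://"
                + defaultFs.getAuthority() + fsUri.getPath());
    }

    public static void main(String[] args) {
        URI noAuthority = URI.create("hdfs:///path/to/file");
        URI defaultFs = URI.create("hdfs://namenode:9000");
        System.out.println(resolveAuthority(noAuthority, defaultFs));
        // hdfs://namenode:9000/path/to/file
    }
}
```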

Types

// Flink's FileSystemFactory interface
public interface FileSystemFactory {
    String getScheme();
    void configure(Configuration config);
    FileSystem create(URI fsUri) throws IOException;
}

// Flink's Configuration class
public class Configuration {
    public void setString(String key, String value);
    public void setInteger(String key, int value);
    public void setLong(String key, long value);
    public String getString(String key, String defaultValue);
}

// Exceptions thrown by factory
public class UnsupportedFileSystemSchemeException extends IOException {
    public UnsupportedFileSystemSchemeException(String message, Throwable cause);
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-hadoop-fs
