
tessl/maven-org-apache-flink--flink-hadoop-fs

Hadoop FileSystem integration for Apache Flink enabling seamless access to HDFS and other Hadoop-compatible file systems


docs/hadoop-utilities.md

Hadoop Integration Utilities

The Hadoop integration utilities provide functions for configuration management, security handling, and version compatibility checks when working with the Hadoop ecosystem. They bridge Flink and Hadoop configuration handling and take care of authentication and security concerns.

Capabilities

HadoopUtils

Main utility class providing static methods for Hadoop integration tasks.

/**
 * Utility class for working with Hadoop-related classes.
 * Should only be used if Hadoop is on the classpath.
 */
public class HadoopUtils {
    /**
     * Kind identifier of HDFS delegation tokens ("HDFS_DELEGATION_TOKEN").
     */
    public static final Text HDFS_DELEGATION_TOKEN_KIND;
}

Configuration Management

Methods for converting and managing configurations between Flink and Hadoop.

/**
 * Builds a Hadoop configuration from a Flink configuration.
 * Merges configuration discovered through the usual Hadoop mechanisms
 * (e.g. HADOOP_CONF_DIR) with Flink properties carrying the
 * "flink.hadoop." prefix, which are forwarded with the prefix stripped.
 * @param flinkConfiguration Flink's configuration object
 * @return Hadoop Configuration with the merged properties
 */
public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration);

Usage Examples:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.HadoopUtils;

// Create a Flink configuration with Hadoop properties.
// Only keys carrying the "flink.hadoop." prefix are forwarded;
// the prefix is stripped during conversion.
Configuration flinkConfig = new Configuration();
flinkConfig.setString("flink.hadoop.fs.defaultFS", "hdfs://namenode:9000");
flinkConfig.setString("flink.hadoop.fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
flinkConfig.setString("flink.hadoop.dfs.replication", "3");
flinkConfig.setString("flink.hadoop.dfs.blocksize", "134217728"); // 128 MB

// Convert to Hadoop configuration
org.apache.hadoop.conf.Configuration hadoopConfig = 
    HadoopUtils.getHadoopConfiguration(flinkConfig);

// Hadoop config now contains the converted properties
System.out.println("HDFS default FS: " + hadoopConfig.get("fs.defaultFS"));
System.out.println("Block size: " + hadoopConfig.get("dfs.blocksize"));

// Use with Hadoop FileSystem
org.apache.hadoop.fs.FileSystem hadoopFs = 
    org.apache.hadoop.fs.FileSystem.get(hadoopConfig);
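Since this module's purpose is exposing Hadoop file systems through Flink's own FileSystem abstraction, the resulting Hadoop FileSystem is typically wrapped in Flink's HadoopFileSystem. A minimal sketch, assuming the hadoopFs instance from above:

import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;

// Wrap the Hadoop FileSystem in Flink's FileSystem abstraction
HadoopFileSystem flinkFs = new HadoopFileSystem(hadoopFs);
System.out.println("Wrapped file system URI: " + flinkFs.getUri());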

Kerberos Security Management

Methods for handling Kerberos authentication and security validation.

/**
 * Checks if Kerberos security is enabled for the user.
 * @param ugi UserGroupInformation to check
 * @return true if Kerberos security is enabled
 */
public static boolean isKerberosSecurityEnabled(UserGroupInformation ugi);

/**
 * Checks whether the user's Kerberos credentials are valid and not expired.
 * @param ugi UserGroupInformation to validate
 * @param useTicketCache whether a ticket cache is being used
 * @return true if credentials are valid
 */
public static boolean areKerberosCredentialsValid(UserGroupInformation ugi, boolean useTicketCache);

/**
 * Checks if the user has HDFS delegation tokens.
 * @param ugi UserGroupInformation to check
 * @return true if HDFS delegation tokens are present
 */
public static boolean hasHDFSDelegationToken(UserGroupInformation ugi);

Usage Examples:

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.flink.runtime.util.HadoopUtils;

// Get current user information
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();

// Check security configuration
if (HadoopUtils.isKerberosSecurityEnabled(ugi)) {
    System.out.println("Kerberos security is enabled");
    
    // Validate credentials
    boolean validCredentials = HadoopUtils.areKerberosCredentialsValid(ugi, true);
    if (validCredentials) {
        System.out.println("Kerberos credentials are valid");
    } else {
        System.err.println("Kerberos credentials are invalid or expired");
        // Handle credential renewal
    }
    
    // Check for delegation tokens
    if (HadoopUtils.hasHDFSDelegationToken(ugi)) {
        System.out.println("HDFS delegation tokens available");
    } else {
        System.out.println("No HDFS delegation tokens found");
    }
} else {
    System.out.println("Simple authentication (no Kerberos)");
}

// Login with keytab (if needed)
if (!HadoopUtils.areKerberosCredentialsValid(ugi, false)) {
    UserGroupInformation.loginUserFromKeytab("user@REALM", "/path/to/user.keytab");
    ugi = UserGroupInformation.getCurrentUser();
}
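After a keytab login, Hadoop operations can run under the logged-in identity via the standard UserGroupInformation.doAs API. A minimal sketch; the configuration and printed path are placeholders:

import java.security.PrivilegedExceptionAction;

UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
loginUser.doAs((PrivilegedExceptionAction<Void>) () -> {
    // Everything inside this action executes as the logged-in principal
    org.apache.hadoop.fs.FileSystem fs =
        org.apache.hadoop.fs.FileSystem.get(new org.apache.hadoop.conf.Configuration());
    System.out.println("Home directory: " + fs.getHomeDirectory());
    return null;
});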

Version Compatibility Checks

Methods for checking Hadoop version compatibility.

/**
 * Checks if the current Hadoop version meets the minimum required version.
 * @param major minimum major version required
 * @param minor minimum minor version required
 * @return true if current version meets minimum requirements
 * @throws FlinkRuntimeException if version information cannot be determined
 */
public static boolean isMinHadoopVersion(int major, int minor) throws FlinkRuntimeException;

/**
 * Checks if the current Hadoop version is at most the specified maximum version.
 * @param major maximum major version allowed
 * @param minor maximum minor version allowed
 * @return true if current version is within maximum limits
 * @throws FlinkRuntimeException if version information cannot be determined
 */
public static boolean isMaxHadoopVersion(int major, int minor) throws FlinkRuntimeException;

Usage Examples:

import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.runtime.util.HadoopUtils;

try {
    // Check minimum version requirements
    if (HadoopUtils.isMinHadoopVersion(2, 6)) {
        System.out.println("Hadoop version is 2.6 or higher");
    } else {
        System.err.println("Hadoop version is below 2.6, some features may not work");
    }
    
    // Check maximum version constraints
    if (HadoopUtils.isMaxHadoopVersion(3, 2)) {
        System.out.println("Hadoop version is 3.2 or lower");
    } else {
        System.out.println("Hadoop version is above 3.2, compatibility not guaranteed");
    }
    
    // Version range check
    if (HadoopUtils.isMinHadoopVersion(2, 7) && HadoopUtils.isMaxHadoopVersion(3, 1)) {
        System.out.println("Hadoop version is in supported range (2.7 - 3.1)");
        // Enable version-specific features
    }
    
} catch (FlinkRuntimeException e) {
    System.err.println("Cannot determine Hadoop version: " + e.getMessage());
}
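Both checks compare against the version of the Hadoop artifacts on the classpath. If you need the raw version string, Hadoop's own VersionInfo utility reports it (shown as an illustration; this is a standard Hadoop API, not part of HadoopUtils):

import org.apache.hadoop.util.VersionInfo;

// Version string of the Hadoop artifacts on the classpath, e.g. "3.3.4"
System.out.println("Hadoop version: " + VersionInfo.getVersion());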

HadoopConfigLoader

Advanced configuration loader that provides lazy loading and caching of Hadoop configurations.

/**
 * Lazily loads a Hadoop configuration from a resettable Flink configuration.
 * Caches the loaded configuration and reloads it after the Flink configuration changes.
 */
public class HadoopConfigLoader {
    /**
     * Creates a configuration loader with the given mapping rules.
     * @param flinkConfigPrefixes prefixes of Flink configuration keys to forward
     * @param mirroredConfigKeys pairs of {Flink key, Hadoop key} copied verbatim
     * @param hadoopConfigPrefix prefix prepended to forwarded keys in the Hadoop configuration
     * @param packagePrefixesToShade package prefixes of class-name values to rewrite to the shaded namespace
     * @param configKeysToShade configuration keys whose class-name values are shaded
     * @param flinkShadingPrefix prefix used when shading class names
     */
    public HadoopConfigLoader(String[] flinkConfigPrefixes, 
                             String[][] mirroredConfigKeys,
                             String hadoopConfigPrefix,
                             Set<String> packagePrefixesToShade,
                             Set<String> configKeysToShade,
                             String flinkShadingPrefix);
    
    /**
     * Sets the Flink configuration and triggers reload on next access.
     * @param config new Flink configuration
     */
    public void setFlinkConfig(Configuration config);
    
    /**
     * Gets the current Hadoop configuration, loading it if necessary.
     * @return loaded Hadoop Configuration object
     */
    public org.apache.hadoop.conf.Configuration getOrLoadHadoopConfig();
}

Usage Examples:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.HadoopConfigLoader;
import java.util.HashSet;
import java.util.Set;

// Create configuration loader
String[] flinkPrefixes = {"flink.hadoop."};
String[][] mirroredKeys = {{"fs.defaultFS", "fs.default.name"}};
Set<String> shadedPackages = new HashSet<>();
shadedPackages.add("org.apache.hadoop");

HadoopConfigLoader loader = new HadoopConfigLoader(
    flinkPrefixes,
    mirroredKeys,
    "hadoop.",
    shadedPackages,
    new HashSet<>(),
    "org.apache.flink.shaded."
);

// Set Flink configuration
Configuration flinkConfig = new Configuration();
flinkConfig.setString("flink.hadoop.fs.defaultFS", "hdfs://namenode:9000");
loader.setFlinkConfig(flinkConfig);

// Load the Hadoop configuration (cached after the first load).
// Forwarded keys are re-prefixed: a key matching a Flink prefix is copied
// with that prefix replaced by the Hadoop prefix, so here
// "flink.hadoop.fs.defaultFS" ends up as "hadoop.fs.defaultFS".
org.apache.hadoop.conf.Configuration hadoopConfig = loader.getOrLoadHadoopConfig();

// The configuration stays cached until setFlinkConfig is called again
org.apache.hadoop.conf.Configuration sameConfig = loader.getOrLoadHadoopConfig();
// Returns the cached instance
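Calling setFlinkConfig again invalidates the cache, so the next getOrLoadHadoopConfig() call rebuilds the Hadoop configuration from the new settings (the namenode address below is a placeholder):

// Updating the Flink configuration resets the cached Hadoop configuration ...
Configuration updatedConfig = new Configuration();
updatedConfig.setString("flink.hadoop.fs.defaultFS", "hdfs://standby-namenode:9000");
loader.setFlinkConfig(updatedConfig);

// ... and the next access reloads it
org.apache.hadoop.conf.Configuration reloadedConfig = loader.getOrLoadHadoopConfig();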

Security Token Management

Advanced token management for secure Hadoop clusters.

import java.util.Collection;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;

// Check and manage delegation tokens
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();

if (HadoopUtils.hasHDFSDelegationToken(ugi)) {
    // Get all tokens
    Collection<Token<? extends TokenIdentifier>> tokens = ugi.getTokens();
    
    for (Token<? extends TokenIdentifier> token : tokens) {
        if (token.getKind().equals(HadoopUtils.HDFS_DELEGATION_TOKEN_KIND)) {
            System.out.println("HDFS Token: " + token.toString());
            System.out.println("Token service: " + token.getService());
            
            // Check the token's maximum lifetime; delegation token
            // identifiers expose getMaxDate() after a downcast
            org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier identifier =
                (org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier)
                    token.decodeIdentifier();
            if (System.currentTimeMillis() > identifier.getMaxDate()) {
                System.out.println("Token has passed its max lifetime and can no longer be renewed");
            }
        }
    }
}
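Before a token reaches its max lifetime it can be renewed with Hadoop's standard Token.renew API. A sketch continuing the loop above; it assumes a Hadoop Configuration named hadoopConfig is in scope (e.g. from getHadoopConfiguration), and renewal only succeeds when invoked by the token's designated renewer:

try {
    // Returns the new expiration time on success
    long newExpiration = token.renew(hadoopConfig);
    System.out.println("Token renewed until: " + new java.util.Date(newExpiration));
} catch (Exception e) {
    System.err.println("Token renewal failed: " + e.getMessage());
}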

Configuration Integration Example

Complete example showing integration between Flink and Hadoop configurations:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.hadoop.security.UserGroupInformation;

public class HadoopIntegrationExample {
    
    public void setupHadoopIntegration() {
        // Create Flink configuration with Hadoop properties.
        // Only keys carrying the "flink.hadoop." prefix are forwarded to Hadoop.
        Configuration flinkConfig = new Configuration();

        // HDFS high-availability configuration (the logical nameservice is the default FS)
        flinkConfig.setString("flink.hadoop.fs.defaultFS", "hdfs://mycluster");
        flinkConfig.setString("flink.hadoop.dfs.nameservices", "mycluster");
        flinkConfig.setString("flink.hadoop.dfs.ha.namenodes.mycluster", "nn1,nn2");
        flinkConfig.setString("flink.hadoop.dfs.namenode.rpc-address.mycluster.nn1", "namenode1:8020");
        flinkConfig.setString("flink.hadoop.dfs.namenode.rpc-address.mycluster.nn2", "namenode2:8020");

        // Security configuration
        flinkConfig.setString("flink.hadoop.hadoop.security.authentication", "kerberos");
        flinkConfig.setString("flink.hadoop.hadoop.security.authorization", "true");

        // Performance tuning
        flinkConfig.setString("flink.hadoop.dfs.client.read.shortcircuit", "true");
        flinkConfig.setString("flink.hadoop.dfs.client.cache.drop.behind.writes", "true");
        
        // Convert to Hadoop configuration
        org.apache.hadoop.conf.Configuration hadoopConfig = 
            HadoopUtils.getHadoopConfiguration(flinkConfig);
        
        // Validate setup
        try {
            if (HadoopUtils.isMinHadoopVersion(2, 6)) {
                System.out.println("Hadoop version compatible");
                
                // Check security
                UserGroupInformation.setConfiguration(hadoopConfig);
                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
                
                if (HadoopUtils.isKerberosSecurityEnabled(ugi)) {
                    if (HadoopUtils.areKerberosCredentialsValid(ugi, true)) {
                        System.out.println("Security setup valid");
                    } else {
                        System.err.println("Invalid Kerberos credentials");
                    }
                }
                
                // Create file system
                org.apache.hadoop.fs.FileSystem fs = 
                    org.apache.hadoop.fs.FileSystem.get(hadoopConfig);
                System.out.println("Successfully connected to: " + fs.getUri());
                
            } else {
                throw new RuntimeException("Unsupported Hadoop version");
            }
            
        } catch (Exception e) {
            System.err.println("Hadoop integration failed: " + e.getMessage());
        }
    }
}
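Once the FileSystem is obtained, the usual Hadoop operations apply. A brief sketch listing the root directory; it assumes the hadoopConfig built above, and the path is a placeholder:

org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get(hadoopConfig);
for (org.apache.hadoop.fs.FileStatus status : fs.listStatus(new org.apache.hadoop.fs.Path("/"))) {
    System.out.println(status.getPath() + " (" + status.getLen() + " bytes)");
}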

Types

// Hadoop configuration classes
public class Configuration {
    public String get(String name);
    public void set(String name, String value);
    public boolean getBoolean(String name, boolean defaultValue);
    public int getInt(String name, int defaultValue);
}

// Security classes
public abstract class UserGroupInformation {
    public static UserGroupInformation getCurrentUser() throws IOException;
    public static void loginUserFromKeytab(String user, String path) throws IOException;
    public Collection<Token<? extends TokenIdentifier>> getTokens();
    public boolean hasKerberosCredentials();
}

// Token classes
public class Token<T extends TokenIdentifier> {
    public Text getKind();
    public Text getService();
    public T decodeIdentifier() throws IOException;
}

public class Text {
    public Text(String string);
    public String toString();
    public boolean equals(Object o);
}

// Exception types
public class FlinkRuntimeException extends RuntimeException {
    public FlinkRuntimeException(String message);
    public FlinkRuntimeException(String message, Throwable cause);
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-hadoop-fs
