Utility Classes

Helper methods and utility functions that simplify common Hadoop integration tasks, including parameter parsing, configuration management, and common integration patterns.

Capabilities

HadoopUtils

Utility class providing helper methods for working with Apache Hadoop libraries, particularly focused on parameter parsing and configuration management.

/**
 * Utility class to work with Apache Hadoop libraries
 */
public class HadoopUtils {
    
    /**
     * Parses command-line arguments using Hadoop's GenericOptionsParser and returns a ParameterTool
     * 
     * This method leverages Hadoop's GenericOptionsParser to parse command-line arguments,
     * but only extracts -D property definitions in the form "-D key=value". Other Hadoop
     * options like -files, -libjars, etc. are processed by GenericOptionsParser but not
     * included in the returned ParameterTool.
     * 
     * @param args Command-line arguments to parse
     * @return ParameterTool containing parsed -D property definitions
     * @throws IOException if argument parsing fails or configuration access fails
     */
    public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException;
}

Usage Examples:

import org.apache.flink.hadoopcompatibility.HadoopUtils;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.api.java.ExecutionEnvironment;

public class FlinkHadoopJob {
    public static void main(String[] args) throws Exception {
        // Parse Hadoop-style command line arguments
        ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);
        
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        
        // Access Hadoop properties set via -D 
        String hadoopProperty = params.get("mapred.max.split.size", "67108864");
        String separator = params.get("mapred.textoutputformat.separator", "\t");
        
        // Note: params only contains -D properties. Other arguments like --input, --output
        // would need to be parsed separately using standard Java argument parsing
        
        // ... rest of Flink application
    }
}

Command Line Integration

The HadoopUtils.paramsFromGenericOptionsParser method accepts the standard Hadoop command-line options recognized by GenericOptionsParser, though only the -D property definitions end up in the returned ParameterTool:

# Example command line usage
java -cp flink-app.jar FlinkHadoopJob \
  -D mapred.max.split.size=134217728 \
  -D mapred.textoutputformat.separator="|" \
  --input hdfs://data/input \
  --output hdfs://data/output \
  --parallelism 4

# Note: -files and -libjars options are processed by GenericOptionsParser 
# but are not returned in the ParameterTool. They affect the Hadoop runtime environment.

Supported Options:

  • -D property=value: Set Hadoop configuration properties (extracted into ParameterTool)

Note: While GenericOptionsParser processes other Hadoop options like -files, -libjars, and -archives, only -D property definitions are extracted and returned in the ParameterTool. Other options are handled by Hadoop's infrastructure but not accessible through this method's return value.

Flink-specific Parameters:

Flink-style parameters such as --input and --output can be mixed with Hadoop options on the command line, but they are not included in the ParameterTool returned by this method and must be parsed separately:

// Access Hadoop -D properties (only -D options are extracted)
ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

// Hadoop configuration properties (set via -D)
String splitSize = params.get("mapred.max.split.size");
String separator = params.get("mapred.textoutputformat.separator");

// Note: Non-D parameters like --input, --output are NOT included in the returned ParameterTool
// You would need to parse those separately, for example as in the sketch that follows
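
If the application needs both kinds of arguments, one option (not part of HadoopUtils itself) is to parse the arguments that GenericOptionsParser did not consume with ParameterTool.fromArgs and merge the two tools. A minimal sketch, assuming the standard --key value convention for the non-Hadoop arguments:

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.hadoopcompatibility.HadoopUtils;
import org.apache.hadoop.util.GenericOptionsParser;

// -D properties extracted by HadoopUtils
ParameterTool hadoopParams = HadoopUtils.paramsFromGenericOptionsParser(args);

// Arguments that GenericOptionsParser did not consume (e.g. --input, --output)
String[] remaining = new GenericOptionsParser(args).getRemainingArgs();
ParameterTool appParams = ParameterTool.fromArgs(remaining);

// One ParameterTool containing both the -D properties and the application flags
ParameterTool params = hadoopParams.mergeWith(appParams);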

Configuration Bridge

The parsed parameters can be used to rebuild a Hadoop Configuration for use with Hadoop InputFormats and OutputFormats, bridging the Hadoop and Flink configuration systems:

import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.hadoopcompatibility.HadoopUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

// Parse arguments with Hadoop GenericOptionsParser
ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

// Extract Hadoop configuration for use with InputFormats/OutputFormats
Configuration hadoopConf = new Configuration();
for (String key : params.toMap().keySet()) {
    if (key.startsWith("mapred.") || key.startsWith("fs.") || key.startsWith("dfs.")) {
        hadoopConf.set(key, params.get(key));
    }
}

// Use with Hadoop InputFormats
JobConf jobConf = new JobConf(hadoopConf);
HadoopInputFormat<LongWritable, Text> inputFormat = new HadoopInputFormat<>(
    new TextInputFormat(), LongWritable.class, Text.class, jobConf
);

Error Handling

Failures in paramsFromGenericOptionsParser surface as IOException; handle them at application startup along with parameter validation:

Configuration Errors:

try {
    ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);
} catch (IOException e) {
    // Handle configuration parsing errors
    System.err.println("Failed to parse Hadoop configuration: " + e.getMessage());
    // Common causes:
    // - Invalid -D property syntax
    // - Missing required configuration files
    // - File system access issues for -files/-libjars
}

Parameter Validation:

ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

// Validate required parameters (present only if supplied via -D, or merged in
// from --input/--output as shown earlier)
try {
    String input = params.getRequired("input");
    String output = params.getRequired("output");
} catch (RuntimeException e) {
    System.err.println("Missing required parameter: " + e.getMessage());
    printUsage();
    System.exit(1);
}

// Validate parameter formats (NumberFormatException is a subclass of IllegalArgumentException)
try {
    int parallelism = params.getInt("parallelism", 1);
    if (parallelism <= 0) {
        throw new IllegalArgumentException("Parallelism must be positive");
    }
} catch (IllegalArgumentException e) {
    System.err.println("Invalid parallelism value: " + params.get("parallelism"));
    System.exit(1);
}

Best Practices

Application Structure

Recommended pattern for Flink applications that need Hadoop integration:

public class FlinkHadoopApplication {
    public static void main(String[] args) throws Exception {
        // 1. Parse arguments using HadoopUtils
        ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);
        
        // 2. Set up Flink environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        
        // 3. Create Hadoop configuration from parsed parameters
        Configuration hadoopConf = createHadoopConfiguration(params);
        
        // 4. Build and execute Flink pipeline
        buildPipeline(env, params, hadoopConf);
        env.execute("Flink Hadoop Job");
    }
    
    private static Configuration createHadoopConfiguration(ParameterTool params) {
        Configuration conf = new Configuration();
        // Extract Hadoop-specific properties
        params.toMap().entrySet().stream()
            .filter(entry -> isHadoopProperty(entry.getKey()))
            .forEach(entry -> conf.set(entry.getKey(), entry.getValue()));
        return conf;
    }
    
    private static boolean isHadoopProperty(String key) {
        return key.startsWith("fs.") || key.startsWith("dfs.") || 
               key.startsWith("mapred.") || key.startsWith("yarn.");
    }
}

Parameter Naming Conventions

Follow consistent naming patterns for command-line parameters:

// Recommended parameter names
String inputPath = params.getRequired("input");        // or --input-path
String outputPath = params.getRequired("output");      // or --output-path
int parallelism = params.getInt("parallelism", 1);     // or --parallelism
boolean verbose = params.getBoolean("verbose", false); // or --verbose

// Hadoop configuration via -D flags
// -D fs.defaultFS=hdfs://namenode:9000
// -D mapred.max.split.size=134217728
// -D dfs.blocksize=268435456

Testing Integration

Example unit test for the parameter-parsing behavior:

@Test
public void testParameterParsing() throws IOException {
    String[] args = {
        "-D", "mapred.max.split.size=1048576",
        "-D", "fs.defaultFS=hdfs://localhost:9000",
        "--input", "/test/input",
        "--output", "/test/output"
    };
    
    ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);
    
    assertEquals("1048576", params.get("mapred.max.split.size"));
    assertEquals("hdfs://localhost:9000", params.get("fs.defaultFS"));
    
    // Non -D arguments are not extracted into the returned ParameterTool
    assertFalse(params.has("input"));
    assertFalse(params.has("output"));
}

Performance Considerations

Configuration Overhead

  • Parse once: Call paramsFromGenericOptionsParser once at application startup
  • Cache results: Store ParameterTool and Configuration objects for reuse (see the sketch after this list)
  • Avoid repeated parsing: Don't re-parse arguments in loops or transformations
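
A minimal sketch of the parse-once pattern: the ParameterTool registered with setGlobalJobParameters() in main() can be retrieved inside rich functions instead of re-parsing the arguments (the class and key names below are illustrative):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;

public class SplitSizeAwareMapper extends RichMapFunction<String, String> {
    private long maxSplitSize;

    @Override
    public void open(Configuration parameters) {
        // Reuse the ParameterTool registered once via env.getConfig().setGlobalJobParameters(params)
        ParameterTool params = (ParameterTool)
            getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        maxSplitSize = params.getLong("mapred.max.split.size", 67108864L);
    }

    @Override
    public String map(String value) {
        // ... use maxSplitSize; the arguments are never re-parsed here
        return value;
    }
}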

Memory Usage

  • Configuration size: Large Hadoop configurations can impact memory usage
  • Parameter scope: Use global job parameters judiciously to avoid serialization overhead; values needed by a single operator can be passed per-operator instead (see the sketch after this list)
  • String interning: Hadoop configuration keys are often repeated, benefiting from string interning
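
Where only one operator needs a couple of values, the DataSet API's withParameters() can pass a small Configuration to just that operator rather than shipping the whole parameter set as global job parameters. A sketch, assuming a DataSet<String> named lines and the params tool from earlier:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.configuration.Configuration;

// Pass only the single value this operator needs
Configuration operatorConf = new Configuration();
operatorConf.setString("separator", params.get("mapred.textoutputformat.separator", "\t"));

DataSet<String> formatted = lines
    .map(new RichMapFunction<String, String>() {
        private String separator;

        @Override
        public void open(Configuration parameters) {
            // withParameters() makes operatorConf visible here (DataSet API only)
            separator = parameters.getString("separator", "\t");
        }

        @Override
        public String map(String value) {
            return value.replace(",", separator);
        }
    })
    .withParameters(operatorConf);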

Distributed Cache

Proper usage of Hadoop's distributed cache features:

# Add configuration files to all task nodes
java -cp flink-app.jar FlinkHadoopJob \
  -files hdfs://config/core-site.xml,hdfs://config/hdfs-site.xml \
  --input hdfs://data/input \
  --output hdfs://data/output

Access distributed cache files in your Flink application:

import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.hadoop.conf.Configuration;

// Files specified with -files are copied into the task's working directory
java.nio.file.Path configFile = Paths.get("core-site.xml");
if (Files.exists(configFile)) {
    // Add the local file as a configuration resource; addResource(String) would
    // look up the classpath, so pass a Hadoop Path to point at the file itself
    Configuration conf = new Configuration();
    conf.addResource(new org.apache.hadoop.fs.Path(configFile.toAbsolutePath().toString()));
}