
tessl/maven-org-apache-flink--flink-hadoop-compatibility-2-11

Hadoop compatibility layer for Apache Flink providing input/output format wrappers and utilities to integrate Hadoop MapReduce with Flink's DataSet and DataStream APIs


Configuration Utilities

The Configuration Utilities capability provides helper functions for Hadoop configuration management, command-line argument parsing, and seamless integration with existing Hadoop tooling and workflows.

Overview

The configuration utilities enable smooth integration between Flink applications and existing Hadoop infrastructure by providing tools to parse Hadoop-style command-line arguments and manage Hadoop configuration objects within Flink programs.

HadoopUtils Class

Primary utility class for Hadoop configuration management, provided in the org.apache.flink.hadoopcompatibility package.

@Public
public class HadoopUtils {
    
    /**
     * Returns ParameterTool for arguments parsed by GenericOptionsParser.
     * 
     * @param args Input array arguments parsable by GenericOptionsParser
     * @return A ParameterTool containing the parsed parameters
     * @throws IOException If arguments cannot be parsed by GenericOptionsParser
     */
    public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException;
}

Usage Examples

Basic Command-Line Argument Parsing

import org.apache.flink.hadoopcompatibility.HadoopUtils;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.api.java.ExecutionEnvironment;

public static void main(String[] args) throws Exception {
    // Parse Hadoop-style command line arguments
    // Supports arguments like: -D key=value, -conf config.xml, -fs hdfs://namenode:port
    ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);
    
    // Access parsed parameters
    String inputPath = params.get("input", "hdfs://localhost:9000/input");
    String outputPath = params.get("output", "hdfs://localhost:9000/output");
    int parallelism = params.getInt("parallelism", 1);
    
    // Configure Flink environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(parallelism);
    
    // Use parameters in your Flink job
    System.out.println("Input path: " + inputPath);
    System.out.println("Output path: " + outputPath);
}

Advanced Parameter Handling

import java.io.IOException;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.hadoopcompatibility.HadoopUtils;

public class HadoopFlinkJob {
    public static void main(String[] args) throws Exception {
        try {
            // Parse Hadoop command-line arguments
            ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);
            
            // Check for required parameters
            if (!params.has("input")) {
                System.err.println("Missing required parameter: input");
                printUsage();
                System.exit(1);
            }
            
            // Extract configuration parameters
            String inputPath = params.getRequired("input");
            String outputPath = params.get("output", inputPath + "_output");
            boolean enableCompression = params.getBoolean("compress", false);
            String compressionCodec = params.get("codec", "gzip");
            int bufferSize = params.getInt("buffer.size", 65536);
            
            // Log configuration
            System.out.println("Job Configuration:");
            System.out.println("  Input Path: " + inputPath);
            System.out.println("  Output Path: " + outputPath);
            System.out.println("  Compression: " + enableCompression);
            if (enableCompression) {
                System.out.println("  Codec: " + compressionCodec);
            }
            System.out.println("  Buffer Size: " + bufferSize);
            
            // Run Flink job with parsed parameters
            runFlinkJob(params);
            
        } catch (IOException e) {
            System.err.println("Failed to parse command line arguments: " + e.getMessage());
            printUsage();
            System.exit(1);
        }
    }
    
    private static void printUsage() {
        System.out.println("Usage:");
        System.out.println("  HadoopFlinkJob -D input=<path> [-D output=<path>] [options]");
        System.out.println();
        System.out.println("Required parameters:");
        System.out.println("  -D input=<path>       Input data path");
        System.out.println();
        System.out.println("Optional parameters:");
        System.out.println("  -D output=<path>      Output data path (default: input_output)");
        System.out.println("  -D compress=<bool>    Enable compression (default: false)");
        System.out.println("  -D codec=<string>     Compression codec (default: gzip)");
        System.out.println("  -D buffer.size=<int>  Buffer size in bytes (default: 65536)");
        System.out.println("  -D parallelism=<int>  Job parallelism (default: 1)");
    }
    
    private static void runFlinkJob(ParameterTool params) throws Exception {
        // Implementation of the actual Flink job
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        
        // Make parameters available to all operators
        env.getConfig().setGlobalJobParameters(params);
        
        // Configure based on parameters
        if (params.has("parallelism")) {
            env.setParallelism(params.getInt("parallelism"));
        }
        
        // Your Flink job logic here
        // ...
        
        env.execute("Hadoop-Flink Integration Job");
    }
}
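
Because the parameters are registered with setGlobalJobParameters, they can be read back inside any rich user function on the workers. The sketch below is illustrative rather than part of this library (the class name and the "prefix" key are hypothetical); it uses the standard Flink pattern of casting the global job parameters back to ParameterTool:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;

public class PrefixMapper extends RichMapFunction<String, String> {

    private String prefix;

    @Override
    public void open(Configuration parameters) {
        // Retrieve the ParameterTool registered via env.getConfig().setGlobalJobParameters(params)
        ParameterTool params = (ParameterTool) getRuntimeContext()
            .getExecutionConfig()
            .getGlobalJobParameters();
        prefix = params.get("prefix", ">> ");
    }

    @Override
    public String map(String value) {
        return prefix + value;
    }
}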

Integration with Hadoop Configuration

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.hadoopcompatibility.HadoopUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class ConfigurationIntegration {
    public static void main(String[] args) throws Exception {
        // Parse Hadoop arguments
        ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);
        
        // Create Hadoop configuration from parsed parameters
        JobConf jobConf = createJobConf(params);
        
        // Use configuration with Hadoop InputFormats
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        
        DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
            HadoopInputs.readHadoopFile(
                new TextInputFormat(),
                LongWritable.class,
                Text.class,
                params.getRequired("input"),
                jobConf
            )
        );
        
        // Process data and write output
        // ...
    }
    
    private static JobConf createJobConf(ParameterTool params) {
        JobConf conf = new JobConf();
        
        // Set HDFS configuration
        if (params.has("fs.defaultFS")) {
            conf.set("fs.defaultFS", params.get("fs.defaultFS"));
        }
        
        // Set input/output formats
        if (params.has("mapreduce.inputformat.class")) {
            conf.set("mapreduce.inputformat.class", params.get("mapreduce.inputformat.class"));
        }
        
        // Set compression settings
        if (params.getBoolean("mapred.output.compress", false)) {
            conf.setBoolean("mapred.output.compress", true);
            conf.set("mapred.output.compression.codec", 
                     params.get("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"));
        }
        
        // Set custom properties
        for (String key : params.getProperties().stringPropertyNames()) {
            if (key.startsWith("hadoop.")) {
                String hadoopKey = key.substring("hadoop.".length());
                conf.set(hadoopKey, params.get(key));
            }
        }
        
        return conf;
    }
}
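
The same JobConf can also be handed to a Hadoop OutputFormat so that the compression settings configured above are honored when results are written. The following is a hedged sketch using the mapred TextOutputFormat and Flink's HadoopOutputFormat wrapper (the exact package of HadoopOutputFormat varies between Flink versions; the method signature and the result DataSet are assumptions for illustration):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;

public static void writeWithHadoopFormat(DataSet<Tuple2<Text, IntWritable>> result,
                                         JobConf jobConf,
                                         String outputPath) {
    // As in plain MapReduce, the output path is set on the JobConf
    FileOutputFormat.setOutputPath(jobConf, new Path(outputPath));

    // The JobConf carries the compression codec and other output settings
    HadoopOutputFormat<Text, IntWritable> hadoopOutput =
        new HadoopOutputFormat<>(new TextOutputFormat<Text, IntWritable>(), jobConf);

    result.output(hadoopOutput);
}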

Command-Line Examples

The utility supports standard Hadoop command-line arguments:

# Basic usage with input/output paths
java -cp flink-job.jar HadoopFlinkJob \
  -D input=hdfs://namenode:9000/data/input \
  -D output=hdfs://namenode:9000/data/output

# With compression settings
java -cp flink-job.jar HadoopFlinkJob \
  -D input=hdfs://namenode:9000/data/input \
  -D output=hdfs://namenode:9000/data/output \
  -D mapred.output.compress=true \
  -D mapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec

# With custom Hadoop configuration
java -cp flink-job.jar HadoopFlinkJob \
  -conf /path/to/hadoop/conf/core-site.xml \
  -conf /path/to/hadoop/conf/hdfs-site.xml \
  -D input=hdfs://namenode:9000/data/input \
  -D output=hdfs://namenode:9000/data/output \
  -D parallelism=8

# With file system specification
java -cp flink-job.jar HadoopFlinkJob \
  -fs hdfs://namenode:9000 \
  -D input=/data/input \
  -D output=/data/output

# With custom properties
java -cp flink-job.jar HadoopFlinkJob \
  -D input=hdfs://namenode:9000/data/input \
  -D hadoop.io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec \
  -D hadoop.mapreduce.job.queuename=production \
  -D custom.batch.size=1000

Scala Integration

import java.io.IOException

import org.apache.flink.hadoopcompatibility.HadoopUtils
import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

object ScalaHadoopJob {
  def main(args: Array[String]): Unit = {
    try {
      // Parse Hadoop-style arguments
      val params = HadoopUtils.paramsFromGenericOptionsParser(args)
      
      // Extract parameters with default values
      val inputPath = params.getRequired("input")
      val outputPath = params.get("output", s"${inputPath}_output")
      val parallelism = params.getInt("parallelism", 4)
      
      // Configure Flink environment
      val env = ExecutionEnvironment.getExecutionEnvironment
      env.setParallelism(parallelism)
      env.getConfig.setGlobalJobParameters(params)
      
      // Log configuration
      println(s"Input: $inputPath")
      println(s"Output: $outputPath")
      println(s"Parallelism: $parallelism")
      
      // Run job with configuration
      runJob(env, params)
      
    } catch {
      case e: IOException =>
        Console.err.println(s"Failed to parse arguments: ${e.getMessage}")
        printUsage()
        sys.exit(1)
    }
  }
  
  def runJob(env: ExecutionEnvironment, params: ParameterTool): Unit = {
    // Job implementation using parsed parameters
    val inputPath = params.getRequired("input")
    val outputPath = params.get("output")
    
    // Create input with configuration
    val input = env.createInput(
      HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        classOf[LongWritable],
        classOf[Text],
        inputPath
      )
    )
    
    // Process and output
    val result = input
      .map(_._2.toString)
      .filter(_.nonEmpty)
      .map((_, 1))
      .groupBy(0)
      .sum(1)
    
    // Write output if specified
    if (params.has("output")) {
      // Configure and write output
      result.writeAsText(outputPath)
    } else {
      result.print()
    }
    
    env.execute("Scala Hadoop Integration Job")
  }
  
  def printUsage(): Unit = {
    println("Usage: ScalaHadoopJob -D input=<path> [options]")
    println("Options:")
    println("  -D input=<path>       Input data path (required)")
    println("  -D output=<path>      Output data path (optional)")
    println("  -D parallelism=<int>  Job parallelism (default: 4)")
  }
}

Configuration File Integration

import java.io.File;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.hadoopcompatibility.HadoopUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class ConfigFileIntegration {
    public static void main(String[] args) throws Exception {
        // Parse command line arguments
        ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);
        
        // Load Hadoop configuration files if specified
        Configuration hadoopConf = loadHadoopConfiguration(params);
        
        // Convert to JobConf for use with Flink-Hadoop integration
        JobConf jobConf = new JobConf(hadoopConf);
        
        // Override with command-line parameters
        applyParameterOverrides(jobConf, params);
        
        // Use in Flink job
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        
        DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
            HadoopInputs.createHadoopInput(
                new TextInputFormat(),
                LongWritable.class,
                Text.class,
                jobConf
            )
        );
        
        // Process data...
    }
    
    private static Configuration loadHadoopConfiguration(ParameterTool params) {
        Configuration conf = new Configuration();
        
        // Load core Hadoop configuration files
        String hadoopConfDir = params.get("hadoop.conf.dir", 
                                         System.getenv("HADOOP_CONF_DIR"));
        
        if (hadoopConfDir != null) {
            File confDir = new File(hadoopConfDir);
            if (confDir.exists() && confDir.isDirectory()) {
                // Load standard Hadoop configuration files
                loadConfigFile(conf, new File(confDir, "core-site.xml"));
                loadConfigFile(conf, new File(confDir, "hdfs-site.xml"));
                loadConfigFile(conf, new File(confDir, "mapred-site.xml"));
                loadConfigFile(conf, new File(confDir, "yarn-site.xml"));
            }
        }
        
        // Load additional config files specified via command line
        String[] configFiles = params.get("conf", "").split(",");
        for (String configFile : configFiles) {
            if (!configFile.trim().isEmpty()) {
                loadConfigFile(conf, new File(configFile.trim()));
            }
        }
        
        return conf;
    }
    
    private static void loadConfigFile(Configuration conf, File configFile) {
        if (configFile.exists() && configFile.isFile()) {
            try {
                conf.addResource(configFile.toURI().toURL());
                System.out.println("Loaded configuration from: " + configFile.getAbsolutePath());
            } catch (Exception e) {
                System.err.println("Failed to load config file " + configFile + ": " + e.getMessage());
            }
        }
    }
    
    private static void applyParameterOverrides(JobConf jobConf, ParameterTool params) {
        // Apply command-line parameter overrides
        for (String key : params.getProperties().stringPropertyNames()) {
            String value = params.get(key);
            
            // Skip Flink-specific parameters
            if (!key.startsWith("flink.")) {
                jobConf.set(key, value);
                System.out.println("Override: " + key + " = " + value);
            }
        }
    }
}

Error Handling

public class RobustConfigurationHandling {
    public static ParameterTool parseArgumentsSafely(String[] args) {
        try {
            return HadoopUtils.paramsFromGenericOptionsParser(args);
        } catch (IOException e) {
            System.err.println("Error parsing command line arguments: " + e.getMessage());
            System.err.println("This may be due to:");
            System.err.println("  - Invalid argument format");
            System.err.println("  - Missing configuration files");
            System.err.println("  - Insufficient permissions");
            
            // Fall back to Flink's own argument parser. Note that ParameterTool.fromArgs
            // expects --key value / -key value style arguments, not Hadoop's "-D key=value",
            // so this fallback only helps when Flink-style arguments were supplied.
            return ParameterTool.fromArgs(args);
        } catch (IllegalArgumentException e) {
            System.err.println("Invalid arguments provided: " + e.getMessage());
            printUsage();
            throw e;
        }
    }
    
    private static void printUsage() {
        System.out.println("Supported argument formats:");
        System.out.println("  -D key=value              Set configuration property");
        System.out.println("  -conf <file>              Load configuration from file");
        System.out.println("  -fs <filesystem>          Set default filesystem");
        System.out.println("  -jt <jobtracker>          Set job tracker address");
        System.out.println("  -files <files>            Specify files to be copied");
        System.out.println("  -archives <archives>      Specify archives to be copied");
    }
}

Best Practices

Parameter Validation

public static void validateParameters(ParameterTool params) throws IllegalArgumentException {
    // Check required parameters
    String[] requiredParams = {"input"};
    for (String param : requiredParams) {
        if (!params.has(param)) {
            throw new IllegalArgumentException("Missing required parameter: " + param);
        }
    }
    
    // Validate parameter values
    if (params.has("parallelism")) {
        int parallelism = params.getInt("parallelism");
        if (parallelism <= 0) {
            throw new IllegalArgumentException("Parallelism must be positive, got: " + parallelism);
        }
    }
    
    // Validate paths
    String inputPath = params.get("input");
    if (!inputPath.startsWith("hdfs://") && !inputPath.startsWith("file://") && !inputPath.startsWith("/")) {
        throw new IllegalArgumentException("Invalid input path format: " + inputPath);
    }
}
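
A minimal sketch of wiring this check into a job entry point (the surrounding main method is hypothetical); validating immediately after parsing fails fast, before any Flink resources are allocated:

public static void main(String[] args) throws Exception {
    ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

    // Fail fast on missing or malformed parameters before building the job graph
    validateParameters(params);

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setGlobalJobParameters(params);
    // ... define and execute the job
}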

Configuration Documentation

public static void printConfiguration(ParameterTool params) {
    System.out.println("=== Job Configuration ===");
    
    // Print all parameters sorted by key
    params.getProperties().stringPropertyNames().stream()
        .sorted()
        .forEach(key -> {
            String value = params.get(key);
            // Mask sensitive values
            if (key.toLowerCase().contains("password") || key.toLowerCase().contains("secret")) {
                value = "***";
            }
            System.out.println(String.format("  %-30s: %s", key, value));
        });
    
    System.out.println("========================");
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-hadoop-compatibility-2-11
