tessl/maven-org-apache-flink--flink-hadoop-compatibility

Apache Flink compatibility layer for integrating Hadoop InputFormats, OutputFormats, and MapReduce functions with Flink streaming and batch processing

Configuration Utilities

Utilities for parsing Hadoop-style command-line arguments and passing the resulting parameters to Hadoop and Flink configuration objects. The documented API is HadoopUtils.paramsFromGenericOptionsParser; the remaining sections show application-level patterns built on top of it.

Capabilities

Command-Line Parameter Parsing

Utility for parsing Hadoop-style command-line arguments using GenericOptionsParser.

/**
 * Returns ParameterTool for the arguments parsed by GenericOptionsParser
 * @param args Input array arguments that should be parsable by GenericOptionsParser
 * @return A ParameterTool containing the parsed parameters
 * @throws IOException If arguments cannot be parsed by GenericOptionsParser
 * @see GenericOptionsParser
 */
public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException;

Usage Example:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.HadoopUtils;
// Older Flink releases ship this class as org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.util.ParameterTool;

public class HadoopCompatibleFlinkJob {
    public static void main(String[] args) throws Exception {
        // Parse Hadoop-style generic options, for example:
        //   -D input.path=hdfs://namenode/in -D parallelism=4
        // Each -D key=value pair becomes one ParameterTool entry
        ParameterTool parameters = HadoopUtils.paramsFromGenericOptionsParser(args);
        
        // Access parsed parameters, falling back to defaults when a key is absent
        String inputPath = parameters.get("input.path", "hdfs://default/input");
        String outputPath = parameters.get("output.path", "hdfs://default/output");
        int parallelism = parameters.getInt("parallelism", 1);
        
        // Use parameters in Flink job configuration
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        
        // Create a Flink Configuration from the parsed parameters
        Configuration config = Configuration.fromMap(parameters.toMap());
        
        System.out.println("Input path: " + inputPath);
        System.out.println("Output path: " + outputPath);
        System.out.println("Parallelism: " + parallelism);
    }
}
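
To make the shape of the parsed result concrete, here is a stdlib-only sketch of how -D key=value pairs can be collected into a flat key/value map. It is a simplified stand-in written for illustration, not the Flink or Hadoop implementation; the class name DashDArgs and its handling of malformed pairs are assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: mimics the "-D key=value" convention with plain Java.
final class DashDArgs {
    private DashDArgs() {}

    /** Collects "-D key=value" and "-Dkey=value" pairs; stops at the first other argument. */
    static Map<String, String> parse(String[] args) {
        Map<String, String> result = new LinkedHashMap<>();
        int i = 0;
        while (i < args.length) {
            String pair;
            if (args[i].equals("-D") && i + 1 < args.length) {
                pair = args[i + 1];          // "-D key=value" (two tokens)
                i += 2;
            } else if (args[i].startsWith("-D") && args[i].length() > 2) {
                pair = args[i].substring(2); // "-Dkey=value" (one token)
                i += 1;
            } else {
                break; // first non -D argument: leave the rest to the application
            }
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected key=value but got: " + pair);
            }
            // Split on the first '=' only, so values may themselves contain '='
            result.put(pair.substring(0, eq), pair.substring(eq + 1));
        }
        return result;
    }
}
```

Calling DashDArgs.parse with the arguments -D input.path=/data/in -Dparallelism=4 extra gives a map with the two keys input.path and parallelism; the trailing extra argument is left out.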

Hadoop Configuration Integration

The parsed parameters can be copied into Hadoop configuration objects for either Hadoop API: JobConf for the older mapred API, or Job for the mapreduce API.

JobConf Integration:

import org.apache.flink.hadoopcompatibility.HadoopUtils;
import org.apache.flink.util.ParameterTool;
import org.apache.hadoop.mapred.JobConf;

// Create JobConf from parsed parameters
public static JobConf createJobConf(ParameterTool parameters) {
    JobConf jobConf = new JobConf();
    
    // Set common Hadoop properties from parameters
    if (parameters.has("mapred.job.name")) {
        jobConf.setJobName(parameters.get("mapred.job.name"));
    }
    
    if (parameters.has("mapred.reduce.tasks")) {
        jobConf.setNumReduceTasks(parameters.getInt("mapred.reduce.tasks"));
    }
    
    // Copy all parameters as properties
    for (String key : parameters.getProperties().stringPropertyNames()) {
        jobConf.set(key, parameters.get(key));
    }
    
    return jobConf;
}

// Usage example
String[] args = {"-D", "mapred.job.name=MyFlinkJob", "-D", "input.path=/data/input"};
ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);
JobConf jobConf = createJobConf(params);

Job (mapreduce API) Integration:

import java.io.IOException;
import org.apache.flink.hadoopcompatibility.HadoopUtils;
import org.apache.flink.util.ParameterTool;
import org.apache.hadoop.mapreduce.Job;

// Create Job from parsed parameters
public static Job createJob(ParameterTool parameters) throws IOException {
    Job job = Job.getInstance();
    
    // Set common mapreduce properties from parameters
    if (parameters.has("mapreduce.job.name")) {
        job.setJobName(parameters.get("mapreduce.job.name"));
    }
    
    if (parameters.has("mapreduce.job.reduces")) {
        job.setNumReduceTasks(parameters.getInt("mapreduce.job.reduces"));
    }
    
    // Copy all parameters as configuration properties
    for (String key : parameters.getProperties().stringPropertyNames()) {
        job.getConfiguration().set(key, parameters.get(key));
    }
    
    return job;
}

// Usage example
String[] args = {"-D", "mapreduce.job.name=MyFlinkJob", "-D", "output.path=/data/output"};
ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);
Job job = createJob(params);

Advanced Configuration Patterns

Environment Variable Integration

Combining command-line arguments with a properties file and environment variables. ParameterTool.mergeWith gives precedence to its argument on key conflicts, so the layers are merged from lowest to highest priority.

import java.io.IOException;
import org.apache.flink.hadoopcompatibility.HadoopUtils;
import org.apache.flink.util.ParameterTool;

public static ParameterTool createConfigurationFromMultipleSources(String[] args) throws IOException {
    // Highest priority: command-line arguments
    ParameterTool cmdLineParams = HadoopUtils.paramsFromGenericOptionsParser(args);
    
    // Lowest priority: OS environment variables
    // (use ParameterTool.fromSystemProperties() instead to read JVM system properties)
    ParameterTool allParams = ParameterTool.fromMap(System.getenv());
    
    // Middle priority: properties file, if one was specified
    if (cmdLineParams.has("config.file")) {
        ParameterTool fileParams = ParameterTool.fromPropertiesFile(cmdLineParams.get("config.file"));
        allParams = allParams.mergeWith(fileParams); // File overrides environment
    }
    
    // Command line overrides everything else
    return allParams.mergeWith(cmdLineParams);
}

// Usage: config.file is passed as a -D pair so that it ends up in the parsed parameters
String[] args = {"-D", "parallelism=4", "-D", "config.file=app.properties"};
ParameterTool config = createConfigurationFromMultipleSources(args);
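
The precedence logic is easier to check in isolation from Flink. The following stdlib-only sketch layers plain maps, assuming that each later layer overrides the earlier ones on key conflicts. LayeredConfig and merge are hypothetical names used only for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: layers configuration maps with "later wins" semantics.
final class LayeredConfig {
    private LayeredConfig() {}

    /** Merges layers given from lowest to highest priority; later layers win. */
    @SafeVarargs
    static Map<String, String> merge(Map<String, String>... layersLowToHigh) {
        Map<String, String> merged = new LinkedHashMap<>();
        for (Map<String, String> layer : layersLowToHigh) {
            merged.putAll(layer); // overwrites keys set by lower-priority layers
        }
        return merged;
    }
}
```

With this helper, LayeredConfig.merge(environment, fileProperties, commandLine) keeps a command-line value whenever the same key also appears in the file or the environment, and falls back to the lower layers for keys the command line does not set.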

Configuration Validation

Validating and processing configuration parameters.

public static class ConfigurationValidator {
    
    public static void validateRequiredParameters(ParameterTool params, String... requiredKeys) 
            throws IllegalArgumentException {
        for (String key : requiredKeys) {
            if (!params.has(key)) {
                throw new IllegalArgumentException("Required parameter missing: " + key);
            }
        }
    }
    
    // Checks only the URI scheme of each path. It does not verify that the path
    // exists; an existence check needs the Hadoop FileSystem API.
    public static void validatePaths(ParameterTool params) {
        if (params.has("input.path")) {
            String inputPath = params.get("input.path");
            if (!inputPath.startsWith("hdfs://") && !inputPath.startsWith("file://")) {
                throw new IllegalArgumentException("Invalid input path format: " + inputPath);
            }
        }
        
        if (params.has("output.path")) {
            String outputPath = params.get("output.path");
            if (!outputPath.startsWith("hdfs://") && !outputPath.startsWith("file://")) {
                throw new IllegalArgumentException("Invalid output path format: " + outputPath);
            }
        }
    }
    
    public static int getValidatedParallelism(ParameterTool params, int defaultValue, int maxValue) {
        int parallelism = params.getInt("parallelism", defaultValue);
        if (parallelism < 1 || parallelism > maxValue) {
            throw new IllegalArgumentException(
                "Parallelism must be between 1 and " + maxValue + ", got: " + parallelism);
        }
        return parallelism;
    }
}

// Usage example
public static void main(String[] args) throws Exception {
    ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);
    
    // Validate configuration
    ConfigurationValidator.validateRequiredParameters(params, "input.path", "output.path");
    ConfigurationValidator.validatePaths(params);
    int parallelism = ConfigurationValidator.getValidatedParallelism(params, 1, 100);
    
    // Use validated configuration
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(parallelism);
}
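
The validator above stops at the first problem it finds. When a job takes many parameters, it can be friendlier to report every problem in one pass. The sketch below shows that variant on a plain Map using only the standard library; ConfigCheck and findProblems are hypothetical names, not part of Flink or Hadoop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: collects all validation problems instead of throwing on the first.
final class ConfigCheck {
    private ConfigCheck() {}

    /** Returns every problem found; an empty list means the configuration passed. */
    static List<String> findProblems(Map<String, String> params, int maxParallelism, String... requiredKeys) {
        List<String> problems = new ArrayList<>();
        for (String key : requiredKeys) {
            String value = params.get(key);
            if (value == null || value.trim().isEmpty()) {
                problems.add("Required parameter missing: " + key);
            }
        }
        // "parallelism" is optional here; it is checked only when present
        String raw = params.get("parallelism");
        if (raw != null) {
            try {
                int parallelism = Integer.parseInt(raw.trim());
                if (parallelism < 1 || parallelism > maxParallelism) {
                    problems.add("Parallelism must be between 1 and " + maxParallelism + ", got: " + parallelism);
                }
            } catch (NumberFormatException e) {
                problems.add("Parallelism is not an integer: " + raw);
            }
        }
        return problems;
    }
}
```

A caller can print the returned list and exit if it is non-empty, which gives the user the full set of corrections to make before the next run.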

Resource Management Integration

Building a Hadoop Configuration from the parsed parameters and using the Hadoop FileSystem API to check input paths and prepare output paths.

import java.io.IOException;
import org.apache.flink.util.ParameterTool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public static class ResourceManager {
    
    public static Configuration createHadoopConfiguration(ParameterTool params) {
        Configuration conf = new Configuration();
        
        // Set HDFS configuration from parameters
        if (params.has("fs.defaultFS")) {
            conf.set("fs.defaultFS", params.get("fs.defaultFS"));
        }
        
        if (params.has("hadoop.conf.dir")) {
            // Load the site files from the Hadoop configuration directory as resources
            String confDir = params.get("hadoop.conf.dir");
            conf.addResource(new Path(confDir + "/core-site.xml"));
            conf.addResource(new Path(confDir + "/hdfs-site.xml"));
            conf.addResource(new Path(confDir + "/mapred-site.xml"));
        }
        
        // Copy all hadoop.*, fs.* and dfs.* properties
        for (String key : params.getProperties().stringPropertyNames()) {
            if (key.startsWith("hadoop.") || key.startsWith("fs.") || key.startsWith("dfs.")) {
                conf.set(key, params.get(key));
            }
        }
        
        return conf;
    }
    
    public static boolean validateInputPath(String inputPath, Configuration hadoopConf) throws IOException {
        Path path = new Path(inputPath);
        // Resolve the file system from the path's own scheme (hdfs://, file://, ...)
        // instead of always using fs.defaultFS
        FileSystem fs = path.getFileSystem(hadoopConf);
        return fs.exists(path);
    }
    
    public static void ensureOutputPath(String outputPath, Configuration hadoopConf) throws IOException {
        Path path = new Path(outputPath);
        FileSystem fs = path.getFileSystem(hadoopConf);
        
        if (fs.exists(path)) {
            // Caution: recursively deletes the existing output directory and its contents
            fs.delete(path, true);
        }
        
        // Create parent directories if they don't exist
        Path parent = path.getParent();
        if (parent != null && !fs.exists(parent)) {
            fs.mkdirs(parent);
        }
    }
}

Common Usage Patterns

Complete Job Configuration

Example of a complete Flink job using Hadoop utilities for configuration.

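import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.hadoopcompatibility.HadoopUtils;
import org.apache.flink.util.ParameterTool;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class CompleteHadoopFlinkJob {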
    
    public static void main(String[] args) throws Exception {
        // Parse Hadoop-style arguments
        ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);
        
        // Validate required parameters
        if (!params.has("input") || !params.has("output")) {
            System.err.println("Usage: program -D input=<path> -D output=<path> [options]");
            System.exit(1);
        }
        
        // Set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        
        // Configure parallelism
        if (params.has("parallelism")) {
            env.setParallelism(params.getInt("parallelism"));
        }
        
        // Create Hadoop configuration
        JobConf jobConf = new JobConf();
        jobConf.set("mapred.input.dir", params.get("input"));
        jobConf.set("mapred.output.dir", params.get("output"));
        
        // Create input format
        HadoopInputFormat<LongWritable, Text> inputFormat = 
            HadoopInputs.readHadoopFile(
                new TextInputFormat(),
                LongWritable.class,
                Text.class,
                params.get("input"),
                jobConf
            );
        
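        // Process data. WordCountMapper is a user-defined
        // FlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<Text, IntWritable>> (not shown).
        DataSet<Tuple2<LongWritable, Text>> input = env.createInput(inputFormat);
        DataSet<Tuple2<Text, IntWritable>> result = input
            .flatMap(new WordCountMapper())
            .groupBy(0)
            // The built-in sum() aggregation does not handle Hadoop Writable fields,
            // so the counts are added in an explicit reduce
            .reduce((a, b) -> Tuple2.of(a.f0, new IntWritable(a.f1.get() + b.f1.get())));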
        
        // Create output format
        JobConf outputJobConf = new JobConf();
        outputJobConf.set("mapred.output.dir", params.get("output"));
        outputJobConf.setOutputFormat(TextOutputFormat.class);
        
        HadoopOutputFormat<Text, IntWritable> outputFormat = 
            new HadoopOutputFormat<>(
                new TextOutputFormat<Text, IntWritable>(),
                outputJobConf
            );
        
        // Write results
        result.output(outputFormat);
        
        // Execute job
        env.execute("Hadoop Compatible Flink Job");
    }
}

Parameter Processing Utilities

Utility methods for common parameter processing tasks.

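import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public static class ParameterUtils {
    
    public static String[] getInputPaths(ParameterTool params) {
        // "".split(",") yields one empty string, so trim entries and drop blank ones
        return Arrays.stream(params.get("input", "").split(","))
            .map(String::trim)
            .filter(path -> !path.isEmpty())
            .toArray(String[]::new);
    }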
    
    public static String getOutputPath(ParameterTool params) {
        return params.getRequired("output");
    }
    
    public static Map<String, String> getHadoopProperties(ParameterTool params) {
        Map<String, String> hadoopProps = new HashMap<>();
        for (String key : params.getProperties().stringPropertyNames()) {
            if (key.startsWith("hadoop.") || key.startsWith("mapred.") || 
                key.startsWith("mapreduce.") || key.startsWith("fs.") || key.startsWith("dfs.")) {
                hadoopProps.put(key, params.get(key));
            }
        }
        return hadoopProps;
    }
    
    public static JobConf createJobConfFromParams(ParameterTool params) {
        JobConf jobConf = new JobConf();
        getHadoopProperties(params).forEach(jobConf::set);
        return jobConf;
    }
    
    public static Job createJobFromParams(ParameterTool params) throws IOException {
        Job job = Job.getInstance();
        getHadoopProperties(params).forEach((key, value) -> 
            job.getConfiguration().set(key, value));
        return job;
    }
}
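
One edge case is worth spelling out for comma-separated path lists: in Java, splitting an empty string on "," returns an array containing a single empty string rather than an empty array. The stdlib-only sketch below shows a parsing helper that trims entries and drops blank ones; InputPaths and parse are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: turns "a, b,,c" into a clean list of non-blank paths.
final class InputPaths {
    private InputPaths() {}

    /** Splits a comma-separated list, trimming entries and dropping blank ones. */
    static List<String> parse(String commaSeparated) {
        List<String> paths = new ArrayList<>();
        if (commaSeparated == null) {
            return paths;
        }
        for (String part : commaSeparated.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                paths.add(trimmed);
            }
        }
        return paths;
    }
}
```

With this helper, an absent or empty input parameter yields an empty list, so a caller can test for "no input paths" with isEmpty() instead of inspecting the first element.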

Key Design Patterns

Parameter Inheritance

Command-line parameters parsed by GenericOptionsParser are returned as a ParameterTool. From there the same key/value pairs can be propagated to a Flink Configuration (via toMap) and to Hadoop JobConf or Job configurations.

Configuration Layering

Multiple configuration sources (command line, properties files, environment variables) can be layered. A typical precedence, from highest to lowest, is command line, then properties file, then environment.

Validation and Error Handling

Configuration validation should be performed early in the application lifecycle with clear error messages for missing or invalid parameters.

Resource Management

A Hadoop Configuration built from the parsed parameters gives the job access to the Hadoop FileSystem API, so input paths can be checked and output paths prepared before execution starts.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-hadoop-compatibility
