Apache Flink compatibility layer for integrating Hadoop InputFormats, OutputFormats, and MapReduce functions with Flink streaming and batch processing
—
Utility functions for handling Hadoop configuration, command-line argument parsing, and integration between Hadoop and Flink execution environments. Provides helper methods for common configuration tasks and parameter management.
Utility for parsing Hadoop-style command-line arguments using GenericOptionsParser.
/**
* Returns a {@link ParameterTool} holding the arguments parsed by GenericOptionsParser.
* @param args Input array arguments that should be parsable by GenericOptionsParser
* @return A ParameterTool containing the parsed parameters
* @throws IOException If arguments cannot be parsed by GenericOptionsParser
* @see GenericOptionsParser
*/
public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException;

Usage Example:
import org.apache.flink.hadoopcompatibility.HadoopUtils;
import org.apache.flink.util.ParameterTool;
public class HadoopCompatibleFlinkJob {
public static void main(String[] args) throws Exception {
    // Parse Hadoop-style command line arguments
    // Supports arguments like: -D property=value -files file1,file2 -archives archive1
    ParameterTool parameters = HadoopUtils.paramsFromGenericOptionsParser(args);

    // Access parsed parameters, falling back to defaults when a key is absent
    String inputPath = parameters.get("input.path", "hdfs://default/input");
    String outputPath = parameters.get("output.path", "hdfs://default/output");
    int parallelism = parameters.getInt("parallelism", 1);

    // Use parameters in Flink job configuration
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(parallelism);

    // Convert the parameters to a Flink Configuration via the documented
    // ParameterTool API (getConfiguration) instead of Configuration.fromMap,
    // which is not available in all Flink versions.
    Configuration config = parameters.getConfiguration();

    System.out.println("Input path: " + inputPath);
    System.out.println("Output path: " + outputPath);
    System.out.println("Parallelism: " + parallelism);
}
}

Working with Hadoop configuration objects and integrating them with Flink.
JobConf Integration:
import org.apache.hadoop.mapred.JobConf;
import org.apache.flink.util.ParameterTool;
// Create JobConf from parsed parameters
/**
 * Creates a {@link JobConf} populated from the given parameters.
 *
 * @param parameters parsed command-line parameters
 * @return a JobConf carrying every non-flag parameter as a Hadoop property
 */
public static JobConf createJobConf(ParameterTool parameters) {
    JobConf jobConf = new JobConf();
    // Set common Hadoop properties from parameters
    if (parameters.has("mapred.job.name")) {
        jobConf.setJobName(parameters.get("mapred.job.name"));
    }
    if (parameters.has("mapred.reduce.tasks")) {
        jobConf.setNumReduceTasks(parameters.getInt("mapred.reduce.tasks"));
    }
    // Copy all parameters as properties; skip value-less flags —
    // ParameterTool returns null for them and JobConf.set rejects null values.
    for (String key : parameters.getProperties().stringPropertyNames()) {
        String value = parameters.get(key);
        if (value != null) {
            jobConf.set(key, value);
        }
    }
    return jobConf;
}
// Usage example
// Each -D key=value pair becomes a ParameterTool entry keyed by the property name.
String[] args = {"-D", "mapred.job.name=MyFlinkJob", "-D", "input.path=/data/input"};
ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);
JobConf jobConf = createJobConf(params);

Job (mapreduce API) Integration:
import org.apache.hadoop.mapreduce.Job;
import org.apache.flink.util.ParameterTool;
// Create Job from parsed parameters
/**
 * Creates a mapreduce-API {@link Job} populated from the given parameters.
 *
 * @param parameters parsed command-line parameters
 * @return a Job whose configuration carries every non-flag parameter
 * @throws IOException if the Job instance cannot be created
 */
public static Job createJob(ParameterTool parameters) throws IOException {
    Job job = Job.getInstance();
    // Set common mapreduce properties from parameters
    if (parameters.has("mapreduce.job.name")) {
        job.setJobName(parameters.get("mapreduce.job.name"));
    }
    if (parameters.has("mapreduce.job.reduces")) {
        job.setNumReduceTasks(parameters.getInt("mapreduce.job.reduces"));
    }
    // Copy all parameters as configuration properties; skip value-less flags —
    // ParameterTool returns null for them and Configuration.set rejects nulls.
    for (String key : parameters.getProperties().stringPropertyNames()) {
        String value = parameters.get(key);
        if (value != null) {
            job.getConfiguration().set(key, value);
        }
    }
    return job;
}
// Usage example
// The mapreduce.* keys are copied verbatim into the Job's configuration.
String[] args = {"-D", "mapreduce.job.name=MyFlinkJob", "-D", "output.path=/data/output"};
ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);
Job job = createJob(params);

Combining command-line arguments with environment variables.
/**
 * Builds a layered configuration: JVM system properties (lowest priority),
 * then an optional properties file, then command-line arguments (highest).
 *
 * @param args Hadoop-style command-line arguments
 * @return merged ParameterTool with command-line values taking precedence
 * @throws IOException if the arguments or the properties file cannot be parsed
 */
public static ParameterTool createConfigurationFromMultipleSources(String[] args) throws IOException {
    // Parse command-line arguments
    ParameterTool cmdLineParams = HadoopUtils.paramsFromGenericOptionsParser(args);
    // JVM system properties (note: these are NOT OS environment variables)
    ParameterTool systemProps = ParameterTool.fromSystemProperties();
    // Merge with properties file if specified
    ParameterTool allParams = cmdLineParams;
    if (cmdLineParams.has("config.file")) {
        ParameterTool fileParams = ParameterTool.fromPropertiesFile(cmdLineParams.get("config.file"));
        allParams = fileParams.mergeWith(cmdLineParams); // Command line overrides file
    }
    // System properties have lowest priority. mergeWith gives precedence to
    // its ARGUMENT (see the merge above), so the accumulated params must be
    // the argument here — the previous allParams.mergeWith(systemProps)
    // inverted the intended order.
    allParams = systemProps.mergeWith(allParams);
    return allParams;
}
// Usage
// NOTE(review): GenericOptionsParser only guarantees handling of its own
// options (-D, -files, ...); verify that "--config.file" survives parsing
// and reaches the resulting ParameterTool.
String[] args = {"-D", "parallelism=4", "--config.file", "app.properties"};
ParameterTool config = createConfigurationFromMultipleSources(args);

Validating and processing configuration parameters.
/**
 * Helpers for validating parsed job parameters before a job is assembled.
 */
public static class ConfigurationValidator {

    /**
     * Ensures every listed key is present in the parameters.
     *
     * @throws IllegalArgumentException if any required key is missing
     */
    public static void validateRequiredParameters(ParameterTool params, String... requiredKeys)
            throws IllegalArgumentException {
        for (String required : requiredKeys) {
            if (params.has(required)) {
                continue;
            }
            throw new IllegalArgumentException("Required parameter missing: " + required);
        }
    }

    /**
     * Checks that any configured input/output path uses a supported URI scheme.
     * (Existence checks against HDFS would require a Hadoop FileSystem.)
     */
    public static void validatePaths(ParameterTool params) throws IOException {
        requireSupportedScheme(params, "input.path", "Invalid input path format: ");
        requireSupportedScheme(params, "output.path", "Invalid output path format: ");
    }

    // Rejects a configured path that is neither an hdfs:// nor a file:// URI.
    private static void requireSupportedScheme(ParameterTool params, String key, String errorPrefix) {
        if (!params.has(key)) {
            return;
        }
        String path = params.get(key);
        boolean supported = path.startsWith("hdfs://") || path.startsWith("file://");
        if (!supported) {
            throw new IllegalArgumentException(errorPrefix + path);
        }
    }

    /**
     * Reads "parallelism" (falling back to {@code defaultValue}) and checks
     * that it lies within [1, maxValue].
     *
     * @throws IllegalArgumentException if the value is out of range
     */
    public static int getValidatedParallelism(ParameterTool params, int defaultValue, int maxValue) {
        int parallelism = params.getInt("parallelism", defaultValue);
        boolean inRange = parallelism >= 1 && parallelism <= maxValue;
        if (!inRange) {
            throw new IllegalArgumentException(
                "Parallelism must be between 1 and " + maxValue + ", got: " + parallelism);
        }
        return parallelism;
    }
}
// Usage example
// Usage example: validate the configuration before wiring up the job.
public static void main(String[] args) throws Exception {
    ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

    // Fail fast on missing or malformed configuration
    ConfigurationValidator.validateRequiredParameters(params, "input.path", "output.path");
    ConfigurationValidator.validatePaths(params);
    int parallelism = ConfigurationValidator.getValidatedParallelism(params, 1, 100);

    // Apply the validated configuration
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(parallelism);
}

Integrating with Hadoop resource management and file systems.
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
public static class ResourceManager {
/**
 * Builds a Hadoop Configuration from the parsed parameters: loads the
 * standard site files from hadoop.conf.dir when given, then copies every
 * hadoop.* / fs.* / dfs.* parameter into the configuration.
 *
 * @param params parsed command-line parameters
 * @return the assembled Hadoop Configuration
 */
public static Configuration createHadoopConfiguration(ParameterTool params) {
    Configuration conf = new Configuration();
    // Set HDFS configuration from parameters
    if (params.has("fs.defaultFS")) {
        conf.set("fs.defaultFS", params.get("fs.defaultFS"));
    }
    if (params.has("hadoop.conf.dir")) {
        // Load the standard site files from the Hadoop configuration directory
        String confDir = params.get("hadoop.conf.dir");
        conf.addResource(new Path(confDir + "/core-site.xml"));
        conf.addResource(new Path(confDir + "/hdfs-site.xml"));
        conf.addResource(new Path(confDir + "/mapred-site.xml"));
    }
    // Copy all hadoop.* / fs.* / dfs.* properties; skip value-less flags —
    // ParameterTool returns null for them and Configuration.set rejects nulls.
    for (String key : params.getProperties().stringPropertyNames()) {
        if (key.startsWith("hadoop.") || key.startsWith("fs.") || key.startsWith("dfs.")) {
            String value = params.get(key);
            if (value != null) {
                conf.set(key, value);
            }
        }
    }
    return conf;
}
/**
 * Returns whether the given input path exists.
 *
 * Resolves the FileSystem from the path's own scheme (hdfs://, file://, ...)
 * via Path.getFileSystem, rather than FileSystem.get, which always returns
 * the default FileSystem and mishandles fully-qualified paths on other schemes.
 *
 * @throws IOException on file system errors
 */
public static boolean validateInputPath(String inputPath, Configuration hadoopConf) throws IOException {
    Path path = new Path(inputPath);
    FileSystem fs = path.getFileSystem(hadoopConf);
    return fs.exists(path);
}
/**
 * Prepares the output location: removes a pre-existing output directory and
 * creates its parent directory if needed.
 *
 * @throws IOException on file system errors
 */
public static void ensureOutputPath(String outputPath, Configuration hadoopConf) throws IOException {
    Path path = new Path(outputPath);
    // Resolve the FileSystem from the path's own scheme so fully-qualified
    // paths (hdfs://, file://, ...) work even when they differ from fs.defaultFS.
    FileSystem fs = path.getFileSystem(hadoopConf);
    if (fs.exists(path)) {
        fs.delete(path, true); // Delete existing output directory
    }
    // Create parent directories if they don't exist
    Path parent = path.getParent();
    if (parent != null && !fs.exists(parent)) {
        fs.mkdirs(parent);
    }
}
}

Example of a complete Flink job using Hadoop utilities for configuration.
public class CompleteHadoopFlinkJob {
public static void main(String[] args) throws Exception {
// Parse Hadoop-style arguments
ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);
// Validate required parameters
if (!params.has("input") || !params.has("output")) {
System.err.println("Usage: program -D input=<path> -D output=<path> [options]");
System.exit(1);
}
// Set up execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Register the parameters so they are visible to all operators at runtime
env.getConfig().setGlobalJobParameters(params);
// Configure parallelism
if (params.has("parallelism")) {
env.setParallelism(params.getInt("parallelism"));
}
// Create Hadoop configuration (mapred.* keys target the old mapred API,
// matching the mapred TextInputFormat used below)
JobConf jobConf = new JobConf();
jobConf.set("mapred.input.dir", params.get("input"));
jobConf.set("mapred.output.dir", params.get("output"));
// Create input format wrapping Hadoop's TextInputFormat;
// records are <byte offset, line> pairs
HadoopInputFormat<LongWritable, Text> inputFormat =
HadoopInputs.readHadoopFile(
new TextInputFormat(),
LongWritable.class,
Text.class,
params.get("input"),
jobConf
);
// Process data
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(inputFormat);
// NOTE(review): WordCountMapper is defined elsewhere; presumably it emits
// <word, 1> pairs for the groupBy(0).sum(1) below — confirm before reuse.
DataSet<Tuple2<Text, IntWritable>> result = input
.flatMap(new WordCountMapper())
.groupBy(0)
.sum(1);
// Create output format; a separate JobConf keeps output settings isolated
JobConf outputJobConf = new JobConf();
outputJobConf.set("mapred.output.dir", params.get("output"));
outputJobConf.setOutputFormat(TextOutputFormat.class);
HadoopOutputFormat<Text, IntWritable> outputFormat =
new HadoopOutputFormat<>(
new TextOutputFormat<Text, IntWritable>(),
outputJobConf
);
// Write results
result.output(outputFormat);
// Execute job
env.execute("Hadoop Compatible Flink Job");
}
}

Utility methods for common parameter processing tasks.
public static class ParameterUtils {
/**
 * Splits the comma-separated "input" parameter into individual paths.
 *
 * @param params parsed command-line parameters
 * @return the configured paths, or an empty array when "input" is absent or
 *         blank (a bare "".split(",") would incorrectly yield {""})
 */
public static String[] getInputPaths(ParameterTool params) {
    String input = params.get("input", "");
    if (input.trim().isEmpty()) {
        return new String[0];
    }
    return input.split(",");
}
/** Returns the mandatory "output" parameter, failing if it was not supplied. */
public static String getOutputPath(ParameterTool params) {
    String outputPath = params.getRequired("output");
    return outputPath;
}
/**
 * Extracts every parameter whose key belongs to a known Hadoop namespace.
 * Value-less flags are skipped: ParameterTool reports null for them, and a
 * null value would make downstream Configuration.set calls fail.
 *
 * @param params parsed command-line parameters
 * @return map of Hadoop-namespaced keys to their values
 */
public static Map<String, String> getHadoopProperties(ParameterTool params) {
    String[] hadoopPrefixes = {"hadoop.", "mapred.", "mapreduce.", "fs.", "dfs."};
    Map<String, String> hadoopProps = new HashMap<>();
    for (String key : params.getProperties().stringPropertyNames()) {
        for (String prefix : hadoopPrefixes) {
            if (key.startsWith(prefix)) {
                String value = params.get(key);
                if (value != null) {
                    hadoopProps.put(key, value);
                }
                break;
            }
        }
    }
    return hadoopProps;
}
/** Builds a JobConf whose properties are the Hadoop-namespaced parameters. */
public static JobConf createJobConfFromParams(ParameterTool params) {
    Map<String, String> hadoopProperties = getHadoopProperties(params);
    JobConf jobConf = new JobConf();
    hadoopProperties.forEach(jobConf::set);
    return jobConf;
}
/** Builds a mapreduce Job whose configuration holds the Hadoop-namespaced parameters. */
public static Job createJobFromParams(ParameterTool params) throws IOException {
    Job job = Job.getInstance();
    for (Map.Entry<String, String> property : getHadoopProperties(params).entrySet()) {
        job.getConfiguration().set(property.getKey(), property.getValue());
    }
    return job;
}
}

Command-line parameters parsed by GenericOptionsParser can be easily converted to ParameterTool and then propagated to both Flink and Hadoop configurations.
Multiple configuration sources (command-line, files, environment variables) can be layered with appropriate precedence rules.
Configuration validation should be performed early in the application lifecycle with clear error messages for missing or invalid parameters.
Integration with Hadoop FileSystem and resource management APIs enables proper handling of distributed file operations and cluster resources.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-hadoop-compatibility