Hadoop compatibility layer for Apache Flink providing input/output format wrappers and utilities to integrate Hadoop MapReduce with Flink's DataSet and DataStream APIs
—
The Configuration Utilities capability provides helper functions for Hadoop configuration management, command-line argument parsing, and seamless integration with existing Hadoop tooling and workflows.
The configuration utilities enable smooth integration between Flink applications and existing Hadoop infrastructure by providing tools to parse Hadoop-style command-line arguments and manage Hadoop configuration objects within Flink programs.
Primary utility class for Hadoop configuration management.
@Public
// NOTE(review): this is an API signature stub — the method ends with ';' and
// has no body, so this excerpt documents the public contract only and is not
// compilable as-is.
public class HadoopUtils {
/**
* Returns ParameterTool for arguments parsed by GenericOptionsParser.
*
* @param args Input array arguments parsable by GenericOptionsParser
* @return A ParameterTool containing the parsed parameters
* @throws IOException If arguments cannot be parsed by GenericOptionsParser
*/
public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException;
}
import org.apache.flink.hadoopcompatibility.HadoopUtils;
import org.apache.flink.api.java.utils.ParameterTool;
/**
 * Entry point demonstrating Hadoop-style argument handling in a Flink program.
 *
 * @param args raw command-line arguments in GenericOptionsParser form
 * @throws Exception if argument parsing fails
 */
public static void main(String[] args) throws Exception {
// Hand the raw argv to Hadoop's GenericOptionsParser via the Flink bridge;
// it understands -D key=value, -conf <file> and -fs <uri> style options.
final ParameterTool hadoopArgs = HadoopUtils.paramsFromGenericOptionsParser(args);
// Resolve job settings, falling back to local-HDFS defaults when absent.
final String in = hadoopArgs.get("input", "hdfs://localhost:9000/input");
final String out = hadoopArgs.get("output", "hdfs://localhost:9000/output");
final int dop = hadoopArgs.getInt("parallelism", 1);
// Prepare the Flink execution environment with the requested parallelism.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(dop);
// Echo the resolved paths so the invocation can be verified from the logs.
System.out.println("Input path: " + in);
System.out.println("Output path: " + out);
}
import org.apache.hadoop.util.GenericOptionsParser;
/**
 * Example driver: parse Hadoop-style CLI arguments with GenericOptionsParser,
 * validate and log the configuration, then run a Flink job with the parsed
 * parameters published globally.
 */
public class HadoopFlinkJob {
public static void main(String[] args) throws Exception {
try {
// Parse Hadoop command-line arguments
ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);
// Check for required parameters
if (!params.has("input")) {
System.err.println("Missing required parameter: input");
printUsage();
System.exit(1);
}
// Extract configuration parameters
String inputPath = params.getRequired("input");
// Default output path is derived from the input path when not supplied.
String outputPath = params.get("output", inputPath + "_output");
boolean enableCompression = params.getBoolean("compress", false);
String compressionCodec = params.get("codec", "gzip");
// 65536 = 64 KiB default buffer. NOTE(review): bufferSize is logged but
// never applied anywhere visible in this example.
int bufferSize = params.getInt("buffer.size", 65536);
// Log configuration
System.out.println("Job Configuration:");
System.out.println(" Input Path: " + inputPath);
System.out.println(" Output Path: " + outputPath);
System.out.println(" Compression: " + enableCompression);
if (enableCompression) {
System.out.println(" Codec: " + compressionCodec);
}
System.out.println(" Buffer Size: " + bufferSize);
// Run Flink job with parsed parameters
runFlinkJob(params);
} catch (IOException e) {
// paramsFromGenericOptionsParser declares IOException for unparsable args.
System.err.println("Failed to parse command line arguments: " + e.getMessage());
printUsage();
System.exit(1);
}
}
// Prints the command-line contract for this job to stdout.
private static void printUsage() {
System.out.println("Usage:");
System.out.println(" HadoopFlinkJob -D input=<path> [-D output=<path>] [options]");
System.out.println();
System.out.println("Required parameters:");
System.out.println(" -D input=<path> Input data path");
System.out.println();
System.out.println("Optional parameters:");
System.out.println(" -D output=<path> Output data path (default: input_output)");
System.out.println(" -D compress=<bool> Enable compression (default: false)");
System.out.println(" -D codec=<string> Compression codec (default: gzip)");
System.out.println(" -D buffer.size=<int> Buffer size in bytes (default: 65536)");
System.out.println(" -D parallelism=<int> Job parallelism (default: 1)");
}
// Builds the execution environment, exposes params to all operators via
// global job parameters, applies optional parallelism, and executes.
private static void runFlinkJob(ParameterTool params) throws Exception {
// Implementation of the actual Flink job
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Make parameters available to all operators
env.getConfig().setGlobalJobParameters(params);
// Configure based on parameters
if (params.has("parallelism")) {
env.setParallelism(params.getInt("parallelism"));
}
// Your Flink job logic here
// ...
env.execute("Hadoop-Flink Integration Job");
}
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
/**
 * Example: convert parsed CLI parameters into a Hadoop {@code JobConf} and
 * use it with Flink's Hadoop input-format wrappers.
 */
public class ConfigurationIntegration {
public static void main(String[] args) throws Exception {
// Parse Hadoop arguments
ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);
// Create Hadoop configuration from parsed parameters
JobConf jobConf = createJobConf(params);
// Use configuration with Hadoop InputFormats
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
HadoopInputs.readHadoopFile(
new TextInputFormat(),
LongWritable.class,
Text.class,
params.getRequired("input"),
jobConf
)
);
// Process data and write output
// ...
}
// Copies selected well-known keys plus any "hadoop."-prefixed parameter
// from the ParameterTool into a fresh JobConf.
private static JobConf createJobConf(ParameterTool params) {
JobConf conf = new JobConf();
// Set HDFS configuration
if (params.has("fs.defaultFS")) {
conf.set("fs.defaultFS", params.get("fs.defaultFS"));
}
// Set input/output formats
if (params.has("mapreduce.inputformat.class")) {
conf.set("mapreduce.inputformat.class", params.get("mapreduce.inputformat.class"));
}
// Set compression settings
if (params.getBoolean("mapred.output.compress", false)) {
conf.setBoolean("mapred.output.compress", true);
conf.set("mapred.output.compression.codec",
params.get("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"));
}
// Forward custom properties: any key starting with "hadoop." is copied
// with the prefix stripped, so arbitrary Hadoop settings can be passed
// on the command line without enumerating them here.
for (String key : params.getProperties().stringPropertyNames()) {
if (key.startsWith("hadoop.")) {
String hadoopKey = key.substring("hadoop.".length());
conf.set(hadoopKey, params.get(key));
}
}
return conf;
}
}
The utility supports standard Hadoop command-line arguments:
# Basic usage with input/output paths
java -cp flink-job.jar HadoopFlinkJob \
-D input=hdfs://namenode:9000/data/input \
-D output=hdfs://namenode:9000/data/output
# With compression settings
java -cp flink-job.jar HadoopFlinkJob \
-D input=hdfs://namenode:9000/data/input \
-D output=hdfs://namenode:9000/data/output \
-D mapred.output.compress=true \
-D mapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec
# With custom Hadoop configuration
java -cp flink-job.jar HadoopFlinkJob \
-conf /path/to/hadoop/conf/core-site.xml \
-conf /path/to/hadoop/conf/hdfs-site.xml \
-D input=hdfs://namenode:9000/data/input \
-D output=hdfs://namenode:9000/data/output \
-D parallelism=8
# With file system specification
java -cp flink-job.jar HadoopFlinkJob \
-fs hdfs://namenode:9000 \
-D input=/data/input \
-D output=/data/output
# With custom properties
java -cp flink-job.jar HadoopFlinkJob \
-D input=hdfs://namenode:9000/data/input \
-D hadoop.io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec \
-D hadoop.mapreduce.job.queuename=production \
-D custom.batch.size=1000
import org.apache.flink.hadoopcompatibility.HadoopUtils
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
/**
 * Scala variant: parse Hadoop-style arguments, then run a word-count-like
 * pipeline over a Hadoop TextInputFormat source.
 *
 * NOTE(review): this snippet catches java.io.IOException but the import is
 * not shown in the surrounding excerpt — confirm it is present in the file.
 */
object ScalaHadoopJob {
def main(args: Array[String]): Unit = {
try {
// Parse Hadoop-style arguments
val params = HadoopUtils.paramsFromGenericOptionsParser(args)
// Extract parameters with default values
val inputPath = params.getRequired("input")
// Output defaults to "<input>_output" when not given explicitly.
val outputPath = params.get("output", s"${inputPath}_output")
val parallelism = params.getInt("parallelism", 4)
// Configure Flink environment
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(parallelism)
env.getConfig.setGlobalJobParameters(params)
// Log configuration
println(s"Input: $inputPath")
println(s"Output: $outputPath")
println(s"Parallelism: $parallelism")
// Run job with configuration
runJob(env, params)
} catch {
case e: IOException =>
Console.err.println(s"Failed to parse arguments: ${e.getMessage}")
printUsage()
sys.exit(1)
}
}
// Reads lines, counts non-empty line occurrences, and either writes the
// result as text or prints it, depending on whether "output" was supplied.
def runJob(env: ExecutionEnvironment, params: ParameterTool): Unit = {
// Job implementation using parsed parameters
val inputPath = params.getRequired("input")
// NOTE(review): may be null when "output" is absent; only used inside the
// has("output") guard below, so this is safe as written.
val outputPath = params.get("output")
// Create input with configuration
val input = env.createInput(
HadoopInputs.readHadoopFile(
new TextInputFormat(),
classOf[LongWritable],
classOf[Text],
inputPath
)
)
// Process and output: tuple field _2 is the Text line value.
val result = input
.map(_._2.toString)
.filter(_.nonEmpty)
.map((_, 1))
.groupBy(0)
.sum(1)
// Write output if specified
if (params.has("output")) {
// Configure and write output
result.writeAsText(outputPath)
} else {
result.print()
}
env.execute("Scala Hadoop Integration Job")
}
// Prints the command-line contract for this job.
def printUsage(): Unit = {
println("Usage: ScalaHadoopJob -D input=<path> [options]")
println("Options:")
println(" -D input=<path> Input data path (required)")
println(" -D output=<path> Output data path (optional)")
println(" -D parallelism=<int> Job parallelism (default: 4)")
}
}
import org.apache.hadoop.conf.Configuration;
import java.io.File;
/**
 * Example: merge Hadoop XML configuration files (from HADOOP_CONF_DIR or
 * explicit paths) with command-line overrides into a JobConf for Flink.
 */
public class ConfigFileIntegration {
public static void main(String[] args) throws Exception {
// Parse command line arguments
ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);
// Load Hadoop configuration files if specified
Configuration hadoopConf = loadHadoopConfiguration(params);
// Convert to JobConf for use with Flink-Hadoop integration
JobConf jobConf = new JobConf(hadoopConf);
// Override with command-line parameters
applyParameterOverrides(jobConf, params);
// Use in Flink job
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
HadoopInputs.createHadoopInput(
new TextInputFormat(),
LongWritable.class,
Text.class,
jobConf
)
);
// Process data...
}
// Builds a Configuration from the standard site files under the configured
// (or HADOOP_CONF_DIR) directory, plus any comma-separated extra files.
private static Configuration loadHadoopConfiguration(ParameterTool params) {
Configuration conf = new Configuration();
// Load core Hadoop configuration files
String hadoopConfDir = params.get("hadoop.conf.dir",
System.getenv("HADOOP_CONF_DIR"));
if (hadoopConfDir != null) {
File confDir = new File(hadoopConfDir);
if (confDir.exists() && confDir.isDirectory()) {
// Load standard Hadoop configuration files
loadConfigFile(conf, new File(confDir, "core-site.xml"));
loadConfigFile(conf, new File(confDir, "hdfs-site.xml"));
loadConfigFile(conf, new File(confDir, "mapred-site.xml"));
loadConfigFile(conf, new File(confDir, "yarn-site.xml"));
}
}
// NOTE(review): GenericOptionsParser consumes "-conf <file>" itself, so a
// "conf" key would only appear here if passed as "-D conf=..." — verify
// which form the callers use.
String[] configFiles = params.get("conf", "").split(",");
for (String configFile : configFiles) {
if (!configFile.trim().isEmpty()) {
loadConfigFile(conf, new File(configFile.trim()));
}
}
return conf;
}
// Adds a single XML resource to the Configuration; missing files are
// silently skipped, load errors are reported to stderr (best-effort).
private static void loadConfigFile(Configuration conf, File configFile) {
if (configFile.exists() && configFile.isFile()) {
try {
conf.addResource(configFile.toURI().toURL());
System.out.println("Loaded configuration from: " + configFile.getAbsolutePath());
} catch (Exception e) {
System.err.println("Failed to load config file " + configFile + ": " + e.getMessage());
}
}
}
// Copies every non-"flink."-prefixed parameter into the JobConf so CLI
// values win over file-based configuration.
private static void applyParameterOverrides(JobConf jobConf, ParameterTool params) {
// Apply command-line parameter overrides
for (String key : params.getProperties().stringPropertyNames()) {
String value = params.get(key);
// Skip Flink-specific parameters
if (!key.startsWith("flink.")) {
jobConf.set(key, value);
System.out.println("Override: " + key + " = " + value);
}
}
}
}
public class RobustConfigurationHandling {
/**
 * Parses Hadoop-style arguments, degrading gracefully when parsing fails.
 *
 * @param args raw command-line arguments
 * @return parameters from GenericOptionsParser, or a plain-ParameterTool
 *     fallback when Hadoop-style parsing raises an IOException
 * @throws IllegalArgumentException rethrown after reporting usage
 */
public static ParameterTool parseArgumentsSafely(String[] args) {
try {
final ParameterTool parsed = HadoopUtils.paramsFromGenericOptionsParser(args);
return parsed;
} catch (IOException ioe) {
// Hadoop-style parsing failed: explain likely causes, then fall back to
// Flink's plain argument parsing so the job can still attempt to start.
System.err.println("Error parsing command line arguments: " + ioe.getMessage());
System.err.println("This may be due to:");
System.err.println(" - Invalid argument format");
System.err.println(" - Missing configuration files");
System.err.println(" - Insufficient permissions");
return ParameterTool.fromArgs(args);
} catch (IllegalArgumentException iae) {
// Bad user input is not recoverable here: report, show usage, rethrow.
System.err.println("Invalid arguments provided: " + iae.getMessage());
printUsage();
throw iae;
}
}
// Prints the supported GenericOptionsParser argument forms, one per line.
private static void printUsage() {
final String[] usage = {
"Supported argument formats:",
" -D key=value Set configuration property",
" -conf <file> Load configuration from file",
" -fs <filesystem> Set default filesystem",
" -jt <jobtracker> Set job tracker address",
" -files <files> Specify files to be copied",
" -archives <archives> Specify archives to be copied"
};
for (String line : usage) {
System.out.println(line);
}
}
}
public static void validateParameters(ParameterTool params) throws IllegalArgumentException {
// Check required parameters: every key listed here must be present.
String[] requiredParams = {"input"};
for (String param : requiredParams) {
if (!params.has(param)) {
throw new IllegalArgumentException("Missing required parameter: " + param);
}
}
// Validate parameter values: parallelism, when given, must be > 0.
if (params.has("parallelism")) {
int parallelism = params.getInt("parallelism");
if (parallelism <= 0) {
throw new IllegalArgumentException("Parallelism must be positive, got: " + parallelism);
}
}
// Validate paths. NOTE(review): only hdfs://, file:// and absolute local
// paths pass; other schemes (e.g. s3://) are rejected — confirm intended.
String inputPath = params.get("input");
if (!inputPath.startsWith("hdfs://") && !inputPath.startsWith("file://") && !inputPath.startsWith("/")) {
throw new IllegalArgumentException("Invalid input path format: " + inputPath);
}
}
public static void printConfiguration(ParameterTool params) {
System.out.println("=== Job Configuration ===");
// Print all parameters sorted by key for deterministic, readable output.
params.getProperties().stringPropertyNames().stream()
.sorted()
.forEach(key -> {
String value = params.get(key);
// Mask sensitive values. NOTE(review): only keys containing "password"
// or "secret" are masked — tokens/keys would print in clear; confirm.
if (key.toLowerCase().contains("password") || key.toLowerCase().contains("secret")) {
value = "***";
}
// Left-align keys to 30 chars for a tabular listing.
System.out.println(String.format(" %-30s: %s", key, value));
});
System.out.println("========================");
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-hadoop-compatibility-2-11