# Configuration Utilities

The Configuration Utilities capability provides helper functions for Hadoop configuration management, command-line argument parsing, and seamless integration with existing Hadoop tooling and workflows.

## Overview

The configuration utilities enable smooth integration between Flink applications and existing Hadoop infrastructure by providing tools to parse Hadoop-style command-line arguments and manage Hadoop configuration objects within Flink programs.

## HadoopUtils Class

Primary utility class for Hadoop configuration management.

```java { .api }
@Public
public class HadoopUtils {

    /**
     * Returns a ParameterTool for the arguments parsed by GenericOptionsParser.
     *
     * @param args Input array of arguments parsable by GenericOptionsParser
     * @return A ParameterTool containing the parsed parameters
     * @throws IOException If the arguments cannot be parsed by GenericOptionsParser
     */
    public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException;
}
```

## Usage Examples

### Basic Command-Line Argument Parsing

```java
import org.apache.flink.hadoopcompatibility.HadoopUtils;
import org.apache.flink.api.java.utils.ParameterTool;

public static void main(String[] args) throws Exception {
    // Parse Hadoop-style command line arguments.
    // Supports arguments like: -D key=value, -conf config.xml, -fs hdfs://namenode:port
    ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

    // Access parsed parameters
    String inputPath = params.get("input", "hdfs://localhost:9000/input");
    String outputPath = params.get("output", "hdfs://localhost:9000/output");
    int parallelism = params.getInt("parallelism", 1);

    // Configure the Flink environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(parallelism);

    // Use the parameters in your Flink job
    System.out.println("Input path: " + inputPath);
    System.out.println("Output path: " + outputPath);
}
```

### Advanced Parameter Handling

```java
import org.apache.hadoop.util.GenericOptionsParser;

public class HadoopFlinkJob {
    public static void main(String[] args) throws Exception {
        try {
            // Parse Hadoop command-line arguments
            ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

            // Check for required parameters
            if (!params.has("input")) {
                System.err.println("Missing required parameter: input");
                printUsage();
                System.exit(1);
            }

            // Extract configuration parameters
            String inputPath = params.getRequired("input");
            String outputPath = params.get("output", inputPath + "_output");
            boolean enableCompression = params.getBoolean("compress", false);
            String compressionCodec = params.get("codec", "gzip");
            int bufferSize = params.getInt("buffer.size", 65536);

            // Log the configuration
            System.out.println("Job Configuration:");
            System.out.println("  Input Path: " + inputPath);
            System.out.println("  Output Path: " + outputPath);
            System.out.println("  Compression: " + enableCompression);
            if (enableCompression) {
                System.out.println("  Codec: " + compressionCodec);
            }
            System.out.println("  Buffer Size: " + bufferSize);

            // Run the Flink job with the parsed parameters
            runFlinkJob(params);

        } catch (IOException e) {
            System.err.println("Failed to parse command line arguments: " + e.getMessage());
            printUsage();
            System.exit(1);
        }
    }

    private static void printUsage() {
        System.out.println("Usage:");
        System.out.println("  HadoopFlinkJob -D input=<path> [-D output=<path>] [options]");
        System.out.println();
        System.out.println("Required parameters:");
        System.out.println("  -D input=<path>        Input data path");
        System.out.println();
        System.out.println("Optional parameters:");
        System.out.println("  -D output=<path>       Output data path (default: input_output)");
        System.out.println("  -D compress=<bool>     Enable compression (default: false)");
        System.out.println("  -D codec=<string>      Compression codec (default: gzip)");
        System.out.println("  -D buffer.size=<int>   Buffer size in bytes (default: 65536)");
        System.out.println("  -D parallelism=<int>   Job parallelism (default: 1)");
    }

    private static void runFlinkJob(ParameterTool params) throws Exception {
        // Implementation of the actual Flink job
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Make parameters available to all operators
        env.getConfig().setGlobalJobParameters(params);

        // Configure based on parameters
        if (params.has("parallelism")) {
            env.setParallelism(params.getInt("parallelism"));
        }

        // Your Flink job logic here
        // ...

        env.execute("Hadoop-Flink Integration Job");
    }
}
```

### Integration with Hadoop Configuration

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;

public class ConfigurationIntegration {
    public static void main(String[] args) throws Exception {
        // Parse Hadoop arguments
        ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

        // Create a Hadoop configuration from the parsed parameters
        JobConf jobConf = createJobConf(params);

        // Use the configuration with Hadoop InputFormats
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
            HadoopInputs.readHadoopFile(
                new TextInputFormat(),
                LongWritable.class,
                Text.class,
                params.getRequired("input"),
                jobConf
            )
        );

        // Process data and write output
        // ...
    }

    private static JobConf createJobConf(ParameterTool params) {
        JobConf conf = new JobConf();

        // Set HDFS configuration
        if (params.has("fs.defaultFS")) {
            conf.set("fs.defaultFS", params.get("fs.defaultFS"));
        }

        // Set input/output formats
        if (params.has("mapreduce.inputformat.class")) {
            conf.set("mapreduce.inputformat.class", params.get("mapreduce.inputformat.class"));
        }

        // Set compression settings
        if (params.getBoolean("mapred.output.compress", false)) {
            conf.setBoolean("mapred.output.compress", true);
            conf.set("mapred.output.compression.codec",
                params.get("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"));
        }

        // Copy custom properties prefixed with "hadoop." into the JobConf
        for (String key : params.getProperties().stringPropertyNames()) {
            if (key.startsWith("hadoop.")) {
                String hadoopKey = key.substring("hadoop.".length());
                conf.set(hadoopKey, params.get(key));
            }
        }

        return conf;
    }
}
```

### Command-Line Examples

The utility supports standard Hadoop command-line arguments:

```bash
# Basic usage with input/output paths
java -cp flink-job.jar HadoopFlinkJob \
    -D input=hdfs://namenode:9000/data/input \
    -D output=hdfs://namenode:9000/data/output

# With compression settings
java -cp flink-job.jar HadoopFlinkJob \
    -D input=hdfs://namenode:9000/data/input \
    -D output=hdfs://namenode:9000/data/output \
    -D mapred.output.compress=true \
    -D mapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec

# With custom Hadoop configuration files
java -cp flink-job.jar HadoopFlinkJob \
    -conf /path/to/hadoop/conf/core-site.xml \
    -conf /path/to/hadoop/conf/hdfs-site.xml \
    -D input=hdfs://namenode:9000/data/input \
    -D output=hdfs://namenode:9000/data/output \
    -D parallelism=8

# With file system specification
java -cp flink-job.jar HadoopFlinkJob \
    -fs hdfs://namenode:9000 \
    -D input=/data/input \
    -D output=/data/output

# With custom properties
java -cp flink-job.jar HadoopFlinkJob \
    -D input=hdfs://namenode:9000/data/input \
    -D hadoop.io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec \
    -D hadoop.mapreduce.job.queuename=production \
    -D custom.batch.size=1000
```

### Scala Integration

```scala
import org.apache.flink.hadoopcompatibility.HadoopUtils
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._

object ScalaHadoopJob {
  def main(args: Array[String]): Unit = {
    try {
      // Parse Hadoop-style arguments
      val params = HadoopUtils.paramsFromGenericOptionsParser(args)

      // Extract parameters with default values
      val inputPath = params.getRequired("input")
      val outputPath = params.get("output", s"${inputPath}_output")
      val parallelism = params.getInt("parallelism", 4)

      // Configure the Flink environment
      val env = ExecutionEnvironment.getExecutionEnvironment
      env.setParallelism(parallelism)
      env.getConfig.setGlobalJobParameters(params)

      // Log the configuration
      println(s"Input: $inputPath")
      println(s"Output: $outputPath")
      println(s"Parallelism: $parallelism")

      // Run the job with the parsed configuration
      runJob(env, params)

    } catch {
      case e: IOException =>
        Console.err.println(s"Failed to parse arguments: ${e.getMessage}")
        printUsage()
        sys.exit(1)
    }
  }

  def runJob(env: ExecutionEnvironment, params: ParameterTool): Unit = {
    // Job implementation using the parsed parameters
    val inputPath = params.getRequired("input")
    val outputPath = params.get("output")

    // Create input with configuration
    val input = env.createInput(
      HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        classOf[LongWritable],
        classOf[Text],
        inputPath
      )
    )

    // Process and output
    val result = input
      .map(_._2.toString)
      .filter(_.nonEmpty)
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    // Write output if specified, otherwise print to stdout
    if (params.has("output")) {
      result.writeAsText(outputPath)
    } else {
      result.print()
    }

    env.execute("Scala Hadoop Integration Job")
  }

  def printUsage(): Unit = {
    println("Usage: ScalaHadoopJob -D input=<path> [options]")
    println("Options:")
    println("  -D input=<path>        Input data path (required)")
    println("  -D output=<path>       Output data path (optional)")
    println("  -D parallelism=<int>   Job parallelism (default: 4)")
  }
}
```

### Configuration File Integration

```java
import org.apache.hadoop.conf.Configuration;
import java.io.File;

public class ConfigFileIntegration {
    public static void main(String[] args) throws Exception {
        // Parse command line arguments
        ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

        // Load Hadoop configuration files if specified
        Configuration hadoopConf = loadHadoopConfiguration(params);

        // Convert to JobConf for use with the Flink-Hadoop integration
        JobConf jobConf = new JobConf(hadoopConf);

        // Override with command-line parameters
        applyParameterOverrides(jobConf, params);

        // Use in a Flink job
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
            HadoopInputs.createHadoopInput(
                new TextInputFormat(),
                LongWritable.class,
                Text.class,
                jobConf
            )
        );

        // Process data...
    }

    private static Configuration loadHadoopConfiguration(ParameterTool params) {
        Configuration conf = new Configuration();

        // Locate the Hadoop configuration directory (parameter or environment)
        String hadoopConfDir = params.get("hadoop.conf.dir",
            System.getenv("HADOOP_CONF_DIR"));

        if (hadoopConfDir != null) {
            File confDir = new File(hadoopConfDir);
            if (confDir.exists() && confDir.isDirectory()) {
                // Load the standard Hadoop configuration files
                loadConfigFile(conf, new File(confDir, "core-site.xml"));
                loadConfigFile(conf, new File(confDir, "hdfs-site.xml"));
                loadConfigFile(conf, new File(confDir, "mapred-site.xml"));
                loadConfigFile(conf, new File(confDir, "yarn-site.xml"));
            }
        }

        // Load additional config files specified via the command line
        String[] configFiles = params.get("conf", "").split(",");
        for (String configFile : configFiles) {
            if (!configFile.trim().isEmpty()) {
                loadConfigFile(conf, new File(configFile.trim()));
            }
        }

        return conf;
    }

    private static void loadConfigFile(Configuration conf, File configFile) {
        if (configFile.exists() && configFile.isFile()) {
            try {
                conf.addResource(configFile.toURI().toURL());
                System.out.println("Loaded configuration from: " + configFile.getAbsolutePath());
            } catch (Exception e) {
                System.err.println("Failed to load config file " + configFile + ": " + e.getMessage());
            }
        }
    }

    private static void applyParameterOverrides(JobConf jobConf, ParameterTool params) {
        // Apply command-line parameter overrides
        for (String key : params.getProperties().stringPropertyNames()) {
            String value = params.get(key);

            // Skip Flink-specific parameters
            if (!key.startsWith("flink.")) {
                jobConf.set(key, value);
                System.out.println("Override: " + key + " = " + value);
            }
        }
    }
}
```

## Error Handling

```java
public class RobustConfigurationHandling {
    public static ParameterTool parseArgumentsSafely(String[] args) {
        try {
            return HadoopUtils.paramsFromGenericOptionsParser(args);
        } catch (IOException e) {
            System.err.println("Error parsing command line arguments: " + e.getMessage());
            System.err.println("This may be due to:");
            System.err.println("  - Invalid argument format");
            System.err.println("  - Missing configuration files");
            System.err.println("  - Insufficient permissions");

            // Fall back to plain Flink argument parsing
            return ParameterTool.fromArgs(args);
        } catch (IllegalArgumentException e) {
            System.err.println("Invalid arguments provided: " + e.getMessage());
            printUsage();
            throw e;
        }
    }

    private static void printUsage() {
        System.out.println("Supported argument formats:");
        System.out.println("  -D key=value          Set configuration property");
        System.out.println("  -conf <file>          Load configuration from file");
        System.out.println("  -fs <filesystem>      Set default filesystem");
        System.out.println("  -jt <jobtracker>      Set job tracker address");
        System.out.println("  -files <files>        Specify files to be copied");
        System.out.println("  -archives <archives>  Specify archives to be copied");
    }
}
```

## Best Practices

### Parameter Validation

```java
public static void validateParameters(ParameterTool params) throws IllegalArgumentException {
    // Check required parameters
    String[] requiredParams = {"input"};
    for (String param : requiredParams) {
        if (!params.has(param)) {
            throw new IllegalArgumentException("Missing required parameter: " + param);
        }
    }

    // Validate parameter values
    if (params.has("parallelism")) {
        int parallelism = params.getInt("parallelism");
        if (parallelism <= 0) {
            throw new IllegalArgumentException("Parallelism must be positive, got: " + parallelism);
        }
    }

    // Validate paths
    String inputPath = params.get("input");
    if (!inputPath.startsWith("hdfs://") && !inputPath.startsWith("file://") && !inputPath.startsWith("/")) {
        throw new IllegalArgumentException("Invalid input path format: " + inputPath);
    }
}
```

### Configuration Documentation

```java
public static void printConfiguration(ParameterTool params) {
    System.out.println("=== Job Configuration ===");

    // Print all parameters sorted by key
    params.getProperties().stringPropertyNames().stream()
        .sorted()
        .forEach(key -> {
            String value = params.get(key);
            // Mask sensitive values
            if (key.toLowerCase().contains("password") || key.toLowerCase().contains("secret")) {
                value = "***";
            }
            System.out.println(String.format("  %-30s: %s", key, value));
        });

    System.out.println("========================");
}
```