# Configuration Utilities

Utility functions for handling Hadoop configuration, command-line argument parsing, and integration between Hadoop and Flink execution environments. Provides helper methods for common configuration tasks and parameter management.

## Capabilities

### Command-Line Parameter Parsing

Utility for parsing Hadoop-style command-line arguments using GenericOptionsParser.

```java { .api }
/**
 * Returns ParameterTool for the arguments parsed by GenericOptionsParser
 * @param args Input array arguments that should be parsable by GenericOptionsParser
 * @return A ParameterTool containing the parsed parameters
 * @throws IOException If arguments cannot be parsed by GenericOptionsParser
 * @see GenericOptionsParser
 */
public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException;
```

**Usage Example:**

```java
import org.apache.flink.hadoopcompatibility.HadoopUtils;
import org.apache.flink.util.ParameterTool;

public class HadoopCompatibleFlinkJob {
    public static void main(String[] args) throws Exception {
        // Parse Hadoop-style command line arguments
        // Supports arguments like: -D property=value -files file1,file2 -archives archive1
        ParameterTool parameters = HadoopUtils.paramsFromGenericOptionsParser(args);

        // Access parsed parameters
        String inputPath = parameters.get("input.path", "hdfs://default/input");
        String outputPath = parameters.get("output.path", "hdfs://default/output");
        int parallelism = parameters.getInt("parallelism", 1);

        // Use parameters in Flink job configuration
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);

        // Create configuration from parameters
        Configuration config = Configuration.fromMap(parameters.toMap());

        System.out.println("Input path: " + inputPath);
        System.out.println("Output path: " + outputPath);
        System.out.println("Parallelism: " + parallelism);
    }
}
```

### Hadoop Configuration Integration

Working with Hadoop configuration objects and integrating them with Flink.

**JobConf Integration:**

```java
import org.apache.hadoop.mapred.JobConf;
import org.apache.flink.util.ParameterTool;

// Create JobConf from parsed parameters
public static JobConf createJobConf(ParameterTool parameters) {
    JobConf jobConf = new JobConf();

    // Set common Hadoop properties from parameters
    if (parameters.has("mapred.job.name")) {
        jobConf.setJobName(parameters.get("mapred.job.name"));
    }

    if (parameters.has("mapred.reduce.tasks")) {
        jobConf.setNumReduceTasks(parameters.getInt("mapred.reduce.tasks"));
    }

    // Copy all parameters as properties
    for (String key : parameters.getProperties().stringPropertyNames()) {
        jobConf.set(key, parameters.get(key));
    }

    return jobConf;
}

// Usage example
String[] args = {"-D", "mapred.job.name=MyFlinkJob", "-D", "input.path=/data/input"};
ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);
JobConf jobConf = createJobConf(params);
```

**Job (mapreduce API) Integration:**

```java
import org.apache.hadoop.mapreduce.Job;
import org.apache.flink.util.ParameterTool;

// Create Job from parsed parameters
public static Job createJob(ParameterTool parameters) throws IOException {
    Job job = Job.getInstance();

    // Set common mapreduce properties from parameters
    if (parameters.has("mapreduce.job.name")) {
        job.setJobName(parameters.get("mapreduce.job.name"));
    }

    if (parameters.has("mapreduce.job.reduces")) {
        job.setNumReduceTasks(parameters.getInt("mapreduce.job.reduces"));
    }

    // Copy all parameters as configuration properties
    for (String key : parameters.getProperties().stringPropertyNames()) {
        job.getConfiguration().set(key, parameters.get(key));
    }

    return job;
}

// Usage example
String[] args = {"-D", "mapreduce.job.name=MyFlinkJob", "-D", "output.path=/data/output"};
ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);
Job job = createJob(params);
```

## Advanced Configuration Patterns

### System Property Integration

Combining command-line arguments with JVM system properties (note: `ParameterTool.fromSystemProperties()` reads system properties, not OS environment variables).

```java
public static ParameterTool createConfigurationFromMultipleSources(String[] args) throws IOException {
    // Parse command-line arguments
    ParameterTool cmdLineParams = HadoopUtils.paramsFromGenericOptionsParser(args);

    // Read JVM system properties
    ParameterTool sysProps = ParameterTool.fromSystemProperties();

    // Merge with properties file if specified
    ParameterTool allParams = cmdLineParams;
    if (cmdLineParams.has("config.file")) {
        ParameterTool fileParams = ParameterTool.fromPropertiesFile(cmdLineParams.get("config.file"));
        allParams = fileParams.mergeWith(cmdLineParams); // Command line overrides file
    }

    // System properties have the lowest priority. mergeWith() lets its argument
    // win on key conflicts, so merge the higher-priority params INTO the system properties.
    allParams = sysProps.mergeWith(allParams);

    return allParams;
}

// Usage (paramsFromGenericOptionsParser only captures -D key=value options)
String[] args = {"-D", "parallelism=4", "-D", "config.file=app.properties"};
ParameterTool config = createConfigurationFromMultipleSources(args);
```

### Configuration Validation

Validating and processing configuration parameters.

```java
public static class ConfigurationValidator {

    public static void validateRequiredParameters(ParameterTool params, String... requiredKeys)
            throws IllegalArgumentException {
        for (String key : requiredKeys) {
            if (!params.has(key)) {
                throw new IllegalArgumentException("Required parameter missing: " + key);
            }
        }
    }

    public static void validatePaths(ParameterTool params) throws IOException {
        if (params.has("input.path")) {
            String inputPath = params.get("input.path");
            // Validate input path exists (for HDFS paths, would need Hadoop FileSystem)
            if (!inputPath.startsWith("hdfs://") && !inputPath.startsWith("file://")) {
                throw new IllegalArgumentException("Invalid input path format: " + inputPath);
            }
        }

        if (params.has("output.path")) {
            String outputPath = params.get("output.path");
            // Validate output path format
            if (!outputPath.startsWith("hdfs://") && !outputPath.startsWith("file://")) {
                throw new IllegalArgumentException("Invalid output path format: " + outputPath);
            }
        }
    }

    public static int getValidatedParallelism(ParameterTool params, int defaultValue, int maxValue) {
        int parallelism = params.getInt("parallelism", defaultValue);
        if (parallelism < 1 || parallelism > maxValue) {
            throw new IllegalArgumentException(
                    "Parallelism must be between 1 and " + maxValue + ", got: " + parallelism);
        }
        return parallelism;
    }
}

// Usage example
public static void main(String[] args) throws Exception {
    ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

    // Validate configuration
    ConfigurationValidator.validateRequiredParameters(params, "input.path", "output.path");
    ConfigurationValidator.validatePaths(params);
    int parallelism = ConfigurationValidator.getValidatedParallelism(params, 1, 100);

    // Use validated configuration
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(parallelism);
}
```

### Resource Management Integration

Integrating with Hadoop resource management and file systems.

```java
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;

public static class ResourceManager {

    public static Configuration createHadoopConfiguration(ParameterTool params) {
        Configuration conf = new Configuration();

        // Set HDFS configuration from parameters
        if (params.has("fs.defaultFS")) {
            conf.set("fs.defaultFS", params.get("fs.defaultFS"));
        }

        if (params.has("hadoop.conf.dir")) {
            // Add Hadoop configuration directory resources
            String confDir = params.get("hadoop.conf.dir");
            conf.addResource(new Path(confDir + "/core-site.xml"));
            conf.addResource(new Path(confDir + "/hdfs-site.xml"));
            conf.addResource(new Path(confDir + "/mapred-site.xml"));
        }

        // Copy all hadoop.* / fs.* / dfs.* properties
        for (String key : params.getProperties().stringPropertyNames()) {
            if (key.startsWith("hadoop.") || key.startsWith("fs.") || key.startsWith("dfs.")) {
                conf.set(key, params.get(key));
            }
        }

        return conf;
    }

    public static boolean validateInputPath(String inputPath, Configuration hadoopConf) throws IOException {
        FileSystem fs = FileSystem.get(hadoopConf);
        Path path = new Path(inputPath);
        return fs.exists(path);
    }

    public static void ensureOutputPath(String outputPath, Configuration hadoopConf) throws IOException {
        FileSystem fs = FileSystem.get(hadoopConf);
        Path path = new Path(outputPath);

        if (fs.exists(path)) {
            fs.delete(path, true); // Delete existing output directory
        }

        // Create parent directories if they don't exist
        Path parent = path.getParent();
        if (parent != null && !fs.exists(parent)) {
            fs.mkdirs(parent);
        }
    }
}
```

## Common Usage Patterns

### Complete Job Configuration

Example of a complete Flink job using Hadoop utilities for configuration.

```java
public class CompleteHadoopFlinkJob {

    public static void main(String[] args) throws Exception {
        // Parse Hadoop-style arguments
        ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

        // Validate required parameters
        if (!params.has("input") || !params.has("output")) {
            System.err.println("Usage: program -D input=<path> -D output=<path> [options]");
            System.exit(1);
        }

        // Set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);

        // Configure parallelism
        if (params.has("parallelism")) {
            env.setParallelism(params.getInt("parallelism"));
        }

        // Create Hadoop configuration
        JobConf jobConf = new JobConf();
        jobConf.set("mapred.input.dir", params.get("input"));
        jobConf.set("mapred.output.dir", params.get("output"));

        // Create input format
        HadoopInputFormat<LongWritable, Text> inputFormat =
                HadoopInputs.readHadoopFile(
                        new TextInputFormat(),
                        LongWritable.class,
                        Text.class,
                        params.get("input"),
                        jobConf
                );

        // Process data
        DataSet<Tuple2<LongWritable, Text>> input = env.createInput(inputFormat);
        DataSet<Tuple2<Text, IntWritable>> result = input
                .flatMap(new WordCountMapper())
                .groupBy(0)
                .sum(1);

        // Create output format
        JobConf outputJobConf = new JobConf();
        outputJobConf.set("mapred.output.dir", params.get("output"));
        outputJobConf.setOutputFormat(TextOutputFormat.class);

        HadoopOutputFormat<Text, IntWritable> outputFormat =
                new HadoopOutputFormat<>(
                        new TextOutputFormat<Text, IntWritable>(),
                        outputJobConf
                );

        // Write results
        result.output(outputFormat);

        // Execute job
        env.execute("Hadoop Compatible Flink Job");
    }
}
```

### Parameter Processing Utilities

Utility methods for common parameter processing tasks.

```java
public static class ParameterUtils {

    public static String[] getInputPaths(ParameterTool params) {
        String input = params.get("input", "");
        return input.split(",");
    }

    public static String getOutputPath(ParameterTool params) {
        return params.getRequired("output");
    }

    public static Map<String, String> getHadoopProperties(ParameterTool params) {
        Map<String, String> hadoopProps = new HashMap<>();
        for (String key : params.getProperties().stringPropertyNames()) {
            if (key.startsWith("hadoop.") || key.startsWith("mapred.") ||
                    key.startsWith("mapreduce.") || key.startsWith("fs.") || key.startsWith("dfs.")) {
                hadoopProps.put(key, params.get(key));
            }
        }
        return hadoopProps;
    }

    public static JobConf createJobConfFromParams(ParameterTool params) {
        JobConf jobConf = new JobConf();
        getHadoopProperties(params).forEach(jobConf::set);
        return jobConf;
    }

    public static Job createJobFromParams(ParameterTool params) throws IOException {
        Job job = Job.getInstance();
        getHadoopProperties(params).forEach((key, value) ->
                job.getConfiguration().set(key, value));
        return job;
    }
}
```

## Key Design Patterns

### Parameter Inheritance
Command-line parameters parsed by GenericOptionsParser can be easily converted to ParameterTool and then propagated to both Flink and Hadoop configurations.

### Configuration Layering
Multiple configuration sources (command-line arguments, properties files, system properties) can be layered with appropriate precedence rules.

### Validation and Error Handling
Configuration validation should be performed early in the application lifecycle with clear error messages for missing or invalid parameters.

### Resource Management
Integration with Hadoop FileSystem and resource management APIs enables proper handling of distributed file operations and cluster resources.