or run:

`npx @tessl/cli init`
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md · input-formats.md · mapreduce-functions.md · output-formats.md · type-system.md · utilities.md

docs/utilities.md

# Configuration Utilities

1

2

Utility functions for handling Hadoop configuration, command-line argument parsing, and integration between Hadoop and Flink execution environments. Provides helper methods for common configuration tasks and parameter management.

3

4

## Capabilities

5

6

### Command-Line Parameter Parsing

7

8

Utility for parsing Hadoop-style command-line arguments using GenericOptionsParser.

9

10

```java { .api }

11

/**

12

* Returns ParameterTool for the arguments parsed by GenericOptionsParser

13

* @param args Input array arguments that should be parsable by GenericOptionsParser

14

* @return A ParameterTool containing the parsed parameters

15

* @throws IOException If arguments cannot be parsed by GenericOptionsParser

16

* @see GenericOptionsParser

17

*/

18

public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException;

19

```

20

21

**Usage Example:**

22

23

```java

24

import org.apache.flink.hadoopcompatibility.HadoopUtils;

25

import org.apache.flink.util.ParameterTool;

26

27

public class HadoopCompatibleFlinkJob {

28

public static void main(String[] args) throws Exception {

29

// Parse Hadoop-style command line arguments

30

// Supports arguments like: -D property=value -files file1,file2 -archives archive1

31

ParameterTool parameters = HadoopUtils.paramsFromGenericOptionsParser(args);

32

33

// Access parsed parameters

34

String inputPath = parameters.get("input.path", "hdfs://default/input");

35

String outputPath = parameters.get("output.path", "hdfs://default/output");

36

int parallelism = parameters.getInt("parallelism", 1);

37

38

// Use parameters in Flink job configuration

39

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

40

env.setParallelism(parallelism);

41

42

// Create configuration from parameters

43

Configuration config = Configuration.fromMap(parameters.toMap());

44

45

System.out.println("Input path: " + inputPath);

46

System.out.println("Output path: " + outputPath);

47

System.out.println("Parallelism: " + parallelism);

48

}

49

}

50

```

51

52

### Hadoop Configuration Integration

53

54

Working with Hadoop configuration objects and integrating them with Flink.

55

56

**JobConf Integration:**

57

58

```java

59

import org.apache.hadoop.mapred.JobConf;

60

import org.apache.flink.util.ParameterTool;

61

62

// Create JobConf from parsed parameters

63

public static JobConf createJobConf(ParameterTool parameters) {

64

JobConf jobConf = new JobConf();

65

66

// Set common Hadoop properties from parameters

67

if (parameters.has("mapred.job.name")) {

68

jobConf.setJobName(parameters.get("mapred.job.name"));

69

}

70

71

if (parameters.has("mapred.reduce.tasks")) {

72

jobConf.setNumReduceTasks(parameters.getInt("mapred.reduce.tasks"));

73

}

74

75

// Copy all parameters as properties

76

for (String key : parameters.getProperties().stringPropertyNames()) {

77

jobConf.set(key, parameters.get(key));

78

}

79

80

return jobConf;

81

}

82

83

// Usage example

84

String[] args = {"-D", "mapred.job.name=MyFlinkJob", "-D", "input.path=/data/input"};

85

ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

86

JobConf jobConf = createJobConf(params);

87

```

88

89

**Job (mapreduce API) Integration:**

90

91

```java

92

import org.apache.hadoop.mapreduce.Job;

93

import org.apache.flink.util.ParameterTool;

94

95

// Create Job from parsed parameters

96

public static Job createJob(ParameterTool parameters) throws IOException {

97

Job job = Job.getInstance();

98

99

// Set common mapreduce properties from parameters

100

if (parameters.has("mapreduce.job.name")) {

101

job.setJobName(parameters.get("mapreduce.job.name"));

102

}

103

104

if (parameters.has("mapreduce.job.reduces")) {

105

job.setNumReduceTasks(parameters.getInt("mapreduce.job.reduces"));

106

}

107

108

// Copy all parameters as configuration properties

109

for (String key : parameters.getProperties().stringPropertyNames()) {

110

job.getConfiguration().set(key, parameters.get(key));

111

}

112

113

return job;

114

}

115

116

// Usage example

117

String[] args = {"-D", "mapreduce.job.name=MyFlinkJob", "-D", "output.path=/data/output"};

118

ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

119

Job job = createJob(params);

120

```

121

122

## Advanced Configuration Patterns

123

124

### Environment Variable Integration

125

126

Combining command-line arguments with environment variables.

127

128

```java

129

public static ParameterTool createConfigurationFromMultipleSources(String[] args) throws IOException {

130

// Parse command-line arguments

131

ParameterTool cmdLineParams = HadoopUtils.paramsFromGenericOptionsParser(args);

132

133

// Merge with environment variables

134

ParameterTool envParams = ParameterTool.fromSystemProperties();

135

136

// Merge with properties file if specified

137

ParameterTool allParams = cmdLineParams;

138

if (cmdLineParams.has("config.file")) {

139

ParameterTool fileParams = ParameterTool.fromPropertiesFile(cmdLineParams.get("config.file"));

140

allParams = fileParams.mergeWith(cmdLineParams); // Command line overrides file

141

}

142

143

// Environment variables have lowest priority

144

allParams = allParams.mergeWith(envParams);

145

146

return allParams;

147

}

148

149

// Usage

150

String[] args = {"-D", "parallelism=4", "--config.file", "app.properties"};

151

ParameterTool config = createConfigurationFromMultipleSources(args);

152

```

153

154

### Configuration Validation

155

156

Validating and processing configuration parameters.

157

158

```java

159

public static class ConfigurationValidator {

160

161

public static void validateRequiredParameters(ParameterTool params, String... requiredKeys)

162

throws IllegalArgumentException {

163

for (String key : requiredKeys) {

164

if (!params.has(key)) {

165

throw new IllegalArgumentException("Required parameter missing: " + key);

166

}

167

}

168

}

169

170

public static void validatePaths(ParameterTool params) throws IOException {

171

if (params.has("input.path")) {

172

String inputPath = params.get("input.path");

173

// Validate input path exists (for HDFS paths, would need Hadoop FileSystem)

174

if (!inputPath.startsWith("hdfs://") && !inputPath.startsWith("file://")) {

175

throw new IllegalArgumentException("Invalid input path format: " + inputPath);

176

}

177

}

178

179

if (params.has("output.path")) {

180

String outputPath = params.get("output.path");

181

// Validate output path format

182

if (!outputPath.startsWith("hdfs://") && !outputPath.startsWith("file://")) {

183

throw new IllegalArgumentException("Invalid output path format: " + outputPath);

184

}

185

}

186

}

187

188

public static int getValidatedParallelism(ParameterTool params, int defaultValue, int maxValue) {

189

int parallelism = params.getInt("parallelism", defaultValue);

190

if (parallelism < 1 || parallelism > maxValue) {

191

throw new IllegalArgumentException(

192

"Parallelism must be between 1 and " + maxValue + ", got: " + parallelism);

193

}

194

return parallelism;

195

}

196

}

197

198

// Usage example

199

public static void main(String[] args) throws Exception {

200

ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

201

202

// Validate configuration

203

ConfigurationValidator.validateRequiredParameters(params, "input.path", "output.path");

204

ConfigurationValidator.validatePaths(params);

205

int parallelism = ConfigurationValidator.getValidatedParallelism(params, 1, 100);

206

207

// Use validated configuration

208

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

209

env.setParallelism(parallelism);

210

}

211

```

212

213

### Resource Management Integration

214

215

Integrating with Hadoop resource management and file systems.

216

217

```java

218

import org.apache.hadoop.fs.FileSystem;

219

import org.apache.hadoop.fs.Path;

220

import org.apache.hadoop.conf.Configuration;

221

222

public static class ResourceManager {

223

224

public static Configuration createHadoopConfiguration(ParameterTool params) {

225

Configuration conf = new Configuration();

226

227

// Set HDFS configuration from parameters

228

if (params.has("fs.defaultFS")) {

229

conf.set("fs.defaultFS", params.get("fs.defaultFS"));

230

}

231

232

if (params.has("hadoop.conf.dir")) {

233

// Add Hadoop configuration directory to classpath

234

String confDir = params.get("hadoop.conf.dir");

235

conf.addResource(new Path(confDir + "/core-site.xml"));

236

conf.addResource(new Path(confDir + "/hdfs-site.xml"));

237

conf.addResource(new Path(confDir + "/mapred-site.xml"));

238

}

239

240

// Copy all hadoop.* properties

241

for (String key : params.getProperties().stringPropertyNames()) {

242

if (key.startsWith("hadoop.") || key.startsWith("fs.") || key.startsWith("dfs.")) {

243

conf.set(key, params.get(key));

244

}

245

}

246

247

return conf;

248

}

249

250

public static boolean validateInputPath(String inputPath, Configuration hadoopConf) throws IOException {

251

FileSystem fs = FileSystem.get(hadoopConf);

252

Path path = new Path(inputPath);

253

return fs.exists(path);

254

}

255

256

public static void ensureOutputPath(String outputPath, Configuration hadoopConf) throws IOException {

257

FileSystem fs = FileSystem.get(hadoopConf);

258

Path path = new Path(outputPath);

259

260

if (fs.exists(path)) {

261

fs.delete(path, true); // Delete existing output directory

262

}

263

264

// Create parent directories if they don't exist

265

Path parent = path.getParent();

266

if (parent != null && !fs.exists(parent)) {

267

fs.mkdirs(parent);

268

}

269

}

270

}

271

```

272

273

## Common Usage Patterns

274

275

### Complete Job Configuration

276

277

Example of a complete Flink job using Hadoop utilities for configuration.

278

279

```java

280

public class CompleteHadoopFlinkJob {

281

282

public static void main(String[] args) throws Exception {

283

// Parse Hadoop-style arguments

284

ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

285

286

// Validate required parameters

287

if (!params.has("input") || !params.has("output")) {

288

System.err.println("Usage: program -D input=<path> -D output=<path> [options]");

289

System.exit(1);

290

}

291

292

// Set up execution environment

293

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

294

env.getConfig().setGlobalJobParameters(params);

295

296

// Configure parallelism

297

if (params.has("parallelism")) {

298

env.setParallelism(params.getInt("parallelism"));

299

}

300

301

// Create Hadoop configuration

302

JobConf jobConf = new JobConf();

303

jobConf.set("mapred.input.dir", params.get("input"));

304

jobConf.set("mapred.output.dir", params.get("output"));

305

306

// Create input format

307

HadoopInputFormat<LongWritable, Text> inputFormat =

308

HadoopInputs.readHadoopFile(

309

new TextInputFormat(),

310

LongWritable.class,

311

Text.class,

312

params.get("input"),

313

jobConf

314

);

315

316

// Process data

317

DataSet<Tuple2<LongWritable, Text>> input = env.createInput(inputFormat);

318

DataSet<Tuple2<Text, IntWritable>> result = input

319

.flatMap(new WordCountMapper())

320

.groupBy(0)

321

.sum(1);

322

323

// Create output format

324

JobConf outputJobConf = new JobConf();

325

outputJobConf.set("mapred.output.dir", params.get("output"));

326

outputJobConf.setOutputFormat(TextOutputFormat.class);

327

328

HadoopOutputFormat<Text, IntWritable> outputFormat =

329

new HadoopOutputFormat<>(

330

new TextOutputFormat<Text, IntWritable>(),

331

outputJobConf

332

);

333

334

// Write results

335

result.output(outputFormat);

336

337

// Execute job

338

env.execute("Hadoop Compatible Flink Job");

339

}

340

}

341

```

342

343

### Parameter Processing Utilities

344

345

Utility methods for common parameter processing tasks.

346

347

```java

348

public static class ParameterUtils {

349

350

public static String[] getInputPaths(ParameterTool params) {

351

String input = params.get("input", "");

352

return input.split(",");

353

}

354

355

public static String getOutputPath(ParameterTool params) {

356

return params.getRequired("output");

357

}

358

359

public static Map<String, String> getHadoopProperties(ParameterTool params) {

360

Map<String, String> hadoopProps = new HashMap<>();

361

for (String key : params.getProperties().stringPropertyNames()) {

362

if (key.startsWith("hadoop.") || key.startsWith("mapred.") ||

363

key.startsWith("mapreduce.") || key.startsWith("fs.") || key.startsWith("dfs.")) {

364

hadoopProps.put(key, params.get(key));

365

}

366

}

367

return hadoopProps;

368

}

369

370

public static JobConf createJobConfFromParams(ParameterTool params) {

371

JobConf jobConf = new JobConf();

372

getHadoopProperties(params).forEach(jobConf::set);

373

return jobConf;

374

}

375

376

public static Job createJobFromParams(ParameterTool params) throws IOException {

377

Job job = Job.getInstance();

378

getHadoopProperties(params).forEach((key, value) ->

379

job.getConfiguration().set(key, value));

380

return job;

381

}

382

}

383

```

384

385

## Key Design Patterns

386

387

### Parameter Inheritance

388

Command-line parameters parsed by GenericOptionsParser can be easily converted to ParameterTool and then propagated to both Flink and Hadoop configurations.

389

390

### Configuration Layering

391

Multiple configuration sources (command-line, files, environment variables) can be layered with appropriate precedence rules.

392

393

### Validation and Error Handling

394

Configuration validation should be performed early in the application lifecycle with clear error messages for missing or invalid parameters.

395

396

### Resource Management

397

Integration with Hadoop FileSystem and resource management APIs enables proper handling of distributed file operations and cluster resources.