or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md · index.md · input-formats.md · mapreduce-functions.md · output-formats.md · scala-api.md · type-system.md

docs/configuration.md

# Configuration Utilities

The Configuration Utilities capability provides helper functions for Hadoop configuration management, command-line argument parsing, and seamless integration with existing Hadoop tooling and workflows.

## Overview

The configuration utilities enable smooth integration between Flink applications and existing Hadoop infrastructure by providing tools to parse Hadoop-style command-line arguments and manage Hadoop configuration objects within Flink programs.

## HadoopUtils Class

Primary utility class for Hadoop configuration management.

```java { .api }
@Public
public class HadoopUtils {

    /**
     * Returns ParameterTool for arguments parsed by GenericOptionsParser.
     *
     * @param args Input array arguments parsable by GenericOptionsParser
     * @return A ParameterTool containing the parsed parameters
     * @throws IOException If arguments cannot be parsed by GenericOptionsParser
     */
    public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException;
}
```

## Usage Examples

### Basic Command-Line Argument Parsing

```java
import org.apache.flink.hadoopcompatibility.HadoopUtils;
import org.apache.flink.api.java.utils.ParameterTool;

public static void main(String[] args) throws Exception {
    // Parse Hadoop-style command line arguments
    // Supports arguments like: -D key=value, -conf config.xml, -fs hdfs://namenode:port
    ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

    // Access parsed parameters
    String inputPath = params.get("input", "hdfs://localhost:9000/input");
    String outputPath = params.get("output", "hdfs://localhost:9000/output");
    int parallelism = params.getInt("parallelism", 1);

    // Configure Flink environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(parallelism);

    // Use parameters in your Flink job
    System.out.println("Input path: " + inputPath);
    System.out.println("Output path: " + outputPath);
}
```

### Advanced Parameter Handling

```java
import org.apache.hadoop.util.GenericOptionsParser;

public class HadoopFlinkJob {
    public static void main(String[] args) throws Exception {
        try {
            // Parse Hadoop command-line arguments
            ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

            // Check for required parameters
            if (!params.has("input")) {
                System.err.println("Missing required parameter: input");
                printUsage();
                System.exit(1);
            }

            // Extract configuration parameters
            String inputPath = params.getRequired("input");
            String outputPath = params.get("output", inputPath + "_output");
            boolean enableCompression = params.getBoolean("compress", false);
            String compressionCodec = params.get("codec", "gzip");
            int bufferSize = params.getInt("buffer.size", 65536);

            // Log configuration
            System.out.println("Job Configuration:");
            System.out.println(" Input Path: " + inputPath);
            System.out.println(" Output Path: " + outputPath);
            System.out.println(" Compression: " + enableCompression);
            if (enableCompression) {
                System.out.println(" Codec: " + compressionCodec);
            }
            System.out.println(" Buffer Size: " + bufferSize);

            // Run Flink job with parsed parameters
            runFlinkJob(params);

        } catch (IOException e) {
            System.err.println("Failed to parse command line arguments: " + e.getMessage());
            printUsage();
            System.exit(1);
        }
    }

    private static void printUsage() {
        System.out.println("Usage:");
        System.out.println(" HadoopFlinkJob -D input=<path> [-D output=<path>] [options]");
        System.out.println();
        System.out.println("Required parameters:");
        System.out.println(" -D input=<path> Input data path");
        System.out.println();
        System.out.println("Optional parameters:");
        System.out.println(" -D output=<path> Output data path (default: input_output)");
        System.out.println(" -D compress=<bool> Enable compression (default: false)");
        System.out.println(" -D codec=<string> Compression codec (default: gzip)");
        System.out.println(" -D buffer.size=<int> Buffer size in bytes (default: 65536)");
        System.out.println(" -D parallelism=<int> Job parallelism (default: 1)");
    }

    private static void runFlinkJob(ParameterTool params) throws Exception {
        // Implementation of the actual Flink job
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Make parameters available to all operators
        env.getConfig().setGlobalJobParameters(params);

        // Configure based on parameters
        if (params.has("parallelism")) {
            env.setParallelism(params.getInt("parallelism"));
        }

        // Your Flink job logic here
        // ...

        env.execute("Hadoop-Flink Integration Job");
    }
}
```

### Integration with Hadoop Configuration

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;

public class ConfigurationIntegration {
    public static void main(String[] args) throws Exception {
        // Parse Hadoop arguments
        ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

        // Create Hadoop configuration from parsed parameters
        JobConf jobConf = createJobConf(params);

        // Use configuration with Hadoop InputFormats
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
            HadoopInputs.readHadoopFile(
                new TextInputFormat(),
                LongWritable.class,
                Text.class,
                params.getRequired("input"),
                jobConf
            )
        );

        // Process data and write output
        // ...
    }

    private static JobConf createJobConf(ParameterTool params) {
        JobConf conf = new JobConf();

        // Set HDFS configuration
        if (params.has("fs.defaultFS")) {
            conf.set("fs.defaultFS", params.get("fs.defaultFS"));
        }

        // Set input/output formats
        if (params.has("mapreduce.inputformat.class")) {
            conf.set("mapreduce.inputformat.class", params.get("mapreduce.inputformat.class"));
        }

        // Set compression settings
        if (params.getBoolean("mapred.output.compress", false)) {
            conf.setBoolean("mapred.output.compress", true);
            conf.set("mapred.output.compression.codec",
                params.get("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"));
        }

        // Set custom properties
        for (String key : params.getProperties().stringPropertyNames()) {
            if (key.startsWith("hadoop.")) {
                String hadoopKey = key.substring("hadoop.".length());
                conf.set(hadoopKey, params.get(key));
            }
        }

        return conf;
    }
}
```

### Command-Line Examples

The utility supports standard Hadoop command-line arguments:

```bash
# Basic usage with input/output paths
java -cp flink-job.jar HadoopFlinkJob \
    -D input=hdfs://namenode:9000/data/input \
    -D output=hdfs://namenode:9000/data/output

# With compression settings
java -cp flink-job.jar HadoopFlinkJob \
    -D input=hdfs://namenode:9000/data/input \
    -D output=hdfs://namenode:9000/data/output \
    -D mapred.output.compress=true \
    -D mapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec

# With custom Hadoop configuration
java -cp flink-job.jar HadoopFlinkJob \
    -conf /path/to/hadoop/conf/core-site.xml \
    -conf /path/to/hadoop/conf/hdfs-site.xml \
    -D input=hdfs://namenode:9000/data/input \
    -D output=hdfs://namenode:9000/data/output \
    -D parallelism=8

# With file system specification
java -cp flink-job.jar HadoopFlinkJob \
    -fs hdfs://namenode:9000 \
    -D input=/data/input \
    -D output=/data/output

# With custom properties
java -cp flink-job.jar HadoopFlinkJob \
    -D input=hdfs://namenode:9000/data/input \
    -D hadoop.io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec \
    -D hadoop.mapreduce.job.queuename=production \
    -D custom.batch.size=1000
```

### Scala Integration

```scala
import org.apache.flink.hadoopcompatibility.HadoopUtils
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._

object ScalaHadoopJob {
  def main(args: Array[String]): Unit = {
    try {
      // Parse Hadoop-style arguments
      val params = HadoopUtils.paramsFromGenericOptionsParser(args)

      // Extract parameters with default values
      val inputPath = params.getRequired("input")
      val outputPath = params.get("output", s"${inputPath}_output")
      val parallelism = params.getInt("parallelism", 4)

      // Configure Flink environment
      val env = ExecutionEnvironment.getExecutionEnvironment
      env.setParallelism(parallelism)
      env.getConfig.setGlobalJobParameters(params)

      // Log configuration
      println(s"Input: $inputPath")
      println(s"Output: $outputPath")
      println(s"Parallelism: $parallelism")

      // Run job with configuration
      runJob(env, params)

    } catch {
      case e: IOException =>
        Console.err.println(s"Failed to parse arguments: ${e.getMessage}")
        printUsage()
        sys.exit(1)
    }
  }

  def runJob(env: ExecutionEnvironment, params: ParameterTool): Unit = {
    // Job implementation using parsed parameters
    val inputPath = params.getRequired("input")
    val outputPath = params.get("output")

    // Create input with configuration
    val input = env.createInput(
      HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        classOf[LongWritable],
        classOf[Text],
        inputPath
      )
    )

    // Process and output
    val result = input
      .map(_._2.toString)
      .filter(_.nonEmpty)
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    // Write output if specified
    if (params.has("output")) {
      // Configure and write output
      result.writeAsText(outputPath)
    } else {
      result.print()
    }

    env.execute("Scala Hadoop Integration Job")
  }

  def printUsage(): Unit = {
    println("Usage: ScalaHadoopJob -D input=<path> [options]")
    println("Options:")
    println(" -D input=<path> Input data path (required)")
    println(" -D output=<path> Output data path (optional)")
    println(" -D parallelism=<int> Job parallelism (default: 4)")
  }
}
```

### Configuration File Integration

```java
import org.apache.hadoop.conf.Configuration;
import java.io.File;

public class ConfigFileIntegration {
    public static void main(String[] args) throws Exception {
        // Parse command line arguments
        ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

        // Load Hadoop configuration files if specified
        Configuration hadoopConf = loadHadoopConfiguration(params);

        // Convert to JobConf for use with Flink-Hadoop integration
        JobConf jobConf = new JobConf(hadoopConf);

        // Override with command-line parameters
        applyParameterOverrides(jobConf, params);

        // Use in Flink job
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
            HadoopInputs.createHadoopInput(
                new TextInputFormat(),
                LongWritable.class,
                Text.class,
                jobConf
            )
        );

        // Process data...
    }

    private static Configuration loadHadoopConfiguration(ParameterTool params) {
        Configuration conf = new Configuration();

        // Load core Hadoop configuration files
        String hadoopConfDir = params.get("hadoop.conf.dir",
            System.getenv("HADOOP_CONF_DIR"));

        if (hadoopConfDir != null) {
            File confDir = new File(hadoopConfDir);
            if (confDir.exists() && confDir.isDirectory()) {
                // Load standard Hadoop configuration files
                loadConfigFile(conf, new File(confDir, "core-site.xml"));
                loadConfigFile(conf, new File(confDir, "hdfs-site.xml"));
                loadConfigFile(conf, new File(confDir, "mapred-site.xml"));
                loadConfigFile(conf, new File(confDir, "yarn-site.xml"));
            }
        }

        // Load additional config files specified via command line
        String[] configFiles = params.get("conf", "").split(",");
        for (String configFile : configFiles) {
            if (!configFile.trim().isEmpty()) {
                loadConfigFile(conf, new File(configFile.trim()));
            }
        }

        return conf;
    }

    private static void loadConfigFile(Configuration conf, File configFile) {
        if (configFile.exists() && configFile.isFile()) {
            try {
                conf.addResource(configFile.toURI().toURL());
                System.out.println("Loaded configuration from: " + configFile.getAbsolutePath());
            } catch (Exception e) {
                System.err.println("Failed to load config file " + configFile + ": " + e.getMessage());
            }
        }
    }

    private static void applyParameterOverrides(JobConf jobConf, ParameterTool params) {
        // Apply command-line parameter overrides
        for (String key : params.getProperties().stringPropertyNames()) {
            String value = params.get(key);

            // Skip Flink-specific parameters
            if (!key.startsWith("flink.")) {
                jobConf.set(key, value);
                System.out.println("Override: " + key + " = " + value);
            }
        }
    }
}
```

## Error Handling

```java
public class RobustConfigurationHandling {
    public static ParameterTool parseArgumentsSafely(String[] args) {
        try {
            return HadoopUtils.paramsFromGenericOptionsParser(args);
        } catch (IOException e) {
            System.err.println("Error parsing command line arguments: " + e.getMessage());
            System.err.println("This may be due to:");
            System.err.println(" - Invalid argument format");
            System.err.println(" - Missing configuration files");
            System.err.println(" - Insufficient permissions");

            // Provide fallback parameters
            return ParameterTool.fromArgs(args);
        } catch (IllegalArgumentException e) {
            System.err.println("Invalid arguments provided: " + e.getMessage());
            printUsage();
            throw e;
        }
    }

    private static void printUsage() {
        System.out.println("Supported argument formats:");
        System.out.println(" -D key=value Set configuration property");
        System.out.println(" -conf <file> Load configuration from file");
        System.out.println(" -fs <filesystem> Set default filesystem");
        System.out.println(" -jt <jobtracker> Set job tracker address");
        System.out.println(" -files <files> Specify files to be copied");
        System.out.println(" -archives <archives> Specify archives to be copied");
    }
}
```

## Best Practices

### Parameter Validation

```java
public static void validateParameters(ParameterTool params) throws IllegalArgumentException {
    // Check required parameters
    String[] requiredParams = {"input"};
    for (String param : requiredParams) {
        if (!params.has(param)) {
            throw new IllegalArgumentException("Missing required parameter: " + param);
        }
    }

    // Validate parameter values
    if (params.has("parallelism")) {
        int parallelism = params.getInt("parallelism");
        if (parallelism <= 0) {
            throw new IllegalArgumentException("Parallelism must be positive, got: " + parallelism);
        }
    }

    // Validate paths
    String inputPath = params.get("input");
    if (!inputPath.startsWith("hdfs://") && !inputPath.startsWith("file://") && !inputPath.startsWith("/")) {
        throw new IllegalArgumentException("Invalid input path format: " + inputPath);
    }
}
```

### Configuration Documentation

```java
public static void printConfiguration(ParameterTool params) {
    System.out.println("=== Job Configuration ===");

    // Print all parameters sorted by key
    params.getProperties().stringPropertyNames().stream()
        .sorted()
        .forEach(key -> {
            String value = params.get(key);
            // Mask sensitive values
            if (key.toLowerCase().contains("password") || key.toLowerCase().contains("secret")) {
                value = "***";
            }
            System.out.println(String.format(" %-30s: %s", key, value));
        });

    System.out.println("========================");
}
```