# Utility Classes

Helper methods and utility functions that simplify common Hadoop integration tasks, including parameter parsing, configuration management, and common integration patterns.

## Capabilities

### HadoopUtils

Utility class providing helper methods for working with Apache Hadoop libraries, particularly focused on parameter parsing and configuration management.

```java { .api }
/**
 * Utility class to work with Apache Hadoop libraries
 */
public class HadoopUtils {

    /**
     * Parses command-line arguments using Hadoop's GenericOptionsParser and returns a ParameterTool.
     *
     * This method leverages Hadoop's GenericOptionsParser to parse command-line arguments,
     * but only extracts -D property definitions in the form "-D key=value". Other Hadoop
     * options such as -files and -libjars are processed by GenericOptionsParser but not
     * included in the returned ParameterTool.
     *
     * @param args command-line arguments to parse
     * @return ParameterTool containing the parsed -D property definitions
     * @throws IOException if argument parsing or configuration access fails
     */
    public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException;
}
```

**Usage Examples:**

```java
import org.apache.flink.hadoopcompatibility.HadoopUtils;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.api.java.ExecutionEnvironment;

public class FlinkHadoopJob {
    public static void main(String[] args) throws Exception {
        // Parse Hadoop-style command-line arguments
        ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);

        // Access Hadoop properties set via -D
        String hadoopProperty = params.get("mapred.max.split.size", "67108864");
        String separator = params.get("mapred.textoutputformat.separator", "\t");

        // Note: params only contains -D properties. Other arguments such as --input
        // and --output must be parsed separately using standard argument parsing.

        // ... rest of the Flink application
    }
}
```

### Command Line Integration

HadoopUtils.paramsFromGenericOptionsParser accepts the standard Hadoop command-line options, although only -D properties end up in the returned ParameterTool:

```bash
# Example command-line usage
java -cp flink-app.jar FlinkHadoopJob \
    -D mapred.max.split.size=134217728 \
    -D mapred.textoutputformat.separator="|" \
    --input hdfs://data/input \
    --output hdfs://data/output \
    --parallelism 4

# Note: -files and -libjars options are processed by GenericOptionsParser
# but are not returned in the ParameterTool. They affect the Hadoop runtime environment.
```

**Supported Options:**

- **-D property=value**: Set Hadoop configuration properties (extracted into the ParameterTool)

**Note**: While GenericOptionsParser processes other Hadoop options such as -files, -libjars, and -archives, only -D property definitions are extracted and returned in the ParameterTool. Other options are handled by Hadoop's infrastructure but are not accessible through this method's return value.

**Flink-specific Parameters:**

Flink-style parameters such as --input and --output can appear on the same command line as Hadoop options, but they are not included in the ParameterTool returned by this method:

```java
// Access Hadoop -D properties (only -D options are extracted)
ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

// Hadoop configuration properties (set via -D)
String splitSize = params.get("mapred.max.split.size");
String separator = params.get("mapred.textoutputformat.separator");

// Note: non -D parameters such as --input and --output are NOT included in the
// returned ParameterTool; parse them separately, e.g. with ParameterTool.fromArgs.
```
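Since only -D properties come back from the parser, the remaining long options have to be collected separately. A minimal sketch using only the JDK (the `LongOptionParser` class and its behavior are illustrative, not part of the Flink API):

```java
import java.util.HashMap;
import java.util.Map;

public class LongOptionParser {

    /**
     * Collects "--name value" pairs from the argument list, skipping
     * Hadoop-style "-D key=value" pairs (which HadoopUtils already extracts).
     */
    public static Map<String, String> parseLongOptions(String[] args) {
        Map<String, String> options = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            if (args[i].equals("-D") && i + 1 < args.length) {
                i++; // skip the "key=value" token that belongs to -D
            } else if (args[i].startsWith("--") && i + 1 < args.length) {
                options.put(args[i].substring(2), args[++i]);
            }
        }
        return options;
    }

    public static void main(String[] args) {
        String[] demo = {"-D", "mapred.max.split.size=1048576",
                         "--input", "/data/in", "--output", "/data/out"};
        Map<String, String> opts = parseLongOptions(demo);
        System.out.println(opts.get("input") + " " + opts.get("output"));
    }
}
```

In a Flink job you could instead build a second ParameterTool with `ParameterTool.fromArgs(args)` and combine the two with `ParameterTool.mergeWith`.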

### Configuration Bridge

The utility bridges the Hadoop and Flink configuration systems:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.utils.ParameterTool;

// Parse arguments with Hadoop's GenericOptionsParser
ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

// Extract Hadoop configuration for use with InputFormats/OutputFormats
Configuration hadoopConf = new Configuration();
for (String key : params.toMap().keySet()) {
    if (key.startsWith("mapred.") || key.startsWith("fs.") || key.startsWith("dfs.")) {
        hadoopConf.set(key, params.get(key));
    }
}

// Use with Hadoop InputFormats
JobConf jobConf = new JobConf(hadoopConf);
HadoopInputFormat<LongWritable, Text> inputFormat = new HadoopInputFormat<>(
    new TextInputFormat(), LongWritable.class, Text.class, jobConf);
```

### Error Handling

paramsFromGenericOptionsParser reports parsing and configuration failures through IOException:

**Configuration Errors:**
```java
try {
    ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);
} catch (IOException e) {
    // Handle configuration parsing errors
    System.err.println("Failed to parse Hadoop configuration: " + e.getMessage());
    // Common causes:
    // - Invalid -D property syntax
    // - Missing required configuration files
    // - File system access issues for -files/-libjars
}
```

**Parameter Validation:**
```java
// Note: --input/--output style arguments are not returned by
// paramsFromGenericOptionsParser; parse them with ParameterTool.fromArgs instead.
ParameterTool params = ParameterTool.fromArgs(args);

// Validate required parameters
try {
    String input = params.getRequired("input");
    String output = params.getRequired("output");
} catch (RuntimeException e) {
    System.err.println("Missing required parameter: " + e.getMessage());
    printUsage();
    System.exit(1);
}

// Validate parameter formats
try {
    int parallelism = params.getInt("parallelism");
    if (parallelism <= 0) {
        throw new IllegalArgumentException("Parallelism must be positive");
    }
} catch (NumberFormatException e) {
    System.err.println("Invalid parallelism value: " + params.get("parallelism"));
    System.exit(1);
}
```

## Best Practices

### Application Structure

Recommended pattern for Flink applications that need Hadoop integration:

```java
public class FlinkHadoopApplication {
    public static void main(String[] args) throws Exception {
        // 1. Parse arguments using HadoopUtils
        ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

        // 2. Set up the Flink environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);

        // 3. Create a Hadoop configuration from the parsed parameters
        Configuration hadoopConf = createHadoopConfiguration(params);

        // 4. Build and execute the Flink pipeline
        buildPipeline(env, params, hadoopConf).execute("Flink Hadoop Job");
    }

    private static Configuration createHadoopConfiguration(ParameterTool params) {
        Configuration conf = new Configuration();
        // Extract Hadoop-specific properties
        params.toMap().entrySet().stream()
            .filter(entry -> isHadoopProperty(entry.getKey()))
            .forEach(entry -> conf.set(entry.getKey(), entry.getValue()));
        return conf;
    }

    private static boolean isHadoopProperty(String key) {
        return key.startsWith("fs.") || key.startsWith("dfs.") ||
               key.startsWith("mapred.") || key.startsWith("yarn.");
    }
}
```

### Parameter Naming Conventions

Follow consistent naming patterns for command-line parameters:

```java
// Recommended parameter names
String inputPath = params.getRequired("input");        // or --input-path
String outputPath = params.getRequired("output");      // or --output-path
int parallelism = params.getInt("parallelism", 1);     // or --parallelism
boolean verbose = params.getBoolean("verbose", false); // or --verbose

// Hadoop configuration via -D flags
// -D fs.defaultFS=hdfs://namenode:9000
// -D mapred.max.split.size=134217728
// -D dfs.blocksize=268435456
```

### Testing Integration

A unit test for the parsing behavior:

```java
@Test
public void testParameterParsing() throws IOException {
    String[] args = {
        "-D", "mapred.max.split.size=1048576",
        "-D", "fs.defaultFS=hdfs://localhost:9000",
        "--input", "/test/input",
        "--output", "/test/output"
    };

    ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

    assertEquals("1048576", params.get("mapred.max.split.size"));
    assertEquals("hdfs://localhost:9000", params.get("fs.defaultFS"));

    // Non -D arguments are not extracted into the ParameterTool
    assertFalse(params.has("input"));
    assertFalse(params.has("output"));
}
```

## Performance Considerations

### Configuration Overhead

- **Parse once**: Call paramsFromGenericOptionsParser once at application startup
- **Cache results**: Store the ParameterTool and Configuration objects for reuse
- **Avoid repeated parsing**: Don't re-parse arguments in loops or transformations
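The parse-once rule can be enforced with a small lazy holder. A minimal sketch using only the JDK; the `ParsedArgs` class is illustrative, and the actual parsing call is stubbed out with a counter so the caching behavior is visible:

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/** Holds the parsed parameters so arguments are parsed exactly once. */
public class ParsedArgs {
    static final AtomicInteger parseCount = new AtomicInteger();
    private static volatile Map<String, String> cached;

    public static Map<String, String> get(String[] args) {
        Map<String, String> local = cached;
        if (local == null) {
            synchronized (ParsedArgs.class) {
                local = cached;
                if (local == null) {
                    cached = local = parse(args);
                }
            }
        }
        return local;
    }

    // Stand-in for HadoopUtils.paramsFromGenericOptionsParser(args).toMap()
    private static Map<String, String> parse(String[] args) {
        parseCount.incrementAndGet();
        return Map.of("mapred.max.split.size", "1048576");
    }

    public static void main(String[] args) {
        Map<String, String> first = get(args);
        Map<String, String> second = get(args);
        System.out.println(first == second);  // same cached instance: true
        System.out.println(parseCount.get()); // parsed exactly once: 1
    }
}
```

In a real job, `parse` would delegate to HadoopUtils.paramsFromGenericOptionsParser and the cached value would be the resulting ParameterTool.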

### Memory Usage

- **Configuration size**: Large Hadoop configurations can increase memory usage
- **Parameter scope**: Use global job parameters judiciously to avoid serialization overhead
- **String interning**: Hadoop configuration keys are often repeated and benefit from string interning

### Distributed Cache

Using Hadoop's distributed cache options to ship configuration files:

```bash
# Add configuration files to all task nodes
java -cp flink-app.jar FlinkHadoopJob \
    -files hdfs://config/core-site.xml,hdfs://config/hdfs-site.xml \
    --input hdfs://data/input \
    --output hdfs://data/output
```

Access distributed cache files in your Flink application:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.hadoop.conf.Configuration;

// Files specified with -files are available in the working directory
Path configFile = Paths.get("core-site.xml");
if (Files.exists(configFile)) {
    // Load the configuration file. Note: addResource(String) resolves against
    // the classpath, so pass a Hadoop Path for a local file instead.
    Configuration conf = new Configuration();
    conf.addResource(new org.apache.hadoop.fs.Path(configFile.toString()));
}
```