# Hadoop Integration Utilities

The Hadoop integration utilities provide essential functions for configuration management, security handling, and version compatibility when working with Hadoop ecosystems. These utilities bridge Flink and Hadoop configurations and handle authentication and security concerns.

## Capabilities

### HadoopUtils

Main utility class providing static methods for Hadoop integration tasks.

```java { .api }
/**
 * Utility class for working with Hadoop-related classes.
 * Should only be used if Hadoop is on the classpath.
 */
public class HadoopUtils {
    /**
     * HDFS delegation token kind identifier.
     */
    public static final Text HDFS_DELEGATION_TOKEN_KIND;
}
```

### Configuration Management

Methods for converting and managing configurations between Flink and Hadoop.

```java { .api }
/**
 * Gets Hadoop configuration from Flink configuration.
 * Converts Flink configuration properties to Hadoop configuration format.
 * @param flinkConfiguration Flink's configuration object
 * @return Hadoop Configuration with converted properties
 */
public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration);
```

**Usage Examples:**

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.HadoopUtils;

// Create Flink configuration with Hadoop properties
Configuration flinkConfig = new Configuration();
flinkConfig.setString("fs.defaultFS", "hdfs://namenode:9000");
flinkConfig.setString("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
flinkConfig.setString("dfs.replication", "3");
flinkConfig.setString("dfs.blocksize", "134217728"); // 128MB

// Convert to Hadoop configuration
org.apache.hadoop.conf.Configuration hadoopConfig =
    HadoopUtils.getHadoopConfiguration(flinkConfig);

// Hadoop config now contains the converted properties
System.out.println("HDFS default FS: " + hadoopConfig.get("fs.defaultFS"));
System.out.println("Block size: " + hadoopConfig.get("dfs.blocksize"));

// Use with Hadoop FileSystem
org.apache.hadoop.fs.FileSystem hadoopFs =
    org.apache.hadoop.fs.FileSystem.get(hadoopConfig);
```

### Kerberos Security Management

Methods for handling Kerberos authentication and security validation.

```java { .api }
/**
 * Checks if Kerberos security is enabled for the user.
 * @param ugi UserGroupInformation to check
 * @return true if Kerberos security is enabled
 */
public static boolean isKerberosSecurityEnabled(UserGroupInformation ugi);

/**
 * Validates if Kerberos credentials are valid and not expired.
 * @param ugi UserGroupInformation to validate
 * @param useTicketCache whether to use ticket cache for validation
 * @return true if credentials are valid
 */
public static boolean areKerberosCredentialsValid(UserGroupInformation ugi, boolean useTicketCache);

/**
 * Checks if the user has HDFS delegation tokens.
 * @param ugi UserGroupInformation to check
 * @return true if HDFS delegation tokens are present
 */
public static boolean hasHDFSDelegationToken(UserGroupInformation ugi);
```

**Usage Examples:**

```java
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.flink.runtime.util.HadoopUtils;

// Get current user information
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();

// Check security configuration
if (HadoopUtils.isKerberosSecurityEnabled(ugi)) {
    System.out.println("Kerberos security is enabled");

    // Validate credentials
    boolean validCredentials = HadoopUtils.areKerberosCredentialsValid(ugi, true);
    if (validCredentials) {
        System.out.println("Kerberos credentials are valid");
    } else {
        System.err.println("Kerberos credentials are invalid or expired");
        // Handle credential renewal
    }

    // Check for delegation tokens
    if (HadoopUtils.hasHDFSDelegationToken(ugi)) {
        System.out.println("HDFS delegation tokens available");
    } else {
        System.out.println("No HDFS delegation tokens found");
    }
} else {
    System.out.println("Simple authentication (no Kerberos)");
}

// Login with keytab (if needed)
if (!HadoopUtils.areKerberosCredentialsValid(ugi, false)) {
    UserGroupInformation.loginUserFromKeytab("user@REALM", "/path/to/user.keytab");
    ugi = UserGroupInformation.getCurrentUser();
}
```

### Version Compatibility Checks

Methods for checking Hadoop version compatibility.

```java { .api }
/**
 * Checks if the current Hadoop version meets the minimum required version.
 * @param major minimum major version required
 * @param minor minimum minor version required
 * @return true if current version meets minimum requirements
 * @throws FlinkRuntimeException if version information cannot be determined
 */
public static boolean isMinHadoopVersion(int major, int minor) throws FlinkRuntimeException;

/**
 * Checks if the current Hadoop version is at most the specified maximum version.
 * @param major maximum major version allowed
 * @param minor maximum minor version allowed
 * @return true if current version is within maximum limits
 * @throws FlinkRuntimeException if version information cannot be determined
 */
public static boolean isMaxHadoopVersion(int major, int minor) throws FlinkRuntimeException;
```

**Usage Examples:**

```java
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.runtime.util.HadoopUtils;

try {
    // Check minimum version requirements
    if (HadoopUtils.isMinHadoopVersion(2, 6)) {
        System.out.println("Hadoop version is 2.6 or higher");
    } else {
        System.err.println("Hadoop version is below 2.6, some features may not work");
    }

    // Check maximum version constraints
    if (HadoopUtils.isMaxHadoopVersion(3, 2)) {
        System.out.println("Hadoop version is 3.2 or lower");
    } else {
        System.out.println("Hadoop version is above 3.2, compatibility not guaranteed");
    }

    // Version range check
    if (HadoopUtils.isMinHadoopVersion(2, 7) && HadoopUtils.isMaxHadoopVersion(3, 1)) {
        System.out.println("Hadoop version is in supported range (2.7 - 3.1)");
        // Enable version-specific features
    }

} catch (FlinkRuntimeException e) {
    System.err.println("Cannot determine Hadoop version: " + e.getMessage());
}
```

### HadoopConfigLoader

Advanced configuration loader that provides lazy loading and caching of Hadoop configurations.

```java { .api }
/**
 * Lazily loads Hadoop configuration from resettable Flink's configuration.
 * Provides efficient configuration management with caching and reset capabilities.
 */
public class HadoopConfigLoader {
    /**
     * Creates a configuration loader with specified parameters.
     * @param flinkConfigPrefixes prefixes for Flink configuration keys
     * @param mirroredConfigKeys configuration keys to mirror between systems
     * @param hadoopConfigPrefix prefix for Hadoop configuration
     * @param packagePrefixesToShade package prefixes to shade
     * @param configKeysToShade configuration keys to shade
     * @param flinkShadingPrefix Flink shading prefix
     */
    public HadoopConfigLoader(String[] flinkConfigPrefixes,
                              String[][] mirroredConfigKeys,
                              String hadoopConfigPrefix,
                              Set<String> packagePrefixesToShade,
                              Set<String> configKeysToShade,
                              String flinkShadingPrefix);

    /**
     * Sets the Flink configuration and triggers reload on next access.
     * @param config new Flink configuration
     */
    public void setFlinkConfig(Configuration config);

    /**
     * Gets the current Hadoop configuration, loading it if necessary.
     * @return loaded Hadoop Configuration object
     */
    public org.apache.hadoop.conf.Configuration getOrLoadHadoopConfig();
}
```

**Usage Examples:**

```java
import org.apache.flink.runtime.util.HadoopConfigLoader;
import java.util.Set;
import java.util.HashSet;

// Create configuration loader
String[] flinkPrefixes = {"flink.hadoop."};
String[][] mirroredKeys = {{"fs.defaultFS", "fs.default.name"}};
Set<String> shadedPackages = new HashSet<>();
shadedPackages.add("org.apache.hadoop");

HadoopConfigLoader loader = new HadoopConfigLoader(
    flinkPrefixes,
    mirroredKeys,
    "hadoop.",
    shadedPackages,
    new HashSet<>(),
    "org.apache.flink.shaded."
);

// Set Flink configuration
Configuration flinkConfig = new Configuration();
flinkConfig.setString("flink.hadoop.fs.defaultFS", "hdfs://namenode:9000");
loader.setFlinkConfig(flinkConfig);

// Load Hadoop configuration (cached after first load)
org.apache.hadoop.conf.Configuration hadoopConfig = loader.getOrLoadHadoopConfig();

// Configuration is cached until setFlinkConfig is called again
org.apache.hadoop.conf.Configuration sameConfig = loader.getOrLoadHadoopConfig();
// Returns cached instance
```

### Security Token Management

Advanced token management for secure Hadoop clusters.

```java
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import java.util.Collection;

// Check and manage delegation tokens
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();

if (HadoopUtils.hasHDFSDelegationToken(ugi)) {
    // Get all tokens
    Collection<Token<? extends TokenIdentifier>> tokens = ugi.getTokens();

    for (Token<? extends TokenIdentifier> token : tokens) {
        if (token.getKind().equals(HadoopUtils.HDFS_DELEGATION_TOKEN_KIND)) {
            System.out.println("HDFS Token: " + token.toString());
            System.out.println("Token service: " + token.getService());

            // Check token expiration
            long expirationTime = token.decodeIdentifier().getMaxDate();
            if (System.currentTimeMillis() > expirationTime) {
                System.out.println("Token is expired, renewal needed");
            }
        }
    }
}
```

### Configuration Integration Example

Complete example showing integration between Flink and Hadoop configurations:

```java
public class HadoopIntegrationExample {

    public void setupHadoopIntegration() {
        // Create Flink configuration with Hadoop properties
        Configuration flinkConfig = new Configuration();

        // Basic HDFS configuration
        flinkConfig.setString("fs.defaultFS", "hdfs://ha-namenode:9000");
        flinkConfig.setString("dfs.nameservices", "mycluster");
        flinkConfig.setString("dfs.ha.namenodes.mycluster", "nn1,nn2");
        flinkConfig.setString("dfs.namenode.rpc-address.mycluster.nn1", "namenode1:8020");
        flinkConfig.setString("dfs.namenode.rpc-address.mycluster.nn2", "namenode2:8020");

        // Security configuration
        flinkConfig.setString("hadoop.security.authentication", "kerberos");
        flinkConfig.setString("hadoop.security.authorization", "true");

        // Performance tuning
        flinkConfig.setString("dfs.client.read.shortcircuit", "true");
        flinkConfig.setString("dfs.client.cache.drop.behind.writes", "true");

        // Convert to Hadoop configuration
        org.apache.hadoop.conf.Configuration hadoopConfig =
            HadoopUtils.getHadoopConfiguration(flinkConfig);

        // Validate setup
        try {
            if (HadoopUtils.isMinHadoopVersion(2, 6)) {
                System.out.println("Hadoop version compatible");

                // Check security
                UserGroupInformation.setConfiguration(hadoopConfig);
                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();

                if (HadoopUtils.isKerberosSecurityEnabled(ugi)) {
                    if (HadoopUtils.areKerberosCredentialsValid(ugi, true)) {
                        System.out.println("Security setup valid");
                    } else {
                        System.err.println("Invalid Kerberos credentials");
                    }
                }

                // Create file system
                org.apache.hadoop.fs.FileSystem fs =
                    org.apache.hadoop.fs.FileSystem.get(hadoopConfig);
                System.out.println("Successfully connected to: " + fs.getUri());

            } else {
                throw new RuntimeException("Unsupported Hadoop version");
            }

        } catch (Exception e) {
            System.err.println("Hadoop integration failed: " + e.getMessage());
        }
    }
}
```

## Types

```java { .api }
// Hadoop configuration classes
public class Configuration {
    public String get(String name);
    public void set(String name, String value);
    public boolean getBoolean(String name, boolean defaultValue);
    public int getInt(String name, int defaultValue);
}

// Security classes
public abstract class UserGroupInformation {
    public static UserGroupInformation getCurrentUser() throws IOException;
    public static void loginUserFromKeytab(String user, String path) throws IOException;
    public Collection<Token<? extends TokenIdentifier>> getTokens();
    public boolean hasKerberosCredentials();
}

// Token classes
public class Token<T extends TokenIdentifier> {
    public Text getKind();
    public Text getService();
    public T decodeIdentifier() throws IOException;
}

public class Text {
    public Text(String string);
    public String toString();
    public boolean equals(Object o);
}

// Exception types
public class FlinkRuntimeException extends RuntimeException {
    public FlinkRuntimeException(String message);
    public FlinkRuntimeException(String message, Throwable cause);
}
```