# Hadoop Integration Utilities

The Hadoop integration utilities provide essential functions for configuration management, security handling, and version compatibility when working with Hadoop ecosystems. These utilities bridge Flink and Hadoop configurations and handle authentication and security concerns.

## Capabilities

### HadoopUtils

Main utility class providing static methods for Hadoop integration tasks.

```java { .api }
/**
 * Utility class for working with Hadoop-related classes.
 * Should only be used if Hadoop is on the classpath.
 */
public class HadoopUtils {
    /**
     * HDFS delegation token kind identifier.
     */
    public static final Text HDFS_DELEGATION_TOKEN_KIND;
}
```
### Configuration Management

Methods for converting and managing configurations between Flink and Hadoop.

```java { .api }
/**
 * Gets Hadoop configuration from Flink configuration.
 * Converts Flink configuration properties to Hadoop configuration format.
 *
 * @param flinkConfiguration Flink's configuration object
 * @return Hadoop Configuration with converted properties
 */
public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration);
```

**Usage Examples:**

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.HadoopUtils;

// Create Flink configuration with Hadoop properties
Configuration flinkConfig = new Configuration();
flinkConfig.setString("fs.defaultFS", "hdfs://namenode:9000");
flinkConfig.setString("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
flinkConfig.setString("dfs.replication", "3");
flinkConfig.setString("dfs.blocksize", "134217728"); // 128MB

// Convert to Hadoop configuration
org.apache.hadoop.conf.Configuration hadoopConfig =
        HadoopUtils.getHadoopConfiguration(flinkConfig);

// Hadoop config now contains the converted properties
System.out.println("HDFS default FS: " + hadoopConfig.get("fs.defaultFS"));
System.out.println("Block size: " + hadoopConfig.get("dfs.blocksize"));

// Use with Hadoop FileSystem
org.apache.hadoop.fs.FileSystem hadoopFs =
        org.apache.hadoop.fs.FileSystem.get(hadoopConfig);
```
### Kerberos Security Management

Methods for handling Kerberos authentication and security validation.

```java { .api }
/**
 * Checks if Kerberos security is enabled for the user.
 *
 * @param ugi UserGroupInformation to check
 * @return true if Kerberos security is enabled
 */
public static boolean isKerberosSecurityEnabled(UserGroupInformation ugi);

/**
 * Validates if Kerberos credentials are valid and not expired.
 *
 * @param ugi UserGroupInformation to validate
 * @param useTicketCache whether to use ticket cache for validation
 * @return true if credentials are valid
 */
public static boolean areKerberosCredentialsValid(UserGroupInformation ugi, boolean useTicketCache);

/**
 * Checks if the user has HDFS delegation tokens.
 *
 * @param ugi UserGroupInformation to check
 * @return true if HDFS delegation tokens are present
 */
public static boolean hasHDFSDelegationToken(UserGroupInformation ugi);
```

**Usage Examples:**

```java
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.flink.runtime.util.HadoopUtils;

// Get current user information
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();

// Check security configuration
if (HadoopUtils.isKerberosSecurityEnabled(ugi)) {
    System.out.println("Kerberos security is enabled");

    // Validate credentials
    boolean validCredentials = HadoopUtils.areKerberosCredentialsValid(ugi, true);
    if (validCredentials) {
        System.out.println("Kerberos credentials are valid");
    } else {
        System.err.println("Kerberos credentials are invalid or expired");
        // Handle credential renewal
    }

    // Check for delegation tokens
    if (HadoopUtils.hasHDFSDelegationToken(ugi)) {
        System.out.println("HDFS delegation tokens available");
    } else {
        System.out.println("No HDFS delegation tokens found");
    }
} else {
    System.out.println("Simple authentication (no Kerberos)");
}

// Login with keytab (if needed)
if (!HadoopUtils.areKerberosCredentialsValid(ugi, false)) {
    UserGroupInformation.loginUserFromKeytab("user@REALM", "/path/to/user.keytab");
    ugi = UserGroupInformation.getCurrentUser();
}
```
129
130
### Version Compatibility Checks
131
132
Methods for checking Hadoop version compatibility.
133
134
```java { .api }
135
/**
136
* Checks if the current Hadoop version meets the minimum required version.
137
* @param major minimum major version required
138
* @param minor minimum minor version required
139
* @return true if current version meets minimum requirements
140
* @throws FlinkRuntimeException if version information cannot be determined
141
*/
142
public static boolean isMinHadoopVersion(int major, int minor) throws FlinkRuntimeException;
143
144
/**
145
* Checks if the current Hadoop version is at most the specified maximum version.
146
* @param major maximum major version allowed
147
* @param minor maximum minor version allowed
148
* @return true if current version is within maximum limits
149
* @throws FlinkRuntimeException if version information cannot be determined
150
*/
151
public static boolean isMaxHadoopVersion(int major, int minor) throws FlinkRuntimeException;
152
```
153
154
**Usage Examples:**
155
156
```java
157
import org.apache.flink.util.FlinkRuntimeException;
158
import org.apache.flink.runtime.util.HadoopUtils;
159
160
try {
161
// Check minimum version requirements
162
if (HadoopUtils.isMinHadoopVersion(2, 6)) {
163
System.out.println("Hadoop version is 2.6 or higher");
164
} else {
165
System.err.println("Hadoop version is below 2.6, some features may not work");
166
}
167
168
// Check maximum version constraints
169
if (HadoopUtils.isMaxHadoopVersion(3, 2)) {
170
System.out.println("Hadoop version is 3.2 or lower");
171
} else {
172
System.out.println("Hadoop version is above 3.2, compatibility not guaranteed");
173
}
174
175
// Version range check
176
if (HadoopUtils.isMinHadoopVersion(2, 7) && HadoopUtils.isMaxHadoopVersion(3, 1)) {
177
System.out.println("Hadoop version is in supported range (2.7 - 3.1)");
178
// Enable version-specific features
179
}
180
181
} catch (FlinkRuntimeException e) {
182
System.err.println("Cannot determine Hadoop version: " + e.getMessage());
183
}
184
```
185
186
### HadoopConfigLoader
187
188
Advanced configuration loader that provides lazy loading and caching of Hadoop configurations.
189
190
```java { .api }
191
/**
192
* Lazily loads Hadoop configuration from resettable Flink's configuration.
193
* Provides efficient configuration management with caching and reset capabilities.
194
*/
195
public class HadoopConfigLoader {
196
/**
197
* Creates a configuration loader with specified parameters.
198
* @param flinkConfigPrefixes prefixes for Flink configuration keys
199
* @param mirroredConfigKeys configuration keys to mirror between systems
200
* @param hadoopConfigPrefix prefix for Hadoop configuration
201
* @param packagePrefixesToShade package prefixes to shade
202
* @param configKeysToShade configuration keys to shade
203
* @param flinkShadingPrefix Flink shading prefix
204
*/
205
public HadoopConfigLoader(String[] flinkConfigPrefixes,
206
String[][] mirroredConfigKeys,
207
String hadoopConfigPrefix,
208
Set<String> packagePrefixesToShade,
209
Set<String> configKeysToShade,
210
String flinkShadingPrefix);
211
212
/**
213
* Sets the Flink configuration and triggers reload on next access.
214
* @param config new Flink configuration
215
*/
216
public void setFlinkConfig(Configuration config);
217
218
/**
219
* Gets the current Hadoop configuration, loading it if necessary.
220
* @return loaded Hadoop Configuration object
221
*/
222
public org.apache.hadoop.conf.Configuration getOrLoadHadoopConfig();
223
}
224
```
225
226
**Usage Examples:**
227
228
```java
229
import org.apache.flink.runtime.util.HadoopConfigLoader;
230
import java.util.Set;
231
import java.util.HashSet;
232
233
// Create configuration loader
234
String[] flinkPrefixes = {"flink.hadoop."};
235
String[][] mirroredKeys = {{"fs.defaultFS", "fs.default.name"}};
236
Set<String> shadedPackages = new HashSet<>();
237
shadedPackages.add("org.apache.hadoop");
238
239
HadoopConfigLoader loader = new HadoopConfigLoader(
240
flinkPrefixes,
241
mirroredKeys,
242
"hadoop.",
243
shadedPackages,
244
new HashSet<>(),
245
"org.apache.flink.shaded."
246
);
247
248
// Set Flink configuration
249
Configuration flinkConfig = new Configuration();
250
flinkConfig.setString("flink.hadoop.fs.defaultFS", "hdfs://namenode:9000");
251
loader.setFlinkConfig(flinkConfig);
252
253
// Load Hadoop configuration (cached after first load)
254
org.apache.hadoop.conf.Configuration hadoopConfig = loader.getOrLoadHadoopConfig();
255
256
// Configuration is cached until setFlinkConfig is called again
257
org.apache.hadoop.conf.Configuration sameConfig = loader.getOrLoadHadoopConfig();
258
// Returns cached instance
259
```
260
261
### Security Token Management
262
263
Advanced token management for secure Hadoop clusters.
264
265
```java
266
import org.apache.hadoop.security.UserGroupInformation;
267
import org.apache.hadoop.security.token.Token;
268
import org.apache.hadoop.security.token.TokenIdentifier;
269
270
// Check and manage delegation tokens
271
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
272
273
if (HadoopUtils.hasHDFSDelegationToken(ugi)) {
274
// Get all tokens
275
Collection<Token<? extends TokenIdentifier>> tokens = ugi.getTokens();
276
277
for (Token<? extends TokenIdentifier> token : tokens) {
278
if (token.getKind().equals(HadoopUtils.HDFS_DELEGATION_TOKEN_KIND)) {
279
System.out.println("HDFS Token: " + token.toString());
280
System.out.println("Token service: " + token.getService());
281
282
// Check token expiration
283
long expirationTime = token.decodeIdentifier().getMaxDate();
284
if (System.currentTimeMillis() > expirationTime) {
285
System.out.println("Token is expired, renewal needed");
286
}
287
}
288
}
289
}
290
```
### Configuration Integration Example

Complete example showing integration between Flink and Hadoop configurations:

```java
public class HadoopIntegrationExample {

    public void setupHadoopIntegration() {
        // Create Flink configuration with Hadoop properties
        Configuration flinkConfig = new Configuration();

        // Basic HDFS configuration
        flinkConfig.setString("fs.defaultFS", "hdfs://ha-namenode:9000");
        flinkConfig.setString("dfs.nameservices", "mycluster");
        flinkConfig.setString("dfs.ha.namenodes.mycluster", "nn1,nn2");
        flinkConfig.setString("dfs.namenode.rpc-address.mycluster.nn1", "namenode1:8020");
        flinkConfig.setString("dfs.namenode.rpc-address.mycluster.nn2", "namenode2:8020");

        // Security configuration
        flinkConfig.setString("hadoop.security.authentication", "kerberos");
        flinkConfig.setString("hadoop.security.authorization", "true");

        // Performance tuning
        flinkConfig.setString("dfs.client.read.shortcircuit", "true");
        flinkConfig.setString("dfs.client.cache.drop.behind.writes", "true");

        // Convert to Hadoop configuration
        org.apache.hadoop.conf.Configuration hadoopConfig =
                HadoopUtils.getHadoopConfiguration(flinkConfig);

        // Validate setup
        try {
            if (HadoopUtils.isMinHadoopVersion(2, 6)) {
                System.out.println("Hadoop version compatible");

                // Check security
                UserGroupInformation.setConfiguration(hadoopConfig);
                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();

                if (HadoopUtils.isKerberosSecurityEnabled(ugi)) {
                    if (HadoopUtils.areKerberosCredentialsValid(ugi, true)) {
                        System.out.println("Security setup valid");
                    } else {
                        System.err.println("Invalid Kerberos credentials");
                    }
                }

                // Create file system
                org.apache.hadoop.fs.FileSystem fs =
                        org.apache.hadoop.fs.FileSystem.get(hadoopConfig);
                System.out.println("Successfully connected to: " + fs.getUri());

            } else {
                throw new RuntimeException("Unsupported Hadoop version");
            }

        } catch (Exception e) {
            System.err.println("Hadoop integration failed: " + e.getMessage());
        }
    }
}
```
## Types

```java { .api }
// Hadoop configuration classes
public class Configuration {
    public String get(String name);
    public void set(String name, String value);
    public boolean getBoolean(String name, boolean defaultValue);
    public int getInt(String name, int defaultValue);
}

// Security classes
public abstract class UserGroupInformation {
    public static UserGroupInformation getCurrentUser() throws IOException;
    public static void loginUserFromKeytab(String user, String path) throws IOException;
    public Collection<Token<? extends TokenIdentifier>> getTokens();
    public boolean hasKerberosCredentials();
}

// Token classes
public class Token<T extends TokenIdentifier> {
    public Text getKind();
    public Text getService();
    public T decodeIdentifier() throws IOException;
}

public class Text {
    public Text(String string);
    public String toString();
    public boolean equals(Object o);
}

// Exception types
public class FlinkRuntimeException extends RuntimeException {
    public FlinkRuntimeException(String message);
    public FlinkRuntimeException(String message, Throwable cause);
}
```