# Hadoop Input Formats

Comprehensive integration for reading data from Hadoop InputFormats in Flink applications. Both the legacy `mapred` API and the newer `mapreduce` API are supported, and key-value pairs are automatically converted to Flink `Tuple2` objects.

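The typical wiring looks like this: the wrapped Hadoop format is passed to `ExecutionEnvironment.createInput`, which yields a `DataSet` of `Tuple2` records. A minimal sketch (the input path is a placeholder, and the `flink-hadoop-compatibility` dependency is assumed to be on the classpath):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;

public class HadoopInputExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Wrap a Hadoop TextInputFormat and hand it to Flink as a regular InputFormat.
        // Each record arrives as a Tuple2: byte offset (key) and line contents (value).
        DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(
            HadoopInputs.readHadoopFile(
                new TextInputFormat(),
                LongWritable.class,
                Text.class,
                "hdfs:///data/input.txt"  // hypothetical path
            )
        );

        lines.print();
    }
}
```

From here the data set behaves like any other Flink `DataSet`, so the usual transformations (`map`, `filter`, `groupBy`, …) apply directly.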
## Capabilities

### File Input Format Reading (mapred API)

Creates Flink InputFormat wrappers for Hadoop FileInputFormats using the legacy mapred API.

```java { .api }
/**
 * Creates a Flink InputFormat that wraps the given Hadoop FileInputFormat (mapred API)
 * @param mapredInputFormat The Hadoop FileInputFormat to wrap
 * @param key The class of the key type
 * @param value The class of the value type
 * @param inputPath The path to read input data from
 * @param job JobConf configuration for the Hadoop job
 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat
 */
public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
    Class<K> key,
    Class<V> value,
    String inputPath,
    JobConf job
);

/**
 * Creates a Flink InputFormat that wraps the given Hadoop FileInputFormat (mapred API) with a default JobConf
 * @param mapredInputFormat The Hadoop FileInputFormat to wrap
 * @param key The class of the key type
 * @param value The class of the value type
 * @param inputPath The path to read input data from
 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat
 */
public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
    Class<K> key,
    Class<V> value,
    String inputPath
);
```

**Usage Example:**

```java
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Reading text files using TextInputFormat
HadoopInputFormat<LongWritable, Text> textInput =
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs:///data/input.txt"
    );

// With a custom JobConf
JobConf jobConf = new JobConf();
jobConf.set("mapreduce.input.fileinputformat.split.minsize", "1048576");

HadoopInputFormat<LongWritable, Text> configuredInput =
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs:///data/input.txt",
        jobConf
    );
```

### File Input Format Reading (mapreduce API)

Creates Flink InputFormat wrappers for Hadoop FileInputFormats using the newer mapreduce API.

```java { .api }
/**
 * Creates a Flink InputFormat that wraps the given Hadoop FileInputFormat (mapreduce API)
 * @param mapreduceInputFormat The Hadoop FileInputFormat to wrap
 * @param key The class of the key type
 * @param value The class of the value type
 * @param inputPath The path to read input data from
 * @param job Job configuration for the Hadoop job
 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat
 * @throws IOException if the Job configuration cannot be processed
 */
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
    Class<K> key,
    Class<V> value,
    String inputPath,
    Job job
) throws IOException;

/**
 * Creates a Flink InputFormat that wraps the given Hadoop FileInputFormat (mapreduce API) with a default Job
 * @param mapreduceInputFormat The Hadoop FileInputFormat to wrap
 * @param key The class of the key type
 * @param value The class of the value type
 * @param inputPath The path to read input data from
 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat
 * @throws IOException if the Job configuration cannot be created
 */
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
    Class<K> key,
    Class<V> value,
    String inputPath
) throws IOException;
```

**Usage Example:**

```java
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Reading text files using the mapreduce API
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<LongWritable, Text> mapreduceInput =
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs:///data/input.txt"
    );

// With a custom Job configuration
Job job = Job.getInstance();
job.getConfiguration().set("textinputformat.record.delimiter", "\n");

org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<LongWritable, Text> configuredMapreduceInput =
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs:///data/input.txt",
        job
    );
```

### Sequence File Reading

Specialized method for reading Hadoop sequence files with automatic format configuration.

```java { .api }
/**
 * Creates a Flink InputFormat to read a Hadoop sequence file for the given key and value classes
 * @param key The class of the key type
 * @param value The class of the value type
 * @param inputPath The path to the sequence file
 * @return A Flink InputFormat that wraps a Hadoop SequenceFileInputFormat
 * @throws IOException if the sequence file cannot be accessed
 */
public static <K, V> HadoopInputFormat<K, V> readSequenceFile(
    Class<K> key,
    Class<V> value,
    String inputPath
) throws IOException;
```

**Usage Example:**

```java
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Reading sequence files
HadoopInputFormat<IntWritable, Text> sequenceInput =
    HadoopInputs.readSequenceFile(
        IntWritable.class,
        Text.class,
        "hdfs:///data/sequence.seq"
    );
```

### Generic Input Format Creation (mapred API)

Creates Flink InputFormat wrappers for any Hadoop InputFormat using the mapred API.

```java { .api }
/**
 * Creates a Flink InputFormat that wraps the given Hadoop InputFormat (mapred API)
 * @param mapredInputFormat The Hadoop InputFormat to wrap
 * @param key The class of the key type
 * @param value The class of the value type
 * @param job JobConf configuration for the Hadoop job
 * @return A Flink InputFormat that wraps the Hadoop InputFormat
 */
public static <K, V> HadoopInputFormat<K, V> createHadoopInput(
    org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
    Class<K> key,
    Class<V> value,
    JobConf job
);
```

**Usage Example:**

```java
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.Text;

// Using KeyValueTextInputFormat
JobConf jobConf = new JobConf();
jobConf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");

HadoopInputFormat<Text, Text> keyValueInput =
    HadoopInputs.createHadoopInput(
        new KeyValueTextInputFormat(),
        Text.class,
        Text.class,
        jobConf
    );
```

### Generic Input Format Creation (mapreduce API)

Creates Flink InputFormat wrappers for any Hadoop InputFormat using the mapreduce API.

```java { .api }
/**
 * Creates a Flink InputFormat that wraps the given Hadoop InputFormat (mapreduce API)
 * @param mapreduceInputFormat The Hadoop InputFormat to wrap
 * @param key The class of the key type
 * @param value The class of the value type
 * @param job Job configuration for the Hadoop job
 * @return A Flink InputFormat that wraps the Hadoop InputFormat
 */
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> createHadoopInput(
    org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
    Class<K> key,
    Class<V> value,
    Job job
);
```

**Usage Example:**

```java
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.Text;

// Using the mapreduce KeyValueTextInputFormat
Job job = Job.getInstance();
job.getConfiguration().set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");

org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<Text, Text> mapreduceKeyValueInput =
    HadoopInputs.createHadoopInput(
        new KeyValueTextInputFormat(),
        Text.class,
        Text.class,
        job
    );
```

## Input Format Classes

### HadoopInputFormat (mapred API)

Wrapper class for Hadoop InputFormats using the mapred API.

```java { .api }
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K, V>>
        implements ResultTypeQueryable<Tuple2<K, V>> {

    /**
     * Constructor with full configuration
     * @param mapredInputFormat The Hadoop InputFormat to wrap
     * @param key The class of the key type
     * @param value The class of the value type
     * @param job JobConf configuration
     */
    public HadoopInputFormat(
        org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
        Class<K> key,
        Class<V> value,
        JobConf job
    );

    /**
     * Constructor with a default JobConf
     * @param mapredInputFormat The Hadoop InputFormat to wrap
     * @param key The class of the key type
     * @param value The class of the value type
     */
    public HadoopInputFormat(
        org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
        Class<K> key,
        Class<V> value
    );

    /**
     * Reads the next record from the Hadoop InputFormat
     * @param record Reusable record object
     * @return The next record as a Tuple2
     * @throws IOException if reading fails
     */
    public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException;

    /**
     * Gets type information for the produced Tuple2 type
     * @return TypeInformation for Tuple2<K, V>
     */
    public TypeInformation<Tuple2<K, V>> getProducedType();
}
```

### HadoopInputFormat (mapreduce API)

Wrapper class for Hadoop InputFormats using the mapreduce API.

```java { .api }
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K, V>>
        implements ResultTypeQueryable<Tuple2<K, V>> {

    /**
     * Constructor with full configuration
     * @param mapreduceInputFormat The Hadoop InputFormat to wrap
     * @param key The class of the key type
     * @param value The class of the value type
     * @param job Job configuration
     */
    public HadoopInputFormat(
        org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
        Class<K> key,
        Class<V> value,
        Job job
    );

    /**
     * Constructor with a default Job configuration
     * @param mapreduceInputFormat The Hadoop InputFormat to wrap
     * @param key The class of the key type
     * @param value The class of the value type
     * @throws IOException if the Job configuration cannot be created
     */
    public HadoopInputFormat(
        org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
        Class<K> key,
        Class<V> value
    ) throws IOException;

    /**
     * Reads the next record from the Hadoop InputFormat
     * @param record Reusable record object
     * @return The next record as a Tuple2
     * @throws IOException if reading fails
     */
    public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException;

    /**
     * Gets type information for the produced Tuple2 type
     * @return TypeInformation for Tuple2<K, V>
     */
    public TypeInformation<Tuple2<K, V>> getProducedType();
}
```

## Key Design Patterns

### Type Safety
All InputFormat wrappers are generically typed with `<K, V>` parameters, ensuring compile-time type safety and proper type inference in Flink applications.

### Tuple2 Convention
All input formats produce `Tuple2<K, V>` objects where:
- `f0` contains the key of type `K`
- `f1` contains the value of type `V`

### Configuration Flexibility
Both `JobConf` (mapred) and `Job` (mapreduce) configuration objects are supported, with convenience overloads that supply default configurations when none is specified.

### Exception Handling
`IOException` is thrown for I/O operations, maintaining consistency with Hadoop's exception-handling patterns.
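
The `Tuple2` convention can be illustrated on a record built by hand (a minimal sketch; in a real job these tuples are produced by the wrapped input formats rather than constructed manually):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class Tuple2ConventionSketch {
    public static void main(String[] args) {
        // A record as a wrapped input format would emit it:
        // f0 holds the Hadoop key, f1 holds the Hadoop value.
        Tuple2<LongWritable, Text> record =
            new Tuple2<>(new LongWritable(0L), new Text("first line"));

        long key = record.f0.get();          // the key, type K = LongWritable
        String value = record.f1.toString(); // the value, type V = Text

        System.out.println(key + "\t" + value);
    }
}
```

Because the fields are plainly `f0` and `f1`, downstream Flink operators can use positional key selectors such as `groupBy(0)` on these records without any extra mapping step.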