# Input Format Integration

The Input Format Integration capability provides comprehensive support for using Hadoop InputFormats within Flink applications. This enables reading data from a variety of Hadoop-compatible sources, including HDFS files, HBase tables, and custom data sources.

## Overview

Flink's Hadoop compatibility layer wraps Hadoop InputFormats so that they work seamlessly with Flink's DataSet API. The integration supports both the legacy MapRed API and the modern MapReduce API, automatically converting Hadoop key-value pairs to Flink Tuple2 objects or Scala tuples.
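As a concrete illustration of the key-value conversion, the fragment below (a sketch; `env` is assumed to be an `ExecutionEnvironment`, and the path is a placeholder) reads lines with the legacy MapRed `TextInputFormat` and keeps only the `Text` value of each pair:

```java
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;

// Each Hadoop (key, value) pair arrives as a Flink Tuple2<LongWritable, Text>:
// f0 is the byte offset of the line, f1 the line itself
DataSet<Tuple2<LongWritable, Text>> pairs = env.createInput(
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://namenode:port/path/to/textfiles"
    )
);

// Drop the offset key and keep the line contents
DataSet<String> lines = pairs.map(t -> t.f1.toString());
```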
## HadoopInputs Utility Class (Java)

The primary entry point for creating Hadoop InputFormat wrappers in Java.

### MapRed API Methods
```java { .api }
// Read a Hadoop FileInputFormat with a custom JobConf
public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
        org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
        Class<K> key,
        Class<V> value,
        String inputPath,
        JobConf job);

// Read a Hadoop FileInputFormat with a default JobConf
public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
        org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
        Class<K> key,
        Class<V> value,
        String inputPath);

// Read a Hadoop SequenceFile
public static <K, V> HadoopInputFormat<K, V> readSequenceFile(
        Class<K> key,
        Class<V> value,
        String inputPath) throws IOException;

// Create a wrapper for any Hadoop InputFormat
public static <K, V> HadoopInputFormat<K, V> createHadoopInput(
        org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
        Class<K> key,
        Class<V> value,
        JobConf job);
```
### MapReduce API Methods
```java { .api }
// Read a Hadoop FileInputFormat with a custom Job
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
        org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
        Class<K> key,
        Class<V> value,
        String inputPath,
        Job job) throws IOException;

// Read a Hadoop FileInputFormat with a default Job
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
        org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
        Class<K> key,
        Class<V> value,
        String inputPath) throws IOException;

// Create a wrapper for any MapReduce InputFormat
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> createHadoopInput(
        org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
        Class<K> key,
        Class<V> value,
        Job job);
```
## HadoopInputFormat Classes

### MapRed HadoopInputFormat
```java { .api }
@Public
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K, V>>
        implements ResultTypeQueryable<Tuple2<K, V>> {

    // Constructor with JobConf
    public HadoopInputFormat(
            org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
            Class<K> key,
            Class<V> value,
            JobConf job);

    // Constructor with default JobConf
    public HadoopInputFormat(
            org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
            Class<K> key,
            Class<V> value);

    // Read the next record from the input
    public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException;

    // Get type information for the produced tuples
    public TypeInformation<Tuple2<K, V>> getProducedType();
}
```
### MapReduce HadoopInputFormat
```java { .api }
@Public
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K, V>>
        implements ResultTypeQueryable<Tuple2<K, V>> {

    // Constructor with Job
    public HadoopInputFormat(
            org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
            Class<K> key,
            Class<V> value,
            Job job);

    // Constructor with default Job
    public HadoopInputFormat(
            org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
            Class<K> key,
            Class<V> value) throws IOException;

    // Read the next record from the input
    public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException;

    // Get type information for the produced tuples
    public TypeInformation<Tuple2<K, V>> getProducedType();
}
```
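Although `HadoopInputs` is the usual entry point, the wrapper classes above can also be instantiated directly. A sketch for the MapReduce API (assumes `env` is an `ExecutionEnvironment`; the path is a placeholder):

```java
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Configure the input path on the Job, then wrap the InputFormat directly
Job job = Job.getInstance();
FileInputFormat.addInputPath(job, new Path("hdfs://namenode:port/path/to/textfiles"));

HadoopInputFormat<LongWritable, Text> format =
    new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, job);

DataSet<Tuple2<LongWritable, Text>> data = env.createInput(format);
```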
## Scala Input Formats

### Scala HadoopInputs Object
```scala { .api }
object HadoopInputs {

  // MapRed API methods
  def readHadoopFile[K, V](
      mapredInputFormat: MapredFileInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      inputPath: String,
      job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V]

  def readHadoopFile[K, V](
      mapredInputFormat: MapredFileInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V]

  def readSequenceFile[K, V](
      key: Class[K],
      value: Class[V],
      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V]

  def createHadoopInput[K, V](
      mapredInputFormat: MapredInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V]

  // MapReduce API methods
  def readHadoopFile[K, V](
      mapreduceInputFormat: MapreduceFileInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      inputPath: String,
      job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V]

  def createHadoopInput[K, V](
      mapreduceInputFormat: MapreduceInputFormat[K, V],
      key: Class[K],
      value: Class[V],
      job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V]
}
```
### Scala HadoopInputFormat Classes
```scala { .api }
// MapRed Scala InputFormat
@Public
class HadoopInputFormat[K, V] extends HadoopInputFormatBase[K, V, (K, V)] {
  def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V], job: JobConf)
  def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V])
  def nextRecord(reuse: (K, V)): (K, V)
}

// MapReduce Scala InputFormat
@Public
class HadoopInputFormat[K, V] extends HadoopInputFormatBase[K, V, (K, V)] {
  def this(mapreduceInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V], job: Job)
  def this(mapreduceInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V])
  def nextRecord(reuse: (K, V)): (K, V)
}
```
## Usage Examples

### Reading Text Files
```java
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Create an input format for text files
DataSet<Tuple2<LongWritable, Text>> textData = env.createInput(
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://namenode:port/path/to/textfiles"
    )
);
```
### Reading Sequence Files
```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Read sequence files with specific key-value types
DataSet<Tuple2<IntWritable, Text>> sequenceData = env.createInput(
    HadoopInputs.readSequenceFile(
        IntWritable.class,
        Text.class,
        "hdfs://namenode:port/path/to/sequence/files"
    )
);
```
### Using Custom InputFormats
```java
import org.apache.hadoop.mapred.JobConf;
import com.example.CustomInputFormat;
import com.example.CustomKey;
import com.example.CustomValue;

// Configure the custom input format
JobConf conf = new JobConf();
conf.setInputFormat(CustomInputFormat.class);
conf.set("custom.property", "value");

// Create a wrapper for the custom InputFormat
DataSet<Tuple2<CustomKey, CustomValue>> customData = env.createInput(
    HadoopInputs.createHadoopInput(
        new CustomInputFormat(),
        CustomKey.class,
        CustomValue.class,
        conf
    )
);
```
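### Using the MapReduce API

The Java examples above use the legacy MapRed API. The MapReduce-API variants documented earlier work the same way, except that they take a `Job` and declare `IOException`. A sketch, using the same placeholder path:

```java
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Read text files through the MapReduce (new) API; this variant
// declares a checked IOException
Job job = Job.getInstance();
DataSet<Tuple2<LongWritable, Text>> textData = env.createInput(
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://namenode:port/path/to/textfiles",
        job
    )
);
```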
### Scala Usage
```scala
import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

// Read text files with Scala
val textData: DataSet[(LongWritable, Text)] = env.createInput(
  HadoopInputs.readHadoopFile(
    new TextInputFormat(),
    classOf[LongWritable],
    classOf[Text],
    "hdfs://namenode:port/path/to/textfiles"
  )
)

// Extract just the text content
val lines = textData.map(_._2.toString)
```
## Input Split Handling

The Hadoop compatibility layer automatically handles input split distribution across Flink's parallel execution environment.
```java { .api }
// Input split wrapper classes, used internally by HadoopInputFormat;
// one version exists for the MapRed API and one for the MapReduce API,
// in their respective wrapper packages

@PublicEvolving
public class HadoopInputSplit {
    // MapRed input split wrapper
}

@PublicEvolving
public class HadoopInputSplit {
    // MapReduce input split wrapper
}
```
## Error Handling

Input format operations may throw the following exceptions:

- `IOException` - when reading from the input fails or the configuration is invalid
- `ClassNotFoundException` - when the specified key/value classes cannot be found
- `IllegalArgumentException` - when invalid parameters are provided
- `RuntimeException` - for various Hadoop-related runtime errors

Always handle these exceptions appropriately in your Flink programs:
```java
try {
    // readSequenceFile declares IOException (see the API listing above);
    // the MapRed readHadoopFile variants do not, so catching IOException
    // around them would not compile
    DataSet<Tuple2<IntWritable, Text>> input = env.createInput(
        HadoopInputs.readSequenceFile(
            IntWritable.class,
            Text.class,
            inputPath
        )
    );
} catch (IOException e) {
    // Handle input/output errors, keeping the stack trace
    logger.error("Failed to create Hadoop input", e);
}
```