# Input and Output Formats

Wrapper classes that enable Hadoop InputFormats and OutputFormats to work seamlessly with Flink DataSets. The library supports both the legacy MapRed API (`org.apache.hadoop.mapred`) and the modern MapReduce API (`org.apache.hadoop.mapreduce`).

## Capabilities

### HadoopInputs Factory Class

Central factory class providing convenient methods to create Flink InputFormat wrappers for various Hadoop InputFormat types.

```java { .api }
/**
 * Utility class to use Apache Hadoop InputFormats with Apache Flink
 */
public class HadoopInputs {

    /**
     * Creates a Flink InputFormat wrapper for Hadoop mapred FileInputFormat with JobConf
     * @param mapredInputFormat The Hadoop FileInputFormat to wrap
     * @param key The key class type
     * @param value The value class type
     * @param inputPath Path to input files
     * @param job JobConf for Hadoop configuration
     * @return HadoopInputFormat wrapper for use with Flink
     */
    public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
            org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
            Class<K> key, Class<V> value, String inputPath, JobConf job);

    /**
     * Creates a Flink InputFormat wrapper for Hadoop mapred FileInputFormat with default JobConf
     * @param mapredInputFormat The Hadoop FileInputFormat to wrap
     * @param key The key class type
     * @param value The value class type
     * @param inputPath Path to input files
     * @return HadoopInputFormat wrapper for use with Flink
     */
    public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
            org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
            Class<K> key, Class<V> value, String inputPath);

    /**
     * Creates a Flink InputFormat for reading Hadoop sequence files
     * @param key The key class type
     * @param value The value class type
     * @param inputPath Path to sequence files
     * @return HadoopInputFormat wrapper for reading sequence files
     * @throws IOException if sequence file access fails
     */
    public static <K, V> HadoopInputFormat<K, V> readSequenceFile(
            Class<K> key, Class<V> value, String inputPath) throws IOException;

    /**
     * Creates a Flink InputFormat wrapper for any Hadoop mapred InputFormat
     * @param mapredInputFormat The Hadoop InputFormat to wrap
     * @param key The key class type
     * @param value The value class type
     * @param job JobConf for Hadoop configuration
     * @return HadoopInputFormat wrapper for use with Flink
     */
    public static <K, V> HadoopInputFormat<K, V> createHadoopInput(
            org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
            Class<K> key, Class<V> value, JobConf job);

    /**
     * Creates a Flink InputFormat wrapper for Hadoop mapreduce FileInputFormat with Job
     * @param mapreduceInputFormat The Hadoop FileInputFormat to wrap
     * @param key The key class type
     * @param value The value class type
     * @param inputPath Path to input files
     * @param job Job for Hadoop configuration
     * @return HadoopInputFormat wrapper for use with Flink
     * @throws IOException if file system access fails
     */
    public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
            org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
            Class<K> key, Class<V> value, String inputPath, Job job) throws IOException;

    /**
     * Creates a Flink InputFormat wrapper for Hadoop mapreduce FileInputFormat with default Job
     * @param mapreduceInputFormat The Hadoop FileInputFormat to wrap
     * @param key The key class type
     * @param value The value class type
     * @param inputPath Path to input files
     * @return HadoopInputFormat wrapper for use with Flink
     * @throws IOException if file system access fails
     */
    public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
            org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
            Class<K> key, Class<V> value, String inputPath) throws IOException;

    /**
     * Creates a Flink InputFormat wrapper for any Hadoop mapreduce InputFormat
     * @param mapreduceInputFormat The Hadoop InputFormat to wrap
     * @param key The key class type
     * @param value The value class type
     * @param job Job for Hadoop configuration
     * @return HadoopInputFormat wrapper for use with Flink
     */
    public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> createHadoopInput(
            org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
            Class<K> key, Class<V> value, Job job);
}
```

**Usage Examples:**

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Read text files using the MapRed API
DataSet<Tuple2<LongWritable, Text>> textData = env.createInput(
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://path/to/input/*.txt"
    )
);

// Read sequence files
DataSet<Tuple2<Text, IntWritable>> sequenceData = env.createInput(
    HadoopInputs.readSequenceFile(
        Text.class,
        IntWritable.class,
        "hdfs://path/to/sequence/files"
    )
);

// Custom InputFormat with configuration
JobConf conf = new JobConf();
conf.set("custom.property", "value");
DataSet<Tuple2<Text, MyWritable>> customData = env.createInput(
    HadoopInputs.createHadoopInput(
        new MyCustomInputFormat(),
        Text.class,
        MyWritable.class,
        conf
    )
);
```

### MapReduce API InputFormat

InputFormat implementation for the modern Hadoop MapReduce API (`org.apache.hadoop.mapreduce`).

```java { .api }
/**
 * InputFormat implementation allowing to use Hadoop (mapreduce) InputFormats with Flink
 * @param <K> The key type
 * @param <V> The value type
 */
@Public
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, org.apache.hadoop.mapreduce.InputFormat<K, V>> {

    /**
     * Creates a HadoopInputFormat wrapper with Job configuration
     * @param mapreduceInputFormat The Hadoop InputFormat to wrap
     * @param key The key class type
     * @param value The value class type
     * @param job Job for Hadoop configuration
     */
    public HadoopInputFormat(
            org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
            Class<K> key, Class<V> value, Job job);

    /**
     * Creates a HadoopInputFormat wrapper with default configuration
     * @param mapreduceInputFormat The Hadoop InputFormat to wrap
     * @param key The key class type
     * @param value The value class type
     * @throws IOException if default Job creation fails
     */
    public HadoopInputFormat(
            org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
            Class<K> key, Class<V> value) throws IOException;
}
```
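
When the `HadoopInputs` factory methods are not flexible enough, the wrapper can also be constructed directly. A minimal sketch using Hadoop's mapreduce `TextInputFormat` (the HDFS path is a placeholder):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Build a Job and register the input path on its configuration.
Job job = Job.getInstance();
FileInputFormat.addInputPath(job, new Path("hdfs://path/to/input")); // placeholder path

// Wrap the mapreduce TextInputFormat directly via the constructor,
// bypassing the HadoopInputs factory.
DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(
    new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, job)
);
```

Because the input path lives in the `Job` configuration rather than in a method argument, the same pattern works for any mapreduce InputFormat whose configuration keys you set on the `Job` beforehand.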

### MapReduce API OutputFormat

OutputFormat implementation for the modern Hadoop MapReduce API (`org.apache.hadoop.mapreduce`).

```java { .api }
/**
 * OutputFormat implementation allowing to use Hadoop (mapreduce) OutputFormats with Flink
 * @param <K> The key type
 * @param <V> The value type
 */
@Public
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, org.apache.hadoop.mapreduce.OutputFormat<K, V>> {

    /**
     * Creates a HadoopOutputFormat wrapper with Job configuration
     * @param mapreduceOutputFormat The Hadoop OutputFormat to wrap
     * @param job Job for Hadoop configuration
     */
    public HadoopOutputFormat(
            org.apache.hadoop.mapreduce.OutputFormat<K, V> mapreduceOutputFormat,
            Job job);
}
```

### MapRed API InputFormat

InputFormat implementation for the legacy Hadoop MapRed API (`org.apache.hadoop.mapred`).

```java { .api }
/**
 * Wrapper for using HadoopInputFormats (mapred-variant) with Flink
 * @param <K> The key type
 * @param <V> The value type
 */
@Public
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, org.apache.hadoop.mapred.InputFormat<K, V>> {

    /**
     * Creates a HadoopInputFormat wrapper with JobConf configuration
     * @param mapredInputFormat The Hadoop InputFormat to wrap
     * @param key The key class type
     * @param value The value class type
     * @param job JobConf for Hadoop configuration
     */
    public HadoopInputFormat(
            org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
            Class<K> key, Class<V> value, JobConf job);

    /**
     * Creates a HadoopInputFormat wrapper with default JobConf
     * @param mapredInputFormat The Hadoop InputFormat to wrap
     * @param key The key class type
     * @param value The value class type
     */
    public HadoopInputFormat(
            org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
            Class<K> key, Class<V> value);
}
```
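
The mapred variant can likewise be constructed directly when a `JobConf` carries the input configuration. A sketch, assuming a text input read via Hadoop's mapred `TextInputFormat` (the HDFS path is a placeholder):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Register the input path on the JobConf before wrapping.
JobConf jobConf = new JobConf();
FileInputFormat.addInputPath(jobConf, new Path("hdfs://path/to/input")); // placeholder path

DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(
    new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, jobConf)
);
```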

### MapRed API OutputFormat

OutputFormat implementation for the legacy Hadoop MapRed API (`org.apache.hadoop.mapred`).

```java { .api }
/**
 * Wrapper for using HadoopOutputFormats (mapred-variant) with Flink
 * @param <K> The key type
 * @param <V> The value type
 */
@Public
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, org.apache.hadoop.mapred.OutputFormat<K, V>> {

    /**
     * Creates a HadoopOutputFormat wrapper with JobConf configuration
     * @param mapredOutputFormat The Hadoop OutputFormat to wrap
     * @param job JobConf for Hadoop configuration
     */
    public HadoopOutputFormat(
            org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat,
            JobConf job);

    /**
     * Creates a HadoopOutputFormat wrapper with custom OutputCommitter and JobConf
     * @param mapredOutputFormat The Hadoop OutputFormat to wrap
     * @param outputCommitterClass Custom OutputCommitter class
     * @param job JobConf for Hadoop configuration
     */
    public HadoopOutputFormat(
            org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat,
            Class<OutputCommitter> outputCommitterClass, JobConf job);
}
```
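
A usage sketch for the mapred variant, writing tab-separated text via Hadoop's mapred `TextOutputFormat` (the output path is a placeholder; `counts` stands in for any `DataSet` of key/value tuples):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;

// Register the output path on the JobConf, then wrap the mapred OutputFormat.
JobConf jobConf = new JobConf();
FileOutputFormat.setOutputPath(jobConf, new Path("hdfs://path/to/output")); // placeholder path

DataSet<Tuple2<Text, IntWritable>> counts = // ... your data
counts.output(new HadoopOutputFormat<>(new TextOutputFormat<Text, IntWritable>(), jobConf));
```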

**Usage Examples:**

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Write to text files using the MapReduce API
Job outputJob = Job.getInstance();
outputJob.getConfiguration().set("mapreduce.output.textoutputformat.separator", "\t");

DataSet<Tuple2<NullWritable, Text>> results = // ... your data
results.output(new HadoopOutputFormat<>(
    new TextOutputFormat<NullWritable, Text>(),
    outputJob
));
```

## Key Design Patterns

### Tuple2 Integration
All data is exposed as Flink `Tuple2<K, V>` objects, where `f0` is the key and `f1` is the value, providing seamless integration with Flink's DataSet API.
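
This means wrapped inputs can be consumed with ordinary DataSet operators by addressing the key and value fields. A sketch (`textData` stands in for a DataSet produced by any of the Hadoop input wrappers):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// textData is a DataSet<Tuple2<LongWritable, Text>> from a Hadoop input wrapper.
DataSet<String> values = textData
    .map(record -> record.f1.toString())  // f1 holds the Hadoop value
    .returns(String.class);               // explicit type hint for the lambda
```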
318
319
### Type Safety
320
All classes are parameterized with key (K) and value (V) types, ensuring compile-time type safety and preventing runtime ClassCastExceptions.
321
322
### Configuration Support
323
Full support for both Hadoop JobConf (MapRed API) and Job (MapReduce API) configurations, allowing fine-grained control over Hadoop behavior.
324
325
### Error Handling
326
Proper IOException propagation for file system and configuration errors, with detailed error messages for troubleshooting integration issues.