# Hadoop Output Formats

Integration for writing data to Hadoop OutputFormats from Flink applications. Supports both the legacy mapred API and the newer mapreduce API, with automatic conversion from Flink Tuple2 objects to Hadoop key-value pairs.

## Capabilities

### Output Format Wrapper (mapred API)

Wrapper class for Hadoop OutputFormats using the legacy mapred API.

```java { .api }
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {

    /**
     * Constructor with basic configuration
     * @param mapredOutputFormat The Hadoop OutputFormat to wrap
     * @param job JobConf configuration for the Hadoop job
     */
    public HadoopOutputFormat(
        org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat,
        JobConf job
    );

    /**
     * Constructor with custom OutputCommitter
     * @param mapredOutputFormat The Hadoop OutputFormat to wrap
     * @param outputCommitterClass Class of the OutputCommitter to use
     * @param job JobConf configuration for the Hadoop job
     */
    public HadoopOutputFormat(
        org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat,
        Class<OutputCommitter> outputCommitterClass,
        JobConf job
    );

    /**
     * Write a record to the Hadoop OutputFormat
     * @param record The record to write as a Tuple2<K, V>
     * @throws IOException if writing fails
     */
    public void writeRecord(Tuple2<K, V> record) throws IOException;
}
```

**Usage Example:**

```java
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.fs.Path;

// Configure JobConf for output
JobConf jobConf = new JobConf();
jobConf.set("mapred.output.dir", "hdfs://output/path");
jobConf.setOutputFormat(TextOutputFormat.class);
jobConf.setOutputKeyClass(LongWritable.class);
jobConf.setOutputValueClass(Text.class);

// Create Hadoop output format wrapper
HadoopOutputFormat<LongWritable, Text> hadoopOutput =
    new HadoopOutputFormat<>(
        new TextOutputFormat<LongWritable, Text>(),
        jobConf
    );

// Use with Flink DataSet
DataSet<Tuple2<LongWritable, Text>> dataset = ...; // your dataset
dataset.output(hadoopOutput);
```

### Output Format Wrapper (mapreduce API)

Wrapper class for Hadoop OutputFormats using the newer mapreduce API.

```java { .api }
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {

    /**
     * Constructor for mapreduce OutputFormat
     * @param mapreduceOutputFormat The Hadoop OutputFormat to wrap
     * @param job Job configuration for the Hadoop job
     */
    public HadoopOutputFormat(
        org.apache.hadoop.mapreduce.OutputFormat<K, V> mapreduceOutputFormat,
        Job job
    );

    /**
     * Write a record to the Hadoop OutputFormat
     * @param record The record to write as a Tuple2<K, V>
     * @throws IOException if writing fails
     */
    public void writeRecord(Tuple2<K, V> record) throws IOException;
}
```

**Usage Example:**

```java
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.fs.Path;

// Configure Job for output
Job job = Job.getInstance();
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
TextOutputFormat.setOutputPath(job, new Path("hdfs://output/path"));

// Create Hadoop output format wrapper
org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat<LongWritable, Text> mapreduceOutput =
    new org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat<>(
        new TextOutputFormat<LongWritable, Text>(),
        job
    );

// Use with Flink DataSet
DataSet<Tuple2<LongWritable, Text>> dataset = ...; // your dataset
dataset.output(mapreduceOutput);
```

## Common Output Format Examples

### Text File Output

Writing plain text files using TextOutputFormat.

**mapred API:**

```java
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

JobConf jobConf = new JobConf();
jobConf.set("mapred.output.dir", "hdfs://output/text");
jobConf.setOutputFormat(TextOutputFormat.class);

HadoopOutputFormat<NullWritable, Text> textOutput =
    new HadoopOutputFormat<>(
        new TextOutputFormat<NullWritable, Text>(),
        jobConf
    );

// Convert strings to Tuple2<NullWritable, Text>
// NOTE(review): Java lambdas erase the Tuple2 type parameters, so Flink may
// require an explicit type hint (e.g. .returns(...)) on this map — verify
// against your Flink version.
DataSet<String> stringDataset = ...; // your string dataset
DataSet<Tuple2<NullWritable, Text>> tuples = stringDataset.map(
    s -> new Tuple2<>(NullWritable.get(), new Text(s))
);
tuples.output(textOutput);
```

**mapreduce API:**

```java
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

Job job = Job.getInstance();
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path("hdfs://output/text"));

org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat<NullWritable, Text> textOutput =
    new org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat<>(
        new TextOutputFormat<NullWritable, Text>(),
        job
    );
```

### Sequence File Output

Writing Hadoop sequence files with key-value pairs.

**mapred API:**

```java
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

JobConf jobConf = new JobConf();
jobConf.set("mapred.output.dir", "hdfs://output/sequence");
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
jobConf.setOutputKeyClass(IntWritable.class);
jobConf.setOutputValueClass(Text.class);

HadoopOutputFormat<IntWritable, Text> sequenceOutput =
    new HadoopOutputFormat<>(
        new SequenceFileOutputFormat<IntWritable, Text>(),
        jobConf
    );

DataSet<Tuple2<IntWritable, Text>> keyValuePairs = ...; // your dataset
keyValuePairs.output(sequenceOutput);
```

### Multiple Output Files

Using MultipleTextOutputFormat to write to multiple files based on keys.

```java
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.Text;

// Custom MultipleTextOutputFormat that partitions by the first character of the key.
// NOTE(review): substring(0, 1) throws on an empty key — guard if empty keys can occur.
public static class KeyPartitionedOutput extends MultipleTextOutputFormat<Text, Text> {
    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, String name) {
        return key.toString().substring(0, 1) + "/" + name;
    }
}

JobConf jobConf = new JobConf();
jobConf.set("mapred.output.dir", "hdfs://output/partitioned");
jobConf.setOutputFormat(KeyPartitionedOutput.class);

HadoopOutputFormat<Text, Text> partitionedOutput =
    new HadoopOutputFormat<>(
        new KeyPartitionedOutput(),
        jobConf
    );
```

## Configuration Patterns

### Basic Configuration

Standard configuration for common output scenarios.

```java
// mapred API configuration
JobConf jobConf = new JobConf();
jobConf.set("mapred.output.dir", outputPath);
jobConf.setOutputFormat(outputFormatClass);
jobConf.setOutputKeyClass(keyClass);
jobConf.setOutputValueClass(valueClass);

// mapreduce API configuration
Job job = Job.getInstance();
job.setOutputFormatClass(outputFormatClass);
job.setOutputKeyClass(keyClass);
job.setOutputValueClass(valueClass);
// setOutputPath is a static method on FileOutputFormat (and its subclasses),
// not on the abstract OutputFormat base class.
FileOutputFormat.setOutputPath(job, new Path(outputPath));
```

### Compression Configuration

Enabling compression for output files.

```java
// Enable compression in JobConf (mapred API)
JobConf jobConf = new JobConf();
jobConf.setBoolean("mapred.output.compress", true);
jobConf.setClass("mapred.output.compression.codec",
    GzipCodec.class, CompressionCodec.class);
jobConf.set("mapred.output.compression.type", "BLOCK");

// Enable compression in Job (mapreduce API)
Job job = Job.getInstance();
job.getConfiguration().setBoolean("mapreduce.output.fileoutputformat.compress", true);
job.getConfiguration().setClass("mapreduce.output.fileoutputformat.compress.codec",
    GzipCodec.class, CompressionCodec.class);
```

### Custom Output Committer

Using a custom OutputCommitter for advanced output coordination.

```java
import org.apache.hadoop.mapred.FileOutputCommitter;

// Custom committer that runs additional logic after the job's files are committed
public class CustomOutputCommitter extends FileOutputCommitter {
    @Override
    public void commitJob(JobContext context) throws IOException {
        super.commitJob(context);
        // Custom logic after job completion
    }
}

HadoopOutputFormat<K, V> outputFormat =
    new HadoopOutputFormat<>(
        hadoopOutputFormat,
        CustomOutputCommitter.class,
        jobConf
    );
```

## Key Design Patterns

### Tuple2 Input Convention
All output formats consume `Tuple2<K, V>` objects where:
- `f0` contains the key to be written
- `f1` contains the value to be written

### Configuration Flexibility
Both JobConf (mapred) and Job (mapreduce) configuration objects are supported, allowing reuse of existing Hadoop configuration patterns.

### OutputCommitter Support
Custom OutputCommitter classes can be specified for advanced output coordination and cleanup operations.

### Exception Handling
IOException is thrown from write operations, maintaining consistency with Hadoop's exception-handling conventions.