# Output Format Integration

The Output Format Integration capability enables writing Flink DataSets to Hadoop OutputFormats, providing seamless integration with Hadoop ecosystem storage systems and custom output processing pipelines.

## Overview

Flink's Hadoop compatibility layer wraps Hadoop OutputFormats to accept data from Flink DataSets. The integration supports both legacy MapRed API and modern MapReduce API, automatically converting Flink Tuple2 objects or Scala tuples to Hadoop key-value pairs.

## HadoopOutputFormat Classes

### MapRed HadoopOutputFormat (Java)

```java { .api }
@Public
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {

    // Constructor with JobConf
    public HadoopOutputFormat(
            org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat,
            JobConf job);

    // Constructor with OutputCommitter and JobConf
    public HadoopOutputFormat(
            org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat,
            Class<OutputCommitter> outputCommitterClass,
            JobConf job);

    // Write a record to the output
    public void writeRecord(Tuple2<K, V> record) throws IOException;
}
```

### MapReduce HadoopOutputFormat (Java)

```java { .api }
@Public
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {

    // Constructor with Job
    public HadoopOutputFormat(
            org.apache.hadoop.mapreduce.OutputFormat<K, V> mapreduceOutputFormat,
            Job job);

    // Write a record to the output
    public void writeRecord(Tuple2<K, V> record) throws IOException;
}
```

### MapRed HadoopOutputFormat (Scala)

```scala { .api }
@Public
class HadoopOutputFormat[K, V] extends HadoopOutputFormatBase[K, V, (K, V)] {

  // Constructor with JobConf
  def this(mapredOutputFormat: OutputFormat[K, V], job: JobConf);

  // Constructor with OutputCommitter and JobConf
  def this(
      mapredOutputFormat: OutputFormat[K, V],
      outputCommitterClass: Class[OutputCommitter],
      job: JobConf);

  // Write a record to the output
  def writeRecord(record: (K, V)): Unit;
}
```

## Usage Examples

### Writing Text Files (Java)

```java
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;

// Configure output
JobConf conf = new JobConf();
conf.setOutputFormat(TextOutputFormat.class);
conf.setOutputKeyClass(NullWritable.class);
conf.setOutputValueClass(Text.class);
TextOutputFormat.setOutputPath(conf, new Path("hdfs://namenode:port/output"));

// Create output format
HadoopOutputFormat<NullWritable, Text> hadoopOutput =
    new HadoopOutputFormat<>(new TextOutputFormat<>(), conf);

// Prepare data for output
DataSet<String> textLines = env.fromElements("Line 1", "Line 2", "Line 3");
DataSet<Tuple2<NullWritable, Text>> outputData = textLines.map(
    line -> new Tuple2<>(NullWritable.get(), new Text(line))
);

// Write to Hadoop output
outputData.output(hadoopOutput);
```

### Writing Sequence Files (Java)

```java
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Configure sequence file output
JobConf conf = new JobConf();
conf.setOutputFormat(SequenceFileOutputFormat.class);
conf.setOutputKeyClass(IntWritable.class);
conf.setOutputValueClass(Text.class);
SequenceFileOutputFormat.setOutputPath(conf, new Path("hdfs://namenode:port/sequence-output"));

// Create output format
HadoopOutputFormat<IntWritable, Text> seqOutput =
    new HadoopOutputFormat<>(new SequenceFileOutputFormat<>(), conf);

// Prepare key-value data
DataSet<Tuple2<IntWritable, Text>> keyValueData = env.fromElements(
    new Tuple2<>(new IntWritable(1), new Text("First")),
    new Tuple2<>(new IntWritable(2), new Text("Second")),
    new Tuple2<>(new IntWritable(3), new Text("Third"))
);

// Write sequence file
keyValueData.output(seqOutput);
```

### Writing with MapReduce API (Java)

```java
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;

// Configure MapReduce job
Job job = Job.getInstance();
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
TextOutputFormat.setOutputPath(job, new Path("hdfs://namenode:port/mapreduce-output"));

// Create MapReduce output format
HadoopOutputFormat<NullWritable, Text> mapreduceOutput =
    new HadoopOutputFormat<>(new TextOutputFormat<>(), job);

// Write data
DataSet<Tuple2<NullWritable, Text>> outputData = textLines.map(
    line -> new Tuple2<>(NullWritable.get(), new Text(line))
);
outputData.output(mapreduceOutput);
```

### Scala Usage

```scala
import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
import org.apache.hadoop.mapred.{TextOutputFormat, JobConf}
import org.apache.hadoop.io.{Text, NullWritable}
import org.apache.hadoop.fs.Path

// Configure output
val conf = new JobConf()
conf.setOutputFormat(classOf[TextOutputFormat[NullWritable, Text]])
conf.setOutputKeyClass(classOf[NullWritable])
conf.setOutputValueClass(classOf[Text])
TextOutputFormat.setOutputPath(conf, new Path("hdfs://namenode:port/scala-output"))

// Create output format
val hadoopOutput = new HadoopOutputFormat(new TextOutputFormat[NullWritable, Text](), conf)

// Prepare and write data
val textLines = env.fromElements("Scala line 1", "Scala line 2", "Scala line 3")
val outputData = textLines.map(line => (NullWritable.get(), new Text(line)))
outputData.output(hadoopOutput)
```

### Custom Output Formats

```java
import com.example.CustomOutputFormat;
import com.example.CustomKey;
import com.example.CustomValue;

// Configure custom output format
JobConf conf = new JobConf();
conf.setOutputFormat(CustomOutputFormat.class);
conf.setOutputKeyClass(CustomKey.class);
conf.setOutputValueClass(CustomValue.class);
conf.set("custom.output.property", "custom-value");

// Use custom output format
HadoopOutputFormat<CustomKey, CustomValue> customOutput =
    new HadoopOutputFormat<>(new CustomOutputFormat(), conf);

// Process and write custom data
DataSet<Tuple2<CustomKey, CustomValue>> customData = processedData.map(
    data -> new Tuple2<>(new CustomKey(data.getId()), new CustomValue(data.getContent()))
);
customData.output(customOutput);
```

## Output Committer Integration

Hadoop OutputFormats often use OutputCommitters to manage the output lifecycle. The Hadoop compatibility layer properly integrates with these committers.

```java
import org.apache.hadoop.mapred.FileOutputCommitter;

// Specify custom OutputCommitter
JobConf conf = new JobConf();
conf.setOutputFormat(TextOutputFormat.class);
// OutputCommitter is automatically handled, but can be customized if needed

HadoopOutputFormat<NullWritable, Text> outputWithCommitter =
    new HadoopOutputFormat<>(
        new TextOutputFormat<>(),
        FileOutputCommitter.class, // Custom committer class
        conf
    );
```

## Partitioning and Multiple Outputs

When writing to partitioned outputs or multiple files:

```java
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

// Configure multiple output files
JobConf conf = new JobConf();
conf.setOutputFormat(MultipleTextOutputFormat.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);

// The key determines the output file name
DataSet<Tuple2<Text, Text>> partitionedData = processedData.map(
    data -> new Tuple2<>(
        new Text("partition-" + data.getPartition()), // File name prefix
        new Text(data.getContent()) // Content
    )
);

HadoopOutputFormat<Text, Text> multiOutput =
    new HadoopOutputFormat<>(new MultipleTextOutputFormat<>(), conf);
partitionedData.output(multiOutput);
```

## Error Handling

Output format operations may encounter various errors:

```java
try {
    outputData.output(hadoopOutput);
    env.execute("Hadoop Output Job");
} catch (IOException e) {
    // Handle I/O errors during writing
    logger.error("Failed to write to Hadoop output: " + e.getMessage());
} catch (Exception e) {
    // Handle other execution errors
    logger.error("Job execution failed: " + e.getMessage());
}
```

Common exceptions include:
- `IOException` - File system or network errors during writing
- `IllegalArgumentException` - Invalid configuration or parameters
- `RuntimeException` - Various Hadoop-related runtime errors
- `JobExecutionException` - Flink job execution failures

## Configuration Best Practices

### Setting Output Paths

```java
// Always use absolute paths for distributed file systems
TextOutputFormat.setOutputPath(conf, new Path("hdfs://namenode:port/full/path/to/output"));

// For local file system (testing only)
TextOutputFormat.setOutputPath(conf, new Path("file:///tmp/local/output"));
```

### Compression Configuration

```java
// Enable compression for text output
conf.setBoolean("mapred.output.compress", true);
conf.setClass("mapred.output.compression.codec",
    org.apache.hadoop.io.compress.GzipCodec.class,
    CompressionCodec.class);

// For sequence files
SequenceFileOutputFormat.setCompressOutput(conf, true);
SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK);
```

### Performance Tuning

```java
// Set appropriate block size for HDFS
conf.setLong("dfs.block.size", 134217728); // 128MB

// Configure buffer sizes
conf.setInt("io.file.buffer.size", 65536); // 64KB

// Set replication factor
conf.setInt("dfs.replication", 3);
```