# Input and Output Formats

Specialized InputFormat and OutputFormat implementations for reading and writing Avro data in various formats within MapReduce jobs. These formats provide seamless integration between Avro's schema-based serialization and Hadoop's distributed file system, supporting both legacy and modern MapReduce APIs.

## Capabilities

### Legacy API Input Formats

Input formats for the legacy `org.apache.hadoop.mapred` API that read Avro data and present it as wrapped objects.

```java { .api }
public class AvroInputFormat<T> extends FileInputFormat<AvroWrapper<T>, NullWritable> {
  public RecordReader<AvroWrapper<T>, NullWritable> getRecordReader(
      InputSplit split, JobConf job, Reporter reporter) throws IOException;

  // Configuration constants
  public static final String IGNORE_FILES_WITHOUT_EXTENSION_KEY = "avro.mapred.ignore.inputs.without.extension";
  public static final boolean IGNORE_INPUTS_WITHOUT_EXTENSION_DEFAULT = true;
}

public class AvroAsTextInputFormat extends FileInputFormat<Text, Text> {
  // Reads Avro container files but presents each datum in its JSON text form
}

public class AvroUtf8InputFormat extends FileInputFormat<AvroWrapper<Utf8>, NullWritable> {
  // Reads plain text files, presenting each line as an Avro UTF-8 string key
}
```

#### Usage Example

```java
import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroJob;
import org.apache.hadoop.mapred.JobConf;

// Configure job for Avro input
JobConf job = new JobConf();
job.setInputFormat(AvroInputFormat.class);
AvroJob.setInputSchema(job, userSchema);

// Input format will read Avro container files and produce AvroWrapper<T> keys
```

### Legacy API Output Formats

Output formats for writing Avro data from MapReduce jobs using the legacy API.

```java { .api }
public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
  public RecordWriter<AvroWrapper<T>, NullWritable> getRecordWriter(
      FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException;
}

public class AvroTextOutputFormat<K, V> extends FileOutputFormat<K, V> {
  // The Avro analog of TextOutputFormat: writes keys and values
  // to an Avro container file with a "bytes" schema
}
```

#### Usage Example

```java
import org.apache.avro.mapred.AvroOutputFormat;
import org.apache.avro.mapred.AvroJob;

// Configure job for Avro output
job.setOutputFormat(AvroOutputFormat.class);
AvroJob.setOutputSchema(job, outputSchema);
AvroJob.setOutputCodec(job, "snappy");

// Output format will write AvroWrapper<T> data as Avro container files
```

### New API Input Formats

Input formats for the modern `org.apache.hadoop.mapreduce` API with enhanced key-value separation.

```java { .api }
public class AvroKeyInputFormat<T> extends FileInputFormat<AvroKey<T>, NullWritable> {
  public RecordReader<AvroKey<T>, NullWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}

public class AvroKeyValueInputFormat<K,V> extends FileInputFormat<AvroKey<K>, AvroValue<V>> {
  public RecordReader<AvroKey<K>, AvroValue<V>> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}

public class CombineAvroKeyValueFileInputFormat<K,V> extends CombineFileInputFormat<AvroKey<K>, AvroValue<V>> {
  // Optimized for processing many small Avro files
  public RecordReader<AvroKey<K>, AvroValue<V>> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException;
}
```

#### Usage Example

```java
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyValueInputFormat;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.hadoop.mapreduce.Job;

// Single key input format
Job job = Job.getInstance();
job.setInputFormatClass(AvroKeyInputFormat.class);
AvroJob.setInputKeySchema(job, keySchema);

// Key-value input format
job.setInputFormatClass(AvroKeyValueInputFormat.class);
AvroJob.setInputKeySchema(job, keySchema);
AvroJob.setInputValueSchema(job, valueSchema);

// For many small files
job.setInputFormatClass(CombineAvroKeyValueFileInputFormat.class);
```

### New API Output Formats

Output formats for writing Avro data using the modern MapReduce API.

```java { .api }
public class AvroKeyOutputFormat<T> extends FileOutputFormat<AvroKey<T>, NullWritable> {
  public RecordWriter<AvroKey<T>, NullWritable> getRecordWriter(TaskAttemptContext context)
      throws IOException, InterruptedException;
}

public class AvroKeyValueOutputFormat<K,V> extends FileOutputFormat<AvroKey<K>, AvroValue<V>> {
  public RecordWriter<AvroKey<K>, AvroValue<V>> getRecordWriter(TaskAttemptContext context)
      throws IOException, InterruptedException;
}

public abstract class AvroOutputFormatBase<K,V> extends FileOutputFormat<K,V> {
  // Base class providing common functionality for Avro output formats
  // (compression codec resolution, sync interval, output stream creation)
}
```

#### Usage Example

```java
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroKeyValueOutputFormat;
import org.apache.avro.mapreduce.AvroJob;

// Single key output format
job.setOutputFormatClass(AvroKeyOutputFormat.class);
AvroJob.setOutputKeySchema(job, outputKeySchema);

// Key-value output format
job.setOutputFormatClass(AvroKeyValueOutputFormat.class);
AvroJob.setOutputKeySchema(job, outputKeySchema);
AvroJob.setOutputValueSchema(job, outputValueSchema);
```

### Sequence File Integration

Formats for reading and writing Avro data in Hadoop SequenceFile format.

```java { .api }
public class AvroSequenceFileInputFormat<K,V> extends FileInputFormat<AvroKey<K>, AvroValue<V>> {
  public RecordReader<AvroKey<K>, AvroValue<V>> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}

public class AvroSequenceFileOutputFormat<K,V> extends FileOutputFormat<AvroKey<K>, AvroValue<V>> {
  public RecordWriter<AvroKey<K>, AvroValue<V>> getRecordWriter(TaskAttemptContext context)
      throws IOException, InterruptedException;
}

// Legacy API equivalent
public class SequenceFileInputFormat<K,V> extends FileInputFormat<AvroWrapper<K>, AvroWrapper<V>> {
  public RecordReader<AvroWrapper<K>, AvroWrapper<V>> getRecordReader(
      InputSplit split, JobConf job, Reporter reporter) throws IOException;
}
```

#### Usage Example

```java
import org.apache.avro.mapreduce.AvroSequenceFileInputFormat;
import org.apache.avro.mapreduce.AvroSequenceFileOutputFormat;

// Read from SequenceFile with Avro serialization
job.setInputFormatClass(AvroSequenceFileInputFormat.class);
AvroJob.setInputKeySchema(job, keySchema);
AvroJob.setInputValueSchema(job, valueSchema);

// Write to SequenceFile with Avro serialization
job.setOutputFormatClass(AvroSequenceFileOutputFormat.class);
AvroJob.setOutputKeySchema(job, keySchema);
AvroJob.setOutputValueSchema(job, valueSchema);
```

### Record Readers and Writers

Base classes and implementations for reading and writing Avro records.

```java { .api }
public abstract class AvroRecordReaderBase<K,V,T> extends RecordReader<K,V> {
  // Base class for Avro record readers
  public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
  public boolean nextKeyValue() throws IOException, InterruptedException;
  public float getProgress() throws IOException, InterruptedException;
  public void close() throws IOException;

  // Abstract methods for specific implementations
  public abstract K getCurrentKey() throws IOException, InterruptedException;
  public abstract V getCurrentValue() throws IOException, InterruptedException;
}

public class AvroKeyRecordReader<T> extends AvroRecordReaderBase<AvroKey<T>, NullWritable, T> {
  public AvroKey<T> getCurrentKey() throws IOException, InterruptedException;
  public NullWritable getCurrentValue() throws IOException, InterruptedException;
}

public class AvroKeyValueRecordReader<K,V> extends AvroRecordReaderBase<AvroKey<K>, AvroValue<V>, Pair<K,V>> {
  public AvroKey<K> getCurrentKey() throws IOException, InterruptedException;
  public AvroValue<V> getCurrentValue() throws IOException, InterruptedException;
}

public class AvroKeyRecordWriter<T> extends RecordWriter<AvroKey<T>, NullWritable> {
  public void write(AvroKey<T> key, NullWritable value) throws IOException, InterruptedException;
  public void close(TaskAttemptContext context) throws IOException, InterruptedException;
}

public class AvroKeyValueRecordWriter<K,V> extends RecordWriter<AvroKey<K>, AvroValue<V>> {
  public void write(AvroKey<K> key, AvroValue<V> value) throws IOException, InterruptedException;
  public void close(TaskAttemptContext context) throws IOException, InterruptedException;
}
```

### Multiple Output Support

Support for writing to multiple output files from a single job.

```java { .api }
// Legacy API (org.apache.avro.mapred)
public class AvroMultipleOutputs {
  public AvroMultipleOutputs(JobConf job);
  public <T> void write(String name, AvroWrapper<T> key, NullWritable value) throws IOException;
  public void close() throws IOException;

  public static void addNamedOutput(JobConf job, String name, Class<? extends OutputFormat> outputFormat, Schema schema);
  public static void setCountersEnabled(JobConf job, boolean enabled);
}

// New API (org.apache.avro.mapreduce)
public class AvroMultipleOutputs {
  public AvroMultipleOutputs(TaskAttemptContext context);
  // Write to a named output registered via addNamedOutput
  public void write(String namedOutput, Object key, Object value) throws IOException, InterruptedException;
  // Write using the job's default output format to an explicit base path
  public <K> void write(K key, NullWritable value, String baseOutputPath) throws IOException, InterruptedException;
  public <K,V> void write(K key, V value, String baseOutputPath) throws IOException, InterruptedException;
  public void close() throws IOException, InterruptedException;

  public static void addNamedOutput(Job job, String name, Class<? extends OutputFormat> outputFormat,
      Schema keySchema);
  public static void addNamedOutput(Job job, String name, Class<? extends OutputFormat> outputFormat,
      Schema keySchema, Schema valueSchema);
  public static void setCountersEnabled(Job job, boolean enabled);
}
```

#### Usage Example

```java
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;

// Configure multiple outputs, one Avro schema per named output
AvroMultipleOutputs.addNamedOutput(job, "users", AvroKeyOutputFormat.class, userSchema);
AvroMultipleOutputs.addNamedOutput(job, "events", AvroKeyOutputFormat.class, eventSchema);

// In reducer
public class MyReducer extends Reducer<Text, IntWritable, AvroKey<GenericRecord>, NullWritable> {
  private AvroMultipleOutputs multipleOutputs;

  protected void setup(Context context) {
    multipleOutputs = new AvroMultipleOutputs(context);
  }

  public void reduce(Text key, Iterable<IntWritable> values, Context context) {
    // Write to different named outputs based on logic
    if (key.toString().startsWith("user_")) {
      multipleOutputs.write("users", new AvroKey<>(userRecord), NullWritable.get());
    } else {
      multipleOutputs.write("events", new AvroKey<>(eventRecord), NullWritable.get());
    }
  }

  protected void cleanup(Context context) throws IOException, InterruptedException {
    multipleOutputs.close();
  }
}
```

## Configuration and Integration

### File Extensions and Filtering

Control which files are processed by input formats:

```java
// Ignore files without the .avro extension (the default)
job.setBoolean(AvroInputFormat.IGNORE_FILES_WITHOUT_EXTENSION_KEY, true);

// Process all files regardless of extension
job.setBoolean(AvroInputFormat.IGNORE_FILES_WITHOUT_EXTENSION_KEY, false);
```

### Compression Support

All output formats support Avro's compression codecs:

```java
import org.apache.avro.mapred.AvroJob;
// The new API uses org.apache.avro.mapreduce.AvroJob (same simple name,
// so only one can be imported per file)

// Legacy API
AvroJob.setOutputCodec(job, "snappy");
AvroJob.setOutputCodec(job, "deflate");
AvroJob.setOutputCodec(job, "bzip2");

// New API - via configuration
job.getConfiguration().set("avro.mapreduce.output.codec", "snappy");
```

### Schema Configuration Integration

Input/output formats automatically use schemas configured via AvroJob:

```java
// Schemas set via AvroJob are automatically picked up by formats
AvroJob.setInputSchema(job, inputSchema);    // Used by AvroInputFormat
AvroJob.setOutputSchema(job, outputSchema);  // Used by AvroOutputFormat

// New API with separate key/value schemas
AvroJob.setInputKeySchema(job, keySchema);     // Used by AvroKeyInputFormat
AvroJob.setInputValueSchema(job, valueSchema); // Used by AvroKeyValueInputFormat
```

## Performance Considerations

### Small Files Optimization

Use `CombineAvroKeyValueFileInputFormat` for many small files:

```java
job.setInputFormatClass(CombineAvroKeyValueFileInputFormat.class);

// Configure combine parameters
job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", 128 * 1024 * 1024);
job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.minsize", 64 * 1024 * 1024);
```

### Memory Management

Input formats handle memory efficiently by:

- Reusing reader objects
- Supporting lazy deserialization
- Proper resource cleanup

## Error Handling

Common issues and solutions:

- **Schema Not Found**: Ensure schema is configured via AvroJob before setting input/output format
- **File Format Errors**: Verify input files are valid Avro container files
- **Codec Errors**: Ensure output codec is supported and available on all nodes
- **Split Size Issues**: For large files, tune split size parameters
- **Memory Issues**: For large records, increase task memory limits