# Apache Avro MapReduce

A Hadoop MapReduce-compatible API for using Apache Avro serialization in distributed data processing pipelines. This library provides seamless integration between Avro's schema-based serialization system and Hadoop's MapReduce framework, supporting both the legacy (`org.apache.hadoop.mapred`) and modern (`org.apache.hadoop.mapreduce`) APIs with efficient serialization, file I/O, and cross-language capabilities.

## Package Information

- **Package Name**: avro-mapred
- **Package Type**: maven
- **Language**: Java
- **GroupId**: org.apache.avro
- **ArtifactId**: avro-mapred
- **Version**: 1.12.0
- **Installation**: Add to `pom.xml`:
```xml
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-mapred</artifactId>
  <version>1.12.0</version>
</dependency>
```

## Core Imports

Key imports for MapReduce job development:

```java
// Legacy MapReduce API (org.apache.hadoop.mapred)
import org.apache.avro.mapred.*;

// New MapReduce API (org.apache.hadoop.mapreduce)
import org.apache.avro.mapreduce.*;

// Data wrappers
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapred.Pair;

// Serialization
import org.apache.avro.hadoop.io.AvroSerialization;

// File utilities
import org.apache.avro.hadoop.file.SortedKeyValueFile;
```

## Basic Usage

### Simple Avro MapReduce Job (Legacy API)

```java
import org.apache.avro.Schema;
import org.apache.avro.mapred.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;

// Configure job for Avro input/output
JobConf job = new JobConf();
AvroJob.setInputSchema(job, inputSchema);
AvroJob.setOutputSchema(job, outputSchema);
AvroJob.setMapperClass(job, MyAvroMapper.class);
AvroJob.setReducerClass(job, MyAvroReducer.class);

// Set input/output formats
job.setInputFormat(AvroInputFormat.class);
job.setOutputFormat(AvroOutputFormat.class);
```

### Using AvroKey/AvroValue Wrappers

```java
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.generic.GenericRecord;

// Wrap Avro data for MapReduce
AvroKey<GenericRecord> key = new AvroKey<>(record);
AvroValue<GenericRecord> value = new AvroValue<>(data);

// Access wrapped data
GenericRecord keyData = key.datum();
GenericRecord valueData = value.datum();
```

### Schema Configuration

```java
import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroJob;

// Set schemas for different stages of the job
// (the deprecated static Schema.parse was removed; use Schema.Parser)
Schema userSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",...}");
AvroJob.setInputSchema(job, userSchema);
AvroJob.setMapOutputSchema(job, userSchema);
AvroJob.setOutputSchema(job, userSchema);
```

## Architecture

Apache Avro MapReduce is organized around several key architectural components:

### **Dual API Support**
- **Legacy API** (`org.apache.avro.mapred`): Compatible with `org.apache.hadoop.mapred`
- **New API** (`org.apache.avro.mapreduce`): Compatible with `org.apache.hadoop.mapreduce`
- Both APIs provide parallel functionality with different integration patterns

### **Data Wrapper System**
- **AvroWrapper<T>**: Base wrapper for Avro data in MapReduce context
- **AvroKey<T>/AvroValue<T>**: Specific wrappers for keys and values
- **Pair<K,V>**: Key-value pair implementation with schema support

### **Serialization Framework**
- **AvroSerialization**: Hadoop serialization integration
- **Schema Management**: Automatic schema propagation through job configuration
- **Data Model Support**: GenericData, SpecificData, and ReflectData models

### **File I/O Utilities**
- **Container Files**: Direct Avro container file support
- **Sequence Files**: Avro-enhanced Hadoop SequenceFile support
- **Sorted Files**: SortedKeyValueFile for efficient key-based lookups

### **Cross-Language Support**
- **Tether Framework**: Enables MapReduce jobs in non-Java languages
- **Protocol-based Communication**: Language-agnostic data exchange

## Capabilities

### Job Configuration and Setup

Core utilities for configuring MapReduce jobs with Avro schemas and data models, supporting both legacy and modern Hadoop APIs.

```java { .api }
// Legacy API (org.apache.avro.mapred.AvroJob)
public class AvroJob {
  // Schema configuration
  public static void setInputSchema(JobConf job, Schema schema);
  public static void setMapOutputSchema(JobConf job, Schema schema);
  public static void setOutputSchema(JobConf job, Schema schema);
  public static Schema getInputSchema(Configuration job);
  public static Schema getMapOutputSchema(Configuration job);
  public static Schema getOutputSchema(Configuration job);

  // Job component configuration
  public static void setMapperClass(JobConf job, Class<? extends AvroMapper> c);
  public static void setCombinerClass(JobConf job, Class<? extends AvroReducer> c);
  public static void setReducerClass(JobConf job, Class<? extends AvroReducer> c);

  // Output configuration
  public static void setOutputCodec(JobConf job, String codec);
  public static void setOutputMeta(JobConf job, String key, String value);
  public static void setOutputMeta(JobConf job, String key, long value);
  public static void setOutputMeta(JobConf job, String key, byte[] value);

  // Input format configuration
  public static void setInputSequenceFile(JobConf job);

  // Data model configuration
  public static void setReflect(JobConf job);
  public static void setInputReflect(JobConf job);
  public static void setMapOutputReflect(JobConf job);
  public static void setDataModelClass(JobConf job, Class<? extends GenericData> modelClass);
  public static Class<? extends GenericData> getDataModelClass(Configuration conf);
  public static GenericData createDataModel(Configuration conf);
  public static GenericData createInputDataModel(Configuration conf);
  public static GenericData createMapOutputDataModel(Configuration conf);
}

// New API (org.apache.avro.mapreduce.AvroJob)
public class AvroJob {
  // Schema setters
  public static void setInputKeySchema(Job job, Schema schema);
  public static void setInputValueSchema(Job job, Schema schema);
  public static void setMapOutputKeySchema(Job job, Schema schema);
  public static void setMapOutputValueSchema(Job job, Schema schema);
  public static void setOutputKeySchema(Job job, Schema schema);
  public static void setOutputValueSchema(Job job, Schema schema);

  // Schema getters
  public static Schema getInputKeySchema(Configuration conf);
  public static Schema getInputValueSchema(Configuration conf);
  public static Schema getMapOutputKeySchema(Configuration conf);
  public static Schema getMapOutputValueSchema(Configuration conf);
  public static Schema getOutputKeySchema(Configuration conf);
  public static Schema getOutputValueSchema(Configuration conf);

  // Data model configuration
  public static void setDataModelClass(Job job, Class<? extends GenericData> modelClass);
}
```
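
The new-API setters are typically called from a job driver. The following is a minimal sketch under stated assumptions: the `User` schema, the `AvroJobDriver` class name, and the input/output paths are all illustrative, not part of the library.

```java
import org.apache.avro.Schema;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AvroJobDriver {
  public static void main(String[] args) throws Exception {
    // Illustrative record schema
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
        + "[{\"name\":\"name\",\"type\":\"string\"}]}");

    Job job = Job.getInstance(new Configuration(), "avro-example");
    job.setJarByClass(AvroJobDriver.class);

    // Input: Avro container files whose records match the schema
    job.setInputFormatClass(AvroKeyInputFormat.class);
    AvroJob.setInputKeySchema(job, schema);

    // Output: Avro container files keyed by the same schema
    job.setOutputFormatClass(AvroKeyOutputFormat.class);
    AvroJob.setOutputKeySchema(job, schema);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```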

[Job Configuration](./job-configuration.md)

### Data Wrappers and Types

Wrapper classes that integrate Avro data with Hadoop's MapReduce framework, providing schema-aware serialization and comparison.

```java { .api }
public class AvroWrapper<T> {
  public AvroWrapper();
  public AvroWrapper(T datum);
  public T datum();
  public void datum(T datum);
  public boolean equals(Object obj);
  public int hashCode();
  public String toString();
}

public class AvroKey<T> extends AvroWrapper<T> {
  public AvroKey();
  public AvroKey(T datum);
}

public class AvroValue<T> extends AvroWrapper<T> {
  public AvroValue();
  public AvroValue(T datum);
}

public class Pair<K,V> implements IndexedRecord, Comparable<Pair>, SchemaConstructable {
  // Primary constructors
  public Pair(Schema schema);
  public Pair(K key, Schema keySchema, V value, Schema valueSchema);
  public Pair(Object key, Object value);
  // Multiple convenience constructors for type combinations omitted for brevity

  // Schema methods
  public Schema getSchema();
  public static Schema getPairSchema(Schema key, Schema value);
  public static Schema getKeySchema(Schema pair);
  public static Schema getValueSchema(Schema pair);

  // Data access methods
  public K key();
  public void key(K key);
  public V value();
  public void value(V value);
  public void set(K key, V value);

  // IndexedRecord implementation
  public Object get(int i);
  public void put(int i, Object o);

  // Comparison and equality
  public int compareTo(Pair that);
  public boolean equals(Object o);
  public int hashCode();
  public String toString();
}
```
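
A quick sketch of constructing and reading a `Pair`, using only methods from the listing above; it assumes `avro-mapred` and `avro` are on the classpath.

```java
import org.apache.avro.Schema;
import org.apache.avro.mapred.Pair;

public class PairExample {
  public static void main(String[] args) {
    // Build the synthetic Pair record schema from key and value schemas
    Schema keySchema = Schema.create(Schema.Type.STRING);
    Schema valueSchema = Schema.create(Schema.Type.LONG);
    Schema pairSchema = Pair.getPairSchema(keySchema, valueSchema);

    Pair<CharSequence, Long> pair = new Pair<>(pairSchema);
    pair.set("clicks", 42L);

    System.out.println(pair.key());   // clicks
    System.out.println(pair.value()); // 42
  }
}
```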

[Data Wrappers](./data-wrappers.md)

### Input and Output Formats

Specialized InputFormat and OutputFormat implementations for reading and writing Avro data in various formats within MapReduce jobs.

```java { .api }
// Legacy API
public class AvroInputFormat<T> extends FileInputFormat<AvroWrapper<T>, NullWritable> {
  public RecordReader<AvroWrapper<T>, NullWritable> getRecordReader(InputSplit split, JobConf job, Reporter reporter);
}

public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
  public RecordWriter<AvroWrapper<T>, NullWritable> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress);
}

// New API
public class AvroKeyInputFormat<T> extends FileInputFormat<AvroKey<T>, NullWritable> {
  public RecordReader<AvroKey<T>, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context);
}

public class AvroKeyValueInputFormat<K,V> extends FileInputFormat<AvroKey<K>, AvroValue<V>> {
  public RecordReader<AvroKey<K>, AvroValue<V>> createRecordReader(InputSplit split, TaskAttemptContext context);
}
```
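
Wiring the key/value formats into a new-API job might look like the sketch below. The string/long schemas are illustrative; `AvroKeyValueOutputFormat` is the output-side counterpart of `AvroKeyValueInputFormat` in `org.apache.avro.mapreduce`.

```java
import org.apache.avro.Schema;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyValueInputFormat;
import org.apache.avro.mapreduce.AvroKeyValueOutputFormat;
import org.apache.hadoop.mapreduce.Job;

// Read Avro key/value container files...
Job job = Job.getInstance();
job.setInputFormatClass(AvroKeyValueInputFormat.class);
AvroJob.setInputKeySchema(job, Schema.create(Schema.Type.STRING));
AvroJob.setInputValueSchema(job, Schema.create(Schema.Type.LONG));

// ...and write them back out with the same key/value schemas
job.setOutputFormatClass(AvroKeyValueOutputFormat.class);
AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));
AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.LONG));
```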

[Input Output Formats](./input-output-formats.md)

### Serialization and I/O Infrastructure

Core serialization framework that integrates Avro with Hadoop's serialization system, providing efficient data exchange and schema management.

```java { .api }
public class AvroSerialization<T> implements Serialization<AvroWrapper<T>> {
  public boolean accept(Class<?> c);
  public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c);
  public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c);
  public static void addToConfiguration(Configuration conf);
  public static void setKeyWriterSchema(Configuration conf, Schema schema);
  public static void setValueWriterSchema(Configuration conf, Schema schema);
}

public class AvroSerializer<T> {
  public AvroSerializer(Schema writerSchema);
  public void serialize(AvroWrapper<T> avroWrapper);
}

public abstract class AvroDeserializer<T extends AvroWrapper<D>,D> {
  public abstract T deserialize(T avroWrapperToReuse);
  public Schema getWriterSchema();
  public Schema getReaderSchema();
}
```
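
A minimal sketch of registering the serialization with a Hadoop `Configuration`, using only the static methods listed above (the string/long schemas are illustrative):

```java
import org.apache.avro.Schema;
import org.apache.avro.hadoop.io.AvroSerialization;
import org.apache.hadoop.conf.Configuration;

// Make Hadoop's serialization factory aware of Avro-wrapped data,
// then declare the writer schemas for intermediate keys and values
Configuration conf = new Configuration();
AvroSerialization.addToConfiguration(conf);
AvroSerialization.setKeyWriterSchema(conf, Schema.create(Schema.Type.STRING));
AvroSerialization.setValueWriterSchema(conf, Schema.create(Schema.Type.LONG));
```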

[Serialization](./serialization.md)

### File Utilities and Storage

Advanced file handling utilities for sorted key-value files, sequence files, and compression codec integration.

```java { .api }
public class SortedKeyValueFile {
  public static class Reader<K,V> {
    public Reader(Options options);
    public V get(K key);
    public Iterator<AvroKeyValue<K,V>> iterator();
    public void close();
  }

  public static class Writer<K,V> {
    public Writer(Options options);
    public void append(K key, V value);
    public void close();
  }
}

public class AvroSequenceFile {
  public static SequenceFile.Writer createWriter(Writer.Options options);

  public static class Reader {
    public Reader(Reader.Options options);
  }
}

public class HadoopCodecFactory {
  public static CodecFactory fromHadoopString(String hadoopCodecClass);
  public static String getAvroCodecName(String hadoopCodecClass);
}
```
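
A sketch of writing and then probing a `SortedKeyValueFile`. The builder methods (`withKeySchema`, `withValueSchema`, `withConfiguration`, `withPath`) come from the nested `Options` classes; the file path is illustrative, and keys must be appended in sorted order.

```java
import org.apache.avro.Schema;
import org.apache.avro.hadoop.file.SortedKeyValueFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

Configuration conf = new Configuration();
Path path = new Path("/tmp/users.skvf"); // illustrative location

// Write entries in sorted key order (required by the format)
SortedKeyValueFile.Writer<CharSequence, Long> writer =
    new SortedKeyValueFile.Writer<>(new SortedKeyValueFile.Writer.Options()
        .withKeySchema(Schema.create(Schema.Type.STRING))
        .withValueSchema(Schema.create(Schema.Type.LONG))
        .withConfiguration(conf)
        .withPath(path));
writer.append("alice", 1L);
writer.append("bob", 2L);
writer.close();

// Random lookup by key via the sorted index
SortedKeyValueFile.Reader<CharSequence, Long> reader =
    new SortedKeyValueFile.Reader<>(new SortedKeyValueFile.Reader.Options()
        .withKeySchema(Schema.create(Schema.Type.STRING))
        .withValueSchema(Schema.create(Schema.Type.LONG))
        .withConfiguration(conf)
        .withPath(path));
Long bob = reader.get("bob");
reader.close();
```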

[File Utilities](./file-utilities.md)

### MapReduce Processing Framework

Base classes and utilities for implementing Avro-aware mappers and reducers in the legacy MapReduce API.

```java { .api }
public abstract class AvroMapper<IN,OUT> {
  public abstract void map(IN datum, AvroCollector<OUT> collector, Reporter reporter);
  public void configure(JobConf jobConf);
  public void close();
}

public abstract class AvroReducer<K,V,OUT> {
  public abstract void reduce(K key, Iterable<V> values, AvroCollector<OUT> collector, Reporter reporter);
  public void configure(JobConf jobConf);
  public void close();
}

public abstract class AvroCollector<T> {
  public abstract void collect(T datum);
}
```
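
As a sketch of the base classes in use, the classic word count can be expressed with `AvroMapper`/`AvroReducer` emitting `Pair<Utf8, Long>` records (the class names are illustrative; a driver would register them via `AvroJob.setMapperClass`/`setReducerClass`):

```java
import java.io.IOException;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.AvroReducer;
import org.apache.avro.mapred.Pair;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.mapred.Reporter;

// Split each input line into words and emit (word, 1) pairs
public class WordCountMapper extends AvroMapper<Utf8, Pair<Utf8, Long>> {
  @Override
  public void map(Utf8 line, AvroCollector<Pair<Utf8, Long>> collector,
                  Reporter reporter) throws IOException {
    for (String word : line.toString().split("\\s+")) {
      collector.collect(new Pair<>(new Utf8(word), 1L));
    }
  }
}

// Sum the counts for each word and emit the total
class WordCountReducer extends AvroReducer<Utf8, Long, Pair<Utf8, Long>> {
  @Override
  public void reduce(Utf8 word, Iterable<Long> counts,
                     AvroCollector<Pair<Utf8, Long>> collector,
                     Reporter reporter) throws IOException {
    long sum = 0;
    for (long c : counts) sum += c;
    collector.collect(new Pair<>(word, sum));
  }
}
```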
359
360
[MapReduce Processing](./mapreduce-processing.md)
361
362
### Cross-Language Processing (Tether)
363
364
Tether framework for implementing MapReduce jobs in non-Java languages while maintaining Avro data integration and schema compatibility.
365
366
```java { .api }
public class TetherJob {
  public static void setExecutable(JobConf job, File executable);
  public static void setInputSchema(JobConf job, Schema schema);
  public static void setMapOutputSchema(JobConf job, Schema schema);
  public static void setOutputSchema(JobConf job, Schema schema);
}

public class TetherInputFormat extends FileInputFormat<AvroKey<Object>, AvroValue<Object>> {
  public RecordReader<AvroKey<Object>, AvroValue<Object>> createRecordReader(InputSplit split, TaskAttemptContext context);
}

public class TetherOutputFormat extends AvroOutputFormat<Object> {
  public RecordWriter<AvroKey<Object>, AvroValue<Object>> getRecordWriter(TaskAttemptContext context);
}
```
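
Configuring a tethered job uses the same schema setters as a regular Avro job, plus the path to the external executable. A sketch, assuming `TetherJob` lives in `org.apache.avro.mapred.tether` and using an illustrative executable path and word-count schemas:

```java
import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.mapred.Pair;
import org.apache.avro.mapred.tether.TetherJob;
import org.apache.hadoop.mapred.JobConf;

// The external program (any language) implements the tether protocol
JobConf job = new JobConf();
TetherJob.setExecutable(job, new File("/usr/local/bin/avro-wordcount"));

// Schemas describe the data crossing the language boundary
Schema countSchema = Pair.getPairSchema(
    Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.LONG));
TetherJob.setInputSchema(job, new Schema.Parser().parse("\"string\""));
TetherJob.setMapOutputSchema(job, countSchema);
TetherJob.setOutputSchema(job, countSchema);
```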

[Cross Language Processing](./cross-language-processing.md)

## Types

### Core Data Types

```java { .api }
// Base wrapper type
public class AvroWrapper<T> {
  // Implementation details in Data Wrappers section
}

// Key-value pair type
public class AvroKeyValue<K,V> {
  public AvroKeyValue(GenericRecord keyValueRecord);
  public K getKey();
  public V getValue();
  public void setKey(K key);
  public void setValue(V value);
  public static Schema getSchema(Schema keySchema, Schema valueSchema);
}

// Options classes for file operations use the builder pattern and are
// nested as SortedKeyValueFile.Reader.Options and SortedKeyValueFile.Writer.Options
```

### Comparator Types

```java { .api }
public class AvroKeyComparator<T> implements RawComparator<AvroKey<T>>, Configurable {
  public int compare(AvroKey<T> x, AvroKey<T> y);
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}

public class AvroCharSequenceComparator<T> implements Comparator<T> {
  public int compare(T o1, T o2);
  public static final AvroCharSequenceComparator<CharSequence> INSTANCE;
}
```
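
`AvroCharSequenceComparator` compares by character content, so a `String` and a `Utf8` holding the same text compare as equal. A sketch (the import path `org.apache.avro.hadoop.util` is assumed):

```java
import org.apache.avro.hadoop.util.AvroCharSequenceComparator;
import org.apache.avro.util.Utf8;

// Mixed CharSequence implementations compare consistently
int cmp = AvroCharSequenceComparator.INSTANCE.compare("apple", new Utf8("apple"));
System.out.println(cmp == 0); // true
```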
428
429
### Converter Types
430
431
```java { .api }
public abstract class AvroDatumConverter<INPUT,OUTPUT> {
  public abstract OUTPUT convert(INPUT input);
  public abstract Schema getWriterSchema();
}

public class AvroDatumConverterFactory {
  public AvroDatumConverterFactory(Configuration conf);
  public <IN,OUT> AvroDatumConverter<IN,OUT> create(Class<IN> inputClass);
}
```