# MapReduce Processing Framework

Base classes and utilities for implementing Avro-aware mappers and reducers in the legacy MapReduce API. This framework provides abstract base classes that handle Avro data serialization and deserialization automatically, allowing developers to focus on business logic while maintaining type safety and schema evolution support.

## Capabilities

### Avro Mapper Base Class

Abstract base class for implementing mappers that process Avro data in the legacy MapReduce API.

```java { .api }
public abstract class AvroMapper<IN,OUT> extends Configured implements JobConfigurable, Closeable {
12
// Abstract method to implement business logic
13
public abstract void map(IN datum, AvroCollector<OUT> collector, Reporter reporter)
14
throws IOException;
15
16
// Lifecycle methods (from interfaces)
17
public void configure(JobConf jobConf);
18
public void close() throws IOException;
19
}
```

#### Usage Example

```java
import org.apache.avro.mapred.AvroMapper;
26
import org.apache.avro.mapred.AvroCollector;
27
import org.apache.avro.generic.GenericRecord;
28
import org.apache.hadoop.mapred.Reporter;
29
30
// Implement mapper for processing user records
31
public class UserMapper extends AvroMapper<GenericRecord, GenericRecord> {
32
33
@Override
34
public void map(GenericRecord user, AvroCollector<GenericRecord> collector, Reporter reporter)
35
throws IOException {
36
37
// Access Avro data directly (no wrapper needed)
38
String name = user.get("name").toString();
39
Integer age = (Integer) user.get("age");
40
41
// Filter and transform data
42
if (age >= 18) {
43
GenericRecord output = new GenericRecordBuilder(outputSchema)
44
.set("name", name.toUpperCase())
45
.set("age", age)
46
.set("category", "adult")
47
.build();
48
49
// Collect output (automatically wrapped)
50
collector.collect(output);
51
}
52
53
// Update counters
54
reporter.incrCounter("USER_PROCESSING", "TOTAL_USERS", 1);
55
if (age >= 18) {
56
reporter.incrCounter("USER_PROCESSING", "ADULT_USERS", 1);
57
}
58
}
59
60
@Override
61
public void configure(JobConf jobConf) {
62
// Initialize mapper with job configuration
63
this.outputSchema = AvroJob.getMapOutputSchema(jobConf);
64
}
65
66
private Schema outputSchema;
67
}
```

### Avro Reducer Base Class

Abstract base class for implementing reducers that process grouped Avro data.

```java { .api }
public abstract class AvroReducer<K,V,OUT> extends Configured implements JobConfigurable, Closeable {
76
// Abstract method for business logic
77
public abstract void reduce(K key, Iterable<V> values, AvroCollector<OUT> collector, Reporter reporter)
78
throws IOException;
79
80
// Lifecycle methods (from interfaces)
81
public void configure(JobConf jobConf);
82
public void close() throws IOException;
83
}
```

#### Usage Example

```java
import org.apache.avro.mapred.AvroReducer;
90
import org.apache.avro.mapred.AvroCollector;
91
import org.apache.avro.generic.GenericRecord;
92
import org.apache.hadoop.mapred.Reporter;
93
94
// Implement reducer for aggregating user data by department
95
public class UserAggregateReducer extends AvroReducer<CharSequence, GenericRecord, GenericRecord> {
96
97
@Override
98
public void reduce(CharSequence department, Iterable<GenericRecord> users,
99
AvroCollector<GenericRecord> collector, Reporter reporter)
100
throws IOException {
101
102
int totalUsers = 0;
103
int totalAge = 0;
104
List<String> userNames = new ArrayList<>();
105
106
// Process all users in this department
107
for (GenericRecord user : users) {
108
totalUsers++;
109
totalAge += (Integer) user.get("age");
110
userNames.add(user.get("name").toString());
111
}
112
113
// Create aggregated output
114
GenericRecord summary = new GenericRecordBuilder(outputSchema)
115
.set("department", department.toString())
116
.set("user_count", totalUsers)
117
.set("average_age", totalAge / totalUsers)
118
.set("user_names", userNames)
119
.build();
120
121
collector.collect(summary);
122
123
// Update counters
124
reporter.incrCounter("AGGREGATION", "DEPARTMENTS_PROCESSED", 1);
125
reporter.incrCounter("AGGREGATION", "USERS_AGGREGATED", totalUsers);
126
}
127
128
@Override
129
public void configure(JobConf jobConf) {
130
this.outputSchema = AvroJob.getOutputSchema(jobConf);
131
}
132
133
private Schema outputSchema;
134
}
```

### Avro Collector

Abstract collector interface for gathering output from mappers and reducers.

```java { .api }
public abstract class AvroCollector<T> extends Configured {
143
// Core collection method
144
public abstract void collect(T datum) throws IOException;
145
}
```

The framework provides concrete implementations that automatically wrap collected data in AvroWrapper objects for integration with Hadoop's MapReduce infrastructure.

#### Usage Example

```java
// In mapper or reducer
154
public void map(GenericRecord input, AvroCollector<GenericRecord> collector, Reporter reporter) {
155
// Process input
156
GenericRecord output = processRecord(input);
157
158
// Collect output - automatically wrapped for Hadoop
159
collector.collect(output);
160
161
// Multiple outputs are supported
162
if (shouldEmitSummary(input)) {
163
GenericRecord summary = createSummary(input);
164
collector.collect(summary);
165
}
166
}
```

### Hadoop Integration Classes

Classes that bridge Avro processing with standard Hadoop mappers and reducers.

```java { .api }
public abstract class HadoopMapper<IN,OUT> extends AvroMapper<IN,OUT> {
175
// Integration with standard Hadoop Mapper interface
176
}
177
178
public abstract class HadoopReducer<K,V,OUT> extends AvroReducer<K,V,OUT> {
179
// Integration with standard Hadoop Reducer interface
180
}
181
182
public abstract class HadoopCombiner<K,V,OUT> extends AvroReducer<K,V,OUT> {
183
// Specialized for combine operations
184
}
185
186
public abstract class HadoopReducerBase<K,V,OUT> extends AvroReducer<K,V,OUT> {
187
// Base class with additional Hadoop-specific functionality
188
}
```

### Map Collector Implementation

Specific collector implementation for map phase output.

```java { .api }
public class MapCollector<T> extends AvroCollector<T> {
197
// Constructor
198
public MapCollector(OutputCollector<AvroWrapper<T>, NullWritable> collector);
199
200
// Collect implementation
201
public void collect(T datum) throws IOException;
202
}
```

## Complete Example: Word Count

Here's a complete example showing mapper and reducer implementation:

### Word Count Mapper

```java
import org.apache.avro.mapred.AvroMapper;
213
import org.apache.avro.mapred.AvroCollector;
214
import org.apache.avro.mapred.Pair;
215
import org.apache.avro.util.Utf8;
216
217
public class WordCountMapper extends AvroMapper<Utf8, Pair<Utf8, Integer>> {
218
219
@Override
220
public void map(Utf8 line, AvroCollector<Pair<Utf8, Integer>> collector, Reporter reporter)
221
throws IOException {
222
223
// Split line into words
224
String[] words = line.toString().toLowerCase().split("\\W+");
225
226
// Emit each word with count of 1
227
for (String word : words) {
228
if (!word.isEmpty()) {
229
Pair<Utf8, Integer> pair = new Pair<>(new Utf8(word), 1);
230
collector.collect(pair);
231
}
232
}
233
234
reporter.incrCounter("WORDS", "LINES_PROCESSED", 1);
235
reporter.incrCounter("WORDS", "WORDS_EMITTED", words.length);
236
}
237
}
```

### Word Count Reducer

```java
import org.apache.avro.mapred.AvroReducer;
244
import org.apache.avro.mapred.AvroCollector;
245
import org.apache.avro.mapred.Pair;
246
import org.apache.avro.util.Utf8;
247
248
public class WordCountReducer extends AvroReducer<Utf8, Integer, Pair<Utf8, Integer>> {
249
250
@Override
251
public void reduce(Utf8 word, Iterable<Integer> counts,
252
AvroCollector<Pair<Utf8, Integer>> collector, Reporter reporter)
253
throws IOException {
254
255
// Sum all counts for this word
256
int totalCount = 0;
257
for (Integer count : counts) {
258
totalCount += count;
259
}
260
261
// Emit word with total count
262
Pair<Utf8, Integer> result = new Pair<>(word, totalCount);
263
collector.collect(result);
264
265
reporter.incrCounter("WORDS", "UNIQUE_WORDS", 1);
266
}
267
}
```

### Job Configuration

```java
import org.apache.avro.mapred.AvroJob;
274
import org.apache.avro.mapred.AvroInputFormat;
275
import org.apache.avro.mapred.AvroOutputFormat;
276
import org.apache.hadoop.mapred.JobConf;
277
278
// Configure word count job
279
JobConf job = new JobConf();
280
job.setJobName("Avro Word Count");
281
282
// Set input/output formats
283
job.setInputFormat(AvroInputFormat.class);
284
job.setOutputFormat(AvroOutputFormat.class);
285
286
// Configure schemas
287
Schema stringSchema = Schema.create(Schema.Type.STRING);
288
Schema pairSchema = Pair.getPairSchema(stringSchema, Schema.create(Schema.Type.INT));
289
290
AvroJob.setInputSchema(job, stringSchema);
291
AvroJob.setMapOutputSchema(job, pairSchema);
292
AvroJob.setOutputSchema(job, pairSchema);
293
294
// Set mapper and reducer classes
295
AvroJob.setMapperClass(job, WordCountMapper.class);
296
AvroJob.setReducerClass(job, WordCountReducer.class);
297
298
// Set input/output paths
299
FileInputFormat.setInputPaths(job, new Path("/input"));
300
FileOutputFormat.setOutputPath(job, new Path("/output"));
301
302
// Run job
303
JobClient.runJob(job);
```

## Advanced Patterns

### Custom Initialization

```java
public class ConfigurableMapper extends AvroMapper<GenericRecord, GenericRecord> {
312
private Schema outputSchema;
313
private String filterField;
314
private Object filterValue;
315
316
@Override
317
public void configure(JobConf jobConf) {
318
// Get schemas and configuration
319
this.outputSchema = AvroJob.getMapOutputSchema(jobConf);
320
this.filterField = jobConf.get("filter.field");
321
this.filterValue = jobConf.get("filter.value");
322
}
323
324
@Override
325
public void map(GenericRecord input, AvroCollector<GenericRecord> collector, Reporter reporter)
326
throws IOException {
327
328
// Use configuration in processing
329
if (input.get(filterField).equals(filterValue)) {
330
collector.collect(transformRecord(input));
331
}
332
}
333
}
```

### Multiple Output Types

```java
public class MultiOutputMapper extends AvroMapper<GenericRecord, GenericRecord> {
340
341
@Override
342
public void map(GenericRecord input, AvroCollector<GenericRecord> collector, Reporter reporter)
343
throws IOException {
344
345
String recordType = input.get("type").toString();
346
347
switch (recordType) {
348
case "user":
349
collector.collect(processUser(input));
350
break;
351
case "event":
352
collector.collect(processEvent(input));
353
break;
354
case "transaction":
355
collector.collect(processTransaction(input));
356
break;
357
}
358
}
359
}
```

### Error Handling

```java
public class RobustMapper extends AvroMapper<GenericRecord, GenericRecord> {
366
367
@Override
368
public void map(GenericRecord input, AvroCollector<GenericRecord> collector, Reporter reporter)
369
throws IOException {
370
371
try {
372
// Process record
373
GenericRecord output = processRecord(input);
374
collector.collect(output);
375
376
reporter.incrCounter("PROCESSING", "SUCCESS", 1);
377
378
} catch (Exception e) {
379
// Log error and continue processing
380
System.err.println("Failed to process record: " + input + ", error: " + e.getMessage());
381
reporter.incrCounter("PROCESSING", "ERRORS", 1);
382
383
// Optionally emit error record
384
GenericRecord errorRecord = createErrorRecord(input, e);
385
collector.collect(errorRecord);
386
}
387
}
388
}
389
```
390
391
## Performance Considerations
392
393
### Object Reuse
394
395
```java
396
public class EfficientMapper extends AvroMapper<GenericRecord, GenericRecord> {
397
private GenericRecord reusableOutput;
398
399
@Override
400
public void configure(JobConf jobConf) {
401
Schema outputSchema = AvroJob.getMapOutputSchema(jobConf);
402
this.reusableOutput = new GenericData.Record(outputSchema);
403
}
404
405
@Override
406
public void map(GenericRecord input, AvroCollector<GenericRecord> collector, Reporter reporter)
407
throws IOException {
408
409
// Reuse output object to reduce GC pressure
410
reusableOutput.put("field1", input.get("field1"));
411
reusableOutput.put("field2", processField(input.get("field2")));
412
413
collector.collect(reusableOutput);
414
}
415
}
```

### Memory Management

```java
public class MemoryEfficientReducer extends AvroReducer<Utf8, GenericRecord, GenericRecord> {
422
423
@Override
424
public void reduce(Utf8 key, Iterable<GenericRecord> values,
425
AvroCollector<GenericRecord> collector, Reporter reporter)
426
throws IOException {
427
428
// Process values in streaming fashion to avoid loading all into memory
429
int count = 0;
430
GenericRecord first = null;
431
432
for (GenericRecord value : values) {
433
if (first == null) {
434
first = GenericData.get().deepCopy(value.getSchema(), value);
435
}
436
count++;
437
438
// Process without accumulating
439
if (count % 1000 == 0) {
440
// Periodically report progress
441
reporter.progress();
442
}
443
}
444
445
// Create summary without holding all values
446
GenericRecord summary = createSummary(key, first, count);
447
collector.collect(summary);
448
}
449
}
```

## Troubleshooting

Common issues and solutions:

- **Schema Mismatch**: Ensure input/output schemas are properly configured via AvroJob
- **NullPointerException**: Check for null values in Avro records before processing
- **ClassCastException**: Verify data types match schema expectations
- **Configuration Errors**: Ensure mapper/reducer classes are properly registered with AvroJob
- **Memory Issues**: Use object reuse patterns and streaming processing for large datasets