# Data Wrappers

Wrapper classes that integrate Avro data with Hadoop's MapReduce framework. Together with Avro's serialization and comparator classes, these wrappers provide schema-aware serialization and comparison, and they let Avro data flow through MapReduce's key-value processing model while preserving Avro's type safety and schema evolution capabilities.

## Capabilities

### Base Wrapper Class

The foundational wrapper class for Avro data in MapReduce. `AvroWrapper` does not implement Hadoop's `Writable` or `WritableComparable`; serialization is handled by the `AvroSerialization` framework instead.

```java { .api }
public class AvroWrapper<T> {
    // Constructors
    public AvroWrapper();
    public AvroWrapper(T datum);

    // Data access
    public T datum();
    public void datum(T datum);

    // Object methods
    public int hashCode();
    public boolean equals(Object obj);
    public String toString();
}
```
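
Hadoop's default `HashPartitioner` assigns a key to a reducer from its `hashCode()`, so the object methods above matter for wrapped keys. The sketch below assumes a wrapper delegates `equals` and `hashCode` to its datum and hashes a `null` datum to `0`; `DatumBox` is a hypothetical, stdlib-only stand-in that illustrates that contract, not Avro's `AvroWrapper` source.

```java
import java.util.Objects;

/** Hypothetical stand-in for a datum wrapper (not Avro's source); illustrates delegation only. */
class DatumBox<T> {
    private T datum;

    DatumBox() { }                        // datum starts out null
    DatumBox(T datum) { this.datum = datum; }

    T datum() { return datum; }
    void datum(T datum) { this.datum = datum; }

    @Override
    public int hashCode() {
        // Delegate to the datum; a null datum hashes to 0.
        return datum == null ? 0 : datum.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null || getClass() != obj.getClass()) return false;
        // Two boxes are equal when their datums are equal (or both null).
        return Objects.equals(datum, ((DatumBox<?>) obj).datum);
    }

    @Override
    public String toString() {
        return String.valueOf(datum);
    }
}

class DatumBoxDemo {
    public static void main(String[] args) {
        DatumBox<String> a = new DatumBox<>("user123");
        DatumBox<String> b = new DatumBox<>("user123");
        System.out.println(a.equals(b));                  // true
        System.out.println(a.hashCode() == b.hashCode()); // true
        // Reducer index as Hadoop's HashPartitioner computes it, here for 4 reducers
        System.out.println((a.hashCode() & Integer.MAX_VALUE) % 4);
    }
}
```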

#### Usage Example

```java
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.generic.GenericRecord;

// Create wrapper with data
GenericRecord user = ...; // Avro record
AvroWrapper<GenericRecord> wrapper = new AvroWrapper<>(user);

// Access wrapped data
GenericRecord data = wrapper.datum();

// Modify wrapped data
GenericRecord newUser = ...;
wrapper.datum(newUser);
```

### Key Wrapper

Specialized wrapper for MapReduce keys containing Avro data.

```java { .api }
public class AvroKey<T> extends AvroWrapper<T> {
    // Constructors
    public AvroKey();
    public AvroKey(T datum);
}
```

#### Usage Example

```java
import java.io.IOException;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.mapred.Reporter;

// Create key wrapper
AvroKey<Utf8> key = new AvroKey<>(new Utf8("user123"));

// Use in mapper/reducer
public class MyMapper extends AvroMapper<GenericRecord, GenericRecord> {
    @Override
    public void map(GenericRecord input, AvroCollector<GenericRecord> collector, Reporter reporter)
            throws IOException {
        AvroKey<Utf8> key = new AvroKey<>(new Utf8(input.get("id").toString()));
        // Process...
    }
}
```

### Value Wrapper

Specialized wrapper for MapReduce values containing Avro data.

```java { .api }
public class AvroValue<T> extends AvroWrapper<T> {
    // Constructors
    public AvroValue();
    public AvroValue(T datum);
}
```
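
`AvroKey` and `AvroValue` add only constructors; what they contribute is a distinct runtime type. Our understanding (an assumption, not confirmed by this page) is that the new-API `AvroSerialization` uses that type to choose between the configured key schema and value schema. The stdlib-only sketch below models the dispatch with hypothetical `Box`, `KeyBox`, and `ValueBox` classes and plain strings in place of schemas:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins, not Avro classes: the subclasses add no behavior, only a distinct type.
class Box<T> {
    final T datum;
    Box(T datum) { this.datum = datum; }
}

class KeyBox<T> extends Box<T> { KeyBox(T d) { super(d); } }

class ValueBox<T> extends Box<T> { ValueBox(T d) { super(d); } }

class SchemaDispatch {
    private final Map<String, String> conf = new HashMap<>();

    SchemaDispatch(String keySchema, String valueSchema) {
        conf.put("key.schema", keySchema);     // hypothetical config names
        conf.put("value.schema", valueSchema);
    }

    /** Pick a schema from the wrapper's class, as a serializer factory might. */
    String schemaFor(Class<?> wrapperClass) {
        if (KeyBox.class.isAssignableFrom(wrapperClass)) {
            return conf.get("key.schema");
        }
        return conf.get("value.schema");
    }

    public static void main(String[] args) {
        SchemaDispatch d = new SchemaDispatch("\"string\"", "\"long\"");
        System.out.println(d.schemaFor(KeyBox.class));   // "string"
        System.out.println(d.schemaFor(ValueBox.class)); // "long"
    }
}
```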
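
#### Usage Example

```java
import java.io.IOException;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Create value wrapper
GenericRecord userData = ...;
AvroValue<GenericRecord> value = new AvroValue<>(userData);

// Use in new MapReduce API (org.apache.hadoop.mapreduce).
// Avro strings are typically read as Utf8, so declare string keys as CharSequence.
public class MyMapper extends Mapper<AvroKey<CharSequence>, AvroValue<GenericRecord>, Text, IntWritable> {
    @Override
    protected void map(AvroKey<CharSequence> key, AvroValue<GenericRecord> value, Context context)
            throws IOException, InterruptedException {
        GenericRecord record = value.datum();
        // Process record...
    }
}
```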

### Key-Value Pairs

Schema-aware key-value pair that is itself an Avro record: `Pair` implements `IndexedRecord`, with the key and value as its two fields.

```java { .api }
public class Pair<K,V> implements IndexedRecord, Comparable<Pair>, SchemaConstructable {
    // Primary constructors
    public Pair(Schema schema);
    public Pair(K key, Schema keySchema, V value, Schema valueSchema);
    public Pair(Object key, Object value);
    // Plus many type-specific convenience constructors for common key/value type combinations

    // Data access
    public K key();
    public void key(K key);
    public V value();
    public void value(V value);
    public void set(K key, V value);

    // Schema operations
    public Schema getSchema();
    public static Schema getPairSchema(Schema key, Schema value);
    public static Schema getKeySchema(Schema pair);
    public static Schema getValueSchema(Schema pair);

    // IndexedRecord interface
    public Object get(int i);
    public void put(int i, Object v);

    // Comparable interface
    public int compareTo(Pair o);

    // Object methods
    public boolean equals(Object obj);
    public int hashCode();
    public String toString();
}
```
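
Because `Pair` implements `IndexedRecord`, its fields are also reachable by position: `get(0)` is the key and `get(1)` is the value. We also assume the schema returned by `getPairSchema` marks the value field as ignored for ordering, so that `compareTo` orders pairs by key alone; check this against your Avro version. The stdlib-only sketch below models both behaviors with a hypothetical `PositionalPair` class:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical model of a two-field positional record, not Avro's Pair.
 * Key-only ordering reflects an assumption about Pair's schema.
 */
class PositionalPair<K extends Comparable<K>, V> implements Comparable<PositionalPair<K, V>> {
    private K key;
    private V value;

    PositionalPair(K key, V value) { this.key = key; this.value = value; }

    K key() { return key; }
    V value() { return value; }

    /** Field 0 is the key, field 1 is the value. */
    Object get(int i) {
        switch (i) {
            case 0: return key;
            case 1: return value;
            default: throw new IndexOutOfBoundsException("Bad index: " + i);
        }
    }

    @SuppressWarnings("unchecked")
    void put(int i, Object v) {
        switch (i) {
            case 0: key = (K) v; break;
            case 1: value = (V) v; break;
            default: throw new IndexOutOfBoundsException("Bad index: " + i);
        }
    }

    /** Compare by key only; the value never affects ordering. */
    @Override
    public int compareTo(PositionalPair<K, V> o) {
        return key.compareTo(o.key);
    }

    @Override
    public String toString() { return "{" + key + ", " + value + "}"; }

    public static void main(String[] args) {
        List<PositionalPair<String, Integer>> pairs = new ArrayList<>();
        pairs.add(new PositionalPair<>("b", 1));
        pairs.add(new PositionalPair<>("a", 9));
        pairs.add(new PositionalPair<>("a", 2));
        Collections.sort(pairs);   // stable: equal keys keep their input order
        System.out.println(pairs); // [{a, 9}, {a, 2}, {b, 1}]
    }
}
```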

#### Usage Example

```java
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.Pair;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.mapred.Reporter;

// Create pair with explicit schemas
Schema keySchema = Schema.create(Schema.Type.STRING);
Schema valueSchema = Schema.create(Schema.Type.INT);
Schema pairSchema = Pair.getPairSchema(keySchema, valueSchema);

Pair<Utf8, Integer> pair = new Pair<>(pairSchema);
pair.set(new Utf8("count"), 42);

// Or create with data directly (schemas are derived from the key and value types)
Pair<Utf8, Integer> pair2 = new Pair<>(new Utf8("total"), 100);

// Access data
Utf8 key = pair.key();
Integer value = pair.value();

// Use in MapReduce
public class WordCountMapper extends AvroMapper<Utf8, Pair<Utf8, Integer>> {
    @Override
    public void map(Utf8 word, AvroCollector<Pair<Utf8, Integer>> collector, Reporter reporter)
            throws IOException {
        collector.collect(new Pair<>(word, 1));
    }
}
```
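
In a word-count job, the `(word, 1)` pairs emitted by `WordCountMapper` are sorted and grouped by key before the reducer adds up the values for each word. The sketch below simulates that map, shuffle, and sum flow in memory with plain Java collections; `WordCountSimulation` and its methods are hypothetical stand-ins for the Hadoop machinery, not Avro or Hadoop APIs.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory simulation of a word-count job; not Hadoop code.
class WordCountSimulation {
    /** "Map" phase: emit one (word, 1) pair per token. */
    static List<Map.Entry<String, Integer>> mapPhase(String text) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : text.split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new SimpleEntry<>(word, 1));
            }
        }
        return out;
    }

    /** "Shuffle" + "reduce": group pairs by key in sorted order and sum their values. */
    static Map<String, Integer> countWords(String text) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : mapPhase(text)) {
            totals.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        System.out.println(countWords("the cat saw the dog")); // {cat=1, dog=1, saw=1, the=2}
    }
}
```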

### Key-Value Record Helper

Helper class in `org.apache.avro.hadoop.io` for reading and writing the `key` and `value` fields of a key-value pair stored as an Avro generic record.

```java { .api }
public class AvroKeyValue<K,V> {
    // Constructor
    public AvroKeyValue(GenericRecord keyValueRecord);

    // Data access
    public GenericRecord get();
    public K getKey();
    public V getValue();
    public void setKey(K key);
    public void setValue(V value);

    // Schema utilities
    public static Schema getSchema(Schema keySchema, Schema valueSchema);

    // Constants
    public static final String KEY_VALUE_PAIR_RECORD_NAME = "KeyValuePair";
    public static final String KEY_VALUE_PAIR_RECORD_NAMESPACE = "org.apache.avro.mapreduce";
    public static final String KEY_FIELD = "key";
    public static final String VALUE_FIELD = "value";
}
```
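
`AvroKeyValue` keeps no copy of the key or value: `getKey`/`getValue` read the `key` and `value` fields of the record passed to the constructor, `setKey`/`setValue` write those fields in place, and `get()` returns that same record. The sketch below models this view pattern in plain Java; `KeyValueView` is hypothetical, and a `Map<String, Object>` stands in for a `GenericRecord`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical view over a record-like map with "key" and "value" fields (Map stands in for GenericRecord). */
class KeyValueView<K, V> {
    static final String KEY_FIELD = "key";
    static final String VALUE_FIELD = "value";

    private final Map<String, Object> record; // the wrapped record itself, not a copy

    KeyValueView(Map<String, Object> record) { this.record = record; }

    Map<String, Object> get() { return record; }

    @SuppressWarnings("unchecked")
    K getKey() { return (K) record.get(KEY_FIELD); }

    @SuppressWarnings("unchecked")
    V getValue() { return (V) record.get(VALUE_FIELD); }

    // Setters write straight through to the underlying record.
    void setKey(K key) { record.put(KEY_FIELD, key); }
    void setValue(V value) { record.put(VALUE_FIELD, value); }

    public static void main(String[] args) {
        Map<String, Object> rec = new HashMap<>();
        rec.put(KEY_FIELD, "user_id");
        rec.put(VALUE_FIELD, 12345L);

        KeyValueView<String, Long> kv = new KeyValueView<>(rec);
        kv.setValue(67890L);
        System.out.println(rec.get(VALUE_FIELD)); // 67890
    }
}
```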

#### Usage Example

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.hadoop.io.AvroKeyValue;

// Create key-value schema
Schema keySchema = Schema.create(Schema.Type.STRING);
Schema valueSchema = Schema.create(Schema.Type.LONG);
Schema kvSchema = AvroKeyValue.getSchema(keySchema, valueSchema);

// Create record
GenericRecord record = new GenericRecordBuilder(kvSchema)
    .set("key", "user_id")
    .set("value", 12345L)
    .build();

// Wrap in helper
AvroKeyValue<String, Long> kv = new AvroKeyValue<>(record);

// Access data (the key is a String only because this record was built in memory;
// records read from Avro files usually hold Utf8 keys)
String key = kv.getKey();   // "user_id"
Long value = kv.getValue(); // 12345L

// Modify data (writes through to the wrapped record)
kv.setKey("session_id");
kv.setValue(67890L);
```

### Iterator Support

Iterator that adapts an iterator over key-value `GenericRecord`s into an iterator over `AvroKeyValue` views.

```java { .api }
// Declared as a static nested class of AvroKeyValue<K,V>
public static class Iterator<K,V> implements java.util.Iterator<AvroKeyValue<K,V>> {
    public Iterator(java.util.Iterator<GenericRecord> records);
    public boolean hasNext();
    public AvroKeyValue<K,V> next();
    public void remove();
}
```
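
As we understand it, `AvroKeyValue.Iterator` is an adapter: each call to `next()` takes the next record from the underlying iterator and returns it wrapped in an `AvroKeyValue`, so nothing is copied or materialized up front. A generic, stdlib-only version of that adapter, with a hypothetical `WrappingIterator` class and a caller-supplied wrapping function, might look like this:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

/** Hypothetical adapter (not Avro's class): wraps each element of an underlying iterator on demand. */
class WrappingIterator<R, W> implements Iterator<W> {
    private final Iterator<? extends R> records;
    private final Function<? super R, ? extends W> wrap;

    WrappingIterator(Iterator<? extends R> records, Function<? super R, ? extends W> wrap) {
        this.records = records;
        this.wrap = wrap;
    }

    @Override
    public boolean hasNext() { return records.hasNext(); }

    @Override
    public W next() { return wrap.apply(records.next()); } // wraps lazily, one element per call

    @Override
    public void remove() { records.remove(); } // delegate removal to the source iterator

    public static void main(String[] args) {
        List<String> raw = Arrays.asList("a=1", "b=2");
        Iterator<String[]> it = new WrappingIterator<String, String[]>(raw.iterator(), s -> s.split("="));
        while (it.hasNext()) {
            String[] kv = it.next();
            System.out.println(kv[0] + " -> " + kv[1]);
        }
    }
}
```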

#### Usage Example

```java
import java.util.List;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.hadoop.io.AvroKeyValue;

// Process multiple key-value records
List<GenericRecord> records = ...; // List of key-value records
// Keys read from Avro files are typically Utf8, so use CharSequence rather than String
AvroKeyValue.Iterator<CharSequence, Integer> iterator =
    new AvroKeyValue.Iterator<>(records.iterator());

while (iterator.hasNext()) {
    AvroKeyValue<CharSequence, Integer> kv = iterator.next();
    CharSequence key = kv.getKey();
    Integer value = kv.getValue();
    // Process key-value pair...
}
```

## Integration Patterns

### With MapReduce Input/Output Formats

The wrappers are the key and value types produced by Avro's input formats:

```java
// Legacy API (org.apache.avro.mapred, built on org.apache.hadoop.mapred.FileInputFormat):
// uses AvroWrapper directly
public class AvroInputFormat<T> extends FileInputFormat<AvroWrapper<T>, NullWritable> {
    // Returns AvroWrapper<T> as keys
}

// New API (org.apache.avro.mapreduce, built on
// org.apache.hadoop.mapreduce.lib.input.FileInputFormat): uses the specific wrapper types
public class AvroKeyInputFormat<T> extends FileInputFormat<AvroKey<T>, NullWritable> {
    // Returns AvroKey<T> as keys
}

public class AvroKeyValueInputFormat<K,V> extends FileInputFormat<AvroKey<K>, AvroValue<V>> {
    // Returns AvroKey<K> and AvroValue<V>
}
```
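
### With Serialization Framework

The wrappers are serialized through Hadoop's pluggable serialization system. `AvroSerialization` must be registered in the configuration, and it needs the schemas to use for key and value data:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.hadoop.io.AvroSerialization;
import org.apache.avro.mapred.AvroKey;
import org.apache.hadoop.conf.Configuration;

// Configure serialization
Configuration conf = new Configuration();
AvroSerialization.addToConfiguration(conf);

// Key schemas used by the serializer (AvroJob.setMapOutputKeySchema(job, schema) also sets them)
Schema keySchema = ...;
AvroSerialization.setKeyWriterSchema(conf, keySchema);
AvroSerialization.setKeyReaderSchema(conf, keySchema);

// Wrappers are now serialized using Avro serialization
GenericRecord record = ...;
AvroKey<GenericRecord> key = new AvroKey<>(record);
```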
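
Registering a serialization means listing its class name in Hadoop's comma-separated `io.serializations` property. We assume `addToConfiguration` appends `AvroSerialization` only when it is not already listed, so calling it twice is harmless. The sketch below models that idempotent append with a hypothetical `registerSerialization` helper and a `Map<String, String>` standing in for Hadoop's `Configuration`:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: a Map stands in for Hadoop's Configuration.
class SerializationRegistry {
    static final String KEY = "io.serializations";

    /** Append className to the comma-separated list unless it is already present. */
    static void registerSerialization(Map<String, String> conf, String className) {
        String current = conf.getOrDefault(KEY, "");
        List<String> names = new ArrayList<>();
        for (String s : current.split(",")) {
            if (!s.trim().isEmpty()) {
                names.add(s.trim());
            }
        }
        if (!names.contains(className)) {
            names.add(className);
            conf.put(KEY, String.join(",", names));
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(KEY, "org.apache.hadoop.io.serializer.WritableSerialization");
        registerSerialization(conf, "org.apache.avro.hadoop.io.AvroSerialization");
        registerSerialization(conf, "org.apache.avro.hadoop.io.AvroSerialization"); // no-op
        System.out.println(conf.get(KEY));
    }
}
```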
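
### Schema Evolution Support

The wrappers themselves do not resolve schemas; Avro's datum readers do. When a reader schema is configured, data written with a compatible writer schema is converted to the reader schema during deserialization:

```java
import org.apache.avro.Schema;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.hadoop.mapreduce.Job;

// Writer schema: the schema the data was written with
// (stored in each Avro data file's header)
Schema writerSchema = new Schema.Parser().parse("...");

// Reader schema (current application schema)
Schema readerSchema = new Schema.Parser().parse("...");

// Ask the input format to resolve records to the reader schema
Job job = Job.getInstance();
AvroJob.setInputKeySchema(job, readerSchema);
// Each AvroKey<GenericRecord> then holds data converted from the writer to the reader schema
```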
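
Under Avro's schema resolution rules for records, fields are matched by name: a field present only in the writer schema is skipped, and a field present only in the reader schema takes the reader's default value, with resolution failing if no default exists. The stdlib-only sketch below applies just these record rules to `Map`-based data; `RecordResolution.resolve` is hypothetical, ignores type promotion, aliases, unions, and nested records, and treats a `null` entry as "no default", which real Avro does not.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; covers only name matching and reader defaults.
class RecordResolution {
    /**
     * readerDefaults lists every reader field in order; a null value
     * means "no default declared" (a simplification for this sketch).
     */
    static Map<String, Object> resolve(Map<String, Object> written,
                                       Map<String, Object> readerDefaults) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : readerDefaults.entrySet()) {
            String name = field.getKey();
            if (written.containsKey(name)) {
                result.put(name, written.get(name));   // matched by name
            } else if (field.getValue() != null) {
                result.put(name, field.getValue());    // filled from the reader's default
            } else {
                throw new IllegalArgumentException("No default for missing field: " + name);
            }
        }
        // Writer-only fields are never copied, i.e. they are skipped.
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> written = new LinkedHashMap<>();
        written.put("name", "alice");
        written.put("legacy_flag", true); // not in the reader schema

        Map<String, Object> reader = new LinkedHashMap<>();
        reader.put("name", null);         // required, no default
        reader.put("age", -1);            // new field with default -1

        System.out.println(resolve(written, reader)); // {name=alice, age=-1}
    }
}
```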

## Performance Considerations

### Object Reuse

Because `datum(T)` replaces the wrapped object in place, one wrapper instance can be reused across many records instead of allocating a new wrapper per record:

```java
// Reuse wrapper objects in hot code paths
AvroKey<GenericRecord> reusableKey = new AvroKey<>();
AvroValue<GenericRecord> reusableValue = new AvroValue<>();

for (GenericRecord record : records) { // records: any Iterable<GenericRecord>
    reusableKey.datum(record);
    reusableValue.datum(record);
    // Process without creating new wrapper objects
}
```
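
Reuse has a flip side: a reference kept across iterations points at an object that is later overwritten, so every kept reference ends up showing the last value. Hadoop reuses the key and value objects it passes to `reduce()` while iterating, so copy any datum you need to retain (for Avro data, `GenericData.get().deepCopy(schema, datum)` is one option). The stdlib-only sketch below shows the effect with a hypothetical `ReusePitfall` class, using a reused `StringBuilder` in place of a mutable datum:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo; StringBuilder stands in for a reused mutable datum.
class ReusePitfall {
    /**
     * Simulates a loop that overwrites one mutable object per record.
     * With copy == false it keeps references to that shared object;
     * with copy == true it keeps an independent snapshot of each record.
     */
    static List<String> keep(String[] inputs, boolean copy) {
        StringBuilder reused = new StringBuilder(); // one object, overwritten per record
        List<Object> kept = new ArrayList<>();
        for (String in : inputs) {
            reused.setLength(0);
            reused.append(in);                      // overwrite in place, as a reused object is
            kept.add(copy ? reused.toString() : reused);
        }
        // Render what was kept only after the loop, when the shared object holds the last value.
        List<String> out = new ArrayList<>();
        for (Object o : kept) {
            out.add(o.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        String[] inputs = {"alpha", "beta", "gamma"};
        System.out.println(keep(inputs, false)); // [gamma, gamma, gamma]
        System.out.println(keep(inputs, true));  // [alpha, beta, gamma]
    }
}
```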

### Memory Management

```java
// Clear references when done to help GC
wrapper.datum(null);
```
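
## Error Handling

Common issues and solutions:

- **NullPointerException**: A wrapper created with the no-argument constructor holds `null` until `datum(T)` is called; set the datum before dereferencing the result of `datum()`.
- **Schema Mismatch**: Verify that the writer and reader schemas are compatible under Avro's schema resolution rules when deserializing.
- **ClassCastException**: Ensure type parameters match the actual data types; for example, Avro strings are usually read as `Utf8`, so declare them as `CharSequence` rather than `String`.
- **Serialization Errors**: Verify that `AvroSerialization` is registered in the configuration and that the key and value schemas are set.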
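
A frequent cause of `ClassCastException` is string data: Avro's generic reader returns strings as `org.apache.avro.util.Utf8` by default, so a cast such as `(String) record.get("id")` fails at runtime. Declaring such types as `CharSequence` and calling `toString()` when a `String` is needed avoids the cast. The stdlib-only sketch below shows the pattern with a hypothetical `asString` helper, using `StringBuilder` in place of `Utf8` since both implement `CharSequence`:

```java
// Hypothetical helper; StringBuilder stands in for Avro's Utf8.
class StringFields {
    /** Convert any CharSequence datum (String, Utf8, StringBuilder, ...) to a String. */
    static String asString(Object datum) {
        if (datum == null) {
            return null;
        }
        if (datum instanceof CharSequence) {
            return datum.toString();
        }
        throw new IllegalArgumentException("Not a string datum: " + datum.getClass().getName());
    }

    public static void main(String[] args) {
        Object fromReader = new StringBuilder("user123"); // stand-in for a Utf8 value
        // String bad = (String) fromReader;             // would throw ClassCastException
        String id = asString(fromReader);
        System.out.println(id.equals("user123"));        // true
    }
}
```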