# Serialization and I/O Infrastructure

Core serialization framework that integrates Avro with Hadoop's serialization system, providing efficient data exchange, schema management, and seamless conversion between Java objects and Avro data formats within MapReduce pipelines.

## Capabilities

### Hadoop Serialization Integration

Main serialization class that registers Avro data types with Hadoop's serialization framework.
```java { .api }
public class AvroSerialization<T> implements Serialization<AvroWrapper<T>> {
  // Core serialization interface
  public boolean accept(Class<?> c);
  public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c);
  public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c);

  // Configuration management
  public static void addToConfiguration(Configuration conf);

  // Schema configuration
  public static void setKeyWriterSchema(Configuration conf, Schema schema);
  public static void setKeyReaderSchema(Configuration conf, Schema schema);
  public static void setValueWriterSchema(Configuration conf, Schema schema);
  public static void setValueReaderSchema(Configuration conf, Schema schema);

  // Schema retrieval
  public static Schema getKeyWriterSchema(Configuration conf);
  public static Schema getKeyReaderSchema(Configuration conf);
  public static Schema getValueWriterSchema(Configuration conf);
  public static Schema getValueReaderSchema(Configuration conf);

  // Data model support
  public static void setDataModelClass(Configuration conf, Class<? extends GenericData> modelClass);
  public static GenericData createDataModel(Configuration conf);
}
```
#### Usage Example

```java
import org.apache.avro.Schema;
import org.apache.avro.hadoop.io.AvroSerialization;
import org.apache.hadoop.conf.Configuration;

// Enable Avro serialization
Configuration conf = new Configuration();
AvroSerialization.addToConfiguration(conf);

// Configure schemas (Schema.parse(String) is deprecated; use Schema.Parser)
Schema userSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",...}");
AvroSerialization.setKeyWriterSchema(conf, userSchema);
AvroSerialization.setValueWriterSchema(conf, userSchema);

// Retrieve configured schemas
Schema keySchema = AvroSerialization.getKeyWriterSchema(conf);
Schema valueSchema = AvroSerialization.getValueWriterSchema(conf);
```
56
57
### Avro Serializer
58
59
Serializer implementation for converting AvroWrapper objects to binary format.
60
61
```java { .api }
62
public class AvroSerializer<T> implements Serializer<AvroWrapper<T>> {
63
// Constructors
64
public AvroSerializer(Schema writerSchema);
65
public AvroSerializer(Schema writerSchema, DatumWriter<T> datumWriter);
66
67
// Configuration
68
public Schema getWriterSchema();
69
70
// Serialization lifecycle
71
public void open(OutputStream outputStream) throws IOException;
72
public void serialize(AvroWrapper<T> avroWrapper) throws IOException;
73
public void close() throws IOException;
74
}
75
```
76
77
#### Usage Example
78
79
```java
80
import org.apache.avro.hadoop.io.AvroSerializer;
81
import org.apache.avro.mapred.AvroWrapper;
82
import java.io.ByteArrayOutputStream;
83
84
// Create serializer
85
Schema schema = Schema.parse("...");
86
AvroSerializer<GenericRecord> serializer = new AvroSerializer<>(schema);
87
88
// Serialize data
89
ByteArrayOutputStream out = new ByteArrayOutputStream();
90
serializer.open(out);
91
92
AvroWrapper<GenericRecord> wrapper = new AvroWrapper<>(record);
93
serializer.serialize(wrapper);
94
serializer.close();
95
96
byte[] serializedData = out.toByteArray();
97
```
98
99
### Avro Deserializer
100
101
Base deserializer class for converting binary data back to AvroWrapper objects.
102
103
```java { .api }
104
public abstract class AvroDeserializer<T extends AvroWrapper<D>,D> implements Deserializer<T> {
105
// Schema access
106
public Schema getWriterSchema();
107
public Schema getReaderSchema();
108
109
// Deserialization lifecycle
110
public void open(InputStream inputStream) throws IOException;
111
public abstract T deserialize(T avroWrapperToReuse) throws IOException;
112
public void close() throws IOException;
113
}
114
115
public class AvroKeyDeserializer<D> extends AvroDeserializer<AvroKey<D>, D> {
116
public AvroKey<D> deserialize(AvroKey<D> avroWrapperToReuse) throws IOException;
117
}
118
119
public class AvroValueDeserializer<D> extends AvroDeserializer<AvroValue<D>, D> {
120
public AvroValue<D> deserialize(AvroValue<D> avroWrapperToReuse) throws IOException;
121
}
122
```
#### Usage Example

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.hadoop.io.AvroKeyDeserializer;
import org.apache.avro.mapred.AvroKey;
import java.io.ByteArrayInputStream;

// Create deserializer; pass the writer and reader schemas explicitly
// (here the same schema, e.g. the one used by the serializer above, serves as both)
AvroKeyDeserializer<GenericRecord> deserializer =
    new AvroKeyDeserializer<>(schema, schema, Thread.currentThread().getContextClassLoader());

// Deserialize data
ByteArrayInputStream in = new ByteArrayInputStream(serializedData);
deserializer.open(in);

AvroKey<GenericRecord> key = new AvroKey<>();
AvroKey<GenericRecord> result = deserializer.deserialize(key);
deserializer.close();

GenericRecord record = result.datum();
```
144
145
### Data Conversion Framework
146
147
Framework for converting between different data formats and Avro.
148
149
```java { .api }
150
public abstract class AvroDatumConverter<INPUT,OUTPUT> {
151
// Core conversion method
152
public abstract OUTPUT convert(INPUT input);
153
154
// Schema information
155
public abstract Schema getWriterSchema();
156
}
157
158
public class AvroDatumConverterFactory {
159
// Constructor
160
public AvroDatumConverterFactory(Configuration conf);
161
162
// Factory method
163
public <IN,OUT> AvroDatumConverter<IN,OUT> create(Class<IN> inputClass);
164
}
165
```
The factory includes built-in converters for common Hadoop types:

- `WritableConverter`: Converts Hadoop Writable objects to Avro
- `TextConverter`: Converts Text objects to Avro strings
- `LongWritableConverter`: Converts LongWritable to Avro long
- `IntWritableConverter`: Converts IntWritable to Avro int
- `DoubleWritableConverter`: Converts DoubleWritable to Avro double
- `FloatWritableConverter`: Converts FloatWritable to Avro float
- `BooleanWritableConverter`: Converts BooleanWritable to Avro boolean
- `BytesWritableConverter`: Converts BytesWritable to Avro bytes
#### Usage Example

```java
import org.apache.avro.Schema;
import org.apache.avro.hadoop.io.AvroDatumConverter;
import org.apache.avro.hadoop.io.AvroDatumConverterFactory;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;

// Create converter factory
Configuration conf = new Configuration();
AvroDatumConverterFactory factory = new AvroDatumConverterFactory(conf);

// Get converter for Text to Avro string
AvroDatumConverter<Text, Utf8> converter = factory.create(Text.class);

// Convert data
Text input = new Text("Hello, World!");
Utf8 avroString = converter.convert(input);
Schema schema = converter.getWriterSchema();
```
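For input types without a built-in converter, `AvroDatumConverter` can be subclassed directly. A minimal sketch (the class name is hypothetical; it mirrors what the built-in `IntWritableConverter` does):

```java
import org.apache.avro.Schema;
import org.apache.avro.hadoop.io.AvroDatumConverter;
import org.apache.hadoop.io.IntWritable;

/**
 * Hypothetical converter mapping IntWritable values to Avro ints,
 * shown to illustrate how the two abstract methods fit together.
 */
public class IntWritableToAvroConverter extends AvroDatumConverter<IntWritable, Integer> {
  private static final Schema SCHEMA = Schema.create(Schema.Type.INT);

  @Override
  public Integer convert(IntWritable input) {
    // Unwrap the Hadoop Writable into the corresponding Avro-compatible type
    return input.get();
  }

  @Override
  public Schema getWriterSchema() {
    // The Avro schema describing the converted output
    return SCHEMA;
  }
}
```

The factory resolves converters by input class, so a custom converter like this would typically be registered alongside the built-ins rather than instantiated directly.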
### Key Comparators

Specialized comparators for Avro data that support both object and raw byte comparison.

```java { .api }
public class AvroKeyComparator<T> implements RawComparator<AvroKey<T>>, Configurable {
  // Object comparison
  public int compare(AvroKey<T> x, AvroKey<T> y);

  // Raw byte comparison (for efficiency)
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);

  // Configuration
  public void setConf(Configuration conf);
  public Configuration getConf();
}
```
215
216
#### Usage Example
217
218
```java
219
import org.apache.avro.hadoop.io.AvroKeyComparator;
220
import org.apache.avro.mapred.AvroKey;
221
import org.apache.hadoop.conf.Configuration;
222
223
// Create and configure comparator
224
AvroKeyComparator<GenericRecord> comparator = new AvroKeyComparator<>();
225
Configuration conf = new Configuration();
226
AvroSerialization.setKeyWriterSchema(conf, schema);
227
comparator.setConf(conf);
228
229
// Compare objects
230
AvroKey<GenericRecord> key1 = new AvroKey<>(record1);
231
AvroKey<GenericRecord> key2 = new AvroKey<>(record2);
232
int result = comparator.compare(key1, key2);
233
```
### Legacy API Serialization

Serialization support specifically for the legacy MapReduce API (`org.apache.hadoop.mapred`).

```java { .api }
// Declared in org.apache.avro.mapred
public class AvroSerialization<T> implements Serialization<AvroWrapper<T>> {
  // Legacy implementation for org.apache.hadoop.mapred compatibility
  public boolean accept(Class<?> c);
  public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c);
  public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c);
}
```
## Schema Evolution Support

The serialization framework supports Avro's schema evolution capabilities:

### Writer and Reader Schemas

```java
// Set different schemas for writing and reading
AvroSerialization.setKeyWriterSchema(conf, writerSchema);
AvroSerialization.setKeyReaderSchema(conf, readerSchema);

// Data written with the writer schema will be automatically resolved to the reader schema
```
### Evolution Rules

The serialization framework follows Avro's schema evolution rules:

- **Backward Compatibility**: The new schema can read data written with the old schema
- **Forward Compatibility**: The old schema can read data written with the new schema
- **Full Compatibility**: Both backward and forward compatible
#### Example

```java
// Original schema (version 1)
Schema v1Schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");

// Evolved schema (version 2) with a new optional field
Schema v2Schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"email\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

// Configure for evolution
AvroSerialization.setKeyWriterSchema(conf, v1Schema); // Data was written with v1
AvroSerialization.setKeyReaderSchema(conf, v2Schema); // Read with v2 (email will be null)
```
282
283
## Data Model Integration
284
285
Support for different Avro data models:
286
287
### Generic Data Model
288
289
```java
290
import org.apache.avro.generic.GenericData;
291
292
// Use generic data model (default)
293
AvroSerialization.setDataModelClass(conf, GenericData.class);
294
GenericData dataModel = AvroSerialization.createDataModel(conf);
295
```
296
297
### Specific Data Model
298
299
```java
300
import org.apache.avro.specific.SpecificData;
301
302
// Use specific data model for generated classes
303
AvroSerialization.setDataModelClass(conf, SpecificData.class);
304
```
305
306
### Reflect Data Model
307
308
```java
309
import org.apache.avro.reflect.ReflectData;
310
311
// Use reflection data model for POJOs
312
AvroSerialization.setDataModelClass(conf, ReflectData.class);
313
```
314
## Performance Optimization

### Object Reuse

Serializers and deserializers support object reuse for better performance:

```java
// Reuse wrapper objects
AvroKey<GenericRecord> reusableKey = new AvroKey<>();
AvroValue<GenericRecord> reusableValue = new AvroValue<>();

// Deserializers will reuse these objects
AvroKey<GenericRecord> result = deserializer.deserialize(reusableKey);
```
### Raw Byte Comparison

Use raw byte comparison for sorting without deserialization:

```java
// Raw comparator avoids object deserialization for sorting
AvroKeyComparator<GenericRecord> comparator = new AvroKeyComparator<>();
// Hadoop will use raw byte comparison when possible
```
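The raw comparison path delegates to Avro's `BinaryData.compare`, which walks the encoded bytes according to the schema. A sketch of what happens under the hood (the one-field record schema and hand-encoded bytes are illustrative):

```java
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryData;

// Single-field record schema: {id: long}
Schema schema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"P\",\"fields\":"
    + "[{\"name\":\"id\",\"type\":\"long\"}]}");

// Avro binary encodings of {id: 1} and {id: 2} (zig-zag varints)
byte[] b1 = {0x02};
byte[] b2 = {0x04};

// Compare the encoded records directly, without deserializing either one
int cmp = BinaryData.compare(b1, 0, b2, 0, schema);
// cmp < 0: {id: 1} sorts before {id: 2} under the schema's sort order
```

This is why sorting serialized Avro keys in the shuffle phase does not require materializing record objects.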
### Memory Management

Hadoop's `Serializer` interface is not `AutoCloseable`, so close serializers explicitly rather than with try-with-resources:

```java
AvroSerializer<GenericRecord> serializer = new AvroSerializer<>(schema);
serializer.open(outputStream);
try {
  // Use serializer
} finally {
  serializer.close(); // Flushes buffered output and releases the stream
}
```
349
350
## Configuration Best Practices
351
352
### Schema Management
353
354
```java
355
// Store schemas in configuration for access across tasks
356
AvroSerialization.setKeyWriterSchema(conf, keySchema);
357
AvroSerialization.setValueWriterSchema(conf, valueSchema);
358
359
// Retrieve schemas where needed
360
Schema keySchema = AvroSerialization.getKeyWriterSchema(conf);
361
```
### Serialization Registration

```java
// Always add Avro serialization to configuration
AvroSerialization.addToConfiguration(conf);

// Verify serialization is properly configured
String[] serializations = conf.getStrings("io.serializations");
// Should include "org.apache.avro.hadoop.io.AvroSerialization"
```
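Registration can also be checked programmatically by asking Hadoop's `SerializationFactory` to resolve a serialization for `AvroKey`; a sketch:

```java
import org.apache.avro.hadoop.io.AvroSerialization;
import org.apache.avro.mapred.AvroKey;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.serializer.SerializationFactory;

// Sanity-check that Hadoop can resolve a serializer for AvroKey after
// registration; getSerialization returns null when nothing accepts the class.
Configuration conf = new Configuration();
AvroSerialization.addToConfiguration(conf);
SerializationFactory factory = new SerializationFactory(conf);
boolean registered = factory.getSerialization(AvroKey.class) != null;
```

This is the same lookup Hadoop performs internally when it serializes intermediate keys and values.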
373
374
## Error Handling
375
376
Common serialization issues and solutions:
377
378
- **Schema Not Found**: Ensure schemas are configured before creating serializers/deserializers
379
- **Schema Evolution Errors**: Verify schema compatibility using Avro's SchemaCompatibility class
380
- **Memory Issues**: Use object reuse patterns for high-throughput scenarios
381
- **Configuration Errors**: Verify AvroSerialization is added to configuration
382
- **Codec Issues**: Ensure compression codecs are available on all cluster nodes
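
For the schema-evolution bullet above, a compatibility check with Avro's `SchemaCompatibility` API might look like this (sketch reusing the v1/v2 User schemas from the evolution example):

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;

// Writer schema: the schema the data was encoded with
Schema writer = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
    + "[{\"name\":\"name\",\"type\":\"string\"}]}");

// Reader schema: adds an optional email field with a default
Schema reader = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
    + "[{\"name\":\"name\",\"type\":\"string\"},"
    + "{\"name\":\"email\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

// Check whether the reader schema can decode data written with the writer schema
SchemaCompatibilityType result = SchemaCompatibility
    .checkReaderWriterCompatibility(reader, writer)
    .getType();
// COMPATIBLE here, because the new field has a default value
```

Running this check before configuring reader/writer schemas surfaces evolution errors at setup time rather than mid-job.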