0
# File Utilities and Storage
1
2
Advanced file handling utilities for sorted key-value files, sequence files, and compression codec integration. These utilities provide efficient storage and retrieval mechanisms for Avro data with specialized support for indexed access, sorted operations, and seamless integration with Hadoop's file system.
3
4
## Capabilities
5
6
### Sorted Key-Value Files
7
8
Indexed Avro container files that support efficient key-based lookups, similar to Hadoop's MapFile but designed specifically for Avro data.
9
10
```java { .api }
11
public class SortedKeyValueFile {
12
// Nested reader class
13
public static class Reader<K,V> implements Closeable {
14
// Constructor
15
public Reader(Options options) throws IOException;
16
17
// Data access
18
public V get(K key) throws IOException;
19
public Iterator<AvroKeyValue<K,V>> iterator() throws IOException;
20
21
// Resource management
22
public void close() throws IOException;
23
24
// Options configuration
25
public static class Options {
26
// Builder pattern methods for configuration
27
public Options withKeySchema(Schema keySchema);
28
public Options withValueSchema(Schema valueSchema);
29
public Options withPath(Path path);
30
public Options withConfiguration(Configuration conf);
31
public Options withDataModel(GenericData dataModel);
32
}
33
}
34
35
// Nested writer class
36
public static class Writer<K,V> implements Closeable {
37
// Constructor
38
public Writer(Options options) throws IOException;
39
40
// Data writing (keys must be in sorted order)
41
public void append(K key, V value) throws IOException;
42
43
// Resource management
44
public void close() throws IOException;
45
46
// Options configuration
47
public static class Options {
48
// Builder pattern methods for configuration
49
public Options withKeySchema(Schema keySchema);
50
public Options withValueSchema(Schema valueSchema);
51
public Options withPath(Path path);
52
public Options withConfiguration(Configuration conf);
53
public Options withDataModel(GenericData dataModel);
54
public Options withCodec(CodecFactory codec);
55
}
56
}
57
}
58
```
59
60
#### Usage Example
61
62
```java
63
import org.apache.avro.hadoop.file.SortedKeyValueFile;
64
import org.apache.avro.Schema;
65
import org.apache.hadoop.fs.Path;
66
67
// Define schemas
68
Schema keySchema = Schema.create(Schema.Type.STRING);
69
Schema valueSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",...}");
70
71
// Write sorted key-value file
72
SortedKeyValueFile.Writer.Options writerOpts = new SortedKeyValueFile.Writer.Options()
73
.withKeySchema(keySchema)
74
.withValueSchema(valueSchema)
75
.withPath(new Path("/data/users.skv"))
76
.withConfiguration(conf)
77
.withCodec(CodecFactory.snappyCodec());
78
79
try (SortedKeyValueFile.Writer<String, GenericRecord> writer =
80
new SortedKeyValueFile.Writer<>(writerOpts)) {
81
82
// Append data in sorted key order
83
writer.append("alice", aliceRecord);
84
writer.append("bob", bobRecord);
85
writer.append("charlie", charlieRecord);
86
}
87
88
// Read from sorted key-value file
89
SortedKeyValueFile.Reader.Options readerOpts = new SortedKeyValueFile.Reader.Options()
90
.withKeySchema(keySchema)
91
.withValueSchema(valueSchema)
92
.withPath(new Path("/data/users.skv"))
93
.withConfiguration(conf);
94
95
try (SortedKeyValueFile.Reader<String, GenericRecord> reader =
96
new SortedKeyValueFile.Reader<>(readerOpts)) {
97
98
// Efficient key lookup
99
GenericRecord user = reader.get("bob");
100
101
// Iterator over all records
102
Iterator<AvroKeyValue<String, GenericRecord>> iter = reader.iterator();
103
while (iter.hasNext()) {
104
AvroKeyValue<String, GenericRecord> kv = iter.next();
105
String key = kv.getKey();
106
GenericRecord value = kv.getValue();
107
// Process key-value pair
108
}
109
}
110
```
111
112
### Avro Sequence Files
113
114
Enhanced Hadoop SequenceFile support with Avro serialization, providing metadata storage and schema information.
115
116
```java { .api }
117
public class AvroSequenceFile {
118
// Metadata field constants
119
public static final Text METADATA_FIELD_KEY_SCHEMA = new Text("key.schema");
120
public static final Text METADATA_FIELD_VALUE_SCHEMA = new Text("value.schema");
121
122
// Writer creation
123
public static SequenceFile.Writer createWriter(Writer.Options options) throws IOException;
124
125
// Nested writer class
126
public static class Writer implements Closeable {
127
// Data writing
128
public void append(AvroWrapper key, AvroWrapper value) throws IOException;
129
public void close() throws IOException;
130
131
// Options configuration
132
public static class Options {
133
public Options withPath(Path path);
134
public Options withKeySchema(Schema keySchema);
135
public Options withValueSchema(Schema valueSchema);
136
public Options withConfiguration(Configuration conf);
137
public Options withCompressionType(SequenceFile.CompressionType compressionType);
138
public Options withCompressionCodec(CompressionCodec codec);
139
public Options withDataModel(GenericData dataModel);
140
}
141
}
142
143
// Nested reader class
144
public static class Reader implements Closeable {
145
// Data reading
146
public boolean next(AvroWrapper key, AvroWrapper value) throws IOException;
147
public void close() throws IOException;
148
149
// Schema access
150
public Schema getKeySchema();
151
public Schema getValueSchema();
152
153
// Options configuration
154
public static class Options {
155
public Options withPath(Path path);
156
public Options withConfiguration(Configuration conf);
157
public Options withDataModel(GenericData dataModel);
158
}
159
}
160
}
161
```
162
163
#### Usage Example
164
165
```java
166
import org.apache.avro.hadoop.io.AvroSequenceFile;
167
import org.apache.avro.mapred.AvroWrapper;
168
import org.apache.hadoop.io.SequenceFile;
169
170
// Write Avro sequence file
171
AvroSequenceFile.Writer.Options writerOpts = new AvroSequenceFile.Writer.Options()
172
.withPath(new Path("/data/sequence.seq"))
173
.withKeySchema(keySchema)
174
.withValueSchema(valueSchema)
175
.withConfiguration(conf)
176
.withCompressionType(SequenceFile.CompressionType.BLOCK);
177
178
try (SequenceFile.Writer writer = AvroSequenceFile.createWriter(writerOpts)) {
179
AvroWrapper<String> key = new AvroWrapper<>();
180
AvroWrapper<GenericRecord> value = new AvroWrapper<>();
181
182
// Write key-value pairs
183
key.datum("key1");
184
value.datum(record1);
185
writer.append(key, value);
186
187
key.datum("key2");
188
value.datum(record2);
189
writer.append(key, value);
190
}
191
192
// Read Avro sequence file
193
AvroSequenceFile.Reader.Options readerOpts = new AvroSequenceFile.Reader.Options()
194
.withPath(new Path("/data/sequence.seq"))
195
.withConfiguration(conf);
196
197
try (AvroSequenceFile.Reader reader = new AvroSequenceFile.Reader(readerOpts)) {
198
Schema keySchema = reader.getKeySchema();
199
Schema valueSchema = reader.getValueSchema();
200
201
AvroWrapper<String> key = new AvroWrapper<>();
202
AvroWrapper<GenericRecord> value = new AvroWrapper<>();
203
204
while (reader.next(key, value)) {
205
String keyData = key.datum();
206
GenericRecord valueData = value.datum();
207
// Process data
208
}
209
}
210
```
211
212
### Hadoop Codec Integration
213
214
Utilities for mapping between Hadoop compression codecs and Avro compression codecs.
215
216
```java { .api }
217
public class HadoopCodecFactory {
218
// Codec mapping methods
219
public static CodecFactory fromHadoopString(String hadoopCodecClass) throws AvroRuntimeException;
220
public static String getAvroCodecName(String hadoopCodecClass);
221
}
222
```
223
224
Supported codec mappings:
225
- `org.apache.hadoop.io.compress.DefaultCodec` → `deflate`
226
- `org.apache.hadoop.io.compress.GzipCodec` → `deflate`
227
- `org.apache.hadoop.io.compress.BZip2Codec` → `bzip2`
228
- `org.apache.hadoop.io.compress.SnappyCodec` → `snappy`
229
- `org.apache.hadoop.io.compress.DeflateCodec` → `deflate`
- `org.apache.hadoop.io.compress.ZStandardCodec` → `zstandard`
231
232
#### Usage Example
233
234
```java
235
import org.apache.avro.hadoop.file.HadoopCodecFactory;
236
import org.apache.avro.file.CodecFactory;
237
238
// Convert Hadoop codec class to Avro codec
239
String hadoopCodecClass = "org.apache.hadoop.io.compress.SnappyCodec";
240
CodecFactory avroCodec = HadoopCodecFactory.fromHadoopString(hadoopCodecClass);
241
242
// Get Avro codec name
243
String avroCodecName = HadoopCodecFactory.getAvroCodecName(hadoopCodecClass);
244
// Returns "snappy"
245
246
// Use in file writing
247
SortedKeyValueFile.Writer.Options opts = new SortedKeyValueFile.Writer.Options()
248
.withCodec(avroCodec);
249
```
250
251
### Key-Value Helper Utilities
252
253
Helper classes for working with key-value data structures in Avro format.
254
255
```java { .api }
256
public class AvroKeyValue<K,V> {
257
// Constructor
258
public AvroKeyValue(GenericRecord keyValueRecord);
259
260
// Data access
261
public GenericRecord get();
262
public K getKey();
263
public V getValue();
264
public void setKey(K key);
265
public void setValue(V value);
266
267
// Schema utilities
268
public static Schema getSchema(Schema keySchema, Schema valueSchema);
269
270
// Field name constants
271
public static final String KEY_VALUE_PAIR_RECORD_NAME = "KeyValuePair";
public static final String KEY_VALUE_PAIR_RECORD_NAMESPACE = "org.apache.avro.mapreduce";
273
public static final String KEY_FIELD = "key";
274
public static final String VALUE_FIELD = "value";
275
276
// Iterator support
277
public static class Iterator<K,V> implements java.util.Iterator<AvroKeyValue<K,V>> {
278
public Iterator(java.util.Iterator<GenericRecord> records);
279
public boolean hasNext();
280
public AvroKeyValue<K,V> next();
281
public void remove();
282
}
283
}
284
```
285
286
#### Usage Example
287
288
```java
289
import org.apache.avro.hadoop.io.AvroKeyValue;
290
import org.apache.avro.generic.GenericRecord;
291
import org.apache.avro.generic.GenericRecordBuilder;
292
293
// Create key-value schema and record
294
Schema keySchema = Schema.create(Schema.Type.STRING);
295
Schema valueSchema = Schema.create(Schema.Type.INT);
296
Schema kvSchema = AvroKeyValue.getSchema(keySchema, valueSchema);
297
298
GenericRecord kvRecord = new GenericRecordBuilder(kvSchema)
299
.set(AvroKeyValue.KEY_FIELD, "count")
300
.set(AvroKeyValue.VALUE_FIELD, 42)
301
.build();
302
303
// Use helper
304
AvroKeyValue<String, Integer> kv = new AvroKeyValue<>(kvRecord);
305
String key = kv.getKey(); // "count"
306
Integer value = kv.getValue(); // 42
307
308
// Modify values
309
kv.setKey("total");
310
kv.setValue(100);
311
```
312
313
## Integration with MapReduce
314
315
### Input Format Integration
316
317
File utilities integrate with MapReduce input formats:
318
319
```java
320
// Use sorted key-value files as MapReduce input
321
public class SortedKeyValueInputFormat<K,V> extends FileInputFormat<AvroKey<K>, AvroValue<V>> {
322
public RecordReader<AvroKey<K>, AvroValue<V>> createRecordReader(InputSplit split, TaskAttemptContext context) {
323
return new SortedKeyValueRecordReader<>();
324
}
325
}
326
```
327
328
### Output Format Integration
329
330
```java
331
// Write MapReduce output as sorted key-value files
332
public class SortedKeyValueOutputFormat<K,V> extends FileOutputFormat<AvroKey<K>, AvroValue<V>> {
333
public RecordWriter<AvroKey<K>, AvroValue<V>> getRecordWriter(TaskAttemptContext context) {
334
return new SortedKeyValueRecordWriter<>();
335
}
336
}
337
```
338
339
## Performance Considerations
340
341
### Sorted File Optimization
342
343
```java
344
// Configure appropriate options for performance
345
SortedKeyValueFile.Writer.Options opts = new SortedKeyValueFile.Writer.Options()
346
.withKeySchema(keySchema)
347
.withValueSchema(valueSchema)
348
.withPath(path)
349
.withCodec(CodecFactory.snappyCodec()) // Use compression
350
.withConfiguration(conf);
351
352
// Ensure keys are pre-sorted for optimal performance
353
// SortedKeyValueFile requires keys to be in sorted order
354
```
355
356
### Memory Management
357
358
```java
359
// Use try-with-resources for proper resource management
360
try (SortedKeyValueFile.Reader<String, GenericRecord> reader =
361
new SortedKeyValueFile.Reader<>(readerOpts)) {
362
// Use reader
363
} // Automatically closed
364
365
// Reuse objects in loops
366
AvroKeyValue<String, GenericRecord> reusableKV = null;
367
Iterator<AvroKeyValue<String, GenericRecord>> iter = reader.iterator();
368
while (iter.hasNext()) {
369
reusableKV = iter.next(); // May reuse object
370
// Process reusableKV
371
}
372
```
373
374
### Compression Strategy
375
376
```java
377
// Choose appropriate compression based on use case
378
CodecFactory snappy = CodecFactory.snappyCodec(); // Fast compression/decompression
379
CodecFactory deflate = CodecFactory.deflateCodec(6); // Better compression ratio
380
CodecFactory bzip2 = CodecFactory.bzip2Codec(); // Highest compression ratio
381
382
// Configure based on workload
383
SortedKeyValueFile.Writer.Options opts = new SortedKeyValueFile.Writer.Options()
384
.withCodec(snappy); // Good balance of speed and compression
385
```
386
387
## Error Handling
388
389
Common issues and solutions:
390
391
### Sorted File Issues
392
393
- **Unsorted Keys**: SortedKeyValueFile requires keys in sorted order during writing
394
- **Schema Mismatch**: Ensure reader and writer use compatible schemas
395
- **Index Corruption**: Verify file integrity if key lookups fail
396
397
### Sequence File Issues
398
399
- **Codec Not Available**: Ensure compression codecs are available on all nodes
400
- **Schema Missing**: Verify schemas are stored in sequence file metadata
401
- **Version Compatibility**: Check Hadoop version compatibility
402
403
### Resource Management
404
405
```java
406
// Always use try-with-resources or explicit cleanup
407
try (SortedKeyValueFile.Writer<String, GenericRecord> writer =
408
new SortedKeyValueFile.Writer<>(opts)) {
409
// Use writer
410
} catch (IOException e) {
411
// Handle errors
412
log.error("Failed to write sorted key-value file", e);
413
throw e;
414
}
415
```
416
417
### Exception Handling
418
419
```java
420
try {
421
CodecFactory codec = HadoopCodecFactory.fromHadoopString("unknown.codec");
422
} catch (AvroRuntimeException e) {
423
// Handle unsupported codec
424
log.warn("Unsupported codec, falling back to default", e);
425
CodecFactory codec = CodecFactory.deflateCodec(6); // deflateCodec requires a compression level (1-9)
426
}
427
```