0
# Data Format System
1
2
Pluggable format system for converting data between different representations, with built-in support for common formats and an extensible architecture for custom formats. The format system provides the schema-aware data transformation capabilities needed for data ingestion, processing, and output in various formats.
3
4
## Capabilities
5
6
### Format Specification
7
8
Define format configurations including name, schema, and custom settings for data transformation.
9
10
```java { .api }
11
/**
12
* Specification for record format including schema and settings
13
*/
14
public final class FormatSpecification {
15
/**
16
* Create format specification with schema and settings
17
* @param name Format name
18
* @param schema Data schema (nullable)
19
* @param settings Format-specific settings (nullable, defaults to empty map)
20
*/
21
public FormatSpecification(String name, Schema schema, Map<String, String> settings);
22
23
/**
24
* Create format specification with schema only
25
* @param name Format name
26
* @param schema Data schema (nullable)
27
*/
28
public FormatSpecification(String name, Schema schema);
29
30
/**
31
* Get format name
32
* @return Format name
33
*/
34
public String getName();
35
36
/**
37
* Get format schema
38
* @return Schema or null if not specified
39
*/
40
public Schema getSchema();
41
42
/**
43
* Get format settings
44
* @return Immutable map of settings (never null)
45
*/
46
public Map<String, String> getSettings();
47
}
48
```
49
50
**Usage Examples:**
51
52
```java
53
// Basic format specification
54
Schema userSchema = Schema.recordOf("User",
55
Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
56
Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
57
Schema.Field.of("email", Schema.of(Schema.Type.STRING))
58
);
59
60
FormatSpecification jsonFormat = new FormatSpecification("json", userSchema);
61
62
// Format with custom settings
63
Map<String, String> csvSettings = new HashMap<>();
64
csvSettings.put("delimiter", ",");
65
csvSettings.put("header", "true");
66
csvSettings.put("escape", "\"");
67
68
FormatSpecification csvFormat = new FormatSpecification("csv", userSchema, csvSettings);
69
70
// Access specification properties
71
String formatName = csvFormat.getName(); // "csv"
72
Schema formatSchema = csvFormat.getSchema(); // userSchema
73
String delimiter = csvFormat.getSettings().get("delimiter"); // ","
74
```
75
76
### Abstract Record Format
77
78
Base class for implementing custom data format converters with schema validation and configuration.
79
80
```java { .api }
81
/**
82
* Abstract base class for data format conversion
83
* @param <FROM> Input data type
84
* @param <TO> Output data type
85
*/
86
public abstract class RecordFormat<FROM, TO> {
87
/**
88
* Read input data and convert to output format
89
* @param input Input data to transform
90
* @return Transformed data
91
* @throws UnexpectedFormatException if input cannot be processed
92
*/
93
public abstract TO read(FROM input) throws UnexpectedFormatException;
94
95
/**
96
* Get default schema for this format (if any)
97
* @return Default schema or null if schema must be provided
98
*/
99
protected abstract Schema getDefaultSchema();
100
101
/**
102
* Validate schema compatibility with this format
103
* @param schema Schema to validate (guaranteed non-null record with ≥1 field)
104
* @throws UnsupportedTypeException if schema not supported
105
*/
106
protected abstract void validateSchema(Schema schema) throws UnsupportedTypeException;
107
108
/**
109
* Initialize format with specification
110
* @param formatSpecification Format configuration (nullable)
111
* @throws UnsupportedTypeException if specification not supported
112
*/
113
public void initialize(FormatSpecification formatSpecification) throws UnsupportedTypeException;
114
115
/**
116
* Configure format with settings (called after schema is set)
117
* @param settings Configuration settings
118
*/
119
protected void configure(Map<String, String> settings);
120
121
/**
122
* Get current format schema
123
* @return Current schema (available after initialization)
124
*/
125
public Schema getSchema();
126
}
127
```
128
129
**Usage Examples:**
130
131
```java
132
// Example CSV format implementation
133
public class CsvRecordFormat extends RecordFormat<String, StructuredRecord> {
134
private String delimiter = ",";
135
private boolean hasHeader = false;
136
137
@Override
138
public StructuredRecord read(String csvLine) throws UnexpectedFormatException {
139
if (csvLine == null || csvLine.trim().isEmpty()) {
140
throw new UnexpectedFormatException("Empty CSV line");
141
}
142
143
String[] values = csvLine.split(delimiter);
144
Schema schema = getSchema();
145
List<Schema.Field> fields = schema.getFields();
146
147
if (values.length != fields.size()) {
148
throw new UnexpectedFormatException("CSV field count mismatch");
149
}
150
151
StructuredRecord.Builder builder = StructuredRecord.builder(schema);
152
for (int i = 0; i < fields.size(); i++) {
153
Schema.Field field = fields.get(i);
154
builder.convertAndSet(field.getName(), values[i]);
155
}
156
157
return builder.build();
158
}
159
160
@Override
161
protected Schema getDefaultSchema() {
162
// CSV has no default schema - must be provided
163
return null;
164
}
165
166
@Override
167
protected void validateSchema(Schema schema) throws UnsupportedTypeException {
168
// CSV only supports simple types
169
for (Schema.Field field : schema.getFields()) {
170
Schema fieldSchema = field.getSchema();
171
if (fieldSchema.isNullable()) {
172
fieldSchema = fieldSchema.getNonNullable();
173
}
174
if (!fieldSchema.getType().isSimpleType()) {
175
throw new UnsupportedTypeException("CSV format only supports simple types");
176
}
177
}
178
}
179
180
@Override
181
protected void configure(Map<String, String> settings) {
182
delimiter = settings.getOrDefault("delimiter", ",");
183
hasHeader = Boolean.parseBoolean(settings.getOrDefault("header", "false"));
184
}
185
}
186
187
// Usage of custom format
188
CsvRecordFormat csvFormat = new CsvRecordFormat();
189
190
Schema productSchema = Schema.recordOf("Product",
191
Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
192
Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
193
Schema.Field.of("price", Schema.of(Schema.Type.DOUBLE))
194
);
195
196
Map<String, String> csvSettings = new HashMap<>();
197
csvSettings.put("delimiter", ",");
198
csvSettings.put("header", "true");
199
200
FormatSpecification spec = new FormatSpecification("csv", productSchema, csvSettings);
201
csvFormat.initialize(spec);
202
203
// Process CSV data
204
String csvLine = "12345,Widget,29.99";
205
StructuredRecord product = csvFormat.read(csvLine);
206
207
Long id = product.get("id"); // 12345L
208
String name = product.get("name"); // "Widget"
209
Double price = product.get("price"); // 29.99
210
```
211
212
### Built-in Format Constants
213
214
Pre-defined format names for common data formats supported by the platform.
215
216
```java { .api }
217
/**
 * Names of the record formats that are built in.
 */
public final class Formats {
  /** Name of the Avro format. */
  public static final String AVRO = "avro";
  /** Name of the CSV format. */
  public static final String CSV = "csv";
  /** Name of the TSV format. */
  public static final String TSV = "tsv";
  /** Name of the text format. */
  public static final String TEXT = "text";
  /** Name of the combined log format. */
  public static final String COMBINED_LOG_FORMAT = "clf";
  /** Name of the grok format. */
  public static final String GROK = "grok";
  /** Name of the syslog format. */
  public static final String SYSLOG = "syslog";

  /**
   * Every built-in format name, in declaration order.
   */
  public static final String[] ALL = {
    AVRO,
    CSV,
    TSV,
    TEXT,
    COMBINED_LOG_FORMAT,
    GROK,
    SYSLOG
  };
}
236
```
237
238
**Usage Examples:**
239
240
```java
241
// Using built-in format constants
242
FormatSpecification avroSpec = new FormatSpecification(Formats.AVRO, schema);
243
FormatSpecification csvSpec = new FormatSpecification(Formats.CSV, schema);
244
245
// Check if format is supported
246
String formatName = "json";
247
boolean isBuiltIn = Arrays.asList(Formats.ALL).contains(formatName);
248
249
// Iterate through all supported formats
250
for (String format : Formats.ALL) {
251
System.out.println("Supported format: " + format);
252
}
253
```
254
255
### Format Factory Pattern
256
257
Common pattern for managing multiple format implementations.
258
259
```java
260
public class FormatFactory {
261
private final Map<String, Class<? extends RecordFormat<?, ?>>> formatRegistry;
262
263
public FormatFactory() {
264
formatRegistry = new HashMap<>();
265
registerBuiltInFormats();
266
}
267
268
private void registerBuiltInFormats() {
269
formatRegistry.put(Formats.CSV, CsvRecordFormat.class);
270
formatRegistry.put(Formats.TSV, TsvRecordFormat.class);
271
formatRegistry.put(Formats.TEXT, TextRecordFormat.class);
272
// Register other built-in formats
273
}
274
275
public <FROM, TO> RecordFormat<FROM, TO> createFormat(String formatName)
276
throws InstantiationException, IllegalAccessException {
277
Class<? extends RecordFormat<?, ?>> formatClass = formatRegistry.get(formatName);
278
if (formatClass == null) {
279
throw new IllegalArgumentException("Unknown format: " + formatName);
280
}
281
282
@SuppressWarnings("unchecked")
283
RecordFormat<FROM, TO> format = (RecordFormat<FROM, TO>) formatClass.newInstance();
284
return format;
285
}
286
287
public void registerFormat(String name, Class<? extends RecordFormat<?, ?>> formatClass) {
288
formatRegistry.put(name, formatClass);
289
}
290
291
public Set<String> getSupportedFormats() {
292
return formatRegistry.keySet();
293
}
294
}
295
296
// Usage
297
FormatFactory factory = new FormatFactory();
298
RecordFormat<String, StructuredRecord> csvFormat = factory.createFormat(Formats.CSV);
299
```
300
301
### Advanced Format Implementations
302
303
Example implementations for common data processing scenarios.
304
305
**JSON Format Implementation:**
306
307
```java
308
import com.google.gson.Gson;
309
import com.google.gson.JsonObject;
310
import com.google.gson.JsonParser;
311
312
public class JsonRecordFormat extends RecordFormat<String, StructuredRecord> {
313
private final Gson gson = new Gson();
314
private final JsonParser parser = new JsonParser();
315
316
@Override
317
public StructuredRecord read(String jsonString) throws UnexpectedFormatException {
318
try {
319
JsonObject jsonObject = parser.parse(jsonString).getAsJsonObject();
320
Schema schema = getSchema();
321
StructuredRecord.Builder builder = StructuredRecord.builder(schema);
322
323
for (Schema.Field field : schema.getFields()) {
324
String fieldName = field.getName();
325
if (jsonObject.has(fieldName)) {
326
Object value = extractJsonValue(jsonObject.get(fieldName), field.getSchema());
327
builder.set(fieldName, value);
328
}
329
}
330
331
return builder.build();
332
} catch (Exception e) {
333
throw new UnexpectedFormatException("Invalid JSON: " + jsonString, e);
334
}
335
}
336
337
private Object extractJsonValue(com.google.gson.JsonElement element, Schema schema) {
338
if (element.isJsonNull()) {
339
return null;
340
}
341
342
Schema.Type type = schema.getType();
343
if (schema.isNullable()) {
344
type = schema.getNonNullable().getType();
345
}
346
347
switch (type) {
348
case STRING:
349
return element.getAsString();
350
case INT:
351
return element.getAsInt();
352
case LONG:
353
return element.getAsLong();
354
case DOUBLE:
355
return element.getAsDouble();
356
case BOOLEAN:
357
return element.getAsBoolean();
358
default:
359
throw new UnsupportedOperationException("JSON format doesn't support type: " + type);
360
}
361
}
362
363
@Override
364
protected Schema getDefaultSchema() {
365
return null; // JSON requires explicit schema
366
}
367
368
@Override
369
protected void validateSchema(Schema schema) throws UnsupportedTypeException {
370
// Validate JSON-compatible schema
371
for (Schema.Field field : schema.getFields()) {
372
validateJsonCompatibleType(field.getSchema());
373
}
374
}
375
376
private void validateJsonCompatibleType(Schema schema) throws UnsupportedTypeException {
377
Schema.Type type = schema.getType();
378
if (schema.isNullable()) {
379
type = schema.getNonNullable().getType();
380
}
381
382
switch (type) {
383
case STRING:
384
case INT:
385
case LONG:
386
case DOUBLE:
387
case BOOLEAN:
388
break; // Supported
389
default:
390
throw new UnsupportedTypeException("JSON format doesn't support type: " + type);
391
}
392
}
393
}
394
```
395
396
**Binary Format Implementation:**
397
398
```java
399
import java.nio.ByteBuffer;
400
401
public class BinaryRecordFormat extends RecordFormat<ByteBuffer, StructuredRecord> {
402
403
@Override
404
public StructuredRecord read(ByteBuffer input) throws UnexpectedFormatException {
405
try {
406
Schema schema = getSchema();
407
StructuredRecord.Builder builder = StructuredRecord.builder(schema);
408
409
for (Schema.Field field : schema.getFields()) {
410
Object value = readFieldValue(input, field.getSchema());
411
builder.set(field.getName(), value);
412
}
413
414
return builder.build();
415
} catch (Exception e) {
416
throw new UnexpectedFormatException("Failed to parse binary data", e);
417
}
418
}
419
420
private Object readFieldValue(ByteBuffer buffer, Schema schema) {
421
Schema.Type type = schema.getType();
422
if (schema.isNullable()) {
423
boolean isNull = buffer.get() == 0;
424
if (isNull) {
425
return null;
426
}
427
type = schema.getNonNullable().getType();
428
}
429
430
switch (type) {
431
case INT:
432
return buffer.getInt();
433
case LONG:
434
return buffer.getLong();
435
case FLOAT:
436
return buffer.getFloat();
437
case DOUBLE:
438
return buffer.getDouble();
439
case BOOLEAN:
440
return buffer.get() != 0;
441
case STRING:
442
int length = buffer.getInt();
443
byte[] stringBytes = new byte[length];
444
buffer.get(stringBytes);
445
return new String(stringBytes);
446
default:
447
throw new UnsupportedOperationException("Binary format doesn't support type: " + type);
448
}
449
}
450
451
@Override
452
protected Schema getDefaultSchema() {
453
return null; // Binary requires explicit schema
454
}
455
456
@Override
457
protected void validateSchema(Schema schema) throws UnsupportedTypeException {
458
// Binary format supports most primitive types
459
for (Schema.Field field : schema.getFields()) {
460
validateBinaryCompatibleType(field.getSchema());
461
}
462
}
463
464
private void validateBinaryCompatibleType(Schema schema) throws UnsupportedTypeException {
465
Schema.Type type = schema.getType();
466
if (schema.isNullable()) {
467
type = schema.getNonNullable().getType();
468
}
469
470
if (!type.isSimpleType()) {
471
throw new UnsupportedTypeException("Binary format only supports simple types");
472
}
473
}
474
}
475
```
476
477
## Format Initialization Process
478
479
### Initialization Flow
480
481
1. **Format Creation**: Instantiate format class
482
2. **Schema Resolution**: Determine schema (from specification or default)
483
3. **Schema Validation**: Validate schema compatibility with format
484
4. **Configuration**: Apply format-specific settings
485
5. **Ready for Use**: Format ready to process data
486
487
```java
488
// Manual initialization process
489
RecordFormat<String, StructuredRecord> format = new CsvRecordFormat();
490
491
// Create specification
492
Schema schema = Schema.recordOf("Data",
493
Schema.Field.of("id", Schema.of(Schema.Type.INT)),
494
Schema.Field.of("name", Schema.of(Schema.Type.STRING))
495
);
496
497
Map<String, String> settings = new HashMap<>();
498
settings.put("delimiter", "|");
499
settings.put("quote", "\"");
500
501
FormatSpecification spec = new FormatSpecification("csv", schema, settings);
502
503
// Initialize (schema validation and configuration occur here)
504
format.initialize(spec);
505
506
// Now ready to use
507
StructuredRecord record = format.read("123|John Doe");
508
```
509
510
### Error Handling
511
512
```java
513
// Initialization: a rejected schema surfaces as UnsupportedTypeException
try {
  format.initialize(specification);
} catch (UnsupportedTypeException e) {
  // The schema failed validation
  System.err.println("Schema not supported: " + e.getMessage());
} catch (Exception e) {
  // Any other problem raised while initializing
  System.err.println("Initialization failed: " + e.getMessage());
}

// Reading: unparseable input surfaces as UnexpectedFormatException
try {
  StructuredRecord record = format.read(inputData);
} catch (UnexpectedFormatException e) {
  // The data could not be parsed
  System.err.println("Failed to parse data: " + e.getMessage());
}
529
```
530
531
## Exception Types
532
533
### Format-Related Exceptions
534
535
```java { .api }
536
/**
537
* Exception indicating data is in unexpected format
538
*/
539
public class UnexpectedFormatException extends RuntimeException {
540
public UnexpectedFormatException(String message);
541
public UnexpectedFormatException(String message, Throwable cause);
542
public UnexpectedFormatException(Throwable cause);
543
}
544
545
/**
546
* Exception indicating unsupported schema type
547
*/
548
public class UnsupportedTypeException extends Exception {
549
public UnsupportedTypeException(String message);
550
public UnsupportedTypeException(String message, Throwable cause);
551
public UnsupportedTypeException(Throwable cause);
552
}
553
```
554
555
## Best Practices
556
557
### Schema Design
558
559
- Design schemas with format limitations in mind
560
- Use simple types for maximum format compatibility
561
- Consider nullable fields for optional data
562
- Validate schemas during format initialization
563
564
### Performance Optimization
565
566
- Reuse format instances when processing multiple records
567
- Cache compiled patterns/parsers in format implementations
568
- Use efficient parsing libraries (e.g., Gson or Jackson for JSON)
569
- Minimize object allocation in read() methods
570
571
### Error Handling
572
573
- Provide detailed error messages in exceptions
574
- Include input data context in error messages
575
- Handle edge cases (null/empty inputs, malformed data)
576
- Validate configuration settings during initialization
577
578
### Custom Format Implementation
579
580
- Extend RecordFormat with appropriate generic type parameters
581
- Implement all abstract methods with proper error handling
582
- Validate schema compatibility in validateSchema()
583
- Use configure() method for format-specific settings
584
- Document supported schema types and configuration options