# Schema Evolution

Avro's schema evolution support enables managing schema changes over time while maintaining backward and forward compatibility. This system allows data written with one schema version to be read with different but compatible schema versions, supporting long-term data management and system integration.

## Capabilities

### Schema Compatibility Checking

Tools for validating schema compatibility between reader and writer schemas.

```java { .api }
public class SchemaCompatibility {
    public static SchemaPairCompatibility checkReaderWriterCompatibility(Schema reader, Schema writer);
    public static SchemaCompatibilityResult checkReaderWriterCompatibility(Schema reader, List<Schema> writers);

    // Compatibility result types
    public static class SchemaPairCompatibility {
        public SchemaCompatibilityType getType();
        public String getDescription();
        public Schema getReader();
        public Schema getWriter();
    }

    public static class SchemaCompatibilityResult {
        public SchemaCompatibilityType getCompatibility();
        public List<SchemaIncompatibilityDetail> getIncompatibilities();
    }

    public enum SchemaCompatibilityType {
        COMPATIBLE, INCOMPATIBLE
    }
}
```

**Usage Examples:**

```java
// Check compatibility between two schema versions
String oldSchemaJson = """
    {
      "type": "record",
      "name": "User",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"}
      ]
    }
    """;

String newSchemaJson = """
    {
      "type": "record",
      "name": "User",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
        {"name": "email", "type": ["null", "string"], "default": null}
      ]
    }
    """;

Schema oldSchema = new Schema.Parser().parse(oldSchemaJson);
Schema newSchema = new Schema.Parser().parse(newSchemaJson);

// Check if new schema can read data written with old schema
SchemaPairCompatibility compatibility =
    SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema);

if (compatibility.getType() == SchemaCompatibilityType.COMPATIBLE) {
    System.out.println("Schemas are compatible - evolution is safe");
} else {
    System.out.println("Incompatible schemas: " + compatibility.getDescription());
}

// Check compatibility with multiple writer schemas
List<Schema> writerSchemas = Arrays.asList(oldSchema, intermediateSchema);
SchemaCompatibilityResult result =
    SchemaCompatibility.checkReaderWriterCompatibility(newSchema, writerSchemas);

if (result.getCompatibility() == SchemaCompatibilityType.INCOMPATIBLE) {
    for (SchemaIncompatibilityDetail detail : result.getIncompatibilities()) {
        System.err.println("Incompatibility: " + detail.getMessage());
    }
}
```

### Resolving Decoder

Decoder that automatically handles schema evolution during data reading by resolving differences between writer and reader schemas.

```java { .api }
public class ResolvingDecoder extends ValidatingDecoder {
    // Created through DecoderFactory
    // Automatically handles:
    // - Missing fields (applies defaults)
    // - Extra fields (ignores)
    // - Field reordering
    // - Type promotions
    // - Union evolution
}
```

**Usage Examples:**

```java
// Read data written with old schema using new schema
Schema writerSchema = parseSchema(oldSchemaJson);
Schema readerSchema = parseSchema(newSchemaJson);

// Create resolving decoder
InputStream dataStream = new FileInputStream("old_data.avro");
BinaryDecoder baseDecoder = DecoderFactory.get().binaryDecoder(dataStream, null);
ResolvingDecoder resolvingDecoder = DecoderFactory.get()
    .resolvingDecoder(writerSchema, readerSchema, baseDecoder);

// Read with automatic schema resolution
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(readerSchema);
GenericRecord record = reader.read(null, resolvingDecoder);

// Fields present in reader but not writer get default values
System.out.println("Name: " + record.get("name"));   // From writer
System.out.println("Age: " + record.get("age"));     // From writer
System.out.println("Email: " + record.get("email")); // Default: null

// Process multiple records with evolution
List<GenericRecord> evolvedRecords = new ArrayList<>();
GenericRecord reusedRecord = null;

while (hasMoreData(resolvingDecoder)) {
    reusedRecord = reader.read(reusedRecord, resolvingDecoder);
    evolvedRecords.add(new GenericData.Record(reusedRecord, true)); // Deep copy
}
```

### Logical Types for Evolution

Logical types provide semantic meaning to physical types and enable controlled evolution.

```java { .api }
public class LogicalTypes {
    // Date and time logical types
    public static LogicalType date();
    public static LogicalType timeMillis();
    public static LogicalType timeMicros();
    public static LogicalType timestampMillis();
    public static LogicalType timestampMicros();
    public static LogicalType localTimestampMillis();
    public static LogicalType localTimestampMicros();

    // Numeric logical types
    public static LogicalType decimal(int precision);
    public static LogicalType decimal(int precision, int scale);

    // String logical types
    public static LogicalType uuid();
}

public abstract class LogicalType {
    public abstract String getName();
    public abstract void validate(Schema schema);
    public Schema addToSchema(Schema schema);
}
```

**Usage Examples:**

```java
// Evolution using logical types
String oldSchemaWithLogicalTypes = """
    {
      "type": "record",
      "name": "Transaction",
      "fields": [
        {"name": "id", "type": "string"},
        {"name": "amount", "type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}},
        {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}}
      ]
    }
    """;

String newSchemaWithLogicalTypes = """
    {
      "type": "record",
      "name": "Transaction",
      "fields": [
        {"name": "id", "type": {"type": "string", "logicalType": "uuid"}},
        {"name": "amount", "type": {"type": "bytes", "logicalType": "decimal", "precision": 12, "scale": 2}},
        {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
        {"name": "currency", "type": "string", "default": "USD"}
      ]
    }
    """;

// Create schemas with logical types programmatically
Schema decimalSchema = LogicalTypes.decimal(10, 2).addToSchema(Schema.create(Schema.Type.BYTES));
Schema timestampSchema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
Schema uuidSchema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING));

// Build transaction schema
Schema transactionSchema = SchemaBuilder.builder()
    .record("Transaction")
    .fields()
    .name("id").type(uuidSchema).noDefault()
    .name("amount").type(decimalSchema).noDefault()
    .name("timestamp").type(timestampSchema).noDefault()
    .name("currency").type().stringType().withDefault("USD")
    .endRecord();

// Logical types enable semantic validation and evolution
LogicalType amountLogicalType = transactionSchema.getField("amount").schema().getLogicalType();
System.out.println("Amount logical type: " + amountLogicalType.getName());
```

### Field Aliases and Evolution

Field aliases enable renaming fields while maintaining compatibility.

```java { .api }
// Field aliases are defined in schema JSON
// Example schema with aliases:
String schemaWithAliases = """
    {
      "type": "record",
      "name": "Person",
      "fields": [
        {"name": "fullName", "type": "string", "aliases": ["name", "full_name"]},
        {"name": "emailAddress", "type": ["null", "string"], "aliases": ["email"], "default": null},
        {"name": "dateOfBirth", "type": {"type": "int", "logicalType": "date"}, "aliases": ["birth_date", "dob"]}
      ]
    }
    """;
```

**Usage Examples:**

```java
// Original schema
String originalSchema = """
    {
      "type": "record",
      "name": "Person",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "email", "type": ["null", "string"], "default": null}
      ]
    }
    """;

// Evolved schema with field renames and aliases
String evolvedSchema = """
    {
      "type": "record",
      "name": "Person",
      "fields": [
        {"name": "fullName", "type": "string", "aliases": ["name"]},
        {"name": "emailAddress", "type": ["null", "string"], "aliases": ["email"], "default": null},
        {"name": "phoneNumber", "type": ["null", "string"], "default": null}
      ]
    }
    """;

Schema writerSchema = new Schema.Parser().parse(originalSchema);
Schema readerSchema = new Schema.Parser().parse(evolvedSchema);

// Test compatibility
SchemaPairCompatibility compatibility =
    SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema);

System.out.println("Compatibility: " + compatibility.getType());

// Read old data with new schema using aliases
InputStream oldData = new FileInputStream("old_persons.avro");
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(oldData, null);
ResolvingDecoder resolvingDecoder = DecoderFactory.get()
    .resolvingDecoder(writerSchema, readerSchema, decoder);

GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(readerSchema);
GenericRecord person = reader.read(null, resolvingDecoder);

// Fields are mapped via aliases
System.out.println("Full name: " + person.get("fullName"));  // Was "name"
System.out.println("Email: " + person.get("emailAddress"));  // Was "email"
System.out.println("Phone: " + person.get("phoneNumber"));   // New field, gets default
```

### Union Evolution

Handle evolution of union types including adding and removing types.

```java { .api }
// Union evolution examples
String originalUnion = """
    {
      "type": "record",
      "name": "Event",
      "fields": [
        {"name": "data", "type": ["string", "int"]}
      ]
    }
    """;

String evolvedUnion = """
    {
      "type": "record",
      "name": "Event",
      "fields": [
        {"name": "data", "type": ["null", "string", "int", "boolean"], "default": null}
      ]
    }
    """;
```

**Usage Examples:**

```java
// Handle union evolution
Schema originalUnionSchema = new Schema.Parser().parse(originalUnion);
Schema evolvedUnionSchema = new Schema.Parser().parse(evolvedUnion);

// Check if union evolution is compatible
SchemaPairCompatibility unionCompatibility =
    SchemaCompatibility.checkReaderWriterCompatibility(evolvedUnionSchema, originalUnionSchema);

if (unionCompatibility.getType() == SchemaCompatibilityType.COMPATIBLE) {
    System.out.println("Union evolution is safe");
}

// Read data with evolved union schema
ResolvingDecoder unionResolver = DecoderFactory.get()
    .resolvingDecoder(originalUnionSchema, evolvedUnionSchema, baseDecoder);

GenericDatumReader<GenericRecord> unionReader = new GenericDatumReader<>(evolvedUnionSchema);
GenericRecord eventRecord = unionReader.read(null, unionResolver);

// Handle different union types
Object eventData = eventRecord.get("data");
if (eventData instanceof String) {
    System.out.println("String data: " + eventData);
} else if (eventData instanceof Integer) {
    System.out.println("Integer data: " + eventData);
} else if (eventData instanceof Boolean) {
    System.out.println("Boolean data: " + eventData);
} else if (eventData == null) {
    System.out.println("Null data");
}
```

### Default Value Handling

Manage default values for new fields during schema evolution.

```java { .api }
// Schema evolution with various default types
String schemaWithDefaults = """
    {
      "type": "record",
      "name": "User",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int", "default": 0},
        {"name": "active", "type": "boolean", "default": true},
        {"name": "score", "type": "double", "default": 0.0},
        {"name": "tags", "type": {"type": "array", "items": "string"}, "default": []},
        {"name": "metadata", "type": {"type": "map", "values": "string"}, "default": {}},
        {"name": "profile", "type": ["null", "string"], "default": null}
      ]
    }
    """;
```

**Usage Examples:**

```java
// Evolution with complex default values
String baseSchema = """
    {
      "type": "record",
      "name": "Product",
      "fields": [
        {"name": "id", "type": "string"},
        {"name": "name", "type": "string"}
      ]
    }
    """;

String evolvedWithDefaults = """
    {
      "type": "record",
      "name": "Product",
      "fields": [
        {"name": "id", "type": "string"},
        {"name": "name", "type": "string"},
        {"name": "price", "type": "double", "default": 0.0},
        {"name": "inStock", "type": "boolean", "default": true},
        {"name": "categories", "type": {"type": "array", "items": "string"}, "default": []},
        {"name": "properties", "type": {"type": "map", "values": "string"}, "default": {}},
        {"name": "description", "type": ["null", "string"], "default": null},
        {"name": "rating", "type": {"name": "Rating", "type": "record", "fields": [
          {"name": "score", "type": "double", "default": 0.0},
          {"name": "count", "type": "int", "default": 0}
        ]}, "default": {"score": 0.0, "count": 0}}
      ]
    }
    """;

Schema writerSchema = new Schema.Parser().parse(baseSchema);
Schema readerSchema = new Schema.Parser().parse(evolvedWithDefaults);

// Read old data and get default values for new fields
ResolvingDecoder defaultResolver = DecoderFactory.get()
    .resolvingDecoder(writerSchema, readerSchema, baseDecoder);

GenericDatumReader<GenericRecord> defaultReader = new GenericDatumReader<>(readerSchema);
GenericRecord product = defaultReader.read(null, defaultResolver);

// Verify default values are applied
System.out.println("Price: " + product.get("price"));             // 0.0
System.out.println("In stock: " + product.get("inStock"));        // true
System.out.println("Categories: " + product.get("categories"));   // []
System.out.println("Properties: " + product.get("properties"));   // {}
System.out.println("Description: " + product.get("description")); // null

GenericRecord rating = (GenericRecord) product.get("rating");
System.out.println("Rating score: " + rating.get("score")); // 0.0
System.out.println("Rating count: " + rating.get("count")); // 0
```

### Type Promotion Rules

Avro supports automatic type promotions during schema evolution.

```java { .api }
// Supported type promotions:
// int -> long, float, double
// long -> float, double
// float -> double
// string -> bytes
// bytes -> string
```

**Usage Examples:**

```java
// Schema with int field
String intSchema = """
    {
      "type": "record",
      "name": "Metric",
      "fields": [
        {"name": "value", "type": "int"}
      ]
    }
    """;

// Evolved schema with promoted type
String promotedSchema = """
    {
      "type": "record",
      "name": "Metric",
      "fields": [
        {"name": "value", "type": "long"}
      ]
    }
    """;

Schema writerIntSchema = new Schema.Parser().parse(intSchema);
Schema readerLongSchema = new Schema.Parser().parse(promotedSchema);

// Check that int -> long promotion is compatible
SchemaPairCompatibility promotionCheck =
    SchemaCompatibility.checkReaderWriterCompatibility(readerLongSchema, writerIntSchema);

System.out.println("Int to long promotion: " + promotionCheck.getType());

// Read int data as long
byte[] intData = writeIntRecord(42);
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(intData, null);
ResolvingDecoder promotingDecoder = DecoderFactory.get()
    .resolvingDecoder(writerIntSchema, readerLongSchema, decoder);

GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(readerLongSchema);
GenericRecord record = reader.read(null, promotingDecoder);

Long promotedValue = (Long) record.get("value");
System.out.println("Promoted value: " + promotedValue); // 42L

// Multiple promotions in same schema
String multiPromotionSchema = """
    {
      "type": "record",
      "name": "Data",
      "fields": [
        {"name": "intToLong", "type": "long"},
        {"name": "intToDouble", "type": "double"},
        {"name": "longToDouble", "type": "double"},
        {"name": "stringToBytes", "type": "bytes"}
      ]
    }
    """;
```

## Types

```java { .api }
public class SchemaCompatibility {
    public static SchemaPairCompatibility checkReaderWriterCompatibility(Schema reader, Schema writer);
    public static SchemaCompatibilityResult checkReaderWriterCompatibility(Schema reader, List<Schema> writers);

    public static class SchemaPairCompatibility {
        public SchemaCompatibilityType getType();
        public String getDescription();
        public Schema getReader();
        public Schema getWriter();
    }

    public static class SchemaCompatibilityResult {
        public SchemaCompatibilityType getCompatibility();
        public List<SchemaIncompatibilityDetail> getIncompatibilities();
    }

    public enum SchemaCompatibilityType {
        COMPATIBLE, INCOMPATIBLE
    }
}

public class ResolvingDecoder extends ValidatingDecoder {
    // Schema evolution decoder implementation
}

public class LogicalTypes {
    // Factory for logical type implementations
}

public abstract class LogicalType {
    public abstract String getName();
    public abstract void validate(Schema schema);
    public Schema addToSchema(Schema schema);
}

// Specific logical type implementations
public static class Decimal extends LogicalType;
public static class Date extends LogicalType;
public static class TimeMillis extends LogicalType;
public static class TimeMicros extends LogicalType;
public static class TimestampMillis extends LogicalType;
public static class TimestampMicros extends LogicalType;
public static class Uuid extends LogicalType;

// Evolution-related exceptions
public class SchemaCompatibilityResult {
    public static class SchemaIncompatibilityDetail {
        public String getMessage();
        public SchemaIncompatibilityType getType();
    }
}
```