Apache Avro core components for data serialization with rich data structures, compact binary format, and schema evolution support
—
Avro's schema evolution support enables managing schema changes over time while maintaining backward and forward compatibility. This system allows data written with one schema version to be read with different but compatible schema versions, supporting long-term data management and system integration.
Tools for validating schema compatibility between reader and writer schemas.
public class SchemaCompatibility {
public static SchemaPairCompatibility checkReaderWriterCompatibility(Schema reader, Schema writer);
public static SchemaCompatibilityResult checkReaderWriterCompatibility(Schema reader, List<Schema> writers);
// Compatibility result types
public static class SchemaPairCompatibility {
public SchemaCompatibilityType getType();
public String getDescription();
public Schema getReader();
public Schema getWriter();
}
public static class SchemaCompatibilityResult {
public SchemaCompatibilityType getCompatibility();
public List<SchemaIncompatibilityDetail> getIncompatibilities();
}
public enum SchemaCompatibilityType {
COMPATIBLE, INCOMPATIBLE
}
}

Usage Examples:
// Check compatibility between two schema versions
String oldSchemaJson = """
{
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"}
]
}
""";
String newSchemaJson = """
{
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
{"name": "email", "type": ["null", "string"], "default": null}
]
}
""";
Schema oldSchema = new Schema.Parser().parse(oldSchemaJson);
Schema newSchema = new Schema.Parser().parse(newSchemaJson);
// Check if new schema can read data written with old schema
SchemaPairCompatibility compatibility =
SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema);
if (compatibility.getType() == SchemaCompatibilityType.COMPATIBLE) {
System.out.println("Schemas are compatible - evolution is safe");
} else {
System.out.println("Incompatible schemas: " + compatibility.getDescription());
}
// Check compatibility with multiple writer schemas
List<Schema> writerSchemas = Arrays.asList(oldSchema, intermediateSchema);
SchemaCompatibilityResult result =
SchemaCompatibility.checkReaderWriterCompatibility(newSchema, writerSchemas);
if (result.getCompatibility() == SchemaCompatibilityType.INCOMPATIBLE) {
for (SchemaIncompatibilityDetail detail : result.getIncompatibilities()) {
System.err.println("Incompatibility: " + detail.getMessage());
}
}

Decoder that automatically handles schema evolution during data reading by resolving differences between writer and reader schemas.
public class ResolvingDecoder extends ValidatingDecoder {
// Created through DecoderFactory
// Automatically handles:
// - Missing fields (applies defaults)
// - Extra fields (ignores)
// - Field reordering
// - Type promotions
// - Union evolution
}

Usage Examples:
// Read data written with old schema using new schema
Schema writerSchema = parseSchema(oldSchemaJson);
Schema readerSchema = parseSchema(newSchemaJson);
// Create resolving decoder
InputStream dataStream = new FileInputStream("old_data.avro");
BinaryDecoder baseDecoder = DecoderFactory.get().binaryDecoder(dataStream, null);
ResolvingDecoder resolvingDecoder = DecoderFactory.get()
.resolvingDecoder(writerSchema, readerSchema, baseDecoder);
// Read with automatic schema resolution
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(readerSchema);
GenericRecord record = reader.read(null, resolvingDecoder);
// Fields present in reader but not writer get default values
System.out.println("Name: " + record.get("name")); // From writer
System.out.println("Age: " + record.get("age")); // From writer
System.out.println("Email: " + record.get("email")); // Default: null
// Process multiple records with evolution
List<GenericRecord> evolvedRecords = new ArrayList<>();
GenericRecord reusedRecord = null;
while (hasMoreData(resolvingDecoder)) {
reusedRecord = reader.read(reusedRecord, resolvingDecoder);
evolvedRecords.add(new GenericData.Record(reusedRecord, true)); // Deep copy
}

Logical types provide semantic meaning to physical types and enable controlled evolution.
public class LogicalTypes {
// Date and time logical types
public static LogicalType date();
public static LogicalType timeMillis();
public static LogicalType timeMicros();
public static LogicalType timestampMillis();
public static LogicalType timestampMicros();
public static LogicalType localTimestampMillis();
public static LogicalType localTimestampMicros();
// Numeric logical types
public static LogicalType decimal(int precision);
public static LogicalType decimal(int precision, int scale);
// String logical types
public static LogicalType uuid();
}
public abstract class LogicalType {
public abstract String getName();
public abstract void validate(Schema schema);
public Schema addToSchema(Schema schema);
}

Usage Examples:
// Evolution using logical types
String oldSchemaWithLogicalTypes = """
{
"type": "record",
"name": "Transaction",
"fields": [
{"name": "id", "type": "string"},
{"name": "amount", "type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}},
{"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}}
]
}
""";
String newSchemaWithLogicalTypes = """
{
"type": "record",
"name": "Transaction",
"fields": [
{"name": "id", "type": {"type": "string", "logicalType": "uuid"}},
{"name": "amount", "type": {"type": "bytes", "logicalType": "decimal", "precision": 12, "scale": 2}},
{"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
{"name": "currency", "type": "string", "default": "USD"}
]
}
""";
// Create schemas with logical types programmatically
Schema decimalSchema = LogicalTypes.decimal(10, 2).addToSchema(Schema.create(Schema.Type.BYTES));
Schema timestampSchema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
Schema uuidSchema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING));
// Build transaction schema
Schema transactionSchema = SchemaBuilder.builder()
.record("Transaction")
.fields()
.name("id").type(uuidSchema).noDefault()
.name("amount").type(decimalSchema).noDefault()
.name("timestamp").type(timestampSchema).noDefault()
.name("currency").type().stringType().withDefault("USD")
.endRecord();
// Logical types enable semantic validation and evolution
LogicalType amountLogicalType = transactionSchema.getField("amount").schema().getLogicalType();
System.out.println("Amount logical type: " + amountLogicalType.getName());

Field aliases enable renaming fields while maintaining compatibility.
// Field aliases are defined in schema JSON
// Example schema with aliases:
String schemaWithAliases = """
{
"type": "record",
"name": "Person",
"fields": [
{"name": "fullName", "type": "string", "aliases": ["name", "full_name"]},
{"name": "emailAddress", "type": ["null", "string"], "aliases": ["email"], "default": null},
{"name": "dateOfBirth", "type": {"type": "int", "logicalType": "date"}, "aliases": ["birth_date", "dob"]}
]
}
""";

Usage Examples:
// Original schema
String originalSchema = """
{
"type": "record",
"name": "Person",
"fields": [
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": null}
]
}
""";
// Evolved schema with field renames and aliases
String evolvedSchema = """
{
"type": "record",
"name": "Person",
"fields": [
{"name": "fullName", "type": "string", "aliases": ["name"]},
{"name": "emailAddress", "type": ["null", "string"], "aliases": ["email"], "default": null},
{"name": "phoneNumber", "type": ["null", "string"], "default": null}
]
}
""";
Schema writerSchema = new Schema.Parser().parse(originalSchema);
Schema readerSchema = new Schema.Parser().parse(evolvedSchema);
// Test compatibility
SchemaPairCompatibility compatibility =
SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema);
System.out.println("Compatibility: " + compatibility.getType());
// Read old data with new schema using aliases
InputStream oldData = new FileInputStream("old_persons.avro");
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(oldData, null);
ResolvingDecoder resolvingDecoder = DecoderFactory.get()
.resolvingDecoder(writerSchema, readerSchema, decoder);
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(readerSchema);
GenericRecord person = reader.read(null, resolvingDecoder);
// Fields are mapped via aliases
System.out.println("Full name: " + person.get("fullName")); // Was "name"
System.out.println("Email: " + person.get("emailAddress")); // Was "email"
System.out.println("Phone: " + person.get("phoneNumber")); // New field, gets default

Handle evolution of union types including adding and removing types.
// Union evolution examples
String originalUnion = """
{
"type": "record",
"name": "Event",
"fields": [
{"name": "data", "type": ["string", "int"]}
]
}
""";
String evolvedUnion = """
{
"type": "record",
"name": "Event",
"fields": [
{"name": "data", "type": ["null", "string", "int", "boolean"], "default": null}
]
}
""";

Usage Examples:
// Handle union evolution
Schema originalUnionSchema = new Schema.Parser().parse(originalUnion);
Schema evolvedUnionSchema = new Schema.Parser().parse(evolvedUnion);
// Check if union evolution is compatible
SchemaPairCompatibility unionCompatibility =
SchemaCompatibility.checkReaderWriterCompatibility(evolvedUnionSchema, originalUnionSchema);
if (unionCompatibility.getType() == SchemaCompatibilityType.COMPATIBLE) {
System.out.println("Union evolution is safe");
}
// Read data with evolved union schema
ResolvingDecoder unionResolver = DecoderFactory.get()
.resolvingDecoder(originalUnionSchema, evolvedUnionSchema, baseDecoder);
GenericDatumReader<GenericRecord> unionReader = new GenericDatumReader<>(evolvedUnionSchema);
GenericRecord eventRecord = unionReader.read(null, unionResolver);
// Handle different union types
Object eventData = eventRecord.get("data");
if (eventData instanceof String) {
System.out.println("String data: " + eventData);
} else if (eventData instanceof Integer) {
System.out.println("Integer data: " + eventData);
} else if (eventData instanceof Boolean) {
System.out.println("Boolean data: " + eventData);
} else if (eventData == null) {
System.out.println("Null data");
}

Manage default values for new fields during schema evolution.
// Schema evolution with various default types
String schemaWithDefaults = """
{
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int", "default": 0},
{"name": "active", "type": "boolean", "default": true},
{"name": "score", "type": "double", "default": 0.0},
{"name": "tags", "type": {"type": "array", "items": "string"}, "default": []},
{"name": "metadata", "type": {"type": "map", "values": "string"}, "default": {}},
{"name": "profile", "type": ["null", "string"], "default": null}
]
}
""";

Usage Examples:
// Evolution with complex default values
String baseSchema = """
{
"type": "record",
"name": "Product",
"fields": [
{"name": "id", "type": "string"},
{"name": "name", "type": "string"}
]
}
""";
String evolvedWithDefaults = """
{
"type": "record",
"name": "Product",
"fields": [
{"name": "id", "type": "string"},
{"name": "name", "type": "string"},
{"name": "price", "type": "double", "default": 0.0},
{"name": "inStock", "type": "boolean", "default": true},
{"name": "categories", "type": {"type": "array", "items": "string"}, "default": []},
{"name": "properties", "type": {"type": "map", "values": "string"}, "default": {}},
{"name": "description", "type": ["null", "string"], "default": null},
{"name": "rating", "type": {"name": "Rating", "type": "record", "fields": [
{"name": "score", "type": "double", "default": 0.0},
{"name": "count", "type": "int", "default": 0}
]}, "default": {"score": 0.0, "count": 0}}
]
}
""";
Schema writerSchema = new Schema.Parser().parse(baseSchema);
Schema readerSchema = new Schema.Parser().parse(evolvedWithDefaults);
// Read old data and get default values for new fields
ResolvingDecoder defaultResolver = DecoderFactory.get()
.resolvingDecoder(writerSchema, readerSchema, baseDecoder);
GenericDatumReader<GenericRecord> defaultReader = new GenericDatumReader<>(readerSchema);
GenericRecord product = defaultReader.read(null, defaultResolver);
// Verify default values are applied
System.out.println("Price: " + product.get("price")); // 0.0
System.out.println("In stock: " + product.get("inStock")); // true
System.out.println("Categories: " + product.get("categories")); // []
System.out.println("Properties: " + product.get("properties")); // {}
System.out.println("Description: " + product.get("description")); // null
GenericRecord rating = (GenericRecord) product.get("rating");
System.out.println("Rating score: " + rating.get("score")); // 0.0
System.out.println("Rating count: " + rating.get("count")); // 0

Avro supports automatic type promotions during schema evolution.
// Supported type promotions:
// int -> long, float, double
// long -> float, double
// float -> double
// string -> bytes
// bytes -> string

Usage Examples:
// Schema with int field
String intSchema = """
{
"type": "record",
"name": "Metric",
"fields": [
{"name": "value", "type": "int"}
]
}
""";
// Evolved schema with promoted type
String promotedSchema = """
{
"type": "record",
"name": "Metric",
"fields": [
{"name": "value", "type": "long"}
]
}
""";
Schema writerIntSchema = new Schema.Parser().parse(intSchema);
Schema readerLongSchema = new Schema.Parser().parse(promotedSchema);
// Check that int -> long promotion is compatible
SchemaPairCompatibility promotionCheck =
SchemaCompatibility.checkReaderWriterCompatibility(readerLongSchema, writerIntSchema);
System.out.println("Int to long promotion: " + promotionCheck.getType());
// Read int data as long
byte[] intData = writeIntRecord(42);
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(intData, null);
ResolvingDecoder promotingDecoder = DecoderFactory.get()
.resolvingDecoder(writerIntSchema, readerLongSchema, decoder);
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(readerLongSchema);
GenericRecord record = reader.read(null, promotingDecoder);
Long promotedValue = (Long) record.get("value");
System.out.println("Promoted value: " + promotedValue); // 42L
// Multiple promotions in same schema
String multiPromotionSchema = """
{
"type": "record",
"name": "Data",
"fields": [
{"name": "intToLong", "type": "long"},
{"name": "intToDouble", "type": "double"},
{"name": "longToDouble", "type": "double"},
{"name": "stringToBytes", "type": "bytes"}
]
}
""";

public class SchemaCompatibility {
public static SchemaPairCompatibility checkReaderWriterCompatibility(Schema reader, Schema writer);
public static SchemaCompatibilityResult checkReaderWriterCompatibility(Schema reader, List<Schema> writers);
public static class SchemaPairCompatibility {
public SchemaCompatibilityType getType();
public String getDescription();
public Schema getReader();
public Schema getWriter();
}
public static class SchemaCompatibilityResult {
public SchemaCompatibilityType getCompatibility();
public List<SchemaIncompatibilityDetail> getIncompatibilities();
}
public enum SchemaCompatibilityType {
COMPATIBLE, INCOMPATIBLE
}
}
public class ResolvingDecoder extends ValidatingDecoder {
// Schema evolution decoder implementation
}
public class LogicalTypes {
// Factory for logical type implementations
}
public abstract class LogicalType {
public abstract String getName();
public abstract void validate(Schema schema);
public Schema addToSchema(Schema schema);
}
// Specific logical type implementations
public static class Decimal extends LogicalType;
public static class Date extends LogicalType;
public static class TimeMillis extends LogicalType;
public static class TimeMicros extends LogicalType;
public static class TimestampMillis extends LogicalType;
public static class TimestampMicros extends LogicalType;
public static class Uuid extends LogicalType;
// Incompatibility detail types reported by compatibility checks
public class SchemaCompatibilityResult {
public static class SchemaIncompatibilityDetail {
public String getMessage();
public SchemaIncompatibilityType getType();
}
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-avro--avro