Apache Avro core components for data serialization with rich data structures, compact binary format, and schema evolution support
—
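Avro's message operations efficiently encode and decode individual objects (single datums) for messaging systems and data exchange. Two formats are supported, for different messaging scenarios:
- A binary format, whose header identifies the writer schema by fingerprint.
- A raw format, which contains only the serialized datum and relies on the schema being known out of band.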
Core interface for encoding single objects into byte arrays or streams.
/**
 * Encodes a single datum as a self-contained message.
 *
 * @param <D> the datum type, e.g. {@code GenericRecord} or a generated specific class
 */
public interface MessageEncoder<D> {
  /**
   * Encodes {@code datum} into a newly allocated byte array.
   *
   * @param datum the object to encode
   * @return the encoded message bytes
   * @throws IOException if encoding fails
   */
  // NOTE(review): upstream org.apache.avro.message.MessageEncoder declares
  // `ByteBuffer encode(D datum)`; confirm which signature this documentation targets.
  byte[] encode(D datum) throws IOException;

  /**
   * Encodes {@code datum} and writes the resulting message to {@code stream}.
   *
   * @param datum the object to encode
   * @param stream destination for the encoded bytes
   * @throws IOException if encoding or writing fails
   */
  void encode(D datum, OutputStream stream) throws IOException;
}
Usage Examples:
// Obtain any MessageEncoder implementation (concrete encoders are shown below)
MessageEncoder<GenericRecord> encoder = /* create encoder */;
GenericRecord record = createUserRecord();

// Option 1: encode straight into a fresh byte array
byte[] encoded = encoder.encode(record);
int size = encoded.length;
System.out.println("Encoded size: " + size + " bytes");

// Option 2: encode into a caller-supplied OutputStream
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
encoder.encode(record, buffer);
byte[] streamEncoded = buffer.toByteArray();

// Publish the array-encoded message through the messaging system
messagingSystem.send("user.topic", encoded);
Core interface for decoding single objects from byte arrays or streams.
/**
 * Decodes single-object messages into datums.
 *
 * @param <D> the datum type produced by decoding
 */
public interface MessageDecoder<D> {
  /**
   * Decodes one message read from {@code stream}.
   *
   * @throws IOException if reading or decoding fails
   */
  D decode(InputStream stream) throws IOException;

  /**
   * Decodes one message read from {@code stream}.
   *
   * <p>{@code reuse} may be {@code null}. It is offered to the implementation as an
   * instance it may populate instead of allocating a new one. Callers should continue
   * with the returned datum.
   *
   * @throws IOException if reading or decoding fails
   */
  D decode(InputStream stream, D reuse) throws IOException;

  /**
   * Decodes one message held entirely in {@code encoded}.
   *
   * @throws IOException if decoding fails
   */
  D decode(byte[] encoded) throws IOException;

  /**
   * Byte-array variant of {@link #decode(InputStream, Object)}.
   *
   * @throws IOException if decoding fails
   */
  D decode(byte[] encoded, D reuse) throws IOException;

  /**
   * Convenience base class that supplies the byte-array overloads, so subclasses only
   * need to implement the {@link InputStream} overloads.
   */
  public abstract static class BaseDecoder<D> implements MessageDecoder<D> {
    // Default implementations for byte array methods (bodies omitted in this synopsis)
    public D decode(byte[] encoded) throws IOException;
    public D decode(byte[] encoded, D reuse) throws IOException;
  }
}
Usage Examples:
// Obtain any MessageDecoder implementation (concrete decoders are shown below)
MessageDecoder<GenericRecord> decoder = /* create decoder */;

// Decode a single message delivered as a byte array
byte[] encodedData = receiveFromMessaging();
GenericRecord record = decoder.decode(encodedData);
Object name = record.get("name");
System.out.println("Decoded name: " + name);

// Feed each result back in as the reuse instance to cut per-message allocations
GenericRecord current = null;
for (byte[] payload : messages) {
  current = decoder.decode(payload, current);
  processRecord(current);
}

// The same message can be decoded from an InputStream instead of a byte array
GenericRecord streamRecord = decoder.decode(new ByteArrayInputStream(encodedData));
Encoder for binary single-object messages with schema fingerprint headers.
/**
 * {@link MessageEncoder} for Avro binary single-object messages.
 *
 * <p>Each encoded message carries a header that identifies the writer schema by
 * fingerprint, followed by the serialized datum.
 *
 * @param <D> the datum type
 */
public class BinaryMessageEncoder<D> implements MessageEncoder<D> {
  // Synopsis: method bodies are omitted.

  /**
   * Creates an encoder for datums of {@code schema}.
   *
   * @param model the data model for the datum type, e.g. {@code GenericData.get()} or
   *     {@code SpecificData.get()}
   * @param schema the writer schema; its fingerprint is written into each message header
   */
  public static <D> BinaryMessageEncoder<D> of(GenericData model, Schema schema);

  /**
   * Creates an encoder for datums of {@code schema} with additional configuration.
   */
  // NOTE(review): the supported config keys are not documented here — confirm.
  public static <D> BinaryMessageEncoder<D> of(GenericData model, Schema schema, Map<String, Object> config);

  // MessageEncoder implementation
  public byte[] encode(D datum) throws IOException;
  public void encode(D datum, OutputStream stream) throws IOException;
}
Usage Examples:
// Create binary message encoder for generic records
Schema userSchema = new Schema.Parser().parse(userSchemaJson);
BinaryMessageEncoder<GenericRecord> encoder =
    BinaryMessageEncoder.of(GenericData.get(), userSchema);
// Create user record
GenericRecord user = new GenericData.Record(userSchema);
user.put("name", "Alice");
user.put("age", 30);
user.put("email", "alice@example.com");
// Encode to binary message with header
byte[] binaryMessage = encoder.encode(user);
System.out.println("Binary message size: " + binaryMessage.length);
// Per the Avro single-object encoding spec, the encoded message consists of:
// - the 2-byte marker 0xC3 0x01 (Avro single-object encoding, version 1)
// - the 8-byte little-endian CRC-64-AVRO fingerprint of the writer schema
// - the datum itself in Avro binary encoding
// That is 10 bytes of header in front of the serialized data.
// Send via message queue
messageQueue.publish("users", binaryMessage);
// Create encoder for specific (generated) records
BinaryMessageEncoder<User> specificEncoder =
    BinaryMessageEncoder.of(SpecificData.get(), User.SCHEMA$);
User specificUser = new User("Bob", 35, "bob@example.com");
byte[] specificMessage = specificEncoder.encode(specificUser);
Decoder for binary single-object messages that can resolve schemas using fingerprints.
/**
 * {@link MessageDecoder} for Avro binary single-object messages.
 *
 * <p>It reads the schema fingerprint from each message header to identify the writer
 * schema.
 *
 * @param <D> the datum type produced by decoding
 */
public class BinaryMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> {
  // Synopsis: method bodies are omitted.

  /**
   * Creates a decoder that resolves data written with {@code writerSchema} into
   * {@code readerSchema}. Pass the same schema twice when no evolution is involved.
   */
  // NOTE(review): upstream org.apache.avro.message.BinaryMessageDecoder takes only the
  // reader schema (plus an optional SchemaStore) and identifies the writer schema from the
  // header fingerprint; confirm the writerSchema parameter against the targeted version.
  public static <D> BinaryMessageDecoder<D> of(GenericData model, Schema writerSchema, Schema readerSchema);

  /**
   * As above, but also consults {@code resolver} to look up writer schemas by the
   * fingerprint in each message header. The examples pass {@code null} for
   * {@code writerSchema} when relying on the resolver.
   */
  public static <D> BinaryMessageDecoder<D> of(GenericData model, Schema writerSchema, Schema readerSchema, SchemaStore resolver);

  // MessageDecoder implementation
  public D decode(InputStream stream) throws IOException;
  public D decode(InputStream stream, D reuse) throws IOException;
}
Usage Examples:
// Create binary message decoder
Schema readerSchema = new Schema.Parser().parse(readerSchemaJson);
BinaryMessageDecoder<GenericRecord> decoder =
    BinaryMessageDecoder.of(GenericData.get(), readerSchema, readerSchema);
// Decode binary message
byte[] binaryMessage = messageQueue.receive("users");
GenericRecord decodedUser = decoder.decode(binaryMessage);
System.out.println("Decoded user: " + decodedUser.get("name"));
// Schema evolution: decode with a reader schema that differs from the writer schema
Schema writerSchema = new Schema.Parser().parse(writerSchemaJson);
Schema evolvedReaderSchema = new Schema.Parser().parse(evolvedSchemaJson);
BinaryMessageDecoder<GenericRecord> evolvingDecoder =
    BinaryMessageDecoder.of(GenericData.get(), writerSchema, evolvedReaderSchema);
GenericRecord evolvedUser = evolvingDecoder.decode(binaryMessage);
// Use a schema store for automatic schema resolution.
// Writer schemas are looked up by the fingerprint in each message header,
// so no writer schema is passed here (null).
SchemaStore schemaStore = createSchemaStore();
BinaryMessageDecoder<GenericRecord> resolverDecoder =
    BinaryMessageDecoder.of(GenericData.get(), null, readerSchema, schemaStore);
// Handles messages written with different writer schemas, provided the store can
// resolve each message's fingerprint
GenericRecord autoResolvedUser = resolverDecoder.decode(binaryMessage);
// Decode multiple messages efficiently by reusing the previously decoded record
List<byte[]> messages = messageQueue.receiveBatch("users", 100);
GenericRecord reusedRecord = null;
for (byte[] message : messages) {
  reusedRecord = decoder.decode(message, reusedRecord);
  processUser(reusedRecord);
}
Encoder for raw messages without headers - just the serialized data.
/**
 * {@link MessageEncoder} that writes only the serialized datum, with no header.
 *
 * <p>Readers must learn the writer schema some other way, e.g. from a schema registry.
 *
 * @param <D> the datum type
 */
public class RawMessageEncoder<D> implements MessageEncoder<D> {
  // Synopsis: method bodies are omitted.

  /**
   * Creates an encoder for datums of {@code schema}.
   *
   * @param model the data model for the datum type, e.g. {@code GenericData.get()}
   * @param schema the schema used to serialize each datum
   */
  public static <D> RawMessageEncoder<D> of(GenericData model, Schema schema);

  // MessageEncoder implementation
  public byte[] encode(D datum) throws IOException;
  public void encode(D datum, OutputStream stream) throws IOException;
}
Usage Examples:
// Create raw message encoder (no headers)
Schema userSchema = new Schema.Parser().parse(userSchemaJson);
RawMessageEncoder<GenericRecord> rawEncoder =
    RawMessageEncoder.of(GenericData.get(), userSchema);
GenericRecord user = createUserRecord();
// Encode without any headers - just the raw data
byte[] rawMessage = rawEncoder.encode(user);
System.out.println("Raw message size: " + rawMessage.length);
// Raw messages omit the schema-identifying header, so they are smaller, but the
// reader must obtain the writer schema separately (e.g. from a schema registry).
// In streaming scenarios where every byte counts, encode each record directly into the
// output stream instead of allocating an intermediate byte[] per record.
// Records are written back to back with no framing, so a reader must decode them
// sequentially with the same writer schema.
// (assumes highThroughputStream is a java.io.OutputStream)
for (GenericRecord record : largeDataset) {
  rawEncoder.encode(record, highThroughputStream);
}
Decoder for raw messages that requires external schema knowledge.
/**
 * {@link MessageDecoder} for headerless raw messages.
 *
 * <p>Raw messages carry no schema identifier, so the writer schema must be supplied
 * up front.
 *
 * @param <D> the datum type produced by decoding
 */
public class RawMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> {
  // Synopsis: method bodies are omitted.

  /**
   * Creates a decoder that reads data written with {@code writerSchema} and resolves it
   * into {@code readerSchema}. Pass the same schema twice when they match.
   */
  public static <D> RawMessageDecoder<D> of(GenericData model, Schema writerSchema, Schema readerSchema);

  // MessageDecoder implementation
  public D decode(InputStream stream) throws IOException;
  public D decode(InputStream stream, D reuse) throws IOException;
}
Usage Examples:
// Schema must be known ahead of time for raw messages
Schema knownSchema = getSchemaFromRegistry("user-v1");
RawMessageDecoder<GenericRecord> rawDecoder =
    RawMessageDecoder.of(GenericData.get(), knownSchema, knownSchema);
// Decode raw message data
byte[] rawMessage = receiveRawMessage();
GenericRecord decodedUser = rawDecoder.decode(rawMessage);
// Efficient decoding of a stream of back-to-back raw messages.
// InputStream.available() is only an estimate of the bytes readable without blocking
// and may return 0 before the stream has ended. Peek one byte instead to detect the
// real end of the stream.
// NOTE(review): decoding several records from one stream relies on the decoder consuming
// exactly one record's bytes per call (no read-ahead buffering) — confirm.
java.io.PushbackInputStream rawStream =
    new java.io.PushbackInputStream(getRawMessageStream());
List<GenericRecord> users = new ArrayList<>();
GenericRecord reusedRecord = null;
int next;
while ((next = rawStream.read()) != -1) {
  rawStream.unread(next);
  reusedRecord = rawDecoder.decode(rawStream, reusedRecord);
  // The next iteration may refill reusedRecord, so store an independent deep copy
  users.add(GenericData.get().deepCopy(reusedRecord.getSchema(), reusedRecord));
}
// Schema evolution with raw messages
Schema writerSchema = getSchemaFromRegistry("user-v1");
Schema readerSchema = getSchemaFromRegistry("user-v2");
RawMessageDecoder<GenericRecord> evolvingRawDecoder =
    RawMessageDecoder.of(GenericData.get(), writerSchema, readerSchema);
GenericRecord evolvedUser = evolvingRawDecoder.decode(rawMessage);
Interface for resolving schemas by fingerprint in message decoding scenarios.
/**
 * Looks up schemas by 64-bit fingerprint, so a decoder can resolve the writer schema
 * named in a message header.
 */
public interface SchemaStore {
  /**
   * Finds the schema registered under {@code fingerprint}.
   *
   * @param fingerprint the 64-bit schema fingerprint taken from a message header
   * @return the matching schema; the implementations shown in this document return
   *     {@code null} when the fingerprint is unknown
   */
  Schema findByFingerprint(long fingerprint);

  /**
   * Registers {@code schema} with the store. This is optional, mainly for caching; the
   * default implementation does nothing.
   */
  // NOTE(review): upstream org.apache.avro.message.SchemaStore declares only
  // findByFingerprint; addSchema lives on SchemaStore.Cache — confirm for the targeted version.
  default void addSchema(Schema schema) {
    // Default implementation does nothing
  }
}
Usage Examples:
// Implementing a custom schema store
import org.apache.avro.SchemaNormalization;

/** SchemaStore backed by a concurrent in-memory map keyed by 64-bit parsing fingerprint. */
public class InMemorySchemaStore implements SchemaStore {
  private final Map<Long, Schema> schemas = new ConcurrentHashMap<>();

  @Override
  public Schema findByFingerprint(long fingerprint) {
    // Yields null when nothing has been registered under this fingerprint
    return schemas.get(fingerprint);
  }

  @Override
  public void addSchema(Schema schema) {
    schemas.put(SchemaNormalization.parsingFingerprint64(schema), schema);
  }

  /** Registers every schema returned by the external registry. */
  public void loadSchemasFromRegistry() {
    schemaRegistry.getAllSchemas().forEach(this::addSchema);
  }
}

// Wire the store into a decoder so writer schemas are looked up by fingerprint
InMemorySchemaStore schemaStore = new InMemorySchemaStore();
schemaStore.loadSchemasFromRegistry();
BinaryMessageDecoder<GenericRecord> decoder =
    BinaryMessageDecoder.of(GenericData.get(), null, readerSchema, schemaStore);
byte[] message = receiveMessage();
// The writer schema is resolved from the message header through the store
GenericRecord record = decoder.decode(message);
// Database-backed schema store with complete fingerprint handling
public class DatabaseSchemaStore implements SchemaStore {
  // Durable storage: schema JSON keyed by fingerprint
  private final SchemaRepository repository;
  // Parsed schemas keyed by fingerprint, so repeat lookups skip the database and JSON parsing.
  // NOTE(review): assumes a Caffeine-style Cache whose get(key, mappingFunction) computes and
  // stores missing entries, and returns null without caching when the function returns
  // null — confirm against the cache library in use.
  private final Cache<Long, Schema> cache;

  /**
   * Creates a store backed by {@code repository} and fronted by {@code cache}.
   *
   * @param repository persistent storage for schema JSON keyed by fingerprint
   * @param cache cache of parsed schemas keyed by fingerprint
   * @throws NullPointerException if either argument is null
   */
  public DatabaseSchemaStore(SchemaRepository repository, Cache<Long, Schema> cache) {
    this.repository = java.util.Objects.requireNonNull(repository, "repository");
    this.cache = java.util.Objects.requireNonNull(cache, "cache");
  }

  @Override
  public Schema findByFingerprint(long fingerprint) {
    return cache.get(fingerprint, fp -> {
      String schemaJson = repository.findByFingerprint(fp);
      // Unknown fingerprint: report "not found" as null
      return schemaJson != null ? new Schema.Parser().parse(schemaJson) : null;
    });
  }

  @Override
  public void addSchema(Schema schema) {
    // Calculate 64-bit parsing fingerprint for message headers
    long fingerprint = getFingerprint(schema);
    // Store in database
    repository.save(fingerprint, schema.toString());
    // Update cache
    cache.put(fingerprint, schema);
    System.out.println("Stored schema with fingerprint: " + fingerprint);
  }

  /** Returns the 64-bit parsing fingerprint of {@code schema}, as used by this store. */
  public long getFingerprint(Schema schema) {
    return SchemaNormalization.parsingFingerprint64(schema);
  }
}
Specific exceptions for message encoding/decoding operations.
/**
 * Thrown when a message's writer schema cannot be found, e.g. when the schema store has
 * no entry for the fingerprint in the header.
 */
public class MissingSchemaException extends AvroRuntimeException {
  public MissingSchemaException(String message);
  public MissingSchemaException(String message, Throwable cause);
}

/** Thrown when a message header is corrupted or otherwise invalid. */
public class BadHeaderException extends AvroRuntimeException {
  public BadHeaderException(String message);
  public BadHeaderException(String message, Throwable cause);
}
Usage Examples:
// Handle message-specific exceptions
BinaryMessageDecoder<GenericRecord> decoder = createDecoder();
try {
  byte[] suspiciousMessage = receiveMessage();
  GenericRecord record = decoder.decode(suspiciousMessage);
  processRecord(record);
} catch (MissingSchemaException e) {
  // Schema not found in schema store
  logger.error("Schema not available for message: {}", e.getMessage());
  // Possibly fetch schema from registry and retry
} catch (BadHeaderException e) {
  // Message header is corrupted or invalid
  logger.error("Invalid message header: {}", e.getMessage());
  // Skip message or send to dead letter queue
} catch (IOException e) {
  // General I/O error during decoding. Pass the exception itself so the log keeps the
  // stack trace and cause chain, not just the message text.
  logger.error("Failed to decode message", e);
}
// Robust message processing with error handling
public void processMessages(List<byte[]> messages) {
  int successCount = 0;
  int errorCount = 0;
  for (byte[] message : messages) {
    try {
      GenericRecord record = decoder.decode(message);
      processRecord(record);
      successCount++;
    } catch (MissingSchemaException | BadHeaderException e) {
      // Malformed or unknown-schema input: log it and continue with the next message
      logger.warn("Skipping invalid message: {}", e.getMessage());
      errorCount++;
    } catch (Exception e) {
      // Anything else (including IOException from decode) is unexpected; keep the stack trace
      logger.error("Unexpected error processing message", e);
      errorCount++;
    }
  }
  logger.info("Processed {} messages successfully, {} errors", successCount, errorCount);
}
// API summary (implementation bodies omitted)
public interface MessageEncoder<D> {
  byte[] encode(D datum) throws IOException;
  void encode(D datum, OutputStream stream) throws IOException;
}
public interface MessageDecoder<D> {
  D decode(InputStream stream) throws IOException;
  D decode(InputStream stream, D reuse) throws IOException;
  D decode(byte[] encoded) throws IOException;
  D decode(byte[] encoded, D reuse) throws IOException;
  public abstract static class BaseDecoder<D> implements MessageDecoder<D> {
    // Common implementation for byte array methods
  }
}
public class BinaryMessageEncoder<D> implements MessageEncoder<D> {
  // Binary format with schema fingerprint header
}
public class BinaryMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> {
  // Binary format decoder with schema resolution
}
public class RawMessageEncoder<D> implements MessageEncoder<D> {
  // Raw data encoding without headers
}
public class RawMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> {
  // Raw data decoding with known schema
}
public interface SchemaStore {
  Schema findByFingerprint(long fingerprint);
  // A default method must have a body; the default registration is a no-op
  default void addSchema(Schema schema) {}
}
public class MissingSchemaException extends AvroRuntimeException {
  // Schema not found exception
}
public class BadHeaderException extends AvroRuntimeException {
  // Invalid message header exception
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-avro--avro