CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-avro--avro

Apache Avro core components for data serialization with rich data structures, compact binary format, and schema evolution support

Pending
Overview
Eval results
Files

docs/message-operations.md

Message Operations

Avro's message operations provide efficient encoding and decoding of individual objects for messaging systems and data exchange. These operations support both header-based schema identification and raw data formats for different messaging scenarios.

Capabilities

Message Encoder Interface

Core interface for encoding single objects into byte arrays or streams.

/**
 * Core interface for encoding single objects into byte arrays or streams.
 *
 * @param <D> the type of datum this encoder accepts
 */
public interface MessageEncoder<D> {
    /**
     * Encodes a single datum and returns the encoded bytes.
     *
     * NOTE(review): Apache Avro's published Javadoc appears to declare this overload as
     * returning java.nio.ByteBuffer rather than byte[] — confirm against the Avro version in use.
     *
     * @param datum the object to encode
     * @return the encoded representation of {@code datum}
     * @throws IOException declared by the signature
     */
    byte[] encode(D datum) throws IOException;
    /**
     * Encodes a single datum and writes the result to the supplied stream.
     *
     * @param datum  the object to encode
     * @param stream the destination for the encoded bytes
     * @throws IOException declared by the signature
     */
    void encode(D datum, OutputStream stream) throws IOException;
}

Usage Examples:

// Any MessageEncoder implementation works here (concrete ones are shown further down)
MessageEncoder<GenericRecord> encoder = /* create encoder */;

GenericRecord userRecord = createUserRecord();

// Variant 1: get the encoded form back as a byte array
byte[] payload = encoder.encode(userRecord);
System.out.println("Encoded size: " + payload.length + " bytes");

// Variant 2: write the encoded form into an OutputStream and collect it afterwards
ByteArrayOutputStream sink = new ByteArrayOutputStream();
encoder.encode(userRecord, sink);
byte[] payloadFromStream = sink.toByteArray();

// Hand the byte-array payload to the messaging layer
messagingSystem.send("user.topic", payload);

Message Decoder Interface

Core interface for decoding single objects from byte arrays or streams.

/**
 * Core interface for decoding single objects from byte arrays or streams.
 *
 * @param <D> the type of datum this decoder produces
 */
public interface MessageDecoder<D> {
    /** Decodes one datum from the given stream. */
    D decode(InputStream stream) throws IOException;
    /**
     * Decodes one datum from the given stream, offering {@code reuse} as an instance to
     * reuse; the usage examples pass {@code null} on the first call.
     */
    D decode(InputStream stream, D reuse) throws IOException;
    /** Decodes one datum from an encoded byte array. */
    D decode(byte[] encoded) throws IOException;
    /** Decodes one datum from an encoded byte array, offering {@code reuse} as an instance to reuse. */
    D decode(byte[] encoded, D reuse) throws IOException;
    
    // Base decoder class with common functionality: subclasses only need to
    // supply the InputStream-based decode methods.
    public abstract static class BaseDecoder<D> implements MessageDecoder<D> {
        // Default implementations for byte array methods
        public D decode(byte[] encoded) throws IOException;
        public D decode(byte[] encoded, D reuse) throws IOException;
    }
}

Usage Examples:

// Any MessageDecoder implementation works here (concrete ones are shown further down)
MessageDecoder<GenericRecord> decoder = /* create decoder */;

// Single message delivered as a byte array
byte[] payload = receiveFromMessaging();
GenericRecord record = decoder.decode(payload);
System.out.println("Decoded name: " + record.get("name"));

// Batch decoding: feed the previous result back in so the decoder can reuse it
GenericRecord scratch = null;
for (byte[] message : messages) {
    scratch = decoder.decode(message, scratch);
    processRecord(scratch);
}

// The same payload can also be decoded through an InputStream
GenericRecord streamRecord = decoder.decode(new ByteArrayInputStream(payload));

Binary Message Encoder

Encoder for binary single-object messages with schema fingerprint headers.

/**
 * Encoder for binary single-object messages with schema fingerprint headers.
 * Each encoded message carries a magic byte and format version, the schema
 * fingerprint, and then the serialized data.
 *
 * NOTE(review): Avro's published Javadoc appears to expose public constructors
 * (model, schema[, shouldCopy]) rather than static of(...) factories — confirm
 * against the Avro version in use.
 *
 * @param <D> the type of datum this encoder accepts
 */
public class BinaryMessageEncoder<D> implements MessageEncoder<D> {
    public static <D> BinaryMessageEncoder<D> of(GenericData model, Schema schema);
    public static <D> BinaryMessageEncoder<D> of(GenericData model, Schema schema, Map<String, Object> config);
    
    // MessageEncoder implementation
    public byte[] encode(D datum) throws IOException;
    public void encode(D datum, OutputStream stream) throws IOException;
}

Usage Examples:

// Parse the schema, then build a binary message encoder for generic records
Schema.Parser schemaParser = new Schema.Parser();
Schema userSchema = schemaParser.parse(userSchemaJson);
BinaryMessageEncoder<GenericRecord> encoder =
    BinaryMessageEncoder.of(GenericData.get(), userSchema);

// Populate a generic record that conforms to the schema
GenericRecord alice = new GenericData.Record(userSchema);
alice.put("name", "Alice");
alice.put("age", 30);
alice.put("email", "alice@example.com");

// Produce a binary message; the result is laid out as:
//   1. magic byte and format version
//   2. schema fingerprint
//   3. serialized data
byte[] framedMessage = encoder.encode(alice);
System.out.println("Binary message size: " + framedMessage.length);

// Publish the framed message
messageQueue.publish("users", framedMessage);

// The same factory works for generated (specific) record classes
BinaryMessageEncoder<User> specificEncoder =
    BinaryMessageEncoder.of(SpecificData.get(), User.SCHEMA$);

User bob = new User("Bob", 35, "bob@example.com");
byte[] specificMessage = specificEncoder.encode(bob);

Binary Message Decoder

Decoder for binary single-object messages that can resolve schemas using fingerprints.

/**
 * Decoder for binary single-object messages that can resolve schemas using fingerprints.
 *
 * NOTE(review): Avro's published Javadoc appears to expose constructors taking only a
 * reader schema (plus an optional SchemaStore), with writer schemas registered via
 * addSchema(...), rather than these static of(...) factories — confirm against the
 * Avro version in use.
 *
 * @param <D> the type of datum this decoder produces
 */
public class BinaryMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> {
    public static <D> BinaryMessageDecoder<D> of(GenericData model, Schema writerSchema, Schema readerSchema);
    public static <D> BinaryMessageDecoder<D> of(GenericData model, Schema writerSchema, Schema readerSchema, SchemaStore resolver);
    
    // MessageDecoder implementation (the byte[] overloads come from BaseDecoder)
    public D decode(InputStream stream) throws IOException;
    public D decode(InputStream stream, D reuse) throws IOException;
}

Usage Examples:

// Create binary message decoder (writer and reader schema are the same here)
Schema readerSchema = new Schema.Parser().parse(readerSchemaJson);
BinaryMessageDecoder<GenericRecord> decoder = 
    BinaryMessageDecoder.of(GenericData.get(), readerSchema, readerSchema);

// Decode binary message
byte[] binaryMessage = messageQueue.receive("users");
GenericRecord decodedUser = decoder.decode(binaryMessage);
System.out.println("Decoded user: " + decodedUser.get("name"));

// Schema evolution with different reader schema:
// the message was written with writerSchema and is read as evolvedReaderSchema
Schema writerSchema = new Schema.Parser().parse(writerSchemaJson);
Schema evolvedReaderSchema = new Schema.Parser().parse(evolvedSchemaJson);
BinaryMessageDecoder<GenericRecord> evolvingDecoder = 
    BinaryMessageDecoder.of(GenericData.get(), writerSchema, evolvedReaderSchema);

GenericRecord evolvedUser = evolvingDecoder.decode(binaryMessage);

// Use schema store for automatic schema resolution
// (no fixed writer schema is supplied; it is looked up through the store)
SchemaStore schemaStore = createSchemaStore();
BinaryMessageDecoder<GenericRecord> resolverDecoder = 
    BinaryMessageDecoder.of(GenericData.get(), null, readerSchema, schemaStore);

// This decoder can handle messages with different writer schemas
GenericRecord autoResolvedUser = resolverDecoder.decode(binaryMessage);

// Decode multiple messages efficiently by feeding the previous record back in for reuse
List<byte[]> messages = messageQueue.receiveBatch("users", 100);
GenericRecord reusedRecord = null;
for (byte[] message : messages) {
    reusedRecord = decoder.decode(message, reusedRecord);
    processUser(reusedRecord);
}

Raw Message Encoder

Encoder for raw messages without headers - just the serialized data.

/**
 * Encoder for raw messages without headers - just the serialized data.
 *
 * NOTE(review): Avro's published Javadoc appears to expose public constructors
 * rather than a static of(...) factory — confirm against the Avro version in use.
 *
 * @param <D> the type of datum this encoder accepts
 */
public class RawMessageEncoder<D> implements MessageEncoder<D> {
    public static <D> RawMessageEncoder<D> of(GenericData model, Schema schema);
    
    // MessageEncoder implementation
    public byte[] encode(D datum) throws IOException;
    public void encode(D datum, OutputStream stream) throws IOException;
}

Usage Examples:

// Build a header-less (raw) encoder for the user schema
Schema.Parser schemaParser = new Schema.Parser();
Schema userSchema = schemaParser.parse(userSchemaJson);
RawMessageEncoder<GenericRecord> rawEncoder =
    RawMessageEncoder.of(GenericData.get(), userSchema);

GenericRecord sampleUser = createUserRecord();

// The output is only the serialized datum, with no header in front of it
byte[] rawBytes = rawEncoder.encode(sampleUser);
System.out.println("Raw message size: " + rawBytes.length);

// Trade-off: raw messages are smaller, but the schema has to be tracked
// elsewhere (for example in a schema registry).

// High-volume streaming, where per-message overhead matters
for (GenericRecord item : largeDataset) {
    highThroughputStream.write(rawEncoder.encode(item));
}

Raw Message Decoder

Decoder for raw messages that requires external schema knowledge.

/**
 * Decoder for raw messages that requires external schema knowledge.
 *
 * NOTE(review): Avro's published Javadoc appears to expose public constructors
 * rather than a static of(...) factory — confirm against the Avro version in use.
 *
 * @param <D> the type of datum this decoder produces
 */
public class RawMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> {
    public static <D> RawMessageDecoder<D> of(GenericData model, Schema writerSchema, Schema readerSchema);
    
    // MessageDecoder implementation (the byte[] overloads come from BaseDecoder)
    public D decode(InputStream stream) throws IOException;
    public D decode(InputStream stream, D reuse) throws IOException;
}

Usage Examples:

// Schema must be known ahead of time for raw messages
Schema knownSchema = getSchemaFromRegistry("user-v1");
RawMessageDecoder<GenericRecord> rawDecoder = 
    RawMessageDecoder.of(GenericData.get(), knownSchema, knownSchema);

// Decode raw message data
byte[] rawMessage = receiveRawMessage();
GenericRecord decodedUser = rawDecoder.decode(rawMessage);

// Efficient decoding of raw message streams
InputStream rawStream = getRawMessageStream();
List<GenericRecord> users = new ArrayList<>();
GenericRecord reusedRecord = null;

// NOTE: InputStream.available() only estimates how many bytes can be read without
// blocking, so this loop condition is suitable for fully buffered in-memory streams
// only; for sockets or files, use a framing protocol or an explicit end-of-stream check.
while (rawStream.available() > 0) {
    reusedRecord = rawDecoder.decode(rawStream, reusedRecord);
    // Store a deep copy: reusedRecord is handed back to the decoder on the next
    // iteration and may be overwritten. GenericData.deepCopy accepts any
    // GenericRecord, whereas the GenericData.Record copy constructor requires a
    // GenericData.Record argument.
    users.add(GenericData.get().deepCopy(knownSchema, reusedRecord));
}

// Schema evolution with raw messages: written as user-v1, read as user-v2
Schema writerSchema = getSchemaFromRegistry("user-v1");
Schema readerSchema = getSchemaFromRegistry("user-v2");
RawMessageDecoder<GenericRecord> evolvingRawDecoder = 
    RawMessageDecoder.of(GenericData.get(), writerSchema, readerSchema);

GenericRecord evolvedUser = evolvingRawDecoder.decode(rawMessage);

Schema Store Interface

Interface for resolving schemas by fingerprint in message decoding scenarios.

/**
 * Interface for resolving schemas by fingerprint in message decoding scenarios.
 *
 * NOTE(review): Avro's published SchemaStore interface appears to declare only
 * findByFingerprint, with addSchema living on the nested SchemaStore.Cache class —
 * confirm against the Avro version in use.
 */
public interface SchemaStore {
    /**
     * Looks up the schema registered under the given fingerprint.
     * The example implementations return {@code null} when no schema is known.
     */
    Schema findByFingerprint(long fingerprint);
    
    // Optional method for caching; implementations that keep their own
    // fingerprint-to-schema mapping override it.
    default void addSchema(Schema schema) {
        // Default implementation does nothing
    }
}

Usage Examples:

// Implement custom schema store
import org.apache.avro.SchemaNormalization;

/**
 * SchemaStore backed by a concurrent in-memory map keyed by the 64-bit
 * parsing fingerprint of each schema.
 */
public class InMemorySchemaStore implements SchemaStore {
    private final Map<Long, Schema> byFingerprint = new ConcurrentHashMap<>();

    /** Returns the schema stored under {@code fingerprint}, or null when none was added. */
    @Override
    public Schema findByFingerprint(long fingerprint) {
        return byFingerprint.get(fingerprint);
    }

    /** Registers {@code schema} under its 64-bit parsing fingerprint. */
    @Override
    public void addSchema(Schema schema) {
        byFingerprint.put(SchemaNormalization.parsingFingerprint64(schema), schema);
    }

    /** Adds every schema reported by the external registry to this store. */
    public void loadSchemasFromRegistry() {
        List<Schema> registered = schemaRegistry.getAllSchemas();
        registered.forEach(this::addSchema);
    }
}

// Fill a schema store, then hand it to the decoder as its resolver
InMemorySchemaStore store = new InMemorySchemaStore();
store.loadSchemasFromRegistry();

BinaryMessageDecoder<GenericRecord> decoder =
    BinaryMessageDecoder.of(GenericData.get(), null, readerSchema, store);

// No writer schema is passed above: the decoder looks it up in the store
// using the fingerprint carried by each message
GenericRecord record = decoder.decode(receiveMessage()); // Schema resolved automatically

// Database-backed schema store with complete fingerprint handling
/**
 * Database-backed schema store with complete fingerprint handling.
 * Lookups go through a cache and fall back to the repository on a miss.
 */
public class DatabaseSchemaStore implements SchemaStore {
    private final SchemaRepository repository;
    // NOTE(review): the get(key, loader) call below presumes a Caffeine-style
    // Cache API — confirm which cache library is intended.
    private final Cache<Long, Schema> cache;

    /**
     * Creates a store over the given repository and cache. Both fields are final,
     * so they must be assigned here for the class to compile.
     *
     * @param repository persistent storage for schema JSON keyed by fingerprint
     * @param cache      in-memory cache consulted before the repository
     */
    public DatabaseSchemaStore(SchemaRepository repository, Cache<Long, Schema> cache) {
        this.repository = repository;
        this.cache = cache;
    }

    /**
     * Resolves a schema by fingerprint, loading and parsing it from the
     * repository when it is not cached. The loader yields null when the
     * repository has no entry for the fingerprint.
     */
    @Override
    public Schema findByFingerprint(long fingerprint) {
        return cache.get(fingerprint, fp -> {
            String schemaJson = repository.findByFingerprint(fp);
            return schemaJson != null ? new Schema.Parser().parse(schemaJson) : null;
        });
    }

    /** Persists the schema under its fingerprint and primes the cache with it. */
    @Override
    public void addSchema(Schema schema) {
        // Single source of truth for how fingerprints are computed
        long fingerprint = getFingerprint(schema);

        // Store in database
        repository.save(fingerprint, schema.toString());

        // Update cache
        cache.put(fingerprint, schema);

        System.out.println("Stored schema with fingerprint: " + fingerprint);
    }

    /** Utility method returning the 64-bit parsing fingerprint used in message headers. */
    public long getFingerprint(Schema schema) {
        return SchemaNormalization.parsingFingerprint64(schema);
    }
}

Message Exception Handling

Specific exceptions for message encoding/decoding operations.

/**
 * Message-decoding exception for the case where the schema needed for a message
 * is not found (for example, not present in the schema store).
 */
public class MissingSchemaException extends AvroRuntimeException {
    public MissingSchemaException(String message);
    public MissingSchemaException(String message, Throwable cause);
}

/**
 * Message-decoding exception for the case where a message header is
 * corrupted or otherwise invalid.
 */
public class BadHeaderException extends AvroRuntimeException {
    public BadHeaderException(String message);
    public BadHeaderException(String message, Throwable cause);
}

Usage Examples:

// Handle message-specific exceptions
BinaryMessageDecoder<GenericRecord> decoder = createDecoder();

try {
    byte[] suspiciousMessage = receiveMessage();
    GenericRecord record = decoder.decode(suspiciousMessage);
    processRecord(record);
    
} catch (MissingSchemaException e) {
    // Schema not found in schema store
    // (the exception is passed to the logger so the stack trace is not lost)
    logger.error("Schema not available for message: " + e.getMessage(), e);
    // Possibly fetch schema from registry and retry
    
} catch (BadHeaderException e) {
    // Message header is corrupted or invalid
    logger.error("Invalid message header: " + e.getMessage(), e);
    // Skip message or send to dead letter queue
    
} catch (IOException e) {
    // General I/O error during decoding
    logger.error("Failed to decode message: " + e.getMessage(), e);
}

// Robust message processing with error handling
/**
 * Decodes and processes each message, continuing past failures, and logs
 * how many messages succeeded and how many failed.
 *
 * @param messages encoded messages to decode and process
 */
public void processMessages(List<byte[]> messages) {
    int succeeded = 0;
    int failed = 0;

    for (byte[] message : messages) {
        boolean handled = false;
        try {
            processRecord(decoder.decode(message));
            handled = true;
        } catch (MissingSchemaException | BadHeaderException e) {
            // Expected failure modes for a bad message: note it and move on
            logger.warn("Skipping invalid message: " + e.getMessage());
        } catch (Exception e) {
            // Anything else is unexpected, so keep the full stack trace
            logger.error("Unexpected error processing message", e);
        }

        if (handled) {
            succeeded++;
        } else {
            failed++;
        }
    }

    logger.info("Processed {} messages successfully, {} errors", succeeded, failed);
}

Types

/** Core interface for encoding single objects into byte arrays or streams. */
public interface MessageEncoder<D> {
    // Returns the encoded form of the datum
    byte[] encode(D datum) throws IOException;
    // Writes the encoded form of the datum to the stream
    void encode(D datum, OutputStream stream) throws IOException;
}

/** Core interface for decoding single objects from byte arrays or streams. */
public interface MessageDecoder<D> {
    // Stream-based decoding, optionally offering an instance for reuse
    D decode(InputStream stream) throws IOException;
    D decode(InputStream stream, D reuse) throws IOException;
    // Byte-array-based decoding, optionally offering an instance for reuse
    D decode(byte[] encoded) throws IOException;
    D decode(byte[] encoded, D reuse) throws IOException;
    
    /** Base class that concrete decoders extend. */
    public abstract static class BaseDecoder<D> implements MessageDecoder<D> {
        // Common implementation for byte array methods
    }
}

/** Encoder for binary single-object messages. */
public class BinaryMessageEncoder<D> implements MessageEncoder<D> {
    // Binary format with schema fingerprint header
}

/** Decoder for binary single-object messages. */
public class BinaryMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> {
    // Binary format decoder with schema resolution
}

/** Encoder for raw messages that consist of the serialized data only. */
public class RawMessageEncoder<D> implements MessageEncoder<D> {
    // Raw data encoding without headers
}

/** Decoder for raw messages; the schema must be supplied externally. */
public class RawMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> {
    // Raw data decoding with known schema
}

/** Interface for resolving schemas by fingerprint in message decoding scenarios. */
public interface SchemaStore {
    // Looks up the schema registered under the given fingerprint
    Schema findByFingerprint(long fingerprint);

    // Optional hook for caching; a default method must have a body
    default void addSchema(Schema schema) {
        // Default implementation does nothing
    }
}

/** Runtime exception signalling that a required schema could not be found. */
public class MissingSchemaException extends AvroRuntimeException {
    // Schema not found exception
}

/** Runtime exception signalling that a message header is invalid. */
public class BadHeaderException extends AvroRuntimeException {
    // Invalid message header exception
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-avro--avro

docs

code-generation.md

encoding-decoding.md

file-operations.md

generic-data.md

index.md

message-operations.md

reflection-operations.md

schema-evolution.md

schema-system.md

tile.json