tessl/maven-io-ray--streaming-state

State management library for Ray Streaming framework that provides transactional state storage with checkpoint and rollback capabilities for streaming data processing applications


Serialization Framework

Pluggable serialization system with default FST-based implementations. Custom serializers can be supplied for both key-value stores and key-map stores, for example to use a more compact encoding or a cross-language format such as JSON or Protobuf.

Capabilities

Core Serialization Utility

Serialization utility built on the FST (fast-serialization) library for converting objects to byte arrays and back.

/**
 * FST serialization wrapper for efficient object serialization
 */
public class Serializer {
    /**
     * Serialize object to byte array using FST
     * @param value Object to serialize
     * @return Serialized byte array
     */
    public static byte[] object2Bytes(Object value);
    
    /**
     * Deserialize byte array to object using FST
     * @param buffer Byte array to deserialize
     * @return Deserialized object
     */
    public static Object bytes2Object(byte[] buffer);
}

Usage Examples:

import io.ray.streaming.state.serialization.Serializer;
import java.util.Arrays;
import java.util.List;

// Serialize different types of objects
String text = "Hello, Ray Streaming!";
byte[] textBytes = Serializer.object2Bytes(text);

Integer number = 42;
byte[] numberBytes = Serializer.object2Bytes(number);

List<String> list = Arrays.asList("item1", "item2", "item3");
byte[] listBytes = Serializer.object2Bytes(list);

// Deserialize objects back
String deserializedText = (String) Serializer.bytes2Object(textBytes);
Integer deserializedNumber = (Integer) Serializer.bytes2Object(numberBytes);
List<String> deserializedList = (List<String>) Serializer.bytes2Object(listBytes);

// Custom objects
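// FST's default configuration expects custom classes to implement Serializable
class UserProfile implements java.io.Serializable {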
    public String name;
    public int age;
    public List<String> preferences;
}

UserProfile profile = new UserProfile();
profile.name = "Alice";
profile.age = 30;
profile.preferences = Arrays.asList("tech", "music");

byte[] profileBytes = Serializer.object2Bytes(profile);
UserProfile deserializedProfile = (UserProfile) Serializer.bytes2Object(profileBytes);

Key-Value Store Serialization Interface

Interface defining serialization methods for key-value stores, with separate methods for keys and values. Keys are only serialized; the interface declares no method for deserializing a key.

/**
 * Interface for key-value store serialization/deserialization
 */
public interface KeyValueStoreSerialization<K, V> {
    /**
     * Serialize key to byte array
     * @param key Key object to serialize
     * @return Serialized key bytes
     */
    byte[] serializeKey(K key);
    
    /**
     * Serialize value to byte array
     * @param value Value object to serialize
     * @return Serialized value bytes
     */
    byte[] serializeValue(V value);
    
    /**
     * Deserialize byte array to value object
     * @param valueArray Serialized value bytes
     * @return Deserialized value object
     */
    V deserializeValue(byte[] valueArray);
}
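As an illustration of the contract above, here is a minimal sketch of an implementation for `String` keys and `Long` values. The interface is redeclared locally so the sketch compiles on its own, and `StringLongSerialization` is a hypothetical name, not a class shipped with the library.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Local copy of the interface shape documented above, so the sketch is self-contained.
interface KeyValueStoreSerialization<K, V> {
    byte[] serializeKey(K key);
    byte[] serializeValue(V value);
    V deserializeValue(byte[] valueArray);
}

// Hypothetical serializer: UTF-8 keys and fixed-width 8-byte big-endian values.
final class StringLongSerialization implements KeyValueStoreSerialization<String, Long> {
    @Override
    public byte[] serializeKey(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(Long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    @Override
    public Long deserializeValue(byte[] valueArray) {
        // Reject anything that is not exactly one encoded long
        if (valueArray == null || valueArray.length != Long.BYTES) {
            throw new IllegalArgumentException("Expected " + Long.BYTES + " bytes");
        }
        return ByteBuffer.wrap(valueArray).getLong();
    }
}

final class StringLongSerializationDemo {
    public static void main(String[] args) {
        StringLongSerialization serialization = new StringLongSerialization();
        byte[] encoded = serialization.serializeValue(42L);
        System.out.println(encoded.length);                          // prints 8
        System.out.println(serialization.deserializeValue(encoded)); // prints 42
    }
}
```

A fixed-width encoding like this trades generality for predictable size; whether it is preferable to the FST default depends on the value types a store holds.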

Key-Map Store Serialization Interface

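Interface defining serialization methods for key-map stores, where each primary key (K) maps to a collection of sub-key (S) and sub-value (T) entries. The primary key is only serialized, while sub-keys and sub-values can be both serialized and deserialized.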

/**
 * Interface for key-map store serialization/deserialization
 */
public interface KeyMapStoreSerializer<K, S, T> {
    /**
     * Serialize primary key to byte array
     * @param key Primary key object to serialize
     * @return Serialized key bytes
     */
    byte[] serializeKey(K key);
    
    /**
     * Serialize sub-key to byte array
     * @param uk Sub-key object to serialize
     * @return Serialized sub-key bytes
     */
    byte[] serializeUKey(S uk);
    
    /**
     * Serialize sub-value to byte array
     * @param uv Sub-value object to serialize
     * @return Serialized sub-value bytes
     */
    byte[] serializeUValue(T uv);
    
    /**
     * Deserialize byte array to sub-key object
     * @param ukArray Serialized sub-key bytes
     * @return Deserialized sub-key object
     */
    S deserializeUKey(byte[] ukArray);
    
    /**
     * Deserialize byte array to sub-value object
     * @param uvArray Serialized sub-value bytes
     * @return Deserialized sub-value object
     */
    T deserializeUValue(byte[] uvArray);
}
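The sketch below shows how the sub-key and sub-value methods pair up when the entries of one sub-map are encoded and decoded again. The interface is redeclared locally, and `StringIntMapSerializer` and `KeyMapRoundTrip` are hypothetical names used only for illustration; how the real store lays out these bytes is not shown here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Local copy of the interface shape documented above, so the sketch is self-contained.
interface KeyMapStoreSerializer<K, S, T> {
    byte[] serializeKey(K key);
    byte[] serializeUKey(S uk);
    byte[] serializeUValue(T uv);
    S deserializeUKey(byte[] ukArray);
    T deserializeUValue(byte[] uvArray);
}

// Hypothetical serializer: UTF-8 strings for keys and sub-keys, 4-byte ints for sub-values.
final class StringIntMapSerializer implements KeyMapStoreSerializer<String, String, Integer> {
    @Override
    public byte[] serializeKey(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeUKey(String uk) {
        return uk.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeUValue(Integer uv) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(uv).array();
    }

    @Override
    public String deserializeUKey(byte[] ukArray) {
        return new String(ukArray, StandardCharsets.UTF_8);
    }

    @Override
    public Integer deserializeUValue(byte[] uvArray) {
        return ByteBuffer.wrap(uvArray).getInt();
    }
}

final class KeyMapRoundTrip {
    // Encode every entry of a sub-map to bytes, then decode it back into a new map.
    static <K, S, T> Map<S, T> roundTrip(KeyMapStoreSerializer<K, S, T> serializer, Map<S, T> subMap) {
        Map<S, T> decoded = new LinkedHashMap<>();
        for (Map.Entry<S, T> entry : subMap.entrySet()) {
            byte[] uk = serializer.serializeUKey(entry.getKey());
            byte[] uv = serializer.serializeUValue(entry.getValue());
            decoded.put(serializer.deserializeUKey(uk), serializer.deserializeUValue(uv));
        }
        return decoded;
    }

    public static void main(String[] args) {
        Map<String, Integer> counters = new LinkedHashMap<>();
        counters.put("clicks", 3);
        counters.put("views", 10);
        System.out.println(roundTrip(new StringIntMapSerializer(), counters));
    }
}
```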

Default Serialization Implementations

The framework provides default implementations for common serialization scenarios.

Abstract Serialization Base Class

/**
 * Base class for serialization implementations
 */
public abstract class AbstractSerialization {
    // Common serialization functionality and utilities
}

Default Key-Value Store Serialization

/**
 * Default implementation using FST serialization with key prefixing
 */
public class DefaultKeyValueStoreSerialization<K, V> extends AbstractSerialization implements KeyValueStoreSerialization<K, V> {
    /**
     * Serialize key with namespace prefix
     * @param key Key to serialize
     * @return Serialized key bytes with prefix
     */
    public byte[] serializeKey(K key);
    
    /**
     * Serialize value using FST
     * @param value Value to serialize
     * @return Serialized value bytes
     */
    public byte[] serializeValue(V value);
    
    /**
     * Deserialize value using FST
     * @param valueArray Serialized value bytes
     * @return Deserialized value object
     */
    public V deserializeValue(byte[] valueArray);
}
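The exact prefix format used by `DefaultKeyValueStoreSerialization` is not described here. The following sketch only illustrates the general idea of namespace prefixing, assuming a hypothetical layout of `<namespace>/` followed by the key's UTF-8 bytes; `PrefixedKeys` and both of its methods are made up for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class PrefixedKeys {
    // Assumed layout (not the library's): "<namespace>/" + UTF-8 bytes of the key
    static byte[] prefixedKey(String namespace, String key) {
        byte[] prefix = (namespace + "/").getBytes(StandardCharsets.UTF_8);
        byte[] body = key.getBytes(StandardCharsets.UTF_8);
        byte[] out = Arrays.copyOf(prefix, prefix.length + body.length);
        System.arraycopy(body, 0, out, prefix.length, body.length);
        return out;
    }

    // True when the stored key starts with the namespace prefix
    static boolean belongsTo(String namespace, byte[] storedKey) {
        byte[] prefix = (namespace + "/").getBytes(StandardCharsets.UTF_8);
        if (storedKey.length < prefix.length) {
            return false;
        }
        return Arrays.equals(prefix, Arrays.copyOf(storedKey, prefix.length));
    }

    public static void main(String[] args) {
        byte[] stored = prefixedKey("user-profiles", "user123");
        System.out.println(new String(stored, StandardCharsets.UTF_8)); // prints user-profiles/user123
        System.out.println(belongsTo("user-profiles", stored));         // prints true
        System.out.println(belongsTo("sessions", stored));              // prints false
    }
}
```

Prefixing lets several logical stores share one physical key space. In this simplified layout a namespace must not itself contain the `/` separator, otherwise one namespace's prefix could match keys that belong to another.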

Default Key-Map Store Serialization

/**
 * Default implementation for key-map store serialization
 */
public class DefaultKeyMapStoreSerializer<K, S, T> extends AbstractSerialization implements KeyMapStoreSerializer<K, S, T> {
    /**
     * Serialize primary key
     * @param key Primary key to serialize
     * @return Serialized key bytes
     */
    public byte[] serializeKey(K key);
    
    /**
     * Serialize sub-key
     * @param uk Sub-key to serialize
     * @return Serialized sub-key bytes
     */
    public byte[] serializeUKey(S uk);
    
    /**
     * Serialize sub-value
     * @param uv Sub-value to serialize
     * @return Serialized sub-value bytes
     */
    public byte[] serializeUValue(T uv);
    
    /**
     * Deserialize sub-key
     * @param ukArray Serialized sub-key bytes
     * @return Deserialized sub-key object
     */
    public S deserializeUKey(byte[] ukArray);
    
    /**
     * Deserialize sub-value
     * @param uvArray Serialized sub-value bytes
     * @return Deserialized sub-value object
     */
    public T deserializeUValue(byte[] uvArray);
}

Custom Serialization Implementation Examples:

// Example 1: JSON-based serialization for key-value stores
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonKeyValueSerialization<K, V> implements KeyValueStoreSerialization<K, V> {
    private final ObjectMapper objectMapper = new ObjectMapper();
    // Kept for completeness; unused because the interface has no key deserialization method
    private final Class<K> keyClass;
    private final Class<V> valueClass;
    
    public JsonKeyValueSerialization(Class<K> keyClass, Class<V> valueClass) {
        this.keyClass = keyClass;
        this.valueClass = valueClass;
    }
    
    @Override
    public byte[] serializeKey(K key) {
        try {
            return objectMapper.writeValueAsBytes(key);
        } catch (Exception e) {
            throw new RuntimeException("Key serialization failed", e);
        }
    }
    
    @Override
    public byte[] serializeValue(V value) {
        try {
            return objectMapper.writeValueAsBytes(value);
        } catch (Exception e) {
            throw new RuntimeException("Value serialization failed", e);
        }
    }
    
    @Override
    public V deserializeValue(byte[] valueArray) {
        try {
            return objectMapper.readValue(valueArray, valueClass);
        } catch (Exception e) {
            throw new RuntimeException("Value deserialization failed", e);
        }
    }
}

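// Example 2: Protobuf-based serialization for key-map stores
// UserData and UserDataProto are placeholder types standing in for application classes
import java.nio.charset.StandardCharsets;
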
public class ProtobufKeyMapSerializer implements KeyMapStoreSerializer<String, String, UserData> {
    
    @Override
    public byte[] serializeKey(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }
    
    @Override
    public byte[] serializeUKey(String uk) {
        return uk.getBytes(StandardCharsets.UTF_8);
    }
    
    @Override
    public byte[] serializeUValue(UserData uv) {
        // Assuming UserData has toProtobuf() method
        return uv.toProtobuf().toByteArray();
    }
    
    @Override
    public String deserializeUKey(byte[] ukArray) {
        return new String(ukArray, StandardCharsets.UTF_8);
    }
    
    @Override
    public UserData deserializeUValue(byte[] uvArray) {
        try {
            // Assuming UserData has fromProtobuf() method
            return UserData.fromProtobuf(UserDataProto.parseFrom(uvArray));
        } catch (Exception e) {
            throw new RuntimeException("UserData deserialization failed", e);
        }
    }
}

Integration with Storage Systems

Stores obtained from a state backend apply their serialization internally, so application code reads and writes typed objects rather than byte arrays:

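// Example: reading and writing objects through a key-value store
// Requires java.util.Arrays, java.util.HashMap and java.util.Map in addition to the state library classes
public class CustomSerializationExample {
    
    public void demonstrateCustomSerialization() {
        // Create an in-memory backend; stores obtained from it apply their configured serialization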
        Map<String, String> config = new HashMap<>();
        config.put("state.backend.type", "MEMORY");
        AbstractStateBackend backend = StateBackendBuilder.buildStateBackend(config);
        
        // Get key-value store
        KeyValueStore<String, UserProfile> store = backend.getKeyValueStore("user-profiles");
        
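        // The store will use the configured serialization automatically
        // (UserProfile as defined earlier has no constructor, so its fields are set directly)
        UserProfile profile = new UserProfile();
        profile.name = "Alice";
        profile.age = 30;
        profile.preferences = Arrays.asList("tech", "music");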
        store.put("user123", profile);
        
        UserProfile retrieved = store.get("user123");
        System.out.println("Retrieved profile: " + retrieved.name);
    }
    
    // Example: Serialization with compression
    // Requires java.io.ByteArrayOutputStream and java.util.zip.{Deflater, Inflater, DataFormatException}
    public static class CompressedSerialization<K, V> implements KeyValueStoreSerialization<K, V> {
        
        @Override
        public byte[] serializeKey(K key) {
            return compress(Serializer.object2Bytes(key));
        }
        
        @Override
        public byte[] serializeValue(V value) {
            return compress(Serializer.object2Bytes(value));
        }
        
        @Override
        @SuppressWarnings("unchecked")
        public V deserializeValue(byte[] valueArray) {
            return (V) Serializer.bytes2Object(decompress(valueArray));
        }
        
        // A fresh Deflater per call avoids sharing mutable state between threads
        private byte[] compress(byte[] data) {
            Deflater deflater = new Deflater();
            try {
                deflater.setInput(data);
                deflater.finish();
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                byte[] buffer = new byte[1024];
                while (!deflater.finished()) {
                    int count = deflater.deflate(buffer);
                    baos.write(buffer, 0, count);
                }
                return baos.toByteArray();
            } finally {
                deflater.end(); // release native resources even if an error occurs
            }
        }
        
        private byte[] decompress(byte[] data) {
            Inflater inflater = new Inflater();
            try {
                inflater.setInput(data);
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                byte[] buffer = new byte[1024];
                while (!inflater.finished()) {
                    int count = inflater.inflate(buffer);
                    // No progress and more input needed: the data is truncated, so stop instead of looping forever
                    if (count == 0 && !inflater.finished() && inflater.needsInput()) {
                        throw new RuntimeException("Decompression failed: truncated input");
                    }
                    baos.write(buffer, 0, count);
                }
                return baos.toByteArray();
            } catch (DataFormatException e) {
                throw new RuntimeException("Decompression failed", e);
            } finally {
                inflater.end();
            }
        }
    }
}
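The compression step used by a serializer like `CompressedSerialization` can be checked in isolation. The sketch below uses only `java.util.zip` from the JDK and leaves FST out entirely; `CompressionRoundTrip` is a hypothetical helper name, and the input string is arbitrary sample data.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

final class CompressionRoundTrip {
    static byte[] compress(byte[] data) {
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(data);
            deflater.finish();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[1024];
            while (!deflater.finished()) {
                int n = deflater.deflate(buffer);
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        } finally {
            deflater.end(); // release native resources
        }
    }

    static byte[] decompress(byte[] data) {
        Inflater inflater = new Inflater();
        try {
            inflater.setInput(data);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[1024];
            while (!inflater.finished()) {
                int n = inflater.inflate(buffer);
                // Guard against spinning forever on truncated input
                if (n == 0 && !inflater.finished()
                        && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalStateException("Truncated or unsupported compressed data");
                }
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        } catch (DataFormatException e) {
            throw new IllegalStateException("Decompression failed", e);
        } finally {
            inflater.end();
        }
    }

    public static void main(String[] args) {
        // Highly repetitive sample data, which DEFLATE compresses well
        byte[] raw = "abcabcabc".repeat(200).getBytes(StandardCharsets.UTF_8);
        byte[] packed = compress(raw);
        System.out.println(raw.length + " -> " + packed.length + " bytes");
        System.out.println(Arrays.equals(raw, decompress(packed))); // prints true
    }
}
```

Compression helps mainly with large or repetitive values. For small payloads the DEFLATE header and checksum can make the output larger than the input, so it is worth measuring on representative state before enabling it.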

Performance and Best Practices

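// Best practices for serialization performance
// ComplexObject is a placeholder type; JsonKeyValueSerialization is the custom example shown earlier
import java.nio.charset.StandardCharsets;
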
public class SerializationBestPractices {
    
    // 1. Reuse serialization instances when possible
    private static final DefaultKeyValueStoreSerialization<String, Object> REUSABLE_SERIALIZER = 
        new DefaultKeyValueStoreSerialization<>();
    
    // 2. Use appropriate serialization for data types
    public void chooseAppropriateSerializer() {
        // For simple types, FST is efficient
        KeyValueStoreSerialization<String, String> stringSerializer = new DefaultKeyValueStoreSerialization<>();
        
        // For complex nested objects, consider JSON or Protobuf
        KeyValueStoreSerialization<String, ComplexObject> complexSerializer = new JsonKeyValueSerialization<>(String.class, ComplexObject.class);
        
        // For high-frequency serialization, use binary formats
        KeyValueStoreSerialization<String, byte[]> binarySerializer = new DefaultKeyValueStoreSerialization<>();
    }
    
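    // 3. Handle serialization errors gracefully
    public byte[] safeSerialize(Object value) {
        try {
            return Serializer.object2Bytes(value);
        } catch (Exception e) {
            // Log the error; guard against null so the handler itself cannot throw
            String type = (value == null) ? "null" : value.getClass().getName();
            System.err.println("Serialization failed for value of type " + type + ": " + e);
            
            // Fallback: an empty array. Callers must treat it as "nothing to store",
            // because persisting it would silently replace the real value.
            return new byte[0];
        }
    }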
    
    // 4. Optimize for common use cases
    public static class OptimizedStringMapSerializer implements KeyMapStoreSerializer<String, String, String> {
        
        @Override
        public byte[] serializeKey(String key) {
            return key.getBytes(StandardCharsets.UTF_8);
        }
        
        @Override
        public byte[] serializeUKey(String uk) {
            return uk.getBytes(StandardCharsets.UTF_8);
        }
        
        @Override  
        public byte[] serializeUValue(String uv) {
            return uv.getBytes(StandardCharsets.UTF_8);
        }
        
        @Override
        public String deserializeUKey(byte[] ukArray) {
            return new String(ukArray, StandardCharsets.UTF_8);
        }
        
        @Override
        public String deserializeUValue(byte[] uvArray) {
            return new String(uvArray, StandardCharsets.UTF_8);
        }
    }
}
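To give a rough sense of why a type-specific encoding such as `OptimizedStringMapSerializer` can pay off, the sketch below compares the raw UTF-8 size of a string with the size produced by a general-purpose object serializer. JDK serialization is used here purely as a stand-in because it ships with the runtime; FST output differs, so the numbers are illustrative only. `EncodingSizeComparison` is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class EncodingSizeComparison {
    // Type-specific encoding: just the UTF-8 bytes of the string
    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // General-purpose encoding (JDK serialization as a stand-in, not FST):
    // adds a stream header and type information on top of the payload
    static byte[] jdkSerialized(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        String key = "user123";
        System.out.println("UTF-8 bytes:          " + utf8(key).length);
        System.out.println("JDK serialized bytes: " + jdkSerialized(key).length);
    }
}
```

The fixed overhead matters most for stores holding many small entries; for large values the difference between encodings is usually dominated by the payload itself.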

Install with Tessl CLI

npx tessl i tessl/maven-io-ray--streaming-state
