State management library for the Ray Streaming framework. It provides transactional state storage with checkpoint and rollback capabilities for streaming data-processing applications.
—
Pluggable serialization system with default FST-based implementations supporting custom serializers for both key-value stores and key-map stores, enabling efficient state persistence and cross-language compatibility.
Fast serialization utility that uses the FST (fast-serialization) library for efficient object-to-bytes conversion.
/**
* Static utility that converts objects to byte arrays and back using FST serialization.
* <p>
* Both operations are exposed as static methods, so no instance is required.
*/
public class Serializer {
/**
* Serializes an object to a byte array using FST.
*
* @param value the object to serialize
* @return the serialized bytes
*/
public static byte[] object2Bytes(Object value);
/**
* Deserializes a byte array back into an object using FST.
* <p>
* The result is typed as {@code Object}; callers cast it to the expected type.
*
* @param buffer the bytes to deserialize (presumably produced by {@link #object2Bytes(Object)})
* @return the deserialized object
*/
public static Object bytes2Object(byte[] buffer);
}Usage Examples:
import io.ray.streaming.state.serialization.Serializer;
// Serialize different types of objects
String text = "Hello, Ray Streaming!";
byte[] textBytes = Serializer.object2Bytes(text);
Integer number = 42;
byte[] numberBytes = Serializer.object2Bytes(number);
List<String> list = Arrays.asList("item1", "item2", "item3");
byte[] listBytes = Serializer.object2Bytes(list);
// Deserialize objects back; bytes2Object returns Object, so each result is cast
String deserializedText = (String) Serializer.bytes2Object(textBytes);
Integer deserializedNumber = (Integer) Serializer.bytes2Object(numberBytes);
// The element type cannot be checked at runtime; the cast is safe here because
// listBytes was produced from a List<String> a few lines above.
@SuppressWarnings("unchecked")
List<String> deserializedList = (List<String>) Serializer.bytes2Object(listBytes);
// Custom objects
// NOTE: FST's default configuration rejects classes that do not implement
// Serializable, so custom state types should implement it
// (assumes Serializer uses the default FST configuration - confirm).
class UserProfile implements java.io.Serializable {
  private static final long serialVersionUID = 1L;
  public String name;
  public int age;
  public List<String> preferences;
}
UserProfile profile = new UserProfile();
profile.name = "Alice";
profile.age = 30;
profile.preferences = Arrays.asList("tech", "music");
byte[] profileBytes = Serializer.object2Bytes(profile);
UserProfile deserializedProfile = (UserProfile) Serializer.bytes2Object(profileBytes);
Interface defining serialization methods for key-value stores with separate key and value serialization support.
/**
* Serialization contract for key-value stores, with separate methods for keys and values.
* <p>
* Only values have a deserialization method here; this interface declares no method
* for turning key bytes back into a key object.
*
* @param <K> type of the keys
* @param <V> type of the values
*/
public interface KeyValueStoreSerialization<K, V> {
/**
* Serializes a key to a byte array.
*
* @param key the key object to serialize
* @return the serialized key bytes
*/
byte[] serializeKey(K key);
/**
* Serializes a value to a byte array.
*
* @param value the value object to serialize
* @return the serialized value bytes
*/
byte[] serializeValue(V value);
/**
* Deserializes a byte array into a value object.
*
* @param valueArray the serialized value bytes
* @return the deserialized value object
*/
V deserializeValue(byte[] valueArray);
}Interface defining serialization methods for key-map stores supporting hierarchical key structures with sub-keys and sub-values.
/**
* Serialization contract for key-map stores, where each primary key is associated
* with sub-keys and sub-values.
* <p>
* Sub-keys and sub-values can be serialized and deserialized; the primary key has
* a serialization method only.
*
* @param <K> type of the primary key
* @param <S> type of the sub-key
* @param <T> type of the sub-value
*/
public interface KeyMapStoreSerializer<K, S, T> {
/**
* Serializes the primary key to a byte array.
*
* @param key the primary key object to serialize
* @return the serialized key bytes
*/
byte[] serializeKey(K key);
/**
* Serializes a sub-key to a byte array.
*
* @param uk the sub-key object to serialize
* @return the serialized sub-key bytes
*/
byte[] serializeUKey(S uk);
/**
* Serializes a sub-value to a byte array.
*
* @param uv the sub-value object to serialize
* @return the serialized sub-value bytes
*/
byte[] serializeUValue(T uv);
/**
* Deserializes a byte array into a sub-key object.
*
* @param ukArray the serialized sub-key bytes
* @return the deserialized sub-key object
*/
S deserializeUKey(byte[] ukArray);
/**
* Deserializes a byte array into a sub-value object.
*
* @param uvArray the serialized sub-value bytes
* @return the deserialized sub-value object
*/
T deserializeUValue(byte[] uvArray);
}The framework provides default implementations for common serialization scenarios.
/**
* Abstract base class for serialization implementations.
* <p>
* Extended by {@code DefaultKeyValueStoreSerialization} and
* {@code DefaultKeyMapStoreSerializer}.
*/
public abstract class AbstractSerialization {
// Common serialization functionality and utilities (members are not shown in this listing)
}/**
* Default {@link KeyValueStoreSerialization} implementation using FST serialization
* with key prefixing.
*
* @param <K> type of the keys
* @param <V> type of the values
*/
public class DefaultKeyValueStoreSerialization<K, V> extends AbstractSerialization implements KeyValueStoreSerialization<K, V> {
/**
* Serializes a key with a namespace prefix.
*
* @param key the key to serialize
* @return the serialized key bytes, including the prefix
*/
public byte[] serializeKey(K key);
/**
* Serializes a value using FST.
*
* @param value the value to serialize
* @return the serialized value bytes
*/
public byte[] serializeValue(V value);
/**
* Deserializes a value using FST.
*
* @param valueArray the serialized value bytes
* @return the deserialized value object
*/
public V deserializeValue(byte[] valueArray);
}/**
* Default {@link KeyMapStoreSerializer} implementation for key-map stores.
*
* @param <K> type of the primary key
* @param <S> type of the sub-key
* @param <T> type of the sub-value
*/
public class DefaultKeyMapStoreSerializer<K, S, T> extends AbstractSerialization implements KeyMapStoreSerializer<K, S, T> {
/**
* Serializes the primary key.
*
* @param key the primary key to serialize
* @return the serialized key bytes
*/
public byte[] serializeKey(K key);
/**
* Serializes a sub-key.
*
* @param uk the sub-key to serialize
* @return the serialized sub-key bytes
*/
public byte[] serializeUKey(S uk);
/**
* Serializes a sub-value.
*
* @param uv the sub-value to serialize
* @return the serialized sub-value bytes
*/
public byte[] serializeUValue(T uv);
/**
* Deserializes a sub-key.
*
* @param ukArray the serialized sub-key bytes
* @return the deserialized sub-key object
*/
public S deserializeUKey(byte[] ukArray);
/**
* Deserializes a sub-value.
*
* @param uvArray the serialized sub-value bytes
* @return the deserialized sub-value object
*/
public T deserializeUValue(byte[] uvArray);
}Custom Serialization Implementation Examples:
// Example 1: JSON-based serialization for key-value stores
import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * JSON-based {@link KeyValueStoreSerialization} backed by a Jackson {@code ObjectMapper}.
 *
 * <p>Keys and values are written as JSON bytes; values are read back as instances of the
 * value class supplied at construction. Jackson I/O failures are rethrown as
 * {@link RuntimeException} with the original exception as the cause.
 *
 * @param <K> type of the keys
 * @param <V> type of the values
 */
public class JsonKeyValueSerialization<K, V> implements KeyValueStoreSerialization<K, V> {
  private final ObjectMapper objectMapper = new ObjectMapper();
  // Not read by any method: the interface declares no key deserialization. Kept so the
  // constructor signature stays unchanged.
  private final Class<K> keyClass;
  private final Class<V> valueClass;

  /**
   * Creates a JSON serializer for the given key and value types.
   *
   * @param keyClass class of the keys
   * @param valueClass class of the values; required for deserialization
   * @throws NullPointerException if {@code valueClass} is null
   */
  public JsonKeyValueSerialization(Class<K> keyClass, Class<V> valueClass) {
    this.keyClass = keyClass;
    // Fail at construction instead of on the first deserializeValue call.
    this.valueClass = java.util.Objects.requireNonNull(valueClass, "valueClass");
  }

  /**
   * Serializes a key to JSON bytes.
   *
   * @param key the key to serialize
   * @return the JSON-encoded key
   * @throws RuntimeException if Jackson cannot encode the key
   */
  @Override
  public byte[] serializeKey(K key) {
    try {
      return objectMapper.writeValueAsBytes(key);
    } catch (java.io.IOException e) {
      // JsonProcessingException is an IOException; unrelated runtime errors propagate as-is.
      throw new RuntimeException("Key serialization failed", e);
    }
  }

  /**
   * Serializes a value to JSON bytes.
   *
   * @param value the value to serialize
   * @return the JSON-encoded value
   * @throws RuntimeException if Jackson cannot encode the value
   */
  @Override
  public byte[] serializeValue(V value) {
    try {
      return objectMapper.writeValueAsBytes(value);
    } catch (java.io.IOException e) {
      throw new RuntimeException("Value serialization failed", e);
    }
  }

  /**
   * Deserializes JSON bytes into a value of the configured value class.
   *
   * @param valueArray the JSON-encoded value
   * @return the decoded value
   * @throws RuntimeException if the bytes cannot be parsed or mapped to the value class
   */
  @Override
  public V deserializeValue(byte[] valueArray) {
    try {
      return objectMapper.readValue(valueArray, valueClass);
    } catch (java.io.IOException e) {
      throw new RuntimeException("Value deserialization failed", e);
    }
  }
}
// Example 2: Protobuf-based serialization for key-map stores
/**
 * Key-map serializer that stores primary keys and sub-keys as UTF-8 text and
 * sub-values ({@code UserData}) in their protobuf encoding.
 */
public class ProtobufKeyMapSerializer implements KeyMapStoreSerializer<String, String, UserData> {

  /** Encodes the primary key as UTF-8 bytes. */
  @Override
  public byte[] serializeKey(String key) {
    return toUtf8(key);
  }

  /** Encodes the sub-key as UTF-8 bytes. */
  @Override
  public byte[] serializeUKey(String uk) {
    return toUtf8(uk);
  }

  /** Encodes the sub-value through its protobuf representation. */
  @Override
  public byte[] serializeUValue(UserData uv) {
    // Relies on UserData exposing a toProtobuf() method
    return uv.toProtobuf().toByteArray();
  }

  /** Decodes a sub-key from UTF-8 bytes. */
  @Override
  public String deserializeUKey(byte[] ukArray) {
    return fromUtf8(ukArray);
  }

  /**
   * Decodes a sub-value from its protobuf bytes.
   *
   * @throws RuntimeException if parsing or conversion fails; the cause is preserved
   */
  @Override
  public UserData deserializeUValue(byte[] uvArray) {
    try {
      // Relies on UserData exposing a fromProtobuf() method
      return UserData.fromProtobuf(UserDataProto.parseFrom(uvArray));
    } catch (Exception e) {
      throw new RuntimeException("UserData deserialization failed", e);
    }
  }

  /** Converts text to its UTF-8 byte representation. */
  private static byte[] toUtf8(String text) {
    return text.getBytes(StandardCharsets.UTF_8);
  }

  /** Converts UTF-8 bytes back to text. */
  private static String fromUtf8(byte[] bytes) {
    return new String(bytes, StandardCharsets.UTF_8);
  }
}
The serialization framework integrates seamlessly with the storage layer:
// Example: Using custom serialization with key-value stores
/**
 * Examples of using key-value stores together with the serialization layer.
 */
public class CustomSerializationExample {

  /**
   * Stores a {@code UserProfile} in a key-value store obtained from an in-memory state
   * backend, reads it back and prints its name.
   */
  public void demonstrateCustomSerialization() {
    // Build an in-memory backend from configuration. No custom serializer is wired in
    // here; the store uses whatever serialization the backend is configured with.
    Map<String, String> config = new HashMap<>();
    config.put("state.backend.type", "MEMORY");
    AbstractStateBackend backend = StateBackendBuilder.buildStateBackend(config);
    // Get key-value store
    KeyValueStore<String, UserProfile> store = backend.getKeyValueStore("user-profiles");
    // The store will use the configured serialization automatically
    UserProfile profile = new UserProfile("Alice", 30, Arrays.asList("tech", "music"));
    store.put("user123", profile);
    UserProfile retrieved = store.get("user123");
    System.out.println("Retrieved profile: " + retrieved.name);
  }

  /**
   * Example: serialization with compression. Objects are serialized with
   * {@code Serializer} and the resulting bytes are deflate-compressed.
   *
   * <p>Each call uses its own {@link Deflater}/{@link Inflater} and releases it afterwards,
   * so instances hold no mutable state and a failed call cannot corrupt the next one.
   *
   * @param <K> type of the keys
   * @param <V> type of the values
   */
  public static class CompressedSerialization<K, V> implements KeyValueStoreSerialization<K, V> {
    private static final int BUFFER_SIZE = 1024;

    /** Serializes the key and compresses the resulting bytes. */
    @Override
    public byte[] serializeKey(K key) {
      return compress(Serializer.object2Bytes(key));
    }

    /** Serializes the value and compresses the resulting bytes. */
    @Override
    public byte[] serializeValue(V value) {
      return compress(Serializer.object2Bytes(value));
    }

    /**
     * Decompresses the bytes and deserializes them into a value.
     *
     * @throws RuntimeException if the bytes are not valid or complete compressed data
     */
    @Override
    @SuppressWarnings("unchecked") // bytes2Object returns Object; the caller supplies bytes of a V
    public V deserializeValue(byte[] valueArray) {
      return (V) Serializer.bytes2Object(decompress(valueArray));
    }

    /** Deflate-compresses {@code data} and returns the compressed bytes. */
    private static byte[] compress(byte[] data) {
      Deflater deflater = new Deflater();
      try {
        deflater.setInput(data);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[BUFFER_SIZE];
        while (!deflater.finished()) {
          int count = deflater.deflate(buffer);
          out.write(buffer, 0, count);
        }
        return out.toByteArray();
      } finally {
        // Release the native zlib resources held by the deflater.
        deflater.end();
      }
    }

    /**
     * Inflates {@code data} and returns the decompressed bytes.
     *
     * @throws RuntimeException if the data is malformed, truncated, empty, or needs a
     *     preset dictionary
     */
    private static byte[] decompress(byte[] data) {
      Inflater inflater = new Inflater();
      try {
        inflater.setInput(data);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[BUFFER_SIZE];
        while (!inflater.finished()) {
          int count = inflater.inflate(buffer);
          // inflate() returns 0 without finishing when it has run out of input or needs
          // a dictionary; without this check the loop would never terminate.
          if (count == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
            throw new RuntimeException("Decompression failed: incomplete or unsupported compressed data");
          }
          out.write(buffer, 0, count);
        }
        return out.toByteArray();
      } catch (DataFormatException e) {
        throw new RuntimeException("Decompression failed", e);
      } finally {
        // Release the native zlib resources held by the inflater, on failure too.
        inflater.end();
      }
    }
  }
}
// Best practices for serialization performance
/**
 * Collection of serialization best-practice examples.
 */
public class SerializationBestPractices {
  // 1. Reuse serialization instances when possible
  private static final DefaultKeyValueStoreSerialization<String, Object> REUSABLE_SERIALIZER =
      new DefaultKeyValueStoreSerialization<>();

  // 2. Use appropriate serialization for data types
  /** Shows which serializer to pick for different kinds of values. */
  public void chooseAppropriateSerializer() {
    // For simple types, FST is efficient
    KeyValueStoreSerialization<String, String> stringSerializer =
        new DefaultKeyValueStoreSerialization<>();
    // For complex nested objects, consider JSON or Protobuf
    KeyValueStoreSerialization<String, ComplexObject> complexSerializer =
        new JsonKeyValueSerialization<>(String.class, ComplexObject.class);
    // For high-frequency serialization, use binary formats
    KeyValueStoreSerialization<String, byte[]> binarySerializer =
        new DefaultKeyValueStoreSerialization<>();
  }

  // 3. Handle serialization errors gracefully
  /**
   * Serializes {@code value}, falling back to an empty array if serialization fails.
   *
   * @param value the object to serialize; may be null
   * @return the serialized bytes, or an empty array when serialization threw
   */
  public byte[] safeSerialize(Object value) {
    try {
      return Serializer.object2Bytes(value);
    } catch (Exception e) {
      // Log error and provide fallback. value may be null, so do not dereference it
      // unconditionally - that would replace the original failure with an NPE.
      String typeName = (value == null) ? "null" : value.getClass().getName();
      System.err.println("Serialization failed for value: " + typeName + " (" + e + ")");
      // Return empty bytes or default serialization
      return new byte[0];
    }
  }

  // 4. Optimize for common use cases
  /** Key-map serializer for all-string data that encodes everything directly as UTF-8. */
  public static class OptimizedStringMapSerializer implements KeyMapStoreSerializer<String, String, String> {
    @Override
    public byte[] serializeKey(String key) {
      return key.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeUKey(String uk) {
      return uk.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeUValue(String uv) {
      return uv.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserializeUKey(byte[] ukArray) {
      return new String(ukArray, StandardCharsets.UTF_8);
    }

    @Override
    public String deserializeUValue(byte[] uvArray) {
      return new String(uvArray, StandardCharsets.UTF_8);
    }
  }
}
Install with Tessl CLI
npx tessl i tessl/maven-io-ray--streaming-state