State management library for Ray Streaming framework that provides transactional state storage with checkpoint and rollback capabilities for streaming data processing applications
npx @tessl/cli install tessl/maven-io-ray--streaming-state@1.10.0Ray Streaming State is a comprehensive state management library for Ray Streaming framework that provides transactional state storage with checkpoint and rollback capabilities for streaming data processing applications. It enables streaming applications to maintain persistent, transactional state across distributed processing nodes with fault-tolerant recovery mechanisms.
<dependency>
<groupId>io.ray</groupId>
<artifactId>streaming-state</artifactId>
<version>1.10.0</version>
</dependency>import io.ray.streaming.state.backend.StateBackendBuilder;
import io.ray.streaming.state.backend.KeyStateBackend;
import io.ray.streaming.state.keystate.desc.ValueStateDescriptor;
import io.ray.streaming.state.keystate.desc.ListStateDescriptor;
import io.ray.streaming.state.keystate.desc.MapStateDescriptor;
import io.ray.streaming.state.keystate.state.ValueState;
import io.ray.streaming.state.keystate.state.ListState;
import io.ray.streaming.state.keystate.state.MapState;import io.ray.streaming.state.backend.*;
import io.ray.streaming.state.keystate.desc.*;
import io.ray.streaming.state.keystate.state.*;
import io.ray.streaming.state.keystate.KeyGroup;
import java.util.Map;
import java.util.HashMap;
// Create state backend
Map<String, String> config = new HashMap<>();
config.put("state.backend.type", "MEMORY");
AbstractStateBackend stateBackend = StateBackendBuilder.buildStateBackend(config);
// Create key state backend
int numberOfKeyGroups = 128;
KeyGroup keyGroup = new KeyGroup(0, 63);
KeyStateBackend keyStateBackend = new KeyStateBackend(numberOfKeyGroups, keyGroup, stateBackend);
// Create value state
ValueStateDescriptor<String> valueDesc = ValueStateDescriptor.build("user-name", String.class, "unknown");
ValueState<String> valueState = keyStateBackend.getValueState(valueDesc);
// Use state with transactional operations
keyStateBackend.setCurrentKey("user123");
valueState.update("Alice");
String name = valueState.get(); // "Alice"
// Transaction operations for checkpointing
long checkpointId = 1001L;
keyStateBackend.finish(checkpointId);
keyStateBackend.commit(checkpointId);
keyStateBackend.ackCommit(checkpointId, System.currentTimeMillis());Ray Streaming State is built around several key architectural components:
Core state backend system providing pluggable storage implementations, configuration management, and factory methods for creating state backends with different strategies and storage types.
public static AbstractStateBackend buildStateBackend(Map<String, String> config);
public enum BackendType {
MEMORY
}
public enum StateStrategy {
DUAL_VERSION,
SINGLE_VERSION
}Key-based state management system providing ValueState, ListState, and MapState abstractions with key-group partitioning, transaction support, and fault-tolerant operations for distributed streaming applications.
public class KeyStateBackend extends AbstractKeyStateBackend {
public <T> ValueState<T> getValueState(ValueStateDescriptor<T> stateDescriptor);
public <T> ListState<T> getListState(ListStateDescriptor<T> stateDescriptor);
public <S, T> MapState<S, T> getMapState(MapStateDescriptor<S, T> stateDescriptor);
public void setCurrentKey(Object currentKey);
}Three core state abstractions - ValueState for single values, ListState for ordered collections, and MapState for key-value mappings - each providing specialized operations and transaction support for different data access patterns.
public interface ValueState<T> extends UnaryState<T> {
void update(T value);
}
public interface ListState<T> extends UnaryState<List<T>> {
void add(T value);
void update(List<T> list);
}
public interface MapState<K, V> extends UnaryState<Map<K, V>> {
V get(K key);
void put(K key, V value);
void remove(K key);
void putAll(Map<K, V> map);
}Comprehensive transaction system implementing four-phase commit protocol with finish, commit, ackCommit, and rollback operations for ensuring data consistency during failures and supporting checkpoint-based recovery.
public interface StateStoreManager {
void finish(long checkpointId);
void commit(long checkpointId);
void ackCommit(long checkpointId, long timeStamp);
void rollBack(long checkpointId);
}Pluggable serialization system with default FST-based implementations supporting custom serializers for both key-value stores and key-map stores, enabling efficient state persistence and cross-language compatibility.
public interface KeyValueStoreSerialization<K, V> {
byte[] serializeKey(K key);
byte[] serializeValue(V value);
V deserializeValue(byte[] valueArray);
}
public interface KeyMapStoreSerializer<K, S, T> {
byte[] serializeKey(K key);
byte[] serializeUKey(S uk);
byte[] serializeUValue(T uv);
S deserializeUKey(byte[] ukArray);
T deserializeUValue(byte[] uvArray);
}Configuration system and key-group assignment algorithms enabling distributed state partitioning, parallel processing optimization, and configurable backend selection with built-in helper utilities.
public class KeyGroupAssignment {
public static KeyGroup getKeyGroup(int maxParallelism, int parallelism, int index);
public static int assignKeyGroupIndexForKey(Object key, int maxParallelism);
public static Map<Integer, List<Integer>> computeKeyGroupToTask(int maxParallelism, List<Integer> targetTasks);
}
public class ConfigKey {
public static final String STATE_BACKEND_TYPE = "state.backend.type";
public static final String STATE_STRATEGY_MODE = "state.strategy.mode";
public static final String STATE_TABLE_NAME = "state.table.name";
}public class StorageRecord<T> {
public StorageRecord(long checkpointId, T value);
public T getValue();
public long getCheckpointId();
public void setCheckpointId(long checkpointId);
}
public class PartitionRecord<T> {
public PartitionRecord(int partitionID, T value);
public T getValue();
public int getPartitionID();
public void setPartitionID(int partitionID);
}
public class StateException extends RuntimeException {
public StateException(Throwable t);
public StateException(String msg);
}
public class KeyGroup {
public KeyGroup(int startIndex, int endIndex);
public int size();
public int getStartIndex();
public int getEndIndex();
}