State management library for the Ray Streaming framework that provides transactional state storage with checkpoint and rollback capabilities for streaming data-processing applications.
—
Key-based state management system providing ValueState, ListState, and MapState abstractions with key-group partitioning, transaction support, and fault-tolerant operations for distributed streaming applications.
Main state backend implementation for managing different types of keyed states with key-group partitioning and transaction support. Note: This class is not thread-safe.
/**
* Key state backend that creates and serves the keyed state handles
* ({@link ValueState}, {@link ListState}, {@link MapState}) described by state descriptors.
* <p>
* Each instance is built for a total number of key groups, the key-group range assigned
* to it, and an underlying {@link AbstractStateBackend} used for storage.
* <p>
* This class is not thread-safe.
*/
public class KeyStateBackend extends AbstractKeyStateBackend {
/**
* Creates a key state backend with its partitioning configuration.
* <p>
* NOTE(review): argument validation (null checks, range of {@code keyGroup} relative to
* {@code numberOfKeyGroups}) is not shown in this listing - confirm against the implementation.
*
* @param numberOfKeyGroups total number of key groups used for partitioning
* @param keyGroup key-group range assigned to this backend instance
* @param abstractStateBackend underlying state backend used for storage
*/
public KeyStateBackend(int numberOfKeyGroups, KeyGroup keyGroup, AbstractStateBackend abstractStateBackend);
/**
* Returns the value state defined by the given descriptor, creating it if necessary.
*
* @param <T> type of the stored value
* @param stateDescriptor value state descriptor defining the state
* @return ValueState instance for the specified descriptor
*/
public <T> ValueState<T> getValueState(ValueStateDescriptor<T> stateDescriptor);
/**
* Returns the list state defined by the given descriptor, creating it if necessary.
*
* @param <T> type of the list elements
* @param stateDescriptor list state descriptor defining the state
* @return ListState instance for the specified descriptor
*/
public <T> ListState<T> getListState(ListStateDescriptor<T> stateDescriptor);
/**
* Returns the map state defined by the given descriptor, creating it if necessary.
*
* @param <S> type of the map keys
* @param <T> type of the map values
* @param stateDescriptor map state descriptor defining the state
* @return MapState instance for the specified descriptor
*/
public <S, T> MapState<S, T> getMapState(MapStateDescriptor<S, T> stateDescriptor);
/**
* Sets the current processing key used for state operations.
* <p>
* Provides the implementation of the abstract
* {@code AbstractKeyStateBackend.setCurrentKey(Object)}.
*
* @param currentKey current key for state access
*/
public void setCurrentKey(Object currentKey);
/**
* Returns the number of key groups used for partitioning.
*
* @return total number of key groups
*/
public int getNumberOfKeyGroups();
/**
* Returns the key-group range assigned to this backend.
*
* @return KeyGroup representing the assigned range
*/
public KeyGroup getKeyGroup();
/**
* Closes the backend and cleans up its resources.
* <p>
* NOTE(review): whether the backend or its state handles remain usable after this call
* is not shown here - treat the instance as finished once closed.
*/
public void close();
}

Usage Examples:
import io.ray.streaming.state.backend.*;
import io.ray.streaming.state.keystate.*;
import io.ray.streaming.state.keystate.desc.*;
import io.ray.streaming.state.keystate.state.*;
// java.util types used by the configuration map below
import java.util.HashMap;
import java.util.Map;
// Create key state backend
Map<String, String> config = new HashMap<>();
config.put("state.backend.type", "MEMORY");
AbstractStateBackend stateBackend = StateBackendBuilder.buildStateBackend(config);
int numberOfKeyGroups = 128;
KeyGroup keyGroup = new KeyGroup(0, 63); // Handle key groups 0-63 (of the 128 in total)
KeyStateBackend keyStateBackend = new KeyStateBackend(numberOfKeyGroups, keyGroup, stateBackend);
// Create and use different state types: each state is obtained from a descriptor
ValueStateDescriptor<String> valueDesc = ValueStateDescriptor.build("user-session", String.class, "");
ValueState<String> valueState = keyStateBackend.getValueState(valueDesc);
ListStateDescriptor<String> listDesc = ListStateDescriptor.build("user-events", String.class);
ListState<String> listState = keyStateBackend.getListState(listDesc);
MapStateDescriptor<String, Integer> mapDesc = MapStateDescriptor.build("user-counters", String.class, Integer.class);
MapState<String, Integer> mapState = keyStateBackend.getMapState(mapDesc);
// Set current key and use states
keyStateBackend.setCurrentKey("user123");
valueState.update("session-abc");
listState.add("login-event");
mapState.put("clicks", 5);
// Close when done
keyStateBackend.close();

Base class providing transaction support and common state management functionality for key-based state backends. Note: This class is not thread-safe.
/**
* Base class for key-based state backends (not thread-safe).
* <p>
* Groups three concerns: descriptor-addressed {@code put}/{@code get} access, the
* checkpoint phases {@code finish}, {@code commit}, {@code ackCommit} and {@code rollBack},
* and the processing context (current key, checkpoint id, key-group index).
* Subclasses must implement {@link #setCurrentKey(Object)}.
*/
public abstract class AbstractKeyStateBackend {
/**
* Creates the abstract key state backend on top of an underlying state backend.
*
* @param backend underlying state backend
*/
public AbstractKeyStateBackend(AbstractStateBackend backend);
/**
* Stores a value in the state identified by the descriptor, under the given key.
* <p>
* NOTE(review): {@code descriptor} is declared with the raw type although
* {@code AbstractStateDescriptor} is generic, so the compiler does not tie {@code T}
* to the descriptor's value type - confirm callers pass a matching value.
*
* @param <K> type of the state key
* @param <T> type of the stored value
* @param descriptor state descriptor
* @param key state key
* @param value value to store
*/
public <K, T> void put(AbstractStateDescriptor descriptor, K key, T value);
/**
* Reads a value from the state identified by the descriptor, under the given key.
* <p>
* NOTE(review): {@code T} is chosen by the caller and is not checked against the raw
* {@code descriptor}; the result for a key with no stored value is not shown in this
* listing - confirm against the implementation.
*
* @param <K> type of the state key
* @param <T> type of the retrieved value
* @param descriptor state descriptor
* @param key state key
* @return retrieved value
*/
public <K, T> T get(AbstractStateDescriptor descriptor, K key);
/**
* Finish phase of a checkpoint: completes batch data saving and serialization.
*
* @param checkpointId checkpoint identifier
*/
public void finish(long checkpointId);
/**
* Commit phase of a checkpoint: persists the data (can be asynchronous).
*
* @param checkpointId checkpoint identifier
*/
public void commit(long checkpointId);
/**
* Acknowledge-commit phase of a checkpoint: cleans up after the commit.
*
* @param checkpointId checkpoint identifier
* @param timeStamp timestamp of the acknowledgment
*/
public void ackCommit(long checkpointId, long timeStamp);
/**
* Rollback phase: recovers state from the given checkpoint.
*
* @param checkpointId checkpoint identifier
*/
public void rollBack(long checkpointId);
/**
* Returns the current processing key.
*
* @return current key object
*/
public Object getCurrentKey();
/**
* Sets the current processing key; must be implemented by subclasses.
*
* @param currentKey current key to set
*/
public abstract void setCurrentKey(Object currentKey);
/**
* Returns the current checkpoint ID.
*
* @return current checkpoint ID
*/
public long getCheckpointId();
/**
* Sets the checkpoint ID.
*
* @param checkpointId checkpoint ID to set
*/
public void setCheckpointId(long checkpointId);
/**
* Sets the processing context, i.e. the checkpoint ID together with the current key.
* <p>
* NOTE(review): presumably equivalent to calling {@code setCheckpointId} and
* {@code setCurrentKey} - confirm against the implementation.
*
* @param checkpointId checkpoint identifier
* @param currentKey current processing key
*/
public void setContext(long checkpointId, Object currentKey);
/**
* Returns the key-group index for the current key.
*
* @return key group index
*/
public int getKeyGroupIndex();
}

State descriptors define the metadata and configuration for different types of states, providing type safety and unique identification.
/**
* Base class for all state descriptors.
* <p>
* A descriptor carries a name, the class of the stored value, a storage table name,
* a unique identifier and its {@link StateType}.
*
* @param <S> state interface described; the concrete descriptors in this listing bind it to
*     {@code ValueState<T>}, {@code ListState<T>} and {@code MapState<K, V>}
* @param <T> type of the stored value, matching the {@code Class<T>} given to the constructor
*/
public abstract class AbstractStateDescriptor<S, T> {
/**
* Creates a state descriptor with a name and a value type.
*
* @param name descriptor name
* @param type value type class
*/
public AbstractStateDescriptor(String name, Class<T> type);
/**
* Returns the descriptor name.
*
* @return descriptor name
*/
public String getName();
/**
* Returns the value type class.
*
* @return type class
*/
public Class<T> getType();
/**
* Returns the table name used for storage.
*
* @return table name
*/
public String getTableName();
/**
* Sets the table name used for storage.
*
* @param tableName table name
*/
public void setTableName(String tableName);
/**
* Returns the unique identifier of this descriptor.
* <p>
* NOTE(review): how the identifier is derived (e.g. from name or table name) is not
* shown in this listing - confirm before relying on its format.
*
* @return unique identifier string
*/
public String getIdentify();
/**
* Returns the kind of state this descriptor describes; implemented by each concrete descriptor.
*
* @return StateType enum value
*/
public abstract StateType getStateType();
}
/**
* Enumeration of the state kinds a descriptor can describe: value, list or map.
*/
public enum StateType {
/** Value state type */
VALUE,
/** List state type */
LIST,
/** Map state type */
MAP
}
/**
* Descriptor for value state configuration: name, value type and default value.
*
* @param <T> type of the stored value
*/
public class ValueStateDescriptor<T> extends AbstractStateDescriptor<ValueState<T>, T> {
/**
* Creates a value state descriptor.
*
* @param name state name
* @param type value type class
* @param defaultValue default value when the state is empty
*/
public ValueStateDescriptor(String name, Class<T> type, T defaultValue);
/**
* Factory method for creating a value state descriptor; takes the same arguments
* as the constructor.
*
* @param <T> type of the stored value
* @param name state name
* @param type value type class
* @param defaultValue default value
* @return ValueStateDescriptor instance
*/
public static <T> ValueStateDescriptor<T> build(String name, Class<T> type, T defaultValue);
/**
* Returns the default value.
*
* @return default value
*/
public T getDefaultValue();
/**
* Returns the state type of this descriptor.
*
* @return StateType.VALUE
*/
public StateType getStateType();
}
/**
* Descriptor for list state configuration.
* <p>
* Only the {@code build} factory methods are listed for creating instances; no public
* constructor appears in this listing.
*
* @param <T> type of the list elements
*/
public class ListStateDescriptor<T> extends AbstractStateDescriptor<ListState<T>, T> {
/**
* Factory method for creating a list state descriptor.
* <p>
* NOTE(review): presumably creates a non-operator list (operator flag false) - confirm.
*
* @param <T> type of the list elements
* @param name state name
* @param type element type class
* @return ListStateDescriptor instance
*/
public static <T> ListStateDescriptor<T> build(String name, Class<T> type);
/**
* Factory method for creating a list state descriptor with an explicit operator flag.
*
* @param <T> type of the list elements
* @param name state name
* @param type element type class
* @param isOperatorList whether this is an operator-level list state
* @return ListStateDescriptor instance
*/
public static <T> ListStateDescriptor<T> build(String name, Class<T> type, boolean isOperatorList);
/**
* Tells whether this descriptor describes an operator list.
*
* @return true if operator list
*/
public boolean isOperatorList();
/**
* Returns the partition index.
*
* @return partition index
*/
public int getIndex();
/**
* Sets the partition index.
*
* @param index partition index
*/
public void setIndex(int index);
/**
* Returns the partition number.
*
* @return partition number
*/
public int getPartitionNumber();
/**
* Sets the partition number.
*
* @param number partition number
*/
public void setPartitionNumber(int number);
/**
* Returns the state type of this descriptor.
*
* @return StateType.LIST
*/
public StateType getStateType();
}
/**
* Descriptor for map state configuration.
* <p>
* The base descriptor's value type is bound to {@code V}, so the inherited
* {@code getType()} yields the value class; no accessor for the key class is listed.
*
* @param <K> type of the map keys
* @param <V> type of the map values
*/
public class MapStateDescriptor<K, V> extends AbstractStateDescriptor<MapState<K, V>, V> {
/**
* Creates a map state descriptor.
*
* @param name state name
* @param keyType key type class
* @param valueType value type class
*/
public MapStateDescriptor(String name, Class<K> keyType, Class<V> valueType);
/**
* Factory method for creating a map state descriptor; takes the same arguments
* as the constructor.
*
* @param <K> type of the map keys
* @param <V> type of the map values
* @param name state name
* @param keyType key type class
* @param valueType value type class
* @return MapStateDescriptor instance
*/
public static <K, V> MapStateDescriptor<K, V> build(String name, Class<K> keyType, Class<V> valueType);
/**
* Returns the state type of this descriptor.
*
* @return StateType.MAP
*/
public StateType getStateType();
}

Usage Examples:
// Create descriptors for different state types
// Value state of String with "anonymous" as its default value
ValueStateDescriptor<String> userNameDesc = ValueStateDescriptor.build("user-name", String.class, "anonymous");
// List state of String elements
ListStateDescriptor<String> eventListDesc = ListStateDescriptor.build("events", String.class);
// List state created with the operator-list flag set to true (built here only to show the overload)
ListStateDescriptor<String> operatorListDesc = ListStateDescriptor.build("operator-events", String.class, true);
// Map state with String keys and Integer values
MapStateDescriptor<String, Integer> counterMapDesc = MapStateDescriptor.build("counters", String.class, Integer.class);
// Use descriptors with key state backend
// (keyStateBackend is an existing KeyStateBackend instance; it is not created in this snippet)
ValueState<String> userNameState = keyStateBackend.getValueState(userNameDesc);
ListState<String> eventListState = keyStateBackend.getListState(eventListDesc);
MapState<String, Integer> counterMapState = keyStateBackend.getMapState(counterMapDesc);

Install with Tessl CLI
npx tessl i tessl/maven-io-ray--streaming-state