State management library for the Ray Streaming framework. It provides transactional state storage with checkpoint and rollback capabilities for streaming data-processing applications.
—
Core state backend system providing pluggable storage implementations, configuration management, and factory methods for creating state backends with different strategies and storage types.
Factory class for creating state backends from configuration maps, supporting automatic backend type detection and default fallback behavior.
/**
* Factory for creating state backends from a configuration map.
* <p>
* The usage example in this document sets the keys {@code "state.backend.type"} and
* {@code "state.strategy.mode"}; passing {@code null} is shown as selecting the
* default configuration, which uses the MEMORY backend.
*/
public class StateBackendBuilder {
/**
* Build a state backend from a configuration map.
* @param config Configuration map containing backend settings; the usage example
*        passes {@code null} to obtain the default (MEMORY) backend
* @return AbstractStateBackend instance based on configuration
*/
public static AbstractStateBackend buildStateBackend(Map<String, String> config);
}

Usage Examples:
import io.ray.streaming.state.backend.StateBackendBuilder;
import java.util.Map;
import java.util.HashMap;
// Create with explicit configuration: one key selects the backend type, the other the
// state strategy. The values used here match the enum constant names BackendType.MEMORY
// and StateStrategy.DUAL_VERSION.
Map<String, String> config = new HashMap<>();
config.put("state.backend.type", "MEMORY");
config.put("state.strategy.mode", "DUAL_VERSION");
// The builder returns the backend through its abstract base type.
AbstractStateBackend backend = StateBackendBuilder.buildStateBackend(config);
// Create with default configuration (uses MEMORY backend)
AbstractStateBackend defaultBackend = StateBackendBuilder.buildStateBackend(null);

Base class for all state backend implementations providing core storage operations, strategy management, and table name generation for state descriptors.
/**
* Base class for the different state backends.
* <p>
* Declares the store accessors and the backend-type/strategy getters that concrete
* backends implement, together with helpers for generating table names and state keys
* and for setting the key group index.
*/
public abstract class AbstractStateBackend {
/**
* Get the key-value store for the specified table.
* @param <K> presumably the key type of the store (not defined in this listing; confirm)
* @param <V> presumably the value type of the store (not defined in this listing; confirm)
* @param tableName Name of the table
* @return KeyValueStore instance
*/
public abstract <K, V> KeyValueStore<K, V> getKeyValueStore(String tableName);
/**
* Get the key-map store for the specified table.
* NOTE(review): the roles of the type parameters K, S and T are not described in this
* listing; confirm against the KeyMapStore declaration.
* @param tableName Name of the table
* @return KeyMapStore instance
*/
public abstract <K, S, T> KeyMapStore<K, S, T> getKeyMapStore(String tableName);
/**
* Get the backend type.
* @return BackendType enum value
*/
public abstract BackendType getBackendType();
/**
* Get the state strategy.
* @return StateStrategy enum value
*/
public abstract StateStrategy getStateStrategy();
/**
* Generate the table name for a state descriptor.
* NOTE(review): listed as a signature only (no body, not declared abstract); the
* implementation is not shown in this listing.
* @param stateDescriptor State descriptor
* @return Generated table name
*/
public String getTableName(AbstractStateDescriptor stateDescriptor);
/**
* Generate a state key from a descriptor name and the current key.
* NOTE(review): listed as a signature only; how the two parts are combined is not
* shown in this listing.
* @param descName Descriptor name
* @param currentKey Current key
* @return Generated state key
*/
public String getStateKey(String descName, String currentKey);
/**
* Set the key group index used for partitioning.
* NOTE(review): listed as a signature only; the implementation is not shown here.
* @param keyGroupIndex Key group index
*/
public void setKeyGroupIndex(int keyGroupIndex);
}

Memory-based state backend implementation providing in-memory storage for both key-value and key-map stores, suitable for development and testing environments.
/**
* Memory-based state backend implementation.
* <p>
* Extends {@link AbstractStateBackend} and supplies the key-value and key-map store
* accessors declared abstract there. The getBackendType and getStateStrategy
* implementations are not shown in this listing.
*/
public class MemoryStateBackend extends AbstractStateBackend {
/**
* Create a memory state backend with the given configuration.
* @param config Configuration map
*/
public MemoryStateBackend(Map<String, String> config);
/**
* Get the memory-based key-value store (implements the abstract accessor of the
* base class).
* @param tableName Table name
* @return MemoryKeyValueStore instance
*/
public <K, V> KeyValueStore<K, V> getKeyValueStore(String tableName);
/**
* Get the memory-based key-map store (implements the abstract accessor of the
* base class).
* @param tableName Table name
* @return MemoryKeyMapStore instance
*/
public <K, S, T> KeyMapStore<K, S, T> getKeyMapStore(String tableName);
}

Enumeration defining available backend storage types with string-based configuration support and case-insensitive parsing.
/**
* Backend storage types.
* <p>
* MEMORY is the only constant in this listing.
*/
public enum BackendType {
/** Memory-based backend for in-memory storage */
MEMORY;
/**
* Get the enum constant for a string value, ignoring case.
* <p>
* Examples from this document: {@code getEnum("memory")} yields MEMORY, and an
* unrecognised value such as {@code getEnum("invalid")} also yields MEMORY.
* @param value String value to parse
* @return BackendType enum, defaults to MEMORY if not found
*/
public static BackendType getEnum(String value);
}

Enumeration defining state saving strategies for different consistency and performance requirements.
/**
* State saving strategies.
*/
public enum StateStrategy {
/** Save two versions for rollback support */
DUAL_VERSION,
/** Save only current version for MVCC storage */
SINGLE_VERSION;
/**
* Get the enum constant for a string value, ignoring case.
* <p>
* Examples from this document: {@code getEnum("DUAL_VERSION")} yields DUAL_VERSION
* and {@code getEnum("single_version")} yields SINGLE_VERSION.
* @param value String value to parse
* @return StateStrategy enum, defaults to DUAL_VERSION if not found
*/
public static StateStrategy getEnum(String value);
}

Usage Examples:
// Backend type configuration: parsing ignores case, and unknown values fall back to MEMORY
BackendType type = BackendType.getEnum("memory"); // MEMORY
BackendType defaultType = BackendType.getEnum("invalid"); // MEMORY (default)
// Strategy configuration: parsing ignores case here as well
StateStrategy strategy = StateStrategy.getEnum("DUAL_VERSION"); // DUAL_VERSION
StateStrategy mvccStrategy = StateStrategy.getEnum("single_version"); // SINGLE_VERSION

Specialized state backend for operator-level state management supporting split and union list states for parallel processing patterns.
/**
* Operator state manager for split or union list states.
* <p>
* Constructed around an underlying {@link AbstractStateBackend}. It does not extend
* that class in this listing.
*/
public class OperatorStateBackend {
/**
* Create an operator state backend.
* @param backend Underlying state backend
*/
public OperatorStateBackend(AbstractStateBackend backend);
/**
* Set the current processing key.
* @param currentKey Current key
*/
public void setCurrentKey(Object currentKey);
/**
* Get the split list state for parallel processing.
* @param <T> presumably the element type of the list state (confirm against ListState)
* @param stateDescriptor List state descriptor
* @return ListState instance for split operations
*/
public <T> ListState<T> getSplitListState(ListStateDescriptor<T> stateDescriptor);
/**
* Get the union list state for aggregated processing.
* @param <T> presumably the element type of the list state (confirm against ListState)
* @param stateDescriptor List state descriptor
* @return ListState instance for union operations
*/
public <T> ListState<T> getUnionListState(ListStateDescriptor<T> stateDescriptor);
}

Usage Examples:
// Create operator state backend
// (config is assumed to be the configuration map built in the StateBackendBuilder example)
AbstractStateBackend backend = StateBackendBuilder.buildStateBackend(config);
OperatorStateBackend operatorBackend = new OperatorStateBackend(backend);
// Create split list state for parallel processing
// NOTE(review): the meaning of the third argument (true) to ListStateDescriptor.build is
// not described in this document; confirm against the ListStateDescriptor API.
ListStateDescriptor<String> splitDesc = ListStateDescriptor.build("split-state", String.class, true);
ListState<String> splitState = operatorBackend.getSplitListState(splitDesc);
// Create union list state for aggregation
ListStateDescriptor<Integer> unionDesc = ListStateDescriptor.build("union-state", Integer.class, true);
ListState<Integer> unionState = operatorBackend.getUnionListState(unionDesc);

Install with Tessl CLI
npx tessl i tessl/maven-io-ray--streaming-state