State management library for Ray Streaming framework that provides transactional state storage with checkpoint and rollback capabilities for streaming data processing applications
—
Configuration system and key-group assignment algorithms enabling distributed state partitioning, parallel processing optimization, and configurable backend selection with built-in helper utilities.
Central configuration constants and helper methods for state backend configuration and parameter management.
/**
* Configuration keys and helper methods for state management
*/
public class ConfigKey {
/** Backend type configuration key */
public static final String STATE_BACKEND_TYPE = "state.backend.type";
/** Table name configuration key */
public static final String STATE_TABLE_NAME = "state.table.name";
/** Strategy mode configuration key */
public static final String STATE_STRATEGY_MODE = "state.strategy.mode";
/** Number per checkpoint configuration key */
public static final String NUMBER_PER_CHECKPOINT = "number.per.checkpoint";
/** Maximum parallelism configuration key */
public static final String JOB_MAX_PARALLEL = "job.max.parallel";
/** String delimiter for internal use */
public static final String DELIMITER = "delimiter.string";
/**
* Get state strategy from configuration map
* @param config Configuration map
* @return State strategy string value
*/
public static String getStateStrategyEnum(Map<String, String> config);
/**
* Get backend type from configuration map
* @param config Configuration map
* @return Backend type string value
*/
public static String getBackendType(Map<String, String> config);
/**
* Get number per checkpoint from configuration map
* @param config Configuration map
* @return Number per checkpoint value
*/
public static int getNumberPerCheckpoint(Map<String, String> config);
/**
* Get state table name from configuration map
* @param config Configuration map
* @return State table name
*/
public static String getStateTableName(Map<String, String> config);
}

Usage Examples:
import io.ray.streaming.state.config.ConfigKey;
import java.util.Map;
import java.util.HashMap;
// Create configuration map
Map<String, String> config = new HashMap<>();
config.put(ConfigKey.STATE_BACKEND_TYPE, "MEMORY");
config.put(ConfigKey.STATE_STRATEGY_MODE, "DUAL_VERSION");
config.put(ConfigKey.STATE_TABLE_NAME, "user-state-table");
config.put(ConfigKey.NUMBER_PER_CHECKPOINT, "1000");
config.put(ConfigKey.JOB_MAX_PARALLEL, "16");
// Retrieve configuration values
String backendType = ConfigKey.getBackendType(config); // "MEMORY"
String strategy = ConfigKey.getStateStrategyEnum(config); // "DUAL_VERSION"
String tableName = ConfigKey.getStateTableName(config); // "user-state-table"
int numberPerCheckpoint = ConfigKey.getNumberPerCheckpoint(config); // 1000
// Use with state backend builder
AbstractStateBackend backend = StateBackendBuilder.buildStateBackend(config);

Helper methods for processing configuration maps with type conversion and default value support.
/**
* Helper methods for configuration processing
*/
public class ConfigHelper {
/**
* Get integer value from configuration with default fallback
* @param config Configuration map
* @param configKey Configuration key to look up
* @param defaultValue Default value if key not found or invalid
* @return Integer value or default
*/
public static int getIntegerOrDefault(Map config, String configKey, int defaultValue);
/**
* Get string value from configuration with default fallback
* @param config Configuration map
* @param configKey Configuration key to look up
* @param defaultValue Default value if key not found
* @return String value or default
*/
public static String getStringOrDefault(Map config, String configKey, String defaultValue);
}

Usage Examples:
import io.ray.streaming.state.config.ConfigHelper;
// Safe configuration retrieval with defaults
Map<String, String> config = getConfigurationFromSource();
int parallelism = ConfigHelper.getIntegerOrDefault(config, "parallelism", 4);
String environment = ConfigHelper.getStringOrDefault(config, "environment", "production");
int bufferSize = ConfigHelper.getIntegerOrDefault(config, "buffer.size", 8192);
// Handle missing or malformed configuration gracefully
int timeout = ConfigHelper.getIntegerOrDefault(config, "timeout", 5000); // Will use 5000 if "timeout" is missing or not a valid integer

Key group system providing scalable partitioning for distributed state processing across multiple parallel instances.
/**
* Defines key-groups for partitioned state processing
*/
public class KeyGroup {
/**
* Create key group with inclusive range
* @param startIndex Start index (inclusive)
* @param endIndex End index (inclusive)
*/
public KeyGroup(int startIndex, int endIndex);
/**
* Get number of key-groups in this range
* @return Number of key-groups
*/
public int size();
/**
* Get start index of range
* @return Start index (inclusive)
*/
public int getStartIndex();
/**
* Get end index of range
* @return End index (inclusive)
*/
public int getEndIndex();
}

Usage Examples:
// Create key groups for different parallel instances
KeyGroup keyGroup1 = new KeyGroup(0, 31); // Handles key groups 0-31 (32 groups)
KeyGroup keyGroup2 = new KeyGroup(32, 63); // Handles key groups 32-63 (32 groups)
KeyGroup keyGroup3 = new KeyGroup(64, 95); // Handles key groups 64-95 (32 groups)
KeyGroup keyGroup4 = new KeyGroup(96, 127); // Handles key groups 96-127 (32 groups)
// Check key group properties
int size1 = keyGroup1.size(); // 32
int start1 = keyGroup1.getStartIndex(); // 0
int end1 = keyGroup1.getEndIndex(); // 31
// Use with key state backend
KeyStateBackend backend1 = new KeyStateBackend(128, keyGroup1, stateBackend);
KeyStateBackend backend2 = new KeyStateBackend(128, keyGroup2, stateBackend);/**
* Key-group assignment algorithms for distributed processing
*/
public class KeyGroupAssignment {
/**
* Compute key-group range for specific operator instance
* @param maxParallelism Maximum parallelism (total key groups)
* @param parallelism Current parallelism level
* @param index Operator instance index (0-based)
* @return KeyGroup representing assigned range
*/
public static KeyGroup getKeyGroup(int maxParallelism, int parallelism, int index);
/**
* Assign key to specific key-group index using hash function
* @param key Key object to assign
* @param maxParallelism Maximum parallelism (total key groups)
* @return Key-group index for the key
*/
public static int assignKeyGroupIndexForKey(Object key, int maxParallelism);
/**
* Compute mapping from key-groups to task instances
* @param maxParallelism Maximum parallelism (total key groups)
* @param targetTasks List of target task IDs
* @return Map from key-group index to list of task IDs
*/
public static Map<Integer, List<Integer>> computeKeyGroupToTask(int maxParallelism, List<Integer> targetTasks);
}

Advanced Usage Examples:
import io.ray.streaming.state.keystate.KeyGroupAssignment;
import java.util.List;
import java.util.Arrays;
import java.util.Map;
// Example 1: Distribute key groups across parallel instances
int maxParallelism = 128; // Total key groups
int parallelism = 4; // Number of parallel instances
// Calculate key group ranges for each instance
KeyGroup range0 = KeyGroupAssignment.getKeyGroup(maxParallelism, parallelism, 0); // 0-31
KeyGroup range1 = KeyGroupAssignment.getKeyGroup(maxParallelism, parallelism, 1); // 32-63
KeyGroup range2 = KeyGroupAssignment.getKeyGroup(maxParallelism, parallelism, 2); // 64-95
KeyGroup range3 = KeyGroupAssignment.getKeyGroup(maxParallelism, parallelism, 3); // 96-127
System.out.println("Instance 0 handles key groups: " + range0.getStartIndex() + "-" + range0.getEndIndex());
System.out.println("Instance 1 handles key groups: " + range1.getStartIndex() + "-" + range1.getEndIndex());
// Example 2: Determine which instance should handle a specific key
String userKey1 = "user123";
String userKey2 = "user456";
String userKey3 = "user789";
int keyGroup1 = KeyGroupAssignment.assignKeyGroupIndexForKey(userKey1, maxParallelism); // e.g., 45
int keyGroup2 = KeyGroupAssignment.assignKeyGroupIndexForKey(userKey2, maxParallelism); // e.g., 12
int keyGroup3 = KeyGroupAssignment.assignKeyGroupIndexForKey(userKey3, maxParallelism); // e.g., 89
// Determine which instance handles each key.
// Multiply before dividing: keyGroup / (maxParallelism / parallelism) truncates the
// per-instance width first, so it yields an index >= parallelism whenever maxParallelism
// is not an exact multiple of parallelism (10 key groups, 4 instances: 9 / (10 / 4) = 4)
// and divides by zero when maxParallelism < parallelism. The form below always stays in
// [0, parallelism) for a key group in [0, maxParallelism).
// NOTE(review): assumes getKeyGroup hands out contiguous, near-equal ranges in instance
// order - confirm against the KeyGroupAssignment implementation.
int instance1 = keyGroup1 * parallelism / maxParallelism; // 45 * 4 / 128 = 1
int instance2 = keyGroup2 * parallelism / maxParallelism; // 12 * 4 / 128 = 0
int instance3 = keyGroup3 * parallelism / maxParallelism; // 89 * 4 / 128 = 2
System.out.println(userKey1 + " -> key group " + keyGroup1 + " -> instance " + instance1);
System.out.println(userKey2 + " -> key group " + keyGroup2 + " -> instance " + instance2);
System.out.println(userKey3 + " -> key group " + keyGroup3 + " -> instance " + instance3);
// Example 3: Task assignment mapping
List<Integer> taskIds = Arrays.asList(100, 101, 102, 103);
Map<Integer, List<Integer>> keyGroupToTasks = KeyGroupAssignment.computeKeyGroupToTask(maxParallelism, taskIds);
// Show which tasks handle each key group
for (Map.Entry<Integer, List<Integer>> entry : keyGroupToTasks.entrySet()) {
System.out.println("Key group " + entry.getKey() + " -> tasks " + entry.getValue());
}/**
* Complete example showing configuration setup for distributed state processing
*/
public class DistributedStateConfiguration {
/**
 * End-to-end walkthrough: builds a configuration, creates one keyed backend per
 * parallel instance, then sends a few sample keys to the backend returned by
 * {@code findBackendForKeyGroup} and writes a value for each of them.
 */
public static void setupDistributedStateProcessing() {
    // 1. Create configuration
    Map<String, String> settings = new HashMap<>();
    settings.put(ConfigKey.STATE_BACKEND_TYPE, "MEMORY");
    settings.put(ConfigKey.STATE_STRATEGY_MODE, "DUAL_VERSION");
    settings.put(ConfigKey.JOB_MAX_PARALLEL, "4");
    settings.put(ConfigKey.NUMBER_PER_CHECKPOINT, "1000");

    // 2. Setup parallelism parameters
    int instanceCount = 4;
    int totalKeyGroups = ConfigHelper.getIntegerOrDefault(settings, ConfigKey.JOB_MAX_PARALLEL, 1);

    // 3. Create state backends for each parallel instance
    AbstractStateBackend sharedBackend = StateBackendBuilder.buildStateBackend(settings);
    KeyStateBackend[] instanceBackends = new KeyStateBackend[instanceCount];
    int slot = 0;
    while (slot < instanceCount) {
        KeyGroup assigned = KeyGroupAssignment.getKeyGroup(totalKeyGroups, instanceCount, slot);
        instanceBackends[slot] = new KeyStateBackend(totalKeyGroups, assigned, sharedBackend);
        int first = assigned.getStartIndex();
        int last = assigned.getEndIndex();
        System.out.println("Backend " + slot + " handles key groups " + first + "-" + last);
        slot++;
    }

    // 4. Route keys to appropriate backend
    String[] sampleKeys = {"user1", "user2", "user3", "user4", "user5"};
    for (int k = 0; k < sampleKeys.length; k++) {
        String sampleKey = sampleKeys[k];
        int groupIndex = KeyGroupAssignment.assignKeyGroupIndexForKey(sampleKey, totalKeyGroups);
        int owner = findBackendForKeyGroup(groupIndex, instanceBackends);
        System.out.println("Key '" + sampleKey + "' -> key group " + groupIndex + " -> backend " + owner);

        // Use the appropriate backend for this key
        KeyStateBackend owningBackend = instanceBackends[owner];
        owningBackend.setCurrentKey(sampleKey);

        // Now you can use states with this backend
        ValueStateDescriptor<String> descriptor =
            ValueStateDescriptor.build("value-state", String.class, "");
        ValueState<String> valueState = owningBackend.getValueState(descriptor);
        valueState.update("value-for-" + sampleKey);
    }
}
/**
 * Finds the position of the first backend whose key-group range contains the given index.
 *
 * @param keyGroupIndex key-group index to look up
 * @param backends backends to search, in order
 * @return array position of the first backend whose inclusive range covers the index
 * @throws IllegalArgumentException if no backend's range contains the index
 */
private static int findBackendForKeyGroup(int keyGroupIndex, KeyStateBackend[] backends) {
    int position = 0;
    while (position < backends.length) {
        KeyGroup range = backends[position].getKeyGroup();
        boolean outside = keyGroupIndex < range.getStartIndex() || keyGroupIndex > range.getEndIndex();
        if (!outside) {
            return position;
        }
        position++;
    }
    throw new IllegalArgumentException("No backend found for key group " + keyGroupIndex);
}
// Utility method for dynamic scaling
/**
 * Prints the key-group range of every instance before and after a change in parallelism.
 *
 * @param oldParallelism number of instances before rescaling
 * @param newParallelism number of instances after rescaling
 * @param maxParallelism maximum parallelism passed through to {@code KeyGroupAssignment.getKeyGroup}
 */
public static void redistributeKeyGroups(int oldParallelism, int newParallelism, int maxParallelism) {
    System.out.println("Redistributing key groups from " + oldParallelism + " to " + newParallelism + " instances:");
    printDistribution("Old distribution:", oldParallelism, maxParallelism);
    printDistribution("New distribution:", newParallelism, maxParallelism);
}

/** Prints a heading followed by one line per instance showing its key-group range. */
private static void printDistribution(String heading, int instances, int maxParallelism) {
    System.out.println(heading);
    int instance = 0;
    while (instance < instances) {
        KeyGroup range = KeyGroupAssignment.getKeyGroup(maxParallelism, instances, instance);
        int first = range.getStartIndex();
        int last = range.getEndIndex();
        System.out.println(" Instance " + instance + ": " + first + "-" + last);
        instance++;
    }
}
}/**
* Best practices for configuration management
*/
public class ConfigurationBestPractices {
// 1. Use configuration builder pattern
/**
 * Fluent builder that collects state configuration entries under the {@code ConfigKey}
 * constants and hands them out as a string-to-string map.
 */
public static class StateConfigBuilder {
    private final Map<String, String> entries = new HashMap<>();

    /** Stores one entry and returns this builder so calls can be chained. */
    private StateConfigBuilder with(String key, String value) {
        entries.put(key, value);
        return this;
    }

    /** Sets the value stored under {@code ConfigKey.STATE_BACKEND_TYPE}. */
    public StateConfigBuilder backendType(String type) {
        return with(ConfigKey.STATE_BACKEND_TYPE, type);
    }

    /** Sets the value stored under {@code ConfigKey.STATE_STRATEGY_MODE}. */
    public StateConfigBuilder strategy(String strategy) {
        return with(ConfigKey.STATE_STRATEGY_MODE, strategy);
    }

    /** Stores the decimal text of {@code parallelism} under {@code ConfigKey.JOB_MAX_PARALLEL}. */
    public StateConfigBuilder maxParallelism(int parallelism) {
        return with(ConfigKey.JOB_MAX_PARALLEL, Integer.toString(parallelism));
    }

    /** Stores the decimal text of {@code number} under {@code ConfigKey.NUMBER_PER_CHECKPOINT}. */
    public StateConfigBuilder numberPerCheckpoint(int number) {
        return with(ConfigKey.NUMBER_PER_CHECKPOINT, Integer.toString(number));
    }

    /**
     * Returns a fresh copy of the collected entries; later changes to this builder do not
     * affect a map that was already returned.
     */
    public Map<String, String> build() {
        Map<String, String> snapshot = new HashMap<>(entries);
        return snapshot;
    }
}
// 2. Environment-specific configuration
/**
 * Builds a preset configuration for a named deployment environment. Every preset uses
 * the "MEMORY" backend type and the "DUAL_VERSION" strategy; only max parallelism and
 * number-per-checkpoint differ.
 *
 * @param environment "development", "testing" or "production", matched case-insensitively
 * @return a new configuration map for that environment
 * @throws NullPointerException if {@code environment} is null
 * @throws IllegalArgumentException if {@code environment} is not one of the known names
 */
public static Map<String, String> createEnvironmentConfig(String environment) {
    if (environment == null) {
        throw new NullPointerException("environment must not be null");
    }
    StateConfigBuilder builder = new StateConfigBuilder()
        .backendType("MEMORY")
        .strategy("DUAL_VERSION");
    // Locale.ROOT keeps the match independent of the JVM default locale. With the
    // no-argument toLowerCase(), a Turkish default locale lower-cases 'I' to a dotless
    // 'i', so "PRODUCTION" would not match "production".
    switch (environment.toLowerCase(java.util.Locale.ROOT)) {
        case "development":
            return builder
                .maxParallelism(2)
                .numberPerCheckpoint(100)
                .build();
        case "testing":
            return builder
                .maxParallelism(4)
                .numberPerCheckpoint(500)
                .build();
        case "production":
            return builder
                .maxParallelism(16)
                .numberPerCheckpoint(1000)
                .build();
        default:
            throw new IllegalArgumentException("Unknown environment: " + environment);
    }
}
// 3. Configuration validation
/**
 * Checks a configuration map and prints a confirmation line when it passes.
 *
 * <p>The backend type must be present and non-blank. Max parallelism is optional, but
 * when present it must parse as a positive integer.
 *
 * @param config configuration map to validate
 * @throws IllegalArgumentException if the backend type is missing or blank, or if max
 *     parallelism is present but is not a positive integer; for an unparsable value the
 *     underlying {@link NumberFormatException} is attached as the cause
 */
public static void validateConfiguration(Map<String, String> config) {
    // Validate required keys
    String backendType = config.get(ConfigKey.STATE_BACKEND_TYPE);
    if (backendType == null || backendType.trim().isEmpty()) {
        throw new IllegalArgumentException("Backend type must be specified");
    }
    // Validate numeric values
    String maxParallelStr = config.get(ConfigKey.JOB_MAX_PARALLEL);
    if (maxParallelStr != null) {
        int maxParallel;
        // Only the parse sits inside the try, so the range check below cannot be
        // mistaken for something this catch is meant to handle.
        try {
            maxParallel = Integer.parseInt(maxParallelStr);
        } catch (NumberFormatException e) {
            // Keep the original exception as the cause so the offending text stays
            // visible in the stack trace.
            throw new IllegalArgumentException("Max parallelism must be a valid integer", e);
        }
        if (maxParallel <= 0) {
            throw new IllegalArgumentException("Max parallelism must be positive");
        }
    }
    System.out.println("Configuration validation passed");
}
}

Install with Tessl CLI
npx tessl i tessl/maven-io-ray--streaming-state