CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-ray--streaming-state

State management library for the Ray Streaming framework, providing transactional state storage with checkpoint and rollback capabilities for streaming data-processing applications.

Pending
Overview
Eval results
Files

configuration-key-groups.mddocs/

Configuration and Key Groups

Configuration system and key-group assignment algorithms enabling distributed state partitioning, parallel processing optimization, and configurable backend selection with built-in helper utilities.

Capabilities

Configuration Keys

Central configuration constants and helper methods for state backend configuration and parameter management.

/**
 * Central registry of the configuration-map keys used by the state library,
 * together with typed accessors that read those keys from a
 * {@code Map<String, String>}.
 *
 * <p>This is an API listing: only signatures are shown, method bodies are omitted.
 * Behaviour when a key is absent from the map is not shown here.
 */
public class ConfigKey {
    /** Key whose value selects the state backend type (for example {@code "MEMORY"}). */
    public static final String STATE_BACKEND_TYPE = "state.backend.type";
    
    /** Key whose value names the state table. */
    public static final String STATE_TABLE_NAME = "state.table.name";
    
    /** Key whose value selects the state strategy mode (for example {@code "DUAL_VERSION"}). */
    public static final String STATE_STRATEGY_MODE = "state.strategy.mode";
    
    /** Key for the "number per checkpoint" setting; the value is read back as an {@code int}. */
    public static final String NUMBER_PER_CHECKPOINT = "number.per.checkpoint";
    
    /** Key for the job's maximum parallelism. */
    public static final String JOB_MAX_PARALLEL = "job.max.parallel";
    
    /**
     * String delimiter for internal use.
     *
     * <p>NOTE(review): unlike the constants above, this is described as a delimiter
     * rather than a configuration key, yet the value shown reads like a key name.
     * Confirm the actual value against the library source before relying on it.
     */
    public static final String DELIMITER = "delimiter.string";
    
    /**
     * Reads the state strategy from the configuration map.
     *
     * <p>Despite the {@code Enum} suffix in the method name, the declared return
     * type is {@code String}.
     *
     * @param config Configuration map
     * @return State strategy string value
     */
    public static String getStateStrategyEnum(Map<String, String> config);
    
    /**
     * Reads the backend type from the configuration map.
     *
     * @param config Configuration map
     * @return Backend type string value
     */
    public static String getBackendType(Map<String, String> config);
    
    /**
     * Reads the "number per checkpoint" setting from the configuration map.
     *
     * @param config Configuration map
     * @return Number per checkpoint value, as an {@code int}
     */
    public static int getNumberPerCheckpoint(Map<String, String> config);
    
    /**
     * Reads the state table name from the configuration map.
     *
     * @param config Configuration map
     * @return State table name
     */
    public static String getStateTableName(Map<String, String> config);
}

Usage Examples:

import io.ray.streaming.state.config.ConfigKey;
import java.util.Map;
import java.util.HashMap;

// Create the configuration map, using the ConfigKey constants as keys.
// NOTE: the imports for AbstractStateBackend and StateBackendBuilder are not
// shown in this snippet and must be added by the caller.
Map<String, String> config = new HashMap<>();
config.put(ConfigKey.STATE_BACKEND_TYPE, "MEMORY");
config.put(ConfigKey.STATE_STRATEGY_MODE, "DUAL_VERSION");
config.put(ConfigKey.STATE_TABLE_NAME, "user-state-table");
config.put(ConfigKey.NUMBER_PER_CHECKPOINT, "1000");
config.put(ConfigKey.JOB_MAX_PARALLEL, "16");

// Retrieve configuration values through the typed accessors
// (the trailing comments show the value each call is expected to return).
String backendType = ConfigKey.getBackendType(config); // "MEMORY"
String strategy = ConfigKey.getStateStrategyEnum(config); // "DUAL_VERSION"
String tableName = ConfigKey.getStateTableName(config); // "user-state-table"
int numberPerCheckpoint = ConfigKey.getNumberPerCheckpoint(config); // 1000

// Hand the same map to the state backend builder to obtain a backend instance.
AbstractStateBackend backend = StateBackendBuilder.buildStateBackend(config);

Configuration Helper Utilities

Helper methods for processing configuration maps with type conversion and default value support.

/**
 * Helper methods for reading typed values out of a configuration map, falling
 * back to a caller-supplied default.
 *
 * <p>This is an API listing: only signatures are shown, method bodies are omitted.
 * Note that both methods declare the map parameter as a raw {@code Map}.
 */
public class ConfigHelper {
    /**
     * Get integer value from configuration with default fallback.
     *
     * @param config Configuration map
     * @param configKey Configuration key to look up
     * @param defaultValue Default value if key not found or invalid
     * @return Integer value or default
     */
    public static int getIntegerOrDefault(Map config, String configKey, int defaultValue);
    
    /**
     * Get string value from configuration with default fallback.
     *
     * @param config Configuration map
     * @param configKey Configuration key to look up
     * @param defaultValue Default value if key not found
     * @return String value or default
     */
    public static String getStringOrDefault(Map config, String configKey, String defaultValue);
}

Usage Examples:

import io.ray.streaming.state.config.ConfigHelper;

// Safe configuration retrieval with defaults.
// getConfigurationFromSource() is a placeholder for however the application
// loads its settings; java.util.Map must also be imported (omitted here).
Map<String, String> config = getConfigurationFromSource();

// Each call supplies the value to use when the key cannot be read.
int parallelism = ConfigHelper.getIntegerOrDefault(config, "parallelism", 4);
String environment = ConfigHelper.getStringOrDefault(config, "environment", "production");
int bufferSize = ConfigHelper.getIntegerOrDefault(config, "buffer.size", 8192);

// Handle missing or malformed configuration gracefully
int timeout = ConfigHelper.getIntegerOrDefault(config, "timeout", 5000); // Will use 5000 if "timeout" is missing or not a valid integer

Key Group Management

Key group system providing scalable partitioning for distributed state processing across multiple parallel instances.

Key Group Class

/**
 * A contiguous range of key-group indexes, with both bounds inclusive, used to
 * partition state processing.
 *
 * <p>This is an API listing: only signatures are shown, method bodies are omitted.
 */
public class KeyGroup {
    /**
     * Create key group with inclusive range.
     *
     * @param startIndex Start index (inclusive)
     * @param endIndex End index (inclusive)
     */
    public KeyGroup(int startIndex, int endIndex);
    
    /**
     * Get number of key-groups in this range.
     *
     * <p>Because both bounds are inclusive, {@code new KeyGroup(0, 31).size()}
     * is {@code 32}.
     *
     * @return Number of key-groups
     */
    public int size();
    
    /**
     * Get start index of range.
     *
     * @return Start index (inclusive)
     */
    public int getStartIndex();
    
    /**
     * Get end index of range.
     *
     * @return End index (inclusive)
     */
    public int getEndIndex();
}

Usage Examples:

// Create key groups for different parallel instances.
// Together the four ranges cover indexes 0-127, i.e. 128 key groups in total.
KeyGroup keyGroup1 = new KeyGroup(0, 31);   // Handles key groups 0-31 (32 groups)
KeyGroup keyGroup2 = new KeyGroup(32, 63);  // Handles key groups 32-63 (32 groups)
KeyGroup keyGroup3 = new KeyGroup(64, 95);  // Handles key groups 64-95 (32 groups)
KeyGroup keyGroup4 = new KeyGroup(96, 127); // Handles key groups 96-127 (32 groups)

// Check key group properties
int size1 = keyGroup1.size();           // 32
int start1 = keyGroup1.getStartIndex(); // 0
int end1 = keyGroup1.getEndIndex();     // 31

// Use with key state backend.
// The first argument (128) matches the total number of key groups above.
// NOTE: `stateBackend` is not defined in this snippet; presumably it is an
// AbstractStateBackend obtained from StateBackendBuilder - confirm before use.
KeyStateBackend backend1 = new KeyStateBackend(128, keyGroup1, stateBackend);
KeyStateBackend backend2 = new KeyStateBackend(128, keyGroup2, stateBackend);

Key Group Assignment Algorithms

/**
 * Key-group assignment algorithms for distributed processing: mapping operator
 * instances to key-group ranges, keys to key-group indexes, and key groups to tasks.
 *
 * <p>This is an API listing: only signatures are shown, method bodies are omitted.
 */
public class KeyGroupAssignment {
    /**
     * Compute key-group range for specific operator instance.
     *
     * <p>For example, with {@code maxParallelism = 128} and {@code parallelism = 4},
     * instance {@code 0} is documented as receiving the range 0-31.
     *
     * @param maxParallelism Maximum parallelism (total key groups)
     * @param parallelism Current parallelism level
     * @param index Operator instance index (0-based)
     * @return KeyGroup representing assigned range
     */
    public static KeyGroup getKeyGroup(int maxParallelism, int parallelism, int index);
    
    /**
     * Assign key to specific key-group index using hash function.
     *
     * @param key Key object to assign
     * @param maxParallelism Maximum parallelism (total key groups)
     * @return Key-group index for the key
     */
    public static int assignKeyGroupIndexForKey(Object key, int maxParallelism);
    
    /**
     * Compute mapping from key-groups to task instances.
     *
     * @param maxParallelism Maximum parallelism (total key groups)
     * @param targetTasks List of target task IDs
     * @return Map from key-group index to list of task IDs
     */
    public static Map<Integer, List<Integer>> computeKeyGroupToTask(int maxParallelism, List<Integer> targetTasks);
}

Advanced Usage Examples:

import io.ray.streaming.state.keystate.KeyGroupAssignment;
import java.util.List;
import java.util.Arrays;
import java.util.Map;

// Example 1: Distribute key groups across parallel instances
// NOTE: the import for KeyGroup is not shown in this snippet.
int maxParallelism = 128;  // Total key groups
int parallelism = 4;       // Number of parallel instances

// Calculate key group ranges for each instance
KeyGroup range0 = KeyGroupAssignment.getKeyGroup(maxParallelism, parallelism, 0); // 0-31
KeyGroup range1 = KeyGroupAssignment.getKeyGroup(maxParallelism, parallelism, 1); // 32-63
KeyGroup range2 = KeyGroupAssignment.getKeyGroup(maxParallelism, parallelism, 2); // 64-95
KeyGroup range3 = KeyGroupAssignment.getKeyGroup(maxParallelism, parallelism, 3); // 96-127

System.out.println("Instance 0 handles key groups: " + range0.getStartIndex() + "-" + range0.getEndIndex());
System.out.println("Instance 1 handles key groups: " + range1.getStartIndex() + "-" + range1.getEndIndex());

// Example 2: Determine which instance should handle a specific key
String userKey1 = "user123";
String userKey2 = "user456";
String userKey3 = "user789";

// The index values in the trailing comments are illustrative only ("e.g.");
// the real values depend on the hash of each key.
int keyGroup1 = KeyGroupAssignment.assignKeyGroupIndexForKey(userKey1, maxParallelism); // e.g., 45
int keyGroup2 = KeyGroupAssignment.assignKeyGroupIndexForKey(userKey2, maxParallelism); // e.g., 12
int keyGroup3 = KeyGroupAssignment.assignKeyGroupIndexForKey(userKey3, maxParallelism); // e.g., 89

// Determine which instance handles each key.
// NOTE(review): this integer-division shortcut assumes maxParallelism is an exact
// multiple of parallelism (128 / 4 = 32 here). When it is not, the truncated
// quotient may not line up with the ranges returned by getKeyGroup - in that case
// compare the index against each range's start/end instead. Confirm against the
// getKeyGroup implementation.
int instance1 = keyGroup1 / (maxParallelism / parallelism); // 45 / 32 = 1
int instance2 = keyGroup2 / (maxParallelism / parallelism); // 12 / 32 = 0
int instance3 = keyGroup3 / (maxParallelism / parallelism); // 89 / 32 = 2

System.out.println(userKey1 + " -> key group " + keyGroup1 + " -> instance " + instance1);
System.out.println(userKey2 + " -> key group " + keyGroup2 + " -> instance " + instance2);
System.out.println(userKey3 + " -> key group " + keyGroup3 + " -> instance " + instance3);

// Example 3: Task assignment mapping
List<Integer> taskIds = Arrays.asList(100, 101, 102, 103);
Map<Integer, List<Integer>> keyGroupToTasks = KeyGroupAssignment.computeKeyGroupToTask(maxParallelism, taskIds);

// Show which tasks handle each key group
for (Map.Entry<Integer, List<Integer>> entry : keyGroupToTasks.entrySet()) {
    System.out.println("Key group " + entry.getKey() + " -> tasks " + entry.getValue());
}

Complete Configuration Example

/**
 * End-to-end walkthrough of distributed state processing: builds a configuration,
 * creates one key-state backend per parallel instance, and routes sample keys to
 * the backend that owns their key group.
 */
public class DistributedStateConfiguration {

    /**
     * Runs the walkthrough: configure, build one backend per instance, then write a
     * value state for each sample key through the backend owning that key's group.
     */
    public static void setupDistributedStateProcessing() {
        // Step 1: assemble the configuration map.
        Map<String, String> settings = new HashMap<>();
        settings.put(ConfigKey.STATE_BACKEND_TYPE, "MEMORY");
        settings.put(ConfigKey.STATE_STRATEGY_MODE, "DUAL_VERSION");
        settings.put(ConfigKey.JOB_MAX_PARALLEL, "4");
        settings.put(ConfigKey.NUMBER_PER_CHECKPOINT, "1000");

        // Step 2: derive the parallelism parameters.
        int totalKeyGroups = ConfigHelper.getIntegerOrDefault(settings, ConfigKey.JOB_MAX_PARALLEL, 1);
        int instanceCount = 4;

        // Step 3: build the underlying backend, then wrap it once per parallel instance.
        AbstractStateBackend sharedBackend = StateBackendBuilder.buildStateBackend(settings);
        KeyStateBackend[] perInstance = new KeyStateBackend[instanceCount];

        int slot = 0;
        while (slot < instanceCount) {
            KeyGroup assigned = KeyGroupAssignment.getKeyGroup(totalKeyGroups, instanceCount, slot);
            perInstance[slot] = new KeyStateBackend(totalKeyGroups, assigned, sharedBackend);

            int first = assigned.getStartIndex();
            int last = assigned.getEndIndex();
            System.out.println("Backend " + slot + " handles key groups " + first + "-" + last);
            slot++;
        }

        // Step 4: route each sample key to the backend that owns its key group.
        String[] sampleKeys = {"user1", "user2", "user3", "user4", "user5"};

        for (int k = 0; k < sampleKeys.length; k++) {
            String currentKey = sampleKeys[k];
            int groupIndex = KeyGroupAssignment.assignKeyGroupIndexForKey(currentKey, totalKeyGroups);
            int ownerIndex = findBackendForKeyGroup(groupIndex, perInstance);

            System.out.println(
                "Key '" + currentKey + "' -> key group " + groupIndex + " -> backend " + ownerIndex);

            // Select the owning backend and bind it to the key being processed.
            KeyStateBackend owner = perInstance[ownerIndex];
            owner.setCurrentKey(currentKey);

            // With the key set, obtain a value state from the backend and write to it.
            ValueStateDescriptor<String> descriptor =
                ValueStateDescriptor.build("value-state", String.class, "");
            ValueState<String> valueState = owner.getValueState(descriptor);
            valueState.update("value-for-" + currentKey);
        }
    }

    /**
     * Finds the position of the backend whose inclusive key-group range contains
     * the given index.
     *
     * @param keyGroupIndex key-group index to locate
     * @param backends candidate backends, searched in array order
     * @return array index of the first backend whose range contains {@code keyGroupIndex}
     * @throws IllegalArgumentException if no backend's range contains the index
     */
    private static int findBackendForKeyGroup(int keyGroupIndex, KeyStateBackend[] backends) {
        int position = 0;
        for (KeyStateBackend candidate : backends) {
            KeyGroup range = candidate.getKeyGroup();
            boolean outside =
                keyGroupIndex < range.getStartIndex() || keyGroupIndex > range.getEndIndex();
            if (!outside) {
                return position;
            }
            position++;
        }
        throw new IllegalArgumentException("No backend found for key group " + keyGroupIndex);
    }

    /**
     * Utility for dynamic scaling: prints the key-group range of every instance
     * before and after a change of parallelism.
     *
     * @param oldParallelism number of instances before rescaling
     * @param newParallelism number of instances after rescaling
     * @param maxParallelism maximum parallelism (total key groups)
     */
    public static void redistributeKeyGroups(int oldParallelism, int newParallelism, int maxParallelism) {
        System.out.println("Redistributing key groups from " + oldParallelism + " to " + newParallelism + " instances:");

        printDistribution("Old distribution:", maxParallelism, oldParallelism);
        printDistribution("New distribution:", maxParallelism, newParallelism);
    }

    /** Prints a heading followed by one line per instance showing its key-group range. */
    private static void printDistribution(String heading, int maxParallelism, int parallelism) {
        System.out.println(heading);
        for (int instance = 0; instance < parallelism; instance++) {
            KeyGroup range = KeyGroupAssignment.getKeyGroup(maxParallelism, parallelism, instance);
            System.out.println("  Instance " + instance + ": " + range.getStartIndex() + "-" + range.getEndIndex());
        }
    }
}

Configuration Best Practices

/**
 * Best practices for configuration management: a fluent builder for the
 * configuration map, environment-specific presets, and up-front validation.
 */
public class ConfigurationBestPractices {
    
    /**
     * 1. Configuration builder pattern.
     *
     * <p>Collects settings through chained calls and produces a
     * {@code Map<String, String>} keyed by the {@code ConfigKey} constants.
     */
    public static class StateConfigBuilder {
        private final Map<String, String> config = new HashMap<>();
        
        /**
         * Sets the state backend type.
         *
         * @param type backend type value stored under {@code ConfigKey.STATE_BACKEND_TYPE}
         * @return this builder, for chaining
         */
        public StateConfigBuilder backendType(String type) {
            config.put(ConfigKey.STATE_BACKEND_TYPE, type);
            return this;
        }
        
        /**
         * Sets the state strategy mode.
         *
         * @param strategy strategy value stored under {@code ConfigKey.STATE_STRATEGY_MODE}
         * @return this builder, for chaining
         */
        public StateConfigBuilder strategy(String strategy) {
            config.put(ConfigKey.STATE_STRATEGY_MODE, strategy);
            return this;
        }
        
        /**
         * Sets the maximum parallelism; the number is stored as its decimal string.
         *
         * @param parallelism value stored under {@code ConfigKey.JOB_MAX_PARALLEL}
         * @return this builder, for chaining
         */
        public StateConfigBuilder maxParallelism(int parallelism) {
            config.put(ConfigKey.JOB_MAX_PARALLEL, String.valueOf(parallelism));
            return this;
        }
        
        /**
         * Sets the number-per-checkpoint value; the number is stored as its decimal string.
         *
         * @param number value stored under {@code ConfigKey.NUMBER_PER_CHECKPOINT}
         * @return this builder, for chaining
         */
        public StateConfigBuilder numberPerCheckpoint(int number) {
            config.put(ConfigKey.NUMBER_PER_CHECKPOINT, String.valueOf(number));
            return this;
        }
        
        /**
         * Returns a copy of the collected settings, so later builder calls do not
         * affect a map that has already been handed out.
         *
         * @return a new map containing the settings collected so far
         */
        public Map<String, String> build() {
            return new HashMap<>(config);
        }
    }
    
    /**
     * 2. Environment-specific configuration.
     *
     * <p>All presets use the {@code "MEMORY"} backend and the {@code "DUAL_VERSION"}
     * strategy; they differ in maximum parallelism and number per checkpoint.
     * The environment name is matched case-insensitively.
     *
     * @param environment one of {@code "development"}, {@code "testing"} or {@code "production"}
     * @return the configuration map for that environment
     * @throws IllegalArgumentException if the environment name is not recognised
     */
    public static Map<String, String> createEnvironmentConfig(String environment) {
        StateConfigBuilder builder = new StateConfigBuilder()
            .backendType("MEMORY")
            .strategy("DUAL_VERSION");

        // Lower-case with Locale.ROOT so matching does not depend on the JVM's default
        // locale (under a Turkish locale, "PRODUCTION".toLowerCase() contains a dotless i
        // and would fall through to the default branch).
        switch (environment.toLowerCase(java.util.Locale.ROOT)) {
            case "development":
                return builder
                    .maxParallelism(2)
                    .numberPerCheckpoint(100)
                    .build();
                    
            case "testing":
                return builder
                    .maxParallelism(4)
                    .numberPerCheckpoint(500)
                    .build();
                    
            case "production":
                return builder
                    .maxParallelism(16)
                    .numberPerCheckpoint(1000)
                    .build();
                    
            default:
                throw new IllegalArgumentException("Unknown environment: " + environment);
        }
    }
    
    /**
     * 3. Configuration validation.
     *
     * <p>Requires a non-blank backend type and, when a maximum parallelism is present,
     * requires it to be a positive integer. Prints a confirmation line on success.
     *
     * @param config configuration map to validate
     * @throws IllegalArgumentException if the backend type is missing or blank, or if the
     *     maximum parallelism is present but is not a positive integer
     */
    public static void validateConfiguration(Map<String, String> config) {
        // Validate required keys
        String backendType = config.get(ConfigKey.STATE_BACKEND_TYPE);
        if (backendType == null || backendType.trim().isEmpty()) {
            throw new IllegalArgumentException("Backend type must be specified");
        }
        
        // Validate numeric values (max parallelism is optional)
        String maxParallelStr = config.get(ConfigKey.JOB_MAX_PARALLEL);
        if (maxParallelStr != null) {
            int maxParallel;
            try {
                maxParallel = Integer.parseInt(maxParallelStr);
            } catch (NumberFormatException e) {
                // Keep the parse failure as the cause so the offending input stays diagnosable.
                throw new IllegalArgumentException("Max parallelism must be a valid integer", e);
            }
            // Range check sits outside the try so the catch guards only the parse.
            if (maxParallel <= 0) {
                throw new IllegalArgumentException("Max parallelism must be positive");
            }
        }
        
        System.out.println("Configuration validation passed");
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-io-ray--streaming-state

docs

backend-management.md

configuration-key-groups.md

index.md

key-state-management.md

serialization-framework.md

state-types-operations.md

transaction-management.md

tile.json