CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-io-ray--streaming-state

State management library for Ray Streaming framework that provides transactional state storage with checkpoint and rollback capabilities for streaming data processing applications

Pending
Overview
Eval results
Files

docs/key-state-management.md

Key State Management

Key-based state management system providing ValueState, ListState, and MapState abstractions with key-group partitioning, transaction support, and fault-tolerant operations for distributed streaming applications.

Capabilities

Key State Backend

Main state backend implementation for managing different types of keyed states with key-group partitioning and transaction support. Note: This class is not thread-safe.

/**
 * Key state backend manager for the different keyed state types:
 * {@link ValueState}, {@link ListState} and {@link MapState}.
 *
 * <p>Extends {@link AbstractKeyStateBackend} and is constructed with the total number of
 * key groups plus the {@link KeyGroup} range assigned to this instance.
 *
 * <p>This class is not thread-safe.
 */
public class KeyStateBackend extends AbstractKeyStateBackend {
    /**
     * Create key state backend with partitioning configuration.
     *
     * @param numberOfKeyGroups Total number of key groups for partitioning
     * @param keyGroup Key group range assigned to this backend instance
     * @param abstractStateBackend Underlying state backend for storage
     */
    public KeyStateBackend(int numberOfKeyGroups, KeyGroup keyGroup, AbstractStateBackend abstractStateBackend);
    
    /**
     * Get or create value state instance.
     *
     * @param <T> Type of the value held by the state
     * @param stateDescriptor Value state descriptor defining the state
     * @return ValueState instance for the specified descriptor
     */
    public <T> ValueState<T> getValueState(ValueStateDescriptor<T> stateDescriptor);
    
    /**
     * Get or create list state instance.
     *
     * @param <T> Type of the list elements
     * @param stateDescriptor List state descriptor defining the state
     * @return ListState instance for the specified descriptor
     */
    public <T> ListState<T> getListState(ListStateDescriptor<T> stateDescriptor);
    
    /**
     * Get or create map state instance.
     *
     * @param <S> Type of the map keys
     * @param <T> Type of the map values
     * @param stateDescriptor Map state descriptor defining the state
     * @return MapState instance for the specified descriptor
     */
    public <S, T> MapState<S, T> getMapState(MapStateDescriptor<S, T> stateDescriptor);
    
    /**
     * Set current processing key for state operations.
     *
     * <p>Concrete counterpart of the abstract
     * {@link AbstractKeyStateBackend#setCurrentKey(Object)}.
     *
     * @param currentKey Current key for state access
     */
    public void setCurrentKey(Object currentKey);
    
    /**
     * Get number of key groups for partitioning.
     *
     * @return Total number of key groups
     */
    public int getNumberOfKeyGroups();
    
    /**
     * Get assigned key group range.
     *
     * @return KeyGroup representing the assigned range
     */
    public KeyGroup getKeyGroup();
    
    /**
     * Close backend and clean up resources.
     */
    public void close();
}

Usage Examples:

import io.ray.streaming.state.backend.*;
import io.ray.streaming.state.keystate.*;
import io.ray.streaming.state.keystate.desc.*;
import io.ray.streaming.state.keystate.state.*;

// Create key state backend
Map<String, String> config = new HashMap<>();
config.put("state.backend.type", "MEMORY");
AbstractStateBackend stateBackend = StateBackendBuilder.buildStateBackend(config);

int numberOfKeyGroups = 128;
KeyGroup keyGroup = new KeyGroup(0, 63); // Handle key groups 0-63
KeyStateBackend keyStateBackend = new KeyStateBackend(numberOfKeyGroups, keyGroup, stateBackend);

// Create and use different state types
ValueStateDescriptor<String> valueDesc = ValueStateDescriptor.build("user-session", String.class, "");
ValueState<String> valueState = keyStateBackend.getValueState(valueDesc);

ListStateDescriptor<String> listDesc = ListStateDescriptor.build("user-events", String.class);
ListState<String> listState = keyStateBackend.getListState(listDesc);

MapStateDescriptor<String, Integer> mapDesc = MapStateDescriptor.build("user-counters", String.class, Integer.class);
MapState<String, Integer> mapState = keyStateBackend.getMapState(mapDesc);

// Set current key and use states
keyStateBackend.setCurrentKey("user123");
valueState.update("session-abc");
listState.add("login-event");
mapState.put("clicks", 5);

// Close when done
keyStateBackend.close();

Abstract Key State Backend

Base class providing transaction support and common state management functionality for key-based state backends. Note: This class is not thread-safe.

/**
 * Base class providing transaction support and state management for key-based state
 * backends.
 *
 * <p>Checkpoint handling is exposed as four methods keyed by a checkpoint id:
 * {@link #finish(long)}, {@link #commit(long)}, {@link #ackCommit(long, long)} and
 * {@link #rollBack(long)}.
 *
 * <p>This class is not thread-safe.
 */
public abstract class AbstractKeyStateBackend {
    /**
     * Create abstract key state backend.
     *
     * @param backend Underlying state backend
     */
    public AbstractKeyStateBackend(AbstractStateBackend backend);
    
    /**
     * Put value into state with descriptor and key.
     *
     * <p>NOTE(review): the descriptor parameter is the raw {@code AbstractStateDescriptor}
     * type, so {@code T} is not checked against the descriptor's value type at compile
     * time - confirm this is intentional.
     *
     * @param <K> Type of the state key
     * @param <T> Type of the value to store
     * @param descriptor State descriptor
     * @param key State key
     * @param value Value to store
     */
    public <K, T> void put(AbstractStateDescriptor descriptor, K key, T value);
    
    /**
     * Get value from state with descriptor and key.
     *
     * <p>NOTE(review): {@code T} appears only in the return type and the descriptor is
     * raw, so the result type is chosen by the caller - confirm callers request the type
     * that was stored.
     *
     * @param <K> Type of the state key
     * @param <T> Type of the retrieved value
     * @param descriptor State descriptor
     * @param key State key
     * @return Retrieved value
     */
    public <K, T> T get(AbstractStateDescriptor descriptor, K key);
    
    /**
     * Finish checkpoint phase - complete batch data saving and serialization.
     *
     * @param checkpointId Checkpoint identifier
     */
    public void finish(long checkpointId);
    
    /**
     * Commit checkpoint phase - persist data (can be async).
     *
     * @param checkpointId Checkpoint identifier
     */
    public void commit(long checkpointId);
    
    /**
     * Acknowledge commit phase - clean up after commit.
     *
     * @param checkpointId Checkpoint identifier
     * @param timeStamp Timestamp of acknowledgment
     */
    public void ackCommit(long checkpointId, long timeStamp);
    
    /**
     * Rollback checkpoint phase - recover from checkpoint.
     *
     * @param checkpointId Checkpoint identifier
     */
    public void rollBack(long checkpointId);
    
    /**
     * Get current processing key.
     *
     * @return Current key object
     */
    public Object getCurrentKey();
    
    /**
     * Set current processing key (abstract method; subclasses supply the implementation).
     *
     * @param currentKey Current key to set
     */
    public abstract void setCurrentKey(Object currentKey);
    
    /**
     * Get current checkpoint ID.
     *
     * @return Current checkpoint ID
     */
    public long getCheckpointId();
    
    /**
     * Set checkpoint ID.
     *
     * @param checkpointId Checkpoint ID to set
     */
    public void setCheckpointId(long checkpointId);
    
    /**
     * Set processing context with checkpoint and key.
     *
     * <p>NOTE(review): presumably a combined form of {@link #setCheckpointId(long)} and
     * {@link #setCurrentKey(Object)} - confirm against the implementation.
     *
     * @param checkpointId Checkpoint identifier
     * @param currentKey Current processing key
     */
    public void setContext(long checkpointId, Object currentKey);
    
    /**
     * Get key group index for current key.
     *
     * @return Key group index
     */
    public int getKeyGroupIndex();
}

State Descriptors

State descriptors define the metadata and configuration for different types of states, providing type safety and unique identification.

Abstract State Descriptor

/**
 * Base class for all state descriptors.
 *
 * @param <S> State type described; the concrete descriptors bind it to
 *     {@code ValueState<T>}, {@code ListState<T>} or {@code MapState<K, V>}
 * @param <T> Value type, whose class is passed to the constructor and returned by
 *     {@link #getType()}
 */
public abstract class AbstractStateDescriptor<S, T> {
    /**
     * Create state descriptor with name and type.
     *
     * @param name Descriptor name
     * @param type Value type class
     */
    public AbstractStateDescriptor(String name, Class<T> type);
    
    /**
     * Get descriptor name.
     *
     * @return Descriptor name
     */
    public String getName();
    
    /**
     * Get value type class.
     *
     * @return Type class
     */
    public Class<T> getType();
    
    /**
     * Get table name for storage.
     *
     * @return Table name
     */
    public String getTableName();
    
    /**
     * Set table name for storage.
     *
     * @param tableName Table name
     */
    public void setTableName(String tableName);
    
    /**
     * Get unique identifier for this descriptor.
     *
     * @return Unique identifier string
     */
    public String getIdentify();
    
    /**
     * Get state type enumeration; each concrete descriptor reports its own
     * {@link StateType} constant.
     *
     * @return StateType enum value
     */
    public abstract StateType getStateType();
}

/**
 * Kinds of keyed state that a state descriptor can describe.
 */
public enum StateType {
    VALUE, // value state
    LIST,  // list state
    MAP;   // map state
}

Value State Descriptor

/**
 * Descriptor for value state configuration.
 *
 * @param <T> Type of the value held by the described {@code ValueState<T>}
 */
public class ValueStateDescriptor<T> extends AbstractStateDescriptor<ValueState<T>, T> {
    /**
     * Create value state descriptor.
     *
     * @param name State name
     * @param type Value type class
     * @param defaultValue Default value when state is empty
     */
    public ValueStateDescriptor(String name, Class<T> type, T defaultValue);
    
    /**
     * Factory method for creating value state descriptor; takes the same arguments as
     * the constructor.
     *
     * @param <T> Type of the value
     * @param name State name
     * @param type Value type class
     * @param defaultValue Default value
     * @return ValueStateDescriptor instance
     */
    public static <T> ValueStateDescriptor<T> build(String name, Class<T> type, T defaultValue);
    
    /**
     * Get default value.
     *
     * @return Default value
     */
    public T getDefaultValue();
    
    /**
     * Get state type.
     *
     * @return StateType.VALUE
     */
    public StateType getStateType();
}

List State Descriptor

/**
 * Descriptor for list state configuration.
 *
 * <p>Only the static {@code build} factories are listed for creating instances; no
 * public constructor is shown in this listing.
 *
 * @param <T> Type of the elements of the described {@code ListState<T>}
 */
public class ListStateDescriptor<T> extends AbstractStateDescriptor<ListState<T>, T> {
    /**
     * Factory method for creating list state descriptor.
     *
     * @param <T> Element type
     * @param name State name
     * @param type Element type class
     * @return ListStateDescriptor instance
     */
    public static <T> ListStateDescriptor<T> build(String name, Class<T> type);
    
    /**
     * Factory method for creating list state descriptor with operator flag.
     *
     * @param <T> Element type
     * @param name State name
     * @param type Element type class
     * @param isOperatorList Whether this is an operator-level list state
     * @return ListStateDescriptor instance
     */
    public static <T> ListStateDescriptor<T> build(String name, Class<T> type, boolean isOperatorList);
    
    /**
     * Check if this is an operator list.
     *
     * @return True if operator list
     */
    public boolean isOperatorList();
    
    /**
     * Get partition index.
     *
     * @return Partition index
     */
    public int getIndex();
    
    /**
     * Set partition index.
     *
     * @param index Partition index
     */
    public void setIndex(int index);
    
    /**
     * Get partition number.
     *
     * @return Partition number
     */
    public int getPartitionNumber();
    
    /**
     * Set partition number.
     *
     * @param number Partition number
     */
    public void setPartitionNumber(int number);
    
    /**
     * Get state type.
     *
     * @return StateType.LIST
     */
    public StateType getStateType();
}

Map State Descriptor

/**
 * Descriptor for map state configuration.
 *
 * <p>The superclass value type parameter is bound to {@code V}, so the inherited
 * {@code getType()} is typed as {@code Class<V>} (the map's value class).
 *
 * @param <K> Type of the map keys
 * @param <V> Type of the map values
 */
public class MapStateDescriptor<K, V> extends AbstractStateDescriptor<MapState<K, V>, V> {
    /**
     * Create map state descriptor.
     *
     * @param name State name
     * @param keyType Key type class
     * @param valueType Value type class
     */
    public MapStateDescriptor(String name, Class<K> keyType, Class<V> valueType);
    
    /**
     * Factory method for creating map state descriptor; takes the same arguments as the
     * constructor.
     *
     * @param <K> Type of the map keys
     * @param <V> Type of the map values
     * @param name State name
     * @param keyType Key type class
     * @param valueType Value type class
     * @return MapStateDescriptor instance
     */
    public static <K, V> MapStateDescriptor<K, V> build(String name, Class<K> keyType, Class<V> valueType);
    
    /**
     * Get state type.
     *
     * @return StateType.MAP
     */
    public StateType getStateType();
}

Usage Examples:

// Create descriptors for different state types.
// The third argument of ValueStateDescriptor.build is the default value ("anonymous").
ValueStateDescriptor<String> userNameDesc = ValueStateDescriptor.build("user-name", String.class, "anonymous");

// Two-argument factory, then the overload whose boolean argument is the isOperatorList flag.
// Note: operatorListDesc is only constructed here; it is not used further in this snippet.
ListStateDescriptor<String> eventListDesc = ListStateDescriptor.build("events", String.class);
ListStateDescriptor<String> operatorListDesc = ListStateDescriptor.build("operator-events", String.class, true);

// Map descriptor: key class first, value class second.
MapStateDescriptor<String, Integer> counterMapDesc = MapStateDescriptor.build("counters", String.class, Integer.class);

// Use descriptors with key state backend.
// keyStateBackend is not declared in this snippet; a KeyStateBackend instance must already be in scope.
ValueState<String> userNameState = keyStateBackend.getValueState(userNameDesc);
ListState<String> eventListState = keyStateBackend.getListState(eventListDesc);
MapState<String, Integer> counterMapState = keyStateBackend.getMapState(counterMapDesc);

Install with Tessl CLI

npx tessl i tessl/maven-io-ray--streaming-state

docs

backend-management.md

configuration-key-groups.md

index.md

key-state-management.md

serialization-framework.md

state-types-operations.md

transaction-management.md

tile.json