tessl/maven-io-ray--streaming-state

State management library for the Ray Streaming framework that provides transactional state storage with checkpoint and rollback capabilities for streaming data processing applications

Describes: mavenpkg:maven/io.ray/streaming-state@1.10.x

To install, run

npx @tessl/cli install tessl/maven-io-ray--streaming-state@1.10.0


Ray Streaming State

Ray Streaming State is a state management library for the Ray Streaming framework. It provides transactional state storage with checkpoint and rollback capabilities, letting streaming applications keep persistent state across distributed processing nodes and recover it after failures.

Package Information

  • Package Name: streaming-state
  • Package Type: maven
  • Language: Java
  • Installation: Add to your Maven pom.xml:
<dependency>
    <groupId>io.ray</groupId>
    <artifactId>streaming-state</artifactId>
    <version>1.10.0</version>
</dependency>

Core Imports

import io.ray.streaming.state.backend.StateBackendBuilder;
import io.ray.streaming.state.backend.KeyStateBackend;
import io.ray.streaming.state.keystate.desc.ValueStateDescriptor;
import io.ray.streaming.state.keystate.desc.ListStateDescriptor;
import io.ray.streaming.state.keystate.desc.MapStateDescriptor;
import io.ray.streaming.state.keystate.state.ValueState;
import io.ray.streaming.state.keystate.state.ListState;
import io.ray.streaming.state.keystate.state.MapState;

Basic Usage

import io.ray.streaming.state.backend.*;
import io.ray.streaming.state.keystate.desc.*;
import io.ray.streaming.state.keystate.state.*;
import io.ray.streaming.state.keystate.KeyGroup;
import java.util.Map;
import java.util.HashMap;

public class BasicUsage {
    public static void main(String[] args) {
        // Create a memory-backed state backend from string configuration
        Map<String, String> config = new HashMap<>();
        config.put("state.backend.type", "MEMORY");
        AbstractStateBackend stateBackend = StateBackendBuilder.buildStateBackend(config);

        // Create a key state backend: keys are spread over 128 key groups, and this
        // instance owns key groups 0-63 (the first half of the key space)
        int numberOfKeyGroups = 128;
        KeyGroup keyGroup = new KeyGroup(0, 63);
        KeyStateBackend keyStateBackend = new KeyStateBackend(numberOfKeyGroups, keyGroup, stateBackend);

        // Create a value state named "user-name" whose default value is "unknown"
        ValueStateDescriptor<String> valueDesc = ValueStateDescriptor.build("user-name", String.class, "unknown");
        ValueState<String> valueState = keyStateBackend.getValueState(valueDesc);

        // Select the key that subsequent state reads and writes apply to
        keyStateBackend.setCurrentKey("user123");
        valueState.update("Alice");
        String name = valueState.get(); // "Alice"

        // Checkpoint lifecycle: finish, then commit, then acknowledge the commit
        long checkpointId = 1001L;
        keyStateBackend.finish(checkpointId);
        keyStateBackend.commit(checkpointId);
        keyStateBackend.ackCommit(checkpointId, System.currentTimeMillis());
    }
}

Architecture

Ray Streaming State is built around several key architectural components:

  • State Backend System: Pluggable storage backends (currently Memory-based) with configurable strategies
  • Transaction Management: Checkpoint protocol built on finish, commit, and ackCommit, with rollBack to restore a checkpoint after a failure
  • Key-Group Partitioning: Scalable key distribution across processing nodes for parallel state management
  • State Types: Three core state abstractions (ValueState, ListState, MapState) supporting different data patterns
  • Serialization Framework: Pluggable serialization system with default FST-based implementation
  • Storage Strategies: DUAL_VERSION (rollback support) and SINGLE_VERSION (MVCC optimization) approaches

Capabilities

State Backend Management

Core state backend system providing pluggable storage implementations, configuration management, and factory methods for creating state backends with different strategies and storage types.

public class StateBackendBuilder {
    public static AbstractStateBackend buildStateBackend(Map<String, String> config);
}

public enum BackendType {
    MEMORY
}

public enum StateStrategy {
    DUAL_VERSION,
    SINGLE_VERSION  
}
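The builder takes plain string configuration, and the two enums above are the values it selects between. The sketch below is a hypothetical helper, not part of the library, showing one way such string entries can map onto those enums. The key names come from ConfigKey; the defaults (MEMORY and DUAL_VERSION when a key is absent) are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Local copies of the enums listed above, so the sketch compiles on its own.
enum BackendType { MEMORY }

enum StateStrategy { DUAL_VERSION, SINGLE_VERSION }

// Hypothetical helper, not a library class: maps string config entries onto the enums.
// The defaults used when a key is missing (MEMORY, DUAL_VERSION) are assumptions.
final class BackendConfigSketch {
  static final String STATE_BACKEND_TYPE = "state.backend.type";
  static final String STATE_STRATEGY_MODE = "state.strategy.mode";

  static BackendType backendType(Map<String, String> config) {
    String raw = config.getOrDefault(STATE_BACKEND_TYPE, BackendType.MEMORY.name());
    // Enum.valueOf throws IllegalArgumentException for names it does not know.
    return BackendType.valueOf(raw);
  }

  static StateStrategy stateStrategy(Map<String, String> config) {
    String raw = config.getOrDefault(STATE_STRATEGY_MODE, StateStrategy.DUAL_VERSION.name());
    return StateStrategy.valueOf(raw);
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put(STATE_BACKEND_TYPE, "MEMORY");
    config.put(STATE_STRATEGY_MODE, "SINGLE_VERSION");
    System.out.println(backendType(config) + " / " + stateStrategy(config)); // MEMORY / SINGLE_VERSION
  }
}
```

Using Enum.valueOf means an unsupported name fails fast with an exception instead of silently falling back to a default.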


Key State Management

Key-based state management system providing ValueState, ListState, and MapState abstractions with key-group partitioning, transaction support, and fault-tolerant operations for distributed streaming applications.

public class KeyStateBackend extends AbstractKeyStateBackend {
    public <T> ValueState<T> getValueState(ValueStateDescriptor<T> stateDescriptor);
    public <T> ListState<T> getListState(ListStateDescriptor<T> stateDescriptor);
    public <S, T> MapState<S, T> getMapState(MapStateDescriptor<S, T> stateDescriptor);
    public void setCurrentKey(Object currentKey);
}
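To show what setCurrentKey means for a keyed state, here is a simplified, hypothetical model (KeyedValueStateSketch is not a library class). One state object holds a value per key, and every get or update applies to the key selected last. In the library the current key is set on KeyStateBackend rather than on each state, and returning the descriptor's default for a key that was never written is an assumption modeled on the default passed to ValueStateDescriptor.build.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified keyed ValueState: reads and writes are scoped to the
// key most recently passed to setCurrentKey. Not Ray's implementation.
final class KeyedValueStateSketch<T> {
  private final Map<Object, T> valuesByKey = new HashMap<>();
  private final T defaultValue;
  private Object currentKey;

  KeyedValueStateSketch(T defaultValue) {
    this.defaultValue = defaultValue;
  }

  void setCurrentKey(Object key) {
    this.currentKey = key;
  }

  // Value stored for the current key, or the default if none was written yet.
  T get() {
    requireKey();
    return valuesByKey.getOrDefault(currentKey, defaultValue);
  }

  // Overwrites the value for the current key only.
  void update(T value) {
    requireKey();
    valuesByKey.put(currentKey, value);
  }

  private void requireKey() {
    if (currentKey == null) {
      throw new IllegalStateException("setCurrentKey must be called before accessing state");
    }
  }

  public static void main(String[] args) {
    KeyedValueStateSketch<String> name = new KeyedValueStateSketch<>("unknown");
    name.setCurrentKey("user123");
    name.update("Alice");
    name.setCurrentKey("user456");
    System.out.println(name.get()); // unknown
    name.setCurrentKey("user123");
    System.out.println(name.get()); // Alice
  }
}
```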


State Types and Operations

Three core state abstractions (ValueState for single values, ListState for ordered collections, and MapState for key-value mappings), each providing specialized operations and transaction support for a different data access pattern.

public interface ValueState<T> extends UnaryState<T> {
    void update(T value);
}

public interface ListState<T> extends UnaryState<List<T>> {
    void add(T value);
    void update(List<T> list);
}

public interface MapState<K, V> extends UnaryState<Map<K, V>> {
    V get(K key);
    void put(K key, V value);
    void remove(K key);
    void putAll(Map<K, V> map);
}
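The list and map interfaces above can be exercised with a minimal in-memory sketch. InMemoryListState and InMemoryMapState are hypothetical single-key implementations with no key scoping or transactions, and the UnaryState declaration with a get() method is an assumption inferred from valueState.get() in the Basic Usage.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Assumed shape of UnaryState: a state exposing its whole value via get().
interface UnaryState<O> {
  O get();
}

// Local copies of the interfaces listed above.
interface ListState<T> extends UnaryState<List<T>> {
  void add(T value);
  void update(List<T> list);
}

interface MapState<K, V> extends UnaryState<Map<K, V>> {
  V get(K key);
  void put(K key, V value);
  void remove(K key);
  void putAll(Map<K, V> map);
}

// Hypothetical in-memory implementations for a single key.
final class InMemoryListState<T> implements ListState<T> {
  private final List<T> items = new ArrayList<>();

  @Override public List<T> get() { return new ArrayList<>(items); } // defensive copy
  @Override public void add(T value) { items.add(value); }
  @Override public void update(List<T> list) { items.clear(); items.addAll(list); }
}

final class InMemoryMapState<K, V> implements MapState<K, V> {
  private final Map<K, V> entries = new HashMap<>();

  @Override public Map<K, V> get() { return new HashMap<>(entries); } // defensive copy
  @Override public V get(K key) { return entries.get(key); }
  @Override public void put(K key, V value) { entries.put(key, value); }
  @Override public void remove(K key) { entries.remove(key); }
  @Override public void putAll(Map<K, V> map) { entries.putAll(map); }
}

final class StateTypesDemo {
  public static void main(String[] args) {
    ListState<String> events = new InMemoryListState<>();
    events.add("click");
    events.add("scroll");
    System.out.println(events.get()); // [click, scroll]

    MapState<String, Integer> counts = new InMemoryMapState<>();
    counts.put("click", 1);
    counts.put("scroll", 2);
    counts.remove("scroll");
    System.out.println(counts.get("click") + " " + counts.get().size()); // 1 1
  }
}
```

Returning copies from get() keeps callers from mutating state behind the state object's back; that is a design choice of this sketch only.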


Transaction Management

Transaction system built on four operations: finish, commit, and ackCommit along the checkpoint path, and rollBack for restoring a checkpoint. Together they keep state consistent across failures and support checkpoint-based recovery.

public interface StateStoreManager {
    void finish(long checkpointId);
    void commit(long checkpointId);
    void ackCommit(long checkpointId, long timeStamp);
    void rollBack(long checkpointId);
}
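One way to picture the four operations is a store that keeps snapshots per checkpoint ID. SnapshotStoreSketch below is a hypothetical illustration, not Ray's implementation: finish freezes a copy of the working data, commit turns that copy into a durable snapshot, ackCommit discards snapshots older than the acknowledged one, and rollBack restores a committed snapshot and drops later, uncommitted work.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Local copy of the interface listed above.
interface StateStoreManager {
  void finish(long checkpointId);
  void commit(long checkpointId);
  void ackCommit(long checkpointId, long timeStamp);
  void rollBack(long checkpointId);
}

// Hypothetical snapshot-based store illustrating the checkpoint lifecycle.
final class SnapshotStoreSketch implements StateStoreManager {
  private final Map<String, String> working = new HashMap<>();
  private final TreeMap<Long, Map<String, String>> finished = new TreeMap<>();
  private final TreeMap<Long, Map<String, String>> committed = new TreeMap<>();

  void put(String key, String value) { working.put(key, value); }

  String get(String key) { return working.get(key); }

  @Override
  public void finish(long checkpointId) {
    // Freeze a copy so later writes do not leak into this checkpoint.
    finished.put(checkpointId, new HashMap<>(working));
  }

  @Override
  public void commit(long checkpointId) {
    Map<String, String> snapshot = finished.remove(checkpointId);
    if (snapshot == null) {
      throw new IllegalStateException("checkpoint " + checkpointId + " was never finished");
    }
    committed.put(checkpointId, snapshot);
  }

  @Override
  public void ackCommit(long checkpointId, long timeStamp) {
    // The acknowledged checkpoint becomes the recovery point; drop older ones.
    // timeStamp is ignored in this sketch.
    committed.headMap(checkpointId, false).clear();
  }

  @Override
  public void rollBack(long checkpointId) {
    Map<String, String> snapshot = committed.get(checkpointId);
    if (snapshot == null) {
      throw new IllegalStateException("no committed snapshot for checkpoint " + checkpointId);
    }
    working.clear();
    working.putAll(snapshot);
    // Checkpoints finished after the recovery point belong to an abandoned attempt.
    finished.tailMap(checkpointId, false).clear();
  }

  public static void main(String[] args) {
    SnapshotStoreSketch store = new SnapshotStoreSketch();
    store.put("user123", "Alice");
    store.finish(1001L);
    store.commit(1001L);
    store.ackCommit(1001L, System.currentTimeMillis());

    store.put("user123", "Bob"); // change made after the last checkpoint
    store.rollBack(1001L);
    System.out.println(store.get("user123")); // Alice
  }
}
```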


Serialization Framework

Pluggable serialization system with default FST-based implementations. Custom serializers can be supplied for both key-value stores and key-map stores to control how state is encoded for persistence, for instance a language-neutral format where cross-language compatibility matters.

public interface KeyValueStoreSerialization<K, V> {
    byte[] serializeKey(K key);
    byte[] serializeValue(V value);
    V deserializeValue(byte[] valueArray);
}

public interface KeyMapStoreSerializer<K, S, T> {
    byte[] serializeKey(K key);
    byte[] serializeUKey(S uk);
    byte[] serializeUValue(T uv);
    S deserializeUKey(byte[] ukArray);
    T deserializeUValue(byte[] uvArray);
}
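As an example of a custom serializer, the sketch below implements a local copy of KeyValueStoreSerialization for String keys and Long values, using UTF-8 bytes and a fixed 8-byte big-endian encoding instead of FST. StringLongSerialization is a hypothetical name, and how a custom serializer is registered with a backend is not shown.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Local copy of the interface listed above.
interface KeyValueStoreSerialization<K, V> {
  byte[] serializeKey(K key);
  byte[] serializeValue(V value);
  V deserializeValue(byte[] valueArray);
}

// Hypothetical serializer: UTF-8 keys and 8-byte big-endian long values.
final class StringLongSerialization implements KeyValueStoreSerialization<String, Long> {
  @Override
  public byte[] serializeKey(String key) {
    return key.getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public byte[] serializeValue(Long value) {
    // ByteBuffer is big-endian by default.
    return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
  }

  @Override
  public Long deserializeValue(byte[] valueArray) {
    if (valueArray.length != Long.BYTES) {
      throw new IllegalArgumentException("expected " + Long.BYTES + " bytes, got " + valueArray.length);
    }
    return ByteBuffer.wrap(valueArray).getLong();
  }

  public static void main(String[] args) {
    StringLongSerialization serde = new StringLongSerialization();
    byte[] bytes = serde.serializeValue(42L);
    System.out.println(bytes.length + " " + serde.deserializeValue(bytes)); // 8 42
  }
}
```

A fixed-width, documented byte layout like this can be decoded outside the JVM, which is the kind of control a custom serializer offers over the default.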


Configuration and Key Groups

Configuration keys and key-group assignment utilities. Key groups partition the key space so that each parallel task owns a contiguous range of groups, and the configuration entries select the backend type, storage strategy, and state table name.

public class KeyGroupAssignment {
    public static KeyGroup getKeyGroup(int maxParallelism, int parallelism, int index);
    public static int assignKeyGroupIndexForKey(Object key, int maxParallelism);
    public static Map<Integer, List<Integer>> computeKeyGroupToTask(int maxParallelism, List<Integer> targetTasks);
}

public class ConfigKey {
    public static final String STATE_BACKEND_TYPE = "state.backend.type";
    public static final String STATE_STRATEGY_MODE = "state.strategy.mode";
    public static final String STATE_TABLE_NAME = "state.table.name";
}
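The arithmetic behind key-group assignment can be sketched as follows. KeyGroupSketch is hypothetical: the contiguous range formula is the Flink-style split and is assumed rather than taken from the library, as is hashing a key with Math.floorMod(key.hashCode(), maxParallelism). With 128 key groups and two tasks it yields ranges 0-63 and 64-127, consistent with KeyGroup(0, 63) in the Basic Usage.

```java
// Hypothetical re-implementation of key-group arithmetic; formulas are assumptions.
final class KeyGroupSketch {
  // Inclusive [start, end] range of key groups owned by task `index` of `parallelism`.
  static int[] keyGroupRange(int maxParallelism, int parallelism, int index) {
    if (parallelism <= 0 || maxParallelism < parallelism || index < 0 || index >= parallelism) {
      throw new IllegalArgumentException("invalid parallelism settings");
    }
    int start = (index * maxParallelism + parallelism - 1) / parallelism;
    int end = ((index + 1) * maxParallelism - 1) / parallelism;
    return new int[] {start, end};
  }

  // Maps a key to a group in [0, maxParallelism); floorMod keeps negative hashes in range.
  static int keyGroupForKey(Object key, int maxParallelism) {
    return Math.floorMod(key.hashCode(), maxParallelism);
  }

  // Index of the task whose range contains the given key group.
  static int taskForKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
    for (int i = 0; i < parallelism; i++) {
      int[] range = keyGroupRange(maxParallelism, parallelism, i);
      if (keyGroup >= range[0] && keyGroup <= range[1]) {
        return i;
      }
    }
    throw new IllegalArgumentException("key group out of range: " + keyGroup);
  }

  public static void main(String[] args) {
    int[] r0 = keyGroupRange(128, 2, 0);
    int[] r1 = keyGroupRange(128, 2, 1);
    System.out.println(r0[0] + "-" + r0[1] + ", " + r1[0] + "-" + r1[1]); // 0-63, 64-127
    int group = keyGroupForKey("user123", 128);
    System.out.println("user123 -> group " + group + " -> task " + taskForKeyGroup(group, 128, 2));
  }
}
```

Because ranges are contiguous, a task's ownership is fully described by a start and end index, which is exactly what the KeyGroup type stores.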


Common Types

public class StorageRecord<T> {
    public StorageRecord(long checkpointId, T value);
    public T getValue();
    public long getCheckpointId();
    public void setCheckpointId(long checkpointId);
}

public class PartitionRecord<T> {
    public PartitionRecord(int partitionID, T value);
    public T getValue();
    public int getPartitionID();
    public void setPartitionID(int partitionID);
}

public class StateException extends RuntimeException {
    public StateException(Throwable t);
    public StateException(String msg);
}

public class KeyGroup {
    public KeyGroup(int startIndex, int endIndex);
    public int size();
    public int getStartIndex();
    public int getEndIndex();
}