State management library for the Ray Streaming framework that provides transactional state storage with checkpoint and rollback capabilities for streaming data processing applications.
—
The library offers three core state abstractions — ValueState for single values, ListState for ordered collections, and MapState for key-value mappings — each providing specialized operations and transaction support for different data access patterns.
Root interface for all state types providing common functionality for key management.
/**
* Base interface for all state types
*/
public interface State {
/**
 * Sets the current key for this state's operations.
 *
 * @param currentKey the key currently being processed
 */
void setCurrentKey(Object currentKey);
}
Interface for states that contain a single value, providing base functionality for value retrieval.
/**
* Interface for single-value states
*/
public interface UnaryState<O> extends State {
/**
 * Returns the value currently stored in this state.
 *
 * @return the current value, or {@code null} if no value has been set
 */
O get();
}
State type for storing and managing single values with update capabilities and default value support.
/**
* Value state for storing single values
*/
public interface ValueState<T> extends UnaryState<T> {
/**
 * Updates the stored value.
 *
 * @param value the new value to store
 */
void update(T value);
}
Usage Examples:
// Describe a String-valued state named "user-session" and obtain it from the key state backend.
// NOTE(review): the third build() argument ("") appears to be the default value — confirm.
ValueStateDescriptor<String> sessionDesc = ValueStateDescriptor.build("user-session", String.class, "");
ValueState<String> sessionState = keyStateBackend.getValueState(sessionDesc);
// Select the key that the following state operations apply to
keyStateBackend.setCurrentKey("user123");
// Store a value for the current key, then read it back
sessionState.update("session-abc-123");
String currentSession = sessionState.get(); // "session-abc-123"
// Switch to a key with no stored value (get() yields null if not set, or the default value if specified)
keyStateBackend.setCurrentKey("new-user");
String newUserSession = sessionState.get(); // "" (default value)
State type for storing and managing ordered collections with add and update operations for list-based data patterns.
/**
* List state for storing ordered collections
*/
public interface ListState<T> extends UnaryState<List<T>> {
/**
 * Appends a single value to the end of the list.
 *
 * @param value the value to add
 */
void add(T value);
/**
 * Replaces the entire list with the given values.
 *
 * @param list the new list to store
 */
void update(List<T> list);
}
Usage Examples:
import java.util.List;
import java.util.ArrayList;
// Describe a list state of String elements named "user-events" and obtain it from the key state backend
ListStateDescriptor<String> eventsDesc = ListStateDescriptor.build("user-events", String.class);
ListState<String> eventsState = keyStateBackend.getListState(eventsDesc);
// Select the key that the following state operations apply to
keyStateBackend.setCurrentKey("user123");
// Append events one at a time; each add() places the value at the end of the list
eventsState.add("login");
eventsState.add("page-view:/home");
eventsState.add("click:button-submit");
// Read back the whole list for the current key
List<String> currentEvents = eventsState.get();
// Result: ["login", "page-view:/home", "click:button-submit"]
// Replace the entire list: update() discards the three events above in favor of newEvents
List<String> newEvents = new ArrayList<>();
newEvents.add("logout");
eventsState.update(newEvents);
List<String> updatedEvents = eventsState.get();
// Result: ["logout"]
State type for storing and managing key-value mappings with comprehensive map operations including get, put, remove, and bulk operations.
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
/**
* Map state for storing key-value mappings
*/
public interface MapState<K, V> extends UnaryState<Map<K, V>> {
/**
 * Returns the value associated with the specified key.
 *
 * @param key the key to look up
 * @return the value associated with {@code key}, or {@code null} if not found
 */
V get(K key);
/**
 * Associates a value with the specified key.
 *
 * @param key the key to store
 * @param value the value to associate with {@code key}
 */
void put(K key, V value);
/**
 * Replaces the entire map with the given mappings.
 *
 * @param map the new map to store
 */
void update(Map<K, V> map);
/**
 * Copies all mappings from the provided map into this state.
 *
 * @param map the map whose mappings are to be added
 */
void putAll(Map<K, V> map);
/**
 * Removes the mapping for the specified key.
 *
 * @param key the key whose mapping is to be removed
 */
void remove(K key);
/**
 * Reports whether the map returned by {@code get()} has a mapping for the given key.
 * NOTE(review): the result of {@code get()} is dereferenced without a null check —
 * confirm it cannot be {@code null} here.
 *
 * @param key the key to check
 * @return {@code true} if the current map contains {@code key}
 */
default boolean contains(K key) {
    final Map<K, V> current = get();
    return current.containsKey(key);
}
/**
 * Exposes every key-value entry of the map returned by {@code get()}.
 * NOTE(review): the result of {@code get()} is dereferenced without a null check —
 * confirm it cannot be {@code null} here.
 *
 * @return the entry set of the current map, typed as an {@code Iterable}
 */
default Iterable<Entry<K, V>> entries() {
    final Map<K, V> current = get();
    return current.entrySet();
}
/**
 * Exposes every key of the map returned by {@code get()}.
 * NOTE(review): the result of {@code get()} is dereferenced without a null check —
 * confirm it cannot be {@code null} here.
 *
 * @return the key set of the current map, typed as an {@code Iterable}
 */
default Iterable<K> keys() {
    final Map<K, V> current = get();
    return current.keySet();
}
/**
 * Exposes every value of the map returned by {@code get()}.
 * NOTE(review): the result of {@code get()} is dereferenced without a null check —
 * confirm it cannot be {@code null} here.
 *
 * @return the value collection of the current map, typed as an {@code Iterable}
 */
default Iterable<V> values() {
    final Map<K, V> current = get();
    return current.values();
}
/**
 * Creates an iterator over the entry set of the map returned by {@code get()}.
 * NOTE(review): the result of {@code get()} is dereferenced without a null check —
 * confirm it cannot be {@code null} here.
 *
 * @return an iterator over all key-value entries of the current map
 */
default Iterator<Entry<K, V>> iterator() {
    final Map<K, V> current = get();
    return current.entrySet().iterator();
}
}
Usage Examples:
import java.util.Map;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;
// Describe a String -> Integer map state named "user-counters" and obtain it from the key state backend
MapStateDescriptor<String, Integer> countersDesc = MapStateDescriptor.build("user-counters", String.class, Integer.class);
MapState<String, Integer> countersState = keyStateBackend.getMapState(countersDesc);
// Select the key that the following state operations apply to
keyStateBackend.setCurrentKey("user123");
// Add individual mappings
countersState.put("clicks", 5);
countersState.put("views", 12);
countersState.put("purchases", 2);
// Look up individual values by map key
Integer clicks = countersState.get("clicks"); // 5
Integer views = countersState.get("views"); // 12
// Check whether a map key is present
boolean hasClicks = countersState.contains("clicks"); // true
boolean hasDownloads = countersState.contains("downloads"); // false ("downloads" has not been added yet)
// Add multiple mappings at once
Map<String, Integer> additionalCounters = new HashMap<>();
additionalCounters.put("downloads", 3);
additionalCounters.put("shares", 1);
countersState.putAll(additionalCounters);
// Remove a specific mapping; the keys put so far are now clicks, purchases, downloads, shares
countersState.remove("views");
// Iterate over all entries
for (Map.Entry<String, Integer> entry : countersState.entries()) {
System.out.println(entry.getKey() + ": " + entry.getValue());
}
// Get all keys and all values as Iterables
Iterable<String> allKeys = countersState.keys();
Iterable<Integer> allValues = countersState.values();
// Replace the entire map: update() stores newCounters in place of the mappings above
Map<String, Integer> newCounters = new HashMap<>();
newCounters.put("new-metric", 100);
countersState.update(newCounters);
// Get the entire map for the current key
Map<String, Integer> currentMap = countersState.get();
The library provides default implementations for all state interfaces, managed internally by the key state backend system.
/**
* Default implementation of ValueState interface
*/
public class ValueStateImpl<T> implements ValueState<T> {
// Implementation managed by KeyStateBackend
}
/**
* Default implementation of ListState interface
*/
public class ListStateImpl<T> implements ListState<T> {
// Implementation managed by KeyStateBackend
}
/**
* Default implementation of MapState interface
*/
public class MapStateImpl<K, V> implements MapState<K, V> {
// Implementation managed by KeyStateBackend
}
/**
* Implementation for operator-level state
*/
public class OperatorStateImpl<T> implements ListState<T> {
// Implementation managed by OperatorStateBackend
}
Utility class providing helper methods for state operations and common patterns.
/**
* Helper utilities for state operations
*/
public class StateHelper {
// Various utility methods for state management
}
Advanced Usage Patterns:
// Pattern 1: Counter with automatic initialization
MapState<String, Integer> counters = keyStateBackend.getMapState(countersDesc);
keyStateBackend.setCurrentKey("user456");
// Read the previous count; get(key) yields null when the metric has no mapping yet
String metric = "page-views";
Integer previous = counters.get(metric);
// Start at 1 for a metric seen for the first time, otherwise add one to the stored count
int next;
if (previous == null) {
    next = 1;
} else {
    next = previous + 1;
}
counters.put(metric, next);
// Pattern 2: List state for sliding window
ListState<Long> timestamps = keyStateBackend.getListState(timestampDesc);
keyStateBackend.setCurrentKey("sensor1");
// Add new timestamp and maintain window size
timestamps.add(System.currentTimeMillis());
List<Long> allTimestamps = timestamps.get();
if (allTimestamps.size() > 100) {
    // Keep only the last 100 timestamps. subList() returns a view backed by
    // allTimestamps rather than an independent list, so copy it into a new
    // ArrayList before storing; otherwise the stored value would keep the whole
    // original list reachable instead of just the 100-element window.
    List<Long> recentTimestamps =
            new ArrayList<>(allTimestamps.subList(allTimestamps.size() - 100, allTimestamps.size()));
    timestamps.update(recentTimestamps);
}
// Pattern 3: Value state with complex objects
// Simple data holder used as the state's value type
class UserProfile {
public String name;
public int age;
public List<String> interests;
}
// Describe a UserProfile-valued state named "user-profile".
// NOTE(review): the third build() argument (null) appears to be the default value — confirm.
ValueStateDescriptor<UserProfile> profileDesc = ValueStateDescriptor.build("user-profile", UserProfile.class, null);
ValueState<UserProfile> profileState = keyStateBackend.getValueState(profileDesc);
// Select the key that the following state operations apply to
keyStateBackend.setCurrentKey("user789");
// Populate the profile object that will be stored for the current key
UserProfile profile = new UserProfile();
profile.name = "Alice";
profile.age = 30;
profile.interests = Arrays.asList("technology", "music");
profileState.update(profile);
Install with Tessl CLI
npx tessl i tessl/maven-io-ray--streaming-state