# Ray Streaming State

Ray Streaming State is the state management library for the Ray Streaming framework. It lets streaming applications keep transactional, key-partitioned state across distributed processing nodes, with checkpoint and rollback support for fault-tolerant recovery.

## Package Information

- **Package Name**: streaming-state
- **Package Type**: maven
- **Language**: Java
- **Installation**: Add the dependency to your Maven `pom.xml`:

```xml
<dependency>
  <groupId>io.ray</groupId>
  <artifactId>streaming-state</artifactId>
  <version>1.10.0</version>
</dependency>
```
## Core Imports

```java
import io.ray.streaming.state.backend.StateBackendBuilder;
import io.ray.streaming.state.backend.KeyStateBackend;
import io.ray.streaming.state.keystate.desc.ValueStateDescriptor;
import io.ray.streaming.state.keystate.desc.ListStateDescriptor;
import io.ray.streaming.state.keystate.desc.MapStateDescriptor;
import io.ray.streaming.state.keystate.state.ValueState;
import io.ray.streaming.state.keystate.state.ListState;
import io.ray.streaming.state.keystate.state.MapState;
```
## Basic Usage

```java
import io.ray.streaming.state.backend.*;
import io.ray.streaming.state.keystate.desc.*;
import io.ray.streaming.state.keystate.state.*;
import io.ray.streaming.state.keystate.KeyGroup;
import java.util.HashMap;
import java.util.Map;

public class BasicUsageExample {
  public static void main(String[] args) {
    // Create an in-memory state backend from a string config map
    Map<String, String> config = new HashMap<>();
    config.put("state.backend.type", "MEMORY");
    AbstractStateBackend stateBackend = StateBackendBuilder.buildStateBackend(config);

    // Create a key state backend responsible for key groups [0, 63] of 128
    int numberOfKeyGroups = 128;
    KeyGroup keyGroup = new KeyGroup(0, 63);
    KeyStateBackend keyStateBackend =
        new KeyStateBackend(numberOfKeyGroups, keyGroup, stateBackend);

    // Create a value state whose default value is "unknown"
    ValueStateDescriptor<String> valueDesc =
        ValueStateDescriptor.build("user-name", String.class, "unknown");
    ValueState<String> valueState = keyStateBackend.getValueState(valueDesc);

    // Reads and writes apply to the current key
    keyStateBackend.setCurrentKey("user123");
    valueState.update("Alice");
    String name = valueState.get(); // "Alice"

    // Drive one checkpoint through the transaction phases
    long checkpointId = 1001L;
    keyStateBackend.finish(checkpointId);
    keyStateBackend.commit(checkpointId);
    keyStateBackend.ackCommit(checkpointId, System.currentTimeMillis());
  }
}
```
## Architecture

Ray Streaming State is built around the following components:

- **State Backend System**: Pluggable storage backends (currently only the in-memory `MEMORY` backend) with configurable storage strategies
- **Transaction Management**: A checkpoint lifecycle of `finish`, `commit`, and `ackCommit`, plus `rollBack` to restore a checkpoint after a failure
- **Key-Group Partitioning**: Keys are assigned to a fixed number of key groups, and each processing node owns a contiguous range of them (`KeyGroup`), so state is managed in parallel
- **State Types**: Three core state abstractions (ValueState, ListState, MapState) for single values, ordered lists, and key-value maps
- **Serialization Framework**: Pluggable serialization with a default FST-based implementation
- **Storage Strategies**: DUAL_VERSION (supports rollback) and SINGLE_VERSION (MVCC-optimized)

## Capabilities
### State Backend Management

The state backend system provides pluggable storage implementations, configuration handling, and a factory method that builds a backend of the configured type and storage strategy.

```java { .api }
public class StateBackendBuilder {
  public static AbstractStateBackend buildStateBackend(Map<String, String> config);
}

public enum BackendType {
  MEMORY
}

public enum StateStrategy {
  DUAL_VERSION,
  SINGLE_VERSION
}
```
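The following minimal sketch shows how the two documented config keys could be resolved into the enums above. `BackendConfigSketch` and the `Sketch*` enums are hypothetical local stand-ins, not library classes. The defaults (`MEMORY`, `DUAL_VERSION`) and the case-insensitive parsing are assumptions made for illustration, so check `StateBackendBuilder` for the real behavior.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical local mirrors of the documented enums, so the sketch compiles on its own.
enum SketchBackendType { MEMORY }

enum SketchStateStrategy { DUAL_VERSION, SINGLE_VERSION }

// Hypothetical resolver: reads the two documented config keys and falls back to
// ASSUMED defaults (MEMORY, DUAL_VERSION) when a key is absent.
final class BackendConfigSketch {
  static final String STATE_BACKEND_TYPE = "state.backend.type";
  static final String STATE_STRATEGY_MODE = "state.strategy.mode";

  final SketchBackendType backendType;
  final SketchStateStrategy strategy;

  private BackendConfigSketch(SketchBackendType backendType, SketchStateStrategy strategy) {
    this.backendType = backendType;
    this.strategy = strategy;
  }

  static BackendConfigSketch resolve(Map<String, String> config) {
    String type = config.getOrDefault(STATE_BACKEND_TYPE, "MEMORY");
    String mode = config.getOrDefault(STATE_STRATEGY_MODE, "DUAL_VERSION");
    // valueOf throws IllegalArgumentException for unknown names, so a typo fails fast
    return new BackendConfigSketch(
        SketchBackendType.valueOf(type.toUpperCase(Locale.ROOT)),
        SketchStateStrategy.valueOf(mode.toUpperCase(Locale.ROOT)));
  }
}
```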
[State Backend Management](./backend-management.md)

### Key State Management

The key state backend provides ValueState, ListState, and MapState handles for distributed streaming applications. State is partitioned by key group, every read and write applies to the key most recently set with `setCurrentKey`, and the backend exposes the transaction operations used for checkpointing.

```java { .api }
public class KeyStateBackend extends AbstractKeyStateBackend {
  public <T> ValueState<T> getValueState(ValueStateDescriptor<T> stateDescriptor);
  public <T> ListState<T> getListState(ListStateDescriptor<T> stateDescriptor);
  public <S, T> MapState<S, T> getMapState(MapStateDescriptor<S, T> stateDescriptor);
  public void setCurrentKey(Object currentKey);
}
```
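To make key scoping concrete, here is a self-contained sketch with no Ray dependency. `KeyScopedValueSketch` is hypothetical. It assumes, as the `"unknown"` default in Basic Usage suggests, that a value state returns its descriptor's default when the current key has no value. It leaves out key groups and checkpointing entirely.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a key state backend plus one ValueState.
// It only shows key scoping and default values; no key groups or checkpoints.
final class KeyScopedValueSketch<T> {
  private final Map<Object, T> valuesByKey = new HashMap<>();
  private final T defaultValue;
  private Object currentKey;

  KeyScopedValueSketch(T defaultValue) {
    this.defaultValue = defaultValue;
  }

  // Plays the role of setCurrentKey: later get/update calls apply to this key.
  void setCurrentKey(Object key) {
    this.currentKey = key;
  }

  // Returns the stored value for the current key, or the default if none was written.
  T get() {
    return valuesByKey.getOrDefault(requireKey(), defaultValue);
  }

  void update(T value) {
    valuesByKey.put(requireKey(), value);
  }

  private Object requireKey() {
    if (currentKey == null) {
      throw new IllegalStateException("setCurrentKey must be called first");
    }
    return currentKey;
  }
}
```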
[Key State Management](./key-state-management.md)

### State Types and Operations

There are three core state abstractions: ValueState holds a single value, ListState an ordered collection, and MapState a key-value mapping. Each extends `UnaryState`, whose `get()` returns the whole stored value for the current key, and adds the operations suited to its access pattern.

```java { .api }
public interface ValueState<T> extends UnaryState<T> {
  void update(T value);
}

public interface ListState<T> extends UnaryState<List<T>> {
  void add(T value);
  void update(List<T> list);
}

public interface MapState<K, V> extends UnaryState<Map<K, V>> {
  V get(K key);
  void put(K key, V value);
  void remove(K key);
  void putAll(Map<K, V> map);
}
```
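A similar in-memory sketch for MapState: each stream key gets its own inner map, and the single-entry operations listed above act on the map of the current key. `KeyedMapStateSketch` is a hypothetical illustration, not the library's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-key MapState: each stream key owns its own inner map.
final class KeyedMapStateSketch<K, V> {
  private final Map<Object, Map<K, V>> mapsByStreamKey = new HashMap<>();
  private Object currentKey;

  void setCurrentKey(Object key) {
    this.currentKey = key;
  }

  // Inner map for the current stream key, created empty on first access.
  private Map<K, V> current() {
    if (currentKey == null) {
      throw new IllegalStateException("setCurrentKey must be called first");
    }
    return mapsByStreamKey.computeIfAbsent(currentKey, k -> new HashMap<>());
  }

  // Whole-map read, in the spirit of UnaryState<Map<K, V>>.get(); returns a copy.
  Map<K, V> get() {
    return new HashMap<>(current());
  }

  V get(K key) {
    return current().get(key);
  }

  void put(K key, V value) {
    current().put(key, value);
  }

  void remove(K key) {
    current().remove(key);
  }

  void putAll(Map<K, V> map) {
    current().putAll(map);
  }
}
```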
[State Types and Operations](./state-types-operations.md)

### Transaction Management

The transaction system drives each checkpoint through `finish`, `commit`, and `ackCommit`, and uses `rollBack` to restore a checkpoint after a failure. Together these operations keep state consistent when failures occur and enable checkpoint-based recovery.

```java { .api }
public interface StateStoreManager {
  void finish(long checkpointId);
  void commit(long checkpointId);
  void ackCommit(long checkpointId, long timeStamp);
  void rollBack(long checkpointId);
}
```
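One way to picture this lifecycle is a store that freezes a copy at `finish`, makes it durable at `commit`, drops older copies at `ackCommit`, and restores a copy at `rollBack`. `CheckpointedStoreSketch` is hypothetical, and the exact meaning of each phase here is an assumption rather than the library's documented behavior. It keeps a full copy per checkpoint for clarity, which a real backend would likely avoid.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical checkpoint lifecycle over a key-value store.
// NOT the library's implementation; the phase semantics are assumptions.
final class CheckpointedStoreSketch {
  private Map<String, String> working = new HashMap<>();                      // live writes
  private final TreeMap<Long, Map<String, String>> pending = new TreeMap<>();   // finished, uncommitted
  private final TreeMap<Long, Map<String, String>> committed = new TreeMap<>(); // durable snapshots

  void put(String key, String value) {
    working.put(key, value);
  }

  String get(String key) {
    return working.get(key);
  }

  // finish: freeze a copy of the current state under this checkpoint id.
  void finish(long checkpointId) {
    pending.put(checkpointId, new HashMap<>(working));
  }

  // commit: move the frozen copy into "durable" storage (here, another map).
  void commit(long checkpointId) {
    Map<String, String> snapshot = pending.remove(checkpointId);
    if (snapshot == null) {
      throw new IllegalStateException("finish(" + checkpointId + ") was not called");
    }
    committed.put(checkpointId, snapshot);
  }

  // ackCommit: the checkpoint is confirmed, so older snapshots can be dropped.
  // timeStamp is unused in this sketch.
  void ackCommit(long checkpointId, long timeStamp) {
    committed.headMap(checkpointId, false).clear();
  }

  // rollBack: restore the committed snapshot and discard everything newer.
  void rollBack(long checkpointId) {
    Map<String, String> snapshot = committed.get(checkpointId);
    if (snapshot == null) {
      throw new IllegalStateException("no committed snapshot for " + checkpointId);
    }
    working = new HashMap<>(snapshot);
    pending.clear();
    committed.tailMap(checkpointId, false).clear();
  }
}
```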
[Transaction Management](./transaction-management.md)

### Serialization Framework

Serialization is pluggable. Default FST-based implementations are provided, and custom serializers can be supplied for both key-value stores and key-map stores to control how state is persisted. In `KeyMapStoreSerializer<K, S, T>`, `K` is the state key, while `S` and `T` are the key and value types of the inner map handled by `serializeUKey` and `serializeUValue`.

```java { .api }
public interface KeyValueStoreSerialization<K, V> {
  byte[] serializeKey(K key);
  byte[] serializeValue(V value);
  V deserializeValue(byte[] valueArray);
}

public interface KeyMapStoreSerializer<K, S, T> {
  byte[] serializeKey(K key);
  byte[] serializeUKey(S uk);
  byte[] serializeUValue(T uv);
  S deserializeUKey(byte[] ukArray);
  T deserializeUValue(byte[] uvArray);
}
```
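The sketch below shows a custom serializer with the `KeyValueStoreSerialization` shape. To stay self-contained, it declares a local interface, `KeyValueSerializationSketch`, with the same three methods. It uses JDK object serialization as a stand-in for the FST-based default, so stored keys and values must implement `java.io.Serializable`. Both `KeyValueSerializationSketch` and `JdkKeyValueSerialization` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Hypothetical local copy of the documented interface shape, so this file compiles standalone.
interface KeyValueSerializationSketch<K, V> {
  byte[] serializeKey(K key);
  byte[] serializeValue(V value);
  V deserializeValue(byte[] valueArray);
}

// Uses JDK object serialization as a stand-in for the library's FST-based default.
final class JdkKeyValueSerialization<K, V> implements KeyValueSerializationSketch<K, V> {
  @Override
  public byte[] serializeKey(K key) {
    return toBytes(key);
  }

  @Override
  public byte[] serializeValue(V value) {
    return toBytes(value);
  }

  @Override
  @SuppressWarnings("unchecked")
  public V deserializeValue(byte[] valueArray) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(valueArray))) {
      return (V) in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  // Closing the stream flushes it before the byte array is read.
  private static byte[] toBytes(Object obj) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }
}
```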
[Serialization Framework](./serialization-framework.md)

### Configuration and Key Groups

Configuration keys select the backend type, the storage strategy, and the state table name. `KeyGroupAssignment` handles distributed partitioning: `getKeyGroup` returns the key-group range owned by a parallel task, `assignKeyGroupIndexForKey` maps a key to its key-group index, and `computeKeyGroupToTask` builds the key-group-to-task mapping for a list of target tasks.

```java { .api }
public class KeyGroupAssignment {
  public static KeyGroup getKeyGroup(int maxParallelism, int parallelism, int index);
  public static int assignKeyGroupIndexForKey(Object key, int maxParallelism);
  public static Map<Integer, List<Integer>> computeKeyGroupToTask(int maxParallelism, List<Integer> targetTasks);
}

public class ConfigKey {
  public static final String STATE_BACKEND_TYPE = "state.backend.type";
  public static final String STATE_STRATEGY_MODE = "state.strategy.mode";
  public static final String STATE_TABLE_NAME = "state.table.name";
}
```
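The following sketch of key-group assignment is modeled on Apache Flink's key-group range formula. `KeyGroupSketch` is hypothetical, and `KeyGroupAssignment` may compute ranges and indices differently. With `maxParallelism = 128` and `parallelism = 2`, task 0 owns the inclusive range [0, 63] and task 1 owns [64, 127], which matches the `KeyGroup(0, 63)` used in Basic Usage. Because the ranges are contiguous, the task that owns a key group can be computed arithmetically without a lookup table.

```java
// Hypothetical key-group helpers modeled on Flink's key-group range formula.
// The library's KeyGroupAssignment may compute these differently.
final class KeyGroupSketch {
  // Maps a key to a key-group index in [0, maxParallelism).
  static int keyGroupIndexForKey(Object key, int maxParallelism) {
    return Math.floorMod(key.hashCode(), maxParallelism);
  }

  // Inclusive [start, end] range of key groups owned by task `index` out of `parallelism`.
  static int[] keyGroupRange(int maxParallelism, int parallelism, int index) {
    if (parallelism <= 0 || parallelism > maxParallelism) {
      throw new IllegalArgumentException("need 0 < parallelism <= maxParallelism");
    }
    if (index < 0 || index >= parallelism) {
      throw new IllegalArgumentException("index out of range: " + index);
    }
    int start = (index * maxParallelism + parallelism - 1) / parallelism;
    int end = ((index + 1) * maxParallelism - 1) / parallelism;
    return new int[] {start, end};
  }

  // Task index that owns the given key group (the inverse of keyGroupRange).
  static int taskForKeyGroup(int keyGroupIndex, int maxParallelism, int parallelism) {
    return keyGroupIndex * parallelism / maxParallelism;
  }
}
```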
[Configuration and Key Groups](./configuration-key-groups.md)

## Common Types

```java { .api }
public class StorageRecord<T> {
  public StorageRecord(long checkpointId, T value);
  public T getValue();
  public long getCheckpointId();
  public void setCheckpointId(long checkpointId);
}

public class PartitionRecord<T> {
  public PartitionRecord(int partitionID, T value);
  public T getValue();
  public int getPartitionID();
  public void setPartitionID(int partitionID);
}

public class StateException extends RuntimeException {
  public StateException(Throwable t);
  public StateException(String msg);
}

public class KeyGroup {
  public KeyGroup(int startIndex, int endIndex);
  public int size();
  public int getStartIndex();
  public int getEndIndex();
}
```