0
# Key State Management
1
2
Key-based state management system providing ValueState, ListState, and MapState abstractions with key-group partitioning, transaction support, and fault-tolerant operations for distributed streaming applications.
3
4
## Capabilities
5
6
### Key State Backend
7
8
Main state backend implementation for managing different types of keyed states with key-group partitioning and transaction support. Note: This class is not thread-safe.
9
10
```java { .api }
11
/**
12
* Key state backend manager for different types of states (not thread-safe)
13
*/
14
public class KeyStateBackend extends AbstractKeyStateBackend {
15
/**
16
* Create key state backend with partitioning configuration
17
* @param numberOfKeyGroups Total number of key groups for partitioning
18
* @param keyGroup Key group range assigned to this backend instance
19
* @param abstractStateBackend Underlying state backend for storage
20
*/
21
public KeyStateBackend(int numberOfKeyGroups, KeyGroup keyGroup, AbstractStateBackend abstractStateBackend);
22
23
/**
24
* Get or create value state instance
25
* @param stateDescriptor Value state descriptor defining the state
26
* @return ValueState instance for the specified descriptor
27
*/
28
public <T> ValueState<T> getValueState(ValueStateDescriptor<T> stateDescriptor);
29
30
/**
31
* Get or create list state instance
32
* @param stateDescriptor List state descriptor defining the state
33
* @return ListState instance for the specified descriptor
34
*/
35
public <T> ListState<T> getListState(ListStateDescriptor<T> stateDescriptor);
36
37
/**
38
* Get or create map state instance
39
* @param stateDescriptor Map state descriptor defining the state
40
* @return MapState instance for the specified descriptor
41
*/
42
public <S, T> MapState<S, T> getMapState(MapStateDescriptor<S, T> stateDescriptor);
43
44
/**
45
* Set current processing key for state operations
46
* @param currentKey Current key for state access
47
*/
48
public void setCurrentKey(Object currentKey);
49
50
/**
51
* Get number of key groups for partitioning
52
* @return Total number of key groups
53
*/
54
public int getNumberOfKeyGroups();
55
56
/**
57
* Get assigned key group range
58
* @return KeyGroup representing the assigned range
59
*/
60
public KeyGroup getKeyGroup();
61
62
/**
63
* Close backend and clean up resources
64
*/
65
public void close();
66
}
67
```
68
69
**Usage Examples:**
70
71
```java
72
import io.ray.streaming.state.backend.*;
73
import io.ray.streaming.state.keystate.*;
74
import io.ray.streaming.state.keystate.desc.*;
75
import io.ray.streaming.state.keystate.state.*;
76
77
// Create key state backend
78
Map<String, String> config = new HashMap<>();
79
config.put("state.backend.type", "MEMORY");
80
AbstractStateBackend stateBackend = StateBackendBuilder.buildStateBackend(config);
81
82
int numberOfKeyGroups = 128;
83
KeyGroup keyGroup = new KeyGroup(0, 63); // Handle key groups 0-63
84
KeyStateBackend keyStateBackend = new KeyStateBackend(numberOfKeyGroups, keyGroup, stateBackend);
85
86
// Create and use different state types
87
ValueStateDescriptor<String> valueDesc = ValueStateDescriptor.build("user-session", String.class, "");
88
ValueState<String> valueState = keyStateBackend.getValueState(valueDesc);
89
90
ListStateDescriptor<String> listDesc = ListStateDescriptor.build("user-events", String.class);
91
ListState<String> listState = keyStateBackend.getListState(listDesc);
92
93
MapStateDescriptor<String, Integer> mapDesc = MapStateDescriptor.build("user-counters", String.class, Integer.class);
94
MapState<String, Integer> mapState = keyStateBackend.getMapState(mapDesc);
95
96
// Set current key and use states
97
keyStateBackend.setCurrentKey("user123");
98
valueState.update("session-abc");
99
listState.add("login-event");
100
mapState.put("clicks", 5);
101
102
// Close when done
103
keyStateBackend.close();
104
```
105
106
### Abstract Key State Backend
107
108
Base class providing transaction support and common state management functionality for key-based state backends. Note: This class is not thread-safe.
109
110
```java { .api }
111
/**
112
* Base class providing transaction support and state management (not thread-safe)
113
*/
114
public abstract class AbstractKeyStateBackend {
115
/**
116
* Create abstract key state backend
117
* @param backend Underlying state backend
118
*/
119
public AbstractKeyStateBackend(AbstractStateBackend backend);
120
121
/**
122
* Put value into state with descriptor and key
123
* @param descriptor State descriptor
124
* @param key State key
125
* @param value Value to store
126
*/
127
public <K, T> void put(AbstractStateDescriptor descriptor, K key, T value);
128
129
/**
130
* Get value from state with descriptor and key
131
* @param descriptor State descriptor
132
* @param key State key
133
* @return Retrieved value
134
*/
135
public <K, T> T get(AbstractStateDescriptor descriptor, K key);
136
137
/**
138
* Finish checkpoint phase - complete batch data saving and serialization
139
* @param checkpointId Checkpoint identifier
140
*/
141
public void finish(long checkpointId);
142
143
/**
144
* Commit checkpoint phase - persist data (can be async)
145
* @param checkpointId Checkpoint identifier
146
*/
147
public void commit(long checkpointId);
148
149
/**
150
* Acknowledge commit phase - clean up after commit
151
* @param checkpointId Checkpoint identifier
152
* @param timeStamp Timestamp of acknowledgment
153
*/
154
public void ackCommit(long checkpointId, long timeStamp);
155
156
/**
157
* Rollback checkpoint phase - recover from checkpoint
158
* @param checkpointId Checkpoint identifier
159
*/
160
public void rollBack(long checkpointId);
161
162
/**
163
* Get current processing key
164
* @return Current key object
165
*/
166
public Object getCurrentKey();
167
168
/**
169
* Set current processing key (abstract method)
170
* @param currentKey Current key to set
171
*/
172
public abstract void setCurrentKey(Object currentKey);
173
174
/**
175
* Get current checkpoint ID
176
* @return Current checkpoint ID
177
*/
178
public long getCheckpointId();
179
180
/**
181
* Set checkpoint ID
182
* @param checkpointId Checkpoint ID to set
183
*/
184
public void setCheckpointId(long checkpointId);
185
186
/**
187
* Set processing context with checkpoint and key
188
* @param checkpointId Checkpoint identifier
189
* @param currentKey Current processing key
190
*/
191
public void setContext(long checkpointId, Object currentKey);
192
193
/**
194
* Get key group index for current key
195
* @return Key group index
196
*/
197
public int getKeyGroupIndex();
198
}
199
```
200
201
### State Descriptors
202
203
State descriptors define the metadata and configuration for different types of states, providing type safety and unique identification.
204
205
#### Abstract State Descriptor
206
207
```java { .api }
208
/**
209
* Base class for all state descriptors
210
*/
211
public abstract class AbstractStateDescriptor<S, T> {
212
/**
213
* Create state descriptor with name and type
214
* @param name Descriptor name
215
* @param type Value type class
216
*/
217
public AbstractStateDescriptor(String name, Class<T> type);
218
219
/**
220
* Get descriptor name
221
* @return Descriptor name
222
*/
223
public String getName();
224
225
/**
226
* Get value type class
227
* @return Type class
228
*/
229
public Class<T> getType();
230
231
/**
232
* Get table name for storage
233
* @return Table name
234
*/
235
public String getTableName();
236
237
/**
238
* Set table name for storage
239
* @param tableName Table name
240
*/
241
public void setTableName(String tableName);
242
243
/**
244
* Get unique identifier for this descriptor
245
* @return Unique identifier string
246
*/
247
public String getIdentify();
248
249
/**
250
* Get state type enumeration
251
* @return StateType enum value
252
*/
253
public abstract StateType getStateType();
254
}
255
256
/**
257
* State type enumeration
258
*/
259
public enum StateType {
260
/** Value state type */
261
VALUE,
262
/** List state type */
263
LIST,
264
/** Map state type */
265
MAP
266
}
267
```
268
269
#### Value State Descriptor
270
271
```java { .api }
272
/**
273
* Descriptor for value state configuration
274
*/
275
public class ValueStateDescriptor<T> extends AbstractStateDescriptor<ValueState<T>, T> {
276
/**
277
* Create value state descriptor
278
* @param name State name
279
* @param type Value type class
280
* @param defaultValue Default value when state is empty
281
*/
282
public ValueStateDescriptor(String name, Class<T> type, T defaultValue);
283
284
/**
285
* Factory method for creating value state descriptor
286
* @param name State name
287
* @param type Value type class
288
* @param defaultValue Default value
289
* @return ValueStateDescriptor instance
290
*/
291
public static <T> ValueStateDescriptor<T> build(String name, Class<T> type, T defaultValue);
292
293
/**
294
* Get default value
295
* @return Default value
296
*/
297
public T getDefaultValue();
298
299
/**
300
* Get state type
301
* @return StateType.VALUE
302
*/
303
public StateType getStateType();
304
}
305
```
306
307
#### List State Descriptor
308
309
```java { .api }
310
/**
311
* Descriptor for list state configuration
312
*/
313
public class ListStateDescriptor<T> extends AbstractStateDescriptor<ListState<T>, T> {
314
/**
315
* Factory method for creating list state descriptor
316
* @param name State name
317
* @param type Element type class
318
* @return ListStateDescriptor instance
319
*/
320
public static <T> ListStateDescriptor<T> build(String name, Class<T> type);
321
322
/**
323
* Factory method for creating list state descriptor with operator flag
324
* @param name State name
325
* @param type Element type class
326
* @param isOperatorList Whether this is an operator-level list state
327
* @return ListStateDescriptor instance
328
*/
329
public static <T> ListStateDescriptor<T> build(String name, Class<T> type, boolean isOperatorList);
330
331
/**
332
* Check if this is an operator list
333
* @return True if operator list
334
*/
335
public boolean isOperatorList();
336
337
/**
338
* Get partition index
339
* @return Partition index
340
*/
341
public int getIndex();
342
343
/**
344
* Set partition index
345
* @param index Partition index
346
*/
347
public void setIndex(int index);
348
349
/**
350
* Get partition number
351
* @return Partition number
352
*/
353
public int getPartitionNumber();
354
355
/**
356
* Set partition number
357
* @param number Partition number
358
*/
359
public void setPartitionNumber(int number);
360
361
/**
362
* Get state type
363
* @return StateType.LIST
364
*/
365
public StateType getStateType();
366
}
367
```
368
369
#### Map State Descriptor
370
371
```java { .api }
372
/**
373
* Descriptor for map state configuration
374
*/
375
public class MapStateDescriptor<K, V> extends AbstractStateDescriptor<MapState<K, V>, V> {
376
/**
377
* Create map state descriptor
378
* @param name State name
379
* @param keyType Key type class
380
* @param valueType Value type class
381
*/
382
public MapStateDescriptor(String name, Class<K> keyType, Class<V> valueType);
383
384
/**
385
* Factory method for creating map state descriptor
386
* @param name State name
387
* @param keyType Key type class
388
* @param valueType Value type class
389
* @return MapStateDescriptor instance
390
*/
391
public static <K, V> MapStateDescriptor<K, V> build(String name, Class<K> keyType, Class<V> valueType);
392
393
/**
394
* Get state type
395
* @return StateType.MAP
396
*/
397
public StateType getStateType();
398
}
399
```
400
401
**Usage Examples:**
402
403
```java
404
// Create descriptors for different state types
405
ValueStateDescriptor<String> userNameDesc = ValueStateDescriptor.build("user-name", String.class, "anonymous");
406
407
ListStateDescriptor<String> eventListDesc = ListStateDescriptor.build("events", String.class);
408
ListStateDescriptor<String> operatorListDesc = ListStateDescriptor.build("operator-events", String.class, true);
409
410
MapStateDescriptor<String, Integer> counterMapDesc = MapStateDescriptor.build("counters", String.class, Integer.class);
411
412
// Use descriptors with key state backend
413
ValueState<String> userNameState = keyStateBackend.getValueState(userNameDesc);
414
ListState<String> eventListState = keyStateBackend.getListState(eventListDesc);
415
MapState<String, Integer> counterMapState = keyStateBackend.getMapState(counterMapDesc);
416
```