0
# Configuration and Key Groups
1
2
This section covers the configuration system and the key-group assignment algorithms that enable distributed state partitioning, parallel processing, and configurable backend selection, together with the built-in helper utilities.
3
4
## Capabilities
5
6
### Configuration Keys
7
8
Central configuration constants and helper methods for state backend configuration and parameter management.
9
10
```java { .api }
11
/**
12
* Configuration keys and helper methods for state management
13
*/
14
public class ConfigKey {
15
/** Backend type configuration key */
16
public static final String STATE_BACKEND_TYPE = "state.backend.type";
17
18
/** Table name configuration key */
19
public static final String STATE_TABLE_NAME = "state.table.name";
20
21
/** Strategy mode configuration key */
22
public static final String STATE_STRATEGY_MODE = "state.strategy.mode";
23
24
/** Number per checkpoint configuration key */
25
public static final String NUMBER_PER_CHECKPOINT = "number.per.checkpoint";
26
27
/** Maximum parallelism configuration key */
28
public static final String JOB_MAX_PARALLEL = "job.max.parallel";
29
30
/** String delimiter for internal use */
31
public static final String DELIMITER = "delimiter.string";
32
33
/**
34
* Get state strategy from configuration map
35
* @param config Configuration map
36
* @return State strategy string value
37
*/
38
public static String getStateStrategyEnum(Map<String, String> config);
39
40
/**
41
* Get backend type from configuration map
42
* @param config Configuration map
43
* @return Backend type string value
44
*/
45
public static String getBackendType(Map<String, String> config);
46
47
/**
48
* Get number per checkpoint from configuration map
49
* @param config Configuration map
50
* @return Number per checkpoint value
51
*/
52
public static int getNumberPerCheckpoint(Map<String, String> config);
53
54
/**
55
* Get state table name from configuration map
56
* @param config Configuration map
57
* @return State table name
58
*/
59
public static String getStateTableName(Map<String, String> config);
60
}
61
```
62
63
**Usage Examples:**
64
65
```java
66
import io.ray.streaming.state.config.ConfigKey;
67
import java.util.Map;
68
import java.util.HashMap;
69
70
// Create configuration map
71
Map<String, String> config = new HashMap<>();
72
config.put(ConfigKey.STATE_BACKEND_TYPE, "MEMORY");
73
config.put(ConfigKey.STATE_STRATEGY_MODE, "DUAL_VERSION");
74
config.put(ConfigKey.STATE_TABLE_NAME, "user-state-table");
75
config.put(ConfigKey.NUMBER_PER_CHECKPOINT, "1000");
76
config.put(ConfigKey.JOB_MAX_PARALLEL, "16");
77
78
// Retrieve configuration values
79
String backendType = ConfigKey.getBackendType(config); // "MEMORY"
80
String strategy = ConfigKey.getStateStrategyEnum(config); // "DUAL_VERSION"
81
String tableName = ConfigKey.getStateTableName(config); // "user-state-table"
82
int numberPerCheckpoint = ConfigKey.getNumberPerCheckpoint(config); // 1000
83
84
// Use with state backend builder
85
AbstractStateBackend backend = StateBackendBuilder.buildStateBackend(config);
86
```
87
88
### Configuration Helper Utilities
89
90
Helper methods for processing configuration maps with type conversion and default value support.
91
92
```java { .api }
93
/**
94
* Helper methods for configuration processing
95
*/
96
public class ConfigHelper {
97
/**
98
* Get integer value from configuration with default fallback
99
* @param config Configuration map
100
* @param configKey Configuration key to look up
101
* @param defaultValue Default value if key not found or invalid
102
* @return Integer value or default
103
*/
104
public static int getIntegerOrDefault(Map config, String configKey, int defaultValue);
105
106
/**
107
* Get string value from configuration with default fallback
108
* @param config Configuration map
109
* @param configKey Configuration key to look up
110
* @param defaultValue Default value if key not found
111
* @return String value or default
112
*/
113
public static String getStringOrDefault(Map config, String configKey, String defaultValue);
114
}
115
```
116
117
**Usage Examples:**
118
119
```java
120
import io.ray.streaming.state.config.ConfigHelper;
121
122
// Safe configuration retrieval with defaults
123
Map<String, String> config = getConfigurationFromSource();
124
125
int parallelism = ConfigHelper.getIntegerOrDefault(config, "parallelism", 4);
126
String environment = ConfigHelper.getStringOrDefault(config, "environment", "production");
127
int bufferSize = ConfigHelper.getIntegerOrDefault(config, "buffer.size", 8192);
128
129
// Handle missing or malformed configuration gracefully
130
int timeout = ConfigHelper.getIntegerOrDefault(config, "timeout", 5000); // Will use 5000 if "timeout" is missing or not a valid integer
131
```
132
133
### Key Group Management
134
135
Key group system providing scalable partitioning for distributed state processing across multiple parallel instances.
136
137
#### Key Group Class
138
139
```java { .api }
140
/**
141
* Defines key-groups for partitioned state processing
142
*/
143
public class KeyGroup {
144
/**
145
* Create key group with inclusive range
146
* @param startIndex Start index (inclusive)
147
* @param endIndex End index (inclusive)
148
*/
149
public KeyGroup(int startIndex, int endIndex);
150
151
/**
152
* Get number of key-groups in this range
153
* @return Number of key-groups
154
*/
155
public int size();
156
157
/**
158
* Get start index of range
159
* @return Start index (inclusive)
160
*/
161
public int getStartIndex();
162
163
/**
164
* Get end index of range
165
* @return End index (inclusive)
166
*/
167
public int getEndIndex();
168
}
169
```
170
171
**Usage Examples:**
172
173
```java
174
// Create key groups for different parallel instances
175
KeyGroup keyGroup1 = new KeyGroup(0, 31); // Handles key groups 0-31 (32 groups)
176
KeyGroup keyGroup2 = new KeyGroup(32, 63); // Handles key groups 32-63 (32 groups)
177
KeyGroup keyGroup3 = new KeyGroup(64, 95); // Handles key groups 64-95 (32 groups)
178
KeyGroup keyGroup4 = new KeyGroup(96, 127); // Handles key groups 96-127 (32 groups)
179
180
// Check key group properties
181
int size1 = keyGroup1.size(); // 32
182
int start1 = keyGroup1.getStartIndex(); // 0
183
int end1 = keyGroup1.getEndIndex(); // 31
184
185
// Use with key state backend
186
KeyStateBackend backend1 = new KeyStateBackend(128, keyGroup1, stateBackend);
187
KeyStateBackend backend2 = new KeyStateBackend(128, keyGroup2, stateBackend);
188
```
189
190
#### Key Group Assignment Algorithms
191
192
```java { .api }
193
/**
194
* Key-group assignment algorithms for distributed processing
195
*/
196
public class KeyGroupAssignment {
197
/**
198
* Compute key-group range for specific operator instance
199
* @param maxParallelism Maximum parallelism (total key groups)
200
* @param parallelism Current parallelism level
201
* @param index Operator instance index (0-based)
202
* @return KeyGroup representing assigned range
203
*/
204
public static KeyGroup getKeyGroup(int maxParallelism, int parallelism, int index);
205
206
/**
207
* Assign key to specific key-group index using hash function
208
* @param key Key object to assign
209
* @param maxParallelism Maximum parallelism (total key groups)
210
* @return Key-group index for the key
211
*/
212
public static int assignKeyGroupIndexForKey(Object key, int maxParallelism);
213
214
/**
215
* Compute mapping from key-groups to task instances
216
* @param maxParallelism Maximum parallelism (total key groups)
217
* @param targetTasks List of target task IDs
218
* @return Map from key-group index to list of task IDs
219
*/
220
public static Map<Integer, List<Integer>> computeKeyGroupToTask(int maxParallelism, List<Integer> targetTasks);
221
}
222
```
223
224
**Advanced Usage Examples:**
225
226
```java
227
import io.ray.streaming.state.keystate.KeyGroupAssignment;
228
import java.util.List;
229
import java.util.Arrays;
230
import java.util.Map;
231
232
// Example 1: Distribute key groups across parallel instances
233
int maxParallelism = 128; // Total key groups
234
int parallelism = 4; // Number of parallel instances
235
236
// Calculate key group ranges for each instance
237
KeyGroup range0 = KeyGroupAssignment.getKeyGroup(maxParallelism, parallelism, 0); // 0-31
238
KeyGroup range1 = KeyGroupAssignment.getKeyGroup(maxParallelism, parallelism, 1); // 32-63
239
KeyGroup range2 = KeyGroupAssignment.getKeyGroup(maxParallelism, parallelism, 2); // 64-95
240
KeyGroup range3 = KeyGroupAssignment.getKeyGroup(maxParallelism, parallelism, 3); // 96-127
241
242
System.out.println("Instance 0 handles key groups: " + range0.getStartIndex() + "-" + range0.getEndIndex());
243
System.out.println("Instance 1 handles key groups: " + range1.getStartIndex() + "-" + range1.getEndIndex());
244
245
// Example 2: Determine which instance should handle a specific key
246
String userKey1 = "user123";
247
String userKey2 = "user456";
248
String userKey3 = "user789";
249
250
int keyGroup1 = KeyGroupAssignment.assignKeyGroupIndexForKey(userKey1, maxParallelism); // e.g., 45
251
int keyGroup2 = KeyGroupAssignment.assignKeyGroupIndexForKey(userKey2, maxParallelism); // e.g., 12
252
int keyGroup3 = KeyGroupAssignment.assignKeyGroupIndexForKey(userKey3, maxParallelism); // e.g., 89
253
254
// Determine which instance handles each key
255
int instance1 = keyGroup1 / (maxParallelism / parallelism); // 45 / 32 = 1
256
int instance2 = keyGroup2 / (maxParallelism / parallelism); // 12 / 32 = 0
257
int instance3 = keyGroup3 / (maxParallelism / parallelism); // 89 / 32 = 2
258
259
System.out.println(userKey1 + " -> key group " + keyGroup1 + " -> instance " + instance1);
260
System.out.println(userKey2 + " -> key group " + keyGroup2 + " -> instance " + instance2);
261
System.out.println(userKey3 + " -> key group " + keyGroup3 + " -> instance " + instance3);
262
263
// Example 3: Task assignment mapping
264
List<Integer> taskIds = Arrays.asList(100, 101, 102, 103);
265
Map<Integer, List<Integer>> keyGroupToTasks = KeyGroupAssignment.computeKeyGroupToTask(maxParallelism, taskIds);
266
267
// Show which tasks handle each key group
268
for (Map.Entry<Integer, List<Integer>> entry : keyGroupToTasks.entrySet()) {
269
System.out.println("Key group " + entry.getKey() + " -> tasks " + entry.getValue());
270
}
271
```
272
273
### Complete Configuration Example
274
275
```java
276
/**
277
* Complete example showing configuration setup for distributed state processing
278
*/
279
public class DistributedStateConfiguration {
280
281
public static void setupDistributedStateProcessing() {
282
// 1. Create configuration
283
Map<String, String> config = new HashMap<>();
284
config.put(ConfigKey.STATE_BACKEND_TYPE, "MEMORY");
285
config.put(ConfigKey.STATE_STRATEGY_MODE, "DUAL_VERSION");
286
config.put(ConfigKey.JOB_MAX_PARALLEL, "4");
287
config.put(ConfigKey.NUMBER_PER_CHECKPOINT, "1000");
288
289
// 2. Setup parallelism parameters
290
int maxParallelism = ConfigHelper.getIntegerOrDefault(config, ConfigKey.JOB_MAX_PARALLEL, 1);
291
int currentParallelism = 4;
292
293
// 3. Create state backends for each parallel instance
294
AbstractStateBackend stateBackend = StateBackendBuilder.buildStateBackend(config);
295
296
KeyStateBackend[] backends = new KeyStateBackend[currentParallelism];
297
for (int i = 0; i < currentParallelism; i++) {
298
KeyGroup keyGroup = KeyGroupAssignment.getKeyGroup(maxParallelism, currentParallelism, i);
299
backends[i] = new KeyStateBackend(maxParallelism, keyGroup, stateBackend);
300
301
System.out.println("Backend " + i + " handles key groups " +
302
keyGroup.getStartIndex() + "-" + keyGroup.getEndIndex());
303
}
304
305
// 4. Route keys to appropriate backend
306
String[] testKeys = {"user1", "user2", "user3", "user4", "user5"};
307
308
for (String key : testKeys) {
309
int keyGroupIndex = KeyGroupAssignment.assignKeyGroupIndexForKey(key, maxParallelism);
310
int backendIndex = findBackendForKeyGroup(keyGroupIndex, backends);
311
312
System.out.println("Key '" + key + "' -> key group " + keyGroupIndex +
313
" -> backend " + backendIndex);
314
315
// Use the appropriate backend for this key
316
KeyStateBackend targetBackend = backends[backendIndex];
317
targetBackend.setCurrentKey(key);
318
319
// Now you can use states with this backend
320
ValueStateDescriptor<String> desc = ValueStateDescriptor.build("value-state", String.class, "");
321
ValueState<String> state = targetBackend.getValueState(desc);
322
state.update("value-for-" + key);
323
}
324
}
325
326
private static int findBackendForKeyGroup(int keyGroupIndex, KeyStateBackend[] backends) {
327
for (int i = 0; i < backends.length; i++) {
328
KeyGroup keyGroup = backends[i].getKeyGroup();
329
if (keyGroupIndex >= keyGroup.getStartIndex() && keyGroupIndex <= keyGroup.getEndIndex()) {
330
return i;
331
}
332
}
333
throw new IllegalArgumentException("No backend found for key group " + keyGroupIndex);
334
}
335
336
// Utility method for dynamic scaling
337
public static void redistributeKeyGroups(int oldParallelism, int newParallelism, int maxParallelism) {
338
System.out.println("Redistributing key groups from " + oldParallelism + " to " + newParallelism + " instances:");
339
340
// Show old distribution
341
System.out.println("Old distribution:");
342
for (int i = 0; i < oldParallelism; i++) {
343
KeyGroup oldGroup = KeyGroupAssignment.getKeyGroup(maxParallelism, oldParallelism, i);
344
System.out.println(" Instance " + i + ": " + oldGroup.getStartIndex() + "-" + oldGroup.getEndIndex());
345
}
346
347
// Show new distribution
348
System.out.println("New distribution:");
349
for (int i = 0; i < newParallelism; i++) {
350
KeyGroup newGroup = KeyGroupAssignment.getKeyGroup(maxParallelism, newParallelism, i);
351
System.out.println(" Instance " + i + ": " + newGroup.getStartIndex() + "-" + newGroup.getEndIndex());
352
}
353
}
354
}
355
```
356
357
### Configuration Best Practices
358
359
```java
360
/**
361
* Best practices for configuration management
362
*/
363
public class ConfigurationBestPractices {
364
365
// 1. Use configuration builder pattern
366
public static class StateConfigBuilder {
367
private final Map<String, String> config = new HashMap<>();
368
369
public StateConfigBuilder backendType(String type) {
370
config.put(ConfigKey.STATE_BACKEND_TYPE, type);
371
return this;
372
}
373
374
public StateConfigBuilder strategy(String strategy) {
375
config.put(ConfigKey.STATE_STRATEGY_MODE, strategy);
376
return this;
377
}
378
379
public StateConfigBuilder maxParallelism(int parallelism) {
380
config.put(ConfigKey.JOB_MAX_PARALLEL, String.valueOf(parallelism));
381
return this;
382
}
383
384
public StateConfigBuilder numberPerCheckpoint(int number) {
385
config.put(ConfigKey.NUMBER_PER_CHECKPOINT, String.valueOf(number));
386
return this;
387
}
388
389
public Map<String, String> build() {
390
return new HashMap<>(config);
391
}
392
}
393
394
// 2. Environment-specific configuration
395
public static Map<String, String> createEnvironmentConfig(String environment) {
396
StateConfigBuilder builder = new StateConfigBuilder()
397
.backendType("MEMORY")
398
.strategy("DUAL_VERSION");
399
400
switch (environment.toLowerCase()) {
401
case "development":
402
return builder
403
.maxParallelism(2)
404
.numberPerCheckpoint(100)
405
.build();
406
407
case "testing":
408
return builder
409
.maxParallelism(4)
410
.numberPerCheckpoint(500)
411
.build();
412
413
case "production":
414
return builder
415
.maxParallelism(16)
416
.numberPerCheckpoint(1000)
417
.build();
418
419
default:
420
throw new IllegalArgumentException("Unknown environment: " + environment);
421
}
422
}
423
424
// 3. Configuration validation
425
public static void validateConfiguration(Map<String, String> config) {
426
// Validate required keys
427
String backendType = config.get(ConfigKey.STATE_BACKEND_TYPE);
428
if (backendType == null || backendType.trim().isEmpty()) {
429
throw new IllegalArgumentException("Backend type must be specified");
430
}
431
432
// Validate numeric values
433
String maxParallelStr = config.get(ConfigKey.JOB_MAX_PARALLEL);
434
if (maxParallelStr != null) {
435
try {
436
int maxParallel = Integer.parseInt(maxParallelStr);
437
if (maxParallel <= 0) {
438
throw new IllegalArgumentException("Max parallelism must be positive");
439
}
440
} catch (NumberFormatException e) {
441
throw new IllegalArgumentException("Max parallelism must be a valid integer");
442
}
443
}
444
445
System.out.println("Configuration validation passed");
446
}
447
}
448
```