or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

backend-management.md · configuration-key-groups.md · index.md · key-state-management.md · serialization-framework.md · state-types-operations.md · transaction-management.md

docs/configuration-key-groups.md

0

# Configuration and Key Groups

1

2

Configuration system and key-group assignment algorithms enabling distributed state partitioning, parallel processing optimization, and configurable backend selection with built-in helper utilities.

3

4

## Capabilities

5

6

### Configuration Keys

7

8

Central configuration constants and helper methods for state backend configuration and parameter management.

9

10

```java { .api }

11

/**

12

* Configuration keys and helper methods for state management

13

*/

14

public class ConfigKey {

15

/** Backend type configuration key */

16

public static final String STATE_BACKEND_TYPE = "state.backend.type";

17

18

/** Table name configuration key */

19

public static final String STATE_TABLE_NAME = "state.table.name";

20

21

/** Strategy mode configuration key */

22

public static final String STATE_STRATEGY_MODE = "state.strategy.mode";

23

24

/** Number per checkpoint configuration key */

25

public static final String NUMBER_PER_CHECKPOINT = "number.per.checkpoint";

26

27

/** Maximum parallelism configuration key */

28

public static final String JOB_MAX_PARALLEL = "job.max.parallel";

29

30

/** String delimiter for internal use */

31

public static final String DELIMITER = "delimiter.string";

32

33

/**

34

* Get state strategy from configuration map

35

* @param config Configuration map

36

* @return State strategy string value

37

*/

38

public static String getStateStrategyEnum(Map<String, String> config);

39

40

/**

41

* Get backend type from configuration map

42

* @param config Configuration map

43

* @return Backend type string value

44

*/

45

public static String getBackendType(Map<String, String> config);

46

47

/**

48

* Get number per checkpoint from configuration map

49

* @param config Configuration map

50

* @return Number per checkpoint value

51

*/

52

public static int getNumberPerCheckpoint(Map<String, String> config);

53

54

/**

55

* Get state table name from configuration map

56

* @param config Configuration map

57

* @return State table name

58

*/

59

public static String getStateTableName(Map<String, String> config);

60

}

61

```

62

63

**Usage Examples:**

64

65

```java

66

import io.ray.streaming.state.config.ConfigKey;

67

import java.util.Map;

68

import java.util.HashMap;

69

70

// Create configuration map

71

Map<String, String> config = new HashMap<>();

72

config.put(ConfigKey.STATE_BACKEND_TYPE, "MEMORY");

73

config.put(ConfigKey.STATE_STRATEGY_MODE, "DUAL_VERSION");

74

config.put(ConfigKey.STATE_TABLE_NAME, "user-state-table");

75

config.put(ConfigKey.NUMBER_PER_CHECKPOINT, "1000");

76

config.put(ConfigKey.JOB_MAX_PARALLEL, "16");

77

78

// Retrieve configuration values

79

String backendType = ConfigKey.getBackendType(config); // "MEMORY"

80

String strategy = ConfigKey.getStateStrategyEnum(config); // "DUAL_VERSION"

81

String tableName = ConfigKey.getStateTableName(config); // "user-state-table"

82

int numberPerCheckpoint = ConfigKey.getNumberPerCheckpoint(config); // 1000

83

84

// Use with state backend builder

85

AbstractStateBackend backend = StateBackendBuilder.buildStateBackend(config);

86

```

87

88

### Configuration Helper Utilities

89

90

Helper methods for processing configuration maps with type conversion and default value support.

91

92

```java { .api }

93

/**

94

* Helper methods for configuration processing

95

*/

96

public class ConfigHelper {

97

/**

98

* Get integer value from configuration with default fallback

99

* @param config Configuration map

100

* @param configKey Configuration key to look up

101

* @param defaultValue Default value if key not found or invalid

102

* @return Integer value or default

103

*/

104

public static int getIntegerOrDefault(Map config, String configKey, int defaultValue);

105

106

/**

107

* Get string value from configuration with default fallback

108

* @param config Configuration map

109

* @param configKey Configuration key to look up

110

* @param defaultValue Default value if key not found

111

* @return String value or default

112

*/

113

public static String getStringOrDefault(Map config, String configKey, String defaultValue);

114

}

115

```

116

117

**Usage Examples:**

118

119

```java

120

import io.ray.streaming.state.config.ConfigHelper;

121

122

// Safe configuration retrieval with defaults

123

Map<String, String> config = getConfigurationFromSource();

124

125

int parallelism = ConfigHelper.getIntegerOrDefault(config, "parallelism", 4);

126

String environment = ConfigHelper.getStringOrDefault(config, "environment", "production");

127

int bufferSize = ConfigHelper.getIntegerOrDefault(config, "buffer.size", 8192);

128

129

// Handle missing or malformed configuration gracefully

130

int timeout = ConfigHelper.getIntegerOrDefault(config, "timeout", 5000); // Will use 5000 if "timeout" is missing or not a valid integer

131

```

132

133

### Key Group Management

134

135

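Key groups provide scalable partitioning for distributed state processing: the key space is divided into a fixed number of key groups, and each parallel instance is assigned a contiguous, inclusive range of key-group indices.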

136

137

#### Key Group Class

138

139

```java { .api }

140

/**

141

* Defines key-groups for partitioned state processing

142

*/

143

public class KeyGroup {

144

/**

145

* Create key group with inclusive range

146

* @param startIndex Start index (inclusive)

147

* @param endIndex End index (inclusive)

148

*/

149

public KeyGroup(int startIndex, int endIndex);

150

151

/**

152

* Get number of key-groups in this range

153

* @return Number of key-groups

154

*/

155

public int size();

156

157

/**

158

* Get start index of range

159

* @return Start index (inclusive)

160

*/

161

public int getStartIndex();

162

163

/**

164

* Get end index of range

165

* @return End index (inclusive)

166

*/

167

public int getEndIndex();

168

}

169

```

170

171

**Usage Examples:**

172

173

```java

174

// Create key groups for different parallel instances

175

KeyGroup keyGroup1 = new KeyGroup(0, 31); // Handles key groups 0-31 (32 groups)

176

KeyGroup keyGroup2 = new KeyGroup(32, 63); // Handles key groups 32-63 (32 groups)

177

KeyGroup keyGroup3 = new KeyGroup(64, 95); // Handles key groups 64-95 (32 groups)

178

KeyGroup keyGroup4 = new KeyGroup(96, 127); // Handles key groups 96-127 (32 groups)

179

180

// Check key group properties

181

int size1 = keyGroup1.size(); // 32

182

int start1 = keyGroup1.getStartIndex(); // 0

183

int end1 = keyGroup1.getEndIndex(); // 31

184

185

// Use with key state backend

186

KeyStateBackend backend1 = new KeyStateBackend(128, keyGroup1, stateBackend);

187

KeyStateBackend backend2 = new KeyStateBackend(128, keyGroup2, stateBackend);

188

```

189

190

#### Key Group Assignment Algorithms

191

192

```java { .api }

193

/**

194

* Key-group assignment algorithms for distributed processing

195

*/

196

public class KeyGroupAssignment {

197

/**

198

* Compute key-group range for specific operator instance

199

* @param maxParallelism Maximum parallelism (total key groups)

200

* @param parallelism Current parallelism level

201

* @param index Operator instance index (0-based)

202

* @return KeyGroup representing assigned range

203

*/

204

public static KeyGroup getKeyGroup(int maxParallelism, int parallelism, int index);

205

206

/**

207

* Assign key to specific key-group index using hash function

208

* @param key Key object to assign

209

* @param maxParallelism Maximum parallelism (total key groups)

210

* @return Key-group index for the key

211

*/

212

public static int assignKeyGroupIndexForKey(Object key, int maxParallelism);

213

214

/**

215

* Compute mapping from key-groups to task instances

216

* @param maxParallelism Maximum parallelism (total key groups)

217

* @param targetTasks List of target task IDs

218

* @return Map from key-group index to list of task IDs

219

*/

220

public static Map<Integer, List<Integer>> computeKeyGroupToTask(int maxParallelism, List<Integer> targetTasks);

221

}

222

```

223

224

**Advanced Usage Examples:**

225

226

```java

227

import io.ray.streaming.state.keystate.KeyGroupAssignment;

228

import java.util.List;

229

import java.util.Arrays;

230

import java.util.Map;

231

232

// Example 1: Distribute key groups across parallel instances

233

int maxParallelism = 128; // Total key groups

234

int parallelism = 4; // Number of parallel instances

235

236

// Calculate key group ranges for each instance

237

KeyGroup range0 = KeyGroupAssignment.getKeyGroup(maxParallelism, parallelism, 0); // 0-31

238

KeyGroup range1 = KeyGroupAssignment.getKeyGroup(maxParallelism, parallelism, 1); // 32-63

239

KeyGroup range2 = KeyGroupAssignment.getKeyGroup(maxParallelism, parallelism, 2); // 64-95

240

KeyGroup range3 = KeyGroupAssignment.getKeyGroup(maxParallelism, parallelism, 3); // 96-127

241

242

System.out.println("Instance 0 handles key groups: " + range0.getStartIndex() + "-" + range0.getEndIndex());

243

System.out.println("Instance 1 handles key groups: " + range1.getStartIndex() + "-" + range1.getEndIndex());

244

245

// Example 2: Determine which instance should handle a specific key

246

String userKey1 = "user123";

247

String userKey2 = "user456";

248

String userKey3 = "user789";

249

250

int keyGroup1 = KeyGroupAssignment.assignKeyGroupIndexForKey(userKey1, maxParallelism); // e.g., 45

251

int keyGroup2 = KeyGroupAssignment.assignKeyGroupIndexForKey(userKey2, maxParallelism); // e.g., 12

252

int keyGroup3 = KeyGroupAssignment.assignKeyGroupIndexForKey(userKey3, maxParallelism); // e.g., 89

253

254

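// Determine which instance handles each key (this division shortcut assumes maxParallelism is an exact multiple of parallelism, as here: 128 / 4 = 32 key groups per instance)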

255

int instance1 = keyGroup1 / (maxParallelism / parallelism); // 45 / 32 = 1

256

int instance2 = keyGroup2 / (maxParallelism / parallelism); // 12 / 32 = 0

257

int instance3 = keyGroup3 / (maxParallelism / parallelism); // 89 / 32 = 2

258

259

System.out.println(userKey1 + " -> key group " + keyGroup1 + " -> instance " + instance1);

260

System.out.println(userKey2 + " -> key group " + keyGroup2 + " -> instance " + instance2);

261

System.out.println(userKey3 + " -> key group " + keyGroup3 + " -> instance " + instance3);

262

263

// Example 3: Task assignment mapping

264

List<Integer> taskIds = Arrays.asList(100, 101, 102, 103);

265

Map<Integer, List<Integer>> keyGroupToTasks = KeyGroupAssignment.computeKeyGroupToTask(maxParallelism, taskIds);

266

267

// Show which tasks handle each key group

268

for (Map.Entry<Integer, List<Integer>> entry : keyGroupToTasks.entrySet()) {

269

System.out.println("Key group " + entry.getKey() + " -> tasks " + entry.getValue());

270

}

271

```

272

273

### Complete Configuration Example

274

275

```java

276

/**

277

* Complete example showing configuration setup for distributed state processing

278

*/

279

public class DistributedStateConfiguration {

280

281

public static void setupDistributedStateProcessing() {

282

// 1. Create configuration

283

Map<String, String> config = new HashMap<>();

284

config.put(ConfigKey.STATE_BACKEND_TYPE, "MEMORY");

285

config.put(ConfigKey.STATE_STRATEGY_MODE, "DUAL_VERSION");

286

config.put(ConfigKey.JOB_MAX_PARALLEL, "4");

287

config.put(ConfigKey.NUMBER_PER_CHECKPOINT, "1000");

288

289

// 2. Setup parallelism parameters

290

int maxParallelism = ConfigHelper.getIntegerOrDefault(config, ConfigKey.JOB_MAX_PARALLEL, 1);

291

int currentParallelism = 4;

292

293

// 3. Create state backends for each parallel instance

294

AbstractStateBackend stateBackend = StateBackendBuilder.buildStateBackend(config);

295

296

KeyStateBackend[] backends = new KeyStateBackend[currentParallelism];

297

for (int i = 0; i < currentParallelism; i++) {

298

KeyGroup keyGroup = KeyGroupAssignment.getKeyGroup(maxParallelism, currentParallelism, i);

299

backends[i] = new KeyStateBackend(maxParallelism, keyGroup, stateBackend);

300

301

System.out.println("Backend " + i + " handles key groups " +

302

keyGroup.getStartIndex() + "-" + keyGroup.getEndIndex());

303

}

304

305

// 4. Route keys to appropriate backend

306

String[] testKeys = {"user1", "user2", "user3", "user4", "user5"};

307

308

for (String key : testKeys) {

309

int keyGroupIndex = KeyGroupAssignment.assignKeyGroupIndexForKey(key, maxParallelism);

310

int backendIndex = findBackendForKeyGroup(keyGroupIndex, backends);

311

312

System.out.println("Key '" + key + "' -> key group " + keyGroupIndex +

313

" -> backend " + backendIndex);

314

315

// Use the appropriate backend for this key

316

KeyStateBackend targetBackend = backends[backendIndex];

317

targetBackend.setCurrentKey(key);

318

319

// Now you can use states with this backend

320

ValueStateDescriptor<String> desc = ValueStateDescriptor.build("value-state", String.class, "");

321

ValueState<String> state = targetBackend.getValueState(desc);

322

state.update("value-for-" + key);

323

}

324

}

325

326

private static int findBackendForKeyGroup(int keyGroupIndex, KeyStateBackend[] backends) {

327

for (int i = 0; i < backends.length; i++) {

328

KeyGroup keyGroup = backends[i].getKeyGroup();

329

if (keyGroupIndex >= keyGroup.getStartIndex() && keyGroupIndex <= keyGroup.getEndIndex()) {

330

return i;

331

}

332

}

333

throw new IllegalArgumentException("No backend found for key group " + keyGroupIndex);

334

}

335

336

// Utility method for dynamic scaling

337

public static void redistributeKeyGroups(int oldParallelism, int newParallelism, int maxParallelism) {

338

System.out.println("Redistributing key groups from " + oldParallelism + " to " + newParallelism + " instances:");

339

340

// Show old distribution

341

System.out.println("Old distribution:");

342

for (int i = 0; i < oldParallelism; i++) {

343

KeyGroup oldGroup = KeyGroupAssignment.getKeyGroup(maxParallelism, oldParallelism, i);

344

System.out.println(" Instance " + i + ": " + oldGroup.getStartIndex() + "-" + oldGroup.getEndIndex());

345

}

346

347

// Show new distribution

348

System.out.println("New distribution:");

349

for (int i = 0; i < newParallelism; i++) {

350

KeyGroup newGroup = KeyGroupAssignment.getKeyGroup(maxParallelism, newParallelism, i);

351

System.out.println(" Instance " + i + ": " + newGroup.getStartIndex() + "-" + newGroup.getEndIndex());

352

}

353

}

354

}

355

```

356

357

### Configuration Best Practices

358

359

```java

360

/**

361

* Best practices for configuration management

362

*/

363

public class ConfigurationBestPractices {

364

365

// 1. Use configuration builder pattern

366

public static class StateConfigBuilder {

367

private final Map<String, String> config = new HashMap<>();

368

369

public StateConfigBuilder backendType(String type) {

370

config.put(ConfigKey.STATE_BACKEND_TYPE, type);

371

return this;

372

}

373

374

public StateConfigBuilder strategy(String strategy) {

375

config.put(ConfigKey.STATE_STRATEGY_MODE, strategy);

376

return this;

377

}

378

379

public StateConfigBuilder maxParallelism(int parallelism) {

380

config.put(ConfigKey.JOB_MAX_PARALLEL, String.valueOf(parallelism));

381

return this;

382

}

383

384

public StateConfigBuilder numberPerCheckpoint(int number) {

385

config.put(ConfigKey.NUMBER_PER_CHECKPOINT, String.valueOf(number));

386

return this;

387

}

388

389

public Map<String, String> build() {

390

return new HashMap<>(config);

391

}

392

}

393

394

// 2. Environment-specific configuration

395

public static Map<String, String> createEnvironmentConfig(String environment) {

396

StateConfigBuilder builder = new StateConfigBuilder()

397

.backendType("MEMORY")

398

.strategy("DUAL_VERSION");

399

400

switch (environment.toLowerCase()) {

401

case "development":

402

return builder

403

.maxParallelism(2)

404

.numberPerCheckpoint(100)

405

.build();

406

407

case "testing":

408

return builder

409

.maxParallelism(4)

410

.numberPerCheckpoint(500)

411

.build();

412

413

case "production":

414

return builder

415

.maxParallelism(16)

416

.numberPerCheckpoint(1000)

417

.build();

418

419

default:

420

throw new IllegalArgumentException("Unknown environment: " + environment);

421

}

422

}

423

424

// 3. Configuration validation

425

public static void validateConfiguration(Map<String, String> config) {

426

// Validate required keys

427

String backendType = config.get(ConfigKey.STATE_BACKEND_TYPE);

428

if (backendType == null || backendType.trim().isEmpty()) {

429

throw new IllegalArgumentException("Backend type must be specified");

430

}

431

432

// Validate numeric values

433

String maxParallelStr = config.get(ConfigKey.JOB_MAX_PARALLEL);

434

if (maxParallelStr != null) {

435

try {

436

int maxParallel = Integer.parseInt(maxParallelStr);

437

if (maxParallel <= 0) {

438

throw new IllegalArgumentException("Max parallelism must be positive");

439

}

440

} catch (NumberFormatException e) {

441

throw new IllegalArgumentException("Max parallelism must be a valid integer");

442

}

443

}

444

445

System.out.println("Configuration validation passed");

446

}

447

}

448

```