or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

backend-management.md, configuration-key-groups.md, index.md, key-state-management.md, serialization-framework.md, state-types-operations.md, transaction-management.md

key-state-management.md (docs/)

0

# Key State Management

1

2

Key-based state management system providing ValueState, ListState, and MapState abstractions with key-group partitioning, transaction support, and fault-tolerant operations for distributed streaming applications.

3

4

## Capabilities

5

6

### Key State Backend

7

8

Main state backend implementation for managing different types of keyed states with key-group partitioning and transaction support. Note: This class is not thread-safe.

9

10

```java { .api }

11

/**

12

* Key state backend manager for different types of states (not thread-safe)

13

*/

14

public class KeyStateBackend extends AbstractKeyStateBackend {

15

/**

16

* Create key state backend with partitioning configuration

17

* @param numberOfKeyGroups Total number of key groups for partitioning

18

* @param keyGroup Key group range assigned to this backend instance

19

* @param abstractStateBackend Underlying state backend for storage

20

*/

21

public KeyStateBackend(int numberOfKeyGroups, KeyGroup keyGroup, AbstractStateBackend abstractStateBackend);

22

23

/**

24

* Get or create value state instance

25

* @param stateDescriptor Value state descriptor defining the state

26

* @return ValueState instance for the specified descriptor

27

*/

28

public <T> ValueState<T> getValueState(ValueStateDescriptor<T> stateDescriptor);

29

30

/**

31

* Get or create list state instance

32

* @param stateDescriptor List state descriptor defining the state

33

* @return ListState instance for the specified descriptor

34

*/

35

public <T> ListState<T> getListState(ListStateDescriptor<T> stateDescriptor);

36

37

/**

38

* Get or create map state instance

39

* @param stateDescriptor Map state descriptor defining the state

40

* @return MapState instance for the specified descriptor

41

*/

42

public <S, T> MapState<S, T> getMapState(MapStateDescriptor<S, T> stateDescriptor);

43

44

/**

45

* Set current processing key for state operations

46

* @param currentKey Current key for state access

47

*/

48

public void setCurrentKey(Object currentKey);

49

50

/**

51

* Get number of key groups for partitioning

52

* @return Total number of key groups

53

*/

54

public int getNumberOfKeyGroups();

55

56

/**

57

* Get assigned key group range

58

* @return KeyGroup representing the assigned range

59

*/

60

public KeyGroup getKeyGroup();

61

62

/**

63

* Close backend and clean up resources

64

*/

65

public void close();

66

}

67

```

68

69

**Usage Examples:**

70

71

```java

72

import io.ray.streaming.state.backend.*;

73

import io.ray.streaming.state.keystate.*;

74

import io.ray.streaming.state.keystate.desc.*;

75

import io.ray.streaming.state.keystate.state.*;

76

77

// Create key state backend

78

Map<String, String> config = new HashMap<>();

79

config.put("state.backend.type", "MEMORY");

80

AbstractStateBackend stateBackend = StateBackendBuilder.buildStateBackend(config);

81

82

int numberOfKeyGroups = 128;

83

KeyGroup keyGroup = new KeyGroup(0, 63); // Handle key groups 0-63

84

KeyStateBackend keyStateBackend = new KeyStateBackend(numberOfKeyGroups, keyGroup, stateBackend);

85

86

// Create and use different state types

87

ValueStateDescriptor<String> valueDesc = ValueStateDescriptor.build("user-session", String.class, "");

88

ValueState<String> valueState = keyStateBackend.getValueState(valueDesc);

89

90

ListStateDescriptor<String> listDesc = ListStateDescriptor.build("user-events", String.class);

91

ListState<String> listState = keyStateBackend.getListState(listDesc);

92

93

MapStateDescriptor<String, Integer> mapDesc = MapStateDescriptor.build("user-counters", String.class, Integer.class);

94

MapState<String, Integer> mapState = keyStateBackend.getMapState(mapDesc);

95

96

// Set current key and use states

97

keyStateBackend.setCurrentKey("user123");

98

valueState.update("session-abc");

99

listState.add("login-event");

100

mapState.put("clicks", 5);

101

102

// Close when done

103

keyStateBackend.close();

104

```

105

106

### Abstract Key State Backend

107

108

Base class providing transaction support and common state management functionality for key-based state backends. Note: This class is not thread-safe.

109

110

```java { .api }

111

/**

112

* Base class providing transaction support and state management (not thread-safe)

113

*/

114

public abstract class AbstractKeyStateBackend {

115

/**

116

* Create abstract key state backend

117

* @param backend Underlying state backend

118

*/

119

public AbstractKeyStateBackend(AbstractStateBackend backend);

120

121

/**

122

* Put value into state with descriptor and key

123

* @param descriptor State descriptor

124

* @param key State key

125

* @param value Value to store

126

*/

127

public <K, T> void put(AbstractStateDescriptor descriptor, K key, T value);

128

129

/**

130

* Get value from state with descriptor and key

131

* @param descriptor State descriptor

132

* @param key State key

133

* @return Retrieved value

134

*/

135

public <K, T> T get(AbstractStateDescriptor descriptor, K key);

136

137

/**

138

* Finish checkpoint phase - complete batch data saving and serialization

139

* @param checkpointId Checkpoint identifier

140

*/

141

public void finish(long checkpointId);

142

143

/**

144

* Commit checkpoint phase - persist data (can be async)

145

* @param checkpointId Checkpoint identifier

146

*/

147

public void commit(long checkpointId);

148

149

/**

150

* Acknowledge commit phase - clean up after commit

151

* @param checkpointId Checkpoint identifier

152

* @param timeStamp Timestamp of acknowledgment

153

*/

154

public void ackCommit(long checkpointId, long timeStamp);

155

156

/**

157

* Rollback checkpoint phase - recover from checkpoint

158

* @param checkpointId Checkpoint identifier

159

*/

160

public void rollBack(long checkpointId);

161

162

/**

163

* Get current processing key

164

* @return Current key object

165

*/

166

public Object getCurrentKey();

167

168

/**

169

* Set current processing key (abstract method)

170

* @param currentKey Current key to set

171

*/

172

public abstract void setCurrentKey(Object currentKey);

173

174

/**

175

* Get current checkpoint ID

176

* @return Current checkpoint ID

177

*/

178

public long getCheckpointId();

179

180

/**

181

* Set checkpoint ID

182

* @param checkpointId Checkpoint ID to set

183

*/

184

public void setCheckpointId(long checkpointId);

185

186

/**

187

* Set processing context with checkpoint and key

188

* @param checkpointId Checkpoint identifier

189

* @param currentKey Current processing key

190

*/

191

public void setContext(long checkpointId, Object currentKey);

192

193

/**

194

* Get key group index for current key

195

* @return Key group index

196

*/

197

public int getKeyGroupIndex();

198

}

199

```

200

201

### State Descriptors

202

203

State descriptors define the metadata and configuration for different types of states, providing type safety and unique identification.

204

205

#### Abstract State Descriptor

206

207

```java { .api }

208

/**

209

* Base class for all state descriptors

210

*/

211

public abstract class AbstractStateDescriptor<S, T> {

212

/**

213

* Create state descriptor with name and type

214

* @param name Descriptor name

215

* @param type Value type class

216

*/

217

public AbstractStateDescriptor(String name, Class<T> type);

218

219

/**

220

* Get descriptor name

221

* @return Descriptor name

222

*/

223

public String getName();

224

225

/**

226

* Get value type class

227

* @return Type class

228

*/

229

public Class<T> getType();

230

231

/**

232

* Get table name for storage

233

* @return Table name

234

*/

235

public String getTableName();

236

237

/**

238

* Set table name for storage

239

* @param tableName Table name

240

*/

241

public void setTableName(String tableName);

242

243

/**

244

* Get unique identifier for this descriptor

245

* @return Unique identifier string

246

*/

247

public String getIdentify();

248

249

/**

250

* Get state type enumeration

251

* @return StateType enum value

252

*/

253

public abstract StateType getStateType();

254

}

255

256

/**

257

* State type enumeration

258

*/

259

public enum StateType {

260

/** Value state type */

261

VALUE,

262

/** List state type */

263

LIST,

264

/** Map state type */

265

MAP

266

}

267

```

268

269

#### Value State Descriptor

270

271

```java { .api }

272

/**

273

* Descriptor for value state configuration

274

*/

275

public class ValueStateDescriptor<T> extends AbstractStateDescriptor<ValueState<T>, T> {

276

/**

277

* Create value state descriptor

278

* @param name State name

279

* @param type Value type class

280

* @param defaultValue Default value when state is empty

281

*/

282

public ValueStateDescriptor(String name, Class<T> type, T defaultValue);

283

284

/**

285

* Factory method for creating value state descriptor

286

* @param name State name

287

* @param type Value type class

288

* @param defaultValue Default value

289

* @return ValueStateDescriptor instance

290

*/

291

public static <T> ValueStateDescriptor<T> build(String name, Class<T> type, T defaultValue);

292

293

/**

294

* Get default value

295

* @return Default value

296

*/

297

public T getDefaultValue();

298

299

/**

300

* Get state type

301

* @return StateType.VALUE

302

*/

303

public StateType getStateType();

304

}

305

```

306

307

#### List State Descriptor

308

309

```java { .api }

310

/**

311

* Descriptor for list state configuration

312

*/

313

public class ListStateDescriptor<T> extends AbstractStateDescriptor<ListState<T>, T> {

314

/**

315

* Factory method for creating list state descriptor

316

* @param name State name

317

* @param type Element type class

318

* @return ListStateDescriptor instance

319

*/

320

public static <T> ListStateDescriptor<T> build(String name, Class<T> type);

321

322

/**

323

* Factory method for creating list state descriptor with operator flag

324

* @param name State name

325

* @param type Element type class

326

* @param isOperatorList Whether this is an operator-level list state

327

* @return ListStateDescriptor instance

328

*/

329

public static <T> ListStateDescriptor<T> build(String name, Class<T> type, boolean isOperatorList);

330

331

/**

332

* Check if this is an operator list

333

* @return True if operator list

334

*/

335

public boolean isOperatorList();

336

337

/**

338

* Get partition index

339

* @return Partition index

340

*/

341

public int getIndex();

342

343

/**

344

* Set partition index

345

* @param index Partition index

346

*/

347

public void setIndex(int index);

348

349

/**

350

* Get partition number

351

* @return Partition number

352

*/

353

public int getPartitionNumber();

354

355

/**

356

* Set partition number

357

* @param number Partition number

358

*/

359

public void setPartitionNumber(int number);

360

361

/**

362

* Get state type

363

* @return StateType.LIST

364

*/

365

public StateType getStateType();

366

}

367

```

368

369

#### Map State Descriptor

370

371

```java { .api }

372

/**

373

* Descriptor for map state configuration

374

*/

375

public class MapStateDescriptor<K, V> extends AbstractStateDescriptor<MapState<K, V>, V> {

376

/**

377

* Create map state descriptor

378

* @param name State name

379

* @param keyType Key type class

380

* @param valueType Value type class

381

*/

382

public MapStateDescriptor(String name, Class<K> keyType, Class<V> valueType);

383

384

/**

385

* Factory method for creating map state descriptor

386

* @param name State name

387

* @param keyType Key type class

388

* @param valueType Value type class

389

* @return MapStateDescriptor instance

390

*/

391

public static <K, V> MapStateDescriptor<K, V> build(String name, Class<K> keyType, Class<V> valueType);

392

393

/**

394

* Get state type

395

* @return StateType.MAP

396

*/

397

public StateType getStateType();

398

}

399

```

400

401

**Usage Examples:**

402

403

```java

404

// Create descriptors for different state types

405

ValueStateDescriptor<String> userNameDesc = ValueStateDescriptor.build("user-name", String.class, "anonymous");

406

407

ListStateDescriptor<String> eventListDesc = ListStateDescriptor.build("events", String.class);

408

ListStateDescriptor<String> operatorListDesc = ListStateDescriptor.build("operator-events", String.class, true);

409

410

MapStateDescriptor<String, Integer> counterMapDesc = MapStateDescriptor.build("counters", String.class, Integer.class);

411

412

// Use descriptors with key state backend

413

ValueState<String> userNameState = keyStateBackend.getValueState(userNameDesc);

414

ListState<String> eventListState = keyStateBackend.getListState(eventListDesc);

415

MapState<String, Integer> counterMapState = keyStateBackend.getMapState(counterMapDesc);

416

```