tessl/maven-io-ray--streaming-state

State management library for Ray Streaming framework that provides transactional state storage with checkpoint and rollback capabilities for streaming data processing applications

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: mavenpkg:maven/io.ray/streaming-state@1.10.x

To install, run

npx @tessl/cli install tessl/maven-io-ray--streaming-state@1.10.0

# Ray Streaming State

Ray Streaming State is a state management library for the Ray Streaming framework. It provides transactional state storage with checkpoint and rollback capabilities, so streaming applications can keep persistent state across distributed processing nodes and recover it after failures.

## Package Information

- **Package Name**: streaming-state
- **Package Type**: maven
- **Language**: Java
- **Installation**: Add to your Maven pom.xml:

```xml
<dependency>
    <groupId>io.ray</groupId>
    <artifactId>streaming-state</artifactId>
    <version>1.10.0</version>
</dependency>
```

## Core Imports

```java
import io.ray.streaming.state.backend.StateBackendBuilder;
import io.ray.streaming.state.backend.KeyStateBackend;
import io.ray.streaming.state.keystate.desc.ValueStateDescriptor;
import io.ray.streaming.state.keystate.desc.ListStateDescriptor;
import io.ray.streaming.state.keystate.desc.MapStateDescriptor;
import io.ray.streaming.state.keystate.state.ValueState;
import io.ray.streaming.state.keystate.state.ListState;
import io.ray.streaming.state.keystate.state.MapState;
```

## Basic Usage

```java
import io.ray.streaming.state.backend.*;
import io.ray.streaming.state.keystate.desc.*;
import io.ray.streaming.state.keystate.state.*;
import io.ray.streaming.state.keystate.KeyGroup;
import java.util.Map;
import java.util.HashMap;

// Create state backend
Map<String, String> config = new HashMap<>();
config.put("state.backend.type", "MEMORY");
AbstractStateBackend stateBackend = StateBackendBuilder.buildStateBackend(config);

// Create key state backend
int numberOfKeyGroups = 128;
KeyGroup keyGroup = new KeyGroup(0, 63);
KeyStateBackend keyStateBackend = new KeyStateBackend(numberOfKeyGroups, keyGroup, stateBackend);

// Create value state
ValueStateDescriptor<String> valueDesc = ValueStateDescriptor.build("user-name", String.class, "unknown");
ValueState<String> valueState = keyStateBackend.getValueState(valueDesc);

// Use state with transactional operations
keyStateBackend.setCurrentKey("user123");
valueState.update("Alice");
String name = valueState.get(); // "Alice"

// Transaction operations for checkpointing
long checkpointId = 1001L;
keyStateBackend.finish(checkpointId);
keyStateBackend.commit(checkpointId);
keyStateBackend.ackCommit(checkpointId, System.currentTimeMillis());
```

## Architecture

Ray Streaming State is built around several key architectural components:

- **State Backend System**: Pluggable storage backends (currently Memory-based) with configurable strategies
- **Transaction Management**: Four-phase commit protocol (finish, commit, ackCommit, rollback) for fault tolerance
- **Key-Group Partitioning**: Scalable key distribution across processing nodes for parallel state management
- **State Types**: Three core state abstractions (ValueState, ListState, MapState) supporting different data patterns
- **Serialization Framework**: Pluggable serialization system with default FST-based implementation
- **Storage Strategies**: DUAL_VERSION (rollback support) and SINGLE_VERSION (MVCC optimization) approaches
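
To make the rollback-oriented storage idea more concrete, the following is a minimal sketch of a store that keeps a committed and a pending version per key, so writes made since the last checkpoint can be discarded. `DualVersionSketch` and all of its methods are hypothetical names written for illustration only; they are not part of the library, and the actual DUAL_VERSION strategy may be implemented differently.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: not the library's DUAL_VERSION implementation.
final class DualVersionSketch<K, V> {
    // Values made durable by the last successful checkpoint.
    private final Map<K, V> committed = new HashMap<>();
    // Writes made since the last checkpoint; discarded on rollback.
    private final Map<K, V> pending = new HashMap<>();

    void put(K key, V value) {
        pending.put(key, value);
    }

    V get(K key) {
        // Readers see the newest write first, then fall back to the committed version.
        return pending.containsKey(key) ? pending.get(key) : committed.get(key);
    }

    // Promote all pending writes to the committed version.
    void checkpoint() {
        committed.putAll(pending);
        pending.clear();
    }

    // Drop everything written since the last checkpoint.
    void rollback() {
        pending.clear();
    }

    public static void main(String[] args) {
        DualVersionSketch<String, String> store = new DualVersionSketch<>();
        store.put("user123", "Alice");
        store.checkpoint();
        store.put("user123", "Bob");
        System.out.println(store.get("user123")); // Bob
        store.rollback();
        System.out.println(store.get("user123")); // Alice
    }
}
```

Keeping two versions doubles the memory held for recently written keys, which is presumably why a single-version mode is offered for storage that can already provide versioning on its own.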

78

79

## Capabilities

80

81

### State Backend Management

82

83

Core state backend system providing pluggable storage implementations, configuration management, and factory methods for creating state backends with different strategies and storage types.

84

85

```java { .api }

86

public static AbstractStateBackend buildStateBackend(Map<String, String> config);

87

88

public enum BackendType {

89

MEMORY

90

}

91

92

public enum StateStrategy {

93

DUAL_VERSION,

94

SINGLE_VERSION

95

}

96

```

[State Backend Management](./backend-management.md)

### Key State Management

Key-based state management system providing ValueState, ListState, and MapState abstractions with key-group partitioning, transaction support, and fault-tolerant operations for distributed streaming applications.

```java { .api }
public class KeyStateBackend extends AbstractKeyStateBackend {
    public <T> ValueState<T> getValueState(ValueStateDescriptor<T> stateDescriptor);
    public <T> ListState<T> getListState(ListStateDescriptor<T> stateDescriptor);
    public <S, T> MapState<S, T> getMapState(MapStateDescriptor<S, T> stateDescriptor);
    public void setCurrentKey(Object currentKey);
}
```

[Key State Management](./key-state-management.md)
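
The role of `setCurrentKey` can be illustrated with a small stand-in: one state handle serves many keys, and the current key decides which entry a read or write touches. `KeyedValueSketch` is a hypothetical class written for this example; it merges the backend and the state handle into one object for brevity, which the library does not necessarily do.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a keyed value state; not one of the library's classes.
final class KeyedValueSketch<T> {
    private final Map<Object, T> valuesByKey = new HashMap<>();
    private final T defaultValue;
    private Object currentKey;

    KeyedValueSketch(T defaultValue) {
        this.defaultValue = defaultValue;
    }

    // Selects the key that later get/update calls read and write.
    void setCurrentKey(Object key) {
        this.currentKey = key;
    }

    void update(T value) {
        valuesByKey.put(currentKey, value);
    }

    T get() {
        return valuesByKey.getOrDefault(currentKey, defaultValue);
    }

    public static void main(String[] args) {
        KeyedValueSketch<String> userName = new KeyedValueSketch<>("unknown");
        userName.setCurrentKey("user123");
        userName.update("Alice");
        System.out.println(userName.get()); // Alice

        userName.setCurrentKey("user456");
        System.out.println(userName.get()); // unknown (nothing stored for this key yet)
    }
}
```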

### State Types and Operations

Three core state abstractions - ValueState for single values, ListState for ordered collections, and MapState for key-value mappings - each providing specialized operations and transaction support for different data access patterns.

```java { .api }
public interface ValueState<T> extends UnaryState<T> {
    void update(T value);
}

public interface ListState<T> extends UnaryState<List<T>> {
    void add(T value);
    void update(List<T> list);
}

public interface MapState<K, V> extends UnaryState<Map<K, V>> {
    V get(K key);
    void put(K key, V value);
    void remove(K key);
    void putAll(Map<K, V> map);
}
```

[State Types and Operations](./state-types-operations.md)

### Transaction Management

Transaction system implementing a four-phase commit protocol (`finish`, `commit`, `ackCommit`, `rollBack`) that keeps state consistent across failures and supports checkpoint-based recovery.

```java { .api }
public interface StateStoreManager {
    void finish(long checkpointId);
    void commit(long checkpointId);
    void ackCommit(long checkpointId, long timeStamp);
    void rollBack(long checkpointId);
}
```

[Transaction Management](./transaction-management.md)
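
As a rough illustration of how a caller might drive these phases, the sketch below runs `finish`, `commit`, and `ackCommit` in that order and falls back to `rollBack` when a phase throws. It rests on two assumptions that the text above does not confirm: that the phases are meant to run in the listed order, and that `rollBack` receives the id of the checkpoint to restore. `TransactionSketch` is a local copy of the four signatures so the example compiles on its own; `RecordingTransaction` and `CheckpointDriverSketch` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Local copy of the four method signatures listed above (not the library interface itself).
interface TransactionSketch {
    void finish(long checkpointId);
    void commit(long checkpointId);
    void ackCommit(long checkpointId, long timeStamp);
    void rollBack(long checkpointId);
}

// Hypothetical implementation that only records the calls it receives.
final class RecordingTransaction implements TransactionSketch {
    final List<String> calls = new ArrayList<>();
    private final boolean failOnCommit;

    RecordingTransaction(boolean failOnCommit) {
        this.failOnCommit = failOnCommit;
    }

    @Override public void finish(long checkpointId) { calls.add("finish " + checkpointId); }

    @Override public void commit(long checkpointId) {
        if (failOnCommit) {
            throw new IllegalStateException("simulated commit failure");
        }
        calls.add("commit " + checkpointId);
    }

    @Override public void ackCommit(long checkpointId, long timeStamp) { calls.add("ackCommit " + checkpointId); }

    @Override public void rollBack(long checkpointId) { calls.add("rollBack " + checkpointId); }
}

final class CheckpointDriverSketch {
    // Runs the phases in order; on failure, rolls back to the last good checkpoint (assumed semantics).
    static boolean checkpoint(TransactionSketch tx, long checkpointId, long lastGoodCheckpointId) {
        try {
            tx.finish(checkpointId);
            tx.commit(checkpointId);
            tx.ackCommit(checkpointId, System.currentTimeMillis());
            return true;
        } catch (RuntimeException e) {
            tx.rollBack(lastGoodCheckpointId);
            return false;
        }
    }

    public static void main(String[] args) {
        RecordingTransaction ok = new RecordingTransaction(false);
        System.out.println(checkpoint(ok, 1001L, 1000L) + " " + ok.calls);
        RecordingTransaction bad = new RecordingTransaction(true);
        System.out.println(checkpoint(bad, 1002L, 1001L) + " " + bad.calls);
    }
}
```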

### Serialization Framework

Pluggable serialization system with default FST-based implementations supporting custom serializers for both key-value stores and key-map stores, enabling efficient state persistence and cross-language compatibility.

```java { .api }
public interface KeyValueStoreSerialization<K, V> {
    byte[] serializeKey(K key);
    byte[] serializeValue(V value);
    V deserializeValue(byte[] valueArray);
}

public interface KeyMapStoreSerializer<K, S, T> {
    byte[] serializeKey(K key);
    byte[] serializeUKey(S uk);
    byte[] serializeUValue(T uv);
    S deserializeUKey(byte[] ukArray);
    T deserializeUValue(byte[] uvArray);
}
```

[Serialization Framework](./serialization-framework.md)
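
A custom serializer for the key-value shape might look roughly like the sketch below. It uses the JDK's built-in object streams rather than FST, purely to stay dependency-free. `KeyValueSerializationSketch` is a local copy of the three method signatures so the example compiles without the library, and `JdkSerializationSketch` is a hypothetical name; how a serializer is registered with a backend is not covered here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Local copy of the interface shape listed above (not the library interface itself).
interface KeyValueSerializationSketch<K, V> {
    byte[] serializeKey(K key);
    byte[] serializeValue(V value);
    V deserializeValue(byte[] valueArray);
}

// Hypothetical serializer that uses java.io object streams in place of FST.
final class JdkSerializationSketch<K extends Serializable, V extends Serializable>
        implements KeyValueSerializationSketch<K, V> {

    @Override public byte[] serializeKey(K key) { return toBytes(key); }

    @Override public byte[] serializeValue(V value) { return toBytes(value); }

    @Override
    @SuppressWarnings("unchecked")
    public V deserializeValue(byte[] valueArray) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(valueArray))) {
            return (V) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    private static byte[] toBytes(Object obj) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // Closing the object stream flushes everything into the buffer.
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        JdkSerializationSketch<String, String> serializer = new JdkSerializationSketch<>();
        byte[] bytes = serializer.serializeValue("Alice");
        System.out.println(serializer.deserializeValue(bytes)); // Alice
    }
}
```

JDK serialization is Java-only, so a sketch like this would not help with cross-language use; a format-neutral encoding would be needed for that.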

### Configuration and Key Groups

Configuration system and key-group assignment algorithms enabling distributed state partitioning, parallel processing optimization, and configurable backend selection with built-in helper utilities.

```java { .api }
public class KeyGroupAssignment {
    public static KeyGroup getKeyGroup(int maxParallelism, int parallelism, int index);
    public static int assignKeyGroupIndexForKey(Object key, int maxParallelism);
    public static Map<Integer, List<Integer>> computeKeyGroupToTask(int maxParallelism, List<Integer> targetTasks);
}

public class ConfigKey {
    public static final String STATE_BACKEND_TYPE = "state.backend.type";
    public static final String STATE_STRATEGY_MODE = "state.strategy.mode";
    public static final String STATE_TABLE_NAME = "state.table.name";
}
```

[Configuration and Key Groups](./configuration-key-groups.md)
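
One plausible way to split key groups among parallel tasks is contiguous range partitioning, sketched below. This is an assumption made for illustration: the formulas in `KeyGroupRangeSketch` are hypothetical and may not match what `KeyGroupAssignment` actually computes. With 128 key groups and two tasks, the first task's range comes out as 0..63, the same range passed to `new KeyGroup(0, 63)` in Basic Usage.

```java
// Hypothetical helpers; the library's KeyGroupAssignment may use different formulas.
final class KeyGroupRangeSketch {
    // Maps a key to a key-group index in [0, maxParallelism).
    static int groupForKey(Object key, int maxParallelism) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Returns {startIndex, endIndex}, both inclusive, for the task at position `index`.
    static int[] rangeForTask(int maxParallelism, int parallelism, int index) {
        int start = index * maxParallelism / parallelism;
        int end = (index + 1) * maxParallelism / parallelism - 1;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        for (int index = 0; index < 2; index++) {
            int[] range = rangeForTask(128, 2, index);
            System.out.println("task " + index + ": " + range[0] + ".." + range[1]);
        }
        // prints "task 0: 0..63" then "task 1: 64..127"

        int group = groupForKey("user123", 128);
        System.out.println("user123 falls in key group " + group);
    }
}
```

The sketch assumes `parallelism` is at least 1 and no larger than `maxParallelism`; otherwise some tasks would receive an empty range.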

## Common Types

```java { .api }
public class StorageRecord<T> {
    public StorageRecord(long checkpointId, T value);
    public T getValue();
    public long getCheckpointId();
    public void setCheckpointId(long checkpointId);
}

public class PartitionRecord<T> {
    public PartitionRecord(int partitionID, T value);
    public T getValue();
    public int getPartitionID();
    public void setPartitionID(int partitionID);
}

public class StateException extends RuntimeException {
    public StateException(Throwable t);
    public StateException(String msg);
}

public class KeyGroup {
    public KeyGroup(int startIndex, int endIndex);
    public int size();
    public int getStartIndex();
    public int getEndIndex();
}
```