# State Backend Configuration

`ChangelogStateBackend` provides the core configuration and initialization functionality for adding changelog capabilities to existing Flink state backends. It is the main entry point for configuring changelog-based incremental checkpointing.

## Capabilities

### ChangelogStateBackend Constructor

Creates a new changelog state backend that wraps an existing Flink state backend to add changelog functionality.

```java { .api }
/**
 * Creates a changelog state backend wrapping the specified delegated state backend.
 * The delegated state backend handles the actual state storage while changelog
 * functionality is added transparently.
 *
 * @param stateBackend The underlying state backend to wrap (cannot be null)
 * @throws IllegalArgumentException if stateBackend is null or is already a DelegatingStateBackend
 */
public ChangelogStateBackend(StateBackend stateBackend);
```

**Usage Example:**

```java
import org.apache.flink.state.changelog.ChangelogStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;

// Wrap a HashMap state backend
StateBackend hashMapBackend = new HashMapStateBackend();
ChangelogStateBackend changelogHashMap = new ChangelogStateBackend(hashMapBackend);

// Wrap a RocksDB state backend
StateBackend rocksDBBackend = new EmbeddedRocksDBStateBackend();
ChangelogStateBackend changelogRocksDB = new ChangelogStateBackend(rocksDBBackend);
```

### Get Delegated State Backend

Returns the underlying state backend that is being wrapped by the changelog state backend.

```java { .api }
/**
 * Returns the underlying state backend that this changelog state backend delegates to.
 *
 * @return The wrapped state backend instance
 */
public StateBackend getDelegatedStateBackend();
```

### Memory Usage Configuration

Indicates whether the changelog state backend uses managed memory, based on the underlying delegated state backend.

```java { .api }
/**
 * Indicates whether this state backend uses Flink's managed memory.
 * The result depends on the wrapped state backend's memory usage.
 *
 * @return true if the delegated state backend uses managed memory, false otherwise
 */
public boolean useManagedMemory();
```
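
The pass-through behaviour described above can be sketched without any Flink dependency. `MemoryAwareBackend` and `ManagedMemoryWrapper` are hypothetical stand-ins invented for this illustration, not Flink types. The sketch only shows that the wrapper's answer is whatever its delegate reports.

```java
/** Hypothetical stand-in for a backend that reports its memory model; not a Flink type. */
interface MemoryAwareBackend {
    boolean useManagedMemory();
}

/** Hypothetical wrapper that forwards the memory question to its delegate. */
final class ManagedMemoryWrapper implements MemoryAwareBackend {
    private final MemoryAwareBackend delegate;

    ManagedMemoryWrapper(MemoryAwareBackend delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean useManagedMemory() {
        // The wrapper has no memory model of its own; it repeats the delegate's answer.
        return delegate.useManagedMemory();
    }
}

final class ManagedMemoryDemo {
    public static void main(String[] args) {
        MemoryAwareBackend heapStyle = () -> false;    // delegate that reports no managed memory
        MemoryAwareBackend offHeapStyle = () -> true;  // delegate that reports managed memory

        System.out.println(new ManagedMemoryWrapper(heapStyle).useManagedMemory());    // prints "false"
        System.out.println(new ManagedMemoryWrapper(offHeapStyle).useManagedMemory()); // prints "true"
    }
}
```

In practice this means the memory sizing of a job follows the wrapped backend, so switching the delegate can change whether managed memory is reserved.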

### Configuration

Configures the changelog state backend with the provided configuration and class loader. If the delegated state backend is configurable, it will be configured as well.

```java { .api }
/**
 * Configures the changelog state backend with the specified configuration.
 * If the delegated state backend implements ConfigurableStateBackend,
 * it will be configured and wrapped in a new ChangelogStateBackend instance.
 *
 * @param config Configuration to apply
 * @param classLoader Class loader for loading configuration-specific classes
 * @return Configured state backend instance
 * @throws IllegalConfigurationException if configuration is invalid
 */
public StateBackend configure(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException;
```
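
The configure-then-rewrap behaviour can be illustrated with a small stand-alone sketch. All types here (`SimpleBackend`, `ConfigurableBackend`, `TunableBackend`, `RewrappingBackend`) and the `dir` setting are hypothetical and exist only for this example. Returning `this` when the delegate is not configurable is an assumption, since the text above only describes the configurable case.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical stand-in for a state backend; not a Flink type. */
interface SimpleBackend {}

/** Hypothetical stand-in for a backend that can absorb configuration. */
interface ConfigurableBackend extends SimpleBackend {
    SimpleBackend configure(Map<String, String> config);
}

/** A configurable delegate whose only setting is a (hypothetical) storage directory. */
final class TunableBackend implements ConfigurableBackend {
    final String dir;

    TunableBackend(String dir) {
        this.dir = dir;
    }

    @Override
    public SimpleBackend configure(Map<String, String> config) {
        // Return a new instance instead of mutating this one.
        return new TunableBackend(config.getOrDefault("dir", dir));
    }
}

/** Wrapper that configures its delegate and wraps the result in a new wrapper. */
final class RewrappingBackend implements ConfigurableBackend {
    final SimpleBackend delegate;

    RewrappingBackend(SimpleBackend delegate) {
        this.delegate = delegate;
    }

    @Override
    public SimpleBackend configure(Map<String, String> config) {
        if (delegate instanceof ConfigurableBackend) {
            SimpleBackend configured = ((ConfigurableBackend) delegate).configure(config);
            return new RewrappingBackend(configured);
        }
        // Assumption: a non-configurable delegate leaves the wrapper unchanged.
        return this;
    }
}

final class ConfigureDemo {
    public static void main(String[] args) {
        RewrappingBackend original = new RewrappingBackend(new TunableBackend("/tmp/a"));
        RewrappingBackend configured = (RewrappingBackend)
                original.configure(Collections.singletonMap("dir", "/tmp/b"));

        System.out.println(((TunableBackend) configured.delegate).dir); // prints "/tmp/b"
        System.out.println(((TunableBackend) original.delegate).dir);   // prints "/tmp/a"
    }
}
```

Because the configured result is a new wrapper, callers should use the returned instance rather than the one `configure` was called on.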

### Keyed State Backend Creation

Creates a keyed state backend for managing partitioned state with changelog functionality.

```java { .api }
/**
 * Creates a keyed state backend that wraps the delegated state backend's
 * keyed state backend with changelog functionality.
 *
 * @param env Execution environment
 * @param jobID Job identifier
 * @param operatorIdentifier Operator identifier
 * @param keySerializer Serializer for state keys
 * @param numberOfKeyGroups Total number of key groups
 * @param keyGroupRange Range of key groups assigned to this backend
 * @param kvStateRegistry Registry for queryable state
 * @param ttlTimeProvider Time provider for TTL functionality
 * @param metricGroup Metric group for measurements
 * @param stateHandles State handles for recovery
 * @param cancelStreamRegistry Registry for cancellable streams
 * @return ChangelogKeyedStateBackend instance with changelog capabilities
 * @throws Exception if backend creation fails
 */
public <K> ChangelogKeyedStateBackend<K> createKeyedStateBackend(
    Environment env,
    JobID jobID,
    String operatorIdentifier,
    TypeSerializer<K> keySerializer,
    int numberOfKeyGroups,
    KeyGroupRange keyGroupRange,
    TaskKvStateRegistry kvStateRegistry,
    TtlTimeProvider ttlTimeProvider,
    MetricGroup metricGroup,
    @Nonnull Collection<KeyedStateHandle> stateHandles,
    CloseableRegistry cancelStreamRegistry
) throws Exception;
```

### Keyed State Backend Creation with Memory Fraction

Creates a keyed state backend with explicit managed memory fraction specification.

```java { .api }
/**
 * Creates a keyed state backend with explicit managed memory fraction.
 * This version allows fine-grained control over memory allocation.
 *
 * @param env Execution environment
 * @param jobID Job identifier
 * @param operatorIdentifier Operator identifier
 * @param keySerializer Serializer for state keys
 * @param numberOfKeyGroups Total number of key groups
 * @param keyGroupRange Range of key groups assigned to this backend
 * @param kvStateRegistry Registry for queryable state
 * @param ttlTimeProvider Time provider for TTL functionality
 * @param metricGroup Metric group for measurements
 * @param stateHandles State handles for recovery
 * @param cancelStreamRegistry Registry for cancellable streams
 * @param managedMemoryFraction Fraction of managed memory to use (0.0 to 1.0)
 * @return CheckpointableKeyedStateBackend instance with changelog capabilities
 * @throws Exception if backend creation fails
 */
public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
    Environment env,
    JobID jobID,
    String operatorIdentifier,
    TypeSerializer<K> keySerializer,
    int numberOfKeyGroups,
    KeyGroupRange keyGroupRange,
    TaskKvStateRegistry kvStateRegistry,
    TtlTimeProvider ttlTimeProvider,
    MetricGroup metricGroup,
    @Nonnull Collection<KeyedStateHandle> stateHandles,
    CloseableRegistry cancelStreamRegistry,
    double managedMemoryFraction
) throws Exception;
```

### Operator State Backend Creation

Creates an operator state backend by delegating to the underlying state backend. Operator state is not affected by changelog functionality.

```java { .api }
/**
 * Creates an operator state backend by delegating to the wrapped state backend.
 * Operator state does not use changelog functionality.
 *
 * @param env Execution environment
 * @param operatorIdentifier Operator identifier
 * @param stateHandles State handles for recovery
 * @param cancelStreamRegistry Registry for cancellable streams
 * @return OperatorStateBackend instance from the delegated backend
 * @throws Exception if backend creation fails
 */
public OperatorStateBackend createOperatorStateBackend(
    Environment env,
    String operatorIdentifier,
    @Nonnull Collection<OperatorStateHandle> stateHandles,
    CloseableRegistry cancelStreamRegistry
) throws Exception;
```
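
The difference between the keyed and operator creation paths can be sketched as follows: keyed state gets wrapped, while operator state is handed through untouched. Every type in this block is a hypothetical stand-in, and the in-memory change list is purely illustrative. It does not reflect how Flink records or persists its changelog.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical keyed store; stands in for a keyed state backend. */
interface KeyedStore {
    void put(String key, String value);
    String get(String key);
}

/** Hypothetical operator store; stands in for an operator state backend. */
interface OperatorStore {}

/** Hypothetical factory; stands in for a state backend. */
interface StoreFactory {
    KeyedStore createKeyedStore();
    OperatorStore createOperatorStore();
}

final class InMemoryKeyedStore implements KeyedStore {
    private final Map<String, String> data = new HashMap<>();

    @Override public void put(String key, String value) { data.put(key, value); }
    @Override public String get(String key) { return data.get(key); }
}

/** Records every write before forwarding it to the wrapped keyed store. */
final class LoggingKeyedStore implements KeyedStore {
    private final KeyedStore inner;
    private final List<String> changes = new ArrayList<>();

    LoggingKeyedStore(KeyedStore inner) { this.inner = inner; }

    @Override public void put(String key, String value) {
        changes.add(key + "=" + value);
        inner.put(key, value);
    }
    @Override public String get(String key) { return inner.get(key); }

    List<String> changes() { return changes; }
}

/** Plain delegate that always hands out the same operator store instance. */
final class PlainFactory implements StoreFactory {
    final OperatorStore operatorStore = new OperatorStore() {};

    @Override public KeyedStore createKeyedStore() { return new InMemoryKeyedStore(); }
    @Override public OperatorStore createOperatorStore() { return operatorStore; }
}

/** Wraps keyed stores with logging; passes operator stores through unchanged. */
final class LoggingFactory implements StoreFactory {
    private final StoreFactory delegate;

    LoggingFactory(StoreFactory delegate) { this.delegate = delegate; }

    @Override public KeyedStore createKeyedStore() {
        return new LoggingKeyedStore(delegate.createKeyedStore());
    }
    @Override public OperatorStore createOperatorStore() {
        return delegate.createOperatorStore();
    }
}

final class CreationPathsDemo {
    public static void main(String[] args) {
        PlainFactory plain = new PlainFactory();
        LoggingFactory factory = new LoggingFactory(plain);

        LoggingKeyedStore keyed = (LoggingKeyedStore) factory.createKeyedStore();
        keyed.put("a", "1");
        System.out.println(keyed.changes());                                     // prints "[a=1]"
        System.out.println(factory.createOperatorStore() == plain.operatorStore); // prints "true"
    }
}
```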

## Configuration Integration

The changelog state backend integrates with Flink's configuration system through the standard state backend loading mechanisms:

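```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.changelog.ChangelogStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Via configuration
Configuration config = new Configuration();
config.setString(StateBackendOptions.STATE_BACKEND, "changelog");
// Additional changelog-specific configuration can be added

// Via programmatic configuration
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StateBackend delegateBackend = new HashMapStateBackend();
ChangelogStateBackend changelogBackend = new ChangelogStateBackend(delegateBackend);
env.setStateBackend(changelogBackend);
```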

## Error Handling

The changelog state backend validates that:

- The delegated state backend is not null
- The delegated state backend is not itself a `DelegatingStateBackend` (recursive delegation is not allowed)

All delegated operations are forwarded to the wrapped backend, and any exceptions they throw propagate to the caller.
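
The null check and the recursive-delegation check can be illustrated with a minimal stand-alone sketch. `Backend`, `DelegatingBackend`, and `WrappingBackend` are hypothetical stand-ins rather than Flink classes. The exception type follows the constructor Javadoc in this document and has not been verified against the Flink source.

```java
/** Hypothetical stand-in for a state backend; not a Flink type. */
interface Backend {}

/** Hypothetical marker for backends that wrap another backend. */
interface DelegatingBackend extends Backend {
    Backend getDelegate();
}

/** Hypothetical wrapper that applies the two constructor checks. */
final class WrappingBackend implements DelegatingBackend {
    private final Backend delegate;

    WrappingBackend(Backend delegate) {
        // Check 1: the delegate must not be null.
        if (delegate == null) {
            throw new IllegalArgumentException("delegate must not be null");
        }
        // Check 2: wrapping another delegating backend is rejected.
        if (delegate instanceof DelegatingBackend) {
            throw new IllegalArgumentException("recursive delegation is not supported");
        }
        this.delegate = delegate;
    }

    @Override
    public Backend getDelegate() {
        return delegate;
    }
}

final class WrapValidationDemo {
    /** Returns true if constructing a wrapper around the candidate is rejected. */
    static boolean isRejected(Backend candidate) {
        try {
            new WrappingBackend(candidate);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Backend plain = new Backend() {};
        System.out.println(isRejected(plain));                      // prints "false"
        System.out.println(isRejected(null));                       // prints "true"
        System.out.println(isRejected(new WrappingBackend(plain))); // prints "true"
    }
}
```

Failing in the constructor keeps an invalid wrapper from ever existing, so later calls never need to re-check the delegate.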