
# Flink Changelog State Backend

An Apache Flink state backend implementation that adds changelog capabilities to durable state management for stream processing applications. It enables incremental checkpointing by logging state changes to a changelog, which supports efficient recovery and state consistency in distributed streaming applications.

## Package Information

- **Package Name**: flink-statebackend-changelog
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-statebackend-changelog_2.11
- **Installation**: Add to Maven dependencies:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-statebackend-changelog_2.11</artifactId>
  <version>1.13.6</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.state.changelog.ChangelogStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.delegate.DelegatingStateBackend;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
```

## Basic Usage

```java
import org.apache.flink.state.changelog.ChangelogStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Create a changelog state backend wrapping a HashMap state backend
StateBackend delegateBackend = new HashMapStateBackend();
ChangelogStateBackend changelogBackend = new ChangelogStateBackend(delegateBackend);

// Configure Flink to use the changelog state backend
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(changelogBackend);

// State operations will now be logged to changelog for incremental checkpointing
// Use standard Flink state operations - they will be transparently logged
```

## Architecture

The changelog state backend is built around a delegation pattern with several key components:

- **ChangelogStateBackend**: Main entry point that wraps any existing Flink state backend
- **ChangelogKeyedStateBackend**: Internal keyed state backend that logs state changes to changelog
- **State Wrappers**: Transparent wrappers for all Flink state types (Value, List, Map, Reducing, Aggregating)
- **Delegation Pattern**: All operations delegate to underlying state backend while logging changes
- **Incremental Checkpointing**: State changes are logged to enable efficient incremental checkpoints

This design allows existing Flink applications to benefit from changelog-based incremental checkpointing without code changes, simply by configuring the changelog state backend.
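
The delegation pattern described above can be illustrated with a small, dependency-free sketch. It is not Flink code: `SimpleValueState`, `InMemoryValueState`, and `LoggingValueState` are hypothetical names invented for this example, and the string-based changelog is an assumption chosen for readability rather than the format the backend actually writes. The sketch only shows the general idea of a wrapper that records each mutation and then forwards it to the wrapped state.

```java
import java.util.ArrayList;
import java.util.List;

public class ChangelogDelegationSketch {

    /** Hypothetical minimal state interface; this is not Flink's ValueState. */
    interface SimpleValueState<T> {
        T value();
        void update(T newValue);
        void clear();
    }

    /** Plain in-memory state standing in for state owned by the delegated backend. */
    static final class InMemoryValueState<T> implements SimpleValueState<T> {
        private T current;

        @Override public T value() { return current; }
        @Override public void update(T newValue) { current = newValue; }
        @Override public void clear() { current = null; }
    }

    /** Wrapper that records each mutation, then forwards the call to the delegate. */
    static final class LoggingValueState<T> implements SimpleValueState<T> {
        private final SimpleValueState<T> delegate;
        private final List<String> changelog;

        LoggingValueState(SimpleValueState<T> delegate, List<String> changelog) {
            this.delegate = delegate;
            this.changelog = changelog;
        }

        // Reads do not change state, so they are forwarded without logging.
        @Override public T value() { return delegate.value(); }

        @Override public void update(T newValue) {
            changelog.add("UPDATE " + newValue);
            delegate.update(newValue);
        }

        @Override public void clear() {
            changelog.add("CLEAR");
            delegate.clear();
        }
    }

    /** Runs a few operations through the wrapper and returns the recorded changelog. */
    static List<String> demo() {
        List<String> changelog = new ArrayList<>();
        SimpleValueState<Integer> state =
                new LoggingValueState<>(new InMemoryValueState<>(), changelog);
        state.update(1);
        state.update(2);
        state.value();
        state.clear();
        return changelog;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [UPDATE 1, UPDATE 2, CLEAR]
    }
}
```

Because the wrapper implements the same interface as the state it wraps, calling code does not need to know whether logging is active, which is the property the architecture relies on.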

## Capabilities

### State Backend Configuration

Core state backend configuration and initialization functionality for integrating changelog capabilities with existing Flink state backends.

```java { .api }
public class ChangelogStateBackend implements DelegatingStateBackend, ConfigurableStateBackend {
    public ChangelogStateBackend(StateBackend stateBackend);
    public StateBackend getDelegatedStateBackend();
    public StateBackend configure(ReadableConfig config, ClassLoader classLoader);
}
```

[State Backend Configuration](./state-backend-configuration.md)

### Keyed State Management

Keyed state backend implementation that provides transparent changelog logging for all state operations while maintaining full compatibility with Flink's state management system.

```java { .api }
public <K> ChangelogKeyedStateBackend<K> createKeyedStateBackend(
    Environment env,
    JobID jobID,
    String operatorIdentifier,
    TypeSerializer<K> keySerializer,
    int numberOfKeyGroups,
    KeyGroupRange keyGroupRange,
    TaskKvStateRegistry kvStateRegistry,
    TtlTimeProvider ttlTimeProvider,
    MetricGroup metricGroup,
    @Nonnull Collection<KeyedStateHandle> stateHandles,
    CloseableRegistry cancelStreamRegistry
) throws Exception;
```

[Keyed State Management](./keyed-state-management.md)
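
To make the idea of keyed state with changelog logging more concrete, here is a simplified, dependency-free sketch. It does not use Flink classes: `KeyedStore`, `Change`, and `replay` are hypothetical names, and replaying onto an empty map is an assumption made to keep the example short. The sketch shows writes scoped to a current key, each write appended to a log, and the state rebuilt by applying the log in order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedChangelogSketch {

    /** One logged mutation: the key it applied to and the new value (null means cleared). */
    static final class Change {
        final String key;
        final Long value;

        Change(String key, Long value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Hypothetical keyed store; every write is scoped to the current key and logged. */
    static final class KeyedStore {
        private final Map<String, Long> values = new HashMap<>();
        private final List<Change> changelog = new ArrayList<>();
        private String currentKey;

        void setCurrentKey(String newKey) { currentKey = newKey; }
        String getCurrentKey() { return currentKey; }

        Long value() { return values.get(currentKey); }

        void update(Long newValue) {
            changelog.add(new Change(currentKey, newValue));
            values.put(currentKey, newValue);
        }

        void clear() {
            changelog.add(new Change(currentKey, null));
            values.remove(currentKey);
        }

        List<Change> changelog() { return changelog; }
        Map<String, Long> snapshot() { return new HashMap<>(values); }
    }

    /** Rebuilds state by applying logged changes, in order, to an empty map. */
    static Map<String, Long> replay(List<Change> changes) {
        Map<String, Long> restored = new HashMap<>();
        for (Change change : changes) {
            if (change.value == null) {
                restored.remove(change.key);
            } else {
                restored.put(change.key, change.value);
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        KeyedStore store = new KeyedStore();
        store.setCurrentKey("a");
        store.update(1L);
        store.setCurrentKey("b");
        store.update(5L);
        store.setCurrentKey("a");
        store.update(2L);
        store.setCurrentKey("b");
        store.clear();

        // The replayed state matches the live state: only key "a" remains, with value 2.
        System.out.println(replay(store.changelog()).equals(store.snapshot()));
    }
}
```

The sketch leaves out key groups, namespaces, serializers, and checkpoint coordination, all of which appear in the `createKeyedStateBackend` signature above.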

### State Types and Operations

Comprehensive support for all Flink state types including Value, List, Map, Reducing, and Aggregating states, with transparent changelog logging for all state mutations.

```java { .api }
// State creation and access patterns
public <N, S extends State> S getPartitionedState(
    N namespace,
    TypeSerializer<N> namespaceSerializer,
    StateDescriptor<S, ?> stateDescriptor
) throws Exception;

public <N, S extends State, T> S getOrCreateKeyedState(
    TypeSerializer<N> namespaceSerializer,
    StateDescriptor<S, T> stateDescriptor
) throws Exception;
```

[State Types and Operations](./state-types-operations.md)

## Types

### Core State Backend Types

```java { .api }
public class ChangelogStateBackend implements DelegatingStateBackend, ConfigurableStateBackend {
    // Constructor and configuration
    public ChangelogStateBackend(StateBackend stateBackend);
    public StateBackend getDelegatedStateBackend();
    public boolean useManagedMemory();
    public StateBackend configure(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException;

    // State backend creation methods
    public <K> ChangelogKeyedStateBackend<K> createKeyedStateBackend(
        Environment env, JobID jobID, String operatorIdentifier,
        TypeSerializer<K> keySerializer, int numberOfKeyGroups,
        KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry,
        TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup,
        Collection<KeyedStateHandle> stateHandles,
        CloseableRegistry cancelStreamRegistry
    ) throws Exception;

    public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
        Environment env, JobID jobID, String operatorIdentifier,
        TypeSerializer<K> keySerializer, int numberOfKeyGroups,
        KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry,
        TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup,
        Collection<KeyedStateHandle> stateHandles,
        CloseableRegistry cancelStreamRegistry, double managedMemoryFraction
    ) throws Exception;

    public OperatorStateBackend createOperatorStateBackend(
        Environment env, String operatorIdentifier,
        Collection<OperatorStateHandle> stateHandles,
        CloseableRegistry cancelStreamRegistry
    ) throws Exception;
}

interface DelegatingStateBackend extends StateBackend {
    StateBackend getDelegatedStateBackend();
}

interface ConfigurableStateBackend extends StateBackend {
    StateBackend configure(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException;
}
```

### Keyed State Backend Types

```java { .api }
class ChangelogKeyedStateBackend<K> implements CheckpointableKeyedStateBackend<K>, CheckpointListener, TestableKeyedStateBackend {
    // Core state backend operations
    public KeyGroupRange getKeyGroupRange();
    public void setCurrentKey(K newKey);
    public K getCurrentKey();
    public TypeSerializer<K> getKeySerializer();

    // State access and management
    public <N, S extends State> S getPartitionedState(
        N namespace, TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, ?> stateDescriptor
    ) throws Exception;

    public <N, S extends State, T> S getOrCreateKeyedState(
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, T> stateDescriptor
    ) throws Exception;

    // Lifecycle management
    public void close() throws IOException;
    public void dispose();

    // Checkpointing
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
        long checkpointId, long timestamp,
        CheckpointStreamFactory streamFactory,
        CheckpointOptions checkpointOptions
    ) throws Exception;

    // Priority queue support
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
    KeyGroupedInternalPriorityQueue<T> create(
        String stateName,
        TypeSerializer<T> byteOrderedElementSerializer
    );
}
```