# Changelog Writers

Changelog writers append state changes and manage their persistence, with preemptive flushing and checkpoint coordination. Each writer accumulates, batches, and persists the state changes of a single operator.

## Capabilities

### StateChangelogWriter Interface

Core interface for writing state changes with checkpoint coordination and lifecycle management.

```java { .api }
/**
 * Interface for writing state changes to changelog storage.
 * Provides methods for appending changes, managing persistence, and coordinating with checkpoints.
 */
public interface StateChangelogWriter<T> extends AutoCloseable {

    /**
     * Appends a state change for a specific key group
     * @param keyGroup Key group identifier (0-based)
     * @param value Serialized state change data
     * @throws IOException if append operation fails
     */
    void append(int keyGroup, byte[] value) throws IOException;

    /**
     * Appends metadata changes (not associated with any key group)
     * @param value Serialized metadata change data
     * @throws IOException if append operation fails
     */
    void appendMeta(byte[] value) throws IOException;

    /**
     * Returns the initial sequence number for this writer
     * @return Initial SequenceNumber (typically 0)
     */
    SequenceNumber initialSequenceNumber();

    /**
     * Advances to the next sequence number, creating a rollover point
     * @return Next SequenceNumber for distinguishing change batches
     */
    SequenceNumber nextSequenceNumber();

    /**
     * Persists accumulated changes starting from the specified sequence number
     * @param from Starting sequence number (inclusive)
     * @param checkpointId Checkpoint identifier for this persistence operation
     * @return CompletableFuture with snapshot result containing changelog handles
     * @throws IOException if persistence fails
     */
    CompletableFuture<SnapshotResult<T>> persist(SequenceNumber from, long checkpointId)
        throws IOException;

    /**
     * Truncates changes up to the specified sequence number
     * @param to Sequence number to truncate up to (exclusive)
     */
    void truncate(SequenceNumber to);

    /**
     * Truncates changes from the specified sequence number and closes writer
     * @param from Sequence number to truncate from (inclusive)
     */
    void truncateAndClose(SequenceNumber from);

    /**
     * Confirms successful checkpoint completion for a range of sequence numbers
     * @param from Starting sequence number (inclusive)
     * @param to Ending sequence number (exclusive)
     * @param checkpointId Checkpoint identifier
     */
    void confirm(SequenceNumber from, SequenceNumber to, long checkpointId);

    /**
     * Resets writer state after checkpoint abort
     * @param from Starting sequence number (inclusive)
     * @param to Ending sequence number (exclusive)
     * @param checkpointId Checkpoint identifier
     */
    void reset(SequenceNumber from, SequenceNumber to, long checkpointId);

    /**
     * Closes the writer and releases resources
     */
    void close();
}
```

### FsStateChangelogWriter Implementation

Filesystem-specific implementation with preemptive persistence and batch management.

```java { .api }
/**
 * Filesystem-based implementation of StateChangelogWriter.
 * Not thread-safe - designed for single-threaded use per operator.
 */
class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandleStreamImpl> {

    /**
     * Constructor for filesystem changelog writer
     * @param logId Unique identifier for this writer's log
     * @param keyGroupRange Key group range this writer handles
     * @param uploader Upload scheduler for persistence operations
     * @param preEmptivePersistThresholdInBytes Size threshold for preemptive persistence
     * @param mailboxExecutor Executor for callback processing
     * @param changelogRegistry Registry for managing changelog state lifecycle
     * @param localRecoveryConfig Configuration for local recovery
     * @param localChangelogRegistry Registry for local changelog files
     */
    FsStateChangelogWriter(
        UUID logId,
        KeyGroupRange keyGroupRange,
        StateChangeUploadScheduler uploader,
        long preEmptivePersistThresholdInBytes,
        MailboxExecutor mailboxExecutor,
        TaskChangelogRegistry changelogRegistry,
        LocalRecoveryConfig localRecoveryConfig,
        LocalChangelogRegistry localChangelogRegistry
    );
}
```

**Usage Examples:**

```java
import java.util.concurrent.CompletableFuture;

import org.apache.flink.changelog.fs.FsStateChangelogWriter;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.SequenceNumber;

// Assumes storage, mailboxExecutor, checkpointId, and the byte[] payloads are in scope.
// Writer is typically created by FsStateChangelogStorage
FsStateChangelogWriter writer = storage.createWriter(
    "my-operator",
    KeyGroupRange.of(0, 127),
    mailboxExecutor
);

// Append state changes during processing
writer.append(5, stateChangeForKeyGroup5);
writer.append(10, stateChangeForKeyGroup10);
writer.appendMeta(operatorMetadata);

// Get sequence number for checkpoint coordination
SequenceNumber checkpointSqn = writer.nextSequenceNumber();

// Persist changes during checkpoint
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future =
    writer.persist(writer.initialSequenceNumber(), checkpointId);

future.thenAccept(result -> {
    // Checkpoint snapshot completed
    ChangelogStateHandleStreamImpl handle = result.getJobManagerOwnedSnapshot();
    log.info("Persisted changelog with {} handles", handle.getStreamStateHandles().size());
});

// Cleanup after checkpoint completion: confirm the persisted range [from, to)
writer.confirm(writer.initialSequenceNumber(), checkpointSqn, checkpointId);
```

### Preemptive Persistence

Writers automatically trigger persistence when accumulated changes exceed the configured threshold:

```java
// Configure preemptive persistence threshold
Configuration config = new Configuration();
config.set(FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD, MemorySize.parse("5MB"));

// Writer automatically flushes when threshold is exceeded
writer.append(keyGroup, largeStateChange); // May trigger preemptive flush
```

The preemptive persistence:

- Reduces checkpoint duration by avoiding large uploads during checkpoint
- Provides quasi-continuous uploading of state changes
- Maintains ordering guarantees across preemptive and checkpoint-triggered persistence
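
The threshold behaviour described above can be illustrated with a small standalone model. This is only a sketch: `ThresholdBuffer` and all of its members are hypothetical names that do not exist in Flink, and the strict `>` comparison is an assumption chosen to match the word "exceed" in the text. It shows that an early flush leaves only the remainder for the checkpoint-time flush.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, not Flink code: buffers changes and flushes
// early once the buffered size exceeds a byte threshold.
class ThresholdBuffer {
    private final long thresholdBytes;
    private final List<byte[]> pending = new ArrayList<>();
    private final List<Integer> flushedBatchSizes = new ArrayList<>();
    private long pendingBytes = 0;

    ThresholdBuffer(long thresholdBytes) {
        this.thresholdBytes = thresholdBytes;
    }

    // Buffers one change and flushes preemptively if the threshold is exceeded.
    void append(byte[] change) {
        pending.add(change);
        pendingBytes += change.length;
        if (pendingBytes > thresholdBytes) { // assumption: strict comparison
            flush();
        }
    }

    // Flushes whatever is buffered; records how many changes were in the batch.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushedBatchSizes.add(pending.size());
        pending.clear();
        pendingBytes = 0;
    }

    List<Integer> flushedBatchSizes() {
        return flushedBatchSizes;
    }

    long pendingBytes() {
        return pendingBytes;
    }

    public static void main(String[] args) {
        ThresholdBuffer buffer = new ThresholdBuffer(10);
        buffer.append(new byte[6]); // 6 bytes buffered, below the threshold
        buffer.append(new byte[6]); // 12 bytes > 10: preemptive flush of 2 changes
        buffer.append(new byte[3]); // 3 bytes buffered
        buffer.flush();             // checkpoint-time flush of the remaining change
        System.out.println(buffer.flushedBatchSizes()); // prints [2, 1]
    }
}
```

Batches are recorded in flush order, which mirrors the ordering guarantee listed above: a preemptive batch always precedes the checkpoint-triggered batch that follows it.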

### Sequence Number Management

Writers use sequence numbers to track and coordinate state change batches:

```java
// Initial state
SequenceNumber initial = writer.initialSequenceNumber(); // Typically SequenceNumber.of(0)

// Advance sequence number to create rollover points
writer.append(1, change1);
writer.append(2, change2);
SequenceNumber rollover1 = writer.nextSequenceNumber();

writer.append(3, change3);
writer.append(4, change4);
SequenceNumber rollover2 = writer.nextSequenceNumber();

// Persist changes from a specific sequence number (inclusive):
// starting at rollover1 covers change3 and change4, but not change1 or change2
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> result =
    writer.persist(rollover1, checkpointId);
```
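
To make the inclusive and exclusive bounds concrete, here is a minimal in-memory model of rollover points. It is a sketch under stated assumptions, not the Flink implementation: `RolloverLog` and its methods are hypothetical, sequence numbers are plain `long` values starting at 0, and every call to `nextSequenceNumber()` advances the counter unconditionally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical toy model, not Flink code: groups changes by sequence number.
class RolloverLog {
    private final TreeMap<Long, List<String>> batches = new TreeMap<>();
    private long active = 0; // assumption: the initial sequence number is 0

    // Records a change under the currently active sequence number.
    void append(String change) {
        batches.computeIfAbsent(active, k -> new ArrayList<>()).add(change);
    }

    // Creates a rollover point: later appends go under the returned number.
    long nextSequenceNumber() {
        return ++active;
    }

    // Changes with sequence number >= from (inclusive), in append order.
    List<String> changesFrom(long from) {
        List<String> out = new ArrayList<>();
        batches.tailMap(from, true).values().forEach(out::addAll);
        return out;
    }

    // Drops changes with sequence number < to (exclusive upper bound).
    void truncate(long to) {
        batches.headMap(to, false).clear();
    }

    public static void main(String[] args) {
        RolloverLog log = new RolloverLog();
        log.append("change1");
        log.append("change2");
        long rollover1 = log.nextSequenceNumber();
        log.append("change3");
        log.append("change4");
        log.nextSequenceNumber();

        System.out.println(log.changesFrom(rollover1)); // prints [change3, change4]
        log.truncate(rollover1);
        System.out.println(log.changesFrom(0));         // prints [change3, change4]
    }
}
```

Because the value returned at a rollover labels the changes appended after it, the same number works both as an inclusive `from` for persisting and as an exclusive `to` for truncating.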

### Error Handling and Recovery

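Synchronous failures in `append` and `persist` surface as `IOException`, while asynchronous persistence failures complete the returned future exceptionally and feed into Flink's fault tolerance: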

```java
try {
    writer.append(keyGroup, stateChange);

    CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persistFuture =
        writer.persist(sqn, checkpointId);

    persistFuture.whenComplete((result, throwable) -> {
        if (throwable != null) {
            // Handle persistence failure
            log.error("Changelog persistence failed for checkpoint {}", checkpointId, throwable);
            // Flink will trigger checkpoint abort and recovery
        } else {
            // Persistence successful
            log.debug("Changelog persisted successfully for checkpoint {}", checkpointId);
        }
    });

} catch (IOException e) {
    // Handle synchronous failures from append() or persist()
    log.error("Failed to append or persist state changes", e);
    throw new RuntimeException("State change append or persist failed", e);
}
```
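
One detail of the pattern above is worth seeing in isolation: `whenComplete` only observes the outcome, so a failure still propagates to whoever waits on the future. The following standalone sketch uses only `java.util.concurrent`. `PersistFailureDemo`, `upload`, and `describe` are hypothetical names standing in for an asynchronous persist call, not Flink APIs.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo class, not Flink code.
class PersistFailureDemo {

    // Stand-in for an asynchronous upload that either succeeds or fails.
    static CompletableFuture<String> upload(boolean fail) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (fail) {
            future.completeExceptionally(new IOException("upload failed"));
        } else {
            future.complete("handle-1");
        }
        return future;
    }

    // Logs the outcome without consuming the failure, then waits for it.
    static String describe(CompletableFuture<String> future) {
        CompletableFuture<String> observed = future.whenComplete((result, error) -> {
            if (error != null) {
                System.err.println("persist failed: " + error.getMessage());
            }
        });
        try {
            return "ok: " + observed.join();
        } catch (CompletionException e) {
            // join() wraps the original exception in a CompletionException
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(upload(false))); // prints "ok: handle-1"
        System.out.println(describe(upload(true)));  // prints "failed: upload failed"
    }
}
```

If the failure should be converted into a fallback value instead of propagating, `exceptionally` or `handle` would be the combinators to reach for rather than `whenComplete`.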

### Lifecycle Management

Writers coordinate with Flink's checkpoint lifecycle:

```java
// During checkpoint
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> snapshotFuture =
    writer.persist(fromSqn, checkpointId);

// On checkpoint completion
writer.confirm(fromSqn, toSqn, checkpointId);

// On checkpoint abort/failure
writer.reset(fromSqn, toSqn, checkpointId);

// On state truncation (after successful checkpoint)
writer.truncate(truncateUpToSqn);

// On operator shutdown
writer.close();
```

### Local Recovery Integration

When local recovery is enabled, writers coordinate with local changelog registries:

```java
// Local recovery handles are automatically managed
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> result =
    writer.persist(sqn, checkpointId);

result.thenAccept(snapshotResult -> {
    // Remote handle, plus a local handle when local recovery is enabled
    ChangelogStateHandleStreamImpl remoteHandle = snapshotResult.getJobManagerOwnedSnapshot();
    // May be null if no task-local copy was written
    ChangelogStateHandleStreamImpl localHandle = snapshotResult.getTaskLocalSnapshot();

    // Local registry tracks local handles for recovery
});
```
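
The reason for keeping two handles can be sketched with a toy type that prefers the local copy and falls back to the remote one. This is an illustration only: `SnapshotPair` and `handleForRecovery` are hypothetical names, handles are modelled as plain strings, and the assumption that the remote handle is always present is a simplification made for this example.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical toy type, not a Flink class.
class SnapshotPair {
    private final String remoteHandle; // assumption: always present in this model
    private final String localHandle;  // null when no local copy exists

    SnapshotPair(String remoteHandle, String localHandle) {
        this.remoteHandle = Objects.requireNonNull(remoteHandle);
        this.localHandle = localHandle;
    }

    // Prefers the local copy for recovery and falls back to the remote one.
    String handleForRecovery() {
        return Optional.ofNullable(localHandle).orElse(remoteHandle);
    }

    public static void main(String[] args) {
        System.out.println(new SnapshotPair("remote", "local").handleForRecovery()); // prints local
        System.out.println(new SnapshotPair("remote", null).handleForRecovery());    // prints remote
    }
}
```

Treating the local handle as optional keeps recovery code correct whether or not local recovery is enabled.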