# Recovery and State Management

The recovery system provides read-only access to persisted changelog data and manages the lifecycle of changelog state handles, which together enable checkpoint restoration.

## Capabilities

### FsStateChangelogStorageForRecovery

Read-only storage implementation for recovery operations during checkpoint restoration.

```java { .api }
/**
 * Filesystem-based implementation of StateChangelogStorageView for recovery operations.
 * Provides read-only access to persisted changelog data during checkpoint restoration.
 */
public class FsStateChangelogStorageForRecovery
    implements StateChangelogStorageView<ChangelogStateHandleStreamImpl> {

    /**
     * Creates recovery storage with changelog stream handle reader
     * @param changelogStreamHandleReader Reader for accessing persisted changelog streams
     */
    public FsStateChangelogStorageForRecovery(
        ChangelogStreamHandleReader changelogStreamHandleReader
    );

    /**
     * Creates a reader for accessing changelog handles during recovery
     * @return StateChangelogHandleReader for reading persisted changelog data
     */
    public StateChangelogHandleReader<ChangelogStateHandleStreamImpl> createReader();

    /**
     * Closes the recovery storage and releases resources
     * @throws Exception if cleanup fails
     */
    public void close() throws Exception;
}
```

### Changelog Stream Reading

Components for reading persisted changelog data from distributed file systems.

```java { .api }
/**
 * Reader for changelog stream handles with direct file system access
 */
public class ChangelogStreamHandleReader implements AutoCloseable {

    /** Direct reader without caching */
    public static final ChangelogStreamHandleReader DIRECT_READER;

    /**
     * Reads state changes from a changelog handle
     * @param handle Changelog handle containing stream references
     * @return CloseableIterator for iterating over state changes
     * @throws IOException if reading fails
     */
    public CloseableIterator<StateChange> read(ChangelogStateHandleStreamImpl handle)
        throws IOException;

    /**
     * Closes the reader and releases resources
     * @throws IOException if cleanup fails
     */
    public void close() throws IOException;
}

/**
 * Reader with local caching support for improved performance
 */
public class ChangelogStreamHandleReaderWithCache extends ChangelogStreamHandleReader {

    /**
     * Creates cached reader with configuration
     * @param configuration Flink configuration containing cache settings
     */
    public ChangelogStreamHandleReaderWithCache(Configuration configuration);
}
```

### State Change Iteration

Iterator implementation for traversing state changes during recovery.

```java { .api }
/**
 * Iterator implementation for reading state changes from changelog streams
 */
public class StateChangeIteratorImpl implements CloseableIterator<StateChange> {

    /**
     * Creates iterator with changelog stream reader
     * @param changelogStreamHandleReader Reader for accessing changelog streams
     */
    public StateChangeIteratorImpl(ChangelogStreamHandleReader changelogStreamHandleReader);

    /**
     * Checks if more state changes are available
     * @return true if more changes exist
     */
    public boolean hasNext();

    /**
     * Returns the next state change
     * @return Next StateChange in the iteration
     * @throws NoSuchElementException if no more changes exist
     */
    public StateChange next();

    /**
     * Closes the iterator and releases resources
     * @throws IOException if cleanup fails
     */
    public void close() throws IOException;
}
```

**Usage Examples:**

```java
import org.apache.flink.changelog.fs.*;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
import org.apache.flink.util.CloseableIterator;

// `config`, getCheckpointHandle(), applyStateChange() and applyMetadataChange()
// are placeholders supplied by the surrounding application code.

// Create recovery storage (typically done by factory)
ChangelogStreamHandleReader reader = new ChangelogStreamHandleReaderWithCache(config);
FsStateChangelogStorageForRecovery recoveryStorage =
    new FsStateChangelogStorageForRecovery(reader);

// Create handle reader for recovery
StateChangelogHandleReader<ChangelogStateHandleStreamImpl> handleReader =
    recoveryStorage.createReader();

// Read state changes from checkpoint handle
ChangelogStateHandleStreamImpl checkpointHandle = getCheckpointHandle();
try (CloseableIterator<StateChange> iterator = reader.read(checkpointHandle)) {
    while (iterator.hasNext()) {
        StateChange change = iterator.next();

        // Process state change during recovery
        if (change.getKeyGroup() != StateChange.META_KEY_GROUP) {
            // Regular state change for specific key group
            int keyGroup = change.getKeyGroup();
            byte[] changeData = change.getChange();
            applyStateChange(keyGroup, changeData);
        } else {
            // Metadata change
            byte[] metadata = change.getChange();
            applyMetadataChange(metadata);
        }
    }
}

// Cleanup
recoveryStorage.close();
```

### TaskChangelogRegistry

Registry for managing the lifecycle of changelog state handles and coordinating between job manager and task manager ownership.

```java { .api }
/**
 * Registry for tracking changelog state objects and managing their lifecycle.
 * Coordinates between task manager and job manager ownership of state handles.
 */
public interface TaskChangelogRegistry {

    /**
     * Starts tracking a state handle with reference counting
     * @param handle StreamStateHandle to track
     * @param refCount Initial reference count (number of changelog segments)
     */
    void startTracking(StreamStateHandle handle, long refCount);

    /**
     * Stops tracking a state handle (JM becomes owner)
     * @param handle StreamStateHandle to stop tracking
     */
    void stopTracking(StreamStateHandle handle);

    /**
     * Releases a reference to a state handle (decrements ref count)
     * @param handle StreamStateHandle to release
     */
    void release(StreamStateHandle handle);

    /**
     * Creates default registry with specified number of discard threads
     * @param numDiscardThreads Number of threads for async discard operations
     * @return TaskChangelogRegistry instance
     */
    static TaskChangelogRegistry defaultChangelogRegistry(int numDiscardThreads);
}
```

### TaskChangelogRegistryImpl

Default implementation of the changelog registry with reference counting and async cleanup.

```java { .api }
/**
 * Default implementation of TaskChangelogRegistry with reference counting
 */
public class TaskChangelogRegistryImpl implements TaskChangelogRegistry {

    /**
     * Creates registry with custom executor for discard operations
     * @param discardExecutor Executor for running discard operations
     */
    public TaskChangelogRegistryImpl(Executor discardExecutor);

    /**
     * Closes the registry and shuts down discard operations
     * @throws Exception if cleanup fails
     */
    public void close() throws Exception;
}
```

**Usage Examples:**

```java
// Create changelog registry (typically done by storage)
TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(2);

// Start tracking uploaded state handle
StreamStateHandle uploadedHandle = uploadResult.getStreamStateHandle();
long refCount = 3; // Number of state change sets in this handle
registry.startTracking(uploadedHandle, refCount);

// Release references as state changes become unused
registry.release(uploadedHandle); // refCount becomes 2
registry.release(uploadedHandle); // refCount becomes 1
registry.release(uploadedHandle); // refCount becomes 0, handle is discarded

// Stop tracking when JM becomes owner (e.g., after checkpoint completion)
registry.stopTracking(confirmedHandle);
```
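
The reference-counting behaviour in the example above can be modelled without Flink on the classpath. The following is a minimal sketch and not Flink's `TaskChangelogRegistryImpl`: the class `RefCountSketch`, its `String` handle keys, and the `discarded` list are hypothetical names chosen for illustration. It assumes that a handle is discarded once its count reaches zero, and that a handle handed over with `stopTracking` is never discarded by the task.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

/** Hypothetical model of a reference-counting registry; handles are plain strings. */
class RefCountSketch {
    private final Map<String, Long> refCounts = new ConcurrentHashMap<>();
    private final Executor discardExecutor;

    /** Records discarded handles so the behaviour can be observed (illustration only). */
    final List<String> discarded = Collections.synchronizedList(new ArrayList<>());

    RefCountSketch(Executor discardExecutor) {
        this.discardExecutor = discardExecutor;
    }

    /** Begins tracking a handle with an initial reference count. */
    void startTracking(String handle, long refCount) {
        refCounts.put(handle, refCount);
    }

    /** Forgets the handle without discarding it (assumed: another owner took over). */
    void stopTracking(String handle) {
        refCounts.remove(handle);
    }

    /** Decrements the count; schedules a discard when the last reference is released. */
    void release(String handle) {
        boolean[] reachedZero = {false};
        refCounts.computeIfPresent(handle, (h, count) -> {
            if (count <= 1) {
                reachedZero[0] = true;
                return null; // returning null removes the mapping
            }
            return count - 1;
        });
        if (reachedZero[0]) {
            discardExecutor.execute(() -> discarded.add(handle));
        }
    }

    boolean isTracked(String handle) {
        return refCounts.containsKey(handle);
    }
}
```

Passing `Runnable::run` as the executor runs discards synchronously, which is convenient in tests; a thread pool would make them asynchronous, as the `numDiscardThreads` parameter above suggests.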

### Local Changelog Registry

Registry for managing local changelog files when local recovery is enabled.

```java { .api }
/**
 * Registry for managing local changelog files during recovery
 */
public interface LocalChangelogRegistry extends AutoCloseable {

    /** No-op implementation when local recovery is disabled */
    LocalChangelogRegistry NO_OP = /* ... */;

    /**
     * Registers a local changelog handle for a checkpoint
     * @param handle Local stream state handle
     * @param checkpointId Checkpoint identifier
     */
    void register(StreamStateHandle handle, long checkpointId);

    /**
     * Discards local changelog files up to a checkpoint
     * @param checkpointId Checkpoint identifier (inclusive)
     */
    void discardUpToCheckpoint(long checkpointId);

    /**
     * Closes the registry and releases resources
     * @throws IOException if cleanup fails
     */
    void close() throws IOException;
}

/**
 * Implementation of LocalChangelogRegistry with async cleanup
 */
public class LocalChangelogRegistryImpl implements LocalChangelogRegistry {

    /**
     * Creates local registry with single-threaded executor
     * @param executor Executor for cleanup operations
     */
    public LocalChangelogRegistryImpl(Executor executor);
}
```
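
One way to picture `register` and `discardUpToCheckpoint` is a map from each local handle to the latest checkpoint that used it. The sketch below is a simplified model under stated assumptions, not the Flink implementation: `LocalRegistrySketch` and its `String` handles are hypothetical, re-registering a handle is assumed to keep the highest checkpoint id, and the boundary follows the "(inclusive)" note in the Javadoc above at face value. The exact boundary is worth confirming against the Flink version in use.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a local changelog registry; handles are plain strings. */
class LocalRegistrySketch {
    // Insertion-ordered so that the discard order is deterministic.
    private final Map<String, Long> lastUsedCheckpoint = new LinkedHashMap<>();

    /** Remembers the latest checkpoint that references this local handle (assumption). */
    synchronized void register(String handle, long checkpointId) {
        lastUsedCheckpoint.merge(handle, checkpointId, Long::max);
    }

    /**
     * Removes every handle whose latest checkpoint is at or below checkpointId
     * and returns the removed handles so a caller could delete the files.
     */
    synchronized List<String> discardUpToCheckpoint(long checkpointId) {
        List<String> discarded = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastUsedCheckpoint.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= checkpointId) {
                discarded.add(entry.getKey());
                it.remove();
            }
        }
        return discarded;
    }
}
```

Returning the discarded handles keeps the model synchronous and easy to test; the `Executor` constructor parameter above suggests that the real implementation performs the deletion asynchronously.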

### Changelog Stream Wrapping

Wrapper components for managing changelog stream access and caching.

```java { .api }
/**
 * Wrapper for changelog streams providing additional functionality
 */
public class ChangelogStreamWrapper {

    /**
     * Wraps a changelog stream with additional features
     * @param inputStream Underlying input stream
     * @param streamStateHandle Handle for the stream
     */
    public ChangelogStreamWrapper(
        InputStream inputStream,
        StreamStateHandle streamStateHandle
    );
}
```

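### Recovery Performance and Caching

`ChangelogStreamHandleReaderWithCache` caches changelog files locally to speed up recovery reads. Cache files that stay idle longer than `FsStateChangelogOptions.CACHE_IDLE_TIMEOUT` are cleaned up automatically: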

```java
import java.time.Duration;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.configuration.Configuration;

// Configure cache timeout for recovery
Configuration config = new Configuration();
config.set(FsStateChangelogOptions.CACHE_IDLE_TIMEOUT, Duration.ofMinutes(10));

// Cached reader automatically manages local cache files
ChangelogStreamHandleReaderWithCache cachedReader =
    new ChangelogStreamHandleReaderWithCache(config);

// Cache files are automatically cleaned up after idle timeout
```
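
To make the idle-timeout idea concrete, here is a small sketch of an idle-expiry cache. It is unrelated to Flink's internal cache code: `IdleCacheSketch`, the `loader` function, and the explicit `nowMillis` parameter (used instead of a real clock so the behaviour is deterministic) are all assumptions made for illustration.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical cache whose entries expire after a period without access. */
class IdleCacheSketch {
    private static final class Entry {
        final byte[] data;
        long lastAccessMillis;

        Entry(byte[] data, long nowMillis) {
            this.data = data;
            this.lastAccessMillis = nowMillis;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();
    private final long idleTimeoutMillis;
    private final Function<String, byte[]> loader;

    /** Number of times the loader was invoked (cache misses). */
    int loads = 0;

    IdleCacheSketch(Duration idleTimeout, Function<String, byte[]> loader) {
        this.idleTimeoutMillis = idleTimeout.toMillis();
        this.loader = loader;
    }

    /** Returns cached data, loading it on a miss, and refreshes the access time. */
    synchronized byte[] get(String key, long nowMillis) {
        Entry entry = entries.get(key);
        if (entry == null) {
            entry = new Entry(loader.apply(key), nowMillis);
            entries.put(key, entry);
            loads++;
        }
        entry.lastAccessMillis = nowMillis;
        return entry.data;
    }

    /** Drops entries idle for at least the timeout; returns how many were removed. */
    synchronized int evictIdle(long nowMillis) {
        int before = entries.size();
        entries.values().removeIf(e -> nowMillis - e.lastAccessMillis >= idleTimeoutMillis);
        return before - entries.size();
    }
}
```

In this model a periodic task would call `evictIdle` with the current time; every `get` pushes the expiry of that entry forward, so files that are read repeatedly during recovery stay cached.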

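### Error Handling During Recovery

Reading the changelog can fail with an `IOException`, and applying a change can fail with a `RuntimeException`. The pattern below logs each case and rethrows, so that a failed recovery is not silently ignored: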

```java
try {
    // Read changelog during recovery
    try (CloseableIterator<StateChange> iterator = reader.read(handle)) {
        while (iterator.hasNext()) {
            StateChange change = iterator.next();
            // Process change...
        }
    }
} catch (IOException e) {
    // Handle reading failures
    log.error("Failed to read changelog during recovery", e);
    throw new RuntimeException("Recovery failed", e);
} catch (RuntimeException e) {
    // Handle processing failures
    log.error("Failed to process state change during recovery", e);
    throw e;
}
```
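
A property of the pattern above that is easy to miss: try-with-resources closes the iterator before any `catch` block runs, even when processing a change throws. The sketch below demonstrates this with stand-in types. `RecoveryReadSketch`, `TrackingIterator`, and `applyAll` are hypothetical names, and changes are modelled as strings rather than Flink `StateChange` objects.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

class RecoveryReadSketch {
    /** Hypothetical stand-in for a closeable iterator that records whether it was closed. */
    static final class TrackingIterator implements Iterator<String>, Closeable {
        private final Iterator<String> delegate;
        boolean closed = false;

        TrackingIterator(List<String> changes) {
            this.delegate = changes.iterator();
        }

        @Override
        public boolean hasNext() {
            return delegate.hasNext();
        }

        @Override
        public String next() {
            return delegate.next();
        }

        @Override
        public void close() throws IOException {
            closed = true;
        }
    }

    /** Applies every change and returns how many were applied; always closes the source. */
    static int applyAll(TrackingIterator source, Consumer<String> apply) {
        int applied = 0;
        try (TrackingIterator it = source) {
            while (it.hasNext()) {
                apply.accept(it.next());
                applied++;
            }
        } catch (IOException e) {
            // Failures from close() surface as an unchecked exception in this sketch.
            throw new UncheckedIOException("Recovery failed", e);
        }
        return applied;
    }
}
```

A processing failure thrown from `apply` propagates to the caller unchanged, and the iterator is already closed by the time the caller sees it.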

### Integration with Checkpoint Lifecycle

The recovery system integrates with Flink's checkpoint lifecycle:

- **Checkpoint Creation**: Handles are created by writers and tracked by registry
- **Checkpoint Confirmation**: Registry stops tracking confirmed handles (JM ownership)
- **Checkpoint Subsumption**: Registry releases old handles and discards unused state
- **Recovery**: Storage view provides read access to persisted handles
- **Cleanup**: Registry ensures proper cleanup of unused handles and local files
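
The ownership hand-offs in the list above can be pictured as a small state machine. This is a deliberately simplified model, not Flink code: `HandleLifecycleSketch`, the `Owner` enum, and the event method names are hypothetical. It assumes that confirmation moves a task-owned handle to the job manager, and that subsumption discards only handles the task still owns.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of who is responsible for deleting a changelog handle. */
class HandleLifecycleSketch {
    enum Owner { TASK_MANAGER, JOB_MANAGER, DISCARDED }

    private final Map<String, Owner> owners = new HashMap<>();

    /** Checkpoint creation: a freshly written handle is tracked by the task. */
    void onCreated(String handle) {
        owners.put(handle, Owner.TASK_MANAGER);
    }

    /** Checkpoint confirmation: the task stops tracking and the JM becomes owner. */
    void onConfirmed(String handle) {
        owners.computeIfPresent(handle,
            (h, owner) -> owner == Owner.TASK_MANAGER ? Owner.JOB_MANAGER : owner);
    }

    /** Subsumption: a handle the task still owns is unused and gets discarded. */
    void onSubsumed(String handle) {
        owners.computeIfPresent(handle,
            (h, owner) -> owner == Owner.TASK_MANAGER ? Owner.DISCARDED : owner);
    }

    /** Returns the current owner, or null for a handle that was never created. */
    Owner ownerOf(String handle) {
        return owners.get(handle);
    }
}
```

In this model a confirmed handle is untouched by later task-side subsumption, which mirrors the note above that the job manager owns confirmed handles.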