or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md registry.md storage-factory.md storage-implementation.md upload-scheduling.md writers.md

writers.mddocs/

0

# State Change Writers

1

2

Writer implementation for persisting state changes to the filesystem, with batching, upload coordination, and lifecycle management. Writers are created per operator and perform the actual writing of state changes.

3

4

## Capabilities

5

6

### FsStateChangelogWriter

7

8

Core writer implementation that handles state change persistence with sequence number tracking and upload coordination.

9

10

```java { .api }

11

/**

12

* Filesystem-based writer for state changes

13

* Note: This class is not thread-safe and should be used from a single thread

14

*/

15

@NotThreadSafe

16

class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandleStreamImpl> {

17

18

/**

19

* Appends metadata to the changelog

20

* @param value Metadata bytes to append

21

* @throws IOException If append operation fails

22

*/

23

public void appendMeta(byte[] value) throws IOException;

24

25

/**

26

* Appends a state change for a specific key group

27

* @param keyGroup Key group identifier (must be within writer's key group range)

28

* @param value State change data bytes

29

* @throws IOException If append operation fails

30

*/

31

public void append(int keyGroup, byte[] value) throws IOException;

32

33

/**

34

* Returns the initial sequence number for this writer

35

* @return SequenceNumber representing the starting point

36

*/

37

public SequenceNumber initialSequenceNumber();

38

39

/**

40

* Returns the next sequence number for new state changes

41

* @return SequenceNumber for the next state change

42

*/

43

public SequenceNumber nextSequenceNumber();

44

45

/**

46

* Persists accumulated state changes starting from the given sequence number

47

* @param from Sequence number to persist from (inclusive)

48

* @param checkpointId Checkpoint identifier for this persistence operation

49

* @return CompletableFuture containing the snapshot result with changelog handle

50

*/

51

public CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persist(

52

SequenceNumber from,

53

long checkpointId

54

);

55

56

/**

57

* Closes the writer and releases all resources

58

* @throws Exception If closing fails

59

*/

60

public void close() throws Exception;

61

62

/**

63

* Truncates state changes up to the given sequence number

64

* @param to Sequence number to truncate to (exclusive)

65

*/

66

public void truncate(SequenceNumber to);

67

68

/**

69

* Truncates state changes from the given sequence number and closes the writer

70

* @param from Sequence number to truncate from (inclusive)

71

*/

72

public void truncateAndClose(SequenceNumber from);

73

74

/**

75

* Confirms state changes in the given range for a checkpoint

76

* @param from Start sequence number (inclusive)

77

* @param to End sequence number (exclusive)

78

* @param checkpointId Checkpoint identifier

79

*/

80

public void confirm(SequenceNumber from, SequenceNumber to, long checkpointId);

81

82

/**

83

* Resets state changes in the given range for a checkpoint

84

* @param from Start sequence number (inclusive)

85

* @param to End sequence number (exclusive)

86

* @param checkpointId Checkpoint identifier

87

*/

88

public void reset(SequenceNumber from, SequenceNumber to, long checkpointId);

89

}

90

```

91

92

**Basic Writer Usage Example:**

93

94

```java

95

// Create writer through storage

96

FsStateChangelogStorage storage = new FsStateChangelogStorage(/* ... */);

97

FsStateChangelogWriter writer = storage.createWriter(

98

"my-operator",

99

KeyGroupRange.of(0, 127),

100

mailboxExecutor

101

);

102

103

// Get initial sequence number

104

SequenceNumber initialSeq = writer.initialSequenceNumber();

105

106

// Append metadata

107

byte[] metadata = "operator-metadata".getBytes();

108

writer.appendMeta(metadata);

109

110

// Append state changes for different key groups

111

byte[] stateChange1 = serializeStateChange(state1);

112

writer.append(5, stateChange1);

113

114

byte[] stateChange2 = serializeStateChange(state2);

115

writer.append(15, stateChange2);

116

117

// Persist changes

118

SequenceNumber currentSeq = writer.nextSequenceNumber();

119

CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persistFuture =

120

writer.persist(initialSeq, checkpointId);

121

122

// Handle persistence result

123

persistFuture.thenAccept(result -> {

124

ChangelogStateHandleStreamImpl handle = result.getJobManagerOwnedSnapshot();

125

// Handle contains the persisted changelog data

126

System.out.println("Persisted changelog: " + handle);

127

});

128

129

// Clean up

130

writer.close();

131

```

132

133

### Sequence Number Management

134

135

Writers maintain sequence numbers to track the order of state changes and coordinate with checkpointing.

136

137

```java { .api }

138

/**

139

* Gets the initial sequence number for this writer instance

140

* @return SequenceNumber representing the starting point

141

*/

142

public SequenceNumber initialSequenceNumber();

143

144

/**

145

* Gets the next available sequence number for new state changes

146

* @return SequenceNumber for the next state change to be written

147

*/

148

public SequenceNumber nextSequenceNumber();

149

```

150

151

**Sequence Number Usage Example:**

152

153

```java

154

FsStateChangelogWriter writer = storage.createWriter(/* ... */);

155

156

// Track sequence numbers

157

SequenceNumber start = writer.initialSequenceNumber();

158

System.out.println("Starting from sequence: " + start);

159

160

// Write some state changes

161

writer.append(1, stateData1);

162

writer.append(2, stateData2);

163

164

// Get current position

165

SequenceNumber current = writer.nextSequenceNumber();

166

System.out.println("Next sequence will be: " + current);

167

168

// Persist from start to current

169

writer.persist(start, checkpointId).thenAccept(result -> {

170

System.out.println("Persisted sequence range: " + start + " to " + current);

171

});

172

```

173

174

### State Change Appending

175

176

Methods for appending different types of state change data to the changelog.

177

178

```java { .api }

179

/**

180

* Appends operator metadata to the changelog

181

* @param value Serialized metadata bytes

182

* @throws IOException If the append operation fails

183

*/

184

public void appendMeta(byte[] value) throws IOException;

185

186

/**

187

* Appends state change data for a specific key group

188

* @param keyGroup Key group identifier (must be within the writer's assigned range)

189

* @param value Serialized state change bytes

190

* @throws IOException If the append operation fails

191

*/

192

public void append(int keyGroup, byte[] value) throws IOException;

193

```

194

195

**Appending Examples:**

196

197

```java

198

FsStateChangelogWriter writer = storage.createWriter(

199

"operator-1",

200

KeyGroupRange.of(0, 63), // Key groups 0-63

201

mailboxExecutor

202

);

203

204

// Append operator metadata (initialization info, configuration, etc.)

205

String metadataJson = "{\"operator\":\"MyOperator\",\"version\":\"1.0\"}";

206

writer.appendMeta(metadataJson.getBytes(StandardCharsets.UTF_8));

207

208

// Append state changes for specific key groups

209

for (int keyGroup = 0; keyGroup < 64; keyGroup++) {

210

byte[] stateChange = createStateChangeForKeyGroup(keyGroup);

211

writer.append(keyGroup, stateChange);

212

}

213

214

// Attempting to append to key group outside range will fail

215

try {

216

writer.append(100, someData); // Will throw exception since 100 > 63

217

} catch (IllegalArgumentException e) {

218

System.err.println("Key group out of range: " + e.getMessage());

219

}

220

```

221

222

### Persistence Operations

223

224

Core persistence functionality that coordinates with the upload system to store state changes durably.

225

226

```java { .api }

227

/**

228

* Persists accumulated state changes starting from the given sequence number

229

* @param from Starting sequence number (inclusive)

230

* @param checkpointId Checkpoint identifier for tracking

231

* @return CompletableFuture with snapshot result containing changelog handle

232

*/

233

public CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persist(

234

SequenceNumber from,

235

long checkpointId

236

);

237

```

238

239

**Persistence Examples:**

240

241

```java

242

// Basic persistence

243

SequenceNumber startSeq = writer.initialSequenceNumber();

244

writer.append(1, stateData1);

245

writer.append(2, stateData2);

246

247

CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future =

248

writer.persist(startSeq, 12345L);

249

250

future.whenComplete((result, throwable) -> {

251

if (throwable != null) {

252

System.err.println("Persistence failed: " + throwable.getMessage());

253

} else {

254

ChangelogStateHandleStreamImpl handle = result.getJobManagerOwnedSnapshot();

255

System.out.println("Successfully persisted to: " + handle.getStreamStateHandle());

256

257

// Local handle (if local recovery is enabled)

258

StreamStateHandle localHandle = result.getTaskLocalSnapshot();

259

if (localHandle != null) {

260

System.out.println("Local backup at: " + localHandle);

261

}

262

}

263

});

264

265

// Chain multiple persist operations

266

CompletableFuture<Void> chainedPersistence = writer.persist(startSeq, 12345L)

267

.thenCompose(result1 -> {

268

// Write more changes

269

writer.append(3, moreStateData);

270

return writer.persist(writer.nextSequenceNumber(), 12346L);

271

})

272

.thenAccept(result2 -> {

273

System.out.println("Both persistence operations completed");

274

});

275

```

276

277

### Lifecycle Management

278

279

Methods for managing the writer lifecycle including truncation, confirmation, and cleanup.

280

281

```java { .api }

282

/**

283

* Truncates changelog up to the given sequence number

284

* @param to Sequence number to truncate to (exclusive)

285

*/

286

public void truncate(SequenceNumber to);

287

288

/**

289

* Truncates from the given sequence number and closes writer

290

* @param from Sequence number to truncate from (inclusive)

291

*/

292

public void truncateAndClose(SequenceNumber from);

293

294

/**

295

* Confirms successful processing of state changes in range

296

* @param from Start sequence number (inclusive)

297

* @param to End sequence number (exclusive)

298

* @param checkpointId Associated checkpoint ID

299

*/

300

public void confirm(SequenceNumber from, SequenceNumber to, long checkpointId);

301

302

/**

303

* Resets state changes in range (e.g., after checkpoint failure)

304

* @param from Start sequence number (inclusive)

305

* @param to End sequence number (exclusive)

306

* @param checkpointId Associated checkpoint ID

307

*/

308

public void reset(SequenceNumber from, SequenceNumber to, long checkpointId);

309

310

/**

311

* Closes writer and releases all resources

312

* @throws Exception If cleanup fails

313

*/

314

public void close() throws Exception;

315

```

316

317

**Lifecycle Management Examples:**

318

319

```java

320

FsStateChangelogWriter writer = storage.createWriter(/* ... */);

321

322

try {

323

// Normal operation

324

SequenceNumber seq1 = writer.nextSequenceNumber();

325

writer.append(1, data1);

326

writer.append(2, data2);

327

328

SequenceNumber seq2 = writer.nextSequenceNumber();

329

writer.persist(seq1, checkpointId).get();

330

331

// Confirm successful checkpoint

332

writer.confirm(seq1, seq2, checkpointId);

333

334

// Continue with more changes

335

writer.append(3, data3);

336

SequenceNumber seq3 = writer.nextSequenceNumber();

337

338

// Truncate old data to save space

339

writer.truncate(seq2);

340

341

} catch (Exception e) {

342

// Reset on failure (seq1 and seq2 must be declared before the try block to be in scope here)

343

writer.reset(seq1, seq2, checkpointId);

344

} finally {

345

// Always clean up

346

writer.close();

347

}

348

349

// Alternative to close(): truncate and close in one operation (use instead of, not after, the close() call above)

350

writer.truncateAndClose(someSequenceNumber);

351

```

352

353

### Error Handling

354

355

Common error scenarios and appropriate handling strategies.

356

357

**Error Handling Examples:**

358

359

```java

360

FsStateChangelogWriter writer = storage.createWriter(/* ... */);

361

362

try {

363

// This may fail if key group is out of range

364

writer.append(keyGroup, stateData);

365

} catch (IllegalArgumentException e) {

366

System.err.println("Invalid key group: " + e.getMessage());

367

}

368

369

try {

370

// This may fail due to I/O issues

371

writer.appendMeta(metadata);

372

} catch (IOException e) {

373

System.err.println("Failed to write metadata: " + e.getMessage());

374

// May need to recreate writer or fail the checkpoint

375

}

376

377

// Handle persistence failures

378

writer.persist(sequenceNumber, checkpointId)

379

.exceptionally(throwable -> {

380

System.err.println("Persistence failed: " + throwable.getMessage());

381

// Return null result or take recovery action

382

return null;

383

});

384

385

try {

386

writer.close();

387

} catch (Exception e) {

388

System.err.println("Failed to close writer cleanly: " + e.getMessage());

389

// Log but continue with shutdown

390

}

391

```