or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actions.md · checkpoints.md · configuration.md · graph-construction.md · graph-execution.md · index.md · prebuilt.md · state-management.md

docs/checkpoints.md

0

# Checkpoints and Persistence

1

2

Persistent state management with checkpointing for debugging, resuming execution, implementing human-in-the-loop workflows, and enabling time travel through execution history.

3

4

## Capabilities

5

6

### BaseCheckpointSaver Interface

7

8

Core interface for persisting graph execution state and enabling resumption.

9

10

```java { .api }

11

/**

12

* Interface for checkpoint persistence and management

13

*/

14

interface BaseCheckpointSaver {

15

/**

16

* Default thread identifier for single-threaded execution

17

*/

18

String THREAD_ID_DEFAULT = "$default";

19

20

/**

21

* Lists all checkpoints for a given thread

22

* @param config Runtime configuration containing thread ID

23

* @return Collection of checkpoints ordered by recency

24

*/

25

Collection<Checkpoint> list(RunnableConfig config);

26

27

/**

28

* Retrieves specific checkpoint for thread

29

* @param config Runtime configuration with thread/checkpoint ID

30

* @return Optional containing checkpoint if found

31

*/

32

Optional<Checkpoint> get(RunnableConfig config);

33

34

/**

35

* Saves checkpoint and returns updated configuration

36

* @param config Runtime configuration

37

* @param checkpoint Checkpoint to save

38

* @return Updated configuration with new checkpoint ID

39

* @throws Exception if save operation fails

40

*/

41

RunnableConfig put(RunnableConfig config, Checkpoint checkpoint) throws Exception;

42

43

/**

44

* Releases thread resources and returns the thread ID together with its final checkpoints

45

* @param config Runtime configuration for thread

46

* @return Tag containing thread ID and final checkpoints

47

* @throws Exception if release operation fails

48

*/

49

Tag release(RunnableConfig config) throws Exception;

50

51

/**

52

* Record containing thread information and checkpoints

53

*/

54

record Tag(String threadId, Collection<Checkpoint> checkpoints) {}

55

}

56

```

57

58

**Usage Examples:**

59

60

```java

61

// Use checkpoint saver in compilation

62

BaseCheckpointSaver saver = new MemorySaver();

63

CompileConfig config = CompileConfig.builder()

64

.checkpointSaver(saver)

65

.build();

66

67

CompiledGraph<MyState> app = workflow.compile(config);

68

69

// Execute with checkpointing

70

RunnableConfig runConfig = RunnableConfig.builder()

71

.threadId("my-session")

72

.build();

73

74

Optional<MyState> result = app.invoke(Map.of("input", "data"), runConfig);

75

76

// List execution history

77

Collection<Checkpoint> history = saver.list(runConfig);

78

System.out.println("Execution had " + history.size() + " checkpoints");

79

80

// Get specific checkpoint

81

Optional<Checkpoint> checkpoint = saver.get(runConfig);

82

if (checkpoint.isPresent()) {

83

System.out.println("Current node: " + checkpoint.get().getNodeId());

84

}

85

```

86

87

### Checkpoint Class

88

89

Immutable snapshot of graph execution state at a specific point.

90

91

```java { .api }

92

/**

93

* Immutable checkpoint containing execution state

94

*/

95

class Checkpoint {

96

/**

97

* Creates checkpoint builder

98

* @return New checkpoint builder instance

99

*/

100

static Builder builder();

101

102

/**

103

* Creates copy of existing checkpoint

104

* @param checkpoint Checkpoint to copy

105

* @return New checkpoint instance

106

*/

107

static Checkpoint copyOf(Checkpoint checkpoint);

108

109

/**

110

* Get unique checkpoint identifier

111

* @return Checkpoint ID

112

*/

113

String getId();

114

115

/**

116

* Get node ID where checkpoint was created

117

* @return Node identifier

118

*/

119

String getNodeId();

120

121

/**

122

* Get next node to execute after this checkpoint

123

* @return Next node identifier

124

*/

125

String getNextNodeId();

126

127

/**

128

* Get state data at checkpoint

129

* @return State as key-value map

130

*/

131

Map<String, Object> getState();

132

133

/**

134

* Creates new checkpoint with updated state

135

* @param values State values to update

136

* @param channels Channel definitions for merge logic

137

* @return New checkpoint with merged state

138

*/

139

Checkpoint updateState(Map<String, Object> values, Map<String, Channel<?>> channels);

140

}

141

```

142

143

**Usage Examples:**

144

145

```java

146

// Create checkpoint manually

147

Checkpoint checkpoint = Checkpoint.builder()

148

.nodeId("current_node")

149

.nextNodeId("next_node")

150

.state(Map.of("data", "value", "step", 1))

151

.build();

152

153

// Copy and modify checkpoint

154

Checkpoint updated = Checkpoint.copyOf(checkpoint)

155

.updateState(Map.of("step", 2), Map.of());

156

157

// Access checkpoint data

158

String nodeId = checkpoint.getNodeId();

159

String nextNode = checkpoint.getNextNodeId();

160

Map<String, Object> stateData = checkpoint.getState();

161

162

System.out.println("At node: " + nodeId + ", going to: " + nextNode);

163

```

164

165

### Memory-Based Checkpoint Savers

166

167

In-memory checkpoint storage for development and testing.

168

169

```java { .api }

170

/**

171

* Simple in-memory checkpoint storage

172

*/

173

class MemorySaver implements BaseCheckpointSaver {

174

/**

175

* Creates new memory-based checkpoint saver

176

*/

177

MemorySaver();

178

179

Collection<Checkpoint> list(RunnableConfig config);

180

Optional<Checkpoint> get(RunnableConfig config);

181

RunnableConfig put(RunnableConfig config, Checkpoint checkpoint) throws Exception;

182

Tag release(RunnableConfig config) throws Exception;

183

}

184

185

/**

186

* Versioned memory checkpoint saver with version tracking

187

*/

188

class VersionedMemorySaver extends MemorySaver implements HasVersions {

189

/**

190

* Creates versioned memory saver

191

*/

192

VersionedMemorySaver();

193

194

/**

195

* Get version information for checkpoint

196

* @param config Runtime configuration

197

* @return Optional containing version info

198

*/

199

Optional<String> getVersion(RunnableConfig config);

200

}

201

```

202

203

**Usage Examples:**

204

205

```java

206

// Basic memory saver

207

MemorySaver memorySaver = new MemorySaver();

208

209

// Versioned memory saver

210

VersionedMemorySaver versionedSaver = new VersionedMemorySaver();

211

212

// Use in graph compilation

213

CompileConfig config = CompileConfig.builder()

214

.checkpointSaver(memorySaver)

215

.build();

216

217

CompiledGraph<MyState> app = workflow.compile(config);

218

219

// Execute with automatic checkpointing

220

RunnableConfig runConfig = RunnableConfig.builder()

221

.threadId("memory-session")

222

.build();

223

224

// Stream execution and see checkpoints being created

225

app.stream(Map.of("input", "test"), runConfig)

226

.forEachAsync(output -> {

227

System.out.println("Node: " + output.node());

228

// Each output represents a checkpoint

229

return CompletableFuture.completedFuture(null);

230

});

231

```

232

233

### File System Checkpoint Saver

234

235

Persistent file-based checkpoint storage for production use.

236

237

```java { .api }

238

/**

239

* File system-based checkpoint persistence

240

*/

241

class FileSystemSaver implements BaseCheckpointSaver {

242

/**

243

* Creates file system saver with specified directory

244

* @param baseDirectory Directory for checkpoint storage

245

*/

246

FileSystemSaver(Path baseDirectory);

247

248

/**

249

* Creates file system saver with state serializer

250

* @param baseDirectory Directory for checkpoint storage

251

* @param stateSerializer Serializer for state objects

252

*/

253

FileSystemSaver(Path baseDirectory, StateSerializer<?> stateSerializer);

254

255

Collection<Checkpoint> list(RunnableConfig config);

256

Optional<Checkpoint> get(RunnableConfig config);

257

RunnableConfig put(RunnableConfig config, Checkpoint checkpoint) throws Exception;

258

Tag release(RunnableConfig config) throws Exception;

259

}

260

```

261

262

**Usage Examples:**

263

264

```java

265

import java.nio.file.Paths;

266

267

// File system saver with default serialization

268

Path checkpointDir = Paths.get("/app/checkpoints");

269

FileSystemSaver fileSaver = new FileSystemSaver(checkpointDir);

270

271

// File system saver with custom serialization

272

StateSerializer<MyState> serializer = new JacksonStateSerializer<>(MyState::new);

273

FileSystemSaver customFileSaver = new FileSystemSaver(checkpointDir, serializer);

274

275

// Use for persistent checkpointing

276

CompileConfig config = CompileConfig.builder()

277

.checkpointSaver(fileSaver)

278

.build();

279

280

CompiledGraph<MyState> app = workflow.compile(config);

281

282

// Execution will persist to filesystem

283

RunnableConfig runConfig = RunnableConfig.builder()

284

.threadId("persistent-session")

285

.build();

286

287

Optional<MyState> result = app.invoke(Map.of("input", "persistent data"), runConfig);

288

289

// Later, resume from filesystem checkpoints

290

Optional<MyState> resumed = app.invoke(new GraphResume(), runConfig);

291

```

292

293

### Checkpoint-Based Graph Operations

294

295

Operations that leverage checkpoints for advanced execution control.

296

297

```java { .api }

298

// Get execution state and history

299

StateSnapshot<MyState> currentState = app.getState(runConfig);

300

Collection<StateSnapshot<MyState>> history = app.getStateHistory(runConfig);

301

302

// Update state at checkpoint

303

RunnableConfig updated = app.updateState(

304

runConfig,

305

Map.of("user_feedback", "approved", "timestamp", System.currentTimeMillis())

306

);

307

308

// Force execution to specific node

309

RunnableConfig withNextNode = app.updateState(

310

runConfig,

311

Map.of("override", true),

312

"specific_node_id"

313

);

314

```

315

316

**Usage Examples:**

317

318

```java

319

// Human-in-the-loop workflow

320

RunnableConfig humanLoopConfig = RunnableConfig.builder()

321

.threadId("human-review-session")

322

.build();

323

324

// Start execution

325

AsyncGenerator<NodeOutput<MyState>> stream = app.stream(

326

Map.of("document", "content to review"),

327

humanLoopConfig

328

);

329

330

// Process until human review needed

331

for (NodeOutput<MyState> output : stream.stream().toList()) {

332

if (output instanceof InterruptionMetadata) {

333

InterruptionMetadata<MyState> interruption = (InterruptionMetadata<MyState>) output;

334

System.out.println("Human review needed at: " + interruption.getNodeId());

335

336

// Pause for human input

337

String humanFeedback = getHumanFeedback(); // Your UI logic

338

339

// Update state with human feedback

340

RunnableConfig withFeedback = app.updateState(

341

humanLoopConfig,

342

Map.of("human_feedback", humanFeedback, "reviewed", true)

343

);

344

345

// Resume execution

346

Optional<MyState> finalResult = app.invoke(new GraphResume(), withFeedback);

347

break;

348

}

349

}

350

```

351

352

### Thread Management and Cleanup

353

354

Manage execution threads and clean up resources.

355

356

```java { .api }

357

// Configure thread release

358

CompileConfig configWithRelease = CompileConfig.builder()

359

.checkpointSaver(new MemorySaver())

360

.releaseThread(true)

361

.build();

362

363

CompiledGraph<MyState> app = workflow.compile(configWithRelease);

364

365

// Execute with automatic thread release

366

Optional<MyState> result = app.invoke(Map.of("input", "data"), runConfig);

367

368

// Manual thread release

369

BaseCheckpointSaver.Tag released = saver.release(runConfig);

370

System.out.println("Released thread: " + released.threadId());

371

System.out.println("Final checkpoints: " + released.checkpoints().size());

372

```

373

374

## Checkpoint Patterns

375

376

### Debugging and Inspection

377

378

Use checkpoints for detailed execution analysis.

379

380

```java { .api }

381

// Enable checkpointing for debugging

382

CompileConfig debugConfig = CompileConfig.builder()

383

.checkpointSaver(new MemorySaver())

384

.build();

385

386

CompiledGraph<MyState> debugApp = workflow.compile(debugConfig);

387

388

// Execute with full history

389

RunnableConfig debugRunConfig = RunnableConfig.builder()

390

.threadId("debug-session")

391

.build();

392

393

Optional<MyState> result = debugApp.invoke(Map.of("input", "debug data"), debugRunConfig);

394

395

// Analyze execution history

396

Collection<StateSnapshot<MyState>> history = debugApp.getStateHistory(debugRunConfig);

397

398

System.out.println("Execution Analysis:");

399

for (StateSnapshot<MyState> snapshot : history) {

400

System.out.println("Step: " + snapshot.getNodeId());

401

System.out.println("State: " + snapshot.state().data());

402

System.out.println("Next: " + snapshot.getNextNodeId());

403

System.out.println("---");

404

}

405

```

406

407

### Time Travel and State Rollback

408

409

Navigate through execution history and resume from arbitrary points.

410

411

```java { .api }

412

// Get specific checkpoint from history

413

Collection<StateSnapshot<MyState>> history = app.getStateHistory(runConfig);

414

Optional<StateSnapshot<MyState>> targetCheckpoint = history.stream()

415

.filter(snapshot -> snapshot.getNodeId().equals("target_node"))

416

.findFirst();

417

418

if (targetCheckpoint.isPresent()) {

419

// Create config for resuming from specific checkpoint

420

RunnableConfig rollbackConfig = RunnableConfig.builder()

421

.threadId(runConfig.threadId().get())

422

.checkPointId(targetCheckpoint.get().getCheckpointId())

423

.build();

424

425

// Resume from that point with modified state

426

RunnableConfig modifiedConfig = app.updateState(

427

rollbackConfig,

428

Map.of("rollback_reason", "user_correction", "modified", true)

429

);

430

431

Optional<MyState> newResult = app.invoke(new GraphResume(), modifiedConfig);

432

}

433

```

434

435

### Multi-Session Management

436

437

Handle multiple concurrent execution threads.

438

439

```java { .api }

440

// Create multiple sessions

441

String[] sessionIds = {"session-1", "session-2", "session-3"};

442

443

Map<String, RunnableConfig> sessions = new HashMap<>();

444

for (String sessionId : sessionIds) {

445

sessions.put(sessionId, RunnableConfig.builder()

446

.threadId(sessionId)

447

.build());

448

}

449

450

// Execute sessions concurrently

451

List<CompletableFuture<Optional<MyState>>> futures = sessions.values()

452

.stream()

453

.map(config -> CompletableFuture.supplyAsync(() ->

454

app.invoke(Map.of("session", config.threadId().get()), config)

455

))

456

.toList();

457

458

// Wait for all sessions to complete

459

CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();

460

461

// Check results for each session

462

for (Map.Entry<String, RunnableConfig> entry : sessions.entrySet()) {

463

StateSnapshot<MyState> finalState = app.getState(entry.getValue());

464

System.out.println("Session " + entry.getKey() + " ended at: " + finalState.getNodeId());

465

}

466

```

467

468

### Checkpoint Validation and Integrity

469

470

Ensure checkpoint consistency and validate state.

471

472

```java { .api }

473

// Custom checkpoint saver with validation

474

class ValidatingCheckpointSaver implements BaseCheckpointSaver {

475

private final BaseCheckpointSaver delegate;

476

477

public ValidatingCheckpointSaver(BaseCheckpointSaver delegate) {

478

this.delegate = delegate;

479

}

480

481

@Override

482

public RunnableConfig put(RunnableConfig config, Checkpoint checkpoint) throws Exception {

483

// Validate checkpoint before saving

484

validateCheckpoint(checkpoint);

485

return delegate.put(config, checkpoint);

486

}

487

488

private void validateCheckpoint(Checkpoint checkpoint) {

489

if (checkpoint.getNodeId() == null) {

490

throw new IllegalArgumentException("Checkpoint must have node ID");

491

}

492

if (checkpoint.getState() == null || checkpoint.getState().isEmpty()) {

493

throw new IllegalArgumentException("Checkpoint must have state data");

494

}

495

// Additional validation logic...

496

}

497

498

// Delegate other methods...

499

@Override

500

public Collection<Checkpoint> list(RunnableConfig config) {

501

return delegate.list(config);

502

}

503

504

@Override

505

public Optional<Checkpoint> get(RunnableConfig config) {

506

return delegate.get(config);

507

}

508

509

@Override

510

public Tag release(RunnableConfig config) throws Exception {

511

return delegate.release(config);

512

}

513

}

514

```