# Checkpoints and Persistence

Persistent state management with checkpointing for debugging, resuming execution, implementing human-in-the-loop workflows, and enabling time travel through execution history.

## Capabilities

### BaseCheckpointSaver Interface

Core interface for persisting graph execution state and enabling resumption.

```java { .api }
/**
12
* Interface for checkpoint persistence and management
13
*/
14
interface BaseCheckpointSaver {
15
/**
16
* Default thread identifier for single-threaded execution
17
*/
18
String THREAD_ID_DEFAULT = "$default";
19
20
/**
21
* Lists all checkpoints for a given thread
22
* @param config Runtime configuration containing thread ID
23
* @return Collection of checkpoints ordered by recency
24
*/
25
Collection<Checkpoint> list(RunnableConfig config);
26
27
/**
28
* Retrieves specific checkpoint for thread
29
* @param config Runtime configuration with thread/checkpoint ID
30
* @return Optional containing checkpoint if found
31
*/
32
Optional<Checkpoint> get(RunnableConfig config);
33
34
/**
35
* Saves checkpoint and returns updated configuration
36
* @param config Runtime configuration
37
* @param checkpoint Checkpoint to save
38
* @return Updated configuration with new checkpoint ID
39
* @throws Exception if save operation fails
40
*/
41
RunnableConfig put(RunnableConfig config, Checkpoint checkpoint) throws Exception;
42
43
/**
44
* Releases thread resources and returns final state
45
* @param config Runtime configuration for thread
46
* @return Tag containing thread ID and final checkpoints
47
* @throws Exception if release operation fails
48
*/
49
Tag release(RunnableConfig config) throws Exception;
50
51
/**
52
* Record containing thread information and checkpoints
53
*/
54
record Tag(String threadId, Collection<Checkpoint> checkpoints) {}
55
}
```

**Usage Examples:**

```java
// Use checkpoint saver in compilation
62
BaseCheckpointSaver saver = new MemorySaver();
63
CompileConfig config = CompileConfig.builder()
64
.checkpointSaver(saver)
65
.build();
66
67
CompiledGraph<MyState> app = workflow.compile(config);
68
69
// Execute with checkpointing
70
RunnableConfig runConfig = RunnableConfig.builder()
71
.threadId("my-session")
72
.build();
73
74
Optional<MyState> result = app.invoke(Map.of("input", "data"), runConfig);
75
76
// List execution history
77
Collection<Checkpoint> history = saver.list(runConfig);
78
System.out.println("Execution had " + history.size() + " checkpoints");
79
80
// Get specific checkpoint
81
Optional<Checkpoint> checkpoint = saver.get(runConfig);
82
if (checkpoint.isPresent()) {
83
System.out.println("Current node: " + checkpoint.get().getNodeId());
84
}
```

### Checkpoint Class

Immutable snapshot of graph execution state at a specific point.

```java { .api }
/**
93
* Immutable checkpoint containing execution state
94
*/
95
class Checkpoint {
96
/**
97
* Creates checkpoint builder
98
* @return New checkpoint builder instance
99
*/
100
static Builder builder();
101
102
/**
103
* Creates copy of existing checkpoint
104
* @param checkpoint Checkpoint to copy
105
* @return New checkpoint instance
106
*/
107
static Checkpoint copyOf(Checkpoint checkpoint);
108
109
/**
110
* Get unique checkpoint identifier
111
* @return Checkpoint ID
112
*/
113
String getId();
114
115
/**
116
* Get node ID where checkpoint was created
117
* @return Node identifier
118
*/
119
String getNodeId();
120
121
/**
122
* Get next node to execute after this checkpoint
123
* @return Next node identifier
124
*/
125
String getNextNodeId();
126
127
/**
128
* Get state data at checkpoint
129
* @return State as key-value map
130
*/
131
Map<String, Object> getState();
132
133
/**
134
* Creates new checkpoint with updated state
135
* @param values State values to update
136
* @param channels Channel definitions for merge logic
137
* @return New checkpoint with merged state
138
*/
139
Checkpoint updateState(Map<String, Object> values, Map<String, Channel<?>> channels);
140
}
```

**Usage Examples:**

```java
// Create checkpoint manually
147
Checkpoint checkpoint = Checkpoint.builder()
148
.nodeId("current_node")
149
.nextNodeId("next_node")
150
.state(Map.of("data", "value", "step", 1))
151
.build();
152
153
// Copy and modify checkpoint
154
Checkpoint updated = Checkpoint.copyOf(checkpoint)
155
.updateState(Map.of("step", 2), Map.of());
156
157
// Access checkpoint data
158
String nodeId = checkpoint.getNodeId();
159
String nextNode = checkpoint.getNextNodeId();
160
Map<String, Object> stateData = checkpoint.getState();
161
162
System.out.println("At node: " + nodeId + ", going to: " + nextNode);
```

### Memory-Based Checkpoint Savers

In-memory checkpoint storage for development and testing.

```java { .api }
/**
171
* Simple in-memory checkpoint storage
172
*/
173
class MemorySaver implements BaseCheckpointSaver {
174
/**
175
* Creates new memory-based checkpoint saver
176
*/
177
MemorySaver();
178
179
Collection<Checkpoint> list(RunnableConfig config);
180
Optional<Checkpoint> get(RunnableConfig config);
181
RunnableConfig put(RunnableConfig config, Checkpoint checkpoint) throws Exception;
182
Tag release(RunnableConfig config) throws Exception;
183
}
184
185
/**
186
* Versioned memory checkpoint saver with version tracking
187
*/
188
class VersionedMemorySaver extends MemorySaver implements HasVersions {
189
/**
190
* Creates versioned memory saver
191
*/
192
VersionedMemorySaver();
193
194
/**
195
* Get version information for checkpoint
196
* @param config Runtime configuration
197
* @return Optional containing version info
198
*/
199
Optional<String> getVersion(RunnableConfig config);
200
}
```

**Usage Examples:**

```java
// Basic memory saver
207
MemorySaver memorySaver = new MemorySaver();
208
209
// Versioned memory saver
210
VersionedMemorySaver versionedSaver = new VersionedMemorySaver();
211
212
// Use in graph compilation
213
CompileConfig config = CompileConfig.builder()
214
.checkpointSaver(memorySaver)
215
.build();
216
217
CompiledGraph<MyState> app = workflow.compile(config);
218
219
// Execute with automatic checkpointing
220
RunnableConfig runConfig = RunnableConfig.builder()
221
.threadId("memory-session")
222
.build();
223
224
// Stream execution and see checkpoints being created
225
app.stream(Map.of("input", "test"), runConfig)
226
.forEachAsync(output -> {
227
System.out.println("Node: " + output.node());
228
// Each output represents a checkpoint
229
return CompletableFuture.completedFuture(null);
230
});
```

### File System Checkpoint Saver

Persistent file-based checkpoint storage for production use.

```java { .api }
/**
239
* File system-based checkpoint persistence
240
*/
241
class FileSystemSaver implements BaseCheckpointSaver {
242
/**
243
* Creates file system saver with specified directory
244
* @param baseDirectory Directory for checkpoint storage
245
*/
246
FileSystemSaver(Path baseDirectory);
247
248
/**
249
* Creates file system saver with state serializer
250
* @param baseDirectory Directory for checkpoint storage
251
* @param stateSerializer Serializer for state objects
252
*/
253
FileSystemSaver(Path baseDirectory, StateSerializer<?> stateSerializer);
254
255
Collection<Checkpoint> list(RunnableConfig config);
256
Optional<Checkpoint> get(RunnableConfig config);
257
RunnableConfig put(RunnableConfig config, Checkpoint checkpoint) throws Exception;
258
Tag release(RunnableConfig config) throws Exception;
259
}
```

**Usage Examples:**

```java
import java.nio.file.Paths;
266
267
// File system saver with default serialization
268
Path checkpointDir = Paths.get("/app/checkpoints");
269
FileSystemSaver fileSaver = new FileSystemSaver(checkpointDir);
270
271
// File system saver with custom serialization
272
StateSerializer<MyState> serializer = new JacksonStateSerializer<>(MyState::new);
273
FileSystemSaver customFileSaver = new FileSystemSaver(checkpointDir, serializer);
274
275
// Use for persistent checkpointing
276
CompileConfig config = CompileConfig.builder()
277
.checkpointSaver(fileSaver)
278
.build();
279
280
CompiledGraph<MyState> app = workflow.compile(config);
281
282
// Execution will persist to filesystem
283
RunnableConfig runConfig = RunnableConfig.builder()
284
.threadId("persistent-session")
285
.build();
286
287
Optional<MyState> result = app.invoke(Map.of("input", "persistent data"), runConfig);
288
289
// Later, resume from filesystem checkpoints
290
Optional<MyState> resumed = app.invoke(new GraphResume(), runConfig);
```

### Checkpoint-Based Graph Operations

Operations that leverage checkpoints for advanced execution control.

```java { .api }
// Get execution state and history
299
StateSnapshot<MyState> currentState = app.getState(runConfig);
300
Collection<StateSnapshot<MyState>> history = app.getStateHistory(runConfig);
301
302
// Update state at checkpoint
303
RunnableConfig updated = app.updateState(
304
runConfig,
305
Map.of("user_feedback", "approved", "timestamp", System.currentTimeMillis())
306
);
307
308
// Force execution to specific node
309
RunnableConfig withNextNode = app.updateState(
310
runConfig,
311
Map.of("override", true),
312
"specific_node_id"
313
);
```

**Usage Examples:**

```java
// Human-in-the-loop workflow
320
RunnableConfig humanLoopConfig = RunnableConfig.builder()
321
.threadId("human-review-session")
322
.build();
323
324
// Start execution
325
AsyncGenerator<NodeOutput<MyState>> stream = app.stream(
326
Map.of("document", "content to review"),
327
humanLoopConfig
328
);
329
330
// Process until human review needed
331
for (NodeOutput<MyState> output : stream.stream().toList()) {
332
if (output instanceof InterruptionMetadata) {
333
InterruptionMetadata<MyState> interruption = (InterruptionMetadata<MyState>) output;
334
System.out.println("Human review needed at: " + interruption.getNodeId());
335
336
// Pause for human input
337
String humanFeedback = getHumanFeedback(); // Your UI logic
338
339
// Update state with human feedback
340
RunnableConfig withFeedback = app.updateState(
341
humanLoopConfig,
342
Map.of("human_feedback", humanFeedback, "reviewed", true)
343
);
344
345
// Resume execution
346
Optional<MyState> finalResult = app.invoke(new GraphResume(), withFeedback);
347
break;
348
}
349
}
```

### Thread Management and Cleanup

Manage execution threads and cleanup resources.

```java { .api }
// Configure thread release
358
CompileConfig configWithRelease = CompileConfig.builder()
359
.checkpointSaver(new MemorySaver())
360
.releaseThread(true)
361
.build();
362
363
CompiledGraph<MyState> app = workflow.compile(configWithRelease);
364
365
// Execute with automatic thread release
366
Optional<MyState> result = app.invoke(Map.of("input", "data"), runConfig);
367
368
// Manual thread release
369
BaseCheckpointSaver.Tag released = saver.release(runConfig);
370
System.out.println("Released thread: " + released.threadId());
371
System.out.println("Final checkpoints: " + released.checkpoints().size());
```

## Checkpoint Patterns

### Debugging and Inspection

Use checkpoints for detailed execution analysis.

```java { .api }
// Enable checkpointing for debugging
382
CompileConfig debugConfig = CompileConfig.builder()
383
.checkpointSaver(new MemorySaver())
384
.build();
385
386
CompiledGraph<MyState> debugApp = workflow.compile(debugConfig);
387
388
// Execute with full history
389
RunnableConfig debugRunConfig = RunnableConfig.builder()
390
.threadId("debug-session")
391
.build();
392
393
Optional<MyState> result = debugApp.invoke(Map.of("input", "debug data"), debugRunConfig);
394
395
// Analyze execution history
396
Collection<StateSnapshot<MyState>> history = debugApp.getStateHistory(debugRunConfig);
397
398
System.out.println("Execution Analysis:");
399
for (StateSnapshot<MyState> snapshot : history) {
400
System.out.println("Step: " + snapshot.getNodeId());
401
System.out.println("State: " + snapshot.state().data());
402
System.out.println("Next: " + snapshot.getNextNodeId());
403
System.out.println("---");
404
}
```

### Time Travel and State Rollback

Navigate through execution history and resume from arbitrary points.

```java { .api }
// Get specific checkpoint from history
413
Collection<StateSnapshot<MyState>> history = app.getStateHistory(runConfig);
414
Optional<StateSnapshot<MyState>> targetCheckpoint = history.stream()
415
.filter(snapshot -> snapshot.getNodeId().equals("target_node"))
416
.findFirst();
417
418
if (targetCheckpoint.isPresent()) {
419
// Create config for resuming from specific checkpoint
420
RunnableConfig rollbackConfig = RunnableConfig.builder()
421
.threadId(runConfig.threadId().get())
422
.checkPointId(targetCheckpoint.get().getCheckpointId())
423
.build();
424
425
// Resume from that point with modified state
426
RunnableConfig modifiedConfig = app.updateState(
427
rollbackConfig,
428
Map.of("rollback_reason", "user_correction", "modified", true)
429
);
430
431
Optional<MyState> newResult = app.invoke(new GraphResume(), modifiedConfig);
432
}
```

### Multi-Session Management

Handle multiple concurrent execution threads.

```java { .api }
// Create multiple sessions
441
String[] sessionIds = {"session-1", "session-2", "session-3"};
442
443
Map<String, RunnableConfig> sessions = new HashMap<>();
444
for (String sessionId : sessionIds) {
445
sessions.put(sessionId, RunnableConfig.builder()
446
.threadId(sessionId)
447
.build());
448
}
449
450
// Execute sessions concurrently
451
List<CompletableFuture<Optional<MyState>>> futures = sessions.values()
452
.stream()
453
.map(config -> CompletableFuture.supplyAsync(() ->
454
app.invoke(Map.of("session", config.threadId().get()), config)
455
))
456
.toList();
457
458
// Wait for all sessions to complete
459
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
460
461
// Check results for each session
462
for (Map.Entry<String, RunnableConfig> entry : sessions.entrySet()) {
463
StateSnapshot<MyState> finalState = app.getState(entry.getValue());
464
System.out.println("Session " + entry.getKey() + " ended at: " + finalState.getNodeId());
465
}
```

### Checkpoint Validation and Integrity

Ensure checkpoint consistency and validate state.

```java { .api }
// Custom checkpoint saver with validation
474
class ValidatingCheckpointSaver implements BaseCheckpointSaver {
475
private final BaseCheckpointSaver delegate;
476
477
public ValidatingCheckpointSaver(BaseCheckpointSaver delegate) {
478
this.delegate = delegate;
479
}
480
481
@Override
482
public RunnableConfig put(RunnableConfig config, Checkpoint checkpoint) throws Exception {
483
// Validate checkpoint before saving
484
validateCheckpoint(checkpoint);
485
return delegate.put(config, checkpoint);
486
}
487
488
private void validateCheckpoint(Checkpoint checkpoint) {
489
if (checkpoint.getNodeId() == null) {
490
throw new IllegalArgumentException("Checkpoint must have node ID");
491
}
492
if (checkpoint.getState() == null || checkpoint.getState().isEmpty()) {
493
throw new IllegalArgumentException("Checkpoint must have state data");
494
}
495
// Additional validation logic...
496
}
497
498
// Delegate other methods...
499
@Override
500
public Collection<Checkpoint> list(RunnableConfig config) {
501
return delegate.list(config);
502
}
503
504
@Override
505
public Optional<Checkpoint> get(RunnableConfig config) {
506
return delegate.get(config);
507
}
508
509
@Override
510
public Tag release(RunnableConfig config) throws Exception {
511
return delegate.release(config);
512
}
513
}
```