A library for building stateful, multi-agent applications with LLMs.
Persistent state management with checkpointing for debugging, resuming execution, implementing human-in-the-loop workflows, and enabling time travel through execution history.
Core interface for persisting graph execution state and enabling resumption.
/**
* Interface for checkpoint persistence and management.
* <p>
* Declares the operations for listing, retrieving, saving and releasing the
* checkpoints of the thread identified by a {@code RunnableConfig}.
*/
interface BaseCheckpointSaver {
/**
* Default thread identifier for single-threaded execution
*/
String THREAD_ID_DEFAULT = "$default";
/**
* Lists all checkpoints for a given thread
* @param config Runtime configuration containing thread ID
* @return Collection of checkpoints ordered by recency
*/
Collection<Checkpoint> list(RunnableConfig config);
/**
* Retrieves specific checkpoint for thread
* @param config Runtime configuration with thread/checkpoint ID
* @return Optional containing checkpoint if found
*/
Optional<Checkpoint> get(RunnableConfig config);
/**
* Saves checkpoint and returns updated configuration
* @param config Runtime configuration
* @param checkpoint Checkpoint to save
* @return Updated configuration with new checkpoint ID
* @throws Exception if save operation fails
*/
RunnableConfig put(RunnableConfig config, Checkpoint checkpoint) throws Exception;
/**
* Releases thread resources and returns final state
* @param config Runtime configuration for thread
* @return Tag containing thread ID and final checkpoints
* @throws Exception if release operation fails
*/
Tag release(RunnableConfig config) throws Exception;
/**
* Record containing thread information and checkpoints;
* this is the value returned by {@link #release(RunnableConfig)}.
* @param threadId Identifier of the thread
* @param checkpoints Checkpoints associated with that thread
*/
record Tag(String threadId, Collection<Checkpoint> checkpoints) {}
}

Usage Examples:
// Register the checkpoint saver when compiling the graph
BaseCheckpointSaver saver = new MemorySaver();
CompileConfig compileConfig = CompileConfig.builder()
    .checkpointSaver(saver)
    .build();
CompiledGraph<MyState> app = workflow.compile(compileConfig);

// Run the graph under a named thread so its checkpoints can be looked up afterwards
RunnableConfig runConfig = RunnableConfig.builder()
    .threadId("my-session")
    .build();
Optional<MyState> outcome = app.invoke(Map.of("input", "data"), runConfig);

// Inspect the execution history recorded for that thread
Collection<Checkpoint> recorded = saver.list(runConfig);
System.out.println("Execution had " + recorded.size() + " checkpoints");

// Look up a checkpoint for the thread and report its node when one exists
Optional<Checkpoint> found = saver.get(runConfig);
found.ifPresent(cp -> System.out.println("Current node: " + cp.getNodeId()));

Immutable snapshot of graph execution state at a specific point.
/**
* Immutable checkpoint containing execution state.
* <p>
* Instances are obtained through {@link #builder()} or {@link #copyOf(Checkpoint)};
* {@link #updateState(Map, Map)} returns a new checkpoint rather than changing this one.
*/
class Checkpoint {
/**
* Creates checkpoint builder
* @return New checkpoint builder instance
*/
static Builder builder();
/**
* Creates copy of existing checkpoint
* @param checkpoint Checkpoint to copy
* @return New checkpoint instance
*/
static Checkpoint copyOf(Checkpoint checkpoint);
/**
* Get unique checkpoint identifier
* @return Checkpoint ID
*/
String getId();
/**
* Get node ID where checkpoint was created
* @return Node identifier
*/
String getNodeId();
/**
* Get next node to execute after this checkpoint
* @return Next node identifier
*/
String getNextNodeId();
/**
* Get state data at checkpoint
* @return State as key-value map
*/
Map<String, Object> getState();
/**
* Creates new checkpoint with updated state
* @param values State values to update
* @param channels Channel definitions for merge logic
* @return New checkpoint with merged state
*/
Checkpoint updateState(Map<String, Object> values, Map<String, Channel<?>> channels);
}

Usage Examples:
// Create checkpoint manually: set the current node, the next node and the state map
Checkpoint checkpoint = Checkpoint.builder()
.nodeId("current_node")
.nextNodeId("next_node")
.state(Map.of("data", "value", "step", 1))
.build();
// Copy the checkpoint, then apply a state update to the copy.
// No channel definitions are supplied here (empty map).
Checkpoint updated = Checkpoint.copyOf(checkpoint)
.updateState(Map.of("step", 2), Map.of());
// Access checkpoint data (read from the original checkpoint, not from `updated`)
String nodeId = checkpoint.getNodeId();
String nextNode = checkpoint.getNextNodeId();
Map<String, Object> stateData = checkpoint.getState();
System.out.println("At node: " + nodeId + ", going to: " + nextNode);

In-memory checkpoint storage for development and testing.
/**
* Simple in-memory checkpoint storage.
* <p>
* Declares all four {@code BaseCheckpointSaver} operations; this document
* describes it as intended for development and testing.
*/
class MemorySaver implements BaseCheckpointSaver {
/**
* Creates new memory-based checkpoint saver
*/
MemorySaver();
// Operations declared by BaseCheckpointSaver
Collection<Checkpoint> list(RunnableConfig config);
Optional<Checkpoint> get(RunnableConfig config);
RunnableConfig put(RunnableConfig config, Checkpoint checkpoint) throws Exception;
Tag release(RunnableConfig config) throws Exception;
}
/**
* Versioned memory checkpoint saver with version tracking.
* <p>
* Extends {@code MemorySaver} and additionally implements {@code HasVersions}.
*/
class VersionedMemorySaver extends MemorySaver implements HasVersions {
/**
* Creates versioned memory saver
*/
VersionedMemorySaver();
/**
* Get version information for checkpoint
* @param config Runtime configuration
* @return Optional containing version info
*/
Optional<String> getVersion(RunnableConfig config);
}

Usage Examples:
// Basic memory saver
MemorySaver memorySaver = new MemorySaver();
// Versioned memory saver (constructed for illustration; not used further in this snippet)
VersionedMemorySaver versionedSaver = new VersionedMemorySaver();
// Use in graph compilation
CompileConfig config = CompileConfig.builder()
.checkpointSaver(memorySaver)
.build();
CompiledGraph<MyState> app = workflow.compile(config);
// Execute with automatic checkpointing
RunnableConfig runConfig = RunnableConfig.builder()
.threadId("memory-session")
.build();
// Stream execution and see checkpoints being created.
// NOTE(review): the value returned by forEachAsync is discarded here; presumably it
// signals completion and should be awaited if the caller needs that — confirm.
app.stream(Map.of("input", "test"), runConfig)
.forEachAsync(output -> {
System.out.println("Node: " + output.node());
// Each output represents a checkpoint
return CompletableFuture.completedFuture(null);
});

Persistent file-based checkpoint storage for production use.
/**
* File system-based checkpoint persistence.
* <p>
* Declares all four {@code BaseCheckpointSaver} operations and is constructed
* with the directory in which checkpoints are stored.
*/
class FileSystemSaver implements BaseCheckpointSaver {
/**
* Creates file system saver with specified directory
* @param baseDirectory Directory for checkpoint storage
*/
FileSystemSaver(Path baseDirectory);
/**
* Creates file system saver with state serializer
* @param baseDirectory Directory for checkpoint storage
* @param stateSerializer Serializer for state objects
*/
FileSystemSaver(Path baseDirectory, StateSerializer<?> stateSerializer);
// Operations declared by BaseCheckpointSaver
Collection<Checkpoint> list(RunnableConfig config);
Optional<Checkpoint> get(RunnableConfig config);
RunnableConfig put(RunnableConfig config, Checkpoint checkpoint) throws Exception;
Tag release(RunnableConfig config) throws Exception;
}

Usage Examples:
import java.nio.file.Path;
import java.nio.file.Paths;

// File system saver with default serialization
Path checkpointDir = Paths.get("/app/checkpoints");
FileSystemSaver fileSaver = new FileSystemSaver(checkpointDir);

// File system saver with custom serialization
StateSerializer<MyState> serializer = new JacksonStateSerializer<>(MyState::new);
FileSystemSaver customFileSaver = new FileSystemSaver(checkpointDir, serializer);

// Use for persistent checkpointing
CompileConfig config = CompileConfig.builder()
    .checkpointSaver(fileSaver)
    .build();
CompiledGraph<MyState> app = workflow.compile(config);

// Execution will persist to filesystem
RunnableConfig runConfig = RunnableConfig.builder()
    .threadId("persistent-session")
    .build();
Optional<MyState> result = app.invoke(Map.of("input", "persistent data"), runConfig);

// Later, resume from filesystem checkpoints (same thread ID, so the same runConfig is reused)
Optional<MyState> resumed = app.invoke(new GraphResume(), runConfig);

Operations that leverage checkpoints for advanced execution control.
// Get execution state and history
// (`app` and `runConfig` are the compiled graph and run configuration from the earlier examples)
StateSnapshot<MyState> currentState = app.getState(runConfig);
Collection<StateSnapshot<MyState>> history = app.getStateHistory(runConfig);
// Update state at checkpoint; the call returns a RunnableConfig to use for subsequent calls
RunnableConfig updated = app.updateState(
runConfig,
Map.of("user_feedback", "approved", "timestamp", System.currentTimeMillis())
);
// Force execution to specific node: the three-argument form additionally takes a node ID
RunnableConfig withNextNode = app.updateState(
runConfig,
Map.of("override", true),
"specific_node_id"
);

Usage Examples:
// Human-in-the-loop workflow
RunnableConfig humanLoopConfig = RunnableConfig.builder()
    .threadId("human-review-session")
    .build();

// Start execution
AsyncGenerator<NodeOutput<MyState>> stream = app.stream(
    Map.of("document", "content to review"),
    humanLoopConfig
);

// Process until human review needed.
// Note: toList() collects the emitted outputs before the loop body runs.
for (NodeOutput<MyState> output : stream.stream().toList()) {
    // Pattern matching tests and binds in one step, avoiding the raw-type
    // instanceof check followed by a separate explicit cast.
    if (output instanceof InterruptionMetadata<?> interruption) {
        System.out.println("Human review needed at: " + interruption.getNodeId());

        // Pause for human input
        String humanFeedback = getHumanFeedback(); // Your UI logic

        // Update state with human feedback
        RunnableConfig withFeedback = app.updateState(
            humanLoopConfig,
            Map.of("human_feedback", humanFeedback, "reviewed", true)
        );

        // Resume execution using the configuration returned by updateState
        Optional<MyState> finalResult = app.invoke(new GraphResume(), withFeedback);
        break;
    }
}

Manage execution threads and cleanup resources.
// Configure thread release
CompileConfig configWithRelease = CompileConfig.builder()
.checkpointSaver(new MemorySaver())
.releaseThread(true)
.build();
CompiledGraph<MyState> app = workflow.compile(configWithRelease);
// Execute with automatic thread release
Optional<MyState> result = app.invoke(Map.of("input", "data"), runConfig);
// Manual thread release
// NOTE(review): `saver` is not declared in this snippet and is not the MemorySaver created
// inline above; presumably it is the saver from an earlier example — confirm which is intended.
BaseCheckpointSaver.Tag released = saver.release(runConfig);
System.out.println("Released thread: " + released.threadId());
System.out.println("Final checkpoints: " + released.checkpoints().size());

Use checkpoints for detailed execution analysis.
// Enable checkpointing for debugging
CompileConfig debugConfig = CompileConfig.builder()
.checkpointSaver(new MemorySaver())
.build();
CompiledGraph<MyState> debugApp = workflow.compile(debugConfig);
// Execute with full history, under a dedicated thread ID
RunnableConfig debugRunConfig = RunnableConfig.builder()
.threadId("debug-session")
.build();
Optional<MyState> result = debugApp.invoke(Map.of("input", "debug data"), debugRunConfig);
// Analyze execution history: print node, state data and next node for every snapshot
Collection<StateSnapshot<MyState>> history = debugApp.getStateHistory(debugRunConfig);
System.out.println("Execution Analysis:");
for (StateSnapshot<MyState> snapshot : history) {
System.out.println("Step: " + snapshot.getNodeId());
System.out.println("State: " + snapshot.state().data());
System.out.println("Next: " + snapshot.getNextNodeId());
System.out.println("---");
}

Navigate through execution history and resume from arbitrary points.
// Get specific checkpoint from history:
// take the first snapshot, in the history's iteration order, whose node ID is "target_node"
Collection<StateSnapshot<MyState>> history = app.getStateHistory(runConfig);
Optional<StateSnapshot<MyState>> targetCheckpoint = history.stream()
.filter(snapshot -> snapshot.getNodeId().equals("target_node"))
.findFirst();
if (targetCheckpoint.isPresent()) {
// Create config for resuming from specific checkpoint.
// threadId() is unwrapped with get(), so this assumes runConfig carries a thread ID.
RunnableConfig rollbackConfig = RunnableConfig.builder()
.threadId(runConfig.threadId().get())
.checkPointId(targetCheckpoint.get().getCheckpointId())
.build();
// Resume from that point with modified state
RunnableConfig modifiedConfig = app.updateState(
rollbackConfig,
Map.of("rollback_reason", "user_correction", "modified", true)
);
Optional<MyState> newResult = app.invoke(new GraphResume(), modifiedConfig);
}

Handle multiple concurrent execution threads.
// Create multiple sessions: one RunnableConfig per session ID, keyed by that ID
String[] sessionIds = {"session-1", "session-2", "session-3"};
Map<String, RunnableConfig> sessions = new HashMap<>();
for (String sessionId : sessionIds) {
sessions.put(sessionId, RunnableConfig.builder()
.threadId(sessionId)
.build());
}
// Execute sessions concurrently.
// supplyAsync is called without an executor, so tasks run on the common ForkJoinPool.
List<CompletableFuture<Optional<MyState>>> futures = sessions.values()
.stream()
.map(config -> CompletableFuture.supplyAsync(() ->
app.invoke(Map.of("session", config.threadId().get()), config)
))
.toList();
// Wait for all sessions to complete; get() blocks and throws if any task failed
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
// Check results for each session
for (Map.Entry<String, RunnableConfig> entry : sessions.entrySet()) {
StateSnapshot<MyState> finalState = app.getState(entry.getValue());
System.out.println("Session " + entry.getKey() + " ended at: " + finalState.getNodeId());
}

Ensure checkpoint consistency and validate state.
// Custom checkpoint saver with validation
/**
 * Checkpoint saver that validates every checkpoint before handing it to a
 * wrapped saver. All other operations are forwarded to the wrapped saver unchanged.
 */
class ValidatingCheckpointSaver implements BaseCheckpointSaver {
    private final BaseCheckpointSaver delegate;

    /**
     * Wraps an existing saver.
     * @param delegate Saver that performs the actual persistence
     * @throws NullPointerException if {@code delegate} is null
     */
    public ValidatingCheckpointSaver(BaseCheckpointSaver delegate) {
        // Fail at construction time rather than on the first forwarded call.
        this.delegate = java.util.Objects.requireNonNull(delegate, "delegate");
    }

    /**
     * Validates the checkpoint, then saves it through the wrapped saver.
     * @param config Runtime configuration
     * @param checkpoint Checkpoint to validate and save
     * @return Configuration returned by the wrapped saver
     * @throws IllegalArgumentException if the checkpoint fails validation
     * @throws Exception if the wrapped saver fails to save
     */
    @Override
    public RunnableConfig put(RunnableConfig config, Checkpoint checkpoint) throws Exception {
        // Validate checkpoint before saving
        validateCheckpoint(checkpoint);
        return delegate.put(config, checkpoint);
    }

    /**
     * Rejects checkpoints that are null, have no node ID, or carry no state data.
     */
    private void validateCheckpoint(Checkpoint checkpoint) {
        if (checkpoint == null) {
            throw new IllegalArgumentException("Checkpoint must not be null");
        }
        if (checkpoint.getNodeId() == null) {
            throw new IllegalArgumentException("Checkpoint must have node ID");
        }
        if (checkpoint.getState() == null || checkpoint.getState().isEmpty()) {
            throw new IllegalArgumentException("Checkpoint must have state data");
        }
        // Additional validation logic...
    }

    // Delegate other methods...
    @Override
    public Collection<Checkpoint> list(RunnableConfig config) {
        return delegate.list(config);
    }

    @Override
    public Optional<Checkpoint> get(RunnableConfig config) {
        return delegate.get(config);
    }

    @Override
    public Tag release(RunnableConfig config) throws Exception {
        return delegate.release(config);
    }
}

Install with Tessl CLI
npx tessl i tessl/maven-org-bsc-langgraph4j--langgraph4j-core