CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-bsc-langgraph4j--langgraph4j-core

A library for building stateful, multi-agent applications with LLMs

Overview
Eval results
Files

docs/checkpoints.md

Checkpoints and Persistence

Persistent state management with checkpointing for debugging, resuming execution, implementing human-in-the-loop workflows, and enabling time travel through execution history.

Capabilities

BaseCheckpointSaver Interface

Core interface for persisting graph execution state and enabling resumption.

/**
 * Interface for checkpoint persistence and management.
 * Every operation is scoped by the RunnableConfig passed to it, which carries
 * the thread ID and, for lookups, optionally a checkpoint ID.
 */
interface BaseCheckpointSaver {
    /**
     * Default thread identifier for single-threaded execution.
     * NOTE(review): presumably applied when a config carries no thread ID — confirm.
     */
    String THREAD_ID_DEFAULT = "$default";

    /**
     * Lists all checkpoints for a given thread.
     * @param config Runtime configuration containing thread ID
     * @return Collection of checkpoints ordered by recency
     *         (NOTE(review): presumably newest first — confirm against the implementation)
     */
    Collection<Checkpoint> list(RunnableConfig config);

    /**
     * Retrieves a specific checkpoint for a thread.
     * @param config Runtime configuration with thread/checkpoint ID
     * @return Optional containing the checkpoint if found, otherwise empty
     */
    Optional<Checkpoint> get(RunnableConfig config);

    /**
     * Saves a checkpoint and returns the updated configuration.
     * @param config Runtime configuration
     * @param checkpoint Checkpoint to save
     * @return Updated configuration with new checkpoint ID
     * @throws Exception if save operation fails
     */
    RunnableConfig put(RunnableConfig config, Checkpoint checkpoint) throws Exception;

    /**
     * Releases thread resources and returns the final state.
     * @param config Runtime configuration for thread
     * @return Tag containing thread ID and final checkpoints
     * @throws Exception if release operation fails
     */
    Tag release(RunnableConfig config) throws Exception;

    /**
     * Record containing thread information and checkpoints; returned by release.
     * @param threadId Identifier of the released thread
     * @param checkpoints Final checkpoints of that thread
     */
    record Tag(String threadId, Collection<Checkpoint> checkpoints) {}
}

Usage Examples:

// Use checkpoint saver in compilation
BaseCheckpointSaver saver = new MemorySaver();
CompileConfig config = CompileConfig.builder()
    .checkpointSaver(saver)
    .build();

CompiledGraph<MyState> app = workflow.compile(config);

// Execute with checkpointing; the thread ID identifies the session being checkpointed
RunnableConfig runConfig = RunnableConfig.builder()
    .threadId("my-session")
    .build();

Optional<MyState> result = app.invoke(Map.of("input", "data"), runConfig);

// List execution history
Collection<Checkpoint> history = saver.list(runConfig);
System.out.println("Execution had " + history.size() + " checkpoints");

// Get specific checkpoint; ifPresent replaces the isPresent()/get() pair,
// so an empty Optional can never be dereferenced
Optional<Checkpoint> checkpoint = saver.get(runConfig);
checkpoint.ifPresent(found ->
    System.out.println("Current node: " + found.getNodeId()));

Checkpoint Class

Immutable snapshot of graph execution state at a specific point.

/**
 * Immutable checkpoint containing execution state.
 * Instances are obtained through builder() or copyOf(Checkpoint); updateState
 * returns a new checkpoint rather than modifying the receiver.
 */
class Checkpoint {
    /**
     * Creates a checkpoint builder.
     * @return New checkpoint builder instance
     */
    static Builder builder();

    /**
     * Creates a copy of an existing checkpoint.
     * @param checkpoint Checkpoint to copy
     * @return New checkpoint instance
     */
    static Checkpoint copyOf(Checkpoint checkpoint);

    /**
     * Gets the unique checkpoint identifier.
     * @return Checkpoint ID
     */
    String getId();

    /**
     * Gets the ID of the node where the checkpoint was created.
     * @return Node identifier
     */
    String getNodeId();

    /**
     * Gets the next node to execute after this checkpoint.
     * @return Next node identifier
     */
    String getNextNodeId();

    /**
     * Gets the state data at this checkpoint.
     * @return State as key-value map
     */
    Map<String, Object> getState();

    /**
     * Creates a new checkpoint with updated state.
     * @param values State values to update
     * @param channels Channel definitions for merge logic
     * @return New checkpoint with merged state
     */
    Checkpoint updateState(Map<String, Object> values, Map<String, Channel<?>> channels);
}

Usage Examples:

// Build a checkpoint by hand
Checkpoint checkpoint = Checkpoint.builder()
    .nodeId("current_node")
    .nextNodeId("next_node")
    .state(Map.of("data", "value", "step", 1))
    .build();

// Copy first, then derive a checkpoint with the updated state from the copy
Checkpoint copy = Checkpoint.copyOf(checkpoint);
Checkpoint updated = copy.updateState(Map.of("step", 2), Map.of());

// Read the checkpoint's fields
Map<String, Object> stateData = checkpoint.getState();
String nextNode = checkpoint.getNextNodeId();
String nodeId = checkpoint.getNodeId();

System.out.println("At node: " + nodeId + ", going to: " + nextNode);

Memory-Based Checkpoint Savers

In-memory checkpoint storage for development and testing.

/**
 * Simple in-memory checkpoint storage, intended for development and testing.
 */
class MemorySaver implements BaseCheckpointSaver {
    /**
     * Creates a new memory-based checkpoint saver.
     */
    MemorySaver();

    // BaseCheckpointSaver operations; see that interface for each method's contract.
    Collection<Checkpoint> list(RunnableConfig config);
    Optional<Checkpoint> get(RunnableConfig config);
    RunnableConfig put(RunnableConfig config, Checkpoint checkpoint) throws Exception;
    Tag release(RunnableConfig config) throws Exception;
}

/**
 * Versioned memory checkpoint saver with version tracking.
 * Extends MemorySaver and additionally implements HasVersions.
 */
class VersionedMemorySaver extends MemorySaver implements HasVersions {
    /**
     * Creates a versioned memory saver.
     */
    VersionedMemorySaver();

    /**
     * Gets version information for a checkpoint.
     * @param config Runtime configuration
     * @return Optional containing version info
     *         (NOTE(review): presumably empty when no version is recorded — confirm)
     */
    Optional<String> getVersion(RunnableConfig config);
}

Usage Examples:

// The two in-memory saver variants: basic and versioned
MemorySaver memorySaver = new MemorySaver();
VersionedMemorySaver versionedSaver = new VersionedMemorySaver();

// Compile the graph with the basic saver
CompileConfig config = CompileConfig.builder()
    .checkpointSaver(memorySaver)
    .build();
CompiledGraph<MyState> app = workflow.compile(config);

// Run under a dedicated thread ID so checkpoints are recorded automatically
RunnableConfig runConfig = RunnableConfig.builder()
    .threadId("memory-session")
    .build();

// Stream the execution and observe checkpoints as they are created
AsyncGenerator<NodeOutput<MyState>> outputs = app.stream(Map.of("input", "test"), runConfig);
outputs.forEachAsync(output -> {
    // Each output represents a checkpoint
    System.out.println("Node: " + output.node());
    return CompletableFuture.completedFuture(null);
});

File System Checkpoint Saver

Persistent file-based checkpoint storage for production use.

/**
 * File system-based checkpoint persistence.
 */
class FileSystemSaver implements BaseCheckpointSaver {
    /**
     * Creates a file system saver with the specified directory and default serialization.
     * @param baseDirectory Directory for checkpoint storage
     */
    FileSystemSaver(Path baseDirectory);

    /**
     * Creates a file system saver with a custom state serializer.
     * @param baseDirectory Directory for checkpoint storage
     * @param stateSerializer Serializer for state objects
     */
    FileSystemSaver(Path baseDirectory, StateSerializer<?> stateSerializer);

    // BaseCheckpointSaver operations; see that interface for each method's contract.
    Collection<Checkpoint> list(RunnableConfig config);
    Optional<Checkpoint> get(RunnableConfig config);
    RunnableConfig put(RunnableConfig config, Checkpoint checkpoint) throws Exception;
    Tag release(RunnableConfig config) throws Exception;
}

Usage Examples:

import java.nio.file.Path;
import java.nio.file.Paths;

// File system saver with default serialization
Path checkpointDir = Paths.get("/app/checkpoints");
FileSystemSaver fileSaver = new FileSystemSaver(checkpointDir);

// File system saver with custom serialization
StateSerializer<MyState> serializer = new JacksonStateSerializer<>(MyState::new);
FileSystemSaver customFileSaver = new FileSystemSaver(checkpointDir, serializer);

// Use for persistent checkpointing
CompileConfig config = CompileConfig.builder()
    .checkpointSaver(fileSaver)
    .build();

CompiledGraph<MyState> app = workflow.compile(config);

// Execution will persist to filesystem
RunnableConfig runConfig = RunnableConfig.builder()
    .threadId("persistent-session")
    .build();

Optional<MyState> result = app.invoke(Map.of("input", "persistent data"), runConfig);

// Later, resume from filesystem checkpoints (same thread ID, so the same stored history)
Optional<MyState> resumed = app.invoke(new GraphResume(), runConfig);

Checkpoint-Based Graph Operations

Operations that leverage checkpoints for advanced execution control.

// Inspect the current snapshot and the full snapshot history of the thread
StateSnapshot<MyState> currentState = app.getState(runConfig);
Collection<StateSnapshot<MyState>> history = app.getStateHistory(runConfig);

// Update state at checkpoint
Map<String, Object> feedbackValues =
    Map.of("user_feedback", "approved", "timestamp", System.currentTimeMillis());
RunnableConfig updated = app.updateState(runConfig, feedbackValues);

// Force execution to specific node: the third argument names the node to run next
Map<String, Object> overrideValues = Map.of("override", true);
RunnableConfig withNextNode = app.updateState(runConfig, overrideValues, "specific_node_id");

Usage Examples:

// Human-in-the-loop workflow
RunnableConfig humanLoopConfig = RunnableConfig.builder()
    .threadId("human-review-session")
    .build();

// Kick off the run for the document under review
Map<String, Object> reviewInput = Map.of("document", "content to review");
AsyncGenerator<NodeOutput<MyState>> stream = app.stream(reviewInput, humanLoopConfig);

// Walk the outputs until one signals that human review is needed
for (NodeOutput<MyState> output : stream.stream().toList()) {
    if (!(output instanceof InterruptionMetadata)) {
        continue; // ordinary node output; keep going
    }

    InterruptionMetadata<MyState> interruption = (InterruptionMetadata<MyState>) output;
    System.out.println("Human review needed at: " + interruption.getNodeId());

    // Pause for human input
    String feedback = getHumanFeedback(); // Your UI logic

    // Record the reviewer's decision in the graph state
    Map<String, Object> reviewValues = Map.of("human_feedback", feedback, "reviewed", true);
    RunnableConfig resumeConfig = app.updateState(humanLoopConfig, reviewValues);

    // Resume execution from the updated state, then stop scanning outputs
    Optional<MyState> finalResult = app.invoke(new GraphResume(), resumeConfig);
    break;
}

Thread Management and Cleanup

Manage execution threads and cleanup resources.

// Keep a reference to the saver so the manual release below acts on the
// same instance the graph was compiled with
BaseCheckpointSaver saver = new MemorySaver();

// Configure thread release
CompileConfig configWithRelease = CompileConfig.builder()
    .checkpointSaver(saver)
    .releaseThread(true)
    .build();

CompiledGraph<MyState> app = workflow.compile(configWithRelease);

// Execute with automatic thread release
Optional<MyState> result = app.invoke(Map.of("input", "data"), runConfig);

// Manual thread release
BaseCheckpointSaver.Tag released = saver.release(runConfig);
System.out.println("Released thread: " + released.threadId());
System.out.println("Final checkpoints: " + released.checkpoints().size());

Checkpoint Patterns

Debugging and Inspection

Use checkpoints for detailed execution analysis.

// Compile with a checkpoint saver so the run leaves a history to inspect
CompileConfig debugConfig = CompileConfig.builder()
    .checkpointSaver(new MemorySaver())
    .build();
CompiledGraph<MyState> debugApp = workflow.compile(debugConfig);

// Run once under a dedicated thread ID
RunnableConfig debugRunConfig = RunnableConfig.builder()
    .threadId("debug-session")
    .build();
Optional<MyState> result = debugApp.invoke(Map.of("input", "debug data"), debugRunConfig);

// Fetch the recorded snapshots for that thread
Collection<StateSnapshot<MyState>> history = debugApp.getStateHistory(debugRunConfig);

// Print node, state and next node for every snapshot
System.out.println("Execution Analysis:");
for (StateSnapshot<MyState> step : history) {
    System.out.println("Step: " + step.getNodeId());
    System.out.println("State: " + step.state().data());
    System.out.println("Next: " + step.getNextNodeId());
    System.out.println("---");
}

Time Travel and State Rollback

Navigate through execution history and resume from arbitrary points.

// Locate the snapshot recorded at the node to roll back to
Collection<StateSnapshot<MyState>> history = app.getStateHistory(runConfig);
Optional<StateSnapshot<MyState>> targetCheckpoint = history.stream()
    .filter(snapshot -> snapshot.getNodeId().equals("target_node"))
    .findFirst();

if (targetCheckpoint.isPresent()) {
    StateSnapshot<MyState> target = targetCheckpoint.get();

    // Address that exact checkpoint: same thread, explicit checkpoint ID
    RunnableConfig rollbackConfig = RunnableConfig.builder()
        .threadId(runConfig.threadId().orElseThrow())
        .checkPointId(target.getCheckpointId())
        .build();

    // Apply the correction to the state at that checkpoint
    Map<String, Object> correction =
        Map.of("rollback_reason", "user_correction", "modified", true);
    RunnableConfig modifiedConfig = app.updateState(rollbackConfig, correction);

    // Resume execution from the modified checkpoint
    Optional<MyState> newResult = app.invoke(new GraphResume(), modifiedConfig);
}

Multi-Session Management

Handle multiple concurrent execution threads.

// Create multiple sessions, one RunnableConfig per session ID
String[] sessionIds = {"session-1", "session-2", "session-3"};

Map<String, RunnableConfig> sessions = new HashMap<>();
for (String sessionId : sessionIds) {
    sessions.put(sessionId, RunnableConfig.builder()
        .threadId(sessionId)
        .build());
}

// Execute sessions concurrently. The map key is the session's thread ID, so it is
// read from the entry instead of unwrapping config.threadId() with an unchecked get().
List<CompletableFuture<Optional<MyState>>> futures = sessions.entrySet()
    .stream()
    .map(session -> CompletableFuture.supplyAsync(() ->
        app.invoke(Map.of("session", session.getKey()), session.getValue())
    ))
    .toList();

// Wait for all sessions to complete
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();

// Check results for each session
for (Map.Entry<String, RunnableConfig> entry : sessions.entrySet()) {
    StateSnapshot<MyState> finalState = app.getState(entry.getValue());
    System.out.println("Session " + entry.getKey() + " ended at: " + finalState.getNodeId());
}

Checkpoint Validation and Integrity

Ensure checkpoint consistency and validate state.

/**
 * Custom checkpoint saver with validation: checks every checkpoint before it is
 * saved and forwards all operations to a wrapped saver.
 */
class ValidatingCheckpointSaver implements BaseCheckpointSaver {
    private final BaseCheckpointSaver delegate;

    /**
     * Wraps an existing saver with validation.
     * @param delegate Saver that performs the actual persistence; must not be null
     * @throws NullPointerException if delegate is null
     */
    public ValidatingCheckpointSaver(BaseCheckpointSaver delegate) {
        // Fail at construction time rather than on the first delegated call.
        this.delegate = java.util.Objects.requireNonNull(delegate, "delegate");
    }

    /**
     * Validates the checkpoint, then saves it through the delegate.
     * @param config Runtime configuration
     * @param checkpoint Checkpoint to save
     * @return Configuration returned by the delegate's put
     * @throws IllegalArgumentException if the checkpoint is null, has no node ID,
     *         or has null or empty state
     * @throws Exception if the delegate's save operation fails
     */
    @Override
    public RunnableConfig put(RunnableConfig config, Checkpoint checkpoint) throws Exception {
        // Validate checkpoint before saving
        validateCheckpoint(checkpoint);
        return delegate.put(config, checkpoint);
    }

    /**
     * Rejects checkpoints that are null, lack a node ID, or carry no state data.
     * @param checkpoint Checkpoint to validate
     * @throws IllegalArgumentException if any check fails
     */
    private static void validateCheckpoint(Checkpoint checkpoint) {
        // Report a null checkpoint as a validation failure instead of an NPE below.
        if (checkpoint == null) {
            throw new IllegalArgumentException("Checkpoint must not be null");
        }
        if (checkpoint.getNodeId() == null) {
            throw new IllegalArgumentException("Checkpoint must have node ID");
        }
        if (checkpoint.getState() == null || checkpoint.getState().isEmpty()) {
            throw new IllegalArgumentException("Checkpoint must have state data");
        }
        // Additional validation logic...
    }

    // The remaining operations are forwarded to the delegate unchanged.

    /** Lists checkpoints by forwarding to the delegate. */
    @Override
    public Collection<Checkpoint> list(RunnableConfig config) {
        return delegate.list(config);
    }

    /** Retrieves a checkpoint by forwarding to the delegate. */
    @Override
    public Optional<Checkpoint> get(RunnableConfig config) {
        return delegate.get(config);
    }

    /**
     * Releases the thread by forwarding to the delegate.
     * @throws Exception if the delegate's release operation fails
     */
    @Override
    public Tag release(RunnableConfig config) throws Exception {
        return delegate.release(config);
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-bsc-langgraph4j--langgraph4j-core

docs

actions.md

checkpoints.md

configuration.md

graph-construction.md

graph-execution.md

index.md

prebuilt.md

state-management.md

tile.json