CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-bsc-langgraph4j--langgraph4j-core

A library for building stateful, multi-agents applications with LLMs

Overview
Eval results
Files

actions.mddocs/

Actions and Node Logic

Define node behavior with synchronous and asynchronous actions. Support for configuration-aware actions, routing logic, and interruption handling.

Capabilities

Basic Node Actions

Core action interfaces for defining node behavior.

/**
 * Synchronous node action interface
 * @param <T> Agent state type
 */
@FunctionalInterface
interface NodeAction<T extends AgentState> {
    /**
     * Executes synchronous action on agent state
     * @param state Current agent state
     * @return Map of state updates to apply
     * @throws Exception if action execution fails
     */
    Map<String, Object> apply(T state) throws Exception;
}

/**
 * Asynchronous node action interface
 * @param <S> Agent state type
 */
@FunctionalInterface
interface AsyncNodeAction<S extends AgentState> extends Function<S, CompletableFuture<Map<String, Object>>> {
    /**
     * Executes asynchronous action on agent state
     * @param state Current agent state
     * @return CompletableFuture with state updates
     */
    CompletableFuture<Map<String, Object>> apply(S state);

    /**
     * Converts synchronous action to asynchronous
     * @param syncAction Synchronous node action
     * @return Asynchronous wrapper around sync action
     */
    static <S extends AgentState> AsyncNodeAction<S> node_async(NodeAction<S> syncAction);
}

Usage Examples:

// Simple synchronous action
NodeAction<MyState> syncAction = (state) -> {
    String input = state.<String>value("input").orElse("");
    String processed = input.toUpperCase();
    return Map.of("output", processed, "processed", true);
};

// Asynchronous action with external API call
AsyncNodeAction<MyState> asyncAction = (state) -> {
    String query = state.<String>value("query").orElse("");

    return CompletableFuture.supplyAsync(() -> {
        // Simulate API call
        try {
            Thread.sleep(1000);
            String result = "API response for: " + query;
            return Map.of("api_result", result, "timestamp", System.currentTimeMillis());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
};

// Convert sync to async
AsyncNodeAction<MyState> wrappedSync = AsyncNodeAction.node_async(syncAction);

// Add to graph
workflow.addNode("sync_processor", syncAction);
workflow.addNode("async_processor", asyncAction);
workflow.addNode("wrapped_processor", wrappedSync);

Configuration-Aware Actions

Actions that access runtime configuration for context-aware behavior.

/**
 * Asynchronous node action with configuration access
 * @param <S> Agent state type
 */
@FunctionalInterface
interface AsyncNodeActionWithConfig<S extends AgentState> {
    /**
     * Executes action with access to runtime configuration
     * @param state Current agent state
     * @param config Runtime configuration
     * @return CompletableFuture with state updates
     * @throws Exception if action execution fails
     */
    CompletableFuture<Map<String, Object>> apply(S state, RunnableConfig config) throws Exception;

    /**
     * Creates config-aware action from simple async action
     * @param action Simple async action
     * @return Config-aware action wrapper
     */
    static <S extends AgentState> AsyncNodeActionWithConfig<S> of(AsyncNodeAction<S> action);
}

Usage Examples:

// Configuration-aware action
AsyncNodeActionWithConfig<MyState> configAwareAction = (state, config) -> {
    String threadId = config.threadId().orElse("default");
    boolean isStudio = config.isRunningInStudio();

    Map<String, Object> updates = new HashMap<>();
    updates.put("thread_id", threadId);
    updates.put("is_studio", isStudio);

    // Access metadata
    Object customValue = config.metadata("custom_key").orElse("default");
    updates.put("custom_value", customValue);

    return CompletableFuture.completedFuture(updates);
};

// Use thread-specific processing
AsyncNodeActionWithConfig<MyState> threadSpecificAction = (state, config) -> {
    String threadId = config.threadId().orElse("anonymous");

    return CompletableFuture.supplyAsync(() -> {
        // Process based on thread context
        String result = "Processed by thread: " + threadId;
        return Map.of("thread_result", result);
    });
};

workflow.addNode("config_aware", configAwareAction);
workflow.addNode("thread_specific", threadSpecificAction);

Command Actions for Routing

Actions that return routing commands for conditional flow control.

/**
 * Synchronous command action for routing decisions
 * @param <S> Agent state type
 */
@FunctionalInterface
interface CommandAction<S extends AgentState> {
    /**
     * Executes action and returns routing command
     * @param state Current agent state
     * @return Command indicating next node and state updates
     * @throws Exception if action execution fails
     */
    Command apply(S state) throws Exception;
}

/**
 * Asynchronous command action for routing decisions
 * @param <S> Agent state type
 */
@FunctionalInterface
interface AsyncCommandAction<S extends AgentState> {
    /**
     * Executes action and returns routing command asynchronously
     * @param state Current agent state
     * @param config Runtime configuration
     * @return CompletableFuture with routing command
     * @throws Exception if action execution fails
     */
    CompletableFuture<Command> apply(S state, RunnableConfig config) throws Exception;

    /**
     * Creates async command action from edge action
     * @param edgeAction Edge action to wrap
     * @return Async command action
     */
    static <S extends AgentState> AsyncCommandAction<S> of(AsyncEdgeAction<S> edgeAction);
}

/**
 * Command containing routing decision and state updates
 */
class Command {
    /**
     * Get target node ID for routing
     * @return Node ID to route to
     */
    String gotoNode();

    /**
     * Get state updates to apply
     * @return Map of state updates
     */
    Map<String, Object> update();
}

Usage Examples:

// Simple routing based on state value
AsyncCommandAction<MyState> routingAction = (state, config) -> {
    int score = state.<Integer>value("score").orElse(0);

    String route;
    Map<String, Object> updates = new HashMap<>();

    if (score > 80) {
        route = "high_score";
        updates.put("grade", "A");
    } else if (score > 60) {
        route = "medium_score";
        updates.put("grade", "B");
    } else {
        route = "low_score";
        updates.put("grade", "C");
    }

    Command command = new Command(route, updates);
    return CompletableFuture.completedFuture(command);
};

// Complex routing with external validation
AsyncCommandAction<MyState> validationAction = (state, config) -> {
    String data = state.<String>value("user_input").orElse("");

    return CompletableFuture.supplyAsync(() -> {
        // Simulate validation logic
        boolean isValid = data.length() > 5 && data.matches("^[a-zA-Z0-9]+$");
        boolean needsReview = data.contains("sensitive");

        String route;
        Map<String, Object> updates = Map.of("validated_at", System.currentTimeMillis());

        if (!isValid) {
            route = "validation_failed";
        } else if (needsReview) {
            route = "needs_review";
        } else {
            route = "validation_passed";
        }

        return new Command(route, updates);
    });
};

// Add conditional nodes with routing
workflow.addNode("router", routingAction, Map.of(
    "high_score", "high_score_handler",
    "medium_score", "medium_score_handler",
    "low_score", "low_score_handler"
));

workflow.addNode("validator", validationAction, Map.of(
    "validation_passed", "process_data",
    "validation_failed", "show_error",
    "needs_review", "human_review"
));

Edge Actions

Actions specifically for edge condition evaluation.

/**
 * Synchronous edge action for routing decisions
 * @param <S> Agent state type
 */
@FunctionalInterface
interface EdgeAction<S extends AgentState> {
    /**
     * Evaluates edge condition and returns target node
     * @param state Current agent state
     * @return Target node identifier
     * @throws Exception if evaluation fails
     */
    String apply(S state) throws Exception;
}

/**
 * Asynchronous edge action for routing decisions
 * @param <S> Agent state type
 */
@FunctionalInterface
interface AsyncEdgeAction<S extends AgentState> {
    /**
     * Evaluates edge condition asynchronously
     * @param state Current agent state
     * @param config Runtime configuration
     * @return CompletableFuture with target node identifier
     * @throws Exception if evaluation fails
     */
    CompletableFuture<String> apply(S state, RunnableConfig config) throws Exception;
}

Usage Examples:

// Simple edge condition
AsyncEdgeAction<MyState> simpleEdge = (state, config) -> {
    boolean ready = state.<Boolean>value("ready").orElse(false);
    String target = ready ? "process" : "wait";
    return CompletableFuture.completedFuture(target);
};

// Complex edge with external check
AsyncEdgeAction<MyState> externalEdge = (state, config) -> {
    String userId = state.<String>value("user_id").orElse("");

    return CompletableFuture.supplyAsync(() -> {
        // Simulate external service check
        boolean hasPermission = userId.startsWith("admin_");
        return hasPermission ? "admin_flow" : "user_flow";
    });
};

// Use in conditional edges
workflow.addConditionalEdges("decision_point", simpleEdge, Map.of(
    "process", "data_processor",
    "wait", "wait_node"
));

workflow.addConditionalEdges("permission_check", externalEdge, Map.of(
    "admin_flow", "admin_dashboard",
    "user_flow", "user_dashboard"
));

Interruption Handling

Actions that can be interrupted for human-in-the-loop workflows.

/**
 * Action that can be interrupted during execution
 * @param <State> Agent state type
 */
interface InterruptableAction<State extends AgentState> {
    /**
     * Check if action should be interrupted
     * @param nodeId Current node identifier
     * @param state Current state
     * @return Optional interruption metadata if should interrupt
     */
    Optional<InterruptionMetadata<State>> interrupt(String nodeId, State state);
}

/**
 * Metadata for interruption events
 * @param <State> Agent state type
 */
class InterruptionMetadata<State extends AgentState> {
    /**
     * Creates interruption metadata builder
     * @param nodeId Node being interrupted
     * @param state Current state
     * @return Builder for interruption metadata
     */
    static <State extends AgentState> Builder<State> builder(String nodeId, State state);

    String getNodeId();
    State getState();
    Optional<String> getReason();
    Map<String, Object> getContext();
}

Usage Examples:

// Action with interruption logic
AsyncNodeActionWithConfig<MyState> interruptibleAction = new AsyncNodeActionWithConfig<MyState>() {
    @Override
    public CompletableFuture<Map<String, Object>> apply(MyState state, RunnableConfig config) throws Exception {
        String data = state.<String>value("data").orElse("");

        return CompletableFuture.supplyAsync(() -> {
            // Process data
            String result = processData(data);
            return Map.of("result", result, "processed", true);
        });
    }

    // Implement interruption check if needed
    public Optional<InterruptionMetadata<MyState>> interrupt(String nodeId, MyState state) {
        // Check if human review is needed
        boolean needsReview = state.<Boolean>value("needs_human_review").orElse(false);

        if (needsReview) {
            return Optional.of(
                InterruptionMetadata.<MyState>builder(nodeId, state)
                    .reason("Human review required")
                    .context(Map.of("review_type", "data_validation"))
                    .build()
            );
        }

        return Optional.empty();
    }

    private String processData(String data) {
        // Simulate data processing
        return "Processed: " + data;
    }
};

// Use with interruption configuration
CompileConfig config = CompileConfig.builder()
    .checkpointSaver(new MemorySaver())
    .interruptAfter("review_node")
    .build();

CompiledGraph<MyState> app = workflow.compile(config);

Parallel Node Execution

Special handling for nodes that execute multiple actions in parallel.

// Parallel execution is handled automatically when multiple edges
// target the same node from a single source

// Example: Create parallel branches
workflow.addNode("parallel_task_1", asyncAction1);
workflow.addNode("parallel_task_2", asyncAction2);
workflow.addNode("parallel_task_3", asyncAction3);
workflow.addNode("join_node", joinAction);

// These edges create parallel execution
workflow.addEdge("start", "parallel_task_1");
workflow.addEdge("start", "parallel_task_2");
workflow.addEdge("start", "parallel_task_3");

// Join results
workflow.addEdge("parallel_task_1", "join_node");
workflow.addEdge("parallel_task_2", "join_node");
workflow.addEdge("parallel_task_3", "join_node");

// Add custom executor for parallel node control
RunnableConfig configWithExecutor = RunnableConfig.builder()
    .addParallelNodeExecutor("start", Executors.newFixedThreadPool(3))
    .build();

Action Patterns

Stateful Actions

Actions that maintain internal state across invocations.

class CounterAction implements AsyncNodeAction<MyState> {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public CompletableFuture<Map<String, Object>> apply(MyState state) {
        int currentCount = counter.incrementAndGet();
        return CompletableFuture.completedFuture(
            Map.of("action_count", currentCount)
        );
    }
}

// Use stateful action
workflow.addNode("counter", new CounterAction());

Error Handling Actions

Actions with comprehensive error handling and recovery.

AsyncNodeAction<MyState> resilientAction = (state) -> {
    return CompletableFuture.supplyAsync(() -> {
        try {
            // Risky operation
            String result = performRiskyOperation(state);
            return Map.of("result", result, "success", true);

        } catch (Exception e) {
            // Log error and return error state
            return Map.of(
                "error", e.getMessage(),
                "success", false,
                "retry_count", state.<Integer>value("retry_count").orElse(0) + 1
            );
        }
    });
};

private String performRiskyOperation(MyState state) throws Exception {
    // Simulate operation that might fail
    if (Math.random() < 0.3) {
        throw new RuntimeException("Random failure");
    }
    return "Success";
}

Composite Actions

Actions that combine multiple operations.

AsyncNodeAction<MyState> compositeAction = (state) -> {
    // Chain multiple async operations
    return validateInput(state)
        .thenCompose(this::processData)
        .thenCompose(this::saveResults)
        .thenApply(result -> Map.of("final_result", result));
};

private CompletableFuture<String> validateInput(MyState state) {
    String input = state.<String>value("input").orElse("");
    return CompletableFuture.completedFuture(input);
}

private CompletableFuture<String> processData(String input) {
    return CompletableFuture.supplyAsync(() -> "Processed: " + input);
}

private CompletableFuture<String> saveResults(String processed) {
    return CompletableFuture.supplyAsync(() -> {
        // Simulate save operation
        return "Saved: " + processed;
    });
}

Install with Tessl CLI

npx tessl i tessl/maven-org-bsc-langgraph4j--langgraph4j-core

docs

actions.md

checkpoints.md

configuration.md

graph-construction.md

graph-execution.md

index.md

prebuilt.md

state-management.md

tile.json