A library for building stateful, multi-agent applications with LLMs.
Define node behavior with synchronous and asynchronous actions. Support for configuration-aware actions, routing logic, and interruption handling.
Core action interfaces for defining node behavior.
/**
* Synchronous node action interface.
* Declares a single abstract method that receives the current agent state and
* returns the state updates to apply, so it can be written as a lambda.
* @param <T> Agent state type (bounded by AgentState)
*/
@FunctionalInterface
interface NodeAction<T extends AgentState> {
/**
* Executes synchronous action on agent state
* @param state Current agent state
* @return Map of state updates to apply (String keys mapped to arbitrary values)
* @throws Exception if action execution fails; the checked exception is part of
* the signature, so implementations do not need to wrap failures
*/
Map<String, Object> apply(T state) throws Exception;
}
/**
* Asynchronous node action interface
* @param <S> Agent state type
*/
@FunctionalInterface
interface AsyncNodeAction<S extends AgentState> extends Function<S, CompletableFuture<Map<String, Object>>> {
/**
* Executes asynchronous action on agent state
* @param state Current agent state
* @return CompletableFuture with state updates
*/
CompletableFuture<Map<String, Object>> apply(S state);
/**
* Converts synchronous action to asynchronous
* @param syncAction Synchronous node action
* @return Asynchronous wrapper around sync action
*/
static <S extends AgentState> AsyncNodeAction<S> node_async(NodeAction<S> syncAction);
}

Usage Examples:
// Simple synchronous action: upper-cases the "input" value (empty string when
// it is absent) and reports it under "output" together with a "processed" flag.
NodeAction<MyState> syncAction = state -> {
    final String upperCased = state.<String>value("input").orElse("").toUpperCase();
    return Map.of("output", upperCased, "processed", true);
};
// Asynchronous action with external API call.
// Reads "query" from the state (empty string when absent) and completes with
// the simulated response plus the time at which it was produced.
AsyncNodeAction<MyState> asyncAction = (state) -> {
    String query = state.<String>value("query").orElse("");
    return CompletableFuture.supplyAsync(() -> {
        try {
            // Simulate API call latency
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the thread's interrupt flag;
            // restore it so the executor that owns this thread can still see
            // the interruption, then fail the future with the original cause.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        String result = "API response for: " + query;
        return Map.of("api_result", result, "timestamp", System.currentTimeMillis());
    });
};
// Convert sync to async
AsyncNodeAction<MyState> wrappedSync = AsyncNodeAction.node_async(syncAction);
// Add to graph
workflow.addNode("sync_processor", syncAction);
workflow.addNode("async_processor", asyncAction);
workflow.addNode("wrapped_processor", wrappedSync);

Actions that access runtime configuration for context-aware behavior.
/**
* Asynchronous node action with configuration access
* @param <S> Agent state type
*/
@FunctionalInterface
interface AsyncNodeActionWithConfig<S extends AgentState> {
/**
* Executes action with access to runtime configuration
* @param state Current agent state
* @param config Runtime configuration
* @return CompletableFuture with state updates
* @throws Exception if action execution fails
*/
CompletableFuture<Map<String, Object>> apply(S state, RunnableConfig config) throws Exception;
/**
* Creates config-aware action from simple async action
* @param action Simple async action
* @return Config-aware action wrapper
*/
static <S extends AgentState> AsyncNodeActionWithConfig<S> of(AsyncNodeAction<S> action);
}

Usage Examples:
// Configuration-aware action: copies the thread id, the studio flag and one
// metadata entry from the runtime configuration into the state updates.
AsyncNodeActionWithConfig<MyState> configAwareAction = (state, config) -> {
    final String thread = config.threadId().orElse("default");
    final boolean inStudio = config.isRunningInStudio();

    final Map<String, Object> changes = new HashMap<>();
    changes.put("thread_id", thread);
    changes.put("is_studio", inStudio);
    // Metadata lookup falls back to "default" when "custom_key" is absent
    changes.put("custom_value", config.metadata("custom_key").orElse("default"));

    return CompletableFuture.completedFuture(changes);
};
// Thread-specific processing: the thread id is resolved up front ("anonymous"
// when the configuration has none) and captured by the asynchronous task.
AsyncNodeActionWithConfig<MyState> threadSpecificAction = (state, config) -> {
    final String owner = config.threadId().orElse("anonymous");
    return CompletableFuture.supplyAsync(
        () -> Map.of("thread_result", "Processed by thread: " + owner));
};
workflow.addNode("config_aware", configAwareAction);
workflow.addNode("thread_specific", threadSpecificAction);

Actions that return routing commands for conditional flow control.
/**
* Synchronous command action for routing decisions.
* Declares a single abstract method that maps the current agent state to a
* Command, so it can be written as a lambda.
* @param <S> Agent state type (bounded by AgentState)
*/
@FunctionalInterface
interface CommandAction<S extends AgentState> {
/**
* Executes action and returns routing command
* @param state Current agent state
* @return Command indicating next node and state updates
* @throws Exception if action execution fails; the checked exception is part of
* the signature, so implementations do not need to wrap failures
*/
Command apply(S state) throws Exception;
}
/**
* Asynchronous command action for routing decisions.
* Unlike the synchronous variant, the single abstract method also receives the
* runtime configuration and delivers its Command through a CompletableFuture.
* @param <S> Agent state type (bounded by AgentState)
*/
@FunctionalInterface
interface AsyncCommandAction<S extends AgentState> {
/**
* Executes action and returns routing command asynchronously
* @param state Current agent state
* @param config Runtime configuration
* @return CompletableFuture with routing command
* @throws Exception if action execution fails before a future can be returned
*/
CompletableFuture<Command> apply(S state, RunnableConfig config) throws Exception;
/**
* Creates async command action from edge action.
* Only the signature is listed here; how the edge action's result is turned
* into a Command is not shown in this listing.
* @param edgeAction Edge action to wrap
* @return Async command action
*/
static <S extends AgentState> AsyncCommandAction<S> of(AsyncEdgeAction<S> edgeAction);
}
/**
* Command containing routing decision and state updates
*/
class Command {
/**
* Get target node ID for routing
* @return Node ID to route to
*/
String gotoNode();
/**
* Get state updates to apply
* @return Map of state updates
*/
Map<String, Object> update();
}

Usage Examples:
// Simple routing based on state value: the "score" (0 when absent) selects
// both the route and the grade recorded in the state updates.
//   score > 80        -> "high_score",   grade "A"
//   60 < score <= 80  -> "medium_score", grade "B"
//   score <= 60       -> "low_score",    grade "C"
AsyncCommandAction<MyState> routingAction = (state, config) -> {
    final int points = state.<Integer>value("score").orElse(0);

    final String destination;
    final String grade;
    if (points <= 60) {
        destination = "low_score";
        grade = "C";
    } else if (points <= 80) {
        destination = "medium_score";
        grade = "B";
    } else {
        destination = "high_score";
        grade = "A";
    }

    final Map<String, Object> changes = new HashMap<>();
    changes.put("grade", grade);
    return CompletableFuture.completedFuture(new Command(destination, changes));
};
// Complex routing with external validation: "user_input" (empty string when
// absent) is checked asynchronously and the outcome decides the route.
AsyncCommandAction<MyState> validationAction = (state, config) -> {
    final String candidate = state.<String>value("user_input").orElse("");
    return CompletableFuture.supplyAsync(() -> {
        // Simulated validation: more than 5 characters, alphanumeric only
        final boolean wellFormed = candidate.length() > 5 && candidate.matches("^[a-zA-Z0-9]+$");
        final boolean flaggedForReview = candidate.contains("sensitive");
        final Map<String, Object> changes = Map.of("validated_at", System.currentTimeMillis());

        // A failed validation takes precedence over the review flag
        if (!wellFormed) {
            return new Command("validation_failed", changes);
        }
        if (flaggedForReview) {
            return new Command("needs_review", changes);
        }
        return new Command("validation_passed", changes);
    });
};
// Add conditional nodes with routing
workflow.addNode("router", routingAction, Map.of(
"high_score", "high_score_handler",
"medium_score", "medium_score_handler",
"low_score", "low_score_handler"
));
workflow.addNode("validator", validationAction, Map.of(
"validation_passed", "process_data",
"validation_failed", "show_error",
"needs_review", "human_review"
));

Actions specifically for edge condition evaluation.
/**
* Synchronous edge action for routing decisions.
* Declares a single abstract method that maps the current agent state to a
* String identifying the target, so it can be written as a lambda.
* @param <S> Agent state type (bounded by AgentState)
*/
@FunctionalInterface
interface EdgeAction<S extends AgentState> {
/**
* Evaluates edge condition and returns target node
* @param state Current agent state
* @return Target node identifier
* @throws Exception if evaluation fails; the checked exception is part of the
* signature, so implementations do not need to wrap failures
*/
String apply(S state) throws Exception;
}
/**
* Asynchronous edge action for routing decisions
* @param <S> Agent state type
*/
@FunctionalInterface
interface AsyncEdgeAction<S extends AgentState> {
/**
* Evaluates edge condition asynchronously
* @param state Current agent state
* @param config Runtime configuration
* @return CompletableFuture with target node identifier
* @throws Exception if evaluation fails
*/
CompletableFuture<String> apply(S state, RunnableConfig config) throws Exception;
}

Usage Examples:
// Simple edge condition: routes to "process" once the "ready" flag is set,
// and to "wait" otherwise (including when the flag is absent).
AsyncEdgeAction<MyState> simpleEdge = (state, config) -> {
    if (state.<Boolean>value("ready").orElse(false)) {
        return CompletableFuture.completedFuture("process");
    }
    return CompletableFuture.completedFuture("wait");
};
// Complex edge with external check: the "user_id" (empty string when absent)
// is inspected asynchronously to choose between the admin and user flows.
AsyncEdgeAction<MyState> externalEdge = (state, config) -> {
    final String requester = state.<String>value("user_id").orElse("");
    return CompletableFuture.supplyAsync(() -> {
        // Simulated permission lookup: ids prefixed with "admin_" are admins
        if (requester.startsWith("admin_")) {
            return "admin_flow";
        }
        return "user_flow";
    });
};
// Use in conditional edges
workflow.addConditionalEdges("decision_point", simpleEdge, Map.of(
"process", "data_processor",
"wait", "wait_node"
));
workflow.addConditionalEdges("permission_check", externalEdge, Map.of(
"admin_flow", "admin_dashboard",
"user_flow", "user_dashboard"
));

Actions that can be interrupted for human-in-the-loop workflows.
/**
* Action that can be interrupted during execution.
* Declares one method that, given a node id and the current state, either
* supplies interruption metadata or returns an empty Optional.
* @param <State> Agent state type (bounded by AgentState)
*/
interface InterruptableAction<State extends AgentState> {
/**
* Check if action should be interrupted
* @param nodeId Current node identifier
* @param state Current state
* @return Optional interruption metadata if should interrupt; an empty
* Optional otherwise
*/
Optional<InterruptionMetadata<State>> interrupt(String nodeId, State state);
}
/**
* Metadata for interruption events
* @param <State> Agent state type
*/
class InterruptionMetadata<State extends AgentState> {
/**
* Creates interruption metadata builder
* @param nodeId Node being interrupted
* @param state Current state
* @return Builder for interruption metadata
*/
static <State extends AgentState> Builder<State> builder(String nodeId, State state);
String getNodeId();
State getState();
Optional<String> getReason();
Map<String, Object> getContext();
}

Usage Examples:
// Action with interruption logic.
// An anonymous class can implement only one interface, so an interrupt(...)
// method declared on an anonymous AsyncNodeActionWithConfig belongs to no
// interface and cannot be reached through the action. A named class that
// implements both AsyncNodeActionWithConfig and InterruptableAction makes the
// interruption check part of the action's type.
class ReviewGatedAction implements AsyncNodeActionWithConfig<MyState>, InterruptableAction<MyState> {

    /**
     * Processes the "data" value (empty string when absent) asynchronously.
     * @param state Current agent state
     * @param config Runtime configuration (not used by this example)
     * @return CompletableFuture with the "result" and "processed" updates
     */
    @Override
    public CompletableFuture<Map<String, Object>> apply(MyState state, RunnableConfig config) throws Exception {
        String data = state.<String>value("data").orElse("");
        return CompletableFuture.supplyAsync(() -> {
            // Process data
            String result = processData(data);
            return Map.of("result", result, "processed", true);
        });
    }

    /**
     * Requests an interruption when the state carries "needs_human_review" = true.
     * @param nodeId Current node identifier
     * @param state Current state
     * @return Interruption metadata when human review is needed, otherwise empty
     */
    @Override
    public Optional<InterruptionMetadata<MyState>> interrupt(String nodeId, MyState state) {
        // Check if human review is needed
        boolean needsReview = state.<Boolean>value("needs_human_review").orElse(false);
        if (needsReview) {
            return Optional.of(
                InterruptionMetadata.<MyState>builder(nodeId, state)
                    .reason("Human review required")
                    .context(Map.of("review_type", "data_validation"))
                    .build()
            );
        }
        return Optional.empty();
    }

    private String processData(String data) {
        // Simulate data processing
        return "Processed: " + data;
    }
}
AsyncNodeActionWithConfig<MyState> interruptibleAction = new ReviewGatedAction();
// Use with interruption configuration
CompileConfig config = CompileConfig.builder()
.checkpointSaver(new MemorySaver())
.interruptAfter("review_node")
.build();
CompiledGraph<MyState> app = workflow.compile(config);

Special handling for nodes that execute multiple actions in parallel.
// Parallel execution is handled automatically when multiple edges
// leave a single source node for different target nodes
// Example: Create parallel branches
workflow.addNode("parallel_task_1", asyncAction1);
workflow.addNode("parallel_task_2", asyncAction2);
workflow.addNode("parallel_task_3", asyncAction3);
workflow.addNode("join_node", joinAction);
// These edges create parallel execution
workflow.addEdge("start", "parallel_task_1");
workflow.addEdge("start", "parallel_task_2");
workflow.addEdge("start", "parallel_task_3");
// Join results
workflow.addEdge("parallel_task_1", "join_node");
workflow.addEdge("parallel_task_2", "join_node");
workflow.addEdge("parallel_task_3", "join_node");
// Add custom executor for parallel node control
RunnableConfig configWithExecutor = RunnableConfig.builder()
.addParallelNodeExecutor("start", Executors.newFixedThreadPool(3))
.build();

Actions that maintain internal state across invocations.
/**
 * Stateful action that reports how many times it has been invoked.
 * The count lives in an AtomicInteger owned by the instance, so every call to
 * apply on the same instance observes the next value.
 */
class CounterAction implements AsyncNodeAction<MyState> {
    private final AtomicInteger invocations = new AtomicInteger();

    /**
     * Increments the invocation count and publishes it as "action_count".
     * @param state Current agent state (not read by this action)
     * @return Already-completed future holding the updated count
     */
    @Override
    public CompletableFuture<Map<String, Object>> apply(MyState state) {
        final Map<String, Object> update = Map.of("action_count", invocations.incrementAndGet());
        return CompletableFuture.completedFuture(update);
    }
}
// Use stateful action
workflow.addNode("counter", new CounterAction());

Actions with comprehensive error handling and recovery.
// Resilient action: a failure of the risky operation is converted into state
// updates ("error", "success" = false, incremented "retry_count") instead of
// completing the future exceptionally.
AsyncNodeAction<MyState> resilientAction = (state) -> {
    return CompletableFuture.supplyAsync(() -> {
        try {
            // Risky operation
            String result = performRiskyOperation(state);
            return Map.of("result", result, "success", true);
        } catch (Exception e) {
            // Map.of rejects null values and an exception's message may be
            // null, so fall back to the exception class name; otherwise the
            // recovery path itself would throw a NullPointerException.
            String message = e.getMessage() != null ? e.getMessage() : e.getClass().getName();
            // Return error state; "retry_count" starts from 0 when absent
            return Map.of(
                "error", message,
                "success", false,
                "retry_count", state.<Integer>value("retry_count").orElse(0) + 1
            );
        }
    });
};
private String performRiskyOperation(MyState state) throws Exception {
// Simulate operation that might fail
if (Math.random() < 0.3) {
throw new RuntimeException("Random failure");
}
return "Success";
}

Actions that combine multiple operations.
// Composite action: runs validation, processing and saving as one asynchronous
// pipeline, each stage starting when the previous one completes, and publishes
// the last stage's value as "final_result".
AsyncNodeAction<MyState> compositeAction = (state) -> {
    final CompletableFuture<String> validated = validateInput(state);
    final CompletableFuture<String> processed = validated.thenCompose(this::processData);
    final CompletableFuture<String> saved = processed.thenCompose(this::saveResults);
    return saved.thenApply(outcome -> Map.of("final_result", outcome));
};
/**
 * Reads the "input" value from the state, using an empty string when absent.
 * @param state Current agent state
 * @return Already-completed future holding the input
 */
private CompletableFuture<String> validateInput(MyState state) {
    return CompletableFuture.completedFuture(state.<String>value("input").orElse(""));
}
/**
 * Asynchronously prefixes the input with "Processed: ".
 * @param input Value to process
 * @return Future completing with the prefixed value
 */
private CompletableFuture<String> processData(String input) {
    return CompletableFuture.supplyAsync(() -> {
        return "Processed: " + input;
    });
}
private CompletableFuture<String> saveResults(String processed) {
return CompletableFuture.supplyAsync(() -> {
// Simulate save operation
return "Saved: " + processed;
});
}

Install with Tessl CLI:
npx tessl i tessl/maven-org-bsc-langgraph4j--langgraph4j-core