0
# Actions and Node Logic
1
2
Define node behavior with synchronous and asynchronous actions, including configuration-aware actions, routing logic, and interruption handling.
3
4
## Capabilities
5
6
### Basic Node Actions
7
8
Core action interfaces for defining node behavior.
9
10
```java { .api }
11
/**
12
* Synchronous node action interface
13
* @param <T> Agent state type
14
*/
15
@FunctionalInterface
16
interface NodeAction<T extends AgentState> {
17
/**
18
* Executes synchronous action on agent state
19
* @param state Current agent state
20
* @return Map of state updates to apply
21
* @throws Exception if action execution fails
22
*/
23
Map<String, Object> apply(T state) throws Exception;
24
}
25
26
/**
27
* Asynchronous node action interface
28
* @param <S> Agent state type
29
*/
30
@FunctionalInterface
31
interface AsyncNodeAction<S extends AgentState> extends Function<S, CompletableFuture<Map<String, Object>>> {
32
/**
33
* Executes asynchronous action on agent state
34
* @param state Current agent state
35
* @return CompletableFuture with state updates
36
*/
37
CompletableFuture<Map<String, Object>> apply(S state);
38
39
/**
40
* Converts synchronous action to asynchronous
41
* @param syncAction Synchronous node action
42
* @return Asynchronous wrapper around sync action
43
*/
44
static <S extends AgentState> AsyncNodeAction<S> node_async(NodeAction<S> syncAction);
45
}
46
```
47
48
**Usage Examples:**
49
50
```java
51
// Simple synchronous action
52
NodeAction<MyState> syncAction = (state) -> {
53
String input = state.<String>value("input").orElse("");
54
String processed = input.toUpperCase();
55
return Map.of("output", processed, "processed", true);
56
};
57
58
// Asynchronous action with external API call
59
AsyncNodeAction<MyState> asyncAction = (state) -> {
60
String query = state.<String>value("query").orElse("");
61
62
return CompletableFuture.supplyAsync(() -> {
63
// Simulate API call
64
try {
65
Thread.sleep(1000);
66
String result = "API response for: " + query;
67
return Map.of("api_result", result, "timestamp", System.currentTimeMillis());
68
} catch (InterruptedException e) {
69
throw new RuntimeException(e);
70
}
71
});
72
};
73
74
// Convert sync to async
75
AsyncNodeAction<MyState> wrappedSync = AsyncNodeAction.node_async(syncAction);
76
77
// Add to graph
78
workflow.addNode("sync_processor", syncAction);
79
workflow.addNode("async_processor", asyncAction);
80
workflow.addNode("wrapped_processor", wrappedSync);
81
```
82
83
### Configuration-Aware Actions
84
85
Actions that access runtime configuration for context-aware behavior.
86
87
```java { .api }
88
/**
89
* Asynchronous node action with configuration access
90
* @param <S> Agent state type
91
*/
92
@FunctionalInterface
93
interface AsyncNodeActionWithConfig<S extends AgentState> {
94
/**
95
* Executes action with access to runtime configuration
96
* @param state Current agent state
97
* @param config Runtime configuration
98
* @return CompletableFuture with state updates
99
* @throws Exception if action execution fails
100
*/
101
CompletableFuture<Map<String, Object>> apply(S state, RunnableConfig config) throws Exception;
102
103
/**
104
* Creates config-aware action from simple async action
105
* @param action Simple async action
106
* @return Config-aware action wrapper
107
*/
108
static <S extends AgentState> AsyncNodeActionWithConfig<S> of(AsyncNodeAction<S> action);
109
}
110
```
111
112
**Usage Examples:**
113
114
```java
115
// Configuration-aware action
116
AsyncNodeActionWithConfig<MyState> configAwareAction = (state, config) -> {
117
String threadId = config.threadId().orElse("default");
118
boolean isStudio = config.isRunningInStudio();
119
120
Map<String, Object> updates = new HashMap<>();
121
updates.put("thread_id", threadId);
122
updates.put("is_studio", isStudio);
123
124
// Access metadata
125
Object customValue = config.metadata("custom_key").orElse("default");
126
updates.put("custom_value", customValue);
127
128
return CompletableFuture.completedFuture(updates);
129
};
130
131
// Use thread-specific processing
132
AsyncNodeActionWithConfig<MyState> threadSpecificAction = (state, config) -> {
133
String threadId = config.threadId().orElse("anonymous");
134
135
return CompletableFuture.supplyAsync(() -> {
136
// Process based on thread context
137
String result = "Processed by thread: " + threadId;
138
return Map.of("thread_result", result);
139
});
140
};
141
142
workflow.addNode("config_aware", configAwareAction);
143
workflow.addNode("thread_specific", threadSpecificAction);
144
```
145
146
### Command Actions for Routing
147
148
Actions that return routing commands for conditional flow control.
149
150
```java { .api }
151
/**
152
* Synchronous command action for routing decisions
153
* @param <S> Agent state type
154
*/
155
@FunctionalInterface
156
interface CommandAction<S extends AgentState> {
157
/**
158
* Executes action and returns routing command
159
* @param state Current agent state
160
* @return Command indicating next node and state updates
161
* @throws Exception if action execution fails
162
*/
163
Command apply(S state) throws Exception;
164
}
165
166
/**
167
* Asynchronous command action for routing decisions
168
* @param <S> Agent state type
169
*/
170
@FunctionalInterface
171
interface AsyncCommandAction<S extends AgentState> {
172
/**
173
* Executes action and returns routing command asynchronously
174
* @param state Current agent state
175
* @param config Runtime configuration
176
* @return CompletableFuture with routing command
177
* @throws Exception if action execution fails
178
*/
179
CompletableFuture<Command> apply(S state, RunnableConfig config) throws Exception;
180
181
/**
182
* Creates async command action from edge action
183
* @param edgeAction Edge action to wrap
184
* @return Async command action
185
*/
186
static <S extends AgentState> AsyncCommandAction<S> of(AsyncEdgeAction<S> edgeAction);
187
}
188
189
/**
190
* Command containing routing decision and state updates
191
*/
192
class Command {
193
/**
194
* Get target node ID for routing
195
* @return Node ID to route to
196
*/
197
String gotoNode();
198
199
/**
200
* Get state updates to apply
201
* @return Map of state updates
202
*/
203
Map<String, Object> update();
204
}
205
```
206
207
**Usage Examples:**
208
209
```java
210
// Simple routing based on state value
211
AsyncCommandAction<MyState> routingAction = (state, config) -> {
212
int score = state.<Integer>value("score").orElse(0);
213
214
String route;
215
Map<String, Object> updates = new HashMap<>();
216
217
if (score > 80) {
218
route = "high_score";
219
updates.put("grade", "A");
220
} else if (score > 60) {
221
route = "medium_score";
222
updates.put("grade", "B");
223
} else {
224
route = "low_score";
225
updates.put("grade", "C");
226
}
227
228
Command command = new Command(route, updates);
229
return CompletableFuture.completedFuture(command);
230
};
231
232
// Complex routing with external validation
233
AsyncCommandAction<MyState> validationAction = (state, config) -> {
234
String data = state.<String>value("user_input").orElse("");
235
236
return CompletableFuture.supplyAsync(() -> {
237
// Simulate validation logic
238
boolean isValid = data.length() > 5 && data.matches("^[a-zA-Z0-9]+$");
239
boolean needsReview = data.contains("sensitive");
240
241
String route;
242
Map<String, Object> updates = Map.of("validated_at", System.currentTimeMillis());
243
244
if (!isValid) {
245
route = "validation_failed";
246
} else if (needsReview) {
247
route = "needs_review";
248
} else {
249
route = "validation_passed";
250
}
251
252
return new Command(route, updates);
253
});
254
};
255
256
// Add conditional nodes with routing
257
workflow.addNode("router", routingAction, Map.of(
258
"high_score", "high_score_handler",
259
"medium_score", "medium_score_handler",
260
"low_score", "low_score_handler"
261
));
262
263
workflow.addNode("validator", validationAction, Map.of(
264
"validation_passed", "process_data",
265
"validation_failed", "show_error",
266
"needs_review", "human_review"
267
));
268
```
269
270
### Edge Actions
271
272
Actions specifically for edge condition evaluation.
273
274
```java { .api }
275
/**
276
* Synchronous edge action for routing decisions
277
* @param <S> Agent state type
278
*/
279
@FunctionalInterface
280
interface EdgeAction<S extends AgentState> {
281
/**
282
* Evaluates edge condition and returns target node
283
* @param state Current agent state
284
* @return Target node identifier
285
* @throws Exception if evaluation fails
286
*/
287
String apply(S state) throws Exception;
288
}
289
290
/**
291
* Asynchronous edge action for routing decisions
292
* @param <S> Agent state type
293
*/
294
@FunctionalInterface
295
interface AsyncEdgeAction<S extends AgentState> {
296
/**
297
* Evaluates edge condition asynchronously
298
* @param state Current agent state
299
* @param config Runtime configuration
300
* @return CompletableFuture with target node identifier
301
* @throws Exception if evaluation fails
302
*/
303
CompletableFuture<String> apply(S state, RunnableConfig config) throws Exception;
304
}
305
```
306
307
**Usage Examples:**
308
309
```java
310
// Simple edge condition
311
AsyncEdgeAction<MyState> simpleEdge = (state, config) -> {
312
boolean ready = state.<Boolean>value("ready").orElse(false);
313
String target = ready ? "process" : "wait";
314
return CompletableFuture.completedFuture(target);
315
};
316
317
// Complex edge with external check
318
AsyncEdgeAction<MyState> externalEdge = (state, config) -> {
319
String userId = state.<String>value("user_id").orElse("");
320
321
return CompletableFuture.supplyAsync(() -> {
322
// Simulate external service check
323
boolean hasPermission = userId.startsWith("admin_");
324
return hasPermission ? "admin_flow" : "user_flow";
325
});
326
};
327
328
// Use in conditional edges
329
workflow.addConditionalEdges("decision_point", simpleEdge, Map.of(
330
"process", "data_processor",
331
"wait", "wait_node"
332
));
333
334
workflow.addConditionalEdges("permission_check", externalEdge, Map.of(
335
"admin_flow", "admin_dashboard",
336
"user_flow", "user_dashboard"
337
));
338
```
339
340
### Interruption Handling
341
342
Actions that can be interrupted for human-in-the-loop workflows.
343
344
```java { .api }
345
/**
346
* Action that can be interrupted during execution
347
* @param <State> Agent state type
348
*/
349
interface InterruptableAction<State extends AgentState> {
350
/**
351
* Check if action should be interrupted
352
* @param nodeId Current node identifier
353
* @param state Current state
354
* @return Optional interruption metadata if should interrupt
355
*/
356
Optional<InterruptionMetadata<State>> interrupt(String nodeId, State state);
357
}
358
359
/**
360
* Metadata for interruption events
361
* @param <State> Agent state type
362
*/
363
class InterruptionMetadata<State extends AgentState> {
364
/**
365
* Creates interruption metadata builder
366
* @param nodeId Node being interrupted
367
* @param state Current state
368
* @return Builder for interruption metadata
369
*/
370
static <State extends AgentState> Builder<State> builder(String nodeId, State state);
371
372
String getNodeId();
373
State getState();
374
Optional<String> getReason();
375
Map<String, Object> getContext();
376
}
377
```
378
379
**Usage Examples:**
380
381
```java
382
// Action with interruption logic
383
AsyncNodeActionWithConfig<MyState> interruptibleAction = new AsyncNodeActionWithConfig<MyState>() {
384
@Override
385
public CompletableFuture<Map<String, Object>> apply(MyState state, RunnableConfig config) throws Exception {
386
String data = state.<String>value("data").orElse("");
387
388
return CompletableFuture.supplyAsync(() -> {
389
// Process data
390
String result = processData(data);
391
return Map.of("result", result, "processed", true);
392
});
393
}
394
395
// Implement interruption check if needed
396
public Optional<InterruptionMetadata<MyState>> interrupt(String nodeId, MyState state) {
397
// Check if human review is needed
398
boolean needsReview = state.<Boolean>value("needs_human_review").orElse(false);
399
400
if (needsReview) {
401
return Optional.of(
402
InterruptionMetadata.<MyState>builder(nodeId, state)
403
.reason("Human review required")
404
.context(Map.of("review_type", "data_validation"))
405
.build()
406
);
407
}
408
409
return Optional.empty();
410
}
411
412
private String processData(String data) {
413
// Simulate data processing
414
return "Processed: " + data;
415
}
416
};
417
418
// Use with interruption configuration
419
CompileConfig config = CompileConfig.builder()
420
.checkpointSaver(new MemorySaver())
421
.interruptAfter("review_node")
422
.build();
423
424
CompiledGraph<MyState> app = workflow.compile(config);
425
```
426
427
### Parallel Node Execution
428
429
Special handling for nodes that execute multiple actions in parallel.
430
431
```java { .api }
432
// Parallel execution is handled automatically when multiple edges
433
// target the same node from a single source
434
435
// Example: Create parallel branches
436
workflow.addNode("parallel_task_1", asyncAction1);
437
workflow.addNode("parallel_task_2", asyncAction2);
438
workflow.addNode("parallel_task_3", asyncAction3);
439
workflow.addNode("join_node", joinAction);
440
441
// These edges create parallel execution
442
workflow.addEdge("start", "parallel_task_1");
443
workflow.addEdge("start", "parallel_task_2");
444
workflow.addEdge("start", "parallel_task_3");
445
446
// Join results
447
workflow.addEdge("parallel_task_1", "join_node");
448
workflow.addEdge("parallel_task_2", "join_node");
449
workflow.addEdge("parallel_task_3", "join_node");
450
451
// Add custom executor for parallel node control
452
RunnableConfig configWithExecutor = RunnableConfig.builder()
453
.addParallelNodeExecutor("start", Executors.newFixedThreadPool(3))
454
.build();
455
```
456
457
## Action Patterns
458
459
### Stateful Actions
460
461
Actions that maintain internal state across invocations.
462
463
```java { .api }
464
class CounterAction implements AsyncNodeAction<MyState> {
465
private final AtomicInteger counter = new AtomicInteger(0);
466
467
@Override
468
public CompletableFuture<Map<String, Object>> apply(MyState state) {
469
int currentCount = counter.incrementAndGet();
470
return CompletableFuture.completedFuture(
471
Map.of("action_count", currentCount)
472
);
473
}
474
}
475
476
// Use stateful action
477
workflow.addNode("counter", new CounterAction());
478
```
479
480
### Error Handling Actions
481
482
Actions with comprehensive error handling and recovery.
483
484
```java { .api }
485
AsyncNodeAction<MyState> resilientAction = (state) -> {
486
return CompletableFuture.supplyAsync(() -> {
487
try {
488
// Risky operation
489
String result = performRiskyOperation(state);
490
return Map.of("result", result, "success", true);
491
492
} catch (Exception e) {
493
// Log error and return error state
494
return Map.of(
495
"error", e.getMessage(),
496
"success", false,
497
"retry_count", state.<Integer>value("retry_count").orElse(0) + 1
498
);
499
}
500
});
501
};
502
503
private String performRiskyOperation(MyState state) throws Exception {
504
// Simulate operation that might fail
505
if (Math.random() < 0.3) {
506
throw new RuntimeException("Random failure");
507
}
508
return "Success";
509
}
510
```
511
512
### Composite Actions
513
514
Actions that combine multiple operations.
515
516
```java { .api }
517
AsyncNodeAction<MyState> compositeAction = (state) -> {
518
// Chain multiple async operations
519
return validateInput(state)
520
.thenCompose(this::processData)
521
.thenCompose(this::saveResults)
522
.thenApply(result -> Map.of("final_result", result));
523
};
524
525
private CompletableFuture<String> validateInput(MyState state) {
526
String input = state.<String>value("input").orElse("");
527
return CompletableFuture.completedFuture(input);
528
}
529
530
private CompletableFuture<String> processData(String input) {
531
return CompletableFuture.supplyAsync(() -> "Processed: " + input);
532
}
533
534
private CompletableFuture<String> saveResults(String processed) {
535
return CompletableFuture.supplyAsync(() -> {
536
// Simulate save operation
537
return "Saved: " + processed;
538
});
539
}
540
```