0
# Graph Execution
1
2
Execute compiled graphs with sophisticated control flow, streaming, and state management. Supports synchronous execution, streaming outputs, state snapshots, and resumption from checkpoints.
3
4
## Capabilities
5
6
### Synchronous Execution
7
8
Execute the graph and wait for completion, returning the final state.
9
10
```java { .api }
11
/**
12
* Invokes graph execution with input map and returns final state
13
* @param inputs Input data as key-value map
14
* @return Optional containing final state if execution completed successfully
15
*/
16
Optional<State> invoke(Map<String, Object> inputs);
17
18
/**
19
* Invokes graph execution with graph input and configuration
20
* @param input Graph input (args or resume)
21
* @param config Runtime configuration
22
* @return Optional containing final state if execution completed successfully
23
*/
24
Optional<State> invoke(GraphInput input, RunnableConfig config);
25
26
/**
27
* Invokes graph execution and returns final NodeOutput with execution details
28
* @param input Graph input (args or resume)
29
* @param config Runtime configuration
30
* @return Optional containing final NodeOutput if execution produced output
31
* @since 1.6.1
32
*/
33
Optional<NodeOutput<State>> invokeFinal(GraphInput input, RunnableConfig config);
34
```
35
36
**Usage Examples:**
37
38
```java
39
import org.bsc.langgraph4j.*;
40
import java.util.Map;
41
42
// Simple execution
43
CompiledGraph<MyState> app = workflow.compile();
44
Optional<MyState> result = app.invoke(Map.of("input", "Hello"));
45
46
// Execution with configuration
47
RunnableConfig config = RunnableConfig.builder()
48
.threadId("session-123")
49
.build();
50
51
Optional<MyState> result = app.invoke(
52
new GraphArgs(Map.of("query", "process this")),
53
config
54
);
55
56
// Get final output with node information
57
Optional<NodeOutput<MyState>> finalOutput = app.invokeFinal(
58
new GraphArgs(Map.of("data", "input")),
59
config
60
);
61
if (finalOutput.isPresent()) {
62
System.out.println("Last node: " + finalOutput.get().node());
63
MyState finalState = finalOutput.get().state();
64
}
65
```
66
67
### Streaming Execution
68
69
Stream execution outputs in real-time for monitoring and interactive applications.
70
71
```java { .api }
72
/**
73
* Creates streaming execution with input map and default configuration
74
* @param inputs Input data as key-value map
75
* @return AsyncGenerator stream of NodeOutput
76
*/
77
AsyncGenerator<NodeOutput<State>> stream(Map<String, Object> inputs);
78
79
/**
80
* Creates streaming execution with configuration
81
* @param inputs Input data as key-value map
82
* @param config Runtime configuration
83
* @return AsyncGenerator stream of NodeOutput
84
*/
85
AsyncGenerator<NodeOutput<State>> stream(Map<String, Object> inputs, RunnableConfig config);
86
87
/**
88
* Creates streaming execution with graph input and configuration
89
* @param input Graph input (args or resume)
90
* @param config Runtime configuration
91
* @return AsyncGenerator stream of NodeOutput
92
*/
93
AsyncGenerator<NodeOutput<State>> stream(GraphInput input, RunnableConfig config);
94
```
95
96
**Usage Examples:**
97
98
```java
99
import org.bsc.async.AsyncGenerator;
100
101
// Basic streaming
102
AsyncGenerator<NodeOutput<MyState>> stream = app.stream(Map.of("input", "data"));
103
104
// Process each output as it arrives
105
stream.forEachAsync(output -> {
106
System.out.println("Node: " + output.node());
107
System.out.println("State: " + output.state().data());
108
return CompletableFuture.completedFuture(null);
109
}).get();
110
111
// Collect all outputs
112
List<NodeOutput<MyState>> outputs = stream.stream().toList();
113
114
// Stream with configuration for threading
115
RunnableConfig config = RunnableConfig.builder()
116
.threadId("stream-session")
117
.build();
118
119
AsyncGenerator<NodeOutput<MyState>> configuredStream = app.stream(
120
Map.of("query", "streaming query"),
121
config
122
);
123
```
124
125
### Snapshot Streaming
126
127
Stream state snapshots for debugging and monitoring internal execution state.
128
129
```java { .api }
130
/**
131
* Creates streaming execution that outputs state snapshots
132
* @param input Graph input (args or resume)
133
* @param config Runtime configuration
134
* @return AsyncGenerator stream of state snapshots
135
*/
136
AsyncGenerator<NodeOutput<State>> streamSnapshots(GraphInput input, RunnableConfig config);
137
138
/**
139
* Creates streaming execution that outputs state snapshots
140
* @param inputs Input data as key-value map
141
* @param config Runtime configuration
142
* @return AsyncGenerator stream of state snapshots
143
*/
144
AsyncGenerator<NodeOutput<State>> streamSnapshots(Map<String, Object> inputs, RunnableConfig config);
145
```
146
147
**Usage Examples:**
148
149
```java
150
// Stream state snapshots for debugging
151
AsyncGenerator<NodeOutput<MyState>> snapshots = app.streamSnapshots(
152
Map.of("debug", true),
153
config
154
);
155
156
snapshots.forEachAsync(snapshot -> {
157
if (snapshot instanceof StateSnapshot) {
158
StateSnapshot<MyState> stateSnapshot = (StateSnapshot<MyState>) snapshot;
159
System.out.println("Checkpoint ID: " + stateSnapshot.getCheckpointId());
160
System.out.println("Node ID: " + stateSnapshot.getNodeId());
161
System.out.println("State: " + stateSnapshot.state().data());
162
}
163
return CompletableFuture.completedFuture(null);
164
});
165
```
166
167
### State Management During Execution
168
169
Access and modify execution state during runtime.
170
171
```java { .api }
172
/**
173
* Get current state snapshot for a thread
174
* @param config Runtime configuration with thread ID
175
* @return Current state snapshot
176
* @throws IllegalStateException if checkpoint saver not configured or no checkpoint found
177
*/
178
StateSnapshot<State> getState(RunnableConfig config);
179
180
/**
181
* Get optional state snapshot for a thread
182
* @param config Runtime configuration with thread ID
183
* @return Optional containing state snapshot if exists
184
* @throws IllegalStateException if checkpoint saver not configured
185
*/
186
Optional<StateSnapshot<State>> stateOf(RunnableConfig config);
187
188
/**
189
* Get last state snapshot for a thread
190
* @param config Runtime configuration with thread ID
191
* @return Optional containing last state snapshot if any exists
192
*/
193
Optional<StateSnapshot<State>> lastStateOf(RunnableConfig config);
194
195
/**
196
* Update graph state with new values
197
* @param config Runtime configuration containing graph state
198
* @param values Values to update in state
199
* @return Updated runtime configuration
200
* @throws Exception when update fails
201
*/
202
RunnableConfig updateState(RunnableConfig config, Map<String, Object> values) throws Exception;
203
204
/**
205
* Update graph state and specify next node for execution
206
* @param config Runtime configuration containing graph state
207
* @param values Values to update in state
208
* @param asNode Node ID to execute next (can be null)
209
* @return Updated runtime configuration
210
* @throws Exception when update fails
211
*/
212
RunnableConfig updateState(RunnableConfig config, Map<String, Object> values, String asNode) throws Exception;
213
```
214
215
**Usage Examples:**
216
217
```java
218
// Get current state
219
RunnableConfig config = RunnableConfig.builder()
220
.threadId("my-thread")
221
.build();
222
223
try {
224
StateSnapshot<MyState> currentState = app.getState(config);
225
System.out.println("Current node: " + currentState.getNodeId());
226
System.out.println("State data: " + currentState.state().data());
227
} catch (IllegalStateException e) {
228
System.out.println("No checkpoint found");
229
}
230
231
// Update state during execution
232
RunnableConfig updatedConfig = app.updateState(
233
config,
234
Map.of("user_input", "new value", "timestamp", System.currentTimeMillis())
235
);
236
237
// Update state and force next node
238
RunnableConfig configWithNextNode = app.updateState(
239
config,
240
Map.of("override", true),
241
"specific_node"
242
);
243
```
244
245
### Execution History
246
247
Access complete execution history for debugging and analysis.
248
249
```java { .api }
250
/**
251
* Gets execution history for a thread
252
* @param config Runtime configuration containing thread ID
253
* @return Collection of StateSnapshots ordered by recency (newest first)
254
*/
255
Collection<StateSnapshot<State>> getStateHistory(RunnableConfig config);
256
```
257
258
**Usage Examples:**
259
260
```java
261
// Get execution history
262
Collection<StateSnapshot<MyState>> history = app.getStateHistory(config);
263
264
System.out.println("Execution history (" + history.size() + " steps):");
265
for (StateSnapshot<MyState> snapshot : history) {
266
System.out.println("- Node: " + snapshot.getNodeId() +
267
", Checkpoint: " + snapshot.getCheckpointId());
268
}
269
270
// Find specific checkpoint
271
Optional<StateSnapshot<MyState>> specificStep = history.stream()
272
.filter(snapshot -> snapshot.getNodeId().equals("target_node"))
273
.findFirst();
274
```
275
276
### Execution Control
277
278
Control execution flow with iteration limits and interruption handling.
279
280
```java { .api }
281
/**
282
* Sets maximum number of iterations for graph execution
283
* @param maxIterations Maximum iteration count (must be > 0)
284
* @throws IllegalArgumentException if maxIterations <= 0
285
*/
286
void setMaxIterations(int maxIterations);
287
```
288
289
**Usage Examples:**
290
291
```java
292
// Set iteration limit to prevent infinite loops
293
app.setMaxIterations(100);
294
295
// Execute with limit
296
try {
297
Optional<MyState> result = app.invoke(Map.of("input", "data"));
298
} catch (IllegalStateException e) {
299
if (e.getMessage().contains("Maximum number of iterations")) {
300
System.out.println("Graph execution exceeded iteration limit");
301
}
302
}
303
```
304
305
### Resume Execution
306
307
Resume execution from checkpoints for human-in-the-loop workflows.
308
309
```java { .api }
310
// Resume from checkpoint using GraphResume input
311
GraphInput resumeInput = new GraphResume();
312
RunnableConfig resumeConfig = RunnableConfig.builder()
313
.threadId("interrupted-thread")
314
.checkPointId("checkpoint-123")
315
.build();
316
317
Optional<MyState> result = app.invoke(resumeInput, resumeConfig);
318
```
319
320
**Usage Examples:**
321
322
```java
323
// Resume interrupted execution
324
RunnableConfig interruptedConfig = RunnableConfig.builder()
325
.threadId("session-with-interrupt")
326
.build();
327
328
// Original execution (may be interrupted)
329
AsyncGenerator<NodeOutput<MyState>> stream = app.stream(
330
Map.of("needs_review", true),
331
interruptedConfig
332
);
333
334
// Process until interruption
335
for (NodeOutput<MyState> output : stream.stream().toList()) {
336
if (output instanceof InterruptionMetadata) {
337
System.out.println("Execution interrupted at: " + output.node());
338
break;
339
}
340
}
341
342
// Later, resume execution
343
Optional<MyState> resumedResult = app.invoke(new GraphResume(), interruptedConfig);
344
```
345
346
## Stream Modes
347
348
```java { .api }
349
enum StreamMode {
350
VALUES, // Stream node execution outputs
351
SNAPSHOTS // Stream state snapshots with checkpoint information
352
}
353
```
354
355
## Input Types
356
357
```java { .api }
358
// Base input interface
359
interface GraphInput {}
360
361
// Input with arguments
362
class GraphArgs implements GraphInput {
363
GraphArgs(Map<String, Object> value);
364
Map<String, Object> value();
365
}
366
367
// Resume from checkpoint
368
class GraphResume implements GraphInput {}
369
370
// Static factory methods
371
static GraphInput args(Map<String, Object> inputs);
372
```
373
374
## Output Types
375
376
```java { .api }
377
// Node execution output
378
interface NodeOutput<State extends AgentState> {
379
String node(); // Node ID that produced this output
380
State state(); // State after node execution
381
}
382
383
// State snapshot with checkpoint information
384
class StateSnapshot<State extends AgentState> implements NodeOutput<State> {
385
String getCheckpointId();
386
String getNodeId();
387
String getNextNodeId();
388
State state();
389
RunnableConfig getConfig();
390
}
391
```