# Human-in-the-Loop & Control Flow

Interactive workflow capabilities — interruption points, approval flows, and dynamic control flow — that let humans oversee and intervene in automated processes.

## Capabilities

### Interruption System

Core interruption mechanisms for pausing execution and waiting for human input or system events.

```typescript { .api }
/**
 * Pauses execution from inside a node and hands `value` to the caller
 * (Node.js only).
 *
 * Declared to return `R`.
 * NOTE(review): presumably `R` is the value supplied when the run is
 * resumed — confirm against the library documentation.
 */
function interrupt<I, R>(value: I): R;

/**
 * Type guard: reports whether `values` represents an interrupted state and,
 * if so, narrows it to `Interrupt<Value>`.
 */
function isInterrupted<Value>(values: unknown): values is Interrupt<Value>;

/**
 * Data structure describing an interruption.
 */
interface Interrupt<Value> {
  /** Payload attached to the interruption. */
  value: Value;
  /** One of "during", "before" or "after". */
  when: "during" | "before" | "after";
  /** Node names associated with the interruption. */
  nodes: string[];
  /** Optional free-form metadata. */
  metadata?: Record<string, any>;
}
```

**Usage Example:**

```typescript
import { interrupt, isInterrupted } from "@langchain/langgraph";
36
37
/**
 * Node that requires human approval of the generated content.
 *
 * When `state.requiresApproval` is falsy the node approves immediately.
 * Otherwise it pauses via `interrupt()` and reports approval only if the
 * reviewer's reply is exactly "approve".
 *
 * @param state - Graph state; reads `requiresApproval` and `generatedContent`.
 * @returns A state update of the form `{ approved: boolean }`.
 */
async function approvalNode(state: typeof MyState.State) {
  // Nothing to review: approve automatically, as before.
  if (!state.requiresApproval) {
    return { approved: true };
  }

  // Pause execution for human review.
  // NOTE(review): assumes interrupt() returns the option the reviewer picked
  // once the run is resumed — confirm against the library documentation.
  const decision: string = interrupt({
    message: "Please review the generated content",
    data: state.generatedContent,
    options: ["approve", "reject", "modify"]
  });

  // Previously the reply was discarded and the node reported approval no
  // matter what the reviewer chose; now only an explicit "approve" counts.
  return { approved: decision === "approve" };
}

// Run the graph, then check whether it stopped at an interruption
const result = await graph.invoke(input);
const paused = isInterrupted(result);
if (paused) {
  console.log("Execution paused:", result.value);
  // Handle human interaction
}
```

### Command System

Control flow commands for dynamic routing and state manipulation during graph execution.

```typescript { .api }
/**
 * Command that combines a state update with a routing decision.
 *
 * NOTE(review): the positional constructor shown here should be verified
 * against the installed @langchain/langgraph version — confirm the expected
 * argument shape before relying on it.
 */
class Command<Resume, Update, Nodes> {
  /** Resume value carried by the command. */
  resume: Resume;
  /** State update carried by the command. */
  update: Update;
  /** Routing target(s) carried by the command. */
  goto: Nodes;

  constructor(resume: Resume, update?: Update, goto?: Nodes);
}

/**
 * Command that sends data to a specific node.
 */
class Send<Node, Args> {
  /** Node that receives the data. */
  node: Node;
  /** Data delivered to that node. */
  args: Args;

  constructor(node: Node, args: Args);
}

/**
 * Type guard that narrows `x` to a `Command` instance.
 */
function isCommand(x: unknown): x is Command<any, any, any>;
```

**Usage Examples:**

```typescript
import { Command, Send } from "@langchain/langgraph";
94
95
// Choose the downstream processor (and priority label) from the urgency flag
async function routerNode(state: State): Promise<Command<null, StateUpdate, string[]>> {
  const [priority, target] = state.urgent
    ? ["high", "urgent_processor"]
    : ["normal", "normal_processor"];

  return new Command(null, { priority }, [target]);
}

// Fan the relevant pieces of state out to the logger, notifier and archiver
async function distributorNode(state: State): Promise<Send<string, any>[]> {
  const toLogger = new Send("logger", { message: state.logMessage });
  const toNotifier = new Send("notifier", { alert: state.notification });
  const toArchiver = new Send("archiver", { data: state.dataToArchive });

  return [toLogger, toNotifier, toArchiver];
}
```

### Flow Control Constants

Special node names for controlling graph flow and defining entry and exit points.

```typescript { .api }
/** Identifier of the graph entry point. */
const START: "__start__";

/** Identifier of graph termination. */
const END: "__end__";

/** Identifier used as the interruption marker. */
const INTERRUPT: "__interrupt__";
```

**Usage Example:**

```typescript
import { START, END, INTERRUPT } from "@langchain/langgraph";
138
139
const graph = new StateGraph(annotation)
  .addNode("process", processNode)
  .addNode("validate", validateNode)
  .addEdge(START, "process")
  // After "process": validation wins, then interruption, otherwise finish
  .addConditionalEdges("process", (state) =>
    state.needsValidation ? "validate" : state.shouldInterrupt ? INTERRUPT : END
  )
  .addEdge("validate", END)
  .compile({
    interruptBefore: ["validate"], // Pause before validation
  });
```

## Interrupt Configuration

### Pre/Post Node Interrupts

Configure interruption points before or after specific nodes during graph execution.

```typescript { .api }
/**
 * Options controlling where execution is interrupted and how state is persisted.
 */
interface PregelOptions {
  /** Names of nodes to interrupt before they execute. */
  interruptBefore?: string[];

  /** Names of nodes to interrupt after they execute. */
  interruptAfter?: string[];

  /** Checkpointer used for state persistence. */
  checkpointer?: BaseCheckpointSaver;
}
```

**Usage Examples:**

```typescript
// Pause before the critical nodes and after analysis; state is checkpointed in memory
const checkpointer = new MemorySaver();
const graph = new StateGraph(annotation)
  .addNode("analyze", analyzeNode)
  .addNode("critical_decision", decisionNode)
  .addNode("execute_action", actionNode)
  .compile({
    checkpointer,
    interruptBefore: ["critical_decision", "execute_action"],
    interruptAfter: ["analyze"]
  });

// Execute with automatic interruptions.
// The same thread configuration is used to start and to resume the run.
const threadConfig = { configurable: { thread_id: "session-123" } };

try {
  const result = await graph.invoke(
    { input: "sensitive operation" },
    threadConfig
  );
} catch (error) {
  // Only an interrupt means "paused for review". Any other error is a real
  // failure and is re-thrown instead of being silently swallowed.
  if (!(error instanceof GraphInterrupt)) {
    throw error;
  }

  console.log("Paused for human review at:", error.node);

  // Resume after human approval
  const resumeResult = await graph.invoke(null, threadConfig);
}
```

### Dynamic Interruption

Programmatically trigger interruptions based on runtime conditions.

```typescript
/**
 * Scores the risk of `state.data` and interrupts when the score exceeds 0.8.
 * Returns `{ processed: true, riskScore }` in either case.
 */
async function conditionalInterruptNode(state: State) {
  const riskScore = calculateRisk(state.data);
  const isHighRisk = riskScore > 0.8;

  if (isHighRisk) {
    // Dynamic interruption for high-risk operations
    const reviewRequest = {
      reason: "High risk detected",
      riskScore,
      data: state.data,
      recommendedAction: "manual_review"
    };
    interrupt(reviewRequest);
  }

  return { processed: true, riskScore };
}
```

## Human Interaction Patterns

### Approval Workflows

Common patterns for implementing human approval in automated workflows.

```typescript
// Approval node whose request carries a timeout value
async function approvalWorkflow(state: State) {
  // Pause for human approval
  interrupt({
    type: "approval" as const,
    item: state.pendingItem,
    timeout: 300000, // 5 minutes
    approver: state.assignedApprover
  });

  // This code runs after human responds
  const response = state.humanResponse;
  const approved = response?.approved || false;
  const approvedBy = response?.userId;
  const approvedAt = new Date().toISOString();

  return { approved, approvedBy, approvedAt };
}

/**
 * Multi-level approval.
 *
 * Requests manager sign-off when the amount exceeds 10000 and, in addition,
 * director sign-off when it exceeds 50000. Stops at the first tier that does
 * not approve.
 *
 * @param state - Graph state; reads `request.amount` and `request.department`.
 * @returns `{ fullyApproved: true }` when every required tier approved (or no
 *   tier was required), otherwise `{ fullyApproved: false }`.
 */
async function multiLevelApproval(state: State) {
  const { amount, department } = state.request;

  // Ascending thresholds: every tier whose threshold is exceeded must sign off.
  const tiers = [
    { threshold: 10000, type: "manager_approval", level: "manager" },
    { threshold: 50000, type: "director_approval", level: "director" }
  ];

  for (const tier of tiers) {
    if (!(amount > tier.threshold)) {
      continue;
    }

    // NOTE(review): assumes interrupt() returns the reviewer's reply shaped
    // like { approved: boolean } — confirm against how the run is resumed.
    const reply: { approved?: boolean } | undefined = interrupt({
      type: tier.type,
      amount,
      department,
      level: tier.level
    });

    // Previously no reply was inspected and the node always reported full
    // approval; a tier that does not approve now ends the chain.
    if (!reply?.approved) {
      return { fullyApproved: false };
    }
  }

  return { fullyApproved: true };
}
```

### Input Collection

Patterns for collecting human input during workflow execution.

```typescript
/**
 * Interrupts with a free-text input request, then returns the input found on
 * `state.humanResponse` together with a receipt timestamp.
 */
async function inputCollectionNode(state: State) {
  interrupt({
    type: "input",
    prompt: "Please provide additional context for this request:",
    required: true,
    format: "text",
    maxLength: 500
  });

  const userInput = state.humanResponse?.input;
  const inputReceivedAt = new Date().toISOString();

  return { userInput, inputReceivedAt };
}

// Choice-based input: offer a fixed set of processing options
async function choiceNode(state: State) {
  const options = [
    { value: "auto", label: "Process automatically" },
    { value: "manual", label: "Manual processing" },
    { value: "defer", label: "Defer to later" }
  ];

  interrupt({
    type: "choice",
    prompt: "How should this be processed?",
    options
  });

  return { processingMethod: state.humanResponse?.choice };
}
```

### Progressive Disclosure

Reveal information progressively based on human decisions.

```typescript
/**
 * Interrupts with a basic notice first, and with detailed information only
 * when `state.humanResponse.requestDetails` is truthy.
 */
async function progressiveDisclosure(state: State) {
  // First level - basic info
  interrupt({
    type: "info",
    message: "Processing request...",
    showDetails: false
  });

  const wantsDetails = state.humanResponse?.requestDetails;
  if (wantsDetails) {
    // Second level - detailed info
    const details = {
      processingSteps: state.steps,
      estimatedTime: state.timeEstimate,
      resourcesRequired: state.resources
    };
    interrupt({ type: "detailed_info", details });
  }

  return { disclosed: true };
}
```

## Advanced Control Flow

### Conditional Branching

Complex routing logic with multiple decision points and human involvement.

```typescript
/**
 * Routes a request either by human decision (complex cases) or automatically.
 *
 * When `complexity` exceeds 0.8 the node interrupts with the routing options
 * and routes to the option found on `state.humanResponse.choice`. Otherwise
 * it returns the name of the processor chosen from `priority` and `userType`.
 *
 * @param state - Graph state; reads `priority`, `complexity`, `userType` and
 *   `humanResponse.choice`.
 * @returns A `Command` whose `goto` is a one-element array for human-routed
 *   cases, or a processor name for automated cases.
 * @throws Error when the human choice is missing or not one of the offered
 *   options (previously this produced a `goto` of `[undefined]`).
 */
async function complexRouter(state: State): Promise<Command<null, any, string[]> | string> {
  const { priority, complexity, userType } = state;

  // Human decides on complex cases
  if (complexity > 0.8) {
    const options = ["expert_review", "automated_processing", "escalate"];

    interrupt({
      type: "routing_decision",
      options,
      context: { priority, complexity, userType }
    });

    const decision = state.humanResponse?.choice;
    // Refuse to route to a node that was never offered to the reviewer.
    if (typeof decision !== "string" || !options.includes(decision)) {
      throw new Error(`Invalid routing decision: ${String(decision)}`);
    }

    return new Command(null, { routedBy: "human" }, [decision]);
  }

  // Automated routing for simple cases
  if (priority === "urgent") return "urgent_processor";
  if (userType === "premium") return "premium_processor";
  return "standard_processor";
}
```

### Loop Control

Managing loops and iterations with human oversight.

```typescript
/**
 * Runs one processing iteration. If the result has not converged and the
 * iteration cap (5) is not reached, asks a human whether to continue and loops
 * back to "iterative_processor" on approval; otherwise returns the final result.
 */
async function iterativeProcessor(state: State) {
  const maxIterations = 5;
  let iterations = state.iterations || 0;

  // Process one iteration
  const result = await processIteration(state.data);
  iterations++;

  const mayContinue = !result.converged && iterations < maxIterations;
  if (mayContinue) {
    // Ask human if should continue
    interrupt({
      type: "continue_iteration",
      iteration: iterations,
      maxIterations,
      currentResult: result,
      message: `Iteration ${iterations}/${maxIterations} complete. Continue?`
    });

    if (state.humanResponse?.approved) {
      const update = { iterations, data: result.data };
      return new Command(null, update, ["iterative_processor"]); // Loop back
    }
  }

  return { finalResult: result, iterations, completed: true };
}
```

### Error Recovery

Human-assisted error recovery and decision making.

```typescript
/**
 * When `state.error` is set, interrupts with the error details and recovery
 * options, then acts on `state.humanResponse.choice`. Without an error the
 * state is returned unchanged.
 */
async function errorRecoveryNode(state: State) {
  // No error recorded: pass the state through untouched
  if (!state.error) {
    return state;
  }

  const recoveryOptions = analyzeError(state.error);
  const { message, type, context } = state.error;

  interrupt({
    type: "error_recovery",
    error: { message, type, context },
    recoveryOptions,
    autoRecoveryAttempted: state.autoRecoveryAttempted || false
  });

  const selectedRecovery = state.humanResponse?.choice;

  switch (selectedRecovery) {
    case "retry": {
      const retryCount = (state.retryCount || 0) + 1;
      return new Command(null, { retryCount }, ["retry_node"]);
    }
    case "skip":
      return { skipped: true, reason: "human_decision" };
    case "escalate":
      return new Command(null, { escalated: true }, ["escalation_handler"]);
    case "abort":
      return new Command(null, { aborted: true }, [END]);
    default:
      // Any other (or missing) choice is reported back as an error value
      return { error: "invalid_recovery_option" };
  }
}
```

## Integration with Checkpointing

### Resumable Workflows

Combine interruptions with checkpointing to build reliable, resumable workflows.

```typescript
/**
 * Builds a linear step1 → approval → step2 → final workflow with an in-memory
 * checkpointer, starts it on a fresh thread, and resumes it on the same thread.
 *
 * @returns The result of the resuming `invoke` call.
 * @throws Re-throws any error from the initial run that is not a
 *   `GraphInterrupt`.
 */
async function resumableWorkflow() {
  const graph = new StateGraph(WorkflowState)
    .addNode("step1", step1Node)
    .addNode("approval", approvalNode)
    .addNode("step2", step2Node)
    .addNode("final", finalNode)
    .addEdge(START, "step1")
    .addEdge("step1", "approval")
    .addEdge("approval", "step2")
    .addEdge("step2", "final")
    .addEdge("final", END)
    .compile({
      checkpointer: new MemorySaver(),
      interruptBefore: ["approval", "final"]
    });

  // Start execution; the same thread configuration is reused to resume.
  const threadId = `workflow-${Date.now()}`;
  const threadConfig = { configurable: { thread_id: threadId } };

  try {
    await graph.invoke(initialInput, threadConfig);
  } catch (error) {
    // Only an interrupt means "paused". Any other failure is re-thrown rather
    // than swallowed and followed by a resume of a broken run.
    if (!(error instanceof GraphInterrupt)) {
      throw error;
    }
    console.log(`Workflow paused at ${error.node}, resumable with thread_id: ${threadId}`);
  }

  // Later, resume execution.
  // NOTE(review): two interrupt points are configured ("approval" and
  // "final"), so a single resume presumably pauses again before "final" —
  // confirm whether a second resume is needed.
  const finalResult = await graph.invoke(null, threadConfig);
  return finalResult;
}
```

### State Inspection During Interrupts

Include the current graph state in the interrupt payload so the reviewer can inspect it before deciding.

```typescript
/**
 * Fetches the graph state for `state.threadId`, interrupts with a review
 * payload built from it, and returns the decision found on
 * `state.humanResponse`.
 */
async function inspectAndApprove(state: State) {
  // Get current state for human review
  const threadConfig = { configurable: { thread_id: state.threadId } };
  const snapshot = await graph.getState(threadConfig);

  const reviewPayload = {
    type: "state_review",
    currentState: snapshot.values,
    nextNodes: snapshot.next,
    executionStep: snapshot.metadata.step,
    recommendation: generateRecommendation(snapshot.values)
  };
  interrupt(reviewPayload);

  const humanDecision = state.humanResponse?.decision;
  return { reviewCompleted: true, humanDecision };
}
```