or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actions.mdcheckpoints.mdconfiguration.mdgraph-construction.mdgraph-execution.mdindex.mdprebuilt.mdstate-management.md

actions.mddocs/

0

# Actions and Node Logic

1

2

Define node behavior with synchronous and asynchronous actions. Support for configuration-aware actions, routing logic, and interruption handling.

3

4

## Capabilities

5

6

### Basic Node Actions

7

8

Core action interfaces for defining node behavior.

9

10

```java { .api }

11

/**

12

* Synchronous node action interface

13

* @param <T> Agent state type

14

*/

15

@FunctionalInterface

16

interface NodeAction<T extends AgentState> {

17

/**

18

* Executes synchronous action on agent state

19

* @param state Current agent state

20

* @return Map of state updates to apply

21

* @throws Exception if action execution fails

22

*/

23

Map<String, Object> apply(T state) throws Exception;

24

}

25

26

/**

27

* Asynchronous node action interface

28

* @param <S> Agent state type

29

*/

30

@FunctionalInterface

31

interface AsyncNodeAction<S extends AgentState> extends Function<S, CompletableFuture<Map<String, Object>>> {

32

/**

33

* Executes asynchronous action on agent state

34

* @param state Current agent state

35

* @return CompletableFuture with state updates

36

*/

37

CompletableFuture<Map<String, Object>> apply(S state);

38

39

/**

40

* Converts synchronous action to asynchronous

41

* @param syncAction Synchronous node action

42

* @return Asynchronous wrapper around sync action

43

*/

44

static <S extends AgentState> AsyncNodeAction<S> node_async(NodeAction<S> syncAction);

45

}

46

```

47

48

**Usage Examples:**

49

50

```java

51

// Simple synchronous action

52

NodeAction<MyState> syncAction = (state) -> {

53

String input = state.<String>value("input").orElse("");

54

String processed = input.toUpperCase();

55

return Map.of("output", processed, "processed", true);

56

};

57

58

// Asynchronous action with external API call

59

AsyncNodeAction<MyState> asyncAction = (state) -> {

60

String query = state.<String>value("query").orElse("");

61

62

return CompletableFuture.supplyAsync(() -> {

63

// Simulate API call

64

try {

65

Thread.sleep(1000);

66

String result = "API response for: " + query;

67

return Map.of("api_result", result, "timestamp", System.currentTimeMillis());

68

} catch (InterruptedException e) {

69

throw new RuntimeException(e);

70

}

71

});

72

};

73

74

// Convert sync to async

75

AsyncNodeAction<MyState> wrappedSync = AsyncNodeAction.node_async(syncAction);

76

77

// Add to graph

78

workflow.addNode("sync_processor", syncAction);

79

workflow.addNode("async_processor", asyncAction);

80

workflow.addNode("wrapped_processor", wrappedSync);

81

```

82

83

### Configuration-Aware Actions

84

85

Actions that access runtime configuration for context-aware behavior.

86

87

```java { .api }

88

/**

89

* Asynchronous node action with configuration access

90

* @param <S> Agent state type

91

*/

92

@FunctionalInterface

93

interface AsyncNodeActionWithConfig<S extends AgentState> {

94

/**

95

* Executes action with access to runtime configuration

96

* @param state Current agent state

97

* @param config Runtime configuration

98

* @return CompletableFuture with state updates

99

* @throws Exception if action execution fails

100

*/

101

CompletableFuture<Map<String, Object>> apply(S state, RunnableConfig config) throws Exception;

102

103

/**

104

* Creates config-aware action from simple async action

105

* @param action Simple async action

106

* @return Config-aware action wrapper

107

*/

108

static <S extends AgentState> AsyncNodeActionWithConfig<S> of(AsyncNodeAction<S> action);

109

}

110

```

111

112

**Usage Examples:**

113

114

```java

115

// Configuration-aware action

116

AsyncNodeActionWithConfig<MyState> configAwareAction = (state, config) -> {

117

String threadId = config.threadId().orElse("default");

118

boolean isStudio = config.isRunningInStudio();

119

120

Map<String, Object> updates = new HashMap<>();

121

updates.put("thread_id", threadId);

122

updates.put("is_studio", isStudio);

123

124

// Access metadata

125

Object customValue = config.metadata("custom_key").orElse("default");

126

updates.put("custom_value", customValue);

127

128

return CompletableFuture.completedFuture(updates);

129

};

130

131

// Use thread-specific processing

132

AsyncNodeActionWithConfig<MyState> threadSpecificAction = (state, config) -> {

133

String threadId = config.threadId().orElse("anonymous");

134

135

return CompletableFuture.supplyAsync(() -> {

136

// Process based on thread context

137

String result = "Processed by thread: " + threadId;

138

return Map.of("thread_result", result);

139

});

140

};

141

142

workflow.addNode("config_aware", configAwareAction);

143

workflow.addNode("thread_specific", threadSpecificAction);

144

```

145

146

### Command Actions for Routing

147

148

Actions that return routing commands for conditional flow control.

149

150

```java { .api }

151

/**

152

* Synchronous command action for routing decisions

153

* @param <S> Agent state type

154

*/

155

@FunctionalInterface

156

interface CommandAction<S extends AgentState> {

157

/**

158

* Executes action and returns routing command

159

* @param state Current agent state

160

* @return Command indicating next node and state updates

161

* @throws Exception if action execution fails

162

*/

163

Command apply(S state) throws Exception;

164

}

165

166

/**

167

* Asynchronous command action for routing decisions

168

* @param <S> Agent state type

169

*/

170

@FunctionalInterface

171

interface AsyncCommandAction<S extends AgentState> {

172

/**

173

* Executes action and returns routing command asynchronously

174

* @param state Current agent state

175

* @param config Runtime configuration

176

* @return CompletableFuture with routing command

177

* @throws Exception if action execution fails

178

*/

179

CompletableFuture<Command> apply(S state, RunnableConfig config) throws Exception;

180

181

/**

182

* Creates async command action from edge action

183

* @param edgeAction Edge action to wrap

184

* @return Async command action

185

*/

186

static <S extends AgentState> AsyncCommandAction<S> of(AsyncEdgeAction<S> edgeAction);

187

}

188

189

/**

190

* Command containing routing decision and state updates

191

*/

192

class Command {

193

/**

194

* Get target node ID for routing

195

* @return Node ID to route to

196

*/

197

String gotoNode();

198

199

/**

200

* Get state updates to apply

201

* @return Map of state updates

202

*/

203

Map<String, Object> update();

204

}

205

```

206

207

**Usage Examples:**

208

209

```java

210

// Simple routing based on state value

211

AsyncCommandAction<MyState> routingAction = (state, config) -> {

212

int score = state.<Integer>value("score").orElse(0);

213

214

String route;

215

Map<String, Object> updates = new HashMap<>();

216

217

if (score > 80) {

218

route = "high_score";

219

updates.put("grade", "A");

220

} else if (score > 60) {

221

route = "medium_score";

222

updates.put("grade", "B");

223

} else {

224

route = "low_score";

225

updates.put("grade", "C");

226

}

227

228

Command command = new Command(route, updates);

229

return CompletableFuture.completedFuture(command);

230

};

231

232

// Complex routing with external validation

233

AsyncCommandAction<MyState> validationAction = (state, config) -> {

234

String data = state.<String>value("user_input").orElse("");

235

236

return CompletableFuture.supplyAsync(() -> {

237

// Simulate validation logic

238

boolean isValid = data.length() > 5 && data.matches("^[a-zA-Z0-9]+$");

239

boolean needsReview = data.contains("sensitive");

240

241

String route;

242

Map<String, Object> updates = Map.of("validated_at", System.currentTimeMillis());

243

244

if (!isValid) {

245

route = "validation_failed";

246

} else if (needsReview) {

247

route = "needs_review";

248

} else {

249

route = "validation_passed";

250

}

251

252

return new Command(route, updates);

253

});

254

};

255

256

// Add conditional nodes with routing

257

workflow.addNode("router", routingAction, Map.of(

258

"high_score", "high_score_handler",

259

"medium_score", "medium_score_handler",

260

"low_score", "low_score_handler"

261

));

262

263

workflow.addNode("validator", validationAction, Map.of(

264

"validation_passed", "process_data",

265

"validation_failed", "show_error",

266

"needs_review", "human_review"

267

));

268

```

269

270

### Edge Actions

271

272

Actions specifically for edge condition evaluation.

273

274

```java { .api }

275

/**

276

* Synchronous edge action for routing decisions

277

* @param <S> Agent state type

278

*/

279

@FunctionalInterface

280

interface EdgeAction<S extends AgentState> {

281

/**

282

* Evaluates edge condition and returns target node

283

* @param state Current agent state

284

* @return Target node identifier

285

* @throws Exception if evaluation fails

286

*/

287

String apply(S state) throws Exception;

288

}

289

290

/**

291

* Asynchronous edge action for routing decisions

292

* @param <S> Agent state type

293

*/

294

@FunctionalInterface

295

interface AsyncEdgeAction<S extends AgentState> {

296

/**

297

* Evaluates edge condition asynchronously

298

* @param state Current agent state

299

* @param config Runtime configuration

300

* @return CompletableFuture with target node identifier

301

* @throws Exception if evaluation fails

302

*/

303

CompletableFuture<String> apply(S state, RunnableConfig config) throws Exception;

304

}

305

```

306

307

**Usage Examples:**

308

309

```java

310

// Simple edge condition

311

AsyncEdgeAction<MyState> simpleEdge = (state, config) -> {

312

boolean ready = state.<Boolean>value("ready").orElse(false);

313

String target = ready ? "process" : "wait";

314

return CompletableFuture.completedFuture(target);

315

};

316

317

// Complex edge with external check

318

AsyncEdgeAction<MyState> externalEdge = (state, config) -> {

319

String userId = state.<String>value("user_id").orElse("");

320

321

return CompletableFuture.supplyAsync(() -> {

322

// Simulate external service check

323

boolean hasPermission = userId.startsWith("admin_");

324

return hasPermission ? "admin_flow" : "user_flow";

325

});

326

};

327

328

// Use in conditional edges

329

workflow.addConditionalEdges("decision_point", simpleEdge, Map.of(

330

"process", "data_processor",

331

"wait", "wait_node"

332

));

333

334

workflow.addConditionalEdges("permission_check", externalEdge, Map.of(

335

"admin_flow", "admin_dashboard",

336

"user_flow", "user_dashboard"

337

));

338

```

339

340

### Interruption Handling

341

342

Actions that can be interrupted for human-in-the-loop workflows.

343

344

```java { .api }

345

/**

346

* Action that can be interrupted during execution

347

* @param <State> Agent state type

348

*/

349

interface InterruptableAction<State extends AgentState> {

350

/**

351

* Check if action should be interrupted

352

* @param nodeId Current node identifier

353

* @param state Current state

354

* @return Optional interruption metadata if should interrupt

355

*/

356

Optional<InterruptionMetadata<State>> interrupt(String nodeId, State state);

357

}

358

359

/**

360

* Metadata for interruption events

361

* @param <State> Agent state type

362

*/

363

class InterruptionMetadata<State extends AgentState> {

364

/**

365

* Creates interruption metadata builder

366

* @param nodeId Node being interrupted

367

* @param state Current state

368

* @return Builder for interruption metadata

369

*/

370

static <State extends AgentState> Builder<State> builder(String nodeId, State state);

371

372

String getNodeId();

373

State getState();

374

Optional<String> getReason();

375

Map<String, Object> getContext();

376

}

377

```

378

379

**Usage Examples:**

380

381

```java

382

// Action with interruption logic

383

AsyncNodeActionWithConfig<MyState> interruptibleAction = new AsyncNodeActionWithConfig<MyState>() {

384

@Override

385

public CompletableFuture<Map<String, Object>> apply(MyState state, RunnableConfig config) throws Exception {

386

String data = state.<String>value("data").orElse("");

387

388

return CompletableFuture.supplyAsync(() -> {

389

// Process data

390

String result = processData(data);

391

return Map.of("result", result, "processed", true);

392

});

393

}

394

395

// Implement interruption check if needed

396

public Optional<InterruptionMetadata<MyState>> interrupt(String nodeId, MyState state) {

397

// Check if human review is needed

398

boolean needsReview = state.<Boolean>value("needs_human_review").orElse(false);

399

400

if (needsReview) {

401

return Optional.of(

402

InterruptionMetadata.<MyState>builder(nodeId, state)

403

.reason("Human review required")

404

.context(Map.of("review_type", "data_validation"))

405

.build()

406

);

407

}

408

409

return Optional.empty();

410

}

411

412

private String processData(String data) {

413

// Simulate data processing

414

return "Processed: " + data;

415

}

416

};

417

418

// Use with interruption configuration

419

CompileConfig config = CompileConfig.builder()

420

.checkpointSaver(new MemorySaver())

421

.interruptAfter("review_node")

422

.build();

423

424

CompiledGraph<MyState> app = workflow.compile(config);

425

```

426

427

### Parallel Node Execution

428

429

Special handling for nodes that execute multiple actions in parallel.

430

431

```java { .api }

432

// Parallel execution is handled automatically when multiple edges

433

// target the same node from a single source

434

435

// Example: Create parallel branches

436

workflow.addNode("parallel_task_1", asyncAction1);

437

workflow.addNode("parallel_task_2", asyncAction2);

438

workflow.addNode("parallel_task_3", asyncAction3);

439

workflow.addNode("join_node", joinAction);

440

441

// These edges create parallel execution

442

workflow.addEdge("start", "parallel_task_1");

443

workflow.addEdge("start", "parallel_task_2");

444

workflow.addEdge("start", "parallel_task_3");

445

446

// Join results

447

workflow.addEdge("parallel_task_1", "join_node");

448

workflow.addEdge("parallel_task_2", "join_node");

449

workflow.addEdge("parallel_task_3", "join_node");

450

451

// Add custom executor for parallel node control

452

RunnableConfig configWithExecutor = RunnableConfig.builder()

453

.addParallelNodeExecutor("start", Executors.newFixedThreadPool(3))

454

.build();

455

```

456

457

## Action Patterns

458

459

### Stateful Actions

460

461

Actions that maintain internal state across invocations.

462

463

```java { .api }

464

class CounterAction implements AsyncNodeAction<MyState> {

465

private final AtomicInteger counter = new AtomicInteger(0);

466

467

@Override

468

public CompletableFuture<Map<String, Object>> apply(MyState state) {

469

int currentCount = counter.incrementAndGet();

470

return CompletableFuture.completedFuture(

471

Map.of("action_count", currentCount)

472

);

473

}

474

}

475

476

// Use stateful action

477

workflow.addNode("counter", new CounterAction());

478

```

479

480

### Error Handling Actions

481

482

Actions with comprehensive error handling and recovery.

483

484

```java { .api }

485

AsyncNodeAction<MyState> resilientAction = (state) -> {

486

return CompletableFuture.supplyAsync(() -> {

487

try {

488

// Risky operation

489

String result = performRiskyOperation(state);

490

return Map.of("result", result, "success", true);

491

492

} catch (Exception e) {

493

// Log error and return error state

494

return Map.of(

495

"error", e.getMessage(),

496

"success", false,

497

"retry_count", state.<Integer>value("retry_count").orElse(0) + 1

498

);

499

}

500

});

501

};

502

503

private String performRiskyOperation(MyState state) throws Exception {

504

// Simulate operation that might fail

505

if (Math.random() < 0.3) {

506

throw new RuntimeException("Random failure");

507

}

508

return "Success";

509

}

510

```

511

512

### Composite Actions

513

514

Actions that combine multiple operations.

515

516

```java { .api }

517

AsyncNodeAction<MyState> compositeAction = (state) -> {

518

// Chain multiple async operations

519

return validateInput(state)

520

.thenCompose(this::processData)

521

.thenCompose(this::saveResults)

522

.thenApply(result -> Map.of("final_result", result));

523

};

524

525

private CompletableFuture<String> validateInput(MyState state) {

526

String input = state.<String>value("input").orElse("");

527

return CompletableFuture.completedFuture(input);

528

}

529

530

private CompletableFuture<String> processData(String input) {

531

return CompletableFuture.supplyAsync(() -> "Processed: " + input);

532

}

533

534

private CompletableFuture<String> saveResults(String processed) {

535

return CompletableFuture.supplyAsync(() -> {

536

// Simulate save operation

537

return "Saved: " + processed;

538

});

539

}

540

```