LangChain4j Agentic Framework provides a comprehensive Java library for building multi-agent AI systems with support for workflow orchestration, supervisor agents, planning-based execution, declarative configuration, agent-to-agent communication, and human-in-the-loop workflows.
Execute multiple agents concurrently with configurable thread pool execution, aggregating results from all parallel branches.
→ Declarative @ParallelAgent Reference
import dev.langchain4j.agentic.AgenticServices;
UntypedAgent parallel = AgenticServices.parallelBuilder()
.subAgents(validator1, validator2, validator3)
.threadPoolSize(4)
.name("multi-validator")
.build();
Object result = parallel.invoke("Validate this data");
Parallel workflows are ideal for:
/**
* Create untyped parallel agent builder
* @return ParallelAgentService for building parallel workflows
*/
static ParallelAgentService<UntypedAgent> parallelBuilder();
/**
* Create typed parallel agent builder
* @param agentServiceClass Agent interface class
* @return ParallelAgentService for building typed parallel workflows
*/
static <T> ParallelAgentService<T> parallelBuilder(Class<T> agentServiceClass);
Usage Examples:
import dev.langchain4j.agentic.AgenticServices;
import dev.langchain4j.agentic.UntypedAgent;
// Create untyped parallel workflow
UntypedAgent parallel = AgenticServices.parallelBuilder()
.subAgents(agent1, agent2, agent3)
.name("parallel-processor")
.description("Process data through parallel agents")
.build();
Object result = parallel.invoke("Process this data");
// Create typed parallel workflow
/**
 * Typed agent interface for the example: its class is passed to
 * {@code AgenticServices.parallelBuilder(Class)} so the built workflow is
 * exposed through this interface rather than as an {@code UntypedAgent}.
 */
interface ParallelProcessor {
/**
 * Invokes the parallel workflow.
 * @param input Text handed to the workflow
 * @return Workflow output as a list of strings (how sub-agent results are mapped onto the list is not shown in this example)
 */
List<String> process(String input);
}
ParallelProcessor processor = AgenticServices.parallelBuilder(ParallelProcessor.class)
.subAgents(validator, enricher, analyzer)
.build();
Complete configuration interface with executor management.
/**
* Service for parallel agent workflows
* Extends AgenticService with executor configuration
*/
interface ParallelAgentService<T> extends AgenticService<ParallelAgentService<T>, T> {
/**
* Set custom executor service for parallel execution
* @param executor ExecutorService to manage parallel tasks
* @return Builder for chaining
*/
ParallelAgentService<T> executor(ExecutorService executor);
/**
* Set thread pool size for default executor
* @param threadPoolSize Number of threads in the pool
* @return Builder for chaining
*/
ParallelAgentService<T> threadPoolSize(int threadPoolSize);
// Inherits methods from AgenticService:
// - T build()
// - ParallelAgentService<T> subAgents(Object... agents)
// - ParallelAgentService<T> subAgents(List<AgentExecutor> agentExecutors)
// - ParallelAgentService<T> beforeCall(Consumer<AgenticScope> beforeCall)
// - ParallelAgentService<T> name(String name)
// - ParallelAgentService<T> description(String description)
// - ParallelAgentService<T> outputKey(String outputKey)
// - ParallelAgentService<T> output(Function<AgenticScope, Object> output)
// - ParallelAgentService<T> errorHandler(Function<ErrorContext, ErrorRecoveryResult> errorHandler)
// - ParallelAgentService<T> listener(AgentListener listener)
}
Usage Examples:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// Parallel workflow with custom executor
// Fixed pool of 10 threads created by the caller and handed to the workflow below.
// NOTE(review): nothing in this example shuts the pool down; presumably the caller
// owns it and must call customExecutor.shutdown() once the agent is no longer used — confirm.
ExecutorService customExecutor = Executors.newFixedThreadPool(10);
UntypedAgent parallel = AgenticServices.parallelBuilder()
.subAgents(agent1, agent2, agent3, agent4)
.executor(customExecutor)
.name("parallel-analyzer")
.description("Analyze data through multiple parallel agents")
.outputKey("parallel_results")
.build();
// Parallel workflow with thread pool size
UntypedAgent parallel = AgenticServices.parallelBuilder()
    .subAgents(agent1, agent2, agent3)
    .threadPoolSize(5)
    .name("parallel-validator")
    // Record the start time before the sub-agents run so output() can report it.
    .beforeCall(scope -> scope.writeState("start_time", System.currentTimeMillis()))
    .output(scope -> {
        // Aggregate results from all parallel agents.
        // readState may return null (or a non-list value) if nothing wrote this key;
        // fall back to an empty list instead of failing on results.size().
        Object stored = scope.readState("parallel_results");
        List<?> results = stored instanceof List ? (List<?>) stored : List.of();
        return Map.of(
            "count", results.size(),
            "results", results,
            "timestamp", scope.readState("start_time")
        );
    })
    .build();
In parallel workflows:
Example Flow:
// Agent definitions
UntypedAgent sentimentAnalyzer = AgenticServices.agentBuilder()
.chatModel(chatModel)
.name("sentiment-analyzer")
.outputKey("sentiment")
.build();
UntypedAgent entityExtractor = AgenticServices.agentBuilder()
.chatModel(chatModel)
.name("entity-extractor")
.outputKey("entities")
.build();
UntypedAgent categoryClassifier = AgenticServices.agentBuilder()
.chatModel(chatModel)
.name("category-classifier")
.outputKey("categories")
.build();
UntypedAgent languageDetector = AgenticServices.agentBuilder()
.chatModel(chatModel)
.name("language-detector")
.outputKey("language")
.build();
// Parallel workflow - all agents execute concurrently
UntypedAgent analyzer = AgenticServices.parallelBuilder()
    .subAgents(sentimentAnalyzer, entityExtractor, categoryClassifier, languageDetector)
    .name("text-analyzer")
    .threadPoolSize(4)
    .output(scope -> {
        // Aggregate all analysis results under the sub-agents' output keys.
        // Map.of(...) throws NullPointerException on a null value, so a sub-agent
        // that produced nothing would break the whole aggregation. A LinkedHashMap
        // keeps all four keys and tolerates a null entry; the unmodifiable wrapper
        // preserves the read-only nature of the returned map.
        Map<String, Object> analysis = new java.util.LinkedHashMap<>();
        for (String key : List.of("sentiment", "entities", "categories", "language")) {
            analysis.put(key, scope.readState(key));
        }
        return java.util.Collections.unmodifiableMap(analysis);
    })
    .build();
// Execute parallel analysis
Object result = analyzer.invoke("Analyze this customer feedback text");
Define parallel workflows using the @ParallelAgent annotation.
Usage Examples:
/**
 * Declarative definition of a parallel validation workflow. The single method
 * is annotated with {@code @ParallelAgent}, which names the workflow and lists
 * the sub-agent interfaces to run.
 */
interface DataValidationSystem {
// Parallel workflow declaration: four sub-agent interfaces and a thread pool of size 4.
// NOTE(review): SecurityValidator and PerformanceValidator are not defined in this excerpt.
// NOTE(review): outputKey presumably names the scope entry that holds the workflow result — confirm.
@ParallelAgent(
name = "multi-validator",
description = "Validate data through multiple validators in parallel",
outputKey = "validation_results",
threadPoolSize = 4,
subAgents = {
SchemaValidator.class,
BusinessRuleValidator.class,
SecurityValidator.class,
PerformanceValidator.class
}
)
List<ValidationResult> validate(String data);
}
/**
 * Sub-agent interface listed in {@code DataValidationSystem}'s {@code subAgents}.
 * Declared with agent name "schema-validator" and output key "schema_valid".
 */
interface SchemaValidator {
@Agent(name = "schema-validator", outputKey = "schema_valid")
ValidationResult validateSchema(String data);
}
/**
 * Sub-agent interface listed in {@code DataValidationSystem}'s {@code subAgents}.
 * Declared with agent name "business-validator" and output key "business_valid".
 */
interface BusinessRuleValidator {
@Agent(name = "business-validator", outputKey = "business_valid")
ValidationResult validateBusinessRules(String data);
}
DataValidationSystem system = AgenticServices.createAgenticSystem(
DataValidationSystem.class,
chatModel
);
List<ValidationResult> results = system.validate(jsonData);
Configure thread pool execution for parallel workflows.
/**
* Set custom executor service for parallel execution
* @param executor ExecutorService to manage parallel tasks
* @return Builder for chaining
*/
ParallelAgentService<T> executor(ExecutorService executor);
/**
* Set thread pool size for default executor
* @param threadPoolSize Number of threads in the pool
* @return Builder for chaining
*/
ParallelAgentService<T> threadPoolSize(int threadPoolSize);
Usage Examples:
import java.util.concurrent.*;
// Using custom ExecutorService
// Pool with 4 core threads that may grow to 10; idle threads above the core size
// are retired after 60 seconds. With a bounded queue, ThreadPoolExecutor only starts
// threads beyond the core size once the queue is full, and rejects new tasks
// (RejectedExecutionException with the default handler) when queue and pool are both saturated.
ExecutorService customExecutor = new ThreadPoolExecutor(
4, // core pool size
10, // maximum pool size
60L, // keep-alive time for idle threads above the core size
TimeUnit.SECONDS, // unit of the keep-alive time
new LinkedBlockingQueue<>(100) // bounded work queue: at most 100 pending tasks
);
UntypedAgent parallel = AgenticServices.parallelBuilder()
.subAgents(agent1, agent2, agent3)
.executor(customExecutor)
.build();
// Using thread pool size (creates fixed thread pool)
UntypedAgent parallel = AgenticServices.parallelBuilder()
.subAgents(agent1, agent2, agent3, agent4, agent5)
.threadPoolSize(5) // Creates fixed thread pool with 5 threads
.build();
// Default behavior (no executor specified)
// Uses ForkJoinPool.commonPool() for parallel execution
UntypedAgent parallel = AgenticServices.parallelBuilder()
.subAgents(agent1, agent2, agent3)
.build();
Customize how results from parallel agents are aggregated.
/**
* Set custom output function for aggregating parallel results
* @param output Function receiving AgenticScope and returning final output
* @return Builder for chaining
*/
ParallelAgentService<T> output(Function<AgenticScope, Object> output);
Usage Examples:
// Aggregate results into custom structure
UntypedAgent parallel = AgenticServices.parallelBuilder()
.subAgents(priceChecker, inventoryChecker, shippingChecker)
.output(scope -> {
// Each value is read from the scope key of the same name and cast to its expected type.
// NOTE(review): unboxing into double/int throws NullPointerException if readState
// returns null, and the casts throw ClassCastException for any other stored type —
// confirm every checker always writes its key with exactly these types.
double price = (Double) scope.readState("price");
int inventory = (Integer) scope.readState("inventory");
String shipping = (String) scope.readState("shipping");
return new OrderAvailability(price, inventory, shipping);
})
.build();
// Merge and deduplicate results
UntypedAgent parallel = AgenticServices.parallelBuilder()
    .subAgents(source1, source2, source3)
    .output(scope -> {
        // Union of every source's list; the set drops duplicates across sources.
        Set<String> merged = new HashSet<>();
        for (String stateKey : List.of("source1_results", "source2_results", "source3_results")) {
            List<String> partial = (List<String>) scope.readState(stateKey);
            // Skip keys whose state is null.
            if (partial != null) {
                merged.addAll(partial);
            }
        }
        return new ArrayList<>(merged);
    })
    .build();
// Calculate aggregate metrics
UntypedAgent parallel = AgenticServices.parallelBuilder()
.subAgents(scorer1, scorer2, scorer3)
.output(scope -> {
double score1 = (Double) scope.readState("score1");
double score2 = (Double) scope.readState("score2");
double score3 = (Double) scope.readState("score3");
double avgScore = (score1 + score2 + score3) / 3.0;
double maxScore = Math.max(score1, Math.max(score2, score3));
return Map.of(
"average", avgScore,
"maximum", maxScore,
"individual", List.of(score1, score2, score3)
);
})
.build();
Handle errors during parallel execution with per-agent error handling.
/**
* Set error handler for parallel workflow
* @param errorHandler Function receiving ErrorContext and returning ErrorRecoveryResult
* @return Builder for chaining
*/
ParallelAgentService<T> errorHandler(Function<ErrorContext, ErrorRecoveryResult> errorHandler);
Usage Examples:
// Error handler that inspects the failure type and returns a different recovery result per case.
// NOTE(review): TimeoutException is java.util.concurrent.TimeoutException and is not imported in this snippet.
UntypedAgent parallel = AgenticServices.parallelBuilder()
.subAgents(agent1, agent2, agent3, agent4)
.errorHandler(errorContext -> {
Throwable error = errorContext.error();
AgenticScope scope = errorContext.agenticScope();
// Log the failure message (the failing agent's identity is not part of this output)
System.err.println("Agent failed: " + error.getMessage());
// Continue with other agents even if one fails; the timeout is also flagged in the scope
if (error instanceof TimeoutException) {
scope.writeState("timeout_occurred", true);
return new ErrorRecoveryResult("timeout", true);
}
// For critical errors, stop all parallel execution
if (error instanceof SecurityException) {
return new ErrorRecoveryResult(null, false);
}
// Default: continue with partial results
return new ErrorRecoveryResult(null, true);
})
.build();
// Error handling with partial results
UntypedAgent parallel = AgenticServices.parallelBuilder()
.subAgents(validator1, validator2, validator3)
.errorHandler(errorContext -> {
// Allow partial validation results
return new ErrorRecoveryResult("validation_skipped", true);
})
.output(scope -> {
List<Object> results = new ArrayList<>();
// Collect successful results
if (scope.readState("validator1_result") != null) {
results.add(scope.readState("validator1_result"));
}
if (scope.readState("validator2_result") != null) {
results.add(scope.readState("validator2_result"));
}
if (scope.readState("validator3_result") != null) {
results.add(scope.readState("validator3_result"));
}
return Map.of(
"successful_validations", results.size(),
"results", results
);
})
.build();
When using parallel workflows, AgenticScope is thread-safe for concurrent access.
// Thread-safe state access
UntypedAgent parallel = AgenticServices.parallelBuilder()
.subAgents(agent1, agent2, agent3)
.beforeCall(scope -> {
// Initialize shared state before parallel execution
scope.writeState("shared_config", loadConfiguration());
})
.build();
// Each parallel agent can safely read and write to AgenticScope
UntypedAgent parallelAgent = AgenticServices.agentBuilder()
.chatModel(chatModel)
.beforeCall(scope -> {
// Read shared state (thread-safe)
Config config = (Config) scope.readState("shared_config");
// Write agent-specific results (thread-safe)
scope.writeState(agentName + "_result", processWithConfig(config));
})
.build();
Install with Tessl CLI
npx tessl i tessl/maven-dev-langchain4j--langchain4j-agentic@1.11.0
docs
declarative
A2AClientAgent
ActivationCondition
Agent
ConditionalAgent
ErrorHandler
ExitCondition
HumanInTheLoop
HumanInTheLoopResponseSupplier
LoopAgent
LoopCounter
Output
ParallelAgent
ParallelExecutor
PlannerAgent
SequenceAgent
SupervisorAgent
SupervisorRequest
quick-start
workflows