tessl/maven-dev-langchain4j--langchain4j-agentic

LangChain4j Agentic Framework provides a comprehensive Java library for building multi-agent AI systems with support for workflow orchestration, supervisor agents, planning-based execution, declarative configuration, agent-to-agent communication, and human-in-the-loop workflows.

Parallel Workflows

Execute multiple agents concurrently with configurable thread pool execution, aggregating results from all parallel branches.

Quick Start

import dev.langchain4j.agentic.AgenticServices;
import dev.langchain4j.agentic.UntypedAgent;

UntypedAgent parallel = AgenticServices.parallelBuilder()
    .subAgents(validator1, validator2, validator3)
    .threadPoolSize(4)
    .name("multi-validator")
    .build();

Object result = parallel.invoke("Validate this data");

When to Use Parallel Workflows

Parallel workflows are ideal for:

  • Independent tasks that can run simultaneously
  • Validation from multiple sources for comprehensive checks
  • Aggregating results from parallel operations
  • Maximizing throughput and reducing latency
  • Fan-out operations where the same input goes to multiple processors

Creating Parallel Workflows

Factory Methods

/**
 * Create untyped parallel agent builder
 * @return ParallelAgentService for building parallel workflows
 */
static ParallelAgentService<UntypedAgent> parallelBuilder();

/**
 * Create typed parallel agent builder
 * @param agentServiceClass Agent interface class
 * @return ParallelAgentService for building typed parallel workflows
 */
static <T> ParallelAgentService<T> parallelBuilder(Class<T> agentServiceClass);

Usage Examples:

import dev.langchain4j.agentic.AgenticServices;
import dev.langchain4j.agentic.UntypedAgent;
import java.util.List;

// Create untyped parallel workflow
UntypedAgent parallel = AgenticServices.parallelBuilder()
    .subAgents(agent1, agent2, agent3)
    .name("parallel-processor")
    .description("Process data through parallel agents")
    .build();

Object result = parallel.invoke("Process this data");

// Create typed parallel workflow
interface ParallelProcessor {
    List<String> process(String input);
}

ParallelProcessor processor = AgenticServices.parallelBuilder(ParallelProcessor.class)
    .subAgents(validator, enricher, analyzer)
    .build();

ParallelAgentService Interface

Complete configuration interface with executor management.

/**
 * Service for parallel agent workflows
 * Extends AgenticService with executor configuration
 */
interface ParallelAgentService<T> extends AgenticService<ParallelAgentService<T>, T> {

    /**
     * Set custom executor service for parallel execution
     * @param executor ExecutorService to manage parallel tasks
     * @return Builder for chaining
     */
    ParallelAgentService<T> executor(ExecutorService executor);

    /**
     * Set thread pool size for default executor
     * @param threadPoolSize Number of threads in the pool
     * @return Builder for chaining
     */
    ParallelAgentService<T> threadPoolSize(int threadPoolSize);

    // Inherits methods from AgenticService:
    // - T build()
    // - ParallelAgentService<T> subAgents(Object... agents)
    // - ParallelAgentService<T> subAgents(List<AgentExecutor> agentExecutors)
    // - ParallelAgentService<T> beforeCall(Consumer<AgenticScope> beforeCall)
    // - ParallelAgentService<T> name(String name)
    // - ParallelAgentService<T> description(String description)
    // - ParallelAgentService<T> outputKey(String outputKey)
    // - ParallelAgentService<T> output(Function<AgenticScope, Object> output)
    // - ParallelAgentService<T> errorHandler(Function<ErrorContext, ErrorRecoveryResult> errorHandler)
    // - ParallelAgentService<T> listener(AgentListener listener)
}

Usage Examples:

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Parallel workflow with custom executor
ExecutorService customExecutor = Executors.newFixedThreadPool(10);

UntypedAgent parallel = AgenticServices.parallelBuilder()
    .subAgents(agent1, agent2, agent3, agent4)
    .executor(customExecutor)
    .name("parallel-analyzer")
    .description("Analyze data through multiple parallel agents")
    .outputKey("parallel_results")
    .build();

// Parallel workflow with thread pool size
UntypedAgent sizedParallel = AgenticServices.parallelBuilder()
    .subAgents(agent1, agent2, agent3)
    .threadPoolSize(5)
    .name("parallel-validator")
    .beforeCall(scope -> {
        scope.writeState("start_time", System.currentTimeMillis());
    })
    .output(scope -> {
        // Aggregate results from all parallel agents.
        // Assumes "parallel_results" holds a List; the cast is unchecked.
        @SuppressWarnings("unchecked")
        List<Object> results = (List<Object>) scope.readState("parallel_results");
        return Map.of(
            "count", results.size(),
            "results", results,
            "timestamp", scope.readState("start_time")
        );
    })
    .build();

Parallel Execution Flow

In parallel workflows:

  1. All agents are invoked concurrently with the same initial input
  2. Each agent executes independently in its own thread
  3. All agents have access to the shared AgenticScope for reading/writing state
  4. The workflow waits for all agents to complete (or fail)
  5. Results from all agents are aggregated into a list
  6. The aggregated results (or custom output function result) are returned
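
The steps above can be sketched with plain java.util.concurrent, independent of the library. This is a minimal illustration of the fan-out and wait-for-all pattern, not the library's implementation: FanOutSketch and fanOut are hypothetical names, and plain functions stand in for sub-agents.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical stand-in for a parallel workflow; not part of langchain4j-agentic.
class FanOutSketch {

    /** Sends the same input to every worker concurrently; results come back in worker order. */
    static List<String> fanOut(String input, List<Function<String, String>> workers, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (Function<String, String> worker : workers) {
                tasks.add(() -> worker.apply(input)); // step 1: same input for every worker
            }
            List<String> results = new ArrayList<>();
            // invokeAll blocks until every task has completed or failed (step 4)
            for (Future<String> future : pool.invokeAll(tasks)) {
                results.add(future.get()); // step 5: collect into a list
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a worker failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Function<String, String>> workers = List.of(
            s -> "sentiment(" + s + ")",
            s -> "entities(" + s + ")",
            s -> "language(" + s + ")");
        System.out.println(fanOut("hello", workers, 3));
    }
}
```

Because invokeAll returns futures in the order the tasks were submitted, the result list lines up with the worker list even though the workers finish in arbitrary order.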

Example Flow:

// Agent definitions
UntypedAgent sentimentAnalyzer = AgenticServices.agentBuilder()
    .chatModel(chatModel)
    .name("sentiment-analyzer")
    .outputKey("sentiment")
    .build();

UntypedAgent entityExtractor = AgenticServices.agentBuilder()
    .chatModel(chatModel)
    .name("entity-extractor")
    .outputKey("entities")
    .build();

UntypedAgent categoryClassifier = AgenticServices.agentBuilder()
    .chatModel(chatModel)
    .name("category-classifier")
    .outputKey("categories")
    .build();

UntypedAgent languageDetector = AgenticServices.agentBuilder()
    .chatModel(chatModel)
    .name("language-detector")
    .outputKey("language")
    .build();

// Parallel workflow - all agents execute concurrently
UntypedAgent analyzer = AgenticServices.parallelBuilder()
    .subAgents(sentimentAnalyzer, entityExtractor, categoryClassifier, languageDetector)
    .name("text-analyzer")
    .threadPoolSize(4)
    .output(scope -> {
        // Aggregate all analysis results
        return Map.of(
            "sentiment", scope.readState("sentiment"),
            "entities", scope.readState("entities"),
            "categories", scope.readState("categories"),
            "language", scope.readState("language")
        );
    })
    .build();

// Execute parallel analysis
Object result = analyzer.invoke("Analyze this customer feedback text");

Declarative Parallel Agents

Define parallel workflows using the @ParallelAgent annotation.

Usage Examples:

interface DataValidationSystem {
    // Parallel workflow declaration
    @ParallelAgent(
        name = "multi-validator",
        description = "Validate data through multiple validators in parallel",
        outputKey = "validation_results",
        threadPoolSize = 4,
        subAgents = {
            SchemaValidator.class,
            BusinessRuleValidator.class,
            SecurityValidator.class,
            PerformanceValidator.class
        }
    )
    List<ValidationResult> validate(String data);
}

interface SchemaValidator {
    @Agent(name = "schema-validator", outputKey = "schema_valid")
    ValidationResult validateSchema(String data);
}

interface BusinessRuleValidator {
    @Agent(name = "business-validator", outputKey = "business_valid")
    ValidationResult validateBusinessRules(String data);
}

// SecurityValidator and PerformanceValidator follow the same pattern (omitted here)

DataValidationSystem system = AgenticServices.createAgenticSystem(
    DataValidationSystem.class,
    chatModel
);

List<ValidationResult> results = system.validate(jsonData);

Executor Configuration

Configure thread pool execution for parallel workflows.

/**
 * Set custom executor service for parallel execution
 * @param executor ExecutorService to manage parallel tasks
 * @return Builder for chaining
 */
ParallelAgentService<T> executor(ExecutorService executor);

/**
 * Set thread pool size for default executor
 * @param threadPoolSize Number of threads in the pool
 * @return Builder for chaining
 */
ParallelAgentService<T> threadPoolSize(int threadPoolSize);

Usage Examples:

import java.util.concurrent.*;

// Using custom ExecutorService
ExecutorService customExecutor = new ThreadPoolExecutor(
    4,                      // core pool size
    10,                     // maximum pool size
    60L,                    // keep alive time
    TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(100)
);

UntypedAgent parallel = AgenticServices.parallelBuilder()
    .subAgents(agent1, agent2, agent3)
    .executor(customExecutor)
    .build();

// Using thread pool size (creates fixed thread pool)
UntypedAgent fixedPoolParallel = AgenticServices.parallelBuilder()
    .subAgents(agent1, agent2, agent3, agent4, agent5)
    .threadPoolSize(5)  // Creates fixed thread pool with 5 threads
    .build();

// Default behavior (no executor specified)
// Uses ForkJoinPool.commonPool() for parallel execution
UntypedAgent defaultParallel = AgenticServices.parallelBuilder()
    .subAgents(agent1, agent2, agent3)
    .build();
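
A note on the custom ThreadPoolExecutor above: with a bounded queue, the standard java.util.concurrent behavior is that threads beyond the core size are only created once the queue is full, and further tasks are rejected once both the queue and the maximum pool size are exhausted. The sketch below demonstrates this with a small pool; BoundedPoolSketch and acceptedTasks are hypothetical names used only for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; demonstrates standard ThreadPoolExecutor saturation behavior.
class BoundedPoolSketch {

    /** Returns how many of `submitted` blocking tasks the pool accepts before rejecting. */
    static int acceptedTasks(int core, int max, int queueCapacity, int submitted) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            core, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < submitted; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // hold the thread so the pool stays saturated
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    accepted++;
                } catch (RejectedExecutionException e) {
                    // Default AbortPolicy: queue is full and all `max` threads are busy
                }
            }
            return accepted;
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // 1 core thread + 1 queued task + 1 extra thread up to max = 3 accepted
        System.out.println(acceptedTasks(1, 2, 1, 5));
    }
}
```

By the same rule, the (4, 10, 100) configuration shown earlier holds at most 110 long-running tasks at once before the default rejection policy throws, so a workflow with many slow sub-agents may need a larger queue or a different rejection policy.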

Custom Output Aggregation

Customize how results from parallel agents are aggregated.

/**
 * Set custom output function for aggregating parallel results
 * @param output Function receiving AgenticScope and returning final output
 * @return Builder for chaining
 */
ParallelAgentService<T> output(Function<AgenticScope, Object> output);

Usage Examples:

// Aggregate results into custom structure
UntypedAgent parallel = AgenticServices.parallelBuilder()
    .subAgents(priceChecker, inventoryChecker, shippingChecker)
    .output(scope -> {
        double price = (Double) scope.readState("price");
        int inventory = (Integer) scope.readState("inventory");
        String shipping = (String) scope.readState("shipping");

        return new OrderAvailability(price, inventory, shipping);
    })
    .build();

// Merge and deduplicate results
UntypedAgent mergingParallel = AgenticServices.parallelBuilder()
    .subAgents(source1, source2, source3)
    .output(scope -> {
        Set<String> allResults = new HashSet<>();

        List<String> results1 = (List<String>) scope.readState("source1_results");
        List<String> results2 = (List<String>) scope.readState("source2_results");
        List<String> results3 = (List<String>) scope.readState("source3_results");

        if (results1 != null) allResults.addAll(results1);
        if (results2 != null) allResults.addAll(results2);
        if (results3 != null) allResults.addAll(results3);

        return new ArrayList<>(allResults);
    })
    .build();

// Calculate aggregate metrics
UntypedAgent scoringParallel = AgenticServices.parallelBuilder()
    .subAgents(scorer1, scorer2, scorer3)
    .output(scope -> {
        double score1 = (Double) scope.readState("score1");
        double score2 = (Double) scope.readState("score2");
        double score3 = (Double) scope.readState("score3");

        double avgScore = (score1 + score2 + score3) / 3.0;
        double maxScore = Math.max(score1, Math.max(score2, score3));

        return Map.of(
            "average", avgScore,
            "maximum", maxScore,
            "individual", List.of(score1, score2, score3)
        );
    })
    .build();
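
One pitfall in aggregation functions like these: Map.of rejects null values, and unboxing a null (for example, casting a missing state entry to Double and assigning it to a double) throws a NullPointerException. If a sub-agent may not have written its key, a null-tolerant collection step is safer. The sketch below uses a plain Map as a stand-in for the scope's state; NullSafeAggregation and collect are hypothetical names, not library API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; a plain Map stands in for state read from the scope.
class NullSafeAggregation {

    /** Copies the requested keys from `state`, substituting `fallback` for missing entries. */
    static Map<String, Object> collect(Map<String, Object> state, Object fallback, String... keys) {
        Map<String, Object> out = new LinkedHashMap<>(); // keeps the key order stable
        for (String key : keys) {
            Object value = state.get(key);
            out.put(key, value != null ? value : fallback);
        }
        return out;
    }

    /** Shows that Map.of throws NullPointerException when given a null value. */
    static boolean mapOfRejectsNull() {
        try {
            Map.of("language", (Object) null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }
}
```

Inside an output function, the same idea applies: read each key, substitute a default or skip the entry when it is null, and only then build the final structure.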

Error Handling

Handle failures of individual agents during parallel execution with an error handler.

/**
 * Set error handler for parallel workflow
 * @param errorHandler Function receiving ErrorContext and returning ErrorRecoveryResult
 * @return Builder for chaining
 */
ParallelAgentService<T> errorHandler(Function<ErrorContext, ErrorRecoveryResult> errorHandler);

Usage Examples:

import java.util.concurrent.TimeoutException;

UntypedAgent parallel = AgenticServices.parallelBuilder()
    .subAgents(agent1, agent2, agent3, agent4)
    .errorHandler(errorContext -> {
        Throwable error = errorContext.error();
        AgenticScope scope = errorContext.agenticScope();

        // Log the failure message
        System.err.println("Agent failed: " + error.getMessage());

        // Continue with other agents even if one fails
        if (error instanceof TimeoutException) {
            scope.writeState("timeout_occurred", true);
            return new ErrorRecoveryResult("timeout", true);
        }

        // For critical errors, stop all parallel execution
        if (error instanceof SecurityException) {
            return new ErrorRecoveryResult(null, false);
        }

        // Default: continue with partial results
        return new ErrorRecoveryResult(null, true);
    })
    .build();

// Error handling with partial results
UntypedAgent lenientParallel = AgenticServices.parallelBuilder()
    .subAgents(validator1, validator2, validator3)
    .errorHandler(errorContext -> {
        // Allow partial validation results
        return new ErrorRecoveryResult("validation_skipped", true);
    })
    .output(scope -> {
        List<Object> results = new ArrayList<>();

        // Collect successful results
        if (scope.readState("validator1_result") != null) {
            results.add(scope.readState("validator1_result"));
        }
        if (scope.readState("validator2_result") != null) {
            results.add(scope.readState("validator2_result"));
        }
        if (scope.readState("validator3_result") != null) {
            results.add(scope.readState("validator3_result"));
        }

        return Map.of(
            "successful_validations", results.size(),
            "results", results
        );
    })
    .build();
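
The "continue with partial results" idea can be illustrated without the library using CompletableFuture. In this minimal sketch each task recovers independently, so one failure does not discard the others' results. It is an analogy for the pattern, not a description of how the error handler is implemented; PartialResultsSketch and runAll are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical helper; suppliers stand in for sub-agents.
class PartialResultsSketch {

    /** Runs all tasks concurrently; a failed task contributes `fallback` instead of aborting. */
    static List<String> runAll(List<Supplier<String>> tasks, String fallback) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (Supplier<String> task : tasks) {
            // exceptionally() turns a failure of this one task into the fallback value
            futures.add(CompletableFuture.supplyAsync(task).exceptionally(error -> fallback));
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> future : futures) {
            results.add(future.join()); // never throws here: failures were already recovered
        }
        return results;
    }

    public static void main(String[] args) {
        List<Supplier<String>> tasks = List.of(
            () -> "schema ok",
            () -> { throw new IllegalStateException("validator unavailable"); },
            () -> "security ok");
        System.out.println(runAll(tasks, "validation_skipped"));
    }
}
```

The design choice is where recovery happens: attaching it per task keeps failures isolated, whereas waiting on all tasks first and handling errors afterwards tends to lose the successful results.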

Thread Safety

In parallel workflows, all sub-agents share one AgenticScope, which is thread-safe for concurrent reads and writes.

// Thread-safe state access
UntypedAgent parallel = AgenticServices.parallelBuilder()
    .subAgents(agent1, agent2, agent3)
    .beforeCall(scope -> {
        // Initialize shared state before parallel execution
        scope.writeState("shared_config", loadConfiguration());
    })
    .build();

// Each parallel agent can safely read and write to AgenticScope
UntypedAgent parallelAgent = AgenticServices.agentBuilder()
    .chatModel(chatModel)
    .beforeCall(scope -> {
        // Read shared state (thread-safe)
        Config config = (Config) scope.readState("shared_config");

        // Write agent-specific results (thread-safe).
        // agentName, Config, and processWithConfig are placeholders for your own code.
        scope.writeState(agentName + "_result", processWithConfig(config));
    })
    .build();
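
Thread-safe individual reads and writes do not by themselves make a read-then-write sequence atomic. The text above does not say whether the scope offers atomic compound updates, so the sketch below assumes it does not and shows the general Java remedy with a ConcurrentHashMap standing in for shared state. SharedCounterSketch and countWithMerge are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; a ConcurrentHashMap stands in for shared workflow state.
class SharedCounterSketch {

    /** Increments one shared key from several threads using an atomic merge. */
    static int countWithMerge(int threads, int incrementsPerThread) {
        Map<String, Integer> state = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    // merge() performs the read-modify-write atomically for this key;
                    // a separate get() followed by put() could lose updates
                    state.merge("processed", 1, Integer::sum);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return state.getOrDefault("processed", 0);
    }

    public static void main(String[] args) {
        System.out.println(countWithMerge(8, 1000));
    }
}
```

A simpler way to sidestep the problem in a parallel workflow is the one the examples already follow: give each sub-agent its own output key and combine the values in the output function after all agents have finished.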

Install with Tessl CLI

npx tessl i tessl/maven-dev-langchain4j--langchain4j-agentic@1.11.0
