CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-com-embabel-agent--embabel-agent-api

Fluent DSL and Kotlin DSL for building autonomous agents with planning capabilities on the JVM, featuring annotation-based and programmatic configuration for agentic flows with Spring Boot integration

Overview
Eval results
Files

runtime-context.mddocs/

Runtime Context

Runtime context interfaces provide access to execution state, platform services, and agent capabilities during action execution. The context hierarchy progresses from OperationContext (read-only) to ExecutingOperationContext (with messaging) to ActionContext (full action execution).

Capabilities

OperationContext

Core runtime context with read-only access to state and services.

/**
 * Core runtime context handed to operations.
 *
 * Combines [Blackboard] state access with [ToolGroupConsumer] registration and
 * exposes the owning process, AI entry points and a parallel-map helper.
 */
interface OperationContext : Blackboard, ToolGroupConsumer {
    /** Process context for this operation */
    val processContext: ProcessContext

    /** The agent process */
    val agentProcess: AgentProcess

    /** The current operation */
    val operation: Operation

    /**
     * Fire another agent.
     *
     * @param obj input object handed to the fired agent
     * @param resultType class of the result the caller expects
     * @return a future for the result, or `null` when no agent is found
     */
    fun <T> fireAgent(obj: Any, resultType: Class<T>): CompletableFuture<T>?

    /** Access AI functionality */
    fun ai(): Ai

    /** Get prompt runner for LLM operations */
    fun promptRunner(): PromptRunner

    /**
     * Execute operations in parallel.
     *
     * @param items inputs to transform
     * @param maxConcurrency maximum number of concurrent executions (defaults to 10)
     * @param transform function applied to each item; it runs concurrently
     * @return the results, in the same order as [items]
     */
    fun <T, R> parallelMap(
        items: List<T>,
        maxConcurrency: Int = 10,
        transform: (T) -> R
    ): List<R>

    /** Get most recent value of type from blackboard, or `null` if there is none */
    fun <T> last(type: Class<T>): T?
}

OperationContext Usage:

/** Runs [processItem] over every item, with no more than five running at once. */
@Action(description = "Process items in parallel")
fun processItems(items: List<Item>, context: OperationContext): List<Result> =
    context.parallelMap(
        items = items,
        maxConcurrency = 5,
        transform = { entry -> processItem(entry) }
    )

/**
 * Analyses every [Document] on the blackboard with one prompt per document and
 * wraps the analyses in a [Summary].
 */
@Action(description = "Advanced parallel processing")
fun advancedParallelProcessing(context: OperationContext): Summary {
    val docs = context.objectsOfType<Document>()

    // Arguments are (items, maxConcurrency, transform); the output order mirrors the input.
    val analyses = context.parallelMap(docs, 10) { document ->
        // This lambda is what runs concurrently.
        val runner = context.promptRunner()
        runner.createObject<Analysis>("Analyze document: ${document.title}")
    }

    return Summary(analyses)
}

/**
 * Asks the LLM to analyse the most recent [Dataset] on the blackboard.
 *
 * @throws IllegalStateException when the blackboard holds no dataset
 */
@Action(description = "Analyze data with AI")
fun analyzeData(context: OperationContext): Analysis {
    val latest = context.last(Dataset::class.java)
    if (latest == null) {
        throw IllegalStateException("No dataset available")
    }

    val runner = context.promptRunner()
    return runner.createObject<Analysis>("Analyze: ${latest.summary}")
}

/**
 * Fires another agent for [input] and stores its report on the blackboard once
 * the future completes.
 *
 * @return the pending report, or `null` when no agent was found
 */
@Action(description = "Fire agent for dynamic invocation")
fun dynamicAgentInvocation(input: DataRequest, context: OperationContext): CompletableFuture<Report>? {
    // fireAgent is asynchronous; a null return means nothing could be fired.
    val pending = context.fireAgent(input, Report::class.java) ?: return null

    return pending.thenApply { generated ->
        context.put("generated_report", generated)
        generated
    }
}

/**
 * Fires one agent per request and blocks until every fired agent has answered.
 * Requests for which no agent is found are skipped.
 */
@Action(description = "Fire multiple agents in parallel")
fun fireMultipleAgents(requests: List<Request>, context: OperationContext): List<Response> {
    val inFlight = requests.mapNotNull { context.fireAgent(it, Response::class.java) }

    // allOf completes once every future has; the joins inside thenApply then return at once.
    val barrier = CompletableFuture.allOf(*inFlight.toTypedArray())
    val collected = barrier.thenApply { _ ->
        inFlight.map { pending -> pending.join() }
    }
    return collected.join()
}

ExecutingOperationContext

Extended context with execution capabilities like messaging and subprocess control.

/**
 * [OperationContext] extended with execution capabilities: messaging,
 * progress reporting and subprocess control.
 */
interface ExecutingOperationContext : OperationContext {
    /** Send message to output channel (not saved to the conversation) */
    fun sendMessage(message: Message)

    /** Send message to the output channel and save it to the conversation */
    fun sendAndSave(message: Message)

    /** Update progress for monitoring */
    fun updateProgress(message: String)

    /** Send custom output channel event */
    fun sendOutputChannelEvent(event: OutputChannelEvent)

    /**
     * Execute subagent and return result.
     *
     * @param outputClass class of the result the subprocess must produce
     * @param agentScopeBuilder builder describing the scope to run
     */
    fun <O> asSubProcess(outputClass: Class<O>, agentScopeBuilder: AgentScopeBuilder): O

    /**
     * Execute agent as subprocess.
     *
     * @param outputClass class of the result the subprocess must produce
     * @param agent agent to run
     */
    fun <O> asSubProcess(outputClass: Class<O>, agent: Agent): O
}

ExecutingOperationContext Usage:

/**
 * Validates, bills and ships an order, reporting progress before each stage and
 * sending a completion message afterwards.
 */
@Action(description = "Process order with updates")
fun processOrder(order: Order, context: ExecutingOperationContext): ProcessedOrder {
    context.updateProgress("Validating order...")
    validateOrder(order) // the return value is not used further

    context.updateProgress("Processing payment...")
    val paymentOutcome = processPayment(order)

    context.updateProgress("Arranging shipment...")
    val shipmentOutcome = arrangeShipment(order)

    // Completion notice goes to the output channel.
    val notice = Message.user("Order ${order.id} processed successfully")
    context.sendMessage(notice)

    return ProcessedOrder(order, paymentOutcome, shipmentOutcome)
}

/** Contrasts an ephemeral status message with one that is saved to the conversation. */
@Action(description = "Message handling with conversation persistence")
fun handleWithPersistence(context: ExecutingOperationContext) {
    // Ephemeral: reaches the output channel only.
    val startNotice = Message.user("Processing started...")
    context.sendMessage(startNotice)

    val outcome = performOperation()

    // Persisted: sent and also written to conversation history, so it becomes
    // part of the conversation context for later LLM calls.
    val completion = Message.assistant("Operation completed: ${outcome.summary}")
    context.sendAndSave(completion)
}

/** Emits three custom events (progress, metrics, validation) on the output channel, in that order. */
@Action(description = "Custom output channel events")
fun sendCustomEvents(context: ExecutingOperationContext) {
    with(context) {
        // Each event is built immediately before it is sent.
        sendOutputChannelEvent(CustomProgressEvent(percentage = 50, stage = "Processing"))
        sendOutputChannelEvent(MetricsEvent(cpu = 45.2, memory = 1024))
        sendOutputChannelEvent(ValidationEvent(errors = listOf("Invalid field: email")))
    }
}

/**
 * Runs two subprocesses: one from a ready-made agent and one from a scope
 * configured inline, then combines both outputs.
 */
@Action(description = "Subprocess execution")
fun executeSubprocess(task: ComplexTask, context: ExecutingOperationContext): Result {
    // Agent overload: the result is typed by outputClass.
    val fromSpecialist = context.asSubProcess(
        outputClass = SpecialistResult::class.java,
        agent = specialistAgent
    )

    // Scope-builder overload: tool groups, input and timeout are set on the builder.
    val fromCustomScope = context.asSubProcess(outputClass = CustomResult::class.java) { builder ->
        builder
            .withToolGroups("specialized-tools")
            .withInput(task)
            .withTimeout(Duration.ofMinutes(5))
    }

    return Result(fromSpecialist, fromCustomScope)
}

ActionContext

Full action execution context with access to action metadata and domain objects.

/**
 * Full action execution context: everything in [ExecutingOperationContext]
 * plus access to action metadata and domain objects.
 */
interface ActionContext : ExecutingOperationContext {
    /** The action being executed (null for non-action contexts) */
    val action: Action?

    /** Get all domain object instances on blackboard */
    fun domainObjectInstances(): List<Any>
}

ActionContext Usage:

/**
 * Builds a [Report] from the latest dataset and report configuration on the
 * blackboard, adding insights obtained from the LLM.
 */
@Action(description = "Generate report with context")
fun generateReport(context: ActionContext): Report {
    // Action metadata can be absent, hence the fallback name.
    val currentAction = context.action?.name ?: "unknown"
    logger.info("Executing action: $currentAction")

    val instances = context.domainObjectInstances()
    logger.info("Available domain objects: ${instances.size}")

    // Either lookup may yield null.
    val dataset = context.last(Dataset::class.java)
    val reportConfig = context.last(ReportConfig::class.java)

    val runner = context.promptRunner()
    val extracted = runner.createObject<Insights>("Extract insights from: ${dataset?.summary}")

    context.updateProgress("Generated report with ${extracted.items.size} insights")

    return Report(dataset, reportConfig, extracted)
}

InputActionContext

Context with single typed input.

/**
 * Action context carrying a single typed input.
 *
 * @param I type of the input
 */
interface InputActionContext<I> : InputsActionContext {
    /** The typed input */
    val input: I
}

InputActionContext Usage:

// This is typically used internally, but shows the typed input pattern
fun processWithTypedInput(context: InputActionContext<Order>) {
    // The context's type parameter fixes the input type, so no cast is needed.
    val typedOrder: Order = context.input
    // Process order
}

InputsActionContext

Context with multiple inputs.

/** Action context that exposes all inputs of the action. */
interface InputsActionContext : ActionContext {
    /** All inputs to the action */
    val inputs: List<Any>
}

InputsActionContext Usage:

/** Merges the data of every [DataSource] among the action inputs into one [CombinedData]. */
@Action(description = "Combine multiple sources")
fun combineSources(context: InputsActionContext): CombinedData {
    // Inputs of any other type are ignored.
    val dataSources = context.inputs.filterIsInstance<DataSource>()

    var merged = CombinedData()
    for (source in dataSources) {
        merged = merged.merge(source.data)
    }
    return merged
}

TransformationActionContext

Context for transformation actions with strongly-typed input and output types.

/**
 * Context for transformation actions with strongly-typed input and output.
 *
 * @param I input type
 * @param O output type
 */
interface TransformationActionContext<I, O> {
    /** The input value */
    val input: I

    /** Process context */
    val processContext: ProcessContext

    /** The action being executed */
    val action: Action

    /** Input type class for reflection */
    val inputClass: Class<I>

    /** Output type class for reflection */
    val outputClass: Class<O>
}

TransformationActionContext Usage:

// Define transformation action
/** Converts [RawData] into [ProcessedData], stamping the result with the current time. */
@TransformationAction(
    inputType = RawData::class,
    outputType = ProcessedData::class,
    description = "Transform raw data to processed format"
)
fun transformData(context: TransformationActionContext<RawData, ProcessedData>): ProcessedData {
    val source = context.input

    // The class tokens are available for reflection or validation.
    val fromType = context.inputClass.simpleName
    val toType = context.outputClass.simpleName
    println("Transforming $fromType to $toType")

    // Process information is also reachable (not used further here).
    val processId = context.processContext.processId

    return ProcessedData(
        id = source.id,
        content = source.content.process(),
        processedAt = Instant.now()
    )
}

/**
 * Summarizes a [Document]: copies its title, derives a summary from its
 * content and counts the words in the content.
 *
 * @param context transformation context carrying the document and its type tokens
 * @return the summary of the input document
 */
@TransformationAction(
    inputType = Document::class,
    outputType = Summary::class,
    description = "Summarize document"
)
fun summarizeDocument(context: TransformationActionContext<Document, Summary>): Summary {
    val doc = context.input

    // Class.getDeclaredField never returns null; it throws NoSuchFieldException
    // when the field is missing. Scan the declared fields instead so that a
    // missing field is a normal "false" outcome.
    val hasMetadataField = context.inputClass.declaredFields.any { it.name == "metadata" }
    if (hasMetadataField) {
        println("Input has metadata field")
    }

    // Split on any whitespace run and drop blanks, so that empty content counts
    // as zero words and repeated spaces do not inflate the count.
    val wordCount = doc.content
        .split(Regex("\\s+"))
        .count { it.isNotBlank() }

    return Summary(
        title = doc.title,
        summary = extractSummary(doc.content),
        wordCount = wordCount
    )
}

SupplierActionContext

Context for supplier actions that produce outputs without required inputs.

/**
 * Context for supplier actions, which produce an output without required inputs.
 *
 * @param O type of the output the action supplies
 */
interface SupplierActionContext<O> {
    /** Expected output type */
    val outputClass: Class<O>

    /** Available inputs (may be empty) */
    val inputs: List<Any>

    /** Process context */
    val processContext: ProcessContext

    /** The action */
    val action: Action
}

SupplierActionContext Usage:

// Supplier action with no required inputs
/** Loads the [Configuration], passing along an [Environment] if one is among the inputs. */
@SupplierAction(
    outputType = Configuration::class,
    description = "Load system configuration"
)
fun loadConfiguration(context: SupplierActionContext<Configuration>): Configuration {
    // Nothing is required; the first Environment input, if any, is used.
    val environment = context.inputs
        .asSequence()
        .filterIsInstance<Environment>()
        .firstOrNull()

    // The expected output type is available from the context.
    val outputName = context.outputClass.simpleName
    println("Supplying $outputName")

    return Configuration.load(environment)
}

/**
 * Generates a [Report] from whatever [DataSource] inputs are available, using
 * the first [ReportConfig] input or the default configuration.
 */
@SupplierAction(
    outputType = Report::class,
    description = "Generate report from available data"
)
fun generateReport(context: SupplierActionContext<Report>): Report {
    val availableSources = context.inputs.filterIsInstance<DataSource>()

    // Fall back to the default configuration when none was supplied.
    val suppliedConfig = context.inputs.filterIsInstance<ReportConfig>().firstOrNull()
    val effectiveConfig = suppliedConfig ?: ReportConfig.default()

    return Report(
        sources = availableSources,
        config = effectiveConfig,
        generatedAt = Instant.now()
    )
}

/** Returns every resource in the repository; no action inputs are read. */
@SupplierAction(
    outputType = List::class,
    description = "List available resources"
)
fun listResources(context: SupplierActionContext<List<Resource>>): List<Resource> {
    val currentProcessId = context.processContext.processId
    println("Listing resources for process $currentProcessId")

    return resourceRepository.findAll()
}

Blackboard Access

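OperationContext and the contexts derived from it (ExecutingOperationContext, ActionContext, InputsActionContext, InputActionContext) extend Blackboard for state storage and retrieval. TransformationActionContext and SupplierActionContext, as declared above, do not.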

/**
 * Shared state store for an agent process: keyed values, typed objects and
 * named boolean conditions.
 */
interface Blackboard {
    /** Unique identifier for this blackboard */
    val blackboardId: String

    /** Get value by key and type, or `null` when nothing matches */
    fun <T> get(key: String, type: Class<T>): T?

    /** Put value with key */
    fun put(key: String, value: Any)

    /** Get most recent value of type, or `null` when there is none */
    fun <T> last(type: Class<T>): T?

    /** Clear all values */
    fun clear()

    /** Hide object from retrieval (still stored but not returned by queries) */
    fun hide(what: Any)

    /**
     * Thread-safe get or create value.
     *
     * @param name key to look up
     * @param creator factory used to create the value when it is absent
     */
    fun <V> getOrPut(name: String, creator: () -> V): V

    /** Check if variable exists with given type */
    fun hasValue(variable: String, type: Class<*>, dataDictionary: DataDictionary): Boolean

    /** Get variable value with type checking */
    fun getValue(variable: String, type: Class<*>, dataDictionary: DataDictionary): Any?

    /** Count objects of given type on blackboard */
    fun <T> count(clazz: Class<T>): Int

    /** Create independent child blackboard with isolated state (changes to the child do not affect this one) */
    fun spawn(): Blackboard

    /** Set condition value explicitly */
    fun setCondition(key: String, value: Boolean): Blackboard

    /** Get condition value, or `null` when the condition has not been set */
    fun getCondition(key: String): Boolean?

    /** Get model for expression evaluation */
    fun expressionEvaluationModel(): Map<String, Any>
}

/** Type-safe extension: the objects of type [T] on the blackboard. */
inline fun <reified T> Blackboard.objectsOfType(): List<T>
/** Type-safe extension: reified form of `count(Class)`, so no class token is needed. */
inline fun <reified T> Blackboard.count(): Int
/** Type-safe extension: reified form of `last(Class)`; `null` when there is no value of type [T]. */
inline fun <reified T> Blackboard.last(): T?

Blackboard Usage:

/** Tour of the basic blackboard operations: put, get, last, count, getOrPut and hide. */
@Action(description = "Store and retrieve data")
fun manageData(data: Data, context: ActionContext) {
    // Both values are stored under explicit keys.
    context.put("processed_data", data.process())
    context.put("config", Config(enabled = true))

    // Lookup by key, then by "most recent of this type".
    val storedData = context.get("processed_data", Data::class.java)
    val latestConfig = context.last(Config::class.java)

    val total = context.count(Data::class.java)
    println("Found $total data objects")

    // getOrPut is the thread-safe way to create a value on first use.
    val sharedCache = context.getOrPut("cache") {
        val fresh = ExpensiveCache()
        fresh.initialize()
        fresh
    }

    // A hidden object stays stored but is no longer returned by queries.
    val secret = Credentials("user", "pass")
    context.put("creds", secret)
    context.hide(secret)
}

/** Shows that writes to a spawned child blackboard do not appear on the parent. */
@Action(description = "Use spawned blackboard for isolation")
fun processWithIsolation(context: ActionContext) {
    val child = context.spawn()

    // Writes go to the child only.
    child.put("temp_data", "isolated")
    child.setCondition("experimental_feature", true)

    // The parent sees neither the value nor the condition.
    val seenByParent = context.get("temp_data", String::class.java)  // null
    val parentFlag = context.getCondition("experimental_feature")  // null
}

/** Sets two conditions, branches on one of them and fetches the expression model. */
@Action(description = "Manage conditions")
fun manageConditions(context: ActionContext) {
    context.setCondition("debug_mode", true)
    context.setCondition("use_cache", false)

    // getCondition returns Boolean?, so compare against true explicitly.
    val debugEnabled = context.getCondition("debug_mode") == true
    if (debugEnabled) {
        println("Debug logging enabled")
    }

    // The model exposes the blackboard state to SpEL expressions.
    val model = context.expressionEvaluationModel()
}

/** Uses the reified blackboard extensions, so no `Class` tokens are passed. */
@Action(description = "Type-safe extension functions")
fun useExtensions(context: ActionContext) {
    val everyOrder = context.objectsOfType<Order>()
    val total = context.count<Order>()
    val newest = context.last<Order>()

    println("Found $total orders, last: $newest")
}

Bindable Interface

Bindable provides operator overloading for fluent blackboard manipulation.

/**
 * [Blackboard] with operator overloads and chainable binding calls for fluent
 * manipulation.
 */
interface Bindable : Blackboard {
    /** Set operator for fluent assignment; enables `bindable[key] = value` */
    operator fun set(key: String, value: Any)

    /** Bind value to blackboard; returns a [Bindable] so calls can be chained */
    fun bind(value: Any): Bindable

    /** Bind value with protected visibility; returns a [Bindable] so calls can be chained */
    fun bindProtected(value: Any): Bindable

    /** Add object to blackboard; returns a [Bindable] so calls can be chained */
    fun addObject(obj: Any): Bindable

    /** Plus-assign operator for adding objects; enables `bindable += obj` */
    operator fun plusAssign(obj: Any)
}

Bindable Usage:

/**
 * Walks through the fluent Bindable operations: indexed assignment, chained
 * binds, chained addObject and `+=`.
 *
 * NOTE(review): assumes the context supports the Bindable operators — confirm.
 */
@Action(description = "Fluent blackboard operations")
fun fluentOperations(context: ActionContext) {
    // Indexed assignment maps to the `set` operator.
    context["user_id"] = "12345"
    context["session"] = Session()

    // Each bind returns a Bindable, so the calls chain.
    // The last value is bound with protected visibility.
    context.bind(Order()).bind(Customer()).bindProtected(Credentials())

    // addObject chains the same way.
    context.addObject(Product()).addObject(Cart())

    // `+=` maps to plusAssign.
    context += ShippingInfo()
    context += PaymentMethod()
}

MayHaveLastResult Interface

Interface for contexts that may have a result from the previous operation.

/** Implemented by contexts that may have a result from the previous operation. */
interface MayHaveLastResult {
    /** Get result from last operation, if any; `null` when there is none */
    fun lastResult(): Any?
}

MayHaveLastResult Usage:

/** Prints the previous operation's result when the context exposes one. */
@Action(description = "Access previous result")
fun usePreviousResult(context: ActionContext) {
    // Nothing to do unless the context implements MayHaveLastResult.
    val carrier = context as? MayHaveLastResult ?: return
    val previous = carrier.lastResult() ?: return

    println("Previous operation returned: $previous")
    // Use result in current operation
}

AI Access

Access AI capabilities through context.

/** Entry point to AI capabilities, obtained through the context. */
interface Ai {
    /** Get embedding service for the given [criteria] */
    fun withEmbeddingService(criteria: EmbeddingCriteria): EmbeddingService

    /** Get LLM prompt runner configured with the given [options] */
    fun withLlm(options: LlmOptions): PromptRunner
}

/** Builder that produces an [Ai] instance. */
interface AiBuilder {
    /** Build the [Ai] instance. */
    fun build(): Ai
}

AI Usage:

/** Generates text with an explicitly chosen model and temperature. */
@Action(description = "Use specific model")
fun useSpecificModel(context: ActionContext): String {
    val ai = context.ai()

    // Model and sampling temperature are set on the options.
    val options = LlmOptions.builder()
        .model(AnthropicModels.CLAUDE_3_OPUS)
        .temperature(0.7)
        .build()

    return ai.withLlm(options).generateText("Generate creative content")
}

/** Embeds each text with the "text-embedding-3-large" embedding model. */
@Action(description = "Generate embeddings")
fun generateEmbeddings(texts: List<String>, context: ActionContext): List<Embedding> {
    val ai = context.ai()
    val criteria = EmbeddingCriteria.builder()
        .model("text-embedding-3-large")
        .build()
    val service = ai.withEmbeddingService(criteria)

    // One embed call per text, in input order.
    return texts.map { service.embed(it) }
}

Subprocess Execution

Execute subagents from context.

/** Runs the specialist agent as a subprocess and returns its typed result. */
@Action(description = "Delegate to specialist")
fun delegateToSpecialist(task: ComplexTask, context: ExecutingOperationContext): SpecialistResult =
    context.asSubProcess(outputClass = SpecialistResult::class.java, agent = specialistAgent)

Fire Agent

Asynchronously fire another agent.

/**
 * Fires another agent, asynchronously, on the most recent [Dataset].
 *
 * @param context operation context used to read the dataset and fire the agent
 * @return the pending report; when no agent is found the future is already
 *   completed with `null`, so callers must treat a null report as "no agent ran"
 * @throws IllegalStateException when the blackboard holds no dataset
 */
@Action(description = "Trigger parallel processing")
fun triggerParallel(context: OperationContext): CompletableFuture<Report> {
    // last() is nullable but fireAgent requires a non-null input, so fail fast
    // with a clear message instead of passing a possibly-null dataset along.
    val data = context.last(Dataset::class.java)
        ?: throw IllegalStateException("No dataset available")

    // Fire agent asynchronously
    return context.fireAgent(data, Report::class.java)
        ?: CompletableFuture.completedFuture(null)
}

Tool Group Access

Access and manage tool groups through context.

/** Something tool groups can be registered with; contexts implement this. */
interface ToolGroupConsumer {
    /** Registers [group] with this context. */
    fun withToolGroup(group: ToolGroup)
}

Tool Group Usage:

/**
 * Generates text with the file tool group attached to the prompt runner. The
 * annotation additionally declares two tool groups for the action.
 */
@Action(
    description = "Process with specialized tools",
    toolGroups = ["file-operations", "data-processing"]
)
fun processWithTools(context: ActionContext): ProcessedData {
    // fileToolGroup is added to this runner explicitly.
    val runner = context.promptRunner().withToolGroup(fileToolGroup)
    val generated = runner.generateText("Process the files in /data")

    return ProcessedData(generated)
}

/** Registers the advanced tool group only when the matching condition is set to true. */
@Action(description = "Dynamic tool group registration")
fun dynamicToolGroups(context: OperationContext) {
    // A condition that was never set yields null and counts as "off".
    val advancedRequested = context.getCondition("use_advanced_tools") == true
    if (advancedRequested) {
        context.withToolGroup(advancedToolGroup)
    }

    // The LLM call happens after any registration.
    val result = context.promptRunner().generateText("Use the registered tools to analyze data")
}

/** Registers the file, database and API tool groups, then asks the LLM for a [Report]. */
@Action(description = "Multiple tool group registration")
fun multipleToolGroups(context: OperationContext): Report {
    // Registration order: file, database, API.
    with(context) {
        withToolGroup(fileToolGroup)
        withToolGroup(databaseToolGroup)
        withToolGroup(apiToolGroup)
    }

    val runner = context.promptRunner()
    return runner.createObject<Report>("Generate comprehensive report using all available tools")
}

Process Context

Access process-level information and metadata.

/** Process-level information and metadata. */
interface ProcessContext {
    /** Unique process identifier */
    val processId: String

    /** Process start timestamp */
    val startTime: Instant

    /** Current process status */
    val status: OperationStatus
}

Process Context Usage:

/** Logs the id, start time and status of the current process. */
@Action(description = "Log process info")
fun logProcessInfo(context: ActionContext) {
    context.processContext.let { process ->
        logger.info("Process ID: ${process.processId}")
        logger.info("Started at: ${process.startTime}")
        logger.info("Status: ${process.status}")
    }
}

/** Measures how long the process has been running, reports it as a message and returns it. */
@Action(description = "Track process duration")
fun trackProcessDuration(context: ActionContext): Duration {
    val process = context.processContext
    val elapsed = Duration.between(process.startTime, Instant.now())

    // The message shows whole minutes only.
    val notice = Message.user("Process ${process.processId} running for ${elapsed.toMinutes()}m")
    context.sendMessage(notice)

    return elapsed
}

/** Logs a status-specific line for the current process. */
@Action(description = "Conditional processing based on status")
fun conditionalProcessing(context: ActionContext) {
    val process = context.processContext

    // No else branch: every OperationStatus value is handled explicitly.
    when (process.status) {
        OperationStatus.PENDING ->
            logger.info("Process ${process.processId} not yet started")

        OperationStatus.IN_PROGRESS -> {
            val runningFor = Duration.between(process.startTime, Instant.now())
            logger.info("Process running for ${runningFor.seconds}s")
        }

        OperationStatus.COMPLETED ->
            logger.info("Process completed successfully")

        OperationStatus.FAILED ->
            logger.error("Process failed")
    }
}

Advanced Examples

Spawned Blackboards for Isolated Processing

/** Handles each request against its own spawned blackboard, leaving the parent untouched. */
@Action(description = "Process with isolation guarantees")
fun processWithIsolation(requests: List<Request>, context: ActionContext): List<Response> =
    requests.map { current ->
        // A fresh child blackboard per request.
        val sandbox = context.spawn()
        sandbox.put("request", current)
        sandbox.setCondition("isolated_mode", true)

        // Only the response is returned; state written to the sandbox stays there.
        processRequest(current, sandbox)
    }

/** Processes items concurrently (up to 10 at a time), each against its own spawned blackboard. */
@Action(description = "Parallel processing with isolated state")
fun parallelWithIsolation(items: List<Item>, context: OperationContext): List<Result> =
    context.parallelMap(items, maxConcurrency = 10) { current ->
        // A separate child blackboard per parallel task.
        val sandbox = context.spawn()
        sandbox.put("current_item", current)

        // getOrPut creates the per-task cache on first use.
        val itemCache = sandbox.getOrPut("item_cache") { mutableMapOf<String, Any>() }

        processItem(current, sandbox)
    }

Thread-Safe Operations with getOrPut

/** Initialises an expensive cache through getOrPut and returns its data. */
@Action(description = "Thread-safe initialization")
fun threadSafeInit(context: ActionContext): CachedData {
    // First call: the creator runs and its result is stored under the key.
    val primary = context.getOrPut("expensive_cache") {
        println("Initializing cache (runs once)")
        val built = ExpensiveCache()
        built.loadFromDatabase()
        built.buildIndices()
        built
    }

    // Second call with the same key: the stored instance is returned and this
    // creator is skipped.
    val again = context.getOrPut("expensive_cache") {
        println("This won't execute")
        ExpensiveCache()
    }

    return primary.getData()
}

/** Creates a database connection and an API client on demand, each through getOrPut. */
@Action(description = "Lazy resource initialization")
fun lazyResources(context: ActionContext) {
    val database = context.getOrPut("db_connection") {
        val dbConfig = context.get("db_config", Config::class.java)
        DatabaseConnection.create(dbConfig)
    }

    val apiClient = context.getOrPut("api_client") {
        val builder = ApiClient.builder()
        val baseUrl = context.get("api_url", String::class.java)
        builder.baseUrl(baseUrl).build()
    }

    // From here on both resources are initialised exactly once.
}

Hiding Objects from Retrieval

/** Stores credentials, hides them from general queries and shows what still works. */
@Action(description = "Secure credential handling")
fun handleCredentials(context: ActionContext) {
    val secret = Credentials.load()
    context.put("credentials", secret)

    // After hide() the object no longer shows up in general queries.
    context.hide(secret)

    // Lookup by the exact key still works.
    val byKey = context.get("credentials", Credentials::class.java)  // Works

    // Type queries leave the hidden object out.
    val visible = context.domainObjectInstances()  // Won't include hidden credentials
}

/** Stores two intermediate steps as hidden entries and the final result as a visible one. */
@Action(description = "Hide intermediate results")
fun processWithIntermediates(context: ActionContext): FinalResult {
    // Each intermediate is stored and hidden at once, keeping it away from the
    // LLM and from general queries.
    val first = computeStep1().also { step ->
        context.put("step1", step)
        context.hide(step)
    }

    val second = computeStep2().also { step ->
        context.put("step2", step)
        context.hide(step)
    }

    // The final result stays visible.
    val outcome = computeFinal(first, second)
    context.put("final", outcome)

    return outcome
}

Condition Management

/** Uses blackboard conditions as feature flags to choose the algorithm, caching and logging. */
@Action(description = "Feature flags with conditions")
fun featureFlagProcessing(context: ActionContext): Result {
    context.setCondition("use_new_algorithm", true)
    context.setCondition("enable_caching", true)
    context.setCondition("debug_logging", false)

    // Only an explicit true selects the new algorithm; false and null fall back.
    val algorithm = when (context.getCondition("use_new_algorithm")) {
        true -> NewAlgorithm()
        else -> LegacyAlgorithm()
    }

    val outcome = algorithm.process()

    val cachingOn = context.getCondition("enable_caching") == true
    if (cachingOn) {
        cacheResult(outcome)
    }

    val debugOn = context.getCondition("debug_logging") == true
    if (debugOn) {
        logger.debug("Result: $outcome")
    }

    return outcome
}

/** Evaluates a SpEL expression against the blackboard's expression model. */
@Action(description = "Condition-based expression evaluation")
fun expressionEvaluation(context: ActionContext): Boolean {
    // Inputs the expression refers to.
    context.setCondition("is_premium", true)
    context.setCondition("has_access", true)
    context.put("user_level", 5)

    // The model carries the conditions (as booleans), the blackboard objects
    // and process context information.
    val evaluationModel = context.expressionEvaluationModel()

    return evaluateExpression("is_premium && has_access && user_level > 3", evaluationModel)
}

Concurrent Agent Invocation

/**
 * Fires one agent per request and collects the reports, waiting at most five
 * minutes for all of them.
 *
 * Requests for which no agent is found are skipped. If the wait times out, only
 * reports from agents that have already finished successfully are returned.
 *
 * @param requests inputs, one agent invocation each
 * @param context operation context used to fire the agents
 * @return a [Summary] of the collected reports
 */
@Action(description = "Fire multiple agents concurrently")
fun concurrentAgentProcessing(requests: List<DataRequest>, context: OperationContext): Summary {
    // Fire multiple agents in parallel
    val futures = requests.mapNotNull { request ->
        context.fireAgent(request, Report::class.java)
    }

    // Wait for all with timeout
    val reports = try {
        CompletableFuture.allOf(*futures.toTypedArray())
            .thenApply {
                futures.map { it.join() }
            }
            .get(5, TimeUnit.MINUTES)
    } catch (e: TimeoutException) {
        logger.warn("Some agents timed out")
        // getNow(null) throws for a future that completed exceptionally or was
        // cancelled, so keep only the futures that finished normally before
        // reading their values.
        futures
            .filter { it.isDone && !it.isCompletedExceptionally }
            .mapNotNull { it.getNow(null) }
    }

    return Summary(reports)
}

/**
 * Wraps the task data in a request matching the task type, fires an agent for
 * it and blocks for the result.
 *
 * @return the agent's result, or `null` when no agent was found
 */
@Action(description = "Dynamic agent selection")
fun dynamicAgentSelection(task: Task, context: OperationContext): Result? {
    // The request type decides which agent gets picked.
    val payload = task.data
    val agentInput = when (task.type) {
        TaskType.ANALYSIS -> AnalysisRequest(payload)
        TaskType.TRANSFORM -> TransformRequest(payload)
        TaskType.VALIDATE -> ValidationRequest(payload)
        else -> GenericRequest(payload)
    }

    val pending = context.fireAgent(agentInput, Result::class.java) ?: return null
    return pending.get()
}

Parallel Processing with Context

/**
 * Two parallel LLM passes: summarise each document (5 at a time), then extract
 * one insight per summary (3 at a time).
 */
@Action(description = "Complex parallel processing")
fun complexParallelProcessing(documents: List<Document>, context: OperationContext): Analysis {
    // Pass 1: one summary per document.
    val docSummaries = context.parallelMap(documents, maxConcurrency = 5) { document ->
        val runner = context.promptRunner()
        runner.createObject<Summary>("Summarize: ${document.content}")
    }

    // Pass 2: one insight per summary.
    val keyInsights = context.parallelMap(docSummaries, maxConcurrency = 3) { item ->
        val runner = context.promptRunner()
        runner.createObject<Insight>("Extract key insight from: ${item.text}")
    }

    return Analysis(
        documentCount = documents.size,
        summaries = docSummaries,
        insights = keyInsights
    )
}

/**
 * Processes items in parallel (up to 10 at a time), turning a per-item failure
 * into a [FailedResult] so that it does not abort the whole batch.
 *
 * @param items items to process
 * @param context operation context used for the parallel map and the summary message
 * @return the successful and failed results, separated
 */
@Action(description = "Parallel with error handling")
fun parallelWithErrorHandling(items: List<Item>, context: OperationContext): Results {
    val results = context.parallelMap(items, maxConcurrency = 10) { item ->
        try {
            processItem(item)
        } catch (e: Exception) {
            // Deliberate best-effort: log the failure and record it as a result.
            logger.error("Failed to process ${item.id}", e)
            FailedResult(item, e.message)
        }
    }

    val successful = results.filterIsInstance<SuccessResult>()
    val failed = results.filterIsInstance<FailedResult>()

    // sendMessage is declared on ExecutingOperationContext, not on
    // OperationContext, so the summary is sent only when the runtime context
    // actually supports messaging.
    (context as? ExecutingOperationContext)?.sendMessage(
        Message.user("Processed ${successful.size} items, ${failed.size} failed")
    )

    return Results(successful, failed)
}

Types

/** The agent process, reachable from a context as `agentProcess`. */
interface AgentProcess {
    /** Agent associated with this process */
    val agent: Agent
    /** Current status of the process */
    val status: OperationStatus
    /** Result of the process; nullable — presumably null until one is produced, TODO confirm */
    val result: Any?

    // Lifecycle controls. Their exact semantics are not specified here;
    // NOTE(review): confirm against the implementation before relying on them.
    fun kill()
    fun pause()
    fun resume()
}

/** The current operation, reachable from a context as `operation`. */
interface Operation {
    /** Identifier of the operation */
    val id: String
    /** Type of the operation, as a string */
    val type: String
    /** Current status of the operation */
    val status: OperationStatus
}

/** Status values reported for processes and operations. */
enum class OperationStatus {
    /** Not yet started */
    PENDING,
    /** Currently running */
    IN_PROGRESS,
    /** Finished successfully */
    COMPLETED,
    /** Finished with a failure */
    FAILED
}

/** Descriptive metadata of an agent. */
interface Agent {
    /** Name of the agent */
    val name: String
    /** Description of the agent */
    val description: String
    /** Provider of the agent — NOTE(review): meaning not specified here, confirm */
    val provider: String
}

/** Descriptive metadata of an action, exposed through `ActionContext.action`. */
interface Action {
    /** Name of the action */
    val name: String
    /** Description of the action */
    val description: String
}

Install with Tessl CLI

npx tessl i tessl/maven-com-embabel-agent--embabel-agent-api@0.3.0

docs

actions-goals.md

agent-definition.md

builtin-tools.md

chat.md

conditions.md

events.md

human-in-the-loop.md

index.md

invocation.md

io-binding.md

llm-interaction.md

models.md

planning-workflows.md

platform-management.md

runtime-context.md

state-management.md

streaming.md

subagents.md

tools.md

type-system.md

typed-operations.md

validation.md

tile.json