CtrlK
Community · Documentation · Log in · Get started
Tessl Logo

tessl/maven-com-embabel-agent--embabel-agent-a2a

A2A protocol integration for Embabel Agent Framework enabling agent-to-agent communication

Overview
Eval results
Files

docs/api/task-state.md

Task State API

Task execution history and state tracking for streaming operations and resubscription support.

TaskStateManager

Manages task state, event history, and lifecycle.

/**
 * Manages task state, event history, and lifecycle.
 *
 * This is a signature summary: method bodies are not shown.
 */
@Service
class TaskStateManager {
    /**
     * Registers a new task together with its stream ID.
     *
     * @param taskId Unique task identifier
     * @param contextId Context identifier stored with the task
     * @param streamId Stream identifier initially associated with the task
     */
    fun registerTask(taskId: String, contextId: String, streamId: String)

    /**
     * Records an event for the task and updates its state when the event is terminal.
     * The task is automatically moved to the completed state for COMPLETED/FAILED/CANCELED.
     *
     * @param taskId Identifier of the task the event belongs to
     * @param event Event to append to the task's history
     */
    fun recordEvent(taskId: String, event: StreamingEventKind)

    /**
     * Gets the complete information stored for a task.
     *
     * @param taskId Identifier of the task to look up
     * @return TaskInfo, or null if the task is not found
     */
    fun getTaskInfo(taskId: String): TaskInfo?

    /**
     * Gets the stream ID associated with a task.
     *
     * @param taskId Identifier of the task to look up
     * @return Stream ID, or null if the task is not found
     */
    fun getStreamId(taskId: String): String?

    /**
     * Gets all events recorded for a task.
     *
     * @param taskId Identifier of the task to look up
     * @return List of events (empty if the task is not found)
     */
    fun getTaskEvents(taskId: String): List<StreamingEventKind>

    /**
     * Checks whether the task is active, i.e. registered and not yet completed.
     *
     * @param taskId Identifier of the task to check
     */
    fun isTaskActive(taskId: String): Boolean

    /**
     * Checks whether the task exists, whether active or completed.
     *
     * @param taskId Identifier of the task to check
     */
    fun taskExists(taskId: String): Boolean

    /**
     * Updates the stream ID for a task (used during resubscription).
     *
     * @param taskId Identifier of the task to update
     * @param newStreamId Stream identifier that replaces the current one
     */
    fun updateStreamId(taskId: String, newStreamId: String)

    /**
     * Removes old completed tasks.
     *
     * @param olderThan Cutoff timestamp
     */
    fun cleanupOldTasks(olderThan: Instant)
}

TaskInfo

Data class storing complete task information.

/**
 * Task information storage: identifiers, recorded events, timestamps, and current state of one task.
 *
 * Only the identifiers are required; every other property has a default.
 *
 * @param taskId Unique task identifier
 * @param contextId Context identifier
 * @param streamId Current stream identifier
 * @param events Mutable list of all events; defaults to a new empty list
 * @param createdAt Task creation timestamp; defaults to the instant of construction
 * @param completedAt Task completion timestamp (null if active); defaults to null
 * @param currentState Current task state; defaults to [TaskState.WORKING]
 */
data class TaskInfo(
    val taskId: String,
    val contextId: String,
    val streamId: String,
    val events: MutableList<StreamingEventKind> = mutableListOf(),
    val createdAt: Instant = Instant.now(),
    var completedAt: Instant? = null,
    var currentState: TaskState = TaskState.WORKING
)

Task Lifecycle

1. Registration

// Generate fresh identifiers for the task, its context, and its stream.
val taskId = UUID.randomUUID().toString()
val contextId = UUID.randomUUID().toString()
val streamId = UUID.randomUUID().toString()

taskStateManager.registerTask(taskId, contextId, streamId)
// The task is now in the activeTasks map with WORKING state

2. Event Recording

// Build a non-terminal status update (state WORKING) for the registered task.
val event = TaskStatusUpdateEvent.Builder()
    .taskId(taskId)
    .status(TaskStatus(TaskState.WORKING, /* ... */))
    .build()

taskStateManager.recordEvent(taskId, event)
// The event is appended to the task's events list
// WORKING is not a terminal state, so the task remains active

3. Completion

// Build a terminal status update (state COMPLETED), flagged as the final event.
val finalEvent = TaskStatusUpdateEvent.Builder()
    .taskId(taskId)
    .status(TaskStatus(TaskState.COMPLETED, /* ... */))
    .isFinal(true)
    .build()

taskStateManager.recordEvent(taskId, finalEvent)
// The task is moved from activeTasks to completedTasks
// completedAt timestamp is set
// currentState is updated to COMPLETED

4. Cleanup

// Cutoff timestamp: seven days before now.
val cutoff = Instant.now().minus(Duration.ofDays(7))
taskStateManager.cleanupOldTasks(cutoff)
// Old completed tasks (relative to the cutoff) are removed

State Management

Active Tasks

  • Stored in activeTasks ConcurrentHashMap
  • State: WORKING or PENDING
  • Can receive new events
  • completedAt is null

Completed Tasks

  • Stored in completedTasks ConcurrentHashMap
  • State: COMPLETED, FAILED, or CANCELED
  • Read-only for resubscription
  • completedAt is set

Terminal States

States triggering task completion:

  • TaskState.COMPLETED
  • TaskState.FAILED
  • TaskState.CANCELED

Non-terminal states:

  • TaskState.WORKING
  • TaskState.PENDING

Complete Example

/**
 * Example service that runs a unit of work as a tracked task, recording a status
 * event with [TaskStateManager] at start, on success, and on failure.
 */
@Service
class TaskExecutionService(
    private val taskStateManager: TaskStateManager
) {

    /**
     * Registers a new task, performs the work for [intent], and records the outcome.
     *
     * A WORKING event is recorded before the work starts. On success a final COMPLETED
     * event is recorded and the result returned; on any [Exception] a final FAILED event
     * is recorded and the exception is rethrown.
     *
     * @param intent Input handed to the work step
     * @return Result produced by the work step
     */
    fun executeTask(intent: String): String {
        val taskId = UUID.randomUUID().toString()
        val contextId = UUID.randomUUID().toString()
        val streamId = UUID.randomUUID().toString()

        taskStateManager.registerTask(taskId, contextId, streamId)
        recordStatus(taskId, contextId, TaskState.WORKING, "Starting task...", terminal = false)

        return try {
            // The completion event is recorded inside the try so that a failure while
            // recording it is handled by the catch below as well.
            performWork(intent).also { outcome ->
                recordStatus(taskId, contextId, TaskState.COMPLETED, "Task completed: $outcome", terminal = true)
            }
        } catch (failure: Exception) {
            recordStatus(taskId, contextId, TaskState.FAILED, "Error: ${failure.message}", terminal = true)
            throw failure
        }
    }

    /**
     * Builds a status update event for the task and records it.
     *
     * The final flag is set on the builder only when [terminal] is true; otherwise the
     * builder's own default is left untouched.
     */
    private fun recordStatus(
        taskId: String,
        contextId: String,
        state: TaskState,
        text: String,
        terminal: Boolean
    ) {
        val pending = TaskStatusUpdateEvent.Builder()
            .taskId(taskId)
            .contextId(contextId)
            .status(TaskStatus(state, createMessage(text), OffsetDateTime.now()))
        val statusEvent = if (terminal) pending.isFinal(true).build() else pending.build()
        taskStateManager.recordEvent(taskId, statusEvent)
    }

    /** Placeholder for the real work; always returns a fixed result. */
    private fun performWork(intent: String): String = "Result"

    /** Wraps [text] in an agent-role message with a freshly generated message ID. */
    private fun createMessage(text: String): Message =
        Message.Builder()
            .messageId(UUID.randomUUID().toString())
            .role(Message.Role.AGENT)
            .parts(listOf(TextPart(text)))
            .build()
}

Query Methods

Check Existence

// taskExists reports true for both active and completed tasks.
if (taskStateManager.taskExists(taskId)) {
    // Task found (active or completed)
}

Check Active Status

// isTaskActive reports true only while the task has not completed.
if (taskStateManager.isTaskActive(taskId)) {
    // Task is still running
}

Get Full Info

// getTaskInfo returns null for an unknown task, in which case nothing is printed.
taskStateManager.getTaskInfo(taskId)?.let { details ->
    println("Task: ${details.taskId}")
    println("State: ${details.currentState}")
    println("Events: ${details.events.size}")
    println("Created: ${details.createdAt}")
    println("Completed: ${details.completedAt}")
}

Get Events

// Walk the recorded history and report each event according to its concrete type.
for (event in taskStateManager.getTaskEvents(taskId)) {
    when (event) {
        is Task -> println("Task created")
        is TaskStatusUpdateEvent -> println("Status: ${event.status.state}")
        is TaskArtifactUpdateEvent -> println("Artifact update")
    }
}

Scheduled Cleanup

/**
 * Example scheduled cleanup of completed tasks via [TaskStateManager.cleanupOldTasks].
 *
 * NOTE(review): `logger` is not declared in this snippet; presumably a class-level
 * logger accepting `{}` placeholders is expected — declare one before copying.
 *
 * NOTE(review): both schedules call the same cleanup. The hourly run's 24-hour cutoff
 * is always later than the daily run's 7-day cutoff, so with both enabled the daily run
 * has nothing left to remove. Presumably these illustrate alternative retention
 * policies — pick one.
 */
@Component
class TaskCleanup(private val taskStateManager: TaskStateManager) {

    /** Removes completed tasks older than seven days, then logs the cutoff used. */
    @Scheduled(cron = "0 0 2 * * ?")  // Daily at 2 AM
    fun cleanupOldTasks() {
        val cutoff = Instant.now().minus(Duration.ofDays(7))
        taskStateManager.cleanupOldTasks(cutoff)
        logger.info("Cleaned up tasks older than {}", cutoff)
    }

    /** Removes completed tasks older than 24 hours. */
    @Scheduled(fixedRate = 3600000)  // Every hour
    fun hourlyCleanup() {
        val cutoff = Instant.now().minus(Duration.ofHours(24))
        taskStateManager.cleanupOldTasks(cutoff)
    }
}

Thread Safety

TaskStateManager is fully thread-safe:

  • Uses ConcurrentHashMap for storage
  • All operations are atomic
  • Safe for concurrent access
  • No external synchronization required

Concurrent Usage:

// Multiple threads can safely operate on different tasks
// (Thread.startVirtualThread requires a JDK with virtual threads, i.e. JDK 21+.)
Thread.startVirtualThread {
    taskStateManager.registerTask("task1", "ctx1", "stream1")
}

Thread.startVirtualThread {
    taskStateManager.registerTask("task2", "ctx2", "stream2")
}

// Or same task from different threads
// NOTE(review): event1 and event2 are not declared in this snippet; build them first.
// NOTE(review): nothing orders these threads after the registration of "task1" above;
// behaviour when an event is recorded before its task is registered is not specified here.
// NOTE(review): TaskInfo.events defaults to a plain mutableListOf(); confirm that
// recordEvent guards concurrent appends to the same task's list.
Thread.startVirtualThread {
    taskStateManager.recordEvent("task1", event1)
}

Thread.startVirtualThread {
    taskStateManager.recordEvent("task1", event2)
}

Memory Management

For long-running applications:

  1. Regular Cleanup: Schedule periodic cleanup
  2. Retention Policy: Define how long to keep completed tasks
  3. Monitoring: Monitor map sizes

Example Strategy:

/**
 * Example monitoring strategy: periodically logs task counts and triggers a more
 * aggressive cleanup when too many completed tasks have accumulated.
 *
 * NOTE(review): `countActiveTasks()`, `countCompletedTasks()` and `logger` are not
 * defined in this snippet, and the TaskStateManager API listed on this page exposes no
 * count methods — these must be supplied by the reader (TODO confirm intended source).
 */
@Component
class TaskMemoryManager(private val taskStateManager: TaskStateManager) {

    /** Logs the active and completed task counts; prunes when completed tasks exceed 10000. */
    @Scheduled(fixedRate = 300000)  // Every 5 minutes
    fun monitorTaskState() {
        val activeCount = countActiveTasks()
        val completedCount = countCompletedTasks()

        logger.info("Tasks - Active: {}, Completed: {}", activeCount, completedCount)

        if (completedCount > 10000) {
            // Aggressive cleanup: drop completed tasks older than one hour
            val cutoff = Instant.now().minus(Duration.ofHours(1))
            taskStateManager.cleanupOldTasks(cutoff)
        }
    }
}

Testing

/** Spring-context tests covering task registration and the move to the completed state. */
@SpringBootTest
class TaskStateManagerTest {

    @Autowired
    lateinit var taskStateManager: TaskStateManager

    /** A freshly registered task can be retrieved and is reported as active. */
    @Test
    fun `should register and retrieve task`() {
        val id = "test-task"
        taskStateManager.registerTask(id, "ctx", "stream")

        val stored = taskStateManager.getTaskInfo(id)

        assertNotNull(stored)
        assertEquals(id, stored?.taskId)
        assertTrue(taskStateManager.isTaskActive(id))
    }

    /** Recording a COMPLETED status makes the task inactive while it still exists. */
    @Test
    fun `should move task to completed on terminal event`() {
        val id = "test-task-2"
        taskStateManager.registerTask(id, "ctx", "stream")
        assertTrue(taskStateManager.isTaskActive(id))

        taskStateManager.recordEvent(
            id,
            TaskStatusUpdateEvent.Builder()
                .taskId(id)
                .status(TaskStatus(TaskState.COMPLETED, /* ... */))
                .build()
        )

        assertFalse(taskStateManager.isTaskActive(id))
        assertTrue(taskStateManager.taskExists(id))
    }
}

See Also

  • Streaming API - SSE stream management
  • Request Handling API - JSON-RPC processing
  • Common Tasks - Cleanup examples
tessl i tessl/maven-com-embabel-agent--embabel-agent-a2a@0.3.3

docs

api

agent-card.md

configuration.md

endpoint-registration.md

events.md

overview.md

request-handling.md

skill-factory.md

streaming.md

task-state.md

index.md

tile.json