A2A protocol integration for Embabel Agent Framework enabling agent-to-agent communication
Task execution history and state tracking for streaming operations and resubscription support.
Manages task state, event history, and lifecycle.
/**
 * Tracks each task's state, event history, and lifecycle so that streaming
 * operations can be replayed and resubscribed.
 *
 * Only the public signatures are listed here; method bodies are not shown.
 * The accompanying documentation describes this class as fully thread-safe.
 */
@Service
class TaskStateManager {
/**
* Registers a new task together with the stream that will deliver its events.
* Per the usage example, a freshly registered task is held as active and
* starts in the WORKING state.
*
* @param taskId Unique task identifier
* @param contextId Context identifier the task belongs to
* @param streamId Identifier of the stream currently serving the task
*/
// NOTE(review): behaviour when taskId is already registered is not shown — confirm.
fun registerTask(taskId: String, contextId: String, streamId: String)
/**
* Appends an event to the task's history and updates its state if the event
* is terminal.
* A task is automatically moved to the completed state for
* COMPLETED/FAILED/CANCELED; per the usage example its completedAt timestamp
* is set and currentState is updated at that point.
*
* @param taskId Task the event belongs to
* @param event Streaming event to record
*/
// NOTE(review): behaviour for an unknown taskId is not shown — confirm.
fun recordEvent(taskId: String, event: StreamingEventKind)
/**
* Gets the complete stored information for a task.
*
* @param taskId Task to look up
* @return TaskInfo or null if not found
*/
fun getTaskInfo(taskId: String): TaskInfo?
/**
* Gets the stream ID currently associated with a task.
*
* @param taskId Task to look up
* @return Stream ID or null if task not found
*/
fun getStreamId(taskId: String): String?
/**
* Gets every event recorded for a task.
*
* @param taskId Task to look up
* @return List of events (empty if task not found)
*/
fun getTaskEvents(taskId: String): List<StreamingEventKind>
/**
* Checks whether a task is active, i.e. has not completed.
*
* @param taskId Task to check
*/
fun isTaskActive(taskId: String): Boolean
/**
* Checks whether a task is known at all, whether active or completed.
*
* @param taskId Task to check
*/
fun taskExists(taskId: String): Boolean
/**
* Replaces the stream ID associated with a task (used during resubscription).
*
* @param taskId Task whose stream changes
* @param newStreamId Stream ID to associate from now on
*/
fun updateStreamId(taskId: String, newStreamId: String)
/**
* Removes old completed tasks.
*
* @param olderThan Cutoff timestamp
*/
// NOTE(review): presumably the cutoff is compared against completedAt rather
// than createdAt — confirm against the implementation.
fun cleanupOldTasks(olderThan: Instant)
}
Data class storing complete task information.
/**
 * Everything stored about a single task: identity, stream binding, recorded
 * events, timestamps, and current state.
 *
 * A new instance defaults to an empty event list, a creation time of "now",
 * no completion time, and the WORKING state.
 *
 * NOTE(review): streamId is a val although TaskStateManager exposes
 * updateStreamId — presumably the manager swaps in a copy; confirm.
 *
 * @param taskId Unique task identifier
 * @param contextId Context identifier
 * @param streamId Current stream identifier
 * @param events Mutable list of all events recorded for the task
 * @param createdAt Task creation timestamp
 * @param completedAt Task completion timestamp (null if active)
 * @param currentState Current task state
 */
data class TaskInfo(
val taskId: String,
val contextId: String,
val streamId: String,
val events: MutableList<StreamingEventKind> = mutableListOf(),
val createdAt: Instant = Instant.now(),
var completedAt: Instant? = null,
var currentState: TaskState = TaskState.WORKING
)
val taskId = UUID.randomUUID().toString()
val contextId = UUID.randomUUID().toString()
val streamId = UUID.randomUUID().toString()
taskStateManager.registerTask(taskId, contextId, streamId)
// Task now in activeTasks map with WORKING state

// Recording a non-terminal event: the task stays active.
val event = TaskStatusUpdateEvent.Builder()
    .taskId(taskId)
    .status(TaskStatus(TaskState.WORKING, /* ... */))
    .build()
taskStateManager.recordEvent(taskId, event)
// Event added to task's events list
// Task remains active

// Recording a terminal event: the task is completed.
val finalEvent = TaskStatusUpdateEvent.Builder()
    .taskId(taskId)
    .status(TaskStatus(TaskState.COMPLETED, /* ... */))
    .isFinal(true)
    .build()
taskStateManager.recordEvent(taskId, finalEvent)
// Task moved from activeTasks to completedTasks
// completedAt timestamp set
// currentState updated to COMPLETED

// Removing completed tasks older than a cutoff (here: seven days).
val cutoff = Instant.now().minus(Duration.ofDays(7))
taskStateManager.cleanupOldTasks(cutoff)
// Old completed tasks removed
States triggering task completion:
TaskState.COMPLETED
TaskState.FAILED
TaskState.CANCELED
Non-terminal states:
TaskState.WORKING
TaskState.PENDING
@Service
class TaskExecutionService(
private val taskStateManager: TaskStateManager
) {
/**
 * Runs one task end-to-end while reporting its progress to the task state manager.
 *
 * A task is registered under freshly generated task, context and stream IDs,
 * a WORKING status is recorded, the work is performed, and a final COMPLETED
 * status is recorded. If anything inside the guarded section throws, a final
 * FAILED status carrying the exception message is recorded and the exception
 * is rethrown unchanged.
 *
 * @param intent Description of the work to perform
 * @return The result produced by the work
 * @throws Exception Whatever the work (or recording its completion) threw
 */
fun executeTask(intent: String): String {
    val taskId = UUID.randomUUID().toString()
    val contextId = UUID.randomUUID().toString()
    val streamId = UUID.randomUUID().toString()

    taskStateManager.registerTask(taskId, contextId, streamId)
    recordStatus(taskId, contextId, TaskState.WORKING, "Starting task...")

    try {
        val result = performWork(intent)
        recordStatus(taskId, contextId, TaskState.COMPLETED, "Task completed: $result", isFinal = true)
        return result
    } catch (e: Exception) {
        recordStatus(taskId, contextId, TaskState.FAILED, "Error: ${e.message}", isFinal = true)
        throw e
    }
}

/** Builds a status-update event for the given state and message text and records it for the task. */
private fun recordStatus(
    taskId: String,
    contextId: String,
    state: TaskState,
    text: String,
    isFinal: Boolean = false,
) {
    val builder = TaskStatusUpdateEvent.Builder()
        .taskId(taskId)
        .contextId(contextId)
        .status(TaskStatus(state, createMessage(text), OffsetDateTime.now()))
    // Only touch the final flag for terminal events, so non-terminal events
    // are built with exactly the same builder calls as before.
    val event = (if (isFinal) builder.isFinal(true) else builder).build()
    taskStateManager.recordEvent(taskId, event)
}
/**
 * Placeholder for the real task logic.
 * This sample ignores [intent] and always yields the fixed string "Result".
 */
private fun performWork(intent: String): String = "Result" // Implementation goes here
/**
 * Wraps [text] in an agent-authored message that has a freshly generated
 * random message ID and a single text part.
 */
private fun createMessage(text: String): Message {
    val freshId = UUID.randomUUID().toString()
    val textOnly = listOf(TextPart(text))
    return Message.Builder()
        .messageId(freshId)
        .role(Message.Role.AGENT)
        .parts(textOnly)
        .build()
}
}

// Does the task exist at all?
if (taskStateManager.taskExists(taskId)) {
    // Task found (active or completed)
}

// Is the task still in progress?
if (taskStateManager.isTaskActive(taskId)) {
    // Task is still running
}

// Print the stored details, if the task is known.
val info = taskStateManager.getTaskInfo(taskId)
info?.also { details ->
    println("Task: ${details.taskId}")
    println("State: ${details.currentState}")
    println("Events: ${details.events.size}")
    println("Created: ${details.createdAt}")
    println("Completed: ${details.completedAt}")
}

// Walk the recorded history and describe each event by its kind.
val events = taskStateManager.getTaskEvents(taskId)
for (event in events) {
    when (event) {
        is Task -> println("Task created")
        is TaskStatusUpdateEvent -> println("Status: ${event.status.state}")
        is TaskArtifactUpdateEvent -> println("Artifact update")
    }
}
@Component
class TaskCleanup(private val taskStateManager: TaskStateManager) {
/**
 * Nightly sweep: asks the task state manager to remove old completed tasks,
 * using a cutoff of seven days before the current instant, then logs the cutoff.
 */
// NOTE(review): `logger` is not declared in the visible part of this class —
// confirm it is provided elsewhere.
@Scheduled(cron = "0 0 2 * * ?") // Daily at 2 AM
fun cleanupOldTasks() {
    val retention = Duration.ofDays(7)
    val threshold = Instant.now().minus(retention)
    taskStateManager.cleanupOldTasks(threshold)
    logger.info("Cleaned up tasks older than {}", threshold)
}
/**
 * Hourly sweep: asks the task state manager to remove old completed tasks,
 * using a cutoff of 24 hours before the current instant.
 */
// NOTE(review): with this 24-hour sweep running every hour, the 7-day nightly
// sweep in this class presumably finds nothing left to remove — confirm that
// both schedules are intended together rather than as alternatives.
@Scheduled(fixedRate = 3_600_000) // Every hour
fun hourlyCleanup() {
    taskStateManager.cleanupOldTasks(Instant.now().minus(Duration.ofHours(24)))
}
}
TaskStateManager is fully thread-safe:
Concurrent Usage:
// Multiple threads can safely operate on different tasks
// (Thread.startVirtualThread is a JDK 21+ API.)
Thread.startVirtualThread {
taskStateManager.registerTask("task1", "ctx1", "stream1")
}
Thread.startVirtualThread {
taskStateManager.registerTask("task2", "ctx2", "stream2")
}
// Or same task from different threads
// NOTE(review): nothing here waits for the thread that registers "task1"
// before events are recorded for it; how recordEvent treats a task that is
// not yet registered is not shown — confirm before copying this pattern.
// `event1` and `event2` are placeholders defined outside this example.
Thread.startVirtualThread {
taskStateManager.recordEvent("task1", event1)
}
Thread.startVirtualThread {
taskStateManager.recordEvent("task1", event2)
}
For long-running applications:
Example Strategy:
/**
 * Example memory-management strategy: periodically logs how many tasks are
 * active and completed, and triggers an aggressive cleanup once too many
 * completed tasks have accumulated.
 */
// NOTE(review): `countActiveTasks`, `countCompletedTasks` and `logger` are not
// declared in this example — confirm where they come from.
@Component
class TaskMemoryManager(private val taskStateManager: TaskStateManager) {

    /**
     * Logs the current task counts; when more than 10000 completed tasks are
     * reported, asks the task state manager to clean up using a cutoff of one
     * hour before the current instant.
     */
    @Scheduled(fixedRate = 300000) // Every 5 minutes
    fun monitorTaskState() {
        val active = countActiveTasks()
        val completed = countCompletedTasks()
        logger.info("Tasks - Active: {}, Completed: {}", active, completed)

        // Below the threshold there is nothing more to do.
        if (completed <= 10000) return

        // Aggressive cleanup
        taskStateManager.cleanupOldTasks(Instant.now().minus(Duration.ofHours(1)))
    }
}
@SpringBootTest
class TaskStateManagerTest {
@Autowired
lateinit var taskStateManager: TaskStateManager
/** A registered task can be looked up by ID and is reported as active. */
@Test
fun `should register and retrieve task`() {
    val id = "test-task"
    taskStateManager.registerTask(id, "ctx", "stream")

    val stored = taskStateManager.getTaskInfo(id)

    assertNotNull(stored)
    assertEquals(id, stored?.taskId)
    assertTrue(taskStateManager.isTaskActive(id))
}
/** Recording a COMPLETED status makes the task inactive while it remains known. */
@Test
fun `should move task to completed on terminal event`() {
    val id = "test-task-2"
    taskStateManager.registerTask(id, "ctx", "stream")
    assertTrue(taskStateManager.isTaskActive(id))

    val completedStatus = TaskStatus(TaskState.COMPLETED, /* ... */)
    val completion = TaskStatusUpdateEvent.Builder()
        .taskId(id)
        .status(completedStatus)
        .build()
    taskStateManager.recordEvent(id, completion)

    assertFalse(taskStateManager.isTaskActive(id))
    assertTrue(taskStateManager.taskExists(id))
}
}
tessl i tessl/maven-com-embabel-agent--embabel-agent-a2a@0.3.3