A2A protocol integration for Embabel Agent Framework enabling agent-to-agent communication
Server-Sent Events (SSE) stream management for real-time agent-to-agent communication.
Manages SSE streams, task state tracking, and resubscription.
@Service
class A2AStreamingHandler(
private val objectMapper: ObjectMapper,
private val taskStateManager: TaskStateManager
) {
/**
* Creates new SSE stream for real-time communication.
* @param streamId Unique stream identifier
* @param taskId Optional task ID to associate with stream
* @param contextId Optional context ID
* @return SseEmitter with infinite timeout
*/
fun createStream(streamId: String, taskId: String? = null, contextId: String? = null): SseEmitter
/**
* Resubscribes to existing task by creating new stream and replaying events.
* @param taskId Task ID to resubscribe to
* @param newStreamId New stream ID for subscription
* @return SseEmitter with replayed events
* @throws IllegalArgumentException if task not found
*/
fun resubscribeToTask(taskId: String, newStreamId: String): SseEmitter
/**
* Sends streaming event to specified stream and records it.
* @param streamId Target stream ID
* @param event Streaming event (TaskStatusUpdateEvent, Task, etc.)
* @param taskId Optional task ID for event recording
*/
fun sendStreamEvent(streamId: String, event: StreamingEventKind, taskId: String? = null)
/**
* Closes stream and cleans up resources.
* @param streamId Stream ID to close
*/
fun closeStream(streamId: String)
/**
* Shuts down streaming handler and internal scheduler.
* Call during application shutdown.
*/
fun shutdown()
}val streamId = UUID.randomUUID().toString()
val taskId = UUID.randomUUID().toString()
val contextId = UUID.randomUUID().toString()
val emitter = streamingHandler.createStream(streamId, taskId, contextId)
// Stream is active, can receive eventsWhat Happens:
Long.MAX_VALUE timeoutval event = TaskStatusUpdateEvent.Builder()
.taskId(taskId)
.contextId(contextId)
.status(TaskStatus(
TaskState.WORKING,
Message.Builder()
.messageId(UUID.randomUUID().toString())
.role(Message.Role.AGENT)
.parts(listOf(TextPart("Processing...")))
.build(),
OffsetDateTime.now()
))
.build()
streamingHandler.sendStreamEvent(streamId, event, taskId)What Happens:
streamingHandler.closeStream(streamId)What Happens:
Events wrapped in SendStreamingMessageResponse:
SendStreamingMessageResponse(
jsonrpc = "2.0",
id = streamId,
result = event, // StreamingEventKind
error = null
)SSE Wire Format:
event: message
data: {"jsonrpc":"2.0","id":"stream-123","result":{...},"error":null}
event: message
data: {"jsonrpc":"2.0","id":"stream-123","result":{...},"error":null}TaskStatusUpdateEvent.Builder()
.taskId(taskId)
.contextId(contextId)
.status(TaskStatus(TaskState.WORKING, message, timestamp))
.isFinal(false)
.build()Task.Builder()
.id(taskId)
.contextId(contextId)
.status(TaskStatus(...))
.history(listOf(...))
.artifacts(listOf(...))
.build()TaskArtifactUpdateEvent.Builder()
.taskId(taskId)
.contextId(contextId)
.artifact(Artifact(...))
.build()Replay historical events for a task:
val newStreamId = UUID.randomUUID().toString()
try {
val emitter = streamingHandler.resubscribeToTask(taskId, newStreamId)
// Emitter replays all events, then receives new ones
} catch (e: IllegalArgumentException) {
// Task not found
}Process:
@Service
class AgentStreamingService(
private val streamingHandler: A2AStreamingHandler,
private val autonomy: Autonomy
) {
fun executeWithStreaming(intent: String): SseEmitter {
val streamId = UUID.randomUUID().toString()
val taskId = UUID.randomUUID().toString()
val contextId = UUID.randomUUID().toString()
// Create stream
val emitter = streamingHandler.createStream(streamId, taskId, contextId)
// Execute asynchronously
Thread.startVirtualThread {
try {
// Send initial status
sendWorkingStatus(streamId, taskId, contextId, "Starting task...")
// Execute agent
val result = autonomy.chooseAndRunAgent(intent, ProcessOptions())
// Send progress
sendWorkingStatus(streamId, taskId, contextId, "Processing results...")
// Send completion
sendCompletedStatus(streamId, taskId, contextId, "Task completed")
} catch (e: Exception) {
sendFailedStatus(streamId, taskId, contextId, "Error: ${e.message}")
} finally {
streamingHandler.closeStream(streamId)
}
}
return emitter
}
private fun sendWorkingStatus(streamId: String, taskId: String, contextId: String, text: String) {
streamingHandler.sendStreamEvent(
streamId,
TaskStatusUpdateEvent.Builder()
.taskId(taskId)
.contextId(contextId)
.status(TaskStatus(
TaskState.WORKING,
createMessage(text),
OffsetDateTime.now()
))
.build(),
taskId
)
}
private fun sendCompletedStatus(streamId: String, taskId: String, contextId: String, text: String) {
streamingHandler.sendStreamEvent(
streamId,
TaskStatusUpdateEvent.Builder()
.taskId(taskId)
.contextId(contextId)
.status(TaskStatus(
TaskState.COMPLETED,
createMessage(text),
OffsetDateTime.now()
))
.isFinal(true)
.build(),
taskId
)
}
private fun sendFailedStatus(streamId: String, taskId: String, contextId: String, text: String) {
streamingHandler.sendStreamEvent(
streamId,
TaskStatusUpdateEvent.Builder()
.taskId(taskId)
.contextId(contextId)
.status(TaskStatus(
TaskState.FAILED,
createMessage(text),
OffsetDateTime.now()
))
.build(),
taskId
)
}
private fun createMessage(text: String): Message {
return Message.Builder()
.messageId(UUID.randomUUID().toString())
.role(Message.Role.AGENT)
.parts(listOf(TextPart(text)))
.build()
}
}try {
emitter.send(eventData)
} catch (e: Exception) {
logger.error("Error sending stream event", e)
emitter.completeWithError(e)
}if (!taskStateManager.taskExists(taskId)) {
throw IllegalArgumentException("Task not found: $taskId")
}Thread.startVirtualThread {
try {
taskInfo.events.forEach { event ->
sendStreamEvent(newStreamId, event, taskId)
}
} catch (e: Exception) {
logger.error("Error replaying events for task {}", taskId, e)
emitter.completeWithError(e)
}
}A2AStreamingHandler is fully thread-safe:
emitter.onCompletion {
activeStreams.remove(streamId)
}
emitter.onTimeout {
activeStreams.remove(streamId)
}@PreDestroy
fun cleanup() {
streamingHandler.shutdown()
}@SpringBootTest
class StreamingHandlerTest {
@Autowired
lateinit var streamingHandler: A2AStreamingHandler
@Test
fun `should create and close stream`() {
val streamId = "test-stream"
val emitter = streamingHandler.createStream(streamId)
assertNotNull(emitter)
streamingHandler.closeStream(streamId)
}
@Test
fun `should send events`() {
val streamId = "test-stream"
val taskId = "test-task"
val emitter = streamingHandler.createStream(streamId, taskId, "ctx")
val event = TaskStatusUpdateEvent.Builder()
.taskId(taskId)
.status(TaskStatus(TaskState.WORKING, /* ... */))
.build()
streamingHandler.sendStreamEvent(streamId, event, taskId)
streamingHandler.closeStream(streamId)
}
}tessl i tessl/maven-com-embabel-agent--embabel-agent-a2a@0.3.3