A2A protocol integration for Embabel Agent Framework enabling agent-to-agent communication
JSON-RPC 2.0 request processing with support for both synchronous and streaming modes.
Core interface for handling JSON-RPC requests.
interface A2ARequestHandler {
/**
* Handle non-streaming JSON-RPC request.
* @param request NonStreamingJSONRPCRequest (SendMessageRequest, GetTaskRequest, CancelTaskRequest)
* @return JSONRPCResponse with result or error
*/
fun handleJsonRpc(request: NonStreamingJSONRPCRequest<*>): JSONRPCResponse<*>
/**
* Handle streaming JSON-RPC request using SSE.
* @param request StreamingJSONRPCRequest (SendStreamingMessageRequest)
* @return SseEmitter for streaming events
* @throws UnsupportedOperationException if streaming not supported
*/
fun handleJsonRpcStream(request: StreamingJSONRPCRequest<*>): SseEmitter
}Implementation that integrates with Embabel Autonomy system.
@Service
class AutonomyA2ARequestHandler(
private val autonomy: Autonomy,
private val agenticEventListener: AgenticEventListener,
private val streamingHandler: A2AStreamingHandler
) : A2ARequestHandler {
/**
* Handles non-streaming requests.
* Supports: SendMessageRequest
* Not yet implemented: GetTaskRequest, CancelTaskRequest
*/
override fun handleJsonRpc(request: NonStreamingJSONRPCRequest<*>): JSONRPCResponse<*>
/**
* Handles streaming requests with SSE.
* Supports: SendStreamingMessageRequest
*/
override fun handleJsonRpcStream(request: StreamingJSONRPCRequest<*>): SseEmitter
/**
* Handles custom streaming requests not in standard SDK.
* Supports: TaskResubscriptionRequest (task/resubscribe)
*/
fun handleCustomStreamingRequest(
method: String,
requestMap: Map<String, Any>,
objectMapper: ObjectMapper
): SseEmitter
/**
* Handles message/stream requests.
* Executes agent and sends real-time status updates via SSE.
*/
fun handleMessageStream(request: SendStreamingMessageRequest): SseEmitter
/**
* Handles task/resubscribe requests.
* Replays historical events for specified task.
* @throws IllegalArgumentException if task not found
*/
fun handleTaskResubscribe(request: TaskResubscriptionRequest): SseEmitter
}SendMessageRequest (message/send):
val request = SendMessageRequest(
id = "msg-123",
params = MessageSendParams(
message = Message.Builder()
.messageId("m1")
.role(Message.Role.USER)
.parts(listOf(TextPart("Find news for Scorpios")))
.build()
)
)
val response: SendMessageResponse = handler.handleJsonRpc(request)GetTaskRequest (task/get) - Not yet implemented:
// Returns TODO() error in current version
val request = GetTaskRequest(id = "req-1", params = TaskIdParams(id = "task-123"))
val response = handler.handleJsonRpc(request) // Throws UnsupportedOperationExceptionCancelTaskRequest (task/cancel) - Not yet implemented:
// Returns TODO() error in current version
val request = CancelTaskRequest(id = "req-1", params = TaskIdParams(id = "task-123"))
val response = handler.handleJsonRpc(request) // Throws UnsupportedOperationExceptionSendStreamingMessageRequest (message/stream):
val request = SendStreamingMessageRequest(
id = "stream-456",
params = MessageSendParams(
message = Message.Builder()
.messageId("m2")
.role(Message.Role.USER)
.parts(listOf(TextPart("Research Australian elections")))
.build()
)
)
val emitter: SseEmitter = handler.handleJsonRpcStream(request)TaskResubscriptionRequest (task/resubscribe):
val request = TaskResubscriptionRequest(
id = "resubscribe-789",
params = TaskIdParams(id = "task-123")
)
val emitter: SseEmitter = handler.handleTaskResubscribe(request)
// Replays all historical events, then continues with new ones1. Client sends POST /a2a with SendMessageRequest
2. A2ARequestEvent published
3. Extract intent from message parts
4. Execute autonomy.chooseAndRunAgent(intent, options)
5. Build Task with COMPLETED/FAILED status
6. Create SendMessageResponse
7. A2AResponseEvent published
8. Return response to client1. Client sends POST /a2a with SendStreamingMessageRequest
2. A2ARequestEvent published
3. Create SSE stream with streamingHandler
4. Start virtual thread for async execution:
a. Send initial TaskStatusUpdateEvent (WORKING)
b. Execute autonomy.chooseAndRunAgent(intent, options)
c. Send progress TaskStatusUpdateEvents
d. Send final TaskStatusUpdateEvent (COMPLETED/FAILED)
5. Close stream
6. A2AResponseEvent published
7. Return SseEmitter to client1. Client sends POST /a2a with TaskResubscriptionRequest
2. Verify task exists in TaskStateManager
3. Create new stream with resubscribeToTask()
4. Replay all historical events asynchronously
5. If task completed: close stream after replay
6. If task active: continue sending new events
7. Return SseEmitter to client/**
* Extension for creating success responses from SendMessageRequest.
*/
fun SendMessageRequest.successResponseWith(result: EventKind): SendMessageResponseUsage:
val request = SendMessageRequest(/* ... */)
val task = Task.Builder().id("task-1").status(/* ... */).build()
val response = request.successResponseWith(result = task)Requests and responses trigger events:
// Published before processing
A2ARequestEvent(
agentPlatform = autonomy.agentPlatform,
request = request
)
// Published after processing
A2AResponseEvent(
agentPlatform = autonomy.agentPlatform,
response = response
)override fun handleJsonRpc(request: NonStreamingJSONRPCRequest<*>): JSONRPCResponse<*> {
return when (request) {
is SendMessageRequest -> handleMessageSend(request, request.params)
else -> throw UnsupportedOperationException("Method ${request.method} not supported")
}
}catch (e: Exception) {
JSONRPCErrorResponse(
taskId,
TaskNotFoundError(null, "Internal error: ${e.message}", e.stackTraceToString())
)
}try {
// streaming logic
} catch (e: Exception) {
streamingHandler.sendStreamEvent(
streamId,
TaskStatusUpdateEvent.Builder()
.taskId(taskId)
.status(createFailedTaskStatus(params, e))
.build(),
taskId
)
} finally {
streamingHandler.closeStream(streamId)
}@Service
class MessageService(private val handler: AutonomyA2ARequestHandler) {
fun sendMessage(text: String): Task {
val request = SendMessageRequest(
id = UUID.randomUUID().toString(),
params = MessageSendParams(
message = Message.Builder()
.messageId(UUID.randomUUID().toString())
.role(Message.Role.USER)
.parts(listOf(TextPart(text)))
.build()
)
)
val response = handler.handleJsonRpc(request) as SendMessageResponse
return response.result as Task
}
}@RestController
class StreamingController(private val handler: AutonomyA2ARequestHandler) {
@PostMapping("/stream")
fun streamMessage(@RequestBody text: String): SseEmitter {
val request = SendStreamingMessageRequest(
id = UUID.randomUUID().toString(),
params = MessageSendParams(
message = Message.Builder()
.messageId(UUID.randomUUID().toString())
.role(Message.Role.USER)
.parts(listOf(TextPart(text)))
.build()
)
)
return handler.handleJsonRpcStream(request)
}
}@Service
class TaskResubscriptionService(private val handler: AutonomyA2ARequestHandler) {
fun resubscribeToTask(taskId: String): SseEmitter {
val request = TaskResubscriptionRequest(
id = UUID.randomUUID().toString(),
params = TaskIdParams(id = taskId)
)
return try {
handler.handleTaskResubscribe(request)
} catch (e: IllegalArgumentException) {
throw TaskNotFoundException("Task not found: $taskId")
}
}
}tessl i tessl/maven-com-embabel-agent--embabel-agent-a2a@0.3.3