A2A protocol integration for Embabel Agent Framework enabling agent-to-agent communication
A2A request and response event publishing for monitoring and observability.
Event published when A2A request is received.
/**
* Event emitted when A2A request received.
*
* `timestamp` is initialized with `Instant.now()` each time an instance is constructed.
* It is declared in the class body, not in the primary constructor, so it is not part of the
* generated `equals`/`hashCode`/`toString`/`copy` members; a `copy()` gets a fresh timestamp.
*
* @param agentPlatform Agent platform receiving request
* @param request JSON-RPC request
*/
data class A2ARequestEvent(
override val agentPlatform: AgentPlatform,
val request: JSONRPCRequest<*>
) : AgentPlatformEvent {
override val timestamp: Instant = Instant.now()
}Properties:
- agentPlatform - Embabel AgentPlatform handling request
- request - JSON-RPC request (SendMessageRequest, GetTaskRequest, etc.)
- timestamp - Automatically set to current time

Event published when A2A response is sent.
/**
* Event emitted when A2A response sent.
*
* `timestamp` is initialized with `Instant.now()` each time an instance is constructed.
* It is declared in the class body, not in the primary constructor, so it is not part of the
* generated `equals`/`hashCode`/`toString`/`copy` members; a `copy()` gets a fresh timestamp.
*
* @param agentPlatform Agent platform sending response
* @param response JSON-RPC response
*/
data class A2AResponseEvent(
override val agentPlatform: AgentPlatform,
val response: JSONRPCResponse<*>
) : AgentPlatformEvent {
override val timestamp: Instant = Instant.now()
}Properties:
- agentPlatform - Embabel AgentPlatform generating response
- response - JSON-RPC response (success or error)
- timestamp - Automatically set to current time

1. External agent → HTTP POST /a2a
2. A2ARequestEvent published
3. AutonomyA2ARequestHandler processes request
4. Embabel Autonomy executes agent
5. A2AResponseEvent published
6. Response returned to external agent

Publishing in Handler:
/**
 * Handles a non-streaming JSON-RPC request and notifies the event listener around the processing step.
 *
 * Order of operations: an [A2ARequestEvent] is published, the request is processed, then an
 * [A2AResponseEvent] carrying the produced response is published. There is no exception handling
 * here, so if `processRequest` throws, no response event is published.
 *
 * @param request the incoming non-streaming JSON-RPC request
 * @return the response produced by `processRequest`
 */
override fun handleJsonRpc(request: NonStreamingJSONRPCRequest<*>): JSONRPCResponse<*> {
    // Announce the request before any processing happens.
    agenticEventListener.onPlatformEvent(A2ARequestEvent(autonomy.agentPlatform, request))

    // Process, then announce the response just before handing it back to the caller.
    return processRequest(request).also { response ->
        agenticEventListener.onPlatformEvent(A2AResponseEvent(autonomy.agentPlatform, response))
    }
}
@Component
class A2AEventHandler {
/**
 * Logs the JSON-RPC method and id of a received A2A request via `logger.info`.
 *
 * @param event the request event to log
 */
@EventListener
fun handleA2ARequest(event: A2ARequestEvent) {
    val incoming = event.request
    logger.info("[A2A Request] method={}, id={}", incoming.method, incoming.id)
}
/**
 * Logs the id of a sent A2A response and whether it succeeded via `logger.info`.
 *
 * A response counts as successful when it is not a [JSONRPCErrorResponse].
 *
 * @param event the response event to log
 */
@EventListener
fun handleA2AResponse(event: A2AResponseEvent) {
    val outgoing = event.response
    val failed = outgoing is JSONRPCErrorResponse
    logger.info("[A2A Response] id={}, success={}", outgoing.id, !failed)
}
}@Component
class CustomA2AListener : AgenticEventListener {
/**
 * Routes A2A request and response events to their dedicated handlers.
 *
 * Any other [AgentPlatformEvent] is ignored.
 *
 * @param event the platform event delivered to this listener
 */
override fun onPlatformEvent(event: AgentPlatformEvent) {
    when (event) {
        is A2ARequestEvent -> handleRequest(event)
        is A2AResponseEvent -> handleResponse(event)
        else -> Unit // not an A2A event: nothing to do
    }
}
/**
 * Placeholder for custom handling of a received A2A request; the body is intentionally empty
 * in this example.
 *
 * @param event the request event to handle
 */
private fun handleRequest(event: A2ARequestEvent) {
// Custom request handling
}
/**
 * Placeholder for custom handling of a sent A2A response; the body is intentionally empty
 * in this example.
 *
 * @param event the response event to handle
 */
private fun handleResponse(event: A2AResponseEvent) {
// Custom response handling
}
}@Component
class A2ALogger {
/**
 * Logs the timestamp, JSON-RPC method, request id and platform name of a received A2A request.
 *
 * @param event the request event to log
 */
@EventListener
fun logRequest(event: A2ARequestEvent) {
    val incoming = event.request
    logger.info(
        "[A2A Request] timestamp={}, method={}, id={}, platform={}",
        event.timestamp,
        incoming.method,
        incoming.id,
        event.agentPlatform.name,
    )
}
/**
 * Logs the timestamp, response id and platform name of a sent A2A response.
 *
 * @param event the response event to log
 */
@EventListener
fun logResponse(event: A2AResponseEvent) {
    val responseId = event.response.id
    val platformName = event.agentPlatform.name
    logger.info("[A2A Response] timestamp={}, id={}, platform={}", event.timestamp, responseId, platformName)
}
}@Component
class A2AMetrics(private val meterRegistry: MeterRegistry) {
/**
 * Increments the `a2a.requests` counter, tagged with the JSON-RPC method and the platform name.
 *
 * @param event the request event to count
 */
@EventListener
fun recordRequest(event: A2ARequestEvent) {
    val requestCounter = meterRegistry.counter(
        "a2a.requests",
        "method", event.request.method,
        "platform", event.agentPlatform.name,
    )
    requestCounter.increment()
}
/**
 * Increments the `a2a.responses` counter, tagged with the outcome and the platform name.
 *
 * The `status` tag is `"error"` for a [JSONRPCErrorResponse] and `"success"` for anything else.
 *
 * @param event the response event to count
 */
@EventListener
fun recordResponse(event: A2AResponseEvent) {
    val outcome = when (event.response) {
        is JSONRPCErrorResponse -> "error"
        else -> "success"
    }
    val responseCounter = meterRegistry.counter(
        "a2a.responses",
        "status", outcome,
        "platform", event.agentPlatform.name,
    )
    responseCounter.increment()
}
}@Component
class A2AMonitor {
private val activeRequests = ConcurrentHashMap<String, Instant>()
/**
 * Remembers when a request arrived, keyed by the string form of its JSON-RPC id.
 *
 * Requests without an id are not tracked. In the code shown, entries are only removed by
 * `onResponse`, so a request that never produces a response event with the same id stays in
 * `activeRequests`.
 *
 * @param event the request event to start timing
 */
@EventListener
fun onRequest(event: A2ARequestEvent) {
    val trackingKey = event.request.id?.toString()
    if (trackingKey == null) {
        return
    }
    activeRequests.put(trackingKey, event.timestamp)
}
/**
 * Completes the timing of a tracked request and logs how long it took.
 *
 * Does nothing when the response has no id or when no start time is recorded for that id.
 * Otherwise the entry is removed, the elapsed time between the request and response timestamps
 * is logged in milliseconds, and a warning is logged when it exceeds 30 seconds.
 *
 * @param event the response event that ends the measurement
 */
@EventListener
fun onResponse(event: A2AResponseEvent) {
    val trackingKey = event.response.id?.toString() ?: return
    // remove() both fetches the start time and stops tracking the request.
    val startedAt = activeRequests.remove(trackingKey) ?: return

    val elapsed = Duration.between(startedAt, event.timestamp)
    logger.info("Request {} completed in {}ms", trackingKey, elapsed.toMillis())

    val elapsedSeconds = elapsed.toSeconds()
    if (elapsedSeconds > 30) {
        logger.warn("Slow request: {} took {}s", trackingKey, elapsedSeconds)
    }
}
}@Component
class A2AErrorNotifier(private val notificationService: NotificationService) {
/**
 * Sends a notification when an A2A response is a [JSONRPCErrorResponse].
 *
 * Other responses are ignored. The notification carries the error code and message, the
 * response id, the platform name and the event timestamp.
 *
 * @param event the response event to inspect
 */
@EventListener
fun notifyOnError(event: A2AResponseEvent) {
    // Guard clause: only error responses trigger a notification.
    val failure = event.response as? JSONRPCErrorResponse ?: return
    val error = failure.error

    // The raw string is deliberately kept at column 0 so trimIndent() sees the same literal as before.
    val body = """
Error Code: ${error.code}
Message: ${error.message}
Request ID: ${failure.id}
Platform: ${event.agentPlatform.name}
Timestamp: ${event.timestamp}
""".trimIndent()

    notificationService.send(
        subject = "A2A Request Failed",
        message = body
    )
}
}@Component
class A2AAuditLogger(private val auditRepository: AuditRepository) {
/**
 * Persists an `A2A_REQUEST` audit entry for a received A2A request.
 *
 * The entry records the event timestamp, JSON-RPC method, request id (as a string, or null when
 * absent), platform name and the request serialized by `objectMapper`.
 *
 * @param event the request event to audit
 */
@EventListener
@Transactional
fun auditRequest(event: A2ARequestEvent) {
    val incoming = event.request
    val entry = AuditEntry(
        timestamp = event.timestamp,
        eventType = "A2A_REQUEST",
        method = incoming.method,
        requestId = incoming.id?.toString(),
        platform = event.agentPlatform.name,
        details = objectMapper.writeValueAsString(incoming),
    )
    auditRepository.save(entry)
}
/**
 * Persists an `A2A_RESPONSE` audit entry for a sent A2A response.
 *
 * The entry records the event timestamp, response id (as a string, or null when absent),
 * platform name, a success flag (true unless the response is a [JSONRPCErrorResponse]) and the
 * response serialized by `objectMapper`.
 *
 * @param event the response event to audit
 */
@EventListener
@Transactional
fun auditResponse(event: A2AResponseEvent) {
    val outgoing = event.response
    val entry = AuditEntry(
        timestamp = event.timestamp,
        eventType = "A2A_RESPONSE",
        responseId = outgoing.id?.toString(),
        platform = event.agentPlatform.name,
        success = outgoing !is JSONRPCErrorResponse,
        details = objectMapper.writeValueAsString(outgoing),
    )
    auditRepository.save(entry)
}
}

For expensive operations, use async processing:
/**
 * Spring configuration that enables `@Async` support and declares the thread pool used in
 * this example.
 */
@Configuration
@EnableAsync
class AsyncConfig {
    /**
     * Creates and initializes a [ThreadPoolTaskExecutor] with a core pool of 4 threads, at most
     * 10 threads, a queue capacity of 100 and the thread name prefix `a2a-event-`.
     *
     * @return the initialized executor
     */
    @Bean
    fun taskExecutor(): Executor =
        ThreadPoolTaskExecutor().apply {
            corePoolSize = 4
            maxPoolSize = 10
            queueCapacity = 100
            setThreadNamePrefix("a2a-event-")
            // Must run after the settings above so the pool is built with them.
            initialize()
        }
}
@Component
class AsyncA2AHandlers {
/**
 * Hands a received A2A request event to `performExpensiveAnalysis` from an `@Async` listener.
 *
 * NOTE(review): running off the publishing thread relies on Spring async support being
 * enabled (e.g. `@EnableAsync`) — confirm in the application configuration.
 *
 * @param event the request event to analyze
 */
@Async
@EventListener
fun handleRequestAsync(event: A2ARequestEvent) {
// Expensive processing won't block request handling
performExpensiveAnalysis(event)
}
/**
 * Hands a sent A2A response event to `updateAnalyticsDashboard` from an `@Async` listener.
 *
 * NOTE(review): running off the publishing thread relies on Spring async support being
 * enabled (e.g. `@EnableAsync`) — confirm in the application configuration.
 *
 * @param event the response event to forward
 */
@Async
@EventListener
fun handleResponseAsync(event: A2AResponseEvent) {
// Expensive processing won't delay response
updateAnalyticsDashboard(event)
}
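}

Events are published synchronously in order:
1. A2ARequestEvent is published before the request is processed.
2. A2AResponseEvent is published after processing, before the response is returned.

This guarantees that, for a given call, a synchronous listener receives the request event before the corresponding response event.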
@SpringBootTest
class A2AEventTest {
@Autowired
lateinit var requestHandler: AutonomyA2ARequestHandler
companion object {
    /**
     * Events recorded by [TestEventListener], in arrival order.
     *
     * Kept in the companion object because [TestEventListener] is a nested (non-`inner`) class:
     * it has no enclosing test instance, so it cannot reference an instance property of the test.
     */
    private val capturedEvents = mutableListOf<AgentPlatformEvent>()
}

/**
 * Listener that records every A2A request and response event it receives.
 *
 * NOTE(review): confirm this nested class is actually registered as a bean in the test
 * application context (for example by importing it explicitly) — TODO verify.
 */
@Component
class TestEventListener {
    /** Records a received A2A request event. */
    @EventListener
    fun captureA2ARequest(event: A2ARequestEvent) {
        capturedEvents.add(event)
    }

    /** Records a sent A2A response event. */
    @EventListener
    fun captureA2AResponse(event: A2AResponseEvent) {
        capturedEvents.add(event)
    }
}
/**
 * Sends one `message/send` request through the handler and checks that exactly two events were
 * captured: the request event first, then the response event carrying the same id.
 */
@Test
fun `should publish request and response events`() {
    // Arrange: start from an empty capture list and build a minimal user message.
    capturedEvents.clear()
    val userMessage = Message.Builder()
        .messageId("m1")
        .role(Message.Role.USER)
        .parts(listOf(TextPart("Test")))
        .build()
    val request = SendMessageRequest(
        id = "test-1",
        params = MessageSendParams(message = userMessage)
    )

    // Act
    requestHandler.handleJsonRpc(request)

    // Assert: one request event followed by one response event.
    assertEquals(2, capturedEvents.size)
    val (firstEvent, secondEvent) = capturedEvents
    assertTrue(firstEvent is A2ARequestEvent)
    assertTrue(secondEvent is A2AResponseEvent)

    val publishedRequest = firstEvent as A2ARequestEvent
    assertEquals("message/send", publishedRequest.request.method)

    val publishedResponse = secondEvent as A2AResponseEvent
    assertEquals("test-1", publishedResponse.response.id)
}
}

tessl i tessl/maven-com-embabel-agent--embabel-agent-a2a@0.3.3