CtrlK
CommunityDocumentationLog inGet started
Tessl Logo

tessl/maven-com-embabel-agent--embabel-agent-a2a

A2A protocol integration for Embabel Agent Framework enabling agent-to-agent communication

Overview
Eval results
Files

events.mddocs/api/

Events API

A2A request and response event publishing for monitoring and observability.

A2ARequestEvent

Event published when A2A request is received.

/**
 * Event emitted when A2A request received.
 *
 * @param agentPlatform Agent platform receiving request
 * @param request JSON-RPC request
 */
data class A2ARequestEvent(
    override val agentPlatform: AgentPlatform,
    val request: JSONRPCRequest<*>
) : AgentPlatformEvent {
    override val timestamp: Instant = Instant.now()
}

Properties:

  • agentPlatform - Embabel AgentPlatform handling request
  • request - JSON-RPC request (SendMessageRequest, GetTaskRequest, etc.)
  • timestamp - Automatically set to current time

A2AResponseEvent

Event published when A2A response is sent.

/**
 * Event emitted when A2A response sent.
 *
 * @param agentPlatform Agent platform sending response
 * @param response JSON-RPC response
 */
data class A2AResponseEvent(
    override val agentPlatform: AgentPlatform,
    val response: JSONRPCResponse<*>
) : AgentPlatformEvent {
    override val timestamp: Instant = Instant.now()
}

Properties:

  • agentPlatform - Embabel AgentPlatform generating response
  • response - JSON-RPC response (success or error)
  • timestamp - Automatically set to current time

Event Flow

1. External agent → HTTP POST /a2a
2. A2ARequestEvent published
3. AutonomyA2ARequestHandler processes request
4. Embabel Autonomy executes agent
5. A2AResponseEvent published
6. Response returned to external agent

Publishing in Handler:

override fun handleJsonRpc(request: NonStreamingJSONRPCRequest<*>): JSONRPCResponse<*> {
    // 1. Publish request event
    agenticEventListener.onPlatformEvent(
        A2ARequestEvent(autonomy.agentPlatform, request)
    )

    // 2. Process request
    val result = processRequest(request)

    // 3. Publish response event
    agenticEventListener.onPlatformEvent(
        A2AResponseEvent(autonomy.agentPlatform, result)
    )

    return result
}

Listening to Events

Spring @EventListener

@Component
class A2AEventHandler {

    @EventListener
    fun handleA2ARequest(event: A2ARequestEvent) {
        logger.info(
            "[A2A Request] method={}, id={}",
            event.request.method,
            event.request.id
        )
    }

    @EventListener
    fun handleA2AResponse(event: A2AResponseEvent) {
        val success = event.response !is JSONRPCErrorResponse
        logger.info(
            "[A2A Response] id={}, success={}",
            event.response.id,
            success
        )
    }
}

AgenticEventListener Interface

@Component
class CustomA2AListener : AgenticEventListener {

    override fun onPlatformEvent(event: AgentPlatformEvent) {
        when (event) {
            is A2ARequestEvent -> handleRequest(event)
            is A2AResponseEvent -> handleResponse(event)
        }
    }

    private fun handleRequest(event: A2ARequestEvent) {
        // Custom request handling
    }

    private fun handleResponse(event: A2AResponseEvent) {
        // Custom response handling
    }
}

Common Use Cases

1. Request/Response Logging

@Component
class A2ALogger {

    @EventListener
    fun logRequest(event: A2ARequestEvent) {
        logger.info(
            "[A2A Request] timestamp={}, method={}, id={}, platform={}",
            event.timestamp,
            event.request.method,
            event.request.id,
            event.agentPlatform.name
        )
    }

    @EventListener
    fun logResponse(event: A2AResponseEvent) {
        logger.info(
            "[A2A Response] timestamp={}, id={}, platform={}",
            event.timestamp,
            event.response.id,
            event.agentPlatform.name
        )
    }
}

2. Metrics Collection

@Component
class A2AMetrics(private val meterRegistry: MeterRegistry) {

    @EventListener
    fun recordRequest(event: A2ARequestEvent) {
        meterRegistry.counter(
            "a2a.requests",
            "method", event.request.method,
            "platform", event.agentPlatform.name
        ).increment()
    }

    @EventListener
    fun recordResponse(event: A2AResponseEvent) {
        val status = if (event.response is JSONRPCErrorResponse) "error" else "success"
        meterRegistry.counter(
            "a2a.responses",
            "status", status,
            "platform", event.agentPlatform.name
        ).increment()
    }
}

3. Request Duration Tracking

@Component
class A2AMonitor {
    private val activeRequests = ConcurrentHashMap<String, Instant>()

    @EventListener
    fun onRequest(event: A2ARequestEvent) {
        val requestId = event.request.id?.toString() ?: return
        activeRequests[requestId] = event.timestamp
    }

    @EventListener
    fun onResponse(event: A2AResponseEvent) {
        val responseId = event.response.id?.toString() ?: return
        val requestTime = activeRequests.remove(responseId) ?: return
        val duration = Duration.between(requestTime, event.timestamp)

        logger.info("Request {} completed in {}ms", responseId, duration.toMillis())

        if (duration.toSeconds() > 30) {
            logger.warn("Slow request: {} took {}s", responseId, duration.toSeconds())
        }
    }
}

4. Error Notification

@Component
class A2AErrorNotifier(private val notificationService: NotificationService) {

    @EventListener
    fun notifyOnError(event: A2AResponseEvent) {
        if (event.response is JSONRPCErrorResponse) {
            val error = event.response.error
            notificationService.send(
                subject = "A2A Request Failed",
                message = """
                    Error Code: ${error.code}
                    Message: ${error.message}
                    Request ID: ${event.response.id}
                    Platform: ${event.agentPlatform.name}
                    Timestamp: ${event.timestamp}
                """.trimIndent()
            )
        }
    }
}

5. Audit Trail

@Component
class A2AAuditLogger(private val auditRepository: AuditRepository) {

    @EventListener
    @Transactional
    fun auditRequest(event: A2ARequestEvent) {
        auditRepository.save(AuditEntry(
            timestamp = event.timestamp,
            eventType = "A2A_REQUEST",
            method = event.request.method,
            requestId = event.request.id?.toString(),
            platform = event.agentPlatform.name,
            details = objectMapper.writeValueAsString(event.request)
        ))
    }

    @EventListener
    @Transactional
    fun auditResponse(event: A2AResponseEvent) {
        auditRepository.save(AuditEntry(
            timestamp = event.timestamp,
            eventType = "A2A_RESPONSE",
            responseId = event.response.id?.toString(),
            platform = event.agentPlatform.name,
            success = event.response !is JSONRPCErrorResponse,
            details = objectMapper.writeValueAsString(event.response)
        ))
    }
}

Async Event Processing

For expensive operations, use async processing:

@Configuration
@EnableAsync
class AsyncConfig {
    @Bean
    fun taskExecutor(): Executor {
        val executor = ThreadPoolTaskExecutor()
        executor.corePoolSize = 4
        executor.maxPoolSize = 10
        executor.queueCapacity = 100
        executor.setThreadNamePrefix("a2a-event-")
        executor.initialize()
        return executor
    }
}

@Component
class AsyncA2AHandlers {

    @Async
    @EventListener
    fun handleRequestAsync(event: A2ARequestEvent) {
        // Expensive processing won't block request handling
        performExpensiveAnalysis(event)
    }

    @Async
    @EventListener
    fun handleResponseAsync(event: A2AResponseEvent) {
        // Expensive processing won't delay response
        updateAnalyticsDashboard(event)
    }
}

Event Ordering

Events are published synchronously in order:

  1. A2ARequestEvent - Before processing
  2. Request processing
  3. A2AResponseEvent - After processing, before returning

This guarantees:

  • Request events always before response events
  • Event listeners can track request/response pairs
  • Accurate timing measurements

Testing

@SpringBootTest
class A2AEventTest {

    @Autowired
    lateinit var requestHandler: AutonomyA2ARequestHandler

    private val capturedEvents = mutableListOf<AgentPlatformEvent>()

    @Component
    class TestEventListener {
        @EventListener
        fun captureA2ARequest(event: A2ARequestEvent) {
            capturedEvents.add(event)
        }

        @EventListener
        fun captureA2AResponse(event: A2AResponseEvent) {
            capturedEvents.add(event)
        }
    }

    @Test
    fun `should publish request and response events`() {
        capturedEvents.clear()

        val request = SendMessageRequest(
            id = "test-1",
            params = MessageSendParams(
                message = Message.Builder()
                    .messageId("m1")
                    .role(Message.Role.USER)
                    .parts(listOf(TextPart("Test")))
                    .build()
            )
        )

        requestHandler.handleJsonRpc(request)

        assertEquals(2, capturedEvents.size)
        assertTrue(capturedEvents[0] is A2ARequestEvent)
        assertTrue(capturedEvents[1] is A2AResponseEvent)

        val requestEvent = capturedEvents[0] as A2ARequestEvent
        assertEquals("message/send", requestEvent.request.method)

        val responseEvent = capturedEvents[1] as A2AResponseEvent
        assertEquals("test-1", responseEvent.response.id)
    }
}

See Also

  • Request Handling API - Where events are published
  • Common Tasks - Monitoring examples
  • Integration Patterns - Observable A2A pattern
tessl i tessl/maven-com-embabel-agent--embabel-agent-a2a@0.3.3

docs

api

agent-card.md

configuration.md

endpoint-registration.md

events.md

overview.md

request-handling.md

skill-factory.md

streaming.md

task-state.md

index.md

tile.json