A2A protocol integration for Embabel Agent Framework enabling agent-to-agent communication
Architectural patterns and best practices for A2A integration.
Use Case: Expose all agent capabilities through one endpoint
@Configuration
class SimpleA2AConfig {
@Bean
fun agentCardHandler(
agentPlatform: AgentPlatform,
requestHandler: AutonomyA2ARequestHandler
): AgentCardHandler {
return EmbabelServerGoalsAgentCardHandler(
path = "a2a",
agentPlatform = agentPlatform,
a2ARequestHandler = requestHandler,
goalFilter = { true }
)
}
}Endpoints:
/a2a/.well-known/agent.json/a2aBenefits:
When to Use: Small applications, internal services, prototyping
Use Case: Expose different capability sets to different audiences
@Configuration
class TieredA2AConfig {
@Bean
fun publicEndpoint(
agentPlatform: AgentPlatform,
requestHandler: AutonomyA2ARequestHandler
): AgentCardHandler {
return EmbabelServerGoalsAgentCardHandler(
path = "a2a-public",
agentPlatform = agentPlatform,
a2ARequestHandler = requestHandler,
goalFilter = { goal -> goal.tags.contains("public") }
)
}
@Bean
fun partnerEndpoint(
agentPlatform: AgentPlatform,
requestHandler: AutonomyA2ARequestHandler
): AgentCardHandler {
return EmbabelServerGoalsAgentCardHandler(
path = "a2a-partner",
agentPlatform = agentPlatform,
a2ARequestHandler = requestHandler,
goalFilter = { goal -> goal.tags.contains("partner") }
)
}
@Bean
fun internalEndpoint(
agentPlatform: AgentPlatform,
requestHandler: AutonomyA2ARequestHandler
): AgentCardHandler {
return EmbabelServerGoalsAgentCardHandler(
path = "a2a-internal",
agentPlatform = agentPlatform,
a2ARequestHandler = requestHandler,
goalFilter = { goal -> goal.tags.contains("internal") }
)
}
}Security Configuration:
@Configuration
@EnableWebSecurity
class TieredSecurityConfig {
@Bean
fun filterChain(http: HttpSecurity): SecurityFilterChain {
http.authorizeHttpRequests { auth ->
auth
// Public endpoint
.requestMatchers("/a2a-public/**").permitAll()
// Partner endpoint - API key
.requestMatchers("/a2a-partner/**").hasRole("PARTNER")
// Internal endpoint - mTLS or service account
.requestMatchers("/a2a-internal/**").hasRole("INTERNAL")
}
return http.build()
}
}Benefits:
When to Use: SaaS applications, B2B integrations, multi-tenant systems
Use Case: Full observability with metrics, logging, and tracing
@Component
class A2AObservability(
private val meterRegistry: MeterRegistry,
private val tracer: Tracer
) {
private val activeRequests = ConcurrentHashMap<String, Span>()
@EventListener
fun onRequest(event: A2ARequestEvent) {
// Metrics
meterRegistry.counter(
"a2a.requests.total",
"method", event.request.method,
"platform", event.agentPlatform.name
).increment()
// Tracing
val span = tracer.spanBuilder("a2a.request")
.setAttribute("method", event.request.method)
.setAttribute("platform", event.agentPlatform.name)
.startSpan()
activeRequests[event.request.id?.toString() ?: ""] = span
// Logging
logger.info(
"A2A Request: method={}, id={}, timestamp={}",
event.request.method,
event.request.id,
event.timestamp
)
}
@EventListener
fun onResponse(event: A2AResponseEvent) {
// Complete trace
val span = activeRequests.remove(event.response.id?.toString())
span?.end()
// Metrics
val status = if (event.response is JSONRPCErrorResponse) "error" else "success"
meterRegistry.counter(
"a2a.responses.total",
"status", status,
"platform", event.agentPlatform.name
).increment()
// Logging
logger.info(
"A2A Response: id={}, status={}, timestamp={}",
event.response.id,
status,
event.timestamp
)
}
@EventListener
fun onError(event: A2AResponseEvent) {
if (event.response is JSONRPCErrorResponse) {
val error = event.response.error
logger.error(
"A2A Error: code={}, message={}, id={}",
error.code,
error.message,
event.response.id
)
}
}
}Dashboard Integration:
@Component
class A2ADashboard(private val dashboardService: DashboardService) {
@EventListener
@Async
fun updateDashboard(event: A2AResponseEvent) {
dashboardService.recordMetric(
metric = "a2a.response",
timestamp = event.timestamp,
success = event.response !is JSONRPCErrorResponse
)
}
}Benefits:
When to Use: Production services, mission-critical systems
Use Case: Non-blocking event handling for analytics and audit trails
@Configuration
@EnableAsync
class AsyncConfig {
@Bean
fun taskExecutor(): Executor {
val executor = ThreadPoolTaskExecutor()
executor.corePoolSize = 4
executor.maxPoolSize = 10
executor.queueCapacity = 100
executor.setThreadNamePrefix("a2a-event-")
executor.initialize()
return executor
}
}
@Component
class AsyncA2AHandlers(
private val auditRepository: AuditRepository,
private val analyticsService: AnalyticsService
) {
@Async
@EventListener
fun auditRequest(event: A2ARequestEvent) {
// Expensive audit logging doesn't block request processing
auditRepository.save(AuditEntry(
timestamp = event.timestamp,
eventType = "A2A_REQUEST",
method = event.request.method,
details = objectMapper.writeValueAsString(event.request)
))
}
@Async
@EventListener
fun analyzeResponse(event: A2AResponseEvent) {
// Analytics processing runs asynchronously
analyticsService.recordResponse(
platform = event.agentPlatform.name,
success = event.response !is JSONRPCErrorResponse,
timestamp = event.timestamp
)
}
}Benefits:
When to Use: High-traffic services, complex analytics, audit requirements
Use Case: Enable A2A only in specific environments or with feature flags
@Configuration
@ConditionalOnProperty(
prefix = "features",
name = ["a2a.enabled"],
havingValue = "true"
)
class ConditionalA2AConfig {
@Bean
fun agentCardHandler(
agentPlatform: AgentPlatform,
requestHandler: AutonomyA2ARequestHandler
): AgentCardHandler {
return EmbabelServerGoalsAgentCardHandler(
path = "a2a",
agentPlatform = agentPlatform,
a2ARequestHandler = requestHandler,
goalFilter = { true }
)
}
}Feature Configuration:
# application-dev.yml
features:
a2a:
enabled: true
# application-prod.yml
features:
a2a:
enabled: false # Disable in prod initiallyDynamic Feature Flags:
@Configuration
class DynamicA2AConfig(
private val featureFlagService: FeatureFlagService
) {
@Bean
@ConditionalOnBean(name = ["featureFlagService"])
fun agentCardHandler(...): AgentCardHandler? {
return if (featureFlagService.isEnabled("a2a-protocol")) {
EmbabelServerGoalsAgentCardHandler(...)
} else {
null
}
}
}Benefits:
When to Use: New deployments, beta features, gradual migrations
Use Case: Custom processing logic beyond standard Autonomy integration
@Service
class CustomA2ARequestHandler(
private val customService: CustomService,
private val agenticEventListener: AgenticEventListener,
private val streamingHandler: A2AStreamingHandler
) : A2ARequestHandler {
override fun handleJsonRpc(request: NonStreamingJSONRPCRequest<*>): JSONRPCResponse<*> {
// Publish event
agenticEventListener.onPlatformEvent(
A2ARequestEvent(customService.platform, request)
)
// Custom routing
val response = when (request) {
is SendMessageRequest -> {
val intent = extractIntent(request)
customService.processMessage(intent, request.params)
}
else -> throw UnsupportedOperationException("Method ${request.method} not supported")
}
// Publish response event
agenticEventListener.onPlatformEvent(
A2AResponseEvent(customService.platform, response)
)
return response
}
override fun handleJsonRpcStream(request: StreamingJSONRPCRequest<*>): SseEmitter {
// Custom streaming logic
return customService.handleStreaming(request)
}
private fun extractIntent(request: SendMessageRequest): String {
return request.params.message.parts
.filterIsInstance<TextPart>()
.firstOrNull()?.text ?: ""
}
}Benefits:
When to Use: Custom architectures, legacy system integration, specialized workflows
Use Case: Persist task state across server restarts
@Service
class PersistentTaskStateManager(
private val taskRepository: TaskRepository,
private val taskStateManager: TaskStateManager
) {
@EventListener
fun onApplicationReady(event: ApplicationReadyEvent) {
// Restore tasks on startup
taskRepository.findActiveTasks().forEach { task ->
taskStateManager.registerTask(
task.taskId,
task.contextId,
task.streamId
)
task.events.forEach { event ->
taskStateManager.recordEvent(task.taskId, event)
}
}
logger.info("Restored {} active tasks", taskRepository.countActiveTasks())
}
@EventListener
@Transactional
fun persistTaskEvent(event: A2AResponseEvent) {
// Save task events to database
if (event.response is SendStreamingMessageResponse) {
val result = event.response.result
if (result is TaskStatusUpdateEvent) {
taskRepository.saveEvent(
taskId = result.taskId,
event = result,
timestamp = event.timestamp
)
}
}
}
@Scheduled(fixedRate = 300000) // Every 5 minutes
fun persistActiveTasks() {
// Periodically persist active task state
taskRepository.syncActiveTasks(taskStateManager.getAllActiveTasks())
}
}Benefits:
When to Use: Long-running tasks, high-availability requirements, audit compliance
Use Case: Protect service from abuse and ensure fair usage
@Component
class A2ARateLimiter {
private val rateLimiters = ConcurrentHashMap<String, RateLimiter>()
@EventListener(condition = "#event.request.method == 'message/send' or #event.request.method == 'message/stream'")
fun checkRateLimit(event: A2ARequestEvent) {
val clientId = extractClientId(event.request)
val limiter = rateLimiters.computeIfAbsent(clientId) {
RateLimiter.create(10.0) // 10 requests per second
}
if (!limiter.tryAcquire()) {
throw RateLimitExceededException("Rate limit exceeded for client: $clientId")
}
}
private fun extractClientId(request: JSONRPCRequest<*>): String {
// Extract from authentication context or headers
return SecurityContextHolder.getContext().authentication?.name ?: "anonymous"
}
}
@ControllerAdvice
class A2AExceptionHandler {
@ExceptionHandler(RateLimitExceededException::class)
fun handleRateLimit(e: RateLimitExceededException): ResponseEntity<JSONRPCErrorResponse> {
return ResponseEntity.status(429).body(
JSONRPCErrorResponse(
null,
JSONRPCError(429, "Too Many Requests", e.message)
)
)
}
}Benefits:
When to Use: Public endpoints, multi-tenant systems, high-traffic services
tessl i tessl/maven-com-embabel-agent--embabel-agent-a2a@0.3.3