CtrlK
Community · Documentation · Log in · Get started
Tessl Logo

tessl/maven-com-embabel-agent--embabel-agent-a2a

A2A protocol integration for Embabel Agent Framework enabling agent-to-agent communication

Overview
Eval results
Files

integration-patterns.md (docs/guides/)

Integration Patterns

Architectural patterns and best practices for A2A integration.

Pattern 1: Simple Single Endpoint

Use Case: Expose all agent capabilities through one endpoint

/**
 * Single-endpoint A2A setup: one agent-card handler rooted at the `a2a` path
 * with a goal filter that accepts every goal.
 */
@Configuration
class SimpleA2AConfig {

    /**
     * Creates the only agent-card handler of this configuration.
     *
     * @param agentPlatform platform passed straight through to the handler
     * @param requestHandler request handler passed through as `a2ARequestHandler`
     * @return a handler on path `a2a` whose filter returns `true` for every goal
     */
    @Bean
    fun agentCardHandler(
        agentPlatform: AgentPlatform,
        requestHandler: AutonomyA2ARequestHandler,
    ): AgentCardHandler = EmbabelServerGoalsAgentCardHandler(
        path = "a2a",
        agentPlatform = agentPlatform,
        a2ARequestHandler = requestHandler,
        // No filtering: every goal is exposed.
        goalFilter = { _ -> true },
    )
}

Endpoints:

  • /a2a/.well-known/agent.json
  • /a2a

Benefits:

  • Simplest setup
  • Single discovery point
  • Minimal configuration

When to Use: Small applications, internal services, prototyping


Pattern 2: Multi-Tier Endpoints

Use Case: Expose different capability sets to different audiences

/**
 * Three-tier A2A setup: one agent-card handler per audience, each exposing only
 * the goals that carry the matching tag.
 */
@Configuration
class TieredA2AConfig {

    /** Handler on path `a2a-public` exposing goals tagged `public`. */
    @Bean
    fun publicEndpoint(
        agentPlatform: AgentPlatform,
        requestHandler: AutonomyA2ARequestHandler,
    ): AgentCardHandler = tagScopedHandler("a2a-public", "public", agentPlatform, requestHandler)

    /** Handler on path `a2a-partner` exposing goals tagged `partner`. */
    @Bean
    fun partnerEndpoint(
        agentPlatform: AgentPlatform,
        requestHandler: AutonomyA2ARequestHandler,
    ): AgentCardHandler = tagScopedHandler("a2a-partner", "partner", agentPlatform, requestHandler)

    /** Handler on path `a2a-internal` exposing goals tagged `internal`. */
    @Bean
    fun internalEndpoint(
        agentPlatform: AgentPlatform,
        requestHandler: AutonomyA2ARequestHandler,
    ): AgentCardHandler = tagScopedHandler("a2a-internal", "internal", agentPlatform, requestHandler)

    /**
     * Shared construction for the three tiers.
     *
     * @param path endpoint path given to the handler
     * @param requiredTag tag a goal must contain to pass the handler's goal filter
     */
    private fun tagScopedHandler(
        path: String,
        requiredTag: String,
        agentPlatform: AgentPlatform,
        requestHandler: AutonomyA2ARequestHandler,
    ): AgentCardHandler = EmbabelServerGoalsAgentCardHandler(
        path = path,
        agentPlatform = agentPlatform,
        a2ARequestHandler = requestHandler,
        goalFilter = { goal -> goal.tags.contains(requiredTag) },
    )
}

Security Configuration:

/**
 * Access rules for the three A2A tiers.
 *
 * NOTE(review): this snippet declares neither an `anyRequest()` rule nor an
 * authentication mechanism — confirm both are configured elsewhere.
 */
@Configuration
@EnableWebSecurity
class TieredSecurityConfig {

    /**
     * Registers one authorization rule per tier, in the order public, partner,
     * internal, and builds the filter chain.
     *
     * @param http security builder the rules are registered on
     * @return the chain produced by `http.build()`
     */
    @Bean
    fun filterChain(http: HttpSecurity): SecurityFilterChain {
        http.authorizeHttpRequests { rules ->
            // Public tier: open to everyone.
            rules.requestMatchers("/a2a-public/**").permitAll()
            // Partner tier: caller needs the PARTNER role (API key).
            rules.requestMatchers("/a2a-partner/**").hasRole("PARTNER")
            // Internal tier: caller needs the INTERNAL role (mTLS or service account).
            rules.requestMatchers("/a2a-internal/**").hasRole("INTERNAL")
        }
        return http.build()
    }
}

Benefits:

  • Clear capability segregation
  • Different security policies per tier
  • Audience-specific exposure

When to Use: SaaS applications, B2B integrations, multi-tenant systems


Pattern 3: Observable A2A Service

Use Case: Full observability with metrics, logging, and tracing

/**
 * Emits metrics, trace spans and log lines for A2A requests and responses.
 *
 * A span is started for every request event and ended when a response event
 * with the same id arrives; open spans are held in [activeRequests].
 *
 * NOTE(review): `logger` is not declared in this class — it is expected to be
 * provided by the surrounding file.
 */
@Component
class A2AObservability(
    private val meterRegistry: MeterRegistry,
    private val tracer: Tracer
) {
    /** Spans that have been started but not yet ended, keyed by [spanKey]. */
    private val activeRequests = ConcurrentHashMap<String, Span>()

    /**
     * Counts the request, starts its span and logs it.
     *
     * @param event request event supplying the method, id, platform and timestamp
     */
    @EventListener
    fun onRequest(event: A2ARequestEvent) {
        // Metrics
        meterRegistry.counter(
            "a2a.requests.total",
            "method", event.request.method,
            "platform", event.agentPlatform.name
        ).increment()

        // Tracing
        val span = tracer.spanBuilder("a2a.request")
            .setAttribute("method", event.request.method)
            .setAttribute("platform", event.agentPlatform.name)
            .startSpan()
        // If a span is already open under this key (duplicate or missing id),
        // end it so it is not left open forever once it is overwritten.
        activeRequests.put(spanKey(event.request.id), span)?.end()

        // Logging
        logger.info(
            "A2A Request: method={}, id={}, timestamp={}",
            event.request.method,
            event.request.id,
            event.timestamp
        )
    }

    /**
     * Ends the span opened for the matching request, counts the response by
     * status and logs it.
     *
     * @param event response event supplying the response, platform and timestamp
     */
    @EventListener
    fun onResponse(event: A2AResponseEvent) {
        // Complete trace. The key is derived exactly as in onRequest, so a
        // response without an id still finds its span and never passes a null
        // key to the ConcurrentHashMap.
        activeRequests.remove(spanKey(event.response.id))?.end()

        // Metrics
        val status = if (event.response is JSONRPCErrorResponse) "error" else "success"
        meterRegistry.counter(
            "a2a.responses.total",
            "status", status,
            "platform", event.agentPlatform.name
        ).increment()

        // Logging
        logger.info(
            "A2A Response: id={}, status={}, timestamp={}",
            event.response.id,
            status,
            event.timestamp
        )
    }

    /**
     * Logs code, message and id of error responses; other responses are ignored.
     *
     * @param event response event to inspect
     */
    @EventListener
    fun onError(event: A2AResponseEvent) {
        // Bind to a local first: the cast then does not depend on the compiler
        // being able to smart-cast a property of the event type.
        val response = event.response as? JSONRPCErrorResponse ?: return
        val error = response.error
        logger.error(
            "A2A Error: code={}, message={}, id={}",
            error.code,
            error.message,
            response.id
        )
    }

    /** Map key for a request/response id; a missing id maps to the empty string. */
    private fun spanKey(id: Any?): String = id?.toString() ?: ""
}

Dashboard Integration:

/**
 * Forwards the outcome of every A2A response to the dashboard service.
 */
@Component
class A2ADashboard(private val dashboardService: DashboardService) {

    /**
     * Records one `a2a.response` data point per response event.
     *
     * @param event response event; it counts as a success unless its response
     *   is a [JSONRPCErrorResponse]
     */
    @EventListener
    @Async
    fun updateDashboard(event: A2AResponseEvent) {
        val succeeded = event.response !is JSONRPCErrorResponse
        dashboardService.recordMetric(
            metric = "a2a.response",
            timestamp = event.timestamp,
            success = succeeded,
        )
    }
}

Benefits:

  • Complete visibility into A2A operations
  • Integration with monitoring tools
  • Performance tracking
  • Error alerting

When to Use: Production services, mission-critical systems


Pattern 4: Async Event Processing

Use Case: Non-blocking event handling for analytics and audit trails

/**
 * Enables asynchronous method execution and supplies the executor bean.
 */
@Configuration
@EnableAsync
class AsyncConfig {

    /**
     * Builds and initializes a thread-pool executor with 4 core threads,
     * at most 10 threads, a queue capacity of 100 and the thread name
     * prefix `a2a-event-`.
     *
     * @return the initialized executor
     */
    @Bean
    fun taskExecutor(): Executor = ThreadPoolTaskExecutor().apply {
        corePoolSize = 4
        maxPoolSize = 10
        queueCapacity = 100
        setThreadNamePrefix("a2a-event-")
        initialize()
    }
}

/**
 * Asynchronous listeners that write an audit entry per request and feed
 * response outcomes to the analytics service.
 *
 * NOTE(review): `objectMapper` is used below but is neither a constructor
 * parameter nor declared in this class — confirm where it comes from.
 */
@Component
class AsyncA2AHandlers(
    private val auditRepository: AuditRepository,
    private val analyticsService: AnalyticsService
) {

    /**
     * Saves an `A2A_REQUEST` audit entry containing the serialized request.
     *
     * @param event request event supplying the timestamp and the request
     */
    @Async
    @EventListener
    fun auditRequest(event: A2ARequestEvent) {
        // Marked @Async so the audit write is kept off the request path.
        val request = event.request
        val entry = AuditEntry(
            timestamp = event.timestamp,
            eventType = "A2A_REQUEST",
            method = request.method,
            details = objectMapper.writeValueAsString(request),
        )
        auditRepository.save(entry)
    }

    /**
     * Reports platform name, outcome and timestamp of a response to analytics.
     *
     * @param event response event; it counts as a success unless its response
     *   is a [JSONRPCErrorResponse]
     */
    @Async
    @EventListener
    fun analyzeResponse(event: A2AResponseEvent) {
        val succeeded = event.response !is JSONRPCErrorResponse
        analyticsService.recordResponse(
            platform = event.agentPlatform.name,
            success = succeeded,
            timestamp = event.timestamp,
        )
    }
}

Benefits:

  • Request processing not blocked by event handlers
  • Improved performance
  • Scalable analytics pipeline

When to Use: High-traffic services, complex analytics, audit requirements


Pattern 5: Conditional A2A Activation

Use Case: Enable A2A only in specific environments or with feature flags

/**
 * A2A configuration that is only active when the property
 * `features.a2a.enabled` has the value `true`.
 */
@Configuration
@ConditionalOnProperty(
    prefix = "features",
    name = ["a2a.enabled"],
    havingValue = "true"
)
class ConditionalA2AConfig {

    /**
     * Creates the agent-card handler on path `a2a` with a goal filter that
     * accepts every goal.
     *
     * @param agentPlatform platform passed straight through to the handler
     * @param requestHandler request handler passed through as `a2ARequestHandler`
     */
    @Bean
    fun agentCardHandler(
        agentPlatform: AgentPlatform,
        requestHandler: AutonomyA2ARequestHandler,
    ): AgentCardHandler = EmbabelServerGoalsAgentCardHandler(
        path = "a2a",
        agentPlatform = agentPlatform,
        a2ARequestHandler = requestHandler,
        goalFilter = { _ -> true },
    )
}

Feature Configuration:

# application-dev.yml
# Sets features.a2a.enabled to true, the value ConditionalA2AConfig requires.
features:
  a2a:
    enabled: true

# application-prod.yml
# Separate file: here the same property is false, so the condition is not met.
features:
  a2a:
    enabled: false  # Disable in prod initially

Dynamic Feature Flags:

/**
 * Creates the A2A agent-card handler only when the `a2a-protocol` feature flag
 * is enabled.
 *
 * The `...` in the parameter list and in the constructor call are elisions in
 * this snippet; fill them in as in the other configuration examples.
 */
@Configuration
class DynamicA2AConfig(
    private val featureFlagService: FeatureFlagService
) {

    /**
     * Returns the handler when the flag is on and `null` otherwise.
     *
     * The flag is read when this factory method runs.
     * NOTE(review): presumably that happens once at context startup, so a later
     * flag change would not add or remove the endpoint — confirm.
     * NOTE(review): confirm how injection points that expect an
     * `AgentCardHandler` behave when this method returns `null`.
     */
    @Bean
    @ConditionalOnBean(name = ["featureFlagService"])
    fun agentCardHandler(...): AgentCardHandler? {
        return if (featureFlagService.isEnabled("a2a-protocol")) {
            EmbabelServerGoalsAgentCardHandler(...)
        } else {
            // Flag is off: no handler instance is produced.
            null
        }
    }
}

Benefits:

  • Gradual rollout
  • Environment-specific configuration
  • Easy A/B testing
  • Quick disable in emergencies

When to Use: New deployments, beta features, gradual migrations


Pattern 6: Custom Request Handler

Use Case: Custom processing logic beyond standard Autonomy integration

/**
 * Request handler that routes A2A messages to [CustomService] and publishes a
 * platform event before and after each non-streaming call.
 *
 * NOTE(review): `streamingHandler` is injected but not used anywhere in this
 * class — confirm whether it is still needed.
 */
@Service
class CustomA2ARequestHandler(
    private val customService: CustomService,
    private val agenticEventListener: AgenticEventListener,
    private val streamingHandler: A2AStreamingHandler
) : A2ARequestHandler {

    /**
     * Handles a non-streaming JSON-RPC request.
     *
     * Only [SendMessageRequest] is routed; the first text part of its message
     * is passed to the service as the intent.
     *
     * @param request incoming request
     * @return the response produced by `customService.processMessage`
     * @throws UnsupportedOperationException for any other request type; in that
     *   case no response event is published
     */
    override fun handleJsonRpc(request: NonStreamingJSONRPCRequest<*>): JSONRPCResponse<*> {
        // Announce the request before any processing happens.
        agenticEventListener.onPlatformEvent(A2ARequestEvent(customService.platform, request))

        val response = if (request is SendMessageRequest) {
            customService.processMessage(extractIntent(request), request.params)
        } else {
            throw UnsupportedOperationException("Method ${request.method} not supported")
        }

        // Announce the outcome, then hand it back to the caller.
        agenticEventListener.onPlatformEvent(A2AResponseEvent(customService.platform, response))
        return response
    }

    /**
     * Delegates streaming requests unchanged to `customService.handleStreaming`.
     *
     * @param request incoming streaming request
     * @return the emitter returned by the service
     */
    override fun handleJsonRpcStream(request: StreamingJSONRPCRequest<*>): SseEmitter =
        customService.handleStreaming(request)

    /**
     * Returns the text of the first [TextPart] of the message, or an empty
     * string when there is no text part or its text is null.
     */
    private fun extractIntent(request: SendMessageRequest): String {
        val firstTextPart = request.params.message.parts
            .filterIsInstance<TextPart>()
            .firstOrNull()
        return firstTextPart?.text ?: ""
    }
}

Benefits:

  • Full control over request processing
  • Custom routing logic
  • Integration with non-Autonomy systems

When to Use: Custom architectures, legacy system integration, specialized workflows


Pattern 7: Task State Persistence

Use Case: Persist task state across server restarts

/**
 * Keeps task state in a repository: restores it at startup, stores task status
 * updates as they are published, and periodically syncs all active tasks.
 *
 * NOTE(review): `logger` is not declared in this class — it is expected to be
 * provided by the surrounding file.
 */
@Service
class PersistentTaskStateManager(
    private val taskRepository: TaskRepository,
    private val taskStateManager: TaskStateManager
) {

    /**
     * Re-registers every active task from the repository and replays its
     * recorded events into the task state manager.
     *
     * @param event application-ready event; used only as the trigger
     */
    @EventListener
    fun onApplicationReady(event: ApplicationReadyEvent) {
        // Count what is actually restored instead of querying the repository a
        // second time, so the log line always matches the work done here.
        var restoredCount = 0
        taskRepository.findActiveTasks().forEach { task ->
            taskStateManager.registerTask(
                task.taskId,
                task.contextId,
                task.streamId
            )
            // Named taskEvent so it does not shadow the listener's `event` parameter.
            task.events.forEach { taskEvent ->
                taskStateManager.recordEvent(task.taskId, taskEvent)
            }
            restoredCount++
        }
        logger.info("Restored {} active tasks", restoredCount)
    }

    /**
     * Saves the task status update carried by a streaming response; every other
     * response is ignored.
     *
     * @param event response event to inspect
     */
    @EventListener
    @Transactional
    fun persistTaskEvent(event: A2AResponseEvent) {
        // Bind to locals first: the casts then do not depend on the compiler
        // being able to smart-cast properties of the event and response types.
        val response = event.response as? SendStreamingMessageResponse ?: return
        val statusUpdate = response.result as? TaskStatusUpdateEvent ?: return
        taskRepository.saveEvent(
            taskId = statusUpdate.taskId,
            event = statusUpdate,
            timestamp = event.timestamp
        )
    }

    /** Passes all active tasks from the task state manager to the repository. */
    @Scheduled(fixedRate = 300000)  // Every 5 minutes
    fun persistActiveTasks() {
        taskRepository.syncActiveTasks(taskStateManager.getAllActiveTasks())
    }
}

Benefits:

  • Task continuity across restarts
  • Durability for long-running tasks
  • Audit trail persistence

When to Use: Long-running tasks, high-availability requirements, audit compliance


Pattern 8: Rate Limiting and Throttling

Use Case: Protect service from abuse and ensure fair usage

/**
 * Per-client rate limiting for `message/send` and `message/stream` requests.
 *
 * NOTE(review): one limiter is kept per client name and none is ever removed
 * in this class — confirm the set of client names is bounded.
 */
@Component
class A2ARateLimiter {
    /** One limiter per client name, created on first use. */
    private val rateLimiters = ConcurrentHashMap<String, RateLimiter>()

    /**
     * Takes one permit from the calling client's limiter.
     *
     * @param event request event; the listener condition restricts it to the
     *   methods `message/send` and `message/stream`
     * @throws RateLimitExceededException when no permit is available
     */
    @EventListener(condition = "#event.request.method == 'message/send' or #event.request.method == 'message/stream'")
    fun checkRateLimit(event: A2ARequestEvent) {
        val caller = extractClientId(event.request)
        // Each limiter is created with a rate of 10.0 (10 requests per second).
        val bucket = rateLimiters.computeIfAbsent(caller) { RateLimiter.create(10.0) }

        if (bucket.tryAcquire()) return
        throw RateLimitExceededException("Rate limit exceeded for client: $caller")
    }

    /**
     * Returns the name of the current authentication, or `anonymous` when
     * there is none. The `request` argument is currently not inspected.
     */
    private fun extractClientId(request: JSONRPCRequest<*>): String =
        SecurityContextHolder.getContext().authentication?.name ?: "anonymous"
}

/**
 * Turns [RateLimitExceededException] into an HTTP 429 response with a JSON-RPC
 * error body.
 */
@ControllerAdvice
class A2AExceptionHandler {

    /**
     * Builds the 429 response.
     *
     * @param e the exception; its message is passed as the third argument of
     *   the JSON-RPC error
     * @return status 429 with an error response whose first argument (the id)
     *   is `null`
     */
    @ExceptionHandler(RateLimitExceededException::class)
    fun handleRateLimit(e: RateLimitExceededException): ResponseEntity<JSONRPCErrorResponse> {
        val rpcError = JSONRPCError(429, "Too Many Requests", e.message)
        val payload = JSONRPCErrorResponse(null, rpcError)
        return ResponseEntity.status(429).body(payload)
    }
}

Benefits:

  • Service protection
  • Fair resource allocation
  • DoS prevention

When to Use: Public endpoints, multi-tenant systems, high-traffic services


Best Practices Summary

  1. Security: Always secure JSON-RPC endpoints; allow public access only to AgentCard
  2. Observability: Implement comprehensive monitoring and logging
  3. Error Handling: Use async event processing for non-critical operations (audit, analytics) so that a failure in those listeners does not fail the A2A request itself
  4. Resource Management: Schedule periodic task cleanup
  5. Scalability: Use rate limiting for public endpoints
  6. Testing: Test both streaming and non-streaming modes
  7. Documentation: Maintain clear AgentCard descriptions and skill examples

See Also

  • Common Tasks - Implementation recipes
  • API Reference - Complete API documentation
  • Troubleshooting - Problem resolution
tessl i tessl/maven-com-embabel-agent--embabel-agent-a2a@0.3.3

docs

index.md

tile.json