CtrlK
CommunityDocumentationLog inGet started
Tessl Logo

tessl/maven-com-embabel-agent--embabel-agent-a2a

A2A protocol integration for Embabel Agent Framework enabling agent-to-agent communication

Overview
Eval results
Files

request-handling.mddocs/api/

Request Handling API

JSON-RPC 2.0 request processing with support for both synchronous and streaming modes.

A2ARequestHandler

Core interface for handling JSON-RPC requests.

interface A2ARequestHandler {
    /**
     * Handle non-streaming JSON-RPC request.
     * @param request NonStreamingJSONRPCRequest (SendMessageRequest, GetTaskRequest, CancelTaskRequest)
     * @return JSONRPCResponse with result or error
     */
    fun handleJsonRpc(request: NonStreamingJSONRPCRequest<*>): JSONRPCResponse<*>

    /**
     * Handle streaming JSON-RPC request using SSE.
     * @param request StreamingJSONRPCRequest (SendStreamingMessageRequest)
     * @return SseEmitter for streaming events
     * @throws UnsupportedOperationException if streaming not supported
     */
    fun handleJsonRpcStream(request: StreamingJSONRPCRequest<*>): SseEmitter
}

AutonomyA2ARequestHandler

Implementation that integrates with Embabel Autonomy system.

@Service
class AutonomyA2ARequestHandler(
    private val autonomy: Autonomy,
    private val agenticEventListener: AgenticEventListener,
    private val streamingHandler: A2AStreamingHandler
) : A2ARequestHandler {

    /**
     * Handles non-streaming requests.
     * Supports: SendMessageRequest
     * Not yet implemented: GetTaskRequest, CancelTaskRequest
     */
    override fun handleJsonRpc(request: NonStreamingJSONRPCRequest<*>): JSONRPCResponse<*>

    /**
     * Handles streaming requests with SSE.
     * Supports: SendStreamingMessageRequest
     */
    override fun handleJsonRpcStream(request: StreamingJSONRPCRequest<*>): SseEmitter

    /**
     * Handles custom streaming requests not in standard SDK.
     * Supports: TaskResubscriptionRequest (task/resubscribe)
     */
    fun handleCustomStreamingRequest(
        method: String,
        requestMap: Map<String, Any>,
        objectMapper: ObjectMapper
    ): SseEmitter

    /**
     * Handles message/stream requests.
     * Executes agent and sends real-time status updates via SSE.
     */
    fun handleMessageStream(request: SendStreamingMessageRequest): SseEmitter

    /**
     * Handles task/resubscribe requests.
     * Replays historical events for specified task.
     * @throws IllegalArgumentException if task not found
     */
    fun handleTaskResubscribe(request: TaskResubscriptionRequest): SseEmitter
}

Supported Request Types

Non-Streaming

SendMessageRequest (message/send):

val request = SendMessageRequest(
    id = "msg-123",
    params = MessageSendParams(
        message = Message.Builder()
            .messageId("m1")
            .role(Message.Role.USER)
            .parts(listOf(TextPart("Find news for Scorpios")))
            .build()
    )
)

val response: SendMessageResponse = handler.handleJsonRpc(request)

GetTaskRequest (task/get) - Not yet implemented:

// Returns TODO() error in current version
val request = GetTaskRequest(id = "req-1", params = TaskIdParams(id = "task-123"))
val response = handler.handleJsonRpc(request)  // Throws UnsupportedOperationException

CancelTaskRequest (task/cancel) - Not yet implemented:

// Returns TODO() error in current version
val request = CancelTaskRequest(id = "req-1", params = TaskIdParams(id = "task-123"))
val response = handler.handleJsonRpc(request)  // Throws UnsupportedOperationException

Streaming

SendStreamingMessageRequest (message/stream):

val request = SendStreamingMessageRequest(
    id = "stream-456",
    params = MessageSendParams(
        message = Message.Builder()
            .messageId("m2")
            .role(Message.Role.USER)
            .parts(listOf(TextPart("Research Australian elections")))
            .build()
    )
)

val emitter: SseEmitter = handler.handleJsonRpcStream(request)

TaskResubscriptionRequest (task/resubscribe):

val request = TaskResubscriptionRequest(
    id = "resubscribe-789",
    params = TaskIdParams(id = "task-123")
)

val emitter: SseEmitter = handler.handleTaskResubscribe(request)
// Replays all historical events, then continues with new ones

Request Processing Flow

Non-Streaming (message/send)

1. Client sends POST /a2a with SendMessageRequest
2. A2ARequestEvent published
3. Extract intent from message parts
4. Execute autonomy.chooseAndRunAgent(intent, options)
5. Build Task with COMPLETED/FAILED status
6. Create SendMessageResponse
7. A2AResponseEvent published
8. Return response to client

Streaming (message/stream)

1. Client sends POST /a2a with SendStreamingMessageRequest
2. A2ARequestEvent published
3. Create SSE stream with streamingHandler
4. Start virtual thread for async execution:
   a. Send initial TaskStatusUpdateEvent (WORKING)
   b. Execute autonomy.chooseAndRunAgent(intent, options)
   c. Send progress TaskStatusUpdateEvents
   d. Send final TaskStatusUpdateEvent (COMPLETED/FAILED)
5. Close stream
6. A2AResponseEvent published
7. Return SseEmitter to client

Resubscription (task/resubscribe)

1. Client sends POST /a2a with TaskResubscriptionRequest
2. Verify task exists in TaskStateManager
3. Create new stream with resubscribeToTask()
4. Replay all historical events asynchronously
5. If task completed: close stream after replay
6. If task active: continue sending new events
7. Return SseEmitter to client

Extension Functions

/**
 * Extension for creating success responses from SendMessageRequest.
 */
fun SendMessageRequest.successResponseWith(result: EventKind): SendMessageResponse

Usage:

val request = SendMessageRequest(/* ... */)
val task = Task.Builder().id("task-1").status(/* ... */).build()
val response = request.successResponseWith(result = task)

Event Publishing

Requests and responses trigger events:

// Published before processing
A2ARequestEvent(
    agentPlatform = autonomy.agentPlatform,
    request = request
)

// Published after processing
A2AResponseEvent(
    agentPlatform = autonomy.agentPlatform,
    response = response
)

Error Handling

UnsupportedOperationException

override fun handleJsonRpc(request: NonStreamingJSONRPCRequest<*>): JSONRPCResponse<*> {
    return when (request) {
        is SendMessageRequest -> handleMessageSend(request, request.params)
        else -> throw UnsupportedOperationException("Method ${request.method} not supported")
    }
}

JSON-RPC Error Response

catch (e: Exception) {
    JSONRPCErrorResponse(
        taskId,
        TaskNotFoundError(null, "Internal error: ${e.message}", e.stackTraceToString())
    )
}

Streaming Errors

try {
    // streaming logic
} catch (e: Exception) {
    streamingHandler.sendStreamEvent(
        streamId,
        TaskStatusUpdateEvent.Builder()
            .taskId(taskId)
            .status(createFailedTaskStatus(params, e))
            .build(),
        taskId
    )
} finally {
    streamingHandler.closeStream(streamId)
}

Thread Safety

  • AutonomyA2ARequestHandler is a singleton Spring service
  • Request handlers are stateless
  • Task state managed by thread-safe TaskStateManager
  • Streaming uses separate virtual threads
  • SseEmitter instances are thread-safe

Complete Examples

Synchronous Request

@Service
class MessageService(private val handler: AutonomyA2ARequestHandler) {

    fun sendMessage(text: String): Task {
        val request = SendMessageRequest(
            id = UUID.randomUUID().toString(),
            params = MessageSendParams(
                message = Message.Builder()
                    .messageId(UUID.randomUUID().toString())
                    .role(Message.Role.USER)
                    .parts(listOf(TextPart(text)))
                    .build()
            )
        )

        val response = handler.handleJsonRpc(request) as SendMessageResponse
        return response.result as Task
    }
}

Streaming Request

@RestController
class StreamingController(private val handler: AutonomyA2ARequestHandler) {

    @PostMapping("/stream")
    fun streamMessage(@RequestBody text: String): SseEmitter {
        val request = SendStreamingMessageRequest(
            id = UUID.randomUUID().toString(),
            params = MessageSendParams(
                message = Message.Builder()
                    .messageId(UUID.randomUUID().toString())
                    .role(Message.Role.USER)
                    .parts(listOf(TextPart(text)))
                    .build()
            )
        )

        return handler.handleJsonRpcStream(request)
    }
}

Resubscription

@Service
class TaskResubscriptionService(private val handler: AutonomyA2ARequestHandler) {

    fun resubscribeToTask(taskId: String): SseEmitter {
        val request = TaskResubscriptionRequest(
            id = UUID.randomUUID().toString(),
            params = TaskIdParams(id = taskId)
        )

        return try {
            handler.handleTaskResubscribe(request)
        } catch (e: IllegalArgumentException) {
            throw TaskNotFoundException("Task not found: $taskId")
        }
    }
}

See Also

  • Streaming API - SSE stream management
  • Task State API - Task history tracking
  • Events API - Event publishing
  • Common Tasks - Usage recipes
tessl i tessl/maven-com-embabel-agent--embabel-agent-a2a@0.3.3

docs

api

agent-card.md

configuration.md

endpoint-registration.md

events.md

overview.md

request-handling.md

skill-factory.md

streaming.md

task-state.md

index.md

tile.json