tessl/maven-com-embabel-agent--embabel-agent-a2a

A2A protocol integration for Embabel Agent Framework enabling agent-to-agent communication

Streaming API

Server-Sent Events (SSE) stream management for real-time agent-to-agent communication.

A2AStreamingHandler

Manages SSE streams, task state tracking, and resubscription.

@Service
class A2AStreamingHandler(
    private val objectMapper: ObjectMapper,
    private val taskStateManager: TaskStateManager
) {
    /**
     * Creates new SSE stream for real-time communication.
     * @param streamId Unique stream identifier
     * @param taskId Optional task ID to associate with stream
     * @param contextId Optional context ID
     * @return SseEmitter with infinite timeout
     */
    fun createStream(streamId: String, taskId: String? = null, contextId: String? = null): SseEmitter

    /**
     * Resubscribes to existing task by creating new stream and replaying events.
     * @param taskId Task ID to resubscribe to
     * @param newStreamId New stream ID for subscription
     * @return SseEmitter with replayed events
     * @throws IllegalArgumentException if task not found
     */
    fun resubscribeToTask(taskId: String, newStreamId: String): SseEmitter

    /**
     * Sends streaming event to specified stream and records it.
     * @param streamId Target stream ID
     * @param event Streaming event (TaskStatusUpdateEvent, Task, etc.)
     * @param taskId Optional task ID for event recording
     */
    fun sendStreamEvent(streamId: String, event: StreamingEventKind, taskId: String? = null)

    /**
     * Closes stream and cleans up resources.
     * @param streamId Stream ID to close
     */
    fun closeStream(streamId: String)

    /**
     * Shuts down streaming handler and internal scheduler.
     * Call during application shutdown.
     */
    fun shutdown()
}

Stream Lifecycle

1. Create Stream

val streamId = UUID.randomUUID().toString()
val taskId = UUID.randomUUID().toString()
val contextId = UUID.randomUUID().toString()

val emitter = streamingHandler.createStream(streamId, taskId, contextId)
// Stream is active, can receive events

What Happens:

  • Creates SseEmitter with Long.MAX_VALUE timeout
  • Adds to active streams map
  • Registers task in TaskStateManager
  • Sets up completion/timeout handlers
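The bookkeeping listed above can be sketched without Spring. This is a minimal illustration, not the handler's actual implementation: `StubEmitter` and `StreamRegistry` are hypothetical stand-ins for `SseEmitter` and the handler's internal state, and the task map stands in for `TaskStateManager`.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for Spring's SseEmitter so the sketch runs on its own.
class StubEmitter(val timeoutMillis: Long) {
    private var completionCallback: (() -> Unit)? = null
    var completed = false
        private set

    fun onCompletion(callback: () -> Unit) { completionCallback = callback }

    fun complete() {
        if (completed) return
        completed = true
        completionCallback?.invoke()
    }
}

// Hypothetical registry mirroring the four steps: create, track, register task, hook cleanup.
class StreamRegistry {
    private val activeStreams = ConcurrentHashMap<String, StubEmitter>()
    private val taskToStream = ConcurrentHashMap<String, String>()

    fun createStream(streamId: String, taskId: String? = null): StubEmitter {
        val emitter = StubEmitter(Long.MAX_VALUE)               // effectively no timeout
        activeStreams[streamId] = emitter                       // add to active streams
        if (taskId != null) taskToStream[taskId] = streamId     // register the task
        emitter.onCompletion { activeStreams.remove(streamId) } // cleanup on completion
        return emitter
    }

    // Removing first and completing second keeps the close idempotent.
    fun closeStream(streamId: String) {
        activeStreams.remove(streamId)?.complete()
    }

    fun isActive(streamId: String): Boolean = activeStreams.containsKey(streamId)
    fun streamFor(taskId: String): String? = taskToStream[taskId]
}
```

Registering the cleanup callback at creation time means the stream leaves the map whether it is closed explicitly or completed by the emitter itself.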

2. Send Events

val event = TaskStatusUpdateEvent.Builder()
    .taskId(taskId)
    .contextId(contextId)
    .status(TaskStatus(
        TaskState.WORKING,
        Message.Builder()
            .messageId(UUID.randomUUID().toString())
            .role(Message.Role.AGENT)
            .parts(listOf(TextPart("Processing...")))
            .build(),
        OffsetDateTime.now()
    ))
    .build()

streamingHandler.sendStreamEvent(streamId, event, taskId)

What Happens:

  • Records event in TaskStateManager
  • Wraps in SendStreamingMessageResponse
  • Sends via SSE
  • Handles errors with completeWithError()
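The record, wrap, send, and error steps above might look roughly like the following. This is a sketch under assumptions: `EventSink` and `SendSketch` are hypothetical names, events are passed as pre-serialized JSON strings, and the envelope is built by hand where the real handler would presumably use its `ObjectMapper`.

```kotlin
// Hypothetical stand-in for the emitter side of an SSE stream.
interface EventSink {
    fun send(payload: String)
    fun completeWithError(cause: Throwable)
}

class SendSketch(private val sink: EventSink) {
    // Stand-in for the event history kept by TaskStateManager.
    val recorded = mutableListOf<String>()

    // Returns true if the event was delivered, false if the stream was failed.
    fun sendStreamEvent(streamId: String, eventJson: String): Boolean {
        recorded += eventJson // 1. record first, so the event is available for replay
        // 2. wrap in a JSON-RPC style envelope (hand-built here for illustration only)
        val payload = """{"jsonrpc":"2.0","id":"$streamId","result":$eventJson,"error":null}"""
        return try {
            sink.send(payload) // 3. send via the stream
            true
        } catch (e: Exception) {
            sink.completeWithError(e) // 4. fail the stream rather than leave it half-open
            false
        }
    }
}
```

Recording before sending means an event that fails to reach a disconnected client is still part of the task history.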

3. Close Stream

streamingHandler.closeStream(streamId)

What Happens:

  • Removes from active streams
  • Completes emitter
  • Triggers cleanup handlers

Event Format

Events are wrapped in SendStreamingMessageResponse:

SendStreamingMessageResponse(
    jsonrpc = "2.0",
    id = streamId,
    result = event,  // StreamingEventKind
    error = null
)

SSE Wire Format:

event: message
data: {"jsonrpc":"2.0","id":"stream-123","result":{...},"error":null}

event: message
data: {"jsonrpc":"2.0","id":"stream-123","result":{...},"error":null}
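To make the framing concrete, here is a small sketch that produces frames in the layout shown above. `toSseFrame` is a hypothetical helper, not part of the library. In practice `SseEmitter` does the framing itself.

```kotlin
// Formats one SSE frame: an "event:" line, one "data:" line per payload line,
// and a blank line that terminates the frame.
fun toSseFrame(eventName: String, payload: String): String {
    // A raw newline inside the payload would end the data field early,
    // so each payload line gets its own "data:" prefix.
    val dataLines = payload.split("\n").joinToString("\n") { "data: $it" }
    return "event: $eventName\n$dataLines\n\n"
}
```

A client reassembles consecutive `data:` lines of one frame by joining them with newlines, which is why single-line JSON keeps each event on one `data:` line.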

Event Types

TaskStatusUpdateEvent

TaskStatusUpdateEvent.Builder()
    .taskId(taskId)
    .contextId(contextId)
    .status(TaskStatus(TaskState.WORKING, message, timestamp))
    .isFinal(false)
    .build()

Task (Initial)

Task.Builder()
    .id(taskId)
    .contextId(contextId)
    .status(TaskStatus(...))
    .history(listOf(...))
    .artifacts(listOf(...))
    .build()

TaskArtifactUpdateEvent

TaskArtifactUpdateEvent.Builder()
    .taskId(taskId)
    .contextId(contextId)
    .artifact(Artifact(...))
    .build()

Resubscription

Replay historical events for a task:

val newStreamId = UUID.randomUUID().toString()

try {
    val emitter = streamingHandler.resubscribeToTask(taskId, newStreamId)
    // Emitter replays all events, then receives new ones
} catch (e: IllegalArgumentException) {
    // Task not found
}

Process:

  1. Verify task exists
  2. Create new stream with same task/context IDs
  3. Update stream ID in TaskStateManager
  4. Replay historical events asynchronously
  5. If completed: close after replay
  6. If active: continue with new events
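The six steps can be sketched with in-memory stand-ins. Everything here is hypothetical (`TaskRecord`, `ReplaySketch`, events as plain strings), and replay runs synchronously for clarity, whereas the process above replays asynchronously.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical per-task state: which stream is attached and what has been sent so far.
class TaskRecord(val contextId: String, var streamId: String) {
    val events: MutableList<String> = CopyOnWriteArrayList()
    var completed: Boolean = false
}

class ReplaySketch {
    private val tasks = ConcurrentHashMap<String, TaskRecord>()
    // streamId -> events delivered on that stream (stand-in for an open emitter)
    private val streams = ConcurrentHashMap<String, MutableList<String>>()

    fun register(taskId: String, contextId: String, streamId: String) {
        tasks[taskId] = TaskRecord(contextId, streamId)
        streams[streamId] = CopyOnWriteArrayList()
    }

    fun record(taskId: String, event: String) {
        val task = tasks[taskId] ?: return
        task.events.add(event)
        streams[task.streamId]?.add(event) // deliver to whichever stream is current
    }

    fun complete(taskId: String) { tasks[taskId]?.completed = true }

    fun resubscribe(taskId: String, newStreamId: String): List<String> {
        val task = tasks[taskId]
            ?: throw IllegalArgumentException("Task not found: $taskId") // 1. verify
        val delivered = CopyOnWriteArrayList<String>()
        streams[newStreamId] = delivered                                  // 2. new stream
        task.streamId = newStreamId                                       // 3. repoint task
        task.events.forEach { delivered.add(it) }                         // 4. replay history
        if (task.completed) streams.remove(newStreamId)                   // 5. close if done
        return delivered                                                  // 6. else stays open
    }

    fun isOpen(streamId: String): Boolean = streams.containsKey(streamId)
}
```

Because the task is repointed to the new stream before replay, events recorded after resubscription land on the new stream rather than the old one.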

Complete Example

@Service
class AgentStreamingService(
    private val streamingHandler: A2AStreamingHandler,
    private val autonomy: Autonomy
) {

    fun executeWithStreaming(intent: String): SseEmitter {
        val streamId = UUID.randomUUID().toString()
        val taskId = UUID.randomUUID().toString()
        val contextId = UUID.randomUUID().toString()

        // Create stream
        val emitter = streamingHandler.createStream(streamId, taskId, contextId)

        // Execute asynchronously on a virtual thread (requires Java 21+)
        Thread.startVirtualThread {
            try {
                // Send initial status
                sendWorkingStatus(streamId, taskId, contextId, "Starting task...")

                // Execute agent
                val result = autonomy.chooseAndRunAgent(intent, ProcessOptions())

                // Send progress
                sendWorkingStatus(streamId, taskId, contextId, "Processing results...")

                // Send completion
                sendCompletedStatus(streamId, taskId, contextId, "Task completed")

            } catch (e: Exception) {
                sendFailedStatus(streamId, taskId, contextId, "Error: ${e.message}")
            } finally {
                streamingHandler.closeStream(streamId)
            }
        }

        return emitter
    }

    private fun sendWorkingStatus(streamId: String, taskId: String, contextId: String, text: String) {
        streamingHandler.sendStreamEvent(
            streamId,
            TaskStatusUpdateEvent.Builder()
                .taskId(taskId)
                .contextId(contextId)
                .status(TaskStatus(
                    TaskState.WORKING,
                    createMessage(text),
                    OffsetDateTime.now()
                ))
                .build(),
            taskId
        )
    }

    private fun sendCompletedStatus(streamId: String, taskId: String, contextId: String, text: String) {
        streamingHandler.sendStreamEvent(
            streamId,
            TaskStatusUpdateEvent.Builder()
                .taskId(taskId)
                .contextId(contextId)
                .status(TaskStatus(
                    TaskState.COMPLETED,
                    createMessage(text),
                    OffsetDateTime.now()
                ))
                .isFinal(true)
                .build(),
            taskId
        )
    }

    private fun sendFailedStatus(streamId: String, taskId: String, contextId: String, text: String) {
        streamingHandler.sendStreamEvent(
            streamId,
            TaskStatusUpdateEvent.Builder()
                .taskId(taskId)
                .contextId(contextId)
                .status(TaskStatus(
                    TaskState.FAILED,
                    createMessage(text),
                    OffsetDateTime.now()
                ))
                .isFinal(true)  // FAILED is a terminal state, like COMPLETED
                .build(),
            taskId
        )
    }

    private fun createMessage(text: String): Message {
        return Message.Builder()
            .messageId(UUID.randomUUID().toString())
            .role(Message.Role.AGENT)
            .parts(listOf(TextPart(text)))
            .build()
    }
}

Error Handling

Stream Send Errors

try {
    emitter.send(eventData)
} catch (e: Exception) {
    logger.error("Error sending stream event", e)
    emitter.completeWithError(e)
}

Task Not Found

if (!taskStateManager.taskExists(taskId)) {
    throw IllegalArgumentException("Task not found: $taskId")
}

Replay Errors

Thread.startVirtualThread {
    try {
        taskInfo.events.forEach { event ->
            sendStreamEvent(newStreamId, event, taskId)
        }
    } catch (e: Exception) {
        logger.error("Error replaying events for task {}", taskId, e)
        emitter.completeWithError(e)
    }
}

Thread Safety

A2AStreamingHandler is fully thread-safe:

  • Uses ConcurrentHashMap for active streams
  • SseEmitter is thread-safe for concurrent sends
  • Event recording delegates to thread-safe TaskStateManager
  • Runs asynchronous work, such as event replay, on virtual threads
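The `ConcurrentHashMap` point can be checked in isolation. The sketch below is illustrative only: `registerConcurrently` is a hypothetical function, and platform threads are used instead of virtual threads so it runs on older JDKs. It registers streams from several threads at once and reports how many entries survive.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlin.concurrent.thread

// Registers threads * perThread distinct stream IDs concurrently and returns the map size.
fun registerConcurrently(threads: Int, perThread: Int): Int {
    val activeStreams = ConcurrentHashMap<String, String>()
    val workers = (0 until threads).map { t ->
        thread {
            repeat(perThread) { i ->
                activeStreams["stream-$t-$i"] = "emitter" // placeholder value
            }
        }
    }
    workers.forEach { it.join() } // wait for every writer before reading the size
    return activeStreams.size
}
```

With a plain `HashMap` the same test could lose entries or corrupt the table. With `ConcurrentHashMap` every insert is retained, so `registerConcurrently(8, 1000)` returns 8000.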

Resource Management

Cleanup Handlers

emitter.onCompletion {
    activeStreams.remove(streamId)
}

emitter.onTimeout {
    activeStreams.remove(streamId)
}

Shutdown

@PreDestroy
fun cleanup() {
    streamingHandler.shutdown()
}

Testing

@SpringBootTest
class StreamingHandlerTest {
    @Autowired
    lateinit var streamingHandler: A2AStreamingHandler

    @Test
    fun `should create and close stream`() {
        val streamId = "test-stream"
        val emitter = streamingHandler.createStream(streamId)
        assertNotNull(emitter)
        streamingHandler.closeStream(streamId)
    }

    @Test
    fun `should send events`() {
        val streamId = "test-stream"
        val taskId = "test-task"
        val emitter = streamingHandler.createStream(streamId, taskId, "ctx")

        val event = TaskStatusUpdateEvent.Builder()
            .taskId(taskId)
            .status(TaskStatus(TaskState.WORKING, /* ... */))
            .build()

        streamingHandler.sendStreamEvent(streamId, event, taskId)
        streamingHandler.closeStream(streamId)
    }
}

See Also

  • Task State API - Task history tracking
  • Request Handling API - JSON-RPC processing
  • Common Tasks - Usage recipes