Events
Agentend streams agent execution via the AG-UI protocol — a set of 13 typed events delivered over Server-Sent Events (SSE). These events are compatible with the CopilotKit AG-UI spec, making it straightforward to connect any AG-UI frontend to an Agentend backend.
13 Event Types
Every event extends a base AgentEvent class with a type, timestamp, and optional run_id.
| Event Type | Class | Description |
|---|---|---|
| run_started | RunStarted | Emitted when a run begins. Includes user_id, session_id, and input text. |
| text_message_start | TextMessageStart | Agent begins generating text output. |
| text_message_content | TextMessageContent | Streamed text content. Supports delta mode for token-by-token streaming. |
| text_message_end | TextMessageEnd | Agent finishes text generation. Includes stop_reason. |
| tool_call_start | ToolCallStart | Agent begins calling a tool. Includes tool_name and tool_use_id. |
| tool_call_args | ToolCallArgs | Tool arguments being streamed. Supports delta mode. |
| tool_call_end | ToolCallEnd | Tool call completes with result. Includes is_error flag. |
| state_snapshot | StateSnapshot | Full state snapshot including agent state and memory context. |
| state_delta | StateDelta | Incremental state change. Includes JSON path, value, and operation (set, delete, append). |
| thinking_step | ThinkingStep | Internal reasoning. Types: planning, reasoning, reflection. |
| interrupt | Interrupt | Agent pauses for human input. Actions: approve, select, input. |
| run_finished | RunFinished | Run completes successfully. Includes result, messages_sent, tools_used. |
| run_error | RunError | Run encounters an error. Includes error_type, message, and recoverable flag. |
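To make the `state_delta` row concrete, here is a minimal sketch of how a client might apply the three operations to a local copy of the state. The function name `apply_state_delta`, the argument names, and the dot-separated path syntax are assumptions for illustration; the table only says that a delta carries a JSON path, a value, and an operation.

```python
from typing import Any, Dict


def apply_state_delta(
    state: Dict[str, Any], path: str, value: Any, operation: str
) -> Dict[str, Any]:
    """Apply one state_delta to `state` in place and return it.

    Assumes `path` is dot-separated (e.g. "cart.items"); the real
    path syntax used by the backend may differ.
    """
    keys = path.split(".")
    parent = state
    # Walk to the parent container, creating intermediate dicts as needed.
    for key in keys[:-1]:
        parent = parent.setdefault(key, {})
    leaf = keys[-1]
    if operation == "set":
        parent[leaf] = value
    elif operation == "delete":
        parent.pop(leaf, None)  # deleting a missing key is a no-op
    elif operation == "append":
        parent.setdefault(leaf, []).append(value)
    else:
        raise ValueError(f"unknown operation: {operation}")
    return state


# Hypothetical state and deltas, for illustration only.
state = {"cart": {"items": ["apple"]}}
apply_state_delta(state, "cart.items", "pear", "append")
apply_state_delta(state, "cart.total", 2, "set")
apply_state_delta(state, "cart.coupon", None, "delete")
print(state)  # {'cart': {'items': ['apple', 'pear'], 'total': 2}}
```

A client would typically seed its local state from a `state_snapshot` and then apply each `state_delta` in arrival order.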
Event structure
Every event is a Python dataclass that serializes to JSON. The base event contains the type, a UTC timestamp, and an optional run ID:
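# Python
from dataclasses import dataclass
from typing import Any, Dict, Optional

@dataclass
class AgentEvent:
    type: EventType  # one of the 13 event types listed above
    timestamp: str  # ISO 8601 UTC
    run_id: Optional[str] = None

    def to_json(self) -> str: ...
    def to_dict(self) -> Dict[str, Any]: ...

On the wire, each SSE message looks like: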
data: {"type": "text_message_content", "content": "Hello", "delta": true, "timestamp": "2026-03-23T12:00:00.000Z", "run_id": "run_abc123"}

data: {"type": "tool_call_start", "tool_name": "get_vendor_info", "tool_use_id": "tu_456", "timestamp": "2026-03-23T12:00:01.000Z", "run_id": "run_abc123"}

SSE streaming
Events are delivered via Server-Sent Events on the GET /stream/{session_id} endpoint. The connection stays open for the duration of the agent run, and the client receives events in real time: text tokens stream as they are generated, and tool calls appear as they are invoked.
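The framing itself is simple enough to sketch in a few lines. The example below shows one way an event could be written as an SSE message and read back. `TextMessageContent` here is a stand-in dataclass whose fields mirror the wire example above, not the library's actual class, and `to_sse` and `parse_sse` are hypothetical helper names.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Dict, Iterable, Iterator, List, Optional


@dataclass
class TextMessageContent:
    # Stand-in for illustration; field names follow the wire example.
    content: str
    delta: bool
    timestamp: str
    run_id: Optional[str] = None
    type: str = "text_message_content"


def to_sse(event: Any) -> str:
    """Frame one event as an SSE message: a data line plus a blank line."""
    return f"data: {json.dumps(asdict(event))}\n\n"


def parse_sse(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Yield one decoded event per SSE message.

    A blank line ends a message; consecutive data lines are joined with
    newlines. Comment lines (starting with ":") are skipped.
    """
    buffer: List[str] = []
    for line in lines:
        if line.startswith("data:"):
            data = line[5:]
            # Drop the single optional space after the colon.
            buffer.append(data[1:] if data.startswith(" ") else data)
        elif line == "" and buffer:
            yield json.loads("\n".join(buffer))
            buffer = []
    if buffer:  # tolerate a stream that ends without a trailing blank line
        yield json.loads("\n".join(buffer))


wire = to_sse(
    TextMessageContent("Hello", True, "2026-03-23T12:00:00.000Z", "run_abc123")
)
events = list(parse_sse(wire.split("\n")))
print(events[0]["type"], events[0]["content"])  # text_message_content Hello
```

Browsers do this parsing for you through `EventSource`; a hand-written parser like this is only needed when reading the raw response body.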
Consuming events
Connect to the SSE stream from any language. Here is a TypeScript example using the standard EventSource API:
// TypeScript
const eventSource = new EventSource(
  `http://localhost:8000/stream/${sessionId}`
);

eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  switch (data.type) {
    case "text_message_content":
      // Append streaming text
      appendText(data.content);
      break;
    case "tool_call_start":
      // Show tool invocation
      showToolCall(data.tool_name);
      break;
    case "state_snapshot":
      // Update UI state
      updateState(data.state);
      break;
    case "interrupt":
      // Show approval dialog
      showApprovalDialog(data.reason, data.options);
      break;
    case "run_finished":
      eventSource.close();
      break;
    case "run_error":
      showError(data.message);
      eventSource.close();
      break;
  }
};

And a Python example for server-side consumption:
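# Python
import asyncio
import json

import httpx

async def consume(session_id: str) -> None:
    # timeout=None disables httpx's default 5-second timeout,
    # which would otherwise abort the stream during quiet periods.
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "GET", f"http://localhost:8000/stream/{session_id}"
        ) as response:
            async for line in response.aiter_lines():
                # Only "data: " lines carry events; skip blank separators.
                if line.startswith("data: "):
                    event = json.loads(line[6:])
                    print(f"[{event['type']}] {event}")

asyncio.run(consume("my-session"))  # placeholder; use a real session ID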
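Because text and tool arguments can arrive in delta mode, a consumer usually needs to reassemble them. The following is a rough sketch of such a reducer over already-decoded events. Several details are assumptions rather than documented behavior: that `tool_call_args` events carry `tool_use_id` and an `args` string fragment, and that a `text_message_content` event without `delta` set replaces the text instead of extending it. `collect_run` is a hypothetical helper name.

```python
from typing import Any, Dict, Iterable, List


def collect_run(events: Iterable[Dict[str, Any]]) -> Dict[str, Any]:
    """Fold decoded events into the final text and a list of tool calls."""
    text_parts: List[str] = []
    tool_calls: Dict[str, Dict[str, Any]] = {}
    for event in events:
        kind = event["type"]
        if kind == "text_message_content":
            if event.get("delta"):
                text_parts.append(event["content"])
            else:
                # Assumption: a non-delta event carries the full text so far.
                text_parts = [event["content"]]
        elif kind == "tool_call_start":
            tool_calls[event["tool_use_id"]] = {
                "tool_name": event["tool_name"],
                "args": "",
            }
        elif kind == "tool_call_args":
            # Assumption: `args` holds a fragment of the JSON arguments.
            tool_calls[event["tool_use_id"]]["args"] += event["args"]
        elif kind == "run_error":
            raise RuntimeError(event["message"])
        elif kind == "run_finished":
            break
    return {"text": "".join(text_parts), "tool_calls": list(tool_calls.values())}


# Hand-written sample events, not captured from a real run.
sample = [
    {"type": "text_message_start"},
    {"type": "text_message_content", "content": "Hel", "delta": True},
    {"type": "text_message_content", "content": "lo", "delta": True},
    {"type": "tool_call_start", "tool_name": "get_vendor_info", "tool_use_id": "tu_456"},
    {"type": "tool_call_args", "tool_use_id": "tu_456", "args": '{"vendor_id": '},
    {"type": "tool_call_args", "tool_use_id": "tu_456", "args": '"v1"}'},
    {"type": "run_finished"},
]
summary = collect_run(sample)
print(summary["text"])        # Hello
print(summary["tool_calls"])  # [{'tool_name': 'get_vendor_info', 'args': '{"vendor_id": "v1"}'}]
```

The same function can be fed directly from the `json.loads` results in the streaming loop above by turning that loop into a generator of events.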