Events

Agentend streams agent execution over the AG-UI protocol: a set of 13 typed events delivered via Server-Sent Events (SSE). The events are compatible with the CopilotKit AG-UI spec, so any AG-UI frontend can connect to an Agentend backend.

13 event types

Every event extends a base AgentEvent class with a type, timestamp, and optional run_id.

| Event Type | Class | Description |
| --- | --- | --- |
| run_started | RunStarted | Emitted when a run begins. Includes user_id, session_id, and input text. |
| text_message_start | TextMessageStart | Agent begins generating text output. |
| text_message_content | TextMessageContent | Streamed text content. Supports delta mode for token-by-token streaming. |
| text_message_end | TextMessageEnd | Agent finishes text generation. Includes stop_reason. |
| tool_call_start | ToolCallStart | Agent begins calling a tool. Includes tool_name and tool_use_id. |
| tool_call_args | ToolCallArgs | Tool arguments being streamed. Supports delta mode. |
| tool_call_end | ToolCallEnd | Tool call completes with result. Includes is_error flag. |
| state_snapshot | StateSnapshot | Full state snapshot including agent state and memory context. |
| state_delta | StateDelta | Incremental state change. Includes JSON path, value, and operation (set, delete, append). |
| thinking_step | ThinkingStep | Internal reasoning. Types: planning, reasoning, reflection. |
| interrupt | Interrupt | Agent pauses for human input. Actions: approve, select, input. |
| run_finished | RunFinished | Run completes successfully. Includes result, messages_sent, tools_used. |
| run_error | RunError | Run encounters an error. Includes error_type, message, and recoverable flag. |
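
The three state_delta operations (set, delete, append) can be illustrated with a small client-side helper. This is only a sketch: the function name apply_state_delta, its parameter names, and the dot-separated path syntax are assumptions made for illustration, and the actual path format used by Agentend may differ.

```python
from typing import Any, Dict


def apply_state_delta(
    state: Dict[str, Any], path: str, op: str, value: Any = None
) -> Dict[str, Any]:
    """Apply one incremental change to `state` in place and return it.

    ASSUMPTION: `path` is dot-separated (e.g. "cart.items"). The real
    path syntax in state_delta events is not specified here.
    """
    keys = path.split(".")
    parent = state
    # Walk to the parent container, creating intermediate dicts as needed.
    for key in keys[:-1]:
        parent = parent.setdefault(key, {})
    leaf = keys[-1]

    if op == "set":
        parent[leaf] = value
    elif op == "delete":
        parent.pop(leaf, None)  # deleting a missing key is a no-op
    elif op == "append":
        parent.setdefault(leaf, []).append(value)
    else:
        raise ValueError(f"unknown operation: {op}")
    return state
```

A client would typically replace its whole local state on state_snapshot and call a helper like this for each state_delta that follows.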

Event structure

Every event is a Python dataclass that serializes to JSON. The base event contains the type, a UTC timestamp, and an optional run ID:

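# Python
from dataclasses import dataclass
from typing import Any, Dict, Optional

# EventType is defined by Agentend (not shown here)
@dataclass
class AgentEvent:
    type: EventType  # one of the 13 event types
    timestamp: str  # ISO 8601 UTC
    run_id: Optional[str] = None

    def to_json(self) -> str: ...
    def to_dict(self) -> Dict[str, Any]: ...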
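
One plausible way to implement this serialization is with dataclasses.asdict and json.dumps. The sketch below is not Agentend's actual code: the class names SketchEvent and SketchTextContent and the helper utc_now_iso are hypothetical, and a plain string stands in for EventType.

```python
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def utc_now_iso() -> str:
    """Current UTC time as ISO 8601 with millisecond precision and a Z suffix."""
    now = datetime.now(timezone.utc)
    return now.isoformat(timespec="milliseconds").replace("+00:00", "Z")


@dataclass
class SketchEvent:
    """Hypothetical stand-in for the base event; `type` is a plain string here."""

    type: str
    timestamp: str = field(default_factory=utc_now_iso)
    run_id: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        # asdict() includes fields added by subclasses as well.
        return asdict(self)

    def to_json(self) -> str:
        return json.dumps(self.to_dict())


@dataclass
class SketchTextContent(SketchEvent):
    """Hypothetical subclass mirroring a text_message_content event."""

    content: str = ""
    delta: bool = True


event = SketchTextContent(
    type="text_message_content", content="Hello", run_id="run_abc123"
)
print(event.to_json())
```

Because the subclass fields carry defaults, they can follow the defaulted base fields without breaking dataclass field ordering.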

On the wire, each SSE message looks like:

data: {"type": "text_message_content", "content": "Hello", "delta": true, "timestamp": "2026-03-23T12:00:00.000Z", "run_id": "run_abc123"}

data: {"type": "tool_call_start", "tool_name": "get_vendor_info", "tool_use_id": "tu_456", "timestamp": "2026-03-23T12:00:01.000Z", "run_id": "run_abc123"}
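
The framing shown above (a data: line followed by a blank line) can be produced and parsed with a few lines of Python. This sketch assumes each event fits on a single data: line, as in the examples; the function names format_sse and parse_sse are illustrative and not part of Agentend.

```python
import json
from typing import Any, Dict, Iterable, Iterator


def format_sse(event: Dict[str, Any]) -> str:
    """Serialize one event as an SSE message: a data line plus a blank line."""
    return f"data: {json.dumps(event)}\n\n"


def parse_sse(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Yield decoded events from an iterable of SSE lines.

    ASSUMPTION: one JSON document per data line. Blank separator lines
    and any non-data lines are skipped.
    """
    for line in lines:
        if line.startswith("data:"):
            yield json.loads(line[len("data:"):].strip())


frame = format_sse({"type": "tool_call_start", "tool_name": "get_vendor_info"})
for event in parse_sse(frame.splitlines()):
    print(event["type"], event["tool_name"])
```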

SSE streaming

Events are delivered via Server-Sent Events on the GET /stream/{session_id} endpoint. The connection stays open for the duration of the agent run. The client receives events in real time: text tokens stream as they are generated, and tool calls appear as they are invoked.
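
Since text arrives token by token, a client usually needs to reassemble complete messages from the text_message_start, text_message_content, and text_message_end events. The sketch below shows one way to do that over already-decoded event dictionaries. It assumes that a content event with delta set to true carries a fragment to append, and that one without it carries the full text so far; that second behavior is a guess, and the function name collect_messages is hypothetical.

```python
from typing import Any, Dict, Iterable, List


def collect_messages(events: Iterable[Dict[str, Any]]) -> List[str]:
    """Reassemble complete text messages from a stream of decoded events."""
    messages: List[str] = []
    current: List[str] = []
    for event in events:
        kind = event.get("type")
        if kind == "text_message_start":
            current = []
        elif kind == "text_message_content":
            if event.get("delta", False):
                current.append(event["content"])  # fragment: append
            else:
                current = [event["content"]]  # ASSUMPTION: full text so far
        elif kind == "text_message_end":
            messages.append("".join(current))
            current = []
        # Other event types (tool calls, state, ...) are ignored here.
    return messages


stream = [
    {"type": "text_message_start"},
    {"type": "text_message_content", "content": "Hel", "delta": True},
    {"type": "text_message_content", "content": "lo", "delta": True},
    {"type": "text_message_end", "stop_reason": "end_turn"},
]
print(collect_messages(stream))
```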

Consuming events

Connect to the SSE stream from any language. Here is a TypeScript example using the standard EventSource API:

// TypeScript
const eventSource = new EventSource(
  `http://localhost:8000/stream/${sessionId}`
);

eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);

  switch (data.type) {
    case "text_message_content":
      // Append streaming text
      appendText(data.content);
      break;

    case "tool_call_start":
      // Show tool invocation
      showToolCall(data.tool_name);
      break;

    case "state_snapshot":
      // Update UI state
      updateState(data.state);
      break;

    case "interrupt":
      // Show approval dialog
      showApprovalDialog(data.reason, data.options);
      break;

    case "run_finished":
      eventSource.close();
      break;

    case "run_error":
      showError(data.message);
      eventSource.close();
      break;
  }
};

And a Python example for server-side consumption:

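# Python
import asyncio
import json

import httpx


async def consume(session_id: str) -> None:
    # timeout=None: the stream may sit idle longer than httpx's default timeout
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "GET", f"http://localhost:8000/stream/{session_id}"
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    event = json.loads(line[6:])
                    print(f"[{event['type']}] {event}")


asyncio.run(consume("your-session-id"))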
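
For anything beyond printing, the Python side can mirror the TypeScript switch with a table of handlers keyed by event type. The following is a minimal sketch over already-decoded events; dispatch_events, Handler, and TERMINAL_TYPES are hypothetical names, and treating run_finished and run_error as the end of the stream follows the TypeScript example rather than any documented guarantee.

```python
from typing import Any, Callable, Dict, Iterable

Handler = Callable[[Dict[str, Any]], None]

# Event types after which the TypeScript example closes the connection.
TERMINAL_TYPES = {"run_finished", "run_error"}


def dispatch_events(
    events: Iterable[Dict[str, Any]], handlers: Dict[str, Handler]
) -> int:
    """Route each event to its handler; stop after a terminal event.

    Returns the number of events that had a registered handler.
    """
    handled = 0
    for event in events:
        kind = event.get("type")
        handler = handlers.get(kind)
        if handler is not None:
            handler(event)
            handled += 1
        if kind in TERMINAL_TYPES:
            break
    return handled


tools_seen = []
dispatch_events(
    [
        {"type": "tool_call_start", "tool_name": "get_vendor_info"},
        {"type": "run_finished"},
    ],
    {"tool_call_start": lambda e: tools_seen.append(e["tool_name"])},
)
print(tools_seen)
```

Event types without a registered handler are skipped silently, which keeps a client working if it only cares about a subset of the 13 types.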