Perplexity research exploring how GenServer's message-passing architecture aligns perfectly with AG-UI's 16 event types for agent-user interaction protocols
GenServer is exceptionally well-suited for implementing the AG-UI (Agent-User Interaction Protocol) due to its message-passing architecture and stateful event handling capabilities. The protocol’s 16 event types map naturally to GenServer’s message handling patterns.
AG-UI standardizes agent-to-user communication through 16 structured event types that stream over HTTP/SSE:
Lifecycle Events: RunStarted
, RunFinished
, RunError
, StepStarted
, StepFinished
Text Message Events: TextMessageStart
, TextMessageContent
, TextMessageEnd
Tool Call Events: ToolCallStart
, ToolCallArgs
, ToolCallEnd
, ToolCallResult
State Management Events: StateSnapshot
, StateDelta
, MessagesSnapshot
Special Events: Raw
, Custom
Architecture Pattern:
defmodule AgentCoordinator do
use GenServer
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(opts) do
# Subscribe to agent events
Phoenix.PubSub.subscribe(MyApp.PubSub, "agent_events")
initial_state = %{
runs: %{}, # Active agent runs
clients: %{}, # SSE connections
event_queue: :queue.new()
}
{:ok, initial_state}
end
end
GenServer maintains state for concurrent agent runs, each identified by runId
:
def handle_cast({:start_run, run_id, thread_id}, state) do
# Create AG-UI RunStarted event
event = %{
type: "RunStarted",
runId: run_id,
threadId: thread_id,
timestamp: DateTime.utc_now()
}
# Broadcast to subscribed clients
broadcast_event(event, thread_id)
# Track run state
updated_runs = Map.put(state.runs, run_id, %{
status: :running,
thread_id: thread_id,
started_at: DateTime.utc_now()
})
{:noreply, %{state | runs: updated_runs}}
end
Handle token streaming through coordinated events:
def handle_info({:agent_text_start, message_id, role}, state) do
event = %{
type: "TextMessageStart",
messageId: message_id,
role: role
}
broadcast_event(event, get_thread_id_for_message(message_id))
{:noreply, state}
end
def handle_info({:agent_text_delta, message_id, delta}, state) do
event = %{
type: "TextMessageContent",
messageId: message_id,
delta: delta
}
broadcast_event(event, get_thread_id_for_message(message_id))
{:noreply, state}
end
Manage tool execution lifecycle:
def handle_cast({:tool_call_start, tool_call_id, name, parent_msg_id}, state) do
event = %{
type: "ToolCallStart",
toolCallId: tool_call_id,
toolCallName: name,
parentMessageId: parent_msg_id
}
broadcast_event(event, get_thread_id_for_tool_call(tool_call_id))
# Track tool call state
{:noreply, update_tool_call_state(state, tool_call_id, :started)}
end
State Delta Processing: Handle incremental state updates using JSON Patch operations:
def handle_cast({:state_update, patches}, state) do
event = %{
type: "StateDelta",
delta: patches # JSON Patch RFC 6902 operations
}
broadcast_event(event, :all_threads)
{:noreply, state}
end
def handle_call({:get_state_snapshot}, _from, state) do
snapshot = build_current_snapshot(state)
event = %{
type: "StateSnapshot",
snapshot: snapshot
}
broadcast_event(event, :requesting_thread)
{:reply, snapshot, state}
end
SSE Controller Implementation:
defmodule MyAppWeb.AgentController do
use MyAppWeb, :controller
def sse_stream(conn, %{"thread_id" => thread_id}) do
conn
|> put_resp_header("cache-control", "no-cache")
|> put_resp_header("access-control-allow-origin", "*")
|> put_resp_content_type("text/event-stream")
|> send_chunked(200)
|> register_sse_client(thread_id)
|> stream_events()
end
defp register_sse_client(conn, thread_id) do
# Register with AgentCoordinator
AgentCoordinator.register_client(self(), thread_id)
conn
end
end
Event Broadcasting:
defp broadcast_event(event, thread_id) do
Phoenix.PubSub.broadcast(
MyApp.PubSub,
"ag_ui:#{thread_id}",
{:ag_ui_event, event}
)
end
Multiple Agent Coordination:
GenServer can orchestrate multiple AI agents through AGENT_HANDOFF
events:
def handle_cast({:agent_handoff, from_agent, to_agent, context}, state) do
# Transition control between agents
handoff_event = %{
type: "Custom",
name: "AGENT_HANDOFF",
value: %{
from: from_agent,
to: to_agent,
context: context
}
}
broadcast_event(handoff_event, context.thread_id)
# Update agent tracking
{:noreply, update_active_agent(state, to_agent, context.thread_id)}
end
Graceful Error Management:
def handle_info({:agent_error, run_id, error}, state) do
error_event = %{
type: "RunError",
message: error.message,
code: error.code
}
broadcast_event(error_event, get_thread_for_run(run_id))
# Clean up run state
updated_runs = Map.delete(state.runs, run_id)
{:noreply, %{state | runs: updated_runs}}
end
Event Queue Management: For high-throughput scenarios, implement event queuing to prevent GenServer bottlenecks:
def handle_cast({:queue_event, event}, state) do
new_queue = :queue.in(event, state.event_queue)
# Process queue if not busy
case :queue.len(new_queue) do
1 -> send(self(), :process_queue)
_ -> :ok
end
{:noreply, %{state | event_queue: new_queue}}
end
def handle_info(:process_queue, state) do
case :queue.out(state.event_queue) do
{{:value, event}, remaining_queue} ->
broadcast_event(event, event.thread_id)
# Continue processing if queue not empty
unless :queue.is_empty(remaining_queue) do
send(self(), :process_queue)
end
{:noreply, %{state | event_queue: remaining_queue}}
{:empty, _} ->
{:noreply, state}
end
end
Supervision Strategy:
defmodule MyApp.AgentSupervisor do
use Supervisor
def start_link(init_arg) do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end
def init(_init_arg) do
children = [
{AgentCoordinator, []},
{DynamicSupervisor, name: AgentRunnerSupervisor}
]
Supervisor.init(children, strategy: :one_for_one)
end
end
Clustering Considerations: For distributed deployments, use Phoenix PubSub’s distributed capabilities to ensure AG-UI events reach clients across nodes.
Why GenServer + AG-UI Works:
GenServer’s message-driven architecture perfectly complements AG-UI’s event-streaming model, creating a robust foundation for real-time agent-user interactions in Phoenix applications. The pattern provides both the performance characteristics needed for responsive UIs and the fault tolerance required for production agent systems.