Jeffrey Hicks

Platform Eng @R360

Implementing AG-UI Protocol with GenServer in Phoenix

Perplexity research exploring how GenServer's message-passing architecture aligns with AG-UI's 16 event types for agent-user interaction

By Agent Hicks • Aug 24, 2025 • perplexity-export

GenServer is well suited to implementing AG-UI (the Agent-User Interaction Protocol). It processes one message at a time, which preserves event order, and it keeps the state of each run inside a single process. The protocol's 16 event types map naturally onto GenServer's handle_call, handle_cast, and handle_info callbacks.

AG-UI Protocol Overview

AG-UI standardizes agent-to-user communication through 16 structured event types, typically streamed over HTTP using Server-Sent Events (SSE):

  • Lifecycle Events: RunStarted, RunFinished, RunError, StepStarted, StepFinished
  • Text Message Events: TextMessageStart, TextMessageContent, TextMessageEnd
  • Tool Call Events: ToolCallStart, ToolCallArgs, ToolCallEnd, ToolCallResult
  • State Management Events: StateSnapshot, StateDelta, MessagesSnapshot
  • Special Events: Raw, Custom
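
The grouping above can be captured in a small lookup module. This is a minimal sketch, not part of any AG-UI library: the module name AgUiEventNames and its functions are hypothetical, and it assumes the wire-format type value is the upper snake case form of each event name (for example RUN_STARTED), as in the EventType enum of the AG-UI SDKs.

```elixir
defmodule AgUiEventNames do
  @moduledoc "Hypothetical helper that groups AG-UI event names by category."

  @categories %{
    lifecycle: ~w(RunStarted RunFinished RunError StepStarted StepFinished),
    text_message: ~w(TextMessageStart TextMessageContent TextMessageEnd),
    tool_call: ~w(ToolCallStart ToolCallArgs ToolCallEnd ToolCallResult),
    state: ~w(StateSnapshot StateDelta MessagesSnapshot),
    special: ~w(Raw Custom)
  }

  def categories, do: @categories

  # Converts a PascalCase event name to the assumed wire format,
  # e.g. "RunStarted" becomes "RUN_STARTED".
  def wire_type(name) when is_binary(name) do
    name |> Macro.underscore() |> String.upcase()
  end

  # Returns the category atom for an event name, or nil if it is unknown.
  def category(name) when is_binary(name) do
    Enum.find_value(@categories, fn {category, names} ->
      if name in names, do: category
    end)
  end
end
```

Keeping the names in one place makes it harder for handlers to drift apart on spelling or casing.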

GenServer as AG-UI Event Coordinator

Architecture Pattern:

defmodule AgentCoordinator do
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    # Subscribe to agent events; they arrive in handle_info/2
    Phoenix.PubSub.subscribe(MyApp.PubSub, "agent_events")

    initial_state = %{
      runs: %{},           # Active agent runs, keyed by run ID
      clients: %{},        # SSE connections
      event_queue: :queue.new()
    }

    {:ok, initial_state}
  end
end
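
The clients map in the state above needs a way to forget connections that have closed. One common approach is to monitor each client process and remove it when the :DOWN message arrives. The sketch below shows this in isolation; the ClientTracker module and its function names are assumptions made for illustration, not code from the original coordinator.

```elixir
defmodule ClientTracker do
  use GenServer

  # Client API (hypothetical names)
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  def register_client(server, pid, thread_id) do
    GenServer.call(server, {:register_client, pid, thread_id})
  end

  def clients(server), do: GenServer.call(server, :clients)

  @impl true
  def init(:ok), do: {:ok, %{clients: %{}}}

  @impl true
  def handle_call({:register_client, pid, thread_id}, _from, state) do
    # Key each entry by its monitor reference so :DOWN can find it directly
    ref = Process.monitor(pid)
    client = %{pid: pid, thread_id: thread_id}
    {:reply, :ok, put_in(state.clients[ref], client)}
  end

  def handle_call(:clients, _from, state) do
    {:reply, Map.values(state.clients), state}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    # The SSE process exited (client disconnected), so drop its entry
    {:noreply, update_in(state.clients, &Map.delete(&1, ref))}
  end
end
```

Monitoring keeps cleanup inside the coordinator, so the controller does not need an explicit unregister call when a connection drops.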

Event Processing Architecture

1. Agent Run Management

GenServer maintains state for concurrent agent runs, each identified by runId:

@impl true
def handle_cast({:start_run, run_id, thread_id}, state) do
  # Read the clock once so the event and the tracked run agree
  now = DateTime.utc_now()

  # Create the AG-UI RunStarted event ("RUN_STARTED" is its wire-format type)
  event = %{
    type: "RUN_STARTED",
    runId: run_id,
    threadId: thread_id,
    # AG-UI timestamps are numeric; Unix milliseconds are used here
    timestamp: DateTime.to_unix(now, :millisecond)
  }

  # Broadcast to subscribed clients
  broadcast_event(event, thread_id)

  # Track run state
  updated_runs = Map.put(state.runs, run_id, %{
    status: :running,
    thread_id: thread_id,
    started_at: now
  })

  {:noreply, %{state | runs: updated_runs}}
end

2. Streaming Text Messages

Handle token streaming through coordinated events:

# get_thread_id_for_message/2 is an assumed helper (not shown) that looks up
# the thread a message belongs to; it takes the state so it has data to search.
@impl true
def handle_info({:agent_text_start, message_id, role}, state) do
  event = %{
    type: "TEXT_MESSAGE_START",
    messageId: message_id,
    role: role
  }

  broadcast_event(event, get_thread_id_for_message(state, message_id))
  {:noreply, state}
end

def handle_info({:agent_text_delta, message_id, delta}, state) do
  event = %{
    type: "TEXT_MESSAGE_CONTENT",
    messageId: message_id,
    delta: delta
  }

  broadcast_event(event, get_thread_id_for_message(state, message_id))
  {:noreply, state}
end
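
A streamed message is complete only once a closing TextMessageEnd event follows the content events. The sketch below builds the full start, content, end sequence for one message from a list of token chunks. The TextMessageEvents module is hypothetical, and the upper snake case type strings are an assumption about the wire format.

```elixir
defmodule TextMessageEvents do
  # Expands one message into the ordered list of events a client expects:
  # one start event, one content event per non-empty chunk, one end event.
  def from_chunks(message_id, chunks, role \\ "assistant") do
    start = %{type: "TEXT_MESSAGE_START", messageId: message_id, role: role}

    content =
      for delta <- chunks, delta != "" do
        %{type: "TEXT_MESSAGE_CONTENT", messageId: message_id, delta: delta}
      end

    finish = %{type: "TEXT_MESSAGE_END", messageId: message_id}

    [start | content] ++ [finish]
  end
end
```

Empty chunks are skipped because they carry no text for the client to append.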

3. Tool Call Coordination

Manage tool execution lifecycle:

@impl true
def handle_cast({:tool_call_start, tool_call_id, name, parent_msg_id}, state) do
  event = %{
    type: "TOOL_CALL_START",
    toolCallId: tool_call_id,
    toolCallName: name,
    parentMessageId: parent_msg_id
  }

  # get_thread_id_for_tool_call/2 and update_tool_call_state/3 are assumed
  # helpers defined elsewhere in this module
  broadcast_event(event, get_thread_id_for_tool_call(state, tool_call_id))

  # Track tool call state
  {:noreply, update_tool_call_state(state, tool_call_id, :started)}
end
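
The handler above relies on update_tool_call_state, which the snippet does not define. One possible shape for it, sketched here as a pure function, records each tool call's phase and ignores transitions that arrive out of order. The phase names and the tool_calls key are assumptions made for illustration.

```elixir
defmodule ToolCallTracker do
  # Allowed phase transitions; nil means the tool call is not tracked yet.
  @allowed %{
    nil => [:started],
    :started => [:args, :ended],
    :args => [:args, :ended],
    :ended => [:result]
  }

  def valid_transition?(current, next) do
    next in Map.get(@allowed, current, [])
  end

  # Returns the updated state, or the unchanged state when the transition
  # is not allowed (for example an args event after the call has ended).
  def update_tool_call_state(state, tool_call_id, next) do
    current = get_in(state, [:tool_calls, tool_call_id])

    if valid_transition?(current, next) do
      put_in(state, [Access.key(:tool_calls, %{}), tool_call_id], next)
    else
      state
    end
  end
end
```

Because the function returns a plain state map, it can be dropped straight into a {:noreply, ...} tuple as in the handler above.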

State Management Integration

State Delta Processing: Handle incremental state updates using JSON Patch operations:

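@impl true
def handle_cast({:state_update, thread_id, patches}, state) do
  event = %{
    type: "STATE_DELTA",
    delta: patches  # JSON Patch (RFC 6902) operations
  }

  # Publish on the thread's own topic. broadcast_event/2 interpolates its
  # second argument into the topic name, so a placeholder atom reaches no client.
  broadcast_event(event, thread_id)
  {:noreply, state}
end

@impl true
def handle_call({:get_state_snapshot, thread_id}, _from, state) do
  # build_current_snapshot/1 is assumed to be defined elsewhere in the module
  snapshot = build_current_snapshot(state)

  event = %{
    type: "STATE_SNAPSHOT",
    snapshot: snapshot
  }

  broadcast_event(event, thread_id)
  {:reply, snapshot, state}
end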
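
To make the delta format concrete, the sketch below applies a list of JSON Patch operations to a snapshot, which is what a client does when it receives a StateDelta event. It is deliberately minimal: the MiniJsonPatch module is hypothetical, it supports only add, replace, and remove on nested maps with string keys, and it does not handle arrays or the other RFC 6902 operations.

```elixir
defmodule MiniJsonPatch do
  # Applies operations in order; an unsupported operation raises
  # FunctionClauseError rather than being silently skipped.
  def apply_patch(doc, ops) when is_map(doc) and is_list(ops) do
    Enum.reduce(ops, doc, &apply_op/2)
  end

  # The parent of the target path must already exist, as RFC 6902 requires.
  # This sketch does not check that a "replace" target is present.
  defp apply_op(%{"op" => op, "path" => path, "value" => value}, doc)
       when op in ["add", "replace"] do
    put_in(doc, keys(path), value)
  end

  defp apply_op(%{"op" => "remove", "path" => path}, doc) do
    {_removed, updated} = pop_in(doc, keys(path))
    updated
  end

  # JSON Pointer (RFC 6901): split on "/", then unescape ~1 to "/" and ~0 to "~"
  defp keys("/" <> rest) do
    rest
    |> String.split("/")
    |> Enum.map(fn token ->
      token |> String.replace("~1", "/") |> String.replace("~0", "~")
    end)
  end
end
```

A production system would more likely use an existing JSON Patch library; the point here is only to show what the delta field carries.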

Phoenix Integration Patterns

SSE Controller Implementation:

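defmodule MyAppWeb.AgentController do
  use MyAppWeb, :controller

  def sse_stream(conn, %{"thread_id" => thread_id}) do
    conn
    |> put_resp_header("cache-control", "no-cache")
    |> put_resp_header("access-control-allow-origin", "*")
    |> put_resp_content_type("text/event-stream")
    |> send_chunked(200)
    |> register_sse_client(thread_id)
    |> stream_events()
  end

  defp register_sse_client(conn, thread_id) do
    # Subscribe this request process to the topic broadcast_event/2 publishes on
    :ok = Phoenix.PubSub.subscribe(MyApp.PubSub, "ag_ui:#{thread_id}")

    # Register with AgentCoordinator (register_client/2 is assumed to exist)
    AgentCoordinator.register_client(self(), thread_id)
    conn
  end

  # Block in the request process and write each event as one SSE frame
  defp stream_events(conn) do
    receive do
      {:ag_ui_event, event} ->
        case chunk(conn, "data: #{Jason.encode!(event)}\n\n") do
          {:ok, conn} -> stream_events(conn)
          {:error, _reason} -> conn  # client disconnected
        end
    end
  end
end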
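
Each event written to the chunked response has to follow the SSE wire format: one or more data lines followed by a blank line. The sketch below isolates that framing step. The SseFrame module is hypothetical, and it takes an already encoded JSON string so that the example has no dependency on a JSON library.

```elixir
defmodule SseFrame do
  # Builds one SSE frame from a payload string.
  # Options: :id (optional) sets the frame's event ID for client reconnection.
  def encode(payload, opts \\ []) when is_binary(payload) do
    id_line =
      case Keyword.get(opts, :id) do
        nil -> ""
        id -> "id: #{id}\n"
      end

    # A payload containing newlines must be split across several data lines
    data_lines =
      payload
      |> String.split("\n")
      |> Enum.map_join(&"data: #{&1}\n")

    # The trailing blank line terminates the frame
    id_line <> data_lines <> "\n"
  end
end
```

Compact JSON contains no raw newlines, so the multi-line branch mainly matters for pretty-printed or non-JSON payloads.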

Event Broadcasting:

defp broadcast_event(event, thread_id) do
  Phoenix.PubSub.broadcast(
    MyApp.PubSub,
    "ag_ui:#{thread_id}",
    {:ag_ui_event, event}
  )
end

Concurrent Agent Management

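Multiple Agent Coordination: AG-UI defines no dedicated handoff event, but a GenServer can orchestrate multiple AI agents by sending an application-defined AGENT_HANDOFF payload inside the protocol's Custom event type: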

@impl true
def handle_cast({:agent_handoff, from_agent, to_agent, context}, state) do
  # Transition control between agents using AG-UI's Custom event type
  handoff_event = %{
    type: "CUSTOM",
    name: "AGENT_HANDOFF",
    value: %{
      from: from_agent,
      to: to_agent,
      context: context
    }
  }

  # context is expected to be a map with a :thread_id key
  broadcast_event(handoff_event, context.thread_id)

  # Update agent tracking (update_active_agent/3 is an assumed helper)
  {:noreply, update_active_agent(state, to_agent, context.thread_id)}
end

Error Handling and Recovery

Graceful Error Management:

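@impl true
def handle_info({:agent_error, run_id, error}, state) do
  # Remove the run and read its thread from the tracked state in one step
  case Map.pop(state.runs, run_id) do
    {nil, _runs} ->
      # Unknown or already finished run: nothing to report
      {:noreply, state}

    {%{thread_id: thread_id}, remaining_runs} ->
      error_event = %{
        type: "RUN_ERROR",
        message: error.message,
        code: error.code
      }

      broadcast_event(error_event, thread_id)
      {:noreply, %{state | runs: remaining_runs}}
  end
end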
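
The handler above assumes the error is a map with message and code keys. Real failures often arrive as exceptions or bare terms, so it can help to normalize them before building the event. The RunErrorEvent module below is a hypothetical sketch of that step; the upper snake case type string is an assumption about the wire format.

```elixir
defmodule RunErrorEvent do
  # Exceptions: use the standard Exception.message/1 formatting
  def build(%{__exception__: true} = exception) do
    %{type: "RUN_ERROR", message: Exception.message(exception)}
  end

  # Plain maps with a message and an optional code
  def build(%{message: message} = error) when is_binary(message) do
    event = %{type: "RUN_ERROR", message: message}

    case Map.get(error, :code) do
      nil -> event
      code -> Map.put(event, :code, to_string(code))
    end
  end

  # Anything else: fall back to an inspected representation
  def build(other) do
    %{type: "RUN_ERROR", message: inspect(other)}
  end
end
```

With this in place the handler never raises a KeyError while it is trying to report a different error.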

Performance Considerations

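Event Queue Management: A GenServer's mailbox already queues incoming messages, so an explicit queue does not add throughput by itself. It is still useful in high-throughput scenarios: casts return quickly, events are drained one at a time and interleaved with other messages, and the queue is a natural place to add batching or backpressure later: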

@impl true
def handle_cast({:queue_event, thread_id, event}, state) do
  # Store the target thread with the event; AG-UI events do not all carry one
  new_queue = :queue.in({thread_id, event}, state.event_queue)

  # Start processing only if the queue was empty before this event
  case :queue.len(new_queue) do
    1 -> send(self(), :process_queue)
    _ -> :ok
  end

  {:noreply, %{state | event_queue: new_queue}}
end

@impl true
def handle_info(:process_queue, state) do
  case :queue.out(state.event_queue) do
    {{:value, {thread_id, event}}, remaining_queue} ->
      broadcast_event(event, thread_id)

      # Continue processing if the queue is not empty
      unless :queue.is_empty(remaining_queue) do
        send(self(), :process_queue)
      end

      {:noreply, %{state | event_queue: remaining_queue}}

    {:empty, _} ->
      {:noreply, state}
  end
end
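
The queue above grows without limit if events arrive faster than they are broadcast. One way to cap it is a bounded queue that drops the oldest entry when full. The sketch below shows that policy in isolation; the BoundedEventQueue module and the drop-oldest choice are assumptions for illustration, and dropping is only safe for events a client can recover from, such as a StateDelta that a later StateSnapshot supersedes.

```elixir
defmodule BoundedEventQueue do
  # size is tracked explicitly because :queue.len/1 is O(n)
  defstruct queue: :queue.new(), size: 0, max: 1000, dropped: 0

  def new(max) when is_integer(max) and max > 0, do: %__MODULE__{max: max}

  # Room available: append to the tail
  def push(%__MODULE__{size: size, max: max} = q, item) when size < max do
    %{q | queue: :queue.in(item, q.queue), size: size + 1}
  end

  # Full: discard the oldest item, then append, so size stays at max
  def push(%__MODULE__{} = q, item) do
    {_oldest, rest} = :queue.out(q.queue)
    %{q | queue: :queue.in(item, rest), dropped: q.dropped + 1}
  end

  def pop(%__MODULE__{size: 0} = q), do: {:empty, q}

  def pop(%__MODULE__{} = q) do
    {{:value, item}, rest} = :queue.out(q.queue)
    {{:ok, item}, %{q | queue: rest, size: q.size - 1}}
  end
end
```

The dropped counter gives the coordinator something to log or export as a metric when clients fall behind.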

Production Deployment Patterns

Supervision Strategy:

defmodule MyApp.AgentSupervisor do
  use Supervisor

  def start_link(init_arg) do
    Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_init_arg) do
    children = [
      # Single coordinator process that tracks runs and broadcasts events
      {AgentCoordinator, []},
      # Starts one child per agent run on demand
      {DynamicSupervisor, name: AgentRunnerSupervisor, strategy: :one_for_one}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end
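
The DynamicSupervisor in the tree above exists so that each agent run can be started on demand as its own supervised process. A minimal sketch of that call follows; the AgentRunner module and its start_run function are hypothetical, and the function argument stands in for whatever loop actually drives the agent.

```elixir
defmodule AgentRunner do
  # Starts one supervised Task per run. Tasks use the :temporary restart
  # strategy by default, so a crashed run is not restarted automatically.
  def start_run(supervisor, run_id, fun) when is_function(fun, 1) do
    DynamicSupervisor.start_child(supervisor, {Task, fn -> fun.(run_id) end})
  end
end

# Usage with an unnamed supervisor; in the tree above the first argument
# would be AgentRunnerSupervisor instead.
{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)
{:ok, _pid} = AgentRunner.start_run(sup, "run-1", fn run_id -> {:done, run_id} end)
```

Because each run lives in its own process, a crash in one run leaves the coordinator and the other runs untouched.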

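Clustering Considerations: For distributed deployments, Phoenix.PubSub relays broadcasts to every connected node, so AG-UI events reach SSE clients regardless of which node holds their connection. This works only once the nodes are clustered over distributed Erlang, for example with a library such as libcluster.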

Integration Benefits

Why GenServer + AG-UI Works:

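  • Serialized message processing: a GenServer handles one message at a time, so concurrent agent operations cannot race on its state
  • State isolation: each process owns its state, which prevents cross-contamination between agent runs that live in separate processes
  • Supervision trees restart failed processes automatically, providing recovery from agent failures
  • PubSub integration distributes each event to every subscribed client without the coordinator tracking sockets itself
  • A process registry allows direct addressing of a specific agent run by its ID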
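
The last point can be illustrated with Elixir's built-in Registry, which lets a process be addressed by run ID instead of by pid. This is a minimal sketch: the RunProcess module, the registry name, and the status call are hypothetical.

```elixir
defmodule RunProcess do
  use GenServer

  # Registers the process under run_id in the given Registry
  def start_link({registry, run_id}) do
    GenServer.start_link(__MODULE__, run_id, name: via(registry, run_id))
  end

  # Callers address the run by ID; the Registry resolves the pid
  def status(registry, run_id) do
    GenServer.call(via(registry, run_id), :status)
  end

  defp via(registry, run_id), do: {:via, Registry, {registry, run_id}}

  @impl true
  def init(run_id), do: {:ok, %{run_id: run_id, status: :running}}

  @impl true
  def handle_call(:status, _from, state), do: {:reply, state.status, state}
end

# Usage: the registry must be started before any run registers with it
{:ok, _registry} = Registry.start_link(keys: :unique, name: RunRegistryDemo)
{:ok, _run} = RunProcess.start_link({RunRegistryDemo, "run-42"})
:running = RunProcess.status(RunRegistryDemo, "run-42")
```

Entries are removed automatically when the registered process exits, so the registry never points at a dead run.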

GenServer's message-driven architecture complements AG-UI's event-streaming model and gives real-time agent-user interactions in Phoenix applications a solid foundation. The pattern offers ordered, low-latency event delivery for responsive UIs, and supervision provides the fault tolerance that production agent systems require.
