Andy Peatling

Building a Real-time Agentic Server with REST+SSE

12 min read

A traditional REST API works well for many applications, but it has serious limitations when it comes to AI agents. Responses from large language models take time to generate, often several seconds, and if you request a response and wait for it synchronously, your users are left staring at loading indicators.

WebSockets are an option, but they’re more complex to implement, and the persistent bidirectional connection they maintain is typically overkill for agents.

What we really need is a way to stream partial results from the server to the client in real-time, while keeping the overall architecture clean and simple. This is where Server-Sent Events come in!

The REST+SSE Hybrid Pattern

The approach I’ve found most effective combines REST endpoints for commands and queries with SSE for streaming responses. Here’s how it works.

Your server exposes standard REST endpoints that clients use to send queries and report tool results. Instead of waiting for complete responses, these endpoints return immediately with a conversation ID. Clients then open an SSE connection to receive streamed updates for that specific conversation.

This approach has a few advantages. It allows for real-time streaming of partial responses, making the agent feel responsive. It also separates the concerns of command handling and response streaming.

Core Components of the Architecture

At the heart of this architecture are several key components working together.

The Agent Server acts as the central coordinator, managing conversation state and orchestrating communication between the client, the language model, and tools. It processes incoming queries, manages agent instances, and coordinates the streaming of responses.

Here’s how the interface for an agent server might look:

export type AgentServer = {
  processQuery: (
    conversationId: string,
    query: string,
    context: Record<string, unknown>,
    signal?: AbortSignal
  ) => Promise<string>;
  registerStream: (conversationId: string, connection: SSEConnection) => void;
  removeStream: (conversationId: string) => void;
  getActiveConversationsCount: () => number;
  getContextUsage: (conversationId: string) => Promise<TokenUsage>;
  cancelRequest: (conversationId: string) => void;
};
TypeScript
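
The interface above references a few supporting types that aren’t shown. Here’s a rough sketch of how they might be defined, based on how they’re used throughout the rest of this post (the exact fields are assumptions, not a fixed contract):

// Sketch of the supporting types referenced above. Field names are inferred
// from how they're used later in this post and may differ in your implementation.
export type SSEConnection = {
  id: string;
  send: (event: string, data: unknown) => void;
  close: () => void;
};

export type Message = {
  id: string;
  role: 'user' | 'assistant' | 'tool';
  type: 'text' | 'tool_request' | 'tool_response';
  content?: string;
  tool?: string;
  parameters?: Record<string, unknown>;
  error?: string;
  createdAt: string;
};

export type ToolRequest = {
  id: string;
  tool: string;
  parameters: Record<string, unknown>;
};

export type TokenUsage = {
  inputTokens: number;
  outputTokens: number;
};
TypeScript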

A Stream Manager handles all the SSE connections, maintaining a registry of active streams indexed by conversation ID. It provides methods to send various event types to clients, including messages, incremental updates, and tool requests.

Here’s a simplified implementation of a stream manager I’ve used in recent projects:

export const createStreamManager = (): StreamManager => {
  // Internal state
  const state: StreamState = {
    connections: new Map(),
    messageContents: new Map(),
  };

  // Helper function to get connection
  const getConnection = (connectionId: string): SSEConnection | undefined =>
    state.connections.get(connectionId);

  const registerConnection = (connection: SSEConnection): void => {
    state.connections.set(connection.id, connection);
  };

  const removeConnection = (id: string): void => {
    state.connections.delete(id);
  };

  const sendMessage = (connectionId: string, message: Message): boolean => {
    const connection = getConnection(connectionId);
    if (!connection) return false;

    // Track content for text messages
    if (message.type === 'text' && message.content) {
      state.messageContents.set(message.id, message.content);
    }

    connection.send('message', message);
    return true;
  };

  const updateMessage = (
    connectionId: string,
    messageId: string,
    updates: Partial<Message>
  ): boolean => {
    const connection = getConnection(connectionId);
    if (!connection) return false;

    // Handle content updates
    if (updates.content) {
      const existingContent = state.messageContents.get(messageId) || '';

      // Only send update if content has changed
      if (updates.content !== existingContent) {
        // Calculate the delta (new content that was added)
        let delta = '';
        if (updates.content.startsWith(existingContent)) {
          delta = updates.content.slice(existingContent.length);
        } else {
          // If content doesn't build on previous content, send full content
          delta = updates.content;
        }

        // Update stored content
        state.messageContents.set(messageId, updates.content);

        // Send delta in the update
        connection.send('update_message', {
          id: messageId,
          updates: {
            content: updates.content, // Full updated content
            delta: delta, // Just the new part
            deltaType: 'append', // Indicate this is an append operation
          },
        });
      }
    } else {
      connection.send('update_message', {
        id: messageId,
        updates,
      });
    }

    return true;
  };

  const sendToolRequest = (connectionId: string, toolRequest: ToolRequest): boolean => {
    const connection = getConnection(connectionId);
    if (!connection) return false;

    connection.send('tool_request', toolRequest);
    return true;
  };

  const sendError = (connectionId: string, error: string): boolean => {
    const connection = getConnection(connectionId);
    if (!connection) return false;

    connection.send('error', { error });
    return true;
  };

  const sendComplete = (connectionId: string): boolean => {
    const connection = getConnection(connectionId);
    if (!connection) return false;

    // Clean up message tracking
    state.messageContents.clear();

    connection.send('complete', {});
    connection.close();
    removeConnection(connectionId);
    return true;
  };

  const getConnectionCount = (): number => state.connections.size;

  return {
    registerConnection,
    removeConnection,
    sendMessage,
    updateMessage,
    sendToolRequest,
    sendError,
    sendComplete,
    getConnectionCount,
  };
};
TypeScript

The Conversation Service tracks active conversations and their associated agents, implementing lifecycle management for conversation resources. It handles cleanup of inactive conversations and maintains the mapping between conversation IDs and agent instances:

export const createConversationService = (): ConversationService => {
  // Map of conversation ID to conversation data
  const conversations: Map<string, Conversation> = new Map();

  const addConversation = (id: string, agent: Agent, provider: Provider): void => {
    const now = new Date();
    conversations.set(id, {
      id,
      agent,
      provider,
      createdAt: now,
      lastActivity: now,
      isActive: true,
    });
  };

  const getConversation = (id: string): Conversation | undefined => {
    return conversations.get(id);
  };

  const updateActivity = (id: string): boolean => {
    const conversation = conversations.get(id);
    if (!conversation) return false;

    conversation.lastActivity = new Date();
    return true;
  };

  const cleanupInactiveConversations = (maxAgeMs: number): void => {
    const now = new Date();
    for (const [id, conversation] of conversations.entries()) {
      const age = now.getTime() - conversation.lastActivity.getTime();
      if (age > maxAgeMs || !conversation.isActive) {
        conversations.delete(id);
      }
    }
  };

  // Other methods...

  return {
    addConversation,
    getConversation,
    updateActivity,
    cleanupInactiveConversations,
  };
};
TypeScript
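
In practice you’d run the cleanup on a timer. A minimal sketch (the interval and maximum age here are arbitrary, not values from the implementation above):

// Periodically remove conversations that have been idle for too long.
// Both values are arbitrary; tune them for your workload.
const THIRTY_MINUTES_MS = 30 * 60 * 1000;

setInterval(() => {
  conversationService.cleanupInactiveConversations(THIRTY_MINUTES_MS);
}, 5 * 60 * 1000); // check every 5 minutes
TypeScript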

To tie everything together, your API layer exposes REST endpoints that clients interact with, translating HTTP requests into agent operations and managing SSE connections. This layer is typically framework-specific (e.g., Express for Node.js applications).

Here’s a simplified snippet of an Express-based adapter I’ve used to handle REST endpoints. It should look very familiar if you’ve built API endpoints before:

  // Enhanced logging for query endpoint
  app.post<{}, ApiResponse<{ conversationId: string }>, QueryRequest>(
    '/api/query',
    async (req, res) => {
      console.log(
        `[ExpressAdapter] Received POST /api/query for conversation: ${
          req.body.conversationId || 'new'
        }`
      );
      try {
        // Use the query handler to process the request
        const result = await queryHandler.handleQuery(req.body, agentServer);

        // Return success response
        console.log(
          `[ExpressAdapter] Successfully processed POST /api/query for conversation: ${result.conversationId}`
        );
        res.json({
          success: true,
          data: result,
        });
      } catch (error) {
        const errorMessage = error instanceof Error ? error.message : String(error);
        console.error(`[ExpressAdapter] Error processing POST /api/query: ${errorMessage}`, error);
        res.status(500).json({
          success: false,
          error: errorMessage,
        });
      }
    }
  );
TypeScript

The Lifecycle

It’s easier to picture how this all fits together if we look at a typical lifecycle flow.

A user sends a query through your application’s interface. The client application makes a POST request to your /api/query endpoint, including the query text and potentially a conversation ID if continuing an existing conversation.

The server immediately creates or retrieves the appropriate agent instance for this conversation and returns a response with the conversation ID. The client uses this ID to establish an SSE connection by connecting to /api/stream?conversationId=123.
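
From the client’s perspective, the flow looks roughly like this. This is a minimal browser sketch using fetch and EventSource, assuming the /api/query response shape shown above:

// Minimal client-side sketch of the flow described above.
const startQuery = async (query: string, conversationId?: string) => {
  // 1. POST the query; the server responds immediately with a conversation ID
  const res = await fetch('/api/query', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ query, conversationId }),
  });
  const { data } = await res.json();

  // 2. Open an SSE connection for streamed updates on that conversation
  const source = new EventSource(`/api/stream?conversationId=${data.conversationId}`);

  source.addEventListener('message', (e: MessageEvent) => {
    const message = JSON.parse(e.data);
    // Render the new message in the UI...
  });

  source.addEventListener('update_message', (e: MessageEvent) => {
    const { id, updates } = JSON.parse(e.data);
    // Append updates.delta to the message with this id...
  });

  source.addEventListener('complete', () => source.close());
};
TypeScript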

Behind the scenes, the server starts processing the query asynchronously. Here’s how that looks in our implementation:

const processQuery = async (
  conversationId: string,
  query: string,
  context: Record<string, unknown>,
  signal?: AbortSignal
): Promise<string> => {
  log('info', `Processing query for conversation: ${conversationId}`);
  try {
    // Create a new abort controller if not provided
    const abortController = new AbortController();
    // Link the provided signal if any
    if (signal) {
      signal.addEventListener('abort', () => {
        abortController.abort();
      });
    }

    // Store the abort controller
    abortControllers.set(conversationId, abortController);

    // Get or create the agent
    const agent = await getOrCreateAgent(conversationId);

    // Add user message to the conversation history
    streamManager.sendMessage(conversationId, {
      id: uuidv4(),
      role: 'user',
      type: 'text',
      content: query,
      createdAt: new Date().toISOString(),
    });

    // Start streaming process in background
    void streamAgentResponse(conversationId, agent, query, abortController.signal);

    log('info', `Successfully initiated streaming for conversation: ${conversationId}`);
    return conversationId;
  } catch (error) {
    const errorMessage = error instanceof Error ? error.message : String(error);
    log('error', `Error in processQuery: ${errorMessage}`, error);
    streamManager.sendError(conversationId, errorMessage);
    throw error;
  }
};
TypeScript

As the agent generates its response, the server streams messages to the client through the SSE connection. These might include partial text updates as they become available, making the experience feel responsive.

const streamAgentResponse = async (
  conversationId: string,
  agent: AgentWithToolResult,
  query: string,
  signal: AbortSignal
): Promise<void> => {
  try {
    // Get agent's message generator
    const messageIterator = agent.processQuery(query, signal);

    // Create initial placeholder message for the assistant
    const assistantMessageId = uuidv4();
    streamManager.sendMessage(conversationId, {
      id: assistantMessageId,
      role: 'assistant',
      type: 'text',
      content: '',
      createdAt: new Date().toISOString(),
    });

    let currentAssistantMessageId = assistantMessageId;
    let hasExecutedTool = false;

    // Stream each yielded message from the agent
    for await (const message of messageIterator) {
      // Update conversation activity
      conversationService.updateActivity(conversationId);

      if (message.type === 'text' && message.role === 'assistant') {
        // If we've already executed a tool, create a new assistant message
        if (hasExecutedTool) {
          currentAssistantMessageId = uuidv4();
          streamManager.sendMessage(conversationId, {
            id: currentAssistantMessageId,
            role: 'assistant',
            type: 'text',
            content: message.content || '',
            createdAt: new Date().toISOString(),
          });
          hasExecutedTool = false; // Reset for potential future tool calls
        } else {
          // Update existing message with content
          streamManager.updateMessage(conversationId, currentAssistantMessageId, {
            content: message.content || '',
          });
        }
      } else if (message.type === 'tool_request' && message.tool) {
        // Handle tool request here, explained below.
      }
    }

    streamManager.sendComplete(conversationId);
  } catch (error) {
    const errorMsg = error instanceof Error ? error.message : String(error);
    streamManager.sendError(conversationId, errorMsg);
  }
};
TypeScript

When the agent needs to use a tool (like searching the web or checking the weather), it sends a tool request event through the SSE connection. You could either use this to execute tools on the client side, or simply to display the details of the tool request to the user. Here’s an example where we’re using it to display a “human readable” version of the tool call:

// Inside the streamAgentResponse loop
if (message.type === 'tool_request' && message.tool) {
  const requestId = uuidv4();

  // Get human-readable description of the tool call
  let humanReadableDescription = '';
  try {
    humanReadableDescription = await getHumanReadableToolCall(
      message.tool,
      message.parameters || {}
    );
  } catch (descError) {
    // Fallback description if generation fails
    humanReadableDescription = `Using ${message.tool}`;
  }

  // Track pending tool requests
  pendingToolRequests.set(requestId, {
    conversationId,
    tool: message.tool,
  });

  // Send tool request message to client
  streamManager.sendMessage(conversationId, {
    id: requestId,
    role: 'assistant',
    type: 'tool_request',
    tool: message.tool,
    parameters: message.parameters || {},
    content: humanReadableDescription,
    createdAt: new Date().toISOString(),
  });

  // Your code for implementing tool execution would go here
  // this could be via an MCP client, or handoff to the client side.
}
TypeScript

When the tool has been executed, the results are passed to a tool result handler, which records them and sends them on to the agent for processing and continuation:

const handleToolResult = async (
  conversationId: string,
  requestId: string,
  result?: unknown,
  error?: string
): Promise<boolean> => {
  try {
    // Get the associated conversation
    const conversation = conversationService.getConversation(conversationId);
    if (!conversation) {
      return false;
    }

    // Update activity timestamp
    conversationService.updateActivity(conversationId);

    // Remove from pending requests
    pendingToolRequests.delete(requestId);

    // Get the agent with tool result handling capability
    const agent = conversation.agent as AgentWithToolResult;
    if (!agent.handleToolResult) {
      return false;
    }

    // Send tool response message to the client
    streamManager.sendMessage(conversationId, {
      id: uuidv4(),
      role: 'tool',
      type: 'tool_response',
      content: typeof result === 'string' ? result : JSON.stringify(result),
      error: error,
      createdAt: new Date().toISOString(),
    });

    // Forward the result to the agent for processing
    return await agent.handleToolResult(requestId, result, error);
  } catch (error) {
    const errorMsg = error instanceof Error ? error.message : String(error);
    log('error', `Error in handleToolResult: ${errorMsg}`);
    return false;
  }
};
TypeScript

The agent then continues, streaming more updates until the response is complete. This back-and-forth can happen multiple times during a single interaction.

Implementing the SSE Connection

The actual SSE connection handling depends on your framework of choice, but here’s a simplified example using Express:

// When a client connects to the SSE endpoint
app.get('/api/stream', (req, res) => {
  const conversationId = req.query.conversationId as string;
  if (!conversationId) {
    return res.status(400).json({ error: 'Conversation ID required' });
  }

  // Set up SSE headers
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  // Flush headers right away so the client knows the stream is open
  res.flushHeaders();

  // Create a unique ID for this connection
  const connectionId = uuidv4();

  // Create an SSE connection object
  const connection: SSEConnection = {
    id: connectionId,
    send: (event, data) => {
      res.write(`event: ${event}\n`);
      res.write(`data: ${JSON.stringify(data)}\n\n`);
    },
    close: () => {
      res.end();
    },
  };

  // Register the connection with the agent server
  agentServer.registerStream(conversationId, connection);

  // Handle client disconnect
  req.on('close', () => {
    agentServer.removeStream(conversationId);
  });
});
TypeScript
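
One practical detail: proxies and load balancers often close idle connections, so it’s common to send a periodic SSE comment as a heartbeat. A small addition to the handler above might look like this (the 15-second interval is arbitrary):

// Inside the /api/stream handler above: keep the connection alive through
// proxies by writing an SSE comment line on a timer.
const heartbeat = setInterval(() => {
  res.write(': heartbeat\n\n');
}, 15_000);

// Extend the close handler above so the timer is cleared alongside removeStream
req.on('close', () => {
  clearInterval(heartbeat);
  agentServer.removeStream(conversationId);
});
TypeScript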

Error Handling and Recovery

Any real-time system needs robust error handling. For SSE connections, we need to handle disconnections gracefully.

When a client disconnects, the stream manager should remove the connection from its registry. Clients should also implement reconnection logic with exponential backoff in case of network issues.
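
On the client, a rough sketch of reconnection with exponential backoff might look like this (the delays and cap are arbitrary choices):

// Sketch of client-side reconnection with exponential backoff.
const connectWithRetry = (conversationId: string, attempt = 0): EventSource => {
  const source = new EventSource(`/api/stream?conversationId=${conversationId}`);

  // Reset the backoff once a connection is successfully established
  source.onopen = () => {
    attempt = 0;
  };

  source.onerror = () => {
    source.close();
    // Back off exponentially, capped at 30 seconds
    const delay = Math.min(1000 * 2 ** attempt, 30_000);
    setTimeout(() => connectWithRetry(conversationId, attempt + 1), delay);
  };

  return source;
};
TypeScript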

For tool execution, we need to handle timeouts and failures. If a tool execution fails or takes too long, the agent should be able to continue with partial information or gracefully explain the issue to the user.
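
One simple way to handle this is to race the tool execution against a timeout, so the agent gets an error result instead of waiting forever. Here’s a hypothetical helper (the name and shape are mine, not part of the implementation above):

// Hypothetical helper: race a tool execution against a timeout so the agent
// can continue with an error instead of hanging indefinitely.
const executeToolWithTimeout = async <T>(
  execute: () => Promise<T>,
  timeoutMs: number
): Promise<{ result?: T; error?: string }> => {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`Tool timed out after ${timeoutMs}ms`)),
      timeoutMs
    );
  });

  try {
    const result = await Promise.race([execute(), timeout]);
    return { result };
  } catch (error) {
    return { error: error instanceof Error ? error.message : String(error) };
  } finally {
    clearTimeout(timer);
  }
};
TypeScript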

Server-side errors during processing should be captured and sent to the client as error events, allowing the client application to display appropriate messages or retry mechanisms.

Optimizing for Performance

To keep your agentic server responsive under load, consider a few performance optimizations.

Implement connection pooling for any database or external service connections. Use efficient data structures for tracking conversations and connections. Consider implementing rate limiting and timeouts to prevent resource exhaustion.

For memory management, implement a cleanup strategy for inactive conversations. A simple cleanup of inactive conversations in the conversation service is a good start:

const cleanupInactiveConversations = (maxAgeMs: number): void => {
  const now = new Date();
  for (const [id, conversation] of conversations.entries()) {
    const age = now.getTime() - conversation.lastActivity.getTime();
    if (age > maxAgeMs || !conversation.isActive) {
      conversations.delete(id);
    }
  }
};
TypeScript

Use streaming parsers for JSON to avoid holding large response objects in memory. And consider implementing batching for message updates to reduce the frequency of SSE events when updates are coming rapidly.
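
A simple way to batch is to buffer updates per message and flush them on a short timer. Here’s a sketch that reuses the stream manager from earlier (the 50 ms flush interval is an arbitrary starting point):

// Sketch of batching rapid content updates before they go over the wire.
const createUpdateBatcher = (
  streamManager: StreamManager,
  connectionId: string,
  flushIntervalMs = 50
) => {
  const pending = new Map<string, Partial<Message>>();
  let timer: ReturnType<typeof setTimeout> | null = null;

  const flush = () => {
    for (const [messageId, updates] of pending) {
      streamManager.updateMessage(connectionId, messageId, updates);
    }
    pending.clear();
    timer = null;
  };

  return {
    queueUpdate: (messageId: string, updates: Partial<Message>) => {
      // Merge with any update already queued for this message
      pending.set(messageId, { ...pending.get(messageId), ...updates });
      if (!timer) timer = setTimeout(flush, flushIntervalMs);
    },
  };
};
TypeScript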

Security Considerations

Security is critical for any API, but especially for agentic servers that might be executing tools on behalf of users.

Implement proper authentication and authorization for all endpoints. Validate conversation IDs to ensure users can only access their own conversations. Sanitize all inputs, particularly tool parameters that might be executed in sensitive contexts.
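
As an example, conversation ownership checks can live in a small middleware. This sketch assumes a hypothetical getUserIdFromRequest helper and an ownerId field on the conversation record, neither of which appears in the code above:

import type { Request, Response, NextFunction } from 'express';

// Sketch of a conversation-ownership check. getUserIdFromRequest and
// conversation.ownerId are hypothetical; wire them to your own auth layer
// and conversation store.
const requireConversationAccess = (req: Request, res: Response, next: NextFunction) => {
  const conversationId =
    (req.query.conversationId as string) || req.body?.conversationId;
  if (!conversationId) return next(); // New conversation; nothing to check yet

  const conversation = conversationService.getConversation(conversationId);
  const userId = getUserIdFromRequest(req); // e.g. from a session or JWT

  if (!conversation || conversation.ownerId !== userId) {
    res.status(403).json({ success: false, error: 'Forbidden' });
    return;
  }
  next();
};

// Apply to both the query and stream endpoints
app.use('/api', requireConversationAccess);
TypeScript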

Implement rate limiting to prevent abuse, and consider adding an audit log for tool executions to track activity. Use CORS appropriately to control which domains can connect to your SSE endpoints.

In short…

The REST+SSE hybrid pattern provides an elegant solution for building real-time agentic servers. It combines the simplicity and scalability of REST with the streaming capabilities of SSE, resulting in a responsive user experience without the complexity of bidirectional protocols like WebSockets.


© 2025 Andy Peatling