Building a Real-time Agentic Server with REST+SSE

A traditional REST API works well for many applications, but it has serious limitations when it comes to AI agents. Responses from large language models take time to generate, often several seconds. If you request a response and wait synchronously, your users are left staring at loading indicators.
WebSockets are an option, but they're more complex to implement, and the bidirectional connections they maintain are typically overkill for agents.
What we really need is a way to stream partial results from the server to the client in real-time, while keeping the overall architecture clean and simple. This is where Server-Sent Events come in!
The REST+SSE Hybrid Pattern
The approach I’ve found most effective combines REST endpoints for commands and queries with SSE for streaming responses. Here’s how it works.
Your server exposes standard REST endpoints that clients use to send queries and report tool results. Instead of waiting for complete responses, these endpoints immediately return connection IDs. Clients then open an SSE connection to receive streamed updates for their specific conversation.
This approach has a few advantages. It allows for real-time streaming of partial responses, making the agent feel responsive. It also separates the concerns of command handling and response streaming.
Core Components of the Architecture
At the heart of this architecture are several key components working together.
The Agent Server
The agent server acts as the central coordinator, managing conversation state and orchestrating communication between the client, the language model, and tools. It processes incoming queries, manages agent instances, and coordinates the streaming of responses.
Here’s how the interface for an agent server might look:
```typescript
export type AgentServer = {
  processQuery: (
    conversationId: string,
    query: string,
    context: Record<string, unknown>,
    signal?: AbortSignal
  ) => Promise<string>;

  registerStream: (conversationId: string, connection: SSEConnection) => void;
  removeStream: (conversationId: string) => void;

  getActiveConversationsCount: () => number;
  getContextUsage: (conversationId: string) => Promise<TokenUsage>;
  cancelRequest: (conversationId: string) => void;
};
```
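The interface above references a couple of supporting types that aren't shown. Their exact shape will depend on your implementation; here is one plausible sketch, where every field name is an assumption rather than part of any standard:

```typescript
// Hypothetical supporting types for the AgentServer interface.
// All field names here are illustrative assumptions.
export type SSEConnection = {
  id: string;
  // Writes one named event with a JSON payload to the open response stream
  send: (event: string, data: unknown) => void;
  // Ends the underlying HTTP response
  close: () => void;
};

export type TokenUsage = {
  inputTokens: number;
  outputTokens: number;
  // Fraction of the model's context window consumed (0 to 1)
  contextUtilization: number;
};
```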
A Stream Manager
The stream manager handles all the SSE connections, maintaining a registry of active streams indexed by conversation ID. It provides methods to send various event types to clients, including messages, incremental updates, and tool requests.
Here’s a simplified implementation of a stream manager I’ve used in recent projects:
```typescript
export const createStreamManager = (): StreamManager => {
  // Internal state
  const state: StreamState = {
    connections: new Map(),
    messageContents: new Map(),
  };

  // Helper function to get a connection by ID
  const getConnection = (connectionId: string): SSEConnection | undefined =>
    state.connections.get(connectionId);

  const registerConnection = (connection: SSEConnection): void => {
    state.connections.set(connection.id, connection);
  };

  const removeConnection = (id: string): void => {
    state.connections.delete(id);
  };

  const sendMessage = (connectionId: string, message: Message): boolean => {
    const connection = getConnection(connectionId);
    if (!connection) return false;

    // Track content for text messages so later updates can compute deltas
    if (message.type === 'text' && message.content) {
      state.messageContents.set(message.id, message.content);
    }

    connection.send('message', message);
    return true;
  };

  const updateMessage = (
    connectionId: string,
    messageId: string,
    updates: Partial<Message>
  ): boolean => {
    const connection = getConnection(connectionId);
    if (!connection) return false;

    // Handle content updates
    if (updates.content) {
      const existingContent = state.messageContents.get(messageId) || '';

      // Only send an update if the content has changed
      if (updates.content !== existingContent) {
        // Calculate the delta (new content that was added)
        let delta = '';
        if (updates.content.startsWith(existingContent)) {
          delta = updates.content.slice(existingContent.length);
        } else {
          // If content doesn't build on previous content, send full content
          delta = updates.content;
        }

        // Update stored content
        state.messageContents.set(messageId, updates.content);

        // Send the delta in the update
        connection.send('update_message', {
          id: messageId,
          updates: {
            content: updates.content, // Full updated content
            delta: delta, // Just the new part
            deltaType: 'append', // Indicate this is an append operation
          },
        });
      }
    } else {
      connection.send('update_message', {
        id: messageId,
        updates,
      });
    }

    return true;
  };

  const sendToolRequest = (connectionId: string, toolRequest: ToolRequest): boolean => {
    const connection = getConnection(connectionId);
    if (!connection) return false;

    connection.send('tool_request', toolRequest);
    return true;
  };

  const sendError = (connectionId: string, error: string): boolean => {
    const connection = getConnection(connectionId);
    if (!connection) return false;

    connection.send('error', { error });
    return true;
  };

  const sendComplete = (connectionId: string): boolean => {
    const connection = getConnection(connectionId);
    if (!connection) return false;

    // Clean up message tracking (simplified: this clears tracked content for
    // all connections; a production version would scope it per conversation)
    state.messageContents.clear();

    connection.send('complete', {});
    connection.close();
    removeConnection(connectionId);
    return true;
  };

  const getConnectionCount = (): number => state.connections.size;

  return {
    registerConnection,
    removeConnection,
    sendMessage,
    updateMessage,
    sendToolRequest,
    sendError,
    sendComplete,
    getConnectionCount,
  };
};
```
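The delta computation inside updateMessage is the part that makes incremental streaming cheap for clients. Pulled out on its own, the rule is simple: if the new content extends the previous content, only the new suffix needs to travel over the wire; otherwise the content was rewritten and we fall back to sending it in full.

```typescript
// Computes what to send for an update: the appended suffix when the new
// content builds on the old, or the full new content when it doesn't.
const computeDelta = (existing: string, updated: string): string =>
  updated.startsWith(existing) ? updated.slice(existing.length) : updated;

computeDelta('Hello', 'Hello, world'); // → ', world'
computeDelta('Hello', 'Let me rephrase'); // → 'Let me rephrase'
```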
The Conversation Service
The conversation service tracks active conversations and their associated agents, implementing lifecycle management for conversation resources. It handles cleanup of inactive conversations and maintains the mapping between conversation IDs and agent instances:
```typescript
export const createConversationService = (): ConversationService => {
  // Map of conversation ID to conversation data
  const conversations: Map<string, Conversation> = new Map();

  const addConversation = (id: string, agent: Agent, provider: Provider): void => {
    const now = new Date();
    conversations.set(id, {
      id,
      agent,
      provider,
      createdAt: now,
      lastActivity: now,
      isActive: true,
    });
  };

  const getConversation = (id: string): Conversation | undefined => {
    return conversations.get(id);
  };

  const updateActivity = (id: string): boolean => {
    const conversation = conversations.get(id);
    if (!conversation) return false;

    conversation.lastActivity = new Date();
    return true;
  };

  const cleanupInactiveConversations = (maxAgeMs: number): void => {
    const now = new Date();
    for (const [id, conversation] of conversations.entries()) {
      const age = now.getTime() - conversation.lastActivity.getTime();
      if (age > maxAgeMs || !conversation.isActive) {
        conversations.delete(id);
      }
    }
  };

  // Other methods...

  return {
    addConversation,
    getConversation,
    updateActivity,
    cleanupInactiveConversations,
  };
};
```
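The cleanup only matters if something actually calls it. A simple approach is a periodic timer; the helper below is a sketch (the name scheduleCleanup and the intervals are my own, not from any library), and in the server you would pass it something like `() => conversationService.cleanupInactiveConversations(30 * 60_000)`.

```typescript
// Runs a cleanup callback on a fixed interval and returns a stop function.
const scheduleCleanup = (cleanup: () => void, intervalMs: number): (() => void) => {
  const timer = setInterval(cleanup, intervalMs);
  return () => clearInterval(timer);
};
```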
The API Layer
To tie everything together, the API layer exposes REST endpoints that clients interact with, translating HTTP requests into agent operations and managing SSE connections. This layer is typically framework-specific (e.g., Express for Node.js applications).
Here’s a simplified snippet of an Express-based adapter I’ve used to handle REST endpoints. This should be very familiar if you’ve built API endpoints before:
```typescript
// Enhanced logging for the query endpoint
app.post<{}, ApiResponse<{ conversationId: string }>, QueryRequest>(
  '/api/query',
  async (req, res) => {
    console.log(
      `[ExpressAdapter] Received POST /api/query for conversation: ${
        req.body.conversationId || 'new'
      }`
    );

    try {
      // Use the query handler to process the request
      const result = await queryHandler.handleQuery(req.body, agentServer);

      // Return a success response
      console.log(
        `[ExpressAdapter] Successfully processed POST /api/query for conversation: ${result.conversationId}`
      );
      res.json({
        success: true,
        data: result,
      });
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : String(error);
      console.error(`[ExpressAdapter] Error processing POST /api/query: ${errorMessage}`, error);
      res.status(500).json({
        success: false,
        error: errorMessage,
      });
    }
  }
);
```
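The QueryRequest and ApiResponse types used above aren't spelled out in this post. One plausible definition (the field names are assumptions) is a plain request body plus a discriminated union for the response envelope:

```typescript
// Hypothetical request body and response envelope for /api/query.
export type QueryRequest = {
  conversationId?: string; // omitted when starting a new conversation
  query: string;
  context?: Record<string, unknown>;
};

// The discriminant on `success` lets clients narrow to `data` or `error`.
export type ApiResponse<T> =
  | { success: true; data: T }
  | { success: false; error: string };
```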
The Lifecycle
It’s easier to picture how this all fits together if we look at a typical lifecycle flow.
A user sends a query through your application’s interface. The client application makes a POST request to your /api/query endpoint, including the query text and, if continuing an existing conversation, a conversation ID.
The server immediately creates or retrieves the appropriate agent instance for this conversation and returns a response with the conversation ID. The client uses this ID to establish an SSE connection by connecting to /api/stream?conversationId=123.
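From the browser's side, the whole handshake is just a fetch followed by an EventSource. This sketch assumes the endpoints above and a response envelope of the form { success, data }; adapt it to your actual types:

```typescript
// Builds the stream URL for a conversation
const streamUrl = (conversationId: string): string =>
  `/api/stream?conversationId=${encodeURIComponent(conversationId)}`;

// Client-side sketch: POST the query, then subscribe to the stream.
const startConversation = async (query: string): Promise<EventSource> => {
  const res = await fetch('/api/query', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ query }),
  });
  const { data } = await res.json();

  const source = new EventSource(streamUrl(data.conversationId));
  source.addEventListener('update_message', (event) => {
    const { updates } = JSON.parse((event as MessageEvent).data);
    // Append updates.delta to the rendered assistant message here
  });
  source.addEventListener('complete', () => source.close());
  return source;
};
```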
Behind the scenes, the server starts processing the query asynchronously. Here’s how that looks in our implementation:
```typescript
const processQuery = async (
  conversationId: string,
  query: string,
  context: Record<string, unknown>,
  signal?: AbortSignal
): Promise<string> => {
  log('info', `Processing query for conversation: ${conversationId}`);

  try {
    // Create a new abort controller if not provided
    const abortController = new AbortController();

    // Link the provided signal, if any
    if (signal) {
      signal.addEventListener('abort', () => {
        abortController.abort();
      });
    }

    // Store the abort controller
    abortControllers.set(conversationId, abortController);

    // Get or create the agent
    const agent = await getOrCreateAgent(conversationId);

    // Add the user message to the conversation history
    streamManager.sendMessage(conversationId, {
      id: uuidv4(),
      role: 'user',
      type: 'text',
      content: query,
      createdAt: new Date().toISOString(),
    });

    // Start the streaming process in the background
    void streamAgentResponse(conversationId, agent, query, abortController.signal);

    log('info', `Successfully initiated streaming for conversation: ${conversationId}`);
    return conversationId;
  } catch (error) {
    const errorMessage = error instanceof Error ? error.message : String(error);
    log('error', `Error in processQuery: ${errorMessage}`, error);
    streamManager.sendError(conversationId, errorMessage);
    throw error;
  }
};
```
As the agent generates its response, the server streams messages to the client through the SSE connection. These might include partial text updates as they become available, making the experience feel responsive.
```typescript
const streamAgentResponse = async (
  conversationId: string,
  agent: AgentWithToolResult,
  query: string,
  signal: AbortSignal
): Promise<void> => {
  try {
    // Get the agent's message generator
    const messageIterator = agent.processQuery(query, signal);

    // Create an initial placeholder message for the assistant
    const assistantMessageId = uuidv4();
    streamManager.sendMessage(conversationId, {
      id: assistantMessageId,
      role: 'assistant',
      type: 'text',
      content: '',
      createdAt: new Date().toISOString(),
    });

    let currentAssistantMessageId = assistantMessageId;
    let hasExecutedTool = false;

    // Stream each yielded message from the agent
    for await (const message of messageIterator) {
      // Update conversation activity
      conversationService.updateActivity(conversationId);

      if (message.type === 'text' && message.role === 'assistant') {
        // If we've already executed a tool, create a new assistant message
        if (hasExecutedTool) {
          currentAssistantMessageId = uuidv4();
          streamManager.sendMessage(conversationId, {
            id: currentAssistantMessageId,
            role: 'assistant',
            type: 'text',
            content: message.content || '',
            createdAt: new Date().toISOString(),
          });
          hasExecutedTool = false; // Reset for potential future tool calls
        } else {
          // Update the existing message with new content
          streamManager.updateMessage(conversationId, currentAssistantMessageId, {
            content: message.content || '',
          });
        }
      } else if (message.type === 'tool_request' && message.tool) {
        // Handle the tool request here, explained below.
      }
    }

    streamManager.sendComplete(conversationId);
  } catch (error) {
    const errorMsg = error instanceof Error ? error.message : String(error);
    streamManager.sendError(conversationId, errorMsg);
  }
};
```
When the agent needs to use a tool (like searching the web or checking the weather), it sends a tool request event through the SSE connection. You could either use this to execute tools on the client side, or simply to display the details of the tool request to the user. Here’s an example where we’re using it to display a “human readable” version of the tool call:
```typescript
// Inside the streamAgentResponse loop
if (message.type === 'tool_request' && message.tool) {
  const requestId = uuidv4();

  // Get a human-readable description of the tool call
  let humanReadableDescription = '';
  try {
    humanReadableDescription = await getHumanReadableToolCall(
      message.tool,
      message.parameters || {}
    );
  } catch (descError) {
    // Fallback description if generation fails
    humanReadableDescription = `Using ${message.tool}`;
  }

  // Track pending tool requests
  pendingToolRequests.set(requestId, {
    conversationId,
    tool: message.tool,
  });

  // Send the tool request message to the client
  streamManager.sendMessage(conversationId, {
    id: requestId,
    role: 'assistant',
    type: 'tool_request',
    tool: message.tool,
    parameters: message.parameters || {},
    content: humanReadableDescription,
    createdAt: new Date().toISOString(),
  });

  // Your code for executing the tool would go here; this could be via an
  // MCP client, or a handoff to the client side.
}
```
When a tool has been executed, the results are passed to a tool result handler, which records them and sends them on to the agent for processing and continuation:
```typescript
const handleToolResult = async (
  conversationId: string,
  requestId: string,
  result?: unknown,
  error?: string
): Promise<boolean> => {
  try {
    // Get the associated conversation
    const conversation = conversationService.getConversation(conversationId);
    if (!conversation) {
      return false;
    }

    // Update the activity timestamp
    conversationService.updateActivity(conversationId);

    // Remove from pending requests
    pendingToolRequests.delete(requestId);

    // Get the agent with tool result handling capability
    const agent = conversation.agent as AgentWithToolResult;
    if (!agent.handleToolResult) {
      return false;
    }

    // Send the tool response message to the client
    streamManager.sendMessage(conversationId, {
      id: uuidv4(),
      role: 'tool',
      type: 'tool_response',
      content: typeof result === 'string' ? result : JSON.stringify(result),
      error: error,
      createdAt: new Date().toISOString(),
    });

    // Forward the result to the agent for processing
    return await agent.handleToolResult(requestId, result, error);
  } catch (error) {
    const errorMsg = error instanceof Error ? error.message : String(error);
    log('error', `Error in handleToolResult: ${errorMsg}`);
    return false;
  }
};
```
The agent then continues, streaming more updates until the response is complete. This back-and-forth can happen multiple times during a single interaction.
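The piece that connects this back to the REST side is the endpoint through which clients report tool results. Here's a framework-agnostic sketch of that handler (the body shape is my assumption); in Express you would wire it to something like app.post('/api/tool-result', ...) alongside the query endpoint:

```typescript
type ToolResultBody = {
  conversationId: string;
  requestId: string;
  result?: unknown;
  error?: string;
};

// Translates a tool-result request into a call to the tool result handler
// and maps the outcome onto an HTTP-style status plus response envelope.
const toolResultRoute = async (
  body: ToolResultBody,
  handleToolResult: (
    conversationId: string,
    requestId: string,
    result?: unknown,
    error?: string
  ) => Promise<boolean>
): Promise<{ status: number; body: unknown }> => {
  const accepted = await handleToolResult(
    body.conversationId,
    body.requestId,
    body.result,
    body.error
  );
  return accepted
    ? { status: 200, body: { success: true, data: {} } }
    : { status: 404, body: { success: false, error: 'Unknown conversation or tool request' } };
};
```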
Implementing the SSE Connection
The actual SSE connection handling depends on your framework of choice, but here’s a simplified example using Express:
```typescript
// When a client connects to the SSE endpoint
app.get('/api/stream', (req, res) => {
  const conversationId = req.query.conversationId as string;
  if (!conversationId) {
    return res.status(400).json({ error: 'Conversation ID required' });
  }

  // Set up SSE headers
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.flushHeaders();

  // Create an SSE connection object, keyed by conversation ID so the
  // stream manager can look it up when the agent server sends events
  const connection: SSEConnection = {
    id: conversationId,
    send: (event, data) => {
      res.write(`event: ${event}\n`);
      res.write(`data: ${JSON.stringify(data)}\n\n`);
    },
    close: () => {
      res.end();
    },
  };

  // Register the connection with the agent server
  agentServer.registerStream(conversationId, connection);

  // Handle client disconnect
  req.on('close', () => {
    agentServer.removeStream(conversationId);
  });
});
```
Error Handling and Recovery
Any real-time system needs robust error handling. For SSE connections, we need to handle disconnections gracefully.
When a client disconnects, the stream manager should remove the connection from its registry. Clients should also implement reconnection logic with exponential backoff in case of network issues.
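On the client, reconnection with exponential backoff can be as small as the sketch below. The delays and caps are arbitrary starting points, and a production version would also resume from the last processed message:

```typescript
// Delay for the nth retry: doubles each attempt, capped at maxMs.
const backoffDelay = (attempt: number, baseMs = 500, maxMs = 30_000): number =>
  Math.min(baseMs * 2 ** attempt, maxMs);

// Opens an EventSource and reopens it with backoff after errors.
const connectWithRetry = (url: string, onMessage: (event: MessageEvent) => void): void => {
  let attempt = 0;
  const open = () => {
    const source = new EventSource(url);
    source.onopen = () => { attempt = 0; }; // reset backoff once connected
    source.onmessage = onMessage;
    source.onerror = () => {
      source.close();
      setTimeout(open, backoffDelay(attempt++));
    };
  };
  open();
};
```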
For tool execution, we need to handle timeouts and failures. If a tool execution fails or takes too long, the agent should be able to continue with partial information or gracefully explain the issue to the user.
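A minimal way to bound tool execution time is Promise.race against a timer; on timeout, the error string can be fed back through the tool result handler so the agent can explain the failure. This is a sketch, and the timeout value in the usage note is an assumption:

```typescript
// Rejects if the wrapped promise doesn't settle within `ms` milliseconds.
const withTimeout = <T>(promise: Promise<T>, ms: number): Promise<T> =>
  Promise.race([
    promise,
    new Promise<T>((_resolve, reject) =>
      setTimeout(() => reject(new Error(`Tool execution timed out after ${ms}ms`)), ms)
    ),
  ]);
```

For example, `withTimeout(executeTool(request), 10_000)` would cap a tool call at ten seconds, where executeTool is whatever executes tools in your setup.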
Server-side errors during processing should be captured and sent to the client as error events, allowing the client application to display appropriate messages or retry mechanisms.
Optimizing for Performance
To keep your agentic server responsive under load, consider a few performance optimizations.
Implement connection pooling for any database or external service connections. Use efficient data structures for tracking conversations and connections. Consider implementing rate limiting and timeouts to prevent resource exhaustion.
For memory management, implement a cleanup strategy for inactive conversations; the simple sweep already shown in the conversation service is a good start:
```typescript
const cleanupInactiveConversations = (maxAgeMs: number): void => {
  const now = new Date();
  for (const [id, conversation] of conversations.entries()) {
    const age = now.getTime() - conversation.lastActivity.getTime();
    if (age > maxAgeMs || !conversation.isActive) {
      conversations.delete(id);
    }
  }
};
```
Use streaming parsers for JSON to avoid holding large response objects in memory. And consider implementing batching for message updates to reduce the frequency of SSE events when updates are coming rapidly.
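One way to batch is to buffer the latest content per message and flush on a short timer, so a burst of token-level updates becomes a single SSE event. A sketch (the names and the 50ms default are my own choices):

```typescript
// Coalesces rapid per-message updates: keeps only the latest content for
// each message ID and flushes everything at most once per interval.
const createUpdateBatcher = (
  flush: (messageId: string, content: string) => void,
  intervalMs = 50
) => {
  const pending = new Map<string, string>();
  let timer: ReturnType<typeof setTimeout> | undefined;

  const flushAll = () => {
    timer = undefined;
    for (const [id, content] of pending) flush(id, content);
    pending.clear();
  };

  return {
    update: (messageId: string, content: string) => {
      pending.set(messageId, content); // later updates overwrite earlier ones
      if (!timer) timer = setTimeout(flushAll, intervalMs);
    },
  };
};
```

In the stream manager, updateMessage could route content changes through batcher.update instead of calling connection.send directly.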
Security Considerations
Security is critical for any API, but especially for agentic servers that might be executing tools on behalf of users.
Implement proper authentication and authorization for all endpoints. Validate conversation IDs to ensure users can only access their own conversations. Sanitize all inputs, particularly tool parameters that might be executed in sensitive contexts.
Implement rate limiting to prevent abuse, and consider adding an audit log for tool executions to track activity. Use CORS appropriately to control which domains can connect to your SSE endpoints.
In short…
The REST+SSE hybrid pattern provides an elegant solution for building real-time agentic servers. It combines the simplicity and scalability of REST with the streaming capabilities of SSE, resulting in a responsive user experience without the complexity of bidirectional protocols like WebSockets.