SSE Streaming Integration

Server-Sent Events (SSE) provide one-way real-time streaming from server to client, optimized for LLM token-by-token completions and progress updates. This pattern offers the lowest latency for streaming text without the complexity of WebSocket connections.
Best For: Real-time text streaming, progress updates during long operations, and simple request/response patterns with streaming. Not suitable for bidirectional communication or complex state management.

Overview

SSE streaming is ideal when you need to display LLM responses in real-time as they’re generated, providing a smooth user experience with immediate feedback. Unlike WebSocket (used in CopilotKit), SSE is designed specifically for server-to-client streaming.

Key Benefits

  • Lowest latency for streaming text from LLMs
  • Simple implementation with EventSource API
  • Automatic reconnection on connection loss
  • Native browser support without external dependencies
  • Token-by-token display for engaging UX

When to Use SSE Streaming

✅ Use For

  • Real-time LLM completions
  • Token-by-token text streaming
  • Progress updates during long operations
  • Simple request/response with streaming
  • Report generation with live updates

❌ Avoid For

  • Bidirectional communication needs
  • Complex state management across agents
  • Multi-turn conversations with history
  • File uploads or binary data

Architecture

SSE uses a persistent HTTP connection where the server pushes data as text/event-stream. The browser’s EventSource API handles connection management and parsing.
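For reference, here is a minimal sketch of the raw EventSource API (the endpoint path is hypothetical; the examples later in this guide use fetch with a ReadableStream instead, because EventSource only supports GET requests and the chat endpoint accepts a POST body):
// Minimal EventSource sketch (hypothetical GET endpoint; this guide's chat endpoint uses POST)
const source = new EventSource('/api/agent/events?sessionId=demo');

source.onmessage = (event) => {
  // event.data is the payload after the "data: " prefix
  const payload = JSON.parse(event.data);
  console.log('received', payload);
};

source.onerror = () => {
  // The browser reconnects automatically; call close() to stop retrying
  source.close();
};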

Protocol Flow

  1. Client initiates request to Next.js API route
  2. API Route adds authentication and proxies to backend
  3. Backend opens SSE stream with Content-Type: text/event-stream
  4. Backend sends data chunks as data: {json}\n\n
  5. Client receives events in real-time via EventSource
  6. Connection closes when stream completes or errors
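For illustration, the raw response body for a short completion might look like the following (payload values are hypothetical):
// Hypothetical raw text/event-stream body, as received by the client
const exampleStreamBody =
  'data: {"type":"text_delta","content":"ASIC"}\n\n' +
  'data: {"type":"text_delta","content":" RG94 requires..."}\n\n' +
  'data: {"type":"done"}\n\n';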

Use Cases

1. Streaming Chat Completions

Display LLM responses token-by-token for immediate feedback:
const { messages, sendMessage } = useAgentStream({
  sessionId: 'chat-' + generateId(),
});

await sendMessage("Explain ASIC RG94 compliance requirements");
// Tokens stream in real-time: "ASIC" → "RG94" → "requires" → ...

2. Progress Updates

Show live progress during long-running operations:
const { sendMessage, isLoading } = useAgentStream({
  sessionId: 'calc-' + portfolioId,
  onProgress: (percent, message) => console.log(`${percent}% - ${message}`),
});

await sendMessage("Calculate NAV for all portfolios");
// Progress: "Fetching holdings..." → "Calculating..." → "Validating..." → "Complete!"

3. Report Generation

Stream report sections as they’re generated:
await sendMessage("Generate quarterly performance report");
// Streams: Executive Summary → Performance Analysis → Risk Metrics → Conclusion

Backend Setup

FastAPI SSE Endpoint

Create a streaming endpoint using FastAPI’s StreamingResponse:
# opshub-agent-backend/app/api/chat.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import json
import asyncio

app = FastAPI()

class ChatRequest(BaseModel):
    message: str
    sessionId: str
    userId: str

@app.post("/api/chat")
async def chat_stream(request: ChatRequest):
    """
    SSE endpoint for streaming LLM completions.
    Returns: text/event-stream with data chunks
    """

    async def generate():
        try:
            # Initialize LLM stream
            async for chunk in llm_stream(
                message=request.message,
                session_id=request.sessionId,
            ):
                # Format as SSE event
                event_data = {
                    "type": "text_delta",
                    "content": chunk.text,
                    "timestamp": chunk.timestamp,
                }

                # CRITICAL: Format as "data: {json}\n\n"
                yield f"data: {json.dumps(event_data)}\n\n"

                # Small delay to prevent overwhelming client
                await asyncio.sleep(0.01)

            # Send completion event
            yield f"data: {json.dumps({'type': 'done'})}\n\n"

        except Exception as e:
            # Send error event
            error_data = {
                "type": "error",
                "message": str(e),
            }
            yield f"data: {json.dumps(error_data)}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )

Event Types

Define consistent event types for client parsing:
Event Type  | Purpose               | Example Data
text_delta  | LLM token chunk       | {"content": "Hello"}
progress    | Operation progress    | {"percent": 45, "message": "Processing..."}
tool_start  | Tool execution began  | {"tool": "query_database", "args": {...}}
tool_result | Tool completed        | {"tool": "query_database", "result": {...}}
done        | Stream complete       | {"finalMessage": "..."}
error       | Error occurred        | {"message": "Connection failed"}
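On the client, these events can be modeled as a discriminated union so TypeScript narrows the payload by type (a sketch; fields beyond those shown in the table are assumptions):
// Sketch of a discriminated union matching the event table above
type AgentStreamEvent =
  | { type: 'text_delta'; content: string }
  | { type: 'progress'; percent: number; message: string }
  | { type: 'tool_start'; tool: string; args: Record<string, unknown> }
  | { type: 'tool_result'; tool: string; result: unknown }
  | { type: 'done'; finalMessage?: string }
  | { type: 'error'; message: string };

// Example: narrowing on the discriminant
function renderEvent(event: AgentStreamEvent): string {
  switch (event.type) {
    case 'text_delta':
      return event.content; // narrowed to the text_delta variant
    case 'progress':
      return `${event.percent}% - ${event.message}`;
    case 'error':
      return `Error: ${event.message}`;
    default:
      return '';
  }
}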

Frontend Implementation

Custom Hook: useAgentStream

Create a reusable hook for SSE streaming:
// lib/agent/use-agent-stream.ts
import { useState, useCallback, useRef, useEffect } from 'react';
import { getAuthHeaders } from '@/lib/api/backend-auth';

interface StreamMessage {
  id: string;
  role: 'user' | 'assistant';
  content: string;
  timestamp: Date;
}

interface UseAgentStreamOptions {
  sessionId: string;
  onProgress?: (progress: number, message: string) => void;
  onError?: (error: Error) => void;
}

export function useAgentStream(options: UseAgentStreamOptions) {
  const [messages, setMessages] = useState<StreamMessage[]>([]);
  const [isLoading, setIsLoading] = useState(false);
  const [error, setError] = useState<Error | null>(null);
  const abortControllerRef = useRef<AbortController | null>(null);

  const sendMessage = useCallback(async (content: string) => {
    setIsLoading(true);
    setError(null);

    // Add user message immediately
    const userMessage: StreamMessage = {
      id: generateId(),
      role: 'user',
      content,
      timestamp: new Date(),
    };
    setMessages(prev => [...prev, userMessage]);

    // Create assistant message that will be streamed
    const assistantMessage: StreamMessage = {
      id: generateId(),
      role: 'assistant',
      content: '',
      timestamp: new Date(),
    };
    setMessages(prev => [...prev, assistantMessage]);

    try {
      // Create abort controller for cancellation
      abortControllerRef.current = new AbortController();

      // Get auth headers
      const authHeaders = await getAuthHeaders();

      // Send request to Next.js API route (proxy)
      const response = await fetch('/api/agent/stream', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          ...authHeaders,
        },
        body: JSON.stringify({
          message: content,
          sessionId: options.sessionId,
        }),
        signal: abortControllerRef.current.signal,
      });

      if (!response.ok) {
        throw new Error(`HTTP ${response.status}: ${response.statusText}`);
      }

      // Parse SSE stream
      const reader = response.body?.getReader();
      const decoder = new TextDecoder();

      if (!reader) {
        throw new Error('No response body');
      }

      // Buffer partial lines so events split across read chunks are parsed correctly
      let sseBuffer = '';

      while (true) {
        const { done, value } = await reader.read();

        if (done) break;

        // Decode chunk and append to the carry-over buffer
        sseBuffer += decoder.decode(value, { stream: true });

        // Parse complete SSE events (format: "data: {json}\n\n")
        const lines = sseBuffer.split('\n');
        sseBuffer = lines.pop() ?? ''; // Keep any incomplete trailing line for the next chunk

        for (const line of lines) {
          if (!line.startsWith('data: ')) continue;

          const data = line.slice(6); // Remove "data: " prefix

          let event: any;
          try {
            event = JSON.parse(data);
          } catch (parseError) {
            console.error('Failed to parse SSE event:', parseError);
            continue;
          }

          // Handle different event types
          switch (event.type) {
            case 'text_delta':
              // Append token to the assistant message without mutating previous state
              setMessages(prev => {
                const updated = [...prev];
                const last = updated[updated.length - 1];
                updated[updated.length - 1] = {
                  ...last,
                  content: last.content + event.content,
                };
                return updated;
              });
              break;

            case 'progress':
              options.onProgress?.(event.percent, event.message);
              break;

            case 'done':
              setIsLoading(false);
              break;

            case 'error':
              throw new Error(event.message);
          }
        }
      }

    } catch (err) {
      const error = err as Error;

      if (error.name === 'AbortError') {
        console.log('Stream aborted by user');
      } else {
        setError(error);
        options.onError?.(error);
      }
    } finally {
      setIsLoading(false);
      abortControllerRef.current = null;
    }
  }, [options]);

  // Abort stream on unmount
  useEffect(() => {
    return () => {
      abortControllerRef.current?.abort();
    };
  }, []);

  const abort = useCallback(() => {
    abortControllerRef.current?.abort();
    setIsLoading(false);
  }, []);

  return {
    messages,
    isLoading,
    error,
    sendMessage,
    abort,
  };
}

function generateId(): string {
  return `msg-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
}

Next.js API Route (Proxy)

Create a proxy route with authentication:
// app/api/agent/stream/route.ts
import { NextRequest } from 'next/server';
import { buildAgentUrl } from '@/lib/config/agent-backend-url';
import { getAuthHeaders } from '@/lib/api/backend-auth';

export async function POST(request: NextRequest) {
  try {
    // Get auth headers
    const authHeaders = await getAuthHeaders();

    // Parse request body
    const body = await request.json();

    // Proxy to backend
    const backendUrl = buildAgentUrl('/api/chat');

    const response = await fetch(backendUrl, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        ...authHeaders,
      },
      body: JSON.stringify(body),
    });

    if (!response.ok) {
      return new Response(
        JSON.stringify({ error: 'Backend request failed' }),
        { status: response.status }
      );
    }

    // Return stream directly (Next.js handles streaming)
    return new Response(response.body, {
      headers: {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
      },
    });

  } catch (error) {
    console.error('Stream proxy error:', error);
    return new Response(
      JSON.stringify({ error: 'Internal server error' }),
      { status: 500 }
    );
  }
}

Complete Example: Streaming Chat Interface

Build a complete chat interface with SSE streaming:
// components/agent/StreamingChat.tsx
'use client';

import { useAgentStream } from '@/lib/agent/use-agent-stream';
import { useState } from 'react';
import { Button } from '@/components/ui/button';
import { Input } from '@/components/ui/input';
import { Alert, AlertDescription } from '@/components/ui/alert';
import { Loader2, StopCircle } from 'lucide-react';

export function StreamingChat() {
  const [input, setInput] = useState('');
  const [progress, setProgress] = useState({ percent: 0, message: '' });
  // Keep the session id stable across re-renders
  const [sessionId] = useState(() => `chat-${Date.now()}`);

  const { messages, isLoading, error, sendMessage, abort } = useAgentStream({
    sessionId,
    onProgress: (percent, message) => {
      setProgress({ percent, message });
    },
    onError: (err) => {
      console.error('Stream error:', err);
    },
  });

  const handleSubmit = async (e: React.FormEvent) => {
    e.preventDefault();

    if (!input.trim() || isLoading) return;

    await sendMessage(input);
    setInput('');
  };

  return (
    <div className="flex flex-col h-full">
      {/* Messages */}
      <div className="flex-1 overflow-y-auto p-4 space-y-4">
        {messages.map((msg) => (
          <div
            key={msg.id}
            className={`flex ${msg.role === 'user' ? 'justify-end' : 'justify-start'}`}
          >
            <div
              className={`max-w-[70%] rounded-lg px-4 py-2 ${
                msg.role === 'user'
                  ? 'bg-blue-600 text-white'
                  : 'bg-gray-100 text-gray-900'
              }`}
            >
              <div className="text-sm font-medium mb-1">
                {msg.role === 'user' ? 'You' : 'Agent'}
              </div>
              <div className="whitespace-pre-wrap">{msg.content}</div>
              {msg.role === 'assistant' && isLoading && !msg.content && (
                <div className="flex items-center gap-2 text-sm text-gray-500">
                  <Loader2 className="h-4 w-4 animate-spin" />
                  Thinking...
                </div>
              )}
            </div>
          </div>
        ))}
      </div>

      {/* Progress indicator */}
      {isLoading && progress.message && (
        <div className="px-4 py-2 border-t">
          <div className="flex items-center gap-2 text-sm text-gray-600">
            <Loader2 className="h-4 w-4 animate-spin" />
            <span>{progress.message}</span>
            <span className="ml-auto">{progress.percent}%</span>
          </div>
          <div className="w-full bg-gray-200 rounded-full h-1.5 mt-2">
            <div
              className="bg-blue-600 h-1.5 rounded-full transition-all"
              style={{ width: `${progress.percent}%` }}
            />
          </div>
        </div>
      )}

      {/* Error display */}
      {error && (
        <Alert variant="destructive" className="m-4">
          <AlertDescription>{error.message}</AlertDescription>
        </Alert>
      )}

      {/* Input */}
      <form onSubmit={handleSubmit} className="p-4 border-t">
        <div className="flex gap-2">
          <Input
            value={input}
            onChange={(e) => setInput(e.target.value)}
            placeholder="Type your message..."
            disabled={isLoading}
            className="flex-1"
          />

          {isLoading ? (
            <Button
              type="button"
              variant="destructive"
              onClick={abort}
            >
              <StopCircle className="h-4 w-4 mr-2" />
              Stop
            </Button>
          ) : (
            <Button type="submit" disabled={!input.trim()}>
              Send
            </Button>
          )}
        </div>
      </form>
    </div>
  );
}

Token Streaming Details

How Token-by-Token Works

  1. LLM generates tokens sequentially (e.g., GPT-4, Claude)
  2. Backend streams each token as SSE event immediately
  3. Frontend appends token to message content
  4. React re-renders with updated content (efficient due to React reconciliation)
  5. User sees text appear in real-time
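A minimal sketch of steps 3 and 4, assuming the StreamMessage shape and setMessages setter from the hook above; replacing the last message with a copy (rather than mutating it) keeps the update visible to React's reconciliation:
import type { Dispatch, SetStateAction } from 'react';

// Matches the StreamMessage interface defined in the hook above
interface StreamMessage {
  id: string;
  role: 'user' | 'assistant';
  content: string;
  timestamp: Date;
}

// Append a streamed token to the last (assistant) message without mutating previous state
function appendToken(
  setMessages: Dispatch<SetStateAction<StreamMessage[]>>,
  token: string,
) {
  setMessages(prev => {
    if (prev.length === 0) return prev;
    const last = prev[prev.length - 1];
    return [...prev.slice(0, -1), { ...last, content: last.content + token }];
  });
}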

Optimizing Token Display

Two common approaches are token buffering and debounced rendering. A token-buffering example:
// Buffer tokens for smoother rendering (setMessages comes from the useAgentStream hook)
const TOKEN_BUFFER_SIZE = 5;
const TOKEN_BUFFER_DELAY = 50; // ms

let tokenBuffer: string[] = [];
let flushTimeout: ReturnType<typeof setTimeout> | null = null;

function bufferToken(token: string) {
  tokenBuffer.push(token);

  if (tokenBuffer.length >= TOKEN_BUFFER_SIZE) {
    flushTokens();
  } else if (!flushTimeout) {
    // Schedule a flush if one isn't already pending
    flushTimeout = setTimeout(flushTokens, TOKEN_BUFFER_DELAY);
  }
}

function flushTokens() {
  if (tokenBuffer.length > 0) {
    const text = tokenBuffer.join('');
    setMessages(prev => {
      const updated = [...prev];
      const last = updated[updated.length - 1];
      updated[updated.length - 1] = { ...last, content: last.content + text };
      return updated;
    });
    tokenBuffer = [];
  }

  if (flushTimeout) {
    clearTimeout(flushTimeout);
    flushTimeout = null;
  }
}

Progress Updates

Show progress during multi-step operations:
# Backend: Send progress events
async def generate_with_progress():
    steps = ['Fetching data', 'Processing', 'Validating', 'Complete']

    for i, step in enumerate(steps):
        percent = int((i + 1) / len(steps) * 100)

        yield f"data: {json.dumps({
            'type': 'progress',
            'percent': percent,
            'message': step,
        })}\n\n"

        # Perform actual work
        await perform_step(step)

        await asyncio.sleep(0.1)

    yield f"data: {json.dumps({'type': 'done'})}\n\n"
// Frontend: Display progress
const [progress, setProgress] = useState({ percent: 0, message: '' });

const { isLoading } = useAgentStream({
  sessionId: 'operation-123',
  onProgress: (percent, message) => setProgress({ percent, message }),
});

return (
  <div>
    {isLoading && (
      <ProgressBar value={progress.percent} label={progress.message} />
    )}
  </div>
);

Error Handling

Stream Interruption

Handle connection failures gracefully:
// Automatic retry with exponential backoff (wraps useAgentStream)
function useAgentStreamWithRetry(options: UseAgentStreamOptions) {
  const { sendMessage, ...rest } = useAgentStream(options);
  const [retryCount, setRetryCount] = useState(0);
  const maxRetries = 3;

  const sendMessageWithRetry = useCallback(async (content: string) => {
    for (let attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        await sendMessage(content);
        setRetryCount(0); // Reset on success
        return;
      } catch (error) {
        if (attempt === maxRetries) {
          throw new Error('Max retries exceeded');
        }
        const delay = Math.pow(2, attempt) * 1000; // 1s, 2s, 4s

        console.log(`Retrying in ${delay}ms...`);
        await new Promise(resolve => setTimeout(resolve, delay));
        setRetryCount(attempt + 1);
      }
    }
  }, [sendMessage]);

  return { ...rest, sendMessage: sendMessageWithRetry, retryCount };
}

Connection Recovery

Detect and handle reconnection:
// Monitor connection health
// Assumes lastEventTime is tracked elsewhere (e.g. updated in state each time an SSE event arrives)
const [connectionStatus, setConnectionStatus] = useState<'connected' | 'disconnected'>('connected');

useEffect(() => {
  const checkConnection = setInterval(() => {
    // Treat 30 seconds of silence during an active stream as a lost connection
    if (isLoading && Date.now() - lastEventTime > 30000) {
      setConnectionStatus('disconnected');
      abort(); // Cancel the stalled stream
    }
  }, 5000);

  return () => clearInterval(checkConnection);
}, [isLoading, lastEventTime, abort]);

// Display connection status
{connectionStatus === 'disconnected' && (
  <Alert variant="warning">
    <AlertDescription>
      Connection lost. Attempting to reconnect...
    </AlertDescription>
  </Alert>
)}

Performance

Buffering Strategy

Balance responsiveness with render performance:
Strategy  | Pros             | Cons            | Best For
Immediate | Most responsive  | Many re-renders | Short responses
Buffered  | Smooth animation | Slight delay    | Long responses
Debounced | Minimal renders  | Choppier UX     | Very long responses
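The buffered strategy is shown in the section above. As a rough sketch of the debounced strategy (the helper name and the appendToken usage are illustrative), accumulated text is flushed to UI state at most once per interval:
// Flush accumulated tokens to UI state at most once per interval (trailing-edge flush)
function createDebouncedTokenSink(onFlush: (text: string) => void, intervalMs = 100) {
  let pendingText = '';
  let timer: ReturnType<typeof setTimeout> | null = null;

  return (token: string) => {
    pendingText += token;

    // Schedule a single trailing flush; at most one UI update per interval
    if (timer) return;
    timer = setTimeout(() => {
      const text = pendingText;
      pendingText = '';
      timer = null;
      onFlush(text);
    }, intervalMs);
  };
}

// Usage (illustrative): const pushToken = createDebouncedTokenSink(text => appendToken(setMessages, text));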

Backpressure Handling

Prevent overwhelming client with too much data:
# Backend: Implement backpressure
async def generate_with_backpressure():
    buffer_size = 0
    max_buffer = 1024 * 10  # 10KB

    async for chunk in llm_stream():
        yield f"data: {json.dumps({'content': chunk.text})}\n\n"

        buffer_size += len(chunk.text)

        # Pause if buffer is full
        if buffer_size > max_buffer:
            await asyncio.sleep(0.1)
            buffer_size = 0

Best Practices

UX for Streaming

✅ Do

  • Show a typing indicator while streaming
  • Display progress for long operations
  • Allow users to stop streaming
  • Handle errors gracefully with retry
  • Buffer tokens for smooth animation
  • Show connection status

Abort Handling

Allow users to cancel long-running streams:
// Provide stop button
<Button onClick={abort} disabled={!isLoading}>
  <StopCircle className="h-4 w-4 mr-2" />
  Stop Generation
</Button>

// Clean up on abort
const abort = () => {
  abortControllerRef.current?.abort();

  // Mark the last message as incomplete (without mutating previous state)
  setMessages(prev => {
    const updated = [...prev];
    const last = updated[updated.length - 1];
    updated[updated.length - 1] = {
      ...last,
      content: last.content + '\n\n[Generation stopped by user]',
    };
    return updated;
  });

  setIsLoading(false);
};

Troubleshooting

Common Issues

Tokens arrive all at once instead of streaming
Cause: Buffering by proxy servers (nginx, Vercel)
Fix: Add anti-buffering headers
headers={
    "X-Accel-Buffering": "no",  # nginx
    "Cache-Control": "no-cache",
}
Stream disconnects mid-response
Cause: Timeout or connection limit on an idle stream
Fix: Send periodic heartbeat events
async def generate_with_heartbeat(heartbeat_interval: float = 15.0):
    queue: asyncio.Queue = asyncio.Queue()

    async def pump():
        # Read the LLM stream in the background so heartbeats can fire while waiting
        async for chunk in llm_stream():
            await queue.put(chunk)
        await queue.put(None)  # Sentinel: stream finished

    task = asyncio.create_task(pump())
    try:
        while True:
            try:
                chunk = await asyncio.wait_for(queue.get(), timeout=heartbeat_interval)
            except asyncio.TimeoutError:
                yield ": heartbeat\n\n"  # SSE comment keeps idle connections open
                continue
            if chunk is None:
                break
            yield f"data: {json.dumps(chunk)}\n\n"
    finally:
        task.cancel()
UI lags behind incoming tokens
Cause: React state update batching
Fix: Use flushSync for immediate updates (use sparingly)
import { flushSync } from 'react-dom';

flushSync(() => {
  setMessages(prev => [...prev, newMessage]);
});
Stream keeps running after the component unmounts
Cause: Not cleaning up the EventSource/AbortController
Fix: Proper cleanup in useEffect
useEffect(() => {
  return () => {
    abortControllerRef.current?.abort();
  };
}, []);


Last Updated: October 29, 2025 · Pattern Status: ✅ Production Ready · Backend Endpoint: /api/chat (SSE streaming)