SSE Streaming Integration
Server-Sent Events (SSE) provide one-way real-time streaming from server to client, optimized for LLM token-by-token completions and progress updates. This pattern offers the lowest latency for streaming text without the complexity of WebSocket connections.
Best For: Real-time text streaming, progress updates during long operations, and simple request/response patterns with streaming. Not suitable for bidirectional communication or complex state management.
Overview
SSE streaming is ideal when you need to display LLM responses in real-time as they’re generated, providing a smooth user experience with immediate feedback. Unlike WebSocket (used in CopilotKit), SSE is designed specifically for server-to-client streaming.
Key Benefits
Lowest latency for streaming text from LLMs
Simple implementation with the EventSource API for GET endpoints (see the sketch after this list)
Automatic reconnection on connection loss
Native browser support without external dependencies
Token-by-token display for engaging UX
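The EventSource API covers the simple case in a few lines. The sketch below assumes a hypothetical GET route at /api/agent/stream; the useAgentStream hook later in this guide reads the stream with fetch instead, because the backend expects a POST with auth headers.

// Minimal EventSource usage for a GET streaming endpoint (route and helpers are illustrative)
const source = new EventSource(`/api/agent/stream?sessionId=${sessionId}`);

source.onmessage = (e) => {
  // Each "data: {json}" line arrives as e.data
  const event = JSON.parse(e.data);
  if (event.type === 'text_delta') {
    appendToken(event.content); // app-specific rendering (hypothetical helper)
  }
};

source.onerror = () => {
  // EventSource reconnects automatically; call close() to stop for good
};

// When the conversation ends:
source.close();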
When to Use SSE Streaming
✅ Use For
Real-time LLM completions
Token-by-token text streaming
Progress updates during long operations
Simple request/response with streaming
Report generation with live updates
❌ Avoid For
Bidirectional communication needs
Complex state management across agents
Multi-turn conversations with history
File uploads or binary data
Architecture
SSE uses a persistent HTTP connection over which the server pushes data as text/event-stream. For plain GET endpoints, the browser's EventSource API handles connection management and parsing; because EventSource cannot send a POST body or custom headers, the hook in this guide reads the stream manually with fetch.
Protocol Flow
Client initiates request to Next.js API route
API Route adds authentication and proxies to backend
Backend opens SSE stream with Content-Type: text/event-stream
Backend sends data chunks as data: {json}\n\n (see the sample stream after this list)
Client receives events in real-time via EventSource
Connection closes when stream completes or errors
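For reference, a short completion stream looks roughly like this on the wire; each event ends with a blank line, and the payloads here are illustrative examples rather than exact backend output:

data: {"type": "text_delta", "content": "ASIC "}

data: {"type": "text_delta", "content": "RG94 "}

data: {"type": "progress", "percent": 50, "message": "Processing..."}

data: {"type": "done"}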
Use Cases
1. Streaming Chat Completions
Display LLM responses token-by-token for immediate feedback:
const { messages, sendMessage } = useAgentStream({
  sessionId: 'chat-' + generateId(),
});

await sendMessage("Explain ASIC RG94 compliance requirements");
// Tokens stream in real-time: "ASIC" → "RG94" → "requires" → ...
2. Progress Updates
Show live progress during long-running operations:
const { sendMessage, isLoading } = useAgentStream({
  sessionId: 'calc-' + portfolioId,
  onProgress: (percent, message) => console.log(`${percent}% - ${message}`),
});

await sendMessage("Calculate NAV for all portfolios");
// Progress: "Fetching holdings..." → "Calculating..." → "Validating..." → "Complete!"
3. Report Generation
Stream report sections as they’re generated:
await sendMessage ( "Generate quarterly performance report" );
// Streams: Executive Summary → Performance Analysis → Risk Metrics → Conclusion
Backend Setup
FastAPI SSE Endpoint
Create a streaming endpoint using FastAPI’s StreamingResponse:
# opshub-agent-backend/app/api/chat.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import json
import asyncio

app = FastAPI()

class ChatRequest(BaseModel):
    message: str
    sessionId: str
    userId: str

@app.post("/api/chat")
async def chat_stream(request: ChatRequest):
    """
    SSE endpoint for streaming LLM completions.

    Returns: text/event-stream with data chunks
    """
    async def generate():
        try:
            # Initialize the LLM stream (llm_stream: the project's async generator of token chunks)
            async for chunk in llm_stream(
                message=request.message,
                session_id=request.sessionId,
            ):
                # Format as SSE event
                event_data = {
                    "type": "text_delta",
                    "content": chunk.text,
                    "timestamp": chunk.timestamp,
                }

                # CRITICAL: format as "data: {json}\n\n"
                yield f"data: {json.dumps(event_data)}\n\n"

                # Small delay to prevent overwhelming the client
                await asyncio.sleep(0.01)

            # Send completion event
            yield f"data: {json.dumps({'type': 'done'})}\n\n"

        except Exception as e:
            # Send error event
            error_data = {
                "type": "error",
                "message": str(e),
            }
            yield f"data: {json.dumps(error_data)}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )
Event Types
Define consistent event types for client parsing:
| Event Type | Purpose | Example Data |
| --- | --- | --- |
| text_delta | LLM token chunk | {"content": "Hello"} |
| progress | Operation progress | {"percent": 45, "message": "Processing..."} |
| tool_start | Tool execution began | {"tool": "query_database", "args": {...}} |
| tool_result | Tool completed | {"tool": "query_database", "result": {...}} |
| done | Stream complete | {"finalMessage": "..."} |
| error | Error occurred | {"message": "Connection failed"} |
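On the client, these events can be modeled as a discriminated union so the switch statement in the hook narrows each case. The type below is an illustrative sketch derived from the table, not an existing export of the codebase, and the optional fields are assumptions based on the examples:

// Sketch of the event shapes from the table above (field names inferred from the example data)
type AgentStreamEvent =
  | { type: 'text_delta'; content: string; timestamp?: string }
  | { type: 'progress'; percent: number; message: string }
  | { type: 'tool_start'; tool: string; args: Record<string, unknown> }
  | { type: 'tool_result'; tool: string; result: unknown }
  | { type: 'done'; finalMessage?: string }
  | { type: 'error'; message: string };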
Frontend Implementation
Custom Hook: useAgentStream
Create a reusable hook for SSE streaming:
// lib/agent/use-agent-stream.ts
import { useState, useCallback, useRef, useEffect } from 'react';
import { getAuthHeaders } from '@/lib/api/backend-auth';

interface StreamMessage {
  id: string;
  role: 'user' | 'assistant';
  content: string;
  timestamp: Date;
}

interface UseAgentStreamOptions {
  sessionId: string;
  onProgress?: (progress: number, message: string) => void;
  onError?: (error: Error) => void;
}

export function useAgentStream(options: UseAgentStreamOptions) {
  const [messages, setMessages] = useState<StreamMessage[]>([]);
  const [isLoading, setIsLoading] = useState(false);
  const [error, setError] = useState<Error | null>(null);
  const abortControllerRef = useRef<AbortController | null>(null);

  const sendMessage = useCallback(async (content: string) => {
    setIsLoading(true);
    setError(null);

    // Add user message immediately
    const userMessage: StreamMessage = {
      id: generateId(),
      role: 'user',
      content,
      timestamp: new Date(),
    };
    setMessages(prev => [...prev, userMessage]);

    // Create the assistant message that will be streamed into
    const assistantMessage: StreamMessage = {
      id: generateId(),
      role: 'assistant',
      content: '',
      timestamp: new Date(),
    };
    setMessages(prev => [...prev, assistantMessage]);

    try {
      // Create abort controller for cancellation
      abortControllerRef.current = new AbortController();

      // Get auth headers
      const authHeaders = await getAuthHeaders();

      // Send request to Next.js API route (proxy)
      const response = await fetch('/api/agent/stream', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          ...authHeaders,
        },
        body: JSON.stringify({
          message: content,
          sessionId: options.sessionId,
        }),
        signal: abortControllerRef.current.signal,
      });

      if (!response.ok) {
        throw new Error(`HTTP ${response.status}: ${response.statusText}`);
      }

      // Parse SSE stream
      const reader = response.body?.getReader();
      const decoder = new TextDecoder();

      if (!reader) {
        throw new Error('No response body');
      }

      // Buffer partial lines so events split across chunk boundaries parse correctly
      let buffer = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // Decode chunk and split into complete lines (format: "data: {json}\n\n")
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop() ?? ''; // keep the trailing partial line for the next chunk

        for (const line of lines) {
          if (!line.startsWith('data: ')) continue;

          const data = line.slice(6); // Remove "data: " prefix
          let event: any;
          try {
            event = JSON.parse(data);
          } catch (parseError) {
            console.error('Failed to parse SSE event:', parseError);
            continue;
          }

          // Handle different event types
          switch (event.type) {
            case 'text_delta':
              // Append token to the assistant message (immutably, so React detects the change)
              setMessages(prev => {
                const updated = [...prev];
                const last = updated[updated.length - 1];
                updated[updated.length - 1] = { ...last, content: last.content + event.content };
                return updated;
              });
              break;

            case 'progress':
              options.onProgress?.(event.percent, event.message);
              break;

            case 'done':
              setIsLoading(false);
              break;

            case 'error':
              throw new Error(event.message);
          }
        }
      }
    } catch (err) {
      const error = err as Error;
      if (error.name === 'AbortError') {
        console.log('Stream aborted by user');
      } else {
        setError(error);
        options.onError?.(error);
      }
    } finally {
      setIsLoading(false);
      abortControllerRef.current = null;
    }
  }, [options]);

  // Abort stream on unmount
  useEffect(() => {
    return () => {
      abortControllerRef.current?.abort();
    };
  }, []);

  const abort = useCallback(() => {
    abortControllerRef.current?.abort();
    setIsLoading(false);
  }, []);

  return {
    messages,
    isLoading,
    error,
    sendMessage,
    abort,
  };
}

function generateId(): string {
  return `msg-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
}
Next.js API Route (Proxy)
Create a proxy route with authentication:
// app/api/agent/stream/route.ts
import { NextRequest } from 'next/server';
import { buildAgentUrl } from '@/lib/config/agent-backend-url';
import { getAuthHeaders } from '@/lib/api/backend-auth';

export async function POST(request: NextRequest) {
  try {
    // Get auth headers
    const authHeaders = await getAuthHeaders();

    // Parse request body
    const body = await request.json();

    // Proxy to backend
    const backendUrl = buildAgentUrl('/api/chat');
    const response = await fetch(backendUrl, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        ...authHeaders,
      },
      body: JSON.stringify(body),
    });

    if (!response.ok) {
      return new Response(
        JSON.stringify({ error: 'Backend request failed' }),
        { status: response.status }
      );
    }

    // Return the stream directly (Next.js handles streaming)
    return new Response(response.body, {
      headers: {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
      },
    });
  } catch (error) {
    console.error('Stream proxy error:', error);
    return new Response(
      JSON.stringify({ error: 'Internal server error' }),
      { status: 500 }
    );
  }
}
Complete Example: Streaming Chat Interface
Build a complete chat interface with SSE streaming:
// components/agent/StreamingChat.tsx
'use client';

import { useAgentStream } from '@/lib/agent/use-agent-stream';
import { useState } from 'react';
import { Button } from '@/components/ui/button';
import { Input } from '@/components/ui/input';
import { Alert, AlertDescription } from '@/components/ui/alert';
import { Loader2, StopCircle } from 'lucide-react';

export function StreamingChat() {
  const [input, setInput] = useState('');
  const [progress, setProgress] = useState({ percent: 0, message: '' });
  // Create the session ID once so it stays stable across re-renders
  const [sessionId] = useState(() => `chat-${Date.now()}`);

  const { messages, isLoading, error, sendMessage, abort } = useAgentStream({
    sessionId,
    onProgress: (percent, message) => {
      setProgress({ percent, message });
    },
    onError: (err) => {
      console.error('Stream error:', err);
    },
  });

  const handleSubmit = async (e: React.FormEvent) => {
    e.preventDefault();
    if (!input.trim() || isLoading) return;

    await sendMessage(input);
    setInput('');
  };

  return (
    <div className="flex flex-col h-full">
      {/* Messages */}
      <div className="flex-1 overflow-y-auto p-4 space-y-4">
        {messages.map((msg) => (
          <div
            key={msg.id}
            className={`flex ${msg.role === 'user' ? 'justify-end' : 'justify-start'}`}
          >
            <div
              className={`max-w-[70%] rounded-lg px-4 py-2 ${
                msg.role === 'user'
                  ? 'bg-blue-600 text-white'
                  : 'bg-gray-100 text-gray-900'
              }`}
            >
              <div className="text-sm font-medium mb-1">
                {msg.role === 'user' ? 'You' : 'Agent'}
              </div>
              <div className="whitespace-pre-wrap">{msg.content}</div>
              {msg.role === 'assistant' && isLoading && !msg.content && (
                <div className="flex items-center gap-2 text-sm text-gray-500">
                  <Loader2 className="h-4 w-4 animate-spin" />
                  Thinking...
                </div>
              )}
            </div>
          </div>
        ))}
      </div>

      {/* Progress indicator */}
      {isLoading && progress.message && (
        <div className="px-4 py-2 border-t">
          <div className="flex items-center gap-2 text-sm text-gray-600">
            <Loader2 className="h-4 w-4 animate-spin" />
            <span>{progress.message}</span>
            <span className="ml-auto">{progress.percent}%</span>
          </div>
          <div className="w-full bg-gray-200 rounded-full h-1.5 mt-2">
            <div
              className="bg-blue-600 h-1.5 rounded-full transition-all"
              style={{ width: `${progress.percent}%` }}
            />
          </div>
        </div>
      )}

      {/* Error display */}
      {error && (
        <Alert variant="destructive" className="m-4">
          <AlertDescription>{error.message}</AlertDescription>
        </Alert>
      )}

      {/* Input */}
      <form onSubmit={handleSubmit} className="p-4 border-t">
        <div className="flex gap-2">
          <Input
            value={input}
            onChange={(e) => setInput(e.target.value)}
            placeholder="Type your message..."
            disabled={isLoading}
            className="flex-1"
          />
          {isLoading ? (
            <Button
              type="button"
              variant="destructive"
              onClick={abort}
            >
              <StopCircle className="h-4 w-4 mr-2" />
              Stop
            </Button>
          ) : (
            <Button type="submit" disabled={!input.trim()}>
              Send
            </Button>
          )}
        </div>
      </form>
    </div>
  );
}
Token Streaming Details
How Token-by-Token Works
LLM generates tokens sequentially (e.g., GPT-4, Claude)
Backend streams each token as an SSE event immediately
Frontend appends the token to the message content
React re-renders with updated content (efficient due to React reconciliation)
User sees text appear in real-time
Optimizing Token Display
Buffer tokens and flush them after a short delay so the UI re-renders in small batches rather than once per token:
// Buffer tokens for smoother rendering (runs inside the streaming hook, where setMessages is in scope)
const TOKEN_BUFFER_SIZE = 5;
const TOKEN_BUFFER_DELAY = 50; // ms

let tokenBuffer: string[] = [];
let flushTimeout: ReturnType<typeof setTimeout> | null = null;

function bufferToken(token: string) {
  tokenBuffer.push(token);

  if (tokenBuffer.length >= TOKEN_BUFFER_SIZE) {
    flushTokens();
  } else if (!flushTimeout) {
    // Schedule a flush if one is not already scheduled
    flushTimeout = setTimeout(flushTokens, TOKEN_BUFFER_DELAY);
  }
}

function flushTokens() {
  if (tokenBuffer.length > 0) {
    const text = tokenBuffer.join('');
    setMessages(prev => {
      const updated = [...prev];
      const last = updated[updated.length - 1];
      updated[updated.length - 1] = { ...last, content: last.content + text };
      return updated;
    });
    tokenBuffer = [];
  }

  if (flushTimeout) {
    clearTimeout(flushTimeout);
    flushTimeout = null;
  }
}
Progress Updates
Show progress during multi-step operations:
# Backend: send progress events
async def generate_with_progress():
    steps = ['Fetching data', 'Processing', 'Validating', 'Complete']

    for i, step in enumerate(steps):
        percent = int((i + 1) / len(steps) * 100)
        event = {
            "type": "progress",
            "percent": percent,
            "message": step,
        }
        yield f"data: {json.dumps(event)}\n\n"

        # Perform the actual work for this step
        await perform_step(step)
        await asyncio.sleep(0.1)

    yield f"data: {json.dumps({'type': 'done'})}\n\n"
// Frontend: display progress
const [progress, setProgress] = useState({ percent: 0, message: '' });

const { isLoading } = useAgentStream({
  sessionId: 'operation-123',
  onProgress: (percent, message) => {
    console.log(`Progress: ${percent}% - ${message}`);
    setProgress({ percent, message });
  },
});

return (
  <div>
    {isLoading && (
      <ProgressBar value={progress.percent} label={progress.message} />
    )}
  </div>
);
Error Handling
Stream Interruption
Handle connection failures gracefully:
// Automatic retry with exponential backoff
function useAgentStreamWithRetry(options: UseAgentStreamOptions) {
  const stream = useAgentStream(options);
  const [retryCount, setRetryCount] = useState(0);
  const maxRetries = 3;

  const sendMessageWithRetry = async (content: string, attempt = 0): Promise<void> => {
    try {
      await stream.sendMessage(content);
      setRetryCount(0); // Reset on success
    } catch (error) {
      if (attempt < maxRetries) {
        const delay = Math.pow(2, attempt) * 1000; // 1s, 2s, 4s
        console.log(`Retrying in ${delay}ms...`);
        await new Promise(resolve => setTimeout(resolve, delay));
        setRetryCount(attempt + 1);
        return sendMessageWithRetry(content, attempt + 1); // Retry
      } else {
        throw new Error('Max retries exceeded');
      }
    }
  };

  return { ...stream, sendMessage: sendMessageWithRetry, retryCount };
}
Connection Recovery
Detect and handle reconnection:
// Monitor connection health (assumes lastEventTime is updated whenever an SSE event arrives)
const [connectionStatus, setConnectionStatus] = useState<'connected' | 'disconnected'>('connected');

useEffect(() => {
  const checkConnection = setInterval(() => {
    if (isLoading && Date.now() - lastEventTime > 30000) {
      setConnectionStatus('disconnected');
      abort(); // Cancel the stalled stream
    }
  }, 5000);

  return () => clearInterval(checkConnection);
}, [isLoading, lastEventTime, abort]);

// Display connection status
{connectionStatus === 'disconnected' && (
  <Alert variant="warning">
    <AlertDescription>
      Connection lost. Attempting to reconnect...
    </AlertDescription>
  </Alert>
)}
Buffering Strategy
Balance responsiveness with render performance:
| Strategy | Pros | Cons | Best For |
| --- | --- | --- | --- |
| Immediate | Most responsive | Many re-renders | Short responses |
| Buffered | Smooth animation | Slight delay | Long responses |
| Debounced | Minimal renders | Choppier UX | Very long responses |
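As a sketch of the Debounced row, the flush can wait for a short pause in the token stream while enforcing a maximum wait so very long responses still render periodically. The delays and names below are illustrative, and the variables are assumed to be declared at the top of sendMessage so they live for the duration of one stream (with setMessages in scope):

// Debounced flush with a maximum wait (values are illustrative)
const DEBOUNCE_MS = 50;
const MAX_WAIT_MS = 250;

let pending = '';
let debounceTimer: ReturnType<typeof setTimeout> | null = null;
let lastFlush = Date.now();

function appendDebounced(token: string) {
  pending += token;
  if (debounceTimer) clearTimeout(debounceTimer);

  if (Date.now() - lastFlush > MAX_WAIT_MS) {
    flushPending(); // overdue: render now even if tokens keep arriving
  } else {
    debounceTimer = setTimeout(flushPending, DEBOUNCE_MS);
  }
}

function flushPending() {
  if (debounceTimer) {
    clearTimeout(debounceTimer);
    debounceTimer = null;
  }
  if (!pending) return;

  const text = pending;
  pending = '';
  lastFlush = Date.now();
  setMessages(prev => {
    const next = [...prev];
    const last = next[next.length - 1];
    next[next.length - 1] = { ...last, content: last.content + text };
    return next;
  });
}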
Backpressure Handling
Prevent overwhelming client with too much data:
# Backend: implement backpressure
async def generate_with_backpressure():
    buffer_size = 0
    max_buffer = 1024 * 10  # 10 KB

    async for chunk in llm_stream():
        yield f"data: {json.dumps({'content': chunk.text})}\n\n"
        buffer_size += len(chunk.text)

        # Pause briefly once the buffer threshold is reached
        if buffer_size > max_buffer:
            await asyncio.sleep(0.1)
            buffer_size = 0
Best Practices
UX for Streaming
Show typing indicator while streaming
Display progress for long operations
Allow users to stop streaming
Handle errors gracefully with retry
Buffer tokens for smooth animation
Show connection status
Abort Handling
Allow users to cancel long-running streams:
// Provide a stop button
<Button onClick={abort} disabled={!isLoading}>
  <StopCircle className="h-4 w-4 mr-2" />
  Stop Generation
</Button>

// Clean up on abort (inside the hook, where the ref and setters are in scope)
const abort = () => {
  abortControllerRef.current?.abort();

  // Mark the last message as incomplete
  setMessages(prev => {
    const updated = [...prev];
    const last = updated[updated.length - 1];
    updated[updated.length - 1] = { ...last, content: last.content + '\n\n[Generation stopped by user]' };
    return updated;
  });

  setIsLoading(false);
};
Troubleshooting
Common Issues
No data received from stream
Cause: Buffering by proxy servers (nginx, Vercel)
Fix: Add anti-buffering headers

headers = {
    "X-Accel-Buffering": "no",  # nginx
    "Cache-Control": "no-cache",
}
Stream cuts off unexpectedly
Cause: Timeout or connection limit
Fix: Send periodic heartbeat events

async def generate_with_heartbeat():
    queue: asyncio.Queue = asyncio.Queue()

    async def pump():
        async for chunk in llm_stream():
            await queue.put(chunk)
        await queue.put(None)  # sentinel: stream finished

    asyncio.create_task(pump())
    while True:
        try:
            # Wait up to 15 seconds for the next chunk
            chunk = await asyncio.wait_for(queue.get(), timeout=15)
        except asyncio.TimeoutError:
            yield ": heartbeat\n\n"  # SSE comment keeps the connection open
            continue
        if chunk is None:
            break
        yield f"data: {json.dumps(chunk)}\n\n"
Messages not updating in real-time
Cause: React state update batching
Fix: Use flushSync for immediate updates (use sparingly)

import { flushSync } from 'react-dom';

flushSync(() => {
  setMessages(prev => [...prev, newMessage]);
});
Memory leak with long streams
Cause: Not cleaning up the EventSource/AbortController
Fix: Proper cleanup in useEffect

useEffect(() => {
  return () => {
    abortControllerRef.current?.abort();
  };
}, []);
Last Updated: October 29, 2025
Pattern Status: ✅ Production Ready
Backend Endpoint: /api/chat (SSE streaming)