Real-Time Streaming with Socket.IO: Building Responsive Web3 Chat

How Lunark implements real-time message streaming using Socket.IO, including the challenges of React state batching, room-based broadcasting, and coordinating blockchain transactions with chat responses.

When building Lunark's chat interface, I quickly realized that HTTP request-response wouldn't cut it. Users expect to see AI responses appear word by word, not wait for a complete response. And blockchain operations add another layer: transactions need to be displayed inline with chat messages, confirmations need to update in real-time, and network switches need to propagate instantly. Socket.IO solved all of these problems, but implementing it well required understanding both the backend architecture and React's rendering behavior.

## The Streaming Architecture

Lunark's streaming system has three layers:

1. **OpenAI streaming** - The LLM generates tokens one at a time
2. **Backend relay** - Express server receives tokens and broadcasts via Socket.IO
3. **Frontend rendering** - React updates the UI for each token

```mermaid
sequenceDiagram
    participant User
    participant React
    participant Socket
    participant Express
    participant Agent
    participant OpenAI
    User->>React: Send message
    React->>Express: POST /message
    Express->>Agent: agent.ask(message)
    Agent->>OpenAI: Stream request
    loop Token streaming
        OpenAI-->>Agent: Token
        Agent-->>Express: Yield token
        Express-->>Socket: Emit to room
        Socket-->>React: streamResponse
        React-->>User: Update UI
    end
    Express-->>Socket: streamEnd
```

![Real-time chat streaming in action](/images/menu/lunark/chat-2.jpg)

## Backend Implementation

The backend uses async generators to handle the streaming. When a message comes in, the agent's `ask` method returns an async generator that yields chunks as they arrive.

> **Note:** The example below is simplified for clarity. The actual Lunark implementation uses a graph execution pattern with `graph.addTaskNode()` and `graph.run()` rather than direct `agent.ask()` calls.
```typescript:src/services/agent.service.ts
async *streamResponse(message: string, chatId: string, messageId: string, userId: string) {
  const graph = await this.getOrCreateGraph(chatId);
  const stream = await this.agent.ask(message, {
    graphId: graph.id,
    stream: true,
  });

  let fullResponse = '';

  for await (const chunk of stream) {
    fullResponse += chunk;
    yield {
      chatId,
      messageId,
      message: fullResponse,
      role: 'lunark',
      userId,
      timestamp: new Date().toISOString(),
    };
  }
}
```

The message handler consumes this generator and emits to Socket.IO:

```typescript:src/routes/message.ts
app.post('/message', async (req, res) => {
  const { chatId, content } = req.body;
  const userAddress = req.userAddress;

  // Generate messageId using timestamp
  const messageId = Date.now().toString();

  // Respond immediately
  res.json({ success: true, messageId });

  // Get socket.io instance
  const io = getIO();

  // Emit stream status
  io.to(`chat:${chatId}`).emit('streamStatus', {
    status: 'Lunark is thinking...',
  });

  // Stream asynchronously
  try {
    const agent = new LunarkAgent(userAddress, req.chainId);
    await agent.initialize();

    let fullResponse = '';
    const responseStream = await agent.ask(content, chatId);

    for await (const chunk of responseStream) {
      fullResponse += chunk;
      io.to(`chat:${chatId}`).emit('streamResponse', {
        chatId,
        messageId,
        message: fullResponse,
        role: 'lunark',
        userId: userAddress,
        timestamp: new Date().toISOString(),
      });
    }

    io.to(`chat:${chatId}`).emit('streamEnd', { chatId, messageId });
    await agent.destroy();
  } catch (error) {
    io.to(`chat:${chatId}`).emit('streamError', { error: error.message });
  }
});
```

Notice that the HTTP response returns immediately: the client gets its `messageId` right away, and the actual streaming happens asynchronously, with updates pushed via WebSocket.

## Room-Based Broadcasting

Socket.IO's room system is perfect for chat applications.
Each chat session gets its own room, and each user gets a personal room for account-specific events:

```typescript:src/socket/index.ts
io.on('connection', (socket) => {
  socket.on('authenticate', ({ address }) => {
    // Join user's personal room
    socket.join(`user:${address.toLowerCase()}`);
    socket.emit('authenticated', { address });
  });

  socket.on('joinChat', ({ chatId }) => {
    // Join chat-specific room
    socket.join(`chat:${chatId}`);
    socket.emit('joinedChat', { chatId });
  });
});

// Emit to everyone in a chat
function emitToChat(chatId: string, event: string, data: any) {
  io.to(`chat:${chatId}`).emit(event, data);
}

// Emit to a specific user (for transactions, network switches)
function emitToUser(address: string, event: string, data: any) {
  io.to(`user:${address.toLowerCase()}`).emit(event, data);
}
```

This separation is important. Chat messages go to the chat room (in case we ever support collaborative sessions), while transaction notifications go directly to the user's room.

![Chat history showing previous conversations](/images/menu/lunark/history-page.jpg)

## The React Batching Problem

Here's where things got interesting. React 18 batches state updates for performance. When Socket.IO fires rapidly (multiple tokens per second), React batches those updates together, causing the UI to jump forward in chunks instead of streaming smoothly.

The solution is `flushSync`, which forces React to flush updates immediately:

```typescript:src/hooks/useSocket.ts
import { flushSync } from 'react-dom';

socket.on('streamResponse', (message: Message) => {
  flushSync(() => {
    setMessages((prev) => {
      const index = prev.findIndex((m) => m.id === message.id);
      if (index === -1) {
        // New message, add to array
        return [...prev, message];
      }
      // Existing message, update in place
      const updated = [...prev];
      updated[index] = message;
      return updated;
    });
  });
});
```

Without `flushSync`, you'd see updates land every 50-100ms as React batches them. With it, each token appears immediately.
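That upsert — append when the id is new, replace when it already exists — can be factored into a pure helper, which keeps the `flushSync` callback small and makes the logic easy to unit-test. A minimal sketch (the `Message` shape here is a simplified assumption, not Lunark's actual type):

```typescript
interface Message {
  id: string;
  message: string;
  role: string;
}

// Insert a new message or replace an existing one by id,
// always returning a new array so React sees the update.
function upsertMessage(messages: Message[], incoming: Message): Message[] {
  const index = messages.findIndex((m) => m.id === incoming.id);
  if (index === -1) {
    // New message: append
    return [...messages, incoming];
  }
  // Existing message: replace, leaving the old array untouched
  return messages.map((m, i) => (i === index ? incoming : m));
}
```

Because the helper never mutates its input, it works equally well inside a `setMessages((prev) => upsertMessage(prev, message))` updater.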
## Auto-Scroll Behavior

Chat interfaces need smart scrolling: auto-scroll when new messages arrive, but stop auto-scrolling if the user scrolls up to review history.

```typescript:src/components/ChatContainer.tsx
const containerRef = useRef<HTMLDivElement>(null);
const [userScrolled, setUserScrolled] = useState(false);
const scrollTimeoutRef = useRef<NodeJS.Timeout>();

// Detect manual scrolling
const handleScroll = () => {
  const container = containerRef.current;
  if (!container) return;

  const isNearBottom =
    container.scrollHeight - container.scrollTop - container.clientHeight < 100;

  if (!isNearBottom) {
    setUserScrolled(true);
    // Reset after 2 seconds of no scrolling
    clearTimeout(scrollTimeoutRef.current);
    scrollTimeoutRef.current = setTimeout(() => {
      setUserScrolled(false);
    }, 2000);
  } else {
    setUserScrolled(false);
  }
};

// Auto-scroll on new messages
useEffect(() => {
  if (!userScrolled && containerRef.current) {
    requestAnimationFrame(() => {
      containerRef.current?.scrollTo({
        top: containerRef.current.scrollHeight,
        behavior: 'smooth',
      });
    });
  }
}, [messages, userScrolled]);
```

The `requestAnimationFrame` ensures smooth scrolling that doesn't fight with React's rendering cycle.

## Transaction Events

When the AI agent prepares a blockchain transaction, it needs to be displayed in the chat while the agent waits for user confirmation.
This uses a different socket event:

```typescript:src/services/transaction.service.ts
// Backend emits when transaction is ready
emitToUser(userAddress, 'pendingTransaction', {
  id: transactionId,
  chatId,
  type: 'transfer',
  transaction: { to, value, data },
  details: { recipient, amount, token },
  buttonText: 'Send 0.5 ETH',
});
```

The frontend listens and attaches the transaction to the appropriate message:

```typescript:src/hooks/useSocket.ts
socket.on('pendingTransaction', (tx: PendingTransaction) => {
  setPendingTransaction(tx);

  // Attach the transaction to the latest Lunark message, creating a new
  // message object rather than mutating one already held in state
  const attachToMessage = () => {
    setMessages((prev) => {
      const index = prev.map((m) => m.role).lastIndexOf('lunark');
      if (index === -1) return prev;
      return prev.map((m, i) =>
        i === index ? { ...m, pendingTransaction: tx } : m
      );
    });
  };

  // The message might not exist yet, retry with increasing intervals
  const retryIntervals = [100, 300, 500, 1000, 2000];
  attachToMessage();
  retryIntervals.forEach((ms) => setTimeout(attachToMessage, ms));
});
```

The retry logic handles a race condition: the `pendingTransaction` event can arrive before the streaming message it refers to exists in state.
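Timed retries are simple and good enough in practice, but the race can also be resolved deterministically by buffering early-arriving transactions and draining the buffer each time the message list changes. A sketch of that alternative (this is not how Lunark implements it; the `Msg` shape is a simplified assumption):

```typescript
interface Msg {
  id: string;
  role: string;
  pendingTransaction?: unknown;
}

// Holds transactions that arrived before their target message existed.
class TxBuffer {
  private pending: unknown[] = [];

  push(tx: unknown): void {
    this.pending.push(tx);
  }

  // Call whenever the message list updates: attaches one buffered
  // transaction to the latest 'lunark' message, immutably.
  drain(messages: Msg[]): Msg[] {
    if (this.pending.length === 0) return messages;
    const index = messages.map((m) => m.role).lastIndexOf('lunark');
    if (index === -1) return messages; // target hasn't streamed in yet
    const tx = this.pending.shift();
    return messages.map((m, i) =>
      i === index ? { ...m, pendingTransaction: tx } : m
    );
  }
}
```

The trade-off is extra bookkeeping on every state update versus a handful of timers; retries lose if the message takes longer than the last interval, while the buffer never does.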
## Network Switching

When the agent decides to switch networks (e.g., the user asks to "check my balance on Polygon"), it emits a network switch event:

```typescript:src/services/network.service.ts
// Backend
emitToUser(userAddress, 'networkSwitch', {
  chainId: 137,
  name: 'Polygon',
  symbol: 'MATIC',
  rpcUrl: 'https://polygon-rpc.com',
});
```

The frontend uses the wallet's API to request the switch:

```typescript:src/hooks/useSocket.ts
socket.on('networkSwitch', async ({ chainId, name }) => {
  try {
    await window.ethereum.request({
      method: 'wallet_switchEthereumChain',
      // Wallets expect the chain ID as a 0x-prefixed hex string
      params: [{ chainId: `0x${chainId.toString(16)}` }],
    });
    toast.success(`Switched to ${name}`);
  } catch (error) {
    if (error.code === 4902) {
      // Chain not added to wallet
      toast.error(`Please add ${name} to your wallet`);
    }
  }
});
```

## Connection Management

Socket connections need careful lifecycle management: connect when the user authenticates, disconnect on logout, and handle reconnection gracefully:

```typescript:src/providers/SocketProvider.tsx
const SocketProvider = ({ children }) => {
  const { user, isAuthenticated } = useUser();
  const [socket, setSocket] = useState<Socket | null>(null);
  const [isConnected, setIsConnected] = useState(false);

  useEffect(() => {
    if (!isAuthenticated || !user?.address) {
      socket?.disconnect();
      setSocket(null);
      return;
    }

    const newSocket = io(SOCKET_URL, {
      transports: ['websocket', 'polling'],
      reconnectionAttempts: 3,
      reconnectionDelay: 1000,
      timeout: 5000,
    });

    newSocket.on('connect', () => {
      setIsConnected(true);
      newSocket.emit('authenticate', { address: user.address });
    });

    newSocket.on('disconnect', () => {
      setIsConnected(false);
    });

    setSocket(newSocket);

    return () => {
      newSocket.disconnect();
    };
  }, [isAuthenticated, user?.address]);

  return (
    <SocketContext.Provider value={{ socket, isConnected }}>
      {children}
    </SocketContext.Provider>
  );
};
```

## Stream Abortion

Users should be able to stop a streaming response.
This requires coordination between frontend and backend:

```typescript:src/hooks/useChat.ts
// Frontend
const stopStream = () => {
  socket.emit('streamAbort', { chatId: currentChatId });
  setIsStreaming(false);
};

// Backend
socket.on('streamAbort', ({ chatId }) => {
  console.log(`Stream abort requested for chat: ${chatId}`);
  // Signal the in-flight stream for this chat to stop
});
```
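One way to wire the backend side is an `AbortController` per in-flight stream: the `streamAbort` handler triggers the controller, and the streaming loop checks the signal between chunks. This is a sketch under assumptions, not Lunark's actual implementation; the `activeStreams` map and its keying by chat ID are hypothetical:

```typescript
// One controller per in-flight stream, keyed by chat ID (hypothetical wiring)
const activeStreams = new Map<string, AbortController>();

// Wrap a token stream so it stops yielding once the signal fires.
async function* abortable<T>(
  stream: AsyncIterable<T>,
  signal: AbortSignal
): AsyncGenerator<T> {
  for await (const chunk of stream) {
    // Checked between chunks, so abortion takes effect within one token
    if (signal.aborted) return;
    yield chunk;
  }
}

// The socket handler then just looks up and triggers the controller:
// socket.on('streamAbort', ({ chatId }) => activeStreams.get(chatId)?.abort());
```

The streaming route would register a controller in `activeStreams` before iterating `abortable(responseStream, controller.signal)`, and delete the entry in a `finally` block so aborted and completed streams clean up the same way.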