Real-Time Streaming with Socket.IO: Building Responsive Web3 Chat
How Lunark implements real-time message streaming using Socket.IO, including the challenges of React state batching, room-based broadcasting, and coordinating blockchain transactions with chat responses.
When building Lunark's chat interface, I quickly realized that HTTP request-response wouldn't cut it. Users expect to see AI responses appear word by word, not wait for a complete response. And blockchain operations add another layer: transactions need to be displayed inline with chat messages, confirmations need to update in real time, and network switches need to propagate instantly.
Socket.IO solved all of these problems, but implementing it well required understanding both the backend architecture and React's rendering behavior.
## The Streaming Architecture
Lunark's streaming system has three layers:
1. **OpenAI streaming** - The LLM generates tokens one at a time
2. **Backend relay** - Express server receives tokens and broadcasts via Socket.IO
3. **Frontend rendering** - React updates the UI for each token
```mermaid
sequenceDiagram
    participant User
    participant React
    participant Socket
    participant Express
    participant Agent
    participant OpenAI
    User->>React: Send message
    React->>Express: POST /message
    Express->>Agent: agent.ask(message)
    Agent->>OpenAI: Stream request
    loop Token streaming
        OpenAI-->>Agent: Token
        Agent-->>Express: Yield token
        Express-->>Socket: Emit to room
        Socket-->>React: streamResponse
        React-->>User: Update UI
    end
    Express-->>Socket: streamEnd
```

## Backend Implementation
The backend uses async generators to handle the streaming. When a message comes in, the agent's `ask` method returns an async generator that yields chunks as they arrive.
> **Note:** The example below is simplified for clarity. The actual Lunark implementation uses a graph execution pattern with `graph.addTaskNode()` and `graph.run()` rather than direct `agent.ask()` calls.
```typescript:src/services/agent.service.ts
async *streamResponse(message: string, chatId: string, messageId: string, userId: string) {
  const graph = await this.getOrCreateGraph(chatId);
  const stream = await this.agent.ask(message, {
    graphId: graph.id,
    stream: true,
  });

  // Accumulate chunks so every yielded payload carries the full text so far
  let fullResponse = '';
  for await (const chunk of stream) {
    fullResponse += chunk;
    yield {
      chatId,
      messageId,
      message: fullResponse,
      role: 'lunark',
      userId,
      timestamp: new Date().toISOString(),
    };
  }
}
```
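To see the accumulate-and-yield pattern in isolation, here is a minimal sketch that swaps the LLM for a hard-coded token stream. The names `fakeTokenStream`, `accumulate`, and `collectSnapshots` are hypothetical and exist only for illustration; they are not part of Lunark.

```typescript
// Hypothetical stand-in for the LLM stream: yields tokens one at a time.
async function* fakeTokenStream(tokens: string[]): AsyncGenerator<string> {
  for (const token of tokens) {
    yield token;
  }
}

// Mirrors the pattern above: each yielded value is the full text so far,
// not just the newest chunk.
async function* accumulate(stream: AsyncIterable<string>): AsyncGenerator<string> {
  let fullResponse = '';
  for await (const chunk of stream) {
    fullResponse += chunk;
    yield fullResponse;
  }
}

// Drains the generator and returns every intermediate snapshot.
async function collectSnapshots(tokens: string[]): Promise<string[]> {
  const snapshots: string[] = [];
  for await (const snapshot of accumulate(fakeTokenStream(tokens))) {
    snapshots.push(snapshot);
  }
  return snapshots;
}
```

Sending the full text on every event costs more bandwidth than sending deltas, but it means a client that misses an event still converges on the correct text with the next one.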
The message handler consumes this generator and emits to Socket.IO:
```typescript:src/routes/message.ts
app.post('/message', async (req, res) => {
  const { chatId, content } = req.body;
  const userAddress = req.userAddress;

  // Generate messageId using timestamp
  const messageId = Date.now().toString();

  // Respond immediately
  res.json({ success: true, messageId });

  // Get socket.io instance
  const io = getIO();

  // Emit stream status
  io.to(`chat:${chatId}`).emit('streamStatus', {
    status: 'Lunark is thinking...',
  });

  // Stream asynchronously
  let agent: LunarkAgent | undefined;
  try {
    agent = new LunarkAgent(userAddress, req.chainId);
    await agent.initialize();

    let fullResponse = '';
    const responseStream = await agent.ask(content, chatId);
    for await (const chunk of responseStream) {
      fullResponse += chunk;
      io.to(`chat:${chatId}`).emit('streamResponse', {
        chatId,
        messageId,
        message: fullResponse,
        role: 'lunark',
        // Assumes the wallet address identifies the user in this simplified example
        userId: userAddress,
        timestamp: new Date().toISOString(),
      });
    }
    io.to(`chat:${chatId}`).emit('streamEnd', { chatId, messageId });
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    io.to(`chat:${chatId}`).emit('streamError', { error: message });
  } finally {
    // Release agent resources whether the stream finished or failed
    await agent?.destroy();
  }
});
```
Notice that the HTTP response returns immediately. The actual streaming happens asynchronously, with updates pushed over the Socket.IO connection to every client that has joined the chat's room.
## Room-Based Broadcasting
Socket.IO's room system is perfect for chat applications. Each chat session gets its own room, and each user gets a personal room for account-specific events:
```typescript:src/socket/index.ts
io.on('connection', (socket) => {
  socket.on('authenticate', ({ address }) => {
    // Join user's personal room
    socket.join(`user:${address.toLowerCase()}`);
    socket.emit('authenticated', { address });
  });

  socket.on('joinChat', ({ chatId }) => {
    // Join chat-specific room
    socket.join(`chat:${chatId}`);
    socket.emit('joinedChat', { chatId });
  });
});

// Emit to everyone in a chat
function emitToChat(chatId: string, event: string, data: any) {
  io.to(`chat:${chatId}`).emit(event, data);
}

// Emit to a specific user (for transactions, network switches)
function emitToUser(address: string, event: string, data: any) {
  io.to(`user:${address.toLowerCase()}`).emit(event, data);
}
```
This separation is important. Chat messages go to the chat room (in case we ever support collaborative sessions), while transaction notifications go directly to the user's room.
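Room names are compared as plain strings, and Ethereum addresses often arrive in mixed-case checksum form, so the `toLowerCase()` call has to happen on both the join and the emit side. One way to keep that consistent is to centralize the naming. The helpers below are a hypothetical sketch, not Lunark's actual code:

```typescript
// Hypothetical helpers that centralize the room naming convention.
const chatRoom = (chatId: string): string => `chat:${chatId}`;

// Addresses are lowercased so checksummed and lowercase forms map to one room.
const userRoom = (address: string): string => `user:${address.toLowerCase()}`;
```

With these in place, `socket.join(userRoom(address))` and `io.to(userRoom(address)).emit(...)` cannot drift apart.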

## The React Batching Problem
Here's where things got interesting. React 18 batches state updates automatically, including those made in timers, promises, and socket callbacks. When Socket.IO events arrive in quick succession (multiple tokens per second), several updates can collapse into a single render, so the UI jumps forward in chunks instead of streaming smoothly.
The solution is `flushSync`, which forces React to flush updates immediately:
```typescript:src/hooks/useSocket.ts
import { flushSync } from 'react-dom';

socket.on('streamResponse', (message: Message) => {
  flushSync(() => {
    setMessages((prev) => {
      const index = prev.findIndex((m) => m.id === message.id);
      if (index === -1) {
        // New message, add to array
        return [...prev, message];
      }
      // Existing message, update in place
      const updated = [...prev];
      updated[index] = message;
      return updated;
    });
  });
});
```
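Without `flushSync`, tokens that arrive close together are rendered as a group, so the text lands in bursts roughly every 50-100ms. With it, each token appears immediately. The trade-off is one synchronous render per event, so it is best reserved for hot paths like this one.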
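The updater passed to `setMessages` is a pure upsert, which makes it easy to pull out and test without React. Here is a minimal sketch; `upsertMessage` is a hypothetical name, and the only assumption about the message shape is that it has an `id` field, as in the handler above.

```typescript
// Pure upsert: returns a new array and never mutates `prev`.
function upsertMessage<T extends { id: string }>(prev: readonly T[], incoming: T): T[] {
  const index = prev.findIndex((m) => m.id === incoming.id);
  if (index === -1) {
    // Unknown id: append as a new message
    return [...prev, incoming];
  }
  // Known id: replace the existing entry in a copy of the array
  const updated = [...prev];
  updated[index] = incoming;
  return updated;
}
```

The socket handler then shrinks to `flushSync(() => setMessages((prev) => upsertMessage(prev, message)))`.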
## Auto-Scroll Behavior
Chat interfaces need smart scrolling. Auto-scroll when new messages arrive, but stop auto-scrolling if the user scrolls up to review history.
```typescript:src/components/ChatContainer.tsx
import { useEffect, useRef, useState } from 'react';

const containerRef = useRef<HTMLDivElement>(null);
const [userScrolled, setUserScrolled] = useState(false);
// ReturnType<typeof setTimeout> works with both browser and Node typings
const scrollTimeoutRef = useRef<ReturnType<typeof setTimeout> | undefined>(undefined);

// Detect manual scrolling
const handleScroll = () => {
  const container = containerRef.current;
  if (!container) return;

  const isNearBottom =
    container.scrollHeight - container.scrollTop - container.clientHeight < 100;

  if (!isNearBottom) {
    setUserScrolled(true);
    // Reset after 2 seconds of no scrolling
    clearTimeout(scrollTimeoutRef.current);
    scrollTimeoutRef.current = setTimeout(() => {
      setUserScrolled(false);
    }, 2000);
  } else {
    setUserScrolled(false);
  }
};

// Auto-scroll on new messages
useEffect(() => {
  if (userScrolled) return;
  requestAnimationFrame(() => {
    // Re-read the ref inside the frame; the component may have unmounted
    const container = containerRef.current;
    if (!container) return;
    container.scrollTo({
      top: container.scrollHeight,
      behavior: 'smooth',
    });
  });
}, [messages, userScrolled]);
```
Wrapping the scroll in `requestAnimationFrame` defers it to the next animation frame, so the scroll is issued at most once per frame and doesn't fight with React's rendering cycle.
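The "near the bottom" check is plain arithmetic on three scroll metrics, so it can live in a small pure function. This is a sketch under the same 100px threshold used above; `isNearBottom` as a standalone helper is hypothetical.

```typescript
// Hypothetical pure helper. Any object with these three fields works,
// including an HTMLDivElement.
function isNearBottom(
  metrics: { scrollHeight: number; scrollTop: number; clientHeight: number },
  threshold = 100,
): boolean {
  // Distance between the bottom edge of the viewport and the end of the content
  const distanceFromBottom =
    metrics.scrollHeight - metrics.scrollTop - metrics.clientHeight;
  return distanceFromBottom < threshold;
}
```

For example, with 2000px of content, a 500px viewport, and `scrollTop` at 1450, the distance is 50px, so the user still counts as being at the bottom and auto-scroll stays on.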
## Transaction Events
When the AI agent prepares a blockchain transaction, the transaction needs to be displayed in the chat and wait for user confirmation. This uses a different socket event:
```typescript:src/services/transaction.service.ts
// Backend emits when transaction is ready
emitToUser(userAddress, 'pendingTransaction', {
  id: transactionId,
  chatId,
  type: 'transfer',
  transaction: { to, value, data },
  details: { recipient, amount, token },
  buttonText: 'Send 0.5 ETH',
});
```
The frontend listens and attaches the transaction to the appropriate message:
```typescript:src/hooks/useSocket.ts
socket.on('pendingTransaction', (tx: PendingTransaction) => {
  setPendingTransaction(tx);

  // Retry logic for race conditions
  const attachToMessage = () => {
    setMessages((prev) => {
      // Find the most recent Lunark message
      const index = prev.map((m) => m.role).lastIndexOf('lunark');
      if (index === -1) return prev;

      // Copy the array and the message instead of mutating React state
      const updated = [...prev];
      updated[index] = { ...updated[index], pendingTransaction: tx };
      return updated;
    });
  };

  // The message might not exist yet, retry with increasing intervals
  const retryIntervals = [100, 300, 500, 1000, 2000];
  attachToMessage();
  retryIntervals.forEach((ms) => setTimeout(attachToMessage, ms));
});
```
The retry logic handles a race condition: the transaction event might arrive before the streaming message that references it.
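An alternative to firing every retry unconditionally is to have the attach step report whether it found a target, so the caller can stop retrying or keep the transaction queued. The sketch below assumes simplified `ChatMessage` and `PendingTx` shapes and a hypothetical `attachToLastLunarkMessage` helper; none of these come from Lunark's codebase.

```typescript
// Simplified shapes assumed for this sketch.
interface PendingTx {
  id: string;
}

interface ChatMessage {
  id: string;
  role: 'user' | 'lunark';
  pendingTransaction?: PendingTx;
}

// Attaches `tx` to the most recent Lunark message without mutating the input.
// `attached` tells the caller whether a target message existed yet.
function attachToLastLunarkMessage(
  messages: readonly ChatMessage[],
  tx: PendingTx,
): { messages: ChatMessage[]; attached: boolean } {
  for (let i = messages.length - 1; i >= 0; i--) {
    const candidate = messages[i];
    if (candidate && candidate.role === 'lunark') {
      const updated = [...messages];
      updated[i] = { ...candidate, pendingTransaction: tx };
      return { messages: updated, attached: true };
    }
  }
  return { messages: [...messages], attached: false };
}
```

When `attached` is `false`, the transaction can stay in state and be attached the next time a `streamResponse` event adds a Lunark message, which removes the dependence on timer intervals.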
## Network Switching
When the agent decides to switch networks (e.g., user asks to "check my balance on Polygon"), it emits a network switch event:
```typescript:src/services/network.service.ts
// Backend
emitToUser(userAddress, 'networkSwitch', {
  chainId: 137,
  name: 'Polygon',
  symbol: 'MATIC',
  rpcUrl: 'https://polygon-rpc.com',
});
```
The frontend uses the wallet's API to request the switch:
```typescript:src/hooks/useSocket.ts
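socket.on('networkSwitch', async ({ chainId, name }) => {
  try {
    await window.ethereum.request({
      method: 'wallet_switchEthereumChain',
      // Wallets expect the chain ID as a 0x-prefixed hex string
      params: [{ chainId: `0x${chainId.toString(16)}` }],
    });
    toast.success(`Switched to ${name}`);
  } catch (error) {
    if ((error as { code?: number }).code === 4902) {
      // Chain not added to wallet
      toast.error(`Please add ${name} to your wallet`);
    } else {
      // Any other failure, e.g. the user rejected the request (code 4001)
      toast.error(`Could not switch to ${name}`);
    }
  }
});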
```
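Two details in that handler are easy to get wrong: the chain ID must be a `0x`-prefixed hex string, and the error codes carry meaning. The sketch below isolates both. `toHexChainId` and `classifySwitchError` are hypothetical helper names; the codes themselves are the standard ones (4902 for an unrecognized chain, 4001 for a user rejection).

```typescript
// Convert a numeric chain ID to the 0x-prefixed hex string wallets expect.
function toHexChainId(chainId: number): string {
  return `0x${chainId.toString(16)}`;
}

type SwitchFailure = 'unknown-chain' | 'user-rejected' | 'other';

// Map a wallet error to a coarse category the UI can act on.
function classifySwitchError(error: unknown): SwitchFailure {
  const code =
    typeof error === 'object' && error !== null
      ? (error as { code?: unknown }).code
      : undefined;
  if (code === 4902) return 'unknown-chain'; // chain not added to the wallet
  if (code === 4001) return 'user-rejected'; // user dismissed the prompt
  return 'other';
}
```

For Polygon, `toHexChainId(137)` yields `'0x89'`.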
## Connection Management
Socket connections need careful lifecycle management. Connect when the user authenticates, disconnect on logout, handle reconnection gracefully:
```typescript:src/providers/SocketProvider.tsx
const SocketProvider = ({ children }: { children: React.ReactNode }) => {
  const { user, isAuthenticated } = useUser();
  const [socket, setSocket] = useState<Socket | null>(null);
  const [isConnected, setIsConnected] = useState(false);

  useEffect(() => {
    if (!isAuthenticated || !user?.address) {
      // The previous effect's cleanup has already disconnected any old socket
      setSocket(null);
      setIsConnected(false);
      return;
    }

    const newSocket = io(SOCKET_URL, {
      transports: ['websocket', 'polling'],
      reconnectionAttempts: 3,
      reconnectionDelay: 1000,
      timeout: 5000,
    });

    newSocket.on('connect', () => {
      setIsConnected(true);
      newSocket.emit('authenticate', { address: user.address });
    });

    newSocket.on('disconnect', () => {
      setIsConnected(false);
    });

    setSocket(newSocket);

    // Runs on logout, address change, and unmount
    return () => {
      newSocket.disconnect();
    };
  }, [isAuthenticated, user?.address]);

  return (
    <SocketContext.Provider value={{ socket, isConnected }}>
      {children}
    </SocketContext.Provider>
  );
};
```
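One reconnection detail worth handling: room membership lives on the server, so after a reconnect the new server-side socket starts in no rooms unless the client joins again. Because the client's `connect` event fires on the initial connection and on every reconnection, it is a natural place to re-send both `authenticate` and `joinChat`. The sketch below is an assumption about how that could look; `SocketLike` and `registerRejoin` are hypothetical, with `SocketLike` covering only the slice of the client API the sketch needs.

```typescript
// Minimal slice of the socket.io-client Socket API used by this sketch.
interface SocketLike {
  on(event: 'connect', handler: () => void): unknown;
  emit(event: string, payload: unknown): unknown;
}

// Re-send `authenticate` and `joinChat` on every (re)connect so the
// server-side rooms are restored.
function registerRejoin(
  socket: SocketLike,
  address: string,
  getChatId: () => string | null,
): void {
  socket.on('connect', () => {
    socket.emit('authenticate', { address });
    const chatId = getChatId();
    if (chatId !== null) {
      socket.emit('joinChat', { chatId });
    }
  });
}
```

Passing a getter for the chat ID, rather than the ID itself, lets the handler pick up whichever chat is open at the moment the reconnect happens.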
## Stream Abortion
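Users should be able to stop a streaming response. This requires coordination between frontend and backend. In the snippet below, the frontend emits `streamAbort` and updates its own state, while the backend handler only logs the request; actually cancelling generation is left as a stub: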
```typescript:src/hooks/useChat.ts
// Frontend
const stopStream = () => {
  socket.emit('streamAbort', { chatId: currentChatId });
  setIsStreaming(false);
};

// Backend
socket.on('streamAbort', ({ chatId }) => {
  console.log(`Stream abort requested for chat: ${chatId}`);
  // Handle stream abort logic here if needed
});
```
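One way the backend stub could be filled in is with an `AbortController` per active chat: the relay loop checks the signal between chunks, and the `streamAbort` handler triggers it. This is a sketch under that assumption, not Lunark's implementation; `activeStreams`, `beginStream`, `abortStream`, and `relay` are all hypothetical names.

```typescript
// Hypothetical registry: one AbortController per chat with an active stream.
const activeStreams = new Map<string, AbortController>();

// Call when a stream starts; returns the signal the relay loop should watch.
function beginStream(chatId: string): AbortSignal {
  // Cancel any stream still running for this chat before starting a new one
  activeStreams.get(chatId)?.abort();
  const controller = new AbortController();
  activeStreams.set(chatId, controller);
  return controller.signal;
}

// Call from the `streamAbort` handler; returns false if nothing was running.
function abortStream(chatId: string): boolean {
  const controller = activeStreams.get(chatId);
  if (!controller) return false;
  controller.abort();
  activeStreams.delete(chatId);
  return true;
}

// Relay loop that stops emitting once the signal is aborted.
async function relay(
  chunks: AsyncIterable<string>,
  signal: AbortSignal,
  emit: (fullResponse: string) => void,
): Promise<'completed' | 'aborted'> {
  let fullResponse = '';
  for await (const chunk of chunks) {
    // Returning here also closes the underlying async iterator
    if (signal.aborted) return 'aborted';
    fullResponse += chunk;
    emit(fullResponse);
  }
  return 'completed';
}
```

The handler then becomes `socket.on('streamAbort', ({ chatId }) => abortStream(chatId))`. Two caveats: a stream that finishes normally should remove its own entry from the map, and the check only takes effect when the next chunk arrives, so stopping the upstream LLM request itself would require passing the signal further down.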