feat(chat): reformat streaming output (#173)
This commit is contained in:
@@ -109,6 +109,24 @@ interface ChatState {
|
||||
// between tool-result finals and the next delta.
|
||||
let _lastChatEventAt = 0;
|
||||
|
||||
/** Normalize a timestamp to milliseconds. Handles both seconds and ms. */
|
||||
function toMs(ts: number): number {
|
||||
// Timestamps < 1e12 are in seconds (before ~2033); >= 1e12 are milliseconds
|
||||
return ts < 1e12 ? ts * 1000 : ts;
|
||||
}
|
||||
|
||||
// Timer for fallback history polling during active sends.
|
||||
// If no streaming events arrive within a few seconds, we periodically
|
||||
// poll chat.history to surface intermediate tool-call turns.
|
||||
let _historyPollTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
|
||||
function clearHistoryPoll(): void {
|
||||
if (_historyPollTimer) {
|
||||
clearTimeout(_historyPollTimer);
|
||||
_historyPollTimer = null;
|
||||
}
|
||||
}
|
||||
|
||||
const DEFAULT_CANONICAL_PREFIX = 'agent:main';
|
||||
const DEFAULT_SESSION_KEY = `${DEFAULT_CANONICAL_PREFIX}:main`;
|
||||
|
||||
@@ -1014,6 +1032,7 @@ export const useChatStore = create<ChatState>((set, get) => ({
|
||||
if (result.success && result.result) {
|
||||
const data = result.result;
|
||||
const rawMessages = Array.isArray(data.messages) ? data.messages as RawMessage[] : [];
|
||||
|
||||
// Before filtering: attach images/files from tool_result messages to the next assistant message
|
||||
const messagesWithToolImages = enrichWithToolResultFiles(rawMessages);
|
||||
const filteredMessages = messagesWithToolImages.filter((msg) => !isToolResultRole(msg.role));
|
||||
@@ -1037,15 +1056,36 @@ export const useChatStore = create<ChatState>((set, get) => ({
|
||||
});
|
||||
}
|
||||
});
|
||||
const { pendingFinal, lastUserMessageAt } = get();
|
||||
if (pendingFinal) {
|
||||
const { pendingFinal, lastUserMessageAt, sending: isSendingNow } = get();
|
||||
|
||||
// If we're sending but haven't received streaming events, check
|
||||
// whether the loaded history reveals intermediate tool-call activity.
|
||||
// This surfaces progress via the pendingFinal → ActivityIndicator path.
|
||||
const userMsTs = lastUserMessageAt ? toMs(lastUserMessageAt) : 0;
|
||||
const isAfterUserMsg = (msg: RawMessage): boolean => {
|
||||
if (!userMsTs || !msg.timestamp) return true;
|
||||
return toMs(msg.timestamp) >= userMsTs;
|
||||
};
|
||||
|
||||
if (isSendingNow && !pendingFinal) {
|
||||
const hasRecentAssistantActivity = [...filteredMessages].reverse().some((msg) => {
|
||||
if (msg.role !== 'assistant') return false;
|
||||
return isAfterUserMsg(msg);
|
||||
});
|
||||
if (hasRecentAssistantActivity) {
|
||||
set({ pendingFinal: true });
|
||||
}
|
||||
}
|
||||
|
||||
// If pendingFinal, check whether the AI produced a final text response.
|
||||
if (pendingFinal || get().pendingFinal) {
|
||||
const recentAssistant = [...filteredMessages].reverse().find((msg) => {
|
||||
if (msg.role !== 'assistant') return false;
|
||||
if (!hasNonToolAssistantContent(msg)) return false;
|
||||
if (lastUserMessageAt && msg.timestamp && msg.timestamp < lastUserMessageAt) return false;
|
||||
return true;
|
||||
return isAfterUserMsg(msg);
|
||||
});
|
||||
if (recentAssistant) {
|
||||
clearHistoryPoll();
|
||||
set({ sending: false, activeRunId: null, pendingFinal: false });
|
||||
}
|
||||
}
|
||||
@@ -1067,10 +1107,11 @@ export const useChatStore = create<ChatState>((set, get) => ({
|
||||
const { currentSessionKey } = get();
|
||||
|
||||
// Add user message optimistically (with local file metadata for UI display)
|
||||
const nowMs = Date.now();
|
||||
const userMsg: RawMessage = {
|
||||
role: 'user',
|
||||
content: trimmed || (attachments?.length ? '(file attached)' : ''),
|
||||
timestamp: Date.now() / 1000,
|
||||
timestamp: nowMs / 1000,
|
||||
id: crypto.randomUUID(),
|
||||
_attachedFiles: attachments?.map(a => ({
|
||||
fileName: a.fileName,
|
||||
@@ -1088,13 +1129,55 @@ export const useChatStore = create<ChatState>((set, get) => ({
|
||||
streamingMessage: null,
|
||||
streamingTools: [],
|
||||
pendingFinal: false,
|
||||
lastUserMessageAt: userMsg.timestamp ?? null,
|
||||
lastUserMessageAt: nowMs,
|
||||
}));
|
||||
|
||||
// Start the history poll and safety timeout IMMEDIATELY (before the
|
||||
// RPC await) because the gateway's chat.send RPC may block until the
|
||||
// entire agentic conversation finishes — the poll must run in parallel.
|
||||
_lastChatEventAt = Date.now();
|
||||
clearHistoryPoll();
|
||||
|
||||
const POLL_START_DELAY = 3_000;
|
||||
const POLL_INTERVAL = 4_000;
|
||||
const pollHistory = () => {
|
||||
const state = get();
|
||||
if (!state.sending) { clearHistoryPoll(); return; }
|
||||
if (state.streamingMessage) {
|
||||
_historyPollTimer = setTimeout(pollHistory, POLL_INTERVAL);
|
||||
return;
|
||||
}
|
||||
state.loadHistory(true);
|
||||
_historyPollTimer = setTimeout(pollHistory, POLL_INTERVAL);
|
||||
};
|
||||
_historyPollTimer = setTimeout(pollHistory, POLL_START_DELAY);
|
||||
|
||||
const SAFETY_TIMEOUT_MS = 90_000;
|
||||
const checkStuck = () => {
|
||||
const state = get();
|
||||
if (!state.sending) return;
|
||||
if (state.streamingMessage || state.streamingText) return;
|
||||
if (state.pendingFinal) {
|
||||
setTimeout(checkStuck, 10_000);
|
||||
return;
|
||||
}
|
||||
if (Date.now() - _lastChatEventAt < SAFETY_TIMEOUT_MS) {
|
||||
setTimeout(checkStuck, 10_000);
|
||||
return;
|
||||
}
|
||||
clearHistoryPoll();
|
||||
set({
|
||||
error: 'No response received from the model. The provider may be unavailable or the API key may have insufficient quota. Please check your provider settings.',
|
||||
sending: false,
|
||||
activeRunId: null,
|
||||
lastUserMessageAt: null,
|
||||
});
|
||||
};
|
||||
setTimeout(checkStuck, 30_000);
|
||||
|
||||
try {
|
||||
const idempotencyKey = crypto.randomUUID();
|
||||
const hasMedia = attachments && attachments.length > 0;
|
||||
console.log(`[sendMessage] hasMedia=${hasMedia}, attachmentCount=${attachments?.length ?? 0}`);
|
||||
if (hasMedia) {
|
||||
console.log('[sendMessage] Media paths:', attachments!.map(a => a.stagedPath));
|
||||
}
|
||||
@@ -1117,8 +1200,6 @@ export const useChatStore = create<ChatState>((set, get) => ({
|
||||
let result: { success: boolean; result?: { runId?: string }; error?: string };
|
||||
|
||||
if (hasMedia) {
|
||||
// Use dedicated chat:sendWithMedia handler — main process reads staged files
|
||||
// from disk and builds base64 attachments, avoiding large IPC transfers
|
||||
result = await window.electron.ipcRenderer.invoke(
|
||||
'chat:sendWithMedia',
|
||||
{
|
||||
@@ -1134,7 +1215,6 @@ export const useChatStore = create<ChatState>((set, get) => ({
|
||||
},
|
||||
) as { success: boolean; result?: { runId?: string }; error?: string };
|
||||
} else {
|
||||
// No media — use standard lightweight RPC
|
||||
result = await window.electron.ipcRenderer.invoke(
|
||||
'gateway:rpc',
|
||||
'chat.send',
|
||||
@@ -1147,50 +1227,16 @@ export const useChatStore = create<ChatState>((set, get) => ({
|
||||
) as { success: boolean; result?: { runId?: string }; error?: string };
|
||||
}
|
||||
|
||||
console.log(`[sendMessage] RPC result: success=${result.success}, error=${result.error || 'none'}, runId=${result.result?.runId || 'none'}`);
|
||||
console.log(`[sendMessage] RPC result: success=${result.success}, runId=${result.result?.runId || 'none'}`);
|
||||
|
||||
if (!result.success) {
|
||||
clearHistoryPoll();
|
||||
set({ error: result.error || 'Failed to send message', sending: false });
|
||||
} else if (result.result?.runId) {
|
||||
set({ activeRunId: result.result.runId });
|
||||
} else {
|
||||
// No runId from gateway; keep sending state and wait for events.
|
||||
}
|
||||
|
||||
// Safety timeout: if we're still in "sending" state without receiving
|
||||
// any chat event for SAFETY_TIMEOUT_MS, the run likely failed silently
|
||||
// (e.g. provider error not surfaced as a chat event). Surface the error
|
||||
// to the user instead of leaving an infinite spinner.
|
||||
//
|
||||
// The timeout is based on the last received event (not the original send
|
||||
// time) so that long-running tool-use conversations don't trigger false
|
||||
// positives — each received event resets the clock.
|
||||
if (result.success) {
|
||||
_lastChatEventAt = Date.now();
|
||||
const SAFETY_TIMEOUT_MS = 90_000;
|
||||
const checkStuck = () => {
|
||||
const state = get();
|
||||
if (!state.sending) return;
|
||||
if (state.streamingMessage || state.streamingText) return;
|
||||
// Still between tool cycles (pendingFinal) — the model is working
|
||||
if (state.pendingFinal) {
|
||||
setTimeout(checkStuck, 10_000);
|
||||
return;
|
||||
}
|
||||
if (Date.now() - _lastChatEventAt < SAFETY_TIMEOUT_MS) {
|
||||
setTimeout(checkStuck, 10_000);
|
||||
return;
|
||||
}
|
||||
set({
|
||||
error: 'No response received from the model. The provider may be unavailable or the API key may have insufficient quota. Please check your provider settings.',
|
||||
sending: false,
|
||||
activeRunId: null,
|
||||
lastUserMessageAt: null,
|
||||
});
|
||||
};
|
||||
setTimeout(checkStuck, 30_000);
|
||||
}
|
||||
} catch (err) {
|
||||
clearHistoryPoll();
|
||||
set({ error: String(err), sending: false });
|
||||
}
|
||||
},
|
||||
@@ -1198,6 +1244,7 @@ export const useChatStore = create<ChatState>((set, get) => ({
|
||||
// ── Abort active run ──
|
||||
|
||||
abortRun: async () => {
|
||||
clearHistoryPoll();
|
||||
const { currentSessionKey } = get();
|
||||
set({ sending: false, streamingText: '', streamingMessage: null, pendingFinal: false, lastUserMessageAt: null, pendingToolImages: [] });
|
||||
set({ streamingTools: [] });
|
||||
@@ -1226,21 +1273,27 @@ export const useChatStore = create<ChatState>((set, get) => ({
|
||||
_lastChatEventAt = Date.now();
|
||||
|
||||
// Defensive: if state is missing but we have a message, try to infer state.
|
||||
// This handles the case where the Gateway sends events without a state wrapper
|
||||
// (e.g., protocol events where payload is the raw message).
|
||||
let resolvedState = eventState;
|
||||
if (!resolvedState && event.message && typeof event.message === 'object') {
|
||||
const msg = event.message as Record<string, unknown>;
|
||||
const stopReason = msg.stopReason ?? msg.stop_reason;
|
||||
if (stopReason) {
|
||||
// Message has a stopReason → it's a final message
|
||||
resolvedState = 'final';
|
||||
} else if (msg.role || msg.content) {
|
||||
// Message has role/content but no stopReason → treat as delta (streaming)
|
||||
resolvedState = 'delta';
|
||||
}
|
||||
}
|
||||
|
||||
// Only pause the history poll when we receive actual streaming data.
|
||||
// The gateway sends "agent" events with { phase, startedAt } that carry
|
||||
// no message — these must NOT kill the poll, since the poll is our only
|
||||
// way to track progress when the gateway doesn't stream intermediate turns.
|
||||
const hasUsefulData = resolvedState === 'delta' || resolvedState === 'final'
|
||||
|| resolvedState === 'error' || resolvedState === 'aborted';
|
||||
if (hasUsefulData) {
|
||||
clearHistoryPoll();
|
||||
}
|
||||
|
||||
switch (resolvedState) {
|
||||
case 'delta': {
|
||||
// Streaming update - store the cumulative message
|
||||
@@ -1385,6 +1438,7 @@ export const useChatStore = create<ChatState>((set, get) => ({
|
||||
// After the final response, quietly reload history to surface all intermediate
|
||||
// tool-use turns (thinking + tool blocks) from the Gateway's authoritative record.
|
||||
if (hasOutput && !toolOnly) {
|
||||
clearHistoryPoll();
|
||||
void get().loadHistory(true);
|
||||
}
|
||||
} else {
|
||||
@@ -1395,6 +1449,7 @@ export const useChatStore = create<ChatState>((set, get) => ({
|
||||
break;
|
||||
}
|
||||
case 'error': {
|
||||
clearHistoryPoll();
|
||||
const errorMsg = String(event.errorMessage || 'An error occurred');
|
||||
set({
|
||||
error: errorMsg,
|
||||
@@ -1410,6 +1465,7 @@ export const useChatStore = create<ChatState>((set, get) => ({
|
||||
break;
|
||||
}
|
||||
case 'aborted': {
|
||||
clearHistoryPoll();
|
||||
set({
|
||||
sending: false,
|
||||
activeRunId: null,
|
||||
|
||||
Reference in New Issue
Block a user