fix(chat): thinking execution graph (#880)
This commit is contained in:
@@ -8,6 +8,8 @@ interface ExecutionGraphCardProps {
|
||||
agentLabel: string;
|
||||
steps: TaskStep[];
|
||||
active: boolean;
|
||||
/** Hide the trailing "Thinking ..." indicator even when active. */
|
||||
suppressThinking?: boolean;
|
||||
/**
|
||||
* When provided, the card becomes fully controlled: the parent owns the
|
||||
* expand state (e.g. to persist across remounts) and toggling goes through
|
||||
@@ -149,6 +151,7 @@ export function ExecutionGraphCard({
|
||||
agentLabel,
|
||||
steps,
|
||||
active,
|
||||
suppressThinking = false,
|
||||
expanded: controlledExpanded,
|
||||
onExpandedChange,
|
||||
}: ExecutionGraphCardProps) {
|
||||
@@ -175,7 +178,7 @@ export function ExecutionGraphCard({
|
||||
|
||||
const toolCount = steps.filter((step) => step.kind === 'tool').length;
|
||||
const processCount = steps.length - toolCount;
|
||||
const shouldShowTrailingThinking = active;
|
||||
const shouldShowTrailingThinking = active && !suppressThinking;
|
||||
|
||||
if (!expanded) {
|
||||
return (
|
||||
|
||||
@@ -187,11 +187,25 @@ export function Chat() {
|
||||
|
||||
const isEmpty = messages.length === 0 && !sending;
|
||||
const subagentCompletionInfos = messages.map((message) => parseSubagentCompletionInfo(message));
|
||||
// Build an index of the *next* real user message after each position.
|
||||
// Gateway history may contain `role: 'user'` messages that are actually
|
||||
// tool-result wrappers (Anthropic API format). These must NOT split
|
||||
// the run into multiple segments — only genuine user-authored messages
|
||||
// should act as run boundaries.
|
||||
/**
 * Decide whether a message is a genuine user-authored message, i.e. one that
 * should act as a run boundary.
 *
 * Gateway history may contain `role: 'user'` messages that are really
 * tool-result wrappers (Anthropic API format). A message counts as such a
 * wrapper only when its content is a non-empty array in which every block
 * has `type === 'tool_result'`.
 *
 * @param msg - The history message to classify.
 * @returns `true` for a real user message; `false` for non-user roles and
 *   for tool-result wrappers.
 */
const isRealUserMessage = (msg: RawMessage): boolean => {
  if (msg.role !== 'user') return false;

  const content: unknown = msg.content;
  // Non-array content (e.g. a plain string) is treated as user-authored.
  if (!Array.isArray(content)) return true;

  const blocks: readonly unknown[] = content;
  // Narrow before reading `type`: a null/undefined entry must not throw,
  // and it is by definition not a tool_result block.
  const isToolResultBlock = (block: unknown): boolean =>
    typeof block === 'object'
    && block !== null
    && (block as { type?: unknown }).type === 'tool_result';

  // `every` is vacuously true for an empty array, so an empty content array
  // is explicitly kept as a real user message.
  return blocks.length === 0 || !blocks.every(isToolResultBlock);
};
|
||||
const nextUserMessageIndexes = new Array<number>(messages.length).fill(-1);
|
||||
let nextUserMessageIndex = -1;
|
||||
for (let idx = messages.length - 1; idx >= 0; idx -= 1) {
|
||||
nextUserMessageIndexes[idx] = nextUserMessageIndex;
|
||||
if (messages[idx].role === 'user' && !subagentCompletionInfos[idx]) {
|
||||
if (isRealUserMessage(messages[idx]) && !subagentCompletionInfos[idx]) {
|
||||
nextUserMessageIndex = idx;
|
||||
}
|
||||
}
|
||||
@@ -202,7 +216,7 @@ export function Chat() {
|
||||
const foldedNarrationIndices = new Set<number>();
|
||||
|
||||
const userRunCards: UserRunCard[] = messages.flatMap((message, idx) => {
|
||||
if (message.role !== 'user' || subagentCompletionInfos[idx]) return [];
|
||||
if (!isRealUserMessage(message) || subagentCompletionInfos[idx]) return [];
|
||||
|
||||
const runKey = message.id
|
||||
? `msg-${message.id}`
|
||||
@@ -213,7 +227,27 @@ export function Chat() {
|
||||
const completionInfos = subagentCompletionInfos
|
||||
.slice(idx + 1, segmentEnd)
|
||||
.filter((value): value is NonNullable<typeof value> => value != null);
|
||||
const isLatestOpenRun = nextUserIndex === -1 && (sending || pendingFinal || hasAnyStreamContent);
|
||||
// A run is considered "open" (still active) when it's the last segment
|
||||
// AND at least one of:
|
||||
// - sending/pendingFinal/streaming data (normal streaming path)
|
||||
// - segment has tool calls but no pure-text final reply yet (server-side
|
||||
// tool execution — Gateway fires phase "end" per tool round which
|
||||
// briefly clears sending, but the run is still in progress)
|
||||
const hasToolActivity = segmentMessages.some((m) =>
|
||||
m.role === 'assistant' && extractToolUse(m).length > 0,
|
||||
);
|
||||
const hasFinalReply = segmentMessages.some((m) => {
|
||||
if (m.role !== 'assistant') return false;
|
||||
if (extractText(m).trim().length === 0) return false;
|
||||
const content = m.content;
|
||||
if (!Array.isArray(content)) return true;
|
||||
return !(content as Array<{ type?: string }>).some(
|
||||
(b) => b.type === 'tool_use' || b.type === 'toolCall',
|
||||
);
|
||||
});
|
||||
const runStillExecutingTools = hasToolActivity && !hasFinalReply;
|
||||
const isLatestOpenRun = nextUserIndex === -1
|
||||
&& (sending || pendingFinal || hasAnyStreamContent || runStillExecutingTools);
|
||||
const replyIndexOffset = findReplyMessageIndex(segmentMessages, isLatestOpenRun);
|
||||
const replyIndex = replyIndexOffset === -1 ? null : idx + 1 + replyIndexOffset;
|
||||
|
||||
@@ -266,7 +300,9 @@ export function Chat() {
|
||||
// 2. `allToolsCompleted` — all entries in streamingTools are completed
|
||||
// 3. `hasCompletedToolPhase` — historical messages (loaded by the poll)
|
||||
// contain tool_use blocks, meaning the Gateway executed tools
|
||||
// server-side without sending streaming tool events to the client
|
||||
// server-side without sending streaming tool events to the client.
|
||||
// During intermediate narration (before reply), stripProcessMessagePrefix
|
||||
// will produce an empty trimmedReplyText, so the graph stays active.
|
||||
const allToolsCompleted = streamingTools.length > 0 && !hasRunningStreamToolStatus;
|
||||
const hasCompletedToolPhase = segmentMessages.some((msg) =>
|
||||
msg.role === 'assistant' && extractToolUse(msg).length > 0,
|
||||
@@ -309,6 +345,13 @@ export function Chat() {
|
||||
}
|
||||
const cached = graphStepCache[runKey];
|
||||
if (!cached) return [];
|
||||
// The cache was captured during streaming and may contain stream-
|
||||
// generated message steps that include accumulated narration + reply
|
||||
// text. Strip these out — historical message steps (from messages[])
|
||||
// will be properly recomputed on the next render with fresh data.
|
||||
const cleanedSteps = cached.steps.filter(
|
||||
(s) => !(s.kind === 'message' && s.id.startsWith('stream-message')),
|
||||
);
|
||||
return [{
|
||||
triggerIndex: idx,
|
||||
replyIndex: cached.replyIndex,
|
||||
@@ -316,8 +359,8 @@ export function Chat() {
|
||||
agentLabel: cached.agentLabel,
|
||||
sessionLabel: cached.sessionLabel,
|
||||
segmentEnd: nextUserIndex === -1 ? messages.length - 1 : nextUserIndex - 1,
|
||||
steps: cached.steps,
|
||||
messageStepTexts: getPrimaryMessageStepTexts(cached.steps),
|
||||
steps: cleanedSteps,
|
||||
messageStepTexts: getPrimaryMessageStepTexts(cleanedSteps),
|
||||
streamingReplyText: null,
|
||||
}];
|
||||
}
|
||||
@@ -345,10 +388,17 @@ export function Chat() {
|
||||
foldedNarrationIndices.add(idx + 1 + offset);
|
||||
}
|
||||
|
||||
// The graph should stay "active" (expanded, can show trailing thinking)
|
||||
// for the entire duration of the run — not just until a streaming reply
|
||||
// appears. Tying active to streamingReplyText caused a flicker: a brief
|
||||
// active→false→true transition collapsed the graph via ExecutionGraphCard's
|
||||
// uncontrolled path before the controlled `expanded` override could kick in.
|
||||
const cardActive = isLatestOpenRun;
|
||||
|
||||
return [{
|
||||
triggerIndex: idx,
|
||||
replyIndex,
|
||||
active: isLatestOpenRun && streamingReplyText == null,
|
||||
active: cardActive,
|
||||
agentLabel: segmentAgentLabel,
|
||||
sessionLabel: segmentSessionLabel,
|
||||
segmentEnd: nextUserIndex === -1 ? messages.length - 1 : nextUserIndex - 1,
|
||||
@@ -358,17 +408,20 @@ export function Chat() {
|
||||
}];
|
||||
});
|
||||
const hasActiveExecutionGraph = userRunCards.some((card) => card.active);
|
||||
const replyTextOverrides = new Map<number, string>();
|
||||
for (const card of userRunCards) {
|
||||
if (card.replyIndex == null) continue;
|
||||
const replyMessage = messages[card.replyIndex];
|
||||
if (!replyMessage || replyMessage.role !== 'assistant') continue;
|
||||
const fullReplyText = extractText(replyMessage);
|
||||
const trimmedReplyText = stripProcessMessagePrefix(fullReplyText, card.messageStepTexts);
|
||||
if (trimmedReplyText !== fullReplyText) {
|
||||
replyTextOverrides.set(card.replyIndex, trimmedReplyText);
|
||||
const replyTextOverrides = useMemo(() => {
|
||||
const map = new Map<number, string>();
|
||||
for (const card of userRunCards) {
|
||||
if (card.replyIndex == null) continue;
|
||||
const replyMessage = messages[card.replyIndex];
|
||||
if (!replyMessage || replyMessage.role !== 'assistant') continue;
|
||||
const fullReplyText = extractText(replyMessage);
|
||||
const trimmedReplyText = stripProcessMessagePrefix(fullReplyText, card.messageStepTexts);
|
||||
if (trimmedReplyText !== fullReplyText) {
|
||||
map.set(card.replyIndex, trimmedReplyText);
|
||||
}
|
||||
}
|
||||
}
|
||||
return map;
|
||||
}, [userRunCards, messages]);
|
||||
const streamingReplyText = userRunCards.find((card) => card.streamingReplyText != null)?.streamingReplyText ?? null;
|
||||
|
||||
// Derive the set of run keys that should be auto-collapsed (run finished
|
||||
@@ -378,12 +431,10 @@ export function Chat() {
|
||||
const autoCollapsedRunKeys = useMemo(() => {
|
||||
const keys = new Set<string>();
|
||||
for (const card of userRunCards) {
|
||||
// Auto-collapse once the reply is visible — either the streaming
|
||||
// reply bubble is already rendering (streamingReplyText != null)
|
||||
// or the run finished and we have a reply text override.
|
||||
const hasStreamingReply = card.streamingReplyText != null;
|
||||
const hasHistoricalReply = card.replyIndex != null && replyTextOverrides.has(card.replyIndex);
|
||||
const shouldCollapse = hasStreamingReply || hasHistoricalReply;
|
||||
// Auto-collapse once the run is complete and a final reply exists.
|
||||
// Don't collapse while the reply is still streaming.
|
||||
const isStillStreaming = card.streamingReplyText != null;
|
||||
const shouldCollapse = !isStillStreaming && !card.active && card.replyIndex != null;
|
||||
if (!shouldCollapse) continue;
|
||||
const triggerMsg = messages[card.triggerIndex];
|
||||
const runKey = triggerMsg?.id
|
||||
@@ -492,17 +543,22 @@ export function Chat() {
|
||||
? `msg-${triggerMsg.id}`
|
||||
: `${currentSessionKey}:trigger-${card.triggerIndex}`;
|
||||
const userOverride = graphExpandedOverrides[runKey];
|
||||
// Always use the controlled expanded prop instead of
|
||||
// relying on ExecutionGraphCard's uncontrolled state.
|
||||
// Uncontrolled state is lost on remount (key changes
|
||||
// when loadHistory replaces message ids), causing
|
||||
// spurious collapse. The controlled prop survives
|
||||
// remounts because it's computed fresh each render.
|
||||
const expanded = userOverride != null
|
||||
? userOverride
|
||||
: autoCollapsedRunKeys.has(runKey)
|
||||
? false
|
||||
: undefined;
|
||||
: !autoCollapsedRunKeys.has(runKey);
|
||||
return (
|
||||
<ExecutionGraphCard
|
||||
key={`graph-${runKey}`}
|
||||
key={`graph-${currentSessionKey}:${card.triggerIndex}`}
|
||||
agentLabel={card.agentLabel}
|
||||
steps={card.steps}
|
||||
active={card.active}
|
||||
suppressThinking={card.streamingReplyText != null}
|
||||
expanded={expanded}
|
||||
onExpandedChange={(next) =>
|
||||
setGraphExpandedOverrides((prev) => ({ ...prev, [runKey]: next }))
|
||||
@@ -514,21 +570,37 @@ export function Chat() {
|
||||
);
|
||||
})}
|
||||
|
||||
{/* Streaming message */}
|
||||
{shouldRenderStreaming && !hasActiveExecutionGraph && (
|
||||
{/* Streaming message — render when reply text is separated from graph,
|
||||
OR when there's streaming content without an active graph */}
|
||||
{shouldRenderStreaming && (streamingReplyText != null || !hasActiveExecutionGraph) && (
|
||||
<ChatMessage
|
||||
message={(streamMsg
|
||||
? {
|
||||
...(streamMsg as Record<string, unknown>),
|
||||
role: (typeof streamMsg.role === 'string' ? streamMsg.role : 'assistant') as RawMessage['role'],
|
||||
content: streamMsg.content ?? streamText,
|
||||
timestamp: streamMsg.timestamp ?? streamingTimestamp,
|
||||
}
|
||||
: {
|
||||
role: 'assistant',
|
||||
content: streamText,
|
||||
timestamp: streamingTimestamp,
|
||||
}) as RawMessage}
|
||||
message={(() => {
|
||||
const base = streamMsg
|
||||
? {
|
||||
...(streamMsg as Record<string, unknown>),
|
||||
role: (typeof streamMsg.role === 'string' ? streamMsg.role : 'assistant') as RawMessage['role'],
|
||||
content: streamMsg.content ?? streamText,
|
||||
timestamp: streamMsg.timestamp ?? streamingTimestamp,
|
||||
}
|
||||
: {
|
||||
role: 'assistant' as const,
|
||||
content: streamText,
|
||||
timestamp: streamingTimestamp,
|
||||
};
|
||||
// When the reply renders as a separate bubble, strip
|
||||
// thinking blocks from the message — they belong to
|
||||
// the execution phase and are already omitted from
|
||||
// the graph via omitLastStreamingMessageSegment.
|
||||
if (streamingReplyText != null && Array.isArray(base.content)) {
|
||||
return {
|
||||
...base,
|
||||
content: (base.content as Array<{ type?: string }>).filter(
|
||||
(block) => block.type !== 'thinking',
|
||||
),
|
||||
} as RawMessage;
|
||||
}
|
||||
return base as RawMessage;
|
||||
})()}
|
||||
textOverride={streamingReplyText ?? undefined}
|
||||
isStreaming
|
||||
streamingTools={streamingReplyText != null ? [] : streamingTools}
|
||||
@@ -575,7 +647,7 @@ export function Chat() {
|
||||
onSend={sendMessage}
|
||||
onStop={abortRun}
|
||||
disabled={!isGatewayRunning}
|
||||
sending={sending}
|
||||
sending={sending || hasActiveExecutionGraph}
|
||||
isEmpty={isEmpty}
|
||||
/>
|
||||
|
||||
|
||||
@@ -2,12 +2,10 @@ import { invokeIpc } from '@/lib/api-client';
|
||||
import { hostApiFetch } from '@/lib/host-api';
|
||||
import { useGatewayStore } from '@/stores/gateway';
|
||||
import {
|
||||
clearHistoryPoll,
|
||||
enrichWithCachedImages,
|
||||
enrichWithToolResultFiles,
|
||||
getLatestOptimisticUserMessage,
|
||||
getMessageText,
|
||||
hasNonToolAssistantContent,
|
||||
isInternalMessage,
|
||||
isToolResultRole,
|
||||
loadMissingPreviews,
|
||||
@@ -160,6 +158,18 @@ export function createHistoryActions(
|
||||
return toMs(msg.timestamp) >= userMsTs;
|
||||
};
|
||||
|
||||
// If we're sending but haven't received streaming events, check
|
||||
// whether the loaded history reveals assistant activity (tool calls,
|
||||
// narration, etc.). Setting pendingFinal surfaces the execution
|
||||
// graph / activity indicator in the UI.
|
||||
//
|
||||
// Note: we intentionally do NOT set sending=false here. Run
|
||||
// completion is exclusively signalled by the Gateway's phase
|
||||
// 'completed' event (handled in gateway.ts) or by receiving a
|
||||
// 'final' streaming event (handled in runtime-event-handlers.ts).
|
||||
// Attempting to infer completion from message history is fragile
|
||||
// and leads to premature sending=false during server-side tool
|
||||
// execution.
|
||||
if (isSendingNow && !pendingFinal) {
|
||||
const hasRecentAssistantActivity = [...filteredMessages].reverse().some((msg) => {
|
||||
if (msg.role !== 'assistant') return false;
|
||||
@@ -169,25 +179,6 @@ export function createHistoryActions(
|
||||
set({ pendingFinal: true });
|
||||
}
|
||||
}
|
||||
|
||||
// If pendingFinal, check whether the AI produced a final text response.
|
||||
// Only finalize when the candidate is the very last message in the
|
||||
// history — intermediate assistant messages (narration + tool_use) are
|
||||
// followed by tool-result messages and must NOT be treated as the
|
||||
// completed response, otherwise `pendingFinal` is cleared too early
|
||||
// and the streaming reply bubble never renders.
|
||||
if (pendingFinal || get().pendingFinal) {
|
||||
const recentAssistant = [...filteredMessages].reverse().find((msg) => {
|
||||
if (msg.role !== 'assistant') return false;
|
||||
if (!hasNonToolAssistantContent(msg)) return false;
|
||||
return isAfterUserMsg(msg);
|
||||
});
|
||||
const lastMsg = filteredMessages[filteredMessages.length - 1];
|
||||
if (recentAssistant && lastMsg === recentAssistant) {
|
||||
clearHistoryPoll();
|
||||
set({ sending: false, activeRunId: null, pendingFinal: false });
|
||||
}
|
||||
}
|
||||
return true;
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user