refactor(chat): execution graph optimize (#873)
Co-authored-by: Haze <hazeone@users.noreply.github.com>
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
import { extractThinking, extractToolUse } from './message-utils';
|
||||
import { extractText, extractTextSegments, extractThinkingSegments, extractToolUse } from './message-utils';
|
||||
import type { RawMessage, ToolStatus } from '@/stores/chat';
|
||||
|
||||
export type TaskStepStatus = 'running' | 'completed' | 'error';
|
||||
@@ -7,21 +7,45 @@ export interface TaskStep {
|
||||
id: string;
|
||||
label: string;
|
||||
status: TaskStepStatus;
|
||||
kind: 'thinking' | 'tool' | 'system';
|
||||
kind: 'thinking' | 'tool' | 'system' | 'message';
|
||||
detail?: string;
|
||||
depth: number;
|
||||
parentId?: string;
|
||||
}
|
||||
|
||||
const MAX_TASK_STEPS = 8;
|
||||
/**
|
||||
* Detects the index of the "final reply" assistant message in a run segment.
|
||||
*
|
||||
* The reply is the last assistant message that carries non-empty text
|
||||
* content, regardless of whether it ALSO carries tool calls. (Mixed
|
||||
* `text + toolCall` replies are rare but real — the model can emit a parting
|
||||
* text block alongside a final tool call. Treating such a message as the
|
||||
* reply avoids mis-protecting an earlier narration as the "answer" and
|
||||
* leaking the actual last text into the fold.)
|
||||
*
|
||||
* When this returns a non-negative index, the caller should avoid folding
|
||||
* that message's text into the graph (it is the answer the user sees in the
|
||||
* chat stream). When the run is still active (streaming) the final reply is
|
||||
* produced via `streamingMessage` instead, so callers pass
|
||||
* `hasStreamingReply = true` to skip protection and let every assistant-with-
|
||||
* text message in history be folded into the graph as narration.
|
||||
*/
|
||||
export function findReplyMessageIndex(messages: RawMessage[], hasStreamingReply: boolean): number {
|
||||
if (hasStreamingReply) return -1;
|
||||
for (let idx = messages.length - 1; idx >= 0; idx -= 1) {
|
||||
const message = messages[idx];
|
||||
if (!message || message.role !== 'assistant') continue;
|
||||
if (extractText(message).trim().length === 0) continue;
|
||||
return idx;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
interface DeriveTaskStepsInput {
|
||||
messages: RawMessage[];
|
||||
streamingMessage: unknown | null;
|
||||
streamingTools: ToolStatus[];
|
||||
sending: boolean;
|
||||
pendingFinal: boolean;
|
||||
showThinking: boolean;
|
||||
omitLastStreamingMessageSegment?: boolean;
|
||||
}
|
||||
|
||||
export interface SubagentCompletionInfo {
|
||||
@@ -128,7 +152,7 @@ function attachTopology(steps: TaskStep[]): TaskStep[] {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (step.kind === 'thinking') {
|
||||
if (step.kind === 'thinking' || step.kind === 'message') {
|
||||
withTopology.push({
|
||||
...step,
|
||||
depth: activeBranchNodeId ? 3 : 1,
|
||||
@@ -157,13 +181,37 @@ function attachTopology(steps: TaskStep[]): TaskStep[] {
|
||||
return withTopology;
|
||||
}
|
||||
|
||||
function appendDetailSegments(
|
||||
segments: string[],
|
||||
options: {
|
||||
idPrefix: string;
|
||||
label: string;
|
||||
kind: Extract<TaskStep['kind'], 'thinking' | 'message'>;
|
||||
running: boolean;
|
||||
upsertStep: (step: TaskStep) => void;
|
||||
},
|
||||
): void {
|
||||
const normalizedSegments = segments
|
||||
.map((segment) => normalizeText(segment))
|
||||
.filter((segment): segment is string => !!segment);
|
||||
|
||||
normalizedSegments.forEach((detail, index) => {
|
||||
options.upsertStep({
|
||||
id: `${options.idPrefix}-${index}`,
|
||||
label: options.label,
|
||||
status: options.running && index === normalizedSegments.length - 1 ? 'running' : 'completed',
|
||||
kind: options.kind,
|
||||
detail,
|
||||
depth: 1,
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
export function deriveTaskSteps({
|
||||
messages,
|
||||
streamingMessage,
|
||||
streamingTools,
|
||||
sending,
|
||||
pendingFinal,
|
||||
showThinking,
|
||||
omitLastStreamingMessageSegment = false,
|
||||
}: DeriveTaskStepsInput): TaskStep[] {
|
||||
const steps: TaskStep[] = [];
|
||||
const stepIndexById = new Map<string, number>();
|
||||
@@ -187,30 +235,44 @@ export function deriveTaskSteps({
|
||||
? streamingMessage as RawMessage
|
||||
: null;
|
||||
|
||||
const relevantAssistantMessages = messages.filter((message) => {
|
||||
if (!message || message.role !== 'assistant') return false;
|
||||
if (extractToolUse(message).length > 0) return true;
|
||||
return showThinking && !!extractThinking(message);
|
||||
});
|
||||
// The final answer the user sees as a chat bubble. We avoid folding it into
|
||||
// the graph to prevent duplication. When a run is still streaming, the
|
||||
// reply lives in `streamingMessage`, so every pure-text assistant message in
|
||||
// `messages` is treated as intermediate narration.
|
||||
const replyIndex = findReplyMessageIndex(messages, streamMessage != null);
|
||||
|
||||
for (const [messageIndex, assistantMessage] of relevantAssistantMessages.entries()) {
|
||||
if (showThinking) {
|
||||
const thinking = extractThinking(assistantMessage);
|
||||
if (thinking) {
|
||||
upsertStep({
|
||||
id: `history-thinking-${assistantMessage.id || messageIndex}`,
|
||||
label: 'Thinking',
|
||||
status: 'completed',
|
||||
kind: 'thinking',
|
||||
detail: normalizeText(thinking),
|
||||
depth: 1,
|
||||
});
|
||||
}
|
||||
}
|
||||
for (const [messageIndex, message] of messages.entries()) {
|
||||
if (!message || message.role !== 'assistant') continue;
|
||||
|
||||
extractToolUse(assistantMessage).forEach((tool, index) => {
|
||||
appendDetailSegments(extractThinkingSegments(message), {
|
||||
idPrefix: `history-thinking-${message.id || messageIndex}`,
|
||||
label: 'Thinking',
|
||||
kind: 'thinking',
|
||||
running: false,
|
||||
upsertStep,
|
||||
});
|
||||
|
||||
const toolUses = extractToolUse(message);
|
||||
// Fold any intermediate assistant text into the graph as a narration
|
||||
// step — including text that lives on a mixed `text + toolCall` message.
|
||||
// The narration step is emitted BEFORE the tool steps so the graph
|
||||
// preserves the original ordering (the assistant "thinks out loud" and
|
||||
// then invokes the tool).
|
||||
const narrationSegments = extractTextSegments(message);
|
||||
const graphNarrationSegments = messageIndex === replyIndex
|
||||
? narrationSegments.slice(0, -1)
|
||||
: narrationSegments;
|
||||
appendDetailSegments(graphNarrationSegments, {
|
||||
idPrefix: `history-message-${message.id || messageIndex}`,
|
||||
label: 'Message',
|
||||
kind: 'message',
|
||||
running: false,
|
||||
upsertStep,
|
||||
});
|
||||
|
||||
toolUses.forEach((tool, index) => {
|
||||
upsertStep({
|
||||
id: tool.id || makeToolId(`history-tool-${assistantMessage.id || messageIndex}`, tool.name, index),
|
||||
id: tool.id || makeToolId(`history-tool-${message.id || messageIndex}`, tool.name, index),
|
||||
label: tool.name,
|
||||
status: 'completed',
|
||||
kind: 'tool',
|
||||
@@ -220,18 +282,29 @@ export function deriveTaskSteps({
|
||||
});
|
||||
}
|
||||
|
||||
if (streamMessage && showThinking) {
|
||||
const thinking = extractThinking(streamMessage);
|
||||
if (thinking) {
|
||||
upsertStep({
|
||||
id: 'stream-thinking',
|
||||
label: 'Thinking',
|
||||
status: 'running',
|
||||
kind: 'thinking',
|
||||
detail: normalizeText(thinking),
|
||||
depth: 1,
|
||||
});
|
||||
}
|
||||
if (streamMessage) {
|
||||
appendDetailSegments(extractThinkingSegments(streamMessage), {
|
||||
idPrefix: 'stream-thinking',
|
||||
label: 'Thinking',
|
||||
kind: 'thinking',
|
||||
running: true,
|
||||
upsertStep,
|
||||
});
|
||||
|
||||
// Stream-time narration should also appear in the execution graph so that
|
||||
// intermediate process output stays in P1 instead of leaking into the
|
||||
// assistant reply area.
|
||||
const streamNarrationSegments = extractTextSegments(streamMessage);
|
||||
const graphStreamNarrationSegments = omitLastStreamingMessageSegment
|
||||
? streamNarrationSegments.slice(0, -1)
|
||||
: streamNarrationSegments;
|
||||
appendDetailSegments(graphStreamNarrationSegments, {
|
||||
idPrefix: 'stream-message',
|
||||
label: 'Message',
|
||||
kind: 'message',
|
||||
running: !omitLastStreamingMessageSegment,
|
||||
upsertStep,
|
||||
});
|
||||
}
|
||||
|
||||
const activeToolIds = new Set<string>();
|
||||
@@ -267,28 +340,5 @@ export function deriveTaskSteps({
|
||||
});
|
||||
}
|
||||
|
||||
if (sending && pendingFinal) {
|
||||
upsertStep({
|
||||
id: 'system-finalizing',
|
||||
label: 'Finalizing answer',
|
||||
status: 'running',
|
||||
kind: 'system',
|
||||
detail: 'Waiting for the assistant to finish this run.',
|
||||
depth: 1,
|
||||
});
|
||||
} else if (sending && steps.length === 0) {
|
||||
upsertStep({
|
||||
id: 'system-preparing',
|
||||
label: 'Preparing run',
|
||||
status: 'running',
|
||||
kind: 'system',
|
||||
detail: 'Waiting for the first streaming update.',
|
||||
depth: 1,
|
||||
});
|
||||
}
|
||||
|
||||
const withTopology = attachTopology(steps);
|
||||
return withTopology.length > MAX_TASK_STEPS
|
||||
? withTopology.slice(-MAX_TASK_STEPS)
|
||||
: withTopology;
|
||||
return attachTopology(steps);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user