Optimize gateway comms reload behavior and strengthen regression coverage (#496)
This commit is contained in:
committed by
GitHub
Unverified
parent
08960d700f
commit
1dbe4a8466
@@ -140,6 +140,15 @@ let _historyPollTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
// error (e.g. "terminated"), it may retry internally and recover. We wait
|
||||
// before committing the error to give the recovery path a chance.
|
||||
let _errorRecoveryTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
let _loadSessionsInFlight: Promise<void> | null = null;
|
||||
let _lastLoadSessionsAt = 0;
|
||||
const _historyLoadInFlight = new Map<string, Promise<void>>();
|
||||
const _lastHistoryLoadAtBySession = new Map<string, number>();
|
||||
const SESSION_LOAD_MIN_INTERVAL_MS = 1_200;
|
||||
const HISTORY_LOAD_MIN_INTERVAL_MS = 800;
|
||||
const HISTORY_POLL_SILENCE_WINDOW_MS = 2_500;
|
||||
const CHAT_EVENT_DEDUPE_TTL_MS = 30_000;
|
||||
const _chatEventDedupe = new Map<string, number>();
|
||||
|
||||
function clearErrorRecoveryTimer(): void {
|
||||
if (_errorRecoveryTimer) {
|
||||
@@ -155,6 +164,46 @@ function clearHistoryPoll(): void {
|
||||
}
|
||||
}
|
||||
|
||||
function pruneChatEventDedupe(now: number): void {
|
||||
for (const [key, ts] of _chatEventDedupe.entries()) {
|
||||
if (now - ts > CHAT_EVENT_DEDUPE_TTL_MS) {
|
||||
_chatEventDedupe.delete(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function buildChatEventDedupeKey(eventState: string, event: Record<string, unknown>): string | null {
|
||||
const runId = event.runId != null ? String(event.runId) : '';
|
||||
const sessionKey = event.sessionKey != null ? String(event.sessionKey) : '';
|
||||
const seq = event.seq != null ? String(event.seq) : '';
|
||||
if (runId || sessionKey || seq || eventState) {
|
||||
return [runId, sessionKey, seq, eventState].join('|');
|
||||
}
|
||||
const msg = (event.message && typeof event.message === 'object')
|
||||
? event.message as Record<string, unknown>
|
||||
: null;
|
||||
if (msg) {
|
||||
const messageId = msg.id != null ? String(msg.id) : '';
|
||||
const stopReason = msg.stopReason ?? msg.stop_reason;
|
||||
if (messageId || stopReason) {
|
||||
return `msg|${messageId}|${String(stopReason ?? '')}|${eventState}`;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function isDuplicateChatEvent(eventState: string, event: Record<string, unknown>): boolean {
|
||||
const key = buildChatEventDedupeKey(eventState, event);
|
||||
if (!key) return false;
|
||||
const now = Date.now();
|
||||
pruneChatEventDedupe(now);
|
||||
if (_chatEventDedupe.has(key)) {
|
||||
return true;
|
||||
}
|
||||
_chatEventDedupe.set(key, now);
|
||||
return false;
|
||||
}
|
||||
|
||||
const DEFAULT_CANONICAL_PREFIX = 'agent:main';
|
||||
const DEFAULT_SESSION_KEY = `${DEFAULT_CANONICAL_PREFIX}:main`;
|
||||
|
||||
@@ -1040,118 +1089,139 @@ export const useChatStore = create<ChatState>((set, get) => ({
|
||||
// ── Load sessions via sessions.list ──
|
||||
|
||||
loadSessions: async () => {
|
||||
try {
|
||||
const data = await useGatewayStore.getState().rpc<Record<string, unknown>>('sessions.list', {});
|
||||
if (data) {
|
||||
const rawSessions = Array.isArray(data.sessions) ? data.sessions : [];
|
||||
const sessions: ChatSession[] = rawSessions.map((s: Record<string, unknown>) => ({
|
||||
key: String(s.key || ''),
|
||||
label: s.label ? String(s.label) : undefined,
|
||||
displayName: s.displayName ? String(s.displayName) : undefined,
|
||||
thinkingLevel: s.thinkingLevel ? String(s.thinkingLevel) : undefined,
|
||||
model: s.model ? String(s.model) : undefined,
|
||||
updatedAt: parseSessionUpdatedAtMs(s.updatedAt),
|
||||
})).filter((s: ChatSession) => s.key);
|
||||
const now = Date.now();
|
||||
if (_loadSessionsInFlight) {
|
||||
await _loadSessionsInFlight;
|
||||
return;
|
||||
}
|
||||
if (now - _lastLoadSessionsAt < SESSION_LOAD_MIN_INTERVAL_MS) {
|
||||
return;
|
||||
}
|
||||
|
||||
const canonicalBySuffix = new Map<string, string>();
|
||||
for (const session of sessions) {
|
||||
if (!session.key.startsWith('agent:')) continue;
|
||||
const parts = session.key.split(':');
|
||||
if (parts.length < 3) continue;
|
||||
const suffix = parts.slice(2).join(':');
|
||||
if (suffix && !canonicalBySuffix.has(suffix)) {
|
||||
canonicalBySuffix.set(suffix, session.key);
|
||||
_loadSessionsInFlight = (async () => {
|
||||
try {
|
||||
const data = await useGatewayStore.getState().rpc<Record<string, unknown>>('sessions.list', {});
|
||||
if (data) {
|
||||
const rawSessions = Array.isArray(data.sessions) ? data.sessions : [];
|
||||
const sessions: ChatSession[] = rawSessions.map((s: Record<string, unknown>) => ({
|
||||
key: String(s.key || ''),
|
||||
label: s.label ? String(s.label) : undefined,
|
||||
displayName: s.displayName ? String(s.displayName) : undefined,
|
||||
thinkingLevel: s.thinkingLevel ? String(s.thinkingLevel) : undefined,
|
||||
model: s.model ? String(s.model) : undefined,
|
||||
updatedAt: parseSessionUpdatedAtMs(s.updatedAt),
|
||||
})).filter((s: ChatSession) => s.key);
|
||||
|
||||
const canonicalBySuffix = new Map<string, string>();
|
||||
for (const session of sessions) {
|
||||
if (!session.key.startsWith('agent:')) continue;
|
||||
const parts = session.key.split(':');
|
||||
if (parts.length < 3) continue;
|
||||
const suffix = parts.slice(2).join(':');
|
||||
if (suffix && !canonicalBySuffix.has(suffix)) {
|
||||
canonicalBySuffix.set(suffix, session.key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Deduplicate: if both short and canonical existed, keep canonical only
|
||||
const seen = new Set<string>();
|
||||
const dedupedSessions = sessions.filter((s) => {
|
||||
if (!s.key.startsWith('agent:') && canonicalBySuffix.has(s.key)) return false;
|
||||
if (seen.has(s.key)) return false;
|
||||
seen.add(s.key);
|
||||
return true;
|
||||
});
|
||||
// Deduplicate: if both short and canonical existed, keep canonical only
|
||||
const seen = new Set<string>();
|
||||
const dedupedSessions = sessions.filter((s) => {
|
||||
if (!s.key.startsWith('agent:') && canonicalBySuffix.has(s.key)) return false;
|
||||
if (seen.has(s.key)) return false;
|
||||
seen.add(s.key);
|
||||
return true;
|
||||
});
|
||||
|
||||
const { currentSessionKey, sessions: localSessions } = get();
|
||||
let nextSessionKey = currentSessionKey || DEFAULT_SESSION_KEY;
|
||||
if (!nextSessionKey.startsWith('agent:')) {
|
||||
const canonicalMatch = canonicalBySuffix.get(nextSessionKey);
|
||||
if (canonicalMatch) {
|
||||
nextSessionKey = canonicalMatch;
|
||||
const { currentSessionKey, sessions: localSessions } = get();
|
||||
let nextSessionKey = currentSessionKey || DEFAULT_SESSION_KEY;
|
||||
if (!nextSessionKey.startsWith('agent:')) {
|
||||
const canonicalMatch = canonicalBySuffix.get(nextSessionKey);
|
||||
if (canonicalMatch) {
|
||||
nextSessionKey = canonicalMatch;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!dedupedSessions.find((s) => s.key === nextSessionKey) && dedupedSessions.length > 0) {
|
||||
// Preserve only locally-created pending sessions. On initial boot the
|
||||
// default ghost key (`agent:main:main`) should yield to real history.
|
||||
const hasLocalPendingSession = localSessions.some((session) => session.key === nextSessionKey);
|
||||
if (!hasLocalPendingSession) {
|
||||
nextSessionKey = dedupedSessions[0].key;
|
||||
if (!dedupedSessions.find((s) => s.key === nextSessionKey) && dedupedSessions.length > 0) {
|
||||
// Preserve only locally-created pending sessions. On initial boot the
|
||||
// default ghost key (`agent:main:main`) should yield to real history.
|
||||
const hasLocalPendingSession = localSessions.some((session) => session.key === nextSessionKey);
|
||||
if (!hasLocalPendingSession) {
|
||||
nextSessionKey = dedupedSessions[0].key;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const sessionsWithCurrent = !dedupedSessions.find((s) => s.key === nextSessionKey) && nextSessionKey
|
||||
? [
|
||||
...dedupedSessions,
|
||||
{ key: nextSessionKey, displayName: nextSessionKey },
|
||||
]
|
||||
: dedupedSessions;
|
||||
const sessionsWithCurrent = !dedupedSessions.find((s) => s.key === nextSessionKey) && nextSessionKey
|
||||
? [
|
||||
...dedupedSessions,
|
||||
{ key: nextSessionKey, displayName: nextSessionKey },
|
||||
]
|
||||
: dedupedSessions;
|
||||
|
||||
const discoveredActivity = Object.fromEntries(
|
||||
sessionsWithCurrent
|
||||
.filter((session) => typeof session.updatedAt === 'number' && Number.isFinite(session.updatedAt))
|
||||
.map((session) => [session.key, session.updatedAt!]),
|
||||
);
|
||||
|
||||
set((state) => ({
|
||||
sessions: sessionsWithCurrent,
|
||||
currentSessionKey: nextSessionKey,
|
||||
currentAgentId: getAgentIdFromSessionKey(nextSessionKey),
|
||||
sessionLastActivity: {
|
||||
...state.sessionLastActivity,
|
||||
...discoveredActivity,
|
||||
},
|
||||
}));
|
||||
|
||||
if (currentSessionKey !== nextSessionKey) {
|
||||
get().loadHistory();
|
||||
}
|
||||
|
||||
// Background: fetch first user message for every non-main session to populate labels upfront.
|
||||
// Uses a small limit so it's cheap; runs in parallel and doesn't block anything.
|
||||
const sessionsToLabel = sessionsWithCurrent.filter((s) => !s.key.endsWith(':main'));
|
||||
if (sessionsToLabel.length > 0) {
|
||||
void Promise.all(
|
||||
sessionsToLabel.map(async (session) => {
|
||||
try {
|
||||
const r = await useGatewayStore.getState().rpc<Record<string, unknown>>(
|
||||
'chat.history',
|
||||
{ sessionKey: session.key, limit: 1000 },
|
||||
);
|
||||
const msgs = Array.isArray(r.messages) ? r.messages as RawMessage[] : [];
|
||||
const firstUser = msgs.find((m) => m.role === 'user');
|
||||
const lastMsg = msgs[msgs.length - 1];
|
||||
set((s) => {
|
||||
const next: Partial<typeof s> = {};
|
||||
if (firstUser) {
|
||||
const labelText = getMessageText(firstUser.content).trim();
|
||||
if (labelText) {
|
||||
const truncated = labelText.length > 50 ? `${labelText.slice(0, 50)}…` : labelText;
|
||||
next.sessionLabels = { ...s.sessionLabels, [session.key]: truncated };
|
||||
}
|
||||
}
|
||||
if (lastMsg?.timestamp) {
|
||||
next.sessionLastActivity = { ...s.sessionLastActivity, [session.key]: toMs(lastMsg.timestamp) };
|
||||
}
|
||||
return next;
|
||||
});
|
||||
} catch { /* ignore per-session errors */ }
|
||||
}),
|
||||
const discoveredActivity = Object.fromEntries(
|
||||
sessionsWithCurrent
|
||||
.filter((session) => typeof session.updatedAt === 'number' && Number.isFinite(session.updatedAt))
|
||||
.map((session) => [session.key, session.updatedAt!]),
|
||||
);
|
||||
|
||||
set((state) => ({
|
||||
sessions: sessionsWithCurrent,
|
||||
currentSessionKey: nextSessionKey,
|
||||
currentAgentId: getAgentIdFromSessionKey(nextSessionKey),
|
||||
sessionLastActivity: {
|
||||
...state.sessionLastActivity,
|
||||
...discoveredActivity,
|
||||
},
|
||||
}));
|
||||
|
||||
if (currentSessionKey !== nextSessionKey) {
|
||||
void get().loadHistory();
|
||||
}
|
||||
|
||||
// Background: fetch first user message for every non-main session to populate labels upfront.
|
||||
// Uses a small limit so it's cheap; runs in parallel and doesn't block anything.
|
||||
const sessionsToLabel = sessionsWithCurrent.filter((s) => !s.key.endsWith(':main'));
|
||||
if (sessionsToLabel.length > 0) {
|
||||
void Promise.all(
|
||||
sessionsToLabel.map(async (session) => {
|
||||
try {
|
||||
const r = await useGatewayStore.getState().rpc<Record<string, unknown>>(
|
||||
'chat.history',
|
||||
{ sessionKey: session.key, limit: 1000 },
|
||||
);
|
||||
const msgs = Array.isArray(r.messages) ? r.messages as RawMessage[] : [];
|
||||
const firstUser = msgs.find((m) => m.role === 'user');
|
||||
const lastMsg = msgs[msgs.length - 1];
|
||||
set((s) => {
|
||||
const next: Partial<typeof s> = {};
|
||||
if (firstUser) {
|
||||
const labelText = getMessageText(firstUser.content).trim();
|
||||
if (labelText) {
|
||||
const truncated = labelText.length > 50 ? `${labelText.slice(0, 50)}…` : labelText;
|
||||
next.sessionLabels = { ...s.sessionLabels, [session.key]: truncated };
|
||||
}
|
||||
}
|
||||
if (lastMsg?.timestamp) {
|
||||
next.sessionLastActivity = { ...s.sessionLastActivity, [session.key]: toMs(lastMsg.timestamp) };
|
||||
}
|
||||
return next;
|
||||
});
|
||||
} catch {
|
||||
// ignore per-session errors
|
||||
}
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
console.warn('Failed to load sessions:', err);
|
||||
} finally {
|
||||
_lastLoadSessionsAt = Date.now();
|
||||
}
|
||||
} catch (err) {
|
||||
console.warn('Failed to load sessions:', err);
|
||||
})();
|
||||
|
||||
try {
|
||||
await _loadSessionsInFlight;
|
||||
} finally {
|
||||
_loadSessionsInFlight = null;
|
||||
}
|
||||
},
|
||||
|
||||
@@ -1289,9 +1359,21 @@ export const useChatStore = create<ChatState>((set, get) => ({
|
||||
|
||||
loadHistory: async (quiet = false) => {
|
||||
const { currentSessionKey } = get();
|
||||
const existingLoad = _historyLoadInFlight.get(currentSessionKey);
|
||||
if (existingLoad) {
|
||||
await existingLoad;
|
||||
return;
|
||||
}
|
||||
|
||||
const lastLoadAt = _lastHistoryLoadAtBySession.get(currentSessionKey) || 0;
|
||||
if (quiet && Date.now() - lastLoadAt < HISTORY_LOAD_MIN_INTERVAL_MS) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!quiet) set({ loading: true, error: null });
|
||||
|
||||
const applyLoadedMessages = (rawMessages: RawMessage[], thinkingLevel: string | null) => {
|
||||
const loadPromise = (async () => {
|
||||
const applyLoadedMessages = (rawMessages: RawMessage[], thinkingLevel: string | null) => {
|
||||
// Before filtering: attach images/files from tool_result messages to the next assistant message
|
||||
const messagesWithToolImages = enrichWithToolResultFiles(rawMessages);
|
||||
const filteredMessages = messagesWithToolImages.filter((msg) => !isToolResultRole(msg.role));
|
||||
@@ -1395,22 +1477,31 @@ export const useChatStore = create<ChatState>((set, get) => ({
|
||||
set({ sending: false, activeRunId: null, pendingFinal: false });
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
try {
|
||||
const data = await useGatewayStore.getState().rpc<Record<string, unknown>>(
|
||||
'chat.history',
|
||||
{ sessionKey: currentSessionKey, limit: 200 },
|
||||
);
|
||||
if (data) {
|
||||
let rawMessages = Array.isArray(data.messages) ? data.messages as RawMessage[] : [];
|
||||
const thinkingLevel = data.thinkingLevel ? String(data.thinkingLevel) : null;
|
||||
if (rawMessages.length === 0 && isCronSessionKey(currentSessionKey)) {
|
||||
rawMessages = await loadCronFallbackMessages(currentSessionKey, 200);
|
||||
try {
|
||||
const data = await useGatewayStore.getState().rpc<Record<string, unknown>>(
|
||||
'chat.history',
|
||||
{ sessionKey: currentSessionKey, limit: 200 },
|
||||
);
|
||||
if (data) {
|
||||
let rawMessages = Array.isArray(data.messages) ? data.messages as RawMessage[] : [];
|
||||
const thinkingLevel = data.thinkingLevel ? String(data.thinkingLevel) : null;
|
||||
if (rawMessages.length === 0 && isCronSessionKey(currentSessionKey)) {
|
||||
rawMessages = await loadCronFallbackMessages(currentSessionKey, 200);
|
||||
}
|
||||
|
||||
applyLoadedMessages(rawMessages, thinkingLevel);
|
||||
} else {
|
||||
const fallbackMessages = await loadCronFallbackMessages(currentSessionKey, 200);
|
||||
if (fallbackMessages.length > 0) {
|
||||
applyLoadedMessages(fallbackMessages, null);
|
||||
} else {
|
||||
set({ messages: [], loading: false });
|
||||
}
|
||||
}
|
||||
|
||||
applyLoadedMessages(rawMessages, thinkingLevel);
|
||||
} else {
|
||||
} catch (err) {
|
||||
console.warn('Failed to load chat history:', err);
|
||||
const fallbackMessages = await loadCronFallbackMessages(currentSessionKey, 200);
|
||||
if (fallbackMessages.length > 0) {
|
||||
applyLoadedMessages(fallbackMessages, null);
|
||||
@@ -1418,13 +1509,16 @@ export const useChatStore = create<ChatState>((set, get) => ({
|
||||
set({ messages: [], loading: false });
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
console.warn('Failed to load chat history:', err);
|
||||
const fallbackMessages = await loadCronFallbackMessages(currentSessionKey, 200);
|
||||
if (fallbackMessages.length > 0) {
|
||||
applyLoadedMessages(fallbackMessages, null);
|
||||
} else {
|
||||
set({ messages: [], loading: false });
|
||||
})();
|
||||
|
||||
_historyLoadInFlight.set(currentSessionKey, loadPromise);
|
||||
try {
|
||||
await loadPromise;
|
||||
} finally {
|
||||
_lastHistoryLoadAtBySession.set(currentSessionKey, Date.now());
|
||||
const active = _historyLoadInFlight.get(currentSessionKey);
|
||||
if (active === loadPromise) {
|
||||
_historyLoadInFlight.delete(currentSessionKey);
|
||||
}
|
||||
}
|
||||
},
|
||||
@@ -1501,6 +1595,10 @@ export const useChatStore = create<ChatState>((set, get) => ({
|
||||
_historyPollTimer = setTimeout(pollHistory, POLL_INTERVAL);
|
||||
return;
|
||||
}
|
||||
if (Date.now() - _lastChatEventAt < HISTORY_POLL_SILENCE_WINDOW_MS) {
|
||||
_historyPollTimer = setTimeout(pollHistory, POLL_INTERVAL);
|
||||
return;
|
||||
}
|
||||
state.loadHistory(true);
|
||||
_historyPollTimer = setTimeout(pollHistory, POLL_INTERVAL);
|
||||
};
|
||||
@@ -1635,6 +1733,8 @@ export const useChatStore = create<ChatState>((set, get) => ({
|
||||
// Only process events for the active run (or if no active run set)
|
||||
if (activeRunId && runId && runId !== activeRunId) return;
|
||||
|
||||
if (isDuplicateChatEvent(eventState, event)) return;
|
||||
|
||||
_lastChatEventAt = Date.now();
|
||||
|
||||
// Defensive: if state is missing but we have a message, try to infer state.
|
||||
|
||||
Reference in New Issue
Block a user