feat(chat, gateway): implement error recovery timer and enhance histo… (#179)
This commit is contained in:
@@ -120,6 +120,18 @@ function toMs(ts: number): number {
|
|||||||
// poll chat.history to surface intermediate tool-call turns.
|
// poll chat.history to surface intermediate tool-call turns.
|
||||||
let _historyPollTimer: ReturnType<typeof setTimeout> | null = null;
|
let _historyPollTimer: ReturnType<typeof setTimeout> | null = null;
|
||||||
|
|
||||||
|
// Timer for delayed error finalization. When the Gateway reports a mid-stream
|
||||||
|
// error (e.g. "terminated"), it may retry internally and recover. We wait
|
||||||
|
// before committing the error to give the recovery path a chance.
|
||||||
|
let _errorRecoveryTimer: ReturnType<typeof setTimeout> | null = null;
|
||||||
|
|
||||||
|
/**
 * Cancels the pending error-recovery timer, if one is scheduled, and resets
 * the module-level handle to `null` so later checks see that no timer is
 * pending. Calling it when no timer is set is a no-op.
 */
function clearErrorRecoveryTimer(): void {
|
||||||
|
// Only act when a timer handle is currently stored.
if (_errorRecoveryTimer) {
|
||||||
|
clearTimeout(_errorRecoveryTimer);
|
||||||
|
// Drop the stale handle so it is not cleared (or tested) again.
_errorRecoveryTimer = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
function clearHistoryPoll(): void {
|
function clearHistoryPoll(): void {
|
||||||
if (_historyPollTimer) {
|
if (_historyPollTimer) {
|
||||||
clearTimeout(_historyPollTimer);
|
clearTimeout(_historyPollTimer);
|
||||||
@@ -1137,6 +1149,7 @@ export const useChatStore = create<ChatState>((set, get) => ({
|
|||||||
// entire agentic conversation finishes — the poll must run in parallel.
|
// entire agentic conversation finishes — the poll must run in parallel.
|
||||||
_lastChatEventAt = Date.now();
|
_lastChatEventAt = Date.now();
|
||||||
clearHistoryPoll();
|
clearHistoryPoll();
|
||||||
|
clearErrorRecoveryTimer();
|
||||||
|
|
||||||
const POLL_START_DELAY = 3_000;
|
const POLL_START_DELAY = 3_000;
|
||||||
const POLL_INTERVAL = 4_000;
|
const POLL_INTERVAL = 4_000;
|
||||||
@@ -1245,6 +1258,7 @@ export const useChatStore = create<ChatState>((set, get) => ({
|
|||||||
|
|
||||||
abortRun: async () => {
|
abortRun: async () => {
|
||||||
clearHistoryPoll();
|
clearHistoryPoll();
|
||||||
|
clearErrorRecoveryTimer();
|
||||||
const { currentSessionKey } = get();
|
const { currentSessionKey } = get();
|
||||||
set({ sending: false, streamingText: '', streamingMessage: null, pendingFinal: false, lastUserMessageAt: null, pendingToolImages: [] });
|
set({ sending: false, streamingText: '', streamingMessage: null, pendingFinal: false, lastUserMessageAt: null, pendingToolImages: [] });
|
||||||
set({ streamingTools: [] });
|
set({ streamingTools: [] });
|
||||||
@@ -1296,7 +1310,13 @@ export const useChatStore = create<ChatState>((set, get) => ({
|
|||||||
|
|
||||||
switch (resolvedState) {
|
switch (resolvedState) {
|
||||||
case 'delta': {
|
case 'delta': {
|
||||||
// Streaming update - store the cumulative message
|
// If we're receiving new deltas, the Gateway has recovered from any
|
||||||
|
// prior error — cancel the error finalization timer and clear the
|
||||||
|
// stale error banner so the user sees the live stream again.
|
||||||
|
if (_errorRecoveryTimer) {
|
||||||
|
clearErrorRecoveryTimer();
|
||||||
|
set({ error: null });
|
||||||
|
}
|
||||||
const updates = collectToolUpdates(event.message, resolvedState);
|
const updates = collectToolUpdates(event.message, resolvedState);
|
||||||
set((s) => ({
|
set((s) => ({
|
||||||
streamingMessage: (() => {
|
streamingMessage: (() => {
|
||||||
@@ -1311,6 +1331,8 @@ export const useChatStore = create<ChatState>((set, get) => ({
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case 'final': {
|
case 'final': {
|
||||||
|
clearErrorRecoveryTimer();
|
||||||
|
if (get().error) set({ error: null });
|
||||||
// Message complete - add to history and clear streaming
|
// Message complete - add to history and clear streaming
|
||||||
const finalMsg = event.message as RawMessage | undefined;
|
const finalMsg = event.message as RawMessage | undefined;
|
||||||
if (finalMsg) {
|
if (finalMsg) {
|
||||||
@@ -1449,23 +1471,65 @@ export const useChatStore = create<ChatState>((set, get) => ({
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case 'error': {
|
case 'error': {
|
||||||
clearHistoryPoll();
|
|
||||||
const errorMsg = String(event.errorMessage || 'An error occurred');
|
const errorMsg = String(event.errorMessage || 'An error occurred');
|
||||||
|
const wasSending = get().sending;
|
||||||
|
|
||||||
|
// Snapshot the current streaming message into messages[] so partial
|
||||||
|
// content ("Let me get that written down...") is preserved in the UI
|
||||||
|
// rather than being silently discarded.
|
||||||
|
const currentStream = get().streamingMessage as RawMessage | null;
|
||||||
|
if (currentStream && (currentStream.role === 'assistant' || currentStream.role === undefined)) {
|
||||||
|
const snapId = (currentStream as RawMessage).id
|
||||||
|
|| `error-snap-${Date.now()}`;
|
||||||
|
const alreadyExists = get().messages.some(m => m.id === snapId);
|
||||||
|
if (!alreadyExists) {
|
||||||
|
set((s) => ({
|
||||||
|
messages: [...s.messages, { ...currentStream, role: 'assistant' as const, id: snapId }],
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
set({
|
set({
|
||||||
error: errorMsg,
|
error: errorMsg,
|
||||||
sending: false,
|
|
||||||
activeRunId: null,
|
|
||||||
streamingText: '',
|
streamingText: '',
|
||||||
streamingMessage: null,
|
streamingMessage: null,
|
||||||
streamingTools: [],
|
streamingTools: [],
|
||||||
pendingFinal: false,
|
pendingFinal: false,
|
||||||
lastUserMessageAt: null,
|
|
||||||
pendingToolImages: [],
|
pendingToolImages: [],
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Don't immediately give up: the Gateway often retries internally
|
||||||
|
// after transient API failures (e.g. "terminated"). Keep `sending`
|
||||||
|
// true for a grace period so that recovery events are processed and
|
||||||
|
// the agent-phase-completion handler can still trigger loadHistory.
|
||||||
|
if (wasSending) {
|
||||||
|
clearErrorRecoveryTimer();
|
||||||
|
const ERROR_RECOVERY_GRACE_MS = 15_000;
|
||||||
|
_errorRecoveryTimer = setTimeout(() => {
|
||||||
|
_errorRecoveryTimer = null;
|
||||||
|
const state = get();
|
||||||
|
if (state.sending && !state.streamingMessage) {
|
||||||
|
clearHistoryPoll();
|
||||||
|
// Grace period expired with no recovery — finalize the error
|
||||||
|
set({
|
||||||
|
sending: false,
|
||||||
|
activeRunId: null,
|
||||||
|
lastUserMessageAt: null,
|
||||||
|
});
|
||||||
|
// One final history reload in case the Gateway completed in the
|
||||||
|
// background and we just missed the event.
|
||||||
|
state.loadHistory(true);
|
||||||
|
}
|
||||||
|
}, ERROR_RECOVERY_GRACE_MS);
|
||||||
|
} else {
|
||||||
|
clearHistoryPoll();
|
||||||
|
set({ sending: false, activeRunId: null, lastUserMessageAt: null });
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case 'aborted': {
|
case 'aborted': {
|
||||||
clearHistoryPoll();
|
clearHistoryPoll();
|
||||||
|
clearErrorRecoveryTimer();
|
||||||
set({
|
set({
|
||||||
sending: false,
|
sending: false,
|
||||||
activeRunId: null,
|
activeRunId: null,
|
||||||
|
|||||||
@@ -99,8 +99,18 @@ export const useGatewayStore = create<GatewayState>((set, get) => ({
|
|||||||
import('./chat')
|
import('./chat')
|
||||||
.then(({ useChatStore }) => {
|
.then(({ useChatStore }) => {
|
||||||
const state = useChatStore.getState();
|
const state = useChatStore.getState();
|
||||||
|
// Always reload history on agent completion, regardless of
|
||||||
|
// the `sending` flag. After a transient error the flag may
|
||||||
|
// already be false, but the Gateway may have retried and
|
||||||
|
// completed successfully in the background.
|
||||||
|
state.loadHistory(true);
|
||||||
if (state.sending) {
|
if (state.sending) {
|
||||||
state.loadHistory(true);
|
useChatStore.setState({
|
||||||
|
sending: false,
|
||||||
|
activeRunId: null,
|
||||||
|
pendingFinal: false,
|
||||||
|
lastUserMessageAt: null,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.catch(() => {});
|
.catch(() => {});
|
||||||
|
|||||||
Reference in New Issue
Block a user