diff --git a/src/bot/index.js b/src/bot/index.js index bf526f4a..4ccfa60a 100644 --- a/src/bot/index.js +++ b/src/bot/index.js @@ -467,23 +467,40 @@ export async function initBot(config, api, tools, skills, agents) { // ── Streaming API call (SSE) — returns { content, tool_calls, error } ── // Streams tokens via onDelta. If tool_calls detected, accumulates them and returns. - async function streamChat(svc, body, onDelta) { + // Self-cure: AbortController timeout + auto-retry on SSE errors + async function streamChat(svc, body, onDelta, retryCount = 0) { const baseUrl = svc.api?.config?.baseUrl || 'https://api.z.ai/api/coding/paas/v4'; const apiKey = svc.api?.config?.apiKey || ''; let fullContent = ''; const toolCallMap = {}; // index → { id, name, arguments } let finishReason = null; + const MAX_SSE_RETRIES = 2; + const SSE_FETCH_TIMEOUT = 90_000; // 90s total request timeout + const SSE_IDLE_TIMEOUT = 30_000; // 30s between chunks (no data = stuck) try { + const controller = new AbortController(); + const fetchTimeout = setTimeout(() => controller.abort(), SSE_FETCH_TIMEOUT); + const res = await fetch(`${baseUrl}/chat/completions`, { method: 'POST', headers: { 'Authorization': `Bearer ${apiKey}`, 'Content-Type': 'application/json' }, body: JSON.stringify({ ...body, stream: true }), + signal: controller.signal, }); if (!res.ok) { + clearTimeout(fetchTimeout); const errText = await res.text(); logger.error(`SSE ${res.status}: ${errText.slice(0, 200)}`); + + // Auto-retry on 5xx errors + if (res.status >= 500 && retryCount < MAX_SSE_RETRIES) { + const delay = 1000 * (retryCount + 1); + logger.info(`🔄 SSE retry ${retryCount + 1}/${MAX_SSE_RETRIES} in ${delay}ms…`); + await new Promise(r => setTimeout(r, delay)); + return await streamChat(svc, body, onDelta, retryCount + 1); + } // Fallback to non-streaming return await nonStreamChat(body); } @@ -491,10 +508,42 @@ export async function initBot(config, api, tools, skills, agents) { const reader = res.body.getReader(); const decoder = new TextDecoder(); let buffer = ''; + let lastChunkTime = Date.now(); while (true) { - const { done, value } = await reader.read(); + // Idle timeout: if no data for 30s, abort and retry + const idleMs = Date.now() - lastChunkTime; + if (idleMs > SSE_IDLE_TIMEOUT) { + logger.warn(`⏰ SSE idle timeout (${idleMs}ms), ${retryCount < MAX_SSE_RETRIES ? 'retrying' : 'falling back to non-stream'}`); + reader.cancel().catch(() => {}); + clearTimeout(fetchTimeout); + if (retryCount < MAX_SSE_RETRIES) { + return await streamChat(svc, body, onDelta, retryCount + 1); + } + return await nonStreamChat(body); + } + + // Read with timeout + let readResult; + try { + readResult = await Promise.race([ + reader.read(), + new Promise((_, reject) => setTimeout(() => reject(new Error('read timeout')), SSE_IDLE_TIMEOUT)), + ]); + } catch (readErr) { + logger.warn(`⏰ SSE read timeout, ${retryCount < MAX_SSE_RETRIES ? 'retrying' : 'falling back'}`); + reader.cancel().catch(() => {}); + clearTimeout(fetchTimeout); + if (retryCount < MAX_SSE_RETRIES) { + return await streamChat(svc, body, onDelta, retryCount + 1); + } + // Return what we have so far + break; + } + + const { done, value } = readResult; if (done) break; + lastChunkTime = Date.now(); buffer += decoder.decode(value, { stream: true }); const lines = buffer.split('\n'); @@ -531,9 +580,21 @@ export async function initBot(config, api, tools, skills, agents) { } catch { /* skip malformed chunks */ } } } + clearTimeout(fetchTimeout); } catch (e) { - logger.error('SSE error:', e.message); + if (e.name === 'AbortError') { + logger.warn(`⏰ SSE fetch aborted (timeout), retry ${retryCount}/${MAX_SSE_RETRIES}`); + if (retryCount < MAX_SSE_RETRIES) { + return await streamChat(svc, body, onDelta, retryCount + 1); + } + } else { + logger.error('SSE error:', e.message); + } if (!fullContent && !Object.keys(toolCallMap).length) { + // Nothing received — try non-streaming fallback + if (retryCount < MAX_SSE_RETRIES) { + return await streamChat(svc, body, onDelta, retryCount + 1); + } return await nonStreamChat(body); } }