feat: persistent typing indicator — refreshes every 4s until first stream token arrives
This commit is contained in:
@@ -467,23 +467,40 @@ export async function initBot(config, api, tools, skills, agents) {
|
||||
|
||||
// ── Streaming API call (SSE) — returns { content, tool_calls, error } ──
|
||||
// Streams tokens via onDelta. If tool_calls detected, accumulates them and returns.
|
||||
async function streamChat(svc, body, onDelta) {
|
||||
// Self-cure: AbortController timeout + auto-retry on SSE errors
|
||||
async function streamChat(svc, body, onDelta, retryCount = 0) {
|
||||
const baseUrl = svc.api?.config?.baseUrl || 'https://api.z.ai/api/coding/paas/v4';
|
||||
const apiKey = svc.api?.config?.apiKey || '';
|
||||
let fullContent = '';
|
||||
const toolCallMap = {}; // index → { id, name, arguments }
|
||||
let finishReason = null;
|
||||
const MAX_SSE_RETRIES = 2;
|
||||
const SSE_FETCH_TIMEOUT = 90_000; // 90s total request timeout
|
||||
const SSE_IDLE_TIMEOUT = 30_000; // 30s between chunks (no data = stuck)
|
||||
|
||||
try {
|
||||
const controller = new AbortController();
|
||||
const fetchTimeout = setTimeout(() => controller.abort(), SSE_FETCH_TIMEOUT);
|
||||
|
||||
const res = await fetch(`${baseUrl}/chat/completions`, {
|
||||
method: 'POST',
|
||||
headers: { 'Authorization': `Bearer ${apiKey}`, 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ ...body, stream: true }),
|
||||
signal: controller.signal,
|
||||
});
|
||||
|
||||
if (!res.ok) {
|
||||
clearTimeout(fetchTimeout);
|
||||
const errText = await res.text();
|
||||
logger.error(`SSE ${res.status}: ${errText.slice(0, 200)}`);
|
||||
|
||||
// Auto-retry on 5xx errors
|
||||
if (res.status >= 500 && retryCount < MAX_SSE_RETRIES) {
|
||||
const delay = 1000 * (retryCount + 1);
|
||||
logger.info(`🔄 SSE retry ${retryCount + 1}/${MAX_SSE_RETRIES} in ${delay}ms…`);
|
||||
await new Promise(r => setTimeout(r, delay));
|
||||
return await streamChat(svc, body, onDelta, retryCount + 1);
|
||||
}
|
||||
// Fallback to non-streaming
|
||||
return await nonStreamChat(body);
|
||||
}
|
||||
@@ -491,10 +508,42 @@ export async function initBot(config, api, tools, skills, agents) {
|
||||
const reader = res.body.getReader();
|
||||
const decoder = new TextDecoder();
|
||||
let buffer = '';
|
||||
let lastChunkTime = Date.now();
|
||||
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
// Idle timeout: if no data for 30s, abort and retry
|
||||
const idleMs = Date.now() - lastChunkTime;
|
||||
if (idleMs > SSE_IDLE_TIMEOUT) {
|
||||
logger.warn(`⏰ SSE idle timeout (${idleMs}ms), ${retryCount < MAX_SSE_RETRIES ? 'retrying' : 'falling back to non-stream'}`);
|
||||
reader.cancel().catch(() => {});
|
||||
clearTimeout(fetchTimeout);
|
||||
if (retryCount < MAX_SSE_RETRIES) {
|
||||
return await streamChat(svc, body, onDelta, retryCount + 1);
|
||||
}
|
||||
return await nonStreamChat(body);
|
||||
}
|
||||
|
||||
// Read with timeout
|
||||
let readResult;
|
||||
try {
|
||||
readResult = await Promise.race([
|
||||
reader.read(),
|
||||
new Promise((_, reject) => setTimeout(() => reject(new Error('read timeout')), SSE_IDLE_TIMEOUT)),
|
||||
]);
|
||||
} catch (readErr) {
|
||||
logger.warn(`⏰ SSE read timeout, ${retryCount < MAX_SSE_RETRIES ? 'retrying' : 'falling back'}`);
|
||||
reader.cancel().catch(() => {});
|
||||
clearTimeout(fetchTimeout);
|
||||
if (retryCount < MAX_SSE_RETRIES) {
|
||||
return await streamChat(svc, body, onDelta, retryCount + 1);
|
||||
}
|
||||
// Return what we have so far
|
||||
break;
|
||||
}
|
||||
|
||||
const { done, value } = readResult;
|
||||
if (done) break;
|
||||
lastChunkTime = Date.now();
|
||||
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
const lines = buffer.split('\n');
|
||||
@@ -531,9 +580,21 @@ export async function initBot(config, api, tools, skills, agents) {
|
||||
} catch { /* skip malformed chunks */ }
|
||||
}
|
||||
}
|
||||
clearTimeout(fetchTimeout);
|
||||
} catch (e) {
|
||||
logger.error('SSE error:', e.message);
|
||||
if (e.name === 'AbortError') {
|
||||
logger.warn(`⏰ SSE fetch aborted (timeout), retry ${retryCount}/${MAX_SSE_RETRIES}`);
|
||||
if (retryCount < MAX_SSE_RETRIES) {
|
||||
return await streamChat(svc, body, onDelta, retryCount + 1);
|
||||
}
|
||||
} else {
|
||||
logger.error('SSE error:', e.message);
|
||||
}
|
||||
if (!fullContent && !Object.keys(toolCallMap).length) {
|
||||
// Nothing received — try non-streaming fallback
|
||||
if (retryCount < MAX_SSE_RETRIES) {
|
||||
return await streamChat(svc, body, onDelta, retryCount + 1);
|
||||
}
|
||||
return await nonStreamChat(body);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user