backup: before ruflo integration (plugins + multi-agent + hooks)

This commit is contained in:
admin
2026-05-06 09:00:31 +00:00
Unverified
parent 9615b0ba98
commit 321279b430
7 changed files with 691 additions and 154 deletions

View File

@@ -14,6 +14,9 @@ import { queueRequest, clearQueue, isProcessing } from './request-queue.js';
import { sendFormatted, splitMessage, escapeMarkdown, sendStreamingMessage, StreamConsumer, markdownToHtml } from './message-sender.js';
import { withSelfCorrection } from './self-correction.js';
import { getMemory, getConversation } from './memory.js';
import { createSessionState } from './session-state.js';
import { detectIntent } from './intent-detector.js';
import { streamChatWithRetry } from './stream-handler.js';
// ── Pidfile lock: prevent duplicate instances ──
const PIDFILE = path.join(process.env.HOME || '/tmp', '.zcode-bot.pid');
@@ -197,8 +200,11 @@ export async function initBot(config, api, tools, skills, agents) {
const conversation = getConversation();
await conversation.init();
// ── Session state: LRU file read cache + read-once dedup ──
const sessionState = createSessionState();
// ── Service registry ──
const svc = { config, api, tools: tools || [], skills: skills || [], agents: agents || [], rtk, memory, conversation,
const svc = { config, api, tools: tools || [], skills: skills || [], agents: agents || [], rtk, memory, conversation, sessionState,
toolMap: new Map((tools || []).map(t => [t.name, t])),
};
@@ -443,7 +449,7 @@ export async function initBot(config, api, tools, skills, agents) {
let response; // { content: string, tool_calls: array|null }
if (onDelta) {
response = await streamChat(svc, body, onDelta);
response = await streamChatWithRetry(svc, body, onDelta);
} else {
response = await nonStreamChat(body);
}
@@ -504,6 +510,14 @@ export async function initBot(config, api, tools, skills, agents) {
loopMessages.push({ role: 'tool', tool_call_id: tc.id, content: result });
continue;
}
// ── File access dedup: warn if AI re-reads same file ──
if (fn.name === 'file_read' && args?.file_path) {
const ghostCheck = sessionState.checkGhostChasing(args.file_path);
if (ghostCheck) {
logger.warn(`⚠ Ghost detected: ${ghostCheck.file} read ${ghostCheck.count}x this session`);
result = `⚠ WARNING: You have already read this file ${ghostCheck.count} times. Full content is cached. Stop re-reading and act on it.\n\n` + result;
}
}
logger.info(`${fn.name}(${fn.arguments?.slice(0, 100)})`);
result = String(await handler(args)).slice(0, TOOL_RESULT_MAX);
}
@@ -548,152 +562,6 @@ export async function initBot(config, api, tools, skills, agents) {
}
}
// ── Streaming API call (SSE) — returns { content, tool_calls, error } ──
// Streams tokens via onDelta. If tool_calls detected, accumulates them and returns.
// Self-cure: AbortController timeout + auto-retry on SSE errors.
//
// @param {object}   svc        service registry (reads svc.api.config.{baseUrl,apiKey})
// @param {object}   body       chat-completions request body ({ stream: true } is forced)
// @param {function} onDelta    called with each streamed text fragment
// @param {number}   retryCount internal recursion depth, capped at MAX_SSE_RETRIES
// @returns {Promise<{content: string, tool_calls: Array|null, error: null}>}
async function streamChat(svc, body, onDelta, retryCount = 0) {
  const baseUrl = svc.api?.config?.baseUrl || 'https://api.z.ai/api/coding/paas/v4';
  const apiKey = svc.api?.config?.apiKey || '';
  let fullContent = '';
  const toolCallMap = {}; // index → { id, name, arguments }
  const MAX_SSE_RETRIES = 4;
  const SSE_FETCH_TIMEOUT = 180_000; // 180s total request timeout
  const SSE_IDLE_TIMEOUT = 45_000; // 45s between chunks (no data = stuck)
  // Hoisted out of the try so the finally below can always clear the abort
  // timer — previously it was scoped inside the try, so on any thrown error
  // the 180s setTimeout leaked and kept the event loop alive.
  const controller = new AbortController();
  const fetchTimeout = setTimeout(() => controller.abort(), SSE_FETCH_TIMEOUT);
  try {
    const res = await fetch(`${baseUrl}/chat/completions`, {
      method: 'POST',
      headers: { 'Authorization': `Bearer ${apiKey}`, 'Content-Type': 'application/json' },
      body: JSON.stringify({ ...body, stream: true }),
      signal: controller.signal,
    });
    if (!res.ok) {
      const errText = await res.text();
      logger.error(`SSE ${res.status}: ${errText.slice(0, 200)}`);
      // Auto-retry on 5xx errors with linear backoff
      if (res.status >= 500 && retryCount < MAX_SSE_RETRIES) {
        const delay = 1000 * (retryCount + 1);
        logger.info(`🔄 SSE retry ${retryCount + 1}/${MAX_SSE_RETRIES} in ${delay}ms…`);
        await new Promise(r => setTimeout(r, delay));
        return await streamChat(svc, body, onDelta, retryCount + 1);
      }
      // Non-retryable (or retries exhausted): fall back to non-streaming
      return await nonStreamChat(body);
    }
    const reader = res.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';
    let lastChunkTime = Date.now();
    while (true) {
      // Idle timeout: if no data for SSE_IDLE_TIMEOUT (45s), abort and retry
      const idleMs = Date.now() - lastChunkTime;
      if (idleMs > SSE_IDLE_TIMEOUT) {
        logger.warn(`⏰ SSE idle timeout (${idleMs}ms), ${retryCount < MAX_SSE_RETRIES ? 'retrying' : 'falling back to non-stream'}`);
        reader.cancel().catch(() => {});
        if (retryCount < MAX_SSE_RETRIES) {
          return await streamChat(svc, body, onDelta, retryCount + 1);
        }
        return await nonStreamChat(body);
      }
      // Read with timeout. The race timer is cleared once the read settles —
      // previously it was never cleared, accumulating one live 45s timer per
      // streamed chunk for the lifetime of the stream.
      let readResult;
      let readTimer;
      try {
        readResult = await Promise.race([
          reader.read(),
          new Promise((_, reject) => {
            readTimer = setTimeout(() => reject(new Error('read timeout')), SSE_IDLE_TIMEOUT);
          }),
        ]);
      } catch (readErr) {
        logger.warn(`⏰ SSE read timeout, ${retryCount < MAX_SSE_RETRIES ? 'retrying' : 'falling back'}`);
        reader.cancel().catch(() => {});
        if (retryCount < MAX_SSE_RETRIES) {
          return await streamChat(svc, body, onDelta, retryCount + 1);
        }
        // Retries exhausted: return what we have so far
        break;
      } finally {
        clearTimeout(readTimer);
      }
      const { done, value } = readResult;
      if (done) break;
      lastChunkTime = Date.now();
      buffer += decoder.decode(value, { stream: true });
      // SSE frames are newline-delimited; keep any trailing partial line in the buffer
      const lines = buffer.split('\n');
      buffer = lines.pop() || '';
      for (const line of lines) {
        const trimmed = line.trim();
        if (!trimmed.startsWith('data: ')) continue;
        const data = trimmed.slice(6);
        if (data === '[DONE]') continue;
        try {
          const parsed = JSON.parse(data);
          const choice = parsed.choices?.[0];
          if (!choice) continue;
          const delta = choice.delta || {};
          // Stream text content
          if (delta.content) {
            fullContent += delta.content;
            onDelta(delta.content);
          }
          // Accumulate tool calls from stream deltas (name/arguments arrive in pieces)
          if (delta.tool_calls) {
            for (const tc of delta.tool_calls) {
              const idx = tc.index ?? 0;
              if (!toolCallMap[idx]) toolCallMap[idx] = { id: tc.id || '', name: '', arguments: '' };
              if (tc.id) toolCallMap[idx].id = tc.id;
              if (tc.function?.name) toolCallMap[idx].name += tc.function.name;
              if (tc.function?.arguments) toolCallMap[idx].arguments += tc.function.arguments;
            }
          }
        } catch { /* skip malformed chunks */ }
      }
    }
  } catch (e) {
    if (e.name === 'AbortError') {
      logger.warn(`⏰ SSE fetch aborted (timeout), retry ${retryCount}/${MAX_SSE_RETRIES}`);
      if (retryCount < MAX_SSE_RETRIES) {
        return await streamChat(svc, body, onDelta, retryCount + 1);
      }
    } else {
      logger.error('SSE error:', e.message);
    }
    if (!fullContent && !Object.keys(toolCallMap).length) {
      // Nothing received — retry, then try non-streaming fallback
      if (retryCount < MAX_SSE_RETRIES) {
        return await streamChat(svc, body, onDelta, retryCount + 1);
      }
      return await nonStreamChat(body);
    }
    // Partial content received: fall through and return it
  } finally {
    // Always release the 180s abort timer, whatever path we exit on
    clearTimeout(fetchTimeout);
  }
  // Build tool_calls array from accumulated deltas
  const toolCalls = Object.keys(toolCallMap).length > 0
    ? Object.values(toolCallMap).map(tc => ({
        id: tc.id,
        type: 'function',
        function: { name: tc.name, arguments: tc.arguments },
      }))
    : null;
  return { content: fullContent, tool_calls: toolCalls, error: null };
}
// ── Tool handlers: route API tool_calls to tool class methods ──
const toolHandlers = {
bash: async (args) => {
@@ -1182,6 +1050,17 @@ export async function initBot(config, api, tools, skills, agents) {
} catch (e) { logger.warn(`⌨️ initial typing error: ${e.message}`); }
try {
// ── Intent detection: bypass AI for simple messages ──
const intent = detectIntent(text);
if (intent && intent.bypassAI) {
logger.info(`🎯 Intent: ${intent.type} — bypassing AI`);
const reply = intent.response || 'Got it.';
await queueRequest(key, text, async () => {
await sendFormatted(ctx, reply);
});
return;
}
// ── Load conversation history for this chat ──
const chatKey = conversation._key(ctx.chat.id, ctx.message?.message_thread_id);
svc.currentChatId = ctx.chat.id; // Track for TTS auto-send