feat: persistent conversation history across sessions and restarts
- ConversationStore: per-chat JSON files in data/, survives restarts - 6000 token budget per chat context (fits ~20-30 exchanges) - Auto-trims old messages, always includes most recent - Wired into message handler: loads history before AI call, saves after - /reset command to clear chat history per chat - Cross-session, cross-model, cross-chat isolation
This commit is contained in:
@@ -11,7 +11,7 @@ import { isDuplicate, markProcessed } from './deduplication.js';
|
|||||||
import { queueRequest, clearQueue, isProcessing } from './request-queue.js';
|
import { queueRequest, clearQueue, isProcessing } from './request-queue.js';
|
||||||
import { sendFormatted, splitMessage, escapeMarkdown, sendStreamingMessage, StreamConsumer, markdownToHtml } from './message-sender.js';
|
import { sendFormatted, splitMessage, escapeMarkdown, sendStreamingMessage, StreamConsumer, markdownToHtml } from './message-sender.js';
|
||||||
import { withSelfCorrection } from './self-correction.js';
|
import { withSelfCorrection } from './self-correction.js';
|
||||||
import { getMemory } from './memory.js';
|
import { getMemory, getConversation } from './memory.js';
|
||||||
|
|
||||||
function buildSessionKey(chatId, threadId) {
|
function buildSessionKey(chatId, threadId) {
|
||||||
return threadId ? `${chatId}:${threadId}` : String(chatId);
|
return threadId ? `${chatId}:${threadId}` : String(chatId);
|
||||||
@@ -157,8 +157,12 @@ export async function initBot(config, api, tools, skills, agents) {
|
|||||||
const memory = getMemory();
|
const memory = getMemory();
|
||||||
await memory.init();
|
await memory.init();
|
||||||
|
|
||||||
|
// ── Conversation history (cross-session, cross-model) ──
|
||||||
|
const conversation = getConversation();
|
||||||
|
await conversation.init();
|
||||||
|
|
||||||
// ── Service registry ──
|
// ── Service registry ──
|
||||||
const svc = { config, api, tools: tools || [], skills: skills || [], agents: agents || [], rtk, memory,
|
const svc = { config, api, tools: tools || [], skills: skills || [], agents: agents || [], rtk, memory, conversation,
|
||||||
toolMap: new Map((tools || []).map(t => [t.name, t])),
|
toolMap: new Map((tools || []).map(t => [t.name, t])),
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -625,6 +629,12 @@ export async function initBot(config, api, tools, skills, agents) {
|
|||||||
await ctx.reply(ok ? '🗑 Memory deleted.' : '❌ Memory not found.');
|
await ctx.reply(ok ? '🗑 Memory deleted.' : '❌ Memory not found.');
|
||||||
});
|
});
|
||||||
|
|
||||||
|
bot.command('reset', async (ctx) => {
|
||||||
|
const chatKey = conversation._key(ctx.chat.id, ctx.message?.message_thread_id);
|
||||||
|
await conversation.clear(chatKey);
|
||||||
|
await ctx.reply('🧹 Chat history cleared. Fresh start — I won\'t remember our previous conversation in this chat.');
|
||||||
|
});
|
||||||
|
|
||||||
bot.command('cron', async (ctx) => {
|
bot.command('cron', async (ctx) => {
|
||||||
await ctx.reply('⏰ *Cron:* CronScheduler at `src/utils/cronScheduler.ts` — 1s interval, task locking, auto-recovery.');
|
await ctx.reply('⏰ *Cron:* CronScheduler at `src/utils/cronScheduler.ts` — 1s interval, task locking, auto-recovery.');
|
||||||
});
|
});
|
||||||
@@ -676,19 +686,31 @@ export async function initBot(config, api, tools, skills, agents) {
|
|||||||
await queueRequest(key, text, async () => {
|
await queueRequest(key, text, async () => {
|
||||||
await ctx.api.sendChatAction(ctx.chat.id, 'typing');
|
await ctx.api.sendChatAction(ctx.chat.id, 'typing');
|
||||||
|
|
||||||
|
// ── Load conversation history for this chat ──
|
||||||
|
const chatKey = conversation._key(ctx.chat.id, ctx.message?.message_thread_id);
|
||||||
|
const history = await conversation.getContext(chatKey);
|
||||||
|
|
||||||
// Create stream consumer for real-time edit-in-place
|
// Create stream consumer for real-time edit-in-place
|
||||||
const consumer = new StreamConsumer(ctx, { editInterval: 1000 });
|
const consumer = new StreamConsumer(ctx, { editInterval: 1000 });
|
||||||
const runPromise = consumer.run();
|
const runPromise = consumer.run();
|
||||||
|
|
||||||
|
// Build messages: system + history + current
|
||||||
|
const messages = [
|
||||||
|
{ role: 'system', content: buildSystemPrompt(svc) },
|
||||||
|
...history,
|
||||||
|
{ role: 'user', content: text },
|
||||||
|
];
|
||||||
|
|
||||||
// Wrap chatWithAI with self-correction + streaming
|
// Wrap chatWithAI with self-correction + streaming
|
||||||
const chatWithCorrection = withSelfCorrection(async (msgs) => {
|
const chatWithCorrection = withSelfCorrection(async (msgs) => {
|
||||||
return await chatWithAI(msgs, { onDelta: (token) => consumer.onDelta(token) });
|
return await chatWithAI(msgs, { onDelta: (token) => consumer.onDelta(token) });
|
||||||
});
|
});
|
||||||
|
|
||||||
const result = await chatWithCorrection([
|
const result = await chatWithCorrection(messages);
|
||||||
{ role: 'system', content: buildSystemPrompt(svc) },
|
|
||||||
{ role: 'user', content: text },
|
// ── Save this exchange to conversation history ──
|
||||||
]);
|
await conversation.add(chatKey, 'user', text);
|
||||||
|
if (result) await conversation.add(chatKey, 'assistant', result);
|
||||||
|
|
||||||
// Signal completion and wait for final edit
|
// Signal completion and wait for final edit
|
||||||
consumer.finish();
|
consumer.finish();
|
||||||
|
|||||||
@@ -280,10 +280,166 @@ class MemoryStore {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Singleton
|
// ───────────────────────────────────────────
|
||||||
let _instance = null;
|
// CONVERSATION HISTORY — per-chat, cross-session, cross-model
|
||||||
export function getMemory() {
|
// ───────────────────────────────────────────
|
||||||
if (!_instance) _instance = new MemoryStore();
|
|
||||||
return _instance;
|
const HISTORY_DIR = path.join(process.cwd(), 'data');
|
||||||
|
const MAX_HISTORY_PER_CHAT = 50; // last N exchanges per chat
|
||||||
|
const MAX_CONTEXT_TOKENS = 6000; // ~8000 chars — keeps API cost sane
|
||||||
|
const CHARS_PER_TOKEN = 1.3; // rough estimate for mixed content
|
||||||
|
|
||||||
|
class ConversationStore {
|
||||||
|
constructor() {
|
||||||
|
this.histories = new Map(); // chatKey → [{role, content, ts}]
|
||||||
|
this.loaded = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
async init() {
|
||||||
|
try {
|
||||||
|
await fs.ensureDir(HISTORY_DIR);
|
||||||
|
this.loaded = true;
|
||||||
|
logger.info('✓ Conversation store initialized');
|
||||||
|
} catch (e) {
|
||||||
|
logger.error('Conversation store init failed:', e.message);
|
||||||
|
this.loaded = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build a unique key per chat/thread.
|
||||||
|
* Groups DMs by user, groups by group chat ID.
|
||||||
|
*/
|
||||||
|
_key(chatId, threadId) {
|
||||||
|
return `${chatId}:${threadId || 'main'}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the file path for a chat's history.
|
||||||
|
*/
|
||||||
|
_filePath(chatKey) {
|
||||||
|
const safe = chatKey.replace(/[^a-zA-Z0-9_\-]/g, '_');
|
||||||
|
return path.join(HISTORY_DIR, `chat_${safe}.json`);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Load conversation history for a chat from disk.
|
||||||
|
*/
|
||||||
|
async load(chatKey) {
|
||||||
|
if (this.histories.has(chatKey)) return this.histories.get(chatKey);
|
||||||
|
try {
|
||||||
|
const fp = this._filePath(chatKey);
|
||||||
|
if (await fs.pathExists(fp)) {
|
||||||
|
const data = await fs.readJson(fp);
|
||||||
|
const history = Array.isArray(data) ? data : [];
|
||||||
|
this.histories.set(chatKey, history);
|
||||||
|
return history;
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
logger.warn(`Failed to load history for ${chatKey}: ${e.message}`);
|
||||||
|
}
|
||||||
|
const empty = [];
|
||||||
|
this.histories.set(chatKey, empty);
|
||||||
|
return empty;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Save conversation history to disk.
|
||||||
|
*/
|
||||||
|
async _save(chatKey) {
|
||||||
|
const history = this.histories.get(chatKey);
|
||||||
|
if (!history) return;
|
||||||
|
try {
|
||||||
|
const fp = this._filePath(chatKey);
|
||||||
|
await fs.writeJson(fp, history, { spaces: 2 });
|
||||||
|
} catch (e) {
|
||||||
|
logger.error(`Failed to save history for ${chatKey}: ${e.message}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a message to conversation history.
|
||||||
|
*/
|
||||||
|
async add(chatKey, role, content) {
|
||||||
|
if (!this.loaded) await this.init();
|
||||||
|
const history = await this.load(chatKey);
|
||||||
|
history.push({ role, content, ts: Date.now() });
|
||||||
|
|
||||||
|
// Trim to max entries (keep most recent)
|
||||||
|
while (history.length > MAX_HISTORY_PER_CHAT) {
|
||||||
|
history.shift();
|
||||||
|
}
|
||||||
|
|
||||||
|
await this._save(chatKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get context messages for the API call.
|
||||||
|
* Returns the most recent messages that fit within the token budget.
|
||||||
|
* Excludes system messages (those are built separately).
|
||||||
|
* Returns array of {role, content} for the API.
|
||||||
|
*/
|
||||||
|
async getContext(chatKey) {
|
||||||
|
if (!this.loaded) await this.init();
|
||||||
|
const history = await this.load(chatKey);
|
||||||
|
if (history.length === 0) return [];
|
||||||
|
|
||||||
|
// Work backwards from most recent, fitting within budget
|
||||||
|
const selected = [];
|
||||||
|
let budget = MAX_CONTEXT_TOKENS;
|
||||||
|
|
||||||
|
for (let i = history.length - 1; i >= 0; i--) {
|
||||||
|
const msg = history[i];
|
||||||
|
if (msg.role === 'system') continue; // skip system messages
|
||||||
|
const cost = Math.ceil(msg.content.length / CHARS_PER_TOKEN);
|
||||||
|
if (cost > budget && selected.length > 0) break; // stop if budget exceeded (but always include last message)
|
||||||
|
budget -= cost;
|
||||||
|
selected.unshift({ role: msg.role, content: msg.content });
|
||||||
|
}
|
||||||
|
|
||||||
|
return selected;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clear history for a specific chat.
|
||||||
|
*/
|
||||||
|
async clear(chatKey) {
|
||||||
|
this.histories.delete(chatKey);
|
||||||
|
try {
|
||||||
|
const fp = this._filePath(chatKey);
|
||||||
|
if (await fs.pathExists(fp)) await fs.remove(fp);
|
||||||
|
} catch {}
|
||||||
|
logger.info(`🗑 Cleared conversation history for ${chatKey}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clear ALL conversation histories.
|
||||||
|
*/
|
||||||
|
async clearAll() {
|
||||||
|
this.histories.clear();
|
||||||
|
try {
|
||||||
|
const files = await fs.readdir(HISTORY_DIR);
|
||||||
|
for (const f of files) {
|
||||||
|
if (f.startsWith('chat_') && f.endsWith('.json')) {
|
||||||
|
await fs.remove(path.join(HISTORY_DIR, f));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch {}
|
||||||
|
logger.info('🗑 Cleared all conversation histories');
|
||||||
|
}
|
||||||
}
|
}
|
||||||
export { MemoryStore };
|
|
||||||
|
// Singleton
|
||||||
|
let _memoryInstance = null;
|
||||||
|
export function getMemory() {
|
||||||
|
if (!_memoryInstance) _memoryInstance = new MemoryStore();
|
||||||
|
return _memoryInstance;
|
||||||
|
}
|
||||||
|
|
||||||
|
let _conversationInstance = null;
|
||||||
|
export function getConversation() {
|
||||||
|
if (!_conversationInstance) _conversationInstance = new ConversationStore();
|
||||||
|
return _conversationInstance;
|
||||||
|
}
|
||||||
|
|
||||||
|
export { MemoryStore, ConversationStore };
|
||||||
|
|||||||
Reference in New Issue
Block a user