Initial commit

Z User
2026-06-06 05:21:10 +00:00
commit 6664758a6d
493 changed files with 135653 additions and 0 deletions

@@ -0,0 +1,7 @@
{
"permissions": {
"additionalDirectories": [
"/Users/huiningli/Documents/AI/openclaw/quiz-mastery"
]
}
}

skills/quiz-mastery/README.md Executable file

@@ -0,0 +1,93 @@
# Quiz Mastery Skill
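A standalone question-generation, review, and mastery-tracking engine for OpenClaw agents.
## Features
- **Generate quizzes from study material**: turn PDF, Word, Markdown, and similar study material into graded quizzes (L1/L2/L3)
- **Import question files for practice**: parse an existing question file, normalize it, and let the user answer
- **Answer scoring**: automatic scoring plus per-question feedback
- **Mastery tracking**: record the user's progress across the three difficulty levels
- **Weak knowledge points**: a cumulative error count ≥ 3 marks a point as weak (**entries are only added, never removed**, so the list serves as a historical record)
- **Forgetting-curve review**: schedule reviews on Ebbinghaus intervals (1/2/4/7/15 days)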
## Architecture
```
quiz-mastery/
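├── SKILL.md                          # Skill metadata (read by the LLM)
├── README.md                         # This file (read by humans)
├── skill.yaml                        # Skill configuration
├── scripts/
│   ├── generate_from_material.py     # Extract knowledge points from study material
│   ├── import_quiz.py                # Import questions from a question file
│   ├── run_quiz.py                   # Generate the quiz prompt
│   └── submit_answers.py             # Submit answers and score them
├── src/quiz_mastery/                 # Core engine
└── data/
    ├── knowledge_points/             # Knowledge point definitions per document
    ├── user_progress/                # User mastery data (including weak flags and Ebbinghaus stages)
    └── sessions/                     # Quiz session records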
```
## Quick Start
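### 1. Extract knowledge points from study material
```bash
python3 scripts/generate_from_material.py <file_path> <document_id>
```
Outputs the knowledge-point extraction prompt (JSON). Send `prompts.system_prompt` and `prompts.user_prompt` to the LLM, then call `service.save_knowledge_points()` to save the returned knowledge point list.
### 2. Import from a question file
```bash
python3 scripts/import_quiz.py <file_path> <document_id> <user_id>
```
Outputs the question-parsing prompt (JSON). Send the prompt to the LLM to get normalized questions, then call `service.import_questions()` to import them.
### 3. Generate a quiz
```bash
python3 scripts/run_quiz.py <user_id> <document_id>
```
Chooses the difficulty automatically from the user's current mastery and outputs the question-writing prompt (JSON).
### 4. Submit answers
```bash
python3 scripts/submit_answers.py <user_id> <document_id> <session_id> '<answers_json>'
```
Returns `score`, `total`, `accuracy`, and per-question `results`.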
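## Data integration (relationship to the OpenClaw agent)
This skill **does not write to `memory/`**; it writes only to Section 3, "Weak knowledge points", of USER.md. All other persistence is handled by the calling agent; see SKILL.md for details.
| Data | Written by | Written to |
|------|------|--------|
| Knowledge point definitions, answer history, Ebbinghaus stages | this skill | `data/` directory |
| Weak knowledge points (error count ≥ 3) | this skill | USER.md Section 3 (source = `quiz-mastery`) |
| Study project status, DAY schedule | study-buddy (upstream agent) | USER.md Section 2 |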
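## Difficulty system
| Level | Meaning |
|------|------|
| L1 | Recall (basic memorization and understanding) |
| L2 | Comprehension (deeper understanding, application) |
| L3 | Application (integrated use, problem solving) |
- The first quiz always starts at L1
- A correct answer moves up one level (max L3); a wrong answer moves down one level (min L1)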
## Workflow
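1. **Generate / Import**: create knowledge points and questions from study material or a question file
2. **Run**: generate the quiz
3. **Submit**: the user answers → automatic scoring → mastery and weak flags are updated
4. **Sync**: weak knowledge points are written to USER.md Section 3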

skills/quiz-mastery/SKILL.md Executable file

@@ -0,0 +1,212 @@
---
name: quiz-mastery
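description: A tool for question generation, quizzes, review, and mastery tracking. **Trigger this skill first whenever the user says any of the keywords "复习" (review), "巩固" (consolidate), or "回顾" (recap).** Trigger it when the user's request is about questions or review, including turning study material, PDFs, or other documents into practice questions ("write a few questions for this PDF"), importing a question file for practice ("I have a question file, help me work through it"), reviewing learned content ("review yesterday's material", "consolidate a bit", "recap yesterday", "schedule it for me with Ebbinghaus"), forgetting-curve tracking, and mastery scoring. **🔴 Mandatory rule.** After every successful question generation or import, and **before showing the questions for the first time, you must ask** "Would you like me to generate a web practice page?"; if the user says yes, call the quiz-html skill. **Not handled here** are progress management and plan creation for long-term study projects (→ study-buddy).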
---
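# Quiz Mastery
## Two core capabilities
### Capability 1: generate questions from study material
1. The user provides study material (.md / .txt / .docx / .pdf / .ppt / .pptx)
2. Call `generate_from_material.py` to get the knowledge-point extraction prompt
3. Send the prompt to the LLM to get the knowledge point JSON
4. Call `service.save_knowledge_points()` to save the knowledge points
5. Call `run_quiz.py` to generate the question-writing prompt
6. Send the prompt to the LLM to get the question JSON
7. **⭐ Ask the user whether to generate a web practice page** (see the web practice integration section below)
   - If the user says yes → call the `quiz-html` skill to generate the HTML and open it
   - If the user says no → continue with the original flow
8. Show the questions to the user one by one and collect the answers
9. Call `submit_answers.py` to submit and score
### Capability 2: practice from a question file
1. The user provides a question file (.md / .txt / .docx / .pdf / .ppt / .pptx)
2. Call `import_quiz.py` to get the question-parsing prompt
3. Send the prompt to the LLM to get the normalized question JSON
4. Call `service.import_questions()` to import the questions and create a session
5. **⭐ Ask the user whether to generate a web practice page** (see the web practice integration section below)
   - If the user says yes → call the `quiz-html` skill to generate the HTML and open it
   - If the user says no → continue with the original flow
6. Show the questions to the user one by one and collect the answers
7. Call `submit_answers.py` to submit and score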
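## When to use (trigger conditions)
1. **The user asks explicitly**: "give me a few questions", "test me", "a quick quiz", "practice questions", "quiz me"
2. **The user says "复习" (review), "巩固" (consolidate), or "回顾" (recap)**: trigger directly
3. **Practice based on an existing question file**: trigger after the user uploads a question file
> ⚠️ **study-buddy's "instant practice" is not handled here.** That path goes through study-buddy's external `exam_take` and does not call this skill.
## Fallback when there is no history
When the user asks to review but `data/user_progress/` is empty (a new user, or one who has not answered any questions):
- **Do not force the review flow to start**: there is nothing to review
- Tell the user proactively: "There is no history to review yet. Would you like to practice with questions generated from a piece of study material first?"
- Guide the user to capability 1, generating questions from study material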
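## Difficulty system
| Level | Meaning | Description |
|------|------|------|
| L1 | Recall | Basic memorization and understanding; tests concept recognition and basic facts |
| L2 | Comprehension | Deeper understanding; tests concept discrimination, explanation of principles, and simple application |
| L3 | Application | Integrated use; tests real-world application, analysis, and problem solving |
- **First quiz**: always starts at L1
- **Correct at the current level**: move up one level (max L3)
- **Wrong at the current level**: move down one level (min L1)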
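The three level rules can be sketched as a small pure function. The name `next_level` and the use of `None` for "never quizzed" are illustrative assumptions:

```python
from __future__ import annotations

MIN_LEVEL, MAX_LEVEL = 1, 3


def next_level(current_level: int | None, answered_correctly: bool) -> int:
    """Return the level to use for the next round.

    current_level is None when the knowledge point has never been quizzed.
    """
    if current_level is None:
        return MIN_LEVEL  # the first quiz always starts at L1
    step = 1 if answered_correctly else -1
    # Clamp so the level never leaves the L1..L3 range.
    return max(MIN_LEVEL, min(MAX_LEVEL, current_level + step))
```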
## Question type allocation
| Level | Single choice | True/false | Fill in the blank | Short answer |
|------|--------|--------|--------|--------|
| L1 | 70% | 30% | - | - |
| L2 | 50% | 20% | 30% | - |
| L3 | 40% | 20% | 20% | 20% |
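The skill hands these percentages to the LLM inside the prompt rather than computing counts itself. If a caller wanted concrete per-type counts, one possible approach (an assumption, not the skill's method) is largest-remainder rounding, sketched here with a hypothetical `allocate_question_types` helper:

```python
from __future__ import annotations

# Percentages copied from the table above.
LEVEL_DISTRIBUTION = {
    1: {"single_choice": 70, "true_false": 30},
    2: {"single_choice": 50, "true_false": 20, "fill_blank": 30},
    3: {"single_choice": 40, "true_false": 20, "fill_blank": 20, "short_answer": 20},
}


def allocate_question_types(level: int, num_questions: int) -> dict[str, int]:
    """Split num_questions across question types by largest remainder."""
    shares = LEVEL_DISTRIBUTION[level]
    counts: dict[str, int] = {}
    remainders: dict[str, int] = {}
    for qtype, pct in shares.items():
        # Integer math avoids floating-point rounding surprises.
        counts[qtype], remainders[qtype] = divmod(num_questions * pct, 100)
    leftover = num_questions - sum(counts.values())
    # Give the remaining questions to the types with the largest remainders.
    for qtype in sorted(shares, key=lambda t: remainders[t], reverse=True)[:leftover]:
        counts[qtype] += 1
    return counts
```

With the default of 3 questions, L1 comes out as 2 single-choice plus 1 true/false under this rounding.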
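## Number of questions
- **3 questions per round by default** (all 3 are shown in one message; the user answers them together and they are scored together)
- At most **15 questions** per round (the user can ask to change the number)
- Use short-answer questions sparingly; they are not auto-scored (they are marked `needs_review` and judged by an external LLM or a human)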
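## Weak knowledge point tracking
- **Marked as weak**: cumulative error count ≥ 3
- **Never cleared**: weak knowledge points are only added, never removed, and are kept as a historical record
- Internal data is stored in `data/user_progress/` (error counts, Ebbinghaus stages, and so on)
- **Synced to USER.md Section 3, "Weak knowledge points"** (written directly by this skill, source = `quiz-mastery`), using this row format:
| Knowledge point | Error count | Source | Notes |
- Already in the table → update the error count
- Not in the table → add a new row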
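The update-or-append rule can be sketched on an in-memory list of rows. How USER.md is parsed and written back is not shown in this section, so the row dictionaries and the `upsert_weak_point` helper below are hypothetical:

```python
from __future__ import annotations

WEAK_THRESHOLD = 3  # cumulative error count that marks a point as weak


def upsert_weak_point(rows: list[dict], knowledge_point: str, error_count: int) -> list[dict]:
    """Update the row for knowledge_point, or append one if it is missing.

    Rows are never removed, matching the "only added, never removed" rule.
    """
    if error_count < WEAK_THRESHOLD:
        return rows  # not weak yet, nothing to sync
    for row in rows:
        if row["knowledge_point"] == knowledge_point:
            row["error_count"] = error_count
            return rows
    rows.append({
        "knowledge_point": knowledge_point,
        "error_count": error_count,
        "source": "quiz-mastery",
        "notes": "",
    })
    return rows
```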
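## Forgetting-curve review
Reviews follow the Ebbinghaus forgetting curve at intervals of **1 day → 2 days → 4 days → 7 days → 15 days**:
- Correct: review_stage +1 (advance to the next interval)
- Wrong: review_stage resets to 0 (start over)
- Review recommendations include knowledge points that are about to be forgotten plus weak knowledge points from the last 3 days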
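A minimal sketch of the scheduling rule, written to follow the stage logic in `mastery.py` from this commit. The standalone function `schedule_next_review` is an illustrative name, not part of the engine:

```python
from __future__ import annotations

from datetime import date, timedelta

# Ebbinghaus intervals in days, indexed by review_stage (0-4).
REVIEW_INTERVALS = [1, 2, 4, 7, 15]


def schedule_next_review(
    review_stage: int, answered_correctly: bool, today: date
) -> tuple[int, date]:
    """Return the new review_stage and the next review date."""
    if answered_correctly:
        # Advance one stage, capped at the last interval.
        review_stage = min(review_stage + 1, len(REVIEW_INTERVALS) - 1)
    else:
        review_stage = 0  # start over
    return review_stage, today + timedelta(days=REVIEW_INTERVALS[review_stage])
```

Under this rule a first correct answer moves from stage 0 straight to stage 1, so the next review lands 2 days out; the 1-day interval applies after a wrong answer.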
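## How to call the scripts
### 1. Extract knowledge points from study material
```bash
python3 scripts/generate_from_material.py <file_path> <document_id>
```
Outputs the knowledge-point extraction prompt (JSON). Send prompts.system_prompt and prompts.user_prompt to the LLM.
### 2. Import questions from a question file
```bash
python3 scripts/import_quiz.py <file_path> <document_id> <user_id>
```
Outputs the question-parsing prompt (JSON). Send prompts.system_prompt and prompts.user_prompt to the LLM.
### 3. Generate a quiz
```bash
python3 scripts/run_quiz.py <user_id> <document_id>
```
Chooses the difficulty automatically from the saved knowledge points and the user's current mastery, and outputs the question-writing prompt (JSON).
### 4. Submit answers
```bash
python3 scripts/submit_answers.py <user_id> <document_id> <session_id> '<answers_json>'
```
Parameters:
- `answers_json`: a JSON dictionary of answers, for example `{"q_001": "A", "q_002": "True"}`
Returns the scoring result: score, total, accuracy, and per-question results.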
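When the call is made from Python instead of a shell, passing the answers as one argv element avoids shell quoting problems with the JSON. A minimal sketch; the helper `build_submit_command` and the sample IDs are illustrative assumptions:

```python
from __future__ import annotations

import json
import shlex


def build_submit_command(
    user_id: str, document_id: str, session_id: str, answers: dict[str, str]
) -> list[str]:
    """Build the argv list for scripts/submit_answers.py."""
    # ensure_ascii=False keeps non-ASCII answers (e.g. fill-in-the-blank text) readable.
    answers_json = json.dumps(answers, ensure_ascii=False)
    return ["python3", "scripts/submit_answers.py", user_id, document_id, session_id, answers_json]


cmd = build_submit_command("u1", "doc1", "s1", {"q_001": "A", "q_002": "True"})
print(shlex.join(cmd))  # a shell-safe rendering, handy for logging
```

The list can be handed to `subprocess.run(cmd, capture_output=True, text=True)` and the script's stdout parsed with `json.loads`.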
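## Question flow (calling instructions for study-buddy)
1. Determine the knowledge point source (study material or an existing question file)
2. Run the matching extraction or import flow
3. Call `run_quiz.py` to generate the question-writing prompt
4. **Show the user 3 questions at a time** (all at once, clearly numbered, not one by one); score them together after the user has answered all of them
5. Collect the user's answers (the user can answer all 3 questions in a single reply)
6. Call `submit_answers.py` to submit and score
7. Return the scoring result to study-buddy, which writes it to the memory files
⚠️ **This skill writes only to USER.md Section 3, "Weak knowledge points"** (source = `quiz-mastery`); it does not write to other sections or to `memory/`. All other persistence is handled by study-buddy.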
## Data directory layout
```
skills/quiz-mastery/data/
├── knowledge_points/ ← knowledge point definitions (per document_id)
├── sessions/ ← quiz session records
└── user_progress/ ← user mastery data (including weak flags and forgetting curve)
```
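## ⭐ Web practice integration (working with quiz-html)
Every time you obtain the question JSON (step 7 of capability 1, step 5 of capability 2), **proactively ask the user**:
> "The questions are ready~ Would you like me to turn them into a web practice page? You can work through them at your own pace in the browser; wrong answers are recorded automatically, and you can switch themes or simulate an exam 🎯"
### Interpreting the user's reply
| User says | Decision | Action |
|---|---|---|
| "yes / OK / mm-hm / give me one / generate / web page / browser" | ✅ Yes | Call `quiz-html` |
| "no need / no / forget it / just do it directly / do it here" | ❌ No | Continue with the original conversation flow |
| No reply / unclear | Default ❌ No | Continue with the original flow; do not push |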
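The decision table can be approximated with naive substring matching. This sketch uses the original Chinese phrases from the skill as data; the function name `wants_web_page` is an assumption, and a real agent would more likely let the LLM judge intent:

```python
from __future__ import annotations

# Phrases from the decision table: "no need", "no", "forget it",
# "just do it directly", "do it here".
DECLINE_PHRASES = ("不用", "不要", "算了", "直接做", "这里做")
# "yes", "OK", "mm-hm", "give me one", "generate", "web page", "browser".
ACCEPT_PHRASES = ("要", "好", "嗯", "来一个", "生成", "网页", "浏览器")


def wants_web_page(reply: str | None) -> bool:
    """Return True only when the reply clearly asks for the web page."""
    text = (reply or "").strip()
    if not text:
        return False  # no reply defaults to "no"
    # Check declines first: "不要" (no) contains the accept phrase "要" (yes).
    if any(phrase in text for phrase in DECLINE_PHRASES):
        return False
    return any(phrase in text for phrase in ACCEPT_PHRASES)
```

Checking the decline phrases first matters because several of them contain an accept phrase as a substring.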
### Steps for calling quiz-html
```python
import json, subprocess, tempfile
from pathlib import Path

# `questions`, `title_slug`, and `page_title` are supplied by the caller.
# 1. Write the question JSON you already have to a temporary file
tmp_dir = Path(tempfile.mkdtemp(prefix="quiz_"))
qjson = tmp_dir / "questions.json"
qjson.write_text(json.dumps(questions, ensure_ascii=False), encoding="utf-8")
# 2. Choose the output path (~/Desktop is recommended)
output = Path.home() / "Desktop" / f"quiz_{title_slug}.html"
# 3. Call the script
result = subprocess.run([
    "python3",
    str(Path.home() / "Desktop/studybuddy_4.0/skills/quiz-html/scripts/build_quiz_html.py"),
    str(qjson),
    "--title", page_title,  # e.g. "📚 Physics · Electricity practice"
    "--output", str(output),
    "--open",  # open in the browser after generation
], capture_output=True, text=True)
info = json.loads(result.stdout)  # {"success": true, "output_path": "...", ...}
```
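### Suggested question fields to fill in
Before calling, it is best to add the following fields to each question (if they were not generated with the question):
- `category`: **top-level category, a short word** (2-6 Chinese characters recommended), used for the category filter chips at the top of the page.
  - ✅ Recommended: `物理` (physics) / `数学` (math) / `法律` (law) / `历史` (history) / `编程` (programming) / `通用` (general)
  - ❌ Avoid: long strings containing dates, numbering, or slashes, such as `通用类 / 1.中华人民共和国证券法1998年12月29日…`
  - If two levels are really needed, separate them with `/` and keep the second level short too: `物理 / 电学` (physics / electricity)
- `knowledge_point`: knowledge point name (used for sidebar grouping; it can match quiz-mastery's KP title; do not include hierarchy prefixes)
- `memory_tip`: mnemonic (optional; K12 students find these very useful)
With these fields, the page's category filter, sidebar grouping, and memory cards can do their job.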
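### Boundaries
| Task | Handled by |
|---|---|
| Writing and extracting questions | this skill (quiz-mastery) |
| Scoring and mastery tracking | this skill (quiz-mastery) |
| **Questions → web practice page** | **quiz-html** |
After calling quiz-html, **you still need to run quiz-mastery's scoring flow**: the answer state in the web page is for the user's self-check, while the official mastery data must be written through `submit_answers.py`. The two run side by side without conflict.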

@@ -0,0 +1,47 @@
#!/usr/bin/env python3
"""Generate a knowledge-point extraction prompt from study material.

Usage: python3 generate_from_material.py <file_path> <document_id>

Output: a JSON prompt (system_prompt + user_prompt) for the agent to send to an LLM.
After the LLM returns the knowledge point JSON, the agent should call
service.save_knowledge_points() to save it.
"""
from pathlib import Path
import json
import sys

sys.path.append(str(Path(__file__).resolve().parents[1] / "src"))

from quiz_mastery.file_parser import parse_file, build_extraction_prompt


def main() -> None:
    if len(sys.argv) < 3:
        print("Usage: generate_from_material.py <file_path> <document_id>")
        print("  file_path: Path to study material (.md, .txt, .text, .docx, .pdf, .ppt, .pptx)")
        print("  document_id: Identifier for this document")
        sys.exit(1)
    file_path = sys.argv[1]
    document_id = sys.argv[2]
    content = parse_file(file_path)
    prompts = build_extraction_prompt(content)
    output = {
        "action": "extract_knowledge_points",
        "document_id": document_id,
        "file_path": file_path,
        "prompts": prompts,
        "instructions": (
            "Send the system_prompt and user_prompt to an LLM. "
            "The LLM should return a JSON array of knowledge points. "
            "Then call save_knowledge_points(document_id, knowledge_points) to save."
        ),
    }
    print(json.dumps(output, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()

@@ -0,0 +1,51 @@
#!/usr/bin/env python3
"""Import questions from a question file.

Usage: python3 import_quiz.py <file_path> <document_id> <user_id>

Output: a JSON prompt (system_prompt + user_prompt) for the agent to send to an
LLM to parse the questions. After the LLM returns the question JSON, the agent
should call service.import_questions() to import it.
"""
from pathlib import Path
import json
import sys

sys.path.append(str(Path(__file__).resolve().parents[1] / "src"))

from quiz_mastery.file_parser import parse_file
from quiz_mastery.quiz_extractor import build_extraction_prompt


def main() -> None:
    if len(sys.argv) < 4:
        print("Usage: import_quiz.py <file_path> <document_id> <user_id>")
        print("  file_path: Path to question file (.md, .txt, .text, .docx, .pdf, .ppt, .pptx)")
        print("  document_id: Identifier for this document")
        print("  user_id: User identifier")
        sys.exit(1)
    file_path = sys.argv[1]
    document_id = sys.argv[2]
    user_id = sys.argv[3]
    content = parse_file(file_path)
    prompts = build_extraction_prompt(content)
    output = {
        "action": "import_questions",
        "document_id": document_id,
        "user_id": user_id,
        "file_path": file_path,
        "prompts": prompts,
        "instructions": (
            "Send the system_prompt and user_prompt to an LLM. "
            "The LLM should return a JSON array of questions. "
            "Then call service.import_questions(document_id, user_id, questions) to import."
        ),
    }
    print(json.dumps(output, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()

@@ -0,0 +1,63 @@
#!/usr/bin/env python3
"""Generate the question-writing prompt.

Usage: python3 run_quiz.py <user_id> <document_id>

Chooses the difficulty automatically from the saved knowledge points and the
user's mastery records.
- Reads each knowledge point's current_level from mastery_records
- Knowledge points quizzed for the first time are forced to level=1
- Outputs a JSON prompt (system_prompt + user_prompt) for the agent to send to
  an LLM to generate the questions

Knowledge points and difficulty can also be specified:
    python3 run_quiz.py <user_id> <document_id> [level] [kp_id1,kp_id2,...]
"""
from pathlib import Path
import json
import sys

sys.path.append(str(Path(__file__).resolve().parents[1] / "src"))

from quiz_mastery import QuizMasteryService


def main() -> None:
    if len(sys.argv) < 3:
        print("Usage: run_quiz.py <user_id> <document_id> [level] [kp_id1,kp_id2,...]")
        print("  user_id: User identifier")
        print("  document_id: Document identifier")
        print("  level: Optional difficulty level (1/2/3)")
        print("  kp_ids: Optional comma-separated knowledge point IDs")
        sys.exit(1)
    user_id = sys.argv[1]
    document_id = sys.argv[2]
    level = None
    kp_ids = None
    if len(sys.argv) >= 4:
        try:
            level = int(sys.argv[3])
        except ValueError:
            # Not an integer, so treat the third argument as kp_ids
            kp_ids = sys.argv[3].split(",")
    if len(sys.argv) >= 5:
        kp_ids = sys.argv[4].split(",")
    service = QuizMasteryService(
        base_dir=Path(__file__).resolve().parents[1] / "data"
    )
    result = service.generate_quiz_for_user(
        user_id=user_id,
        document_id=document_id,
        knowledge_point_ids=kp_ids,
        level=level,
    )
    print(json.dumps(result, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()

@@ -0,0 +1,48 @@
#!/usr/bin/env python3
"""Submit quiz answers and score them.

Usage: python3 submit_answers.py <user_id> <document_id> <session_id> <answers_json>

Arguments:
    user_id: user identifier
    document_id: document identifier
    session_id: quiz session ID
    answers_json: JSON dictionary of answers, e.g. '{"q_001":"A","q_002":"True"}'

Output: scoring result JSON (score, total, accuracy, results)
"""
from pathlib import Path
import json
import sys

sys.path.append(str(Path(__file__).resolve().parents[1] / "src"))

from quiz_mastery import QuizMasteryService


def main() -> None:
    if len(sys.argv) < 5:
        print("Usage: submit_answers.py <user_id> <document_id> <session_id> <answers_json>")
        sys.exit(1)
    user_id = sys.argv[1]
    document_id = sys.argv[2]
    session_id = sys.argv[3]
    answers = json.loads(sys.argv[4])
    service = QuizMasteryService(
        base_dir=Path(__file__).resolve().parents[1] / "data"
    )
    result = service.submit_quiz_answers(
        user_id=user_id,
        document_id=document_id,
        session_id=session_id,
        answers=answers,
    )
    print(json.dumps(result, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()

skills/quiz-mastery/skill.yaml Executable file

@@ -0,0 +1,21 @@
name: quiz_mastery
version: 0.2.0
entrypoints:
  run_quiz: scripts/run_quiz.py
  submit_answers: scripts/submit_answers.py
  generate_from_material: scripts/generate_from_material.py
  import_quiz: scripts/import_quiz.py
storage:
  type: local_json
  base_dir: ./data
features:
- knowledge_extraction
- quiz_generation
- question_import
- answer_evaluation
- mastery_tracking
- weak_point_tracking
- ebbinghaus_review
- review_planning

@@ -0,0 +1,3 @@
from .service import QuizMasteryService
__all__ = ["QuizMasteryService"]

@@ -0,0 +1,78 @@
from __future__ import annotations

from .models import Question, AnswerResult


class Evaluator:
    """Evaluates user answers against correct answers.

    - single_choice / true_false: exact match
    - fill_blank: case-insensitive, strip whitespace
    - short_answer: no auto-scoring, returns needs_review=True
    """

    def evaluate_answers(
        self,
        questions: list[Question],
        user_answers: dict[str, str],
    ) -> list[AnswerResult]:
        results: list[AnswerResult] = []
        for q in questions:
            user_answer = user_answers.get(q.id)
            if q.type == "short_answer":
                # Short answer: cannot auto-evaluate, needs human/LLM review
                results.append(
                    AnswerResult(
                        question_id=q.id,
                        user_answer=user_answer,
                        is_correct=None,
                        score=None,
                        feedback="Needs human/LLM review",
                        error_type="pending_review",
                        needs_review=True,
                    )
                )
                continue
            if user_answer is None:
                results.append(
                    AnswerResult(
                        question_id=q.id,
                        user_answer=None,
                        is_correct=False,
                        score=0.0,
                        feedback=f"No answer given. Correct answer: {q.answer}. {q.explanation}",
                        error_type="no_answer",
                        needs_review=False,
                    )
                )
                continue
            if q.type == "fill_blank":
                # Fill-in-the-blank: case-insensitive, strip whitespace
                is_correct = (
                    user_answer.strip().lower() == str(q.answer).strip().lower()
                )
            else:
                # single_choice / true_false: exact match
                is_correct = user_answer.strip() == str(q.answer).strip()
            results.append(
                AnswerResult(
                    question_id=q.id,
                    user_answer=user_answer,
                    is_correct=is_correct,
                    score=1.0 if is_correct else 0.0,
                    feedback=(
                        q.explanation
                        if is_correct
                        else f"Incorrect. Correct answer: {q.answer}. {q.explanation}"
                    ),
                    error_type="none" if is_correct else "concept_confusion",
                    needs_review=False,
                )
            )
        return results

@@ -0,0 +1,274 @@
from __future__ import annotations

import subprocess
import zipfile
import xml.etree.ElementTree as ET
from pathlib import Path

SUPPORTED_EXTENSIONS = {".md", ".txt", ".text", ".docx", ".pdf", ".ppt", ".pptx"}


# ── .docx (zero-dep: zip + xml) ──────────────────────────────────
def _parse_docx(file_path: Path) -> str:
    """Extract text from .docx using stdlib only (zipfile + xml).

    .docx is a ZIP archive containing word/document.xml with paragraph data.
    """
    ns = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"
    with zipfile.ZipFile(str(file_path), "r") as zf:
        # Main document body
        if "word/document.xml" not in zf.namelist():
            raise ValueError("Invalid .docx: word/document.xml not found")
        tree = ET.parse(zf.open("word/document.xml"))
    root = tree.getroot()
    parts: list[str] = []
    for para in root.iter(f"{ns}p"):
        texts = [node.text for node in para.iter(f"{ns}t") if node.text]
        line = "".join(texts).strip()
        if line:
            parts.append(line)
    return "\n\n".join(parts)


# ── .pptx (zero-dep: zip + xml) ──────────────────────────────────
def _parse_pptx(file_path: Path) -> str:
    """Extract text from .pptx using stdlib only (zipfile + xml).

    .pptx is a ZIP archive; each slide is at ppt/slides/slideN.xml.
    """
    ns_a = "{http://schemas.openxmlformats.org/drawingml/2006/main}"
    with zipfile.ZipFile(str(file_path), "r") as zf:
        slide_names = sorted(
            [n for n in zf.namelist() if n.startswith("ppt/slides/slide") and n.endswith(".xml")]
        )
        if not slide_names:
            raise ValueError("Invalid .pptx: no slides found")
        parts: list[str] = []
        for idx, slide_name in enumerate(slide_names, 1):
            tree = ET.parse(zf.open(slide_name))
            root = tree.getroot()
            slide_texts: list[str] = []
            for para in root.iter(f"{ns_a}p"):
                texts = [node.text for node in para.iter(f"{ns_a}t") if node.text]
                line = "".join(texts).strip()
                if line:
                    slide_texts.append(line)
            if slide_texts:
                parts.append(f"[Slide {idx}]\n" + "\n".join(slide_texts))
    return "\n\n".join(parts)


# ── .ppt (legacy binary → python-pptx attempt) ───────────────────
def _parse_ppt(file_path: Path) -> str:
    """Extract text from legacy .ppt format.

    Tries python-pptx if it is installed. If that is unavailable or fails,
    raises a helpful error asking for conversion to .pptx.
    """
    # macOS textutil can convert .doc but not .ppt directly.
    # Try python-pptx as optional, otherwise error with guidance.
    try:
        from pptx import Presentation

        prs = Presentation(str(file_path))
        parts: list[str] = []
        for slide_num, slide in enumerate(prs.slides, 1):
            slide_texts: list[str] = []
            for shape in slide.shapes:
                if shape.has_text_frame:
                    for para in shape.text_frame.paragraphs:
                        text = para.text.strip()
                        if text:
                            slide_texts.append(text)
            if slide_texts:
                parts.append(f"[Slide {slide_num}]\n" + "\n".join(slide_texts))
        return "\n\n".join(parts)
    except ImportError:
        pass
    except Exception:
        # Any parsing failure falls through to the conversion guidance below.
        pass
    raise ValueError(
        "Legacy .ppt format requires conversion. "
        "Please save as .pptx first (open in PowerPoint/WPS → Save As → .pptx), "
        "or install python-pptx: pip install python-pptx"
    )


# ── .pdf (pymupdf, macOS Quartz, or pdftotext fallback) ──────────
def _parse_pdf(file_path: Path) -> str:
    """Extract text from .pdf.

    Strategy:
    1. Try pymupdf (fitz) if installed: best quality
    2. Fallback: a python3 subprocess using macOS Quartz (needs pyobjc)
    3. Fallback: `pdftotext` from poppler if installed
    """
    # Strategy 1: pymupdf
    try:
        import fitz

        doc = fitz.open(str(file_path))
        parts: list[str] = []
        for page in doc:
            text = page.get_text().strip()
            if text:
                parts.append(text)
        doc.close()
        if parts:
            return "\n\n".join(parts)
    except ImportError:
        pass
    # Strategy 2: macOS python3 Quartz (Core Graphics)
    try:
        result = subprocess.run(
            [
                "python3", "-c",
                "import sys\n"
                "from Quartz import PDFDocument\n"
                "from Foundation import NSURL\n"
                "url = NSURL.fileURLWithPath_(sys.argv[1])\n"
                "doc = PDFDocument.alloc().initWithURL_(url)\n"
                "if doc is None: sys.exit(1)\n"
                "parts = []\n"
                "for i in range(doc.pageCount()):\n"
                "    page = doc.pageAtIndex_(i)\n"
                "    text = page.string()\n"
                "    if text and text.strip(): parts.append(text.strip())\n"
                "print('\\n\\n'.join(parts))\n",
                str(file_path),
            ],
            capture_output=True,
            text=True,
            timeout=30,
        )
        if result.returncode == 0 and result.stdout.strip():
            return result.stdout.strip()
    except (subprocess.TimeoutExpired, FileNotFoundError):
        pass
    # Strategy 3: pdftotext (poppler)
    try:
        result = subprocess.run(
            ["pdftotext", str(file_path), "-"],
            capture_output=True,
            text=True,
            timeout=30,
        )
        if result.returncode == 0 and result.stdout.strip():
            return result.stdout.strip()
    except (subprocess.TimeoutExpired, FileNotFoundError):
        pass
    raise ValueError(
        "Could not extract text from PDF. Options:\n"
        "1. Install pymupdf: pip install pymupdf\n"
        "2. Install poppler: brew install poppler (provides pdftotext)\n"
        "3. On macOS, ensure Quartz/pyobjc is available"
    )


# ── Main entry ────────────────────────────────────────────────────
def parse_file(file_path: str) -> str:
    """Read a file and return its text content.

    Supports: .md, .txt, .text, .docx, .pdf, .ppt, .pptx
    .docx and .pptx use Python stdlib only (zipfile + xml).
    .pdf tries pymupdf → macOS Quartz → pdftotext (graceful fallback).
    .ppt (legacy) tries python-pptx if installed, otherwise asks for conversion.

    Raises:
        FileNotFoundError: If file does not exist.
        ValueError: If file extension is not supported or extraction fails.
    """
    path = Path(file_path)
    if not path.exists():
        raise FileNotFoundError(f"File not found: {file_path}")
    suffix = path.suffix.lower()
    if suffix not in SUPPORTED_EXTENSIONS:
        raise ValueError(
            f"Unsupported file type: {suffix}. "
            f"Supported: {', '.join(sorted(SUPPORTED_EXTENSIONS))}"
        )
    if suffix == ".docx":
        return _parse_docx(path)
    elif suffix == ".pdf":
        return _parse_pdf(path)
    elif suffix == ".pptx":
        return _parse_pptx(path)
    elif suffix == ".ppt":
        return _parse_ppt(path)
    # Plain-text formats (.md, .txt, .text)
    return path.read_text(encoding="utf-8")
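

def build_extraction_prompt(content: str) -> dict:
    """Build a prompt for LLM to extract knowledge points from study material.

    Args:
        content: The text content of the study material.

    Returns:
        dict with 'system_prompt' and 'user_prompt' keys.
    """
    system_prompt = (
        "You are a professional knowledge-point extraction assistant. "
        "Extract the core knowledge points from the study material the user provides.\n"
        "Output strictly in the required JSON format and output nothing else."
    )
    user_prompt = f"""Extract the core knowledge points from the following study material.
## Study material
{content}
## Extraction requirements
1. Every knowledge point must contain these fields:
- id: unique identifier (format: kp_001, kp_002, ...)
- title: knowledge point name (short and precise)
- definition: definition of the knowledge point (one-sentence summary)
- description: detailed description (may include key content from the source text)
- tags: list of tags (for classification and retrieval)
2. Extract all important concepts, principles, definitions, formulas, and so on
3. Each knowledge point should be an independent, testable unit
4. description should preserve the key wording of the source text where possible
## Output format
Output a plain JSON array in which each element has this format:
```json
[
  {{
    "id": "kp_001",
    "title": "knowledge point name",
    "definition": "one-sentence definition",
    "description": "detailed description, including key content from the source text",
    "tags": ["tag 1", "tag 2"]
  }}
]
```
Output the JSON array directly, without markdown code block markers or any other text."""
    return {
        "system_prompt": system_prompt,
        "user_prompt": user_prompt,
    }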

@@ -0,0 +1,149 @@
from __future__ import annotations

from datetime import datetime, timedelta

from .models import MasteryRecord, Question, AnswerResult

# Ebbinghaus forgetting curve intervals in days
REVIEW_INTERVALS = [1, 2, 4, 7, 15]
# Maximum accuracy history entries to keep
MAX_ACCURACY_HISTORY = 10


class MasteryEngine:
    """Tracks mastery level, weak points, and spaced repetition schedule."""

    def update_mastery(
        self,
        existing_records: dict[str, MasteryRecord],
        questions: list[Question],
        results: list[AnswerResult],
    ) -> dict[str, MasteryRecord]:
        """Update mastery records based on quiz results.

        - Correct answer at current level → level up (max 3)
        - Wrong answer → level down (min 1)
        - First time → forced to level 1
        - Weak marking: accuracy < 0.5 or 2 consecutive wrong → is_weak = True
        - Strong recovery: accuracy >= 0.8 for 2 consecutive → is_weak = False
        - Updates Ebbinghaus review schedule
        """
        question_map = {q.id: q for q in questions}
        now_str = datetime.now().strftime("%Y-%m-%d")
        # Group results by knowledge point
        kp_scores: dict[str, list[float]] = {}
        for r in results:
            q = question_map.get(r.question_id)
            if q is None:
                continue
            # Skip short_answer that needs review (score is None)
            if r.score is None:
                continue
            for kp_id in q.knowledge_point_ids:
                if kp_id not in kp_scores:
                    kp_scores[kp_id] = []
                kp_scores[kp_id].append(r.score)
        for kp_id, scores in kp_scores.items():
            record = existing_records.get(kp_id)
            if record is None:
                record = MasteryRecord(knowledge_point_id=kp_id, current_level=1)
                existing_records[kp_id] = record
            accuracy = sum(scores) / len(scores) if scores else 0.0
            # Update basic stats
            record.attempts += 1
            record.last_accuracy = accuracy
            record.best_accuracy = max(record.best_accuracy, accuracy)
            record.last_reviewed_at = now_str
            # Update accuracy history (keep last 10)
            record.accuracy_history.append(accuracy)
            if len(record.accuracy_history) > MAX_ACCURACY_HISTORY:
                record.accuracy_history = record.accuracy_history[-MAX_ACCURACY_HISTORY:]
            # Update level: correct → up, wrong → down
            is_correct = accuracy >= 0.6  # threshold for "correct" at current level
            if is_correct:
                record.current_level = min(record.current_level + 1, 3)
            else:
                record.current_level = max(record.current_level - 1, 1)
            # Update weak status
            self._update_weak_status(record)
            # Update Ebbinghaus review schedule
            self._update_review_schedule(record, is_correct, now_str)
        return existing_records

    def _update_weak_status(self, record: MasteryRecord) -> None:
        """Mark or unmark a knowledge point as weak.

        Weak if: accuracy < 0.5 OR last 2 attempts both wrong
        Recover if: accuracy >= 0.8 for last 2 consecutive attempts
        """
        history = record.accuracy_history
        # Check for consecutive failures (last 2 both < 0.6)
        if len(history) >= 2 and history[-1] < 0.6 and history[-2] < 0.6:
            record.is_weak = True
            return
        # Check overall recent accuracy
        if record.last_accuracy < 0.5:
            record.is_weak = True
            return
        # Check for recovery: last 2 both >= 0.8
        if len(history) >= 2 and history[-1] >= 0.8 and history[-2] >= 0.8:
            record.is_weak = False

    def _update_review_schedule(
        self, record: MasteryRecord, is_correct: bool, today_str: str
    ) -> None:
        """Update Ebbinghaus spaced repetition schedule.

        Correct → advance review_stage (max 4)
        Wrong → reset review_stage to 0
        """
        if is_correct:
            record.review_stage = min(record.review_stage + 1, len(REVIEW_INTERVALS) - 1)
        else:
            record.review_stage = 0
        interval_days = REVIEW_INTERVALS[record.review_stage]
        today = datetime.strptime(today_str, "%Y-%m-%d")
        next_review = today + timedelta(days=interval_days)
        record.next_review_at = next_review.strftime("%Y-%m-%d")

    def get_review_candidates(
        self,
        records: dict[str, MasteryRecord],
        today_str: str,
    ) -> list[str]:
        """Return knowledge point IDs that need review.

        Criteria:
        - next_review_at <= today (due for review)
        - is_weak=True and last_reviewed_at within last 3 days
        """
        candidates = set()
        today = datetime.strptime(today_str, "%Y-%m-%d")
        three_days_ago = (today - timedelta(days=3)).strftime("%Y-%m-%d")
        for kp_id, record in records.items():
            # Due for review based on Ebbinghaus schedule
            if record.next_review_at and record.next_review_at <= today_str:
                candidates.add(kp_id)
            # Weak and recently reviewed (within 3 days)
            if record.is_weak and record.last_reviewed_at:
                if record.last_reviewed_at >= three_days_ago:
                    candidates.add(kp_id)
        return list(candidates)

@@ -0,0 +1,82 @@
from __future__ import annotations

from dataclasses import dataclass, field, asdict
from typing import Any, Literal

QuestionType = Literal["single_choice", "true_false", "fill_blank", "short_answer"]


@dataclass
class KnowledgeSource:
    document_id: str
    section_title: str = ""
    page_start: int | None = None
    page_end: int | None = None
    snippets: list[str] = field(default_factory=list)


@dataclass
class KnowledgePoint:
    id: str
    title: str
    description: str
    definition: str = ""
    tags: list[str] = field(default_factory=list)
    source: KnowledgeSource | None = None


@dataclass
class Question:
    id: str
    knowledge_point_ids: list[str]
    level: int  # 1, 2, 3
    type: QuestionType
    prompt: str
    options: list[str] = field(default_factory=list)
    answer: Any = None
    explanation: str = ""
    source_refs: list[str] = field(default_factory=list)


@dataclass
class QuizSession:
    session_id: str
    user_id: str
    document_id: str
    level: int  # 1, 2, 3
    knowledge_point_ids: list[str]
    questions: list[Question]
    status: str = "generated"


@dataclass
class AnswerResult:
    question_id: str
    user_answer: Any
    is_correct: bool | None  # None for short_answer needing review
    score: float | None  # None for short_answer needing review
    feedback: str = ""
    error_type: str = "unknown"
    needs_review: bool = False


@dataclass
class MasteryRecord:
    knowledge_point_id: str
    current_level: int = 1  # 1, 2, 3
    attempts: int = 0
    last_accuracy: float = 0.0
    best_accuracy: float = 0.0
    last_reviewed_at: str | None = None
    is_weak: bool = False
    accuracy_history: list[float] = field(default_factory=list)
    next_review_at: str | None = None  # ISO date string YYYY-MM-DD
    review_stage: int = 0  # 0-4, maps to [1, 2, 4, 7, 15] days


def to_dict(obj: Any) -> Any:
    """Convert a dataclass instance to dict."""
    if hasattr(obj, "__dataclass_fields__"):
        return asdict(obj)
    return obj

@@ -0,0 +1,62 @@
from __future__ import annotations

from datetime import datetime, timedelta

from .models import KnowledgePoint, MasteryRecord


class Planner:
    """Recommends knowledge points for review based on Ebbinghaus
    forgetting curve and weak-point tracking."""

    def recommend_review(
        self,
        knowledge_points: list[KnowledgePoint],
        mastery_records: dict[str, MasteryRecord],
        today_str: str,
    ) -> list[dict]:
        """Return a list of review recommendations.

        Combines:
        1. Knowledge points due for review (next_review_at <= today)
        2. Weak knowledge points reviewed in the last 3 days

        Returns:
            List of dicts with 'knowledge_point_id', 'title', 'reason',
            'current_level', 'is_weak'.
        """
        today = datetime.strptime(today_str, "%Y-%m-%d")
        three_days_ago_str = (today - timedelta(days=3)).strftime("%Y-%m-%d")
        # Build a title lookup
        kp_title_map = {kp.id: kp.title for kp in knowledge_points}
        seen_ids: set[str] = set()
        recommendations: list[dict] = []
        for kp_id, record in mastery_records.items():
            reasons: list[str] = []
            # Check Ebbinghaus due
            if record.next_review_at and record.next_review_at <= today_str:
                reasons.append("Forgetting-curve review is due")
            # Check weak + recently reviewed
            if record.is_weak and record.last_reviewed_at:
                if record.last_reviewed_at >= three_days_ago_str:
                    reasons.append("Weak knowledge point practiced within the last 3 days")
                elif not record.next_review_at:
                    # Weak but no review schedule yet
                    reasons.append("Weak knowledge point, review recommended")
            if reasons and kp_id not in seen_ids:
                seen_ids.add(kp_id)
                recommendations.append({
                    "knowledge_point_id": kp_id,
                    "title": kp_title_map.get(kp_id, kp_id),
                    "reason": "; ".join(reasons),
                    "current_level": record.current_level,
                    "is_weak": record.is_weak,
                })
        return recommendations

@@ -0,0 +1,115 @@
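from __future__ import annotations

import json

from .models import Question


def build_extraction_prompt(content: str) -> dict:
    """Build a prompt for LLM to parse questions from a question file.

    Args:
        content: Raw text content containing questions.

    Returns:
        dict with 'system_prompt' and 'user_prompt' keys.
    """
    system_prompt = (
        "You are a professional question-parsing assistant. "
        "Identify and parse every question in the question file the user provides.\n"
        "Output strictly in the required JSON format and output nothing else."
    )
    user_prompt = f"""Parse all questions from the following question file content.
## Question file content
{content}
## Parsing requirements
1. Identify the type of each question:
- single_choice: multiple-choice question (has options such as A/B/C/D)
- true_false: true/false question
- fill_blank: fill-in-the-blank question (has blanks to fill in)
- short_answer: short-answer question (needs a written answer)
2. Extract all information about each question
3. If a question comes with an answer and an explanation, extract those too
4. Assign a unique ID to each question
## Output format
Output a plain JSON array in which each element has this format:
```json
[
  {{
    "id": "q_001",
    "knowledge_point_ids": [],
    "level": 1,
    "type": "single_choice",
    "prompt": "question text",
    "options": ["A. option one", "B. option two", "C. option three", "D. option four"],
    "answer": "A",
    "explanation": "explanation (if any)"
  }}
]
```
Notes:
- type must be one of single_choice, true_false, fill_blank, short_answer
- For multiple-choice questions, answer is the option letter (A/B/C/D)
- For true/false questions, answer is "True" or "False"
- For fill-in-the-blank questions, answer is the correct answer text
- For short-answer questions, answer is the reference answer (if any)
- If knowledge_point_ids cannot be determined, leave the array empty
- level defaults to 1; adjust it if the difficulty can be judged from the question
Output the JSON array directly, without markdown code block markers or any other text."""
    return {
        "system_prompt": system_prompt,
        "user_prompt": user_prompt,
    }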


def parse_questions_json(json_str: str) -> list[Question]:
    """Parse LLM-returned JSON string into a list of Question objects.

    Args:
        json_str: JSON string containing a list of question dicts.

    Returns:
        List of Question objects.

    Raises:
        json.JSONDecodeError: If json_str is not valid JSON.
        ValueError: If the parsed data is not a list.
    """
    # Try to extract JSON from possible markdown code blocks
    cleaned = json_str.strip()
    if cleaned.startswith("```"):
        # Remove markdown code block markers
        lines = cleaned.split("\n")
        # Remove first line (```json or ```)
        lines = lines[1:]
        # Remove last line (```)
        if lines and lines[-1].strip() == "```":
            lines = lines[:-1]
        cleaned = "\n".join(lines)
    data = json.loads(cleaned)
    if not isinstance(data, list):
        raise ValueError(f"Expected a JSON array, got {type(data).__name__}")
    questions: list[Question] = []
    for item in data:
        q = Question(
            id=item.get("id", "q_unknown"),
            knowledge_point_ids=item.get("knowledge_point_ids", []),
            level=item.get("level", 1),
            type=item.get("type", "single_choice"),
            prompt=item.get("prompt", ""),
            options=item.get("options", []),
            answer=item.get("answer"),
            explanation=item.get("explanation", ""),
            source_refs=item.get("source_refs", []),
        )
        questions.append(q)
    return questions

@@ -0,0 +1,128 @@
from __future__ import annotations

import json

from .models import KnowledgePoint

# Maximum questions per quiz round
MAX_QUESTIONS_PER_ROUND = 15

# Question type distribution by level
LEVEL_DISTRIBUTION = {
    1: {"single_choice": 70, "true_false": 30},
    2: {"single_choice": 50, "fill_blank": 30, "true_false": 20},
    3: {"single_choice": 40, "fill_blank": 20, "true_false": 20, "short_answer": 20},
}


class QuizGenerator:
    """Builds prompt templates for LLM-based quiz generation.

    This module does NOT call any LLM API. It constructs system_prompt and
    user_prompt that should be sent to an LLM by the caller (agent).
    """

    def generate_quiz(
        self,
        knowledge_points: list[KnowledgePoint],
        level: int = 1,
        num_questions: int | None = None,
    ) -> dict:
        """Build prompts for quiz generation.

        Args:
            knowledge_points: List of knowledge points to quiz on.
            level: Difficulty level (1, 2, or 3).
            num_questions: Number of questions to generate (max 15).

        Returns:
            dict with 'system_prompt' and 'user_prompt' keys.
        """
        level = max(1, min(3, level))
        if num_questions is None:
            num_questions = 3  # Default: 3 questions per round for efficiency
        num_questions = min(num_questions, MAX_QUESTIONS_PER_ROUND)

        distribution = LEVEL_DISTRIBUTION[level]
        distribution_text = "\n".join(
            f"  - {qtype}: {pct}%" for qtype, pct in distribution.items()
        )

        # Build knowledge point descriptions for the prompt
        kp_descriptions = []
        for kp in knowledge_points:
            desc_parts = [f"- **{kp.title}** (ID: {kp.id})"]
            if kp.definition:
                desc_parts.append(f"  Definition: {kp.definition}")
            if kp.description:
                desc_parts.append(f"  Description: {kp.description}")
            if kp.source and kp.source.snippets:
                desc_parts.append(f"  Source snippets: {'; '.join(kp.source.snippets)}")
            kp_descriptions.append("\n".join(desc_parts))
        kp_text = "\n\n".join(kp_descriptions)

        level_desc = {
            1: "Recall (basic memorization and understanding; tests concept recognition and basic facts)",
            2: "Comprehension (deeper understanding; tests distinguishing concepts, explaining principles, and simple application)",
            3: "Application (integrated use; tests application to real scenarios, analysis, and problem solving)",
        }

        system_prompt = (
            "You are a professional quiz-writing assistant. Generate high-quality quiz questions from the knowledge point information provided.\n"
            "Output strictly in the required JSON format and output nothing else.\n"
            "Questions must stick closely to each knowledge point's title, definition, and source description; do not write questions that stray from the source text."
        )
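        user_prompt = f"""Generate {num_questions} quiz questions based on the following knowledge points.

## Difficulty level
Level {level}: {level_desc[level]}

## Question type distribution
{distribution_text}

## Knowledge points
{kp_text}

## Question requirements
1. Every question must be linked to at least one knowledge point ID
2. Choice questions (single_choice): 4 options A/B/C/D; answer is the letter of the correct option
3. True/false questions (true_false): answer is "True" or "False"
4. Fill-in-the-blank questions (fill_blank): mark the blank in prompt with ____; answer is the correct answer text
5. Short-answer questions (short_answer): answer is the reference answer
6. Every question must include an explanation
7. Question content must be based on the titles, definitions, and descriptions of the knowledge points above; do not go beyond that scope
8. **Every question must fill in the `category` and `knowledge_point` fields** (used for category filtering and sidebar grouping):
   - `category`: top-level category, a **short term** (ideally 2-6 characters, or a single short word), used for the category filter chips at the top.
     Examples: Physics / Math / Law / History / Programming / General, etc.
     **Do not** write a long string, and do not include dates, numbering, book-title marks, or slashes.
   - `knowledge_point`: the name of the knowledge point it belongs to (just use the title of the linked knowledge point), used for sidebar grouping.
9. Total number of questions: {num_questions}

## Output format
Output a pure JSON array in which each element has the following format:
```json
[
  {{
    "id": "q_001",
    "knowledge_point_ids": ["kp_id"],
    "category": "Physics",
    "knowledge_point": "Newton's second law",
    "level": {level},
    "type": "single_choice",
    "prompt": "Question text",
    "options": ["A. Option one", "B. Option two", "C. Option three", "D. Option four"],
    "answer": "A",
    "explanation": "Explanation text"
  }}
]
```

Output the JSON array directly, without markdown code block markers or any other text."""

        return {
            "system_prompt": system_prompt,
            "user_prompt": user_prompt,
        }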
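
The generator hands the percentages in `LEVEL_DISTRIBUTION` to the LLM and leaves the rounding to the model. If a caller wanted to check a returned quiz against concrete per-type counts, one possible approach is largest-remainder rounding. This is a sketch under that assumption: `allocate_counts` is a hypothetical helper that does not exist in the commit, and the table is copied here only so the example is self-contained.

```python
# Copied from the module above so the sketch runs on its own.
LEVEL_DISTRIBUTION = {
    1: {"single_choice": 70, "true_false": 30},
    2: {"single_choice": 50, "fill_blank": 30, "true_false": 20},
    3: {"single_choice": 40, "fill_blank": 20, "true_false": 20, "short_answer": 20},
}


def allocate_counts(distribution: dict, num_questions: int) -> dict:
    """Turn percentage shares into whole question counts (hypothetical helper).

    Each type first gets the floor of its share; the questions left over
    go to the types with the largest remainders.
    """
    counts = {}
    remainders = {}
    for qtype, pct in distribution.items():
        # Integer arithmetic avoids floating-point rounding surprises.
        counts[qtype], remainders[qtype] = divmod(num_questions * pct, 100)

    leftover = num_questions - sum(counts.values())
    by_remainder = sorted(distribution, key=lambda q: remainders[q], reverse=True)
    for qtype in by_remainder[:leftover]:
        counts[qtype] += 1
    return counts


level_1_counts = allocate_counts(LEVEL_DISTRIBUTION[1], 3)
level_3_counts = allocate_counts(LEVEL_DISTRIBUTION[3], 10)
print(level_1_counts)
print(level_3_counts)
```

With the default of 3 questions at level 1, the floor step gives 2 single-choice and 0 true/false questions, and the one leftover question goes to true/false because its remainder is larger.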

@@ -0,0 +1,36 @@
from __future__ import annotations

import json
from pathlib import Path
from typing import Any


class JsonRepository:
    def __init__(self, base_dir: str | Path):
        self.base_dir = Path(base_dir)
        self.kp_dir = self.base_dir / "knowledge_points"
        self.progress_dir = self.base_dir / "user_progress"
        self.sessions_dir = self.base_dir / "sessions"
        self._ensure_dirs()

    def _ensure_dirs(self) -> None:
        for d in [self.kp_dir, self.progress_dir, self.sessions_dir]:
            d.mkdir(parents=True, exist_ok=True)

    def load_json(self, path: Path, default: Any = None) -> Any:
        if not path.exists():
            return default
        return json.loads(path.read_text(encoding="utf-8"))

    def save_json(self, path: Path, data: Any) -> None:
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")

    def knowledge_points_path(self, document_id: str) -> Path:
        return self.kp_dir / f"{document_id}.json"

    def progress_path(self, user_id: str, document_id: str) -> Path:
        return self.progress_dir / f"{user_id}__{document_id}.json"

    def session_path(self, session_id: str) -> Path:
        return self.sessions_dir / f"{session_id}.json"
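
`save_json` writes the target file in place, so an interrupted write could leave a truncated progress or session file. One common way to guard against that is to write to a temporary file in the same directory and then rename it over the target. The sketch below illustrates the idea; `save_json_atomic` is a hypothetical function that is not part of the commit.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any


def save_json_atomic(path: Path, data: Any) -> None:
    """Write JSON to a temp file, then rename it over the target (hypothetical helper)."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # The temp file lives in the target directory so the rename stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(data, handle, ensure_ascii=False, indent=2)
        os.replace(tmp_name, path)
    except BaseException:
        # Clean up the temp file if writing or renaming failed.
        os.unlink(tmp_name)
        raise


with tempfile.TemporaryDirectory() as base_dir:
    target = Path(base_dir) / "sessions" / "quiz_demo.json"
    save_json_atomic(target, {"status": "completed", "answers": {"q_001": "A"}})
    loaded = json.loads(target.read_text(encoding="utf-8"))
    leftovers = [p.name for p in target.parent.iterdir() if p.name.endswith(".tmp")]

print(loaded["status"])
```

Readers of the file then see either the old content or the complete new content, never a partial write.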

@@ -0,0 +1,304 @@
from __future__ import annotations

from dataclasses import asdict
from pathlib import Path
import uuid
import json

from .models import KnowledgePoint, KnowledgeSource, Question, QuizSession, MasteryRecord
from .repository import JsonRepository
from .quiz_generator import QuizGenerator
from .evaluator import Evaluator
from .mastery_engine import MasteryEngine
from .planner import Planner


class QuizMasteryService:
    def __init__(self, base_dir: str | Path):
        self.repo = JsonRepository(base_dir)
        self.generator = QuizGenerator()
        self.evaluator = Evaluator()
        self.mastery_engine = MasteryEngine()
        self.planner = Planner()
    # ── Knowledge Points ──────────────────────────────────────────

    def load_knowledge_points(self, document_id: str) -> list[KnowledgePoint]:
        """Load knowledge points from JSON file."""
        raw = self.repo.load_json(
            self.repo.knowledge_points_path(document_id), default={}
        )
        items = raw.get("knowledge_points", [])
        result: list[KnowledgePoint] = []
        for item in items:
            source_data = item.get("source")
            kp = KnowledgePoint(
                id=item["id"],
                title=item["title"],
                description=item.get("description", ""),
                definition=item.get("definition", ""),
                tags=item.get("tags", []),
                source=KnowledgeSource(**source_data) if source_data else None,
            )
            result.append(kp)
        return result

    def save_knowledge_points(
        self, document_id: str, knowledge_points_data: list[dict]
    ) -> None:
        """Save extracted knowledge points to JSON.

        Args:
            document_id: Document identifier.
            knowledge_points_data: List of dicts, each with id, title,
                definition, description, tags.
        """
        payload = {"knowledge_points": knowledge_points_data}
        self.repo.save_json(
            self.repo.knowledge_points_path(document_id), payload
        )
    # ── Progress ──────────────────────────────────────────────────

    def load_progress(
        self, user_id: str, document_id: str
    ) -> dict[str, MasteryRecord]:
        """Load user mastery progress from JSON."""
        raw = self.repo.load_json(
            self.repo.progress_path(user_id, document_id), default={}
        )
        mastery_records = raw.get("mastery_records", {})
        parsed: dict[str, MasteryRecord] = {}
        for kp_id, record in mastery_records.items():
            mr = MasteryRecord(knowledge_point_id=kp_id)
            mr.current_level = record.get("current_level", 1)
            mr.attempts = record.get("attempts", 0)
            mr.last_accuracy = record.get("last_accuracy", 0.0)
            mr.best_accuracy = record.get("best_accuracy", 0.0)
            mr.last_reviewed_at = record.get("last_reviewed_at")
            mr.is_weak = record.get("is_weak", False)
            mr.accuracy_history = record.get("accuracy_history", [])
            mr.next_review_at = record.get("next_review_at")
            mr.review_stage = record.get("review_stage", 0)
            parsed[kp_id] = mr
        return parsed

    def save_progress(
        self,
        user_id: str,
        document_id: str,
        records: dict[str, MasteryRecord],
    ) -> None:
        """Save user mastery progress to JSON."""
        payload = {
            "user_id": user_id,
            "document_id": document_id,
            "mastery_records": {
                kp_id: asdict(record) for kp_id, record in records.items()
            },
        }
        self.repo.save_json(
            self.repo.progress_path(user_id, document_id), payload
        )
    # ── Quiz Generation ───────────────────────────────────────────

    def generate_quiz_for_user(
        self,
        user_id: str,
        document_id: str,
        knowledge_point_ids: list[str] | None = None,
        level: int | None = None,
        num_questions: int | None = None,
    ) -> dict:
        """Generate quiz prompts for given knowledge points.

        If knowledge_point_ids is None or empty, uses all knowledge points.
        If level is None, uses the lowest current_level among the selected
        knowledge points' mastery records (first-time defaults to 1).

        Returns dict with 'prompts' (system_prompt + user_prompt),
        'knowledge_point_ids' used, and 'level'. If no knowledge points
        match, returns a dict with a single 'error' key instead.
        """
        knowledge_points = self.load_knowledge_points(document_id)
        progress = self.load_progress(user_id, document_id)

        if knowledge_point_ids:
            selected = [kp for kp in knowledge_points if kp.id in knowledge_point_ids]
        else:
            selected = knowledge_points

        if not selected:
            return {"error": "No knowledge points found"}

        # Pick one level for the whole quiz: the explicit level if given,
        # otherwise the lowest current level among the selected points.
        if level is not None:
            quiz_level = max(1, min(3, level))
        else:
            # Determine from mastery records; first-time = 1
            levels = []
            for kp in selected:
                record = progress.get(kp.id)
                if record is None:
                    levels.append(1)  # First time → L1
                else:
                    levels.append(record.current_level)
            # Use the minimum level among selected (conservative)
            quiz_level = min(levels) if levels else 1

        prompts = self.generator.generate_quiz(
            selected, level=quiz_level, num_questions=num_questions
        )
        return {
            "document_id": document_id,
            "user_id": user_id,
            "level": quiz_level,
            "knowledge_point_ids": [kp.id for kp in selected],
            "prompts": prompts,
        }
    # ── Quiz Submission ───────────────────────────────────────────

    def submit_quiz_answers(
        self,
        user_id: str,
        document_id: str,
        session_id: str,
        answers: dict[str, str],
    ) -> dict:
        """Submit answers for a quiz session, evaluate, and update mastery."""
        session_data = self.repo.load_json(
            self.repo.session_path(session_id), default=None
        )
        if not session_data:
            raise FileNotFoundError(f"Quiz session not found: {session_id}")

        questions = []
        for item in session_data["questions"]:
            questions.append(Question(**item))

        results = self.evaluator.evaluate_answers(questions, answers)
        progress = self.load_progress(user_id, document_id)
        updated = self.mastery_engine.update_mastery(progress, questions, results)
        self.save_progress(user_id, document_id, updated)

        # Calculate score (excluding short_answer with score=None)
        scored_results = [r for r in results if r.score is not None]
        score = sum(r.score for r in scored_results)
        total = len(scored_results)
        needs_review_count = sum(1 for r in results if r.needs_review)

        summary = {
            "session_id": session_id,
            "score": score,
            "total": total,
            "accuracy": score / total if total else 0.0,
            "needs_review_count": needs_review_count,
            "results": [asdict(r) for r in results],
        }

        # Update session data
        session_data["answers"] = answers
        session_data["results"] = summary["results"]
        session_data["status"] = "completed"
        self.repo.save_json(self.repo.session_path(session_id), session_data)

        return summary
    # ── Import Questions ──────────────────────────────────────────

    def import_questions(
        self,
        document_id: str,
        user_id: str,
        questions_data: list[dict],
    ) -> dict:
        """Import parsed questions and create a quiz session.

        Args:
            document_id: Document identifier.
            user_id: User identifier.
            questions_data: List of question dicts (from LLM parsing).

        Returns:
            dict with session_id and questions.
        """
        questions: list[Question] = []
        for item in questions_data:
            q = Question(
                id=item.get("id", f"q_{uuid.uuid4().hex[:8]}"),
                knowledge_point_ids=item.get("knowledge_point_ids", []),
                level=item.get("level", 1),
                type=item.get("type", "single_choice"),
                prompt=item.get("prompt", ""),
                options=item.get("options", []),
                answer=item.get("answer"),
                explanation=item.get("explanation", ""),
                source_refs=item.get("source_refs", []),
            )
            questions.append(q)

        session_id = f"quiz_{uuid.uuid4().hex[:12]}"
        kp_ids = list(
            set(kp_id for q in questions for kp_id in q.knowledge_point_ids)
        )
        level = questions[0].level if questions else 1

        session = QuizSession(
            session_id=session_id,
            user_id=user_id,
            document_id=document_id,
            level=level,
            knowledge_point_ids=kp_ids,
            questions=questions,
        )
        self.repo.save_json(
            self.repo.session_path(session_id),
            asdict(session),
        )

        return {
            "session_id": session_id,
            "document_id": document_id,
            "level": level,
            "num_questions": len(questions),
            "questions": [asdict(q) for q in questions],
        }
    # ── Review Candidates ─────────────────────────────────────────

    def get_review_candidates(
        self, user_id: str, document_id: str, today_str: str | None = None
    ) -> list[dict]:
        """Get review recommendations for a user.

        Returns list of knowledge points that need review.
        """
        from datetime import datetime

        if today_str is None:
            today_str = datetime.now().strftime("%Y-%m-%d")

        knowledge_points = self.load_knowledge_points(document_id)
        progress = self.load_progress(user_id, document_id)
        return self.planner.recommend_review(knowledge_points, progress, today_str)

    # ── User Progress ─────────────────────────────────────────────

    def get_user_progress(self, user_id: str, document_id: str) -> dict:
        """Get user's mastery progress summary."""
        progress = self.load_progress(user_id, document_id)
        return {
            "user_id": user_id,
            "document_id": document_id,
            "mastery_records": {
                kp_id: asdict(record) for kp_id, record in progress.items()
            },
        }
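
The scoring rule in `submit_quiz_answers` leaves out results whose `score` is `None` (short answers awaiting review), so they affect neither `total` nor `accuracy`. The sketch below reproduces only that arithmetic. `EvalResult` is a stand-in dataclass assumed for illustration; the real result type comes from the evaluator module, which is not shown here, and only the `score` and `needs_review` field names are taken from the code above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class EvalResult:
    """Stand-in for the evaluator's result type (assumed shape)."""
    question_id: str
    score: Optional[float]
    needs_review: bool = False


def summarize(results: list) -> dict:
    """Compute score, total, and accuracy over auto-graded results only."""
    scored = [r for r in results if r.score is not None]
    score = sum(r.score for r in scored)
    total = len(scored)
    return {
        "score": score,
        "total": total,
        "accuracy": score / total if total else 0.0,
        "needs_review_count": sum(1 for r in results if r.needs_review),
    }


results = [
    EvalResult("q_001", 1.0),
    EvalResult("q_002", 0.0),
    EvalResult("q_003", None, needs_review=True),  # short answer, not auto-graded
]
summary = summarize(results)
print(summary)
```

Here one of the two auto-graded questions is correct, so the accuracy is 0.5 even though three questions were answered.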

@@ -0,0 +1,7 @@
from __future__ import annotations

from pathlib import Path


def project_root() -> Path:
    return Path(__file__).resolve().parents[2]
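
To see what `parents[2]` resolves to, the indexing can be tried on a fixed path. This sketch assumes the module sits two directories below the skill root, as in the `src/quiz_mastery/` layout from the README; the file name `paths.py` and the `/repo` prefix are illustrative assumptions, since the diff does not show the actual file name.

```python
from pathlib import PurePosixPath

# Hypothetical location of the module inside the skill directory.
module_file = PurePosixPath("/repo/quiz-mastery/src/quiz_mastery/paths.py")

package_dir = module_file.parents[0]  # directory containing the module
src_dir = module_file.parents[1]      # one level up
root_dir = module_file.parents[2]     # two levels up: the skill root

print(package_dir)
print(src_dir)
print(root_dir)
```

Under that layout the function returns the `quiz-mastery/` directory, which is where the README places `data/` and `scripts/`.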