v0.5.0: Binary-Free Mode - No OpenCode binary required

Major Features:
- Native session management without OpenCode binary
- Provider routing: OpenCode Zen (free), Qwen OAuth, Z.AI
- Streaming chat with tool execution loop
- Mode detection API (/api/meta/mode); see the query sketch after the commit metadata below
- MCP integration fix (resolved infinite loading)
- NomadArch Native option in UI with comparison info

🆓 Free Models (No API Key):
- GPT-5 Nano (400K context)
- Grok Code Fast 1 (256K context)
- GLM-4.7 (205K context)
- Doubao Seed Code (256K context)
- Big Pickle (200K context)

📦 New Files:
- session-store.ts: Native session persistence
- native-sessions.ts: REST API for sessions
- lite-mode.ts: UI mode detection client
- native-sessions.ts (UI): SolidJS store

🔧 Updated:
- All installers: Optional binary download
- All launchers: Mode detection display
- Binary selector: Added NomadArch Native option
- README: Binary-Free Mode documentation
Author: Gemini AI
Date: 2025-12-26 02:08:13 +04:00
Parent: 8dddf4d0cf
Commit: 4bd2893864 (unverified signature)
83 changed files with 10678 additions and 1290 deletions
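
The mode detection endpoint added in this release can be queried from any client. A minimal sketch, assuming the backend listens on localhost:3000; the response shape mirrors the ModeInfo interface added to the meta routes further down.

// Query the new /api/meta/mode endpoint to decide which UI path to take.
// Host and port are assumptions; the shape below mirrors the ModeInfo interface in this commit.
interface ModeInfo {
  mode: "lite" | "full"
  binaryFreeMode: boolean
  nativeSessions: boolean
  opencodeBinaryAvailable: boolean
  providers: { qwen: boolean; zai: boolean; zen: boolean }
}

async function detectMode(baseUrl = "http://localhost:3000"): Promise<ModeInfo> {
  const res = await fetch(`${baseUrl}/api/meta/mode`)
  if (!res.ok) throw new Error(`Mode detection failed: ${res.status}`)
  return (await res.json()) as ModeInfo
}

detectMode().then((info) => {
  console.log(info.binaryFreeMode ? "Binary-Free (lite) mode" : "Full mode with OpenCode binary")
})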

View File

@@ -31,6 +31,7 @@
"fastify": "^4.28.1",
"fuzzysort": "^2.0.4",
"pino": "^9.4.0",
"ulid": "^3.0.2",
"undici": "^6.19.8",
"zod": "^3.23.8"
},

View File

@@ -42,19 +42,55 @@ export type ZenModel = z.infer<typeof ZenModelSchema>
// Chat message schema (OpenAI-compatible)
export const ChatMessageSchema = z.object({
role: z.enum(["user", "assistant", "system"]),
content: z.string()
role: z.enum(["user", "assistant", "system", "tool"]),
content: z.string().optional(),
tool_calls: z.array(z.object({
id: z.string(),
type: z.literal("function"),
function: z.object({
name: z.string(),
arguments: z.string()
})
})).optional(),
tool_call_id: z.string().optional()
})
export type ChatMessage = z.infer<typeof ChatMessageSchema>
// Chat request schema
// Tool Definition Schema
export const ToolDefinitionSchema = z.object({
type: z.literal("function"),
function: z.object({
name: z.string(),
description: z.string(),
parameters: z.object({
type: z.literal("object"),
properties: z.record(z.any()),
required: z.array(z.string()).optional()
})
})
})
export type ToolDefinition = z.infer<typeof ToolDefinitionSchema>
export const ChatRequestSchema = z.object({
model: z.string(),
messages: z.array(ChatMessageSchema),
stream: z.boolean().default(true),
temperature: z.number().optional(),
max_tokens: z.number().optional()
max_tokens: z.number().optional(),
tools: z.array(ToolDefinitionSchema).optional(),
tool_choice: z.union([
z.literal("auto"),
z.literal("none"),
z.object({
type: z.literal("function"),
function: z.object({ name: z.string() })
})
]).optional(),
workspacePath: z.string().optional(),
enableTools: z.boolean().optional()
})
export type ChatRequest = z.infer<typeof ChatRequestSchema>
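
The extended schema accepts OpenAI-style tool definitions alongside the new workspacePath and enableTools fields. A minimal sketch of a tool-enabled request; the import path and the read_file tool are illustrative assumptions, not necessarily part of CORE_TOOLS.

import { ChatRequestSchema } from "./opencode-zen" // module path is an assumption

// Validate a tool-enabled request against the extended schema above.
const toolEnabledRequest = ChatRequestSchema.parse({
  model: "gpt-5-nano",
  messages: [{ role: "user", content: "Summarize src/index.ts" }],
  stream: true,
  tools: [
    {
      type: "function",
      function: {
        name: "read_file", // illustrative tool definition
        description: "Read a file from the workspace",
        parameters: {
          type: "object",
          properties: { path: { type: "string", description: "Relative file path" } },
          required: ["path"],
        },
      },
    },
  ],
  tool_choice: "auto",
  workspacePath: "/path/to/workspace",
  enableTools: true,
})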

View File

@@ -1,8 +1,9 @@
import { z } from "zod"
import { createHmac } from "crypto"
export const ZAIConfigSchema = z.object({
apiKey: z.string().optional(),
endpoint: z.string().default("https://api.z.ai/api/paas/v4"),
endpoint: z.string().default("https://api.z.ai/api/coding/paas/v4"),
enabled: z.boolean().default(false),
timeout: z.number().default(300000)
})
@@ -10,18 +11,55 @@ export const ZAIConfigSchema = z.object({
export type ZAIConfig = z.infer<typeof ZAIConfigSchema>
export const ZAIMessageSchema = z.object({
role: z.enum(["user", "assistant", "system"]),
content: z.string()
role: z.enum(["user", "assistant", "system", "tool"]),
content: z.string().optional(),
tool_calls: z.array(z.object({
id: z.string(),
type: z.literal("function"),
function: z.object({
name: z.string(),
arguments: z.string()
})
})).optional(),
tool_call_id: z.string().optional()
})
export type ZAIMessage = z.infer<typeof ZAIMessageSchema>
// Tool Definition Schema (OpenAI-compatible)
export const ZAIToolSchema = z.object({
type: z.literal("function"),
function: z.object({
name: z.string(),
description: z.string(),
parameters: z.object({
type: z.literal("object"),
properties: z.record(z.object({
type: z.string(),
description: z.string().optional()
})),
required: z.array(z.string()).optional()
})
})
})
export type ZAITool = z.infer<typeof ZAIToolSchema>
export const ZAIChatRequestSchema = z.object({
model: z.string().default("glm-4.7"),
messages: z.array(ZAIMessageSchema),
max_tokens: z.number().default(8192),
stream: z.boolean().default(true),
temperature: z.number().optional(),
tools: z.array(ZAIToolSchema).optional(),
tool_choice: z.union([
z.literal("auto"),
z.literal("none"),
z.object({
type: z.literal("function"),
function: z.object({ name: z.string() })
})
]).optional(),
thinking: z.object({
type: z.enum(["enabled", "disabled"]).optional()
}).optional()
@@ -38,8 +76,16 @@ export const ZAIChatResponseSchema = z.object({
index: z.number(),
message: z.object({
role: z.string(),
content: z.string().optional(),
reasoning_content: z.string().optional()
content: z.string().optional().nullable(),
reasoning_content: z.string().optional(),
tool_calls: z.array(z.object({
id: z.string(),
type: z.literal("function"),
function: z.object({
name: z.string(),
arguments: z.string()
})
})).optional()
}),
finish_reason: z.string()
})),
@@ -61,8 +107,17 @@ export const ZAIStreamChunkSchema = z.object({
index: z.number(),
delta: z.object({
role: z.string().optional(),
content: z.string().optional(),
reasoning_content: z.string().optional()
content: z.string().optional().nullable(),
reasoning_content: z.string().optional(),
tool_calls: z.array(z.object({
index: z.number().optional(),
id: z.string().optional(),
type: z.literal("function").optional(),
function: z.object({
name: z.string().optional(),
arguments: z.string().optional()
}).optional()
})).optional()
}),
finish_reason: z.string().nullable().optional()
}))
@@ -106,7 +161,12 @@ export class ZAIClient {
})
})
return response.status !== 401 && response.status !== 403
if (!response.ok) {
const text = await response.text()
console.error(`Z.AI connection failed (${response.status}): ${text}`)
}
return response.ok
} catch (error) {
console.error("Z.AI connection test failed:", error)
return false
@@ -194,9 +254,52 @@ export class ZAIClient {
}
private getHeaders(): Record<string, string> {
const token = this.generateToken(this.config.apiKey!)
return {
"Content-Type": "application/json",
"Authorization": `Bearer ${this.config.apiKey}`
"Authorization": `Bearer ${token}`
}
}
private generateToken(apiKey: string, expiresIn: number = 3600): string {
try {
const [id, secret] = apiKey.split(".")
if (!id || !secret) return apiKey // Fallback or handle error
const now = Date.now()
const payload = {
api_key: id,
exp: now + expiresIn * 1000,
timestamp: now
}
const header = {
alg: "HS256",
sign_type: "SIGN"
}
const base64UrlEncode = (obj: any) => {
return Buffer.from(JSON.stringify(obj))
.toString('base64')
.replace(/\+/g, '-')
.replace(/\//g, '_')
.replace(/=+$/, '')
}
const encodedHeader = base64UrlEncode(header)
const encodedPayload = base64UrlEncode(payload)
const signature = createHmac("sha256", secret)
.update(`${encodedHeader}.${encodedPayload}`)
.digest("base64")
.replace(/\+/g, '-')
.replace(/\//g, '_')
.replace(/=+$/, '')
return `${encodedHeader}.${encodedPayload}.${signature}`
} catch (e) {
console.warn("Failed to generate JWT, using raw key", e)
return apiKey
}
}
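
The Z.AI request schema gains the same tool fields plus the optional thinking block. A minimal usage sketch, assuming the module path; model, max_tokens, and stream fall back to the schema defaults shown above.

import { ZAIChatRequestSchema } from "./zai-api" // module path is an assumption

// Validate a Z.AI request with the optional "thinking" block enabled.
// Defaults (glm-4.7, max_tokens 8192, stream true) come from the schema itself.
const zaiRequest = ZAIChatRequestSchema.parse({
  messages: [{ role: "user", content: "Explain what generateToken() builds" }],
  thinking: { type: "enabled" },
  tool_choice: "auto",
})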

View File

@@ -0,0 +1,505 @@
/**
* MCP Client - Connects to MCP (Model Context Protocol) servers
* and provides tool discovery and execution capabilities.
*
* Supports:
* - stdio-based MCP servers (command + args)
* - HTTP/SSE-based remote MCP servers
*/
import { spawn, ChildProcess } from "child_process"
import { createLogger } from "../logger"
import path from "path"
const log = createLogger({ component: "mcp-client" })
// MCP Protocol Types
export interface McpServerConfig {
command?: string
args?: string[]
env?: Record<string, string>
type?: "stdio" | "remote" | "http" | "sse" | "streamable-http"
url?: string
headers?: Record<string, string>
}
export interface McpToolDefinition {
name: string
description: string
inputSchema: {
type: "object"
properties: Record<string, { type: string; description?: string }>
required?: string[]
}
}
export interface McpToolCall {
name: string
arguments: Record<string, unknown>
}
export interface McpToolResult {
content: Array<{
type: "text" | "image" | "resource"
text?: string
data?: string
mimeType?: string
}>
isError?: boolean
}
// MCP JSON-RPC Message Types
interface JsonRpcRequest {
jsonrpc: "2.0"
id: number | string
method: string
params?: unknown
}
interface JsonRpcResponse {
jsonrpc: "2.0"
id: number | string
result?: unknown
error?: { code: number; message: string; data?: unknown }
}
/**
* MCP Client for a single server
*/
export class McpClient {
private config: McpServerConfig
private process: ChildProcess | null = null
private messageId = 0
private pendingRequests: Map<number | string, {
resolve: (value: unknown) => void
reject: (reason: unknown) => void
}> = new Map()
private buffer = ""
private tools: McpToolDefinition[] = []
private connected = false
private serverName: string
constructor(serverName: string, config: McpServerConfig) {
this.serverName = serverName
this.config = config
}
/**
* Start and connect to the MCP server
*/
async connect(): Promise<void> {
if (this.connected) return
if (this.config.type === "remote" || this.config.type === "http" || this.config.type === "sse") {
// HTTP-based server - just mark as connected
this.connected = true
log.info({ server: this.serverName, type: this.config.type }, "Connected to remote MCP server")
return
}
// Stdio-based server
if (!this.config.command) {
throw new Error(`MCP server ${this.serverName} has no command configured`)
}
log.info({ server: this.serverName, command: this.config.command, args: this.config.args }, "Starting MCP server")
this.process = spawn(this.config.command, this.config.args || [], {
stdio: ["pipe", "pipe", "pipe"],
env: { ...process.env, ...this.config.env },
shell: true
})
this.process.stdout?.on("data", (data) => this.handleData(data.toString()))
this.process.stderr?.on("data", (data) => log.warn({ server: this.serverName }, `MCP stderr: ${data}`))
this.process.on("error", (err) => log.error({ server: this.serverName, error: err }, "MCP process error"))
this.process.on("exit", (code) => {
log.info({ server: this.serverName, code }, "MCP process exited")
this.connected = false
})
// Wait for process to start
await new Promise(resolve => setTimeout(resolve, 500))
// Initialize the server
try {
await this.sendRequest("initialize", {
protocolVersion: "2024-11-05",
capabilities: { tools: {} },
clientInfo: { name: "NomadArch", version: "0.4.0" }
})
await this.sendRequest("notifications/initialized", {})
this.connected = true
log.info({ server: this.serverName }, "MCP server initialized")
} catch (error) {
log.error({ server: this.serverName, error }, "Failed to initialize MCP server")
this.disconnect()
throw error
}
}
/**
* Disconnect from the MCP server
*/
disconnect(): void {
if (this.process) {
this.process.kill()
this.process = null
}
this.connected = false
this.tools = []
this.pendingRequests.clear()
}
/**
* List available tools from this MCP server
*/
async listTools(): Promise<McpToolDefinition[]> {
if (!this.connected) {
await this.connect()
}
if (this.config.type === "remote" || this.config.type === "http") {
// For HTTP servers, fetch tools via HTTP
return this.fetchToolsHttp()
}
try {
const response = await this.sendRequest("tools/list", {}) as { tools?: McpToolDefinition[] }
this.tools = response.tools || []
return this.tools
} catch (error) {
log.error({ server: this.serverName, error }, "Failed to list MCP tools")
return []
}
}
/**
* Execute a tool on this MCP server
*/
async executeTool(name: string, args: Record<string, unknown>): Promise<McpToolResult> {
if (!this.connected) {
await this.connect()
}
log.info({ server: this.serverName, tool: name, args }, "Executing MCP tool")
if (this.config.type === "remote" || this.config.type === "http") {
return this.executeToolHttp(name, args)
}
try {
const response = await this.sendRequest("tools/call", { name, arguments: args }) as McpToolResult
return response
} catch (error) {
log.error({ server: this.serverName, tool: name, error }, "MCP tool execution failed")
return {
content: [{ type: "text", text: `Error: ${error instanceof Error ? error.message : String(error)}` }],
isError: true
}
}
}
/**
* Send a JSON-RPC request to the MCP server
*/
private async sendRequest(method: string, params?: unknown): Promise<unknown> {
if (!this.process?.stdin) {
throw new Error("MCP server not running")
}
const id = ++this.messageId
const request: JsonRpcRequest = {
jsonrpc: "2.0",
id,
method,
params
}
return new Promise((resolve, reject) => {
this.pendingRequests.set(id, { resolve, reject })
const message = JSON.stringify(request) + "\n"
this.process!.stdin!.write(message)
// Timeout after 30 seconds
setTimeout(() => {
if (this.pendingRequests.has(id)) {
this.pendingRequests.delete(id)
reject(new Error(`MCP request timeout: ${method}`))
}
}, 30000)
})
}
/**
* Handle incoming data from the MCP server
*/
private handleData(data: string): void {
this.buffer += data
const lines = this.buffer.split("\n")
this.buffer = lines.pop() || ""
for (const line of lines) {
if (!line.trim()) continue
try {
const message = JSON.parse(line) as JsonRpcResponse
if (message.id !== undefined && this.pendingRequests.has(message.id)) {
const pending = this.pendingRequests.get(message.id)!
this.pendingRequests.delete(message.id)
if (message.error) {
pending.reject(new Error(message.error.message))
} else {
pending.resolve(message.result)
}
}
} catch (e) {
log.warn({ server: this.serverName }, `Failed to parse MCP message: ${line}`)
}
}
}
/**
* Fetch tools from HTTP-based MCP server
*/
private async fetchToolsHttp(): Promise<McpToolDefinition[]> {
if (!this.config.url) return []
try {
const response = await fetch(`${this.config.url}/tools/list`, {
method: "POST",
headers: {
"Content-Type": "application/json",
...this.config.headers
},
body: JSON.stringify({ jsonrpc: "2.0", id: 1, method: "tools/list", params: {} })
})
if (!response.ok) {
throw new Error(`HTTP ${response.status}`)
}
const data = await response.json() as JsonRpcResponse
const result = data.result as { tools?: McpToolDefinition[] }
return result.tools || []
} catch (error) {
log.error({ server: this.serverName, error }, "Failed to fetch HTTP MCP tools")
return []
}
}
/**
* Execute tool on HTTP-based MCP server
*/
private async executeToolHttp(name: string, args: Record<string, unknown>): Promise<McpToolResult> {
if (!this.config.url) {
return { content: [{ type: "text", text: "No URL configured" }], isError: true }
}
try {
const response = await fetch(`${this.config.url}/tools/call`, {
method: "POST",
headers: {
"Content-Type": "application/json",
...this.config.headers
},
body: JSON.stringify({
jsonrpc: "2.0",
id: 1,
method: "tools/call",
params: { name, arguments: args }
})
})
if (!response.ok) {
throw new Error(`HTTP ${response.status}`)
}
const data = await response.json() as JsonRpcResponse
return data.result as McpToolResult
} catch (error) {
return {
content: [{ type: "text", text: `HTTP error: ${error instanceof Error ? error.message : String(error)}` }],
isError: true
}
}
}
isConnected(): boolean {
return this.connected
}
getServerName(): string {
return this.serverName
}
}
/**
* MCP Manager - Manages multiple MCP server connections
*/
export class McpManager {
private clients: Map<string, McpClient> = new Map()
private configPath: string | null = null
/**
* Load MCP config from a workspace
*/
async loadConfig(workspacePath: string): Promise<void> {
const configPath = path.join(workspacePath, ".mcp.json")
this.configPath = configPath
try {
const fs = await import("fs")
if (!fs.existsSync(configPath)) {
log.info({ path: configPath }, "No MCP config found")
return
}
const content = fs.readFileSync(configPath, "utf-8")
const config = JSON.parse(content) as { mcpServers?: Record<string, McpServerConfig> }
if (config.mcpServers) {
for (const [name, serverConfig] of Object.entries(config.mcpServers)) {
this.addServer(name, serverConfig)
}
}
log.info({ servers: Object.keys(config.mcpServers || {}) }, "Loaded MCP config")
} catch (error) {
log.error({ path: configPath, error }, "Failed to load MCP config")
}
}
/**
* Add an MCP server
*/
addServer(name: string, config: McpServerConfig): void {
if (this.clients.has(name)) {
this.clients.get(name)!.disconnect()
}
this.clients.set(name, new McpClient(name, config))
log.info({ server: name }, "Added MCP server")
}
/**
* Remove an MCP server
*/
removeServer(name: string): void {
const client = this.clients.get(name)
if (client) {
client.disconnect()
this.clients.delete(name)
}
}
/**
* Get all available tools from all connected servers
*/
async getAllTools(): Promise<Array<McpToolDefinition & { serverName: string }>> {
const allTools: Array<McpToolDefinition & { serverName: string }> = []
for (const [name, client] of this.clients) {
try {
const tools = await client.listTools()
for (const tool of tools) {
allTools.push({ ...tool, serverName: name })
}
} catch (error) {
log.warn({ server: name, error }, "Failed to get tools from MCP server")
}
}
return allTools
}
/**
* Convert MCP tools to OpenAI-compatible format
*/
async getToolsAsOpenAIFormat(): Promise<Array<{
type: "function"
function: {
name: string
description: string
parameters: McpToolDefinition["inputSchema"]
}
}>> {
const mcpTools = await this.getAllTools()
return mcpTools.map(tool => ({
type: "function" as const,
function: {
// Prefix with server name to avoid conflicts
name: `mcp_${tool.serverName}_${tool.name}`,
description: `[MCP: ${tool.serverName}] ${tool.description}`,
parameters: tool.inputSchema
}
}))
}
/**
* Execute a tool by its full name (mcp_servername_toolname)
*/
async executeTool(fullName: string, args: Record<string, unknown>): Promise<string> {
// Parse mcp_servername_toolname format
const match = fullName.match(/^mcp_([^_]+)_(.+)$/)
if (!match) {
return `Error: Invalid MCP tool name format: ${fullName}`
}
const [, serverName, toolName] = match
const client = this.clients.get(serverName)
if (!client) {
return `Error: MCP server not found: ${serverName}`
}
const result = await client.executeTool(toolName, args)
// Convert result to string
const texts = result.content
.filter(c => c.type === "text" && c.text)
.map(c => c.text!)
return texts.join("\n") || (result.isError ? "Tool execution failed" : "Tool executed successfully")
}
/**
* Disconnect all servers
*/
disconnectAll(): void {
for (const client of this.clients.values()) {
client.disconnect()
}
this.clients.clear()
}
/**
* Get status of all servers
*/
getStatus(): Record<string, { connected: boolean }> {
const status: Record<string, { connected: boolean }> = {}
for (const [name, client] of this.clients) {
status[name] = { connected: client.isConnected() }
}
return status
}
}
// Singleton instance
let globalMcpManager: McpManager | null = null
export function getMcpManager(): McpManager {
if (!globalMcpManager) {
globalMcpManager = new McpManager()
}
return globalMcpManager
}
export function resetMcpManager(): void {
if (globalMcpManager) {
globalMcpManager.disconnectAll()
globalMcpManager = null
}
}

View File

@@ -0,0 +1,15 @@
/**
* MCP Module Index
* Exports MCP client and manager for external MCP server integration.
*/
export {
McpClient,
McpManager,
getMcpManager,
resetMcpManager,
type McpServerConfig,
type McpToolDefinition,
type McpToolCall,
type McpToolResult
} from "./client"
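
Callers interact with the MCP layer through the manager singleton re-exported here. A minimal sketch, assuming a workspace directory that may contain a .mcp.json.

import { getMcpManager } from "./mcp" // resolved via the new mcp/index.ts above

// List every MCP tool configured for a workspace, in the OpenAI-compatible
// shape the chat routes feed to the providers. The workspace path is a placeholder.
async function listWorkspaceMcpTools(workspacePath: string) {
  const manager = getMcpManager()
  await manager.loadConfig(workspacePath) // reads <workspace>/.mcp.json if present
  const tools = await manager.getToolsAsOpenAIFormat()
  for (const tool of tools) {
    // Names arrive prefixed as mcp_<server>_<tool> to avoid collisions
    console.log(tool.function.name, "-", tool.function.description)
  }
  return tools
}

listWorkspaceMcpTools("/path/to/workspace").catch(console.error)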

View File

@@ -24,6 +24,8 @@ import { registerZAIRoutes } from "./routes/zai"
import { registerOpenCodeZenRoutes } from "./routes/opencode-zen"
import { registerSkillsRoutes } from "./routes/skills"
import { registerContextEngineRoutes } from "./routes/context-engine"
import { registerNativeSessionsRoutes } from "./routes/native-sessions"
import { initSessionManager } from "../storage/session-store"
import { ServerMeta } from "../api-types"
import { InstanceStore } from "../storage/instance-store"
@@ -40,6 +42,7 @@ interface HttpServerDeps {
uiStaticDir: string
uiDevServerUrl?: string
logger: Logger
dataDir?: string // For session storage
}
interface HttpServerStartResult {
@@ -56,6 +59,10 @@ export function createHttpServer(deps: HttpServerDeps) {
const apiLogger = deps.logger.child({ component: "http" })
const sseLogger = deps.logger.child({ component: "sse" })
// Initialize session manager for Binary-Free Mode
const dataDir = deps.dataDir || path.join(process.cwd(), ".codenomad-data")
initSessionManager(dataDir)
const sseClients = new Set<() => void>()
const registerSseClient = (cleanup: () => void) => {
sseClients.add(cleanup)
@@ -126,6 +133,15 @@ export function createHttpServer(deps: HttpServerDeps) {
registerOpenCodeZenRoutes(app, { logger: deps.logger })
registerSkillsRoutes(app)
registerContextEngineRoutes(app)
// Register Binary-Free Mode native sessions routes
registerNativeSessionsRoutes(app, {
logger: deps.logger,
workspaceManager: deps.workspaceManager,
dataDir,
eventBus: deps.eventBus,
})
registerInstanceProxyRoutes(app, { workspaceManager: deps.workspaceManager, logger: proxyLogger })

View File

@@ -1,5 +1,6 @@
import { FastifyInstance } from "fastify"
import os from "os"
import { existsSync } from "fs"
import { NetworkAddress, ServerMeta, PortAvailabilityResponse } from "../../api-types"
import { getAvailablePort } from "../../utils/port"
@@ -7,8 +8,54 @@ interface RouteDeps {
serverMeta: ServerMeta
}
export interface ModeInfo {
mode: "lite" | "full"
binaryFreeMode: boolean
nativeSessions: boolean
opencodeBinaryAvailable: boolean
providers: {
qwen: boolean
zai: boolean
zen: boolean
}
}
export function registerMetaRoutes(app: FastifyInstance, deps: RouteDeps) {
app.get("/api/meta", async () => buildMetaResponse(deps.serverMeta))
// Mode detection endpoint for Binary-Free Mode
app.get("/api/meta/mode", async (): Promise<ModeInfo> => {
// Check if any OpenCode binary is available
const opencodePaths = [
process.env.OPENCODE_PATH,
"opencode",
"opencode.exe",
].filter(Boolean) as string[]
let binaryAvailable = false
for (const p of opencodePaths) {
if (existsSync(p)) {
binaryAvailable = true
break
}
}
// In Binary-Free Mode, we use native session management
const binaryFreeMode = !binaryAvailable
return {
mode: binaryFreeMode ? "lite" : "full",
binaryFreeMode,
nativeSessions: true, // Native sessions are always available
opencodeBinaryAvailable: binaryAvailable,
providers: {
qwen: true, // Always available
zai: true, // Always available
zen: true, // Always available (needs API key)
}
}
})
app.get("/api/ports/available", async () => {
const port = await getAvailablePort(3000)
const response: PortAvailabilityResponse = { port }

View File

@@ -0,0 +1,629 @@
/**
* Native Sessions API Routes - Binary-Free Mode
*
* These routes provide session management without requiring the OpenCode binary.
* They're used when running in "Lite Mode" or when OpenCode is unavailable.
*/
import { FastifyInstance } from "fastify"
import { Logger } from "../../logger"
import { getSessionManager, Session, SessionMessage } from "../../storage/session-store"
import { CORE_TOOLS, executeTools, type ToolCall, type ToolResult } from "../../tools/executor"
import { getMcpManager } from "../../mcp/client"
import { WorkspaceManager } from "../../workspaces/manager"
import { OpenCodeZenClient, ChatMessage } from "../../integrations/opencode-zen"
import { EventBus } from "../../events/bus"
interface NativeSessionsDeps {
logger: Logger
workspaceManager: WorkspaceManager
dataDir: string
eventBus?: EventBus
}
// Maximum tool execution loops to prevent infinite loops
const MAX_TOOL_LOOPS = 10
export function registerNativeSessionsRoutes(app: FastifyInstance, deps: NativeSessionsDeps) {
const logger = deps.logger.child({ component: "native-sessions" })
const sessionManager = getSessionManager(deps.dataDir)
// List all sessions for a workspace
app.get<{ Params: { workspaceId: string } }>("/api/native/workspaces/:workspaceId/sessions", async (request, reply) => {
try {
const sessions = await sessionManager.listSessions(request.params.workspaceId)
return { sessions }
} catch (error) {
logger.error({ error }, "Failed to list sessions")
reply.code(500)
return { error: "Failed to list sessions" }
}
})
// Create a new session
app.post<{
Params: { workspaceId: string }
Body: { title?: string; parentId?: string; model?: { providerId: string; modelId: string }; agent?: string }
}>("/api/native/workspaces/:workspaceId/sessions", async (request, reply) => {
try {
const session = await sessionManager.createSession(request.params.workspaceId, request.body)
// Emit session created event (using any for custom event type)
if (deps.eventBus) {
deps.eventBus.publish({
type: "native.session.created",
workspaceId: request.params.workspaceId,
session
} as any)
}
reply.code(201)
return { session }
} catch (error) {
logger.error({ error }, "Failed to create session")
reply.code(500)
return { error: "Failed to create session" }
}
})
// Get a specific session
app.get<{ Params: { workspaceId: string; sessionId: string } }>("/api/native/workspaces/:workspaceId/sessions/:sessionId", async (request, reply) => {
try {
const session = await sessionManager.getSession(request.params.workspaceId, request.params.sessionId)
if (!session) {
reply.code(404)
return { error: "Session not found" }
}
return { session }
} catch (error) {
logger.error({ error }, "Failed to get session")
reply.code(500)
return { error: "Failed to get session" }
}
})
// Update a session
app.patch<{
Params: { workspaceId: string; sessionId: string }
Body: Partial<Session>
}>("/api/native/workspaces/:workspaceId/sessions/:sessionId", async (request, reply) => {
try {
const session = await sessionManager.updateSession(
request.params.workspaceId,
request.params.sessionId,
request.body
)
if (!session) {
reply.code(404)
return { error: "Session not found" }
}
return { session }
} catch (error) {
logger.error({ error }, "Failed to update session")
reply.code(500)
return { error: "Failed to update session" }
}
})
// Delete a session
app.delete<{ Params: { workspaceId: string; sessionId: string } }>("/api/native/workspaces/:workspaceId/sessions/:sessionId", async (request, reply) => {
try {
const deleted = await sessionManager.deleteSession(request.params.workspaceId, request.params.sessionId)
if (!deleted) {
reply.code(404)
return { error: "Session not found" }
}
reply.code(204)
return
} catch (error) {
logger.error({ error }, "Failed to delete session")
reply.code(500)
return { error: "Failed to delete session" }
}
})
// Get messages for a session
app.get<{ Params: { workspaceId: string; sessionId: string } }>("/api/native/workspaces/:workspaceId/sessions/:sessionId/messages", async (request, reply) => {
try {
const messages = await sessionManager.getSessionMessages(
request.params.workspaceId,
request.params.sessionId
)
return { messages }
} catch (error) {
logger.error({ error }, "Failed to get messages")
reply.code(500)
return { error: "Failed to get messages" }
}
})
// Add a message (user prompt) and get streaming response
app.post<{
Params: { workspaceId: string; sessionId: string }
Body: {
content: string
provider: "qwen" | "zai" | "zen"
model?: string
accessToken?: string
resourceUrl?: string
enableTools?: boolean
systemPrompt?: string
}
}>("/api/native/workspaces/:workspaceId/sessions/:sessionId/prompt", async (request, reply) => {
const { workspaceId, sessionId } = request.params
const { content, provider, model, accessToken, resourceUrl, enableTools = true, systemPrompt } = request.body
try {
// Add user message
const userMessage = await sessionManager.addMessage(workspaceId, sessionId, {
role: "user",
content,
status: "completed",
})
// Get workspace path
const workspace = deps.workspaceManager.get(workspaceId)
const workspacePath = workspace?.path ?? process.cwd()
// Get all messages for context
const allMessages = await sessionManager.getSessionMessages(workspaceId, sessionId)
// Build chat messages array
const chatMessages: ChatMessage[] = []
// Add system prompt if provided
if (systemPrompt) {
chatMessages.push({ role: "system", content: systemPrompt })
}
// Add conversation history
for (const m of allMessages) {
if (m.role === "user" || m.role === "assistant" || m.role === "system") {
chatMessages.push({ role: m.role, content: m.content ?? "" })
}
}
// Load MCP tools
let allTools = [...CORE_TOOLS]
if (enableTools) {
try {
const mcpManager = getMcpManager()
await mcpManager.loadConfig(workspacePath)
const mcpTools = await mcpManager.getToolsAsOpenAIFormat()
allTools = [...CORE_TOOLS, ...mcpTools]
} catch (mcpError) {
logger.warn({ error: mcpError }, "Failed to load MCP tools")
}
}
// Create streaming response
reply.raw.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no',
})
// Create assistant message placeholder
const assistantMessage = await sessionManager.addMessage(workspaceId, sessionId, {
role: "assistant",
content: "",
status: "streaming",
})
let fullContent = ""
try {
// Route to the appropriate provider
fullContent = await streamWithProvider({
provider,
model,
accessToken,
resourceUrl,
messages: chatMessages,
tools: enableTools ? allTools : [],
workspacePath,
rawResponse: reply.raw,
logger,
})
} catch (streamError) {
logger.error({ error: streamError }, "Stream error")
reply.raw.write(`data: ${JSON.stringify({ error: String(streamError) })}\n\n`)
}
// Update assistant message with full content
await sessionManager.updateMessage(workspaceId, assistantMessage.id, {
content: fullContent,
status: "completed",
})
// Emit message event (using any for custom event type)
if (deps.eventBus) {
deps.eventBus.publish({
type: "native.message.completed",
workspaceId,
sessionId,
messageId: assistantMessage.id,
} as any)
}
reply.raw.write('data: [DONE]\n\n')
reply.raw.end()
} catch (error) {
logger.error({ error }, "Failed to process prompt")
if (!reply.sent) {
reply.code(500)
return { error: "Failed to process prompt" }
}
}
})
// SSE endpoint for session events
app.get<{ Params: { workspaceId: string } }>("/api/native/workspaces/:workspaceId/events", async (request, reply) => {
reply.raw.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no',
})
// Send initial ping
reply.raw.write(`data: ${JSON.stringify({ type: "ping" })}\n\n`)
// Keep connection alive
const keepAlive = setInterval(() => {
reply.raw.write(`data: ${JSON.stringify({ type: "ping" })}\n\n`)
}, 30000)
// Handle client disconnect
request.raw.on("close", () => {
clearInterval(keepAlive)
})
})
logger.info("Native sessions routes registered (Binary-Free Mode)")
}
/**
* Stream chat with the appropriate provider
*/
async function streamWithProvider(opts: {
provider: "qwen" | "zai" | "zen"
model?: string
accessToken?: string
resourceUrl?: string
messages: ChatMessage[]
tools: any[]
workspacePath: string
rawResponse: any
logger: Logger
}): Promise<string> {
const { provider, model, accessToken, resourceUrl, messages, tools, workspacePath, rawResponse, logger } = opts
let fullContent = ""
let loopCount = 0
let currentMessages = [...messages]
// Tool execution loop
while (loopCount < MAX_TOOL_LOOPS) {
loopCount++
let responseContent = ""
let toolCalls: ToolCall[] = []
// Route to the appropriate provider
switch (provider) {
case "zen":
const zenResult = await streamWithZen(model, currentMessages, tools, rawResponse, logger)
responseContent = zenResult.content
toolCalls = zenResult.toolCalls
break
case "qwen":
const qwenResult = await streamWithQwen(accessToken, resourceUrl, model, currentMessages, tools, rawResponse, logger)
responseContent = qwenResult.content
toolCalls = qwenResult.toolCalls
break
case "zai":
const zaiResult = await streamWithZAI(accessToken, model, currentMessages, tools, rawResponse, logger)
responseContent = zaiResult.content
toolCalls = zaiResult.toolCalls
break
}
fullContent += responseContent
// If no tool calls, we're done
if (toolCalls.length === 0) {
break
}
// Execute tools
logger.info({ toolCount: toolCalls.length }, "Executing tool calls")
// Add assistant message with tool calls
currentMessages.push({
role: "assistant",
content: responseContent,
tool_calls: toolCalls.map(tc => ({
id: tc.id,
type: "function" as const,
function: tc.function
}))
})
// Execute each tool and add result
const toolResults = await executeTools(workspacePath, toolCalls)
for (let i = 0; i < toolCalls.length; i++) {
const tc = toolCalls[i]
const result = toolResults[i]
// Emit tool execution event
rawResponse.write(`data: ${JSON.stringify({
type: "tool_execution",
tool: tc.function.name,
result: result?.content?.substring(0, 200) // Preview
})}\n\n`)
currentMessages.push({
role: "tool",
content: result?.content ?? "Tool execution failed",
tool_call_id: tc.id
})
}
}
return fullContent
}
/**
* Stream with OpenCode Zen (free models)
*/
async function streamWithZen(
model: string | undefined,
messages: ChatMessage[],
tools: any[],
rawResponse: any,
logger: Logger
): Promise<{ content: string; toolCalls: ToolCall[] }> {
const zenClient = new OpenCodeZenClient()
let content = ""
const toolCalls: ToolCall[] = []
try {
const stream = zenClient.chatStream({
model: model ?? "gpt-5-nano",
messages,
stream: true,
tools: tools.length > 0 ? tools : undefined,
tool_choice: tools.length > 0 ? "auto" : undefined,
})
for await (const chunk of stream) {
const delta = chunk.choices?.[0]?.delta
if (delta?.content) {
content += delta.content
rawResponse.write(`data: ${JSON.stringify({ choices: [{ delta: { content: delta.content } }] })}\n\n`)
}
// Handle tool calls (if model supports them)
const deltaToolCalls = (delta as any)?.tool_calls
if (deltaToolCalls) {
for (const tc of deltaToolCalls) {
if (tc.function?.name) {
toolCalls.push({
id: tc.id,
type: "function",
function: {
name: tc.function.name,
arguments: tc.function.arguments ?? "{}"
}
})
}
}
}
}
} catch (error) {
logger.error({ error }, "Zen streaming error")
throw error
}
return { content, toolCalls }
}
/**
* Stream with Qwen API
*/
async function streamWithQwen(
accessToken: string | undefined,
resourceUrl: string | undefined,
model: string | undefined,
messages: ChatMessage[],
tools: any[],
rawResponse: any,
logger: Logger
): Promise<{ content: string; toolCalls: ToolCall[] }> {
if (!accessToken) {
throw new Error("Qwen access token required. Please authenticate with Qwen first.")
}
const baseUrl = resourceUrl ?? "https://chat.qwen.ai"
let content = ""
const toolCalls: ToolCall[] = []
try {
const response = await fetch(`${baseUrl}/api/v1/chat/completions`, {
method: "POST",
headers: {
"Content-Type": "application/json",
"Authorization": `Bearer ${accessToken}`,
},
body: JSON.stringify({
model: model ?? "qwen-plus-latest",
messages,
stream: true,
tools: tools.length > 0 ? tools : undefined,
tool_choice: tools.length > 0 ? "auto" : undefined,
})
})
if (!response.ok) {
const error = await response.text()
throw new Error(`Qwen API error: ${response.status} - ${error}`)
}
const reader = response.body?.getReader()
if (!reader) throw new Error("No response body")
const decoder = new TextDecoder()
let buffer = ""
while (true) {
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split("\n")
buffer = lines.pop() ?? ""
for (const line of lines) {
if (line.startsWith("data: ")) {
const data = line.slice(6)
if (data === "[DONE]") continue
try {
const parsed = JSON.parse(data)
const delta = parsed.choices?.[0]?.delta
if (delta?.content) {
content += delta.content
rawResponse.write(`data: ${JSON.stringify({ choices: [{ delta: { content: delta.content } }] })}\n\n`)
}
if (delta?.tool_calls) {
for (const tc of delta.tool_calls) {
if (tc.function?.name) {
toolCalls.push({
id: tc.id ?? `call_${Date.now()}`,
type: "function",
function: {
name: tc.function.name,
arguments: tc.function.arguments ?? "{}"
}
})
}
}
}
} catch {
// Skip invalid JSON
}
}
}
}
} catch (error) {
logger.error({ error }, "Qwen streaming error")
throw error
}
return { content, toolCalls }
}
/**
* Stream with Z.AI API
*/
async function streamWithZAI(
accessToken: string | undefined,
model: string | undefined,
messages: ChatMessage[],
tools: any[],
rawResponse: any,
logger: Logger
): Promise<{ content: string; toolCalls: ToolCall[] }> {
let content = ""
const toolCalls: ToolCall[] = []
const baseUrl = "https://api.z.ai"
try {
const headers: Record<string, string> = {
"Content-Type": "application/json",
}
if (accessToken) {
headers["Authorization"] = `Bearer ${accessToken}`
}
const response = await fetch(`${baseUrl}/v1/chat/completions`, {
method: "POST",
headers,
body: JSON.stringify({
model: model ?? "z1-mini",
messages,
stream: true,
tools: tools.length > 0 ? tools : undefined,
tool_choice: tools.length > 0 ? "auto" : undefined,
})
})
if (!response.ok) {
const error = await response.text()
throw new Error(`Z.AI API error: ${response.status} - ${error}`)
}
const reader = response.body?.getReader()
if (!reader) throw new Error("No response body")
const decoder = new TextDecoder()
let buffer = ""
while (true) {
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split("\n")
buffer = lines.pop() ?? ""
for (const line of lines) {
if (line.startsWith("data: ")) {
const data = line.slice(6)
if (data === "[DONE]") continue
try {
const parsed = JSON.parse(data)
const delta = parsed.choices?.[0]?.delta
if (delta?.content) {
content += delta.content
rawResponse.write(`data: ${JSON.stringify({ choices: [{ delta: { content: delta.content } }] })}\n\n`)
}
if (delta?.tool_calls) {
for (const tc of delta.tool_calls) {
if (tc.function?.name) {
toolCalls.push({
id: tc.id ?? `call_${Date.now()}`,
type: "function",
function: {
name: tc.function.name,
arguments: tc.function.arguments ?? "{}"
}
})
}
}
}
} catch {
// Skip invalid JSON
}
}
}
}
} catch (error) {
logger.error({ error }, "Z.AI streaming error")
throw error
}
return { content, toolCalls }
}
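
Putting the native routes together, a client can create a session and stream a prompt like this. A minimal sketch, assuming the backend on localhost:3000, an existing workspace id, and a session object exposing an id field; SSE parsing is simplified and tool events are ignored.

// Create a session and stream a prompt through the native (Binary-Free) routes.
async function runNativePrompt(workspaceId: string, prompt: string): Promise<string> {
  const base = "http://localhost:3000" // assumption

  const created = await fetch(`${base}/api/native/workspaces/${workspaceId}/sessions`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ title: "Demo session" }),
  })
  const { session } = (await created.json()) as { session: { id: string } } // id field is an assumption

  const res = await fetch(`${base}/api/native/workspaces/${workspaceId}/sessions/${session.id}/prompt`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ content: prompt, provider: "zen", model: "gpt-5-nano", enableTools: true }),
  })

  const reader = res.body!.getReader()
  const decoder = new TextDecoder()
  let output = ""
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    for (const line of decoder.decode(value, { stream: true }).split("\n")) {
      if (!line.startsWith("data: ") || line.includes("[DONE]")) continue
      try {
        const chunk = JSON.parse(line.slice(6))
        output += chunk.choices?.[0]?.delta?.content ?? ""
      } catch {
        // keep-alive pings, tool events, and partial lines are ignored in this sketch
      }
    }
  }
  return output
}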

View File

@@ -1,11 +1,16 @@
import { FastifyInstance } from "fastify"
import { OpenCodeZenClient, type ChatRequest, getDefaultZenConfig } from "../../integrations/opencode-zen"
import { OpenCodeZenClient, type ChatRequest, getDefaultZenConfig, type ChatMessage } from "../../integrations/opencode-zen"
import { Logger } from "../../logger"
import { CORE_TOOLS, executeTools, type ToolCall, type ToolResult } from "../../tools/executor"
import { getMcpManager } from "../../mcp/client"
interface OpenCodeZenRouteDeps {
logger: Logger
}
// Maximum number of tool execution loops
const MAX_TOOL_LOOPS = 10
export async function registerOpenCodeZenRoutes(
app: FastifyInstance,
deps: OpenCodeZenRouteDeps
@@ -49,12 +54,25 @@ export async function registerOpenCodeZenRoutes(
}
})
// Chat completion endpoint
// Chat completion endpoint WITH MCP TOOL SUPPORT
app.post('/api/opencode-zen/chat', async (request, reply) => {
try {
const chatRequest = request.body as ChatRequest
const chatRequest = request.body as ChatRequest & {
workspacePath?: string
enableTools?: boolean
}
// Handle streaming
// Extract workspace path for tool execution
const workspacePath = chatRequest.workspacePath || process.cwd()
const enableTools = chatRequest.enableTools !== false
logger.info({
workspacePath,
receivedWorkspacePath: chatRequest.workspacePath,
enableTools
}, "OpenCode Zen chat request received")
// Handle streaming with tool loop
if (chatRequest.stream) {
reply.raw.writeHead(200, {
'Content-Type': 'text/event-stream',
@@ -63,16 +81,14 @@ export async function registerOpenCodeZenRoutes(
})
try {
for await (const chunk of client.chatStream(chatRequest)) {
reply.raw.write(`data: ${JSON.stringify(chunk)}\n\n`)
// Check for finish
if (chunk.choices?.[0]?.finish_reason) {
reply.raw.write('data: [DONE]\n\n')
break
}
}
await streamWithToolLoop(
client,
chatRequest,
workspacePath,
enableTools,
reply.raw,
logger
)
reply.raw.end()
} catch (streamError) {
logger.error({ error: streamError }, "OpenCode Zen streaming failed")
@@ -80,7 +96,14 @@ export async function registerOpenCodeZenRoutes(
reply.raw.end()
}
} else {
const response = await client.chat(chatRequest)
// Non-streaming with tool loop
const response = await chatWithToolLoop(
client,
chatRequest,
workspacePath,
enableTools,
logger
)
return response
}
} catch (error) {
@@ -89,5 +112,213 @@ export async function registerOpenCodeZenRoutes(
}
})
logger.info("OpenCode Zen routes registered - Free models available!")
logger.info("OpenCode Zen routes registered with MCP tool support - Free models available!")
}
/**
* Streaming chat with tool execution loop
*/
async function streamWithToolLoop(
client: OpenCodeZenClient,
request: ChatRequest,
workspacePath: string,
enableTools: boolean,
rawResponse: any,
logger: Logger
): Promise<void> {
let messages = [...request.messages]
let loopCount = 0
// Load MCP tools from workspace config
let allTools = [...CORE_TOOLS]
if (enableTools && workspacePath) {
try {
const mcpManager = getMcpManager()
await mcpManager.loadConfig(workspacePath)
const mcpTools = await mcpManager.getToolsAsOpenAIFormat()
allTools = [...CORE_TOOLS, ...mcpTools]
if (mcpTools.length > 0) {
logger.info({ mcpToolCount: mcpTools.length }, "Loaded MCP tools for OpenCode Zen")
}
} catch (mcpError) {
logger.warn({ error: mcpError }, "Failed to load MCP tools")
}
}
// Inject tools if enabled
const requestWithTools: ChatRequest = {
...request,
tools: enableTools ? allTools : undefined,
tool_choice: enableTools ? "auto" : undefined
}
while (loopCount < MAX_TOOL_LOOPS) {
loopCount++
// Accumulate tool calls from stream
let accumulatedToolCalls: { [index: number]: { id: string; name: string; arguments: string } } = {}
let hasToolCalls = false
let textContent = ""
// Stream response
for await (const chunk of client.chatStream({ ...requestWithTools, messages })) {
// Write chunk to client
rawResponse.write(`data: ${JSON.stringify(chunk)}\n\n`)
const choice = chunk.choices[0]
if (!choice) continue
// Accumulate text content
if (choice.delta?.content) {
textContent += choice.delta.content
}
// Accumulate tool calls from delta (if API supports it)
const deltaToolCalls = (choice.delta as any)?.tool_calls
if (deltaToolCalls) {
hasToolCalls = true
for (const tc of deltaToolCalls) {
const idx = tc.index ?? 0
if (!accumulatedToolCalls[idx]) {
accumulatedToolCalls[idx] = { id: tc.id || "", name: "", arguments: "" }
}
if (tc.id) accumulatedToolCalls[idx].id = tc.id
if (tc.function?.name) accumulatedToolCalls[idx].name += tc.function.name
if (tc.function?.arguments) accumulatedToolCalls[idx].arguments += tc.function.arguments
}
}
// Check if we should stop
if (choice.finish_reason === "stop") {
rawResponse.write('data: [DONE]\n\n')
return
}
}
// If no tool calls, we're done
if (!hasToolCalls || !enableTools) {
rawResponse.write('data: [DONE]\n\n')
return
}
// Convert accumulated tool calls
const toolCalls: ToolCall[] = Object.values(accumulatedToolCalls).map(tc => ({
id: tc.id,
type: "function" as const,
function: {
name: tc.name,
arguments: tc.arguments
}
}))
if (toolCalls.length === 0) {
rawResponse.write('data: [DONE]\n\n')
return
}
logger.info({ toolCalls: toolCalls.map(tc => tc.function.name) }, "Executing tool calls")
// Add assistant message with tool calls
const assistantMessage: ChatMessage = {
role: "assistant",
content: textContent || undefined,
tool_calls: toolCalls
}
messages.push(assistantMessage)
// Execute tools
const toolResults = await executeTools(workspacePath, toolCalls)
// Notify client about tool execution via special event
for (const result of toolResults) {
const toolEvent = {
type: "tool_result",
tool_call_id: result.tool_call_id,
content: result.content
}
rawResponse.write(`data: ${JSON.stringify(toolEvent)}\n\n`)
}
// Add tool results to messages
for (const result of toolResults) {
const toolMessage: ChatMessage = {
role: "tool",
content: result.content,
tool_call_id: result.tool_call_id
}
messages.push(toolMessage)
}
logger.info({ loopCount, toolsExecuted: toolResults.length }, "Tool loop iteration complete")
}
logger.warn({ loopCount }, "Max tool loops reached")
rawResponse.write('data: [DONE]\n\n')
}
/**
* Non-streaming chat with tool execution loop
*/
async function chatWithToolLoop(
client: OpenCodeZenClient,
request: ChatRequest,
workspacePath: string,
enableTools: boolean,
logger: Logger
): Promise<any> {
let messages = [...request.messages]
let loopCount = 0
let lastResponse: any = null
// Inject tools if enabled
const requestWithTools: ChatRequest = {
...request,
tools: enableTools ? CORE_TOOLS : undefined,
tool_choice: enableTools ? "auto" : undefined
}
while (loopCount < MAX_TOOL_LOOPS) {
loopCount++
const response = await client.chat({ ...requestWithTools, messages, stream: false })
lastResponse = response
const choice = response.choices[0]
if (!choice) break
const toolCalls = (choice.message as any)?.tool_calls
// If no tool calls, return
if (!toolCalls || toolCalls.length === 0 || !enableTools) {
return response
}
logger.info({ toolCalls: toolCalls.map((tc: any) => tc.function.name) }, "Executing tool calls")
// Add assistant message
const assistantMessage: ChatMessage = {
role: "assistant",
content: (choice.message as any).content || undefined,
tool_calls: toolCalls
}
messages.push(assistantMessage)
// Execute tools
const toolResults = await executeTools(workspacePath, toolCalls)
// Add tool results
for (const result of toolResults) {
const toolMessage: ChatMessage = {
role: "tool",
content: result.content,
tool_call_id: result.tool_call_id
}
messages.push(toolMessage)
}
logger.info({ loopCount, toolsExecuted: toolResults.length }, "Tool loop iteration complete")
}
logger.warn({ loopCount }, "Max tool loops reached")
return lastResponse
}
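
Because the tool loop writes tool_result events into the same SSE stream as normal chat chunks, consumers need to tell the two apart. A sketch of that branching; the callback names are assumptions.

// Distinguish delta chunks from the "tool_result" events streamWithToolLoop()
// interleaves into the same stream. Event shapes mirror the objects written above.
type ZenStreamEvent =
  | { type: "tool_result"; tool_call_id: string; content: string }
  | { choices: Array<{ delta?: { content?: string }; finish_reason?: string | null }> }

function handleZenStreamLine(
  line: string,
  onText: (text: string) => void,
  onToolResult: (toolCallId: string, content: string) => void,
): void {
  if (!line.startsWith("data: ")) return
  const data = line.slice(6).trim()
  if (data === "[DONE]") return
  let event: ZenStreamEvent
  try {
    event = JSON.parse(data) as ZenStreamEvent
  } catch {
    return // ignore partial or malformed lines
  }
  if ("type" in event && event.type === "tool_result") {
    onToolResult(event.tool_call_id, event.content)
  } else if ("choices" in event) {
    onText(event.choices[0]?.delta?.content ?? "")
  }
}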

View File

@@ -1,10 +1,16 @@
import { FastifyInstance, FastifyReply } from "fastify"
import { join } from "path"
import { existsSync, mkdirSync } from "fs"
import { Logger } from "../../logger"
import { CORE_TOOLS, executeTools, type ToolCall, type ToolResult } from "../../tools/executor"
import { getMcpManager } from "../../mcp/client"
interface QwenRouteDeps {
logger: Logger
}
const MAX_TOOL_LOOPS = 10
const QWEN_OAUTH_BASE_URL = 'https://chat.qwen.ai'
const QWEN_OAUTH_DEVICE_CODE_ENDPOINT = `${QWEN_OAUTH_BASE_URL}/api/v1/oauth2/device/code`
const QWEN_OAUTH_TOKEN_ENDPOINT = `${QWEN_OAUTH_BASE_URL}/api/v1/oauth2/token`
@@ -197,7 +203,159 @@ export async function registerQwenRoutes(
}
})
// Qwen Chat API - proxy chat requests to Qwen using OAuth token
/**
* Streaming chat with tool execution loop for Qwen
*/
async function streamWithToolLoop(
accessToken: string,
chatUrl: string,
initialRequest: any,
workspacePath: string,
enableTools: boolean,
rawResponse: any,
logger: Logger
) {
let messages = [...initialRequest.messages]
let loopCount = 0
const model = initialRequest.model
while (loopCount < MAX_TOOL_LOOPS) {
loopCount++
logger.info({ loopCount, model }, "Starting Qwen tool loop iteration")
const response = await fetch(chatUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${accessToken}`,
'Accept': 'text/event-stream'
},
body: JSON.stringify({
...initialRequest,
messages,
stream: true,
tools: enableTools ? initialRequest.tools : undefined,
tool_choice: enableTools ? "auto" : undefined
})
})
if (!response.ok) {
const errorText = await response.text()
throw new Error(`Qwen API error (${response.status}): ${errorText}`)
}
if (!response.body) throw new Error("No response body")
const reader = response.body.getReader()
const decoder = new TextDecoder()
let textContent = ""
let hasToolCalls = false
let accumulatedToolCalls: Record<number, { id: string, name: string, arguments: string }> = {}
let buffer = ""
while (true) {
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split("\n")
buffer = lines.pop() || ""
for (const line of lines) {
const trimmed = line.trim()
if (!trimmed.startsWith("data: ")) continue
const data = trimmed.slice(6).trim()
if (data === "[DONE]") {
if (!hasToolCalls) {
rawResponse.write('data: [DONE]\n\n')
return
}
break
}
let chunk: any
try {
chunk = JSON.parse(data)
} catch (e) {
continue
}
const choice = chunk.choices?.[0]
if (!choice) continue
// Pass through text content to client
if (choice.delta?.content) {
textContent += choice.delta.content
rawResponse.write(`data: ${JSON.stringify(chunk)}\n\n`)
}
// Accumulate tool calls
if (choice.delta?.tool_calls) {
hasToolCalls = true
for (const tc of choice.delta.tool_calls) {
const idx = tc.index ?? 0
if (!accumulatedToolCalls[idx]) {
accumulatedToolCalls[idx] = { id: tc.id || "", name: "", arguments: "" }
}
if (tc.id) accumulatedToolCalls[idx].id = tc.id
if (tc.function?.name) accumulatedToolCalls[idx].name += tc.function.name
if (tc.function?.arguments) accumulatedToolCalls[idx].arguments += tc.function.arguments
}
}
if (choice.finish_reason === "tool_calls") {
break
}
if (choice.finish_reason === "stop" && !hasToolCalls) {
rawResponse.write('data: [DONE]\n\n')
return
}
}
}
// If no tool calls, we're done
if (!hasToolCalls || !enableTools) {
rawResponse.write('data: [DONE]\n\n')
return
}
// Execute tools
const toolCalls: ToolCall[] = Object.values(accumulatedToolCalls).map(tc => ({
id: tc.id,
type: "function" as const,
function: { name: tc.name, arguments: tc.arguments }
}))
logger.info({ toolCalls: toolCalls.map(tc => tc.function.name) }, "Executing Qwen tool calls")
messages.push({
role: "assistant",
content: textContent || undefined,
tool_calls: toolCalls
})
const toolResults = await executeTools(workspacePath, toolCalls)
// Notify frontend
for (const result of toolResults) {
const toolEvent = {
type: "tool_result",
tool_call_id: result.tool_call_id,
content: result.content
}
rawResponse.write(`data: ${JSON.stringify(toolEvent)}\n\n`)
messages.push({
role: "tool",
content: result.content,
tool_call_id: result.tool_call_id
})
}
}
rawResponse.write('data: [DONE]\n\n')
}
// Qwen Chat API - with tool support
app.post('/api/qwen/chat', {
schema: {
body: {
@@ -207,7 +365,9 @@ export async function registerQwenRoutes(
model: { type: 'string' },
messages: { type: 'array' },
stream: { type: 'boolean' },
resource_url: { type: 'string' }
resource_url: { type: 'string' },
workspacePath: { type: 'string' },
enableTools: { type: 'boolean' }
}
}
}
@@ -219,58 +379,59 @@ export async function registerQwenRoutes(
}
const accessToken = authHeader.substring(7)
const { model, messages, stream, resource_url } = request.body as any
const { model, messages, stream, resource_url, workspacePath, enableTools } = request.body as any
// Use resource_url from OAuth credentials to target the DashScope-compatible API
const apiBaseUrl = normalizeQwenResourceUrl(resource_url)
const normalizedModel = normalizeQwenModel(model)
const chatUrl = `${apiBaseUrl}/chat/completions`
logger.info({ chatUrl, model: normalizedModel, messageCount: messages?.length }, "Proxying Qwen chat request")
// MCP Tool Loading
let allTools = [...CORE_TOOLS]
const effectiveWorkspacePath = workspacePath || process.cwd()
const toolsEnabled = enableTools !== false
const response = await fetch(chatUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${accessToken}`,
'Accept': stream ? 'text/event-stream' : 'application/json'
},
body: JSON.stringify({
model: normalizedModel,
messages,
stream: stream || false
})
})
if (!response.ok) {
const errorText = await response.text()
logger.error({ status: response.status, errorText }, "Qwen chat request failed")
return reply.status(response.status).send({ error: "Chat request failed", details: errorText })
if (toolsEnabled && effectiveWorkspacePath) {
try {
const mcpManager = getMcpManager()
await mcpManager.loadConfig(effectiveWorkspacePath)
const mcpTools = await mcpManager.getToolsAsOpenAIFormat()
allTools = [...CORE_TOOLS, ...mcpTools]
} catch (mcpError) {
logger.warn({ error: mcpError }, "Failed to load MCP tools for Qwen")
}
}
if (stream && response.body) {
// Stream the response
logger.info({ chatUrl, model: normalizedModel, tools: allTools.length }, "Proxying Qwen chat with tools")
if (stream) {
reply.raw.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
})
const reader = response.body.getReader()
const decoder = new TextDecoder()
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
const chunk = decoder.decode(value, { stream: true })
reply.raw.write(chunk)
}
} finally {
reader.releaseLock()
reply.raw.end()
}
await streamWithToolLoop(
accessToken,
chatUrl,
{ model: normalizedModel, messages, tools: allTools },
effectiveWorkspacePath,
toolsEnabled,
reply.raw,
logger
)
} else {
const response = await fetch(chatUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${accessToken}`
},
body: JSON.stringify({
model: normalizedModel,
messages,
stream: false
})
})
const data = await response.json()
return reply.send(data)
}
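
A request sketch against the updated Qwen proxy; the host, access token, resource URL, and workspace path are placeholders, and the token normally comes from the Qwen OAuth device flow shown earlier in this file.

const qwenAccessToken = "<oauth-access-token>"              // placeholder
const qwenResourceUrl = "<resource-url-from-oauth-credentials>" // placeholder

// Call /api/qwen/chat with the new workspacePath / enableTools fields.
await fetch("http://localhost:3000/api/qwen/chat", {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    Authorization: `Bearer ${qwenAccessToken}`,
  },
  body: JSON.stringify({
    model: "qwen-plus-latest",
    messages: [{ role: "user", content: "List the TODO comments in this repo" }],
    stream: true,
    resource_url: qwenResourceUrl,
    workspacePath: "/path/to/workspace",
    enableTools: true,
  }),
})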

View File

@@ -2,7 +2,7 @@ import { FastifyInstance, FastifyReply } from "fastify"
import { spawnSync } from "child_process"
import { z } from "zod"
import { existsSync, mkdirSync } from "fs"
import { cp, readFile, writeFile } from "fs/promises"
import { cp, readFile, writeFile, stat as readFileStat } from "fs/promises"
import path from "path"
import { WorkspaceManager } from "../../workspaces/manager"
import { InstanceStore } from "../../storage/instance-store"
@@ -257,6 +257,12 @@ export function registerWorkspaceRoutes(app: FastifyInstance, deps: RouteDeps) {
const configPath = path.join(workspace.path, ".mcp.json")
try {
await writeFile(configPath, JSON.stringify(body.config, null, 2), "utf-8")
// Auto-load MCP config into the manager after saving
const { getMcpManager } = await import("../../mcp/client")
const mcpManager = getMcpManager()
await mcpManager.loadConfig(workspace.path)
return { path: configPath, exists: true, config: body.config }
} catch (error) {
request.log.error({ err: error }, "Failed to write MCP config")
@@ -265,6 +271,110 @@ export function registerWorkspaceRoutes(app: FastifyInstance, deps: RouteDeps) {
}
})
// Get MCP connection status for a workspace
app.get<{ Params: { id: string } }>("/api/workspaces/:id/mcp-status", async (request, reply) => {
const workspace = deps.workspaceManager.get(request.params.id)
if (!workspace) {
reply.code(404)
return { error: "Workspace not found" }
}
try {
const { getMcpManager } = await import("../../mcp/client")
const mcpManager = getMcpManager()
// Load config if not already loaded
await mcpManager.loadConfig(workspace.path)
const status = mcpManager.getStatus()
const tools = await mcpManager.getAllTools()
return {
servers: status,
toolCount: tools.length,
tools: tools.map(t => ({ name: t.name, server: t.serverName, description: t.description }))
}
} catch (error) {
request.log.error({ err: error }, "Failed to get MCP status")
reply.code(500)
return { error: "Failed to get MCP status" }
}
})
// Connect all configured MCPs for a workspace
app.post<{ Params: { id: string } }>("/api/workspaces/:id/mcp-connect", async (request, reply) => {
const workspace = deps.workspaceManager.get(request.params.id)
if (!workspace) {
reply.code(404)
return { error: "Workspace not found" }
}
try {
const { getMcpManager } = await import("../../mcp/client")
const mcpManager = getMcpManager()
// Load and connect to all configured MCPs
await mcpManager.loadConfig(workspace.path)
// Get the tools to trigger connections
const tools = await mcpManager.getAllTools()
const status = mcpManager.getStatus()
return {
success: true,
servers: status,
toolCount: tools.length
}
} catch (error) {
request.log.error({ err: error }, "Failed to connect MCPs")
reply.code(500)
return { error: "Failed to connect MCPs" }
}
})
app.post<{
Params: { id: string }
Body: { name: string; description?: string; systemPrompt: string; mode?: string }
}>("/api/workspaces/:id/agents", async (request, reply) => {
const workspace = deps.workspaceManager.get(request.params.id)
if (!workspace) {
reply.code(404)
return { error: "Workspace not found" }
}
const { name, description, systemPrompt } = request.body
if (!name || !systemPrompt) {
reply.code(400)
return { error: "Name and systemPrompt are required" }
}
try {
const data = await deps.instanceStore.read(workspace.path)
const customAgents = data.customAgents || []
// Update existing or add new
const existingIndex = customAgents.findIndex(a => a.name === name)
const agentData = { name, description, prompt: systemPrompt }
if (existingIndex >= 0) {
customAgents[existingIndex] = agentData
} else {
customAgents.push(agentData)
}
await deps.instanceStore.write(workspace.path, {
...data,
customAgents
})
return { success: true, agent: agentData }
} catch (error) {
request.log.error({ err: error }, "Failed to save custom agent")
reply.code(500)
return { error: "Failed to save custom agent" }
}
})
app.post<{
Body: { source: string; destination: string; includeConfig?: boolean }
}>("/api/workspaces/import", async (request, reply) => {
@@ -308,6 +418,53 @@ export function registerWorkspaceRoutes(app: FastifyInstance, deps: RouteDeps) {
return workspace
})
// Serve static files from workspace for preview
app.get<{ Params: { id: string; "*": string } }>("/api/workspaces/:id/serve/*", async (request, reply) => {
const workspace = deps.workspaceManager.get(request.params.id)
if (!workspace) {
reply.code(404)
return { error: "Workspace not found" }
}
const relativePath = request.params["*"]
const filePath = path.join(workspace.path, relativePath)
// Security check: ensure the requested path resolves inside workspace.path (a plain prefix check would also match sibling directories)
if (path.relative(workspace.path, filePath).startsWith("..")) {
reply.code(403)
return { error: "Access denied" }
}
if (!existsSync(filePath)) {
reply.code(404)
return { error: "File not found" }
}
const stat = await readFileStat(filePath)
if (!stat.isFile()) {
reply.code(400)
return { error: "Not a file" }
}
const ext = path.extname(filePath).toLowerCase()
const mimeTypes: Record<string, string> = {
".html": "text/html",
".htm": "text/html",
".js": "application/javascript",
".css": "text/css",
".json": "application/json",
".png": "image/png",
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".gif": "image/gif",
".svg": "image/svg+xml",
".txt": "text/plain",
}
reply.type(mimeTypes[ext] || "application/octet-stream")
return await readFile(filePath)
})
}
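For reference, a minimal client-side sketch of how the new MCP endpoints above might be called; the base URL handling, fetch usage, and response typing are assumptions for illustration, not part of this commit.

// Hypothetical helper around the new MCP endpoints (response shape inferred from the handlers above)
interface McpStatusResponse {
  servers: unknown
  toolCount: number
  tools: { name: string; server: string; description?: string }[]
}

async function checkWorkspaceMcp(baseUrl: string, workspaceId: string): Promise<McpStatusResponse> {
  // GET /api/workspaces/:id/mcp-status loads the workspace .mcp.json and reports available tools
  const res = await fetch(`${baseUrl}/api/workspaces/${workspaceId}/mcp-status`)
  if (!res.ok) throw new Error(`MCP status failed: ${res.status}`)
  return (await res.json()) as McpStatusResponse
}

async function connectWorkspaceMcp(baseUrl: string, workspaceId: string): Promise<number> {
  // POST /api/workspaces/:id/mcp-connect connects all configured servers and returns the tool count
  const res = await fetch(`${baseUrl}/api/workspaces/${workspaceId}/mcp-connect`, { method: "POST" })
  if (!res.ok) throw new Error(`MCP connect failed: ${res.status}`)
  const body = (await res.json()) as { success: boolean; toolCount: number }
  return body.toolCount
}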

View File

@@ -1,9 +1,11 @@
import { FastifyInstance } from "fastify"
import { ZAIClient, ZAI_MODELS, type ZAIConfig, type ZAIChatRequest, ZAIChatRequestSchema } from "../../integrations/zai-api"
import { ZAIClient, ZAI_MODELS, type ZAIConfig, type ZAIChatRequest, type ZAIMessage } from "../../integrations/zai-api"
import { Logger } from "../../logger"
import { existsSync, readFileSync, writeFileSync, mkdirSync } from "fs"
import { join } from "path"
import { getUserIntegrationsDir } from "../../user-data"
import { CORE_TOOLS, executeTools, type ToolCall, type ToolResult } from "../../tools/executor"
import { getMcpManager } from "../../mcp/client"
interface ZAIRouteDeps {
logger: Logger
@@ -12,6 +14,9 @@ interface ZAIRouteDeps {
const CONFIG_DIR = getUserIntegrationsDir()
const CONFIG_FILE = join(CONFIG_DIR, "zai-config.json")
// Maximum number of tool execution iterations, to prevent runaway tool-call loops
const MAX_TOOL_LOOPS = 10
export async function registerZAIRoutes(
app: FastifyInstance,
deps: ZAIRouteDeps
@@ -75,7 +80,7 @@ export async function registerZAIRoutes(
}
})
// Chat completion endpoint
// Chat completion endpoint WITH MCP TOOL SUPPORT
app.post('/api/zai/chat', async (request, reply) => {
try {
const config = getZAIConfig()
@@ -84,9 +89,46 @@ export async function registerZAIRoutes(
}
const client = new ZAIClient(config)
const chatRequest = request.body as ZAIChatRequest
const chatRequest = request.body as ZAIChatRequest & {
workspacePath?: string
enableTools?: boolean
}
// Handle streaming
// Extract workspace path for tool execution
// IMPORTANT: the frontend must provide workspacePath; otherwise tools fall back to the server's working directory
const workspacePath = chatRequest.workspacePath || process.cwd()
const enableTools = chatRequest.enableTools !== false // Default to true
logger.info({
workspacePath,
receivedWorkspacePath: chatRequest.workspacePath,
enableTools
}, "Z.AI chat request received")
// Load MCP tools from workspace config
let allTools = [...CORE_TOOLS]
if (enableTools && workspacePath) {
try {
const mcpManager = getMcpManager()
await mcpManager.loadConfig(workspacePath)
const mcpTools = await mcpManager.getToolsAsOpenAIFormat()
allTools = [...CORE_TOOLS, ...mcpTools]
if (mcpTools.length > 0) {
logger.info({ mcpToolCount: mcpTools.length }, "Loaded MCP tools")
}
} catch (mcpError) {
logger.warn({ error: mcpError }, "Failed to load MCP tools, using core tools only")
}
}
// Inject tools into request if enabled
const requestWithTools: ZAIChatRequest = {
...chatRequest,
tools: enableTools ? allTools : undefined,
tool_choice: enableTools ? "auto" : undefined
}
// Handle streaming with tool execution loop
if (chatRequest.stream) {
reply.raw.writeHead(200, {
'Content-Type': 'text/event-stream',
@@ -95,17 +137,14 @@ export async function registerZAIRoutes(
})
try {
for await (const chunk of client.chatStream(chatRequest)) {
reply.raw.write(`data: ${JSON.stringify(chunk)}\n\n`)
// Check for finish_reason to end stream
const finishReason = chunk.choices[0]?.finish_reason
if (finishReason) {
reply.raw.write('data: [DONE]\n\n')
break
}
}
await streamWithToolLoop(
client,
requestWithTools,
workspacePath,
enableTools,
reply.raw,
logger
)
reply.raw.end()
} catch (streamError) {
logger.error({ error: streamError }, "Z.AI streaming failed")
@@ -113,7 +152,14 @@ export async function registerZAIRoutes(
reply.raw.end()
}
} else {
const response = await client.chat(chatRequest)
// Non-streaming with tool loop
const response = await chatWithToolLoop(
client,
requestWithTools,
workspacePath,
enableTools,
logger
)
return response
}
} catch (error) {
@@ -122,7 +168,184 @@ export async function registerZAIRoutes(
}
})
logger.info("Z.AI routes registered")
logger.info("Z.AI routes registered with MCP tool support")
}
/**
* Streaming chat with tool execution loop
*/
async function streamWithToolLoop(
client: ZAIClient,
request: ZAIChatRequest,
workspacePath: string,
enableTools: boolean,
rawResponse: import("http").ServerResponse,
logger: Logger
): Promise<void> {
let messages = [...request.messages]
let loopCount = 0
while (loopCount < MAX_TOOL_LOOPS) {
loopCount++
// Accumulate tool calls from stream
let accumulatedToolCalls: { [index: number]: { id: string; name: string; arguments: string } } = {}
let hasToolCalls = false
let textContent = ""
// Stream response
for await (const chunk of client.chatStream({ ...request, messages })) {
// Write chunk to client
rawResponse.write(`data: ${JSON.stringify(chunk)}\n\n`)
const choice = chunk.choices[0]
if (!choice) continue
// Accumulate text content
if (choice.delta?.content) {
textContent += choice.delta.content
}
// Accumulate tool calls from delta
if (choice.delta?.tool_calls) {
hasToolCalls = true
for (const tc of choice.delta.tool_calls) {
const idx = tc.index ?? 0
if (!accumulatedToolCalls[idx]) {
accumulatedToolCalls[idx] = { id: tc.id || "", name: "", arguments: "" }
}
if (tc.id) accumulatedToolCalls[idx].id = tc.id
if (tc.function?.name) accumulatedToolCalls[idx].name += tc.function.name
if (tc.function?.arguments) accumulatedToolCalls[idx].arguments += tc.function.arguments
}
}
// Check if we should stop
if (choice.finish_reason === "stop") {
rawResponse.write('data: [DONE]\n\n')
return
}
}
// If no tool calls, we're done
if (!hasToolCalls || !enableTools) {
rawResponse.write('data: [DONE]\n\n')
return
}
// Convert accumulated tool calls
const toolCalls: ToolCall[] = Object.values(accumulatedToolCalls).map(tc => ({
id: tc.id,
type: "function" as const,
function: {
name: tc.name,
arguments: tc.arguments
}
}))
if (toolCalls.length === 0) {
rawResponse.write('data: [DONE]\n\n')
return
}
logger.info({ toolCalls: toolCalls.map(tc => tc.function.name) }, "Executing tool calls")
// Add assistant message with tool calls
const assistantMessage: ZAIMessage = {
role: "assistant",
content: textContent || undefined,
tool_calls: toolCalls
}
messages.push(assistantMessage)
// Execute tools
const toolResults = await executeTools(workspacePath, toolCalls)
// Notify client about tool execution via special event
for (const result of toolResults) {
const toolEvent = {
type: "tool_result",
tool_call_id: result.tool_call_id,
content: result.content
}
rawResponse.write(`data: ${JSON.stringify(toolEvent)}\n\n`)
}
// Add tool results to messages
for (const result of toolResults) {
const toolMessage: ZAIMessage = {
role: "tool",
content: result.content,
tool_call_id: result.tool_call_id
}
messages.push(toolMessage)
}
logger.info({ loopCount, toolsExecuted: toolResults.length }, "Tool loop iteration complete")
}
logger.warn({ loopCount }, "Max tool loops reached")
rawResponse.write('data: [DONE]\n\n')
}
/**
* Non-streaming chat with tool execution loop
*/
async function chatWithToolLoop(
client: ZAIClient,
request: ZAIChatRequest,
workspacePath: string,
enableTools: boolean,
logger: Logger
): Promise<any> {
let messages = [...request.messages]
let loopCount = 0
let lastResponse: any = null
while (loopCount < MAX_TOOL_LOOPS) {
loopCount++
const response = await client.chat({ ...request, messages, stream: false })
lastResponse = response
const choice = response.choices[0]
if (!choice) break
const toolCalls = choice.message?.tool_calls
// If no tool calls or finish_reason is "stop", return
if (!toolCalls || toolCalls.length === 0 || !enableTools) {
return response
}
logger.info({ toolCalls: toolCalls.map((tc: any) => tc.function.name) }, "Executing tool calls")
// Add assistant message
const assistantMessage: ZAIMessage = {
role: "assistant",
content: choice.message.content || undefined,
tool_calls: toolCalls
}
messages.push(assistantMessage)
// Execute tools
const toolResults = await executeTools(workspacePath, toolCalls)
// Add tool results
for (const result of toolResults) {
const toolMessage: ZAIMessage = {
role: "tool",
content: result.content,
tool_call_id: result.tool_call_id
}
messages.push(toolMessage)
}
logger.info({ loopCount, toolsExecuted: toolResults.length }, "Tool loop iteration complete")
}
logger.warn({ loopCount }, "Max tool loops reached")
return lastResponse
}
function getZAIConfig(): ZAIConfig {
@@ -131,9 +354,9 @@ function getZAIConfig(): ZAIConfig {
const data = readFileSync(CONFIG_FILE, 'utf-8')
return JSON.parse(data)
}
return { enabled: false, endpoint: "https://api.z.ai/api/paas/v4", timeout: 300000 }
return { enabled: false, endpoint: "https://api.z.ai/api/coding/paas/v4", timeout: 300000 }
} catch {
return { enabled: false, endpoint: "https://api.z.ai/api/paas/v4", timeout: 300000 }
return { enabled: false, endpoint: "https://api.z.ai/api/coding/paas/v4", timeout: 300000 }
}
}
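A rough sketch of how a client could consume the streaming endpoint above, separating model text deltas from the interleaved tool_result events; the SSE parsing and callback shapes here are assumptions, not code from this commit.

// Sketch: consume /api/zai/chat SSE, splitting model text from tool_result events (shapes assumed)
async function streamZaiChat(
  baseUrl: string,
  body: unknown,
  onText: (text: string) => void,
  onTool: (toolCallId: string, content: string) => void
): Promise<void> {
  const res = await fetch(`${baseUrl}/api/zai/chat`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(body),
  })
  if (!res.ok || !res.body) throw new Error(`Z.AI chat failed: ${res.status}`)
  const reader = res.body.getReader()
  const decoder = new TextDecoder()
  let buffer = ""
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    buffer += decoder.decode(value, { stream: true })
    const lines = buffer.split("\n")
    buffer = lines.pop() ?? ""
    for (const line of lines) {
      if (!line.startsWith("data: ")) continue
      const payload = line.slice(6).trim()
      if (payload === "[DONE]") return
      const event = JSON.parse(payload)
      if (event.type === "tool_result") {
        onTool(event.tool_call_id, event.content) // emitted by streamWithToolLoop above
      } else {
        const delta = event.choices?.[0]?.delta?.content
        if (delta) onText(delta)
      }
    }
  }
}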

View File

@@ -0,0 +1,284 @@
/**
* Session Store - Native session management without OpenCode binary
*
* This provides a complete replacement for OpenCode's session management,
* allowing NomadArch to work in "Binary-Free Mode".
*/
import { readFile, writeFile, mkdir } from "fs/promises"
import { existsSync } from "fs"
import path from "path"
import { ulid } from "ulid"
import { createLogger } from "../logger"
const log = createLogger({ component: "session-store" })
// Types matching OpenCode's schema for compatibility
export interface SessionMessage {
id: string
sessionId: string
role: "user" | "assistant" | "system" | "tool"
content?: string
parts?: MessagePart[]
createdAt: number
updatedAt: number
toolCalls?: ToolCall[]
toolCallId?: string
status?: "pending" | "streaming" | "completed" | "error"
}
export interface MessagePart {
type: "text" | "tool_call" | "tool_result" | "thinking" | "code"
content?: string
toolCall?: ToolCall
toolResult?: ToolResult
}
export interface ToolCall {
id: string
type: "function"
function: {
name: string
arguments: string
}
}
export interface ToolResult {
toolCallId: string
content: string
isError?: boolean
}
export interface Session {
id: string
workspaceId: string
title?: string
parentId?: string | null
createdAt: number
updatedAt: number
messageIds: string[]
model?: {
providerId: string
modelId: string
}
agent?: string
revert?: {
messageID: string
reason?: string
} | null
}
export interface SessionStore {
sessions: Record<string, Session>
messages: Record<string, SessionMessage>
}
/**
* Native session management for Binary-Free Mode
*/
export class NativeSessionManager {
private stores = new Map<string, SessionStore>()
private dataDir: string
constructor(dataDir: string) {
this.dataDir = dataDir
}
private getStorePath(workspaceId: string): string {
return path.join(this.dataDir, workspaceId, "sessions.json")
}
private async ensureDir(workspaceId: string): Promise<void> {
const dir = path.join(this.dataDir, workspaceId)
if (!existsSync(dir)) {
await mkdir(dir, { recursive: true })
}
}
private async loadStore(workspaceId: string): Promise<SessionStore> {
if (this.stores.has(workspaceId)) {
return this.stores.get(workspaceId)!
}
const storePath = this.getStorePath(workspaceId)
let store: SessionStore = { sessions: {}, messages: {} }
if (existsSync(storePath)) {
try {
const data = await readFile(storePath, "utf-8")
store = JSON.parse(data)
} catch (error) {
log.error({ workspaceId, error }, "Failed to load session store")
}
}
this.stores.set(workspaceId, store)
return store
}
private async saveStore(workspaceId: string): Promise<void> {
const store = this.stores.get(workspaceId)
if (!store) return
await this.ensureDir(workspaceId)
const storePath = this.getStorePath(workspaceId)
await writeFile(storePath, JSON.stringify(store, null, 2), "utf-8")
}
// Session CRUD operations
async listSessions(workspaceId: string): Promise<Session[]> {
const store = await this.loadStore(workspaceId)
return Object.values(store.sessions).sort((a, b) => b.updatedAt - a.updatedAt)
}
async getSession(workspaceId: string, sessionId: string): Promise<Session | null> {
const store = await this.loadStore(workspaceId)
return store.sessions[sessionId] ?? null
}
async createSession(workspaceId: string, options?: {
title?: string
parentId?: string
model?: { providerId: string; modelId: string }
agent?: string
}): Promise<Session> {
const store = await this.loadStore(workspaceId)
const now = Date.now()
const session: Session = {
id: ulid(),
workspaceId,
title: options?.title ?? "New Session",
parentId: options?.parentId ?? null,
createdAt: now,
updatedAt: now,
messageIds: [],
model: options?.model,
agent: options?.agent,
}
store.sessions[session.id] = session
await this.saveStore(workspaceId)
log.info({ workspaceId, sessionId: session.id }, "Created new session")
return session
}
async updateSession(workspaceId: string, sessionId: string, updates: Partial<Session>): Promise<Session | null> {
const store = await this.loadStore(workspaceId)
const session = store.sessions[sessionId]
if (!session) return null
const updated = {
...session,
...updates,
id: session.id, // Prevent ID change
workspaceId: session.workspaceId, // Prevent workspace change
updatedAt: Date.now(),
}
store.sessions[sessionId] = updated
await this.saveStore(workspaceId)
return updated
}
async deleteSession(workspaceId: string, sessionId: string): Promise<boolean> {
const store = await this.loadStore(workspaceId)
const session = store.sessions[sessionId]
if (!session) return false
// Delete all messages in the session
for (const messageId of session.messageIds) {
delete store.messages[messageId]
}
delete store.sessions[sessionId]
await this.saveStore(workspaceId)
log.info({ workspaceId, sessionId }, "Deleted session")
return true
}
// Message operations
async getSessionMessages(workspaceId: string, sessionId: string): Promise<SessionMessage[]> {
const store = await this.loadStore(workspaceId)
const session = store.sessions[sessionId]
if (!session) return []
return session.messageIds
.map(id => store.messages[id])
.filter((msg): msg is SessionMessage => msg !== undefined)
}
async addMessage(workspaceId: string, sessionId: string, message: Omit<SessionMessage, "id" | "sessionId" | "createdAt" | "updatedAt">): Promise<SessionMessage> {
const store = await this.loadStore(workspaceId)
const session = store.sessions[sessionId]
if (!session) throw new Error(`Session not found: ${sessionId}`)
const now = Date.now()
const newMessage: SessionMessage = {
...message,
id: ulid(),
sessionId,
createdAt: now,
updatedAt: now,
}
store.messages[newMessage.id] = newMessage
session.messageIds.push(newMessage.id)
session.updatedAt = now
await this.saveStore(workspaceId)
return newMessage
}
async updateMessage(workspaceId: string, messageId: string, updates: Partial<SessionMessage>): Promise<SessionMessage | null> {
const store = await this.loadStore(workspaceId)
const message = store.messages[messageId]
if (!message) return null
const updated = {
...message,
...updates,
id: message.id, // Prevent ID change
sessionId: message.sessionId, // Prevent session change
updatedAt: Date.now(),
}
store.messages[messageId] = updated
await this.saveStore(workspaceId)
return updated
}
// Utility
async clearWorkspace(workspaceId: string): Promise<void> {
this.stores.delete(workspaceId)
// Note: this only drops the in-memory cache; the persisted sessions.json on disk is left intact
}
getActiveSessionCount(workspaceId: string): number {
const store = this.stores.get(workspaceId)
return store ? Object.keys(store.sessions).length : 0
}
}
// Singleton instance
let sessionManager: NativeSessionManager | null = null
export function getSessionManager(dataDir?: string): NativeSessionManager {
if (!sessionManager) {
if (!dataDir) {
throw new Error("Session manager not initialized - provide dataDir")
}
sessionManager = new NativeSessionManager(dataDir)
}
return sessionManager
}
export function initSessionManager(dataDir: string): NativeSessionManager {
sessionManager = new NativeSessionManager(dataDir)
return sessionManager
}
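As a usage sketch for the store above, assuming a sibling module imports it; the data directory, workspace id, and model ids are illustrative values.

// Sketch: driving NativeSessionManager directly (paths and ids are illustrative)
import { initSessionManager } from "./session-store"

async function demoSessionStore(): Promise<void> {
  const manager = initSessionManager("/tmp/nomadarch-sessions")
  const session = await manager.createSession("workspace-1", {
    title: "Binary-free chat",
    model: { providerId: "zen", modelId: "grok-code-fast-1" },
  })
  await manager.addMessage("workspace-1", session.id, {
    role: "user",
    content: "Hello",
    status: "completed",
  })
  const messages = await manager.getSessionMessages("workspace-1", session.id)
  console.log(messages.length) // 1
}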

View File

@@ -0,0 +1,352 @@
/**
* Tool Executor Service
* Provides MCP-compatible tool definitions and execution for all AI models.
* This enables Z.AI, Qwen, OpenCode Zen, etc. to write files, read files, and interact with the workspace.
*/
import fs from "fs"
import path from "path"
import { createLogger } from "../logger"
import { getMcpManager } from "../mcp/client"
const log = createLogger({ component: "tool-executor" })
// OpenAI-compatible Tool Definition Schema
export interface ToolDefinition {
type: "function"
function: {
name: string
description: string
parameters: {
type: "object"
properties: Record<string, { type: string; description?: string }>
required?: string[]
}
}
}
// Tool Call from LLM Response
export interface ToolCall {
id: string
type: "function"
function: {
name: string
arguments: string // JSON string
}
}
// Tool Execution Result
export interface ToolResult {
tool_call_id: string
role: "tool"
content: string
}
/**
 * Core tool definitions (built-in workspace tools, always exposed alongside MCP tools)
 * These follow OpenAI's function calling schema (compatible with Z.AI GLM-4)
*/
export const CORE_TOOLS: ToolDefinition[] = [
{
type: "function",
function: {
name: "write_file",
description: "Write content to a file in the workspace. Creates the file if it doesn't exist, or overwrites if it does. Use this to generate code files, configuration, or any text content.",
parameters: {
type: "object",
properties: {
path: {
type: "string",
description: "Relative path to the file within the workspace (e.g., 'src/components/Button.tsx')"
},
content: {
type: "string",
description: "The full content to write to the file"
}
},
required: ["path", "content"]
}
}
},
{
type: "function",
function: {
name: "read_file",
description: "Read the contents of a file from the workspace.",
parameters: {
type: "object",
properties: {
path: {
type: "string",
description: "Relative path to the file within the workspace"
}
},
required: ["path"]
}
}
},
{
type: "function",
function: {
name: "list_files",
description: "List files and directories in a workspace directory.",
parameters: {
type: "object",
properties: {
path: {
type: "string",
description: "Relative path to the directory (use '.' for root)"
}
},
required: ["path"]
}
}
},
{
type: "function",
function: {
name: "create_directory",
description: "Create a directory in the workspace. Creates parent directories if needed.",
parameters: {
type: "object",
properties: {
path: {
type: "string",
description: "Relative path to the directory to create"
}
},
required: ["path"]
}
}
},
{
type: "function",
function: {
name: "delete_file",
description: "Delete a file from the workspace.",
parameters: {
type: "object",
properties: {
path: {
type: "string",
description: "Relative path to the file to delete"
}
},
required: ["path"]
}
}
}
]
/**
* Execute a tool call within a workspace context
*/
export async function executeTool(
workspacePath: string,
toolCall: ToolCall
): Promise<ToolResult> {
const { id, function: fn } = toolCall
const name = fn.name
let args: Record<string, unknown>
try {
args = JSON.parse(fn.arguments)
} catch (e) {
return {
tool_call_id: id,
role: "tool",
content: `Error: Failed to parse tool arguments: ${fn.arguments}`
}
}
log.info({ tool: name, args, workspacePath }, "Executing tool")
try {
switch (name) {
case "write_file": {
const relativePath = String(args.path || "")
const content = String(args.content || "")
const fullPath = path.resolve(workspacePath, relativePath)
// Security check: ensure we're still within workspace
if (path.relative(path.resolve(workspacePath), fullPath).startsWith("..")) {
return {
tool_call_id: id,
role: "tool",
content: `Error: Path escapes workspace boundary: ${relativePath}`
}
}
// Ensure parent directory exists
const dir = path.dirname(fullPath)
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true })
}
fs.writeFileSync(fullPath, content, "utf-8")
log.info({ path: relativePath, bytes: content.length }, "File written successfully")
return {
tool_call_id: id,
role: "tool",
content: `Successfully wrote ${content.length} bytes to ${relativePath}`
}
}
case "read_file": {
const relativePath = String(args.path || "")
const fullPath = path.resolve(workspacePath, relativePath)
if (path.relative(path.resolve(workspacePath), fullPath).startsWith("..")) {
return {
tool_call_id: id,
role: "tool",
content: `Error: Path escapes workspace boundary: ${relativePath}`
}
}
if (!fs.existsSync(fullPath)) {
return {
tool_call_id: id,
role: "tool",
content: `Error: File not found: ${relativePath}`
}
}
const content = fs.readFileSync(fullPath, "utf-8")
return {
tool_call_id: id,
role: "tool",
content: content.slice(0, 50000) // Limit to prevent context overflow
}
}
case "list_files": {
const relativePath = String(args.path || ".")
const fullPath = path.resolve(workspacePath, relativePath)
if (path.relative(path.resolve(workspacePath), fullPath).startsWith("..")) {
return {
tool_call_id: id,
role: "tool",
content: `Error: Path escapes workspace boundary: ${relativePath}`
}
}
if (!fs.existsSync(fullPath)) {
return {
tool_call_id: id,
role: "tool",
content: `Error: Directory not found: ${relativePath}`
}
}
const entries = fs.readdirSync(fullPath, { withFileTypes: true })
const listing = entries.map(e =>
e.isDirectory() ? `${e.name}/` : e.name
).join("\n")
return {
tool_call_id: id,
role: "tool",
content: listing || "(empty directory)"
}
}
case "create_directory": {
const relativePath = String(args.path || "")
const fullPath = path.resolve(workspacePath, relativePath)
if (path.relative(path.resolve(workspacePath), fullPath).startsWith("..")) {
return {
tool_call_id: id,
role: "tool",
content: `Error: Path escapes workspace boundary: ${relativePath}`
}
}
fs.mkdirSync(fullPath, { recursive: true })
return {
tool_call_id: id,
role: "tool",
content: `Successfully created directory: ${relativePath}`
}
}
case "delete_file": {
const relativePath = String(args.path || "")
const fullPath = path.resolve(workspacePath, relativePath)
if (path.relative(path.resolve(workspacePath), fullPath).startsWith("..")) {
return {
tool_call_id: id,
role: "tool",
content: `Error: Path escapes workspace boundary: ${relativePath}`
}
}
if (!fs.existsSync(fullPath)) {
return {
tool_call_id: id,
role: "tool",
content: `Error: File not found: ${relativePath}`
}
}
fs.unlinkSync(fullPath)
return {
tool_call_id: id,
role: "tool",
content: `Successfully deleted: ${relativePath}`
}
}
default: {
// Check if this is an MCP tool (format: mcp_servername_toolname)
if (name.startsWith("mcp_")) {
try {
const mcpManager = getMcpManager()
const result = await mcpManager.executeTool(name, args)
return {
tool_call_id: id,
role: "tool",
content: result
}
} catch (mcpError) {
const message = mcpError instanceof Error ? mcpError.message : String(mcpError)
return {
tool_call_id: id,
role: "tool",
content: `MCP tool error: ${message}`
}
}
}
return {
tool_call_id: id,
role: "tool",
content: `Error: Unknown tool: ${name}`
}
}
}
} catch (error) {
const message = error instanceof Error ? error.message : String(error)
log.error({ tool: name, error: message }, "Tool execution failed")
return {
tool_call_id: id,
role: "tool",
content: `Error executing ${name}: ${message}`
}
}
}
/**
* Execute multiple tool calls in parallel
*/
export async function executeTools(
workspacePath: string,
toolCalls: ToolCall[]
): Promise<ToolResult[]> {
return Promise.all(
toolCalls.map(tc => executeTool(workspacePath, tc))
)
}
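To illustrate the round trip, a minimal sketch of feeding a model-issued tool call through the executor above; the tool call payload and import path are illustrative.

// Sketch: executing a single write_file tool call the way the chat loop does (payload is illustrative)
import { executeTools, type ToolCall } from "./executor"

async function demoToolExecution(workspacePath: string): Promise<void> {
  const toolCalls: ToolCall[] = [
    {
      id: "call_1",
      type: "function",
      function: {
        name: "write_file",
        arguments: JSON.stringify({ path: "notes/hello.txt", content: "hello from the tool loop" }),
      },
    },
  ]
  const results = await executeTools(workspacePath, toolCalls)
  // Each result becomes a role:"tool" message appended back into the conversation
  console.log(results[0].content)
}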

View File

@@ -0,0 +1,13 @@
/**
* Tools Module Index
* Exports MCP-compatible tool definitions and executor for AI agent integration.
*/
export {
CORE_TOOLS,
executeTool,
executeTools,
type ToolDefinition,
type ToolCall,
type ToolResult
} from "./executor"

View File

@@ -28,11 +28,11 @@ interface ManagedProcess {
export class WorkspaceRuntime {
private processes = new Map<string, ManagedProcess>()
constructor(private readonly eventBus: EventBus, private readonly logger: Logger) {}
constructor(private readonly eventBus: EventBus, private readonly logger: Logger) { }
async launch(options: LaunchOptions): Promise<{ pid: number; port: number; exitPromise: Promise<ProcessExitInfo>; getLastOutput: () => string }> {
this.validateFolder(options.folder)
// Check if binary exists before attempting to launch
try {
accessSync(options.binaryPath, constants.F_OK)
@@ -41,8 +41,8 @@ export class WorkspaceRuntime {
}
const args = ["serve", "--port", "0", "--print-logs", "--log-level", "DEBUG"]
const env = {
...process.env,
const env = {
...process.env,
...(options.environment ?? {}),
"OPENCODE_SERVER_HOST": "127.0.0.1",
"OPENCODE_SERVER_PORT": "0",
@@ -58,7 +58,23 @@ export class WorkspaceRuntime {
const exitPromise = new Promise<ProcessExitInfo>((resolveExit) => {
exitResolve = resolveExit
})
let lastOutput = ""
// Store recent output for debugging - keep last 20 lines from each stream
const MAX_OUTPUT_LINES = 20
const recentStdout: string[] = []
const recentStderr: string[] = []
const getLastOutput = () => {
const combined: string[] = []
if (recentStderr.length > 0) {
combined.push("=== STDERR ===")
combined.push(...recentStderr.slice(-10))
}
if (recentStdout.length > 0) {
combined.push("=== STDOUT ===")
combined.push(...recentStdout.slice(-10))
}
return combined.join("\n")
}
return new Promise((resolve, reject) => {
this.logger.info(
@@ -149,23 +165,28 @@ export class WorkspaceRuntime {
for (const line of lines) {
const trimmed = line.trim()
if (!trimmed) continue
lastOutput = trimmed
// Store in recent buffer for debugging
recentStdout.push(trimmed)
if (recentStdout.length > MAX_OUTPUT_LINES) {
recentStdout.shift()
}
this.emitLog(options.workspaceId, "info", line)
if (!portFound) {
this.logger.debug({ workspaceId: options.workspaceId, line: trimmed }, "OpenCode output line")
// Try multiple patterns for port detection
const portMatch = line.match(/opencode server listening on http:\/\/.+:(\d+)/i) ||
line.match(/server listening on http:\/\/.+:(\d+)/i) ||
line.match(/listening on http:\/\/.+:(\d+)/i) ||
line.match(/:(\d+)/i)
line.match(/server listening on http:\/\/.+:(\d+)/i) ||
line.match(/listening on http:\/\/.+:(\d+)/i) ||
line.match(/:(\d+)/i)
if (portMatch) {
portFound = true
child.removeListener("error", handleError)
const port = parseInt(portMatch[1], 10)
this.logger.info({ workspaceId: options.workspaceId, port, matchedLine: trimmed }, "Workspace runtime allocated port - PORT DETECTED")
const getLastOutput = () => lastOutput.trim()
resolve({ pid: child.pid!, port, exitPromise, getLastOutput })
} else {
this.logger.debug({ workspaceId: options.workspaceId, line: trimmed }, "Port detection - no match in this line")
@@ -183,7 +204,13 @@ export class WorkspaceRuntime {
for (const line of lines) {
const trimmed = line.trim()
if (!trimmed) continue
lastOutput = `[stderr] ${trimmed}`
// Store in recent buffer for debugging
recentStderr.push(trimmed)
if (recentStderr.length > MAX_OUTPUT_LINES) {
recentStderr.shift()
}
this.emitLog(options.workspaceId, "error", line)
}
})