/**
 * Claude Code Integration Layer
 *
 * Provides easy integration with Claude Code and OpenClaw.
 * Single API surface for all pipeline operations.
 */

import { randomUUID } from 'crypto';
import ZAI from 'z-ai-web-dev-sdk';
import {
  DeterministicStateMachine,
  StateMachineDefinition,
  StateMachineRegistry,
  stateMachineRegistry
} from '../core/state-machine';
import {
  ParallelExecutionEngine,
  PipelineTask,
  AgentRole,
  AgentSession,
  defaultExecutor
} from '../engine/parallel-executor';
import {
  EventBus,
  PipelineEvent,
  PipelineEventTypes,
  defaultEventBus
} from '../events/event-bus';
import {
  WorkspaceManager,
  WorkspaceFactory,
  AgentIdentity,
  defaultWorkspaceFactory
} from '../workspace/agent-workspace';
import {
  WorkflowRegistry,
  YAMLWorkflow,
  defaultWorkflowRegistry
} from '../workflows/yaml-workflow';

// ============================================================================
// Types
// ============================================================================

/**
 * Input accepted by `PipelineOrchestrator.createPipeline`.
 */
export interface PipelineConfig {
  /** Pipeline name, e.g. `'Code Pipeline'`. */
  name: string;
  /** Projects to process; one executor task is submitted per entry in each project's `tasks`. */
  projects: ProjectConfig[];
  /** NOTE(review): not read by `createPipeline` in this file — confirm who consumes it. */
  roles: AgentRole[];
  /** NOTE(review): not read by `createPipeline` in this file — confirm who consumes it. */
  maxConcurrency?: number;
  /**
   * Fallback timeout for tasks that do not set their own. `createPipeline`
   * uses 300000 when neither is set (presumably milliseconds — confirm
   * against the executor).
   */
  timeout?: number;
}

/**
 * A single project and the tasks to run for it.
 */
export interface ProjectConfig {
  /** Identifier passed to the executor as the task's `projectId`. */
  id: string;
  name: string;
  description?: string;
  repository?: string;
  branch?: string;
  /** Tasks submitted for this project. */
  tasks: TaskConfig[];
}

/**
 * Description of one unit of agent work.
 */
export interface TaskConfig {
  /** Handler type; `createPipeline` substitutes `'agent-task'` when this is empty. */
  type: string;
  /** Task text; it becomes the `## Task` section of the agent's user message. */
  description: string;
  /** Role of the agent that should run the task. */
  role: AgentRole;
  /** `createPipeline` uses `'medium'` when omitted. */
  priority?: 'low' | 'medium' | 'high' | 'critical';
  /** Forwarded to the executor unchanged; an empty list is used when omitted. */
  dependencies?: string[];
  /** Per-task timeout; takes precedence over `PipelineConfig.timeout`. */
  timeout?: number;
}

/**
 * Status record kept per pipeline and returned by `getPipelineStatus`.
 */
export interface PipelineResult {
  pipelineId: string;
  status: 'running' | 'completed' | 'failed' | 'cancelled';
  startTime: Date;
  /** Set when the pipeline is cancelled via `cancelPipeline`. */
  endTime?: Date;
  projects: ProjectResult[];
}

/**
 * Per-project portion of a {@link PipelineResult}.
 */
export interface ProjectResult {
  projectId: string;
  status: 'pending' | 'running' | 'completed' | 'failed';
  tasks: TaskResult[];
}

/**
 * Outcome of a single task.
 */
export interface TaskResult {
  taskId: string;
  status: 'pending' | 'running' | 'completed' | 'failed';
  output?: unknown;
  error?: string;
  duration?: number;
}

/**
 * One chat message sent to the LLM.
 */
export interface AgentMessage {
  role: 'system' | 'user' | 'assistant';
  content: string;
}
// ============================================================================
// Pipeline Orchestrator
// ============================================================================

/**
 * PipelineOrchestrator - Main integration class
 *
 * Single entry point for Claude Code and OpenClaw integration. It owns the
 * ZAI client, registers the `'agent-task'` handler with the executor and
 * publishes/consumes pipeline events on the event bus.
 */
export class PipelineOrchestrator {
  /** ZAI SDK client; `null` until {@link initialize} has completed. */
  private zai: Awaited<ReturnType<typeof ZAI.create>> | null = null;
  private executor: ParallelExecutionEngine;
  private eventBus: EventBus;
  private workflowRegistry: WorkflowRegistry;
  private workspaceFactory: WorkspaceFactory;
  private smRegistry: StateMachineRegistry;
  /** Pipelines created through {@link createPipeline}, keyed by pipeline id. */
  private pipelines: Map<string, PipelineResult> = new Map();
  private initialized = false;
  /** In-flight initialization, shared by concurrent {@link initialize} callers. */
  private initializing: Promise<void> | null = null;
  /** Unsubscribe callbacks for the subscriptions made in setupEventSubscriptions. */
  private unsubscribers: Array<() => void> = [];

  /**
   * @param config Optional collaborators; each one falls back to the
   *   module-level default instance when omitted.
   */
  constructor(config?: {
    executor?: ParallelExecutionEngine;
    eventBus?: EventBus;
    workflowRegistry?: WorkflowRegistry;
    workspaceFactory?: WorkspaceFactory;
  }) {
    this.executor = config?.executor || defaultExecutor;
    this.eventBus = config?.eventBus || defaultEventBus;
    this.workflowRegistry = config?.workflowRegistry || defaultWorkflowRegistry;
    this.workspaceFactory = config?.workspaceFactory || defaultWorkspaceFactory;
    this.smRegistry = stateMachineRegistry;
  }

  /**
   * Initialize the pipeline system.
   *
   * Safe to call repeatedly and concurrently: callers that arrive while an
   * initialization is in flight share its promise, so the SDK client, the
   * task handler and the event subscriptions are set up once. A failed
   * attempt is not cached; the next call retries.
   */
  async initialize(): Promise<void> {
    if (this.initialized) return;

    if (!this.initializing) {
      this.initializing = this.runInitialization().finally(() => {
        this.initializing = null;
      });
    }
    return this.initializing;
  }

  /**
   * Performs the actual start-up sequence behind {@link initialize}.
   */
  private async runInitialization(): Promise<void> {
    // Initialize ZAI SDK
    this.zai = await ZAI.create();

    // Start executor
    this.executor.start();

    // Start event bus
    this.eventBus.start();

    // Register task handler
    this.executor.registerHandler('agent-task', this.executeAgentTask.bind(this));

    // Set up event subscriptions
    this.setupEventSubscriptions();

    this.initialized = true;
  }

  /**
   * Set up event subscriptions for coordination.
   *
   * The unsubscribe callbacks are retained so {@link shutdown} can release
   * them; otherwise a shutdown/initialize cycle would stack duplicate handlers.
   */
  private setupEventSubscriptions(): void {
    // Agent completion triggers next step
    const offCompleted = this.eventBus.subscribe({
      eventType: PipelineEventTypes.AGENT_COMPLETED,
      handler: async (event) => {
        const { projectId, role, output } = event.payload as Record<string, unknown>;

        // Determine next role in pipeline
        const nextRole = this.getNextRole(role as AgentRole);

        if (nextRole) {
          // Emit event to trigger next agent
          this.eventBus.publish({
            type: PipelineEventTypes.TASK_STARTED,
            source: 'orchestrator',
            payload: { projectId, role: nextRole, previousOutput: output }
          });
        }
      }
    });

    // Handle failures
    const offFailed = this.eventBus.subscribe({
      eventType: PipelineEventTypes.AGENT_FAILED,
      handler: async (event) => {
        const { projectId, error } = event.payload as Record<string, unknown>;
        console.error(`Agent failed for project ${projectId}:`, error);

        // Emit pipeline failure event
        this.eventBus.publish({
          type: PipelineEventTypes.PIPELINE_COMPLETED,
          source: 'orchestrator',
          payload: { projectId, status: 'failed', error }
        });
      }
    });

    this.unsubscribers.push(offCompleted, offFailed);
  }

  /**
   * Get next role in the pipeline sequence.
   *
   * @param currentRole Role that just finished.
   * @returns The following role in programmer → reviewer → tester, or `null`
   *   when `currentRole` is the last step or is not part of the sequence.
   */
  private getNextRole(currentRole: AgentRole): AgentRole | null {
    const sequence: AgentRole[] = ['programmer', 'reviewer', 'tester'];
    const currentIndex = sequence.indexOf(currentRole);

    // indexOf yields -1 for roles outside the sequence (planner, analyst,
    // custom); without this guard they would hand off to 'programmer'.
    if (currentIndex === -1 || currentIndex >= sequence.length - 1) {
      return null; // End of pipeline, or role not in the sequence
    }

    return sequence[currentIndex + 1];
  }

  /**
   * Execute an agent task.
   *
   * Creates a workspace for the session, sends the prompt to the LLM, stores
   * the reply in the workspace (file and memory) and publishes
   * `AGENT_COMPLETED`. On error it publishes `AGENT_FAILED` and rethrows.
   *
   * @returns `{ output, workspace }` where `workspace` is the workspace path.
   * @throws Error if the orchestrator has not been initialized.
   */
  private async executeAgentTask(
    task: PipelineTask,
    session: AgentSession
  ): Promise<unknown> {
    if (!this.zai) {
      throw new Error('Pipeline not initialized');
    }

    // Create workspace for this task
    const workspace = this.workspaceFactory.createWorkspace({
      projectId: session.projectId,
      agentId: session.id,
      role: session.role,
      permissions: this.getPermissionsForRole(session.role)
    });

    // Set agent identity
    workspace.setIdentity(session.identity);

    // Build messages for LLM
    const messages = this.buildMessages(task, session, workspace);

    try {
      // Call LLM
      const response = await this.zai.chat.completions.create({
        messages,
        thinking: { type: 'disabled' }
      });

      const output = response.choices?.[0]?.message?.content || '';

      // Save output to workspace
      workspace.writeFile(`output/${task.id}.txt`, output);

      // Store in memory for next agent
      workspace.memorize(`task.${task.id}.output`, output);

      // Emit completion event
      this.eventBus.publish({
        type: PipelineEventTypes.AGENT_COMPLETED,
        source: session.id,
        payload: {
          taskId: task.id,
          projectId: session.projectId,
          role: session.role,
          output
        }
      });

      return { output, workspace: workspace.getPath() };
    } catch (error) {
      // Emit failure event
      this.eventBus.publish({
        type: PipelineEventTypes.AGENT_FAILED,
        source: session.id,
        payload: {
          taskId: task.id,
          projectId: session.projectId,
          role: session.role,
          error: error instanceof Error ? error.message : String(error)
        }
      });

      throw error;
    }
  }

  /**
   * Build messages for LLM: system prompt, task message and, when the
   * workspace has a `'previous.output'` memory entry, a "Previous Work" message.
   *
   * NOTE(review): executeAgentTask memorizes under `task.<id>.output`, not
   * `'previous.output'` — confirm which component writes the latter key.
   */
  private buildMessages(
    task: PipelineTask,
    session: AgentSession,
    workspace: WorkspaceManager
  ): AgentMessage[] {
    const messages: AgentMessage[] = [];

    // System prompt with identity
    messages.push({
      role: 'system',
      content: this.buildSystemPrompt(session, workspace)
    });

    // Task description
    messages.push({
      role: 'user',
      content: `## Task\n${task.description}\n\n## Context\nProject: ${session.projectId}\nRole: ${session.role}\n\n## Instructions\nComplete this task and provide your output.`
    });

    // Add any previous context from memory
    const previousOutput = workspace.recall('previous.output');
    if (previousOutput) {
      messages.push({
        role: 'user',
        content: `## Previous Work\n${JSON.stringify(previousOutput, null, 2)}`
      });
    }

    return messages;
  }

  /**
   * Build system prompt for agent.
   *
   * The prompt is assembled line by line and joined with `\n` so that each
   * markdown heading and bullet starts on its own line.
   */
  private buildSystemPrompt(session: AgentSession, workspace: WorkspaceManager): string {
    const identity = session.identity;
    const role = session.role;

    const roleInstructions: Record<AgentRole, string> = {
      programmer: [
        'You are responsible for writing clean, efficient, and well-documented code.',
        '- Follow best practices and coding standards',
        '- Write tests for your code',
        '- Ensure code is production-ready'
      ].join('\n'),
      reviewer: [
        'You are responsible for reviewing code for quality, bugs, and improvements.',
        '- Check for security vulnerabilities',
        '- Verify coding standards',
        '- Suggest improvements',
        '- Approve or request changes'
      ].join('\n'),
      tester: [
        'You are responsible for testing the code thoroughly.',
        '- Write comprehensive test cases',
        '- Test edge cases and error handling',
        '- Verify functionality meets requirements',
        '- Report test results clearly'
      ].join('\n'),
      planner: [
        'You are responsible for planning and architecture.',
        '- Break down complex tasks',
        '- Design system architecture',
        '- Identify dependencies',
        '- Create implementation plans'
      ].join('\n'),
      analyst: [
        'You are responsible for analysis and reporting.',
        '- Analyze data and metrics',
        '- Identify patterns and insights',
        '- Create reports and recommendations'
      ].join('\n'),
      custom: 'You are a custom agent with specific instructions.'
    };

    return [
      '# Agent Identity',
      `Name: ${identity.name}`,
      `Role: ${role}`,
      `Description: ${identity.description}`,
      '',
      '# Personality',
      identity.personality || 'Professional and efficient.',
      '',
      '# Role Instructions',
      // Roles missing from the table fall back to the generic custom text.
      roleInstructions[role] || roleInstructions.custom,
      '',
      '# Workspace',
      `Your workspace is at: ${workspace.getPath()}`,
      '',
      '# Available Tools',
      session.tools.map(t => `- ${t}`).join('\n'),
      '',
      '# Constraints',
      '- Stay within your role boundaries',
      '- Communicate clearly and concisely',
      '- Report progress and issues promptly'
    ].join('\n');
  }

  /**
   * Get permissions for a role; roles missing from the table get `['read']`.
   */
  private getPermissionsForRole(role: AgentRole): string[] {
    const permissionMap: Record<AgentRole, string[]> = {
      programmer: ['read', 'write', 'execute', 'git'],
      reviewer: ['read', 'diff'],
      tester: ['read', 'execute', 'test'],
      planner: ['read', 'write'],
      analyst: ['read'],
      custom: ['read']
    };

    return permissionMap[role] || ['read'];
  }

  // =========================================================================
  // Public API
  // =========================================================================

  /**
   * Create and start a pipeline.
   *
   * Initializes the orchestrator if needed, records a `'running'`
   * {@link PipelineResult}, submits one executor task per configured task and
   * publishes `PIPELINE_STARTED`.
   *
   * NOTE(review): `config.roles` and `config.maxConcurrency` are not read
   * here, and the stored result's task lists are not populated.
   *
   * @returns The generated pipeline id (`pipeline-` + 8 characters of a UUID).
   */
  async createPipeline(config: PipelineConfig): Promise<string> {
    await this.initialize();

    const pipelineId = `pipeline-${randomUUID().substring(0, 8)}`;

    const result: PipelineResult = {
      pipelineId,
      status: 'running',
      startTime: new Date(),
      projects: config.projects.map(p => ({
        projectId: p.id,
        status: 'pending',
        tasks: []
      }))
    };

    this.pipelines.set(pipelineId, result);

    // Create tasks for all projects and roles
    const tasks: PipelineTask[] = [];

    for (const project of config.projects) {
      for (const taskConfig of project.tasks) {
        const task = this.executor.submitTask({
          projectId: project.id,
          role: taskConfig.role,
          type: taskConfig.type || 'agent-task',
          description: taskConfig.description,
          priority: taskConfig.priority || 'medium',
          input: { project, task: taskConfig },
          dependencies: taskConfig.dependencies || [],
          timeout: taskConfig.timeout || config.timeout || 300000,
          maxRetries: 3
        });

        tasks.push(task);
      }
    }

    // Emit pipeline started event
    this.eventBus.publish({
      type: PipelineEventTypes.PIPELINE_STARTED,
      source: 'orchestrator',
      payload: { pipelineId, config, taskCount: tasks.length }
    });

    return pipelineId;
  }

  /**
   * Create pipeline from YAML workflow.
   *
   * @param workflowId Id of a workflow known to the workflow registry.
   * @param context Optional values merged into the state machine context
   *   before it starts.
   * @returns The id of the created state machine instance.
   * @throws Error if the workflow is not registered.
   */
  async createPipelineFromYAML(
    workflowId: string,
    context?: Record<string, unknown>
  ): Promise<string> {
    await this.initialize();

    const workflow = this.workflowRegistry.get(workflowId);
    if (!workflow) {
      throw new Error(`Workflow ${workflowId} not found`);
    }

    // The parsed definition is not used below; the lookup is kept from the
    // original flow. NOTE(review): drop it if getParsed has no side effects.
    this.workflowRegistry.getParsed(workflowId);

    // Create state machine instance
    const sm = this.smRegistry.createInstance(workflowId);

    // Update context if provided
    if (context) {
      sm.updateContext(context);
    }

    // Attach listeners BEFORE starting so that anything emitted while the
    // machine enters its initial state is observed.

    // Listen for state transitions
    sm.on('transition', ({ from, to, event }) => {
      this.eventBus.publish({
        type: PipelineEventTypes.TRANSITION,
        source: sm.getInstance().id,
        payload: { workflowId, from, to, event }
      });
    });

    // Listen for actions
    sm.on('action', async ({ state, context: smContext }) => {
      if (state.agent || state.metadata?.role) {
        // Submit task to executor
        this.executor.submitTask({
          projectId: (smContext.projectId as string) || 'default',
          role: (state.metadata?.role as AgentRole) || 'programmer',
          type: 'agent-task',
          description: `Execute ${state.name}`,
          priority: 'high',
          input: { state, context: smContext },
          dependencies: [],
          timeout: state.timeout || 300000,
          maxRetries: state.retry?.maxAttempts || 3
        });
      }
    });

    // Start the state machine
    sm.start();

    return sm.getInstance().id;
  }

  /**
   * Register a custom workflow
   */
  registerWorkflow(yaml: YAMLWorkflow): StateMachineDefinition {
    return this.workflowRegistry.register(yaml);
  }

  /**
   * Get pipeline status; `undefined` when the id is unknown.
   */
  getPipelineStatus(pipelineId: string): PipelineResult | undefined {
    return this.pipelines.get(pipelineId);
  }

  /**
   * Cancel a pipeline.
   *
   * Marks a running pipeline as cancelled and publishes `PIPELINE_COMPLETED`.
   * Unknown ids and pipelines that are no longer running are left untouched,
   * so repeated calls neither move `endTime` nor re-publish the event.
   *
   * NOTE(review): tasks already submitted to the executor are not cancelled here.
   */
  async cancelPipeline(pipelineId: string): Promise<void> {
    const pipeline = this.pipelines.get(pipelineId);
    if (!pipeline || pipeline.status !== 'running') {
      return;
    }

    pipeline.status = 'cancelled';
    pipeline.endTime = new Date();

    this.eventBus.publish({
      type: PipelineEventTypes.PIPELINE_COMPLETED,
      source: 'orchestrator',
      payload: { pipelineId, status: 'cancelled' }
    });
  }

  /**
   * Get system statistics
   */
  getStats(): {
    pipelines: number;
    executor: ReturnType<ParallelExecutionEngine['getStats']>;
    eventBus: ReturnType<EventBus['getStats']>;
    workspaces: ReturnType<WorkspaceFactory['getStats']>;
  } {
    return {
      pipelines: this.pipelines.size,
      executor: this.executor.getStats(),
      eventBus: this.eventBus.getStats(),
      workspaces: this.workspaceFactory.getStats()
    };
  }

  /**
   * Subscribe to pipeline events.
   *
   * @returns The function returned by the event bus for this subscription.
   */
  onEvent(eventType: string, handler: (event: PipelineEvent) => void): () => void {
    return this.eventBus.subscribe({ eventType, handler });
  }

  /**
   * Shutdown the pipeline system.
   *
   * Stops the executor and event bus, then releases the orchestrator's own
   * event subscriptions so a later {@link initialize} does not duplicate them.
   */
  async shutdown(): Promise<void> {
    await this.executor.stop();
    this.eventBus.stop();

    // splice(0) empties the list while handing back what was in it.
    for (const unsubscribe of this.unsubscribers.splice(0)) {
      unsubscribe();
    }

    this.initialized = false;
  }
}

// ============================================================================
// Quick Start Functions
// ============================================================================

/**
 * Create a simple code pipeline.
 *
 * Runs on the shared `defaultOrchestrator`, so the returned id can be passed
 * to `defaultOrchestrator.getPipelineStatus` / `cancelPipeline`, and repeated
 * calls do not add further subscriptions to the default event bus.
 *
 * @returns The id of the created pipeline.
 */
export async function createCodePipeline(projects: ProjectConfig[]): Promise<string> {
  return defaultOrchestrator.createPipeline({
    name: 'Code Pipeline',
    projects,
    roles: ['programmer', 'reviewer', 'tester'],
    maxConcurrency: 12, // 4 projects × 3 roles
    timeout: 300000
  });
}

/**
 * Create a parallel execution pipeline on the shared `defaultOrchestrator`.
 *
 * @returns The id of the created pipeline.
 */
export async function createParallelPipeline(config: PipelineConfig): Promise<string> {
  return defaultOrchestrator.createPipeline(config);
}
/**
 * Run a predefined workflow.
 *
 * Runs on the shared `defaultOrchestrator` rather than a throwaway instance,
 * so repeated calls do not each add another set of subscriptions to the
 * default event bus.
 *
 * @param workflowId Id of a workflow known to the workflow registry.
 * @param context Optional values merged into the workflow's state machine context.
 * @returns The id of the created state machine instance.
 */
export async function runWorkflow(
  workflowId: string,
  context?: Record<string, unknown>
): Promise<string> {
  // Resolved at call time, after the module has finished evaluating, so the
  // constant declared below is already initialized.
  return defaultOrchestrator.createPipelineFromYAML(workflowId, context);
}

// Default orchestrator instance
export const defaultOrchestrator = new PipelineOrchestrator();