Files
admin c629646b9f Complete Agent Pipeline System with Claude Code & OpenClaw Integration
- Added Claude Code integration with full context compaction support
- Added OpenClaw integration with deterministic pipeline support
- Implemented parallel agent execution (4 projects x 3 roles pattern)
- Added workspace isolation with permissions and quotas
- Implemented Lobster-compatible YAML workflow parser
- Added persistent memory store for cross-session context
- Created comprehensive README with hero section

This project was 100% autonomously built by Z.AI GLM-5
2026-03-03 13:12:14 +00:00

600 lines
16 KiB
TypeScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* Claude Code Integration Layer
*
* Provides easy integration with Claude Code and OpenClaw.
* Single API surface for all pipeline operations.
*/
import { randomUUID } from 'crypto';
import ZAI from 'z-ai-web-dev-sdk';
import {
DeterministicStateMachine,
StateMachineDefinition,
StateMachineRegistry,
stateMachineRegistry
} from '../core/state-machine';
import {
ParallelExecutionEngine,
PipelineTask,
AgentRole,
AgentSession,
defaultExecutor
} from '../engine/parallel-executor';
import {
EventBus,
PipelineEvent,
PipelineEventTypes,
defaultEventBus
} from '../events/event-bus';
import {
WorkspaceManager,
WorkspaceFactory,
AgentIdentity,
defaultWorkspaceFactory
} from '../workspace/agent-workspace';
import {
WorkflowRegistry,
YAMLWorkflow,
defaultWorkflowRegistry
} from '../workflows/yaml-workflow';
// ============================================================================
// Types
// ============================================================================
/**
 * Input accepted by `PipelineOrchestrator.createPipeline`.
 *
 * The whole object is also forwarded in the payload of the
 * PIPELINE_STARTED event.
 */
export interface PipelineConfig {
/** Pipeline name (the quick-start helper passes 'Code Pipeline'). */
name: string;
/** Projects to process; one executor task is submitted per entry in each project's `tasks`. */
projects: ProjectConfig[];
/** Roles taking part in the pipeline. NOTE(review): not read by the orchestrator code visible in this file. */
roles: AgentRole[];
/** Requested concurrency limit. NOTE(review): not read by the orchestrator code visible in this file. */
maxConcurrency?: number;
/** Fallback timeout for tasks that define none; when this is also unset, 300000 is used (presumably milliseconds - confirm against the executor). */
timeout?: number;
}
/**
 * One project inside a pipeline.
 *
 * The complete object is handed to every task of the project as part of the
 * task input (`input.project`).
 */
export interface ProjectConfig {
/** Project identifier; used as `projectId` on submitted tasks and on the matching `ProjectResult`. */
id: string;
/** Project name. */
name: string;
/** Optional free-text description. */
description?: string;
/** Optional repository reference (format is not constrained by this file). */
repository?: string;
/** Optional branch name. */
branch?: string;
/** Tasks to submit for this project. */
tasks: TaskConfig[];
}
/**
 * Declarative description of a single task submitted to the executor.
 */
export interface TaskConfig {
/** Executor task type; an empty value falls back to 'agent-task' in `createPipeline`. */
type: string;
/** Task description forwarded to the executor. */
description: string;
/** Agent role the task is submitted under. */
role: AgentRole;
/** Scheduling priority; `createPipeline` defaults it to 'medium'. */
priority?: 'low' | 'medium' | 'high' | 'critical';
/** Dependency list forwarded to the executor (defaults to an empty list); presumably task identifiers - confirm against the executor. */
dependencies?: string[];
/** Per-task timeout; falls back to the pipeline timeout and then to 300000 (presumably milliseconds - confirm). */
timeout?: number;
}
/**
 * Status record kept per pipeline by the orchestrator and returned by
 * `getPipelineStatus`.
 */
export interface PipelineResult {
/** Identifier of the form `pipeline-<8 characters>` generated in `createPipeline`. */
pipelineId: string;
/** Lifecycle state; starts as 'running' and is set to 'cancelled' by `cancelPipeline`. */
status: 'running' | 'completed' | 'failed' | 'cancelled';
/** Time at which the record was created. */
startTime: Date;
/** End time; in this file it is only set when the pipeline is cancelled. */
endTime?: Date;
/** One entry per configured project. */
projects: ProjectResult[];
}
/**
 * Per-project portion of a `PipelineResult`.
 */
export interface ProjectResult {
/** Identifier copied from `ProjectConfig.id`. */
projectId: string;
/** Project state; initialised to 'pending' by `createPipeline`. */
status: 'pending' | 'running' | 'completed' | 'failed';
/** Task results; initialised to an empty list by `createPipeline`. */
tasks: TaskResult[];
}
/**
 * Outcome of a single task.
 *
 * NOTE(review): nothing in this file creates or updates TaskResult values;
 * confirm where they are expected to be populated.
 */
export interface TaskResult {
/** Identifier of the task this result belongs to. */
taskId: string;
/** Task state. */
status: 'pending' | 'running' | 'completed' | 'failed';
/** Task output, if any; shape is not constrained here. */
output?: unknown;
/** Error message, if any. */
error?: string;
/** Task duration; units are not established in this file - TODO confirm. */
duration?: number;
}
/**
 * Chat message as assembled by the orchestrator and passed in the `messages`
 * field of the chat-completion request.
 */
export interface AgentMessage {
/** Speaker of the message. */
role: 'system' | 'user' | 'assistant';
/** Message text. */
content: string;
}
// ============================================================================
// Pipeline Orchestrator
// ============================================================================
/**
 * PipelineOrchestrator - main integration class.
 *
 * Single entry point for Claude Code and OpenClaw integration. It wires
 * together the parallel executor, the event bus, the workflow registry, the
 * workspace factory and the state-machine registry, and keeps an in-memory
 * record of every pipeline created through `createPipeline`.
 */
export class PipelineOrchestrator {
  private zai: Awaited<ReturnType<typeof ZAI.create>> | null = null;
  private readonly executor: ParallelExecutionEngine;
  private readonly eventBus: EventBus;
  private readonly workflowRegistry: WorkflowRegistry;
  private readonly workspaceFactory: WorkspaceFactory;
  private readonly smRegistry: StateMachineRegistry;
  private readonly pipelines: Map<string, PipelineResult> = new Map();
  private initialized = false;
  /** In-flight initialisation, shared by concurrent `initialize()` callers. */
  private initPromise: Promise<void> | null = null;
  /** Unsubscribe callbacks for the internal event-bus subscriptions. */
  private unsubscribers: Array<() => void> = [];

  /**
   * @param config Optional collaborators; every omitted entry falls back to
   *   the corresponding module-level default instance.
   */
  constructor(config?: {
    executor?: ParallelExecutionEngine;
    eventBus?: EventBus;
    workflowRegistry?: WorkflowRegistry;
    workspaceFactory?: WorkspaceFactory;
  }) {
    this.executor = config?.executor ?? defaultExecutor;
    this.eventBus = config?.eventBus ?? defaultEventBus;
    this.workflowRegistry = config?.workflowRegistry ?? defaultWorkflowRegistry;
    this.workspaceFactory = config?.workspaceFactory ?? defaultWorkspaceFactory;
    this.smRegistry = stateMachineRegistry;
  }

  /**
   * Initialize the pipeline system.
   *
   * Safe to call repeatedly and concurrently: once initialised it returns
   * immediately, and callers that arrive while initialisation is still in
   * flight share the same promise, so handlers and subscriptions are only
   * registered once. A failed attempt is not cached; a later call retries.
   */
  async initialize(): Promise<void> {
    if (this.initialized) return;
    if (!this.initPromise) {
      this.initPromise = this.runInitialization().finally(() => {
        this.initPromise = null;
      });
    }
    return this.initPromise;
  }

  /**
   * Performs the actual start-up sequence: SDK client, executor, event bus,
   * task handler, then the internal event subscriptions.
   */
  private async runInitialization(): Promise<void> {
    // Initialize ZAI SDK
    this.zai = await ZAI.create();
    // Start executor
    this.executor.start();
    // Start event bus
    this.eventBus.start();
    // Register task handler
    // NOTE(review): createPipeline submits tasks under TaskConfig.type, so
    // tasks whose type is not 'agent-task' have no handler registered here.
    this.executor.registerHandler('agent-task', this.executeAgentTask.bind(this));
    // Set up event subscriptions
    this.setupEventSubscriptions();
    this.initialized = true;
  }

  /**
   * Set up event subscriptions for coordination.
   *
   * The unsubscribe callbacks are retained so that `shutdown()` can release
   * them; otherwise a shutdown/initialize cycle would stack duplicate
   * handlers on the bus.
   */
  private setupEventSubscriptions(): void {
    // Agent completion triggers next step
    const stopCompleted = this.eventBus.subscribe({
      eventType: PipelineEventTypes.AGENT_COMPLETED,
      handler: async (event) => {
        const { projectId, role, output } = event.payload as Record<string, unknown>;
        // Determine next role in pipeline
        const nextRole = this.getNextRole(role as AgentRole);
        if (nextRole) {
          // Emit event to trigger next agent
          // NOTE(review): nothing in this file consumes TASK_STARTED; confirm
          // that a subscriber elsewhere actually schedules the next agent.
          this.eventBus.publish({
            type: PipelineEventTypes.TASK_STARTED,
            source: 'orchestrator',
            payload: { projectId, role: nextRole, previousOutput: output }
          });
        }
      }
    });

    // Handle failures
    const stopFailed = this.eventBus.subscribe({
      eventType: PipelineEventTypes.AGENT_FAILED,
      handler: async (event) => {
        const { projectId, error } = event.payload as Record<string, unknown>;
        console.error(`Agent failed for project ${projectId}:`, error);
        // Emit pipeline failure event
        this.eventBus.publish({
          type: PipelineEventTypes.PIPELINE_COMPLETED,
          source: 'orchestrator',
          payload: { projectId, status: 'failed', error }
        });
      }
    });

    this.unsubscribers.push(stopCompleted, stopFailed);
  }

  /**
   * Get next role in the pipeline sequence.
   *
   * @param currentRole Role that has just completed.
   * @returns The role that follows `currentRole` in the fixed
   *   programmer -> reviewer -> tester sequence, or `null` when `currentRole`
   *   is the last step or is not part of the sequence at all.
   */
  private getNextRole(currentRole: AgentRole): AgentRole | null {
    const sequence: AgentRole[] = ['programmer', 'reviewer', 'tester'];
    const currentIndex = sequence.indexOf(currentRole);
    // indexOf yields -1 for roles outside the sequence; without this guard
    // such roles would wrap around to sequence[0] ('programmer').
    if (currentIndex === -1 || currentIndex >= sequence.length - 1) {
      return null; // End of pipeline, or role is not part of it
    }
    return sequence[currentIndex + 1] ?? null;
  }

  /**
   * Execute an agent task.
   *
   * Creates a workspace for the session, sends the assembled messages to the
   * chat-completion endpoint, stores the reply in the workspace (file and
   * memory) and publishes AGENT_COMPLETED. On any error AGENT_FAILED is
   * published and the error is rethrown to the executor.
   *
   * @param task Task handed over by the executor.
   * @param session Agent session the task runs in.
   * @returns The model output and the workspace path.
   * @throws Error if the orchestrator has not been initialised.
   */
  private async executeAgentTask(
    task: PipelineTask,
    session: AgentSession
  ): Promise<unknown> {
    if (!this.zai) {
      throw new Error('Pipeline not initialized');
    }

    // Create workspace for this task
    const workspace = this.workspaceFactory.createWorkspace({
      projectId: session.projectId,
      agentId: session.id,
      role: session.role,
      permissions: this.getPermissionsForRole(session.role)
    });
    // Set agent identity
    workspace.setIdentity(session.identity);

    // Build messages for LLM
    const messages = this.buildMessages(task, session, workspace);

    try {
      // Call LLM
      const response = await this.zai.chat.completions.create({
        messages,
        thinking: { type: 'disabled' }
      });
      // A missing or empty reply is normalised to an empty string.
      const output = response.choices?.[0]?.message?.content || '';

      // Save output to workspace
      workspace.writeFile(`output/${task.id}.txt`, output);
      // Store in memory for next agent
      workspace.memorize(`task.${task.id}.output`, output);

      // Emit completion event
      this.eventBus.publish({
        type: PipelineEventTypes.AGENT_COMPLETED,
        source: session.id,
        payload: {
          taskId: task.id,
          projectId: session.projectId,
          role: session.role,
          output
        }
      });

      return { output, workspace: workspace.getPath() };
    } catch (error) {
      // Emit failure event
      this.eventBus.publish({
        type: PipelineEventTypes.AGENT_FAILED,
        source: session.id,
        payload: {
          taskId: task.id,
          projectId: session.projectId,
          role: session.role,
          error: error instanceof Error ? error.message : String(error)
        }
      });
      throw error;
    }
  }

  /**
   * Build messages for LLM.
   *
   * @returns A system prompt, a user message describing the task and, when
   *   the workspace memory holds a 'previous.output' entry, a second user
   *   message containing that entry as JSON.
   */
  private buildMessages(
    task: PipelineTask,
    session: AgentSession,
    workspace: WorkspaceManager
  ): AgentMessage[] {
    const messages: AgentMessage[] = [
      // System prompt with identity
      {
        role: 'system',
        content: this.buildSystemPrompt(session, workspace)
      },
      // Task description
      {
        role: 'user',
        content: `## Task\n${task.description}\n\n## Context\nProject: ${session.projectId}\nRole: ${session.role}\n\n## Instructions\nComplete this task and provide your output.`
      }
    ];

    // Add any previous context from memory
    // NOTE(review): executeAgentTask stores results under
    // `task.<id>.output`, while this reads 'previous.output'; nothing in this
    // file writes that key - confirm which key the hand-off should use.
    const previousOutput = workspace.recall('previous.output');
    if (previousOutput) {
      messages.push({
        role: 'user',
        content: `## Previous Work\n${JSON.stringify(previousOutput, null, 2)}`
      });
    }

    return messages;
  }

  /**
   * Build system prompt for agent.
   *
   * Combines the session identity, role-specific instructions (falling back
   * to the `custom` text), the workspace path and the session's tool list.
   */
  private buildSystemPrompt(session: AgentSession, workspace: WorkspaceManager): string {
    const identity = session.identity;
    const role = session.role;
    const roleInstructions: Record<AgentRole, string> = {
      programmer: `You are responsible for writing clean, efficient, and well-documented code.
- Follow best practices and coding standards
- Write tests for your code
- Ensure code is production-ready`,
      reviewer: `You are responsible for reviewing code for quality, bugs, and improvements.
- Check for security vulnerabilities
- Verify coding standards
- Suggest improvements
- Approve or request changes`,
      tester: `You are responsible for testing the code thoroughly.
- Write comprehensive test cases
- Test edge cases and error handling
- Verify functionality meets requirements
- Report test results clearly`,
      planner: `You are responsible for planning and architecture.
- Break down complex tasks
- Design system architecture
- Identify dependencies
- Create implementation plans`,
      analyst: `You are responsible for analysis and reporting.
- Analyze data and metrics
- Identify patterns and insights
- Create reports and recommendations`,
      custom: `You are a custom agent with specific instructions.`
    };
    return `# Agent Identity
Name: ${identity.name}
Role: ${role}
Description: ${identity.description}
# Personality
${identity.personality || 'Professional and efficient.'}
# Role Instructions
${roleInstructions[role] || roleInstructions.custom}
# Workspace
Your workspace is at: ${workspace.getPath()}
# Available Tools
${session.tools.map(t => `- ${t}`).join('\n')}
# Constraints
- Stay within your role boundaries
- Communicate clearly and concisely
- Report progress and issues promptly`;
  }

  /**
   * Get permissions for a role.
   *
   * @returns The permission names for `role`; roles missing from the table
   *   get read-only access.
   */
  private getPermissionsForRole(role: AgentRole): string[] {
    const permissionMap: Record<AgentRole, string[]> = {
      programmer: ['read', 'write', 'execute', 'git'],
      reviewer: ['read', 'diff'],
      tester: ['read', 'execute', 'test'],
      planner: ['read', 'write'],
      analyst: ['read'],
      custom: ['read']
    };
    return permissionMap[role] || ['read'];
  }

  // =========================================================================
  // Public API
  // =========================================================================

  /**
   * Create and start a pipeline.
   *
   * Initialises the orchestrator if necessary, records a 'running'
   * `PipelineResult`, submits one executor task per configured task and
   * publishes PIPELINE_STARTED.
   *
   * @param config Pipeline description.
   * @returns The generated pipeline identifier.
   */
  async createPipeline(config: PipelineConfig): Promise<string> {
    await this.initialize();

    const pipelineId = `pipeline-${randomUUID().substring(0, 8)}`;
    const result: PipelineResult = {
      pipelineId,
      status: 'running',
      startTime: new Date(),
      projects: config.projects.map(p => ({
        projectId: p.id,
        status: 'pending',
        tasks: []
      }))
    };
    this.pipelines.set(pipelineId, result);

    // Create tasks for all projects and roles
    let taskCount = 0;
    for (const project of config.projects) {
      for (const taskConfig of project.tasks) {
        this.executor.submitTask({
          projectId: project.id,
          role: taskConfig.role,
          // `||` on purpose: an empty type string also falls back.
          type: taskConfig.type || 'agent-task',
          description: taskConfig.description,
          priority: taskConfig.priority ?? 'medium',
          input: { project, task: taskConfig },
          dependencies: taskConfig.dependencies ?? [],
          // `||` on purpose: a zero timeout is treated as "not set".
          timeout: taskConfig.timeout || config.timeout || 300000,
          maxRetries: 3
        });
        taskCount += 1;
      }
    }

    // Emit pipeline started event
    this.eventBus.publish({
      type: PipelineEventTypes.PIPELINE_STARTED,
      source: 'orchestrator',
      payload: { pipelineId, config, taskCount }
    });

    return pipelineId;
  }

  /**
   * Create pipeline from YAML workflow.
   *
   * @param workflowId Identifier of a workflow known to the workflow registry.
   * @param context Optional values merged into the state-machine context
   *   before it starts.
   * @returns The identifier of the created state-machine instance.
   * @throws Error if the workflow is unknown or has no parsed definition.
   */
  async createPipelineFromYAML(workflowId: string, context?: Record<string, unknown>): Promise<string> {
    await this.initialize();

    const workflow = this.workflowRegistry.get(workflowId);
    if (!workflow) {
      throw new Error(`Workflow ${workflowId} not found`);
    }
    // Fail with a clear message instead of asserting non-null and carrying on.
    const definition = this.workflowRegistry.getParsed(workflowId);
    if (!definition) {
      throw new Error(`Workflow ${workflowId} has no parsed definition`);
    }

    // Create state machine instance
    const sm = this.smRegistry.createInstance(workflowId);
    // Update context if provided
    if (context) {
      sm.updateContext(context);
    }

    // Attach listeners BEFORE starting, so that transitions and actions
    // emitted while the machine starts are not lost.

    // Listen for state transitions
    sm.on('transition', ({ from, to, event }) => {
      this.eventBus.publish({
        type: PipelineEventTypes.TRANSITION,
        source: sm.getInstance().id,
        payload: { workflowId, from, to, event }
      });
    });

    // Listen for actions
    sm.on('action', async ({ state, context: machineContext }) => {
      if (state.agent || state.metadata?.role) {
        // Submit task to executor
        this.executor.submitTask({
          projectId: machineContext.projectId as string || 'default',
          role: state.metadata?.role as AgentRole || 'programmer',
          type: 'agent-task',
          description: `Execute ${state.name}`,
          priority: 'high',
          input: { state, context: machineContext },
          dependencies: [],
          timeout: state.timeout || 300000,
          maxRetries: state.retry?.maxAttempts || 3
        });
      }
    });

    // Start the state machine
    sm.start();

    return sm.getInstance().id;
  }

  /**
   * Register a custom workflow.
   *
   * @returns Whatever the workflow registry returns for the registration.
   */
  registerWorkflow(yaml: YAMLWorkflow): StateMachineDefinition {
    return this.workflowRegistry.register(yaml);
  }

  /**
   * Get pipeline status.
   *
   * @returns The live record for `pipelineId`, or `undefined` if this
   *   orchestrator did not create such a pipeline.
   */
  getPipelineStatus(pipelineId: string): PipelineResult | undefined {
    return this.pipelines.get(pipelineId);
  }

  /**
   * Cancel a pipeline.
   *
   * Only a pipeline that is still 'running' is affected: it is marked
   * 'cancelled', gets an end time and a PIPELINE_COMPLETED event is
   * published. Unknown pipelines and pipelines that already reached a final
   * state are left untouched, so repeated calls are harmless.
   *
   * NOTE(review): tasks already submitted to the executor are not withdrawn
   * here.
   */
  async cancelPipeline(pipelineId: string): Promise<void> {
    const pipeline = this.pipelines.get(pipelineId);
    if (!pipeline || pipeline.status !== 'running') return;

    pipeline.status = 'cancelled';
    pipeline.endTime = new Date();
    this.eventBus.publish({
      type: PipelineEventTypes.PIPELINE_COMPLETED,
      source: 'orchestrator',
      payload: { pipelineId, status: 'cancelled' }
    });
  }

  /**
   * Get system statistics.
   *
   * @returns The number of tracked pipelines plus the statistics reported by
   *   the executor, the event bus and the workspace factory.
   */
  getStats(): {
    pipelines: number;
    executor: ReturnType<ParallelExecutionEngine['getStats']>;
    eventBus: ReturnType<EventBus['getStats']>;
    workspaces: ReturnType<WorkspaceFactory['getStats']>;
  } {
    return {
      pipelines: this.pipelines.size,
      executor: this.executor.getStats(),
      eventBus: this.eventBus.getStats(),
      workspaces: this.workspaceFactory.getStats()
    };
  }

  /**
   * Subscribe to pipeline events.
   *
   * @returns The callback handed back by the event bus for this subscription.
   */
  onEvent(eventType: string, handler: (event: PipelineEvent) => void): () => void {
    return this.eventBus.subscribe({ eventType, handler });
  }

  /**
   * Shutdown the pipeline system.
   *
   * Stops the executor, releases the orchestrator's own event subscriptions,
   * stops the event bus and marks the orchestrator as uninitialised so that
   * a later `initialize()` starts from a clean state.
   */
  async shutdown(): Promise<void> {
    await this.executor.stop();
    // splice(0) empties the list while handing back what has to be released.
    for (const unsubscribe of this.unsubscribers.splice(0)) {
      unsubscribe();
    }
    this.eventBus.stop();
    this.initialized = false;
  }
}
// ============================================================================
// Quick Start Functions
// ============================================================================
/**
 * Create a simple code pipeline.
 *
 * Runs on the shared `defaultOrchestrator`, so the returned identifier can be
 * passed to `defaultOrchestrator.getPipelineStatus` / `cancelPipeline`, and
 * repeated calls do not register additional task handlers or event
 * subscriptions on the shared executor and event bus.
 *
 * @param projects Projects (with their tasks) to run.
 * @returns The identifier of the created pipeline.
 */
export async function createCodePipeline(projects: ProjectConfig[]): Promise<string> {
  return defaultOrchestrator.createPipeline({
    name: 'Code Pipeline',
    projects,
    roles: ['programmer', 'reviewer', 'tester'],
    maxConcurrency: 12, // 4 projects x 3 roles
    timeout: 300000
  });
}

/**
 * Create a parallel execution pipeline.
 *
 * Thin convenience wrapper around `defaultOrchestrator.createPipeline`.
 *
 * @param config Full pipeline configuration.
 * @returns The identifier of the created pipeline.
 */
export async function createParallelPipeline(config: PipelineConfig): Promise<string> {
  return defaultOrchestrator.createPipeline(config);
}

/**
 * Run a predefined workflow.
 *
 * Thin convenience wrapper around
 * `defaultOrchestrator.createPipelineFromYAML`.
 *
 * @param workflowId Identifier of a registered workflow.
 * @param context Optional initial context for the workflow.
 * @returns The identifier of the created state-machine instance.
 */
export async function runWorkflow(
  workflowId: string,
  context?: Record<string, unknown>
): Promise<string> {
  return defaultOrchestrator.createPipelineFromYAML(workflowId, context);
}

// Default orchestrator instance, shared by the quick-start helpers above.
// (They only dereference it when called, i.e. after this module has loaded.)
export const defaultOrchestrator = new PipelineOrchestrator();