Files
admin c629646b9f Complete Agent Pipeline System with Claude Code & OpenClaw Integration
- Added Claude Code integration with full context compaction support
- Added OpenClaw integration with deterministic pipeline support
- Implemented parallel agent execution (4 projects x 3 roles pattern)
- Added workspace isolation with permissions and quotas
- Implemented Lobster-compatible YAML workflow parser
- Added persistent memory store for cross-session context
- Created comprehensive README with hero section

This project was 100% autonomously built by Z.AI GLM-5
2026-03-03 13:12:14 +00:00

654 lines
16 KiB
TypeScript

/**
* Deterministic State Machine Core
*
* A state machine that controls agent flow WITHOUT LLM decision-making.
* States, transitions, and events are defined declaratively.
*
* Key principle: The LLM does creative work, the state machine handles the plumbing.
*/
import { randomUUID } from 'crypto';
import { EventEmitter } from 'events';
// ============================================================================
// Types
// ============================================================================
/**
* Lifecycle status of a state machine instance.
*
* - `idle`: freshly created, `start()` has not been called yet.
* - `active`: started (or resumed) and able to process queued events.
* - `waiting`: set when the machine enters a state of type `wait`.
* - `paused`: set by `pause()`; `resume()` switches back to `active`.
* - `completed`: set when a state of type `end` is entered.
* - `failed`: set when an error is recorded or the machine is cancelled.
*/
export type StateStatus = 'idle' | 'active' | 'waiting' | 'completed' | 'failed' | 'paused';
/**
* Declarative description of a single state.
*
* Note: the engine looks states up by their key in
* `StateMachineDefinition.states`; the `id` field is used as the `source`
* of events that choice and loop states raise themselves.
*/
export interface State {
id: string;
name: string;
// Determines what the engine does on entry: `end` completes the machine,
// `action` emits an 'action' event for an external handler, `parallel`,
// `choice` and `loop` have dedicated handlers, `wait` marks the instance as
// waiting, and any other type (i.e. `start`) runs `onEnter` auto-transitions.
type: 'start' | 'end' | 'action' | 'parallel' | 'choice' | 'wait' | 'loop';
agent?: string; // Agent to invoke in this state (not interpreted by the engine; passed along with the state in emitted events)
action?: string; // Action to execute (not interpreted by the engine; passed along with the state in emitted events)
timeout?: number; // Timeout in ms; in the visible engine code it is only armed for `action` states
retry?: RetryConfig; // Retry configuration (NOTE(review): not read by the engine code visible in this file)
onEnter?: Transition[]; // For `start` states: a transition with event '*' fires immediately on entry. For `parallel` states: the targets are reported as branches.
onExit?: Transition[]; // Event-driven transitions out of this state; matched against incoming events
// Free-form extra data. Loop states read `metadata.maxIterations` (default 3).
metadata?: Record<string, unknown>;
}
/**
* An edge from one state to another, triggered by an event.
*/
export interface Transition {
event: string; // Event type that triggers this transition ('*' in `onEnter` means "fire automatically on entry")
target: string; // Target state ID (a key of `StateMachineDefinition.states`)
condition?: Condition; // Optional condition evaluated against the instance context
guard?: string; // Guard function name (NOTE(review): not evaluated by the engine code visible in this file)
}
/**
* A predicate over the instance context, attached to a transition.
*
* - `equals`: strict (`===`) comparison of the field value with `value`.
* - `contains`: array membership when the field holds an array, otherwise a
*   substring test after string conversion.
* - `exists`: the field is neither `undefined` nor `null`.
* - `custom`: currently always treated as satisfied by the engine.
*/
export interface Condition {
type: 'equals' | 'contains' | 'exists' | 'custom';
// Dot-notation path into the instance context, e.g. `result.status`.
field: string;
// Comparison operand for `equals` and `contains`.
value?: unknown;
// NOTE(review): presumably intended for `custom` conditions; not read by the
// engine code visible in this file.
expression?: string;
}
/**
* Retry policy that can be attached to a state via `State.retry`.
*
* NOTE(review): no code visible in this file reads these fields, so the exact
* semantics (including the delay units, presumably ms like `State.timeout`)
* should be confirmed against whatever consumes them.
*/
export interface RetryConfig {
maxAttempts: number;
backoff: 'fixed' | 'exponential' | 'linear';
initialDelay: number;
maxDelay: number;
}
/**
* Static, declarative description of a state machine. One definition can back
* any number of running instances.
*/
export interface StateMachineDefinition {
id: string;
name: string;
version: string;
description?: string;
// ID of the state a new instance starts in (must be a key of `states`).
initial: string;
// All states, keyed by state ID; transitions reference these keys as targets.
states: Record<string, State>;
events?: string[]; // Allowed events (NOTE(review): not enforced by the engine code visible in this file)
context?: Record<string, unknown>; // Initial context; shallow-copied into each new instance
// NOTE(review): not consulted by the engine code visible in this file.
onError?: ErrorHandling;
}
/**
* Machine-level error policy referenced by `StateMachineDefinition.onError`.
*
* NOTE(review): the engine code visible in this file does not read this
* configuration; the field meanings below are inferred from their names and
* should be confirmed.
*/
export interface ErrorHandling {
strategy: 'fail' | 'retry' | 'transition';
targetState?: string; // presumably the state to move to when strategy is 'transition' — confirm
maxRetries?: number; // presumably the retry budget when strategy is 'retry' — confirm
}
/**
* Runtime snapshot of one running state machine.
*/
export interface StateMachineInstance {
// Caller-supplied ID, or a random UUID when none is given.
id: string;
definition: StateMachineDefinition;
// ID of the state the machine is currently in.
currentState: string;
// ID of the state that was left by the most recent transition.
previousState?: string;
status: StateStatus;
// Mutable working data; object payloads of events that trigger a transition
// are merged into it.
context: Record<string, unknown>;
// Chronological log of every executed transition.
history: StateTransition[];
createdAt: Date;
// Refreshed on every transition and on `updateContext()`.
updatedAt: Date;
// Set by `start()`.
startedAt?: Date;
// Set when the machine completes, fails or is cancelled.
completedAt?: Date;
// Failure reason; 'Cancelled' when the machine was cancelled.
error?: string;
}
/**
* One entry of an instance's transition history.
*/
export interface StateTransition {
from: string;
to: string;
// Type of the event that triggered the transition.
event: string;
timestamp: Date;
// Shallow copy of the instance context taken when the transition was
// recorded, i.e. before the triggering event's payload was merged in.
context?: Record<string, unknown>;
}
/**
* An event delivered to a state machine via `sendEvent()`.
*
* Note: this name shadows the global DOM `Event` type inside this module.
*/
export interface Event {
// Matched against `Transition.event` of the current state's `onExit` list.
type: string;
// Originator of the event; events raised by the engine itself use the ID of
// the state that raised them.
source: string;
// Optional instance ID filter: when set and different from the receiving
// instance's ID, the event matches no transition.
target?: string;
// When this is an object and the event triggers a transition, its properties
// are merged into the instance context.
payload: unknown;
// Filled in by `sendEvent()`.
timestamp: Date;
// NOTE(review): not read by the engine code visible in this file.
correlationId?: string;
}
// ============================================================================
// State Machine Engine
// ============================================================================
/**
 * DeterministicStateMachine - core engine for deterministic flow control.
 *
 * The machine owns a single instance of a {@link StateMachineDefinition} and
 * moves it from state to state in response to events. Events are queued and
 * processed strictly one at a time, in arrival order.
 *
 * Progress is reported through EventEmitter events: `started`, `eventQueued`,
 * `eventProcessed`, `noTransition`, `conditionFailed`, `transition`,
 * `enteringState`, `enteredState`, `exitingState`, `action`, `parallel`,
 * `loopIteration`, `timeout`, `completed`, `paused`, `resumed`, `cancelled`
 * and `error`.
 */
export class DeterministicStateMachine extends EventEmitter {
  private definition: StateMachineDefinition;
  private instance: StateMachineInstance;
  private eventQueue: Event[] = [];
  /** True while `processQueue` is draining, so only one drain loop runs. */
  private processing = false;
  /** Handle of the pending state timeout, if one is armed. */
  private timeoutId?: ReturnType<typeof setTimeout>;

  /**
   * @param definition Declarative description of states and transitions.
   * @param instanceId Optional instance ID; a random UUID is used when omitted
   *   or empty.
   */
  constructor(definition: StateMachineDefinition, instanceId?: string) {
    super();
    this.definition = definition;
    this.instance = this.createInstance(instanceId);
  }

  /**
   * Build the initial (idle) instance record for this machine.
   */
  private createInstance(instanceId?: string): StateMachineInstance {
    const now = new Date();
    return {
      // `||` on purpose: an empty string is not a usable ID.
      id: instanceId || randomUUID(),
      definition: this.definition,
      currentState: this.definition.initial,
      status: 'idle',
      // Spreading `undefined` yields `{}`, so no extra fallback is needed.
      context: { ...this.definition.context },
      history: [],
      createdAt: now,
      updatedAt: now
    };
  }

  /**
   * Start the state machine: mark it active, enter the initial state and then
   * drain any events that were queued before the start.
   *
   * @throws Error if the machine is not in `idle` status.
   */
  start(): void {
    if (this.instance.status !== 'idle') {
      throw new Error(`Cannot start state machine in ${this.instance.status} status`);
    }
    this.instance.status = 'active';
    this.instance.startedAt = new Date();
    this.emit('started', { instance: this.instance });
    // enterState runs synchronously up to its first await, so listeners for
    // the initial state still fire before start() returns.
    this.runDetached(
      this.enterState(this.instance.currentState).then(() => this.processQueue())
    );
  }

  /**
   * Queue an event for the state machine. The timestamp is added here.
   * Events are processed asynchronously, in the order they were sent.
   */
  sendEvent(event: Omit<Event, 'timestamp'>): void {
    const fullEvent: Event = {
      ...event,
      timestamp: new Date()
    };
    this.eventQueue.push(fullEvent);
    this.emit('eventQueued', { event: fullEvent });
    this.runDetached(this.processQueue());
  }

  /**
   * Observe a fire-and-forget task so that a rejection (for example a
   * listener that throws) is recorded as a machine failure instead of
   * surfacing as an unhandled promise rejection.
   */
  private runDetached(task: Promise<void>): void {
    task.catch((err: unknown) => {
      this.handleError(err instanceof Error ? err.message : String(err));
    });
  }

  /**
   * Whether queued events may be handled right now. A machine parked in a
   * `wait` state must keep accepting events, otherwise it could never leave
   * that state.
   */
  private canProcessEvents(): boolean {
    const { status } = this.instance;
    return status === 'active' || status === 'waiting';
  }

  /**
   * Drain the event queue, one event at a time. Re-entrant calls return
   * immediately; the running loop picks up events queued in the meantime.
   */
  private async processQueue(): Promise<void> {
    if (this.processing) return;
    this.processing = true;
    try {
      while (this.eventQueue.length > 0 && this.canProcessEvents()) {
        const event = this.eventQueue.shift();
        if (event === undefined) break;
        await this.handleEvent(event);
      }
    } finally {
      this.processing = false;
    }
  }

  /**
   * Handle a single event: pick the first transition of the current state
   * whose event type matches and whose condition (if any) holds, then run it.
   *
   * Emits `noTransition` when no transition matches the event at all, and
   * `conditionFailed` when transitions match but none of their conditions
   * hold.
   */
  private async handleEvent(event: Event): Promise<void> {
    const currentState = this.getCurrentState();
    this.emit('eventProcessed', { event, state: currentState });

    const candidates = this.findTransitions(currentState, event);
    if (candidates.length === 0) {
      this.emit('noTransition', { event, state: currentState });
      return;
    }

    // Several transitions may share an event name and differ only by their
    // condition (typical for choice states), so conditions are part of the
    // selection rather than a check on the first name match only.
    const transition = candidates.find(
      t => !t.condition || this.evaluateCondition(t.condition)
    );
    if (!transition) {
      this.emit('conditionFailed', { event, transition: candidates[0] });
      return;
    }

    await this.executeTransition(transition, event);
  }

  /**
   * Collect the `onExit` transitions of `state` that are triggered by `event`.
   * An event addressed to a different instance matches nothing.
   */
  private findTransitions(state: State, event: Event): Transition[] {
    if (event.target && event.target !== this.instance.id) return [];
    return (state.onExit ?? []).filter(t => t.event === event.type);
  }

  /**
   * Evaluate a transition condition against the instance context.
   */
  private evaluateCondition(condition: Condition): boolean {
    const value = this.getDeepValue(this.instance.context, condition.field);
    switch (condition.type) {
      case 'equals':
        return value === condition.value;
      case 'contains':
        if (Array.isArray(value)) {
          return value.includes(condition.value);
        }
        // A missing field contains nothing; without this guard it would be
        // stringified to "undefined"/"null" and match substrings of those.
        if (value === undefined || value === null) {
          return false;
        }
        return String(value).includes(String(condition.value));
      case 'exists':
        return value !== undefined && value !== null;
      case 'custom':
        // Custom conditions would be evaluated by a condition registry
        return true;
      default:
        return false;
    }
  }

  /**
   * Execute a state transition: record it in the history, exit the current
   * state, merge the event payload into the context and enter the target.
   */
  private async executeTransition(transition: Transition, event: Event): Promise<void> {
    const fromState = this.instance.currentState;
    const toState = transition.target;

    // The history entry captures the context as it was before the payload
    // of the triggering event is merged in.
    const transitionRecord: StateTransition = {
      from: fromState,
      to: toState,
      event: event.type,
      timestamp: new Date(),
      context: { ...this.instance.context }
    };
    this.instance.history.push(transitionRecord);

    await this.exitState(fromState);

    this.instance.previousState = fromState;
    this.instance.currentState = toState;
    this.instance.updatedAt = new Date();
    // Leaving a wait state puts the machine back into normal operation.
    if (this.instance.status === 'waiting') {
      this.instance.status = 'active';
    }

    // Merge plain-object payloads into the context. Arrays are skipped: they
    // would otherwise be spread into numeric keys.
    const { payload } = event;
    if (payload && typeof payload === 'object' && !Array.isArray(payload)) {
      this.instance.context = {
        ...this.instance.context,
        ...payload as Record<string, unknown>
      };
    }

    this.emit('transition', { from: fromState, to: toState, event });
    await this.enterState(toState);
  }

  /**
   * Enter a state and perform the behaviour associated with its type.
   * An unknown state ID fails the machine.
   */
  private async enterState(stateId: string): Promise<void> {
    const state = this.definition.states[stateId];
    if (!state) {
      this.handleError(`State ${stateId} not found`);
      return;
    }
    this.emit('enteringState', { state });

    switch (state.type) {
      case 'end':
        this.instance.status = 'completed';
        this.instance.completedAt = new Date();
        this.emit('completed', { instance: this.instance });
        break;
      case 'action':
        // Emit event for external action handler
        this.emit('action', {
          state,
          context: this.instance.context,
          instanceId: this.instance.id
        });
        if (state.timeout) {
          this.scheduleTimeout(state.timeout, stateId);
        }
        break;
      case 'parallel':
        this.handleParallelState(state);
        break;
      case 'choice':
        this.handleChoiceState(state);
        break;
      case 'wait':
        // Park until an external event triggers one of the exit transitions.
        this.instance.status = 'waiting';
        break;
      case 'loop':
        this.handleLoopState(state);
        break;
      default: {
        // Remaining type is `start`: the first '*' transition in onEnter is
        // an auto-transition and fires immediately.
        const auto = state.onEnter?.find(t => t.event === '*');
        if (auto) {
          await this.executeTransition(auto, {
            type: '*',
            source: stateId,
            payload: {},
            timestamp: new Date()
          });
        }
      }
    }
    this.emit('enteredState', { state });
  }

  /**
   * Exit a state: cancel its pending timeout and notify listeners.
   */
  private async exitState(stateId: string): Promise<void> {
    const state = this.definition.states[stateId];
    this.clearPendingTimeout();
    this.emit('exitingState', { state });
  }

  /**
   * Handle parallel state (fork into concurrent branches). The engine only
   * announces the branches; running them is left to the `parallel` listener.
   */
  private handleParallelState(state: State): void {
    this.emit('parallel', {
      state,
      branches: state.onEnter?.map(t => t.target) || [],
      context: this.instance.context
    });
  }

  /**
   * Handle choice state (conditional branching): raise the event of the first
   * exit transition whose condition holds, or else of the first unconditional
   * (default) transition.
   */
  private handleChoiceState(state: State): void {
    const transitions = state.onExit ?? [];
    const chosen =
      transitions.find(t => t.condition && this.evaluateCondition(t.condition)) ??
      transitions.find(t => !t.condition);
    if (chosen) {
      this.sendEvent({
        type: chosen.event,
        source: state.id,
        payload: {}
      });
    }
  }

  /**
   * Handle loop state: while the per-state counter (kept in
   * `context._loopCount`) is below `metadata.maxIterations` (default 3),
   * bump it and raise `continue`; afterwards raise `exit`.
   */
  private handleLoopState(state: State): void {
    const counters = (this.instance.context._loopCount ?? {}) as Record<string, number>;
    const loopCount = counters[state.id] ?? 0;

    // Only fall back to the default when no usable number is configured, so
    // an explicit 0 ("never run the body") is honoured.
    const configuredMax = state.metadata?.maxIterations;
    const maxIterations =
      typeof configuredMax === 'number' && !Number.isNaN(configuredMax) ? configuredMax : 3;

    if (loopCount < maxIterations) {
      const iteration = loopCount + 1;
      this.instance.context._loopCount = { ...counters, [state.id]: iteration };
      this.emit('loopIteration', { state, iteration, maxIterations });
      // Trigger loop body
      if (state.onExit?.some(t => t.event === 'continue')) {
        this.sendEvent({
          type: 'continue',
          source: state.id,
          payload: { iteration }
        });
      }
    } else if (state.onExit?.some(t => t.event === 'exit')) {
      // Exit loop
      this.sendEvent({
        type: 'exit',
        source: state.id,
        payload: { iterations: loopCount }
      });
    }
  }

  /**
   * Arm a timeout for a state. When it fires, a `timeout` event is emitted to
   * listeners and also queued as a state machine event.
   */
  private scheduleTimeout(duration: number, stateId: string): void {
    this.clearPendingTimeout();
    this.timeoutId = setTimeout(() => {
      this.timeoutId = undefined;
      // Ignore a timer that outlived its state.
      if (this.instance.currentState !== stateId) return;
      this.emit('timeout', { stateId });
      this.sendEvent({
        type: 'timeout',
        source: stateId,
        payload: { timedOut: true }
      });
    }, duration);
  }

  /**
   * Cancel the pending state timeout, if any.
   */
  private clearPendingTimeout(): void {
    if (this.timeoutId !== undefined) {
      clearTimeout(this.timeoutId);
      this.timeoutId = undefined;
    }
  }

  /**
   * Record a failure on the instance and notify `error` listeners.
   */
  private handleError(error: string): void {
    this.instance.error = error;
    this.instance.status = 'failed';
    this.instance.completedAt = new Date();
    // EventEmitter throws when 'error' is emitted without a listener. That
    // would escape from async code as an unhandled rejection, so the event is
    // only emitted when someone listens; the failure stays on the instance.
    if (this.listenerCount('error') > 0) {
      this.emit('error', { error, instance: this.instance });
    }
  }

  /**
   * Get current state definition
   */
  getCurrentState(): State {
    return this.definition.states[this.instance.currentState];
  }

  /**
   * Get instance info as a shallow copy: top-level fields are detached, nested
   * objects such as `context` and `history` are still shared.
   */
  getInstance(): StateMachineInstance {
    return { ...this.instance };
  }

  /**
   * Shallow-merge `updates` into the instance context.
   */
  updateContext(updates: Record<string, unknown>): void {
    this.instance.context = { ...this.instance.context, ...updates };
    this.instance.updatedAt = new Date();
  }

  /**
   * Pause the state machine. Only an active machine can be paused; events
   * sent while paused stay queued.
   */
  pause(): void {
    if (this.instance.status === 'active') {
      this.instance.status = 'paused';
      this.emit('paused', { instance: this.instance });
    }
  }

  /**
   * Resume a paused state machine and process the events queued meanwhile.
   */
  resume(): void {
    if (this.instance.status === 'paused') {
      this.instance.status = 'active';
      this.emit('resumed', { instance: this.instance });
      this.runDetached(this.processQueue());
    }
  }

  /**
   * Cancel the state machine: drop pending work and mark it failed with the
   * error 'Cancelled'. A machine that already completed or failed keeps its
   * outcome; only its pending timer and queue are cleared.
   */
  cancel(): void {
    this.clearPendingTimeout();
    this.eventQueue = [];

    const { status } = this.instance;
    if (status === 'completed' || status === 'failed') return;

    this.instance.status = 'failed';
    this.instance.error = 'Cancelled';
    this.instance.completedAt = new Date();
    this.emit('cancelled', { instance: this.instance });
  }

  /**
   * Get deep value from object by dot-notation path. Returns `undefined` as
   * soon as a path segment is missing or a non-object is reached.
   */
  private getDeepValue(obj: Record<string, unknown>, path: string): unknown {
    return path.split('.').reduce<unknown>((acc, key) => {
      if (acc && typeof acc === 'object' && key in acc) {
        return (acc as Record<string, unknown>)[key];
      }
      return undefined;
    }, obj);
  }
}
// ============================================================================
// State Machine Registry
// ============================================================================
/**
 * StateMachineRegistry - bookkeeping for state machine definitions and the
 * running machines created from them.
 */
export class StateMachineRegistry {
  private definitions: Map<string, StateMachineDefinition> = new Map();
  private instances: Map<string, DeterministicStateMachine> = new Map();

  /**
   * Store a definition under its own ID, replacing any previous definition
   * registered with the same ID.
   */
  register(definition: StateMachineDefinition): void {
    const { id } = definition;
    this.definitions.set(id, definition);
  }

  /**
   * Build a machine from a registered definition and track it by its
   * instance ID.
   *
   * @throws Error when no definition is registered under `definitionId`.
   */
  createInstance(definitionId: string, instanceId?: string): DeterministicStateMachine {
    const blueprint = this.definitions.get(definitionId);
    if (blueprint) {
      const machine = new DeterministicStateMachine(blueprint, instanceId);
      const { id } = machine.getInstance();
      this.instances.set(id, machine);
      return machine;
    }
    throw new Error(`State machine definition ${definitionId} not found`);
  }

  /**
   * Look up a tracked machine; `undefined` when the ID is unknown.
   */
  getInstance(instanceId: string): DeterministicStateMachine | undefined {
    return this.instances.get(instanceId);
  }

  /**
   * List every tracked machine, in insertion order.
   */
  getAllInstances(): DeterministicStateMachine[] {
    return [...this.instances.values()];
  }

  /**
   * List the tracked machines whose current status equals `status`.
   */
  getInstancesByStatus(status: StateStatus): DeterministicStateMachine[] {
    const matching: DeterministicStateMachine[] = [];
    for (const machine of this.getAllInstances()) {
      if (machine.getInstance().status === status) {
        matching.push(machine);
      }
    }
    return matching;
  }

  /**
   * Cancel a tracked machine and stop tracking it.
   *
   * @returns `true` when a machine was removed, `false` for an unknown ID.
   */
  removeInstance(instanceId: string): boolean {
    const machine = this.instances.get(instanceId);
    if (!machine) {
      return false;
    }
    machine.cancel();
    return this.instances.delete(instanceId);
  }

  /**
   * Summarise the registry: number of definitions, number of tracked
   * machines, and a per-status machine count.
   */
  getStats(): {
    definitions: number;
    instances: number;
    byStatus: Record<StateStatus, number>;
  } {
    const byStatus: Record<StateStatus, number> = {
      idle: 0,
      active: 0,
      waiting: 0,
      completed: 0,
      failed: 0,
      paused: 0
    };
    this.instances.forEach(machine => {
      byStatus[machine.getInstance().status] += 1;
    });
    const definitions = this.definitions.size;
    const instances = this.instances.size;
    return { definitions, instances, byStatus };
  }
}
// Singleton registry: one shared StateMachineRegistry created when this
// module is first loaded, for callers that do not want to manage their own.
export const stateMachineRegistry = new StateMachineRegistry();