<think>我看到这是一个工作流引擎库，名为FlowLab。从代码结构看，这是一个用TypeScript实现的工作流引擎，支持各种工作流操作如任务执行、条件分支、并行执行等。我需要理解并总结这个库的关键组件、类和方法，以便创建一个全面的文档，便于AI agent理解和使用。

让我先分析库的主要结构：

<!--MARK: Core
 -->

1. 核心模块 (`core/`)：
   - `context.ts` - 上下文相关方法
   - `executor.ts` - 工作流执行器
   - `node.ts` - 节点定义
   - `node-register.ts` - 节点注册
   - `step.ts` - 步骤定义
   - `workflow.ts` - 工作流定义

2. 错误处理 (`errors/`)：
   - `error.ts` - 自定义错误类

3. 事件管理 (`event/`)：
   - `event-manager.ts` - 事件管理

4. 监控和日志 (`monitoring/`)：
   - `logger.ts` - 日志记录

5. 任务调度 (`scheduler/`)：
   - `scheduler.ts` - 调度器接口
   - `distributed.ts` - 分布式调度

6. 类型定义 (`types/`)：
   - `index.ts` - 核心类型定义

接下来，我需要详细了解每个关键组件的功能和API，以便生成全面的文档。我将从类型定义开始，因为这有助于理解整个系统的数据结构，然后再研究各个核心组件的实现细节。</think># FlowLab 工作流引擎文档

## 概述

FlowLab 是一个功能强大的工作流引擎，用于定义和执行复杂的工作流程序。它支持任务节点、条件分支、并行执行、子工作流、事件触发等多种工作流组件，并提供了完整的错误处理、补偿机制和执行上下文管理。

## 核心概念

### 状态与类型
<!--MARK: 状态与类型
 -->
```typescript
// 节点状态
export const NodeStatus = {
  PENDING: 'PENDING',      // 等待执行
  RUNNING: 'RUNNING',      // 当前正在执行
  COMPLETED: 'COMPLETED',  // 成功完成
  FAILED: 'FAILED',        // 执行失败
  TIMEOUT: 'TIMEOUT',      // 执行超时
  SKIPPED: 'SKIPPED',      // 步骤被跳过
  CANCELLED: 'CANCELLED',  // 执行被取消
  COMPENSATING: 'COMPENSATING', // 补偿进行中
  COMPENSATED: 'COMPENSATED',   // 补偿完成
};

// 步骤类型
export const StepType = {
  TASK: 'TASK',              // 常规任务节点
  CONDITION: 'CONDITION',    // 条件分支
  PARALLEL: 'PARALLEL',      // 并行执行
  SUB_WORKFLOW: 'SUB_WORKFLOW', // 子工作流
  EVENT_TRIGGER: 'EVENT_TRIGGER', // 事件触发
  EVENT_LISTENER: 'EVENT_LISTENER', // 事件监听
};
```

## 核心组件

### 1. 工作流定义 (WorkflowDefinition)
<!-- MARK: 工作流定义
 -->
工作流定义是构建工作流的基础，用于定义工作流的结构和步骤。

```typescript
class WorkflowDefinition {
  constructor(id: string, name?: string, description?: string);
  
  // 添加任务节点
  addStep(config: Omit<IStepConfig, 'type'> & { nodeId: string }): this;
  
  // 添加条件分支节点
  addConditionStep(config: Omit<IStepConfig, 'type'> & { 
    condition: (context: IWorkflowContext) => boolean | string; 
    branches: { [key: string]: string } 
  }): this;
  
  // 添加并行节点
  addParallelStep(config: Omit<IStepConfig, 'type'> & { parallelSteps: IStepConfig[] }): this;
  
  // 添加子工作流节点
  addSubWorkflowStep(config: Omit<IStepConfig, 'type'> & { subWorkflowId: string }): this;
  
  // 添加事件触发节点
  addEventTriggerStep(config: Omit<IStepConfig, 'type'> & { event: string }): this;
  
  // 添加事件监听节点
  addEventListenerStep(config: Omit<IStepConfig, 'type'> & { event: string }): this;
  
  // 设置工作流起始步骤
  setStartStep(stepId: string): this;
  
  // 获取起始步骤ID
  getStartStepId(): string;
  
  // 获取指定步骤
  getStep(stepId: string): Step | undefined;
  
  // 获取所有步骤
  getSteps(): Map<string, Step>;
  
  // 验证工作流定义
  validate(): boolean;
}
```

#### 使用示例
```typescript
const workflow = new WorkflowDefinition('my-workflow', '我的工作流', '这是一个示例工作流');

workflow.addStep({
  id: 'task1',
  nodeId: 'LogNode',
  input: { message: '执行任务1' }
})
.addConditionStep({
  id: 'condition1',
  condition: (context) => context.variables.value > 10,
  branches: {
    'true': 'task2',
    'false': 'task3'
  }
})
.setStartStep('task1');
```

### 2. 步骤 (Step)

步骤表示工作流中的一个具体操作单元。
<!-- MARK: Step
 -->
```typescript
class Step {
  constructor(config: IStepConfig);
  
  // 步骤ID
  get id(): string;
  
  // 步骤类型
  get type(): StepType;
  
  // 下一个步骤ID
  get nextStepId(): string | undefined;
  
  // 评估条件 (CONDITION类型专用)
  evaluateCondition(context: IWorkflowContext): boolean | string;
  
  // 获取条件分支目标 (CONDITION类型专用)
  getBranchTarget(branchKey: string): string;
  
  // 获取并行步骤配置 (PARALLEL类型专用)
  getParallelSteps(): IStepConfig[];
  
  // 获取子工作流ID (SUB_WORKFLOW类型专用)
  getSubWorkflowId(): string;
  
  // 获取传递给子工作流的输入 (SUB_WORKFLOW类型专用)
  getSubWorkflowInput(): Record<string, any> | undefined;
  
  // 获取节点ID (TASK类型专用)
  getNodeId(): string | undefined;
  
  // 获取事件名称 (EVENT_*类型专用)
  getEventName(): string;
  
  // 获取输入配置
  getInputConfig(): Record<string, any> | undefined;
  
  // 获取输出映射
  getOutputMapping(): Record<string, string> | undefined;
  
  // 获取超时时间
  getTimeout(): number | undefined;
  
  // 获取最大重试次数
  getMaxRetries(): number;
  
  // 获取重试延迟
  getRetryDelay(): number;
  
  // 获取失败补偿设置
  getCompensateOnFailure(): boolean | string;
  
  // 获取所需角色
  getRequiredRoles(): string[] | undefined;
  
  // 获取SLA配置
  getSla(): number | undefined;
}
```

### 3. 节点 (Node)
<!-- MARK: Node
 -->
节点是实际执行业务逻辑的组件，所有自定义节点都应继承自BaseNode。

```typescript
abstract class BaseNode implements INode {
  abstract id: string; // 节点类型ID
  
  // 执行核心逻辑
  abstract execute(context: INodeContext): Promise<INodeOutput>;
  
  // 补偿逻辑 (可选)
  async compensate(context: INodeContext): Promise<void>;
  
  // 输入验证逻辑 (可选)
  validateInput(input: Record<string, any>): boolean;
  
  // 执行此节点所需的角色列表 (可选)
  requiredRoles?: string[];
}
```

#### 自定义节点示例
```typescript
class LogNode extends BaseNode {
  id = 'LogNode';
  
  async execute(context: INodeContext): Promise<INodeOutput> {
    const message = context.input.message || '无消息';
    console.log(`[LogNode] ${message}`);
    return { 
      status: NodeStatus.COMPLETED, 
      output: { logged: true, timestamp: new Date() } 
    };
  }
}
```

### 4. 节点注册表 (NodeRegistry)
<!-- MARK: 节点注册
 -->
用于管理所有可用的自定义任务节点。

```typescript
class NodeRegistry {
  // 注册节点
  register(node: INode): void;
  
  // 获取节点
  get(nodeId: string): INode | undefined;
  
  // 检查节点是否存在
  has(nodeId: string): boolean;
  
  // 获取所有已注册节点的ID列表
  listNodeIds(): string[];
}
```

#### 使用示例
```typescript
const registry = new NodeRegistry();
registry.register(new LogNode());
registry.register(new HttpRequestNode());
```

### 5. 上下文 (Context)
<!-- MARK: 上下文
 -->
工作流和节点执行时的上下文对象，包含状态、变量和历史记录等信息。

```typescript
// 创建工作流上下文
function createWorkflowContext(
  definitionId: string,
  input: Record<string, any>,
  tenantId?: string,
  userId?: string,
  userRole?: string
): IWorkflowContext;

// 创建节点上下文
function createNodeContext(
  workflowContext: IWorkflowContext,
  stepId: string,
  nodeId: string | undefined,
  input: Record<string, any>
): INodeContext;
```

#### 上下文接口
```typescript
interface IWorkflowContext {
  workflowId: string;
  workflowDefinitionId: string;
  input: Record<string, any>;
  output?: Record<string, any>;
  status: NodeStatus;
  variables: Record<string, any>;
  startTime?: Date;
  endTime?: Date;
  tenantId?: string;
  userId?: string;
  userRole?: string;
  history: IExecutionRecord[];
  [key: string]: any;
}

interface INodeContext {
  nodeId: string;
  stepId: string;
  input: Record<string, any>;
  output?: Record<string, any>;
  status: NodeStatus;
  startTime?: Date;
  endTime?: Date;
  retries: number;
  error?: Error;
  logs: string[];
  workflowContext: IWorkflowContext;
  [key: string]: any;
}
```

### 6. 工作流执行器 (WorkflowExecutor)
<!-- MARK: 工作流执行器
 -->
负责执行工作流实例，处理任务执行、错误处理和补偿等逻辑。

```typescript
class WorkflowExecutor {
  constructor(
    workflowDefinition: WorkflowDefinition,
    context: IWorkflowContext,
    options: IWorkflowExecutorOptions
  );
  
  // 执行工作流
  async run(): Promise<IWorkflowContext>;
}

interface IWorkflowExecutorOptions {
  nodeRegistry: NodeRegistry;
  maxLoopIterations?: number; // 防止无限循环的最大迭代次数 (默认 1000)
}
```

#### 使用示例
```typescript
const definition = new WorkflowDefinition('my-workflow');
// 添加步骤...

const context = createWorkflowContext(definition.id, { userId: '123' });
const executor = new WorkflowExecutor(definition, context, { nodeRegistry });

// 执行工作流
const result = await executor.run();
console.log(`工作流执行结果: ${result.status}`);
```

### 7. 事件管理器 (EventManager)
<!-- MARK: 时间管理器
 -->
用于触发和监听事件，支持事件驱动的工作流执行。

```typescript
interface IEventManager {
  // 触发事件
  emit(eventName: string, data?: Record<string, any>, triggerContext?: IWorkflowContext): Promise<void>;
  
  // 绑定工作流到事件
  bindWorkflowToEvent(eventName: string, workflowDefinitionId: string, options?: any): void;
  
  // 添加事件监听器
  on(eventName: string, listener: (payload: IEventPayload) => void): void;
  
  // 移除事件监听器
  off(eventName: string, listener: (payload: IEventPayload) => void): void;
}

// 内存事件管理器实现
class InMemoryEventManager implements IEventManager {
  // 实现方法...
}
```

### 8. 调度器 (Scheduler)
<!-- MARK: 调度器
 -->
用于将工作流或任务的执行推迟或分发到不同的执行环境。

```typescript
interface IScheduler {
  // 调度工作流
  scheduleWorkflow(definitionId: string, input: Record<string, any>, options?: IScheduleOptions): Promise<string>;
  
  // 调度单个节点
  scheduleNode(nodeId: string, context: Partial<INodeContext>, options?: IScheduleOptions): Promise<string>;
  
  // 取消调度任务
  cancel(taskId: string): Promise<boolean>;
}

// 本地调度器实现
class LocalScheduler implements IScheduler {
  // 实现方法...
}

// 分布式调度器接口
interface IDistributedScheduler extends IScheduler {
  // 额外方法...
}

// 分布式调度器示例实现
class DistributedSchedulerPlaceholder implements IDistributedScheduler {
  // 实现方法...
}
```

### 9. 日志记录器 (Logger)
<!-- MARK: 日志记录器
 -->
用于记录工作流和节点执行的日志信息。

```typescript
interface ILogger {
  debug(message: string, meta?: any): void;
  info(message: string, meta?: any): void;
  warn(message: string, meta?: any): void;
  error(message: string, error?: Error, meta?: any): void;
}

// 控制台日志记录器实现
class ConsoleLogger implements ILogger {
  constructor(contextPrefix: string = '[FlowLab]');
  // 实现方法...
}
```

### 10. 错误处理 (Errors)

自定义错误类，用于标识不同类型的错误。

```typescript
class WorkflowError extends Error {
  constructor(message: string);
}

class NodeExecutionError extends WorkflowError {
  public readonly nodeId?: string;
  public readonly stepId?: string;
  constructor(message: string, nodeId?: string, stepId?: string);
}

class TimeoutError extends NodeExecutionError {
  constructor(message: string, nodeId?: string, stepId?: string);
}

class AuthorizationError extends WorkflowError {
  constructor(message: string);
}

class ConfigurationError extends WorkflowError {
  constructor(message: string);
}
```

## 完整工作流示例

以下是一个完整的工作流创建和执行示例，展示了如何使用FlowLab构建和运行一个典型的业务流程：

```typescript
import { 
  WorkflowDefinition, 
  NodeRegistry, 
  BaseNode, 
  createWorkflowContext, 
  WorkflowExecutor, 
  NodeStatus, 
  StepType, 
  INodeContext, 
  INodeOutput 
} from 'flowlab';

// 1. 创建自定义节点
class LogNode extends BaseNode {
  id = 'LogNode';
  
  async execute(context: INodeContext): Promise<INodeOutput> {
    console.log(`[LogNode] 消息: ${context.input.message}`);
    return { status: NodeStatus.COMPLETED, output: { logged: true } };
  }
}

class DataProcessNode extends BaseNode {
  id = 'DataProcessNode';
  
  async execute(context: INodeContext): Promise<INodeOutput> {
    // 处理数据
    const data = context.input.data;
    const result = data.map(item => item * 2);
    
    return { 
      status: NodeStatus.COMPLETED, 
      output: { result }
    };
  }
  
  // 添加补偿逻辑
  async compensate(context: INodeContext): Promise<void> {
    console.log(`[DataProcessNode] 补偿: 回滚处理的数据`);
    // 执行补偿逻辑...
  }
}

// 2. 注册节点
const nodeRegistry = new NodeRegistry();
nodeRegistry.register(new LogNode());
nodeRegistry.register(new DataProcessNode());

// 3. 创建工作流定义
const workflow = new WorkflowDefinition('data-process-workflow', '数据处理工作流');

workflow.addStep({
  id: 'start',
  nodeId: 'LogNode',
  input: { message: '开始数据处理工作流' },
  nextStepId: 'process'
})
.addStep({
  id: 'process',
  nodeId: 'DataProcessNode',
  input: { 
    data: 'workflow.input.data' // 使用工作流输入
  },
  outputMapping: {
    'workflow.variables.processedData': 'output.result' // 将输出映射到工作流变量
  },
  nextStepId: 'checkResult'
})
.addConditionStep({
  id: 'checkResult',
  condition: (context) => {
    // 检查处理结果
    const data = context.variables.processedData;
    return data && data.length > 0 ? 'hasData' : 'noData';
  },
  branches: {
    'hasData': 'success',
    'noData': 'empty'
  }
})
.addStep({
  id: 'success',
  nodeId: 'LogNode',
  input: { message: '数据处理成功' },
  nextStepId: 'end'
})
.addStep({
  id: 'empty',
  nodeId: 'LogNode',
  input: { message: '处理结果为空' },
  nextStepId: 'end'
})
.addStep({
  id: 'end',
  nodeId: 'LogNode',
  input: { message: '工作流执行完成' }
})
.setStartStep('start');

// 验证工作流
if (!workflow.validate()) {
  console.error('工作流验证失败');
  process.exit(1);
}

// 4. 创建工作流上下文
const context = createWorkflowContext(
  workflow.id,
  { data: [1, 2, 3, 4, 5] }, // 输入数据
  'tenant-001', // 租户ID
  'user-001',   // 用户ID
  'admin'       // 用户角色
);

// 5. 创建和运行执行器
const executor = new WorkflowExecutor(workflow, context, { nodeRegistry });

// 6. 执行工作流
async function runWorkflow() {
  try {
    const result = await executor.run();
    console.log(`工作流执行结果: ${result.status}`);
    console.log(`处理后的数据:`, result.variables.processedData);
    
    // 输出执行历史
    console.log(`执行历史:`);
    result.history.forEach(record => {
      console.log(`- 步骤 ${record.stepId} (${record.status}): 开始于 ${record.startTime.toISOString()}`);
    });
  } catch (error) {
    console.error(`工作流执行失败:`, error);
  }
}

runWorkflow();
```

## 高级功能

### 并行执行
<!-- MARK: 高级-并行
 -->
工作流可以并行执行多个步骤，实现更高效的处理：

```typescript
workflow.addParallelStep({
  id: 'parallel',
  parallelSteps: [
    {
      id: 'branch1',
      type: StepType.TASK,
      nodeId: 'Task1',
      input: { data: 'workflow.input.part1' }
    },
    {
      id: 'branch2',
      type: StepType.TASK,
      nodeId: 'Task2',
      input: { data: 'workflow.input.part2' }
    }
  ],
  nextStepId: 'merge'
});
```

### 子工作流
<!-- MARK: 子工作流 
-->
可以嵌套执行子工作流，实现模块化和复用：

```typescript
workflow.addSubWorkflowStep({
  id: 'subflow',
  subWorkflowId: 'reusable-process',
  input: { data: 'workflow.variables.inputForSubworkflow' },
  nextStepId: 'next'
});
```

### 事件触发与监听
<!-- MARK: 触发与监听
 -->
支持事件驱动的工作流：

```typescript
// 事件触发步骤
workflow.addEventTriggerStep({
  id: 'triggerEvent',
  event: 'data.processed',
  input: { result: 'workflow.variables.processedData' },
  nextStepId: 'next'
});

// 事件监听步骤 (通常作为工作流起点)
workflow.addEventListenerStep({
  id: 'listenEvent',
  event: 'order.created',
  nextStepId: 'processOrder'
});
```

### 错误处理和重试
<!-- MARK: 错误处理和重试 -->
支持自动重试和回滚机制：

```typescript
workflow.addStep({
  id: 'riskyOperation',
  nodeId: 'DataProcessNode',
  maxRetries: 3,         // 最多重试3次
  retryDelay: 1000,      // 每次重试间隔1秒
  timeout: 5000,         // 5秒超时
  compensateOnFailure: true,  // 失败时触发补偿
  nextStepId: 'next'
});
```

### SLA监控
<!-- MARK: SLA监控 -->
可以设置步骤的SLA，并在违反时采取行动：

```typescript
workflow.addStep({
  id: 'timeConstrainedStep',
  nodeId: 'ImportantNode',
  sla: 10000,  // 10秒SLA
  nextStepId: 'next'
});
```

## 实现细节

1. **步骤输入映射** - 支持从工作流变量到步骤输入的动态映射
2. **步骤输出映射** - 将步骤输出映射回工作流变量
3. **权限检查** - 基于角色的步骤执行权限控制
4. **监控和日志** - 详细的执行历史和日志记录
5. **补偿机制** - 失败时能够回滚已执行的步骤
6. **安全检查** - 防止无限循环等异常情况

## 扩展点

1. **自定义节点** - 通过继承BaseNode创建新的任务节点
2. **事件集成** - 通过实现IEventManager接口集成外部事件系统
3. **分布式调度** - 通过实现IDistributedScheduler接口支持分布式执行
4. **监控集成** - 通过实现ILogger接口集成外部日志和监控系统

---

这个文档涵盖了FlowLab工作流引擎的所有主要组件和功能，为AI agent提供了理解和使用这个库所需的所有关键信息。
