Skill间通信与协作模式详解

现代AI系统很少由单个Skill独立完成所有工作。多个Skill需要协作:一个Skill生成代码,另一个Skill审查代码;一个Skill收集数据,另一个Skill分析数据。Skill之间的通信和协作模式,直接决定了系统的灵活性、可靠性和可维护性。这篇文章系统讲解八种核心协作模式。

消息传递:最基础的通信方式

消息传递是Skill之间通信的最基础形式。一个Skill发送消息,另一个Skill接收并处理。简单、直接、容易理解。

消息传递的核心是消息格式。好的消息格式应该自描述、类型安全、可扩展。自描述意味着消息包含足够信息,接收方不需要额外上下文就能理解。类型安全意味着消息结构固定,不会因为字段缺失或类型错误导致处理失败。可扩展意味着新增字段不会影响现有处理逻辑。

interface SkillMessage {
  id: string;           // 消息唯一标识
  type: string;         // 消息类型,决定处理方式
  source: string;       // 发送方Skill ID
  target: string;       // 接收方Skill ID
  payload: unknown;     // 消息体
  timestamp: number;    // 发送时间
  correlationId?: string; // 关联ID,用于追踪请求链
}

// 示例:代码生成完成消息
const codeGenMessage: SkillMessage = {
  id: 'msg-001',
  type: 'code_generation.completed',
  source: 'code-gen-skill',
  target: 'code-review-skill',
  payload: {
    files: [
      { path: 'src/api/user.ts', content: '...' },
      { path: 'src/types/user.ts', content: '...' },
    ],
    metadata: {
      language: 'typescript',
      framework: 'express',
    },
  },
  timestamp: Date.now(),
  correlationId: 'task-123',
};

消息传递有两种模式:直接发送和通过中介。直接发送是Skill A直接调用Skill B的接口,简单但耦合度高。通过中介是Skill A把消息发到消息队列或事件总线,Skill B从那里订阅,解耦但增加复杂度。

// 直接发送
class DirectMessaging {
  private skills = new Map<string, Skill>();

  register(skillId: string, skill: Skill): void {
    this.skills.set(skillId, skill);
  }

  async send(message: SkillMessage): Promise<void> {
    const target = this.skills.get(message.target);
    if (!target) {
      throw new Error(`Skill ${message.target} not found`);
    }
    await target.handleMessage(message);
  }
}

// 通过中介
class MessageBroker {
  private queues = new Map<string, SkillMessage[]>();
  private subscribers = new Map<string, Set<(msg: SkillMessage) => Promise<void>>>();

  async publish(message: SkillMessage): Promise<void> {
    const queue = this.queues.get(message.target) || [];
    queue.push(message);
    this.queues.set(message.target, queue);
    
    // 通知订阅者
    const subs = this.subscribers.get(message.target) || new Set();
    for (const handler of subs) {
      await handler(message);
    }
  }

  subscribe(target: string, handler: (msg: SkillMessage) => Promise<void>): void {
    const subs = this.subscribers.get(target) || new Set();
    subs.add(handler);
    this.subscribers.set(target, subs);
  }
}

选择哪种模式取决于系统规模和演化需求。小规模系统直接发送足够;大规模或需要动态扩展的系统应该通过中介。

事件驱动:松耦合的协作方式

事件驱动是消息传递的进阶形式。Skill之间不直接通信,而是通过事件总线发布和订阅事件。一个Skill完成工作后发布事件,感兴趣的Skill订阅并响应。

事件驱动的核心是:发布者不知道谁在处理事件,订阅者不知道事件从哪里来。这种松耦合让系统更灵活:新增Skill只需要订阅感兴趣的事件,不需要修改现有Skill。

interface DomainEvent {
  id: string;
  type: string;
  payload: unknown;
  timestamp: number;
  source: string;
}

class EventBus {
  private handlers = new Map<string, Set<EventHandler>>();

  on(eventType: string, handler: EventHandler): void {
    const handlers = this.handlers.get(eventType) || new Set();
    handlers.add(handler);
    this.handlers.set(eventType, handlers);
  }

  off(eventType: string, handler: EventHandler): void {
    const handlers = this.handlers.get(eventType);
    if (handlers) {
      handlers.delete(handler);
    }
  }

  async emit(event: DomainEvent): Promise<void> {
    const handlers = this.handlers.get(event.type) || new Set();
    await Promise.all(
      Array.from(handlers).map((handler) =>
        handler(event).catch((error) => {
          console.error(`Event handler failed for ${event.type}:`, error);
        })
      )
    );
  }
}

// 使用示例
const eventBus = new EventBus();

// 代码生成Skill发布事件
eventBus.emit({
  id: 'evt-001',
  type: 'code.generated',
  payload: { files: [...], language: 'typescript' },
  timestamp: Date.now(),
  source: 'code-gen-skill',
});

// 代码审查Skill订阅事件
eventBus.on('code.generated', async (event) => {
  const reviewSkill = new CodeReviewSkill();
  await reviewSkill.review(event.payload.files);
});

// 文档生成Skill也订阅同一事件
eventBus.on('code.generated', async (event) => {
  const docSkill = new DocGenSkill();
  await docSkill.generateDocs(event.payload.files);
});

事件驱动的挑战在于事件顺序和幂等性。如果事件处理有依赖顺序,需要额外机制保证。如果事件可能重复发送(比如网络重试),处理逻辑需要幂等:多次处理同一事件的结果与一次处理相同。

interface IdempotentHandler {
  processedEvents: Set<string>;

  async handle(event: DomainEvent): Promise<void> {
    if (this.processedEvents.has(event.id)) {
      return; // 已处理过,直接返回
    }
    
    await this.doHandle(event);
    this.processedEvents.add(event.id);
  }
}

共享状态:直接的数据协作

有时Skill之间需要共享数据。比如一个Skill读取配置,另一个Skill修改配置,第三个Skill使用更新后的配置。这种场景下,共享状态比消息传递更自然。

共享状态的关键是访问控制。不是所有Skill都能读写所有状态。需要定义状态的属主、访问权限和更新规则。

interface SharedState {
  namespace: string;
  data: Record<string, unknown>;
  version: number;
  lastModified: number;
  modifiedBy: string;
}

class StateManager {
  private states = new Map<string, SharedState>();
  private accessRules = new Map<string, AccessRule>();

  defineState(namespace: string, initialData: Record<string, unknown>): void {
    this.states.set(namespace, {
      namespace,
      data: initialData,
      version: 0,
      lastModified: Date.now(),
      modifiedBy: 'system',
    });
  }

  setAccessRule(namespace: string, rule: AccessRule): void {
    this.accessRules.set(namespace, rule);
  }

  async read(namespace: string, skillId: string, keys?: string[]): Promise<unknown> {
    const state = this.states.get(namespace);
    if (!state) throw new Error(`State ${namespace} not found`);
    
    const rule = this.accessRules.get(namespace);
    if (rule && !rule.read.includes(skillId)) {
      throw new Error(`Skill ${skillId} has no read access to ${namespace}`);
    }
    
    if (keys) {
      return keys.reduce((obj, key) => {
        obj[key] = state.data[key];
        return obj;
      }, {} as Record<string, unknown>);
    }
    return state.data;
  }

  async write(
    namespace: string,
    skillId: string,
    updates: Record<string, unknown>
  ): Promise<void> {
    const state = this.states.get(namespace);
    if (!state) throw new Error(`State ${namespace} not found`);
    
    const rule = this.accessRules.get(namespace);
    if (rule && !rule.write.includes(skillId)) {
      throw new Error(`Skill ${skillId} has no write access to ${namespace}`);
    }
    
    Object.assign(state.data, updates);
    state.version++;
    state.lastModified = Date.now();
    state.modifiedBy = skillId;
    
    // 通知订阅者
    await this.notifySubscribers(namespace, state);
  }
}

共享状态的挑战是并发冲突。两个Skill同时修改同一状态,后提交的会覆盖先提交的。解决方法有乐观锁、悲观锁和CRDT(无冲突复制数据类型)。

// 乐观锁:版本号检查
async function updateWithOptimisticLock(
  stateManager: StateManager,
  namespace: string,
  skillId: string,
  updates: Record<string, unknown>,
  expectedVersion: number
): Promise<boolean> {
  const state = await stateManager.read(namespace, skillId);
  if (state.version !== expectedVersion) {
    return false; // 版本冲突,需要重试
  }
  await stateManager.write(namespace, skillId, updates);
  return true;
}

管道模式:数据流处理

管道模式是把多个Skill串联起来,前一个Skill的输出作为后一个Skill的输入。就像Unix管道的概念:cat file | grep pattern | sort | uniq。

管道模式适合数据流处理场景:数据从一个Skill流向下一个,每个Skill做一种转换。比如”收集数据 -> 清洗数据 -> 分析数据 -> 生成报告”。

interface PipeStage {
  skill: Skill;
  config?: Record<string, unknown>;
}

class Pipeline {
  private stages: PipeStage[] = [];

  addStage(stage: PipeStage): Pipeline {
    this.stages.push(stage);
    return this;
  }

  async execute(input: unknown): Promise<unknown> {
    let data = input;
    
    for (const [index, stage] of this.stages.entries()) {
      try {
        data = await stage.skill.execute({
          input: data,
          config: stage.config,
          stage: index,
          totalStages: this.stages.length,
        });
      } catch (error) {
        throw new PipelineError(`Stage ${index} failed: ${error.message}`, index, error);
      }
    }
    
    return data;
  }
}

// 使用示例
const dataPipeline = new Pipeline()
  .addStage({ skill: new DataCollectionSkill(), config: { source: 'api' } })
  .addStage({ skill: new DataCleaningSkill(), config: { removeNulls: true } })
  .addStage({ skill: new DataAnalysisSkill(), config: { metrics: ['avg', 'max'] } })
  .addStage({ skill: new ReportGenerationSkill(), config: { format: 'markdown' } });

const report = await dataPipeline.execute({ query: 'sales-q3' });

管道模式的优势是清晰和可组合。每个Skill只做一件事,通过组合实现复杂流程。新增处理阶段只需在管道中添加一个节点。

挑战在于错误处理和中间状态。如果管道中途失败,前面阶段的结果可能已部分生效。需要设计回滚机制或补偿逻辑。

interface CompensatablePipeline extends Pipeline {
  private compensations: Map<number, Compensation> = new Map();

  addStage(stage: PipeStage, compensation?: Compensation): CompensatablePipeline {
    super.addStage(stage);
    if (compensation) {
      this.compensations.set(this.stages.length - 1, compensation);
    }
    return this;
  }

  async execute(input: unknown): Promise<unknown> {
    const completedStages = [];
    
    try {
      let data = input;
      for (const [index, stage] of this.stages.entries()) {
        data = await stage.skill.execute({ input: data });
        completedStages.push(index);
      }
      return data;
    } catch (error) {
      // 回滚已完成的阶段
      for (const stageIndex of completedStages.reverse()) {
        const compensation = this.compensations.get(stageIndex);
        if (compensation) {
          await compensation.run();
        }
      }
      throw error;
    }
  }
}

发布订阅:一对多的通知

发布订阅(Pub/Sub)是事件驱动的特例。一个发布者,多个订阅者。发布者不关心谁订阅,订阅者不关心谁发布。

发布订阅适合广播场景:一个事件需要通知多个感兴趣的Skill。比如”代码提交”事件需要通知审查Skill、测试Skill、部署Skill。

class PubSubSystem {
  private topics = new Map<string, Set<Subscriber>>();

  subscribe(topic: string, subscriber: Subscriber): Subscription {
    const subscribers = this.topics.get(topic) || new Set();
    subscribers.add(subscriber);
    this.topics.set(topic, subscribers);
    
    return {
      unsubscribe: () => {
        subscribers.delete(subscriber);
      },
    };
  }

  async publish(topic: string, message: unknown): Promise<void> {
    const subscribers = this.topics.get(topic) || new Set();
    await Promise.all(
      Array.from(subscribers).map((sub) =>
        sub.handle(message).catch((err) => {
          console.error(`Subscriber failed for ${topic}:`, err);
        })
      )
    );
  }
}

// 使用示例
const pubsub = new PubSubSystem();

// 多个Skill订阅代码提交事件
pubsub.subscribe('code.committed', codeReviewSkill);
pubsub.subscribe('code.committed', testSkill);
pubsub.subscribe('code.committed', deploySkill);

// 提交代码时发布事件
await pubsub.publish('code.committed', {
  commitId: 'abc123',
  author: 'developer',
  files: ['src/api.ts'],
});

发布订阅的挑战是消息保证和背压。如果订阅者处理慢,消息会堆积。需要设计流量控制:丢弃旧消息、限流或背压通知发布者减速。

请求响应:同步协作

有些场景需要同步协作:Skill A调用Skill B,等待B完成后继续。这种请求响应模式是最直观的协作方式。

请求响应的关键是超时和错误处理。Skill B可能迟迟不响应,Skill A不能无限等待。需要设置合理的超时时间,以及超时后的降级策略。

interface RequestOptions {
  timeout: number;
  retries: number;
  fallback?: (error: Error) => Promise<unknown>;
}

class RequestResponseClient {
  async call(
    target: string,
    request: SkillMessage,
    options: RequestOptions
  ): Promise<unknown> {
    let lastError: Error;
    
    for (let attempt = 0; attempt <= options.retries; attempt++) {
      try {
        return await this.executeWithTimeout(target, request, options.timeout);
      } catch (error) {
        lastError = error;
        if (attempt < options.retries) {
          await this.delay(Math.pow(2, attempt) * 1000); // 指数退避
        }
      }
    }
    
    if (options.fallback) {
      return await options.fallback(lastError);
    }
    throw lastError;
  }

  private async executeWithTimeout(
    target: string,
    request: SkillMessage,
    timeout: number
  ): Promise<unknown> {
    return Promise.race([
      this.sendAndWait(target, request),
      new Promise((_, reject) =>
        setTimeout(() => reject(new Error('Request timeout')), timeout)
      ),
    ]);
  }
}

// 使用示例
const client = new RequestResponseClient();

const result = await client.call(
  'code-review-skill',
  {
    id: 'req-001',
    type: 'review.request',
    source: 'main-agent',
    target: 'code-review-skill',
    payload: { files: ['src/api.ts'] },
    timestamp: Date.now(),
  },
  {
    timeout: 30000,
    retries: 2,
    fallback: async () => ({ findings: [], error: 'Review service unavailable' }),
  }
);

请求响应的缺点是耦合度高:调用方需要知道被调用方的存在和接口。适合紧密协作的场景,不适合松耦合的系统。

异步协作:非阻塞的工作流

很多场景不需要立即得到结果。Skill A发起任务后可以继续做其他事情,等Skill B完成后再处理结果。这种异步协作提高系统吞吐量。

异步协作需要任务标识和回调机制。任务标识让Skill A能在稍后查询任务状态;回调机制让Skill B在完成后主动通知Skill A。

interface AsyncTask {
  id: string;
  status: 'pending' | 'running' | 'completed' | 'failed';
  result?: unknown;
  error?: Error;
  createdAt: number;
  completedAt?: number;
}

class AsyncTaskManager {
  private tasks = new Map<string, AsyncTask>();
  private callbacks = new Map<string, (task: AsyncTask) => Promise<void>>();

  async submit(task: Omit<AsyncTask, 'status'>, callback?: (task: AsyncTask) => Promise<void>): Promise<string> {
    const taskId = `task-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
    const asyncTask: AsyncTask = {
      ...task,
      id: taskId,
      status: 'pending',
      createdAt: Date.now(),
    };
    
    this.tasks.set(taskId, asyncTask);
    if (callback) {
      this.callbacks.set(taskId, callback);
    }
    
    return taskId;
  }

  async update(taskId: string, updates: Partial<AsyncTask>): Promise<void> {
    const task = this.tasks.get(taskId);
    if (!task) return;
    
    Object.assign(task, updates);
    
    if (task.status === 'completed' || task.status === 'failed') {
      task.completedAt = Date.now();
      const callback = this.callbacks.get(taskId);
      if (callback) {
        await callback(task);
        this.callbacks.delete(taskId);
      }
    }
  }

  async getStatus(taskId: string): Promise<AsyncTask | undefined> {
    return this.tasks.get(taskId);
  }
}

// 使用示例
const taskManager = new AsyncTaskManager();

// Skill A提交异步任务
const taskId = await taskManager.submit(
  { type: 'code-analysis' },
  async (task) => {
    // 回调:Skill B完成后通知Skill A
    console.log(`Task ${task.id} completed with result:`, task.result);
  }
);

// Skill B处理任务
async function processTask(taskId: string) {
  await taskManager.update(taskId, { status: 'running' });
  try {
    const result = await analyzeCode();
    await taskManager.update(taskId, { status: 'completed', result });
  } catch (error) {
    await taskManager.update(taskId, { status: 'failed', error });
  }
}

数据一致性:分布式协作的保证

多个Skill协作时,数据一致性是核心挑战。一个Skill修改了数据,另一个Skill读取到的可能是旧值。分布式系统中,一致性、可用性和分区容错性不可兼得,需要根据场景做权衡。

强一致性要求所有Skill随时看到最新数据。实现方式是分布式锁或两阶段提交。代价是性能:每次读写都要协调,延迟增加。

最终一致性允许短暂不一致,但保证最终所有Skill看到相同数据。实现方式是异步复制和冲突解决。性能好,但需要处理冲突。

// 最终一致性:异步复制
class EventuallyConsistentStore {
  private localStore = new Map<string, unknown>();
  private replicationQueue: ReplicationTask[] = [];

  async write(key: string, value: unknown): Promise<void> {
    this.localStore.set(key, value);
    this.replicationQueue.push({ key, value, timestamp: Date.now() });
    await this.replicate();
  }

  async read(key: string): Promise<unknown> {
    return this.localStore.get(key);
  }

  private async replicate(): Promise<void> {
    while (this.replicationQueue.length > 0) {
      const task = this.replicationQueue.shift();
      await this.sendToReplicas(task);
    }
  }
}

// 冲突解决:最后写入者胜
function resolveConflict(versions: VersionedValue[]): unknown {
  return versions.reduce((latest, current) => {
    return current.timestamp > latest.timestamp ? current : latest;
  }).value;
}

对于AI系统,最终一致性通常是更好的选择。因为Skill之间的协作通常是异步的,短暂的不一致可以接受。关键是设计好冲突解决策略和重试机制。

模式选择与组合

八种协作模式各有适用场景,也各有代价。实际系统中通常是多种模式组合使用。

简单直接的协作用请求响应。比如主Agent调用一个Skill完成特定任务,需要立即得到结果。

松耦合的协作用事件驱动或发布订阅。比如一个Skill完成工作后通知多个其他Skill,彼此不需要知道对方存在。

数据流处理用管道。比如数据经过多个阶段的转换,每个阶段一个Skill。

状态共享用共享状态加访问控制。比如多个Skill需要读写同一份配置或缓存。

长任务用异步协作。比如代码分析、测试运行等耗时操作,不应该阻塞主流程。

复杂工作流用状态机 orchestration。比如发布流程:构建 -> 测试 -> 审查 -> 部署,每个阶段可能用不同模式协作。

class HybridOrchestrator {
  private eventBus = new EventBus();
  private taskManager = new AsyncTaskManager();
  private stateManager = new StateManager();

  async executeWorkflow(workflow: Workflow): Promise<void> {
    for (const step of workflow.steps) {
      switch (step.collaborationMode) {
        case 'sync':
          await this.executeSync(step);
          break;
        case 'async':
          await this.executeAsync(step);
          break;
        case 'event':
          await this.executeEventDriven(step);
          break;
        case 'pipeline':
          await this.executePipeline(step);
          break;
      }
    }
  }
}

总结与最佳实践

Skill间通信与协作的核心原则是:根据场景选择合适模式,不要追求统一;松耦合优于紧耦合,但松耦合有代价;最终一致性通常足够,但要有冲突解决策略。

具体最佳实践:

优先用事件驱动实现松耦合。Skill之间通过事件通信,不直接依赖。新增Skill只需订阅事件,不影响现有Skill。

管道模式处理数据流。数据转换类任务用管道串联,每个Skill只做一种转换,通过组合实现复杂流程。

共享状态加访问控制。需要共享数据时用状态管理器,明确定义读写权限,避免随意修改。

异步处理长任务。耗时操作不要阻塞主流程,用任务管理器跟踪状态,完成后回调或轮询。

设计补偿和回滚。多阶段协作中,如果中途失败,要有机制回滚已完成的阶段,避免部分成功的混乱状态。

监控协作链路。跟踪消息传递延迟、任务处理时间、错误率和重试次数。这些数据帮助发现瓶颈和优化点。

最终,好的协作模式让多个Skill像团队一样工作:各自负责擅长的部分,通过清晰的通信机制协调,共同完成复杂任务。模式不是目的,目的是让系统可靠、灵活、可维护。选择合适的模式,比使用高级模式更重要。