当单个Agent无法完成复杂任务时,我们需要把工作拆成多个步骤,让不同Agent或Skill按特定顺序协作执行。这就涉及工作流编排。编排不是简单的顺序执行,而是要考虑依赖关系、并行可能性、失败回滚和状态同步。这篇文章从DAG和状态机两个核心模型出发,讲清楚Agent工作流编排的工程实践。
从顺序执行到工作流编排
最简单的Agent调用是单步的:用户提需求,Agent执行,返回结果。但当任务复杂起来,比如”分析代码、生成测试、运行验证、如果失败则修复再重试”,单步执行就不够用了。
很多人第一反应是把步骤写成代码,用if else和for loop串联。这种方式在小规模时没问题,但扩展性很差。步骤之间的依赖关系被硬编码在代码里,新增一个步骤可能要改多处;失败处理逻辑和正常流程混在一起,越来越难以维护;并行执行需要手动管理线程和同步,容易出错。
工作流编排的核心思想是把”做什么”和”怎么做”分开。DAG或状态机描述”做什么”,即步骤有哪些、它们之间的依赖和条件是什么;执行引擎负责”怎么做”,即调度顺序、并行策略、失败重试和资源分配。
这种分离带来几个好处。一是可视化,工作流结构可以用图表示,团队沟通更直观。二是可复用,同一个DAG定义可以用不同的执行引擎跑。三是可扩展,新增步骤不需要改现有代码,只需要在图中添加节点和边。四是可观测,执行状态天然适合监控和追踪。
DAG模型:用有向无环图描述依赖
DAG是工作流编排中最常用的模型。节点代表执行步骤,有向边代表依赖关系。无环意味着不存在循环依赖,这保证了工作流总能终止。
一个典型的代码审查工作流DAG可能长这样:
# 代码审查工作流DAG定义
workflow:
name: code-review-pipeline
nodes:
- id: fetch-diff
type: skill
skillId: git-diff-fetcher
- id: analyze-structure
type: skill
skillId: code-structure-analyzer
- id: check-security
type: skill
skillId: security-scanner
- id: check-performance
type: skill
skillId: performance-analyzer
- id: review-style
type: skill
skillId: style-reviewer
- id: generate-report
type: skill
skillId: report-generator
edges:
- from: fetch-diff
to: [analyze-structure, check-security, check-performance, review-style]
- from: [analyze-structure, check-security, check-performance, review-style]
to: generate-report
这个DAG表示:先获取代码差异,然后并行做结构分析、安全检查、性能分析和风格审查,最后等前面四个步骤都完成后生成报告。
DAG的执行引擎需要解决两个核心问题:拓扑排序和并行调度。拓扑排序确定步骤的合法执行顺序,并行调度则决定哪些没有依赖关系的步骤可以同时跑。
# 拓扑排序与并行调度示例
from collections import deque
class DAGExecutor:
def __init__(self, workflow):
self.workflow = workflow
self.in_degree = self._compute_in_degrees()
self.node_outputs = {}
def _compute_in_degrees(self):
in_degree = {node.id: 0 for node in self.workflow.nodes}
for edge in self.workflow.edges:
for target in edge.to if isinstance(edge.to, list) else [edge.to]:
in_degree[target] += 1
return in_degree
def execute(self):
queue = deque([nid for nid, deg in self.in_degree.items() if deg == 0])
while queue:
# 当前可以并行执行的所有节点
ready_nodes = list(queue)
queue.clear()
# 并行执行
results = self._execute_parallel(ready_nodes)
# 更新下游节点的入度
for node_id, output in results.items():
self.node_outputs[node_id] = output
for edge in self.workflow.edges:
if edge.from == node_id:
for target in edge.to if isinstance(edge.to, list) else [edge.to]:
self.in_degree[target] -= 1
if self.in_degree[target] == 0:
queue.append(target)
DAG的优势是表达能力强,能清晰描述复杂的依赖网络。但它也有局限。DAG假设依赖关系是静态的,执行前就能确定。而有些场景需要根据中间结果动态决定下一步,比如”如果测试失败就修复代码,否则直接发布”,这种条件分支用DAG表达会比较别扭。
状态机模型:用状态和事件驱动流程
状态机是另一种常用的工作流模型。它用状态节点和转移边描述流程,转移由事件触发。相比DAG,状态机更擅长表达条件分支和循环。
一个发布流程的状态机可能包含这些状态:待分析、分析中、待实现、实现中、待测试、测试中、待发布、已发布。每个状态的转移条件各不相同。
// 状态机定义示例
interface StateMachine {
initialState: string;
states: Record<string, State>;
}
interface State {
onEnter?: (ctx: ExecutionContext) => Promise<void>;
onExit?: (ctx: ExecutionContext) => Promise<void>;
transitions: Transition[];
}
interface Transition {
event: string;
target: string;
condition?: (ctx: ExecutionContext) => boolean;
action?: (ctx: ExecutionContext) => Promise<void>;
}
const releaseWorkflow: StateMachine = {
initialState: 'pending_analysis',
states: {
pending_analysis: {
transitions: [
{ event: 'start', target: 'analyzing' }
]
},
analyzing: {
onEnter: async (ctx) => {
ctx.result = await ctx.agent.run('analyze-requirements', ctx.input);
},
transitions: [
{ event: 'complete', target: 'pending_implementation' },
{ event: 'fail', target: 'failed' }
]
},
pending_implementation: {
transitions: [
{ event: 'start', target: 'implementing' }
]
},
implementing: {
onEnter: async (ctx) => {
ctx.result = await ctx.agent.run('implement-feature', ctx.result);
},
transitions: [
{ event: 'complete', target: 'pending_testing' },
{ event: 'fail', target: 'failed' }
]
},
pending_testing: {
transitions: [
{ event: 'start', target: 'testing' }
]
},
testing: {
onEnter: async (ctx) => {
ctx.result = await ctx.agent.run('run-tests', ctx.result);
},
transitions: [
{
event: 'complete',
target: 'pending_release',
condition: (ctx) => ctx.result.allPassed
},
{
event: 'complete',
target: 'pending_fix',
condition: (ctx) => !ctx.result.allPassed
},
{ event: 'fail', target: 'failed' }
]
},
pending_fix: {
transitions: [
{ event: 'start', target: 'implementing' }
]
},
pending_release: {
transitions: [
{ event: 'release', target: 'released' }
]
},
released: {
onEnter: async (ctx) => {
await ctx.agent.run('notify-team', { status: 'released' });
},
transitions: []
},
failed: {
onEnter: async (ctx) => {
await ctx.agent.run('handle-failure', ctx.error);
},
transitions: []
}
}
};
状态机的执行引擎负责管理当前状态、监听事件、评估转移条件和执行状态动作。
class StateMachineExecutor {
private currentState: string;
private context: ExecutionContext;
constructor(private machine: StateMachine, context: ExecutionContext) {
this.currentState = machine.initialState;
this.context = context;
}
async start() {
await this._enterState(this.currentState);
}
async send(event: string) {
const state = this.machine.states[this.currentState];
const transition = state.transitions.find(t => t.event === event);
if (!transition) return;
if (transition.condition && !transition.condition(this.context)) return;
await this._exitState(this.currentState);
if (transition.action) await transition.action(this.context);
this.currentState = transition.target;
await this._enterState(this.currentState);
}
private async _enterState(stateId: string) {
const state = this.machine.states[stateId];
if (state.onEnter) await state.onEnter(this.context);
// 自动触发无条件的转移
const autoTransition = state.transitions.find(t => t.event === 'auto');
if (autoTransition) {
await this.send('auto');
}
}
}
状态机的一个关键优势是表达循环和条件分支很自然。测试失败回到修复状态再重新测试,这种循环在DAG里需要特殊处理,在状态机里就是两条转移边。但状态机不擅长表达并行,多个独立步骤同时执行的状态机模型会变得很复杂。
混合模型:DAG与状态机的结合
实际工程中,单一模型往往不够。更好的做法是分层:用状态机管理高层流程阶段,每个阶段内部用DAG执行具体的并行步骤。
比如一个完整的软件交付流程,顶层用状态机管理:需求分析 -> 设计 -> 开发 -> 测试 -> 发布。在开发阶段内部,用DAG并行执行:接口设计、数据库建模、核心逻辑实现、单元测试编写。在测试阶段内部,用DAG并行执行:单元测试、集成测试、安全扫描、性能基准测试。
# 混合模型示例
workflow:
type: state-machine
states:
development:
type: dag
nodes:
- id: design-api
- id: design-db
- id: implement-core
- id: write-tests
edges:
- from: [design-api, design-db]
to: implement-core
- from: implement-core
to: write-tests
testing:
type: dag
nodes:
- id: unit-tests
- id: integration-tests
- id: security-scan
- id: performance-baseline
edges:
- from: unit-tests
to: [integration-tests, security-scan, performance-baseline]
这种分层架构的好处是清晰。状态机负责流程的骨架和决策点,DAG负责具体工作的编排。团队可以独立优化每个DAG,而不影响整体流程。
条件分支与动态路由
工作流中的条件分支有两种实现方式。一种是静态条件,执行前就能确定走哪条分支;另一种是动态条件,需要根据前面步骤的结果决定。
静态条件可以用DAG表达,把工作流拆成多个子图,根据输入参数选择对应的子图。动态条件更适合状态机,或者DAG中的条件网关节点。
# 条件网关示例
nodes:
- id: evaluate-risk
type: skill
skillId: risk-evaluator
- id: high-risk-review
type: skill
skillId: senior-reviewer
- id: standard-review
type: skill
skillId: standard-reviewer
- id: condition-gateway
type: condition
branches:
- condition: evaluate-risk.score > 80
to: high-risk-review
- condition: evaluate-risk.score <= 80
to: standard-review
条件网关的实现难点在于类型安全。如果条件表达式引用了不存在的节点输出,或者类型不匹配,应该在定义阶段就报错,而不是执行时才暴露。
// 条件表达式类型检查
class ConditionChecker {
check(nodeOutputs: Map<string, Type>, condition: string): TypeCheckResult {
// 解析条件表达式
const refs = this.extractReferences(condition);
// 验证所有引用都存在
for (const ref of refs) {
if (!nodeOutputs.has(ref.nodeId)) {
return { valid: false, error: `Node ${ref.nodeId} not found` };
}
const outputType = nodeOutputs.get(ref.nodeId);
if (!this.isCompatible(outputType, ref.field)) {
return { valid: false, error: `Field ${ref.field} not found in ${ref.nodeId}` };
}
}
return { valid: true };
}
}
动态路由的另一个挑战是后续步骤的输入类型。如果不同分支的输出结构不同,合并到同一条后续边时可能需要做类型转换或字段映射。
并行执行与资源管理
并行执行是提升工作流效率的关键,但也引入了新的复杂性:资源竞争、死锁、超时和结果合并。
资源竞争发生在多个并行步骤需要访问同一个外部系统时。比如三个Skill同时调用同一个LLM API,可能触发速率限制。解决方法是在执行引擎里引入资源配额和令牌桶限流。
// 资源配额管理
class ResourceQuota {
private tokens: Map<string, Semaphore> = new Map();
async acquire(resourceId: string, permits: number = 1): Promise<ReleaseFn> {
if (!this.tokens.has(resourceId)) {
this.tokens.set(resourceId, new Semaphore(this.getLimit(resourceId)));
}
const semaphore = this.tokens.get(resourceId);
await semaphore.acquire(permits);
return () => semaphore.release(permits);
}
}
// 在DAG执行器中使用
async _execute_parallel(nodeIds: string[]) {
const releaseFns: ReleaseFn[] = [];
try {
// 先获取所有需要的资源
for (const nodeId of nodeIds) {
const resources = this.getResourceRequirements(nodeId);
for (const res of resources) {
const release = await this.quota.acquire(res.id, res.amount);
releaseFns.push(release);
}
}
// 并行执行
return await Promise.allSettled(
nodeIds.map(id => this.executeNode(id))
);
} finally {
// 释放资源
for (const release of releaseFns) {
release();
}
}
}
死锁在DAG中不常见,因为DAG本身无环。但在混合模型中,如果状态机的某个转移条件等待另一个状态机的事件,就可能出现循环等待。避免死锁的原则是:永远不要让一个步骤的完成条件依赖于它自己的输出。
超时管理也很重要。并行步骤中如果某一个卡住了,不应该无限等待。应该为每个步骤设置合理的超时时间,超时后标记为失败,触发回滚或降级。
# 超时与重试配置
nodes:
- id: llm-generation
type: skill
skillId: text-generator
timeout: 60s
retries:
maxAttempts: 3
backoff: exponential
initialDelay: 1s
maxDelay: 10s
onTimeout: fallback-to-cache
onFailure: skip-and-warn
依赖管理与数据传递
工作流步骤之间的数据传递有两种模式:推模式和拉模式。
推模式是上游步骤主动把输出推给下游。简单直接,但耦合度高,上游需要知道下游需要什么。拉模式是下游步骤从共享上下文中读取需要的数据。解耦性好,但容易出现命名冲突和数据竞争。
建议采用混合模式:每个步骤的输出按命名规范写入上下文,下游步骤通过声明式的方式引用需要的输入。
# 声明式数据引用
nodes:
- id: fetch-data
output:
users: "$.data.users"
total: "$.data.total"
- id: process-users
input:
users: "fetch-data.users"
batchSize: 100
- id: generate-summary
input:
total: "fetch-data.total"
processed: "process-users.count"
这种声明式引用可以被执行引擎静态分析,在运行前检查所有引用是否都有对应的输出,避免运行时才发现数据缺失。
对于大数据量的传递,不建议直接通过上下文序列化传输。可以用引用传递,上下文只保存数据标识符,实际数据存在对象存储中,需要时按需读取。
可视化与可观测性
工作流编排的另一个重要方面是可视化。团队需要看到工作流的结构、当前执行状态和 history。
工作流结构可以用Mermaid或Graphviz渲染。执行状态可以用不同颜色标识节点状态:灰色是未开始,蓝色是运行中,绿色是成功,红色是失败,黄色是等待依赖。
graph TD
A[获取代码差异] --> B[结构分析]
A --> C[安全检查]
A --> D[性能分析]
A --> E[风格审查]
B --> F[生成报告]
C --> F
D --> F
E --> F
style A fill:#4ade80
style B fill:#60a5fa
style C fill:#f87171
style D fill:#9ca3af
style E fill:#9ca3af
style F fill:#9ca3af
可观测性方面,每个工作流执行都应该生成一个trace,包含所有步骤的开始时间、结束时间、输入输出摘要和资源使用情况。这些trace不仅用于调试,也用于后续优化:哪些步骤总是超时,哪些步骤可以并行但没有并行,哪些路径是瓶颈。
实际案例:自动化内容发布流水线
我们团队的内容发布流程是一个典型的混合工作流。当作者提交一篇文章后,工作流自动启动。
状态机层面有三个主要阶段:内容处理、质量检查、发布部署。
在内容处理阶段,内部DAG并行执行:提取元数据、生成摘要、优化图片、检查内部链接。这些步骤互不依赖,可以并行。
在质量检查阶段,内部DAG先串行执行语法检查,然后并行执行SEO检查、可读性分析和事实核查。语法检查必须先完成,因为后面的检查都依赖它的输出。
在发布部署阶段,根据前面阶段的结果决定走哪条分支。如果所有检查通过,直接构建并部署;如果有警告,发送通知等待人工确认;如果有严重错误,退回给作者修改。
// 发布流程的执行日志片段
{
"workflowId": "publish-2025-12-24-001",
"startTime": "2025-12-24T09:00:00Z",
"stages": [
{
"stage": "content-processing",
"parallelSteps": [
{ "step": "extract-metadata", "duration": "0.3s", "status": "success" },
{ "step": "generate-summary", "duration": "2.1s", "status": "success" },
{ "step": "optimize-images", "duration": "5.4s", "status": "success" },
{ "step": "check-links", "duration": "3.2s", "status": "success" }
]
},
{
"stage": "quality-check",
"steps": [
{ "step": "grammar-check", "duration": "1.5s", "status": "success", "issues": 2 },
{ "step": "seo-check", "duration": "0.8s", "status": "success", "score": 92 },
{ "step": "readability-check", "duration": "0.6s", "status": "success", "grade": "A" },
{ "step": "fact-check", "duration": "4.2s", "status": "success", "flags": 0 }
]
},
{
"stage": "deploy",
"decision": "auto-deploy",
"duration": "12.3s",
"status": "success",
"url": "https://blog.example.com/posts/new-article"
}
],
"totalDuration": "30.4s"
}
这个流程用纯串行执行大概需要90秒,通过DAG并行优化后降到30秒。更重要的是,所有步骤都有明确的依赖关系,不会出现”还没检查完就发布了”这类人为失误。
总结与最佳实践
Agent工作流编排的核心是选择合适的模型并分层设计。DAG适合表达并行依赖,状态机适合表达条件分支和循环。实际系统中建议混合使用,状态机管阶段,DAG管具体工作。
在设计工作流时,遵循这些原则。步骤拆分要适度,太细会增加调度开销,太粗会失去并行优化的空间。依赖关系要明确,避免隐式依赖导致执行顺序不确定。失败处理要完整,每个步骤都要有超时、重试和降级策略。数据传递要规范,用声明式引用代替隐式共享。
在执行层面,重视资源管理。并行步骤要有限流保护,避免下游系统被压垮。重视可观测性,每个执行都要有完整的trace和日志。重视幂等性,同一个工作流多次触发不应该产生副作用。
最后,工作流定义本身也要版本管理。修改工作流结构是高风险操作,应该像代码一样走review流程,支持灰度发布和快速回滚。把工作流当成基础设施而不是临时脚本,才能真正发挥编排的价值。