Vercel AI SDK Agent Loop 完全指南
基础入门
什么是 AI SDK?
Vercel AI SDK 是一个免费开源的 TypeScript 库,用于构建 AI 驱动的应用程序和代理(Agent)。它提供了统一的接口来与各种 LLM 提供商(OpenAI、Anthropic、Google 等)交互,大大降低了切换模型的成本。
核心概念对比
| 概念 | 说明 | 使用场景 |
|---|---|---|
generateText | 一次性生成完整文本 | 非交互式任务、Agent |
streamText | 流式生成文本 | 实时聊天、交互式应用 |
| Tool Calling | 让模型调用外部函数 | 获取数据、执行操作 |
| Agent Loop | 多步骤工具调用循环 | 复杂任务自动化 |
快速开始
安装
# 使用 pnpmpnpm add ai
# 使用 npmnpm install ai
# 使用 yarnyarn add ai安装模型提供商 SDK(以 OpenAI 为例):
pnpm add @ai-sdk/openai第一个示例
import { generateText } from 'ai';
const { text } = await generateText({ model: 'openai/gpt-4o', prompt: '你好,请介绍一下自己。',});
console.log(text);AI Gateway - 简化模型配置
从 AI SDK 5.0.36 开始,你可以直接使用模型字符串,无需单独导入 Provider:
import { generateText } from 'ai';
// 直接使用模型字符串 - AI Gateway 自动路由const { text } = await generateText({ model: 'anthropic/claude-sonnet-4.5', prompt: '分析这段代码的性能问题',});
// 支持多种提供商const providers = [ 'openai/gpt-4o', 'anthropic/claude-sonnet-4.5', 'google/gemini-2.5-pro', 'xai/grok-3',];使用独立 Provider
如果需要更精细的控制,可以使用独立的 Provider 包:
import { generateText } from 'ai';import { anthropic } from '@ai-sdk/anthropic';
const { text } = await generateText({ model: anthropic('claude-sonnet-4.5'), prompt: '你好',});核心功能
Tool Calling - 工具调用
Tool Calling 让模型能够”调用”外部函数,获取实时数据或执行操作。
定义工具
import { tool } from 'ai';import { z } from 'zod';
const weatherTool = tool({ description: '获取指定城市的当前天气', inputSchema: z.object({ city: z.string().describe('城市名称'), unit: z.enum(['celsius', 'fahrenheit']).default('celsius'), }), execute: async ({ city, unit }) => { // 调用真实的天气 API const response = await fetch( `https://api.weather.com/v1/current?city=${city}&unit=${unit}` ); return response.json(); },});工具定义结构
| 属性 | 说明 | 必填 |
|---|---|---|
description | 工具描述,帮助模型理解何时使用 | 是 |
inputSchema | 使用 Zod 定义的输入参数 Schema | 是 |
outputSchema | 输出 Schema,用于类型推断和验证 | 否 |
execute | 异步执行函数 | 否(客户端工具可省略) |
needsApproval | 是否需要用户批准 | 否 |
使用工具
import { generateText, tool } from 'ai';import { z } from 'zod';
const { text, toolCalls, toolResults } = await generateText({ model: 'openai/gpt-4o', tools: { weather: tool({ description: '获取天气信息', inputSchema: z.object({ location: z.string().describe('城市名称'), }), execute: async ({ location }) => ({ location, temperature: 22, condition: '晴天', }), }), }, prompt: '北京今天天气怎么样?',});
console.log(text);// 输出: 北京今天的天气是晴天,温度约为22度。Agent Loop - 多步骤循环
Agent Loop 是 AI SDK 的核心能力,让模型能够自动执行多个工具调用,直到完成任务。
flowchart TD
A[用户 Prompt] --> B[模型生成]
B --> C{是否调用工具?}
C -->|是| D[执行工具]
D --> E[获取工具结果]
E --> F{检查停止条件}
F -->|未满足| B
F -->|满足| G[返回最终结果]
C -->|否| G
使用 stopWhen 启用 Agent Loop
import { generateText, tool, stepCountIs } from 'ai';import { z } from 'zod';
const { text, steps } = await generateText({ model: 'openai/gpt-4o', stopWhen: stepCountIs(5), // 最多执行 5 步 tools: { weather: tool({ description: '获取天气信息', inputSchema: z.object({ location: z.string(), }), execute: async ({ location }) => ({ location, temperature: 72 + Math.floor(Math.random() * 21) - 10, }), }), convertTemperature: tool({ description: '将华氏度转换为摄氏度', inputSchema: z.object({ fahrenheit: z.number(), }), execute: async ({ fahrenheit }) => ({ celsius: Math.round((fahrenheit - 32) * (5 / 9)), }), }), }, prompt: '旧金山的天气如何?请用摄氏度告诉我。',});
// Agent 会:// 1. 调用 weather 获取华氏温度// 2. 调用 convertTemperature 转换为摄氏度// 3. 生成最终回答console.log(text);console.log('执行步骤:', steps.length);停止条件
AI SDK 提供了多种停止条件:
import { stepCountIs, hasToolCall } from 'ai';
// 1. 步数限制stopWhen: stepCountIs(10)
// 2. 特定工具被调用时停止stopWhen: hasToolCall('finish')
// 3. 组合多个条件(任一满足即停止)stopWhen: [stepCountIs(10), hasToolCall('submitAnswer')]自定义停止条件
import { generateText, StopCondition } from 'ai';
// 自定义停止条件:当获取到有效结果时停止const hasValidResult: StopCondition = ({ steps }) => { const lastStep = steps[steps.length - 1]; if (lastStep?.toolResults) { return lastStep.toolResults.some( result => result.result?.status === 'success' ); } return false;};
const { text } = await generateText({ model: 'openai/gpt-4o', stopWhen: [stepCountIs(10), hasValidResult], tools: { /* ... */ }, prompt: '查询订单状态',});ToolLoopAgent - AI SDK 6 新特性
AI SDK 6 引入了 ToolLoopAgent 类,提供了更简洁的 Agent 构建方式。
import { ToolLoopAgent, stepCountIs, tool } from 'ai';import { z } from 'zod';
const weatherAgent = new ToolLoopAgent({ model: 'anthropic/claude-sonnet-4.5', instructions: '你是一个天气助手,帮助用户查询天气信息。', tools: { weather: tool({ description: '获取天气信息', inputSchema: z.object({ city: z.string(), }), execute: async ({ city }) => ({ city, temperature: 25, condition: '多云', }), }), }, stopWhen: stepCountIs(20), // 默认最多 20 步});
const result = await weatherAgent.generate({ prompt: '北京和上海今天天气如何?',});
console.log(result.text);console.log('执行步骤:', result.steps);ToolLoopAgent vs generateText
| 特性 | generateText + stopWhen | ToolLoopAgent |
|---|---|---|
| 代码量 | 较多 | 较少 |
| 复用性 | 每次需重新配置 | 定义一次,多处使用 |
| 指令 | 通过 system prompt | 内置 instructions |
| 默认行为 | 单步执行 | 20 步循环 |
| 适用场景 | 灵活控制 | 标准 Agent 场景 |
prepareStep - 动态步骤控制
prepareStep 让你能够在每一步执行前动态调整配置:
import { generateText, stepCountIs } from 'ai';
const { text } = await generateText({ model: 'openai/gpt-4o', stopWhen: stepCountIs(10), tools: { search: searchTool, summarize: summarizeTool, calculate: calculateTool, }, prepareStep: async ({ previousStep, stepCount }) => { // 根据步骤数调整可用工具 if (stepCount > 5) { // 后期阶段禁用搜索,强制总结 return { tools: { summarize: summarizeTool }, toolChoice: 'required', }; }
// 根据上一步结果调整 if (previousStep?.toolResults?.some(r => r.toolName === 'search')) { // 搜索后切换到更强的模型进行分析 return { model: 'anthropic/claude-opus-4', }; }
return {}; // 使用默认配置 }, prompt: '研究人工智能的最新发展并总结',});prepareStep 可配置项
| 配置 | 说明 |
|---|---|
model | 切换模型 |
tools | 调整可用工具 |
toolChoice | 强制工具选择 |
messages | 修改/压缩消息历史 |
system | 更新系统提示 |
onStepFinish - 步骤完成回调
使用 onStepFinish 监控每一步的执行:
import { generateText, stepCountIs } from 'ai';
const { text } = await generateText({ model: 'openai/gpt-4o', stopWhen: stepCountIs(5), tools: { /* ... */ }, onStepFinish: async ({ stepType, text, toolCalls, toolResults, usage }) => { console.log(`步骤类型: ${stepType}`);
if (toolCalls?.length) { console.log('工具调用:', toolCalls.map(tc => tc.toolName)); }
if (toolResults?.length) { console.log('工具结果:', toolResults); }
console.log('Token 使用:', usage); }, prompt: '分析这个项目的代码质量',});Human-in-the-Loop - 人工干预
needsApproval - 工具执行审批
import { tool } from 'ai';import { z } from 'zod';
// 始终需要审批const deleteFileTool = tool({ description: '删除文件', inputSchema: z.object({ path: z.string().describe('文件路径'), }), needsApproval: true, // 始终需要用户批准 execute: async ({ path }) => { // 删除逻辑 },});
// 条件性审批const paymentTool = tool({ description: '处理支付', inputSchema: z.object({ amount: z.number(), recipient: z.string(), }), needsApproval: async ({ amount }) => { // 超过 1000 元需要审批 return amount > 1000; }, execute: async ({ amount, recipient }) => { return await processPayment(amount, recipient); },});审批流程
flowchart LR
A[模型请求调用工具] --> B{needsApproval?}
B -->|是| C[返回 tool-approval-request]
C --> D[UI 显示审批请求]
D --> E{用户决定}
E -->|批准| F[执行工具]
E -->|拒绝| G[通知模型被拒绝]
B -->|否| F
F --> H[返回工具结果]
处理审批响应(generateText)
审批流程需要两次调用模型:第一次返回审批请求,用户决定后将审批响应加入消息,第二次调用才会执行工具或通知模型被拒绝。
import { generateText, tool, type ModelMessage, type ToolApprovalResponse } from 'ai';import { openai } from '@ai-sdk/openai';import { z } from 'zod';import * as readline from 'readline/promises';
// 1. 定义需要审批的工具const runCommand = tool({ description: '执行 Shell 命令', inputSchema: z.object({ command: z.string().describe('要执行的命令'), }), // 条件审批:危险命令需要审批,安全命令自动执行 needsApproval: async ({ command }) => { const dangerousPatterns = ['rm', 'delete', 'drop', 'truncate', 'kill']; return dangerousPatterns.some(p => command.toLowerCase().includes(p)); }, execute: async ({ command }) => { // 实际执行逻辑(简化示例) return { stdout: `已执行: ${command}`, exitCode: 0 }; },});
// 2. 用户交互函数async function askUserApproval(toolName: string, args: unknown): Promise<boolean> { const rl = readline.createInterface({ input: process.stdin, output: process.stdout }); const answer = await rl.question( `\n[审批请求] 工具: ${toolName}\n参数: ${JSON.stringify(args, null, 2)}\n是否批准? (y/n): ` ); rl.close(); return answer.toLowerCase() === 'y';}
// 3. 完整的审批循环async function agentWithApproval(userPrompt: string) { const messages: ModelMessage[] = [ { role: 'user', content: userPrompt }, ];
while (true) { const result = await generateText({ model: openai('gpt-4o'), tools: { runCommand }, messages, });
// 将模型响应加入消息历史 messages.push(...result.response.messages);
// 检查是否有待审批的工具调用 const approvalRequests = result.content.filter( part => part.type === 'tool-approval-request' );
// 没有审批请求 -> Agent 完成 if (approvalRequests.length === 0) { console.log('Agent 最终回复:', result.text); return result.text; }
// 逐个处理审批请求 const approvals: ToolApprovalResponse[] = [];
for (const request of approvalRequests) { const approved = await askUserApproval( request.toolCall.toolName, request.toolCall.input, );
approvals.push({ type: 'tool-approval-response', approvalId: request.approvalId, approved, reason: approved ? '用户已确认' : '用户拒绝执行', }); }
// 将审批结果加入消息,继续循环 messages.push({ role: 'tool', content: approvals }); // 循环回到 generateText,模型会根据审批结果继续 }}
// 运行示例agentWithApproval('帮我清理 /tmp 目录下的临时文件');运行效果:
[审批请求] 工具: runCommand参数: { "command": "rm -rf /tmp/temp_*" }是否批准? (y/n): y
Agent 最终回复: 已成功清理 /tmp 目录下的临时文件。前端审批流程(useChat + Next.js)
在实际 Web 应用中,审批流程通过 useChat 的 addToolApprovalResponse 实现。
服务端 API Route:
import { streamText, tool } from 'ai';import { openai } from '@ai-sdk/openai';import { z } from 'zod';
export async function POST(req: Request) { const { messages } = await req.json();
const result = streamText({ model: openai('gpt-4o'), messages, // 系统提示:被拒绝的工具不要重试 system: '当工具执行被用户拒绝时,不要重试该操作,改为告知用户操作已取消。', tools: { processPayment: tool({ description: '处理支付转账', inputSchema: z.object({ amount: z.number().describe('金额(元)'), recipient: z.string().describe('收款人'), note: z.string().optional().describe('备注'), }), // 条件审批:超过 500 元需要用户确认 needsApproval: async ({ amount }) => amount > 500, execute: async ({ amount, recipient, note }) => { // 调用支付 API(简化示例) return { transactionId: `TXN_${Date.now()}`, amount, recipient, status: 'completed', }; }, }), }, });
return result.toUIMessageStreamResponse();}客户端组件:
'use client';
import { useChat } from '@ai-sdk/react';import { DefaultChatTransport, lastAssistantMessageIsCompleteWithApprovalResponses,} from 'ai';import { useState } from 'react';
export default function PaymentChat() { const [input, setInput] = useState(''); const { messages, sendMessage, addToolApprovalResponse } = useChat({ transport: new DefaultChatTransport({ api: '/api/chat' }), // 用户做出审批决定后,自动将结果发送给模型继续处理 sendAutomaticallyWhen: lastAssistantMessageIsCompleteWithApprovalResponses, });
return ( <div className="max-w-2xl mx-auto p-4"> <div className="space-y-4"> {messages.map(m => ( <div key={m.id} className="p-3 rounded-lg"> <strong>{m.role === 'user' ? '你' : 'AI'}:</strong> {m.parts?.map((part, i) => { if (part.type === 'text') { return <p key={i}>{part.text}</p>; }
// 处理支付工具的不同状态 if (part.type === 'tool-processPayment') { switch (part.state) { // 等待用户审批 case 'approval-requested': return ( <div key={part.toolCallId} className="border p-4 rounded-lg my-2"> <p className="font-bold">需要确认支付操作</p> <p>金额:{part.input.amount} 元</p> <p>收款人:{part.input.recipient}</p> {part.input.note && <p>备注:{part.input.note}</p>} <div className="flex gap-2 mt-3"> <button className="px-4 py-2 bg-green-500 text-white rounded" onClick={() => addToolApprovalResponse({ id: part.approval.id, approved: true, }) } > 确认支付 </button> <button className="px-4 py-2 bg-red-500 text-white rounded" onClick={() => addToolApprovalResponse({ id: part.approval.id, approved: false, }) } > 取消 </button> </div> </div> );
// 工具执行完成 case 'output-available': return ( <div key={part.toolCallId} className="bg-green-50 p-3 rounded my-2"> <p>支付成功!交易号:{part.output.transactionId}</p> </div> );
// 用户拒绝执行 case 'output-denied': return ( <div key={part.toolCallId} className="bg-gray-100 p-3 rounded my-2"> <p>支付操作已取消</p> </div> ); } } return null; })} </div> ))} </div>
<form onSubmit={e => { e.preventDefault(); if (input.trim()) { sendMessage({ text: input }); setInput(''); } }} className="mt-4 flex gap-2" > <input value={input} onChange={e => setInput(e.target.value)} placeholder="例如:转账 800 元给张三" className="flex-1 border rounded px-3 py-2" /> <button type="submit" className="px-4 py-2 bg-blue-500 text-white rounded"> 发送 </button> </form> </div> );}工具部件(tool part)的状态流转:
stateDiagram-v2
[*] --> approval_requested: needsApproval 返回 true
approval_requested --> output_available: 用户点击"确认"
approval_requested --> output_denied: 用户点击"取消"
output_available --> [*]: 显示执行结果
output_denied --> [*]: 模型收到拒绝通知
参考来源: AI SDK - Human-in-the-Loop | AI SDK - Tool Calling | AI SDK - Chatbot Tool Usage
toModelOutput - 工具输出转换
当工具返回大量数据时,使用 toModelOutput 控制发送给模型的内容:
import { tool } from 'ai';import { z } from 'zod';
const searchTool = tool({ description: '搜索文档', inputSchema: z.object({ query: z.string(), }), execute: async ({ query }) => { // 返回完整的搜索结果供应用使用 const results = await searchDocuments(query); return { total: results.length, documents: results.map(doc => ({ id: doc.id, title: doc.title, content: doc.content, // 可能很长 metadata: doc.metadata, })), }; }, // 只将摘要发送给模型,节省 Token toModelOutput: ({ output }) => ({ total: output.total, documents: output.documents.map(doc => ({ id: doc.id, title: doc.title, snippet: doc.content.substring(0, 200) + '...', })), }),});toModelOutput 使用场景
| 场景 | 说明 |
|---|---|
| 大文本压缩 | 截取或摘要长文本内容 |
| 敏感信息过滤 | 移除不应发送给模型的数据 |
| 格式转换 | 将二进制数据转换为描述 |
| Token 优化 | 减少上下文长度和成本 |
Structured Output - 结构化输出
结合 Agent Loop 和结构化输出:
import { generateText, Output, tool, stepCountIs } from 'ai';import { z } from 'zod';
const { output, steps } = await generateText({ model: 'anthropic/claude-sonnet-4.5', stopWhen: stepCountIs(10), tools: { fetchData: tool({ description: '获取数据', inputSchema: z.object({ source: z.string() }), execute: async ({ source }) => { // 获取数据逻辑 return { data: '...' }; }, }), }, output: Output.object({ schema: z.object({ summary: z.string().describe('数据摘要'), insights: z.array(z.string()).describe('关键洞察'), recommendations: z.array(z.object({ action: z.string(), priority: z.enum(['high', 'medium', 'low']), })), }), }), prompt: '分析销售数据并提供建议',});
// output 是类型安全的console.log(output.summary);console.log(output.insights);console.log(output.recommendations);错误处理
工具执行错误
import { generateText, tool, stepCountIs } from 'ai';import { z } from 'zod';
const { text, steps } = await generateText({ model: 'openai/gpt-4o', stopWhen: stepCountIs(5), tools: { riskyTool: tool({ description: '可能失败的工具', inputSchema: z.object({ input: z.string() }), execute: async ({ input }) => { if (Math.random() > 0.5) { throw new Error('工具执行失败'); } return { result: 'success' }; }, }), }, prompt: '执行风险操作',});
// 检查工具错误for (const step of steps) { const errors = step.content?.filter( part => part.type === 'tool-error' );
if (errors?.length) { console.log('工具执行错误:', errors); }}API 错误处理
import { generateText, RetryError } from 'ai';
try { const { text } = await generateText({ model: 'openai/gpt-4o', prompt: '你好', });} catch (error) { if (RetryError.isInstance(error)) { console.log('重试失败:', error.reason); console.log('最后一个错误:', error.lastError); console.log('所有错误:', error.errors); } else { console.log('其他错误:', error); }}流式 Agent - streamText
对于需要实时显示的场景,使用 streamText:
import { streamText, tool, stepCountIs } from 'ai';import { z } from 'zod';
const result = streamText({ model: 'openai/gpt-4o', stopWhen: stepCountIs(5), tools: { weather: tool({ description: '获取天气', inputSchema: z.object({ city: z.string() }), execute: async ({ city }) => ({ city, temp: 25 }), }), }, onStepFinish: ({ stepType, toolCalls }) => { if (toolCalls?.length) { console.log('正在调用工具:', toolCalls.map(t => t.toolName)); } }, prompt: '北京天气如何?',});
// 流式输出文本for await (const textPart of result.textStream) { process.stdout.write(textPart);}
// 获取最终结果const finalResult = await result;console.log('\n步骤数:', finalResult.steps.length);实战示例:代码分析 Agent
import { ToolLoopAgent, tool, stepCountIs } from 'ai';import { z } from 'zod';import { readFile } from 'fs/promises';import { glob } from 'glob';
const codeAnalyzer = new ToolLoopAgent({ model: 'anthropic/claude-sonnet-4.5', instructions: `你是一个代码分析专家。分析代码时请:1. 首先了解项目结构2. 阅读关键文件3. 识别潜在问题4. 提供具体改进建议`,
tools: { listFiles: tool({ description: '列出匹配模式的文件', inputSchema: z.object({ pattern: z.string().describe('Glob 模式,如 "src/**/*.ts"'), }), execute: async ({ pattern }) => { const files = await glob(pattern); return { files: files.slice(0, 50) }; // 限制数量 }, }),
readFile: tool({ description: '读取文件内容', inputSchema: z.object({ path: z.string().describe('文件路径'), }), execute: async ({ path }) => { const content = await readFile(path, 'utf-8'); return { path, content }; }, toModelOutput: ({ output }) => ({ path: output.path, // 限制发送给模型的内容长度 content: output.content.length > 5000 ? output.content.substring(0, 5000) + '\n... (truncated)' : output.content, }), }),
searchCode: tool({ description: '在代码中搜索关键词', inputSchema: z.object({ keyword: z.string(), filePattern: z.string().default('**/*.ts'), }), execute: async ({ keyword, filePattern }) => { // 实现搜索逻辑 return { matches: [] }; }, }), },
stopWhen: stepCountIs(15),});
// 使用 Agentconst result = await codeAnalyzer.generate({ prompt: '分析 src 目录下的 TypeScript 代码,找出潜在的性能问题和改进建议',});
console.log(result.text);最佳实践
工具描述技巧
| 做法 | 示例 |
|---|---|
| 说明用途 | ”获取指定城市的实时天气数据” |
| 说明何时使用 | ”当用户询问天气相关问题时使用” |
| 说明输出内容 | ”返回温度、湿度、天气状况” |
| 避免模糊 | 不要:“处理数据”,要:“将 CSV 转换为 JSON” |
常见陷阱
- 无限循环:始终设置
stopWhen限制步数 - Token 爆炸:使用
toModelOutput压缩大输出 - 错误静默失败:监控
tool-error类型的内容 - 审批绕过:敏感操作务必设置
needsApproval
高级主题
记忆与上下文管理
Agent 的核心挑战之一是如何在有限的上下文窗口中保持连贯性。本节介绍主流的记忆管理策略。
记忆类型
上下文压缩策略
当对话超出上下文窗口时,需要压缩策略:
| 策略 | 描述 | 优点 | 缺点 |
|---|---|---|---|
| 滑动窗口 | 保留最近 N 轮对话 | 简单高效 | ”灾难性遗忘” |
| 摘要压缩 | 将旧消息压缩为摘要 | 保留关键信息 | 可能引入幻觉 |
| 混合方案 | 摘要 + 滑动窗口 | 平衡效果 | 实现复杂 |
| 层级摘要 | 多级别渐进压缩 | 信息保留最好 | 计算成本高 |
混合记忆策略
import { generateText, stepCountIs } from 'ai';
interface ContextStrategy { systemPrompt: string; // 始终保留 summary: string; // 历史摘要(每 3-5 轮更新) recentMessages: Message[]; // 最近 5-7 轮完整对话 entities: Entity[]; // 关键实体(跨会话保持)}
// 实现上下文压缩的 prepareStepconst { text } = await generateText({ model: 'openai/gpt-4o', stopWhen: stepCountIs(20), tools: { /* ... */ }, prepareStep: async ({ messages, stepCount }) => { // 每 5 步检查是否需要压缩 if (stepCount > 0 && stepCount % 5 === 0) { const compressedMessages = await compressContext(messages); return { messages: compressedMessages }; } return {}; }, prompt: '执行复杂任务...',});
async function compressContext(messages: Message[]): Promise<Message[]> { const KEEP_RECENT = 6; // 保留最近 6 条
if (messages.length <= KEEP_RECENT + 1) { return messages; // 无需压缩 }
const systemMessage = messages[0]; const oldMessages = messages.slice(1, -KEEP_RECENT); const recentMessages = messages.slice(-KEEP_RECENT);
// 使用小模型生成摘要 const { text: summary } = await generateText({ model: 'openai/gpt-4o-mini', prompt: `请简洁总结以下对话的关键信息:\n${JSON.stringify(oldMessages)}`, });
return [ systemMessage, { role: 'system', content: `[历史摘要]:${summary}` }, ...recentMessages, ];}集成 Mem0 记忆层
Mem0 是一个独立的记忆中间件,可与任何 Agent 框架集成:
import { generateText, tool, stepCountIs } from 'ai';import { MemoryClient } from 'mem0ai';import { z } from 'zod';
const memory = new MemoryClient({ apiKey: process.env.MEM0_API_KEY });
const memoryTools = { saveMemory: tool({ description: '保存重要信息到长期记忆', inputSchema: z.object({ content: z.string().describe('要记住的信息'), tags: z.array(z.string()).optional(), }), execute: async ({ content, tags }) => { await memory.add(content, { user_id: 'user_123', metadata: { tags }, }); return { success: true }; }, }),
recallMemory: tool({ description: '从长期记忆中检索相关信息', inputSchema: z.object({ query: z.string().describe('检索查询'), }), execute: async ({ query }) => { const memories = await memory.search(query, { user_id: 'user_123', limit: 5, }); return { memories }; }, }),};
const { text } = await generateText({ model: 'anthropic/claude-sonnet-4.5', stopWhen: stepCountIs(10), tools: { ...memoryTools, // 其他业务工具... }, system: `你是一个有记忆能力的助手。在对话开始时,先用 recallMemory 检索相关历史。当用户提供重要信息时,用 saveMemory 保存。`, prompt: userMessage,});子代理(Subagent)模式
子代理让你可以将复杂任务分解给专业化的 Agent 处理。
子代理的优势
| 优势 | 说明 |
|---|---|
| 上下文隔离 | 子代理有独立上下文,不污染主对话 |
| 并行执行 | 多个子代理可同时运行 |
| 专业化 | 每个子代理可有专门的指令和工具 |
| 成本优化 | 子代理可使用更小的模型 |
AI SDK 子代理实现
import { generateText, tool, stepCountIs } from 'ai';import { z } from 'zod';
// 定义子代理工具const subagentTools = { researcher: tool({ description: '研究专家 - 搜索和分析信息', inputSchema: z.object({ task: z.string().describe('研究任务'), sources: z.array(z.string()).optional(), }), execute: async ({ task, sources }) => { // 启动研究子代理 const { text } = await generateText({ model: 'openai/gpt-4o-mini', // 使用较小模型 stopWhen: stepCountIs(5), tools: { webSearch: webSearchTool, readDocument: readDocumentTool, }, system: '你是一个研究专家,专注于查找和验证信息。', prompt: task, }); return { findings: text }; }, }),
coder: tool({ description: '编程专家 - 编写和分析代码', inputSchema: z.object({ task: z.string().describe('编程任务'), language: z.string().default('typescript'), }), execute: async ({ task, language }) => { const { text } = await generateText({ model: 'anthropic/claude-sonnet-4.5', stopWhen: stepCountIs(8), tools: { readFile: readFileTool, writeFile: writeFileTool, runTests: runTestsTool, }, system: `你是一个 ${language} 专家,专注于编写高质量代码。`, prompt: task, }); return { result: text }; }, }),
reviewer: tool({ description: '审查专家 - 代码和文档审查', inputSchema: z.object({ content: z.string().describe('待审查内容'), type: z.enum(['code', 'document']), }), execute: async ({ content, type }) => { const { text } = await generateText({ model: 'openai/gpt-4o-mini', system: `你是一个${type === 'code' ? '代码' : '文档'}审查专家。提供具体、可操作的反馈。`, prompt: `请审查以下内容:\n\n${content}`, }); return { review: text }; }, }),};
// 主 Agent 协调子代理const { text } = await generateText({ model: 'anthropic/claude-sonnet-4.5', stopWhen: stepCountIs(15), tools: subagentTools, system: `你是一个项目经理,协调研究员、程序员和审查员完成任务。根据任务性质选择合适的专家,并整合他们的工作成果。`, prompt: '帮我研究 React Server Components 的最佳实践,然后写一个示例组件',});并行子代理执行
import { generateText, tool, stepCountIs } from 'ai';import { z } from 'zod';
const parallelResearch = tool({ description: '并行研究多个主题', inputSchema: z.object({ topics: z.array(z.string()).describe('研究主题列表'), }), execute: async ({ topics }) => { // 并行启动多个子代理 const results = await Promise.all( topics.map(async (topic) => { const { text } = await generateText({ model: 'openai/gpt-4o-mini', stopWhen: stepCountIs(3), tools: { webSearch: webSearchTool }, prompt: `研究:${topic}`, }); return { topic, findings: text }; }) ); return { results }; },});与其他框架对比
Agent 框架全景
flowchart TB
subgraph "轻量级"
A[Vercel AI SDK]
B[Mastra]
end
subgraph "全功能"
C[LangGraph]
D[CrewAI]
E[AutoGen]
end
subgraph "平台化"
F[OpenAI Agents SDK]
G[Claude Agent SDK]
end
subgraph "记忆层"
H[Mem0]
end
H --> A
H --> C
H --> F
H --> G
框架选择指南
| 需求 | 推荐框架 | 原因 |
|---|---|---|
| TypeScript 优先 + 轻量 | AI SDK | 原生 TS,零配置 |
| 复杂工作流 + 状态机 | LangGraph | 图结构,checkpointer |
| 团队角色 + 委派 | CrewAI | 内置层级管理 |
| 快速原型 + UI | OpenAI Agents SDK | 可视化画布 |
| 企业安全 + 长上下文 | Claude Agent SDK | 100K 上下文,MCP |
| 跨框架记忆 | Mem0 | 即插即用 |
LangGraph 对比
LangGraph 使用图结构和 checkpointer 实现状态持久化:
// LangGraph 方式(Python 风格伪代码)const graph = new StateGraph({ channels: { messages: [], memory: {} }});
graph.addNode('agent', agentNode);graph.addNode('tools', toolNode);graph.addEdge('agent', 'tools');graph.addConditionalEdge('tools', shouldContinue);
// 编译时添加 checkpointerconst app = graph.compile({ checkpointer: new SqliteSaver('agent.db')});
// 通过 thread_id 隔离会话await app.invoke(input, { configurable: { thread_id: 'user_123' }});// AI SDK 等效实现import { generateText, stepCountIs } from 'ai';
// 使用 Map 模拟简单的会话存储const sessions = new Map<string, Message[]>();
async function agentWithMemory(threadId: string, prompt: string) { const history = sessions.get(threadId) || [];
const { text, steps } = await generateText({ model: 'openai/gpt-4o', stopWhen: stepCountIs(10), messages: [ ...history, { role: 'user', content: prompt }, ], tools: { /* ... */ }, });
// 保存新的消息历史 const newMessages = steps.flatMap(s => s.messages || []); sessions.set(threadId, [...history, ...newMessages]);
return text;}CrewAI 对比
CrewAI 强调角色扮演和团队协作:
# CrewAI 方式(Python)from crewai import Agent, Task, Crew
researcher = Agent( role='研究员', goal='找到最新的技术趋势', backstory='你是一位资深技术分析师...', tools=[search_tool])
writer = Agent( role='作家', goal='撰写引人入胜的文章', backstory='你是一位技术博主...',)
crew = Crew( agents=[researcher, writer], tasks=[research_task, writing_task], process=Process.hierarchical, # 层级管理 memory=True # 启用记忆)// AI SDK 等效实现import { generateText, tool, stepCountIs } from 'ai';import { z } from 'zod';
// 定义专家子代理const experts = { researcher: { model: 'openai/gpt-4o', system: `角色:研究员目标:找到最新的技术趋势背景:你是一位资深技术分析师,擅长发现新兴技术。`, tools: { webSearch: searchTool }, }, writer: { model: 'anthropic/claude-sonnet-4.5', system: `角色:作家目标:撰写引人入胜的文章背景:你是一位技术博主,文章深入浅出。`, tools: {}, },};
// 管理者 Agentconst { text } = await generateText({ model: 'anthropic/claude-sonnet-4.5', stopWhen: stepCountIs(15), tools: { delegateToExpert: tool({ description: '委派任务给专家', inputSchema: z.object({ expert: z.enum(['researcher', 'writer']), task: z.string(), }), execute: async ({ expert, task }) => { const config = experts[expert]; const { text } = await generateText({ model: config.model, system: config.system, tools: config.tools, stopWhen: stepCountIs(5), prompt: task, }); return { expert, result: text }; }, }), }, system: '你是项目经理,协调研究员和作家完成任务。', prompt: '研究 AI Agent 的发展趋势,然后写一篇博客文章',});生产环境最佳实践
可观测性
import { generateText, stepCountIs } from 'ai';
const { text } = await generateText({ model: 'openai/gpt-4o', stopWhen: stepCountIs(10), tools: { /* ... */ },
// 步骤监控 onStepFinish: async ({ stepType, toolCalls, usage, stepCount }) => { // 发送到监控系统 await metrics.record({ type: 'agent_step', stepType, stepCount, toolsCalled: toolCalls?.map(t => t.toolName), tokens: usage, timestamp: Date.now(), });
// 成本追踪 const cost = calculateCost(usage); await billing.track(cost); },
prompt: '执行任务...',});错误恢复与重试
import { generateText, stepCountIs, RetryError } from 'ai';
async function resilientAgent(prompt: string, maxRetries = 3) { let lastError: Error | null = null;
for (let attempt = 1; attempt <= maxRetries; attempt++) { try { const { text, steps } = await generateText({ model: 'openai/gpt-4o', stopWhen: stepCountIs(10), tools: { /* ... */ }, prompt, });
return { text, steps, attempt }; } catch (error) { lastError = error as Error;
// 区分错误类型 if (RetryError.isInstance(error)) { // API 错误,可能是限流 const delay = Math.pow(2, attempt) * 1000; // 指数退避 await sleep(delay); continue; }
// 其他错误直接抛出 throw error; } }
throw lastError;}成本控制
import { generateText, stepCountIs } from 'ai';
const MAX_TOKENS_PER_REQUEST = 50000;let totalTokens = 0;
const { text } = await generateText({ model: 'openai/gpt-4o', stopWhen: stepCountIs(10), tools: { /* ... */ },
prepareStep: async ({ stepCount }) => { // 检查是否超出预算 if (totalTokens > MAX_TOKENS_PER_REQUEST) { // 强制停止或降级到更小的模型 return { model: 'openai/gpt-4o-mini', system: '请简洁回答,我们接近 token 限制。', }; } return {}; },
onStepFinish: async ({ usage }) => { totalTokens += usage.totalTokens; },
prompt: '执行任务...',});创新架构方案
基于对各框架的深度分析,以下是几个创新的组合方案,从简单到复杂,涵盖不同场景需求。
方案对比总览
方案一:事件溯源 Agent
核心思想:将 Agent 的所有状态变化记录为不可变事件序列,支持回放、回滚和审计。
适用场景:需要审计追踪、调试回放、或支持”撤销”操作的复杂 Agent。
import { generateText, tool, stepCountIs } from 'ai';import { z } from 'zod';
// 事件类型定义type AgentEvent = | { type: 'TASK_STARTED'; payload: { taskId: string; prompt: string; timestamp: number } } | { type: 'THOUGHT'; payload: { thought: string; stepIndex: number } } | { type: 'TOOL_CALLED'; payload: { toolName: string; input: unknown; stepIndex: number } } | { type: 'TOOL_RESULT'; payload: { toolName: string; output: unknown; stepIndex: number } } | { type: 'REFLECTION'; payload: { critique: string; shouldRetry: boolean } } | { type: 'TASK_COMPLETED'; payload: { result: string; totalSteps: number } } | { type: 'TASK_FAILED'; payload: { error: string; lastStep: number } } | { type: 'CHECKPOINT'; payload: { stateSnapshot: AgentState } };
interface AgentState { taskId: string; prompt: string; currentStep: number; toolResults: Map<string, unknown>; reflections: string[]; status: 'running' | 'completed' | 'failed' | 'paused';}
class EventSourcedAgent { private events: AgentEvent[] = []; private state: AgentState | null = null; private subscribers: ((event: AgentEvent) => void)[] = [];
// 追加事件 (不可变) private append(event: AgentEvent) { this.events.push(Object.freeze(event)); this.subscribers.forEach(fn => fn(event)); }
// 从事件重建状态 rebuildState(upToIndex?: number): AgentState { const events = upToIndex !== undefined ? this.events.slice(0, upToIndex + 1) : this.events;
return events.reduce((state, event) => { switch (event.type) { case 'TASK_STARTED': return { taskId: event.payload.taskId, prompt: event.payload.prompt, currentStep: 0, toolResults: new Map(), reflections: [], status: 'running' as const, }; case 'TOOL_RESULT': state.toolResults.set( `${event.payload.toolName}_${event.payload.stepIndex}`, event.payload.output ); state.currentStep = event.payload.stepIndex; return state; case 'REFLECTION': state.reflections.push(event.payload.critique); return state; case 'TASK_COMPLETED': state.status = 'completed'; return state; case 'TASK_FAILED': state.status = 'failed'; return state; default: return state; } }, {} as AgentState); }
// 回滚到指定事件 rollbackTo(eventIndex: number): AgentState { this.state = this.rebuildState(eventIndex); return this.state; }
// 获取事件日志 (用于审计) getAuditLog(): ReadonlyArray<AgentEvent> { return Object.freeze([...this.events]); }
// 订阅事件流 subscribe(callback: (event: AgentEvent) => void) { this.subscribers.push(callback); return () => { this.subscribers = this.subscribers.filter(fn => fn !== callback); }; }
async run(prompt: string): Promise<string> { const taskId = crypto.randomUUID();
this.append({ type: 'TASK_STARTED', payload: { taskId, prompt, timestamp: Date.now() }, });
try { const { text, steps } = await generateText({ model: 'anthropic/claude-sonnet-4.5', stopWhen: stepCountIs(15), tools: { search: tool({ description: '搜索信息', inputSchema: z.object({ query: z.string() }), execute: async ({ query }) => { const result = { data: `Results for: ${query}` }; return result; }, }), }, onStepFinish: async ({ stepType, toolCalls, toolResults, text: stepText }) => { const stepIndex = steps?.length || 0;
if (stepText) { this.append({ type: 'THOUGHT', payload: { thought: stepText, stepIndex }, }); }
toolCalls?.forEach((tc, i) => { this.append({ type: 'TOOL_CALLED', payload: { toolName: tc.toolName, input: tc.args, stepIndex }, });
if (toolResults?.[i]) { this.append({ type: 'TOOL_RESULT', payload: { toolName: tc.toolName, output: toolResults[i].result, stepIndex, }, }); } }); }, prompt, });
this.append({ type: 'TASK_COMPLETED', payload: { result: text, totalSteps: steps.length }, });
return text; } catch (error) { this.append({ type: 'TASK_FAILED', payload: { error: error instanceof Error ? error.message : 'Unknown error', lastStep: this.rebuildState().currentStep, }, }); throw error; } }}
// 使用示例const agent = new EventSourcedAgent();
// 订阅事件流 (实时监控)agent.subscribe((event) => { console.log(`[${event.type}]`, event.payload);});
// 执行任务const result = await agent.run('研究量子计算的最新进展');
// 获取完整审计日志const auditLog = agent.getAuditLog();console.log('审计日志:', auditLog);
// 时间旅行:回滚到第 5 个事件const pastState = agent.rollbackTo(5);console.log('历史状态:', pastState);方案二:双循环反思 Agent
核心思想:外循环负责高层规划(Plan-and-Execute),内循环负责执行和反思(ReAct + Reflection)。
适用场景:复杂多步骤任务,需要战略规划 + 战术执行 + 持续改进。
import { generateText, tool, stepCountIs, Output } from 'ai';import { z } from 'zod';
// 计划步骤定义const PlanStepSchema = z.object({ id: z.string(), description: z.string(), dependencies: z.array(z.string()).default([]), status: z.enum(['pending', 'running', 'completed', 'failed']).default('pending'), result: z.string().optional(),});
type PlanStep = z.infer<typeof PlanStepSchema>;
interface DualLoopConfig { maxPlanningIterations: number; maxExecutionSteps: number; maxReflectionCycles: number; plannerModel: string; executorModel: string; reflectorModel: string;}
class DualLoopAgent { private config: DualLoopConfig; private plan: PlanStep[] = []; private executionHistory: Array<{ step: PlanStep; attempts: number; reflections: string[] }> = [];
constructor(config: Partial<DualLoopConfig> = {}) { this.config = { maxPlanningIterations: 3, maxExecutionSteps: 10, maxReflectionCycles: 2, plannerModel: 'anthropic/claude-sonnet-4.5', executorModel: 'openai/gpt-4o', reflectorModel: 'openai/gpt-4o-mini', ...config, }; }
// ========== 外循环: 规划器 ========== private async planPhase(task: string): Promise<PlanStep[]> { const { output } = await generateText({ model: this.config.plannerModel, output: Output.object({ schema: z.object({ plan: z.array(PlanStepSchema), reasoning: z.string(), }), }), system: `你是一个战略规划专家。分解复杂任务为可执行的步骤。规则:1. 每个步骤应该是原子性的、可验证的2. 明确步骤间的依赖关系3. 考虑可能的失败点和备选方案`, prompt: `任务: ${task}\n\n请生成执行计划。`, });
return output.plan; }
// 动态重规划 private async replanPhase( originalTask: string, currentPlan: PlanStep[], failedStep: PlanStep, reflection: string ): Promise<PlanStep[]> { const { output } = await generateText({ model: this.config.plannerModel, output: Output.object({ schema: z.object({ updatedPlan: z.array(PlanStepSchema), changes: z.string(), }), }), system: '你是一个自适应规划专家。根据执行反馈调整计划。', prompt: `原始任务: ${originalTask}当前计划: ${JSON.stringify(currentPlan, null, 2)}失败步骤: ${JSON.stringify(failedStep)}反思: ${reflection}
请调整计划以解决问题。`, });
return output.updatedPlan; }
// ========== 内循环: 执行器 (ReAct) ========== private async executeStep(step: PlanStep, context: string): Promise<{ success: boolean; result: string; toolsUsed: string[]; }> { const toolsUsed: string[] = [];
const { text, steps } = await generateText({ model: this.config.executorModel, stopWhen: stepCountIs(this.config.maxExecutionSteps), tools: { search: tool({ description: '搜索网络信息', inputSchema: z.object({ query: z.string() }), execute: async ({ query }) => { toolsUsed.push('search'); return { results: `Search results for: ${query}` }; }, }), calculate: tool({ description: '执行计算', inputSchema: z.object({ expression: z.string() }), execute: async ({ expression }) => { toolsUsed.push('calculate'); try { // 安全的表达式求值 const result = Function(`"use strict"; return (${expression})`)(); return { result }; } catch { return { error: 'Invalid expression' }; } }, }), writeCode: tool({ description: '编写代码', inputSchema: z.object({ language: z.string(), task: z.string(), }), execute: async ({ language, task }) => { toolsUsed.push('writeCode'); // 调用代码生成子代理 const { text: code } = await generateText({ model: 'anthropic/claude-sonnet-4.5', system: `你是 ${language} 专家,只输出代码,不要解释。`, prompt: task, }); return { code }; }, }), }, system: `你是一个精确的任务执行者。使用 ReAct 模式:1. Thought: 思考下一步行动2. Action: 选择并执行工具3. Observation: 观察结果4. 重复直到完成
当前上下文: ${context}`, prompt: `执行步骤: ${step.description}`, });
return { success: !text.includes('失败') && !text.includes('无法'), result: text, toolsUsed, }; }
// ========== 反思循环 ========== private async reflect( step: PlanStep, executionResult: string, toolsUsed: string[] ): Promise<{ critique: string; shouldRetry: boolean; suggestions: string[]; }> { const { output } = await generateText({ model: this.config.reflectorModel, output: Output.object({ schema: z.object({ critique: z.string().describe('对执行结果的批评性分析'), quality: z.number().min(0).max(10).describe('质量评分 0-10'), shouldRetry: z.boolean().describe('是否需要重试'), suggestions: z.array(z.string()).describe('改进建议'), }), }), system: `你是一个严格的质量审查员。评估任务执行的质量。评估标准:- 是否完成了目标?- 结果是否准确?- 是否有遗漏或错误?- 工具使用是否合理?`, prompt: `步骤目标: ${step.description}执行结果: ${executionResult}使用工具: ${toolsUsed.join(', ')}
请进行批评性评估。`, });
return { critique: output.critique, shouldRetry: output.shouldRetry && output.quality < 7, suggestions: output.suggestions, }; }
// ========== 主循环 ========== async run(task: string): Promise<{ result: string; plan: PlanStep[]; reflections: string[]; }> { const allReflections: string[] = [];
// 1. 初始规划 this.plan = await this.planPhase(task); console.log('初始计划:', this.plan.map(s => s.description));
// 2. 按序执行 let context = ''; let planningIterations = 0;
for (let i = 0; i < this.plan.length; i++) { const step = this.plan[i]; step.status = 'running';
let attempts = 0; let success = false; let lastResult = '';
// 内循环: 执行 + 反思 while (!success && attempts < this.config.maxReflectionCycles) { attempts++;
// 执行 const execution = await this.executeStep(step, context); lastResult = execution.result;
// 反思 const reflection = await this.reflect(step, execution.result, execution.toolsUsed); allReflections.push(reflection.critique);
if (reflection.shouldRetry) { console.log(`步骤 ${step.id} 需要重试: ${reflection.critique}`); // 将改进建议注入上下文 context += `\n[改进建议]: ${reflection.suggestions.join('; ')}`; } else { success = true; } }
if (success) { step.status = 'completed'; step.result = lastResult; context += `\n[${step.id}完成]: ${lastResult}`; } else { step.status = 'failed';
// 触发重规划 if (planningIterations < this.config.maxPlanningIterations) { planningIterations++; console.log('触发重规划...'); this.plan = await this.replanPhase( task, this.plan, step, allReflections[allReflections.length - 1] ); i = -1; // 重新开始执行 } else { throw new Error(`步骤 ${step.id} 失败,已达最大重规划次数`); } } }
// 3. 生成最终结果 const { text: finalResult } = await generateText({ model: this.config.plannerModel, system: '整合所有步骤结果,生成最终回答。', prompt: `任务: ${task}执行结果:${this.plan.map(s => `- ${s.description}: ${s.result}`).join('\n')}
请生成最终综合回答。`, });
return { result: finalResult, plan: this.plan, reflections: allReflections, }; }}
// 使用示例const agent = new DualLoopAgent({ maxReflectionCycles: 3, plannerModel: 'anthropic/claude-sonnet-4.5',});
const { result, plan, reflections } = await agent.run( '分析 React 19 的新特性,写一篇技术博客,并生成示例代码');方案三:分层记忆 Agent
核心思想:模拟人类记忆系统,实现工作记忆、情景记忆、语义记忆的分层管理。
适用场景:长期交互、个性化助手、需要”学习”能力的 Agent。
import { generateText, tool, stepCountIs } from 'ai';import { z } from 'zod';
// ========== 记忆层接口 ==========interface MemoryEntry { id: string; content: string; embedding?: number[]; timestamp: number; accessCount: number; importance: number; metadata: Record<string, unknown>;}
interface MemoryLayer { add(entry: Omit<MemoryEntry, 'id' | 'timestamp' | 'accessCount'>): Promise<string>; search(query: string, limit?: number): Promise<MemoryEntry[]>; get(id: string): Promise<MemoryEntry | null>; decay(): Promise<void>; // 记忆衰减}
// L1:工作记忆 - 当前对话上下文(快速,有限容量)class WorkingMemory implements MemoryLayer { private buffer: MemoryEntry[] = []; private maxSize = 10;
async add(entry: Omit<MemoryEntry, 'id' | 'timestamp' | 'accessCount'>) { const id = crypto.randomUUID(); this.buffer.push({ ...entry, id, timestamp: Date.now(), accessCount: 1, });
// FIFO 淘汰 if (this.buffer.length > this.maxSize) { this.buffer.shift(); }
return id; }
async search(query: string, limit = 5) { // 简单的关键词匹配 (生产环境应使用向量相似度) return this.buffer .filter(e => e.content.toLowerCase().includes(query.toLowerCase())) .slice(-limit); }
async get(id: string) { const entry = this.buffer.find(e => e.id === id); if (entry) entry.accessCount++; return entry || null; }
async decay() { // 工作记忆快速衰减 this.buffer = this.buffer.filter(e => Date.now() - e.timestamp < 5 * 60 * 1000 // 5 分钟 ); }
getAll(): MemoryEntry[] { return [...this.buffer]; }}
// L2: 情景记忆 - 具体事件和经历 (中速, 中等容量)class EpisodicMemory implements MemoryLayer { private episodes: MemoryEntry[] = []; private maxSize = 100;
async add(entry: Omit<MemoryEntry, 'id' | 'timestamp' | 'accessCount'>) { const id = crypto.randomUUID(); this.episodes.push({ ...entry, id, timestamp: Date.now(), accessCount: 1, });
// 重要性淘汰 if (this.episodes.length > this.maxSize) { this.episodes.sort((a, b) => b.importance - a.importance); this.episodes = this.episodes.slice(0, this.maxSize); }
return id; }
async search(query: string, limit = 10) { // 结合时间和重要性排序 return this.episodes .filter(e => e.content.toLowerCase().includes(query.toLowerCase())) .sort((a, b) => { const recencyA = 1 / (Date.now() - a.timestamp + 1); const recencyB = 1 / (Date.now() - b.timestamp + 1); return (b.importance * recencyB) - (a.importance * recencyA); }) .slice(0, limit); }
async get(id: string) { const entry = this.episodes.find(e => e.id === id); if (entry) entry.accessCount++; return entry || null; }
async decay() { // 情景记忆缓慢衰减,但高重要性记忆保留 this.episodes = this.episodes.map(e => ({ ...e, importance: e.importance * 0.99, // 衰减因子 })).filter(e => e.importance > 0.1); }}
// L3: 语义记忆 - 抽象知识和规则 (持久化)class SemanticMemory implements MemoryLayer { private knowledge: Map<string, MemoryEntry> = new Map();
async add(entry: Omit<MemoryEntry, 'id' | 'timestamp' | 'accessCount'>) { const id = crypto.randomUUID(); this.knowledge.set(id, { ...entry, id, timestamp: Date.now(), accessCount: 1, }); return id; }
async search(query: string, limit = 5) { return Array.from(this.knowledge.values()) .filter(e => e.content.toLowerCase().includes(query.toLowerCase())) .sort((a, b) => b.accessCount - a.accessCount) .slice(0, limit); }
async get(id: string) { const entry = this.knowledge.get(id); if (entry) entry.accessCount++; return entry || null; }
async decay() { // 语义记忆几乎不衰减 }}
// ========== 记忆控制器 ==========class MemoryController { private working: WorkingMemory; private episodic: EpisodicMemory; private semantic: SemanticMemory;
constructor() { this.working = new WorkingMemory(); this.episodic = new EpisodicMemory(); this.semantic = new SemanticMemory(); }
// 智能路由: 决定存储到哪一层 async store(content: string, metadata: Record<string, unknown> = {}) { const importance = await this.evaluateImportance(content);
// 始终存入工作记忆 await this.working.add({ content, importance, metadata });
// 高重要性存入情景记忆 if (importance > 0.5) { await this.episodic.add({ content, importance, metadata }); }
// 如果是知识性内容,存入语义记忆 if (metadata.type === 'knowledge' || importance > 0.8) { await this.semantic.add({ content, importance, metadata }); } }
// 统一检索: 从所有层检索并合并 async retrieve(query: string, limit = 10): Promise<{ working: MemoryEntry[]; episodic: MemoryEntry[]; semantic: MemoryEntry[]; }> { const [working, episodic, semantic] = await Promise.all([ this.working.search(query, limit), this.episodic.search(query, limit), this.semantic.search(query, limit), ]);
return { working, episodic, semantic }; }
// 记忆巩固: 将重要的工作记忆转化为长期记忆 async consolidate() { const workingEntries = this.working.getAll();
for (const entry of workingEntries) { if (entry.accessCount > 3 && entry.importance > 0.6) { // 被多次访问的重要记忆,巩固到情景记忆 await this.episodic.add({ content: entry.content, importance: entry.importance * 1.2, // 提升重要性 metadata: { ...entry.metadata, consolidated: true }, }); } } }
// 记忆衰减 async decay() { await Promise.all([ this.working.decay(), this.episodic.decay(), this.semantic.decay(), ]); }
private async evaluateImportance(content: string): Promise<number> { const { output } = await generateText({ model: 'openai/gpt-4o-mini', output: Output.object({ schema: z.object({ importance: z.number().min(0).max(1), reasoning: z.string(), }), }), prompt: `评估以下信息的重要性 (0-1):"${content}"
考虑:- 是否包含关键事实?- 是否是用户偏好?- 是否影响后续交互?`, });
return output.importance; }}
// ========== 分层记忆 Agent ==========class HierarchicalMemoryAgent { private memory: MemoryController;
constructor() { this.memory = new MemoryController(); }
async chat(userMessage: string): Promise<string> { // 1. 检索相关记忆 const memories = await this.memory.retrieve(userMessage);
// 2. 构建上下文 const contextParts: string[] = [];
if (memories.semantic.length > 0) { contextParts.push( '[长期知识]\n' + memories.semantic.map(m => m.content).join('\n') ); }
if (memories.episodic.length > 0) { contextParts.push( '[相关经历]\n' + memories.episodic.map(m => m.content).join('\n') ); }
if (memories.working.length > 0) { contextParts.push( '[当前对话]\n' + memories.working.map(m => m.content).join('\n') ); }
// 3. 生成回复 const { text } = await generateText({ model: 'anthropic/claude-sonnet-4.5', system: `你是一个有记忆能力的智能助手。使用提供的记忆上下文来个性化回复。
${contextParts.join('\n\n')}`, prompt: userMessage, });
// 4. 存储新的交互 await this.memory.store(`用户: ${userMessage}`, { type: 'interaction' }); await this.memory.store(`助手: ${text}`, { type: 'interaction' });
// 5. 定期巩固和衰减 await this.memory.consolidate(); await this.memory.decay();
return text; }
// 主动学习: 将重要信息存入语义记忆 async learn(knowledge: string) { await this.memory.store(knowledge, { type: 'knowledge' }); }}
// 使用示例const agent = new HierarchicalMemoryAgent();
// 教会 Agent 一些知识await agent.learn('用户偏好: 喜欢简洁的代码风格');await agent.learn('技术栈: TypeScript, React, Node.js');
// 多轮对话console.log(await agent.chat('你好,我想学习 React Hooks'));console.log(await agent.chat('useState 怎么用?'));console.log(await agent.chat('还记得我的技术栈吗?')); // 应该能回忆起方案四:Actor 模型 Agent
核心思想:使用 Actor 模型实现高并发、容错的分布式 Agent 系统。
适用场景:高并发场景、需要故障隔离、分布式部署的 Agent 集群。
// actor-agent.ts(Node.js 实现 Actor 模型核心概念)
// ========== Actor 系统基础 ==========type Message = { type: string; payload: unknown; sender?: ActorRef };type ActorRef = { id: string; send: (msg: Message) => void };
class Actor { readonly id: string; private mailbox: Message[] = []; private processing = false; private supervisor?: ActorRef; private children: Map<string, Actor> = new Map();
constructor( id: string, private behavior: (msg: Message, ctx: ActorContext) => Promise<void> ) { this.id = id; }
ref(): ActorRef { return { id: this.id, send: (msg) => this.receive(msg), }; }
setSupervisor(supervisor: ActorRef) { this.supervisor = supervisor; }
private receive(msg: Message) { this.mailbox.push(msg); this.processMailbox(); }
private async processMailbox() { if (this.processing || this.mailbox.length === 0) return;
this.processing = true; const msg = this.mailbox.shift()!;
try { await this.behavior(msg, { self: this.ref(), spawn: (id, behavior) => this.spawnChild(id, behavior), children: this.children, supervisor: this.supervisor, }); } catch (error) { // "Let it crash" - 通知 supervisor this.supervisor?.send({ type: 'CHILD_FAILED', payload: { childId: this.id, error }, }); }
this.processing = false; this.processMailbox(); }
private spawnChild(id: string, behavior: Actor['behavior']): ActorRef { const child = new Actor(`${this.id}/${id}`, behavior); child.setSupervisor(this.ref()); this.children.set(id, child); return child.ref(); }}
interface ActorContext { self: ActorRef; spawn: (id: string, behavior: Actor['behavior']) => ActorRef; children: Map<string, Actor>; supervisor?: ActorRef;}
// ========== Agent Actors ==========import { generateText, tool, stepCountIs } from 'ai';import { z } from 'zod';
// Supervisor Actor - 管理和监督子 Agentfunction createSupervisorBehavior(taskQueue: string[]): Actor['behavior'] { return async (msg, ctx) => { switch (msg.type) { case 'START': { console.log('[Supervisor] 启动任务分发');
// 为每个任务 spawn 一个 worker for (let i = 0; i < Math.min(taskQueue.length, 5); i++) { const task = taskQueue.shift(); if (task) { const worker = ctx.spawn(`worker-${i}`, createWorkerBehavior()); worker.send({ type: 'EXECUTE', payload: { task }, sender: ctx.self }); } } break; }
case 'TASK_COMPLETED': { const { workerId, result } = msg.payload as any; console.log(`[Supervisor] Worker ${workerId} 完成: ${result.substring(0, 50)}...`);
// 分配新任务 if (taskQueue.length > 0) { const task = taskQueue.shift(); msg.sender?.send({ type: 'EXECUTE', payload: { task }, sender: ctx.self }); } break; }
case 'CHILD_FAILED': { const { childId, error } = msg.payload as any; console.log(`[Supervisor] Worker ${childId} 失败: ${error.message}`);
// 监督策略: 重启 worker const [, workerId] = childId.split('/'); const worker = ctx.spawn(workerId, createWorkerBehavior());
// 重新分配任务 if (taskQueue.length > 0) { const task = taskQueue.shift(); worker.send({ type: 'EXECUTE', payload: { task }, sender: ctx.self }); } break; } } };}
// Worker Actor - 执行具体任务的 Agentfunction createWorkerBehavior(): Actor['behavior'] { return async (msg, ctx) => { switch (msg.type) { case 'EXECUTE': { const { task } = msg.payload as { task: string }; console.log(`[Worker ${ctx.self.id}] 开始执行: ${task}`);
// 使用 AI SDK 执行任务 const { text } = await generateText({ model: 'openai/gpt-4o-mini', stopWhen: stepCountIs(5), tools: { search: tool({ description: '搜索信息', inputSchema: z.object({ query: z.string() }), execute: async ({ query }) => ({ result: `Results for ${query}` }), }), }, prompt: task, });
// 报告完成 msg.sender?.send({ type: 'TASK_COMPLETED', payload: { workerId: ctx.self.id, result: text }, sender: ctx.self, }); break; } } };}
// 使用示例const tasks = [ '研究 TypeScript 5.0 新特性', '分析 React 19 的并发特性', '比较 Bun 和 Node.js 性能', '调研 WebAssembly 最新进展', '总结 AI Agent 框架对比',];
const supervisor = new Actor('supervisor', createSupervisorBehavior([...tasks]));supervisor.ref().send({ type: 'START', payload: {} });方案五:MCTS 树搜索 Agent
核心思想:使用蒙特卡洛树搜索探索多个推理路径,选择最优解。
适用场景:复杂推理、需要探索多种可能性的问题求解。
import { generateText, Output } from 'ai';import { z } from 'zod';
interface MCTSNode { id: string; state: string; // 当前状态描述 action: string | null; // 导致此状态的动作 parent: MCTSNode | null; children: MCTSNode[]; visits: number; totalReward: number; isTerminal: boolean;}
interface MCTSConfig { explorationConstant: number; // UCB1 探索常数 maxIterations: number; maxDepth: number; simulationModel: string; evaluationModel: string;}
class MCTSAgent { private config: MCTSConfig; private root: MCTSNode | null = null;
constructor(config: Partial<MCTSConfig> = {}) { this.config = { explorationConstant: 1.41, // sqrt(2) maxIterations: 50, maxDepth: 10, simulationModel: 'openai/gpt-4o-mini', evaluationModel: 'openai/gpt-4o', ...config, }; }
// UCB1 选择公式 private ucb1(node: MCTSNode, parentVisits: number): number { if (node.visits === 0) return Infinity;
const exploitation = node.totalReward / node.visits; const exploration = this.config.explorationConstant * Math.sqrt(Math.log(parentVisits) / node.visits);
return exploitation + exploration; }
// 选择: 从根节点选择最有希望的叶节点 private select(node: MCTSNode): MCTSNode { while (node.children.length > 0 && !node.isTerminal) { node = node.children.reduce((best, child) => this.ucb1(child, node.visits) > this.ucb1(best, node.visits) ? child : best ); } return node; }
// 扩展: 生成可能的下一步动作 private async expand(node: MCTSNode, task: string): Promise<MCTSNode[]> { if (node.isTerminal) return [];
const { output } = await generateText({ model: this.config.simulationModel, output: Output.object({ schema: z.object({ actions: z.array(z.object({ description: z.string(), reasoning: z.string(), })).max(4), }), }), prompt: `任务: ${task}当前状态: ${node.state}历史路径: ${this.getPathToRoot(node).map(n => n.action).filter(Boolean).join(' -> ')}
生成 2-4 个可能的下一步动作。每个动作应该是不同的方向。`, });
const children: MCTSNode[] = output.actions.map((action, i) => ({ id: `${node.id}-${i}`, state: `${node.state}\n执行: ${action.description}`, action: action.description, parent: node, children: [], visits: 0, totalReward: 0, isTerminal: false, }));
node.children = children; return children; }
// 模拟: 快速评估一个路径的潜在价值 private async simulate(node: MCTSNode, task: string): Promise<number> { // 使用 LLM 评估当前状态的质量 const { output } = await generateText({ model: this.config.evaluationModel, output: Output.object({ schema: z.object({ score: z.number().min(0).max(1), isComplete: z.boolean(), critique: z.string(), }), }), prompt: `任务: ${task}当前推理路径:${this.getPathToRoot(node).map(n => n.action).filter(Boolean).join('\n-> ')}
当前状态: ${node.state}
评估:1. 这个推理路径有多接近解决任务? (0-1 分)2. 任务是否已完成?3. 简要批评这个路径的优缺点`, });
if (output.isComplete) { node.isTerminal = true; }
return output.score; }
// 回溯: 更新路径上所有节点的统计 private backpropagate(node: MCTSNode, reward: number) { while (node) { node.visits++; node.totalReward += reward; node = node.parent!; } }
private getPathToRoot(node: MCTSNode): MCTSNode[] { const path: MCTSNode[] = []; while (node) { path.unshift(node); node = node.parent!; } return path; }
// 主搜索循环 async search(task: string): Promise<{ bestPath: string[]; finalState: string; iterations: number; }> { // 初始化根节点 this.root = { id: 'root', state: `任务: ${task}\n初始状态: 开始分析问题`, action: null, parent: null, children: [], visits: 0, totalReward: 0, isTerminal: false, };
for (let i = 0; i < this.config.maxIterations; i++) { // 1. 选择 const selected = this.select(this.root);
// 2. 扩展 let nodeToSimulate: MCTSNode; if (selected.visits === 0 || selected.isTerminal) { nodeToSimulate = selected; } else { const children = await this.expand(selected, task); nodeToSimulate = children[0] || selected; }
// 3. 模拟 const reward = await this.simulate(nodeToSimulate, task);
// 4. 回溯 this.backpropagate(nodeToSimulate, reward);
// 早停: 找到完成的解 if (nodeToSimulate.isTerminal && reward > 0.9) { console.log(`迭代 ${i}: 找到高质量解`); break; }
if (i % 10 === 0) { console.log(`迭代 ${i}: 最佳路径分数 ${this.getBestPathScore()}`); } }
// 返回最佳路径 const bestPath = this.extractBestPath(); return { bestPath: bestPath.map(n => n.action).filter(Boolean) as string[], finalState: bestPath[bestPath.length - 1].state, iterations: this.config.maxIterations, }; }
private getBestPathScore(): number { if (!this.root) return 0; let node = this.root; while (node.children.length > 0) { node = node.children.reduce((best, child) => child.visits > 0 && child.totalReward / child.visits > (best.visits > 0 ? best.totalReward / best.visits : 0) ? child : best ); } return node.visits > 0 ? node.totalReward / node.visits : 0; }
private extractBestPath(): MCTSNode[] { if (!this.root) return [];
const path: MCTSNode[] = [this.root]; let current = this.root;
while (current.children.length > 0) { // 选择平均奖励最高的子节点 current = current.children.reduce((best, child) => { const childAvg = child.visits > 0 ? child.totalReward / child.visits : 0; const bestAvg = best.visits > 0 ? best.totalReward / best.visits : 0; return childAvg > bestAvg ? child : best; }); path.push(current); }
return path; }}
// 使用示例const agent = new MCTSAgent({ maxIterations: 30, explorationConstant: 1.5,});
const result = await agent.search( '设计一个高可用的微服务架构,需要支持 10 万 QPS,99.9% 可用性');
console.log('最佳推理路径:');result.bestPath.forEach((step, i) => { console.log(`${i + 1}. ${step}`);});console.log('\n最终状态:', result.finalState);方案选择指南
| 方案 | 复杂度 | 适用场景 | 优势 | 劣势 |
|---|---|---|---|---|
| 事件溯源 | 中 | 审计、调试、可回滚 | 完整历史、时间旅行 | 存储开销大 |
| 双循环反思 | 高 | 复杂任务、需要改进 | 自我纠错、高质量 | 延迟高、成本高 |
| 分层记忆 | 高 | 长期交互、个性化 | 记忆持久、智能检索 | 实现复杂 |
| Actor 模型 | 中 | 高并发、分布式 | 容错、可扩展 | 调试困难 |
| MCTS 树搜索 | 高 | 复杂推理、探索问题 | 最优解、多路径 | 计算密集 |
组合建议
- 生产级助手:分层记忆 + 双循环反思
- 审计合规场景:事件溯源 + 双循环反思
- 高并发服务:Actor 模型 + 事件溯源
- 复杂推理任务:MCTS + 分层记忆
- 通用场景:双循环反思(平衡复杂度和效果)
前沿研究(2025-2026)
以下是 Agent 领域最新的前沿研究成果,涵盖记忆、推理、工具学习、多代理协作等核心方向。
研究方向全景
核心论文速览
记忆系统
| 论文 | 核心贡献 | 链接 |
|---|---|---|
| Agentic Memory (AgeMem) | 统一 LTM/STM 管理,将记忆操作作为工具暴露,三阶段渐进 RL 训练 | arXiv:2601.01885 |
| Hindsight | 结构化记忆库 + 推理层,统一事实召回与偏好推理 | arXiv:2512.12818 |
| Memory Survey | 系统性综述 Agent 记忆,统一术语和分类法 | arXiv:2512.13564 |
| A-Mem | 受 Zettelkasten 启发,记忆自主生成描述和关联 | arXiv:2502.12110 |
AgeMem 核心思想:
传统方法:LTM 和 STM 独立优化,后期拼接AgeMem:统一策略,记忆操作 = 工具调用 - store(content) - retrieve(query) - update(id, content) - summarize(ids) - discard(id)训练:三阶段渐进 RL + step-wise GRPO推理与世界模型
| 论文 | 核心贡献 | 链接 |
|---|---|---|
| Agentic Reasoning Survey | 将推理组织为 reason/act/interact 三维度,路线图 | arXiv:2601.12538 |
| World Models Survey | 具身 AI 世界模型综述,模拟与规划 | arXiv:2510.16732 |
| Agents Fail to Use World Models | 发现 Agent 很少调用模拟(低于 1%),常误用预测 | arXiv:2601.03905 |
| Agentic World Model Alignment | 解决 LLM 作为世界模型时的物理幻觉问题 | arXiv:2601.13247 |
World Model 关键发现:
问题:Agent 拥有世界模型却不会用- 调用率低于 1%- 误用率约 15%- 启用后性能反而下降 5%
瓶颈:1. 不知道何时模拟2. 不会解释预测结果3. 不会将预见整合到推理中自我进化
| 论文 | 核心贡献 | 链接 |
|---|---|---|
| Self-Evolving Agents Survey | 系统分类 What/When/How/Where 进化 | arXiv:2507.21046 |
| Self-Evolving AI Agents Survey | 桥接静态基础模型与终身 Agent 系统 | arXiv:2508.07407 |
| EvolveR | 经验生命周期:收集 -> 蒸馏 -> 应用 | arXiv:2510.16079 |
| AgentEvolver | 自我提问 + 自我导航 + 自我归因 | arXiv:2511.10395 |
| MemRL | 运行时 RL 在情景记忆上,冻结 LLM + 可塑记忆 | arXiv:2601.03192 |
| EvoAgent | 持续世界模型 + 闭环动态,Minecraft 成功率提升 105% | arXiv:2502.05907 |
AgentEvolver 三机制:
// 自我提问:好奇心驱动任务生成async function selfQuestioning(state: State): Promise<Task[]> { return generateCuriousTasks(state);}
// 自我导航:经验复用 + 混合策略引导async function selfNavigating(task: Task, experience: Exp[]): Promise<Action> { const relevant = retrieveExperience(task, experience); return hybridPolicy(task, relevant);}
// 自我归因:增强样本效率async function selfAttributing(trajectory: Traj): Promise<Attribution> { return analyzeContribution(trajectory);}工具学习
| 论文 | 核心贡献 | 链接 |
|---|---|---|
| Natural Language Tools (NLT) | 用自然语言替代 JSON 工具调用,准确率 +18.4% | arXiv:2510.14453 |
| PEARL | 离线探索 + 在线 RL,ToolHop 达到 56.5% SOTA | arXiv:2601.20439 |
| Think-Augmented Function Calling | 在函数签名中嵌入 “think” 参数,零架构改动 | arXiv:2601.18282 |
| ToolRM | 专为工具使用设计的轻量级奖励模型 | arXiv:2510.26167 |
PEARL 两阶段方法:
阶段一:离线探索- 探索工具,学习有效使用模式- 记录失败条件
阶段二:在线 RL- 训练 Planner(GRPO)- 精心设计的奖励函数- ToolHop:56.5% 成功率- 低调用错误率多代理协作
| 论文 | 核心贡献 | 链接 |
|---|---|---|
| Multi-Agent Collaboration Survey | LLM 多代理协作机制综述 | arXiv:2501.06322 |
| Emergent Coordination | 信息论框架检测高阶结构 | arXiv:2510.05174 |
| Evolving Orchestration | 木偶师范式,RL 训练动态编排器 | arXiv:2505.19591 |
| Pressure Fields | 共享状态实现隐式协调(stigmergy) | arXiv:2601.08129 |
涌现协调关键发现:
问题:多代理系统何时是"整体" vs "个体集合"?
方法:信息论分解- 测量动态涌现- 定位协调发生位置- 区分伪时序耦合 vs 性能相关协同
发现:压力场 + 时间衰减 = 无需显式通信的协调(类似蚂蚁信息素)安全与对齐
| 论文 | 核心贡献 | 链接 |
|---|---|---|
| AgentDoG | 三维分类法(来源/模式/后果)+ ATBench 基准 | arXiv:2601.18491 |
| Guardrails Collapse | 发现对齐数据与微调数据相似度导致安全崩溃 | arXiv:2506.05346 |
| PSG-Agent | 个性化安全护栏,无需训练 | arXiv:2509.23614 |
| Verifiably Safe Tool Use | 信息流控制保证安全 | arXiv:2601.08012 |
AgentDoG 三维风险分类:
维度一:WHERE(风险来源)- 用户输入- 工具交互- 环境反馈
维度二:HOW(失败模式)- 误解指令- 工具误用- 规划失败
维度三:WHAT(后果)- 隐私泄露- 财产损失- 物理伤害研究趋势总结
flowchart LR
subgraph "2024-2025"
A[静态 Agent]
B[ReAct/CoT]
C[简单工具调用]
end
subgraph "2025-2026"
D[自我进化 Agent]
E[统一记忆管理]
F[世界模型规划]
G[涌现式协调]
H[可验证安全]
end
A --> D
B --> E
B --> F
C --> G
A --> H
六大趋势:
- 从静态到进化:Agent 需要从经验中学习,持续改进
- 统一记忆架构:LTM/STM 不再分离,统一策略管理
- 世界模型内化:LLM 自身作为世界模型,而非外部模拟器
- 隐式协调:共享状态替代显式通信,涌现协作行为
- 可验证安全:从概率检查到形式化保证
- 工具学习优化:RL 驱动的工具使用策略
参考资源
AI SDK 官方
- AI SDK 官方文档
- AI SDK GitHub
- Agent Loop Control
- Tool Calling 文档
- ToolLoopAgent 参考
- AI SDK 6 发布博客
- Building Agents 指南
记忆与上下文
其他框架
架构模式与论文
- LATS 论文 - 语言 Agent 树搜索
- Plan-and-Execute 架构
- Reflexion 模式
- 事件溯源模式
- Actor 模型与 Elixir
- Google 多代理设计模式
- 事件驱动多代理系统