DeepResearch SOP
触发:问题需要多来源信息整合(网页+本地文件+记忆库),单次检索无法完整回答 禁用:单一来源、1-2步可完成的问题直接做,别套此SOP
核心架构(对标MindSearch)
Main Agent(Planner)
├─ 职责:DAG构建、动态规划、收集子结论、决定下一步、综合输出
└─ 禁止:自己去读网页/文件原文——交给subagent,只看摘要结论
Sub Agent(Searcher,每节点一个)
├─ 职责:单一信息源检索 + 总结 → 写结论到output.txt
└─ 禁止:越界读其他节点的原始内容
上下文隔离红线:subagent的context.json必须包含且仅包含:
root_question:用户原始问题parent_conclusions:父节点已得结论(精简文字,非原文)sub_question:本节点原子子问题source:信息源描述(URL / 文件路径 / 查询关键词)output_file:绝对路径,subagent写结论到此
阶段1:问题分解 → 初始DAG
Main agent思考:
用户问题Q → 拆解为原子子问题列表
对每个子问题:判断信息源类型(见下方类型表)
判断依赖关系(哪些可并行)
写入 ./dr_{task}/dag.md
节点类型:
| 类型 | 触发场景 | subagent工具 |
|---|---|---|
| WEB | 需要实时/在线信息 | web_scan + web_execute_js |
| LOCAL | 本地PDF/代码/数据文件 | pdftotext / file_read / code_run |
| MEMORY | 记忆库/SOP/配置 | file_read global_mem/user_profile/sop |
| CODE | 需执行脚本获取结果 | code_run |
| SYNTH | 汇总(Main agent自己做,无需subagent) | — |
dag.md格式:
# DR: {用户问题一句话}
ROOT: {原始问题}
## 节点列表
- [N1] WEB | 子问题:XX是什么 | 依赖:无
- [N2] LOCAL | 子问题:本地文件YY说了什么 | 依赖:无
- [N3] WEB | 子问题:基于N1结论,进一步查ZZ | 依赖:N1
- [N4] SYNTH | 汇总N1+N2+N3 | 依赖:N1,N2,N3
## 节点状态
N1: [ ] N2: [ ] N3: [ ] N4: [ ]
阶段2:执行循环
WHILE dag中有未完成节点:
ready_nodes = [n for n in dag if 依赖全部[✓] and n状态为[ ]]
IF len(ready_nodes) >= 2 AND 节点间无共享资源冲突:
→ 并行Map模式(见下)
ELSE:
→ 顺序执行单节点
收集所有完成节点的结论
→ 动态评估:是否需要新增节点?(基于已得结论判断)
→ 若需要:在dag.md追加新节点,继续循环
→ 若不需要:继续下一批ready_nodes
并行Map模式(调用subagent_sop Map模式)
import os, subprocess, sys, glob
agent_root = '<YOUR_GA_ROOT>'
for node in ready_nodes:
task_name = f'dr_{task}/{node.id}'
task_dir = os.path.join(agent_root, 'temp', task_name)
os.makedirs(task_dir, exist_ok=True)
# 清理旧output*.txt
for f in glob.glob(os.path.join(task_dir, 'output*.txt')):
os.remove(f)
# 写context.json(含绝对路径)
write_context_json(task_dir, node, parent_conclusions, root_question)
# 写input.txt(目标+约束,不写步骤)
write_input_txt(task_dir, node)
# 并行启动(Popen,禁止run)
procs = {}
for node in ready_nodes:
task_name = f'dr_{task}/{node.id}'
task_dir = os.path.join(agent_root, 'temp', task_name)
proc = subprocess.Popen(
[sys.executable, 'agentmain.py', '--task', task_name],
cwd=agent_root,
stdout=open(os.path.join(task_dir, 'stdout.log'), 'w', encoding='utf-8'),
stderr=open(os.path.join(task_dir, 'stderr.log'), 'w', encoding='utf-8')
)
procs[node.id] = proc
# 启动后立即返回,下一个code_run再poll
轮询收集(下一个code_run单独执行)
import time, os
pending_nodes = set(n.id for n in ready_nodes)
conclusions = {}
while pending_nodes:
for nid in list(pending_nodes):
out = os.path.join(agent_root, 'temp', f'dr_{task}/{nid}/output.txt')
if os.path.exists(out):
size_before = os.path.getsize(out)
time.sleep(2)
if os.path.getsize(out) == size_before: # 输出稳定
content = open(out, encoding='utf-8').read()
# 只取结论段(末尾2000字)
conclusions[nid] = content[-2000:]
pending_nodes.remove(nid)
time.sleep(2)
阶段3:综合输出
所有节点[✓]后,Main agent:
- 读取所有节点的
output.txt,必须清洗后再使用(见下方清洗规范) - 按DAG拓扑顺序综合
- 报告写入本地文件时,禁止在code_run里硬编码长字符串,改用
动态从output.txt读取拼接写入,避免被截断 - 输出最终答案并告知用户报告路径
output.txt清洗规范(SYNTH必须执行)
import re
def extract_conclusion(output_txt_content, max_chars=4000):
"""从output.txt提取纯结论,去除所有调试内容,限制最大长度"""
content = output_txt_content
# 多标记fallback:优先找[结论],其次找结论章节开头
markers = ['[结论]', '---\n\n## 针对', '---\n\n## 一、', '## 针对子问题', '\n##']
idx = -1
for m in markers:
idx = content.find(m)
if idx >= 0:
break
conclusion = content[idx:] if idx >= 0 else content
# 清洗调试内容
conclusion = re.sub(r'\*\*LLM Running.*?\n', '', conclusion)
conclusion = re.sub(r'<summary>.*?</summary>', '', conclusion, flags=re.DOTALL)
conclusion = re.sub(r'🛠️.*?(?=\n\n|\Z)', '', conclusion, flags=re.DOTALL)
conclusion = re.sub(r'````text\n.*?````', '', conclusion, flags=re.DOTALL)
conclusion = re.sub(r'\[ROUND END\].*', '', conclusion, flags=re.DOTALL)
conclusion = re.sub(r'\n{3,}', '\n\n', conclusion).strip()
# 截断:超过max_chars时在段落边界截断
if len(conclusion) > max_chars:
cut = conclusion.rfind('\n\n', 0, max_chars)
cut = cut if cut > 0 else max_chars
conclusion = conclusion[:cut] + f"\n\n> *(内容已截断,完整结论见原始output.txt)*"
return conclusion
报告过长处理原则:
- 动态上限:SYNTH前先统计各节点实际结论长度,上限 =
min(实际长度, max(4000, 实际长度)),即默认不截断,只在总报告超80KB时才压缩 - 总报告超80KB:对次要节点(非核心、非第二轮深化节点)压缩至3000字,核心节点保持全量
- 截断是最后手段:若某节点确需截断,在截断处标注
> *(内容已截断,完整结论见原始output.txt)*,让用户知晓 - plan_sop无上下文爆炸处理:上下文控制靠DAG分治(结论≤500字/节点)+SYNTH从文件读取而非对话传递
SubAgent行为规范
subagent收到任务后:
- 第一步必须读context.json,获取绝对路径和上下文
- 根据节点类型选择工具:
- WEB节点:web_scan扫描 → web_execute_js提取正文 → 总结
- LOCAL节点:pdftotext/file_read → 总结关键信息
- MEMORY节点:按insight索引 → file_read → 总结
- CODE节点:code_run执行 → 整理输出
- 结论写入context.json指定的output_file
- 结论格式:
[结论] {针对sub_question的直接回答,≥2-3句,≤500字},前置便于main agent快速读取- 禁止只写一句话摘要:每个要点至少展开说明其含义、依据或对agent的启示,让main agent能真正理解而非只看标签
- 如涉及多个子要点,每个子要点独立成段,而非压缩为一行bullet
动态扩展判断
Main agent在每轮收集结论后,问自己:
- 已有结论是否足以回答根问题?→ 是:进入SYNTH节点
- 某结论引出了新的必要子问题?→ 追加节点到dag.md
- 某节点结论为空/失败?→ 检查stderr.log,决定重试或跳过
多轮深化原则:DeepResearch不应一轮就结束。收集第一批结论后,主动评估深度:
- 是否有关键理论/论文值得进一步展开?→ 新增深化节点(如N1_bloom_detail)
- 是否发现了结论间的矛盾或张力?→ 新增对比分析节点
- 是否有重要维度尚未覆盖?→ 补充节点
- 至少完成2轮子任务(初探+深化)再进入SYNTH,除非问题本身极浅
防止死循环:追加节点不超过原始节点数的2倍;同一子问题不重复追加
资源冲突约束
| 资源 | 并行限制 |
|---|---|
| 浏览器标签页 | 同一时刻只有1个subagent操作浏览器 |
| 本地文件读 | 无限制(只读) |
| 本地文件写 | 不同文件无限制;同一文件禁止并行写 |
| code_run | 无限制 |
典型坑
- input.txt禁止塞原文:大文件给路径,让subagent自己读,否则token超限
- parent_conclusions要精简:传给subagent的父节点结论控制在200字内,不传原文
- 启动+轮询禁止同一code_run:Popen后立即返回,下一轮再poll
- 清理旧output.txt:复用task_dir时必须先删,否则误读上轮结果
- SYNTH节点由Main agent自己做:不要为汇总节点启动subagent,直接用结论合成
- 多subagent并行共享浏览器冲突:4个WEB subagent同时操作同一浏览器时互相抢占tab导航,subagent会自动降级为code_run抓网页(urllib/requests),这是正常fallback,无需干预,继续等待即可