DeepResearch SOP — 多来源深度研究的DAG规划与并行执行框架

下载 .md

DeepResearch SOP

触发:问题需要多来源信息整合(网页+本地文件+记忆库),单次检索无法完整回答 禁用:单一来源、1-2步可完成的问题直接做,别套此SOP


核心架构(对标MindSearch)

Main Agent(Planner)
  ├─ 职责:DAG构建、动态规划、收集子结论、决定下一步、综合输出
  └─ 禁止:自己去读网页/文件原文——交给subagent,只看摘要结论

Sub Agent(Searcher,每节点一个)
  ├─ 职责:单一信息源检索 + 总结 → 写结论到output.txt
  └─ 禁止:越界读其他节点的原始内容

上下文隔离红线:subagent的context.json必须包含且仅包含:

  1. root_question:用户原始问题
  2. parent_conclusions:父节点已得结论(精简文字,非原文)
  3. sub_question:本节点原子子问题
  4. source:信息源描述(URL / 文件路径 / 查询关键词)
  5. output_file:绝对路径,subagent写结论到此

阶段1:问题分解 → 初始DAG

Main agent思考:

用户问题Q → 拆解为原子子问题列表
对每个子问题:判断信息源类型(见下方类型表)
              判断依赖关系(哪些可并行)
写入 ./dr_{task}/dag.md

节点类型

类型 触发场景 subagent工具
WEB 需要实时/在线信息 web_scan + web_execute_js
LOCAL 本地PDF/代码/数据文件 pdftotext / file_read / code_run
MEMORY 记忆库/SOP/配置 file_read global_mem/user_profile/sop
CODE 需执行脚本获取结果 code_run
SYNTH 汇总(Main agent自己做,无需subagent)

dag.md格式

# DR: {用户问题一句话}
ROOT: {原始问题}

## 节点列表
- [N1] WEB  | 子问题:XX是什么 | 依赖:无
- [N2] LOCAL | 子问题:本地文件YY说了什么 | 依赖:无
- [N3] WEB  | 子问题:基于N1结论,进一步查ZZ | 依赖:N1
- [N4] SYNTH | 汇总N1+N2+N3 | 依赖:N1,N2,N3

## 节点状态
N1: [ ] N2: [ ] N3: [ ] N4: [ ]

阶段2:执行循环

WHILE dag中有未完成节点:
    ready_nodes = [n for n in dag if 依赖全部[✓] and n状态为[ ]]

    IF len(ready_nodes) >= 2 AND 节点间无共享资源冲突:
        → 并行Map模式(见下)
    ELSE:
        → 顺序执行单节点

    收集所有完成节点的结论
    → 动态评估:是否需要新增节点?(基于已得结论判断)
    → 若需要:在dag.md追加新节点,继续循环
    → 若不需要:继续下一批ready_nodes

并行Map模式(调用subagent_sop Map模式)

import os, subprocess, sys, glob

agent_root = '<YOUR_GA_ROOT>'

for node in ready_nodes:
    task_name = f'dr_{task}/{node.id}'
    task_dir = os.path.join(agent_root, 'temp', task_name)
    os.makedirs(task_dir, exist_ok=True)
    # 清理旧output*.txt
    for f in glob.glob(os.path.join(task_dir, 'output*.txt')):
        os.remove(f)
    # 写context.json(含绝对路径)
    write_context_json(task_dir, node, parent_conclusions, root_question)
    # 写input.txt(目标+约束,不写步骤)
    write_input_txt(task_dir, node)

# 并行启动(Popen,禁止run)
procs = {}
for node in ready_nodes:
    task_name = f'dr_{task}/{node.id}'
    task_dir = os.path.join(agent_root, 'temp', task_name)
    proc = subprocess.Popen(
        [sys.executable, 'agentmain.py', '--task', task_name],
        cwd=agent_root,
        stdout=open(os.path.join(task_dir, 'stdout.log'), 'w', encoding='utf-8'),
        stderr=open(os.path.join(task_dir, 'stderr.log'), 'w', encoding='utf-8')
    )
    procs[node.id] = proc
# 启动后立即返回,下一个code_run再poll

轮询收集(下一个code_run单独执行)

import time, os

pending_nodes = set(n.id for n in ready_nodes)
conclusions = {}

while pending_nodes:
    for nid in list(pending_nodes):
        out = os.path.join(agent_root, 'temp', f'dr_{task}/{nid}/output.txt')
        if os.path.exists(out):
            size_before = os.path.getsize(out)
            time.sleep(2)
            if os.path.getsize(out) == size_before:  # 输出稳定
                content = open(out, encoding='utf-8').read()
                # 只取结论段(末尾2000字)
                conclusions[nid] = content[-2000:]
                pending_nodes.remove(nid)
    time.sleep(2)

阶段3:综合输出

所有节点[✓]后,Main agent:

  1. 读取所有节点的 output.txt必须清洗后再使用(见下方清洗规范)
  2. 按DAG拓扑顺序综合
  3. 报告写入本地文件时,禁止在code_run里硬编码长字符串,改用动态从output.txt读取拼接写入,避免被截断
  4. 输出最终答案并告知用户报告路径

output.txt清洗规范(SYNTH必须执行)

import re

def extract_conclusion(output_txt_content, max_chars=4000):
    """从output.txt提取纯结论,去除所有调试内容,限制最大长度"""
    content = output_txt_content
    # 多标记fallback:优先找[结论],其次找结论章节开头
    markers = ['[结论]', '---\n\n## 针对', '---\n\n## 一、', '## 针对子问题', '\n##']
    idx = -1
    for m in markers:
        idx = content.find(m)
        if idx >= 0:
            break
    conclusion = content[idx:] if idx >= 0 else content

    # 清洗调试内容
    conclusion = re.sub(r'\*\*LLM Running.*?\n', '', conclusion)
    conclusion = re.sub(r'<summary>.*?</summary>', '', conclusion, flags=re.DOTALL)
    conclusion = re.sub(r'🛠️.*?(?=\n\n|\Z)', '', conclusion, flags=re.DOTALL)
    conclusion = re.sub(r'````text\n.*?````', '', conclusion, flags=re.DOTALL)
    conclusion = re.sub(r'\[ROUND END\].*', '', conclusion, flags=re.DOTALL)
    conclusion = re.sub(r'\n{3,}', '\n\n', conclusion).strip()

    # 截断:超过max_chars时在段落边界截断
    if len(conclusion) > max_chars:
        cut = conclusion.rfind('\n\n', 0, max_chars)
        cut = cut if cut > 0 else max_chars
        conclusion = conclusion[:cut] + f"\n\n> *(内容已截断,完整结论见原始output.txt)*"
    return conclusion

报告过长处理原则

  • 动态上限:SYNTH前先统计各节点实际结论长度,上限 = min(实际长度, max(4000, 实际长度)),即默认不截断,只在总报告超80KB时才压缩
  • 总报告超80KB:对次要节点(非核心、非第二轮深化节点)压缩至3000字,核心节点保持全量
  • 截断是最后手段:若某节点确需截断,在截断处标注 > *(内容已截断,完整结论见原始output.txt)*,让用户知晓
  • plan_sop无上下文爆炸处理:上下文控制靠DAG分治(结论≤500字/节点)+SYNTH从文件读取而非对话传递

SubAgent行为规范

subagent收到任务后:

  1. 第一步必须读context.json,获取绝对路径和上下文
  2. 根据节点类型选择工具:
    • WEB节点:web_scan扫描 → web_execute_js提取正文 → 总结
    • LOCAL节点:pdftotext/file_read → 总结关键信息
    • MEMORY节点:按insight索引 → file_read → 总结
    • CODE节点:code_run执行 → 整理输出
  3. 结论写入context.json指定的output_file
  4. 结论格式[结论] {针对sub_question的直接回答,≥2-3句,≤500字},前置便于main agent快速读取
    • 禁止只写一句话摘要:每个要点至少展开说明其含义、依据或对agent的启示,让main agent能真正理解而非只看标签
    • 如涉及多个子要点,每个子要点独立成段,而非压缩为一行bullet

动态扩展判断

Main agent在每轮收集结论后,问自己:

  • 已有结论是否足以回答根问题?→ 是:进入SYNTH节点
  • 某结论引出了新的必要子问题?→ 追加节点到dag.md
  • 某节点结论为空/失败?→ 检查stderr.log,决定重试或跳过

多轮深化原则:DeepResearch不应一轮就结束。收集第一批结论后,主动评估深度:

  • 是否有关键理论/论文值得进一步展开?→ 新增深化节点(如N1_bloom_detail)
  • 是否发现了结论间的矛盾或张力?→ 新增对比分析节点
  • 是否有重要维度尚未覆盖?→ 补充节点
  • 至少完成2轮子任务(初探+深化)再进入SYNTH,除非问题本身极浅

防止死循环:追加节点不超过原始节点数的2倍;同一子问题不重复追加


资源冲突约束

资源 并行限制
浏览器标签页 同一时刻只有1个subagent操作浏览器
本地文件读 无限制(只读)
本地文件写 不同文件无限制;同一文件禁止并行写
code_run 无限制

典型坑

  1. input.txt禁止塞原文:大文件给路径,让subagent自己读,否则token超限
  2. parent_conclusions要精简:传给subagent的父节点结论控制在200字内,不传原文
  3. 启动+轮询禁止同一code_run:Popen后立即返回,下一轮再poll
  4. 清理旧output.txt:复用task_dir时必须先删,否则误读上轮结果
  5. SYNTH节点由Main agent自己做:不要为汇总节点启动subagent,直接用结论合成
  6. 多subagent并行共享浏览器冲突:4个WEB subagent同时操作同一浏览器时互相抢占tab导航,subagent会自动降级为code_run抓网页(urllib/requests),这是正常fallback,无需干预,继续等待即可

评论(0)

登录 后可发表评论。

暂无评论。