DeerFlow's LangGraph Nodes Explained

Published: 2025-05-16 | Author: 影子的分享站
Recommendation

A deep dive into the core components of the DeerFlow backend, exploring the key role LangGraph nodes play in the workflow.

Core content:
1. What LangGraph nodes are and what they do
2. The eight main LangGraph nodes in the DeerFlow workflow
3. How the nodes work together, and the code that implements them


This chapter takes a detailed look at the LangGraph nodes used in the DeerFlow backend. LangGraph nodes are the core processing units of the workflow graph, and each node is responsible for a specific task in the research pipeline. Working together, these nodes handle the user query, carry out the research, and produce a comprehensive report.

Node Structure and Workflow

The LangGraph workflow in DeerFlow consists of 8 main nodes, each implemented as a function in the codebase. There are two graph-building methods:

  • build_graph(): build the graph
  • build_graph_with_memory(): build the graph with memory (conversation history is checkpointed)
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

from .types import State
from .nodes import (
    coordinator_node,
    planner_node,
    reporter_node,
    research_team_node,
    researcher_node,
    coder_node,
    human_feedback_node,
    background_investigation_node,
)


def _build_base_graph():
    """Build and return the base state graph with all nodes and edges."""
    builder = StateGraph(State)
    builder.add_edge(START, "coordinator")
    builder.add_node("coordinator", coordinator_node)
    builder.add_node("background_investigator", background_investigation_node)
    builder.add_node("planner", planner_node)
    builder.add_node("reporter", reporter_node)
    builder.add_node("research_team", research_team_node)
    builder.add_node("researcher", researcher_node)
    builder.add_node("coder", coder_node)
    builder.add_node("human_feedback", human_feedback_node)
    builder.add_edge("reporter", END)
    return builder


def build_graph_with_memory():
    """Build and return the Agent workflow graph with memory."""
    # use persistent memory to save conversation history
    # TODO: be compatible with SQLite / PostgreSQL
    memory = MemorySaver()

    # build state graph
    builder = _build_base_graph()
    return builder.compile(checkpointer=memory)


def build_graph():
    """Build and return the agent workflow graph without memory."""
    # build state graph
    builder = _build_base_graph()
    return builder.compile()


graph = build_graph()

State

from langgraph.graph import MessagesState

from src.prompts.planner_model import Plan


class State(MessagesState):
    """State for the agent system, extends MessagesState with next field."""

    # Runtime Variables
    locale: str = "en-US"
    observations: list[str] = []
    plan_iterations: int = 0
    current_plan: Plan | str = None
    final_report: str = ""
    auto_accepted_plan: bool = False
    enable_background_investigation: bool = True
    background_investigation_results: str = None
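With build_graph() compiled into graph above and the State schema defined, a research run can be started by invoking the graph with an initial state. The following is only a minimal sketch; the real DeerFlow entry point wraps this in its own workflow runner, and the query and field values here are illustrative:

from langchain_core.messages import HumanMessage

# Hypothetical query; auto_accepted_plan=True skips the human_feedback interrupt.
initial_state = {
    "messages": [HumanMessage(content="What is quantum computing?")],
    "locale": "en-US",
    "auto_accepted_plan": True,
    "enable_background_investigation": True,
}

final_state = graph.invoke(initial_state)
print(final_state["final_report"])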

Core Nodes

Each node in the LangGraph workflow is implemented as a Python function that processes the current state and returns either an updated state or a command to transition to another node.

coordinator_node (coordination node)

Role: coordinates the user-interaction flow. Based on the current state it calls the LLM and decides which node to jump to next.

  1. Possible next nodes: planner, background_investigator, __end__
  2. Prepare the prompt template: messages = apply_prompt_template("coordinator", state)
  3. Call the LLM with the handoff tool bound:
    response = (
        get_llm_by_type(AGENT_LLM_MAP["coordinator"])
        .bind_tools([handoff_to_planner])
        .invoke(messages)
    )
  4. Handle the triggered tool calls:
    if len(response.tool_calls) > 0:
        goto = "planner"
        if state.get("enable_background_investigation"):
            # if background investigation is enabled, run a web search before planning
            goto = "background_investigator"
        try:
            for tool_call in response.tool_calls:
                if tool_call.get("name", "") != "handoff_to_planner":
                    continue
                if tool_locale := tool_call.get("args", {}).get("locale"):
                    locale = tool_locale
                    break
        except Exception as e:
            logger.error(f"Error processing tool calls: {e}")
def coordinator_node(
    state: State,
) -> Command[Literal["planner", "background_investigator", "__end__"]]:
    """Coordinator node that communicates with customers."""
    logger.info("Coordinator talking.")
    messages = apply_prompt_template("coordinator", state)
    response = (
        get_llm_by_type(AGENT_LLM_MAP["coordinator"])
        .bind_tools([handoff_to_planner])
        .invoke(messages)
    )
    logger.debug(f"Current state messages: {state['messages']}")

    goto = "__end__"
    locale = state.get("locale", "en-US")  # Default locale if not specified

    if len(response.tool_calls) > 0:
        goto = "planner"
        if state.get("enable_background_investigation"):
            # if background investigation is enabled, run a web search before planning
            goto = "background_investigator"
        try:
            for tool_call in response.tool_calls:
                if tool_call.get("name", "") != "handoff_to_planner":
                    continue
                if tool_locale := tool_call.get("args", {}).get("locale"):
                    locale = tool_locale
                    break
        except Exception as e:
            logger.error(f"Error processing tool calls: {e}")
    else:
        logger.warning(
            "Coordinator response contains no tool calls. Terminating workflow execution."
        )
        logger.debug(f"Coordinator response: {response}")

    return Command(
        update={"locale": locale},
        goto=goto,
    )
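The handoff_to_planner tool bound above is not reproduced in this article. Since the coordinator only checks whether the tool was called and reads its locale argument, a minimal sketch might look like the following (argument names are assumptions, not copied from the DeerFlow source):

from langchain_core.tools import tool


@tool
def handoff_to_planner(task_title: str, locale: str = "en-US"):
    """Handoff to the planner agent to make a research plan."""
    # Deliberately a no-op: the coordinator never executes this tool; it only
    # inspects the LLM's tool call to decide whether to leave the coordinator.
    return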

background_investigation_node (background investigation node)

Role: searches the web for the content of the user's latest message, performs the background investigation, and passes the results to the next node.

  1. Web search: use Tavily to search for the user's latest message and extract the title and summary of each result:
    searched_content = LoggedTavilySearch(max_results=SEARCH_MAX_RESULTS).invoke(
        {"query": state["messages"][-1].content}
    )
  2. Save the results and jump to the planner node:
    Command(
        update={
            "background_investigation_results": json.dumps(
                background_investigation_results, ensure_ascii=False
            )
        },
        goto="planner",
    )
def background_investigation_node(state: State) -> Command[Literal["planner"]]:
    """Search the web for the user's latest message and store the results."""
    logger.info("background investigation node is running.")
    searched_content = LoggedTavilySearch(max_results=SEARCH_MAX_RESULTS).invoke(
        {"query": state["messages"][-1].content}
    )
    background_investigation_results = None
    if isinstance(searched_content, list):
        background_investigation_results = [
            {"title": elem["title"], "content": elem["content"]}
            for elem in searched_content
        ]
    else:
        logger.error(f"Tavily search returned malformed response: {searched_content}")
    return Command(
        update={
            "background_investigation_results": json.dumps(
                background_investigation_results, ensure_ascii=False
            )
        },
        goto="planner",
    )
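For reference, the search tool returns its results as a list of dicts, which is why the code above checks isinstance(searched_content, list) and keeps only title and content. An illustrative, made-up example of the data flow:

# Hypothetical shape of `searched_content` returned by the search tool:
searched_content = [
    {"title": "Quantum computing - Wikipedia", "content": "Quantum computing is ...", "url": "https://..."},
    {"title": "Quantum computing explained", "content": "A quantum computer uses ...", "url": "https://..."},
]

# After filtering, state["background_investigation_results"] holds the JSON string:
# '[{"title": "Quantum computing - Wikipedia", "content": "Quantum computing is ..."}, ...]'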

planner_node (planning node)

Role: generates the full task plan and can regenerate it iteratively until the stopping conditions are met. Depending on whether the plan contains enough context, it jumps to either the human_feedback or the reporter node.

  1. Possible next nodes: human_feedback, reporter
  2. Extract the system configuration:
    configurable = Configuration.from_runnable_config(config)
  3. Generate the prompt messages for the planner role:
    messages = apply_prompt_template("planner", state, configurable)
  4. If this is the first plan iteration and background investigation is enabled, append the investigation results to the prompt messages:
    if (
        plan_iterations == 0
        and state.get("enable_background_investigation")
        and state.get("background_investigation_results")
    ):
        messages += [
            {
                "role": "user",
                "content": (
                    "background investigation results of user query:\n"
                    + state["background_investigation_results"]
                    + "\n"
                ),
            }
        ]
  5. Get the LLM instance mapped to the planner; if it is of type "basic", require structured output in the Plan JSON format:
    if AGENT_LLM_MAP["planner"] == "basic":
        llm = get_llm_by_type(AGENT_LLM_MAP["planner"]).with_structured_output(
            Plan,
            method="json_mode",
        )
    else:
        llm = get_llm_by_type(AGENT_LLM_MAP["planner"])
  6. If the iteration count has reached the maximum, skip human feedback and go straight to the reporter node:
    if plan_iterations >= configurable.max_plan_iterations:
        return Command(goto="reporter")
  7. Call the LLM for a response (invoke for structured output, stream otherwise):
    full_response = ""
    if AGENT_LLM_MAP["planner"] == "basic":
        response = llm.invoke(messages)
        full_response = response.model_dump_json(indent=4, exclude_none=True)
    else:
        response = llm.stream(messages)
        for chunk in response:
            full_response += chunk.content
  8. Parse the response as JSON (repairing it first with repair_json_output); if parsing fails, go to the reporter node when at least one plan iteration has completed, otherwise end the workflow.
  9. If the parsed plan has a truthy "has_enough_context" field, the context is sufficient, so jump directly to the reporter node:
    if curr_plan.get("has_enough_context"):
        logger.info("Planner response has enough context.")
        new_plan = Plan.model_validate(curr_plan)
        return Command(
            update={
                "messages": [AIMessage(content=full_response, name="planner")],
                "current_plan": new_plan,
            },
            goto="reporter",
        )
  10. Otherwise, update the messages and plan in the state, then jump to the human_feedback node to wait for user feedback:
    Command(
        update={
            "messages": [AIMessage(content=full_response, name="planner")],
            "current_plan": full_response,
        },
        goto="human_feedback",
    )
def planner_node(
    state: State, config: RunnableConfig
) -> Command[Literal["human_feedback", "reporter"]]:
    """Planner node that generates the full plan."""
    logger.info("Planner generating full plan")
    configurable = Configuration.from_runnable_config(config)
    plan_iterations = state["plan_iterations"] if state.get("plan_iterations", 0) else 0
    messages = apply_prompt_template("planner", state, configurable)

    if (
        plan_iterations == 0
        and state.get("enable_background_investigation")
        and state.get("background_investigation_results")
    ):
        messages += [
            {
                "role""user",
                "content": (
                    "background investigation results of user query:\n"
                    + state["background_investigation_results"]
                    + "\n"
                ),
            }
        ]

    if AGENT_LLM_MAP["planner"] == "basic":
        llm = get_llm_by_type(AGENT_LLM_MAP["planner"]).with_structured_output(
            Plan,
            method="json_mode",
        )
    else:
        llm = get_llm_by_type(AGENT_LLM_MAP["planner"])

    # if the plan iterations is greater than the max plan iterations, return the reporter node
    if plan_iterations >= configurable.max_plan_iterations:
        return Command(goto="reporter")

    full_response = ""
    if AGENT_LLM_MAP["planner"] == "basic":
        response = llm.invoke(messages)
        full_response = response.model_dump_json(indent=4, exclude_none=True)
    else:
        response = llm.stream(messages)
        for chunk in response:
            full_response += chunk.content
    logger.debug(f"Current state messages: {state['messages']}")
    logger.info(f"Planner response: {full_response}")

    try:
        curr_plan = json.loads(repair_json_output(full_response))
    except json.JSONDecodeError:
        logger.warning("Planner response is not a valid JSON")
        if plan_iterations > 0:
            return Command(goto="reporter")
        else:
            return Command(goto="__end__")
    if curr_plan.get("has_enough_context"):
        logger.info("Planner response has enough context.")
        new_plan = Plan.model_validate(curr_plan)
        return Command(
            update={
                "messages": [AIMessage(content=full_response, name="planner")],
                "current_plan": new_plan,
            },
            goto="reporter",
        )
    return Command(
        update={
            "messages": [AIMessage(content=full_response, name="planner")],
            "current_plan": full_response,
        },
        goto="human_feedback",
    )
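The Plan model imported from src.prompts.planner_model is not shown in this article. Judging purely from the fields the nodes access (has_enough_context, steps, step_type, execution_res, title, thought, locale), a rough reconstruction might look like the sketch below; treat the exact field definitions as assumptions:

from enum import Enum
from pydantic import BaseModel, Field


class StepType(str, Enum):
    RESEARCH = "research"
    PROCESSING = "processing"


class Step(BaseModel):
    title: str
    description: str
    step_type: StepType | None = None
    execution_res: str | None = None  # filled in later by _execute_agent_step


class Plan(BaseModel):
    locale: str = "en-US"
    has_enough_context: bool = False
    title: str = ""
    thought: str = ""  # reporter_node uses this as the task description
    steps: list[Step] = Field(default_factory=list)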

_setup_and_execute_agent_step

Description: a helper function that sets up the appropriate tools for researcher_node and coder_node and executes the current step.

Core role: dynamically loads MCP tools according to the configuration and creates the corresponding agent to handle the task; the actual step execution is delegated to _execute_agent_step. (A hypothetical configuration example follows the code below.)

  1. If MCP settings are present in the configuration, iterate over all servers; if a server has enabled tools and supports the current agent type, add it to mcp_servers.
  2. At the same time, record each tool name and its server in enabled_tools.
  3. If any MCP servers exist, create a MultiServerMCPClient and append the tools it provides to the default_tools list.
  4. Create a new agent instance with create_agent, passing in the updated tool list.
  5. Finally, call _execute_agent_step to execute the current step and return the result.
async def _setup_and_execute_agent_step(
    state: State,
    config: RunnableConfig,
    agent_type: str,
    default_agent,
    default_tools: list,
) -> Command[Literal["research_team"]]:
    """Helper function to set up an agent with appropriate tools and execute a step.

    This function handles the common logic for both researcher_node and coder_node:
    1. Configures MCP servers and tools based on agent type
    2. Creates an agent with the appropriate tools or uses the default agent
    3. Executes the agent on the current step

    Args:
        state: The current state
        config: The runnable config
        agent_type: The type of agent ("researcher" or "coder")
        default_agent: The default agent to use if no MCP servers are configured
        default_tools: The default tools to add to the agent

    Returns:
        Command to update state and go to research_team
    """

    configurable = Configuration.from_runnable_config(config)
    mcp_servers = {}
    enabled_tools = {}

    # Extract MCP server configuration for this agent type
    if configurable.mcp_settings:
        for server_name, server_config in configurable.mcp_settings["servers"].items():
            if (
                server_config["enabled_tools"]
                and agent_type in server_config["add_to_agents"]
            ):
                mcp_servers[server_name] = {
                    k: v
                    for k, v in server_config.items()
                    if k in ("transport""command""args""url""env")
                }
                for tool_name in server_config["enabled_tools"]:
                    enabled_tools[tool_name] = server_name

    # Create and execute agent with MCP tools if available
    if mcp_servers:
        async with MultiServerMCPClient(mcp_servers) as client:
            loaded_tools = default_tools[:]
            for tool in client.get_tools():
                if tool.name in enabled_tools:
                    tool.description = (
                        f"Powered by '{enabled_tools[tool.name]}'.\n{tool.description}"
                    )
                    loaded_tools.append(tool)
            agent = create_agent(agent_type, agent_type, loaded_tools, agent_type)
            return await _execute_agent_step(state, agent, agent_type)
    else:
        # Use default agent if no MCP servers are configured
        return await _execute_agent_step(state, default_agent, agent_type)
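To make the MCP branch concrete, here is a hypothetical mcp_settings value shaped to match the keys the loop above reads ("servers", "enabled_tools", "add_to_agents", and the connection fields); the server and tool names are made up:

mcp_settings = {
    "servers": {
        "mcp-github-trending": {                    # hypothetical server name
            "transport": "stdio",
            "command": "uvx",
            "args": ["mcp-github-trending"],
            "enabled_tools": ["get_github_trending_repositories"],
            "add_to_agents": ["researcher"],        # only the researcher gets it
        }
    }
}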

_execute_agent_step

  1. Find the first unexecuted step in the plan
  2. Build the input prompt and invoke the corresponding agent
  3. After receiving the response, update the step's execution result and the state
  4. Jump to the research_team node
async def _execute_agent_step(
    state: State, agent, agent_name: str
) -> Command[Literal["research_team"]]:
    """Helper function to execute a step using the specified agent."""
    current_plan = state.get("current_plan")
    observations = state.get("observations", [])

    # Find the first unexecuted step
    for step in current_plan.steps:
        if not step.execution_res:
            break

    logger.info(f"Executing step: {step.title}")

    # Prepare the input for the agent
    agent_input = {
        "messages": [
            HumanMessage(
                content=f"#Task\n\n##title\n\n{step.title}\n\n##description\n\n{step.description}\n\n##locale\n\n{state.get('locale''en-US')}"
            )
        ]
    }

    # Add citation reminder for researcher agent
    if agent_name == "researcher":
        agent_input["messages"].append(
            HumanMessage(
                content="IMPORTANT: DO NOT include inline citations in the text. Instead, track all sources and include a References section at the end using link reference format. Include an empty line between each citation for better readability. Use this format for each reference:\n- [Source Title](URL)\n\n- [Another Source](URL)",
                name="system",
            )
        )

    # Invoke the agent
    result = await agent.ainvoke(input=agent_input)

    # Process the result
    response_content = result["messages"][-1].content
    logger.debug(f"{agent_name.capitalize()} full response: {response_content}")

    # Update the step with the execution result
    step.execution_res = response_content
    logger.info(f"Step '{step.title}' execution completed by {agent_name}")

    return Command(
        update={
            "messages": [
                HumanMessage(
                    content=response_content,
                    name=agent_name,
                )
            ],
            "observations": observations + [response_content],
        },
        goto="research_team",
    )

researcher_node (research node)

Primarily responsible for web search.

async def researcher_node(
    state: State, config: RunnableConfig
) -> Command[Literal["research_team"]]:
    """Researcher node that do research"""
    logger.info("Researcher node is researching.")
    return await _setup_and_execute_agent_step(
        state,
        config,
        "researcher",
        research_agent,
        [web_search_tool, crawl_tool],
    )

research_team_node (research team node)

Role: checks whether the current plan exists, whether it has steps, and whether those steps are complete, then decides which node to jump to.

  1. Possible next nodes: "planner", "researcher", or "coder"
def research_team_node(
    state: State,
) -> Command[Literal["planner""researcher""coder"]]:
    """Research team node that collaborates on tasks."""
    logger.info("Research team is collaborating on tasks.")
    current_plan = state.get("current_plan")
    if not current_plan or not current_plan.steps:
        return Command(goto="planner")
    if all(step.execution_res for step in current_plan.steps):
        return Command(goto="planner")
    for step in current_plan.steps:
        if not step.execution_res:
            break
    if step.step_type and step.step_type == StepType.RESEARCH:
        return Command(goto="researcher")
    if step.step_type and step.step_type == StepType.PROCESSING:
        return Command(goto="coder")
    return Command(goto="planner")

coder_node (coder node)

Primarily responsible for data processing.

async def coder_node(
    state: State, config: RunnableConfig
) -> Command[Literal["research_team"]]:
    """Coder node that do code analysis."""
    logger.info("Coder node is coding.")
    return await _setup_and_execute_agent_step(
        state,
        config,
        "coder",
        coder_agent,
        [python_repl_tool],
    )

human_feedback_node (human feedback node)

Role: inserts a human review step between the user and the system, letting the user confirm or modify the currently generated plan.

  1. Check whether the plan is auto-accepted: auto_accepted_plan
  2. If not auto-accepted, request user feedback
  3. Decide the next node based on the feedback:
    • [EDIT_PLAN]: send the plan back to the planner for revision
    • [ACCEPTED]: continue to the research team or reporter node
  4. Validate the plan JSON and check whether its context is sufficient
def human_feedback_node(
    state,
) -> Command[Literal["planner", "research_team", "reporter", "__end__"]]:
    current_plan = state.get("current_plan", "")
    # check if the plan is auto accepted
    auto_accepted_plan = state.get("auto_accepted_plan", False)
    if not auto_accepted_plan:
        feedback = interrupt("Please Review the Plan.")

        # if the feedback is not accepted, return to the planner node
        if feedback and str(feedback).upper().startswith("[EDIT_PLAN]"):
            return Command(
                update={
                    "messages": [
                        HumanMessage(content=feedback, name="feedback"),
                    ],
                },
                goto="planner",
            )
        elif feedback and str(feedback).upper().startswith("[ACCEPTED]"):
            logger.info("Plan is accepted by user.")
        else:
            raise TypeError(f"Interrupt value of {feedback} is not supported.")

    # if the plan is accepted, run the following node
    plan_iterations = state["plan_iterations"] if state.get("plan_iterations", 0) else 0
    goto = "research_team"
    try:
        current_plan = repair_json_output(current_plan)
        # increment the plan iterations
        plan_iterations += 1
        # parse the plan
        new_plan = json.loads(current_plan)
        if new_plan["has_enough_context"]:
            goto = "reporter"
    except json.JSONDecodeError:
        logger.warning("Planner response is not a valid JSON")
        if plan_iterations > 0:
            return Command(goto="reporter")
        else:
            return Command(goto="__end__")

    return Command(
        update={
            "current_plan": Plan.model_validate(new_plan),
            "plan_iterations": plan_iterations,
            "locale": new_plan["locale"],
        },
        goto=goto,
    )
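Because interrupt() suspends the run, resuming it requires a graph compiled with a checkpointer (build_graph_with_memory) and a stable thread_id. A minimal sketch using LangGraph's standard resume mechanism, with a made-up thread id and query:

from langgraph.types import Command

graph = build_graph_with_memory()
config = {"configurable": {"thread_id": "demo-thread"}}

# The first invocation stops at human_feedback with "Please Review the Plan."
graph.invoke({"messages": [("user", "Research quantum computing")]}, config)

# Resume with the user's verdict; the prefix decides the next node:
graph.invoke(Command(resume="[ACCEPTED]"), config)
# or: graph.invoke(Command(resume="[EDIT_PLAN] add a step on error correction"), config)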

reporter_node (reporter node)

Description: this node is usually the last step in the whole flow. Based on the previously generated task plan and the research team's execution results, it produces a clear, well-formatted research report.

  1. Build the input prompt from the plan's title and description
  2. Add reminders about the report format and citation rules
  3. Append the observations collected during research to the prompt
  4. Call the LLM to generate the final report
def reporter_node(state: State):
    """Reporter node that writes a final report."""
    logger.info("Reporter write final report")
    current_plan = state.get("current_plan")
    input_ = {
        "messages": [
            HumanMessage(
                f"# Research Requirements\n\n## Task\n\n{current_plan.title}\n\n## Description\n\n{current_plan.thought}"
            )
        ],
        "locale": state.get("locale", "en-US"),
    }
    invoke_messages = apply_prompt_template("reporter", input_)
    observations = state.get("observations", [])

    # Add a reminder about the new report format, citation style, and table usage
    invoke_messages.append(
        HumanMessage(
            content="IMPORTANT: Structure your report according to the format in the prompt. Remember to include:\n\n1. Key Points - A bulleted list of the most important findings\n2. Overview - A brief introduction to the topic\n3. Detailed Analysis - Organized into logical sections\n4. Survey Note (optional) - For more comprehensive reports\n5. Key Citations - List all references at the end\n\nFor citations, DO NOT include inline citations in the text. Instead, place all citations in the 'Key Citations' section at the end using the format: `- [Source Title](URL)`. Include an empty line between each citation for better readability.\n\nPRIORITIZE USING MARKDOWN TABLES for data presentation and comparison. Use tables whenever presenting comparative data, statistics, features, or options. Structure tables with clear headers and aligned columns. Example table format:\n\n| Feature | Description | Pros | Cons |\n|---------|-------------|------|------|\n| Feature 1 | Description 1 | Pros 1 | Cons 1 |\n| Feature 2 | Description 2 | Pros 2 | Cons 2 |",
            name="system",
        )
    )

    for observation in observations:
        invoke_messages.append(
            HumanMessage(
                content=f"Below are some observations for the research task:\n\n{observation}",
                name="observation",
            )
        )
    logger.debug(f"Current invoke messages: {invoke_messages}")
    response = get_llm_by_type(AGENT_LLM_MAP["reporter"]).invoke(invoke_messages)
    response_content = response.content
    logger.info(f"reporter response: {response_content}")

    return {"final_report": response_content}
