A deep dive into the core components of the DeerFlow backend, exploring the key role LangGraph nodes play in the workflow. Core topics: 1. What LangGraph nodes are and what they do 2. The 8 main LangGraph nodes in the DeerFlow workflow 3. How the nodes work together, with the code that implements them
This chapter walks through the LangGraph nodes used in the DeerFlow backend. LangGraph nodes are the core processing units of the workflow graph; each node is responsible for a specific task in the research pipeline. Together, the nodes process user queries, carry out research, and produce a comprehensive report.
The LangGraph workflow in DeerFlow consists of 8 main nodes, each implemented as a function in the codebase. Two graph-construction methods are provided:
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from .types import State
from .nodes import (
    coordinator_node,
    planner_node,
    reporter_node,
    research_team_node,
    researcher_node,
    coder_node,
    human_feedback_node,
    background_investigation_node,
)


def _build_base_graph():
    """Build and return the base state graph with all nodes and edges."""
    builder = StateGraph(State)
    builder.add_edge(START, "coordinator")
    builder.add_node("coordinator", coordinator_node)
    builder.add_node("background_investigator", background_investigation_node)
    builder.add_node("planner", planner_node)
    builder.add_node("reporter", reporter_node)
    builder.add_node("research_team", research_team_node)
    builder.add_node("researcher", researcher_node)
    builder.add_node("coder", coder_node)
    builder.add_node("human_feedback", human_feedback_node)
    builder.add_edge("reporter", END)
    return builder


def build_graph_with_memory():
    """Build and return the Agent workflow graph with memory."""
    # use persistent memory to save conversation history
    # TODO: be compatible with SQLite / PostgreSQL
    memory = MemorySaver()
    # build state graph
    builder = _build_base_graph()
    return builder.compile(checkpointer=memory)


def build_graph():
    """Build and return the agent workflow graph without memory."""
    # build state graph
    builder = _build_base_graph()
    return builder.compile()


graph = build_graph()
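As a quick orientation, here is a minimal usage sketch (not taken from the DeerFlow source): the memory-enabled graph needs a thread_id in the run config so MemorySaver can checkpoint the conversation. The input message and thread_id are invented examples.

# Hedged usage sketch; the query and thread_id below are invented.
graph_with_memory = build_graph_with_memory()
config = {"configurable": {"thread_id": "demo-thread-1"}}
result = graph_with_memory.invoke(
    {"messages": [{"role": "user", "content": "Research quantum computing trends"}]},
    config,
)
print(result.get("final_report", ""))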
State
from langgraph.graph import MessagesState
from src.prompts.planner_model import Plan


class State(MessagesState):
    """State for the agent system, extends MessagesState with next field."""

    # Runtime Variables
    locale: str = "en-US"
    observations: list[str] = []
    plan_iterations: int = 0
    current_plan: Plan | str = None
    final_report: str = ""
    auto_accepted_plan: bool = False
    enable_background_investigation: bool = True
    background_investigation_results: str = None
Each node in the LangGraph workflow is implemented as a Python function that processes the current state and returns either an updated state or a Command that transitions to another node.
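To make that contract concrete, here is a minimal sketch of a node; example_node is a hypothetical name, not part of DeerFlow:

# Minimal sketch of the node contract; example_node is hypothetical.
from typing import Literal

from langgraph.types import Command


def example_node(state: State) -> Command[Literal["planner", "__end__"]]:
    # Read whatever the node needs from the shared state...
    locale = state.get("locale", "en-US")
    # ...then return a Command that both updates state and routes the graph.
    return Command(update={"locale": locale}, goto="planner")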
Role: the node that coordinates the user-interaction flow. It invokes the LLM with the current state and decides which node to transition to next.
def coordinator_node(
    state: State,
) -> Command[Literal["planner", "background_investigator", "__end__"]]:
    """Coordinator node that communicate with customers."""
    logger.info("Coordinator talking.")
    messages = apply_prompt_template("coordinator", state)
    response = (
        get_llm_by_type(AGENT_LLM_MAP["coordinator"])
        .bind_tools([handoff_to_planner])
        .invoke(messages)
    )
    logger.debug(f"Current state messages: {state['messages']}")
    goto = "__end__"
    locale = state.get("locale", "en-US")  # Default locale if not specified
    if len(response.tool_calls) > 0:
        goto = "planner"
        if state.get("enable_background_investigation"):
            # if the search_before_planning is True, add the web search tool to the planner agent
            goto = "background_investigator"
        try:
            for tool_call in response.tool_calls:
                if tool_call.get("name", "") != "handoff_to_planner":
                    continue
                if tool_locale := tool_call.get("args", {}).get("locale"):
                    locale = tool_locale
                    break
        except Exception as e:
            logger.error(f"Error processing tool calls: {e}")
    else:
        logger.warning(
            "Coordinator response contains no tool calls. Terminating workflow execution."
        )
        logger.debug(f"Coordinator response: {response}")
    return Command(
        update={"locale": locale},
        goto=goto,
    )
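The handoff_to_planner tool referenced above is not shown in this excerpt. Below is a plausible sketch, inferred only from how the coordinator reads the tool call's locale argument; the exact DeerFlow signature may differ:

# Hedged sketch of the handoff tool; the real DeerFlow definition may differ.
from typing import Annotated

from langchain_core.tools import tool


@tool
def handoff_to_planner(
    research_topic: Annotated[str, "The topic to hand off to the planner."],
    locale: Annotated[str, "The user's detected locale, e.g. 'en-US' or 'zh-CN'."],
):
    """Hand off to the planner agent to produce a research plan."""
    # The coordinator only inspects the tool call's name and args; the tool
    # body never needs to do real work, so it simply returns nothing.
    return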
Role: searches the web for the content of the user's latest message, performs the background investigation, and passes the results on to the next node.
def background_investigation_node(state: State) -> Command[Literal["planner"]]:
    logger.info("background investigation node is running.")
    searched_content = LoggedTavilySearch(max_results=SEARCH_MAX_RESULTS).invoke(
        {"query": state["messages"][-1].content}
    )
    background_investigation_results = None
    if isinstance(searched_content, list):
        background_investigation_results = [
            {"title": elem["title"], "content": elem["content"]}
            for elem in searched_content
        ]
    else:
        logger.error(f"Tavily search returned malformed response: {searched_content}")
    return Command(
        update={
            "background_investigation_results": json.dumps(
                background_investigation_results, ensure_ascii=False
            )
        },
        goto="planner",
    )
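For illustration, the state update above serializes a list of title/content pairs into background_investigation_results; the planner later reads this string back out of state. The values below are invented:

# Invented example of the serialized search results stored in state.
import json

example_results = json.dumps(
    [
        {"title": "Example article", "content": "Relevant snippet of page text..."},
        {"title": "Another source", "content": "More background material..."},
    ],
    ensure_ascii=False,
)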
Role: generates the full task plan, iterating until the stopping conditions are met. Depending on whether the plan already contains enough context, it routes to either the human_feedback node or the reporter node.
def planner_node(
    state: State, config: RunnableConfig
) -> Command[Literal["human_feedback", "reporter"]]:
    """Planner node that generate the full plan."""
    logger.info("Planner generating full plan")
    configurable = Configuration.from_runnable_config(config)
    plan_iterations = state["plan_iterations"] if state.get("plan_iterations", 0) else 0
    messages = apply_prompt_template("planner", state, configurable)
    if (
        plan_iterations == 0
        and state.get("enable_background_investigation")
        and state.get("background_investigation_results")
    ):
        messages += [
            {
                "role": "user",
                "content": (
                    "background investigation results of user query:\n"
                    + state["background_investigation_results"]
                    + "\n"
                ),
            }
        ]
    if AGENT_LLM_MAP["planner"] == "basic":
        llm = get_llm_by_type(AGENT_LLM_MAP["planner"]).with_structured_output(
            Plan,
            method="json_mode",
        )
    else:
        llm = get_llm_by_type(AGENT_LLM_MAP["planner"])
    # if the plan iterations is greater than the max plan iterations, return the reporter node
    if plan_iterations >= configurable.max_plan_iterations:
        return Command(goto="reporter")
    full_response = ""
    if AGENT_LLM_MAP["planner"] == "basic":
        response = llm.invoke(messages)
        full_response = response.model_dump_json(indent=4, exclude_none=True)
    else:
        response = llm.stream(messages)
        for chunk in response:
            full_response += chunk.content
    logger.debug(f"Current state messages: {state['messages']}")
    logger.info(f"Planner response: {full_response}")
    try:
        curr_plan = json.loads(repair_json_output(full_response))
    except json.JSONDecodeError:
        logger.warning("Planner response is not a valid JSON")
        if plan_iterations > 0:
            return Command(goto="reporter")
        else:
            return Command(goto="__end__")
    if curr_plan.get("has_enough_context"):
        logger.info("Planner response has enough context.")
        new_plan = Plan.model_validate(curr_plan)
        return Command(
            update={
                "messages": [AIMessage(content=full_response, name="planner")],
                "current_plan": new_plan,
            },
            goto="reporter",
        )
    return Command(
        update={
            "messages": [AIMessage(content=full_response, name="planner")],
            "current_plan": full_response,
        },
        goto="human_feedback",
    )
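The Plan model imported from src.prompts.planner_model is not reproduced in this excerpt. Here is a hedged reconstruction based only on the fields the nodes read (has_enough_context, locale, title, thought, and steps with step_type and execution_res); the real model may carry additional fields:

# Hedged reconstruction of the Plan model; the real definition may differ.
from enum import Enum

from pydantic import BaseModel, Field


class StepType(str, Enum):
    RESEARCH = "research"
    PROCESSING = "processing"


class Step(BaseModel):
    title: str
    description: str
    step_type: StepType | None = None
    execution_res: str | None = None  # filled in by researcher/coder nodes


class Plan(BaseModel):
    locale: str = "en-US"
    has_enough_context: bool = False
    title: str = ""
    thought: str = ""
    steps: list[Step] = Field(default_factory=list)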
Note: a helper function that sets up the appropriate tools for researcher_node and coder_node and executes the current step.
Core role: dynamically loads MCP tools according to the configuration, creates the corresponding agent to handle the task, and calls _execute_agent_step to run the actual step.
async def _setup_and_execute_agent_step(
    state: State,
    config: RunnableConfig,
    agent_type: str,
    default_agent,
    default_tools: list,
) -> Command[Literal["research_team"]]:
    """Helper function to set up an agent with appropriate tools and execute a step.

    This function handles the common logic for both researcher_node and coder_node:
    1. Configures MCP servers and tools based on agent type
    2. Creates an agent with the appropriate tools or uses the default agent
    3. Executes the agent on the current step

    Args:
        state: The current state
        config: The runnable config
        agent_type: The type of agent ("researcher" or "coder")
        default_agent: The default agent to use if no MCP servers are configured
        default_tools: The default tools to add to the agent

    Returns:
        Command to update state and go to research_team
    """
    configurable = Configuration.from_runnable_config(config)
    mcp_servers = {}
    enabled_tools = {}
    # Extract MCP server configuration for this agent type
    if configurable.mcp_settings:
        for server_name, server_config in configurable.mcp_settings["servers"].items():
            if (
                server_config["enabled_tools"]
                and agent_type in server_config["add_to_agents"]
            ):
                mcp_servers[server_name] = {
                    k: v
                    for k, v in server_config.items()
                    if k in ("transport", "command", "args", "url", "env")
                }
                for tool_name in server_config["enabled_tools"]:
                    enabled_tools[tool_name] = server_name
    # Create and execute agent with MCP tools if available
    if mcp_servers:
        async with MultiServerMCPClient(mcp_servers) as client:
            loaded_tools = default_tools[:]
            for tool in client.get_tools():
                if tool.name in enabled_tools:
                    tool.description = (
                        f"Powered by '{enabled_tools[tool.name]}'.\n{tool.description}"
                    )
                    loaded_tools.append(tool)
            agent = create_agent(agent_type, agent_type, loaded_tools, agent_type)
            return await _execute_agent_step(state, agent, agent_type)
    else:
        # Use default agent if no MCP servers are configured
        return await _execute_agent_step(state, default_agent, agent_type)
async def _execute_agent_step(
    state: State, agent, agent_name: str
) -> Command[Literal["research_team"]]:
    """Helper function to execute a step using the specified agent."""
    current_plan = state.get("current_plan")
    observations = state.get("observations", [])
    # Find the first unexecuted step
    for step in current_plan.steps:
        if not step.execution_res:
            break
    logger.info(f"Executing step: {step.title}")
    # Prepare the input for the agent
    agent_input = {
        "messages": [
            HumanMessage(
                content=f"#Task\n\n##title\n\n{step.title}\n\n##description\n\n{step.description}\n\n##locale\n\n{state.get('locale', 'en-US')}"
            )
        ]
    }
    # Add citation reminder for researcher agent
    if agent_name == "researcher":
        agent_input["messages"].append(
            HumanMessage(
                content="IMPORTANT: DO NOT include inline citations in the text. Instead, track all sources and include a References section at the end using link reference format. Include an empty line between each citation for better readability. Use this format for each reference:\n- [Source Title](URL)\n\n- [Another Source](URL)",
                name="system",
            )
        )
    # Invoke the agent
    result = await agent.ainvoke(input=agent_input)
    # Process the result
    response_content = result["messages"][-1].content
    logger.debug(f"{agent_name.capitalize()} full response: {response_content}")
    # Update the step with the execution result
    step.execution_res = response_content
    logger.info(f"Step '{step.title}' execution completed by {agent_name}")
    return Command(
        update={
            "messages": [
                HumanMessage(
                    content=response_content,
                    name=agent_name,
                )
            ],
            "observations": observations + [response_content],
        },
        goto="research_team",
    )
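The create_agent factory called above, with the agent type reused as name, LLM type, and prompt template, is not shown in this excerpt. A plausible sketch, assuming it wraps LangGraph's prebuilt ReAct agent; the actual DeerFlow factory may differ in details:

# Hedged sketch of the create_agent factory; details may differ from DeerFlow.
from langgraph.prebuilt import create_react_agent


def create_agent(agent_name: str, agent_type: str, tools: list, prompt_template: str):
    """Create a ReAct-style agent with the project's LLM and prompt template."""
    return create_react_agent(
        name=agent_name,
        model=get_llm_by_type(AGENT_LLM_MAP[agent_type]),
        tools=tools,
        prompt=lambda state: apply_prompt_template(prompt_template, state),
    )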
Role: the researcher node, mainly responsible for web search.
async def researcher_node(
    state: State, config: RunnableConfig
) -> Command[Literal["research_team"]]:
    """Researcher node that do research"""
    logger.info("Researcher node is researching.")
    return await _setup_and_execute_agent_step(
        state,
        config,
        "researcher",
        research_agent,
        [web_search_tool, crawl_tool],
    )
Role: checks the current plan (whether it is empty, whether it has steps, whether all steps are completed, and so on) and decides which node to route to next.
def research_team_node(
    state: State,
) -> Command[Literal["planner", "researcher", "coder"]]:
    """Research team node that collaborates on tasks."""
    logger.info("Research team is collaborating on tasks.")
    current_plan = state.get("current_plan")
    if not current_plan or not current_plan.steps:
        return Command(goto="planner")
    if all(step.execution_res for step in current_plan.steps):
        return Command(goto="planner")
    for step in current_plan.steps:
        if not step.execution_res:
            break
    if step.step_type and step.step_type == StepType.RESEARCH:
        return Command(goto="researcher")
    if step.step_type and step.step_type == StepType.PROCESSING:
        return Command(goto="coder")
    return Command(goto="planner")
Role: the coder node, mainly responsible for data processing.
async def coder_node(
    state: State, config: RunnableConfig
) -> Command[Literal["research_team"]]:
    """Coder node that do code analysis."""
    logger.info("Coder node is coding.")
    return await _setup_and_execute_agent_step(
        state,
        config,
        "coder",
        coder_agent,
        [python_repl_tool],
    )
Role: inserts a human-in-the-loop feedback step between the user and the system, letting the user confirm or revise the generated plan.
def human_feedback_node(
    state,
) -> Command[Literal["planner", "research_team", "reporter", "__end__"]]:
    current_plan = state.get("current_plan", "")
    # check if the plan is auto accepted
    auto_accepted_plan = state.get("auto_accepted_plan", False)
    if not auto_accepted_plan:
        feedback = interrupt("Please Review the Plan.")
        # if the feedback is not accepted, return the planner node
        if feedback and str(feedback).upper().startswith("[EDIT_PLAN]"):
            return Command(
                update={
                    "messages": [
                        HumanMessage(content=feedback, name="feedback"),
                    ],
                },
                goto="planner",
            )
        elif feedback and str(feedback).upper().startswith("[ACCEPTED]"):
            logger.info("Plan is accepted by user.")
        else:
            raise TypeError(f"Interrupt value of {feedback} is not supported.")
    # if the plan is accepted, run the following node
    plan_iterations = state["plan_iterations"] if state.get("plan_iterations", 0) else 0
    goto = "research_team"
    try:
        current_plan = repair_json_output(current_plan)
        # increment the plan iterations
        plan_iterations += 1
        # parse the plan
        new_plan = json.loads(current_plan)
        if new_plan["has_enough_context"]:
            goto = "reporter"
    except json.JSONDecodeError:
        logger.warning("Planner response is not a valid JSON")
        if plan_iterations > 0:
            return Command(goto="reporter")
        else:
            return Command(goto="__end__")
    return Command(
        update={
            "current_plan": Plan.model_validate(new_plan),
            "plan_iterations": plan_iterations,
            "locale": new_plan["locale"],
        },
        goto=goto,
    )
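The interrupt() call pauses the graph until the caller resumes it. A minimal sketch of how a caller might resume a paused run, assuming the checkpointer-enabled graph and thread_id from the earlier example; the "[ACCEPTED]" and "[EDIT_PLAN]" prefixes match the checks above:

# Hedged usage sketch for resuming after interrupt(); names are invented.
from langgraph.types import Command

config = {"configurable": {"thread_id": "demo-thread-1"}}
# The first invocation pauses at human_feedback with "Please Review the Plan."
graph_with_memory.invoke(
    {"messages": [{"role": "user", "content": "Research quantum computing trends"}]},
    config,
)
# Resume with the user's verdict; the node receives it as `feedback`.
# Use "[EDIT_PLAN] ..." instead to send the plan back to the planner.
graph_with_memory.invoke(Command(resume="[ACCEPTED]"), config)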
Note: this node is usually the last step in the whole flow. Based on the task plan and the research team's execution results, it produces a clearly structured, well-formatted research report.
def reporter_node(state: State):
    """Reporter node that write a final report."""
    logger.info("Reporter write final report")
    current_plan = state.get("current_plan")
    input_ = {
        "messages": [
            HumanMessage(
                f"# Research Requirements\n\n## Task\n\n{current_plan.title}\n\n## Description\n\n{current_plan.thought}"
            )
        ],
        "locale": state.get("locale", "en-US"),
    }
    invoke_messages = apply_prompt_template("reporter", input_)
    observations = state.get("observations", [])
    # Add a reminder about the new report format, citation style, and table usage
    invoke_messages.append(
        HumanMessage(
            content="IMPORTANT: Structure your report according to the format in the prompt. Remember to include:\n\n1. Key Points - A bulleted list of the most important findings\n2. Overview - A brief introduction to the topic\n3. Detailed Analysis - Organized into logical sections\n4. Survey Note (optional) - For more comprehensive reports\n5. Key Citations - List all references at the end\n\nFor citations, DO NOT include inline citations in the text. Instead, place all citations in the 'Key Citations' section at the end using the format: `- [Source Title](URL)`. Include an empty line between each citation for better readability.\n\nPRIORITIZE USING MARKDOWN TABLES for data presentation and comparison. Use tables whenever presenting comparative data, statistics, features, or options. Structure tables with clear headers and aligned columns. Example table format:\n\n| Feature | Description | Pros | Cons |\n|---------|-------------|------|------|\n| Feature 1 | Description 1 | Pros 1 | Cons 1 |\n| Feature 2 | Description 2 | Pros 2 | Cons 2 |",
            name="system",
        )
    )
    for observation in observations:
        invoke_messages.append(
            HumanMessage(
                content=f"Below are some observations for the research task:\n\n{observation}",
                name="observation",
            )
        )
    logger.debug(f"Current invoke messages: {invoke_messages}")
    response = get_llm_by_type(AGENT_LLM_MAP["reporter"]).invoke(invoke_messages)
    response_content = response.content
    logger.info(f"reporter response: {response_content}")
    return {"final_report": response_content}