Commits (20)
1138134
feat: Add PostgreSQL table creation and logging for langgraph events …
changxubo Aug 20, 2025
f44d286
feat: Implement conversation management with API endpoints and UI dia…
changxubo Aug 20, 2025
4d305a5
chore: Clean up unused imports and code in various files
changxubo Aug 20, 2025
1e72486
feat: Improve background investigation results filtering and logging;…
changxubo Aug 20, 2025
c2f290c
chore: Clean up and organize test files for improved readability and …
changxubo Aug 20, 2025
7d5318d
feat: Add mock_config_thread fixture and update human_feedback_node t…
changxubo Aug 20, 2025
cf6c958
chore: Clean up and organize test fixtures for improved structure and…
changxubo Aug 20, 2025
32ac76e
chore: Remove unused code and improve test structure for better maint…
changxubo Aug 20, 2025
d4faafe
chore: Remove unused functions and clean up code in nodes.py
changxubo Aug 20, 2025
2b7fb1a
Merge branch 'feat/conversations' of https://github.com/changxubo/dee…
changxubo Aug 20, 2025
a6ab092
feat: Update human_feedback_node to accept mock_config_thread and adj…
changxubo Aug 20, 2025
ec7be6f
chore: Clean up and organize code structure in nodes.py and tavily_se…
changxubo Aug 20, 2025
1e7c5bc
chore: No code changes made; empty commit for tracking purposes
changxubo Aug 20, 2025
7e66478
feat: Enhance ConversationsDialog with dynamic icons and improved mes…
changxubo Aug 20, 2025
74f28bc
fix: Use nullish coalescing operator for replay ID extraction in useR…
changxubo Aug 21, 2025
81007c5
Merge branch 'main' into feat/conversations
WillemJiang Aug 26, 2025
cc7e624
fix lint errors of frontend
changxubo Sep 2, 2025
319af31
Merge branch 'main' into feat/conversations
changxubo Sep 4, 2025
c8b018d
Merge branch 'main' of https://github.com/changxubo/deer-flow into fe…
changxubo Sep 12, 2025
6f1af31
Merge branch 'main' into feat/conversations
WillemJiang Sep 16, 2025
1 change: 1 addition & 0 deletions src/config/configuration.py
@@ -42,6 +42,7 @@ def get_recursion_limit(default: int = 25) -> int:
class Configuration:
"""The configurable fields."""

thread_id: str = field(default="")
resources: list[Resource] = field(
default_factory=list
) # Resources to be used for the research
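The new thread_id field is what the logging calls added to nodes.py read back off the run configuration. Below is a minimal sketch of that flow, assuming Configuration.from_runnable_config follows the usual LangGraph pattern of copying matching keys out of config["configurable"]; the values are illustrative:

from src.config.configuration import Configuration

# LangGraph passes each node a RunnableConfig shaped like this; the
# "configurable" mapping is where per-run settings such as thread_id live.
config = {"configurable": {"thread_id": "thread-123", "max_search_results": 5}}

# from_runnable_config copies matching keys onto the dataclass fields,
# so every node can recover the thread it is running under.
configurable = Configuration.from_runnable_config(config)
print(configurable.thread_id)  # -> "thread-123"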
362 changes: 360 additions & 2 deletions src/graph/checkpoint.py

Large diffs are not rendered by default.
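Since the checkpoint.py diff is collapsed, the shape of its new helpers can only be inferred from their call sites in nodes.py and app.py. The sketch below is a guess on that basis: the PR evidently writes to a PostgreSQL table (per the first commit message), so the in-memory list here stands in for real INSERT statements, and the column names are invented.

import json
from datetime import datetime, timezone

# Stand-in for the PostgreSQL table the PR creates for langgraph events.
_EVENTS: list[dict] = []


def log_graph_event(thread_id: str, node: str, level: str, payload: dict) -> None:
    """Record one graph event for a thread (signature inferred from nodes.py)."""
    _EVENTS.append(
        {
            "thread_id": thread_id,
            "node": node,
            "level": level,
            "payload": json.dumps(payload, ensure_ascii=False),
            "ts": datetime.now(timezone.utc).isoformat(),
        }
    )


def log_research_replays(
    thread_id: str, research_topic: str, report_style, messages: int
) -> None:
    """Register a conversation row (signature inferred from coordinator_node)."""
    _EVENTS.append(
        {
            "thread_id": thread_id,
            "research_topic": research_topic,
            "report_style": str(report_style),
            "messages": messages,
            "ts": datetime.now(timezone.utc).isoformat(),
        }
    )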

109 changes: 90 additions & 19 deletions src/graph/nodes.py
@@ -4,7 +4,7 @@
import json
import logging
import os
from typing import Annotated, Literal
from typing import Annotated, Literal, cast

from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.runnables import RunnableConfig
@@ -28,6 +28,7 @@
from src.utils.json_utils import repair_json_output

from ..config import SELECTED_SEARCH_ENGINE, SearchEngine
from .checkpoint import log_research_replays, log_graph_event
from .types import State

logger = logging.getLogger(__name__)
@@ -58,13 +59,18 @@ def background_investigation_node(state: State, config: RunnableConfig):
searched_content = searched_content[0]
if isinstance(searched_content, list):
background_investigation_results = [
f"## {elem['title']}\n\n{elem['content']}" for elem in searched_content
f"## {elem['title']}\n\n{elem['content'] }"
for elem in searched_content # if elem.get("type") == "page"
]
return {
"background_investigation_results": "\n\n".join(
background_investigation_results
)
}
results = "\n\n".join(background_investigation_results)
# Build checkpoint with the background investigation results
log_graph_event(
configurable.thread_id,
"background_investigator",
"info",
{"goto": "planner", "investigations": results},
)
return {"background_investigation_results": results}
else:
logger.error(
f"Tavily search returned malformed response: {searched_content}"
@@ -73,11 +79,15 @@
background_investigation_results = get_web_search_tool(
configurable.max_search_results
).invoke(query)
return {
"background_investigation_results": json.dumps(
background_investigation_results, ensure_ascii=False
)
}
results = json.dumps(background_investigation_results, ensure_ascii=False)
# Build checkpoint with the background investigation results
log_graph_event(
configurable.thread_id,
"background_investigator",
"info",
{"goto": "planner", "investigations": results},
)
return {"background_investigation_results": results}


def planner_node(
@@ -139,13 +149,27 @@ def planner_node(
if isinstance(curr_plan, dict) and curr_plan.get("has_enough_context"):
logger.info("Planner response has enough context.")
new_plan = Plan.model_validate(curr_plan)
# Build checkpoint with the current plan
log_graph_event(
configurable.thread_id,
"planner",
"info",
{"goto": "reporter", "current_plan": curr_plan},
)
return Command(
update={
"messages": [AIMessage(content=full_response, name="planner")],
"current_plan": new_plan,
},
goto="reporter",
)
# Build checkpoint with the current plan
log_graph_event(
configurable.thread_id,
"planner",
"info",
{"goto": "human_feedback", "current_plan": curr_plan},
)
return Command(
update={
"messages": [AIMessage(content=full_response, name="planner")],
Expand All @@ -156,8 +180,9 @@ def planner_node(


def human_feedback_node(
state,
state, config: RunnableConfig
) -> Command[Literal["planner", "research_team", "reporter", "__end__"]]:
configurable = Configuration.from_runnable_config(config)
current_plan = state.get("current_plan", "")
# check if the plan is auto accepted
auto_accepted_plan = state.get("auto_accepted_plan", False)
@@ -194,7 +219,13 @@
return Command(goto="reporter")
else:
return Command(goto="__end__")

# Build checkpoint with the current plan
log_graph_event(
configurable.thread_id,
"human_feedback",
"info",
{"goto": goto, "current_plan": new_plan, "plan_iterations": plan_iterations},
)
return Command(
update={
"current_plan": Plan.model_validate(new_plan),
@@ -248,6 +279,18 @@ def coordinator_node(
messages = state.get("messages", [])
if response.content:
messages.append(HumanMessage(content=response.content, name="coordinator"))

# Build checkpoint with the current plan
log_research_replays(
configurable.thread_id, research_topic, configurable.report_style, 0
)
log_graph_event(
configurable.thread_id,
"coordinator",
"info",
{"goto": goto, "research_topic": research_topic},
)

return Command(
update={
"messages": messages,
@@ -294,7 +337,13 @@ def reporter_node(state: State, config: RunnableConfig):
response = get_llm_by_type(AGENT_LLM_MAP["reporter"]).invoke(invoke_messages)
response_content = response.content
logger.info(f"reporter response: {response_content}")

# Build checkpoint with the current plan
log_graph_event(
configurable.thread_id,
"reporter",
"info",
{"goto": "end", "final_report": response_content},
)
return {"final_report": response_content}


@@ -305,13 +354,13 @@ def research_team_node(state: State):


async def _execute_agent_step(
state: State, agent, agent_name: str
state: State, config: RunnableConfig, agent, agent_name: str
) -> Command[Literal["research_team"]]:
"""Helper function to execute a step using the specified agent."""
current_plan = state.get("current_plan")
plan_title = current_plan.title
observations = state.get("observations", [])

configurable = Configuration.from_runnable_config(config)
# Find the first unexecuted step
current_step = None
completed_steps = []
@@ -402,6 +451,28 @@ async def _execute_agent_step(
# Update the step with the execution result
current_step.execution_res = response_content
logger.info(f"Step '{current_step.title}' execution completed by {agent_name}")
# Build checkpoint with the current plan
agent_input_messages = []
for message in agent_input["messages"]:
if not isinstance(message, tuple):  # tuples lack the .type/.content/.name attributes read below
agent_input_messages.append(
{
"role": message.type,
"content": message.content,
"name": message.name,
}
)
log_graph_event(
configurable.thread_id,
"agent",
"info",
{
"goto": "research_team",
"agent": agent_name,
"input": agent_input_messages,
"observations": observations + [response_content],
},
)

return Command(
update={
@@ -470,11 +541,11 @@ async def _setup_and_execute_agent_step(
)
loaded_tools.append(tool)
agent = create_agent(agent_type, agent_type, loaded_tools, agent_type)
return await _execute_agent_step(state, agent, agent_type)
return await _execute_agent_step(state, config, agent, agent_type)
else:
# Use default tools if no MCP servers are configured
agent = create_agent(agent_type, agent_type, default_tools, agent_type)
return await _execute_agent_step(state, agent, agent_type)
return await _execute_agent_step(state, config, agent, agent_type)


async def researcher_node(
57 changes: 57 additions & 0 deletions src/server/app.py
@@ -47,7 +47,19 @@
RAGResourceRequest,
RAGResourcesResponse,
)
from src.server.conversation_request import (
Conversation,
ConversationsRequest,
ConversationsResponse,
)
from src.tools import VolcengineTTS

from src.graph.checkpoint import (
chat_stream_message,
get_conversation,
list_conversations,
)

from src.utils.json_utils import sanitize_args

logger = logging.getLogger(__name__)
@@ -611,3 +623,48 @@ async def config():
rag=RAGConfigResponse(provider=SELECTED_RAG_PROVIDER),
models=get_configured_llm_models(),
)


@app.get("/api/conversation/{thread_id}", response_model=str)
async def get_conversation_by_id(thread_id: str) -> Response:
"""Get the conversation content for a specific thread ID."""
try:
content = get_conversation(thread_id)
if not content:
raise HTTPException(status_code=404, detail="Conversation not found")

return Response(
content=content,
media_type="text/plain",
headers={"Content-Type": "text/plain; charset=utf-8"},
)

except Exception as e:
logger.exception(f"Error getting Converstation: {str(e)}")
raise HTTPException(status_code=500, detail=INTERNAL_SERVER_ERROR_DETAIL)


@app.get("/api/conversations", response_model=ConversationsResponse)
async def get_conversations(
request: Annotated[ConversationsRequest, Query()],
) -> ConversationsResponse:
"""Get conversations based on the provided request parameters."""
try:
conversations = list_conversations(limit=request.limit, sort=request.sort)
response = []
for conversation in conversations:
response.append(
Conversation(
id=conversation.get("thread_id", ""),
title=conversation.get("research_topic", ""),
count=conversation.get("messages", 0),
date=conversation.get("ts", ""),
category=conversation.get("report_style", ""),
data_type="database", # Assuming default data type is 'txt'
)
)
data = ConversationsResponse(data=response)
return data
except Exception as e:
logger.exception(f"Error getting conversations: {str(e)}")
raise HTTPException(status_code=500, detail=INTERNAL_SERVER_ERROR_DETAIL)
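For illustration, here is one way to exercise the two new endpoints against a locally running server; the host and port assume a default uvicorn setup, and the thread ID is made up:

import requests

# List the 10 most recent conversations, sorted by timestamp.
resp = requests.get(
    "http://localhost:8000/api/conversations",
    params={"limit": 10, "sort": "ts"},
)
print(resp.json())

# Fetch the stored plain-text content of one conversation thread.
resp = requests.get("http://localhost:8000/api/conversation/thread-123")
print(resp.text)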
53 changes: 53 additions & 0 deletions src/server/conversation_request.py
@@ -0,0 +1,53 @@
from datetime import datetime
from typing import Optional

from pydantic import BaseModel, Field


class Conversation(BaseModel):
id: Optional[str] = Field("", description="The thread ID of the conversation.")
title: Optional[str] = Field("", description="The title of the conversation")
date: Optional[datetime] = Field(
None, description="The date of the conversation, formatted as 'YYYY-MM-DD'."
)

category: Optional[str] = Field(
"Social Media", description="The writing style of the conversation."
)
count: Optional[int] = Field(
0, description="The number of messages in the conversation."
)
data_type: Optional[str] = Field(
"txt", description="The type of data in the conversation, e.g., 'txt', 'json'."
)


class ConversationsResponse(BaseModel):

data: Optional[list[Conversation]] = Field(
default_factory=list,
description="List of replays matching the request criteria",
)


class ConversationsRequest(BaseModel):
"""Request model for RAG resource queries.

This model represents a request to search for resources within the RAG system.
It encapsulates the search query and any associated parameters.

Attributes:
query: The search query string used to find relevant resources.
Can be None if no specific query is provided.
"""

limit: Optional[int] = Field(
None, description="The maximum number of conversations to retrieve"
)
offset: Optional[int] = Field(
None,
description="The offset for pagination, used to skip a number of conversations",
)
sort: Optional[str] = Field(
None, description="The field by which to sort the conversations"
)
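For illustration, this is roughly how the endpoint above assembles these models for a request like GET /api/conversations?limit=5&sort=ts. All field values are invented, and model_dump_json assumes pydantic v2:

from src.server.conversation_request import (
    Conversation,
    ConversationsRequest,
    ConversationsResponse,
)

# FastAPI's Query() binding parses the query string into this model.
req = ConversationsRequest(limit=5, sort="ts")
print(req)

# get_conversations maps each database row onto a Conversation entry.
item = Conversation(
    id="thread-123",
    title="Example research topic",
    date="2025-08-20T12:00:00",  # coerced to datetime by pydantic
    category="academic",
    count=12,
    data_type="database",
)
print(ConversationsResponse(data=[item]).model_dump_json())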
19 changes: 14 additions & 5 deletions src/tools/tavily_search/tavily_search_api_wrapper.py
@@ -104,10 +104,19 @@ def clean_results_with_images(
clean_results.append(clean_result)
images = raw_results["images"]
for image in images:
clean_result = {
"type": "image",
"image_url": image["url"],
"image_description": image["description"],
}
if isinstance(image, str):
clean_result = {
"type": "image",
"image_url": image,
"image_description": "",
}
elif isinstance(image, dict):
clean_result = {
"type": "image",
"image_url": image.get("url"),
"image_description": image.get("description", ""),
}
else:
continue
clean_results.append(clean_result)
return clean_results
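The rewritten loop defensively accepts both shapes the images list may carry, bare URL strings and url/description dicts, and silently drops anything else. The same normalization run standalone on invented sample data:

# Sample inputs covering the three cases the patched loop distinguishes.
images = [
    "https://example.com/a.png",
    {"url": "https://example.com/b.png", "description": "chart of results"},
    42,  # unexpected type: skipped, matching the patch's else branch
]

clean_results = []
for image in images:
    if isinstance(image, str):
        clean_results.append(
            {"type": "image", "image_url": image, "image_description": ""}
        )
    elif isinstance(image, dict):
        clean_results.append(
            {
                "type": "image",
                "image_url": image.get("url"),
                "image_description": image.get("description", ""),
            }
        )
print(clean_results)  # two normalized image entries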
@@ -3,7 +3,7 @@

import json
import logging
from typing import Dict, List, Optional, Tuple, Union
from typing import Dict, List, Literal, Optional, Tuple, Union

from langchain.callbacks.manager import (
AsyncCallbackManagerForToolRun,