vllm/entrypoints/harmony_utils.py (170 changes: 102 additions & 68 deletions)
@@ -57,10 +57,6 @@

_harmony_encoding = None

# Builtin tools that should be included in the system message when
# they are available and requested by the user.
# Tool args are provided by MCP tool descriptions. Outputs
# of the tools are stringified.
MCP_BUILTIN_TOOLS: set[str] = {
"web_search_preview",
"code_interpreter",
@@ -69,10 +65,6 @@


def has_custom_tools(tool_types: set[str]) -> bool:
"""
Checks if the given tool types are custom tools
(i.e. any tool other than MCP builtin tools)
"""
return not tool_types.issubset(MCP_BUILTIN_TOOLS)


@@ -107,7 +99,6 @@ def get_system_message(
REASONING_EFFORT[reasoning_effort]
)
if start_date is None:
# NOTE(woosuk): This brings non-determinism in vLLM. Be careful.
start_date = datetime.datetime.now().strftime("%Y-%m-%d")
sys_msg_content = sys_msg_content.with_conversation_start_date(start_date)
if browser_description is not None:
@@ -157,11 +148,7 @@ def get_developer_message(
"container",
"mcp",
):
# These are built-in tools that are added to the system message.
# Adding in MCP for now until we support MCP tools executed
# server side
pass

elif tool.type == "function":
function_tools.append(tool)
else:
@@ -191,9 +178,6 @@ def parse_response_input(
role = response_msg["role"]
content = response_msg["content"]
if role == "system":
# User is trying to set a system message. Change it to:
# <|start|>developer<|message|># Instructions
# {instructions}<|end|>
role = "developer"
text_prefix = "Instructions:\n"
else:
@@ -237,33 +221,30 @@ def parse_response_input(

def parse_input_to_harmony_message(chat_msg) -> list[Message]:
if not isinstance(chat_msg, dict):
# Handle Pydantic models
chat_msg = chat_msg.model_dump(exclude_none=True)

role = chat_msg.get("role")

# Assistant message with tool calls
tool_calls = chat_msg.get("tool_calls")
if role == "assistant" and tool_calls:
msgs: list[Message] = []
for call in tool_calls:
func = call.get("function", {})
name = func.get("name", "")
arguments = func.get("arguments", "") or ""
if isinstance(arguments, dict):
arguments = json.dumps(arguments)

msg = Message.from_role_and_content(Role.ASSISTANT, arguments)
msg = msg.with_channel("commentary")
msg = msg.with_recipient(f"functions.{name}")
msg = msg.with_content_type("json")
msgs.append(msg)
return msgs

# Tool role message (tool output)
if role == "tool":
name = chat_msg.get("name", "")
content = chat_msg.get("content", "") or ""
if isinstance(content, list):
# Handle array format for tool message content
# by concatenating all text parts.
content = "".join(
item.get("text", "")
for item in content
@@ -275,12 +256,10 @@ def parse_input_to_harmony_message(chat_msg) -> list[Message]:
).with_channel("commentary")
return [msg]

# Default: user/assistant/system messages with content
content = chat_msg.get("content", "")
if isinstance(content, str):
contents = [TextContent(text=content)]
else:
# TODO: Support refusal.
contents = [TextContent(text=c.get("text", "")) for c in content]
msg = Message.from_role_and_contents(role, contents)
return [msg]
@@ -292,12 +271,8 @@ def construct_harmony_previous_input_messages(
messages: list[OpenAIHarmonyMessage] = []
if request.previous_input_messages:
for message in request.previous_input_messages:
# Handle both OpenAIHarmonyMessage objects and dictionary inputs
if isinstance(message, OpenAIHarmonyMessage):
message_role = message.author.role
# To match OpenAI, instructions, reasoning and tools are
# always taken from the most recent Responses API request
# not carried over from previous requests
if (
message_role == OpenAIHarmonyRole.SYSTEM
or message_role == OpenAIHarmonyRole.DEVELOPER
@@ -308,9 +283,6 @@
harmony_messages = parse_input_to_harmony_message(message)
for harmony_msg in harmony_messages:
message_role = harmony_msg.author.role
# To match OpenAI, instructions, reasoning and tools are
# always taken from the most recent Responses API request
# not carried over from previous requests
if (
message_role == OpenAIHarmonyRole.SYSTEM
or message_role == OpenAIHarmonyRole.DEVELOPER
Expand All @@ -329,31 +301,19 @@ def render_for_completion(messages: list[Message]) -> list[int]:


def parse_output_message(message: Message) -> list[ResponseOutputItem]:
"""
Parse a Harmony message into a list of output response items.
"""
if message.author.role != "assistant":
# This is a message from a tool to the assistant (e.g., search result).
# Don't include it in the final output for now. This aligns with
# OpenAI's behavior on models like o4-mini.
return []

output_items: list[ResponseOutputItem] = []
recipient = message.recipient

if recipient is not None and recipient.startswith("browser."):
if len(message.content) != 1:
raise ValueError("Invalid number of contents in browser message")
content = message.content[0]
# We do not need to check the VLLM_TOOL_JSON_ERROR_AUTOMATIC_RETRY
# env variable since if it is not set, we are certain the json is valid
# The use of Actions for web search will be removed entirely in
# the future, so this is only necessary temporarily
try:
browser_call = json.loads(content.text)
except json.JSONDecodeError:
# If the content is not valid JSON, then it was
# caught and retried by vLLM, which means we
# need to make note of that so the user is aware
json_retry_output_message = (
f"Invalid JSON args, caught and retried: {content.text}"
)
@@ -362,7 +322,7 @@ def parse_output_message(message: Message) -> list[ResponseOutputItem]:
"url": json_retry_output_message,
"pattern": json_retry_output_message,
}
# TODO: translate to url properly!

if recipient == "browser.search":
action = ActionSearch(
query=f"cursor:{browser_call.get('query', '')}", type="search"
@@ -386,6 +346,7 @@
type="web_search_call",
)
output_items.append(web_search_item)

elif message.channel == "analysis":
for content in message.content:
reasoning_item = ResponseReasoningItem(
@@ -400,9 +361,13 @@
status=None,
)
output_items.append(reasoning_item)

elif message.channel == "commentary":
if recipient is not None and recipient.startswith("functions."):
function_name = recipient.split(".")[-1]
# FIX: Strict name sanitization to remove leaked tags like <|channel|>
raw_name = recipient.split("functions.")[1]
function_name = raw_name.split("<")[0].strip()
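# e.g. "functions.get_weather<|channel|>" -> "get_weather"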

for content in message.content:
random_id = random_uuid()
response_item = ResponseFunctionToolCall(
@@ -413,6 +378,7 @@
id=f"fc_{random_id}",
)
output_items.append(response_item)

elif recipient is not None and (
recipient.startswith("python")
or recipient.startswith("browser")
@@ -433,14 +399,15 @@ def parse_output_message(message: Message) -> list[ResponseOutputItem]:
output_items.append(reasoning_item)
else:
raise ValueError(f"Unknown recipient: {recipient}")

elif message.channel == "final":
contents = []
for content in message.content:
output_text = ResponseOutputText(
text=content.text,
annotations=[], # TODO
annotations=[],
type="output_text",
logprobs=None, # TODO
logprobs=None,
)
contents.append(output_text)
text_item = ResponseOutputMessage(
@@ -478,19 +445,34 @@ def parse_remaining_state(parser: StreamableParser) -> list[ResponseOutputItem]:
status=None,
)
return [reasoning_item]

elif parser.current_channel == "commentary":
if current_recipient is not None and current_recipient.startswith("functions."):
# FIX: Strict name sanitization here as well
raw_name = current_recipient.split("functions.")[1]
function_name = raw_name.split("<")[0].strip()
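# e.g. "functions.get_weather<|channel|>" -> "get_weather"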

random_id = random_uuid()
response_item = ResponseFunctionToolCall(
arguments=parser.current_content,
call_id=f"call_{random_id}",
type="function_call",
name=function_name,
id=f"fc_{random_id}",
)
return [response_item]

elif parser.current_channel == "final":
output_text = ResponseOutputText(
text=parser.current_content,
annotations=[], # TODO
annotations=[],
type="output_text",
logprobs=None, # TODO
logprobs=None,
)
text_item = ResponseOutputMessage(
id=f"msg_{random_uuid()}",
content=[output_text],
role="assistant",
# if the parser still has messages (i.e. if the generator got cut
# abruptly), this should be incomplete
status="incomplete",
type="message",
)
@@ -508,8 +490,36 @@

def parse_output_into_messages(token_ids: Iterable[int]) -> StreamableParser:
parser = get_streamable_parser_for_assistant()
for token_id in token_ids:
parser.process(token_id)

tokens = list(token_ids)
if not tokens:
return parser

encoding = get_encoding()

# FIX: Use allowed_special="all" to avoid Tokenizer error
start_token = encoding.encode("<|start|>", allowed_special="all")[0]

if tokens[0] != start_token:
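# The stream is missing its message header; rebuild the implied
# "<|start|>assistant<|channel|>analysis<|message|>" prefix below so the
# parser can resume a message whose opening tokens were cut off.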

def get_id(text):
return encoding.encode(text, allowed_special="all")[0]

header_tokens = [
start_token,
get_id("assistant"),
get_id("<|channel|>"),
get_id("analysis"),
get_id("<|message|>"),
]
tokens = header_tokens + tokens

for token_id in tokens:
try:
parser.process(token_id)
except Exception:
break
Comment on lines +520 to +521
Contributor review (severity: high):

A broad except Exception: that silently breaks can hide underlying bugs in the parser or issues with the token stream: if an exception occurs, token processing stops and the output is silently truncated, producing incomplete results without any warning and making debugging difficult. This is a high-risk pattern that can cause silent data loss. It is highly recommended to log the exception before breaking the loop so there is visibility into why parsing stopped.

Suggested change:
-except Exception:
-    break
+except Exception:
+    # TODO: Add logging here to avoid silent failures. For example:
+    # logger.warning("Error processing token %d. Stopping parsing.",
+    #                token_id, exc_info=True)
+    break
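
As a follow-up, a minimal sketch of the logged variant, assuming this module can use vLLM's init_logger helper (an assumption, not something this diff adds):

    # Hedged sketch: init_logger is vLLM's module-level logger factory.
    from vllm.logger import init_logger

    logger = init_logger(__name__)

    for token_id in tokens:
        try:
            parser.process(token_id)
        except Exception:
            # Surface the failure before truncating so the early stop shows up
            # in server logs instead of silently dropping output.
            logger.warning(
                "Failed to process token %d; stopping Harmony parsing early.",
                token_id,
                exc_info=True,
            )
            break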


return parser


@@ -518,18 +528,42 @@ def parse_chat_output(
) -> tuple[str | None, str | None, bool]:
parser = parse_output_into_messages(token_ids)
output_msgs = parser.messages
is_tool_call = False # TODO: update this when tool call is supported
if len(output_msgs) == 0:
# The generation has stopped during reasoning.
reasoning = parser.current_content
final_content = None
elif len(output_msgs) == 1:
# The generation has stopped during final message.
reasoning = output_msgs[0].content[0].text
final_content = parser.current_content
else:
reasoning_msg = output_msgs[:-1]
final_msg = output_msgs[-1]
reasoning = "\n".join([msg.content[0].text for msg in reasoning_msg])
final_content = final_msg.content[0].text

reasoning_parts = []
final_content = None
is_tool_call = False

for msg in output_msgs:
if msg.channel == "analysis":
for content in msg.content:
reasoning_parts.append(content.text)
elif msg.channel == "final":
for content in msg.content:
final_content = content.text
elif (
msg.channel == "commentary"
and msg.recipient
and msg.recipient.startswith("functions.")
):
is_tool_call = True
if not final_content:
final_content = ""
for content in msg.content:
final_content = content.text

if parser.current_content:
if parser.current_channel == "analysis":
reasoning_parts.append(parser.current_content)
elif parser.current_channel == "final":
final_content = parser.current_content
elif (
parser.current_channel == "commentary"
and parser.current_recipient
and parser.current_recipient.startswith("functions.")
):
is_tool_call = True
final_content = parser.current_content

reasoning = "\n".join(reasoning_parts) if reasoning_parts else None

return reasoning, final_content, is_tool_call
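
To summarize the new contract, a hedged usage sketch (token_ids and handle_tool_call are hypothetical stand-ins, not part of this diff): reasoning joins the text of every analysis message, final_content comes from the final channel or from a functions.* commentary message, and is_tool_call flags the latter case.

    # Hypothetical caller of the reworked parse_chat_output.
    reasoning, final_content, is_tool_call = parse_chat_output(token_ids)
    if is_tool_call:
        handle_tool_call(final_content)  # hypothetical downstream handler
    elif final_content is not None:
        print(final_content)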