
Commit 6e02144

fix: handle malformed messages missing <|message|> token (#82)
* fix: handle malformed messages missing <|message|> token

  StreamableParser now gracefully handles LLM output where stop tokens appear before the expected <|message|> token. The parser extracts header metadata (channel, recipient, content_type) from the accumulated tokens and treats the remainder as message content. Refactors header parsing into a shared helper to eliminate duplication.

  Signed-off-by: Ben Browning <[email protected]>

* Keep backwards compatibility with new ParseOptions

  This moves the less strict parsing behavior behind a new ParseOptions.strict value. It keeps all Rust signatures the same, adding new `_with_options` variants where we need to pass in options. For the Python and wasm bindings, it adds the `strict` field as an optional kwarg.

  Signed-off-by: Ben Browning <[email protected]>

---------

Author: Ben Browning <[email protected]>
1 parent 508cbaa commit 6e02144
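The recovery path described in the commit message can be illustrated with a small pure-Python sketch. This operates on decoded text rather than token ids, and the function name and return shape are hypothetical — the real logic lives in `StreamableParser` in `src/encoding.rs`:

```python
def parse_malformed(header_text: str) -> dict:
    """Illustrative sketch: recover channel metadata from a message that
    ended (hit a stop token) before the <|message|> token appeared.

    String-level simplification of the token-level Rust implementation;
    names here are hypothetical.
    """
    channel = None
    if "<|channel|>" in header_text:
        before, after = header_text.split("<|channel|>", 1)
        # The channel name runs until the next whitespace.
        parts = after.split(None, 1)
        channel = parts[0] if parts else None
        rest = parts[1] if len(parts) > 1 else ""
        header_text = before + rest
    # Whatever remains is treated as message content rather than
    # raising a parse error.
    return {"channel": channel, "content": header_text.strip()}

result = parse_malformed("<|channel|>analysis The answer is 4")
```

The key idea is the same as in the diff below: instead of failing on a missing `<|message|>` marker, salvage the header fields that did arrive and demote the rest to content.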

10 files changed: +332 −49 lines changed

docs/format.md

Lines changed: 8 additions & 4 deletions
@@ -107,8 +107,12 @@ convo = Conversation.from_messages(
 tokens = encoding.render_conversation_for_completion(convo, Role.ASSISTANT)
 
 # After receiving a token response
-# Do not pass in the stop token
-parsed_response = encoding.parse_messages_from_completion_tokens(new_tokens, Role.ASSISTANT)
+# Do not pass in the stop token. Set strict=False to tolerate malformed headers.
+parsed_response = encoding.parse_messages_from_completion_tokens(
+    new_tokens,
+    Role.ASSISTANT,
+    strict=True,
+)
 ```
 
 Additionally the openai_harmony library also includes a StreamableParser for parsing and decoding as the model is generating new tokens. This can be helpful for example to stream output and handle unicode characters during decoding.
@@ -269,7 +273,7 @@ If you are not using function tool calling your developer message would just loo
 
 Where `{instructions}` is replaced with your “system prompt”.
 
-For defining function calling tools, [check out the dedicated section](#function-calling).
+For defining function calling tools, [check out the dedicated section](#function-calling).
 For defining an output format to be used in structured outputs, [check out this section of the guide](#structured-output).
 
 ### Reasoning
@@ -301,7 +305,7 @@ And the actual answer is:
 2 + 2 = 4
 ```
 
-**Important:**
+**Important:**
 The model has not been trained to the same safety standards in the chain-of-thought as it has for final output. We recommend not to show the chain-of-thought to your users as they might contain harmful content. [Learn more in the model card](https://openai.com/index/gpt-oss-model-card/).
 
 #### Handling reasoning output in subsequent sampling

docs/python.md

Lines changed: 4 additions & 2 deletions
@@ -107,12 +107,14 @@ Methods:
 - `render_conversation_for_training(conversation, config=None)` – render a conversation for training.
 - `render_conversation(conversation, config=None)` – render a conversation without appending a new role.
 - `render(message)` – render a single message into tokens.
-- `parse_messages_from_completion_tokens(tokens, role=None)` – parse tokens back into `Message` objects.
+- `parse_messages_from_completion_tokens(tokens, role=None, strict=True)` – parse tokens back into `Message` objects (set `strict=False` to enable permissive parsing).
 - `decode_utf8(tokens)` – decode tokens with the underlying tokenizer.
 - `stop_tokens()` / `stop_tokens_for_assistant_actions()` – lists of stop tokens.
 
+Use `strict=False` when you need the parser to recover from malformed model output that omits markers such as `<|message|>`.
+
 ### `StreamableParser`
-Incremental parser built on top of an encoding. Construct with `StreamableParser(encoding, role)` and feed tokens via `process(token)`. Inspect state via properties like `current_content`, `current_role`, `tokens` and `state`.
+Incremental parser built on top of an encoding. Construct with `StreamableParser(encoding, role)` and feed tokens via `process(token)`. Inspect state via properties like `current_content`, `current_role`, `tokens` and `state`. Pass `strict=False` to enable permissive parsing (mirrors `ParseOptions { strict: false }` on the Rust side).
 
 ### `load_harmony_encoding(name)`
 Return a `HarmonyEncoding` by name. Accepts either the string name or a value from the `HarmonyEncodingName` enum (`HARMONY_GPT_OSS`).
docs/rust.md

Lines changed: 5 additions & 2 deletions
@@ -88,12 +88,15 @@ Important methods:
 - `render_conversation_for_training(conversation, config)` – render a conversation for training data.
 - `render_conversation(conversation, config)` – render a conversation without appending a new role.
 - `render(message)` – render a single message into tokens.
-- `parse_messages_from_completion_tokens(tokens, role)` – parse a list of tokens back into messages.
+- `parse_messages_from_completion_tokens(tokens, role)` – parse a list of tokens back into messages using strict validation.
+- `parse_messages_from_completion_tokens_with_options(tokens, role, options)` – parse tokens with custom `ParseOptions` (e.g. to disable strict validation).
 - `stop_tokens()` and `stop_tokens_for_assistant_actions()` – sets of stop tokens for sampling.
 
+`ParseOptions` currently exposes a single field, `strict`, which defaults to `true`. Set it to `false` when you need to recover from malformed model output in downstream systems.
+
 ### `StreamableParser`
 
-Incremental parser that consumes tokens one by one. Create with `StreamableParser::new(encoding, role)` and feed tokens via `process`. Access information via getters like `current_content`, `current_role`, `messages`, `tokens` and `state_json`.
+Incremental parser that consumes tokens one by one. Create with `StreamableParser::new(encoding, role)` and feed tokens via `process`. Access information via getters like `current_content`, `current_role`, `messages`, `tokens` and `state_json`. Use `StreamableParser::new_with_options(encoding, role, options)` when you need to override defaults such as `ParseOptions { strict: false }`.
 
 ## registry module
 
python/openai_harmony/__init__.py

Lines changed: 14 additions & 4 deletions
@@ -520,10 +520,14 @@ def render(
     # -- Parsing -------------------------------------------------------
 
     def parse_messages_from_completion_tokens(
-        self, tokens: Sequence[int], role: Optional[Role] | None = None
+        self,
+        tokens: Sequence[int],
+        role: Optional[Role] | None = None,
+        *,
+        strict: bool = True,
     ) -> List[Message]:
         raw_json: str = self._inner.parse_messages_from_completion_tokens(
-            list(tokens), None if role is None else str(role.value)
+            list(tokens), None if role is None else str(role.value), strict
        )
         return [Message.from_dict(m) for m in json.loads(raw_json)]
 
@@ -619,9 +623,15 @@ class StreamState(Enum):
 class StreamableParser:
     """Incremental parser over completion tokens."""
 
-    def __init__(self, encoding: HarmonyEncoding, role: Role | None):
+    def __init__(
+        self,
+        encoding: HarmonyEncoding,
+        role: Role | None,
+        *,
+        strict: bool = True,
+    ) -> None:
         role_str = str(role.value) if role is not None else None
-        self._inner = _PyStreamableParser(encoding._inner, role_str)
+        self._inner = _PyStreamableParser(encoding._inner, role_str, strict)
 
     def process(self, token: int) -> "StreamableParser":
         self._inner.process(token)

src/encoding.rs

Lines changed: 127 additions & 25 deletions
@@ -369,22 +369,38 @@ impl HarmonyEncoding {
         Ok(())
     }
 
-    pub fn parse_messages_from_completion_tokens<I>(
+    pub fn parse_messages_from_completion_tokens_with_options<I>(
         &self,
         tokens: I,
         role: Option<Role>,
+        options: ParseOptions,
     ) -> anyhow::Result<Vec<Message>>
     where
         I: IntoIterator<Item = Rank>,
     {
-        let mut parser = StreamableParser::new(self.clone(), role)?;
+        let mut parser = StreamableParser::new_with_options(self.clone(), role, options)?;
         for token in tokens {
             parser.process(token)?;
         }
         parser.process_eos()?;
         Ok(parser.into_messages())
     }
 
+    pub fn parse_messages_from_completion_tokens<I>(
+        &self,
+        tokens: I,
+        role: Option<Role>,
+    ) -> anyhow::Result<Vec<Message>>
+    where
+        I: IntoIterator<Item = Rank>,
+    {
+        self.parse_messages_from_completion_tokens_with_options(
+            tokens,
+            role,
+            ParseOptions::default(),
+        )
+    }
+
     /// Helper to convert a JSON schema (OpenAPI style) to a TypeScript type definition.
     fn json_schema_to_typescript(schema: &serde_json::Value, indent: &str) -> String {
         // Helper to check if this schema is an enum
@@ -1019,6 +1035,17 @@ impl Render<crate::chat::DeveloperContent> for HarmonyEncoding {
     }
 }
 
+#[derive(Clone, Copy, Debug)]
+pub struct ParseOptions {
+    pub strict: bool,
+}
+
+impl Default for ParseOptions {
+    fn default() -> Self {
+        Self { strict: true }
+    }
+}
+
 /// Incremental parser that can consume tokens one by one.
 ///
 /// It keeps track of all tokens seen so far, exposes all fully parsed messages
@@ -1032,6 +1059,7 @@ pub struct StreamableParser {
     stop_tokens: HashSet<Rank>,
     last_content_delta: Option<String>,
     undecoded_tokens: Vec<Rank>,
+    options: ParseOptions,
 }
 
 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
@@ -1049,6 +1077,15 @@ pub enum StreamState {
 impl StreamableParser {
     /// Create a new streaming parser starting with the given role.
     pub fn new(encoding: HarmonyEncoding, role: Option<Role>) -> anyhow::Result<Self> {
+        Self::new_with_options(encoding, role, ParseOptions::default())
+    }
+
+    /// Create a new streaming parser with explicit options.
+    pub fn new_with_options(
+        encoding: HarmonyEncoding,
+        role: Option<Role>,
+        options: ParseOptions,
+    ) -> anyhow::Result<Self> {
         let stop_tokens = encoding.stop_tokens()?;
         let (state, next_role) = match role {
             Some(role) => (
@@ -1068,6 +1105,7 @@ impl StreamableParser {
             stop_tokens,
             last_content_delta: None,
             undecoded_tokens: Vec::new(),
+            options,
         })
     }
 
@@ -1123,6 +1161,34 @@
                     content_tokens: Vec::new(),
                 };
             }
+            Some(token) if !self.options.strict && self.stop_tokens.contains(&token) => {
+                // Encountered a stop token while in Header state. This means we have
+                // accumulated header tokens but never saw a <|message|> token, so the
+                // message is malformed. If we have a role, parse header metadata and
+                // treat remaining tokens as content.
+                if let Some(role) = next_role_clone {
+                    if !header_tokens.is_empty() {
+                        let decoded =
+                            self.encoding.tokenizer().decode_utf8(header_tokens)?;
+                        let (header, remaining_content) =
+                            self.parse_header_from_string(decoded, Some(role), false)?;
+
+                        // Use remaining content if present, otherwise empty string
+                        let text = remaining_content.unwrap_or_default();
+                        let message = Message {
+                            author: header.author.clone(),
+                            recipient: header.recipient.clone(),
+                            channel: header.channel.clone(),
+                            content_type: header.content_type.clone(),
+                            content: vec![Content::Text(TextContent { text })],
+                        };
+                        self.messages.push(message);
+                    }
+                }
+                // Transition to ExpectStart to wait for the next message
+                self.state = StreamState::ExpectStart;
+                self.next_role = None;
+            }
             Some(token) => {
                 header_tokens.push(token);
             }
@@ -1194,17 +1260,18 @@
         Ok(self)
     }
 
-    fn parse_header_from_tokens(
+    /// Helper to parse header metadata from a decoded string.
+    /// Returns the parsed header and any remaining content after extracting header parts.
+    ///
+    /// If `parse_recipient_and_type` is true, tries to parse recipient and content_type from
+    /// whitespace-separated tokens (normal header parsing). If false, treats all remaining
+    /// text after extracting channel as content (for malformed messages).
+    fn parse_header_from_string(
         &self,
-        header_tokens: &[Rank],
+        mut header_string: String,
         role: Option<Role>,
-    ) -> anyhow::Result<ParsedHeader> {
-        let mut header_string = self
-            .encoding
-            .tokenizer()
-            .decode_utf8(header_tokens)
-            .context("could not decode header")?;
-
+        parse_recipient_and_type: bool,
+    ) -> anyhow::Result<(ParsedHeader, Option<String>)> {
         let mut channel: Option<String> = None;
         if let Some(channel_marker) = self.encoding.mapped_format_token(FormattingToken::Channel) {
             if let Some(idx) = header_string.find(channel_marker) {
@@ -1280,10 +1347,9 @@
 
         let mut recipient: Option<String> = None;
         let mut content_type: Option<String> = None;
+        let remaining_content: Option<String>;
 
-        if !parts.is_empty() {
-            // Determine whether the last token is a content-type or part of the
-            // recipient specification.
+        if parse_recipient_and_type && !parts.is_empty() {
             let num_parts = parts.len();
             // SAFETY: we know that there is at least one part remaining, because of is_empty check above
             let last_part = parts.pop().unwrap();
@@ -1308,25 +1374,61 @@
                     };
                 }
             }
+
+            // Any remaining parts are content (not header metadata)
+            remaining_content = if !parts.is_empty() {
+                Some(parts.join(" "))
+            } else {
+                None
+            };
+        } else {
+            // Treat all remaining parts as content when not parsing recipient and content type
+            remaining_content = if !parts.is_empty() {
+                Some(parts.join(" "))
+            } else {
+                None
+            };
         }
-        anyhow::ensure!(
-            parts.is_empty(),
-            "unexpected tokens remaining in message header: {:?}",
-            parts
-        );
 
         let author = if role == Role::Tool {
             let name = role_str_opt;
             Author { role, name }
         } else {
             Author { role, name: None }
         };
-        Ok(ParsedHeader {
-            author,
-            recipient,
-            channel,
-            content_type,
-        })
+        Ok((
+            ParsedHeader {
+                author,
+                recipient,
+                channel,
+                content_type,
+            },
+            remaining_content,
+        ))
+    }
+
+    fn parse_header_from_tokens(
+        &self,
+        header_tokens: &[Rank],
+        role: Option<Role>,
+    ) -> anyhow::Result<ParsedHeader> {
+        let header_string = self
+            .encoding
+            .tokenizer()
+            .decode_utf8(header_tokens)
+            .context("could not decode header")?;
+
+        let (header, remaining_content) =
+            self.parse_header_from_string(header_string, role, true)?;
+
+        if remaining_content.is_some() {
+            anyhow::bail!(
+                "unexpected tokens remaining in message header: {:?}",
+                remaining_content
+            );
+        }
+
+        Ok(header)
     }
 
     /// Return the textual content of the current message so far.
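The shared helper's handling of leftover header parts can be mirrored in a short Python sketch. The `to=` recipient prefix and the ordering heuristics here are assumptions for illustration, not the crate's exact rules:

```python
def split_header(parts: list, parse_recipient_and_type: bool):
    """Sketch of the shared-helper idea from the diff above: optionally
    pull recipient / content-type off the whitespace-separated header
    parts, then treat whatever is left as message content.

    Note: mutates `parts` in place, like a simplified version of the
    Rust code's `parts.pop()`.
    """
    recipient = content_type = None
    if parse_recipient_and_type and parts:
        last = parts.pop()
        if last.startswith("to="):
            # Last part was the recipient; no content-type present.
            recipient = last[len("to="):]
        else:
            # Last part is a content-type; a recipient may precede it.
            content_type = last
            if parts and parts[-1].startswith("to="):
                recipient = parts.pop()[len("to="):]
    # In permissive (non-strict) mode everything left over becomes
    # content instead of raising "unexpected tokens remaining".
    remaining = " ".join(parts) if parts else None
    return recipient, content_type, remaining
```

This mirrors the refactor's key behavioral change: strict parsing treats leftover parts as an error, while the permissive path returns them as `remaining_content` for the caller to keep as message text.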

src/lib.rs

Lines changed: 1 addition & 1 deletion
@@ -6,7 +6,7 @@ mod registry;
 mod tiktoken;
 pub mod tiktoken_ext;
 
-pub use encoding::{HarmonyEncoding, StreamableParser};
+pub use encoding::{HarmonyEncoding, ParseOptions, StreamableParser};
 pub use registry::load_harmony_encoding;
 pub use registry::HarmonyEncodingName;
 