
Commit 95e3815

Properly yield the last chunk in streaming (#9089)

* Properly mark the last chunk
* Fix JSONAdapter streaming

Parent: d42f29e

2 files changed: +127 -5


dspy/streaming/streaming_listener.py

Lines changed: 2 additions & 2 deletions
@@ -274,7 +274,7 @@ def _json_adapter_handle_stream_chunk(self, token: str, chunk_message: str) -> S
         except ValueError:
             pass
 
-        if token:
+        if token or self.stream_end:
             return StreamResponse(
                 self.predict_name,
                 self.signature_field_name,
@@ -292,7 +292,7 @@ def _default_handle_stream_chunk(self, token: str, end_identifier: str) -> Strea
         token = token + last_token if token else last_token
         token = token.rstrip()  # Remove the trailing \n\n
 
-        if token:
+        if token or self.stream_end:
             return StreamResponse(
                 self.predict_name,
                 self.signature_field_name,
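The change in both handlers is the guard `if token:` becoming `if token or self.stream_end:`, so a StreamResponse is emitted even when the remaining token buffer is empty, letting the listener mark the field as finished. A minimal consumer sketch of the resulting behavior (not part of this commit; it assumes a configured OpenAI-compatible LM, and the model name is illustrative):

import asyncio

import dspy

# Assumes an API key is available; any LiteLLM-supported model works here.
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))

predict = dspy.Predict("question->answer")
program = dspy.streamify(
    predict,
    stream_listeners=[dspy.streaming.StreamListener(signature_field_name="answer")],
)

async def main():
    output = program(question="Why did the chicken cross the road?")
    async for value in output:
        if isinstance(value, dspy.streaming.StreamResponse):
            # With this fix, the final chunk is always yielded with
            # is_last_chunk=True, even if its text is empty, so the end
            # of the "answer" field is observable.
            if value.is_last_chunk:
                print("answer field complete")

asyncio.run(main())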

tests/streaming/test_streaming.py

Lines changed: 125 additions & 3 deletions
@@ -613,10 +613,11 @@ async def gemini_stream_2(*args, **kwargs):
     assert all_chunks[0].predict_name == "predict1"
     assert all_chunks[0].signature_field_name == "answer"
     assert all_chunks[0].chunk == "To get to the other side."
+    assert all_chunks[1].is_last_chunk is True
 
-    assert all_chunks[1].predict_name == "predict2"
-    assert all_chunks[1].signature_field_name == "judgement"
-    assert all_chunks[1].chunk == (
+    assert all_chunks[2].predict_name == "predict2"
+    assert all_chunks[2].signature_field_name == "judgement"
+    assert all_chunks[2].chunk == (
         "The answer provides the standard punchline for this classic joke format, adapted to the specific location "
         "mentioned in the question. It is the expected and appropriate response."
     )
@@ -1649,6 +1650,127 @@ async def reasoning_stream(*args, **kwargs):
     assert final_prediction.reasoning.content == expected_reasoning
 
 
+@pytest.mark.anyio
+async def test_stream_listener_empty_last_chunk_chat_adapter():
+    """Test that StreamListener emits an empty chunk marking field end.
+
+    This test covers the scenario where:
+    1. Tokens that cannot form the end identifier are immediately yielded
+    2. The last chunk received contains only the marker for the next field (or completion marker)
+    3. An empty chunk with is_last_chunk=True is emitted to properly mark field end
+    """
+
+    predict = dspy.Predict("question->reasoning, answer")
+
+    async def mock_stream(*args, **kwargs):
+        yield ModelResponseStream(
+            model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="[[ ## reasoning ## ]]\n"))]
+        )
+        yield ModelResponseStream(
+            model="gpt-4o-mini",
+            choices=[StreamingChoices(delta=Delta(content="Let's think about this problem step by step. "))],
+        )
+        yield ModelResponseStream(
+            model="gpt-4o-mini",
+            choices=[StreamingChoices(delta=Delta(content="We need to consider the context of a kitchen. "))],
+        )
+        yield ModelResponseStream(
+            model="gpt-4o-mini",
+            choices=[
+                StreamingChoices(delta=Delta(content="The chicken likely wants to reach something on the other side. "))
+            ],
+        )
+        yield ModelResponseStream(
+            model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="\n\n[[ ## answer ## ]]\n"))]
+        )
+        yield ModelResponseStream(
+            model="gpt-4o-mini",
+            choices=[StreamingChoices(delta=Delta(content="To get to the other side!"))],
+        )
+        yield ModelResponseStream(
+            model="gpt-4o-mini",
+            choices=[StreamingChoices(delta=Delta(content="\n\n[[ ## completed ## ]]"))],
+        )
+
+    with mock.patch("litellm.acompletion", side_effect=mock_stream):
+        program = dspy.streamify(
+            predict,
+            stream_listeners=[
+                dspy.streaming.StreamListener(signature_field_name="reasoning"),
+                dspy.streaming.StreamListener(signature_field_name="answer"),
+            ],
+        )
+        with dspy.context(lm=dspy.LM("openai/gpt-4o-mini", cache=False), adapter=dspy.ChatAdapter()):
+            output = program(question="Why did the chicken cross the kitchen?")
+            all_chunks = []
+            async for value in output:
+                if isinstance(value, dspy.streaming.StreamResponse):
+                    all_chunks.append(value)
+
+    # Find reasoning and answer chunks
+    reasoning_chunks = [c for c in all_chunks if c.signature_field_name == "reasoning"]
+    answer_chunks = [c for c in all_chunks if c.signature_field_name == "answer"]
+
+    # The last chunk should be marked as last chunk for both fields.
+    assert answer_chunks[-1].is_last_chunk is True
+    assert reasoning_chunks[-1].is_last_chunk is True
+
+
+@pytest.mark.anyio
+async def test_stream_listener_empty_last_chunk_json_adapter():
+    predict = dspy.Predict("question->reasoning, answer")
+
+    async def mock_stream(*args, **kwargs):
+        yield ModelResponseStream(
+            model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content='{"reasoning": "'))]
+        )
+        yield ModelResponseStream(
+            model="gpt-4o-mini",
+            choices=[StreamingChoices(delta=Delta(content="Let's think about this problem step by step. "))],
+        )
+        yield ModelResponseStream(
+            model="gpt-4o-mini",
+            choices=[StreamingChoices(delta=Delta(content="We need to consider the context of a kitchen. "))],
+        )
+        yield ModelResponseStream(
+            model="gpt-4o-mini",
+            choices=[
+                StreamingChoices(
+                    delta=Delta(content='The chicken likely wants to reach something on the other side. "')
+                )
+            ],
+        )
+        yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content=',"answer": "'))])
+        yield ModelResponseStream(
+            model="gpt-4o-mini",
+            choices=[StreamingChoices(delta=Delta(content='To get to the other side!"'))],
+        )
+        yield ModelResponseStream(model="gpt-4o-mini", choices=[StreamingChoices(delta=Delta(content="\n}"))])
+
+    with mock.patch("litellm.acompletion", side_effect=mock_stream):
+        program = dspy.streamify(
+            predict,
+            stream_listeners=[
+                dspy.streaming.StreamListener(signature_field_name="reasoning"),
+                dspy.streaming.StreamListener(signature_field_name="answer"),
+            ],
+        )
+        with dspy.context(lm=dspy.LM("openai/gpt-4o-mini", cache=False), adapter=dspy.JSONAdapter()):
+            output = program(question="Why did the chicken cross the kitchen?")
+            all_chunks = []
+            async for value in output:
+                if isinstance(value, dspy.streaming.StreamResponse):
+                    all_chunks.append(value)
+
+    # Find reasoning and answer chunks
+    reasoning_chunks = [c for c in all_chunks if c.signature_field_name == "reasoning"]
+    answer_chunks = [c for c in all_chunks if c.signature_field_name == "answer"]
+
+    # The last chunk should be marked as last chunk for both fields.
+    assert answer_chunks[-1].is_last_chunk is True
+    assert reasoning_chunks[-1].is_last_chunk is True
+
+
 @pytest.mark.anyio
 async def test_streaming_reasoning_fallback():
     """Test fallback behavior for non-reasoning models using dspy.Reasoning.
