Skip to content

Commit 43672e2

Browse files
committed
refactor: process all streaming events for forward compatibility
Changes based on review feedback from @karpetrosyan: 1. Removed MESSAGE_EVENTS filtering - now processes any event received 2. Removed early response closing (response.close/aclose) - will be ported to codegen 3. Removed performance test file - no longer applicable This change improves forward compatibility by processing any event type the API sends, rather than filtering to a predefined set. The new structure handles completion, ping, and error events explicitly, while processing all other events generically. Closes review feedback in anthropics#993
1 parent 0aba3d8 commit 43672e2

File tree

2 files changed

+12
-799
lines changed

2 files changed

+12
-799
lines changed

src/anthropic/_streaming.py

Lines changed: 12 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,6 @@
1919

2020
_T = TypeVar("_T")
2121

22-
# Centralized event type definitions for O(1) lookup performance
23-
# frozenset prevents accidental modification and is slightly faster than set
24-
MESSAGE_EVENTS = frozenset(
25-
{
26-
"message_start",
27-
"message_delta",
28-
"message_stop",
29-
"content_block_start",
30-
"content_block_delta",
31-
"content_block_stop",
32-
}
33-
)
3422

3523

3624
class _SyncStreamMeta(abc.ABCMeta):
@@ -93,21 +81,13 @@ def __stream__(self) -> Iterator[_T]:
9381
for sse in iterator:
9482
if sse.event == "completion":
9583
yield process_data(data=sse.json(), cast_to=cast_to, response=response)
96-
# Single, fast membership test instead of multiple string comparisons
97-
if sse.event in MESSAGE_EVENTS:
98-
data = sse.json()
99-
if is_dict(data) and "type" not in data:
100-
data["type"] = sse.event
101-
102-
yield process_data(data=data, cast_to=cast_to, response=response)
10384

10485
if sse.event == "ping":
10586
continue
10687

10788
if sse.event == "error":
10889
body = sse.data
10990

110-
# Extract meaningful error messages and use specific exception handling
11191
try:
11292
body = sse.json()
11393
err_msg = f"{body}"
@@ -120,9 +100,12 @@ def __stream__(self) -> Iterator[_T]:
120100
response=self.response,
121101
)
122102

123-
# Explicitly close the response to release the connection
124-
# Immediately releases connection instead of consuming remaining stream
125-
self.response.close()
103+
# Process any other event for forward compatibility
104+
data = sse.json()
105+
if is_dict(data) and "type" not in data:
106+
data["type"] = sse.event
107+
108+
yield process_data(data=data, cast_to=cast_to, response=response)
126109

127110
def __enter__(self) -> Self:
128111
return self
@@ -205,13 +188,6 @@ async def __stream__(self) -> AsyncIterator[_T]:
205188
async for sse in iterator:
206189
if sse.event == "completion":
207190
yield process_data(data=sse.json(), cast_to=cast_to, response=response)
208-
# Single, fast membership test instead of multiple string comparisons
209-
if sse.event in MESSAGE_EVENTS:
210-
data = sse.json()
211-
if is_dict(data) and "type" not in data:
212-
data["type"] = sse.event
213-
214-
yield process_data(data=data, cast_to=cast_to, response=response)
215191

216192
if sse.event == "ping":
217193
continue
@@ -231,9 +207,12 @@ async def __stream__(self) -> AsyncIterator[_T]:
231207
response=self.response,
232208
)
233209

234-
# Explicitly close the response to release the connection
235-
# Immediately releases connection instead of consuming remaining stream
236-
await self.response.aclose()
210+
# Process any other event for forward compatibility
211+
data = sse.json()
212+
if is_dict(data) and "type" not in data:
213+
data["type"] = sse.event
214+
215+
yield process_data(data=data, cast_to=cast_to, response=response)
237216

238217
async def __aenter__(self) -> Self:
239218
return self

0 commit comments

Comments
 (0)