Skip to content

Commit 2fde6f8

Browse files
embire2Keoma Wright
andauthored
fix: implement stream recovery to prevent chat hanging (#1977)
- Add StreamRecoveryManager for handling stream timeouts - Monitor stream activity with 45-second timeout - Automatic recovery with 2 retry attempts - Proper cleanup on stream completion Fixes #1964 Co-authored-by: Keoma Wright <[email protected]>
1 parent 2f6f28e commit 2fde6f8

File tree

2 files changed

+107
-0
lines changed

2 files changed

+107
-0
lines changed
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
import { createScopedLogger } from '~/utils/logger';
2+
3+
const logger = createScopedLogger('stream-recovery');
4+
5+
export interface StreamRecoveryOptions {
6+
maxRetries?: number;
7+
timeout?: number;
8+
onTimeout?: () => void;
9+
onRecovery?: () => void;
10+
}
11+
12+
export class StreamRecoveryManager {
13+
private _retryCount = 0;
14+
private _timeoutHandle: NodeJS.Timeout | null = null;
15+
private _lastActivity: number = Date.now();
16+
private _isActive = true;
17+
18+
constructor(private _options: StreamRecoveryOptions = {}) {
19+
this._options = {
20+
maxRetries: 3,
21+
timeout: 30000, // 30 seconds default
22+
..._options,
23+
};
24+
}
25+
26+
startMonitoring() {
27+
this._resetTimeout();
28+
}
29+
30+
updateActivity() {
31+
this._lastActivity = Date.now();
32+
this._resetTimeout();
33+
}
34+
35+
private _resetTimeout() {
36+
if (this._timeoutHandle) {
37+
clearTimeout(this._timeoutHandle);
38+
}
39+
40+
if (!this._isActive) {
41+
return;
42+
}
43+
44+
this._timeoutHandle = setTimeout(() => {
45+
if (this._isActive) {
46+
logger.warn('Stream timeout detected');
47+
this._handleTimeout();
48+
}
49+
}, this._options.timeout);
50+
}
51+
52+
private _handleTimeout() {
53+
if (this._retryCount >= (this._options.maxRetries || 3)) {
54+
logger.error('Max retries reached for stream recovery');
55+
this.stop();
56+
57+
return;
58+
}
59+
60+
this._retryCount++;
61+
logger.info(`Attempting stream recovery (attempt ${this._retryCount})`);
62+
63+
if (this._options.onTimeout) {
64+
this._options.onTimeout();
65+
}
66+
67+
// Reset monitoring after recovery attempt
68+
this._resetTimeout();
69+
70+
if (this._options.onRecovery) {
71+
this._options.onRecovery();
72+
}
73+
}
74+
75+
stop() {
76+
this._isActive = false;
77+
78+
if (this._timeoutHandle) {
79+
clearTimeout(this._timeoutHandle);
80+
this._timeoutHandle = null;
81+
}
82+
}
83+
84+
getStatus() {
85+
return {
86+
isActive: this._isActive,
87+
retryCount: this._retryCount,
88+
lastActivity: this._lastActivity,
89+
timeSinceLastActivity: Date.now() - this._lastActivity,
90+
};
91+
}
92+
}

app/routes/api.chat.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { createSummary } from '~/lib/.server/llm/create-summary';
1313
import { extractPropertiesFromMessage } from '~/lib/.server/llm/utils';
1414
import type { DesignScheme } from '~/types/design-scheme';
1515
import { MCPService } from '~/lib/services/mcpService';
16+
import { StreamRecoveryManager } from '~/lib/.server/llm/stream-recovery';
1617

1718
export async function action(args: ActionFunctionArgs) {
1819
return chatAction(args);
@@ -39,6 +40,14 @@ function parseCookies(cookieHeader: string): Record<string, string> {
3940
}
4041

4142
async function chatAction({ context, request }: ActionFunctionArgs) {
43+
const streamRecovery = new StreamRecoveryManager({
44+
timeout: 45000,
45+
maxRetries: 2,
46+
onTimeout: () => {
47+
logger.warn('Stream timeout - attempting recovery');
48+
},
49+
});
50+
4251
const { messages, files, promptId, contextOptimization, supabase, chatMode, designScheme, maxLLMSteps } =
4352
await request.json<{
4453
messages: Messages;
@@ -83,6 +92,8 @@ async function chatAction({ context, request }: ActionFunctionArgs) {
8392

8493
const dataStream = createDataStream({
8594
async execute(dataStream) {
95+
streamRecovery.startMonitoring();
96+
8697
const filePaths = getFilePaths(files || {});
8798
let filteredFiles: FileMap | undefined = undefined;
8899
let summary: string | undefined = undefined;
@@ -314,9 +325,12 @@ async function chatAction({ context, request }: ActionFunctionArgs) {
314325

315326
(async () => {
316327
for await (const part of result.fullStream) {
328+
streamRecovery.updateActivity();
329+
317330
if (part.type === 'error') {
318331
const error: any = part.error;
319332
logger.error('Streaming error:', error);
333+
streamRecovery.stop();
320334

321335
// Enhanced error handling for common streaming issues
322336
if (error.message?.includes('Invalid JSON response')) {
@@ -328,6 +342,7 @@ async function chatAction({ context, request }: ActionFunctionArgs) {
328342
return;
329343
}
330344
}
345+
streamRecovery.stop();
331346
})();
332347
result.mergeIntoDataStream(dataStream);
333348
},

0 commit comments

Comments
 (0)