@@ -5,7 +5,7 @@ import type {DataStore, CancellationContext} from '@tus/utils'
 import {ERRORS, type Upload, StreamLimiter, EVENTS} from '@tus/utils'
 import throttle from 'lodash.throttle'
 import stream from 'node:stream/promises'
-import {PassThrough, type Readable} from 'node:stream'
+import {PassThrough, Readable} from 'node:stream'
 
 const reExtractFileID = /([^/]+)\/?$/
 const reForwardedHost = /host="?([^";]+)/
@@ -127,7 +127,7 @@ export class BaseHandler extends EventEmitter {
   }
 
   protected writeToStore(
-    data: Readable,
+    webStream: ReadableStream | null,
     upload: Upload,
     maxFileSize: number,
     context: CancellationContext
@@ -143,10 +143,17 @@ export class BaseHandler extends EventEmitter {
       // Create a PassThrough stream as a proxy to manage the request stream.
       // This allows for aborting the write process without affecting the incoming request stream.
       const proxy = new PassThrough()
+      const nodeStream = webStream ? Readable.fromWeb(webStream) : Readable.from([])
+
+      // Ignore errors on the data stream to prevent crashes from client disconnections.
+      // We handle errors on the proxy stream instead.
+      nodeStream.on('error', (err) => {
+        /* do nothing */
+      })
 
       // gracefully terminate the proxy stream when the request is aborted
       const onAbort = () => {
-        data.unpipe(proxy)
+        nodeStream.unpipe(proxy)
 
         if (!proxy.closed) {
           proxy.end()
@@ -155,13 +162,13 @@ export class BaseHandler extends EventEmitter {
       context.signal.addEventListener('abort', onAbort, {once: true})
 
       proxy.on('error', (err) => {
-        data.unpipe(proxy)
+        nodeStream.unpipe(proxy)
         reject(err.name === 'AbortError' ? ERRORS.ABORTED : err)
       })
 
       const postReceive = throttle(
         (offset: number) => {
-          this.emit(EVENTS.POST_RECEIVE, data, {...upload, offset})
+          this.emit(EVENTS.POST_RECEIVE, nodeStream, {...upload, offset})
         },
         this.options.postReceiveInterval,
         {leading: false}
@@ -177,9 +184,13 @@ export class BaseHandler extends EventEmitter {
       // to ensure that errors in the pipeline do not cause the request stream to be destroyed,
       // which would result in a socket hangup error for the client.
       stream
-        .pipeline(data.pipe(proxy), new StreamLimiter(maxFileSize), async (stream) => {
-          return this.store.write(stream as StreamLimiter, upload.id, upload.offset)
-        })
+        .pipeline(
+          nodeStream.pipe(proxy),
+          new StreamLimiter(maxFileSize),
+          async (stream) => {
+            return this.store.write(stream as StreamLimiter, upload.id, upload.offset)
+          }
+        )
         .then(resolve)
         .catch(reject)
         .finally(() => {
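
For context, a minimal, self-contained sketch of the pattern this change adopts in writeToStore: bridge a Web ReadableStream into a Node Readable with Readable.fromWeb, swallow errors on the source stream while letting the PassThrough proxy and pipeline surface failures, and enforce a size cap inside stream/promises pipeline. The names writeLimited and ByteLimiter are illustrative only (ByteLimiter stands in for @tus/utils' StreamLimiter), and Node 17+ is assumed for Readable.fromWeb.

import {PassThrough, Readable, Transform, type TransformCallback} from 'node:stream'
import {pipeline} from 'node:stream/promises'
import type {ReadableStream} from 'node:stream/web'

// Illustrative stand-in for @tus/utils' StreamLimiter: error out once more than `limit` bytes pass.
class ByteLimiter extends Transform {
  private seen = 0
  constructor(private limit: number) {
    super()
  }
  _transform(chunk: Buffer, _enc: BufferEncoding, cb: TransformCallback) {
    this.seen += chunk.length
    if (this.seen > this.limit) {
      cb(new Error('Maximum size exceeded'))
      return
    }
    cb(null, chunk)
  }
}

// Hypothetical helper mirroring the diff's flow: web stream -> Node stream -> proxy -> limiter -> sink.
async function writeLimited(
  webStream: ReadableStream | null,
  limit: number,
  signal: AbortSignal
): Promise<number> {
  // An empty Node stream stands in for a request without a body.
  const nodeStream = webStream ? Readable.fromWeb(webStream) : Readable.from([])
  nodeStream.on('error', () => {
    // Swallow source errors (e.g. client disconnects); errors are reported via the proxy/pipeline instead.
  })

  const proxy = new PassThrough()
  signal.addEventListener(
    'abort',
    () => {
      // Detach and end the proxy so the write settles without destroying the source stream.
      nodeStream.unpipe(proxy)
      if (!proxy.closed) proxy.end()
    },
    {once: true}
  )

  let received = 0
  await pipeline(
    nodeStream.pipe(proxy),
    new ByteLimiter(limit),
    async (source: AsyncIterable<Buffer>) => {
      for await (const chunk of source) {
        received += chunk.length // a real data store would persist each chunk here
      }
    }
  )
  return received
}

// Usage sketch: feed a small in-memory web stream through the limiter.
const controller = new AbortController()
writeLimited(new Blob(['hello world']).stream(), 1024, controller.signal).then((n) =>
  console.log(`received ${n} bytes`)
)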