diff --git a/lib/transmit.js b/lib/transmit.js index b4500e52d..dafd2e369 100755 --- a/lib/transmit.js +++ b/lib/transmit.js @@ -266,6 +266,7 @@ internals.pipe = function (request, stream) { } else { stream.on('error', end); + stream.on('close', aborted); stream.pipe(request.raw.res); } @@ -364,6 +365,7 @@ internals.chain = function (sources) { for (let i = 1; i < sources.length; ++i) { const to = sources[i]; if (to) { + from.on('close', internals.destroyPipe.bind(from, to)); from.on('error', internals.errorPipe.bind(from, to)); from = from.pipe(to); } @@ -373,6 +375,13 @@ internals.chain = function (sources) { }; +internals.destroyPipe = function (to) { + + if (!this.readableEnded && !this.errored) { + to.destroy(); + } +}; + internals.errorPipe = function (to, err) { to.emit('error', err); diff --git a/test/transmit.js b/test/transmit.js index 523bd24a7..b6f1fd7cd 100755 --- a/test/transmit.js +++ b/test/transmit.js @@ -1418,6 +1418,63 @@ describe('transmission', () => { expect(count).to.equal(1); }); + it('handles stream that is destroyed with no error', async () => { + + const handler = (request, h) => { + + const stream = new Stream.Readable({ read: Hoek.ignore }); + + stream.push('hello'); + Hoek.wait(1).then(() => stream.destroy()); + + return h.response(stream).type('text/html'); + }; + + const server = Hapi.server(); + server.route({ method: 'GET', path: '/', handler }); + + const log = server.events.once('response'); + const err = await expect(server.inject({ url: '/', headers: { 'accept-encoding': 'gzip' } })).to.reject(Boom.Boom); + expect(err.output.statusCode).to.equal(499); + + const [request] = await log; + expect(request.response.isBoom).to.be.true(); + expect(request.response.output.statusCode).to.equal(499); + }); + + it('handles stream that is destroyed with error', async () => { + + const handler = (request, h) => { + + const stream = new Stream.Readable({ read: Hoek.ignore }); + if (stream.errored === undefined) { + + // Expose errored property on node 14 & 16 to enable coverage + + stream.on('error', () => { + + stream.errored = true; + }); + } + + stream.push('hello'); + Hoek.wait(1).then(() => stream.destroy(new Error('failed'))); + + return h.response(stream).type('text/html'); + }; + + const server = Hapi.server(); + server.route({ method: 'GET', path: '/', handler }); + + const log = server.events.once('response'); + const err = await expect(server.inject({ url: '/', headers: { 'accept-encoding': 'gzip' } })).to.reject(Boom.Boom); + expect(err.output.statusCode).to.equal(499); + + const [request] = await log; + expect(request.response.isBoom).to.be.true(); + expect(request.response.output.statusCode).to.equal(500); + }); + describe('response range', () => { const fileStreamHandler = (request, h) => {