Skip to content

Commit 6af3441

Browse files
authored
fix(web): centralize stream cancellation logic (#98)
1 parent 45c6dfd commit 6af3441

File tree

9 files changed

+166
-50
lines changed

9 files changed

+166
-50
lines changed

README.md

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,15 +76,25 @@ await writer.close();
7676
// When done adding entries, finalize the archive
7777
controller.finalize();
7878

79-
// `readable` now contains the complete tar archive which can be piped or processed
80-
const tarStream = readable;
81-
82-
// Create a tar decoder
83-
const decoder = createTarDecoder();
84-
const decodedStream = tarStream.pipeThrough(decoder);
79+
// Pipe the archive right into a decoder
80+
const decodedStream = readable.pipeThrough(createTarDecoder());
8581
for await (const entry of decodedStream) {
8682
console.log(`Decoded: ${entry.header.name}`);
87-
// Process `entry.body` stream as needed
83+
84+
const shouldSkip = entry.header.name.endsWith(".md");
85+
if (shouldSkip) {
86+
// You MUST drain the body with cancel() to proceed to the next entry or read it fully,
87+
// otherwise the stream will stall.
88+
await entry.body.cancel();
89+
continue;
90+
}
91+
92+
const reader = entry.body.getReader();
93+
while (true) {
94+
const { done, value } = await reader.read();
95+
if (done) break;
96+
processChunk(value);
97+
}
8898
}
8999
```
90100

REFERENCE.md

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,21 @@ const entriesStream = tarStream.pipeThrough(decoder);
7575

7676
for await (const entry of entriesStream) {
7777
console.log(`Entry: ${entry.header.name}`);
78-
// Process entry.body stream as needed
78+
79+
const shouldSkip = entry.header.name.endsWith('.md');
80+
if (shouldSkip) {
81+
// You MUST drain the body with cancel() to proceed to the next entry or read it fully,
82+
// otherwise the stream will stall.
83+
await entry.body.cancel();
84+
continue;
85+
}
86+
87+
const reader = entry.body.getReader();
88+
while (true) {
89+
const { done, value } = await reader.read();
90+
if (done) break;
91+
processChunk(value);
92+
}
7993
}
8094
```
8195

src/fs/unpack.ts

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,6 @@ import { createFileSink } from "./file-sink";
88
import { createPathCache } from "./path-cache";
99
import type { UnpackOptionsFS } from "./types";
1010

11-
/**
12-
* Helper to discard body data for filtered entries.
13-
* Returns true if complete, false if more data is needed.
14-
*/
15-
function discardBody(unpacker: ReturnType<typeof createUnpacker>): boolean {
16-
while (!unpacker.isBodyComplete()) {
17-
// Callback always returns true to discard data.
18-
unpacker.streamBody(() => true);
19-
if (!unpacker.isBodyComplete()) return false;
20-
}
21-
22-
while (!unpacker.skipPadding()) return false;
23-
24-
return true;
25-
}
26-
2711
/**
2812
* Extract a tar archive to a directory.
2913
*
@@ -77,7 +61,7 @@ export function unpackTar(
7761

7862
// If we're in the middle of discarding a body, continue it
7963
if (needsDiscardBody) {
80-
if (!discardBody(unpacker)) {
64+
if (!unpacker.skipEntry()) {
8165
// Still need more data
8266
cb();
8367
return;
@@ -139,7 +123,7 @@ export function unpackTar(
139123
const transformedHeader = transformHeader(header, options);
140124
// Filtered out.
141125
if (!transformedHeader) {
142-
if (!discardBody(unpacker)) {
126+
if (!unpacker.skipEntry()) {
143127
needsDiscardBody = true;
144128
cb();
145129
return;
@@ -198,7 +182,7 @@ export function unpackTar(
198182
opQueue.add(() => fileStream.end());
199183
} else {
200184
// No body data or already handled.
201-
if (!discardBody(unpacker)) {
185+
if (!unpacker.skipEntry()) {
202186
needsDiscardBody = true;
203187
cb();
204188
return;

src/tar/unpacker.ts

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -199,25 +199,21 @@ export function createUnpacker(options: DecoderOptions = {}) {
199199
return true;
200200
},
201201

202-
hasEnded(): boolean {
203-
return ended;
204-
},
202+
/**
203+
* Discards the current entry's body and its padding.
204+
*
205+
* Returns true when the full entry has been skipped, false if more
206+
* data is required.
207+
*/
208+
skipEntry(): boolean {
209+
if (state !== STATE_BODY || !currentEntry) return true;
205210

206-
/** Check if unpacker likely has enough data to make progress without waiting. */
207-
canContinueProcessing(): boolean {
208-
// If we're waiting for a header, we need at least BLOCK_SIZE bytes
209-
if (state === STATE_HEADER) return available() >= BLOCK_SIZE;
210-
211-
// If we're in body state, we can process if we have any data or need to skip padding
212-
if (state === STATE_BODY && currentEntry) {
213-
return currentEntry.remaining === 0
214-
? // If body is complete, we just need to skip padding
215-
available() >= currentEntry.padding
216-
: // If body isn't complete, any data is useful
217-
available() > 0;
211+
while (!unpacker.isBodyComplete()) {
212+
const fed = unpacker.streamBody(() => true);
213+
if (fed === 0) return false;
218214
}
219215

220-
return true;
216+
return unpacker.skipPadding();
221217
},
222218

223219
validateEOF() {

src/web/unpack.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,21 @@ import type { ParsedTarEntry } from "./types";
1616
*
1717
* for await (const entry of entriesStream) {
1818
* console.log(`Entry: ${entry.header.name}`);
19-
* // Process entry.body stream as needed
19+
*
20+
* const shouldSkip = entry.header.name.endsWith('.md');
21+
* if (shouldSkip) {
22+
* // You MUST drain the body with cancel() to proceed to the next entry or read it fully,
23+
* // otherwise the stream will stall.
24+
* await entry.body.cancel();
25+
* continue;
26+
* }
27+
*
28+
* const reader = entry.body.getReader();
29+
* while (true) {
30+
* const { done, value } = await reader.read();
31+
* if (done) break;
32+
* processChunk(value);
33+
* }
2034
* }
2135
*/
2236
export function createTarDecoder(
@@ -65,6 +79,8 @@ export function createTarDecoder(
6579
while (!controllerTerminated) {
6680
// Look for the next header.
6781
if (!bodyController) {
82+
if (!unpacker.skipEntry()) break;
83+
6884
// Respect backpressure on the main stream.
6985
if (!force && (controller.desiredSize ?? 0) < 0) break;
7086

@@ -83,6 +99,7 @@ export function createTarDecoder(
8399
// If the consumer cancels this body stream, clear the controller.
84100
cancel: () => {
85101
bodyController = null;
102+
pump(controller);
86103
},
87104
});
88105

tests/web/extract.test.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,38 @@ describe("createTarDecoder", () => {
158158
expect(names).toEqual(["file.txt", "dir/", "dir/nested.txt"]);
159159
});
160160

161+
it("iterate over headers by cancelling", async () => {
162+
const archive = await createBaseArchive([
163+
{ header: { name: "dir/", type: "directory", size: 0 } },
164+
{
165+
header: { name: "dir/file.txt", type: "file", size: 5 },
166+
body: "hello",
167+
},
168+
{
169+
header: { name: "dir/empty.txt", type: "file", size: 0 },
170+
body: new Uint8Array(),
171+
},
172+
]);
173+
174+
const stream = new ReadableStream({
175+
start(controller) {
176+
controller.enqueue(archive);
177+
controller.close();
178+
},
179+
});
180+
181+
const decoder = createTarDecoder();
182+
const entryStream = stream.pipeThrough(decoder);
183+
184+
const names: string[] = [];
185+
for await (const entry of entryStream) {
186+
names.push(entry.header.name);
187+
await entry.body?.cancel();
188+
}
189+
190+
expect(names).toEqual(["dir/", "dir/file.txt", "dir/empty.txt"]);
191+
});
192+
161193
it("rejects a stream with an invalid checksum in strict mode", async () => {
162194
const archive = await createBaseArchive([
163195
{ header: { name: "test.txt", type: "file", size: 0 }, body: "" },

tests/web/fixtures/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,7 @@ export const LODASH_TGZ = join(__dirname, "lodash-4.17.21.tgz");
3838
export const NEXT_SWC_TGZ = join(__dirname, "next-swc-linux-14.2.15.tgz");
3939
export const SHARP_TGZ = join(__dirname, "sharp-0.33.5.tgz");
4040
export const ELECTRON_TGZ = join(__dirname, "electron-33.0.2.tgz");
41+
export const NODE_V25_DARWIN_ARM64_TAR_GZ = join(
42+
__dirname,
43+
"node-v25.2.0-darwin-arm64.tar.gz",
44+
);
50.2 MB
Binary file not shown.

tests/web/real-world.test.ts

Lines changed: 65 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,18 @@ import * as fs from "node:fs";
22
import { describe, expect, it } from "vitest";
33
import {
44
createGzipDecoder,
5+
createTarDecoder,
56
type ParsedTarEntryWithData,
67
unpackTar,
78
} from "../../src/web";
89
import { streamToBuffer } from "../../src/web/stream-utils";
9-
import { ELECTRON_TGZ, LODASH_TGZ, NEXT_SWC_TGZ, SHARP_TGZ } from "./fixtures";
10+
import {
11+
ELECTRON_TGZ,
12+
LODASH_TGZ,
13+
NEXT_SWC_TGZ,
14+
NODE_V25_DARWIN_ARM64_TAR_GZ,
15+
SHARP_TGZ,
16+
} from "./fixtures";
1017

1118
async function extractTgz(filePath: string): Promise<ParsedTarEntryWithData[]> {
1219
// @ts-expect-error ReadableStream.from is supported in Node tests
@@ -32,7 +39,7 @@ describe("real world examples", () => {
3239
(e) => e.header.name === "package/README.md",
3340
);
3441
expect(readmeEntry).toBeDefined();
35-
expect(readmeEntry?.data.length).toBeGreaterThan(1000);
42+
expect(readmeEntry?.data?.length).toBe(1107);
3643
});
3744

3845
it(
@@ -51,7 +58,7 @@ describe("real world examples", () => {
5158
(e) => e.header.name === "package/next-swc.linux-x64-gnu.node",
5259
);
5360
expect(binaryEntry).toBeDefined();
54-
expect(binaryEntry?.data.length).toBeGreaterThan(30 * 1024 * 1024); // > 30MB
61+
expect(binaryEntry?.data?.length).toBe(131406240);
5562

5663
// Verify package.json exists
5764
expect(
@@ -72,14 +79,14 @@ describe("real world examples", () => {
7279
const cppFiles = entries.filter(
7380
(e) => e.header.name.endsWith(".cc") || e.header.name.endsWith(".h"),
7481
);
75-
expect(cppFiles.length).toBeGreaterThan(5);
82+
expect(cppFiles.length).toBe(13);
7683

7784
// Verify a specific C++ file has substantial content
7885
const sharpCcEntry = entries.find(
7986
(e) => e.header.name === "package/src/sharp.cc",
8087
);
8188
expect(sharpCcEntry).toBeDefined();
82-
expect(sharpCcEntry?.data.length).toBeGreaterThan(1000);
89+
expect(sharpCcEntry?.data?.length).toBe(1465);
8390
});
8491

8592
it("extracts a package with installation scripts (electron)", async () => {
@@ -104,6 +111,58 @@ describe("real world examples", () => {
104111
(e) => e.header.name === "package/electron.d.ts",
105112
);
106113
expect(electronDtsEntry).toBeDefined();
107-
expect(electronDtsEntry?.data.length).toBeGreaterThan(1000);
114+
expect(electronDtsEntry?.data?.length).toBe(987499);
115+
});
116+
117+
it("extracts a Node.js release tarball", async () => {
118+
// @ts-expect-error ReadableStream.from is supported in Node tests
119+
const fileStream = ReadableStream.from(
120+
fs.createReadStream(NODE_V25_DARWIN_ARM64_TAR_GZ),
121+
);
122+
123+
const entryStream = fileStream
124+
.pipeThrough(createGzipDecoder())
125+
.pipeThrough(createTarDecoder());
126+
127+
let count = 0;
128+
let lastEntry = "";
129+
let totalBytes = 0;
130+
for await (const entry of entryStream) {
131+
count++;
132+
lastEntry = entry.header.name;
133+
134+
const reader = entry.body.getReader();
135+
while (true) {
136+
const { done, value } = await reader.read();
137+
if (done) break;
138+
if (value) totalBytes += value.length;
139+
}
140+
}
141+
142+
expect(count).toBe(5986);
143+
expect(lastEntry).toBe("node-v25.2.0-darwin-arm64/bin/npm");
144+
expect(totalBytes).toBe(200544142);
145+
});
146+
147+
it("streams entries from the Node.js release tarball", async () => {
148+
// @ts-expect-error ReadableStream.from is supported in Node tests
149+
const fileStream = ReadableStream.from(
150+
fs.createReadStream(NODE_V25_DARWIN_ARM64_TAR_GZ),
151+
);
152+
153+
const entryStream = fileStream
154+
.pipeThrough(createGzipDecoder())
155+
.pipeThrough(createTarDecoder());
156+
157+
let count = 0;
158+
let lastEntry = "";
159+
for await (const entry of entryStream) {
160+
count++;
161+
lastEntry = entry.header.name;
162+
await entry.body.cancel();
163+
}
164+
165+
expect(count).toBe(5986);
166+
expect(lastEntry).toBe("node-v25.2.0-darwin-arm64/bin/npm");
108167
});
109168
});

0 commit comments

Comments
 (0)