Skip to content

Commit 877d26c

Browse files
authored
fix(web): decoder should not wait for last chunk (#96)
* fix(web): decoder should not wait for last chunk * style: format
1 parent f0df93d commit 877d26c

File tree

2 files changed

+107
-39
lines changed

2 files changed

+107
-39
lines changed

src/web/unpack.ts

Lines changed: 46 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ export function createTarDecoder(
6666
// Look for the next header.
6767
if (!bodyController) {
6868
// Respect backpressure on the main stream.
69-
if (!force && (controller.desiredSize ?? 1) <= 0) break;
69+
if (!force && (controller.desiredSize ?? 0) < 0) break;
7070

7171
const header = unpacker.readHeader();
7272
if (header === null) break; // Not enough data
@@ -126,16 +126,18 @@ export function createTarDecoder(
126126
try {
127127
// biome-ignore lint/style/noNonNullAssertion: body processing state.
128128
bodyController!.enqueue(chunk);
129+
129130
// If the body stream's buffer is full, signal to pause the pump.
130131
// biome-ignore lint/style/noNonNullAssertion: body processing state.
131132
if ((bodyController!.desiredSize ?? 1) <= 0) shouldPause = true;
132133
} catch {
133134
return true; // Body errored or closed, discard
134135
}
135-
return true; // Continue processing unpacker's buffer
136+
137+
return true;
136138
});
137139

138-
// No buffered data was available; wait for the next chunk to resume pumping.
140+
// No buffered data is available. Wait for the next chunk to resume pumping.
139141
if (fed === 0) break;
140142

141143
// Check again if the body is now complete after streaming.
@@ -160,46 +162,51 @@ export function createTarDecoder(
160162
}
161163
};
162164

163-
return new TransformStream<Uint8Array, ParsedTarEntry>({
164-
transform(chunk, controller) {
165-
try {
166-
// In strict mode, ensure EOF blocks are all zeroes.
167-
if (eofReached && options.strict && chunk.some((byte) => byte !== 0))
168-
throw new Error("Invalid EOF.");
169-
170-
// Write incoming data to the unpacker.
171-
unpacker.write(chunk);
172-
pump(controller);
173-
} catch (error) {
174-
abortAll(error, controller);
175-
throw error;
176-
}
177-
},
165+
return new TransformStream<Uint8Array, ParsedTarEntry>(
166+
{
167+
transform(chunk, controller) {
168+
try {
169+
// In strict mode, ensure EOF blocks are all zeroes.
170+
if (eofReached && options.strict && chunk.some((byte) => byte !== 0))
171+
throw new Error("Invalid EOF.");
172+
173+
// Write incoming data to the unpacker.
174+
unpacker.write(chunk);
175+
pump(controller);
176+
} catch (error) {
177+
abortAll(error, controller);
178+
throw error;
179+
}
180+
},
178181

179-
flush(controller) {
180-
try {
181-
unpacker.end();
182-
pump(controller, true); // Force pump for remaining data
182+
flush(controller) {
183+
try {
184+
unpacker.end();
185+
pump(controller, true); // Force pump for remaining data
186+
187+
// If a bodyController still exists, the archive was truncated mid-file.
188+
if (bodyController) {
189+
if (options.strict) throw new Error("Tar archive is truncated.");
183190

184-
// If a bodyController still exists, the archive was truncated mid-file.
185-
if (bodyController) {
186-
if (options.strict) {
187-
throw new Error("Tar archive is truncated.");
191+
// In non-strict mode, just close the partial stream.
192+
try {
193+
bodyController.close();
194+
} catch {}
195+
bodyController = null;
188196
}
189-
// In non-strict mode, just close the partial stream.
190-
try {
191-
bodyController.close();
192-
} catch {}
193-
bodyController = null;
194-
}
195197

196-
unpacker.validateEOF();
198+
unpacker.validateEOF();
197199

198-
if (!controllerTerminated) controller.terminate();
199-
} catch (error) {
200-
abortAll(error, controller);
201-
throw error;
202-
}
200+
if (!controllerTerminated) controller.terminate();
201+
} catch (error) {
202+
abortAll(error, controller);
203+
throw error;
204+
}
205+
},
206+
},
207+
undefined,
208+
{
209+
highWaterMark: 1,
203210
},
204-
});
211+
);
205212
}

tests/web/extract.test.ts

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ const createBaseArchive = (
1919
): Promise<Uint8Array> => {
2020
return packTar(entries);
2121
};
22+
2223
describe("unpackTar", () => {
2324
it("extracts a single file tar", async () => {
2425
const buffer = await fs.readFile(ONE_FILE_TAR);
@@ -97,6 +98,66 @@ describe("unpackTar", () => {
9798
});
9899

99100
describe("createTarDecoder", () => {
101+
it("streams entries as data arrives", async () => {
102+
const archive = await createBaseArchive([
103+
{ header: { name: "file.txt", type: "file", size: 5 }, body: "hello" },
104+
{ header: { name: "dir/", type: "directory", size: 0 } },
105+
{
106+
header: { name: "dir/nested.txt", type: "file", size: 3 },
107+
body: new Uint8Array([97, 98, 99]),
108+
},
109+
]);
110+
111+
const decoder = createTarDecoder();
112+
const reader = decoder.readable.getReader();
113+
const writer = decoder.writable.getWriter();
114+
115+
const splitPoint = Math.floor((archive.length * 2) / 3);
116+
const firstPart = archive.subarray(0, splitPoint);
117+
118+
const firstResultPromise = Promise.race([
119+
reader.read(),
120+
new Promise<never>((_, reject) =>
121+
setTimeout(
122+
() => reject(new Error("Timed out waiting for first entry")),
123+
100,
124+
),
125+
),
126+
]);
127+
128+
await writer.write(firstPart);
129+
const firstResult = await firstResultPromise;
130+
expect(firstResult.done).toBe(false);
131+
const firstEntry = firstResult.value;
132+
if (!firstEntry) throw new Error("Expected first entry");
133+
expect(firstEntry.header.name).toBe("file.txt");
134+
135+
await firstEntry.body.cancel();
136+
137+
const names = [firstEntry.header.name];
138+
139+
for (let i = 0; i < 2; i++) {
140+
const nextResult = await Promise.race([
141+
reader.read(),
142+
new Promise<never>((_, reject) =>
143+
setTimeout(
144+
() => reject(new Error("Timed out waiting for next entry")),
145+
100,
146+
),
147+
),
148+
]);
149+
expect(nextResult.done).toBe(false);
150+
const entry = nextResult.value;
151+
if (!entry) throw new Error("Expected streamed entry");
152+
153+
names.push(entry.header.name);
154+
155+
if (entry.header.size > 0) await entry.body.cancel();
156+
}
157+
158+
expect(names).toEqual(["file.txt", "dir/", "dir/nested.txt"]);
159+
});
160+
100161
it("rejects a stream with an invalid checksum in strict mode", async () => {
101162
const archive = await createBaseArchive([
102163
{ header: { name: "test.txt", type: "file", size: 0 }, body: "" },

0 commit comments

Comments
 (0)