From 0053fe86fb481c8e7ce22ea8bdd770022ede9fbe Mon Sep 17 00:00:00 2001 From: Daniel Jonathan Date: Tue, 28 Oct 2025 02:12:04 -0400 Subject: [PATCH] feat(js): add streaming utility templates for streamedListObjects Adds NDJSON streaming parser template to support streamedListObjects in the JavaScript SDK. Templates provide parsing utilities; actual streaming logic remains in custom code (client.ts, common.ts) maintained in js-sdk repository. Templates Added/Modified (5 files): - streaming.mustache (NEW): NDJSON parser for Node.js streams - Proper error propagation (reject pending promises on error) - onEnd guard prevents processing after error - Uint8Array handling alongside string/Buffer - Stream destruction in return()/throw() methods - Widened type signature for better DX - index.mustache: Export parseNDJSONStream utility - config.overrides.json: Register streaming + supportsStreamedListObjects flag - README_calling_api.mustache: Usage documentation - README_api_endpoints.mustache: API endpoint table entry Architecture: - Templates generate utilities (streaming.ts, exports) - Custom js-sdk code implements streaming (common.ts, client.ts) - No OpenAPI spec changes required Generated & Verified: - streaming.ts includes all error handling fixes - parseNDJSONStream exported correctly - Works with custom js-sdk streaming implementation Related: - Fixes #76 (JavaScript SDK) - Implements openfga/js-sdk#236 - Related PR: openfga/js-sdk#280 --- config/clients/js/config.overrides.json | 5 + .../js/template/README_api_endpoints.mustache | 3 +- .../js/template/README_calling_api.mustache | 22 +++ config/clients/js/template/index.mustache | 1 + config/clients/js/template/streaming.mustache | 176 ++++++++++++++++++ 5 files changed, 206 insertions(+), 1 deletion(-) create mode 100644 config/clients/js/template/streaming.mustache diff --git a/config/clients/js/config.overrides.json b/config/clients/js/config.overrides.json index cd63c056c..9540953da 100644 --- a/config/clients/js/config.overrides.json +++ b/config/clients/js/config.overrides.json @@ -9,6 +9,7 @@ "fossaComplianceNoticeId": "9c7d9da4-2a75-47c9-bfc9-31b301fb764f", "useSingleRequestParameter": false, "supportsES6": true, + "supportsStreamedListObjects": true, "modelPropertyNaming": "original", "openTelemetryDocumentation": "docs/opentelemetry.md", "files": { @@ -23,6 +24,10 @@ "npmrc.mustache": { "destinationFilename": ".npmrc", "templateType": "SupportingFiles" + }, + "streaming.mustache": { + "destinationFilename": "streaming.ts", + "templateType": "SupportingFiles" } } } diff --git a/config/clients/js/template/README_api_endpoints.mustache b/config/clients/js/template/README_api_endpoints.mustache index 7cd892cd9..59d526ea8 100644 --- a/config/clients/js/template/README_api_endpoints.mustache +++ b/config/clients/js/template/README_api_endpoints.mustache @@ -14,5 +14,6 @@ | [**BatchCheck**]({{apiDocsUrl}}#/Relationship%20Queries/BatchCheck) | **POST** /stores/{store_id}/batch-check | Similar to check, but accepts a list of relations to check | | [**Expand**]({{apiDocsUrl}}#/Relationship%20Queries/Expand) | **POST** /stores/{store_id}/expand | Expand all relationships in userset tree format, and following userset rewrite rules. Useful to reason about and debug a certain relationship | | [**ListObjects**]({{apiDocsUrl}}#/Relationship%20Queries/ListObjects) | **POST** /stores/{store_id}/list-objects | [EXPERIMENTAL] Get all objects of the given type that the user has a relation with | -| [**ReadAssertions**]({{apiDocsUrl}}#/Assertions/ReadAssertions) | **GET** /stores/{store_id}/assertions/{authorization_model_id} | Read assertions for an authorization model ID | +{{#supportsStreamedListObjects}}| [**StreamedListObjects**]({{apiDocsUrl}}#/Relationship%20Queries/StreamedListObjects) | **POST** /stores/{store_id}/streamed-list-objects | Stream all objects of the given type that the user has a relation with (Node.js only) | +{{/supportsStreamedListObjects}}| [**ReadAssertions**]({{apiDocsUrl}}#/Assertions/ReadAssertions) | **GET** /stores/{store_id}/assertions/{authorization_model_id} | Read assertions for an authorization model ID | | [**WriteAssertions**]({{apiDocsUrl}}#/Assertions/WriteAssertions) | **PUT** /stores/{store_id}/assertions/{authorization_model_id} | Upsert assertions for an authorization model ID | diff --git a/config/clients/js/template/README_calling_api.mustache b/config/clients/js/template/README_calling_api.mustache index bde283af8..bb3a9859d 100644 --- a/config/clients/js/template/README_calling_api.mustache +++ b/config/clients/js/template/README_calling_api.mustache @@ -466,6 +466,28 @@ const response = await fgaClient.listObjects({ // response.objects = ["document:0192ab2a-d83f-756d-9397-c5ed9f3cb69a"] ``` +{{#supportsStreamedListObjects}} +##### Streamed List Objects + +List the objects of a particular type that the user has a certain relation to, using the streaming API. + +> **Note**: This is a Node.js-only feature. The streaming API allows you to retrieve more than the standard 1000 object limit. + +[API Documentation]({{apiDocsUrl}}#/Relationship%20Queries/StreamedListObjects) + +```javascript +const fgaClient = new OpenFgaClient({ apiUrl: "http://localhost:8080", storeId: "01H0H015178Y2V4CX10C2KGHF4" }); + +for await (const response of fgaClient.streamedListObjects({ + user: "user:anne", + relation: "owner", + type: "document" +})) { + console.log(response.object); +} +``` + +{{/supportsStreamedListObjects}} ##### List Relations List the relations a user has with an object. This wraps around [BatchCheck](#batchcheck) to allow checking multiple relationships at once. diff --git a/config/clients/js/template/index.mustache b/config/clients/js/template/index.mustache index 8fc668b11..39a6d96c9 100644 --- a/config/clients/js/template/index.mustache +++ b/config/clients/js/template/index.mustache @@ -13,5 +13,6 @@ export * from "./telemetry/counters"; export * from "./telemetry/histograms"; export * from "./telemetry/metrics"; export * from "./errors"; +export { parseNDJSONStream } from "./streaming"; {{#withSeparateModelsAndApi}}export * from "./{{tsModelPackage}}";{{/withSeparateModelsAndApi}} diff --git a/config/clients/js/template/streaming.mustache b/config/clients/js/template/streaming.mustache new file mode 100644 index 000000000..d1d13050c --- /dev/null +++ b/config/clients/js/template/streaming.mustache @@ -0,0 +1,176 @@ +{{>partial_header}} + +import type { Readable } from "node:stream"; + +// Helper: create async iterable from classic EventEmitter-style Readable streams +const createAsyncIterableFromReadable = (readable: any): AsyncIterable => { + return { + [Symbol.asyncIterator](): AsyncIterator { + const chunkQueue: any[] = []; + const pendings: Array<{ resolve: (v: IteratorResult) => void; reject: (e?: any) => void }> = []; + let ended = false; + let error: any = null; + + const onData = (chunk: any) => { + if (pendings.length > 0) { + const { resolve } = pendings.shift()!; + resolve({ value: chunk, done: false }); + } else { + chunkQueue.push(chunk); + } + }; + + const onEnd = () => { + if (error) return; // Don't process end if error already occurred + ended = true; + while (pendings.length > 0) { + const { resolve } = pendings.shift()!; + resolve({ value: undefined, done: true }); + } + }; + + const onError = (err: any) => { + error = err; + while (pendings.length > 0) { + const { reject } = pendings.shift()!; + reject(err); + } + cleanup(); + }; + + readable.on("data", onData); + readable.once("end", onEnd); + readable.once("error", onError); + + const cleanup = () => { + readable.off("data", onData); + readable.off("end", onEnd); + readable.off("error", onError); + }; + + return { + next(): Promise> { + if (error) { + return Promise.reject(error); + } + if (chunkQueue.length > 0) { + const value = chunkQueue.shift(); + return Promise.resolve({ value, done: false }); + } + if (ended) { + cleanup(); + return Promise.resolve({ value: undefined, done: true }); + } + return new Promise>((resolve, reject) => { + pendings.push({ resolve, reject }); + }); + }, + return(): Promise> { + try { + cleanup(); + } finally { + if (readable && typeof readable.destroy === "function") { + readable.destroy(); + } + } + return Promise.resolve({ value: undefined, done: true }); + }, + throw(e?: any): Promise> { + try { + cleanup(); + } finally { + if (readable && typeof readable.destroy === "function") { + readable.destroy(e); + } + } + return Promise.reject(e); + } + }; + } + }; +}; + +/** + * Parse newline-delimited JSON (NDJSON) from a Node.js readable stream + * @param stream - Node.js readable stream, AsyncIterable, string, or Buffer + * @returns AsyncGenerator that yields parsed JSON objects + */ +export async function* parseNDJSONStream( + stream: Readable | AsyncIterable | string | Uint8Array | Buffer +): AsyncGenerator { + const decoder = new TextDecoder("utf-8"); + let buffer = ""; + + // If stream is actually a string or Buffer-like, handle as whole payload + const isString = typeof stream === "string"; + const isBuffer = typeof Buffer !== "undefined" && Buffer.isBuffer && Buffer.isBuffer(stream); + const isUint8Array = typeof Uint8Array !== "undefined" && stream instanceof Uint8Array; + + if (isString || isBuffer || isUint8Array) { + const text = isString + ? (stream as string) + : decoder.decode(isBuffer ? new Uint8Array(stream as Buffer) : (stream as Uint8Array)); + const lines = text.split("\n"); + + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) { + continue; + } + + try { + yield JSON.parse(trimmed); + } catch (err) { + console.warn("Failed to parse JSON line:", err); + } + } + return; + } + + const isAsyncIterable = stream && typeof (stream as any)[Symbol.asyncIterator] === "function"; + const source: AsyncIterable = isAsyncIterable ? (stream as any) : createAsyncIterableFromReadable(stream as any); + + for await (const chunk of source) { + // Node.js streams can return Buffer or string chunks + // Convert to Uint8Array if needed for TextDecoder + const uint8Chunk = typeof chunk === "string" + ? new TextEncoder().encode(chunk) + : chunk instanceof Buffer + ? new Uint8Array(chunk) + : chunk; + + // Append decoded chunk to buffer + buffer += decoder.decode(uint8Chunk, { stream: true }); + + // Split on newlines + const lines = buffer.split("\n"); + + // Keep the last (potentially incomplete) line in the buffer + buffer = lines.pop() || ""; + + // Parse and yield complete lines + for (const line of lines) { + const trimmed = line.trim(); + if (trimmed) { + try { + yield JSON.parse(trimmed); + } catch (err) { + console.warn("Failed to parse JSON line:", err); + } + } + } + } + + // Flush any remaining decoder state + buffer += decoder.decode(); + + // Handle any remaining data in buffer + if (buffer.trim()) { + try { + yield JSON.parse(buffer); + } catch (err) { + console.warn("Failed to parse final JSON buffer:", err); + } + } +} +