diff --git a/config/clients/js/config.overrides.json b/config/clients/js/config.overrides.json
index cd63c056..9540953d 100644
--- a/config/clients/js/config.overrides.json
+++ b/config/clients/js/config.overrides.json
@@ -9,6 +9,7 @@
   "fossaComplianceNoticeId": "9c7d9da4-2a75-47c9-bfc9-31b301fb764f",
   "useSingleRequestParameter": false,
   "supportsES6": true,
+  "supportsStreamedListObjects": true,
   "modelPropertyNaming": "original",
   "openTelemetryDocumentation": "docs/opentelemetry.md",
   "files": {
@@ -23,6 +24,10 @@
     "npmrc.mustache": {
       "destinationFilename": ".npmrc",
       "templateType": "SupportingFiles"
+    },
+    "streaming.mustache": {
+      "destinationFilename": "streaming.ts",
+      "templateType": "SupportingFiles"
     }
   }
 }
diff --git a/config/clients/js/template/README_api_endpoints.mustache b/config/clients/js/template/README_api_endpoints.mustache
index 7cd892cd..59d526ea 100644
--- a/config/clients/js/template/README_api_endpoints.mustache
+++ b/config/clients/js/template/README_api_endpoints.mustache
@@ -14,5 +14,6 @@
 | [**BatchCheck**]({{apiDocsUrl}}#/Relationship%20Queries/BatchCheck) | **POST** /stores/{store_id}/batch-check | Similar to check, but accepts a list of relations to check |
 | [**Expand**]({{apiDocsUrl}}#/Relationship%20Queries/Expand) | **POST** /stores/{store_id}/expand | Expand all relationships in userset tree format, and following userset rewrite rules. Useful to reason about and debug a certain relationship |
 | [**ListObjects**]({{apiDocsUrl}}#/Relationship%20Queries/ListObjects) | **POST** /stores/{store_id}/list-objects | [EXPERIMENTAL] Get all objects of the given type that the user has a relation with |
-| [**ReadAssertions**]({{apiDocsUrl}}#/Assertions/ReadAssertions) | **GET** /stores/{store_id}/assertions/{authorization_model_id} | Read assertions for an authorization model ID |
+{{#supportsStreamedListObjects}}| [**StreamedListObjects**]({{apiDocsUrl}}#/Relationship%20Queries/StreamedListObjects) | **POST** /stores/{store_id}/streamed-list-objects | Stream all objects of the given type that the user has a relation with (Node.js only) |
+{{/supportsStreamedListObjects}}| [**ReadAssertions**]({{apiDocsUrl}}#/Assertions/ReadAssertions) | **GET** /stores/{store_id}/assertions/{authorization_model_id} | Read assertions for an authorization model ID |
 | [**WriteAssertions**]({{apiDocsUrl}}#/Assertions/WriteAssertions) | **PUT** /stores/{store_id}/assertions/{authorization_model_id} | Upsert assertions for an authorization model ID |
diff --git a/config/clients/js/template/README_calling_api.mustache b/config/clients/js/template/README_calling_api.mustache
index bde283af..bb3a9859 100644
--- a/config/clients/js/template/README_calling_api.mustache
+++ b/config/clients/js/template/README_calling_api.mustache
@@ -466,6 +466,28 @@ const response = await fgaClient.listObjects({
 // response.objects = ["document:0192ab2a-d83f-756d-9397-c5ed9f3cb69a"]
 ```
 
+{{#supportsStreamedListObjects}}
+##### Streamed List Objects
+
+List the objects of a particular type that the user has a certain relation to, using the streaming API.
+
+> **Note**: This is a Node.js-only feature. The streaming API allows you to retrieve more than the standard 1000 object limit.
+
+[API Documentation]({{apiDocsUrl}}#/Relationship%20Queries/StreamedListObjects)
+
+```javascript
+const fgaClient = new OpenFgaClient({ apiUrl: "http://localhost:8080", storeId: "01H0H015178Y2V4CX10C2KGHF4" });
+
+for await (const response of fgaClient.streamedListObjects({
+  user: "user:anne",
+  relation: "owner",
+  type: "document"
+})) {
+  console.log(response.object);
+}
+```
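+
+If you need the complete result set before continuing, the stream can simply be collected into an array. A minimal sketch building on the example above (each yielded response carries a single `object`):
+
+```javascript
+const objects = [];
+for await (const response of fgaClient.streamedListObjects({
+  user: "user:anne",
+  relation: "owner",
+  type: "document"
+})) {
+  objects.push(response.object);
+}
+// objects now holds every object the server streamed back
+```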
+
+{{/supportsStreamedListObjects}}
 ##### List Relations
 
 List the relations a user has with an object. This wraps around [BatchCheck](#batchcheck) to allow checking multiple relationships at once.
diff --git a/config/clients/js/template/index.mustache b/config/clients/js/template/index.mustache
index 8fc668b1..39a6d96c 100644
--- a/config/clients/js/template/index.mustache
+++ b/config/clients/js/template/index.mustache
@@ -13,5 +13,6 @@
 export * from "./telemetry/counters";
 export * from "./telemetry/histograms";
 export * from "./telemetry/metrics";
 export * from "./errors";
+export { parseNDJSONStream } from "./streaming";
 {{#withSeparateModelsAndApi}}export * from "./{{tsModelPackage}}";{{/withSeparateModelsAndApi}}
diff --git a/config/clients/js/template/streaming.mustache b/config/clients/js/template/streaming.mustache
new file mode 100644
index 00000000..d1d13050
--- /dev/null
+++ b/config/clients/js/template/streaming.mustache
@@ -0,0 +1,176 @@
+{{>partial_header}}
+
+import type { Readable } from "node:stream";
+
+// Helper: create an async iterable from classic EventEmitter-style Readable streams
+const createAsyncIterableFromReadable = (readable: any): AsyncIterable<any> => {
+  return {
+    [Symbol.asyncIterator](): AsyncIterator<any> {
+      const chunkQueue: any[] = [];
+      const pendings: Array<{ resolve: (v: IteratorResult<any>) => void; reject: (e?: any) => void }> = [];
+      let ended = false;
+      let error: any = null;
+
+      const onData = (chunk: any) => {
+        if (pendings.length > 0) {
+          const { resolve } = pendings.shift()!;
+          resolve({ value: chunk, done: false });
+        } else {
+          chunkQueue.push(chunk);
+        }
+      };
+
+      const onEnd = () => {
+        if (error) return; // Don't process end if an error already occurred
+        ended = true;
+        while (pendings.length > 0) {
+          const { resolve } = pendings.shift()!;
+          resolve({ value: undefined, done: true });
+        }
+      };
+
+      const onError = (err: any) => {
+        error = err;
+        while (pendings.length > 0) {
+          const { reject } = pendings.shift()!;
+          reject(err);
+        }
+        cleanup();
+      };
+
+      readable.on("data", onData);
+      readable.once("end", onEnd);
+      readable.once("error", onError);
+
+      const cleanup = () => {
+        readable.off("data", onData);
+        readable.off("end", onEnd);
+        readable.off("error", onError);
+      };
+
+      return {
+        next(): Promise<IteratorResult<any>> {
+          if (error) {
+            return Promise.reject(error);
+          }
+          if (chunkQueue.length > 0) {
+            const value = chunkQueue.shift();
+            return Promise.resolve({ value, done: false });
+          }
+          if (ended) {
+            cleanup();
+            return Promise.resolve({ value: undefined, done: true });
+          }
+          return new Promise<IteratorResult<any>>((resolve, reject) => {
+            pendings.push({ resolve, reject });
+          });
+        },
+        return(): Promise<IteratorResult<any>> {
+          try {
+            cleanup();
+          } finally {
+            if (readable && typeof readable.destroy === "function") {
+              readable.destroy();
+            }
+          }
+          return Promise.resolve({ value: undefined, done: true });
+        },
+        throw(e?: any): Promise<IteratorResult<any>> {
+          try {
+            cleanup();
+          } finally {
+            if (readable && typeof readable.destroy === "function") {
+              readable.destroy(e);
+            }
+          }
+          return Promise.reject(e);
+        }
+      };
+    }
+  };
+};
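+
+// Illustrative usage of the helper above (a sketch only, not part of the public API):
+// it lets a classic event-based Readable be consumed with `for await`.
+//
+//   const iterable = createAsyncIterableFromReadable(legacyReadable);
+//   for await (const chunk of iterable) {
+//     handleChunk(chunk); // hypothetical consumer
+//   }
+//
+// Streams that already implement Symbol.asyncIterator are used directly by
+// parseNDJSONStream below and never pass through this helper.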
+
+/**
+ * Parse newline-delimited JSON (NDJSON) from a Node.js readable stream
+ * @param stream - Node.js readable stream, AsyncIterable, string, or Buffer
+ * @returns AsyncGenerator that yields parsed JSON objects
+ */
+export async function* parseNDJSONStream(
+  stream: Readable | AsyncIterable<any> | string | Uint8Array | Buffer
+): AsyncGenerator<any> {
+  const decoder = new TextDecoder("utf-8");
+  let buffer = "";
+
+  // If the stream is actually a string or Buffer-like value, handle it as a whole payload
+  const isString = typeof stream === "string";
+  const isBuffer = typeof Buffer !== "undefined" && Buffer.isBuffer && Buffer.isBuffer(stream);
+  const isUint8Array = typeof Uint8Array !== "undefined" && stream instanceof Uint8Array;
+
+  if (isString || isBuffer || isUint8Array) {
+    const text = isString
+      ? (stream as string)
+      : decoder.decode(isBuffer ? new Uint8Array(stream as Buffer) : (stream as Uint8Array));
+    const lines = text.split("\n");
+
+    for (const line of lines) {
+      const trimmed = line.trim();
+      if (!trimmed) {
+        continue;
+      }
+
+      try {
+        yield JSON.parse(trimmed);
+      } catch (err) {
+        console.warn("Failed to parse JSON line:", err);
+      }
+    }
+    return;
+  }
+
+  const isAsyncIterable = stream && typeof (stream as any)[Symbol.asyncIterator] === "function";
+  const source: AsyncIterable<any> = isAsyncIterable ? (stream as any) : createAsyncIterableFromReadable(stream as any);
+
+  for await (const chunk of source) {
+    // Node.js streams can emit Buffer or string chunks;
+    // convert to Uint8Array if needed for TextDecoder
+    const uint8Chunk = typeof chunk === "string"
+      ? new TextEncoder().encode(chunk)
+      : chunk instanceof Buffer
+        ? new Uint8Array(chunk)
+        : chunk;
+
+    // Append the decoded chunk to the buffer
+    buffer += decoder.decode(uint8Chunk, { stream: true });
+
+    // Split on newlines
+    const lines = buffer.split("\n");
+
+    // Keep the last (potentially incomplete) line in the buffer
+    buffer = lines.pop() || "";
+
+    // Parse and yield complete lines
+    for (const line of lines) {
+      const trimmed = line.trim();
+      if (trimmed) {
+        try {
+          yield JSON.parse(trimmed);
+        } catch (err) {
+          console.warn("Failed to parse JSON line:", err);
+        }
+      }
+    }
+  }
+
+  // Flush any remaining decoder state
+  buffer += decoder.decode();
+
+  // Handle any remaining data in the buffer
+  if (buffer.trim()) {
+    try {
+      yield JSON.parse(buffer);
+    } catch (err) {
+      console.warn("Failed to parse final JSON buffer:", err);
+    }
+  }
+}
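+
+// Usage sketch (illustrative only; the NDJSON payload below is hypothetical):
+//
+//   for await (const obj of parseNDJSONStream('{"object":"document:1"}\n{"object":"document:2"}\n')) {
+//     console.log(obj.object); // "document:1", then "document:2"
+//   }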