-
Notifications
You must be signed in to change notification settings - Fork 25
feat: add streamedListObjects for unlimited object retrieval #280
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
Important Review skippedAuto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the WalkthroughThis PR adds a new streaming variant of the ListObjects API to the JavaScript SDK. It includes a new Changes
Sequence Diagram(s)sequenceDiagram
actor User
participant Client as OpenFgaClient
participant API as OpenFgaApi
participant HTTP as axios
participant Server as OpenFGA Server
participant Parser as parseNDJSONStream
User->>Client: streamedListObjects(request)
Client->>API: streamedListObjects()
API->>HTTP: GET /stores/{id}/streamed-list-objects<br/>(responseType: stream, telemetry)
HTTP->>Server: Request with streaming
Server-->>HTTP: NDJSON stream (line-delimited JSON)
HTTP-->>API: Response stream
API-->>Client: Response.data (stream)
Client->>Parser: parseNDJSONStream(stream)
loop For each line in stream
Parser->>Parser: UTF-8 decode, buffer, split on \\n
Parser->>Parser: Parse JSON object
Parser-->>Client: yield StreamedListObjectsResponse
end
Client-->>User: AsyncGenerator<StreamedListObjectsResponse>
User->>User: Iterate and process results
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~35 minutes
Possibly related issues
Suggested reviewers
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #280 +/- ##
==========================================
+ Coverage 89.45% 89.56% +0.11%
==========================================
Files 24 25 +1
Lines 1299 1447 +148
Branches 234 243 +9
==========================================
+ Hits 1162 1296 +134
- Misses 81 91 +10
- Partials 56 60 +4 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
🧹 Nitpick comments (10)
apiModel.ts (1)
864-876: Clarify docstring: SDK-yielded type vs wire shape.The client parses NDJSON lines like
{ result: { object } }and yields{ object }. Consider adjusting the comment to “Element yielded by streamedListObjects” to avoid implying this is the on-the-wire shape. Otherwise the interface looks good.example/streamed-list-objects-local/README.md (1)
5-7: Align Node version and add browser note.
- Change “Node.js 18+” to match SDK minimum (e.g., “Node.js ≥16.15.0”, unless 18+ is explicitly required here).
- Add: “Not supported in browsers; uses Node Readable streams.”
Please confirm whether any example dependency requires Node 18+ specifically (e.g., APIs not in Node 16.15).
common.ts (1)
353-418: Harden streaming request: headers default, NDJSON Accept, cancellation, telemetry parity.
- Ensure
headersexists before auth header injection.- Set
Accept: application/x-ndjsonif not provided.- Allow
AbortSignalpassthrough (so callers can cancel).- Optionally record request-body attributes (parity with non-stream paths, if/when added there).
Apply this diff:
export const createStreamingRequestFunction = function (axiosArgs: RequestArgs, axiosInstance: AxiosInstance, configuration: Configuration, credentials: Credentials, methodAttributes: Record<string, string | number> = {}) { configuration.isValid(); @@ - return async (axios: AxiosInstance = axiosInstance): Promise<any> => { - await setBearerAuthToObject(axiosArgs.options.headers, credentials!); + return async (axios: AxiosInstance = axiosInstance): Promise<any> => { + // ensure headers object + axiosArgs.options.headers = axiosArgs.options.headers || {}; + await setBearerAuthToObject(axiosArgs.options.headers, credentials!); @@ - const axiosRequestArgs = { ...axiosArgs.options, responseType: "stream", url: url }; + // default NDJSON accept unless caller overrides + if (!axiosArgs.options.headers["Accept"]) { + axiosArgs.options.headers["Accept"] = "application/x-ndjson"; + } + const axiosRequestArgs = { ...axiosArgs.options, responseType: "stream", url }; @@ - attributes = TelemetryAttributes.fromResponse({ + attributes = TelemetryAttributes.fromResponse({ response, attributes, });Optional: If you add
signal?: AbortSignalto client options, pass it viaaxiosArgs.options.signal.Please confirm the server’s preferred Accept header for streamed list objects (e.g.,
application/x-ndjson). If different, we should set it accordingly.tests/helpers/nocks.ts (1)
249-264: Simulate chunked NDJSON to better exercise the parser.Current mock emits a single large chunk. Emit smaller chunks to surface boundary/chunking bugs in tests.
- return nock(basePath) - .post(`/stores/${storeId}/streamed-list-objects`) - .reply(200, () => Readable.from([ndjsonResponse]), { - "Content-Type": "application/x-ndjson" - }); + return nock(basePath) + .post(`/stores/${storeId}/streamed-list-objects`) + .reply(200, () => { + // send ~32-byte chunks to simulate real streaming + const chunks = Array.from(ndjsonResponse.matchAll(/.{1,32}/gs), m => m[0]); + return Readable.from(chunks); + }, { + "Content-Type": "application/x-ndjson" + });example/streamed-list-objects-local/streamedListObjectsLocal.mjs (1)
55-57: Ensure store cleanup in a finally block.If an earlier step throws, the store may be orphaned. Move deleteStore into finally and guard on storeId.
Example:
let storeId; try { // ... create store, set storeId, do work ... } finally { if (storeId) { await new OpenFgaClient(new ClientConfiguration({ apiUrl, storeId })).deleteStore().catch(() => {}); } }api.ts (2)
387-425: Be explicit about NDJSON in request headers.Setting Accept helps interoperability and test clarity.
- const localVarHeaderParameter = {} as any; + const localVarHeaderParameter = {} as any; + localVarHeaderParameter["Accept"] = "application/x-ndjson";
955-970: Align telemetry with other methods.Include store id and body-derived attributes for consistency.
- return createStreamingRequestFunction(localVarAxiosArgs, globalAxios, configuration, credentials, { - [TelemetryAttribute.FgaClientRequestMethod]: "StreamedListObjects" - }); + return createStreamingRequestFunction(localVarAxiosArgs, globalAxios, configuration, credentials, { + [TelemetryAttribute.FgaClientRequestMethod]: "StreamedListObjects", + [TelemetryAttribute.FgaClientRequestStoreId]: storeId ?? "", + ...TelemetryAttributes.fromRequestBody(body), + });example/streamed-list-objects/README.md (1)
16-22: Doc: mention building the SDK or using the published package.Examples import from ../../dist; advise building first or installing from npm.
Suggested snippet:
# From repo root pnpm install pnpm build cd example/streamed-list-objects node streamedListObjects.mjsexample/streamed-list-objects/streamedListObjects.mjs (2)
19-36: writeTuples drops remainder; handle non-multiples of 100.If quantity isn’t divisible by 100, the tail is lost.
- const chunks = Math.floor(quantity / 100); + const chunks = Math.floor(quantity / 100); + const remainder = quantity % 100; @@ for (let chunk = 0; chunk < chunks; ++chunk) { const tuples = []; for (let t = 0; t < 100; ++t) { tuples.push({ user: "user:anne", relation: "owner", object: `document:${chunk * 100 + t}` }); } await fgaClient.writeTuples(tuples); } + if (remainder) { + const tuples = []; + for (let t = 0; t < remainder; ++t) { + tuples.push({ + user: "user:anne", + relation: "owner", + object: `document:${chunks * 100 + t}` + }); + } + await fgaClient.writeTuples(tuples); + }
94-99: Ensure cleanup in finally.Move deleteStore into finally and guard on storeId to avoid orphaned stores on errors.
Example:
let storeId; try { storeId = await createStore(fgaClient); // ... } finally { if (storeId) await fgaClient.deleteStore().catch(() => {}); }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (14)
CHANGELOG.md(1 hunks)api.ts(5 hunks)apiModel.ts(1 hunks)client.ts(3 hunks)common.ts(1 hunks)example/streamed-list-objects-local/README.md(1 hunks)example/streamed-list-objects-local/streamedListObjectsLocal.mjs(1 hunks)example/streamed-list-objects/README.md(1 hunks)example/streamed-list-objects/model.json(1 hunks)example/streamed-list-objects/streamedListObjects.mjs(1 hunks)index.ts(1 hunks)streaming.ts(1 hunks)tests/helpers/nocks.ts(2 hunks)tests/streaming.test.ts(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (8)
example/streamed-list-objects/streamedListObjects.mjs (2)
client.ts (4)
createStore(324-326)writeTuples(568-572)streamedListObjects(825-845)listObjects(797-807)example/streamed-list-objects-local/streamedListObjectsLocal.mjs (2)
model(13-29)storeId(9-9)
tests/helpers/nocks.ts (1)
tests/helpers/default-config.ts (1)
defaultConfiguration(56-56)
streaming.ts (1)
index.ts (1)
parseNDJSONStream(27-27)
client.ts (2)
apiModel.ts (1)
StreamedListObjectsResponse(869-876)streaming.ts (1)
parseNDJSONStream(96-168)
api.ts (3)
apiModel.ts (1)
ListObjectsRequest(804-847)common.ts (6)
RequestArgs(30-33)DUMMY_BASE_URL(23-23)setSearchParams(51-66)serializeDataIfNeeded(88-96)toPathString(102-104)createStreamingRequestFunction(353-418)validation.ts (1)
assertParamExists(8-12)
example/streamed-list-objects-local/streamedListObjectsLocal.mjs (1)
client.ts (2)
OpenFgaClient(233-945)ClientConfiguration(59-92)
tests/streaming.test.ts (1)
streaming.ts (1)
parseNDJSONStream(96-168)
common.ts (4)
base.ts (1)
RequestArgs(15-18)configuration.ts (1)
Configuration(58-202)telemetry/attributes.ts (1)
TelemetryAttributes(35-130)telemetry/histograms.ts (1)
TelemetryHistograms(20-32)
🔇 Additional comments (10)
client.ts (1)
24-25: Imports look correct.Also applies to: 52-53
common.ts (1)
345-347: No functional change here.tests/helpers/nocks.ts (1)
3-4: LGTM: Node stream import fits the streaming mock.api.ts (1)
1442-1454: OO wrapper wiring looks correct.index.ts (1)
27-27: LGTM: public export of parseNDJSONStream.example/streamed-list-objects/model.json (1)
1-251: LGTM: model is coherent and matches example relations.tests/streaming.test.ts (4)
14-19: LGTM! Well-organized test suite structure.The imports and test suite organization are clean and appropriate for testing the NDJSON streaming parser.
20-96: Excellent coverage of core parsing scenarios.These tests cover the fundamental NDJSON parsing cases effectively. The chunked data test (lines 48-65) is particularly valuable as it validates the buffering logic when JSON objects are split across stream chunks—a critical real-world scenario.
98-162: Comprehensive input type coverage.These tests validate the parser's flexibility with different input types (Buffer, string, async iterable). The test for JSON without a trailing newline (lines 109-117) is particularly important as it validates the final buffer flush logic. The
as anytype casts are acceptable here to test the parser's runtime behavior with various input types.
164-257: Thorough async generator protocol and cleanup testing.These tests rigorously validate the async iterator implementation, including error handling, early cancellation, buffering behavior, and proper cleanup of event listeners. The listener cleanup assertions (lines 193-195, 241-243) are particularly valuable for preventing memory leaks. While these tests exercise internal implementation details, the guarantees they provide about correctness and resource management justify their inclusion.
Updates JavaScript SDK templates to support the streaming API endpoint for unlimited object retrieval. Templates now handle streaming operations differently using vendor extension conditionals. Changes: - Add streaming.mustache template with NDJSON parser for Node.js - Update api.mustache to import createStreamingRequestFunction - Update apiInner.mustache with x-fga-streaming vendor extension logic - Uses createStreamingRequestFunction for streaming ops - Returns Promise<any> instead of PromiseResult<T> - Simplified telemetry (method name only) - Update index.mustache to export parseNDJSONStream - Update config.overrides.json with streaming file + feature flag - Add README documentation for Streamed List Objects API - Update API endpoints table with streaming endpoint Implementation: - Conditional template logic based on x-fga-streaming vendor extension - Preserves telemetry while returning raw Node.js stream - Aligned with Python SDK template patterns Dependencies: - Requires x-fga-streaming: true in OpenAPI spec (openfga/api) Related: - Fixes #76 (JavaScript SDK) - Implements openfga/js-sdk#236 - Related PR: openfga/js-sdk#280
Updates JavaScript SDK templates to support the streaming API endpoint for unlimited object retrieval. Templates now handle streaming operations differently using vendor extension conditionals. Changes: - Add streaming.mustache template with NDJSON parser for Node.js - Update api.mustache to import createStreamingRequestFunction - Update apiInner.mustache with x-fga-streaming vendor extension logic - Uses createStreamingRequestFunction for streaming ops - Returns Promise<any> instead of PromiseResult<T> - Simplified telemetry (method name only) - Update index.mustache to export parseNDJSONStream - Update config.overrides.json with streaming file + feature flag - Add README documentation for Streamed List Objects API - Update API endpoints table with streaming endpoint Implementation: - Conditional template logic based on x-fga-streaming vendor extension - Preserves telemetry while returning raw Node.js stream - Aligned with Python SDK template patterns - Fixed error propagation in async iterator adapter - Widened parseNDJSONStream type signature for better DX Dependencies: - Requires x-fga-streaming: true in OpenAPI spec (openfga/api) Related: - Fixes #76 (JavaScript SDK) - Implements openfga/js-sdk#236 - Related PR: openfga/js-sdk#280
Updates JavaScript SDK templates to support the streaming API endpoint for unlimited object retrieval. Templates now handle streaming operations differently using vendor extension conditionals. Templates Modified (7 files): - Add streaming.mustache template with NDJSON parser for Node.js - Update api.mustache to import createStreamingRequestFunction - Update apiInner.mustache with x-fga-streaming vendor extension logic - Uses createStreamingRequestFunction for streaming ops - Returns Promise<any> instead of PromiseResult<T> - Simplified telemetry (method name only) - Update index.mustache to export parseNDJSONStream - Update config.overrides.json with streaming file + feature flag - Add README_calling_api.mustache documentation for Streamed List Objects - Add README_api_endpoints.mustache table entry for streaming endpoint Implementation: - Conditional template logic based on x-fga-streaming vendor extension - Preserves telemetry while returning raw Node.js stream - Aligned with Python SDK template patterns - Fixed error propagation in async iterator adapter - Widened parseNDJSONStream type signature for better DX - Added guard to prevent onEnd processing after error state Generated SDK Verification: - ✅ streaming.ts generated with all error handling fixes - ✅ parseNDJSONStream exported from index.ts - ✅ StreamedListObjectsResponse interface in apiModel.ts -⚠️ streamedListObjects method uses regular handling (needs x-fga-streaming: true in spec) Dependencies: - Requires x-fga-streaming: true vendor extension in OpenAPI spec (openfga/api) - Without vendor extension, method is generated but uses wrong request handler Related: - Fixes #76 (JavaScript SDK) - Implements openfga/js-sdk#236 - Related PR: openfga/js-sdk#280
Adds NDJSON streaming parser template to support streamedListObjects in the JavaScript SDK. Templates provide the parsing utility; actual streaming implementation remains in custom code (client.ts, common.ts) in js-sdk repo. Templates Added/Modified (5 files): - streaming.mustache (NEW) - NDJSON parser for Node.js streams - Proper error propagation (pending promises reject on error) - onEnd guard prevents processing after error state - Widened type signature (Readable|AsyncIterable|string|Buffer) - index.mustache - Export parseNDJSONStream utility - config.overrides.json - Register streaming.mustache + supportsStreamedListObjects flag - README_calling_api.mustache - Add Streamed List Objects usage documentation - README_api_endpoints.mustache - Add endpoint to API table Architecture: - Templates generate utilities and basic API methods - Custom code in js-sdk handles actual streaming (client.ts, common.ts) - No OpenAPI spec changes required Tested: - Generated streaming.ts includes all error handling fixes - parseNDJSONStream exported correctly - Custom js-sdk code works with generated utilities Related: - Fixes #76 (JavaScript SDK) - Implements openfga/js-sdk#236 - Related PR: openfga/js-sdk#280
Adds streamedListObjects method for retrieving unlimited objects via the streaming API endpoint. Node.js-only implementation with resilient NDJSON parsing, proper error handling, and automatic resource cleanup. Requires OpenFGA server v1.2.0+ Features: - Streams beyond 1000-object limit (tested with 2000 objects) - Memory-efficient incremental results via async generators - Automatic stream cleanup prevents connection leaks - Proper error propagation through async iterators - Flexible input types (Readable|AsyncIterable|string|Buffer|Uint8Array) - Telemetry maintained through streaming request helper Implementation: - streaming.ts: NDJSON parser with robust error handling - common.ts: createStreamingRequestFunction for axios streaming - client.ts: streamedListObjects() async generator wrapper - Error handling: Pending promises reject on error, onEnd guarded - Resource management: Stream destruction in return()/throw()/finally - Type safety: Wide signature eliminates unnecessary casts Testing (153/153 tests passing): - 17 streaming tests (parsing, errors, cleanup, edge cases) - 95% coverage on streaming.ts - Live tested: 3-object and 2000-object streaming verified Examples: - example/streamed-list-objects: Full model with 2000 tuples - example/streamed-list-objects-local: Minimal local setup Related: - Fixes #236 - Parent issue: openfga/sdk-generator#76 - Related PR: openfga/sdk-generator#654 (templates)
Adds NDJSON streaming parser template to support streamedListObjects in the JavaScript SDK. Templates provide parsing utilities; actual streaming logic remains in custom code (client.ts, common.ts) maintained in js-sdk repository. Templates Added/Modified (5 files): - streaming.mustache (NEW): NDJSON parser for Node.js streams - Proper error propagation (reject pending promises on error) - onEnd guard prevents processing after error - Uint8Array handling alongside string/Buffer - Stream destruction in return()/throw() methods - Widened type signature for better DX - index.mustache: Export parseNDJSONStream utility - config.overrides.json: Register streaming + supportsStreamedListObjects flag - README_calling_api.mustache: Usage documentation - README_api_endpoints.mustache: API endpoint table entry Architecture: - Templates generate utilities (streaming.ts, exports) - Custom js-sdk code implements streaming (common.ts, client.ts) - No OpenAPI spec changes required Generated & Verified: - streaming.ts includes all error handling fixes - parseNDJSONStream exported correctly - Works with custom js-sdk streaming implementation Related: - Fixes #76 (JavaScript SDK) - Implements openfga/js-sdk#236 - Related PR: openfga/js-sdk#280
3db22bb to
0c2a0ab
Compare
example/streamed-list-objects-local/streamedListObjectsLocal.mjs
Outdated
Show resolved
Hide resolved
example/streamed-list-objects-local/streamedListObjectsLocal.mjs
Outdated
Show resolved
Hide resolved
example/streamed-list-objects-local/streamedListObjectsLocal.mjs
Outdated
Show resolved
Hide resolved
|
Can you also review @SoulPancake's PR here: openfga/go-sdk#252 Ideally both would have the same semantics, example, tests, config options, README, etc.. |
- Simplify to one example using syntax transformer - Add StreamedListObjects documentation to main README - Add 5 client integration tests (streaming, headers, errors, retry, consistency) - Simplify CHANGELOG with links to documentation - Use @openfga/syntax-transformer in example
|
Review the following changes in direct dependencies. Learn more about Socket for GitHub.
|
Add documentation for the StreamedListObjects API to the List Objects guide. Changes: - Add Streamed List Objects section explaining streaming differences - Include Node.js and Python SDK examples - Add note about SDK availability - Add related link to StreamedListObjects API reference Related: - openfga/js-sdk#280 - openfga/sdk-generator#654 - openfga/sdk-generator#76
Switch JavaScript SDK build to use build-client-streamed target (like Python SDK). This includes the streaming endpoints in the generated SDK. Changes: - Makefile: build-client-js uses build-client-streamed instead of build-client This allows the JS SDK to include StreamedListObjects endpoint while custom code in js-sdk repo (common.ts, client.ts) implements the streaming logic. Related: - openfga/js-sdk#280 - #76
- Add detailed explanation of StreamedListObjects - Document async generator pattern - Show early break and cleanup examples - List benefits and performance considerations - Update to reflect 2000 tuples in example
- Update authorization model to show owner/viewer/can_read pattern - Write tuples to base relations (owner, viewer) - Query computed relation (can_read = owner OR viewer) - Demonstrates OpenFGA's value: derived permissions from base relations - Update CHANGELOG to be brief with link to README (per @rhamzeh feedback) - Remove OPENFGA_LIST_OBJECTS_DEADLINE from manually-written docs - Clarify no pagination limit vs server timeout in all documentation - Add detailed explanation of computed relations in example README Addresses feedback from @aaguiarz and @SoulPancake: - openfga/js-sdk#280 (comment) - Shows why StreamedListObjects is valuable for computed relations All tests passing. Example builds successfully.
- Write tuples in batches of 100 (OpenFGA write limit) - Prevents validation error when writing 2000 tuples - Matches JS SDK batching pattern - Verified working with live OpenFGA server Example successfully streams 2000 objects from computed can_read relation.
- Write tuples in batches of 100 (OpenFGA write limit) - Prevents validation error when writing 2000 tuples - Matches JS SDK batching pattern - Verified working with live OpenFGA server Example successfully streams 2000 objects from computed can_read relation.
Resolves CHANGELOG conflict: - Place streamedListObjects in Unreleased section (not yet released) - Keep conflict options feature in v0.9.1 (already released) - Brings in latest workflow updates and package changes from main All 182 tests passing. Example verified working.
- Sanitize error logging to avoid exposing config values - Handle FgaValidationError separately (log field name only) - Keep helpful ECONNREFUSED hint for connection issues - Generic error message for other errors Addresses CodeQL security finding: Clear-text logging of sensitive information All 182 tests passing. Example verified working.
- Don't log err.field (could contain sensitive field names like apiTokenIssuer) - Use fully generic validation error message - Addresses CodeQL: Clear-text logging of sensitive information All 182 tests passing. Example verified working.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR adds streaming support for listing objects through a new streamedListObjects method, which allows retrieving unlimited objects beyond the standard 1000-object pagination limit. The implementation is Node.js-specific and uses NDJSON (newline-delimited JSON) parsing with proper error handling and resource cleanup.
Key changes:
- Implements async generator pattern for streaming object retrieval
- Adds comprehensive NDJSON stream parser supporting multiple input types
- Integrates with existing telemetry, retry, and authentication infrastructure
- Includes thorough test coverage for streaming utilities and client integration
Reviewed changes
Copilot reviewed 14 out of 15 changed files in this pull request and generated 13 comments.
Show a summary per file
| File | Description |
|---|---|
| streaming.ts | Core NDJSON parser with EventEmitter-to-AsyncIterable adapter and multi-format input support |
| client.ts | High-level streamedListObjects method wrapping API call with stream parsing and cleanup |
| api.ts | Low-level API layer implementation with streaming request support |
| apiModel.ts | TypeScript interface definition for streaming response objects |
| common.ts | Streaming-specific request function handling axios stream responses |
| index.ts | Exports parseNDJSONStream utility for external use |
| tests/streaming.test.ts | Comprehensive unit tests for NDJSON parsing including chunking, errors, and cleanup |
| tests/client.test.ts | Integration tests for client method including retry, headers, and errors |
| tests/helpers/nocks.ts | Mock helper for streaming API responses in tests |
| CHANGELOG.md | Documents new feature addition |
| README.md | User-facing documentation with usage examples |
| example/streamed-list-objects/* | Complete working example demonstrating large-scale streaming with 2000 objects |
Files not reviewed (1)
- example/streamed-list-objects/package-lock.json: Language not supported
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| try { | ||
| yield JSON.parse(trimmed); | ||
| } catch (err) { | ||
| console.warn("Failed to parse JSON line:", err); |
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The use of console.warn for error logging is inconsistent with the rest of the codebase, which doesn't use console methods for logging. This could pollute production logs with warnings that may not be actionable by users.
Consider either:
- Silently skipping invalid JSON lines (since the tests show this is expected behavior)
- Using a proper error handling mechanism that allows library consumers to handle these cases
If these warnings are for debugging purposes only, they should be removed. The current behavior already handles invalid JSON gracefully by skipping the line and continuing to parse.
| try { | ||
| yield JSON.parse(trimmed); | ||
| } catch (err) { | ||
| console.warn("Failed to parse JSON line:", err); |
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The use of console.warn for error logging is inconsistent with the rest of the codebase, which doesn't use console methods for logging. This could pollute production logs with warnings that may not be actionable by users.
Consider either:
- Silently skipping invalid JSON lines (since the tests show this is expected behavior)
- Using a proper error handling mechanism that allows library consumers to handle these cases
If these warnings are for debugging purposes only, they should be removed. The current behavior already handles invalid JSON gracefully by skipping the line and continuing to parse.
| try { | ||
| yield JSON.parse(buffer); | ||
| } catch (err) { | ||
| console.warn("Failed to parse final JSON buffer:", err); |
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The use of console.warn for error logging is inconsistent with the rest of the codebase, which doesn't use console methods for logging. This could pollute production logs with warnings that may not be actionable by users.
Consider either:
- Silently skipping invalid JSON lines (since the tests show this is expected behavior)
- Using a proper error handling mechanism that allows library consumers to handle these cases
If these warnings are for debugging purposes only, they should be removed. The current behavior already handles invalid JSON gracefully by skipping the line and continuing to parse.
| const { reject } = pendings.shift()!; | ||
| reject(err); | ||
| } | ||
| cleanup(); |
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error handling in createAsyncIterableFromReadable has a logical issue. When an error occurs, it calls cleanup() (line 48), but the cleanup function tries to remove listeners including the error listener itself (lines 56-58). Since the error event uses .once() (line 53), it's already been removed by Node.js after firing, which could cause issues.
Additionally, after an error, subsequent calls to next() will return Promise.reject(error) (line 64), but the stream might not be properly destroyed. Consider adding stream destruction in the onError handler:
const onError = (err: any) => {
error = err;
while (pendings.length > 0) {
const { reject } = pendings.shift()!;
reject(err);
}
cleanup();
if (readable && typeof readable.destroy === "function") {
readable.destroy(err);
}
};| cleanup(); | |
| cleanup(); | |
| if (readable && typeof readable.destroy === "function") { | |
| readable.destroy(err); | |
| } |
|
|
||
| // If stream is actually a string or Buffer-like, handle as whole payload | ||
| const isString = typeof stream === "string"; | ||
| const isBuffer = typeof Buffer !== "undefined" && Buffer.isBuffer && Buffer.isBuffer(stream); |
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The condition check typeof Buffer.isBuffer is redundant. The expression is already checking if Buffer.isBuffer exists, so the second call to Buffer.isBuffer(stream) can directly use it.
Consider simplifying to:
const isBuffer = typeof Buffer !== "undefined" && Buffer.isBuffer?.(stream);This uses optional chaining which is safer and more concise.
| const isBuffer = typeof Buffer !== "undefined" && Buffer.isBuffer && Buffer.isBuffer(stream); | |
| const isBuffer = typeof Buffer !== "undefined" && Buffer.isBuffer?.(stream); |
| describe("StreamedListObjects", () => { | ||
| it("should stream objects and yield them incrementally", async () => { | ||
| const objects = ["document:1", "document:2", "document:3"]; | ||
| const scope = nocks.streamedListObjects(baseConfig.storeId!, objects); | ||
|
|
||
| expect(scope.isDone()).toBe(false); | ||
|
|
||
| const results: string[] = []; | ||
| for await (const response of fgaClient.streamedListObjects({ | ||
| user: "user:81684243-9356-4421-8fbf-a4f8d36aa31b", | ||
| relation: "can_read", | ||
| type: "document", | ||
| })) { | ||
| results.push(response.object); | ||
| } | ||
|
|
||
| expect(scope.isDone()).toBe(true); | ||
| expect(results).toHaveLength(3); | ||
| expect(results).toEqual(expect.arrayContaining(objects)); | ||
| }); | ||
|
|
||
| it("should handle custom headers", async () => { | ||
| const objects = ["document:1"]; | ||
|
|
||
| const scope = nock(defaultConfiguration.getBasePath()) | ||
| .post(`/stores/${baseConfig.storeId}/streamed-list-objects`) | ||
| .reply(function () { | ||
| // Verify custom headers were sent | ||
| expect(this.req.headers["x-custom-header"]).toBe("custom-value"); | ||
| expect(this.req.headers["x-request-id"]).toBe("test-123"); | ||
|
|
||
| // Return NDJSON stream | ||
| const ndjsonResponse = objects | ||
| .map(obj => JSON.stringify({ result: { object: obj } })) | ||
| .join("\n") + "\n"; | ||
|
|
||
| return [200, Readable.from([ndjsonResponse]), { | ||
| "Content-Type": "application/x-ndjson" | ||
| }]; | ||
| }); | ||
|
|
||
| const results: string[] = []; | ||
| for await (const response of fgaClient.streamedListObjects({ | ||
| user: "user:anne", | ||
| relation: "owner", | ||
| type: "document", | ||
| }, { | ||
| headers: { | ||
| "X-Custom-Header": "custom-value", | ||
| "X-Request-ID": "test-123" | ||
| } | ||
| })) { | ||
| results.push(response.object); | ||
| } | ||
|
|
||
| expect(scope.isDone()).toBe(true); | ||
| expect(results).toEqual(objects); | ||
| }); | ||
|
|
||
| it("should handle errors from the stream", async () => { | ||
| const scope = nock(defaultConfiguration.getBasePath()) | ||
| .post(`/stores/${baseConfig.storeId}/streamed-list-objects`) | ||
| .reply(500, { code: "internal_error", message: "Server error" }); | ||
|
|
||
| await expect(async () => { | ||
| for await (const response of fgaClient.streamedListObjects({ | ||
| user: "user:anne", | ||
| relation: "owner", | ||
| type: "document", | ||
| })) { | ||
| // Should not get here | ||
| } | ||
| }).rejects.toThrow(); | ||
|
|
||
| expect(scope.isDone()).toBe(true); | ||
| }); | ||
|
|
||
| it("should handle retry on 429 error", async () => { | ||
| const objects = ["document:1"]; | ||
|
|
||
| // Create client with retry enabled | ||
| const fgaClientWithRetry = new OpenFgaClient({ | ||
| ...baseConfig, | ||
| credentials: { method: CredentialsMethod.None }, | ||
| retryParams: { maxRetry: 2, minWaitInMs: 10 } | ||
| }); | ||
|
|
||
| // First attempt fails with 429 (called exactly once) | ||
| const scope1 = nock(defaultConfiguration.getBasePath()) | ||
| .post(`/stores/${baseConfig.storeId}/streamed-list-objects`) | ||
| .times(1) | ||
| .reply(429, { code: "rate_limit_exceeded", message: "Rate limited" }, { | ||
| "Retry-After": "1" | ||
| }); | ||
|
|
||
| // Second attempt succeeds (retry - called exactly once) | ||
| const scope2 = nocks.streamedListObjects(baseConfig.storeId!, objects); | ||
|
|
||
| const results: string[] = []; | ||
| for await (const response of fgaClientWithRetry.streamedListObjects({ | ||
| user: "user:anne", | ||
| relation: "owner", | ||
| type: "document", | ||
| })) { | ||
| results.push(response.object); | ||
| } | ||
|
|
||
| // Verify both scopes were called (proves retry happened) | ||
| expect(scope1.isDone()).toBe(true); | ||
| expect(scope2.isDone()).toBe(true); | ||
| expect(results).toEqual(objects); | ||
| }); | ||
|
|
||
| it("should support consistency preference", async () => { | ||
| const objects = ["document:1"]; | ||
| const scope = nocks.streamedListObjects(baseConfig.storeId!, objects); | ||
|
|
||
| const results: string[] = []; | ||
| for await (const response of fgaClient.streamedListObjects({ | ||
| user: "user:anne", | ||
| relation: "owner", | ||
| type: "document", | ||
| }, { | ||
| consistency: ConsistencyPreference.HigherConsistency | ||
| })) { | ||
| results.push(response.object); | ||
| } | ||
|
|
||
| expect(scope.isDone()).toBe(true); | ||
| expect(results).toEqual(objects); | ||
| }); | ||
| }); |
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The tests for streamedListObjects don't cover the edge case of an empty result set (no objects returned). This is an important scenario to test to ensure the async generator properly completes when the server returns an empty stream.
Consider adding a test case like:
it("should handle empty result set", async () => {
const scope = nocks.streamedListObjects(baseConfig.storeId!, []);
const results: string[] = [];
for await (const response of fgaClient.streamedListObjects({
user: "user:anne",
relation: "owner",
type: "document",
})) {
results.push(response.object);
}
expect(scope.isDone()).toBe(true);
expect(results).toHaveLength(0);
});| describe("StreamedListObjects", () => { | ||
| it("should stream objects and yield them incrementally", async () => { | ||
| const objects = ["document:1", "document:2", "document:3"]; | ||
| const scope = nocks.streamedListObjects(baseConfig.storeId!, objects); | ||
|
|
||
| expect(scope.isDone()).toBe(false); | ||
|
|
||
| const results: string[] = []; | ||
| for await (const response of fgaClient.streamedListObjects({ | ||
| user: "user:81684243-9356-4421-8fbf-a4f8d36aa31b", | ||
| relation: "can_read", | ||
| type: "document", | ||
| })) { | ||
| results.push(response.object); | ||
| } | ||
|
|
||
| expect(scope.isDone()).toBe(true); | ||
| expect(results).toHaveLength(3); | ||
| expect(results).toEqual(expect.arrayContaining(objects)); | ||
| }); | ||
|
|
||
| it("should handle custom headers", async () => { | ||
| const objects = ["document:1"]; | ||
|
|
||
| const scope = nock(defaultConfiguration.getBasePath()) | ||
| .post(`/stores/${baseConfig.storeId}/streamed-list-objects`) | ||
| .reply(function () { | ||
| // Verify custom headers were sent | ||
| expect(this.req.headers["x-custom-header"]).toBe("custom-value"); | ||
| expect(this.req.headers["x-request-id"]).toBe("test-123"); | ||
|
|
||
| // Return NDJSON stream | ||
| const ndjsonResponse = objects | ||
| .map(obj => JSON.stringify({ result: { object: obj } })) | ||
| .join("\n") + "\n"; | ||
|
|
||
| return [200, Readable.from([ndjsonResponse]), { | ||
| "Content-Type": "application/x-ndjson" | ||
| }]; | ||
| }); | ||
|
|
||
| const results: string[] = []; | ||
| for await (const response of fgaClient.streamedListObjects({ | ||
| user: "user:anne", | ||
| relation: "owner", | ||
| type: "document", | ||
| }, { | ||
| headers: { | ||
| "X-Custom-Header": "custom-value", | ||
| "X-Request-ID": "test-123" | ||
| } | ||
| })) { | ||
| results.push(response.object); | ||
| } | ||
|
|
||
| expect(scope.isDone()).toBe(true); | ||
| expect(results).toEqual(objects); | ||
| }); | ||
|
|
||
| it("should handle errors from the stream", async () => { | ||
| const scope = nock(defaultConfiguration.getBasePath()) | ||
| .post(`/stores/${baseConfig.storeId}/streamed-list-objects`) | ||
| .reply(500, { code: "internal_error", message: "Server error" }); | ||
|
|
||
| await expect(async () => { | ||
| for await (const response of fgaClient.streamedListObjects({ | ||
| user: "user:anne", | ||
| relation: "owner", | ||
| type: "document", | ||
| })) { | ||
| // Should not get here | ||
| } | ||
| }).rejects.toThrow(); | ||
|
|
||
| expect(scope.isDone()).toBe(true); | ||
| }); | ||
|
|
||
| it("should handle retry on 429 error", async () => { | ||
| const objects = ["document:1"]; | ||
|
|
||
| // Create client with retry enabled | ||
| const fgaClientWithRetry = new OpenFgaClient({ | ||
| ...baseConfig, | ||
| credentials: { method: CredentialsMethod.None }, | ||
| retryParams: { maxRetry: 2, minWaitInMs: 10 } | ||
| }); | ||
|
|
||
| // First attempt fails with 429 (called exactly once) | ||
| const scope1 = nock(defaultConfiguration.getBasePath()) | ||
| .post(`/stores/${baseConfig.storeId}/streamed-list-objects`) | ||
| .times(1) | ||
| .reply(429, { code: "rate_limit_exceeded", message: "Rate limited" }, { | ||
| "Retry-After": "1" | ||
| }); | ||
|
|
||
| // Second attempt succeeds (retry - called exactly once) | ||
| const scope2 = nocks.streamedListObjects(baseConfig.storeId!, objects); | ||
|
|
||
| const results: string[] = []; | ||
| for await (const response of fgaClientWithRetry.streamedListObjects({ | ||
| user: "user:anne", | ||
| relation: "owner", | ||
| type: "document", | ||
| })) { | ||
| results.push(response.object); | ||
| } | ||
|
|
||
| // Verify both scopes were called (proves retry happened) | ||
| expect(scope1.isDone()).toBe(true); | ||
| expect(scope2.isDone()).toBe(true); | ||
| expect(results).toEqual(objects); | ||
| }); | ||
|
|
||
| it("should support consistency preference", async () => { | ||
| const objects = ["document:1"]; | ||
| const scope = nocks.streamedListObjects(baseConfig.storeId!, objects); | ||
|
|
||
| const results: string[] = []; | ||
| for await (const response of fgaClient.streamedListObjects({ | ||
| user: "user:anne", | ||
| relation: "owner", | ||
| type: "document", | ||
| }, { | ||
| consistency: ConsistencyPreference.HigherConsistency | ||
| })) { | ||
| results.push(response.object); | ||
| } | ||
|
|
||
| expect(scope.isDone()).toBe(true); | ||
| expect(results).toEqual(objects); | ||
| }); | ||
| }); |
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The tests for streamedListObjects only exist at the OpenFgaClient level (in client.test.ts), but not at the OpenFgaApi level. Looking at the pattern in tests/index.test.ts, other methods like listObjects (line 1475) have both client-level and API-level tests.
Consider adding an API-level test in tests/index.test.ts similar to the listObjects test to ensure the low-level API function works correctly independent of the client wrapper.
| describe("StreamedListObjects", () => { | ||
| it("should stream objects and yield them incrementally", async () => { | ||
| const objects = ["document:1", "document:2", "document:3"]; | ||
| const scope = nocks.streamedListObjects(baseConfig.storeId!, objects); | ||
|
|
||
| expect(scope.isDone()).toBe(false); | ||
|
|
||
| const results: string[] = []; | ||
| for await (const response of fgaClient.streamedListObjects({ | ||
| user: "user:81684243-9356-4421-8fbf-a4f8d36aa31b", | ||
| relation: "can_read", | ||
| type: "document", | ||
| })) { | ||
| results.push(response.object); | ||
| } | ||
|
|
||
| expect(scope.isDone()).toBe(true); | ||
| expect(results).toHaveLength(3); | ||
| expect(results).toEqual(expect.arrayContaining(objects)); | ||
| }); | ||
|
|
||
| it("should handle custom headers", async () => { | ||
| const objects = ["document:1"]; | ||
|
|
||
| const scope = nock(defaultConfiguration.getBasePath()) | ||
| .post(`/stores/${baseConfig.storeId}/streamed-list-objects`) | ||
| .reply(function () { | ||
| // Verify custom headers were sent | ||
| expect(this.req.headers["x-custom-header"]).toBe("custom-value"); | ||
| expect(this.req.headers["x-request-id"]).toBe("test-123"); | ||
|
|
||
| // Return NDJSON stream | ||
| const ndjsonResponse = objects | ||
| .map(obj => JSON.stringify({ result: { object: obj } })) | ||
| .join("\n") + "\n"; | ||
|
|
||
| return [200, Readable.from([ndjsonResponse]), { | ||
| "Content-Type": "application/x-ndjson" | ||
| }]; | ||
| }); | ||
|
|
||
| const results: string[] = []; | ||
| for await (const response of fgaClient.streamedListObjects({ | ||
| user: "user:anne", | ||
| relation: "owner", | ||
| type: "document", | ||
| }, { | ||
| headers: { | ||
| "X-Custom-Header": "custom-value", | ||
| "X-Request-ID": "test-123" | ||
| } | ||
| })) { | ||
| results.push(response.object); | ||
| } | ||
|
|
||
| expect(scope.isDone()).toBe(true); | ||
| expect(results).toEqual(objects); | ||
| }); | ||
|
|
||
| it("should handle errors from the stream", async () => { | ||
| const scope = nock(defaultConfiguration.getBasePath()) | ||
| .post(`/stores/${baseConfig.storeId}/streamed-list-objects`) | ||
| .reply(500, { code: "internal_error", message: "Server error" }); | ||
|
|
||
| await expect(async () => { | ||
| for await (const response of fgaClient.streamedListObjects({ | ||
| user: "user:anne", | ||
| relation: "owner", | ||
| type: "document", | ||
| })) { | ||
| // Should not get here | ||
| } | ||
| }).rejects.toThrow(); | ||
|
|
||
| expect(scope.isDone()).toBe(true); | ||
| }); | ||
|
|
||
| it("should handle retry on 429 error", async () => { | ||
| const objects = ["document:1"]; | ||
|
|
||
| // Create client with retry enabled | ||
| const fgaClientWithRetry = new OpenFgaClient({ | ||
| ...baseConfig, | ||
| credentials: { method: CredentialsMethod.None }, | ||
| retryParams: { maxRetry: 2, minWaitInMs: 10 } | ||
| }); | ||
|
|
||
| // First attempt fails with 429 (called exactly once) | ||
| const scope1 = nock(defaultConfiguration.getBasePath()) | ||
| .post(`/stores/${baseConfig.storeId}/streamed-list-objects`) | ||
| .times(1) | ||
| .reply(429, { code: "rate_limit_exceeded", message: "Rate limited" }, { | ||
| "Retry-After": "1" | ||
| }); | ||
|
|
||
| // Second attempt succeeds (retry - called exactly once) | ||
| const scope2 = nocks.streamedListObjects(baseConfig.storeId!, objects); | ||
|
|
||
| const results: string[] = []; | ||
| for await (const response of fgaClientWithRetry.streamedListObjects({ | ||
| user: "user:anne", | ||
| relation: "owner", | ||
| type: "document", | ||
| })) { | ||
| results.push(response.object); | ||
| } | ||
|
|
||
| // Verify both scopes were called (proves retry happened) | ||
| expect(scope1.isDone()).toBe(true); | ||
| expect(scope2.isDone()).toBe(true); | ||
| expect(results).toEqual(objects); | ||
| }); | ||
|
|
||
| it("should support consistency preference", async () => { | ||
| const objects = ["document:1"]; | ||
| const scope = nocks.streamedListObjects(baseConfig.storeId!, objects); | ||
|
|
||
| const results: string[] = []; | ||
| for await (const response of fgaClient.streamedListObjects({ | ||
| user: "user:anne", | ||
| relation: "owner", | ||
| type: "document", | ||
| }, { | ||
| consistency: ConsistencyPreference.HigherConsistency | ||
| })) { | ||
| results.push(response.object); | ||
| } | ||
|
|
||
| expect(scope.isDone()).toBe(true); | ||
| expect(results).toEqual(objects); | ||
| }); | ||
| }); |
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The tests don't cover the early termination scenario (breaking out of the for await loop before consuming all results). This is an important feature mentioned in the example README (lines 74-82 in example/streamed-list-objects/README.md) and the cleanup logic in client.ts (lines 889-893) specifically handles this case.
Consider adding a test case like:
it("should clean up properly on early termination", async () => {
const objects = ["document:1", "document:2", "document:3"];
const scope = nocks.streamedListObjects(baseConfig.storeId!, objects);
const results: string[] = [];
for await (const response of fgaClient.streamedListObjects({
user: "user:anne",
relation: "owner",
type: "document",
})) {
results.push(response.object);
if (results.length === 1) {
break; // Early termination
}
}
expect(results).toHaveLength(1);
expect(results[0]).toBe("document:1");
// Stream should be cleaned up properly
});| .reply(500, { code: "internal_error", message: "Server error" }); | ||
|
|
||
| await expect(async () => { | ||
| for await (const response of fgaClient.streamedListObjects({ |
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unused variable response.
| for await (const response of fgaClient.streamedListObjects({ | |
| for await (const _ of fgaClient.streamedListObjects({ |
| .reply(500, { code: "internal_error", message: "Server error" }); | ||
|
|
||
| await expect(async () => { | ||
| for await (const response of fgaClient.streamedListObjects({ |
Copilot
AI
Dec 1, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For loop variable response is not used in the loop body.
Summary
Adds
streamedListObjectsmethod for retrieving unlimited objects via the streaming API. Node.js-only implementation with NDJSON parsing, error handling, and automatic resource cleanup.Requires OpenFGA server v1.2.0+
Fixes #236
Changes
streaming.tswith NDJSON parser for Node.js streamsStreamedListObjectsResponseinterface toapiModel.tsOpenFgaClient.streamedListObjects()async generator methodcreateStreamingRequestFunctiontocommon.tsfor streaming requestsparseNDJSONStreamfromindex.tsImplementation
Usage
Testing
All 153 tests passing (10/10 suites)
Related