Skip to content

Commit 90c4881

Browse files
authored
feat: use common streaming utils, reverse sync to template (#261)
* feat: use common streaming utils, reverse sync to template * feat: revert package downgrade * feat: address copilot comments
1 parent 806fc05 commit 90c4881

File tree

6 files changed

+576
-116
lines changed

6 files changed

+576
-116
lines changed

.openapi-generator/FILES

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,9 @@ docs/WriteAuthorizationModelResponse.md
9797
docs/WriteRequest.md
9898
docs/WriteRequestDeletes.md
9999
docs/WriteRequestWrites.md
100+
src/main/java/dev/openfga/sdk/api/BaseStreamingApi.java
100101
src/main/java/dev/openfga/sdk/api/OpenFgaApi.java
102+
src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java
101103
src/main/java/dev/openfga/sdk/api/model/AbortedMessageResponse.java
102104
src/main/java/dev/openfga/sdk/api/model/AbstractOpenApiSchema.java
103105
src/main/java/dev/openfga/sdk/api/model/Any.java
@@ -157,6 +159,7 @@ src/main/java/dev/openfga/sdk/api/model/RelationshipCondition.java
157159
src/main/java/dev/openfga/sdk/api/model/SourceInfo.java
158160
src/main/java/dev/openfga/sdk/api/model/Status.java
159161
src/main/java/dev/openfga/sdk/api/model/Store.java
162+
src/main/java/dev/openfga/sdk/api/model/StreamResult.java
160163
src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java
161164
src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java
162165
src/main/java/dev/openfga/sdk/api/model/Tuple.java

examples/streamed-list-objects/Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ openfga_version=latest
66
language=java
77

88
build:
9-
./gradlew -P language=$(language) build
9+
../../gradlew -P language=$(language) build
1010

1111
run:
12-
./gradlew -P language=$(language) run
12+
../../gradlew -P language=$(language) run
1313

1414
run-openfga:
1515
docker pull docker.io/openfga/openfga:${openfga_version} && \
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/*
2+
* OpenFGA
3+
* A high performance and flexible authorization/permission engine built for developers and inspired by Google Zanzibar.
4+
*
5+
* The version of the OpenAPI document: 1.x
6+
* Contact: [email protected]
7+
*
8+
* NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
9+
* https://openapi-generator.tech
10+
* Do not edit the class manually.
11+
*/
12+
13+
package dev.openfga.sdk.api;
14+
15+
import static dev.openfga.sdk.util.StringUtil.isNullOrWhitespace;
16+
17+
import com.fasterxml.jackson.core.type.TypeReference;
18+
import com.fasterxml.jackson.databind.ObjectMapper;
19+
import dev.openfga.sdk.api.client.ApiClient;
20+
import dev.openfga.sdk.api.configuration.Configuration;
21+
import dev.openfga.sdk.api.model.Status;
22+
import dev.openfga.sdk.api.model.StreamResult;
23+
import dev.openfga.sdk.errors.ApiException;
24+
import dev.openfga.sdk.errors.FgaInvalidParameterException;
25+
import java.net.http.HttpRequest;
26+
import java.net.http.HttpResponse;
27+
import java.util.concurrent.CompletableFuture;
28+
import java.util.function.Consumer;
29+
import java.util.stream.Stream;
30+
31+
/**
32+
* Base class for handling streaming API responses.
33+
* This class provides generic streaming functionality that can be reused across
34+
* different streaming endpoints by handling the common streaming parsing and error handling logic.
35+
*
36+
* @param <T> The type of response objects in the stream
37+
*/
38+
public abstract class BaseStreamingApi<T> {
39+
protected final Configuration configuration;
40+
protected final ApiClient apiClient;
41+
protected final ObjectMapper objectMapper;
42+
protected final TypeReference<StreamResult<T>> streamResultTypeRef;
43+
44+
/**
45+
* Constructor for BaseStreamingApi
46+
*
47+
* @param configuration The API configuration
48+
* @param apiClient The API client for making HTTP requests
49+
* @param streamResultTypeRef TypeReference for deserializing StreamResult<T>
50+
*/
51+
protected BaseStreamingApi(
52+
Configuration configuration, ApiClient apiClient, TypeReference<StreamResult<T>> streamResultTypeRef) {
53+
this.configuration = configuration;
54+
this.apiClient = apiClient;
55+
this.objectMapper = apiClient.getObjectMapper();
56+
this.streamResultTypeRef = streamResultTypeRef;
57+
}
58+
59+
/**
60+
* Process a streaming response asynchronously.
61+
* Each line in the response is parsed and delivered to the consumer callback.
62+
*
63+
* @param request The HTTP request to execute
64+
* @param consumer Callback to handle each response object (invoked asynchronously)
65+
* @param errorConsumer Optional callback to handle errors during streaming
66+
* @return CompletableFuture<Void> that completes when streaming finishes
67+
*/
68+
protected CompletableFuture<Void> processStreamingResponse(
69+
HttpRequest request, Consumer<T> consumer, Consumer<Throwable> errorConsumer) {
70+
71+
// Use async HTTP client with streaming body handler
72+
// ofLines() provides line-by-line streaming
73+
return apiClient
74+
.getHttpClient()
75+
.sendAsync(request, HttpResponse.BodyHandlers.ofLines())
76+
.thenCompose(response -> {
77+
// Check response status
78+
int statusCode = response.statusCode();
79+
if (statusCode < 200 || statusCode >= 300) {
80+
ApiException apiException =
81+
new ApiException(statusCode, "API error: " + statusCode, response.headers(), null);
82+
return CompletableFuture.failedFuture(apiException);
83+
}
84+
85+
// Process the stream - this runs on HttpClient's executor thread
86+
try (Stream<String> lines = response.body()) {
87+
lines.forEach(line -> {
88+
if (!isNullOrWhitespace(line)) {
89+
processLine(line, consumer, errorConsumer);
90+
}
91+
});
92+
return CompletableFuture.completedFuture((Void) null);
93+
} catch (Exception e) {
94+
return CompletableFuture.failedFuture(e);
95+
}
96+
})
97+
.handle((result, throwable) -> {
98+
if (throwable != null) {
99+
// Unwrap CompletionException to get the original exception
100+
Throwable actualException = throwable;
101+
if (throwable instanceof java.util.concurrent.CompletionException
102+
&& throwable.getCause() != null) {
103+
actualException = throwable.getCause();
104+
}
105+
106+
if (errorConsumer != null) {
107+
errorConsumer.accept(actualException);
108+
}
109+
// Re-throw to keep the CompletableFuture in failed state
110+
if (actualException instanceof RuntimeException) {
111+
throw (RuntimeException) actualException;
112+
}
113+
throw new RuntimeException(actualException);
114+
}
115+
return result;
116+
});
117+
}
118+
119+
/**
120+
* Process a single line from the stream
121+
*
122+
* @param line The JSON line to process
123+
* @param consumer Callback to handle the parsed result
124+
* @param errorConsumer Optional callback to handle errors
125+
*/
126+
private void processLine(String line, Consumer<T> consumer, Consumer<Throwable> errorConsumer) {
127+
try {
128+
// Parse the JSON line to extract the object
129+
StreamResult<T> streamResult = objectMapper.readValue(line, streamResultTypeRef);
130+
131+
if (streamResult.getError() != null) {
132+
// Handle error in stream
133+
if (errorConsumer != null) {
134+
Status error = streamResult.getError();
135+
String errorMessage = error.getMessage() != null
136+
? "Stream error: " + error.getMessage()
137+
: "Stream error: " + (error.getCode() != null ? "code " + error.getCode() : "unknown");
138+
errorConsumer.accept(new ApiException(errorMessage));
139+
}
140+
} else if (streamResult.getResult() != null) {
141+
// Deliver the response object to the consumer
142+
T result = streamResult.getResult();
143+
if (result != null) {
144+
consumer.accept(result);
145+
}
146+
}
147+
} catch (Exception e) {
148+
if (errorConsumer != null) {
149+
errorConsumer.accept(e);
150+
}
151+
}
152+
}
153+
154+
/**
155+
* Build an HTTP request for the streaming endpoint
156+
*
157+
* @param method HTTP method (e.g., "POST")
158+
* @param path The API path
159+
* @param body The request body
160+
* @param configuration The configuration to use
161+
* @return HttpRequest ready to execute
162+
* @throws ApiException if request building fails
163+
* @throws FgaInvalidParameterException if parameters are invalid
164+
*/
165+
protected HttpRequest buildHttpRequest(String method, String path, Object body, Configuration configuration)
166+
throws ApiException, FgaInvalidParameterException {
167+
try {
168+
byte[] bodyBytes = objectMapper.writeValueAsBytes(body);
169+
HttpRequest.Builder requestBuilder = ApiClient.requestBuilder(method, path, bodyBytes, configuration);
170+
171+
// Apply request interceptors if any
172+
var interceptor = apiClient.getRequestInterceptor();
173+
if (interceptor != null) {
174+
interceptor.accept(requestBuilder);
175+
}
176+
177+
return requestBuilder.build();
178+
} catch (Exception e) {
179+
throw new ApiException(e);
180+
}
181+
}
182+
}
Lines changed: 19 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,42 @@
1+
/*
2+
* OpenFGA
3+
* A high performance and flexible authorization/permission engine built for developers and inspired by Google Zanzibar.
4+
*
5+
* The version of the OpenAPI document: 1.x
6+
* Contact: [email protected]
7+
*
8+
* NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
9+
* https://openapi-generator.tech
10+
* Do not edit the class manually.
11+
*/
12+
113
package dev.openfga.sdk.api;
214

3-
import static dev.openfga.sdk.util.StringUtil.isNullOrWhitespace;
415
import static dev.openfga.sdk.util.Validation.assertParamExists;
516

6-
import com.fasterxml.jackson.databind.ObjectMapper;
17+
import com.fasterxml.jackson.core.type.TypeReference;
718
import dev.openfga.sdk.api.client.ApiClient;
819
import dev.openfga.sdk.api.configuration.Configuration;
920
import dev.openfga.sdk.api.configuration.ConfigurationOverride;
1021
import dev.openfga.sdk.api.model.ListObjectsRequest;
11-
import dev.openfga.sdk.api.model.Status;
12-
import dev.openfga.sdk.api.model.StreamResultOfStreamedListObjectsResponse;
22+
import dev.openfga.sdk.api.model.StreamResult;
1323
import dev.openfga.sdk.api.model.StreamedListObjectsResponse;
1424
import dev.openfga.sdk.errors.ApiException;
1525
import dev.openfga.sdk.errors.FgaInvalidParameterException;
1626
import dev.openfga.sdk.util.StringUtil;
1727
import java.net.http.HttpRequest;
18-
import java.net.http.HttpResponse;
1928
import java.util.concurrent.CompletableFuture;
2029
import java.util.function.Consumer;
21-
import java.util.stream.Stream;
2230

2331
/**
2432
* API layer for handling streaming responses from the streamedListObjects endpoint.
25-
* This class provides true asynchronous streaming with consumer callbacks using CompletableFuture
26-
* and Java 11's HttpClient async streaming capabilities.
33+
* This class extends BaseStreamingApi to provide true asynchronous streaming with consumer callbacks
34+
* using CompletableFuture and Java 11's HttpClient async streaming capabilities.
2735
*/
28-
public class StreamedListObjectsApi {
29-
private final Configuration configuration;
30-
private final ApiClient apiClient;
31-
private final ObjectMapper objectMapper;
36+
public class StreamedListObjectsApi extends BaseStreamingApi<StreamedListObjectsResponse> {
3237

3338
public StreamedListObjectsApi(Configuration configuration, ApiClient apiClient) {
34-
this.configuration = configuration;
35-
this.apiClient = apiClient;
36-
this.objectMapper = apiClient.getObjectMapper();
39+
super(configuration, apiClient, new TypeReference<StreamResult<StreamedListObjectsResponse>>() {});
3740
}
3841

3942
/**
@@ -142,110 +145,12 @@ private CompletableFuture<Void> streamedListObjects(
142145

143146
try {
144147
HttpRequest request = buildHttpRequest("POST", path, body, configuration);
145-
146-
// Use async HTTP client with streaming body handler
147-
// ofLines() provides line-by-line streaming which is perfect for NDJSON
148-
return apiClient
149-
.getHttpClient()
150-
.sendAsync(request, HttpResponse.BodyHandlers.ofLines())
151-
.thenCompose(response -> {
152-
// Check response status
153-
int statusCode = response.statusCode();
154-
if (statusCode < 200 || statusCode >= 300) {
155-
ApiException apiException =
156-
new ApiException(statusCode, "API error: " + statusCode, response.headers(), null);
157-
return CompletableFuture.failedFuture(apiException);
158-
}
159-
160-
// Process the stream - this runs on HttpClient's executor thread
161-
try (Stream<String> lines = response.body()) {
162-
lines.forEach(line -> {
163-
if (!isNullOrWhitespace(line)) {
164-
processLine(line, consumer, errorConsumer);
165-
}
166-
});
167-
return CompletableFuture.completedFuture((Void) null);
168-
} catch (Exception e) {
169-
return CompletableFuture.failedFuture(e);
170-
}
171-
})
172-
.handle((result, throwable) -> {
173-
if (throwable != null) {
174-
// Unwrap CompletionException to get the original exception
175-
Throwable actualException = throwable;
176-
if (throwable instanceof java.util.concurrent.CompletionException
177-
&& throwable.getCause() != null) {
178-
actualException = throwable.getCause();
179-
}
180-
181-
if (errorConsumer != null) {
182-
errorConsumer.accept(actualException);
183-
}
184-
// Re-throw to keep the CompletableFuture in failed state
185-
if (actualException instanceof RuntimeException) {
186-
throw (RuntimeException) actualException;
187-
}
188-
throw new RuntimeException(actualException);
189-
}
190-
return result;
191-
});
192-
148+
return processStreamingResponse(request, consumer, errorConsumer);
193149
} catch (Exception e) {
194150
if (errorConsumer != null) {
195151
errorConsumer.accept(e);
196152
}
197153
return CompletableFuture.failedFuture(e);
198154
}
199155
}
200-
201-
/**
202-
* Process a single line from the NDJSON stream
203-
*/
204-
private void processLine(
205-
String line, Consumer<StreamedListObjectsResponse> consumer, Consumer<Throwable> errorConsumer) {
206-
try {
207-
// Parse the JSON line to extract the object
208-
StreamResultOfStreamedListObjectsResponse streamResult =
209-
objectMapper.readValue(line, StreamResultOfStreamedListObjectsResponse.class);
210-
211-
if (streamResult.getError() != null) {
212-
// Handle error in stream
213-
if (errorConsumer != null) {
214-
Status error = streamResult.getError();
215-
String errorMessage = error.getMessage() != null
216-
? "Stream error: " + error.getMessage()
217-
: "Stream error: " + (error.getCode() != null ? "code " + error.getCode() : "unknown");
218-
errorConsumer.accept(new ApiException(errorMessage));
219-
}
220-
} else if (streamResult.getResult() != null) {
221-
// Deliver the response object to the consumer
222-
StreamedListObjectsResponse result = streamResult.getResult();
223-
if (result != null) {
224-
consumer.accept(result);
225-
}
226-
}
227-
} catch (Exception e) {
228-
if (errorConsumer != null) {
229-
errorConsumer.accept(e);
230-
}
231-
}
232-
}
233-
234-
private HttpRequest buildHttpRequest(String method, String path, Object body, Configuration configuration)
235-
throws ApiException, FgaInvalidParameterException {
236-
try {
237-
byte[] bodyBytes = objectMapper.writeValueAsBytes(body);
238-
HttpRequest.Builder requestBuilder = ApiClient.requestBuilder(method, path, bodyBytes, configuration);
239-
240-
// Apply request interceptors if any
241-
var interceptor = apiClient.getRequestInterceptor();
242-
if (interceptor != null) {
243-
interceptor.accept(requestBuilder);
244-
}
245-
246-
return requestBuilder.build();
247-
} catch (Exception e) {
248-
throw new ApiException(e);
249-
}
250-
}
251156
}

0 commit comments

Comments
 (0)