Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .openapi-generator/FILES
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ docs/WriteAuthorizationModelResponse.md
docs/WriteRequest.md
docs/WriteRequestDeletes.md
docs/WriteRequestWrites.md
src/main/java/dev/openfga/sdk/api/BaseStreamingApi.java
src/main/java/dev/openfga/sdk/api/OpenFgaApi.java
src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java
src/main/java/dev/openfga/sdk/api/model/AbortedMessageResponse.java
src/main/java/dev/openfga/sdk/api/model/AbstractOpenApiSchema.java
src/main/java/dev/openfga/sdk/api/model/Any.java
Expand Down Expand Up @@ -157,6 +159,7 @@ src/main/java/dev/openfga/sdk/api/model/RelationshipCondition.java
src/main/java/dev/openfga/sdk/api/model/SourceInfo.java
src/main/java/dev/openfga/sdk/api/model/Status.java
src/main/java/dev/openfga/sdk/api/model/Store.java
src/main/java/dev/openfga/sdk/api/model/StreamResult.java
src/main/java/dev/openfga/sdk/api/model/StreamResultOfStreamedListObjectsResponse.java
src/main/java/dev/openfga/sdk/api/model/StreamedListObjectsResponse.java
src/main/java/dev/openfga/sdk/api/model/Tuple.java
Expand Down
4 changes: 2 additions & 2 deletions examples/streamed-list-objects/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ openfga_version=latest
language=java

build:
./gradlew -P language=$(language) build
../../gradlew -P language=$(language) build

run:
./gradlew -P language=$(language) run
../../gradlew -P language=$(language) run

run-openfga:
docker pull docker.io/openfga/openfga:${openfga_version} && \
Expand Down
182 changes: 182 additions & 0 deletions src/main/java/dev/openfga/sdk/api/BaseStreamingApi.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* OpenFGA
* A high performance and flexible authorization/permission engine built for developers and inspired by Google Zanzibar.
*
* The version of the OpenAPI document: 1.x
* Contact: [email protected]
*
* NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
* https://openapi-generator.tech
* Do not edit the class manually.
*/

package dev.openfga.sdk.api;

import static dev.openfga.sdk.util.StringUtil.isNullOrWhitespace;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import dev.openfga.sdk.api.client.ApiClient;
import dev.openfga.sdk.api.configuration.Configuration;
import dev.openfga.sdk.api.model.Status;
import dev.openfga.sdk.api.model.StreamResult;
import dev.openfga.sdk.errors.ApiException;
import dev.openfga.sdk.errors.FgaInvalidParameterException;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Stream;

/**
* Base class for handling streaming API responses.
* This class provides generic streaming functionality that can be reused across
* different streaming endpoints by handling the common streaming parsing and error handling logic.
*
* @param <T> The type of response objects in the stream
*/
public abstract class BaseStreamingApi<T> {
protected final Configuration configuration;
protected final ApiClient apiClient;
protected final ObjectMapper objectMapper;
protected final TypeReference<StreamResult<T>> streamResultTypeRef;

/**
* Constructor for BaseStreamingApi
*
* @param configuration The API configuration
* @param apiClient The API client for making HTTP requests
* @param streamResultTypeRef TypeReference for deserializing StreamResult<T>
*/
protected BaseStreamingApi(
Configuration configuration, ApiClient apiClient, TypeReference<StreamResult<T>> streamResultTypeRef) {
this.configuration = configuration;
this.apiClient = apiClient;
this.objectMapper = apiClient.getObjectMapper();
this.streamResultTypeRef = streamResultTypeRef;
}

/**
* Process a streaming response asynchronously.
* Each line in the response is parsed and delivered to the consumer callback.
*
* @param request The HTTP request to execute
* @param consumer Callback to handle each response object (invoked asynchronously)
* @param errorConsumer Optional callback to handle errors during streaming
* @return CompletableFuture<Void> that completes when streaming finishes
*/
protected CompletableFuture<Void> processStreamingResponse(
HttpRequest request, Consumer<T> consumer, Consumer<Throwable> errorConsumer) {

// Use async HTTP client with streaming body handler
// ofLines() provides line-by-line streaming
return apiClient
.getHttpClient()
.sendAsync(request, HttpResponse.BodyHandlers.ofLines())
.thenCompose(response -> {
// Check response status
int statusCode = response.statusCode();
if (statusCode < 200 || statusCode >= 300) {
ApiException apiException =
new ApiException(statusCode, "API error: " + statusCode, response.headers(), null);
return CompletableFuture.failedFuture(apiException);
}

// Process the stream - this runs on HttpClient's executor thread
try (Stream<String> lines = response.body()) {
lines.forEach(line -> {
if (!isNullOrWhitespace(line)) {
processLine(line, consumer, errorConsumer);
}
});
return CompletableFuture.completedFuture((Void) null);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
})
.handle((result, throwable) -> {
if (throwable != null) {
// Unwrap CompletionException to get the original exception
Throwable actualException = throwable;
if (throwable instanceof java.util.concurrent.CompletionException
&& throwable.getCause() != null) {
actualException = throwable.getCause();
}

if (errorConsumer != null) {
errorConsumer.accept(actualException);
}
// Re-throw to keep the CompletableFuture in failed state
if (actualException instanceof RuntimeException) {
throw (RuntimeException) actualException;
}
throw new RuntimeException(actualException);
}
return result;
});
}

/**
* Process a single line from the stream
*
* @param line The JSON line to process
* @param consumer Callback to handle the parsed result
* @param errorConsumer Optional callback to handle errors
*/
private void processLine(String line, Consumer<T> consumer, Consumer<Throwable> errorConsumer) {
try {
// Parse the JSON line to extract the object
StreamResult<T> streamResult = objectMapper.readValue(line, streamResultTypeRef);

if (streamResult.getError() != null) {
// Handle error in stream
if (errorConsumer != null) {
Status error = streamResult.getError();
String errorMessage = error.getMessage() != null
? "Stream error: " + error.getMessage()
: "Stream error: " + (error.getCode() != null ? "code " + error.getCode() : "unknown");
errorConsumer.accept(new ApiException(errorMessage));
}
} else if (streamResult.getResult() != null) {
// Deliver the response object to the consumer
T result = streamResult.getResult();
if (result != null) {
consumer.accept(result);
}
}
} catch (Exception e) {
if (errorConsumer != null) {
errorConsumer.accept(e);
}
}
}

/**
* Build an HTTP request for the streaming endpoint
*
* @param method HTTP method (e.g., "POST")
* @param path The API path
* @param body The request body
* @param configuration The configuration to use
* @return HttpRequest ready to execute
* @throws ApiException if request building fails
* @throws FgaInvalidParameterException if parameters are invalid
*/
protected HttpRequest buildHttpRequest(String method, String path, Object body, Configuration configuration)
throws ApiException, FgaInvalidParameterException {
try {
byte[] bodyBytes = objectMapper.writeValueAsBytes(body);
HttpRequest.Builder requestBuilder = ApiClient.requestBuilder(method, path, bodyBytes, configuration);

// Apply request interceptors if any
var interceptor = apiClient.getRequestInterceptor();
if (interceptor != null) {
interceptor.accept(requestBuilder);
}

return requestBuilder.build();
} catch (Exception e) {
throw new ApiException(e);
}
}
}
133 changes: 19 additions & 114 deletions src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java
Original file line number Diff line number Diff line change
@@ -1,39 +1,42 @@
/*
* OpenFGA
* A high performance and flexible authorization/permission engine built for developers and inspired by Google Zanzibar.
*
* The version of the OpenAPI document: 1.x
* Contact: [email protected]
*
* NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
* https://openapi-generator.tech
* Do not edit the class manually.
*/

package dev.openfga.sdk.api;

import static dev.openfga.sdk.util.StringUtil.isNullOrWhitespace;
import static dev.openfga.sdk.util.Validation.assertParamExists;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.type.TypeReference;
import dev.openfga.sdk.api.client.ApiClient;
import dev.openfga.sdk.api.configuration.Configuration;
import dev.openfga.sdk.api.configuration.ConfigurationOverride;
import dev.openfga.sdk.api.model.ListObjectsRequest;
import dev.openfga.sdk.api.model.Status;
import dev.openfga.sdk.api.model.StreamResultOfStreamedListObjectsResponse;
import dev.openfga.sdk.api.model.StreamResult;
import dev.openfga.sdk.api.model.StreamedListObjectsResponse;
import dev.openfga.sdk.errors.ApiException;
import dev.openfga.sdk.errors.FgaInvalidParameterException;
import dev.openfga.sdk.util.StringUtil;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Stream;

/**
* API layer for handling streaming responses from the streamedListObjects endpoint.
* This class provides true asynchronous streaming with consumer callbacks using CompletableFuture
* and Java 11's HttpClient async streaming capabilities.
* This class extends BaseStreamingApi to provide true asynchronous streaming with consumer callbacks
* using CompletableFuture and Java 11's HttpClient async streaming capabilities.
*/
public class StreamedListObjectsApi {
private final Configuration configuration;
private final ApiClient apiClient;
private final ObjectMapper objectMapper;
public class StreamedListObjectsApi extends BaseStreamingApi<StreamedListObjectsResponse> {

public StreamedListObjectsApi(Configuration configuration, ApiClient apiClient) {
this.configuration = configuration;
this.apiClient = apiClient;
this.objectMapper = apiClient.getObjectMapper();
super(configuration, apiClient, new TypeReference<StreamResult<StreamedListObjectsResponse>>() {});
}

/**
Expand Down Expand Up @@ -142,110 +145,12 @@ private CompletableFuture<Void> streamedListObjects(

try {
HttpRequest request = buildHttpRequest("POST", path, body, configuration);

// Use async HTTP client with streaming body handler
// ofLines() provides line-by-line streaming which is perfect for NDJSON
return apiClient
.getHttpClient()
.sendAsync(request, HttpResponse.BodyHandlers.ofLines())
.thenCompose(response -> {
// Check response status
int statusCode = response.statusCode();
if (statusCode < 200 || statusCode >= 300) {
ApiException apiException =
new ApiException(statusCode, "API error: " + statusCode, response.headers(), null);
return CompletableFuture.failedFuture(apiException);
}

// Process the stream - this runs on HttpClient's executor thread
try (Stream<String> lines = response.body()) {
lines.forEach(line -> {
if (!isNullOrWhitespace(line)) {
processLine(line, consumer, errorConsumer);
}
});
return CompletableFuture.completedFuture((Void) null);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
})
.handle((result, throwable) -> {
if (throwable != null) {
// Unwrap CompletionException to get the original exception
Throwable actualException = throwable;
if (throwable instanceof java.util.concurrent.CompletionException
&& throwable.getCause() != null) {
actualException = throwable.getCause();
}

if (errorConsumer != null) {
errorConsumer.accept(actualException);
}
// Re-throw to keep the CompletableFuture in failed state
if (actualException instanceof RuntimeException) {
throw (RuntimeException) actualException;
}
throw new RuntimeException(actualException);
}
return result;
});

return processStreamingResponse(request, consumer, errorConsumer);
} catch (Exception e) {
if (errorConsumer != null) {
errorConsumer.accept(e);
}
return CompletableFuture.failedFuture(e);
}
}

/**
* Process a single line from the NDJSON stream
*/
private void processLine(
String line, Consumer<StreamedListObjectsResponse> consumer, Consumer<Throwable> errorConsumer) {
try {
// Parse the JSON line to extract the object
StreamResultOfStreamedListObjectsResponse streamResult =
objectMapper.readValue(line, StreamResultOfStreamedListObjectsResponse.class);

if (streamResult.getError() != null) {
// Handle error in stream
if (errorConsumer != null) {
Status error = streamResult.getError();
String errorMessage = error.getMessage() != null
? "Stream error: " + error.getMessage()
: "Stream error: " + (error.getCode() != null ? "code " + error.getCode() : "unknown");
errorConsumer.accept(new ApiException(errorMessage));
}
} else if (streamResult.getResult() != null) {
// Deliver the response object to the consumer
StreamedListObjectsResponse result = streamResult.getResult();
if (result != null) {
consumer.accept(result);
}
}
} catch (Exception e) {
if (errorConsumer != null) {
errorConsumer.accept(e);
}
}
}

private HttpRequest buildHttpRequest(String method, String path, Object body, Configuration configuration)
throws ApiException, FgaInvalidParameterException {
try {
byte[] bodyBytes = objectMapper.writeValueAsBytes(body);
HttpRequest.Builder requestBuilder = ApiClient.requestBuilder(method, path, bodyBytes, configuration);

// Apply request interceptors if any
var interceptor = apiClient.getRequestInterceptor();
if (interceptor != null) {
interceptor.accept(requestBuilder);
}

return requestBuilder.build();
} catch (Exception e) {
throw new ApiException(e);
}
}
}
Loading
Loading