Skip to content

Commit f95f898

Browse files
authored
feat: streaming endpoint utils, using templates for streaming (#666)
2 parents 48ad3c4 + b8d5a96 commit f95f898

File tree

3 files changed

+673
-0
lines changed

3 files changed

+673
-0
lines changed

config/clients/go/config.overrides.json

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,14 @@
2020
"api_client.mustache": {
2121
"destinationFilename": "api_client.go",
2222
"templateType": "SupportingFiles"
23+
},
24+
"streaming.mustache": {
25+
"destinationFilename": "streaming.go",
26+
"templateType": "SupportingFiles"
27+
},
28+
"streaming_test.mustache": {
29+
"destinationFilename": "streaming_test.go",
30+
"templateType": "SupportingFiles"
2331
}
2432
}
2533
}
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
{{>partial_header}}
2+
package {{packageName}}
3+
4+
import (
5+
"bufio"
6+
"context"
7+
"encoding/json"
8+
"errors"
9+
"io"
10+
"net/http"
11+
"net/url"
12+
"strings"
13+
)
14+
15+
// StreamResult represents a generic streaming result wrapper with either a result or an error
16+
type StreamResult[T any] struct {
17+
Result *T `json:"result,omitempty" yaml:"result,omitempty"`
18+
Error *Status `json:"error,omitempty" yaml:"error,omitempty"`
19+
}
20+
21+
// StreamingChannel represents a generic channel for streaming responses
22+
type StreamingChannel[T any] struct {
23+
Results chan T
24+
Errors chan error
25+
cancel context.CancelFunc
26+
}
27+
28+
// Close cancels the streaming context and cleans up resources
29+
func (s *StreamingChannel[T]) Close() {
30+
if s.cancel != nil {
31+
s.cancel()
32+
}
33+
}
34+
35+
// ProcessStreamingResponse processes an HTTP response as a streaming NDJSON response
36+
// and returns a StreamingChannel with results and errors
37+
//
38+
// Parameters:
39+
// - ctx: The context for cancellation
40+
// - httpResponse: The HTTP response to process
41+
// - bufferSize: The buffer size for the channels (default 10 if <= 0)
42+
//
43+
// Returns:
44+
// - *StreamingChannel[T]: A channel containing streaming results and errors
45+
// - error: An error if the response is invalid
46+
func ProcessStreamingResponse[T any](ctx context.Context, httpResponse *http.Response, bufferSize int) (*StreamingChannel[T], error) {
47+
streamCtx, cancel := context.WithCancel(ctx)
48+
49+
// Use default buffer size of 10 if not specified or invalid
50+
if bufferSize <= 0 {
51+
bufferSize = 10
52+
}
53+
54+
channel := &StreamingChannel[T]{
55+
Results: make(chan T, bufferSize),
56+
Errors: make(chan error, 1),
57+
cancel: cancel,
58+
}
59+
60+
if httpResponse == nil || httpResponse.Body == nil {
61+
cancel()
62+
return nil, errors.New("response or response body is nil")
63+
}
64+
65+
go func() {
66+
defer close(channel.Results)
67+
defer close(channel.Errors)
68+
defer cancel()
69+
defer func() { _ = httpResponse.Body.Close() }()
70+
71+
scanner := bufio.NewScanner(httpResponse.Body)
72+
// Allow large NDJSON entries (up to 10MB). Tune as needed.
73+
buf := make([]byte, 0, 64*1024)
74+
scanner.Buffer(buf, 10*1024*1024)
75+
76+
for scanner.Scan() {
77+
select {
78+
case <-streamCtx.Done():
79+
channel.Errors <- streamCtx.Err()
80+
return
81+
default:
82+
line := scanner.Bytes()
83+
if len(line) == 0 {
84+
continue
85+
}
86+
87+
var streamResult StreamResult[T]
88+
if err := json.Unmarshal(line, &streamResult); err != nil {
89+
channel.Errors <- err
90+
return
91+
}
92+
93+
if streamResult.Error != nil {
94+
msg := "stream error"
95+
if streamResult.Error.Message != nil {
96+
msg = *streamResult.Error.Message
97+
}
98+
channel.Errors <- errors.New(msg)
99+
return
100+
}
101+
102+
if streamResult.Result != nil {
103+
select {
104+
case <-streamCtx.Done():
105+
channel.Errors <- streamCtx.Err()
106+
return
107+
case channel.Results <- *streamResult.Result:
108+
}
109+
}
110+
}
111+
}
112+
113+
if err := scanner.Err(); err != nil {
114+
// Prefer context error if we were canceled to avoid surfacing net/http "use of closed network connection".
115+
if streamCtx.Err() != nil {
116+
channel.Errors <- streamCtx.Err()
117+
return
118+
}
119+
channel.Errors <- err
120+
}
121+
}()
122+
123+
return channel, nil
124+
}
125+
126+
// StreamedListObjectsChannel maintains backward compatibility with the old channel structure
127+
type StreamedListObjectsChannel struct {
128+
Objects chan StreamedListObjectsResponse
129+
Errors chan error
130+
cancel context.CancelFunc
131+
}
132+
133+
// Close cancels the streaming context and cleans up resources
134+
func (s *StreamedListObjectsChannel) Close() {
135+
if s.cancel != nil {
136+
s.cancel()
137+
}
138+
}
139+
140+
// ProcessStreamedListObjectsResponse processes a StreamedListObjects response
141+
// This is a backward compatibility wrapper around ProcessStreamingResponse
142+
func ProcessStreamedListObjectsResponse(ctx context.Context, httpResponse *http.Response, bufferSize int) (*StreamedListObjectsChannel, error) {
143+
channel, err := ProcessStreamingResponse[StreamedListObjectsResponse](ctx, httpResponse, bufferSize)
144+
if err != nil {
145+
return nil, err
146+
}
147+
148+
// Create a new channel with the old field name for backward compatibility
149+
compatChannel := &StreamedListObjectsChannel{
150+
Objects: channel.Results,
151+
Errors: channel.Errors,
152+
cancel: channel.cancel,
153+
}
154+
155+
return compatChannel, nil
156+
}
157+
158+
// ExecuteStreamedListObjects executes a StreamedListObjects request
159+
func ExecuteStreamedListObjects(client *APIClient, ctx context.Context, storeId string, body ListObjectsRequest, options RequestOptions) (*StreamedListObjectsChannel, error) {
160+
return ExecuteStreamedListObjectsWithBufferSize(client, ctx, storeId, body, options, 0)
161+
}
162+
163+
// ExecuteStreamedListObjectsWithBufferSize executes a StreamedListObjects request with a custom buffer size
164+
func ExecuteStreamedListObjectsWithBufferSize(client *APIClient, ctx context.Context, storeId string, body ListObjectsRequest, options RequestOptions, bufferSize int) (*StreamedListObjectsChannel, error) {
165+
channel, err := executeStreamingRequest[ListObjectsRequest, StreamedListObjectsResponse](
166+
client,
167+
ctx,
168+
"/stores/{store_id}/streamed-list-objects",
169+
storeId,
170+
body,
171+
options,
172+
bufferSize,
173+
"StreamedListObjects",
174+
)
175+
if err != nil {
176+
return nil, err
177+
}
178+
179+
// Convert to backward-compatible channel structure
180+
return &StreamedListObjectsChannel{
181+
Objects: channel.Results,
182+
Errors: channel.Errors,
183+
cancel: channel.cancel,
184+
}, nil
185+
}
186+
187+
// executeStreamingRequest is a generic function to execute streaming requests
188+
func executeStreamingRequest[TReq any, TRes any](
189+
client *APIClient,
190+
ctx context.Context,
191+
pathTemplate string,
192+
storeId string,
193+
body TReq,
194+
options RequestOptions,
195+
bufferSize int,
196+
operationName string,
197+
) (*StreamingChannel[TRes], error) {
198+
if storeId == "" {
199+
return nil, reportError("storeId is required and must be specified")
200+
}
201+
202+
path := pathTemplate
203+
path = strings.ReplaceAll(path, "{"+"store_id"+"}", url.PathEscape(parameterToString(storeId, "")))
204+
205+
localVarHeaderParams := make(map[string]string)
206+
localVarQueryParams := url.Values{}
207+
208+
localVarHTTPContentType := "application/json"
209+
localVarHeaderParams["Content-Type"] = localVarHTTPContentType
210+
localVarHeaderParams["Accept"] = "application/x-ndjson"
211+
212+
for header, val := range options.Headers {
213+
localVarHeaderParams[header] = val
214+
}
215+
216+
req, err := client.prepareRequest(ctx, path, http.MethodPost, body, localVarHeaderParams, localVarQueryParams)
217+
if err != nil {
218+
return nil, err
219+
}
220+
221+
httpResponse, err := client.callAPI(req)
222+
if err != nil {
223+
return nil, err
224+
}
225+
if httpResponse == nil {
226+
return nil, errors.New("nil HTTP response from API client")
227+
}
228+
229+
if httpResponse.StatusCode >= http.StatusMultipleChoices {
230+
responseBody, readErr := io.ReadAll(httpResponse.Body)
231+
_ = httpResponse.Body.Close()
232+
if readErr != nil {
233+
return nil, readErr
234+
}
235+
err = client.handleAPIError(httpResponse, responseBody, body, operationName, storeId)
236+
return nil, err
237+
}
238+
239+
return ProcessStreamingResponse[TRes](ctx, httpResponse, bufferSize)
240+
}
241+
242+

0 commit comments

Comments
 (0)