Description
Is your feature request related to a problem? Please describe.
I am implementing a BidiStream handler that relays messages to and from a separate service that behaves much like a bidirectional stream (technically it's a WebSocket service). I couldn't find a way to receive messages from the other service in a separate goroutine, since that goroutine would have no way to terminate a pending stream.Receive call, AFAICT.
func (h *Handler) Chat(ctx context.Context, stream *connect.BidiStream[frontendapi.ChatRequest, frontendapi.ChatResponse]) error {
	sess := genai.Connect(...)
	defer sess.Close()

	// Relay audio from the genai session back to the client.
	go func() {
		for {
			msg, err := sess.Receive()
			if err != nil {
				// Want to close BidiStream
				println(err)
				return
			}
			for _, p := range msg.ServerContent.ModelTurn.Parts {
				if p.InlineData == nil {
					continue
				}
				if !strings.HasPrefix(p.InlineData.MIMEType, "audio/") {
					continue
				}
				if err := stream.Send(&frontendapi.ChatResponse{
					Content: &frontendapi.ChatContent{
						Payload: &frontendapi.ChatContent_Audio{
							Audio: p.InlineData.Data,
						},
					},
				}); err != nil {
					println(err)
					return
				}
			}
		}
	}()

	// Relay audio from the client to the genai session.
	for {
		msg, err := stream.Receive() // BidiStream is stuck here and can't be canceled I think
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return fmt.Errorf("chat: receiving message: %w", err)
		}
		if p, ok := msg.GetContent().GetPayload().(*frontendapi.ChatContent_Audio); ok {
			if err := sess.Send(&genai.LiveClientMessage{
				RealtimeInput: &genai.LiveClientRealtimeInput{
					MediaChunks: []*genai.Blob{
						{
							MIMEType: "audio/wav",
							Data:     p.Audio,
						},
					},
				},
			}); err != nil {
				return fmt.Errorf("chat: sending audio to genai: %w", err)
			}
		}
	}
}
In an error case I want to end this BidiStream, but with stream.Receive() blocked waiting for a message, that doesn't seem to be possible. While the client has close methods on the stream, the server does not.
Describe the solution you'd like
Either is fine:
- Add Close(error) on BidiStream. A pending Receive() will return with the error. If Close cannot accept an error, then a generic error that can be matched with errors.Is can be returned instead.
- Add ctx to Receive (likely a new method like ReceiveContext(ctx)); when the context is canceled, the receive method returns. This was brought up in "Support context.Context in BidiStream's Receive()" #735, but IIUC it was a somewhat different case than this one. I guess with the current Go stdlib this may not be implementable, though (more direct access to the http2 stream would be needed for proper cancellation).
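To make the requested semantics concrete, here is a minimal sketch of how both options could look from the caller's side. receiveContext is a hypothetical helper, not part of connect-go, and recv stands in for any blocking call such as stream.Receive. Canceling with a cause plays the role of Close(error). Note the limitation: this only unblocks the caller. The goroutine running recv stays blocked until the underlying read returns, which is exactly the part that would need library support.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errBackend is a hypothetical error standing in for a backend failure.
var errBackend = errors.New("backend closed")

// result carries the outcome of one blocking receive call.
type result[T any] struct {
	msg *T
	err error
}

// receiveContext is a hypothetical helper (not a connect-go API). It runs the
// blocking recv on its own goroutine and returns early when ctx is canceled,
// reporting the cancellation cause. It does not interrupt recv itself.
func receiveContext[T any](ctx context.Context, recv func() (*T, error)) (*T, error) {
	ch := make(chan result[T], 1) // buffered so the goroutine never blocks on send
	go func() {
		msg, err := recv()
		ch <- result[T]{msg, err}
	}()
	select {
	case r := <-ch:
		return r.msg, r.err
	case <-ctx.Done():
		return nil, context.Cause(ctx)
	}
}

// demo simulates a receive that never delivers a message while another
// goroutine reports a backend failure after a short delay.
func demo() error {
	ctx, cancel := context.WithCancelCause(context.Background())
	blocked := make(chan struct{})
	defer close(blocked) // releases the stuck recv goroutine when demo returns

	recv := func() (*string, error) {
		<-blocked
		return nil, errors.New("stream torn down")
	}
	go func() {
		time.Sleep(10 * time.Millisecond)
		cancel(errBackend) // acts like the proposed Close(error)
	}()

	_, err := receiveContext(ctx, recv)
	return err
}

func main() {
	fmt.Println(demo())
}
```

In a handler, the error returned here would be returned from the handler itself. The sketch assumes that returning from the handler tears down the stream and lets the leaked recv goroutine finish.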
Describe alternatives you've considered
Always call stream.Receive in another goroutine, instead of the other way around. In my example, sess does have a Close method, which would cause a sess.Receive() in the main goroutine to return with an error, similar to my proposal. And if the other side were a connect-go BidiStream client, it would also be closeable. I don't know if there is some hidden "always receive client messages in another goroutine" Connect best practice, but intuitively there shouldn't be, I guess.
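A rough sketch of that alternative follows, written against plain functions so it does not depend on any particular library. pump, relay, and backendErr are hypothetical names: recv stands in for stream.Receive, send for sess.Send, and backendErr for a channel on which the backend-reading side reports its failure. The handler's main loop selects over both, so a backend failure ends the handler even while recv is still blocked.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// errBackend is a hypothetical error standing in for a backend failure.
var errBackend = errors.New("backend closed")

// item is one outcome of a blocking receive call.
type item[T any] struct {
	msg T
	err error
}

// pump calls the blocking recv in a loop on its own goroutine and forwards
// every outcome on the returned channel. It stops after the first error or
// once done is closed.
func pump[T any](recv func() (T, error), done <-chan struct{}) <-chan item[T] {
	out := make(chan item[T])
	go func() {
		for {
			m, err := recv()
			select {
			case out <- item[T]{m, err}:
			case <-done:
				return
			}
			if err != nil {
				return
			}
		}
	}()
	return out
}

// relay forwards received messages to send until the client finishes
// (io.EOF) or the backend reports a failure on backendErr.
func relay[T any](recv func() (T, error), send func(T) error, backendErr <-chan error) error {
	done := make(chan struct{})
	defer close(done) // lets the pump goroutine exit once recv returns
	in := pump(recv, done)
	for {
		select {
		case it := <-in:
			if errors.Is(it.err, io.EOF) {
				return nil
			}
			if it.err != nil {
				return fmt.Errorf("receiving message: %w", it.err)
			}
			if err := send(it.msg); err != nil {
				return fmt.Errorf("sending to backend: %w", err)
			}
		case err := <-backendErr:
			return fmt.Errorf("backend failed: %w", err)
		}
	}
}

// demoEOF relays two messages and then sees a clean end of stream.
func demoEOF() ([]string, error) {
	queue := []string{"a", "b"}
	recv := func() (string, error) {
		if len(queue) == 0 {
			return "", io.EOF
		}
		m := queue[0]
		queue = queue[1:]
		return m, nil
	}
	var sent []string
	send := func(m string) error { sent = append(sent, m); return nil }
	err := relay(recv, send, make(chan error))
	return sent, err
}

// demoBackendFailure shows relay returning while recv is still blocked.
func demoBackendFailure() error {
	block := make(chan struct{})
	defer close(block) // releases the blocked recv after relay has returned
	recv := func() (string, error) { <-block; return "", io.EOF }
	backendErr := make(chan error, 1)
	backendErr <- errBackend
	return relay(recv, func(string) error { return nil }, backendErr)
}

func main() {
	fmt.Println(demoEOF())
	fmt.Println(demoBackendFailure())
}
```

As with any wrapper of this kind, the pump goroutine only exits once recv itself returns, so this works around the missing cancellation rather than providing it.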