Skip to content

Ability to close server side of BidiStream from separate goroutine #823

@anuraaga

Description

@anuraaga

Is your feature request related to a problem? Please describe.

I am implementing a BidiStream which maps messages back and forth with a separate service that is similar to a BiDiStream (technically it's a websocket service). I couldn't find a way to receive messages from the other service in a separate goroutine since it would have no way to terminate a stream.Receive call AFAICT.

func (h *Handler) Chat(ctx context.Context, stream *connect.BidiStream[frontendapi.ChatRequest, frontendapi.ChatResponse]) error {
sess := genai.Connect(...)
defer sess.Close()
go func() {
	for {
		msg, err := sess.Receive()
		if err != nil {
			// Want to close BidiStream
			println(err)
			return
		}
		for _, p := range msg.ServerContent.ModelTurn.Parts {
			if p.InlineData == nil {
				continue
			}
			if !strings.HasPrefix(p.InlineData.MIMEType, "audio/") {
				continue
			}
			if err := stream.Send(&frontendapi.ChatResponse{
				Content: &frontendapi.ChatContent{
					Payload: &frontendapi.ChatContent_Audio{
						Audio: p.InlineData.Data,
					},
				},
			}); err != nil {
				println(err)
				return
			}
		}
	}
}()

for {
	msg, err := stream.Receive() // BiDiStream is stuck here and can't be canceled I think
	if errors.Is(err, io.EOF) {
		return nil
	}
	if err != nil {
		return fmt.Errorf("chat: receiving message: %w", err)
	}
	if p, ok := msg.GetContent().GetPayload().(*frontendapi.ChatContent_Audio); ok {
		if err := sess.Send(&genai.LiveClientMessage{
			RealtimeInput: &genai.LiveClientRealtimeInput{
				MediaChunks: []*genai.Blob{
					{
						MIMEType: "audio/wav",
						Data:     p.Audio,
					},
				},
			},
		}); err != nil {
			return fmt.Errorf("chat: sending audio to genai: %w", err)
		}
	}
}

For an error case I want to end this BiDiStream, but with stream.Receive() blocking for a message, it doesn't seem to be possible. While a client has close methods on the stream, server does not.

Describe the solution you'd like

Either is fine

  • Add Close(error) on BiDiStream. Pending Receive() will return with the error. If error is not accepted by Close, then a generic error that can be errors.Is can be returned instead
  • Add ctx to Receive (Likely a new method like ReceiveContext(ctx), when the context is canceled the receive method returns. This was brought up in Support context.Context in BidiStream's Receive() #735 but IIUC it was a somewhat different case than this one. But I guess with current golang stdlib this may not be implementable (more direct access to the http2 stream would be needed for proper cancellation)

Describe alternatives you've considered
If you've proposed a solution, are there any alternatives? Why are they worse
than your preferred approach?

Always call stream.Receive in another goroutine, instead of vice-versa. In my example, sess does have a Close method which would cause sess.Receive() in the main goroutine to return with error, similar to my proposal. And if it was otherwise a connect-go BidiStream client, it would also be closeable. I don't know if there is some hidden "always receive client messages in another goroutine" connect best practice, but intuitively there shouldn't be I guess.

Additional context
Add any other context or screenshots about the feature request here.

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions