Skip to content

Commit 7a74f98

Browse files
committed
add loop in reponse to deal with channels closing
1 parent 9a77969 commit 7a74f98

File tree

1 file changed

+21
-5
lines changed

1 file changed

+21
-5
lines changed

protoc-gen-go/rrpc/rrpc.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,8 @@ func (g *rrpc) generateClientRequestReplyFunction(service *descriptor.ServiceDes
170170
g.P("}")
171171
g.P()
172172
g.P("payloads, errors := c.client.InvokeRequestResponse(ctx, ", service.GetName(), "ServiceName, ", method.GetName(), "FunctionName, &d, opts...)")
173-
g.P()
173+
g.P("loop:")
174+
g.P("for {")
174175
g.P("select {")
175176
g.P("case p, ok := <-payloads:")
176177
g.P("if ok {")
@@ -180,12 +181,19 @@ func (g *rrpc) generateClientRequestReplyFunction(service *descriptor.ServiceDes
180181
g.P("e := proto.Unmarshal(data, res)")
181182
g.P("if e != nil {")
182183
g.P("err <- e")
184+
g.P("break loop")
183185
g.P("} else {")
184186
g.P("response <- res")
185187
g.P("}")
188+
g.P("} else {")
189+
g.P("break loop")
186190
g.P("}")
187191
g.P("case e := <-errors:")
192+
g.P("if err != nil {")
188193
g.P("err <- e")
194+
g.P("break loop")
195+
g.P("}")
196+
g.P("}")
189197
g.P("}")
190198
g.P()
191199
g.P("return response, err")
@@ -375,9 +383,10 @@ func (g *rrpc) generateServerRequestResponse(service *descriptor.ServiceDescript
375383
continue
376384
}
377385

378-
var method = method
379-
var in = "_in" + strconv.Itoa(i)
380-
var out = "_out" + strconv.Itoa(i)
386+
var method= method
387+
var in= "_in" + strconv.Itoa(i)
388+
var out= "_out" + strconv.Itoa(i)
389+
var loop= "_loop" + strconv.Itoa(i)
381390

382391
g.P("case ", method.GetName(), "FunctionName:")
383392
g.P(in, " := &", cleanupType(method.GetInputType()), "{}")
@@ -392,6 +401,8 @@ func (g *rrpc) generateServerRequestResponse(service *descriptor.ServiceDescript
392401
g.P("}")
393402
g.P("}()")
394403
g.P(out, ", err := p.pp.", strings.Title(method.GetName()), "(ctx, ", in, ", ud)")
404+
g.P(loop, ":")
405+
g.P("for {")
395406
g.P("select {")
396407
g.P("case <-ctx.Done():")
397408
g.P("case r, ok := <-", out, ":")
@@ -401,11 +412,17 @@ func (g *rrpc) generateServerRequestResponse(service *descriptor.ServiceDescript
401412
g.P("sink.Error(e)")
402413
g.P("} else {")
403414
g.P("sink.Success(payload.New(bytes, nil))")
415+
g.P("break ", loop)
404416
g.P("}")
417+
g.P("} else {")
418+
g.P("break ", loop)
405419
g.P("}")
406420
g.P("case e := <-err:")
421+
g.P("if e != nil {")
407422
g.P("sink.Error(e)")
408423
g.P("}")
424+
g.P("}")
425+
g.P("}")
409426
}
410427
g.P("}")
411428
g.P("})")
@@ -536,7 +553,6 @@ func (g *rrpc) generateServerRequestChannel(service *descriptor.ServiceDescripto
536553

537554
var method = method
538555
var in = "_in" + strconv.Itoa(i)
539-
//var out = "_out" + strconv.Itoa(i)
540556

541557
g.P("case ", method.GetName(), "FunctionName:")
542558
g.P(in, " := &", cleanupType(method.GetInputType()), "{}")

0 commit comments

Comments
 (0)