Skip to content

Commit ee55d28

Browse files
committed
write request before fist response
1 parent fb7360f commit ee55d28

File tree

4 files changed

+129
-93
lines changed

4 files changed

+129
-93
lines changed

lib/eio/client/client.ml

Lines changed: 95 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -121,92 +121,102 @@ module Bidirectional_streaming = struct
121121
(_, headers, stream_error, conn_error, net_response) result' =
122122
match call ~sw ~io ~service ~method_name ~headers () with
123123
| Ok { writer; recv; grpc_status; write_exn } -> (
124-
match Eio.Promise.await recv with
125-
| Ok { net_response; recv_seq; trailers } ->
126-
let (module Io') = io in
127-
if Io'.Net_response.is_ok net_response then (
128-
let error = ref None in
129-
let closed = ref false in
130-
let writer =
131-
{
132-
write = writer.write;
133-
close =
134-
(fun () ->
135-
writer.close ();
136-
closed := true);
137-
}
138-
in
139-
let rec read recv_seq' () =
140-
match recv_seq' () with
141-
| Grpc_eio_core.Recv_seq.Done -> Seq.Nil
142-
| Err e ->
143-
let () = error := Some e in
144-
Seq.Nil
145-
| Next (t, next) -> Seq.Cons (t, fun () -> read next ())
146-
in
124+
let closed = ref false in
125+
let writer =
126+
{
127+
write =
128+
(fun req ->
129+
let result = writer.write req in
147130

148-
let res = f net_response ~writer ~read:(read recv_seq) in
149-
if not !closed then writer.close ();
150-
match !error with
151-
| Some error ->
152-
`Stream_result
153-
{
154-
result = res;
155-
trailers = Eio.Promise.await trailers;
156-
err =
157-
Some
131+
result);
132+
close =
133+
(fun () ->
134+
writer.close ();
135+
closed := true);
136+
}
137+
in
138+
let error = ref None in
139+
let res =
140+
f
141+
(fun () ->
142+
match Eio.Promise.await recv with
143+
| Ok { net_response; _ } -> Ok net_response
144+
| Error e -> Error e)
145+
~writer
146+
~read:(fun () ->
147+
match Eio.Promise.await recv with
148+
| Ok { net_response; recv_seq; _ } ->
149+
let (module Io') = io in
150+
if Io'.Net_response.is_ok net_response then
151+
let rec read recv_seq' () =
152+
match recv_seq' () with
153+
| Grpc_eio_core.Recv_seq.Done -> Seq.Nil
154+
| Err e ->
155+
let () = error := Some e in
156+
Seq.Nil
157+
| Next (t, next) -> Seq.Cons (t, fun () -> read next ())
158+
in
159+
read recv_seq ()
160+
else Seq.Nil
161+
| Error _ -> Seq.Nil)
162+
in
163+
164+
match Eio.Promise.await recv with
165+
| Error _e -> Obj.magic ()
166+
| Ok { net_response = _; trailers; _ } -> (
167+
if not !closed then writer.close ();
168+
match !error with
169+
| Some error ->
170+
`Stream_result
171+
{
172+
result = res;
173+
trailers = Eio.Promise.await trailers;
174+
err =
175+
Some
176+
{
177+
stream_error = Some error;
178+
grpc_status = Eio.Promise.await grpc_status;
179+
write_exn = !write_exn;
180+
};
181+
}
182+
| None -> (
183+
let status = Eio.Promise.await grpc_status in
184+
match Grpc.Status.code status with
185+
| Grpc.Status.OK -> (
186+
match !write_exn with
187+
| None ->
188+
`Stream_result
158189
{
159-
stream_error = Some error;
160-
grpc_status = Eio.Promise.await grpc_status;
161-
write_exn = !write_exn;
162-
};
163-
}
164-
| None -> (
165-
let status = Eio.Promise.await grpc_status in
166-
match Grpc.Status.code status with
167-
| Grpc.Status.OK -> (
168-
match !write_exn with
169-
| None ->
170-
`Stream_result
171-
{
172-
result = res;
173-
err = None;
174-
trailers = Eio.Promise.await trailers;
175-
}
176-
| Some _ ->
177-
`Stream_result
190+
result = res;
191+
err = None;
192+
trailers = Eio.Promise.await trailers;
193+
}
194+
| Some _ ->
195+
`Stream_result
196+
{
197+
result = res;
198+
trailers = Eio.Promise.await trailers;
199+
err =
200+
Some
201+
{
202+
write_exn = !write_exn;
203+
grpc_status = Eio.Promise.await grpc_status;
204+
stream_error = None;
205+
};
206+
})
207+
| _ ->
208+
`Stream_result
209+
{
210+
result = res;
211+
trailers = Eio.Promise.await trailers;
212+
err =
213+
Some
178214
{
179-
result = res;
180-
trailers = Eio.Promise.await trailers;
181-
err =
182-
Some
183-
{
184-
write_exn = !write_exn;
185-
grpc_status = Eio.Promise.await grpc_status;
186-
stream_error = None;
187-
};
188-
})
189-
| _ ->
190-
`Stream_result
191-
{
192-
result = res;
193-
trailers = Eio.Promise.await trailers;
194-
err =
195-
Some
196-
{
197-
grpc_status = status;
198-
stream_error = None;
199-
write_exn = !write_exn;
200-
};
201-
}))
202-
else
203-
`Response_not_ok
204-
{
205-
net_response;
206-
grpc_status = Eio.Promise.await grpc_status;
207-
trailers = Eio.Promise.await trailers;
208-
}
209-
| Error e -> `Connection_error e)
215+
grpc_status = status;
216+
stream_error = None;
217+
write_exn = !write_exn;
218+
};
219+
})))
210220
| Error e -> `Connection_error e
211221
end
212222

@@ -281,8 +291,8 @@ module Unary = struct
281291
trailers = Eio.Promise.await trailers;
282292
}
283293
| _ ->
284-
(* Not reachable under normal circumstances
285-
https://github.com/grpc/grpc/issues/12824 *)
294+
(* Not reachable under normal circumstances
295+
https://github.com/grpc/grpc/issues/12824 *)
286296
`Response_not_ok
287297
{
288298
net_response;

lib/eio/client/client.mli

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,9 @@ module Server_streaming : sig
167167
method_name:string ->
168168
headers:Grpc_client.request_headers ->
169169
'request ->
170-
('net_response -> read:(unit -> 'response Seq.node) -> 'a) ->
170+
((unit -> ('net_response, 'conn_err) result) ->
171+
read:(unit -> 'response Seq.node) ->
172+
'a) ->
171173
[ `Stream_result of ('a, 'headers, 'stream_error) streaming_result
172174
| `Write_error of ('stream_error, 'headers) streaming_err option * 'headers
173175
| ('net_response, 'headers, 'conn_err) common_error ]
@@ -177,7 +179,7 @@ module Bidirectional_streaming : sig
177179
type ('a, 'headers, 'stream_err, 'conn_err, 'net_response) result' =
178180
[ `Stream_result of ('a, 'headers, 'stream_err) streaming_result
179181
| ('net_response, 'headers, 'conn_err) common_error ]
180-
182+
(*
181183
val call :
182184
sw:Eio.Switch.t ->
183185
io:
@@ -191,7 +193,29 @@ module Bidirectional_streaming : sig
191193
service:string ->
192194
method_name:string ->
193195
headers:Grpc_client.request_headers ->
196+
?init_requests:'request Seq.t ->
194197
('net_response ->
198+
'request Seq.t ->
199+
writer:'request writer ->
200+
read:(unit -> 'response Seq.node) ->
201+
'a) ->
202+
('a, 'headers, 'stream_error, 'conn_error, 'net_response) result'
203+
*)
204+
205+
val call :
206+
sw:Eio.Switch.t ->
207+
io:
208+
( 'headers,
209+
'net_response,
210+
'request,
211+
'response,
212+
'stream_error,
213+
'conn_error )
214+
Io.t ->
215+
service:string ->
216+
method_name:string ->
217+
headers:Grpc_client.request_headers ->
218+
((unit -> ('net_response, 'conn_error) result) ->
195219
writer:'request writer ->
196220
read:(unit -> 'response Seq.node) ->
197221
'a) ->

lib/eio/core/recv_seq.ml

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
type ('a, 'err) t = unit -> ('a, 'err) recv_item
22
and ('a, 'err) recv_item = Done | Next of 'a * ('a, 'err) t | Err of 'err
33

4-
let rec map f recv =
5-
fun () ->
4+
let rec map f recv () =
65
match recv () with
76
| Done -> Done
87
| Next (x, recv) -> Next (f x, map f recv)
@@ -13,8 +12,12 @@ let to_seq ?err_to_exn recv =
1312
match recv () with
1413
| Done -> Seq.Nil
1514
| Next (x, recv) -> Seq.Cons (x, loop recv)
16-
| Err err -> match err_to_exn with
17-
| None -> failwith "Unexpected error on read. Implement err_to_exn for a more granular error."
18-
| Some f -> raise (f err)
15+
| Err err -> (
16+
match err_to_exn with
17+
| None ->
18+
failwith
19+
"Unexpected error on read. Implement err_to_exn for a more \
20+
granular error."
21+
| Some f -> raise (f err))
1922
in
2023
loop recv

lib/eio/io-server-h2-ocaml-protoc/io_server_h2_ocaml_protoc.ml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
exception Unexpected_eof
22

3-
43
module Io = struct
54
type request = Pbrt.Decoder.t Grpc_eio_core.Body_reader.consumer
65
type response = Pbrt.Encoder.t -> unit

0 commit comments

Comments
 (0)