44
55use super :: RequestBuilder ;
66use crate :: { RpcChannel , RpcError , RpcMessage , RpcResult } ;
7- use futures:: { Future , FutureExt , StreamExt , TryFutureExt , TryStreamExt } ;
8- use hyper:: { http, rt , Client , Request , Uri } ;
7+ use futures:: { future , Future , FutureExt , StreamExt , TryFutureExt } ;
8+ use hyper:: { http, Client , Request , Uri } ;
99
1010/// Create a HTTP Client
11- pub fn connect < TClient > ( url : & str ) -> impl Future < Output = RpcResult < TClient > >
11+ pub async fn connect < TClient > ( url : & str ) -> RpcResult < TClient >
1212where
1313 TClient : From < RpcChannel > ,
1414{
15- let ( sender, receiver) = futures:: channel:: oneshot:: channel ( ) ;
16- let url = url. to_owned ( ) ;
17-
18- std:: thread:: spawn ( move || {
19- let connect = rt:: lazy ( move || {
20- do_connect ( & url)
21- . map ( |client| {
22- if sender. send ( client) . is_err ( ) {
23- panic ! ( "The caller did not wait for the server." ) ;
24- }
25- Ok ( ( ) )
26- } )
27- . compat ( )
28- } ) ;
29- rt:: run ( connect) ;
30- } ) ;
15+ let url: Uri = url. parse ( ) . map_err ( |e| RpcError :: Other ( Box :: new ( e) ) ) ?;
3116
32- receiver . map ( |res| res . expect ( "Server closed prematurely." ) . map ( TClient :: from ) )
33- }
17+ let ( client_api , client_worker ) = do_connect ( url ) . await ;
18+ tokio :: spawn ( client_worker ) ;
3419
35- fn do_connect ( url : & str ) -> impl Future < Output = RpcResult < RpcChannel > > {
36- use futures :: future :: ready ;
20+ Ok ( TClient :: from ( client_api ) )
21+ }
3722
23+ async fn do_connect ( url : Uri ) -> ( RpcChannel , impl Future < Output = ( ) > ) {
3824 let max_parallel = 8 ;
39- let url: Uri = match url. parse ( ) {
40- Ok ( url) => url,
41- Err ( e) => return ready ( Err ( RpcError :: Other ( Box :: new ( e) ) ) ) ,
42- } ;
4325
4426 #[ cfg( feature = "tls" ) ]
45- let connector = match hyper_tls:: HttpsConnector :: new ( 4 ) {
46- Ok ( connector) => connector,
47- Err ( e) => return ready ( Err ( RpcError :: Other ( Box :: new ( e) ) ) ) ,
48- } ;
27+ let connector = hyper_tls:: HttpsConnector :: new ( ) ;
4928 #[ cfg( feature = "tls" ) ]
5029 let client = Client :: builder ( ) . build :: < _ , hyper:: Body > ( connector) ;
5130
5231 #[ cfg( not( feature = "tls" ) ) ]
5332 let client = Client :: new ( ) ;
54-
33+ // Keep track of internal request IDs when building subsequent requests
5534 let mut request_builder = RequestBuilder :: new ( ) ;
5635
5736 let ( sender, receiver) = futures:: channel:: mpsc:: unbounded ( ) ;
5837
59- use futures01:: { Future , Stream } ;
6038 let fut = receiver
61- . map ( Ok )
62- . compat ( )
6339 . filter_map ( move |msg : RpcMessage | {
64- let ( request , sender ) = match msg {
40+ future :: ready ( match msg {
6541 RpcMessage :: Call ( call) => {
6642 let ( _, request) = request_builder. call_request ( & call) ;
67- ( request, Some ( call. sender ) )
43+ Some ( ( request, Some ( call. sender ) ) )
6844 }
69- RpcMessage :: Notify ( notify) => ( request_builder. notification ( & notify) , None ) ,
45+ RpcMessage :: Notify ( notify) => Some ( ( request_builder. notification ( & notify) , None ) ) ,
7046 RpcMessage :: Subscribe ( _) => {
7147 log:: warn!( "Unsupported `RpcMessage` type `Subscribe`." ) ;
72- return None ;
48+ None
7349 }
74- } ;
75-
50+ } )
51+ } )
52+ . map ( move |( request, sender) | {
7653 let request = Request :: post ( & url)
7754 . header (
7855 http:: header:: CONTENT_TYPE ,
@@ -85,46 +62,42 @@ fn do_connect(url: &str) -> impl Future<Output = RpcResult<RpcChannel>> {
8562 . body ( request. into ( ) )
8663 . expect ( "Uri and request headers are valid; qed" ) ;
8764
88- Some ( client. request ( request) . then ( move |response| Ok ( ( response, sender) ) ) )
65+ client
66+ . request ( request)
67+ . then ( |response| async move { ( response, sender) } )
8968 } )
9069 . buffer_unordered ( max_parallel)
91- . for_each ( |( result, sender) | {
92- use futures01:: future:: {
93- self ,
94- Either :: { A , B } ,
95- } ;
96- let future = match result {
70+ . for_each ( |( response, sender) | async {
71+ let result = match response {
9772 Ok ( ref res) if !res. status ( ) . is_success ( ) => {
9873 log:: trace!( "http result status {}" , res. status( ) ) ;
99- A ( future :: err ( RpcError :: Client ( format ! (
74+ Err ( RpcError :: Client ( format ! (
10075 "Unexpected response status code: {}" ,
10176 res. status( )
102- ) ) ) )
77+ ) ) )
78+ }
79+ Err ( err) => Err ( RpcError :: Other ( Box :: new ( err) ) ) ,
80+ Ok ( res) => {
81+ hyper:: body:: to_bytes ( res. into_body ( ) )
82+ . map_err ( |e| RpcError :: ParseError ( e. to_string ( ) , Box :: new ( e) ) )
83+ . await
10384 }
104- Ok ( res) => B ( res
105- . into_body ( )
106- . map_err ( |e| RpcError :: ParseError ( e. to_string ( ) , Box :: new ( e) ) )
107- . concat2 ( ) ) ,
108- Err ( err) => A ( future:: err ( RpcError :: Other ( Box :: new ( err) ) ) ) ,
10985 } ;
110- future. then ( |result| {
111- if let Some ( sender) = sender {
112- let response = result
113- . and_then ( |response| {
114- let response_str = String :: from_utf8_lossy ( response. as_ref ( ) ) . into_owned ( ) ;
115- super :: parse_response ( & response_str)
116- } )
117- . and_then ( |r| r. 1 ) ;
118- if let Err ( err) = sender. send ( response) {
119- log:: warn!( "Error resuming asynchronous request: {:?}" , err) ;
120- }
86+
87+ if let Some ( sender) = sender {
88+ let response = result
89+ . and_then ( |response| {
90+ let response_str = String :: from_utf8_lossy ( response. as_ref ( ) ) . into_owned ( ) ;
91+ super :: parse_response ( & response_str)
92+ } )
93+ . and_then ( |r| r. 1 ) ;
94+ if let Err ( err) = sender. send ( response) {
95+ log:: warn!( "Error resuming asynchronous request: {:?}" , err) ;
12196 }
122- Ok ( ( ) )
123- } )
97+ }
12498 } ) ;
12599
126- rt:: spawn ( fut. map_err ( |e : RpcError | log:: error!( "RPC Client error: {:?}" , e) ) ) ;
127- ready ( Ok ( sender. into ( ) ) )
100+ ( sender. into ( ) , fut)
128101}
129102
130103#[ cfg( test) ]
@@ -218,7 +191,7 @@ mod tests {
218191 Ok ( ( ) ) as RpcResult < _ >
219192 } ;
220193
221- futures :: executor :: block_on ( run) . unwrap ( ) ;
194+ tokio :: runtime :: Runtime :: new ( ) . unwrap ( ) . block_on ( run) . unwrap ( ) ;
222195 }
223196
224197 #[ test]
@@ -227,18 +200,16 @@ mod tests {
227200
228201 // given
229202 let server = TestServer :: serve ( id) ;
230- let ( tx, rx) = std:: sync:: mpsc:: channel ( ) ;
231203
232204 // when
233- let run = async move {
205+ let run = async {
234206 let client: TestClient = connect ( & server. uri ) . await . unwrap ( ) ;
235207 client. notify ( 12 ) . unwrap ( ) ;
236- tx. send ( ( ) ) . unwrap ( ) ;
237208 } ;
238209
239- let pool = futures :: executor :: ThreadPool :: builder ( ) . pool_size ( 1 ) . create ( ) . unwrap ( ) ;
240- pool . spawn_ok ( run ) ;
241- rx . recv ( ) . unwrap ( ) ;
210+ tokio :: runtime :: Runtime :: new ( ) . unwrap ( ) . block_on ( run ) ;
211+ // Ensure that server has not been moved into runtime
212+ drop ( server ) ;
242213 }
243214
244215 #[ test]
@@ -249,7 +220,8 @@ mod tests {
249220 let invalid_uri = "invalid uri" ;
250221
251222 // when
252- let res: RpcResult < TestClient > = futures:: executor:: block_on ( connect ( invalid_uri) ) ;
223+ let fut = connect ( invalid_uri) ;
224+ let res: RpcResult < TestClient > = tokio:: runtime:: Runtime :: new ( ) . unwrap ( ) . block_on ( fut) ;
253225
254226 // then
255227 assert_matches ! (
@@ -271,7 +243,7 @@ mod tests {
271243 let client: TestClient = connect ( & server. uri ) . await ?;
272244 client. fail ( ) . await
273245 } ;
274- let res = futures :: executor :: block_on ( run) ;
246+ let res = tokio :: runtime :: Runtime :: new ( ) . unwrap ( ) . block_on ( run) ;
275247
276248 // then
277249 if let Err ( RpcError :: JsonRpcError ( err) ) = res {
@@ -312,6 +284,6 @@ mod tests {
312284 Ok ( ( ) ) as RpcResult < _ >
313285 } ;
314286
315- futures :: executor :: block_on ( run) . unwrap ( ) ;
287+ tokio :: runtime :: Runtime :: new ( ) . unwrap ( ) . block_on ( run) . unwrap ( ) ;
316288 }
317289}
0 commit comments