Skip to content

Commit ca22a76

Browse files
committed
fix and improve ping logic.
- Previously the connection would close if an invalid ping was received; now it only closes if the pong is invalid.
- We now calculate latency and publish a telemetry event.

Signed-off-by: Alejandro M. Ramallo <[email protected]>
1 parent 3df3d04 commit ca22a76

File tree

3 files changed

+137
-49
lines changed

3 files changed

+137
-49
lines changed

include/partisan_peer_socket.hrl

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,15 @@
22
-define(DATA_MSG(Tag), Tag == tcp orelse Tag == ssl).
33
-define(ERROR_MSG(Tag), Tag == tcp_error orelse Tag == ssl_error).
44
-define(CLOSED_MSG(Tag), Tag == tcp_closed orelse Tag == ssl_closed).
5+
6+
-record(ping, {
7+
from :: node(),
8+
id :: partisan:reference(),
9+
timestamp :: non_neg_integer()
10+
}).
11+
12+
-record(pong, {
13+
from :: node(),
14+
id :: partisan:reference(),
15+
timestamp :: non_neg_integer()
16+
}).

src/partisan_peer_service_client.erl

Lines changed: 64 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
ping_idle_timeout :: non_neg_integer(),
3838
ping_tref :: optional(partisan_remote_ref:r()),
3939
ping_retry :: optional(partisan_retry:t()),
40-
ping_payload :: binary()
40+
ping_id :: optional(partisan:reference())
4141
}).
4242

4343
-type state() :: #state{}.
@@ -168,7 +168,7 @@ handle_call({send_message, Msg}, _From, #state{} = State) ->
168168

169169
Data = partisan_util:encode(Msg, State#state.encoding_opts),
170170

171-
case send(State#state.socket, Data) of
171+
case send_data(State#state.socket, Data) of
172172
ok ->
173173
?LOG_TRACE("Dispatched message: ~p", [Msg]),
174174
{reply, ok, State};
@@ -196,7 +196,7 @@ handle_cast({send_message, Msg}, #state{} = State) ->
196196

197197
Data = partisan_util:encode(Msg, State#state.encoding_opts),
198198

199-
case send(State#state.socket, Data) of
199+
case send_data(State#state.socket, Data) of
200200
ok ->
201201
?LOG_TRACE("Dispatched message: ~p", [Msg]),
202202
ok;
@@ -351,10 +351,15 @@ when is_atom(Channel), is_map(ChannelOpts) ->
351351

352352

353353
%% @private
354-
send(undefined, _) ->
354+
send_message(Socket, Message) ->
355+
send_data(Socket, erlang:term_to_iovec(Message)).
356+
357+
358+
%% @private
359+
send_data(undefined, _) ->
355360
{error, no_socket};
356361

357-
send(Socket, Data) ->
362+
send_data(Socket, Data) ->
358363
partisan_peer_socket:send(Socket, Data).
359364

360365

@@ -389,9 +394,9 @@ handle_inbound({state, Tag, LocalState}, #state{} = State) ->
389394

390395
handle_inbound({hello, Node}, #state{peer = #{name := Node}} = State) ->
391396
Socket = State#state.socket,
392-
Msg = term_to_binary({hello, partisan:node(), State#state.channel}),
397+
Msg = {hello, partisan:node(), State#state.channel},
393398

394-
case send(Socket, Msg) of
399+
case send_message(Socket, Msg) of
395400
ok ->
396401
{noreply, reset_ping(State)};
397402

@@ -414,21 +419,36 @@ when A =/= B ->
414419
}),
415420
{stop, {unexpected_peer, A, B}, State};
416421

417-
handle_inbound({ping, Node, Payload}, #state{peer = Node} = State) ->
418-
send(State#state.socket, term_to_binary({pong, Node, Payload})),
422+
handle_inbound(
423+
#ping{from = Node} = Ping, #state{peer = #{name := Node}} = State) ->
424+
ok = send_pong(State, Ping),
419425
{noreply, reset_ping(State)};
420426

421-
handle_inbound({ping, _, _}, #state{} = State) ->
422-
{stop, invalid_ping, State};
427+
handle_inbound(#ping{} = Ping, #state{} = State) ->
428+
?LOG_WARNING(#{
429+
description => "Received invalid ping message",
430+
message => Ping
431+
}),
432+
{noreply, State};
423433

424-
handle_inbound({pong, Node, _}, #state{} = State) ->
425-
case Node == partisan:node() of
426-
true ->
427-
{noreply, reset_ping(State)};
434+
%% Pong clause: matches only when the pong's `from' equals the connected
%% peer's name and its `id' equals the `ping_id' currently held in the state.
%% Publishes the measured latency as a telemetry event and then calls
%% reset_ping/1. Pongs that fail this match fall through to the next clause.
handle_inbound(
    #pong{from = Node, id = Id, timestamp = Ts},
    #state{peer = #{name := Node}, ping_id = Id} = State) ->
    %% NOTE(review): latency is a wall-clock difference
    %% (erlang:system_time/1). `Ts' is presumably the timestamp of our own
    %% ping echoed back by the peer; if the system clock is adjusted between
    %% ping and pong the value can be skewed or negative -- confirm whether
    %% a monotonic clock should be used on both ends.
    ok = telemetry:execute(
        %% Event name fixed: was misspelled `hearbeat'.
        [partisan, connection, client, heartbeat],
        #{latency => erlang:system_time(millisecond) - Ts},
        #{
            node => partisan:node(),
            channel => State#state.channel,
            listen_addr => State#state.listen_addr,
            socket => State#state.socket,
            peer_node => Node
        }
    ),
    {noreply, reset_ping(State)};
428449

429-
false ->
430-
{stop, invalid_ping_response, State}
431-
end;
450+
handle_inbound(#pong{} = Pong, #state{} = State) ->
451+
{stop, {invalid_ping_response, Pong}, State};
432452

433453
handle_inbound(Msg, State) ->
434454
?LOG_WARNING(#{
@@ -467,7 +487,6 @@ maybe_enable_ping(State, #{enabled := true} = PingOpts) ->
467487

468488
State#state{
469489
ping_idle_timeout = IdleTimeout,
470-
ping_payload = partisan:make_ref(),
471490
ping_retry = Retry
472491
};
473492

@@ -510,10 +529,31 @@ when Limit == deadline orelse Limit == max_retries ->
510529
{stop, {shutdown, ping_timeout}, State};
511530

512531
maybe_send_ping(_Time, #state{} = State0) ->
513-
Data = term_to_binary({ping, partisan:node(), State0#state.ping_payload}),
514-
ok = send(State0#state.socket, Data),
532+
State = send_ping(State0),
533+
{noreply, State}.
534+
535+
536+
%% @private
537+
%% Builds a fresh #ping{} (new reference, local node name, current system
%% time in milliseconds), sends it over the state's socket and fires the
%% ping retry. Returns the state updated with the new ping id and the
%% timer reference returned by partisan_retry:fire/1.
%% Crashes with badmatch if the send does not return `ok'.
send_ping(State) ->
    PingId = partisan:make_ref(),
    Message = #ping{
        from = partisan:node(),
        id = PingId,
        timestamp = erlang:system_time(millisecond)
    },
    ok = send_message(State#state.socket, Message),

    %% Schedule the next retry and remember both the id and the timer ref
    TimerRef = partisan_retry:fire(State#state.ping_retry),
    State#state{
        ping_id = PingId,
        ping_tref = TimerRef
    }.
549+
550+
551+
%% @private
552+
%% Replies to a received #ping{} with a #pong{} that carries the local node
%% name and echoes the ping's id and timestamp unchanged.
%% Returns whatever send_message/2 returns.
send_pong(State, #ping{} = Ping) ->
    Reply = #pong{
        from = partisan:node(),
        id = Ping#ping.id,
        timestamp = Ping#ping.timestamp
    },
    send_message(State#state.socket, Reply).
559+

src/partisan_peer_service_server.erl

Lines changed: 61 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@
1616

1717
-record(state, {
1818
socket :: partisan_peer_socket:t(),
19-
peer :: node(),
19+
peer_node :: node(),
2020
channel :: partisan:channel(),
2121
ref :: reference(),
2222
ping_idle_timeout :: non_neg_integer(),
2323
ping_tref :: optional(partisan_remote_ref:r()),
2424
ping_retry :: optional(partisan_retry:t()),
25-
ping_payload :: binary()
25+
ping_id :: optional(partisan:reference())
2626
}).
2727

2828
-type state_t() :: #state{}.
@@ -125,7 +125,7 @@ handle_info(
125125
{timeout, Ref, ping_idle_timeout}, #state{ping_tref = Ref} = State) ->
126126
?LOG_INFO(#{
127127
description => "Connection idle, sending ping",
128-
peer => State#state.peer,
128+
peer_node => State#state.peer_node,
129129
channel => State#state.channel,
130130
attempt => partisan_retry:count(State#state.ping_retry)
131131
}),
@@ -171,7 +171,7 @@ handle_inbound({hello, Node, Channel}, #state{} = State0) ->
171171
put({?MODULE, peer}, Node),
172172
put({?MODULE, channel}, Channel),
173173

174-
State = State0#state{peer = Node},
174+
State = State0#state{peer_node = Node},
175175

176176
%% Connect the node with Distributed Erlang, just for now for
177177
%% control messaging in the test suite execution.
@@ -192,27 +192,40 @@ handle_inbound({hello, Node, Channel}, #state{} = State0) ->
192192
{noreply, reset_ping(State)};
193193

194194

195-
handle_inbound({ping, Node, Payload}, #state{peer = Node} = State) ->
196-
send_message(State#state.socket, {pong, Node, Payload}),
195+
handle_inbound(#ping{from = Node} = Ping, #state{peer_node = Node} = State) ->
196+
ok = send_pong(State, Ping),
197197
{noreply, reset_ping(State)};
198198

199-
handle_inbound({ping, _, _}, #state{} = State) ->
200-
{stop, invalid_ping, State};
199+
handle_inbound(#ping{} = Ping, #state{} = State) ->
200+
?LOG_WARNING(#{
201+
description => "Received invalid ping message",
202+
message => Ping
203+
}),
204+
{noreply, State};
201205

202-
handle_inbound({pong, Node, _}, #state{} = State) ->
203-
case Node == partisan:node() of
204-
true ->
205-
{noreply, reset_ping(State)};
206+
%% Pong clause: matches only when the pong's `from' equals `peer_node' and
%% its `id' equals the `ping_id' currently held in the state.
%% Publishes the measured latency as a telemetry event and then calls
%% reset_ping/1. Pongs that fail this match fall through to the next clause.
handle_inbound(
    #pong{from = Node, id = Id, timestamp = Ts},
    #state{peer_node = Node, ping_id = Id} = State) ->
    %% NOTE(review): latency is a wall-clock difference
    %% (erlang:system_time/1). `Ts' is presumably the timestamp of our own
    %% ping echoed back by the peer; if the system clock is adjusted between
    %% ping and pong the value can be skewed or negative -- confirm whether
    %% a monotonic clock should be used on both ends.
    ok = telemetry:execute(
        %% Event name fixed: was misspelled `hearbeat'.
        [partisan, connection, server, heartbeat],
        #{latency => erlang:system_time(millisecond) - Ts},
        #{
            node => partisan:node(),
            channel => State#state.channel,
            socket => State#state.socket,
            peer_node => Node
        }
    ),
    {noreply, reset_ping(State)};
206220

207-
false ->
208-
{stop, invalid_ping_response, State}
209-
end;
221+
handle_inbound(#pong{} = Pong, #state{} = State) ->
222+
{stop, {invalid_ping_response, Pong}, State};
210223

211224
%% Fallback clause: any message not matched by an earlier clause is handed
%% to the module returned by partisan_peer_service:manager/0 via
%% receive_message/3, after which reset_ping/1 is called on the state.
handle_inbound(Message, State) ->
    %% The hello clause stores the peer with put({?MODULE, peer}, Node);
    %% the read must use the same key. Reading `{?MODULE, peer_node}'
    %% returned `undefined' because nothing ever writes that key.
    PeerNode = get({?MODULE, peer}),
    Channel = get({?MODULE, channel}),
    Manager = partisan_peer_service:manager(),
    _ = Manager:receive_message(PeerNode, Channel, Message),
    ?LOG_TRACE("Dispatched ~p to manager.", [Message]),
    {noreply, reset_ping(State)}.
218231

@@ -236,8 +249,11 @@ maybe_delay() ->
236249

237250
%% @private
238251
send_message(Socket, Message) ->
239-
EncodedMessage = erlang:term_to_binary(Message),
240-
partisan_peer_socket:send(Socket, EncodedMessage).
252+
Data = erlang:term_to_iovec(Message),
253+
send_data(Socket, Data).
254+
255+
send_data(Socket, Data) ->
256+
partisan_peer_socket:send(Socket, Data).
241257

242258

243259
%% @private
@@ -278,7 +294,7 @@ maybe_enable_ping(State, #{enabled := true} = PingOpts) ->
278294

279295
State#state{
280296
ping_idle_timeout = IdleTimeout,
281-
ping_payload = partisan:make_ref(),
297+
ping_id = partisan:make_ref(),
282298
ping_retry = Retry
283299
};
284300

@@ -328,10 +344,30 @@ when Limit == deadline orelse Limit == max_retries ->
328344
{stop, {shutdown, ping_timeout}, State};
329345

330346
maybe_send_ping(_Time, #state{} = State0) ->
331-
Data = {ping, partisan:node(), State0#state.ping_payload},
332-
ok = send_message(State0#state.socket, Data),
347+
State = send_ping(State0),
348+
{noreply, State}.
349+
350+
351+
352+
%% @private
353+
%% Builds a fresh #ping{} (new reference, local node name, current system
%% time in milliseconds), sends it over the state's socket and fires the
%% ping retry. Returns the state updated with the new ping id and the
%% timer reference returned by partisan_retry:fire/1.
%% Crashes with badmatch if the send does not return `ok'.
send_ping(State) ->
    PingId = partisan:make_ref(),
    Message = #ping{
        from = partisan:node(),
        id = PingId,
        timestamp = erlang:system_time(millisecond)
    },
    ok = send_message(State#state.socket, Message),

    %% Schedule the next retry and remember both the id and the timer ref
    TimerRef = partisan_retry:fire(State#state.ping_retry),
    State#state{
        ping_id = PingId,
        ping_tref = TimerRef
    }.
365+
366+
%% @private
367+
%% Replies to a received #ping{} with a #pong{} that carries the local node
%% name and echoes the ping's id and timestamp unchanged.
%% Returns whatever send_message/2 returns.
send_pong(State, #ping{} = Ping) ->
    Reply = #pong{
        from = partisan:node(),
        id = Ping#ping.id,
        timestamp = Ping#ping.timestamp
    },
    send_message(State#state.socket, Reply).

0 commit comments

Comments
 (0)