Skip to content
This repository was archived by the owner on May 6, 2025. It is now read-only.

Commit a48db16

Browse files
authored
Merge pull request #6 from tidewave-ai/sd-receive-timeout
add configurable receive timeout for POST requests
2 parents a8fba00 + f8ffaf8 commit a48db16

File tree

5 files changed

+128
-35
lines changed

5 files changed

+128
-35
lines changed

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,8 @@ Remember to replace `$HOME` by your home directory, such as "c:\Users\johndoe".
5151
## Configuration
5252

5353
`mcp-proxy` either accepts the SSE URL as argument or using the environment variable `SSE_URL`. For debugging purposes, you can also pass `--debug`, which will log debug messages on stderr.
54+
55+
Other supported flags:
56+
57+
* `--max-disconnected-time` the maximum amount of time for trying to reconnect while disconnected. When not set, defaults to infinity.
58+
* `--receive-timeout` the maximum amount of time to wait for an individual reply from the MCP server in milliseconds. Defaults to 60000 (60 seconds).

lib/mcp_proxy.ex

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,13 @@ defmodule McpProxy do
77
@doc false
88
def main(args) do
99
{opts, args} =
10-
OptionParser.parse!(args, strict: [debug: :boolean, max_disconnected_time: :integer])
10+
OptionParser.parse!(args,
11+
strict: [
12+
debug: :boolean,
13+
max_disconnected_time: :integer,
14+
receive_timeout: :integer
15+
]
16+
)
1117

1218
base_url =
1319
case args do
@@ -23,8 +29,9 @@ defmodule McpProxy do
2329
end
2430

2531
Application.ensure_all_started(:req)
32+
Application.put_all_env(mcp_proxy: opts)
2633

27-
{:ok, handler} = SSE.start_link({base_url, opts})
34+
{:ok, handler} = SSE.start_link(base_url)
2835
ref = Process.monitor(handler)
2936

3037
receive do

lib/mcp_proxy/sse.ex

Lines changed: 65 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,13 @@ defmodule McpProxy.SSE do
3434
end
3535

3636
@impl true
37-
def init({sse_url, opts}) do
37+
def init(sse_url) do
3838
{:ok,
3939
%{
4040
url: sse_url,
4141
endpoint: nil,
42-
debug: Keyword.get(opts, :debug, false),
43-
max_disconnected_time: Keyword.get(opts, :max_disconnected_time),
42+
debug: Application.get_env(:mcp_proxy, :debug, false),
43+
max_disconnected_time: Application.get_env(:mcp_proxy, :max_disconnected_time),
4444
disconnected_since: nil,
4545
state: :connecting,
4646
connect_tries: 0,
@@ -263,11 +263,17 @@ defmodule McpProxy.SSE do
263263
end
264264

265265
# regular events
266-
def handle_info({:sse_event, {:message, event}}, state),
267-
do: handle_event(event, state, &IO.puts(Jason.encode!(&1)))
266+
def handle_info({:sse_event, {:message, event}}, state) do
267+
handle_event(event, state, {&IO.puts(Jason.encode!(&1)), fn _ -> :ok end})
268+
end
268269

269-
def handle_info({:io_event, event}, state),
270-
do: handle_event(event, state, &forward_message(&1, state.endpoint, state.debug))
270+
def handle_info({:io_event, event}, state) do
271+
handle_event(
272+
event,
273+
state,
274+
{&forward_message(&1, state.endpoint, state.debug), &IO.puts(Jason.encode!(&1))}
275+
)
276+
end
271277

272278
# whenever the HTTP process dies, we try to reconnect
273279
def handle_info({:DOWN, _ref, :process, http_pid, _reason}, %{http_pid: http_pid} = state) do
@@ -327,52 +333,84 @@ defmodule McpProxy.SSE do
327333
Enum.reduce(messages, state, fn message, state -> handle_event(message, state, handler) end)
328334
end
329335

330-
defp handle_event(%{"jsonrpc" => "2.0", "id" => request_id} = event, state, handler)
336+
defp handle_event(%{"jsonrpc" => "2.0", "id" => request_id} = event, state, {req, resp})
331337
when is_request(event) do
332338
# whenever we get a request from the client (OR the server!)
333339
# we generate a random ID to prevent duplicate IDs, for example when
334340
# a reconnected server decides to send a ping and always starts with ID 0
335341
new_id = random_id!()
336342
event = Map.put(event, "id", new_id)
337343

338-
handler.(event)
344+
state =
345+
case req.(event) do
346+
:ok ->
347+
%{state | id_map: Map.put(state.id_map, new_id, request_id)}
348+
349+
{:reply_error, reply} ->
350+
resp.(Map.put(reply, "id", request_id))
351+
# we don't store the new_id when we already replied to prevent duplicates;
352+
# instead, we'll log an error if a server reply is received later and we already
353+
# replied
354+
state
355+
end
339356

340-
{:noreply, %{state | id_map: Map.put(state.id_map, new_id, request_id)}}
357+
{:noreply, state}
341358
end
342359

343-
defp handle_event(%{"jsonrpc" => "2.0", "id" => response_id} = event, state, handler)
360+
defp handle_event(%{"jsonrpc" => "2.0", "id" => response_id} = event, state, {handler, _})
344361
when is_response(event) do
345362
# whenever we receive a response (from the client or server)
346363
# we fetch the original ID from the id map to present the expected
347364
# ID in the reply
348-
original_id = Map.fetch!(state.id_map, response_id)
349-
event = Map.put(event, "id", original_id)
365+
case state.id_map do
366+
%{^response_id => original_id} ->
367+
event = Map.put(event, "id", original_id)
368+
369+
handler.(event)
370+
371+
{:noreply, %{state | id_map: Map.delete(state.id_map, response_id)}}
350372

351-
handler.(event)
373+
_ ->
374+
Logger.error(
375+
"Did not find original ID for response: #{response_id}. Discarding response!"
376+
)
352377

353-
{:noreply, %{state | id_map: Map.delete(state.id_map, response_id)}}
378+
{:noreply, state}
379+
end
354380
end
355381

356382
# no id, so must be a notification that we can just forward as is
357-
defp handle_event(%{"jsonrpc" => "2.0"} = event, state, handler) do
358-
handler.(event)
383+
defp handle_event(%{"jsonrpc" => "2.0"} = event, state, {req, _}) do
384+
req.(event)
359385

360386
{:noreply, state}
361387
end
362388

363389
## other helpers
364390

365391
defp forward_message(message, endpoint, debug) do
366-
try do
367-
if debug, do: Logger.debug("Forwarding request to server: #{inspect(message)}")
368-
Req.post!(endpoint, json: message)
369-
rescue
370-
error ->
371-
Logger.error(
372-
"Failed to forward message: #{Exception.format(:error, error, __STACKTRACE__)}"
373-
)
374-
375-
# TODO: store message and replay later?
392+
if debug, do: Logger.debug("Forwarding request to server: #{inspect(message)}")
393+
394+
case Req.post(endpoint,
395+
json: message,
396+
receive_timeout: Application.get_env(:mcp_proxy, :receive_timeout, 60_000)
397+
) do
398+
{:ok, _} ->
399+
# even when the server replies with a status code that is not in the 200 range
400+
# it might still send a reply on the SSE connection
401+
:ok
402+
403+
{:error, %Req.TransportError{reason: reason}} ->
404+
Logger.error("Failed to forward message #{inspect(message)}:\n#{inspect(reason)}")
405+
406+
{:reply_error,
407+
%{
408+
jsonrpc: "2.0",
409+
error: %{
410+
code: -32011,
411+
message: "Failed to forward request. Reason: #{inspect(reason)}"
412+
}
413+
}}
376414
end
377415
end
378416

test/mcp_proxy_test.exs

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ defmodule McpProxyTest do
66

77
alias McpProxy.SSEServer
88

9-
setup do
9+
setup context do
1010
{result, _output} =
1111
with_io(:stderr, fn ->
1212
_server_pid = start_supervised!(SSEServer, restart: :temporary)
@@ -17,7 +17,10 @@ defmodule McpProxyTest do
1717
_main_pid =
1818
spawn_link(fn ->
1919
Process.group_leader(self(), parent)
20-
McpProxy.main(["http://localhost:#{port}/sse", "--debug"])
20+
21+
McpProxy.main(
22+
["http://localhost:#{port}/sse", "--debug"] ++ (context[:extra_args] || [])
23+
)
2124
end)
2225

2326
assert_receive {:io_request, io_pid, reply_as, {:get_line, :unicode, []}}
@@ -116,6 +119,32 @@ defmodule McpProxyTest do
116119
assert io =~ "Flushing buffer"
117120
end
118121

122+
@tag extra_args: ["--receive-timeout", "50"]
123+
test "handles receive timeout", %{io_pid: io_pid, reply_as: reply_as} do
124+
capture_io(:stderr, fn ->
125+
send_message(
126+
io_pid,
127+
reply_as,
128+
%{
129+
jsonrpc: "2.0",
130+
id: "call-1",
131+
method: "tools/call",
132+
params: %{"name" => "sleep", "arguments" => %{"time" => 100}}
133+
}
134+
)
135+
136+
assert_receive {:io_request, _io_pid, _reply_as, {:get_line, :unicode, []}}
137+
assert_receive {:io_request, put_pid, put_reply_as, {:put_chars, :unicode, json}}, 100
138+
send(put_pid, {:io_reply, put_reply_as, :ok})
139+
140+
assert %{"id" => "call-1", "error" => %{"message" => message}} = Jason.decode!(json)
141+
assert message =~ "Failed to forward request"
142+
143+
# wait an extra 100 milliseconds for the log about discarding a duplicate response
144+
Process.sleep(100)
145+
end) =~ "Discarding!"
146+
end
147+
119148
defp send_message(io_pid, reply_as, json) do
120149
send(io_pid, {:io_reply, reply_as, Jason.encode_to_iodata!(json)})
121150
end

test/support/sse_server.ex

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ defmodule McpProxy.SSEServer do
7575
post "/message" do
7676
with %{query_params: %{"session_id" => session_id}} <- conn,
7777
[{pid, _}] <- Registry.lookup(McpProxy.SSEServer.Registry, session_id) do
78-
GenServer.cast(pid, {:request, conn.body_params})
78+
GenServer.call(pid, {:request, conn.body_params})
7979

8080
conn
8181
|> put_resp_content_type("application/json")
@@ -93,16 +93,16 @@ defmodule McpProxy.SSEServer do
9393
end
9494

9595
@impl GenServer
96-
def handle_cast({:request, message}, state) do
96+
def handle_call({:request, message}, _from, state) do
9797
result = handle_message(message)
9898

9999
if result do
100100
case Plug.Conn.chunk(state.conn, ["event: message\ndata: ", result]) do
101-
{:ok, conn} -> {:noreply, %{state | conn: conn}}
101+
{:ok, conn} -> {:reply, :ok, %{state | conn: conn}}
102102
{:error, :closed} -> {:stop, :shutdown, state}
103103
end
104104
else
105-
{:noreply, state}
105+
{:reply, :ok, state}
106106
end
107107
end
108108

@@ -164,6 +164,20 @@ defmodule McpProxy.SSEServer do
164164
})
165165
end
166166

167+
defp handle_message(%{
168+
"id" => request_id,
169+
"method" => "tools/call",
170+
"params" => %{"name" => "sleep", "arguments" => %{"time" => time}}
171+
}) do
172+
Process.sleep(time)
173+
174+
Jason.encode_to_iodata!(%{
175+
jsonrpc: "2.0",
176+
id: request_id,
177+
result: %{content: [%{text: "Ok"}]}
178+
})
179+
end
180+
167181
defp handle_message(%{"id" => id, "method" => other} = _message) do
168182
Jason.encode_to_iodata!(%{
169183
jsonrpc: "2.0",

0 commit comments

Comments
 (0)