Skip to content
This repository was archived by the owner on May 6, 2025. It is now read-only.

Commit fc0f295

Browse files
authored
Merge pull request #5 from tidewave-ai/sd-reconnect
actively try to reconnect
2 parents 4a8dd8b + 2aabc97 commit fc0f295

File tree

4 files changed

+617
-285
lines changed

4 files changed

+617
-285
lines changed

lib/mcp_proxy.ex

Lines changed: 8 additions & 191 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@ defmodule McpProxy do
22
@moduledoc false
33
require Logger
44

5+
alias McpProxy.SSE
6+
57
@doc false
68
def main(args) do
7-
{opts, args} = OptionParser.parse!(args, strict: [debug: :boolean])
9+
{opts, args} =
10+
OptionParser.parse!(args, strict: [debug: :boolean, max_disconnected_time: :integer])
811

912
base_url =
1013
case args do
@@ -19,200 +22,14 @@ defmodule McpProxy do
1922
raise "expected one or zero arguments, got: #{inspect(many)}"
2023
end
2124

22-
debug = Keyword.get(opts, :debug, false)
23-
24-
if debug do
25-
Logger.configure(level: :debug)
26-
Logger.debug("Starting MCP wrapper script with base URL: #{base_url}")
27-
end
28-
2925
Application.ensure_all_started(:req)
3026

31-
# Connect to SSE endpoint and get the message endpoint, then loop and process stdin
32-
case connect_to_sse(base_url, debug) do
33-
{:ok, endpoint, sse_pid} ->
34-
if debug, do: Logger.debug("Received endpoint: #{endpoint}")
35-
# Monitor the SSE process to detect when it dies
36-
sse_ref = Process.monitor(sse_pid)
37-
# Start the stdio loop with the received endpoint
38-
process_stdio(endpoint, debug, sse_ref)
39-
40-
{:error, reason} ->
41-
Logger.error("Failed to connect to SSE endpoint: #{inspect(reason)}")
42-
System.halt(1)
43-
end
44-
end
45-
46-
defp connect_to_sse(sse_url, debug) do
47-
if debug, do: Logger.debug("Connecting to SSE endpoint: #{sse_url}")
48-
49-
parent = self()
50-
51-
# Spawn a process to handle the SSE connection
52-
pid =
53-
spawn_link(fn ->
54-
try do
55-
# Use Req.get! with into function to process the streaming response
56-
Req.get!(
57-
sse_url,
58-
headers: [{"accept", "text/event-stream"}],
59-
into: fn {:data, chunk}, {req, resp} ->
60-
# Process each chunk of data
61-
process_sse_chunk(chunk, parent, debug)
62-
# Return {:cont, {req, resp}} to continue streaming
63-
{:cont, {req, resp}}
64-
end,
65-
receive_timeout: :infinity,
66-
retry: false
67-
)
68-
rescue
69-
e ->
70-
Logger.error("Error in SSE connection: #{inspect(e)}")
71-
System.stop(1)
72-
end
73-
74-
Logger.info("SSE connection closed. Exiting.")
75-
System.stop(1)
76-
end)
27+
{:ok, handler} = SSE.start_link({base_url, opts})
28+
ref = Process.monitor(handler)
7729

78-
# Wait for the endpoint URL
7930
receive do
80-
{:endpoint, endpoint} ->
81-
{:ok, endpoint, pid}
82-
after
83-
10_000 ->
84-
{:error, "Timeout waiting for endpoint URL"}
85-
end
86-
end
87-
88-
defp process_sse_chunk(chunk, parent, debug) do
89-
if debug, do: Logger.debug("Received SSE chunk: #{inspect(chunk)}")
90-
91-
# Split the chunk into lines and process each event
92-
chunk
93-
|> String.split("\n\n", trim: true)
94-
|> Enum.each(fn event_data ->
95-
case parse_sse_event(event_data) do
96-
{:ok, "endpoint", endpoint} ->
97-
if debug, do: Logger.debug("Found endpoint: #{endpoint}")
98-
send(parent, {:endpoint, endpoint})
99-
100-
{:ok, event_type, data} ->
101-
if debug, do: Logger.debug("Received SSE event: #{event_type}, data: #{data}")
102-
IO.write(:stdio, [data, ?\n])
103-
104-
{:error, error} ->
105-
if debug, do: Logger.debug("Error parsing SSE event: #{inspect(error)}")
106-
end
107-
end)
108-
end
109-
110-
defp parse_sse_event(data) do
111-
lines = String.split(data, "\n", trim: true)
112-
113-
event_type =
114-
lines
115-
|> Enum.find(fn line -> String.starts_with?(line, "event:") end)
116-
|> case do
117-
nil -> "message"
118-
line -> String.trim(String.replace_prefix(line, "event:", ""))
119-
end
120-
121-
data_line =
122-
lines
123-
|> Enum.find(fn line -> String.starts_with?(line, "data:") end)
124-
|> case do
125-
nil -> nil
126-
line -> String.trim(String.replace_prefix(line, "data:", ""))
127-
end
128-
129-
case data_line do
130-
nil -> {:error, "No data found in SSE event"}
131-
data -> {:ok, event_type, data}
132-
end
133-
end
134-
135-
defp process_stdio(endpoint, debug, sse_ref) do
136-
receive do
137-
{:DOWN, ^sse_ref, :process, _pid, reason} ->
138-
if debug, do: Logger.debug("SSE connection process terminated: #{inspect(reason)}")
139-
Logger.info("SSE connection closed, exiting")
140-
System.halt(0)
141-
after
142-
0 -> :ok
143-
end
144-
145-
case IO.read(:stdio, :line) do
146-
:eof ->
147-
# End of input, exit
148-
if debug, do: Logger.debug("Stdin closed (EOF), exiting")
149-
System.halt(0)
150-
151-
{:error, reason} ->
152-
Logger.error("Error reading from stdin: #{inspect(reason)}")
153-
System.halt(1)
154-
155-
line ->
156-
if debug, do: Logger.debug("Received input: #{inspect(line)}")
157-
# forward a POST to the message endpoint
158-
forward_request(endpoint, line, debug)
159-
process_stdio(endpoint, debug, sse_ref)
160-
end
161-
end
162-
163-
defp forward_request(endpoint, request, debug) do
164-
if debug, do: Logger.debug("Sending request to: #{endpoint}")
165-
166-
req =
167-
Req.new(
168-
url: endpoint,
169-
method: :post,
170-
body: request,
171-
headers: [
172-
{"accept", "application/json"},
173-
{"content-type", "application/json"}
174-
],
175-
receive_timeout: :infinity,
176-
retry: false
177-
)
178-
179-
result = Req.request(req)
180-
if debug, do: Logger.debug("Response: #{inspect(result)}")
181-
182-
case result do
183-
{:ok, %{status: 200, body: body}} ->
184-
body
185-
186-
{:ok, %{status: _status, body: body}} when is_map(body) ->
187-
body
188-
189-
{:ok, %{status: status, body: body}} ->
190-
if debug, do: Logger.debug("Unexpected response body: #{inspect(body)}")
191-
192-
%{
193-
jsonrpc: "2.0",
194-
error: %{
195-
code: -32603,
196-
message: "Internal error",
197-
data: %{
198-
details: "Server responded with status #{status}",
199-
body: body
200-
}
201-
}
202-
}
203-
204-
{:error, exception} ->
205-
# Handle all error cases
206-
%{
207-
jsonrpc: "2.0",
208-
error: %{
209-
code: -32603,
210-
message: "Internal error",
211-
data: %{
212-
details: "Error connecting to server: #{inspect(exception)}"
213-
}
214-
}
215-
}
31+
{:DOWN, ^ref, _, _, reason} ->
32+
System.stop((reason == :normal && 0) || 1)
21633
end
21734
end
21835
end

0 commit comments

Comments
 (0)