Skip to content

Commit 03c5e6e

Browse files
RainYuYakshatsinha0heliang666s
authored
Fix/mcp sse content type and endpoint (#15763)
* fix(http12,mcp): set SSE content-type and honor configured MCP message path Ensure text/event-stream for SSE and use dubbo.protocol.triple.rest.mcp.path.message in endpoint event. * unitTest(http12):assert SSE[ServerSentEventEncoder] content-type and encoding format * ran spotless apply command * fix ci --------- Co-authored-by: Akshat Sinha <[email protected]> Co-authored-by: heliang <[email protected]>
1 parent 84c4802 commit 03c5e6e

File tree

4 files changed

+98
-4
lines changed

4 files changed

+98
-4
lines changed

dubbo-plugin/dubbo-mcp/src/main/java/org/apache/dubbo/mcp/transport/DubboMcpSseTransportProvider.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,22 @@
1717
package org.apache.dubbo.mcp.transport;
1818

1919
import org.apache.dubbo.cache.support.expiring.ExpiringMap;
20+
import org.apache.dubbo.common.config.Configuration;
21+
import org.apache.dubbo.common.config.ConfigurationUtils;
2022
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
2123
import org.apache.dubbo.common.logger.LoggerFactory;
2224
import org.apache.dubbo.common.stream.StreamObserver;
2325
import org.apache.dubbo.common.utils.IOUtils;
2426
import org.apache.dubbo.common.utils.StringUtils;
27+
import org.apache.dubbo.mcp.McpConstant;
2528
import org.apache.dubbo.remoting.http12.HttpMethods;
2629
import org.apache.dubbo.remoting.http12.HttpRequest;
2730
import org.apache.dubbo.remoting.http12.HttpResponse;
2831
import org.apache.dubbo.remoting.http12.HttpResult;
2932
import org.apache.dubbo.remoting.http12.HttpStatus;
3033
import org.apache.dubbo.remoting.http12.message.ServerSentEvent;
3134
import org.apache.dubbo.rpc.RpcContext;
35+
import org.apache.dubbo.rpc.model.ApplicationModel;
3236

3337
import java.io.IOException;
3438
import java.nio.charset.StandardCharsets;
@@ -159,7 +163,9 @@ private void handleSseConnection(StreamObserver<ServerSentEvent<String>> respons
159163
new DubboMcpSessionTransport(responseObserver, objectMapper);
160164
McpServerSession mcpServerSession = sessionFactory.create(dubboMcpSessionTransport);
161165
sessions.put(mcpServerSession.getId(), mcpServerSession);
162-
sendEvent(responseObserver, ENDPOINT_EVENT_TYPE, "/mcp/message" + "?sessionId=" + mcpServerSession.getId());
166+
Configuration conf = ConfigurationUtils.getGlobalConfiguration(ApplicationModel.defaultModel());
167+
String messagePath = conf.getString(McpConstant.SETTINGS_MCP_PATHS_MESSAGE, "/mcp/message");
168+
sendEvent(responseObserver, ENDPOINT_EVENT_TYPE, messagePath + "?sessionId=" + mcpServerSession.getId());
163169
}
164170

165171
private void refreshSessionExpire(McpServerSession session) {

dubbo-plugin/dubbo-mcp/src/test/java/org/apache/dubbo/mcp/transport/DubboMcpSseTransportProviderTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.junit.jupiter.api.BeforeEach;
3737
import org.junit.jupiter.api.Test;
3838
import org.junit.jupiter.api.extension.ExtendWith;
39+
import org.mockito.ArgumentCaptor;
3940
import org.mockito.InjectMocks;
4041
import org.mockito.Mock;
4142
import org.mockito.MockedStatic;
@@ -99,7 +100,11 @@ void handleRequestHandlesGetRequest() {
99100
transportProvider.handleRequest(responseObserver);
100101

101102
verify(httpRequest, times(1)).method();
102-
verify(responseObserver, times(1)).onNext(any(ServerSentEvent.class));
103+
ArgumentCaptor<ServerSentEvent> captor = ArgumentCaptor.forClass(ServerSentEvent.class);
104+
verify(responseObserver, times(1)).onNext(captor.capture());
105+
ServerSentEvent evt = captor.getValue();
106+
Assertions.assertEquals("endpoint", evt.getEvent());
107+
Assertions.assertTrue(((String) evt.getData()).startsWith("/mcp/message?sessionId="));
103108
}
104109

105110
@Test

dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/ServerSentEventEncoder.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,13 @@ private static void appendField(StringBuilder sb, String name, Object value) {
9393

9494
@Override
9595
public String contentType() {
96-
return httpMessageEncoder.contentType();
96+
// An idea:sse use text/event-stream regardless of the underlying data encoder...
97+
return MediaType.TEXT_EVENT_STREAM.getName();
9798
}
9899

99100
@Override
100101
public MediaType mediaType() {
101-
return httpMessageEncoder.mediaType();
102+
return MediaType.TEXT_EVENT_STREAM;
102103
}
103104

104105
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.dubbo.remoting.http12.message;
18+
19+
import org.apache.dubbo.remoting.http12.exception.EncodeException;
20+
21+
import java.io.ByteArrayOutputStream;
22+
import java.io.OutputStream;
23+
import java.nio.charset.StandardCharsets;
24+
25+
import org.junit.jupiter.api.Assertions;
26+
import org.junit.jupiter.api.Test;
27+
28+
class ServerSentEventEncoderTest {
29+
30+
static class DummyJsonEncoder implements HttpMessageEncoder {
31+
@Override
32+
public void encode(OutputStream outputStream, Object data, java.nio.charset.Charset charset)
33+
throws EncodeException {
34+
try {
35+
if (data instanceof byte[]) {
36+
outputStream.write((byte[]) data);
37+
} else {
38+
outputStream.write(String.valueOf(data).getBytes(charset));
39+
}
40+
} catch (Exception e) {
41+
throw new EncodeException("encode error", e);
42+
}
43+
}
44+
45+
@Override
46+
public MediaType mediaType() {
47+
return MediaType.APPLICATION_JSON;
48+
}
49+
50+
@Override
51+
public boolean supports(String mediaType) {
52+
return true;
53+
}
54+
}
55+
56+
@Test
57+
void shouldUseTextEventStreamContentType() {
58+
ServerSentEventEncoder sse = new ServerSentEventEncoder(new DummyJsonEncoder());
59+
Assertions.assertEquals(MediaType.TEXT_EVENT_STREAM, sse.mediaType());
60+
Assertions.assertEquals(MediaType.TEXT_EVENT_STREAM.getName(), sse.contentType());
61+
}
62+
63+
@Test
64+
void shouldEncodeServerSentEventFormat() {
65+
ServerSentEventEncoder sse = new ServerSentEventEncoder(new DummyJsonEncoder());
66+
ByteArrayOutputStream bos = new ByteArrayOutputStream();
67+
sse.encode(
68+
bos,
69+
ServerSentEvent.builder()
70+
.event("message")
71+
.data("{\"a\":1}")
72+
.id("1")
73+
.build(),
74+
StandardCharsets.UTF_8);
75+
byte[] bytes = bos.toByteArray();
76+
String text = new String(bytes, StandardCharsets.UTF_8);
77+
Assertions.assertTrue(text.contains("id:1\n"));
78+
Assertions.assertTrue(text.contains("event:message\n"));
79+
Assertions.assertTrue(text.contains("data:{\"a\":1}\n"));
80+
Assertions.assertTrue(text.endsWith("\n"));
81+
}
82+
}

0 commit comments

Comments
 (0)