Skip to content

Commit d8f916b

Browse files
authored
Merge pull request #6 from FabianMeiswinkel/users/fabianm/rntbdOrdering
Create/Read working in quick and dirty PoC
2 parents ed53372 + 5d820ca commit d8f916b

File tree

24 files changed

+629
-71
lines changed

24 files changed

+629
-71
lines changed

sdk/cosmos/azure-cosmos-dotnet-benchmark/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ Licensed under the MIT License.
410410
</descriptorRefs>
411411
<archive>
412412
<manifest>
413-
<mainClass>com.azure.cosmos.dotnet.benchmark.Main</mainClass>
413+
<mainClass>com.azure.cosmos.dotnet.benchmark.ThinClientReproMain</mainClass>
414414
</manifest>
415415
</archive>
416416
</configuration>
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package com.azure.cosmos.dotnet.benchmark;
2+
3+
import com.azure.cosmos.ConsistencyLevel;
4+
import com.azure.cosmos.CosmosAsyncClient;
5+
import com.azure.cosmos.CosmosAsyncContainer;
6+
import com.azure.cosmos.CosmosClientBuilder;
7+
import com.azure.cosmos.CosmosException;
8+
import com.azure.cosmos.models.CosmosContainerResponse;
9+
import com.azure.cosmos.models.CosmosItemResponse;
10+
import com.azure.cosmos.models.PartitionKey;
11+
import com.fasterxml.jackson.databind.ObjectMapper;
12+
import com.fasterxml.jackson.databind.node.ObjectNode;
13+
14+
import java.util.UUID;
15+
16+
public class ThinClientReproMain {
17+
public static void main(String[] args) {
18+
try {
19+
System.setProperty("COSMOS.THINCLIENT_ENABLED", "true");
20+
System.setProperty("COSMOS.HTTP2_ENABLED", "true");
21+
22+
CosmosAsyncClient client = new CosmosClientBuilder()
23+
.key(System.getProperty("COSMOS.KEY"))
24+
.endpoint(System.getProperty("COSMOS.ENDPOINT"))
25+
.gatewayMode()
26+
.consistencyLevel(ConsistencyLevel.SESSION)
27+
.userAgentSuffix("fabianmThinClientProxyTest")
28+
.buildAsyncClient();
29+
30+
CosmosAsyncContainer container = client.getDatabase("HashV2Small1").getContainer("HashV2Small1");
31+
CosmosContainerResponse containerResponse = container.read().block();
32+
System.out.println("Container RID: " + containerResponse.getProperties().getResourceId());
33+
ObjectMapper mapper = new ObjectMapper();
34+
ObjectNode doc = mapper.createObjectNode();
35+
String idValue = UUID.randomUUID().toString();
36+
doc.put("id", idValue);
37+
System.out.println("Document to be ingested - " + doc.toPrettyString());
38+
39+
while (true) {
40+
try {
41+
// container.readItem(
42+
// "HelloWorld",
43+
// new PartitionKey("HelloWorld"),
44+
// ObjectNode.class)
45+
CosmosItemResponse<ObjectNode> createResponse = container.createItem(doc).block();
46+
System.out.println("CREATE DIAGNOSTICS: " + createResponse.getDiagnostics());
47+
break;
48+
} catch (CosmosException cosmosError) {
49+
System.out.println("COSMOS ERROR: " + cosmosError.getStatusCode() + "/" + cosmosError.getShortMessage());
50+
Thread.sleep(10_000);
51+
}
52+
}
53+
54+
CosmosItemResponse<ObjectNode> response = container.readItem(idValue, new PartitionKey(idValue), ObjectNode.class).block();
55+
System.out.println("READ DIAGNOSTICS: " + response.getDiagnostics());
56+
ObjectNode readDoc = response.getItem();
57+
58+
System.out.println("Document read - " + readDoc.toPrettyString());
59+
} catch (CosmosException | InterruptedException cosmosException) {
60+
System.out.println("COSMOS ERROR: " + cosmosException);
61+
}
62+
}
63+
}

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientTest.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,20 @@
44
import com.azure.cosmos.CosmosAsyncClient;
55
import com.azure.cosmos.CosmosAsyncContainer;
66
import com.azure.cosmos.CosmosClientBuilder;
7-
import com.azure.cosmos.GatewayConnectionConfig;
87
import com.azure.cosmos.implementation.throughputControl.TestItem;
9-
import com.azure.cosmos.models.CosmosDatabaseProperties;
10-
import com.azure.cosmos.models.CosmosDatabaseResponse;
118
import com.azure.cosmos.models.PartitionKey;
12-
import com.azure.cosmos.util.CosmosPagedFlux;
139
import com.fasterxml.jackson.databind.JsonNode;
1410
import org.testng.annotations.Test;
15-
import reactor.core.publisher.Mono;
1611

1712
public class ThinClientTest {
13+
1814
@Test
1915
public void testThinclientHttp2() {
2016
try {
2117
//String thinclientEndpoint = "https://cdb-ms-stage-eastus2-fe2.eastus2.cloudapp.azure.com:10650";
22-
String thinclientEndpoint = "https://chukangzhongstagesignoff.documents-staging.windows-ppe.net:443/";
18+
//String thinclientEndpoint = "https://chukangzhongstagesignoff.documents-staging.windows-ppe.net:443/";
2319
System.setProperty(Configs.THINCLIENT_ENABLED, "true");
24-
System.setProperty(Configs.THINCLIENT_ENDPOINT, thinclientEndpoint);
20+
//System.setProperty(Configs.THINCLIENT_ENDPOINT, thinclientEndpoint);
2521
System.setProperty(Configs.HTTP2_ENABLED, "true");
2622

2723
CosmosAsyncClient client = new CosmosClientBuilder()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package com.azure.cosmos.implementation.directconnectivity.rntbd;
2+
3+
import com.azure.cosmos.implementation.HttpConstants;
4+
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
5+
import com.azure.cosmos.implementation.ThinClientStoreModel;
6+
import com.azure.cosmos.implementation.http.HttpHeaders;
7+
import io.netty.buffer.ByteBuf;
8+
import io.netty.buffer.Unpooled;
9+
import org.apache.commons.io.FileUtils;
10+
import org.mockito.Mockito;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
import org.testng.annotations.Test;
14+
15+
import java.io.File;
16+
import java.io.IOException;
17+
import java.time.Instant;
18+
import java.util.HashMap;
19+
import java.util.Map;
20+
21+
import static com.azure.cosmos.implementation.directconnectivity.WFConstants.BackendHeaders.EFFECTIVE_PARTITION_KEY;
22+
23+
public class RntbdTokenStreamTests {
24+
// Created this test for thin client testing, to make sure thin client headers are always special cased
25+
private static final Logger logger = LoggerFactory.getLogger(RntbdTokenStreamTests.class);
26+
@Test
27+
public void parseDotNet() {
28+
//dumpFile("E:\\Temp\\dotnetRead.bin");
29+
//dumpFile("E:\\Temp\\javad6bd7634-35ed-44d3-849f-364456be3001.bin");
30+
31+
dumpFile("E:\\Temp\\dotnet_proxyInputRequest_Read_18.bin");
32+
dumpFile("E:\\Temp\\java_proxyInputRequest_Read_29.bin");
33+
}
34+
35+
private void dumpFile(String fileName) {
36+
logger.error("FILENAME: {}", fileName);
37+
File file = new File(fileName);
38+
byte[] byteArray = null;
39+
try {
40+
byteArray = FileUtils.readFileToByteArray(file);
41+
} catch (IOException e) {
42+
e.printStackTrace();
43+
}
44+
45+
ByteBuf content = Unpooled.wrappedBuffer(byteArray);
46+
47+
if (RntbdFramer.canDecodeHead(content)) {
48+
49+
final RntbdRequest request = RntbdRequest.decode(content);
50+
51+
if (request != null) {
52+
logger.error("HEADERS: {}", request.getHeaders().dumpTokens());
53+
}
54+
55+
logger.error("RNTBD REQUEST empty");
56+
}
57+
}
58+
59+
@Test(groups = { "unit" })
60+
public void testThinClientSpecialCasing() {
61+
RxDocumentServiceRequest mockRequest = Mockito.mock(RxDocumentServiceRequest.class);
62+
Map<String, String> headers = new HashMap<>();
63+
headers.put(EFFECTIVE_PARTITION_KEY, "13A141365AE34002732EE6DD02677CFC");
64+
headers.put(HttpConstants.HttpHeaders.GLOBAL_DATABASE_ACCOUNT_NAME, "globalDatabaseAccountName");
65+
Mockito.doReturn(headers).when(mockRequest).getHeaders();
66+
RntbdRequestArgs mockRntbdRequestArgs = Mockito.mock(RntbdRequestArgs.class);
67+
Mockito.doReturn(mockRequest).when(mockRntbdRequestArgs).serviceRequest();
68+
Mockito.doReturn("").when(mockRntbdRequestArgs).replicaPath();
69+
Mockito.doReturn(0L).when(mockRntbdRequestArgs).transportRequestId();
70+
71+
RntbdRequestFrame mockRntbdRequestFrame = Mockito.mock(RntbdRequestFrame.class);
72+
Mockito.doReturn(RntbdConstants.RntbdOperationType.Connection).when(mockRntbdRequestFrame).getOperationType();
73+
RntbdRequestHeaders rntbdRequestHeaders = new RntbdRequestHeaders(mockRntbdRequestArgs, mockRntbdRequestFrame);
74+
75+
final ByteBuf out = Unpooled.buffer();
76+
rntbdRequestHeaders.encode(out, false);
77+
}
78+
79+
/*final class TestRntbdTokenStream extends RntbdTokenStream<RntbdConstants.RntbdRequestHeader> {
80+
TestRntbdTokenStream(EnumSet<RntbdConstants.RntbdRequestHeader> headers, Map<Short, RntbdConstants.RntbdRequestHeader> ids, ByteBuf in, Class<RntbdConstants.RntbdRequestHeader> classType) {
81+
super(headers, ids, in, classType);
82+
}
83+
}*/
84+
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -333,9 +333,7 @@ private SslContext sslContextInit(boolean serverCertVerificationDisabled, boolea
333333
.forClient()
334334
.sslProvider(SslContext.defaultClientProvider());
335335

336-
if (serverCertVerificationDisabled) {
337-
sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE); // disable cert verification
338-
}
336+
sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE); // disable cert verification
339337

340338
if (http2Enabled) {
341339
sslContextBuilder

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,8 @@ public static class HttpHeaders {
255255
public static final String POPULATE_INDEX_METRICS = "x-ms-cosmos-populateindexmetrics";
256256
public static final String INDEX_UTILIZATION = "x-ms-cosmos-index-utilization";
257257
public static final String QUERY_EXECUTION_INFO = "x-ms-cosmos-query-execution-info";
258+
public static final String START_EPK_HASH = "x-ms-cosmos-start-epk-hash";
259+
public static final String END_EPK_HASH = "x-ms-cosmos-end-epk-hash";
258260

259261
// Batch operations
260262
public static final String IS_BATCH_ATOMIC = "x-ms-cosmos-batch-atomic";
@@ -283,6 +285,8 @@ public static class HttpHeaders {
283285
// Priority Level for throttling
284286
public static final String PRIORITY_LEVEL = "x-ms-cosmos-priority-level";
285287

288+
public static final String GLOBAL_DATABASE_ACCOUNT_NAME = "GlobalDatabaseAccountName";
289+
286290
// Thinclient headers
287291
public static final String THINCLIENT_PROXY_OPERATION_TYPE = "x-ms-thinclient-proxy-operation-type";
288292
public static final String THINCLIENT_PROXY_RESOURCE_TYPE = "x-ms-thinclient-proxy-resource-type";

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3436,6 +3436,8 @@ private Mono<ResourceResponse<Document>> readDocumentInternal(
34363436
getEffectiveClientContext(clientContextOverride),
34373437
OperationType.Read, ResourceType.Document, path, requestHeaders, options);
34383438

3439+
request.useThinProxy = Configs.isThinClientEnabled() && request.useGatewayMode ? true : false;
3440+
request.useThinProxy = true;
34393441
DocumentServiceRequestContext requestContext = request.requestContext;
34403442

34413443
options.getMarkE2ETimeoutInRequestContextCallbackHook().set(

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java

Lines changed: 92 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,35 @@
33
package com.azure.cosmos.implementation;
44

55
import com.azure.cosmos.ConsistencyLevel;
6+
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
7+
import com.azure.cosmos.implementation.directconnectivity.WFConstants;
8+
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdConstants;
9+
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdFramer;
610
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequest;
711
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestArgs;
12+
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdResponse;
813
import com.azure.cosmos.implementation.http.HttpClient;
914
import com.azure.cosmos.implementation.http.HttpHeaders;
1015
import com.azure.cosmos.implementation.http.HttpRequest;
11-
import com.azure.cosmos.models.FeedRange;
16+
import com.azure.cosmos.implementation.routing.HexConvert;
1217
import io.netty.buffer.ByteBuf;
1318
import io.netty.buffer.Unpooled;
1419
import io.netty.handler.codec.http.HttpMethod;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
1522
import reactor.core.publisher.Flux;
1623
import reactor.core.publisher.Mono;
1724

25+
import java.io.IOException;
1826
import java.net.URI;
27+
import java.nio.file.Files;
28+
import java.time.Instant;
29+
import java.util.Arrays;
1930
import java.util.HashMap;
31+
import java.util.List;
2032
import java.util.Map;
33+
import java.util.UUID;
2134

22-
import static com.azure.cosmos.implementation.directconnectivity.WFConstants.BackendHeaders.EFFECTIVE_PARTITION_KEY;
2335
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
2436

2537
/**
@@ -30,6 +42,22 @@
3042
*/
3143
public class ThinClientStoreModel extends RxGatewayStoreModel {
3244

45+
private static final Logger logger = LoggerFactory.getLogger(ThinClientStoreModel.class);
46+
47+
private static final List<RntbdConstants.RntbdRequestHeader> thinClientHeadersInOrder = Arrays.asList(
48+
RntbdConstants.RntbdRequestHeader.EffectivePartitionKey,
49+
RntbdConstants.RntbdRequestHeader.GlobalDatabaseAccountName,
50+
RntbdConstants.RntbdRequestHeader.DatabaseName,
51+
RntbdConstants.RntbdRequestHeader.CollectionName,
52+
RntbdConstants.RntbdRequestHeader.CollectionRid,
53+
//RntbdConstants.RntbdRequestHeader.ResourceId,
54+
RntbdConstants.RntbdRequestHeader.PayloadPresent,
55+
RntbdConstants.RntbdRequestHeader.DocumentName,
56+
RntbdConstants.RntbdRequestHeader.AuthorizationToken,
57+
RntbdConstants.RntbdRequestHeader.Date);
58+
59+
60+
3361
public ThinClientStoreModel(
3462
DiagnosticsClientContext clientContext,
3563
ISessionContainer sessionContainer,
@@ -80,7 +108,37 @@ protected Map<String, String> getDefaultHeaders(
80108
@Override
81109
public URI getRootUri(RxDocumentServiceRequest request) {
82110
//var uri = this.globalEndpointManager.resolveServiceEndpoint(request).getThinClientLocationEndpoint();
83-
return URI.create("https://chukangzhongstagesignoff-eastus2.documents-staging.windows-ppe.net:10650/");
111+
return URI.create("https://57.155.105.105:10650/"); // https://chukangzhongstagesignoff-eastus2.documents-staging.windows-ppe.net:10650/
112+
}
113+
114+
@Override
115+
public StoreResponse unwrapToStoreResponse(RxDocumentServiceRequest request, int statusCode, HttpHeaders headers, ByteBuf content) {
116+
if (content == null || content.readableBytes() == 0) {
117+
return super.unwrapToStoreResponse(request, statusCode, headers, Unpooled.EMPTY_BUFFER);
118+
}
119+
120+
Instant decodeStartTime = Instant.now();
121+
122+
if (RntbdFramer.canDecodeHead(content)) {
123+
124+
final RntbdResponse response = RntbdResponse.decode(content);
125+
126+
if (response != null) {
127+
response.setDecodeEndTime(Instant.now());
128+
response.setDecodeStartTime(decodeStartTime);
129+
130+
return super.unwrapToStoreResponse(
131+
request,
132+
response.getStatus().code(),
133+
new HttpHeaders(response.getHeaders().asMap(request.getActivityId())),
134+
response.getContent()
135+
);
136+
}
137+
138+
return super.unwrapToStoreResponse(request, statusCode, headers, null);
139+
}
140+
141+
throw new IllegalStateException("Invalid rntbd response");
84142
}
85143

86144
@Override
@@ -89,12 +147,14 @@ public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI reque
89147
// todo - neharao1 - validate b/w name() v/s toString()
90148
request.setThinclientHeaders(request.getOperationType().name(), request.getResourceType().name());
91149

92-
String epk = request.getPartitionKeyInternal().getEffectivePartitionKeyString(request.getPartitionKeyInternal(), request.getPartitionKeyDefinition());
150+
byte[] epk = request.getPartitionKeyInternal().getEffectivePartitionKeyBytes(request.getPartitionKeyInternal(), request.getPartitionKeyDefinition());
93151
if (request.properties == null) {
94152
request.properties = new HashMap<>();
95153
}
96-
request.properties.put(EFFECTIVE_PARTITION_KEY, epk);
97-
//request.getHeaders().put(EFFECTIVE_PARTITION_KEY, epk);
154+
//request.properties.put(EFFECTIVE_PARTITION_KEY, epk);
155+
//request.properties.put(HttpConstants.HttpHeaders.GLOBAL_DATABASE_ACCOUNT_NAME, "chukangzhongstagesignoff");
156+
request.getHeaders().put(HttpConstants.HttpHeaders.GLOBAL_DATABASE_ACCOUNT_NAME, "tiagonapoli-cdb-test"); // "chukangzhongstagesignoff"
157+
request.getHeaders().put(WFConstants.BackendHeaders.COLLECTION_RID, "cLklAJU8SN0=");
98158
// todo - neharao1: no concept of a replica / service endpoint that can be passed
99159
RntbdRequestArgs rntbdRequestArgs = new RntbdRequestArgs(request);
100160

@@ -103,24 +163,46 @@ public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI reque
103163
HttpHeaders headers = this.getHttpHeaders();
104164

105165
RntbdRequest rntbdRequest = RntbdRequest.from(rntbdRequestArgs);
106-
166+
boolean success = rntbdRequest.setHeaderValue(
167+
RntbdConstants.RntbdRequestHeader.EffectivePartitionKey,
168+
epk);
169+
if (!success) {
170+
logger.error("Failed to update EPK to value {}", HexConvert.bytesToHex(epk));
171+
} else {
172+
logger.error("Updated EPK to value {}", HexConvert.bytesToHex(epk));
173+
}
107174
// todo: neharao1 - validate whether Java heap buffer is okay v/s Direct buffer
108175
// todo: eventually need to use pooled buffer
109176
ByteBuf byteBuf = Unpooled.buffer();
110177

178+
logger.error("HEADERS: {}", rntbdRequest.getHeaders().dumpTokens());
179+
111180
// todo: lifting the logic from there to encode the RntbdRequest instance into a ByteBuf (ByteBuf is a network compatible format)
112181
// todo: double-check with fabianm to see if RntbdRequest across RNTBD over TCP (Direct connectivity mode) is same as that when using ThinClient proxy
113182
// todo: need to conditionally add some headers (userAgent, replicaId/endpoint, etc)
114-
rntbdRequest.encode(byteBuf);
183+
rntbdRequest.encode(byteBuf, true);
184+
185+
byte[] contentAsByteArray = new byte[byteBuf.writerIndex()];
186+
byteBuf.getBytes(0, contentAsByteArray, 0, byteBuf.writerIndex());
187+
188+
try {
189+
Files.write(java.nio.file.Paths.get("E:\\Temp\\java" + UUID.randomUUID() + ".bin"), contentAsByteArray);
190+
} catch (IOException e) {
191+
e.printStackTrace();
192+
}
115193

116194
return new HttpRequest(
117195
HttpMethod.POST,
118196
//requestUri,
119-
URI.create("https://chukangzhongstagesignoff-eastus2.documents-staging.windows-ppe.net:10650/"),
197+
//https://thinclient-performancetests-eastus2.documents-staging.windows-ppe.net:10650
198+
//https://cdb-ms-stage-eastus2-fe2-sql.eastus2.cloudapp.azure.com:10650
199+
//https://57.155.105.105:10650/
200+
// https://tiagonapoli-cdb-test-westus3.documents.azure.com:10650
201+
URI.create("https://57.155.105.105:10650/"), // https://127.0.0.1:10650/ //https://chukangzhongstagesignoff-eastus2.documents-staging.windows-ppe.net:10650/ // thinclient-performancetests-eastus2.documents-staging.windows-ppe.net cdb-ms-stage-eastus2-fe2-sql.eastus2.cloudapp.azure.com
120202
//requestUri.getPort(),
121203
10650,
122204
headers,
123-
Flux.just(byteBuf.array()));
205+
Flux.just(contentAsByteArray));
124206
}
125207

126208
private HttpHeaders getHttpHeaders() {

0 commit comments

Comments
 (0)