Skip to content

Commit 0e2023b

Browse files
committed
Updated protocol definition for publishers with new changes
* Changed `EndOfStream(BEHIND)` to `BehindPublisher` so we do not disconnect immediately * This allows a Publisher to immediately catch up a block node that is only slightly "behind". * Changed many `BlockProof` references to `BlockEnd` to reflect that blocks need an explicit end of block because they may have multiple block proofs. * Cleaned up a few other protocol issues * Updated the API definition with the changes for `BehindPublisher`. * Updated the node service API with the start of a new "server status details" API. * Updated a distressing array of test classes, mostly due to tests that basically cross-checked the _compiler_, not our design. Signed-off-by: Joseph S. <[email protected]>
1 parent 2c8ddf1 commit 0e2023b

File tree

14 files changed

+243
-172
lines changed

14 files changed

+243
-172
lines changed

block-node/app/src/testFixtures/java/org/hiero/block/node/app/fixtures/server/TestBlockNodeServer.java

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,21 @@
22
package org.hiero.block.node.app.fixtures.server;
33

44
import com.hedera.pbj.grpc.helidon.PbjRouting;
5+
import com.hedera.pbj.grpc.helidon.PbjRouting.Builder;
56
import com.hedera.pbj.grpc.helidon.config.PbjConfig;
7+
import edu.umd.cs.findbugs.annotations.NonNull;
68
import io.helidon.webserver.ConnectionConfig;
79
import io.helidon.webserver.WebServer;
810
import io.helidon.webserver.WebServerConfig;
911
import org.hiero.block.api.BlockEnd;
1012
import org.hiero.block.api.BlockItemSet;
1113
import org.hiero.block.api.BlockNodeServiceInterface;
1214
import org.hiero.block.api.BlockStreamSubscribeServiceInterface;
15+
import org.hiero.block.api.ServerStatusDetailResponse;
16+
import org.hiero.block.api.ServerStatusRequest;
1317
import org.hiero.block.api.ServerStatusResponse;
1418
import org.hiero.block.api.SubscribeStreamResponse;
19+
import org.hiero.block.api.SubscribeStreamResponse.Code;
1520
import org.hiero.block.node.spi.historicalblocks.HistoricalBlockFacility;
1621

1722
public class TestBlockNodeServer {
@@ -23,13 +28,8 @@ public TestBlockNodeServer(int port, HistoricalBlockFacility historicalBlockFaci
2328
PbjConfig.builder().name("pbj").maxMessageSizeBytes(4_194_304).build();
2429

2530
// Create the service builder
26-
final PbjRouting.Builder pbjRoutingBuilder = PbjRouting.builder()
27-
.service((BlockNodeServiceInterface) request -> ServerStatusResponse.newBuilder()
28-
.firstAvailableBlock(
29-
historicalBlockFacility.availableBlocks().min())
30-
.lastAvailableBlock(
31-
historicalBlockFacility.availableBlocks().max())
32-
.build())
31+
final Builder pbjRoutingBuilder = PbjRouting.builder()
32+
.service(new TrivialBlockNodeServerInterface(historicalBlockFacility))
3333
.service((BlockStreamSubscribeServiceInterface) (request, replies) -> {
3434
if (historicalBlockFacility
3535
.availableBlocks()
@@ -51,24 +51,22 @@ public TestBlockNodeServer(int port, HistoricalBlockFacility historicalBlockFaci
5151
}
5252

5353
replies.onNext(SubscribeStreamResponse.newBuilder()
54-
.status(SubscribeStreamResponse.Code.SUCCESS)
54+
.status(Code.SUCCESS)
5555
.build());
5656

5757
replies.onComplete();
5858
}
5959
});
60-
6160
// start the web server with the PBJ configuration and routing
6261
webServer = WebServerConfig.builder()
6362
.port(port)
6463
.addProtocol(pbjConfig)
6564
.addRouting(pbjRoutingBuilder)
6665
.connectionConfig(ConnectionConfig.builder()
67-
.sendBufferSize(32768)
68-
.receiveBufferSize(32768)
66+
.sendBufferSize(524288)
67+
.receiveBufferSize(524288)
6968
.build())
7069
.build();
71-
7270
webServer.start();
7371
}
7472

@@ -80,4 +78,30 @@ public void stop() {
8078
webServer.stop();
8179
}
8280
}
81+
82+
private class TrivialBlockNodeServerInterface implements BlockNodeServiceInterface {
83+
private final HistoricalBlockFacility historicalBlockFacility;
84+
85+
public TrivialBlockNodeServerInterface(final HistoricalBlockFacility historicalFacility) {
86+
historicalBlockFacility = historicalFacility;
87+
}
88+
89+
@Override
90+
@NonNull
91+
public ServerStatusResponse serverStatus(@NonNull final ServerStatusRequest request) {
92+
return ServerStatusResponse.newBuilder()
93+
.firstAvailableBlock(
94+
historicalBlockFacility.availableBlocks().min())
95+
.lastAvailableBlock(
96+
historicalBlockFacility.availableBlocks().max())
97+
.onlyLatestState(false)
98+
.build();
99+
}
100+
101+
@Override
102+
@NonNull
103+
public ServerStatusDetailResponse serverStatusDetail(@NonNull final ServerStatusRequest request) {
104+
return ServerStatusDetailResponse.DEFAULT;
105+
}
106+
}
83107
}

block-node/server-status/src/main/java/org/hiero/block/node/server/status/ServerStatusServicePlugin.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.swirlds.metrics.api.Counter;
88
import edu.umd.cs.findbugs.annotations.NonNull;
99
import org.hiero.block.api.BlockNodeServiceInterface;
10+
import org.hiero.block.api.ServerStatusDetailResponse;
1011
import org.hiero.block.api.ServerStatusRequest;
1112
import org.hiero.block.api.ServerStatusResponse;
1213
import org.hiero.block.node.spi.BlockNodeContext;
@@ -62,6 +63,12 @@ public ServerStatusResponse serverStatus(@NonNull final ServerStatusRequest requ
6263
return response;
6364
}
6465

66+
@Override
67+
@NonNull
68+
public ServerStatusDetailResponse serverStatusDetail(@NonNull final ServerStatusRequest request) {
69+
return ServerStatusDetailResponse.DEFAULT;
70+
}
71+
6572
// ==== BlockNodePlugin Methods ====================================================================================
6673
@Override
6774
public String name() {

block-node/server-status/src/test/java/org/hiero/block/node/server/status/ServerStatusServicePluginTest.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111

1212
import com.hedera.hapi.block.stream.BlockItem;
1313
import com.hedera.pbj.runtime.ParseException;
14-
import com.hedera.pbj.runtime.grpc.ServiceInterface;
15-
import java.util.List;
1614
import java.util.concurrent.LinkedBlockingQueue;
1715
import org.hiero.block.api.ServerStatusRequest;
1816
import org.hiero.block.api.ServerStatusResponse;
@@ -45,20 +43,6 @@ void setup() {
4543
enableDebugLogging();
4644
}
4745

48-
/**
49-
* Verifies that the service interface correctly registers and exposes
50-
* the server status method.
51-
*/
52-
@Test
53-
@DisplayName("Should return correct method for ServerStatusServicePlugin")
54-
void shouldReturnCorrectMethod() {
55-
assertNotNull(serviceInterface);
56-
List<ServiceInterface.Method> methods = serviceInterface.methods();
57-
assertNotNull(methods);
58-
assertEquals(1, methods.size());
59-
assertEquals(plugin.methods().getFirst(), methods.getFirst());
60-
}
61-
6246
/**
6347
* Tests that the server status response is valid when no blocks are available.
6448
* Verifies the first and last available block numbers and other response properties.
@@ -78,9 +62,6 @@ void shouldReturnValidServerStatus() throws ParseException {
7862
assertEquals(UNKNOWN_BLOCK_NUMBER, response.firstAvailableBlock());
7963
assertEquals(UNKNOWN_BLOCK_NUMBER, response.lastAvailableBlock());
8064
assertFalse(response.onlyLatestState());
81-
82-
// TODO(#579) Remove when block node version information is implemented.
83-
assertFalse(response.hasVersionInformation());
8465
}
8566

8667
/**
@@ -104,9 +85,6 @@ void shouldReturnValidServerStatusForNewBlockBatch() throws ParseException {
10485
assertEquals(0, response.firstAvailableBlock());
10586
assertEquals(blocks - 1, response.lastAvailableBlock());
10687
assertFalse(response.onlyLatestState());
107-
108-
// TODO() Remove when block node version information is implemented.
109-
assertFalse(response.hasVersionInformation());
11088
}
11189

11290
/**

block-node/stream-publisher/src/main/java/org/hiero/block/node/stream/publisher/LiveStreamPublisherManager.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -181,10 +181,10 @@ public BlockAction getActionForBlock(
181181
return switch (previousAction) {
182182
case null -> getActionForHeader(blockNumber, handlerId);
183183
case ACCEPT -> getActionForCurrentlyStreaming(blockNumber);
184-
case END_ERROR, END_DUPLICATE, END_BEHIND ->
184+
case END_ERROR, END_DUPLICATE ->
185185
// This should not happen because the Handler should have shut down.
186186
BlockAction.END_ERROR;
187-
case SKIP, RESEND ->
187+
case SKIP, RESEND, SEND_BEHIND ->
188188
// This should not happen because the Handler should have reset the previous action.
189189
BlockAction.END_ERROR;
190190
};
@@ -224,7 +224,7 @@ private BlockAction getActionForHeader(final long blockNumber, final long handle
224224
} else if (blockNumber == nextUnstreamedBlockNumber.get()) {
225225
return addHandlerQueueForBlock(blockNumber, handlerId);
226226
} else if (blockNumber > nextUnstreamedBlockNumber.get()) {
227-
return BlockAction.END_BEHIND;
227+
return BlockAction.SEND_BEHIND;
228228
} else {
229229
// This should not be possible, all cases that could reach here are
230230
// already handled above.
@@ -283,7 +283,8 @@ private BlockAction addHandlerQueueForBlock(final long blockNumber, final long h
283283
}
284284
}
285285
// Return the correct action if another handler jumped in front of the caller.
286-
return blockNumber < nextUnstreamedBlockNumber.get() ? BlockAction.SKIP : BlockAction.END_BEHIND;
286+
// Note, neither of these ends the stream; both ask the publisher to send a different block.
287+
return blockNumber < nextUnstreamedBlockNumber.get() ? BlockAction.SKIP : BlockAction.SEND_BEHIND;
287288
}
288289

289290
/*
@@ -362,7 +363,7 @@ private BlockAction getActionForCurrentlyStreaming(final long blockNumber) {
362363
// the block node is behind. The most likely cause here is a block
363364
// that failed to verify, or got stuck and did not finish, and was
364365
// parallel streaming a block earlier than the calling handler.
365-
return BlockAction.END_BEHIND;
366+
return BlockAction.SEND_BEHIND;
366367
} else {
367368
// This should not be possible, all cases that could reach here are
368369
// already handled above.

block-node/stream-publisher/src/main/java/org/hiero/block/node/stream/publisher/PublisherHandler.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.hiero.block.api.BlockEnd;
2929
import org.hiero.block.api.PublishStreamRequest.EndStream;
3030
import org.hiero.block.api.PublishStreamResponse;
31+
import org.hiero.block.api.PublishStreamResponse.BehindPublisher;
3132
import org.hiero.block.api.PublishStreamResponse.BlockAcknowledgement;
3233
import org.hiero.block.api.PublishStreamResponse.EndOfStream;
3334
import org.hiero.block.api.PublishStreamResponse.EndOfStream.Code;
@@ -288,7 +289,7 @@ private void handleBlockItemsRequest(
288289
case ACCEPT -> handleAccept(itemSetUnparsed, blockItems);
289290
case SKIP -> handleSkip(blockNumber);
290291
case RESEND -> handleResend();
291-
case END_BEHIND -> handleEndBehind();
292+
case SEND_BEHIND -> handleSendBehind();
292293
case END_DUPLICATE -> handleEndDuplicate();
293294
case END_ERROR -> handleEndError();
294295
};
@@ -610,12 +611,22 @@ private BatchHandleResult handleResend() {
610611
/**
611612
* Handle the END_BEHIND action for a block.
612613
*/
613-
private BatchHandleResult handleEndBehind() {
614-
LOGGER.log(DEBUG, "Handler {0} is sending BEHIND({1}).", handlerId, publisherManager.getLatestBlockNumber());
615-
// If the action is END_BEHIND, we need to send an end of stream
614+
private BatchHandleResult handleSendBehind() {
615+
LOGGER.log(DEBUG, "Handler {0} is sending Behind({1}).", handlerId, publisherManager.getLatestBlockNumber());
616+
// If the action is SEND_BEHIND, we need to send an end of stream
616617
// response to the publisher and not propagate the items.
617-
sendEndOfStream(Code.BEHIND);
618-
return new BatchHandleResult(true, true);
618+
final BehindPublisher behindMessage = BehindPublisher.newBuilder()
619+
.blockNumber(publisherManager.getLatestBlockNumber())
620+
.build();
621+
final PublishStreamResponse response = PublishStreamResponse.newBuilder()
622+
.nodeBehindPublisher(behindMessage)
623+
.build();
624+
if (sendResponse(response)) {
625+
metrics.nodeBehindSent.increment(); // @todo(1415) add label
626+
return new BatchHandleResult(false, true);
627+
} else {
628+
return new BatchHandleResult(true, true);
629+
}
619630
}
620631

621632
/**
@@ -755,6 +766,7 @@ public record MetricsHolder(
755766
Counter blockSkipsSent,
756767
Counter blockResendsSent,
757768
Counter endOfStreamsSent,
769+
Counter nodeBehindSent,
758770
Counter sendResponseFailed,
759771
Counter endStreamsReceived,
760772
Counter receiveBlockTimeLatencyNs) {
@@ -783,6 +795,9 @@ static MetricsHolder createMetrics(@NonNull final Metrics metrics) {
783795
final Counter blockResendsSent =
784796
metrics.getOrCreate(new Counter.Config(METRICS_CATEGORY, "publisher_blocks_resend_sent")
785797
.withDescription("Block Resend messages sent"));
798+
final Counter nodeBehindSent =
799+
metrics.getOrCreate(new Counter.Config(METRICS_CATEGORY, "publisher_block_node_behind_sent")
800+
.withDescription("Node Behind Publisher messages sent"));
786801
final Counter endOfStreamsSent =
787802
metrics.getOrCreate(new Counter.Config(METRICS_CATEGORY, "publisher_block_endofstream_sent")
788803
.withDescription("Block End-of-Stream messages sent"));
@@ -805,6 +820,7 @@ static MetricsHolder createMetrics(@NonNull final Metrics metrics) {
805820
blockSkipsSent,
806821
blockResendsSent,
807822
endOfStreamsSent,
823+
nodeBehindSent,
808824
sendResponseFailed,
809825
endStreamsReceived,
810826
receiveBlockTimeLatencyNs);

block-node/stream-publisher/src/main/java/org/hiero/block/node/stream/publisher/StreamPublisherManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ enum BlockAction {
100100
/**
101101
* todo(1420) add documentation
102102
*/
103-
END_BEHIND,
103+
SEND_BEHIND,
104104
/**
105105
* todo(1420) add documentation
106106
*/

block-node/stream-publisher/src/test/java/org/hiero/block/node/stream/publisher/LiveStreamPublisherManagerTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ void testGetActionNullPreviousActionSKIP() {
268268
/**
269269
* This test aims to assert that the
270270
* {@link LiveStreamPublisherManager#getActionForBlock(long, BlockAction, long)}
271-
* method returns {@link BlockAction#END_BEHIND} when the provided block
271+
* method returns {@link BlockAction#SEND_BEHIND} when the provided block
272272
* number is higher than the next expected one and previous action is {@code null}.
273273
*/
274274
@Test
@@ -279,7 +279,7 @@ void testGetActionNullPreviousActionBEHIND() {
279279
// Call with higher than next expected block number.
280280
final BlockAction actual = toTest.getActionForBlock(1L, null, publisherHandlerId);
281281
// Assert
282-
assertThat(actual).isEqualTo(BlockAction.END_BEHIND);
282+
assertThat(actual).isEqualTo(BlockAction.SEND_BEHIND);
283283
}
284284

285285
/**
@@ -381,7 +381,7 @@ void testGetActionACCEPTPreviousActionERROR() {
381381
/**
382382
* This test aims to assert that the
383383
* {@link LiveStreamPublisherManager#getActionForBlock(long, BlockAction, long)}
384-
* method returns {@link BlockAction#END_BEHIND} when the provided
384+
* method returns {@link BlockAction#SEND_BEHIND} when the provided
385385
* block number higher than the next expected block number and
386386
* previous action is {@link BlockAction#ACCEPT}.
387387
*/
@@ -393,7 +393,7 @@ void testGetActionACCEPTPreviousActionBEHIND() {
393393
// Call with higher than next expected block number and previous action ACCEPT.
394394
final BlockAction actual = toTest.getActionForBlock(1L, BlockAction.ACCEPT, publisherHandlerId);
395395
// Assert
396-
assertThat(actual).isEqualTo(BlockAction.END_BEHIND);
396+
assertThat(actual).isEqualTo(BlockAction.SEND_BEHIND);
397397
}
398398

399399
/**
@@ -620,7 +620,7 @@ void testBatchesIncrementOnlyAfterForwarderCompletes() throws InterruptedExcepti
620620

621621
// After forwarder completion, batches should have increased and facility should contain messages.
622622
assertThat(managerMetrics.blocksClosedComplete().get()).isEqualTo(beforeBatches + 2);
623-
assertThat(managerMetrics.currentPublisherCount().get()).isEqualTo(beforeBatches + 1);
623+
assertThat(managerMetrics.currentPublisherCount().get()).isEqualTo(beforeBatches + 2);
624624
// The in-memory messaging facility should now have reset the block number to -1.
625625
assertThat(toTest.getLatestBlockNumber()).isEqualTo(-1);
626626
}

0 commit comments

Comments
 (0)