Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@
package org.hiero.block.node.app.fixtures.server;

import com.hedera.pbj.grpc.helidon.PbjRouting;
import com.hedera.pbj.grpc.helidon.PbjRouting.Builder;
import com.hedera.pbj.grpc.helidon.config.PbjConfig;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.helidon.webserver.ConnectionConfig;
import io.helidon.webserver.WebServer;
import io.helidon.webserver.WebServerConfig;
import org.hiero.block.api.BlockEnd;
import org.hiero.block.api.BlockItemSet;
import org.hiero.block.api.BlockNodeServiceInterface;
import org.hiero.block.api.BlockStreamSubscribeServiceInterface;
import org.hiero.block.api.ServerStatusDetailResponse;
import org.hiero.block.api.ServerStatusRequest;
import org.hiero.block.api.ServerStatusResponse;
import org.hiero.block.api.SubscribeStreamResponse;
import org.hiero.block.api.SubscribeStreamResponse.Code;
import org.hiero.block.node.spi.historicalblocks.HistoricalBlockFacility;

public class TestBlockNodeServer {
Expand All @@ -23,13 +28,8 @@ public TestBlockNodeServer(int port, HistoricalBlockFacility historicalBlockFaci
PbjConfig.builder().name("pbj").maxMessageSizeBytes(4_194_304).build();

// Create the service builder
final PbjRouting.Builder pbjRoutingBuilder = PbjRouting.builder()
.service((BlockNodeServiceInterface) request -> ServerStatusResponse.newBuilder()
.firstAvailableBlock(
historicalBlockFacility.availableBlocks().min())
.lastAvailableBlock(
historicalBlockFacility.availableBlocks().max())
.build())
final Builder pbjRoutingBuilder = PbjRouting.builder()
.service(new TrivialBlockNodeServerInterface(historicalBlockFacility))
.service((BlockStreamSubscribeServiceInterface) (request, replies) -> {
if (historicalBlockFacility
.availableBlocks()
Expand All @@ -51,24 +51,22 @@ public TestBlockNodeServer(int port, HistoricalBlockFacility historicalBlockFaci
}

replies.onNext(SubscribeStreamResponse.newBuilder()
.status(SubscribeStreamResponse.Code.SUCCESS)
.status(Code.SUCCESS)
.build());

replies.onComplete();
}
});

// start the web server with the PBJ configuration and routing
webServer = WebServerConfig.builder()
.port(port)
.addProtocol(pbjConfig)
.addRouting(pbjRoutingBuilder)
.connectionConfig(ConnectionConfig.builder()
.sendBufferSize(32768)
.receiveBufferSize(32768)
.sendBufferSize(524288)
.receiveBufferSize(524288)
.build())
.build();

webServer.start();
}

Expand All @@ -80,4 +78,30 @@ public void stop() {
webServer.stop();
}
}

private class TrivialBlockNodeServerInterface implements BlockNodeServiceInterface {
private final HistoricalBlockFacility historicalBlockFacility;

public TrivialBlockNodeServerInterface(final HistoricalBlockFacility historicalFacility) {
historicalBlockFacility = historicalFacility;
}

@Override
@NonNull
public ServerStatusResponse serverStatus(@NonNull final ServerStatusRequest request) {
return ServerStatusResponse.newBuilder()
.firstAvailableBlock(
historicalBlockFacility.availableBlocks().min())
.lastAvailableBlock(
historicalBlockFacility.availableBlocks().max())
.onlyLatestState(false)
.build();
}

@Override
@NonNull
public ServerStatusDetailResponse serverStatusDetail(@NonNull final ServerStatusRequest request) {
return ServerStatusDetailResponse.DEFAULT;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.swirlds.metrics.api.Counter;
import edu.umd.cs.findbugs.annotations.NonNull;
import org.hiero.block.api.BlockNodeServiceInterface;
import org.hiero.block.api.ServerStatusDetailResponse;
import org.hiero.block.api.ServerStatusRequest;
import org.hiero.block.api.ServerStatusResponse;
import org.hiero.block.node.spi.BlockNodeContext;
Expand Down Expand Up @@ -62,6 +63,12 @@ public ServerStatusResponse serverStatus(@NonNull final ServerStatusRequest requ
return response;
}

@Override
@NonNull
public ServerStatusDetailResponse serverStatusDetail(@NonNull final ServerStatusRequest request) {
return ServerStatusDetailResponse.DEFAULT;
}

// ==== BlockNodePlugin Methods ====================================================================================
@Override
public String name() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@

import com.hedera.hapi.block.stream.BlockItem;
import com.hedera.pbj.runtime.ParseException;
import com.hedera.pbj.runtime.grpc.ServiceInterface;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import org.hiero.block.api.ServerStatusRequest;
import org.hiero.block.api.ServerStatusResponse;
Expand Down Expand Up @@ -45,20 +43,6 @@ void setup() {
enableDebugLogging();
}

/**
* Verifies that the service interface correctly registers and exposes
* the server status method.
*/
@Test
@DisplayName("Should return correct method for ServerStatusServicePlugin")
void shouldReturnCorrectMethod() {
assertNotNull(serviceInterface);
List<ServiceInterface.Method> methods = serviceInterface.methods();
assertNotNull(methods);
assertEquals(1, methods.size());
assertEquals(plugin.methods().getFirst(), methods.getFirst());
}

/**
* Tests that the server status response is valid when no blocks are available.
* Verifies the first and last available block numbers and other response properties.
Expand All @@ -78,9 +62,6 @@ void shouldReturnValidServerStatus() throws ParseException {
assertEquals(UNKNOWN_BLOCK_NUMBER, response.firstAvailableBlock());
assertEquals(UNKNOWN_BLOCK_NUMBER, response.lastAvailableBlock());
assertFalse(response.onlyLatestState());

// TODO(#579) Remove when block node version information is implemented.
assertFalse(response.hasVersionInformation());
}

/**
Expand All @@ -104,9 +85,6 @@ void shouldReturnValidServerStatusForNewBlockBatch() throws ParseException {
assertEquals(0, response.firstAvailableBlock());
assertEquals(blocks - 1, response.lastAvailableBlock());
assertFalse(response.onlyLatestState());

// TODO() Remove when block node version information is implemented.
assertFalse(response.hasVersionInformation());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,10 @@ public BlockAction getActionForBlock(
return switch (previousAction) {
case null -> getActionForHeader(blockNumber, handlerId);
case ACCEPT -> getActionForCurrentlyStreaming(blockNumber);
case END_ERROR, END_DUPLICATE, END_BEHIND ->
case END_ERROR, END_DUPLICATE ->
// This should not happen because the Handler should have shut down.
BlockAction.END_ERROR;
case SKIP, RESEND ->
case SKIP, RESEND, SEND_BEHIND ->
// This should not happen because the Handler should have reset the previous action.
BlockAction.END_ERROR;
};
Expand Down Expand Up @@ -224,7 +224,7 @@ private BlockAction getActionForHeader(final long blockNumber, final long handle
} else if (blockNumber == nextUnstreamedBlockNumber.get()) {
return addHandlerQueueForBlock(blockNumber, handlerId);
} else if (blockNumber > nextUnstreamedBlockNumber.get()) {
return BlockAction.END_BEHIND;
return BlockAction.SEND_BEHIND;
} else {
// This should not be possible, all cases that could reach here are
// already handled above.
Expand Down Expand Up @@ -283,7 +283,8 @@ private BlockAction addHandlerQueueForBlock(final long blockNumber, final long h
}
}
// Return the correct action if another handler jumped in front of the caller.
return blockNumber < nextUnstreamedBlockNumber.get() ? BlockAction.SKIP : BlockAction.END_BEHIND;
// Note, neither of these ends the stream; both ask the publisher to send a different block.
return blockNumber < nextUnstreamedBlockNumber.get() ? BlockAction.SKIP : BlockAction.SEND_BEHIND;
}

/*
Expand Down Expand Up @@ -362,7 +363,7 @@ private BlockAction getActionForCurrentlyStreaming(final long blockNumber) {
// the block node is behind. The most likely cause here is a block
// that failed to verify, or got stuck and did not finish, and was
// parallel streaming a block earlier than the calling handler.
return BlockAction.END_BEHIND;
return BlockAction.SEND_BEHIND;
} else {
// This should not be possible, all cases that could reach here are
// already handled above.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.hiero.block.api.BlockEnd;
import org.hiero.block.api.PublishStreamRequest.EndStream;
import org.hiero.block.api.PublishStreamResponse;
import org.hiero.block.api.PublishStreamResponse.BehindPublisher;
import org.hiero.block.api.PublishStreamResponse.BlockAcknowledgement;
import org.hiero.block.api.PublishStreamResponse.EndOfStream;
import org.hiero.block.api.PublishStreamResponse.EndOfStream.Code;
Expand Down Expand Up @@ -288,7 +289,7 @@ private void handleBlockItemsRequest(
case ACCEPT -> handleAccept(itemSetUnparsed, blockItems);
case SKIP -> handleSkip(blockNumber);
case RESEND -> handleResend();
case END_BEHIND -> handleEndBehind();
case SEND_BEHIND -> handleSendBehind();
case END_DUPLICATE -> handleEndDuplicate();
case END_ERROR -> handleEndError();
};
Expand Down Expand Up @@ -610,12 +611,22 @@ private BatchHandleResult handleResend() {
/**
* Handle the END_BEHIND action for a block.
*/
private BatchHandleResult handleEndBehind() {
LOGGER.log(DEBUG, "Handler {0} is sending BEHIND({1}).", handlerId, publisherManager.getLatestBlockNumber());
// If the action is END_BEHIND, we need to send an end of stream
private BatchHandleResult handleSendBehind() {
LOGGER.log(DEBUG, "Handler {0} is sending Behind({1}).", handlerId, publisherManager.getLatestBlockNumber());
// If the action is SEND_BEHIND, we need to send an end of stream
// response to the publisher and not propagate the items.
sendEndOfStream(Code.BEHIND);
return new BatchHandleResult(true, true);
final BehindPublisher behindMessage = BehindPublisher.newBuilder()
.blockNumber(publisherManager.getLatestBlockNumber())
.build();
final PublishStreamResponse response = PublishStreamResponse.newBuilder()
.nodeBehindPublisher(behindMessage)
.build();
if (sendResponse(response)) {
metrics.nodeBehindSent.increment(); // @todo(1415) add label
return new BatchHandleResult(false, true);
} else {
return new BatchHandleResult(true, true);
}
}

/**
Expand Down Expand Up @@ -755,6 +766,7 @@ public record MetricsHolder(
Counter blockSkipsSent,
Counter blockResendsSent,
Counter endOfStreamsSent,
Counter nodeBehindSent,
Counter sendResponseFailed,
Counter endStreamsReceived,
Counter receiveBlockTimeLatencyNs) {
Expand Down Expand Up @@ -783,6 +795,9 @@ static MetricsHolder createMetrics(@NonNull final Metrics metrics) {
final Counter blockResendsSent =
metrics.getOrCreate(new Counter.Config(METRICS_CATEGORY, "publisher_blocks_resend_sent")
.withDescription("Block Resend messages sent"));
final Counter nodeBehindSent =
metrics.getOrCreate(new Counter.Config(METRICS_CATEGORY, "publisher_block_node_behind_sent")
.withDescription("Node Behind Publisher messages sent"));
final Counter endOfStreamsSent =
metrics.getOrCreate(new Counter.Config(METRICS_CATEGORY, "publisher_block_endofstream_sent")
.withDescription("Block End-of-Stream messages sent"));
Expand All @@ -805,6 +820,7 @@ static MetricsHolder createMetrics(@NonNull final Metrics metrics) {
blockSkipsSent,
blockResendsSent,
endOfStreamsSent,
nodeBehindSent,
sendResponseFailed,
endStreamsReceived,
receiveBlockTimeLatencyNs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ enum BlockAction {
/**
* todo(1420) add documentation
*/
END_BEHIND,
SEND_BEHIND,
/**
* todo(1420) add documentation
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ void testGetActionNullPreviousActionSKIP() {
/**
* This test aims to assert that the
* {@link LiveStreamPublisherManager#getActionForBlock(long, BlockAction, long)}
* method returns {@link BlockAction#END_BEHIND} when the provided block
* method returns {@link BlockAction#SEND_BEHIND} when the provided block
* number is higher than the next expected one and previous action is {@code null}.
*/
@Test
Expand All @@ -279,7 +279,7 @@ void testGetActionNullPreviousActionBEHIND() {
// Call with higher than next expected block number.
final BlockAction actual = toTest.getActionForBlock(1L, null, publisherHandlerId);
// Assert
assertThat(actual).isEqualTo(BlockAction.END_BEHIND);
assertThat(actual).isEqualTo(BlockAction.SEND_BEHIND);
}

/**
Expand Down Expand Up @@ -381,7 +381,7 @@ void testGetActionACCEPTPreviousActionERROR() {
/**
* This test aims to assert that the
* {@link LiveStreamPublisherManager#getActionForBlock(long, BlockAction, long)}
* method returns {@link BlockAction#END_BEHIND} when the provided
* method returns {@link BlockAction#SEND_BEHIND} when the provided
* block number higher than the next expected block number and
* previous action is {@link BlockAction#ACCEPT}.
*/
Expand All @@ -393,7 +393,7 @@ void testGetActionACCEPTPreviousActionBEHIND() {
// Call with higher than next expected block number and previous action ACCEPT.
final BlockAction actual = toTest.getActionForBlock(1L, BlockAction.ACCEPT, publisherHandlerId);
// Assert
assertThat(actual).isEqualTo(BlockAction.END_BEHIND);
assertThat(actual).isEqualTo(BlockAction.SEND_BEHIND);
}

/**
Expand Down Expand Up @@ -620,7 +620,7 @@ void testBatchesIncrementOnlyAfterForwarderCompletes() throws InterruptedExcepti

// After forwarder completion, batches should have increased and facility should contain messages.
assertThat(managerMetrics.blocksClosedComplete().get()).isEqualTo(beforeBatches + 2);
assertThat(managerMetrics.currentPublisherCount().get()).isEqualTo(beforeBatches + 1);
assertThat(managerMetrics.currentPublisherCount().get()).isEqualTo(beforeBatches + 2);
// The in-memory messaging facility should now have reset the block number to -1.
assertThat(toTest.getLatestBlockNumber()).isEqualTo(-1);
}
Expand Down
Loading
Loading