|
29 | 29 | import java.util.Objects; |
30 | 30 | import java.util.Optional; |
31 | 31 | import java.util.concurrent.CountDownLatch; |
| 32 | +import java.util.concurrent.atomic.AtomicBoolean; |
32 | 33 | import java.util.function.Function; |
33 | 34 | import org.hiero.block.api.BlockAccessServiceInterface; |
34 | 35 | import org.hiero.block.api.BlockEnd; |
@@ -306,6 +307,8 @@ void e2eBlockStreamsBaseScenarios() throws InterruptedException { |
306 | 307 | final SubscribeStreamResponse subscribeResponse0 = |
307 | 308 | subscribeResponseObserver.getOnNextCalls().get(0); |
308 | 309 | assertThat(subscribeResponse0.blockItems().blockItems()).hasSize(blockItems.length); |
| 310 | + // check that the next call at element 3 contains expected types |
| 311 | + validateSubsciberObservedExpectedBlockItemTypes(subscribeResponse0); |
309 | 312 |
|
310 | 313 | // ==== Scenario 6: Subscribe to live block stream and confirm receipt of newly published block === |
311 | 314 | final SubscribeStreamRequest subscribeRequest2 = SubscribeStreamRequest.newBuilder() |
@@ -373,13 +376,39 @@ void e2eBlockStreamsBaseScenarios() throws InterruptedException { |
373 | 376 | .isEqualTo(blockNumber1); |
374 | 377 | }); |
375 | 378 |
|
| 379 | + // check that the next call at element 3 contains expected types |
| 380 | + validateSubsciberObservedExpectedBlockItemTypes(subscribeResponseObserver.getOnNextCalls().get(3)); |
| 381 | + |
376 | 382 | // close the client connections |
377 | 383 | blockStreamPublishServiceClient.close(); |
378 | 384 | blockStreamSubscribeServiceClient.close(); |
379 | 385 | blockAccessServiceClient.close(); |
380 | 386 | blockNodeServiceClient.close(); |
381 | 387 | } |
382 | 388 |
|
| 389 | + private void validateSubsciberObservedExpectedBlockItemTypes( |
| 390 | + final SubscribeStreamResponse subscribeResponse) { |
| 391 | + |
| 392 | + assertNotNull(subscribeResponse); |
| 393 | + assertNotNull(subscribeResponse.blockItems()); |
| 394 | + |
| 395 | + AtomicBoolean subscriberObservedHeader = new AtomicBoolean(false); |
| 396 | + AtomicBoolean subscriberObservedFooter = new AtomicBoolean(false); |
| 397 | + AtomicBoolean subscriberObservedOneProof = new AtomicBoolean(false); |
| 398 | + subscribeResponse.blockItems().blockItems().forEach(item -> { |
| 399 | + switch (item.item().kind()) { |
| 400 | + case BLOCK_HEADER -> subscriberObservedHeader.set(true); |
| 401 | + case BLOCK_PROOF -> subscriberObservedOneProof.set(true); |
| 402 | + case BLOCK_FOOTER -> subscriberObservedFooter.set(true); |
| 403 | + default -> { |
| 404 | + } |
| 405 | + } |
| 406 | + }); |
| 407 | + assertTrue(subscriberObservedHeader.get(), "Subscriber did not observe block header"); |
| 408 | + assertTrue(subscriberObservedOneProof.get(), "Subscriber did not observe block proof"); |
| 409 | + assertTrue(subscriberObservedFooter.get(), "Subscriber did not observe block footer"); |
| 410 | + } |
| 411 | + |
383 | 412 | private void endBlock(final long blockNumber, final Pipeline<? super PublishStreamRequest> requestStream) { |
384 | 413 | PublishStreamRequest request = PublishStreamRequest.newBuilder() |
385 | 414 | .endOfBlock(BlockEnd.newBuilder().blockNumber(blockNumber).build()) |
@@ -555,4 +584,189 @@ void e2eSocketClosureTest() throws InterruptedException { |
555 | 584 | assertTrue(ex.getCause() instanceof SocketException); |
556 | 585 | assertTrue(ex.getCause().getMessage().toLowerCase().contains("socket closed")); |
557 | 586 | } |
| 587 | + |
| 588 | + /* Test multiple scenarios in one area without mocks to allow for easy step through when troubleshooting behaviour |
| 589 | + * Scenarios covered: |
| 590 | + * 1. Mimicking CN and publishing a new genesis block to BN and confirming acknowledgement response |
| 591 | + * 2. Publishing a duplicate genesis block and confirming duplicate block response and stream closure |
| 592 | + * 3. Requesting server status to confirm block 0 is reflected in status |
| 593 | + * 4. Requesting genesis block via getBlock to confirm block is stored and retrievable |
| 594 | + * 5. Mimicking MN and subscribing to block stream from block 0 and confirming receipt of block 0 |
| 595 | + * 6. Mimicking MN and subscribing to live block stream and confirming receipt of newly published block |
| 596 | + */ |
| 597 | + @Test |
| 598 | + void e2eBlockStreamsBaseScenariosPre068() throws InterruptedException { |
| 599 | + // ==== Scenario 1: Publish new genesis block and confirm acknowledgement response ==== |
| 600 | + BlockStreamPublishServiceInterface.BlockStreamPublishServiceClient blockStreamPublishServiceClient = |
| 601 | + new BlockStreamPublishServiceInterface.BlockStreamPublishServiceClient( |
| 602 | + publishBlockStreamPbjGrpcClient, OPTIONS); |
| 603 | + |
| 604 | + ResponsePipelineUtils<PublishStreamResponse> responseObserver = new ResponsePipelineUtils<>(); |
| 605 | + final Pipeline<? super PublishStreamRequest> requestStream = |
| 606 | + blockStreamPublishServiceClient.publishBlockStream(responseObserver); |
| 607 | + |
| 608 | + final long blockNumber = 0; |
| 609 | + BlockItem[] blockItems = BlockItemBuilderUtils.createSimpleBlockWithNumber(blockNumber); |
| 610 | + // updated header to a pre 0.68 to confirm backwards compatibility |
| 611 | + blockItems[0] = BlockItemBuilderUtils.sampleBlockHeader(blockNumber, 0, 67); |
| 612 | + |
| 613 | + // change to List to allow multiple items |
| 614 | + PublishStreamRequest request = PublishStreamRequest.newBuilder() |
| 615 | + .blockItems(BlockItemSet.newBuilder().blockItems(blockItems).build()) |
| 616 | + .build(); |
| 617 | + CountDownLatch publishCountDownLatch = responseObserver.setAndGetOnNextLatch(1); |
| 618 | + requestStream.onNext(request); |
| 619 | + endBlock(blockNumber, requestStream); |
| 620 | + |
| 621 | + publishCountDownLatch.await(); // wait for acknowledgement response |
| 622 | + assertThat(responseObserver.getOnNextCalls()) |
| 623 | + .hasSize(1) |
| 624 | + .first() |
| 625 | + .returns(PublishStreamResponse.ResponseOneOfType.ACKNOWLEDGEMENT, responseKindExtractor) |
| 626 | + .returns(0L, acknowledgementBlockNumberExtractor); |
| 627 | + |
| 628 | + // Assert no other responses sent |
| 629 | + assertThat(responseObserver.getOnErrorCalls()).isEmpty(); |
| 630 | + assertThat(responseObserver.getOnSubscriptionCalls()).isEmpty(); |
| 631 | + assertThat(responseObserver.getOnCompleteCalls().get()).isEqualTo(0); |
| 632 | + assertThat(responseObserver.getClientEndStreamCalls().get()).isEqualTo(0); |
| 633 | + |
| 634 | + // ==== Scenario 2: Publish duplicate genesis block and confirm duplicate block response and stream closure === |
| 635 | + CountDownLatch publishCompleteCountDownLatch = responseObserver.setAndGetOnCompleteLatch(1); |
| 636 | + requestStream.onNext(request); |
| 637 | + endBlock(blockNumber, requestStream); |
| 638 | + |
| 639 | + publishCompleteCountDownLatch.await(); // wait for onComplete caused by duplicate response |
| 640 | + |
| 641 | + // Assert that one more response is sent. |
| 642 | + assertThat(responseObserver.getOnNextCalls()) |
| 643 | + .hasSize(2) |
| 644 | + .element(1) |
| 645 | + .returns(PublishStreamResponse.ResponseOneOfType.END_STREAM, responseKindExtractor) |
| 646 | + .returns(PublishStreamResponse.EndOfStream.Code.DUPLICATE_BLOCK, endStreamResponseCodeExtractor) |
| 647 | + .returns(0L, endStreamBlockNumberExtractor); |
| 648 | + |
| 649 | + // Assert no other responses sent |
| 650 | + assertThat(responseObserver.getOnErrorCalls()).isEmpty(); |
| 651 | + assertThat(responseObserver.getOnSubscriptionCalls()).isEmpty(); |
| 652 | + assertThat(responseObserver.getOnCompleteCalls().get()).isEqualTo(1); |
| 653 | + assertThat(responseObserver.getClientEndStreamCalls().get()).isEqualTo(0); |
| 654 | + |
| 655 | + // ==== Scenario 3: Get server status and confirm block 0 is reflected in status ==== |
| 656 | + BlockNodeServiceInterface.BlockNodeServiceClient blockNodeServiceClient = |
| 657 | + new BlockNodeServiceInterface.BlockNodeServiceClient(serverStatusPbjGrpcClient, OPTIONS); |
| 658 | + final ServerStatusResponse nodeStatusPostBlock0 = |
| 659 | + blockNodeServiceClient.serverStatus(SIMPLE_SERVER_STAUS_REQUEST); |
| 660 | + assertNotNull(nodeStatusPostBlock0); |
| 661 | + assertThat(nodeStatusPostBlock0.firstAvailableBlock()).isEqualTo(0); |
| 662 | + assertThat(nodeStatusPostBlock0.lastAvailableBlock()).isEqualTo(0); |
| 663 | + |
| 664 | + // ==== Scenario 4: Get block 0 via getBlock and confirm block items ==== |
| 665 | + BlockAccessServiceInterface.BlockAccessServiceClient blockAccessServiceClient = |
| 666 | + new BlockAccessServiceInterface.BlockAccessServiceClient(getBlockPbjGrpcClient, OPTIONS); |
| 667 | + final BlockResponse block0Response = blockAccessServiceClient.getBlock( |
| 668 | + BlockRequest.newBuilder().blockNumber(blockNumber).build()); |
| 669 | + assertNotNull(block0Response); |
| 670 | + assertTrue(block0Response.hasBlock()); |
| 671 | + assertThat(block0Response.status()).isEqualTo(BlockResponse.Code.SUCCESS); |
| 672 | + assertNotNull(block0Response.block().items()); |
| 673 | + assertThat(block0Response.block().items()).hasSize(blockItems.length); |
| 674 | + |
| 675 | + // ==== Scenario 5: Subscribe to block stream from block 0 and confirm receipt of block 0 === |
| 676 | + BlockStreamSubscribeServiceInterface.BlockStreamSubscribeServiceClient blockStreamSubscribeServiceClient = |
| 677 | + new BlockStreamSubscribeServiceInterface.BlockStreamSubscribeServiceClient( |
| 678 | + subscribeBlockStreamPbjGrpcClient, OPTIONS); |
| 679 | + ResponsePipelineUtils<SubscribeStreamResponse> subscribeResponseObserver = new ResponsePipelineUtils<>(); |
| 680 | + |
| 681 | + final SubscribeStreamRequest subscribeRequest1 = SubscribeStreamRequest.newBuilder() |
| 682 | + .startBlockNumber(blockNumber) |
| 683 | + .build(); |
| 684 | + |
| 685 | + final CountDownLatch blockItemsSubscribe1Latch = subscribeResponseObserver.setAndGetOnNextLatch(2); |
| 686 | + blockStreamSubscribeServiceClient.subscribeBlockStream(subscribeRequest1, subscribeResponseObserver); |
| 687 | + |
| 688 | + blockItemsSubscribe1Latch.await(); |
| 689 | + // block items, end block, and success status |
| 690 | + assertThat(subscribeResponseObserver.getOnNextCalls()).hasSize(3); |
| 691 | + assertThat(subscribeResponseObserver.getOnCompleteCalls().get()).isEqualTo(1); |
| 692 | + |
| 693 | + final SubscribeStreamResponse subscribeResponse0 = |
| 694 | + subscribeResponseObserver.getOnNextCalls().get(0); |
| 695 | + assertThat(subscribeResponse0.blockItems().blockItems()).hasSize(blockItems.length); |
| 696 | + |
| 697 | + // ==== Scenario 6: Subscribe to live block stream and confirm receipt of newly published block === |
| 698 | + final SubscribeStreamRequest subscribeRequest2 = SubscribeStreamRequest.newBuilder() |
| 699 | + .startBlockNumber(1L) |
| 700 | + .endBlockNumber(-1L) // subscribe to all future blocks |
| 701 | + .build(); |
| 702 | + final CountDownLatch blockItemsSubscribe2Latch = subscribeResponseObserver.setAndGetOnNextLatch(1); |
| 703 | + // run blockStreamSubscribeServiceClient.subscribeBlockStrea in its own thread to avoid blocking |
| 704 | + new Thread(() -> { |
| 705 | + try { |
| 706 | + blockStreamSubscribeServiceClient.subscribeBlockStream( |
| 707 | + subscribeRequest2, subscribeResponseObserver); |
| 708 | + } catch (Exception e) { |
| 709 | + fail("Exception in subscribeBlockStream: " + e.getMessage()); |
| 710 | + } |
| 711 | + }) |
| 712 | + .start(); |
| 713 | + |
| 714 | + // publish block 1 and confirm subscriber receives it |
| 715 | + final long blockNumber1 = 1; |
| 716 | + BlockItem[] blockItems1 = BlockItemBuilderUtils.createSimpleBlockWithNumber(blockNumber1); |
| 717 | + // updated header to a pre 0.68 to confirm backwards compatibility |
| 718 | + blockItems1[0] = BlockItemBuilderUtils.sampleBlockHeader(blockNumber1, 0, 67); |
| 719 | + |
| 720 | + PublishStreamRequest request2 = PublishStreamRequest.newBuilder() |
| 721 | + .blockItems(BlockItemSet.newBuilder().blockItems(blockItems1).build()) |
| 722 | + .build(); |
| 723 | + |
| 724 | + // use a new client to publish block 1 as the existing client was closed on duplicate block publish. |
| 725 | + ResponsePipelineUtils<PublishStreamResponse> responseObserver2 = new ResponsePipelineUtils<>(); |
| 726 | + BlockStreamPublishServiceInterface.BlockStreamPublishServiceClient blockStreamPublishServiceClient2 = |
| 727 | + new BlockStreamPublishServiceInterface.BlockStreamPublishServiceClient(createGrpcClient(), OPTIONS); |
| 728 | + final Pipeline<? super PublishStreamRequest> requestStream2 = |
| 729 | + blockStreamPublishServiceClient2.publishBlockStream(responseObserver2); |
| 730 | + final CountDownLatch blockItemsPublish2Latch = responseObserver2.setAndGetOnNextLatch(1); |
| 731 | + requestStream2.onNext(request2); |
| 732 | + endBlock(blockNumber1, requestStream2); |
| 733 | + |
| 734 | + blockItemsSubscribe2Latch.await(); // wait for subscriber to receive unverified block items |
| 735 | + blockItemsPublish2Latch.await(); // wait for publisher to observe block item sets |
| 736 | + |
| 737 | + assertThat(responseObserver2.getOnNextCalls()) |
| 738 | + .hasSize(1) |
| 739 | + .first() |
| 740 | + .returns(PublishStreamResponse.ResponseOneOfType.ACKNOWLEDGEMENT, responseKindExtractor) |
| 741 | + .returns(1L, acknowledgementBlockNumberExtractor); |
| 742 | + |
| 743 | + // Assert no other responses sent |
| 744 | + assertThat(responseObserver2.getOnErrorCalls()).isEmpty(); |
| 745 | + assertThat(responseObserver2.getOnSubscriptionCalls()).isEmpty(); |
| 746 | + assertThat(responseObserver2.getOnCompleteCalls().get()).isEqualTo(0); |
| 747 | + assertThat(responseObserver2.getClientEndStreamCalls().get()).isEqualTo(0); |
| 748 | + |
| 749 | + final SubscribeStreamResponse subscribeResponse1 = |
| 750 | + subscribeResponseObserver.getOnNextCalls().get(3); |
| 751 | + assertThat(subscribeResponse1.blockItems().blockItems()).hasSize(blockItems1.length); |
| 752 | + |
| 753 | + assertThat(subscribeResponseObserver.getOnNextCalls()) |
| 754 | + .hasSize(5) // block 0 items, end block 0, success status, block 1 items, and end block 1 |
| 755 | + .element(3) |
| 756 | + .satisfies(response -> { |
| 757 | + assertThat(response.blockItems().blockItems()).hasSize(blockItems1.length); |
| 758 | + assertThat(response.blockItems() |
| 759 | + .blockItems() |
| 760 | + .getFirst() |
| 761 | + .blockHeader() |
| 762 | + .number()) |
| 763 | + .isEqualTo(blockNumber1); |
| 764 | + }); |
| 765 | + |
| 766 | + // close the client connections |
| 767 | + blockStreamPublishServiceClient.close(); |
| 768 | + blockStreamSubscribeServiceClient.close(); |
| 769 | + blockAccessServiceClient.close(); |
| 770 | + blockNodeServiceClient.close(); |
| 771 | + } |
558 | 772 | } |
0 commit comments