Skip to content

Commit e833530

Browse files
authored
Merge pull request #6994 from onflow/petera/fix-flakiness-execsync-integration
[Access] Fix flakiness in ExecutionStateSync integration test
2 parents c7514cb + cd9dbf4 commit e833530

File tree

8 files changed

+97
-105
lines changed

8 files changed

+97
-105
lines changed

engine/access/state_stream/backend/backend_executiondata.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func (b *ExecutionDataBackend) GetExecutionDataByBlockID(ctx context.Context, bl
4343

4444
if err != nil {
4545
// need custom not found handler due to blob not found error
46-
if errors.Is(err, storage.ErrNotFound) || execution_data.IsBlobNotFoundError(err) {
46+
if errors.Is(err, storage.ErrNotFound) || execution_data.IsBlobNotFoundError(err) || errors.Is(err, subscription.ErrBlockNotReady) {
4747
return nil, status.Errorf(codes.NotFound, "could not find execution data: %v", err)
4848
}
4949

integration/localnet/builder/bootstrap.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ func prepareAccessService(container testnet.ContainerConfig, i int, n int) Servi
421421
fmt.Sprintf("--secure-rpc-addr=%s:%s", container.ContainerName, testnet.GRPCSecurePort),
422422
fmt.Sprintf("--http-addr=%s:%s", container.ContainerName, testnet.GRPCWebPort),
423423
fmt.Sprintf("--rest-addr=%s:%s", container.ContainerName, testnet.RESTPort),
424-
fmt.Sprintf("--state-stream-addr=%s:%s", container.ContainerName, testnet.ExecutionStatePort),
424+
fmt.Sprintf("--state-stream-addr=%s:%s", container.ContainerName, testnet.GRPCPort),
425425
fmt.Sprintf("--collection-ingress-port=%s", testnet.GRPCPort),
426426
"--supports-observer=true",
427427
fmt.Sprintf("--public-network-address=%s:%s", container.ContainerName, testnet.PublicNetworkPort),
@@ -443,7 +443,6 @@ func prepareAccessService(container testnet.ContainerConfig, i int, n int) Servi
443443
testnet.GRPCSecurePort,
444444
testnet.GRPCWebPort,
445445
testnet.RESTPort,
446-
testnet.ExecutionStatePort,
447446
testnet.PublicNetworkPort,
448447
)
449448

@@ -466,7 +465,7 @@ func prepareObserverService(i int, observerName string, agPublicKey string) Serv
466465
fmt.Sprintf("--secure-rpc-addr=%s:%s", observerName, testnet.GRPCSecurePort),
467466
fmt.Sprintf("--http-addr=%s:%s", observerName, testnet.GRPCWebPort),
468467
fmt.Sprintf("--rest-addr=%s:%s", observerName, testnet.RESTPort),
469-
fmt.Sprintf("--state-stream-addr=%s:%s", observerName, testnet.ExecutionStatePort),
468+
fmt.Sprintf("--state-stream-addr=%s:%s", observerName, testnet.GRPCPort),
470469
"--execution-data-dir=/data/execution-data",
471470
"--execution-data-sync-enabled=true",
472471
"--execution-data-indexing-enabled=true",
@@ -479,7 +478,6 @@ func prepareObserverService(i int, observerName string, agPublicKey string) Serv
479478
testnet.GRPCSecurePort,
480479
testnet.GRPCWebPort,
481480
testnet.RESTPort,
482-
testnet.ExecutionStatePort,
483481
)
484482

485483
// observer services rely on the access gateway

integration/localnet/builder/ports.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,6 @@ func (a *PortAllocator) Print() {
144144
testnet.GRPCSecurePort,
145145
testnet.GRPCWebPort,
146146
testnet.RESTPort,
147-
testnet.ExecutionStatePort,
148147
testnet.PublicNetworkPort,
149148
} {
150149
if hostPort, ok := a.exposedPorts[node][containerPort]; ok {
@@ -165,8 +164,6 @@ func portName(containerPort string) string {
165164
return "GRPC-Web"
166165
case testnet.RESTPort:
167166
return "REST"
168-
case testnet.ExecutionStatePort:
169-
return "Execution Data"
170167
case testnet.AdminPort:
171168
return "Admin"
172169
case testnet.PublicNetworkPort:

integration/testnet/network.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ const (
8989
DefaultProfilerDir = "/data/profiler"
9090

9191
// GRPCPort is the GRPC API port.
92+
// Use this same port for the ExecutionDataAPI
9293
GRPCPort = "9000"
9394
// GRPCSecurePort is the secure GRPC API port.
9495
GRPCSecurePort = "9001"
@@ -100,8 +101,6 @@ const (
100101
MetricsPort = "8080"
101102
// AdminPort is the admin server port
102103
AdminPort = "9002"
103-
// ExecutionStatePort is the execution state server port
104-
ExecutionStatePort = "9003"
105104
// PublicNetworkPort is the access node network port accessible from outside any docker container
106105
PublicNetworkPort = "9876"
107106
// DebuggerPort is the go debugger port
@@ -797,6 +796,7 @@ func (net *FlowNetwork) AddObserver(t *testing.T, conf ObserverConfig) *Containe
797796

798797
nodeContainer.exposePort(GRPCPort, testingdock.RandomPort(t))
799798
nodeContainer.AddFlag("rpc-addr", nodeContainer.ContainerAddr(GRPCPort))
799+
nodeContainer.AddFlag("state-stream-addr", nodeContainer.ContainerAddr(GRPCPort))
800800

801801
nodeContainer.exposePort(GRPCSecurePort, testingdock.RandomPort(t))
802802
nodeContainer.AddFlag("secure-rpc-addr", nodeContainer.ContainerAddr(GRPCSecurePort))
@@ -810,9 +810,6 @@ func (net *FlowNetwork) AddObserver(t *testing.T, conf ObserverConfig) *Containe
810810
nodeContainer.exposePort(RESTPort, testingdock.RandomPort(t))
811811
nodeContainer.AddFlag("rest-addr", nodeContainer.ContainerAddr(RESTPort))
812812

813-
nodeContainer.exposePort(ExecutionStatePort, testingdock.RandomPort(t))
814-
nodeContainer.AddFlag("state-stream-addr", nodeContainer.ContainerAddr(ExecutionStatePort))
815-
816813
nodeContainer.opts.HealthCheck = testingdock.HealthCheckCustom(nodeContainer.HealthcheckCallback())
817814

818815
suiteContainer := net.suite.Container(containerOpts)
@@ -910,6 +907,7 @@ func (net *FlowNetwork) AddNode(t *testing.T, bootstrapDir string, nodeConf Cont
910907
case flow.RoleAccess:
911908
nodeContainer.exposePort(GRPCPort, testingdock.RandomPort(t))
912909
nodeContainer.AddFlag("rpc-addr", nodeContainer.ContainerAddr(GRPCPort))
910+
nodeContainer.AddFlag("state-stream-addr", nodeContainer.ContainerAddr(GRPCPort))
913911

914912
nodeContainer.exposePort(GRPCSecurePort, testingdock.RandomPort(t))
915913
nodeContainer.AddFlag("secure-rpc-addr", nodeContainer.ContainerAddr(GRPCSecurePort))
@@ -920,9 +918,6 @@ func (net *FlowNetwork) AddNode(t *testing.T, bootstrapDir string, nodeConf Cont
920918
nodeContainer.exposePort(RESTPort, testingdock.RandomPort(t))
921919
nodeContainer.AddFlag("rest-addr", nodeContainer.ContainerAddr(RESTPort))
922920

923-
nodeContainer.exposePort(ExecutionStatePort, testingdock.RandomPort(t))
924-
nodeContainer.AddFlag("state-stream-addr", nodeContainer.ContainerAddr(ExecutionStatePort))
925-
926921
// uncomment line below to point the access node exclusively to a single collection node
927922
// nodeContainer.AddFlag("static-collection-ingress-addr", "collection_1:9000")
928923
nodeContainer.AddFlag("collection-ingress-port", GRPCPort)

integration/tests/access/cohort3/execution_state_sync_test.go

Lines changed: 64 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,29 @@ import (
55
"fmt"
66
"path/filepath"
77
"testing"
8+
"time"
89

910
"github.com/ipfs/go-datastore"
1011
badgerds "github.com/ipfs/go-ds-badger2"
1112
pebbleds "github.com/ipfs/go-ds-pebble"
13+
sdk "github.com/onflow/flow-go-sdk"
14+
sdkclient "github.com/onflow/flow-go-sdk/access/grpc"
15+
"github.com/onflow/flow/protobuf/go/flow/entities"
16+
"github.com/onflow/flow/protobuf/go/flow/executiondata"
1217
"github.com/rs/zerolog"
1318
"github.com/stretchr/testify/assert"
1419
"github.com/stretchr/testify/require"
1520
"github.com/stretchr/testify/suite"
21+
"google.golang.org/grpc/codes"
22+
"google.golang.org/grpc/status"
1623

24+
"github.com/onflow/flow-go/engine/common/rpc/convert"
1725
"github.com/onflow/flow-go/engine/ghost/client"
1826
"github.com/onflow/flow-go/integration/testnet"
1927
"github.com/onflow/flow-go/integration/tests/lib"
2028
"github.com/onflow/flow-go/model/flow"
2129
"github.com/onflow/flow-go/module/blobs"
2230
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
23-
"github.com/onflow/flow-go/module/metrics"
24-
storage "github.com/onflow/flow-go/storage/badger"
2531
"github.com/onflow/flow-go/utils/unittest"
2632
)
2733

@@ -158,62 +164,75 @@ func (s *ExecutionStateSyncSuite) executionStateSyncTest() {
158164
blockA := s.BlockState.WaitForHighestFinalizedProgress(s.T(), currentFinalized)
159165
s.T().Logf("got block height %v ID %v", blockA.Header.Height, blockA.Header.ID())
160166

161-
// wait for the requested number of sealed blocks, then pause the network so we can inspect the dbs
162-
s.BlockState.WaitForSealedHeight(s.T(), blockA.Header.Height+runBlocks)
163-
s.net.StopContainers()
164-
165-
metrics := metrics.NewNoopCollector()
166-
167-
// start an execution data service using the Access Node's execution data db
168-
an := s.net.ContainerByID(s.bridgeID)
169-
anEds := s.nodeExecutionDataStore(an)
170-
171-
// setup storage objects needed to get the execution data id
172-
anDB, err := an.DB()
173-
require.NoError(s.T(), err, "could not open db")
174-
175-
anHeaders := storage.NewHeaders(metrics, anDB)
176-
anResults := storage.NewExecutionResults(metrics, anDB)
167+
// Loop through checkBlocks and verify the execution data was downloaded correctly
168+
an := s.net.ContainerByName(testnet.PrimaryAN)
169+
anClient, err := an.SDKClient()
170+
require.NoError(s.T(), err, "could not get access node testnet client")
177171

178-
// start an execution data service using the Observer Node's execution data db
179172
on := s.net.ContainerByName(s.observerName)
180-
onEds := s.nodeExecutionDataStore(on)
181-
182-
// setup storage objects needed to get the execution data id
183-
onDB, err := on.DB()
184-
require.NoError(s.T(), err, "could not open db")
173+
onClient, err := on.SDKClient()
174+
require.NoError(s.T(), err, "could not get observer testnet client")
185175

186-
onHeaders := storage.NewHeaders(metrics, onDB)
187-
onResults := storage.NewExecutionResults(metrics, onDB)
176+
ctx, cancel := context.WithTimeout(s.ctx, 5*time.Minute)
177+
defer cancel()
188178

189-
// Loop through checkBlocks and verify the execution data was downloaded correctly
190179
for i := blockA.Header.Height; i <= blockA.Header.Height+checkBlocks; i++ {
191-
// access node
192-
header, err := anHeaders.ByHeight(i)
193-
require.NoError(s.T(), err, "%s: could not get header", testnet.PrimaryAN)
180+
anBED, err := s.executionDataForHeight(ctx, anClient, i)
181+
require.NoError(s.T(), err, "could not get execution data from AN for height %v", i)
182+
183+
onBED, err := s.executionDataForHeight(ctx, onClient, i)
184+
require.NoError(s.T(), err, "could not get execution data from ON for height %v", i)
194185

195-
result, err := anResults.ByBlockID(header.ID())
196-
require.NoError(s.T(), err, "%s: could not get sealed result", testnet.PrimaryAN)
186+
assert.Equal(s.T(), anBED.BlockID, onBED.BlockID)
187+
}
188+
}
197189

198-
ed, err := anEds.Get(s.ctx, result.ExecutionDataID)
199-
if assert.NoError(s.T(), err, "%s: could not get execution data for height %v", testnet.PrimaryAN, i) {
200-
s.T().Logf("%s: got execution data for height %d", testnet.PrimaryAN, i)
201-
assert.Equal(s.T(), header.ID(), ed.BlockID)
190+
// executionDataForHeight returns the execution data for the given height from the given node
191+
// It retries the request until the data is available or the context is canceled
192+
func (s *ExecutionStateSyncSuite) executionDataForHeight(ctx context.Context, nodeClient *sdkclient.Client, height uint64) (*execution_data.BlockExecutionData, error) {
193+
execDataClient := nodeClient.ExecutionDataRPCClient()
194+
195+
var header *sdk.BlockHeader
196+
s.Require().NoError(retryNotFound(ctx, 200*time.Millisecond, func() error {
197+
var err error
198+
header, err = nodeClient.GetBlockHeaderByHeight(s.ctx, height)
199+
return err
200+
}), "could not get block header for block %d", height)
201+
202+
var blockED *execution_data.BlockExecutionData
203+
s.Require().NoError(retryNotFound(ctx, 200*time.Millisecond, func() error {
204+
ed, err := execDataClient.GetExecutionDataByBlockID(s.ctx, &executiondata.GetExecutionDataByBlockIDRequest{
205+
BlockId: header.ID[:],
206+
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
207+
})
208+
if err != nil {
209+
return err
202210
}
203211

204-
// observer node
205-
header, err = onHeaders.ByHeight(i)
206-
require.NoError(s.T(), err, "%s: could not get header", testnet.PrimaryON)
212+
blockED, err = convert.MessageToBlockExecutionData(ed.GetBlockExecutionData(), flow.Localnet.Chain())
213+
s.Require().NoError(err, "could not convert execution data")
214+
215+
return err
216+
}), "could not get execution data for block %d", height)
207217

208-
result, err = onResults.ByID(result.ID())
209-
require.NoError(s.T(), err, "%s: could not get sealed result from ON`s storage", testnet.PrimaryON)
218+
return blockED, nil
219+
}
210220

211-
ed, err = onEds.Get(s.ctx, result.ExecutionDataID)
212-
if assert.NoError(s.T(), err, "%s: could not get execution data for height %v", testnet.PrimaryON, i) {
213-
s.T().Logf("%s: got execution data for height %d", testnet.PrimaryON, i)
214-
assert.Equal(s.T(), header.ID(), ed.BlockID)
221+
// retryNotFound retries the given function until it returns an error that is not NotFound or the context is canceled
222+
func retryNotFound(ctx context.Context, delay time.Duration, f func() error) error {
223+
for ctx.Err() == nil {
224+
err := f()
225+
if status.Code(err) == codes.NotFound {
226+
select {
227+
case <-ctx.Done():
228+
return ctx.Err()
229+
case <-time.After(delay):
230+
}
231+
continue
215232
}
233+
return err
216234
}
235+
return ctx.Err()
217236
}
218237

219238
func (s *ExecutionStateSyncSuite) nodeExecutionDataStore(node *testnet.Container) execution_data.ExecutionDataStore {

integration/tests/access/cohort4/execution_data_pruning_test.go

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,10 @@ import (
88
"time"
99

1010
badgerds "github.com/ipfs/go-ds-badger2"
11+
sdk "github.com/onflow/flow-go-sdk"
1112
"github.com/rs/zerolog"
1213
"github.com/stretchr/testify/require"
1314
"github.com/stretchr/testify/suite"
14-
"google.golang.org/grpc"
15-
"google.golang.org/grpc/credentials/insecure"
16-
17-
sdk "github.com/onflow/flow-go-sdk"
1815

1916
"github.com/onflow/flow-go/integration/testnet"
2017
"github.com/onflow/flow-go/model/flow"
@@ -137,17 +134,6 @@ func (s *ExecutionDataPruningSuite) SetupTest() {
137134
s.net.Start(s.ctx)
138135
}
139136

140-
// getGRPCClient is the helper func to create an access api client
141-
func (s *ExecutionDataPruningSuite) getGRPCClient(address string) (accessproto.AccessAPIClient, error) {
142-
conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
143-
if err != nil {
144-
return nil, err
145-
}
146-
147-
client := accessproto.NewAccessAPIClient(conn)
148-
return client, nil
149-
}
150-
151137
// TestHappyPath tests the execution data pruning process in a happy path scenario.
152138
// The test follows these steps:
153139
//
@@ -197,17 +183,17 @@ func (s *ExecutionDataPruningSuite) TestHappyPath() {
197183
func (s *ExecutionDataPruningSuite) waitUntilExecutionDataForBlockIndexed(waitingBlockHeight uint64) {
198184
observerNode := s.net.ContainerByName(s.observerNodeName)
199185

200-
grpcClient, err := s.getGRPCClient(observerNode.Addr(testnet.GRPCPort))
186+
sdkClient, err := observerNode.SDKClient()
201187
s.Require().NoError(err)
202188

203189
// creating execution data api client
204-
client, err := getClient(fmt.Sprintf("localhost:%s", observerNode.Port(testnet.ExecutionStatePort)))
205-
s.Require().NoError(err)
190+
accessClient := sdkClient.RPCClient()
191+
execClient := sdkClient.ExecutionDataRPCClient()
206192

207193
// pause until the observer node start indexing blocks,
208194
// getting events from 1-nd block to make sure that 1-st block already indexed, and we can start subscribing
209195
s.Require().Eventually(func() bool {
210-
_, err := grpcClient.GetEventsForHeightRange(s.ctx, &accessproto.GetEventsForHeightRangeRequest{
196+
_, err := accessClient.GetEventsForHeightRange(s.ctx, &accessproto.GetEventsForHeightRangeRequest{
211197
Type: sdk.EventAccountCreated,
212198
StartHeight: 1,
213199
EndHeight: 1,
@@ -220,7 +206,7 @@ func (s *ExecutionDataPruningSuite) waitUntilExecutionDataForBlockIndexed(waitin
220206
// subscribe on events till waitingBlockHeight to make sure that execution data for block indexed till waitingBlockHeight and pruner
221207
// pruned execution data at least once
222208
// SubscribeEventsFromStartHeight used as subscription here because we need to make sure that execution data are already indexed
223-
stream, err := client.SubscribeEventsFromStartHeight(s.ctx, &executiondata.SubscribeEventsFromStartHeightRequest{
209+
stream, err := execClient.SubscribeEventsFromStartHeight(s.ctx, &executiondata.SubscribeEventsFromStartHeightRequest{
224210
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
225211
Filter: &executiondata.EventFilter{},
226212
HeartbeatInterval: 1,

integration/tests/access/cohort4/grpc_state_stream_test.go

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,11 @@ import (
99
"sync"
1010
"testing"
1111

12+
jsoncdc "github.com/onflow/cadence/encoding/json"
1213
"github.com/rs/zerolog"
1314
"github.com/stretchr/testify/assert"
1415
"github.com/stretchr/testify/require"
1516
"github.com/stretchr/testify/suite"
16-
"google.golang.org/grpc"
17-
"google.golang.org/grpc/credentials/insecure"
18-
19-
jsoncdc "github.com/onflow/cadence/encoding/json"
2017

2118
"github.com/onflow/flow-go-sdk/test"
2219

@@ -168,17 +165,14 @@ func (s *GrpcStateStreamSuite) Ghost() *client.GhostClient {
168165
// TestRestEventStreaming tests gRPC event streaming
169166
func (s *GrpcStateStreamSuite) TestHappyPath() {
170167
unittest.SkipUnless(s.T(), unittest.TEST_FLAKY, "flaky tests: https://github.com/onflow/flow-go/issues/5825")
171-
testANURL := fmt.Sprintf("localhost:%s", s.net.ContainerByName(testnet.PrimaryAN).Port(testnet.ExecutionStatePort))
172-
sdkClientTestAN, err := getClient(testANURL)
173-
s.Require().NoError(err)
168+
testAN := s.net.ContainerByName(testnet.PrimaryAN)
169+
sdkClientTestAN := getClient(s.T(), testAN)
174170

175-
controlANURL := fmt.Sprintf("localhost:%s", s.net.ContainerByName("access_2").Port(testnet.ExecutionStatePort))
176-
sdkClientControlAN, err := getClient(controlANURL)
177-
s.Require().NoError(err)
171+
controlAN := s.net.ContainerByName("access_2")
172+
sdkClientControlAN := getClient(s.T(), controlAN)
178173

179-
testONURL := fmt.Sprintf("localhost:%s", s.net.ContainerByName(testnet.PrimaryON).Port(testnet.ExecutionStatePort))
180-
sdkClientTestON, err := getClient(testONURL)
181-
s.Require().NoError(err)
174+
testON := s.net.ContainerByName(testnet.PrimaryON)
175+
sdkClientTestON := getClient(s.T(), testON)
182176

183177
// get the first block height
184178
currentFinalized := s.BlockState.HighestFinalizedHeight()
@@ -471,15 +465,10 @@ func compareEvents(t *testing.T, controlData, testData *SubscribeEventsResponse)
471465
}
472466
}
473467

474-
// TODO: switch to SDK versions once crypto library is fixed to support the latest SDK version
475-
476-
func getClient(address string) (executiondata.ExecutionDataAPIClient, error) {
477-
conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
478-
if err != nil {
479-
return nil, err
480-
}
481-
482-
return executiondata.NewExecutionDataAPIClient(conn), nil
468+
func getClient(t *testing.T, node *testnet.Container) executiondata.ExecutionDataAPIClient {
469+
accessClient, err := node.SDKClient()
470+
require.NoError(t, err, "could not get access client")
471+
return accessClient.ExecutionDataRPCClient()
483472
}
484473

485474
func SubscribeHandler[T any, V any](

0 commit comments

Comments
 (0)