@@ -117,7 +117,10 @@ import (
117117 "github.com/onflow/flow-go/state/protocol/blocktimer"
118118 "github.com/onflow/flow-go/storage"
119119 bstorage "github.com/onflow/flow-go/storage/badger"
120+ "github.com/onflow/flow-go/storage/operation/badgerimpl"
121+ "github.com/onflow/flow-go/storage/operation/pebbleimpl"
120122 pstorage "github.com/onflow/flow-go/storage/pebble"
123+ "github.com/onflow/flow-go/storage/store"
121124 "github.com/onflow/flow-go/utils/grpcutils"
122125)
123126
@@ -552,8 +555,8 @@ func (builder *FlowAccessNodeBuilder) BuildConsensusFollower() *FlowAccessNodeBu
552555func (builder * FlowAccessNodeBuilder ) BuildExecutionSyncComponents () * FlowAccessNodeBuilder {
553556 var ds datastore.Batching
554557 var bs network.BlobService
555- var processedBlockHeight storage.ConsumerProgress
556- var processedNotifications storage.ConsumerProgress
558+ var processedBlockHeight storage.ConsumerProgressInitializer
559+ var processedNotifications storage.ConsumerProgressInitializer
557560 var bsDependable * module.ProxiedReadyDoneAware
558561 var execDataDistributor * edrequester.ExecutionDataDistributor
559562 var execDataCacheBackend * herocache.BlockExecutionData
@@ -607,21 +610,30 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
607610 Module ("processed block height consumer progress" , func (node * cmd.NodeConfig ) error {
608611 // Note: progress is stored in the datastore's DB since that is where the jobqueue
609612 // writes execution data to.
610- if executionDataDBMode == execution_data .ExecutionDataDBModeBadger {
611- processedBlockHeight = bstorage .NewConsumerProgress (builder .ExecutionDatastoreManager .DB ().(* badger.DB ), module .ConsumeProgressExecutionDataRequesterBlockHeight )
613+ var db storage.DB
614+ edmdb := builder .ExecutionDatastoreManager .DB ()
615+
616+ if bdb , ok := edmdb .(* badger.DB ); ok {
617+ db = badgerimpl .ToDB (bdb )
618+ } else if pdb , ok := edmdb .(* pebble.DB ); ok {
619+ db = pebbleimpl .ToDB (pdb )
612620 } else {
613- processedBlockHeight = pstorage . NewConsumerProgress ( builder . ExecutionDatastoreManager . DB ().( * pebble. DB ), module . ConsumeProgressExecutionDataRequesterBlockHeight )
621+ return fmt . Errorf ( "unsupported execution data DB type: %T" , edmdb )
614622 }
623+
624+ processedBlockHeight = store .NewConsumerProgress (db , module .ConsumeProgressExecutionDataRequesterBlockHeight )
615625 return nil
616626 }).
617627 Module ("processed notifications consumer progress" , func (node * cmd.NodeConfig ) error {
618628 // Note: progress is stored in the datastore's DB since that is where the jobqueue
619629 // writes execution data to.
630+ var db storage.DB
620631 if executionDataDBMode == execution_data .ExecutionDataDBModeBadger {
621- processedNotifications = bstorage . NewConsumerProgress (builder .ExecutionDatastoreManager .DB ().(* badger.DB ), module . ConsumeProgressExecutionDataRequesterNotification )
632+ db = badgerimpl . ToDB (builder .ExecutionDatastoreManager .DB ().(* badger.DB ))
622633 } else {
623- processedNotifications = pstorage . NewConsumerProgress (builder .ExecutionDatastoreManager .DB ().(* pebble.DB ), module . ConsumeProgressExecutionDataRequesterNotification )
634+ db = pebbleimpl . ToDB (builder .ExecutionDatastoreManager .DB ().(* pebble.DB ))
624635 }
636+ processedNotifications = store .NewConsumerProgress (db , module .ConsumeProgressExecutionDataRequesterNotification )
625637 return nil
626638 }).
627639 Module ("blobservice peer manager dependencies" , func (node * cmd.NodeConfig ) error {
@@ -848,15 +860,15 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
848860 }
849861
850862 if builder .executionDataIndexingEnabled {
851- var indexedBlockHeight storage.ConsumerProgress
863+ var indexedBlockHeight storage.ConsumerProgressInitializer
852864
853865 builder .
854866 AdminCommand ("execute-script" , func (config * cmd.NodeConfig ) commands.AdminCommand {
855867 return stateSyncCommands .NewExecuteScriptCommand (builder .ScriptExecutor )
856868 }).
857869 Module ("indexed block height consumer progress" , func (node * cmd.NodeConfig ) error {
858870 // Note: progress is stored in the MAIN db since that is where indexed execution data is stored.
859- indexedBlockHeight = bstorage .NewConsumerProgress (builder .DB , module .ConsumeProgressExecutionDataIndexerBlockHeight )
871+ indexedBlockHeight = store .NewConsumerProgress (badgerimpl . ToDB ( builder .DB ) , module .ConsumeProgressExecutionDataIndexerBlockHeight )
860872 return nil
861873 }).
862874 Module ("transaction results storage" , func (node * cmd.NodeConfig ) error {
@@ -1633,8 +1645,8 @@ func (builder *FlowAccessNodeBuilder) enqueueRelayNetwork() {
16331645}
16341646
16351647func (builder * FlowAccessNodeBuilder ) Build () (cmd.Node , error ) {
1636- var processedFinalizedBlockHeight storage.ConsumerProgress
1637- var processedTxErrorMessagesBlockHeight storage.ConsumerProgress
1648+ var processedFinalizedBlockHeight storage.ConsumerProgressInitializer
1649+ var processedTxErrorMessagesBlockHeight storage.ConsumerProgressInitializer
16381650
16391651 if builder .executionDataSyncEnabled {
16401652 builder .BuildExecutionSyncComponents ()
@@ -1838,17 +1850,18 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
18381850 return nil
18391851 }).
18401852 Module ("processed finalized block height consumer progress" , func (node * cmd.NodeConfig ) error {
1841- processedFinalizedBlockHeight = bstorage .NewConsumerProgress (builder .DB , module .ConsumeProgressIngestionEngineBlockHeight )
1853+ processedFinalizedBlockHeight = store .NewConsumerProgress (badgerimpl . ToDB ( builder .DB ) , module .ConsumeProgressIngestionEngineBlockHeight )
18421854 return nil
18431855 }).
18441856 Module ("processed last full block height monotonic consumer progress" , func (node * cmd.NodeConfig ) error {
18451857 rootBlockHeight := node .State .Params ().FinalizedRoot ().Height
18461858
1847- var err error
1848- lastFullBlockHeight , err = counters .NewPersistentStrictMonotonicCounter (
1849- bstorage .NewConsumerProgress (builder .DB , module .ConsumeProgressLastFullBlockHeight ),
1850- rootBlockHeight ,
1851- )
1859+ progress , err := store .NewConsumerProgress (badgerimpl .ToDB (builder .DB ), module .ConsumeProgressLastFullBlockHeight ).Initialize (rootBlockHeight )
1860+ if err != nil {
1861+ return err
1862+ }
1863+
1864+ lastFullBlockHeight , err = counters .NewPersistentStrictMonotonicCounter (progress )
18521865 if err != nil {
18531866 return fmt .Errorf ("failed to initialize monotonic consumer progress: %w" , err )
18541867 }
@@ -2149,8 +2162,8 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
21492162
21502163 if builder .storeTxResultErrorMessages {
21512164 builder .Module ("processed error messages block height consumer progress" , func (node * cmd.NodeConfig ) error {
2152- processedTxErrorMessagesBlockHeight = bstorage .NewConsumerProgress (
2153- builder .DB ,
2165+ processedTxErrorMessagesBlockHeight = store .NewConsumerProgress (
2166+ badgerimpl . ToDB ( builder .DB ) ,
21542167 module .ConsumeProgressEngineTxErrorMessagesBlockHeight ,
21552168 )
21562169 return nil
0 commit comments