Skip to content

Commit 99875f9

Browse files
authored
Merge pull request #6934 from onflow/tim/6852-consensus-sealing-engine-componentmanager-refactor
Refactor Consensus Sealing Engine+Core: `engine.Unit` -> `ComponentManager`
2 parents 73ebfbc + daff894 commit 99875f9

File tree

6 files changed

+152
-97
lines changed

6 files changed

+152
-97
lines changed

cmd/consensus/main.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,9 @@ func main() {
463463
seals,
464464
getSealingConfigs,
465465
)
466+
if err != nil {
467+
return nil, fmt.Errorf("could not initialize sealing engine: %w", err)
468+
}
466469

467470
// subscribe for finalization events from hotstuff
468471
followerDistributor.AddOnBlockFinalizedConsumer(e.OnFinalizedBlock)

engine/consensus/sealing/core.go

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/rs/zerolog"
1212
"go.opentelemetry.io/otel/attribute"
1313
otelTrace "go.opentelemetry.io/otel/trace"
14+
"go.uber.org/atomic"
1415

1516
"github.com/onflow/flow-go/engine"
1617
"github.com/onflow/flow-go/engine/consensus"
@@ -34,7 +35,6 @@ import (
3435
// - pre-validating approvals (if they are outdated or non-verifiable)
3536
// - pruning already processed collectorTree
3637
type Core struct {
37-
unit *engine.Unit
3838
workerPool *workerpool.WorkerPool // worker pool used by collectors
3939
log zerolog.Logger // used to log relevant actions with context
4040
collectorTree *approvals.AssignmentCollectorTree // levelled forest for assignment collectors
@@ -50,6 +50,7 @@ type Core struct {
5050
sealingTracker consensus.SealingTracker // logic-aware component for tracking sealing progress.
5151
tracer module.Tracer // used to trace execution
5252
sealingConfigsGetter module.SealingConfigsGetter // used to access configs for sealing conditions
53+
reporter *gatedSealingObservationReporter // used to avoid excess resource usage by sealing observation completions
5354
}
5455

5556
func NewCore(
@@ -58,7 +59,6 @@ func NewCore(
5859
tracer module.Tracer,
5960
conMetrics module.ConsensusMetrics,
6061
sealingTracker consensus.SealingTracker,
61-
unit *engine.Unit,
6262
headers storage.Headers,
6363
state protocol.State,
6464
sealsDB storage.Seals,
@@ -79,7 +79,6 @@ func NewCore(
7979
tracer: tracer,
8080
metrics: conMetrics,
8181
sealingTracker: sealingTracker,
82-
unit: unit,
8382
approvalsCache: approvals.NewApprovalsLRUCache(1000),
8483
counterLastSealedHeight: counters.NewMonotonicCounter(lastSealed.Height),
8584
counterLastFinalizedHeight: counters.NewMonotonicCounter(lastSealed.Height),
@@ -89,6 +88,7 @@ func NewCore(
8988
sealsMempool: sealsMempool,
9089
requestTracker: approvals.NewRequestTracker(headers, 10, 30),
9190
sealingConfigsGetter: sealingConfigsGetter,
91+
reporter: newGatedSealingObservationReporter(),
9292
}
9393

9494
factoryMethod := func(result *flow.ExecutionResult) (approvals.AssignmentCollector, error) {
@@ -561,7 +561,9 @@ func (c *Core) ProcessFinalizedBlock(finalizedBlockID flow.Identifier) error {
561561
// observes the latest state of `sealingObservation`.
562562
// * The `sealingObservation` lives in the scope of this function. Hence, when this goroutine exits
563563
// this function, `sealingObservation` lives solely in the scope of the newly-created goroutine.
564-
c.unit.Launch(sealingObservation.Complete)
564+
// We do this call asynchronously because we are in the hot path, and it is not required to progress,
565+
// and the call may involve database transactions that would unnecessarily delay sealing.
566+
c.reporter.reportAsync(sealingObservation)
565567

566568
return nil
567569
}
@@ -654,7 +656,7 @@ func (c *Core) getOutdatedBlockIDsFromRootSealingSegment(rootHeader *flow.Header
654656
}
655657

656658
knownBlockIDs := make(map[flow.Identifier]struct{}) // track block IDs in the sealing segment
657-
var outdatedBlockIDs flow.IdentifierList
659+
outdatedBlockIDs := make(flow.IdentifierList, 0)
658660
for _, block := range rootSealingSegment.Blocks {
659661
knownBlockIDs[block.ID()] = struct{}{}
660662
for _, result := range block.Payload.Results {
@@ -666,3 +668,25 @@ func (c *Core) getOutdatedBlockIDsFromRootSealingSegment(rootHeader *flow.Header
666668
}
667669
return outdatedBlockIDs.Lookup(), nil
668670
}
671+
672+
// gatedSealingObservationReporter is a utility for gating asynchronous completion of sealing observations.
673+
type gatedSealingObservationReporter struct {
674+
reporting *atomic.Bool // true when a sealing observation is actively being asynchronously completed
675+
}
676+
677+
func newGatedSealingObservationReporter() *gatedSealingObservationReporter {
678+
return &gatedSealingObservationReporter{
679+
reporting: atomic.NewBool(false),
680+
}
681+
}
682+
683+
// reportAsync only allows one in-flight observation completion at a time.
684+
// Any extra observations are dropped.
685+
func (reporter *gatedSealingObservationReporter) reportAsync(observation consensus.SealingObservation) {
686+
if reporter.reporting.CompareAndSwap(false, true) {
687+
go func() {
688+
observation.Complete()
689+
reporter.reporting.Store(false)
690+
}()
691+
}
692+
}

engine/consensus/sealing/core_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func (s *ApprovalProcessingCoreTestSuite) SetupTest() {
7070

7171
setter := unittest.NewSealingConfigs(flow.DefaultChunkAssignmentAlpha)
7272
var err error
73-
s.core, err = NewCore(unittest.Logger(), s.WorkerPool, tracer, metrics, &tracker.NoopSealingTracker{}, engine.NewUnit(), s.Headers, s.State, s.sealsDB, s.Assigner, s.SigHasher, s.SealsPL, s.Conduit, setter)
73+
s.core, err = NewCore(unittest.Logger(), s.WorkerPool, tracer, metrics, &tracker.NoopSealingTracker{}, s.Headers, s.State, s.sealsDB, s.Assigner, s.SigHasher, s.SealsPL, s.Conduit, setter)
7474
require.NoError(s.T(), err)
7575
s.setter = setter
7676
}
@@ -327,7 +327,7 @@ func (s *ApprovalProcessingCoreTestSuite) TestOnBlockFinalized_EmergencySealing(
327327
true, // enable emergency sealing
328328
)
329329
require.NoError(s.T(), err)
330-
s.core, err = NewCore(unittest.Logger(), s.WorkerPool, tracer, metrics, &tracker.NoopSealingTracker{}, engine.NewUnit(), s.Headers, s.State, s.sealsDB, s.Assigner, s.SigHasher, s.SealsPL, s.Conduit, setter)
330+
s.core, err = NewCore(unittest.Logger(), s.WorkerPool, tracer, metrics, &tracker.NoopSealingTracker{}, s.Headers, s.State, s.sealsDB, s.Assigner, s.SigHasher, s.SealsPL, s.Conduit, setter)
331331
require.NoError(s.T(), err)
332332
s.setter = setter
333333

@@ -748,7 +748,7 @@ func (s *ApprovalProcessingCoreTestSuite) TestRepopulateAssignmentCollectorTree(
748748
finalSnapShot.On("Descendants").Return(blockChildren, nil)
749749
s.State.On("Final").Return(finalSnapShot)
750750

751-
core, err := NewCore(unittest.Logger(), s.WorkerPool, tracer, metrics, &tracker.NoopSealingTracker{}, engine.NewUnit(),
751+
core, err := NewCore(unittest.Logger(), s.WorkerPool, tracer, metrics, &tracker.NoopSealingTracker{},
752752
s.Headers, s.State, s.sealsDB, assigner, s.SigHasher, s.SealsPL, s.Conduit, s.setter)
753753
require.NoError(s.T(), err)
754754

@@ -827,7 +827,7 @@ func (s *ApprovalProcessingCoreTestSuite) TestRepopulateAssignmentCollectorTree_
827827
}, nil)
828828
s.State.On("Final").Return(finalSnapShot)
829829

830-
core, err := NewCore(unittest.Logger(), s.WorkerPool, tracer, metrics, &tracker.NoopSealingTracker{}, engine.NewUnit(),
830+
core, err := NewCore(unittest.Logger(), s.WorkerPool, tracer, metrics, &tracker.NoopSealingTracker{},
831831
s.Headers, s.State, s.sealsDB, assigner, s.SigHasher, s.SealsPL, s.Conduit, s.setter)
832832
require.NoError(s.T(), err)
833833

0 commit comments

Comments
 (0)