@@ -75,6 +75,8 @@ type Node struct {
7575 startCtx context.Context
7676
7777 closer io.Closer
78+
79+ roundRepublishCache * lru.Cache
7880}
7981
8082type NewNodeOptions struct {
@@ -116,6 +118,11 @@ func NewNode(ctx context.Context, opts *NewNodeOptions) (*Node, error) {
116118 return nil , fmt .Errorf ("error creating cache: %w" , err )
117119 }
118120
121+ roundRepublishCache , err := lru .New (3 )
122+ if err != nil {
123+ return nil , fmt .Errorf ("error creating cache: %w" , err )
124+ }
125+
119126 nodeName := opts .Name
120127 if nodeName == "" {
121128 nodeName = fmt .Sprintf ("node-%d" , signerIndex )
@@ -128,18 +135,19 @@ func NewNode(ctx context.Context, opts *NewNodeOptions) (*Node, error) {
128135 dagStore := opts .DagStore
129136
130137 n := & Node {
131- name : nodeName ,
132- p2pNode : opts .P2PNode ,
133- signKey : opts .SignKey ,
134- notaryGroup : opts .NotaryGroup ,
135- dagStore : dagStore ,
136- hamtStore : hamtStore ,
137- dataStore : dataStore ,
138- signerIndex : signerIndex ,
139- inflight : cache ,
140- mempool : newMempool (),
141- rootContext : opts .RootActorContext ,
142- logger : logger ,
138+ name : nodeName ,
139+ p2pNode : opts .P2PNode ,
140+ signKey : opts .SignKey ,
141+ notaryGroup : opts .NotaryGroup ,
142+ dagStore : dagStore ,
143+ hamtStore : hamtStore ,
144+ dataStore : dataStore ,
145+ signerIndex : signerIndex ,
146+ inflight : cache ,
147+ mempool : newMempool (),
148+ rootContext : opts .RootActorContext ,
149+ logger : logger ,
150+ roundRepublishCache : roundRepublishCache ,
143151 }
144152
145153 err = n .initRoundHolder ()
@@ -243,7 +251,7 @@ func (n *Node) maybeRepublish(ctx context.Context) {
243251
244252 if found && previousRound != nil && previousRound .published {
245253 n .logger .Debugf ("republishing round: %d" , previousRound .height )
246- n .publishCompletedRound (ctx , previousRound )
254+ n .republishCompletedRound (ctx , previousRound )
247255 }
248256 }
249257}
@@ -451,9 +459,20 @@ func (n *Node) publishCompletedRound(ctx context.Context, round *round) error {
451459
452460 defer func () { round .published = true }()
453461
462+ n .roundRepublishCache .Add (round .height , conf .Data ())
463+
454464 return n .pubsub .Publish (n .notaryGroup .ID , conf .Data ())
455465}
456466
467+ func (n * Node ) republishCompletedRound (ctx context.Context , round * round ) error {
468+ roundConfPayload , ok := n .roundRepublishCache .Peek (round ) // Peek acts as FIFO based on insert above
469+ if ok {
470+ n .logger .Debugf ("republishing round confirmed to: %s" , n .notaryGroup .ID )
471+ return n .pubsub .Publish (n .notaryGroup .ID , roundConfPayload .([]byte ))
472+ }
473+ return n .publishCompletedRound (ctx , round )
474+ }
475+
457476func (n * Node ) storeCompletedRound (round * types.RoundWrapper ) error {
458477 heightBytes := make ([]byte , binary .MaxVarintLen64 )
459478 binary .PutUvarint (heightBytes , round .Height ())
0 commit comments