@@ -463,7 +463,9 @@ async fn handle_broadcasts(
463463 let mut to_local_broadcast = VecDeque :: new ( ) ;
464464 let mut log_count = 0 ;
465465
466+ let cfg_max_transmissions = agent. config ( ) . perf . max_broadcast_transmissions ;
466467 let mut limited_log_count = 0 ;
468+ let mut prev_set_size = 0 ;
467469
468470 let bytes_per_sec: BroadcastRateLimiter = RateLimiter :: direct ( Quota :: per_second ( unsafe {
469471 NonZeroU32 :: new_unchecked ( 10 * 1024 * 1024 )
@@ -591,25 +593,60 @@ async fn handle_broadcasts(
591593 gauge ! ( "corro.broadcast.buffer.capacity" ) . set ( bcast_buf. capacity ( ) as f64 ) ;
592594 gauge ! ( "corro.broadcast.serialization.buffer.capacity" )
593595 . set ( ser_buf. capacity ( ) as f64 ) ;
596+ gauge ! ( "corro.broadcast.prob_set.size" ) . set ( prev_set_size as f64 ) ;
594597 }
595598 }
596599
597600 let prev_rate_limited = rate_limited;
598601
602+ let ( members_count, ring0_count) = {
603+ let members = agent. members ( ) . read ( ) ;
604+ let members_count = members. states . len ( ) ;
605+ let ring0_count = members. ring0 ( agent. cluster_id ( ) ) . count ( ) ;
606+ ( members_count, ring0_count)
607+ } ;
608+
609+ let ( choose_count, dynamic_count, max_transmissions) = {
610+ let config = config. read ( ) ;
611+ let gossip_max_txns = config. max_transmissions . get ( ) ;
612+ let max_transmissions = cmp:: min (
613+ gossip_max_txns,
614+ cfg_max_transmissions. unwrap_or ( gossip_max_txns) ,
615+ ) ;
616+ let dynamic_count = ( members_count - ring0_count) / ( max_transmissions as usize * 10 ) ;
617+ let count = cmp:: max ( config. num_indirect_probes . get ( ) , dynamic_count) ;
618+
619+ if prev_rate_limited {
620+ // we've been rate limited on the last loop, try sending to less nodes...
621+ (
622+ cmp:: min ( count, dynamic_count / 2 ) ,
623+ dynamic_count / 2 ,
624+ max_transmissions / 2 ,
625+ )
626+ } else {
627+ ( count, dynamic_count, max_transmissions)
628+ }
629+ } ;
630+
599631 // start with local broadcasts, they're higher priority
600632 let mut ring0: HashSet < SocketAddr > = HashSet :: new ( ) ;
601633
602- let members_len = agent. members ( ) . read ( ) . states . len ( ) ;
603- let count = cmp:: max ( members_len / 2 , 2 ) ;
604- let mut set = ProbSet :: new ( count, 4 ) ;
634+ let prob_set = {
635+ // setting the size of the set to the number of times it'll broadcasted + number of members
636+ // it will be broadcasted to with some padding
637+ let dynamic_size = ( dynamic_count + 1 ) * ( max_transmissions + 1 ) as usize ;
638+ // clamp size to 500
639+ let size = cmp:: min ( 500 , dynamic_size) ;
605640
606- {
641+ prev_set_size = size;
642+ let mut set = ProbSet :: new ( size, 4 ) ;
607643 let members = agent. members ( ) . read ( ) ;
608644 let members_ring0 = members. ring0 ( agent. cluster_id ( ) ) ;
609645 for ( actor_id, addr) in members_ring0 {
610646 set. insert ( actor_id. to_u128 ( ) ) ;
611647 ring0. insert ( addr) ;
612648 }
649+ set
613650 } ;
614651
615652 debug ! ( "sending local broadcasts to ring0 nodes: {:?}" , ring0) ;
@@ -619,7 +656,7 @@ async fn handle_broadcasts(
619656
620657 let bcast_change = BroadcastV2 {
621658 change : BroadcastV1 :: Change ( change. clone ( ) ) ,
622- set : set . clone ( ) ,
659+ set : prob_set . clone ( ) ,
623660 num_broadcasts : 1 ,
624661 } ;
625662
@@ -681,32 +718,12 @@ async fn handle_broadcasts(
681718 to_local_broadcast. push_front ( change) ;
682719 break ;
683720 } else {
721+ // TODO: test whether we still want to re-queue
684722 to_broadcast_v2. push_front ( bcast_change) ;
685723 }
686724 counter ! ( "corro.broadcast.spawn" , "type" => "local" ) . increment ( spawn_count) ;
687725 }
688726
689- let ( members_count, ring0_count) = {
690- let members = agent. members ( ) . read ( ) ;
691- let members_count = members. states . len ( ) ;
692- let ring0_count = members. ring0 ( agent. cluster_id ( ) ) . count ( ) ;
693- ( members_count, ring0_count)
694- } ;
695-
696- let ( choose_count, max_transmissions) = {
697- let config = config. read ( ) ;
698- let max_transmissions = config. max_transmissions . get ( ) ;
699- let dynamic_count = ( members_count - ring0_count) / ( max_transmissions as usize * 10 ) ;
700- let count = cmp:: max ( config. num_indirect_probes . get ( ) , dynamic_count) ;
701-
702- if prev_rate_limited {
703- // we've been rate limited on the last loop, try sending to less nodes...
704- ( cmp:: min ( count, dynamic_count / 2 ) , max_transmissions / 2 )
705- } else {
706- ( count, max_transmissions)
707- }
708- } ;
709-
710727 if !rate_limited {
711728 debug ! (
712729 "choosing {} broadcasts, ring0 count: {}, MAX_INFLIGHT_BROADCAST: {}" ,
@@ -847,24 +864,24 @@ async fn handle_broadcasts(
847864 for ( member_id, _) in broadcast_to. clone ( ) {
848865 bcast_v2. set . insert ( member_id. to_u128 ( ) ) ;
849866 }
867+
850868 let uni_payload = UniPayload :: V1 {
851869 data : UniPayloadV1 :: BroadcastV2 ( bcast_v2. clone ( ) ) ,
852870 cluster_id : agent. cluster_id ( ) ,
853871 } ;
854872
855- if let Err ( e) = uni_payload. write_to_stream ( ( & mut ser_buf) . writer ( ) ) {
856- error ! ( "could not encode UniPayload::V1 BroadcastV2: {e}" ) ;
857- ser_buf. clear ( ) ;
858- continue ;
859- }
860-
861- if let Err ( e) = bcast_codec. encode ( ser_buf. split ( ) . freeze ( ) , & mut bcast_buf) {
862- error ! ( "could not encode broadcast: {e}" ) ;
863- bcast_buf. clear ( ) ;
864- continue ;
865- }
866-
867- let payload = bcast_buf. split ( ) . freeze ( ) ;
873+ let payload = match encode_framed (
874+ & mut bcast_codec,
875+ & mut ser_buf,
876+ & mut bcast_buf,
877+ & uni_payload,
878+ ) {
879+ Ok ( payload) => payload,
880+ Err ( e) => {
881+ error ! ( "could not encode UniPayload::V1 BroadcastV2: {e}" ) ;
882+ continue ;
883+ }
884+ } ;
868885
869886 let mut spawn_count = 0 ;
870887 for ( _, addr) in broadcast_to {
0 commit comments