Skip to content

Commit 88f38ea

Browse files
Ak-Armyxpunch
andauthored
Subscribe error handling (#2785)
* [fix] etcd config source prefix issue (#2389) * http transport data race issue (#2436) * [fix] #2431 http transport data race issue * [feature] Ability to close connection while receiving. Ability to send messages while receiving. Icreased r channel limit to 100 to more fluently communication. Do not dropp sent request if r channel is full. * [feature] always subscribes to all topics and if there is an error in one of them, the unsuccessful topics will be subscribed to again --------- Co-authored-by: Johnson C <[email protected]>
1 parent 0e45edf commit 88f38ea

File tree

2 files changed

+29
-24
lines changed

2 files changed

+29
-24
lines changed

server/rpc_events.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func (s *rpcServer) Subscribe(sb Subscriber) error {
101101

102102
// subscribeServer will subscribe the server to the topic with its own name.
103103
func (s *rpcServer) subscribeServer(config Options) error {
104-
if s.opts.Router != nil {
104+
if s.opts.Router != nil && s.subscriber == nil {
105105
sub, err := s.opts.Broker.Subscribe(config.Name, s.HandleEvent)
106106
if err != nil {
107107
return err
@@ -115,8 +115,11 @@ func (s *rpcServer) subscribeServer(config Options) error {
115115
}
116116

117117
// reSubscribe itterates over subscribers and re-subscribes then.
118-
func (s *rpcServer) reSubscribe(config Options) error {
118+
func (s *rpcServer) reSubscribe(config Options) {
119119
for sb := range s.subscribers {
120+
if s.subscribers[sb] != nil {
121+
continue
122+
}
120123
var opts []broker.SubscribeOption
121124
if queue := sb.Options().Queue; len(queue) > 0 {
122125
opts = append(opts, broker.Queue(queue))
@@ -133,12 +136,15 @@ func (s *rpcServer) reSubscribe(config Options) error {
133136
config.Logger.Logf(log.InfoLevel, "Subscribing to topic: %s", sb.Topic())
134137
sub, err := config.Broker.Subscribe(sb.Topic(), s.HandleEvent, opts...)
135138
if err != nil {
136-
return err
139+
config.Logger.Logf(log.WarnLevel, "Unable to subscribing to topic: %s, error: %s", sb.Topic(), err)
140+
continue
141+
}
142+
err = s.router.Subscribe(sb)
143+
if err != nil {
144+
config.Logger.Logf(log.WarnLevel, "Unable to subscribing to topic: %s, error: %s", sb.Topic(), err)
145+
sub.Unsubscribe()
146+
continue
137147
}
138-
139148
s.subscribers[sb] = []broker.Subscriber{sub}
140-
s.router.Subscribe(sb)
141149
}
142-
143-
return nil
144150
}

server/rpc_server.go

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -446,17 +446,6 @@ func (s *rpcServer) Register() error {
446446
// Set what we're advertising
447447
s.opts.Advertise = addr
448448

449-
// Router can exchange messages on broker
450-
// Subscribe to the topic with its own name
451-
if err := s.subscribeServer(config); err != nil {
452-
return errors.Wrap(err, "failed to subscribe to service name topic")
453-
}
454-
455-
// Subscribe for all of the subscribers
456-
if err := s.reSubscribe(config); err != nil {
457-
return errors.Wrap(err, "failed to resubscribe")
458-
}
459-
460449
return nil
461450
}
462451

@@ -606,18 +595,28 @@ func (s *rpcServer) newRegFuc(config Options) func(service *registry.Service) er
606595
// Attempt to register. If registration fails, back off and try again.
607596
// TODO: see if we can improve the retry mechanism. Maybe retry lib, maybe config values
608597
for i := 0; i < 3; i++ {
609-
if err := config.Registry.Register(service, rOpts...); err != nil {
610-
regErr = err
611-
598+
if regErr = config.Registry.Register(service, rOpts...); regErr != nil {
612599
time.Sleep(backoff.Do(i + 1))
613-
614600
continue
615601
}
602+
break
603+
}
616604

617-
return nil
605+
if regErr != nil {
606+
return regErr
618607
}
619608

620-
return regErr
609+
s.Lock()
610+
defer s.Unlock()
611+
// Router can exchange messages on broker
612+
// Subscribe to the topic with its own name
613+
if err := s.subscribeServer(config); err != nil {
614+
return errors.Wrap(err, "failed to subscribe to service name topic")
615+
}
616+
// Subscribe for all of the subscribers
617+
s.reSubscribe(config)
618+
619+
return nil
621620
}
622621
}
623622

0 commit comments

Comments
 (0)