Skip to content

Commit 4f5dc49

Browse files
subsciption: Added context cancellation and sync to subscription set
Fixes: #474 Signed-off-by: Volodymyr Pankin <[email protected]>
1 parent 9145741 commit 4f5dc49

File tree

3 files changed

+50
-11
lines changed

3 files changed

+50
-11
lines changed

dbus/set.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,34 +14,49 @@
1414

1515
package dbus
1616

17+
import (
18+
"sync"
19+
)
20+
1721
type set struct {
1822
data map[string]bool
23+
mu sync.Mutex
1924
}
2025

2126
func (s *set) Add(value string) {
27+
s.mu.Lock()
28+
defer s.mu.Unlock()
2229
s.data[value] = true
2330
}
2431

2532
func (s *set) Remove(value string) {
33+
s.mu.Lock()
34+
defer s.mu.Unlock()
2635
delete(s.data, value)
2736
}
2837

2938
func (s *set) Contains(value string) (exists bool) {
39+
s.mu.Lock()
40+
defer s.mu.Unlock()
3041
_, exists = s.data[value]
3142
return
3243
}
3344

3445
func (s *set) Length() int {
46+
s.mu.Lock()
47+
defer s.mu.Unlock()
3548
return len(s.data)
3649
}
3750

3851
func (s *set) Values() (values []string) {
52+
s.mu.Lock()
53+
defer s.mu.Unlock()
3954
for val := range s.data {
4055
values = append(values, val)
4156
}
4257
return
4358
}
4459

4560
func newSet() *set {
46-
return &set{make(map[string]bool)}
61+
return &set{data: make(map[string]bool)}
4762
}

dbus/subscription.go

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package dbus
1616

1717
import (
18+
"context"
1819
"errors"
1920
"log"
2021
"time"
@@ -94,16 +95,26 @@ func (c *Conn) dispatch() {
9495
}()
9596
}
9697

97-
// SubscribeUnits returns two unbuffered channels which will receive all changed units every
98-
// interval. Deleted units are sent as nil.
98+
// Deprecated: use SubscribeUnitsContext instead.
9999
func (c *Conn) SubscribeUnits(interval time.Duration) (<-chan map[string]*UnitStatus, <-chan error) {
100-
return c.SubscribeUnitsCustom(interval, 0, func(u1, u2 *UnitStatus) bool { return *u1 != *u2 }, nil)
100+
return c.SubscribeUnitsContext(context.Background(), interval)
101+
}
102+
103+
// SubscribeUnitsContext returns two unbuffered channels which will receive all changed units every
104+
// interval. Deleted units are sent as nil.
105+
func (c *Conn) SubscribeUnitsContext(ctx context.Context, interval time.Duration) (<-chan map[string]*UnitStatus, <-chan error) {
106+
return c.SubscribeUnitsCustomContext(ctx, interval, 0, func(u1, u2 *UnitStatus) bool { return *u1 != *u2 }, nil)
101107
}
102108

103-
// SubscribeUnitsCustom is like SubscribeUnits but lets you specify the buffer
109+
// Deprecated: use SubscribeUnitsCustomContext instead.
110+
func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChanged func(*UnitStatus, *UnitStatus) bool, filterUnit func(string) bool) (<-chan map[string]*UnitStatus, <-chan error) {
111+
return c.SubscribeUnitsCustomContext(context.Background(), interval, buffer, isChanged, filterUnit)
112+
}
113+
114+
// SubscribeUnitsCustomContext is like SubscribeUnits but lets you specify the buffer
104115
// size of the channels, the comparison function for detecting changes and a filter
105116
// function for cutting down on the noise that your channel receives.
106-
func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChanged func(*UnitStatus, *UnitStatus) bool, filterUnit func(string) bool) (<-chan map[string]*UnitStatus, <-chan error) {
117+
func (c *Conn) SubscribeUnitsCustomContext(ctx context.Context, interval time.Duration, buffer int, isChanged func(*UnitStatus, *UnitStatus) bool, filterUnit func(string) bool) (<-chan map[string]*UnitStatus, <-chan error) {
107118
old := make(map[string]*UnitStatus)
108119
statusChan := make(chan map[string]*UnitStatus, buffer)
109120
errChan := make(chan error, buffer)
@@ -112,7 +123,7 @@ func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChange
112123
for {
113124
timerChan := time.After(interval)
114125

115-
units, err := c.ListUnits()
126+
units, err := c.ListUnitsContext(ctx)
116127
if err == nil {
117128
cur := make(map[string]*UnitStatus)
118129
for i := range units {
@@ -145,7 +156,14 @@ func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChange
145156
errChan <- err
146157
}
147158

148-
<-timerChan
159+
select {
160+
case <-timerChan:
161+
close(statusChan)
162+
close(errChan)
163+
continue
164+
case <-ctx.Done():
165+
return
166+
}
149167
}
150168
}()
151169

dbus/subscription_set.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package dbus
1616

1717
import (
18+
"context"
1819
"time"
1920
)
2021

@@ -29,16 +30,21 @@ func (s *SubscriptionSet) filter(unit string) bool {
2930
return !s.Contains(unit)
3031
}
3132

32-
// Subscribe starts listening for dbus events for all of the units in the set.
33+
// SubscribeContext starts listening for dbus events for all of the units in the set.
3334
// Returns channels identical to conn.SubscribeUnits.
34-
func (s *SubscriptionSet) Subscribe() (<-chan map[string]*UnitStatus, <-chan error) {
35+
func (s *SubscriptionSet) SubscribeContext(ctx context.Context) (<-chan map[string]*UnitStatus, <-chan error) {
3536
// TODO: Make fully evented by using systemd 209 with properties changed values
36-
return s.conn.SubscribeUnitsCustom(time.Second, 0,
37+
return s.conn.SubscribeUnitsCustomContext(ctx, time.Second, 0,
3738
mismatchUnitStatus,
3839
func(unit string) bool { return s.filter(unit) },
3940
)
4041
}
4142

43+
// Deprecated: use SubscribeContext instead.
44+
func (s *SubscriptionSet) Subscribe() (<-chan map[string]*UnitStatus, <-chan error) {
45+
return s.SubscribeContext(context.Background())
46+
}
47+
4248
// NewSubscriptionSet returns a new subscription set.
4349
func (c *Conn) NewSubscriptionSet() *SubscriptionSet {
4450
return &SubscriptionSet{newSet(), c}

0 commit comments

Comments
 (0)