Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion dbus/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,49 @@

package dbus

import (
"sync"
)

type set struct {
data map[string]bool
mu sync.Mutex
}

func (s *set) Add(value string) {
s.mu.Lock()
defer s.mu.Unlock()
s.data[value] = true
}

func (s *set) Remove(value string) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.data, value)
}

func (s *set) Contains(value string) (exists bool) {
s.mu.Lock()
defer s.mu.Unlock()
_, exists = s.data[value]
return
}

func (s *set) Length() int {
s.mu.Lock()
defer s.mu.Unlock()
return len(s.data)
}

func (s *set) Values() (values []string) {
s.mu.Lock()
defer s.mu.Unlock()
for val := range s.data {
values = append(values, val)
}
return
}

func newSet() *set {
return &set{make(map[string]bool)}
return &set{data: make(map[string]bool)}
}
32 changes: 25 additions & 7 deletions dbus/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package dbus

import (
"context"
"errors"
"log"
"time"
Expand Down Expand Up @@ -94,16 +95,26 @@ func (c *Conn) dispatch() {
}()
}

// SubscribeUnits returns two unbuffered channels which will receive all changed units every
// interval. Deleted units are sent as nil.
// Deprecated: use SubscribeUnitsContext instead.
func (c *Conn) SubscribeUnits(interval time.Duration) (<-chan map[string]*UnitStatus, <-chan error) {
return c.SubscribeUnitsCustom(interval, 0, func(u1, u2 *UnitStatus) bool { return *u1 != *u2 }, nil)
return c.SubscribeUnitsContext(context.Background(), interval)
}

// SubscribeUnitsContext returns two unbuffered channels which will receive all changed units every
// interval. Deleted units are sent as nil.
func (c *Conn) SubscribeUnitsContext(ctx context.Context, interval time.Duration) (<-chan map[string]*UnitStatus, <-chan error) {
return c.SubscribeUnitsCustomContext(ctx, interval, 0, func(u1, u2 *UnitStatus) bool { return *u1 != *u2 }, nil)
}

// SubscribeUnitsCustom is like SubscribeUnits but lets you specify the buffer
// Deprecated: use SubscribeUnitsCustomContext instead.
func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChanged func(*UnitStatus, *UnitStatus) bool, filterUnit func(string) bool) (<-chan map[string]*UnitStatus, <-chan error) {
return c.SubscribeUnitsCustomContext(context.Background(), interval, buffer, isChanged, filterUnit)
}

// SubscribeUnitsCustomContext is like SubscribeUnits but lets you specify the buffer
// size of the channels, the comparison function for detecting changes and a filter
// function for cutting down on the noise that your channel receives.
func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChanged func(*UnitStatus, *UnitStatus) bool, filterUnit func(string) bool) (<-chan map[string]*UnitStatus, <-chan error) {
func (c *Conn) SubscribeUnitsCustomContext(ctx context.Context, interval time.Duration, buffer int, isChanged func(*UnitStatus, *UnitStatus) bool, filterUnit func(string) bool) (<-chan map[string]*UnitStatus, <-chan error) {
old := make(map[string]*UnitStatus)
statusChan := make(chan map[string]*UnitStatus, buffer)
errChan := make(chan error, buffer)
Expand All @@ -112,7 +123,7 @@ func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChange
for {
timerChan := time.After(interval)

units, err := c.ListUnits()
units, err := c.ListUnitsContext(ctx)
if err == nil {
cur := make(map[string]*UnitStatus)
for i := range units {
Expand Down Expand Up @@ -145,7 +156,14 @@ func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChange
errChan <- err
}

<-timerChan
select {
case <-timerChan:
continue
case <-ctx.Done():
close(statusChan)
close(errChan)
return
}
}
}()

Expand Down
12 changes: 9 additions & 3 deletions dbus/subscription_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package dbus

import (
"context"
"time"
)

Expand All @@ -29,16 +30,21 @@ func (s *SubscriptionSet) filter(unit string) bool {
return !s.Contains(unit)
}

// Subscribe starts listening for dbus events for all of the units in the set.
// SubscribeContext starts listening for dbus events for all of the units in the set.
// Returns channels identical to conn.SubscribeUnits.
func (s *SubscriptionSet) Subscribe() (<-chan map[string]*UnitStatus, <-chan error) {
func (s *SubscriptionSet) SubscribeContext(ctx context.Context) (<-chan map[string]*UnitStatus, <-chan error) {
// TODO: Make fully evented by using systemd 209 with properties changed values
return s.conn.SubscribeUnitsCustom(time.Second, 0,
return s.conn.SubscribeUnitsCustomContext(ctx, time.Second, 0,
mismatchUnitStatus,
func(unit string) bool { return s.filter(unit) },
)
}

// Deprecated: use SubscribeContext instead.
func (s *SubscriptionSet) Subscribe() (<-chan map[string]*UnitStatus, <-chan error) {
return s.SubscribeContext(context.Background())
}

// NewSubscriptionSet returns a new subscription set.
func (c *Conn) NewSubscriptionSet() *SubscriptionSet {
return &SubscriptionSet{newSet(), c}
Expand Down