Skip to content
This repository was archived by the owner on Feb 1, 2023. It is now read-only.

Commit 2b51297

Browse files
petaraschmahmannStebalien
authored
More stats, knobs and tunings (#514)
* add configurability options for TaskWorkerCount and EngineTaskWorkerCount, * add option for maximum outstanding bytes per peer * add prometheus metrics for how long it takes to send messages, the number of pending and active tasks, and the number of pending and active block tasks * add many of the unexported defaults to a defaults subpackage of the internal package * feat: tighter send timeouts 1. Minimum timeout of 10s. 2. We add 2s due to latencies. 3. Minimum bandwidth of 100kbit/s. 4. Maximum message send time of 2min (way more time than necessary). Co-authored-by: Adin Schmahmann <[email protected]> Co-authored-by: Steven Allen <[email protected]>
1 parent 5c2c537 commit 2b51297

File tree

12 files changed

+351
-86
lines changed

12 files changed

+351
-86
lines changed

bitswap.go

Lines changed: 95 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
deciface "github.com/ipfs/go-bitswap/decision"
1616
bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
1717
"github.com/ipfs/go-bitswap/internal/decision"
18+
"github.com/ipfs/go-bitswap/internal/defaults"
1819
bsgetter "github.com/ipfs/go-bitswap/internal/getter"
1920
bsmq "github.com/ipfs/go-bitswap/internal/messagequeue"
2021
"github.com/ipfs/go-bitswap/internal/notifications"
@@ -42,15 +43,6 @@ var sflog = log.Desugar()
4243

4344
var _ exchange.SessionExchange = (*Bitswap)(nil)
4445

45-
const (
46-
// these requests take at _least_ two minutes at the moment.
47-
provideTimeout = time.Minute * 3
48-
defaultProvSearchDelay = time.Second
49-
50-
// Number of concurrent workers in decision engine that process requests to the blockstore
51-
defaulEngineBlockstoreWorkerCount = 128
52-
)
53-
5446
var (
5547
// HasBlockBufferSize is the buffer size of the channel for new blocks
5648
// that need to be provided. They should get pulled over by the
@@ -62,6 +54,8 @@ var (
6254

6355
// the 1<<18+15 is to observe old file chunks that are 1<<18 + 14 in size
6456
metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
57+
58+
timeMetricsBuckets = []float64{1, 10, 30, 60, 90, 120, 600}
6559
)
6660

6761
// Option defines the functional option type that can be used to configure
@@ -100,6 +94,36 @@ func EngineBlockstoreWorkerCount(count int) Option {
10094
}
10195
}
10296

97+
// EngineTaskWorkerCount sets the number of worker threads used inside the engine
98+
func EngineTaskWorkerCount(count int) Option {
99+
if count <= 0 {
100+
panic(fmt.Sprintf("Engine task worker count is %d but must be > 0", count))
101+
}
102+
return func(bs *Bitswap) {
103+
bs.engineTaskWorkerCount = count
104+
}
105+
}
106+
107+
func TaskWorkerCount(count int) Option {
108+
if count <= 0 {
109+
panic(fmt.Sprintf("task worker count is %d but must be > 0", count))
110+
}
111+
return func(bs *Bitswap) {
112+
bs.taskWorkerCount = count
113+
}
114+
}
115+
116+
// MaxOutstandingBytesPerPeer describes approximately how much work we are willing to have outstanding to a peer at any
117+
// given time. Setting it to 0 will disable any limiting.
118+
func MaxOutstandingBytesPerPeer(count int) Option {
119+
if count < 0 {
120+
panic(fmt.Sprintf("max outstanding bytes per peer is %d but must be >= 0", count))
121+
}
122+
return func(bs *Bitswap) {
123+
bs.engineMaxOutstandingBytesPerPeer = count
124+
}
125+
}
126+
103127
// SetSendDontHaves indicates what to do when the engine receives a want-block
104128
// for a block that is not in the blockstore. Either
105129
// - Send a DONT_HAVE message
@@ -147,6 +171,17 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
147171
sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+
148172
" this bitswap").Histogram(metricsBuckets)
149173

174+
sendTimeHistogram := metrics.NewCtx(ctx, "send_times", "Histogram of how long it takes to send messages"+
175+
" in this bitswap").Histogram(timeMetricsBuckets)
176+
177+
pendingEngineGauge := metrics.NewCtx(ctx, "pending_tasks", "Total number of pending tasks").Gauge()
178+
179+
activeEngineGauge := metrics.NewCtx(ctx, "active_tasks", "Total number of active tasks").Gauge()
180+
181+
pendingBlocksGauge := metrics.NewCtx(ctx, "pending_block_tasks", "Total number of pending blockstore tasks").Gauge()
182+
183+
activeBlocksGauge := metrics.NewCtx(ctx, "active_block_tasks", "Total number of active blockstore tasks").Gauge()
184+
150185
px := process.WithTeardown(func() error {
151186
return nil
152187
})
@@ -192,26 +227,30 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
192227
sm = bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self())
193228

194229
bs = &Bitswap{
195-
blockstore: bstore,
196-
network: network,
197-
process: px,
198-
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
199-
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
200-
pm: pm,
201-
pqm: pqm,
202-
sm: sm,
203-
sim: sim,
204-
notif: notif,
205-
counters: new(counters),
206-
dupMetric: dupHist,
207-
allMetric: allHist,
208-
sentHistogram: sentHistogram,
209-
provideEnabled: true,
210-
provSearchDelay: defaultProvSearchDelay,
211-
rebroadcastDelay: delay.Fixed(time.Minute),
212-
engineBstoreWorkerCount: defaulEngineBlockstoreWorkerCount,
213-
engineSetSendDontHaves: true,
214-
simulateDontHavesOnTimeout: true,
230+
blockstore: bstore,
231+
network: network,
232+
process: px,
233+
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
234+
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
235+
pm: pm,
236+
pqm: pqm,
237+
sm: sm,
238+
sim: sim,
239+
notif: notif,
240+
counters: new(counters),
241+
dupMetric: dupHist,
242+
allMetric: allHist,
243+
sentHistogram: sentHistogram,
244+
sendTimeHistogram: sendTimeHistogram,
245+
provideEnabled: true,
246+
provSearchDelay: defaults.ProvSearchDelay,
247+
rebroadcastDelay: delay.Fixed(time.Minute),
248+
engineBstoreWorkerCount: defaults.BitswapEngineBlockstoreWorkerCount,
249+
engineTaskWorkerCount: defaults.BitswapEngineTaskWorkerCount,
250+
taskWorkerCount: defaults.BitswapTaskWorkerCount,
251+
engineMaxOutstandingBytesPerPeer: defaults.BitswapMaxOutstandingBytesPerPeer,
252+
engineSetSendDontHaves: true,
253+
simulateDontHavesOnTimeout: true,
215254
}
216255

217256
// apply functional options before starting and running bitswap
@@ -220,7 +259,20 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
220259
}
221260

222261
// Set up decision engine
223-
bs.engine = decision.NewEngine(bstore, bs.engineBstoreWorkerCount, network.ConnectionManager(), network.Self(), bs.engineScoreLedger)
262+
bs.engine = decision.NewEngine(
263+
ctx,
264+
bstore,
265+
bs.engineBstoreWorkerCount,
266+
bs.engineTaskWorkerCount,
267+
bs.engineMaxOutstandingBytesPerPeer,
268+
network.ConnectionManager(),
269+
network.Self(),
270+
bs.engineScoreLedger,
271+
pendingEngineGauge,
272+
activeEngineGauge,
273+
pendingBlocksGauge,
274+
activeBlocksGauge,
275+
)
224276
bs.engine.SetSendDontHaves(bs.engineSetSendDontHaves)
225277

226278
bs.pqm.Startup()
@@ -277,9 +329,10 @@ type Bitswap struct {
277329
counters *counters
278330

279331
// Metrics interface metrics
280-
dupMetric metrics.Histogram
281-
allMetric metrics.Histogram
282-
sentHistogram metrics.Histogram
332+
dupMetric metrics.Histogram
333+
allMetric metrics.Histogram
334+
sentHistogram metrics.Histogram
335+
sendTimeHistogram metrics.Histogram
283336

284337
// External statistics interface
285338
wiretap WireTap
@@ -303,6 +356,15 @@ type Bitswap struct {
303356
// how many worker threads to start for decision engine blockstore worker
304357
engineBstoreWorkerCount int
305358

359+
// how many worker threads to start for decision engine task worker
360+
engineTaskWorkerCount int
361+
362+
// the total number of simultaneous threads sending outgoing messages
363+
taskWorkerCount int
364+
365+
// the total number of bytes that a peer should have outstanding; it is used by the decision engine
366+
engineMaxOutstandingBytesPerPeer int
367+
306368
// the score ledger used by the decision engine
307369
engineScoreLedger deciface.ScoreLedger
308370

bitswap_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,11 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
285285
t.SkipNow()
286286
}
287287
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
288-
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
288+
ig := testinstance.NewTestInstanceGenerator(net, nil, []bitswap.Option{
289+
bitswap.TaskWorkerCount(5),
290+
bitswap.EngineTaskWorkerCount(5),
291+
bitswap.MaxOutstandingBytesPerPeer(1 << 20),
292+
})
289293
defer ig.Close()
290294
bg := blocksutil.NewBlockGenerator()
291295

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ require (
1717
github.com/ipfs/go-ipfs-util v0.0.2
1818
github.com/ipfs/go-log v1.0.5
1919
github.com/ipfs/go-metrics-interface v0.0.1
20-
github.com/ipfs/go-peertaskqueue v0.2.0
20+
github.com/ipfs/go-peertaskqueue v0.4.0
2121
github.com/jbenet/goprocess v0.1.4
2222
github.com/libp2p/go-buffer-pool v0.0.2
2323
github.com/libp2p/go-libp2p v0.14.3
@@ -28,6 +28,7 @@ require (
2828
github.com/libp2p/go-msgio v0.0.6
2929
github.com/multiformats/go-multiaddr v0.3.3
3030
github.com/multiformats/go-multistream v0.2.2
31+
github.com/stretchr/testify v1.7.0
3132
go.uber.org/zap v1.16.0
3233
)
3334

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -306,8 +306,8 @@ github.com/ipfs/go-log/v2 v2.1.3 h1:1iS3IU7aXRlbgUpN8yTTpJ53NXYjAe37vcI5+5nYrzk=
306306
github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g=
307307
github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg=
308308
github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY=
309-
github.com/ipfs/go-peertaskqueue v0.2.0 h1:2cSr7exUGKYyDeUyQ7P/nHPs9P7Ht/B+ROrpN1EJOjc=
310-
github.com/ipfs/go-peertaskqueue v0.2.0/go.mod h1:5/eNrBEbtSKWCG+kQK8K8fGNixoYUnr+P7jivavs9lY=
309+
github.com/ipfs/go-peertaskqueue v0.4.0 h1:x1hFgA4JOUJ3ntPfqLRu6v4k6kKL0p07r3RSg9JNyHI=
310+
github.com/ipfs/go-peertaskqueue v0.4.0/go.mod h1:KL9F49hXJMoXCad8e5anivjN+kWdr+CyGcyh4K6doLc=
311311
github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
312312
github.com/jackpal/go-nat-pmp v1.0.1/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
313313
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=

internal/decision/blockstoremanager.go

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,25 +8,36 @@ import (
88
blocks "github.com/ipfs/go-block-format"
99
cid "github.com/ipfs/go-cid"
1010
bstore "github.com/ipfs/go-ipfs-blockstore"
11+
"github.com/ipfs/go-metrics-interface"
1112
process "github.com/jbenet/goprocess"
1213
)
1314

1415
// blockstoreManager maintains a pool of workers that make requests to the blockstore.
1516
type blockstoreManager struct {
16-
bs bstore.Blockstore
17-
workerCount int
18-
jobs chan func()
19-
px process.Process
17+
bs bstore.Blockstore
18+
workerCount int
19+
jobs chan func()
20+
px process.Process
21+
pendingGauge metrics.Gauge
22+
activeGauge metrics.Gauge
2023
}
2124

2225
// newBlockstoreManager creates a new blockstoreManager with the given context
2326
// and number of workers
24-
func newBlockstoreManager(bs bstore.Blockstore, workerCount int) *blockstoreManager {
27+
func newBlockstoreManager(
28+
ctx context.Context,
29+
bs bstore.Blockstore,
30+
workerCount int,
31+
pendingGauge metrics.Gauge,
32+
activeGauge metrics.Gauge,
33+
) *blockstoreManager {
2534
return &blockstoreManager{
26-
bs: bs,
27-
workerCount: workerCount,
28-
jobs: make(chan func()),
29-
px: process.WithTeardown(func() error { return nil }),
35+
bs: bs,
36+
workerCount: workerCount,
37+
jobs: make(chan func()),
38+
px: process.WithTeardown(func() error { return nil }),
39+
pendingGauge: pendingGauge,
40+
activeGauge: activeGauge,
3041
}
3142
}
3243

@@ -46,7 +57,10 @@ func (bsm *blockstoreManager) worker(px process.Process) {
4657
case <-px.Closing():
4758
return
4859
case job := <-bsm.jobs:
60+
bsm.pendingGauge.Dec()
61+
bsm.activeGauge.Inc()
4962
job()
63+
bsm.activeGauge.Dec()
5064
}
5165
}
5266
}
@@ -58,6 +72,7 @@ func (bsm *blockstoreManager) addJob(ctx context.Context, job func()) error {
5872
case <-bsm.px.Closing():
5973
return fmt.Errorf("shutting down")
6074
case bsm.jobs <- job:
75+
bsm.pendingGauge.Inc()
6176
return nil
6277
}
6378
}

internal/decision/blockstoremanager_test.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/ipfs/go-bitswap/internal/testutil"
1111
cid "github.com/ipfs/go-cid"
12+
"github.com/ipfs/go-metrics-interface"
1213

1314
blocks "github.com/ipfs/go-block-format"
1415
ds "github.com/ipfs/go-datastore"
@@ -19,13 +20,23 @@ import (
1920
process "github.com/jbenet/goprocess"
2021
)
2122

23+
func newBlockstoreManagerForTesting(
24+
ctx context.Context,
25+
bs blockstore.Blockstore,
26+
workerCount int,
27+
) *blockstoreManager {
28+
testPendingBlocksGauge := metrics.NewCtx(ctx, "pending_block_tasks", "Total number of pending blockstore tasks").Gauge()
29+
testActiveBlocksGauge := metrics.NewCtx(ctx, "active_block_tasks", "Total number of active blockstore tasks").Gauge()
30+
return newBlockstoreManager(ctx, bs, workerCount, testPendingBlocksGauge, testActiveBlocksGauge)
31+
}
32+
2233
func TestBlockstoreManagerNotFoundKey(t *testing.T) {
2334
ctx := context.Background()
2435
bsdelay := delay.Fixed(3 * time.Millisecond)
2536
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
2637
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
2738

28-
bsm := newBlockstoreManager(bstore, 5)
39+
bsm := newBlockstoreManagerForTesting(ctx, bstore, 5)
2940
bsm.start(process.WithTeardown(func() error { return nil }))
3041

3142
cids := testutil.GenerateCids(4)
@@ -64,7 +75,7 @@ func TestBlockstoreManager(t *testing.T) {
6475
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
6576
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
6677

67-
bsm := newBlockstoreManager(bstore, 5)
78+
bsm := newBlockstoreManagerForTesting(ctx, bstore, 5)
6879
bsm.start(process.WithTeardown(func() error { return nil }))
6980

7081
exp := make(map[cid.Cid]blocks.Block)
@@ -148,7 +159,7 @@ func TestBlockstoreManagerConcurrency(t *testing.T) {
148159
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
149160

150161
workerCount := 5
151-
bsm := newBlockstoreManager(bstore, workerCount)
162+
bsm := newBlockstoreManagerForTesting(ctx, bstore, workerCount)
152163
bsm.start(process.WithTeardown(func() error { return nil }))
153164

154165
blkSize := int64(8 * 1024)
@@ -190,7 +201,7 @@ func TestBlockstoreManagerClose(t *testing.T) {
190201
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
191202
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
192203

193-
bsm := newBlockstoreManager(bstore, 3)
204+
bsm := newBlockstoreManagerForTesting(ctx, bstore, 3)
194205
px := process.WithTeardown(func() error { return nil })
195206
bsm.start(px)
196207

@@ -229,7 +240,8 @@ func TestBlockstoreManagerCtxDone(t *testing.T) {
229240
underlyingBstore := blockstore.NewBlockstore(underlyingDstore)
230241
bstore := blockstore.NewBlockstore(dstore)
231242

232-
bsm := newBlockstoreManager(bstore, 3)
243+
ctx := context.Background()
244+
bsm := newBlockstoreManagerForTesting(ctx, bstore, 3)
233245
proc := process.WithTeardown(func() error { return nil })
234246
bsm.start(proc)
235247

0 commit comments

Comments
 (0)