Skip to content

Commit c415859

Browse files
committed
added overlapped ring buffer impl, see the #5
An overlapped ring buffer will overwrite the existing head element in putting new elem but the ring buffer is full. The original ring buffer of ourselves is to return a wrong state and stop putting action, see the `ErrQueueFull`. The new overlapped ring buffer will be created by calling `NewOverlappedRingBuffer()`. Any issues are welcome.
1 parent f5661e7 commit c415859

File tree

5 files changed

+286
-46
lines changed

5 files changed

+286
-46
lines changed

mpmc/rb.go

Lines changed: 57 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,33 +9,68 @@ import (
99
"github.com/hedzr/go-ringbuf/v2/mpmc/state"
1010
)
1111

12-
// New returns the RingBuffer object
12+
// New returns the RingBuffer object.
13+
//
14+
// It returns ErrQueueFull when the queue is full and
15+
// putting a new element.
1316
func New[T any](capacity uint32, opts ...Opt[T]) (ringBuffer RingBuffer[T]) {
14-
if isInitialized() {
17+
return newRingBuffer(func(capacity uint32, opts ...Opt[T]) (ringBuffer RingBuffer[T]) {
1518
size := roundUpToPower2(capacity)
16-
1719
rb := &ringBuf[T]{
1820
data: make([]rbItem[T], size),
1921
cap: size,
2022
capModMask: size - 1, // = 2^n - 1
2123
}
22-
24+
for _, opt := range opts {
25+
opt(rb)
26+
}
2327
ringBuffer = rb
28+
return
29+
}, capacity, opts...)
30+
}
2431

32+
// NewOverlappedRingBuffer initials a ring buffer, which overwrites
33+
// its head element if it's full.
34+
func NewOverlappedRingBuffer[T any](capacity uint32, opts ...Opt[T]) (ringBuffer RingBuffer[T]) {
35+
return newRingBuffer(func(capacity uint32, opts ...Opt[T]) (ringBuffer RingBuffer[T]) {
36+
size := roundUpToPower2(capacity)
37+
rb := &orbuf[T]{
38+
ringBuf[T]{
39+
data: make([]rbItem[T], size),
40+
cap: size,
41+
capModMask: size - 1, // = 2^n - 1
42+
},
43+
}
2544
for _, opt := range opts {
26-
opt(rb)
45+
opt(&rb.ringBuf)
2746
}
47+
ringBuffer = rb
48+
return
49+
}, capacity, opts...)
50+
}
51+
52+
type Creator[T any] func(capacity uint32, opts ...Opt[T]) (ringBuffer RingBuffer[T])
53+
54+
func newRingBuffer[T any](creator Creator[T], capacity uint32, opts ...Opt[T]) (ringBuffer RingBuffer[T]) {
55+
if isInitialized() {
56+
ringBuffer = creator(capacity, opts...)
57+
58+
// ringBuffer = rb
2859

29-
// if rb.debugMode && rb.logger != nil {
30-
// // rb.logger.Debug("[ringbuf][INI] ", zap.Uint32("cap", rb.cap), zap.Uint32("capModMask", rb.capModMask))
60+
// for _, opt := range opts {
61+
// opt(rb)
3162
// }
3263

33-
for i := 0; i < int(size); i++ {
34-
rb.data[i].readWrite &= 0 // bit 0: readable, bit 1: writable
35-
if rb.initializer != nil {
36-
rb.data[i].value = rb.initializer.PreAlloc(i)
37-
}
38-
}
64+
// // if rb.debugMode && rb.logger != nil {
65+
// // // rb.logger.Debug("[ringbuf][INI] ", zap.Uint32("cap", rb.cap), zap.Uint32("capModMask", rb.capModMask))
66+
// // }
67+
68+
// for i := 0; i < int(size); i++ {
69+
// rb.data[i].readWrite &= 0 // bit 0: readable, bit 1: writable
70+
// if rb.initializer != nil {
71+
// rb.data[i].value = rb.initializer.PreAlloc(i)
72+
// }
73+
// }
3974
}
4075
return
4176
}
@@ -52,7 +87,7 @@ func WithItemInitializer[T any](initializeable Initializeable[T]) Opt[T] {
5287

5388
// WithDebugMode enables the internal debug mode for more logging output, and collect the metrics for debugging
5489
func WithDebugMode[T any](_ bool) Opt[T] {
55-
return func(buf *ringBuf[T]) {
90+
return func(_ *ringBuf[T]) {
5691
// buf.debugMode = debug
5792
}
5893
}
@@ -71,13 +106,13 @@ type ringBuf[T any] struct {
71106
capModMask uint32
72107
_ [CacheLinePadSize - 8]byte
73108
head uint32
74-
_ [CacheLinePadSize - 4]byte
109+
_ [CacheLinePadSize - 4]byte //nolint:revive
75110
tail uint32
76-
_ [CacheLinePadSize - 4]byte
111+
_ [CacheLinePadSize - 4]byte //nolint:revive
77112
putWaits uint64
78-
_ [CacheLinePadSize - 8]byte
113+
_ [CacheLinePadSize - 8]byte //nolint:revive
79114
getWaits uint64
80-
_ [CacheLinePadSize - 8]byte
115+
_ [CacheLinePadSize - 8]byte //nolint:revive
81116
data []rbItem[T]
82117
// debugMode bool
83118
// logger log.Logger
@@ -93,9 +128,9 @@ type rbItem[T any] struct {
93128
// _ cpu.CacheLinePad
94129
}
95130

96-
func (rb *ringBuf[T]) Put(item T) (err error) { return rb.Enqueue(item) }
131+
func (rb *ringBuf[T]) Put(item T) (err error) { return rb.Enqueue(item) } //nolint:revive
97132

98-
func (rb *ringBuf[T]) Enqueue(item T) (err error) {
133+
func (rb *ringBuf[T]) Enqueue(item T) (err error) { //nolint:revive
99134
var tail, head, nt uint32
100135
var holder *rbItem[T]
101136
for {
@@ -145,9 +180,9 @@ func (rb *ringBuf[T]) Enqueue(item T) (err error) {
145180
}
146181
}
147182

148-
func (rb *ringBuf[T]) Get() (item T, err error) { return rb.Dequeue() }
183+
func (rb *ringBuf[T]) Get() (item T, err error) { return rb.Dequeue() } //nolint:revive
149184

150-
func (rb *ringBuf[T]) Dequeue() (item T, err error) {
185+
func (rb *ringBuf[T]) Dequeue() (item T, err error) { //nolint:revive
151186
var tail, head, nh uint32
152187
var holder *rbItem[T]
153188
for {

mpmc/rb2.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package mpmc
2+
3+
import (
4+
"runtime"
5+
"sync/atomic"
6+
7+
"github.com/hedzr/go-ringbuf/v2/mpmc/state"
8+
)
9+
10+
type orbuf[T any] struct {
11+
ringBuf[T]
12+
}
13+
14+
func (rb *orbuf[T]) Put(item T) (err error) { return rb.Enqueue(item) } //nolint:revive
15+
16+
func (rb *orbuf[T]) Enqueue(item T) (err error) { //nolint:revive
17+
var tail, head, nt, nh uint32
18+
var holder *rbItem[T]
19+
for {
20+
head = atomic.LoadUint32(&rb.head)
21+
tail = atomic.LoadUint32(&rb.tail)
22+
nt = (tail + 1) & rb.capModMask
23+
24+
isEmpty := head == tail
25+
if isEmpty && head == MaxUint32 {
26+
err = ErrQueueNotReady
27+
return
28+
}
29+
30+
isFull := nt == head
31+
if isFull {
32+
nh = (head + 1) & rb.capModMask
33+
atomic.CompareAndSwapUint32(&rb.head, head, nh)
34+
}
35+
36+
holder = &rb.data[tail]
37+
atomic.CompareAndSwapUint32(&rb.tail, tail, nt)
38+
retry:
39+
if !atomic.CompareAndSwapUint64(&holder.readWrite, 0, 2) { //nolint:gomnd
40+
if !atomic.CompareAndSwapUint64(&holder.readWrite, 1, 2) { //nolint:gomnd
41+
if atomic.LoadUint64(&holder.readWrite) == 0 {
42+
goto retry // sometimes, short circuit
43+
}
44+
runtime.Gosched() // time to time
45+
continue
46+
}
47+
}
48+
49+
if rb.initializer != nil {
50+
rb.initializer.CloneIn(item, &holder.value)
51+
} else {
52+
holder.value = item
53+
}
54+
if !atomic.CompareAndSwapUint64(&holder.readWrite, 2, 1) { //nolint:gomnd
55+
err = ErrRaced // runtime.Gosched() // never happens
56+
}
57+
58+
if state.VerboseEnabled {
59+
state.Verbose("[W] enqueued",
60+
"tail", tail, "new-tail", nt, "head", head, "value", toString(holder.value),
61+
"value(rb.data[0])", toString(rb.data[0].value),
62+
"value(rb.data[1])", toString(rb.data[1].value))
63+
}
64+
return
65+
}
66+
}
67+
68+
func (rb *orbuf[T]) Get() (item T, err error) { return rb.Dequeue() } //nolint:revive
69+
70+
func (rb *orbuf[T]) Dequeue() (item T, err error) { //nolint:revive
71+
var tail, head, nh uint32
72+
var holder *rbItem[T]
73+
for {
74+
// var quad uint64
75+
// quad = atomic.LoadUint64((*uint64)(unsafe.Pointer(&rb.head)))
76+
// head = (uint32)(quad & MaxUint32_64)
77+
// tail = (uint32)(quad >> 32)
78+
head = atomic.LoadUint32(&rb.head)
79+
tail = atomic.LoadUint32(&rb.tail)
80+
81+
isEmpty := head == tail
82+
if isEmpty {
83+
if head == MaxUint32 {
84+
err = ErrQueueNotReady
85+
return
86+
}
87+
err = ErrQueueEmpty
88+
return
89+
}
90+
91+
holder = &rb.data[head]
92+
93+
nh = (head + 1) & rb.capModMask
94+
atomic.CompareAndSwapUint32(&rb.head, head, nh)
95+
retry:
96+
if !atomic.CompareAndSwapUint64(&holder.readWrite, 1, 3) { //nolint:gomnd
97+
if atomic.LoadUint64(&holder.readWrite) == 1 {
98+
goto retry // sometimes, short circuit
99+
}
100+
runtime.Gosched() // time to time
101+
continue
102+
}
103+
104+
if rb.initializer != nil {
105+
item = rb.initializer.CloneOut(&holder.value)
106+
} else {
107+
item = holder.value
108+
// holder.value = zero
109+
}
110+
if !atomic.CompareAndSwapUint64(&holder.readWrite, 3, 0) { //nolint:gomnd
111+
err = ErrRaced // runtime.Gosched() // never happens
112+
}
113+
114+
if state.VerboseEnabled {
115+
state.Verbose("[ringbuf][GET] states are:",
116+
"cap", rb.Cap(), "qty", rb.qty(head, tail), "tail", tail, "head", head, "new-head", nh, "item", toString(item))
117+
}
118+
119+
return
120+
}
121+
}

mpmc/rb2_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package mpmc
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"testing"
7+
)
8+
9+
func TestOverlappedRingBuf_Put_OneByOne(t *testing.T) { //nolint:revive
10+
var err error
11+
rb := NewOverlappedRingBuffer(NLtd, WithDebugMode[uint32](true))
12+
size := rb.Cap() - 1
13+
// t.Logf("Ring Buffer created, capacity = %v, real size: %v", size+1, size)
14+
defer rb.Close()
15+
16+
for i := uint32(0); i < size; i++ {
17+
err = rb.Enqueue(i)
18+
if err != nil {
19+
t.Fatalf("faild on i=%v. err: %+v", i, err)
20+
// } else {
21+
// t.Logf("%5d. '%v' put, quantity = %v.", i, i, fast.Quantity())
22+
}
23+
t.Logf(" %3d. ringbuf elements -> %v", i, rb)
24+
}
25+
26+
for i := size; i < size+size; i++ {
27+
err = rb.Put(i)
28+
if errors.Is(err, ErrQueueFull) {
29+
t.Fatalf("> %3d. expect ErrQueueFull but no error raised. err: %+v", i, err)
30+
}
31+
t.Logf(" %3d. ringbuf elements -> %v", i, rb)
32+
}
33+
34+
var it any
35+
it, err = rb.Dequeue()
36+
_, _ = it, err
37+
t.Logf(" %3d. ringbuf elements -> %v", -1, rb)
38+
it, err = rb.Dequeue()
39+
_, _ = it, err
40+
t.Logf(" %3d. ringbuf elements -> %v", -1, rb)
41+
if fmt.Sprintf("%v", rb) != "[17,18,19,20,21,22,23,24,25,26,27,28,29,]/13" {
42+
t.Fatalf("faild: expecting elements are: [17,18,19,20,21,22,23,24,25,26,27,28,29,]/13")
43+
}
44+
45+
for i := size * 2; i < size*3+2; i++ {
46+
err = rb.Put(i)
47+
if errors.Is(err, ErrQueueFull) {
48+
t.Fatalf("> %3d. expect ErrQueueFull but no error raised. err: %+v", i, err)
49+
}
50+
t.Logf(" %3d. ringbuf elements -> %v", i, rb)
51+
}
52+
if fmt.Sprintf("%v", rb) != "[32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,]/15" {
53+
t.Fatalf("faild: expecting elements are: [32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,]/15")
54+
}
55+
56+
for i := 0; ; i++ {
57+
it, err = rb.Dequeue()
58+
if err != nil {
59+
if errors.Is(err, ErrQueueEmpty) {
60+
break
61+
}
62+
t.Fatalf("faild on i=%v. err: %+v. item: %v", i, err, it)
63+
// } else {
64+
// t.Logf("< %3d. '%v' GOT, quantity = %v.", i, it, fast.Quantity())
65+
}
66+
if rb.Size() == 0 {
67+
// t.Log("empty ring buffer elements")
68+
if fmt.Sprintf("%v", rb) != "[]/0" {
69+
t.Fatalf("faild: expecting elements are: []/0")
70+
}
71+
}
72+
t.Logf(" %3d. ringbuf elements -> %v", i, rb)
73+
}
74+
75+
it, err = rb.Dequeue()
76+
if err == nil {
77+
t.Fatalf("faild: Dequeue on an empty ringbuf should return an ErrQueueEmpty object.")
78+
}
79+
}

0 commit comments

Comments
 (0)