Skip to content

Commit c523380

Browse files
starnopfuweid
authored andcommitted
feature: use ringBuffer to make log not blocking and configurable
Signed-off-by: Starnop <[email protected]>
1 parent c279a17 commit c523380

File tree

8 files changed

+227
-79
lines changed

8 files changed

+227
-79
lines changed

daemon/containerio/io.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/alibaba/pouch/daemon/logger"
88
"github.com/alibaba/pouch/daemon/logger/crilog"
9+
"github.com/alibaba/pouch/daemon/logger/logbuffer"
910
"github.com/alibaba/pouch/pkg/multierror"
1011
"github.com/alibaba/pouch/pkg/streams"
1112

@@ -47,6 +48,9 @@ type IO struct {
4748
logdriver logger.LogDriver
4849
logcopier *logger.LogCopier
4950
criLog *crilog.Log
51+
52+
nonBlock bool
53+
maxBufferSize int64
5054
}
5155

5256
// NewIO return IO instance.
@@ -87,6 +91,16 @@ func (ctrio *IO) SetLogDriver(logdriver logger.LogDriver) {
8791
ctrio.logdriver = logdriver
8892
}
8993

94+
// SetMaxBufferSize set the max size of buffer.
95+
func (ctrio *IO) SetMaxBufferSize(maxBufferSize int64) {
96+
ctrio.maxBufferSize = maxBufferSize
97+
}
98+
99+
// SetNonBlock whether to cache the container's logs with buffer.
100+
func (ctrio *IO) SetNonBlock(nonBlock bool) {
101+
ctrio.nonBlock = nonBlock
102+
}
103+
90104
// Stream is used to export the stream field.
91105
func (ctrio *IO) Stream() *streams.Stream {
92106
return ctrio.stream
@@ -188,6 +202,14 @@ func (ctrio *IO) startLogging() error {
188202
return nil
189203
}
190204

205+
if ctrio.nonBlock {
206+
logDriver, err := logbuffer.NewLogBuffer(ctrio.logdriver, ctrio.maxBufferSize)
207+
if err != nil {
208+
return err
209+
}
210+
ctrio.logdriver = logDriver
211+
}
212+
191213
ctrio.logcopier = logger.NewLogCopier(ctrio.logdriver, map[string]io.Reader{
192214
"stdout": ctrio.stream.NewStdoutPipe(),
193215
"stderr": ctrio.stream.NewStderrPipe(),

pkg/ringbuffer/list.go renamed to daemon/logger/logbuffer/list.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
1-
package ringbuffer
1+
package logbuffer
22

33
import (
44
"sync"
5+
6+
"github.com/alibaba/pouch/daemon/logger"
57
)
68

79
var elemPool = &sync.Pool{New: func() interface{} { return new(element) }}
810

911
type element struct {
1012
next, prev *element
11-
val interface{}
13+
val *logger.LogMessage
1214
}
1315

1416
func (e *element) reset() {
@@ -34,7 +36,7 @@ func (q *queue) size() int {
3436
return q.count
3537
}
3638

37-
func (q *queue) enqueue(val interface{}) {
39+
func (q *queue) enqueue(val *logger.LogMessage) {
3840
elem := elemPool.Get().(*element)
3941
elem.val = val
4042

@@ -47,7 +49,7 @@ func (q *queue) enqueue(val interface{}) {
4749
q.count++
4850
}
4951

50-
func (q *queue) dequeue() interface{} {
52+
func (q *queue) dequeue() *logger.LogMessage {
5153
if q.size() == 0 {
5254
return nil
5355
}

daemon/logger/logbuffer/logbuff.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package logbuffer
2+
3+
import (
4+
"github.com/alibaba/pouch/daemon/logger"
5+
6+
"github.com/sirupsen/logrus"
7+
)
8+
9+
// LogBuffer is uses to cache the container's logs with ringBuffer.
10+
type LogBuffer struct {
11+
ringBuffer *RingBuffer
12+
logger logger.LogDriver
13+
}
14+
15+
// NewLogBuffer return a new BufferLog.
16+
func NewLogBuffer(logDriver logger.LogDriver, maxBytes int64) (logger.LogDriver, error) {
17+
bl := &LogBuffer{
18+
logger: logDriver,
19+
ringBuffer: NewRingBuffer(maxBytes),
20+
}
21+
22+
// use a goroutine to write logs continuously with specified log driver
23+
go bl.run()
24+
return bl, nil
25+
}
26+
27+
// Name return the log driver's name.
28+
func (bl *LogBuffer) Name() string {
29+
return bl.logger.Name()
30+
}
31+
32+
// WriteLogMessage will write the LogMessage to the ringBuffer.
33+
func (bl *LogBuffer) WriteLogMessage(msg *logger.LogMessage) error {
34+
return bl.ringBuffer.Push(msg)
35+
}
36+
37+
// Close close the ringBuffer and drain the messages.
38+
func (bl *LogBuffer) Close() error {
39+
bl.ringBuffer.Close()
40+
for _, msg := range bl.ringBuffer.Drain() {
41+
if err := bl.logger.WriteLogMessage(msg); err != nil {
42+
logrus.Debugf("failed to write log %v when closing with log driver %s", msg, bl.logger.Name())
43+
}
44+
}
45+
46+
return bl.logger.Close()
47+
}
48+
49+
// write logs continuously with specified log driver from ringBuffer.
50+
func (bl *LogBuffer) run() {
51+
for {
52+
msg, err := bl.ringBuffer.Pop()
53+
if err != nil {
54+
return
55+
}
56+
57+
if err := bl.logger.WriteLogMessage(msg); err != nil {
58+
logrus.Debugf("failed to write log %v with log driver %s", msg, bl.logger.Name())
59+
}
60+
}
61+
}

pkg/ringbuffer/ringbuff.go renamed to daemon/logger/logbuffer/ringbuff.go

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,69 +1,75 @@
1-
package ringbuffer
1+
package logbuffer
22

33
import (
44
"fmt"
55
"sync"
6+
7+
"github.com/alibaba/pouch/daemon/logger"
68
)
79

810
// ErrClosed is used to indicate the ringbuffer has been closed.
911
var ErrClosed = fmt.Errorf("closed")
1012

11-
const defaultSize = 1024
13+
const (
14+
defaultMaxBytes = 1e6 //1MB
15+
)
1216

1317
// RingBuffer implements a fixed-size buffer which will drop oldest data if full.
1418
type RingBuffer struct {
1519
mu sync.Mutex
1620
wait *sync.Cond
1721

18-
cap int
1922
closed bool
2023
q *queue
24+
25+
maxBytes int64
26+
currentBytes int64
2127
}
2228

23-
// New creates new RingBuffer.
24-
func New(cap int) *RingBuffer {
25-
if cap <= 0 {
26-
cap = defaultSize
29+
// NewRingBuffer creates new RingBuffer.
30+
func NewRingBuffer(maxBytes int64) *RingBuffer {
31+
if maxBytes < 0 {
32+
maxBytes = defaultMaxBytes
2733
}
2834

2935
rb := &RingBuffer{
30-
cap: cap,
31-
closed: false,
32-
q: newQueue(),
36+
closed: false,
37+
q: newQueue(),
38+
maxBytes: maxBytes,
3339
}
3440
rb.wait = sync.NewCond(&rb.mu)
3541
return rb
3642
}
3743

3844
// Push pushes value into buffer and return whether it covers the oldest data
3945
// or not.
40-
func (rb *RingBuffer) Push(val interface{}) (bool, error) {
46+
func (rb *RingBuffer) Push(val *logger.LogMessage) error {
4147
rb.mu.Lock()
4248
defer rb.mu.Unlock()
4349

4450
if rb.closed {
45-
return false, ErrClosed
51+
return ErrClosed
4652
}
4753

4854
if val == nil {
49-
return false, nil
55+
return nil
5056
}
5157

52-
// drop the oldest element if covered
53-
covered := (rb.q.size() == rb.cap)
54-
if covered {
55-
rb.q.dequeue()
58+
msgLength := int64(len(val.Line))
59+
if (rb.currentBytes + msgLength) > rb.maxBytes {
60+
rb.wait.Broadcast()
61+
return nil
5662
}
5763

5864
rb.q.enqueue(val)
5965
rb.wait.Broadcast()
60-
return covered, nil
66+
return nil
6167
}
6268

6369
// Pop pops the value in the buffer.
6470
//
6571
// NOTE: it returns ErrClosed if the buffer has been closed.
66-
func (rb *RingBuffer) Pop() (interface{}, error) {
72+
func (rb *RingBuffer) Pop() (*logger.LogMessage, error) {
6773
rb.mu.Lock()
6874
for rb.q.size() == 0 && !rb.closed {
6975
rb.wait.Wait()
@@ -75,23 +81,25 @@ func (rb *RingBuffer) Pop() (interface{}, error) {
7581
}
7682

7783
val := rb.q.dequeue()
84+
rb.currentBytes -= int64(len(val.Line))
7885
rb.mu.Unlock()
7986
return val, nil
8087
}
8188

8289
// Drain returns all the data in the buffer.
8390
//
8491
// NOTE: it can be used after closed to make sure the data have been consumed.
85-
func (rb *RingBuffer) Drain() []interface{} {
92+
func (rb *RingBuffer) Drain() []*logger.LogMessage {
8693
rb.mu.Lock()
8794
defer rb.mu.Unlock()
8895

8996
size := rb.q.size()
90-
vals := make([]interface{}, 0, size)
97+
vals := make([]*logger.LogMessage, 0, size)
9198

9299
for i := 0; i < size; i++ {
93100
vals = append(vals, rb.q.dequeue())
94101
}
102+
rb.currentBytes = 0
95103
return vals
96104
}
97105

0 commit comments

Comments
 (0)