perf: add async log writer #3953

Merged · 2 commits · Mar 5, 2025
29 changes: 10 additions & 19 deletions pkg/phlare/phlare.go
@@ -727,39 +727,30 @@ func (f *Phlare) stopped() {
level.Error(f.logger).Log("msg", "error closing tracing", "err", err)
}
}
if err := f.logger.buf.Flush(); err != nil {
fmt.Println("error flushing logs:", err)
if err := f.logger.w.Close(); err != nil {
_, _ = fmt.Fprintf(os.Stderr, "error closing log writer: %v\n", err)
}
}

func initLogger(logFormat string, logLevel dslog.Level) *logger {
buf := dslog.NewBufferedLogger(
// There's no particular reason to explicitly synchronise stdout/err writes:
// writes less than 4K (pipe buffer size) are already synchronous, and large
// outputs such as goroutine dump are not directed to the writer. However,
// since we're buffering writes, it's required.
log.NewSyncWriter(os.Stderr),
256, // Max number of entries.
dslog.WithFlushPeriod(100*time.Millisecond),
dslog.WithPrellocatedBuffer(64<<10),
w := util.NewAsyncWriter(os.Stderr, // Flush after:
256<<10, 20, // 256KiB buffer is full (keep 20 buffers).
1<<10, // 1K writes or 100ms.
100*time.Millisecond,
)

l := dslog.NewGoKitWithWriter(logFormat, buf)

// use UTC timestamps and skip 5 stack frames.
// Use UTC timestamps and skip 5 stack frames.
l := dslog.NewGoKitWithWriter(logFormat, w)
l = log.With(l, "ts", log.DefaultTimestampUTC, "caller", log.Caller(5))

// Must put the level filter last for efficiency.
l = level.NewFilter(l, logLevel.Option)

return &logger{
buf: buf,
Logger: l,
}
return &logger{w: w, Logger: l}
}

type logger struct {
buf *dslog.BufferedLogger
w io.WriteCloser
log.Logger
}

126 changes: 126 additions & 0 deletions pkg/util/logger.go
@@ -1,7 +1,12 @@
package util

import (
"bytes"
"context"
"io"
"os"
"sync"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/tenant"
@@ -60,3 +65,124 @@ func LoggerWithContext(ctx context.Context, l log.Logger) log.Logger {
func WithSourceIPs(sourceIPs string, l log.Logger) log.Logger {
return log.With(l, "sourceIPs", sourceIPs)
}

// AsyncWriter is a writer that buffers writes and flushes them asynchronously
// in the order they were written. It is safe for concurrent use.
//
// If the flush queue is full, the pending buffer is dropped rather than
// blocking the writer. Errors are ignored: it is the caller's responsibility
// to handle errors from the underlying writer.
type AsyncWriter struct {
mu sync.Mutex
w io.Writer
pool sync.Pool
buffer *bytes.Buffer
flushQueue chan *bytes.Buffer
maxSize int
maxCount int
flushInterval time.Duration
writes int
closeOnce sync.Once
close chan struct{}
done chan error
closed bool
}

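// NewAsyncWriter returns an AsyncWriter that flushes to w once a buffer
// reaches bufSize bytes or maxWrites writes have accumulated, and at least
// once every flushInterval; at most maxBuffers buffers are queued for
// flushing at a time.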
func NewAsyncWriter(w io.Writer, bufSize, maxBuffers, maxWrites int, flushInterval time.Duration) *AsyncWriter {
bw := &AsyncWriter{
w: w,
flushQueue: make(chan *bytes.Buffer, maxBuffers),
maxSize: bufSize,
maxCount: maxWrites,
flushInterval: flushInterval,
close: make(chan struct{}),
done: make(chan error),
pool: sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, bufSize))
},
},
}
go bw.loop()
return bw
}

func (aw *AsyncWriter) Write(p []byte) (int, error) {
aw.mu.Lock()
defer aw.mu.Unlock()
if aw.closed {
return 0, os.ErrClosed
}
if aw.overflows(len(p)) {
aw.enqueueFlush()
}
if aw.buffer == nil {
aw.buffer = aw.pool.Get().(*bytes.Buffer)
aw.buffer.Reset()
}
aw.writes++
return aw.buffer.Write(p)
}

func (aw *AsyncWriter) overflows(n int) bool {
return aw.buffer != nil && (aw.buffer.Len()+n >= aw.maxSize || aw.writes >= aw.maxCount)
}

func (aw *AsyncWriter) Close() error {
aw.closeOnce.Do(func() {
// Break the loop.
close(aw.close)
<-aw.done
// Empty the queue.
aw.mu.Lock()
defer aw.mu.Unlock()
aw.enqueueFlush()
close(aw.flushQueue)
for buf := range aw.flushQueue {
aw.flushSync(buf)
}
aw.closed = true
})
return nil
}

func (aw *AsyncWriter) enqueueFlush() {
buf := aw.buffer
if buf == nil || buf.Len() == 0 {
return
}
aw.buffer = nil
aw.writes = 0
select {
case aw.flushQueue <- buf:
default:
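// Non-blocking send: if the flush queue is full, the pending buffer is dropped.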
}
}

func (aw *AsyncWriter) loop() {
ticker := time.NewTicker(aw.flushInterval)
defer func() {
ticker.Stop()
close(aw.done)
}()

for {
select {
case buf := <-aw.flushQueue:
aw.flushSync(buf)

case <-ticker.C:
aw.mu.Lock()
aw.enqueueFlush()
aw.mu.Unlock()

case <-aw.close:
return
}
}
}

func (aw *AsyncWriter) flushSync(b *bytes.Buffer) {
_, _ = aw.w.Write(b.Bytes())
aw.pool.Put(b)
}
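
For reviewers reading the diff in isolation, here is a minimal usage sketch of the new writer, mirroring initLogger above. It is not part of the change; the github.com/grafana/pyroscope/pkg/util import path and the logfmt logger are assumed for illustration.

package main

import (
	"os"
	"time"

	"github.com/go-kit/log"

	"github.com/grafana/pyroscope/pkg/util"
)

func main() {
	// Wrap stderr in the async writer; the arguments mirror initLogger above.
	w := util.NewAsyncWriter(os.Stderr,
		256<<10,              // flush once a 256 KiB buffer fills,
		20,                   // keeping at most 20 buffers queued,
		1<<10,                // or after 1024 writes,
		100*time.Millisecond, // or at least every 100 ms.
	)
	defer w.Close() // Close drains the queue and flushes what is still buffered.

	logger := log.NewLogfmtLogger(w)
	_ = logger.Log("msg", "hello from the async writer")
}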
64 changes: 64 additions & 0 deletions pkg/util/logger_test.go
@@ -0,0 +1,64 @@
package util

import (
"bytes"
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestAsyncWriter_Write(t *testing.T) {
var buf bytes.Buffer
w := NewAsyncWriter(&buf, 10, 2, 2, 100*time.Millisecond)
n, err := w.Write([]byte("hello"))
require.NoError(t, err)
assert.Equal(t, 5, n)
assert.NoError(t, w.Close())
assert.Equal(t, "hello", buf.String())
}

func TestAsyncWriter_Empty(t *testing.T) {
var buf bytes.Buffer
w := NewAsyncWriter(&buf, 10, 2, 2, 100*time.Millisecond)
assert.NoError(t, w.Close())
assert.EqualValues(t, 0, buf.Len())
}

func TestAsyncWriter_Overflow(t *testing.T) {
var buf bytes.Buffer
w := NewAsyncWriter(&buf, 10, 2, 2, 100*time.Millisecond)
_, _ = w.Write([]byte("hello"))
_, _ = w.Write([]byte("world"))
assert.NoError(t, w.Close())
assert.Equal(t, "helloworld", buf.String())
}

func TestAsyncWriter_Close(t *testing.T) {
var buf bytes.Buffer
w := NewAsyncWriter(&buf, 10, 2, 2, 100*time.Millisecond)
_, _ = w.Write([]byte("hello"))
assert.NoError(t, w.Close())
assert.Equal(t, "hello", buf.String())
assert.NoError(t, w.Close())
}

func TestAsyncWriter_ConcurrentWrites(t *testing.T) {
var buf bytes.Buffer
w := NewAsyncWriter(&buf, 50, 2, 10, 100*time.Millisecond)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
_, _ = fmt.Fprintf(w, "hello %d\n", i)
}(i)
}
wg.Wait()

assert.NoError(t, w.Close())
assert.Equal(t, 80, buf.Len())
}
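
Since the motivation is performance, a hedged benchmark sketch that could sit alongside the tests above; it is not part of the PR, and io.Discard plus the BenchmarkAsyncWriter/BenchmarkSyncWriter names are illustrative. It compares the async writer against go-kit's log.NewSyncWriter, which the previous initLogger wrapped.

package util

import (
	"io"
	"testing"
	"time"

	"github.com/go-kit/log"
)

func BenchmarkAsyncWriter(b *testing.B) {
	// Same parameters as initLogger, but writing to io.Discard.
	w := NewAsyncWriter(io.Discard, 256<<10, 20, 1<<10, 100*time.Millisecond)
	defer w.Close()
	logger := log.NewLogfmtLogger(w)
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		_ = logger.Log("msg", "benchmark entry", "i", i)
	}
}

func BenchmarkSyncWriter(b *testing.B) {
	// Baseline: synchronized, unbuffered writes.
	logger := log.NewLogfmtLogger(log.NewSyncWriter(io.Discard))
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		_ = logger.Log("msg", "benchmark entry", "i", i)
	}
}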