log: Use dskit/log.BufferedLogger instead of LineBufferedLogger (#10005)

**What this PR does / why we need it**:
`pkg/util/log.LineBufferedLogger` was ported to
`dskit/log.BufferedLogger` in https://github.com/grafana/dskit/pull/338,
so this utility can be shared among projects. This PR modifies Loki to
use the dskit implementation.
pull/10110/head
Arve Knudsen 3 years ago committed by GitHub
parent aa1f5dc9cd
commit 7014e5ce7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 179
      pkg/util/log/line_buffer_test.go
  2. 11
      pkg/util/log/log.go
  3. 62
      vendor/github.com/grafana/dskit/log/buffered.go
  4. 1
      vendor/modules.txt

@ -1,179 +0,0 @@
package log
import (
"bytes"
"fmt"
"io"
"math"
"os"
"strings"
"testing"
"time"
"github.com/go-kit/log"
"github.com/stretchr/testify/require"
)
const (
flushPeriod = 10 * time.Millisecond
bufferSize = 10e6
)
// BenchmarkLineBuffered creates line-buffered loggers of various capacities to see which perform best.
func BenchmarkLineBuffered(b *testing.B) {
for i := 1; i <= 2048; i *= 2 {
f := outFile(b)
defer os.RemoveAll(f.Name())
bufLog := NewLineBufferedLogger(f, uint32(i),
WithFlushPeriod(flushPeriod),
WithPrellocatedBuffer(bufferSize),
)
l := log.NewLogfmtLogger(bufLog)
b.Run(fmt.Sprintf("capacity:%d", i), func(b *testing.B) {
b.ReportAllocs()
b.StartTimer()
require.NoError(b, f.Truncate(0))
logger := log.With(l, "common_key", "common_value")
for j := 0; j < b.N; j++ {
logger.Log("foo_key", "foo_value")
}
// force a final flush for outstanding lines in buffer
bufLog.Flush()
b.StopTimer()
contents, err := os.ReadFile(f.Name())
require.NoErrorf(b, err, "could not read test file: %s", f.Name())
lines := strings.Split(string(contents), "\n")
require.Equal(b, b.N, len(lines)-1)
})
}
}
// BenchmarkLineUnbuffered should perform roughly equivalently to a line-buffered logger with a capacity of 1.
func BenchmarkLineUnbuffered(b *testing.B) {
b.ReportAllocs()
f := outFile(b)
defer os.RemoveAll(f.Name())
l := log.NewLogfmtLogger(f)
benchmarkRunner(b, l, baseMessage)
b.StopTimer()
contents, err := os.ReadFile(f.Name())
require.NoErrorf(b, err, "could not read test file: %s", f.Name())
lines := strings.Split(string(contents), "\n")
require.Equal(b, b.N, len(lines)-1)
}
func BenchmarkLineDiscard(b *testing.B) {
b.ReportAllocs()
l := log.NewLogfmtLogger(io.Discard)
benchmarkRunner(b, l, baseMessage)
}
func TestLineBufferedConcurrency(t *testing.T) {
t.Parallel()
bufLog := NewLineBufferedLogger(io.Discard, 32,
WithFlushPeriod(flushPeriod),
WithPrellocatedBuffer(bufferSize),
)
testConcurrency(t, log.NewLogfmtLogger(bufLog), 10000)
}
func TestOnFlushCallback(t *testing.T) {
var (
flushCount uint32
flushedEntries int
buf bytes.Buffer
)
callback := func(entries uint32) {
flushCount++
flushedEntries += int(entries)
}
bufLog := NewLineBufferedLogger(&buf, 2,
WithFlushPeriod(flushPeriod),
WithPrellocatedBuffer(bufferSize),
WithFlushCallback(callback),
)
l := log.NewLogfmtLogger(bufLog)
require.NoError(t, l.Log("line"))
require.NoError(t, l.Log("line"))
// first flush
require.NoError(t, l.Log("line"))
// force a second
require.NoError(t, bufLog.Flush())
require.Equal(t, uint32(2), flushCount)
require.Equal(t, len(strings.Split(buf.String(), "\n"))-1, flushedEntries)
}
// outFile creates a real OS file for testing.
// We cannot use stdout/stderr since we need to read the contents afterwards to validate, and we have to write to a file
// to benchmark the impact of write() syscalls.
func outFile(b *testing.B) *os.File {
f, err := os.CreateTemp(b.TempDir(), "linebuffer*")
require.NoErrorf(b, err, "cannot create test file")
return f
}
// Copied from go-kit/log
// These test are designed to be run with the race detector.
func testConcurrency(t *testing.T, logger log.Logger, total int) {
n := int(math.Sqrt(float64(total)))
share := total / n
errC := make(chan error, n)
for i := 0; i < n; i++ {
go func() {
errC <- spam(logger, share)
}()
}
for i := 0; i < n; i++ {
err := <-errC
if err != nil {
t.Fatalf("concurrent logging error: %v", err)
}
}
}
func spam(logger log.Logger, count int) error {
for i := 0; i < count; i++ {
err := logger.Log("key", i)
if err != nil {
return err
}
}
return nil
}
func benchmarkRunner(b *testing.B, logger log.Logger, f func(log.Logger)) {
lc := log.With(logger, "common_key", "common_value")
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
f(lc)
}
}
var (
baseMessage = func(logger log.Logger) { logger.Log("foo_key", "foo_value") }
)

@ -11,6 +11,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
dslog "github.com/grafana/dskit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/logging"
@ -23,7 +24,7 @@ var (
// Prefer accepting a non-global logger as an argument.
Logger = log.NewNopLogger()
bufferedLogger *LineBufferedLogger
bufferedLogger *dslog.BufferedLogger
plogger *prometheusLogger
)
@ -147,10 +148,10 @@ func newPrometheusLogger(l logging.Level, format logging.Format, reg prometheus.
if buffered {
// retain a reference to this logger because it doesn't conform to the standard Logger interface,
// and we can't unwrap it to get the underlying logger when we flush on shutdown
bufferedLogger = NewLineBufferedLogger(os.Stderr, logEntries,
WithFlushPeriod(flushTimeout),
WithPrellocatedBuffer(logBufferSize),
WithFlushCallback(func(entries uint32) {
bufferedLogger = dslog.NewBufferedLogger(os.Stderr, logEntries,
dslog.WithFlushPeriod(flushTimeout),
dslog.WithPrellocatedBuffer(logBufferSize),
dslog.WithFlushCallback(func(entries uint32) {
logFlushes.Observe(float64(entries))
}),
)

@ -1,3 +1,6 @@
// Provenance-includes-location: https://github.com/grafana/loki/blob/7c78d7ea44afb420847255f9f5a4f677ad0f47bf/pkg/util/log/line_buffer.go
// Provenance-includes-location: https://github.com/grafana/mimir/blob/c8b24a462f7e224950409e7e0a4e0a58f3a79599/pkg/util/log/line_buffer.go
// Provenance-includes-copyright: Grafana Labs
package log
import (
@ -9,9 +12,9 @@ import (
"go.uber.org/atomic"
)
// LineBufferedLogger buffers log lines to be flushed periodically. Without a line buffer, Log() will call the write
// BufferedLogger buffers log lines to be flushed periodically. Without a line buffer, Log() will call the write
// syscall for every log line which is expensive if logging thousands of lines per second.
type LineBufferedLogger struct {
type BufferedLogger struct {
buf *threadsafeBuffer
entries atomic.Uint32
cap uint32
@ -21,13 +24,13 @@ type LineBufferedLogger struct {
}
// Size returns the number of entries in the buffer.
func (l *LineBufferedLogger) Size() uint32 {
func (l *BufferedLogger) Size() uint32 {
return l.entries.Load()
}
// Write writes the given bytes to the line buffer, and increments the entries counter.
// If the buffer is full (entries == cap), it will be flushed, and the entries counter reset.
func (l *LineBufferedLogger) Write(p []byte) (n int, err error) {
func (l *BufferedLogger) Write(p []byte) (n int, err error) {
// when we've filled the buffer, flush it
if l.Size() >= l.cap {
// Flush resets the size to 0
@ -43,15 +46,13 @@ func (l *LineBufferedLogger) Write(p []byte) (n int, err error) {
}
// Flush forces the buffer to be written to the underlying writer.
func (l *LineBufferedLogger) Flush() error {
sz := l.Size()
func (l *BufferedLogger) Flush() error {
// reset the counter
sz := l.entries.Swap(0)
if sz <= 0 {
return nil
}
// reset the counter
l.entries.Store(0)
// WriteTo() calls Reset() on the underlying buffer, so it's not needed here
_, err := l.buf.WriteTo(l.w)
@ -63,11 +64,11 @@ func (l *LineBufferedLogger) Flush() error {
return err
}
type LineBufferedLoggerOption func(*LineBufferedLogger)
type BufferedLoggerOption func(*BufferedLogger)
// WithFlushPeriod creates a new LineBufferedLoggerOption that sets the flush period for the LineBufferedLogger.
func WithFlushPeriod(d time.Duration) LineBufferedLoggerOption {
return func(l *LineBufferedLogger) {
// WithFlushPeriod creates a new BufferedLoggerOption that sets the flush period for the BufferedLogger.
func WithFlushPeriod(d time.Duration) BufferedLoggerOption {
return func(l *BufferedLogger) {
go func() {
tick := time.NewTicker(d)
defer tick.Stop()
@ -81,23 +82,23 @@ func WithFlushPeriod(d time.Duration) LineBufferedLoggerOption {
// WithFlushCallback allows for a callback function to be executed when Flush() is called.
// The length of the buffer at the time of the Flush() will be passed to the function.
func WithFlushCallback(fn func(entries uint32)) LineBufferedLoggerOption {
return func(l *LineBufferedLogger) {
func WithFlushCallback(fn func(entries uint32)) BufferedLoggerOption {
return func(l *BufferedLogger) {
l.onFlush = fn
}
}
// WithPrellocatedBuffer preallocates a buffer to reduce GC cycles and slice resizing.
func WithPrellocatedBuffer(size uint32) LineBufferedLoggerOption {
return func(l *LineBufferedLogger) {
func WithPrellocatedBuffer(size uint32) BufferedLoggerOption {
return func(l *BufferedLogger) {
l.buf = newThreadsafeBuffer(bytes.NewBuffer(make([]byte, 0, size)))
}
}
// NewLineBufferedLogger creates a new LineBufferedLogger with a configured capacity.
// NewBufferedLogger creates a new BufferedLogger with a configured capacity.
// Lines are flushed when the context is done, the buffer is full, or the flush period is reached.
func NewLineBufferedLogger(w io.Writer, cap uint32, opts ...LineBufferedLoggerOption) *LineBufferedLogger {
l := &LineBufferedLogger{
func NewBufferedLogger(w io.Writer, cap uint32, opts ...BufferedLoggerOption) *BufferedLogger {
l := &BufferedLogger{
w: w,
buf: newThreadsafeBuffer(bytes.NewBuffer([]byte{})),
cap: cap,
@ -112,31 +113,30 @@ func NewLineBufferedLogger(w io.Writer, cap uint32, opts ...LineBufferedLoggerOp
// threadsafeBuffer wraps the non-threadsafe bytes.Buffer.
type threadsafeBuffer struct {
sync.RWMutex
mx sync.Mutex
buf *bytes.Buffer
}
// Read returns the contents of the buffer.
// Read reads up to len(p) bytes into p. It returns the number of bytes read (0 <= n <= len(p)) and any error encountered.
func (t *threadsafeBuffer) Read(p []byte) (n int, err error) {
t.RLock()
defer t.RUnlock()
t.mx.Lock()
defer t.mx.Unlock()
return t.buf.Read(p)
}
// Write writes the given bytes to the underlying writer.
func (t *threadsafeBuffer) Write(p []byte) (n int, err error) {
t.Lock()
defer t.Unlock()
t.mx.Lock()
defer t.mx.Unlock()
return t.buf.Write(p)
}
// WriteTo writes the buffered lines to the given writer.
func (t *threadsafeBuffer) WriteTo(w io.Writer) (n int64, err error) {
t.Lock()
defer t.Unlock()
t.mx.Lock()
defer t.mx.Unlock()
return t.buf.WriteTo(w)
}
@ -145,8 +145,8 @@ func (t *threadsafeBuffer) WriteTo(w io.Writer) (n int64, err error) {
// but it retains the underlying storage for use by future writes.
// Reset is the same as Truncate(0).
func (t *threadsafeBuffer) Reset() {
t.Lock()
defer t.Unlock()
t.mx.Lock()
defer t.mx.Unlock()
t.buf.Reset()
}

@ -846,6 +846,7 @@ github.com/grafana/dskit/kv/consul
github.com/grafana/dskit/kv/etcd
github.com/grafana/dskit/kv/memberlist
github.com/grafana/dskit/limiter
github.com/grafana/dskit/log
github.com/grafana/dskit/loser
github.com/grafana/dskit/modules
github.com/grafana/dskit/multierror

Loading…
Cancel
Save