Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/dataobj/consumer/processor_test.go

227 lines
7.7 KiB

package consumer
import (
"context"
"errors"
"testing"
"testing/synctest"
"time"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/scratch"
"github.com/grafana/loki/pkg/push"
)
// testBuilderCfg is the logsobj builder configuration shared by the tests in
// this file.
var testBuilderCfg = logsobj.BuilderConfig{
	BuilderBaseConfig: logsobj.BuilderBaseConfig{
		TargetPageSize:          2048,
		MaxPageRows:             10,
		TargetObjectSize:        4 << 20, // 4 MiB
		TargetSectionSize:       4 << 20, // 4 MiB
		BufferSize:              8 * 2048,
		SectionStripeMergeLimit: 2,
	},
}
func TestProcessor_BuilderMaxAge(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
var (
ctx = t.Context()
reg = prometheus.NewRegistry()
builder = newTestBuilder(t, reg)
flushManager = &mockFlushManager{}
proc = newProcessor(builder, nil, flushManager, 5*time.Minute, 30*time.Minute, log.NewNopLogger(), reg)
)
// Since no records have been pushed, the first append time should be zero,
// and no flush should have occurred.
require.True(t, proc.firstAppend.IsZero())
require.Equal(t, 0, flushManager.flushes)
// Process a record containing some log lines. No flush should occur because
// the builder has not reached the maximum age.
require.NoError(t, proc.processRecord(ctx, newTestRecord(t, "tenant1", time.Now())))
// The first append time should be set to the current time, but no flush
// should have occurred.
require.Equal(t, time.Now(), proc.firstAppend)
require.Equal(t, time.Now(), proc.lastAppend)
require.Equal(t, 0, flushManager.flushes)
// Advance time past the maximum age. A flush should occur, and the
// the log lines from the record should be appended to the next data
// object, not the one that was just flushed.
time.Sleep(31 * time.Minute)
require.NoError(t, proc.processRecord(ctx, newTestRecord(t, "tenant1", time.Now())))
// The last flushed time should be updated to the current time, and so should
// the first append time to reflect the start of the new data object.
require.Equal(t, time.Now(), proc.firstAppend)
require.Equal(t, time.Now(), proc.lastAppend)
require.NotEqual(t, proc.builder.GetEstimatedSize(), 0)
require.Equal(t, 1, flushManager.flushes)
// Advance time one last time and push some more logs. No flush should
// occur because the next builder has not reached the maximum age.
expectedLastFlushed := time.Now()
time.Sleep(time.Minute)
require.NoError(t, proc.processRecord(ctx, newTestRecord(t, "tenant1", time.Now())))
require.Equal(t, expectedLastFlushed, proc.firstAppend)
require.Equal(t, time.Now(), proc.lastAppend)
require.Equal(t, 1, flushManager.flushes)
})
}
func TestPartitionProcessor_IdleFlush(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
var (
ctx = t.Context()
reg = prometheus.NewRegistry()
builder = newTestBuilder(t, reg)
flushManager = &mockFlushManager{}
proc = newProcessor(builder, nil, flushManager, 5*time.Minute, 30*time.Minute, log.NewNopLogger(), reg)
)
// The idle flush timeout has not been exceeded, no flush should occur.
flushed, err := proc.idleFlush(ctx)
require.NoError(t, err)
require.False(t, flushed)
require.Equal(t, 0, flushManager.flushes)
// Advance time past the idle flush time. No flush should occur because
// the builder is empty.
time.Sleep(6 * time.Minute)
flushed, err = proc.idleFlush(ctx)
require.NoError(t, err)
require.False(t, flushed)
require.Equal(t, 0, flushManager.flushes)
// Process a record containing some log lines. No flush should occur because
// when log lines are appended to the builder it resets the idle timeout.
require.NoError(t, proc.processRecord(ctx, newTestRecord(t, "tenant1", time.Now())))
require.False(t, proc.lastAppend.IsZero())
flushed, err = proc.idleFlush(ctx)
require.NoError(t, err)
require.False(t, flushed)
require.Equal(t, 0, flushManager.flushes)
// Advance time past the idle timeout. A flush should occur.
time.Sleep(6 * time.Minute)
flushed, err = proc.idleFlush(ctx)
require.NoError(t, err)
require.True(t, flushed)
require.Equal(t, 1, flushManager.flushes)
})
}
// failureFlusher is a flusher stub for tests whose Flush never succeeds.
type failureFlusher struct{}

// Flush ignores all of its arguments and always returns an empty string
// together with a "mock error" error.
func (f *failureFlusher) Flush(context.Context, builder, string) (string, error) {
	err := errors.New("mock error")
	return "", err
}
// failureFlushManager is a flush manager stub for tests whose Flush never
// succeeds.
type failureFlushManager struct{}

// Flush ignores all of its arguments and always returns a "mock error" error.
func (m *failureFlushManager) Flush(context.Context, builder, string, int64, time.Time) error {
	err := errors.New("mock error")
	return err
}
func TestPartitionProcessor_Flush(t *testing.T) {
t.Run("should succeed", func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
var (
ctx = t.Context()
reg = prometheus.NewRegistry()
builder = newTestBuilder(t, reg)
flushManager = &mockFlushManager{}
proc = newProcessor(builder, nil, flushManager, 5*time.Minute, 30*time.Minute, log.NewNopLogger(), reg)
)
// No flush should have occurred.
require.Equal(t, 0, flushManager.flushes)
// Process a record containing some log lines. No flush should occur.
rec1 := newTestRecord(t, "tenant", time.Now())
require.NoError(t, proc.processRecord(ctx, rec1))
require.Equal(t, time.Now(), proc.firstAppend)
require.Equal(t, time.Now(), proc.lastAppend)
require.Equal(t, rec1.Offset, proc.lastOffset)
// Advance time and force a flush.
time.Sleep(time.Second)
require.NoError(t, proc.flush(ctx, "test"))
require.Equal(t, 1, flushManager.flushes)
// The following fields should be reset at the end of every flush.
require.True(t, proc.firstAppend.IsZero())
require.True(t, proc.earliestRecordTime.IsZero())
require.True(t, proc.lastAppend.IsZero())
})
})
t.Run("should fail", func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
var (
ctx = t.Context()
reg = prometheus.NewRegistry()
builder = newTestBuilder(t, reg)
flushManager = &failureFlushManager{}
proc = newProcessor(builder, nil, flushManager, 5*time.Minute, 30*time.Minute, log.NewNopLogger(), reg)
)
// Process a record containing some log lines. No flush should occur.
rec := newTestRecord(t, "tenant", time.Now())
require.NoError(t, proc.processRecord(ctx, rec))
require.Equal(t, time.Now(), proc.firstAppend)
require.Equal(t, time.Now(), proc.lastAppend)
require.Equal(t, rec.Offset, proc.lastOffset)
// Advance time and force a flush. This flush should fail.
time.Sleep(time.Second)
require.EqualError(t, proc.flush(ctx, "forced"), "mock error")
// Despite the failure, the following fields should still be reset.
require.True(t, proc.firstAppend.IsZero())
require.True(t, proc.earliestRecordTime.IsZero())
require.True(t, proc.lastAppend.IsZero())
})
})
}
// newTestBuilder returns a new logsobj.Builder created from testBuilderCfg
// with in-memory scratch storage, and registers its metrics with reg. The
// test fails immediately if the builder cannot be created or its metrics
// cannot be registered.
func newTestBuilder(t *testing.T, reg prometheus.Registerer) *logsobj.Builder {
	// Attribute assertion failures to the calling test, not to this helper.
	t.Helper()

	b, err := logsobj.NewBuilder(testBuilderCfg, scratch.NewMemory())
	require.NoError(t, err)
	require.NoError(t, b.RegisterMetrics(reg))
	return b
}
// newTestRecord returns a new kgo.Record keyed by tenant and timestamped with
// now. Its value is a marshaled logproto.Stream with the labels {foo="bar"}
// and a single entry "baz" that is also timestamped with now. The test fails
// immediately if the stream cannot be marshaled.
func newTestRecord(t *testing.T, tenant string, now time.Time) *kgo.Record {
	// Attribute assertion failures to the calling test, not to this helper.
	t.Helper()

	stream := logproto.Stream{
		Labels: `{foo="bar"}`,
		Entries: []push.Entry{{
			Timestamp: now,
			Line:      "baz",
		}},
	}
	value, err := stream.Marshal()
	require.NoError(t, err)

	return &kgo.Record{
		Key:       []byte(tenant),
		Value:     value,
		Timestamp: now,
	}
}