Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
loki/pkg/dataobj/consumer/partition_processor_test.go

323 lines
7.7 KiB

package consumer
import (
"bytes"
"context"
"errors"
"io"
"strings"
"sync"
"testing"
"time"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"github.com/twmb/franz-go/pkg/kgo"
refactor(dataobj): invert dependency between dataobj and sections (#17762) Originally, the dataobj package was a higher-level API around sections. This design caused it to become a bottleneck: * Implementing any new public behaviour for a section required bubbling it up to the dataobj API for it to be exposed, making it tedious to add new sections or update existing ones. * The `dataobj.Builder` pattern was focused on constructing dataobjs for storing log data, which will cause friction as we build objects around other use cases. This PR builds on top of the foundation laid out by #17704 and #17708, fully inverting the dependency between dataobj and sections: * The `dataobj` package has no knowledge of what sections exist, and can now be used for writing and reading generic sections. Section packages now create higher-level APIs around the abstractions provided by `dataobj`. * Section packages are now public, and callers interact directly with these packages for writing and reading section-specific data. * All logic for a section (encoding, decoding, buffering, reading) is now fully self-contained inside the section package. Previously, the implementation of each section was spread across three packages (`pkg/dataobj/internal/encoding`, `pkg/dataobj/internal/sections/SECTION`, `pkg/dataobj`). * Cutting a section is now a decision made by the caller rather than the section implementation. Previously, the logs section builder would create multiple sections. For the most part, this change is a no-op, with two exceptions: 1. Section cutting is now performed by the caller; however, this shouldn't result in any issues. 2. Removing the high-level `dataobj.Stream` and `dataobj.Record` types will temporarily reduce the allocation gains from #16988. I will address this after this PR is merged.
8 months ago
"github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj"
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
"github.com/grafana/loki/v3/pkg/dataobj/uploader"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/pkg/push"
)
refactor(dataobj): invert dependency between dataobj and sections (#17762) Originally, the dataobj package was a higher-level API around sections. This design caused it to become a bottleneck: * Implementing any new public behaviour for a section required bubbling it up to the dataobj API for it to be exposed, making it tedious to add new sections or update existing ones. * The `dataobj.Builder` pattern was focused on constructing dataobjs for storing log data, which will cause friction as we build objects around other use cases. This PR builds on top of the foundation laid out by #17704 and #17708, fully inverting the dependency between dataobj and sections: * The `dataobj` package has no knowledge of what sections exist, and can now be used for writing and reading generic sections. Section packages now create higher-level APIs around the abstractions provided by `dataobj`. * Section packages are now public, and callers interact directly with these packages for writing and reading section-specific data. * All logic for a section (encoding, decoding, buffering, reading) is now fully self-contained inside the section package. Previously, the implementation of each section was spread across three packages (`pkg/dataobj/internal/encoding`, `pkg/dataobj/internal/sections/SECTION`, `pkg/dataobj`). * Cutting a section is now a decision made by the caller rather than the section implementation. Previously, the logs section builder would create multiple sections. For the most part, this change is a no-op, with two exceptions: 1. Section cutting is now performed by the caller; however, this shouldn't result in any issues. 2. Removing the high-level `dataobj.Stream` and `dataobj.Record` types will temporarily reduce the allocation gains from #16988. I will address this after this PR is merged.
8 months ago
var testBuilderConfig = logsobj.BuilderConfig{
TargetPageSize: 2048,
TargetObjectSize: 1 << 22, // 4 MiB
TargetSectionSize: 1 << 22, // 4 MiB
BufferSize: 2048 * 8,
SectionStripeMergeLimit: 2,
}
// mockBucket implements objstore.Bucket interface for testing
type mockBucket struct {
uploads map[string][]byte
mu sync.Mutex
}
func newMockBucket() *mockBucket {
return &mockBucket{
uploads: make(map[string][]byte),
}
}
func (m *mockBucket) Close() error { return nil }
func (m *mockBucket) Delete(_ context.Context, _ string) error { return nil }
func (m *mockBucket) Exists(_ context.Context, name string) (bool, error) {
m.mu.Lock()
defer m.mu.Unlock()
_, exists := m.uploads[name]
return exists, nil
}
func (m *mockBucket) Get(_ context.Context, name string) (io.ReadCloser, error) {
m.mu.Lock()
defer m.mu.Unlock()
data, exists := m.uploads[name]
if !exists {
return nil, errors.New("object not found")
}
return io.NopCloser(bytes.NewReader(data)), nil
}
func (m *mockBucket) GetRange(_ context.Context, _ string, _, _ int64) (io.ReadCloser, error) {
return nil, nil
}
func (m *mockBucket) Upload(_ context.Context, name string, r io.Reader) error {
data, err := io.ReadAll(r)
if err != nil {
return err
}
m.mu.Lock()
defer m.mu.Unlock()
m.uploads[name] = data
return nil
}
func (m *mockBucket) Iter(_ context.Context, _ string, _ func(string) error, _ ...objstore.IterOption) error {
return nil
}
func (m *mockBucket) Name() string { return "mock" }
func (m *mockBucket) Attributes(_ context.Context, _ string) (objstore.ObjectAttributes, error) {
return objstore.ObjectAttributes{}, nil
}
func (m *mockBucket) GetAndReplace(_ context.Context, name string, _ func(io.ReadCloser) (io.ReadCloser, error)) error {
return m.Upload(context.Background(), name, io.NopCloser(bytes.NewReader([]byte{})))
}
func (m *mockBucket) IsAccessDeniedErr(_ error) bool {
return false
}
func (m *mockBucket) IsObjNotFoundErr(err error) bool {
return err != nil && err.Error() == "object not found"
}
func (m *mockBucket) IterWithAttributes(_ context.Context, _ string, _ func(objstore.IterObjectAttributes) error, _ ...objstore.IterOption) error {
return nil
}
func (m *mockBucket) Provider() objstore.ObjProvider {
return objstore.ObjProvider("MOCK")
}
func (m *mockBucket) SupportedIterOptions() []objstore.IterOptionType {
return nil
}
// TestIdleFlush tests the idle flush behavior of the partition processor
// under different timeout and initialization conditions.
func TestIdleFlush(t *testing.T) {
tests := []struct {
name string
idleTimeout time.Duration
sleepDuration time.Duration
expectFlush bool
initBuilder bool
}{
{
name: "should not flush before idle timeout",
idleTimeout: 1 * time.Second,
sleepDuration: 100 * time.Millisecond,
expectFlush: false,
initBuilder: true,
},
{
name: "should flush after idle timeout",
idleTimeout: 100 * time.Millisecond,
sleepDuration: 1 * time.Second,
expectFlush: true,
initBuilder: true,
},
{
name: "should not flush if builder is nil",
idleTimeout: 100 * time.Millisecond,
sleepDuration: 100 * time.Millisecond,
expectFlush: false,
initBuilder: false,
},
{
name: "should not flush if last modified is less than idle timeout",
idleTimeout: 1 * time.Second,
sleepDuration: 100 * time.Millisecond,
expectFlush: false,
initBuilder: true,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
// Setup test dependencies
bucket := newMockBucket()
bufPool := &sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, 1024))
},
}
// Create processor with test configuration
p := newPartitionProcessor(
context.Background(),
&kgo.Client{},
testBuilderConfig,
uploader.Config{},
metastore.Config{},
bucket,
"test-tenant",
0,
"test-topic",
0,
log.NewNopLogger(),
prometheus.NewRegistry(),
bufPool,
tc.idleTimeout,
nil,
)
if tc.initBuilder {
require.NoError(t, p.initBuilder())
p.start()
defer p.stop()
}
// Record initial flush time
initialFlushTime := p.lastFlush
stream := logproto.Stream{
Labels: `{cluster="test",app="foo"}`,
Entries: []push.Entry{{
Timestamp: time.Now().UTC(),
Line: strings.Repeat("a", 1024),
}},
}
streamBytes, err := stream.Marshal()
require.NoError(t, err)
// Send a record to the processor
p.records <- &kgo.Record{
Value: streamBytes,
Key: []byte("test-tenant"),
}
// Wait for specified duration
time.Sleep(tc.sleepDuration)
// Trigger idle flush check
p.idleFlush()
if tc.expectFlush {
require.True(t, p.lastFlush.After(initialFlushTime), "expected flush to occur")
} else {
require.Equal(t, initialFlushTime, p.lastFlush, "expected no flush to occur")
}
})
}
}
// TestIdleFlushWithActiveProcessing tests the idle flush behavior
// while the processor is actively processing records.
func TestIdleFlushWithActiveProcessing(t *testing.T) {
t.Parallel()
// Setup test dependencies
bucket := newMockBucket()
bufPool := &sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, 1024))
},
}
p := newPartitionProcessor(
context.Background(),
&kgo.Client{},
testBuilderConfig,
uploader.Config{},
metastore.Config{},
bucket,
"test-tenant",
0,
"test-topic",
0,
log.NewNopLogger(),
prometheus.NewRegistry(),
bufPool,
200*time.Millisecond,
nil,
)
require.NoError(t, p.initBuilder())
// Start the processor
p.start()
defer p.stop()
stream := logproto.Stream{
Labels: `{cluster="test",app="foo"}`,
Entries: []push.Entry{{
Timestamp: time.Now().UTC(),
Line: strings.Repeat("a", 1024),
}},
}
streamBytes, err := stream.Marshal()
require.NoError(t, err)
// Send a record to the processor
p.records <- &kgo.Record{
Value: streamBytes,
Key: []byte("test-tenant"),
}
// Record initial flush time
initialFlushTime := p.lastFlush
// Wait longer than idle timeout
time.Sleep(300 * time.Millisecond)
// Verify that idle flush occurred
require.True(t, p.lastFlush.After(initialFlushTime), "expected idle flush to occur while processor is running")
}
// TestIdleFlushWithEmptyData tests the idle flush behavior
// when no data has been processed.
func TestIdleFlushWithEmptyData(t *testing.T) {
t.Parallel()
// Setup test dependencies
bucket := newMockBucket()
bufPool := &sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, 1024))
},
}
p := newPartitionProcessor(
context.Background(),
&kgo.Client{},
testBuilderConfig,
uploader.Config{},
metastore.Config{},
bucket,
"test-tenant",
0,
"test-topic",
0,
log.NewNopLogger(),
prometheus.NewRegistry(),
bufPool,
200*time.Millisecond,
nil,
)
require.NoError(t, p.initBuilder())
// Start the processor
p.start()
defer p.stop()
// Record initial flush time
initialFlushTime := p.lastFlush
// Wait longer than idle timeout
time.Sleep(300 * time.Millisecond)
// Verify that idle flush occurred
require.True(t, p.lastFlush.Equal(initialFlushTime), "expected no idle flush with empty data")
}