Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/engine/internal/util/objtest/objtest.go

231 lines
7.0 KiB

// Package objtest provides support for creating a data object storage directory
// for testing purposes.
package objtest
import (
"context"
"errors"
"flag"
"fmt"
"os"
"path/filepath"
"strings"
"testing"
"github.com/go-kit/log"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/filesystem"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj"
"github.com/grafana/loki/v3/pkg/dataobj/index"
"github.com/grafana/loki/v3/pkg/dataobj/index/indexobj"
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
"github.com/grafana/loki/v3/pkg/dataobj/uploader"
"github.com/grafana/loki/v3/pkg/logproto"
)
// Tenant is the tenant used for storing logs with a [Builder].
const Tenant = "objtest"
// Builder is a directory holding logs data objects and index data objects.
// Logs can be appended to the builder using [Builder.Append]. After all logs
// have been appended, the builder must be closed using the [Builder.Close].
type Builder struct {
t *testing.T // Test associated with the store
dir string // Actual directory holding data
logger log.Logger
dirty bool // Whether there's any pending data to flush.
uploader *uploader.Uploader
bucket, indexBucket objstore.Bucket
logsBuilder *logsobj.Builder
logsMetastoreToc, indexMetastoreToc *metastore.TableOfContentsWriter
}
// NewBuilder creates a builder that can be used for accumulating logs.
func NewBuilder(t *testing.T) *Builder {
logger := log.NewNopLogger()
dir := t.TempDir()
require.NoError(t, os.MkdirAll(filepath.Join(dir, "tocs"), 0700), "could not create object toc directory")
require.NoError(t, os.MkdirAll(filepath.Join(dir, "index/v0/tocs"), 0700), "could not create index toc directory")
bucket, err := filesystem.NewBucket(dir)
require.NoError(t, err, "expected to be able to create bucket")
var builderConfig logsobj.BuilderConfig
builderConfig.RegisterFlagsWithPrefix("", flag.NewFlagSet("", flag.PanicOnError)) // Acquire defaults
logsBuilder, err := logsobj.NewBuilder(builderConfig, nil)
require.NoError(t, err, "expected to be able to create logs builder")
indexWriterBucket := objstore.NewPrefixedBucket(bucket, "index/v0")
logsMetastoreToc := metastore.NewTableOfContentsWriter(bucket, logger)
indexMetastoreToc := metastore.NewTableOfContentsWriter(indexWriterBucket, logger)
return &Builder{
t: t,
dir: dir,
logger: logger,
uploader: uploader.New(uploader.Config{SHAPrefixSize: 2}, bucket, logger),
bucket: bucket,
indexBucket: indexWriterBucket,
logsBuilder: logsBuilder,
logsMetastoreToc: logsMetastoreToc,
indexMetastoreToc: indexMetastoreToc,
}
}
// Append appends the given streams to the builder.
func (b *Builder) Append(ctx context.Context, streams ...logproto.Stream) {
for _, stream := range streams {
if err := b.logsBuilder.Append(Tenant, stream); err != nil && errors.Is(err, logsobj.ErrBuilderFull) {
require.NoError(b.t, b.flush(ctx), "failed to flush logs builder")
// Try appending again after the flush.
require.NoError(b.t, b.logsBuilder.Append(Tenant, stream), "failed to append stream after flush")
} else if err != nil {
require.NoError(b.t, err, "failed to append stream")
}
b.dirty = true
}
}
// flush flushes any pending data in the logs builder and writes a logs
// metastore entry.
func (b *Builder) flush(ctx context.Context) error {
if !b.dirty {
// Nothing to do.
return nil
}
timeRanges := b.logsBuilder.TimeRanges()
obj, closer, err := b.logsBuilder.Flush()
if err != nil {
return fmt.Errorf("flushing builder: %w", err)
}
defer closer.Close()
// Upload the logs object.
path, err := b.uploader.Upload(ctx, obj)
if err != nil {
return fmt.Errorf("uploading logs object: %w", err)
}
if err := b.logsMetastoreToc.WriteEntry(ctx, path, timeRanges); err != nil {
return fmt.Errorf("updating metastore: %w", err)
}
b.logsBuilder.Reset()
b.dirty = false
return nil
}
// Close flushes all remaining data and closes the builder.
func (b *Builder) Close() {
require.NoError(b.t, b.flush(b.t.Context()), "must be able to flush logs builder")
require.NoError(b.t, b.buildIndex(b.t.Context()), "must be able to close logs builder")
}
func (b *Builder) buildIndex(ctx context.Context) error {
var builderConfig logsobj.BuilderConfig
builderConfig.RegisterFlagsWithPrefix("", flag.NewFlagSet("", flag.PanicOnError)) // Acquire defaults
indexBuilder, err := indexobj.NewBuilder(builderConfig.BuilderBaseConfig, nil)
if err != nil {
return fmt.Errorf("creating logs builder: %w", err)
}
calculator := index.NewCalculator(indexBuilder)
var (
count int
objectsPerIndex = 16
)
err = b.bucket.Iter(ctx, "", func(name string) error {
if !strings.Contains(name, "objects") {
return nil
}
reader, err := dataobj.FromBucket(ctx, b.bucket, name)
if err != nil {
return fmt.Errorf("reading object: %w", err)
}
if err := calculator.Calculate(ctx, b.logger, reader, name); err != nil {
return fmt.Errorf("calculating index: %w", err)
}
count++
if count%objectsPerIndex != 0 {
// Stop early if we haven't accumulated enough objects yet.
return nil
}
if err := b.flushAndUpload(ctx, calculator); err != nil {
return fmt.Errorf("flushing and uploading index: %w", err)
}
return nil
}, objstore.WithRecursiveIter())
if err != nil {
return fmt.Errorf("iterating over objects: %w", err)
}
if count%objectsPerIndex != 0 {
if err := b.flushAndUpload(ctx, calculator); err != nil {
return fmt.Errorf("failed to flush and upload index: %w", err)
}
}
return nil
}
func (b *Builder) flushAndUpload(ctx context.Context, calculator *index.Calculator) error {
timeRanges := calculator.TimeRanges()
obj, closer, err := calculator.Flush()
if err != nil {
return fmt.Errorf("failed to flush index: %w", err)
}
defer closer.Close()
key, err := index.ObjectKey(ctx, obj)
if err != nil {
return fmt.Errorf("failed to create object key: %w", err)
}
reader, err := obj.Reader(ctx)
if err != nil {
return fmt.Errorf("failed to create reader for index object: %w", err)
}
defer reader.Close()
if err := b.indexBucket.Upload(ctx, key, reader); err != nil {
return fmt.Errorf("failed to upload index: %w", err)
} else if err := b.indexMetastoreToc.WriteEntry(ctx, key, timeRanges); err != nil {
return fmt.Errorf("failed to update metastore: %w", err)
}
calculator.Reset()
return nil
}
// Location holds information about where objects for a [Builder] are stored.
// Location can be used to read data from a builder.
type Location struct {
Bucket objstore.Bucket // Bucket where all data is stored.
IndexPrefix string // Prefix for index objects in the Bucket.
}
// Location returns the location of index for b so they can be read. Data is not
// guaranteed to exist in the location until calling [Builder.Close].
func (b *Builder) Location() Location {
return Location{
Bucket: b.bucket,
IndexPrefix: "index/v0",
}
}