Implement hooks to instrument query pipelines (#11493)

This PR implements interfaces to inject new pipelines/extractors into
the query path of queriers and ingesters.
pull/11525/head
Travis Patterson 1 year ago committed by GitHub
parent b51b7d7b55
commit be71a80b15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      pkg/ingester/checkpoint_test.go
  2. 29
      pkg/ingester/ingester.go
  3. 18
      pkg/ingester/instance.go
  4. 188
      pkg/ingester/instance_test.go
  5. 6
      pkg/logql/log/metrics_extraction.go
  6. 6
      pkg/logql/log/pipeline.go
  7. 7
      pkg/querier/querier_mock_test.go
  8. 27
      pkg/storage/store.go
  9. 158
      pkg/storage/store_test.go

@ -452,7 +452,7 @@ func Test_SeriesIterator(t *testing.T) {
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
for i := 0; i < 3; i++ {
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, NewStreamRateCalculator(), nil)
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), nil)
require.Nil(t, err)
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream1}}))
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream2}}))
@ -499,7 +499,7 @@ func Benchmark_SeriesIterator(b *testing.B) {
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
for i := range instances {
inst, _ := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("instance %d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, NewStreamRateCalculator(), nil)
inst, _ := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("instance %d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), nil)
require.NoError(b,
inst.Push(context.Background(), &logproto.PushRequest{

@ -12,6 +12,8 @@ import (
"sync"
"time"
lokilog "github.com/grafana/loki/pkg/logql/log"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
@ -99,7 +101,10 @@ type Config struct {
WAL WALConfig `yaml:"wal,omitempty" doc:"description=The ingester WAL (Write Ahead Log) records incoming logs and stores them on the local file systems in order to guarantee persistence of acknowledged data in the event of a process crash."`
ChunkFilterer chunk.RequestChunkFilterer `yaml:"-"`
ChunkFilterer chunk.RequestChunkFilterer `yaml:"-"`
PipelineWrapper lokilog.PipelineWrapper `yaml:"-"`
SampleExtractorWrapper lokilog.SampleExtractorWrapper `yaml:"-"`
// Optional wrapper that can be used to modify the behaviour of the ingester
Wrapper Wrapper `yaml:"-"`
@ -227,7 +232,9 @@ type Ingester struct {
wal WAL
chunkFilter chunk.RequestChunkFilterer
chunkFilter chunk.RequestChunkFilterer
extractorWrapper lokilog.SampleExtractorWrapper
pipelineWrapper lokilog.PipelineWrapper
streamRateCalculator *StreamRateCalculator
@ -304,6 +311,14 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
i.SetChunkFilterer(i.cfg.ChunkFilterer)
}
if i.cfg.PipelineWrapper != nil {
i.SetPipelineWrapper(i.cfg.PipelineWrapper)
}
if i.cfg.SampleExtractorWrapper != nil {
i.SetExtractorWrapper(i.cfg.SampleExtractorWrapper)
}
return i, nil
}
@ -311,6 +326,14 @@ func (i *Ingester) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {
i.chunkFilter = chunkFilter
}
func (i *Ingester) SetExtractorWrapper(wrapper lokilog.SampleExtractorWrapper) {
i.extractorWrapper = wrapper
}
func (i *Ingester) SetPipelineWrapper(wrapper lokilog.PipelineWrapper) {
i.pipelineWrapper = wrapper
}
// setupAutoForget looks for ring status if `AutoForgetUnhealthy` is enabled
// when enabled, unhealthy ingesters that reach `ring.kvstore.heartbeat_timeout` are removed from the ring every `HeartbeatPeriod`
func (i *Ingester) setupAutoForget() {
@ -837,7 +860,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
inst, ok = i.instances[instanceID]
if !ok {
var err error
inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter, i.streamRateCalculator, i.writeLogManager)
inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter, i.pipelineWrapper, i.extractorWrapper, i.streamRateCalculator, i.writeLogManager)
if err != nil {
return nil, err
}

@ -10,6 +10,8 @@ import (
"syscall"
"time"
"github.com/grafana/loki/pkg/logql/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/httpgrpc"
"github.com/opentracing/opentracing-go"
@ -109,6 +111,8 @@ type instance struct {
metrics *ingesterMetrics
chunkFilter chunk.RequestChunkFilterer
pipelineWrapper log.PipelineWrapper
extractorWrapper log.SampleExtractorWrapper
streamRateCalculator *StreamRateCalculator
writeFailures *writefailures.Manager
@ -126,6 +130,8 @@ func newInstance(
metrics *ingesterMetrics,
flushOnShutdownSwitch *OnceSwitch,
chunkFilter chunk.RequestChunkFilterer,
pipelineWrapper log.PipelineWrapper,
extractorWrapper log.SampleExtractorWrapper,
streamRateCalculator *StreamRateCalculator,
writeFailures *writefailures.Manager,
) (*instance, error) {
@ -153,7 +159,9 @@ func newInstance(
metrics: metrics,
flushOnShutdownSwitch: flushOnShutdownSwitch,
chunkFilter: chunkFilter,
chunkFilter: chunkFilter,
pipelineWrapper: pipelineWrapper,
extractorWrapper: extractorWrapper,
streamRateCalculator: streamRateCalculator,
@ -419,6 +427,10 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) (iter.E
return nil, err
}
if i.pipelineWrapper != nil {
pipeline = i.pipelineWrapper.Wrap(pipeline, expr.String())
}
stats := stats.FromContext(ctx)
var iters []iter.EntryIterator
@ -464,6 +476,10 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams
return nil, err
}
if i.extractorWrapper != nil {
extractor = i.extractorWrapper.Wrap(extractor, expr.String())
}
stats := stats.FromContext(ctx)
var iters []iter.SampleIterator

@ -10,6 +10,8 @@ import (
"testing"
"time"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/dskit/flagext"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
@ -65,7 +67,7 @@ func TestLabelsCollisions(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
i, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
i, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil)
require.Nil(t, err)
// avoid entries from the future.
@ -93,7 +95,7 @@ func TestConcurrentPushes(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil)
require.Nil(t, err)
const (
@ -145,7 +147,7 @@ func TestGetStreamRates(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil)
require.NoError(t, err)
const (
@ -239,7 +241,7 @@ func TestSyncPeriod(t *testing.T) {
minUtil = 0.20
)
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil)
require.Nil(t, err)
lbls := makeRandomLabels()
@ -284,7 +286,7 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) {
cfg.SyncMinUtilization = 0.20
cfg.IndexShards = indexShards
instance, err := newInstance(cfg, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
instance, err := newInstance(cfg, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil)
require.Nil(t, err)
currentTime := time.Now()
@ -493,7 +495,7 @@ func Benchmark_PushInstance(b *testing.B) {
require.NoError(b, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil)
ctx := context.Background()
for n := 0; n < b.N; n++ {
@ -537,7 +539,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
ctx := context.Background()
inst, _ := newInstance(&Config{}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
inst, _ := newInstance(&Config{}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil)
expr, err := syntax.ParseLogSelector(`{namespace="foo",pod="bar",instance=~"10.*"}`, true)
require.NoError(b, err)
t, err := newTailer("foo", expr, nil, 10)
@ -671,6 +673,172 @@ func Test_ChunkFilter(t *testing.T) {
}
}
func Test_PipelineWrapper(t *testing.T) {
instance := defaultInstance(t)
wrapper := &testPipelineWrapper{
pipeline: newMockPipeline(),
}
instance.pipelineWrapper = wrapper
it, err := instance.Query(context.TODO(),
logql.SelectLogParams{
QueryRequest: &logproto.QueryRequest{
Selector: `{job="3"}`,
Limit: uint32(2),
Start: time.Unix(0, 0),
End: time.Unix(0, 100000000),
Direction: logproto.BACKWARD,
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`{job="3"}`),
},
},
},
)
require.NoError(t, err)
defer it.Close()
for it.Next() {
// Consume the iterator
require.NoError(t, it.Error())
}
require.Equal(t, `{job="3"}`, wrapper.query)
require.Equal(t, 10, wrapper.pipeline.sp.called) // we've passed every log line through the wrapper
}
type testPipelineWrapper struct {
query string
pipeline *mockPipeline
}
func (t *testPipelineWrapper) Wrap(pipeline log.Pipeline, query string) log.Pipeline {
t.query = query
t.pipeline.wrappedExtractor = pipeline
return t.pipeline
}
func newMockPipeline() *mockPipeline {
return &mockPipeline{
sp: &mockStreamPipeline{},
}
}
type mockPipeline struct {
wrappedExtractor log.Pipeline
sp *mockStreamPipeline
}
func (p *mockPipeline) ForStream(l labels.Labels) log.StreamPipeline {
sp := p.wrappedExtractor.ForStream(l)
p.sp.wrappedSP = sp
return p.sp
}
func (p *mockPipeline) Reset() {}
// A stub always returns the same data
type mockStreamPipeline struct {
wrappedSP log.StreamPipeline
called int
}
func (p *mockStreamPipeline) BaseLabels() log.LabelsResult {
return p.wrappedSP.BaseLabels()
}
func (p *mockStreamPipeline) Process(ts int64, line []byte, lbs ...labels.Label) ([]byte, log.LabelsResult, bool) {
p.called++
return p.wrappedSP.Process(ts, line, lbs...)
}
func (p *mockStreamPipeline) ProcessString(ts int64, line string, lbs ...labels.Label) (string, log.LabelsResult, bool) {
p.called++
return p.wrappedSP.ProcessString(ts, line, lbs...)
}
func Test_ExtractorWrapper(t *testing.T) {
instance := defaultInstance(t)
wrapper := &testExtractorWrapper{
extractor: newMockExtractor(),
}
instance.extractorWrapper = wrapper
it, err := instance.QuerySample(context.TODO(),
logql.SelectSampleParams{
SampleQueryRequest: &logproto.SampleQueryRequest{
Selector: `sum(count_over_time({job="3"}[1m]))`,
Start: time.Unix(0, 0),
End: time.Unix(0, 100000000),
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`sum(count_over_time({job="3"}[1m]))`),
},
},
},
)
require.NoError(t, err)
defer it.Close()
for it.Next() {
// Consume the iterator
require.NoError(t, it.Error())
}
require.Equal(t, `sum(count_over_time({job="3"}[1m]))`, wrapper.query)
require.Equal(t, 10, wrapper.extractor.sp.called) // we've passed every log line through the wrapper
}
type testExtractorWrapper struct {
query string
extractor *mockExtractor
}
func (t *testExtractorWrapper) Wrap(extractor log.SampleExtractor, query string) log.SampleExtractor {
t.query = query
t.extractor.wrappedExtractor = extractor
return t.extractor
}
func newMockExtractor() *mockExtractor {
return &mockExtractor{
sp: &mockStreamExtractor{},
}
}
type mockExtractor struct {
wrappedExtractor log.SampleExtractor
sp *mockStreamExtractor
}
func (p *mockExtractor) ForStream(l labels.Labels) log.StreamSampleExtractor {
sp := p.wrappedExtractor.ForStream(l)
p.sp.wrappedSP = sp
return p.sp
}
func (p *mockExtractor) Reset() {}
// A stub always returns the same data
type mockStreamExtractor struct {
wrappedSP log.StreamSampleExtractor
called int
}
func (p *mockStreamExtractor) BaseLabels() log.LabelsResult {
return p.wrappedSP.BaseLabels()
}
func (p *mockStreamExtractor) Process(ts int64, line []byte, lbs ...labels.Label) (float64, log.LabelsResult, bool) {
p.called++
return p.wrappedSP.Process(ts, line, lbs...)
}
func (p *mockStreamExtractor) ProcessString(ts int64, line string, lbs ...labels.Label) (float64, log.LabelsResult, bool) {
p.called++
return p.wrappedSP.ProcessString(ts, line, lbs...)
}
func Test_QueryWithDelete(t *testing.T) {
instance := defaultInstance(t)
@ -824,7 +992,7 @@ func TestStreamShardingUsage(t *testing.T) {
})
t.Run("invalid push returns error", func(t *testing.T) {
i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, customTenant1, limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, customTenant1, limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil)
ctx := context.Background()
err = i.Push(ctx, &logproto.PushRequest{
@ -843,7 +1011,7 @@ func TestStreamShardingUsage(t *testing.T) {
})
t.Run("valid push returns no error", func(t *testing.T) {
i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, customTenant2, limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, customTenant2, limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil)
ctx := context.Background()
err = i.Push(ctx, &logproto.PushRequest{
@ -1174,6 +1342,8 @@ func defaultInstance(t *testing.T) *instance {
NilMetrics,
nil,
nil,
nil,
nil,
NewStreamRateCalculator(),
nil,
)

@ -38,6 +38,12 @@ type StreamSampleExtractor interface {
ProcessString(ts int64, line string, structuredMetadata ...labels.Label) (float64, LabelsResult, bool)
}
// SampleExtractorWrapper takes an extractor, wraps it is some desired functionality
// and returns a new pipeline
type SampleExtractorWrapper interface {
Wrap(extractor SampleExtractor, query string) SampleExtractor
}
type lineSampleExtractor struct {
Stage
LineExtractor

@ -35,6 +35,12 @@ type Stage interface {
RequiredLabelNames() []string
}
// PipelineWrapper takes a pipeline, wraps it is some desired functionality and
// returns a new pipeline
type PipelineWrapper interface {
Wrap(pipeline Pipeline, query string) Pipeline
}
// NewNoopPipeline creates a pipelines that does not process anything and returns log streams as is.
func NewNoopPipeline() Pipeline {
return &noopPipeline{

@ -6,6 +6,8 @@ import (
"fmt"
"time"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/dskit/grpcclient"
@ -298,8 +300,9 @@ type storeMock struct {
func newStoreMock() *storeMock {
return &storeMock{}
}
func (s *storeMock) SetChunkFilterer(chunk.RequestChunkFilterer) {}
func (s *storeMock) SetChunkFilterer(chunk.RequestChunkFilterer) {}
func (s *storeMock) SetExtractorWrapper(log.SampleExtractorWrapper) {}
func (s *storeMock) SetPipelineWrapper(log.PipelineWrapper) {}
func (s *storeMock) SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error) {
args := s.Called(ctx, req)

@ -6,6 +6,8 @@ import (
"math"
"time"
lokilog "github.com/grafana/loki/pkg/logql/log"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
@ -57,10 +59,17 @@ type SchemaConfigProvider interface {
GetSchemaConfigs() []config.PeriodConfig
}
type Instrumentable interface {
SetExtractorWrapper(wrapper lokilog.SampleExtractorWrapper)
SetPipelineWrapper(wrapper lokilog.PipelineWrapper)
}
type Store interface {
stores.Store
SelectStore
SchemaConfigProvider
Instrumentable
}
type LokiStore struct {
@ -84,6 +93,8 @@ type LokiStore struct {
logger log.Logger
chunkFilterer chunk.RequestChunkFilterer
extractorWrapper lokilog.SampleExtractorWrapper
pipelineWrapper lokilog.PipelineWrapper
congestionControllerFactory func(cfg congestion.Config, logger log.Logger, metrics *congestion.Metrics) congestion.Controller
metricsNamespace string
@ -381,6 +392,14 @@ func (s *LokiStore) SetChunkFilterer(chunkFilterer chunk.RequestChunkFilterer) {
s.Store.SetChunkFilterer(chunkFilterer)
}
func (s *LokiStore) SetExtractorWrapper(wrapper lokilog.SampleExtractorWrapper) {
s.extractorWrapper = wrapper
}
func (s *LokiStore) SetPipelineWrapper(wrapper lokilog.PipelineWrapper) {
s.pipelineWrapper = wrapper
}
// lazyChunks is an internal function used to resolve a set of lazy chunks from the store without actually loading them. It's used internally by `LazyQuery` and `GetSeries`
func (s *LokiStore) lazyChunks(ctx context.Context, matchers []*labels.Matcher, from, through model.Time) ([]*LazyChunk, error) {
userID, err := tenant.TenantID(ctx)
@ -493,6 +512,10 @@ func (s *LokiStore) SelectLogs(ctx context.Context, req logql.SelectLogParams) (
return nil, err
}
if s.pipelineWrapper != nil {
pipeline = s.pipelineWrapper.Wrap(pipeline, expr.String())
}
var chunkFilterer chunk.Filterer
if s.chunkFilterer != nil {
chunkFilterer = s.chunkFilterer.ForRequest(ctx)
@ -531,6 +554,10 @@ func (s *LokiStore) SelectSamples(ctx context.Context, req logql.SelectSamplePar
return nil, err
}
if s.extractorWrapper != nil {
extractor = s.extractorWrapper.Wrap(extractor, expr.String())
}
var chunkFilterer chunk.Filterer
if s.chunkFilterer != nil {
chunkFilterer = s.chunkFilterer.ForRequest(ctx)

@ -14,6 +14,8 @@ import (
"testing"
"time"
lokilog "github.com/grafana/loki/pkg/logql/log"
"github.com/cespare/xxhash/v2"
"github.com/grafana/dskit/user"
"github.com/prometheus/common/model"
@ -894,6 +896,162 @@ func Test_ChunkFilterer(t *testing.T) {
}
}
func Test_PipelineWrapper(t *testing.T) {
s := &LokiStore{
Store: storeFixture,
cfg: Config{
MaxChunkBatchSize: 10,
},
chunkMetrics: NilMetrics,
}
wrapper := &testPipelineWrapper{
pipeline: newMockPipeline(),
}
s.SetPipelineWrapper(wrapper)
ctx = user.InjectOrgID(context.Background(), "test-user")
logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), nil, nil)})
if err != nil {
t.Errorf("store.SelectLogs() error = %v", err)
return
}
defer logit.Close()
for logit.Next() {
require.NoError(t, logit.Error()) // consume the iterator
}
require.Equal(t, "{foo=~\"ba.*\"}", wrapper.query)
require.Equal(t, 28, wrapper.pipeline.sp.called) // we've passed every log line through the wrapper
}
type testPipelineWrapper struct {
query string
pipeline *mockPipeline
}
func (t *testPipelineWrapper) Wrap(pipeline lokilog.Pipeline, query string) lokilog.Pipeline {
t.query = query
t.pipeline.wrappedExtractor = pipeline
return t.pipeline
}
func newMockPipeline() *mockPipeline {
return &mockPipeline{
sp: &mockStreamPipeline{},
}
}
type mockPipeline struct {
wrappedExtractor lokilog.Pipeline
sp *mockStreamPipeline
}
func (p *mockPipeline) ForStream(l labels.Labels) lokilog.StreamPipeline {
sp := p.wrappedExtractor.ForStream(l)
p.sp.wrappedSP = sp
return p.sp
}
func (p *mockPipeline) Reset() {}
// A stub always returns the same data
type mockStreamPipeline struct {
wrappedSP lokilog.StreamPipeline
called int
}
func (p *mockStreamPipeline) BaseLabels() lokilog.LabelsResult {
return p.wrappedSP.BaseLabels()
}
func (p *mockStreamPipeline) Process(ts int64, line []byte, lbs ...labels.Label) ([]byte, lokilog.LabelsResult, bool) {
p.called++
return p.wrappedSP.Process(ts, line, lbs...)
}
func (p *mockStreamPipeline) ProcessString(ts int64, line string, lbs ...labels.Label) (string, lokilog.LabelsResult, bool) {
p.called++
return p.wrappedSP.ProcessString(ts, line, lbs...)
}
func Test_SampleWrapper(t *testing.T) {
s := &LokiStore{
Store: storeFixture,
cfg: Config{
MaxChunkBatchSize: 10,
},
chunkMetrics: NilMetrics,
}
wrapper := &testExtractorWrapper{
extractor: newMockExtractor(),
}
s.SetExtractorWrapper(wrapper)
ctx = user.InjectOrgID(context.Background(), "test-user")
it, err := s.SelectSamples(ctx, logql.SelectSampleParams{SampleQueryRequest: newSampleQuery("count_over_time({foo=~\"ba.*\"}[1s])", from, from.Add(1*time.Hour), nil)})
if err != nil {
t.Errorf("store.SelectSamples() error = %v", err)
return
}
defer it.Close()
for it.Next() {
require.NoError(t, it.Error()) // consume the iterator
}
require.Equal(t, "count_over_time({foo=~\"ba.*\"}[1s])", wrapper.query)
require.Equal(t, 28, wrapper.extractor.sp.called) // we've passed every log line through the wrapper
}
type testExtractorWrapper struct {
query string
extractor *mockExtractor
}
func (t *testExtractorWrapper) Wrap(extractor lokilog.SampleExtractor, query string) lokilog.SampleExtractor {
t.query = query
t.extractor.wrappedExtractor = extractor
return t.extractor
}
func newMockExtractor() *mockExtractor {
return &mockExtractor{
sp: &mockStreamExtractor{},
}
}
type mockExtractor struct {
wrappedExtractor lokilog.SampleExtractor
sp *mockStreamExtractor
}
func (p *mockExtractor) ForStream(l labels.Labels) lokilog.StreamSampleExtractor {
sp := p.wrappedExtractor.ForStream(l)
p.sp.wrappedSP = sp
return p.sp
}
func (p *mockExtractor) Reset() {}
// A stub always returns the same data
type mockStreamExtractor struct {
wrappedSP lokilog.StreamSampleExtractor
called int
}
func (p *mockStreamExtractor) BaseLabels() lokilog.LabelsResult {
return p.wrappedSP.BaseLabels()
}
func (p *mockStreamExtractor) Process(ts int64, line []byte, lbs ...labels.Label) (float64, lokilog.LabelsResult, bool) {
p.called++
return p.wrappedSP.Process(ts, line, lbs...)
}
func (p *mockStreamExtractor) ProcessString(ts int64, line string, lbs ...labels.Label) (float64, lokilog.LabelsResult, bool) {
p.called++
return p.wrappedSP.ProcessString(ts, line, lbs...)
}
func Test_store_GetSeries(t *testing.T) {
periodConfig := config.PeriodConfig{
From: config.DayTime{Time: 0},

Loading…
Cancel
Save