Add LabelFilterer and Store wrapper (#4818)

* Add LabelFilterer and Store wrapper

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Address review comments

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>
pull/4842/head
Michel Hollands 4 years ago committed by GitHub
parent c53457feb9
commit e69ceefd84
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 34
      pkg/ingester/ingester.go
  2. 130
      pkg/ingester/ingester_test.go
  3. 1
      pkg/querier/querier.go

@ -82,6 +82,7 @@ type Config struct {
WAL WALConfig `yaml:"wal,omitempty"`
ChunkFilterer storage.RequestChunkFilterer `yaml:"-"`
LabelFilterer LabelValueFilterer `yaml:"-"`
IndexShards int `yaml:"index_shards"`
}
@ -125,12 +126,17 @@ func (cfg *Config) Validate() error {
}
if cfg.IndexShards <= 0 {
return fmt.Errorf("Invalid ingester index shard factor: %d", cfg.IndexShards)
return fmt.Errorf("invalid ingester index shard factor: %d", cfg.IndexShards)
}
return nil
}
// ChunkFilterer filters chunks based on the metric.
type LabelValueFilterer interface {
Filter(ctx context.Context, labelName string, labelValues []string) ([]string, error)
}
// Ingester builds chunks for incoming log streams.
type Ingester struct {
services.Service
@ -173,6 +179,7 @@ type Ingester struct {
wal WAL
chunkFilter storage.RequestChunkFilterer
labelFilter LabelValueFilterer
}
// ChunkStore is the interface we need to store chunks.
@ -245,6 +252,10 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
i.SetChunkFilterer(i.cfg.ChunkFilterer)
}
if i.cfg.LabelFilterer != nil {
i.SetLabelFilterer(i.cfg.LabelFilterer)
}
return i, nil
}
@ -252,6 +263,10 @@ func (i *Ingester) SetChunkFilterer(chunkFilter storage.RequestChunkFilterer) {
i.chunkFilter = chunkFilter
}
func (i *Ingester) SetLabelFilterer(labelFilter LabelValueFilterer) {
i.labelFilter = labelFilter
}
// setupAutoForget looks for ring status if `AutoForgetUnhealthy` is enabled
// when enabled, unhealthy ingesters that reach `ring.kvstore.heartbeat_timeout` are removed from the ring every `HeartbeatPeriod`
func (i *Ingester) setupAutoForget() {
@ -717,8 +732,23 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp
}
}
allValues := listutil.MergeStringLists(resp.Values, storeValues)
if req.Values && i.labelFilter != nil {
var filteredValues []string
filteredValues, err = i.labelFilter.Filter(ctx, req.Name, allValues)
if err != nil {
return nil, err
}
return &logproto.LabelResponse{
Values: filteredValues,
}, nil
}
return &logproto.LabelResponse{
Values: listutil.MergeStringLists(resp.Values, storeValues),
Values: allValues,
}, nil
}

@ -278,8 +278,8 @@ func (s *mockStore) SelectSamples(ctx context.Context, req logql.SelectSamplePar
return nil, nil
}
func (s *mockStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error) {
return nil, nil, nil
func (s *mockStore) GetSeries(ctx context.Context, req logql.SelectLogParams) ([]logproto.SeriesIdentifier, error) {
return nil, nil
}
func (s *mockStore) GetSchemaConfigs() []chunk.PeriodConfig {
@ -289,6 +289,41 @@ func (s *mockStore) GetSchemaConfigs() []chunk.PeriodConfig {
func (s *mockStore) SetChunkFilterer(_ storage.RequestChunkFilterer) {
}
// chunk.Store methods
func (s *mockStore) PutOne(ctx context.Context, from, through model.Time, chunk chunk.Chunk) error {
return nil
}
func (s *mockStore) Get(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]chunk.Chunk, error) {
return nil, nil
}
func (s *mockStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error) {
return nil, nil, nil
}
func (s *mockStore) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string) ([]string, error) {
return []string{"val1", "val2"}, nil
}
func (s *mockStore) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
return nil, nil
}
func (s *mockStore) GetChunkFetcher(tm model.Time) *chunk.Fetcher {
return nil
}
func (s *mockStore) DeleteChunk(ctx context.Context, from, through model.Time, userID, chunkID string, metric labels.Labels, partiallyDeletedInterval *model.Interval) error {
return nil
}
func (s *mockStore) DeleteSeriesIDs(ctx context.Context, from, through model.Time, userID string, metric labels.Labels) error {
return nil
}
func (s *mockStore) Stop() {}
type mockQuerierServer struct {
ctx context.Context
resps []*logproto.QueryResponse
@ -559,3 +594,94 @@ func Test_InMemoryLabels(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []string{"bar", "foo"}, res.Values)
}
func InMemoryLabels(t *testing.T, labelFilterer LabelValueFilterer, expectedValues []string) {
ingesterConfig := defaultIngesterTestConfig(t)
ingesterConfig.QueryStore = true
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
store := &mockStore{
chunks: map[string][]chunk.Chunk{},
}
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
i.labelFilter = labelFilterer
future := time.Now().Local().Add(time.Minute * 5)
i.periodicConfigs = []chunk.PeriodConfig{
{
From: chunk.DayTime{
Time: model.TimeFromUnix(future.Unix()),
},
},
}
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
req := logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: `{foo="bar",bar="baz1"}`,
},
{
Labels: `{foo="bar",bar="baz2"}`,
},
},
}
for i := 0; i < 10; i++ {
req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: fmt.Sprintf("line %d", i),
})
req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: fmt.Sprintf("line %d", i),
})
}
ctx := user.InjectOrgID(context.Background(), "test")
_, err = i.Push(ctx, &req)
require.NoError(t, err)
start := time.Now().Add(-5 * time.Minute)
end := time.Now()
res, err := i.Label(ctx, &logproto.LabelRequest{
Name: "bar",
Values: true,
Start: &start,
End: &end,
})
require.NoError(t, err)
require.Equal(t, expectedValues, res.Values)
res, err = i.Label(ctx, &logproto.LabelRequest{})
require.NoError(t, err)
require.Equal(t, []string{"bar", "foo"}, res.Values)
}
func Test_InMemoryLabels_WithoutLabelFilter(t *testing.T) {
expectedValues := []string{"baz1", "baz2", "val1", "val2"}
InMemoryLabels(t, nil, expectedValues)
}
// DummyLabelValuedFilterer adds i to the front of the label value
type DummyLabelValueFilterer struct{}
func (*DummyLabelValueFilterer) Filter(ctx context.Context, labelName string, labelValues []string) ([]string, error) {
var updatedValues []string
for _, v := range labelValues {
updatedValues = append(updatedValues, fmt.Sprintf("i%v", v))
}
return updatedValues, nil
}
func Test_InMemoryLabels_WithLabelFilter(t *testing.T) {
labelFilter := DummyLabelValueFilterer{}
expectedValues := []string{"ibaz1", "ibaz2", "ival1", "ival2"}
InMemoryLabels(t, &labelFilter, expectedValues)
}

@ -290,7 +290,6 @@ func (q *Querier) Label(ctx context.Context, req *logproto.LabelRequest) (*logpr
}
results := append(ingesterValues, storeValues)
return &logproto.LabelResponse{
Values: listutil.MergeStringLists(results...),
}, nil

Loading…
Cancel
Save