Add limit to the series volume endpoint (#9833)

To protect index gateways, this PR adds a per-tenant limit to the
`series_volume` endpoint so the accumulation of series can't grow
without bound.
pull/9836/head
Travis Patterson 3 years ago committed by GitHub
parent 72ea9d778c
commit cb2b3ca9ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      docs/sources/configure/_index.md
  2. 26
      pkg/storage/stores/index/seriesvolume/volume.go
  3. 43
      pkg/storage/stores/index/seriesvolume/volume_test.go
  4. 1
      pkg/storage/stores/indexshipper/downloads/table_manager.go
  5. 5
      pkg/storage/stores/indexshipper/downloads/table_manager_test.go
  6. 4
      pkg/storage/stores/tsdb/head_manager_test.go
  7. 20
      pkg/storage/stores/tsdb/index_client.go
  8. 24
      pkg/storage/stores/tsdb/index_client_test.go
  9. 12
      pkg/storage/stores/tsdb/single_file_index.go
  10. 16
      pkg/storage/stores/tsdb/single_file_index_test.go
  11. 2
      pkg/storage/stores/tsdb/store.go
  12. 7
      pkg/validation/limits.go
  13. 4
      pkg/validation/limits_test.go

@ -2475,6 +2475,10 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# Enable log-volume endpoints.
[volume_enabled: <boolean>]
# The maximum number of aggregated series in a log-volume response.
# CLI flag: -limits.volume-max-series
[volume_max_series: <int> | default = 1000]
# Duration to delay the evaluation of rules to ensure the underlying metrics
# have been pushed to Cortex.
# CLI flag: -ruler.evaluation-delay-duration

@ -1,6 +1,7 @@
package seriesvolume
import (
"fmt"
"sort"
"sync"
@ -10,29 +11,36 @@ import (
const (
	MatchAny     = "{}"
	DefaultLimit = 100

	// ErrVolumeMaxSeriesHit is the format string of the error returned when an
	// Accumulator grows past its configured maximum number of distinct series.
	// The single %d verb is the configured limit.
	ErrVolumeMaxSeriesHit = "the query hit the max number of series limit (limit: %d series)"
)

// TODO(masslessparticle): Lock striping to reduce contention on this map
// Accumulator aggregates volumes per series name behind a mutex and enforces
// a cap on how many distinct series may accumulate.
type Accumulator struct {
	lock            sync.RWMutex
	volumes         map[string]uint64
	limit           int32
	volumeMaxSeries int
}

// NewAccumulator returns an Accumulator that reports at most limit series in
// its response and rejects additions once more than maxSize distinct series
// have been accumulated.
func NewAccumulator(limit int32, maxSize int) *Accumulator {
	return &Accumulator{
		volumes:         make(map[string]uint64),
		limit:           limit,
		volumeMaxSeries: maxSize,
	}
}

// AddVolume adds size to the running total for the named series. It returns
// an error formatted from ErrVolumeMaxSeriesHit once the number of distinct
// series exceeds volumeMaxSeries; note the entry that tripped the limit has
// already been recorded when the error is returned.
func (acc *Accumulator) AddVolume(name string, size uint64) error {
	acc.lock.Lock()
	defer acc.lock.Unlock()

	acc.volumes[name] += size
	if len(acc.volumes) > acc.volumeMaxSeries {
		return fmt.Errorf(ErrVolumeMaxSeriesHit, acc.volumeMaxSeries)
	}

	return nil
}
func (acc *Accumulator) Volumes() *logproto.VolumeResponse {

@ -1,6 +1,7 @@
package seriesvolume
import (
"fmt"
"testing"
"github.com/stretchr/testify/require"
@ -8,7 +9,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
)
func Test_AddVolumes(t *testing.T) {
func Test_AddVolume(t *testing.T) {
volumes := map[string]uint64{
`{job: "loki"}`: 5,
`{job: "prometheus"}`: 10,
@ -17,8 +18,10 @@ func Test_AddVolumes(t *testing.T) {
}
t.Run("accumulates values for the same series", func(t *testing.T) {
acc := NewAccumulator(4)
acc.AddVolumes(volumes)
acc := NewAccumulator(4, 10)
for name, size := range volumes {
_ = acc.AddVolume(name, size)
}
resp := acc.Volumes()
require.Equal(t, &logproto.VolumeResponse{
@ -43,8 +46,9 @@ func Test_AddVolumes(t *testing.T) {
Limit: 4,
}, resp)
acc.AddVolumes(volumes)
for name, size := range volumes {
_ = acc.AddVolume(name, size)
}
resp = acc.Volumes()
require.Equal(t, &logproto.VolumeResponse{
Volumes: []logproto.Volume{
@ -70,9 +74,10 @@ func Test_AddVolumes(t *testing.T) {
})
t.Run("sorts label value pairs by volume", func(t *testing.T) {
acc := NewAccumulator(5)
acc.AddVolumes(volumes)
acc := NewAccumulator(5, 10)
for name, size := range volumes {
_ = acc.AddVolume(name, size)
}
resp := acc.Volumes()
require.Equal(t, &logproto.VolumeResponse{
Volumes: []logproto.Volume{
@ -98,31 +103,26 @@ func Test_AddVolumes(t *testing.T) {
})
t.Run("applies limit", func(t *testing.T) {
acc := NewAccumulator(2)
acc := NewAccumulator(2, 10)
volumes := map[string]uint64{
`{job: "loki"}`: 5,
`{job: "prometheus"}`: 10,
`{job: "mimir"}`: 1,
}
acc.AddVolumes(volumes)
volumes = map[string]uint64{
`{job: "loki"}`: 20,
`{job: "prometheus"}`: 30,
`{job: "mimir"}`: 1,
for name, size := range volumes {
_ = acc.AddVolume(name, size)
}
acc.AddVolumes(volumes)
resp := acc.Volumes()
require.Equal(t, &logproto.VolumeResponse{
Volumes: []logproto.Volume{
{
Name: "{job: \"prometheus\"}",
Volume: 40,
Volume: 10,
},
{
Name: "{job: \"loki\"}",
Volume: 25,
Volume: 5,
},
},
Limit: 2,
@ -294,4 +294,11 @@ func Test_Merge(t *testing.T) {
Limit: limit,
}, mergedResponse)
})
t.Run("it returns an error when there are more than maxSize volumes", func(t *testing.T) {
acc := NewAccumulator(2, 0)
err := acc.AddVolume(`{job: "loki"}`, 5)
require.EqualError(t, err, fmt.Sprintf(ErrVolumeMaxSeriesHit, 0))
})
}

@ -30,6 +30,7 @@ const (
// Limits exposes the per-tenant limits the table manager consults, including
// the cap on aggregated series in a log-volume response.
type Limits interface {
AllByUserID() map[string]*validation.Limits
DefaultLimits() *validation.Limits
// VolumeMaxSeries returns the maximum number of aggregated series allowed
// in a log-volume response for the given tenant.
VolumeMaxSeries(userID string) int
}
// TenantFilter is invoked by an IndexGateway instance and answers which

@ -405,6 +405,7 @@ func TestTableManager_loadTables(t *testing.T) {
// mockLimits is a configurable test double for the Limits interface.
type mockLimits struct {
queryReadyIndexNumDaysDefault int
queryReadyIndexNumDaysByUser map[string]int
volumeMaxSeries int
}
func (m *mockLimits) AllByUserID() map[string]*validation.Limits {
@ -424,6 +425,10 @@ func (m *mockLimits) DefaultLimits() *validation.Limits {
}
}
// VolumeMaxSeries returns the mock's configured series cap; the tenant ID is
// ignored, so every tenant sees the same value.
func (m *mockLimits) VolumeMaxSeries(_ string) int {
return m.volumeMaxSeries
}
type mockTable struct {
tableExpired bool
queryReadinessDoneForUsers []string

@ -57,6 +57,10 @@ func (m *zeroValueLimits) AllByUserID() map[string]*validation.Limits {
return nil
}
// VolumeMaxSeries always returns 0, consistent with this stub's zero-value
// behavior for every limit; the tenant ID is ignored.
func (m *zeroValueLimits) VolumeMaxSeries(_ string) int {
return 0
}
func (m *zeroValueLimits) DefaultLimits() *validation.Limits {
return &validation.Limits{
QueryReadyIndexNumDays: 0,

@ -22,8 +22,9 @@ import (
// IndexClient implements stores.Index over a TSDB index, carrying the
// per-tenant limits used to bound series-volume accumulation.
type IndexClient struct {
	idx    Index
	opts   IndexClientOptions
	limits Limits
}
type IndexClientOptions struct {
@ -48,14 +49,19 @@ type IndexStatsAccumulator interface {
}
// SeriesVolumeAccumulator collects per-series volume contributions and builds
// the final response. AddVolume returns an error when a configured cap on the
// number of distinct series is exceeded.
type SeriesVolumeAccumulator interface {
	AddVolume(string, uint64) error
	Volumes() *logproto.VolumeResponse
}
func NewIndexClient(idx Index, opts IndexClientOptions) *IndexClient {
// Limits provides the per-tenant limits enforced by the index client. The
// string argument is the tenant (user) ID.
type Limits interface {
VolumeMaxSeries(string) int
}
// NewIndexClient wraps idx in an IndexClient configured with the given
// options and per-tenant limits.
func NewIndexClient(idx Index, opts IndexClientOptions, l Limits) *IndexClient {
	return &IndexClient{
		idx:    idx,
		opts:   opts,
		limits: l,
	}
}
@ -258,7 +264,7 @@ func (c *IndexClient) SeriesVolume(ctx context.Context, userID string, from, thr
})
})
acc := seriesvolume.NewAccumulator(limit)
acc := seriesvolume.NewAccumulator(limit, c.limits.VolumeMaxSeries(userID))
for _, interval := range intervals {
if err := c.idx.SeriesVolume(ctx, userID, interval.Start, interval.End, acc, shard, nil, matchers...); err != nil {
return nil, err

@ -2,10 +2,13 @@ package tsdb
import (
"context"
"fmt"
"math"
"testing"
"time"
"github.com/grafana/loki/pkg/storage/stores/index/seriesvolume"
"github.com/grafana/loki/pkg/logproto"
"github.com/prometheus/common/model"
@ -76,7 +79,7 @@ func BenchmarkIndexClient_Stats(b *testing.B) {
PeriodConfig: &config.PeriodConfig{},
})
indexClient := NewIndexClient(idx, IndexClientOptions{UseBloomFilters: true})
indexClient := NewIndexClient(idx, IndexClientOptions{UseBloomFilters: true}, &fakeLimits{})
b.ResetTimer()
b.ReportAllocs()
@ -142,7 +145,7 @@ func TestIndexClient_Stats(t *testing.T) {
PeriodConfig: &config.PeriodConfig{},
})
indexClient := NewIndexClient(idx, IndexClientOptions{UseBloomFilters: true})
indexClient := NewIndexClient(idx, IndexClientOptions{UseBloomFilters: true}, &fakeLimits{})
for _, tc := range []struct {
name string
@ -270,7 +273,8 @@ func TestIndexClient_SeriesVolume(t *testing.T) {
PeriodConfig: &config.PeriodConfig{},
})
indexClient := NewIndexClient(idx, IndexClientOptions{UseBloomFilters: true})
limits := &fakeLimits{volumeMaxSeries: 5}
indexClient := NewIndexClient(idx, IndexClientOptions{UseBloomFilters: true}, limits)
from := indexStartYesterday
through := indexStartToday + 1000
@ -300,4 +304,18 @@ func TestIndexClient_SeriesVolume(t *testing.T) {
Limit: 1,
}, vol)
})
t.Run("it returns an error when the number of selected series exceeds the limit", func(t *testing.T) {
limits.volumeMaxSeries = 0
_, err := indexClient.SeriesVolume(context.Background(), "", from, through, 1, nil...)
require.EqualError(t, err, fmt.Sprintf(seriesvolume.ErrVolumeMaxSeriesHit, 0))
})
}
// fakeLimits is a test stub for the Limits interface with a settable
// series cap.
type fakeLimits struct {
volumeMaxSeries int
}
// VolumeMaxSeries returns the stub's configured cap; the tenant ID is
// ignored.
func (f *fakeLimits) VolumeMaxSeries(_ string) int {
return f.volumeMaxSeries
}

@ -346,8 +346,7 @@ func (i *TSDBIndex) SeriesVolume(ctx context.Context, _ string, from, through mo
seriesNames := make(map[uint64]string)
seriesLabels := labels.Labels(make([]labels.Label, 0, len(labelsToMatch)))
volumes := make(map[string]uint64)
err := i.forPostings(ctx, shard, from, through, matchers, func(p index.Postings) error {
return i.forPostings(ctx, shard, from, through, matchers, func(p index.Postings) error {
var ls labels.Labels
var filterer chunk.Filterer
if i.chunkFilter != nil {
@ -384,14 +383,11 @@ func (i *TSDBIndex) SeriesVolume(ctx context.Context, _ string, from, through mo
seriesNames[hash] = seriesLabels.String()
}
volumes[seriesNames[hash]] += stats.KB << 10 // Return bytes
if err = acc.AddVolume(seriesNames[hash], stats.KB<<10); err != nil {
return err
}
}
}
return p.Err()
})
if err != nil {
return err
}
acc.AddVolumes(volumes)
return nil
}

@ -426,7 +426,7 @@ func TestTSDBIndex_SeriesVolume(t *testing.T) {
t.Run("it matches all the series when the match all matcher is passed", func(t *testing.T) {
matcher := labels.MustNewMatcher(labels.MatchEqual, "", "")
acc := seriesvolume.NewAccumulator(10)
acc := seriesvolume.NewAccumulator(10, 10)
err := tsdbIndex.SeriesVolume(context.Background(), "fake", from, through, acc, nil, nil, matcher)
require.NoError(t, err)
require.Equal(t, &logproto.VolumeResponse{
@ -443,7 +443,7 @@ func TestTSDBIndex_SeriesVolume(t *testing.T) {
labels.MustNewMatcher(labels.MatchRegexp, "fizz", ".+"),
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".+"),
}
acc := seriesvolume.NewAccumulator(10)
acc := seriesvolume.NewAccumulator(10, 10)
err := tsdbIndex.SeriesVolume(context.Background(), "fake", from, through, acc, nil, nil, withTenantLabelMatcher("fake", matcher)...)
require.NoError(t, err)
require.Equal(t, &logproto.VolumeResponse{
@ -457,7 +457,7 @@ func TestTSDBIndex_SeriesVolume(t *testing.T) {
t.Run("it matches none of the series", func(t *testing.T) {
matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "baz")
acc := seriesvolume.NewAccumulator(10)
acc := seriesvolume.NewAccumulator(10, 10)
err := tsdbIndex.SeriesVolume(context.Background(), "fake", from, through, acc, nil, nil, matcher)
require.NoError(t, err)
require.Equal(t, &logproto.VolumeResponse{
@ -468,7 +468,7 @@ func TestTSDBIndex_SeriesVolume(t *testing.T) {
t.Run("it only returns results for the labels in the matcher", func(t *testing.T) {
matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")
acc := seriesvolume.NewAccumulator(10)
acc := seriesvolume.NewAccumulator(10, 10)
err := tsdbIndex.SeriesVolume(context.Background(), "fake", from, through, acc, nil, nil, matcher)
require.NoError(t, err)
require.Equal(t, &logproto.VolumeResponse{
@ -484,7 +484,7 @@ func TestTSDBIndex_SeriesVolume(t *testing.T) {
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
labels.MustNewMatcher(labels.MatchRegexp, "fizz", ".+"),
}
acc := seriesvolume.NewAccumulator(10)
acc := seriesvolume.NewAccumulator(10, 10)
err := tsdbIndex.SeriesVolume(context.Background(), "fake", from, through, acc, nil, nil, matchers...)
require.NoError(t, err)
require.Equal(t, &logproto.VolumeResponse{
@ -501,7 +501,7 @@ func TestTSDBIndex_SeriesVolume(t *testing.T) {
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
labels.MustNewMatcher(labels.MatchRegexp, "fizz", ".+"),
}
acc := seriesvolume.NewAccumulator(10)
acc := seriesvolume.NewAccumulator(10, 10)
err := tsdbIndex.SeriesVolume(context.Background(), "fake", from, through, acc, nil, nil, matchers...)
require.NoError(t, err)
require.Equal(t, &logproto.VolumeResponse{
@ -518,7 +518,7 @@ func TestTSDBIndex_SeriesVolume(t *testing.T) {
defer tsdbIndex.SetChunkFilterer(nil)
matcher := labels.MustNewMatcher(labels.MatchEqual, "", "")
acc := seriesvolume.NewAccumulator(10)
acc := seriesvolume.NewAccumulator(10, 10)
err := tsdbIndex.SeriesVolume(context.Background(), "fake", from, through, acc, nil, nil, matcher)
require.NoError(t, err)
@ -530,7 +530,7 @@ func TestTSDBIndex_SeriesVolume(t *testing.T) {
t.Run("only gets factor of stream size within time bounds", func(t *testing.T) {
matcher := labels.MustNewMatcher(labels.MatchEqual, "", "")
acc := seriesvolume.NewAccumulator(10)
acc := seriesvolume.NewAccumulator(10, 10)
err := tsdbIndex.SeriesVolume(context.Background(), "fake", from, through.Add(-30*time.Minute), acc, nil, nil, matcher)
require.NoError(t, err)
require.Equal(t, &logproto.VolumeResponse{

@ -138,7 +138,7 @@ func (s *store) init(name string, indexShipperCfg indexshipper.Config, schemaCfg
indices = append(indices, newIndexShipperQuerier(s.indexShipper, tableRange))
multiIndex := NewMultiIndex(IndexSlice(indices))
s.Reader = NewIndexClient(multiIndex, opts)
s.Reader = NewIndexClient(multiIndex, opts, limits)
return nil
}

@ -106,6 +106,7 @@ type Limits struct {
MaxQueryBytesRead flagext.ByteSize `yaml:"max_query_bytes_read" json:"max_query_bytes_read"`
MaxQuerierBytesRead flagext.ByteSize `yaml:"max_querier_bytes_read" json:"max_querier_bytes_read"`
VolumeEnabled bool `yaml:"volume_enabled" json:"volume_enabled" doc:"description=Enable log-volume endpoints."`
VolumeMaxSeries int `yaml:"volume_max_series" json:"volume_max_series" doc:"description=The maximum number of aggregated series in a log-volume response"`
// Ruler defaults and limits.
@ -283,6 +284,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
l.ShardStreams = &shardstreams.Config{}
l.ShardStreams.RegisterFlagsWithPrefix("shard-streams", f)
f.IntVar(&l.VolumeMaxSeries, "limits.volume-max-series", 1000, "The default number of aggregated series or labels that can be returned from a log-volume endpoint")
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
@ -749,6 +752,10 @@ func (o *Overrides) VolumeEnabled(userID string) bool {
return o.getOverridesForUser(userID).VolumeEnabled
}
// VolumeMaxSeries returns, for the given tenant, the maximum number of
// aggregated series a log-volume response may contain.
func (o *Overrides) VolumeMaxSeries(userID string) int {
return o.getOverridesForUser(userID).VolumeMaxSeries
}
// IndexGatewayShardSize returns the tenant's index gateway shard size
// override.
func (o *Overrides) IndexGatewayShardSize(userID string) int {
return o.getOverridesForUser(userID).IndexGatewayShardSize
}

@ -80,6 +80,7 @@ blocked_queries:
- pattern: ".*foo.*"
regex: true
volume_enabled: true
volume_max_series: 10001
`
inputJSON := `
{
@ -128,7 +129,8 @@ volume_enabled: true
"regex": true
}
],
"volume_enabled": true
"volume_enabled": true,
"volume_max_series": 10001
}
`

Loading…
Cancel
Save