feat: downsample aggregated metrics (#13449)

pull/13500/head
Trevor Whitney 2 years ago committed by GitHub
parent 583f7f3388
commit 2c053ee00c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 4
      docs/sources/shared/configuration.md
  2. 18
      pkg/pattern/flush.go
  3. 33
      pkg/pattern/ingester.go
  4. 129
      pkg/pattern/ingester_test.go
  5. 12
      pkg/pattern/instance_test.go
  6. 89
      pkg/pattern/metric/chunk.go
  7. 28
      pkg/pattern/metric/chunk_test.go
  8. 30
      pkg/pattern/metric/config.go
  9. 11
      pkg/pattern/stream.go
  10. 4
      pkg/pattern/stream_test.go

@ -606,6 +606,10 @@ pattern_ingester:
# CLI flag: -pattern-ingester.metric-aggregation.log-push-observations
[log_push_observations: <boolean> | default = false]
# How often to downsample metrics from raw push observations.
# CLI flag: -pattern-ingester.downsample-period
[downsample_period: <duration> | default = 10s]
# The index_gateway block configures the Loki index gateway server, responsible
# for serving index queries without the need to constantly interact with the
# object store.

@ -27,6 +27,7 @@ func (i *Ingester) Flush() {
func (i *Ingester) flush(mayRemoveStreams bool) {
i.sweepUsers(true, mayRemoveStreams)
i.downsampleMetrics(model.Now())
// Close the flush queues, to unblock waiting workers.
for _, flushQueue := range i.flushQueues {
@ -73,3 +74,20 @@ func (i *Ingester) sweepInstance(instance *instance, _, mayRemoveStreams bool) {
return true, nil
})
}
// downsampleMetrics folds the raw push observations buffered on every stream
// of every tenant instance into aggregated samples stamped with ts.
func (i *Ingester) downsampleMetrics(ts model.Time) {
	for _, inst := range i.getInstances() {
		i.downsampleInstance(inst, ts)
	}
}
// downsampleInstance walks every stream of the given instance and folds its
// buffered raw observations into an aggregated sample stamped with ts.
// NOTE(review): s.Downsample is called under instance.streams.WithLock from
// inside ForEach — this assumes ForEach does not already hold the same lock,
// otherwise it would deadlock; confirm against the streams implementation.
func (i *Ingester) downsampleInstance(instance *instance, ts model.Time) {
	// The ForEach error is deliberately ignored: the callback always returns
	// (true, nil), so iteration never aborts and no error can surface.
	_ = instance.streams.ForEach(func(s *stream) (bool, error) {
		instance.streams.WithLock(func() {
			s.Downsample(ts)
		})
		return true, nil
	})
}

@ -16,6 +16,7 @@ import (
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"google.golang.org/grpc/health/grpc_health_v1"
ring_client "github.com/grafana/dskit/ring/client"
@ -206,13 +207,33 @@ func (i *Ingester) loop() {
flushTicker := util.NewTickerWithJitter(i.cfg.FlushCheckPeriod, j)
defer flushTicker.Stop()
for {
select {
case <-flushTicker.C:
i.sweepUsers(false, true)
if i.cfg.MetricAggregation.Enabled {
downsampleTicker := time.NewTimer(i.cfg.MetricAggregation.DownsamplePeriod)
defer downsampleTicker.Stop()
case <-i.loopQuit:
return
for {
select {
case <-flushTicker.C:
i.sweepUsers(false, true)
case t := <-downsampleTicker.C:
downsampleTicker.Reset(i.cfg.MetricAggregation.DownsamplePeriod)
now := model.TimeFromUnixNano(t.UnixNano())
i.downsampleMetrics(now)
case <-i.loopQuit:
return
}
}
} else {
for {
select {
case <-flushTicker.C:
i.sweepUsers(false, true)
case <-i.loopQuit:
return
}
}
}
}

@ -9,6 +9,7 @@ import (
"time"
"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
@ -39,6 +40,16 @@ func setup(t *testing.T) *instance {
return inst
}
// downsampleInstance is a test-only mirror of Ingester.downsampleInstance:
// it folds the raw observations of every stream in inst into aggregated
// samples stamped with tts (a unix timestamp in seconds).
func downsampleInstance(inst *instance, tts int64) {
	ts := model.TimeFromUnixNano(time.Unix(tts, 0).UnixNano())
	// Error ignored: the callback always returns (true, nil).
	_ = inst.streams.ForEach(func(s *stream) (bool, error) {
		inst.streams.WithLock(func() {
			s.Downsample(ts)
		})
		return true, nil
	})
}
func TestInstancePushQuery(t *testing.T) {
inst := setup(t)
err := inst.Push(context.Background(), &push.PushRequest{
@ -55,6 +66,7 @@ func TestInstancePushQuery(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(inst, 20)
err = inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
@ -70,6 +82,7 @@ func TestInstancePushQuery(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(inst, 30)
for i := 0; i <= 30; i++ {
err = inst.Push(context.Background(), &push.PushRequest{
@ -87,6 +100,7 @@ func TestInstancePushQuery(t *testing.T) {
})
require.NoError(t, err)
}
downsampleInstance(inst, 30)
it, err := inst.Iterator(context.Background(), &logproto.QueryPatternsRequest{
Query: "{test=\"test\"}",
@ -115,6 +129,9 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
},
})
require.NoError(t, err)
downsampleInstance(inst, 0)
for i := 1; i <= 30; i++ {
err = inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
@ -130,8 +147,8 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(inst, int64(20*i))
}
require.NoError(t, err)
expr, err := syntax.ParseSampleExpr(`count_over_time({test="test"}[20s])`)
require.NoError(t, err)
@ -149,10 +166,11 @@ func TestInstancePushQuerySamples(t *testing.T) {
require.Equal(t, lbls.String(), res.Series[0].GetLabels())
// end - start / step -- (start is 0, step is 10s)
// end - start / step -- (start is 0, step is 10s, downsampling at 20s intervals)
expectedDataPoints := ((20 * 30) / 10)
require.Equal(t, expectedDataPoints, len(res.Series[0].Samples))
require.Equal(t, float64(1), res.Series[0].Samples[0].Value)
require.Equal(t, float64(1), res.Series[0].Samples[expectedDataPoints-1].Value)
expr, err = syntax.ParseSampleExpr(`count_over_time({test="test"}[80s])`)
require.NoError(t, err)
@ -170,7 +188,7 @@ func TestInstancePushQuerySamples(t *testing.T) {
require.Equal(t, lbls.String(), res.Series[0].GetLabels())
// end - start / step -- (start is 0, step is 10s)
// end - start / step -- (start is 0, step is 10s, downsampling at 20s intervals)
expectedDataPoints = ((20 * 30) / 10)
require.Equal(t, expectedDataPoints, len(res.Series[0].Samples))
@ -187,6 +205,101 @@ func TestInstancePushQuerySamples(t *testing.T) {
require.Equal(t, float64(4), res.Series[0].Samples[expectedDataPoints-1].Value)
})
t.Run("test count_over_time samples with downsampling", func(t *testing.T) {
	inst := setup(t)
	// Seed one entry at t=0 and downsample immediately so the observation is
	// folded into an aggregated sample.
	err := inst.Push(context.Background(), &push.PushRequest{
		Streams: []push.Stream{
			{
				Labels: lbls.String(),
				Entries: []push.Entry{
					{
						Timestamp: time.Unix(0, 0),
						Line:      "ts=1 msg=hello",
					},
				},
			},
		},
	})
	require.NoError(t, err)
	downsampleInstance(inst, 0)
	// Push one entry every 10s for 300s, but only downsample on even
	// iterations, i.e. every 20s — so each aggregated sample (after the
	// first) covers two pushes.
	for i := 1; i <= 30; i++ {
		err = inst.Push(context.Background(), &push.PushRequest{
			Streams: []push.Stream{
				{
					Labels: lbls.String(),
					Entries: []push.Entry{
						{
							Timestamp: time.Unix(int64(10*i), 0),
							Line:      "foo bar foo bar",
						},
					},
				},
			},
		})
		require.NoError(t, err)
		// downsample every 20s
		if i%2 == 0 {
			downsampleInstance(inst, int64(10*i))
		}
	}
	expr, err := syntax.ParseSampleExpr(`count_over_time({test="test"}[20s])`)
	require.NoError(t, err)
	it, err := inst.QuerySample(context.Background(), expr, &logproto.QuerySamplesRequest{
		Query: expr.String(),
		Start: time.Unix(0, 0),
		End:   time.Unix(int64(10*30), 0),
		Step:  20000, // 20s step, matching the downsample interval
	})
	require.NoError(t, err)
	res, err := iter.ReadAllSamples(it)
	require.NoError(t, err)
	require.Equal(t, 1, len(res.Series))
	require.Equal(t, lbls.String(), res.Series[0].GetLabels())
	// (end - start) / step -- start is 0, end is 300s, step is 20s
	expectedDataPoints := ((10 * 30) / 20)
	require.Equal(t, expectedDataPoints, len(res.Series[0].Samples))
	require.Equal(t, float64(1), res.Series[0].Samples[0].Value)
	// After the first push there are 2 pushes per aggregated sample, because
	// pushes arrive every 10s but downsampling happens every 20s.
	require.Equal(t, float64(2), res.Series[0].Samples[expectedDataPoints-1].Value)
	expr, err = syntax.ParseSampleExpr(`count_over_time({test="test"}[80s])`)
	require.NoError(t, err)
	it, err = inst.QuerySample(context.Background(), expr, &logproto.QuerySamplesRequest{
		Query: expr.String(),
		Start: time.Unix(0, 0),
		End:   time.Unix(int64(10*30), 0),
		Step:  20000, // 20s step, matching the downsample interval
	})
	require.NoError(t, err)
	res, err = iter.ReadAllSamples(it)
	require.NoError(t, err)
	require.Equal(t, 1, len(res.Series))
	require.Equal(t, lbls.String(), res.Series[0].GetLabels())
	// (end - start) / step -- start is 0, end is 300s, step is 20s
	expectedDataPoints = ((10 * 30) / 20)
	require.Equal(t, expectedDataPoints, len(res.Series[0].Samples))
	// With the larger 80s selection range each datapoint eventually covers up
	// to 8 pushes: pushes are 10s apart, downsampled every 20s, and the step
	// is 20s, so the value grows by 2 with each successive sample until it
	// plateaus at 8 from the 5th sample onward.
	require.Equal(t, float64(1), res.Series[0].Samples[0].Value)
	require.Equal(t, float64(3), res.Series[0].Samples[1].Value)
	require.Equal(t, float64(5), res.Series[0].Samples[2].Value)
	require.Equal(t, float64(7), res.Series[0].Samples[3].Value)
	require.Equal(t, float64(8), res.Series[0].Samples[4].Value)
	require.Equal(t, float64(8), res.Series[0].Samples[expectedDataPoints-1].Value)
})
t.Run("test bytes_over_time samples", func(t *testing.T) {
inst := setup(t)
err := inst.Push(context.Background(), &push.PushRequest{
@ -202,6 +315,9 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
},
})
require.NoError(t, err)
downsampleInstance(inst, 0)
for i := 1; i <= 30; i++ {
err = inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
@ -217,8 +333,8 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(inst, int64(20*i))
}
require.NoError(t, err)
expr, err := syntax.ParseSampleExpr(`bytes_over_time({test="test"}[20s])`)
require.NoError(t, err)
@ -343,6 +459,9 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
},
})
require.NoError(t, err)
downsampleInstance(inst, 0)
for i := 1; i <= 30; i++ {
err = inst.Push(context.Background(), &push.PushRequest{
Streams: []push.Stream{
@ -397,8 +516,8 @@ func TestInstancePushQuerySamples(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(inst, int64(20*i))
}
require.NoError(t, err)
for _, tt := range []struct {
name string

@ -36,6 +36,15 @@ func TestInstance_QuerySample(t *testing.T) {
return instance
}
downsampleInstance := func(inst *instance, ts model.Time) {
_ = inst.streams.ForEach(func(s *stream) (bool, error) {
inst.streams.WithLock(func() {
s.Downsample(ts)
})
return true, nil
})
}
ctx := context.Background()
thirtySeconds := int64(30000)
@ -85,6 +94,7 @@ func TestInstance_QuerySample(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(instance, model.Time(lastTsMilli))
// 5 min query range
// 1 min step
@ -203,6 +213,7 @@ func TestInstance_QuerySample(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(instance, model.Time(then+oneMin+thirtySeconds))
err = instance.Push(ctx, &logproto.PushRequest{
Streams: []push.Stream{
@ -245,6 +256,7 @@ func TestInstance_QuerySample(t *testing.T) {
},
})
require.NoError(t, err)
downsampleInstance(instance, model.Time(then+oneMin+oneMin+oneMin+thirtySeconds))
// steps
start := then

@ -34,12 +34,13 @@ type metrics struct {
}
type Chunks struct {
chunks []*Chunk
labels labels.Labels
service string
metrics metrics
logger log.Logger
lock sync.RWMutex
chunks []*Chunk
labels labels.Labels
lock sync.RWMutex
logger log.Logger
metrics metrics
rawSamples SamplesWithoutTS
service string
}
func NewChunks(labels labels.Labels, chunkMetrics *ChunkMetrics, logger log.Logger) *Chunks {
@ -55,37 +56,25 @@ func NewChunks(labels labels.Labels, chunkMetrics *ChunkMetrics, logger log.Logg
)
return &Chunks{
chunks: []*Chunk{},
labels: labels,
service: service,
chunks: []*Chunk{},
labels: labels,
logger: logger,
rawSamples: SamplesWithoutTS{},
service: service,
metrics: metrics{
chunks: chunkMetrics.chunks.WithLabelValues(service),
samples: chunkMetrics.samples.WithLabelValues(service),
},
logger: logger,
}
}
func (c *Chunks) Observe(bytes, count float64, ts model.Time) {
func (c *Chunks) Observe(bytes, count float64) {
c.lock.Lock()
defer c.lock.Unlock()
c.rawSamples = append(c.rawSamples, newSampleWithoutTS(bytes, count))
c.metrics.samples.Inc()
if len(c.chunks) == 0 {
c.chunks = append(c.chunks, newChunk(bytes, count, ts))
c.metrics.chunks.Set(float64(len(c.chunks)))
return
}
last := c.chunks[len(c.chunks)-1]
if !last.spaceFor(ts) {
c.chunks = append(c.chunks, newChunk(bytes, count, ts))
c.metrics.chunks.Set(float64(len(c.chunks)))
return
}
last.AddSample(newSample(bytes, count, ts))
}
func (c *Chunks) Prune(olderThan time.Duration) bool {
@ -204,6 +193,11 @@ type Sample struct {
Count float64
}
// SampleWithoutTS is a raw push observation (byte size and entry count) held
// in the Chunks raw-sample buffer until the next Downsample pass aggregates
// it and assigns a timestamp.
type SampleWithoutTS struct {
	Bytes float64
	Count float64
}
func newSample(bytes, count float64, ts model.Time) Sample {
return Sample{
Timestamp: ts,
@ -212,7 +206,17 @@ func newSample(bytes, count float64, ts model.Time) Sample {
}
}
type Samples []Sample
// newSampleWithoutTS builds a timestamp-less raw sample from the given byte
// size and entry count.
func newSampleWithoutTS(bytes, count float64) SampleWithoutTS {
	var s SampleWithoutTS
	s.Bytes = bytes
	s.Count = count
	return s
}
type (
	// Samples is an ordered collection of timestamped, aggregated samples.
	Samples []Sample
	// SamplesWithoutTS is the buffer of raw, not-yet-timestamped observations
	// awaiting the next Downsample pass.
	SamplesWithoutTS []SampleWithoutTS
)
type Chunk struct {
Samples Samples
@ -291,3 +295,34 @@ func (c *Chunk) ForTypeAndRange(
return aggregatedSamples, nil
}
// Downsample folds every raw sample buffered since the last call into a
// single aggregated sample stamped with now, appending it to the newest
// chunk (or starting a new chunk when there is none, or when the newest one
// has no space for now). The raw buffer is drained afterwards.
//
// NOTE(review): when rawSamples is empty this still records a zero-valued
// sample for this period — presumably intentional (explicit zeroes for quiet
// periods); confirm before changing.
func (c *Chunks) Downsample(now model.Time) {
	c.lock.Lock()
	defer func() {
		// Drain the raw buffer BEFORE unlocking. The previous ordering
		// unlocked first and then wrote c.rawSamples, racing with concurrent
		// Observe calls that append to the same slice under the lock.
		c.rawSamples = c.rawSamples[:0]
		c.lock.Unlock()
	}()

	// Sum all buffered observations into one aggregated data point.
	var totalBytes, totalCount float64
	for _, sample := range c.rawSamples {
		totalBytes += sample.Bytes
		totalCount += sample.Count
	}

	c.metrics.samples.Inc()

	// No chunk yet: start the first one with this sample.
	if len(c.chunks) == 0 {
		c.chunks = append(c.chunks, newChunk(totalBytes, totalCount, now))
		c.metrics.chunks.Set(float64(len(c.chunks)))
		return
	}

	// Newest chunk is full (time-wise): cut a new one.
	last := c.chunks[len(c.chunks)-1]
	if !last.spaceFor(now) {
		c.chunks = append(c.chunks, newChunk(totalBytes, totalCount, now))
		c.metrics.chunks.Set(float64(len(c.chunks)))
		return
	}

	last.AddSample(newSample(totalBytes, totalCount, now))
}

@ -447,3 +447,31 @@ func Test_Chunks_Iterator(t *testing.T) {
require.Equal(t, 4, cap(res.Series[0].Samples))
})
}
// TestDownsample verifies that Downsample collapses all buffered raw
// observations into a single summed, timestamped sample and drains the
// raw buffer.
func TestDownsample(t *testing.T) {
	// Buffer three raw observations of 2 bytes / 1 entry each.
	chks := NewChunks(labels.Labels{
		labels.Label{Name: "foo", Value: "bar"},
	}, NewChunkMetrics(nil, "test"), log.NewNopLogger())
	for i := 0; i < 3; i++ {
		chks.Observe(2.0, 1.0)
	}

	ts := model.Time(5000)
	chks.Downsample(ts)

	// Everything collapses into one chunk holding one summed sample.
	require.Len(t, chks.chunks, 1)
	got := chks.chunks[0]
	require.Len(t, got.Samples, 1)
	require.Equal(t, 6.0, got.Samples[0].Bytes)
	require.Equal(t, 3.0, got.Samples[0].Count)
	require.Equal(t, model.Time(5000), got.Samples[0].Timestamp)

	// The raw buffer is drained once downsampled.
	require.Len(t, chks.rawSamples, 0)
}

@ -1,10 +1,14 @@
package metric
import "flag"
import (
"flag"
"time"
)
type AggregationConfig struct {
Enabled bool `yaml:"enabled,omitempty" doc:"description=Whether the pattern ingester metric aggregation is enabled."`
LogPushObservations bool `yaml:"log_push_observations,omitempty" doc:"description=Whether to log push observations."`
Enabled bool `yaml:"enabled,omitempty" doc:"description=Whether the pattern ingester metric aggregation is enabled."`
LogPushObservations bool `yaml:"log_push_observations,omitempty" doc:"description=Whether to log push observations."`
DownsamplePeriod time.Duration `yaml:"downsample_period"`
}
// RegisterFlags registers pattern ingester related flags.
@ -13,6 +17,22 @@ func (cfg *AggregationConfig) RegisterFlags(fs *flag.FlagSet) {
}
// RegisterFlagsWithPrefix registers the metric-aggregation flags on fs under
// the given CLI flag prefix (e.g. "pattern-ingester.").
func (cfg *AggregationConfig) RegisterFlagsWithPrefix(fs *flag.FlagSet, prefix string) {
	fs.BoolVar(
		&cfg.Enabled,
		prefix+"metric-aggregation.enabled",
		false,
		"Flag to enable or disable metric aggregation.",
	)
	fs.BoolVar(
		&cfg.LogPushObservations,
		prefix+"metric-aggregation.log-push-observations",
		false,
		"Flag to enable or disable logging of push observations.",
	)
	// Fix: this flag previously hard-coded the "pattern-ingester." prefix
	// instead of honoring the prefix argument like its siblings, which would
	// misname (or duplicate-register) the flag under any other prefix. With
	// the known "pattern-ingester." prefix the resulting flag name,
	// -pattern-ingester.downsample-period, is unchanged and matches the docs.
	fs.DurationVar(
		&cfg.DownsamplePeriod,
		prefix+"downsample-period",
		10*time.Second,
		"How often to downsample metrics from raw push observations.",
	)
}

@ -109,7 +109,7 @@ func (s *stream) Push(
"sample_ts_ns", s.lastTs,
)
}
s.chunks.Observe(bytes, count, model.TimeFromUnixNano(s.lastTs))
s.chunks.Observe(bytes, count)
}
return nil
}
@ -291,3 +291,12 @@ func (s *stream) prune(olderThan time.Duration) bool {
return len(s.patterns.Clusters()) == 0 && chunksPruned
}
// Downsample folds the stream's buffered raw metric observations into a
// single aggregated sample stamped with ts, under the stream mutex.
// No-op when s.chunks is nil — presumably the case when metric aggregation
// is disabled for this stream; confirm against stream construction.
func (s *stream) Downsample(ts model.Time) {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	if s.chunks != nil {
		s.chunks.Downsample(ts)
	}
}

@ -136,6 +136,7 @@ func TestSampleIterator(t *testing.T) {
Line: "ts=2 msg=hello",
},
})
stream.Downsample(model.TimeFromUnix(20))
require.NoError(t, err)
@ -185,6 +186,7 @@ func TestSampleIterator(t *testing.T) {
},
})
require.NoError(t, err)
stream.Downsample(model.TimeFromUnix(20))
err = stream.Push(context.Background(), []push.Entry{
{
@ -197,6 +199,7 @@ func TestSampleIterator(t *testing.T) {
},
})
require.NoError(t, err)
stream.Downsample(model.TimeFromUnix(40))
t.Run("non-overlapping timestamps", func(t *testing.T) {
expr, err := syntax.ParseSampleExpr("count_over_time({foo=\"bar\"}[5s])")
@ -273,6 +276,7 @@ func TestSampleIterator(t *testing.T) {
Line: "ts=2 msg=hello",
},
})
stream.Downsample(model.TimeFromUnixNano(time.Unix(26, 999).UnixNano()))
require.NoError(t, err)

Loading…
Cancel
Save