Measure Per-Stream rates (#7370)

This PR lets ingesters measure per-stream ingestion rates at one-second resolution.

It also fixes two bugs: the RateStore now resolves tenant limits through the distributor's Limits interface instead of reading raw validation.Limits overrides, and the ingester's GetStreamRates API now aggregates rates from every tenant instance instead of routing through a special internal instance.
Travis Patterson (committed via GitHub)
parent 4e61c60296
commit 3e7adb2c34
Changed files (lines changed):

 1. pkg/distributor/limits.go (3)
 2. pkg/distributor/ratestore.go (21)
 3. pkg/distributor/ratestore_test.go (9)
 4. pkg/ingester/checkpoint_test.go (4)
 5. pkg/ingester/ingester.go (59)
 6. pkg/ingester/instance.go (33)
 7. pkg/ingester/instance_test.go (108)
 8. pkg/ingester/stream.go (24)
 9. pkg/ingester/stream_rate_calculator.go (89)
10. pkg/ingester/stream_rate_calculator_test.go (25)
11. pkg/ingester/stream_test.go (9)
12. pkg/ingester/streams_map_test.go (2)
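
Before the diff itself, it may help to see the measurement scheme in miniature: bytes are accumulated into a sample, and once per second the sample is swapped into the published rate. A minimal, self-contained sketch of that idea (a toy single-stream meter, not the PR's code; the real implementation in pkg/ingester/stream_rate_calculator.go below stripes this across 2^15 lock-padded slots keyed by stream hash):

package main

import (
	"fmt"
	"sync"
	"time"
)

// rateMeter is a toy, single-stream version of the PR's StreamRateCalculator:
// Record accumulates bytes, and once per second the accumulated sample is
// promoted to the published per-second rate.
type rateMeter struct {
	mu     sync.Mutex
	sample int64 // bytes recorded during the current second
	rate   int64 // bytes recorded during the previous second
}

func (m *rateMeter) Record(bytes int64) {
	m.mu.Lock()
	m.sample += bytes
	m.mu.Unlock()
}

func (m *rateMeter) Rate() int64 {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.rate
}

func (m *rateMeter) tick() {
	m.mu.Lock()
	m.rate, m.sample = m.sample, 0
	m.mu.Unlock()
}

func main() {
	m := &rateMeter{}
	go func() {
		for range time.Tick(time.Second) { // one-second resolution
			m.tick()
		}
	}()

	m.Record(790) // e.g. one push of 790 bytes
	time.Sleep(1500 * time.Millisecond)
	fmt.Println(m.Rate(), "bytes/s") // ~790 after the first tick
}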

@ -3,6 +3,8 @@ package distributor
import (
"time"
"github.com/grafana/loki/pkg/validation"
"github.com/grafana/loki/pkg/distributor/shardstreams"
)
@ -22,4 +24,5 @@ type Limits interface {
IncrementDuplicateTimestamps(userID string) bool
ShardStreams(userID string) *shardstreams.Config
AllByUserID() map[string]*validation.Limits
}

@ -6,8 +6,6 @@ import (
"sync"
"time"
"github.com/grafana/loki/pkg/validation"
"github.com/weaveworks/common/instrument"
"github.com/grafana/dskit/services"
@ -46,10 +44,6 @@ type ingesterClient struct {
client logproto.StreamDataClient
}
type overrides interface {
AllByUserID() map[string]*validation.Limits
}
type rateStore struct {
services.Service
@ -62,17 +56,17 @@ type rateStore struct {
maxParallelism int
rateRefreshFailures *prometheus.CounterVec
refreshDuration *instrument.HistogramCollector
overrides overrides
limits Limits
}
func NewRateStore(cfg RateStoreConfig, r ring.ReadRing, cf poolClientFactory, o overrides, registerer prometheus.Registerer) *rateStore { //nolint
func NewRateStore(cfg RateStoreConfig, r ring.ReadRing, cf poolClientFactory, l Limits, registerer prometheus.Registerer) *rateStore { //nolint
s := &rateStore{
ring: r,
clientPool: cf,
rateCollectionInterval: cfg.StreamRateUpdateInterval,
maxParallelism: cfg.MaxParallelism,
ingesterTimeout: cfg.IngesterReqTimeout,
overrides: o,
limits: l,
rateRefreshFailures: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "rate_store_refresh_failures_total",
@ -124,13 +118,14 @@ func (s *rateStore) updateAllRates(ctx context.Context) error {
}
func (s *rateStore) anyShardingEnabled() bool {
limits := s.overrides.AllByUserID()
limits := s.limits.AllByUserID()
if limits == nil {
return false
// There aren't any tenant limits; check the default
return s.limits.ShardStreams("fake").Enabled
}
for _, l := range limits {
if l.ShardStreams.Enabled {
for user := range limits {
if s.limits.ShardStreams(user).Enabled {
return true
}
}
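
The change above is behavioral, not cosmetic: the old code read ShardStreams.Enabled straight off the validation.Limits values returned by AllByUserID, which only covers tenants with explicit overrides, and it reported sharding as disabled whenever that map was nil. Restated with explanatory comments (same logic as the diff; "fake" is Loki's conventional tenant ID for resolving default limits):

func (s *rateStore) anyShardingEnabled() bool {
	limits := s.limits.AllByUserID()
	if limits == nil {
		// No per-tenant overrides exist: consult the default configuration,
		// which the old code never did.
		return s.limits.ShardStreams("fake").Enabled
	}
	for user := range limits {
		// Resolve through the accessor so defaults and overrides layer correctly.
		if s.limits.ShardStreams(user).Enabled {
			return true
		}
	}
	return false
}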

@ -177,6 +177,7 @@ func (c *fakeStreamDataClient) GetStreamRates(ctx context.Context, in *logproto.
}
type fakeOverrides struct {
Limits
enabled bool
}
@ -190,6 +191,12 @@ func (c *fakeOverrides) AllByUserID() map[string]*validation.Limits {
}
}
func (c *fakeOverrides) ShardStreams(_ string) *shardstreams.Config {
return &shardstreams.Config{
Enabled: c.enabled,
}
}
type testContext struct {
ring *fakeRing
clientPool *fakeClientPool
@ -204,6 +211,6 @@ func setup(enabled bool) *testContext {
return &testContext{
ring: ring,
clientPool: cp,
rateStore: NewRateStore(cfg, ring, cp, &fakeOverrides{enabled}, nil),
rateStore: NewRateStore(cfg, ring, cp, &fakeOverrides{enabled: enabled}, nil),
}
}

@ -449,7 +449,7 @@ func Test_SeriesIterator(t *testing.T) {
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
for i := 0; i < 3; i++ {
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil)
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, NewStreamRateCalculator())
require.Nil(t, err)
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream1}}))
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream2}}))
@ -496,7 +496,7 @@ func Benchmark_SeriesIterator(b *testing.B) {
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
for i := range instances {
inst, _ := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("instance %d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil)
inst, _ := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("instance %d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, NewStreamRateCalculator())
require.NoError(b,
inst.Push(context.Background(), &logproto.PushRequest{

@ -47,8 +47,7 @@ import (
const (
// RingKey is the key under which we store the ingesters ring in the KVStore.
RingKey = "ring"
internalInstanceID = "internal"
RingKey = "ring"
)
// ErrReadOnly is returned when the ingester is shutting down and a push was
@ -193,11 +192,10 @@ type Ingester struct {
clientConfig client.Config
tenantConfigs *runtime.TenantConfigs
shutdownMtx sync.Mutex // Allows processes to grab a lock and prevent a shutdown
instancesMtx sync.RWMutex
instances map[string]*instance
internalInstance *instance // used for non-user communication from the distributors
readonly bool
shutdownMtx sync.Mutex // Allows processes to grab a lock and prevent a shutdown
instancesMtx sync.RWMutex
instances map[string]*instance
readonly bool
lifecycler *ring.Lifecycler
lifecyclerWatcher *services.FailureWatcher
@ -232,6 +230,8 @@ type Ingester struct {
wal WAL
chunkFilter chunk.RequestChunkFilterer
streamRateCalculator *StreamRateCalculator
}
// New makes a new Ingester.
@ -260,6 +260,7 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
metrics: metrics,
flushOnShutdownSwitch: &OnceSwitch{},
terminateOnShutdown: false,
streamRateCalculator: NewStreamRateCalculator(),
}
i.replayController = newReplayController(metrics, cfg.WAL, &replayFlusher{i})
@ -521,6 +522,8 @@ func (i *Ingester) stopping(_ error) error {
}
i.flushQueuesDone.Wait()
i.streamRateCalculator.Stop()
// In case the flag to terminate on shutdown is set we need to mark the
// ingester service as "failed", so Loki will shut down entirely.
// The module manager logs the failure `modules.ErrStopProcess` in a special way.
@ -625,12 +628,14 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logpro
// GetStreamRates returns a response containing all streams and their current rate
// TODO: It might be nice for this to be human readable, eventually: Sort output and return labels, too?
func (i *Ingester) GetStreamRates(ctx context.Context, req *logproto.StreamRatesRequest) (*logproto.StreamRatesResponse, error) {
instance, err := i.getOrCreateInternalInstance()
if err != nil {
return &logproto.StreamRatesResponse{}, err
instances := i.getInstances()
var rates []*logproto.StreamRate
for _, inst := range instances {
rates = append(rates, inst.GetStreamRates(ctx, req)...)
}
return instance.GetStreamRates(ctx, req)
return &logproto.StreamRatesResponse{StreamRates: rates}, nil
}
func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { //nolint:revive
@ -644,7 +649,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
inst, ok = i.instances[instanceID]
if !ok {
var err error
inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter)
inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter, i.streamRateCalculator)
if err != nil {
return nil, err
}
@ -654,25 +659,6 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
return inst, nil
}
func (i *Ingester) getOrCreateInternalInstance() (*instance, error) { //nolint:revive
if inst, ok := i.getInternalInstance(); ok {
return inst, nil
}
i.instancesMtx.Lock()
defer i.instancesMtx.Unlock()
if i.internalInstance == nil {
inst, err := newInstance(&i.cfg, i.periodicConfigs, internalInstanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter)
if err != nil {
return nil, err
}
i.internalInstance = inst
}
return i.internalInstance, nil
}
// Query the ingesters for log streams matching a set of matchers.
func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
// initialize stats collection for ingester queries.
@ -969,17 +955,6 @@ func (i *Ingester) getInstanceByID(id string) (*instance, bool) {
return inst, ok
}
func (i *Ingester) getInternalInstance() (*instance, bool) {
i.instancesMtx.RLock()
defer i.instancesMtx.RUnlock()
if i.internalInstance != nil {
return i.internalInstance, true
}
return nil, false
}
func (i *Ingester) getInstances() []*instance {
i.instancesMtx.RLock()
defer i.instancesMtx.RUnlock()

@ -95,7 +95,8 @@ type instance struct {
metrics *ingesterMetrics
chunkFilter chunk.RequestChunkFilterer
chunkFilter chunk.RequestChunkFilterer
streamRateCalculator *StreamRateCalculator
}
func newInstance(
@ -108,6 +109,7 @@ func newInstance(
metrics *ingesterMetrics,
flushOnShutdownSwitch *OnceSwitch,
chunkFilter chunk.RequestChunkFilterer,
streamRateCalculator *StreamRateCalculator,
) (*instance, error) {
invertedIndex, err := index.NewMultiInvertedIndex(periodConfigs, uint32(cfg.IndexShards))
if err != nil {
@ -132,6 +134,8 @@ func newInstance(
flushOnShutdownSwitch: flushOnShutdownSwitch,
chunkFilter: chunkFilter,
streamRateCalculator: streamRateCalculator,
}
i.mapper = newFPMapper(i.getLabelsFromFingerprint)
return i, err
@ -261,7 +265,7 @@ func (i *instance) createStream(pushReqStream logproto.Stream, record *WALRecord
fp := i.getHashForLabels(labels)
sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp)
s := newStream(i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics)
s := newStream(i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics)
// record will be nil when replaying the wal (we don't want to rewrite wal entries as we replay them).
if record != nil {
@ -292,7 +296,7 @@ func (i *instance) createStream(pushReqStream logproto.Stream, record *WALRecord
func (i *instance) createStreamByFP(ls labels.Labels, fp model.Fingerprint) *stream {
sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(ls), fp)
s := newStream(i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics)
s := newStream(i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics)
i.streamsCreatedTotal.Inc()
memoryStreams.WithLabelValues(i.instanceID).Inc()
@ -337,12 +341,25 @@ func (i *instance) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels
return s.labels
}
func (i *instance) GetStreamRates(_ context.Context, _ *logproto.StreamRatesRequest) (*logproto.StreamRatesResponse, error) {
resp := &logproto.StreamRatesResponse{
StreamRates: make([]*logproto.StreamRate, 0, i.streams.Len()),
}
func (i *instance) GetStreamRates(_ context.Context, _ *logproto.StreamRatesRequest) []*logproto.StreamRate {
rates := make([]*logproto.StreamRate, 0, i.streams.Len())
buf := make([]byte, 256)
_ = i.streams.ForEach(func(s *stream) (bool, error) {
var streamHashNoShard uint64
streamHashNoShard, buf = s.labels.HashWithoutLabels(buf, ShardLbName)
streamHash := s.labels.Hash()
rates = append(rates, &logproto.StreamRate{
StreamHash: streamHash,
StreamHashNoShard: streamHashNoShard,
Rate: i.streamRateCalculator.RateFor(streamHash),
})
return true, nil
})
return resp, nil
return rates
}
func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error) {
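
Each StreamRate carries both the full stream hash and a hash computed without the shard label, so a consumer can re-aggregate the per-shard rates of a logical stream. A hypothetical distributor-side helper (aggregateRates is not part of this diff; only the logproto.StreamRate fields it uses are):

// aggregateRates sums per-shard rates by their shard-agnostic hash,
// recovering the total ingestion rate of each logical stream.
func aggregateRates(rates []*logproto.StreamRate) map[uint64]int64 {
	totals := make(map[uint64]int64, len(rates))
	for _, r := range rates {
		totals[r.StreamHashNoShard] += r.Rate
	}
	return totals
}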

@ -61,7 +61,7 @@ func TestLabelsCollisions(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
i, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
i, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
require.Nil(t, err)
// avoid entries from the future.
@ -89,7 +89,7 @@ func TestConcurrentPushes(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
require.Nil(t, err)
const (
@ -136,6 +136,93 @@ func TestConcurrentPushes(t *testing.T) {
// test passes if no goroutine reports error
}
func TestGetStreamRates(t *testing.T) {
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
require.NoError(t, err)
const (
concurrent = 10
iterations = 100
entriesPerIteration = 100
)
uniqueLabels := map[string]bool{}
startChannel := make(chan struct{})
labelsByHash := map[uint64]labels.Labels{}
wg := sync.WaitGroup{}
for i := 0; i < concurrent; i++ {
l := makeRandomLabels()
for uniqueLabels[l.String()] {
l = makeRandomLabels()
}
uniqueLabels[l.String()] = true
labelsByHash[l.Hash()] = l
wg.Add(1)
go func(labels string) {
defer wg.Done()
<-startChannel
tt := time.Now().Add(-5 * time.Minute)
for i := 0; i < iterations; i++ {
// each iteration generates the entries [hello 0, hello 100): 10 seven-byte and 90 eight-byte lines, 790 bytes per push
_ = inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{
{Labels: labels, Entries: entries(entriesPerIteration, tt)},
}})
tt = tt.Add(entriesPerIteration * time.Nanosecond)
}
}(l.String())
}
close(startChannel)
wg.Wait()
var rates []*logproto.StreamRate
require.Eventually(t, func() bool {
rates = inst.GetStreamRates(context.Background(), &logproto.StreamRatesRequest{})
if len(rates) != concurrent {
return false
}
valid := true
for i := 0; i < len(rates); i++ {
streamRates := rates[i]
origLabels, ok := labelsByHash[streamRates.StreamHash]
valid = valid && ok &&
streamRates.Rate == 79000 && // Each stream gets 100 pushes of 790 bytes
labelHashNoShard(origLabels) == streamRates.StreamHashNoShard
}
return valid
}, 3*time.Second, 100*time.Millisecond)
// Decay back to 0
require.Eventually(t, func() bool {
rates = inst.GetStreamRates(context.Background(), &logproto.StreamRatesRequest{})
for _, r := range rates {
if r.Rate != 0 {
return false
}
}
return true
}, 3*time.Second, 100*time.Millisecond)
}
func labelHashNoShard(l labels.Labels) uint64 {
buf := make([]byte, 256)
hash, _ := l.HashWithoutLabels(buf, ShardLbName)
return hash
}
func TestSyncPeriod(t *testing.T) {
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
@ -148,7 +235,7 @@ func TestSyncPeriod(t *testing.T) {
minUtil = 0.20
)
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
require.Nil(t, err)
lbls := makeRandomLabels()
@ -193,7 +280,7 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) {
cfg.SyncMinUtilization = 0.20
cfg.IndexShards = indexShards
instance, err := newInstance(cfg, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
instance, err := newInstance(cfg, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
require.Nil(t, err)
currentTime := time.Now()
@ -206,7 +293,7 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) {
for _, testStream := range testStreams {
stream, err := instance.getOrCreateStream(testStream, recordPool.GetRecord())
require.NoError(t, err)
chunk := newStream(cfg, limiter, "fake", 0, nil, true, NilMetrics).NewChunk()
chunk := newStream(cfg, limiter, "fake", 0, nil, true, NewStreamRateCalculator(), NilMetrics).NewChunk()
for _, entry := range testStream.Entries {
err = chunk.Append(&entry)
require.NoError(t, err)
@ -399,7 +486,7 @@ func Benchmark_PushInstance(b *testing.B) {
require.NoError(b, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
ctx := context.Background()
for n := 0; n < b.N; n++ {
@ -443,7 +530,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
ctx := context.Background()
inst, _ := newInstance(&Config{}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
inst, _ := newInstance(&Config{}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
t, err := newTailer("foo", `{namespace="foo",pod="bar",instance=~"10.*"}`, nil, 10)
require.NoError(b, err)
for i := 0; i < 10000; i++ {
@ -459,7 +546,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
lbs := makeRandomLabels()
b.Run("addTailersToNewStream", func(b *testing.B) {
for n := 0; n < b.N; n++ {
inst.addTailersToNewStream(newStream(nil, limiter, "fake", 0, lbs, true, NilMetrics))
inst.addTailersToNewStream(newStream(nil, limiter, "fake", 0, lbs, true, NewStreamRateCalculator(), NilMetrics))
}
})
}
@ -712,7 +799,7 @@ func TestStreamShardingUsage(t *testing.T) {
})
t.Run("invalid push returns error", func(t *testing.T) {
i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, customTenant1, limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, customTenant1, limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
ctx := context.Background()
err = i.Push(ctx, &logproto.PushRequest{
@ -731,7 +818,7 @@ func TestStreamShardingUsage(t *testing.T) {
})
t.Run("valid push returns no error", func(t *testing.T) {
i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, customTenant2, limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, customTenant2, limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
ctx := context.Background()
err = i.Push(ctx, &logproto.PushRequest{
@ -765,6 +852,7 @@ func defaultInstance(t *testing.T) *instance {
NilMetrics,
nil,
nil,
NewStreamRateCalculator(),
)
require.Nil(t, err)
insertData(t, instance)

@ -67,7 +67,8 @@ type stream struct {
// introduced to facilitate removing the ordering constraint.
entryCt int64
unorderedWrites bool
unorderedWrites bool
streamRateCalculator *StreamRateCalculator
}
type chunkDesc struct {
@ -85,16 +86,18 @@ type entryWithError struct {
e error
}
func newStream(cfg *Config, limits RateLimiterStrategy, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, metrics *ingesterMetrics) *stream {
func newStream(cfg *Config, limits RateLimiterStrategy, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, streamRateCalculator *StreamRateCalculator, metrics *ingesterMetrics) *stream {
return &stream{
limiter: NewStreamRateLimiter(limits, tenant, 10*time.Second),
cfg: cfg,
fp: fp,
labels: labels,
labelsString: labels.String(),
tailers: map[uint32]*tailer{},
metrics: metrics,
tenant: tenant,
limiter: NewStreamRateLimiter(limits, tenant, 10*time.Second),
cfg: cfg,
fp: fp,
labels: labels,
labelsString: labels.String(),
tailers: map[uint32]*tailer{},
metrics: metrics,
tenant: tenant,
streamRateCalculator: streamRateCalculator,
unorderedWrites: unorderedWrites,
}
}
@ -392,6 +395,7 @@ func (s *stream) validateEntries(entries []logproto.Entry, isReplay, rateLimitWh
}
}
s.streamRateCalculator.Record(s.labels.Hash(), int64(totalBytes))
s.reportMetrics(outOfOrderSamples, outOfOrderBytes, rateLimitedSamples, rateLimitedBytes)
return toStore, failedEntriesWithError
}

@ -0,0 +1,89 @@
package ingester
import (
"sync"
"time"
)
const (
// defaultStripeSize is the default number of entries to allocate in the
// stripeSeries list.
defaultStripeSize = 1 << 15
// The intent is a per-second rate, so this is hard-coded
updateInterval = time.Second
)
// stripeLock is taken from ruler/storage/wal/series.go
type stripeLock struct {
sync.RWMutex
// Padding to avoid multiple locks being on the same cache line.
_ [40]byte
}
type StreamRateCalculator struct {
size int
samples []int64
rates []int64
locks []stripeLock
stopchan chan struct{}
}
func NewStreamRateCalculator() *StreamRateCalculator {
calc := &StreamRateCalculator{
size: defaultStripeSize,
samples: make([]int64, defaultStripeSize),
rates: make([]int64, defaultStripeSize),
locks: make([]stripeLock, defaultStripeSize),
stopchan: make(chan struct{}),
}
go calc.updateLoop()
return calc
}
func (c *StreamRateCalculator) updateLoop() {
t := time.NewTicker(updateInterval)
defer t.Stop()
for {
select {
case <-t.C:
c.updateRates()
case <-c.stopchan:
return
}
}
}
func (c *StreamRateCalculator) updateRates() {
for i := 0; i < c.size; i++ {
c.locks[i].Lock()
c.rates[i] = c.samples[i]
c.samples[i] = 0
c.locks[i].Unlock()
}
}
func (c *StreamRateCalculator) RateFor(streamHash uint64) int64 {
i := streamHash & uint64(c.size-1)
c.locks[i].RLock()
defer c.locks[i].RUnlock()
return c.rates[i]
}
func (c *StreamRateCalculator) Record(streamHash uint64, bytes int64) {
i := streamHash & uint64(c.size-1)
c.locks[i].Lock()
defer c.locks[i].Unlock()
c.samples[i] += bytes
}
func (c *StreamRateCalculator) Stop() {
close(c.stopchan)
}
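
A quick usage sketch, written as an example test one might place beside the new file (imports of fmt and time assumed; the timing makes it illustrative rather than something to check in). Note that slots are selected by streamHash & (size-1), so two streams whose hashes collide in the low 15 bits share a slot and their rates merge; defaultStripeSize trades memory and collision probability against lock contention.

func ExampleStreamRateCalculator() {
	calc := NewStreamRateCalculator()
	defer calc.Stop()

	// Accumulate 790 bytes against an arbitrary stream hash.
	calc.Record(12345, 790)

	// After the next one-second tick, the sample becomes the published rate.
	time.Sleep(1500 * time.Millisecond)
	fmt.Println(calc.RateFor(12345))
	// Output: 790
}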

@ -0,0 +1,25 @@
package ingester
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestStreamRateCalculator(t *testing.T) {
calc := NewStreamRateCalculator()
defer calc.Stop()
for i := 0; i < 100; i++ {
calc.Record(0, 100)
}
require.Eventually(t, func() bool {
return calc.RateFor(0) == 10000
}, 2*time.Second, 250*time.Millisecond)
require.Eventually(t, func() bool {
return calc.RateFor(0) == 0
}, 2*time.Second, 250*time.Millisecond)
}

@ -61,6 +61,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
{Name: "foo", Value: "bar"},
},
true,
NewStreamRateCalculator(),
NilMetrics,
)
@ -107,6 +108,7 @@ func TestPushDeduplication(t *testing.T) {
{Name: "foo", Value: "bar"},
},
true,
NewStreamRateCalculator(),
NilMetrics,
)
@ -136,6 +138,7 @@ func TestPushRejectOldCounter(t *testing.T) {
{Name: "foo", Value: "bar"},
},
true,
NewStreamRateCalculator(),
NilMetrics,
)
@ -230,6 +233,7 @@ func TestUnorderedPush(t *testing.T) {
{Name: "foo", Value: "bar"},
},
true,
NewStreamRateCalculator(),
NilMetrics,
)
@ -326,6 +330,7 @@ func TestPushRateLimit(t *testing.T) {
{Name: "foo", Value: "bar"},
},
true,
NewStreamRateCalculator(),
NilMetrics,
)
@ -359,6 +364,7 @@ func TestPushRateLimitAllOrNothing(t *testing.T) {
{Name: "foo", Value: "bar"},
},
true,
NewStreamRateCalculator(),
NilMetrics,
)
@ -391,6 +397,7 @@ func TestReplayAppendIgnoresValidityWindow(t *testing.T) {
{Name: "foo", Value: "bar"},
},
true,
NewStreamRateCalculator(),
NilMetrics,
)
@ -441,7 +448,7 @@ func Benchmark_PushStream(b *testing.B) {
require.NoError(b, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
s := newStream(&Config{MaxChunkAge: 24 * time.Hour}, limiter, "fake", model.Fingerprint(0), ls, true, NilMetrics)
s := newStream(&Config{MaxChunkAge: 24 * time.Hour}, limiter, "fake", model.Fingerprint(0), ls, true, NewStreamRateCalculator(), NilMetrics)
t, err := newTailer("foo", `{namespace="loki-dev"}`, &fakeTailServer{}, 10)
require.NoError(b, err)

@ -25,6 +25,7 @@ func TestStreamsMap(t *testing.T) {
{Name: "foo", Value: "bar"},
},
true,
NewStreamRateCalculator(),
NilMetrics,
),
newStream(
@ -36,6 +37,7 @@ func TestStreamsMap(t *testing.T) {
{Name: "bar", Value: "foo"},
},
true,
NewStreamRateCalculator(),
NilMetrics,
),
}
