Loki: Per-tenant stream sharding (#7311)

**What this PR does / why we need it**:
- Move stream sharding configuration to its own package to avoid cyclic
imports
- Change stream sharding to be a per-tenant configuration (see the sketch
after this list)
- Change ingesters to reject whole streams that exceed the rate limit, based
on the per-tenant stream sharding configuration
- Change the stream sharding flags prefix from `distributor.shard-streams` to
`shard-streams`
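
Since sharding is now part of `limits_config`, it can be tuned per tenant through runtime overrides. A minimal sketch, assuming a runtime overrides file; the tenant name and values are illustrative and mirror the new `TestStreamShardingUsage` test:

```yaml
# Hypothetical runtime overrides file; "my-org1" is an example tenant ID.
overrides:
  my-org1:
    shard_streams:
      enabled: true
      desired_rate: 6MB
      logging_enabled: true
```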
Dylan Guedes, committed by GitHub
commit b1d4efab12 (parent f5501469ab)
14 changed files (changed lines in parentheses):

  1. docs/sources/configuration/_index.md (44)
  2. pkg/distributor/distributor.go (56)
  3. pkg/distributor/distributor_test.go (26)
  4. pkg/distributor/limits.go (8)
  5. pkg/distributor/shardstreams/config.go (23)
  6. pkg/ingester/ingester.go (3)
  7. pkg/ingester/instance.go (3)
  8. pkg/ingester/instance_test.go (114)
  9. pkg/ingester/recovery.go (2)
  10. pkg/ingester/stream.go (12)
  11. pkg/ingester/stream_test.go (29)
  12. pkg/loki/modules.go (1)
  13. pkg/validation/limits.go (15)
  14. pkg/validation/limits_test.go (11)

@@ -318,28 +318,6 @@ ring:
# reading and writing.
# CLI flag: -distributor.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]
# Configures the distributor to shard streams that are too big
shard_streams:
# Whether to enable stream sharding
#
# CLI flag: -distributor.shard-streams.enabled
[enabled: <boolean> | default = false]
# Whether to log stream sharding behavior. Disabled by default because
# logging on the write path may impact performance. When disabled, stream
# sharding emits no logs regardless of log level
#
# CLI flag: -distributor.shard-streams.logging-enabled
[logging_enabled: <boolean> | default = false]
# Threshold that determines how much the stream should be sharded.
# The formula used is n = ceil((stream size + ingested rate) / desired rate), where n is the number of shards.
# For instance, if a stream's ingested rate is 10MB, the desired rate is 3MB (default), and a stream of size
# 1MB is received, the stream will be split into n = ceil((1 + 10)/3) = 4 shards.
#
# CLI flag: -distributor.shard-streams.desired-rate
[desired_rate: <string> | default = 3MB]
```
## querier
@@ -2364,6 +2342,28 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -ingester.per-stream-rate-limit-burst
[per_stream_rate_limit_burst: <string|int> | default = "15MB"]
# Configures the distributor to shard streams that are too big
shard_streams:
# Whether to enable stream sharding
#
# CLI flag: -shard-streams.enabled
[enabled: <boolean> | default = false]
# Whether to log stream sharding behavior. Disabled by default because
# logging on the write path may impact performance. When disabled, stream
# sharding emits no logs regardless of log level
#
# CLI flag: -shard-streams.logging-enabled
[logging_enabled: <boolean> | default = false]
# Threshold that determines how much the stream should be sharded.
# The formula used is n = ceil((stream size + ingested rate) / desired rate), where n is the number of shards.
# For instance, if a stream's ingested rate is 10MB, the desired rate is 3MB (default), and a stream of size
# 1MB is received, the stream will be split into n = ceil((1 + 10)/3) = 4 shards.
#
# CLI flag: -shard-streams.desired-rate
[desired_rate: <string> | default = 3MB]
# Limit how far back in time series data and metadata can be queried,
# up until lookback duration ago.
# This limit is enforced in the query frontend, the querier and the ruler.
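
For readers who want to check the documented formula, here is a minimal standalone sketch consistent with the `calculateShards(rate, streamSize, desiredRate)` call site further down in this diff; the body is an assumption for illustration, not necessarily Loki's exact implementation:

```go
package main

import (
	"fmt"
	"math"
)

// calculateShards sketches the documented formula:
// n = ceil((stream size + ingested rate) / desired rate).
func calculateShards(rate, streamSize, desiredRate int) int {
	shards := math.Ceil(float64(rate+streamSize) / float64(desiredRate))
	if shards <= 1 {
		return 1
	}
	return int(shards)
}

func main() {
	// The docs example: 10MB ingested rate, 1MB incoming stream, 3MB desired rate.
	fmt.Println(calculateShards(10<<20, 1<<20, 3<<20)) // 4
}
```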

@@ -30,6 +30,7 @@ import (
"go.uber.org/atomic"
"github.com/grafana/loki/pkg/distributor/clientpool"
"github.com/grafana/loki/pkg/distributor/shardstreams"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
@@ -37,7 +38,6 @@ import (
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/flagext"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/validation"
)
@@ -51,21 +51,6 @@ var (
rfStats = usagestats.NewInt("distributor_replication_factor")
)
type ShardStreamsConfig struct {
Enabled bool `yaml:"enabled"`
LoggingEnabled bool `yaml:"logging_enabled"`
// DesiredRate is the threshold used to shard the stream into smaller pieces.
// Expected to be in bytes.
DesiredRate flagext.ByteSize `yaml:"desired_rate"`
}
func (cfg *ShardStreamsConfig) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet) {
fs.BoolVar(&cfg.Enabled, prefix+".enabled", false, "Automatically shard streams to keep them under the per-stream rate limit")
fs.BoolVar(&cfg.LoggingEnabled, prefix+".logging-enabled", false, "Enable logging when sharding streams")
fs.Var(&cfg.DesiredRate, prefix+".desired-rate", "Threshold used to cut a new shard. The default (3MB) means a stream will be sharded once its rate exceeds 3MB.")
}
// Config for a Distributor.
type Config struct {
// Distributors ring
@@ -73,15 +58,11 @@ type Config struct {
// For testing.
factory ring_client.PoolFactory `yaml:"-"`
// ShardStreams configures whether big streams should be sharded or not.
ShardStreams ShardStreamsConfig `yaml:"shard_streams"`
}
// RegisterFlags registers distributor-related flags.
func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
cfg.DistributorRing.RegisterFlags(fs)
cfg.ShardStreams.RegisterFlagsWithPrefix("distributor.shard-streams", fs)
}
// RateStore manages the ingestion rate of streams, populated by data fetched from ingesters.
@@ -329,7 +310,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}
stream.Entries = stream.Entries[:n]
if d.cfg.ShardStreams.Enabled {
shardStreamsCfg := d.validator.Limits.ShardStreams(userID)
if shardStreamsCfg.Enabled {
derivedKeys, derivedStreams := d.shardStream(stream, streamSize, userID)
keys = append(keys, derivedKeys...)
streams = append(streams, derivedStreams...)
@@ -409,15 +391,15 @@ func min(x1, x2 int) int {
// N is the sharding size for the given stream. shardStream returns the smaller
// streams and their associated keys for hashing to ingesters.
func (d *Distributor) shardStream(stream logproto.Stream, streamSize int, userID string) ([]uint32, []streamTracker) {
shardStreamsCfg := d.validator.Limits.ShardStreams(userID)
logger := log.With(util_log.WithUserID(userID, util_log.Logger), "stream", stream.Labels)
shardCount := d.shardCountFor(logger, &stream, streamSize, d.cfg.ShardStreams.DesiredRate.Val(), d.rateStore)
shardCount := d.shardCountFor(logger, &stream, streamSize, d.rateStore, shardStreamsCfg)
if shardCount <= 1 {
return []uint32{util.TokenFor(userID, stream.Labels)}, []streamTracker{{stream: stream}}
}
if d.cfg.ShardStreams.LoggingEnabled {
if shardStreamsCfg.LoggingEnabled {
level.Info(logger).Log("msg", "sharding request", "shard_count", shardCount)
}
@@ -427,7 +409,7 @@ func (d *Distributor) shardStream(stream logproto.Stream, streamSize int, userID
derivedKeys := make([]uint32, 0, shardCount)
derivedStreams := make([]streamTracker, 0, shardCount)
for i := 0; i < shardCount; i++ {
shard, ok := d.createShard(stream, streamLabels, streamPattern, shardCount, i)
shard, ok := d.createShard(shardStreamsCfg, stream, streamLabels, streamPattern, shardCount, i)
if !ok {
level.Error(logger).Log("msg", "couldn't create shard", "idx", i)
continue
@@ -436,7 +418,7 @@ func (d *Distributor) shardStream(stream logproto.Stream, streamSize int, userID
derivedKeys = append(derivedKeys, util.TokenFor(userID, shard.Labels))
derivedStreams = append(derivedStreams, streamTracker{stream: shard})
if d.cfg.ShardStreams.LoggingEnabled {
if shardStreamsCfg.LoggingEnabled {
level.Info(util_log.Logger).Log("msg", "stream derived from sharding", "src-stream", stream.Labels, "derived-stream", shard.Labels)
}
}
@@ -460,8 +442,8 @@ func labelTemplate(lbls string) labels.Labels {
return streamLabels
}
func (d *Distributor) createShard(stream logproto.Stream, lbls labels.Labels, streamPattern string, totalShards, shardNumber int) (logproto.Stream, bool) {
lowerBound, upperBound, ok := d.boundsFor(stream, totalShards, shardNumber)
func (d *Distributor) createShard(shardStreamsCfg *shardstreams.Config, stream logproto.Stream, lbls labels.Labels, streamPattern string, totalShards, shardNumber int) (logproto.Stream, bool) {
lowerBound, upperBound, ok := d.boundsFor(stream, totalShards, shardNumber, shardStreamsCfg.LoggingEnabled)
if !ok {
return logproto.Stream{}, false
}
@@ -475,7 +457,7 @@ func (d *Distributor) createShard(stream logproto.Stream, lbls labels.Labels, st
}, true
}
func (d *Distributor) boundsFor(stream logproto.Stream, totalShards, shardNumber int) (int, int, bool) {
func (d *Distributor) boundsFor(stream logproto.Stream, totalShards, shardNumber int, loggingEnabled bool) (int, int, bool) {
entriesPerWindow := float64(len(stream.Entries)) / float64(totalShards)
fIdx := float64(shardNumber)
@@ -483,7 +465,7 @@ func (d *Distributor) boundsFor(stream logproto.Stream, totalShards, shardNumber
upperBound := min(int(entriesPerWindow*(1+fIdx)), len(stream.Entries))
if lowerBound > upperBound {
if d.cfg.ShardStreams.LoggingEnabled {
if loggingEnabled {
level.Warn(util_log.Logger).Log("msg", "sharding with lowerbound > upperbound", "lowerbound", lowerBound, "upperbound", upperBound, "shards", totalShards, "labels", stream.Labels)
}
return 0, 0, false
@@ -598,10 +580,10 @@ func (d *Distributor) parseStreamLabels(vContext validationContext, key string,
// based on the rate stored in the rate store and will store the new evaluated number of shards.
//
// desiredRate is expected to be given in bytes.
func (d *Distributor) shardCountFor(logger log.Logger, stream *logproto.Stream, streamSize, desiredRate int, rateStore RateStore) int {
if desiredRate <= 0 {
if d.cfg.ShardStreams.LoggingEnabled {
level.Error(logger).Log("msg", "invalid desired rate", "desired_rate", desiredRate)
func (d *Distributor) shardCountFor(logger log.Logger, stream *logproto.Stream, streamSize int, rateStore RateStore, streamShardcfg *shardstreams.Config) int {
if streamShardcfg.DesiredRate.Val() <= 0 {
if streamShardcfg.LoggingEnabled {
level.Error(logger).Log("msg", "invalid desired rate", "desired_rate", streamShardcfg.DesiredRate.String())
}
return 1
}
@@ -609,16 +591,16 @@ func (d *Distributor) shardCountFor(logger log.Logger, stream *logproto.Stream,
rate, err := rateStore.RateFor(stream)
if err != nil {
d.streamShardingFailures.WithLabelValues("rate_not_found").Inc()
if d.cfg.ShardStreams.LoggingEnabled {
if streamShardcfg.LoggingEnabled {
level.Error(logger).Log("msg", "couldn't shard stream because rate store returned error", "err", err)
}
return 1
}
shards := calculateShards(rate, streamSize, desiredRate)
shards := calculateShards(rate, streamSize, streamShardcfg.DesiredRate.Val())
if shards > len(stream.Entries) {
d.streamShardingFailures.WithLabelValues("too_many_shards").Inc()
if d.cfg.ShardStreams.LoggingEnabled {
if streamShardcfg.LoggingEnabled {
level.Error(logger).Log("msg", "number of shards bigger than number of entries", "shards", shards, "entries", len(stream.Entries))
}
return len(stream.Entries)

@@ -672,11 +672,21 @@ func TestStreamShard(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
baseStream.Entries = tc.entries
distributorLimits := &validation.Limits{}
flagext.DefaultValues(distributorLimits)
distributorLimits.ShardStreams.DesiredRate = desiredRate
overrides, err := validation.NewOverrides(*distributorLimits, nil)
require.NoError(t, err)
validator, err := NewValidator(overrides)
require.NoError(t, err)
d := Distributor{
rateStore: &noopRateStore{},
streamShardingFailures: shardingFailureMetric,
validator: validator,
}
d.cfg.ShardStreams.DesiredRate = desiredRate
_, derivedStreams := d.shardStream(baseStream, tc.streamSize, "fake")
require.Equal(t, tc.wantDerivedStream, derivedStreams)
@@ -865,21 +875,12 @@ func TestShardCountFor(t *testing.T) {
name string
stream *logproto.Stream
rate int
desiredRate int
desiredRate loki_flagext.ByteSize
wantStreamSize int // used for sanity check.
wantShards int
wantErr bool
}{
{
name: "2 entries with zero rate and desired rate < 0, return 1 shard",
stream: &logproto.Stream{Hash: 1},
rate: 0,
desiredRate: -5, // in bytes
wantStreamSize: 2, // in bytes
wantShards: 1,
wantErr: false,
},
{
name: "2 entries with zero rate and desired rate == 0, return 1 shard",
stream: &logproto.Stream{Hash: 1},
@@ -953,11 +954,12 @@ func TestShardCountFor(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.EnforceMetricName = false
limits.ShardStreams.DesiredRate = tc.desiredRate
d := &Distributor{
streamShardingFailures: shardingFailureMetric,
}
got := d.shardCountFor(util_log.Logger, tc.stream, tc.wantStreamSize, tc.desiredRate, &noopRateStore{tc.rate})
got := d.shardCountFor(util_log.Logger, tc.stream, tc.wantStreamSize, &noopRateStore{tc.rate}, limits.ShardStreams)
require.Equal(t, tc.wantShards, got)
})
}

@@ -1,6 +1,10 @@
package distributor
import "time"
import (
"time"
"github.com/grafana/loki/pkg/distributor/shardstreams"
)
// Limits is an interface for distributor limits/related configs
type Limits interface {
@@ -16,4 +20,6 @@ type Limits interface {
RejectOldSamplesMaxAge(userID string) time.Duration
IncrementDuplicateTimestamps(userID string) bool
ShardStreams(userID string) *shardstreams.Config
}
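
Any implementation of the distributor's `Limits` interface now has to provide the new getter. A minimal sketch of a test stub for just this method; the type name and values are hypothetical (a real double would implement the rest of the interface too):

```go
package distributor_test

import (
	"github.com/grafana/loki/pkg/distributor/shardstreams"
)

// stubLimits implements only the new ShardStreams getter.
type stubLimits struct{}

func (stubLimits) ShardStreams(userID string) *shardstreams.Config {
	cfg := &shardstreams.Config{Enabled: true}
	_ = cfg.DesiredRate.Set("6MB") // illustrative per-tenant desired rate
	return cfg
}
```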

@@ -0,0 +1,23 @@
package shardstreams
import (
"flag"
"github.com/grafana/loki/pkg/util/flagext"
)
type Config struct {
Enabled bool `yaml:"enabled" json:"enabled"`
LoggingEnabled bool `yaml:"logging_enabled" json:"logging_enabled"`
// DesiredRate is the threshold used to shard the stream into smaller pieces.
// Expected to be in bytes.
DesiredRate flagext.ByteSize `yaml:"desired_rate" json:"desired_rate"`
}
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet) {
fs.BoolVar(&cfg.Enabled, prefix+".enabled", false, "Automatically shard streams to keep them under the per-stream rate limit")
fs.BoolVar(&cfg.LoggingEnabled, prefix+".logging-enabled", false, "Enable logging when sharding streams")
cfg.DesiredRate.Set("3mb") //nolint:errcheck
fs.Var(&cfg.DesiredRate, prefix+".desired-rate", "Threshold used to cut a new shard. The default (3MB) means a stream will be sharded once its rate exceeds 3MB.")
}
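
A quick sketch of exercising the new package on its own, showing the renamed flag prefix in action; the flag-set name and argument values are assumptions:

```go
package main

import (
	"flag"
	"fmt"

	"github.com/grafana/loki/pkg/distributor/shardstreams"
)

func main() {
	fs := flag.NewFlagSet("example", flag.ExitOnError)

	var cfg shardstreams.Config
	// The prefix is now plain "shard-streams", not "distributor.shard-streams".
	cfg.RegisterFlagsWithPrefix("shard-streams", fs)

	_ = fs.Parse([]string{"-shard-streams.enabled", "-shard-streams.desired-rate=6MB"})
	fmt.Println(cfg.Enabled, cfg.DesiredRate.String()) // true 6MB
}
```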

@@ -105,9 +105,6 @@ type Config struct {
IndexShards int `yaml:"index_shards"`
MaxDroppedStreams int `yaml:"max_dropped_streams"`
// Whether to apply the rate limit to the stream as a whole (all-or-nothing ingestion). Comes from the distributor's ShardStreams.Enabled
RateLimitWholeStream bool `yaml:"-"`
}
// RegisterFlags registers the flags.

@@ -172,6 +172,7 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
record := recordPool.GetRecord()
record.UserID = i.instanceID
defer recordPool.PutRecord(record)
rateLimitWholeStream := i.limiter.limits.ShardStreams(i.instanceID).Enabled
var appendErr error
for _, reqStream := range req.Streams {
@@ -195,7 +196,7 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
continue
}
_, appendErr = s.Push(ctx, reqStream.Entries, record, 0, false)
_, appendErr = s.Push(ctx, reqStream.Entries, record, 0, false, rateLimitWholeStream)
s.chunkMtx.Unlock()
}

@@ -10,19 +10,20 @@ import (
"testing"
"time"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/dskit/flagext"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/distributor/shardstreams"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/querier/astmapper"
loki_runtime "github.com/grafana/loki/pkg/runtime"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/validation"
)
@@ -646,6 +647,109 @@ func Test_QuerySampleWithDelete(t *testing.T) {
require.Equal(t, samples, []float64{1.})
}
type fakeLimits struct {
limits map[string]*validation.Limits
}
func (f fakeLimits) TenantLimits(userID string) *validation.Limits {
limits, ok := f.limits[userID]
if !ok {
return nil
}
return limits
}
func (f fakeLimits) AllByUserID() map[string]*validation.Limits {
return f.limits
}
func TestStreamShardingUsage(t *testing.T) {
setupCustomTenantLimit := func(perStreamLimit string) *validation.Limits {
shardStreamsCfg := &shardstreams.Config{Enabled: true, LoggingEnabled: true}
shardStreamsCfg.DesiredRate.Set("6MB") //nolint:errcheck
customTenantLimits := &validation.Limits{}
flagext.DefaultValues(customTenantLimits)
customTenantLimits.PerStreamRateLimit.Set(perStreamLimit) //nolint:errcheck
customTenantLimits.PerStreamRateLimitBurst.Set(perStreamLimit) //nolint:errcheck
customTenantLimits.ShardStreams = shardStreamsCfg
return customTenantLimits
}
customTenant1 := "my-org1"
customTenant2 := "my-org2"
limitsDefinition := &fakeLimits{
limits: make(map[string]*validation.Limits),
}
// Testing with a limit of 1: although 1 is enough to accept at least the
// first line entry, per-stream sharding is enabled, so all entries are
// rejected if any one of them would be rejected.
limitsDefinition.limits[customTenant1] = setupCustomTenantLimit("1")
limitsDefinition.limits[customTenant2] = setupCustomTenantLimit("4")
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), limitsDefinition)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
defaultShardStreamsCfg := limiter.limits.ShardStreams("fake")
tenantShardStreamsCfg := limiter.limits.ShardStreams(customTenant1)
t.Run("test default configuration", func(t *testing.T) {
require.Equal(t, false, defaultShardStreamsCfg.Enabled)
require.Equal(t, "3MB", defaultShardStreamsCfg.DesiredRate.String())
require.Equal(t, false, defaultShardStreamsCfg.LoggingEnabled)
})
t.Run("test configuration being applied", func(t *testing.T) {
require.Equal(t, true, tenantShardStreamsCfg.Enabled)
require.Equal(t, "6MB", tenantShardStreamsCfg.DesiredRate.String())
require.Equal(t, true, tenantShardStreamsCfg.LoggingEnabled)
})
t.Run("invalid push returns error", func(t *testing.T) {
i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, customTenant1, limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
ctx := context.Background()
err = i.Push(ctx, &logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: `{cpu="10",endpoint="https",instance="10.253.57.87:9100",job="node-exporter",mode="idle",namespace="observability",pod="node-exporter-l454v",service="node-exporter"}`,
Entries: []logproto.Entry{
{Timestamp: time.Now(), Line: "1"},
{Timestamp: time.Now(), Line: "2"},
{Timestamp: time.Now(), Line: "3"},
},
},
},
})
require.Error(t, err)
})
t.Run("valid push returns no error", func(t *testing.T) {
i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, customTenant2, limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
ctx := context.Background()
err = i.Push(ctx, &logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: `{myotherlabel="myothervalue"}`,
Entries: []logproto.Entry{
{Timestamp: time.Now(), Line: "1"},
{Timestamp: time.Now(), Line: "2"},
{Timestamp: time.Now(), Line: "3"},
},
},
},
})
require.NoError(t, err)
})
}
func defaultInstance(t *testing.T) *instance {
ingesterConfig := defaultIngesterTestConfig(t)
defaultLimits := defaultLimitsTestConfig()

@@ -165,7 +165,7 @@ func (r *ingesterRecoverer) Push(userID string, entries RefEntries) error {
}
// ignore out of order errors here (it's possible for a checkpoint to already have data from the wal segments)
bytesAdded, err := s.(*stream).Push(context.Background(), entries.Entries, nil, entries.Counter, true)
bytesAdded, err := s.(*stream).Push(context.Background(), entries.Entries, nil, entries.Counter, true, false)
r.ing.replayController.Add(int64(bytesAdded))
if err != nil && err == ErrEntriesExist {
r.ing.metrics.duplicateEntriesTotal.Add(float64(len(entries.Entries)))

@@ -150,6 +150,8 @@ func (s *stream) Push(
// Lock chunkMtx while pushing.
// If this is false, chunkMtx must be held outside Push.
lockChunk bool,
// Whether to apply the rate limit to the stream as a whole (all-or-nothing ingestion). It is a per-tenant configuration.
rateLimitWholeStream bool,
) (int, error) {
if lockChunk {
s.chunkMtx.Lock()
@@ -168,8 +170,8 @@ func (s *stream) Push(
return 0, ErrEntriesExist
}
toStore, invalid := s.validateEntries(entries, isReplay)
if s.cfg.RateLimitWholeStream && hasRateLimitErr(invalid) {
toStore, invalid := s.validateEntries(entries, isReplay, rateLimitWholeStream)
if rateLimitWholeStream && hasRateLimitErr(invalid) {
return 0, errorForFailedEntries(s, invalid, len(entries))
}
@@ -320,7 +322,7 @@ func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry) (in
return bytesAdded, storedEntries, invalid
}
func (s *stream) validateEntries(entries []logproto.Entry, isReplay bool) ([]logproto.Entry, []entryWithError) {
func (s *stream) validateEntries(entries []logproto.Entry, isReplay, rateLimitWholeStream bool) ([]logproto.Entry, []entryWithError) {
var (
outOfOrderSamples, outOfOrderBytes int
rateLimitedSamples, rateLimitedBytes int
@@ -349,7 +351,7 @@ func (s *stream) validateEntries(entries []logproto.Entry, isReplay bool) ([]log
totalBytes += lineBytes
now := time.Now()
if !s.cfg.RateLimitWholeStream && !s.limiter.AllowN(now, lineBytes) {
if !rateLimitWholeStream && !s.limiter.AllowN(now, len(entries[i].Line)) {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], &validation.ErrStreamRateLimit{RateLimit: flagext.ByteSize(limit), Labels: s.labelsString, Bytes: flagext.ByteSize(lineBytes)}})
rateLimitedSamples++
rateLimitedBytes += lineBytes
@@ -380,7 +382,7 @@ func (s *stream) validateEntries(entries []logproto.Entry, isReplay bool) ([]log
// ingestion, the limiter should only be advanced when the whole stream can be
// sent
now := time.Now()
if s.cfg.RateLimitWholeStream && !s.limiter.AllowN(now, validBytes) {
if rateLimitWholeStream && !s.limiter.AllowN(now, totalBytes) {
// Report that the whole stream was rate limited
rateLimitedSamples = len(entries)
failedEntriesWithError = make([]entryWithError, 0, len(entries))
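
The all-or-nothing behavior hinges on checking the limiter once for the combined byte size instead of per line, so a rejected batch consumes no rate budget. A minimal sketch with `golang.org/x/time/rate`; names and numbers are illustrative, not Loki's:

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

// allowWholeStream advances the limiter only if the entire batch fits.
func allowWholeStream(lim *rate.Limiter, lines []string) bool {
	totalBytes := 0
	for _, line := range lines {
		totalBytes += len(line)
	}
	return lim.AllowN(time.Now(), totalBytes)
}

func main() {
	lim := rate.NewLimiter(rate.Limit(10), 10) // 10 bytes/sec, burst of 10

	fmt.Println(allowWholeStream(lim, []string{"hello", "world"})) // true: 10 bytes fit the burst
	fmt.Println(allowWholeStream(lim, []string{"hi"}))             // false: budget already spent
}
```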

@@ -66,7 +66,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
_, err := s.Push(context.Background(), []logproto.Entry{
{Timestamp: time.Unix(int64(numLogs), 0), Line: "log"},
}, recordPool.GetRecord(), 0, true)
}, recordPool.GetRecord(), 0, true, false)
require.NoError(t, err)
newLines := make([]logproto.Entry, numLogs)
@@ -86,7 +86,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
fmt.Fprintf(&expected, "total ignored: %d out of %d", numLogs, numLogs)
expectErr := httpgrpc.Errorf(http.StatusBadRequest, expected.String())
_, err = s.Push(context.Background(), newLines, recordPool.GetRecord(), 0, true)
_, err = s.Push(context.Background(), newLines, recordPool.GetRecord(), 0, true, false)
require.Error(t, err)
require.Equal(t, expectErr.Error(), err.Error())
})
@@ -114,7 +114,7 @@ func TestPushDeduplication(t *testing.T) {
{Timestamp: time.Unix(1, 0), Line: "test"},
{Timestamp: time.Unix(1, 0), Line: "test"},
{Timestamp: time.Unix(1, 0), Line: "newer, better test"},
}, recordPool.GetRecord(), 0, true)
}, recordPool.GetRecord(), 0, true, false)
require.NoError(t, err)
require.Len(t, s.chunks, 1)
require.Equal(t, s.chunks[0].chunk.Size(), 2,
@@ -144,7 +144,7 @@ func TestPushRejectOldCounter(t *testing.T) {
{Timestamp: time.Unix(1, 0), Line: "test"},
{Timestamp: time.Unix(1, 0), Line: "test"},
{Timestamp: time.Unix(1, 0), Line: "newer, better test"},
}, recordPool.GetRecord(), 0, true)
}, recordPool.GetRecord(), 0, true, false)
require.NoError(t, err)
require.Len(t, s.chunks, 1)
require.Equal(t, s.chunks[0].chunk.Size(), 2,
@@ -153,13 +153,13 @@ func TestPushRejectOldCounter(t *testing.T) {
// fail to push with a counter <= the streams internal counter
_, err = s.Push(context.Background(), []logproto.Entry{
{Timestamp: time.Unix(1, 0), Line: "test"},
}, recordPool.GetRecord(), 2, true)
}, recordPool.GetRecord(), 2, true, false)
require.Equal(t, ErrEntriesExist, err)
// succeed with a greater counter
_, err = s.Push(context.Background(), []logproto.Entry{
{Timestamp: time.Unix(1, 0), Line: "test"},
}, recordPool.GetRecord(), 3, true)
}, recordPool.GetRecord(), 3, true, false)
require.Nil(t, err)
}
@@ -273,7 +273,7 @@ func TestUnorderedPush(t *testing.T) {
if x.cutBefore {
_ = s.cutChunk(context.Background())
}
written, err := s.Push(context.Background(), x.entries, recordPool.GetRecord(), 0, true)
written, err := s.Push(context.Background(), x.entries, recordPool.GetRecord(), 0, true, false)
if x.err {
require.NotNil(t, err)
} else {
@@ -334,7 +334,8 @@ func TestPushRateLimit(t *testing.T) {
{Timestamp: time.Unix(1, 0), Line: "aaaaaaaaab"},
}
// Counter should be 2 now since the first line will be deduped.
_, err = s.Push(context.Background(), entries, recordPool.GetRecord(), 0, true)
_, err = s.Push(context.Background(), entries, recordPool.GetRecord(), 0, true, true)
require.Error(t, err)
require.Contains(t, err.Error(), (&validation.ErrStreamRateLimit{RateLimit: l.PerStreamRateLimit, Labels: s.labelsString, Bytes: flagext.ByteSize(len(entries[1].Line))}).Error())
}
@@ -348,7 +349,6 @@ func TestPushRateLimitAllOrNothing(t *testing.T) {
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
cfg := defaultConfig()
cfg.RateLimitWholeStream = true
s := newStream(
cfg,
@@ -368,7 +368,8 @@ func TestPushRateLimitAllOrNothing(t *testing.T) {
}
// Both entries have errors because rate limiting is done all at once
_, err = s.Push(context.Background(), entries, recordPool.GetRecord(), 0, true)
_, err = s.Push(context.Background(), entries, recordPool.GetRecord(), 0, true, true)
require.Error(t, err)
require.Contains(t, err.Error(), (&validation.ErrStreamRateLimit{RateLimit: l.PerStreamRateLimit, Labels: s.labelsString, Bytes: flagext.ByteSize(len(entries[0].Line))}).Error())
require.Contains(t, err.Error(), (&validation.ErrStreamRateLimit{RateLimit: l.PerStreamRateLimit, Labels: s.labelsString, Bytes: flagext.ByteSize(len(entries[1].Line))}).Error())
}
@@ -400,7 +401,7 @@ func TestReplayAppendIgnoresValidityWindow(t *testing.T) {
}
// Push a first entry (it doesn't matter if we look like we're replaying or not)
_, err = s.Push(context.Background(), entries, nil, 1, true)
_, err = s.Push(context.Background(), entries, nil, 1, true, false)
require.Nil(t, err)
// Create a sample outside the validity window
@@ -409,11 +410,11 @@ func TestReplayAppendIgnoresValidityWindow(t *testing.T) {
}
// Pretend it's not a replay, ensure we error
_, err = s.Push(context.Background(), entries, recordPool.GetRecord(), 0, true)
_, err = s.Push(context.Background(), entries, recordPool.GetRecord(), 0, true, false)
require.NotNil(t, err)
// Now pretend it's a replay. The same write should succeed.
_, err = s.Push(context.Background(), entries, nil, 2, true)
_, err = s.Push(context.Background(), entries, nil, 2, true, false)
require.Nil(t, err)
}
@@ -455,7 +456,7 @@ func Benchmark_PushStream(b *testing.B) {
for n := 0; n < b.N; n++ {
rec := recordPool.GetRecord()
_, err := s.Push(ctx, e, rec, 0, true)
_, err := s.Push(ctx, e, rec, 0, true, false)
require.NoError(b, err)
recordPool.PutRecord(rec)
}

@@ -438,7 +438,6 @@ func (t *Loki) initQuerier() (services.Service, error) {
func (t *Loki) initIngester() (_ services.Service, err error) {
t.Cfg.Ingester.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.Ingester.RateLimitWholeStream = t.Cfg.Distributor.ShardStreams.Enabled
t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Cfg.IngesterClient, t.Store, t.overrides, t.tenantConfigs, prometheus.DefaultRegisterer)
if err != nil {

@@ -10,9 +10,6 @@ import (
"github.com/go-kit/log/level"
dskit_flagext "github.com/grafana/dskit/flagext"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletionmode"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/common/sigv4"
@@ -21,9 +18,12 @@ import (
"golang.org/x/time/rate"
"gopkg.in/yaml.v2"
"github.com/grafana/loki/pkg/distributor/shardstreams"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/ruler/util"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletionmode"
"github.com/grafana/loki/pkg/util/flagext"
util_log "github.com/grafana/loki/pkg/util/log"
)
const (
@ -148,6 +148,8 @@ type Limits struct {
// Deprecated
CompactorDeletionEnabled bool `yaml:"allow_deletes" json:"allow_deletes"`
ShardStreams *shardstreams.Config `yaml:"shard_streams" json:"shard_streams"`
}
type StreamRetention struct {
@@ -230,6 +232,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
// Deprecated
dskit_flagext.DeprecatedFlag(f, "compactor.allow-deletes", "Deprecated. Instead, see compactor.deletion-mode which is another per tenant configuration", util_log.Logger)
l.ShardStreams = &shardstreams.Config{}
l.ShardStreams.RegisterFlagsWithPrefix("shard-streams", f)
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
@ -608,6 +613,10 @@ func (o *Overrides) DeletionMode(userID string) string {
return o.getOverridesForUser(userID).DeletionMode
}
func (o *Overrides) ShardStreams(userID string) *shardstreams.Config {
return o.getOverridesForUser(userID).ShardStreams
}
func (o *Overrides) DefaultLimits() *Limits {
return o.defaultLimits
}
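
A sketch of how a caller resolves the per-tenant config through `Overrides`, mirroring the new getter above; it assumes default limits and no per-tenant overrides, so the documented defaults come back:

```go
package main

import (
	"fmt"

	"github.com/grafana/dskit/flagext"
	"github.com/grafana/loki/pkg/validation"
)

func main() {
	var defaults validation.Limits
	flagext.DefaultValues(&defaults)

	// nil tenant limits: every tenant falls back to the defaults.
	overrides, err := validation.NewOverrides(defaults, nil)
	if err != nil {
		panic(err)
	}

	cfg := overrides.ShardStreams("any-tenant")
	fmt.Println(cfg.Enabled, cfg.DesiredRate.String()) // false 3MB
}
```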

@@ -72,6 +72,10 @@ ruler_remote_write_sigv4_config:
per_tenant_override_config: ""
per_tenant_override_period: 230s
query_timeout: 5m
shard_streams:
enabled: true
desired_rate: 4mb
logging_enabled: true
`
inputJSON := `
{
@@ -108,7 +112,12 @@ query_timeout: 5m
},
"per_tenant_override_config": "",
"per_tenant_override_period": "230s",
"query_timeout": "5m"
"query_timeout": "5m",
"shard_streams": {
"desired_rate": "4mb",
"enabled": true,
"logging_enabled": true
}
}
`
