Log push failures per-entry instead of in batches (#9720)

**What this PR does / why we need it**:
Modifies push-error logging to log each failing entry as it is encountered,
instead of accumulating all errors and logging them together in a single pass at the end.

**Which issue(s) this PR fixes**:
N/A
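
For illustration, here is a minimal, self-contained sketch of the per-entry pattern this change moves to; `failuresLog` and `validate` are hypothetical stand-ins, not the actual Loki types from `pkg/distributor`:

```go
// Hypothetical, simplified sketch of per-entry failure logging.
package main

import (
	"fmt"
	"log"
)

type entry struct{ line string }

type failuresLog struct{ enabled bool }

// Log records one push failure. It is deliberately nil-safe so
// callers that are not configured for failure logging can pass nil.
func (f *failuresLog) Log(tenant string, err error) {
	if f == nil || !f.enabled {
		return
	}
	log.Printf("tenant=%s push failure: %v", tenant, err)
}

// validate mimics a per-entry check such as ValidateEntry.
func validate(e entry) error {
	if len(e.line) > 10 {
		return fmt.Errorf("line too long: %d > 10", len(e.line))
	}
	return nil
}

func main() {
	failures := &failuresLog{enabled: true}
	entries := []entry{{"ok"}, {"this line is far too long"}}

	var lastErr error
	for _, e := range entries {
		if err := validate(e); err != nil {
			// Per-entry: log the failure the moment it happens,
			// rather than batching all errors for a single pass.
			failures.Log("tenant-a", err)
			lastErr = err
			continue
		}
	}
	_ = lastErr // the response still carries only the last error
}
```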
Committed by Dylan Guedes via GitHub, commit c0d9a2b4ba (parent 62bb5b27ae)
17 changed files (changes per file in parentheses):

1. pkg/distributor/distributor.go (13)
2. pkg/distributor/http.go (2)
3. pkg/distributor/validator.go (19)
4. pkg/distributor/validator_test.go (27)
5. pkg/distributor/writefailures/manager.go (4)
6. pkg/ingester/checkpoint_test.go (27)
7. pkg/ingester/flush_test.go (3)
8. pkg/ingester/ingester.go (14)
9. pkg/ingester/ingester_test.go (21)
10. pkg/ingester/instance.go (13)
11. pkg/ingester/instance_test.go (23)
12. pkg/ingester/recovery_test.go (5)
13. pkg/ingester/stream.go (9)
14. pkg/ingester/stream_test.go (10)
15. pkg/ingester/streams_map_test.go (2)
16. pkg/loghttp/push/push.go (2)
17. pkg/loki/modules.go (2)

@@ -3,6 +3,7 @@ package distributor
import (
"context"
"flag"
"fmt"
"math"
"net/http"
"sort"
@@ -336,6 +337,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
pushSize := 0
for _, entry := range stream.Entries {
if err := d.validator.ValidateEntry(validationContext, stream.Labels, entry); err != nil {
d.writeFailuresManager.Log(tenantID, err)
validationErr = err
continue
}
@@ -373,6 +375,10 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}
}()
if validationErr != nil {
validationErr = httpgrpc.Errorf(http.StatusBadRequest, validationErr.Error())
}
// Return early if none of the streams contained entries
if len(streams) == 0 {
return &logproto.PushResponse{}, validationErr
@@ -383,7 +389,10 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
// Return a 429 to indicate to the client they are being rate limited
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, tenantID).Add(float64(validatedLineCount))
validation.DiscardedBytes.WithLabelValues(validation.RateLimited, tenantID).Add(float64(validatedLineSize))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, tenantID, int(d.ingestionRateLimiter.Limit(now, tenantID)), validatedLineCount, validatedLineSize)
err = fmt.Errorf(validation.RateLimitedErrorMsg, tenantID, int(d.ingestionRateLimiter.Limit(now, tenantID)), validatedLineCount, validatedLineSize)
d.writeFailuresManager.Log(tenantID, err)
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, err.Error())
}
const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck
@@ -649,7 +658,7 @@ func (d *Distributor) parseStreamLabels(vContext validationContext, key string,
ls, err := syntax.ParseLabels(key)
if err != nil {
return "", 0, httpgrpc.Errorf(http.StatusBadRequest, validation.InvalidLabelsErrorMsg, key, err)
return "", 0, fmt.Errorf(validation.InvalidLabelsErrorMsg, key, err)
}
if err := d.validator.ValidateLabels(vContext, ls, *stream); err != nil {

@@ -62,8 +62,6 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
return
}
d.writeFailuresManager.Log(tenantID, err)
resp, ok := httpgrpc.HTTPResponseFromError(err)
if ok {
body := string(resp.Body)

@@ -2,12 +2,11 @@ package distributor
import (
"errors"
"net/http"
"fmt"
"strings"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/weaveworks/common/httpgrpc"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/validation"
@@ -71,14 +70,14 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log
formatedRejectMaxAgeTime := time.Unix(0, ctx.rejectOldSampleMaxAge).Format(timeFormat)
validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Add(float64(len(entry.Line)))
return httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg, labels, formatedEntryTime, formatedRejectMaxAgeTime)
return fmt.Errorf(validation.GreaterThanMaxSampleAgeErrorMsg, labels, formatedEntryTime, formatedRejectMaxAgeTime)
}
if ts > ctx.creationGracePeriod {
formatedEntryTime := entry.Timestamp.Format(timeFormat)
validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, ctx.userID).Add(float64(len(entry.Line)))
return httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg, labels, formatedEntryTime)
return fmt.Errorf(validation.TooFarInFutureErrorMsg, labels, formatedEntryTime)
}
if maxSize := ctx.maxLineSize; maxSize != 0 && len(entry.Line) > maxSize {
@@ -88,7 +87,7 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log
// for parity.
validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, ctx.userID).Add(float64(len(entry.Line)))
return httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg, maxSize, labels, len(entry.Line))
return fmt.Errorf(validation.LineTooLongErrorMsg, maxSize, labels, len(entry.Line))
}
return nil
@@ -98,25 +97,25 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log
func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, stream logproto.Stream) error {
if len(ls) == 0 {
validation.DiscardedSamples.WithLabelValues(validation.MissingLabels, ctx.userID).Inc()
return httpgrpc.Errorf(http.StatusBadRequest, validation.MissingLabelsErrorMsg)
return fmt.Errorf(validation.MissingLabelsErrorMsg)
}
numLabelNames := len(ls)
if numLabelNames > ctx.maxLabelNamesPerSeries {
updateMetrics(validation.MaxLabelNamesPerSeries, ctx.userID, stream)
return httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg, stream.Labels, numLabelNames, ctx.maxLabelNamesPerSeries)
return fmt.Errorf(validation.MaxLabelNamesPerSeriesErrorMsg, stream.Labels, numLabelNames, ctx.maxLabelNamesPerSeries)
}
lastLabelName := ""
for _, l := range ls {
if len(l.Name) > ctx.maxLabelNameLength {
updateMetrics(validation.LabelNameTooLong, ctx.userID, stream)
return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg, stream.Labels, l.Name)
return fmt.Errorf(validation.LabelNameTooLongErrorMsg, stream.Labels, l.Name)
} else if len(l.Value) > ctx.maxLabelValueLength {
updateMetrics(validation.LabelValueTooLong, ctx.userID, stream)
return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg, stream.Labels, l.Value)
return fmt.Errorf(validation.LabelValueTooLongErrorMsg, stream.Labels, l.Value)
} else if cmp := strings.Compare(lastLabelName, l.Name); cmp == 0 {
updateMetrics(validation.DuplicateLabelNames, ctx.userID, stream)
return httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg, stream.Labels, l.Name)
return fmt.Errorf(validation.DuplicateLabelNamesErrorMsg, stream.Labels, l.Name)
}
lastLabelName = l.Name
}

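
A side effect of this change is that the validator now returns plain `fmt.Errorf` errors, and the HTTP status is attached exactly once at the distributor boundary (the `httpgrpc.Errorf` wrap in the `Push` hunk above). Below is a toy sketch of that separation using only the standard library; `httpError` is a hypothetical stand-in for weaveworks' httpgrpc errors, not the real type:

```go
// Toy separation: plain errors inside validation, status at the boundary.
package main

import (
	"fmt"
	"net/http"
)

type httpError struct {
	code int
	err  error
}

func (e *httpError) Error() string { return e.err.Error() }
func (e *httpError) Unwrap() error { return e.err }

// validateEntry returns a plain error: no HTTP concerns in here.
func validateEntry(line string, maxSize int) error {
	if len(line) > maxSize {
		return fmt.Errorf("line too long: %d > %d", len(line), maxSize)
	}
	return nil
}

func main() {
	if err := validateEntry("0123456789AB", 10); err != nil {
		// Log the raw error per entry...
		fmt.Println("write failure:", err)
		// ...and attach the status once, at the boundary.
		wrapped := &httpError{code: http.StatusBadRequest, err: err}
		fmt.Println(wrapped.code, wrapped.Error())
	}
}
```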
@@ -1,7 +1,8 @@
package distributor
import (
"net/http"
"errors"
"fmt"
"testing"
"time"
@@ -9,7 +10,6 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/weaveworks/common/httpgrpc"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
@@ -59,9 +59,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
},
},
logproto.Entry{Timestamp: testTime.Add(-time.Hour * 5), Line: "test"},
httpgrpc.Errorf(
http.StatusBadRequest,
validation.GreaterThanMaxSampleAgeErrorMsg,
fmt.Errorf(validation.GreaterThanMaxSampleAgeErrorMsg,
testStreamLabels,
testTime.Add(-time.Hour*5).Format(timeFormat),
testTime.Add(-1*time.Hour).Format(timeFormat), // same as RejectOldSamplesMaxAge
@@ -72,7 +70,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
"test",
nil,
logproto.Entry{Timestamp: testTime.Add(time.Hour * 5), Line: "test"},
httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg, testStreamLabels, testTime.Add(time.Hour*5).Format(timeFormat)),
fmt.Errorf(validation.TooFarInFutureErrorMsg, testStreamLabels, testTime.Add(time.Hour*5).Format(timeFormat)),
},
{
"line too long",
@@ -83,7 +81,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
},
},
logproto.Entry{Timestamp: testTime, Line: "12345678901"},
httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg, 10, testStreamLabels, 11),
fmt.Errorf(validation.LineTooLongErrorMsg, 10, testStreamLabels, 11),
},
}
for _, tt := range tests {
@@ -121,7 +119,7 @@ func TestValidator_ValidateLabels(t *testing.T) {
"test",
nil,
"{}",
httpgrpc.Errorf(http.StatusBadRequest, validation.MissingLabelsErrorMsg),
fmt.Errorf(validation.MissingLabelsErrorMsg),
},
{
"test too many labels",
@@ -130,7 +128,7 @@ func TestValidator_ValidateLabels(t *testing.T) {
&validation.Limits{MaxLabelNamesPerSeries: 2},
},
"{foo=\"bar\",food=\"bars\",fed=\"bears\"}",
httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg, "{foo=\"bar\",food=\"bars\",fed=\"bears\"}", 3, 2),
fmt.Errorf(validation.MaxLabelNamesPerSeriesErrorMsg, "{foo=\"bar\",food=\"bars\",fed=\"bears\"}", 3, 2),
},
{
"label name too long",
@@ -142,7 +140,7 @@ func TestValidator_ValidateLabels(t *testing.T) {
},
},
"{fooooo=\"bar\"}",
httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg, "{fooooo=\"bar\"}", "fooooo"),
fmt.Errorf(validation.LabelNameTooLongErrorMsg, "{fooooo=\"bar\"}", "fooooo"),
},
{
"label value too long",
@@ -155,7 +153,7 @@ func TestValidator_ValidateLabels(t *testing.T) {
},
},
"{foo=\"barrrrrr\"}",
httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg, "{foo=\"barrrrrr\"}", "barrrrrr"),
fmt.Errorf(validation.LabelValueTooLongErrorMsg, "{foo=\"barrrrrr\"}", "barrrrrr"),
},
{
"duplicate label",
@@ -168,7 +166,7 @@ func TestValidator_ValidateLabels(t *testing.T) {
},
},
"{foo=\"bar\", foo=\"barf\"}",
httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg, "{foo=\"bar\", foo=\"barf\"}", "foo"),
fmt.Errorf(validation.DuplicateLabelNamesErrorMsg, "{foo=\"bar\", foo=\"barf\"}", "foo"),
},
{
"label value contains %",
@@ -181,10 +179,7 @@ func TestValidator_ValidateLabels(t *testing.T) {
},
},
"{foo=\"bar\", foo=\"barf%s\"}",
httpgrpc.ErrorFromHTTPResponse(&httpgrpc.HTTPResponse{
Code: int32(http.StatusBadRequest),
Body: []byte("stream '{foo=\"bar\", foo=\"barf%s\"}' has label value too long: 'barf%s'"), // Intentionally construct the string to make sure %s isn't substituted as (MISSING)
}),
errors.New("stream '{foo=\"bar\", foo=\"barf%s\"}' has label value too long: 'barf%s'"), // Intentionally construct the string to make sure %s isn't substituted as (MISSING)
},
}
for _, tt := range tests {

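
The last test case above exists because a label value may contain a literal `%s`; building the expected error with `fmt.Errorf` would re-interpret it as a missing format verb. A small standard-library-only demonstration of the difference:

```go
// Why the expected error is built with errors.New, not fmt.Errorf.
package main

import (
	"errors"
	"fmt"
)

func main() {
	val := "barf%s" // a label value containing a literal %s

	// Passed as an argument, the %s survives intact:
	got := fmt.Errorf("label value too long: '%s'", val)
	fmt.Println(got) // label value too long: 'barf%s'

	// Reused as a *format string*, %s has no matching argument:
	msg := got.Error()
	bad := fmt.Errorf(msg)
	fmt.Println(bad) // label value too long: 'barf%!s(MISSING)'

	// Hence the test compares against a literal, unformatted string:
	want := errors.New("label value too long: 'barf%s'")
	fmt.Println(got.Error() == want.Error()) // true
}
```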
@@ -32,6 +32,10 @@ func NewManager(logger log.Logger, cfg Cfg, tenants *runtime.TenantConfigs) *Man
}
func (m *Manager) Log(tenantID string, err error) {
if m == nil {
return
}
if !m.tenantCfgs.LimitedLogPushErrors(tenantID) {
return
}

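
The nil guard added to `Log` is what lets the many call sites in this diff pass a nil manager (for example, the tests that thread `nil` through `newInstance` and `newStream`): in Go, a pointer-receiver method may be invoked on a nil receiver as long as it never dereferences it. A minimal sketch, not the real `Manager`:

```go
// Minimal sketch of the nil-receiver guard.
package main

import "fmt"

type Manager struct{ prefix string }

// Log is safe to call on a nil *Manager: Go happily invokes
// pointer-receiver methods on nil as long as the receiver is
// never dereferenced before the guard.
func (m *Manager) Log(tenant string, err error) {
	if m == nil {
		return // not configured: logging is a no-op
	}
	fmt.Printf("%s tenant=%s err=%v\n", m.prefix, tenant, err)
}

func main() {
	var m *Manager                    // nil, as in many tests above
	m.Log("fake", fmt.Errorf("boom")) // no panic, silently dropped

	m = &Manager{prefix: "[write-failures]"}
	m.Log("fake", fmt.Errorf("boom")) // actually logged
}
```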
@@ -15,6 +15,7 @@ import (
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/distributor/writefailures"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
@@ -67,7 +68,7 @@ func TestIngesterWAL(t *testing.T) {
}
}
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -110,7 +111,7 @@ func TestIngesterWAL(t *testing.T) {
expectCheckpoint(t, walDir, false, time.Second)
// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -124,7 +125,7 @@ func TestIngesterWAL(t *testing.T) {
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))
// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -147,7 +148,7 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) {
}
}
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -193,7 +194,7 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) {
require.NoError(t, err)
// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -250,7 +251,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) {
}
}
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -271,7 +272,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) {
expectCheckpoint(t, walDir, false, time.Second)
// restart the ingester, ensuring we replayed from WAL.
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -292,7 +293,7 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) {
}
}
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -313,7 +314,7 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) {
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))
// restart the ingester, ensuring we can replay from the checkpoint as well.
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -449,7 +450,7 @@ func Test_SeriesIterator(t *testing.T) {
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
for i := 0; i < 3; i++ {
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, NewStreamRateCalculator())
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, NewStreamRateCalculator(), nil)
require.Nil(t, err)
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream1}}))
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream2}}))
@@ -496,7 +497,7 @@ func Benchmark_SeriesIterator(b *testing.B) {
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
for i := range instances {
inst, _ := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("instance %d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, NewStreamRateCalculator())
inst, _ := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("instance %d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, NewStreamRateCalculator(), nil)
require.NoError(b,
inst.Push(context.Background(), &logproto.PushRequest{
@@ -588,7 +589,7 @@ func TestIngesterWALReplaysUnorderedToOrdered(t *testing.T) {
}
}
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -660,7 +661,7 @@ func TestIngesterWALReplaysUnorderedToOrdered(t *testing.T) {
require.NoError(t, err)
// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))

@@ -23,6 +23,7 @@ import (
"github.com/grafana/dskit/tenant"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/distributor/writefailures"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/ingester/wal"
"github.com/grafana/loki/pkg/iter"
@@ -260,7 +261,7 @@ func newTestStore(t require.TestingT, cfg Config, walOverride WAL) (*testStore,
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
ing, err := New(cfg, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil)
ing, err := New(cfg, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))

@@ -11,8 +11,6 @@ import (
"sync"
"time"
"github.com/grafana/loki/pkg/storage/stores/index/seriesvolume"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/modules"
@@ -30,6 +28,7 @@ import (
"github.com/grafana/loki/pkg/analytics"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/distributor/writefailures"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/ingester/index"
"github.com/grafana/loki/pkg/iter"
@@ -42,6 +41,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/index/seriesvolume"
index_stats "github.com/grafana/loki/pkg/storage/stores/index/stats"
"github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
@@ -242,10 +242,12 @@ type Ingester struct {
chunkFilter chunk.RequestChunkFilterer
streamRateCalculator *StreamRateCalculator
writeLogManager *writefailures.Manager
}
// New makes a new Ingester.
func New(cfg Config, clientConfig client.Config, store ChunkStore, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer) (*Ingester, error) {
func New(cfg Config, clientConfig client.Config, store ChunkStore, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer, writeFailuresCfg writefailures.Cfg) (*Ingester, error) {
if cfg.ingesterClientFactory == nil {
cfg.ingesterClientFactory = client.New
}
@@ -271,6 +273,7 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits Limits
flushOnShutdownSwitch: &OnceSwitch{},
terminateOnShutdown: false,
streamRateCalculator: NewStreamRateCalculator(),
writeLogManager: writefailures.NewManager(util_log.Logger, writeFailuresCfg, configs),
}
i.replayController = newReplayController(metrics, cfg.WAL, &replayFlusher{i})
@@ -803,8 +806,7 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logpro
if err != nil {
return &logproto.PushResponse{}, err
}
err = instance.Push(ctx, req)
return &logproto.PushResponse{}, err
return &logproto.PushResponse{}, instance.Push(ctx, req)
}
// GetStreamRates returns a response containing all streams and their current rate
@@ -834,7 +836,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { //
inst, ok = i.instances[instanceID]
if !ok {
var err error
inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter, i.streamRateCalculator)
inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter, i.streamRateCalculator, i.writeLogManager)
if err != nil {
return nil, err
}

@@ -28,6 +28,7 @@ import (
"github.com/grafana/dskit/tenant"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/distributor/writefailures"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/ingester/index"
"github.com/grafana/loki/pkg/iter"
@@ -50,7 +51,7 @@ func TestPrepareShutdownMarkerPathNotSet(t *testing.T) {
chunks: map[string][]chunk.Chunk{},
}
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil)
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -73,7 +74,7 @@ func TestPrepareShutdown(t *testing.T) {
chunks: map[string][]chunk.Chunk{},
}
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil)
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -134,7 +135,7 @@ func TestIngester_GetStreamRates_Correctness(t *testing.T) {
chunks: map[string][]chunk.Chunk{},
}
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil)
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -166,7 +167,7 @@ func BenchmarkGetStreamRatesAllocs(b *testing.B) {
chunks: map[string][]chunk.Chunk{},
}
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil)
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
require.NoError(b, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -190,7 +191,7 @@ func TestIngester(t *testing.T) {
chunks: map[string][]chunk.Chunk{},
}
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil)
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -372,7 +373,7 @@ func TestIngesterStreamLimitExceeded(t *testing.T) {
chunks: map[string][]chunk.Chunk{},
}
i, err := New(ingesterConfig, client.Config{}, store, overrides, runtime.DefaultTenantConfigs(), nil)
i, err := New(ingesterConfig, client.Config{}, store, overrides, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -719,7 +720,7 @@ func Test_InMemoryLabels(t *testing.T) {
chunks: map[string][]chunk.Chunk{},
}
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil)
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -1059,7 +1060,7 @@ func TestStats(t *testing.T) {
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
i, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil)
i, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
require.NoError(t, err)
i.instances["test"] = defaultInstance(t)
@@ -1086,7 +1087,7 @@ func TestSeriesVolume(t *testing.T) {
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
i, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil)
i, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
require.NoError(t, err)
i.instances["test"] = defaultInstance(t)
@@ -1164,7 +1165,7 @@ func createIngesterServer(t *testing.T, ingesterConfig Config) (ingesterClient,
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
ing, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil)
ing, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
require.NoError(t, err)
listener := bufconn.Listen(1024 * 1024)

@@ -8,8 +8,6 @@ import (
"syscall"
"time"
"github.com/grafana/loki/pkg/storage/stores/index/seriesvolume"
"github.com/go-kit/log/level"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
@@ -23,6 +21,7 @@ import (
"go.uber.org/atomic"
"github.com/grafana/loki/pkg/analytics"
"github.com/grafana/loki/pkg/distributor/writefailures"
"github.com/grafana/loki/pkg/ingester/index"
"github.com/grafana/loki/pkg/ingester/wal"
"github.com/grafana/loki/pkg/iter"
@@ -34,6 +33,7 @@ import (
"github.com/grafana/loki/pkg/runtime"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/index/seriesvolume"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/deletion"
util_log "github.com/grafana/loki/pkg/util/log"
@@ -106,6 +106,8 @@ type instance struct {
chunkFilter chunk.RequestChunkFilterer
streamRateCalculator *StreamRateCalculator
writeFailures *writefailures.Manager
}
func newInstance(
@@ -119,6 +121,7 @@ func newInstance(
flushOnShutdownSwitch *OnceSwitch,
chunkFilter chunk.RequestChunkFilterer,
streamRateCalculator *StreamRateCalculator,
writeFailures *writefailures.Manager,
) (*instance, error) {
invertedIndex, err := index.NewMultiInvertedIndex(periodConfigs, uint32(cfg.IndexShards))
if err != nil {
@@ -145,6 +148,8 @@ func newInstance(
chunkFilter: chunkFilter,
streamRateCalculator: streamRateCalculator,
writeFailures: writeFailures,
}
i.mapper = newFPMapper(i.getLabelsFromFingerprint)
return i, err
@@ -274,7 +279,7 @@ func (i *instance) createStream(pushReqStream logproto.Stream, record *wal.Recor
fp := i.getHashForLabels(labels)
sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp)
s := newStream(i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics)
s := newStream(i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures)
// record will be nil when replaying the wal (we don't want to rewrite wal entries as we replay them).
if record != nil {
@@ -306,7 +311,7 @@ func (i *instance) createStream(pushReqStream logproto.Stream, record *wal.Recor
func (i *instance) createStreamByFP(ls labels.Labels, fp model.Fingerprint) *stream {
sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(ls), fp)
s := newStream(i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics)
s := newStream(i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures)
i.streamsCreatedTotal.Inc()
memoryStreams.WithLabelValues(i.instanceID).Inc()

@@ -61,7 +61,7 @@ func TestLabelsCollisions(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
i, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
i, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
require.Nil(t, err)
// avoid entries from the future.
@@ -89,7 +89,7 @@ func TestConcurrentPushes(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
require.Nil(t, err)
const (
@@ -141,7 +141,7 @@ func TestGetStreamRates(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
require.NoError(t, err)
const (
@@ -235,7 +235,7 @@ func TestSyncPeriod(t *testing.T) {
minUtil = 0.20
)
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
require.Nil(t, err)
lbls := makeRandomLabels()
@@ -280,7 +280,7 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) {
cfg.SyncMinUtilization = 0.20
cfg.IndexShards = indexShards
instance, err := newInstance(cfg, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
instance, err := newInstance(cfg, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
require.Nil(t, err)
currentTime := time.Now()
@@ -294,7 +294,7 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) {
for _, testStream := range testStreams {
stream, err := instance.getOrCreateStream(testStream, recordPool.GetRecord())
require.NoError(t, err)
chunk := newStream(cfg, limiter, "fake", 0, nil, true, NewStreamRateCalculator(), NilMetrics).NewChunk()
chunk := newStream(cfg, limiter, "fake", 0, nil, true, NewStreamRateCalculator(), NilMetrics, nil).NewChunk()
for _, entry := range testStream.Entries {
err = chunk.Append(&entry)
require.NoError(t, err)
@@ -487,7 +487,7 @@ func Benchmark_PushInstance(b *testing.B) {
require.NoError(b, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
ctx := context.Background()
for n := 0; n < b.N; n++ {
@@ -531,7 +531,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
ctx := context.Background()
inst, _ := newInstance(&Config{}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
inst, _ := newInstance(&Config{}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
t, err := newTailer("foo", `{namespace="foo",pod="bar",instance=~"10.*"}`, nil, 10)
require.NoError(b, err)
for i := 0; i < 10000; i++ {
@@ -547,7 +547,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
lbs := makeRandomLabels()
b.Run("addTailersToNewStream", func(b *testing.B) {
for n := 0; n < b.N; n++ {
inst.addTailersToNewStream(newStream(nil, limiter, "fake", 0, lbs, true, NewStreamRateCalculator(), NilMetrics))
inst.addTailersToNewStream(newStream(nil, limiter, "fake", 0, lbs, true, NewStreamRateCalculator(), NilMetrics, nil))
}
})
}
@@ -800,7 +800,7 @@ func TestStreamShardingUsage(t *testing.T) {
})
t.Run("invalid push returns error", func(t *testing.T) {
i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, customTenant1, limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, customTenant1, limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
ctx := context.Background()
err = i.Push(ctx, &logproto.PushRequest{
@@ -819,7 +819,7 @@ func TestStreamShardingUsage(t *testing.T) {
})
t.Run("valid push returns no error", func(t *testing.T) {
i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, customTenant2, limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, customTenant2, limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
ctx := context.Background()
err = i.Push(ctx, &logproto.PushRequest{
@@ -935,6 +935,7 @@ func defaultInstance(t *testing.T) *instance {
nil,
nil,
NewStreamRateCalculator(),
nil,
)
require.Nil(t, err)
insertData(t, instance)

@@ -15,6 +15,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/distributor/writefailures"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/ingester/wal"
"github.com/grafana/loki/pkg/logproto"
@@ -206,7 +207,7 @@ func TestSeriesRecoveryNoDuplicates(t *testing.T) {
chunks: map[string][]chunk.Chunk{},
}
i, err := New(ingesterConfig, client.Config{}, store, limits, loki_runtime.DefaultTenantConfigs(), nil)
i, err := New(ingesterConfig, client.Config{}, store, limits, loki_runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
require.NoError(t, err)
mkSample := func(i int) *logproto.PushRequest {
@@ -240,7 +241,7 @@ func TestSeriesRecoveryNoDuplicates(t *testing.T) {
require.Equal(t, false, iter.Next())
// create a new ingester now
i, err = New(ingesterConfig, client.Config{}, store, limits, loki_runtime.DefaultTenantConfigs(), nil)
i, err = New(ingesterConfig, client.Config{}, store, limits, loki_runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
require.NoError(t, err)
// recover the checkpointed series

@@ -16,6 +16,7 @@ import (
"github.com/weaveworks/common/httpgrpc"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/distributor/writefailures"
"github.com/grafana/loki/pkg/ingester/wal"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
@@ -71,6 +72,8 @@ type stream struct {
unorderedWrites bool
streamRateCalculator *StreamRateCalculator
writeFailures *writefailures.Manager
}
type chunkDesc struct {
@@ -88,7 +91,7 @@ type entryWithError struct {
e error
}
func newStream(cfg *Config, limits RateLimiterStrategy, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, streamRateCalculator *StreamRateCalculator, metrics *ingesterMetrics) *stream {
func newStream(cfg *Config, limits RateLimiterStrategy, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, streamRateCalculator *StreamRateCalculator, metrics *ingesterMetrics, writeFailures *writefailures.Manager) *stream {
hashNoShard, _ := labels.HashWithoutLabels(make([]byte, 0, 1024), ShardLbName)
return &stream{
limiter: NewStreamRateLimiter(limits, tenant, 10*time.Second),
@@ -104,6 +107,7 @@ func newStream(cfg *Config, limits RateLimiterStrategy, tenant string, fp model.
streamRateCalculator: streamRateCalculator,
unorderedWrites: unorderedWrites,
writeFailures: writeFailures,
}
}
@@ -312,6 +316,7 @@ func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry) (in
chunk.lastUpdated = time.Now()
if err := chunk.chunk.Append(&entries[i]); err != nil {
s.writeFailures.Log(s.tenant, err)
invalid = append(invalid, entryWithError{&entries[i], err})
if chunkenc.IsOutOfOrderErr(err) {
outOfOrderSamples++
@@ -365,6 +370,7 @@ func (s *stream) validateEntries(entries []logproto.Entry, isReplay, rateLimitWh
now := time.Now()
if !rateLimitWholeStream && !s.limiter.AllowN(now, len(entries[i].Line)) {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], &validation.ErrStreamRateLimit{RateLimit: flagext.ByteSize(limit), Labels: s.labelsString, Bytes: flagext.ByteSize(lineBytes)}})
s.writeFailures.Log(s.tenant, failedEntriesWithError[len(failedEntriesWithError)-1].e)
rateLimitedSamples++
rateLimitedBytes += lineBytes
continue
@@ -374,6 +380,7 @@ func (s *stream) validateEntries(entries []logproto.Entry, isReplay, rateLimitWh
cutoff := highestTs.Add(-s.cfg.MaxChunkAge / 2)
if !isReplay && s.unorderedWrites && !highestTs.IsZero() && cutoff.After(entries[i].Timestamp) {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrTooFarBehind(cutoff)})
s.writeFailures.Log(s.tenant, failedEntriesWithError[len(failedEntriesWithError)-1].e)
outOfOrderSamples++
outOfOrderBytes += lineBytes
continue

@@ -63,6 +63,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
true,
NewStreamRateCalculator(),
NilMetrics,
nil,
)
_, err := s.Push(context.Background(), []logproto.Entry{
@@ -110,6 +111,7 @@ func TestPushDeduplication(t *testing.T) {
true,
NewStreamRateCalculator(),
NilMetrics,
nil,
)
written, err := s.Push(context.Background(), []logproto.Entry{
@@ -140,6 +142,7 @@ func TestPushRejectOldCounter(t *testing.T) {
true,
NewStreamRateCalculator(),
NilMetrics,
nil,
)
// counter should be 2 now since the first line will be deduped
@@ -239,6 +242,7 @@ func TestEntryErrorCorrectlyReported(t *testing.T) {
true,
NewStreamRateCalculator(),
NilMetrics,
nil,
)
s.highestTs = time.Now()
@@ -269,6 +273,7 @@ func TestUnorderedPush(t *testing.T) {
true,
NewStreamRateCalculator(),
NilMetrics,
nil,
)
for _, x := range []struct {
@@ -366,6 +371,7 @@ func TestPushRateLimit(t *testing.T) {
true,
NewStreamRateCalculator(),
NilMetrics,
nil,
)
entries := []logproto.Entry{
@@ -400,6 +406,7 @@ func TestPushRateLimitAllOrNothing(t *testing.T) {
true,
NewStreamRateCalculator(),
NilMetrics,
nil,
)
entries := []logproto.Entry{
@@ -433,6 +440,7 @@ func TestReplayAppendIgnoresValidityWindow(t *testing.T) {
true,
NewStreamRateCalculator(),
NilMetrics,
nil,
)
base := time.Now()
@@ -482,7 +490,7 @@ func Benchmark_PushStream(b *testing.B) {
require.NoError(b, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
s := newStream(&Config{MaxChunkAge: 24 * time.Hour}, limiter, "fake", model.Fingerprint(0), ls, true, NewStreamRateCalculator(), NilMetrics)
s := newStream(&Config{MaxChunkAge: 24 * time.Hour}, limiter, "fake", model.Fingerprint(0), ls, true, NewStreamRateCalculator(), NilMetrics, nil)
t, err := newTailer("foo", `{namespace="loki-dev"}`, &fakeTailServer{}, 10)
require.NoError(b, err)

@@ -27,6 +27,7 @@ func TestStreamsMap(t *testing.T) {
true,
NewStreamRateCalculator(),
NilMetrics,
nil,
),
newStream(
defaultConfig(),
@@ -39,6 +40,7 @@ func TestStreamsMap(t *testing.T) {
true,
NewStreamRateCalculator(),
NilMetrics,
nil,
),
}
var s *stream

@@ -126,7 +126,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete
if tenantsRetention != nil {
lbs, err := syntax.ParseLabels(s.Labels)
if err != nil {
return nil, err
return nil, fmt.Errorf("couldn't parse labels: %w", err)
}
retentionHours = fmt.Sprintf("%d", int64(math.Floor(tenantsRetention.RetentionPeriodFor(userID, lbs).Hours())))
}

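
The `%w` verb used in the push.go hunk above wraps the parse error: it adds context to the message while keeping the original error available to `errors.Is`/`errors.As`. A minimal illustration:

```go
package main

import (
	"errors"
	"fmt"
)

var errParse = errors.New("invalid label syntax")

func main() {
	err := fmt.Errorf("couldn't parse labels: %w", errParse)
	fmt.Println(err)                      // couldn't parse labels: invalid label syntax
	fmt.Println(errors.Is(err, errParse)) // true: %w preserves the chain
}
```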
@@ -468,7 +468,7 @@ func (t *Loki) initIngester() (_ services.Service, err error) {
level.Warn(util_log.Logger).Log("msg", "The config setting shutdown marker path is not set. The /ingester/prepare_shutdown endpoint won't work")
}
t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Cfg.IngesterClient, t.Store, t.Overrides, t.tenantConfigs, prometheus.DefaultRegisterer)
t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Cfg.IngesterClient, t.Store, t.Overrides, t.tenantConfigs, prometheus.DefaultRegisterer, t.Cfg.Distributor.WriteFailuresLogging)
if err != nil {
return
}
