diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 4382f129fe..3a54be745a 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -3,6 +3,7 @@ package distributor
 import (
 	"context"
 	"flag"
+	"fmt"
 	"math"
 	"net/http"
 	"sort"
@@ -336,6 +337,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
 		pushSize := 0
 		for _, entry := range stream.Entries {
 			if err := d.validator.ValidateEntry(validationContext, stream.Labels, entry); err != nil {
+				d.writeFailuresManager.Log(tenantID, err)
 				validationErr = err
 				continue
 			}
@@ -373,6 +375,10 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
 		}
 	}()
 
+	if validationErr != nil {
+		validationErr = httpgrpc.Errorf(http.StatusBadRequest, validationErr.Error())
+	}
+
 	// Return early if none of the streams contained entries
 	if len(streams) == 0 {
 		return &logproto.PushResponse{}, validationErr
@@ -383,7 +389,10 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
 		// Return a 429 to indicate to the client they are being rate limited
 		validation.DiscardedSamples.WithLabelValues(validation.RateLimited, tenantID).Add(float64(validatedLineCount))
 		validation.DiscardedBytes.WithLabelValues(validation.RateLimited, tenantID).Add(float64(validatedLineSize))
-		return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, tenantID, int(d.ingestionRateLimiter.Limit(now, tenantID)), validatedLineCount, validatedLineSize)
+
+		err = fmt.Errorf(validation.RateLimitedErrorMsg, tenantID, int(d.ingestionRateLimiter.Limit(now, tenantID)), validatedLineCount, validatedLineSize)
+		d.writeFailuresManager.Log(tenantID, err)
+		return nil, httpgrpc.Errorf(http.StatusTooManyRequests, err.Error())
 	}
 
 	const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck
@@ -649,7 +658,7 @@ func (d *Distributor) parseStreamLabels(vContext validationContext, key string,
 
 	ls, err := syntax.ParseLabels(key)
 	if err != nil {
-		return "", 0, httpgrpc.Errorf(http.StatusBadRequest, validation.InvalidLabelsErrorMsg, key, err)
+		return "", 0, fmt.Errorf(validation.InvalidLabelsErrorMsg, key, err)
 	}
 
 	if err := d.validator.ValidateLabels(vContext, ls, *stream); err != nil {
diff --git a/pkg/distributor/http.go b/pkg/distributor/http.go
index d4dec900a9..1174109042 100644
--- a/pkg/distributor/http.go
+++ b/pkg/distributor/http.go
@@ -62,8 +62,6 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	d.writeFailuresManager.Log(tenantID, err)
-
 	resp, ok := httpgrpc.HTTPResponseFromError(err)
 	if ok {
 		body := string(resp.Body)
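The net effect of the two distributor files above: write failures are logged per tenant at the point of rejection (inside Push, not in the HTTP handler), errors stay plain fmt errors internally, and an HTTP status is attached exactly once at the Push boundary. A minimal stdlib-only sketch of that shape — all names here are illustrative, not Loki's:

package main

import (
	"fmt"
	"net/http"
)

// validate knows nothing about HTTP; it returns a plain error.
func validate(line string) error {
	if len(line) > 10 {
		return fmt.Errorf("line too long: %d > 10", len(line))
	}
	return nil
}

// push logs each failure where it happens and attaches a status code
// to the last validation error exactly once, on the way out.
func push(lines []string, logFailure func(error)) (int, error) {
	var validationErr error
	for _, l := range lines {
		if err := validate(l); err != nil {
			logFailure(err) // logged at the rejection site, as in Push above
			validationErr = err
			continue
		}
	}
	if validationErr != nil {
		return http.StatusBadRequest, validationErr
	}
	return http.StatusNoContent, nil
}

func main() {
	code, err := push([]string{"ok", "definitely longer than ten"}, func(err error) {
		fmt.Println("write failure:", err)
	})
	fmt.Println(code, err)
}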
diff --git a/pkg/distributor/validator.go b/pkg/distributor/validator.go
index 50c0584f8d..cc2f13c7f3 100644
--- a/pkg/distributor/validator.go
+++ b/pkg/distributor/validator.go
@@ -2,12 +2,11 @@ package distributor
 
 import (
 	"errors"
-	"net/http"
+	"fmt"
 	"strings"
 	"time"
 
 	"github.com/prometheus/prometheus/model/labels"
-	"github.com/weaveworks/common/httpgrpc"
 
 	"github.com/grafana/loki/pkg/logproto"
 	"github.com/grafana/loki/pkg/validation"
@@ -71,14 +70,14 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log
 			formatedRejectMaxAgeTime := time.Unix(0, ctx.rejectOldSampleMaxAge).Format(timeFormat)
 			validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Inc()
 			validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Add(float64(len(entry.Line)))
-			return httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg, labels, formatedEntryTime, formatedRejectMaxAgeTime)
+			return fmt.Errorf(validation.GreaterThanMaxSampleAgeErrorMsg, labels, formatedEntryTime, formatedRejectMaxAgeTime)
 		}
 
 		if ts > ctx.creationGracePeriod {
 			formatedEntryTime := entry.Timestamp.Format(timeFormat)
 			validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, ctx.userID).Inc()
 			validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, ctx.userID).Add(float64(len(entry.Line)))
-			return httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg, labels, formatedEntryTime)
+			return fmt.Errorf(validation.TooFarInFutureErrorMsg, labels, formatedEntryTime)
 		}
 
 		if maxSize := ctx.maxLineSize; maxSize != 0 && len(entry.Line) > maxSize {
@@ -88,7 +87,7 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log
 		// for parity.
 		validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, ctx.userID).Inc()
 		validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, ctx.userID).Add(float64(len(entry.Line)))
-		return httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg, maxSize, labels, len(entry.Line))
+		return fmt.Errorf(validation.LineTooLongErrorMsg, maxSize, labels, len(entry.Line))
 	}
 
 	return nil
@@ -98,25 +97,25 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log
 
 func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, stream logproto.Stream) error {
 	if len(ls) == 0 {
 		validation.DiscardedSamples.WithLabelValues(validation.MissingLabels, ctx.userID).Inc()
-		return httpgrpc.Errorf(http.StatusBadRequest, validation.MissingLabelsErrorMsg)
+		return fmt.Errorf(validation.MissingLabelsErrorMsg)
 	}
 
 	numLabelNames := len(ls)
 	if numLabelNames > ctx.maxLabelNamesPerSeries {
 		updateMetrics(validation.MaxLabelNamesPerSeries, ctx.userID, stream)
-		return httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg, stream.Labels, numLabelNames, ctx.maxLabelNamesPerSeries)
+		return fmt.Errorf(validation.MaxLabelNamesPerSeriesErrorMsg, stream.Labels, numLabelNames, ctx.maxLabelNamesPerSeries)
 	}
 
 	lastLabelName := ""
 	for _, l := range ls {
 		if len(l.Name) > ctx.maxLabelNameLength {
 			updateMetrics(validation.LabelNameTooLong, ctx.userID, stream)
-			return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg, stream.Labels, l.Name)
+			return fmt.Errorf(validation.LabelNameTooLongErrorMsg, stream.Labels, l.Name)
 		} else if len(l.Value) > ctx.maxLabelValueLength {
 			updateMetrics(validation.LabelValueTooLong, ctx.userID, stream)
-			return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg, stream.Labels, l.Value)
+			return fmt.Errorf(validation.LabelValueTooLongErrorMsg, stream.Labels, l.Value)
		} else if cmp := strings.Compare(lastLabelName, l.Name); cmp == 0 {
 			updateMetrics(validation.DuplicateLabelNames, ctx.userID, stream)
-			return httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg, stream.Labels, l.Name)
+			return fmt.Errorf(validation.DuplicateLabelNamesErrorMsg, stream.Labels, l.Name)
 		}
 		lastLabelName = l.Name
 	}
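Every return in validator.go swaps httpgrpc.Errorf(http.StatusBadRequest, ...) for fmt.Errorf(...). The validation.*ErrorMsg constants are printf-style format strings, so the rendered message is unchanged; only the status-code attachment moves up into the distributor. Schematically — the constant value below is made up for illustration:

package main

import "fmt"

// Hypothetical stand-in for a validation.*ErrorMsg format constant.
const lineTooLongErrorMsg = "stream '%s' has exceeded the line size limit of %d bytes (line size %d bytes)"

func main() {
	// Same rendered message either way; fmt.Errorf just doesn't carry an HTTP status.
	err := fmt.Errorf(lineTooLongErrorMsg, `{app="foo"}`, 10, 11)
	fmt.Println(err)
}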
"github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" - "github.com/weaveworks/common/httpgrpc" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/syntax" @@ -59,9 +59,7 @@ func TestValidator_ValidateEntry(t *testing.T) { }, }, logproto.Entry{Timestamp: testTime.Add(-time.Hour * 5), Line: "test"}, - httpgrpc.Errorf( - http.StatusBadRequest, - validation.GreaterThanMaxSampleAgeErrorMsg, + fmt.Errorf(validation.GreaterThanMaxSampleAgeErrorMsg, testStreamLabels, testTime.Add(-time.Hour*5).Format(timeFormat), testTime.Add(-1*time.Hour).Format(timeFormat), // same as RejectOldSamplesMaxAge @@ -72,7 +70,7 @@ func TestValidator_ValidateEntry(t *testing.T) { "test", nil, logproto.Entry{Timestamp: testTime.Add(time.Hour * 5), Line: "test"}, - httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg, testStreamLabels, testTime.Add(time.Hour*5).Format(timeFormat)), + fmt.Errorf(validation.TooFarInFutureErrorMsg, testStreamLabels, testTime.Add(time.Hour*5).Format(timeFormat)), }, { "line too long", @@ -83,7 +81,7 @@ func TestValidator_ValidateEntry(t *testing.T) { }, }, logproto.Entry{Timestamp: testTime, Line: "12345678901"}, - httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg, 10, testStreamLabels, 11), + fmt.Errorf(validation.LineTooLongErrorMsg, 10, testStreamLabels, 11), }, } for _, tt := range tests { @@ -121,7 +119,7 @@ func TestValidator_ValidateLabels(t *testing.T) { "test", nil, "{}", - httpgrpc.Errorf(http.StatusBadRequest, validation.MissingLabelsErrorMsg), + fmt.Errorf(validation.MissingLabelsErrorMsg), }, { "test too many labels", @@ -130,7 +128,7 @@ func TestValidator_ValidateLabels(t *testing.T) { &validation.Limits{MaxLabelNamesPerSeries: 2}, }, "{foo=\"bar\",food=\"bars\",fed=\"bears\"}", - httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg, "{foo=\"bar\",food=\"bars\",fed=\"bears\"}", 3, 2), + fmt.Errorf(validation.MaxLabelNamesPerSeriesErrorMsg, "{foo=\"bar\",food=\"bars\",fed=\"bears\"}", 3, 2), }, { "label name too long", @@ -142,7 +140,7 @@ func TestValidator_ValidateLabels(t *testing.T) { }, }, "{fooooo=\"bar\"}", - httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg, "{fooooo=\"bar\"}", "fooooo"), + fmt.Errorf(validation.LabelNameTooLongErrorMsg, "{fooooo=\"bar\"}", "fooooo"), }, { "label value too long", @@ -155,7 +153,7 @@ func TestValidator_ValidateLabels(t *testing.T) { }, }, "{foo=\"barrrrrr\"}", - httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg, "{foo=\"barrrrrr\"}", "barrrrrr"), + fmt.Errorf(validation.LabelValueTooLongErrorMsg, "{foo=\"barrrrrr\"}", "barrrrrr"), }, { "duplicate label", @@ -168,7 +166,7 @@ func TestValidator_ValidateLabels(t *testing.T) { }, }, "{foo=\"bar\", foo=\"barf\"}", - httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg, "{foo=\"bar\", foo=\"barf\"}", "foo"), + fmt.Errorf(validation.DuplicateLabelNamesErrorMsg, "{foo=\"bar\", foo=\"barf\"}", "foo"), }, { "label value contains %", @@ -181,10 +179,7 @@ func TestValidator_ValidateLabels(t *testing.T) { }, }, "{foo=\"bar\", foo=\"barf%s\"}", - httpgrpc.ErrorFromHTTPResponse(&httpgrpc.HTTPResponse{ - Code: int32(http.StatusBadRequest), - Body: []byte("stream '{foo=\"bar\", foo=\"barf%s\"}' has label value too long: 'barf%s'"), // Intentionally construct the string to make sure %s isn't substituted as (MISSING) - }), + errors.New("stream '{foo=\"bar\", foo=\"barf%s\"}' has label value too long: 'barf%s'"), 
diff --git a/pkg/distributor/writefailures/manager.go b/pkg/distributor/writefailures/manager.go
index 770cc3a63b..cf920b90e3 100644
--- a/pkg/distributor/writefailures/manager.go
+++ b/pkg/distributor/writefailures/manager.go
@@ -32,6 +32,10 @@ func NewManager(logger log.Logger, cfg Cfg, tenants *runtime.TenantConfigs) *Man
 }
 
 func (m *Manager) Log(tenantID string, err error) {
+	if m == nil {
+		return
+	}
+
 	if !m.tenantCfgs.LimitedLogPushErrors(tenantID) {
 		return
 	}
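The nil check added to (*Manager).Log is what lets the ingester call sites below pass nil for the manager without any guards of their own: in Go, calling a method on a nil pointer receiver is legal, and the method body decides what nil means. A self-contained illustration of the idiom:

package main

import (
	"errors"
	"fmt"
)

type manager struct{}

// Log is a no-op on a nil receiver, mirroring the guard added above.
func (m *manager) Log(tenantID string, err error) {
	if m == nil {
		return
	}
	fmt.Printf("tenant=%s err=%v\n", tenantID, err)
}

func main() {
	var m *manager // nil, as in the tests below that pass nil for writeFailures
	m.Log("fake", errors.New("boom")) // safe: no panic, nothing logged
}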
diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go
index 27b2fb5ead..8056ad82da 100644
--- a/pkg/ingester/checkpoint_test.go
+++ b/pkg/ingester/checkpoint_test.go
@@ -15,6 +15,7 @@ import (
 	"github.com/weaveworks/common/user"
 
 	"github.com/grafana/loki/pkg/chunkenc"
+	"github.com/grafana/loki/pkg/distributor/writefailures"
 	"github.com/grafana/loki/pkg/ingester/client"
 	"github.com/grafana/loki/pkg/logproto"
 	"github.com/grafana/loki/pkg/logql/log"
@@ -67,7 +68,7 @@ func TestIngesterWAL(t *testing.T) {
 		}
 	}
 
-	i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
+	i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
 	require.NoError(t, err)
 	require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
 	defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -110,7 +111,7 @@ func TestIngesterWAL(t *testing.T) {
 	expectCheckpoint(t, walDir, false, time.Second)
 
 	// restart the ingester
-	i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
+	i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
 	require.NoError(t, err)
 	defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
 	require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -124,7 +125,7 @@ func TestIngesterWAL(t *testing.T) {
 	require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))
 
 	// restart the ingester
-	i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
+	i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
 	require.NoError(t, err)
 	defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
 	require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -147,7 +148,7 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) {
 		}
 	}
 
-	i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
+	i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
 	require.NoError(t, err)
 	require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
 	defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -193,7 +194,7 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) {
 	require.NoError(t, err)
 
 	// restart the ingester
-	i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
+	i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
 	require.NoError(t, err)
 	defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
 	require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -250,7 +251,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) {
 		}
 	}
 
-	i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
+	i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
 	require.NoError(t, err)
 	require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
 	defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -271,7 +272,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) {
 	expectCheckpoint(t, walDir, false, time.Second)
 
 	// restart the ingester, ensuring we replayed from WAL.
-	i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
+	i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
 	require.NoError(t, err)
 	defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
 	require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -292,7 +293,7 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) {
 		}
 	}
 
-	i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
+	i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
 	require.NoError(t, err)
 	require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
 	defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -313,7 +314,7 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) {
 	require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))
 
 	// restart the ingester, ensuring we can replay from the checkpoint as well.
-	i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
+	i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
 	require.NoError(t, err)
 	defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
 	require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -449,7 +450,7 @@ func Test_SeriesIterator(t *testing.T) {
 	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
 
 	for i := 0; i < 3; i++ {
-		inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, NewStreamRateCalculator())
+		inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, NewStreamRateCalculator(), nil)
 		require.Nil(t, err)
 		require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream1}}))
 		require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream2}}))
@@ -496,7 +497,7 @@ func Benchmark_SeriesIterator(b *testing.B) {
 	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
 
 	for i := range instances {
-		inst, _ := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("instance %d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, NewStreamRateCalculator())
+		inst, _ := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("instance %d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, NewStreamRateCalculator(), nil)
 
 		require.NoError(b, inst.Push(context.Background(), &logproto.PushRequest{
@@ -588,7 +589,7 @@ func TestIngesterWALReplaysUnorderedToOrdered(t *testing.T) {
 		}
 	}
 
-	i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
+	i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
 	require.NoError(t, err)
 	require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
 	defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -660,7 +661,7 @@ func TestIngesterWALReplaysUnorderedToOrdered(t *testing.T) {
 	require.NoError(t, err)
 
 	// restart the ingester
-	i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
+	i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
 	require.NoError(t, err)
 	defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
 	require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go
index c7b2494640..01ac5dfd49 100644
--- a/pkg/ingester/flush_test.go
+++ b/pkg/ingester/flush_test.go
@@ -23,6 +23,7 @@ import (
 	"github.com/grafana/dskit/tenant"
 
 	"github.com/grafana/loki/pkg/chunkenc"
+	"github.com/grafana/loki/pkg/distributor/writefailures"
 	"github.com/grafana/loki/pkg/ingester/client"
 	"github.com/grafana/loki/pkg/ingester/wal"
 	"github.com/grafana/loki/pkg/iter"
@@ -260,7 +261,7 @@ func newTestStore(t require.TestingT, cfg Config, walOverride WAL) (*testStore,
 	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
 	require.NoError(t, err)
 
-	ing, err := New(cfg, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil)
+	ing, err := New(cfg, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 2419add46f..34cca0189a 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -11,8 +11,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/grafana/loki/pkg/storage/stores/index/seriesvolume"
-
 	"github.com/go-kit/log/level"
 	"github.com/grafana/dskit/concurrency"
 	"github.com/grafana/dskit/modules"
@@ -30,6 +28,7 @@ import (
 
 	"github.com/grafana/loki/pkg/analytics"
 	"github.com/grafana/loki/pkg/chunkenc"
+	"github.com/grafana/loki/pkg/distributor/writefailures"
 	"github.com/grafana/loki/pkg/ingester/client"
 	"github.com/grafana/loki/pkg/ingester/index"
 	"github.com/grafana/loki/pkg/iter"
@@ -42,6 +41,7 @@ import (
 	"github.com/grafana/loki/pkg/storage/chunk"
 	"github.com/grafana/loki/pkg/storage/chunk/fetcher"
 	"github.com/grafana/loki/pkg/storage/config"
+	"github.com/grafana/loki/pkg/storage/stores/index/seriesvolume"
 	index_stats "github.com/grafana/loki/pkg/storage/stores/index/stats"
 	"github.com/grafana/loki/pkg/util"
 	util_log "github.com/grafana/loki/pkg/util/log"
@@ -242,10 +242,12 @@ type Ingester struct {
 	chunkFilter chunk.RequestChunkFilterer
 
 	streamRateCalculator *StreamRateCalculator
+
+	writeLogManager *writefailures.Manager
 }
 
 // New makes a new Ingester.
-func New(cfg Config, clientConfig client.Config, store ChunkStore, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer) (*Ingester, error) {
+func New(cfg Config, clientConfig client.Config, store ChunkStore, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer, writeFailuresCfg writefailures.Cfg) (*Ingester, error) {
 	if cfg.ingesterClientFactory == nil {
 		cfg.ingesterClientFactory = client.New
 	}
@@ -271,6 +273,7 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits Limits
 		flushOnShutdownSwitch: &OnceSwitch{},
 		terminateOnShutdown:   false,
 		streamRateCalculator:  NewStreamRateCalculator(),
+		writeLogManager:       writefailures.NewManager(util_log.Logger, writeFailuresCfg, configs),
 	}
 	i.replayController = newReplayController(metrics, cfg.WAL, &replayFlusher{i})
@@ -803,8 +806,7 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logpro
 	if err != nil {
 		return &logproto.PushResponse{}, err
 	}
-	err = instance.Push(ctx, req)
-	return &logproto.PushResponse{}, err
+	return &logproto.PushResponse{}, instance.Push(ctx, req)
 }
 
 // GetStreamRates returns a response containing all streams and their current rate
@@ -834,7 +836,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
 	inst, ok = i.instances[instanceID]
 	if !ok {
 		var err error
-		inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter, i.streamRateCalculator)
+		inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter, i.streamRateCalculator, i.writeLogManager)
 		if err != nil {
 			return nil, err
 		}
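Note the ownership model in ingester.go: New builds a single writefailures.Manager for the whole ingester, and GetOrCreateInstance hands that same pointer to every per-tenant instance (each instance, below, hands it on to every stream). A stripped-down sketch of that fan-out — the types here are invented for illustration, not Loki's:

package main

import "fmt"

type manager struct{ name string }

type instance struct{ writeFailures *manager }

type ingester struct {
	writeLogManager *manager
	instances       map[string]*instance
}

func newIngester() *ingester {
	return &ingester{
		writeLogManager: &manager{name: "write-failures"},
		instances:       map[string]*instance{},
	}
}

// getOrCreateInstance mirrors GetOrCreateInstance above: one shared manager,
// created once, threaded into every tenant instance.
func (i *ingester) getOrCreateInstance(tenant string) *instance {
	inst, ok := i.instances[tenant]
	if !ok {
		inst = &instance{writeFailures: i.writeLogManager}
		i.instances[tenant] = inst
	}
	return inst
}

func main() {
	ing := newIngester()
	a, b := ing.getOrCreateInstance("a"), ing.getOrCreateInstance("b")
	fmt.Println(a.writeFailures == b.writeFailures) // true: one manager for all tenants
}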
"github.com/grafana/loki/pkg/chunkenc" + "github.com/grafana/loki/pkg/distributor/writefailures" "github.com/grafana/loki/pkg/ingester/client" "github.com/grafana/loki/pkg/ingester/index" "github.com/grafana/loki/pkg/iter" @@ -50,7 +51,7 @@ func TestPrepareShutdownMarkerPathNotSet(t *testing.T) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil) + i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -73,7 +74,7 @@ func TestPrepareShutdown(t *testing.T) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil) + i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -134,7 +135,7 @@ func TestIngester_GetStreamRates_Correctness(t *testing.T) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil) + i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -166,7 +167,7 @@ func BenchmarkGetStreamRatesAllocs(b *testing.B) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil) + i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}) require.NoError(b, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -190,7 +191,7 @@ func TestIngester(t *testing.T) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil) + i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -372,7 +373,7 @@ func TestIngesterStreamLimitExceeded(t *testing.T) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, client.Config{}, store, overrides, runtime.DefaultTenantConfigs(), nil) + i, err := New(ingesterConfig, client.Config{}, store, overrides, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -719,7 +720,7 @@ func Test_InMemoryLabels(t *testing.T) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil) + i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -1059,7 +1060,7 @@ func TestStats(t *testing.T) { limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) - i, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil) + i, err := New(ingesterConfig, 
client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}) require.NoError(t, err) i.instances["test"] = defaultInstance(t) @@ -1086,7 +1087,7 @@ func TestSeriesVolume(t *testing.T) { limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) - i, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil) + i, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}) require.NoError(t, err) i.instances["test"] = defaultInstance(t) @@ -1164,7 +1165,7 @@ func createIngesterServer(t *testing.T, ingesterConfig Config) (ingesterClient, limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) - ing, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil) + ing, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}) require.NoError(t, err) listener := bufconn.Listen(1024 * 1024) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index b681f224e1..1e7ce267ed 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -8,8 +8,6 @@ import ( "syscall" "time" - "github.com/grafana/loki/pkg/storage/stores/index/seriesvolume" - "github.com/go-kit/log/level" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" @@ -23,6 +21,7 @@ import ( "go.uber.org/atomic" "github.com/grafana/loki/pkg/analytics" + "github.com/grafana/loki/pkg/distributor/writefailures" "github.com/grafana/loki/pkg/ingester/index" "github.com/grafana/loki/pkg/ingester/wal" "github.com/grafana/loki/pkg/iter" @@ -34,6 +33,7 @@ import ( "github.com/grafana/loki/pkg/runtime" "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/config" + "github.com/grafana/loki/pkg/storage/stores/index/seriesvolume" "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/deletion" util_log "github.com/grafana/loki/pkg/util/log" @@ -106,6 +106,8 @@ type instance struct { chunkFilter chunk.RequestChunkFilterer streamRateCalculator *StreamRateCalculator + + writeFailures *writefailures.Manager } func newInstance( @@ -119,6 +121,7 @@ func newInstance( flushOnShutdownSwitch *OnceSwitch, chunkFilter chunk.RequestChunkFilterer, streamRateCalculator *StreamRateCalculator, + writeFailures *writefailures.Manager, ) (*instance, error) { invertedIndex, err := index.NewMultiInvertedIndex(periodConfigs, uint32(cfg.IndexShards)) if err != nil { @@ -145,6 +148,8 @@ func newInstance( chunkFilter: chunkFilter, streamRateCalculator: streamRateCalculator, + + writeFailures: writeFailures, } i.mapper = newFPMapper(i.getLabelsFromFingerprint) return i, err @@ -274,7 +279,7 @@ func (i *instance) createStream(pushReqStream logproto.Stream, record *wal.Recor fp := i.getHashForLabels(labels) sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp) - s := newStream(i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics) + s := newStream(i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures) // record will be nil when replaying the wal (we don't want to rewrite wal entries as we replay them). 
@@ -306,7 +311,7 @@ func (i *instance) createStream(pushReqStream logproto.Stream, record *wal.Recor
 
 func (i *instance) createStreamByFP(ls labels.Labels, fp model.Fingerprint) *stream {
 	sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(ls), fp)
-	s := newStream(i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics)
+	s := newStream(i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures)
 
 	i.streamsCreatedTotal.Inc()
 	memoryStreams.WithLabelValues(i.instanceID).Inc()
diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go
index 0a9042502f..73b46de321 100644
--- a/pkg/ingester/instance_test.go
+++ b/pkg/ingester/instance_test.go
@@ -61,7 +61,7 @@ func TestLabelsCollisions(t *testing.T) {
 	require.NoError(t, err)
 	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
 
-	i, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
+	i, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
 	require.Nil(t, err)
 
 	// avoid entries from the future.
@@ -89,7 +89,7 @@ func TestConcurrentPushes(t *testing.T) {
 	require.NoError(t, err)
 	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
 
-	inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
+	inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
 	require.Nil(t, err)
 
 	const (
@@ -141,7 +141,7 @@ func TestGetStreamRates(t *testing.T) {
 	require.NoError(t, err)
 	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
 
-	inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
+	inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
 	require.NoError(t, err)
 
 	const (
@@ -235,7 +235,7 @@ func TestSyncPeriod(t *testing.T) {
 		minUtil = 0.20
 	)
 
-	inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
+	inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
 	require.Nil(t, err)
 
 	lbls := makeRandomLabels()
@@ -280,7 +280,7 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) {
 	cfg.SyncMinUtilization = 0.20
 	cfg.IndexShards = indexShards
 
-	instance, err := newInstance(cfg, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
+	instance, err := newInstance(cfg, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
 	require.Nil(t, err)
 
 	currentTime := time.Now()
@@ -294,7 +294,7 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) {
 	for _, testStream := range testStreams {
 		stream, err := instance.getOrCreateStream(testStream, recordPool.GetRecord())
 		require.NoError(t, err)
-		chunk := newStream(cfg, limiter, "fake", 0, nil, true, NewStreamRateCalculator(), NilMetrics).NewChunk()
+		chunk := newStream(cfg, limiter, "fake", 0, nil, true, NewStreamRateCalculator(), NilMetrics, nil).NewChunk()
 		for _, entry := range testStream.Entries {
 			err = chunk.Append(&entry)
 			require.NoError(t, err)
@@ -487,7 +487,7 @@ func Benchmark_PushInstance(b *testing.B) {
 	require.NoError(b, err)
 	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
 
-	i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
+	i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
 	ctx := context.Background()
 
 	for n := 0; n < b.N; n++ {
@@ -531,7 +531,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
 
 	ctx := context.Background()
 
-	inst, _ := newInstance(&Config{}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
+	inst, _ := newInstance(&Config{}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
 	t, err := newTailer("foo", `{namespace="foo",pod="bar",instance=~"10.*"}`, nil, 10)
 	require.NoError(b, err)
 	for i := 0; i < 10000; i++ {
@@ -547,7 +547,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
 	lbs := makeRandomLabels()
 	b.Run("addTailersToNewStream", func(b *testing.B) {
 		for n := 0; n < b.N; n++ {
-			inst.addTailersToNewStream(newStream(nil, limiter, "fake", 0, lbs, true, NewStreamRateCalculator(), NilMetrics))
+			inst.addTailersToNewStream(newStream(nil, limiter, "fake", 0, lbs, true, NewStreamRateCalculator(), NilMetrics, nil))
 		}
 	})
 }
@@ -800,7 +800,7 @@ func TestStreamShardingUsage(t *testing.T) {
 	})
 
 	t.Run("invalid push returns error", func(t *testing.T) {
-		i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, customTenant1, limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
+		i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, customTenant1, limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
 		ctx := context.Background()
 
 		err = i.Push(ctx, &logproto.PushRequest{
@@ -819,7 +819,7 @@ func TestStreamShardingUsage(t *testing.T) {
 	})
 
 	t.Run("valid push returns no error", func(t *testing.T) {
-		i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, customTenant2, limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator())
+		i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, customTenant2, limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, NewStreamRateCalculator(), nil)
 		ctx := context.Background()
 
 		err = i.Push(ctx, &logproto.PushRequest{
@@ -935,6 +935,7 @@ func defaultInstance(t *testing.T) *instance {
 		nil,
 		nil,
 		NewStreamRateCalculator(),
+		nil,
 	)
 	require.Nil(t, err)
 	insertData(t, instance)
diff --git a/pkg/ingester/recovery_test.go b/pkg/ingester/recovery_test.go
index 6a663b7f42..6990073fc4 100644
--- a/pkg/ingester/recovery_test.go
+++ b/pkg/ingester/recovery_test.go
@@ -15,6 +15,7 @@ import (
 	"github.com/stretchr/testify/require"
 	"github.com/weaveworks/common/user"
 
+	"github.com/grafana/loki/pkg/distributor/writefailures"
 	"github.com/grafana/loki/pkg/ingester/client"
 	"github.com/grafana/loki/pkg/ingester/wal"
 	"github.com/grafana/loki/pkg/logproto"
@@ -206,7 +207,7 @@ func TestSeriesRecoveryNoDuplicates(t *testing.T) {
 		chunks: map[string][]chunk.Chunk{},
 	}
 
-	i, err := New(ingesterConfig, client.Config{}, store, limits, loki_runtime.DefaultTenantConfigs(), nil)
+	i, err := New(ingesterConfig, client.Config{}, store, limits, loki_runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
 	require.NoError(t, err)
 
 	mkSample := func(i int) *logproto.PushRequest {
@@ -240,7 +241,7 @@ func TestSeriesRecoveryNoDuplicates(t *testing.T) {
 	require.Equal(t, false, iter.Next())
 
 	// create a new ingester now
-	i, err = New(ingesterConfig, client.Config{}, store, limits, loki_runtime.DefaultTenantConfigs(), nil)
+	i, err = New(ingesterConfig, client.Config{}, store, limits, loki_runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
 	require.NoError(t, err)
 
 	// recover the checkpointed series
diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go
index 18c8101255..364cc06bb7 100644
--- a/pkg/ingester/stream.go
+++ b/pkg/ingester/stream.go
@@ -16,6 +16,7 @@ import (
 	"github.com/weaveworks/common/httpgrpc"
 
 	"github.com/grafana/loki/pkg/chunkenc"
+	"github.com/grafana/loki/pkg/distributor/writefailures"
 	"github.com/grafana/loki/pkg/ingester/wal"
 	"github.com/grafana/loki/pkg/iter"
 	"github.com/grafana/loki/pkg/logproto"
@@ -71,6 +72,8 @@ type stream struct {
 	unorderedWrites bool
 
 	streamRateCalculator *StreamRateCalculator
+
+	writeFailures *writefailures.Manager
 }
 
 type chunkDesc struct {
@@ -88,7 +91,7 @@ type entryWithError struct {
 	e error
 }
 
-func newStream(cfg *Config, limits RateLimiterStrategy, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, streamRateCalculator *StreamRateCalculator, metrics *ingesterMetrics) *stream {
+func newStream(cfg *Config, limits RateLimiterStrategy, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, streamRateCalculator *StreamRateCalculator, metrics *ingesterMetrics, writeFailures *writefailures.Manager) *stream {
 	hashNoShard, _ := labels.HashWithoutLabels(make([]byte, 0, 1024), ShardLbName)
 	return &stream{
 		limiter: NewStreamRateLimiter(limits, tenant, 10*time.Second),
@@ -104,6 +107,7 @@ func newStream(cfg *Config, limits RateLimiterStrategy, tenant string, fp model.
 		streamRateCalculator: streamRateCalculator,
 
 		unorderedWrites: unorderedWrites,
+		writeFailures:   writeFailures,
 	}
 }
@@ -312,6 +316,7 @@ func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry) (in
 		chunk.lastUpdated = time.Now()
 		if err := chunk.chunk.Append(&entries[i]); err != nil {
+			s.writeFailures.Log(s.tenant, err)
 			invalid = append(invalid, entryWithError{&entries[i], err})
 			if chunkenc.IsOutOfOrderErr(err) {
 				outOfOrderSamples++
@@ -365,6 +370,7 @@ func (s *stream) validateEntries(entries []logproto.Entry, isReplay, rateLimitWh
 		now := time.Now()
 		if !rateLimitWholeStream && !s.limiter.AllowN(now, len(entries[i].Line)) {
 			failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], &validation.ErrStreamRateLimit{RateLimit: flagext.ByteSize(limit), Labels: s.labelsString, Bytes: flagext.ByteSize(lineBytes)}})
+			s.writeFailures.Log(s.tenant, failedEntriesWithError[len(failedEntriesWithError)-1].e)
 			rateLimitedSamples++
 			rateLimitedBytes += lineBytes
 			continue
@@ -374,6 +380,7 @@ func (s *stream) validateEntries(entries []logproto.Entry, isReplay, rateLimitWh
 		cutoff := highestTs.Add(-s.cfg.MaxChunkAge / 2)
 		if !isReplay && s.unorderedWrites && !highestTs.IsZero() && cutoff.After(entries[i].Timestamp) {
 			failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrTooFarBehind(cutoff)})
+			s.writeFailures.Log(s.tenant, failedEntriesWithError[len(failedEntriesWithError)-1].e)
 			outOfOrderSamples++
 			outOfOrderBytes += lineBytes
 			continue
diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go
index f6c13689df..40e9977342 100644
--- a/pkg/ingester/stream_test.go
+++ b/pkg/ingester/stream_test.go
@@ -63,6 +63,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
 		true,
 		NewStreamRateCalculator(),
 		NilMetrics,
+		nil,
 	)
 
 	_, err := s.Push(context.Background(), []logproto.Entry{
@@ -110,6 +111,7 @@ func TestPushDeduplication(t *testing.T) {
 		true,
 		NewStreamRateCalculator(),
 		NilMetrics,
+		nil,
 	)
 
 	written, err := s.Push(context.Background(), []logproto.Entry{
@@ -140,6 +142,7 @@ func TestPushRejectOldCounter(t *testing.T) {
 		true,
 		NewStreamRateCalculator(),
 		NilMetrics,
+		nil,
 	)
 
 	// counter should be 2 now since the first line will be deduped
@@ -239,6 +242,7 @@ func TestEntryErrorCorrectlyReported(t *testing.T) {
 		true,
 		NewStreamRateCalculator(),
 		NilMetrics,
+		nil,
 	)
 	s.highestTs = time.Now()
 
@@ -269,6 +273,7 @@ func TestUnorderedPush(t *testing.T) {
 		true,
 		NewStreamRateCalculator(),
 		NilMetrics,
+		nil,
 	)
 
 	for _, x := range []struct {
@@ -366,6 +371,7 @@ func TestPushRateLimit(t *testing.T) {
 		true,
 		NewStreamRateCalculator(),
 		NilMetrics,
+		nil,
 	)
 
 	entries := []logproto.Entry{
@@ -400,6 +406,7 @@ func TestPushRateLimitAllOrNothing(t *testing.T) {
 		true,
 		NewStreamRateCalculator(),
 		NilMetrics,
+		nil,
 	)
 
 	entries := []logproto.Entry{
@@ -433,6 +440,7 @@ func TestReplayAppendIgnoresValidityWindow(t *testing.T) {
 		true,
 		NewStreamRateCalculator(),
 		NilMetrics,
+		nil,
 	)
 
 	base := time.Now()
@@ -482,7 +490,7 @@ func Benchmark_PushStream(b *testing.B) {
 	require.NoError(b, err)
 	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
 
-	s := newStream(&Config{MaxChunkAge: 24 * time.Hour}, limiter, "fake", model.Fingerprint(0), ls, true, NewStreamRateCalculator(), NilMetrics)
+	s := newStream(&Config{MaxChunkAge: 24 * time.Hour}, limiter, "fake", model.Fingerprint(0), ls, true, NewStreamRateCalculator(), NilMetrics, nil)
 	t, err := newTailer("foo", `{namespace="loki-dev"}`, &fakeTailServer{}, 10)
 	require.NoError(b, err)
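Both validateEntries call sites above log the error that was just appended to failedEntriesWithError; the second fires when an unordered write lands more than MaxChunkAge/2 behind the newest timestamp the stream has seen. The cutoff arithmetic, worked through with concrete values:

package main

import (
	"fmt"
	"time"
)

func main() {
	maxChunkAge := 2 * time.Hour
	highestTs := time.Date(2023, 5, 1, 12, 0, 0, 0, time.UTC) // newest entry seen so far

	// cutoff := highestTs.Add(-s.cfg.MaxChunkAge / 2), as in validateEntries above.
	cutoff := highestTs.Add(-maxChunkAge / 2)
	fmt.Println(cutoff) // 2023-05-01 11:00:00 +0000 UTC

	entry := highestTs.Add(-90 * time.Minute) // 10:30, older than the cutoff
	fmt.Println(cutoff.After(entry))          // true -> rejected as too far behind (and now logged)
}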
diff --git a/pkg/ingester/streams_map_test.go b/pkg/ingester/streams_map_test.go
index b07d6f6266..2769d9b6b2 100644
--- a/pkg/ingester/streams_map_test.go
+++ b/pkg/ingester/streams_map_test.go
@@ -27,6 +27,7 @@ func TestStreamsMap(t *testing.T) {
 			true,
 			NewStreamRateCalculator(),
 			NilMetrics,
+			nil,
 		),
 		newStream(
 			defaultConfig(),
@@ -39,6 +40,7 @@ func TestStreamsMap(t *testing.T) {
 			true,
 			NewStreamRateCalculator(),
 			NilMetrics,
+			nil,
 		),
 	}
 	var s *stream
diff --git a/pkg/loghttp/push/push.go b/pkg/loghttp/push/push.go
index de6665f991..889c8d08f6 100644
--- a/pkg/loghttp/push/push.go
+++ b/pkg/loghttp/push/push.go
@@ -126,7 +126,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete
 		if tenantsRetention != nil {
 			lbs, err := syntax.ParseLabels(s.Labels)
 			if err != nil {
-				return nil, err
+				return nil, fmt.Errorf("couldn't parse labels: %w", err)
 			}
 			retentionHours = fmt.Sprintf("%d", int64(math.Floor(tenantsRetention.RetentionPeriodFor(userID, lbs).Hours())))
 		}
diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go
index 95cee4cddc..fc206dc457 100644
--- a/pkg/loki/modules.go
+++ b/pkg/loki/modules.go
@@ -468,7 +468,7 @@ func (t *Loki) initIngester() (_ services.Service, err error) {
 		level.Warn(util_log.Logger).Log("msg", "The config setting shutdown marker path is not set. The /ingester/prepare_shutdown endpoint won't work")
 	}
 
-	t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Cfg.IngesterClient, t.Store, t.Overrides, t.tenantConfigs, prometheus.DefaultRegisterer)
+	t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Cfg.IngesterClient, t.Store, t.Overrides, t.tenantConfigs, prometheus.DefaultRegisterer, t.Cfg.Distributor.WriteFailuresLogging)
 	if err != nil {
 		return
 	}
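Two small changes close the patch out: ParseRequest now wraps label-parse failures with %w so callers keep access to the original syntax error, and modules.go threads the distributor's WriteFailuresLogging config into ingester.New so both components share one configuration. The %w behavior, for reference:

package main

import (
	"errors"
	"fmt"
)

func main() {
	parseErr := errors.New(`1:6: parse error: unexpected "="`) // stand-in for a syntax.ParseLabels error

	// As in ParseRequest above: %w adds context while keeping the
	// original error reachable through errors.Is / errors.Unwrap.
	err := fmt.Errorf("couldn't parse labels: %w", parseErr)
	fmt.Println(err)                      // couldn't parse labels: 1:6: parse error: unexpected "="
	fmt.Println(errors.Is(err, parseErr)) // true
}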