feat: Ignore empty streams in distributor if all entries fail validation (#13674)

pull/13512/head
benclive 10 months ago committed by GitHub
parent 8a3ae223ba
commit 6c4b0622aa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 4
      pkg/distributor/distributor.go
  2. 20
      pkg/distributor/distributor_test.go

@ -438,6 +438,10 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
pushSize += len(entry.Line)
}
stream.Entries = stream.Entries[:n]
if len(stream.Entries) == 0 {
// Empty stream after validating all the entries
continue
}
shardStreamsCfg := d.validator.Limits.ShardStreams(tenantID)
if shardStreamsCfg.Enabled {

@ -589,6 +589,26 @@ func Test_TruncateLogLines(t *testing.T) {
})
}
func Test_DiscardEmptyStreamsAfterValidation(t *testing.T) {
setup := func() (*validation.Limits, *mockIngester) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.MaxLineSize = 5
return limits, &mockIngester{}
}
t.Run("it discards invalid entries and discards resulting empty streams completely", func(t *testing.T) {
limits, ingester := setup()
distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil })
_, err := distributors[0].Push(ctx, makeWriteRequest(1, 10))
require.Equal(t, err, httpgrpc.Errorf(http.StatusBadRequest, fmt.Sprintf(validation.LineTooLongErrorMsg, 5, "{foo=\"bar\", service_name=\"unknown_service\"}", 10)))
topVal := ingester.Peek()
require.Nil(t, topVal)
})
}
func TestStreamShard(t *testing.T) {
// setup base stream.
baseStream := logproto.Stream{}

Loading…
Cancel
Save