Ensure labels on sharded streams are sorted before they're sent anywhere (#7556)

Pushing streams with unsorted labels to ingesters results in a bug where
ingesters duplicate chunks for streams. This causes a memory leak and
eventual OOM crash when ingesters recover from the WAL because one of
the duplicate streams is orphaned.

This PR ensures that sharded streams have sorted labels to avoid this
bug.
pull/7557/head
Travis Patterson 3 years ago committed by GitHub
parent b523ff336e
commit 0a654e2357
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      pkg/distributor/distributor.go
  2. 11
      pkg/distributor/distributor_test.go

@@ -6,6 +6,7 @@ import (
"math" "math"
"math/rand" "math/rand"
"net/http" "net/http"
"sort"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@@ -498,6 +499,8 @@ func labelTemplate(lbls string) labels.Labels {
copy(streamLabels, baseLbls) copy(streamLabels, baseLbls)
streamLabels[len(baseLbls)] = labels.Label{Name: ingester.ShardLbName, Value: ingester.ShardLbPlaceholder} streamLabels[len(baseLbls)] = labels.Label{Name: ingester.ShardLbName, Value: ingester.ShardLbPlaceholder}
sort.Sort(labels.Labels(streamLabels))
return streamLabels return streamLabels
} }
@@ -508,7 +511,13 @@ func (d *Distributor) createShard(streamshardCfg *shardstreams.Config, stream lo
} }
shardLabel := strconv.Itoa(shardNumber) shardLabel := strconv.Itoa(shardNumber)
lbls[len(lbls)-1] = labels.Label{Name: ingester.ShardLbName, Value: shardLabel} for i := 0; i < len(lbls); i++ {
if lbls[i].Name == ingester.ShardLbName {
lbls[i].Value = shardLabel
break
}
}
return logproto.Stream{ return logproto.Stream{
Labels: strings.Replace(streamPattern, ingester.ShardLbPlaceholder, shardLabel, 1), Labels: strings.Replace(streamPattern, ingester.ShardLbPlaceholder, shardLabel, 1),
Hash: lbls.Hash(), Hash: lbls.Hash(),
@@ -631,7 +640,6 @@ func (d *Distributor) parseStreamLabels(vContext validationContext, key string,
return "", 0, httpgrpc.Errorf(http.StatusBadRequest, validation.InvalidLabelsErrorMsg, key, err) return "", 0, httpgrpc.Errorf(http.StatusBadRequest, validation.InvalidLabelsErrorMsg, key, err)
} }
// ensure labels are correctly sorted.
if err := d.validator.ValidateLabels(vContext, ls, *stream); err != nil { if err := d.validator.ValidateLabels(vContext, ls, *stream); err != nil {
return "", 0, err return "", 0, err
} }

@@ -470,7 +470,7 @@ func Test_TruncateLogLines(t *testing.T) {
func TestStreamShard(t *testing.T) { func TestStreamShard(t *testing.T) {
// setup base stream. // setup base stream.
baseStream := logproto.Stream{} baseStream := logproto.Stream{}
baseLabels := "{app='myapp', job='fizzbuzz'}" baseLabels := "{app='myapp'}"
lbs, err := syntax.ParseLabels(baseLabels) lbs, err := syntax.ParseLabels(baseLabels)
require.NoError(t, err) require.NoError(t, err)
baseStream.Hash = lbs.Hash() baseStream.Hash = lbs.Hash()
@@ -585,6 +585,15 @@ func TestStreamShard(t *testing.T) {
_, derivedStreams := d.shardStream(baseStream, tc.streamSize, "fake") _, derivedStreams := d.shardStream(baseStream, tc.streamSize, "fake")
require.Len(t, derivedStreams, tc.wantDerivedStreamSize) require.Len(t, derivedStreams, tc.wantDerivedStreamSize)
for _, s := range derivedStreams {
// Generate sorted labels
lbls, err := syntax.ParseLabels(s.stream.Labels)
require.NoError(t, err)
require.Equal(t, lbls.Hash(), s.stream.Hash)
require.Equal(t, lbls.String(), s.stream.Labels)
}
}) })
} }
} }

Loading…
Cancel
Save