diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 6289369909..4659b292c1 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -6,6 +6,7 @@ import ( "math" "math/rand" "net/http" + "sort" "strconv" "strings" "time" @@ -498,6 +499,8 @@ func labelTemplate(lbls string) labels.Labels { copy(streamLabels, baseLbls) streamLabels[len(baseLbls)] = labels.Label{Name: ingester.ShardLbName, Value: ingester.ShardLbPlaceholder} + sort.Sort(labels.Labels(streamLabels)) + return streamLabels } @@ -508,7 +511,13 @@ func (d *Distributor) createShard(streamshardCfg *shardstreams.Config, stream lo } shardLabel := strconv.Itoa(shardNumber) - lbls[len(lbls)-1] = labels.Label{Name: ingester.ShardLbName, Value: shardLabel} + for i := 0; i < len(lbls); i++ { + if lbls[i].Name == ingester.ShardLbName { + lbls[i].Value = shardLabel + break + } + } + return logproto.Stream{ Labels: strings.Replace(streamPattern, ingester.ShardLbPlaceholder, shardLabel, 1), Hash: lbls.Hash(), @@ -631,7 +640,6 @@ func (d *Distributor) parseStreamLabels(vContext validationContext, key string, return "", 0, httpgrpc.Errorf(http.StatusBadRequest, validation.InvalidLabelsErrorMsg, key, err) } - // ensure labels are correctly sorted. if err := d.validator.ValidateLabels(vContext, ls, *stream); err != nil { return "", 0, err } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 962c4fe08c..4079640890 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -470,7 +470,7 @@ func Test_TruncateLogLines(t *testing.T) { func TestStreamShard(t *testing.T) { // setup base stream. baseStream := logproto.Stream{} - baseLabels := "{app='myapp', job='fizzbuzz'}" + baseLabels := "{app='myapp'}" lbs, err := syntax.ParseLabels(baseLabels) require.NoError(t, err) baseStream.Hash = lbs.Hash() @@ -585,6 +585,15 @@ func TestStreamShard(t *testing.T) { _, derivedStreams := d.shardStream(baseStream, tc.streamSize, "fake") require.Len(t, derivedStreams, tc.wantDerivedStreamSize) + + for _, s := range derivedStreams { + // Generate sorted labels + lbls, err := syntax.ParseLabels(s.stream.Labels) + require.NoError(t, err) + + require.Equal(t, lbls.Hash(), s.stream.Hash) + require.Equal(t, lbls.String(), s.stream.Labels) + } }) } }