Correctly sets hash value for headblock iterator (#5423)

pull/5425/head
Cyril Tovena authored 3 years ago · committed by GitHub
parent 137a28d82b
commit 5d8bc6129e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1. pkg/chunkenc/memchunk.go (6 changes)
2. pkg/chunkenc/unordered.go (8 changes)
3. pkg/chunkenc/unordered_test.go (32 changes)
4. pkg/ingester/recovery_test.go (4 changes)
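
The bug: the ordered and unordered head-block iterators built logproto.Stream and logproto.Series values without filling in the label hash, so every stream they surfaced carried a zero hash. A self-contained sketch of the symptom, using a local stand-in struct rather than the generated logproto types:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

// stream is a local stand-in for the two logproto.Stream fields this
// commit touches, not the generated protobuf type.
type stream struct {
	Labels string
	Hash   uint64
}

func main() {
	lbs := labels.Labels{{Name: "foo", Value: "bar"}}

	before := stream{Labels: lbs.String()}                  // Hash left at its zero value
	after := stream{Labels: lbs.String(), Hash: lbs.Hash()} // what the fix populates

	fmt.Println(before.Hash, after.Hash) // 0 vs. the actual label hash
}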

pkg/chunkenc/memchunk.go

@@ -1015,6 +1015,7 @@ func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction,
 		if stream, ok = streams[lhash]; !ok {
 			stream = &logproto.Stream{
 				Labels: parsedLbs.String(),
+				Hash:   lhash,
 			}
 			streams[lhash] = stream
 		}
@@ -1062,8 +1063,9 @@ func (hb *headBlock) SampleIterator(ctx context.Context, mint, maxt int64, extra
 		lhash := parsedLabels.Hash()
 		if s, found = series[lhash]; !found {
 			s = &logproto.Series{
-				Labels:  parsedLabels.String(),
-				Samples: SamplesPool.Get(len(hb.entries)).([]logproto.Sample)[:0],
+				Labels:     parsedLabels.String(),
+				Samples:    SamplesPool.Get(len(hb.entries)).([]logproto.Sample)[:0],
+				StreamHash: lhash,
 			}
 			series[lhash] = s
 		}
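
Why the hash matters downstream: consumers that combine results from several iterators can key streams on the precomputed hash instead of re-parsing and re-hashing label strings. A hedged sketch with illustrative types (mergeByHash is not Loki's merge code):

package main

import "fmt"

// stream is an illustrative type, not Loki's generated logproto.Stream.
type stream struct {
	Labels string
	Hash   uint64
	Lines  []string
}

// mergeByHash folds streams from several batches together, keyed on the
// precomputed hash rather than by re-parsing the label string.
func mergeByHash(batches ...[]stream) map[uint64]*stream {
	out := map[uint64]*stream{}
	for _, batch := range batches {
		for _, s := range batch {
			if existing, ok := out[s.Hash]; ok {
				existing.Lines = append(existing.Lines, s.Lines...)
				continue
			}
			cp := s
			out[s.Hash] = &cp
		}
	}
	return out
}

func main() {
	a := []stream{{Labels: `{foo="bar"}`, Hash: 42, Lines: []string{"l1"}}}
	b := []stream{{Labels: `{foo="bar"}`, Hash: 42, Lines: []string{"l2"}}}
	fmt.Println(mergeByHash(a, b)[42].Lines) // [l1 l2]
	// Before this fix, every head-block stream reported Hash 0, so distinct
	// streams would all have collided onto the same key here.
}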

pkg/chunkenc/unordered.go

@@ -215,7 +215,6 @@ func (hb *unorderedHeadBlock) Iterator(
 	maxt int64,
 	pipeline log.StreamPipeline,
 ) iter.EntryIterator {
-
 	// We are doing a copy everytime, this is because b.entries could change completely,
 	// the alternate would be that we allocate a new b.entries everytime we cut a block,
 	// but the tradeoff is that queries to near-realtime data would be much lower than
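
The comment in the hunk above describes a copy-on-read tradeoff: the iterator copies entries so that concurrent appends or a block cut cannot mutate data a running query is reading. A minimal illustration with assumed local types:

package main

import "fmt"

// entry and head are assumed local types, not Loki's.
type entry struct {
	ts   int64
	line string
}

type head struct{ entries []entry }

// iterate snapshots entries up front, so a concurrent append or a block
// cut cannot mutate what a running query is reading.
func (h *head) iterate() []entry {
	snapshot := make([]entry, len(h.entries))
	copy(snapshot, h.entries)
	return snapshot
}

func main() {
	h := &head{entries: []entry{{1, "foo"}}}
	snap := h.iterate()
	h.entries = append(h.entries, entry{2, "bar"}) // ingestion continues
	fmt.Println(len(snap), len(h.entries))         // 1 2
}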
@@ -238,6 +237,7 @@ func (hb *unorderedHeadBlock) Iterator(
 		if stream, ok = streams[lhash]; !ok {
 			stream = &logproto.Stream{
 				Labels: parsedLbs.String(),
+				Hash:   lhash,
 			}
 			streams[lhash] = stream
 		}
@@ -267,7 +267,6 @@ func (hb *unorderedHeadBlock) SampleIterator(
 	maxt int64,
 	extractor log.StreamSampleExtractor,
 ) iter.SampleIterator {
-
 	series := map[uint64]*logproto.Series{}

 	_ = hb.forEntries(
@@ -285,8 +284,9 @@ func (hb *unorderedHeadBlock) SampleIterator(
 		lhash := parsedLabels.Hash()
 		if s, found = series[lhash]; !found {
 			s = &logproto.Series{
-				Labels:  parsedLabels.String(),
-				Samples: SamplesPool.Get(hb.lines).([]logproto.Sample)[:0],
+				Labels:     parsedLabels.String(),
+				Samples:    SamplesPool.Get(hb.lines).([]logproto.Sample)[:0],
+				StreamHash: parsedLabels.Hash(),
 			}
 			series[lhash] = s
 		}
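
Both SampleIterator paths draw their result slice from SamplesPool.Get(n).([]logproto.Sample)[:0], where the [:0] reslice keeps the pooled backing array while discarding stale contents. A rough sketch of that pattern using a plain single-bucket sync.Pool (Loki's real SamplesPool is size-bucketed):

package main

import (
	"fmt"
	"sync"
)

// sample is a local stand-in for logproto.Sample.
type sample struct {
	Timestamp int64
	Value     float64
}

// samplesPool is a single-bucket sketch of the pooling idea.
var samplesPool = sync.Pool{
	New: func() interface{} { return make([]sample, 0, 1024) },
}

// getSamples mimics SamplesPool.Get(n).([]logproto.Sample)[:0]: reuse the
// pooled backing array when it is big enough, and reslice to length zero
// so stale samples from a previous query are discarded.
func getSamples(hint int) []sample {
	s := samplesPool.Get().([]sample)
	if cap(s) < hint {
		s = make([]sample, 0, hint)
	}
	return s[:0]
}

func main() {
	buf := getSamples(256)
	buf = append(buf, sample{Timestamp: 1, Value: 1})
	fmt.Println(len(buf), cap(buf) >= 256) // 1 true
	samplesPool.Put(buf[:0])               // return the buffer for reuse
}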

pkg/chunkenc/unordered_test.go

@@ -9,6 +9,7 @@ import (
 	"testing"
 	"time"

+	"github.com/prometheus/prometheus/model/labels"
 	"github.com/stretchr/testify/require"

 	"github.com/grafana/loki/pkg/iter"
@@ -268,7 +269,7 @@ func BenchmarkHeadBlockWrites(b *testing.B) {
 	// unordered, unordered
 	// current default block size of 256kb with 75b avg log lines =~ 5.2k lines/block
-	var nWrites = (256 << 10) / 50
+	nWrites := (256 << 10) / 50

 	headBlockFn := func() func(int64, string) {
 		hb := &headBlock{}
@@ -449,7 +450,6 @@ func BenchmarkUnorderedRead(b *testing.B) {
 			})
 		}
 	})
 }
-
 func TestUnorderedIteratorCountsAllEntries(t *testing.T) {
@@ -563,7 +563,6 @@ func TestReorder(t *testing.T) {
 			require.Equal(t, exp, b)
 		})
 	}
-
 }
@@ -610,3 +609,30 @@ func TestReorderAcrossBlocks(t *testing.T) {
 	}
 	iterEq(t, exp, itr)
 }
+
+func Test_HeadIteratorHash(t *testing.T) {
+	lbs := labels.Labels{labels.Label{Name: "foo", Value: "bar"}}
+	ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false)
+	if err != nil {
+		panic(err)
+	}
+
+	for name, b := range map[string]HeadBlock{
+		"unordered": newUnorderedHeadBlock(),
+		"ordered":   &headBlock{},
+	} {
+		t.Run(name, func(t *testing.T) {
+			require.NoError(t, b.Append(1, "foo"))
+			eit := b.Iterator(context.Background(), logproto.BACKWARD, 0, 2, log.NewNoopPipeline().ForStream(lbs))
+
+			for eit.Next() {
+				require.Equal(t, lbs.Hash(), eit.StreamHash())
+			}
+
+			sit := b.SampleIterator(context.TODO(), 0, 2, ex.ForStream(lbs))
+			for sit.Next() {
+				require.Equal(t, lbs.Hash(), sit.StreamHash())
+			}
+		})
+	}
+}

pkg/ingester/recovery_test.go

@@ -262,15 +262,17 @@ func TestSeriesRecoveryNoDuplicates(t *testing.T) {
 	}, &result)
 	require.NoError(t, err)
 	require.Len(t, result.resps, 1)
+	lbls := labels.Labels{{Name: "bar", Value: "baz1"}, {Name: "foo", Value: "bar"}}
 	expected := []logproto.Stream{
 		{
-			Labels: `{bar="baz1", foo="bar"}`,
+			Labels: lbls.String(),
 			Entries: []logproto.Entry{
 				{
 					Timestamp: time.Unix(1, 0),
 					Line:      "line 1",
 				},
 			},
+			Hash: lbls.Hash(),
 		},
 	}
 	require.Equal(t, expected, result.resps[0].Streams)
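
A note on the test's label ordering: labels.Labels.Hash() hashes the slice in order, so the expected label set is constructed already sorted (bar before foo); building it in another order would yield a different hash than the one the ingester computed. A quick check:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	// labels.Labels.Hash() hashes the slice in order, so the test builds
	// the set already sorted (bar before foo) to match what the ingester
	// produces for the same stream.
	lbls := labels.Labels{{Name: "bar", Value: "baz1"}, {Name: "foo", Value: "bar"}}
	fmt.Println(lbls.String()) // {bar="baz1", foo="bar"}
	fmt.Println(lbls.Hash())
}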
