[TSDB] fingerprint offsets are now correctly 16-byte aligned (#6630)

* simpler shard factor calculations, disallows identity factor of 1
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* log interval span as soon as it finishes
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* another shard bounds test
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* indexclient span logging
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* more index span logging
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* correctly match single shard
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* shardedPostings test
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* shardedPostings maxOffset is properly exclusive
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* minor posting idiomatic-ness
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* store 16 byte aligned offsets for fingerprint table
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
Branch: pull/6645/head
Author: Owen Diehl, committed 3 years ago via GitHub
Parent: 1592bf5cc9
Commit: 094f8f3b20
Files changed:

  1. pkg/ingester/index/bitprefix.go (5)
  2. pkg/querier/queryrange/shard_resolver.go (25)
  3. pkg/querier/queryrange/shard_resolver_test.go (27)
  4. pkg/querier/queryrange/split_by_interval.go (3)
  5. pkg/storage/stores/composite_store_entry.go (6)
  6. pkg/storage/stores/tsdb/index/fingerprint.go (1)
  7. pkg/storage/stores/tsdb/index/index.go (13)
  8. pkg/storage/stores/tsdb/index/postings.go (6)
  9. pkg/storage/stores/tsdb/index/postings_test.go (22)
  10. pkg/storage/stores/tsdb/index/shard.go (20)
  11. pkg/storage/stores/tsdb/index/shard_test.go (45)
  12. pkg/storage/stores/tsdb/index_client.go (25)

@@ -107,10 +107,7 @@ func (ii *BitPrefixInvertedIndex) validateShard(shard *astmapper.ShardAnnotation
return nil
}
if 1<<(shard.TSDB().RequiredBits()) != shard.Of {
return fmt.Errorf("Shard factor must be a power of two, got %d", shard.Of)
}
return nil
return shard.TSDB().Validate()
}
// Add a fingerprint under the specified labels.

@@ -119,7 +119,7 @@ func (r *dynamicShardResolver) Shards(e syntax.Expr) (int, error) {
}
combined := stats.MergeStats(results...)
factor := guessShardFactor(combined, r.maxParallelism)
factor := guessShardFactor(combined)
var bytesPerShard = combined.Bytes
if factor > 0 {
bytesPerShard = combined.Bytes / uint64(factor)
@@ -143,21 +143,18 @@ func (r *dynamicShardResolver) Shards(e syntax.Expr) (int, error) {
const (
// Just some observed values to get us started on better query planning.
p90BytesPerSecond = 300 << 20 // 300MB/s/core
// At max, schedule a query for 10s of execution before
// splitting it into more requests. This is a lot of guesswork.
maxSeconds = 10
maxSchedulableBytes = maxSeconds * p90BytesPerSecond
)
func guessShardFactor(stats stats.Stats, maxParallelism int) int {
expectedSeconds := float64(stats.Bytes / p90BytesPerSecond)
if expectedSeconds <= float64(maxParallelism) {
func guessShardFactor(stats stats.Stats) int {
expectedSeconds := float64(stats.Bytes) / float64(p90BytesPerSecond)
power := math.Ceil(math.Log2(expectedSeconds)) // round up to nearest power of 2
// Ideally, parallelize down to 1s queries
return int(math.Pow(2, power))
}
n := stats.Bytes / maxSchedulableBytes
power := math.Ceil(math.Log2(float64(n)))
return int(math.Pow(2, power))
// Parallelize down to 1s queries
// Since x^0 == 1 and we only support factors of 2
// reset this edge case manually
factor := int(math.Pow(2, power))
if factor == 1 {
factor = 0
}
return factor
}
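The reworked heuristic no longer caps at `maxParallelism`: it estimates expected execution time as bytes over the observed ~300MB/s/core throughput, rounds the shard factor up to the nearest power of two, and maps a factor of 1 back to 0, since an identity factor is now disallowed. A minimal standalone sketch of that logic follows (the constant value comes from the diff above; the `main` function and its inputs are illustrative only):

```go
package main

import (
	"fmt"
	"math"
)

// Standalone sketch of the new heuristic (not the Loki source itself):
// estimate query seconds from byte volume, round the shard factor up to
// the nearest power of two, and treat a factor of 1 as "no sharding".
const p90BytesPerSecond = 300 << 20 // 300MB/s/core, value taken from the diff

func guessShardFactor(bytes uint64) int {
	expectedSeconds := float64(bytes) / float64(p90BytesPerSecond)
	// Round up to the nearest power of 2 so each shard targets ~1s of work.
	power := math.Ceil(math.Log2(expectedSeconds))
	factor := int(math.Pow(2, power))
	// 2^0 == 1, but a shard factor of 1 is equivalent to no sharding,
	// so it is reset to 0 (the "don't shard" sentinel).
	if factor == 1 {
		factor = 0
	}
	return factor
}

func main() {
	fmt.Println(guessShardFactor(0))                      // 0: no data, no sharding
	fmt.Println(guessShardFactor(p90BytesPerSecond))      // 0: fits in ~1s on one core
	fmt.Println(guessShardFactor(p90BytesPerSecond + 1))  // 2: just over 1s rounds up
	fmt.Println(guessShardFactor(4 * p90BytesPerSecond))  // 4
	fmt.Println(guessShardFactor(15 * p90BytesPerSecond)) // 16
}
```

The last four calls mirror the updated test table below: exactly `p90BytesPerSecond` bytes fits in one second of work and needs no sharding, one byte more rounds up to a factor of 2, and 15 seconds' worth rounds up to 16.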

@@ -12,49 +12,40 @@ import (
func TestGuessShardFactor(t *testing.T) {
for _, tc := range []struct {
stats stats.Stats
maxParallelism int
exp int
}{
{
// no data == no sharding
exp: 0,
maxParallelism: 10,
},
{
exp: 4,
maxParallelism: 10,
stats: stats.Stats{
Bytes: 1200 << 20, // 1200MB
Bytes: p90BytesPerSecond * 4,
},
},
{
exp: 8,
maxParallelism: 10,
// 1500MB moves us to the next
// power of 2 parallelism factor
// round up shard factor
exp: 16,
stats: stats.Stats{
Bytes: 1500 << 20,
Bytes: p90BytesPerSecond * 15,
},
},
{
// Two fully packed parallelism cycles
exp: 16,
maxParallelism: 8,
exp: 2,
stats: stats.Stats{
Bytes: maxSchedulableBytes * 16,
Bytes: p90BytesPerSecond + 1,
},
},
{
// increase to next factor of two
exp: 32,
maxParallelism: 8,
exp: 0,
stats: stats.Stats{
Bytes: maxSchedulableBytes * 17,
Bytes: p90BytesPerSecond,
},
},
} {
t.Run(fmt.Sprintf("%+v", tc.stats), func(t *testing.T) {
require.Equal(t, tc.exp, guessShardFactor(tc.stats, tc.maxParallelism))
require.Equal(t, tc.exp, guessShardFactor(tc.stats))
})
}
}

@@ -151,13 +151,12 @@ func (h *splitByInterval) loop(ctx context.Context, ch <-chan *lokiResult, next
data.req.LogToSpan(sp)
resp, err := next.Do(ctx, data.req)
sp.Finish()
select {
case <-ctx.Done():
sp.Finish()
return
case data.ch <- &packedResp{resp, err}:
sp.Finish()
}
}
}
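Moving `sp.Finish()` out of the `select` closes the span as soon as the downstream request returns, so its duration covers only the request and not the time spent waiting to hand the result back, and it is finished exactly once instead of in both branches. A self-contained sketch of the same pattern, assuming hypothetical `doRequest`, `worker`, and `result` stand-ins rather than Loki's types:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// result is an illustrative stand-in for Loki's packedResp.
type result struct {
	value string
	err   error
}

func doRequest(ctx context.Context, req string) (string, error) {
	time.Sleep(10 * time.Millisecond) // stand-in for next.Do
	return "resp:" + req, nil
}

func worker(ctx context.Context, req string, ch chan<- result) {
	start := time.Now() // stand-in for starting a tracing span

	value, err := doRequest(ctx, req)

	// "Finish the span" immediately: the measurement covers only the request,
	// not the time spent blocked on the channel send below, and it happens
	// exactly once instead of being duplicated in both select branches.
	fmt.Println("request took", time.Since(start))

	select {
	case <-ctx.Done():
		return
	case ch <- result{value, err}:
	}
}

func main() {
	ch := make(chan result, 1)
	worker(context.Background(), "query", ch)
	fmt.Println(<-ch)
}
```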

@@ -52,6 +52,12 @@ func (c *storeEntry) GetChunkRefs(ctx context.Context, userID string, from, thro
defer log.Span.Finish()
shortcut, err := c.validateQueryTimeRange(ctx, userID, &from, &through)
log.Log(
"shortcut", shortcut,
"from", from.Time(),
"through", through.Time(),
"err", err,
)
if err != nil {
return nil, nil, err
} else if shortcut {

@@ -10,7 +10,6 @@ type FingerprintOffsets [][2]uint64
func (xs FingerprintOffsets) Range(shard ShardAnnotation) (minOffset, maxOffset uint64) {
from, through := shard.Bounds()
lower := sort.Search(len(xs), func(i int) bool {
return xs[i][1] >= uint64(from)
})
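`FingerprintOffsets` is a sampled list of `(series offset, fingerprint)` pairs, and `Range` binary-searches it to turn a shard's fingerprint bounds into a conservative `[minOffset, maxOffset)` window of series references. A simplified, self-contained sketch of that idea (not Loki's exact `Range` implementation; the `offsets` and `window` names are illustrative):

```go
package main

import (
	"fmt"
	"sort"
)

// offsets holds sampled [seriesOffset, fingerprint] pairs in fingerprint order.
type offsets [][2]uint64

// window maps a fingerprint range [from, through) to a conservative
// [minOffset, maxOffset) window of series offsets.
func (xs offsets) window(from, through uint64) (minOffset, maxOffset uint64) {
	lower := sort.Search(len(xs), func(i int) bool {
		return xs[i][1] >= from
	})
	// Step back one sample so series hashing just below the first sampled
	// fingerprint at or above `from` are never skipped.
	if lower > 0 {
		minOffset = xs[lower-1][0]
	}

	upper := sort.Search(len(xs), func(i int) bool {
		return xs[i][1] >= through
	})
	if upper == len(xs) {
		return minOffset, ^uint64(0) // open-ended: scan to the end
	}
	return minOffset, xs[upper][0]
}

func main() {
	xs := offsets{{0, 0}, {5, 1 << 62}, {10, 1 << 63}, {15, 3 << 62}}
	// Shard 0_of_2 covers fingerprints [0, 1<<63).
	fmt.Println(xs.window(0, 1<<63)) // 0 10
}
```

The fixture in `main` matches the `TestShardedPostings` table further down: shard `0_of_2` resolves to the offset window `[0, 10)`.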

@@ -535,16 +535,19 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, fp model.F
w.buf2.PutHash(w.crc32)
if err := w.write(w.buf1.Get(), w.buf2.Get()); err != nil {
return errors.Wrap(err, "write series data")
}
w.lastSeries = append(w.lastSeries[:0], lset...)
w.lastSeriesHash = labelHash
w.lastRef = ref
if ref%fingerprintInterval == 0 {
w.fingerprintOffsets = append(w.fingerprintOffsets, [2]uint64{uint64(ref), labelHash})
// series references are the 16-byte aligned offsets
// Do NOT ask me how long I debugged this particular bit >:O
sRef := w.f.pos / 16
w.fingerprintOffsets = append(w.fingerprintOffsets, [2]uint64{sRef, labelHash})
}
if err := w.write(w.buf1.Get(), w.buf2.Get()); err != nil {
return errors.Wrap(err, "write series data")
}
return nil
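The core fix: a TSDB series reference is not a raw byte position but the 16-byte-aligned file position divided by 16, so the fingerprint-offset table has to record `w.f.pos / 16` (the same unit that postings iteration later compares against) rather than the `ref` argument passed to `AddSeries`. A tiny sketch of that relationship; the constant and helper names here are illustrative:

```go
package main

import "fmt"

// Series entries in the TSDB index are 16-byte aligned, and a series
// reference is the file position divided by 16, not the raw byte offset.
const seriesAlignment = 16

// refFromPos converts the byte position of a series entry into the series
// reference that postings (and the fingerprint-offset table) store.
func refFromPos(pos uint64) uint64 { return pos / seriesAlignment }

// posFromRef converts back to the byte offset when the entry must be read.
func posFromRef(ref uint64) uint64 { return ref * seriesAlignment }

func main() {
	pos := uint64(4096) // byte position of a series entry in the index file
	ref := refFromPos(pos)
	fmt.Println(ref, posFromRef(ref)) // 256 4096
}
```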

@@ -864,14 +864,14 @@ func (sp *ShardedPostings) Next() bool {
if ok := sp.p.Next(); !ok {
return false
}
return sp.p.Seek(storage.SeriesRef(sp.minOffset))
return sp.Seek(0)
}
ok := sp.p.Next()
if !ok {
return false
}
if sp.p.At() > storage.SeriesRef(sp.maxOffset) {
if sp.p.At() >= storage.SeriesRef(sp.maxOffset) {
return false
}
@@ -881,7 +881,7 @@ func (sp *ShardedPostings) Next() bool {
// Seek advances the iterator to value v or greater and returns
// true if a value was found.
func (sp *ShardedPostings) Seek(v storage.SeriesRef) (res bool) {
if v > storage.SeriesRef(sp.maxOffset) {
if v >= storage.SeriesRef(sp.maxOffset) {
return false
}
if v < storage.SeriesRef(sp.minOffset) {

@@ -929,3 +929,25 @@ func TestMemPostings_Delete(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 0, len(expanded), "expected empty postings, got %v", expanded)
}
func TestShardedPostings(t *testing.T) {
offsets := FingerprintOffsets{
{0, 0},
{5, 0b1 << 62},
{10, 0b1 << 63},
{15, 0b11 << 62},
}
shard := NewShard(0, 2)
var refs []storage.SeriesRef
for i := 0; i < 20; i++ {
refs = append(refs, storage.SeriesRef(i))
}
ps := newListPostings(refs...)
shardedPostings := NewShardedPostings(ps, shard, offsets)
for i := 0; i < 10; i++ {
require.Equal(t, true, shardedPostings.Next())
require.Equal(t, storage.SeriesRef(i), shardedPostings.At())
}
require.Equal(t, false, shardedPostings.Next())
}
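The test above pins down the off-by-one this PR fixes: shard `0_of_2` resolves to the offset window `[0, 10)`, so ref 10 (fingerprint `1<<63`, which belongs to `1_of_2`) must be excluded. A tiny illustration of inclusive versus exclusive handling of `maxOffset`, using a plain slice filter as a stand-in for the real `ShardedPostings` iterator:

```go
package main

import "fmt"

// filter mimics iterating postings within a shard's offset window; it is a
// stand-in for ShardedPostings, not Loki's code.
func filter(refs []int, minOffset, maxOffset int, exclusive bool) []int {
	var out []int
	for _, r := range refs {
		if r < minOffset {
			continue
		}
		if exclusive && r >= maxOffset {
			break
		}
		if !exclusive && r > maxOffset {
			break
		}
		out = append(out, r)
	}
	return out
}

func main() {
	refs := make([]int, 20)
	for i := range refs {
		refs[i] = i
	}
	// Shard 0_of_2 resolves to the window [0, 10).
	fmt.Println(filter(refs, 0, 10, true))  // [0 1 2 3 4 5 6 7 8 9]
	fmt.Println(filter(refs, 0, 10, false)) // old inclusive behavior: ref 10 leaks in
}
```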

@@ -1,6 +1,7 @@
package index
import (
"errors"
"fmt"
"math"
@@ -14,6 +15,8 @@ const (
ShardLabelFmt = "%d_of_%d"
)
var errDisallowedIdentityShard = errors.New("shard with factor of 1 is explicitly disallowed. It's equivalent to no sharding")
// ShardAnnotation is a convenience struct which holds data from a parsed shard label
// Of MUST be a power of 2 to ensure sharding logic works correctly.
type ShardAnnotation struct {
@@ -33,10 +36,14 @@ func NewShard(x, of uint32) ShardAnnotation {
// Inclusion in a shard is calculated by determining the arbitrary bit prefix
// for a shard, then ensuring the fingerprint has the same prefix
func (shard ShardAnnotation) Match(fp model.Fingerprint) bool {
if shard.Of < 2 {
return true
}
requiredBits := shard.RequiredBits()
// A shard only matches a fingerprint when they both start with the same prefix
prefix := uint64(shard.Shard) << (64 - requiredBits)
return prefix^uint64(fp) < 1<<(64-requiredBits)
}
@@ -51,8 +58,19 @@ func (shard ShardAnnotation) RequiredBits() uint64 {
}
func (shard ShardAnnotation) Validate() error {
if shard.Of == 1 {
return errDisallowedIdentityShard
}
if 1<<shard.RequiredBits() != shard.Of {
return fmt.Errorf("Shard factor must be a power of two, got %d", shard.Of)
}
return nil
}
// Bounds shows the [minimum, maximum) fingerprints. If there is no maximum
// fingerprint (for example )
// fingerprint (for example the last shard), math.MaxUint64 is used as the maximum.
func (shard ShardAnnotation) Bounds() (model.Fingerprint, model.Fingerprint) {
requiredBits := model.Fingerprint(shard.RequiredBits())
from := model.Fingerprint(shard.Shard) << (64 - requiredBits)

@@ -20,6 +20,11 @@ func TestShardMatch(t *testing.T) {
fp: 0,
exp: true,
},
{
shard: NewShard(0, 2),
fp: 5287603155525329,
exp: true,
},
{
shard: NewShard(0, 2),
fp: 1 << 63,
@@ -50,6 +55,11 @@ func TestShardMatch(t *testing.T) {
fp: 3 << 62,
exp: false,
},
{
shard: NewShard(0, 1),
fp: 5287603155525329,
exp: true,
},
} {
t.Run(fmt.Sprint(tc.shard, tc.fp), func(t *testing.T) {
require.Equal(t, tc.exp, tc.shard.Match(model.Fingerprint(tc.fp)))
@@ -62,6 +72,11 @@ func TestShardBounds(t *testing.T) {
shard ShardAnnotation
from, through uint64
}{
{
shard: NewShard(0, 1),
from: 0,
through: math.MaxUint64,
},
{
shard: NewShard(0, 2),
from: 0,
@@ -95,3 +110,33 @@ func TestShardBounds(t *testing.T) {
})
}
}
func TestShardValidate(t *testing.T) {
for _, tc := range []struct {
desc string
factor uint32
err bool
}{
{
factor: 0,
err: false,
},
{
factor: 1,
err: true,
},
{
factor: 2,
err: false,
},
} {
t.Run(fmt.Sprint(tc.factor), func(t *testing.T) {
err := NewShard(0, tc.factor).Validate()
if tc.err {
require.NotNil(t, err)
} else {
require.Nil(t, err)
}
})
}
}
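Taken together, the shard.go changes make the contract explicit: a shard factor must be a power of two greater than 1 (factor 1 is rejected as equivalent to no sharding, while factor 0 or a nil shard matches everything), and shard `x` of `2^k` owns every fingerprint whose top `k` bits equal `x`. A self-contained sketch of that prefix rule and the resulting bounds (helper names are illustrative, not Loki's):

```go
package main

import (
	"fmt"
	"math"
	"math/bits"
)

// requiredBits is log2(of); valid only for power-of-two factors, which the
// Validate change above now enforces.
func requiredBits(of uint32) uint64 { return uint64(bits.TrailingZeros32(of)) }

// match reports whether a fingerprint's top requiredBits(of) bits equal the
// shard index, i.e. whether the fingerprint falls in this shard.
func match(shard, of uint32, fp uint64) bool {
	if of < 2 {
		return true // factor 0/1 means "no sharding": everything matches
	}
	b := requiredBits(of)
	prefix := uint64(shard) << (64 - b)
	return fp^prefix < 1<<(64-b)
}

// bounds returns the [from, through) fingerprint range owned by the shard;
// the last shard has no upper neighbour, so it is capped at MaxUint64.
func bounds(shard, of uint32) (from, through uint64) {
	if of < 2 {
		return 0, math.MaxUint64
	}
	b := requiredBits(of)
	from = uint64(shard) << (64 - b)
	if shard == of-1 {
		return from, math.MaxUint64
	}
	return from, from + 1<<(64-b)
}

func main() {
	fmt.Println(match(0, 2, 5287603155525329)) // true: top bit is 0
	fmt.Println(match(0, 2, 1<<63))            // false: belongs to shard 1_of_2
	fmt.Println(bounds(0, 2))                  // 0 9223372036854775808
}
```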

@@ -7,10 +7,12 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/stores/index/stats"
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
"github.com/grafana/loki/pkg/util/spanlogger"
)
// implements stores.Index
@@ -45,6 +47,10 @@ func cleanMatchers(matchers ...*labels.Matcher) ([]*labels.Matcher, *index.Shard
Shard: uint32(s.Shard),
Of: uint32(s.Of),
}
if err := shard.Validate(); err != nil {
return nil, nil, err
}
}
if len(matchers) == 0 {
@@ -60,13 +66,32 @@
// They share almost the same fields, so we can add the missing `KB` field to the proto and then
// use that within the tsdb package.
func (c *IndexClient) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]logproto.ChunkRef, error) {
log, ctx := spanlogger.New(ctx, "IndexClient.GetChunkRefs")
defer log.Span.Finish()
var kvps []interface{}
defer func() {
log.Log(kvps...)
}()
matchers, shard, err := cleanMatchers(matchers...)
kvps = append(kvps,
"from", from.Time(),
"through", through.Time(),
"matchers", syntax.MatchersString(matchers),
"shard", shard,
"cleanMatcherErr", err,
)
if err != nil {
return nil, err
}
// TODO(owen-d): use a pool to reduce allocs here
chks, err := c.idx.GetChunkRefs(ctx, userID, from, through, nil, shard, matchers...)
kvps = append(kvps,
"chunks", len(chks),
"indexErr", err,
)
if err != nil {
return nil, err
}
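The span-logging additions in the index client collect key-value pairs into a slice and flush them with a single deferred `log.Log(kvps...)`, so both the cleaned matchers and the index result (or whichever error caused an early return) land on the same span log. A minimal sketch of that pattern, with a stand-in logger in place of Loki's spanlogger and a hypothetical `getChunkRefs` helper:

```go
package main

import "fmt"

// printfLogger stands in for the spanlogger used in the diff; it exposes the
// same variadic Log(keyvals ...interface{}) shape.
type printfLogger struct{}

func (printfLogger) Log(keyvals ...interface{}) error {
	fmt.Println(keyvals...)
	return nil
}

func getChunkRefs(userID string) (int, error) {
	log := printfLogger{}
	var kvps []interface{}
	// One deferred call flushes everything gathered along the way, so an early
	// return (e.g. a matcher-cleaning error) still emits a complete log line.
	defer func() {
		log.Log(kvps...)
	}()

	kvps = append(kvps, "user", userID)

	chunks, err := 42, error(nil) // stand-in for the index lookup
	kvps = append(kvps, "chunks", chunks, "indexErr", err)
	return chunks, err
}

func main() {
	getChunkRefs("tenant-1")
}
```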
