diff --git a/pkg/ingester/index/bitprefix.go b/pkg/ingester/index/bitprefix.go
index f8e72a5bee..c65af73467 100644
--- a/pkg/ingester/index/bitprefix.go
+++ b/pkg/ingester/index/bitprefix.go
@@ -107,10 +107,7 @@ func (ii *BitPrefixInvertedIndex) validateShard(shard *astmapper.ShardAnnotation
 		return nil
 	}
 
-	if 1<<(shard.TSDB().RequiredBits()) != shard.Of {
-		return fmt.Errorf("Shard factor must be a power of two, got %d", shard.Of)
-	}
-	return nil
+	return shard.TSDB().Validate()
 }
 
 // Add a fingerprint under the specified labels.
diff --git a/pkg/querier/queryrange/shard_resolver.go b/pkg/querier/queryrange/shard_resolver.go
index f4dae25735..2f2e7db6af 100644
--- a/pkg/querier/queryrange/shard_resolver.go
+++ b/pkg/querier/queryrange/shard_resolver.go
@@ -119,7 +119,7 @@ func (r *dynamicShardResolver) Shards(e syntax.Expr) (int, error) {
 	}
 
 	combined := stats.MergeStats(results...)
-	factor := guessShardFactor(combined, r.maxParallelism)
+	factor := guessShardFactor(combined)
 	var bytesPerShard = combined.Bytes
 	if factor > 0 {
 		bytesPerShard = combined.Bytes / uint64(factor)
@@ -143,21 +143,18 @@ func (r *dynamicShardResolver) Shards(e syntax.Expr) (int, error) {
 const (
 	// Just some observed values to get us started on better query planning.
 	p90BytesPerSecond = 300 << 20 // 300MB/s/core
-	// At max, schedule a query for 10s of execution before
-	// splitting it into more requests. This is a lot of guesswork.
-	maxSeconds          = 10
-	maxSchedulableBytes = maxSeconds * p90BytesPerSecond
 )
 
-func guessShardFactor(stats stats.Stats, maxParallelism int) int {
-	expectedSeconds := float64(stats.Bytes / p90BytesPerSecond)
-	if expectedSeconds <= float64(maxParallelism) {
-		power := math.Ceil(math.Log2(expectedSeconds)) // round up to nearest power of 2
-		// Ideally, parallelize down to 1s queries
-		return int(math.Pow(2, power))
-	}
+func guessShardFactor(stats stats.Stats) int {
+	expectedSeconds := float64(stats.Bytes) / float64(p90BytesPerSecond)
+	power := math.Ceil(math.Log2(expectedSeconds)) // round up to nearest power of 2
 
-	n := stats.Bytes / maxSchedulableBytes
-	power := math.Ceil(math.Log2(float64(n)))
-	return int(math.Pow(2, power))
+	// Parallelize down to 1s queries
+	// Since x^0 == 1 and we only support factors of 2
+	// reset this edge case manually
+	factor := int(math.Pow(2, power))
+	if factor == 1 {
+		factor = 0
+	}
+	return factor
 }
diff --git a/pkg/querier/queryrange/shard_resolver_test.go b/pkg/querier/queryrange/shard_resolver_test.go
index 7c6166c43a..0fe2c3f661 100644
--- a/pkg/querier/queryrange/shard_resolver_test.go
+++ b/pkg/querier/queryrange/shard_resolver_test.go
@@ -11,50 +11,41 @@ import (
 
 func TestGuessShardFactor(t *testing.T) {
 	for _, tc := range []struct {
-		stats          stats.Stats
-		maxParallelism int
-		exp            int
+		stats stats.Stats
+		exp   int
 	}{
 		{
 			// no data == no sharding
-			exp:            0,
-			maxParallelism: 10,
+			exp: 0,
 		},
 		{
-			exp:            4,
-			maxParallelism: 10,
+			exp: 4,
 			stats: stats.Stats{
-				Bytes: 1200 << 20, // 1200MB
+				Bytes: p90BytesPerSecond * 4,
 			},
 		},
 		{
-			exp:            8,
-			maxParallelism: 10,
-			// 1500MB moves us to the next
-			// power of 2 parallelism factor
+			// round up shard factor
+			exp: 16,
 			stats: stats.Stats{
-				Bytes: 1500 << 20,
+				Bytes: p90BytesPerSecond * 15,
 			},
 		},
 		{
-			// Two fully packed parallelism cycles
-			exp:            16,
-			maxParallelism: 8,
+			exp: 2,
 			stats: stats.Stats{
-				Bytes: maxSchedulableBytes * 16,
+				Bytes: p90BytesPerSecond + 1,
 			},
 		},
 		{
-			// increase to next factor of two
-			exp:            32,
-			maxParallelism: 8,
+			exp: 0,
 			stats: stats.Stats{
-				Bytes: maxSchedulableBytes * 17,
+				Bytes: p90BytesPerSecond,
 			},
 		},
 	} {
 		t.Run(fmt.Sprintf("%+v", tc.stats), func(t *testing.T) {
-			require.Equal(t, tc.exp, guessShardFactor(tc.stats, tc.maxParallelism))
+			require.Equal(t, tc.exp, guessShardFactor(tc.stats))
 		})
 	}
 }
diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go
index f15af478e2..1695952f92 100644
--- a/pkg/querier/queryrange/split_by_interval.go
+++ b/pkg/querier/queryrange/split_by_interval.go
@@ -151,13 +151,12 @@ func (h *splitByInterval) loop(ctx context.Context, ch <-chan *lokiResult, next
 		data.req.LogToSpan(sp)
 
 		resp, err := next.Do(ctx, data.req)
+		sp.Finish()
 
 		select {
 		case <-ctx.Done():
-			sp.Finish()
 			return
 		case data.ch <- &packedResp{resp, err}:
-			sp.Finish()
 		}
 	}
 }
diff --git a/pkg/storage/stores/composite_store_entry.go b/pkg/storage/stores/composite_store_entry.go
index 4d0a97fb06..69ca3a4a3f 100644
--- a/pkg/storage/stores/composite_store_entry.go
+++ b/pkg/storage/stores/composite_store_entry.go
@@ -52,6 +52,12 @@ func (c *storeEntry) GetChunkRefs(ctx context.Context, userID string, from, thro
 	defer log.Span.Finish()
 
 	shortcut, err := c.validateQueryTimeRange(ctx, userID, &from, &through)
+	log.Log(
+		"shortcut", shortcut,
+		"from", from.Time(),
+		"through", through.Time(),
+		"err", err,
+	)
 	if err != nil {
 		return nil, nil, err
 	} else if shortcut {
diff --git a/pkg/storage/stores/tsdb/index/fingerprint.go b/pkg/storage/stores/tsdb/index/fingerprint.go
index c29f735754..646e587f70 100644
--- a/pkg/storage/stores/tsdb/index/fingerprint.go
+++ b/pkg/storage/stores/tsdb/index/fingerprint.go
@@ -10,7 +10,6 @@ type FingerprintOffsets [][2]uint64
 
 func (xs FingerprintOffsets) Range(shard ShardAnnotation) (minOffset, maxOffset uint64) {
 	from, through := shard.Bounds()
-
 	lower := sort.Search(len(xs), func(i int) bool {
 		return xs[i][1] >= uint64(from)
 	})
diff --git a/pkg/storage/stores/tsdb/index/index.go b/pkg/storage/stores/tsdb/index/index.go
index d2d9f7b2f7..5c77aae823 100644
--- a/pkg/storage/stores/tsdb/index/index.go
+++ b/pkg/storage/stores/tsdb/index/index.go
@@ -535,16 +535,19 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, fp model.F
 
 	w.buf2.PutHash(w.crc32)
 
-	if err := w.write(w.buf1.Get(), w.buf2.Get()); err != nil {
-		return errors.Wrap(err, "write series data")
-	}
-
 	w.lastSeries = append(w.lastSeries[:0], lset...)
 	w.lastSeriesHash = labelHash
 	w.lastRef = ref
 
 	if ref%fingerprintInterval == 0 {
-		w.fingerprintOffsets = append(w.fingerprintOffsets, [2]uint64{uint64(ref), labelHash})
+		// series references are the 16-byte aligned offsets
+		// Do NOT ask me how long I debugged this particular bit >:O
+		sRef := w.f.pos / 16
+		w.fingerprintOffsets = append(w.fingerprintOffsets, [2]uint64{sRef, labelHash})
+	}
+
+	if err := w.write(w.buf1.Get(), w.buf2.Get()); err != nil {
+		return errors.Wrap(err, "write series data")
 	}
 
 	return nil
diff --git a/pkg/storage/stores/tsdb/index/postings.go b/pkg/storage/stores/tsdb/index/postings.go
index c144efa55f..028da1bd06 100644
--- a/pkg/storage/stores/tsdb/index/postings.go
+++ b/pkg/storage/stores/tsdb/index/postings.go
@@ -864,14 +864,14 @@ func (sp *ShardedPostings) Next() bool {
 		if ok := sp.p.Next(); !ok {
 			return false
 		}
-		return sp.p.Seek(storage.SeriesRef(sp.minOffset))
+		return sp.Seek(0)
 	}
 
 	ok := sp.p.Next()
 	if !ok {
 		return false
 	}
 
-	if sp.p.At() > storage.SeriesRef(sp.maxOffset) {
+	if sp.p.At() >= storage.SeriesRef(sp.maxOffset) {
 		return false
 	}
 
@@ -881,7 +881,7 @@ func (sp *ShardedPostings) Next() bool {
 // Seek advances the iterator to value v or greater and returns
 // true if a value was found.
 func (sp *ShardedPostings) Seek(v storage.SeriesRef) (res bool) {
-	if v > storage.SeriesRef(sp.maxOffset) {
+	if v >= storage.SeriesRef(sp.maxOffset) {
 		return false
 	}
 	if v < storage.SeriesRef(sp.minOffset) {
diff --git a/pkg/storage/stores/tsdb/index/postings_test.go b/pkg/storage/stores/tsdb/index/postings_test.go
index 8cb76b0ad1..3b38e5bf06 100644
--- a/pkg/storage/stores/tsdb/index/postings_test.go
+++ b/pkg/storage/stores/tsdb/index/postings_test.go
@@ -929,3 +929,25 @@ func TestMemPostings_Delete(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, 0, len(expanded), "expected empty postings, got %v", expanded)
 }
+
+func TestShardedPostings(t *testing.T) {
+	offsets := FingerprintOffsets{
+		{0, 0},
+		{5, 0b1 << 62},
+		{10, 0b1 << 63},
+		{15, 0b11 << 62},
+	}
+	shard := NewShard(0, 2)
+	var refs []storage.SeriesRef
+	for i := 0; i < 20; i++ {
+		refs = append(refs, storage.SeriesRef(i))
+	}
+	ps := newListPostings(refs...)
+	shardedPostings := NewShardedPostings(ps, shard, offsets)
+
+	for i := 0; i < 10; i++ {
+		require.Equal(t, true, shardedPostings.Next())
+		require.Equal(t, storage.SeriesRef(i), shardedPostings.At())
+	}
+	require.Equal(t, false, shardedPostings.Next())
+}
diff --git a/pkg/storage/stores/tsdb/index/shard.go b/pkg/storage/stores/tsdb/index/shard.go
index 8f59047b48..8cb7c8201c 100644
--- a/pkg/storage/stores/tsdb/index/shard.go
+++ b/pkg/storage/stores/tsdb/index/shard.go
@@ -1,6 +1,7 @@
 package index
 
 import (
+	"errors"
 	"fmt"
 	"math"
 
@@ -14,6 +15,8 @@ const (
 	ShardLabelFmt = "%d_of_%d"
 )
 
+var errDisallowedIdentityShard = errors.New("shard with factor of 1 is explicitly disallowed. It's equivalent to no sharding")
+
 // ShardAnnotation is a convenience struct which holds data from a parsed shard label
 // Of MUST be a power of 2 to ensure sharding logic works correctly.
 type ShardAnnotation struct {
@@ -33,10 +36,14 @@ func NewShard(x, of uint32) ShardAnnotation {
 // Inclusion in a shard is calculated by determining the arbitrary bit prefix
 // for a shard, then ensuring the fingerprint has the same prefix
 func (shard ShardAnnotation) Match(fp model.Fingerprint) bool {
+	if shard.Of < 2 {
+		return true
+	}
 	requiredBits := shard.RequiredBits()
 
 	// A shard only matches a fingerprint when they both start with the same prefix
 	prefix := uint64(shard.Shard) << (64 - requiredBits)
+
 	return prefix^uint64(fp) < 1<<(64-requiredBits)
 }
 
@@ -51,8 +58,19 @@ func (shard ShardAnnotation) RequiredBits() uint64 {
 }
 
+func (shard ShardAnnotation) Validate() error {
+	if shard.Of == 1 {
+		return errDisallowedIdentityShard
+	}
+
+	if 1<<shard.RequiredBits() != shard.Of {
+		return fmt.Errorf("Shard factor must be a power of two, got %d", shard.Of)
+	}
+	return nil
+}
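
Note on the new sizing heuristic: guessShardFactor now ignores maxParallelism entirely and simply rounds the estimated execution time at p90 throughput up to the nearest power-of-two shard count, collapsing a factor of 1 down to 0 (no sharding). Below is a minimal standalone sketch of that rounding behavior; it uses only the standard library, and guessFactor is an illustrative name, not the actual Loki symbol:

package main

import (
	"fmt"
	"math"
)

// Observed p90 throughput from the diff: 300MB/s/core.
const p90BytesPerSecond = 300 << 20

// guessFactor mirrors the rounding in guessShardFactor: round the
// expected execution seconds up to the nearest power of two, then
// collapse the identity factor (1) to 0, meaning "no sharding".
func guessFactor(bytes uint64) int {
	expectedSeconds := float64(bytes) / float64(p90BytesPerSecond)
	power := math.Ceil(math.Log2(expectedSeconds))
	factor := int(math.Pow(2, power))
	if factor == 1 { // 2^0 == 1, equivalent to no sharding
		factor = 0
	}
	return factor
}

func main() {
	fmt.Println(guessFactor(0))                      // 0: Log2(0) = -Inf, so Pow(2, -Inf) = 0
	fmt.Println(guessFactor(p90BytesPerSecond))      // 0: exactly 1s of work collapses to no sharding
	fmt.Println(guessFactor(p90BytesPerSecond + 1))  // 2: anything over 1s rounds up
	fmt.Println(guessFactor(p90BytesPerSecond * 15)) // 16: ~15s of work rounds up to 16 shards
}

These cases match the rewritten TestGuessShardFactor table above.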
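
Likewise, shard membership in shard.go is a bit-prefix test: with a power-of-two factor, a fingerprint belongs to shard x exactly when its top log2(factor) bits equal x. A self-contained sketch of that membership test follows (hypothetical names, not the Loki ShardAnnotation type):

package main

import (
	"fmt"
	"math/bits"
)

// match mirrors the idea behind ShardAnnotation.Match: XOR cancels a
// shared prefix, so the result must fit entirely in the low
// (64 - requiredBits) bits for the prefixes to be identical.
func match(shard, factor uint32, fp uint64) bool {
	if factor < 2 {
		return true // factor 0/1 means no sharding; everything matches
	}
	requiredBits := uint64(bits.TrailingZeros32(factor)) // log2, valid for powers of two
	prefix := uint64(shard) << (64 - requiredBits)
	return prefix^fp < 1<<(64-requiredBits)
}

func main() {
	fmt.Println(match(0, 2, 0))        // true:  top bit 0 -> shard 0
	fmt.Println(match(0, 2, 1<<63))    // false: top bit 1 -> shard 1
	fmt.Println(match(1, 2, 1<<63|42)) // true:  low bits don't matter
}

This is also why the Validate change matters: the prefix arithmetic only partitions the fingerprint space cleanly when the factor is a power of two, and a factor of 1 would be an identity shard.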