Bloom Compactor: Optimize check for fingerprint ownership (#11389)

Calling `c.sharding.OwnsFingerprint(tenant, uint64(fingerprint))` for
each Series of a TSDB index is very expensive, because it not only
creates the tenant's sub-ring but also needs to check the fingerprint
against it.

Instead, we can pre-calculate the current instance's token ranges and
check if the (uint32 converted) fingerprint is contained within these
ranges.
 

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/11448/head
Christian Haudum 2 years ago committed by GitHub
parent c573defcbb
commit c4f5a57bc8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 58
      pkg/bloomcompactor/bloomcompactor.go
  2. 92
      pkg/bloomgateway/client.go
  3. 92
      pkg/bloomgateway/client_test.go
  4. 16
      pkg/bloomgateway/multiplexing.go
  5. 61
      pkg/bloomgateway/util.go
  6. 28
      pkg/bloomgateway/util_test.go
  7. 37
      pkg/bloomutils/iter.go
  8. 146
      pkg/bloomutils/ring.go
  9. 112
      pkg/bloomutils/ring_test.go
  10. 70
      pkg/storage/bloom/v1/iter.go
  11. 35
      pkg/storage/bloom/v1/iter_test.go

@ -42,6 +42,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/pkg/bloomutils"
"github.com/grafana/loki/pkg/storage"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
chunk_client "github.com/grafana/loki/pkg/storage/chunk/client"
@ -343,26 +344,37 @@ func (c *Compactor) compactTenant(ctx context.Context, logger log.Logger, sc sto
bt, _ := v1.NewBloomTokenizer(c.reg, NGramLength, NGramSkip)
errs := multierror.New()
if err := sc.indexShipper.ForEach(ctx, tableName, tenant, func(isMultiTenantIndex bool, idx shipperindex.Index) error {
if isMultiTenantIndex { // TODO: handle multitenant tables
return fmt.Errorf("unexpected multi-tenant")
rs, err := c.sharding.GetTenantSubRing(tenant).GetAllHealthy(RingOp)
if err != nil {
return err
}
tokenRanges := bloomutils.GetInstanceWithTokenRange(c.cfg.Ring.InstanceID, rs.Instances)
_ = sc.indexShipper.ForEach(ctx, tableName, tenant, func(isMultiTenantIndex bool, idx shipperindex.Index) error {
if isMultiTenantIndex {
// Skip multi-tenant indexes
return nil
}
tsdbFile, ok := idx.(*tsdb.TSDBFile)
if !ok {
errs.Add(fmt.Errorf("failed to cast to TSDBFile"))
return nil
}
tsdbIndex, ok := tsdbFile.Index.(*tsdb.TSDBIndex)
if !ok {
errs.Add(fmt.Errorf("failed to cast to TSDBIndex"))
return nil
}
var seriesMetas []seriesMeta
// TODO: Make these casts safely
if err := idx.(*tsdb.TSDBFile).Index.(*tsdb.TSDBIndex).ForSeries(
err := tsdbIndex.ForSeries(
ctx, nil,
0, math.MaxInt64, // TODO: Replace with MaxLookBackPeriod
func(labels labels.Labels, fingerprint model.Fingerprint, chksMetas []tsdbindex.ChunkMeta) {
// TODO: Inefficient as is, calls the ring per fingerprint. Refactor to make the call once per compaction fingerprint bounds.
ownsFingerprint, err := c.sharding.OwnsFingerprint(tenant, uint64(fingerprint))
if err != nil {
level.Error(logger).Log("msg", "failed to check if compactor owns fp", "err", err)
errs.Add(err)
return
}
if !ownsFingerprint {
if !tokenRanges.Contains(uint32(fingerprint)) {
return
}
@ -371,26 +383,26 @@ func (c *Compactor) compactTenant(ctx context.Context, logger log.Logger, sc sto
//All seriesMetas given a table within fp of this compactor shard
seriesMetas = append(seriesMetas, seriesMeta{seriesFP: fingerprint, seriesLbs: labels, chunkRefs: temp})
},
); err != nil {
)
if err != nil {
errs.Add(err)
return nil
}
job := NewJob(tenant, tableName, idx.Path(), seriesMetas)
jobLogger := log.With(logger, "job", job.String())
c.metrics.compactionRunJobStarted.Inc()
if err := c.runCompact(ctx, jobLogger, job, bt, sc); err != nil {
err = c.runCompact(ctx, jobLogger, job, bt, sc)
if err != nil {
c.metrics.compactionRunJobFailed.Inc()
errs.Add(errors.Wrap(err, "runBloomCompact failed"))
return errs.Err()
} else {
c.metrics.compactionRunJobSuceeded.Inc()
}
c.metrics.compactionRunJobSuceeded.Inc()
return nil
}); err != nil {
errs.Add(err)
}
})
return errs.Err()
}

@ -23,6 +23,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/grafana/loki/pkg/bloomutils"
"github.com/grafana/loki/pkg/distributor/clientpool"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel/stats"
@ -283,39 +284,44 @@ func (c *GatewayClient) doForAddrs(addrs []string, fn func(logproto.BloomGateway
}
func (c *GatewayClient) groupFingerprintsByServer(groups []*logproto.GroupedChunkRefs, subRing ring.ReadRing, instances []ring.InstanceDesc) ([]instanceWithFingerprints, error) {
servers, err := serverAddressesWithTokenRanges(subRing, instances)
if err != nil {
return nil, err
}
boundedFingerprints := partitionFingerprintsByAddresses(groups, servers)
return groupByInstance(boundedFingerprints), nil
}
func serverAddressesWithTokenRanges(subRing ring.ReadRing, instances []ring.InstanceDesc) ([]addrsWithTokenRange, error) {
bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
servers := make([]addrsWithTokenRange, 0, len(instances))
prev := -1
it := newInstanceSortMergeIterator(instances)
it := bloomutils.NewInstanceSortMergeIterator(instances)
for it.Next() {
// We can use one of the tokens from the token range
// to obtain all addresses for that token.
rs, err := subRing.Get(it.At().token, BlocksRead, bufDescs, bufHosts, bufZones)
rs, err := subRing.Get(it.At().MaxToken, BlocksRead, bufDescs, bufHosts, bufZones)
if err != nil {
return nil, errors.Wrap(err, "bloom gateway get ring")
}
servers = append(servers, addrsWithTokenRange{
minToken: uint32(prev + 1),
maxToken: it.At().token,
id: it.At().instance.Id,
id: it.At().Instance.Id,
addrs: rs.GetAddresses(),
minToken: it.At().MinToken,
maxToken: it.At().MaxToken,
})
prev = int(it.At().token)
}
if len(servers) > 0 {
if len(servers) > 0 && servers[len(servers)-1].maxToken < math.MaxUint32 {
// append the instance for the token range between the greatest token and MaxUint32
servers = append(servers, addrsWithTokenRange{
minToken: uint32(prev),
maxToken: math.MaxUint32,
addrs: servers[0].addrs,
id: servers[0].id,
addrs: servers[0].addrs,
minToken: servers[len(servers)-1].maxToken + 1,
maxToken: math.MaxUint32,
})
}
boundedFingerprints := partitionFingerprintsByAddresses(groups, servers)
return groupByInstance(boundedFingerprints), nil
return servers, nil
}
type instanceWithToken struct {
@ -401,61 +407,3 @@ func groupByInstance(boundedFingerprints []instanceWithFingerprints) []instanceW
return result
}
// newInstanceSortMergeIterator creates an iterator that yields instanceWithToken elements
// where the token of the elements are sorted in ascending order.
func newInstanceSortMergeIterator(instances []ring.InstanceDesc) v1.Iterator[instanceWithToken] {
it := &sortMergeIterator[ring.InstanceDesc, uint32, instanceWithToken]{
items: instances,
transform: func(item ring.InstanceDesc, val uint32) instanceWithToken {
return instanceWithToken{instance: item, token: val}
},
}
sequences := make([]v1.PeekingIterator[IndexedValue[uint32]], 0, len(instances))
for i := range instances {
sort.Slice(instances[i].Tokens, func(a, b int) bool {
return instances[i].Tokens[a] < instances[i].Tokens[b]
})
iter := NewIterWithIndex[uint32](v1.NewSliceIter(instances[i].Tokens), i)
sequences = append(sequences, v1.NewPeekingIter[IndexedValue[uint32]](iter))
}
it.heap = v1.NewHeapIterator(
func(i, j IndexedValue[uint32]) bool {
return i.val < j.val
},
sequences...,
)
it.err = nil
return it
}
// sortMergeIterator implements v1.Iterator
type sortMergeIterator[T any, C comparable, R any] struct {
curr R
heap *v1.HeapIterator[IndexedValue[C]]
items []T
transform func(T, C) R
err error
}
func (it *sortMergeIterator[T, C, R]) Next() bool {
ok := it.heap.Next()
if !ok {
it.err = io.EOF
return false
}
group := it.heap.At()
it.curr = it.transform(it.items[group.idx], group.val)
return true
}
func (it *sortMergeIterator[T, C, R]) At() R {
return it.curr
}
func (it *sortMergeIterator[T, C, R]) Err() error {
return it.err
}

@ -1,6 +1,7 @@
package bloomgateway
import (
"math"
"sort"
"testing"
"time"
@ -11,12 +12,12 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/bloomutils"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/validation"
)
func TestBloomGatewayClient(t *testing.T) {
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
@ -32,33 +33,6 @@ func TestBloomGatewayClient(t *testing.T) {
})
}
func TestBloomGatewayClient_SortInstancesByToken(t *testing.T) {
input := []ring.InstanceDesc{
{Id: "1", Tokens: []uint32{6, 5, 2, 9}},
{Id: "2", Tokens: []uint32{3, 4, 7}},
{Id: "3", Tokens: []uint32{1, 8, 0}},
}
expected := []instanceWithToken{
{instance: input[2], token: 0},
{instance: input[2], token: 1},
{instance: input[0], token: 2},
{instance: input[1], token: 3},
{instance: input[1], token: 4},
{instance: input[0], token: 5},
{instance: input[0], token: 6},
{instance: input[1], token: 7},
{instance: input[2], token: 8},
{instance: input[0], token: 9},
}
var i int
it := newInstanceSortMergeIterator(input)
for it.Next() {
require.Equal(t, expected[i], it.At())
i++
}
}
func TestBloomGatewayClient_PartitionFingerprintsByAddresses(t *testing.T) {
// instance token ranges do not overlap
t.Run("non-overlapping", func(t *testing.T) {
@ -183,6 +157,50 @@ func TestBloomGatewayClient_PartitionFingerprintsByAddresses(t *testing.T) {
})
}
// TestBloomGatewayClient_ServerAddressesWithTokenRanges verifies the token
// ranges and addresses that serverAddressesWithTokenRanges derives from the
// instances of a (mocked) ring, including the range that is appended between
// the greatest token and math.MaxUint32.
func TestBloomGatewayClient_ServerAddressesWithTokenRanges(t *testing.T) {
	// Fractions of the uint32 key space that are used as ring tokens.
	const (
		sixth = math.MaxUint32 / 6
		third = math.MaxUint32 / 3
	)

	type testCase struct {
		instances []ring.InstanceDesc
		expected  []addrsWithTokenRange
	}

	scenarios := map[string]testCase{
		"one token per instance": {
			instances: []ring.InstanceDesc{
				{Id: "instance-1", Addr: "10.0.0.1", Tokens: []uint32{sixth}},
				{Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{sixth * 3}},
				{Id: "instance-3", Addr: "10.0.0.3", Tokens: []uint32{sixth * 5}},
			},
			expected: []addrsWithTokenRange{
				{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: 0, maxToken: sixth},
				{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: sixth + 1, maxToken: sixth * 3},
				{id: "instance-3", addrs: []string{"10.0.0.3"}, minToken: sixth*3 + 1, maxToken: sixth * 5},
				// The remainder of the key space is expected to go to the first instance again.
				{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: sixth*5 + 1, maxToken: math.MaxUint32},
			},
		},
		"MinUint32 and MaxUint32 are tokens in the ring": {
			instances: []ring.InstanceDesc{
				{Id: "instance-1", Addr: "10.0.0.1", Tokens: []uint32{0, third * 2}},
				{Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{third, math.MaxUint32}},
			},
			expected: []addrsWithTokenRange{
				{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: 0, maxToken: 0},
				{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: 1, maxToken: third},
				{id: "instance-1", addrs: []string{"10.0.0.1"}, minToken: third + 1, maxToken: third * 2},
				{id: "instance-2", addrs: []string{"10.0.0.2"}, minToken: third*2 + 1, maxToken: math.MaxUint32},
			},
		},
	}

	for name, scenario := range scenarios {
		scenario := scenario
		t.Run(name, func(t *testing.T) {
			servers, err := serverAddressesWithTokenRanges(newMockRing(scenario.instances), scenario.instances)
			require.NoError(t, err)
			require.Equal(t, scenario.expected, servers)
		})
	}
}
func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) {
logger := log.NewNopLogger()
@ -203,9 +221,9 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) {
{Id: "instance-3", Addr: "10.0.0.3", Tokens: []uint32{2014002871, 315617625, 1036168527}},
}
it := newInstanceSortMergeIterator(instances)
it := bloomutils.NewInstanceSortMergeIterator(instances)
for it.Next() {
t.Log(it.At().token, it.At().instance.Addr)
t.Log(it.At().MaxToken, it.At().Instance.Addr)
}
testCases := []struct {
@ -257,7 +275,7 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) {
},
},
{
name: "fingerprints with token ranges of a multiple instance are grouped",
name: "fingerprints with token ranges of multiple instances are grouped",
chunks: []*logproto.GroupedChunkRefs{
// instance 1
{Fingerprint: 1000000000, Refs: []*logproto.ShortRef{{Checksum: 1}}},
@ -327,8 +345,8 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) {
var _ ring.ReadRing = &mockRing{}
func newMockRing(instances []ring.InstanceDesc) *mockRing {
it := newInstanceSortMergeIterator(instances)
ranges := make([]instanceWithToken, 0)
it := bloomutils.NewInstanceSortMergeIterator(instances)
ranges := make([]bloomutils.InstanceWithTokenRange, 0)
for it.Next() {
ranges = append(ranges, it.At())
}
@ -340,21 +358,21 @@ func newMockRing(instances []ring.InstanceDesc) *mockRing {
type mockRing struct {
instances []ring.InstanceDesc
ranges []instanceWithToken
ranges []bloomutils.InstanceWithTokenRange
}
// Get implements ring.ReadRing.
func (r *mockRing) Get(key uint32, _ ring.Operation, _ []ring.InstanceDesc, _ []string, _ []string) (ring.ReplicationSet, error) {
idx, _ := sort.Find(len(r.ranges), func(i int) int {
if r.ranges[i].token < key {
if r.ranges[i].MaxToken < key {
return 1
}
if r.ranges[i].token > key {
if r.ranges[i].MaxToken > key {
return -1
}
return 0
})
return ring.ReplicationSet{Instances: []ring.InstanceDesc{r.ranges[idx].instance}}, nil
return ring.ReplicationSet{Instances: []ring.InstanceDesc{r.ranges[idx].Instance}}, nil
}
// GetAllHealthy implements ring.ReadRing.

@ -164,7 +164,7 @@ type FilterRequest struct {
// taskMergeIterator implements v1.Iterator
type taskMergeIterator struct {
curr FilterRequest
heap *v1.HeapIterator[IndexedValue[*logproto.GroupedChunkRefs]]
heap *v1.HeapIterator[v1.IndexedValue[*logproto.GroupedChunkRefs]]
tasks []Task
day time.Time
err error
@ -181,14 +181,14 @@ func newTaskMergeIterator(day time.Time, tasks ...Task) v1.PeekingIterator[v1.Re
}
func (it *taskMergeIterator) init() {
sequences := make([]v1.PeekingIterator[IndexedValue[*logproto.GroupedChunkRefs]], 0, len(it.tasks))
sequences := make([]v1.PeekingIterator[v1.IndexedValue[*logproto.GroupedChunkRefs]], 0, len(it.tasks))
for i := range it.tasks {
iter := NewIterWithIndex(it.tasks[i].ChunkIterForDay(it.day), i)
iter := v1.NewIterWithIndex(it.tasks[i].ChunkIterForDay(it.day), i)
sequences = append(sequences, v1.NewPeekingIter(iter))
}
it.heap = v1.NewHeapIterator(
func(i, j IndexedValue[*logproto.GroupedChunkRefs]) bool {
return i.val.Fingerprint < j.val.Fingerprint
func(i, j v1.IndexedValue[*logproto.GroupedChunkRefs]) bool {
return i.Value().Fingerprint < j.Value().Fingerprint
},
sequences...,
)
@ -202,10 +202,10 @@ func (it *taskMergeIterator) Next() bool {
}
group := it.heap.At()
task := it.tasks[group.idx]
task := it.tasks[group.Index()]
it.curr.Fp = model.Fingerprint(group.val.Fingerprint)
it.curr.Chks = convertToChunkRefs(group.val.Refs)
it.curr.Fp = model.Fingerprint(group.Value().Fingerprint)
it.curr.Chks = convertToChunkRefs(group.Value().Refs)
it.curr.Searches = convertToSearches(task.Request.Filters)
it.curr.Response = task.ResCh
it.curr.Error = task.ErrCh

@ -12,67 +12,6 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)
type IndexedValue[T any] struct {
idx int
val T
}
type IterWithIndex[T any] struct {
v1.Iterator[T]
zero T // zero value of T
cache IndexedValue[T]
}
func (it *IterWithIndex[T]) At() IndexedValue[T] {
it.cache.val = it.Iterator.At()
return it.cache
}
func NewIterWithIndex[T any](iter v1.Iterator[T], idx int) v1.Iterator[IndexedValue[T]] {
return &IterWithIndex[T]{
Iterator: iter,
cache: IndexedValue[T]{idx: idx},
}
}
type SliceIterWithIndex[T any] struct {
xs []T // source slice
pos int // position within the slice
zero T // zero value of T
cache IndexedValue[T]
}
func (it *SliceIterWithIndex[T]) Next() bool {
it.pos++
return it.pos < len(it.xs)
}
func (it *SliceIterWithIndex[T]) Err() error {
return nil
}
func (it *SliceIterWithIndex[T]) At() IndexedValue[T] {
it.cache.val = it.xs[it.pos]
return it.cache
}
func (it *SliceIterWithIndex[T]) Peek() (IndexedValue[T], bool) {
if it.pos+1 >= len(it.xs) {
it.cache.val = it.zero
return it.cache, false
}
it.cache.val = it.xs[it.pos+1]
return it.cache, true
}
func NewSliceIterWithIndex[T any](xs []T, idx int) v1.PeekingIterator[IndexedValue[T]] {
return &SliceIterWithIndex[T]{
xs: xs,
pos: -1,
cache: IndexedValue[T]{idx: idx},
}
}
func getDayTime(ts model.Time) time.Time {
return time.Date(ts.Time().Year(), ts.Time().Month(), ts.Time().Day(), 0, 0, 0, 0, time.UTC)
}

@ -10,34 +10,6 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)
func TestSliceIterWithIndex(t *testing.T) {
t.Run("SliceIterWithIndex implements v1.PeekingIterator interface", func(t *testing.T) {
xs := []string{"a", "b", "c"}
it := NewSliceIterWithIndex(xs, 123)
// peek at first item
p, ok := it.Peek()
require.True(t, ok)
require.Equal(t, "a", p.val)
require.Equal(t, 123, p.idx)
// proceed to first item
require.True(t, it.Next())
require.Equal(t, "a", it.At().val)
require.Equal(t, 123, it.At().idx)
// proceed to second and third item
require.True(t, it.Next())
require.True(t, it.Next())
// peek at non-existing fourth item
p, ok = it.Peek()
require.False(t, ok)
require.Equal(t, "", p.val) // "" is zero value for type string
require.Equal(t, 123, p.idx)
})
}
func TestGetFromThrough(t *testing.T) {
chunks := []*logproto.ShortRef{
{From: 0, Through: 6},

@ -0,0 +1,37 @@
package bloomutils
import (
"io"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)
// sortMergeIterator implements v1.Iterator.
//
// It consumes a heap iterator of indexed values and converts every value into
// an element of type R with the help of the transform callback. T is the type
// of the source items, C the type of the merged values and R the type of the
// yielded elements.
type sortMergeIterator[T any, C comparable, R any] struct {
	// curr points to the element built by the last successful call to Next.
	// It is nil as long as Next has not succeeded.
	curr *R
	// heap is the underlying iterator. The index of each of its values
	// selects the entry of items the value belongs to.
	heap *v1.HeapIterator[v1.IndexedValue[C]]
	// items are the source items the merged values originate from.
	items []T
	// transform builds the next element from the source item, the current
	// value and the previously built element (nil for the first element).
	transform func(T, C, *R) *R
	// err is set to io.EOF once the heap iterator is exhausted.
	err error
}

// Next advances the iterator. It returns false and records io.EOF as the
// iterator error when the underlying heap iterator has no more values.
func (it *sortMergeIterator[T, C, R]) Next() bool {
	if !it.heap.Next() {
		it.err = io.EOF
		return false
	}

	group := it.heap.At()
	// The previous element is handed to transform so that the new element
	// can be derived from its predecessor.
	it.curr = it.transform(it.items[group.Index()], group.Value(), it.curr)
	return true
}

// At returns the element built by the last successful call to Next.
// If there is no such element yet, the zero value of R is returned instead of
// dereferencing a nil pointer.
func (it *sortMergeIterator[T, C, R]) At() R {
	if it.curr == nil {
		var zero R
		return zero
	}
	return *it.curr
}

// Err returns io.EOF after Next reported the end of the iteration and nil
// before that.
func (it *sortMergeIterator[T, C, R]) Err() error {
	return it.err
}

@ -0,0 +1,146 @@
// This file contains a bunch of utility functions for bloom components.
// TODO: Find a better location for this package
package bloomutils
import (
"math"
"sort"
"github.com/grafana/dskit/ring"
"golang.org/x/exp/slices"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)
// InstanceWithTokenRange associates a ring instance with the token range
// [MinToken, MaxToken]. Both bounds are inclusive.
type InstanceWithTokenRange struct {
	Instance           ring.InstanceDesc
	MinToken, MaxToken uint32
}

// Cmp compares the given token against the token range of the instance.
// It returns v1.Before if the token is smaller than MinToken, v1.After if it
// is greater than MaxToken, and v1.Overlap if it lies within the range.
func (i InstanceWithTokenRange) Cmp(token uint32) v1.BoundsCheck {
	switch {
	case token < i.MinToken:
		return v1.Before
	case token > i.MaxToken:
		return v1.After
	default:
		return v1.Overlap
	}
}
// InstancesWithTokenRange is a list of instances with their token ranges.
type InstancesWithTokenRange []InstanceWithTokenRange

// Contains reports whether the given token lies within the token range of at
// least one element of the list. An empty list contains no token.
func (i InstancesWithTokenRange) Contains(token uint32) bool {
	for pos := range i {
		if i[pos].Cmp(token) != v1.Overlap {
			continue
		}
		return true
	}
	return false
}
// GetInstanceWithTokenRange calculates the token range for a specific instance
// with given id based on the first token in the ring.
// This assumes that each instance in the ring is configured with only a single
// token.
//
// The instances are ordered by their first token and the uint32 key space is
// split into len(instances) consecutive ranges of equal size. The instance at
// position i of that order is assigned the i-th range, and the range of the
// last instance is extended up to math.MaxUint32.
//
// Instances without any token are ordered after all instances with tokens
// instead of causing an index-out-of-range panic, and instances that share the
// same first token are ordered by their ID so that the outcome does not depend
// on the order of the input.
//
// The returned list holds exactly one range, or it is empty if there is no
// instance with the given id. Note that both the token slices of the
// instances and the instances slice itself are sorted in place.
func GetInstanceWithTokenRange(id string, instances []ring.InstanceDesc) InstancesWithTokenRange {
	// Sorting the tokens of the instances would not be necessary if there is
	// only a single token per instances, however, since we only assume one
	// token, but don't enforce one token, we keep the sorting.
	for _, inst := range instances {
		sort.Slice(inst.Tokens, func(i, j int) bool {
			return inst.Tokens[i] < inst.Tokens[j]
		})
	}

	// Sort instances by their first token.
	sort.Slice(instances, func(i, j int) bool {
		a, b := &instances[i], &instances[j]
		aHasTokens, bHasTokens := len(a.Tokens) > 0, len(b.Tokens) > 0
		switch {
		case aHasTokens && bHasTokens:
			if a.Tokens[0] != b.Tokens[0] {
				return a.Tokens[0] < b.Tokens[0]
			}
		case aHasTokens != bHasTokens:
			// Exactly one of the two has tokens: it is ordered first.
			return aHasTokens
		}
		// Same first token, or no tokens on both sides: fall back to the ID
		// to get a deterministic order.
		return a.Id < b.Id
	})

	idx := slices.IndexFunc(instances, func(inst ring.InstanceDesc) bool {
		return inst.Id == id
	})

	// instance with Id == id not found
	if idx == -1 {
		return InstancesWithTokenRange{}
	}

	i := uint32(idx)
	n := uint32(len(instances))
	step := math.MaxUint32 / n

	minToken := step * i
	maxToken := minToken + step - 1
	if i == n-1 {
		// extend the last token range to MaxUint32, because the integer
		// division above may leave a remainder at the end of the key space
		maxToken = math.MaxUint32
	}

	return InstancesWithTokenRange{
		{MinToken: minToken, MaxToken: maxToken, Instance: instances[idx]},
	}
}
// GetInstancesWithTokenRanges calculates the token ranges for a specific
// instance with given id based on all tokens in the ring.
// If the instances in the ring are configured with a single token, such as the
// bloom compactor, use GetInstanceWithTokenRange() instead.
//
// The result contains one range per token of the instance. If the instance
// also owns the first token in the ring, the range between the greatest token
// in the ring and math.MaxUint32 is appended as well, unless the greatest
// token already is math.MaxUint32.
func GetInstancesWithTokenRanges(id string, instances []ring.InstanceDesc) InstancesWithTokenRange {
	servers := make([]InstanceWithTokenRange, 0, len(instances))
	it := NewInstanceSortMergeIterator(instances)

	var firstInst ring.InstanceDesc
	var lastToken uint32
	for it.Next() {
		curr := it.At()
		if firstInst.Id == "" {
			firstInst = curr.Instance
		}
		if curr.Instance.Id == id {
			servers = append(servers, curr)
		}
		lastToken = curr.MaxToken
	}

	// append token range from lastToken+1 to MaxUint32
	// only if the instance with the first token is the current one.
	// When lastToken already is MaxUint32 there is nothing left to cover, and
	// lastToken+1 would wrap around to 0 and yield a range that spans the
	// whole key space.
	if len(servers) > 0 && firstInst.Id == id && lastToken < math.MaxUint32 {
		servers = append(servers, InstanceWithTokenRange{
			MinToken: lastToken + 1,
			MaxToken: math.MaxUint32,
			Instance: servers[0].Instance,
		})
	}
	return servers
}
// NewInstanceSortMergeIterator creates an iterator that yields
// InstanceWithTokenRange elements where the tokens of the elements are sorted
// in ascending order.
//
// The MaxToken of every element is a token of its instance. The MinToken is
// the MaxToken of the previously yielded element plus one, and 0 for the first
// element. The token slices of the given instances are sorted in place.
func NewInstanceSortMergeIterator(instances []ring.InstanceDesc) v1.Iterator[InstanceWithTokenRange] {
	// toRange derives the range of the current token from its predecessor.
	toRange := func(inst ring.InstanceDesc, token uint32, prev *InstanceWithTokenRange) *InstanceWithTokenRange {
		next := &InstanceWithTokenRange{Instance: inst, MaxToken: token}
		if prev != nil {
			next.MinToken = prev.MaxToken + 1
		}
		return next
	}

	// One sequence of sorted tokens per instance. The index attached to each
	// token identifies the instance the token belongs to.
	sequences := make([]v1.PeekingIterator[v1.IndexedValue[uint32]], 0, len(instances))
	for pos := range instances {
		tokens := instances[pos].Tokens
		sort.Slice(tokens, func(a, b int) bool {
			return tokens[a] < tokens[b]
		})
		indexed := v1.NewIterWithIndex[uint32](v1.NewSliceIter(tokens), pos)
		sequences = append(sequences, v1.NewPeekingIter[v1.IndexedValue[uint32]](indexed))
	}

	byToken := func(a, b v1.IndexedValue[uint32]) bool {
		return a.Value() < b.Value()
	}

	return &sortMergeIterator[ring.InstanceDesc, uint32, InstanceWithTokenRange]{
		items:     instances,
		transform: toRange,
		heap:      v1.NewHeapIterator(byToken, sequences...),
	}
}

@ -0,0 +1,112 @@
package bloomutils
import (
"math"
"testing"
"github.com/grafana/dskit/ring"
"github.com/stretchr/testify/require"
)
// TestBloomGatewayClient_SortInstancesByToken verifies that the iterator
// returned by NewInstanceSortMergeIterator yields the token ranges of all
// instances ordered by token, and that it yields exactly the expected number
// of elements.
func TestBloomGatewayClient_SortInstancesByToken(t *testing.T) {
	input := []ring.InstanceDesc{
		{Id: "1", Tokens: []uint32{5, 9}},
		{Id: "2", Tokens: []uint32{3, 7}},
		{Id: "3", Tokens: []uint32{1}},
	}
	expected := []InstanceWithTokenRange{
		{Instance: input[2], MinToken: 0, MaxToken: 1},
		{Instance: input[1], MinToken: 2, MaxToken: 3},
		{Instance: input[0], MinToken: 4, MaxToken: 5},
		{Instance: input[1], MinToken: 6, MaxToken: 7},
		{Instance: input[0], MinToken: 8, MaxToken: 9},
	}

	var i int
	it := NewInstanceSortMergeIterator(input)
	for it.Next() {
		// Fail with a readable message instead of an index-out-of-range
		// panic if the iterator yields too many elements.
		require.Less(t, i, len(expected), "iterator yielded more elements than expected")
		t.Log(expected[i], it.At())
		require.Equal(t, expected[i], it.At())
		i++
	}
	// Without this check an iterator that yields too few (or no) elements
	// would pass the test.
	require.Equal(t, len(expected), i, "iterator yielded fewer elements than expected")
}
// TestBloomGatewayClient_GetInstancesWithTokenRanges verifies the ranges
// returned by GetInstancesWithTokenRanges for an instance that owns the first
// token in the ring and for one that does not.
func TestBloomGatewayClient_GetInstancesWithTokenRanges(t *testing.T) {
	// newRing returns a fresh set of instances for every sub-test.
	newRing := func() []ring.InstanceDesc {
		return []ring.InstanceDesc{
			{Id: "1", Tokens: []uint32{5, 9}},
			{Id: "2", Tokens: []uint32{3, 7}},
			{Id: "3", Tokens: []uint32{1}},
		}
	}

	t.Run("instance does not own first token in the ring", func(t *testing.T) {
		instances := newRing()
		want := InstancesWithTokenRange{
			{Instance: instances[1], MinToken: 2, MaxToken: 3},
			{Instance: instances[1], MinToken: 6, MaxToken: 7},
		}

		got := GetInstancesWithTokenRanges("2", instances)
		require.Equal(t, want, got)
	})

	t.Run("instance owns first token in the ring", func(t *testing.T) {
		instances := newRing()
		want := InstancesWithTokenRange{
			{Instance: instances[2], MinToken: 0, MaxToken: 1},
			// range between the greatest token in the ring and MaxUint32
			{Instance: instances[2], MinToken: 10, MaxToken: math.MaxUint32},
		}

		got := GetInstancesWithTokenRanges("3", instances)
		require.Equal(t, want, got)
	})
}
// TestBloomGatewayClient_GetInstanceWithTokenRange verifies the range that
// GetInstanceWithTokenRange assigns to the first, the middle and the last
// instance of a ring with three instances.
func TestBloomGatewayClient_GetInstanceWithTokenRange(t *testing.T) {
	// step is the size of one range when the key space is split in three.
	const step = math.MaxUint32 / 3

	// newRing returns a fresh, unsorted set of instances for every test case.
	newRing := func() []ring.InstanceDesc {
		return []ring.InstanceDesc{
			{Id: "1", Tokens: []uint32{3}},
			{Id: "2", Tokens: []uint32{5}},
			{Id: "3", Tokens: []uint32{1}},
		}
	}

	testCases := []struct {
		name     string
		id       string
		expected InstancesWithTokenRange
	}{
		{
			name: "first instance includes 0 token",
			id:   "3",
			expected: InstancesWithTokenRange{
				{Instance: ring.InstanceDesc{Id: "3", Tokens: []uint32{1}}, MinToken: 0, MaxToken: step - 1},
			},
		},
		{
			name: "middle instance",
			id:   "1",
			expected: InstancesWithTokenRange{
				{Instance: ring.InstanceDesc{Id: "1", Tokens: []uint32{3}}, MinToken: step, MaxToken: step*2 - 1},
			},
		},
		{
			name: "last instance includes MaxUint32 token",
			id:   "2",
			expected: InstancesWithTokenRange{
				{Instance: ring.InstanceDesc{Id: "2", Tokens: []uint32{5}}, MinToken: step * 2, MaxToken: math.MaxUint32},
			},
		},
	}

	for _, testCase := range testCases {
		testCase := testCase
		t.Run(testCase.name, func(t *testing.T) {
			got := GetInstanceWithTokenRange(testCase.id, newRing())
			require.Equal(t, testCase.expected, got)
		})
	}
}

@ -0,0 +1,70 @@
package v1
// IndexedValue is a value of type T together with an integer index.
type IndexedValue[T any] struct {
	idx int
	val T
}

// Value returns the wrapped value.
func (v IndexedValue[T]) Value() T {
	return v.val
}

// Index returns the index that is attached to the value.
func (v IndexedValue[T]) Index() int {
	return v.idx
}
// IterWithIndex wraps an Iterator and yields its values as IndexedValue
// elements that all carry the same index.
type IterWithIndex[T any] struct {
	Iterator[T]
	zero  T // zero value of T
	cache IndexedValue[T]
}

// At returns the current value of the wrapped iterator together with the index
// of this iterator. The returned element is a copy of an internal cache that
// is overwritten on every call.
func (it *IterWithIndex[T]) At() IndexedValue[T] {
	current := it.Iterator.At()
	it.cache.val = current
	return it.cache
}

// NewIterWithIndex returns an iterator that attaches idx to every value of
// iter.
func NewIterWithIndex[T any](iter Iterator[T], idx int) Iterator[IndexedValue[T]] {
	wrapped := new(IterWithIndex[T])
	wrapped.Iterator = iter
	wrapped.cache.idx = idx
	return wrapped
}
// SliceIterWithIndex iterates over a slice and yields its items as
// IndexedValue elements that all carry the same index. It additionally
// supports peeking at the next item without advancing.
type SliceIterWithIndex[T any] struct {
	xs    []T // source slice
	pos   int // position within the slice
	zero  T   // zero value of T
	cache IndexedValue[T]
}

// Next advances the iterator to the next item of the slice and reports whether
// such an item exists.
func (it *SliceIterWithIndex[T]) Next() bool {
	it.pos++
	return it.pos < len(it.xs)
}

// Err always returns nil.
func (it *SliceIterWithIndex[T]) Err() error {
	return nil
}

// At returns the item at the current position together with the index of the
// iterator. If the iterator is not positioned on an item, i.e. before the
// first call to Next or after Next returned false, the zero value of T is
// returned as value, the same way Peek does, instead of panicking with an
// index out of range.
func (it *SliceIterWithIndex[T]) At() IndexedValue[T] {
	if it.pos < 0 || it.pos >= len(it.xs) {
		it.cache.val = it.zero
		return it.cache
	}
	it.cache.val = it.xs[it.pos]
	return it.cache
}

// Peek returns the item that follows the current position without advancing
// the iterator. The boolean result is false if there is no such item; in that
// case the value of the returned element is the zero value of T.
func (it *SliceIterWithIndex[T]) Peek() (IndexedValue[T], bool) {
	if it.pos+1 >= len(it.xs) {
		it.cache.val = it.zero
		return it.cache, false
	}
	it.cache.val = it.xs[it.pos+1]
	return it.cache, true
}

// NewSliceIterWithIndex returns a peeking iterator over xs that attaches idx to
// every yielded item. The iterator starts before the first item.
func NewSliceIterWithIndex[T any](xs []T, idx int) PeekingIterator[IndexedValue[T]] {
	return &SliceIterWithIndex[T]{
		xs:    xs,
		pos:   -1,
		cache: IndexedValue[T]{idx: idx},
	}
}

@ -0,0 +1,35 @@
package v1
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestSliceIterWithIndex verifies that the iterator returned by
// NewSliceIterWithIndex supports peeking and attaches the given index to all
// items.
func TestSliceIterWithIndex(t *testing.T) {
	t.Run("SliceIterWithIndex implements PeekingIterator interface", func(t *testing.T) {
		const index = 123
		items := []string{"a", "b", "c"}
		iter := NewSliceIterWithIndex(items, index)

		// peek at first item
		peeked, found := iter.Peek()
		require.True(t, found)
		require.Equal(t, "a", peeked.val)
		require.Equal(t, index, peeked.idx)

		// proceed to first item
		require.True(t, iter.Next())
		first := iter.At()
		require.Equal(t, "a", first.val)
		require.Equal(t, index, first.idx)

		// proceed to second and third item
		for step := 0; step < 2; step++ {
			require.True(t, iter.Next())
		}

		// peek at non-existing fourth item
		peeked, found = iter.Peek()
		require.False(t, found)
		require.Equal(t, "", peeked.val) // "" is zero value for type string
		require.Equal(t, index, peeked.idx)
	})
}
Loading…
Cancel
Save