From 71fa802c86a39701f2cc171caa1392fcb9309eba Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Tue, 20 Feb 2024 18:06:29 +0100 Subject: [PATCH] Map ring token keyspace (uint32) into fingerprint keyspace (uint64) (#11975) In order to compare the keyspace from the bloom gateway's ring with the fingerprints, they need to use the same keyspace. The ring, however, uses tokens within a `uint32` keyspace, whereas the fingerprints use a `uint64` keyspace. Therefore the ranges from the ring tokens need to be mapped into the `uint64` keyspace. This is done bit-shifting the Min value 32 bits to the left and the Max value 32 bits to the left plus adding `(1<<32)-1` to "fill" the remaining values up to the next Min value. The structs used to combined ring instance information and token/fingerprint ranges are yet to be simplified. However, this is not goal of this PR. Signed-off-by: Christian Haudum Co-authored-by: Owen Diehl --- pkg/bloomgateway/client.go | 72 ++++---- pkg/bloomgateway/client_test.go | 302 ++++++++++++++++++-------------- pkg/bloomutils/ring.go | 6 +- 3 files changed, 220 insertions(+), 160 deletions(-) diff --git a/pkg/bloomgateway/client.go b/pkg/bloomgateway/client.go index e5fd35d884..721e0c35ca 100644 --- a/pkg/bloomgateway/client.go +++ b/pkg/bloomgateway/client.go @@ -291,16 +291,26 @@ func (c *GatewayClient) doForAddrs(addrs []string, fn func(logproto.BloomGateway return err } -func groupFingerprintsByServer(groups []*logproto.GroupedChunkRefs, servers []addrsWithTokenRange) []instanceWithFingerprints { +func groupFingerprintsByServer(groups []*logproto.GroupedChunkRefs, servers []addrsWithBounds) []instanceWithFingerprints { boundedFingerprints := partitionFingerprintsByAddresses(groups, servers) return groupByInstance(boundedFingerprints) } -func serverAddressesWithTokenRanges(subRing ring.ReadRing, instances []ring.InstanceDesc) ([]addrsWithTokenRange, error) { +func mapTokenRangeToFingerprintRange(r bloomutils.Range[uint32]) v1.FingerprintBounds { + minFp := uint64(r.Min) << 32 + maxFp := uint64(r.Max) << 32 + return v1.NewBounds( + model.Fingerprint(minFp), + model.Fingerprint(maxFp|math.MaxUint32), + ) +} + +func serverAddressesWithTokenRanges(subRing ring.ReadRing, instances []ring.InstanceDesc) ([]addrsWithBounds, error) { bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet() - servers := make([]addrsWithTokenRange, 0, len(instances)) + servers := make([]addrsWithBounds, 0, len(instances)) it := bloomutils.NewInstanceSortMergeIterator(instances) + for it.Next() { // We can use on of the tokens from the token range // to obtain all addresses for that token. @@ -308,54 +318,56 @@ func serverAddressesWithTokenRanges(subRing ring.ReadRing, instances []ring.Inst if err != nil { return nil, errors.Wrap(err, "bloom gateway get ring") } - servers = append(servers, addrsWithTokenRange{ - id: it.At().Instance.Id, - addrs: rs.GetAddresses(), - tokenRange: it.At().TokenRange, + + bounds := mapTokenRangeToFingerprintRange(it.At().TokenRange) + servers = append(servers, addrsWithBounds{ + id: it.At().Instance.Id, + addrs: rs.GetAddresses(), + FingerprintBounds: bounds, }) } - if len(servers) > 0 && servers[len(servers)-1].tokenRange.Max < math.MaxUint32 { - // append the instance for the token range between the greates token and MaxUint32 - servers = append(servers, addrsWithTokenRange{ - id: servers[0].id, - addrs: servers[0].addrs, - tokenRange: bloomutils.NewTokenRange(servers[len(servers)-1].tokenRange.Max+1, math.MaxUint32), + if len(servers) > 0 && servers[len(servers)-1].Max < math.MaxUint64 { + // append the instance for the range between the maxFp and MaxUint64 + // TODO(owen-d): support wrapping around keyspace for token ranges + servers = append(servers, addrsWithBounds{ + id: servers[0].id, + addrs: servers[0].addrs, + FingerprintBounds: v1.NewBounds( + servers[len(servers)-1].Max+1, + model.Fingerprint(math.MaxUint64), + ), }) } return servers, nil } -type addrsWithTokenRange struct { - id string - addrs []string - tokenRange bloomutils.Range[uint32] -} - -func (s addrsWithTokenRange) cmp(token uint32) v1.BoundsCheck { - return s.tokenRange.Cmp(token) +type addrsWithBounds struct { + v1.FingerprintBounds + id string + addrs []string } type instanceWithFingerprints struct { - instance addrsWithTokenRange + instance addrsWithBounds fingerprints []*logproto.GroupedChunkRefs } -func partitionFingerprintsByAddresses(fingerprints []*logproto.GroupedChunkRefs, addresses []addrsWithTokenRange) (result []instanceWithFingerprints) { +func partitionFingerprintsByAddresses(fingerprints []*logproto.GroupedChunkRefs, addresses []addrsWithBounds) (result []instanceWithFingerprints) { for _, instance := range addresses { - min, _ := slices.BinarySearchFunc(fingerprints, instance.tokenRange, func(g *logproto.GroupedChunkRefs, r bloomutils.Range[uint32]) int { - if uint32(g.Fingerprint) < r.Min { + min, _ := slices.BinarySearchFunc(fingerprints, instance.FingerprintBounds, func(g *logproto.GroupedChunkRefs, b v1.FingerprintBounds) int { + if g.Fingerprint < uint64(b.Min) { return -1 - } else if uint32(g.Fingerprint) > r.Min { + } else if g.Fingerprint > uint64(b.Min) { return 1 } return 0 }) - max, _ := slices.BinarySearchFunc(fingerprints, instance.tokenRange, func(g *logproto.GroupedChunkRefs, r bloomutils.Range[uint32]) int { - if uint32(g.Fingerprint) <= r.Max { + max, _ := slices.BinarySearchFunc(fingerprints, instance.FingerprintBounds, func(g *logproto.GroupedChunkRefs, b v1.FingerprintBounds) int { + if g.Fingerprint <= uint64(b.Max) { return -1 - } else if uint32(g.Fingerprint) > r.Max { + } else if g.Fingerprint > uint64(b.Max) { return 1 } return 0 @@ -398,7 +410,7 @@ func groupByInstance(boundedFingerprints []instanceWithFingerprints) []instanceW pos[cur.instance.id] = len(result) result = append(result, instanceWithFingerprints{ - instance: addrsWithTokenRange{ + instance: addrsWithBounds{ id: cur.instance.id, addrs: cur.instance.addrs, }, diff --git a/pkg/bloomgateway/client_test.go b/pkg/bloomgateway/client_test.go index 8a9a3d3564..71ac0ec063 100644 --- a/pkg/bloomgateway/client_test.go +++ b/pkg/bloomgateway/client_test.go @@ -17,12 +17,10 @@ import ( "github.com/grafana/loki/pkg/bloomutils" "github.com/grafana/loki/pkg/logproto" + v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/validation" ) -// short constructor -var newTr = bloomutils.NewTokenRange - func TestBloomGatewayClient(t *testing.T) { logger := log.NewNopLogger() reg := prometheus.NewRegistry() @@ -43,24 +41,20 @@ func TestBloomGatewayClient(t *testing.T) { } func TestBloomGatewayClient_PartitionFingerprintsByAddresses(t *testing.T) { + // Create 10 fingerprints [0, 2, 4, ... 18] + groups := make([]*logproto.GroupedChunkRefs, 0, 10) + for i := 0; i < 20; i += 2 { + groups = append(groups, &logproto.GroupedChunkRefs{Fingerprint: uint64(i)}) + } + // instance token ranges do not overlap t.Run("non-overlapping", func(t *testing.T) { - groups := []*logproto.GroupedChunkRefs{ - {Fingerprint: 0}, - {Fingerprint: 100}, - {Fingerprint: 101}, - {Fingerprint: 200}, - {Fingerprint: 201}, - {Fingerprint: 300}, - {Fingerprint: 301}, - {Fingerprint: 400}, - {Fingerprint: 401}, // out of bounds, will be dismissed - } - servers := []addrsWithTokenRange{ - {id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(0, 100)}, - {id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(101, 200)}, - {id: "instance-3", addrs: []string{"10.0.0.3"}, tokenRange: newTr(201, 300)}, - {id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(301, 400)}, + + servers := []addrsWithBounds{ + {id: "instance-1", addrs: []string{"10.0.0.1"}, FingerprintBounds: v1.NewBounds(0, 4)}, + {id: "instance-2", addrs: []string{"10.0.0.2"}, FingerprintBounds: v1.NewBounds(5, 9)}, + {id: "instance-3", addrs: []string{"10.0.0.3"}, FingerprintBounds: v1.NewBounds(10, 14)}, + {id: "instance-2", addrs: []string{"10.0.0.2"}, FingerprintBounds: v1.NewBounds(15, 19)}, } // partition fingerprints @@ -70,28 +64,30 @@ func TestBloomGatewayClient_PartitionFingerprintsByAddresses(t *testing.T) { instance: servers[0], fingerprints: []*logproto.GroupedChunkRefs{ {Fingerprint: 0}, - {Fingerprint: 100}, + {Fingerprint: 2}, + {Fingerprint: 4}, }, }, { instance: servers[1], fingerprints: []*logproto.GroupedChunkRefs{ - {Fingerprint: 101}, - {Fingerprint: 200}, + {Fingerprint: 6}, + {Fingerprint: 8}, }, }, { instance: servers[2], fingerprints: []*logproto.GroupedChunkRefs{ - {Fingerprint: 201}, - {Fingerprint: 300}, + {Fingerprint: 10}, + {Fingerprint: 12}, + {Fingerprint: 14}, }, }, { instance: servers[3], fingerprints: []*logproto.GroupedChunkRefs{ - {Fingerprint: 301}, - {Fingerprint: 400}, + {Fingerprint: 16}, + {Fingerprint: 18}, }, }, } @@ -103,26 +99,28 @@ func TestBloomGatewayClient_PartitionFingerprintsByAddresses(t *testing.T) { expected = []instanceWithFingerprints{ { - instance: addrsWithTokenRange{id: "instance-1", addrs: []string{"10.0.0.1"}}, + instance: addrsWithBounds{id: "instance-1", addrs: []string{"10.0.0.1"}}, fingerprints: []*logproto.GroupedChunkRefs{ {Fingerprint: 0}, - {Fingerprint: 100}, + {Fingerprint: 2}, + {Fingerprint: 4}, }, }, { - instance: addrsWithTokenRange{id: "instance-2", addrs: []string{"10.0.0.2"}}, + instance: addrsWithBounds{id: "instance-2", addrs: []string{"10.0.0.2"}}, fingerprints: []*logproto.GroupedChunkRefs{ - {Fingerprint: 101}, - {Fingerprint: 200}, - {Fingerprint: 301}, - {Fingerprint: 400}, + {Fingerprint: 6}, + {Fingerprint: 8}, + {Fingerprint: 16}, + {Fingerprint: 18}, }, }, { - instance: addrsWithTokenRange{id: "instance-3", addrs: []string{"10.0.0.3"}}, + instance: addrsWithBounds{id: "instance-3", addrs: []string{"10.0.0.3"}}, fingerprints: []*logproto.GroupedChunkRefs{ - {Fingerprint: 201}, - {Fingerprint: 300}, + {Fingerprint: 10}, + {Fingerprint: 12}, + {Fingerprint: 14}, }, }, } @@ -132,33 +130,45 @@ func TestBloomGatewayClient_PartitionFingerprintsByAddresses(t *testing.T) { // instance token ranges overlap t.Run("overlapping", func(t *testing.T) { - groups := []*logproto.GroupedChunkRefs{ - {Fingerprint: 50}, - {Fingerprint: 150}, - {Fingerprint: 250}, - {Fingerprint: 350}, - } - servers := []addrsWithTokenRange{ - {id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(0, 200)}, - {id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(100, 300)}, - {id: "instance-3", addrs: []string{"10.0.0.3"}, tokenRange: newTr(200, 400)}, + servers := []addrsWithBounds{ + {id: "instance-1", addrs: []string{"10.0.0.1"}, FingerprintBounds: v1.NewBounds(0, 9)}, + {id: "instance-2", addrs: []string{"10.0.0.2"}, FingerprintBounds: v1.NewBounds(5, 14)}, + {id: "instance-3", addrs: []string{"10.0.0.3"}, FingerprintBounds: v1.NewBounds(10, 19)}, } // partition fingerprints expected := []instanceWithFingerprints{ - {instance: servers[0], fingerprints: []*logproto.GroupedChunkRefs{ - {Fingerprint: 50}, - {Fingerprint: 150}, - }}, - {instance: servers[1], fingerprints: []*logproto.GroupedChunkRefs{ - {Fingerprint: 150}, - {Fingerprint: 250}, - }}, - {instance: servers[2], fingerprints: []*logproto.GroupedChunkRefs{ - {Fingerprint: 250}, - {Fingerprint: 350}, - }}, + { + instance: servers[0], + fingerprints: []*logproto.GroupedChunkRefs{ + {Fingerprint: 0}, + {Fingerprint: 2}, + {Fingerprint: 4}, + {Fingerprint: 6}, + {Fingerprint: 8}, + }, + }, + { + instance: servers[1], + fingerprints: []*logproto.GroupedChunkRefs{ + {Fingerprint: 6}, + {Fingerprint: 8}, + {Fingerprint: 10}, + {Fingerprint: 12}, + {Fingerprint: 14}, + }, + }, + { + instance: servers[2], + fingerprints: []*logproto.GroupedChunkRefs{ + {Fingerprint: 10}, + {Fingerprint: 12}, + {Fingerprint: 14}, + {Fingerprint: 16}, + {Fingerprint: 18}, + }, + }, } bounded := partitionFingerprintsByAddresses(groups, servers) @@ -177,12 +187,15 @@ func BenchmarkPartitionFingerprintsByAddresses(b *testing.B) { numServers := 100 tokenStep := math.MaxUint32 / uint32(numServers) - servers := make([]addrsWithTokenRange, 0, numServers) + servers := make([]addrsWithBounds, 0, numServers) for i := uint32(0); i < math.MaxUint32-tokenStep; i += tokenStep { - servers = append(servers, addrsWithTokenRange{ - id: fmt.Sprintf("instance-%x", i), - addrs: []string{fmt.Sprintf("%d", i)}, - tokenRange: newTr(i, i+tokenStep), + servers = append(servers, addrsWithBounds{ + id: fmt.Sprintf("instance-%x", i), + addrs: []string{fmt.Sprintf("%d", i)}, + FingerprintBounds: v1.NewBounds( + model.Fingerprint(i)<<32, + model.Fingerprint(i+tokenStep)<<32, + ), }) } @@ -193,34 +206,55 @@ func BenchmarkPartitionFingerprintsByAddresses(b *testing.B) { } } +func TestBloomGatewayClient_MapTokenRangeToFingerprintRange(t *testing.T) { + testCases := map[string]struct { + lshift int + inp bloomutils.Range[uint32] + exp v1.FingerprintBounds + }{ + "single token expands to multiple fingerprints": { + inp: bloomutils.NewTokenRange(0, 0), + exp: v1.NewBounds(0, 0xffffffff), + }, + "max value expands to max value of new range": { + inp: bloomutils.NewTokenRange((1 << 31), math.MaxUint32), + exp: v1.NewBounds((1 << 63), 0xffffffffffffffff), + }, + } + for desc, tc := range testCases { + t.Run(desc, func(t *testing.T) { + actual := mapTokenRangeToFingerprintRange(tc.inp) + require.Equal(t, tc.exp, actual) + }) + } +} + func TestBloomGatewayClient_ServerAddressesWithTokenRanges(t *testing.T) { testCases := map[string]struct { instances []ring.InstanceDesc - expected []addrsWithTokenRange + expected []addrsWithBounds }{ - "one token per instance": { + "one token per instance, no gaps between fingerprint ranges": { instances: []ring.InstanceDesc{ - {Id: "instance-1", Addr: "10.0.0.1", Tokens: []uint32{math.MaxUint32 / 6 * 1}}, - {Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{math.MaxUint32 / 6 * 3}}, - {Id: "instance-3", Addr: "10.0.0.3", Tokens: []uint32{math.MaxUint32 / 6 * 5}}, + {Id: "instance-1", Addr: "10.0.0.1", Tokens: []uint32{(1 << 30) * 1}}, // 0x40000000 + {Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{(1 << 30) * 2}}, // 0x80000000 + {Id: "instance-3", Addr: "10.0.0.3", Tokens: []uint32{(1 << 30) * 3}}, // 0xc0000000 }, - expected: []addrsWithTokenRange{ - {id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(0, math.MaxUint32/6*1)}, - {id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(math.MaxUint32/6*1+1, math.MaxUint32/6*3)}, - {id: "instance-3", addrs: []string{"10.0.0.3"}, tokenRange: newTr(math.MaxUint32/6*3+1, math.MaxUint32/6*5)}, - {id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(math.MaxUint32/6*5+1, math.MaxUint32)}, + expected: []addrsWithBounds{ + {id: "instance-1", addrs: []string{"10.0.0.1"}, FingerprintBounds: v1.NewBounds(0, 4611686022722355199)}, + {id: "instance-2", addrs: []string{"10.0.0.2"}, FingerprintBounds: v1.NewBounds(4611686022722355200, 9223372041149743103)}, + {id: "instance-3", addrs: []string{"10.0.0.3"}, FingerprintBounds: v1.NewBounds(9223372041149743104, 13835058059577131007)}, + {id: "instance-1", addrs: []string{"10.0.0.1"}, FingerprintBounds: v1.NewBounds(13835058059577131008, 18446744073709551615)}, }, }, - "MinUint32 and MaxUint32 are tokens in the ring": { + "MinUint32 and MaxUint32 are actual tokens in the ring": { instances: []ring.InstanceDesc{ - {Id: "instance-1", Addr: "10.0.0.1", Tokens: []uint32{0, math.MaxUint32 / 3 * 2}}, - {Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{math.MaxUint32 / 3 * 1, math.MaxUint32}}, + {Id: "instance-1", Addr: "10.0.0.1", Tokens: []uint32{0}}, + {Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{math.MaxUint32}}, }, - expected: []addrsWithTokenRange{ - {id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(0, 0)}, - {id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(1, math.MaxUint32/3)}, - {id: "instance-1", addrs: []string{"10.0.0.1"}, tokenRange: newTr(math.MaxUint32/3*1+1, math.MaxUint32/3*2)}, - {id: "instance-2", addrs: []string{"10.0.0.2"}, tokenRange: newTr(math.MaxUint32/3*2+1, math.MaxUint32)}, + expected: []addrsWithBounds{ + {id: "instance-1", addrs: []string{"10.0.0.1"}, FingerprintBounds: v1.NewBounds(0, (1<<32)-1)}, + {id: "instance-2", addrs: []string{"10.0.0.2"}, FingerprintBounds: v1.NewBounds((1 << 32), math.MaxUint64)}, }, }, } @@ -239,15 +273,27 @@ func TestBloomGatewayClient_ServerAddressesWithTokenRanges(t *testing.T) { func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) { instances := []ring.InstanceDesc{ - {Id: "instance-1", Addr: "10.0.0.1", Tokens: []uint32{2146405214, 1029997044, 678878693}}, - {Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{296463531, 1697323986, 800258284}}, - {Id: "instance-3", Addr: "10.0.0.3", Tokens: []uint32{2014002871, 315617625, 1036168527}}, + {Id: "instance-1", Addr: "10.0.0.1", Tokens: []uint32{0x1fffffff, 0x7fffffff}}, + {Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{0x3fffffff, 0x9fffffff}}, + {Id: "instance-3", Addr: "10.0.0.3", Tokens: []uint32{0x5fffffff, 0xbfffffff}}, } - it := bloomutils.NewInstanceSortMergeIterator(instances) - for it.Next() { - t.Log(it.At().TokenRange.Max, it.At().Instance.Addr) - } + subRing := newMockRing(instances) + servers, err := serverAddressesWithTokenRanges(subRing, instances) + require.NoError(t, err) + + // for _, s := range servers { + // t.Log(s, v1.NewBounds(model.Fingerprint(s.fpRange.Min), model.Fingerprint(s.fpRange.Max))) + // } + /** + {instance-1 [10.0.0.1] { 0 536870911} { 0 2305843004918726656}} 0000000000000000-1fffffff00000000 + {instance-2 [10.0.0.2] { 536870912 1073741823} { 2305843009213693952 4611686014132420608}} 2000000000000000-3fffffff00000000 + {instance-3 [10.0.0.3] {1073741824 1610612735} { 4611686018427387904 6917529023346114560}} 4000000000000000-5fffffff00000000 + {instance-1 [10.0.0.1] {1610612736 2147483647} { 6917529027641081856 9223372032559808512}} 6000000000000000-7fffffff00000000 + {instance-2 [10.0.0.2] {2147483648 2684354559} { 9223372036854775808 11529215041773502464}} 8000000000000000-9fffffff00000000 + {instance-3 [10.0.0.3] {2684354560 3221225471} {11529215046068469760 13835058050987196416}} a000000000000000-bfffffff00000000 + {instance-1 [10.0.0.1] {3221225472 4294967295} {13835058055282163712 18446744073709551615}} c000000000000000-ffffffffffffffff + **/ testCases := []struct { name string @@ -262,18 +308,20 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) { { name: "fingerprints within a single token range are grouped", chunks: []*logproto.GroupedChunkRefs{ - {Fingerprint: 1000000000, Refs: []*logproto.ShortRef{{Checksum: 1}}}, - {Fingerprint: 1000000001, Refs: []*logproto.ShortRef{{Checksum: 2}}}, + {Fingerprint: 0x5000000000000001}, + {Fingerprint: 0x5000000000000010}, + {Fingerprint: 0x5000000000000100}, }, expected: []instanceWithFingerprints{ { - instance: addrsWithTokenRange{ - id: "instance-1", - addrs: []string{"10.0.0.1"}, + instance: addrsWithBounds{ + id: "instance-3", + addrs: []string{"10.0.0.3"}, }, fingerprints: []*logproto.GroupedChunkRefs{ - {Fingerprint: 1000000000, Refs: []*logproto.ShortRef{{Checksum: 1}}}, - {Fingerprint: 1000000001, Refs: []*logproto.ShortRef{{Checksum: 2}}}, + {Fingerprint: 0x5000000000000001}, + {Fingerprint: 0x5000000000000010}, + {Fingerprint: 0x5000000000000100}, }, }, }, @@ -281,18 +329,20 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) { { name: "fingerprints within multiple token ranges of a single instance are grouped", chunks: []*logproto.GroupedChunkRefs{ - {Fingerprint: 1000000000, Refs: []*logproto.ShortRef{{Checksum: 1}}}, - {Fingerprint: 2100000000, Refs: []*logproto.ShortRef{{Checksum: 2}}}, + {Fingerprint: 0x1000000000000000}, + {Fingerprint: 0x7000000000000000}, + {Fingerprint: 0xd000000000000000}, }, expected: []instanceWithFingerprints{ { - instance: addrsWithTokenRange{ + instance: addrsWithBounds{ id: "instance-1", addrs: []string{"10.0.0.1"}, }, fingerprints: []*logproto.GroupedChunkRefs{ - {Fingerprint: 1000000000, Refs: []*logproto.ShortRef{{Checksum: 1}}}, - {Fingerprint: 2100000000, Refs: []*logproto.ShortRef{{Checksum: 2}}}, + {Fingerprint: 0x1000000000000000}, + {Fingerprint: 0x7000000000000000}, + {Fingerprint: 0xd000000000000000}, }, }, }, @@ -300,55 +350,52 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) { { name: "fingerprints with token ranges of multiple instances are grouped", chunks: []*logproto.GroupedChunkRefs{ - // instance 1 - {Fingerprint: 1000000000, Refs: []*logproto.ShortRef{{Checksum: 1}}}, - // instance 1 - {Fingerprint: 2100000000, Refs: []*logproto.ShortRef{{Checksum: 2}}}, - // instance 2 - {Fingerprint: 290000000, Refs: []*logproto.ShortRef{{Checksum: 3}}}, - // instance 2 (fingerprint equals instance token) - {Fingerprint: 800258284, Refs: []*logproto.ShortRef{{Checksum: 4}}}, - // instance 2 (fingerprint greater than greatest token) - {Fingerprint: 2147483648, Refs: []*logproto.ShortRef{{Checksum: 5}}}, - // instance 3 - {Fingerprint: 1029997045, Refs: []*logproto.ShortRef{{Checksum: 6}}}, + {Fingerprint: 0x1000000000000000}, + {Fingerprint: 0x3000000000000000}, + {Fingerprint: 0x5000000000000000}, + {Fingerprint: 0x7000000000000000}, + {Fingerprint: 0x9000000000000000}, + {Fingerprint: 0xb000000000000000}, + {Fingerprint: 0xd000000000000000}, + {Fingerprint: 0xf000000000000000}, }, expected: []instanceWithFingerprints{ { - instance: addrsWithTokenRange{ - id: "instance-2", - addrs: []string{"10.0.0.2"}, + instance: addrsWithBounds{ + id: "instance-1", + addrs: []string{"10.0.0.1"}, }, fingerprints: []*logproto.GroupedChunkRefs{ - {Fingerprint: 290000000, Refs: []*logproto.ShortRef{{Checksum: 3}}}, - {Fingerprint: 800258284, Refs: []*logproto.ShortRef{{Checksum: 4}}}, - {Fingerprint: 2147483648, Refs: []*logproto.ShortRef{{Checksum: 5}}}, + {Fingerprint: 0x1000000000000000}, + {Fingerprint: 0x7000000000000000}, + {Fingerprint: 0xd000000000000000}, + {Fingerprint: 0xf000000000000000}, }, }, { - instance: addrsWithTokenRange{ - id: "instance-1", - addrs: []string{"10.0.0.1"}, + instance: addrsWithBounds{ + id: "instance-2", + addrs: []string{"10.0.0.2"}, }, fingerprints: []*logproto.GroupedChunkRefs{ - {Fingerprint: 1000000000, Refs: []*logproto.ShortRef{{Checksum: 1}}}, - {Fingerprint: 2100000000, Refs: []*logproto.ShortRef{{Checksum: 2}}}, + {Fingerprint: 0x3000000000000000}, + {Fingerprint: 0x9000000000000000}, }, }, { - instance: addrsWithTokenRange{ + instance: addrsWithBounds{ id: "instance-3", addrs: []string{"10.0.0.3"}, }, fingerprints: []*logproto.GroupedChunkRefs{ - {Fingerprint: 1029997045, Refs: []*logproto.ShortRef{{Checksum: 6}}}, + {Fingerprint: 0x5000000000000000}, + {Fingerprint: 0xb000000000000000}, }, }, }, }, } - subRing := newMockRing(instances) for _, tc := range testCases { tc := tc t.Run(tc.name, func(t *testing.T) { @@ -356,9 +403,6 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) { sort.Slice(tc.chunks, func(i, j int) bool { return tc.chunks[i].Fingerprint < tc.chunks[j].Fingerprint }) - - servers, err := serverAddressesWithTokenRanges(subRing, instances) - require.NoError(t, err) res := groupFingerprintsByServer(tc.chunks, servers) require.Equal(t, tc.expected, res) }) diff --git a/pkg/bloomutils/ring.go b/pkg/bloomutils/ring.go index d2aebe5b88..b3246fd587 100644 --- a/pkg/bloomutils/ring.go +++ b/pkg/bloomutils/ring.go @@ -44,8 +44,12 @@ func (r Range[T]) Cmp(t T) v1.BoundsCheck { return v1.Overlap } +func NewRange[T constraints.Unsigned](min, max T) Range[T] { + return Range[T]{Min: min, Max: max} +} + func NewTokenRange(min, max uint32) Range[uint32] { - return Range[uint32]{min, max} + return Range[uint32]{Min: min, Max: max} } type InstanceWithTokenRange struct {