(chore) Cleanup duplicate functions/strucs in bloomgateway package (#11978)

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/11946/head
Christian Haudum 2 years ago committed by GitHub
parent ef40136715
commit fb728a6781
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 48
      pkg/bloomgateway/processor.go
  2. 14
      pkg/bloomgateway/util.go
  3. 6
      pkg/bloomgateway/util_test.go

@ -3,7 +3,6 @@ package bloomgateway
import (
"context"
"math"
"sort"
"time"
"github.com/go-kit/log"
@ -13,11 +12,6 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)
type tasksForBlock struct {
blockRef bloomshipper.BlockRef
tasks []Task
}
func newProcessor(id string, store bloomshipper.Store, logger log.Logger, metrics *workerMetrics) *processor {
return &processor{
id: id,
@ -66,13 +60,13 @@ func (p *processor) processTasks(ctx context.Context, tenant string, day config.
p.metrics.metasFetched.WithLabelValues(p.id).Observe(float64(len(metas)))
blocksRefs := bloomshipper.BlocksForMetas(metas, interval, keyspaces)
return p.processBlocks(ctx, partition(tasks, blocksRefs))
return p.processBlocks(ctx, partitionTasks(tasks, blocksRefs))
}
func (p *processor) processBlocks(ctx context.Context, data []tasksForBlock) error {
func (p *processor) processBlocks(ctx context.Context, data []blockWithTasks) error {
refs := make([]bloomshipper.BlockRef, len(data))
for _, block := range data {
refs = append(refs, block.blockRef)
refs = append(refs, block.ref)
}
bqs, err := p.store.FetchBlocks(ctx, refs)
@ -87,7 +81,7 @@ outer:
for blockIter.Next() {
bq := blockIter.At()
for i, block := range data {
if block.blockRef.Bounds.Equal(bq.Bounds) {
if block.ref.Bounds.Equal(bq.Bounds) {
err := p.processBlock(ctx, bq.BlockQuerier, block.tasks)
bq.Close()
if err != nil {
@ -146,37 +140,3 @@ func group[K comparable, V any, S ~[]V](s S, f func(v V) K) map[K]S {
}
return m
}
func partition(tasks []Task, blocks []bloomshipper.BlockRef) []tasksForBlock {
result := make([]tasksForBlock, 0, len(blocks))
for _, block := range blocks {
bounded := tasksForBlock{
blockRef: block,
}
for _, task := range tasks {
refs := task.series
min := sort.Search(len(refs), func(i int) bool {
return block.Cmp(refs[i].Fingerprint) > v1.Before
})
max := sort.Search(len(refs), func(i int) bool {
return block.Cmp(refs[i].Fingerprint) == v1.After
})
// All fingerprints fall outside of the consumer's range
if min == len(refs) || max == 0 {
continue
}
bounded.tasks = append(bounded.tasks, task.Copy(refs[min:max]))
}
if len(bounded.tasks) > 0 {
result = append(result, bounded)
}
}
return result
}

@ -83,15 +83,17 @@ func convertToChunkRefs(refs []*logproto.ShortRef) v1.ChunkRefs {
return result
}
type boundedTasks struct {
blockRef bloomshipper.BlockRef
tasks []Task
type blockWithTasks struct {
ref bloomshipper.BlockRef
tasks []Task
}
func partitionFingerprintRange(tasks []Task, blocks []bloomshipper.BlockRef) (result []boundedTasks) {
func partitionTasks(tasks []Task, blocks []bloomshipper.BlockRef) []blockWithTasks {
result := make([]blockWithTasks, 0, len(blocks))
for _, block := range blocks {
bounded := boundedTasks{
blockRef: block,
bounded := blockWithTasks{
ref: block,
}
for _, task := range tasks {

@ -73,7 +73,7 @@ func mkBlockRef(minFp, maxFp uint64) bloomshipper.BlockRef {
}
}
func TestPartitionFingerprintRange(t *testing.T) {
func TestPartitionTasks(t *testing.T) {
t.Run("consecutive block ranges", func(t *testing.T) {
bounds := []bloomshipper.BlockRef{
@ -93,7 +93,7 @@ func TestPartitionFingerprintRange(t *testing.T) {
tasks[i%nTasks].series = append(tasks[i%nTasks].series, &logproto.GroupedChunkRefs{Fingerprint: uint64(i)})
}
results := partitionFingerprintRange(tasks, bounds)
results := partitionTasks(tasks, bounds)
require.Equal(t, 3, len(results)) // ensure we only return bounds in range
actualFingerprints := make([]*logproto.GroupedChunkRefs, 0, nSeries)
@ -128,7 +128,7 @@ func TestPartitionFingerprintRange(t *testing.T) {
task.series = append(task.series, &logproto.GroupedChunkRefs{Fingerprint: uint64(i)})
}
results := partitionFingerprintRange([]Task{task}, bounds)
results := partitionTasks([]Task{task}, bounds)
require.Equal(t, 3, len(results)) // ensure we only return bounds in range
for _, res := range results {
// ensure we have the right number of tasks per bound

Loading…
Cancel
Save