Bloom Gateway: Partition requests into multiple tasks before enqueuing (#11768)

**What this PR does / why we need it**:

This PR changes the point at which a request is split into multiple tasks
when the chunk boundaries of the request span multiple days.

This solves the problem that the request handler needs to know how many
responses from the block querier it has to wait for.

Previously, a single task spanning multiple days was executed multiple
times (once for each matching day).
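
For illustration, here is a minimal, self-contained sketch of the per-day split now performed before enqueuing. The types and helpers here (`chunkRef`, `partitionByDay`) are hypothetical stand-ins; the actual code operates on `model.Time` and `logproto.GroupedChunkRefs` via `partitionRequest` in `pkg/bloomgateway/util.go` (see the diff below).

```go
// Minimal sketch of the per-day split, using simplified stand-in types.
package main

import (
	"fmt"
	"time"
)

// chunkRef is a hypothetical stand-in for a chunk's time bounds.
type chunkRef struct {
	From, Through time.Time
}

// truncateDay floors a timestamp to its UTC day boundary.
func truncateDay(ts time.Time) time.Time {
	return ts.UTC().Truncate(24 * time.Hour)
}

// partitionByDay groups chunk refs by every UTC day they overlap, so that
// one task can be created and enqueued per day instead of executing a
// single multi-day task once per matching day.
func partitionByDay(refs []chunkRef) map[time.Time][]chunkRef {
	out := make(map[time.Time][]chunkRef)
	for _, ref := range refs {
		for day := truncateDay(ref.From); !day.After(ref.Through); day = day.Add(24 * time.Hour) {
			out[day] = append(out[day], ref)
		}
	}
	return out
}

func main() {
	ts := time.Date(2024, 1, 24, 12, 0, 0, 0, time.UTC)
	refs := []chunkRef{
		{From: ts.Add(-23 * time.Hour), Through: ts.Add(-22 * time.Hour)}, // previous day only
		{From: ts.Add(-13 * time.Hour), Through: ts.Add(-11 * time.Hour)}, // crosses midnight
		{From: ts.Add(-2 * time.Hour), Through: ts.Add(-1 * time.Hour)},   // current day only
	}
	for day, group := range partitionByDay(refs) {
		fmt.Printf("%s -> %d chunk(s)\n", day.Format("2006-01-02"), len(group))
	}
}
```

With one task per day, the handler only needs to sum the number of series per task (`expectedResponses` in the gateway diff) to know how many partial responses it must wait for.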

---------

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  1. pkg/bloomgateway/bloomgateway.go (68 changed lines)
  2. pkg/bloomgateway/multiplexing.go (153 changed lines)
  3. pkg/bloomgateway/multiplexing_test.go (148 changed lines)
  4. pkg/bloomgateway/util.go (76 changed lines)
  5. pkg/bloomgateway/util_test.go (189 changed lines)
  6. pkg/bloomgateway/worker.go (34 changed lines)

@@ -299,6 +299,18 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
return nil, err
}
// start time == end time --> empty response
if req.From.Equal(req.Through) {
return &logproto.FilterChunkRefResponse{
ChunkRefs: []*logproto.GroupedChunkRefs{},
}, nil
}
// start time > end time --> error response
if req.Through.Before(req.From) {
return nil, errors.New("from time must not be after through time")
}
numChunksUnfiltered := len(req.Refs)
// Shortcut if request does not contain filters
@@ -315,21 +327,64 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
return req.Refs[i].Fingerprint < req.Refs[j].Fingerprint
})
task, resCh, errCh, err := NewTask(tenantID, req)
var expectedResponses int
seriesWithBloomsPerDay := partitionRequest(req)
// no tasks --> empty response
if len(seriesWithBloomsPerDay) == 0 {
return &logproto.FilterChunkRefResponse{
ChunkRefs: []*logproto.GroupedChunkRefs{},
}, nil
}
tasks := make([]Task, 0, len(seriesWithBloomsPerDay))
for _, seriesWithBounds := range seriesWithBloomsPerDay {
task, err := NewTask(tenantID, seriesWithBounds, req.Filters)
if err != nil {
return nil, err
}
tasks = append(tasks, task)
expectedResponses += len(seriesWithBounds.series)
}
g.activeUsers.UpdateUserTimestamp(tenantID, time.Now())
level.Info(g.logger).Log("msg", "enqueue task", "task", task.ID)
errCh := make(chan error, 1)
resCh := make(chan v1.Output, 1)
for _, task := range tasks {
level.Info(g.logger).Log("msg", "enqueue task", "task", task.ID, "day", task.day, "series", len(task.series))
g.queue.Enqueue(tenantID, []string{}, task, func() {
// When enqueuing, we also add the task to the pending tasks
g.pendingTasks.Add(task.ID, task)
})
requestCount := len(req.Refs)
responses := responsesPool.Get(requestCount)
// Forward responses or error to the main channels
// TODO(chaudum): Refactor to make tasks cancelable
go func(t Task) {
for {
select {
case <-ctx.Done():
return
case err := <-t.ErrCh:
if ctx.Err() != nil {
level.Warn(g.logger).Log("msg", "received err from channel, but context is already done", "err", ctx.Err())
return
}
errCh <- err
case res := <-t.ResCh:
level.Debug(g.logger).Log("msg", "got partial result", "task", t.ID, "tenant", tenantID, "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len())
if ctx.Err() != nil {
level.Warn(g.logger).Log("msg", "received res from channel, but context is already done", "err", ctx.Err())
return
}
resCh <- res
}
}
}(task)
}
responses := responsesPool.Get(expectedResponses)
defer responsesPool.Put(responses)
outer:
@@ -342,9 +397,9 @@ outer:
case res := <-resCh:
responses = append(responses, res)
// log line is helpful for debugging tests
level.Debug(g.logger).Log("msg", "got partial result", "task", task.ID, "tenant", tenantID, "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len(), "progress", fmt.Sprintf("%d/%d", len(responses), requestCount))
level.Debug(g.logger).Log("msg", "got partial result", "progress", fmt.Sprintf("%d/%d", len(responses), expectedResponses))
// wait for all parts of the full response
if len(responses) == requestCount {
if len(responses) == expectedResponses {
break outer
}
}
@@ -354,7 +409,6 @@ outer:
if o.Removals.Len() == 0 {
continue
}
// we must not remove items from req.Refs as long as the worker may iterate over them
removeNotMatchingChunks(req, o, g.logger)
}

@@ -1,13 +1,13 @@
package bloomgateway
import (
"sort"
"time"
"github.com/oklog/ulid"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)
@@ -25,151 +25,76 @@ type Task struct {
ID ulid.ULID
// Tenant is the tenant ID
Tenant string
// Request is the original request
Request *logproto.FilterChunkRefRequest
// ErrCh is a send-only channel to write an error to
ErrCh chan<- error
ErrCh chan error
// ResCh is a send-only channel to write partial responses to
ResCh chan<- v1.Output
ResCh chan v1.Output
// series of the original request
series []*logproto.GroupedChunkRefs
// filters of the original request
filters []syntax.LineFilter
// from..through date of the task's chunks
bounds model.Interval
// TODO(chaudum): Investigate how to remove that.
day model.Time
}
// NewTask returns a new Task that can be enqueued to the task queue.
// In addition, it returns a result and an error channel, as well
// as an error if the instantiation fails.
func NewTask(tenantID string, req *logproto.FilterChunkRefRequest) (Task, chan v1.Output, chan error, error) {
func NewTask(tenantID string, refs seriesWithBounds, filters []syntax.LineFilter) (Task, error) {
key, err := ulid.New(ulid.Now(), nil)
if err != nil {
return Task{}, nil, nil, err
return Task{}, err
}
errCh := make(chan error, 1)
resCh := make(chan v1.Output, 1)
resCh := make(chan v1.Output, len(refs.series))
task := Task{
ID: key,
Tenant: tenantID,
Request: req,
ErrCh: errCh,
ResCh: resCh,
filters: filters,
series: refs.series,
bounds: refs.bounds,
day: refs.day,
}
return task, nil
}
return task, resCh, errCh, nil
func (t Task) Bounds() (model.Time, model.Time) {
return t.bounds.Start, t.bounds.End
}
// Copy returns a copy of the existing task but with a new slice of chunks
func (t Task) Copy(refs []*logproto.GroupedChunkRefs) Task {
// Copy returns a copy of the existing task but with a new slice of grouped chunk refs
func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task {
return Task{
ID: t.ID,
ID: ulid.ULID{}, // create an empty ID to distinguish it as a copied task
Tenant: t.Tenant,
Request: &logproto.FilterChunkRefRequest{
From: t.Request.From,
Through: t.Request.Through,
Filters: t.Request.Filters,
Refs: refs,
},
ErrCh: t.ErrCh,
ResCh: t.ResCh,
filters: t.filters,
series: series,
bounds: t.bounds,
day: t.day,
}
}
// Bounds returns the day boundaries of the task
func (t Task) Bounds() (time.Time, time.Time) {
return getDayTime(t.Request.From), getDayTime(t.Request.Through)
}
func (t Task) ChunkIterForDay(day time.Time) v1.Iterator[*logproto.GroupedChunkRefs] {
cf := filterGroupedChunkRefsByDay{day: day}
return &FilterIter[*logproto.GroupedChunkRefs]{
iter: v1.NewSliceIter(t.Request.Refs),
matches: cf.contains,
transform: cf.filter,
}
}
type filterGroupedChunkRefsByDay struct {
day time.Time
}
func (cf filterGroupedChunkRefsByDay) contains(a *logproto.GroupedChunkRefs) bool {
from, through := getFromThrough(a.Refs)
if from.Time().After(cf.day.Add(Day)) || through.Time().Before(cf.day) {
return false
}
return true
}
func (cf filterGroupedChunkRefsByDay) filter(a *logproto.GroupedChunkRefs) *logproto.GroupedChunkRefs {
minTs, maxTs := getFromThrough(a.Refs)
// in most cases, all chunks are within day range
if minTs.Time().Compare(cf.day) >= 0 && maxTs.Time().Before(cf.day.Add(Day)) {
return a
}
// case where certain chunks are outside of day range
// using binary search to get min and max index of chunks that fall into the day range
min := sort.Search(len(a.Refs), func(i int) bool {
start := a.Refs[i].From.Time()
end := a.Refs[i].Through.Time()
return start.Compare(cf.day) >= 0 || end.Compare(cf.day) >= 0
})
max := sort.Search(len(a.Refs), func(i int) bool {
start := a.Refs[i].From.Time()
return start.Compare(cf.day.Add(Day)) > 0
})
return &logproto.GroupedChunkRefs{
Tenant: a.Tenant,
Fingerprint: a.Fingerprint,
Refs: a.Refs[min:max],
}
}
type Predicate[T any] func(a T) bool
type Transform[T any] func(a T) T
type FilterIter[T any] struct {
iter v1.Iterator[T]
matches Predicate[T]
transform Transform[T]
cache T
zero T // zero value of the return type of Next()
}
func (it *FilterIter[T]) Next() bool {
next := it.iter.Next()
if !next {
it.cache = it.zero
return false
}
for next && !it.matches(it.iter.At()) {
next = it.iter.Next()
if !next {
it.cache = it.zero
return false
}
}
it.cache = it.transform(it.iter.At())
return true
}
func (it *FilterIter[T]) At() T {
return it.cache
}
func (it *FilterIter[T]) Err() error {
return nil
}
// taskMergeIterator implements v1.Iterator
type taskMergeIterator struct {
curr v1.Request
heap *v1.HeapIterator[v1.IndexedValue[*logproto.GroupedChunkRefs]]
tasks []Task
day time.Time
day model.Time
tokenizer *v1.NGramTokenizer
err error
}
func newTaskMergeIterator(day time.Time, tokenizer *v1.NGramTokenizer, tasks ...Task) v1.PeekingIterator[v1.Request] {
func newTaskMergeIterator(day model.Time, tokenizer *v1.NGramTokenizer, tasks ...Task) v1.PeekingIterator[v1.Request] {
it := &taskMergeIterator{
tasks: tasks,
curr: v1.Request{},
@@ -183,8 +108,8 @@ func newTaskMergeIterator(day time.Time, tokenizer *v1.NGramTokenizer, tasks ...
func (it *taskMergeIterator) init() {
sequences := make([]v1.PeekingIterator[v1.IndexedValue[*logproto.GroupedChunkRefs]], 0, len(it.tasks))
for i := range it.tasks {
iter := v1.NewIterWithIndex(it.tasks[i].ChunkIterForDay(it.day), i)
sequences = append(sequences, v1.NewPeekingIter(iter))
iter := v1.NewSliceIterWithIndex(it.tasks[i].series, i)
sequences = append(sequences, iter)
}
it.heap = v1.NewHeapIterator(
func(i, j v1.IndexedValue[*logproto.GroupedChunkRefs]) bool {
@@ -207,7 +132,7 @@ func (it *taskMergeIterator) Next() bool {
it.curr = v1.Request{
Fp: model.Fingerprint(group.Value().Fingerprint),
Chks: convertToChunkRefs(group.Value().Refs),
Searches: convertToSearches(task.Request.Filters, it.tokenizer),
Searches: convertToSearches(task.filters, it.tokenizer),
Response: task.ResCh,
}
return true

@@ -5,62 +5,59 @@ import (
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)
func TestTask(t *testing.T) {
t.Run("bounds returns request boundaries", func(t *testing.T) {
ts := model.Now()
ts := mktime("2024-01-24 12:00")
t.Run("bounds returns boundaries of chunks", func(t *testing.T) {
req := &logproto.FilterChunkRefRequest{
From: ts.Add(-1 * time.Hour),
From: ts.Add(-24 * time.Hour),
Through: ts,
Refs: []*logproto.GroupedChunkRefs{
{
Fingerprint: 0x00,
Refs: []*logproto.ShortRef{
{From: ts.Add(-1 * time.Hour), Through: ts},
},
},
},
}
task, _, _, err := NewTask("tenant", req)
swb := partitionRequest(req)[0]
task, err := NewTask("tenant", swb, nil)
require.NoError(t, err)
from, through := task.Bounds()
require.Equal(t, getDayTime(req.From), from)
require.Equal(t, getDayTime(req.Through), through)
require.Equal(t, ts.Add(-1*time.Hour), from)
require.Equal(t, ts, through)
require.Equal(t, truncateDay(ts), task.day)
})
}
func TestTaskMergeIterator(t *testing.T) {
// Thu Nov 09 2023 10:56:50 UTC
ts := model.TimeFromUnix(1699523810)
day := getDayTime(ts)
tenant := "fake"
tokenizer := v1.NewNGramTokenizer(4, 0)
func createTasksForRequests(t *testing.T, tenant string, requests ...*logproto.FilterChunkRefRequest) []Task {
t.Helper()
t.Run("empty requests result in empty iterator", func(t *testing.T) {
r1 := &logproto.FilterChunkRefRequest{
From: ts.Add(-3 * time.Hour),
Through: ts.Add(-2 * time.Hour),
Refs: []*logproto.GroupedChunkRefs{},
}
t1, _, _, err := NewTask(tenant, r1)
tasks := make([]Task, 0, len(requests))
for _, r := range requests {
for _, swb := range partitionRequest(r) {
task, err := NewTask(tenant, swb, nil)
require.NoError(t, err)
r2 := &logproto.FilterChunkRefRequest{
From: ts.Add(-1 * time.Hour),
Through: ts,
Refs: []*logproto.GroupedChunkRefs{},
tasks = append(tasks, task)
}
t2, _, _, err := NewTask(tenant, r2)
require.NoError(t, err)
r3 := &logproto.FilterChunkRefRequest{
From: ts.Add(-1 * time.Hour),
Through: ts,
Refs: []*logproto.GroupedChunkRefs{},
}
t3, _, _, err := NewTask(tenant, r3)
require.NoError(t, err)
return tasks
}
it := newTaskMergeIterator(day, tokenizer, t1, t2, t3)
func TestTaskMergeIterator(t *testing.T) {
ts := mktime("2024-01-24 12:00")
day := truncateDay(ts)
tenant := "fake"
tokenizer := v1.NewNGramTokenizer(4, 0)
t.Run("empty requests result in empty iterator", func(t *testing.T) {
it := newTaskMergeIterator(day, tokenizer)
// nothing to iterate over
require.False(t, it.Next())
})
@@ -75,8 +72,6 @@ func TestTaskMergeIterator(t *testing.T) {
}},
},
}
t1, _, _, err := NewTask(tenant, r1)
require.NoError(t, err)
r2 := &logproto.FilterChunkRefRequest{
From: ts.Add(-1 * time.Hour),
@@ -90,8 +85,6 @@ func TestTaskMergeIterator(t *testing.T) {
}},
},
}
t2, _, _, err := NewTask(tenant, r2)
require.NoError(t, err)
r3 := &logproto.FilterChunkRefRequest{
From: ts.Add(-1 * time.Hour),
@@ -102,10 +95,9 @@ func TestTaskMergeIterator(t *testing.T) {
}},
},
}
t3, _, _, err := NewTask(tenant, r3)
require.NoError(t, err)
it := newTaskMergeIterator(day, tokenizer, t1, t2, t3)
tasks := createTasksForRequests(t, tenant, r1, r2, r3)
it := newTaskMergeIterator(day, tokenizer, tasks...)
// first item
require.True(t, it.Next())
@@ -135,73 +127,3 @@ func TestTaskMergeIterator(t *testing.T) {
require.False(t, it.Next())
})
}
func TestChunkIterForDay(t *testing.T) {
tenant := "fake"
// Thu Nov 09 2023 10:56:50 UTC
ts := model.TimeFromUnix(1699523810)
t.Run("filter chunk refs that fall into the day range", func(t *testing.T) {
input := &logproto.FilterChunkRefRequest{
From: ts.Add(-168 * time.Hour), // 1w ago
Through: ts,
Refs: []*logproto.GroupedChunkRefs{
{Fingerprint: 100, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-168 * time.Hour), Through: ts.Add(-167 * time.Hour), Checksum: 100},
{From: ts.Add(-143 * time.Hour), Through: ts.Add(-142 * time.Hour), Checksum: 101},
}},
{Fingerprint: 200, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-144 * time.Hour), Through: ts.Add(-143 * time.Hour), Checksum: 200},
{From: ts.Add(-119 * time.Hour), Through: ts.Add(-118 * time.Hour), Checksum: 201},
}},
{Fingerprint: 300, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-120 * time.Hour), Through: ts.Add(-119 * time.Hour), Checksum: 300},
{From: ts.Add(-95 * time.Hour), Through: ts.Add(-94 * time.Hour), Checksum: 301},
}},
{Fingerprint: 400, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-96 * time.Hour), Through: ts.Add(-95 * time.Hour), Checksum: 400},
{From: ts.Add(-71 * time.Hour), Through: ts.Add(-70 * time.Hour), Checksum: 401},
}},
{Fingerprint: 500, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-72 * time.Hour), Through: ts.Add(-71 * time.Hour), Checksum: 500},
{From: ts.Add(-47 * time.Hour), Through: ts.Add(-46 * time.Hour), Checksum: 501},
}},
{Fingerprint: 600, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-48 * time.Hour), Through: ts.Add(-47 * time.Hour), Checksum: 600},
{From: ts.Add(-23 * time.Hour), Through: ts.Add(-22 * time.Hour), Checksum: 601},
}},
{Fingerprint: 700, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-24 * time.Hour), Through: ts.Add(-23 * time.Hour), Checksum: 700},
{From: ts.Add(-1 * time.Hour), Through: ts, Checksum: 701},
}},
},
Filters: []syntax.LineFilter{
{Ty: labels.MatchEqual, Match: "foo"},
{Ty: labels.MatchEqual, Match: "bar"},
},
}
// day ranges from ts-48h to ts-24h
day := getDayTime(ts.Add(-36 * time.Hour))
expected := []*logproto.GroupedChunkRefs{
{Fingerprint: 500, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-47 * time.Hour), Through: ts.Add(-46 * time.Hour), Checksum: 501},
}},
{Fingerprint: 600, Tenant: tenant, Refs: []*logproto.ShortRef{
{From: ts.Add(-48 * time.Hour), Through: ts.Add(-47 * time.Hour), Checksum: 600},
}},
}
task, _, _, _ := NewTask(tenant, input)
it := task.ChunkIterForDay(day)
output := make([]*logproto.GroupedChunkRefs, 0, len(input.Refs))
for it.Next() {
output = append(output, it.At())
}
require.Equal(t, expected, output)
})
}

@@ -15,7 +15,12 @@ import (
)
func getDayTime(ts model.Time) time.Time {
return time.Date(ts.Time().Year(), ts.Time().Month(), ts.Time().Day(), 0, 0, 0, 0, time.UTC)
return ts.Time().UTC().Truncate(Day)
}
func truncateDay(ts model.Time) model.Time {
// model.minimumTick is time.Millisecond
return ts - (ts % model.Time(24*time.Hour/time.Millisecond))
}
// getFromThrough assumes a list of ShortRefs sorted by From time
@@ -24,6 +29,10 @@ func getFromThrough(refs []*logproto.ShortRef) (model.Time, model.Time) {
return model.Earliest, model.Latest
}
if len(refs) == 1 {
return refs[0].From, refs[0].Through
}
maxItem := slices.MaxFunc(refs, func(a, b *logproto.ShortRef) int {
if a.Through > b.Through {
return 1
@@ -85,7 +94,7 @@ func partitionFingerprintRange(tasks []Task, blocks []bloomshipper.BlockRef) (re
}
for _, task := range tasks {
refs := task.Request.Refs
refs := task.series
min := sort.Search(len(refs), func(i int) bool {
return block.Cmp(refs[i].Fingerprint) > v1.Before
})
@@ -109,3 +118,66 @@ func partitionFingerprintRange(tasks []Task, blocks []bloomshipper.BlockRef) (re
}
return result
}
type seriesWithBounds struct {
bounds model.Interval
day model.Time
series []*logproto.GroupedChunkRefs
}
func partitionRequest(req *logproto.FilterChunkRefRequest) []seriesWithBounds {
result := make([]seriesWithBounds, 0)
fromDay, throughDay := truncateDay(req.From), truncateDay(req.Through)
for day := fromDay; day.Equal(throughDay) || day.Before(throughDay); day = day.Add(Day) {
minTs, maxTs := model.Latest, model.Earliest
nextDay := day.Add(Day)
res := make([]*logproto.GroupedChunkRefs, 0, len(req.Refs))
for _, series := range req.Refs {
chunks := series.Refs
min := sort.Search(len(chunks), func(i int) bool {
return chunks[i].Through >= day
})
max := sort.Search(len(chunks), func(i int) bool {
return chunks[i].From >= nextDay
})
// All chunks fall outside of the range
if min == len(chunks) || max == 0 {
continue
}
if chunks[min].From < minTs {
minTs = chunks[min].From
}
if chunks[max-1].Through > maxTs {
maxTs = chunks[max-1].Through
}
// fmt.Println("day", day, "series", series.Fingerprint, "minTs", minTs, "maxTs", maxTs)
res = append(res, &logproto.GroupedChunkRefs{
Fingerprint: series.Fingerprint,
Tenant: series.Tenant,
Refs: chunks[min:max],
})
}
if len(res) > 0 {
result = append(result, seriesWithBounds{
bounds: model.Interval{
Start: minTs,
End: maxTs,
},
day: day,
series: res,
})
}
}
return result
}

@@ -2,6 +2,7 @@ package bloomgateway
import (
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
@@ -27,6 +28,23 @@ func TestGetFromThrough(t *testing.T) {
require.Equal(t, model.Time(4), chunks[len(chunks)-1].From)
}
func TestTruncateDay(t *testing.T) {
expected := mktime("2024-01-24 00:00")
for _, inp := range []string{
"2024-01-24 00:00",
"2024-01-24 08:00",
"2024-01-24 16:00",
"2024-01-24 23:59",
} {
t.Run(inp, func(t *testing.T) {
ts := mktime(inp)
result := truncateDay(ts)
require.Equal(t, expected, result)
})
}
}
func mkBlockRef(minFp, maxFp uint64) bloomshipper.BlockRef {
return bloomshipper.BlockRef{
Ref: bloomshipper.Ref{
@@ -37,6 +55,7 @@ func mkBlockRef(minFp, maxFp uint64) bloomshipper.BlockRef {
}
func TestPartitionFingerprintRange(t *testing.T) {
t.Run("consecutive block ranges", func(t *testing.T) {
bounds := []bloomshipper.BlockRef{
mkBlockRef(0, 99), // out of bounds block
@@ -52,10 +71,7 @@ func TestPartitionFingerprintRange(t *testing.T) {
tasks := make([]Task, nTasks)
for i := startFp; i < startFp+nSeries; i++ {
if tasks[i%nTasks].Request == nil {
tasks[i%nTasks].Request = &logproto.FilterChunkRefRequest{}
}
tasks[i%nTasks].Request.Refs = append(tasks[i%nTasks].Request.Refs, &logproto.GroupedChunkRefs{Fingerprint: uint64(i)})
tasks[i%nTasks].series = append(tasks[i%nTasks].series, &logproto.GroupedChunkRefs{Fingerprint: uint64(i)})
}
results := partitionFingerprintRange(tasks, bounds)
@@ -67,8 +83,8 @@ func TestPartitionFingerprintRange(t *testing.T) {
// ensure we have the right number of tasks per bound
require.Len(t, res.tasks, 5)
for _, task := range res.tasks {
require.Equal(t, expectedTaskRefs[i], len(task.Request.Refs))
actualFingerprints = append(actualFingerprints, task.Request.Refs...)
require.Equal(t, expectedTaskRefs[i], len(task.series))
actualFingerprints = append(actualFingerprints, task.series...)
}
}
@@ -88,9 +104,9 @@ func TestPartitionFingerprintRange(t *testing.T) {
mkBlockRef(200, 289),
}
task := Task{Request: &logproto.FilterChunkRefRequest{}}
task := Task{}
for i := 0; i < 300; i++ {
task.Request.Refs = append(task.Request.Refs, &logproto.GroupedChunkRefs{Fingerprint: uint64(i)})
task.series = append(task.series, &logproto.GroupedChunkRefs{Fingerprint: uint64(i)})
}
results := partitionFingerprintRange([]Task{task}, bounds)
@@ -98,7 +114,162 @@ func TestPartitionFingerprintRange(t *testing.T) {
for _, res := range results {
// ensure we have the right number of tasks per bound
require.Len(t, res.tasks, 1)
require.Len(t, res.tasks[0].Request.Refs, 90)
require.Len(t, res.tasks[0].series, 90)
}
})
}
func TestPartitionRequest(t *testing.T) {
ts := mktime("2024-01-24 12:00")
testCases := map[string]struct {
inp *logproto.FilterChunkRefRequest
exp []seriesWithBounds
}{
"empty": {
inp: &logproto.FilterChunkRefRequest{
From: ts.Add(-24 * time.Hour),
Through: ts,
},
exp: []seriesWithBounds{},
},
"all chunks within single day": {
inp: &logproto.FilterChunkRefRequest{
From: ts.Add(-1 * time.Hour),
Through: ts,
Refs: []*logproto.GroupedChunkRefs{
{
Fingerprint: 0x00,
Refs: []*logproto.ShortRef{
{From: ts.Add(-60 * time.Minute), Through: ts.Add(-50 * time.Minute)},
},
},
{
Fingerprint: 0x01,
Refs: []*logproto.ShortRef{
{From: ts.Add(-55 * time.Minute), Through: ts.Add(-45 * time.Minute)},
},
},
},
},
exp: []seriesWithBounds{
{
bounds: model.Interval{Start: ts.Add(-60 * time.Minute), End: ts.Add(-45 * time.Minute)},
day: mktime("2024-01-24 00:00"),
series: []*logproto.GroupedChunkRefs{
{
Fingerprint: 0x00,
Refs: []*logproto.ShortRef{
{From: ts.Add(-60 * time.Minute), Through: ts.Add(-50 * time.Minute)},
},
},
{
Fingerprint: 0x01,
Refs: []*logproto.ShortRef{
{From: ts.Add(-55 * time.Minute), Through: ts.Add(-45 * time.Minute)},
},
},
},
},
},
},
"chunks across multiple days - no overlap": {
inp: &logproto.FilterChunkRefRequest{
From: ts.Add(-24 * time.Hour),
Through: ts,
Refs: []*logproto.GroupedChunkRefs{
{
Fingerprint: 0x00,
Refs: []*logproto.ShortRef{
{From: ts.Add(-23 * time.Hour), Through: ts.Add(-22 * time.Hour)},
},
},
{
Fingerprint: 0x01,
Refs: []*logproto.ShortRef{
{From: ts.Add(-2 * time.Hour), Through: ts.Add(-1 * time.Hour)},
},
},
},
},
exp: []seriesWithBounds{
{
bounds: model.Interval{Start: ts.Add(-23 * time.Hour), End: ts.Add(-22 * time.Hour)},
day: mktime("2024-01-23 00:00"),
series: []*logproto.GroupedChunkRefs{
{
Fingerprint: 0x00,
Refs: []*logproto.ShortRef{
{From: ts.Add(-23 * time.Hour), Through: ts.Add(-22 * time.Hour)},
},
},
},
},
{
bounds: model.Interval{Start: ts.Add(-2 * time.Hour), End: ts.Add(-1 * time.Hour)},
day: mktime("2024-01-24 00:00"),
series: []*logproto.GroupedChunkRefs{
{
Fingerprint: 0x01,
Refs: []*logproto.ShortRef{
{From: ts.Add(-2 * time.Hour), Through: ts.Add(-1 * time.Hour)},
},
},
},
},
},
},
"chunks across multiple days - overlap": {
inp: &logproto.FilterChunkRefRequest{
From: ts.Add(-24 * time.Hour),
Through: ts,
Refs: []*logproto.GroupedChunkRefs{
{
Fingerprint: 0x00,
Refs: []*logproto.ShortRef{
{From: ts.Add(-13 * time.Hour), Through: ts.Add(-11 * time.Hour)},
},
},
},
},
exp: []seriesWithBounds{
{
bounds: model.Interval{Start: ts.Add(-13 * time.Hour), End: ts.Add(-11 * time.Hour)},
day: mktime("2024-01-23 00:00"),
series: []*logproto.GroupedChunkRefs{
{
Fingerprint: 0x00,
Refs: []*logproto.ShortRef{
{From: ts.Add(-13 * time.Hour), Through: ts.Add(-11 * time.Hour)},
},
},
},
},
{
bounds: model.Interval{Start: ts.Add(-13 * time.Hour), End: ts.Add(-11 * time.Hour)},
day: mktime("2024-01-24 00:00"),
series: []*logproto.GroupedChunkRefs{
{
Fingerprint: 0x00,
Refs: []*logproto.ShortRef{
{From: ts.Add(-13 * time.Hour), Through: ts.Add(-11 * time.Hour)},
},
},
},
},
},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
result := partitionRequest(tc.inp)
require.Equal(t, tc.exp, result)
})
}
}

@@ -79,18 +79,18 @@ type worker struct {
cfg workerConfig
queue *queue.RequestQueue
shipper bloomshipper.Interface
tasks *pendingTasks
pending *pendingTasks
logger log.Logger
metrics *workerMetrics
}
func newWorker(id string, cfg workerConfig, queue *queue.RequestQueue, shipper bloomshipper.Interface, tasks *pendingTasks, logger log.Logger, metrics *workerMetrics) *worker {
func newWorker(id string, cfg workerConfig, queue *queue.RequestQueue, shipper bloomshipper.Interface, pending *pendingTasks, logger log.Logger, metrics *workerMetrics) *worker {
w := &worker{
id: id,
cfg: cfg,
queue: queue,
shipper: shipper,
tasks: tasks,
pending: pending,
logger: log.With(logger, "worker", id),
metrics: metrics,
}
@@ -134,7 +134,7 @@ func (w *worker) running(ctx context.Context) error {
}
w.metrics.dequeuedTasks.WithLabelValues(w.id).Add(float64(len(items)))
tasksPerDay := make(map[time.Time][]Task)
tasksPerDay := make(map[model.Time][]Task)
for _, item := range items {
task, ok := item.(Task)
@@ -144,17 +144,9 @@ func (w *worker) running(ctx context.Context) error {
return errors.Errorf("failed to cast dequeued item to Task: %v", item)
}
level.Debug(w.logger).Log("msg", "dequeued task", "task", task.ID)
w.tasks.Delete(task.ID)
w.pending.Delete(task.ID)
fromDay, throughDay := task.Bounds()
if fromDay.Equal(throughDay) {
tasksPerDay[fromDay] = append(tasksPerDay[fromDay], task)
} else {
for i := fromDay; i.Before(throughDay); i = i.Add(24 * time.Hour) {
tasksPerDay[i] = append(tasksPerDay[i], task)
}
}
tasksPerDay[task.day] = append(tasksPerDay[task.day], task)
}
for day, tasks := range tasksPerDay {
@@ -162,7 +154,7 @@ func (w *worker) running(ctx context.Context) error {
level.Debug(logger).Log("msg", "process tasks", "tasks", len(tasks))
storeFetchStart := time.Now()
blockRefs, err := w.shipper.GetBlockRefs(taskCtx, tasks[0].Tenant, toModelTime(day), toModelTime(day.Add(Day).Add(-1*time.Nanosecond)))
blockRefs, err := w.shipper.GetBlockRefs(taskCtx, tasks[0].Tenant, day, day.Add(Day).Add(-1*time.Nanosecond))
w.metrics.storeAccessLatency.WithLabelValues(w.id, "GetBlockRefs").Observe(time.Since(storeFetchStart).Seconds())
if err != nil {
for _, t := range tasks {
@@ -177,7 +169,7 @@ func (w *worker) running(ctx context.Context) error {
if len(blockRefs) == 0 {
level.Warn(logger).Log("msg", "no blocks found")
for _, t := range tasks {
for _, ref := range t.Request.Refs {
for _, ref := range t.series {
t.ResCh <- v1.Output{
Fp: model.Fingerprint(ref.Fingerprint),
Removals: nil,
@@ -188,13 +180,13 @@ func (w *worker) running(ctx context.Context) error {
continue
}
boundedRefs := partitionFingerprintRange(tasks, blockRefs)
tasksForBlocks := partitionFingerprintRange(tasks, blockRefs)
blockRefs = blockRefs[:0]
for _, b := range boundedRefs {
for _, b := range tasksForBlocks {
blockRefs = append(blockRefs, b.blockRef)
}
err = w.processBlocksWithCallback(taskCtx, tasks[0].Tenant, day, blockRefs, boundedRefs)
err = w.processBlocksWithCallback(taskCtx, tasks[0].Tenant, day, blockRefs, tasksForBlocks)
if err != nil {
for _, t := range tasks {
t.ErrCh <- err
@@ -217,7 +209,7 @@ func (w *worker) stopping(err error) error {
return nil
}
func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant string, day time.Time, blockRefs []bloomshipper.BlockRef, boundedRefs []boundedTasks) error {
func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant string, day model.Time, blockRefs []bloomshipper.BlockRef, boundedRefs []boundedTasks) error {
return w.shipper.Fetch(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, minFp, maxFp uint64) error {
for _, b := range boundedRefs {
if b.blockRef.MinFingerprint == minFp && b.blockRef.MaxFingerprint == maxFp {
@@ -228,7 +220,7 @@ func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant strin
})
}
func (w *worker) processBlock(blockQuerier *v1.BlockQuerier, day time.Time, tasks []Task) error {
func (w *worker) processBlock(blockQuerier *v1.BlockQuerier, day model.Time, tasks []Task) error {
schema, err := blockQuerier.Schema()
if err != nil {
return err
