fix(blooms): Correctly return unfiltered chunks for series that are not mapped to any block (#12774)

This PR fixes a conceptual mistake in the code of resolving blocks on the index gateways. Currently, a series that does not resolve to any block is discarded instead of being kept for the response.
This change adds the chunks of the skipped series to the bloom querier response.


Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/12776/head
Christian Haudum 1 year ago committed by GitHub
parent 282e38548c
commit c36b1142c7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 5
      pkg/bloomgateway/querier.go
  2. 4
      pkg/bloomgateway/querier_test.go
  3. 35
      pkg/bloomgateway/resolver.go
  4. 156
      pkg/bloomgateway/resolver_test.go

@ -99,7 +99,7 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
// only covers a single day, and if not, it's at most two days.
for _, s := range partitionSeriesByDay(from, through, grouped) {
day := bloomshipper.NewInterval(s.day.Time, s.day.Time.Add(Day))
blocks, err := bq.blockResolver.Resolve(ctx, tenant, day, s.series)
blocks, skipped, err := bq.blockResolver.Resolve(ctx, tenant, day, s.series)
if err != nil {
return nil, err
}
@ -121,6 +121,9 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
return nil, err
}
// add chunk refs from series that were not mapped to any blocks
refs = append(refs, skipped...)
for i := range refs {
seriesSeen[refs[i].Fingerprint] = struct{}{}
for _, ref := range refs[i].Refs {

@ -40,7 +40,7 @@ func (c *noopClient) FilterChunks(_ context.Context, _ string, _ bloomshipper.In
type mockBlockResolver struct{}
// Resolve implements BlockResolver.
func (*mockBlockResolver) Resolve(_ context.Context, tenant string, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) ([]blockWithSeries, error) {
func (*mockBlockResolver) Resolve(_ context.Context, tenant string, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) ([]blockWithSeries, []*logproto.GroupedChunkRefs, error) {
day := truncateDay(interval.Start)
first, last := getFirstLast(series)
block := bloomshipper.BlockRef{
@ -53,7 +53,7 @@ func (*mockBlockResolver) Resolve(_ context.Context, tenant string, interval blo
Checksum: 0,
},
}
return []blockWithSeries{{block: block, series: series}}, nil
return []blockWithSeries{{block: block, series: series}}, nil, nil
}
var _ BlockResolver = &mockBlockResolver{}

@ -15,7 +15,7 @@ import (
)
type BlockResolver interface {
Resolve(context.Context, string, bloomshipper.Interval, []*logproto.GroupedChunkRefs) ([]blockWithSeries, error)
Resolve(context.Context, string, bloomshipper.Interval, []*logproto.GroupedChunkRefs) ([]blockWithSeries, []*logproto.GroupedChunkRefs, error)
}
type blockWithSeries struct {
@ -28,7 +28,7 @@ type defaultBlockResolver struct {
logger log.Logger
}
func (r *defaultBlockResolver) Resolve(ctx context.Context, tenant string, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) ([]blockWithSeries, error) {
func (r *defaultBlockResolver) Resolve(ctx context.Context, tenant string, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) ([]blockWithSeries, []*logproto.GroupedChunkRefs, error) {
minFp, maxFp := getFirstLast(series)
metaSearch := bloomshipper.MetaSearchParams{
TenantID: tenant,
@ -52,10 +52,12 @@ func (r *defaultBlockResolver) Resolve(ctx context.Context, tenant string, inter
)
if err != nil {
return nil, err
return nil, series, err
}
return blocksMatchingSeries(metas, interval, series), nil
mapped := blocksMatchingSeries(metas, interval, series)
skipped := unassignedSeries(mapped, series)
return mapped, skipped, nil
}
func blocksMatchingSeries(metas []bloomshipper.Meta, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) []blockWithSeries {
@ -96,6 +98,31 @@ func blocksMatchingSeries(metas []bloomshipper.Meta, interval bloomshipper.Inter
return result
}
// unassignedSeries returns the subset of series that is not covered by the
// fingerprint ranges of any of the mapped blocks. Both the input series and
// each block's series are assumed sorted by fingerprint ascending — the
// binary searches below rely on it (TODO confirm with callers).
func unassignedSeries(mapped []blockWithSeries, series []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs {
	// Work on a copy so the caller's slice is left untouched.
	skipped := make([]*logproto.GroupedChunkRefs, len(series))
	copy(skipped, series)

	for _, block := range mapped {
		minFp, maxFp := getFirstLast(block.series)

		// lo is the first index whose fingerprint is inside [minFp, maxFp];
		// hi is the first index strictly past maxFp (exclusive upper bound).
		// Using a strict comparison for hi avoids removing a series whose
		// fingerprint is greater than maxFp when maxFp itself is no longer
		// present in skipped (e.g. already removed by a previous overlapping
		// block), and avoids indexing past the end of the slice.
		lo := sort.Search(len(skipped), func(i int) bool {
			return skipped[i].Fingerprint >= minFp.Fingerprint
		})
		hi := sort.Search(len(skipped), func(i int) bool {
			return skipped[i].Fingerprint > maxFp.Fingerprint
		})

		// No remaining series falls into this block's range.
		if lo == hi {
			continue
		}

		// Drop the covered half-open range [lo, hi).
		skipped = append(skipped[:lo], skipped[hi:]...)
	}
	return skipped
}
func NewBlockResolver(store bloomshipper.Store, logger log.Logger) BlockResolver {
return &defaultBlockResolver{
store: store,

@ -11,18 +11,22 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
)
// makeBlockRef builds a block reference for tests, spanning the given
// fingerprint bounds and time range under fixed tenant/table names.
func makeBlockRef(minFp, maxFp model.Fingerprint, from, through model.Time) bloomshipper.BlockRef {
	ref := bloomshipper.Ref{
		TenantID:       "tenant",
		TableName:      "table",
		Bounds:         v1.NewBounds(minFp, maxFp),
		StartTimestamp: from,
		EndTimestamp:   through,
	}
	return bloomshipper.BlockRef{Ref: ref}
}
// makeMeta builds a meta for tests containing a single block reference that
// spans the given fingerprint bounds and time range.
//
// NOTE(review): the rendered diff left both the removed inline BlockRef
// literal and the added makeBlockRef call inside the Blocks slice, which is
// not valid Go. Resolved to the post-commit form with only the helper call.
func makeMeta(minFp, maxFp model.Fingerprint, from, through model.Time) bloomshipper.Meta {
	return bloomshipper.Meta{
		Blocks: []bloomshipper.BlockRef{
			makeBlockRef(minFp, maxFp, from, through),
		},
	}
}
@ -113,3 +117,137 @@ func TestBlockResolver_BlocksMatchingSeries(t *testing.T) {
require.Equal(t, expected, res)
})
}
// TestBlockResolver_UnassignedSeries verifies that unassignedSeries returns
// exactly the series whose fingerprints are not covered by any mapped block's
// fingerprint range, across no/partial/full coverage scenarios.
func TestBlockResolver_UnassignedSeries(t *testing.T) {
	// Input series, sorted by fingerprint ascending (unassignedSeries relies
	// on this ordering for its binary searches).
	series := []*logproto.GroupedChunkRefs{
		{Fingerprint: 0x00},
		{Fingerprint: 0x20},
		{Fingerprint: 0x40},
		{Fingerprint: 0x60},
		{Fingerprint: 0x80},
		{Fingerprint: 0xa0},
		{Fingerprint: 0xc0},
		{Fingerprint: 0xe0},
	}

	testCases := []struct {
		desc     string
		mapped   []blockWithSeries
		expected []*logproto.GroupedChunkRefs
	}{
		{
			desc:     "no blocks - all unassigned",
			mapped:   []blockWithSeries{},
			expected: series,
		},
		{
			desc: "block has no overlapping series - all unassigned",
			mapped: []blockWithSeries{
				{
					series: []*logproto.GroupedChunkRefs{
						{Fingerprint: 0xf0},
						{Fingerprint: 0xff},
					},
				},
			},
			expected: series,
		},
		{
			desc: "single block covering all series - no unassigned",
			mapped: []blockWithSeries{
				{
					series: []*logproto.GroupedChunkRefs{
						{Fingerprint: 0x00},
						{Fingerprint: 0x20},
						{Fingerprint: 0x40},
						{Fingerprint: 0x60},
						{Fingerprint: 0x80},
						{Fingerprint: 0xa0},
						{Fingerprint: 0xc0},
						{Fingerprint: 0xe0},
					},
				},
			},
			expected: []*logproto.GroupedChunkRefs{},
		},
		{
			desc: "multiple blocks covering all series - no unassigned",
			mapped: []blockWithSeries{
				{
					series: []*logproto.GroupedChunkRefs{
						{Fingerprint: 0x00},
						{Fingerprint: 0x20},
						{Fingerprint: 0x40},
						{Fingerprint: 0x60},
					},
				},
				{
					series: []*logproto.GroupedChunkRefs{
						{Fingerprint: 0x40},
						{Fingerprint: 0x60},
						{Fingerprint: 0x80},
						{Fingerprint: 0xa0},
					},
				},
				{
					series: []*logproto.GroupedChunkRefs{
						{Fingerprint: 0x80},
						{Fingerprint: 0xa0},
						{Fingerprint: 0xc0},
						{Fingerprint: 0xe0},
					},
				},
			},
			expected: []*logproto.GroupedChunkRefs{},
		},
		{
			desc: "single block overlapping some series",
			mapped: []blockWithSeries{
				{
					series: []*logproto.GroupedChunkRefs{
						{Fingerprint: 0x00},
						{Fingerprint: 0x20},
						{Fingerprint: 0x40},
						{Fingerprint: 0x60},
					},
				},
			},
			expected: []*logproto.GroupedChunkRefs{
				{Fingerprint: 0x80},
				{Fingerprint: 0xa0},
				{Fingerprint: 0xc0},
				{Fingerprint: 0xe0},
			},
		},
		{
			desc: "multiple blocks overlapping some series",
			mapped: []blockWithSeries{
				{
					series: []*logproto.GroupedChunkRefs{
						{Fingerprint: 0x20},
						{Fingerprint: 0x40},
						{Fingerprint: 0x60},
					},
				},
				{
					series: []*logproto.GroupedChunkRefs{
						{Fingerprint: 0x80},
						{Fingerprint: 0xa0},
						{Fingerprint: 0xc0},
					},
				},
			},
			expected: []*logproto.GroupedChunkRefs{
				{Fingerprint: 0x00},
				{Fingerprint: 0xe0},
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			result := unassignedSeries(tc.mapped, series)
			// require.Equal takes (t, expected, actual); the original call had
			// the arguments swapped, which produces misleading failure diffs.
			require.Equal(t, tc.expected, result)
		})
	}
}

Loading…
Cancel
Save