|
|
|
|
@ -3,7 +3,6 @@ package bloomgateway |
|
|
|
|
import ( |
|
|
|
|
"context" |
|
|
|
|
"flag" |
|
|
|
|
"fmt" |
|
|
|
|
"io" |
|
|
|
|
"math" |
|
|
|
|
"sort" |
|
|
|
|
@ -15,6 +14,7 @@ import ( |
|
|
|
|
ringclient "github.com/grafana/dskit/ring/client" |
|
|
|
|
"github.com/pkg/errors" |
|
|
|
|
"github.com/prometheus/client_golang/prometheus" |
|
|
|
|
"golang.org/x/exp/slices" |
|
|
|
|
"google.golang.org/grpc" |
|
|
|
|
"google.golang.org/grpc/health/grpc_health_v1" |
|
|
|
|
|
|
|
|
|
@ -22,6 +22,7 @@ import ( |
|
|
|
|
"github.com/grafana/loki/v3/pkg/logqlmodel/stats" |
|
|
|
|
"github.com/grafana/loki/v3/pkg/querier/plan" |
|
|
|
|
"github.com/grafana/loki/v3/pkg/queue" |
|
|
|
|
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" |
|
|
|
|
"github.com/grafana/loki/v3/pkg/storage/chunk/cache" |
|
|
|
|
"github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache" |
|
|
|
|
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" |
|
|
|
|
@ -258,17 +259,6 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, interva |
|
|
|
|
return rs.groups[i].Fingerprint < rs.groups[j].Fingerprint |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
level.Info(c.logger).Log( |
|
|
|
|
"msg", "do FilterChunkRefs for addresses", |
|
|
|
|
"part", fmt.Sprintf("%d/%d", i+1, len(servers)), |
|
|
|
|
"addr", rs.addr, |
|
|
|
|
"from", interval.Start.Time(), |
|
|
|
|
"through", interval.End.Time(), |
|
|
|
|
"series", len(rs.groups), |
|
|
|
|
"blocks", len(rs.blocks), |
|
|
|
|
"tenant", tenant, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
return c.doForAddrs([]string{rs.addr}, func(client logproto.BloomGatewayClient) error { |
|
|
|
|
req := &logproto.FilterChunkRefRequest{ |
|
|
|
|
From: interval.Start, |
|
|
|
|
@ -290,15 +280,95 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, interva |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
return flatten(results, count), nil |
|
|
|
|
|
|
|
|
|
buf := make([]*logproto.GroupedChunkRefs, 0, count) |
|
|
|
|
return mergeSeries(results, buf) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func flatten(input [][]*logproto.GroupedChunkRefs, n int) []*logproto.GroupedChunkRefs { |
|
|
|
|
result := make([]*logproto.GroupedChunkRefs, 0, n) |
|
|
|
|
for _, res := range input { |
|
|
|
|
result = append(result, res...) |
|
|
|
|
// mergeSeries combines respones from multiple FilterChunkRefs calls and deduplicates
|
|
|
|
|
// chunks from series that appear in multiple responses.
|
|
|
|
|
// To avoid allocations, an optional slice can be passed as second argument.
|
|
|
|
|
func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedChunkRefs) ([]*logproto.GroupedChunkRefs, error) { |
|
|
|
|
// clear provided buffer
|
|
|
|
|
buf = buf[:0] |
|
|
|
|
|
|
|
|
|
iters := make([]v1.PeekingIterator[*logproto.GroupedChunkRefs], 0, len(input)) |
|
|
|
|
for _, inp := range input { |
|
|
|
|
iters = append(iters, v1.NewPeekingIter(v1.NewSliceIter(inp))) |
|
|
|
|
} |
|
|
|
|
return result |
|
|
|
|
|
|
|
|
|
heapIter := v1.NewHeapIterator[*logproto.GroupedChunkRefs]( |
|
|
|
|
func(a, b *logproto.GroupedChunkRefs) bool { |
|
|
|
|
return a.Fingerprint < b.Fingerprint |
|
|
|
|
}, |
|
|
|
|
iters..., |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
dedupeIter := v1.NewDedupingIter[*logproto.GroupedChunkRefs, *logproto.GroupedChunkRefs]( |
|
|
|
|
// eq
|
|
|
|
|
func(a, b *logproto.GroupedChunkRefs) bool { return a.Fingerprint == b.Fingerprint }, |
|
|
|
|
// from
|
|
|
|
|
v1.Identity[*logproto.GroupedChunkRefs], |
|
|
|
|
// merge
|
|
|
|
|
func(a, b *logproto.GroupedChunkRefs) *logproto.GroupedChunkRefs { |
|
|
|
|
return &logproto.GroupedChunkRefs{ |
|
|
|
|
Fingerprint: a.Fingerprint, |
|
|
|
|
Tenant: a.Tenant, |
|
|
|
|
Refs: mergeChunks(a.Refs, b.Refs), |
|
|
|
|
} |
|
|
|
|
}, |
|
|
|
|
// iterator
|
|
|
|
|
v1.NewPeekingIter(heapIter), |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
return v1.CollectInto(dedupeIter, buf) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func mergeChunks(inputs ...[]*logproto.ShortRef) []*logproto.ShortRef { |
|
|
|
|
if len(inputs) == 0 { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if len(inputs) == 1 { |
|
|
|
|
slices.SortFunc( |
|
|
|
|
inputs[0], |
|
|
|
|
func(a, b *logproto.ShortRef) int { |
|
|
|
|
if a.Equal(b) { |
|
|
|
|
return 0 |
|
|
|
|
} |
|
|
|
|
if a.From.Before(b.From) || (a.From.Equal(b.From) && a.Through.Before(b.Through)) { |
|
|
|
|
return -1 |
|
|
|
|
} |
|
|
|
|
return 1 |
|
|
|
|
}, |
|
|
|
|
) |
|
|
|
|
return inputs[0] |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
iters := make([]v1.PeekingIterator[*logproto.ShortRef], 0, len(inputs)) |
|
|
|
|
for _, inp := range inputs { |
|
|
|
|
iters = append(iters, v1.NewPeekingIter(v1.NewSliceIter(inp))) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
chunkDedupe := v1.NewDedupingIter[*logproto.ShortRef, *logproto.ShortRef]( |
|
|
|
|
// eq
|
|
|
|
|
func(a, b *logproto.ShortRef) bool { return a.Equal(b) }, |
|
|
|
|
// from
|
|
|
|
|
v1.Identity[*logproto.ShortRef], |
|
|
|
|
// merge
|
|
|
|
|
func(a, b *logproto.ShortRef) *logproto.ShortRef { return a }, |
|
|
|
|
// iterator
|
|
|
|
|
v1.NewPeekingIter[*logproto.ShortRef]( |
|
|
|
|
v1.NewHeapIterator[*logproto.ShortRef]( |
|
|
|
|
func(a, b *logproto.ShortRef) bool { |
|
|
|
|
return a.From.Before(b.From) || (a.From.Equal(b.From) && a.Through.Before(b.Through)) |
|
|
|
|
}, |
|
|
|
|
iters..., |
|
|
|
|
), |
|
|
|
|
), |
|
|
|
|
) |
|
|
|
|
merged, _ := v1.Collect(chunkDedupe) |
|
|
|
|
return merged |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// doForAddrs sequetially calls the provided callback function fn for each
|
|
|
|
|
|