Bloom-Gateway cache (#11380)

**What this PR does / why we need it**:

This PR adds caching to the bloom-gateway client. It uses the result
cache from https://github.com/grafana/loki/pull/11343.

Here's how it works (a minimal sketch follows this list):
- Merge responses: group all chunks by fingerprint (FP) and remove duplicated chunk checksums.
- Extract responses based on time span: for each FP, add to the extracted response only the chunks that overlap with the requested start and end time.
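
Conceptually, the two operations behave like the short Go sketch below. The types and the `merge`/`extract` helpers are simplified, hypothetical stand-ins (not the real `logproto.GroupedChunkRefs`/`logproto.ShortRef` messages), so treat this as an illustration of the idea rather than the implementation in `pkg/bloomgateway/cache.go` further down.

```go
package main

import (
	"fmt"
	"sort"
)

// Simplified stand-ins for logproto.ShortRef and logproto.GroupedChunkRefs.
type shortRef struct{ From, Through, Checksum int64 }

type groupedChunkRefs struct {
	Fingerprint uint64
	Refs        []shortRef
}

// merge groups chunk refs from several responses by fingerprint and drops
// duplicate chunk checksums within each group.
func merge(responses ...[]groupedChunkRefs) []groupedChunkRefs {
	byFP := map[uint64][]shortRef{}
	for _, resp := range responses {
		for _, group := range resp {
			byFP[group.Fingerprint] = append(byFP[group.Fingerprint], group.Refs...)
		}
	}
	out := make([]groupedChunkRefs, 0, len(byFP))
	for fp, refs := range byFP {
		sort.Slice(refs, func(i, j int) bool { return refs[i].Checksum < refs[j].Checksum })
		deduped := make([]shortRef, 0, len(refs))
		for i, r := range refs {
			if i == 0 || r.Checksum != refs[i-1].Checksum {
				deduped = append(deduped, r)
			}
		}
		out = append(out, groupedChunkRefs{Fingerprint: fp, Refs: deduped})
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Fingerprint < out[j].Fingerprint })
	return out
}

// extract keeps, for every fingerprint, only the chunks whose time span
// overlaps the requested start/end (all in milliseconds).
func extract(groups []groupedChunkRefs, start, end int64) []groupedChunkRefs {
	out := make([]groupedChunkRefs, 0, len(groups))
	for _, g := range groups {
		var refs []shortRef
		for _, r := range g.Refs {
			if end < r.From || r.Through <= start { // no overlap with the requested range
				continue
			}
			refs = append(refs, r)
		}
		if len(refs) > 0 {
			out = append(out, groupedChunkRefs{Fingerprint: g.Fingerprint, Refs: refs})
		}
	}
	return out
}

func main() {
	a := []groupedChunkRefs{{Fingerprint: 1, Refs: []shortRef{{1000, 1500, 10}, {1500, 2500, 20}}}}
	b := []groupedChunkRefs{{Fingerprint: 1, Refs: []shortRef{{1500, 2500, 20}, {2000, 2700, 30}}}}
	merged := merge(a, b)
	fmt.Println(merged)                      // FP 1 with checksums 10, 20, 30 (duplicate 20 dropped)
	fmt.Println(extract(merged, 1700, 2200)) // only the chunks overlapping 1700-2200
}
```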
Commit a0b462d366 (parent 79693d79ef) by Salva Corts, committed via GitHub.
14 changed files (lines changed):
  1. docs/sources/configure/_index.md (20)
  2. pkg/bloomgateway/cache.go (217)
  3. pkg/bloomgateway/cache_test.go (494)
  4. pkg/bloomgateway/client.go (62)
  5. pkg/bloomgateway/client_test.go (4)
  6. pkg/bloomgateway/config.go (1)
  7. pkg/bloomgateway/querier.go (2)
  8. pkg/logproto/compat.go (82)
  9. pkg/logproto/compat_test.go (68)
  10. pkg/logqlmodel/stats/context.go (1)
  11. pkg/loki/modules.go (10)
  12. pkg/storage/chunk/cache/resultscache/cache.go (2)
  13. pkg/storage/chunk/cache/resultscache/config.go (4)
  14. pkg/validation/limits.go (6)

docs/sources/configure/_index.md
@@ -1838,6 +1838,21 @@ client:
# CLI flag: -bloom-gateway-client.log-gateway-requests
[log_gateway_requests: <boolean> | default = false]
results_cache:
# The cache block configures the cache backend.
# The CLI flags prefix for this block configuration is:
# bloom-gateway-client.cache
[cache: <cache_config>]
# Use compression in cache. The default is an empty value '', which disables
# compression. Supported values are: 'snappy' and ''.
# CLI flag: -bloom-gateway-client.cache.compression
[compression: <string> | default = ""]
# Flag to control whether to cache bloom gateway client requests/responses.
# CLI flag: -bloom-gateway-client.cache_results
[cache_results: <boolean> | default = false]
# Number of workers to use for filtering chunks concurrently.
# CLI flag: -bloom-gateway.worker-concurrency
[worker_concurrency: <int> | default = 4]
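
For context, here is a hedged sketch of how these new options could be combined in a Loki configuration file. The nesting assumes the `bloom_gateway.client` block documented above; the `embedded_cache` backend and the size value are illustrative choices, not defaults.

```yaml
bloom_gateway:
  client:
    # Cache bloom-gateway client responses (illustrative values).
    cache_results: true
    results_cache:
      compression: snappy
      cache:
        embedded_cache:
          enabled: true
          max_size_mb: 100
```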
@@ -3028,6 +3043,10 @@ shard_streams:
# CLI flag: -bloom-gateway.blocks-downloading-parallelism
[bloom_gateway_blocks_downloading_parallelism: <int> | default = 50]
# Interval for computing the cache key in the Bloom Gateway.
# CLI flag: -bloom-gateway.cache-key-interval
[bloom_gateway_cache_key_interval: <duration> | default = 15m]
# Allow user to send structured metadata in push payload.
# CLI flag: -validation.allow-structured-metadata
[allow_structured_metadata: <boolean> | default = false]
@@ -4233,6 +4252,7 @@ The TLS configuration.
The cache block configures the cache backend. The supported CLI flags `<prefix>` used to reference this configuration block are:
- `bloom-gateway-client.cache`
- `frontend`
- `frontend.index-stats-results-cache`
- `frontend.volume-results-cache`

pkg/bloomgateway/cache.go
@@ -0,0 +1,217 @@
package bloomgateway
import (
"context"
"flag"
"sort"
"time"
"github.com/go-kit/log"
"github.com/prometheus/common/model"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/chunk/cache/resultscache"
)
const (
cacheParalellism = 1
)
type CacheConfig struct {
resultscache.Config `yaml:",inline"`
}
// RegisterFlags registers flags.
func (cfg *CacheConfig) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("bloom-gateway-client.cache.", f)
}
func (cfg *CacheConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.Config.RegisterFlagsWithPrefix(f, prefix)
}
type CacheLimits interface {
resultscache.Limits
BloomGatewayCacheKeyInterval(tenantID string) time.Duration
}
type keyGen struct {
CacheLimits
}
func newCacheKeyGen(limits CacheLimits) keyGen {
return keyGen{limits}
}
func (k keyGen) GenerateCacheKey(ctx context.Context, tenant string, r resultscache.Request) string {
return resultscache.ConstSplitter(k.BloomGatewayCacheKeyInterval(tenant)).GenerateCacheKey(ctx, tenant, r)
}
type extractor struct{}
func newExtractor() extractor {
return extractor{}
}
// Extract extracts a subset of a response from the `start` and `end` timestamps in milliseconds.
// We remove chunks that are not within the given time range.
func (e extractor) Extract(start, end int64, r resultscache.Response, _, _ int64) resultscache.Response {
res := r.(*logproto.FilterChunkRefResponse)
chunkRefs := make([]*logproto.GroupedChunkRefs, 0, len(res.ChunkRefs))
for _, chunkRef := range res.ChunkRefs {
refs := make([]*logproto.ShortRef, 0, len(chunkRef.Refs))
for _, ref := range chunkRef.Refs {
if model.Time(end) < ref.From || ref.Through <= model.Time(start) {
continue
}
refs = append(refs, ref)
}
if len(refs) > 0 {
chunkRefs = append(chunkRefs, &logproto.GroupedChunkRefs{
Fingerprint: chunkRef.Fingerprint,
Tenant: chunkRef.Tenant,
Refs: refs,
})
}
}
return &logproto.FilterChunkRefResponse{
ChunkRefs: chunkRefs,
}
}
type merger struct{}
func newMerger() merger {
return merger{}
}
// MergeResponse merges responses from multiple requests into a single Response
// We merge all chunks grouped by their fingerprint.
func (m merger) MergeResponse(responses ...resultscache.Response) (resultscache.Response, error) {
var size int
for _, r := range responses {
res := r.(*logproto.FilterChunkRefResponse)
size += len(res.ChunkRefs)
}
chunkRefs := make([]*logproto.GroupedChunkRefs, 0, size)
for _, r := range responses {
res := r.(*logproto.FilterChunkRefResponse)
chunkRefs = append(chunkRefs, res.ChunkRefs...)
}
return &logproto.FilterChunkRefResponse{
ChunkRefs: mergeGroupedChunkRefs(chunkRefs),
}, nil
}
// Merge duplicated fingerprints by:
// 1. Sorting the chunkRefs by their stream fingerprint.
// 2. Removing duplicate FPs, appending all of their chunks to the first occurrence's chunk list.
func mergeGroupedChunkRefs(chunkRefs []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs {
if len(chunkRefs) <= 1 {
return chunkRefs
}
sort.Slice(chunkRefs, func(i, j int) bool {
return chunkRefs[i].Fingerprint < chunkRefs[j].Fingerprint
})
var lastDiffFP int
for i := 1; i < len(chunkRefs); i++ {
if chunkRefs[lastDiffFP].Fingerprint == chunkRefs[i].Fingerprint {
chunkRefs[lastDiffFP].Refs = mergeShortRefs(append(chunkRefs[lastDiffFP].Refs, chunkRefs[i].Refs...))
} else {
lastDiffFP++
chunkRefs[lastDiffFP] = chunkRefs[i]
}
}
return chunkRefs[:lastDiffFP+1]
}
// mergeShortRefs merges short-refs by removing duplicated checksums.
func mergeShortRefs(refs []*logproto.ShortRef) []*logproto.ShortRef {
if len(refs) <= 1 {
return refs
}
sort.Slice(refs, func(i, j int) bool {
return refs[i].Checksum < refs[j].Checksum
})
return slices.CompactFunc(refs, func(a, b *logproto.ShortRef) bool {
return a.Checksum == b.Checksum
})
}
type ClientCache struct {
cache *resultscache.ResultsCache
limits CacheLimits
logger log.Logger
}
func NewBloomGatewayClientCacheMiddleware(
logger log.Logger,
next logproto.BloomGatewayClient,
c cache.Cache,
limits CacheLimits,
cacheGen resultscache.CacheGenNumberLoader,
retentionEnabled bool,
) *ClientCache {
nextAsHandler := resultscache.HandlerFunc(func(ctx context.Context, cacheReq resultscache.Request) (resultscache.Response, error) {
req := cacheReq.(requestWithGrpcCallOptions)
return next.FilterChunkRefs(ctx, req.FilterChunkRefRequest, req.grpcCallOptions...)
})
resultsCache := resultscache.NewResultsCache(
logger,
c,
nextAsHandler,
newCacheKeyGen(limits),
limits,
newMerger(),
newExtractor(),
nil,
nil,
func(_ context.Context, _ []string, _ resultscache.Request) int {
return cacheParalellism
},
cacheGen,
retentionEnabled,
)
return &ClientCache{
cache: resultsCache,
limits: limits,
logger: logger,
}
}
func (c *ClientCache) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest, opts ...grpc.CallOption) (*logproto.FilterChunkRefResponse, error) {
cacheReq := requestWithGrpcCallOptions{
FilterChunkRefRequest: req,
grpcCallOptions: opts,
}
res, err := c.cache.Do(ctx, cacheReq)
if err != nil {
return nil, err
}
return res.(*logproto.FilterChunkRefResponse), nil
}
type requestWithGrpcCallOptions struct {
*logproto.FilterChunkRefRequest
grpcCallOptions []grpc.CallOption
}
func (r requestWithGrpcCallOptions) WithStartEndForCache(start time.Time, end time.Time) resultscache.Request {
return requestWithGrpcCallOptions{
FilterChunkRefRequest: r.FilterChunkRefRequest.WithStartEndForCache(start, end).(*logproto.FilterChunkRefRequest),
grpcCallOptions: r.grpcCallOptions,
}
}

pkg/bloomgateway/cache_test.go
@@ -0,0 +1,494 @@
package bloomgateway
import (
"context"
"testing"
"time"
"github.com/go-kit/log"
"github.com/grafana/dskit/user"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/chunk/cache/resultscache"
"github.com/grafana/loki/pkg/util/constants"
)
// Range is 1000-4000
var templateResponse = &logproto.FilterChunkRefResponse{
ChunkRefs: []*logproto.GroupedChunkRefs{
{
Fingerprint: 1,
Tenant: "fake",
Refs: []*logproto.ShortRef{
{
From: 1000,
Through: 1500,
Checksum: 10,
},
{
From: 1500,
Through: 2500,
Checksum: 20,
},
},
},
{
Fingerprint: 2,
Tenant: "fake",
Refs: []*logproto.ShortRef{
{
From: 3000,
Through: 4000,
Checksum: 30,
},
{
From: 1000,
Through: 3000,
Checksum: 40,
},
},
},
},
}
func TestExtract(t *testing.T) {
for _, tc := range []struct {
name string
start int64
end int64
input *logproto.FilterChunkRefResponse
expected *logproto.FilterChunkRefResponse
}{
{
name: "start and end out of range",
start: 100,
end: 200,
input: templateResponse,
expected: &logproto.FilterChunkRefResponse{
ChunkRefs: []*logproto.GroupedChunkRefs{},
},
},
{
name: "start spans exact range",
start: 1000,
end: 4000,
input: templateResponse,
expected: templateResponse,
},
{
name: "start spans more than range",
start: 100,
end: 5000,
input: templateResponse,
expected: templateResponse,
},
{
name: "start and end within range",
start: 1700,
end: 2700,
input: templateResponse,
expected: &logproto.FilterChunkRefResponse{
ChunkRefs: []*logproto.GroupedChunkRefs{
{
Fingerprint: 1,
Tenant: "fake",
Refs: []*logproto.ShortRef{
{
From: 1500,
Through: 2500,
Checksum: 20,
},
},
},
{
Fingerprint: 2,
Tenant: "fake",
Refs: []*logproto.ShortRef{
{
From: 1000,
Through: 3000,
Checksum: 40,
},
},
},
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
e := newExtractor()
actual := e.Extract(tc.start, tc.end, tc.input, 0, 0)
require.Equal(t, tc.expected, actual)
})
}
}
func TestMerge(t *testing.T) {
for _, tc := range []struct {
name string
input []*logproto.FilterChunkRefResponse
expected *logproto.FilterChunkRefResponse
}{
{
name: "empy input",
input: []*logproto.FilterChunkRefResponse{},
expected: &logproto.FilterChunkRefResponse{
ChunkRefs: []*logproto.GroupedChunkRefs{},
},
},
{
name: "single input",
input: []*logproto.FilterChunkRefResponse{templateResponse},
expected: templateResponse,
},
{
name: "repeating and non-repeating fingerprint with repeating and non-repeating chunks",
input: []*logproto.FilterChunkRefResponse{
{
ChunkRefs: []*logproto.GroupedChunkRefs{
{
Fingerprint: 1,
Tenant: "fake",
Refs: []*logproto.ShortRef{
{
From: 1000,
Through: 1500,
Checksum: 10,
},
{
From: 1500,
Through: 2500,
Checksum: 20,
},
},
},
{
Fingerprint: 2,
Tenant: "fake",
Refs: []*logproto.ShortRef{
{
From: 1000,
Through: 1500,
Checksum: 10,
},
{
From: 1500,
Through: 2500,
Checksum: 20,
},
},
},
},
},
{
ChunkRefs: []*logproto.GroupedChunkRefs{
// Same FP as in previous input and same chunks
{
Fingerprint: 1,
Tenant: "fake",
Refs: []*logproto.ShortRef{
{
From: 1000,
Through: 1500,
Checksum: 10,
},
{
From: 1500,
Through: 2500,
Checksum: 20,
},
},
},
// Same FP as in previous input, but different chunks
{
Fingerprint: 2,
Tenant: "fake",
Refs: []*logproto.ShortRef{
// Same chunk as in previous input
{
From: 1500,
Through: 2500,
Checksum: 20,
},
// New chunk
{
From: 2000,
Through: 2500,
Checksum: 30,
},
},
},
// New FP
{
Fingerprint: 3,
Tenant: "fake",
Refs: []*logproto.ShortRef{
{
From: 1000,
Through: 1500,
Checksum: 10,
},
{
From: 1500,
Through: 2500,
Checksum: 20,
},
},
},
},
},
{
ChunkRefs: []*logproto.GroupedChunkRefs{
// Same FP as in previous input and diff chunks
{
Fingerprint: 2,
Tenant: "fake",
Refs: []*logproto.ShortRef{
{
From: 700,
Through: 1000,
Checksum: 40,
},
{
From: 2000,
Through: 2700,
Checksum: 50,
},
},
},
},
},
},
expected: &logproto.FilterChunkRefResponse{
ChunkRefs: []*logproto.GroupedChunkRefs{
{
Fingerprint: 1,
Tenant: "fake",
Refs: []*logproto.ShortRef{
{
From: 1000,
Through: 1500,
Checksum: 10,
},
{
From: 1500,
Through: 2500,
Checksum: 20,
},
},
},
{
Fingerprint: 2,
Tenant: "fake",
Refs: []*logproto.ShortRef{
{
From: 1000,
Through: 1500,
Checksum: 10,
},
{
From: 1500,
Through: 2500,
Checksum: 20,
},
{
From: 2000,
Through: 2500,
Checksum: 30,
},
{
From: 700,
Through: 1000,
Checksum: 40,
},
{
From: 2000,
Through: 2700,
Checksum: 50,
},
},
},
{
Fingerprint: 3,
Tenant: "fake",
Refs: []*logproto.ShortRef{
{
From: 1000,
Through: 1500,
Checksum: 10,
},
{
From: 1500,
Through: 2500,
Checksum: 20,
},
},
},
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
input := make([]resultscache.Response, 0, len(tc.input))
for _, i := range tc.input {
input = append(input, i)
}
m := newMerger()
actual, err := m.MergeResponse(input...)
require.NoError(t, err)
require.Equal(t, tc.expected, actual)
})
}
}
func TestCache(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "fake")
limits := mockLimits{
cacheInterval: 15 * time.Minute,
}
cfg := CacheConfig{
Config: resultscache.Config{
CacheConfig: cache.Config{
Cache: cache.NewMockCache(),
},
},
}
c, err := cache.New(cfg.CacheConfig, nil, log.NewNopLogger(), stats.BloomFilterCache, constants.Loki)
require.NoError(t, err)
defer c.Stop()
chunkRefs := []*logproto.ChunkRef{
{
Fingerprint: 2,
UserID: "fake",
From: 1500,
Through: 2500,
Checksum: 30,
},
{
Fingerprint: 3,
UserID: "fake",
From: 2500,
Through: 3500,
},
}
req := &logproto.FilterChunkRefRequest{
From: model.Time(2000),
Through: model.Time(3000),
Refs: groupRefs(t, chunkRefs),
Filters: []*logproto.LineFilterExpression{
{Operator: 1, Match: "foo"},
},
}
expectedRes := &logproto.FilterChunkRefResponse{
ChunkRefs: groupRefs(t, chunkRefs),
}
server, calls := newMockServer(expectedRes)
cacheMiddleware := NewBloomGatewayClientCacheMiddleware(
log.NewNopLogger(),
server,
c,
limits,
nil,
false,
)
// First call should go to the server
*calls = 0
res, err := cacheMiddleware.FilterChunkRefs(ctx, req)
require.NoError(t, err)
require.Equal(t, 1, *calls)
require.Equal(t, expectedRes, res)
// Second call should go to the cache
*calls = 0
res, err = cacheMiddleware.FilterChunkRefs(ctx, req)
require.NoError(t, err)
require.Equal(t, 0, *calls)
require.Equal(t, expectedRes, res)
// Doing a request with a new start and end should:
// 1. hit the server for the leading time span
// 2. hit the cache for the cached span
// 3. hit the server for the trailing time span
newChunkRefs := []*logproto.ChunkRef{
{
Fingerprint: 1,
UserID: "fake",
From: 1000,
Through: 1500,
Checksum: 10,
},
{
Fingerprint: 4,
UserID: "fake",
From: 3500,
Through: 4500,
},
}
server.SetResponse(&logproto.FilterChunkRefResponse{
ChunkRefs: groupRefs(t, newChunkRefs),
})
expectedRes = &logproto.FilterChunkRefResponse{
ChunkRefs: groupRefs(t, append(chunkRefs, newChunkRefs...)),
}
req.From = model.Time(100)
req.Through = model.Time(5000)
*calls = 0
res, err = cacheMiddleware.FilterChunkRefs(ctx, req)
require.NoError(t, err)
require.Equal(t, 2, *calls)
require.Equal(t, expectedRes, res)
// Doing a request again should only hit the cache
*calls = 0
res, err = cacheMiddleware.FilterChunkRefs(ctx, req)
require.NoError(t, err)
require.Equal(t, 0, *calls)
require.Equal(t, expectedRes, res)
}
type mockServer struct {
calls *int
res *logproto.FilterChunkRefResponse
}
func newMockServer(res *logproto.FilterChunkRefResponse) (*mockServer, *int) {
var calls int
return &mockServer{
calls: &calls,
res: res,
}, &calls
}
func (s *mockServer) SetResponse(res *logproto.FilterChunkRefResponse) {
s.res = res
}
func (s *mockServer) FilterChunkRefs(_ context.Context, _ *logproto.FilterChunkRefRequest, _ ...grpc.CallOption) (*logproto.FilterChunkRefResponse, error) {
*s.calls++
return s.res, nil
}
type mockLimits struct {
cacheFreshness time.Duration
cacheInterval time.Duration
}
func (m mockLimits) MaxCacheFreshness(_ context.Context, _ string) time.Duration {
return m.cacheFreshness
}
func (m mockLimits) BloomGatewayCacheKeyInterval(_ string) time.Duration {
return m.cacheInterval
}

pkg/bloomgateway/client.go
@@ -25,8 +25,11 @@ import (
"github.com/grafana/loki/pkg/distributor/clientpool"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/queue"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/chunk/cache/resultscache"
"github.com/grafana/loki/pkg/util/constants"
)
@@ -99,6 +102,10 @@ type ClientConfig struct {
// Ring is the Bloom Gateway ring used to find the appropriate Bloom Gateway instance
// this client should talk to.
Ring ring.ReadRing `yaml:"-"`
// Cache configures the cache used to store the results of the Bloom Gateway server.
Cache CacheConfig `yaml:"results_cache,omitempty"`
CacheResults bool `yaml:"cache_results"`
}
// RegisterFlags registers flags for the Bloom Gateway client configuration.
@@ -109,9 +116,25 @@ func (i *ClientConfig) RegisterFlags(f *flag.FlagSet) {
// RegisterFlagsWithPrefix registers flags for the Bloom Gateway client configuration with a common prefix.
func (i *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
i.GRPCClientConfig.RegisterFlagsWithPrefix(prefix+"grpc", f)
i.Cache.RegisterFlagsWithPrefix(prefix+"cache.", f)
f.BoolVar(&i.CacheResults, prefix+"cache_results", false, "Flag to control whether to cache bloom gateway client requests/responses.")
f.BoolVar(&i.LogGatewayRequests, prefix+"log-gateway-requests", false, "Flag to control whether requests sent to the gateway should be logged or not.")
}
func (i *ClientConfig) Validate() error {
if err := i.GRPCClientConfig.Validate(); err != nil {
return errors.Wrap(err, "grpc client config")
}
if i.CacheResults {
if err := i.Cache.Validate(); err != nil {
return errors.Wrap(err, "cache config")
}
}
return nil
}
type Client interface {
FilterChunks(ctx context.Context, tenant string, from, through model.Time, groups []*logproto.GroupedChunkRefs, filters ...*logproto.LineFilterExpression) ([]*logproto.GroupedChunkRefs, error)
}
@@ -124,7 +147,15 @@ type GatewayClient struct {
ring ring.ReadRing
}
func NewGatewayClient(cfg ClientConfig, limits Limits, registerer prometheus.Registerer, logger log.Logger, metricsNamespace string) (*GatewayClient, error) {
func NewGatewayClient(
cfg ClientConfig,
limits Limits,
registerer prometheus.Registerer,
logger log.Logger,
metricsNamespace string,
cacheGen resultscache.CacheGenNumberLoader,
retentionEnabled bool,
) (*GatewayClient, error) {
latency := promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
Namespace: constants.Loki,
Subsystem: "bloom_gateway",
@@ -138,22 +169,43 @@ func NewGatewayClient(cfg ClientConfig, limits Limits, registerer prometheus.Reg
return nil, err
}
var c cache.Cache
if cfg.CacheResults {
c, err = cache.New(cfg.Cache.CacheConfig, registerer, logger, stats.BloomFilterCache, constants.Loki)
if err != nil {
return nil, errors.Wrap(err, "new bloom gateway cache")
}
if cfg.Cache.Compression == "snappy" {
c = cache.NewSnappy(c, logger)
}
}
poolFactory := func(addr string) (ringclient.PoolClient, error) {
pool, err := NewBloomGatewayGRPCPool(addr, dialOpts)
if err != nil {
return nil, errors.Wrap(err, "new bloom gateway grpc pool")
}
if cfg.CacheResults {
pool.BloomGatewayClient = NewBloomGatewayClientCacheMiddleware(
logger,
pool.BloomGatewayClient,
c,
limits,
cacheGen,
retentionEnabled,
)
}
return pool, nil
}
c := &GatewayClient{
return &GatewayClient{
cfg: cfg,
logger: logger,
limits: limits,
pool: clientpool.NewPool("bloom-gateway", cfg.PoolConfig, cfg.Ring, ringclient.PoolAddrFunc(poolFactory), logger, metricsNamespace),
}
return c, nil
}, nil
}
func shuffleAddrs(addrs []string) []string {

pkg/bloomgateway/client_test.go
@@ -27,7 +27,7 @@ func TestBloomGatewayClient(t *testing.T) {
flagext.DefaultValues(&cfg)
t.Run("", func(t *testing.T) {
_, err := NewGatewayClient(cfg, l, reg, logger, "loki")
_, err := NewGatewayClient(cfg, l, reg, logger, "loki", nil, false)
require.NoError(t, err)
})
}
@@ -194,7 +194,7 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) {
cfg := ClientConfig{}
flagext.DefaultValues(&cfg)
c, err := NewGatewayClient(cfg, l, reg, logger, "loki")
c, err := NewGatewayClient(cfg, l, reg, logger, "loki", nil, false)
require.NoError(t, err)
instances := []ring.InstanceDesc{

pkg/bloomgateway/config.go
@@ -38,6 +38,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
}
type Limits interface {
CacheLimits
BloomGatewayShardSize(tenantID string) int
BloomGatewayEnabled(tenantID string) bool
BloomGatewayBlocksDownloadingParallelism(tenantID string) int

pkg/bloomgateway/querier.go
@@ -41,8 +41,6 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
return nil, err
}
// TODO(chaudum): Cache response
// Flatten response from client and return
result := make([]*logproto.ChunkRef, 0, len(chunkRefs))
for i := range refs {

pkg/logproto/compat.go
@@ -1,6 +1,7 @@
package logproto
import (
"encoding/binary"
stdjson "encoding/json"
"fmt"
"math"
@@ -10,6 +11,7 @@ import (
"time"
"unsafe"
"github.com/cespare/xxhash/v2"
jsoniter "github.com/json-iterator/go"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
@@ -334,3 +336,83 @@ func (m *VolumeRequest) LogToSpan(sp opentracing.Span) {
otlog.String("end", timestamp.Time(int64(m.Through)).String()),
)
}
// Satisfy definitions.Request for FilterChunkRefRequest
// GetStart returns the start timestamp of the request in milliseconds.
func (m *FilterChunkRefRequest) GetStart() time.Time {
return time.UnixMilli(int64(m.From))
}
// GetEnd returns the end timestamp of the request in milliseconds.
func (m *FilterChunkRefRequest) GetEnd() time.Time {
return time.UnixMilli(int64(m.Through))
}
// GetStep returns the step of the request in milliseconds. Always 0.
func (m *FilterChunkRefRequest) GetStep() int64 {
return 0
}
// GetQuery returns the query of the request.
// The query is a hash of the input chunk refs and the filter expressions.
func (m *FilterChunkRefRequest) GetQuery() string {
var encodeBuf []byte
var chunksHash uint64
if len(m.Refs) > 0 {
h := xxhash.New()
for _, ref := range m.Refs {
_, _ = h.Write(binary.AppendUvarint(encodeBuf[:0], ref.Fingerprint))
}
chunksHash = h.Sum64()
}
// Short circuit if there are no filters.
if len(m.Filters) == 0 {
return fmt.Sprintf("%d", chunksHash)
}
var sb strings.Builder
for i, filter := range m.Filters {
if i > 0 {
sb.WriteString(",")
}
sb.Write(fmt.Appendf(encodeBuf[:0], "%d", filter.Operator))
sb.WriteString("-")
sb.WriteString(filter.Match)
}
return fmt.Sprintf("%d/%s", chunksHash, sb.String())
}
// GetCachingOptions returns the caching options.
func (m *FilterChunkRefRequest) GetCachingOptions() (res resultscache.CachingOptions) { return }
// WithStartEndForCache implements resultscache.Request.
func (m *FilterChunkRefRequest) WithStartEndForCache(start, end time.Time) resultscache.Request {
// We remove chunks that are not within the given time range.
chunkRefs := make([]*GroupedChunkRefs, 0, len(m.Refs))
for _, chunkRef := range m.Refs {
refs := make([]*ShortRef, 0, len(chunkRef.Refs))
for _, ref := range chunkRef.Refs {
if end.Before(ref.From.Time()) || ref.Through.Time().Before(start) {
continue
}
refs = append(refs, ref)
}
if len(refs) > 0 {
chunkRefs = append(chunkRefs, &GroupedChunkRefs{
Fingerprint: chunkRef.Fingerprint,
Tenant: chunkRef.Tenant,
Refs: refs,
})
}
}
clone := *m
clone.From = model.TimeFromUnixNano(start.UnixNano())
clone.Through = model.TimeFromUnixNano(end.UnixNano())
clone.Refs = chunkRefs
return &clone
}

pkg/logproto/compat_test.go
@@ -278,6 +278,74 @@ func TestMergeSeriesResponses(t *testing.T) {
}
}
func TestFilterChunkRefRequestGetQuery(t *testing.T) {
for _, tc := range []struct {
desc string
request FilterChunkRefRequest
expected string
}{
{
desc: "empty request",
expected: `0`,
},
{
desc: "request no filters",
request: FilterChunkRefRequest{
Refs: []*GroupedChunkRefs{
{
Fingerprint: 1,
Tenant: "test",
},
},
},
expected: `9962287286179718960`,
},
{
desc: "request with filters but no chunks",
request: FilterChunkRefRequest{
Filters: []*LineFilterExpression{
{
Operator: 0,
Match: "uuid",
},
},
},
expected: `0/0-uuid`,
},
{
desc: "request with filters and chunks",
request: FilterChunkRefRequest{
Refs: []*GroupedChunkRefs{
{
Fingerprint: 1,
Tenant: "test",
},
{
Fingerprint: 2,
Tenant: "test",
},
},
Filters: []*LineFilterExpression{
{
Operator: 0,
Match: "uuid",
},
{
Operator: 1,
Match: "trace",
},
},
},
expected: `8827404902424034886/0-uuid,1-trace`,
},
} {
t.Run(tc.desc, func(t *testing.T) {
actual := tc.request.GetQuery()
require.Equal(t, tc.expected, actual)
})
}
}
func benchmarkMergeLabelResponses(b *testing.B, responses []*LabelResponse) {
b.ReportAllocs()
for n := 0; n < b.N; n++ {

pkg/logqlmodel/stats/context.go
@@ -61,6 +61,7 @@ const (
StatsResultCache = "stats-result"
VolumeResultCache = "volume-result"
WriteDedupeCache = "write-dedupe"
BloomFilterCache = "bloom-filter"
)
// NewContext creates a new statistics context

pkg/loki/modules.go
@@ -1334,7 +1334,15 @@ func (t *Loki) initIndexGateway() (services.Service, error) {
var bloomQuerier indexgateway.BloomQuerier
if t.Cfg.BloomGateway.Enabled {
bloomGatewayClient, err := bloomgateway.NewGatewayClient(t.Cfg.BloomGateway.Client, t.Overrides, prometheus.DefaultRegisterer, logger, t.Cfg.MetricsNamespace)
bloomGatewayClient, err := bloomgateway.NewGatewayClient(
t.Cfg.BloomGateway.Client,
t.Overrides,
prometheus.DefaultRegisterer,
logger,
t.Cfg.MetricsNamespace,
t.cacheGenerationLoader,
t.Cfg.CompactorConfig.RetentionEnabled,
)
if err != nil {
return nil, err
}

pkg/storage/chunk/cache/resultscache/cache.go
@@ -158,7 +158,7 @@ func (s ResultsCache) handleMiss(ctx context.Context, r Request, maxCacheTime in
return nil, nil, err
}
if !s.shouldCacheRes(ctx, r, response, maxCacheTime) {
if s.shouldCacheRes != nil && !s.shouldCacheRes(ctx, r, response, maxCacheTime) {
return response, []Extent{}, nil
}

pkg/storage/chunk/cache/resultscache/config.go
@@ -33,6 +33,10 @@ func (cfg *Config) Validate() error {
return errors.Errorf("unsupported compression type: %s", cfg.Compression)
}
if !cache.IsCacheConfigured(cfg.CacheConfig) {
return errors.New("no cache configured")
}
return nil
}

pkg/validation/limits.go
@@ -191,6 +191,7 @@ type Limits struct {
BloomNGramSkip int `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip"`
BloomFalsePositiveRate float64 `yaml:"bloom_false_positive_rate" json:"bloom_false_positive_rate"`
BloomGatewayBlocksDownloadingParallelism int `yaml:"bloom_gateway_blocks_downloading_parallelism" json:"bloom_gateway_blocks_downloading_parallelism"`
BloomGatewayCacheKeyInterval time.Duration `yaml:"bloom_gateway_cache_key_interval" json:"bloom_gateway_cache_key_interval"`
AllowStructuredMetadata bool `yaml:"allow_structured_metadata,omitempty" json:"allow_structured_metadata,omitempty" doc:"description=Allow user to send structured metadata in push payload."`
MaxStructuredMetadataSize flagext.ByteSize `yaml:"max_structured_metadata_size" json:"max_structured_metadata_size" doc:"description=Maximum size accepted for structured metadata per log line."`
@@ -313,6 +314,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.BloomNGramSkip, "bloom-compactor.ngram-skip", 0, "Skip factor for the n-grams created when computing blooms from log lines.")
f.Float64Var(&l.BloomFalsePositiveRate, "bloom-compactor.false-positive-rate", 0.01, "Scalable Bloom Filter desired false-positive rate.")
f.IntVar(&l.BloomGatewayBlocksDownloadingParallelism, "bloom-gateway.blocks-downloading-parallelism", 50, "Maximum number of blocks will be downloaded in parallel by the Bloom Gateway.")
f.DurationVar(&l.BloomGatewayCacheKeyInterval, "bloom-gateway.cache-key-interval", 15*time.Minute, "Interval for computing the cache key in the Bloom Gateway.")
l.ShardStreams = &shardstreams.Config{}
l.ShardStreams.RegisterFlagsWithPrefix("shard-streams", f)
@@ -811,6 +813,10 @@ func (o *Overrides) BloomGatewayBlocksDownloadingParallelism(userID string) int
return o.getOverridesForUser(userID).BloomGatewayBlocksDownloadingParallelism
}
func (o *Overrides) BloomGatewayCacheKeyInterval(userID string) time.Duration {
return o.getOverridesForUser(userID).BloomGatewayCacheKeyInterval
}
func (o *Overrides) BloomGatewayEnabled(userID string) bool {
return o.getOverridesForUser(userID).BloomGatewayEnabled
}
