mirror of https://github.com/grafana/loki
feat(caching): Support caching `/series` and `/labels` query results (#11539)
**What this PR does / why we need it**: Add support for caching metadata queries (both series and labels). Caching happens after splitting, similar to other types of queries. This PR adds the following configs to enable them. ``` cache_series_results: true|false (default false) cache_label_results: true|false (default false) ``` And the cache backend for them can be configured using `series_results_cache` and `label_results_cache` blocks under the `query_range` section. Currently the split interval for metadata queries is fixed and defaults to 24h; this PR makes it configurable by introducing `split_metadata_queries_by_interval` **Which issue(s) this PR fixes**: Fixes #<issue number> **Special notes for your reviewer**: **Checklist** - [x] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [x] Documentation added - [x] Tests updated - [ ] `CHANGELOG.md` updated - [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](pull/11546/head^2d10549e3ec) - [ ] If the change is deprecating or removing a configuration option, update the `deprecated-config.yaml` and `deleted-config.yaml` files respectively in the `tools/deprecated-config-checker` directory. [Example PR](0d4416a4b0) --------- Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com> Co-authored-by: Ashwanth Goli <iamashwanth@gmail.com>
parent
8d01cbf0c5
commit
ce574485cf
@ -0,0 +1,87 @@ |
||||
auth_enabled: false |
||||
|
||||
server: |
||||
http_listen_port: 3100 |
||||
grpc_listen_port: 9096 |
||||
|
||||
common: |
||||
instance_addr: 127.0.0.1 |
||||
path_prefix: /tmp/loki |
||||
storage: |
||||
filesystem: |
||||
chunks_directory: /tmp/loki/chunks |
||||
rules_directory: /tmp/loki/rules |
||||
replication_factor: 1 |
||||
ring: |
||||
kvstore: |
||||
store: inmemory |
||||
|
||||
query_range: |
||||
align_queries_with_step: true |
||||
cache_index_stats_results: true |
||||
cache_results: true |
||||
cache_volume_results: true |
||||
cache_series_results: true |
||||
series_results_cache: |
||||
cache: |
||||
default_validity: 12h |
||||
memcached_client: |
||||
consistent_hash: true |
||||
addresses: "dns+localhost:11211" |
||||
max_idle_conns: 16 |
||||
timeout: 500ms |
||||
update_interval: 1m |
||||
index_stats_results_cache: |
||||
cache: |
||||
default_validity: 12h |
||||
memcached_client: |
||||
consistent_hash: true |
||||
addresses: "dns+localhost:11211" |
||||
max_idle_conns: 16 |
||||
timeout: 500ms |
||||
update_interval: 1m |
||||
max_retries: 5 |
||||
results_cache: |
||||
cache: |
||||
default_validity: 12h |
||||
memcached_client: |
||||
consistent_hash: true |
||||
addresses: "dns+localhost:11211" |
||||
max_idle_conns: 16 |
||||
timeout: 500ms |
||||
update_interval: 1m |
||||
volume_results_cache: |
||||
cache: |
||||
default_validity: 12h |
||||
memcached_client: |
||||
consistent_hash: true |
||||
addresses: "dns+localhost:11211" |
||||
max_idle_conns: 16 |
||||
timeout: 500ms |
||||
update_interval: 1m |
||||
|
||||
schema_config: |
||||
configs: |
||||
- from: 2020-10-24 |
||||
store: tsdb |
||||
object_store: filesystem |
||||
schema: v12 |
||||
index: |
||||
prefix: index_ |
||||
period: 24h |
||||
|
||||
ruler: |
||||
alertmanager_url: http://localhost:9093 |
||||
|
||||
# By default, Loki will send anonymous, but uniquely-identifiable usage and configuration |
||||
# analytics to Grafana Labs. These statistics are sent to https://stats.grafana.org/ |
||||
# |
||||
# Statistics help us better understand how Loki is used, and they show us performance |
||||
# levels for most users. This helps us prioritize features and documentation. |
||||
# For more information on what's sent, look at |
||||
# https://github.com/grafana/loki/blob/main/pkg/analytics/stats.go |
||||
# Refer to the buildReport method to see what goes into a report. |
||||
# |
||||
# If you would like to disable reporting, uncomment the following lines: |
||||
#analytics: |
||||
# reporting_enabled: false |
||||
@ -0,0 +1,99 @@ |
||||
package queryrange |
||||
|
||||
import ( |
||||
"context" |
||||
"flag" |
||||
"fmt" |
||||
"time" |
||||
|
||||
"github.com/go-kit/log" |
||||
|
||||
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" |
||||
"github.com/grafana/loki/pkg/storage/chunk/cache" |
||||
"github.com/grafana/loki/pkg/storage/chunk/cache/resultscache" |
||||
) |
||||
|
||||
type cacheKeyLabels struct { |
||||
Limits |
||||
transformer UserIDTransformer |
||||
} |
||||
|
||||
// GenerateCacheKey generates a cache key based on the userID, split duration and the interval of the request.
|
||||
// It also includes the label name and the provided query for label values request.
|
||||
func (i cacheKeyLabels) GenerateCacheKey(ctx context.Context, userID string, r resultscache.Request) string { |
||||
lr := r.(*LabelRequest) |
||||
split := i.MetadataQuerySplitDuration(userID) |
||||
|
||||
var currentInterval int64 |
||||
if denominator := int64(split / time.Millisecond); denominator > 0 { |
||||
currentInterval = lr.GetStart().UnixMilli() / denominator |
||||
} |
||||
|
||||
if i.transformer != nil { |
||||
userID = i.transformer(ctx, userID) |
||||
} |
||||
|
||||
if lr.GetValues() { |
||||
return fmt.Sprintf("labelvalues:%s:%s:%s:%d:%d", userID, lr.GetName(), lr.GetQuery(), currentInterval, split) |
||||
} |
||||
|
||||
return fmt.Sprintf("labels:%s:%d:%d", userID, currentInterval, split) |
||||
} |
||||
|
||||
type labelsExtractor struct{} |
||||
|
||||
// Extract extracts the labels response for the specific time range.
|
||||
// It is a no-op since it is not possible to partition the labels data by time range as it is just a slice of strings.
|
||||
func (p labelsExtractor) Extract(_, _ int64, res resultscache.Response, _, _ int64) resultscache.Response { |
||||
return res |
||||
} |
||||
|
||||
func (p labelsExtractor) ResponseWithoutHeaders(resp queryrangebase.Response) queryrangebase.Response { |
||||
labelsResp := resp.(*LokiLabelNamesResponse) |
||||
return &LokiLabelNamesResponse{ |
||||
Status: labelsResp.Status, |
||||
Data: labelsResp.Data, |
||||
Version: labelsResp.Version, |
||||
Statistics: labelsResp.Statistics, |
||||
} |
||||
} |
||||
|
||||
type LabelsCacheConfig struct { |
||||
queryrangebase.ResultsCacheConfig `yaml:",inline"` |
||||
} |
||||
|
||||
// RegisterFlags registers flags.
|
||||
func (cfg *LabelsCacheConfig) RegisterFlags(f *flag.FlagSet) { |
||||
cfg.RegisterFlagsWithPrefix(f, "frontend.label-results-cache.") |
||||
} |
||||
|
||||
func (cfg *LabelsCacheConfig) Validate() error { |
||||
return cfg.ResultsCacheConfig.Validate() |
||||
} |
||||
|
||||
func NewLabelsCacheMiddleware( |
||||
logger log.Logger, |
||||
limits Limits, |
||||
merger queryrangebase.Merger, |
||||
c cache.Cache, |
||||
cacheGenNumberLoader queryrangebase.CacheGenNumberLoader, |
||||
shouldCache queryrangebase.ShouldCacheFn, |
||||
parallelismForReq queryrangebase.ParallelismForReqFn, |
||||
retentionEnabled bool, |
||||
transformer UserIDTransformer, |
||||
metrics *queryrangebase.ResultsCacheMetrics, |
||||
) (queryrangebase.Middleware, error) { |
||||
return queryrangebase.NewResultsCacheMiddleware( |
||||
logger, |
||||
c, |
||||
cacheKeyLabels{limits, transformer}, |
||||
limits, |
||||
merger, |
||||
labelsExtractor{}, |
||||
cacheGenNumberLoader, |
||||
shouldCache, |
||||
parallelismForReq, |
||||
retentionEnabled, |
||||
metrics, |
||||
) |
||||
} |
||||
@ -0,0 +1,251 @@ |
||||
package queryrange |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/go-kit/log" |
||||
"github.com/grafana/dskit/user" |
||||
"github.com/stretchr/testify/require" |
||||
|
||||
"github.com/grafana/loki/pkg/loghttp" |
||||
"github.com/grafana/loki/pkg/logproto" |
||||
"github.com/grafana/loki/pkg/logqlmodel/stats" |
||||
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" |
||||
"github.com/grafana/loki/pkg/storage/chunk/cache" |
||||
"github.com/grafana/loki/pkg/util" |
||||
) |
||||
|
||||
func TestCacheKeyLabels_GenerateCacheKey(t *testing.T) { |
||||
k := cacheKeyLabels{ |
||||
transformer: nil, |
||||
Limits: fakeLimits{ |
||||
metadataSplitDuration: map[string]time.Duration{ |
||||
"fake": time.Hour, |
||||
}, |
||||
}, |
||||
} |
||||
|
||||
from, through := util.RoundToMilliseconds(testTime, testTime.Add(1*time.Hour)) |
||||
start := from.Time() |
||||
end := through.Time() |
||||
|
||||
req := LabelRequest{ |
||||
LabelRequest: logproto.LabelRequest{ |
||||
Start: &start, |
||||
End: &end, |
||||
}, |
||||
} |
||||
|
||||
expectedInterval := testTime.UnixMilli() / time.Hour.Milliseconds() |
||||
|
||||
t.Run("labels", func(t *testing.T) { |
||||
require.Equal(t, fmt.Sprintf(`labels:fake:%d:%d`, expectedInterval, time.Hour.Nanoseconds()), k.GenerateCacheKey(context.Background(), "fake", &req)) |
||||
}) |
||||
|
||||
t.Run("label values", func(t *testing.T) { |
||||
req := req |
||||
req.Name = "foo" |
||||
req.Values = true |
||||
require.Equal(t, fmt.Sprintf(`labelvalues:fake:foo::%d:%d`, expectedInterval, time.Hour.Nanoseconds()), k.GenerateCacheKey(context.Background(), "fake", &req)) |
||||
|
||||
req.Query = `{cluster="eu-west1"}` |
||||
require.Equal(t, fmt.Sprintf(`labelvalues:fake:foo:{cluster="eu-west1"}:%d:%d`, expectedInterval, time.Hour.Nanoseconds()), k.GenerateCacheKey(context.Background(), "fake", &req)) |
||||
}) |
||||
} |
||||
|
||||
func TestLabelsCache(t *testing.T) { |
||||
setupCacheMW := func() queryrangebase.Middleware { |
||||
cacheMiddleware, err := NewLabelsCacheMiddleware( |
||||
log.NewNopLogger(), |
||||
fakeLimits{ |
||||
metadataSplitDuration: map[string]time.Duration{ |
||||
"fake": 24 * time.Hour, |
||||
}, |
||||
}, |
||||
DefaultCodec, |
||||
cache.NewMockCache(), |
||||
nil, |
||||
nil, |
||||
func(_ context.Context, _ []string, _ queryrangebase.Request) int { |
||||
return 1 |
||||
}, |
||||
false, |
||||
nil, |
||||
nil, |
||||
) |
||||
require.NoError(t, err) |
||||
return cacheMiddleware |
||||
} |
||||
|
||||
cacheMiddleware := setupCacheMW() |
||||
for _, values := range []bool{false, true} { |
||||
prefix := "labels" |
||||
if values { |
||||
prefix = "label values" |
||||
} |
||||
t.Run(prefix+": cache the response for the same request", func(t *testing.T) { |
||||
start := testTime.Truncate(time.Millisecond) |
||||
end := start.Add(time.Hour) |
||||
|
||||
labelsReq := LabelRequest{ |
||||
LabelRequest: logproto.LabelRequest{ |
||||
Start: &start, |
||||
End: &end, |
||||
}, |
||||
} |
||||
|
||||
if values { |
||||
labelsReq.Values = true |
||||
labelsReq.Name = "foo" |
||||
labelsReq.Query = `{cluster="eu-west1"}` |
||||
} |
||||
|
||||
labelsResp := &LokiLabelNamesResponse{ |
||||
Status: "success", |
||||
Version: uint32(loghttp.VersionV1), |
||||
Data: []string{"bar", "buzz"}, |
||||
Statistics: stats.Result{ |
||||
Summary: stats.Summary{ |
||||
Splits: 1, |
||||
}, |
||||
}, |
||||
} |
||||
|
||||
called := 0 |
||||
handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { |
||||
called++ |
||||
|
||||
// should request the entire length with no partitioning as nothing is cached yet.
|
||||
require.Equal(t, labelsReq.GetStart(), r.GetStart()) |
||||
require.Equal(t, labelsReq.GetEnd(), r.GetEnd()) |
||||
|
||||
got := r.(*LabelRequest) |
||||
require.Equal(t, labelsReq.GetName(), got.GetName()) |
||||
require.Equal(t, labelsReq.GetValues(), got.GetValues()) |
||||
require.Equal(t, labelsReq.GetQuery(), got.GetQuery()) |
||||
|
||||
return labelsResp, nil |
||||
})) |
||||
|
||||
ctx := user.InjectOrgID(context.Background(), "fake") |
||||
got, err := handler.Do(ctx, &labelsReq) |
||||
require.NoError(t, err) |
||||
require.Equal(t, 1, called) // called actual handler, as not cached.
|
||||
require.Equal(t, labelsResp, got) |
||||
|
||||
// Doing same request again shouldn't change anything.
|
||||
called = 0 |
||||
got, err = handler.Do(ctx, &labelsReq) |
||||
require.NoError(t, err) |
||||
require.Equal(t, 0, called) |
||||
require.Equal(t, labelsResp, got) |
||||
}) |
||||
} |
||||
|
||||
// reset cacheMiddleware
|
||||
cacheMiddleware = setupCacheMW() |
||||
for _, values := range []bool{false, true} { |
||||
prefix := "labels" |
||||
if values { |
||||
prefix = "label values" |
||||
} |
||||
t.Run(prefix+": a new request with overlapping time range should reuse part of the previous request for the overlap", func(t *testing.T) { |
||||
cacheMiddleware := setupCacheMW() |
||||
|
||||
start := testTime.Truncate(time.Millisecond) |
||||
end := start.Add(time.Hour) |
||||
|
||||
labelsReq1 := LabelRequest{ |
||||
LabelRequest: logproto.LabelRequest{ |
||||
Start: &start, |
||||
End: &end, |
||||
}, |
||||
} |
||||
|
||||
if values { |
||||
labelsReq1.Values = true |
||||
labelsReq1.Name = "foo" |
||||
labelsReq1.Query = `{cluster="eu-west1"}` |
||||
} |
||||
|
||||
labelsResp1 := &LokiLabelNamesResponse{ |
||||
Status: "success", |
||||
Version: uint32(loghttp.VersionV1), |
||||
Data: []string{"bar", "buzz"}, |
||||
Statistics: stats.Result{ |
||||
Summary: stats.Summary{ |
||||
Splits: 1, |
||||
}, |
||||
}, |
||||
} |
||||
|
||||
called := 0 |
||||
handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { |
||||
called++ |
||||
|
||||
// should request the entire length with no partitioning as nothing is cached yet.
|
||||
require.Equal(t, labelsReq1.GetStart(), r.GetStart()) |
||||
require.Equal(t, labelsReq1.GetEnd(), r.GetEnd()) |
||||
|
||||
got := r.(*LabelRequest) |
||||
require.Equal(t, labelsReq1.GetName(), got.GetName()) |
||||
require.Equal(t, labelsReq1.GetValues(), got.GetValues()) |
||||
require.Equal(t, labelsReq1.GetQuery(), got.GetQuery()) |
||||
|
||||
return labelsResp1, nil |
||||
})) |
||||
|
||||
ctx := user.InjectOrgID(context.Background(), "fake") |
||||
got, err := handler.Do(ctx, &labelsReq1) |
||||
require.NoError(t, err) |
||||
require.Equal(t, 1, called) |
||||
require.Equal(t, labelsResp1, got) |
||||
|
||||
labelsReq2 := labelsReq1.WithStartEnd(labelsReq1.GetStart().Add(15*time.Minute), labelsReq1.GetEnd().Add(15*time.Minute)) |
||||
|
||||
called = 0 |
||||
handler = cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { |
||||
called++ |
||||
|
||||
// make downstream request only for the non-overlapping portion of the query.
|
||||
require.Equal(t, labelsReq1.GetEnd(), r.GetStart()) |
||||
require.Equal(t, labelsReq2.GetEnd(), r.GetEnd()) |
||||
|
||||
got := r.(*LabelRequest) |
||||
require.Equal(t, labelsReq1.GetName(), got.GetName()) |
||||
require.Equal(t, labelsReq1.GetValues(), got.GetValues()) |
||||
require.Equal(t, labelsReq1.GetQuery(), got.GetQuery()) |
||||
|
||||
return &LokiLabelNamesResponse{ |
||||
Status: "success", |
||||
Version: uint32(loghttp.VersionV1), |
||||
Data: []string{"fizz"}, |
||||
Statistics: stats.Result{ |
||||
Summary: stats.Summary{ |
||||
Splits: 1, |
||||
}, |
||||
}, |
||||
}, nil |
||||
})) |
||||
|
||||
got, err = handler.Do(ctx, labelsReq2) |
||||
require.NoError(t, err) |
||||
require.Equal(t, 1, called) |
||||
// two splits as we merge the results from the extent and downstream request
|
||||
labelsResp1.Statistics.Summary.Splits = 2 |
||||
require.Equal(t, &LokiLabelNamesResponse{ |
||||
Status: "success", |
||||
Version: uint32(loghttp.VersionV1), |
||||
Data: []string{"bar", "buzz", "fizz"}, |
||||
Statistics: stats.Result{ |
||||
Summary: stats.Summary{ |
||||
Splits: 2, |
||||
}, |
||||
}, |
||||
}, got) |
||||
}) |
||||
} |
||||
} |
||||
@ -0,0 +1,100 @@ |
||||
package queryrange |
||||
|
||||
import ( |
||||
"context" |
||||
"flag" |
||||
"fmt" |
||||
"sort" |
||||
strings "strings" |
||||
"time" |
||||
|
||||
"github.com/go-kit/log" |
||||
|
||||
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" |
||||
"github.com/grafana/loki/pkg/storage/chunk/cache" |
||||
"github.com/grafana/loki/pkg/storage/chunk/cache/resultscache" |
||||
) |
||||
|
||||
type cacheKeySeries struct { |
||||
Limits |
||||
transformer UserIDTransformer |
||||
} |
||||
|
||||
// GenerateCacheKey generates a cache key based on the userID, matchers, split duration and the interval of the request.
|
||||
func (i cacheKeySeries) GenerateCacheKey(ctx context.Context, userID string, r resultscache.Request) string { |
||||
sr := r.(*LokiSeriesRequest) |
||||
split := i.MetadataQuerySplitDuration(userID) |
||||
|
||||
var currentInterval int64 |
||||
if denominator := int64(split / time.Millisecond); denominator > 0 { |
||||
currentInterval = sr.GetStart().UnixMilli() / denominator |
||||
} |
||||
|
||||
if i.transformer != nil { |
||||
userID = i.transformer(ctx, userID) |
||||
} |
||||
|
||||
matchers := sr.GetMatch() |
||||
sort.Strings(matchers) |
||||
matcherStr := strings.Join(matchers, ",") |
||||
|
||||
return fmt.Sprintf("series:%s:%s:%d:%d", userID, matcherStr, currentInterval, split) |
||||
} |
||||
|
||||
type seriesExtractor struct{} |
||||
|
||||
// Extract extracts the series response for the specific time range.
|
||||
// It is a no-op since it is not possible to partition the series data by time range as it is just a list of kv pairs.
|
||||
func (p seriesExtractor) Extract(_, _ int64, res resultscache.Response, _, _ int64) resultscache.Response { |
||||
return res |
||||
} |
||||
|
||||
func (p seriesExtractor) ResponseWithoutHeaders(resp queryrangebase.Response) queryrangebase.Response { |
||||
seriesRes := resp.(*LokiSeriesResponse) |
||||
return &LokiSeriesResponse{ |
||||
Data: seriesRes.Data, |
||||
Status: seriesRes.Status, |
||||
Version: seriesRes.Version, |
||||
Statistics: seriesRes.Statistics, |
||||
} |
||||
} |
||||
|
||||
type SeriesCacheConfig struct { |
||||
queryrangebase.ResultsCacheConfig `yaml:",inline"` |
||||
} |
||||
|
||||
// RegisterFlags registers flags.
|
||||
func (cfg *SeriesCacheConfig) RegisterFlags(f *flag.FlagSet) { |
||||
cfg.RegisterFlagsWithPrefix(f, "frontend.series-results-cache.") |
||||
} |
||||
|
||||
func (cfg *SeriesCacheConfig) Validate() error { |
||||
return cfg.ResultsCacheConfig.Validate() |
||||
} |
||||
|
||||
func NewSeriesCacheMiddleware( |
||||
logger log.Logger, |
||||
limits Limits, |
||||
merger queryrangebase.Merger, |
||||
c cache.Cache, |
||||
cacheGenNumberLoader queryrangebase.CacheGenNumberLoader, |
||||
shouldCache queryrangebase.ShouldCacheFn, |
||||
parallelismForReq queryrangebase.ParallelismForReqFn, |
||||
retentionEnabled bool, |
||||
transformer UserIDTransformer, |
||||
metrics *queryrangebase.ResultsCacheMetrics, |
||||
) (queryrangebase.Middleware, error) { |
||||
return queryrangebase.NewResultsCacheMiddleware( |
||||
logger, |
||||
c, |
||||
cacheKeySeries{limits, transformer}, |
||||
limits, |
||||
merger, |
||||
seriesExtractor{}, |
||||
cacheGenNumberLoader, |
||||
shouldCache, |
||||
parallelismForReq, |
||||
retentionEnabled, |
||||
metrics, |
||||
) |
||||
} |
||||
@ -0,0 +1,314 @@ |
||||
package queryrange |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/go-kit/log" |
||||
"github.com/grafana/dskit/user" |
||||
"github.com/stretchr/testify/require" |
||||
|
||||
"github.com/grafana/loki/pkg/loghttp" |
||||
|
||||
"github.com/grafana/loki/pkg/logproto" |
||||
"github.com/grafana/loki/pkg/logqlmodel/stats" |
||||
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" |
||||
"github.com/grafana/loki/pkg/storage/chunk/cache" |
||||
"github.com/grafana/loki/pkg/util" |
||||
) |
||||
|
||||
var ( |
||||
seriesAPIPath = "/loki/api/v1/series" |
||||
) |
||||
|
||||
func TestCacheKeySeries_GenerateCacheKey(t *testing.T) { |
||||
k := cacheKeySeries{ |
||||
transformer: nil, |
||||
Limits: fakeLimits{ |
||||
metadataSplitDuration: map[string]time.Duration{ |
||||
"fake": time.Hour, |
||||
}, |
||||
}, |
||||
} |
||||
|
||||
from, through := util.RoundToMilliseconds(testTime, testTime.Add(1*time.Hour)) |
||||
req := &LokiSeriesRequest{ |
||||
StartTs: from.Time(), |
||||
EndTs: through.Time(), |
||||
Match: []string{`{namespace="prod"}`, `{service="foo"}`}, |
||||
Path: seriesAPIPath, |
||||
} |
||||
|
||||
expectedInterval := testTime.UnixMilli() / time.Hour.Milliseconds() |
||||
require.Equal(t, fmt.Sprintf(`series:fake:{namespace="prod"},{service="foo"}:%d:%d`, expectedInterval, time.Hour.Nanoseconds()), k.GenerateCacheKey(context.Background(), "fake", req)) |
||||
|
||||
t.Run("same set of matchers in any order should result in the same cache key", func(t *testing.T) { |
||||
from, through := util.RoundToMilliseconds(testTime, testTime.Add(1*time.Hour)) |
||||
|
||||
for _, matchers := range [][]string{ |
||||
{`{cluster="us-central"}`, `{namespace="prod"}`, `{service=~"foo.*"}`}, |
||||
{`{namespace="prod"}`, `{service=~"foo.*"}`, `{cluster="us-central"}`}, |
||||
} { |
||||
req := &LokiSeriesRequest{ |
||||
StartTs: from.Time(), |
||||
EndTs: through.Time(), |
||||
Match: matchers, |
||||
Path: seriesAPIPath, |
||||
} |
||||
expectedInterval := testTime.UnixMilli() / time.Hour.Milliseconds() |
||||
require.Equal(t, fmt.Sprintf(`series:fake:{cluster="us-central"},{namespace="prod"},{service=~"foo.*"}:%d:%d`, expectedInterval, time.Hour.Nanoseconds()), k.GenerateCacheKey(context.Background(), "fake", req)) |
||||
} |
||||
|
||||
}) |
||||
} |
||||
|
||||
func TestSeriesCache(t *testing.T) { |
||||
setupCacheMW := func() queryrangebase.Middleware { |
||||
cacheMiddleware, err := NewSeriesCacheMiddleware( |
||||
log.NewNopLogger(), |
||||
fakeLimits{ |
||||
metadataSplitDuration: map[string]time.Duration{ |
||||
"fake": 24 * time.Hour, |
||||
}, |
||||
}, |
||||
DefaultCodec, |
||||
cache.NewMockCache(), |
||||
nil, |
||||
nil, |
||||
func(_ context.Context, _ []string, _ queryrangebase.Request) int { |
||||
return 1 |
||||
}, |
||||
false, |
||||
nil, |
||||
nil, |
||||
) |
||||
require.NoError(t, err) |
||||
|
||||
return cacheMiddleware |
||||
} |
||||
|
||||
t.Run("caches the response for the same request", func(t *testing.T) { |
||||
cacheMiddleware := setupCacheMW() |
||||
from, through := util.RoundToMilliseconds(testTime, testTime.Add(1*time.Hour)) |
||||
|
||||
seriesReq := &LokiSeriesRequest{ |
||||
StartTs: from.Time(), |
||||
EndTs: through.Time(), |
||||
Match: []string{`{namespace=~".*"}`}, |
||||
Path: seriesAPIPath, |
||||
} |
||||
|
||||
seriesResp := &LokiSeriesResponse{ |
||||
Status: "success", |
||||
Version: uint32(loghttp.VersionV1), |
||||
Data: []logproto.SeriesIdentifier{ |
||||
{ |
||||
Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "eu-west"}, {Key: "namespace", Value: "prod"}}, |
||||
}, |
||||
}, |
||||
Statistics: stats.Result{ |
||||
Summary: stats.Summary{ |
||||
Splits: 1, |
||||
}, |
||||
}, |
||||
} |
||||
|
||||
called := 0 |
||||
handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { |
||||
called++ |
||||
|
||||
// should request the entire length with no partitioning as nothing is cached yet.
|
||||
require.Equal(t, seriesReq.GetStart(), r.GetStart()) |
||||
require.Equal(t, seriesReq.GetEnd(), r.GetEnd()) |
||||
|
||||
return seriesResp, nil |
||||
})) |
||||
|
||||
ctx := user.InjectOrgID(context.Background(), "fake") |
||||
got, err := handler.Do(ctx, seriesReq) |
||||
require.NoError(t, err) |
||||
require.Equal(t, 1, called) // called actual handler, as not cached.
|
||||
require.Equal(t, seriesResp, got) |
||||
|
||||
// Doing same request again shouldn't change anything.
|
||||
called = 0 |
||||
got, err = handler.Do(ctx, seriesReq) |
||||
require.NoError(t, err) |
||||
require.Equal(t, 0, called) |
||||
require.Equal(t, seriesResp, got) |
||||
}) |
||||
|
||||
t.Run("a new request with overlapping time range should reuse part of the previous request for the overlap", func(t *testing.T) { |
||||
cacheMiddleware := setupCacheMW() |
||||
|
||||
from, through := util.RoundToMilliseconds(testTime, testTime.Add(1*time.Hour)) |
||||
req1 := &LokiSeriesRequest{ |
||||
StartTs: from.Time(), |
||||
EndTs: through.Time(), |
||||
Match: []string{`{namespace=~".*"}`}, |
||||
Path: seriesAPIPath, |
||||
} |
||||
resp1 := &LokiSeriesResponse{ |
||||
Status: "success", |
||||
Version: uint32(loghttp.VersionV1), |
||||
Data: []logproto.SeriesIdentifier{ |
||||
{ |
||||
Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "us-central"}, {Key: "namespace", Value: "dev"}}, |
||||
}, |
||||
{ |
||||
Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "eu-west"}, {Key: "namespace", Value: "prod"}}, |
||||
}, |
||||
}, |
||||
Statistics: stats.Result{ |
||||
Summary: stats.Summary{ |
||||
Splits: 1, |
||||
}, |
||||
}, |
||||
} |
||||
|
||||
called := 0 |
||||
handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { |
||||
called++ |
||||
|
||||
// should request the entire length with no partitioning as nothing is cached yet.
|
||||
require.Equal(t, req1.GetStart(), r.GetStart()) |
||||
require.Equal(t, req1.GetEnd(), r.GetEnd()) |
||||
|
||||
return resp1, nil |
||||
})) |
||||
|
||||
ctx := user.InjectOrgID(context.Background(), "fake") |
||||
got, err := handler.Do(ctx, req1) |
||||
require.NoError(t, err) |
||||
require.Equal(t, 1, called) |
||||
require.Equal(t, resp1, got) |
||||
|
||||
req2 := req1.WithStartEnd(req1.GetStart().Add(15*time.Minute), req1.GetEnd().Add(15*time.Minute)) |
||||
|
||||
called = 0 |
||||
handler = cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { |
||||
called++ |
||||
|
||||
// make downstream request only for the non-overlapping portion of the query.
|
||||
require.Equal(t, req1.GetEnd(), r.GetStart()) |
||||
require.Equal(t, req1.GetEnd().Add(15*time.Minute), r.GetEnd()) |
||||
|
||||
return &LokiSeriesResponse{ |
||||
Status: "success", |
||||
Version: uint32(loghttp.VersionV1), |
||||
Data: []logproto.SeriesIdentifier{ |
||||
{ |
||||
Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "us-central"}, {Key: "namespace", Value: "prod"}}, |
||||
}, |
||||
}, |
||||
Statistics: stats.Result{ |
||||
Summary: stats.Summary{ |
||||
Splits: 1, |
||||
}, |
||||
}, |
||||
}, nil |
||||
})) |
||||
|
||||
got, err = handler.Do(ctx, req2) |
||||
require.NoError(t, err) |
||||
require.Equal(t, 1, called) |
||||
// two splits as we merge the results from the extent and downstream request
|
||||
resp1.Statistics.Summary.Splits = 2 |
||||
require.Equal(t, &LokiSeriesResponse{ |
||||
Status: "success", |
||||
Version: uint32(loghttp.VersionV1), |
||||
Data: []logproto.SeriesIdentifier{ |
||||
{ |
||||
Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "us-central"}, {Key: "namespace", Value: "dev"}}, |
||||
}, |
||||
{ |
||||
Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "eu-west"}, {Key: "namespace", Value: "prod"}}, |
||||
}, |
||||
{ |
||||
Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "us-central"}, {Key: "namespace", Value: "prod"}}, |
||||
}, |
||||
}, |
||||
Statistics: stats.Result{ |
||||
Summary: stats.Summary{ |
||||
Splits: 2, |
||||
}, |
||||
}, |
||||
}, got) |
||||
}) |
||||
|
||||
t.Run("caches are only valid for the same request parameters", func(t *testing.T) { |
||||
cacheMiddleware := setupCacheMW() |
||||
|
||||
from, through := util.RoundToMilliseconds(testTime, testTime.Add(1*time.Hour)) |
||||
seriesReq := &LokiSeriesRequest{ |
||||
StartTs: from.Time(), |
||||
EndTs: through.Time(), |
||||
Match: []string{`{namespace=~".*"}`}, |
||||
Path: seriesAPIPath, |
||||
} |
||||
seriesResp := &LokiSeriesResponse{ |
||||
Status: "success", |
||||
Version: uint32(loghttp.VersionV1), |
||||
Data: []logproto.SeriesIdentifier{ |
||||
{ |
||||
Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "eu-west"}, {Key: "namespace", Value: "prod"}}, |
||||
}, |
||||
}, |
||||
Statistics: stats.Result{ |
||||
Summary: stats.Summary{ |
||||
Splits: 1, |
||||
}, |
||||
}, |
||||
} |
||||
|
||||
called := 0 |
||||
handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { |
||||
called++ |
||||
|
||||
// should request the entire length as none of the subsequent queries hit the cache.
|
||||
require.Equal(t, seriesReq.GetStart(), r.GetStart()) |
||||
require.Equal(t, seriesReq.GetEnd(), r.GetEnd()) |
||||
return seriesResp, nil |
||||
})) |
||||
|
||||
// initial call to fill cache
|
||||
ctx := user.InjectOrgID(context.Background(), "fake") |
||||
_, err := handler.Do(ctx, seriesReq) |
||||
require.NoError(t, err) |
||||
require.Equal(t, 1, called) |
||||
|
||||
type testCase struct { |
||||
fn func(*LokiSeriesRequest) |
||||
user string |
||||
} |
||||
testCases := map[string]testCase{ |
||||
"different match": { |
||||
fn: func(req *LokiSeriesRequest) { |
||||
req.Match = append(req.Match, `{foo="bar"}`) |
||||
}, |
||||
}, |
||||
"different user": { |
||||
user: "fake2s", |
||||
}, |
||||
} |
||||
|
||||
for name, tc := range testCases { |
||||
called = 0 |
||||
seriesReq := seriesReq |
||||
|
||||
if tc.fn != nil { |
||||
tc.fn(seriesReq) |
||||
} |
||||
|
||||
if tc.user != "" { |
||||
ctx = user.InjectOrgID(context.Background(), tc.user) |
||||
} |
||||
|
||||
_, err = handler.Do(ctx, seriesReq) |
||||
require.NoError(t, err) |
||||
require.Equal(t, 1, called, name) |
||||
} |
||||
}) |
||||
} |
||||
Loading…
Reference in new issue