feat: Support split align and caching for instant metric query results (#11814)

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>
Kaviraj Kanagaraj (committed by GitHub)
parent b7cb85f92b
commit fac5997b18
27 changed files (changed line counts in parentheses):
  1. .gitignore (4)
  2. CHANGELOG.md (3)
  3. cmd/loki/loki-local-with-memcached.yaml (11)
  4. docs/sources/configure/_index.md (30)
  5. pkg/logql/downstream.go (4)
  6. pkg/logql/metrics.go (17)
  7. pkg/logql/rangemapper.go (85)
  8. pkg/logql/rangemapper_test.go (84)
  9. pkg/logqlmodel/stats/context.go (49)
  10. pkg/logqlmodel/stats/stats.pb.go (230)
  11. pkg/logqlmodel/stats/stats.proto (4)
  12. pkg/loki/config_wrapper.go (7)
  13. pkg/loki/config_wrapper_test.go (43)
  14. pkg/querier/queryrange/codec_test.go (110)
  15. pkg/querier/queryrange/downstreamer.go (43)
  16. pkg/querier/queryrange/downstreamer_test.go (181)
  17. pkg/querier/queryrange/instant_metric_cache.go (85)
  18. pkg/querier/queryrange/limits.go (9)
  19. pkg/querier/queryrange/limits/definitions.go (1)
  20. pkg/querier/queryrange/prometheus_test.go (10)
  21. pkg/querier/queryrange/roundtrip.go (104)
  22. pkg/querier/queryrange/roundtrip_test.go (8)
  23. pkg/querier/queryrange/split_by_range.go (33)
  24. pkg/querier/queryrange/split_by_range_test.go (332)
  25. pkg/util/marshal/legacy/marshal_test.go (12)
  26. pkg/util/marshal/marshal_test.go (22)
  27. pkg/validation/limits.go (8)

.gitignore (vendored)

@ -27,8 +27,8 @@ cmd/querytee/querytee
dlv
rootfs/
dist
coverage.txt
test_results.txt
*coverage.txt
*test_results.txt
.DS_Store
.aws-sam
.idea

@ -6,6 +6,7 @@
##### Enhancements
* [11814](https://github.com/grafana/loki/pull/11814) **kavirajk**: feat: Support split align and caching for instant metric query results
* [11851](https://github.com/grafana/loki/pull/11851) **elcomtik**: Helm: Allow the definition of resources for GrafanaAgent pods.
* [11819](https://github.com/grafana/loki/pull/11819) **jburnham**: Ruler: Add the ability to disable the `X-Scope-OrgId` tenant identification header in remote write requests.
* [11633](https://github.com/grafana/loki/pull/11633) **cyriltovena**: Add profiling integrations to tracing instrumentation.
@ -70,7 +71,7 @@
* [11657](https://github.com/grafana/loki/pull/11657) **ashwanthgoli** Log results cache: compose empty response based on the request being served to avoid returning incorrect limit or direction.
* [11587](https://github.com/grafana/loki/pull/11587) **trevorwhitney** Fix semantics of label parsing logic of metrics and logs queries. Both only parse the first label if multiple extractions into the same label are requested.
* [11776](https://github.com/grafana/loki/pull/11776) **ashwanthgoli** Background Cache: Fixes a bug that is causing the background queue size to be incremented twice for each enqueued item.
* [11921](https://github.com/grafana/loki/pull/11921) **paul1r**: Parsing: String array elements were not being parsed correctly in JSON processing
##### Changes

@ -22,6 +22,17 @@ query_range:
cache_results: true
cache_volume_results: true
cache_series_results: true
cache_instant_metric_results: true
instant_metric_query_split_align: true
instant_metric_results_cache:
cache:
default_validity: 12h
memcached_client:
consistent_hash: true
addresses: "dns+localhost:11211"
max_idle_conns: 16
timeout: 500ms
update_interval: 1m
series_results_cache:
cache:
default_validity: 12h

@ -886,6 +886,28 @@ volume_results_cache:
# CLI flag: -frontend.volume-results-cache.compression
[compression: <string> | default = ""]
# Cache instant metric query results.
# CLI flag: -querier.cache-instant-metric-results
[cache_instant_metric_results: <boolean> | default = false]
# If a cache config is not specified and cache_instant_metric_results is true,
# the config for the results cache is used.
instant_metric_results_cache:
# The cache block configures the cache backend.
# The CLI flags prefix for this block configuration is:
# frontend.instant-metric-results-cache
[cache: <cache_config>]
# Use compression in cache. The default is an empty value '', which disables
# compression. Supported values are: 'snappy' and ''.
# CLI flag: -frontend.instant-metric-results-cache.compression
[compression: <string> | default = ""]
# Whether to align the splits of an instant metric query with splitByInterval
# and the query's execution time. Useful when cache_instant_metric_results is
# enabled.
# CLI flag: -querier.instant-metric-query-split-align
[instant_metric_query_split_align: <boolean> | default = false]
# Cache series query results.
# CLI flag: -querier.cache-series-results
[cache_series_results: <boolean> | default = false]
@ -2935,6 +2957,13 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -experimental.querier.recent-metadata-query-window
[recent_metadata_query_window: <duration> | default = 0s]
# Split instant metric queries by a time interval and execute in parallel. The
# value 0 disables splitting instant metric queries by time. This also
# determines how cache keys are chosen when instant metric query result caching
# is enabled.
# CLI flag: -querier.split-instant-metric-queries-by-interval
[split_instant_metric_queries_by_interval: <duration> | default = 1h]
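# For example, with the default 1h interval a 3h instant metric query fans
# out into ceil(3h / 1h) = 3 subqueries. With
# instant_metric_query_split_align enabled and an execution time of 12:34 it
# becomes four (34m + 1h + 1h + 26m), so the hour-aligned middle pieces can
# be served from the cache.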
# Interval to use for time-based splitting when a request is within the
# `query_ingesters_within` window; set to 0 to fall back to
# `split-queries-by-interval`.
@ -4403,6 +4432,7 @@ The cache block configures the cache backend. The supported CLI flags `<prefix>`
- `bloom.metas-cache`
- `frontend`
- `frontend.index-stats-results-cache`
- `frontend.instant-metric-results-cache`
- `frontend.label-results-cache`
- `frontend.series-results-cache`
- `frontend.volume-results-cache`

@ -636,6 +636,10 @@ func NewResultStepEvaluator(res logqlmodel.Result, params Params) (StepEvaluator
step = params.Step()
)
if res.Data == nil {
return nil, fmt.Errorf("result data (res.Data) is nil and cannot be processed by the step evaluator")
}
switch data := res.Data.(type) {
case promql.Vector:
return NewVectorStepEvaluator(start, data), nil

@ -94,7 +94,8 @@ func RecordRangeAndInstantQueryMetrics(
) {
var (
logger = fixLogger(ctx, log)
rt = string(GetRangeType(p))
rangeType = GetRangeType(p)
rt = string(rangeType)
latencyType = latencyTypeFast
returnedLines = 0
)
@ -103,6 +104,12 @@ func RecordRangeAndInstantQueryMetrics(
level.Warn(logger).Log("msg", "error parsing query type", "err", err)
}
resultCache := stats.Caches.Result
if queryType == QueryTypeMetric && rangeType == InstantType {
resultCache = stats.Caches.InstantMetricResult
}
// Tag throughput metric by latency type based on a threshold.
// Latency below the threshold is fast, above is slow.
if stats.Summary.ExecTime > slowQueryThresholdSecond {
@ -162,10 +169,10 @@ func RecordRangeAndInstantQueryMetrics(
"cache_volume_results_req", stats.Caches.VolumeResult.EntriesRequested,
"cache_volume_results_hit", stats.Caches.VolumeResult.EntriesFound,
"cache_volume_results_download_time", stats.Caches.VolumeResult.CacheDownloadTime(),
"cache_result_req", stats.Caches.Result.EntriesRequested,
"cache_result_hit", stats.Caches.Result.EntriesFound,
"cache_result_download_time", stats.Caches.Result.CacheDownloadTime(),
"cache_result_query_length_served", stats.Caches.Result.CacheQueryLengthServed(),
"cache_result_req", resultCache.EntriesRequested,
"cache_result_hit", resultCache.EntriesFound,
"cache_result_download_time", resultCache.CacheDownloadTime(),
"cache_result_query_length_served", resultCache.CacheQueryLengthServed(),
}...)
logValues = append(logValues, tagsToKeyValues(queryTags)...)

@ -57,6 +57,20 @@ type RangeMapper struct {
splitByInterval time.Duration
metrics *MapperMetrics
stats *MapperStats
splitAlignTs time.Time
}
// NewRangeMapperWithSplitAlign is similar to `NewRangeMapper` except that it accepts an additional `splitAlign` argument, used to
// align the generated subqueries accordingly. See the `rangeSplitAlign` method for more information.
func NewRangeMapperWithSplitAlign(interval time.Duration, splitAlign time.Time, metrics *MapperMetrics, stats *MapperStats) (RangeMapper, error) {
rm, err := NewRangeMapper(interval, metrics, stats)
if err != nil {
return RangeMapper{}, err
}
rm.splitAlignTs = splitAlign
return rm, nil
}
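A minimal usage sketch (not a verbatim excerpt; it reuses the package's test helpers `NewMapperStats` and the no-op `nilShardMetrics` that appear later in this diff):

```go
// Sketch: build a split-aligned mapper and map an instant metric query.
stats := NewMapperStats()
rm, err := NewRangeMapperWithSplitAlign(time.Hour, time.Now(), nilShardMetrics, stats)
if err != nil {
	panic(err) // illustration only
}
noop, mapped, err := rm.Parse(syntax.MustParseExpr(`sum(rate({foo="bar"}[3h]))`))
if err != nil {
	panic(err) // illustration only
}
// noop is false when the expression was actually split; mapped concatenates
// the aligned subqueries, and stats.GetSplitQueries() reports how many.
_, _ = noop, mapped
```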
// NewRangeMapper creates a new RangeMapper instance with the given duration as
@ -327,6 +341,77 @@ func (m RangeMapper) getOriginalOffset(expr syntax.SampleExpr) (offset time.Dura
// rangeInterval should be greater than m.splitByInterval, otherwise the resultant expression
// will have an unnecessary aggregation operation
func (m RangeMapper) mapConcatSampleExpr(expr syntax.SampleExpr, rangeInterval time.Duration, recorder *downstreamRecorder) syntax.SampleExpr {
if m.splitAlignTs.IsZero() {
return m.rangeSplit(expr, rangeInterval, recorder)
}
return m.rangeSplitAlign(expr, rangeInterval, recorder)
}
// rangeSplitAlign tries to split the given `rangeInterval` into units of `m.splitByInterval`, keeping as many of the units as possible aligned to `m.splitByInterval` boundaries.
// Consider the following example with a real use case.
// Instant query: `sum(rate({foo="bar"}[3h]))`
// execTs: 12:34:00
// splitBy: 1h
// Given the above parameters, the query is split into the following subqueries:
// 1. sum(rate({foo="bar"}[34m]))
// 2. sum(rate({foo="bar"}[1h] offset 34m))
// 3. sum(rate({foo="bar"}[1h] offset 1h34m))
// 4. sum(rate({foo="bar"}[26m] offset 2h34m))
func (m RangeMapper) rangeSplitAlign(
expr syntax.SampleExpr, rangeInterval time.Duration, recorder *downstreamRecorder,
) syntax.SampleExpr {
if rangeInterval <= m.splitByInterval {
return expr
}
originalOffset, err := m.getOriginalOffset(expr)
if err != nil {
return expr
}
align := m.splitAlignTs.Sub(m.splitAlignTs.Truncate(m.splitByInterval)) // say, 12:34:00 - 12:00:00(truncated) = 34m
if align == 0 {
return m.rangeSplit(expr, rangeInterval, recorder) // Don't have to align
}
var (
newRng = align
// TODO(kavi): If the originalOffset is non-zero, there may be an edge case where the generated subqueries won't be aligned correctly. Handle this edge case in a separate PR.
newOffset = originalOffset
downstreams *ConcatSampleExpr
pendingRangeInterval = rangeInterval
splits = 0
)
// first subquery
downstreams = appendDownstream(downstreams, expr, newRng, newOffset)
splits++
newOffset += align // e.g: offset 34m
pendingRangeInterval -= newRng
newRng = m.splitByInterval // [1h]
// Rest of the subqueries.
for pendingRangeInterval > 0 {
if pendingRangeInterval < m.splitByInterval {
newRng = pendingRangeInterval // last subquery
}
downstreams = appendDownstream(downstreams, expr, newRng, newOffset)
newOffset += m.splitByInterval
pendingRangeInterval -= newRng
splits++
}
// update stats and metrics
m.stats.AddSplitQueries(splits)
recorder.Add(splits, MetricsKey)
return downstreams
}
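The split arithmetic above can be checked in isolation. A standalone sketch (stdlib only; the date is arbitrary since only the time of day matters, and it assumes a non-aligned exec time and a zero original offset):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	execTs := time.Date(2024, 1, 1, 12, 34, 0, 0, time.UTC)
	splitBy := time.Hour
	pending := 3 * time.Hour

	rng := execTs.Sub(execTs.Truncate(splitBy)) // first, unaligned piece: 34m
	offset := time.Duration(0)
	for pending > 0 {
		if pending < rng {
			rng = pending // last, partial piece
		}
		fmt.Printf("[%v] offset %v\n", rng, offset)
		offset += rng
		pending -= rng
		rng = splitBy // middle pieces are full, aligned windows
	}
	// Output:
	// [34m0s] offset 0s
	// [1h0m0s] offset 34m0s
	// [1h0m0s] offset 1h34m0s
	// [26m0s] offset 2h34m0s
}
```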
func (m RangeMapper) rangeSplit(expr syntax.SampleExpr, rangeInterval time.Duration, recorder *downstreamRecorder) syntax.SampleExpr {
splitCount := int(math.Ceil(float64(rangeInterval) / float64(m.splitByInterval)))
if splitCount <= 1 {
return expr

@ -93,6 +93,84 @@ func Test_SplitRangeInterval(t *testing.T) {
}
}
func Test_RangeMapperSplitAlign(t *testing.T) {
cases := []struct {
name string
expr string
queryTime time.Time
splitByInterval time.Duration
expected string
expectedSplits int
}{
{
name: "query_time_aligned_with_split_by",
expr: `bytes_over_time({app="foo"}[3m])`,
expected: `sum without() (
downstream<bytes_over_time({app="foo"}[1m] offset 2m0s), shard=<nil>>
++ downstream<bytes_over_time({app="foo"}[1m] offset 1m0s), shard=<nil>>
++ downstream<bytes_over_time({app="foo"}[1m]), shard=<nil>>
)`,
queryTime: time.Unix(60, 0), // 1970 00:01:00
splitByInterval: 1 * time.Minute,
expectedSplits: 3,
},
{
name: "query_time_aligned_with_split_by_with_original_offset",
expr: `bytes_over_time({app="foo"}[3m] offset 20m10s)`, // NOTE: the original query has an offset, which should be carried into every split subquery
expected: `sum without() (
downstream<bytes_over_time({app="foo"}[1m] offset 22m10s), shard=<nil>>
++ downstream<bytes_over_time({app="foo"}[1m] offset 21m10s), shard=<nil>>
++ downstream<bytes_over_time({app="foo"}[1m] offset 20m10s), shard=<nil>>
)`,
queryTime: time.Unix(60, 0), // 1970 00:01:00
splitByInterval: 1 * time.Minute,
expectedSplits: 3,
},
{
name: "query_time_not_aligned_with_split_by",
expr: `bytes_over_time({app="foo"}[3h])`,
expected: `sum without() (
downstream<bytes_over_time({app="foo"}[6m] offset 2h54m0s), shard=<nil>>
++ downstream<bytes_over_time({app="foo"}[1h] offset 1h54m0s), shard=<nil>>
++ downstream<bytes_over_time({app="foo"}[1h] offset 54m0s), shard=<nil>>
++ downstream<bytes_over_time({app="foo"}[54m]), shard=<nil>>
)`,
queryTime: time.Date(0, 0, 0, 12, 54, 0, 0, time.UTC), // 12:54:00 (only the time of day matters here)
splitByInterval: 1 * time.Hour,
expectedSplits: 4,
},
{
name: "query_time_not_aligned_with_split_by_with_original_offset",
expr: `bytes_over_time({app="foo"}[3h] offset 1h2m20s)`, // NOTE: the original query has an offset, which should be carried into every split subquery
expected: `sum without() (
downstream<bytes_over_time({app="foo"}[6m] offset 3h56m20s), shard=<nil>>
++ downstream<bytes_over_time({app="foo"}[1h] offset 2h56m20s), shard=<nil>>
++ downstream<bytes_over_time({app="foo"}[1h] offset 1h56m20s), shard=<nil>>
++ downstream<bytes_over_time({app="foo"}[54m] offset 1h2m20s), shard=<nil>>
)`,
queryTime: time.Date(0, 0, 0, 12, 54, 0, 0, time.UTC), // 12:54:00 (only the time of day matters here)
splitByInterval: 1 * time.Hour,
expectedSplits: 4,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
mapperStats := NewMapperStats()
rvm, err := NewRangeMapperWithSplitAlign(tc.splitByInterval, tc.queryTime, nilShardMetrics, mapperStats)
require.NoError(t, err)
noop, mappedExpr, err := rvm.Parse(syntax.MustParseExpr(tc.expr))
require.NoError(t, err)
require.Equal(t, removeWhiteSpace(tc.expected), removeWhiteSpace(mappedExpr.String()))
require.Equal(t, tc.expectedSplits, mapperStats.GetSplitQueries())
require.False(t, noop)
})
}
}
func Test_SplitRangeVectorMapping(t *testing.T) {
for _, tc := range []struct {
expr string
@ -1675,7 +1753,7 @@ func Test_SplitRangeVectorMapping(t *testing.T) {
// Non-splittable vector aggregators - should go deeper in the AST
{
`topk(2, count_over_time({app="foo"}[3m]))`,
`topk(2,
`topk(2,
sum without () (
downstream<count_over_time({app="foo"}[1m] offset 2m0s), shard=<nil>>
++ downstream<count_over_time({app="foo"}[1m] offset 1m0s), shard=<nil>>
@ -1713,7 +1791,7 @@ func Test_SplitRangeVectorMapping(t *testing.T) {
++ downstream<sum by (baz) (count_over_time({app="foo"} [1m] offset 1m0s)), shard=<nil>>
++ downstream<sum by (baz) (count_over_time({app="foo"} [1m])), shard=<nil>>
)
),
),
"x", "$1", "a", "(.*)"
)`,
3,
@ -1727,7 +1805,7 @@ func Test_SplitRangeVectorMapping(t *testing.T) {
++ downstream<count_over_time({job="api-server",service="a:c"} |= "err" [1m] offset 1m0s), shard=<nil>>
++ downstream<count_over_time({job="api-server",service="a:c"} |= "err" [1m]), shard=<nil>>
)
/ 180),
/ 180),
"foo", "$1", "service", "(.*):.*"
)`,
3,

@ -55,17 +55,18 @@ type Context struct {
type CacheType string
const (
ChunkCache CacheType = "chunk" //nolint:staticcheck
IndexCache CacheType = "index" //nolint:staticcheck
ResultCache CacheType = "result" //nolint:staticcheck
StatsResultCache CacheType = "stats-result" //nolint:staticcheck
VolumeResultCache CacheType = "volume-result" //nolint:staticcheck
WriteDedupeCache CacheType = "write-dedupe" //nolint:staticcheck
SeriesResultCache CacheType = "series-result" //nolint:staticcheck
LabelResultCache CacheType = "label-result" //nolint:staticcheck
BloomFilterCache CacheType = "bloom-filter" //nolint:staticcheck
BloomBlocksCache CacheType = "bloom-blocks" //nolint:staticcheck
BloomMetasCache CacheType = "bloom-metas" //nolint:staticcheck
ChunkCache CacheType = "chunk" //nolint:staticcheck
IndexCache CacheType = "index" //nolint:staticcheck
ResultCache CacheType = "result" //nolint:staticcheck
StatsResultCache CacheType = "stats-result" //nolint:staticcheck
VolumeResultCache CacheType = "volume-result" //nolint:staticcheck
InstantMetricResultsCache CacheType = "instant-metric-result" //nolint:staticcheck
WriteDedupeCache CacheType = "write-dedupe" //nolint:staticcheck
SeriesResultCache CacheType = "series-result" //nolint:staticcheck
LabelResultCache CacheType = "label-result" //nolint:staticcheck
BloomFilterCache CacheType = "bloom-filter" //nolint:staticcheck
BloomBlocksCache CacheType = "bloom-blocks" //nolint:staticcheck
BloomMetasCache CacheType = "bloom-metas" //nolint:staticcheck
)
// NewContext creates a new statistics context
@ -98,13 +99,14 @@ func (c *Context) Ingester() Ingester {
// Caches returns the cache statistics accumulated so far.
func (c *Context) Caches() Caches {
return Caches{
Chunk: c.caches.Chunk,
Index: c.caches.Index,
Result: c.caches.Result,
StatsResult: c.caches.StatsResult,
VolumeResult: c.caches.VolumeResult,
SeriesResult: c.caches.SeriesResult,
LabelResult: c.caches.LabelResult,
Chunk: c.caches.Chunk,
Index: c.caches.Index,
Result: c.caches.Result,
StatsResult: c.caches.StatsResult,
VolumeResult: c.caches.VolumeResult,
SeriesResult: c.caches.SeriesResult,
LabelResult: c.caches.LabelResult,
InstantMetricResult: c.caches.InstantMetricResult,
}
}
@ -222,6 +224,7 @@ func (c *Caches) Merge(m Caches) {
c.VolumeResult.Merge(m.VolumeResult)
c.SeriesResult.Merge(m.SeriesResult)
c.LabelResult.Merge(m.LabelResult)
c.InstantMetricResult.Merge(m.InstantMetricResult)
}
func (c *Cache) Merge(m Cache) {
@ -470,6 +473,8 @@ func (c *Context) getCacheStatsByType(t CacheType) *Cache {
stats = &c.caches.SeriesResult
case LabelResultCache:
stats = &c.caches.LabelResult
case InstantMetricResultsCache:
stats = &c.caches.InstantMetricResult
default:
return nil
}
@ -571,6 +576,12 @@ func (c Caches) Log(log log.Logger) {
"Cache.Result.EntriesStored", c.Result.EntriesStored,
"Cache.Result.BytesSent", humanize.Bytes(uint64(c.Result.BytesSent)),
"Cache.Result.BytesReceived", humanize.Bytes(uint64(c.Result.BytesReceived)),
"Cache.Result.DownloadTime", c.Result.CacheDownloadTime(),
"Cache.InstantMetricResult.Requests", c.InstantMetricResult.Requests,
"Cache.InstantMetricResult.EntriesRequested", c.InstantMetricResult.EntriesRequested,
"Cache.InstantMetricResult.EntriesFound", c.InstantMetricResult.EntriesFound,
"Cache.InstantMetricResult.EntriesStored", c.InstantMetricResult.EntriesStored,
"Cache.InstantMetricResult.BytesSent", humanize.Bytes(uint64(c.InstantMetricResult.BytesSent)),
"Cache.InstantMetricResult.BytesReceived", humanize.Bytes(uint64(c.InstantMetricResult.BytesReceived)),
"Cache.InstantMetricResult.DownloadTime", c.InstantMetricResult.CacheDownloadTime(),
)
}

@ -95,13 +95,14 @@ func (m *Result) GetCaches() Caches {
}
type Caches struct {
Chunk Cache `protobuf:"bytes,1,opt,name=chunk,proto3" json:"chunk"`
Index Cache `protobuf:"bytes,2,opt,name=index,proto3" json:"index"`
Result Cache `protobuf:"bytes,3,opt,name=result,proto3" json:"result"`
StatsResult Cache `protobuf:"bytes,4,opt,name=statsResult,proto3" json:"statsResult"`
VolumeResult Cache `protobuf:"bytes,5,opt,name=volumeResult,proto3" json:"volumeResult"`
SeriesResult Cache `protobuf:"bytes,6,opt,name=seriesResult,proto3" json:"seriesResult"`
LabelResult Cache `protobuf:"bytes,7,opt,name=labelResult,proto3" json:"labelResult"`
Chunk Cache `protobuf:"bytes,1,opt,name=chunk,proto3" json:"chunk"`
Index Cache `protobuf:"bytes,2,opt,name=index,proto3" json:"index"`
Result Cache `protobuf:"bytes,3,opt,name=result,proto3" json:"result"`
StatsResult Cache `protobuf:"bytes,4,opt,name=statsResult,proto3" json:"statsResult"`
VolumeResult Cache `protobuf:"bytes,5,opt,name=volumeResult,proto3" json:"volumeResult"`
SeriesResult Cache `protobuf:"bytes,6,opt,name=seriesResult,proto3" json:"seriesResult"`
LabelResult Cache `protobuf:"bytes,7,opt,name=labelResult,proto3" json:"labelResult"`
InstantMetricResult Cache `protobuf:"bytes,8,opt,name=instantMetricResult,proto3" json:"instantMetricResult"`
}
func (m *Caches) Reset() { *m = Caches{} }
@ -185,6 +186,13 @@ func (m *Caches) GetLabelResult() Cache {
return Cache{}
}
func (m *Caches) GetInstantMetricResult() Cache {
if m != nil {
return m.InstantMetricResult
}
return Cache{}
}
// Summary is the summary of a query statistics.
type Summary struct {
// Total bytes processed per second.
@ -773,83 +781,85 @@ func init() {
func init() { proto.RegisterFile("pkg/logqlmodel/stats/stats.proto", fileDescriptor_6cdfe5d2aea33ebb) }
var fileDescriptor_6cdfe5d2aea33ebb = []byte{
// 1215 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x57, 0x4d, 0x6f, 0xe3, 0x54,
0x17, 0x8e, 0x27, 0xaf, 0x93, 0xce, 0xed, 0xe7, 0xdc, 0x76, 0xde, 0xc9, 0x80, 0x64, 0x97, 0xc0,
0x88, 0x22, 0x50, 0x23, 0x3e, 0x24, 0x04, 0x62, 0x24, 0xe4, 0x0e, 0x95, 0x2a, 0x75, 0x44, 0x39,
0x81, 0x0d, 0x3b, 0xc7, 0xbe, 0x4d, 0xa2, 0x3a, 0x76, 0x6a, 0x5f, 0x97, 0xe9, 0x0a, 0x7e, 0x02,
0x3f, 0x83, 0x0d, 0x2b, 0x56, 0x48, 0x88, 0x0d, 0x9b, 0x59, 0x76, 0x39, 0x2b, 0x8b, 0xa6, 0x1b,
0xe4, 0xd5, 0x48, 0xfc, 0x01, 0x74, 0xcf, 0xbd, 0xf1, 0x57, 0x9c, 0x99, 0x6e, 0xe2, 0x7b, 0x9e,
0xf3, 0x3c, 0xe7, 0x7e, 0x9e, 0x73, 0x6f, 0xc8, 0xee, 0xf4, 0x6c, 0xd8, 0xf3, 0x82, 0xe1, 0xb9,
0x37, 0x09, 0x5c, 0xe6, 0xf5, 0x22, 0x6e, 0xf3, 0x48, 0xfe, 0xee, 0x4f, 0xc3, 0x80, 0x07, 0x54,
0x47, 0xe3, 0x8d, 0x9d, 0x61, 0x30, 0x0c, 0x10, 0xe9, 0x89, 0x96, 0x74, 0x76, 0xff, 0xd5, 0x48,
0x0b, 0x58, 0x14, 0x7b, 0x9c, 0x7e, 0x46, 0xda, 0x51, 0x3c, 0x99, 0xd8, 0xe1, 0x65, 0x47, 0xdb,
0xd5, 0xf6, 0x56, 0x3f, 0xda, 0xd8, 0x97, 0x61, 0xfa, 0x12, 0xb5, 0x36, 0x9f, 0x27, 0x66, 0x23,
0x4d, 0xcc, 0x39, 0x0d, 0xe6, 0x0d, 0x21, 0x3d, 0x8f, 0x59, 0x38, 0x66, 0x61, 0xe7, 0x4e, 0x49,
0xfa, 0x8d, 0x44, 0x73, 0xa9, 0xa2, 0xc1, 0xbc, 0x41, 0x1f, 0x93, 0x95, 0xb1, 0x3f, 0x64, 0x11,
0x67, 0x61, 0xa7, 0x89, 0xda, 0x4d, 0xa5, 0x3d, 0x52, 0xb0, 0xb5, 0xa5, 0xc4, 0x19, 0x11, 0xb2,
0x16, 0xfd, 0x84, 0xb4, 0x1c, 0xdb, 0x19, 0xb1, 0xa8, 0xf3, 0x3f, 0x14, 0xaf, 0x2b, 0xf1, 0x01,
0x82, 0xd6, 0xba, 0x92, 0xea, 0x48, 0x02, 0xc5, 0xed, 0xfe, 0xd9, 0x24, 0x2d, 0xc9, 0xa0, 0x1f,
0x12, 0xdd, 0x19, 0xc5, 0xfe, 0x99, 0x9a, 0xf3, 0x5a, 0x51, 0x5f, 0x90, 0x0b, 0x0a, 0xc8, 0x8f,
0x90, 0x8c, 0x7d, 0x97, 0x3d, 0x53, 0x73, 0x5d, 0x22, 0x41, 0x0a, 0xc8, 0x8f, 0x18, 0x66, 0x88,
0xab, 0xac, 0xe6, 0x58, 0xd6, 0x6c, 0x28, 0x8d, 0xe2, 0x80, 0xfa, 0xd2, 0x03, 0xb2, 0x8a, 0x34,
0xb9, 0x41, 0x6a, 0x86, 0x65, 0xe9, 0xb6, 0x92, 0x16, 0x89, 0x50, 0x34, 0xe8, 0x21, 0x59, 0xbb,
0x08, 0xbc, 0x78, 0xc2, 0x54, 0x14, 0xbd, 0x26, 0xca, 0x8e, 0x8a, 0x52, 0x62, 0x42, 0xc9, 0x12,
0x71, 0x22, 0xb1, 0x65, 0xf3, 0xd1, 0xb4, 0x5e, 0x15, 0xa7, 0xc8, 0x84, 0x92, 0x25, 0x26, 0xe5,
0xd9, 0x03, 0xe6, 0xa9, 0x30, 0xed, 0x57, 0x4d, 0xaa, 0x40, 0x84, 0xa2, 0xd1, 0xfd, 0xbd, 0x45,
0xda, 0xea, 0x58, 0xd2, 0xef, 0xc8, 0x83, 0xc1, 0x25, 0x67, 0xd1, 0x49, 0x18, 0x38, 0x2c, 0x8a,
0x98, 0x7b, 0xc2, 0xc2, 0x3e, 0x73, 0x02, 0xdf, 0xc5, 0x3d, 0x6d, 0x5a, 0x6f, 0xa6, 0x89, 0xb9,
0x8c, 0x02, 0xcb, 0x1c, 0x22, 0xac, 0x37, 0xf6, 0x6b, 0xc3, 0xde, 0xc9, 0xc3, 0x2e, 0xa1, 0xc0,
0x32, 0x07, 0x3d, 0x22, 0xdb, 0x3c, 0xe0, 0xb6, 0x67, 0x95, 0xba, 0xc5, 0x63, 0xd1, 0xb4, 0x1e,
0xa4, 0x89, 0x59, 0xe7, 0x86, 0x3a, 0x30, 0x0b, 0x75, 0x5c, 0xea, 0x0a, 0x8f, 0x49, 0x31, 0x54,
0xd9, 0x0d, 0x75, 0x20, 0xdd, 0x23, 0x2b, 0xec, 0x19, 0x73, 0xbe, 0x1d, 0x4f, 0x18, 0x1e, 0x10,
0xcd, 0x5a, 0x13, 0x09, 0x37, 0xc7, 0x20, 0x6b, 0xd1, 0xf7, 0xc9, 0xdd, 0xf3, 0x98, 0xc5, 0x0c,
0xa9, 0x2d, 0xa4, 0xae, 0xa7, 0x89, 0x99, 0x83, 0x90, 0x37, 0xe9, 0x3e, 0x21, 0x51, 0x3c, 0x90,
0xa9, 0x1e, 0xe1, 0x56, 0x37, 0xad, 0x8d, 0x34, 0x31, 0x0b, 0x28, 0x14, 0xda, 0xf4, 0x98, 0xec,
0xe0, 0xe8, 0xbe, 0xf2, 0xb9, 0x3c, 0x31, 0x3c, 0x0e, 0x7d, 0xe6, 0x76, 0x56, 0x50, 0xd9, 0x49,
0x13, 0xb3, 0xd6, 0x0f, 0xb5, 0x28, 0xed, 0x92, 0x56, 0x34, 0xf5, 0xc6, 0x3c, 0xea, 0xdc, 0x45,
0x3d, 0x11, 0x29, 0x26, 0x11, 0x50, 0x5f, 0xe4, 0x8c, 0xec, 0xd0, 0x8d, 0x3a, 0xa4, 0xc0, 0x41,
0x04, 0xd4, 0x37, 0x1b, 0xd5, 0x49, 0x10, 0xf1, 0xc3, 0xb1, 0xc7, 0x59, 0x88, 0xab, 0xd7, 0x59,
0xad, 0x8c, 0xaa, 0xe2, 0x87, 0x5a, 0x94, 0xfe, 0x48, 0x1e, 0x21, 0xde, 0xe7, 0x61, 0xec, 0xf0,
0x38, 0x64, 0xee, 0x53, 0xc6, 0x6d, 0xd7, 0xe6, 0x76, 0xe5, 0x48, 0xac, 0x61, 0xf8, 0xf7, 0xd2,
0xc4, 0xbc, 0x9d, 0x00, 0x6e, 0x47, 0xeb, 0x7e, 0x41, 0xda, 0xaa, 0x2c, 0x8b, 0x4a, 0x16, 0xf1,
0x20, 0x64, 0x95, 0xe2, 0xd7, 0x17, 0x58, 0x5e, 0xc9, 0x90, 0x02, 0xf2, 0xd3, 0xfd, 0xf5, 0x0e,
0x59, 0x39, 0xca, 0xab, 0xef, 0x1a, 0xf6, 0x09, 0x4c, 0xe4, 0xad, 0xcc, 0x37, 0xdd, 0xda, 0x12,
0x15, 0xa0, 0x88, 0x43, 0xc9, 0xa2, 0x87, 0x84, 0xa2, 0x7d, 0x20, 0xaa, 0x69, 0xf4, 0xd4, 0xe6,
0xa8, 0x95, 0x49, 0xf5, 0xff, 0x34, 0x31, 0x6b, 0xbc, 0x50, 0x83, 0x65, 0xbd, 0x5b, 0x68, 0x47,
0x2a, 0x87, 0xf2, 0xde, 0x15, 0x0e, 0x25, 0x8b, 0x7e, 0x4e, 0x36, 0xf2, 0x0c, 0xe8, 0x33, 0x9f,
0xab, 0x84, 0xa1, 0x69, 0x62, 0x56, 0x3c, 0x50, 0xb1, 0xf3, 0xf5, 0xd2, 0x6f, 0xbd, 0x5e, 0x7f,
0x34, 0x89, 0x8e, 0xfe, 0xac, 0x63, 0x39, 0x09, 0x60, 0xa7, 0xaa, 0x3c, 0xe5, 0x1d, 0x67, 0x1e,
0xa8, 0xd8, 0xf4, 0x6b, 0x72, 0xbf, 0x80, 0x3c, 0x09, 0x7e, 0xf0, 0xbd, 0xc0, 0x76, 0xb3, 0x55,
0x7b, 0x98, 0x26, 0x66, 0x3d, 0x01, 0xea, 0x61, 0xb1, 0x07, 0x4e, 0x09, 0xc3, 0x7c, 0x6e, 0xe6,
0x7b, 0xb0, 0xe8, 0x85, 0x1a, 0x8c, 0x3a, 0xe4, 0xa1, 0x48, 0xde, 0x4b, 0x60, 0xa7, 0x2c, 0x64,
0xbe, 0xc3, 0xdc, 0xfc, 0xfc, 0x75, 0xd6, 0x77, 0xb5, 0xbd, 0x15, 0xeb, 0x51, 0x9a, 0x98, 0x6f,
0x2d, 0x25, 0xcd, 0x0f, 0x29, 0x2c, 0x8f, 0x93, 0xdf, 0xd1, 0x95, 0x1b, 0x50, 0x60, 0x4b, 0xee,
0xe8, 0xf9, 0xfc, 0x80, 0x9d, 0x46, 0x87, 0x8c, 0x3b, 0xa3, 0xac, 0xb4, 0x15, 0xe7, 0x57, 0xf2,
0x42, 0x0d, 0xd6, 0xfd, 0x4d, 0x27, 0x3a, 0xf6, 0x23, 0xb6, 0x6f, 0xc4, 0x6c, 0x57, 0x76, 0x2a,
0x32, 0xaa, 0x78, 0x6e, 0xca, 0x1e, 0xa8, 0xd8, 0x25, 0xad, 0xac, 0x1d, 0x7a, 0x8d, 0x56, 0x56,
0x8d, 0x8a, 0x4d, 0x0f, 0xc8, 0x3d, 0x97, 0x39, 0xc1, 0x64, 0x1a, 0x62, 0xfa, 0xca, 0xae, 0x5b,
0x28, 0xbf, 0x9f, 0x26, 0xe6, 0xa2, 0x13, 0x16, 0xa1, 0x6a, 0x10, 0x39, 0x86, 0x76, 0x7d, 0x10,
0x39, 0x8c, 0x45, 0x88, 0x3e, 0x26, 0x9b, 0xd5, 0x71, 0xc8, 0xc2, 0xbc, 0x9d, 0x26, 0x66, 0xd5,
0x05, 0x55, 0x40, 0xc8, 0xf1, 0x2c, 0x3e, 0x89, 0xa7, 0xde, 0xd8, 0xb1, 0x85, 0xfc, 0x6e, 0x2e,
0xaf, 0xb8, 0xa0, 0x0a, 0x08, 0xf9, 0xb4, 0x52, 0x80, 0x49, 0x2e, 0xaf, 0xb8, 0xa0, 0x0a, 0xd0,
0x29, 0xd9, 0xcd, 0x16, 0x76, 0x49, 0x89, 0x54, 0x05, 0xfd, 0x9d, 0x34, 0x31, 0x5f, 0xcb, 0x85,
0xd7, 0x32, 0xe8, 0x25, 0x79, 0xbb, 0xb8, 0x86, 0xcb, 0x3a, 0x95, 0x65, 0xfe, 0xdd, 0x34, 0x31,
0x6f, 0x43, 0x87, 0xdb, 0x90, 0xba, 0x7f, 0x35, 0x89, 0x8e, 0x4f, 0x29, 0x51, 0x23, 0x99, 0xbc,
0x16, 0x0f, 0x83, 0xd8, 0x2f, 0x55, 0xe8, 0x22, 0x0e, 0x25, 0x8b, 0x7e, 0x49, 0xb6, 0xd8, 0xfc,
0x32, 0x3d, 0x8f, 0x45, 0xad, 0x97, 0x95, 0x46, 0xb7, 0x76, 0xd2, 0xc4, 0x5c, 0xf0, 0xc1, 0x02,
0x42, 0x3f, 0x25, 0xeb, 0x0a, 0xc3, 0xe2, 0x27, 0x1f, 0x38, 0xba, 0x75, 0x2f, 0x4d, 0xcc, 0xb2,
0x03, 0xca, 0xa6, 0x10, 0xe2, 0x8b, 0x0c, 0x98, 0xc3, 0xc6, 0x17, 0xd9, 0x73, 0x06, 0x85, 0x25,
0x07, 0x94, 0x4d, 0xf1, 0x30, 0x41, 0x00, 0x4b, 0xba, 0x4c, 0x2f, 0x7c, 0x98, 0x64, 0x20, 0xe4,
0x4d, 0xf1, 0xde, 0x09, 0xe5, 0x58, 0x65, 0x2e, 0xe9, 0xf2, 0xbd, 0x33, 0xc7, 0x20, 0x6b, 0x89,
0x05, 0x74, 0x8b, 0x25, 0xb2, 0x9d, 0x5f, 0x32, 0x45, 0x1c, 0x4a, 0x96, 0xc8, 0x37, 0x2c, 0x67,
0xc7, 0xcc, 0x1f, 0xf2, 0x51, 0x9f, 0x85, 0x17, 0xd9, 0x2b, 0x06, 0xf3, 0x6d, 0xc1, 0x09, 0x8b,
0x90, 0x35, 0xb8, 0xba, 0x36, 0x1a, 0x2f, 0xae, 0x8d, 0xc6, 0xcb, 0x6b, 0x43, 0xfb, 0x69, 0x66,
0x68, 0xbf, 0xcc, 0x0c, 0xed, 0xf9, 0xcc, 0xd0, 0xae, 0x66, 0x86, 0xf6, 0xf7, 0xcc, 0xd0, 0xfe,
0x99, 0x19, 0x8d, 0x97, 0x33, 0x43, 0xfb, 0xf9, 0xc6, 0x68, 0x5c, 0xdd, 0x18, 0x8d, 0x17, 0x37,
0x46, 0xe3, 0xfb, 0x0f, 0x86, 0x63, 0x3e, 0x8a, 0x07, 0xfb, 0x4e, 0x30, 0xe9, 0x0d, 0x43, 0xfb,
0xd4, 0xf6, 0xed, 0x9e, 0x17, 0x9c, 0x8d, 0x7b, 0x75, 0x7f, 0x14, 0x07, 0x2d, 0xfc, 0x1b, 0xf8,
0xf1, 0x7f, 0x01, 0x00, 0x00, 0xff, 0xff, 0xa8, 0xe8, 0xef, 0xe7, 0x47, 0x0e, 0x00, 0x00,
// 1241 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x57, 0x4b, 0x6f, 0xe3, 0x54,
0x14, 0x8e, 0x27, 0xe3, 0xa4, 0xbd, 0x7d, 0xce, 0x6d, 0x87, 0xc9, 0x30, 0x92, 0x5d, 0x02, 0x23,
0x8a, 0x40, 0x8d, 0x78, 0x48, 0x08, 0xc4, 0x48, 0xc8, 0x1d, 0x2a, 0x55, 0x6a, 0x45, 0x39, 0x81,
0x0d, 0xac, 0x1c, 0xfb, 0x36, 0xb1, 0xea, 0xd8, 0xa9, 0x7d, 0x5d, 0xa6, 0x2b, 0xf8, 0x09, 0xec,
0xf9, 0x03, 0x6c, 0x58, 0xb1, 0x42, 0x62, 0xc7, 0x66, 0x96, 0x5d, 0xce, 0xca, 0xa2, 0xe9, 0x06,
0x79, 0x35, 0x12, 0x7f, 0x00, 0xdd, 0x47, 0x6c, 0x5f, 0xc7, 0x99, 0xe9, 0x26, 0xbe, 0xe7, 0x3b,
0xdf, 0x77, 0xee, 0xc3, 0xe7, 0x1c, 0xdf, 0xa0, 0x9d, 0xc9, 0xd9, 0xb0, 0xe7, 0x87, 0xc3, 0x73,
0x7f, 0x1c, 0xba, 0xc4, 0xef, 0xc5, 0xd4, 0xa6, 0xb1, 0xf8, 0xdd, 0x9b, 0x44, 0x21, 0x0d, 0xb1,
0xce, 0x8d, 0x37, 0xb7, 0x87, 0xe1, 0x30, 0xe4, 0x48, 0x8f, 0x8d, 0x84, 0xb3, 0xfb, 0x9f, 0x86,
0x5a, 0x40, 0xe2, 0xc4, 0xa7, 0xf8, 0x33, 0xd4, 0x8e, 0x93, 0xf1, 0xd8, 0x8e, 0x2e, 0x3b, 0xda,
0x8e, 0xb6, 0xbb, 0xf2, 0xd1, 0xfa, 0x9e, 0x08, 0xd3, 0x17, 0xa8, 0xb5, 0xf1, 0x3c, 0x35, 0x1b,
0x59, 0x6a, 0xce, 0x68, 0x30, 0x1b, 0x30, 0xe9, 0x79, 0x42, 0x22, 0x8f, 0x44, 0x9d, 0x3b, 0x8a,
0xf4, 0x1b, 0x81, 0x16, 0x52, 0x49, 0x83, 0xd9, 0x00, 0x3f, 0x41, 0x4b, 0x5e, 0x30, 0x24, 0x31,
0x25, 0x51, 0xa7, 0xc9, 0xb5, 0x1b, 0x52, 0x7b, 0x28, 0x61, 0x6b, 0x53, 0x8a, 0x73, 0x22, 0xe4,
0x23, 0xfc, 0x09, 0x6a, 0x39, 0xb6, 0x33, 0x22, 0x71, 0xe7, 0x2e, 0x17, 0xaf, 0x49, 0xf1, 0x3e,
0x07, 0xad, 0x35, 0x29, 0xd5, 0x39, 0x09, 0x24, 0xb7, 0xfb, 0xeb, 0x5d, 0xd4, 0x12, 0x0c, 0xfc,
0x21, 0xd2, 0x9d, 0x51, 0x12, 0x9c, 0xc9, 0x3d, 0xaf, 0x96, 0xf5, 0x25, 0x39, 0xa3, 0x80, 0x78,
0x30, 0x89, 0x17, 0xb8, 0xe4, 0x99, 0xdc, 0xeb, 0x02, 0x09, 0xa7, 0x80, 0x78, 0xb0, 0x65, 0x46,
0xfc, 0x94, 0xe5, 0x1e, 0x55, 0xcd, 0xba, 0xd4, 0x48, 0x0e, 0xc8, 0x27, 0xde, 0x47, 0x2b, 0x9c,
0x26, 0x5e, 0x90, 0xdc, 0xa1, 0x2a, 0xdd, 0x92, 0xd2, 0x32, 0x11, 0xca, 0x06, 0x3e, 0x40, 0xab,
0x17, 0xa1, 0x9f, 0x8c, 0x89, 0x8c, 0xa2, 0xd7, 0x44, 0xd9, 0x96, 0x51, 0x14, 0x26, 0x28, 0x16,
0x8b, 0x13, 0xb3, 0x57, 0x36, 0x5b, 0x4d, 0xeb, 0x55, 0x71, 0xca, 0x4c, 0x50, 0x2c, 0xb6, 0x29,
0xdf, 0x1e, 0x10, 0x5f, 0x86, 0x69, 0xbf, 0x6a, 0x53, 0x25, 0x22, 0x94, 0x0d, 0xfc, 0x03, 0xda,
0xf2, 0x82, 0x98, 0xda, 0x01, 0x3d, 0x26, 0x34, 0xf2, 0x1c, 0x19, 0x6c, 0xa9, 0x26, 0xd8, 0x23,
0x19, 0xac, 0x4e, 0x00, 0x75, 0x60, 0xf7, 0xcf, 0x16, 0x6a, 0xcb, 0x9c, 0xc7, 0xdf, 0xa1, 0x07,
0x83, 0x4b, 0x4a, 0xe2, 0x93, 0x28, 0x74, 0x48, 0x1c, 0x13, 0xf7, 0x84, 0x44, 0x7d, 0xe2, 0x84,
0x81, 0xcb, 0x13, 0xa6, 0x69, 0x3d, 0xca, 0x52, 0x73, 0x11, 0x05, 0x16, 0x39, 0x58, 0x58, 0xdf,
0x0b, 0x6a, 0xc3, 0xde, 0x29, 0xc2, 0x2e, 0xa0, 0xc0, 0x22, 0x07, 0x3e, 0x44, 0x5b, 0x34, 0xa4,
0xb6, 0x6f, 0x29, 0xd3, 0xf2, 0x9c, 0x6b, 0x5a, 0x0f, 0xd8, 0x21, 0xd4, 0xb8, 0xa1, 0x0e, 0xcc,
0x43, 0x1d, 0x29, 0x53, 0xf1, 0x1c, 0x2c, 0x87, 0x52, 0xdd, 0x50, 0x07, 0xe2, 0x5d, 0xb4, 0x44,
0x9e, 0x11, 0xe7, 0x5b, 0x6f, 0x4c, 0x78, 0xf6, 0x69, 0xd6, 0x2a, 0xab, 0xe6, 0x19, 0x06, 0xf9,
0x08, 0xbf, 0x8f, 0x96, 0xcf, 0x13, 0x92, 0x10, 0x4e, 0x6d, 0x71, 0xea, 0x5a, 0x96, 0x9a, 0x05,
0x08, 0xc5, 0x10, 0xef, 0x21, 0x14, 0x27, 0x03, 0xd1, 0x47, 0x62, 0x9e, 0x47, 0x4d, 0x6b, 0x3d,
0x4b, 0xcd, 0x12, 0x0a, 0xa5, 0x31, 0x3e, 0x42, 0xdb, 0x7c, 0x75, 0x5f, 0x05, 0x54, 0xa4, 0x23,
0x4d, 0xa2, 0x80, 0xb8, 0x3c, 0x69, 0x9a, 0x56, 0x27, 0x4b, 0xcd, 0x5a, 0x3f, 0xd4, 0xa2, 0xb8,
0x8b, 0x5a, 0xf1, 0xc4, 0xf7, 0x68, 0xdc, 0x59, 0xe6, 0x7a, 0xc4, 0xea, 0x57, 0x20, 0x20, 0x9f,
0x9c, 0x33, 0xb2, 0x23, 0x37, 0xee, 0xa0, 0x12, 0x87, 0x23, 0x20, 0x9f, 0xf9, 0xaa, 0x4e, 0xc2,
0x98, 0x1e, 0x78, 0x3e, 0x25, 0x11, 0x3f, 0xbd, 0xce, 0x4a, 0x65, 0x55, 0x15, 0x3f, 0xd4, 0xa2,
0xf8, 0x27, 0xf4, 0x98, 0xe3, 0x7d, 0x1a, 0x25, 0x0e, 0x4d, 0x22, 0xe2, 0x1e, 0x13, 0x6a, 0xbb,
0x36, 0xb5, 0x2b, 0x29, 0xb1, 0xca, 0xc3, 0xbf, 0x97, 0xa5, 0xe6, 0xed, 0x04, 0x70, 0x3b, 0x5a,
0xf7, 0x0b, 0xd4, 0x96, 0x3d, 0x9f, 0xb5, 0xc9, 0x98, 0x86, 0x11, 0xa9, 0x74, 0xd6, 0x3e, 0xc3,
0x8a, 0x36, 0xc9, 0x29, 0x20, 0x1e, 0xdd, 0xdf, 0xef, 0xa0, 0xa5, 0xc3, 0xa2, 0xb5, 0xaf, 0xf2,
0x39, 0x81, 0xb0, 0x3a, 0x16, 0xf5, 0xa6, 0x5b, 0x9b, 0xac, 0xbd, 0x94, 0x71, 0x50, 0x2c, 0x7c,
0x80, 0x30, 0xb7, 0xf7, 0x59, 0xab, 0x8e, 0x8f, 0x6d, 0xca, 0xb5, 0xa2, 0xa8, 0xde, 0xc8, 0x52,
0xb3, 0xc6, 0x0b, 0x35, 0x58, 0x3e, 0xbb, 0xc5, 0xed, 0x58, 0xd6, 0x50, 0x31, 0xbb, 0xc4, 0x41,
0xb1, 0xf0, 0xe7, 0x68, 0xbd, 0xa8, 0x80, 0x3e, 0x09, 0xa8, 0x2c, 0x18, 0x9c, 0xa5, 0x66, 0xc5,
0x03, 0x15, 0xbb, 0x38, 0x2f, 0xfd, 0xd6, 0xe7, 0xf5, 0x57, 0x13, 0xe9, 0xdc, 0x9f, 0x4f, 0x2c,
0x36, 0x01, 0xe4, 0x54, 0xb6, 0xa7, 0x62, 0xe2, 0xdc, 0x03, 0x15, 0x1b, 0x7f, 0x8d, 0xee, 0x97,
0x90, 0xa7, 0xe1, 0x8f, 0x81, 0x1f, 0xda, 0x6e, 0x7e, 0x6a, 0x0f, 0xb3, 0xd4, 0xac, 0x27, 0x40,
0x3d, 0xcc, 0xde, 0x81, 0xa3, 0x60, 0xbc, 0x9e, 0x9b, 0xc5, 0x3b, 0x98, 0xf7, 0x42, 0x0d, 0x86,
0x1d, 0xf4, 0x90, 0x15, 0xef, 0x25, 0x90, 0x53, 0x12, 0x91, 0xc0, 0x21, 0x6e, 0x91, 0x7f, 0x9d,
0xb5, 0x1d, 0x6d, 0x77, 0xc9, 0x7a, 0x9c, 0xa5, 0xe6, 0x5b, 0x0b, 0x49, 0xb3, 0x24, 0x85, 0xc5,
0x71, 0x8a, 0x0b, 0x40, 0xe5, 0xf3, 0xca, 0xb0, 0x05, 0x17, 0x80, 0xd9, 0xfe, 0x80, 0x9c, 0xc6,
0x07, 0x84, 0x3a, 0xa3, 0xbc, 0xb5, 0x95, 0xf7, 0xa7, 0x78, 0xa1, 0x06, 0xeb, 0xfe, 0xa1, 0x23,
0x9d, 0xcf, 0xc3, 0x5e, 0xdf, 0x88, 0xd8, 0xae, 0x98, 0x94, 0x55, 0x54, 0x39, 0x6f, 0x54, 0x0f,
0x54, 0x6c, 0x45, 0x2b, 0x7a, 0x87, 0x5e, 0xa3, 0x15, 0x5d, 0xa3, 0x62, 0xe3, 0x7d, 0x74, 0xcf,
0x25, 0x4e, 0x38, 0x9e, 0x44, 0xbc, 0x7c, 0xc5, 0xd4, 0x2d, 0x2e, 0xbf, 0x9f, 0xa5, 0xe6, 0xbc,
0x13, 0xe6, 0xa1, 0x6a, 0x10, 0xb1, 0x86, 0x76, 0x7d, 0x10, 0xb1, 0x8c, 0x79, 0x08, 0x3f, 0x41,
0x1b, 0xd5, 0x75, 0x88, 0xc6, 0xbc, 0x95, 0xa5, 0x66, 0xd5, 0x05, 0x55, 0x80, 0xc9, 0x79, 0x2e,
0x3e, 0x4d, 0x26, 0xbe, 0xe7, 0xd8, 0x4c, 0xbe, 0x5c, 0xc8, 0x2b, 0x2e, 0xa8, 0x02, 0x4c, 0x3e,
0xa9, 0x34, 0x60, 0x54, 0xc8, 0x2b, 0x2e, 0xa8, 0x02, 0x78, 0x82, 0x76, 0xf2, 0x83, 0x5d, 0xd0,
0x22, 0x65, 0x43, 0x7f, 0x27, 0x4b, 0xcd, 0xd7, 0x72, 0xe1, 0xb5, 0x0c, 0x7c, 0x89, 0xde, 0x2e,
0x9f, 0xe1, 0xa2, 0x49, 0x45, 0x9b, 0x7f, 0x37, 0x4b, 0xcd, 0xdb, 0xd0, 0xe1, 0x36, 0xa4, 0xee,
0xdf, 0x4d, 0xa4, 0xf3, 0xab, 0x15, 0xeb, 0x91, 0x44, 0x7c, 0x16, 0x0f, 0xc2, 0x24, 0x50, 0x3a,
0x74, 0x19, 0x07, 0xc5, 0xc2, 0x5f, 0xa2, 0x4d, 0x32, 0xfb, 0x98, 0x9e, 0x27, 0xac, 0xd7, 0x8b,
0x4e, 0xa3, 0x5b, 0xdb, 0x59, 0x6a, 0xce, 0xf9, 0x60, 0x0e, 0xc1, 0x9f, 0xa2, 0x35, 0x89, 0xf1,
0xe6, 0x27, 0x2e, 0x38, 0xba, 0x75, 0x2f, 0x4b, 0x4d, 0xd5, 0x01, 0xaa, 0xc9, 0x84, 0xfc, 0x46,
0x06, 0xc4, 0x21, 0xde, 0x45, 0x7e, 0x9d, 0xe1, 0x42, 0xc5, 0x01, 0xaa, 0xc9, 0x2e, 0x26, 0x1c,
0xe0, 0x2d, 0x5d, 0x94, 0x17, 0xbf, 0x98, 0xe4, 0x20, 0x14, 0x43, 0x76, 0xdf, 0x89, 0xc4, 0x5a,
0x45, 0x2d, 0xe9, 0xe2, 0xbe, 0x33, 0xc3, 0x20, 0x1f, 0xb1, 0x03, 0x74, 0xcb, 0x2d, 0xb2, 0x5d,
0x7c, 0x64, 0xca, 0x38, 0x28, 0x16, 0xab, 0x37, 0xde, 0xce, 0x8e, 0x48, 0x30, 0xa4, 0xa3, 0x3e,
0x89, 0x2e, 0xf2, 0x5b, 0x0c, 0xaf, 0xb7, 0x39, 0x27, 0xcc, 0x43, 0xd6, 0xe0, 0xea, 0xda, 0x68,
0xbc, 0xb8, 0x36, 0x1a, 0x2f, 0xaf, 0x0d, 0xed, 0xe7, 0xa9, 0xa1, 0xfd, 0x36, 0x35, 0xb4, 0xe7,
0x53, 0x43, 0xbb, 0x9a, 0x1a, 0xda, 0x3f, 0x53, 0x43, 0xfb, 0x77, 0x6a, 0x34, 0x5e, 0x4e, 0x0d,
0xed, 0x97, 0x1b, 0xa3, 0x71, 0x75, 0x63, 0x34, 0x5e, 0xdc, 0x18, 0x8d, 0xef, 0x3f, 0x18, 0x7a,
0x74, 0x94, 0x0c, 0xf6, 0x9c, 0x70, 0xdc, 0x1b, 0x46, 0xf6, 0xa9, 0x1d, 0xd8, 0x3d, 0x3f, 0x3c,
0xf3, 0x7a, 0x75, 0xff, 0x42, 0x07, 0x2d, 0xfe, 0x1f, 0xf3, 0xe3, 0xff, 0x03, 0x00, 0x00, 0xff,
0xff, 0x38, 0x60, 0xd8, 0x7d, 0xa4, 0x0e, 0x00, 0x00,
}
func (this *Result) Equal(that interface{}) bool {
@ -925,6 +935,9 @@ func (this *Caches) Equal(that interface{}) bool {
if !this.LabelResult.Equal(&that1.LabelResult) {
return false
}
if !this.InstantMetricResult.Equal(&that1.InstantMetricResult) {
return false
}
return true
}
func (this *Summary) Equal(that interface{}) bool {
@ -1193,7 +1206,7 @@ func (this *Caches) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 11)
s := make([]string, 0, 12)
s = append(s, "&stats.Caches{")
s = append(s, "Chunk: "+strings.Replace(this.Chunk.GoString(), `&`, ``, 1)+",\n")
s = append(s, "Index: "+strings.Replace(this.Index.GoString(), `&`, ``, 1)+",\n")
@ -1202,6 +1215,7 @@ func (this *Caches) GoString() string {
s = append(s, "VolumeResult: "+strings.Replace(this.VolumeResult.GoString(), `&`, ``, 1)+",\n")
s = append(s, "SeriesResult: "+strings.Replace(this.SeriesResult.GoString(), `&`, ``, 1)+",\n")
s = append(s, "LabelResult: "+strings.Replace(this.LabelResult.GoString(), `&`, ``, 1)+",\n")
s = append(s, "InstantMetricResult: "+strings.Replace(this.InstantMetricResult.GoString(), `&`, ``, 1)+",\n")
s = append(s, "}")
return strings.Join(s, "")
}
@ -1391,6 +1405,16 @@ func (m *Caches) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i
var l int
_ = l
{
size, err := m.InstantMetricResult.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintStats(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x42
{
size, err := m.LabelResult.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
@ -1877,6 +1901,8 @@ func (m *Caches) Size() (n int) {
n += 1 + l + sovStats(uint64(l))
l = m.LabelResult.Size()
n += 1 + l + sovStats(uint64(l))
l = m.InstantMetricResult.Size()
n += 1 + l + sovStats(uint64(l))
return n
}
@ -2085,6 +2111,7 @@ func (this *Caches) String() string {
`VolumeResult:` + strings.Replace(strings.Replace(this.VolumeResult.String(), "Cache", "Cache", 1), `&`, ``, 1) + `,`,
`SeriesResult:` + strings.Replace(strings.Replace(this.SeriesResult.String(), "Cache", "Cache", 1), `&`, ``, 1) + `,`,
`LabelResult:` + strings.Replace(strings.Replace(this.LabelResult.String(), "Cache", "Cache", 1), `&`, ``, 1) + `,`,
`InstantMetricResult:` + strings.Replace(strings.Replace(this.InstantMetricResult.String(), "Cache", "Cache", 1), `&`, ``, 1) + `,`,
`}`,
}, "")
return s
@ -2637,6 +2664,39 @@ func (m *Caches) Unmarshal(dAtA []byte) error {
return err
}
iNdEx = postIndex
case 8:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field InstantMetricResult", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowStats
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthStats
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthStats
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if err := m.InstantMetricResult.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipStats(dAtA[iNdEx:])

@ -57,6 +57,10 @@ message Caches {
(gogoproto.nullable) = false,
(gogoproto.jsontag) = "labelResult"
];
Cache instantMetricResult = 8 [
(gogoproto.nullable) = false,
(gogoproto.jsontag) = "instantMetricResult"
];
}
// Summary is the summary of a query statistics.

@ -646,6 +646,13 @@ func applyEmbeddedCacheConfig(r *ConfigWrapper) {
r.QueryRange.LabelsCacheConfig.CacheConfig = r.QueryRange.ResultsCacheConfig.CacheConfig
r.QueryRange.LabelsCacheConfig.CacheConfig.Prefix = prefix
}
instantMetricCacheConfig := r.QueryRange.InstantMetricCacheConfig.CacheConfig
if !cache.IsCacheConfigured(instantMetricCacheConfig) {
prefix := instantMetricCacheConfig.Prefix
r.QueryRange.InstantMetricCacheConfig.CacheConfig = r.QueryRange.ResultsCacheConfig.CacheConfig
r.QueryRange.InstantMetricCacheConfig.CacheConfig.Prefix = prefix
}
}
func applyIngesterFinalSleep(cfg *ConfigWrapper) {

@ -1055,6 +1055,49 @@ query_range:
})
})
t.Run("for the instant-metric results cache config", func(t *testing.T) {
t.Run("no embedded cache enabled by default if Redis is set", func(t *testing.T) {
configFileString := `---
query_range:
instant_metric_results_cache:
cache:
redis:
endpoint: endpoint.redis.org`
config, _, _ := configWrapperFromYAML(t, configFileString, nil)
assert.EqualValues(t, "endpoint.redis.org", config.QueryRange.InstantMetricCacheConfig.CacheConfig.Redis.Endpoint)
assert.EqualValues(t, "frontend.instant-metric-results-cache.", config.QueryRange.InstantMetricCacheConfig.CacheConfig.Prefix)
assert.False(t, config.QueryRange.InstantMetricCacheConfig.CacheConfig.EmbeddedCache.Enabled)
})
t.Run("no embedded cache enabled by default if Memcache is set", func(t *testing.T) {
configFileString := `---
query_range:
instant_metric_results_cache:
cache:
memcached_client:
host: memcached.host.org`
config, _, _ := configWrapperFromYAML(t, configFileString, nil)
assert.EqualValues(t, "memcached.host.org", config.QueryRange.InstantMetricCacheConfig.CacheConfig.MemcacheClient.Host)
assert.EqualValues(t, "frontend.instant-metric-results-cache.", config.QueryRange.InstantMetricCacheConfig.CacheConfig.Prefix)
assert.False(t, config.QueryRange.InstantMetricCacheConfig.CacheConfig.EmbeddedCache.Enabled)
})
t.Run("embedded cache is enabled by default if no other cache is set", func(t *testing.T) {
config, _, _ := configWrapperFromYAML(t, minimalConfig, nil)
assert.True(t, config.QueryRange.InstantMetricCacheConfig.CacheConfig.EmbeddedCache.Enabled)
assert.EqualValues(t, "frontend.instant-metric-results-cache.", config.QueryRange.InstantMetricCacheConfig.CacheConfig.Prefix)
})
t.Run("gets results cache config if not configured directly", func(t *testing.T) {
config, _, _ := configWrapperFromYAML(t, defaultResulsCacheString, nil)
assert.EqualValues(t, "memcached.host.org", config.QueryRange.InstantMetricCacheConfig.CacheConfig.MemcacheClient.Host)
assert.EqualValues(t, "frontend.instant-metric-results-cache.", config.QueryRange.InstantMetricCacheConfig.CacheConfig.Prefix)
assert.False(t, config.QueryRange.InstantMetricCacheConfig.CacheConfig.EmbeddedCache.Enabled)
})
})
t.Run("for the labels results cache config", func(t *testing.T) {
t.Run("no embedded cache enabled by default if Redis is set", func(t *testing.T) {
configFileString := `---

@ -427,10 +427,12 @@ func Test_codec_DecodeResponse(t *testing.T) {
func Test_codec_DecodeProtobufResponseParity(t *testing.T) {
// test fixtures from pkg/util/marshal_test
var queryTests = []struct {
name string
actual parser.Value
expected string
}{
{
"basic",
logqlmodel.Streams{
logproto.Stream{
Entries: []logproto.Entry{
@ -462,6 +464,7 @@ func Test_codec_DecodeProtobufResponseParity(t *testing.T) {
},
// vector test
{
"vector",
promql.Vector{
{
T: 1568404331324,
@ -524,6 +527,7 @@ func Test_codec_DecodeProtobufResponseParity(t *testing.T) {
},
// matrix test
{
"matrix",
promql.Matrix{
{
Floats: []promql.FPoint{
@ -607,50 +611,53 @@ func Test_codec_DecodeProtobufResponseParity(t *testing.T) {
}
codec := RequestProtobufCodec{}
for i, queryTest := range queryTests {
params := url.Values{
"query": []string{`{app="foo"}`},
}
u := &url.URL{
Path: "/loki/api/v1/query_range",
RawQuery: params.Encode(),
}
httpReq := &http.Request{
Method: "GET",
RequestURI: u.String(),
URL: u,
}
req, err := codec.DecodeRequest(context.TODO(), httpReq, nil)
require.NoError(t, err)
i := i
t.Run(queryTest.name, func(t *testing.T) {
params := url.Values{
"query": []string{`{app="foo"}`},
}
u := &url.URL{
Path: "/loki/api/v1/query_range",
RawQuery: params.Encode(),
}
httpReq := &http.Request{
Method: "GET",
RequestURI: u.String(),
URL: u,
}
req, err := codec.DecodeRequest(context.TODO(), httpReq, nil)
require.NoError(t, err)
// parser.Value -> queryrange.QueryResponse
var b bytes.Buffer
result := logqlmodel.Result{
Data: queryTest.actual,
Statistics: statsResult,
}
err = WriteQueryResponseProtobuf(&logql.LiteralParams{}, result, &b)
require.NoError(t, err)
// parser.Value -> queryrange.QueryResponse
var b bytes.Buffer
result := logqlmodel.Result{
Data: queryTest.actual,
Statistics: statsResult,
}
err = WriteQueryResponseProtobuf(&logql.LiteralParams{}, result, &b)
require.NoError(t, err)
// queryrange.QueryResponse -> queryrangebase.Response
querierResp := &http.Response{
StatusCode: 200,
Body: io.NopCloser(&b),
Header: http.Header{
"Content-Type": []string{ProtobufType},
},
}
resp, err := codec.DecodeResponse(context.TODO(), querierResp, req)
require.NoError(t, err)
// queryrange.QueryResponse -> queryrangebase.Response
querierResp := &http.Response{
StatusCode: 200,
Body: io.NopCloser(&b),
Header: http.Header{
"Content-Type": []string{ProtobufType},
},
}
resp, err := codec.DecodeResponse(context.TODO(), querierResp, req)
require.NoError(t, err)
// queryrange.Response -> JSON
ctx := user.InjectOrgID(context.Background(), "1")
httpResp, err := codec.EncodeResponse(ctx, httpReq, resp)
require.NoError(t, err)
// queryrange.Response -> JSON
ctx := user.InjectOrgID(context.Background(), "1")
httpResp, err := codec.EncodeResponse(ctx, httpReq, resp)
require.NoError(t, err)
body, _ := io.ReadAll(httpResp.Body)
require.JSONEqf(t, queryTest.expected, string(body), "Protobuf Decode Query Test %d failed", i)
body, err := io.ReadAll(httpResp.Body)
require.NoError(t, err)
require.JSONEqf(t, queryTest.expected, string(body), "Protobuf Decode Query Test %d failed", i)
})
}
}
func Test_codec_EncodeRequest(t *testing.T) {
@ -1645,6 +1652,16 @@ var (
"downloadTime": 0,
"queryLengthServed": 0
},
"instantMetricResult": {
"entriesFound": 0,
"entriesRequested": 0,
"entriesStored": 0,
"bytesReceived": 0,
"bytesSent": 0,
"requests": 0,
"downloadTime": 0,
"queryLengthServed": 0
},
"result": {
"entriesFound": 0,
"entriesRequested": 0,
@ -2027,13 +2044,14 @@ var (
},
Caches: stats.Caches{
Chunk: stats.Cache{},
Index: stats.Cache{},
StatsResult: stats.Cache{},
VolumeResult: stats.Cache{},
SeriesResult: stats.Cache{},
LabelResult: stats.Cache{},
Result: stats.Cache{},
Chunk: stats.Cache{},
Index: stats.Cache{},
StatsResult: stats.Cache{},
VolumeResult: stats.Cache{},
SeriesResult: stats.Cache{},
LabelResult: stats.Cache{},
Result: stats.Cache{},
InstantMetricResult: stats.Cache{},
},
}
)

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"reflect"
"time"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
@ -14,6 +15,7 @@ import (
"github.com/prometheus/prometheus/promql/parser"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/querier/plan"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
@ -27,6 +29,8 @@ const (
type DownstreamHandler struct {
limits Limits
next queryrangebase.Handler
splitAlign bool
}
func ParamsToLokiRequest(params logql.Params) queryrangebase.Request {
@ -86,6 +90,7 @@ func (h DownstreamHandler) Downstreamer(ctx context.Context) logql.Downstreamer
parallelism: p,
locks: locks,
handler: h.next,
splitAlign: h.splitAlign,
}
}
@ -94,16 +99,50 @@ type instance struct {
parallelism int
locks chan struct{}
handler queryrangebase.Handler
splitAlign bool
}
// withoutOffset returns the given query string with offsets removed and the start and end timestamps adjusted accordingly. If no offset is present in the original query, it is returned as-is.
func withoutOffset(query logql.DownstreamQuery) (string, time.Time, time.Time) {
expr := query.Params.GetExpression()
var (
newStart = query.Params.Start()
newEnd = query.Params.End()
)
expr.Walk(func(e syntax.Expr) {
switch rng := e.(type) {
case *syntax.RangeAggregationExpr:
off := rng.Left.Offset
if off != 0 {
rng.Left.Offset = 0 // remove offset
// adjust start and end time
newEnd = newEnd.Add(-off)
newStart = newStart.Add(-off)
}
}
})
return expr.String(), newStart, newEnd
}
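Concretely (a stdlib-only sketch of the same arithmetic, matching the test case later in this diff): an instant query `sum(rate({foo="bar"}[2h] offset 1h))` executed at ts becomes `sum(rate({foo="bar"}[2h]))` with start and end both shifted back by the offset, so the evaluated data window is unchanged:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	ts := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) // hypothetical exec time
	offset := time.Hour                                // `offset 1h` in the query

	// For an instant query start == end == ts; dropping the offset from the
	// range selector shifts both back by the same amount.
	newStart := ts.Add(-offset)
	newEnd := ts.Add(-offset)
	fmt.Println(newStart.Equal(newEnd), newStart) // true 2024-01-01 11:00:00 +0000 UTC
}
```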
func (in instance) Downstream(ctx context.Context, queries []logql.DownstreamQuery, acc logql.Accumulator) ([]logqlmodel.Result, error) {
return in.For(ctx, queries, acc, func(qry logql.DownstreamQuery) (logqlmodel.Result, error) {
req := ParamsToLokiRequest(qry.Params).WithQuery(qry.Params.GetExpression().String())
var req queryrangebase.Request
if in.splitAlign {
qs, newStart, newEnd := withoutOffset(qry)
req = ParamsToLokiRequest(qry.Params).WithQuery(qs).WithStartEnd(newStart, newEnd)
} else {
req = ParamsToLokiRequest(qry.Params).WithQuery(qry.Params.GetExpression().String())
}
sp, ctx := opentracing.StartSpanFromContext(ctx, "DownstreamHandler.instance")
defer sp.Finish()
logger := spanlogger.FromContext(ctx)
defer logger.Finish()
level.Debug(logger).Log("shards", fmt.Sprintf("%+v", qry.Params.Shards()), "query", req.GetQuery(), "step", req.GetStep(), "handler", reflect.TypeOf(in.handler))
level.Debug(logger).Log("shards", fmt.Sprintf("%+v", qry.Params.Shards()), "query", req.GetQuery(), "step", req.GetStep(), "handler", reflect.TypeOf(in.handler), "engine", "downstream")
res, err := in.handler.Do(ctx, req)
if err != nil {

@ -3,6 +3,7 @@ package queryrange
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"sync"
@ -12,6 +13,7 @@ import (
"github.com/grafana/dskit/user"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
@ -325,71 +327,142 @@ func TestInstanceFor(t *testing.T) {
}
func TestInstanceDownstream(t *testing.T) {
params, err := logql.NewLiteralParams(
`{foo="bar"}`,
time.Now(),
time.Now(),
0,
0,
logproto.BACKWARD,
1000,
nil,
)
require.NoError(t, err)
expr, err := syntax.ParseExpr(`{foo="bar"}`)
require.NoError(t, err)
expectedResp := func() *LokiResponse {
return &LokiResponse{
Data: LokiData{
Result: []logproto.Stream{{
Labels: `{foo="bar"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 0), Line: "foo"},
},
}},
t.Run("Downstream simple query", func(t *testing.T) {
ts := time.Unix(1, 0)
params, err := logql.NewLiteralParams(
`{foo="bar"}`,
ts,
ts,
0,
0,
logproto.BACKWARD,
1000,
nil,
)
require.NoError(t, err)
expr, err := syntax.ParseExpr(`{foo="bar"}`)
require.NoError(t, err)
expectedResp := func() *LokiResponse {
return &LokiResponse{
Data: LokiData{
Result: []logproto.Stream{{
Labels: `{foo="bar"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 0), Line: "foo"},
},
}},
},
Statistics: stats.Result{
Summary: stats.Summary{QueueTime: 1, ExecTime: 2},
},
}
}
queries := []logql.DownstreamQuery{
{
Params: logql.ParamsWithShardsOverride{
Params: logql.ParamsWithExpressionOverride{Params: params, ExpressionOverride: expr},
ShardsOverride: logql.Shards{{Shard: 0, Of: 2}}.Encode(),
},
},
Statistics: stats.Result{
Summary: stats.Summary{QueueTime: 1, ExecTime: 2},
}
var got queryrangebase.Request
var want queryrangebase.Request
handler := queryrangebase.HandlerFunc(
func(_ context.Context, req queryrangebase.Request) (queryrangebase.Response, error) {
// for some reason these seemingly can't be checked in their own goroutines,
// so we assign them to scoped variables for later comparison.
got = req
want = ParamsToLokiRequest(queries[0].Params).WithQuery(expr.String())
return expectedResp(), nil
},
)
expected, err := ResponseToResult(expectedResp())
require.Nil(t, err)
results, err := DownstreamHandler{
limits: fakeLimits{},
next: handler,
}.Downstreamer(context.Background()).Downstream(context.Background(), queries, logql.NewBufferedAccumulator(len(queries)))
require.Equal(t, want, got)
require.Nil(t, err)
require.Equal(t, 1, len(results))
require.Equal(t, expected.Data, results[0].Data)
})
t.Run("Downstream with offset removed", func(t *testing.T) {
ts := time.Unix(1, 0)
params, err := logql.NewLiteralParams(
`sum(rate({foo="bar"}[2h] offset 1h))`,
ts,
ts,
0,
0,
logproto.BACKWARD,
1000,
nil,
)
require.NoError(t, err)
expectedResp := func() *LokiResponse {
return &LokiResponse{
Data: LokiData{
Result: []logproto.Stream{{
Labels: `{foo="bar"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 0), Line: "foo"},
},
}},
},
Statistics: stats.Result{
Summary: stats.Summary{QueueTime: 1, ExecTime: 2},
},
}
}
}
queries := []logql.DownstreamQuery{
{
Params: logql.ParamsWithShardsOverride{
Params: logql.ParamsWithExpressionOverride{Params: params, ExpressionOverride: expr},
ShardsOverride: logql.Shards{{Shard: 0, Of: 2}}.Encode(),
queries := []logql.DownstreamQuery{
{
Params: params,
},
},
}
}
var got queryrangebase.Request
var want queryrangebase.Request
handler := queryrangebase.HandlerFunc(
func(_ context.Context, req queryrangebase.Request) (queryrangebase.Response, error) {
// for some reason these seemingly can't be checked in their own goroutines,
// so we assign them to scoped variables for later comparison.
got = req
want = ParamsToLokiRequest(queries[0].Params).WithQuery(expr.String())
var got queryrangebase.Request
var want queryrangebase.Request
handler := queryrangebase.HandlerFunc(
func(_ context.Context, req queryrangebase.Request) (queryrangebase.Response, error) {
// for some reason these seemingly can't be checked in their own goroutines,
// so we assign them to scoped variables for later comparison.
got = req
want = ParamsToLokiRequest(params).WithQuery(`sum(rate({foo="bar"}[2h]))`).WithStartEnd(ts.Add(-1*time.Hour), ts.Add(-1*time.Hour)) // offset removed; start and end shifted back accordingly for the instant query
return expectedResp(), nil
},
)
return expectedResp(), nil
},
)
expected, err := ResponseToResult(expectedResp())
require.Nil(t, err)
expected, err := ResponseToResult(expectedResp())
require.NoError(t, err)
results, err := DownstreamHandler{
limits: fakeLimits{},
next: handler,
}.Downstreamer(context.Background()).Downstream(context.Background(), queries, logql.NewBufferedAccumulator(len(queries)))
results, err := DownstreamHandler{
limits: fakeLimits{},
next: handler,
splitAlign: true,
}.Downstreamer(context.Background()).Downstream(context.Background(), queries, logql.NewBufferedAccumulator(len(queries)))
require.Equal(t, want, got)
assert.Equal(t, want, got)
require.Nil(t, err)
require.Equal(t, 1, len(results))
require.Equal(t, expected.Data, results[0].Data)
require.Nil(t, err)
require.Equal(t, 1, len(results))
require.Equal(t, expected.Data, results[0].Data)
})
}
func TestCancelWhileWaitingResponse(t *testing.T) {

@ -0,0 +1,85 @@
package queryrange
import (
"context"
"flag"
"fmt"
"time"
"github.com/go-kit/log"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/chunk/cache/resultscache"
)
type InstantMetricSplitter struct {
Limits
transformer UserIDTransformer
}
// GenerateCacheKey generates a cache key based on the userID, Request and interval.
func (i InstantMetricSplitter) GenerateCacheKey(ctx context.Context, userID string, r resultscache.Request) string {
split := i.InstantMetricQuerySplitDuration(userID)
var currentInterval int64
if denominator := int64(split / time.Millisecond); denominator > 0 {
currentInterval = r.GetStart().UnixMilli() / denominator
}
if i.transformer != nil {
userID = i.transformer(ctx, userID)
}
// include both the currentInterval and the split duration in key to ensure
// a cache key can't be reused when an interval changes
return fmt.Sprintf("instant-metric:%s:%s:%d:%d", userID, r.GetQuery(), currentInterval, split)
}
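To see how the interval bucket behaves, here is a stdlib-only sketch (the userID and query are made-up values): all starts that fall within the same split window produce the same key, so a cached instant result is reusable until the window rolls over, and changing the split duration changes the key:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	split := time.Hour // what InstantMetricQuerySplitDuration(userID) might return
	start := time.Date(2024, 1, 1, 12, 34, 0, 0, time.UTC)

	var currentInterval int64
	if denominator := int64(split / time.Millisecond); denominator > 0 {
		currentInterval = start.UnixMilli() / denominator
	}
	// Same shape as GenerateCacheKey above (split renders as nanoseconds via %d).
	fmt.Printf("instant-metric:%s:%s:%d:%d\n",
		"tenant-1", `sum(rate({foo="bar"}[3h]))`, currentInterval, int64(split))
}
```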
type InstantMetricCacheConfig struct {
queryrangebase.ResultsCacheConfig `yaml:",inline"`
}
// RegisterFlags registers flags.
func (cfg *InstantMetricCacheConfig) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix(f, "frontend.instant-metric-results-cache.")
}
func (cfg *InstantMetricCacheConfig) Validate() error {
return cfg.ResultsCacheConfig.Validate()
}
type instantMetricExtractor struct{}
func NewInstantMetricCacheMiddleware(
log log.Logger,
limits Limits,
merger queryrangebase.Merger,
c cache.Cache,
cacheGenNumberLoader queryrangebase.CacheGenNumberLoader,
shouldCache queryrangebase.ShouldCacheFn,
parallelismForReq queryrangebase.ParallelismForReqFn,
retentionEnabled bool,
transformer UserIDTransformer,
metrics *queryrangebase.ResultsCacheMetrics,
) (queryrangebase.Middleware, error) {
return queryrangebase.NewResultsCacheMiddleware(
log,
c,
InstantMetricSplitter{limits, transformer},
limits,
merger,
PrometheusExtractor{},
cacheGenNumberLoader,
func(ctx context.Context, r queryrangebase.Request) bool {
if shouldCache != nil && !shouldCache(ctx, r) {
return false
}
return true
},
parallelismForReq,
retentionEnabled,
false,
metrics,
)
}

@ -68,6 +68,15 @@ func (l limits) QuerySplitDuration(user string) time.Duration {
return *l.splitDuration
}
func (l limits) InstantMetricQuerySplitDuration(user string) time.Duration {
// NOTE: It returns `splitDuration` for both instant and range queries;
// there is no need for separate limits for now.
if l.splitDuration == nil {
return l.Limits.QuerySplitDuration(user)
}
return *l.splitDuration
}
func (l limits) TSDBMaxQueryParallelism(ctx context.Context, user string) int {
if l.maxQueryParallelism == nil {
return l.Limits.TSDBMaxQueryParallelism(ctx, user)

@ -14,6 +14,7 @@ type Limits interface {
queryrangebase.Limits
logql.Limits
QuerySplitDuration(string) time.Duration
InstantMetricQuerySplitDuration(string) time.Duration
MetadataQuerySplitDuration(string) time.Duration
RecentMetadataQuerySplitDuration(string) time.Duration
RecentMetadataQueryWindow(string) time.Duration

@ -118,6 +118,16 @@ var emptyStats = `"stats": {
"downloadTime": 0,
"queryLengthServed": 0
},
"instantMetricResult": {
"entriesFound": 0,
"entriesRequested": 0,
"entriesStored": 0,
"bytesReceived": 0,
"bytesSent": 0,
"requests": 0,
"downloadTime": 0,
"queryLengthServed": 0
},
"result": {
"entriesFound": 0,
"entriesRequested": 0,

@ -44,16 +44,19 @@ const (
// Config is the configuration for the queryrange tripperware
type Config struct {
base.Config `yaml:",inline"`
Transformer UserIDTransformer `yaml:"-"`
CacheIndexStatsResults bool `yaml:"cache_index_stats_results"`
StatsCacheConfig IndexStatsCacheConfig `yaml:"index_stats_results_cache" doc:"description=If a cache config is not specified and cache_index_stats_results is true, the config for the results cache is used."`
CacheVolumeResults bool `yaml:"cache_volume_results"`
VolumeCacheConfig VolumeCacheConfig `yaml:"volume_results_cache" doc:"description=If a cache config is not specified and cache_volume_results is true, the config for the results cache is used."`
CacheSeriesResults bool `yaml:"cache_series_results"`
SeriesCacheConfig SeriesCacheConfig `yaml:"series_results_cache" doc:"description=If series_results_cache is not configured and cache_series_results is true, the config for the results cache is used."`
CacheLabelResults bool `yaml:"cache_label_results"`
LabelsCacheConfig LabelsCacheConfig `yaml:"label_results_cache" doc:"description=If label_results_cache is not configured and cache_label_results is true, the config for the results cache is used."`
base.Config `yaml:",inline"`
Transformer UserIDTransformer `yaml:"-"`
CacheIndexStatsResults bool `yaml:"cache_index_stats_results"`
StatsCacheConfig IndexStatsCacheConfig `yaml:"index_stats_results_cache" doc:"description=If a cache config is not specified and cache_index_stats_results is true, the config for the results cache is used."`
CacheVolumeResults bool `yaml:"cache_volume_results"`
VolumeCacheConfig VolumeCacheConfig `yaml:"volume_results_cache" doc:"description=If a cache config is not specified and cache_volume_results is true, the config for the results cache is used."`
CacheInstantMetricResults bool `yaml:"cache_instant_metric_results"`
InstantMetricCacheConfig InstantMetricCacheConfig `yaml:"instant_metric_results_cache" doc:"description=If a cache config is not specified and cache_instant_metric_results is true, the config for the results cache is used."`
InstantMetricQuerySplitAlign bool `yaml:"instant_metric_query_split_align" doc:"description=Whether to align the splits of instant metric queries with splitByInterval and the query's exec time. Useful when cache_instant_metric_results is enabled."`
CacheSeriesResults bool `yaml:"cache_series_results"`
SeriesCacheConfig SeriesCacheConfig `yaml:"series_results_cache" doc:"description=If series_results_cache is not configured and cache_series_results is true, the config for the results cache is used."`
CacheLabelResults bool `yaml:"cache_label_results"`
LabelsCacheConfig LabelsCacheConfig `yaml:"label_results_cache" doc:"description=If label_results_cache is not configured and cache_label_results is true, the config for the results cache is used."`
}
// RegisterFlags adds the flags required to configure this flag set.
@ -63,6 +66,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.StatsCacheConfig.RegisterFlags(f)
f.BoolVar(&cfg.CacheVolumeResults, "querier.cache-volume-results", false, "Cache volume query results.")
cfg.VolumeCacheConfig.RegisterFlags(f)
f.BoolVar(&cfg.CacheInstantMetricResults, "querier.cache-instant-metric-results", false, "Cache instant metric query results.")
cfg.InstantMetricCacheConfig.RegisterFlags(f)
f.BoolVar(&cfg.InstantMetricQuerySplitAlign, "querier.instant-metric-query-split-align", false, "Align the instant metric splits with splitByInterval and the query's exec time.")
f.BoolVar(&cfg.CacheSeriesResults, "querier.cache-series-results", false, "Cache series query results.")
cfg.SeriesCacheConfig.RegisterFlags(f)
f.BoolVar(&cfg.CacheLabelResults, "querier.cache-label-results", false, "Cache label query results.")
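As a usage example, both of the new instant-metric options registered above are boolean flags, so they can be enabled together on the command line with `-querier.cache-instant-metric-results=true -querier.instant-metric-query-split-align=true` (or via their YAML equivalents).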
@ -132,12 +138,13 @@ func NewMiddleware(
metrics := NewMetrics(registerer, metricsNamespace)
var (
resultsCache cache.Cache
statsCache cache.Cache
volumeCache cache.Cache
seriesCache cache.Cache
labelsCache cache.Cache
err error
resultsCache cache.Cache
statsCache cache.Cache
volumeCache cache.Cache
instantMetricCache cache.Cache
seriesCache cache.Cache
labelsCache cache.Cache
err error
)
if cfg.CacheResults {
@ -161,6 +168,13 @@ func NewMiddleware(
}
}
if cfg.CacheInstantMetricResults {
instantMetricCache, err = newResultsCacheFromConfig(cfg.InstantMetricCacheConfig.ResultsCacheConfig, registerer, log, stats.InstantMetricResultsCache)
if err != nil {
return nil, nil, err
}
}
if cfg.CacheSeriesResults {
seriesCache, err = newResultsCacheFromConfig(cfg.SeriesCacheConfig.ResultsCacheConfig, registerer, log, stats.SeriesResultCache)
if err != nil {
@ -211,7 +225,7 @@ func NewMiddleware(
return nil, nil, err
}
instantMetricTripperware, err := NewInstantMetricTripperware(cfg, engineOpts, log, limits, schema, metrics, indexStatsTripperware, metricsNamespace)
instantMetricTripperware, err := NewInstantMetricTripperware(cfg, engineOpts, log, limits, schema, metrics, codec, instantMetricCache, cacheGenNumLoader, retentionEnabled, indexStatsTripperware, metricsNamespace)
if err != nil {
return nil, nil, err
}
@ -761,7 +775,51 @@ func NewMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logge
}
// NewInstantMetricTripperware creates a new frontend tripperware responsible for handling instant metric queries
func NewInstantMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) {
func NewInstantMetricTripperware(
cfg Config,
engineOpts logql.EngineOpts,
log log.Logger,
limits Limits,
schema config.SchemaConfig,
metrics *Metrics,
merger base.Merger,
c cache.Cache,
cacheGenNumLoader base.CacheGenNumberLoader,
retentionEnabled bool,
indexStatsTripperware base.Middleware,
metricsNamespace string,
) (base.Middleware, error) {
var cacheMiddleware base.Middleware
if cfg.CacheInstantMetricResults {
var err error
cacheMiddleware, err = NewInstantMetricCacheMiddleware(
log,
limits,
merger,
c,
cacheGenNumLoader,
func(_ context.Context, r base.Request) bool {
return !r.GetCachingOptions().Disabled
},
func(ctx context.Context, tenantIDs []string, r base.Request) int {
return MinWeightedParallelism(
ctx,
tenantIDs,
schema.Configs,
limits,
model.Time(r.GetStart().UnixMilli()),
model.Time(r.GetEnd().UnixMilli()),
)
},
retentionEnabled,
cfg.Transformer,
metrics.ResultsCacheMetrics,
)
if err != nil {
return nil, err
}
}
return base.MiddlewareFunc(func(next base.Handler) base.Handler {
statsHandler := indexStatsTripperware.Wrap(next)
@ -769,11 +827,19 @@ func NewInstantMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log lo
StatsCollectorMiddleware(),
NewLimitsMiddleware(limits),
NewQuerySizeLimiterMiddleware(schema.Configs, engineOpts, log, limits, statsHandler),
NewSplitByRangeMiddleware(log, engineOpts, limits, cfg.InstantMetricQuerySplitAlign, metrics.MiddlewareMapperMetrics.rangeMapper),
}
if cfg.CacheInstantMetricResults {
queryRangeMiddleware = append(
queryRangeMiddleware,
base.InstrumentMiddleware("instant_metric_results_cache", metrics.InstrumentMiddlewareMetrics),
cacheMiddleware,
)
}
if cfg.ShardedQueries {
queryRangeMiddleware = append(queryRangeMiddleware,
NewSplitByRangeMiddleware(log, engineOpts, limits, metrics.MiddlewareMapperMetrics.rangeMapper),
NewQueryShardMiddleware(
log,
schema.Configs,

@ -1247,6 +1247,7 @@ type fakeLimits struct {
metadataSplitDuration map[string]time.Duration
recentMetadataSplitDuration map[string]time.Duration
recentMetadataQueryWindow map[string]time.Duration
instantMetricSplitDuration map[string]time.Duration
ingesterSplitDuration map[string]time.Duration
minShardingLookback time.Duration
queryTimeout time.Duration
@ -1266,6 +1267,13 @@ func (f fakeLimits) QuerySplitDuration(key string) time.Duration {
return f.splitDuration[key]
}
func (f fakeLimits) InstantMetricQuerySplitDuration(key string) time.Duration {
if f.instantMetricSplitDuration == nil {
return 0
}
return f.instantMetricSplitDuration[key]
}
func (f fakeLimits) MetadataQuerySplitDuration(key string) time.Duration {
if f.metadataSplitDuration == nil {
return 0

@ -26,20 +26,25 @@ type splitByRange struct {
limits Limits
ng *logql.DownstreamEngine
metrics *logql.MapperMetrics
// Whether to align the range interval of the subqueries to splitByInterval.
splitAlign bool
}
// NewSplitByRangeMiddleware creates a new Middleware that splits instant metric queries by the range interval.
func NewSplitByRangeMiddleware(logger log.Logger, engineOpts logql.EngineOpts, limits Limits, metrics *logql.MapperMetrics) queryrangebase.Middleware {
func NewSplitByRangeMiddleware(logger log.Logger, engineOpts logql.EngineOpts, limits Limits, splitAlign bool, metrics *logql.MapperMetrics) queryrangebase.Middleware {
return queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
return &splitByRange{
logger: log.With(logger, "middleware", "InstantQuery.splitByRangeVector"),
next: next,
limits: limits,
ng: logql.NewDownstreamEngine(engineOpts, DownstreamHandler{
limits: limits,
next: next,
limits: limits,
next: next,
splitAlign: splitAlign,
}, limits, logger),
metrics: metrics,
metrics: metrics,
splitAlign: splitAlign,
}
})
}
@ -57,14 +62,26 @@ func (s *splitByRange) Do(ctx context.Context, request queryrangebase.Request) (
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
interval := validation.SmallestPositiveNonZeroDurationPerTenant(tenants, s.limits.QuerySplitDuration)
interval := validation.SmallestPositiveNonZeroDurationPerTenant(tenants, s.limits.InstantMetricQuerySplitDuration)
// if no interval configured, continue to the next middleware
if interval == 0 {
return s.next.Do(ctx, request)
}
mapperStats := logql.NewMapperStats()
mapper, err := logql.NewRangeMapper(interval, s.metrics, mapperStats)
ir, ok := request.(*LokiInstantRequest)
if !ok {
return nil, fmt.Errorf("expected *LokiInstantRequest, got %T", request)
}
var mapper logql.RangeMapper
if s.splitAlign {
mapper, err = logql.NewRangeMapperWithSplitAlign(interval, ir.TimeTs, s.metrics, mapperStats)
} else {
mapper, err = logql.NewRangeMapper(interval, s.metrics, mapperStats)
}
if err != nil {
return nil, err
}
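To make the splitAlign branch concrete, here is an illustrative sketch (a reimplementation of the behaviour observed in the tests below, not the actual NewRangeMapperWithSplitAlign code) of how an instant query's range vector is decomposed: the newest subquery covers the distance from the exec time back to the previous split boundary, followed by full boundary-aligned chunks and a final partial remainder:
package main

import (
	"fmt"
	"time"
)

type subrange struct {
	execAt time.Time     // exec time of the subquery
	rng    time.Duration // its range vector duration
}

// alignedSplits decomposes a range vector of length rng, evaluated at exec,
// into split-aligned chunks. Only the first and last chunks can be partial.
func alignedSplits(exec time.Time, rng, split time.Duration) []subrange {
	var out []subrange
	cur, remaining := exec, rng
	for remaining > 0 {
		chunk := cur.Sub(cur.Truncate(split)) // offset back to previous boundary
		if chunk == 0 {
			chunk = split // already aligned: take a full chunk
		}
		if chunk > remaining {
			chunk = remaining // final partial remainder
		}
		out = append(out, subrange{cur, chunk})
		cur = cur.Add(-chunk)
		remaining -= chunk
	}
	return out
}

func main() {
	exec := time.Date(1970, 1, 1, 12, 34, 0, 0, time.UTC)
	for _, s := range alignedSplits(exec, 3*time.Hour, time.Hour) {
		fmt.Printf("[%v] at %s\n", s.rng, s.execAt.Format("15:04"))
	}
}
With a 1h split, an exec time of 12:34 and a [3h] range this prints [34m]@12:34, [1h]@12:00, [1h]@11:00 and [26m]@10:00, matching the subqueries asserted in the tests below.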
@ -85,10 +102,6 @@ func (s *splitByRange) Do(ctx context.Context, request queryrangebase.Request) (
queryStatsCtx := stats.FromContext(ctx)
queryStatsCtx.AddSplitQueries(int64(mapperStats.GetSplitQueries()))
if _, ok := request.(*LokiInstantRequest); !ok {
return nil, fmt.Errorf("expected *LokiInstantRequest, got %T", request)
}
query := s.ng.Query(ctx, logql.ParamsWithExpressionOverride{Params: params, ExpressionOverride: parsed})
res, err := query.Exec(ctx)

@ -8,6 +8,7 @@ import (
"github.com/go-kit/log"
"github.com/grafana/dskit/user"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/loghttp"
@ -17,14 +18,291 @@ import (
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
)
func Test_RangeVectorSplitAlign(t *testing.T) {
var (
twelve34 = time.Date(1970, 1, 1, 12, 34, 0, 0, time.UTC) // 1970 12:34:00 UTC
twelve = time.Date(1970, 1, 1, 12, 00, 0, 0, time.UTC) // 1970 12:00:00 UTC
eleven = twelve.Add(-1 * time.Hour) // 1970 11:00:00 UTC
ten = eleven.Add(-1 * time.Hour) // 1970 10:00:00 UTC
)
for _, tc := range []struct {
name string
in queryrangebase.Request
subQueries []queryrangebase.RequestResponse
expected queryrangebase.Response
splitByInterval time.Duration
}{
{
name: "sum_splitBy_aligned_with_query_time",
splitByInterval: 1 * time.Minute,
in: &LokiInstantRequest{
Query: `sum(bytes_over_time({app="foo"}[3m]))`,
TimeTs: time.Unix(180, 0),
Path: "/loki/api/v1/query",
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`sum(bytes_over_time({app="foo"}[3m]))`),
},
},
subQueries: []queryrangebase.RequestResponse{
subQueryRequestResponseWithQueryTime(`sum(bytes_over_time({app="foo"}[1m]))`, 1, time.Unix(60, 0)),
subQueryRequestResponseWithQueryTime(`sum(bytes_over_time({app="foo"}[1m]))`, 2, time.Unix(120, 0)),
subQueryRequestResponseWithQueryTime(`sum(bytes_over_time({app="foo"}[1m]))`, 3, time.Unix(180, 0)),
},
expected: expectedMergedResponseWithTime(1+2+3, time.Unix(180, 0)), // original `TimeTs` of the query.
},
{
name: "sum_splitBy_not_aligned_query_time",
splitByInterval: 1 * time.Hour,
in: &LokiInstantRequest{
Query: `sum(bytes_over_time({app="foo"}[3h]))`,
TimeTs: twelve34,
Path: "/loki/api/v1/query",
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`sum(bytes_over_time({app="foo"}[3h]))`),
},
},
subQueries: []queryrangebase.RequestResponse{
subQueryRequestResponseWithQueryTime(`sum(bytes_over_time({app="foo"}[34m]))`, 1, twelve34),
subQueryRequestResponseWithQueryTime(`sum(bytes_over_time({app="foo"}[1h]))`, 2, twelve),
subQueryRequestResponseWithQueryTime(`sum(bytes_over_time({app="foo"}[1h]))`, 3, eleven),
subQueryRequestResponseWithQueryTime(`sum(bytes_over_time({app="foo"}[26m]))`, 4, ten),
},
expected: expectedMergedResponseWithTime(1+2+3+4, twelve34), // original `TimeTs` of the query.
},
{
name: "sum_aggregation_splitBy_aligned_with_query_time",
splitByInterval: 1 * time.Minute,
in: &LokiInstantRequest{
Query: `sum by (bar) (bytes_over_time({app="foo"}[3m]))`,
TimeTs: time.Unix(180, 0),
Path: "/loki/api/v1/query",
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`sum by (bar) (bytes_over_time({app="foo"}[3m]))`),
},
},
subQueries: []queryrangebase.RequestResponse{
subQueryRequestResponseWithQueryTime(`sum by (bar)(bytes_over_time({app="foo"}[1m]))`, 10, time.Unix(60, 0)),
subQueryRequestResponseWithQueryTime(`sum by (bar)(bytes_over_time({app="foo"}[1m]))`, 20, time.Unix(120, 0)),
subQueryRequestResponseWithQueryTime(`sum by (bar)(bytes_over_time({app="foo"}[1m]))`, 30, time.Unix(180, 0)),
},
expected: expectedMergedResponseWithTime(10+20+30, time.Unix(180, 0)),
},
{
name: "sum_aggregation_splitBy_not_aligned_with_query_time",
splitByInterval: 1 * time.Hour,
in: &LokiInstantRequest{
Query: `sum by (bar) (bytes_over_time({app="foo"}[3h]))`,
TimeTs: twelve34,
Path: "/loki/api/v1/query",
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`sum by (bar) (bytes_over_time({app="foo"}[3h]))`),
},
},
subQueries: []queryrangebase.RequestResponse{
subQueryRequestResponseWithQueryTime(`sum by (bar)(bytes_over_time({app="foo"}[34m]))`, 10, twelve34), // 12:34:00
subQueryRequestResponseWithQueryTime(`sum by (bar)(bytes_over_time({app="foo"}[1h]))`, 20, twelve), // 12:00:00 aligned
subQueryRequestResponseWithQueryTime(`sum by (bar)(bytes_over_time({app="foo"}[1h]))`, 30, eleven), // 11:00:00 aligned
subQueryRequestResponseWithQueryTime(`sum by (bar)(bytes_over_time({app="foo"}[26m]))`, 40, ten), // 10:00:00
},
expected: expectedMergedResponseWithTime(10+20+30+40, twelve34),
},
{
name: "count_over_time_aligned_with_query_time",
splitByInterval: 1 * time.Minute,
in: &LokiInstantRequest{
Query: `sum(count_over_time({app="foo"}[3m]))`,
TimeTs: time.Unix(180, 0),
Path: "/loki/api/v1/query",
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`sum(count_over_time({app="foo"}[3m]))`),
},
},
subQueries: []queryrangebase.RequestResponse{
subQueryRequestResponseWithQueryTime(`sum(count_over_time({app="foo"}[1m]))`, 1, time.Unix(60, 0)),
subQueryRequestResponseWithQueryTime(`sum(count_over_time({app="foo"}[1m]))`, 1, time.Unix(120, 0)),
subQueryRequestResponseWithQueryTime(`sum(count_over_time({app="foo"}[1m]))`, 1, time.Unix(180, 0)),
},
expected: expectedMergedResponseWithTime(1+1+1, time.Unix(180, 0)),
},
{
name: "count_over_time_not_aligned_with_query_time",
splitByInterval: 1 * time.Hour,
in: &LokiInstantRequest{
Query: `sum(count_over_time({app="foo"}[3h]))`,
TimeTs: twelve34,
Path: "/loki/api/v1/query",
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`sum(count_over_time({app="foo"}[3h]))`),
},
},
subQueries: []queryrangebase.RequestResponse{
subQueryRequestResponseWithQueryTime(`sum(count_over_time({app="foo"}[34m]))`, 1, twelve34),
subQueryRequestResponseWithQueryTime(`sum(count_over_time({app="foo"}[1h]))`, 1, twelve),
subQueryRequestResponseWithQueryTime(`sum(count_over_time({app="foo"}[1h]))`, 1, eleven),
subQueryRequestResponseWithQueryTime(`sum(count_over_time({app="foo"}[26m]))`, 1, ten),
},
expected: expectedMergedResponseWithTime(1+1+1+1, twelve34),
},
{
name: "sum_agg_count_over_time_align_with_query_time",
splitByInterval: 1 * time.Minute,
in: &LokiInstantRequest{
Query: `sum by (bar) (count_over_time({app="foo"}[3m]))`,
TimeTs: time.Unix(180, 0),
Path: "/loki/api/v1/query",
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`sum by (bar) (count_over_time({app="foo"}[3m]))`),
},
},
subQueries: []queryrangebase.RequestResponse{
subQueryRequestResponseWithQueryTime(`sum by (bar)(count_over_time({app="foo"}[1m]))`, 0, time.Unix(60, 0)),
subQueryRequestResponseWithQueryTime(`sum by (bar)(count_over_time({app="foo"}[1m]))`, 0, time.Unix(120, 0)),
subQueryRequestResponseWithQueryTime(`sum by (bar)(count_over_time({app="foo"}[1m]))`, 0, time.Unix(180, 0)),
},
expected: expectedMergedResponseWithTime(0+0+0, time.Unix(180, 0)),
},
{
name: "sum_agg_count_over_time_not_align_with_query_time",
splitByInterval: 1 * time.Hour,
in: &LokiInstantRequest{
Query: `sum by (bar) (count_over_time({app="foo"}[3h]))`,
TimeTs: twelve34,
Path: "/loki/api/v1/query",
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`sum by (bar) (count_over_time({app="foo"}[3h]))`),
},
},
subQueries: []queryrangebase.RequestResponse{
subQueryRequestResponseWithQueryTime(`sum by (bar)(count_over_time({app="foo"}[34m]))`, 0, twelve34),
subQueryRequestResponseWithQueryTime(`sum by (bar)(count_over_time({app="foo"}[1h]))`, 0, twelve),
subQueryRequestResponseWithQueryTime(`sum by (bar)(count_over_time({app="foo"}[1h]))`, 0, eleven),
subQueryRequestResponseWithQueryTime(`sum by (bar)(count_over_time({app="foo"}[26m]))`, 0, ten),
},
expected: expectedMergedResponseWithTime(0+0+0+0, twelve34),
},
{
name: "sum_over_time_aligned_with_query_time",
splitByInterval: 1 * time.Minute,
in: &LokiInstantRequest{
Query: `sum(sum_over_time({app="foo"} | unwrap bar [3m]))`,
TimeTs: time.Unix(180, 0),
Path: "/loki/api/v1/query",
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`sum(sum_over_time({app="foo"} | unwrap bar [3m]))`),
},
},
subQueries: []queryrangebase.RequestResponse{
subQueryRequestResponseWithQueryTime(`sum(sum_over_time({app="foo"} | unwrap bar[1m]))`, 1, time.Unix(60, 0)),
subQueryRequestResponseWithQueryTime(`sum(sum_over_time({app="foo"} | unwrap bar[1m]))`, 2, time.Unix(120, 0)),
subQueryRequestResponseWithQueryTime(`sum(sum_over_time({app="foo"} | unwrap bar[1m]))`, 3, time.Unix(180, 0)),
},
expected: expectedMergedResponseWithTime(1+2+3, time.Unix(180, 0)),
},
{
name: "sum_over_time_not_aligned_with_query_time",
splitByInterval: 1 * time.Hour,
in: &LokiInstantRequest{
Query: `sum(sum_over_time({app="foo"} | unwrap bar [3h]))`,
TimeTs: twelve34,
Path: "/loki/api/v1/query",
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`sum(sum_over_time({app="foo"} | unwrap bar [3h]))`),
},
},
subQueries: []queryrangebase.RequestResponse{
subQueryRequestResponseWithQueryTime(`sum(sum_over_time({app="foo"} | unwrap bar[34m]))`, 1, twelve34),
subQueryRequestResponseWithQueryTime(`sum(sum_over_time({app="foo"} | unwrap bar[1h]))`, 2, twelve),
subQueryRequestResponseWithQueryTime(`sum(sum_over_time({app="foo"} | unwrap bar[1h]))`, 3, eleven),
subQueryRequestResponseWithQueryTime(`sum(sum_over_time({app="foo"} | unwrap bar[26m]))`, 4, ten),
},
expected: expectedMergedResponseWithTime(1+2+3+4, twelve34),
},
{
name: "sum_agg_sum_over_time_aligned_with_query_time",
splitByInterval: 1 * time.Minute,
in: &LokiInstantRequest{
Query: `sum by (bar) (sum_over_time({app="foo"} | unwrap bar [3m]))`,
TimeTs: time.Unix(180, 0),
Path: "/loki/api/v1/query",
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`sum by (bar) (sum_over_time({app="foo"} | unwrap bar [3m]))`),
},
},
subQueries: []queryrangebase.RequestResponse{
subQueryRequestResponseWithQueryTime(`sum by (bar)(sum_over_time({app="foo"} | unwrap bar[1m]))`, 1, time.Unix(60, 0)),
subQueryRequestResponseWithQueryTime(`sum by (bar)(sum_over_time({app="foo"} | unwrap bar[1m]))`, 2, time.Unix(120, 0)),
subQueryRequestResponseWithQueryTime(`sum by (bar)(sum_over_time({app="foo"} | unwrap bar[1m]))`, 3, time.Unix(180, 0)),
},
expected: expectedMergedResponseWithTime(1+2+3, time.Unix(180, 0)),
},
{
name: "sum_agg_sum_over_time_not_aligned_with_query_time",
splitByInterval: 1 * time.Hour,
in: &LokiInstantRequest{
Query: `sum by (bar) (sum_over_time({app="foo"} | unwrap bar [3h]))`,
TimeTs: twelve34,
Path: "/loki/api/v1/query",
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`sum by (bar) (sum_over_time({app="foo"} | unwrap bar [3h]))`),
},
},
subQueries: []queryrangebase.RequestResponse{
subQueryRequestResponseWithQueryTime(`sum by (bar)(sum_over_time({app="foo"} | unwrap bar[34m]))`, 1, twelve34),
subQueryRequestResponseWithQueryTime(`sum by (bar)(sum_over_time({app="foo"} | unwrap bar[1h]))`, 2, twelve),
subQueryRequestResponseWithQueryTime(`sum by (bar)(sum_over_time({app="foo"} | unwrap bar[1h]))`, 3, eleven),
subQueryRequestResponseWithQueryTime(`sum by (bar)(sum_over_time({app="foo"} | unwrap bar[26m]))`, 4, ten),
},
expected: expectedMergedResponseWithTime(1+2+3+4, twelve34),
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
srm := NewSplitByRangeMiddleware(log.NewNopLogger(), testEngineOpts, fakeLimits{
maxSeries: 10000,
queryTimeout: time.Second,
instantMetricSplitDuration: map[string]time.Duration{
"tenant": tc.splitByInterval,
},
}, true, nilShardingMetrics) // enable splitAlign
ctx := user.InjectOrgID(context.TODO(), "tenant")
byTimeTs := make(map[int64]queryrangebase.RequestResponse)
for _, v := range tc.subQueries {
key := v.Request.(*LokiInstantRequest).TimeTs.UnixNano()
byTimeTs[key] = v
}
resp, err := srm.Wrap(queryrangebase.HandlerFunc(
func(ctx context.Context, req queryrangebase.Request) (queryrangebase.Response, error) {
// req should match with one of the subqueries.
ts := req.(*LokiInstantRequest).TimeTs
subq, ok := byTimeTs[ts.UnixNano()]
if !ok { // every req **should** match with one of the subqueries
return nil, fmt.Errorf("subquery request '%s-%s' not found", req.GetQuery(), ts)
}
// Assert subquery request
assert.Equal(t, subq.Request.GetQuery(), req.GetQuery())
assert.Equal(t, subq.Request, req)
return subq.Response, nil
})).Do(ctx, tc.in)
require.NoError(t, err)
assert.Equal(t, tc.expected, resp.(*LokiPromResponse).Response)
})
}
}
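Note the arithmetic in the non-aligned cases above: with a 1h split and an exec time of 12:34, the [3h] range decomposes into 34m + 1h + 1h + 26m = 3h. Only the first and last subqueries depend on the query's exact exec time; the two middle ones land exactly on the 12:00 and 11:00 boundaries, which is what allows their results to be served from the instant metric results cache across queries issued at nearby times.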
func Test_RangeVectorSplit(t *testing.T) {
srm := NewSplitByRangeMiddleware(log.NewNopLogger(), testEngineOpts, fakeLimits{
maxSeries: 10000,
queryTimeout: time.Second,
splitDuration: map[string]time.Duration{
instantMetricSplitDuration: map[string]time.Duration{
"tenant": time.Minute,
},
}, nilShardingMetrics)
}, false, nilShardingMetrics)
ctx := user.InjectOrgID(context.TODO(), "tenant")
@ -151,6 +429,39 @@ func Test_RangeVectorSplit(t *testing.T) {
}
}
// subQueryRequestResponseWithQueryTime returns a RequestResponse containing the expected subQuery instant request
// executed at the given time, and a response containing a sample value returned from the wrapped handler
func subQueryRequestResponseWithQueryTime(expectedSubQuery string, sampleValue float64, exec time.Time) queryrangebase.RequestResponse {
return queryrangebase.RequestResponse{
Request: &LokiInstantRequest{
Query: expectedSubQuery,
TimeTs: exec,
Path: "/loki/api/v1/query",
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(expectedSubQuery),
},
},
Response: &LokiPromResponse{
Response: &queryrangebase.PrometheusResponse{
Status: loghttp.QueryStatusSuccess,
Data: queryrangebase.PrometheusData{
ResultType: loghttp.ResultTypeVector,
Result: []queryrangebase.SampleStream{
{
Labels: []logproto.LabelAdapter{
{Name: "app", Value: "foo"},
},
Samples: []logproto.LegacySample{
{TimestampMs: 1000, Value: sampleValue},
},
},
},
},
},
},
}
}
// subQueryRequestResponse returns a RequestResponse containing the expected subQuery instant request
// and a response containing a sample value returned from the following wrapper
func subQueryRequestResponse(expectedSubQuery string, sampleValue float64) queryrangebase.RequestResponse {
@ -202,3 +513,20 @@ func expectedMergedResponse(expectedSampleValue float64) *queryrangebase.Prometh
},
}
}
func expectedMergedResponseWithTime(expectedSampleValue float64, exec time.Time) *queryrangebase.PrometheusResponse {
return &queryrangebase.PrometheusResponse{
Status: loghttp.QueryStatusSuccess,
Data: queryrangebase.PrometheusData{
ResultType: loghttp.ResultTypeVector,
Result: []queryrangebase.SampleStream{
{
Labels: []logproto.LabelAdapter{},
Samples: []logproto.LegacySample{
{TimestampMs: exec.UnixMilli(), Value: expectedSampleValue},
},
},
},
},
}
}

@ -161,6 +161,16 @@ var queryTests = []struct {
"downloadTime": 0,
"queryLengthServed": 0
},
"instantMetricResult": {
"entriesFound": 0,
"entriesRequested": 0,
"entriesStored": 0,
"bytesReceived": 0,
"bytesSent": 0,
"requests": 0,
"downloadTime": 0,
"queryLengthServed": 0
},
"result": {
"entriesFound": 0,
"entriesRequested": 0,
@ -180,7 +190,7 @@ var queryTests = []struct {
"shards": 0,
"splits": 0,
"subqueries": 0,
"totalBytesProcessed": 0,
"totalBytesProcessed": 0,
"totalEntriesReturned": 0,
"totalLinesProcessed": 0,
"totalStructuredMetadataBytesProcessed": 0,

@ -129,6 +129,16 @@ const emptyStats = `{
"downloadTime": 0,
"queryLengthServed": 0
},
"instantMetricResult": {
"entriesFound": 0,
"entriesRequested": 0,
"entriesStored": 0,
"bytesReceived": 0,
"bytesSent": 0,
"requests": 0,
"downloadTime": 0,
"queryLengthServed": 0
},
"result": {
"entriesFound": 0,
"entriesRequested": 0,
@ -208,13 +218,13 @@ var queryTestWithEncodingFlags = []struct {
[ "123456789012346", "super line with labels", {
"structuredMetadata": {
"foo": "a",
"bar": "b"
}
"bar": "b"
}
}],
[ "123456789012347", "super line with labels msg=text", {
"structuredMetadata": {
"foo": "a",
"bar": "b"
"bar": "b"
},
"parsed": {
"msg": "text"
@ -549,13 +559,13 @@ var tailTestWithEncodingFlags = []struct {
[ "123456789012346", "super line with labels", {
"structuredMetadata": {
"foo": "a",
"bar": "b"
}
"bar": "b"
}
}],
[ "123456789012347", "super line with labels msg=text", {
"structuredMetadata": {
"foo": "a",
"bar": "b"
"bar": "b"
},
"parsed": {
"msg": "text"

@ -111,6 +111,7 @@ type Limits struct {
MetadataQuerySplitDuration model.Duration `yaml:"split_metadata_queries_by_interval" json:"split_metadata_queries_by_interval"`
RecentMetadataQuerySplitDuration model.Duration `yaml:"split_recent_metadata_queries_by_interval" json:"split_recent_metadata_queries_by_interval"`
RecentMetadataQueryWindow model.Duration `yaml:"recent_metadata_query_window" json:"recent_metadata_query_window"`
InstantMetricQuerySplitDuration model.Duration `yaml:"split_instant_metric_queries_by_interval" json:"split_instant_metric_queries_by_interval"`
IngesterQuerySplitDuration model.Duration `yaml:"split_ingester_queries_by_interval" json:"split_ingester_queries_by_interval"`
MinShardingLookback model.Duration `yaml:"min_sharding_lookback" json:"min_sharding_lookback"`
MaxQueryBytesRead flagext.ByteSize `yaml:"max_query_bytes_read" json:"max_query_bytes_read"`
@ -307,6 +308,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
_ = l.QuerySplitDuration.Set("1h")
f.Var(&l.QuerySplitDuration, "querier.split-queries-by-interval", "Split queries by a time interval and execute in parallel. The value 0 disables splitting by time. This also determines how cache keys are chosen when result caching is enabled.")
_ = l.InstantMetricQuerySplitDuration.Set("1h")
f.Var(&l.InstantMetricQuerySplitDuration, "querier.split-instant-metric-queries-by-interval", "Split instant metric queries by a time interval and execute in parallel. The value 0 disables splitting instant metric queries by time. This also determines how cache keys are chosen when instant metric query result caching is enabled.")
_ = l.MetadataQuerySplitDuration.Set("24h")
f.Var(&l.MetadataQuerySplitDuration, "querier.split-metadata-queries-by-interval", "Split metadata queries by a time interval and execute in parallel. The value 0 disables splitting metadata queries by time. This also determines how cache keys are chosen when label/series result caching is enabled.")
@ -601,6 +604,11 @@ func (o *Overrides) QuerySplitDuration(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).QuerySplitDuration)
}
// InstantMetricQuerySplitDuration returns the tenant-specific split interval for instant metric queries applied in the query frontend.
func (o *Overrides) InstantMetricQuerySplitDuration(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).InstantMetricQuerySplitDuration)
}
// MetadataQuerySplitDuration returns the tenant-specific metadata split interval applied in the query frontend.
func (o *Overrides) MetadataQuerySplitDuration(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).MetadataQuerySplitDuration)
