From 557b516b665eda3eb2c37d43ba69db65f81a181e Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Wed, 25 Oct 2023 07:25:06 -0700 Subject: [PATCH] Refactor the proto Request interface to return time.Time (#11018) Part of our query path improvements. In addition to Karstens new protobuf formats for all the query request and responses, plus our use of just proto grpc rather than httpgrpc, we'd like refactor the proto Request interface to return time.Time for start and end instead of int64. This will allow his protobuf formats to use more `logproto` proto types. There's still one or two tests that I need to fix. --------- Signed-off-by: Callum Styan Co-authored-by: Karsten Jeschkies --- pkg/logproto/compat.go | 36 ++-- pkg/querier/queryrange/codec.go | 74 +++---- pkg/querier/queryrange/index_stats_cache.go | 2 +- .../queryrange/index_stats_cache_test.go | 2 +- pkg/querier/queryrange/limits.go | 29 ++- pkg/querier/queryrange/limits_test.go | 4 +- pkg/querier/queryrange/log_result_cache.go | 2 +- .../queryrangebase/definitions/interface.go | 7 +- .../queryrange/queryrangebase/query_range.go | 100 +-------- .../queryrangebase/query_range_test.go | 65 ------ .../queryrangebase/queryrange.pb.go | 199 ++++++++++-------- .../queryrangebase/queryrange.proto | 11 +- .../queryrangebase/results_cache.go | 30 +-- .../queryrangebase/results_cache_test.go | 158 +++++++------- .../queryrange/queryrangebase/step_align.go | 7 +- .../queryrangebase/step_align_test.go | 17 +- pkg/querier/queryrange/querysharding.go | 10 +- pkg/querier/queryrange/querysharding_test.go | 7 +- pkg/querier/queryrange/roundtrip.go | 12 +- pkg/querier/queryrange/shard_resolver.go | 4 +- pkg/querier/queryrange/split_by_interval.go | 12 +- pkg/querier/queryrange/volume_cache.go | 2 +- pkg/querier/queryrange/volume_cache_test.go | 2 +- 23 files changed, 337 insertions(+), 455 deletions(-) diff --git a/pkg/logproto/compat.go b/pkg/logproto/compat.go index e195af9687..8e3d24df80 100644 --- a/pkg/logproto/compat.go +++ b/pkg/logproto/compat.go @@ -232,13 +232,13 @@ func MergeSeriesResponses(responses []*SeriesResponse) (*SeriesResponse, error) // Satisfy definitions.Request // GetStart returns the start timestamp of the request in milliseconds. -func (m *IndexStatsRequest) GetStart() int64 { - return int64(m.From) +func (m *IndexStatsRequest) GetStart() time.Time { + return time.Unix(0, m.From.UnixNano()) } // GetEnd returns the end timestamp of the request in milliseconds. -func (m *IndexStatsRequest) GetEnd() int64 { - return int64(m.Through) +func (m *IndexStatsRequest) GetEnd() time.Time { + return time.Unix(0, m.Through.UnixNano()) } // GetStep returns the step of the request in milliseconds. @@ -253,10 +253,10 @@ func (m *IndexStatsRequest) GetQuery() string { func (m *IndexStatsRequest) GetCachingOptions() (res definitions.CachingOptions) { return } // WithStartEnd clone the current request with different start and end timestamp. -func (m *IndexStatsRequest) WithStartEnd(startTime int64, endTime int64) definitions.Request { +func (m *IndexStatsRequest) WithStartEnd(start, end time.Time) definitions.Request { clone := *m - clone.From = model.TimeFromUnixNano(startTime * int64(time.Millisecond)) - clone.Through = model.TimeFromUnixNano(endTime * int64(time.Millisecond)) + clone.From = model.TimeFromUnixNano(start.UnixNano()) + clone.Through = model.TimeFromUnixNano(end.UnixNano()) return &clone } @@ -271,21 +271,21 @@ func (m *IndexStatsRequest) WithQuery(query string) definitions.Request { func (m *IndexStatsRequest) LogToSpan(sp opentracing.Span) { sp.LogFields( otlog.String("query", m.GetQuery()), - otlog.String("start", timestamp.Time(m.GetStart()).String()), - otlog.String("end", timestamp.Time(m.GetEnd()).String()), + otlog.String("start", timestamp.Time(int64(m.From)).String()), + otlog.String("end", timestamp.Time(int64(m.Through)).String()), ) } // Satisfy definitions.Request for Volume // GetStart returns the start timestamp of the request in milliseconds. -func (m *VolumeRequest) GetStart() int64 { - return int64(m.From) +func (m *VolumeRequest) GetStart() time.Time { + return time.UnixMilli(int64(m.From)) } // GetEnd returns the end timestamp of the request in milliseconds. -func (m *VolumeRequest) GetEnd() int64 { - return int64(m.Through) +func (m *VolumeRequest) GetEnd() time.Time { + return time.UnixMilli(int64(m.Through)) } // GetQuery returns the query of the request. @@ -297,10 +297,10 @@ func (m *VolumeRequest) GetQuery() string { func (m *VolumeRequest) GetCachingOptions() (res definitions.CachingOptions) { return } // WithStartEnd clone the current request with different start and end timestamp. -func (m *VolumeRequest) WithStartEnd(startTime int64, endTime int64) definitions.Request { +func (m *VolumeRequest) WithStartEnd(start, end time.Time) definitions.Request { clone := *m - clone.From = model.TimeFromUnixNano(startTime * int64(time.Millisecond)) - clone.Through = model.TimeFromUnixNano(endTime * int64(time.Millisecond)) + clone.From = model.TimeFromUnixNano(start.UnixNano()) + clone.Through = model.TimeFromUnixNano(end.UnixNano()) return &clone } @@ -315,8 +315,8 @@ func (m *VolumeRequest) WithQuery(query string) definitions.Request { func (m *VolumeRequest) LogToSpan(sp opentracing.Span) { sp.LogFields( otlog.String("query", m.GetQuery()), - otlog.String("start", timestamp.Time(m.GetStart()).String()), - otlog.String("end", timestamp.Time(m.GetEnd()).String()), + otlog.String("start", timestamp.Time(int64(m.From)).String()), + otlog.String("end", timestamp.Time(int64(m.Through)).String()), ) } diff --git a/pkg/querier/queryrange/codec.go b/pkg/querier/queryrange/codec.go index 03559b2e2f..b24a46146b 100644 --- a/pkg/querier/queryrange/codec.go +++ b/pkg/querier/queryrange/codec.go @@ -46,18 +46,18 @@ type RequestProtobufCodec struct { Codec } -func (r *LokiRequest) GetEnd() int64 { - return r.EndTs.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)) +func (r *LokiRequest) GetEnd() time.Time { + return r.EndTs } -func (r *LokiRequest) GetStart() int64 { - return r.StartTs.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)) +func (r *LokiRequest) GetStart() time.Time { + return r.StartTs } -func (r *LokiRequest) WithStartEnd(s int64, e int64) queryrangebase.Request { +func (r *LokiRequest) WithStartEnd(s time.Time, e time.Time) queryrangebase.Request { clone := *r - clone.StartTs = time.Unix(0, s*int64(time.Millisecond)) - clone.EndTs = time.Unix(0, e*int64(time.Millisecond)) + clone.StartTs = s + clone.EndTs = e return &clone } @@ -83,8 +83,8 @@ func (r *LokiRequest) WithShards(shards logql.Shards) *LokiRequest { func (r *LokiRequest) LogToSpan(sp opentracing.Span) { sp.LogFields( otlog.String("query", r.GetQuery()), - otlog.String("start", timestamp.Time(r.GetStart()).String()), - otlog.String("end", timestamp.Time(r.GetEnd()).String()), + otlog.String("start", timestamp.Time(r.GetStart().UnixNano()).String()), + otlog.String("end", timestamp.Time(r.GetEnd().UnixNano()).String()), otlog.Int64("step (ms)", r.GetStep()), otlog.Int64("interval (ms)", r.GetInterval()), otlog.Int64("limit", int64(r.GetLimit())), @@ -99,17 +99,17 @@ func (r *LokiInstantRequest) GetStep() int64 { return 0 } -func (r *LokiInstantRequest) GetEnd() int64 { - return r.TimeTs.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)) +func (r *LokiInstantRequest) GetEnd() time.Time { + return r.TimeTs } -func (r *LokiInstantRequest) GetStart() int64 { - return r.TimeTs.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)) +func (r *LokiInstantRequest) GetStart() time.Time { + return r.TimeTs } -func (r *LokiInstantRequest) WithStartEnd(s int64, _ int64) queryrangebase.Request { +func (r *LokiInstantRequest) WithStartEnd(s time.Time, _ time.Time) queryrangebase.Request { clone := *r - clone.TimeTs = time.Unix(0, s*int64(time.Millisecond)) + clone.TimeTs = s return &clone } @@ -128,7 +128,7 @@ func (r *LokiInstantRequest) WithShards(shards logql.Shards) *LokiInstantRequest func (r *LokiInstantRequest) LogToSpan(sp opentracing.Span) { sp.LogFields( otlog.String("query", r.GetQuery()), - otlog.String("ts", timestamp.Time(r.GetStart()).String()), + otlog.String("ts", timestamp.Time(r.GetStart().UnixMilli()).String()), otlog.Int64("limit", int64(r.GetLimit())), otlog.String("direction", r.GetDirection().String()), otlog.String("shards", strings.Join(r.GetShards(), ",")), @@ -137,18 +137,18 @@ func (r *LokiInstantRequest) LogToSpan(sp opentracing.Span) { func (*LokiInstantRequest) GetCachingOptions() (res queryrangebase.CachingOptions) { return } -func (r *LokiSeriesRequest) GetEnd() int64 { - return r.EndTs.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)) +func (r *LokiSeriesRequest) GetEnd() time.Time { + return r.EndTs } -func (r *LokiSeriesRequest) GetStart() int64 { - return r.StartTs.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)) +func (r *LokiSeriesRequest) GetStart() time.Time { + return r.StartTs } -func (r *LokiSeriesRequest) WithStartEnd(s int64, e int64) queryrangebase.Request { +func (r *LokiSeriesRequest) WithStartEnd(s, e time.Time) queryrangebase.Request { clone := *r - clone.StartTs = time.Unix(0, s*int64(time.Millisecond)) - clone.EndTs = time.Unix(0, e*int64(time.Millisecond)) + clone.StartTs = s + clone.EndTs = e return &clone } @@ -168,8 +168,8 @@ func (r *LokiSeriesRequest) GetStep() int64 { func (r *LokiSeriesRequest) LogToSpan(sp opentracing.Span) { sp.LogFields( otlog.String("matchers", strings.Join(r.GetMatch(), ",")), - otlog.String("start", timestamp.Time(r.GetStart()).String()), - otlog.String("end", timestamp.Time(r.GetEnd()).String()), + otlog.String("start", timestamp.Time(r.GetStart().UnixNano()).String()), + otlog.String("end", timestamp.Time(r.GetEnd().UnixNano()).String()), otlog.String("shards", strings.Join(r.GetShards(), ",")), ) } @@ -199,16 +199,16 @@ func (r *LabelRequest) AsProto() *logproto.LabelRequest { return &r.LabelRequest } -func (r *LabelRequest) GetEnd() int64 { - return r.End.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)) +func (r *LabelRequest) GetEnd() time.Time { + return *r.End } func (r *LabelRequest) GetEndTs() time.Time { return *r.End } -func (r *LabelRequest) GetStart() int64 { - return r.Start.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)) +func (r *LabelRequest) GetStart() time.Time { + return *r.Start } func (r *LabelRequest) GetStartTs() time.Time { @@ -219,11 +219,11 @@ func (r *LabelRequest) GetStep() int64 { return 0 } -func (r *LabelRequest) WithStartEnd(s int64, e int64) queryrangebase.Request { +func (r *LabelRequest) WithStartEnd(s, e time.Time) queryrangebase.Request { clone := *r - tmp := time.Unix(0, s*int64(time.Millisecond)) + tmp := s clone.Start = &tmp - tmp = time.Unix(0, e*int64(time.Millisecond)) + tmp = e clone.End = &tmp return &clone } @@ -236,8 +236,8 @@ func (r *LabelRequest) WithQuery(query string) queryrangebase.Request { func (r *LabelRequest) LogToSpan(sp opentracing.Span) { sp.LogFields( - otlog.String("start", timestamp.Time(r.GetStart()).String()), - otlog.String("end", timestamp.Time(r.GetEnd()).String()), + otlog.String("start", timestamp.Time(r.GetStart().UnixNano()).String()), + otlog.String("end", timestamp.Time(r.GetEnd().UnixNano()).String()), ) } @@ -504,7 +504,7 @@ func (Codec) DecodeHTTPGrpcResponse(r *httpgrpc.HTTPResponse, req queryrangebase return decodeResponseJSONFrom(r.Body, req, headers) } -func (Codec) EncodeHTTPGrpcResponse(ctx context.Context, req *httpgrpc.HTTPRequest, res queryrangebase.Response) (*httpgrpc.HTTPResponse, error) { +func (Codec) EncodeHTTPGrpcResponse(_ context.Context, req *httpgrpc.HTTPRequest, res queryrangebase.Response) (*httpgrpc.HTTPResponse, error) { version := loghttp.GetVersion(req.Url) var buf bytes.Buffer @@ -1341,8 +1341,8 @@ func ParamsFromRequest(req queryrangebase.Request) (logql.Params, error) { Query: r.GetQuery(), Limit: uint32(r.GetLimit()), Step: r.GetStep(), - StartTs: time.UnixMilli(r.GetStart()), - EndTs: time.UnixMilli(r.GetEnd()), + StartTs: time.UnixMilli(r.GetStart().UnixNano()), + EndTs: time.UnixMilli(r.GetEnd().UnixNano()), }, }, nil case *LokiInstantRequest: diff --git a/pkg/querier/queryrange/index_stats_cache.go b/pkg/querier/queryrange/index_stats_cache.go index 27d40db2da..4814394fd4 100644 --- a/pkg/querier/queryrange/index_stats_cache.go +++ b/pkg/querier/queryrange/index_stats_cache.go @@ -83,7 +83,7 @@ func shouldCacheStats(ctx context.Context, req queryrangebase.Request, lim Limit maxCacheFreshness := validation.MaxDurationPerTenant(tenantIDs, cacheFreshnessCapture) now := statsCacheMiddlewareNowTimeFunc() - return maxCacheFreshness == 0 || model.Time(req.GetEnd()).Before(now.Add(-maxCacheFreshness)), nil + return maxCacheFreshness == 0 || model.Time(req.GetEnd().UnixMilli()).Before(now.Add(-maxCacheFreshness)), nil } func NewIndexStatsCacheMiddleware( diff --git a/pkg/querier/queryrange/index_stats_cache_test.go b/pkg/querier/queryrange/index_stats_cache_test.go index 3024ba60da..8c154c36a2 100644 --- a/pkg/querier/queryrange/index_stats_cache_test.go +++ b/pkg/querier/queryrange/index_stats_cache_test.go @@ -78,7 +78,7 @@ func TestIndexStatsCache(t *testing.T) { // should reuse part of the previous request and issue a new request for the remaining time till end. // The new start time is 15m (i.e. 25%) in the future with regard to the previous request time span. *calls = 0 - req := statsReq.WithStartEnd(statsReq.GetStart()+(15*time.Minute).Milliseconds(), statsReq.GetEnd()+(15*time.Minute).Milliseconds()) + req := statsReq.WithStartEnd(statsReq.GetStart().Add(15*time.Minute), statsReq.GetEnd().Add(15*time.Minute)) expectedStats := &IndexStatsResponse{ Response: &logproto.IndexStatsResponse{ Streams: 2, diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go index ddf38d30cd..82bef4bf95 100644 --- a/pkg/querier/queryrange/limits.go +++ b/pkg/querier/queryrange/limits.go @@ -29,7 +29,6 @@ import ( "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/index/stats" - "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/spanlogger" "github.com/grafana/loki/pkg/util/validation" @@ -109,7 +108,7 @@ func (l cacheKeyLimits) GenerateCacheKey(ctx context.Context, userID string, r q var currentInterval int64 if denominator := int64(split / time.Millisecond); denominator > 0 { - currentInterval = r.GetStart() / denominator + currentInterval = r.GetStart().UnixMilli() / denominator } if l.transformer != nil { @@ -150,26 +149,26 @@ func (l limitsMiddleware) Do(ctx context.Context, r queryrangebase.Request) (que // Clamp the time range based on the max query lookback. lookbackCapture := func(id string) time.Duration { return l.MaxQueryLookback(ctx, id) } if maxQueryLookback := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, lookbackCapture); maxQueryLookback > 0 { - minStartTime := util.TimeToMillis(time.Now().Add(-maxQueryLookback)) + minStartTime := time.Now().Add(-maxQueryLookback) - if r.GetEnd() < minStartTime { + if r.GetEnd().Before(minStartTime) { // The request is fully outside the allowed range, so we can return an // empty response. level.Debug(log).Log( "msg", "skipping the execution of the query because its time range is before the 'max query lookback' setting", - "reqStart", util.FormatTimeMillis(r.GetStart()), - "redEnd", util.FormatTimeMillis(r.GetEnd()), + "reqStart", r.GetStart().String(), + "redEnd", r.GetEnd().String(), "maxQueryLookback", maxQueryLookback) return NewEmptyResponse(r) } - if r.GetStart() < minStartTime { + if r.GetStart().Before(minStartTime) { // Replace the start time in the request. level.Debug(log).Log( "msg", "the start time of the query has been manipulated because of the 'max query lookback' setting", - "original", util.FormatTimeMillis(r.GetStart()), - "updated", util.FormatTimeMillis(minStartTime)) + "original", r.GetStart().String(), + "updated", minStartTime.String()) r = r.WithStartEnd(minStartTime, r.GetEnd()) } @@ -178,7 +177,7 @@ func (l limitsMiddleware) Do(ctx context.Context, r queryrangebase.Request) (que // Enforce the max query length. lengthCapture := func(id string) time.Duration { return l.MaxQueryLength(ctx, id) } if maxQueryLength := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, lengthCapture); maxQueryLength > 0 { - queryLen := timestamp.Time(r.GetEnd()).Sub(timestamp.Time(r.GetStart())) + queryLen := timestamp.Time(r.GetEnd().UnixMilli()).Sub(timestamp.Time(r.GetStart().UnixMilli())) if queryLen > maxQueryLength { return nil, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, queryLen, model.Duration(maxQueryLength)) } @@ -281,7 +280,7 @@ func (q *querySizeLimiter) getBytesReadForRequest(ctx context.Context, r queryra // TODO: Set concurrency dynamically as in shardResolverForConf? start := time.Now() const maxConcurrentIndexReq = 10 - matcherStats, err := getStatsForMatchers(ctx, q.logger, q.statsHandler, model.Time(r.GetStart()), model.Time(r.GetEnd()), matcherGroups, maxConcurrentIndexReq, q.maxLookBackPeriod) + matcherStats, err := getStatsForMatchers(ctx, q.logger, q.statsHandler, model.Time(r.GetStart().UnixMilli()), model.Time(r.GetEnd().UnixMilli()), matcherGroups, maxConcurrentIndexReq, q.maxLookBackPeriod) if err != nil { return 0, err } @@ -309,8 +308,8 @@ func (q *querySizeLimiter) getSchemaCfg(r queryrangebase.Request) (config.Period return config.PeriodConfig{}, errors.New("failed to get range-vector and offset duration: " + err.Error()) } - adjustedStart := int64(model.Time(r.GetStart()).Add(-maxRVDuration).Add(-maxOffset)) - adjustedEnd := int64(model.Time(r.GetEnd()).Add(-maxOffset)) + adjustedStart := int64(model.Time(r.GetStart().UnixMilli()).Add(-maxRVDuration).Add(-maxOffset)) + adjustedEnd := int64(model.Time(r.GetEnd().UnixMilli()).Add(-maxOffset)) return ShardingConfigs(q.cfg).ValidRange(adjustedStart, adjustedEnd) } @@ -474,8 +473,8 @@ func (rt limitedRoundTripper) Do(c context.Context, request queryrangebase.Reque tenantIDs, rt.configs, rt.limits, - model.Time(request.GetStart()), - model.Time(request.GetEnd()), + model.Time(request.GetStart().UnixMilli()), + model.Time(request.GetEnd().UnixMilli()), ) if parallelism < 1 { diff --git a/pkg/querier/queryrange/limits_test.go b/pkg/querier/queryrange/limits_test.go index 02c3862dd4..4ab81ec4ac 100644 --- a/pkg/querier/queryrange/limits_test.go +++ b/pkg/querier/queryrange/limits_test.go @@ -43,7 +43,7 @@ func TestLimits(t *testing.T) { require.Equal( t, - fmt.Sprintf("%s:%s:%d:%d:%d", "a", r.GetQuery(), r.GetStep(), r.GetStart()/int64(time.Hour/time.Millisecond), int64(time.Hour)), + fmt.Sprintf("%s:%s:%d:%d:%d", "a", r.GetQuery(), r.GetStep(), r.GetStart().UnixMilli()/int64(time.Hour/time.Millisecond), int64(time.Hour)), cacheKeyLimits{wrapped, nil}.GenerateCacheKey(context.Background(), "a", r), ) } @@ -580,7 +580,7 @@ func Test_MaxQuerySize_MaxLookBackPeriod(t *testing.T) { statsHandler := base.HandlerFunc(func(_ context.Context, req base.Request) (base.Response, error) { // This is the actual check that we're testing. - require.Equal(t, testTime.Add(-engineOpts.MaxLookBackPeriod).UnixMilli(), req.GetStart()) + require.Equal(t, testTime.Add(-engineOpts.MaxLookBackPeriod).UnixMilli(), req.GetStart().UnixMilli()) return &IndexStatsResponse{ Response: &logproto.IndexStatsResponse{ diff --git a/pkg/querier/queryrange/log_result_cache.go b/pkg/querier/queryrange/log_result_cache.go index 3f0555709b..a83041a94c 100644 --- a/pkg/querier/queryrange/log_result_cache.go +++ b/pkg/querier/queryrange/log_result_cache.go @@ -95,7 +95,7 @@ func (l *logResultCache) Do(ctx context.Context, req queryrangebase.Request) (qu cacheFreshnessCapture := func(id string) time.Duration { return l.limits.MaxCacheFreshness(ctx, id) } maxCacheFreshness := validation.MaxDurationPerTenant(tenantIDs, cacheFreshnessCapture) maxCacheTime := int64(model.Now().Add(-maxCacheFreshness)) - if req.GetEnd() > maxCacheTime { + if req.GetEnd().UnixMilli() > maxCacheTime { return l.next.Do(ctx, req) } diff --git a/pkg/querier/queryrange/queryrangebase/definitions/interface.go b/pkg/querier/queryrange/queryrangebase/definitions/interface.go index dbb350a9c7..fb385817b5 100644 --- a/pkg/querier/queryrange/queryrangebase/definitions/interface.go +++ b/pkg/querier/queryrange/queryrangebase/definitions/interface.go @@ -3,6 +3,7 @@ package definitions import ( "context" "net/http" + "time" "github.com/gogo/protobuf/proto" "github.com/opentracing/opentracing-go" @@ -32,9 +33,9 @@ type Merger interface { // Request represents a query range request that can be process by middlewares. type Request interface { // GetStart returns the start timestamp of the request in milliseconds. - GetStart() int64 + GetStart() time.Time // GetEnd returns the end timestamp of the request in milliseconds. - GetEnd() int64 + GetEnd() time.Time // GetStep returns the step of the request in milliseconds. GetStep() int64 // GetQuery returns the query of the request. @@ -42,7 +43,7 @@ type Request interface { // GetCachingOptions returns the caching options. GetCachingOptions() CachingOptions // WithStartEnd clone the current request with different start and end timestamp. - WithStartEnd(startTime int64, endTime int64) Request + WithStartEnd(start time.Time, end time.Time) Request // WithQuery clone the current request with a different query. WithQuery(string) Request proto.Message diff --git a/pkg/querier/queryrange/queryrangebase/query_range.go b/pkg/querier/queryrange/queryrangebase/query_range.go index 70fe36dfc0..4c7426b171 100644 --- a/pkg/querier/queryrange/queryrangebase/query_range.go +++ b/pkg/querier/queryrange/queryrangebase/query_range.go @@ -7,10 +7,8 @@ import ( "io" "math" "net/http" - "net/url" "sort" "strconv" - "strings" "time" "github.com/gogo/status" @@ -22,7 +20,6 @@ import ( "github.com/prometheus/prometheus/model/timestamp" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/spanlogger" ) @@ -41,7 +38,7 @@ var ( errStepTooSmall = httpgrpc.Errorf(http.StatusBadRequest, "exceeded maximum resolution of 11,000 points per time series. Try increasing the value of the step parameter") // PrometheusCodec is a codec to encode and decode Prometheus query range requests and responses. - PrometheusCodec Codec = &prometheusCodec{} + PrometheusCodec = &prometheusCodec{} // Name of the cache control header. cacheControlHeader = "Cache-Control" @@ -50,7 +47,7 @@ var ( type prometheusCodec struct{} // WithStartEnd clones the current `PrometheusRequest` with a new `start` and `end` timestamp. -func (q *PrometheusRequest) WithStartEnd(start int64, end int64) Request { +func (q *PrometheusRequest) WithStartEnd(start, end time.Time) Request { clone := *q clone.Start = start clone.End = end @@ -68,8 +65,8 @@ func (q *PrometheusRequest) WithQuery(query string) Request { func (q *PrometheusRequest) LogToSpan(sp opentracing.Span) { sp.LogFields( otlog.String("query", q.GetQuery()), - otlog.String("start", timestamp.Time(q.GetStart()).String()), - otlog.String("end", timestamp.Time(q.GetEnd()).String()), + otlog.String("start", timestamp.Time(q.GetStart().UnixMilli()).String()), + otlog.String("end", timestamp.Time(q.GetEnd().UnixMilli()).String()), otlog.Int64("step (ms)", q.GetStep()), ) } @@ -137,95 +134,6 @@ func (prometheusCodec) MergeResponse(responses ...Response) (Response, error) { return &response, nil } -func (prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []string) (Request, error) { - var result PrometheusRequest - var err error - result.Start, err = util.ParseTime(r.FormValue("start")) - if err != nil { - return nil, decorateWithParamName(err, "start") - } - - result.End, err = util.ParseTime(r.FormValue("end")) - if err != nil { - return nil, decorateWithParamName(err, "end") - } - - if result.End < result.Start { - return nil, errEndBeforeStart - } - - result.Step, err = parseDurationMs(r.FormValue("step")) - if err != nil { - return nil, decorateWithParamName(err, "step") - } - - if result.Step <= 0 { - return nil, errNegativeStep - } - - // For safety, limit the number of returned points per timeseries. - // This is sufficient for 60s resolution for a week or 1h resolution for a year. - if (result.End-result.Start)/result.Step > 11000 { - return nil, errStepTooSmall - } - - result.Query = r.FormValue("query") - result.Path = r.URL.Path - - // Include the specified headers from http request in prometheusRequest. - for _, header := range forwardHeaders { - for h, hv := range r.Header { - if strings.EqualFold(h, header) { - result.Headers = append(result.Headers, &PrometheusRequestHeader{Name: h, Values: hv}) - break - } - } - } - - for _, value := range r.Header.Values(cacheControlHeader) { - if strings.Contains(value, noStoreValue) { - result.CachingOptions.Disabled = true - break - } - } - - return &result, nil -} - -func (prometheusCodec) EncodeRequest(ctx context.Context, r Request) (*http.Request, error) { - promReq, ok := r.(*PrometheusRequest) - if !ok { - return nil, httpgrpc.Errorf(http.StatusBadRequest, "invalid request format") - } - params := url.Values{ - "start": []string{encodeTime(promReq.Start)}, - "end": []string{encodeTime(promReq.End)}, - "step": []string{encodeDurationMs(promReq.Step)}, - "query": []string{promReq.Query}, - } - u := &url.URL{ - Path: promReq.Path, - RawQuery: params.Encode(), - } - var h = http.Header{} - - for _, hv := range promReq.Headers { - for _, v := range hv.Values { - h.Add(hv.Name, v) - } - } - - req := &http.Request{ - Method: "GET", - RequestURI: u.String(), // This is what the httpgrpc code looks at. - URL: u, - Body: http.NoBody, - Header: h, - } - - return req.WithContext(ctx), nil -} - func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ Request) (Response, error) { if r.StatusCode/100 != 2 { body, _ := io.ReadAll(r.Body) diff --git a/pkg/querier/queryrange/queryrangebase/query_range_test.go b/pkg/querier/queryrange/queryrangebase/query_range_test.go index ed46ea66ad..21c115eec5 100644 --- a/pkg/querier/queryrange/queryrangebase/query_range_test.go +++ b/pkg/querier/queryrange/queryrangebase/query_range_test.go @@ -8,8 +8,6 @@ import ( "strconv" "testing" - "github.com/grafana/dskit/httpgrpc" - "github.com/grafana/dskit/user" jsoniter "github.com/json-iterator/go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -17,69 +15,6 @@ import ( "github.com/grafana/loki/pkg/logproto" ) -func TestRequest(t *testing.T) { - // Create a Copy parsedRequest to assign the expected headers to the request without affecting other tests using the global. - // The test below adds a Test-Header header to the request and expects it back once the encode/decode of request is done via PrometheusCodec - parsedRequestWithHeaders := *parsedRequest - parsedRequestWithHeaders.Headers = reqHeaders - for i, tc := range []struct { - url string - expected Request - expectedErr error - }{ - { - url: query, - expected: &parsedRequestWithHeaders, - }, - { - url: "api/v1/query_range?start=foo", - expectedErr: httpgrpc.Errorf(http.StatusBadRequest, "invalid parameter \"start\"; cannot parse \"foo\" to a valid timestamp"), - }, - { - url: "api/v1/query_range?start=123&end=bar", - expectedErr: httpgrpc.Errorf(http.StatusBadRequest, "invalid parameter \"end\"; cannot parse \"bar\" to a valid timestamp"), - }, - { - url: "api/v1/query_range?start=123&end=0", - expectedErr: errEndBeforeStart, - }, - { - url: "api/v1/query_range?start=123&end=456&step=baz", - expectedErr: httpgrpc.Errorf(http.StatusBadRequest, "invalid parameter \"step\"; cannot parse \"baz\" to a valid duration"), - }, - { - url: "api/v1/query_range?start=123&end=456&step=-1", - expectedErr: errNegativeStep, - }, - { - url: "api/v1/query_range?start=0&end=11001&step=1", - expectedErr: errStepTooSmall, - }, - } { - t.Run(strconv.Itoa(i), func(t *testing.T) { - r, err := http.NewRequest("GET", tc.url, nil) - require.NoError(t, err) - r.Header.Add("Test-Header", "test") - - ctx := user.InjectOrgID(context.Background(), "1") - - // Get a deep copy of the request with Context changed to ctx - r = r.Clone(ctx) - - req, err := PrometheusCodec.DecodeRequest(ctx, r, []string{"Test-Header"}) - if err != nil { - require.EqualValues(t, tc.expectedErr, err) - return - } - require.EqualValues(t, tc.expected, req) - - rdash, err := PrometheusCodec.EncodeRequest(context.Background(), req) - require.NoError(t, err) - require.EqualValues(t, tc.url, rdash.RequestURI) - }) - } -} - func TestResponse(t *testing.T) { r := *parsedResponse r.Headers = respHeaders diff --git a/pkg/querier/queryrange/queryrangebase/queryrange.pb.go b/pkg/querier/queryrange/queryrangebase/queryrange.pb.go index da0a14e0fd..121b3ffb15 100644 --- a/pkg/querier/queryrange/queryrangebase/queryrange.pb.go +++ b/pkg/querier/queryrange/queryrangebase/queryrange.pb.go @@ -35,8 +35,8 @@ const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package type PrometheusRequest struct { Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` - Start int64 `protobuf:"varint,2,opt,name=start,proto3" json:"start,omitempty"` - End int64 `protobuf:"varint,3,opt,name=end,proto3" json:"end,omitempty"` + Start time.Time `protobuf:"bytes,2,opt,name=start,proto3,stdtime" json:"start"` + End time.Time `protobuf:"bytes,3,opt,name=end,proto3,stdtime" json:"end"` Step int64 `protobuf:"varint,4,opt,name=step,proto3" json:"step,omitempty"` Timeout time.Duration `protobuf:"bytes,5,opt,name=timeout,proto3,stdduration" json:"timeout"` Query string `protobuf:"bytes,6,opt,name=query,proto3" json:"query,omitempty"` @@ -83,18 +83,18 @@ func (m *PrometheusRequest) GetPath() string { return "" } -func (m *PrometheusRequest) GetStart() int64 { +func (m *PrometheusRequest) GetStart() time.Time { if m != nil { return m.Start } - return 0 + return time.Time{} } -func (m *PrometheusRequest) GetEnd() int64 { +func (m *PrometheusRequest) GetEnd() time.Time { if m != nil { return m.End } - return 0 + return time.Time{} } func (m *PrometheusRequest) GetStep() int64 { @@ -435,59 +435,60 @@ func init() { } var fileDescriptor_4cc6a0c1d6b614c4 = []byte{ - // 821 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x54, 0x4f, 0x6f, 0xdc, 0x44, - 0x14, 0xdf, 0xc9, 0xfe, 0x9f, 0x54, 0xdb, 0x32, 0x8d, 0x8a, 0xd3, 0x22, 0x7b, 0xb5, 0x02, 0x29, - 0x48, 0xe0, 0x15, 0x41, 0x70, 0x2b, 0x22, 0x4e, 0x82, 0x68, 0x55, 0x89, 0x6a, 0xc2, 0x89, 0x0b, - 0x9a, 0x5d, 0xbf, 0x38, 0x56, 0xfc, 0xaf, 0x33, 0xe3, 0x8a, 0xbd, 0x71, 0xe2, 0xcc, 0x91, 0x8f, - 0xc0, 0x01, 0xf1, 0x39, 0x22, 0x4e, 0x39, 0x56, 0x1c, 0x0c, 0xd9, 0x5c, 0x90, 0x4f, 0xfd, 0x08, - 0x68, 0x66, 0xec, 0x8d, 0x77, 0xab, 0x40, 0x2f, 0xbb, 0xef, 0xcf, 0xef, 0xbd, 0x79, 0xbf, 0xdf, - 0x8c, 0x1f, 0xfe, 0x3c, 0x3b, 0x0f, 0xa6, 0x2f, 0x72, 0xe0, 0x21, 0x70, 0xfd, 0xbf, 0xe0, 0x2c, - 0x09, 0xa0, 0x61, 0xce, 0x98, 0x68, 0xba, 0x6e, 0xc6, 0x53, 0x99, 0x92, 0xd1, 0x3a, 0xe0, 0xe1, - 0x4e, 0x90, 0x06, 0xa9, 0x4e, 0x4d, 0x95, 0x65, 0x50, 0x0f, 0x77, 0x83, 0x34, 0x0d, 0x22, 0x98, - 0x6a, 0x6f, 0x96, 0x9f, 0x4e, 0x59, 0xb2, 0xa8, 0x52, 0xf6, 0x66, 0xca, 0xcf, 0x39, 0x93, 0x61, - 0x9a, 0x54, 0xf9, 0x47, 0x6a, 0xb0, 0x28, 0x0d, 0x4c, 0xcf, 0xda, 0xa8, 0x92, 0x87, 0x6f, 0x37, - 0xb5, 0x0f, 0xa7, 0x61, 0x12, 0xaa, 0xa6, 0xa2, 0x69, 0x9b, 0x26, 0x93, 0x3f, 0xb6, 0xf0, 0x3b, - 0xcf, 0x79, 0x1a, 0x83, 0x3c, 0x83, 0x5c, 0x50, 0x78, 0x91, 0x83, 0x90, 0x84, 0xe0, 0x4e, 0xc6, - 0xe4, 0x99, 0x85, 0xc6, 0x68, 0x6f, 0x48, 0xb5, 0x4d, 0x76, 0x70, 0x57, 0x48, 0xc6, 0xa5, 0xb5, - 0x35, 0x46, 0x7b, 0x6d, 0x6a, 0x1c, 0x72, 0x0f, 0xb7, 0x21, 0xf1, 0xad, 0xb6, 0x8e, 0x29, 0x53, - 0xd5, 0x0a, 0x09, 0x99, 0xd5, 0xd1, 0x21, 0x6d, 0x93, 0xc7, 0xb8, 0x2f, 0xc3, 0x18, 0xd2, 0x5c, - 0x5a, 0xdd, 0x31, 0xda, 0xdb, 0xde, 0xdf, 0x75, 0x0d, 0x73, 0xb7, 0x66, 0xee, 0x1e, 0x55, 0xcc, - 0xbd, 0xc1, 0x45, 0xe1, 0xb4, 0x7e, 0xf9, 0xcb, 0x41, 0xb4, 0xae, 0x51, 0x47, 0x6b, 0x52, 0x56, - 0x4f, 0xcf, 0x63, 0x1c, 0xf2, 0x04, 0x8f, 0xe6, 0x6c, 0x7e, 0x16, 0x26, 0xc1, 0x37, 0x99, 0xa6, - 0x64, 0xf5, 0x75, 0xef, 0x47, 0x6e, 0x93, 0xe6, 0xe1, 0x1a, 0xc4, 0xeb, 0xa8, 0xee, 0x74, 0xa3, - 0x90, 0x1c, 0xe3, 0xfe, 0xd7, 0xc0, 0x7c, 0xe0, 0xc2, 0x1a, 0x8c, 0xdb, 0x7b, 0xdb, 0xfb, 0xef, - 0xaf, 0xf5, 0x78, 0x43, 0x20, 0x03, 0xf6, 0xba, 0x65, 0xe1, 0xa0, 0x8f, 0x69, 0x5d, 0x3b, 0xf9, - 0x7d, 0x0b, 0x93, 0x26, 0x56, 0x64, 0x69, 0x22, 0x80, 0x4c, 0x70, 0xef, 0x44, 0x32, 0x99, 0x0b, - 0xa3, 0xa7, 0x87, 0xcb, 0xc2, 0xe9, 0x09, 0x1d, 0xa1, 0x55, 0x86, 0x3c, 0xc5, 0x9d, 0x23, 0x26, - 0x99, 0x16, 0x77, 0x7b, 0xdf, 0x76, 0xd7, 0x2f, 0xb1, 0x31, 0x81, 0x42, 0x79, 0x0f, 0x14, 0x8b, - 0xb2, 0x70, 0x46, 0x3e, 0x93, 0xec, 0xa3, 0x34, 0x0e, 0x25, 0xc4, 0x99, 0x5c, 0x50, 0xdd, 0x83, - 0x7c, 0x86, 0x87, 0xc7, 0x9c, 0xa7, 0xfc, 0xdb, 0x45, 0x06, 0xfa, 0x66, 0x86, 0xde, 0xbb, 0x65, - 0xe1, 0xdc, 0x87, 0x3a, 0xd8, 0xa8, 0xb8, 0x41, 0x92, 0x0f, 0x71, 0x57, 0x3b, 0xfa, 0xe6, 0x86, - 0xde, 0xfd, 0xb2, 0x70, 0xee, 0xea, 0x92, 0x06, 0xdc, 0x20, 0xc8, 0x57, 0x37, 0x7a, 0x75, 0xb5, - 0x5e, 0x1f, 0xdc, 0xaa, 0x97, 0xd1, 0xe0, 0x16, 0xc1, 0x7e, 0x42, 0x78, 0xb4, 0x4e, 0x8d, 0xb8, - 0x18, 0x53, 0x10, 0x79, 0x24, 0xf5, 0xf4, 0x46, 0xb0, 0x51, 0x59, 0x38, 0x98, 0xaf, 0xa2, 0xb4, - 0x81, 0x20, 0x47, 0xb8, 0x67, 0x3c, 0x6b, 0x4b, 0x4f, 0xf2, 0xde, 0xa6, 0x74, 0x27, 0x2c, 0xce, - 0x22, 0x38, 0x91, 0x1c, 0x58, 0xec, 0x8d, 0x2a, 0xe1, 0x7a, 0xa6, 0x1b, 0xad, 0x6a, 0x27, 0x17, - 0x08, 0xdf, 0x69, 0x02, 0xc9, 0x4b, 0xdc, 0x8b, 0xd8, 0x0c, 0x22, 0x75, 0x67, 0x6d, 0xfd, 0x60, - 0x57, 0x5f, 0xdf, 0x33, 0x08, 0xd8, 0x7c, 0xf1, 0x4c, 0x65, 0x9f, 0xb3, 0x90, 0x7b, 0x87, 0xaa, - 0xe7, 0x9f, 0x85, 0xf3, 0x49, 0x10, 0xca, 0xb3, 0x7c, 0xe6, 0xce, 0xd3, 0x78, 0x1a, 0x70, 0x76, - 0xca, 0x12, 0x36, 0x8d, 0xd2, 0xf3, 0x70, 0xda, 0xfc, 0x88, 0x5d, 0x5d, 0x77, 0xe0, 0xb3, 0x4c, - 0x02, 0x57, 0x83, 0xc4, 0x20, 0x79, 0x38, 0xa7, 0xd5, 0x69, 0xe4, 0x4b, 0xdc, 0x17, 0x7a, 0x0e, - 0x51, 0xf1, 0x79, 0xb0, 0x79, 0xb0, 0x19, 0xf3, 0x86, 0xc9, 0x4b, 0x16, 0xe5, 0x20, 0x68, 0x5d, - 0x36, 0x49, 0xf0, 0x48, 0xbd, 0x79, 0xf0, 0x57, 0xef, 0x6f, 0x17, 0xb7, 0xcf, 0x61, 0x51, 0x69, - 0xd9, 0x2f, 0x0b, 0x47, 0xb9, 0x54, 0xfd, 0x90, 0x03, 0xdc, 0x87, 0x1f, 0x24, 0x24, 0xf2, 0xe6, - 0xb8, 0x0d, 0xf9, 0x8e, 0x75, 0xda, 0xbb, 0x5b, 0x1d, 0x57, 0xc3, 0x69, 0x6d, 0x4c, 0x7e, 0x43, - 0xb8, 0x67, 0x40, 0xc4, 0xa9, 0x57, 0x84, 0x3a, 0xaa, 0xed, 0x0d, 0xcb, 0xc2, 0x31, 0x81, 0x7a, - 0x5b, 0xec, 0x9a, 0x6d, 0xa1, 0x37, 0x88, 0x99, 0x04, 0x12, 0xdf, 0xac, 0x8d, 0x31, 0x1e, 0x48, - 0xce, 0xe6, 0xf0, 0x7d, 0xe8, 0x57, 0x0f, 0xb0, 0x7e, 0x2c, 0x3a, 0xfc, 0xc4, 0x27, 0x5f, 0xe0, - 0x01, 0xaf, 0x28, 0x55, 0x5b, 0x64, 0xe7, 0x8d, 0x2d, 0x72, 0x90, 0x2c, 0xbc, 0x3b, 0x65, 0xe1, - 0xac, 0x90, 0x74, 0x65, 0x3d, 0xed, 0x0c, 0xda, 0xf7, 0x3a, 0x9e, 0xb8, 0xbc, 0xb2, 0x5b, 0xaf, - 0xae, 0xec, 0xd6, 0xeb, 0x2b, 0x1b, 0xfd, 0xb8, 0xb4, 0xd1, 0xaf, 0x4b, 0x1b, 0x5d, 0x2c, 0x6d, - 0x74, 0xb9, 0xb4, 0xd1, 0xdf, 0x4b, 0x1b, 0xfd, 0xb3, 0xb4, 0x5b, 0xaf, 0x97, 0x36, 0xfa, 0xf9, - 0xda, 0x6e, 0x5d, 0x5e, 0xdb, 0xad, 0x57, 0xd7, 0x76, 0xeb, 0xbb, 0xc7, 0xff, 0x75, 0xb7, 0xff, - 0xbb, 0x83, 0x67, 0x3d, 0x3d, 0xe0, 0xa7, 0xff, 0x06, 0x00, 0x00, 0xff, 0xff, 0xd5, 0x7e, 0x59, - 0x12, 0x69, 0x06, 0x00, 0x00, + // 846 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0x4f, 0x6f, 0xdc, 0x44, + 0x14, 0x5f, 0xc7, 0xfb, 0x77, 0x5a, 0x6d, 0x61, 0x1a, 0x15, 0x27, 0x45, 0xf6, 0x6a, 0x05, 0x52, + 0x90, 0xc0, 0x2b, 0x8a, 0xe8, 0x01, 0xa9, 0x88, 0x38, 0x09, 0xa2, 0x55, 0x25, 0x2a, 0xa7, 0x27, + 0x2e, 0x68, 0x76, 0xfd, 0xe2, 0x58, 0xf1, 0xbf, 0xce, 0x8c, 0x2b, 0xf6, 0xc6, 0x89, 0x73, 0x4f, + 0x88, 0x8f, 0xc0, 0x01, 0xf1, 0x39, 0x72, 0xcc, 0xb1, 0xe2, 0x60, 0xc8, 0xe6, 0x82, 0x7c, 0xea, + 0x47, 0x40, 0xf3, 0xc7, 0xbb, 0xde, 0x8d, 0x02, 0xf4, 0xb2, 0xfb, 0x66, 0xde, 0xef, 0xbd, 0xf7, + 0x7b, 0xbf, 0x79, 0x7e, 0xe8, 0x61, 0x7e, 0x16, 0x4e, 0x5e, 0x14, 0x40, 0x23, 0xa0, 0xf2, 0x7f, + 0x4e, 0x49, 0x1a, 0x42, 0xc3, 0x9c, 0x12, 0xd6, 0x3c, 0xba, 0x39, 0xcd, 0x78, 0x86, 0x87, 0xeb, + 0x80, 0xdd, 0xed, 0x30, 0x0b, 0x33, 0xe9, 0x9a, 0x08, 0x4b, 0xa1, 0x76, 0x77, 0xc2, 0x2c, 0x0b, + 0x63, 0x98, 0xc8, 0xd3, 0xb4, 0x38, 0x99, 0x90, 0x74, 0xae, 0x5d, 0xf6, 0xa6, 0x2b, 0x28, 0x28, + 0xe1, 0x51, 0x96, 0x6a, 0xbf, 0xb3, 0xe9, 0xe7, 0x51, 0x02, 0x8c, 0x93, 0x24, 0xd7, 0x80, 0xfb, + 0x82, 0x79, 0x9c, 0x85, 0xaa, 0x68, 0x6d, 0x68, 0xe7, 0xc1, 0xff, 0x6b, 0x2b, 0x80, 0x93, 0x28, + 0x8d, 0x44, 0x55, 0xd6, 0xb4, 0x55, 0x92, 0xf1, 0xcf, 0x26, 0x7a, 0xf7, 0x19, 0xcd, 0x12, 0xe0, + 0xa7, 0x50, 0x30, 0x1f, 0x5e, 0x14, 0xc0, 0x38, 0xc6, 0xa8, 0x9d, 0x13, 0x7e, 0x6a, 0x19, 0x23, + 0x63, 0x6f, 0xe0, 0x4b, 0x1b, 0x7f, 0x81, 0x3a, 0x8c, 0x13, 0xca, 0xad, 0xad, 0x91, 0xb1, 0x77, + 0xeb, 0xc1, 0xae, 0xab, 0xc8, 0xbb, 0x35, 0x79, 0xf7, 0x79, 0x4d, 0xde, 0xeb, 0x9f, 0x97, 0x4e, + 0xeb, 0xd5, 0x9f, 0x8e, 0xe1, 0xab, 0x10, 0xfc, 0x10, 0x99, 0x90, 0x06, 0x96, 0xf9, 0x16, 0x91, + 0x22, 0x40, 0xf0, 0x60, 0x1c, 0x72, 0xab, 0x3d, 0x32, 0xf6, 0x4c, 0x5f, 0xda, 0xf8, 0x11, 0xea, + 0x09, 0x99, 0xb2, 0x82, 0x5b, 0x1d, 0x99, 0x6f, 0xe7, 0x5a, 0xbe, 0x43, 0x2d, 0xb3, 0x4a, 0xf7, + 0x8b, 0x48, 0x57, 0xc7, 0xe0, 0x6d, 0xd4, 0x91, 0x02, 0x59, 0x5d, 0xd9, 0x9b, 0x3a, 0xe0, 0xc7, + 0x68, 0x38, 0x23, 0xb3, 0xd3, 0x28, 0x0d, 0xbf, 0xcd, 0xa5, 0x3c, 0x56, 0x4f, 0xe6, 0xbe, 0xef, + 0x36, 0x25, 0x3b, 0x58, 0x83, 0x78, 0x6d, 0x91, 0xdd, 0xdf, 0x08, 0xc4, 0x47, 0xa8, 0xf7, 0x0d, + 0x90, 0x00, 0x28, 0xb3, 0xfa, 0x23, 0x73, 0xef, 0xd6, 0x83, 0x0f, 0xd6, 0x72, 0x5c, 0x13, 0x5b, + 0x81, 0xbd, 0x4e, 0x55, 0x3a, 0xc6, 0x27, 0x7e, 0x1d, 0x3b, 0xfe, 0x7d, 0x0b, 0xe1, 0x26, 0x96, + 0xe5, 0x59, 0xca, 0x00, 0x8f, 0x51, 0xf7, 0x98, 0x13, 0x5e, 0x30, 0xf5, 0x36, 0x1e, 0xaa, 0x4a, + 0xa7, 0xcb, 0xe4, 0x8d, 0xaf, 0x3d, 0xf8, 0x09, 0x6a, 0x1f, 0x12, 0x4e, 0xf4, 0x43, 0xd9, 0xee, + 0xfa, 0x40, 0x34, 0x18, 0x08, 0x94, 0x77, 0x4f, 0x74, 0x51, 0x95, 0xce, 0x30, 0x20, 0x9c, 0x7c, + 0x9c, 0x25, 0x11, 0x87, 0x24, 0xe7, 0x73, 0x5f, 0xe6, 0xc0, 0x9f, 0xa3, 0xc1, 0x11, 0xa5, 0x19, + 0x7d, 0x3e, 0xcf, 0x41, 0xbe, 0xdf, 0xc0, 0x7b, 0xaf, 0x2a, 0x9d, 0xbb, 0x50, 0x5f, 0x36, 0x22, + 0x56, 0x48, 0xfc, 0x11, 0xea, 0xc8, 0x83, 0x7c, 0xb9, 0x81, 0x77, 0xb7, 0x2a, 0x9d, 0x3b, 0x32, + 0xa4, 0x01, 0x57, 0x08, 0xfc, 0xf5, 0x4a, 0xaf, 0x8e, 0xd4, 0xeb, 0xc3, 0x1b, 0xf5, 0x52, 0x1a, + 0xdc, 0x20, 0xd8, 0x4f, 0x06, 0x1a, 0xae, 0xb7, 0x86, 0x5d, 0x84, 0x7c, 0x60, 0x45, 0xcc, 0x25, + 0x7b, 0x25, 0xd8, 0xb0, 0x2a, 0x1d, 0x44, 0x97, 0xb7, 0x7e, 0x03, 0x81, 0x0f, 0x51, 0x57, 0x9d, + 0xac, 0x2d, 0xc9, 0xe4, 0xfd, 0x4d, 0xe9, 0x8e, 0x49, 0x92, 0xc7, 0x70, 0xcc, 0x29, 0x90, 0xc4, + 0x1b, 0x6a, 0xe1, 0xba, 0x2a, 0x9b, 0xaf, 0x63, 0xc7, 0xe7, 0x06, 0xba, 0xdd, 0x04, 0xe2, 0x97, + 0xa8, 0x1b, 0x93, 0x29, 0xc4, 0xe2, 0xcd, 0x4c, 0x39, 0xb0, 0xcb, 0x2f, 0xf9, 0x29, 0x84, 0x64, + 0x36, 0x7f, 0x2a, 0xbc, 0xcf, 0x48, 0x44, 0xbd, 0x03, 0x91, 0xf3, 0x8f, 0xd2, 0xf9, 0x34, 0x8c, + 0xf8, 0x69, 0x31, 0x75, 0x67, 0x59, 0x32, 0x09, 0x29, 0x39, 0x21, 0x29, 0x99, 0xc4, 0xd9, 0x59, + 0x34, 0x69, 0x2e, 0x04, 0x57, 0xc6, 0xed, 0x07, 0x24, 0xe7, 0x40, 0x05, 0x91, 0x04, 0x38, 0x8d, + 0x66, 0xbe, 0xae, 0x86, 0xbf, 0x42, 0x3d, 0x26, 0x79, 0x30, 0xdd, 0xcf, 0xbd, 0xcd, 0xc2, 0x8a, + 0xe6, 0xaa, 0x93, 0x97, 0x24, 0x2e, 0x80, 0xf9, 0x75, 0xd8, 0x38, 0x45, 0x43, 0x31, 0xf3, 0x10, + 0x2c, 0xe7, 0x6f, 0x07, 0x99, 0x67, 0x30, 0xd7, 0x5a, 0xf6, 0xaa, 0xd2, 0x11, 0x47, 0x5f, 0xfc, + 0xe0, 0x7d, 0xd4, 0x83, 0x1f, 0x38, 0xa4, 0x7c, 0x55, 0x6e, 0x43, 0xbe, 0x23, 0xe9, 0xf6, 0xee, + 0xe8, 0x72, 0x35, 0xdc, 0xaf, 0x8d, 0xf1, 0x6f, 0x06, 0xea, 0x2a, 0x10, 0x76, 0xea, 0x75, 0x23, + 0x4a, 0x99, 0xde, 0xa0, 0x2a, 0x1d, 0x75, 0x51, 0xef, 0x94, 0x1d, 0xb5, 0x53, 0xb6, 0xa4, 0x5b, + 0x32, 0x81, 0x34, 0x50, 0x6b, 0x63, 0x84, 0xfa, 0x9c, 0x92, 0x19, 0x7c, 0x1f, 0x05, 0x7a, 0x00, + 0xeb, 0x61, 0x91, 0xd7, 0x8f, 0x03, 0xfc, 0x25, 0xea, 0x53, 0xdd, 0x92, 0xde, 0x22, 0xdb, 0xd7, + 0xb6, 0xc8, 0x7e, 0x3a, 0xf7, 0x6e, 0x57, 0xa5, 0xb3, 0x44, 0xfa, 0x4b, 0xeb, 0x49, 0xbb, 0x6f, + 0xbe, 0xd3, 0xf6, 0xd8, 0xc5, 0xa5, 0xdd, 0x7a, 0x7d, 0x69, 0xb7, 0xde, 0x5c, 0xda, 0xc6, 0x8f, + 0x0b, 0xdb, 0xf8, 0x75, 0x61, 0x1b, 0xe7, 0x0b, 0xdb, 0xb8, 0x58, 0xd8, 0xc6, 0x5f, 0x0b, 0xdb, + 0xf8, 0x7b, 0x61, 0xb7, 0xde, 0x2c, 0x6c, 0xe3, 0xd5, 0x95, 0xdd, 0xba, 0xb8, 0xb2, 0x5b, 0xaf, + 0xaf, 0xec, 0xd6, 0x77, 0x8f, 0xfe, 0xed, 0x6d, 0xff, 0x73, 0x9f, 0x4f, 0xbb, 0x92, 0xe0, 0x67, + 0xff, 0x04, 0x00, 0x00, 0xff, 0xff, 0x63, 0x5c, 0x0b, 0x88, 0xd6, 0x06, 0x00, 0x00, } func (this *PrometheusRequest) Equal(that interface{}) bool { @@ -512,10 +513,10 @@ func (this *PrometheusRequest) Equal(that interface{}) bool { if this.Path != that1.Path { return false } - if this.Start != that1.Start { + if !this.Start.Equal(that1.Start) { return false } - if this.End != that1.End { + if !this.End.Equal(that1.End) { return false } if this.Step != that1.Step { @@ -888,16 +889,22 @@ func (m *PrometheusRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { i-- dAtA[i] = 0x20 } - if m.End != 0 { - i = encodeVarintQueryrange(dAtA, i, uint64(m.End)) - i-- - dAtA[i] = 0x18 + n3, err3 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.End, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.End):]) + if err3 != nil { + return 0, err3 } - if m.Start != 0 { - i = encodeVarintQueryrange(dAtA, i, uint64(m.Start)) - i-- - dAtA[i] = 0x10 + i -= n3 + i = encodeVarintQueryrange(dAtA, i, uint64(n3)) + i-- + dAtA[i] = 0x1a + n4, err4 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Start, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.Start):]) + if err4 != nil { + return 0, err4 } + i -= n4 + i = encodeVarintQueryrange(dAtA, i, uint64(n4)) + i-- + dAtA[i] = 0x12 if len(m.Path) > 0 { i -= len(m.Path) copy(dAtA[i:], m.Path) @@ -1188,12 +1195,10 @@ func (m *PrometheusRequest) Size() (n int) { if l > 0 { n += 1 + l + sovQueryrange(uint64(l)) } - if m.Start != 0 { - n += 1 + sovQueryrange(uint64(m.Start)) - } - if m.End != 0 { - n += 1 + sovQueryrange(uint64(m.End)) - } + l = github_com_gogo_protobuf_types.SizeOfStdTime(m.Start) + n += 1 + l + sovQueryrange(uint64(l)) + l = github_com_gogo_protobuf_types.SizeOfStdTime(m.End) + n += 1 + l + sovQueryrange(uint64(l)) if m.Step != 0 { n += 1 + sovQueryrange(uint64(m.Step)) } @@ -1342,8 +1347,8 @@ func (this *PrometheusRequest) String() string { repeatedStringForHeaders += "}" s := strings.Join([]string{`&PrometheusRequest{`, `Path:` + fmt.Sprintf("%v", this.Path) + `,`, - `Start:` + fmt.Sprintf("%v", this.Start) + `,`, - `End:` + fmt.Sprintf("%v", this.End) + `,`, + `Start:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Start), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, + `End:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.End), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, `Step:` + fmt.Sprintf("%v", this.Step) + `,`, `Timeout:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Timeout), "Duration", "duration.Duration", 1), `&`, ``, 1) + `,`, `Query:` + fmt.Sprintf("%v", this.Query) + `,`, @@ -1503,10 +1508,10 @@ func (m *PrometheusRequest) Unmarshal(dAtA []byte) error { m.Path = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 2: - if wireType != 0 { + if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Start", wireType) } - m.Start = 0 + var msglen int for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowQueryrange @@ -1516,16 +1521,30 @@ func (m *PrometheusRequest) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.Start |= int64(b&0x7F) << shift + msglen |= int(b&0x7F) << shift if b < 0x80 { break } } + if msglen < 0 { + return ErrInvalidLengthQueryrange + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQueryrange + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.Start, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex case 3: - if wireType != 0 { + if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field End", wireType) } - m.End = 0 + var msglen int for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowQueryrange @@ -1535,11 +1554,25 @@ func (m *PrometheusRequest) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.End |= int64(b&0x7F) << shift + msglen |= int(b&0x7F) << shift if b < 0x80 { break } } + if msglen < 0 { + return ErrInvalidLengthQueryrange + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQueryrange + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.End, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex case 4: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field Step", wireType) diff --git a/pkg/querier/queryrange/queryrangebase/queryrange.proto b/pkg/querier/queryrange/queryrangebase/queryrange.proto index 8ee2895d66..ad66551d2b 100644 --- a/pkg/querier/queryrange/queryrangebase/queryrange.proto +++ b/pkg/querier/queryrange/queryrangebase/queryrange.proto @@ -5,6 +5,7 @@ package queryrangebase; import "gogoproto/gogo.proto"; import "google/protobuf/any.proto"; import "google/protobuf/duration.proto"; +import "google/protobuf/timestamp.proto"; import "pkg/logproto/logproto.proto"; import "pkg/querier/queryrange/queryrangebase/definitions/definitions.proto"; @@ -14,8 +15,14 @@ option (gogoproto.unmarshaler_all) = true; message PrometheusRequest { string path = 1; - int64 start = 2; - int64 end = 3; + google.protobuf.Timestamp start = 2 [ + (gogoproto.stdtime) = true, + (gogoproto.nullable) = false + ]; + google.protobuf.Timestamp end = 3 [ + (gogoproto.stdtime) = true, + (gogoproto.nullable) = false + ]; int64 step = 4; google.protobuf.Duration timeout = 5 [ (gogoproto.stdduration) = true, diff --git a/pkg/querier/queryrange/queryrangebase/results_cache.go b/pkg/querier/queryrange/queryrangebase/results_cache.go index c81193d88e..05d6a26f67 100644 --- a/pkg/querier/queryrange/queryrangebase/results_cache.go +++ b/pkg/querier/queryrange/queryrangebase/results_cache.go @@ -21,7 +21,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" "github.com/uber/jaeger-client-go" @@ -145,7 +144,7 @@ type constSplitter time.Duration // GenerateCacheKey generates a cache key based on the userID, Request and interval. func (t constSplitter) GenerateCacheKey(_ context.Context, userID string, r Request) string { - currentInterval := r.GetStart() / int64(time.Duration(t)/time.Millisecond) + currentInterval := r.GetStart().UnixMilli() / int64(time.Duration(t)/time.Millisecond) return fmt.Sprintf("%s:%s:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval) } @@ -237,7 +236,7 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) { sp.LogKV( "query", r.GetQuery(), "step", time.UnixMilli(r.GetStep()), - "start", time.UnixMilli(r.GetStart()), + "start", r.GetStart(), "end", r.GetEnd(), "key", key, ) @@ -245,7 +244,7 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) { cacheFreshnessCapture := func(id string) time.Duration { return s.limits.MaxCacheFreshness(ctx, id) } maxCacheFreshness := validation.MaxDurationPerTenant(tenantIDs, cacheFreshnessCapture) maxCacheTime := int64(model.Now().Add(-maxCacheFreshness)) - if r.GetStart() > maxCacheTime { + if r.GetStart().UnixMilli() > maxCacheTime { return s.next.Do(ctx, r) } @@ -338,9 +337,9 @@ func (s resultsCache) isAtModifierCachable(r Request, maxCacheTime int64) bool { } // This resolves the start() and end() used with the @ modifier. - expr = promql.PreprocessExpr(expr, timestamp.Time(r.GetStart()), timestamp.Time(r.GetEnd())) + expr = promql.PreprocessExpr(expr, r.GetStart(), r.GetEnd()) - end := r.GetEnd() + end := r.GetEnd().UnixMilli() atModCachable := true parser.Inspect(expr, func(n parser.Node, _ []parser.Node) error { switch e := n.(type) { @@ -533,8 +532,8 @@ func toExtent(ctx context.Context, req Request, res Response) (Extent, error) { return Extent{}, err } return Extent{ - Start: req.GetStart(), - End: req.GetEnd(), + Start: req.GetStart().UnixMilli(), + End: req.GetEnd().UnixMilli(), Response: anyResp, TraceId: jaegerTraceID(ctx), }, nil @@ -545,11 +544,12 @@ func toExtent(ctx context.Context, req Request, res Response) (Extent, error) { func (s resultsCache) partition(req Request, extents []Extent) ([]Request, []Response, error) { var requests []Request var cachedResponses []Response - start := req.GetStart() + start := req.GetStart().UnixMilli() + end := req.GetEnd().UnixMilli() for _, extent := range extents { // If there is no overlap, ignore this extent. - if extent.GetEnd() < start || extent.Start > req.GetEnd() { + if extent.GetEnd() < start || extent.Start > end { continue } @@ -559,13 +559,13 @@ func (s resultsCache) partition(req Request, extents []Extent) ([]Request, []Res // However if the step is large enough, the split_query_by_interval middleware would generate a query with same start and end. // For example, if the step size is more than 12h and the interval is 24h. // This means the extent's start and end time would be same, even if the timerange covers several hours. - if (req.GetStart() != req.GetEnd()) && (req.GetEnd()-req.GetStart() > s.minCacheExtent) && (extent.End-extent.Start < s.minCacheExtent) { + if (req.GetStart() != req.GetEnd()) && ((end - start) > s.minCacheExtent) && (extent.End-extent.Start < s.minCacheExtent) { continue } // If there is a bit missing at the front, make a request for that. if start < extent.Start { - r := req.WithStartEnd(start, extent.Start) + r := req.WithStartEnd(time.UnixMilli(start), time.UnixMilli(extent.Start)) requests = append(requests, r) } res, err := extent.toResponse() @@ -573,13 +573,13 @@ func (s resultsCache) partition(req Request, extents []Extent) ([]Request, []Res return nil, nil, err } // extract the overlap from the cached extent. - cachedResponses = append(cachedResponses, s.extractor.Extract(start, req.GetEnd(), res, extent.GetStart(), extent.GetEnd())) + cachedResponses = append(cachedResponses, s.extractor.Extract(start, end, res, extent.GetStart(), extent.GetEnd())) start = extent.End } // Lastly, make a request for any data missing at the end. - if start < req.GetEnd() { - r := req.WithStartEnd(start, req.GetEnd()) + if start < req.GetEnd().UnixMilli() { + r := req.WithStartEnd(time.UnixMilli(start), time.UnixMilli(end)) requests = append(requests, r) } diff --git a/pkg/querier/queryrange/queryrangebase/results_cache_test.go b/pkg/querier/queryrange/queryrangebase/results_cache_test.go index 93302bf426..bfe3ecea5f 100644 --- a/pkg/querier/queryrange/queryrangebase/results_cache_test.go +++ b/pkg/querier/queryrange/queryrangebase/results_cache_test.go @@ -28,8 +28,8 @@ const ( var ( parsedRequest = &PrometheusRequest{ Path: "/api/v1/query_range", - Start: 1536673680 * 1e3, - End: 1536716898 * 1e3, + Start: time.UnixMilli(1536673680 * 1e3), + End: time.UnixMilli(1536716898 * 1e3), Step: 120 * 1e3, Query: "sum(container_memory_rss) by (namespace)", } @@ -41,8 +41,8 @@ var ( } noCacheRequest = &PrometheusRequest{ Path: "/api/v1/query_range", - Start: 1536673680 * 1e3, - End: 1536716898 * 1e3, + Start: time.UnixMilli(1536673680 * 1e3), + End: time.UnixMilli(1536716898 * 1e3), Step: 120 * 1e3, Query: "sum(container_memory_rss) by (namespace)", CachingOptions: CachingOptions{Disabled: true}, @@ -278,111 +278,111 @@ func TestShouldCache(t *testing.T) { // @ modifier on vector selectors. { name: "@ modifier on vector selector, before end, before maxCacheTime", - request: &PrometheusRequest{Query: "metric @ 123", End: 125000}, + request: &PrometheusRequest{Query: "metric @ 123", End: time.UnixMilli(125000)}, input: Response(&PrometheusResponse{}), expected: true, }, { name: "@ modifier on vector selector, after end, before maxCacheTime", - request: &PrometheusRequest{Query: "metric @ 127", End: 125000}, + request: &PrometheusRequest{Query: "metric @ 127", End: time.UnixMilli(125000)}, input: Response(&PrometheusResponse{}), expected: false, }, { name: "@ modifier on vector selector, before end, after maxCacheTime", - request: &PrometheusRequest{Query: "metric @ 151", End: 200000}, + request: &PrometheusRequest{Query: "metric @ 151", End: time.UnixMilli(200000)}, input: Response(&PrometheusResponse{}), expected: false, }, { name: "@ modifier on vector selector, after end, after maxCacheTime", - request: &PrometheusRequest{Query: "metric @ 151", End: 125000}, + request: &PrometheusRequest{Query: "metric @ 151", End: time.UnixMilli(125000)}, input: Response(&PrometheusResponse{}), expected: false, }, { name: "@ modifier on vector selector with start() before maxCacheTime", - request: &PrometheusRequest{Query: "metric @ start()", Start: 100000, End: 200000}, + request: &PrometheusRequest{Query: "metric @ start()", Start: time.UnixMilli(100000), End: time.UnixMilli(200000)}, input: Response(&PrometheusResponse{}), expected: true, }, { name: "@ modifier on vector selector with end() after maxCacheTime", - request: &PrometheusRequest{Query: "metric @ end()", Start: 100000, End: 200000}, + request: &PrometheusRequest{Query: "metric @ end()", Start: time.UnixMilli(100000), End: time.UnixMilli(200000)}, input: Response(&PrometheusResponse{}), expected: false, }, // @ modifier on matrix selectors. { name: "@ modifier on matrix selector, before end, before maxCacheTime", - request: &PrometheusRequest{Query: "rate(metric[5m] @ 123)", End: 125000}, + request: &PrometheusRequest{Query: "rate(metric[5m] @ 123)", End: time.UnixMilli(125000)}, input: Response(&PrometheusResponse{}), expected: true, }, { name: "@ modifier on matrix selector, after end, before maxCacheTime", - request: &PrometheusRequest{Query: "rate(metric[5m] @ 127)", End: 125000}, + request: &PrometheusRequest{Query: "rate(metric[5m] @ 127)", End: time.UnixMilli(125000)}, input: Response(&PrometheusResponse{}), expected: false, }, { name: "@ modifier on matrix selector, before end, after maxCacheTime", - request: &PrometheusRequest{Query: "rate(metric[5m] @ 151)", End: 200000}, + request: &PrometheusRequest{Query: "rate(metric[5m] @ 151)", End: time.UnixMilli(200000)}, input: Response(&PrometheusResponse{}), expected: false, }, { name: "@ modifier on matrix selector, after end, after maxCacheTime", - request: &PrometheusRequest{Query: "rate(metric[5m] @ 151)", End: 125000}, + request: &PrometheusRequest{Query: "rate(metric[5m] @ 151)", End: time.UnixMilli(125000)}, input: Response(&PrometheusResponse{}), expected: false, }, { name: "@ modifier on matrix selector with start() before maxCacheTime", - request: &PrometheusRequest{Query: "rate(metric[5m] @ start())", Start: 100000, End: 200000}, + request: &PrometheusRequest{Query: "rate(metric[5m] @ start())", Start: time.UnixMilli(100000), End: time.UnixMilli(200000)}, input: Response(&PrometheusResponse{}), expected: true, }, { name: "@ modifier on matrix selector with end() after maxCacheTime", - request: &PrometheusRequest{Query: "rate(metric[5m] @ end())", Start: 100000, End: 200000}, + request: &PrometheusRequest{Query: "rate(metric[5m] @ end())", Start: time.UnixMilli(100000), End: time.UnixMilli(200000)}, input: Response(&PrometheusResponse{}), expected: false, }, // @ modifier on subqueries. { name: "@ modifier on subqueries, before end, before maxCacheTime", - request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ 123)", End: 125000}, + request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ 123)", End: time.UnixMilli(125000)}, input: Response(&PrometheusResponse{}), expected: true, }, { name: "@ modifier on subqueries, after end, before maxCacheTime", - request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ 127)", End: 125000}, + request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ 127)", End: time.UnixMilli(125000)}, input: Response(&PrometheusResponse{}), expected: false, }, { name: "@ modifier on subqueries, before end, after maxCacheTime", - request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ 151)", End: 200000}, + request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ 151)", End: time.UnixMilli(200000)}, input: Response(&PrometheusResponse{}), expected: false, }, { name: "@ modifier on subqueries, after end, after maxCacheTime", - request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ 151)", End: 125000}, + request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ 151)", End: time.UnixMilli(125000)}, input: Response(&PrometheusResponse{}), expected: false, }, { name: "@ modifier on subqueries with start() before maxCacheTime", - request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ start())", Start: 100000, End: 200000}, + request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ start())", Start: time.UnixMilli(100000), End: time.UnixMilli(200000)}, input: Response(&PrometheusResponse{}), expected: true, }, { name: "@ modifier on subqueries with end() after maxCacheTime", - request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ end())", Start: 100000, End: 200000}, + request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ end())", Start: time.UnixMilli(100000), End: time.UnixMilli(200000)}, input: Response(&PrometheusResponse{}), expected: false, }, @@ -408,8 +408,8 @@ func TestPartition(t *testing.T) { { name: "Test a complete hit.", input: &PrometheusRequest{ - Start: 0, - End: 100, + Start: time.UnixMilli(0), + End: time.UnixMilli(100), }, prevCachedResponse: []Extent{ mkExtent(0, 100), @@ -422,32 +422,32 @@ func TestPartition(t *testing.T) { { name: "Test with a complete miss.", input: &PrometheusRequest{ - Start: 0, - End: 100, + Start: time.UnixMilli(0), + End: time.UnixMilli(100), }, prevCachedResponse: []Extent{ mkExtent(110, 210), }, expectedRequests: []Request{ &PrometheusRequest{ - Start: 0, - End: 100, + Start: time.UnixMilli(0), + End: time.UnixMilli(100), }, }, }, { name: "Test a partial hit.", input: &PrometheusRequest{ - Start: 0, - End: 100, + Start: time.UnixMilli(0), + End: time.UnixMilli(100), }, prevCachedResponse: []Extent{ mkExtent(50, 100), }, expectedRequests: []Request{ &PrometheusRequest{ - Start: 0, - End: 50, + Start: time.UnixMilli(0), + End: time.UnixMilli(50), }, }, expectedCachedResponse: []Response{ @@ -457,8 +457,8 @@ func TestPartition(t *testing.T) { { name: "Test multiple partial hits.", input: &PrometheusRequest{ - Start: 100, - End: 200, + Start: time.UnixMilli(100), + End: time.UnixMilli(200), }, prevCachedResponse: []Extent{ mkExtent(50, 120), @@ -466,8 +466,8 @@ func TestPartition(t *testing.T) { }, expectedRequests: []Request{ &PrometheusRequest{ - Start: 120, - End: 160, + Start: time.UnixMilli(120), + End: time.UnixMilli(160), }, }, expectedCachedResponse: []Response{ @@ -478,8 +478,8 @@ func TestPartition(t *testing.T) { { name: "Partial hits with tiny gap.", input: &PrometheusRequest{ - Start: 100, - End: 160, + Start: time.UnixMilli(100), + End: time.UnixMilli(160), }, prevCachedResponse: []Extent{ mkExtent(50, 120), @@ -487,8 +487,8 @@ func TestPartition(t *testing.T) { }, expectedRequests: []Request{ &PrometheusRequest{ - Start: 120, - End: 160, + Start: time.UnixMilli(120), + End: time.UnixMilli(160), }, }, expectedCachedResponse: []Response{ @@ -498,16 +498,16 @@ func TestPartition(t *testing.T) { { name: "Extent is outside the range and the request has a single step (same start and end).", input: &PrometheusRequest{ - Start: 100, - End: 100, + Start: time.UnixMilli(100), + End: time.UnixMilli(100), }, prevCachedResponse: []Extent{ mkExtent(50, 90), }, expectedRequests: []Request{ &PrometheusRequest{ - Start: 100, - End: 100, + Start: time.UnixMilli(100), + End: time.UnixMilli(100), }, }, }, @@ -515,8 +515,8 @@ func TestPartition(t *testing.T) { name: "Test when hit has a large step and only a single sample extent.", // If there is a only a single sample in the split interval, start and end will be the same. input: &PrometheusRequest{ - Start: 100, - End: 100, + Start: time.UnixMilli(100), + End: time.UnixMilli(100), }, prevCachedResponse: []Extent{ mkExtent(100, 100), @@ -549,8 +549,8 @@ func TestHandleHit(t *testing.T) { { name: "Should drop tiny extent that overlaps with non-tiny request only", input: &PrometheusRequest{ - Start: 100, - End: 120, + Start: time.UnixMilli(100), + End: time.UnixMilli(120), Step: 5, }, cachedEntry: []Extent{ @@ -570,8 +570,8 @@ func TestHandleHit(t *testing.T) { { name: "Should replace tiny extents that are cover by bigger request", input: &PrometheusRequest{ - Start: 100, - End: 200, + Start: time.UnixMilli(100), + End: time.UnixMilli(200), Step: 5, }, cachedEntry: []Extent{ @@ -594,8 +594,8 @@ func TestHandleHit(t *testing.T) { { name: "Should not drop tiny extent that completely overlaps with tiny request", input: &PrometheusRequest{ - Start: 100, - End: 105, + Start: time.UnixMilli(100), + End: time.UnixMilli(105), Step: 5, }, cachedEntry: []Extent{ @@ -609,8 +609,8 @@ func TestHandleHit(t *testing.T) { { name: "Should not drop tiny extent that partially center-overlaps with tiny request", input: &PrometheusRequest{ - Start: 106, - End: 108, + Start: time.UnixMilli(106), + End: time.UnixMilli(108), Step: 2, }, cachedEntry: []Extent{ @@ -623,8 +623,8 @@ func TestHandleHit(t *testing.T) { { name: "Should not drop tiny extent that partially left-overlaps with tiny request", input: &PrometheusRequest{ - Start: 100, - End: 106, + Start: time.UnixMilli(100), + End: time.UnixMilli(106), Step: 2, }, cachedEntry: []Extent{ @@ -641,8 +641,8 @@ func TestHandleHit(t *testing.T) { { name: "Should not drop tiny extent that partially right-overlaps with tiny request", input: &PrometheusRequest{ - Start: 100, - End: 106, + Start: time.UnixMilli(100), + End: time.UnixMilli(106), Step: 2, }, cachedEntry: []Extent{ @@ -659,8 +659,8 @@ func TestHandleHit(t *testing.T) { { name: "Should merge fragmented extents if request fills the hole", input: &PrometheusRequest{ - Start: 40, - End: 80, + Start: time.UnixMilli(40), + End: time.UnixMilli(80), Step: 20, }, cachedEntry: []Extent{ @@ -674,8 +674,8 @@ func TestHandleHit(t *testing.T) { { name: "Should left-extend extent if request starts earlier than extent in cache", input: &PrometheusRequest{ - Start: 40, - End: 80, + Start: time.UnixMilli(40), + End: time.UnixMilli(80), Step: 20, }, cachedEntry: []Extent{ @@ -688,8 +688,8 @@ func TestHandleHit(t *testing.T) { { name: "Should right-extend extent if request ends later than extent in cache", input: &PrometheusRequest{ - Start: 100, - End: 180, + Start: time.UnixMilli(100), + End: time.UnixMilli(180), Step: 20, }, cachedEntry: []Extent{ @@ -704,8 +704,8 @@ func TestHandleHit(t *testing.T) { input: &PrometheusRequest{ // This request is carefully crated such that cachedEntry is not used to fulfill // the request. - Start: 160, - End: 180, + Start: time.UnixMilli(160), + End: time.UnixMilli(180), Step: 20, }, cachedEntry: []Extent{ @@ -733,7 +733,7 @@ func TestHandleHit(t *testing.T) { merger: PrometheusCodec, parallelismForReq: func(_ context.Context, tenantIDs []string, r Request) int { return 1 }, next: HandlerFunc(func(_ context.Context, req Request) (Response, error) { - return mkAPIResponse(req.GetStart(), req.GetEnd(), req.GetStep()), nil + return mkAPIResponse(req.GetStart().UnixMilli(), req.GetEnd().UnixMilli(), req.GetStep()), nil }), } @@ -741,7 +741,7 @@ func TestHandleHit(t *testing.T) { response, updatedExtents, err := sut.handleHit(ctx, tc.input, tc.cachedEntry, 0) require.NoError(t, err) - expectedResponse := mkAPIResponse(tc.input.GetStart(), tc.input.GetEnd(), tc.input.GetStep()) + expectedResponse := mkAPIResponse(tc.input.GetStart().UnixMilli(), tc.input.GetEnd().UnixMilli(), tc.input.GetStep()) require.Equal(t, expectedResponse, response, "response does not match the expectation") require.Equal(t, tc.expectedUpdatedCachedEntry, updatedExtents, "updated cache entry does not match the expectation") }) @@ -791,7 +791,7 @@ func TestResultsCache(t *testing.T) { require.Equal(t, parsedResponse, resp) // Doing request with new end time should do one more query. - req := parsedRequest.WithStartEnd(parsedRequest.GetStart(), parsedRequest.GetEnd()+100) + req := parsedRequest.WithStartEnd(parsedRequest.GetStart(), parsedRequest.GetEnd().Add(100*time.Millisecond)) _, err = rc.Do(ctx, req) require.NoError(t, err) require.Equal(t, 2, calls) @@ -820,7 +820,7 @@ func TestResultsCacheRecent(t *testing.T) { ) require.NoError(t, err) - req := parsedRequest.WithStartEnd(int64(model.Now())-(60*1e3), int64(model.Now())) + req := parsedRequest.WithStartEnd(time.Now().Add(-60*1e3*time.Millisecond), time.Now()) calls := 0 rc := rcm.Wrap(HandlerFunc(func(_ context.Context, r Request) (Response, error) { @@ -893,7 +893,7 @@ func TestResultsCacheMaxFreshness(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "1") // create request with start end within the key extents - req := parsedRequest.WithStartEnd(int64(modelNow)-(50*1e3), int64(modelNow)-(10*1e3)) + req := parsedRequest.WithStartEnd(time.UnixMilli(int64(modelNow)-(50*1e3)), time.UnixMilli(int64(modelNow)-(10*1e3))) // fill cache key := constSplitter(day).GenerateCacheKey(context.Background(), "1", req) @@ -972,14 +972,14 @@ func TestConstSplitter_generateCacheKey(t *testing.T) { interval time.Duration want string }{ - {"0", &PrometheusRequest{Start: 0, Step: 10, Query: "foo{}"}, 30 * time.Minute, "fake:foo{}:10:0"}, - {"<30m", &PrometheusRequest{Start: toMs(10 * time.Minute), Step: 10, Query: "foo{}"}, 30 * time.Minute, "fake:foo{}:10:0"}, - {"30m", &PrometheusRequest{Start: toMs(30 * time.Minute), Step: 10, Query: "foo{}"}, 30 * time.Minute, "fake:foo{}:10:1"}, - {"91m", &PrometheusRequest{Start: toMs(91 * time.Minute), Step: 10, Query: "foo{}"}, 30 * time.Minute, "fake:foo{}:10:3"}, - {"0", &PrometheusRequest{Start: 0, Step: 10, Query: "foo{}"}, 24 * time.Hour, "fake:foo{}:10:0"}, - {"<1d", &PrometheusRequest{Start: toMs(22 * time.Hour), Step: 10, Query: "foo{}"}, 24 * time.Hour, "fake:foo{}:10:0"}, - {"4d", &PrometheusRequest{Start: toMs(4 * 24 * time.Hour), Step: 10, Query: "foo{}"}, 24 * time.Hour, "fake:foo{}:10:4"}, - {"3d5h", &PrometheusRequest{Start: toMs(77 * time.Hour), Step: 10, Query: "foo{}"}, 24 * time.Hour, "fake:foo{}:10:3"}, + {"0", &PrometheusRequest{Start: time.UnixMilli(0), Step: 10, Query: "foo{}"}, 30 * time.Minute, "fake:foo{}:10:0"}, + {"<30m", &PrometheusRequest{Start: time.UnixMilli(toMs(10 * time.Minute)), Step: 10, Query: "foo{}"}, 30 * time.Minute, "fake:foo{}:10:0"}, + {"30m", &PrometheusRequest{Start: time.UnixMilli(toMs(30 * time.Minute)), Step: 10, Query: "foo{}"}, 30 * time.Minute, "fake:foo{}:10:1"}, + {"91m", &PrometheusRequest{Start: time.UnixMilli(toMs(91 * time.Minute)), Step: 10, Query: "foo{}"}, 30 * time.Minute, "fake:foo{}:10:3"}, + {"0", &PrometheusRequest{Start: time.UnixMilli(0), Step: 10, Query: "foo{}"}, 24 * time.Hour, "fake:foo{}:10:0"}, + {"<1d", &PrometheusRequest{Start: time.UnixMilli(toMs(22 * time.Hour)), Step: 10, Query: "foo{}"}, 24 * time.Hour, "fake:foo{}:10:0"}, + {"4d", &PrometheusRequest{Start: time.UnixMilli(toMs(4 * 24 * time.Hour)), Step: 10, Query: "foo{}"}, 24 * time.Hour, "fake:foo{}:10:4"}, + {"3d5h", &PrometheusRequest{Start: time.UnixMilli(toMs(77 * time.Hour)), Step: 10, Query: "foo{}"}, 24 * time.Hour, "fake:foo{}:10:3"}, } for _, tt := range tests { t.Run(fmt.Sprintf("%s - %s", tt.name, tt.interval), func(t *testing.T) { diff --git a/pkg/querier/queryrange/queryrangebase/step_align.go b/pkg/querier/queryrange/queryrangebase/step_align.go index edef2d6f08..7f61d41837 100644 --- a/pkg/querier/queryrange/queryrangebase/step_align.go +++ b/pkg/querier/queryrange/queryrangebase/step_align.go @@ -2,6 +2,7 @@ package queryrangebase import ( "context" + "time" ) // StepAlignMiddleware aligns the start and end of request to the step to @@ -17,7 +18,7 @@ type stepAlign struct { } func (s stepAlign) Do(ctx context.Context, r Request) (Response, error) { - start := (r.GetStart() / r.GetStep()) * r.GetStep() - end := (r.GetEnd() / r.GetStep()) * r.GetStep() - return s.next.Do(ctx, r.WithStartEnd(start, end)) + start := (r.GetStart().UnixMilli() / r.GetStep()) * r.GetStep() + end := (r.GetEnd().UnixMilli() / r.GetStep()) * r.GetStep() + return s.next.Do(ctx, r.WithStartEnd(time.UnixMilli(start), time.UnixMilli(end))) } diff --git a/pkg/querier/queryrange/queryrangebase/step_align_test.go b/pkg/querier/queryrange/queryrangebase/step_align_test.go index d68a2080d4..cf1468e7c2 100644 --- a/pkg/querier/queryrange/queryrangebase/step_align_test.go +++ b/pkg/querier/queryrange/queryrangebase/step_align_test.go @@ -4,6 +4,7 @@ import ( "context" "strconv" "testing" + "time" "github.com/stretchr/testify/require" ) @@ -14,26 +15,26 @@ func TestStepAlign(t *testing.T) { }{ { input: &PrometheusRequest{ - Start: 0, - End: 100, + Start: time.UnixMilli(0), + End: time.UnixMilli(100), Step: 10, }, expected: &PrometheusRequest{ - Start: 0, - End: 100, + Start: time.UnixMilli(0), + End: time.UnixMilli(100), Step: 10, }, }, { input: &PrometheusRequest{ - Start: 2, - End: 102, + Start: time.UnixMilli(2), + End: time.UnixMilli(102), Step: 10, }, expected: &PrometheusRequest{ - Start: 0, - End: 100, + Start: time.UnixMilli(0), + End: time.UnixMilli(100), Step: 10, }, }, diff --git a/pkg/querier/queryrange/querysharding.go b/pkg/querier/queryrange/querysharding.go index b2af68b55b..143174439c 100644 --- a/pkg/querier/queryrange/querysharding.go +++ b/pkg/querier/queryrange/querysharding.go @@ -149,7 +149,7 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que return ast.next.Do(ctx, r) } - conf, err := ast.confs.GetConf(int64(model.Time(r.GetStart()).Add(-maxRVDuration).Add(-maxOffset)), int64(model.Time(r.GetEnd()).Add(-maxOffset))) + conf, err := ast.confs.GetConf(int64(model.Time(r.GetStart().UnixMilli()).Add(-maxRVDuration).Add(-maxOffset)), int64(model.Time(r.GetEnd().UnixMilli()).Add(-maxOffset))) // cannot shard with this timerange if err != nil { level.Warn(logger).Log("err", err.Error(), "msg", "skipped AST mapper for request") @@ -173,7 +173,7 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que conf, ast.ng.Opts().MaxLookBackPeriod, ast.logger, - MinWeightedParallelism(ctx, tenants, ast.confs, ast.limits, model.Time(r.GetStart()), model.Time(r.GetEnd())), + MinWeightedParallelism(ctx, tenants, ast.confs, ast.limits, model.Time(r.GetStart().UnixMilli()), model.Time(r.GetEnd().UnixMilli())), ast.maxShards, r, ast.statsHandler, @@ -303,7 +303,7 @@ func (splitter *shardSplitter) Do(ctx context.Context, r queryrangebase.Request) cutoff := splitter.now().Add(-minShardingLookback) // Only attempt to shard queries which are older than the sharding lookback // (the period for which ingesters are also queried) or when the lookback is disabled. - if minShardingLookback == 0 || util.TimeFromMillis(r.GetEnd()).Before(cutoff) { + if minShardingLookback == 0 || util.TimeFromMillis(r.GetEnd().UnixMilli()).Before(cutoff) { return splitter.shardingware.Do(ctx, r) } return splitter.next.Do(ctx, r) @@ -400,7 +400,7 @@ type seriesShardingHandler struct { } func (ss *seriesShardingHandler) Do(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { - conf, err := ss.confs.GetConf(r.GetStart(), r.GetEnd()) + conf, err := ss.confs.GetConf(r.GetStart().UnixMilli(), r.GetEnd().UnixMilli()) // cannot shard with this timerange if err != nil { level.Warn(ss.logger).Log("err", err.Error(), "msg", "skipped sharding for request") @@ -433,7 +433,7 @@ func (ss *seriesShardingHandler) Do(ctx context.Context, r queryrangebase.Reques ctx, ss.next, requests, - MinWeightedParallelism(ctx, tenantIDs, ss.confs, ss.limits, model.Time(req.GetStart()), model.Time(req.GetEnd())), + MinWeightedParallelism(ctx, tenantIDs, ss.confs, ss.limits, model.Time(req.GetStart().UnixMilli()), model.Time(req.GetEnd().UnixMilli())), ) if err != nil { return nil, err diff --git a/pkg/querier/queryrange/querysharding_test.go b/pkg/querier/queryrange/querysharding_test.go index e3e83f967a..79d789b1ac 100644 --- a/pkg/querier/queryrange/querysharding_test.go +++ b/pkg/querier/queryrange/querysharding_test.go @@ -85,10 +85,7 @@ var ( ) func Test_shardSplitter(t *testing.T) { - req := defaultReq().WithStartEnd( - util.TimeToMillis(start), - util.TimeToMillis(end), - ) + req := defaultReq().WithStartEnd(start, end) for _, tc := range []struct { desc string @@ -812,7 +809,7 @@ func Test_ASTMapper_MaxLookBackPeriod(t *testing.T) { statsHandler := queryrangebase.HandlerFunc(func(_ context.Context, req queryrangebase.Request) (queryrangebase.Response, error) { // This is the actual check that we're testing. - require.Equal(t, testTime.Add(-engineOpts.MaxLookBackPeriod).UnixMilli(), req.GetStart()) + require.Equal(t, testTime.Add(-engineOpts.MaxLookBackPeriod).UnixMilli(), req.GetStart().UnixMilli()) return &IndexStatsResponse{ Response: &logproto.IndexStatsResponse{ diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 9c409d14a5..91c098dd93 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -605,8 +605,8 @@ func NewMetricTripperware( tenantIDs, schema.Configs, limits, - model.Time(r.GetStart()), - model.Time(r.GetEnd()), + model.Time(r.GetStart().UnixMilli()), + model.Time(r.GetEnd().UnixMilli()), ) }, retentionEnabled, @@ -775,8 +775,8 @@ func NewVolumeTripperware( tenantIDs, schema.Configs, limits, - model.Time(r.GetStart()), - model.Time(r.GetEnd()), + model.Time(r.GetStart().UnixMilli()), + model.Time(r.GetEnd().UnixMilli()), ) }, retentionEnabled, @@ -876,8 +876,8 @@ func NewIndexStatsTripperware( tenantIDs, schema.Configs, limits, - model.Time(r.GetStart()), - model.Time(r.GetEnd()), + model.Time(r.GetStart().UnixMilli()), + model.Time(r.GetEnd().UnixMilli()), ) }, retentionEnabled, diff --git a/pkg/querier/queryrange/shard_resolver.go b/pkg/querier/queryrange/shard_resolver.go index 5cf2c83ae6..aed0e96e2b 100644 --- a/pkg/querier/queryrange/shard_resolver.go +++ b/pkg/querier/queryrange/shard_resolver.go @@ -43,8 +43,8 @@ func shardResolverForConf( logger: logger, handler: handler, limits: limits, - from: model.Time(r.GetStart()), - through: model.Time(r.GetEnd()), + from: model.Time(r.GetStart().UnixMilli()), + through: model.Time(r.GetEnd().UnixMilli()), maxParallelism: maxParallelism, maxShards: maxShards, defaultLookback: defaultLookback, diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go index 69bca0a4e6..97045066f4 100644 --- a/pkg/querier/queryrange/split_by_interval.go +++ b/pkg/querier/queryrange/split_by_interval.go @@ -230,7 +230,7 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrangebase.Request) (que maxSeriesCapture := func(id string) int { return h.limits.MaxQuerySeries(ctx, id) } maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, maxSeriesCapture) - maxParallelism := MinWeightedParallelism(ctx, tenantIDs, h.configs, h.limits, model.Time(r.GetStart()), model.Time(r.GetEnd())) + maxParallelism := MinWeightedParallelism(ctx, tenantIDs, h.configs, h.limits, model.Time(r.GetStart().UnixMilli()), model.Time(r.GetEnd().UnixMilli())) resps, err := h.Process(ctx, maxParallelism, limit, input, maxSeries) if err != nil { return nil, err @@ -276,8 +276,8 @@ func splitByTime(req queryrangebase.Request, interval time.Duration) ([]queryran reqs = append(reqs, NewLabelRequest(start, end, r.Query, r.Name, r.Path())) }) case *logproto.IndexStatsRequest: - startTS := model.Time(r.GetStart()).Time() - endTS := model.Time(r.GetEnd()).Time() + startTS := r.GetStart() + endTS := r.GetEnd() util.ForInterval(interval, startTS, endTS, true, func(start, end time.Time) { reqs = append(reqs, &logproto.IndexStatsRequest{ From: model.TimeFromUnix(start.Unix()), @@ -286,8 +286,8 @@ func splitByTime(req queryrangebase.Request, interval time.Duration) ([]queryran }) }) case *logproto.VolumeRequest: - startTS := model.Time(r.GetStart()).Time() - endTS := model.Time(r.GetEnd()).Time() + startTS := r.GetStart() + endTS := r.GetEnd() util.ForInterval(interval, startTS, endTS, true, func(start, end time.Time) { reqs = append(reqs, &logproto.VolumeRequest{ From: model.TimeFromUnix(start.Unix()), @@ -363,7 +363,7 @@ func splitMetricByTime(r queryrangebase.Request, interval time.Duration) ([]quer } end := time.Unix(0, endNs) - lokiReq = lokiReq.WithStartEnd(util.TimeToMillis(start), util.TimeToMillis(end)).(*LokiRequest) + lokiReq = lokiReq.WithStartEnd(start, end).(*LokiRequest) // step is >= configured split interval, let us just split the query interval by step if lokiReq.Step >= interval.Milliseconds() { diff --git a/pkg/querier/queryrange/volume_cache.go b/pkg/querier/queryrange/volume_cache.go index 5fa84840f7..0c54745654 100644 --- a/pkg/querier/queryrange/volume_cache.go +++ b/pkg/querier/queryrange/volume_cache.go @@ -91,7 +91,7 @@ func shouldCacheVolume(ctx context.Context, req queryrangebase.Request, lim Limi maxCacheFreshness := validation.MaxDurationPerTenant(tenantIDs, cacheFreshnessCapture) now := volumeCacheMiddlewareNowTimeFunc() - return maxCacheFreshness == 0 || model.Time(req.GetEnd()).Before(now.Add(-maxCacheFreshness)), nil + return maxCacheFreshness == 0 || model.Time(req.GetEnd().UnixMilli()).Before(now.Add(-maxCacheFreshness)), nil } func NewVolumeCacheMiddleware( diff --git a/pkg/querier/queryrange/volume_cache_test.go b/pkg/querier/queryrange/volume_cache_test.go index f1e3a30d43..009302783b 100644 --- a/pkg/querier/queryrange/volume_cache_test.go +++ b/pkg/querier/queryrange/volume_cache_test.go @@ -116,7 +116,7 @@ func TestVolumeCache(t *testing.T) { // The new start time is 15m (i.e. 25%) in the future with regard to the previous request time span. *calls = 0 - req := volReq.WithStartEnd(volReq.GetStart()+(15*time.Minute).Milliseconds(), volReq.GetEnd()+(15*time.Minute).Milliseconds()) + req := volReq.WithStartEnd(volReq.GetStart().Add(15*time.Minute), volReq.GetEnd().Add(15*time.Minute)) vol := float64(0.75) expectedVol := &VolumeResponse{ Response: &logproto.VolumeResponse{