diff --git a/pkg/logproto/compat.go b/pkg/logproto/compat.go index e195af9687..8e3d24df80 100644 --- a/pkg/logproto/compat.go +++ b/pkg/logproto/compat.go @@ -232,13 +232,13 @@ func MergeSeriesResponses(responses []*SeriesResponse) (*SeriesResponse, error) // Satisfy definitions.Request // GetStart returns the start timestamp of the request in milliseconds. -func (m *IndexStatsRequest) GetStart() int64 { - return int64(m.From) +func (m *IndexStatsRequest) GetStart() time.Time { + return time.Unix(0, m.From.UnixNano()) } // GetEnd returns the end timestamp of the request in milliseconds. -func (m *IndexStatsRequest) GetEnd() int64 { - return int64(m.Through) +func (m *IndexStatsRequest) GetEnd() time.Time { + return time.Unix(0, m.Through.UnixNano()) } // GetStep returns the step of the request in milliseconds. @@ -253,10 +253,10 @@ func (m *IndexStatsRequest) GetQuery() string { func (m *IndexStatsRequest) GetCachingOptions() (res definitions.CachingOptions) { return } // WithStartEnd clone the current request with different start and end timestamp. -func (m *IndexStatsRequest) WithStartEnd(startTime int64, endTime int64) definitions.Request { +func (m *IndexStatsRequest) WithStartEnd(start, end time.Time) definitions.Request { clone := *m - clone.From = model.TimeFromUnixNano(startTime * int64(time.Millisecond)) - clone.Through = model.TimeFromUnixNano(endTime * int64(time.Millisecond)) + clone.From = model.TimeFromUnixNano(start.UnixNano()) + clone.Through = model.TimeFromUnixNano(end.UnixNano()) return &clone } @@ -271,21 +271,21 @@ func (m *IndexStatsRequest) WithQuery(query string) definitions.Request { func (m *IndexStatsRequest) LogToSpan(sp opentracing.Span) { sp.LogFields( otlog.String("query", m.GetQuery()), - otlog.String("start", timestamp.Time(m.GetStart()).String()), - otlog.String("end", timestamp.Time(m.GetEnd()).String()), + otlog.String("start", timestamp.Time(int64(m.From)).String()), + otlog.String("end", timestamp.Time(int64(m.Through)).String()), ) } // Satisfy definitions.Request for Volume // GetStart returns the start timestamp of the request in milliseconds. -func (m *VolumeRequest) GetStart() int64 { - return int64(m.From) +func (m *VolumeRequest) GetStart() time.Time { + return time.UnixMilli(int64(m.From)) } // GetEnd returns the end timestamp of the request in milliseconds. -func (m *VolumeRequest) GetEnd() int64 { - return int64(m.Through) +func (m *VolumeRequest) GetEnd() time.Time { + return time.UnixMilli(int64(m.Through)) } // GetQuery returns the query of the request. @@ -297,10 +297,10 @@ func (m *VolumeRequest) GetQuery() string { func (m *VolumeRequest) GetCachingOptions() (res definitions.CachingOptions) { return } // WithStartEnd clone the current request with different start and end timestamp. -func (m *VolumeRequest) WithStartEnd(startTime int64, endTime int64) definitions.Request { +func (m *VolumeRequest) WithStartEnd(start, end time.Time) definitions.Request { clone := *m - clone.From = model.TimeFromUnixNano(startTime * int64(time.Millisecond)) - clone.Through = model.TimeFromUnixNano(endTime * int64(time.Millisecond)) + clone.From = model.TimeFromUnixNano(start.UnixNano()) + clone.Through = model.TimeFromUnixNano(end.UnixNano()) return &clone } @@ -315,8 +315,8 @@ func (m *VolumeRequest) WithQuery(query string) definitions.Request { func (m *VolumeRequest) LogToSpan(sp opentracing.Span) { sp.LogFields( otlog.String("query", m.GetQuery()), - otlog.String("start", timestamp.Time(m.GetStart()).String()), - otlog.String("end", timestamp.Time(m.GetEnd()).String()), + otlog.String("start", timestamp.Time(int64(m.From)).String()), + otlog.String("end", timestamp.Time(int64(m.Through)).String()), ) } diff --git a/pkg/querier/queryrange/codec.go b/pkg/querier/queryrange/codec.go index 03559b2e2f..b24a46146b 100644 --- a/pkg/querier/queryrange/codec.go +++ b/pkg/querier/queryrange/codec.go @@ -46,18 +46,18 @@ type RequestProtobufCodec struct { Codec } -func (r *LokiRequest) GetEnd() int64 { - return r.EndTs.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)) +func (r *LokiRequest) GetEnd() time.Time { + return r.EndTs } -func (r *LokiRequest) GetStart() int64 { - return r.StartTs.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)) +func (r *LokiRequest) GetStart() time.Time { + return r.StartTs } -func (r *LokiRequest) WithStartEnd(s int64, e int64) queryrangebase.Request { +func (r *LokiRequest) WithStartEnd(s time.Time, e time.Time) queryrangebase.Request { clone := *r - clone.StartTs = time.Unix(0, s*int64(time.Millisecond)) - clone.EndTs = time.Unix(0, e*int64(time.Millisecond)) + clone.StartTs = s + clone.EndTs = e return &clone } @@ -83,8 +83,8 @@ func (r *LokiRequest) WithShards(shards logql.Shards) *LokiRequest { func (r *LokiRequest) LogToSpan(sp opentracing.Span) { sp.LogFields( otlog.String("query", r.GetQuery()), - otlog.String("start", timestamp.Time(r.GetStart()).String()), - otlog.String("end", timestamp.Time(r.GetEnd()).String()), + otlog.String("start", timestamp.Time(r.GetStart().UnixNano()).String()), + otlog.String("end", timestamp.Time(r.GetEnd().UnixNano()).String()), otlog.Int64("step (ms)", r.GetStep()), otlog.Int64("interval (ms)", r.GetInterval()), otlog.Int64("limit", int64(r.GetLimit())), @@ -99,17 +99,17 @@ func (r *LokiInstantRequest) GetStep() int64 { return 0 } -func (r *LokiInstantRequest) GetEnd() int64 { - return r.TimeTs.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)) +func (r *LokiInstantRequest) GetEnd() time.Time { + return r.TimeTs } -func (r *LokiInstantRequest) GetStart() int64 { - return r.TimeTs.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)) +func (r *LokiInstantRequest) GetStart() time.Time { + return r.TimeTs } -func (r *LokiInstantRequest) WithStartEnd(s int64, _ int64) queryrangebase.Request { +func (r *LokiInstantRequest) WithStartEnd(s time.Time, _ time.Time) queryrangebase.Request { clone := *r - clone.TimeTs = time.Unix(0, s*int64(time.Millisecond)) + clone.TimeTs = s return &clone } @@ -128,7 +128,7 @@ func (r *LokiInstantRequest) WithShards(shards logql.Shards) *LokiInstantRequest func (r *LokiInstantRequest) LogToSpan(sp opentracing.Span) { sp.LogFields( otlog.String("query", r.GetQuery()), - otlog.String("ts", timestamp.Time(r.GetStart()).String()), + otlog.String("ts", timestamp.Time(r.GetStart().UnixMilli()).String()), otlog.Int64("limit", int64(r.GetLimit())), otlog.String("direction", r.GetDirection().String()), otlog.String("shards", strings.Join(r.GetShards(), ",")), @@ -137,18 +137,18 @@ func (r *LokiInstantRequest) LogToSpan(sp opentracing.Span) { func (*LokiInstantRequest) GetCachingOptions() (res queryrangebase.CachingOptions) { return } -func (r *LokiSeriesRequest) GetEnd() int64 { - return r.EndTs.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)) +func (r *LokiSeriesRequest) GetEnd() time.Time { + return r.EndTs } -func (r *LokiSeriesRequest) GetStart() int64 { - return r.StartTs.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)) +func (r *LokiSeriesRequest) GetStart() time.Time { + return r.StartTs } -func (r *LokiSeriesRequest) WithStartEnd(s int64, e int64) queryrangebase.Request { +func (r *LokiSeriesRequest) WithStartEnd(s, e time.Time) queryrangebase.Request { clone := *r - clone.StartTs = time.Unix(0, s*int64(time.Millisecond)) - clone.EndTs = time.Unix(0, e*int64(time.Millisecond)) + clone.StartTs = s + clone.EndTs = e return &clone } @@ -168,8 +168,8 @@ func (r *LokiSeriesRequest) GetStep() int64 { func (r *LokiSeriesRequest) LogToSpan(sp opentracing.Span) { sp.LogFields( otlog.String("matchers", strings.Join(r.GetMatch(), ",")), - otlog.String("start", timestamp.Time(r.GetStart()).String()), - otlog.String("end", timestamp.Time(r.GetEnd()).String()), + otlog.String("start", timestamp.Time(r.GetStart().UnixNano()).String()), + otlog.String("end", timestamp.Time(r.GetEnd().UnixNano()).String()), otlog.String("shards", strings.Join(r.GetShards(), ",")), ) } @@ -199,16 +199,16 @@ func (r *LabelRequest) AsProto() *logproto.LabelRequest { return &r.LabelRequest } -func (r *LabelRequest) GetEnd() int64 { - return r.End.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)) +func (r *LabelRequest) GetEnd() time.Time { + return *r.End } func (r *LabelRequest) GetEndTs() time.Time { return *r.End } -func (r *LabelRequest) GetStart() int64 { - return r.Start.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)) +func (r *LabelRequest) GetStart() time.Time { + return *r.Start } func (r *LabelRequest) GetStartTs() time.Time { @@ -219,11 +219,11 @@ func (r *LabelRequest) GetStep() int64 { return 0 } -func (r *LabelRequest) WithStartEnd(s int64, e int64) queryrangebase.Request { +func (r *LabelRequest) WithStartEnd(s, e time.Time) queryrangebase.Request { clone := *r - tmp := time.Unix(0, s*int64(time.Millisecond)) + tmp := s clone.Start = &tmp - tmp = time.Unix(0, e*int64(time.Millisecond)) + tmp = e clone.End = &tmp return &clone } @@ -236,8 +236,8 @@ func (r *LabelRequest) WithQuery(query string) queryrangebase.Request { func (r *LabelRequest) LogToSpan(sp opentracing.Span) { sp.LogFields( - otlog.String("start", timestamp.Time(r.GetStart()).String()), - otlog.String("end", timestamp.Time(r.GetEnd()).String()), + otlog.String("start", timestamp.Time(r.GetStart().UnixNano()).String()), + otlog.String("end", timestamp.Time(r.GetEnd().UnixNano()).String()), ) } @@ -504,7 +504,7 @@ func (Codec) DecodeHTTPGrpcResponse(r *httpgrpc.HTTPResponse, req queryrangebase return decodeResponseJSONFrom(r.Body, req, headers) } -func (Codec) EncodeHTTPGrpcResponse(ctx context.Context, req *httpgrpc.HTTPRequest, res queryrangebase.Response) (*httpgrpc.HTTPResponse, error) { +func (Codec) EncodeHTTPGrpcResponse(_ context.Context, req *httpgrpc.HTTPRequest, res queryrangebase.Response) (*httpgrpc.HTTPResponse, error) { version := loghttp.GetVersion(req.Url) var buf bytes.Buffer @@ -1341,8 +1341,8 @@ func ParamsFromRequest(req queryrangebase.Request) (logql.Params, error) { Query: r.GetQuery(), Limit: uint32(r.GetLimit()), Step: r.GetStep(), - StartTs: time.UnixMilli(r.GetStart()), - EndTs: time.UnixMilli(r.GetEnd()), + StartTs: time.UnixMilli(r.GetStart().UnixNano()), + EndTs: time.UnixMilli(r.GetEnd().UnixNano()), }, }, nil case *LokiInstantRequest: diff --git a/pkg/querier/queryrange/index_stats_cache.go b/pkg/querier/queryrange/index_stats_cache.go index 27d40db2da..4814394fd4 100644 --- a/pkg/querier/queryrange/index_stats_cache.go +++ b/pkg/querier/queryrange/index_stats_cache.go @@ -83,7 +83,7 @@ func shouldCacheStats(ctx context.Context, req queryrangebase.Request, lim Limit maxCacheFreshness := validation.MaxDurationPerTenant(tenantIDs, cacheFreshnessCapture) now := statsCacheMiddlewareNowTimeFunc() - return maxCacheFreshness == 0 || model.Time(req.GetEnd()).Before(now.Add(-maxCacheFreshness)), nil + return maxCacheFreshness == 0 || model.Time(req.GetEnd().UnixMilli()).Before(now.Add(-maxCacheFreshness)), nil } func NewIndexStatsCacheMiddleware( diff --git a/pkg/querier/queryrange/index_stats_cache_test.go b/pkg/querier/queryrange/index_stats_cache_test.go index 3024ba60da..8c154c36a2 100644 --- a/pkg/querier/queryrange/index_stats_cache_test.go +++ b/pkg/querier/queryrange/index_stats_cache_test.go @@ -78,7 +78,7 @@ func TestIndexStatsCache(t *testing.T) { // should reuse part of the previous request and issue a new request for the remaining time till end. // The new start time is 15m (i.e. 25%) in the future with regard to the previous request time span. *calls = 0 - req := statsReq.WithStartEnd(statsReq.GetStart()+(15*time.Minute).Milliseconds(), statsReq.GetEnd()+(15*time.Minute).Milliseconds()) + req := statsReq.WithStartEnd(statsReq.GetStart().Add(15*time.Minute), statsReq.GetEnd().Add(15*time.Minute)) expectedStats := &IndexStatsResponse{ Response: &logproto.IndexStatsResponse{ Streams: 2, diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go index ddf38d30cd..82bef4bf95 100644 --- a/pkg/querier/queryrange/limits.go +++ b/pkg/querier/queryrange/limits.go @@ -29,7 +29,6 @@ import ( "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/index/stats" - "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/spanlogger" "github.com/grafana/loki/pkg/util/validation" @@ -109,7 +108,7 @@ func (l cacheKeyLimits) GenerateCacheKey(ctx context.Context, userID string, r q var currentInterval int64 if denominator := int64(split / time.Millisecond); denominator > 0 { - currentInterval = r.GetStart() / denominator + currentInterval = r.GetStart().UnixMilli() / denominator } if l.transformer != nil { @@ -150,26 +149,26 @@ func (l limitsMiddleware) Do(ctx context.Context, r queryrangebase.Request) (que // Clamp the time range based on the max query lookback. lookbackCapture := func(id string) time.Duration { return l.MaxQueryLookback(ctx, id) } if maxQueryLookback := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, lookbackCapture); maxQueryLookback > 0 { - minStartTime := util.TimeToMillis(time.Now().Add(-maxQueryLookback)) + minStartTime := time.Now().Add(-maxQueryLookback) - if r.GetEnd() < minStartTime { + if r.GetEnd().Before(minStartTime) { // The request is fully outside the allowed range, so we can return an // empty response. level.Debug(log).Log( "msg", "skipping the execution of the query because its time range is before the 'max query lookback' setting", - "reqStart", util.FormatTimeMillis(r.GetStart()), - "redEnd", util.FormatTimeMillis(r.GetEnd()), + "reqStart", r.GetStart().String(), + "redEnd", r.GetEnd().String(), "maxQueryLookback", maxQueryLookback) return NewEmptyResponse(r) } - if r.GetStart() < minStartTime { + if r.GetStart().Before(minStartTime) { // Replace the start time in the request. level.Debug(log).Log( "msg", "the start time of the query has been manipulated because of the 'max query lookback' setting", - "original", util.FormatTimeMillis(r.GetStart()), - "updated", util.FormatTimeMillis(minStartTime)) + "original", r.GetStart().String(), + "updated", minStartTime.String()) r = r.WithStartEnd(minStartTime, r.GetEnd()) } @@ -178,7 +177,7 @@ func (l limitsMiddleware) Do(ctx context.Context, r queryrangebase.Request) (que // Enforce the max query length. lengthCapture := func(id string) time.Duration { return l.MaxQueryLength(ctx, id) } if maxQueryLength := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, lengthCapture); maxQueryLength > 0 { - queryLen := timestamp.Time(r.GetEnd()).Sub(timestamp.Time(r.GetStart())) + queryLen := timestamp.Time(r.GetEnd().UnixMilli()).Sub(timestamp.Time(r.GetStart().UnixMilli())) if queryLen > maxQueryLength { return nil, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, queryLen, model.Duration(maxQueryLength)) } @@ -281,7 +280,7 @@ func (q *querySizeLimiter) getBytesReadForRequest(ctx context.Context, r queryra // TODO: Set concurrency dynamically as in shardResolverForConf? start := time.Now() const maxConcurrentIndexReq = 10 - matcherStats, err := getStatsForMatchers(ctx, q.logger, q.statsHandler, model.Time(r.GetStart()), model.Time(r.GetEnd()), matcherGroups, maxConcurrentIndexReq, q.maxLookBackPeriod) + matcherStats, err := getStatsForMatchers(ctx, q.logger, q.statsHandler, model.Time(r.GetStart().UnixMilli()), model.Time(r.GetEnd().UnixMilli()), matcherGroups, maxConcurrentIndexReq, q.maxLookBackPeriod) if err != nil { return 0, err } @@ -309,8 +308,8 @@ func (q *querySizeLimiter) getSchemaCfg(r queryrangebase.Request) (config.Period return config.PeriodConfig{}, errors.New("failed to get range-vector and offset duration: " + err.Error()) } - adjustedStart := int64(model.Time(r.GetStart()).Add(-maxRVDuration).Add(-maxOffset)) - adjustedEnd := int64(model.Time(r.GetEnd()).Add(-maxOffset)) + adjustedStart := int64(model.Time(r.GetStart().UnixMilli()).Add(-maxRVDuration).Add(-maxOffset)) + adjustedEnd := int64(model.Time(r.GetEnd().UnixMilli()).Add(-maxOffset)) return ShardingConfigs(q.cfg).ValidRange(adjustedStart, adjustedEnd) } @@ -474,8 +473,8 @@ func (rt limitedRoundTripper) Do(c context.Context, request queryrangebase.Reque tenantIDs, rt.configs, rt.limits, - model.Time(request.GetStart()), - model.Time(request.GetEnd()), + model.Time(request.GetStart().UnixMilli()), + model.Time(request.GetEnd().UnixMilli()), ) if parallelism < 1 { diff --git a/pkg/querier/queryrange/limits_test.go b/pkg/querier/queryrange/limits_test.go index 02c3862dd4..4ab81ec4ac 100644 --- a/pkg/querier/queryrange/limits_test.go +++ b/pkg/querier/queryrange/limits_test.go @@ -43,7 +43,7 @@ func TestLimits(t *testing.T) { require.Equal( t, - fmt.Sprintf("%s:%s:%d:%d:%d", "a", r.GetQuery(), r.GetStep(), r.GetStart()/int64(time.Hour/time.Millisecond), int64(time.Hour)), + fmt.Sprintf("%s:%s:%d:%d:%d", "a", r.GetQuery(), r.GetStep(), r.GetStart().UnixMilli()/int64(time.Hour/time.Millisecond), int64(time.Hour)), cacheKeyLimits{wrapped, nil}.GenerateCacheKey(context.Background(), "a", r), ) } @@ -580,7 +580,7 @@ func Test_MaxQuerySize_MaxLookBackPeriod(t *testing.T) { statsHandler := base.HandlerFunc(func(_ context.Context, req base.Request) (base.Response, error) { // This is the actual check that we're testing. - require.Equal(t, testTime.Add(-engineOpts.MaxLookBackPeriod).UnixMilli(), req.GetStart()) + require.Equal(t, testTime.Add(-engineOpts.MaxLookBackPeriod).UnixMilli(), req.GetStart().UnixMilli()) return &IndexStatsResponse{ Response: &logproto.IndexStatsResponse{ diff --git a/pkg/querier/queryrange/log_result_cache.go b/pkg/querier/queryrange/log_result_cache.go index 3f0555709b..a83041a94c 100644 --- a/pkg/querier/queryrange/log_result_cache.go +++ b/pkg/querier/queryrange/log_result_cache.go @@ -95,7 +95,7 @@ func (l *logResultCache) Do(ctx context.Context, req queryrangebase.Request) (qu cacheFreshnessCapture := func(id string) time.Duration { return l.limits.MaxCacheFreshness(ctx, id) } maxCacheFreshness := validation.MaxDurationPerTenant(tenantIDs, cacheFreshnessCapture) maxCacheTime := int64(model.Now().Add(-maxCacheFreshness)) - if req.GetEnd() > maxCacheTime { + if req.GetEnd().UnixMilli() > maxCacheTime { return l.next.Do(ctx, req) } diff --git a/pkg/querier/queryrange/queryrangebase/definitions/interface.go b/pkg/querier/queryrange/queryrangebase/definitions/interface.go index dbb350a9c7..fb385817b5 100644 --- a/pkg/querier/queryrange/queryrangebase/definitions/interface.go +++ b/pkg/querier/queryrange/queryrangebase/definitions/interface.go @@ -3,6 +3,7 @@ package definitions import ( "context" "net/http" + "time" "github.com/gogo/protobuf/proto" "github.com/opentracing/opentracing-go" @@ -32,9 +33,9 @@ type Merger interface { // Request represents a query range request that can be process by middlewares. type Request interface { // GetStart returns the start timestamp of the request in milliseconds. - GetStart() int64 + GetStart() time.Time // GetEnd returns the end timestamp of the request in milliseconds. - GetEnd() int64 + GetEnd() time.Time // GetStep returns the step of the request in milliseconds. GetStep() int64 // GetQuery returns the query of the request. @@ -42,7 +43,7 @@ type Request interface { // GetCachingOptions returns the caching options. GetCachingOptions() CachingOptions // WithStartEnd clone the current request with different start and end timestamp. - WithStartEnd(startTime int64, endTime int64) Request + WithStartEnd(start time.Time, end time.Time) Request // WithQuery clone the current request with a different query. WithQuery(string) Request proto.Message diff --git a/pkg/querier/queryrange/queryrangebase/query_range.go b/pkg/querier/queryrange/queryrangebase/query_range.go index 70fe36dfc0..4c7426b171 100644 --- a/pkg/querier/queryrange/queryrangebase/query_range.go +++ b/pkg/querier/queryrange/queryrangebase/query_range.go @@ -7,10 +7,8 @@ import ( "io" "math" "net/http" - "net/url" "sort" "strconv" - "strings" "time" "github.com/gogo/status" @@ -22,7 +20,6 @@ import ( "github.com/prometheus/prometheus/model/timestamp" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/spanlogger" ) @@ -41,7 +38,7 @@ var ( errStepTooSmall = httpgrpc.Errorf(http.StatusBadRequest, "exceeded maximum resolution of 11,000 points per time series. Try increasing the value of the step parameter") // PrometheusCodec is a codec to encode and decode Prometheus query range requests and responses. - PrometheusCodec Codec = &prometheusCodec{} + PrometheusCodec = &prometheusCodec{} // Name of the cache control header. cacheControlHeader = "Cache-Control" @@ -50,7 +47,7 @@ var ( type prometheusCodec struct{} // WithStartEnd clones the current `PrometheusRequest` with a new `start` and `end` timestamp. -func (q *PrometheusRequest) WithStartEnd(start int64, end int64) Request { +func (q *PrometheusRequest) WithStartEnd(start, end time.Time) Request { clone := *q clone.Start = start clone.End = end @@ -68,8 +65,8 @@ func (q *PrometheusRequest) WithQuery(query string) Request { func (q *PrometheusRequest) LogToSpan(sp opentracing.Span) { sp.LogFields( otlog.String("query", q.GetQuery()), - otlog.String("start", timestamp.Time(q.GetStart()).String()), - otlog.String("end", timestamp.Time(q.GetEnd()).String()), + otlog.String("start", timestamp.Time(q.GetStart().UnixMilli()).String()), + otlog.String("end", timestamp.Time(q.GetEnd().UnixMilli()).String()), otlog.Int64("step (ms)", q.GetStep()), ) } @@ -137,95 +134,6 @@ func (prometheusCodec) MergeResponse(responses ...Response) (Response, error) { return &response, nil } -func (prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []string) (Request, error) { - var result PrometheusRequest - var err error - result.Start, err = util.ParseTime(r.FormValue("start")) - if err != nil { - return nil, decorateWithParamName(err, "start") - } - - result.End, err = util.ParseTime(r.FormValue("end")) - if err != nil { - return nil, decorateWithParamName(err, "end") - } - - if result.End < result.Start { - return nil, errEndBeforeStart - } - - result.Step, err = parseDurationMs(r.FormValue("step")) - if err != nil { - return nil, decorateWithParamName(err, "step") - } - - if result.Step <= 0 { - return nil, errNegativeStep - } - - // For safety, limit the number of returned points per timeseries. - // This is sufficient for 60s resolution for a week or 1h resolution for a year. - if (result.End-result.Start)/result.Step > 11000 { - return nil, errStepTooSmall - } - - result.Query = r.FormValue("query") - result.Path = r.URL.Path - - // Include the specified headers from http request in prometheusRequest. - for _, header := range forwardHeaders { - for h, hv := range r.Header { - if strings.EqualFold(h, header) { - result.Headers = append(result.Headers, &PrometheusRequestHeader{Name: h, Values: hv}) - break - } - } - } - - for _, value := range r.Header.Values(cacheControlHeader) { - if strings.Contains(value, noStoreValue) { - result.CachingOptions.Disabled = true - break - } - } - - return &result, nil -} - -func (prometheusCodec) EncodeRequest(ctx context.Context, r Request) (*http.Request, error) { - promReq, ok := r.(*PrometheusRequest) - if !ok { - return nil, httpgrpc.Errorf(http.StatusBadRequest, "invalid request format") - } - params := url.Values{ - "start": []string{encodeTime(promReq.Start)}, - "end": []string{encodeTime(promReq.End)}, - "step": []string{encodeDurationMs(promReq.Step)}, - "query": []string{promReq.Query}, - } - u := &url.URL{ - Path: promReq.Path, - RawQuery: params.Encode(), - } - var h = http.Header{} - - for _, hv := range promReq.Headers { - for _, v := range hv.Values { - h.Add(hv.Name, v) - } - } - - req := &http.Request{ - Method: "GET", - RequestURI: u.String(), // This is what the httpgrpc code looks at. - URL: u, - Body: http.NoBody, - Header: h, - } - - return req.WithContext(ctx), nil -} - func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ Request) (Response, error) { if r.StatusCode/100 != 2 { body, _ := io.ReadAll(r.Body) diff --git a/pkg/querier/queryrange/queryrangebase/query_range_test.go b/pkg/querier/queryrange/queryrangebase/query_range_test.go index ed46ea66ad..21c115eec5 100644 --- a/pkg/querier/queryrange/queryrangebase/query_range_test.go +++ b/pkg/querier/queryrange/queryrangebase/query_range_test.go @@ -8,8 +8,6 @@ import ( "strconv" "testing" - "github.com/grafana/dskit/httpgrpc" - "github.com/grafana/dskit/user" jsoniter "github.com/json-iterator/go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -17,69 +15,6 @@ import ( "github.com/grafana/loki/pkg/logproto" ) -func TestRequest(t *testing.T) { - // Create a Copy parsedRequest to assign the expected headers to the request without affecting other tests using the global. - // The test below adds a Test-Header header to the request and expects it back once the encode/decode of request is done via PrometheusCodec - parsedRequestWithHeaders := *parsedRequest - parsedRequestWithHeaders.Headers = reqHeaders - for i, tc := range []struct { - url string - expected Request - expectedErr error - }{ - { - url: query, - expected: &parsedRequestWithHeaders, - }, - { - url: "api/v1/query_range?start=foo", - expectedErr: httpgrpc.Errorf(http.StatusBadRequest, "invalid parameter \"start\"; cannot parse \"foo\" to a valid timestamp"), - }, - { - url: "api/v1/query_range?start=123&end=bar", - expectedErr: httpgrpc.Errorf(http.StatusBadRequest, "invalid parameter \"end\"; cannot parse \"bar\" to a valid timestamp"), - }, - { - url: "api/v1/query_range?start=123&end=0", - expectedErr: errEndBeforeStart, - }, - { - url: "api/v1/query_range?start=123&end=456&step=baz", - expectedErr: httpgrpc.Errorf(http.StatusBadRequest, "invalid parameter \"step\"; cannot parse \"baz\" to a valid duration"), - }, - { - url: "api/v1/query_range?start=123&end=456&step=-1", - expectedErr: errNegativeStep, - }, - { - url: "api/v1/query_range?start=0&end=11001&step=1", - expectedErr: errStepTooSmall, - }, - } { - t.Run(strconv.Itoa(i), func(t *testing.T) { - r, err := http.NewRequest("GET", tc.url, nil) - require.NoError(t, err) - r.Header.Add("Test-Header", "test") - - ctx := user.InjectOrgID(context.Background(), "1") - - // Get a deep copy of the request with Context changed to ctx - r = r.Clone(ctx) - - req, err := PrometheusCodec.DecodeRequest(ctx, r, []string{"Test-Header"}) - if err != nil { - require.EqualValues(t, tc.expectedErr, err) - return - } - require.EqualValues(t, tc.expected, req) - - rdash, err := PrometheusCodec.EncodeRequest(context.Background(), req) - require.NoError(t, err) - require.EqualValues(t, tc.url, rdash.RequestURI) - }) - } -} - func TestResponse(t *testing.T) { r := *parsedResponse r.Headers = respHeaders diff --git a/pkg/querier/queryrange/queryrangebase/queryrange.pb.go b/pkg/querier/queryrange/queryrangebase/queryrange.pb.go index da0a14e0fd..121b3ffb15 100644 --- a/pkg/querier/queryrange/queryrangebase/queryrange.pb.go +++ b/pkg/querier/queryrange/queryrangebase/queryrange.pb.go @@ -35,8 +35,8 @@ const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package type PrometheusRequest struct { Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` - Start int64 `protobuf:"varint,2,opt,name=start,proto3" json:"start,omitempty"` - End int64 `protobuf:"varint,3,opt,name=end,proto3" json:"end,omitempty"` + Start time.Time `protobuf:"bytes,2,opt,name=start,proto3,stdtime" json:"start"` + End time.Time `protobuf:"bytes,3,opt,name=end,proto3,stdtime" json:"end"` Step int64 `protobuf:"varint,4,opt,name=step,proto3" json:"step,omitempty"` Timeout time.Duration `protobuf:"bytes,5,opt,name=timeout,proto3,stdduration" json:"timeout"` Query string `protobuf:"bytes,6,opt,name=query,proto3" json:"query,omitempty"` @@ -83,18 +83,18 @@ func (m *PrometheusRequest) GetPath() string { return "" } -func (m *PrometheusRequest) GetStart() int64 { +func (m *PrometheusRequest) GetStart() time.Time { if m != nil { return m.Start } - return 0 + return time.Time{} } -func (m *PrometheusRequest) GetEnd() int64 { +func (m *PrometheusRequest) GetEnd() time.Time { if m != nil { return m.End } - return 0 + return time.Time{} } func (m *PrometheusRequest) GetStep() int64 { @@ -435,59 +435,60 @@ func init() { } var fileDescriptor_4cc6a0c1d6b614c4 = []byte{ - // 821 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x54, 0x4f, 0x6f, 0xdc, 0x44, - 0x14, 0xdf, 0xc9, 0xfe, 0x9f, 0x54, 0xdb, 0x32, 0x8d, 0x8a, 0xd3, 0x22, 0x7b, 0xb5, 0x02, 0x29, - 0x48, 0xe0, 0x15, 0x41, 0x70, 0x2b, 0x22, 0x4e, 0x82, 0x68, 0x55, 0x89, 0x6a, 0xc2, 0x89, 0x0b, - 0x9a, 0x5d, 0xbf, 0x38, 0x56, 0xfc, 0xaf, 0x33, 0xe3, 0x8a, 0xbd, 0x71, 0xe2, 0xcc, 0x91, 0x8f, - 0xc0, 0x01, 0xf1, 0x39, 0x22, 0x4e, 0x39, 0x56, 0x1c, 0x0c, 0xd9, 0x5c, 0x90, 0x4f, 0xfd, 0x08, - 0x68, 0x66, 0xec, 0x8d, 0x77, 0xab, 0x40, 0x2f, 0xbb, 0xef, 0xcf, 0xef, 0xbd, 0x79, 0xbf, 0xdf, - 0x8c, 0x1f, 0xfe, 0x3c, 0x3b, 0x0f, 0xa6, 0x2f, 0x72, 0xe0, 0x21, 0x70, 0xfd, 0xbf, 0xe0, 0x2c, - 0x09, 0xa0, 0x61, 0xce, 0x98, 0x68, 0xba, 0x6e, 0xc6, 0x53, 0x99, 0x92, 0xd1, 0x3a, 0xe0, 0xe1, - 0x4e, 0x90, 0x06, 0xa9, 0x4e, 0x4d, 0x95, 0x65, 0x50, 0x0f, 0x77, 0x83, 0x34, 0x0d, 0x22, 0x98, - 0x6a, 0x6f, 0x96, 0x9f, 0x4e, 0x59, 0xb2, 0xa8, 0x52, 0xf6, 0x66, 0xca, 0xcf, 0x39, 0x93, 0x61, - 0x9a, 0x54, 0xf9, 0x47, 0x6a, 0xb0, 0x28, 0x0d, 0x4c, 0xcf, 0xda, 0xa8, 0x92, 0x87, 0x6f, 0x37, - 0xb5, 0x0f, 0xa7, 0x61, 0x12, 0xaa, 0xa6, 0xa2, 0x69, 0x9b, 0x26, 0x93, 0x3f, 0xb6, 0xf0, 0x3b, - 0xcf, 0x79, 0x1a, 0x83, 0x3c, 0x83, 0x5c, 0x50, 0x78, 0x91, 0x83, 0x90, 0x84, 0xe0, 0x4e, 0xc6, - 0xe4, 0x99, 0x85, 0xc6, 0x68, 0x6f, 0x48, 0xb5, 0x4d, 0x76, 0x70, 0x57, 0x48, 0xc6, 0xa5, 0xb5, - 0x35, 0x46, 0x7b, 0x6d, 0x6a, 0x1c, 0x72, 0x0f, 0xb7, 0x21, 0xf1, 0xad, 0xb6, 0x8e, 0x29, 0x53, - 0xd5, 0x0a, 0x09, 0x99, 0xd5, 0xd1, 0x21, 0x6d, 0x93, 0xc7, 0xb8, 0x2f, 0xc3, 0x18, 0xd2, 0x5c, - 0x5a, 0xdd, 0x31, 0xda, 0xdb, 0xde, 0xdf, 0x75, 0x0d, 0x73, 0xb7, 0x66, 0xee, 0x1e, 0x55, 0xcc, - 0xbd, 0xc1, 0x45, 0xe1, 0xb4, 0x7e, 0xf9, 0xcb, 0x41, 0xb4, 0xae, 0x51, 0x47, 0x6b, 0x52, 0x56, - 0x4f, 0xcf, 0x63, 0x1c, 0xf2, 0x04, 0x8f, 0xe6, 0x6c, 0x7e, 0x16, 0x26, 0xc1, 0x37, 0x99, 0xa6, - 0x64, 0xf5, 0x75, 0xef, 0x47, 0x6e, 0x93, 0xe6, 0xe1, 0x1a, 0xc4, 0xeb, 0xa8, 0xee, 0x74, 0xa3, - 0x90, 0x1c, 0xe3, 0xfe, 0xd7, 0xc0, 0x7c, 0xe0, 0xc2, 0x1a, 0x8c, 0xdb, 0x7b, 0xdb, 0xfb, 0xef, - 0xaf, 0xf5, 0x78, 0x43, 0x20, 0x03, 0xf6, 0xba, 0x65, 0xe1, 0xa0, 0x8f, 0x69, 0x5d, 0x3b, 0xf9, - 0x7d, 0x0b, 0x93, 0x26, 0x56, 0x64, 0x69, 0x22, 0x80, 0x4c, 0x70, 0xef, 0x44, 0x32, 0x99, 0x0b, - 0xa3, 0xa7, 0x87, 0xcb, 0xc2, 0xe9, 0x09, 0x1d, 0xa1, 0x55, 0x86, 0x3c, 0xc5, 0x9d, 0x23, 0x26, - 0x99, 0x16, 0x77, 0x7b, 0xdf, 0x76, 0xd7, 0x2f, 0xb1, 0x31, 0x81, 0x42, 0x79, 0x0f, 0x14, 0x8b, - 0xb2, 0x70, 0x46, 0x3e, 0x93, 0xec, 0xa3, 0x34, 0x0e, 0x25, 0xc4, 0x99, 0x5c, 0x50, 0xdd, 0x83, - 0x7c, 0x86, 0x87, 0xc7, 0x9c, 0xa7, 0xfc, 0xdb, 0x45, 0x06, 0xfa, 0x66, 0x86, 0xde, 0xbb, 0x65, - 0xe1, 0xdc, 0x87, 0x3a, 0xd8, 0xa8, 0xb8, 0x41, 0x92, 0x0f, 0x71, 0x57, 0x3b, 0xfa, 0xe6, 0x86, - 0xde, 0xfd, 0xb2, 0x70, 0xee, 0xea, 0x92, 0x06, 0xdc, 0x20, 0xc8, 0x57, 0x37, 0x7a, 0x75, 0xb5, - 0x5e, 0x1f, 0xdc, 0xaa, 0x97, 0xd1, 0xe0, 0x16, 0xc1, 0x7e, 0x42, 0x78, 0xb4, 0x4e, 0x8d, 0xb8, - 0x18, 0x53, 0x10, 0x79, 0x24, 0xf5, 0xf4, 0x46, 0xb0, 0x51, 0x59, 0x38, 0x98, 0xaf, 0xa2, 0xb4, - 0x81, 0x20, 0x47, 0xb8, 0x67, 0x3c, 0x6b, 0x4b, 0x4f, 0xf2, 0xde, 0xa6, 0x74, 0x27, 0x2c, 0xce, - 0x22, 0x38, 0x91, 0x1c, 0x58, 0xec, 0x8d, 0x2a, 0xe1, 0x7a, 0xa6, 0x1b, 0xad, 0x6a, 0x27, 0x17, - 0x08, 0xdf, 0x69, 0x02, 0xc9, 0x4b, 0xdc, 0x8b, 0xd8, 0x0c, 0x22, 0x75, 0x67, 0x6d, 0xfd, 0x60, - 0x57, 0x5f, 0xdf, 0x33, 0x08, 0xd8, 0x7c, 0xf1, 0x4c, 0x65, 0x9f, 0xb3, 0x90, 0x7b, 0x87, 0xaa, - 0xe7, 0x9f, 0x85, 0xf3, 0x49, 0x10, 0xca, 0xb3, 0x7c, 0xe6, 0xce, 0xd3, 0x78, 0x1a, 0x70, 0x76, - 0xca, 0x12, 0x36, 0x8d, 0xd2, 0xf3, 0x70, 0xda, 0xfc, 0x88, 0x5d, 0x5d, 0x77, 0xe0, 0xb3, 0x4c, - 0x02, 0x57, 0x83, 0xc4, 0x20, 0x79, 0x38, 0xa7, 0xd5, 0x69, 0xe4, 0x4b, 0xdc, 0x17, 0x7a, 0x0e, - 0x51, 0xf1, 0x79, 0xb0, 0x79, 0xb0, 0x19, 0xf3, 0x86, 0xc9, 0x4b, 0x16, 0xe5, 0x20, 0x68, 0x5d, - 0x36, 0x49, 0xf0, 0x48, 0xbd, 0x79, 0xf0, 0x57, 0xef, 0x6f, 0x17, 0xb7, 0xcf, 0x61, 0x51, 0x69, - 0xd9, 0x2f, 0x0b, 0x47, 0xb9, 0x54, 0xfd, 0x90, 0x03, 0xdc, 0x87, 0x1f, 0x24, 0x24, 0xf2, 0xe6, - 0xb8, 0x0d, 0xf9, 0x8e, 0x75, 0xda, 0xbb, 0x5b, 0x1d, 0x57, 0xc3, 0x69, 0x6d, 0x4c, 0x7e, 0x43, - 0xb8, 0x67, 0x40, 0xc4, 0xa9, 0x57, 0x84, 0x3a, 0xaa, 0xed, 0x0d, 0xcb, 0xc2, 0x31, 0x81, 0x7a, - 0x5b, 0xec, 0x9a, 0x6d, 0xa1, 0x37, 0x88, 0x99, 0x04, 0x12, 0xdf, 0xac, 0x8d, 0x31, 0x1e, 0x48, - 0xce, 0xe6, 0xf0, 0x7d, 0xe8, 0x57, 0x0f, 0xb0, 0x7e, 0x2c, 0x3a, 0xfc, 0xc4, 0x27, 0x5f, 0xe0, - 0x01, 0xaf, 0x28, 0x55, 0x5b, 0x64, 0xe7, 0x8d, 0x2d, 0x72, 0x90, 0x2c, 0xbc, 0x3b, 0x65, 0xe1, - 0xac, 0x90, 0x74, 0x65, 0x3d, 0xed, 0x0c, 0xda, 0xf7, 0x3a, 0x9e, 0xb8, 0xbc, 0xb2, 0x5b, 0xaf, - 0xae, 0xec, 0xd6, 0xeb, 0x2b, 0x1b, 0xfd, 0xb8, 0xb4, 0xd1, 0xaf, 0x4b, 0x1b, 0x5d, 0x2c, 0x6d, - 0x74, 0xb9, 0xb4, 0xd1, 0xdf, 0x4b, 0x1b, 0xfd, 0xb3, 0xb4, 0x5b, 0xaf, 0x97, 0x36, 0xfa, 0xf9, - 0xda, 0x6e, 0x5d, 0x5e, 0xdb, 0xad, 0x57, 0xd7, 0x76, 0xeb, 0xbb, 0xc7, 0xff, 0x75, 0xb7, 0xff, - 0xbb, 0x83, 0x67, 0x3d, 0x3d, 0xe0, 0xa7, 0xff, 0x06, 0x00, 0x00, 0xff, 0xff, 0xd5, 0x7e, 0x59, - 0x12, 0x69, 0x06, 0x00, 0x00, + // 846 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0x4f, 0x6f, 0xdc, 0x44, + 0x14, 0x5f, 0xc7, 0xfb, 0x77, 0x5a, 0x6d, 0x61, 0x1a, 0x15, 0x27, 0x45, 0xf6, 0x6a, 0x05, 0x52, + 0x90, 0xc0, 0x2b, 0x8a, 0xe8, 0x01, 0xa9, 0x88, 0x38, 0x09, 0xa2, 0x55, 0x25, 0x2a, 0xa7, 0x27, + 0x2e, 0x68, 0x76, 0xfd, 0xe2, 0x58, 0xf1, 0xbf, 0xce, 0x8c, 0x2b, 0xf6, 0xc6, 0x89, 0x73, 0x4f, + 0x88, 0x8f, 0xc0, 0x01, 0xf1, 0x39, 0x72, 0xcc, 0xb1, 0xe2, 0x60, 0xc8, 0xe6, 0x82, 0x7c, 0xea, + 0x47, 0x40, 0xf3, 0xc7, 0xbb, 0xde, 0x8d, 0x02, 0xf4, 0xb2, 0xfb, 0x66, 0xde, 0xef, 0xbd, 0xf7, + 0x7b, 0xbf, 0x79, 0x7e, 0xe8, 0x61, 0x7e, 0x16, 0x4e, 0x5e, 0x14, 0x40, 0x23, 0xa0, 0xf2, 0x7f, + 0x4e, 0x49, 0x1a, 0x42, 0xc3, 0x9c, 0x12, 0xd6, 0x3c, 0xba, 0x39, 0xcd, 0x78, 0x86, 0x87, 0xeb, + 0x80, 0xdd, 0xed, 0x30, 0x0b, 0x33, 0xe9, 0x9a, 0x08, 0x4b, 0xa1, 0x76, 0x77, 0xc2, 0x2c, 0x0b, + 0x63, 0x98, 0xc8, 0xd3, 0xb4, 0x38, 0x99, 0x90, 0x74, 0xae, 0x5d, 0xf6, 0xa6, 0x2b, 0x28, 0x28, + 0xe1, 0x51, 0x96, 0x6a, 0xbf, 0xb3, 0xe9, 0xe7, 0x51, 0x02, 0x8c, 0x93, 0x24, 0xd7, 0x80, 0xfb, + 0x82, 0x79, 0x9c, 0x85, 0xaa, 0x68, 0x6d, 0x68, 0xe7, 0xc1, 0xff, 0x6b, 0x2b, 0x80, 0x93, 0x28, + 0x8d, 0x44, 0x55, 0xd6, 0xb4, 0x55, 0x92, 0xf1, 0xcf, 0x26, 0x7a, 0xf7, 0x19, 0xcd, 0x12, 0xe0, + 0xa7, 0x50, 0x30, 0x1f, 0x5e, 0x14, 0xc0, 0x38, 0xc6, 0xa8, 0x9d, 0x13, 0x7e, 0x6a, 0x19, 0x23, + 0x63, 0x6f, 0xe0, 0x4b, 0x1b, 0x7f, 0x81, 0x3a, 0x8c, 0x13, 0xca, 0xad, 0xad, 0x91, 0xb1, 0x77, + 0xeb, 0xc1, 0xae, 0xab, 0xc8, 0xbb, 0x35, 0x79, 0xf7, 0x79, 0x4d, 0xde, 0xeb, 0x9f, 0x97, 0x4e, + 0xeb, 0xd5, 0x9f, 0x8e, 0xe1, 0xab, 0x10, 0xfc, 0x10, 0x99, 0x90, 0x06, 0x96, 0xf9, 0x16, 0x91, + 0x22, 0x40, 0xf0, 0x60, 0x1c, 0x72, 0xab, 0x3d, 0x32, 0xf6, 0x4c, 0x5f, 0xda, 0xf8, 0x11, 0xea, + 0x09, 0x99, 0xb2, 0x82, 0x5b, 0x1d, 0x99, 0x6f, 0xe7, 0x5a, 0xbe, 0x43, 0x2d, 0xb3, 0x4a, 0xf7, + 0x8b, 0x48, 0x57, 0xc7, 0xe0, 0x6d, 0xd4, 0x91, 0x02, 0x59, 0x5d, 0xd9, 0x9b, 0x3a, 0xe0, 0xc7, + 0x68, 0x38, 0x23, 0xb3, 0xd3, 0x28, 0x0d, 0xbf, 0xcd, 0xa5, 0x3c, 0x56, 0x4f, 0xe6, 0xbe, 0xef, + 0x36, 0x25, 0x3b, 0x58, 0x83, 0x78, 0x6d, 0x91, 0xdd, 0xdf, 0x08, 0xc4, 0x47, 0xa8, 0xf7, 0x0d, + 0x90, 0x00, 0x28, 0xb3, 0xfa, 0x23, 0x73, 0xef, 0xd6, 0x83, 0x0f, 0xd6, 0x72, 0x5c, 0x13, 0x5b, + 0x81, 0xbd, 0x4e, 0x55, 0x3a, 0xc6, 0x27, 0x7e, 0x1d, 0x3b, 0xfe, 0x7d, 0x0b, 0xe1, 0x26, 0x96, + 0xe5, 0x59, 0xca, 0x00, 0x8f, 0x51, 0xf7, 0x98, 0x13, 0x5e, 0x30, 0xf5, 0x36, 0x1e, 0xaa, 0x4a, + 0xa7, 0xcb, 0xe4, 0x8d, 0xaf, 0x3d, 0xf8, 0x09, 0x6a, 0x1f, 0x12, 0x4e, 0xf4, 0x43, 0xd9, 0xee, + 0xfa, 0x40, 0x34, 0x18, 0x08, 0x94, 0x77, 0x4f, 0x74, 0x51, 0x95, 0xce, 0x30, 0x20, 0x9c, 0x7c, + 0x9c, 0x25, 0x11, 0x87, 0x24, 0xe7, 0x73, 0x5f, 0xe6, 0xc0, 0x9f, 0xa3, 0xc1, 0x11, 0xa5, 0x19, + 0x7d, 0x3e, 0xcf, 0x41, 0xbe, 0xdf, 0xc0, 0x7b, 0xaf, 0x2a, 0x9d, 0xbb, 0x50, 0x5f, 0x36, 0x22, + 0x56, 0x48, 0xfc, 0x11, 0xea, 0xc8, 0x83, 0x7c, 0xb9, 0x81, 0x77, 0xb7, 0x2a, 0x9d, 0x3b, 0x32, + 0xa4, 0x01, 0x57, 0x08, 0xfc, 0xf5, 0x4a, 0xaf, 0x8e, 0xd4, 0xeb, 0xc3, 0x1b, 0xf5, 0x52, 0x1a, + 0xdc, 0x20, 0xd8, 0x4f, 0x06, 0x1a, 0xae, 0xb7, 0x86, 0x5d, 0x84, 0x7c, 0x60, 0x45, 0xcc, 0x25, + 0x7b, 0x25, 0xd8, 0xb0, 0x2a, 0x1d, 0x44, 0x97, 0xb7, 0x7e, 0x03, 0x81, 0x0f, 0x51, 0x57, 0x9d, + 0xac, 0x2d, 0xc9, 0xe4, 0xfd, 0x4d, 0xe9, 0x8e, 0x49, 0x92, 0xc7, 0x70, 0xcc, 0x29, 0x90, 0xc4, + 0x1b, 0x6a, 0xe1, 0xba, 0x2a, 0x9b, 0xaf, 0x63, 0xc7, 0xe7, 0x06, 0xba, 0xdd, 0x04, 0xe2, 0x97, + 0xa8, 0x1b, 0x93, 0x29, 0xc4, 0xe2, 0xcd, 0x4c, 0x39, 0xb0, 0xcb, 0x2f, 0xf9, 0x29, 0x84, 0x64, + 0x36, 0x7f, 0x2a, 0xbc, 0xcf, 0x48, 0x44, 0xbd, 0x03, 0x91, 0xf3, 0x8f, 0xd2, 0xf9, 0x34, 0x8c, + 0xf8, 0x69, 0x31, 0x75, 0x67, 0x59, 0x32, 0x09, 0x29, 0x39, 0x21, 0x29, 0x99, 0xc4, 0xd9, 0x59, + 0x34, 0x69, 0x2e, 0x04, 0x57, 0xc6, 0xed, 0x07, 0x24, 0xe7, 0x40, 0x05, 0x91, 0x04, 0x38, 0x8d, + 0x66, 0xbe, 0xae, 0x86, 0xbf, 0x42, 0x3d, 0x26, 0x79, 0x30, 0xdd, 0xcf, 0xbd, 0xcd, 0xc2, 0x8a, + 0xe6, 0xaa, 0x93, 0x97, 0x24, 0x2e, 0x80, 0xf9, 0x75, 0xd8, 0x38, 0x45, 0x43, 0x31, 0xf3, 0x10, + 0x2c, 0xe7, 0x6f, 0x07, 0x99, 0x67, 0x30, 0xd7, 0x5a, 0xf6, 0xaa, 0xd2, 0x11, 0x47, 0x5f, 0xfc, + 0xe0, 0x7d, 0xd4, 0x83, 0x1f, 0x38, 0xa4, 0x7c, 0x55, 0x6e, 0x43, 0xbe, 0x23, 0xe9, 0xf6, 0xee, + 0xe8, 0x72, 0x35, 0xdc, 0xaf, 0x8d, 0xf1, 0x6f, 0x06, 0xea, 0x2a, 0x10, 0x76, 0xea, 0x75, 0x23, + 0x4a, 0x99, 0xde, 0xa0, 0x2a, 0x1d, 0x75, 0x51, 0xef, 0x94, 0x1d, 0xb5, 0x53, 0xb6, 0xa4, 0x5b, + 0x32, 0x81, 0x34, 0x50, 0x6b, 0x63, 0x84, 0xfa, 0x9c, 0x92, 0x19, 0x7c, 0x1f, 0x05, 0x7a, 0x00, + 0xeb, 0x61, 0x91, 0xd7, 0x8f, 0x03, 0xfc, 0x25, 0xea, 0x53, 0xdd, 0x92, 0xde, 0x22, 0xdb, 0xd7, + 0xb6, 0xc8, 0x7e, 0x3a, 0xf7, 0x6e, 0x57, 0xa5, 0xb3, 0x44, 0xfa, 0x4b, 0xeb, 0x49, 0xbb, 0x6f, + 0xbe, 0xd3, 0xf6, 0xd8, 0xc5, 0xa5, 0xdd, 0x7a, 0x7d, 0x69, 0xb7, 0xde, 0x5c, 0xda, 0xc6, 0x8f, + 0x0b, 0xdb, 0xf8, 0x75, 0x61, 0x1b, 0xe7, 0x0b, 0xdb, 0xb8, 0x58, 0xd8, 0xc6, 0x5f, 0x0b, 0xdb, + 0xf8, 0x7b, 0x61, 0xb7, 0xde, 0x2c, 0x6c, 0xe3, 0xd5, 0x95, 0xdd, 0xba, 0xb8, 0xb2, 0x5b, 0xaf, + 0xaf, 0xec, 0xd6, 0x77, 0x8f, 0xfe, 0xed, 0x6d, 0xff, 0x73, 0x9f, 0x4f, 0xbb, 0x92, 0xe0, 0x67, + 0xff, 0x04, 0x00, 0x00, 0xff, 0xff, 0x63, 0x5c, 0x0b, 0x88, 0xd6, 0x06, 0x00, 0x00, } func (this *PrometheusRequest) Equal(that interface{}) bool { @@ -512,10 +513,10 @@ func (this *PrometheusRequest) Equal(that interface{}) bool { if this.Path != that1.Path { return false } - if this.Start != that1.Start { + if !this.Start.Equal(that1.Start) { return false } - if this.End != that1.End { + if !this.End.Equal(that1.End) { return false } if this.Step != that1.Step { @@ -888,16 +889,22 @@ func (m *PrometheusRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { i-- dAtA[i] = 0x20 } - if m.End != 0 { - i = encodeVarintQueryrange(dAtA, i, uint64(m.End)) - i-- - dAtA[i] = 0x18 + n3, err3 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.End, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.End):]) + if err3 != nil { + return 0, err3 } - if m.Start != 0 { - i = encodeVarintQueryrange(dAtA, i, uint64(m.Start)) - i-- - dAtA[i] = 0x10 + i -= n3 + i = encodeVarintQueryrange(dAtA, i, uint64(n3)) + i-- + dAtA[i] = 0x1a + n4, err4 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Start, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.Start):]) + if err4 != nil { + return 0, err4 } + i -= n4 + i = encodeVarintQueryrange(dAtA, i, uint64(n4)) + i-- + dAtA[i] = 0x12 if len(m.Path) > 0 { i -= len(m.Path) copy(dAtA[i:], m.Path) @@ -1188,12 +1195,10 @@ func (m *PrometheusRequest) Size() (n int) { if l > 0 { n += 1 + l + sovQueryrange(uint64(l)) } - if m.Start != 0 { - n += 1 + sovQueryrange(uint64(m.Start)) - } - if m.End != 0 { - n += 1 + sovQueryrange(uint64(m.End)) - } + l = github_com_gogo_protobuf_types.SizeOfStdTime(m.Start) + n += 1 + l + sovQueryrange(uint64(l)) + l = github_com_gogo_protobuf_types.SizeOfStdTime(m.End) + n += 1 + l + sovQueryrange(uint64(l)) if m.Step != 0 { n += 1 + sovQueryrange(uint64(m.Step)) } @@ -1342,8 +1347,8 @@ func (this *PrometheusRequest) String() string { repeatedStringForHeaders += "}" s := strings.Join([]string{`&PrometheusRequest{`, `Path:` + fmt.Sprintf("%v", this.Path) + `,`, - `Start:` + fmt.Sprintf("%v", this.Start) + `,`, - `End:` + fmt.Sprintf("%v", this.End) + `,`, + `Start:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Start), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, + `End:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.End), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, `Step:` + fmt.Sprintf("%v", this.Step) + `,`, `Timeout:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Timeout), "Duration", "duration.Duration", 1), `&`, ``, 1) + `,`, `Query:` + fmt.Sprintf("%v", this.Query) + `,`, @@ -1503,10 +1508,10 @@ func (m *PrometheusRequest) Unmarshal(dAtA []byte) error { m.Path = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 2: - if wireType != 0 { + if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Start", wireType) } - m.Start = 0 + var msglen int for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowQueryrange @@ -1516,16 +1521,30 @@ func (m *PrometheusRequest) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.Start |= int64(b&0x7F) << shift + msglen |= int(b&0x7F) << shift if b < 0x80 { break } } + if msglen < 0 { + return ErrInvalidLengthQueryrange + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQueryrange + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.Start, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex case 3: - if wireType != 0 { + if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field End", wireType) } - m.End = 0 + var msglen int for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowQueryrange @@ -1535,11 +1554,25 @@ func (m *PrometheusRequest) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.End |= int64(b&0x7F) << shift + msglen |= int(b&0x7F) << shift if b < 0x80 { break } } + if msglen < 0 { + return ErrInvalidLengthQueryrange + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQueryrange + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.End, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex case 4: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field Step", wireType) diff --git a/pkg/querier/queryrange/queryrangebase/queryrange.proto b/pkg/querier/queryrange/queryrangebase/queryrange.proto index 8ee2895d66..ad66551d2b 100644 --- a/pkg/querier/queryrange/queryrangebase/queryrange.proto +++ b/pkg/querier/queryrange/queryrangebase/queryrange.proto @@ -5,6 +5,7 @@ package queryrangebase; import "gogoproto/gogo.proto"; import "google/protobuf/any.proto"; import "google/protobuf/duration.proto"; +import "google/protobuf/timestamp.proto"; import "pkg/logproto/logproto.proto"; import "pkg/querier/queryrange/queryrangebase/definitions/definitions.proto"; @@ -14,8 +15,14 @@ option (gogoproto.unmarshaler_all) = true; message PrometheusRequest { string path = 1; - int64 start = 2; - int64 end = 3; + google.protobuf.Timestamp start = 2 [ + (gogoproto.stdtime) = true, + (gogoproto.nullable) = false + ]; + google.protobuf.Timestamp end = 3 [ + (gogoproto.stdtime) = true, + (gogoproto.nullable) = false + ]; int64 step = 4; google.protobuf.Duration timeout = 5 [ (gogoproto.stdduration) = true, diff --git a/pkg/querier/queryrange/queryrangebase/results_cache.go b/pkg/querier/queryrange/queryrangebase/results_cache.go index c81193d88e..05d6a26f67 100644 --- a/pkg/querier/queryrange/queryrangebase/results_cache.go +++ b/pkg/querier/queryrange/queryrangebase/results_cache.go @@ -21,7 +21,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" "github.com/uber/jaeger-client-go" @@ -145,7 +144,7 @@ type constSplitter time.Duration // GenerateCacheKey generates a cache key based on the userID, Request and interval. func (t constSplitter) GenerateCacheKey(_ context.Context, userID string, r Request) string { - currentInterval := r.GetStart() / int64(time.Duration(t)/time.Millisecond) + currentInterval := r.GetStart().UnixMilli() / int64(time.Duration(t)/time.Millisecond) return fmt.Sprintf("%s:%s:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval) } @@ -237,7 +236,7 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) { sp.LogKV( "query", r.GetQuery(), "step", time.UnixMilli(r.GetStep()), - "start", time.UnixMilli(r.GetStart()), + "start", r.GetStart(), "end", r.GetEnd(), "key", key, ) @@ -245,7 +244,7 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) { cacheFreshnessCapture := func(id string) time.Duration { return s.limits.MaxCacheFreshness(ctx, id) } maxCacheFreshness := validation.MaxDurationPerTenant(tenantIDs, cacheFreshnessCapture) maxCacheTime := int64(model.Now().Add(-maxCacheFreshness)) - if r.GetStart() > maxCacheTime { + if r.GetStart().UnixMilli() > maxCacheTime { return s.next.Do(ctx, r) } @@ -338,9 +337,9 @@ func (s resultsCache) isAtModifierCachable(r Request, maxCacheTime int64) bool { } // This resolves the start() and end() used with the @ modifier. - expr = promql.PreprocessExpr(expr, timestamp.Time(r.GetStart()), timestamp.Time(r.GetEnd())) + expr = promql.PreprocessExpr(expr, r.GetStart(), r.GetEnd()) - end := r.GetEnd() + end := r.GetEnd().UnixMilli() atModCachable := true parser.Inspect(expr, func(n parser.Node, _ []parser.Node) error { switch e := n.(type) { @@ -533,8 +532,8 @@ func toExtent(ctx context.Context, req Request, res Response) (Extent, error) { return Extent{}, err } return Extent{ - Start: req.GetStart(), - End: req.GetEnd(), + Start: req.GetStart().UnixMilli(), + End: req.GetEnd().UnixMilli(), Response: anyResp, TraceId: jaegerTraceID(ctx), }, nil @@ -545,11 +544,12 @@ func toExtent(ctx context.Context, req Request, res Response) (Extent, error) { func (s resultsCache) partition(req Request, extents []Extent) ([]Request, []Response, error) { var requests []Request var cachedResponses []Response - start := req.GetStart() + start := req.GetStart().UnixMilli() + end := req.GetEnd().UnixMilli() for _, extent := range extents { // If there is no overlap, ignore this extent. - if extent.GetEnd() < start || extent.Start > req.GetEnd() { + if extent.GetEnd() < start || extent.Start > end { continue } @@ -559,13 +559,13 @@ func (s resultsCache) partition(req Request, extents []Extent) ([]Request, []Res // However if the step is large enough, the split_query_by_interval middleware would generate a query with same start and end. // For example, if the step size is more than 12h and the interval is 24h. // This means the extent's start and end time would be same, even if the timerange covers several hours. - if (req.GetStart() != req.GetEnd()) && (req.GetEnd()-req.GetStart() > s.minCacheExtent) && (extent.End-extent.Start < s.minCacheExtent) { + if (req.GetStart() != req.GetEnd()) && ((end - start) > s.minCacheExtent) && (extent.End-extent.Start < s.minCacheExtent) { continue } // If there is a bit missing at the front, make a request for that. if start < extent.Start { - r := req.WithStartEnd(start, extent.Start) + r := req.WithStartEnd(time.UnixMilli(start), time.UnixMilli(extent.Start)) requests = append(requests, r) } res, err := extent.toResponse() @@ -573,13 +573,13 @@ func (s resultsCache) partition(req Request, extents []Extent) ([]Request, []Res return nil, nil, err } // extract the overlap from the cached extent. - cachedResponses = append(cachedResponses, s.extractor.Extract(start, req.GetEnd(), res, extent.GetStart(), extent.GetEnd())) + cachedResponses = append(cachedResponses, s.extractor.Extract(start, end, res, extent.GetStart(), extent.GetEnd())) start = extent.End } // Lastly, make a request for any data missing at the end. - if start < req.GetEnd() { - r := req.WithStartEnd(start, req.GetEnd()) + if start < req.GetEnd().UnixMilli() { + r := req.WithStartEnd(time.UnixMilli(start), time.UnixMilli(end)) requests = append(requests, r) } diff --git a/pkg/querier/queryrange/queryrangebase/results_cache_test.go b/pkg/querier/queryrange/queryrangebase/results_cache_test.go index 93302bf426..bfe3ecea5f 100644 --- a/pkg/querier/queryrange/queryrangebase/results_cache_test.go +++ b/pkg/querier/queryrange/queryrangebase/results_cache_test.go @@ -28,8 +28,8 @@ const ( var ( parsedRequest = &PrometheusRequest{ Path: "/api/v1/query_range", - Start: 1536673680 * 1e3, - End: 1536716898 * 1e3, + Start: time.UnixMilli(1536673680 * 1e3), + End: time.UnixMilli(1536716898 * 1e3), Step: 120 * 1e3, Query: "sum(container_memory_rss) by (namespace)", } @@ -41,8 +41,8 @@ var ( } noCacheRequest = &PrometheusRequest{ Path: "/api/v1/query_range", - Start: 1536673680 * 1e3, - End: 1536716898 * 1e3, + Start: time.UnixMilli(1536673680 * 1e3), + End: time.UnixMilli(1536716898 * 1e3), Step: 120 * 1e3, Query: "sum(container_memory_rss) by (namespace)", CachingOptions: CachingOptions{Disabled: true}, @@ -278,111 +278,111 @@ func TestShouldCache(t *testing.T) { // @ modifier on vector selectors. { name: "@ modifier on vector selector, before end, before maxCacheTime", - request: &PrometheusRequest{Query: "metric @ 123", End: 125000}, + request: &PrometheusRequest{Query: "metric @ 123", End: time.UnixMilli(125000)}, input: Response(&PrometheusResponse{}), expected: true, }, { name: "@ modifier on vector selector, after end, before maxCacheTime", - request: &PrometheusRequest{Query: "metric @ 127", End: 125000}, + request: &PrometheusRequest{Query: "metric @ 127", End: time.UnixMilli(125000)}, input: Response(&PrometheusResponse{}), expected: false, }, { name: "@ modifier on vector selector, before end, after maxCacheTime", - request: &PrometheusRequest{Query: "metric @ 151", End: 200000}, + request: &PrometheusRequest{Query: "metric @ 151", End: time.UnixMilli(200000)}, input: Response(&PrometheusResponse{}), expected: false, }, { name: "@ modifier on vector selector, after end, after maxCacheTime", - request: &PrometheusRequest{Query: "metric @ 151", End: 125000}, + request: &PrometheusRequest{Query: "metric @ 151", End: time.UnixMilli(125000)}, input: Response(&PrometheusResponse{}), expected: false, }, { name: "@ modifier on vector selector with start() before maxCacheTime", - request: &PrometheusRequest{Query: "metric @ start()", Start: 100000, End: 200000}, + request: &PrometheusRequest{Query: "metric @ start()", Start: time.UnixMilli(100000), End: time.UnixMilli(200000)}, input: Response(&PrometheusResponse{}), expected: true, }, { name: "@ modifier on vector selector with end() after maxCacheTime", - request: &PrometheusRequest{Query: "metric @ end()", Start: 100000, End: 200000}, + request: &PrometheusRequest{Query: "metric @ end()", Start: time.UnixMilli(100000), End: time.UnixMilli(200000)}, input: Response(&PrometheusResponse{}), expected: false, }, // @ modifier on matrix selectors. { name: "@ modifier on matrix selector, before end, before maxCacheTime", - request: &PrometheusRequest{Query: "rate(metric[5m] @ 123)", End: 125000}, + request: &PrometheusRequest{Query: "rate(metric[5m] @ 123)", End: time.UnixMilli(125000)}, input: Response(&PrometheusResponse{}), expected: true, }, { name: "@ modifier on matrix selector, after end, before maxCacheTime", - request: &PrometheusRequest{Query: "rate(metric[5m] @ 127)", End: 125000}, + request: &PrometheusRequest{Query: "rate(metric[5m] @ 127)", End: time.UnixMilli(125000)}, input: Response(&PrometheusResponse{}), expected: false, }, { name: "@ modifier on matrix selector, before end, after maxCacheTime", - request: &PrometheusRequest{Query: "rate(metric[5m] @ 151)", End: 200000}, + request: &PrometheusRequest{Query: "rate(metric[5m] @ 151)", End: time.UnixMilli(200000)}, input: Response(&PrometheusResponse{}), expected: false, }, { name: "@ modifier on matrix selector, after end, after maxCacheTime", - request: &PrometheusRequest{Query: "rate(metric[5m] @ 151)", End: 125000}, + request: &PrometheusRequest{Query: "rate(metric[5m] @ 151)", End: time.UnixMilli(125000)}, input: Response(&PrometheusResponse{}), expected: false, }, { name: "@ modifier on matrix selector with start() before maxCacheTime", - request: &PrometheusRequest{Query: "rate(metric[5m] @ start())", Start: 100000, End: 200000}, + request: &PrometheusRequest{Query: "rate(metric[5m] @ start())", Start: time.UnixMilli(100000), End: time.UnixMilli(200000)}, input: Response(&PrometheusResponse{}), expected: true, }, { name: "@ modifier on matrix selector with end() after maxCacheTime", - request: &PrometheusRequest{Query: "rate(metric[5m] @ end())", Start: 100000, End: 200000}, + request: &PrometheusRequest{Query: "rate(metric[5m] @ end())", Start: time.UnixMilli(100000), End: time.UnixMilli(200000)}, input: Response(&PrometheusResponse{}), expected: false, }, // @ modifier on subqueries. { name: "@ modifier on subqueries, before end, before maxCacheTime", - request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ 123)", End: 125000}, + request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ 123)", End: time.UnixMilli(125000)}, input: Response(&PrometheusResponse{}), expected: true, }, { name: "@ modifier on subqueries, after end, before maxCacheTime", - request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ 127)", End: 125000}, + request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ 127)", End: time.UnixMilli(125000)}, input: Response(&PrometheusResponse{}), expected: false, }, { name: "@ modifier on subqueries, before end, after maxCacheTime", - request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ 151)", End: 200000}, + request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ 151)", End: time.UnixMilli(200000)}, input: Response(&PrometheusResponse{}), expected: false, }, { name: "@ modifier on subqueries, after end, after maxCacheTime", - request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ 151)", End: 125000}, + request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ 151)", End: time.UnixMilli(125000)}, input: Response(&PrometheusResponse{}), expected: false, }, { name: "@ modifier on subqueries with start() before maxCacheTime", - request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ start())", Start: 100000, End: 200000}, + request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ start())", Start: time.UnixMilli(100000), End: time.UnixMilli(200000)}, input: Response(&PrometheusResponse{}), expected: true, }, { name: "@ modifier on subqueries with end() after maxCacheTime", - request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ end())", Start: 100000, End: 200000}, + request: &PrometheusRequest{Query: "sum_over_time(rate(metric[1m])[10m:1m] @ end())", Start: time.UnixMilli(100000), End: time.UnixMilli(200000)}, input: Response(&PrometheusResponse{}), expected: false, }, @@ -408,8 +408,8 @@ func TestPartition(t *testing.T) { { name: "Test a complete hit.", input: &PrometheusRequest{ - Start: 0, - End: 100, + Start: time.UnixMilli(0), + End: time.UnixMilli(100), }, prevCachedResponse: []Extent{ mkExtent(0, 100), @@ -422,32 +422,32 @@ func TestPartition(t *testing.T) { { name: "Test with a complete miss.", input: &PrometheusRequest{ - Start: 0, - End: 100, + Start: time.UnixMilli(0), + End: time.UnixMilli(100), }, prevCachedResponse: []Extent{ mkExtent(110, 210), }, expectedRequests: []Request{ &PrometheusRequest{ - Start: 0, - End: 100, + Start: time.UnixMilli(0), + End: time.UnixMilli(100), }, }, }, { name: "Test a partial hit.", input: &PrometheusRequest{ - Start: 0, - End: 100, + Start: time.UnixMilli(0), + End: time.UnixMilli(100), }, prevCachedResponse: []Extent{ mkExtent(50, 100), }, expectedRequests: []Request{ &PrometheusRequest{ - Start: 0, - End: 50, + Start: time.UnixMilli(0), + End: time.UnixMilli(50), }, }, expectedCachedResponse: []Response{ @@ -457,8 +457,8 @@ func TestPartition(t *testing.T) { { name: "Test multiple partial hits.", input: &PrometheusRequest{ - Start: 100, - End: 200, + Start: time.UnixMilli(100), + End: time.UnixMilli(200), }, prevCachedResponse: []Extent{ mkExtent(50, 120), @@ -466,8 +466,8 @@ func TestPartition(t *testing.T) { }, expectedRequests: []Request{ &PrometheusRequest{ - Start: 120, - End: 160, + Start: time.UnixMilli(120), + End: time.UnixMilli(160), }, }, expectedCachedResponse: []Response{ @@ -478,8 +478,8 @@ func TestPartition(t *testing.T) { { name: "Partial hits with tiny gap.", input: &PrometheusRequest{ - Start: 100, - End: 160, + Start: time.UnixMilli(100), + End: time.UnixMilli(160), }, prevCachedResponse: []Extent{ mkExtent(50, 120), @@ -487,8 +487,8 @@ func TestPartition(t *testing.T) { }, expectedRequests: []Request{ &PrometheusRequest{ - Start: 120, - End: 160, + Start: time.UnixMilli(120), + End: time.UnixMilli(160), }, }, expectedCachedResponse: []Response{ @@ -498,16 +498,16 @@ func TestPartition(t *testing.T) { { name: "Extent is outside the range and the request has a single step (same start and end).", input: &PrometheusRequest{ - Start: 100, - End: 100, + Start: time.UnixMilli(100), + End: time.UnixMilli(100), }, prevCachedResponse: []Extent{ mkExtent(50, 90), }, expectedRequests: []Request{ &PrometheusRequest{ - Start: 100, - End: 100, + Start: time.UnixMilli(100), + End: time.UnixMilli(100), }, }, }, @@ -515,8 +515,8 @@ func TestPartition(t *testing.T) { name: "Test when hit has a large step and only a single sample extent.", // If there is a only a single sample in the split interval, start and end will be the same. input: &PrometheusRequest{ - Start: 100, - End: 100, + Start: time.UnixMilli(100), + End: time.UnixMilli(100), }, prevCachedResponse: []Extent{ mkExtent(100, 100), @@ -549,8 +549,8 @@ func TestHandleHit(t *testing.T) { { name: "Should drop tiny extent that overlaps with non-tiny request only", input: &PrometheusRequest{ - Start: 100, - End: 120, + Start: time.UnixMilli(100), + End: time.UnixMilli(120), Step: 5, }, cachedEntry: []Extent{ @@ -570,8 +570,8 @@ func TestHandleHit(t *testing.T) { { name: "Should replace tiny extents that are cover by bigger request", input: &PrometheusRequest{ - Start: 100, - End: 200, + Start: time.UnixMilli(100), + End: time.UnixMilli(200), Step: 5, }, cachedEntry: []Extent{ @@ -594,8 +594,8 @@ func TestHandleHit(t *testing.T) { { name: "Should not drop tiny extent that completely overlaps with tiny request", input: &PrometheusRequest{ - Start: 100, - End: 105, + Start: time.UnixMilli(100), + End: time.UnixMilli(105), Step: 5, }, cachedEntry: []Extent{ @@ -609,8 +609,8 @@ func TestHandleHit(t *testing.T) { { name: "Should not drop tiny extent that partially center-overlaps with tiny request", input: &PrometheusRequest{ - Start: 106, - End: 108, + Start: time.UnixMilli(106), + End: time.UnixMilli(108), Step: 2, }, cachedEntry: []Extent{ @@ -623,8 +623,8 @@ func TestHandleHit(t *testing.T) { { name: "Should not drop tiny extent that partially left-overlaps with tiny request", input: &PrometheusRequest{ - Start: 100, - End: 106, + Start: time.UnixMilli(100), + End: time.UnixMilli(106), Step: 2, }, cachedEntry: []Extent{ @@ -641,8 +641,8 @@ func TestHandleHit(t *testing.T) { { name: "Should not drop tiny extent that partially right-overlaps with tiny request", input: &PrometheusRequest{ - Start: 100, - End: 106, + Start: time.UnixMilli(100), + End: time.UnixMilli(106), Step: 2, }, cachedEntry: []Extent{ @@ -659,8 +659,8 @@ func TestHandleHit(t *testing.T) { { name: "Should merge fragmented extents if request fills the hole", input: &PrometheusRequest{ - Start: 40, - End: 80, + Start: time.UnixMilli(40), + End: time.UnixMilli(80), Step: 20, }, cachedEntry: []Extent{ @@ -674,8 +674,8 @@ func TestHandleHit(t *testing.T) { { name: "Should left-extend extent if request starts earlier than extent in cache", input: &PrometheusRequest{ - Start: 40, - End: 80, + Start: time.UnixMilli(40), + End: time.UnixMilli(80), Step: 20, }, cachedEntry: []Extent{ @@ -688,8 +688,8 @@ func TestHandleHit(t *testing.T) { { name: "Should right-extend extent if request ends later than extent in cache", input: &PrometheusRequest{ - Start: 100, - End: 180, + Start: time.UnixMilli(100), + End: time.UnixMilli(180), Step: 20, }, cachedEntry: []Extent{ @@ -704,8 +704,8 @@ func TestHandleHit(t *testing.T) { input: &PrometheusRequest{ // This request is carefully crated such that cachedEntry is not used to fulfill // the request. - Start: 160, - End: 180, + Start: time.UnixMilli(160), + End: time.UnixMilli(180), Step: 20, }, cachedEntry: []Extent{ @@ -733,7 +733,7 @@ func TestHandleHit(t *testing.T) { merger: PrometheusCodec, parallelismForReq: func(_ context.Context, tenantIDs []string, r Request) int { return 1 }, next: HandlerFunc(func(_ context.Context, req Request) (Response, error) { - return mkAPIResponse(req.GetStart(), req.GetEnd(), req.GetStep()), nil + return mkAPIResponse(req.GetStart().UnixMilli(), req.GetEnd().UnixMilli(), req.GetStep()), nil }), } @@ -741,7 +741,7 @@ func TestHandleHit(t *testing.T) { response, updatedExtents, err := sut.handleHit(ctx, tc.input, tc.cachedEntry, 0) require.NoError(t, err) - expectedResponse := mkAPIResponse(tc.input.GetStart(), tc.input.GetEnd(), tc.input.GetStep()) + expectedResponse := mkAPIResponse(tc.input.GetStart().UnixMilli(), tc.input.GetEnd().UnixMilli(), tc.input.GetStep()) require.Equal(t, expectedResponse, response, "response does not match the expectation") require.Equal(t, tc.expectedUpdatedCachedEntry, updatedExtents, "updated cache entry does not match the expectation") }) @@ -791,7 +791,7 @@ func TestResultsCache(t *testing.T) { require.Equal(t, parsedResponse, resp) // Doing request with new end time should do one more query. - req := parsedRequest.WithStartEnd(parsedRequest.GetStart(), parsedRequest.GetEnd()+100) + req := parsedRequest.WithStartEnd(parsedRequest.GetStart(), parsedRequest.GetEnd().Add(100*time.Millisecond)) _, err = rc.Do(ctx, req) require.NoError(t, err) require.Equal(t, 2, calls) @@ -820,7 +820,7 @@ func TestResultsCacheRecent(t *testing.T) { ) require.NoError(t, err) - req := parsedRequest.WithStartEnd(int64(model.Now())-(60*1e3), int64(model.Now())) + req := parsedRequest.WithStartEnd(time.Now().Add(-60*1e3*time.Millisecond), time.Now()) calls := 0 rc := rcm.Wrap(HandlerFunc(func(_ context.Context, r Request) (Response, error) { @@ -893,7 +893,7 @@ func TestResultsCacheMaxFreshness(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "1") // create request with start end within the key extents - req := parsedRequest.WithStartEnd(int64(modelNow)-(50*1e3), int64(modelNow)-(10*1e3)) + req := parsedRequest.WithStartEnd(time.UnixMilli(int64(modelNow)-(50*1e3)), time.UnixMilli(int64(modelNow)-(10*1e3))) // fill cache key := constSplitter(day).GenerateCacheKey(context.Background(), "1", req) @@ -972,14 +972,14 @@ func TestConstSplitter_generateCacheKey(t *testing.T) { interval time.Duration want string }{ - {"0", &PrometheusRequest{Start: 0, Step: 10, Query: "foo{}"}, 30 * time.Minute, "fake:foo{}:10:0"}, - {"<30m", &PrometheusRequest{Start: toMs(10 * time.Minute), Step: 10, Query: "foo{}"}, 30 * time.Minute, "fake:foo{}:10:0"}, - {"30m", &PrometheusRequest{Start: toMs(30 * time.Minute), Step: 10, Query: "foo{}"}, 30 * time.Minute, "fake:foo{}:10:1"}, - {"91m", &PrometheusRequest{Start: toMs(91 * time.Minute), Step: 10, Query: "foo{}"}, 30 * time.Minute, "fake:foo{}:10:3"}, - {"0", &PrometheusRequest{Start: 0, Step: 10, Query: "foo{}"}, 24 * time.Hour, "fake:foo{}:10:0"}, - {"<1d", &PrometheusRequest{Start: toMs(22 * time.Hour), Step: 10, Query: "foo{}"}, 24 * time.Hour, "fake:foo{}:10:0"}, - {"4d", &PrometheusRequest{Start: toMs(4 * 24 * time.Hour), Step: 10, Query: "foo{}"}, 24 * time.Hour, "fake:foo{}:10:4"}, - {"3d5h", &PrometheusRequest{Start: toMs(77 * time.Hour), Step: 10, Query: "foo{}"}, 24 * time.Hour, "fake:foo{}:10:3"}, + {"0", &PrometheusRequest{Start: time.UnixMilli(0), Step: 10, Query: "foo{}"}, 30 * time.Minute, "fake:foo{}:10:0"}, + {"<30m", &PrometheusRequest{Start: time.UnixMilli(toMs(10 * time.Minute)), Step: 10, Query: "foo{}"}, 30 * time.Minute, "fake:foo{}:10:0"}, + {"30m", &PrometheusRequest{Start: time.UnixMilli(toMs(30 * time.Minute)), Step: 10, Query: "foo{}"}, 30 * time.Minute, "fake:foo{}:10:1"}, + {"91m", &PrometheusRequest{Start: time.UnixMilli(toMs(91 * time.Minute)), Step: 10, Query: "foo{}"}, 30 * time.Minute, "fake:foo{}:10:3"}, + {"0", &PrometheusRequest{Start: time.UnixMilli(0), Step: 10, Query: "foo{}"}, 24 * time.Hour, "fake:foo{}:10:0"}, + {"<1d", &PrometheusRequest{Start: time.UnixMilli(toMs(22 * time.Hour)), Step: 10, Query: "foo{}"}, 24 * time.Hour, "fake:foo{}:10:0"}, + {"4d", &PrometheusRequest{Start: time.UnixMilli(toMs(4 * 24 * time.Hour)), Step: 10, Query: "foo{}"}, 24 * time.Hour, "fake:foo{}:10:4"}, + {"3d5h", &PrometheusRequest{Start: time.UnixMilli(toMs(77 * time.Hour)), Step: 10, Query: "foo{}"}, 24 * time.Hour, "fake:foo{}:10:3"}, } for _, tt := range tests { t.Run(fmt.Sprintf("%s - %s", tt.name, tt.interval), func(t *testing.T) { diff --git a/pkg/querier/queryrange/queryrangebase/step_align.go b/pkg/querier/queryrange/queryrangebase/step_align.go index edef2d6f08..7f61d41837 100644 --- a/pkg/querier/queryrange/queryrangebase/step_align.go +++ b/pkg/querier/queryrange/queryrangebase/step_align.go @@ -2,6 +2,7 @@ package queryrangebase import ( "context" + "time" ) // StepAlignMiddleware aligns the start and end of request to the step to @@ -17,7 +18,7 @@ type stepAlign struct { } func (s stepAlign) Do(ctx context.Context, r Request) (Response, error) { - start := (r.GetStart() / r.GetStep()) * r.GetStep() - end := (r.GetEnd() / r.GetStep()) * r.GetStep() - return s.next.Do(ctx, r.WithStartEnd(start, end)) + start := (r.GetStart().UnixMilli() / r.GetStep()) * r.GetStep() + end := (r.GetEnd().UnixMilli() / r.GetStep()) * r.GetStep() + return s.next.Do(ctx, r.WithStartEnd(time.UnixMilli(start), time.UnixMilli(end))) } diff --git a/pkg/querier/queryrange/queryrangebase/step_align_test.go b/pkg/querier/queryrange/queryrangebase/step_align_test.go index d68a2080d4..cf1468e7c2 100644 --- a/pkg/querier/queryrange/queryrangebase/step_align_test.go +++ b/pkg/querier/queryrange/queryrangebase/step_align_test.go @@ -4,6 +4,7 @@ import ( "context" "strconv" "testing" + "time" "github.com/stretchr/testify/require" ) @@ -14,26 +15,26 @@ func TestStepAlign(t *testing.T) { }{ { input: &PrometheusRequest{ - Start: 0, - End: 100, + Start: time.UnixMilli(0), + End: time.UnixMilli(100), Step: 10, }, expected: &PrometheusRequest{ - Start: 0, - End: 100, + Start: time.UnixMilli(0), + End: time.UnixMilli(100), Step: 10, }, }, { input: &PrometheusRequest{ - Start: 2, - End: 102, + Start: time.UnixMilli(2), + End: time.UnixMilli(102), Step: 10, }, expected: &PrometheusRequest{ - Start: 0, - End: 100, + Start: time.UnixMilli(0), + End: time.UnixMilli(100), Step: 10, }, }, diff --git a/pkg/querier/queryrange/querysharding.go b/pkg/querier/queryrange/querysharding.go index b2af68b55b..143174439c 100644 --- a/pkg/querier/queryrange/querysharding.go +++ b/pkg/querier/queryrange/querysharding.go @@ -149,7 +149,7 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que return ast.next.Do(ctx, r) } - conf, err := ast.confs.GetConf(int64(model.Time(r.GetStart()).Add(-maxRVDuration).Add(-maxOffset)), int64(model.Time(r.GetEnd()).Add(-maxOffset))) + conf, err := ast.confs.GetConf(int64(model.Time(r.GetStart().UnixMilli()).Add(-maxRVDuration).Add(-maxOffset)), int64(model.Time(r.GetEnd().UnixMilli()).Add(-maxOffset))) // cannot shard with this timerange if err != nil { level.Warn(logger).Log("err", err.Error(), "msg", "skipped AST mapper for request") @@ -173,7 +173,7 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que conf, ast.ng.Opts().MaxLookBackPeriod, ast.logger, - MinWeightedParallelism(ctx, tenants, ast.confs, ast.limits, model.Time(r.GetStart()), model.Time(r.GetEnd())), + MinWeightedParallelism(ctx, tenants, ast.confs, ast.limits, model.Time(r.GetStart().UnixMilli()), model.Time(r.GetEnd().UnixMilli())), ast.maxShards, r, ast.statsHandler, @@ -303,7 +303,7 @@ func (splitter *shardSplitter) Do(ctx context.Context, r queryrangebase.Request) cutoff := splitter.now().Add(-minShardingLookback) // Only attempt to shard queries which are older than the sharding lookback // (the period for which ingesters are also queried) or when the lookback is disabled. - if minShardingLookback == 0 || util.TimeFromMillis(r.GetEnd()).Before(cutoff) { + if minShardingLookback == 0 || util.TimeFromMillis(r.GetEnd().UnixMilli()).Before(cutoff) { return splitter.shardingware.Do(ctx, r) } return splitter.next.Do(ctx, r) @@ -400,7 +400,7 @@ type seriesShardingHandler struct { } func (ss *seriesShardingHandler) Do(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { - conf, err := ss.confs.GetConf(r.GetStart(), r.GetEnd()) + conf, err := ss.confs.GetConf(r.GetStart().UnixMilli(), r.GetEnd().UnixMilli()) // cannot shard with this timerange if err != nil { level.Warn(ss.logger).Log("err", err.Error(), "msg", "skipped sharding for request") @@ -433,7 +433,7 @@ func (ss *seriesShardingHandler) Do(ctx context.Context, r queryrangebase.Reques ctx, ss.next, requests, - MinWeightedParallelism(ctx, tenantIDs, ss.confs, ss.limits, model.Time(req.GetStart()), model.Time(req.GetEnd())), + MinWeightedParallelism(ctx, tenantIDs, ss.confs, ss.limits, model.Time(req.GetStart().UnixMilli()), model.Time(req.GetEnd().UnixMilli())), ) if err != nil { return nil, err diff --git a/pkg/querier/queryrange/querysharding_test.go b/pkg/querier/queryrange/querysharding_test.go index e3e83f967a..79d789b1ac 100644 --- a/pkg/querier/queryrange/querysharding_test.go +++ b/pkg/querier/queryrange/querysharding_test.go @@ -85,10 +85,7 @@ var ( ) func Test_shardSplitter(t *testing.T) { - req := defaultReq().WithStartEnd( - util.TimeToMillis(start), - util.TimeToMillis(end), - ) + req := defaultReq().WithStartEnd(start, end) for _, tc := range []struct { desc string @@ -812,7 +809,7 @@ func Test_ASTMapper_MaxLookBackPeriod(t *testing.T) { statsHandler := queryrangebase.HandlerFunc(func(_ context.Context, req queryrangebase.Request) (queryrangebase.Response, error) { // This is the actual check that we're testing. - require.Equal(t, testTime.Add(-engineOpts.MaxLookBackPeriod).UnixMilli(), req.GetStart()) + require.Equal(t, testTime.Add(-engineOpts.MaxLookBackPeriod).UnixMilli(), req.GetStart().UnixMilli()) return &IndexStatsResponse{ Response: &logproto.IndexStatsResponse{ diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 9c409d14a5..91c098dd93 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -605,8 +605,8 @@ func NewMetricTripperware( tenantIDs, schema.Configs, limits, - model.Time(r.GetStart()), - model.Time(r.GetEnd()), + model.Time(r.GetStart().UnixMilli()), + model.Time(r.GetEnd().UnixMilli()), ) }, retentionEnabled, @@ -775,8 +775,8 @@ func NewVolumeTripperware( tenantIDs, schema.Configs, limits, - model.Time(r.GetStart()), - model.Time(r.GetEnd()), + model.Time(r.GetStart().UnixMilli()), + model.Time(r.GetEnd().UnixMilli()), ) }, retentionEnabled, @@ -876,8 +876,8 @@ func NewIndexStatsTripperware( tenantIDs, schema.Configs, limits, - model.Time(r.GetStart()), - model.Time(r.GetEnd()), + model.Time(r.GetStart().UnixMilli()), + model.Time(r.GetEnd().UnixMilli()), ) }, retentionEnabled, diff --git a/pkg/querier/queryrange/shard_resolver.go b/pkg/querier/queryrange/shard_resolver.go index 5cf2c83ae6..aed0e96e2b 100644 --- a/pkg/querier/queryrange/shard_resolver.go +++ b/pkg/querier/queryrange/shard_resolver.go @@ -43,8 +43,8 @@ func shardResolverForConf( logger: logger, handler: handler, limits: limits, - from: model.Time(r.GetStart()), - through: model.Time(r.GetEnd()), + from: model.Time(r.GetStart().UnixMilli()), + through: model.Time(r.GetEnd().UnixMilli()), maxParallelism: maxParallelism, maxShards: maxShards, defaultLookback: defaultLookback, diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go index 69bca0a4e6..97045066f4 100644 --- a/pkg/querier/queryrange/split_by_interval.go +++ b/pkg/querier/queryrange/split_by_interval.go @@ -230,7 +230,7 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrangebase.Request) (que maxSeriesCapture := func(id string) int { return h.limits.MaxQuerySeries(ctx, id) } maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, maxSeriesCapture) - maxParallelism := MinWeightedParallelism(ctx, tenantIDs, h.configs, h.limits, model.Time(r.GetStart()), model.Time(r.GetEnd())) + maxParallelism := MinWeightedParallelism(ctx, tenantIDs, h.configs, h.limits, model.Time(r.GetStart().UnixMilli()), model.Time(r.GetEnd().UnixMilli())) resps, err := h.Process(ctx, maxParallelism, limit, input, maxSeries) if err != nil { return nil, err @@ -276,8 +276,8 @@ func splitByTime(req queryrangebase.Request, interval time.Duration) ([]queryran reqs = append(reqs, NewLabelRequest(start, end, r.Query, r.Name, r.Path())) }) case *logproto.IndexStatsRequest: - startTS := model.Time(r.GetStart()).Time() - endTS := model.Time(r.GetEnd()).Time() + startTS := r.GetStart() + endTS := r.GetEnd() util.ForInterval(interval, startTS, endTS, true, func(start, end time.Time) { reqs = append(reqs, &logproto.IndexStatsRequest{ From: model.TimeFromUnix(start.Unix()), @@ -286,8 +286,8 @@ func splitByTime(req queryrangebase.Request, interval time.Duration) ([]queryran }) }) case *logproto.VolumeRequest: - startTS := model.Time(r.GetStart()).Time() - endTS := model.Time(r.GetEnd()).Time() + startTS := r.GetStart() + endTS := r.GetEnd() util.ForInterval(interval, startTS, endTS, true, func(start, end time.Time) { reqs = append(reqs, &logproto.VolumeRequest{ From: model.TimeFromUnix(start.Unix()), @@ -363,7 +363,7 @@ func splitMetricByTime(r queryrangebase.Request, interval time.Duration) ([]quer } end := time.Unix(0, endNs) - lokiReq = lokiReq.WithStartEnd(util.TimeToMillis(start), util.TimeToMillis(end)).(*LokiRequest) + lokiReq = lokiReq.WithStartEnd(start, end).(*LokiRequest) // step is >= configured split interval, let us just split the query interval by step if lokiReq.Step >= interval.Milliseconds() { diff --git a/pkg/querier/queryrange/volume_cache.go b/pkg/querier/queryrange/volume_cache.go index 5fa84840f7..0c54745654 100644 --- a/pkg/querier/queryrange/volume_cache.go +++ b/pkg/querier/queryrange/volume_cache.go @@ -91,7 +91,7 @@ func shouldCacheVolume(ctx context.Context, req queryrangebase.Request, lim Limi maxCacheFreshness := validation.MaxDurationPerTenant(tenantIDs, cacheFreshnessCapture) now := volumeCacheMiddlewareNowTimeFunc() - return maxCacheFreshness == 0 || model.Time(req.GetEnd()).Before(now.Add(-maxCacheFreshness)), nil + return maxCacheFreshness == 0 || model.Time(req.GetEnd().UnixMilli()).Before(now.Add(-maxCacheFreshness)), nil } func NewVolumeCacheMiddleware( diff --git a/pkg/querier/queryrange/volume_cache_test.go b/pkg/querier/queryrange/volume_cache_test.go index f1e3a30d43..009302783b 100644 --- a/pkg/querier/queryrange/volume_cache_test.go +++ b/pkg/querier/queryrange/volume_cache_test.go @@ -116,7 +116,7 @@ func TestVolumeCache(t *testing.T) { // The new start time is 15m (i.e. 25%) in the future with regard to the previous request time span. *calls = 0 - req := volReq.WithStartEnd(volReq.GetStart()+(15*time.Minute).Milliseconds(), volReq.GetEnd()+(15*time.Minute).Milliseconds()) + req := volReq.WithStartEnd(volReq.GetStart().Add(15*time.Minute), volReq.GetEnd().Add(15*time.Minute)) vol := float64(0.75) expectedVol := &VolumeResponse{ Response: &logproto.VolumeResponse{