|
|
|
@ -1,15 +1,19 @@ |
|
|
|
|
package queryrange |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"context" |
|
|
|
|
"testing" |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
"github.com/stretchr/testify/require" |
|
|
|
|
|
|
|
|
|
"github.com/grafana/dskit/user" |
|
|
|
|
|
|
|
|
|
"github.com/grafana/loki/pkg/loghttp" |
|
|
|
|
"github.com/grafana/loki/pkg/logproto" |
|
|
|
|
"github.com/grafana/loki/pkg/push" |
|
|
|
|
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" |
|
|
|
|
"github.com/grafana/loki/pkg/storage/stores/index/seriesvolume" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
const forRangeQuery = false |
|
|
|
@ -250,3 +254,78 @@ func Test_toPrometheusResponse(t *testing.T) { |
|
|
|
|
}, promResp.Response.Data) |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func Test_VolumeMiddleware(t *testing.T) { |
|
|
|
|
makeVolumeRequest := func(req *logproto.VolumeRequest) *queryrangebase.PrometheusResponse { |
|
|
|
|
nextHandler := queryrangebase.HandlerFunc(func(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { |
|
|
|
|
return &VolumeResponse{ |
|
|
|
|
Response: &logproto.VolumeResponse{ |
|
|
|
|
Volumes: []logproto.Volume{ |
|
|
|
|
{ |
|
|
|
|
Name: `{foo="bar"}`, |
|
|
|
|
Volume: 42, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}, nil |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
m := NewVolumeMiddleware() |
|
|
|
|
wrapped := m.Wrap(nextHandler) |
|
|
|
|
|
|
|
|
|
ctx := user.InjectOrgID(context.Background(), "fake") |
|
|
|
|
resp, err := wrapped.Do(ctx, req) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NotNil(t, resp) |
|
|
|
|
|
|
|
|
|
return resp.(*LokiPromResponse).Response |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
t.Run("it breaks query up into subqueries according to step", func(t *testing.T) { |
|
|
|
|
volumeReq := &logproto.VolumeRequest{ |
|
|
|
|
From: 10, |
|
|
|
|
Through: 20, |
|
|
|
|
Matchers: `{foo="bar"}`, |
|
|
|
|
Limit: seriesvolume.DefaultLimit, |
|
|
|
|
Step: 1, |
|
|
|
|
AggregateBy: seriesvolume.Series, |
|
|
|
|
} |
|
|
|
|
promResp := makeVolumeRequest(volumeReq) |
|
|
|
|
|
|
|
|
|
require.Equal(t, promResp.Data.ResultType, loghttp.ResultTypeMatrix) |
|
|
|
|
require.Equal(t, len(promResp.Data.Result), 1) |
|
|
|
|
require.Equal(t, len(promResp.Data.Result[0].Samples), 10) |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
t.Run("only returns one datapoint when step is > than time range", func(t *testing.T) { |
|
|
|
|
volumeReq := &logproto.VolumeRequest{ |
|
|
|
|
From: 10, |
|
|
|
|
Through: 20, |
|
|
|
|
Matchers: `{foo="bar"}`, |
|
|
|
|
Limit: seriesvolume.DefaultLimit, |
|
|
|
|
Step: 20, |
|
|
|
|
AggregateBy: seriesvolume.Series, |
|
|
|
|
} |
|
|
|
|
promResp := makeVolumeRequest(volumeReq) |
|
|
|
|
|
|
|
|
|
require.Equal(t, promResp.Data.ResultType, loghttp.ResultTypeVector) |
|
|
|
|
require.Equal(t, len(promResp.Data.Result), 1) |
|
|
|
|
require.Equal(t, len(promResp.Data.Result[0].Samples), 1) |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
t.Run("when requested time range is not evenly divisible by step, an extra datpoint is added", func(t *testing.T) { |
|
|
|
|
volumeReq := &logproto.VolumeRequest{ |
|
|
|
|
From: 1698830441000, // 2023-11-01T09:20:41Z
|
|
|
|
|
Through: 1698830498000, // 2023-11-01T09:21:38Z, difference is 57s
|
|
|
|
|
Matchers: `{foo="bar"}`, |
|
|
|
|
Limit: seriesvolume.DefaultLimit, |
|
|
|
|
Step: 60000, // 60s
|
|
|
|
|
AggregateBy: seriesvolume.Series, |
|
|
|
|
} |
|
|
|
|
promResp := makeVolumeRequest(volumeReq) |
|
|
|
|
|
|
|
|
|
require.Equal(t, promResp.Data.ResultType, loghttp.ResultTypeMatrix) |
|
|
|
|
require.Equal(t, 1, len(promResp.Data.Result)) |
|
|
|
|
require.Equal(t, 2, len(promResp.Data.Result[0].Samples)) |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|