Test querier handlers reponse format. (#9854)

**What this PR does / why we need it**:
This is a follow up to #9813 that introduced a handler test to verify
the response format for the legacy and v1 APIs.

The test requires mocking of the engine and query which meant the
introduction of a query engine interface.

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e3ec)
pull/9567/head^2
Karsten Jeschkies 2 years ago committed by GitHub
parent 9aadb348ed
commit 9393f3f11a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      pkg/querier/http.go
  2. 282
      pkg/querier/http_test.go
  3. 38
      pkg/querier/querier_mock_test.go

@ -47,12 +47,16 @@ type QueryResponse struct {
Result parser.Value `json:"result"` Result parser.Value `json:"result"`
} }
type Engine interface {
Query(logql.Params) logql.Query
}
// nolint // QuerierAPI defines HTTP handler functions for the querier. // nolint // QuerierAPI defines HTTP handler functions for the querier.
type QuerierAPI struct { type QuerierAPI struct {
querier Querier querier Querier
cfg Config cfg Config
limits Limits limits Limits
engine *logql.Engine engine Engine
} }
// NewQuerierAPI returns an instance of the QuerierAPI. // NewQuerierAPI returns an instance of the QuerierAPI.

@ -13,14 +13,153 @@ import (
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/validation"
"github.com/go-kit/log" "github.com/go-kit/log"
"github.com/grafana/dskit/tenant" "github.com/grafana/dskit/tenant"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/weaveworks/common/user" "github.com/weaveworks/common/user"
)
"github.com/grafana/loki/pkg/validation" var (
statsResultString = `"stats" : {
"ingester" : {
"store": {
"chunk":{
"compressedBytes": 1,
"decompressedBytes": 2,
"decompressedLines": 3,
"headChunkBytes": 4,
"headChunkLines": 5,
"totalDuplicates": 8
},
"chunksDownloadTime": 0,
"totalChunksRef": 0,
"totalChunksDownloaded": 0
},
"totalBatches": 6,
"totalChunksMatched": 7,
"totalLinesSent": 9,
"totalReached": 10
},
"querier": {
"store" : {
"chunk": {
"compressedBytes": 11,
"decompressedBytes": 12,
"decompressedLines": 13,
"headChunkBytes": 14,
"headChunkLines": 15,
"totalDuplicates": 19
},
"chunksDownloadTime": 16,
"totalChunksRef": 17,
"totalChunksDownloaded": 18
}
},
"cache": {
"chunk": {
"entriesFound": 0,
"entriesRequested": 0,
"entriesStored": 0,
"bytesReceived": 0,
"bytesSent": 0,
"requests": 0,
"downloadTime": 0
},
"index": {
"entriesFound": 0,
"entriesRequested": 0,
"entriesStored": 0,
"bytesReceived": 0,
"bytesSent": 0,
"requests": 0,
"downloadTime": 0
},
"statsResult": {
"entriesFound": 0,
"entriesRequested": 0,
"entriesStored": 0,
"bytesReceived": 0,
"bytesSent": 0,
"requests": 0,
"downloadTime": 0
},
"result": {
"entriesFound": 0,
"entriesRequested": 0,
"entriesStored": 0,
"bytesReceived": 0,
"bytesSent": 0,
"requests": 0,
"downloadTime": 0
}
},
"summary": {
"bytesProcessedPerSecond": 20,
"execTime": 22,
"linesProcessedPerSecond": 23,
"queueTime": 21,
"shards": 0,
"splits": 0,
"subqueries": 0,
"totalBytesProcessed": 24,
"totalEntriesReturned": 10,
"totalLinesProcessed": 25
}
}`
statsResult = stats.Result{
Summary: stats.Summary{
BytesProcessedPerSecond: 20,
QueueTime: 21,
ExecTime: 22,
LinesProcessedPerSecond: 23,
TotalBytesProcessed: 24,
TotalLinesProcessed: 25,
TotalEntriesReturned: 10,
},
Querier: stats.Querier{
Store: stats.Store{
Chunk: stats.Chunk{
CompressedBytes: 11,
DecompressedBytes: 12,
DecompressedLines: 13,
HeadChunkBytes: 14,
HeadChunkLines: 15,
TotalDuplicates: 19,
},
ChunksDownloadTime: 16,
TotalChunksRef: 17,
TotalChunksDownloaded: 18,
},
},
Ingester: stats.Ingester{
Store: stats.Store{
Chunk: stats.Chunk{
CompressedBytes: 1,
DecompressedBytes: 2,
DecompressedLines: 3,
HeadChunkBytes: 4,
HeadChunkLines: 5,
TotalDuplicates: 8,
},
},
TotalBatches: 6,
TotalChunksMatched: 7,
TotalLinesSent: 9,
TotalReached: 10,
},
Caches: stats.Caches{
Chunk: stats.Cache{},
Index: stats.Cache{},
Result: stats.Cache{},
},
}
) )
func TestTailHandler(t *testing.T) { func TestTailHandler(t *testing.T) {
@ -205,19 +344,6 @@ func TestSeriesVolumeHandler(t *testing.T) {
}, },
} }
setupAPI := func(querier *querierMock) *QuerierAPI {
return NewQuerierAPI(Config{}, querier, nil, log.NewNopLogger())
}
makeRequest := func(handler http.HandlerFunc, req *http.Request) *httptest.ResponseRecorder {
err := req.ParseForm()
require.NoError(t, err)
w := httptest.NewRecorder()
handler(w, req)
return w
}
t.Run("shared beavhior between range and instant queries", func(t *testing.T) { t.Run("shared beavhior between range and instant queries", func(t *testing.T) {
for _, tc := range []struct { for _, tc := range []struct {
mode string mode string
@ -236,7 +362,7 @@ func TestSeriesVolumeHandler(t *testing.T) {
"&end=1"+ "&end=1"+
"&query=%7Bfoo%3D%22bar%22%7D", nil) "&query=%7Bfoo%3D%22bar%22%7D", nil)
w := makeRequest(tc.handler(api), req) w := makeRequest(t, tc.handler(api), req)
calls := querier.GetMockedCallsByMethod("SeriesVolume") calls := querier.GetMockedCallsByMethod("SeriesVolume")
require.Len(t, calls, 1) require.Len(t, calls, 1)
@ -258,7 +384,7 @@ func TestSeriesVolumeHandler(t *testing.T) {
api := setupAPI(querier) api := setupAPI(querier)
req := httptest.NewRequest(http.MethodGet, "/series_volume?start=0&end=1&query=%7Bfoo%3D%22bar%22%7D", nil) req := httptest.NewRequest(http.MethodGet, "/series_volume?start=0&end=1&query=%7Bfoo%3D%22bar%22%7D", nil)
w := makeRequest(tc.handler(api), req) w := makeRequest(t, tc.handler(api), req)
calls := querier.GetMockedCallsByMethod("SeriesVolume") calls := querier.GetMockedCallsByMethod("SeriesVolume")
require.Len(t, calls, 1) require.Len(t, calls, 1)
@ -275,7 +401,7 @@ func TestSeriesVolumeHandler(t *testing.T) {
api := setupAPI(querier) api := setupAPI(querier)
req := httptest.NewRequest(http.MethodGet, "/series_volume?start=0&end=1&query=%7Bfoo%3D%22bar%22%7D", nil) req := httptest.NewRequest(http.MethodGet, "/series_volume?start=0&end=1&query=%7Bfoo%3D%22bar%22%7D", nil)
w := makeRequest(tc.handler(api), req) w := makeRequest(t, tc.handler(api), req)
calls := querier.GetMockedCallsByMethod("SeriesVolume") calls := querier.GetMockedCallsByMethod("SeriesVolume")
require.Len(t, calls, 1) require.Len(t, calls, 1)
@ -296,7 +422,7 @@ func TestSeriesVolumeHandler(t *testing.T) {
"&end=1"+ "&end=1"+
"&step=42"+ "&step=42"+
"&query=%7Bfoo%3D%22bar%22%7D", nil) "&query=%7Bfoo%3D%22bar%22%7D", nil)
makeRequest(api.SeriesVolumeInstantHandler, req) makeRequest(t, api.SeriesVolumeInstantHandler, req)
calls := querier.GetMockedCallsByMethod("SeriesVolume") calls := querier.GetMockedCallsByMethod("SeriesVolume")
require.Len(t, calls, 1) require.Len(t, calls, 1)
@ -315,7 +441,7 @@ func TestSeriesVolumeHandler(t *testing.T) {
"&end=1"+ "&end=1"+
"&step=42"+ "&step=42"+
"&query=%7Bfoo%3D%22bar%22%7D", nil) "&query=%7Bfoo%3D%22bar%22%7D", nil)
makeRequest(api.SeriesVolumeRangeHandler, req) makeRequest(t, api.SeriesVolumeRangeHandler, req)
calls := querier.GetMockedCallsByMethod("SeriesVolume") calls := querier.GetMockedCallsByMethod("SeriesVolume")
require.Len(t, calls, 1) require.Len(t, calls, 1)
@ -333,7 +459,7 @@ func TestSeriesVolumeHandler(t *testing.T) {
"?start=0"+ "?start=0"+
"&end=1"+ "&end=1"+
"&query=%7Bfoo%3D%22bar%22%7D", nil) "&query=%7Bfoo%3D%22bar%22%7D", nil)
makeRequest(api.SeriesVolumeRangeHandler, req) makeRequest(t, api.SeriesVolumeRangeHandler, req)
calls := querier.GetMockedCallsByMethod("SeriesVolume") calls := querier.GetMockedCallsByMethod("SeriesVolume")
require.Len(t, calls, 1) require.Len(t, calls, 1)
@ -342,3 +468,119 @@ func TestSeriesVolumeHandler(t *testing.T) {
require.Equal(t, time.Second.Milliseconds(), request.Step) require.Equal(t, time.Second.Milliseconds(), request.Step)
}) })
} }
func TestResponseFormat(t *testing.T) {
for _, tc := range []struct {
url string
accept string
handler func(api *QuerierAPI) http.HandlerFunc
result logqlmodel.Result
expectedRespone string
}{
{
url: "/api/prom/query",
handler: func(api *QuerierAPI) http.HandlerFunc {
return api.LogQueryHandler
},
result: logqlmodel.Result{
Data: logqlmodel.Streams{
logproto.Stream{
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 123456789012345),
Line: "super line",
},
},
Labels: `{foo="bar"}`,
},
},
Statistics: statsResult,
},
expectedRespone: `{
` + statsResultString + `,
"streams": [
{
"labels": "{foo=\"bar\"}",
"entries": [
{
"line": "super line",
"ts": "1970-01-02T10:17:36.789012345Z"
}
]
}
]
}`,
},
{
url: "/loki/api/v1/query_range",
handler: func(api *QuerierAPI) http.HandlerFunc {
return api.RangeQueryHandler
},
result: logqlmodel.Result{
Data: logqlmodel.Streams{
logproto.Stream{
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 123456789012345),
Line: "super line",
},
},
Labels: `{foo="bar"}`,
},
},
Statistics: statsResult,
},
expectedRespone: `{
"status": "success",
"data": {
"resultType": "streams",
` + statsResultString + `,
"result": [{
"stream": {"foo": "bar"},
"values": [
["123456789012345", "super line"]
]
}]
}
}`,
},
} {
t.Run(fmt.Sprintf("%s returns the expected format", tc.url), func(t *testing.T) {
engine := newEngineMock()
engine.On("Query", mock.Anything, mock.Anything).Return(queryMock{tc.result})
api := setupAPIWithEngine(engine)
req := httptest.NewRequest(http.MethodGet, tc.url+
"?start=0"+
"&end=1"+
"&query=%7Bfoo%3D%22bar%22%7D", nil)
req = req.WithContext(user.InjectOrgID(context.Background(), "1"))
w := makeRequest(t, tc.handler(api), req)
require.Equalf(t, http.StatusOK, w.Code, "unexpected response: %s", w.Body.String())
require.JSONEq(t, tc.expectedRespone, w.Body.String())
})
}
}
func makeRequest(t *testing.T, handler http.HandlerFunc, req *http.Request) *httptest.ResponseRecorder {
err := req.ParseForm()
require.NoError(t, err)
w := httptest.NewRecorder()
handler(w, req)
return w
}
func setupAPI(querier *querierMock) *QuerierAPI {
api := NewQuerierAPI(Config{}, querier, nil, log.NewNopLogger())
return api
}
func setupAPIWithEngine(engine *engineMock) *QuerierAPI {
limits, _ := validation.NewOverrides(validation.Limits{}, mockTenantLimits{})
api := NewQuerierAPI(Config{}, nil, limits, log.NewNopLogger())
api.engine = engine
return api
}

@ -23,11 +23,13 @@ import (
"github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/fetcher" "github.com/grafana/loki/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/index/stats" "github.com/grafana/loki/pkg/storage/stores/index/stats"
"github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/validation"
) )
// querierClientMock is a mockable version of QuerierClient, used in querier // querierClientMock is a mockable version of QuerierClient, used in querier
@ -547,3 +549,39 @@ func (q *querierMock) SeriesVolume(ctx context.Context, req *logproto.VolumeRequ
return resp.(*logproto.VolumeResponse), err return resp.(*logproto.VolumeResponse), err
} }
type engineMock struct {
util.ExtendedMock
}
func newEngineMock() *engineMock {
return &engineMock{}
}
func (e *engineMock) Query(p logql.Params) logql.Query {
args := e.Called(p)
return args.Get(0).(logql.Query)
}
type queryMock struct {
result logqlmodel.Result
}
func (q queryMock) Exec(_ context.Context) (logqlmodel.Result, error) {
return q.result, nil
}
type mockTenantLimits map[string]*validation.Limits
func (tl mockTenantLimits) TenantLimits(userID string) *validation.Limits {
limits, ok := tl[userID]
if !ok {
return &validation.Limits{}
}
return limits
}
func (tl mockTenantLimits) AllByUserID() map[string]*validation.Limits {
return tl
}

Loading…
Cancel
Save