Loki: backend: use custom http code (#44643)

* Loki: backend: use custom http code

* simplfied comment

Co-authored-by: Ivana Huckova <30407135+ivanahuckova@users.noreply.github.com>

Co-authored-by: Ivana Huckova <30407135+ivanahuckova@users.noreply.github.com>
pull/45608/head
Gábor Farkas 4 years ago committed by GitHub
parent 380e07ba29
commit 5d704fd46e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 134
      pkg/tsdb/loki/api.go
  2. 43
      pkg/tsdb/loki/framing_test.go
  3. 57
      pkg/tsdb/loki/loki.go
  4. 3
      pkg/tsdb/loki/loki_bench_test.go

@ -0,0 +1,134 @@
package loki
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/loki/pkg/loghttp"
jsoniter "github.com/json-iterator/go"
)
type LokiAPI struct {
client *http.Client
url string
log log.Logger
}
func newLokiAPI(client *http.Client, url string, log log.Logger) *LokiAPI {
return &LokiAPI{client: client, url: url, log: log}
}
func makeRequest(ctx context.Context, lokiDsUrl string, query lokiQuery) (*http.Request, error) {
qs := url.Values{}
qs.Set("query", query.Expr)
qs.Set("step", query.Step.String())
qs.Set("start", strconv.FormatInt(query.Start.UnixNano(), 10))
qs.Set("end", strconv.FormatInt(query.End.UnixNano(), 10))
lokiUrl, err := url.Parse(lokiDsUrl)
if err != nil {
return nil, err
}
lokiUrl.Path = "/loki/api/v1/query_range"
lokiUrl.RawQuery = qs.Encode()
req, err := http.NewRequestWithContext(ctx, "GET", lokiUrl.String(), nil)
if err != nil {
return nil, err
}
// NOTE:
// 1. we are missing "dynamic" http params, like OAuth data.
// this never worked before (and it is not needed for alerting scenarios),
// so it is not a regression.
// twe need to have that when we migrate to backend-queries.
//
// 2. we will have to send a custom http header based on the VolumeQuery prop
// (again, not needed for the alerting scenario)
// if query.VolumeQuery {
// req.Header.Set("X-Query-Tags", "Source=logvolhist")
// }
return req, nil
}
type lokiError struct {
Message string
}
// we know there is an error,
// based on the http-response-body
// we have to make an informative error-object
func makeLokiError(body io.ReadCloser) error {
var buf bytes.Buffer
_, err := buf.ReadFrom(body)
if err != nil {
return err
}
bytes := buf.Bytes()
// the error-message is probably a JSON structure,
// with a string-field named "message". we want the
// value of that field.
// but, the response might be just a simple string,
// this was used in older Loki versions.
// so our approach is this:
// - we try to convert the bytes to JSON
// - we take the value of the field "message"
// - if any of these steps fail, or if "message" is empty, we return the whole text
var data lokiError
err = json.Unmarshal(bytes, &data)
if err != nil {
// we were unable to convert the bytes to JSON, we return the whole text
return fmt.Errorf("%v", string(bytes))
}
errorMessage := data.Message
if errorMessage == "" {
// we got no usable error message, we return the whole text
return fmt.Errorf("%v", string(bytes))
}
return fmt.Errorf("%v", errorMessage)
}
func (api *LokiAPI) QueryRange(ctx context.Context, query lokiQuery) (*loghttp.QueryResponse, error) {
req, err := makeRequest(ctx, api.url, query)
if err != nil {
return nil, err
}
resp, err := api.client.Do(req)
if err != nil {
return nil, err
}
defer func() {
if err := resp.Body.Close(); err != nil {
api.log.Warn("Failed to close response body", "err", err)
}
}()
if resp.StatusCode/100 != 2 {
return nil, makeLokiError(resp.Body)
}
var response loghttp.QueryResponse
err = jsoniter.NewDecoder(resp.Body).Decode(&response)
if err != nil {
return nil, err
}
return &response, nil
}

@ -2,6 +2,7 @@ package loki
import (
"bytes"
"context"
"io/ioutil"
"net/http"
"os"
@ -11,7 +12,7 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/experimental"
"github.com/grafana/loki/pkg/logcli/client"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/stretchr/testify/require"
)
@ -42,7 +43,7 @@ func TestSuccessResponse(t *testing.T) {
bytes, err := os.ReadFile(responseFileName)
require.NoError(t, err)
frames, err := runQuery(makeMockedClient(200, "application/json", bytes), &lokiQuery{Expr: "up(ALERTS)", Step: time.Second * 42})
frames, err := runQuery(context.Background(), makeMockedAPI(200, "application/json", bytes), &lokiQuery{Expr: "up(ALERTS)", Step: time.Second * 42})
require.NoError(t, err)
dr := &backend.DataResponse{
@ -60,16 +61,9 @@ func TestErrorResponse(t *testing.T) {
// NOTE: when there is an error-response, it comes with
// HTTP code 400, and the format seems to change between versions:
// 2.3.x: content-type=text/plain, content is plaintext
// 2.4.0: content-type=application/json, content is plaintext !!!
// 2.4.1: same as 2.4.0
// 2.4.2: same as 2.4.0 (2.4.2 is currently the latest)
// 2.4.x: content-type=application/json, content is plaintext: https://github.com/grafana/loki/issues/4844
// main-branch: content-type=application/json, content is JSON
// we should always be able to to return some kind of error message
//
// also, the returned error message is not what we want to return
// to the user, but this is what is currently returned to the user,
// so the tests check for that. we will have to change this in the future.
tt := []struct {
name string
body []byte
@ -85,25 +79,31 @@ func TestErrorResponse(t *testing.T) {
"message": "parse error at line 1, col 8: something is wrong"
}`),
contentType: "application/json; charset=utf-8",
errorMessage: "Run out of attempts while querying the server",
errorMessage: "parse error at line 1, col 8: something is wrong",
},
{
name: "parse a non-json error body with json content type (loki 2.4.0,2.4.1,2.4.2)",
body: []byte("parse error at line 1, col 8: something is wrong"),
contentType: "application/json; charset=UTF-8",
errorMessage: "Run out of attempts while querying the server",
errorMessage: "parse error at line 1, col 8: something is wrong",
},
{
name: "parse an error response in plain text",
body: []byte("parse error at line 1, col 8: something is wrong"),
contentType: "text/plain; charset=utf-8",
errorMessage: "Run out of attempts while querying the server",
errorMessage: "parse error at line 1, col 8: something is wrong",
},
{
name: "parse an error response that is broken JSON",
body: []byte(`{"message":"error message but the JSON is not finished`),
contentType: "text/plain; charset=utf-8",
errorMessage: `{"message":"error message but the JSON is not finished`,
},
}
for _, test := range tt {
t.Run(test.name, func(t *testing.T) {
frames, err := runQuery(makeMockedClient(400, test.contentType, test.body), &lokiQuery{})
frames, err := runQuery(context.Background(), makeMockedAPI(400, test.contentType, test.body), &lokiQuery{})
require.Len(t, frames, 0)
require.Error(t, err)
@ -112,13 +112,13 @@ func TestErrorResponse(t *testing.T) {
}
}
type MockedRoundTripper struct {
type mockedRoundTripper struct {
statusCode int
responseBytes []byte
contentType string
}
func (mockedRT *MockedRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
func (mockedRT *mockedRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
header := http.Header{}
header.Add("Content-Type", mockedRT.contentType)
return &http.Response{
@ -128,13 +128,10 @@ func (mockedRT *MockedRoundTripper) RoundTrip(req *http.Request) (*http.Response
}, nil
}
func makeMockedClient(statusCode int, contentType string, responseBytes []byte) *client.DefaultClient {
client := &client.DefaultClient{
Address: "http://localhost:9999",
Tripperware: func(t http.RoundTripper) http.RoundTripper {
return &MockedRoundTripper{statusCode: statusCode, responseBytes: responseBytes, contentType: contentType}
},
func makeMockedAPI(statusCode int, contentType string, responseBytes []byte) *LokiAPI {
client := http.Client{
Transport: &mockedRoundTripper{statusCode: statusCode, contentType: contentType, responseBytes: responseBytes},
}
return client
return newLokiAPI(&client, "http://localhost:9999", log.New("test"))
}

@ -2,8 +2,6 @@ package loki
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"regexp"
@ -17,12 +15,9 @@ import (
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/loki/pkg/logcli/client"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"go.opentelemetry.io/otel/attribute"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
)
@ -45,12 +40,8 @@ var (
)
type datasourceInfo struct {
HTTPClient *http.Client
URL string
TLSClientConfig *tls.Config
BasicAuthUser string
BasicAuthPassword string
TimeInterval string `json:"timeInterval"`
HTTPClient *http.Client
URL string
}
type QueryModel struct {
@ -74,24 +65,9 @@ func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.Inst
return nil, err
}
tlsClientConfig, err := httpClientProvider.GetTLSConfig(opts)
if err != nil {
return nil, err
}
jsonData := datasourceInfo{}
err = json.Unmarshal(settings.JSONData, &jsonData)
if err != nil {
return nil, fmt.Errorf("error reading settings: %w", err)
}
model := &datasourceInfo{
HTTPClient: client,
URL: settings.URL,
TLSClientConfig: tlsClientConfig,
TimeInterval: jsonData.TimeInterval,
BasicAuthUser: settings.BasicAuthUser,
BasicAuthPassword: settings.DecryptedSecureJSONData["basicAuthPassword"],
HTTPClient: client,
URL: settings.URL,
}
return model, nil
}
@ -105,17 +81,7 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
return result, err
}
client := &client.DefaultClient{
Address: dsInfo.URL,
Username: dsInfo.BasicAuthUser,
Password: dsInfo.BasicAuthPassword,
TLSConfig: config.TLSConfig{
InsecureSkipVerify: dsInfo.TLSClientConfig.InsecureSkipVerify,
},
Tripperware: func(t http.RoundTripper) http.RoundTripper {
return dsInfo.HTTPClient.Transport
},
}
api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, s.plog)
queries, err := parseQuery(req)
if err != nil {
@ -130,7 +96,7 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
span.SetAttributes("stop_unixnano", query.End, attribute.Key("stop_unixnano").Int64(query.End.UnixNano()))
defer span.End()
frames, err := runQuery(client, query)
frames, err := runQuery(ctx, api, query)
queryRes := backend.DataResponse{}
@ -204,15 +170,8 @@ func parseResponse(value *loghttp.QueryResponse, query *lokiQuery) (data.Frames,
}
// we extracted this part of the functionality to make it easy to unit-test it
func runQuery(client *client.DefaultClient, query *lokiQuery) (data.Frames, error) {
// `limit` only applies to log-producing queries, and we
// currently only support metric queries, so this can be set to any value.
limit := 1
// we do not use `interval`, so we set it to zero
interval := time.Duration(0)
value, err := client.QueryRange(query.Expr, limit, query.Start, query.End, logproto.BACKWARD, query.Step, interval, false)
func runQuery(ctx context.Context, api *LokiAPI, query *lokiQuery) (data.Frames, error) {
value, err := api.QueryRange(ctx, *query)
if err != nil {
return data.Frames{}, err
}

@ -1,6 +1,7 @@
package loki
import (
"context"
"fmt"
"math/rand"
"strings"
@ -15,7 +16,7 @@ func BenchmarkMatrixJson(b *testing.B) {
b.ResetTimer()
for n := 0; n < b.N; n++ {
_, _ = runQuery(makeMockedClient(200, "application/json", bytes), &lokiQuery{})
_, _ = runQuery(context.Background(), makeMockedAPI(200, "application/json", bytes), &lokiQuery{})
}
}

Loading…
Cancel
Save