Loki: Implement error source (#106766)

* Loki: Implement error source

* Add tests
pull/106782/head
Ivana Huckova 1 month ago committed by GitHub
parent dbe815ee68
commit 6767adb4a6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 27
      pkg/tsdb/loki/api.go
  2. 83
      pkg/tsdb/loki/api_test.go
  3. 29
      pkg/tsdb/loki/loki.go
  4. 4
      pkg/tsdb/loki/parse_query.go
  5. 4
      pkg/tsdb/loki/step.go

@ -11,7 +11,6 @@ import (
"net/url" "net/url"
"path" "path"
"strconv" "strconv"
"syscall"
"time" "time"
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
@ -60,7 +59,7 @@ func makeDataRequest(ctx context.Context, lokiDsUrl string, query lokiQuery, cat
lokiUrl, err := url.Parse(lokiDsUrl) lokiUrl, err := url.Parse(lokiDsUrl)
if err != nil { if err != nil {
return nil, err return nil, backend.DownstreamError(fmt.Errorf("failed to parse Loki URL: %w", err))
} }
switch query.QueryType { switch query.QueryType {
@ -87,14 +86,14 @@ func makeDataRequest(ctx context.Context, lokiDsUrl string, query lokiQuery, cat
lokiUrl.Path = path.Join(lokiUrl.Path, "/loki/api/v1/query") lokiUrl.Path = path.Join(lokiUrl.Path, "/loki/api/v1/query")
} }
default: default:
return nil, fmt.Errorf("invalid QueryType: %v", query.QueryType) return nil, backend.DownstreamError(fmt.Errorf("invalid QueryType: %v", query.QueryType))
} }
lokiUrl.RawQuery = qs.Encode() lokiUrl.RawQuery = qs.Encode()
req, err := http.NewRequestWithContext(ctx, "GET", lokiUrl.String(), nil) req, err := http.NewRequestWithContext(ctx, "GET", lokiUrl.String(), nil)
if err != nil { if err != nil {
return nil, err return nil, backend.DownstreamError(fmt.Errorf("failed to create request: %w", err))
} }
if query.SupportingQueryType != SupportingQueryNone { if query.SupportingQueryType != SupportingQueryNone {
@ -183,13 +182,11 @@ func (api *LokiAPI) DataQuery(ctx context.Context, query lokiQuery, responseOpts
if resp != nil { if resp != nil {
lp = append(lp, "statusCode", resp.StatusCode) lp = append(lp, "statusCode", resp.StatusCode)
} }
api.log.Error("Error received from Loki", lp...) api.log.Debug("Error received from Loki", lp...)
res := backend.DataResponse{ if backend.IsDownstreamHTTPError(err) {
Error: err, err = backend.DownstreamError(err)
}
if errors.Is(err, syscall.ECONNREFUSED) {
res.ErrorSource = backend.ErrorSourceDownstream
} }
res := backend.ErrorResponseWithErrorSource(err)
return &res, nil return &res, nil
} }
@ -208,7 +205,7 @@ func (api *LokiAPI) DataQuery(ctx context.Context, query lokiQuery, responseOpts
ErrorSource: backend.ErrorSourceFromHTTPStatus(resp.StatusCode), ErrorSource: backend.ErrorSourceFromHTTPStatus(resp.StatusCode),
} }
lp = append(lp, "status", "error", "error", err, "statusSource", res.ErrorSource) lp = append(lp, "status", "error", "error", err, "statusSource", res.ErrorSource)
api.log.Error("Error received from Loki", lp...) api.log.Debug("Error received from Loki", lp...)
return &res, nil return &res, nil
} else { } else {
lp = append(lp, "status", "ok") lp = append(lp, "status", "ok")
@ -226,7 +223,7 @@ func (api *LokiAPI) DataQuery(ctx context.Context, query lokiQuery, responseOpts
span.RecordError(res.Error) span.RecordError(res.Error)
span.SetStatus(codes.Error, res.Error.Error()) span.SetStatus(codes.Error, res.Error.Error())
instrumentation.UpdatePluginParsingResponseDurationSeconds(ctx, time.Since(start), "error") instrumentation.UpdatePluginParsingResponseDurationSeconds(ctx, time.Since(start), "error")
api.log.Error("Error parsing response from loki", "error", res.Error, "duration", time.Since(start), "stage", stageParseResponse) api.log.Debug("Error parsing response from loki", "error", res.Error, "duration", time.Since(start), "stage", stageParseResponse)
return nil, res.Error return nil, res.Error
} }
instrumentation.UpdatePluginParsingResponseDurationSeconds(ctx, time.Since(start), "ok") instrumentation.UpdatePluginParsingResponseDurationSeconds(ctx, time.Since(start), "ok")
@ -265,7 +262,7 @@ func (api *LokiAPI) RawQuery(ctx context.Context, resourcePath string) (RawLokiR
api.log.Debug("Sending raw query to loki", "resourcePath", resourcePath) api.log.Debug("Sending raw query to loki", "resourcePath", resourcePath)
req, err := makeRawRequest(ctx, api.url, resourcePath) req, err := makeRawRequest(ctx, api.url, resourcePath)
if err != nil { if err != nil {
api.log.Error("Failed to prepare request to loki", "error", err, "resourcePath", resourcePath) api.log.Debug("Failed to prepare request to loki", "error", err, "resourcePath", resourcePath)
return RawLokiResponse{}, err return RawLokiResponse{}, err
} }
start := time.Now() start := time.Now()
@ -279,7 +276,7 @@ func (api *LokiAPI) RawQuery(ctx context.Context, resourcePath string) (RawLokiR
if resp != nil { if resp != nil {
lp = append(lp, "statusCode", resp.StatusCode) lp = append(lp, "statusCode", resp.StatusCode)
} }
api.log.Error("Error received from Loki", lp...) api.log.Debug("Error received from Loki", lp...)
return RawLokiResponse{}, err return RawLokiResponse{}, err
} }
@ -298,7 +295,7 @@ func (api *LokiAPI) RawQuery(ctx context.Context, resourcePath string) (RawLokiR
body, err := io.ReadAll(resp.Body) body, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
api.log.Error("Error reading response body bytes", "error", err) api.log.Debug("Error reading response body bytes", "error", err)
return RawLokiResponse{}, err return RawLokiResponse{}, err
} }

@ -7,6 +7,7 @@ import (
"strings" "strings"
"testing" "testing"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/tsdb/loki/kinds/dataquery" "github.com/grafana/grafana/pkg/tsdb/loki/kinds/dataquery"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -280,3 +281,85 @@ func TestApiReturnValues(t *testing.T) {
require.ErrorContains(t, err, "foo") require.ErrorContains(t, err, "foo")
}) })
} }
func TestErrorSources(t *testing.T) {
errorResponse := []byte(`{"message": "test error"}`)
t.Run("should set correct error source for downstream errors", func(t *testing.T) {
called := false
api := makeMockedAPI(400, "application/json", errorResponse, func(req *http.Request) {
called = true
}, false)
res, err := api.DataQuery(context.Background(), lokiQuery{QueryType: QueryTypeRange}, ResponseOpts{})
require.NoError(t, err)
require.True(t, called)
require.NotNil(t, res.Error)
require.Equal(t, backend.ErrorSourceDownstream, res.ErrorSource)
})
t.Run("should set correct error source for plugin errors", func(t *testing.T) {
called := false
api := makeMockedAPI(406, "application/json", errorResponse, func(req *http.Request) {
called = true
}, false)
res, err := api.DataQuery(context.Background(), lokiQuery{QueryType: QueryTypeRange}, ResponseOpts{})
require.NoError(t, err)
require.True(t, called)
require.NotNil(t, res.Error)
require.Equal(t, backend.ErrorSourcePlugin, res.ErrorSource)
})
t.Run("should set correct error source for server errors", func(t *testing.T) {
called := false
api := makeMockedAPI(500, "application/json", errorResponse, func(req *http.Request) {
called = true
}, false)
res, err := api.DataQuery(context.Background(), lokiQuery{QueryType: QueryTypeRange}, ResponseOpts{})
require.NoError(t, err)
require.True(t, called)
require.NotNil(t, res.Error)
require.Equal(t, backend.ErrorSourceDownstream, res.ErrorSource)
})
t.Run("should handle downstream HTTP errors", func(t *testing.T) {
called := false
api := makeMockedAPI(400, "application/json", errorResponse, func(req *http.Request) {
called = true
}, false)
res, err := api.DataQuery(context.Background(), lokiQuery{QueryType: QueryTypeRange}, ResponseOpts{})
require.NoError(t, err)
require.True(t, called)
require.NotNil(t, res.Error)
require.Equal(t, backend.ErrorSourceDownstream, res.ErrorSource)
require.Contains(t, res.Error.Error(), "test error")
})
t.Run("should handle client errors in RawQuery", func(t *testing.T) {
called := false
api := makeMockedAPI(400, "application/json", errorResponse, func(req *http.Request) {
called = true
}, false)
res, err := api.RawQuery(context.Background(), "/loki/api/v1/labels")
require.NoError(t, err)
require.True(t, called)
require.Equal(t, 400, res.Status)
require.Contains(t, string(res.Body), "test error")
})
t.Run("should handle server errors in RawQuery", func(t *testing.T) {
called := false
api := makeMockedAPI(500, "application/json", errorResponse, func(req *http.Request) {
called = true
}, false)
_, err := api.RawQuery(context.Background(), "/loki/api/v1/labels")
require.Error(t, err)
require.True(t, called)
require.Contains(t, err.Error(), "test error")
})
}

@ -84,20 +84,23 @@ type ResponseOpts struct {
func parseQueryModel(raw json.RawMessage) (*QueryJSONModel, error) { func parseQueryModel(raw json.RawMessage) (*QueryJSONModel, error) {
model := &QueryJSONModel{} model := &QueryJSONModel{}
err := json.Unmarshal(raw, model) err := json.Unmarshal(raw, model)
return model, err if err != nil {
return nil, backend.DownstreamError(fmt.Errorf("failed to parse query model: %w", err))
}
return model, nil
} }
func newInstanceSettings(httpClientProvider *httpclient.Provider) datasource.InstanceFactoryFunc { func newInstanceSettings(httpClientProvider *httpclient.Provider) datasource.InstanceFactoryFunc {
return func(ctx context.Context, settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { return func(ctx context.Context, settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
opts, err := settings.HTTPClientOptions(ctx) opts, err := settings.HTTPClientOptions(ctx)
if err != nil { if err != nil {
return nil, err return nil, backend.DownstreamError(fmt.Errorf("error reading settings: %w", err))
} }
opts.ForwardHTTPHeaders = true opts.ForwardHTTPHeaders = true
client, err := httpClientProvider.New(opts) client, err := httpClientProvider.New(opts)
if err != nil { if err != nil {
return nil, err return nil, backend.DownstreamError(fmt.Errorf("error creating http client: %w", err))
} }
model := &datasourceInfo{ model := &datasourceInfo{
@ -173,9 +176,8 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
_, fromAlert := req.Headers[ngalertmodels.FromAlertHeaderName] _, fromAlert := req.Headers[ngalertmodels.FromAlertHeaderName]
logger := s.logger.FromContext(ctx).With("fromAlert", fromAlert) logger := s.logger.FromContext(ctx).With("fromAlert", fromAlert)
if err != nil { if err != nil {
logger.Error("Failed to get data source info", "err", err) logger.Debug("Failed to get data source info", "err", err)
result := backend.NewQueryDataResponse() return nil, err
return result, err
} }
responseOpts := ResponseOpts{ responseOpts := ResponseOpts{
@ -214,11 +216,11 @@ func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *datas
start := time.Now() start := time.Now()
queries, err := parseQuery(req, logQLScopes) queries, err := parseQuery(req, logQLScopes)
if err != nil { if err != nil {
plog.Error("Failed to prepare request to Loki", "error", err, "duration", time.Since(start), "queriesLength", len(queries), "stage", stagePrepareRequest) plog.Debug("Failed to prepare request to Loki", "error", err, "duration", time.Since(start), "queriesLength", len(queries), "stage", stagePrepareRequest)
return result, err return result, err
} }
plog.Info("Prepared request to Loki", "duration", time.Since(start), "queriesLength", len(queries), "stage", stagePrepareRequest, "runInParallel", runInParallel) plog.Debug("Prepared request to Loki", "duration", time.Since(start), "queriesLength", len(queries), "stage", stagePrepareRequest, "runInParallel", runInParallel)
ctx, span := tracer.Start(ctx, "datasource.loki.queryData.runQueries", trace.WithAttributes( ctx, span := tracer.Start(ctx, "datasource.loki.queryData.runQueries", trace.WithAttributes(
attribute.Bool("runInParallel", runInParallel), attribute.Bool("runInParallel", runInParallel),
@ -274,7 +276,8 @@ func executeQuery(ctx context.Context, query *lokiQuery, req *backend.QueryDataR
if err != nil { if err != nil {
span.RecordError(err) span.RecordError(err)
span.SetStatus(codes.Error, err.Error()) span.SetStatus(codes.Error, err.Error())
queryRes.Error = err errResp := backend.ErrorResponseWithErrorSource(err)
queryRes = &errResp
} }
return *queryRes return *queryRes
@ -284,7 +287,7 @@ func executeQuery(ctx context.Context, query *lokiQuery, req *backend.QueryDataR
func runQuery(ctx context.Context, api *LokiAPI, query *lokiQuery, responseOpts ResponseOpts, plog log.Logger) (*backend.DataResponse, error) { func runQuery(ctx context.Context, api *LokiAPI, query *lokiQuery, responseOpts ResponseOpts, plog log.Logger) (*backend.DataResponse, error) {
res, err := api.DataQuery(ctx, *query, responseOpts) res, err := api.DataQuery(ctx, *query, responseOpts)
if err != nil { if err != nil {
plog.Error("Error querying loki", "error", err) plog.Debug("Error querying loki", "error", err)
return res, err return res, err
} }
@ -296,7 +299,7 @@ func runQuery(ctx context.Context, api *LokiAPI, query *lokiQuery, responseOpts
err = adjustFrame(frame, query, false, responseOpts.logsDataplane) err = adjustFrame(frame, query, false, responseOpts.logsDataplane)
if err != nil { if err != nil {
plog.Error("Error adjusting frame", "error", err) plog.Debug("Error adjusting frame", "error", err)
return res, err return res, err
} }
} }
@ -307,12 +310,12 @@ func runQuery(ctx context.Context, api *LokiAPI, query *lokiQuery, responseOpts
func (s *Service) getDSInfo(ctx context.Context, pluginCtx backend.PluginContext) (*datasourceInfo, error) { func (s *Service) getDSInfo(ctx context.Context, pluginCtx backend.PluginContext) (*datasourceInfo, error) {
i, err := s.im.Get(ctx, pluginCtx) i, err := s.im.Get(ctx, pluginCtx)
if err != nil { if err != nil {
return nil, err return nil, backend.DownstreamError(fmt.Errorf("failed to get data source info: %w", err))
} }
instance, ok := i.(*datasourceInfo) instance, ok := i.(*datasourceInfo)
if !ok { if !ok {
return nil, fmt.Errorf("failed to cast data source info") return nil, backend.DownstreamError(fmt.Errorf("failed to cast data source info"))
} }
return instance, nil return instance, nil

@ -77,7 +77,7 @@ func parseQueryType(jsonPointerValue *string) (QueryType, error) {
case "range": case "range":
return QueryTypeRange, nil return QueryTypeRange, nil
default: default:
return QueryTypeRange, fmt.Errorf("invalid queryType: %s", jsonValue) return QueryTypeRange, backend.DownstreamError(fmt.Errorf("invalid queryType: %s", jsonValue))
} }
} }
} }
@ -97,7 +97,7 @@ func parseDirection(jsonPointerValue *string) (Direction, error) {
case "scan": case "scan":
return DirectionBackward, nil return DirectionBackward, nil
default: default:
return DirectionBackward, fmt.Errorf("invalid queryDirection: %s", jsonValue) return DirectionBackward, backend.DownstreamError(fmt.Errorf("invalid queryDirection: %s", jsonValue))
} }
} }
} }

@ -1,9 +1,11 @@
package loki package loki
import ( import (
"fmt"
"math" "math"
"time" "time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/gtime" "github.com/grafana/grafana-plugin-sdk-go/backend/gtime"
) )
@ -33,7 +35,7 @@ func calculateStep(interval time.Duration, timeRange time.Duration, resolution i
step, err := gtime.ParseIntervalStringToTimeDuration(*queryStep) step, err := gtime.ParseIntervalStringToTimeDuration(*queryStep)
if err != nil { if err != nil {
return step, err return step, backend.DownstreamError(fmt.Errorf("failed to parse query step: %w", err))
} }
return time.Duration(step.Nanoseconds() * resolution), nil return time.Duration(step.Nanoseconds() * resolution), nil

Loading…
Cancel
Save