From 6767adb4a61dd364b59c241248455a4b29fa80a3 Mon Sep 17 00:00:00 2001 From: Ivana Huckova <30407135+ivanahuckova@users.noreply.github.com> Date: Mon, 16 Jun 2025 15:52:35 +0200 Subject: [PATCH] Loki: Implement error source (#106766) * Loki: Implement error source * Add tests --- pkg/tsdb/loki/api.go | 27 ++++++------ pkg/tsdb/loki/api_test.go | 83 ++++++++++++++++++++++++++++++++++++ pkg/tsdb/loki/loki.go | 29 +++++++------ pkg/tsdb/loki/parse_query.go | 4 +- pkg/tsdb/loki/step.go | 4 +- 5 files changed, 116 insertions(+), 31 deletions(-) diff --git a/pkg/tsdb/loki/api.go b/pkg/tsdb/loki/api.go index 7d4f629c5ed..89dd9f615ef 100644 --- a/pkg/tsdb/loki/api.go +++ b/pkg/tsdb/loki/api.go @@ -11,7 +11,6 @@ import ( "net/url" "path" "strconv" - "syscall" "time" jsoniter "github.com/json-iterator/go" @@ -60,7 +59,7 @@ func makeDataRequest(ctx context.Context, lokiDsUrl string, query lokiQuery, cat lokiUrl, err := url.Parse(lokiDsUrl) if err != nil { - return nil, err + return nil, backend.DownstreamError(fmt.Errorf("failed to parse Loki URL: %w", err)) } switch query.QueryType { @@ -87,14 +86,14 @@ func makeDataRequest(ctx context.Context, lokiDsUrl string, query lokiQuery, cat lokiUrl.Path = path.Join(lokiUrl.Path, "/loki/api/v1/query") } default: - return nil, fmt.Errorf("invalid QueryType: %v", query.QueryType) + return nil, backend.DownstreamError(fmt.Errorf("invalid QueryType: %v", query.QueryType)) } lokiUrl.RawQuery = qs.Encode() req, err := http.NewRequestWithContext(ctx, "GET", lokiUrl.String(), nil) if err != nil { - return nil, err + return nil, backend.DownstreamError(fmt.Errorf("failed to create request: %w", err)) } if query.SupportingQueryType != SupportingQueryNone { @@ -183,13 +182,11 @@ func (api *LokiAPI) DataQuery(ctx context.Context, query lokiQuery, responseOpts if resp != nil { lp = append(lp, "statusCode", resp.StatusCode) } - api.log.Error("Error received from Loki", lp...) - res := backend.DataResponse{ - Error: err, - } - if errors.Is(err, syscall.ECONNREFUSED) { - res.ErrorSource = backend.ErrorSourceDownstream + api.log.Debug("Error received from Loki", lp...) + if backend.IsDownstreamHTTPError(err) { + err = backend.DownstreamError(err) } + res := backend.ErrorResponseWithErrorSource(err) return &res, nil } @@ -208,7 +205,7 @@ func (api *LokiAPI) DataQuery(ctx context.Context, query lokiQuery, responseOpts ErrorSource: backend.ErrorSourceFromHTTPStatus(resp.StatusCode), } lp = append(lp, "status", "error", "error", err, "statusSource", res.ErrorSource) - api.log.Error("Error received from Loki", lp...) + api.log.Debug("Error received from Loki", lp...) return &res, nil } else { lp = append(lp, "status", "ok") @@ -226,7 +223,7 @@ func (api *LokiAPI) DataQuery(ctx context.Context, query lokiQuery, responseOpts span.RecordError(res.Error) span.SetStatus(codes.Error, res.Error.Error()) instrumentation.UpdatePluginParsingResponseDurationSeconds(ctx, time.Since(start), "error") - api.log.Error("Error parsing response from loki", "error", res.Error, "duration", time.Since(start), "stage", stageParseResponse) + api.log.Debug("Error parsing response from loki", "error", res.Error, "duration", time.Since(start), "stage", stageParseResponse) return nil, res.Error } instrumentation.UpdatePluginParsingResponseDurationSeconds(ctx, time.Since(start), "ok") @@ -265,7 +262,7 @@ func (api *LokiAPI) RawQuery(ctx context.Context, resourcePath string) (RawLokiR api.log.Debug("Sending raw query to loki", "resourcePath", resourcePath) req, err := makeRawRequest(ctx, api.url, resourcePath) if err != nil { - api.log.Error("Failed to prepare request to loki", "error", err, "resourcePath", resourcePath) + api.log.Debug("Failed to prepare request to loki", "error", err, "resourcePath", resourcePath) return RawLokiResponse{}, err } start := time.Now() @@ -279,7 +276,7 @@ func (api *LokiAPI) RawQuery(ctx context.Context, resourcePath string) (RawLokiR if resp != nil { lp = append(lp, "statusCode", resp.StatusCode) } - api.log.Error("Error received from Loki", lp...) + api.log.Debug("Error received from Loki", lp...) return RawLokiResponse{}, err } @@ -298,7 +295,7 @@ func (api *LokiAPI) RawQuery(ctx context.Context, resourcePath string) (RawLokiR body, err := io.ReadAll(resp.Body) if err != nil { - api.log.Error("Error reading response body bytes", "error", err) + api.log.Debug("Error reading response body bytes", "error", err) return RawLokiResponse{}, err } diff --git a/pkg/tsdb/loki/api_test.go b/pkg/tsdb/loki/api_test.go index d7acd1c791d..d48bed634b1 100644 --- a/pkg/tsdb/loki/api_test.go +++ b/pkg/tsdb/loki/api_test.go @@ -7,6 +7,7 @@ import ( "strings" "testing" + "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana/pkg/tsdb/loki/kinds/dataquery" "github.com/stretchr/testify/require" ) @@ -280,3 +281,85 @@ func TestApiReturnValues(t *testing.T) { require.ErrorContains(t, err, "foo") }) } + +func TestErrorSources(t *testing.T) { + errorResponse := []byte(`{"message": "test error"}`) + + t.Run("should set correct error source for downstream errors", func(t *testing.T) { + called := false + api := makeMockedAPI(400, "application/json", errorResponse, func(req *http.Request) { + called = true + }, false) + + res, err := api.DataQuery(context.Background(), lokiQuery{QueryType: QueryTypeRange}, ResponseOpts{}) + require.NoError(t, err) + require.True(t, called) + require.NotNil(t, res.Error) + require.Equal(t, backend.ErrorSourceDownstream, res.ErrorSource) + }) + + t.Run("should set correct error source for plugin errors", func(t *testing.T) { + called := false + api := makeMockedAPI(406, "application/json", errorResponse, func(req *http.Request) { + called = true + }, false) + + res, err := api.DataQuery(context.Background(), lokiQuery{QueryType: QueryTypeRange}, ResponseOpts{}) + require.NoError(t, err) + require.True(t, called) + require.NotNil(t, res.Error) + require.Equal(t, backend.ErrorSourcePlugin, res.ErrorSource) + }) + + t.Run("should set correct error source for server errors", func(t *testing.T) { + called := false + api := makeMockedAPI(500, "application/json", errorResponse, func(req *http.Request) { + called = true + }, false) + + res, err := api.DataQuery(context.Background(), lokiQuery{QueryType: QueryTypeRange}, ResponseOpts{}) + require.NoError(t, err) + require.True(t, called) + require.NotNil(t, res.Error) + require.Equal(t, backend.ErrorSourceDownstream, res.ErrorSource) + }) + + t.Run("should handle downstream HTTP errors", func(t *testing.T) { + called := false + api := makeMockedAPI(400, "application/json", errorResponse, func(req *http.Request) { + called = true + }, false) + + res, err := api.DataQuery(context.Background(), lokiQuery{QueryType: QueryTypeRange}, ResponseOpts{}) + require.NoError(t, err) + require.True(t, called) + require.NotNil(t, res.Error) + require.Equal(t, backend.ErrorSourceDownstream, res.ErrorSource) + require.Contains(t, res.Error.Error(), "test error") + }) + + t.Run("should handle client errors in RawQuery", func(t *testing.T) { + called := false + api := makeMockedAPI(400, "application/json", errorResponse, func(req *http.Request) { + called = true + }, false) + + res, err := api.RawQuery(context.Background(), "/loki/api/v1/labels") + require.NoError(t, err) + require.True(t, called) + require.Equal(t, 400, res.Status) + require.Contains(t, string(res.Body), "test error") + }) + + t.Run("should handle server errors in RawQuery", func(t *testing.T) { + called := false + api := makeMockedAPI(500, "application/json", errorResponse, func(req *http.Request) { + called = true + }, false) + + _, err := api.RawQuery(context.Background(), "/loki/api/v1/labels") + require.Error(t, err) + require.True(t, called) + require.Contains(t, err.Error(), "test error") + }) +} diff --git a/pkg/tsdb/loki/loki.go b/pkg/tsdb/loki/loki.go index 4b8563e5405..c3b56578153 100644 --- a/pkg/tsdb/loki/loki.go +++ b/pkg/tsdb/loki/loki.go @@ -84,20 +84,23 @@ type ResponseOpts struct { func parseQueryModel(raw json.RawMessage) (*QueryJSONModel, error) { model := &QueryJSONModel{} err := json.Unmarshal(raw, model) - return model, err + if err != nil { + return nil, backend.DownstreamError(fmt.Errorf("failed to parse query model: %w", err)) + } + return model, nil } func newInstanceSettings(httpClientProvider *httpclient.Provider) datasource.InstanceFactoryFunc { return func(ctx context.Context, settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { opts, err := settings.HTTPClientOptions(ctx) if err != nil { - return nil, err + return nil, backend.DownstreamError(fmt.Errorf("error reading settings: %w", err)) } opts.ForwardHTTPHeaders = true client, err := httpClientProvider.New(opts) if err != nil { - return nil, err + return nil, backend.DownstreamError(fmt.Errorf("error creating http client: %w", err)) } model := &datasourceInfo{ @@ -173,9 +176,8 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) _, fromAlert := req.Headers[ngalertmodels.FromAlertHeaderName] logger := s.logger.FromContext(ctx).With("fromAlert", fromAlert) if err != nil { - logger.Error("Failed to get data source info", "err", err) - result := backend.NewQueryDataResponse() - return result, err + logger.Debug("Failed to get data source info", "err", err) + return nil, err } responseOpts := ResponseOpts{ @@ -214,11 +216,11 @@ func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *datas start := time.Now() queries, err := parseQuery(req, logQLScopes) if err != nil { - plog.Error("Failed to prepare request to Loki", "error", err, "duration", time.Since(start), "queriesLength", len(queries), "stage", stagePrepareRequest) + plog.Debug("Failed to prepare request to Loki", "error", err, "duration", time.Since(start), "queriesLength", len(queries), "stage", stagePrepareRequest) return result, err } - plog.Info("Prepared request to Loki", "duration", time.Since(start), "queriesLength", len(queries), "stage", stagePrepareRequest, "runInParallel", runInParallel) + plog.Debug("Prepared request to Loki", "duration", time.Since(start), "queriesLength", len(queries), "stage", stagePrepareRequest, "runInParallel", runInParallel) ctx, span := tracer.Start(ctx, "datasource.loki.queryData.runQueries", trace.WithAttributes( attribute.Bool("runInParallel", runInParallel), @@ -274,7 +276,8 @@ func executeQuery(ctx context.Context, query *lokiQuery, req *backend.QueryDataR if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) - queryRes.Error = err + errResp := backend.ErrorResponseWithErrorSource(err) + queryRes = &errResp } return *queryRes @@ -284,7 +287,7 @@ func executeQuery(ctx context.Context, query *lokiQuery, req *backend.QueryDataR func runQuery(ctx context.Context, api *LokiAPI, query *lokiQuery, responseOpts ResponseOpts, plog log.Logger) (*backend.DataResponse, error) { res, err := api.DataQuery(ctx, *query, responseOpts) if err != nil { - plog.Error("Error querying loki", "error", err) + plog.Debug("Error querying loki", "error", err) return res, err } @@ -296,7 +299,7 @@ func runQuery(ctx context.Context, api *LokiAPI, query *lokiQuery, responseOpts err = adjustFrame(frame, query, false, responseOpts.logsDataplane) if err != nil { - plog.Error("Error adjusting frame", "error", err) + plog.Debug("Error adjusting frame", "error", err) return res, err } } @@ -307,12 +310,12 @@ func runQuery(ctx context.Context, api *LokiAPI, query *lokiQuery, responseOpts func (s *Service) getDSInfo(ctx context.Context, pluginCtx backend.PluginContext) (*datasourceInfo, error) { i, err := s.im.Get(ctx, pluginCtx) if err != nil { - return nil, err + return nil, backend.DownstreamError(fmt.Errorf("failed to get data source info: %w", err)) } instance, ok := i.(*datasourceInfo) if !ok { - return nil, fmt.Errorf("failed to cast data source info") + return nil, backend.DownstreamError(fmt.Errorf("failed to cast data source info")) } return instance, nil diff --git a/pkg/tsdb/loki/parse_query.go b/pkg/tsdb/loki/parse_query.go index ce45715eb29..06cf90cb339 100644 --- a/pkg/tsdb/loki/parse_query.go +++ b/pkg/tsdb/loki/parse_query.go @@ -77,7 +77,7 @@ func parseQueryType(jsonPointerValue *string) (QueryType, error) { case "range": return QueryTypeRange, nil default: - return QueryTypeRange, fmt.Errorf("invalid queryType: %s", jsonValue) + return QueryTypeRange, backend.DownstreamError(fmt.Errorf("invalid queryType: %s", jsonValue)) } } } @@ -97,7 +97,7 @@ func parseDirection(jsonPointerValue *string) (Direction, error) { case "scan": return DirectionBackward, nil default: - return DirectionBackward, fmt.Errorf("invalid queryDirection: %s", jsonValue) + return DirectionBackward, backend.DownstreamError(fmt.Errorf("invalid queryDirection: %s", jsonValue)) } } } diff --git a/pkg/tsdb/loki/step.go b/pkg/tsdb/loki/step.go index 4023abfe1db..54c12b2aad2 100644 --- a/pkg/tsdb/loki/step.go +++ b/pkg/tsdb/loki/step.go @@ -1,9 +1,11 @@ package loki import ( + "fmt" "math" "time" + "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/gtime" ) @@ -33,7 +35,7 @@ func calculateStep(interval time.Duration, timeRange time.Duration, resolution i step, err := gtime.ParseIntervalStringToTimeDuration(*queryStep) if err != nil { - return step, err + return step, backend.DownstreamError(fmt.Errorf("failed to parse query step: %w", err)) } return time.Duration(step.Nanoseconds() * resolution), nil