@ -1,6 +1,7 @@
package elasticsearch
import (
"context"
"encoding/json"
"errors"
"fmt"
@ -14,6 +15,7 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/log"
es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
)
@ -39,7 +41,8 @@ const (
var searchWordsRegex = regexp . MustCompile ( regexp . QuoteMeta ( es . HighlightPreTagsString ) + ` (.*?) ` + regexp . QuoteMeta ( es . HighlightPostTagsString ) )
func parseResponse ( responses [ ] * es . SearchResponse , targets [ ] * Query , configuredFields es . ConfiguredFields ) ( * backend . QueryDataResponse , error ) {
func parseResponse ( ctx context . Context , responses [ ] * es . SearchResponse , targets [ ] * Query , configuredFields es . ConfiguredFields , logger log . Logger ) ( * backend . QueryDataResponse , error ) {
start := time . Now ( )
result := backend . QueryDataResponse {
Responses : backend . Responses { } ,
}
@ -51,6 +54,9 @@ func parseResponse(responses []*es.SearchResponse, targets []*Query, configuredF
target := targets [ i ]
if res . Error != nil {
mt , _ := json . Marshal ( target )
me , _ := json . Marshal ( res . Error )
logger . Error ( "Error response from Elasticsearch" , "error" , string ( me ) , "query" , string ( mt ) )
errResult := getErrorFromElasticResponse ( res )
result . Responses [ target . RefID ] = backend . DataResponse {
Error : errors . New ( errResult ) ,
@ -61,20 +67,23 @@ func parseResponse(responses []*es.SearchResponse, targets []*Query, configuredF
queryRes := backend . DataResponse { }
if isRawDataQuery ( target ) {
err := processRawDataResponse ( res , target , configuredFields , & queryRes )
err := processRawDataResponse ( res , target , configuredFields , & queryRes , logger )
if err != nil {
// TODO: This error never happens so we should remove it
return & backend . QueryDataResponse { } , err
}
result . Responses [ target . RefID ] = queryRes
} else if isRawDocumentQuery ( target ) {
err := processRawDocumentResponse ( res , target , & queryRes )
err := processRawDocumentResponse ( res , target , & queryRes , logger )
if err != nil {
// TODO: This error never happens so we should remove it
return & backend . QueryDataResponse { } , err
}
result . Responses [ target . RefID ] = queryRes
} else if isLogsQuery ( target ) {
err := processLogsResponse ( res , target , configuredFields , & queryRes )
err := processLogsResponse ( res , target , configuredFields , & queryRes , logger )
if err != nil {
// TODO: This error never happens so we should remove it
return & backend . QueryDataResponse { } , err
}
result . Responses [ target . RefID ] = queryRes
@ -82,7 +91,10 @@ func parseResponse(responses []*es.SearchResponse, targets []*Query, configuredF
// Process as metric query result
props := make ( map [ string ] string )
err := processBuckets ( res . Aggregations , target , & queryRes , props , 0 )
logger . Debug ( "Processed metric query response" )
if err != nil {
mt , _ := json . Marshal ( target )
logger . Error ( "Error processing buckets" , "error" , err , "query" , string ( mt ) , "aggregationsLength" , len ( res . Aggregations ) )
return & backend . QueryDataResponse { } , err
}
nameFields ( queryRes , target )
@ -91,10 +103,12 @@ func parseResponse(responses []*es.SearchResponse, targets []*Query, configuredF
result . Responses [ target . RefID ] = queryRes
}
}
logger . Info ( "Finished processing responses" , "duration" , time . Since ( start ) , "responsesLength" , len ( result . Responses ) , "queriesLength" , len ( targets ) , "action" , "parseResponse" )
return & result , nil
}
func processLogsResponse ( res * es . SearchResponse , target * Query , configuredFields es . ConfiguredFields , queryRes * backend . DataResponse ) error {
func processLogsResponse ( res * es . SearchResponse , target * Query , configuredFields es . ConfiguredFields , queryRes * backend . DataResponse , logger log . Logger ) error {
propNames := make ( map [ string ] bool )
docs := make ( [ ] map [ string ] interface { } , len ( res . Hits . Hits ) )
searchWords := make ( map [ string ] bool )
@ -168,12 +182,13 @@ func processLogsResponse(res *es.SearchResponse, target *Query, configuredFields
setPreferredVisType ( frame , data . VisTypeLogs )
setLogsCustomMeta ( frame , searchWords , stringToIntWithDefaultValue ( target . Metrics [ 0 ] . Settings . Get ( "limit" ) . MustString ( ) , defaultSize ) )
frames = append ( frames , frame )
queryRes . Frames = frames
logger . Debug ( "Processed log query response" , "fieldsLength" , len ( frame . Fields ) )
return nil
}
func processRawDataResponse ( res * es . SearchResponse , target * Query , configuredFields es . ConfiguredFields , queryRes * backend . DataResponse ) error {
func processRawDataResponse ( res * es . SearchResponse , target * Query , configuredFields es . ConfiguredFields , queryRes * backend . DataResponse , logger log . Logger ) error {
propNames := make ( map [ string ] bool )
docs := make ( [ ] map [ string ] interface { } , len ( res . Hits . Hits ) )
@ -207,13 +222,15 @@ func processRawDataResponse(res *es.SearchResponse, target *Query, configuredFie
frames := data . Frames { }
frame := data . NewFrame ( "" , fields ... )
frames = append ( frames , frame )
frames = append ( frames , frame )
queryRes . Frames = frames
logger . Debug ( "Processed raw data query response" , "fieldsLength" , len ( frame . Fields ) )
return nil
}
func processRawDocumentResponse ( res * es . SearchResponse , target * Query , queryRes * backend . DataResponse ) error {
func processRawDocumentResponse ( res * es . SearchResponse , target * Query , queryRes * backend . DataResponse , logger log . Logger ) error {
docs := make ( [ ] map [ string ] interface { } , len ( res . Hits . Hits ) )
for hitIdx , hit := range res . Hits . Hits {
doc := map [ string ] interface { } {
@ -266,6 +283,7 @@ func processRawDocumentResponse(res *es.SearchResponse, target *Query, queryRes
frames = append ( frames , frame )
queryRes . Frames = frames
logger . Debug ( "Processed raw document query response" , "fieldsLength" , len ( frame . Fields ) )
return nil
}
@ -650,32 +668,32 @@ func processMetrics(esAgg *simplejson.Json, target *Query, query *backend.DataRe
case countType :
countFrames , err := processCountMetric ( jsonBuckets , props )
if err != nil {
return err
return fmt . Errorf ( " error processing count metric: %w" , err )
}
frames = append ( frames , countFrames ... )
case percentilesType :
percentileFrames , err := processPercentilesMetric ( metric , jsonBuckets , props )
if err != nil {
return err
return fmt . Errorf ( " error processing percentiles metric: %w" , err )
}
frames = append ( frames , percentileFrames ... )
case topMetricsType :
topMetricsFrames , err := processTopMetricsMetric ( metric , jsonBuckets , props )
if err != nil {
return err
return fmt . Errorf ( " error processing top metrics metric: %w" , err )
}
frames = append ( frames , topMetricsFrames ... )
case extendedStatsType :
extendedStatsFrames , err := processExtendedStatsMetric ( metric , jsonBuckets , props )
if err != nil {
return err
return fmt . Errorf ( " error processing extended stats metric: %w" , err )
}
frames = append ( frames , extendedStatsFrames ... )
default :
defaultFrames , err := processDefaultMetric ( metric , jsonBuckets , props )
if err != nil {
return err
return fmt . Errorf ( " error processing default metric: %w" , err )
}
frames = append ( frames , defaultFrames ... )
}
@ -713,7 +731,7 @@ func processAggregationDocs(esAgg *simplejson.Json, aggDef *BucketAgg, target *Q
} else {
f , err := bucket . Get ( "key" ) . Float64 ( )
if err != nil {
return err
return fmt . Errorf ( " error appending bucket key to existing field with name %s: %w" , field . Name , err )
}
field . Append ( & f )
}
@ -728,7 +746,7 @@ func processAggregationDocs(esAgg *simplejson.Json, aggDef *BucketAgg, target *Q
} else {
f , err := bucket . Get ( "key" ) . Float64 ( )
if err != nil {
return err
return fmt . Errorf ( " error appending bucket key to new field with name %s: %w" , aggDef . Field , err )
}
aggDefField = extractDataField ( aggDef . Field , & f )
aggDefField . Append ( & f )