Elasticsearch: Process ES multisearch JSON response by streaming (#93689)

Co-authored-by: Isabella Siu <isabella.siu@grafana.com>
pull/98904/head^2
Adam Yeats 4 months ago committed by GitHub
parent fc90a446c6
commit bab55a4cb8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md
  2. 1
      packages/grafana-data/src/types/featureToggles.gen.ts
  3. 6
      pkg/services/featuremgmt/registry.go
  4. 1
      pkg/services/featuremgmt/toggles_gen.csv
  5. 4
      pkg/services/featuremgmt/toggles_gen.go
  6. 15
      pkg/services/featuremgmt/toggles_gen.json
  7. 213
      pkg/tsdb/elasticsearch/client/client.go
  8. 124
      pkg/tsdb/elasticsearch/client/client_test.go

@ -233,6 +233,7 @@ Experimental features might be changed or removed without prior notice.
| `k8SFolderMove` | Enable folder's api server move | | `k8SFolderMove` | Enable folder's api server move |
| `teamHttpHeadersMimir` | Enables LBAC for datasources for Mimir to apply LBAC filtering of metrics to the client requests for users in teams | | `teamHttpHeadersMimir` | Enables LBAC for datasources for Mimir to apply LBAC filtering of metrics to the client requests for users in teams |
| `queryLibraryDashboards` | Enables Query Library feature in Dashboards | | `queryLibraryDashboards` | Enables Query Library feature in Dashboards |
| `elasticsearchImprovedParsing` | Enables less memory intensive Elasticsearch result parsing |
## Development feature toggles ## Development feature toggles

@ -253,4 +253,5 @@ export interface FeatureToggles {
ABTestFeatureToggleA?: boolean; ABTestFeatureToggleA?: boolean;
ABTestFeatureToggleB?: boolean; ABTestFeatureToggleB?: boolean;
queryLibraryDashboards?: boolean; queryLibraryDashboards?: boolean;
elasticsearchImprovedParsing?: boolean;
} }

@ -1750,6 +1750,12 @@ var (
Owner: grafanaFrontendPlatformSquad, Owner: grafanaFrontendPlatformSquad,
AllowSelfServe: false, AllowSelfServe: false,
}, },
{
Name: "elasticsearchImprovedParsing",
Description: "Enables less memory intensive Elasticsearch result parsing",
Stage: FeatureStageExperimental,
Owner: awsDatasourcesSquad,
},
} }
) )

@ -234,3 +234,4 @@ teamHttpHeadersMimir,experimental,@grafana/identity-access-team,false,false,fals
ABTestFeatureToggleA,experimental,@grafana/sharing-squad,false,false,false ABTestFeatureToggleA,experimental,@grafana/sharing-squad,false,false,false
ABTestFeatureToggleB,experimental,@grafana/sharing-squad,false,false,false ABTestFeatureToggleB,experimental,@grafana/sharing-squad,false,false,false
queryLibraryDashboards,experimental,@grafana/grafana-frontend-platform,false,false,false queryLibraryDashboards,experimental,@grafana/grafana-frontend-platform,false,false,false
elasticsearchImprovedParsing,experimental,@grafana/aws-datasources,false,false,false

1 Name Stage Owner requiresDevMode RequiresRestart FrontendOnly
234 ABTestFeatureToggleA experimental @grafana/sharing-squad false false false
235 ABTestFeatureToggleB experimental @grafana/sharing-squad false false false
236 queryLibraryDashboards experimental @grafana/grafana-frontend-platform false false false
237 elasticsearchImprovedParsing experimental @grafana/aws-datasources false false false

@ -946,4 +946,8 @@ const (
// FlagQueryLibraryDashboards // FlagQueryLibraryDashboards
// Enables Query Library feature in Dashboards // Enables Query Library feature in Dashboards
FlagQueryLibraryDashboards = "queryLibraryDashboards" FlagQueryLibraryDashboards = "queryLibraryDashboards"
// FlagElasticsearchImprovedParsing
// Enables less memory intensive Elasticsearch result parsing
FlagElasticsearchImprovedParsing = "elasticsearchImprovedParsing"
) )

@ -1321,6 +1321,21 @@
"codeowner": "@grafana/aws-datasources" "codeowner": "@grafana/aws-datasources"
} }
}, },
{
"metadata": {
"name": "elasticsearchImprovedParsing",
"resourceVersion": "1736808262603",
"creationTimestamp": "2025-01-13T20:32:35Z",
"annotations": {
"grafana.app/updatedTimestamp": "2025-01-13 22:44:22.603729 +0000 UTC"
}
},
"spec": {
"description": "Enables less memory intensive Elasticsearch result parsing",
"stage": "experimental",
"codeowner": "@grafana/aws-datasources"
}
},
{ {
"metadata": { "metadata": {
"name": "enableDatagridEditing", "name": "enableDatagridEditing",

@ -6,6 +6,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io"
"net/http" "net/http"
"net/url" "net/url"
"path" "path"
@ -20,6 +21,7 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/log" "github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing" "github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
"github.com/grafana/grafana/pkg/services/featuremgmt"
) )
// Used in logging to mark a stage // Used in logging to mark a stage
@ -202,8 +204,6 @@ func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearch
c.logger.Info("Response received from Elasticsearch", "status", "ok", "statusCode", res.StatusCode, "contentLength", res.ContentLength, "duration", time.Since(start), "stage", StageDatabaseRequest) c.logger.Info("Response received from Elasticsearch", "status", "ok", "statusCode", res.StatusCode, "contentLength", res.ContentLength, "duration", time.Since(start), "stage", StageDatabaseRequest)
start = time.Now() start = time.Now()
var msr MultiSearchResponse
dec := json.NewDecoder(res.Body)
_, resSpan := tracing.DefaultTracer().Start(c.ctx, "datasource.elasticsearch.queryData.executeMultisearch.decodeResponse") _, resSpan := tracing.DefaultTracer().Start(c.ctx, "datasource.elasticsearch.queryData.executeMultisearch.decodeResponse")
defer func() { defer func() {
if err != nil { if err != nil {
@ -212,19 +212,218 @@ func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearch
} }
resSpan.End() resSpan.End()
}() }()
err = dec.Decode(&msr)
var msr MultiSearchResponse
improvedParsingEnabled := isFeatureEnabled(c.ctx, featuremgmt.FlagElasticsearchImprovedParsing)
if improvedParsingEnabled {
err = StreamMultiSearchResponse(res.Body, &msr)
} else {
dec := json.NewDecoder(res.Body)
err = dec.Decode(&msr)
}
if err != nil { if err != nil {
c.logger.Error("Failed to decode response from Elasticsearch", "error", err, "duration", time.Since(start)) c.logger.Error("Failed to decode response from Elasticsearch", "error", err, "duration", time.Since(start), "improvedParsingEnabled", improvedParsingEnabled)
return nil, err return nil, err
} }
c.logger.Debug("Completed decoding of response from Elasticsearch", "duration", time.Since(start)) c.logger.Debug("Completed decoding of response from Elasticsearch", "duration", time.Since(start), "improvedParsingEnabled", improvedParsingEnabled)
msr.Status = res.StatusCode msr.Status = res.StatusCode
return &msr, nil return &msr, nil
} }
// StreamMultiSearchResponse processes the JSON response in a streaming fashion
// so the full multisearch payload is never buffered in memory at once. Only the
// "responses" array is decoded; each SearchResponse element is handled
// incrementally and all other top-level fields are skipped.
func StreamMultiSearchResponse(body io.Reader, msr *MultiSearchResponse) error {
	dec := json.NewDecoder(body)

	tok, err := dec.Token() // reads the `{` opening brace
	if err != nil {
		return err
	}
	// Validate the delimiter so a non-object payload fails fast instead of
	// being mis-parsed below (consistent with processHits/streamHitsArray).
	if tok != json.Delim('{') {
		return fmt.Errorf("expected '{' at start of response, got %v", tok)
	}

	for dec.More() {
		tok, err := dec.Token()
		if err != nil {
			return err
		}

		if tok == "responses" {
			tok, err := dec.Token() // reads the `[` opening bracket for responses array
			if err != nil {
				return err
			}
			if tok != json.Delim('[') {
				return fmt.Errorf("expected '[' for responses array, got %v", tok)
			}

			for dec.More() {
				var sr SearchResponse

				tok, err := dec.Token() // reads `{` for each SearchResponse
				if err != nil {
					return err
				}
				if tok != json.Delim('{') {
					return fmt.Errorf("expected '{' for response object, got %v", tok)
				}

				for dec.More() {
					field, err := dec.Token()
					if err != nil {
						return err
					}

					switch field {
					case "hits":
						sr.Hits = &SearchResponseHits{}
						if err := processHits(dec, &sr); err != nil {
							return err
						}
					case "aggregations":
						if err := dec.Decode(&sr.Aggregations); err != nil {
							return err
						}
					case "error":
						if err := dec.Decode(&sr.Error); err != nil {
							return err
						}
					default:
						// skip over unknown fields
						if err := skipUnknownField(dec); err != nil {
							return err
						}
					}
				}

				msr.Responses = append(msr.Responses, &sr)

				if _, err := dec.Token(); err != nil { // reads `}` closing for each SearchResponse
					return err
				}
			}

			if _, err := dec.Token(); err != nil { // reads the `]` closing bracket for responses array
				return err
			}
		} else {
			if err := skipUnknownField(dec); err != nil {
				return err
			}
		}
	}

	_, err = dec.Token() // reads the `}` closing brace for the entire JSON
	return err
}
// processHits consumes the "hits" object of a single search response,
// streaming the nested "hits" array into sr and discarding every other field.
func processHits(dec *json.Decoder, sr *SearchResponse) error {
	open, err := dec.Token() // `{` opening the hits object
	if err != nil {
		return err
	}
	if open != json.Delim('{') {
		return fmt.Errorf("expected '{' for hits object, got %v", open)
	}

	for dec.More() {
		key, err := dec.Token()
		if err != nil {
			return err
		}
		if key != "hits" {
			// ignore these fields as they are not used in the current implementation
			if err := skipUnknownField(dec); err != nil {
				return err
			}
			continue
		}
		if err := streamHitsArray(dec, sr); err != nil {
			return err
		}
	}

	// consume the `}` closing the hits object
	_, err = dec.Token()
	return err
}
// streamHitsArray decodes the hits array one element at a time, appending each
// decoded hit to sr.Hits.Hits instead of materializing the whole array at once.
func streamHitsArray(dec *json.Decoder, sr *SearchResponse) error {
	open, err := dec.Token() // `[` opening the hits array
	if err != nil {
		return err
	}
	if open != json.Delim('[') {
		return fmt.Errorf("expected '[' for hits array, got %v", open)
	}

	for dec.More() {
		hit := map[string]interface{}{}
		if err := dec.Decode(&hit); err != nil {
			return err
		}
		sr.Hits.Hits = append(sr.Hits.Hits, hit)
	}

	closing, err := dec.Token() // `]` closing the hits array
	if err != nil {
		return err
	}
	if closing != json.Delim(']') {
		return fmt.Errorf("expected ']' for closing hits array, got %v", closing)
	}
	return nil
}
// skipUnknownField skips over an unknown JSON field's value in the stream.
func skipUnknownField(dec *json.Decoder) error {
tok, err := dec.Token()
if err != nil {
return err
}
switch tok {
case json.Delim('{'):
// skip everything inside the object until we reach the closing `}`
for dec.More() {
if err := skipUnknownField(dec); err != nil {
return err
}
}
_, err = dec.Token() // read the closing `}`
return err
case json.Delim('['):
// skip everything inside the array until we reach the closing `]`
for dec.More() {
if err := skipUnknownField(dec); err != nil {
return err
}
}
_, err = dec.Token() // read the closing `]`
return err
default:
// no further action needed for primitives
return nil
}
}
func (c *baseClientImpl) createMultiSearchRequests(searchRequests []*SearchRequest) []*multiRequest { func (c *baseClientImpl) createMultiSearchRequests(searchRequests []*SearchRequest) []*multiRequest {
multiRequests := []*multiRequest{} multiRequests := []*multiRequest{}
@ -264,3 +463,7 @@ func (c *baseClientImpl) getMultiSearchQueryParameters() string {
func (c *baseClientImpl) MultiSearch() *MultiSearchRequestBuilder { func (c *baseClientImpl) MultiSearch() *MultiSearchRequestBuilder {
return NewMultiSearchRequestBuilder() return NewMultiSearchRequestBuilder()
} }
// isFeatureEnabled reports whether the given feature toggle is enabled in the
// Grafana config carried by ctx.
func isFeatureEnabled(ctx context.Context, feature string) bool {
	toggles := backend.GrafanaConfigFromContext(ctx).FeatureToggles()
	return toggles.IsEnabled(feature)
}

@ -6,6 +6,7 @@ import (
"io" "io"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"strings"
"testing" "testing"
"time" "time"
@ -286,6 +287,129 @@ func TestClient_Index(t *testing.T) {
} }
} }
// Verifies that a well-formed multisearch payload yields one entry per element
// of the "responses" array.
func TestStreamMultiSearchResponse_Success(t *testing.T) {
	body := `
	{
		"responses": [
			{ "hits": { "hits": [] } },
			{ "hits": { "hits": [] } }
		]
	}`

	resp := MultiSearchResponse{}
	if err := StreamMultiSearchResponse(strings.NewReader(body), &resp); err != nil {
		t.Fatalf("expected no error, got %v", err)
	}

	if got := len(resp.Responses); got != 2 {
		t.Errorf("expected 2 responses, got %d", got)
	}
}
// Verifies that a truncated payload (missing closing braces) surfaces a
// decoding error rather than a partial result.
func TestStreamMultiSearchResponse_MalformedJSON(t *testing.T) {
	body := `
	{
		"responses": [
			{ "hits": { "hits": [] } }
	` // Missing closing braces

	resp := MultiSearchResponse{}
	if err := StreamMultiSearchResponse(strings.NewReader(body), &resp); err == nil {
		t.Fatalf("expected an error, got none")
	}
}
// Verifies that a payload without a "responses" field parses cleanly and
// produces zero responses (unknown fields are skipped).
func TestStreamMultiSearchResponse_MissingResponses(t *testing.T) {
	body := `
	{
		"something_else": [
			{ "hits": { "hits": [] } }
		]
	}`

	resp := MultiSearchResponse{}
	if err := StreamMultiSearchResponse(strings.NewReader(body), &resp); err != nil {
		t.Fatalf("expected no error, got %v", err)
	}

	if got := len(resp.Responses); got != 0 {
		t.Errorf("expected 0 responses, got %d", got)
	}
}
// Verifies that an empty JSON object decodes without error and yields no
// responses.
func TestStreamMultiSearchResponse_EmptyBody(t *testing.T) {
	resp := MultiSearchResponse{}
	if err := StreamMultiSearchResponse(strings.NewReader(`{}`), &resp); err != nil {
		t.Fatalf("expected no error, got %v", err)
	}

	if got := len(resp.Responses); got != 0 {
		t.Errorf("expected 0 responses, got %d", got)
	}
}
// Verifies that input which is not JSON at all is rejected immediately.
func TestStreamMultiSearchResponse_InvalidJSONStart(t *testing.T) {
	resp := MultiSearchResponse{}
	if err := StreamMultiSearchResponse(strings.NewReader(`invalid_json`), &resp); err == nil {
		t.Fatalf("expected an error due to invalid JSON, got none")
	}
}
// Verifies that a "hits" field whose value is not an object produces the
// dedicated validation error from processHits.
func TestStreamMultiSearchResponse_InvalidHitsField(t *testing.T) {
	body := `
	{
		"responses": [
			{ "hits": "invalid_string_value" }
		]
	}`

	resp := MultiSearchResponse{}
	err := StreamMultiSearchResponse(strings.NewReader(body), &resp)
	if err == nil {
		t.Fatalf("expected an error due to invalid 'hits' field, got none")
	}

	if err.Error() != "expected '{' for hits object, got invalid_string_value" {
		t.Errorf("unexpected error message: %v", err)
	}
}
// Verifies that a hit element which is not an object surfaces the stdlib
// unmarshal error from streamHitsArray's per-element Decode.
func TestStreamMultiSearchResponse_InvalidHitElement(t *testing.T) {
	jsonBody := `
	{
		"responses": [
			{ "hits": { "hits": ["invalid_element"] } }
		]
	}`

	msr := &MultiSearchResponse{}
	err := StreamMultiSearchResponse(strings.NewReader(jsonBody), msr)
	if err == nil {
		t.Fatalf("expected an error due to invalid element in 'hits' array, got none")
	}

	// Match on a substring rather than the exact message: the precise wording
	// of encoding/json unmarshal errors is not part of the Go compatibility
	// promise and has changed between releases.
	expected := "cannot unmarshal string into Go value of type map[string]interface {}"
	if !strings.Contains(err.Error(), expected) {
		t.Errorf("unexpected error message: expected %v, got %v", expected, err)
	}
}
func createMultisearchForTest(t *testing.T, c Client, timeRange backend.TimeRange) (*MultiSearchRequest, error) { func createMultisearchForTest(t *testing.T, c Client, timeRange backend.TimeRange) (*MultiSearchRequest, error) {
t.Helper() t.Helper()

Loading…
Cancel
Save