From 2efd7fa48131c7d5fd0b12fac651b52c49b78cca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Farkas?= Date: Mon, 26 Sep 2022 14:27:46 +0200 Subject: [PATCH] Elasticsearch: Reuse http client in the backend (#55172) * elastic: backend: reuse http client * fixed lint error --- pkg/tsdb/elasticsearch/client/client.go | 41 +++++++------------- pkg/tsdb/elasticsearch/client/client_test.go | 38 ++++++++---------- pkg/tsdb/elasticsearch/elasticsearch.go | 13 +++++-- pkg/tsdb/elasticsearch/elasticsearch_test.go | 15 +++---- 4 files changed, 47 insertions(+), 60 deletions(-) diff --git a/pkg/tsdb/elasticsearch/client/client.go b/pkg/tsdb/elasticsearch/client/client.go index d77a2f945ce..f105f645797 100644 --- a/pkg/tsdb/elasticsearch/client/client.go +++ b/pkg/tsdb/elasticsearch/client/client.go @@ -15,16 +15,14 @@ import ( "github.com/Masterminds/semver" "github.com/grafana/grafana-plugin-sdk-go/backend" - sdkhttpclient "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient" "github.com/grafana/grafana/pkg/components/simplejson" - "github.com/grafana/grafana/pkg/infra/httpclient" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/tsdb/intervalv2" ) type DatasourceInfo struct { ID int64 - HTTPClientOpts sdkhttpclient.Options + HTTPClient *http.Client URL string Database string ESVersion *semver.Version @@ -42,10 +40,6 @@ var ( clientLog = log.New(loggerName) ) -var newDatasourceHttpClient = func(httpClientProvider httpclient.Provider, ds *DatasourceInfo) (*http.Client, error) { - return httpClientProvider.New(ds.HTTPClientOpts) -} - // Client represents a client which can interact with elasticsearch api type Client interface { GetTimeField() string @@ -56,7 +50,7 @@ type Client interface { } // NewClient creates a new elasticsearch client -var NewClient = func(ctx context.Context, httpClientProvider httpclient.Provider, ds *DatasourceInfo, timeRange backend.TimeRange) (Client, error) { +var NewClient = func(ctx context.Context, ds *DatasourceInfo, timeRange backend.TimeRange) (Client, error) { ip, err := newIndexPattern(ds.Interval, ds.Database) if err != nil { return nil, err @@ -70,23 +64,21 @@ var NewClient = func(ctx context.Context, httpClientProvider httpclient.Provider clientLog.Debug("Creating new client", "version", ds.ESVersion, "timeField", ds.TimeField, "indices", strings.Join(indices, ", ")) return &baseClientImpl{ - ctx: ctx, - httpClientProvider: httpClientProvider, - ds: ds, - timeField: ds.TimeField, - indices: indices, - timeRange: timeRange, + ctx: ctx, + ds: ds, + timeField: ds.TimeField, + indices: indices, + timeRange: timeRange, }, nil } type baseClientImpl struct { - ctx context.Context - httpClientProvider httpclient.Provider - ds *DatasourceInfo - timeField string - indices []string - timeRange backend.TimeRange - debugEnabled bool + ctx context.Context + ds *DatasourceInfo + timeField string + indices []string + timeRange backend.TimeRange + debugEnabled bool } func (c *baseClientImpl) GetTimeField() string { @@ -173,18 +165,13 @@ func (c *baseClientImpl) executeRequest(method, uriPath, uriQuery string, body [ req.Header.Set("Content-Type", "application/x-ndjson") - httpClient, err := newDatasourceHttpClient(c.httpClientProvider, c.ds) - if err != nil { - return nil, err - } - start := time.Now() defer func() { elapsed := time.Since(start) clientLog.Debug("Executed request", "took", elapsed) }() //nolint:bodyclose - resp, err := httpClient.Do(req) + resp, err := c.ds.HTTPClient.Do(req) if err != nil { return nil, err } diff --git a/pkg/tsdb/elasticsearch/client/client_test.go b/pkg/tsdb/elasticsearch/client/client_test.go index 43a7d794022..94a366bdbf1 100644 --- a/pkg/tsdb/elasticsearch/client/client_test.go +++ b/pkg/tsdb/elasticsearch/client/client_test.go @@ -12,7 +12,6 @@ import ( "github.com/Masterminds/semver" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana/pkg/components/simplejson" - "github.com/grafana/grafana/pkg/infra/httpclient" "github.com/grafana/grafana/pkg/tsdb/intervalv2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -20,18 +19,6 @@ import ( func TestClient_ExecuteMultisearch(t *testing.T) { t.Run("Given a fake http client and a client with response", func(t *testing.T) { - version, err := semver.NewVersion("8.0.0") - require.NoError(t, err) - ds := DatasourceInfo{ - Database: "[metrics-]YYYY.MM.DD", - ESVersion: version, - TimeField: "@timestamp", - Interval: "Daily", - MaxConcurrentShardRequests: 6, - IncludeFrozen: true, - XPack: true, - } - var request *http.Request var requestBody *bytes.Buffer @@ -55,7 +42,21 @@ func TestClient_ExecuteMultisearch(t *testing.T) { require.NoError(t, err) rw.WriteHeader(200) })) - ds.URL = ts.URL + + version, err := semver.NewVersion("8.0.0") + require.NoError(t, err) + + ds := DatasourceInfo{ + URL: ts.URL, + HTTPClient: ts.Client(), + Database: "[metrics-]YYYY.MM.DD", + ESVersion: version, + TimeField: "@timestamp", + Interval: "Daily", + MaxConcurrentShardRequests: 6, + IncludeFrozen: true, + XPack: true, + } from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC) to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC) @@ -64,19 +65,12 @@ func TestClient_ExecuteMultisearch(t *testing.T) { To: to, } - c, err := NewClient(context.Background(), httpclient.NewProvider(), &ds, timeRange) + c, err := NewClient(context.Background(), &ds, timeRange) require.NoError(t, err) require.NotNil(t, c) - currentNewDatasourceHTTPClient := newDatasourceHttpClient - - newDatasourceHttpClient = func(httpClientProvider httpclient.Provider, ds *DatasourceInfo) (*http.Client, error) { - return ts.Client(), nil - } - t.Cleanup(func() { ts.Close() - newDatasourceHttpClient = currentNewDatasourceHTTPClient }) ms, err := createMultisearchForTest(t, c) diff --git a/pkg/tsdb/elasticsearch/elasticsearch.go b/pkg/tsdb/elasticsearch/elasticsearch.go index 547e70ac986..00be0b20ff2 100644 --- a/pkg/tsdb/elasticsearch/elasticsearch.go +++ b/pkg/tsdb/elasticsearch/elasticsearch.go @@ -29,7 +29,7 @@ func ProvideService(httpClientProvider httpclient.Provider) *Service { eslog.Debug("initializing") return &Service{ - im: datasource.NewInstanceManager(newInstanceSettings()), + im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider)), httpClientProvider: httpClientProvider, intervalCalculator: intervalv2.NewCalculator(), } @@ -51,7 +51,7 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) return &backend.QueryDataResponse{}, fmt.Errorf("query contains no queries") } - client, err := es.NewClient(ctx, s.httpClientProvider, dsInfo, req.Queries[0].TimeRange) + client, err := es.NewClient(ctx, dsInfo, req.Queries[0].TimeRange) if err != nil { return &backend.QueryDataResponse{}, err } @@ -60,7 +60,7 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) return query.execute() } -func newInstanceSettings() datasource.InstanceFactoryFunc { +func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.InstanceFactoryFunc { return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { jsonData := map[string]interface{}{} err := json.Unmarshal(settings.JSONData, &jsonData) @@ -72,6 +72,11 @@ func newInstanceSettings() datasource.InstanceFactoryFunc { return nil, fmt.Errorf("error getting http options: %w", err) } + httpCli, err := httpClientProvider.New(httpCliOpts) + if err != nil { + return nil, err + } + // Set SigV4 service namespace if httpCliOpts.SigV4 != nil { httpCliOpts.SigV4.Service = "es" @@ -128,7 +133,7 @@ func newInstanceSettings() datasource.InstanceFactoryFunc { model := es.DatasourceInfo{ ID: settings.ID, URL: settings.URL, - HTTPClientOpts: httpCliOpts, + HTTPClient: httpCli, Database: settings.Database, MaxConcurrentShardRequests: int64(maxConcurrentShardRequests), ESVersion: version, diff --git a/pkg/tsdb/elasticsearch/elasticsearch_test.go b/pkg/tsdb/elasticsearch/elasticsearch_test.go index 93a86f76bef..4de838cddaa 100644 --- a/pkg/tsdb/elasticsearch/elasticsearch_test.go +++ b/pkg/tsdb/elasticsearch/elasticsearch_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/infra/httpclient" "github.com/stretchr/testify/require" ) @@ -63,7 +64,7 @@ func TestNewInstanceSettings(t *testing.T) { JSONData: json.RawMessage(settingsJSON), } - _, err = newInstanceSettings()(dsSettings) + _, err = newInstanceSettings(httpclient.NewProvider())(dsSettings) require.NoError(t, err) }) @@ -84,7 +85,7 @@ func TestNewInstanceSettings(t *testing.T) { JSONData: json.RawMessage(settingsJSON), } - _, err = newInstanceSettings()(dsSettings) + _, err = newInstanceSettings(httpclient.NewProvider())(dsSettings) require.NoError(t, err) }) @@ -104,7 +105,7 @@ func TestNewInstanceSettings(t *testing.T) { JSONData: json.RawMessage(settingsJSON), } - _, err = newInstanceSettings()(dsSettings) + _, err = newInstanceSettings(httpclient.NewProvider())(dsSettings) require.EqualError(t, err, "elasticsearch version is required, err=elasticsearch version=1234 is not supported") }) @@ -124,7 +125,7 @@ func TestNewInstanceSettings(t *testing.T) { JSONData: json.RawMessage(settingsJSON), } - _, err = newInstanceSettings()(dsSettings) + _, err = newInstanceSettings(httpclient.NewProvider())(dsSettings) require.EqualError(t, err, "elasticsearch version is required, err=Invalid Semantic Version") }) @@ -143,7 +144,7 @@ func TestNewInstanceSettings(t *testing.T) { JSONData: json.RawMessage(settingsJSON), } - _, err = newInstanceSettings()(dsSettings) + _, err = newInstanceSettings(httpclient.NewProvider())(dsSettings) require.EqualError(t, err, "elasticsearch version is required, err=elasticsearch version , cannot be cast to int") }) }) @@ -164,7 +165,7 @@ func TestNewInstanceSettings(t *testing.T) { JSONData: json.RawMessage(settingsJSON), } - _, err = newInstanceSettings()(dsSettings) + _, err = newInstanceSettings(httpclient.NewProvider())(dsSettings) require.EqualError(t, err, "timeField cannot be cast to string") }) @@ -184,7 +185,7 @@ func TestNewInstanceSettings(t *testing.T) { JSONData: json.RawMessage(settingsJSON), } - _, err = newInstanceSettings()(dsSettings) + _, err = newInstanceSettings(httpclient.NewProvider())(dsSettings) require.EqualError(t, err, "elasticsearch time field name is required") }) })