Elasticsearch: Reuse http client in the backend (#55172)

* elastic: backend: reuse http client

* fixed lint error
pull/55751/head
Gábor Farkas 3 years ago committed by GitHub
parent c0ecdf6783
commit 2efd7fa481
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 41
      pkg/tsdb/elasticsearch/client/client.go
  2. 38
      pkg/tsdb/elasticsearch/client/client_test.go
  3. 13
      pkg/tsdb/elasticsearch/elasticsearch.go
  4. 15
      pkg/tsdb/elasticsearch/elasticsearch_test.go

@ -15,16 +15,14 @@ import (
"github.com/Masterminds/semver" "github.com/Masterminds/semver"
"github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend"
sdkhttpclient "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/tsdb/intervalv2" "github.com/grafana/grafana/pkg/tsdb/intervalv2"
) )
type DatasourceInfo struct { type DatasourceInfo struct {
ID int64 ID int64
HTTPClientOpts sdkhttpclient.Options HTTPClient *http.Client
URL string URL string
Database string Database string
ESVersion *semver.Version ESVersion *semver.Version
@ -42,10 +40,6 @@ var (
clientLog = log.New(loggerName) clientLog = log.New(loggerName)
) )
var newDatasourceHttpClient = func(httpClientProvider httpclient.Provider, ds *DatasourceInfo) (*http.Client, error) {
return httpClientProvider.New(ds.HTTPClientOpts)
}
// Client represents a client which can interact with elasticsearch api // Client represents a client which can interact with elasticsearch api
type Client interface { type Client interface {
GetTimeField() string GetTimeField() string
@ -56,7 +50,7 @@ type Client interface {
} }
// NewClient creates a new elasticsearch client // NewClient creates a new elasticsearch client
var NewClient = func(ctx context.Context, httpClientProvider httpclient.Provider, ds *DatasourceInfo, timeRange backend.TimeRange) (Client, error) { var NewClient = func(ctx context.Context, ds *DatasourceInfo, timeRange backend.TimeRange) (Client, error) {
ip, err := newIndexPattern(ds.Interval, ds.Database) ip, err := newIndexPattern(ds.Interval, ds.Database)
if err != nil { if err != nil {
return nil, err return nil, err
@ -70,23 +64,21 @@ var NewClient = func(ctx context.Context, httpClientProvider httpclient.Provider
clientLog.Debug("Creating new client", "version", ds.ESVersion, "timeField", ds.TimeField, "indices", strings.Join(indices, ", ")) clientLog.Debug("Creating new client", "version", ds.ESVersion, "timeField", ds.TimeField, "indices", strings.Join(indices, ", "))
return &baseClientImpl{ return &baseClientImpl{
ctx: ctx, ctx: ctx,
httpClientProvider: httpClientProvider, ds: ds,
ds: ds, timeField: ds.TimeField,
timeField: ds.TimeField, indices: indices,
indices: indices, timeRange: timeRange,
timeRange: timeRange,
}, nil }, nil
} }
type baseClientImpl struct { type baseClientImpl struct {
ctx context.Context ctx context.Context
httpClientProvider httpclient.Provider ds *DatasourceInfo
ds *DatasourceInfo timeField string
timeField string indices []string
indices []string timeRange backend.TimeRange
timeRange backend.TimeRange debugEnabled bool
debugEnabled bool
} }
func (c *baseClientImpl) GetTimeField() string { func (c *baseClientImpl) GetTimeField() string {
@ -173,18 +165,13 @@ func (c *baseClientImpl) executeRequest(method, uriPath, uriQuery string, body [
req.Header.Set("Content-Type", "application/x-ndjson") req.Header.Set("Content-Type", "application/x-ndjson")
httpClient, err := newDatasourceHttpClient(c.httpClientProvider, c.ds)
if err != nil {
return nil, err
}
start := time.Now() start := time.Now()
defer func() { defer func() {
elapsed := time.Since(start) elapsed := time.Since(start)
clientLog.Debug("Executed request", "took", elapsed) clientLog.Debug("Executed request", "took", elapsed)
}() }()
//nolint:bodyclose //nolint:bodyclose
resp, err := httpClient.Do(req) resp, err := c.ds.HTTPClient.Do(req)
if err != nil { if err != nil {
return nil, err return nil, err
} }

@ -12,7 +12,6 @@ import (
"github.com/Masterminds/semver" "github.com/Masterminds/semver"
"github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/tsdb/intervalv2" "github.com/grafana/grafana/pkg/tsdb/intervalv2"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -20,18 +19,6 @@ import (
func TestClient_ExecuteMultisearch(t *testing.T) { func TestClient_ExecuteMultisearch(t *testing.T) {
t.Run("Given a fake http client and a client with response", func(t *testing.T) { t.Run("Given a fake http client and a client with response", func(t *testing.T) {
version, err := semver.NewVersion("8.0.0")
require.NoError(t, err)
ds := DatasourceInfo{
Database: "[metrics-]YYYY.MM.DD",
ESVersion: version,
TimeField: "@timestamp",
Interval: "Daily",
MaxConcurrentShardRequests: 6,
IncludeFrozen: true,
XPack: true,
}
var request *http.Request var request *http.Request
var requestBody *bytes.Buffer var requestBody *bytes.Buffer
@ -55,7 +42,21 @@ func TestClient_ExecuteMultisearch(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
rw.WriteHeader(200) rw.WriteHeader(200)
})) }))
ds.URL = ts.URL
version, err := semver.NewVersion("8.0.0")
require.NoError(t, err)
ds := DatasourceInfo{
URL: ts.URL,
HTTPClient: ts.Client(),
Database: "[metrics-]YYYY.MM.DD",
ESVersion: version,
TimeField: "@timestamp",
Interval: "Daily",
MaxConcurrentShardRequests: 6,
IncludeFrozen: true,
XPack: true,
}
from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC) from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC)
to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC) to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC)
@ -64,19 +65,12 @@ func TestClient_ExecuteMultisearch(t *testing.T) {
To: to, To: to,
} }
c, err := NewClient(context.Background(), httpclient.NewProvider(), &ds, timeRange) c, err := NewClient(context.Background(), &ds, timeRange)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, c) require.NotNil(t, c)
currentNewDatasourceHTTPClient := newDatasourceHttpClient
newDatasourceHttpClient = func(httpClientProvider httpclient.Provider, ds *DatasourceInfo) (*http.Client, error) {
return ts.Client(), nil
}
t.Cleanup(func() { t.Cleanup(func() {
ts.Close() ts.Close()
newDatasourceHttpClient = currentNewDatasourceHTTPClient
}) })
ms, err := createMultisearchForTest(t, c) ms, err := createMultisearchForTest(t, c)

@ -29,7 +29,7 @@ func ProvideService(httpClientProvider httpclient.Provider) *Service {
eslog.Debug("initializing") eslog.Debug("initializing")
return &Service{ return &Service{
im: datasource.NewInstanceManager(newInstanceSettings()), im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider)),
httpClientProvider: httpClientProvider, httpClientProvider: httpClientProvider,
intervalCalculator: intervalv2.NewCalculator(), intervalCalculator: intervalv2.NewCalculator(),
} }
@ -51,7 +51,7 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
return &backend.QueryDataResponse{}, fmt.Errorf("query contains no queries") return &backend.QueryDataResponse{}, fmt.Errorf("query contains no queries")
} }
client, err := es.NewClient(ctx, s.httpClientProvider, dsInfo, req.Queries[0].TimeRange) client, err := es.NewClient(ctx, dsInfo, req.Queries[0].TimeRange)
if err != nil { if err != nil {
return &backend.QueryDataResponse{}, err return &backend.QueryDataResponse{}, err
} }
@ -60,7 +60,7 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
return query.execute() return query.execute()
} }
func newInstanceSettings() datasource.InstanceFactoryFunc { func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.InstanceFactoryFunc {
return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
jsonData := map[string]interface{}{} jsonData := map[string]interface{}{}
err := json.Unmarshal(settings.JSONData, &jsonData) err := json.Unmarshal(settings.JSONData, &jsonData)
@ -72,6 +72,11 @@ func newInstanceSettings() datasource.InstanceFactoryFunc {
return nil, fmt.Errorf("error getting http options: %w", err) return nil, fmt.Errorf("error getting http options: %w", err)
} }
httpCli, err := httpClientProvider.New(httpCliOpts)
if err != nil {
return nil, err
}
// Set SigV4 service namespace // Set SigV4 service namespace
if httpCliOpts.SigV4 != nil { if httpCliOpts.SigV4 != nil {
httpCliOpts.SigV4.Service = "es" httpCliOpts.SigV4.Service = "es"
@ -128,7 +133,7 @@ func newInstanceSettings() datasource.InstanceFactoryFunc {
model := es.DatasourceInfo{ model := es.DatasourceInfo{
ID: settings.ID, ID: settings.ID,
URL: settings.URL, URL: settings.URL,
HTTPClientOpts: httpCliOpts, HTTPClient: httpCli,
Database: settings.Database, Database: settings.Database,
MaxConcurrentShardRequests: int64(maxConcurrentShardRequests), MaxConcurrentShardRequests: int64(maxConcurrentShardRequests),
ESVersion: version, ESVersion: version,

@ -5,6 +5,7 @@ import (
"testing" "testing"
"github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -63,7 +64,7 @@ func TestNewInstanceSettings(t *testing.T) {
JSONData: json.RawMessage(settingsJSON), JSONData: json.RawMessage(settingsJSON),
} }
_, err = newInstanceSettings()(dsSettings) _, err = newInstanceSettings(httpclient.NewProvider())(dsSettings)
require.NoError(t, err) require.NoError(t, err)
}) })
@ -84,7 +85,7 @@ func TestNewInstanceSettings(t *testing.T) {
JSONData: json.RawMessage(settingsJSON), JSONData: json.RawMessage(settingsJSON),
} }
_, err = newInstanceSettings()(dsSettings) _, err = newInstanceSettings(httpclient.NewProvider())(dsSettings)
require.NoError(t, err) require.NoError(t, err)
}) })
@ -104,7 +105,7 @@ func TestNewInstanceSettings(t *testing.T) {
JSONData: json.RawMessage(settingsJSON), JSONData: json.RawMessage(settingsJSON),
} }
_, err = newInstanceSettings()(dsSettings) _, err = newInstanceSettings(httpclient.NewProvider())(dsSettings)
require.EqualError(t, err, "elasticsearch version is required, err=elasticsearch version=1234 is not supported") require.EqualError(t, err, "elasticsearch version is required, err=elasticsearch version=1234 is not supported")
}) })
@ -124,7 +125,7 @@ func TestNewInstanceSettings(t *testing.T) {
JSONData: json.RawMessage(settingsJSON), JSONData: json.RawMessage(settingsJSON),
} }
_, err = newInstanceSettings()(dsSettings) _, err = newInstanceSettings(httpclient.NewProvider())(dsSettings)
require.EqualError(t, err, "elasticsearch version is required, err=Invalid Semantic Version") require.EqualError(t, err, "elasticsearch version is required, err=Invalid Semantic Version")
}) })
@ -143,7 +144,7 @@ func TestNewInstanceSettings(t *testing.T) {
JSONData: json.RawMessage(settingsJSON), JSONData: json.RawMessage(settingsJSON),
} }
_, err = newInstanceSettings()(dsSettings) _, err = newInstanceSettings(httpclient.NewProvider())(dsSettings)
require.EqualError(t, err, "elasticsearch version is required, err=elasticsearch version <nil>, cannot be cast to int") require.EqualError(t, err, "elasticsearch version is required, err=elasticsearch version <nil>, cannot be cast to int")
}) })
}) })
@ -164,7 +165,7 @@ func TestNewInstanceSettings(t *testing.T) {
JSONData: json.RawMessage(settingsJSON), JSONData: json.RawMessage(settingsJSON),
} }
_, err = newInstanceSettings()(dsSettings) _, err = newInstanceSettings(httpclient.NewProvider())(dsSettings)
require.EqualError(t, err, "timeField cannot be cast to string") require.EqualError(t, err, "timeField cannot be cast to string")
}) })
@ -184,7 +185,7 @@ func TestNewInstanceSettings(t *testing.T) {
JSONData: json.RawMessage(settingsJSON), JSONData: json.RawMessage(settingsJSON),
} }
_, err = newInstanceSettings()(dsSettings) _, err = newInstanceSettings(httpclient.NewProvider())(dsSettings)
require.EqualError(t, err, "elasticsearch time field name is required") require.EqualError(t, err, "elasticsearch time field name is required")
}) })
}) })

Loading…
Cancel
Save