|
|
|
@ -19,9 +19,10 @@ import ( |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
func TestClient_ExecuteMultisearch(t *testing.T) { |
|
|
|
|
t.Run("Given a fake http client and a client with response", func(t *testing.T) { |
|
|
|
|
version, err := semver.NewVersion("8.0.0") |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
httpClientScenario(t, "Given a fake http client and a v7.0 client with response", &DatasourceInfo{ |
|
|
|
|
ds := DatasourceInfo{ |
|
|
|
|
Database: "[metrics-]YYYY.MM.DD", |
|
|
|
|
ESVersion: version, |
|
|
|
|
TimeField: "@timestamp", |
|
|
|
@ -29,31 +30,70 @@ func TestClient_ExecuteMultisearch(t *testing.T) { |
|
|
|
|
MaxConcurrentShardRequests: 6, |
|
|
|
|
IncludeFrozen: true, |
|
|
|
|
XPack: true, |
|
|
|
|
}, func(sc *scenarioContext) { |
|
|
|
|
sc.responseBody = `{ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var request *http.Request |
|
|
|
|
var requestBody *bytes.Buffer |
|
|
|
|
|
|
|
|
|
ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { |
|
|
|
|
request = r |
|
|
|
|
buf, err := io.ReadAll(r.Body) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
|
requestBody = bytes.NewBuffer(buf) |
|
|
|
|
|
|
|
|
|
rw.Header().Set("Content-Type", "application/x-ndjson") |
|
|
|
|
_, err = rw.Write([]byte( |
|
|
|
|
`{ |
|
|
|
|
"responses": [ |
|
|
|
|
{ |
|
|
|
|
"hits": { "hits": [], "max_score": 0, "total": { "value": 4656, "relation": "eq"} }, |
|
|
|
|
"status": 200 |
|
|
|
|
} |
|
|
|
|
] |
|
|
|
|
}` |
|
|
|
|
}`)) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
rw.WriteHeader(200) |
|
|
|
|
})) |
|
|
|
|
ds.URL = ts.URL |
|
|
|
|
|
|
|
|
|
from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC) |
|
|
|
|
to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC) |
|
|
|
|
timeRange := backend.TimeRange{ |
|
|
|
|
From: from, |
|
|
|
|
To: to, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ms, err := createMultisearchForTest(t, sc.client) |
|
|
|
|
c, err := NewClient(context.Background(), httpclient.NewProvider(), &ds, timeRange) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
res, err := sc.client.ExecuteMultisearch(ms) |
|
|
|
|
require.NotNil(t, c) |
|
|
|
|
|
|
|
|
|
currentNewDatasourceHTTPClient := newDatasourceHttpClient |
|
|
|
|
|
|
|
|
|
newDatasourceHttpClient = func(httpClientProvider httpclient.Provider, ds *DatasourceInfo) (*http.Client, error) { |
|
|
|
|
return ts.Client(), nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
t.Cleanup(func() { |
|
|
|
|
ts.Close() |
|
|
|
|
newDatasourceHttpClient = currentNewDatasourceHTTPClient |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
ms, err := createMultisearchForTest(t, c) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
res, err := c.ExecuteMultisearch(ms) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
|
require.NotNil(t, sc.request) |
|
|
|
|
assert.Equal(t, http.MethodPost, sc.request.Method) |
|
|
|
|
assert.Equal(t, "/_msearch", sc.request.URL.Path) |
|
|
|
|
assert.Equal(t, "max_concurrent_shard_requests=6&ignore_throttled=false", sc.request.URL.RawQuery) |
|
|
|
|
require.NotNil(t, request) |
|
|
|
|
assert.Equal(t, http.MethodPost, request.Method) |
|
|
|
|
assert.Equal(t, "/_msearch", request.URL.Path) |
|
|
|
|
assert.Equal(t, "max_concurrent_shard_requests=6&ignore_throttled=false", request.URL.RawQuery) |
|
|
|
|
|
|
|
|
|
require.NotNil(t, sc.requestBody) |
|
|
|
|
require.NotNil(t, requestBody) |
|
|
|
|
|
|
|
|
|
headerBytes, err := sc.requestBody.ReadBytes('\n') |
|
|
|
|
headerBytes, err := requestBody.ReadBytes('\n') |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
bodyBytes := sc.requestBody.Bytes() |
|
|
|
|
bodyBytes := requestBody.Bytes() |
|
|
|
|
|
|
|
|
|
jHeader, err := simplejson.NewJson(headerBytes) |
|
|
|
|
require.NoError(t, err) |
|
|
|
@ -90,62 +130,3 @@ func createMultisearchForTest(t *testing.T, c Client) (*MultiSearchRequest, erro |
|
|
|
|
}) |
|
|
|
|
return msb.Build() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type scenarioContext struct { |
|
|
|
|
client Client |
|
|
|
|
request *http.Request |
|
|
|
|
requestBody *bytes.Buffer |
|
|
|
|
responseStatus int |
|
|
|
|
responseBody string |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type scenarioFunc func(*scenarioContext) |
|
|
|
|
|
|
|
|
|
func httpClientScenario(t *testing.T, desc string, ds *DatasourceInfo, fn scenarioFunc) { |
|
|
|
|
t.Helper() |
|
|
|
|
|
|
|
|
|
t.Run(desc, func(t *testing.T) { |
|
|
|
|
sc := &scenarioContext{ |
|
|
|
|
responseStatus: 200, |
|
|
|
|
responseBody: `{ "responses": [] }`, |
|
|
|
|
} |
|
|
|
|
ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { |
|
|
|
|
sc.request = r |
|
|
|
|
buf, err := io.ReadAll(r.Body) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
|
sc.requestBody = bytes.NewBuffer(buf) |
|
|
|
|
|
|
|
|
|
rw.Header().Set("Content-Type", "application/x-ndjson") |
|
|
|
|
_, err = rw.Write([]byte(sc.responseBody)) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
rw.WriteHeader(sc.responseStatus) |
|
|
|
|
})) |
|
|
|
|
ds.URL = ts.URL |
|
|
|
|
|
|
|
|
|
from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC) |
|
|
|
|
to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC) |
|
|
|
|
timeRange := backend.TimeRange{ |
|
|
|
|
From: from, |
|
|
|
|
To: to, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
c, err := NewClient(context.Background(), httpclient.NewProvider(), ds, timeRange) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.NotNil(t, c) |
|
|
|
|
sc.client = c |
|
|
|
|
|
|
|
|
|
currentNewDatasourceHTTPClient := newDatasourceHttpClient |
|
|
|
|
|
|
|
|
|
newDatasourceHttpClient = func(httpClientProvider httpclient.Provider, ds *DatasourceInfo) (*http.Client, error) { |
|
|
|
|
return ts.Client(), nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
t.Cleanup(func() { |
|
|
|
|
ts.Close() |
|
|
|
|
newDatasourceHttpClient = currentNewDatasourceHTTPClient |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
fn(sc) |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|