Elastic: Support request cancellation properly (Uses new backendSrv.fetch Observable request API) (#30009)

* Elastic: Fixes so templating queries work

* Chore: fixes test

* Fix: fixes getFields from metricFindQuery

* Elastic: Support request cancellation properly

* Refactor: refactors tests

Co-authored-by: Elfo404 <gio.ricci@grafana.com>
Hugo Häggmark (committed by GitHub)
parent b094621196
commit b2d5466933
Changed files:
  1. public/app/plugins/datasource/elasticsearch/components/QueryEditor/BucketAggregationsEditor/BucketAggregationEditor.tsx (2 changed lines)
  2. public/app/plugins/datasource/elasticsearch/components/QueryEditor/MetricAggregationsEditor/MetricEditor.tsx (2 changed lines)
  3. public/app/plugins/datasource/elasticsearch/datasource.test.ts (588 changed lines)
  4. public/app/plugins/datasource/elasticsearch/datasource.ts (149 changed lines)
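For context before the diff: the crux of the change is that ElasticDatasource now talks to Elasticsearch through backendSrv.fetch, which returns an RxJS Observable instead of a Promise, and query() itself now returns Observable<DataQueryResponse>, so Grafana can cancel an in-flight request simply by unsubscribing. The following is a minimal usage sketch, not code from this commit; `ds` and `request` are assumed to be an ElasticDatasource instance and a DataQueryRequest prepared elsewhere.

import { Observable, Unsubscribable } from 'rxjs';
import { DataQueryRequest, DataQueryResponse } from '@grafana/data';

// Assumed to exist outside this sketch: a configured ElasticDatasource and a query request.
declare const ds: { query(req: DataQueryRequest<any>): Observable<DataQueryResponse> };
declare const request: DataQueryRequest<any>;

// Subscribing starts the request...
const sub: Unsubscribable = ds.query(request).subscribe(response => {
  console.log('received frames:', response.data.length);
});

// ...and unsubscribing cancels the underlying backendSrv.fetch call, something the
// old promise-based datasourceRequest API could not do.
sub.unsubscribe();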

@@ -48,7 +48,7 @@ export const BucketAggregationEditor: FunctionComponent<QueryMetricEditorProps>
}
};
return (await get()).map(toSelectableValue);
return (await get().toPromise()).map(toSelectableValue);
};
return (

@@ -83,7 +83,7 @@ export const MetricEditor: FunctionComponent<Props> = ({ value }) => {
return datasource.getFields('number');
};
return (await get()).map(toSelectableValue);
return (await get().toPromise()).map(toSelectableValue);
};
return (

@@ -1,23 +1,26 @@
import _ from 'lodash';
import { Observable, of, throwError } from 'rxjs';
import {
ArrayVector,
CoreApp,
DataQueryRequest,
DataSourceInstanceSettings,
DataSourcePluginMeta,
dateMath,
DateTime,
dateTime,
Field,
MetricFindValue,
MutableDataFrame,
TimeRange,
toUtc,
} from '@grafana/data';
import _ from 'lodash';
import { BackendSrvRequest, FetchResponse } from '@grafana/runtime';
import { ElasticDatasource, enhanceDataFrame } from './datasource';
import { backendSrv } from 'app/core/services/backend_srv'; // will use the version in __mocks__
import { TemplateSrv } from 'app/features/templating/template_srv';
import { ElasticsearchOptions, ElasticsearchQuery } from './types';
import { Filters } from './components/QueryEditor/BucketAggregationsEditor/aggregations';
import { createFetchResponse } from '../../../../test/helpers/createFetchResponse';
const ELASTICSEARCH_MOCK_URL = 'http://elasticsearch.local';
@@ -42,12 +45,27 @@ const createTimeRange = (from: DateTime, to: DateTime): TimeRange => ({
},
});
describe('ElasticDatasource', function(this: any) {
const datasourceRequestMock = jest.spyOn(backendSrv, 'datasourceRequest');
beforeEach(() => {
interface Args {
data?: any;
from?: string;
jsonData?: any;
database?: string;
mockImplementation?: (options: BackendSrvRequest) => Observable<FetchResponse>;
}
function getTestContext({
data = {},
from = 'now-5m',
jsonData = {},
database = '[asd-]YYYY.MM.DD',
mockImplementation = undefined,
}: Args = {}) {
jest.clearAllMocks();
});
const defaultMock = (options: BackendSrvRequest) => of(createFetchResponse(data));
const fetchMock = jest.spyOn(backendSrv, 'fetch');
fetchMock.mockImplementation(mockImplementation ?? defaultMock);
const templateSrv: any = {
replace: jest.fn(text => {
@@ -60,72 +78,63 @@ describe('ElasticDatasource', function(this: any) {
getAdhocFilters: jest.fn(() => []),
};
interface TestContext {
ds: ElasticDatasource;
}
const ctx = {} as TestContext;
function createTimeSrv(from: string) {
const srv: any = {
time: { from: from, to: 'now' },
const timeSrv: any = {
time: { from, to: 'now' },
};
srv.timeRange = jest.fn(() => {
timeSrv.timeRange = jest.fn(() => {
return {
from: dateMath.parse(srv.time.from, false),
to: dateMath.parse(srv.time.to, true),
from: dateMath.parse(timeSrv.time.from, false),
to: dateMath.parse(timeSrv.time.to, true),
};
});
srv.setTime = jest.fn(time => {
srv.time = time;
timeSrv.setTime = jest.fn(time => {
timeSrv.time = time;
});
return srv;
}
const instanceSettings: DataSourceInstanceSettings<ElasticsearchOptions> = {
id: 1,
meta: {} as DataSourcePluginMeta,
name: 'test-elastic',
type: 'type',
uid: 'uid',
url: ELASTICSEARCH_MOCK_URL,
database,
jsonData,
};
function createDatasource(instanceSettings: DataSourceInstanceSettings<ElasticsearchOptions>) {
instanceSettings.jsonData = instanceSettings.jsonData || ({} as ElasticsearchOptions);
ctx.ds = new ElasticDatasource(instanceSettings, templateSrv as TemplateSrv);
}
const ds = new ElasticDatasource(instanceSettings, templateSrv);
describe('When testing datasource with index pattern', () => {
beforeEach(() => {
createDatasource({
url: ELASTICSEARCH_MOCK_URL,
database: '[asd-]YYYY.MM.DD',
jsonData: { interval: 'Daily', esVersion: 2 } as ElasticsearchOptions,
} as DataSourceInstanceSettings<ElasticsearchOptions>);
});
return { timeSrv, ds, fetchMock };
}
describe('ElasticDatasource', function(this: any) {
describe('When testing datasource with index pattern', () => {
it('should translate index pattern to current day', () => {
let requestOptions: any;
datasourceRequestMock.mockImplementation(options => {
requestOptions = options;
return Promise.resolve({ data: {} });
});
const { ds, fetchMock } = getTestContext({ jsonData: { interval: 'Daily', esVersion: 2 } });
ctx.ds.testDatasource();
ds.testDatasource();
const today = toUtc().format('YYYY.MM.DD');
expect(requestOptions.url).toBe(`${ELASTICSEARCH_MOCK_URL}/asd-${today}/_mapping`);
expect(fetchMock).toHaveBeenCalledTimes(1);
expect(fetchMock.mock.calls[0][0].url).toBe(`${ELASTICSEARCH_MOCK_URL}/asd-${today}/_mapping`);
});
});
describe('When issuing metric query with interval pattern', () => {
let requestOptions: any, parts: any, header: any, query: any, result: any;
beforeEach(async () => {
createDatasource({
url: ELASTICSEARCH_MOCK_URL,
database: '[asd-]YYYY.MM.DD',
jsonData: { interval: 'Daily', esVersion: 2 } as ElasticsearchOptions,
} as DataSourceInstanceSettings<ElasticsearchOptions>);
datasourceRequestMock.mockImplementation(options => {
requestOptions = options;
return Promise.resolve({
data: {
async function runScenario() {
const range = { from: toUtc([2015, 4, 30, 10]), to: toUtc([2015, 5, 1, 10]) };
const targets = [
{
alias: '$varAlias',
bucketAggs: [{ type: 'date_histogram', field: '@timestamp', id: '1' }],
metrics: [{ type: 'count', id: '1' }],
query: 'escape\\:test',
},
];
const query: any = { range, targets };
const data = {
responses: [
{
aggregations: {
@@ -140,64 +149,68 @@ describe('ElasticDatasource', function(this: any) {
},
},
],
},
});
});
};
const { ds, fetchMock } = getTestContext({ jsonData: { interval: 'Daily', esVersion: 2 }, data });
query = {
range: {
from: toUtc([2015, 4, 30, 10]),
to: toUtc([2015, 5, 1, 10]),
},
targets: [
let result: any = {};
await expect(ds.query(query)).toEmitValuesWith(received => {
expect(received.length).toBe(1);
expect(received[0]).toEqual({
data: [
{
alias: '$varAlias',
bucketAggs: [{ type: 'date_histogram', field: '@timestamp', id: '1' }],
metrics: [{ type: 'count', id: '1' }],
query: 'escape\\:test',
datapoints: [[10, 1000]],
metric: 'count',
props: {},
refId: undefined,
target: 'resolvedVariable',
},
],
};
});
result = received[0];
});
result = await ctx.ds.query(query);
expect(fetchMock).toHaveBeenCalledTimes(1);
const requestOptions = fetchMock.mock.calls[0][0];
const parts = requestOptions.data.split('\n');
const header = JSON.parse(parts[0]);
const body = JSON.parse(parts[1]);
parts = requestOptions.data.split('\n');
header = JSON.parse(parts[0]);
});
return { result, body, header, query };
}
it('should translate index pattern to current day', () => {
it('should translate index pattern to current day', async () => {
const { header } = await runScenario();
expect(header.index).toEqual(['asd-2015.05.30', 'asd-2015.05.31', 'asd-2015.06.01']);
});
it('should not resolve the variable in the original alias field in the query', () => {
it('should not resolve the variable in the original alias field in the query', async () => {
const { query } = await runScenario();
expect(query.targets[0].alias).toEqual('$varAlias');
});
it('should resolve the alias variable for the alias/target in the result', () => {
it('should resolve the alias variable for the alias/target in the result', async () => {
const { result } = await runScenario();
expect(result.data[0].target).toEqual('resolvedVariable');
});
it('should json escape lucene query', () => {
const body = JSON.parse(parts[1]);
it('should json escape lucene query', async () => {
const { body } = await runScenario();
expect(body.query.bool.filter[1].query_string.query).toBe('escape\\:test');
});
});
describe('When issuing logs query with interval pattern', () => {
async function setupDataSource(jsonData?: Partial<ElasticsearchOptions>) {
createDatasource({
url: ELASTICSEARCH_MOCK_URL,
database: 'mock-index',
jsonData: {
jsonData = {
interval: 'Daily',
esVersion: 2,
timeField: '@timestamp',
...(jsonData || {}),
} as ElasticsearchOptions,
} as DataSourceInstanceSettings<ElasticsearchOptions>);
datasourceRequestMock.mockImplementation(options => {
return Promise.resolve(logsResponse);
};
const { ds } = getTestContext({
jsonData,
data: logsResponse.data,
database: 'mock-index',
});
const query: DataQueryRequest<ElasticsearchQuery> = {
@@ -206,7 +219,13 @@ describe('ElasticDatasource', function(this: any) {
{
alias: '$varAlias',
refId: 'A',
bucketAggs: [{ type: 'date_histogram', settings: { interval: 'auto' }, id: '2' }],
bucketAggs: [
{
type: 'date_histogram',
settings: { interval: 'auto' },
id: '2',
},
],
metrics: [{ type: 'count', id: '1' }],
query: 'escape\\:test',
isLogsQuery: true,
@@ -215,8 +234,14 @@ describe('ElasticDatasource', function(this: any) {
],
} as DataQueryRequest<ElasticsearchQuery>;
const queryBuilderSpy = jest.spyOn(ctx.ds.queryBuilder, 'getLogsQuery');
const response = await ctx.ds.query(query);
const queryBuilderSpy = jest.spyOn(ds.queryBuilder, 'getLogsQuery');
let response: any = {};
await expect(ds.query(query)).toEmitValuesWith(received => {
expect(received.length).toBe(1);
response = received[0];
});
return { queryBuilderSpy, response };
}
@@ -243,43 +268,35 @@ describe('ElasticDatasource', function(this: any) {
});
describe('When issuing document query', () => {
let requestOptions: any, parts: any, header: any;
async function runScenario() {
const range = createTimeRange(dateTime([2015, 4, 30, 10]), dateTime([2015, 5, 1, 10]));
const targets = [{ refId: 'A', metrics: [{ type: 'raw_document', id: '1' }], query: 'test' }];
const query: any = { range, targets };
const data = { responses: [] };
beforeEach(() => {
createDatasource({
url: ELASTICSEARCH_MOCK_URL,
database: 'test',
jsonData: { esVersion: 2 } as ElasticsearchOptions,
} as DataSourceInstanceSettings<ElasticsearchOptions>);
const { ds, fetchMock } = getTestContext({ jsonData: { esVersion: 2 }, data, database: 'test' });
datasourceRequestMock.mockImplementation(options => {
requestOptions = options;
return Promise.resolve({ data: { responses: [] } });
await expect(ds.query(query)).toEmitValuesWith(received => {
expect(received.length).toBe(1);
expect(received[0]).toEqual({ data: [] });
});
const query: DataQueryRequest<ElasticsearchQuery> = {
range: createTimeRange(dateTime([2015, 4, 30, 10]), dateTime([2015, 5, 1, 10])),
targets: [
{
refId: 'A',
metrics: [{ type: 'raw_document', id: '1' }],
query: 'test',
},
],
} as DataQueryRequest<ElasticsearchQuery>;
ctx.ds.query(query);
expect(fetchMock).toHaveBeenCalledTimes(1);
const requestOptions = fetchMock.mock.calls[0][0];
const parts = requestOptions.data.split('\n');
const header = JSON.parse(parts[0]);
const body = JSON.parse(parts[1]);
parts = requestOptions.data.split('\n');
header = JSON.parse(parts[0]);
});
return { body, header };
}
it('should set search type to query_then_fetch', () => {
it('should set search type to query_then_fetch', async () => {
const { header } = await runScenario();
expect(header.search_type).toEqual('query_then_fetch');
});
it('should set size', () => {
const body = JSON.parse(parts[1]);
it('should set size', async () => {
const { body } = await runScenario();
expect(body.size).toBe(500);
});
});
@@ -298,15 +315,9 @@ describe('ElasticDatasource', function(this: any) {
],
} as DataQueryRequest<ElasticsearchQuery>;
createDatasource({
url: ELASTICSEARCH_MOCK_URL,
database: '[asd-]YYYY.MM.DD',
jsonData: { interval: 'Daily', esVersion: 7 } as ElasticsearchOptions,
} as DataSourceInstanceSettings<ElasticsearchOptions>);
it('should process it properly', async () => {
datasourceRequestMock.mockImplementation(() => {
return Promise.resolve({
const { ds } = getTestContext({
jsonData: { interval: 'Daily', esVersion: 7 },
data: {
took: 1,
responses: [
@@ -319,23 +330,24 @@ describe('ElasticDatasource', function(this: any) {
],
},
});
});
const errObject = {
data: '{\n "reason": "all shards failed"\n}',
message: 'all shards failed',
config: {
url: 'http://localhost:3000/api/tsdb/query',
},
};
try {
await ctx.ds.query(query);
} catch (err) {
expect(err).toEqual(errObject);
}
await expect(ds.query(query)).toEmitValuesWith(received => {
expect(received.length).toBe(1);
expect(received[0]).toEqual(errObject);
});
});
it('should properly throw an unknown error', async () => {
datasourceRequestMock.mockImplementation(() => {
return Promise.resolve({
const { ds } = getTestContext({
jsonData: { interval: 'Daily', esVersion: 7 },
data: {
took: 1,
responses: [
@@ -346,32 +358,24 @@ describe('ElasticDatasource', function(this: any) {
],
},
});
});
const errObject = {
data: '{}',
message: 'Unknown elastic error response',
config: {
url: 'http://localhost:3000/api/tsdb/query',
},
};
try {
await ctx.ds.query(query);
} catch (err) {
expect(err).toEqual(errObject);
}
await expect(ds.query(query)).toEmitValuesWith(received => {
expect(received.length).toBe(1);
expect(received[0]).toEqual(errObject);
});
});
});
describe('When getting fields', () => {
beforeEach(() => {
createDatasource({
url: ELASTICSEARCH_MOCK_URL,
database: 'metricbeat',
jsonData: { esVersion: 50 } as ElasticsearchOptions,
} as DataSourceInstanceSettings<ElasticsearchOptions>);
datasourceRequestMock.mockImplementation(options => {
return Promise.resolve({
data: {
const data = {
metricbeat: {
mappings: {
metricsets: {
@@ -416,14 +420,14 @@ describe('ElasticDatasource', function(this: any) {
},
},
},
},
});
});
});
};
it('should return nested fields', async () => {
const fieldObjects = await ctx.ds.getFields();
const { ds } = getTestContext({ data, jsonData: { esVersion: 50 }, database: 'metricbeat' });
await expect(ds.getFields()).toEmitValuesWith(received => {
expect(received.length).toBe(1);
const fieldObjects = received[0];
const fields = _.map(fieldObjects, 'text');
expect(fields).toEqual([
@@ -439,29 +443,35 @@ describe('ElasticDatasource', function(this: any) {
'system.process.name',
]);
});
});
it('should return number fields', async () => {
const fieldObjects = await ctx.ds.getFields('number');
const { ds } = getTestContext({ data, jsonData: { esVersion: 50 }, database: 'metricbeat' });
await expect(ds.getFields('number')).toEmitValuesWith(received => {
expect(received.length).toBe(1);
const fieldObjects = received[0];
const fields = _.map(fieldObjects, 'text');
expect(fields).toEqual(['system.cpu.system', 'system.cpu.user', 'system.process.cpu.total']);
});
});
it('should return date fields', async () => {
const fieldObjects = await ctx.ds.getFields('date');
const { ds } = getTestContext({ data, jsonData: { esVersion: 50 }, database: 'metricbeat' });
await expect(ds.getFields('date')).toEmitValuesWith(received => {
expect(received.length).toBe(1);
const fieldObjects = received[0];
const fields = _.map(fieldObjects, 'text');
expect(fields).toEqual(['@timestamp', '__timestamp', '@timestampnano']);
});
});
});
describe('When getting field mappings on indices with gaps', () => {
const twoWeekTimeSrv: any = createTimeSrv('now-2w');
const basicResponse = {
data: {
metricbeat: {
mappings: {
metricsets: {
@@ -477,11 +487,9 @@ describe('ElasticDatasource', function(this: any) {
},
},
},
},
};
const alternateResponse = {
data: {
metricbeat: {
mappings: {
metricsets: {
@@ -492,91 +500,86 @@ describe('ElasticDatasource', function(this: any) {
},
},
},
},
};
beforeEach(() => {
createDatasource({
url: ELASTICSEARCH_MOCK_URL,
database: '[asd-]YYYY.MM.DD',
jsonData: { interval: 'Daily', esVersion: 50 } as ElasticsearchOptions,
} as DataSourceInstanceSettings<ElasticsearchOptions>);
});
it('should return fields of the newest available index', async () => {
const twoDaysBefore = toUtc()
.subtract(2, 'day')
.format('YYYY.MM.DD');
const threeDaysBefore = toUtc()
.subtract(3, 'day')
.format('YYYY.MM.DD');
datasourceRequestMock.mockImplementation(options => {
if (options.url === `${ELASTICSEARCH_MOCK_URL}/asd-${twoDaysBefore}/_mapping`) {
return Promise.resolve(basicResponse);
} else if (options.url === `${ELASTICSEARCH_MOCK_URL}/asd-${threeDaysBefore}/_mapping`) {
return Promise.resolve(alternateResponse);
const baseUrl = `${ELASTICSEARCH_MOCK_URL}/asd-${twoDaysBefore}/_mapping`;
const alternateUrl = `${ELASTICSEARCH_MOCK_URL}/asd-${threeDaysBefore}/_mapping`;
const { ds, timeSrv } = getTestContext({
from: 'now-2w',
jsonData: { interval: 'Daily', esVersion: 50 },
mockImplementation: options => {
if (options.url === baseUrl) {
return of(createFetchResponse(basicResponse));
} else if (options.url === alternateUrl) {
return of(createFetchResponse(alternateResponse));
}
return Promise.reject({ status: 404 });
return throwError({ status: 404 });
},
});
const range = twoWeekTimeSrv.timeRange();
const fieldObjects = await ctx.ds.getFields(undefined, range);
const range = timeSrv.timeRange();
await expect(ds.getFields(undefined, range)).toEmitValuesWith(received => {
expect(received.length).toBe(1);
const fieldObjects = received[0];
const fields = _.map(fieldObjects, 'text');
expect(fields).toEqual(['@timestamp', 'beat.hostname']);
});
});
it('should not retry when ES is down', async () => {
const twoDaysBefore = toUtc()
.subtract(2, 'day')
.format('YYYY.MM.DD');
const range = twoWeekTimeSrv.timeRange();
datasourceRequestMock.mockImplementation(options => {
const { ds, timeSrv, fetchMock } = getTestContext({
from: 'now-2w',
jsonData: { interval: 'Daily', esVersion: 50 },
mockImplementation: options => {
if (options.url === `${ELASTICSEARCH_MOCK_URL}/asd-${twoDaysBefore}/_mapping`) {
return Promise.resolve(basicResponse);
return of(createFetchResponse(basicResponse));
}
return Promise.reject({ status: 500 });
return throwError({ status: 500 });
},
});
expect.assertions(2);
try {
await ctx.ds.getFields(undefined, range);
} catch (e) {
expect(e).toStrictEqual({ status: 500 });
expect(datasourceRequestMock).toBeCalledTimes(1);
}
const range = timeSrv.timeRange();
await expect(ds.getFields(undefined, range)).toEmitValuesWith(received => {
expect(received.length).toBe(1);
expect(received[0]).toStrictEqual({ status: 500 });
expect(fetchMock).toBeCalledTimes(1);
});
});
it('should not retry more than 7 indices', async () => {
const range = twoWeekTimeSrv.timeRange();
datasourceRequestMock.mockImplementation(() => {
return Promise.reject({ status: 404 });
const { ds, timeSrv, fetchMock } = getTestContext({
from: 'now-2w',
jsonData: { interval: 'Daily', esVersion: 50 },
mockImplementation: options => {
return throwError({ status: 404 });
},
});
const range = timeSrv.timeRange();
expect.assertions(2);
try {
await ctx.ds.getFields(undefined, range);
} catch (e) {
expect(e).toStrictEqual({ status: 404 });
expect(datasourceRequestMock).toBeCalledTimes(7);
}
await expect(ds.getFields(undefined, range)).toEmitValuesWith(received => {
expect(received.length).toBe(1);
expect(received[0]).toStrictEqual('Could not find an available index for this time range.');
expect(fetchMock).toBeCalledTimes(7);
});
});
});
describe('When getting fields from ES 7.0', () => {
beforeEach(() => {
createDatasource({
url: ELASTICSEARCH_MOCK_URL,
database: 'genuine.es7._mapping.response',
jsonData: { esVersion: 70 } as ElasticsearchOptions,
} as DataSourceInstanceSettings<ElasticsearchOptions>);
datasourceRequestMock.mockImplementation(options => {
return Promise.resolve({
data: {
const data = {
'genuine.es7._mapping.response': {
mappings: {
properties: {
@@ -657,16 +660,16 @@ describe('ElasticDatasource', function(this: any) {
},
},
},
},
});
});
});
};
it('should return nested fields', async () => {
const fieldObjects = await ctx.ds.getFields();
const { ds } = getTestContext({ data, database: 'genuine.es7._mapping.response', jsonData: { esVersion: 70 } });
const fields = _.map(fieldObjects, 'text');
await expect(ds.getFields()).toEmitValuesWith(received => {
expect(received.length).toBe(1);
const fieldObjects = received[0];
const fields = _.map(fieldObjects, 'text');
expect(fields).toEqual([
'@timestamp_millis',
'classification_terms',
@@ -684,12 +687,16 @@ describe('ElasticDatasource', function(this: any) {
'ua_terms_short',
]);
});
});
it('should return number fields', async () => {
const fieldObjects = await ctx.ds.getFields('number');
const { ds } = getTestContext({ data, database: 'genuine.es7._mapping.response', jsonData: { esVersion: 70 } });
const fields = _.map(fieldObjects, 'text');
await expect(ds.getFields('number')).toEmitValuesWith(received => {
expect(received.length).toBe(1);
const fieldObjects = received[0];
const fields = _.map(fieldObjects, 'text');
expect(fields).toEqual([
'justification_blob.overall_vote_score',
'justification_blob.shallow.jsi.sdb.dsel2.bootlegged-gille.botness',
@@ -699,74 +706,65 @@ describe('ElasticDatasource', function(this: any) {
'overall_vote_score',
]);
});
});
it('should return date fields', async () => {
const fieldObjects = await ctx.ds.getFields('date');
const { ds } = getTestContext({ data, database: 'genuine.es7._mapping.response', jsonData: { esVersion: 70 } });
const fields = _.map(fieldObjects, 'text');
await expect(ds.getFields('date')).toEmitValuesWith(received => {
expect(received.length).toBe(1);
const fieldObjects = received[0];
const fields = _.map(fieldObjects, 'text');
expect(fields).toEqual(['@timestamp_millis']);
});
});
describe('When issuing aggregation query on es5.x', () => {
let requestOptions: any, parts: any, header: any;
beforeEach(() => {
createDatasource({
url: ELASTICSEARCH_MOCK_URL,
database: 'test',
jsonData: { esVersion: 5 } as ElasticsearchOptions,
} as DataSourceInstanceSettings<ElasticsearchOptions>);
datasourceRequestMock.mockImplementation(options => {
requestOptions = options;
return Promise.resolve({ data: { responses: [] } });
});
const query: DataQueryRequest<ElasticsearchQuery> = {
range: createTimeRange(dateTime([2015, 4, 30, 10]), dateTime([2015, 5, 1, 10])),
targets: [
describe('When issuing aggregation query on es5.x', () => {
async function runScenario() {
const range = createTimeRange(dateTime([2015, 4, 30, 10]), dateTime([2015, 5, 1, 10]));
const targets = [
{
refId: 'A',
bucketAggs: [{ type: 'date_histogram', field: '@timestamp', id: '2' }],
metrics: [{ type: 'count', id: '1' }],
query: 'test',
},
],
} as DataQueryRequest<ElasticsearchQuery>;
];
const query: any = { range, targets };
const data = { responses: [] };
ctx.ds.query(query);
const { ds, fetchMock } = getTestContext({ jsonData: { esVersion: 5 }, data, database: 'test' });
parts = requestOptions.data.split('\n');
header = JSON.parse(parts[0]);
await expect(ds.query(query)).toEmitValuesWith(received => {
expect(received.length).toBe(1);
expect(received[0]).toEqual({ data: [] });
});
it('should not set search type to count', () => {
expect(fetchMock).toHaveBeenCalledTimes(1);
const requestOptions = fetchMock.mock.calls[0][0];
const parts = requestOptions.data.split('\n');
const header = JSON.parse(parts[0]);
const body = JSON.parse(parts[1]);
return { body, header };
}
it('should not set search type to count', async () => {
const { header } = await runScenario();
expect(header.search_type).not.toEqual('count');
});
it('should set size to 0', () => {
const body = JSON.parse(parts[1]);
it('should set size to 0', async () => {
const { body } = await runScenario();
expect(body.size).toBe(0);
});
});
describe('When issuing metricFind query on es5.x', () => {
let requestOptions: any, parts, header: any, body: any;
let results: MetricFindValue[];
beforeEach(() => {
createDatasource({
url: ELASTICSEARCH_MOCK_URL,
database: 'test',
jsonData: { esVersion: 5 } as ElasticsearchOptions,
} as DataSourceInstanceSettings<ElasticsearchOptions>);
datasourceRequestMock.mockImplementation(options => {
requestOptions = options;
return Promise.resolve({
data: {
async function runScenario() {
const data = {
responses: [
{
aggregations: {
@@ -783,64 +781,65 @@ describe('ElasticDatasource', function(this: any) {
},
},
],
},
});
});
};
ctx.ds.metricFindQuery('{"find": "terms", "field": "test"}').then(res => {
results = res;
});
const { ds, fetchMock } = getTestContext({ jsonData: { esVersion: 5 }, data, database: 'test' });
parts = requestOptions.data.split('\n');
header = JSON.parse(parts[0]);
body = JSON.parse(parts[1]);
});
const results = await ds.metricFindQuery('{"find": "terms", "field": "test"}');
expect(fetchMock).toHaveBeenCalledTimes(1);
const requestOptions = fetchMock.mock.calls[0][0];
const parts = requestOptions.data.split('\n');
const header = JSON.parse(parts[0]);
const body = JSON.parse(parts[1]);
return { results, body, header };
}
it('should get results', () => {
it('should get results', async () => {
const { results } = await runScenario();
expect(results.length).toEqual(2);
});
it('should use key or key_as_string', () => {
it('should use key or key_as_string', async () => {
const { results } = await runScenario();
expect(results[0].text).toEqual('test');
expect(results[1].text).toEqual('test2_as_string');
});
it('should not set search type to count', () => {
it('should not set search type to count', async () => {
const { header } = await runScenario();
expect(header.search_type).not.toEqual('count');
});
it('should set size to 0', () => {
it('should set size to 0', async () => {
const { body } = await runScenario();
expect(body.size).toBe(0);
});
it('should not set terms aggregation size to 0', () => {
it('should not set terms aggregation size to 0', async () => {
const { body } = await runScenario();
expect(body['aggs']['1']['terms'].size).not.toBe(0);
});
});
describe('query', () => {
it('should replace range as integer not string', () => {
const dataSource = new ElasticDatasource(
{
url: ELASTICSEARCH_MOCK_URL,
database: '[asd-]YYYY.MM.DD',
jsonData: {
interval: 'Daily',
esVersion: 2,
timeField: '@time',
},
} as DataSourceInstanceSettings<ElasticsearchOptions>,
templateSrv as TemplateSrv
);
(dataSource as any).post = jest.fn(() => Promise.resolve({ responses: [] }));
dataSource.query(createElasticQuery());
it('should replace range as integer not string', async () => {
const { ds } = getTestContext({ jsonData: { interval: 'Daily', esVersion: 2, timeField: '@time' } });
const postMock = jest.fn((url: string, data: any) => of(createFetchResponse({ responses: [] })));
ds['post'] = postMock;
const query = ((dataSource as any).post as jest.Mock).mock.calls[0][1];
await expect(ds.query(createElasticQuery())).toEmitValuesWith(received => {
expect(postMock).toHaveBeenCalledTimes(1);
const query = postMock.mock.calls[0][1];
expect(typeof JSON.parse(query.split('\n')[1]).query.bool.filter[0].range['@time'].gte).toBe('number');
});
});
});
it('should correctly interpolate variables in query', () => {
const { ds } = getTestContext();
const query: ElasticsearchQuery = {
refId: 'A',
bucketAggs: [{ type: 'filters', settings: { filters: [{ query: '$var', label: '' }] }, id: '1' }],
@@ -848,13 +847,14 @@ describe('ElasticDatasource', function(this: any) {
query: '$var',
};
const interpolatedQuery = ctx.ds.interpolateVariablesInQueries([query], {})[0];
const interpolatedQuery = ds.interpolateVariablesInQueries([query], {})[0];
expect(interpolatedQuery.query).toBe('resolvedVariable');
expect((interpolatedQuery.bucketAggs![0] as Filters).settings!.filters![0].query).toBe('resolvedVariable');
});
it('should correctly handle empty query strings', () => {
const { ds } = getTestContext();
const query: ElasticsearchQuery = {
refId: 'A',
bucketAggs: [{ type: 'filters', settings: { filters: [{ query: '', label: '' }] }, id: '1' }],
@@ -862,7 +862,7 @@ describe('ElasticDatasource', function(this: any) {
query: '',
};
const interpolatedQuery = ctx.ds.interpolateVariablesInQueries([query], {})[0];
const interpolatedQuery = ds.interpolateVariablesInQueries([query], {})[0];
expect(interpolatedQuery.query).toBe('*');
expect((interpolatedQuery.bucketAggs![0] as Filters).settings!.filters![0].query).toBe('*');

@@ -34,6 +34,8 @@ import {
} from './components/QueryEditor/MetricAggregationsEditor/aggregations';
import { bucketAggregationConfig } from './components/QueryEditor/BucketAggregationsEditor/utils';
import { isBucketAggregationWithField } from './components/QueryEditor/BucketAggregationsEditor/aggregations';
import { generate, Observable, of, throwError } from 'rxjs';
import { catchError, first, map, mergeMap, skipWhile, throwIfEmpty } from 'rxjs/operators';
// Those are metadata fields as defined in https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-fields.html#_identity_metadata_fields.
// custom fields can start with underscores, therefore is not safe to exclude anything that starts with one.
@@ -101,7 +103,7 @@ export class ElasticDatasource extends DataSourceApi<ElasticsearchQuery, Elastic
this.languageProvider = new LanguageProvider(this);
}
private request(method: string, url: string, data?: undefined) {
private request(method: string, url: string, data?: undefined): Observable<any> {
const options: any = {
url: this.url + '/' + url,
method: method,
@@ -118,18 +120,25 @@ export class ElasticDatasource extends DataSourceApi<ElasticsearchQuery, Elastic
}
return getBackendSrv()
.datasourceRequest(options)
.catch((err: any) => {
.fetch<any>(options)
.pipe(
map(results => {
results.data.$$config = results.config;
return results.data;
}),
catchError(err => {
if (err.data && err.data.error) {
throw {
return throwError({
message: 'Elasticsearch error: ' + err.data.error.reason,
error: err.data.error,
};
}
throw err;
});
}
return throwError(err);
})
);
}
async importQueries(queries: DataQuery[], originMeta: PluginMeta): Promise<ElasticsearchQuery[]> {
return this.languageProvider.importQueries(queries, originMeta.id);
}
@@ -142,40 +151,45 @@ export class ElasticDatasource extends DataSourceApi<ElasticsearchQuery, Elastic
*
* @param url the url to query the index on, for example `/_mapping`.
*/
private get(url: string, range = getDefaultTimeRange()) {
const indexList = this.indexPattern.getIndexList(range.from, range.to);
if (_.isArray(indexList) && indexList.length) {
return this.requestAllIndices(indexList, url).then((results: any) => {
results.data.$$config = results.config;
return results.data;
});
} else {
return this.request('GET', this.indexPattern.getIndexForToday() + url).then((results: any) => {
results.data.$$config = results.config;
return results.data;
});
private get(url: string, range = getDefaultTimeRange()): Observable<any> {
let indexList = this.indexPattern.getIndexList(range.from, range.to);
if (!Array.isArray(indexList)) {
indexList = [this.indexPattern.getIndexForToday()];
}
const indexUrlList = indexList.map(index => index + url);
return this.requestAllIndices(indexUrlList);
}
private async requestAllIndices(indexList: string[], url: string): Promise<any> {
private requestAllIndices(indexList: string[]): Observable<any> {
const maxTraversals = 7; // do not go beyond one week (for a daily pattern)
const listLen = indexList.length;
for (let i = 0; i < Math.min(listLen, maxTraversals); i++) {
try {
return await this.request('GET', indexList[listLen - i - 1] + url);
} catch (err) {
if (err.status !== 404 || i === maxTraversals - 1) {
throw err;
}
}
}
return generate(
0,
i => i < Math.min(listLen, maxTraversals),
i => i + 1
).pipe(
mergeMap(index => {
// catch all errors and emit an object with an err property to simplify checks later in the pipeline
return this.request('GET', indexList[listLen - index - 1]).pipe(catchError(err => of({ err })));
}),
skipWhile(resp => resp.err && resp.err.status === 404), // skip all requests that fail because missing Elastic index
throwIfEmpty(() => 'Could not find an available index for this time range.'), // when i === Math.min(listLen, maxTraversals) generate will complete but without emitting any values which means we didn't find a valid index
first(), // take the first value that isn't skipped
map(resp => {
if (resp.err) {
throw resp.err; // if there is some other error except 404 then we must throw it
}
return resp;
})
);
}
private post(url: string, data: any) {
return this.request('POST', url, data).then((results: any) => {
results.data.$$config = results.config;
return results.data;
});
private post(url: string, data: any): Observable<any> {
return this.request('POST', url, data);
}
annotationQuery(options: any): Promise<any> {
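The requestAllIndices pipeline above is the densest part of the rewrite, so here is a self-contained sketch of the same operator chain with canned responses instead of HTTP calls. The index names and the mappingFor helper are invented for illustration; the shape of the chain (walk backwards from the newest index, skip 404s, stop at the first success, rethrow other errors, and fail with a message when every index is missing) mirrors the code above.

import { generate, of, throwError, Observable } from 'rxjs';
import { catchError, first, map, mergeMap, skipWhile, throwIfEmpty } from 'rxjs/operators';

const indexList = ['asd-2015.05.30', 'asd-2015.05.31', 'asd-2015.06.01'];
const maxTraversals = 7; // same cap as above: at most one week of daily indices
const listLen = indexList.length;

// Pretend only the middle index exists; the others answer with a 404.
const mappingFor = (index: string): Observable<any> =>
  index === 'asd-2015.05.31' ? of({ index, mappings: {} }) : throwError({ status: 404 });

generate(0, i => i < Math.min(listLen, maxTraversals), i => i + 1)
  .pipe(
    // newest index first; turn errors into values so they can be inspected downstream
    mergeMap(i => mappingFor(indexList[listLen - i - 1]).pipe(catchError(err => of({ err })))),
    skipWhile(resp => resp.err && resp.err.status === 404), // missing index: try the next one
    throwIfEmpty(() => 'Could not find an available index for this time range.'),
    first(), // take the first response that was not skipped
    map(resp => {
      if (resp.err) {
        throw resp.err; // any non-404 error is fatal
      }
      return resp;
    })
  )
  .subscribe(
    resp => console.log('resolved index:', resp.index), // -> asd-2015.05.31
    err => console.error(err)
  );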
@@ -248,7 +262,9 @@ export class ElasticDatasource extends DataSourceApi<ElasticsearchQuery, Elastic
const payload = JSON.stringify(header) + '\n' + JSON.stringify(data) + '\n';
return this.post('_msearch', payload).then((res: any) => {
return this.post('_msearch', payload)
.pipe(
map(res => {
const list = [];
const hits = res.responses[0].hits.hits;
@@ -316,7 +332,9 @@ export class ElasticDatasource extends DataSourceApi<ElasticsearchQuery, Elastic
list.push(event);
}
return list;
});
})
)
.toPromise();
}
private interpolateLuceneQuery(queryString: string, scopedVars: ScopedVars) {
@@ -349,26 +367,25 @@ export class ElasticDatasource extends DataSourceApi<ElasticsearchQuery, Elastic
testDatasource() {
// validate that the index exist and has date field
return this.getFields('date').then(
(dateFields: any) => {
return this.getFields('date')
.pipe(
mergeMap(dateFields => {
const timeField: any = _.find(dateFields, { text: this.timeField });
if (!timeField) {
return {
status: 'error',
message: 'No date field named ' + this.timeField + ' found',
};
return of({ status: 'error', message: 'No date field named ' + this.timeField + ' found' });
}
return { status: 'success', message: 'Index OK. Time field name OK.' };
},
(err: any) => {
return of({ status: 'success', message: 'Index OK. Time field name OK.' });
}),
catchError(err => {
console.error(err);
if (err.message) {
return { status: 'error', message: err.message };
return of({ status: 'error', message: err.message });
} else {
return { status: 'error', message: err.status };
}
return of({ status: 'error', message: err.status });
}
);
})
)
.toPromise();
}
getQueryHeader(searchType: any, timeFrom?: DateTime, timeTo?: DateTime): string {
@@ -507,7 +524,7 @@ export class ElasticDatasource extends DataSourceApi<ElasticsearchQuery, Elastic
return logResponse;
};
query(options: DataQueryRequest<ElasticsearchQuery>): Promise<DataQueryResponse> {
query(options: DataQueryRequest<ElasticsearchQuery>): Observable<DataQueryResponse> {
let payload = '';
const targets = this.interpolateVariablesInQueries(_.cloneDeep(options.targets), options.scopedVars);
const sentTargets: ElasticsearchQuery[] = [];
@@ -547,7 +564,7 @@ export class ElasticDatasource extends DataSourceApi<ElasticsearchQuery, Elastic
}
if (sentTargets.length === 0) {
return Promise.resolve({ data: [] });
return of({ data: [] });
}
// We replace the range here for actual values. We need to replace it together with enclosing "" so that we replace
@@ -560,7 +577,8 @@ export class ElasticDatasource extends DataSourceApi<ElasticsearchQuery, Elastic
const url = this.getMultiSearchUrl();
return this.post(url, payload).then((res: any) => {
return this.post(url, payload).pipe(
map(res => {
const er = new ElasticResponse(sentTargets, res);
if (sentTargets.some(target => target.isLogsQuery)) {
@@ -572,7 +590,8 @@ export class ElasticDatasource extends DataSourceApi<ElasticsearchQuery, Elastic
}
return er.getTimeSeries();
});
})
);
}
isMetadataField(fieldName: string) {
@@ -580,9 +599,10 @@ export class ElasticDatasource extends DataSourceApi<ElasticsearchQuery, Elastic
}
// TODO: instead of being a string, this could be a custom type representing all the elastic types
async getFields(type?: string, range?: TimeRange): Promise<MetricFindValue[]> {
getFields(type?: string, range?: TimeRange): Observable<MetricFindValue[]> {
const configuredEsVersion = this.esVersion;
return this.get('/_mapping', range).then((result: any) => {
return this.get('/_mapping', range).pipe(
map(result => {
const typeMap: any = {
float: 'number',
double: 'number',
@@ -664,10 +684,11 @@ export class ElasticDatasource extends DataSourceApi<ElasticsearchQuery, Elastic
return _.map(fields, value => {
return value;
});
});
})
);
}
getTerms(queryDef: any, range = getDefaultTimeRange()) {
getTerms(queryDef: any, range = getDefaultTimeRange()): Observable<MetricFindValue[]> {
const searchType = this.esVersion >= 5 ? 'query_then_fetch' : 'count';
const header = this.getQueryHeader(searchType, range.from, range.to);
let esQuery = JSON.stringify(this.queryBuilder.getTermsQuery(queryDef));
@@ -678,7 +699,8 @@ export class ElasticDatasource extends DataSourceApi<ElasticsearchQuery, Elastic
const url = this.getMultiSearchUrl();
return this.post(url, esQuery).then((res: any) => {
return this.post(url, esQuery).pipe(
map(res => {
if (!res.responses[0].aggregations) {
return [];
}
@@ -690,7 +712,8 @@ export class ElasticDatasource extends DataSourceApi<ElasticsearchQuery, Elastic
value: bucket.key,
};
});
});
})
);
}
getMultiSearchUrl() {
@@ -707,13 +730,13 @@ export class ElasticDatasource extends DataSourceApi<ElasticsearchQuery, Elastic
if (query) {
if (parsedQuery.find === 'fields') {
parsedQuery.type = this.templateSrv.replace(parsedQuery.type, {}, 'lucene');
return this.getFields(parsedQuery.type, range);
return this.getFields(parsedQuery.type, range).toPromise();
}
if (parsedQuery.find === 'terms') {
parsedQuery.field = this.templateSrv.replace(parsedQuery.field, {}, 'lucene');
parsedQuery.query = this.templateSrv.replace(parsedQuery.query || '*', {}, 'lucene');
return this.getTerms(parsedQuery, range);
return this.getTerms(parsedQuery, range).toPromise();
}
}
@@ -721,11 +744,11 @@ export class ElasticDatasource extends DataSourceApi<ElasticsearchQuery, Elastic
}
getTagKeys() {
return this.getFields();
return this.getFields().toPromise();
}
getTagValues(options: any) {
return this.getTerms({ field: options.key, query: '*' });
return this.getTerms({ field: options.key, query: '*' }).toPromise();
}
targetContainsTemplate(target: any) {

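A closing note on the promise-based edges: Grafana still expects Promises from entry points such as metricFindQuery, testDatasource, annotationQuery, getTagKeys and getTagValues (and from the awaited get() calls in the two editor components), so those call sites bridge back from the new Observable internals with .toPromise(), as the hunks above show. A minimal sketch of that bridging pattern, assuming a getFields that returns Observable<MetricFindValue[]> as in this commit; the stand-in implementation and its canned value are invented for illustration.

import { Observable, of } from 'rxjs';
import { MetricFindValue } from '@grafana/data';

// Stand-in for the datasource method rewritten above.
const getFields = (type?: string): Observable<MetricFindValue[]> => of([{ text: '@timestamp' }]);

// Promise-based plugin hook: append .toPromise() (RxJS 6) at the boundary.
function getTagKeys(): Promise<MetricFindValue[]> {
  return getFields().toPromise();
}

getTagKeys().then(keys => console.log(keys.map(k => k.text))); // -> ['@timestamp']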