From 6093e45178a22f5bc39ab80404109f4e194eeb00 Mon Sep 17 00:00:00 2001 From: Matias Chomicki Date: Tue, 21 Mar 2023 16:54:18 +0100 Subject: [PATCH] Loki Query Chunking: Refactor naming conventions and reorganize code (#65056) * Rename splitting files to chunking * Rename time chunking functions * Move response functions to response utils * Remove some blank spaces * Add an extra test case for frame refIds and names --- .../datasource/loki/datasource.test.ts | 8 +- .../app/plugins/datasource/loki/datasource.ts | 8 +- ...Split.test.ts => logsTimeChunking.test.ts} | 4 +- .../{logsTimeSplit.ts => logsTimeChunking.ts} | 0 ...lit.test.ts => metricTimeChunking.test.ts} | 4 +- ...tricTimeSplit.ts => metricTimeChunking.ts} | 0 ...plitting.test.ts => queryChunking.test.ts} | 42 +-- .../{querySplitting.ts => queryChunking.ts} | 15 +- .../datasource/loki/queryUtils.test.ts | 315 +---------------- .../app/plugins/datasource/loki/queryUtils.ts | 109 +----- .../datasource/loki/responseUtils.test.ts | 326 +++++++++++++++++- .../plugins/datasource/loki/responseUtils.ts | 111 +++++- 12 files changed, 482 insertions(+), 460 deletions(-) rename public/app/plugins/datasource/loki/{logsTimeSplit.test.ts => logsTimeChunking.test.ts} (93%) rename public/app/plugins/datasource/loki/{logsTimeSplit.ts => logsTimeChunking.ts} (100%) rename public/app/plugins/datasource/loki/{metricTimeSplit.test.ts => metricTimeChunking.test.ts} (89%) rename public/app/plugins/datasource/loki/{metricTimeSplit.ts => metricTimeChunking.ts} (100%) rename public/app/plugins/datasource/loki/{querySplitting.test.ts => queryChunking.test.ts} (83%) rename public/app/plugins/datasource/loki/{querySplitting.ts => queryChunking.ts} (94%) diff --git a/public/app/plugins/datasource/loki/datasource.test.ts b/public/app/plugins/datasource/loki/datasource.test.ts index b99aeacf8c1..22c632775dd 100644 --- a/public/app/plugins/datasource/loki/datasource.test.ts +++ b/public/app/plugins/datasource/loki/datasource.test.ts @@ -33,7 +33,7 @@ import { CustomVariableModel } from '../../../features/variables/types'; import { LokiDatasource, REF_ID_DATA_SAMPLES } from './datasource'; import { createLokiDatasource, createMetadataRequest } from './mocks'; -import { runPartitionedQueries } from './querySplitting'; +import { runQueryInChunks } from './queryChunking'; import { parseToNodeNamesArray } from './queryUtils'; import { LokiOptions, LokiQuery, LokiQueryType, LokiVariableQueryType, SupportingQueryType } from './types'; import { LokiVariableSupport } from './variables'; @@ -45,7 +45,7 @@ jest.mock('@grafana/runtime', () => { }; }); -jest.mock('./querySplitting'); +jest.mock('./queryChunking'); const templateSrvStub = { getAdhocFilters: jest.fn(() => [] as unknown[]), @@ -1105,7 +1105,7 @@ describe('LokiDatasource', () => { describe('Query splitting', () => { beforeAll(() => { config.featureToggles.lokiQuerySplitting = true; - jest.mocked(runPartitionedQueries).mockReturnValue( + jest.mocked(runQueryInChunks).mockReturnValue( of({ data: [], }) @@ -1131,7 +1131,7 @@ describe('LokiDatasource', () => { }); await expect(ds.query(query)).toEmitValuesWith(() => { - expect(runPartitionedQueries).toHaveBeenCalled(); + expect(runQueryInChunks).toHaveBeenCalled(); }); }); }); diff --git a/public/app/plugins/datasource/loki/datasource.ts b/public/app/plugins/datasource/loki/datasource.ts index 972be00b110..67697ee6f34 100644 --- a/public/app/plugins/datasource/loki/datasource.ts +++ b/public/app/plugins/datasource/loki/datasource.ts @@ -67,8 +67,8 @@ import { findLastPosition, getLabelFilterPositions, } from './modifyQuery'; +import { runQueryInChunks } from './queryChunking'; import { getQueryHints } from './queryHints'; -import { runPartitionedQueries } from './querySplitting'; import { getLogQueryFromMetricsQuery, getNormalizedLokiQuery, @@ -76,7 +76,7 @@ import { getParserFromQuery, isLogsQuery, isValidQuery, - requestSupportsPartitioning, + requestSupporsChunking, } from './queryUtils'; import { sortDataFrameByTime, SortDirection } from './sortDataFrame'; import { doLokiChannelStream } from './streaming'; @@ -285,8 +285,8 @@ export class LokiDatasource return this.runLiveQueryThroughBackend(fixedRequest); } - if (config.featureToggles.lokiQuerySplitting && requestSupportsPartitioning(fixedRequest.targets)) { - return runPartitionedQueries(this, fixedRequest); + if (config.featureToggles.lokiQuerySplitting && requestSupporsChunking(fixedRequest.targets)) { + return runQueryInChunks(this, fixedRequest); } return this.runQuery(fixedRequest); diff --git a/public/app/plugins/datasource/loki/logsTimeSplit.test.ts b/public/app/plugins/datasource/loki/logsTimeChunking.test.ts similarity index 93% rename from public/app/plugins/datasource/loki/logsTimeSplit.test.ts rename to public/app/plugins/datasource/loki/logsTimeChunking.test.ts index 73b20d822ce..96edc27e093 100644 --- a/public/app/plugins/datasource/loki/logsTimeSplit.test.ts +++ b/public/app/plugins/datasource/loki/logsTimeChunking.test.ts @@ -1,6 +1,6 @@ -import { getRangeChunks } from './logsTimeSplit'; +import { getRangeChunks } from './logsTimeChunking'; -describe('querySplit', () => { +describe('logs getRangeChunks', () => { it('should split time range into chunks', () => { const start = Date.parse('2022-02-06T14:10:03.234'); const end = Date.parse('2022-02-06T14:11:03.567'); diff --git a/public/app/plugins/datasource/loki/logsTimeSplit.ts b/public/app/plugins/datasource/loki/logsTimeChunking.ts similarity index 100% rename from public/app/plugins/datasource/loki/logsTimeSplit.ts rename to public/app/plugins/datasource/loki/logsTimeChunking.ts diff --git a/public/app/plugins/datasource/loki/metricTimeSplit.test.ts b/public/app/plugins/datasource/loki/metricTimeChunking.test.ts similarity index 89% rename from public/app/plugins/datasource/loki/metricTimeSplit.test.ts rename to public/app/plugins/datasource/loki/metricTimeChunking.test.ts index 116e9c4a479..65050dba060 100644 --- a/public/app/plugins/datasource/loki/metricTimeSplit.test.ts +++ b/public/app/plugins/datasource/loki/metricTimeChunking.test.ts @@ -1,6 +1,6 @@ -import { getRangeChunks } from './metricTimeSplit'; +import { getRangeChunks } from './metricTimeChunking'; -describe('querySplit', () => { +describe('metric getRangeChunks', () => { it('should split time range into chunks', () => { const start = Date.parse('2022-02-06T14:10:03'); const end = Date.parse('2022-02-06T14:11:03'); diff --git a/public/app/plugins/datasource/loki/metricTimeSplit.ts b/public/app/plugins/datasource/loki/metricTimeChunking.ts similarity index 100% rename from public/app/plugins/datasource/loki/metricTimeSplit.ts rename to public/app/plugins/datasource/loki/metricTimeChunking.ts diff --git a/public/app/plugins/datasource/loki/querySplitting.test.ts b/public/app/plugins/datasource/loki/queryChunking.test.ts similarity index 83% rename from public/app/plugins/datasource/loki/querySplitting.test.ts rename to public/app/plugins/datasource/loki/queryChunking.test.ts index 174ec98866e..f57307e9cde 100644 --- a/public/app/plugins/datasource/loki/querySplitting.test.ts +++ b/public/app/plugins/datasource/loki/queryChunking.test.ts @@ -5,13 +5,13 @@ import { dateTime } from '@grafana/data'; import { LoadingState } from '@grafana/schema'; import { LokiDatasource } from './datasource'; -import * as logsTimeSplit from './logsTimeSplit'; -import * as metricTimeSplit from './metricTimeSplit'; +import * as logsTimeSplit from './logsTimeChunking'; +import * as metricTimeSplit from './metricTimeChunking'; import { createLokiDatasource, getMockFrames } from './mocks'; -import { runPartitionedQueries } from './querySplitting'; +import { runQueryInChunks } from './queryChunking'; import { LokiQuery, LokiQueryType } from './types'; -describe('runPartitionedQueries()', () => { +describe('runQueryInChunks()', () => { let datasource: LokiDatasource; const range = { from: dateTime('2023-02-08T05:00:00.000Z'), @@ -31,7 +31,7 @@ describe('runPartitionedQueries()', () => { }); test('Splits datasource queries', async () => { - await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => { + await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => { // 3 days, 3 chunks, 3 requests. expect(datasource.runQuery).toHaveBeenCalledTimes(3); }); @@ -41,7 +41,7 @@ describe('runPartitionedQueries()', () => { jest .spyOn(datasource, 'runQuery') .mockReturnValue(of({ state: LoadingState.Error, error: { refId: 'A', message: 'Error' }, data: [] })); - await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith((values) => { + await expect(runQueryInChunks(datasource, request)).toEmitValuesWith((values) => { expect(values).toEqual([{ error: { refId: 'A', message: 'Error' }, data: [], state: LoadingState.Streaming }]); }); }); @@ -63,7 +63,7 @@ describe('runPartitionedQueries()', () => { jest.mocked(metricTimeSplit.getRangeChunks).mockRestore(); }); test('Ignores hidden queries', async () => { - await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => { + await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => { expect(logsTimeSplit.getRangeChunks).toHaveBeenCalled(); expect(metricTimeSplit.getRangeChunks).not.toHaveBeenCalled(); }); @@ -80,14 +80,14 @@ describe('runPartitionedQueries()', () => { jest.spyOn(datasource, 'runQuery').mockReturnValue(of({ data: [logFrameA], refId: 'A' })); }); test('Stops requesting once maxLines of logs have been received', async () => { - await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => { + await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => { // 3 days, 3 chunks, 2 responses of 2 logs, 2 requests expect(datasource.runQuery).toHaveBeenCalledTimes(2); }); }); test('Performs all the requests if maxLines has not been reached', async () => { request.targets[0].maxLines = 9999; - await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => { + await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => { // 3 days, 3 chunks, 3 responses of 2 logs, 3 requests expect(datasource.runQuery).toHaveBeenCalledTimes(3); }); @@ -95,7 +95,7 @@ describe('runPartitionedQueries()', () => { test('Performs all the requests if not a log query', async () => { request.targets[0].maxLines = 1; request.targets[0].expr = 'count_over_time({a="b"}[1m])'; - await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => { + await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => { // 3 days, 3 chunks, 3 responses of 2 logs, 3 requests expect(datasource.runQuery).toHaveBeenCalledTimes(3); }); @@ -114,7 +114,7 @@ describe('runPartitionedQueries()', () => { ], range, }); - await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => { + await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => { // 3 days, 3 chunks, 1x Metric + 1x Log, 6 requests. expect(datasource.runQuery).toHaveBeenCalledTimes(6); }); @@ -127,7 +127,7 @@ describe('runPartitionedQueries()', () => { ], range, }); - await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => { + await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => { // 3 days, 3 chunks, 1x2 Metric, 3 requests. expect(datasource.runQuery).toHaveBeenCalledTimes(3); }); @@ -140,7 +140,7 @@ describe('runPartitionedQueries()', () => { ], range, }); - await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => { + await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => { // 3 days, 3 chunks, 1x2 Logs, 3 requests. expect(datasource.runQuery).toHaveBeenCalledTimes(3); }); @@ -153,7 +153,7 @@ describe('runPartitionedQueries()', () => { ], range, }); - await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => { + await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => { // Instant queries are omitted from splitting expect(datasource.runQuery).toHaveBeenCalledTimes(1); }); @@ -170,7 +170,7 @@ describe('runPartitionedQueries()', () => { jest.spyOn(datasource, 'runQuery').mockReturnValue(of({ data: [], refId: 'B' })); jest.spyOn(datasource, 'runQuery').mockReturnValueOnce(of({ data: [logFrameA], refId: 'A' })); - await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => { + await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => { // 3 days, 3 chunks, 1x Logs + 3x Metric, 3 requests. expect(datasource.runQuery).toHaveBeenCalledTimes(4); }); @@ -184,7 +184,7 @@ describe('runPartitionedQueries()', () => { ], range, }); - await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => { + await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => { // 3 days, 3 chunks, 3x Logs + 3x Metric + 1x Instant, 7 requests. expect(datasource.runQuery).toHaveBeenCalledTimes(7); }); @@ -208,7 +208,7 @@ describe('runPartitionedQueries()', () => { targets: [{ expr: '{a="b"}', refId: 'A', chunkDuration: '30m' }], range: range1h, }); - await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => { + await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => { expect(datasource.runQuery).toHaveBeenCalledTimes(2); }); }); @@ -217,7 +217,7 @@ describe('runPartitionedQueries()', () => { targets: [{ expr: '{a="b"}', refId: 'A', chunkDuration: '1h' }], range: range1h, }); - await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => { + await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => { expect(datasource.runQuery).toHaveBeenCalledTimes(1); }); }); @@ -229,7 +229,7 @@ describe('runPartitionedQueries()', () => { ], range: range1h, }); - await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => { + await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => { expect(datasource.runQuery).toHaveBeenCalledTimes(1); }); }); @@ -241,7 +241,7 @@ describe('runPartitionedQueries()', () => { ], range: range1h, }); - await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => { + await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => { // 2 x 30m + 1 x 1h expect(datasource.runQuery).toHaveBeenCalledTimes(3); }); @@ -254,7 +254,7 @@ describe('runPartitionedQueries()', () => { ], range: range1h, }); - await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => { + await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => { // 2 x 30m + 1 x 1h expect(datasource.runQuery).toHaveBeenCalledTimes(3); }); diff --git a/public/app/plugins/datasource/loki/querySplitting.ts b/public/app/plugins/datasource/loki/queryChunking.ts similarity index 94% rename from public/app/plugins/datasource/loki/querySplitting.ts rename to public/app/plugins/datasource/loki/queryChunking.ts index bbee7a7821e..525387123d1 100644 --- a/public/app/plugins/datasource/loki/querySplitting.ts +++ b/public/app/plugins/datasource/loki/queryChunking.ts @@ -13,9 +13,10 @@ import { import { LoadingState } from '@grafana/schema'; import { LokiDatasource } from './datasource'; -import { getRangeChunks as getLogsRangeChunks } from './logsTimeSplit'; -import { getRangeChunks as getMetricRangeChunks } from './metricTimeSplit'; -import { combineResponses, isLogsQuery } from './queryUtils'; +import { getRangeChunks as getLogsRangeChunks } from './logsTimeChunking'; +import { getRangeChunks as getMetricRangeChunks } from './metricTimeChunking'; +import { isLogsQuery } from './queryUtils'; +import { combineResponses } from './responseUtils'; import { LokiQuery, LokiQueryType } from './types'; export function partitionTimeRange( @@ -30,7 +31,6 @@ export function partitionTimeRange( // we need to replicate this algo: // // https://github.com/grafana/grafana/blob/main/pkg/tsdb/loki/step.go#L23 - const start = originalTimeRange.from.toDate().getTime(); const end = originalTimeRange.to.toDate().getTime(); @@ -58,7 +58,6 @@ export function partitionTimeRange( * At the end, we will filter the targets that don't need to be executed in the next request batch, * becasue, for example, the `maxLines` have been reached. */ - function adjustTargetsFromResponseState(targets: LokiQuery[], response: DataQueryResponse | null): LokiQuery[] { if (!response) { return targets; @@ -84,7 +83,7 @@ function adjustTargetsFromResponseState(targets: LokiQuery[], response: DataQuer type LokiGroupedRequest = Array<{ request: DataQueryRequest; partition: TimeRange[] }>; -export function runGroupedQueries(datasource: LokiDatasource, requests: LokiGroupedRequest) { +export function runGroupedQueriesInChunks(datasource: LokiDatasource, requests: LokiGroupedRequest) { let mergedResponse: DataQueryResponse = { data: [], state: LoadingState.Streaming }; const totalRequests = Math.max(...requests.map(({ partition }) => partition.length)); @@ -168,7 +167,7 @@ function getNextRequestPointers(requests: LokiGroupedRequest, requestGroup: numb }; } -export function runPartitionedQueries(datasource: LokiDatasource, request: DataQueryRequest) { +export function runQueryInChunks(datasource: LokiDatasource, request: DataQueryRequest) { const queries = request.targets.filter((query) => !query.hide); const [instantQueries, normalQueries] = partition(queries, (query) => query.queryType === LokiQueryType.Instant); const [logQueries, metricQueries] = partition(normalQueries, (query) => isLogsQuery(query.expr)); @@ -217,5 +216,5 @@ export function runPartitionedQueries(datasource: LokiDatasource, request: DataQ }); } - return runGroupedQueries(datasource, requests); + return runGroupedQueriesInChunks(datasource, requests); } diff --git a/public/app/plugins/datasource/loki/queryUtils.test.ts b/public/app/plugins/datasource/loki/queryUtils.test.ts index c450329ae00..72343912253 100644 --- a/public/app/plugins/datasource/loki/queryUtils.test.ts +++ b/public/app/plugins/datasource/loki/queryUtils.test.ts @@ -1,6 +1,3 @@ -import { ArrayVector, DataQueryResponse, QueryResultMetaStat } from '@grafana/data'; - -import { getMockFrames } from './mocks'; import { getHighlighterExpressionsFromQuery, getNormalizedLokiQuery, @@ -11,9 +8,7 @@ import { parseToNodeNamesArray, getParserFromQuery, obfuscate, - combineResponses, - cloneQueryResponse, - requestSupportsPartitioning, + requestSupporsChunking, } from './queryUtils'; import { LokiQuery, LokiQueryType } from './types'; @@ -299,305 +294,7 @@ describe('getParserFromQuery', () => { }); }); -describe('cloneQueryResponse', () => { - const { logFrameA } = getMockFrames(); - const responseA: DataQueryResponse = { - data: [logFrameA], - }; - it('clones query responses', () => { - const clonedA = cloneQueryResponse(responseA); - expect(clonedA).not.toBe(responseA); - expect(clonedA).toEqual(clonedA); - }); -}); - -describe('combineResponses', () => { - it('combines logs frames', () => { - const { logFrameA, logFrameB } = getMockFrames(); - const responseA: DataQueryResponse = { - data: [logFrameA], - }; - const responseB: DataQueryResponse = { - data: [logFrameB], - }; - expect(combineResponses(responseA, responseB)).toEqual({ - data: [ - { - fields: [ - { - config: {}, - name: 'Time', - type: 'time', - values: new ArrayVector([1, 2, 3, 4]), - }, - { - config: {}, - name: 'Line', - type: 'string', - values: new ArrayVector(['line3', 'line4', 'line1', 'line2']), - }, - { - config: {}, - name: 'labels', - type: 'other', - values: new ArrayVector([ - { - otherLabel: 'other value', - }, - { - label: 'value', - }, - { - otherLabel: 'other value', - }, - ]), - }, - { - config: {}, - name: 'tsNs', - type: 'string', - values: new ArrayVector(['1000000', '2000000', '3000000', '4000000']), - }, - { - config: {}, - name: 'id', - type: 'string', - values: new ArrayVector(['id3', 'id4', 'id1', 'id2']), - }, - ], - length: 4, - meta: { - stats: [ - { - displayName: 'Summary: total bytes processed', - unit: 'decbytes', - value: 33, - }, - ], - }, - refId: 'A', - }, - ], - }); - }); - - it('combines metric frames', () => { - const { metricFrameA, metricFrameB } = getMockFrames(); - const responseA: DataQueryResponse = { - data: [metricFrameA], - }; - const responseB: DataQueryResponse = { - data: [metricFrameB], - }; - expect(combineResponses(responseA, responseB)).toEqual({ - data: [ - { - fields: [ - { - config: {}, - name: 'Time', - type: 'time', - values: new ArrayVector([1000000, 2000000, 3000000, 4000000]), - }, - { - config: {}, - name: 'Value', - type: 'number', - values: new ArrayVector([6, 7, 5, 4]), - }, - ], - length: 4, - meta: { - stats: [ - { - displayName: 'Summary: total bytes processed', - unit: 'decbytes', - value: 33, - }, - ], - }, - refId: 'A', - }, - ], - }); - }); - - it('combines and identifies new frames in the response', () => { - const { metricFrameA, metricFrameB, metricFrameC } = getMockFrames(); - const responseA: DataQueryResponse = { - data: [metricFrameA], - }; - const responseB: DataQueryResponse = { - data: [metricFrameB, metricFrameC], - }; - expect(combineResponses(responseA, responseB)).toEqual({ - data: [ - { - fields: [ - { - config: {}, - name: 'Time', - type: 'time', - values: new ArrayVector([1000000, 2000000, 3000000, 4000000]), - }, - { - config: {}, - name: 'Value', - type: 'number', - values: new ArrayVector([6, 7, 5, 4]), - }, - ], - length: 4, - meta: { - stats: [ - { - displayName: 'Summary: total bytes processed', - unit: 'decbytes', - value: 33, - }, - ], - }, - refId: 'A', - }, - metricFrameC, - ], - }); - }); - - it('combines frames in a new response instance', () => { - const { metricFrameA, metricFrameB } = getMockFrames(); - const responseA: DataQueryResponse = { - data: [metricFrameA], - }; - const responseB: DataQueryResponse = { - data: [metricFrameB], - }; - expect(combineResponses(null, responseA)).not.toBe(responseA); - expect(combineResponses(null, responseB)).not.toBe(responseB); - }); - - it('combine when first param has errors', () => { - const { metricFrameA, metricFrameB } = getMockFrames(); - const errorA = { - message: 'errorA', - }; - const responseA: DataQueryResponse = { - data: [metricFrameA], - error: errorA, - errors: [errorA], - }; - const responseB: DataQueryResponse = { - data: [metricFrameB], - }; - - const combined = combineResponses(responseA, responseB); - expect(combined.data[0].length).toBe(4); - expect(combined.error?.message).toBe('errorA'); - expect(combined.errors).toHaveLength(1); - expect(combined.errors?.[0]?.message).toBe('errorA'); - }); - - it('combine when second param has errors', () => { - const { metricFrameA, metricFrameB } = getMockFrames(); - const responseA: DataQueryResponse = { - data: [metricFrameA], - }; - const errorB = { - message: 'errorB', - }; - const responseB: DataQueryResponse = { - data: [metricFrameB], - error: errorB, - errors: [errorB], - }; - - const combined = combineResponses(responseA, responseB); - expect(combined.data[0].length).toBe(4); - expect(combined.error?.message).toBe('errorB'); - expect(combined.errors).toHaveLength(1); - expect(combined.errors?.[0]?.message).toBe('errorB'); - }); - - it('combine when both params have errors', () => { - const { metricFrameA, metricFrameB } = getMockFrames(); - const errorA = { - message: 'errorA', - }; - const errorB = { - message: 'errorB', - }; - const responseA: DataQueryResponse = { - data: [metricFrameA], - error: errorA, - errors: [errorA], - }; - const responseB: DataQueryResponse = { - data: [metricFrameB], - error: errorB, - errors: [errorB], - }; - - const combined = combineResponses(responseA, responseB); - expect(combined.data[0].length).toBe(4); - expect(combined.error?.message).toBe('errorA'); - expect(combined.errors).toHaveLength(2); - expect(combined.errors?.[0]?.message).toBe('errorA'); - expect(combined.errors?.[1]?.message).toBe('errorB'); - }); - - describe('combine stats', () => { - const { metricFrameA } = getMockFrames(); - const makeResponse = (stats?: QueryResultMetaStat[]): DataQueryResponse => ({ - data: [ - { - ...metricFrameA, - meta: { - ...metricFrameA.meta, - stats, - }, - }, - ], - }); - it('two values', () => { - const responseA = makeResponse([ - { displayName: 'Ingester: total reached', value: 1 }, - { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 11 }, - ]); - const responseB = makeResponse([ - { displayName: 'Ingester: total reached', value: 2 }, - { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 22 }, - ]); - - expect(combineResponses(responseA, responseB).data[0].meta.stats).toStrictEqual([ - { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 33 }, - ]); - }); - - it('one value', () => { - const responseA = makeResponse([ - { displayName: 'Ingester: total reached', value: 1 }, - { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 11 }, - ]); - const responseB = makeResponse(); - - expect(combineResponses(responseA, responseB).data[0].meta.stats).toStrictEqual([ - { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 11 }, - ]); - - expect(combineResponses(responseB, responseA).data[0].meta.stats).toStrictEqual([ - { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 11 }, - ]); - }); - - it('no value', () => { - const responseA = makeResponse(); - const responseB = makeResponse(); - expect(combineResponses(responseA, responseB).data[0].meta.stats).toHaveLength(0); - }); - }); -}); - -describe('requestSupportsPartitioning', () => { +describe('requestSupporsChunking', () => { it('hidden requests are not partitioned', () => { const requests: LokiQuery[] = [ { @@ -606,7 +303,7 @@ describe('requestSupportsPartitioning', () => { hide: true, }, ]; - expect(requestSupportsPartitioning(requests)).toBe(false); + expect(requestSupporsChunking(requests)).toBe(false); }); it('special requests are not partitioned', () => { const requests: LokiQuery[] = [ @@ -615,7 +312,7 @@ describe('requestSupportsPartitioning', () => { refId: 'do-not-chunk', }, ]; - expect(requestSupportsPartitioning(requests)).toBe(false); + expect(requestSupporsChunking(requests)).toBe(false); }); it('empty requests are not partitioned', () => { const requests: LokiQuery[] = [ @@ -624,7 +321,7 @@ describe('requestSupportsPartitioning', () => { refId: 'A', }, ]; - expect(requestSupportsPartitioning(requests)).toBe(false); + expect(requestSupporsChunking(requests)).toBe(false); }); it('all other requests are partitioned', () => { const requests: LokiQuery[] = [ @@ -637,6 +334,6 @@ describe('requestSupportsPartitioning', () => { refId: 'B', }, ]; - expect(requestSupportsPartitioning(requests)).toBe(true); + expect(requestSupporsChunking(requests)).toBe(true); }); }); diff --git a/public/app/plugins/datasource/loki/queryUtils.ts b/public/app/plugins/datasource/loki/queryUtils.ts index d68953cb06e..e373b81c6b4 100644 --- a/public/app/plugins/datasource/loki/queryUtils.ts +++ b/public/app/plugins/datasource/loki/queryUtils.ts @@ -1,14 +1,6 @@ import { SyntaxNode } from '@lezer/common'; import { escapeRegExp } from 'lodash'; -import { - ArrayVector, - DataFrame, - DataQueryResponse, - DataQueryResponseData, - Field, - QueryResultMetaStat, -} from '@grafana/data'; import { parser, LineFilter, @@ -304,7 +296,7 @@ export function getStreamSelectorsFromQuery(query: string): string[] { return labelMatchers; } -export function requestSupportsPartitioning(allQueries: LokiQuery[]) { +export function requestSupporsChunking(allQueries: LokiQuery[]) { const queries = allQueries .filter((query) => !query.hide) .filter((query) => !query.refId.includes('do-not-chunk')) @@ -312,102 +304,3 @@ export function requestSupportsPartitioning(allQueries: LokiQuery[]) { return queries.length > 0; } - -function shouldCombine(frame1: DataFrame, frame2: DataFrame): boolean { - if (frame1.refId !== frame2.refId) { - return false; - } - - return frame1.name === frame2.name; -} - -export function combineResponses(currentResult: DataQueryResponse | null, newResult: DataQueryResponse) { - if (!currentResult) { - return cloneQueryResponse(newResult); - } - - newResult.data.forEach((newFrame) => { - const currentFrame = currentResult.data.find((frame) => shouldCombine(frame, newFrame)); - if (!currentFrame) { - currentResult.data.push(cloneDataFrame(newFrame)); - return; - } - combineFrames(currentFrame, newFrame); - }); - - const mergedErrors = [...(currentResult.errors ?? []), ...(newResult.errors ?? [])]; - - // we make sure to have `.errors` as undefined, instead of empty-array - // when no errors. - - if (mergedErrors.length > 0) { - currentResult.errors = mergedErrors; - } - - // the `.error` attribute is obsolete now, - // but we have to maintain it, otherwise - // some grafana parts do not behave well. - // we just choose the old error, if it exists, - // otherwise the new error, if it exists. - currentResult.error = currentResult.error ?? newResult.error; - - return currentResult; -} - -function combineFrames(dest: DataFrame, source: DataFrame) { - const totalFields = dest.fields.length; - for (let i = 0; i < totalFields; i++) { - dest.fields[i].values = new ArrayVector( - [].concat.apply(source.fields[i].values.toArray(), dest.fields[i].values.toArray()) - ); - } - dest.length += source.length; - dest.meta = { - ...dest.meta, - stats: getCombinedMetadataStats(dest.meta?.stats ?? [], source.meta?.stats ?? []), - }; -} - -const TOTAL_BYTES_STAT = 'Summary: total bytes processed'; - -function getCombinedMetadataStats( - destStats: QueryResultMetaStat[], - sourceStats: QueryResultMetaStat[] -): QueryResultMetaStat[] { - // in the current approach, we only handle a single stat - const destStat = destStats.find((s) => s.displayName === TOTAL_BYTES_STAT); - const sourceStat = sourceStats.find((s) => s.displayName === TOTAL_BYTES_STAT); - - if (sourceStat != null && destStat != null) { - return [{ value: sourceStat.value + destStat.value, displayName: TOTAL_BYTES_STAT, unit: destStat.unit }]; - } - - // maybe one of them exist - const eitherStat = sourceStat ?? destStat; - if (eitherStat != null) { - return [eitherStat]; - } - - return []; -} - -/** - * Deep clones a DataQueryResponse - */ -export function cloneQueryResponse(response: DataQueryResponse): DataQueryResponse { - const newResponse = { - ...response, - data: response.data.map(cloneDataFrame), - }; - return newResponse; -} - -function cloneDataFrame(frame: DataQueryResponseData): DataQueryResponseData { - return { - ...frame, - fields: frame.fields.map((field: Field) => ({ - ...field, - values: new ArrayVector(field.values.buffer), - })), - }; -} diff --git a/public/app/plugins/datasource/loki/responseUtils.test.ts b/public/app/plugins/datasource/loki/responseUtils.test.ts index b0721d875fa..c2b263267e3 100644 --- a/public/app/plugins/datasource/loki/responseUtils.test.ts +++ b/public/app/plugins/datasource/loki/responseUtils.test.ts @@ -1,7 +1,8 @@ import { cloneDeep } from 'lodash'; -import { ArrayVector, DataFrame, FieldType } from '@grafana/data'; +import { ArrayVector, DataQueryResponse, QueryResultMetaStat, DataFrame, FieldType } from '@grafana/data'; +import { getMockFrames } from './mocks'; import { dataFrameHasLevelLabel, dataFrameHasLokiError, @@ -9,6 +10,8 @@ import { extractLogParserFromDataFrame, extractLabelKeysFromDataFrame, extractUnwrapLabelKeysFromDataFrame, + cloneQueryResponse, + combineResponses, } from './responseUtils'; const frame: DataFrame = { @@ -119,3 +122,324 @@ describe('extractUnwrapLabelKeysFromDataFrame', () => { expect(extractUnwrapLabelKeysFromDataFrame(input)).toEqual(['number']); }); }); + +describe('cloneQueryResponse', () => { + const { logFrameA } = getMockFrames(); + const responseA: DataQueryResponse = { + data: [logFrameA], + }; + it('clones query responses', () => { + const clonedA = cloneQueryResponse(responseA); + expect(clonedA).not.toBe(responseA); + expect(clonedA).toEqual(clonedA); + }); +}); + +describe('combineResponses', () => { + it('combines logs frames', () => { + const { logFrameA, logFrameB } = getMockFrames(); + const responseA: DataQueryResponse = { + data: [logFrameA], + }; + const responseB: DataQueryResponse = { + data: [logFrameB], + }; + expect(combineResponses(responseA, responseB)).toEqual({ + data: [ + { + fields: [ + { + config: {}, + name: 'Time', + type: 'time', + values: new ArrayVector([1, 2, 3, 4]), + }, + { + config: {}, + name: 'Line', + type: 'string', + values: new ArrayVector(['line3', 'line4', 'line1', 'line2']), + }, + { + config: {}, + name: 'labels', + type: 'other', + values: new ArrayVector([ + { + otherLabel: 'other value', + }, + { + label: 'value', + }, + { + otherLabel: 'other value', + }, + ]), + }, + { + config: {}, + name: 'tsNs', + type: 'string', + values: new ArrayVector(['1000000', '2000000', '3000000', '4000000']), + }, + { + config: {}, + name: 'id', + type: 'string', + values: new ArrayVector(['id3', 'id4', 'id1', 'id2']), + }, + ], + length: 4, + meta: { + stats: [ + { + displayName: 'Summary: total bytes processed', + unit: 'decbytes', + value: 33, + }, + ], + }, + refId: 'A', + }, + ], + }); + }); + + it('combines metric frames', () => { + const { metricFrameA, metricFrameB } = getMockFrames(); + const responseA: DataQueryResponse = { + data: [metricFrameA], + }; + const responseB: DataQueryResponse = { + data: [metricFrameB], + }; + expect(combineResponses(responseA, responseB)).toEqual({ + data: [ + { + fields: [ + { + config: {}, + name: 'Time', + type: 'time', + values: new ArrayVector([1000000, 2000000, 3000000, 4000000]), + }, + { + config: {}, + name: 'Value', + type: 'number', + values: new ArrayVector([6, 7, 5, 4]), + }, + ], + length: 4, + meta: { + stats: [ + { + displayName: 'Summary: total bytes processed', + unit: 'decbytes', + value: 33, + }, + ], + }, + refId: 'A', + }, + ], + }); + }); + + it('combines and identifies new frames in the response', () => { + const { metricFrameA, metricFrameB, metricFrameC } = getMockFrames(); + const responseA: DataQueryResponse = { + data: [metricFrameA], + }; + const responseB: DataQueryResponse = { + data: [metricFrameB, metricFrameC], + }; + expect(combineResponses(responseA, responseB)).toEqual({ + data: [ + { + fields: [ + { + config: {}, + name: 'Time', + type: 'time', + values: new ArrayVector([1000000, 2000000, 3000000, 4000000]), + }, + { + config: {}, + name: 'Value', + type: 'number', + values: new ArrayVector([6, 7, 5, 4]), + }, + ], + length: 4, + meta: { + stats: [ + { + displayName: 'Summary: total bytes processed', + unit: 'decbytes', + value: 33, + }, + ], + }, + refId: 'A', + }, + metricFrameC, + ], + }); + }); + + it('combines frames prioritizing refIds over names', () => { + const { metricFrameA, metricFrameB } = getMockFrames(); + const dataFrameA = { + ...metricFrameA, + refId: 'A', + name: 'A', + }; + const dataFrameB = { + ...metricFrameB, + refId: 'B', + name: 'A', + }; + const responseA: DataQueryResponse = { + data: [dataFrameA], + }; + const responseB: DataQueryResponse = { + data: [dataFrameB], + }; + expect(combineResponses(responseA, responseB)).toEqual({ + data: [dataFrameA, dataFrameB], + }); + }); + + it('combines frames in a new response instance', () => { + const { metricFrameA, metricFrameB } = getMockFrames(); + const responseA: DataQueryResponse = { + data: [metricFrameA], + }; + const responseB: DataQueryResponse = { + data: [metricFrameB], + }; + expect(combineResponses(null, responseA)).not.toBe(responseA); + expect(combineResponses(null, responseB)).not.toBe(responseB); + }); + + it('combine when first param has errors', () => { + const { metricFrameA, metricFrameB } = getMockFrames(); + const errorA = { + message: 'errorA', + }; + const responseA: DataQueryResponse = { + data: [metricFrameA], + error: errorA, + errors: [errorA], + }; + const responseB: DataQueryResponse = { + data: [metricFrameB], + }; + + const combined = combineResponses(responseA, responseB); + expect(combined.data[0].length).toBe(4); + expect(combined.error?.message).toBe('errorA'); + expect(combined.errors).toHaveLength(1); + expect(combined.errors?.[0]?.message).toBe('errorA'); + }); + + it('combine when second param has errors', () => { + const { metricFrameA, metricFrameB } = getMockFrames(); + const responseA: DataQueryResponse = { + data: [metricFrameA], + }; + const errorB = { + message: 'errorB', + }; + const responseB: DataQueryResponse = { + data: [metricFrameB], + error: errorB, + errors: [errorB], + }; + + const combined = combineResponses(responseA, responseB); + expect(combined.data[0].length).toBe(4); + expect(combined.error?.message).toBe('errorB'); + expect(combined.errors).toHaveLength(1); + expect(combined.errors?.[0]?.message).toBe('errorB'); + }); + + it('combine when both params have errors', () => { + const { metricFrameA, metricFrameB } = getMockFrames(); + const errorA = { + message: 'errorA', + }; + const errorB = { + message: 'errorB', + }; + const responseA: DataQueryResponse = { + data: [metricFrameA], + error: errorA, + errors: [errorA], + }; + const responseB: DataQueryResponse = { + data: [metricFrameB], + error: errorB, + errors: [errorB], + }; + + const combined = combineResponses(responseA, responseB); + expect(combined.data[0].length).toBe(4); + expect(combined.error?.message).toBe('errorA'); + expect(combined.errors).toHaveLength(2); + expect(combined.errors?.[0]?.message).toBe('errorA'); + expect(combined.errors?.[1]?.message).toBe('errorB'); + }); + + describe('combine stats', () => { + const { metricFrameA } = getMockFrames(); + const makeResponse = (stats?: QueryResultMetaStat[]): DataQueryResponse => ({ + data: [ + { + ...metricFrameA, + meta: { + ...metricFrameA.meta, + stats, + }, + }, + ], + }); + it('two values', () => { + const responseA = makeResponse([ + { displayName: 'Ingester: total reached', value: 1 }, + { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 11 }, + ]); + const responseB = makeResponse([ + { displayName: 'Ingester: total reached', value: 2 }, + { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 22 }, + ]); + + expect(combineResponses(responseA, responseB).data[0].meta.stats).toStrictEqual([ + { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 33 }, + ]); + }); + + it('one value', () => { + const responseA = makeResponse([ + { displayName: 'Ingester: total reached', value: 1 }, + { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 11 }, + ]); + const responseB = makeResponse(); + + expect(combineResponses(responseA, responseB).data[0].meta.stats).toStrictEqual([ + { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 11 }, + ]); + + expect(combineResponses(responseB, responseA).data[0].meta.stats).toStrictEqual([ + { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 11 }, + ]); + }); + + it('no value', () => { + const responseA = makeResponse(); + const responseB = makeResponse(); + expect(combineResponses(responseA, responseB).data[0].meta.stats).toHaveLength(0); + }); + }); +}); diff --git a/public/app/plugins/datasource/loki/responseUtils.ts b/public/app/plugins/datasource/loki/responseUtils.ts index aa7db223bd5..57e8bd2c1ae 100644 --- a/public/app/plugins/datasource/loki/responseUtils.ts +++ b/public/app/plugins/datasource/loki/responseUtils.ts @@ -1,4 +1,14 @@ -import { DataFrame, FieldType, isValidGoDuration, Labels } from '@grafana/data'; +import { + ArrayVector, + DataFrame, + DataQueryResponse, + DataQueryResponseData, + Field, + FieldType, + isValidGoDuration, + Labels, + QueryResultMetaStat, +} from '@grafana/data'; import { isBytesString } from './languageUtils'; import { isLogLineJSON, isLogLineLogfmt } from './lineParser'; @@ -100,3 +110,102 @@ export function extractLevelLikeLabelFromDataFrame(frame: DataFrame): string | n } return levelLikeLabel; } + +function shouldCombine(frame1: DataFrame, frame2: DataFrame): boolean { + if (frame1.refId !== frame2.refId) { + return false; + } + + return frame1.name === frame2.name; +} + +export function combineResponses(currentResult: DataQueryResponse | null, newResult: DataQueryResponse) { + if (!currentResult) { + return cloneQueryResponse(newResult); + } + + newResult.data.forEach((newFrame) => { + const currentFrame = currentResult.data.find((frame) => shouldCombine(frame, newFrame)); + if (!currentFrame) { + currentResult.data.push(cloneDataFrame(newFrame)); + return; + } + combineFrames(currentFrame, newFrame); + }); + + const mergedErrors = [...(currentResult.errors ?? []), ...(newResult.errors ?? [])]; + + // we make sure to have `.errors` as undefined, instead of empty-array + // when no errors. + + if (mergedErrors.length > 0) { + currentResult.errors = mergedErrors; + } + + // the `.error` attribute is obsolete now, + // but we have to maintain it, otherwise + // some grafana parts do not behave well. + // we just choose the old error, if it exists, + // otherwise the new error, if it exists. + currentResult.error = currentResult.error ?? newResult.error; + + return currentResult; +} + +function combineFrames(dest: DataFrame, source: DataFrame) { + const totalFields = dest.fields.length; + for (let i = 0; i < totalFields; i++) { + dest.fields[i].values = new ArrayVector( + [].concat.apply(source.fields[i].values.toArray(), dest.fields[i].values.toArray()) + ); + } + dest.length += source.length; + dest.meta = { + ...dest.meta, + stats: getCombinedMetadataStats(dest.meta?.stats ?? [], source.meta?.stats ?? []), + }; +} + +const TOTAL_BYTES_STAT = 'Summary: total bytes processed'; + +function getCombinedMetadataStats( + destStats: QueryResultMetaStat[], + sourceStats: QueryResultMetaStat[] +): QueryResultMetaStat[] { + // in the current approach, we only handle a single stat + const destStat = destStats.find((s) => s.displayName === TOTAL_BYTES_STAT); + const sourceStat = sourceStats.find((s) => s.displayName === TOTAL_BYTES_STAT); + + if (sourceStat != null && destStat != null) { + return [{ value: sourceStat.value + destStat.value, displayName: TOTAL_BYTES_STAT, unit: destStat.unit }]; + } + + // maybe one of them exist + const eitherStat = sourceStat ?? destStat; + if (eitherStat != null) { + return [eitherStat]; + } + + return []; +} + +/** + * Deep clones a DataQueryResponse + */ +export function cloneQueryResponse(response: DataQueryResponse): DataQueryResponse { + const newResponse = { + ...response, + data: response.data.map(cloneDataFrame), + }; + return newResponse; +} + +function cloneDataFrame(frame: DataQueryResponseData): DataQueryResponseData { + return { + ...frame, + fields: frame.fields.map((field: Field) => ({ + ...field, + values: new ArrayVector(field.values.buffer), + })), + }; +}