|
|
|
|
@ -1,7 +1,14 @@ |
|
|
|
|
import { partition } from 'lodash'; |
|
|
|
|
import { Subscriber, Observable, Subscription } from 'rxjs'; |
|
|
|
|
|
|
|
|
|
import { DataQueryRequest, DataQueryResponse, dateTime, TimeRange } from '@grafana/data'; |
|
|
|
|
import { groupBy, partition } from 'lodash'; |
|
|
|
|
import { Observable, Subscriber, Subscription } from 'rxjs'; |
|
|
|
|
|
|
|
|
|
import { |
|
|
|
|
DataQueryRequest, |
|
|
|
|
DataQueryResponse, |
|
|
|
|
dateTime, |
|
|
|
|
durationToMilliseconds, |
|
|
|
|
parseDuration, |
|
|
|
|
TimeRange, |
|
|
|
|
} from '@grafana/data'; |
|
|
|
|
import { LoadingState } from '@grafana/schema'; |
|
|
|
|
|
|
|
|
|
import { LokiDatasource } from './datasource'; |
|
|
|
|
@ -10,24 +17,12 @@ import { getRangeChunks as getMetricRangeChunks } from './metricTimeSplit'; |
|
|
|
|
import { combineResponses, isLogsQuery } from './queryUtils'; |
|
|
|
|
import { LokiQuery, LokiQueryType } from './types'; |
|
|
|
|
|
|
|
|
|
declare global { |
|
|
|
|
interface Window { |
|
|
|
|
lokiChunkDuration: number; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Purposely exposing it to support doing tests without needing to update the repo. |
|
|
|
|
* TODO: remove. |
|
|
|
|
* Hardcoded to 1 day. |
|
|
|
|
*/ |
|
|
|
|
window.lokiChunkDuration = 24 * 60 * 60 * 1000; |
|
|
|
|
|
|
|
|
|
export function partitionTimeRange( |
|
|
|
|
isLogsQuery: boolean, |
|
|
|
|
originalTimeRange: TimeRange, |
|
|
|
|
intervalMs: number, |
|
|
|
|
resolution: number |
|
|
|
|
resolution: number, |
|
|
|
|
duration: number |
|
|
|
|
): TimeRange[] { |
|
|
|
|
// the `step` value that will be finally sent to Loki is rougly the same as `intervalMs`,
|
|
|
|
|
// but there are some complications.
|
|
|
|
|
@ -41,8 +36,6 @@ export function partitionTimeRange( |
|
|
|
|
const safeStep = Math.ceil((end - start) / 11000); |
|
|
|
|
const step = Math.max(intervalMs * resolution, safeStep); |
|
|
|
|
|
|
|
|
|
const duration = window.lokiChunkDuration; |
|
|
|
|
|
|
|
|
|
const ranges = isLogsQuery |
|
|
|
|
? getLogsRangeChunks(start, end, duration) |
|
|
|
|
: getMetricRangeChunks(start, end, step, duration); |
|
|
|
|
@ -179,19 +172,41 @@ export function runPartitionedQueries(datasource: LokiDatasource, request: DataQ |
|
|
|
|
const [instantQueries, normalQueries] = partition(queries, (query) => query.queryType === LokiQueryType.Instant); |
|
|
|
|
const [logQueries, metricQueries] = partition(normalQueries, (query) => isLogsQuery(query.expr)); |
|
|
|
|
|
|
|
|
|
const oneDayMs = 24 * 60 * 60 * 1000; |
|
|
|
|
const rangePartitionedLogQueries = groupBy(logQueries, (query) => |
|
|
|
|
query.chunkDuration ? durationToMilliseconds(parseDuration(query.chunkDuration)) : oneDayMs |
|
|
|
|
); |
|
|
|
|
const rangePartitionedMetricQueries = groupBy(metricQueries, (query) => |
|
|
|
|
query.chunkDuration ? durationToMilliseconds(parseDuration(query.chunkDuration)) : oneDayMs |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
const requests: LokiGroupedRequest = []; |
|
|
|
|
if (logQueries.length) { |
|
|
|
|
for (const [chunkRangeMs, queries] of Object.entries(rangePartitionedLogQueries)) { |
|
|
|
|
requests.push({ |
|
|
|
|
request: { ...request, targets: logQueries }, |
|
|
|
|
partition: partitionTimeRange(true, request.range, request.intervalMs, logQueries[0].resolution ?? 1), |
|
|
|
|
request: { ...request, targets: queries }, |
|
|
|
|
partition: partitionTimeRange( |
|
|
|
|
true, |
|
|
|
|
request.range, |
|
|
|
|
request.intervalMs, |
|
|
|
|
queries[0].resolution ?? 1, |
|
|
|
|
Number(chunkRangeMs) |
|
|
|
|
), |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
if (metricQueries.length) { |
|
|
|
|
|
|
|
|
|
for (const [chunkRangeMs, queries] of Object.entries(rangePartitionedMetricQueries)) { |
|
|
|
|
requests.push({ |
|
|
|
|
request: { ...request, targets: metricQueries }, |
|
|
|
|
partition: partitionTimeRange(false, request.range, request.intervalMs, metricQueries[0].resolution ?? 1), |
|
|
|
|
request: { ...request, targets: queries }, |
|
|
|
|
partition: partitionTimeRange( |
|
|
|
|
false, |
|
|
|
|
request.range, |
|
|
|
|
request.intervalMs, |
|
|
|
|
queries[0].resolution ?? 1, |
|
|
|
|
Number(chunkRangeMs) |
|
|
|
|
), |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (instantQueries.length) { |
|
|
|
|
requests.push({ |
|
|
|
|
request: { ...request, targets: instantQueries }, |
|
|
|
|
|