Prometheus: Query Cache refactor (#66891)

* Pull the datasource-specific implementation out into the calling context to make the query cache generic
---------
Co-authored-by: Leon Sorokin <leeoniya@gmail.com>
gtk-grafana/issues/69578/exemplar-heatmap-labels-regression
Galen Kistler 2 years ago committed by GitHub
parent 7bd85faa7f
commit fa2e27e175
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 44
      public/app/plugins/datasource/prometheus/datasource.tsx
  2. 113
      public/app/plugins/datasource/prometheus/querycache/QueryCache.test.ts
  3. 114
      public/app/plugins/datasource/prometheus/querycache/QueryCache.ts

@ -107,7 +107,7 @@ export class PrometheusDatasource
subType: PromApplication;
rulerEnabled: boolean;
cacheLevel: PrometheusCacheLevel;
cache: QueryCache;
cache: QueryCache<PromQuery>;
constructor(
instanceSettings: DataSourceInstanceSettings<PromOptions>,
@ -143,9 +143,12 @@ export class PrometheusDatasource
this.variables = new PrometheusVariableSupport(this, this.templateSrv, this.timeSrv);
this.exemplarsAvailable = true;
this.cacheLevel = instanceSettings.jsonData.cacheLevel ?? PrometheusCacheLevel.Low;
this.cache = new QueryCache(
instanceSettings.jsonData.incrementalQueryOverlapWindow ?? defaultPrometheusQueryOverlapWindow
);
this.cache = new QueryCache({
getTargetSignature: this.getPrometheusTargetSignature.bind(this),
overlapString: instanceSettings.jsonData.incrementalQueryOverlapWindow ?? defaultPrometheusQueryOverlapWindow,
profileFunction: this.getPrometheusProfileData.bind(this),
});
// This needs to be here and cannot be static because of how annotations typing affects casting of data source
// objects to DataSourceApi types.
@ -165,6 +168,26 @@ export class PrometheusDatasource
return query.expr;
}
/**
 * Build the per-target profiling metadata handed to the query cache
 * (passed as `profileFunction` in the QueryCache constructor options).
 */
getPrometheusProfileData(request: DataQueryRequest<PromQuery>, target: PromQuery) {
  // Per-target interval takes precedence over the request-level interval.
  const interval = target.interval ?? request.interval;
  const expr = this.interpolateString(target.expr);
  return { interval, expr, datasource: 'Prometheus' };
}
/**
 * Get the target signature used for query caching.
 * The signature changes whenever the interpolated expression, the effective
 * interval, the raw time range, or the exemplar flag changes — any change
 * invalidates the cached frames for this target.
 * @param request - the full data query request
 * @param query - the individual target within the request
 */
getPrometheusTargetSignature(request: DataQueryRequest<PromQuery>, query: PromQuery) {
  const expr = this.interpolateString(query.expr);
  const interval = query.interval ?? request.interval;
  const range = JSON.stringify(request.rangeRaw ?? '');
  return `${expr}|${interval}|${range}|${query.exemplar}`;
}
hasLabelsMatchAPISupport(): boolean {
return (
// https://github.com/prometheus/prometheus/releases/tag/v2.24.0
@ -451,12 +474,19 @@ export class PrometheusDatasource
return processedTargets;
}
// Callback-friendly wrapper that interpolates a target's PromQL expression.
// NOTE(review): the name has a typo ("intepolate") — kept as-is because callers reference it.
intepolateStringHelper = (query: PromQuery): string => this.interpolateString(query.expr);
query(request: DataQueryRequest<PromQuery>): Observable<DataQueryResponse> {
if (this.access === 'proxy') {
let fullOrPartialRequest: DataQueryRequest<PromQuery>;
let requestInfo: CacheRequestInfo | undefined = undefined;
if (this.hasIncrementalQuery) {
requestInfo = this.cache.requestInfo(request, this.interpolateString.bind(this));
let requestInfo: CacheRequestInfo<PromQuery> | undefined = undefined;
const hasInstantQuery = request.targets.some((target) => target.instant);
// Don't cache instant queries
if (this.hasIncrementalQuery && !hasInstantQuery) {
requestInfo = this.cache.requestInfo(request);
fullOrPartialRequest = requestInfo.requests[0];
} else {
fullOrPartialRequest = request;

@ -5,10 +5,21 @@ import { DataFrame, DataQueryRequest, DateTime, dateTime, TimeRange } from '@gra
import { QueryEditorMode } from '../querybuilder/shared/types';
import { PromQuery } from '../types';
import { getTargSig, QueryCache } from './QueryCache';
import { DatasourceProfileData, QueryCache } from './QueryCache';
import { IncrementalStorageDataFrameScenarios } from './QueryCacheTestData';
const mockRequest = (request?: Partial<DataQueryRequest<PromQuery>>): DataQueryRequest<PromQuery> => {
// Will not interpolate vars!
// Identity "interpolation" for tests: returns the raw expression untouched.
const interpolateStringTest = (query: PromQuery): string => query.expr;

// Mirrors the datasource's signature builder: expr | interval | raw range | exemplar flag.
const getPrometheusTargetSignature = (request: DataQueryRequest<PromQuery>, targ: PromQuery) => {
  const expr = interpolateStringTest(targ);
  const interval = targ.interval ?? request.interval;
  const range = JSON.stringify(request.rangeRaw ?? '');
  return `${expr}|${interval}|${range}|${targ.exemplar}`;
};
const mockPromRequest = (request?: Partial<DataQueryRequest<PromQuery>>): DataQueryRequest<PromQuery> => {
// Histogram
const defaultRequest: DataQueryRequest<PromQuery> = {
app: 'undefined',
@ -48,14 +59,28 @@ const mockRequest = (request?: Partial<DataQueryRequest<PromQuery>>): DataQueryR
};
};
describe('QueryCache', function () {
// Test stand-in for the datasource's profile-data builder.
const getPromProfileData = (request: DataQueryRequest, targ: PromQuery): DatasourceProfileData => ({
  expr: targ.expr,
  interval: targ.interval ?? request.interval,
  datasource: 'prom',
});
describe('QueryCache: Generic', function () {
it('instantiates', () => {
const storage = new QueryCache();
const storage = new QueryCache({
getTargetSignature: () => '',
overlapString: '10m',
});
expect(storage).toBeInstanceOf(QueryCache);
});
it('will not modify or crash with empty response', () => {
const storage = new QueryCache();
const storage = new QueryCache({
getTargetSignature: () => '',
overlapString: '10m',
});
const firstFrames: DataFrame[] = [];
const secondFrames: DataFrame[] = [];
@ -99,7 +124,7 @@ describe('QueryCache', function () {
cache.set(targetIdentity, targetSignature);
const firstStoredFrames = storage.procFrames(
mockRequest({
mockPromRequest({
range: firstRange,
dashboardUID: dashboardId,
panelId: panelId,
@ -120,7 +145,7 @@ describe('QueryCache', function () {
// Should return the request frames unaltered
expect(firstStoredFrames).toEqual(firstFrames);
const secondRequest = mockRequest({
const secondRequest = mockPromRequest({
range: secondRange,
dashboardUID: dashboardId,
panelId: panelId,
@ -153,7 +178,9 @@ describe('QueryCache', function () {
expect(firstFramesLength).toBeGreaterThan(secondFramesLength);
});
});
});
describe('QueryCache: Prometheus', function () {
it('Merges incremental queries in storage', () => {
const scenarios = [
IncrementalStorageDataFrameScenarios.histogram.getSeriesWithGapAtEnd(),
@ -161,7 +188,11 @@ describe('QueryCache', function () {
IncrementalStorageDataFrameScenarios.histogram.getSeriesWithGapAtStart(),
];
scenarios.forEach((scenario, index) => {
const storage = new QueryCache();
const storage = new QueryCache<PromQuery>({
getTargetSignature: getPrometheusTargetSignature,
overlapString: '10m',
profileFunction: getPromProfileData,
});
const firstFrames = scenario.first.dataFrames as unknown as DataFrame[];
const secondFrames = scenario.second.dataFrames as unknown as DataFrame[];
@ -203,14 +234,14 @@ describe('QueryCache', function () {
// This can't change
const targetIdentity = `${dashboardId}|${panelId}|A`;
const request = mockRequest({
const request = mockPromRequest({
range: firstRange,
dashboardUID: dashboardId,
panelId: panelId,
});
// But the signature can, and we should clean up any non-matching signatures
const targetSignature = getTargSig(request.targets[0].expr, request, request.targets[0]);
const targetSignature = getPrometheusTargetSignature(request, request.targets[0]);
targetSignatures.set(targetIdentity, targetSignature);
@ -232,7 +263,7 @@ describe('QueryCache', function () {
// Should return the request frames unaltered
expect(firstStoredFrames).toEqual(firstFrames);
const secondRequest = mockRequest({
const secondRequest = mockPromRequest({
range: secondRange,
dashboardUID: dashboardId,
panelId: panelId,
@ -274,9 +305,6 @@ describe('QueryCache', function () {
});
});
const interpolateString = (s: string) => {
return s;
};
const secondRequestModified = {
...secondRequest,
range: {
@ -284,7 +312,7 @@ describe('QueryCache', function () {
to: dateTime(secondRequest.range.to.valueOf() + 30000),
},
};
const cacheRequest = storage.requestInfo(secondRequestModified, interpolateString);
const cacheRequest = storage.requestInfo(secondRequestModified);
expect(cacheRequest.requests[0].targets).toEqual(secondRequestModified.targets);
expect(cacheRequest.requests[0].range.to).toEqual(secondRequestModified.range.to);
expect(cacheRequest.requests[0].range.raw).toEqual(secondRequestModified.range.raw);
@ -296,7 +324,11 @@ describe('QueryCache', function () {
});
it('Will evict old dataframes, and use stored data when user shortens query window', () => {
const storage = new QueryCache();
const storage = new QueryCache<PromQuery>({
getTargetSignature: getPrometheusTargetSignature,
overlapString: '10m',
profileFunction: getPromProfileData,
});
// Initial request with all data for time range
const firstFrames = IncrementalStorageDataFrameScenarios.histogram.evictionRequests.first
@ -358,7 +390,7 @@ describe('QueryCache', function () {
const targetIdentity = `${dashboardId}|${panelId}|A`;
const request = mockRequest({
const request = mockPromRequest({
range: firstRange,
dashboardUID: dashboardId,
panelId: panelId,
@ -377,7 +409,7 @@ describe('QueryCache', function () {
const firstMergedLength = firstQueryResult[0].fields[0].values.length;
const secondQueryResult = storage.procFrames(
mockRequest({
mockPromRequest({
range: secondRange,
dashboardUID: dashboardId,
panelId: panelId,
@ -400,7 +432,7 @@ describe('QueryCache', function () {
cache.set(targetIdentity, `'1=1'|${interval}|${JSON.stringify(thirdRange.raw)}`);
storage.procFrames(
mockRequest({
mockPromRequest({
range: thirdRange,
dashboardUID: dashboardId,
panelId: panelId,
@ -435,17 +467,17 @@ describe('QueryCache', function () {
utcOffsetSec: -21600,
};
const request = mockRequest({
const request = mockPromRequest({
interval: requestInterval,
targets: [target],
});
const targSig = getTargSig('__EXPR__', request, target);
const targSig = getPrometheusTargetSignature(request, target);
expect(targSig).toContain(targetInterval);
expect(targSig.includes(requestInterval)).toBeFalsy();
});
it('will not modify request with absolute duration', () => {
const request = mockRequest({
const request = mockPromRequest({
range: {
from: moment('2023-01-30T19:33:01.332Z') as DateTime,
to: moment('2023-01-30T20:33:01.332Z') as DateTime,
@ -453,33 +485,36 @@ describe('QueryCache', function () {
},
rangeRaw: { from: '2023-01-30T19:33:01.332Z', to: '2023-01-30T20:33:01.332Z' },
});
const storage = new QueryCache();
const interpolateString = (s: string) => {
return s;
};
const cacheRequest = storage.requestInfo(request, interpolateString);
const storage = new QueryCache<PromQuery>({
getTargetSignature: getPrometheusTargetSignature,
overlapString: '10m',
profileFunction: getPromProfileData,
});
const cacheRequest = storage.requestInfo(request);
expect(cacheRequest.requests[0]).toBe(request);
expect(cacheRequest.shouldCache).toBe(false);
});
it('mark request as shouldCache', () => {
const request = mockRequest();
const storage = new QueryCache();
const interpolateString = (s: string) => {
return s;
};
const cacheRequest = storage.requestInfo(request, interpolateString);
const request = mockPromRequest();
const storage = new QueryCache<PromQuery>({
getTargetSignature: getPrometheusTargetSignature,
overlapString: '10m',
profileFunction: getPromProfileData,
});
const cacheRequest = storage.requestInfo(request);
expect(cacheRequest.requests[0]).toBe(request);
expect(cacheRequest.shouldCache).toBe(true);
});
it('Should modify request', () => {
const request = mockRequest();
const storage = new QueryCache();
const interpolateString = (s: string) => {
return s;
};
const cacheRequest = storage.requestInfo(request, interpolateString);
const request = mockPromRequest();
const storage = new QueryCache<PromQuery>({
getTargetSignature: getPrometheusTargetSignature,
overlapString: '10m',
profileFunction: getPromProfileData,
});
const cacheRequest = storage.requestInfo(request);
expect(cacheRequest.requests[0]).toBe(request);
expect(cacheRequest.shouldCache).toBe(true);
});

@ -4,6 +4,7 @@ import {
dateTime,
durationToMilliseconds,
Field,
incrRoundDn,
isValidDuration,
parseDuration,
} from '@grafana/data/src';
@ -17,16 +18,15 @@ import { PromQuery } from '../types';
// (must be stable across query changes, time range changes / interval changes / panel resizes / template variable changes)
type TargetIdent = string;
type RequestID = string;
// query + template variables + interval + raw time range
// used for full target cache busting -> full range re-query
type TargetSig = string;
type TimestampMs = number;
// Look like Q001, Q002, etc
type RequestID = string;
type StringInterpolator = (expr: string) => string;
type SupportedQueryTypes = PromQuery;
// string matching requirements defined in durationutil.ts
export const defaultPrometheusQueryOverlapWindow = '10m';
@ -37,12 +37,25 @@ interface TargetCache {
frames: DataFrame[];
}
export interface CacheRequestInfo {
requests: Array<DataQueryRequest<PromQuery>>;
export interface CacheRequestInfo<T extends SupportedQueryTypes> {
requests: Array<DataQueryRequest<T>>;
targSigs: Map<TargetIdent, TargetSig>;
shouldCache: boolean;
}
/**
 * Datasource-specific profiling info attached to cache telemetry events.
 * Supplied by the caller via the `profileFunction` constructor option.
 */
export interface DatasourceProfileData {
  // Effective query interval (target interval, falling back to the request interval)
  interval?: string;
  // Interpolated query expression
  expr: string;
  // Datasource label emitted with the event, e.g. 'Prometheus'
  datasource: string;
}
// Full profiling record tracked per pending request; extends the
// datasource-provided data with cache-identity and size bookkeeping.
interface ProfileData extends DatasourceProfileData {
  // Target identity plus signature, joined as "ident|sig"
  identity: string;
  // Response transfer size in bytes; null until the response is observed
  bytes: number | null;
  dashboardUID: string;
  panelId?: number;
}
/**
* Get field identity
* This is the string used to uniquely identify a field within a "target"
@ -50,41 +63,24 @@ export interface CacheRequestInfo {
*/
/**
 * Get field identity: the string used to uniquely identify a field within a
 * "target". Combines the field's type, name, and serialized labels.
 */
export const getFieldIdent = (field: Field): string => {
  const labels = JSON.stringify(field.labels ?? '');
  return `${field.type}|${field.name}|${labels}`;
};
/**
 * Build the target signature used for cache busting: a change in any
 * component (expression, interval, raw range, exemplar flag) triggers a
 * full-range re-query.
 * @param targExpr - the already-interpolated query expression
 * @param request - the full data query request
 * @param targ - the individual target within the request
 */
export function getTargSig(targExpr: string, request: DataQueryRequest<PromQuery>, targ: PromQuery) {
  const interval = targ.interval ?? request.interval;
  const range = JSON.stringify(request.rangeRaw ?? '');
  return `${targExpr}|${interval}|${range}|${targ.exemplar}`;
}
/**
* NOMENCLATURE
* Target: The request target (DataQueryRequest), i.e. a specific query reference within a panel
* Ident: Identity: the string that is not expected to change
* Sig: Signature: the string that is expected to change, upon which we wipe the cache fields
*/
export class QueryCache {
export class QueryCache<T extends SupportedQueryTypes> {
private overlapWindowMs: number;
private getTargetSignature: (request: DataQueryRequest<T>, target: T) => string;
private getProfileData?: (request: DataQueryRequest<T>, target: T) => DatasourceProfileData;
private perfObeserver?: PerformanceObserver;
private shouldProfile: boolean;
// send profile events every 5 minutes
sendEventsInterval = 60000 * 5;
pendingRequestIdsToTargSigs = new Map<
RequestID,
{
identity: string;
bytes: number | null;
dashboardUID?: string;
interval?: string;
panelId?: number;
expr?: string;
}
>();
pendingRequestIdsToTargSigs = new Map<RequestID, ProfileData>();
pendingAccumulatedEvents = new Map<
string,
@ -98,14 +94,18 @@ export class QueryCache {
expr: string;
interval: string;
sent: boolean;
datasource: string;
}
>();
cache = new Map<TargetIdent, TargetCache>();
constructor(overlapString?: string) {
const unverifiedOverlap = overlapString ?? defaultPrometheusQueryOverlapWindow;
constructor(options: {
getTargetSignature: (request: DataQueryRequest<T>, target: T) => string;
overlapString: string;
profileFunction?: (request: DataQueryRequest<T>, target: T) => DatasourceProfileData;
}) {
const unverifiedOverlap = options.overlapString;
if (isValidDuration(unverifiedOverlap)) {
const duration = parseDuration(unverifiedOverlap);
this.overlapWindowMs = durationToMilliseconds(duration);
@ -114,12 +114,14 @@ export class QueryCache {
this.overlapWindowMs = durationToMilliseconds(duration);
}
if (config.grafanaJavascriptAgent.enabled) {
if (config.grafanaJavascriptAgent.enabled && options.profileFunction !== undefined) {
this.profile();
this.shouldProfile = true;
} else {
this.shouldProfile = false;
}
this.getProfileData = options.profileFunction;
this.getTargetSignature = options.getTargetSignature;
}
private profile() {
@ -157,6 +159,7 @@ export class QueryCache {
const savedBytes = value.bytes - requestTransferSize;
this.pendingAccumulatedEvents.set(value.identity, {
datasource: value.datasource ?? 'N/A',
requestCount: (previous?.requestCount ?? 0) + 1,
savedBytesTotal: (previous?.savedBytesTotal ?? 0) + savedBytes,
initialRequestSize: value.bytes,
@ -199,16 +202,10 @@ export class QueryCache {
for (let [key, value] of entries) {
if (!value.sent) {
this.pendingAccumulatedEvents.set(key, { ...value, sent: true });
faro.api.pushMeasurement({
type: 'custom',
values: {
thing: 0,
thing2: 1,
},
});
faro.api.pushEvent(
'prometheus incremental query response size',
'incremental query response size',
{
datasource: value.datasource.toString(),
requestCount: value.requestCount.toString(),
savedBytesTotal: value.savedBytesTotal.toString(),
initialRequestSize: value.initialRequestSize.toString(),
@ -228,33 +225,32 @@ export class QueryCache {
};
// can be used to change full range request to partial, split into multiple requests
requestInfo(request: DataQueryRequest<PromQuery>, interpolateString: StringInterpolator): CacheRequestInfo {
requestInfo(request: DataQueryRequest<T>): CacheRequestInfo<T> {
// TODO: align from/to to interval to increase probability of hitting backend cache
const newFrom = request.range.from.valueOf();
const newTo = request.range.to.valueOf();
// only cache 'now'-relative queries (that can benefit from a backfill cache)
const shouldCache = request.rangeRaw?.to?.toString() === 'now' && !request.targets.some((targ) => targ.instant);
const shouldCache = request.rangeRaw?.to?.toString() === 'now';
// all targets are queried together, so we check for any that causes group cache invalidation & full re-query
let doPartialQuery = shouldCache;
let prevTo: TimestampMs;
let prevTo: TimestampMs | undefined = undefined;
// pre-compute reqTargSigs
const reqTargSigs = new Map<TargetIdent, TargetSig>();
request.targets.forEach((targ) => {
let targIdent = `${request.dashboardUID}|${request.panelId}|${targ.refId}`;
let targExpr = interpolateString(targ.expr);
let targSig = getTargSig(targExpr, request, targ); // ${request.maxDataPoints} ?
let targSig = this.getTargetSignature(request, targ); // ${request.maxDataPoints} ?
if (this.shouldProfile) {
if (this.shouldProfile && this.getProfileData) {
this.pendingRequestIdsToTargSigs.set(request.requestId, {
...this.getProfileData(request, targ),
identity: targIdent + '|' + targSig,
dashboardUID: request.dashboardUID ?? '',
interval: targ.interval ?? request.interval,
panelId: request.panelId,
expr: targExpr,
bytes: null,
panelId: request.panelId,
dashboardUID: request.dashboardUID ?? '',
});
}
@ -272,6 +268,7 @@ export class QueryCache {
// only do partial queries when new request range follows prior request range (possibly with overlap)
// e.g. now-6h with refresh <= 6h
prevTo = cached?.prevTo ?? Infinity;
doPartialQuery = newTo > prevTo && newFrom <= prevTo;
}
@ -280,19 +277,20 @@ export class QueryCache {
}
}
if (doPartialQuery) {
// 10m re-query overlap
if (doPartialQuery && prevTo) {
// clamp to make sure we don't re-query previous 10m when newFrom is ahead of it (e.g. 5min range, 30s refresh)
let newFromPartial = Math.max(prevTo! - this.overlapWindowMs, newFrom);
let newFromPartial = Math.max(prevTo - this.overlapWindowMs, newFrom);
const newToDate = dateTime(newTo);
const newFromPartialDate = dateTime(incrRoundDn(newFromPartial, request.intervalMs));
// modify to partial query
request = {
...request,
range: {
...request.range,
from: dateTime(newFromPartial),
to: dateTime(newTo),
from: newFromPartialDate,
to: newToDate,
},
};
} else {
@ -310,8 +308,8 @@ export class QueryCache {
// should amend existing cache with new frames and return full response
procFrames(
request: DataQueryRequest<PromQuery>,
requestInfo: CacheRequestInfo | undefined,
request: DataQueryRequest<T>,
requestInfo: CacheRequestInfo<T> | undefined,
respFrames: DataFrame[]
): DataFrame[] {
if (requestInfo?.shouldCache) {

Loading…
Cancel
Save