mirror of https://github.com/grafana/grafana
BackendSrv: Queues data source requests but passes through api requests (#26947)
* Refactor: initial commit * wip * Refactor: getting into a simpler model * Refactor: adds some comments * Refactor: renames statuses according to PR comments * Refactor: adds more comments * Tests: adds tests for FetchQueue * Tests: adds tests for ResponseQueue * Tests: adds tests for FetchQueueWorker * Tests: simplified the tests for ResponseQueue * Refactor: adds http2 scenario * Refactor: using Cfg instead of global variable Co-authored-by: Arve Knudsen <arve.knudsen@gmail.com> * Refactor: reverted change in frontendsettings.go * Tests: fix test mocks * Fix: changes how cfg.Protocol gets its value Co-authored-by: Arve Knudsen <arve.knudsen@gmail.com>pull/27092/head
parent
95432fb684
commit
9e357d84a4
@ -0,0 +1,156 @@ |
||||
import { Observable } from 'rxjs'; |
||||
import { take } from 'rxjs/operators'; |
||||
import { BackendSrvRequest } from '@grafana/runtime'; |
||||
|
||||
import { FetchQueue, FetchQueueUpdate, FetchStatus } from './FetchQueue'; |
||||
|
||||
type SubscribeTesterArgs<T> = { |
||||
observable: Observable<T>; |
||||
expectCallback: (data: T) => void; |
||||
doneCallback: jest.DoneCallback; |
||||
}; |
||||
|
||||
export const subscribeTester = <T>({ observable, expectCallback, doneCallback }: SubscribeTesterArgs<T>) => { |
||||
observable.subscribe({ |
||||
next: data => expectCallback(data), |
||||
complete: () => { |
||||
doneCallback(); |
||||
}, |
||||
}); |
||||
}; |
||||
|
||||
describe('FetchQueue', () => { |
||||
describe('add', () => { |
||||
describe('when called twice', () => { |
||||
it('then an update with the correct state should be published', done => { |
||||
const id = 'id'; |
||||
const id2 = 'id2'; |
||||
const options: BackendSrvRequest = { url: 'http://someurl' }; |
||||
const options2: BackendSrvRequest = { url: 'http://someotherurl' }; |
||||
const expects: FetchQueueUpdate[] = [ |
||||
{ |
||||
noOfPending: 1, |
||||
noOfInProgress: 0, |
||||
state: { |
||||
['id']: { options: { url: 'http://someurl' }, state: FetchStatus.Pending }, |
||||
}, |
||||
}, |
||||
{ |
||||
noOfPending: 2, |
||||
noOfInProgress: 0, |
||||
state: { |
||||
['id']: { options: { url: 'http://someurl' }, state: FetchStatus.Pending }, |
||||
['id2']: { options: { url: 'http://someotherurl' }, state: FetchStatus.Pending }, |
||||
}, |
||||
}, |
||||
]; |
||||
const queue = new FetchQueue(); |
||||
let calls = 0; |
||||
|
||||
subscribeTester({ |
||||
observable: queue.getUpdates().pipe(take(2)), |
||||
expectCallback: data => expect(data).toEqual(expects[calls++]), |
||||
doneCallback: done, |
||||
}); |
||||
|
||||
queue.add(id, options); |
||||
queue.add(id2, options2); |
||||
}); |
||||
}); |
||||
}); |
||||
|
||||
describe('setInProgress', () => { |
||||
describe('when called', () => { |
||||
it('then an update with the correct state should be published', done => { |
||||
const id = 'id'; |
||||
const id2 = 'id2'; |
||||
const options: BackendSrvRequest = { url: 'http://someurl' }; |
||||
const options2: BackendSrvRequest = { url: 'http://someotherurl' }; |
||||
const expects: FetchQueueUpdate[] = [ |
||||
{ |
||||
noOfPending: 1, |
||||
noOfInProgress: 0, |
||||
state: { |
||||
['id']: { options: { url: 'http://someurl' }, state: FetchStatus.Pending }, |
||||
}, |
||||
}, |
||||
{ |
||||
noOfPending: 2, |
||||
noOfInProgress: 0, |
||||
state: { |
||||
['id']: { options: { url: 'http://someurl' }, state: FetchStatus.Pending }, |
||||
['id2']: { options: { url: 'http://someotherurl' }, state: FetchStatus.Pending }, |
||||
}, |
||||
}, |
||||
{ |
||||
noOfPending: 1, |
||||
noOfInProgress: 1, |
||||
state: { |
||||
['id']: { options: { url: 'http://someurl' }, state: FetchStatus.Pending }, |
||||
['id2']: { options: { url: 'http://someotherurl' }, state: FetchStatus.InProgress }, |
||||
}, |
||||
}, |
||||
]; |
||||
const queue = new FetchQueue(); |
||||
let calls = 0; |
||||
|
||||
subscribeTester({ |
||||
observable: queue.getUpdates().pipe(take(3)), |
||||
expectCallback: data => expect(data).toEqual(expects[calls++]), |
||||
doneCallback: done, |
||||
}); |
||||
|
||||
queue.add(id, options); |
||||
queue.add(id2, options2); |
||||
queue.setInProgress(id2); |
||||
}); |
||||
}); |
||||
}); |
||||
|
||||
describe('setDone', () => { |
||||
describe('when called', () => { |
||||
it('then an update with the correct state should be published', done => { |
||||
const id = 'id'; |
||||
const id2 = 'id2'; |
||||
const options: BackendSrvRequest = { url: 'http://someurl' }; |
||||
const options2: BackendSrvRequest = { url: 'http://someotherurl' }; |
||||
const expects: FetchQueueUpdate[] = [ |
||||
{ |
||||
noOfPending: 1, |
||||
noOfInProgress: 0, |
||||
state: { |
||||
['id']: { options: { url: 'http://someurl' }, state: FetchStatus.Pending }, |
||||
}, |
||||
}, |
||||
{ |
||||
noOfPending: 2, |
||||
noOfInProgress: 0, |
||||
state: { |
||||
['id']: { options: { url: 'http://someurl' }, state: FetchStatus.Pending }, |
||||
['id2']: { options: { url: 'http://someotherurl' }, state: FetchStatus.Pending }, |
||||
}, |
||||
}, |
||||
{ |
||||
noOfPending: 1, |
||||
noOfInProgress: 0, |
||||
state: { |
||||
['id2']: { options: { url: 'http://someotherurl' }, state: FetchStatus.Pending }, |
||||
}, |
||||
}, |
||||
]; |
||||
const queue = new FetchQueue(); |
||||
let calls = 0; |
||||
|
||||
subscribeTester({ |
||||
observable: queue.getUpdates().pipe(take(3)), |
||||
expectCallback: data => expect(data).toEqual(expects[calls++]), |
||||
doneCallback: done, |
||||
}); |
||||
|
||||
queue.add(id, options); |
||||
queue.add(id2, options2); |
||||
queue.setDone(id); |
||||
}); |
||||
}); |
||||
}); |
||||
}); |
||||
@ -0,0 +1,94 @@ |
||||
import { Observable, Subject } from 'rxjs'; |
||||
|
||||
import { BackendSrvRequest } from '@grafana/runtime'; |
||||
|
||||
export interface QueueState extends Record<string, { state: FetchStatus; options: BackendSrvRequest }> {} |
||||
|
||||
export enum FetchStatus { |
||||
Pending, |
||||
InProgress, |
||||
Done, |
||||
} |
||||
|
||||
export interface FetchQueueUpdate { |
||||
noOfInProgress: number; |
||||
noOfPending: number; |
||||
state: QueueState; |
||||
} |
||||
|
||||
interface QueueStateEntry { |
||||
id: string; |
||||
options?: BackendSrvRequest; |
||||
state: FetchStatus; |
||||
} |
||||
|
||||
export class FetchQueue { |
||||
private state: QueueState = {}; // internal queue state
|
||||
private queue: Subject<QueueStateEntry> = new Subject<QueueStateEntry>(); // internal stream for requests that are to be queued
|
||||
private updates: Subject<FetchQueueUpdate> = new Subject<FetchQueueUpdate>(); // external stream with updates to the queue state
|
||||
|
||||
constructor(debug = false) { |
||||
// This will create an implicit live subscription for as long as this class lives.
|
||||
// But as FetchQueue is used by the singleton backendSrv that also lives for as long as Grafana app lives
|
||||
// I think this ok. We could add some disposable pattern later if the need arises.
|
||||
this.queue.subscribe(entry => { |
||||
const { id, state, options } = entry; |
||||
|
||||
if (!this.state[id]) { |
||||
this.state[id] = { state: FetchStatus.Pending, options: {} as BackendSrvRequest }; |
||||
} |
||||
|
||||
if (state === FetchStatus.Done) { |
||||
delete this.state[id]; |
||||
const update = this.getUpdate(this.state); |
||||
this.publishUpdate(update, debug); |
||||
return; |
||||
} |
||||
|
||||
this.state[id].state = state; |
||||
|
||||
if (options) { |
||||
this.state[id].options = options; |
||||
} |
||||
|
||||
const update = this.getUpdate(this.state); |
||||
this.publishUpdate(update, debug); |
||||
}); |
||||
} |
||||
|
||||
add = (id: string, options: BackendSrvRequest): void => this.queue.next({ id, options, state: FetchStatus.Pending }); |
||||
|
||||
setInProgress = (id: string): void => this.queue.next({ id, state: FetchStatus.InProgress }); |
||||
|
||||
setDone = (id: string): void => this.queue.next({ id, state: FetchStatus.Done }); |
||||
|
||||
getUpdates = (): Observable<FetchQueueUpdate> => this.updates.asObservable(); |
||||
|
||||
private getUpdate = (state: QueueState): FetchQueueUpdate => { |
||||
const noOfInProgress = Object.keys(state).filter(key => state[key].state === FetchStatus.InProgress).length; |
||||
const noOfPending = Object.keys(state).filter(key => state[key].state === FetchStatus.Pending).length; |
||||
|
||||
return { noOfPending, noOfInProgress, state }; |
||||
}; |
||||
|
||||
private publishUpdate = (update: FetchQueueUpdate, debug: boolean): void => { |
||||
this.printState(update, debug); |
||||
this.updates.next(update); |
||||
}; |
||||
|
||||
private printState = (update: FetchQueueUpdate, debug: boolean): void => { |
||||
if (!debug) { |
||||
return; |
||||
} |
||||
|
||||
const entriesWithoutOptions = Object.keys(update.state).reduce((all, key) => { |
||||
const entry = { id: key, state: update.state[key].state }; |
||||
all.push(entry); |
||||
return all; |
||||
}, [] as Array<{ id: string; state: FetchStatus }>); |
||||
|
||||
console.log('FetchQueue noOfStarted', update.noOfInProgress); |
||||
console.log('FetchQueue noOfNotStarted', update.noOfPending); |
||||
console.log('FetchQueue state', entriesWithoutOptions); |
||||
}; |
||||
} |
||||
@ -0,0 +1,97 @@ |
||||
import { Subject } from 'rxjs'; |
||||
|
||||
import { FetchQueue, FetchQueueUpdate, FetchStatus } from './FetchQueue'; |
||||
import { ResponseQueue } from './ResponseQueue'; |
||||
import { FetchQueueWorker } from './FetchQueueWorker'; |
||||
import { expect } from '../../../test/lib/common'; |
||||
import { GrafanaBootConfig } from '@grafana/runtime'; |
||||
|
||||
const getTestContext = (http2Enabled = false) => { |
||||
const config: GrafanaBootConfig = ({ http2Enabled } as unknown) as GrafanaBootConfig; |
||||
const dataUrl = 'http://localhost:3000/api/ds/query?=abc'; |
||||
const apiUrl = 'http://localhost:3000/api/alerts?state=all'; |
||||
const updates: Subject<FetchQueueUpdate> = new Subject<FetchQueueUpdate>(); |
||||
|
||||
const queueMock: FetchQueue = ({ |
||||
add: jest.fn(), |
||||
setInProgress: jest.fn(), |
||||
setDone: jest.fn(), |
||||
getUpdates: () => updates.asObservable(), |
||||
} as unknown) as FetchQueue; |
||||
|
||||
const addMock = jest.fn(); |
||||
const responseQueueMock: ResponseQueue = ({ |
||||
add: addMock, |
||||
getResponses: jest.fn(), |
||||
} as unknown) as ResponseQueue; |
||||
|
||||
new FetchQueueWorker(queueMock, responseQueueMock, config); |
||||
|
||||
return { dataUrl, apiUrl, updates, queueMock, addMock }; |
||||
}; |
||||
|
||||
describe('FetchQueueWorker', () => { |
||||
describe('when an update is pushed in the stream', () => { |
||||
describe('and queue has no pending entries', () => { |
||||
it('then nothing should be added to the responseQueue', () => { |
||||
const { updates, addMock } = getTestContext(); |
||||
updates.next({ noOfPending: 0, noOfInProgress: 1, state: {} }); |
||||
|
||||
expect(addMock).toHaveBeenCalledTimes(0); |
||||
}); |
||||
}); |
||||
|
||||
describe('and queue has pending entries', () => { |
||||
describe('and there are no entries in progress', () => { |
||||
it('then api request should be added before data requests responseQueue', () => { |
||||
const { updates, addMock, dataUrl, apiUrl } = getTestContext(); |
||||
updates.next({ |
||||
noOfPending: 2, |
||||
noOfInProgress: 0, |
||||
state: { |
||||
['data']: { state: FetchStatus.Pending, options: { url: dataUrl } }, |
||||
['api']: { state: FetchStatus.Pending, options: { url: apiUrl } }, |
||||
}, |
||||
}); |
||||
|
||||
expect(addMock.mock.calls).toEqual([ |
||||
['api', { url: 'http://localhost:3000/api/alerts?state=all' }], |
||||
['data', { url: 'http://localhost:3000/api/ds/query?=abc' }], |
||||
]); |
||||
}); |
||||
}); |
||||
|
||||
describe('and there are max concurrent entries in progress', () => { |
||||
it('then api request should always pass through but no data requests should pass', () => { |
||||
const { updates, addMock, dataUrl, apiUrl } = getTestContext(); |
||||
updates.next({ |
||||
noOfPending: 2, |
||||
noOfInProgress: 5, |
||||
state: { |
||||
['data']: { state: FetchStatus.Pending, options: { url: dataUrl } }, |
||||
['api']: { state: FetchStatus.Pending, options: { url: apiUrl } }, |
||||
}, |
||||
}); |
||||
|
||||
expect(addMock.mock.calls).toEqual([['api', { url: 'http://localhost:3000/api/alerts?state=all' }]]); |
||||
}); |
||||
}); |
||||
|
||||
describe('and http2 is enabled and there are max concurrent entries in progress', () => { |
||||
it('then api request should always pass through but no data requests should pass', () => { |
||||
const { updates, addMock, dataUrl, apiUrl } = getTestContext(true); |
||||
updates.next({ |
||||
noOfPending: 2, |
||||
noOfInProgress: 1000, |
||||
state: { |
||||
['data']: { state: FetchStatus.Pending, options: { url: dataUrl } }, |
||||
['api']: { state: FetchStatus.Pending, options: { url: apiUrl } }, |
||||
}, |
||||
}); |
||||
|
||||
expect(addMock.mock.calls).toEqual([['api', { url: 'http://localhost:3000/api/alerts?state=all' }]]); |
||||
}); |
||||
}); |
||||
}); |
||||
}); |
||||
}); |
||||
@ -0,0 +1,58 @@ |
||||
import { concatMap, filter } from 'rxjs/operators'; |
||||
|
||||
import { FetchQueue, FetchStatus } from './FetchQueue'; |
||||
import { BackendSrvRequest } from '@grafana/runtime'; |
||||
import { isDataQuery } from '../utils/query'; |
||||
import { ResponseQueue } from './ResponseQueue'; |
||||
import { getConfig } from '../config'; |
||||
|
||||
interface WorkerEntry { |
||||
id: string; |
||||
options: BackendSrvRequest; |
||||
} |
||||
|
||||
export class FetchQueueWorker { |
||||
constructor(fetchQueue: FetchQueue, responseQueue: ResponseQueue, config = getConfig()) { |
||||
const maxParallelRequests = config.http2Enabled ? 1000 : 5; // assuming that 1000 parallel requests are enough for http2
|
||||
|
||||
// This will create an implicit live subscription for as long as this class lives.
|
||||
// But as FetchQueueWorker is used by the singleton backendSrv that also lives for as long as Grafana app lives
|
||||
// I think this ok. We could add some disposable pattern later if the need arises.
|
||||
fetchQueue |
||||
.getUpdates() |
||||
.pipe( |
||||
filter(({ noOfPending }) => noOfPending > 0), // no reason to act if there is nothing to act upon
|
||||
// Using concatMap instead of mergeMap so that the order with apiRequests first is preserved
|
||||
// https://rxjs.dev/api/operators/concatMap
|
||||
concatMap(({ state, noOfInProgress }) => { |
||||
const apiRequests = Object.keys(state) |
||||
.filter(k => state[k].state === FetchStatus.Pending && !isDataQuery(state[k].options.url)) |
||||
.reduce((all, key) => { |
||||
const entry = { id: key, options: state[key].options }; |
||||
all.push(entry); |
||||
return all; |
||||
}, [] as WorkerEntry[]); |
||||
|
||||
const dataRequests = Object.keys(state) |
||||
.filter(key => state[key].state === FetchStatus.Pending && isDataQuery(state[key].options.url)) |
||||
.reduce((all, key) => { |
||||
const entry = { id: key, options: state[key].options }; |
||||
all.push(entry); |
||||
return all; |
||||
}, [] as WorkerEntry[]); |
||||
|
||||
// apiRequests have precedence over data requests and should always be called directly
|
||||
// this means we can end up with a negative value.
|
||||
// Because the way Array.toSlice works with negative numbers we use Math.max below.
|
||||
const noOfAllowedDataRequests = Math.max(maxParallelRequests - noOfInProgress - apiRequests.length, 0); |
||||
const dataRequestToFetch = dataRequests.slice(0, noOfAllowedDataRequests); |
||||
|
||||
return apiRequests.concat(dataRequestToFetch); |
||||
}) |
||||
) |
||||
.subscribe(({ id, options }) => { |
||||
// This will add an entry to the responseQueue
|
||||
responseQueue.add(id, options); |
||||
}); |
||||
} |
||||
} |
||||
@ -0,0 +1,101 @@ |
||||
import { of } from 'rxjs'; |
||||
import { first } from 'rxjs/operators'; |
||||
import { BackendSrvRequest } from '@grafana/runtime'; |
||||
|
||||
import { FetchQueue, FetchQueueUpdate } from './FetchQueue'; |
||||
import { ResponseQueue } from './ResponseQueue'; |
||||
import { subscribeTester } from './FetchQueue.test'; |
||||
import { describe, expect } from '../../../test/lib/common'; |
||||
|
||||
const getTestContext = () => { |
||||
const id = 'id'; |
||||
const options: BackendSrvRequest = { url: 'http://someurl' }; |
||||
const expects: FetchQueueUpdate[] = []; |
||||
|
||||
const fetchResult = of({ |
||||
data: id, |
||||
status: 200, |
||||
statusText: 'OK', |
||||
ok: true, |
||||
headers: (null as unknown) as Headers, |
||||
redirected: false, |
||||
type: (null as unknown) as ResponseType, |
||||
url: options.url, |
||||
config: (null as unknown) as BackendSrvRequest, |
||||
}); |
||||
|
||||
const fetchMock = jest.fn().mockReturnValue(fetchResult); |
||||
const setInProgressMock = jest.fn(); |
||||
const setDoneMock = jest.fn(); |
||||
|
||||
const queueMock: FetchQueue = ({ |
||||
add: jest.fn(), |
||||
setInProgress: setInProgressMock, |
||||
setDone: setDoneMock, |
||||
getUpdates: jest.fn(), |
||||
} as unknown) as FetchQueue; |
||||
|
||||
const responseQueue = new ResponseQueue(queueMock, fetchMock); |
||||
|
||||
return { id, options, expects, fetchMock, setInProgressMock, setDoneMock, responseQueue, fetchResult }; |
||||
}; |
||||
|
||||
describe('ResponseQueue', () => { |
||||
describe('add', () => { |
||||
describe('when called', () => { |
||||
it('then the matching fetchQueue entry should be set to inProgress', () => { |
||||
const { id, options, setInProgressMock, setDoneMock, responseQueue } = getTestContext(); |
||||
|
||||
responseQueue.add(id, options); |
||||
|
||||
expect(setInProgressMock.mock.calls).toEqual([['id']]); |
||||
expect(setDoneMock).not.toHaveBeenCalled(); |
||||
}); |
||||
|
||||
it('then a response entry with correct id should be published', done => { |
||||
const { id, options, responseQueue } = getTestContext(); |
||||
|
||||
subscribeTester({ |
||||
observable: responseQueue.getResponses(id).pipe(first()), |
||||
expectCallback: data => expect(data.id).toEqual(id), |
||||
doneCallback: done, |
||||
}); |
||||
|
||||
responseQueue.add(id, options); |
||||
}); |
||||
|
||||
it('then fetch is called with correct options', done => { |
||||
const { id, options, responseQueue, fetchMock } = getTestContext(); |
||||
|
||||
subscribeTester({ |
||||
observable: responseQueue.getResponses(id).pipe(first()), |
||||
expectCallback: () => { |
||||
expect(fetchMock).toHaveBeenCalledTimes(1); |
||||
expect(fetchMock).toHaveBeenCalledWith({ url: 'http://someurl' }); |
||||
}, |
||||
doneCallback: done, |
||||
}); |
||||
|
||||
responseQueue.add(id, options); |
||||
}); |
||||
|
||||
describe('and when the fetch Observable is completed', () => { |
||||
it('then the matching fetchQueue entry should be set to Done', done => { |
||||
const { id, options, responseQueue, setInProgressMock, setDoneMock } = getTestContext(); |
||||
|
||||
subscribeTester({ |
||||
observable: responseQueue.getResponses(id).pipe(first()), |
||||
expectCallback: data => { |
||||
data.observable.subscribe().unsubscribe(); |
||||
expect(setInProgressMock.mock.calls).toEqual([['id']]); |
||||
expect(setDoneMock.mock.calls).toEqual([['id']]); |
||||
}, |
||||
doneCallback: done, |
||||
}); |
||||
|
||||
responseQueue.add(id, options); |
||||
}); |
||||
}); |
||||
}); |
||||
}); |
||||
}); |
||||
@ -0,0 +1,50 @@ |
||||
import { Observable, Subject } from 'rxjs'; |
||||
import { filter, finalize } from 'rxjs/operators'; |
||||
import { BackendSrvRequest, FetchResponse } from '@grafana/runtime'; |
||||
import { FetchQueue } from './FetchQueue'; |
||||
|
||||
interface FetchWorkEntry { |
||||
id: string; |
||||
options: BackendSrvRequest; |
||||
} |
||||
|
||||
interface FetchResponsesEntry<T> { |
||||
id: string; |
||||
observable: Observable<FetchResponse<T>>; |
||||
} |
||||
|
||||
export class ResponseQueue { |
||||
private queue: Subject<FetchWorkEntry> = new Subject<FetchWorkEntry>(); // internal stream for requests that are to be executed
|
||||
private responses: Subject<FetchResponsesEntry<any>> = new Subject<FetchResponsesEntry<any>>(); // external stream with responses from fetch
|
||||
|
||||
constructor(fetchQueue: FetchQueue, fetch: <T>(options: BackendSrvRequest) => Observable<FetchResponse<T>>) { |
||||
// This will create an implicit live subscription for as long as this class lives.
|
||||
// But as FetchQueue is used by the singleton backendSrv that also lives for as long as Grafana app lives
|
||||
// I think this ok. We could add some disposable pattern later if the need arises.
|
||||
this.queue.subscribe(entry => { |
||||
const { id, options } = entry; |
||||
|
||||
// Let the fetchQueue know that this id has started data fetching.
|
||||
fetchQueue.setInProgress(id); |
||||
|
||||
this.responses.next({ |
||||
id, |
||||
observable: fetch(options).pipe( |
||||
// finalize is called whenever this observable is unsubscribed/errored/completed/canceled
|
||||
// https://rxjs.dev/api/operators/finalize
|
||||
finalize(() => { |
||||
// Let the fetchQueue know that this id is done.
|
||||
fetchQueue.setDone(id); |
||||
}) |
||||
), |
||||
}); |
||||
}); |
||||
} |
||||
|
||||
add = (id: string, options: BackendSrvRequest): void => { |
||||
this.queue.next({ id, options }); |
||||
}; |
||||
|
||||
getResponses = <T>(id: string): Observable<FetchResponsesEntry<T>> => |
||||
this.responses.asObservable().pipe(filter(entry => entry.id === id)); |
||||
} |
||||
Loading…
Reference in new issue