The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
grafana/public/app/core/services/FetchQueueWorker.ts

57 lines
2.6 KiB

import { concatMap, filter } from 'rxjs/operators';
import { FetchQueue, FetchStatus } from './FetchQueue';
import { BackendSrvRequest, GrafanaBootConfig } from '@grafana/runtime';
import { isDataQuery } from '../utils/query';
import { ResponseQueue } from './ResponseQueue';
/**
 * A single unit of work emitted by the FetchQueueWorker pipeline.
 */
interface WorkerEntry {
  /** Key under which the request is stored in the fetch queue state. */
  id: string;
  /** Request options taken from the fetch queue state entry with that key. */
  options: BackendSrvRequest;
}
/**
 * Listens to fetch queue updates and hands pending requests over to the response queue.
 *
 * Pending API requests (anything `isDataQuery` rejects) are always handed over immediately.
 * Pending data requests are only handed over while there are free slots left, where the
 * number of slots is 1000 when `config.http2Enabled` is truthy and 5 otherwise.
 */
export class FetchQueueWorker {
  /**
   * @param fetchQueue - queue whose updates are observed for pending requests
   * @param responseQueue - queue that receives every request selected to be started
   * @param config - boot config; only `http2Enabled` is read
   */
  constructor(fetchQueue: FetchQueue, responseQueue: ResponseQueue, config: GrafanaBootConfig) {
    // Tests that don't mock GrafanaBootConfig pass undefined here, hence the optional chaining.
    const parallelLimit = config?.http2Enabled ? 1000 : 5;

    const entriesToStart$ = fetchQueue.getUpdates().pipe(
      // Nothing to schedule when no request is pending.
      filter((update) => update.noOfPending > 0),
      // concatMap (rather than mergeMap) keeps the emitted order intact: api requests first.
      // https://rxjs.dev/api/operators/concatMap
      concatMap(({ state, noOfInProgress }) => {
        // Collects, in key order, the pending entries that are (or are not) data queries.
        const pendingOfKind = (wantDataQuery: boolean): WorkerEntry[] => {
          const picked: WorkerEntry[] = [];
          for (const id of Object.keys(state)) {
            const queued = state[id];
            if (queued.state !== FetchStatus.Pending) {
              continue;
            }
            const matchesKind = wantDataQuery ? isDataQuery(queued.options.url) : !isDataQuery(queued.options.url);
            if (matchesKind) {
              picked.push({ id, options: queued.options });
            }
          }
          return picked;
        };

        const apiEntries = pendingOfKind(false);
        const dataEntries = pendingOfKind(true);

        // Api requests take precedence over data requests and are always started right away,
        // so the number of remaining slots can become negative. A negative end index would make
        // Array.prototype.slice count from the end of the array, so the value is clamped to 0.
        const freeSlots = parallelLimit - noOfInProgress - apiEntries.length;
        const dataEntriesToStart = dataEntries.slice(0, Math.max(freeSlots, 0));

        return [...apiEntries, ...dataEntriesToStart];
      })
    );

    // This subscription is never torn down, so it stays live for as long as this instance does.
    // FetchQueueWorker is used by the singleton backendSrv, which lives as long as the Grafana app,
    // so this is considered ok. A disposable pattern could be added later if the need arises.
    entriesToStart$.subscribe((entry) => {
      // Adds an entry to the responseQueue.
      responseQueue.add(entry.id, entry.options);
    });
  }
}