|
|
|
@ -1,5 +1,5 @@ |
|
|
|
|
import omitBy from 'lodash/omitBy'; |
|
|
|
|
import { from, merge, MonoTypeOperatorFunction, Observable, Subject, throwError } from 'rxjs'; |
|
|
|
|
import { from, merge, MonoTypeOperatorFunction, Observable, of, Subject, throwError } from 'rxjs'; |
|
|
|
|
import { catchError, filter, map, mergeMap, retryWhen, share, takeUntil, tap, throwIfEmpty } from 'rxjs/operators'; |
|
|
|
|
import { fromFetch } from 'rxjs/fetch'; |
|
|
|
|
import { BackendSrv as BackendService, BackendSrvRequest } from '@grafana/runtime'; |
|
|
|
@ -49,6 +49,11 @@ interface ErrorResponse<T extends ErrorResponseProps = any> { |
|
|
|
|
cancelled?: boolean; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
enum CancellationType { |
|
|
|
|
request, |
|
|
|
|
dataSourceRequest, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
function serializeParams(data: Record<string, any>): string { |
|
|
|
|
return Object.keys(data) |
|
|
|
|
.map(key => { |
|
|
|
@ -90,8 +95,8 @@ export class BackendSrv implements BackendService { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
async get(url: string, params?: any) { |
|
|
|
|
return await this.request({ method: 'GET', url, params }); |
|
|
|
|
async get(url: string, params?: any, requestId?: string) { |
|
|
|
|
return await this.request({ method: 'GET', url, params, requestId }); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
async delete(url: string) { |
|
|
|
@ -150,6 +155,13 @@ export class BackendSrv implements BackendService { |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
async request(options: BackendSrvRequest): Promise<any> { |
|
|
|
|
// A requestId is a unique identifier for a particular query.
|
|
|
|
|
// Every observable below has a takeUntil that subscribes to this.inFlightRequests and
|
|
|
|
|
// will cancel/unsubscribe that observable when a new datasourceRequest with the same requestId is made
|
|
|
|
|
if (options.requestId) { |
|
|
|
|
this.inFlightRequests.next(options.requestId); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
options = this.parseRequestOptions(options, this.dependencies.contextSrv.user?.orgId); |
|
|
|
|
|
|
|
|
|
const fromFetchStream = this.getFromFetchStream(options); |
|
|
|
@ -173,7 +185,8 @@ export class BackendSrv implements BackendService { |
|
|
|
|
// this setTimeout hack enables any caller catching this err to set isHandled to true
|
|
|
|
|
setTimeout(() => this.requestErrorHandler(err), 50); |
|
|
|
|
return throwError(err); |
|
|
|
|
}) |
|
|
|
|
}), |
|
|
|
|
this.handleStreamCancellation(options, CancellationType.request) |
|
|
|
|
) |
|
|
|
|
.toPromise(); |
|
|
|
|
} |
|
|
|
@ -233,28 +246,7 @@ export class BackendSrv implements BackendService { |
|
|
|
|
|
|
|
|
|
return throwError(err); |
|
|
|
|
}), |
|
|
|
|
takeUntil( |
|
|
|
|
this.inFlightRequests.pipe( |
|
|
|
|
filter(requestId => { |
|
|
|
|
let cancelRequest = false; |
|
|
|
|
if (options && options.requestId && options.requestId === requestId) { |
|
|
|
|
// when a new requestId is started it will be published to inFlightRequests
|
|
|
|
|
// if a previous long running request that hasn't finished yet has the same requestId
|
|
|
|
|
// we need to cancel that request
|
|
|
|
|
cancelRequest = true; |
|
|
|
|
} |
|
|
|
|
return cancelRequest; |
|
|
|
|
}) |
|
|
|
|
) |
|
|
|
|
), |
|
|
|
|
// when a request is cancelled by takeUntil it will complete without emitting anything
|
|
|
|
|
// throwIfEmpty will then throw an error with cancelled set to true
|
|
|
|
|
throwIfEmpty(() => ({ |
|
|
|
|
cancelled: true, |
|
|
|
|
status: this.HTTP_REQUEST_CANCELED, |
|
|
|
|
statusText: 'Request was aborted', |
|
|
|
|
request: { url: parseUrlFromOptions(options), ...parseInitFromOptions(options) }, |
|
|
|
|
})) |
|
|
|
|
this.handleStreamCancellation(options, CancellationType.dataSourceRequest) |
|
|
|
|
) |
|
|
|
|
.toPromise(); |
|
|
|
|
} |
|
|
|
@ -540,6 +532,48 @@ export class BackendSrv implements BackendService { |
|
|
|
|
) |
|
|
|
|
) |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
private handleStreamCancellation = ( |
|
|
|
|
options: BackendSrvRequest, |
|
|
|
|
resultType: CancellationType |
|
|
|
|
): MonoTypeOperatorFunction<FetchResponse | DataSourceSuccessResponse | SuccessResponse> => inputStream => |
|
|
|
|
inputStream.pipe( |
|
|
|
|
takeUntil( |
|
|
|
|
this.inFlightRequests.pipe( |
|
|
|
|
filter(requestId => { |
|
|
|
|
let cancelRequest = false; |
|
|
|
|
if (options && options.requestId && options.requestId === requestId) { |
|
|
|
|
// when a new requestId is started it will be published to inFlightRequests
|
|
|
|
|
// if a previous long running request that hasn't finished yet has the same requestId
|
|
|
|
|
// we need to cancel that request
|
|
|
|
|
cancelRequest = true; |
|
|
|
|
} |
|
|
|
|
return cancelRequest; |
|
|
|
|
}) |
|
|
|
|
) |
|
|
|
|
), |
|
|
|
|
// when a request is cancelled by takeUntil it will complete without emitting anything so we use throwIfEmpty to identify this case
|
|
|
|
|
// in throwIfEmpty we'll then throw an cancelled error and then we'll return the correct result in the catchError or rethrow
|
|
|
|
|
throwIfEmpty(() => ({ |
|
|
|
|
cancelled: true, |
|
|
|
|
})), |
|
|
|
|
catchError(err => { |
|
|
|
|
if (!err.cancelled) { |
|
|
|
|
throwError(err); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (resultType === CancellationType.dataSourceRequest) { |
|
|
|
|
return of({ |
|
|
|
|
data: [], |
|
|
|
|
status: this.HTTP_REQUEST_CANCELED, |
|
|
|
|
statusText: 'Request was aborted', |
|
|
|
|
request: { url: parseUrlFromOptions(options), ...parseInitFromOptions(options) }, |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return of([]); |
|
|
|
|
}) |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
coreModule.factory('backendSrv', () => backendSrv); |
|
|
|
|