From e237ff20a996c7313632b2e28f38032012f0e340 Mon Sep 17 00:00:00 2001 From: Andrej Ocenas Date: Wed, 17 Nov 2021 21:46:13 +0100 Subject: [PATCH] CloudWatch Logs: Add retry strategy for hitting max concurrent queries (#39290) * Add error passing and retry strategy * Change generic error to specific one * Make the error more generic * Refactor retry strategy * Add retry that handles multiple queries * Rollback some backend changes * Remove simple retry strategy * Add comments * Add tests * Small test fixes * Add log timeout config * Fix tests * Fix tests * Add validation * Remove commented code and add comment * Fix snapshots * Remove unnecessary cast --- .../src/services/backendSrv.ts | 2 +- pkg/tsdb/cloudwatch/log_actions.go | 28 ++- pkg/tsdb/cloudwatch/log_actions_test.go | 3 +- .../cloudwatch/components/ConfigEditor.tsx | 42 ++++ .../__snapshots__/ConfigEditor.test.tsx.snap | 115 +++++++++ .../datasource/cloudwatch/datasource.test.ts | 24 +- .../datasource/cloudwatch/datasource.ts | 104 +++++--- .../plugins/datasource/cloudwatch/types.ts | 13 +- .../cloudwatch/utils/logsRetry.test.ts | 225 ++++++++++++++++++ .../datasource/cloudwatch/utils/logsRetry.ts | 163 +++++++++++++ 10 files changed, 660 insertions(+), 59 deletions(-) create mode 100644 public/app/plugins/datasource/cloudwatch/utils/logsRetry.test.ts create mode 100644 public/app/plugins/datasource/cloudwatch/utils/logsRetry.ts diff --git a/packages/grafana-runtime/src/services/backendSrv.ts b/packages/grafana-runtime/src/services/backendSrv.ts index b673b00903c..4260ef410de 100644 --- a/packages/grafana-runtime/src/services/backendSrv.ts +++ b/packages/grafana-runtime/src/services/backendSrv.ts @@ -114,7 +114,7 @@ export interface FetchErrorDataProps { * * @public */ -export interface FetchError { +export interface FetchError { status: number; statusText?: string; data: T; diff --git a/pkg/tsdb/cloudwatch/log_actions.go b/pkg/tsdb/cloudwatch/log_actions.go index 8d9c1f93285..a9099d6cb87 100644 --- a/pkg/tsdb/cloudwatch/log_actions.go +++ b/pkg/tsdb/cloudwatch/log_actions.go @@ -14,9 +14,22 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana/pkg/components/simplejson" + "github.com/grafana/grafana/pkg/util/errutil" "golang.org/x/sync/errgroup" ) +var LimitExceededException = "LimitExceededException" + +type AWSError struct { + Code string + Message string + Payload map[string]string +} + +func (e *AWSError) Error() string { + return fmt.Sprintf("%s: %s", e.Code, e.Message) +} + func (e *cloudWatchExecutor) executeLogActions(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { resp := backend.NewQueryDataResponse() @@ -33,6 +46,13 @@ func (e *cloudWatchExecutor) executeLogActions(ctx context.Context, req *backend eg.Go(func() error { dataframe, err := e.executeLogAction(ectx, model, query, req.PluginContext) if err != nil { + var AWSError *AWSError + if errors.As(err, &AWSError) { + resultChan <- backend.Responses{ + query.RefID: backend.DataResponse{Frames: data.Frames{}, Error: AWSError}, + } + return nil + } return err } @@ -56,6 +76,7 @@ func (e *cloudWatchExecutor) executeLogActions(ctx context.Context, req *backend for refID, response := range result { respD := resp.Responses[refID] respD.Frames = response.Frames + respD.Error = response.Error resp.Responses[refID] = respD } } @@ -96,7 +117,7 @@ func (e *cloudWatchExecutor) executeLogAction(ctx context.Context, model *simple data, err = e.handleGetLogEvents(ctx, logsClient, model) } if err != nil { - return nil, err + return nil, errutil.Wrapf(err, "failed to execute log action with subtype: %s", subType) } return data, nil @@ -224,6 +245,11 @@ func (e *cloudWatchExecutor) handleStartQuery(ctx context.Context, logsClient cl model *simplejson.Json, timeRange backend.TimeRange, refID string) (*data.Frame, error) { startQueryResponse, err := e.executeStartQuery(ctx, logsClient, model, timeRange) if err != nil { + var awsErr awserr.Error + if errors.As(err, &awsErr) && awsErr.Code() == "LimitExceededException" { + plog.Debug("executeStartQuery limit exceeded", "err", awsErr) + return nil, &AWSError{Code: LimitExceededException, Message: err.Error()} + } return nil, err } diff --git a/pkg/tsdb/cloudwatch/log_actions_test.go b/pkg/tsdb/cloudwatch/log_actions_test.go index f52a7338d58..5691d8f3e00 100644 --- a/pkg/tsdb/cloudwatch/log_actions_test.go +++ b/pkg/tsdb/cloudwatch/log_actions_test.go @@ -3,7 +3,6 @@ package cloudwatch import ( "context" "encoding/json" - "fmt" "testing" "time" @@ -285,7 +284,7 @@ func TestQuery_StartQuery(t *testing.T) { }) require.Error(t, err) - assert.Equal(t, fmt.Errorf("invalid time range: start time must be before end time"), err) + assert.Contains(t, err.Error(), "invalid time range: start time must be before end time") }) t.Run("valid time range", func(t *testing.T) { diff --git a/public/app/plugins/datasource/cloudwatch/components/ConfigEditor.tsx b/public/app/plugins/datasource/cloudwatch/components/ConfigEditor.tsx index 314f6552bd5..3179bdc7d9c 100644 --- a/public/app/plugins/datasource/cloudwatch/components/ConfigEditor.tsx +++ b/public/app/plugins/datasource/cloudwatch/components/ConfigEditor.tsx @@ -1,6 +1,8 @@ import React, { FC, useEffect, useState } from 'react'; +import { useDebounce } from 'react-use'; import { Input, InlineField } from '@grafana/ui'; import { + rangeUtil, DataSourcePluginOptionsEditorProps, onUpdateDatasourceJsonDataOption, updateDatasourcePluginJsonDataOption, @@ -23,6 +25,7 @@ export const ConfigEditor: FC = (props: Props) => { const datasource = useDatasource(options.name); useAuthenticationWarning(options.jsonData); + const logsTimeoutError = useTimoutValidation(props.options.jsonData.logsTimeout); return ( <> @@ -43,6 +46,24 @@ export const ConfigEditor: FC = (props: Props) => { +

CloudWatch Logs

+
+ + + +
+ updateDatasourcePluginJsonDataOption(props, 'tracingDatasourceUid', uid)} datasourceUid={options.jsonData.tracingDatasourceUid} @@ -84,3 +105,24 @@ function useDatasource(datasourceName: string) { return datasource; } + +function useTimoutValidation(value: string | undefined) { + const [err, setErr] = useState(undefined); + useDebounce( + () => { + if (value) { + try { + rangeUtil.describeInterval(value); + setErr(undefined); + } catch (e) { + setErr(e.toString()); + } + } else { + setErr(undefined); + } + }, + 350, + [value] + ); + return err; +} diff --git a/public/app/plugins/datasource/cloudwatch/components/__snapshots__/ConfigEditor.test.tsx.snap b/public/app/plugins/datasource/cloudwatch/components/__snapshots__/ConfigEditor.test.tsx.snap index 2fe58d873ac..35d0bb1046e 100644 --- a/public/app/plugins/datasource/cloudwatch/components/__snapshots__/ConfigEditor.test.tsx.snap +++ b/public/app/plugins/datasource/cloudwatch/components/__snapshots__/ConfigEditor.test.tsx.snap @@ -62,6 +62,29 @@ exports[`Render should disable access key id field 1`] = ` /> +

+ CloudWatch Logs +

+
+ + + +
@@ -125,6 +148,29 @@ exports[`Render should render component 1`] = ` /> +

+ CloudWatch Logs +

+
+ + + +
@@ -193,6 +239,29 @@ exports[`Render should show access key and secret access key fields 1`] = ` /> +

+ CloudWatch Logs +

+
+ + + +
@@ -261,6 +330,29 @@ exports[`Render should show arn role field 1`] = ` /> +

+ CloudWatch Logs +

+
+ + + +
@@ -329,6 +421,29 @@ exports[`Render should show credentials profile name field 1`] = ` /> +

+ CloudWatch Logs +

+
+ + + +
diff --git a/public/app/plugins/datasource/cloudwatch/datasource.test.ts b/public/app/plugins/datasource/cloudwatch/datasource.test.ts index 4ad016a6140..cda8ca02007 100644 --- a/public/app/plugins/datasource/cloudwatch/datasource.test.ts +++ b/public/app/plugins/datasource/cloudwatch/datasource.test.ts @@ -30,16 +30,20 @@ describe('datasource', () => { it('should interpolate variables in the query', async () => { const { datasource, fetchMock } = setupMockedDataSource(); - datasource.query({ - targets: [ - { - queryMode: 'Logs' as 'Logs', - region: '$region', - expression: 'fields $fields', - logGroupNames: ['/some/$group'], - }, - ], - } as any); + await lastValueFrom( + datasource + .query({ + targets: [ + { + queryMode: 'Logs', + region: '$region', + expression: 'fields $fields', + logGroupNames: ['/some/$group'], + }, + ], + } as any) + .pipe(toArray()) + ); expect(fetchMock.mock.calls[0][0].data.queries[0]).toMatchObject({ queryString: 'fields templatedField', logGroupNames: ['/some/templatedGroup'], diff --git a/public/app/plugins/datasource/cloudwatch/datasource.ts b/public/app/plugins/datasource/cloudwatch/datasource.ts index 50b716e053c..3909b681c19 100644 --- a/public/app/plugins/datasource/cloudwatch/datasource.ts +++ b/public/app/plugins/datasource/cloudwatch/datasource.ts @@ -3,10 +3,11 @@ import angular from 'angular'; import { find, findLast, isEmpty, isString, set } from 'lodash'; import { from, lastValueFrom, merge, Observable, of, throwError, zip } from 'rxjs'; import { catchError, concatMap, finalize, map, mergeMap, repeat, scan, share, takeWhile, tap } from 'rxjs/operators'; -import { DataSourceWithBackend, getBackendSrv, toDataQueryResponse } from '@grafana/runtime'; +import { DataSourceWithBackend, FetchError, getBackendSrv, toDataQueryResponse } from '@grafana/runtime'; import { RowContextOptions } from '@grafana/ui/src/components/Logs/LogRowContextProvider'; import { DataFrame, + DataQueryError, DataQueryErrorType, DataQueryRequest, DataQueryResponse, @@ -54,6 +55,7 @@ import { VariableWithMultiSupport } from 'app/features/variables/types'; import { increasingInterval } from './utils/rxjs/increasingInterval'; import { toTestingStatus } from '@grafana/runtime/src/utils/queryResponse'; import { addDataLinksToLogsResponse } from './utils/datalinks'; +import { runWithRetry } from './utils/logsRetry'; const DS_QUERY_ENDPOINT = '/api/ds/query'; @@ -85,6 +87,7 @@ export class CloudWatchDatasource datasourceName: string; languageProvider: CloudWatchLanguageProvider; tracingDataSourceUid?: string; + logsTimeout: string; type = 'cloudwatch'; standardStatistics = ['Average', 'Maximum', 'Minimum', 'Sum', 'SampleCount']; @@ -109,6 +112,7 @@ export class CloudWatchDatasource this.datasourceName = instanceSettings.name; this.languageProvider = new CloudWatchLanguageProvider(this); this.tracingDataSourceUid = instanceSettings.jsonData.tracingDatasourceUid; + this.logsTimeout = instanceSettings.jsonData.logsTimeout || '15m'; } query(options: DataQueryRequest): Observable { @@ -158,28 +162,42 @@ export class CloudWatchDatasource } const queryParams = logQueries.map((target: CloudWatchLogsQuery) => ({ - queryString: target.expression, + queryString: target.expression || '', refId: target.refId, logGroupNames: target.logGroupNames, region: this.replace(this.getActualRegion(target.region), options.scopedVars, true, 'region'), })); - // This first starts the query which returns queryId which can be used to retrieve results. - return this.makeLogActionRequest('StartQuery', queryParams, { - makeReplacements: true, - scopedVars: options.scopedVars, - skipCache: true, - }).pipe( - mergeMap((dataFrames) => + return runWithRetry( + (targets: StartQueryRequest[]) => { + return this.makeLogActionRequest('StartQuery', targets, { + makeReplacements: true, + scopedVars: options.scopedVars, + skipCache: true, + }); + }, + queryParams, + { + timeout: rangeUtil.intervalToMs(this.logsTimeout), + } + ).pipe( + mergeMap(({ frames, error }: { frames: DataFrame[]; error?: DataQueryError }) => // This queries for the results this.logsQuery( - dataFrames.map((dataFrame) => ({ + frames.map((dataFrame) => ({ queryId: dataFrame.fields[0].values.get(0), region: dataFrame.meta?.custom?.['Region'] ?? 'default', refId: dataFrame.refId!, statsGroups: (logQueries.find((target) => target.refId === dataFrame.refId)! as CloudWatchLogsQuery) .statsGroups, })) + ).pipe( + map((response: DataQueryResponse) => { + if (!response.error && error) { + response.error = error; + } + return response; + }) ) ), mergeMap((dataQueryResponse) => { @@ -544,37 +562,47 @@ export class CloudWatchDatasource const requestParams = { from: range.from.valueOf().toString(), to: range.to.valueOf().toString(), - queries: queryParams.map((param: any) => ({ - refId: 'A', - intervalMs: 1, // dummy - maxDataPoints: 1, // dummy - datasource: this.getRef(), - type: 'logAction', - subtype: subtype, - ...param, - })), + queries: queryParams.map( + (param: GetLogEventsRequest | StartQueryRequest | DescribeLogGroupsRequest | GetLogGroupFieldsRequest) => ({ + refId: (param as StartQueryRequest).refId || 'A', + intervalMs: 1, // dummy + maxDataPoints: 1, // dummy + datasource: this.getRef(), + type: 'logAction', + subtype: subtype, + ...param, + }) + ), }; if (options.makeReplacements) { - requestParams.queries.forEach((query) => { - const fieldsToReplace: Array< - keyof (GetLogEventsRequest & StartQueryRequest & DescribeLogGroupsRequest & GetLogGroupFieldsRequest) - > = ['queryString', 'logGroupNames', 'logGroupName', 'logGroupNamePrefix']; - - for (const fieldName of fieldsToReplace) { - if (query.hasOwnProperty(fieldName)) { - if (Array.isArray(query[fieldName])) { - query[fieldName] = query[fieldName].map((val: string) => - this.replace(val, options.scopedVars, true, fieldName) - ); - } else { - query[fieldName] = this.replace(query[fieldName], options.scopedVars, true, fieldName); + requestParams.queries.forEach( + (query: GetLogEventsRequest | StartQueryRequest | DescribeLogGroupsRequest | GetLogGroupFieldsRequest) => { + const fieldsToReplace: Array< + keyof (GetLogEventsRequest & StartQueryRequest & DescribeLogGroupsRequest & GetLogGroupFieldsRequest) + > = ['queryString', 'logGroupNames', 'logGroupName', 'logGroupNamePrefix']; + + const anyQuery: any = query; + for (const fieldName of fieldsToReplace) { + if (query.hasOwnProperty(fieldName)) { + if (Array.isArray(anyQuery[fieldName])) { + anyQuery[fieldName] = anyQuery[fieldName].map((val: string) => + this.replace(val, options.scopedVars, true, fieldName) + ); + } else { + anyQuery[fieldName] = this.replace(anyQuery[fieldName], options.scopedVars, true, fieldName); + } } } + // TODO: seems to be some sort of bug that we don't really send region with all queries. This means + // if you select different than default region in editor you will get results for autocomplete from wrong + // region. + if (anyQuery.region) { + anyQuery.region = this.replace(anyQuery.region, options.scopedVars, true, 'region'); + anyQuery.region = this.getActualRegion(anyQuery.region); + } } - query.region = this.replace(query.region, options.scopedVars, true, 'region'); - query.region = this.getActualRegion(query.region); - }); + ); } const resultsToDataFrames = (val: any): DataFrame[] => toDataQueryResponse(val).data || []; @@ -587,7 +615,11 @@ export class CloudWatchDatasource return this.awsRequest(DS_QUERY_ENDPOINT, requestParams, headers).pipe( map((response) => resultsToDataFrames({ data: response })), - catchError((err) => { + catchError((err: FetchError) => { + if (err.status === 400) { + throw err; + } + if (err.data?.error) { throw err.data.error; } else if (err.data?.message) { diff --git a/public/app/plugins/datasource/cloudwatch/types.ts b/public/app/plugins/datasource/cloudwatch/types.ts index 4b160b94a97..32e9bd0dc69 100644 --- a/public/app/plugins/datasource/cloudwatch/types.ts +++ b/public/app/plugins/datasource/cloudwatch/types.ts @@ -76,7 +76,8 @@ export interface CloudWatchJsonData extends AwsAuthDataSourceJsonData { database?: string; customMetricsNamespaces?: string; endpoint?: string; - + // Time string like 15s, 10m etc, see rangeUtils.intervalToMs. + logsTimeout?: string; // Used to create links if logs contain traceId. tracingDatasourceUid?: string; } @@ -284,14 +285,6 @@ export interface StartQueryRequest { * The list of log groups to be queried. You can include up to 20 log groups. A StartQuery operation must include a logGroupNames or a logGroupName parameter, but not both. */ logGroupNames?: string[]; - /** - * The beginning of the time range to query. The range is inclusive, so the specified start time is included in the query. Specified as epoch time, the number of seconds since January 1, 1970, 00:00:00 UTC. - */ - startTime: number; - /** - * The end of the time range to query. The range is inclusive, so the specified end time is included in the query. Specified as epoch time, the number of seconds since January 1, 1970, 00:00:00 UTC. - */ - endTime: number; /** * The query string to use. For more information, see CloudWatch Logs Insights Query Syntax. */ @@ -300,6 +293,8 @@ export interface StartQueryRequest { * The maximum number of log events to return in the query. If the query string uses the fields command, only the specified fields and their values are returned. The default is 1000. */ limit?: number; + refId: string; + region: string; } export interface StartQueryResponse { /** diff --git a/public/app/plugins/datasource/cloudwatch/utils/logsRetry.test.ts b/public/app/plugins/datasource/cloudwatch/utils/logsRetry.test.ts new file mode 100644 index 00000000000..42214064857 --- /dev/null +++ b/public/app/plugins/datasource/cloudwatch/utils/logsRetry.test.ts @@ -0,0 +1,225 @@ +import { runWithRetry } from './logsRetry'; +import { toArray } from 'rxjs/operators'; +import { lastValueFrom, of, throwError } from 'rxjs'; +import { dataFrameToJSON, MutableDataFrame } from '@grafana/data'; +import { DataResponse, FetchError } from '@grafana/runtime'; +import { StartQueryRequest } from '../types'; + +describe('runWithRetry', () => { + it('returns results if no retry is needed', async () => { + const queryFunc = jest.fn(); + queryFunc.mockReturnValueOnce(of([createResponseFrame('A')])); + const targets = [targetA]; + const values = await lastValueFrom(runWithRetry(queryFunc, targets).pipe(toArray())); + expect(queryFunc).toBeCalledTimes(1); + expect(queryFunc).toBeCalledWith(targets); + expect(values).toEqual([{ frames: [createResponseFrame('A')] }]); + }); + + it('retries if error', async () => { + jest.useFakeTimers(); + const targets = [targetA]; + const queryFunc = jest.fn(); + queryFunc.mockReturnValueOnce(throwError(() => createErrorResponse(targets))); + queryFunc.mockReturnValueOnce(of([createResponseFrame('A')])); + + const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets).pipe(toArray())); + jest.runAllTimers(); + const values = await valuesPromise; + + expect(queryFunc).toBeCalledTimes(2); + expect(queryFunc).nthCalledWith(1, targets); + expect(queryFunc).nthCalledWith(2, targets); + expect(values).toEqual([{ frames: [createResponseFrame('A')] }]); + }); + + it('fails if reaching timoeut and no data was retrieved', async () => { + jest.useFakeTimers(); + const targets = [targetA]; + const queryFunc = jest.fn(); + queryFunc.mockReturnValueOnce(throwError(() => createErrorResponse(targets))); + queryFunc.mockReturnValueOnce(of([createResponseFrame('A')])); + + const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, { timeout: 0 }).pipe(toArray())); + jest.runAllTimers(); + let error; + try { + await valuesPromise; + } catch (e) { + error = e; + } + + expect(queryFunc).toBeCalledTimes(1); + expect(queryFunc).nthCalledWith(1, targets); + expect(error).toEqual({ message: 'LimitExceededException', refId: 'A' }); + }); + + it('fails if we get unexpected error', async () => { + jest.useFakeTimers(); + const targets = [targetA]; + const queryFunc = jest.fn(); + queryFunc.mockReturnValueOnce(throwError(() => 'random error')); + + const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets).pipe(toArray())); + jest.runAllTimers(); + let error; + try { + await valuesPromise; + } catch (e) { + error = e; + } + + expect(queryFunc).toBeCalledTimes(1); + expect(queryFunc).nthCalledWith(1, targets); + expect(error).toEqual('random error'); + }); + + it('works with multiple queries if there is no error', async () => { + const targets = [targetA, targetB]; + const queryFunc = jest.fn(); + queryFunc.mockReturnValueOnce(of([createResponseFrame('A'), createResponseFrame('B')])); + + const values = await lastValueFrom(runWithRetry(queryFunc, targets).pipe(toArray())); + + expect(queryFunc).toBeCalledTimes(1); + expect(queryFunc).nthCalledWith(1, targets); + expect(values).toEqual([{ frames: [createResponseFrame('A'), createResponseFrame('B')] }]); + }); + + it('works with multiple queries only one errors out', async () => { + jest.useFakeTimers(); + const targets = [targetA, targetB]; + const queryFunc = jest.fn(); + queryFunc.mockReturnValueOnce( + throwError(() => + createErrorResponse(targets, { + A: { frames: [dataFrameToJSON(createResponseFrame('A'))] }, + B: { error: 'LimitExceededException' }, + }) + ) + ); + + queryFunc.mockReturnValueOnce(of([createResponseFrame('B')])); + + const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets).pipe(toArray())); + jest.runAllTimers(); + const values = await valuesPromise; + + expect(queryFunc).toBeCalledTimes(2); + expect(queryFunc).nthCalledWith(1, targets); + expect(queryFunc).nthCalledWith(2, [targetB]); + // Bit more involved because dataFrameToJSON and dataFrameFromJSON are not symmetrical and add some attributes to the + // dataframe fields + expect(values.length).toBe(1); + expect(values[0].frames.length).toBe(2); + expect(values[0].frames[0].fields[0].values.get(0)).toBe('A'); + expect(values[0].frames[1].fields[0].values.get(0)).toBe('B'); + }); + + it('sends data and also error if only one query gets limit error', async () => { + jest.useFakeTimers(); + const targets = [targetA, targetB]; + const queryFunc = jest.fn(); + queryFunc.mockReturnValueOnce( + throwError(() => + createErrorResponse(targets, { + A: { frames: [dataFrameToJSON(createResponseFrame('A'))] }, + B: { error: 'LimitExceededException' }, + }) + ) + ); + + const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, { timeout: 0 }).pipe(toArray())); + jest.runAllTimers(); + const values = await valuesPromise; + + expect(queryFunc).toBeCalledTimes(1); + expect(queryFunc).nthCalledWith(1, targets); + expect(values.length).toBe(1); + expect(values[0].frames.length).toBe(1); + expect(values[0].frames[0].fields[0].values.get(0)).toBe('A'); + expect(values[0].error).toEqual({ message: 'Some queries timed out: LimitExceededException' }); + }); + + it('sends all collected successful responses on timeout', async () => { + jest.useFakeTimers(); + const targets = [targetA, targetB, targetC]; + const queryFunc = jest.fn(); + queryFunc.mockReturnValueOnce( + throwError(() => + createErrorResponse(targets, { + A: { frames: [dataFrameToJSON(createResponseFrame('A'))] }, + B: { error: 'LimitExceededException' }, + C: { error: 'LimitExceededException' }, + }) + ) + ); + + queryFunc.mockReturnValueOnce( + throwError(() => + createErrorResponse(targets, { + B: { frames: [dataFrameToJSON(createResponseFrame('B'))] }, + C: { error: 'LimitExceededException' }, + }) + ) + ); + + queryFunc.mockReturnValueOnce( + throwError(() => + createErrorResponse(targets, { + C: { error: 'LimitExceededException' }, + }) + ) + ); + + const valuesPromise = lastValueFrom( + runWithRetry(queryFunc, targets, { timeoutFunc: (retry) => retry >= 2 }).pipe(toArray()) + ); + jest.runAllTimers(); + const values = await valuesPromise; + + expect(queryFunc).toBeCalledTimes(3); + expect(queryFunc).nthCalledWith(1, targets); + expect(queryFunc).nthCalledWith(2, [targetB, targetC]); + expect(queryFunc).nthCalledWith(3, [targetC]); + expect(values.length).toBe(1); + expect(values[0].frames.length).toBe(2); + expect(values[0].frames[0].fields[0].values.get(0)).toBe('A'); + expect(values[0].frames[1].fields[0].values.get(0)).toBe('B'); + expect(values[0].error).toEqual({ message: 'Some queries timed out: LimitExceededException' }); + }); +}); + +const targetA = makeTarget('A'); +const targetB = makeTarget('B'); +const targetC = makeTarget('C'); + +function makeTarget(refId: string) { + return { queryString: 'query ' + refId, refId, region: 'test' }; +} + +function createResponseFrame(ref: string) { + return new MutableDataFrame({ + fields: [{ name: 'queryId', values: [ref] }], + refId: ref, + }); +} + +function createErrorResponse(targets: StartQueryRequest[], results?: Record): FetchError { + return { + status: 400, + data: { + results: results || { + A: { + error: 'LimitExceededException', + }, + }, + }, + config: { + url: '', + data: { + queries: targets, + }, + }, + }; +} diff --git a/public/app/plugins/datasource/cloudwatch/utils/logsRetry.ts b/public/app/plugins/datasource/cloudwatch/utils/logsRetry.ts new file mode 100644 index 00000000000..0bb9405a214 --- /dev/null +++ b/public/app/plugins/datasource/cloudwatch/utils/logsRetry.ts @@ -0,0 +1,163 @@ +import { Observable, Subscription } from 'rxjs'; +import { FetchError, toDataQueryResponse } from '@grafana/runtime'; +import { StartQueryRequest } from '../types'; +import { DataFrame, DataFrameJSON, DataQueryError } from '@grafana/data'; + +type Result = { frames: DataFrameJSON[]; error?: string }; + +const defaultTimeout = 30_000; + +/** + * A retry strategy specifically for cloud watch logs query. Cloud watch logs queries need first starting the query + * and the polling for the results. The start query can fail because of the concurrent queries rate limit, + * and so we hove to retry the start query call if there is already lot of queries running. + * + * As we send multiple queries in single request some can fail and some can succeed and we have to also handle those + * cases by only retrying the failed queries. We retry the failed queries until we hit the time limit or all queries + * succeed and only then we pass the data forward. This means we wait longer but makes the code a bit simpler as we + * can treat starting the query and polling as steps in a pipeline. + * @param queryFun + * @param targets + * @param options + */ +export function runWithRetry( + queryFun: (targets: StartQueryRequest[]) => Observable, + targets: StartQueryRequest[], + options: { + timeout?: number; + timeoutFunc?: (retry: number, startTime: number) => boolean; + retryWaitFunc?: (retry: number) => number; + } = {} +): Observable<{ frames: DataFrame[]; error?: DataQueryError }> { + const startTime = new Date(); + let retries = 0; + let timerID: any; + let subscription: Subscription; + let collected = {}; + + const timeoutFunction = options.timeoutFunc + ? options.timeoutFunc + : (retry: number, startTime: number) => { + return Date.now() >= startTime + (options.timeout === undefined ? defaultTimeout : options.timeout); + }; + + const retryWaitFunction = options.retryWaitFunc + ? options.retryWaitFunc + : (retry: number) => { + return Math.pow(2, retry) * 1000 + Math.random() * 100; + }; + + return new Observable((observer) => { + // Run function is where the logic takes place. We have it in a function so we can call it recursively. + function run(currentQueryParams: StartQueryRequest[]) { + subscription = queryFun(currentQueryParams).subscribe({ + next(frames) { + // In case we successfully finished, merge the current response with whatever we already collected. + const collectedPreviously = toDataQueryResponse({ data: { results: collected } }).data || []; + observer.next({ frames: [...collectedPreviously, ...frames] }); + observer.complete(); + }, + error(error: FetchError<{ results?: Record }> | string) { + // In case of error we first try to figure out what kind of error it is + + // This means it was a generic 500 error probably so we just pass it on + if (typeof error === 'string') { + observer.error(error); + return; + } + + // In case of multiple queries this some can error while some may be ok + const errorData = splitErrorData(error); + + if (!errorData) { + // Not sure what happened but the error structure wasn't what we expected + observer.error(error); + return; + } + + if (!errorData!.errors.length) { + // So there is no limit error but some other errors so nothing to retry so we just pass it as it would be + // otherwise. + observer.error(error); + return; + } + + if (timeoutFunction(retries, startTime.valueOf())) { + // We timed out but we could have started some queries + if (Object.keys(collected).length || Object.keys(errorData.good).length) { + const dataResponse = toDataQueryResponse({ + data: { + results: { + ...(errorData.good ?? {}), + ...(collected ?? {}), + }, + }, + }); + dataResponse.error = { + ...(dataResponse.error ?? {}), + message: `Some queries timed out: ${errorData.errorMessage}`, + }; + // So we consider this a partial success and pass the data forward but also with error to be shown to + // the user. + observer.next({ + error: dataResponse.error, + frames: dataResponse.data, + }); + observer.complete(); + } else { + // So we timed out and there was no data to pass forward so we just pass the error + const dataResponse = toDataQueryResponse({ data: { results: error.data?.results ?? {} } }); + observer.error(dataResponse.error); + } + return; + } + + collected = { + ...collected, + ...errorData!.good, + }; + + timerID = setTimeout( + () => { + retries++; + console.log(`Attempt ${retries}`); + run(errorData!.errors); + }, + // We want to know how long to wait for the next retry. First time this will be 0. + retryWaitFunction(retries + 1) + ); + }, + }); + } + run(targets); + return () => { + // We clear only the latest timer and subscription but the observable should complete after one response so + // there should not be more things running at the same time. + clearTimeout(timerID); + subscription.unsubscribe(); + }; + }); +} + +function splitErrorData(error: FetchError<{ results?: Record }>) { + const results = error.data?.results; + if (!results) { + return undefined; + } + return Object.keys(results).reduce<{ + errors: StartQueryRequest[]; + good: Record; + errorMessage: string; + }>( + (acc, refId) => { + if (results[refId].error?.startsWith('LimitExceededException')) { + acc.errorMessage = results[refId].error!; + acc.errors.push(error.config.data.queries.find((q: any) => q.refId === refId)); + } else { + acc.good[refId] = results[refId]; + } + return acc; + }, + { errors: [], good: {}, errorMessage: '' } + ); +}