@@ -1,5 +1,5 @@
import { map, mergeMap, throttleTime } from 'rxjs/operators';
import { identity, Unsubscribable } from 'rxjs';
import { mergeMap, throttleTime } from 'rxjs/operators';
import { identity, Unsubscribable, of } from 'rxjs';
import {
  DataQuery,
  DataQueryErrorType,
@@ -27,19 +27,14 @@ import { ExploreId, QueryOptions } from 'app/types/explore';
import { getTimeZone } from 'app/features/profile/state/selectors';
import { getShiftedTimeRange } from 'app/core/utils/timePicker';
import { notifyApp } from '../../../core/actions';
import { preProcessPanelData, runRequest } from '../../query/state/runRequest';
import {
  decorateWithFrameTypeMetadata,
  decorateWithGraphResult,
  decorateWithLogsResult,
  decorateWithTableResult,
} from '../utils/decorators';
import { runRequest } from '../../query/state/runRequest';
import { decorateData } from '../utils/decorators';
import { createErrorNotification } from '../../../core/copy/appNotification';
import { richHistoryUpdatedAction, stateSave } from './main';
import { AnyAction, createAction, PayloadAction } from '@reduxjs/toolkit';
import { updateTime } from './time';
import { historyUpdatedAction } from './history';
import { createEmptyQueryResponse } from './utils';
import { createEmptyQueryResponse, createCacheKey, getResultsFromCache } from './utils';
//
// Actions and Payloads
@@ -164,6 +159,24 @@ export interface ScanStopPayload {
}
export const scanStopAction = createAction<ScanStopPayload>('explore/scanStop');
/**
 * Adds query results to cache.
 * This is currently used to cache the last 5 query results for log queries run from logs navigation (pagination).
 */
export interface AddResultsToCachePayload {
  exploreId: ExploreId;
  cacheKey: string;
  queryResponse: PanelData;
}
export const addResultsToCacheAction = createAction<AddResultsToCachePayload>('explore/addResultsToCache');
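// addResultsToCacheAction is dispatched by the addResultsToCache thunk below, and only once the
// query has fully completed (queryResponse.state === LoadingState.Done).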
/**
 * Clears cache.
 */
export interface ClearCachePayload {
  exploreId: ExploreId;
}
export const clearCacheAction = createAction<ClearCachePayload>('explore/clearCache');
//
// Action creators
//
@@ -309,100 +322,115 @@ export const runQueries = (exploreId: ExploreId, options?: { replaceUrl?: boolean
      history,
      refreshInterval,
      absoluteRange,
      cache,
    } = exploreItemState;
    let newQuerySub;
    if (!hasNonEmptyQuery(queries)) {
      dispatch(clearQueriesAction({ exploreId }));
      dispatch(stateSave({ replace: options?.replaceUrl })); // Remember to save to state and update location
      return;
    }
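    // Look up previously cached results for the current absolute time range; cache entries are keyed
    // by the value produced by createCacheKey(absoluteRange) (see addResultsToCache below).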
    const cachedValue = getResultsFromCache(cache, absoluteRange);
    if (!datasourceInstance) {
      return;
    }
    // Some datasource's query builders allow per-query interval limits,
    // but we're using the datasource interval limit for now
    const minInterval = datasourceInstance?.interval;
    stopQueryState(querySubscription);
    // If we have results saved in cache, we are going to use those results instead of running queries
    if (cachedValue) {
      newQuerySub = of(cachedValue)
        .pipe(mergeMap((data: PanelData) => decorateData(data, queryResponse, absoluteRange, refreshInterval, queries)))
        .subscribe((data) => {
          if (!data.error) {
            dispatch(stateSave());
          }
          const datasourceId = datasourceInstance?.meta.id;
          dispatch(queryStreamUpdatedAction({ exploreId, response: data }));
        });
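      // Note that the cached branch above still emits through queryStreamUpdatedAction, so cached and
      // freshly fetched results reach the UI through the same path.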
      const queryOptions: QueryOptions = {
        minInterval,
        // maxDataPoints is used in:
        // Loki - used for logs streaming for buffer size, with undefined it falls back to datasource config if it supports that.
        // Elastic - limits the number of datapoints for the counts query and for logs it has hardcoded limit.
        // Influx - used to correctly display logs in graph
        // TODO:unification
        // maxDataPoints: mode === ExploreMode.Logs && datasourceId === 'loki' ? undefined : containerWidth,
        maxDataPoints: containerWidth,
        liveStreaming: live,
      };
      // If we don't have results saved in cache, run new queries
    } else {
      if (!hasNonEmptyQuery(queries)) {
        dispatch(clearQueriesAction({ exploreId }));
        dispatch(stateSave({ replace: options?.replaceUrl })); // Remember to save to state and update location
        return;
      }
      if (!datasourceInstance) {
        return;
      }
      // Some datasource's query builders allow per-query interval limits,
      // but we're using the datasource interval limit for now
      const minInterval = datasourceInstance?.interval;
      stopQueryState(querySubscription);
      const datasourceId = datasourceInstance?.meta.id;
      const queryOptions: QueryOptions = {
        minInterval,
        // maxDataPoints is used in:
        // Loki - used for logs streaming for buffer size, with undefined it falls back to datasource config if it supports that.
        // Elastic - limits the number of datapoints for the counts query and for logs it has hardcoded limit.
        // Influx - used to correctly display logs in graph
        // TODO:unification
        // maxDataPoints: mode === ExploreMode.Logs && datasourceId === 'loki' ? undefined : containerWidth,
        maxDataPoints: containerWidth,
        liveStreaming: live,
      };
      const datasourceName = datasourceInstance.name;
      const timeZone = getTimeZone(getState().user);
      const transaction = buildQueryTransaction(queries, queryOptions, range, scanning, timeZone);
      let firstResponse = true;
      dispatch(changeLoadingStateAction({ exploreId, loadingState: LoadingState.Loading }));
      const newQuerySub = runRequest(datasourceInstance, transaction.request)
        .pipe(
          // Simple throttle for live tailing, in case of > 1000 rows per interval we spend about 200ms on processing and
          // rendering. In case this is optimized this can be tweaked, but also it should be only as fast as user
          // actually can see what is happening.
          live ? throttleTime(500) : identity,
          map((data: PanelData) => preProcessPanelData(data, queryResponse)),
          map(decorateWithFrameTypeMetadata),
          map(decorateWithGraphResult),
          map(decorateWithLogsResult({ absoluteRange, refreshInterval, queries })),
          mergeMap(decorateWithTableResult)
        )
        .subscribe(
          (data) => {
            if (!data.error && firstResponse) {
              // Side-effect: Saving history in localstorage
              const nextHistory = updateHistory(history, datasourceId, queries);
              const nextRichHistory = addToRichHistory(
                richHistory || [],
                datasourceId,
                datasourceName,
                queries,
                false,
                '',
                ''
              );
              dispatch(historyUpdatedAction({ exploreId, history: nextHistory }));
              dispatch(richHistoryUpdatedAction({ richHistory: nextRichHistory }));
              // We save queries to the URL here so that only successfully run queries change the URL.
              dispatch(stateSave({ replace: options?.replaceUrl }));
            }
      const datasourceName = datasourceInstance.name;
      const timeZone = getTimeZone(getState().user);
      const transaction = buildQueryTransaction(queries, queryOptions, range, scanning, timeZone);
      let firstResponse = true;
      dispatch(changeLoadingStateAction({ exploreId, loadingState: LoadingState.Loading }));
      newQuerySub = runRequest(datasourceInstance, transaction.request)
        .pipe(
          // Simple throttle for live tailing, in case of > 1000 rows per interval we spend about 200ms on processing and
          // rendering. In case this is optimized this can be tweaked, but also it should be only as fast as user
          // actually can see what is happening.
          live ? throttleTime(500) : identity,
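          // decorateData post-processes the raw panel data (frame type/graph/logs/table decoration)
          // before it reaches the subscriber below.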
          mergeMap((data: PanelData) => decorateData(data, queryResponse, absoluteRange, refreshInterval, queries))
        )
        .subscribe(
          (data) => {
            if (!data.error && firstResponse) {
              // Side-effect: Saving history in localstorage
              const nextHistory = updateHistory(history, datasourceId, queries);
              const nextRichHistory = addToRichHistory(
                richHistory || [],
                datasourceId,
                datasourceName,
                queries,
                false,
                '',
                ''
              );
              dispatch(historyUpdatedAction({ exploreId, history: nextHistory }));
              dispatch(richHistoryUpdatedAction({ richHistory: nextRichHistory }));
              // We save queries to the URL here so that only successfully run queries change the URL.
              dispatch(stateSave({ replace: options?.replaceUrl }));
            }
            firstResponse = false;
            firstResponse = false;
            dispatch(queryStreamUpdatedAction({ exploreId, response: data }));
            dispatch(queryStreamUpdatedAction({ exploreId, response: data }));
            // Keep scanning for results if this was the last scanning transaction
            if (getState().explore[exploreId]!.scanning) {
              if (data.state === LoadingState.Done && data.series.length === 0) {
                const range = getShiftedTimeRange(-1, getState().explore[exploreId]!.range);
                dispatch(updateTime({ exploreId, absoluteRange: range }));
                dispatch(runQueries(exploreId));
              } else {
                // We can stop scanning if we have a result
                dispatch(scanStopAction({ exploreId }));
            // Keep scanning for results if this was the last scanning transaction
            if (getState().explore[exploreId]!.scanning) {
              if (data.state === LoadingState.Done && data.series.length === 0) {
                const range = getShiftedTimeRange(-1, getState().explore[exploreId]!.range);
                dispatch(updateTime({ exploreId, absoluteRange: range }));
                dispatch(runQueries(exploreId));
              } else {
                // We can stop scanning if we have a result
                dispatch(scanStopAction({ exploreId }));
              }
            }
          },
          (error) => {
            dispatch(notifyApp(createErrorNotification('Query processing error', error)));
            dispatch(changeLoadingStateAction({ exploreId, loadingState: LoadingState.Error }));
            console.error(error);
          }
          },
          (error) => {
            dispatch(notifyApp(createErrorNotification('Query processing error', error)));
            dispatch(changeLoadingStateAction({ exploreId, loadingState: LoadingState.Error }));
            console.error(error);
          }
        );
        );
    }
    dispatch(queryStoreSubscriptionAction({ exploreId, querySubscription: newQuerySub }));
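    // The subscription is stored in Explore state so the next runQueries call can tear it down via
    // stopQueryState(querySubscription) above.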
  };
@@ -439,6 +467,25 @@ export function scanStart(exploreId: ExploreId): ThunkResult<void> {
  };
}
export function addResultsToCache(exploreId: ExploreId): ThunkResult<void> {
  return (dispatch, getState) => {
    const queryResponse = getState().explore[exploreId]!.queryResponse;
    const absoluteRange = getState().explore[exploreId]!.absoluteRange;
    const cacheKey = createCacheKey(absoluteRange);
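    // The cache key is derived from the absolute time range, so cached results are reused only when a
    // matching range is requested again (see getResultsFromCache in runQueries).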
    // Save results to cache only when all results are received and loading is done
    if (queryResponse.state === LoadingState.Done) {
      dispatch(addResultsToCacheAction({ exploreId, cacheKey, queryResponse }));
    }
  };
}
export function clearCache(exploreId: ExploreId): ThunkResult<void> {
  return (dispatch, getState) => {
    dispatch(clearCacheAction({ exploreId }));
  };
}
//
// Reducer
//
@@ -629,6 +676,32 @@ export const queryReducer = (state: ExploreItemState, action: AnyAction): ExploreItemState => {
    };
  }
  if (addResultsToCacheAction.match(action)) {
    const CACHE_LIMIT = 5;
    const { cache } = state;
    const { queryResponse, cacheKey } = action.payload;
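    // Newest results are prepended and the cache is trimmed to CACHE_LIMIT entries; an entry with a
    // duplicate key leaves the existing cache untouched.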
    let newCache = [...cache];
    const isDuplicateKey = newCache.some((c) => c.key === cacheKey);
    if (!isDuplicateKey) {
      const newCacheItem = { key: cacheKey, value: queryResponse };
      newCache = [newCacheItem, ...newCache].slice(0, CACHE_LIMIT);
    }
    return {
      ...state,
      cache: newCache,
    };
  }
  if (clearCacheAction.match(action)) {
    return {
      ...state,
      cache: [],
    };
  }
  return state;
};