package clientmiddleware import ( "context" "strconv" "time" "github.com/grafana/grafana-aws-sdk/pkg/awsds" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/prometheus/client_golang/prometheus" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/services/caching" "github.com/grafana/grafana/pkg/services/contexthandler" "github.com/grafana/grafana/pkg/services/featuremgmt" ) // needed to mock the function for testing var shouldCacheQuery = awsds.ShouldCacheQuery // NewCachingMiddleware creates a new plugins.ClientMiddleware that will // attempt to read and write query results to the cache func NewCachingMiddleware(cachingService caching.CachingService) plugins.ClientMiddleware { return NewCachingMiddlewareWithFeatureManager(cachingService, nil) } // NewCachingMiddlewareWithFeatureManager creates a new plugins.ClientMiddleware that will // attempt to read and write query results to the cache with a feature manager func NewCachingMiddlewareWithFeatureManager(cachingService caching.CachingService, features *featuremgmt.FeatureManager) plugins.ClientMiddleware { log := log.New("caching_middleware") if err := prometheus.Register(QueryCachingRequestHistogram); err != nil { log.Error("Error registering prometheus collector 'QueryRequestHistogram'", "error", err) } if err := prometheus.Register(ResourceCachingRequestHistogram); err != nil { log.Error("Error registering prometheus collector 'ResourceRequestHistogram'", "error", err) } return plugins.ClientMiddlewareFunc(func(next plugins.Client) plugins.Client { return &CachingMiddleware{ next: next, caching: cachingService, log: log, features: features, } }) } type CachingMiddleware struct { next plugins.Client caching caching.CachingService log log.Logger features *featuremgmt.FeatureManager } // QueryData receives a data request and attempts to access results already stored in the cache for that request. // If data is found, it will return it immediately. Otherwise, it will perform the queries as usual, then write the response to the cache. // If the cache service is implemented, we capture the request duration as a metric. The service is expected to write any response headers. func (m *CachingMiddleware) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { if req == nil { return m.next.QueryData(ctx, req) } reqCtx := contexthandler.FromContext(ctx) if reqCtx == nil { return m.next.QueryData(ctx, req) } // time how long this request takes start := time.Now() // First look in the query cache if enabled hit, cr := m.caching.HandleQueryRequest(ctx, req) // record request duration if caching was used ch := reqCtx.Resp.Header().Get(caching.XCacheHeader) if ch != "" { defer func() { QueryCachingRequestHistogram.With(prometheus.Labels{ "datasource_type": req.PluginContext.DataSourceInstanceSettings.Type, "cache": ch, "query_type": getQueryType(reqCtx), }).Observe(time.Since(start).Seconds()) }() } // Cache hit; return the response if hit { return cr.Response, nil } // Cache miss; do the actual queries resp, err := m.next.QueryData(ctx, req) // Update the query cache with the result for this metrics request if err == nil && cr.UpdateCacheFn != nil { // If AWS async caching is not enabled, use the old code path if m.features == nil || !m.features.IsEnabled(ctx, featuremgmt.FlagAwsAsyncQueryCaching) { cr.UpdateCacheFn(ctx, resp) } else { // time how long shouldCacheQuery takes startShouldCacheQuery := time.Now() shouldCache := shouldCacheQuery(resp) ShouldCacheQueryHistogram.With(prometheus.Labels{ "datasource_type": req.PluginContext.DataSourceInstanceSettings.Type, "cache": ch, "shouldCache": strconv.FormatBool(shouldCache), "query_type": getQueryType(reqCtx), }).Observe(time.Since(startShouldCacheQuery).Seconds()) // If AWS async caching is enabled and resp is for a running async query, don't cache it if shouldCache { cr.UpdateCacheFn(ctx, resp) } } } return resp, err } // CallResource receives a resource request and attempts to access results already stored in the cache for that request. // If data is found, it will return it immediately. Otherwise, it will perform the request as usual. The caller of CallResource is expected to explicitly update the cache with any responses. // If the cache service is implemented, we capture the request duration as a metric. The service is expected to write any response headers. func (m *CachingMiddleware) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { if req == nil { return m.next.CallResource(ctx, req, sender) } reqCtx := contexthandler.FromContext(ctx) if reqCtx == nil { return m.next.CallResource(ctx, req, sender) } // time how long this request takes start := time.Now() // First look in the resource cache if enabled hit, cr := m.caching.HandleResourceRequest(ctx, req) // record request duration if caching was used if ch := reqCtx.Resp.Header().Get(caching.XCacheHeader); ch != "" { defer func() { ResourceCachingRequestHistogram.With(prometheus.Labels{ "plugin_id": req.PluginContext.PluginID, "cache": ch, }).Observe(time.Since(start).Seconds()) }() } // Cache hit; send the response and return if hit { return sender.Send(cr.Response) } // Cache miss; do the actual request // If there is no update cache func, just pass in the original sender if cr.UpdateCacheFn == nil { return m.next.CallResource(ctx, req, sender) } // Otherwise, intercept the responses in a wrapped sender so we can cache them first cacheSender := callResourceResponseSenderFunc(func(res *backend.CallResourceResponse) error { cr.UpdateCacheFn(ctx, res) return sender.Send(res) }) return m.next.CallResource(ctx, req, cacheSender) } func (m *CachingMiddleware) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { return m.next.CheckHealth(ctx, req) } func (m *CachingMiddleware) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) { return m.next.CollectMetrics(ctx, req) } func (m *CachingMiddleware) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { return m.next.SubscribeStream(ctx, req) } func (m *CachingMiddleware) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { return m.next.PublishStream(ctx, req) } func (m *CachingMiddleware) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error { return m.next.RunStream(ctx, req, sender) }