Caching: Consolidate resource cache checking and updating in plugin middleware (#67002)

* Update the HandleResourceRequest function to mimic the HandleQueryRequest function

* Remove CacheResourceResponse function from interface

* revert additional thing I missed
pull/66869/head
Michael Mandrus 2 years ago committed by GitHub
parent d02aee2479
commit a29cfe5d46
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      pkg/api/http_server.go
  2. 13
      pkg/api/plugin_resource.go
  3. 1
      pkg/api/plugin_resource_test.go
  4. 22
      pkg/api/plugins_test.go
  5. 17
      pkg/plugins/manager/client/clienttest/clienttest.go
  6. 8
      pkg/services/caching/fake_caching_service.go
  7. 22
      pkg/services/caching/service.go
  8. 24
      pkg/services/pluginsintegration/clientmiddleware/caching_middleware.go
  9. 44
      pkg/services/pluginsintegration/clientmiddleware/caching_middleware_test.go

@ -41,7 +41,6 @@ import (
"github.com/grafana/grafana/pkg/services/apikey" "github.com/grafana/grafana/pkg/services/apikey"
"github.com/grafana/grafana/pkg/services/auth" "github.com/grafana/grafana/pkg/services/auth"
"github.com/grafana/grafana/pkg/services/authn" "github.com/grafana/grafana/pkg/services/authn"
"github.com/grafana/grafana/pkg/services/caching"
"github.com/grafana/grafana/pkg/services/cleanup" "github.com/grafana/grafana/pkg/services/cleanup"
"github.com/grafana/grafana/pkg/services/contexthandler" "github.com/grafana/grafana/pkg/services/contexthandler"
"github.com/grafana/grafana/pkg/services/correlations" "github.com/grafana/grafana/pkg/services/correlations"
@ -208,7 +207,6 @@ type HTTPServer struct {
statsService stats.Service statsService stats.Service
authnService authn.Service authnService authn.Service
starApi *starApi.API starApi *starApi.API
cachingService caching.CachingService
} }
type ServerOptions struct { type ServerOptions struct {
@ -250,7 +248,7 @@ func ProvideHTTPServer(opts ServerOptions, cfg *setting.Cfg, routeRegister routi
accesscontrolService accesscontrol.Service, navTreeService navtree.Service, accesscontrolService accesscontrol.Service, navTreeService navtree.Service,
annotationRepo annotations.Repository, tagService tag.Service, searchv2HTTPService searchV2.SearchHTTPService, oauthTokenService oauthtoken.OAuthTokenService, annotationRepo annotations.Repository, tagService tag.Service, searchv2HTTPService searchV2.SearchHTTPService, oauthTokenService oauthtoken.OAuthTokenService,
statsService stats.Service, authnService authn.Service, pluginsCDNService *pluginscdn.Service, statsService stats.Service, authnService authn.Service, pluginsCDNService *pluginscdn.Service,
starApi *starApi.API, cachingService caching.CachingService, starApi *starApi.API,
) (*HTTPServer, error) { ) (*HTTPServer, error) {
web.Env = cfg.Env web.Env = cfg.Env
@ -355,7 +353,6 @@ func ProvideHTTPServer(opts ServerOptions, cfg *setting.Cfg, routeRegister routi
authnService: authnService, authnService: authnService,
pluginsCDNService: pluginsCDNService, pluginsCDNService: pluginsCDNService,
starApi: starApi, starApi: starApi,
cachingService: cachingService,
} }
if hs.Listener != nil { if hs.Listener != nil {
hs.log.Debug("Using provided listener") hs.log.Debug("Using provided listener")

@ -15,7 +15,6 @@ import (
"github.com/grafana/grafana/pkg/plugins/backendplugin" "github.com/grafana/grafana/pkg/plugins/backendplugin"
contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model" contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model"
"github.com/grafana/grafana/pkg/services/datasources" "github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/util/proxyutil" "github.com/grafana/grafana/pkg/util/proxyutil"
"github.com/grafana/grafana/pkg/web" "github.com/grafana/grafana/pkg/web"
) )
@ -130,7 +129,7 @@ func (hs *HTTPServer) makePluginResourceRequest(w http.ResponseWriter, req *http
var flushStreamErr error var flushStreamErr error
go func() { go func() {
flushStreamErr = hs.flushStream(req.Context(), crReq, stream, w) flushStreamErr = hs.flushStream(stream, w)
wg.Done() wg.Done()
}() }()
@ -141,10 +140,8 @@ func (hs *HTTPServer) makePluginResourceRequest(w http.ResponseWriter, req *http
return flushStreamErr return flushStreamErr
} }
func (hs *HTTPServer) flushStream(ctx context.Context, req *backend.CallResourceRequest, stream callResourceClientResponseStream, w http.ResponseWriter) error { func (hs *HTTPServer) flushStream(stream callResourceClientResponseStream, w http.ResponseWriter) error {
processedStreams := 0 processedStreams := 0
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for { for {
resp, err := stream.Recv() resp, err := stream.Recv()
if errors.Is(err, io.EOF) { if errors.Is(err, io.EOF) {
@ -200,12 +197,6 @@ func (hs *HTTPServer) flushStream(ctx context.Context, req *backend.CallResource
if _, err := w.Write(resp.Body); err != nil { if _, err := w.Write(resp.Body); err != nil {
hs.log.Error("Failed to write resource response", "err", err) hs.log.Error("Failed to write resource response", "err", err)
} else if hs.Features.IsEnabled(featuremgmt.FlagUseCachingService) {
// Placing the new service implementation behind a feature flag until it is known to be stable
// The enterprise implementation of this function will use the headers and status of the first response,
// And append the body of any subsequent responses. It waits for the context to be canceled before caching the cumulative result.
hs.cachingService.CacheResourceResponse(ctx, req, resp)
} }
if flusher, ok := w.(http.Flusher); ok { if flusher, ok := w.(http.Flusher); ok {

@ -78,7 +78,6 @@ func TestCallResource(t *testing.T) {
hs.QuotaService = quotatest.New(false, nil) hs.QuotaService = quotatest.New(false, nil)
hs.pluginStore = ps hs.pluginStore = ps
hs.pluginClient = pluginClient.ProvideService(reg, pCfg) hs.pluginClient = pluginClient.ProvideService(reg, pCfg)
hs.cachingService = &caching.OSSCachingService{}
}) })
t.Run("Test successful response is received for valid request", func(t *testing.T) { t.Run("Test successful response is received for valid request", func(t *testing.T) {

@ -30,9 +30,7 @@ import (
"github.com/grafana/grafana/pkg/plugins/manager/store" "github.com/grafana/grafana/pkg/plugins/manager/store"
"github.com/grafana/grafana/pkg/plugins/pluginscdn" "github.com/grafana/grafana/pkg/plugins/pluginscdn"
ac "github.com/grafana/grafana/pkg/services/accesscontrol" ac "github.com/grafana/grafana/pkg/services/accesscontrol"
"github.com/grafana/grafana/pkg/services/caching"
contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model" contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/org" "github.com/grafana/grafana/pkg/services/org"
"github.com/grafana/grafana/pkg/services/org/orgtest" "github.com/grafana/grafana/pkg/services/org/orgtest"
"github.com/grafana/grafana/pkg/services/pluginsintegration/pluginaccesscontrol" "github.com/grafana/grafana/pkg/services/pluginsintegration/pluginaccesscontrol"
@ -373,11 +371,9 @@ func Test_GetPluginAssets(t *testing.T) {
func TestMakePluginResourceRequest(t *testing.T) { func TestMakePluginResourceRequest(t *testing.T) {
hs := HTTPServer{ hs := HTTPServer{
Cfg: setting.NewCfg(), Cfg: setting.NewCfg(),
log: log.New(), log: log.New(),
pluginClient: &fakePluginClient{}, pluginClient: &fakePluginClient{},
cachingService: &caching.OSSCachingService{},
Features: &featuremgmt.FeatureManager{},
} }
req := httptest.NewRequest(http.MethodGet, "/", nil) req := httptest.NewRequest(http.MethodGet, "/", nil)
@ -403,8 +399,6 @@ func TestMakePluginResourceRequestSetCookieNotPresent(t *testing.T) {
pluginClient: &fakePluginClient{ pluginClient: &fakePluginClient{
headers: map[string][]string{"Set-Cookie": {"monster"}}, headers: map[string][]string{"Set-Cookie": {"monster"}},
}, },
cachingService: &caching.OSSCachingService{},
Features: &featuremgmt.FeatureManager{},
} }
req := httptest.NewRequest(http.MethodGet, "/", nil) req := httptest.NewRequest(http.MethodGet, "/", nil)
resp := httptest.NewRecorder() resp := httptest.NewRecorder()
@ -439,8 +433,6 @@ func TestMakePluginResourceRequestContentTypeUnique(t *testing.T) {
"x-another": {"hello"}, "x-another": {"hello"},
}, },
}, },
cachingService: &caching.OSSCachingService{},
Features: &featuremgmt.FeatureManager{},
} }
req := httptest.NewRequest(http.MethodGet, "/", nil) req := httptest.NewRequest(http.MethodGet, "/", nil)
resp := httptest.NewRecorder() resp := httptest.NewRecorder()
@ -464,11 +456,9 @@ func TestMakePluginResourceRequestContentTypeEmpty(t *testing.T) {
statusCode: http.StatusNoContent, statusCode: http.StatusNoContent,
} }
hs := HTTPServer{ hs := HTTPServer{
Cfg: setting.NewCfg(), Cfg: setting.NewCfg(),
log: log.New(), log: log.New(),
pluginClient: pluginClient, pluginClient: pluginClient,
cachingService: &caching.OSSCachingService{},
Features: &featuremgmt.FeatureManager{},
} }
req := httptest.NewRequest(http.MethodGet, "/", nil) req := httptest.NewRequest(http.MethodGet, "/", nil)
resp := httptest.NewRecorder() resp := httptest.NewRecorder()

@ -179,6 +179,9 @@ type ClientDecoratorTest struct {
SubscribeStreamCtx context.Context SubscribeStreamCtx context.Context
PublishStreamReq *backend.PublishStreamRequest PublishStreamReq *backend.PublishStreamRequest
PublishStreamCtx context.Context PublishStreamCtx context.Context
// When CallResource is called, the sender will be called with these values
callResourceResponses []*backend.CallResourceResponse
} }
type ClientDecoratorTestOption func(*ClientDecoratorTest) type ClientDecoratorTestOption func(*ClientDecoratorTest)
@ -197,6 +200,13 @@ func NewClientDecoratorTest(t *testing.T, opts ...ClientDecoratorTestOption) *Cl
CallResourceFunc: func(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { CallResourceFunc: func(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
cdt.CallResourceReq = req cdt.CallResourceReq = req
cdt.CallResourceCtx = ctx cdt.CallResourceCtx = ctx
if cdt.callResourceResponses != nil {
for _, r := range cdt.callResourceResponses {
if err := sender.Send(r); err != nil {
return err
}
}
}
return nil return nil
}, },
CheckHealthFunc: func(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { CheckHealthFunc: func(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
@ -262,3 +272,10 @@ func WithMiddlewares(middlewares ...plugins.ClientMiddleware) ClientDecoratorTes
cdt.Middlewares = append(cdt.Middlewares, middlewares...) cdt.Middlewares = append(cdt.Middlewares, middlewares...)
}) })
} }
// WithResourceResponses can be used to make the test client send simulated resource responses back over the sender stream
func WithResourceResponses(responses []*backend.CallResourceResponse) ClientDecoratorTestOption {
return ClientDecoratorTestOption(func(cdt *ClientDecoratorTest) {
cdt.callResourceResponses = responses
})
}

@ -11,20 +11,16 @@ import (
type FakeOSSCachingService struct { type FakeOSSCachingService struct {
calls map[string]int calls map[string]int
ReturnHit bool ReturnHit bool
ReturnResourceResponse *backend.CallResourceResponse ReturnResourceResponse CachedResourceDataResponse
ReturnQueryResponse CachedQueryDataResponse ReturnQueryResponse CachedQueryDataResponse
} }
func (f *FakeOSSCachingService) CacheResourceResponse(ctx context.Context, req *backend.CallResourceRequest, resp *backend.CallResourceResponse) {
f.calls["CacheResourceResponse"]++
}
func (f *FakeOSSCachingService) HandleQueryRequest(ctx context.Context, req *backend.QueryDataRequest) (bool, CachedQueryDataResponse) { func (f *FakeOSSCachingService) HandleQueryRequest(ctx context.Context, req *backend.QueryDataRequest) (bool, CachedQueryDataResponse) {
f.calls["HandleQueryRequest"]++ f.calls["HandleQueryRequest"]++
return f.ReturnHit, f.ReturnQueryResponse return f.ReturnHit, f.ReturnQueryResponse
} }
func (f *FakeOSSCachingService) HandleResourceRequest(ctx context.Context, req *backend.CallResourceRequest) (bool, *backend.CallResourceResponse) { func (f *FakeOSSCachingService) HandleResourceRequest(ctx context.Context, req *backend.CallResourceRequest) (bool, CachedResourceDataResponse) {
f.calls["HandleResourceRequest"]++ f.calls["HandleResourceRequest"]++
return f.ReturnHit, f.ReturnResourceResponse return f.ReturnHit, f.ReturnResourceResponse
} }

@ -16,6 +16,7 @@ const (
) )
type CacheQueryResponseFn func(context.Context, *backend.QueryDataResponse) type CacheQueryResponseFn func(context.Context, *backend.QueryDataResponse)
type CacheResourceResponseFn func(context.Context, *backend.CallResourceResponse)
type CachedQueryDataResponse struct { type CachedQueryDataResponse struct {
// The cached data response associated with a query, or nil if no cached data is found // The cached data response associated with a query, or nil if no cached data is found
@ -25,6 +26,15 @@ type CachedQueryDataResponse struct {
UpdateCacheFn CacheQueryResponseFn UpdateCacheFn CacheQueryResponseFn
} }
type CachedResourceDataResponse struct {
// The cached response associated with a resource request, or nil if no cached data is found
Response *backend.CallResourceResponse
// A function that should be used to cache a CallResourceResponse for a given resource request.
// It can be set to nil by the method implementation (if there is an error, for example), so it should be checked before being called.
// Because plugins can send multiple responses asyncronously, the implementation should be able to handle multiple calls to this function for one request.
UpdateCacheFn CacheResourceResponseFn
}
func ProvideCachingService() *OSSCachingService { func ProvideCachingService() *OSSCachingService {
return &OSSCachingService{} return &OSSCachingService{}
} }
@ -36,10 +46,7 @@ type CachingService interface {
HandleQueryRequest(context.Context, *backend.QueryDataRequest) (bool, CachedQueryDataResponse) HandleQueryRequest(context.Context, *backend.QueryDataRequest) (bool, CachedQueryDataResponse)
// HandleResourceRequest uses a CallResourceRequest to check the cache for any existing results for that request. If none are found, it should return false. // HandleResourceRequest uses a CallResourceRequest to check the cache for any existing results for that request. If none are found, it should return false.
// This function may populate any response headers (accessible through the context) with the cache status using the X-Cache header. // This function may populate any response headers (accessible through the context) with the cache status using the X-Cache header.
HandleResourceRequest(context.Context, *backend.CallResourceRequest) (bool, *backend.CallResourceResponse) HandleResourceRequest(context.Context, *backend.CallResourceRequest) (bool, CachedResourceDataResponse)
// CacheResourceResponse is used to cache resource responses for a resource request.
// Because plugins can send multiple responses asyncronously, the implementation should be able to handle multiple calls to this function for one request.
CacheResourceResponse(context.Context, *backend.CallResourceRequest, *backend.CallResourceResponse)
} }
// Implementation of interface - does nothing // Implementation of interface - does nothing
@ -50,11 +57,8 @@ func (s *OSSCachingService) HandleQueryRequest(ctx context.Context, req *backend
return false, CachedQueryDataResponse{} return false, CachedQueryDataResponse{}
} }
func (s *OSSCachingService) HandleResourceRequest(ctx context.Context, req *backend.CallResourceRequest) (bool, *backend.CallResourceResponse) { func (s *OSSCachingService) HandleResourceRequest(ctx context.Context, req *backend.CallResourceRequest) (bool, CachedResourceDataResponse) {
return false, nil return false, CachedResourceDataResponse{}
}
func (s *OSSCachingService) CacheResourceResponse(ctx context.Context, req *backend.CallResourceRequest, resp *backend.CallResourceResponse) {
} }
var _ CachingService = &OSSCachingService{} var _ CachingService = &OSSCachingService{}

@ -100,7 +100,7 @@ func (m *CachingMiddleware) CallResource(ctx context.Context, req *backend.CallR
start := time.Now() start := time.Now()
// First look in the resource cache if enabled // First look in the resource cache if enabled
hit, resp := m.caching.HandleResourceRequest(ctx, req) hit, cr := m.caching.HandleResourceRequest(ctx, req)
defer func() { defer func() {
// record request duration if caching was used // record request duration if caching was used
@ -114,13 +114,21 @@ func (m *CachingMiddleware) CallResource(ctx context.Context, req *backend.CallR
// Cache hit; send the response and return // Cache hit; send the response and return
if hit { if hit {
return sender.Send(resp) return sender.Send(cr.Response)
} }
// Cache miss; do the actual request // Cache miss; do the actual request
// The call to update the cache happens in /pkg/api/plugin_resource.go in the flushStream() func // If there is no update cache func, just pass in the original sender
// TODO: Implement updating the cache from this method if cr.UpdateCacheFn == nil {
return m.next.CallResource(ctx, req, sender) return m.next.CallResource(ctx, req, sender)
}
// Otherwise, intercept the responses in a wrapped sender so we can cache them first
cacheSender := cachedSenderFunc(func(res *backend.CallResourceResponse) error {
cr.UpdateCacheFn(ctx, res)
return sender.Send(res)
})
return m.next.CallResource(ctx, req, cacheSender)
} }
func (m *CachingMiddleware) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { func (m *CachingMiddleware) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
@ -142,3 +150,9 @@ func (m *CachingMiddleware) PublishStream(ctx context.Context, req *backend.Publ
func (m *CachingMiddleware) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error { func (m *CachingMiddleware) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
return m.next.RunStream(ctx, req, sender) return m.next.RunStream(ctx, req, sender)
} }
type cachedSenderFunc func(res *backend.CallResourceResponse) error
func (fn cachedSenderFunc) Send(res *backend.CallResourceResponse) error {
return fn(res)
}

@ -97,10 +97,30 @@ func TestCachingMiddleware(t *testing.T) {
req, err := http.NewRequest(http.MethodGet, "/resource/blah", nil) req, err := http.NewRequest(http.MethodGet, "/resource/blah", nil)
require.NoError(t, err) require.NoError(t, err)
// This is the response returned by the HandleResourceRequest call
// Track whether the update cache fn was called, depending on what the response headers are in the cache request
var updateCacheCalled bool
dataResponse := caching.CachedResourceDataResponse{
Response: &backend.CallResourceResponse{
Status: 200,
Body: []byte("bogus"),
},
UpdateCacheFn: func(ctx context.Context, rdr *backend.CallResourceResponse) {
updateCacheCalled = true
},
}
// This is the response sent via the passed-in sender when there is a cache miss
simulatedPluginResponse := &backend.CallResourceResponse{
Status: 201,
Body: []byte("bogus"),
}
cs := caching.NewFakeOSSCachingService() cs := caching.NewFakeOSSCachingService()
cdt := clienttest.NewClientDecoratorTest(t, cdt := clienttest.NewClientDecoratorTest(t,
clienttest.WithReqContext(req, &user.SignedInUser{}), clienttest.WithReqContext(req, &user.SignedInUser{}),
clienttest.WithMiddlewares(NewCachingMiddleware(cs)), clienttest.WithMiddlewares(NewCachingMiddleware(cs)),
clienttest.WithResourceResponses([]*backend.CallResourceResponse{simulatedPluginResponse}),
) )
jsonDataMap := map[string]interface{}{} jsonDataMap := map[string]interface{}{}
@ -121,11 +141,6 @@ func TestCachingMiddleware(t *testing.T) {
PluginContext: pluginCtx, PluginContext: pluginCtx,
} }
resourceResponse := &backend.CallResourceResponse{
Status: 200,
Body: []byte("bogus"),
}
var sentResponse *backend.CallResourceResponse var sentResponse *backend.CallResourceResponse
var storeOneResponseCallResourceSender = callResourceResponseSenderFunc(func(res *backend.CallResourceResponse) error { var storeOneResponseCallResourceSender = callResourceResponseSenderFunc(func(res *backend.CallResourceResponse) error {
sentResponse = res sentResponse = res
@ -139,32 +154,37 @@ func TestCachingMiddleware(t *testing.T) {
}) })
cs.ReturnHit = true cs.ReturnHit = true
cs.ReturnResourceResponse = resourceResponse cs.ReturnResourceResponse = dataResponse
err := cdt.Decorator.CallResource(req.Context(), crr, storeOneResponseCallResourceSender) err := cdt.Decorator.CallResource(req.Context(), crr, storeOneResponseCallResourceSender)
assert.NoError(t, err) assert.NoError(t, err)
// Cache service is called once // Cache service is called once
cs.AssertCalls(t, "HandleResourceRequest", 1) cs.AssertCalls(t, "HandleResourceRequest", 1)
// Equals the mocked response was sent // The mocked cached response was sent
assert.NotNil(t, sentResponse) assert.NotNil(t, sentResponse)
assert.Equal(t, resourceResponse, sentResponse) assert.Equal(t, dataResponse.Response, sentResponse)
// Cache was not updated by the middleware
assert.False(t, updateCacheCalled)
}) })
t.Run("If cache returns a miss, resource call is issued", func(t *testing.T) { t.Run("If cache returns a miss, resource call is issued and the update cache function is called", func(t *testing.T) {
t.Cleanup(func() { t.Cleanup(func() {
sentResponse = nil sentResponse = nil
cs.Reset() cs.Reset()
}) })
cs.ReturnHit = false cs.ReturnHit = false
cs.ReturnResourceResponse = resourceResponse cs.ReturnResourceResponse = dataResponse
err := cdt.Decorator.CallResource(req.Context(), crr, storeOneResponseCallResourceSender) err := cdt.Decorator.CallResource(req.Context(), crr, storeOneResponseCallResourceSender)
assert.NoError(t, err) assert.NoError(t, err)
// Cache service is called once // Cache service is called once
cs.AssertCalls(t, "HandleResourceRequest", 1) cs.AssertCalls(t, "HandleResourceRequest", 1)
// Nil response was sent // Simulated plugin response was sent
assert.Nil(t, sentResponse) assert.NotNil(t, sentResponse)
assert.Equal(t, simulatedPluginResponse, sentResponse)
// Since it was a miss, the middleware called the update func
assert.True(t, updateCacheCalled)
}) })
}) })

Loading…
Cancel
Save